1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008 Sun Microsystems
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; version 2 of the License.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20
#include <drizzled/server_includes.h>
23
#include "rpl_filter.h"
24
#include "rpl_utility.h"
25
#include "rpl_record.h"
26
#include <mysys/my_dir.h>
27
#include <drizzled/error.h>
28
#include <libdrizzle/pack.h>
32
#include <mysys/base64.h>
33
#include <mysys/my_bitmap.h>
35
#include <drizzled/gettext.h>
36
#include <libdrizzle/libdrizzle.h>
37
#include <drizzled/error.h>
38
#include <drizzled/query_id.h>
39
#include <drizzled/tztime.h>
40
#include <drizzled/slave.h>
43
static const char *HA_ERR(int i)
46
case HA_ERR_KEY_NOT_FOUND: return "HA_ERR_KEY_NOT_FOUND";
47
case HA_ERR_FOUND_DUPP_KEY: return "HA_ERR_FOUND_DUPP_KEY";
48
case HA_ERR_RECORD_CHANGED: return "HA_ERR_RECORD_CHANGED";
49
case HA_ERR_WRONG_INDEX: return "HA_ERR_WRONG_INDEX";
50
case HA_ERR_CRASHED: return "HA_ERR_CRASHED";
51
case HA_ERR_WRONG_IN_RECORD: return "HA_ERR_WRONG_IN_RECORD";
52
case HA_ERR_OUT_OF_MEM: return "HA_ERR_OUT_OF_MEM";
53
case HA_ERR_NOT_A_TABLE: return "HA_ERR_NOT_A_TABLE";
54
case HA_ERR_WRONG_COMMAND: return "HA_ERR_WRONG_COMMAND";
55
case HA_ERR_OLD_FILE: return "HA_ERR_OLD_FILE";
56
case HA_ERR_NO_ACTIVE_RECORD: return "HA_ERR_NO_ACTIVE_RECORD";
57
case HA_ERR_RECORD_DELETED: return "HA_ERR_RECORD_DELETED";
58
case HA_ERR_RECORD_FILE_FULL: return "HA_ERR_RECORD_FILE_FULL";
59
case HA_ERR_INDEX_FILE_FULL: return "HA_ERR_INDEX_FILE_FULL";
60
case HA_ERR_END_OF_FILE: return "HA_ERR_END_OF_FILE";
61
case HA_ERR_UNSUPPORTED: return "HA_ERR_UNSUPPORTED";
62
case HA_ERR_TO_BIG_ROW: return "HA_ERR_TO_BIG_ROW";
63
case HA_WRONG_CREATE_OPTION: return "HA_WRONG_CREATE_OPTION";
64
case HA_ERR_FOUND_DUPP_UNIQUE: return "HA_ERR_FOUND_DUPP_UNIQUE";
65
case HA_ERR_UNKNOWN_CHARSET: return "HA_ERR_UNKNOWN_CHARSET";
66
case HA_ERR_WRONG_MRG_TABLE_DEF: return "HA_ERR_WRONG_MRG_TABLE_DEF";
67
case HA_ERR_CRASHED_ON_REPAIR: return "HA_ERR_CRASHED_ON_REPAIR";
68
case HA_ERR_CRASHED_ON_USAGE: return "HA_ERR_CRASHED_ON_USAGE";
69
case HA_ERR_LOCK_WAIT_TIMEOUT: return "HA_ERR_LOCK_WAIT_TIMEOUT";
70
case HA_ERR_LOCK_TABLE_FULL: return "HA_ERR_LOCK_TABLE_FULL";
71
case HA_ERR_READ_ONLY_TRANSACTION: return "HA_ERR_READ_ONLY_TRANSACTION";
72
case HA_ERR_LOCK_DEADLOCK: return "HA_ERR_LOCK_DEADLOCK";
73
case HA_ERR_CANNOT_ADD_FOREIGN: return "HA_ERR_CANNOT_ADD_FOREIGN";
74
case HA_ERR_NO_REFERENCED_ROW: return "HA_ERR_NO_REFERENCED_ROW";
75
case HA_ERR_ROW_IS_REFERENCED: return "HA_ERR_ROW_IS_REFERENCED";
76
case HA_ERR_NO_SAVEPOINT: return "HA_ERR_NO_SAVEPOINT";
77
case HA_ERR_NON_UNIQUE_BLOCK_SIZE: return "HA_ERR_NON_UNIQUE_BLOCK_SIZE";
78
case HA_ERR_NO_SUCH_TABLE: return "HA_ERR_NO_SUCH_TABLE";
79
case HA_ERR_TABLE_EXIST: return "HA_ERR_TABLE_EXIST";
80
case HA_ERR_NO_CONNECTION: return "HA_ERR_NO_CONNECTION";
81
case HA_ERR_NULL_IN_SPATIAL: return "HA_ERR_NULL_IN_SPATIAL";
82
case HA_ERR_TABLE_DEF_CHANGED: return "HA_ERR_TABLE_DEF_CHANGED";
83
case HA_ERR_NO_PARTITION_FOUND: return "HA_ERR_NO_PARTITION_FOUND";
84
case HA_ERR_RBR_LOGGING_FAILED: return "HA_ERR_RBR_LOGGING_FAILED";
85
case HA_ERR_DROP_INDEX_FK: return "HA_ERR_DROP_INDEX_FK";
86
case HA_ERR_FOREIGN_DUPLICATE_KEY: return "HA_ERR_FOREIGN_DUPLICATE_KEY";
87
case HA_ERR_TABLE_NEEDS_UPGRADE: return "HA_ERR_TABLE_NEEDS_UPGRADE";
88
case HA_ERR_TABLE_READONLY: return "HA_ERR_TABLE_READONLY";
89
case HA_ERR_AUTOINC_READ_FAILED: return "HA_ERR_AUTOINC_READ_FAILED";
90
case HA_ERR_AUTOINC_ERANGE: return "HA_ERR_AUTOINC_ERANGE";
91
case HA_ERR_GENERIC: return "HA_ERR_GENERIC";
92
case HA_ERR_RECORD_IS_THE_SAME: return "HA_ERR_RECORD_IS_THE_SAME";
93
case HA_ERR_LOGGING_IMPOSSIBLE: return "HA_ERR_LOGGING_IMPOSSIBLE";
94
case HA_ERR_CORRUPT_EVENT: return "HA_ERR_CORRUPT_EVENT";
95
case HA_ERR_ROWS_EVENT_APPLY : return "HA_ERR_ROWS_EVENT_APPLY";
101
Error reporting facility for Rows_log_event::do_apply_event
103
@param level error, warning or info
104
@param ha_error HA_ERR_ code
105
@param rli pointer to the active Relay_log_info instance
106
@param session pointer to the slave thread's session
107
@param table pointer to the event's table object
108
@param type the type of the event
109
@param log_name the master binlog file name
110
@param pos the master binlog file pos (the next after the event)
113
static void inline slave_rows_error_report(enum loglevel level, int ha_error,
114
Relay_log_info const *rli, Session *session,
115
Table *table, const char * type,
116
const char *log_name, ulong pos)
118
const char *handler_error= HA_ERR(ha_error);
119
char buff[MAX_SLAVE_ERRMSG], *slider;
120
const char *buff_end= buff + sizeof(buff);
122
List_iterator_fast<DRIZZLE_ERROR> it(session->warn_list);
126
for (err= it++, slider= buff; err && slider < buff_end - 1;
127
slider += len, err= it++)
129
len= snprintf(slider, buff_end - slider,
130
_(" %s, Error_code: %d;"), err->msg, err->code);
133
rli->report(level, session->is_error()? session->main_da.sql_errno() : 0,
134
_("Could not execute %s event on table %s.%s;"
135
"%s handler error %s; "
136
"the event's master log %s, end_log_pos %lu"),
137
type, table->s->db.str,
138
table->s->table_name.str,
140
handler_error == NULL? _("<unknown>") : handler_error,
146
Cache that will automatically be written to a dedicated file on
152
class Write_on_release_cache
160
typedef unsigned short flag_set;
166
Write_on_release_cache
167
cache Pointer to cache to use
168
file File to write cache to upon destruction
169
flags Flags for the cache
173
Class used to guarantee copy of cache to file before exiting the
174
current block. On successful copy of the cache, the cache will
175
be reinited as a WRITE_CACHE.
177
Currently, a pointer to the cache is provided in the
178
constructor, but it would be possible to create a subclass
179
holding the IO_CACHE itself.
181
Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0)
182
: m_cache(cache), m_file(file), m_flags(flags)
184
reinit_io_cache(m_cache, WRITE_CACHE, 0L, false, true);
187
~Write_on_release_cache()
189
copy_event_cache_to_file_and_reinit(m_cache, m_file);
190
if (m_flags | FLUSH_F)
195
Return a pointer to the internal IO_CACHE.
202
Function to return a pointer to the internal cache, so that the
203
object can be treated as a IO_CACHE and used with the my_b_*
207
A pointer to the internal IO_CACHE.
209
IO_CACHE *operator&()
215
// Hidden, to prevent usage.
216
Write_on_release_cache(Write_on_release_cache const&);
223
uint32_t debug_not_change_ts_if_art_event= 1; // bug#29309 simulation
229
static void clear_all_errors(Session *session, Relay_log_info *rli)
231
session->is_slave_error = 0;
232
session->clear_error();
238
Ignore error code specified on command line.
241
inline int ignored_error_code(int err_code)
243
return ((err_code == ER_SLAVE_IGNORED_TABLE) ||
244
(use_slave_mask && bitmap_is_set(&slave_error_mask, err_code)));
252
static char *pretty_print_str(char *packet, const char *str, int len)
254
const char *end= str + len;
260
switch ((c=*str++)) {
261
case '\n': *pos++= '\\'; *pos++= 'n'; break;
262
case '\r': *pos++= '\\'; *pos++= 'r'; break;
263
case '\\': *pos++= '\\'; *pos++= '\\'; break;
264
case '\b': *pos++= '\\'; *pos++= 'b'; break;
265
case '\t': *pos++= '\\'; *pos++= 't'; break;
266
case '\'': *pos++= '\\'; *pos++= '\''; break;
267
case 0 : *pos++= '\\'; *pos++= '0'; break;
279
Creates a temporary name for load data infile:.
281
@param buf Store new filename here
282
@param file_id File_id (part of file name)
283
@param event_server_id Event_id (part of file name)
284
@param ext Extension for file name
287
Pointer to start of extension
290
static char *slave_load_file_stem(char *buf, uint32_t file_id,
291
int event_server_id, const char *ext)
294
fn_format(buf,"SQL_LOAD-",slave_load_tmpdir, "", MY_UNPACK_FILENAME);
297
buf= strchr(buf, '\0');
298
buf= int10_to_str(::server_id, buf, 10);
300
buf= int10_to_str(event_server_id, buf, 10);
302
res= int10_to_str(file_id, buf, 10);
303
my_stpcpy(res, ext); // Add extension last
304
return res; // Pointer to extension
309
Delete all temporary files used for SQL_LOAD.
312
static void cleanup_load_tmpdir()
317
char fname[FN_REFLEN], prefbuf[31], *p;
319
if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
323
When we are deleting temporary files, we should only remove
324
the files associated with the server id of our server.
325
We don't use event_server_id here because since we've disabled
326
direct binlogging of Create_file/Append_file/Exec_load events
327
we cannot meet Start_log event in the middle of events from one
330
p= strmake(prefbuf, STRING_WITH_LEN("SQL_LOAD-"));
331
p= int10_to_str(::server_id, p, 10);
335
for (i=0 ; i < (uint)dirp->number_off_files; i++)
337
file=dirp->dir_entry+i;
338
if (is_prefix(file->name, prefbuf))
340
fn_format(fname,file->name,slave_load_tmpdir,"",MY_UNPACK_FILENAME);
341
my_delete(fname, MYF(0));
353
static bool write_str(IO_CACHE *file, const char *str, uint32_t length)
355
unsigned char tmp[1];
356
tmp[0]= (unsigned char) length;
357
return (my_b_safe_write(file, tmp, sizeof(tmp)) ||
358
my_b_safe_write(file, (unsigned char*) str, length));
366
static inline int read_str(const char **buf, const char *buf_end,
367
const char **str, uint8_t *len)
369
if (*buf + ((uint) (unsigned char) **buf) >= buf_end)
371
*len= (uint8_t) **buf;
373
(*buf)+= (uint) *len+1;
379
Transforms a string into "" or its expression in 0x... form.
382
char *str_to_hex(char *to, const char *from, uint32_t len)
388
to= octet2hex(to, from, len);
391
to= my_stpcpy(to, "\"\"");
392
return to; // pointer to end 0 of 'to'
397
Append a version of the 'from' string suitable for use in a query to
398
the 'to' string. To generate a correct escaping, the character set
399
information in 'csinfo' is used.
403
append_query_string(const CHARSET_INFO * const csinfo,
404
String const *from, String *to)
407
uint32_t const orig_len= to->length();
408
if (to->reserve(orig_len + from->length()*2+3))
411
beg= to->c_ptr_quick() + to->length();
413
if (csinfo->escape_with_backslash_is_dangerous)
414
ptr= str_to_hex(ptr, from->ptr(), from->length());
418
ptr+= drizzle_escape_string(ptr, from->ptr(), from->length());
421
to->length(orig_len + ptr - beg);
426
/**************************************************************************
427
Log_event methods (= the parent class of all events)
428
**************************************************************************/
432
returns the human readable name of the event's type
435
const char* Log_event::get_type_str(Log_event_type type)
438
case START_EVENT_V3: return "Start_v3";
439
case STOP_EVENT: return "Stop";
440
case QUERY_EVENT: return "Query";
441
case ROTATE_EVENT: return "Rotate";
442
case INTVAR_EVENT: return "Intvar";
443
case LOAD_EVENT: return "Load";
444
case NEW_LOAD_EVENT: return "New_load";
445
case SLAVE_EVENT: return "Slave";
446
case CREATE_FILE_EVENT: return "Create_file";
447
case APPEND_BLOCK_EVENT: return "Append_block";
448
case DELETE_FILE_EVENT: return "Delete_file";
449
case EXEC_LOAD_EVENT: return "Exec_load";
450
case RAND_EVENT: return "RAND";
451
case XID_EVENT: return "Xid";
452
case USER_VAR_EVENT: return "User var";
453
case FORMAT_DESCRIPTION_EVENT: return "Format_desc";
454
case TABLE_MAP_EVENT: return "Table_map";
455
case PRE_GA_WRITE_ROWS_EVENT: return "Write_rows_event_old";
456
case PRE_GA_UPDATE_ROWS_EVENT: return "Update_rows_event_old";
457
case PRE_GA_DELETE_ROWS_EVENT: return "Delete_rows_event_old";
458
case WRITE_ROWS_EVENT: return "Write_rows";
459
case UPDATE_ROWS_EVENT: return "Update_rows";
460
case DELETE_ROWS_EVENT: return "Delete_rows";
461
case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
462
case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
463
case INCIDENT_EVENT: return "Incident";
464
default: return "Unknown"; /* impossible */
468
const char* Log_event::get_type_str()
470
return get_type_str(get_type_code());
475
Log_event::Log_event()
478
Log_event::Log_event(Session* session_arg, uint16_t flags_arg, bool using_trans)
479
:log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), session(session_arg)
481
server_id= session->server_id;
482
when= session->start_time;
483
cache_stmt= using_trans;
488
This minimal constructor is for when you are not even sure that there
489
is a valid Session. For example in the server when we are shutting down or
490
flushing logs after receiving a SIGHUP (then we must write a Rotate to
491
the binlog but we have no Session, so we need this minimal constructor).
494
Log_event::Log_event()
495
:temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
498
server_id= ::server_id;
500
We can't call my_time() here as this would cause a call before
509
Log_event::Log_event()
512
Log_event::Log_event(const char* buf,
513
const Format_description_log_event* description_event)
514
:temp_buf(0), cache_stmt(0)
517
when= uint4korr(buf);
518
server_id= uint4korr(buf + SERVER_ID_OFFSET);
519
data_written= uint4korr(buf + EVENT_LEN_OFFSET);
520
if (description_event->binlog_version==1)
527
log_pos= uint4korr(buf + LOG_POS_OFFSET);
529
If the log is 4.0 (so here it can only be a 4.0 relay log read by
530
the SQL thread or a 4.0 master binlog read by the I/O thread),
531
log_pos is the beginning of the event: we transform it into the end
532
of the event, which is more useful.
533
But how do you know that the log is 4.0: you know it if
534
description_event is version 3 *and* you are not reading a
535
Format_desc (remember that mysqlbinlog starts by assuming that 5.0
536
logs are in 4.0 format, until it finds a Format_desc).
538
if (description_event->binlog_version==3 &&
539
buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
542
If log_pos=0, don't change it. log_pos==0 is a marker to mean
543
"don't change rli->group_master_log_pos" (see
544
inc_group_relay_log_pos()). As it is unreal log_pos, adding the
545
event len's is nonsense. For example, a fake Rotate event should
546
not have its log_pos (which is 0) changed or it will modify
547
Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense
548
value of (a non-zero offset which does not exist in the master's
549
binlog, so which will cause problems if the user uses this value
552
log_pos+= data_written; /* purecov: inspected */
555
flags= uint2korr(buf + FLAGS_OFFSET);
556
if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
557
(buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
560
These events always have a header which stops here (i.e. their
564
Initialization to zero of all other Log_event members as they're
565
not specified. Currently there are no such members; in the future
566
there will be an event UID (but Format_description and Rotate
567
don't need this UID, as they are not propagated through
568
--log-slave-updates (remember the UID is used to not play a query
569
twice when you have two masters which are slaves of a 3rd master).
574
/* otherwise, go on with reading the header from buf (nothing now) */
578
int Log_event::do_update_pos(Relay_log_info *rli)
581
rli is null when (as far as I (Guilhem) know) the caller is
582
Load_log_event::do_apply_event *and* that one is called from
583
Execute_load_log_event::do_apply_event. In this case, we don't
584
do anything here ; Execute_load_log_event::do_apply_event will
585
call Log_event::do_apply_event again later with the proper rli.
586
Strictly speaking, if we were sure that rli is null only in the
587
case discussed above, 'if (rli)' is useless here. But as we are
588
not 100% sure, keep it for now.
590
Matz: I don't think we will need this check with this refactoring.
595
bug#29309 simulation: resetting the flag to force
596
wrong behaviour of artificial event to update
597
rli->last_master_timestamp for only one time -
598
the first FLUSH LOGS in the test.
600
if (debug_not_change_ts_if_art_event == 1
601
&& is_artificial_event())
602
debug_not_change_ts_if_art_event= 0;
603
rli->stmt_done(log_pos,
604
is_artificial_event() &&
605
debug_not_change_ts_if_art_event > 0 ? 0 : when);
606
if (debug_not_change_ts_if_art_event == 0)
607
debug_not_change_ts_if_art_event= 2;
609
return 0; // Cannot fail currently
613
Log_event::enum_skip_reason
614
Log_event::do_shall_skip(Relay_log_info *rli)
616
if ((server_id == ::server_id && !rli->replicate_same_server_id) || (rli->slave_skip_counter == 1 && rli->is_in_group()))
617
return EVENT_SKIP_IGNORE;
618
else if (rli->slave_skip_counter > 0)
619
return EVENT_SKIP_COUNT;
621
return EVENT_SKIP_NOT;
626
Log_event::pack_info()
629
void Log_event::pack_info(Protocol *protocol)
631
protocol->store("", &my_charset_bin);
636
init_show_field_list() prepares the column names and types for the
637
output of SHOW BINLOG EVENTS; it is used only by SHOW BINLOG
641
void Log_event::init_show_field_list(List<Item>* field_list)
643
field_list->push_back(new Item_empty_string("Log_name", 20));
644
field_list->push_back(new Item_return_int("Pos", MY_INT32_NUM_DECIMAL_DIGITS,
645
DRIZZLE_TYPE_LONGLONG));
646
field_list->push_back(new Item_empty_string("Event_type", 20));
647
field_list->push_back(new Item_return_int("Server_id", 10,
649
field_list->push_back(new Item_return_int("End_log_pos",
650
MY_INT32_NUM_DECIMAL_DIGITS,
651
DRIZZLE_TYPE_LONGLONG));
652
field_list->push_back(new Item_empty_string("Info", 20));
659
bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
661
unsigned char header[LOG_EVENT_HEADER_LEN];
664
/* Store number of bytes that will be written by this event */
665
data_written= event_data_length + sizeof(header);
668
log_pos != 0 if this is relay-log event. In this case we should not
672
if (is_artificial_event())
675
We should not do any cleanup on slave when reading this. We
676
mark this by setting log_pos to 0. Start_log_event_v3() will
677
detect this on reading and set artificial_event=1 for the event.
684
Calculate position of end of event
686
Note that with a SEQ_READ_APPEND cache, my_b_tell() does not
687
work well. So this will give slightly wrong positions for the
688
Format_desc/Rotate/Stop events which the slave writes to its
689
relay log. For example, the initial Format_desc will have
690
end_log_pos=91 instead of 95. Because after writing the first 4
691
bytes of the relay log, my_b_tell() still reports 0. Because
692
my_b_append() does not update the counter which my_b_tell()
693
later uses (one should probably use my_b_append_tell() to work
694
around this). To get right positions even when writing to the
695
relay log, we use the (new) my_b_safe_tell().
697
Note that this raises a question on the correctness of all these
698
assert(my_b_tell()=rli->event_relay_log_pos).
700
If in a transaction, the log_pos which we calculate below is not
701
very good (because then my_b_safe_tell() returns start position
702
of the BEGIN, so it's like the statement was at the BEGIN's
703
place), but it's not a very serious problem (as the slave, when
704
it is in a transaction, does not take those end_log_pos into
705
account (as it calls inc_event_relay_log_pos()). To be fixed
706
later, so that it looks less strange. But not bug.
709
log_pos= my_b_safe_tell(file)+data_written;
712
now= (ulong) get_time(); // Query start time
715
Header will be of size LOG_EVENT_HEADER_LEN for all events, except for
716
FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT, where it will be
717
LOG_EVENT_MINIMAL_HEADER_LEN (remember these 2 have a frozen header,
718
because we read them before knowing the format).
721
int4store(header, now); // timestamp
722
header[EVENT_TYPE_OFFSET]= get_type_code();
723
int4store(header+ SERVER_ID_OFFSET, server_id);
724
int4store(header+ EVENT_LEN_OFFSET, data_written);
725
int4store(header+ LOG_POS_OFFSET, log_pos);
726
int2store(header+ FLAGS_OFFSET, flags);
728
return(my_b_safe_write(file, header, sizeof(header)) != 0);
733
This needn't be format-tolerant, because we only read
734
LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length).
737
int Log_event::read_log_event(IO_CACHE* file, String* packet,
738
pthread_mutex_t* log_lock)
742
char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
745
pthread_mutex_lock(log_lock);
746
if (my_b_read(file, (unsigned char*) buf, sizeof(buf)))
749
If the read hits eof, we must report it as eof so the caller
750
will know it can go into cond_wait to be woken up on the next
754
result= LOG_READ_EOF;
756
result= (file->error > 0 ? LOG_READ_TRUNC : LOG_READ_IO);
759
data_len= uint4korr(buf + EVENT_LEN_OFFSET);
760
if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
761
data_len > current_session->variables.max_allowed_packet)
763
result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
768
/* Append the log event header to packet */
769
if (packet->append(buf, sizeof(buf)))
771
/* Failed to allocate packet */
772
result= LOG_READ_MEM;
775
data_len-= LOG_EVENT_MINIMAL_HEADER_LEN;
778
/* Append rest of event, read directly from file into packet */
779
if (packet->append(file, data_len))
782
Fatal error occured when appending rest of the event
783
to packet, possible failures:
784
1. EOF occured when reading from file, it's really an error
785
as data_len is >=0 there's supposed to be more bytes available.
786
file->error will have been set to number of bytes left to read
787
2. Read was interrupted, file->error would normally be set to -1
788
3. Failed to allocate memory for packet, my_errno
789
will be ENOMEM(file->error shuold be 0, but since the
790
memory allocation occurs before the call to read it might
793
result= (my_errno == ENOMEM ? LOG_READ_MEM :
794
(file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
795
/* Implicit goto end; */
801
pthread_mutex_unlock(log_lock);
805
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
806
#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
810
Allocates memory; The caller is responsible for clean-up.
812
Log_event* Log_event::read_log_event(IO_CACHE* file,
813
pthread_mutex_t* log_lock,
814
const Format_description_log_event
817
assert(description_event != 0);
818
char head[LOG_EVENT_MINIMAL_HEADER_LEN];
820
First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
821
check the event for sanity and to know its length; no need to really parse
822
it. We say "at most" because this could be a 3.23 master, which has header
823
of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's
824
"minimal" over the set {MySQL >=4.0}).
826
uint32_t header_size= cmin(description_event->common_header_len,
827
LOG_EVENT_MINIMAL_HEADER_LEN);
830
if (my_b_read(file, (unsigned char *) head, header_size))
834
No error here; it could be that we are at the file's end. However
835
if the next my_b_read() fails (below), it will be an error as we
836
were able to read the first bytes.
840
uint32_t data_len = uint4korr(head + EVENT_LEN_OFFSET);
842
const char *error= 0;
844
#ifndef max_allowed_packet
845
Session *session=current_session;
846
uint32_t max_allowed_packet= session ? session->variables.max_allowed_packet : ~(ulong)0;
849
if (data_len > max_allowed_packet)
851
error = "Event too big";
855
if (data_len < header_size)
857
error = "Event too small";
861
// some events use the extra byte to null-terminate strings
862
if (!(buf = (char*) my_malloc(data_len+1, MYF(MY_WME))))
864
error = "Out of memory";
868
memcpy(buf, head, header_size);
869
if (my_b_read(file, (unsigned char*) buf + header_size, data_len - header_size))
871
error = "read error";
874
if ((res= read_log_event(buf, data_len, &error, description_event)))
875
res->register_temp_buf(buf);
882
sql_print_error(_("Error in Log_event::read_log_event(): "
883
"'%s', data_len: %d, event_type: %d"),
884
error,data_len,head[EVENT_TYPE_OFFSET]);
887
The SQL slave thread will check if file->error<0 to know
888
if there was an I/O error. Even if there is no "low-level" I/O errors
889
with 'file', any of the high-level above errors is worrying
890
enough to stop the SQL thread now ; as we are skipping the current event,
891
going on with reading and successfully executing other events can
892
only corrupt the slave's databases. So stop.
901
Binlog format tolerance is in (buf, event_len, description_event)
905
Log_event* Log_event::read_log_event(const char* buf, uint32_t event_len,
907
const Format_description_log_event *description_event)
910
assert(description_event != 0);
912
/* Check the integrity */
913
if (event_len < EVENT_LEN_OFFSET ||
914
buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
915
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
917
*error="Sanity check failed"; // Needed to free buffer
918
return(NULL); // general sanity check - will fail on a partial read
921
uint32_t event_type= buf[EVENT_TYPE_OFFSET];
922
if (event_type > description_event->number_of_event_types &&
923
event_type != FORMAT_DESCRIPTION_EVENT)
926
It is unsafe to use the description_event if its post_header_len
927
array does not include the event type.
934
In some previuos versions (see comment in
935
Format_description_log_event::Format_description_log_event(char*,...)),
936
event types were assigned different id numbers than in the
937
present version. In order to replicate from such versions to the
938
present version, we must map those event type id's to our event
939
type id's. The mapping is done with the event_type_permutation
940
array, which was set up when the Format_description_log_event
943
if (description_event->event_type_permutation)
944
event_type= description_event->event_type_permutation[event_type];
948
ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
951
ev = new Load_log_event(buf, event_len, description_event);
954
ev = new Load_log_event(buf, event_len, description_event);
957
ev = new Rotate_log_event(buf, event_len, description_event);
959
case CREATE_FILE_EVENT:
960
ev = new Create_file_log_event(buf, event_len, description_event);
962
case APPEND_BLOCK_EVENT:
963
ev = new Append_block_log_event(buf, event_len, description_event);
965
case DELETE_FILE_EVENT:
966
ev = new Delete_file_log_event(buf, event_len, description_event);
968
case EXEC_LOAD_EVENT:
969
ev = new Execute_load_log_event(buf, event_len, description_event);
971
case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
972
ev = new Start_log_event_v3(buf, description_event);
975
ev = new Stop_log_event(buf, description_event);
978
ev = new Intvar_log_event(buf, description_event);
981
ev = new Xid_log_event(buf, description_event);
984
ev = new Rand_log_event(buf, description_event);
987
ev = new User_var_log_event(buf, description_event);
989
case FORMAT_DESCRIPTION_EVENT:
990
ev = new Format_description_log_event(buf, event_len, description_event);
992
case WRITE_ROWS_EVENT:
993
ev = new Write_rows_log_event(buf, event_len, description_event);
995
case UPDATE_ROWS_EVENT:
996
ev = new Update_rows_log_event(buf, event_len, description_event);
998
case DELETE_ROWS_EVENT:
999
ev = new Delete_rows_log_event(buf, event_len, description_event);
1001
case TABLE_MAP_EVENT:
1002
ev = new Table_map_log_event(buf, event_len, description_event);
1004
case BEGIN_LOAD_QUERY_EVENT:
1005
ev = new Begin_load_query_log_event(buf, event_len, description_event);
1007
case EXECUTE_LOAD_QUERY_EVENT:
1008
ev= new Execute_load_query_log_event(buf, event_len, description_event);
1010
case INCIDENT_EVENT:
1011
ev = new Incident_log_event(buf, event_len, description_event);
1020
is_valid() are small event-specific sanity tests which are
1021
important; for example there are some my_malloc() in constructors
1022
(e.g. Query_log_event::Query_log_event(char*...)); when these
1023
my_malloc() fail we can't return an error out of the constructor
1024
(because constructor is "void") ; so instead we leave the pointer we
1025
wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
1026
Same for Format_description_log_event, member 'post_header_len'.
1028
if (!ev || !ev->is_valid())
1031
*error= "Found invalid event in binary log";
1037
inline Log_event::enum_skip_reason
1038
Log_event::continue_group(Relay_log_info *rli)
1040
if (rli->slave_skip_counter == 1)
1041
return Log_event::EVENT_SKIP_IGNORE;
1042
return Log_event::do_shall_skip(rli);
1045
/**************************************************************************
1046
Query_log_event methods
1047
**************************************************************************/
1050
This (which is used only for SHOW BINLOG EVENTS) could be updated to
1051
print SET @@session_var=. But this is not urgent, as SHOW BINLOG EVENTS is
1052
only an information, it does not produce suitable queries to replay (for
1053
example it does not print LOAD DATA INFILE).
1058
void Query_log_event::pack_info(Protocol *protocol)
1060
// TODO: show the catalog ??
1062
if (!(buf= (char*) my_malloc(9 + db_len + q_len, MYF(MY_WME))))
1065
if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
1068
pos= my_stpcpy(buf, "use `");
1069
memcpy(pos, db, db_len);
1070
pos= my_stpcpy(pos+db_len, "`; ");
1074
memcpy(pos, query, q_len);
1077
protocol->store(buf, pos-buf, &my_charset_bin);
1083
Utility function for the next method (Query_log_event::write()) .
1085
static void write_str_with_code_and_len(char **dst, const char *src,
1086
int len, uint32_t code)
1090
*((*dst)++)= (unsigned char) len;
1091
memcpy(*dst, src, len);
1097
Query_log_event::write().
1100
In this event we have to modify the header to have the correct
1101
EVENT_LEN_OFFSET as we don't yet know how many status variables we
1105
bool Query_log_event::write(IO_CACHE* file)
1108
@todo if catalog can be of length FN_REFLEN==512, then we are not
1109
replicating it correctly, since the length is stored in a byte
1112
unsigned char buf[QUERY_HEADER_LEN+
1113
1+4+ // code of flags2 and flags2
1114
1+8+ // code of sql_mode and sql_mode
1115
1+1+FN_REFLEN+ // code of catalog and catalog length and catalog
1116
1+4+ // code of autoinc and the 2 autoinc variables
1117
1+6+ // code of charset and charset
1118
1+1+MAX_TIME_ZONE_NAME_LENGTH+ // code of tz and tz length and tz name
1119
1+2+ // code of lc_time_names and lc_time_names_number
1120
1+2 // code of charset_database and charset_database_number
1121
], *start, *start_of_status;
1125
return 1; // Something wrong with event
1128
We want to store the thread id:
1129
(- as an information for the user when he reads the binlog)
1130
- if the query uses temporary table: for the slave SQL thread to know to
1131
which master connection the temp table belongs.
1132
Now imagine we (write()) are called by the slave SQL thread (we are
1133
logging a query executed by this thread; the slave runs with
1134
--log-slave-updates). Then this query will be logged with
1135
thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of
1136
the same name were created simultaneously on the master (in the master
1138
CREATE TEMPORARY TABLE t; (thread 1)
1139
CREATE TEMPORARY TABLE t; (thread 2)
1141
then in the slave's binlog there will be
1142
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1143
CREATE TEMPORARY TABLE t; (thread_id_of_the_slave_SQL_thread)
1144
which is bad (same thread id!).
1146
To avoid this, we log the thread's thread id EXCEPT for the SQL
1147
slave thread for which we log the original (master's) thread id.
1148
Now this moves the bug: what happens if the thread id on the
1149
master was 10 and when the slave replicates the query, a
1150
connection number 10 is opened by a normal client on the slave,
1151
and updates a temp table of the same name? We get a problem
1152
again. To avoid this, in the handling of temp tables (sql_base.cc)
1153
we use thread_id AND server_id. TODO when this is merged into
1154
4.1: in 4.1, slave_proxy_id has been renamed to pseudo_thread_id
1155
and is a session variable: that's to make mysqlbinlog work with
1156
temp tables. We probably need to introduce
1158
SET PSEUDO_SERVER_ID
1159
for mysqlbinlog in 4.1. mysqlbinlog would print:
1160
SET PSEUDO_SERVER_ID=
1161
SET PSEUDO_THREAD_ID=
1162
for each query using temp tables.
1164
int4store(buf + Q_THREAD_ID_OFFSET, slave_proxy_id);
1165
int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
1166
buf[Q_DB_LEN_OFFSET] = (char) db_len;
1167
int2store(buf + Q_ERR_CODE_OFFSET, error_code);
1170
You MUST always write status vars in increasing order of code. This
1171
guarantees that a slightly older slave will be able to parse those he
1174
start_of_status= start= buf+QUERY_HEADER_LEN;
1177
*start++= Q_FLAGS2_CODE;
1178
int4store(start, flags2);
1181
if (sql_mode_inited)
1183
*start++= Q_SQL_MODE_CODE;
1184
int8store(start, (uint64_t)sql_mode);
1187
if (catalog_len) // i.e. this var is inited (false for 4.0 events)
1189
write_str_with_code_and_len((char **)(&start),
1190
catalog, catalog_len, Q_CATALOG_NZ_CODE);
1192
In 5.0.x where x<4 masters we used to store the end zero here. This was
1193
a waste of one byte so we don't do it in x>=4 masters. We change code to
1194
Q_CATALOG_NZ_CODE, because re-using the old code would make x<4 slaves
1195
of this x>=4 master segfault (expecting a zero when there is
1196
none). Remaining compatibility problems are: the older slave will not
1197
find the catalog; but it is will not crash, and it's not an issue
1198
that it does not find the catalog as catalogs were not used in these
1199
older MySQL versions (we store it in binlog and read it from relay log
1200
but do nothing useful with it). What is an issue is that the older slave
1201
will stop processing the Q_* blocks (and jumps to the db/query) as soon
1202
as it sees unknown Q_CATALOG_NZ_CODE; so it will not be able to read
1203
Q_AUTO_INCREMENT*, Q_CHARSET and so replication will fail silently in
1204
various ways. Documented that you should not mix alpha/beta versions if
1205
they are not exactly the same version, with example of 5.0.3->5.0.2 and
1206
5.0.4->5.0.3. If replication is from older to new, the new will
1207
recognize Q_CATALOG_CODE and have no problem.
1210
if (auto_increment_increment != 1 || auto_increment_offset != 1)
1212
*start++= Q_AUTO_INCREMENT;
1213
int2store(start, auto_increment_increment);
1214
int2store(start+2, auto_increment_offset);
1219
*start++= Q_CHARSET_CODE;
1220
memcpy(start, charset, 6);
1225
/* In the TZ sys table, column Name is of length 64 so this should be ok */
1226
assert(time_zone_len <= MAX_TIME_ZONE_NAME_LENGTH);
1227
*start++= Q_TIME_ZONE_CODE;
1228
*start++= time_zone_len;
1229
memcpy(start, time_zone_str, time_zone_len);
1230
start+= time_zone_len;
1232
if (lc_time_names_number)
1234
assert(lc_time_names_number <= 0xFFFF);
1235
*start++= Q_LC_TIME_NAMES_CODE;
1236
int2store(start, lc_time_names_number);
1239
if (charset_database_number)
1241
assert(charset_database_number <= 0xFFFF);
1242
*start++= Q_CHARSET_DATABASE_CODE;
1243
int2store(start, charset_database_number);
1247
Here there could be code like
1248
if (command-line-option-which-says-"log_this_variable" && inited)
1250
*start++= Q_THIS_VARIABLE_CODE;
1251
int4store(start, this_variable);
1256
/* Store length of status variables */
1257
status_vars_len= (uint) (start-start_of_status);
1258
assert(status_vars_len <= MAX_SIZE_LOG_EVENT_STATUS);
1259
int2store(buf + Q_STATUS_VARS_LEN_OFFSET, status_vars_len);
1262
Calculate length of whole event
1263
The "1" below is the \0 in the db's length
1265
event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
1267
return (write_header(file, event_length) ||
1268
my_b_safe_write(file, (unsigned char*) buf, QUERY_HEADER_LEN) ||
1269
write_post_header_for_derived(file) ||
1270
my_b_safe_write(file, (unsigned char*) start_of_status,
1271
(uint) (start-start_of_status)) ||
1272
my_b_safe_write(file, (db) ? (unsigned char*) db : (unsigned char*)"", db_len + 1) ||
1273
my_b_safe_write(file, (unsigned char*) query, q_len)) ? 1 : 0;
1277
The simplest constructor that could possibly work. This is used for
1278
creating static objects that have a special meaning and are invisible
1281
Query_log_event::Query_log_event()
1282
:Log_event(), data_buf(0)
1289
Query_log_event::Query_log_event()
1290
session_arg - thread handle
1291
query_arg - array of char representing the query
1292
query_length - size of the `query_arg' array
1293
using_trans - there is a modified transactional table
1294
suppress_use - suppress the generation of 'USE' statements
1295
killed_status_arg - an optional with default to Session::KILLED_NO_VALUE
1296
if the value is different from the default, the arg
1297
is set to the current session->killed value.
1298
A caller might need to masquerade session->killed with
1299
Session::NOT_KILLED.
1301
Creates an event for binlogging
1302
The value for local `killed_status' can be supplied by caller.
1304
Query_log_event::Query_log_event(Session* session_arg, const char* query_arg,
1305
ulong query_length, bool using_trans,
1307
Session::killed_state killed_status_arg)
1308
:Log_event(session_arg,
1309
(session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
1311
(suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1313
data_buf(0), query(query_arg), catalog(session_arg->catalog),
1314
db(session_arg->db), q_len((uint32_t) query_length),
1315
thread_id(session_arg->thread_id),
1316
/* save the original thread id; we already know the server id */
1317
slave_proxy_id(session_arg->variables.pseudo_thread_id),
1318
flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1320
auto_increment_increment(session_arg->variables.auto_increment_increment),
1321
auto_increment_offset(session_arg->variables.auto_increment_offset),
1322
lc_time_names_number(session_arg->variables.lc_time_names->number),
1323
charset_database_number(0)
1327
if (killed_status_arg == Session::KILLED_NO_VALUE)
1328
killed_status_arg= session_arg->killed;
1331
(killed_status_arg == Session::NOT_KILLED) ?
1332
(session_arg->is_error() ? session_arg->main_da.sql_errno() : 0) :
1333
(session_arg->killed_errno());
1336
exec_time = (ulong) (end_time - session_arg->start_time);
1338
@todo this means that if we have no catalog, then it is replicated
1339
as an existing catalog of length zero. is that safe? /sven
1341
catalog_len = (catalog) ? (uint32_t) strlen(catalog) : 0;
1342
/* status_vars_len is set just before writing the event */
1343
db_len = (db) ? (uint32_t) strlen(db) : 0;
1344
if (session_arg->variables.collation_database != session_arg->db_charset)
1345
charset_database_number= session_arg->variables.collation_database->number;
1348
If we don't use flags2 for anything else than options contained in
1349
session_arg->options, it would be more efficient to flags2=session_arg->options
1350
(OPTIONS_WRITTEN_TO_BIN_LOG would be used only at reading time).
1351
But it's likely that we don't want to use 32 bits for 3 bits; in the future
1352
we will probably want to reclaim the 29 bits. So we need the &.
1354
flags2= (uint32_t) (session_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
1355
assert(session_arg->variables.character_set_client->number < 256*256);
1356
assert(session_arg->variables.collation_connection->number < 256*256);
1357
assert(session_arg->variables.collation_server->number < 256*256);
1358
assert(session_arg->variables.character_set_client->mbminlen == 1);
1359
int2store(charset, session_arg->variables.character_set_client->number);
1360
int2store(charset+2, session_arg->variables.collation_connection->number);
1361
int2store(charset+4, session_arg->variables.collation_server->number);
1362
if (session_arg->time_zone_used)
1365
Note that our event becomes dependent on the Time_zone object
1366
representing the time zone. Fortunately such objects are never deleted
1367
or changed during mysqld's lifetime.
1369
time_zone_len= session_arg->variables.time_zone->get_name()->length();
1370
time_zone_str= session_arg->variables.time_zone->get_name()->ptr();
1377
/* 2 utility functions for the next method */
1380
Read a string with length from memory.
1382
This function reads the string-with-length stored at
1383
<code>src</code> and extract the length into <code>*len</code> and
1384
a pointer to the start of the string into <code>*dst</code>. The
1385
string can then be copied using <code>memcpy()</code> with the
1386
number of bytes given in <code>*len</code>.
1388
@param src Pointer to variable holding a pointer to the memory to
1389
read the string from.
1390
@param dst Pointer to variable holding a pointer where the actual
1391
string starts. Starting from this position, the string
1392
can be copied using @c memcpy().
1393
@param len Pointer to variable where the length will be stored.
1394
@param end One-past-the-end of the memory where the string is
1397
@return Zero if the entire string can be copied successfully,
1398
@c UINT_MAX if the length could not be read from memory
1399
(that is, if <code>*src >= end</code>), otherwise the
1400
number of bytes that are missing to read the full
1401
string, which happends <code>*dst + *len >= end</code>.
1404
get_str_len_and_pointer(const Log_event::Byte **src,
1407
const Log_event::Byte *end)
1410
return -1; // Will be UINT_MAX in two-complement arithmetics
1411
uint32_t length= **src;
1414
if (*src + length >= end)
1415
return *src + length - end + 1; // Number of bytes missing
1416
*dst= (char *)*src + 1; // Will be copied later
1423
static void copy_str_and_move(const char **src,
1424
Log_event::Byte **dst,
1427
memcpy(*dst, *src, len);
1428
*src= (const char *)*dst;
1435
Macro to check that there is enough space to read from memory.
1437
@param PTR Pointer to memory
1438
@param END End of memory
1439
@param CNT Number of bytes that should be read.
1441
#define CHECK_SPACE(PTR,END,CNT) \
1443
assert((PTR) + (CNT) <= (END)); \
1444
if ((PTR) + (CNT) > (END)) { \
1452
This is used by the SQL slave thread to prepare the event before execution.
1454
Query_log_event::Query_log_event(const char* buf, uint32_t event_len,
1455
const Format_description_log_event
1457
Log_event_type event_type)
1458
:Log_event(buf, description_event), data_buf(0), query(NULL),
1459
db(NULL), catalog_len(0), status_vars_len(0),
1460
flags2_inited(0), sql_mode_inited(0), charset_inited(0),
1461
auto_increment_increment(1), auto_increment_offset(1),
1462
time_zone_len(0), lc_time_names_number(0), charset_database_number(0)
1466
uint8_t common_header_len, post_header_len;
1467
Log_event::Byte *start;
1468
const Log_event::Byte *end;
1471
common_header_len= description_event->common_header_len;
1472
post_header_len= description_event->post_header_len[event_type-1];
1475
We test if the event's length is sensible, and if so we compute data_len.
1476
We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
1477
We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
1479
if (event_len < (uint)(common_header_len + post_header_len))
1481
data_len = event_len - (common_header_len + post_header_len);
1482
buf+= common_header_len;
1484
slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
1485
exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
1486
db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
1487
error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
1490
5.0 format starts here.
1491
Depending on the format, we may or not have affected/warnings etc
1492
The remnent post-header to be parsed has length:
1494
tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
1497
status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
1499
Check if status variable length is corrupt and will lead to very
1500
wrong data. We could be even more strict and require data_len to
1501
be even bigger, but this will suffice to catch most corruption
1502
errors that can lead to a crash.
1504
if (status_vars_len > cmin(data_len, (uint32_t)MAX_SIZE_LOG_EVENT_STATUS))
1509
data_len-= status_vars_len;
1513
We have parsed everything we know in the post header for QUERY_EVENT,
1514
the rest of post header is either comes from older version MySQL or
1515
dedicated to derived events (e.g. Execute_load_query...)
1518
/* variable-part: the status vars; only in MySQL 5.0 */
1520
start= (Log_event::Byte*) (buf+post_header_len);
1521
end= (const Log_event::Byte*) (start+status_vars_len);
1522
for (const Log_event::Byte* pos= start; pos < end;)
1526
CHECK_SPACE(pos, end, 4);
1528
flags2= uint4korr(pos);
1531
case Q_SQL_MODE_CODE:
1533
CHECK_SPACE(pos, end, 8);
1535
sql_mode= (ulong) uint8korr(pos); // QQ: Fix when sql_mode is uint64_t
1539
case Q_CATALOG_NZ_CODE:
1540
if (get_str_len_and_pointer(&pos, &catalog, &catalog_len, end))
1546
case Q_AUTO_INCREMENT:
1547
CHECK_SPACE(pos, end, 4);
1548
auto_increment_increment= uint2korr(pos);
1549
auto_increment_offset= uint2korr(pos+2);
1552
case Q_TIME_ZONE_CODE:
1554
if (get_str_len_and_pointer(&pos, &time_zone_str, &time_zone_len, end))
1561
case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
1562
CHECK_SPACE(pos, end, 1);
1563
if ((catalog_len= *pos))
1564
catalog= (char*) pos+1; // Will be copied later
1565
CHECK_SPACE(pos, end, catalog_len + 2);
1566
pos+= catalog_len+2; // leap over end 0
1567
catalog_nz= 0; // catalog has end 0 in event
1569
case Q_LC_TIME_NAMES_CODE:
1570
CHECK_SPACE(pos, end, 2);
1571
lc_time_names_number= uint2korr(pos);
1574
case Q_CHARSET_DATABASE_CODE:
1575
CHECK_SPACE(pos, end, 2);
1576
charset_database_number= uint2korr(pos);
1580
/* That's why you must write status vars in growing order of code */
1581
pos= (const unsigned char*) end; // Break loop
1585
if (!(start= data_buf = (Log_event::Byte*) my_malloc(catalog_len + 1 +
1590
if (catalog_len) // If catalog is given
1593
@todo we should clean up and do only copy_str_and_move; it
1594
works for both cases. Then we can remove the catalog_nz
1597
if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
1598
copy_str_and_move(&catalog, &start, catalog_len);
1601
memcpy(start, catalog, catalog_len+1); // copy end 0
1602
catalog= (const char *)start;
1603
start+= catalog_len+1;
1607
copy_str_and_move(&time_zone_str, &start, time_zone_len);
1610
if time_zone_len or catalog_len are 0, then time_zone and catalog
1611
are uninitialized at this point. shouldn't they point to the
1612
zero-length null-terminated strings we allocated space for in the
1613
my_alloc call above? /sven
1616
/* A 2nd variable part; this is common to all versions */
1617
memcpy(start, end, data_len); // Copy db and query
1618
start[data_len]= '\0'; // End query with \0 (For safetly)
1620
query= (char *)(start + db_len + 1);
1621
q_len= data_len - db_len -1;
1627
Query_log_event::do_apply_event()
1629
int Query_log_event::do_apply_event(Relay_log_info const *rli)
1631
return do_apply_event(rli, query, q_len);
1637
Compare the values of "affected rows" around here. Something
1640
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1642
sql_print_error("Slave: did not get the expected number of affected \
1643
rows running query from master - expected %d, got %d (this numbers \
1644
should have matched modulo 4294967296).", 0, ...);
1645
session->query_error = 1;
1648
We may also want an option to tell the slave to ignore "affected"
1649
mismatch. This mismatch could be implemented with a new ER_ code, and
1650
to ignore it you would use --slave-skip-errors...
1652
int Query_log_event::do_apply_event(Relay_log_info const *rli,
1653
const char *query_arg, uint32_t q_len_arg)
1656
int expected_error,actual_error= 0;
1657
Query_id &query_id= Query_id::get_query_id();
1659
Colleagues: please never free(session->catalog) in MySQL. This would
1660
lead to bugs as here session->catalog is a part of an alloced block,
1661
not an entire alloced block (see
1662
Query_log_event::do_apply_event()). Same for session->db. Thank
1665
session->catalog= catalog_len ? (char *) catalog : (char *)"";
1666
new_db.length= db_len;
1667
new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
1668
session->set_db(new_db.str, new_db.length); /* allocates a copy of 'db' */
1669
session->variables.auto_increment_increment= auto_increment_increment;
1670
session->variables.auto_increment_offset= auto_increment_offset;
1673
InnoDB internally stores the master log position it has executed so far,
1674
i.e. the position just after the COMMIT event.
1675
When InnoDB will want to store, the positions in rli won't have
1676
been updated yet, so group_master_log_* will point to old BEGIN
1677
and event_master_log* will point to the beginning of current COMMIT.
1678
But log_pos of the COMMIT Query event is what we want, i.e. the pos of the
1679
END of the current log event (COMMIT). We save it in rli so that InnoDB can
1682
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
1684
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
1685
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
1688
Note: We do not need to execute reset_one_shot_variables() if this
1690
Reason: The db stored in binlog events is the same for SET and for
1691
its companion query. If the SET is ignored because of
1692
db_ok(), the companion query will also be ignored, and if
1693
the companion query is ignored in the db_ok() test of
1694
::do_apply_event(), then the companion SET also have so
1695
we don't need to reset_one_shot_variables().
1697
if (rpl_filter->db_ok(session->db))
1699
session->set_time((time_t)when);
1700
session->query_length= q_len_arg;
1701
session->query= (char*)query_arg;
1702
session->query_id= query_id.next();
1703
session->variables.pseudo_thread_id= thread_id; // for temp tables
1705
if (ignored_error_code((expected_error= error_code)) ||
1706
!check_expected_error(session,rli,expected_error))
1710
all bits of session->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
1711
must take their value from flags2.
1713
session->options= flags2|(session->options & ~OPTIONS_WRITTEN_TO_BIN_LOG);
1716
String tmp(time_zone_str, time_zone_len, &my_charset_bin);
1717
if (!(session->variables.time_zone= my_tz_find(session, &tmp)))
1719
my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr());
1720
session->variables.time_zone= global_system_variables.time_zone;
1721
goto compare_errors;
1724
if (lc_time_names_number)
1726
if (!(session->variables.lc_time_names=
1727
my_locale_by_number(lc_time_names_number)))
1729
my_printf_error(ER_UNKNOWN_ERROR,
1730
"Unknown locale: '%d'", MYF(0), lc_time_names_number);
1731
session->variables.lc_time_names= &my_locale_en_US;
1732
goto compare_errors;
1736
session->variables.lc_time_names= &my_locale_en_US;
1737
if (charset_database_number)
1739
const CHARSET_INFO *cs;
1740
if (!(cs= get_charset(charset_database_number, MYF(0))))
1743
int10_to_str((int) charset_database_number, buf, -10);
1744
my_error(ER_UNKNOWN_COLLATION, MYF(0), buf);
1745
goto compare_errors;
1747
session->variables.collation_database= cs;
1750
session->variables.collation_database= session->db_charset;
1752
/* Execute the query (note that we bypass dispatch_command()) */
1753
const char* found_semicolon= NULL;
1754
mysql_parse(session, session->query, session->query_length, &found_semicolon);
1755
log_slow_statement(session);
1760
The query got a really bad error on the master (thread killed etc),
1761
which could be inconsistent. Parse it to test the table names: if the
1762
replicate-*-do|ignore-table rules say "this query must be ignored" then
1763
we exit gracefully; otherwise we warn about the bad error and tell DBA
1766
if (mysql_test_parse_for_slave(session, session->query, session->query_length))
1767
clear_all_errors(session, const_cast<Relay_log_info*>(rli)); /* Can ignore query */
1770
rli->report(ERROR_LEVEL, expected_error,
1771
_("Query partially completed on the master "
1772
"(error on master: %d) and was aborted. There is a "
1773
"chance that your master is inconsistent at this "
1774
"point. If you are sure that your master is ok, run "
1775
"this query manually on the slave and then restart the "
1776
"slave with SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1; "
1777
"START SLAVE; . Query: '%s'"),
1778
expected_error, session->query);
1779
session->is_slave_error= 1;
1787
If we expected a non-zero error code, and we don't get the same error
1788
code, and none of them should be ignored.
1790
actual_error= session->is_error() ? session->main_da.sql_errno() : 0;
1791
if ((expected_error != actual_error) &&
1793
!ignored_error_code(actual_error) &&
1794
!ignored_error_code(expected_error))
1796
rli->report(ERROR_LEVEL, 0,
1797
_("Query caused differenxt errors on master and slave.\n"
1798
"Error on master: '%s' (%d), Error on slave: '%s' (%d).\n"
1799
"Default database: '%s'. Query: '%s'"),
1802
actual_error ? session->main_da.message() : _("no error"),
1804
print_slave_db_safe(db), query_arg);
1805
session->is_slave_error= 1;
1808
If we get the same error code as expected, or they should be ignored.
1810
else if (expected_error == actual_error ||
1811
ignored_error_code(actual_error))
1813
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
1814
session->killed= Session::NOT_KILLED;
1817
Other cases: mostly we expected no error and get one.
1819
else if (session->is_slave_error || session->is_fatal_error)
1821
rli->report(ERROR_LEVEL, actual_error,
1822
_("Error '%s' on query. Default database: '%s'. Query: '%s'"),
1823
(actual_error ? session->main_da.message() :
1824
_("unexpected success or fatal error")),
1825
print_slave_db_safe(session->db), query_arg);
1826
session->is_slave_error= 1;
1830
TODO: compare the values of "affected rows" around here. Something
1832
if ((uint32_t) affected_in_event != (uint32_t) affected_on_slave)
1834
sql_print_error("Slave: did not get the expected number of affected \
1835
rows running query from master - expected %d, got %d (this numbers \
1836
should have matched modulo 4294967296).", 0, ...);
1837
session->is_slave_error = 1;
1839
We may also want an option to tell the slave to ignore "affected"
1840
mismatch. This mismatch could be implemented with a new ER_ code, and
1841
to ignore it you would use --slave-skip-errors...
1843
To do the comparison we need to know the value of "affected" which the
1844
above mysql_parse() computed. And we need to know the value of
1845
"affected" in the master's binlog. Both will be implemented later. The
1846
important thing is that we now have the format ready to log the values
1847
of "affected" in the binlog. So we can release 5.0.0 before effectively
1848
logging "affected" and effectively comparing it.
1850
} /* End of if (db_ok(... */
1853
pthread_mutex_lock(&LOCK_thread_count);
1855
Probably we have set session->query, session->db, session->catalog to point to places
1856
in the data_buf of this event. Now the event is going to be deleted
1857
probably, so data_buf will be freed, so the session->... listed above will be
1858
pointers to freed memory.
1859
So we must set them to 0, so that those bad pointers values are not later
1860
used. Note that "cleanup" queries like automatic DROP TEMPORARY Table
1861
don't suffer from these assignments to 0 as DROP TEMPORARY
1862
Table uses the db.table syntax.
1864
session->catalog= 0;
1865
session->set_db(NULL, 0); /* will free the current database */
1866
session->query= 0; // just to be sure
1867
session->query_length= 0;
1868
pthread_mutex_unlock(&LOCK_thread_count);
1869
close_thread_tables(session);
1871
As a disk space optimization, future masters will not log an event for
1872
LAST_INSERT_ID() if that function returned 0 (and thus they will be able
1873
to replace the Session::stmt_depends_on_first_successful_insert_id_in_prev_stmt
1874
variable by (Session->first_successful_insert_id_in_prev_stmt > 0) ; with the
1875
resetting below we are ready to support that.
1877
session->first_successful_insert_id_in_prev_stmt_for_binlog= 0;
1878
session->first_successful_insert_id_in_prev_stmt= 0;
1879
session->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
1880
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
1881
return session->is_slave_error;
1884
int Query_log_event::do_update_pos(Relay_log_info *rli)
1887
Note that we will not increment group* positions if we are just
1888
after a SET ONE_SHOT, because SET ONE_SHOT should not be separated
1889
from its following updating query.
1891
if (session->one_shot_set)
1893
rli->inc_event_relay_log_pos();
1897
return Log_event::do_update_pos(rli);
1901
Log_event::enum_skip_reason
1902
Query_log_event::do_shall_skip(Relay_log_info *rli)
1904
assert(query && q_len > 0);
1906
if (rli->slave_skip_counter > 0)
1908
if (strcmp("BEGIN", query) == 0)
1910
session->options|= OPTION_BEGIN;
1911
return(Log_event::continue_group(rli));
1914
if (strcmp("COMMIT", query) == 0 || strcmp("ROLLBACK", query) == 0)
1916
session->options&= ~OPTION_BEGIN;
1917
return(Log_event::EVENT_SKIP_COUNT);
1920
return(Log_event::do_shall_skip(rli));
1924
/**************************************************************************
1925
Start_log_event_v3 methods
1926
**************************************************************************/
1928
Start_log_event_v3::Start_log_event_v3()
1929
:Log_event(), created(0), binlog_version(BINLOG_VERSION),
1930
artificial_event(0), dont_set_created(0)
1932
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
1936
Start_log_event_v3::pack_info()
1939
void Start_log_event_v3::pack_info(Protocol *protocol)
1941
char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos;
1942
pos= my_stpcpy(buf, "Server ver: ");
1943
pos= my_stpcpy(pos, server_version);
1944
pos= my_stpcpy(pos, ", Binlog ver: ");
1945
pos= int10_to_str(binlog_version, pos, 10);
1946
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
1951
Start_log_event_v3::Start_log_event_v3()
1954
Start_log_event_v3::Start_log_event_v3(const char* buf,
1955
const Format_description_log_event
1957
:Log_event(buf, description_event)
1959
buf+= description_event->common_header_len;
1960
binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET);
1961
memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
1963
// prevent overrun if log is corrupted on disk
1964
server_version[ST_SERVER_VER_LEN-1]= 0;
1965
created= uint4korr(buf+ST_CREATED_OFFSET);
1966
/* We use log_pos to mark if this was an artificial event or not */
1967
artificial_event= (log_pos == 0);
1968
dont_set_created= 1;
1973
Start_log_event_v3::write()
1976
bool Start_log_event_v3::write(IO_CACHE* file)
1978
char buff[START_V3_HEADER_LEN];
1979
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
1980
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
1981
if (!dont_set_created)
1982
created= when= get_time();
1983
int4store(buff + ST_CREATED_OFFSET,created);
1984
return (write_header(file, sizeof(buff)) ||
1985
my_b_safe_write(file, (unsigned char*) buff, sizeof(buff)));
1990
Start_log_event_v3::do_apply_event() .
1994
- To handle the case where the master died without having time to write
1995
DROP TEMPORARY Table, DO RELEASE_LOCK (prepared statements' deletion is
1996
TODO), we clean up all temporary tables that we got, if we are sure we
2000
- Remove all active user locks.
2001
Guilhem 2003-06: this is true but not urgent: the worst it can cause is
2002
the use of a bit of memory for a user lock which will not be used
2003
anymore. If the user lock is later used, the old one will be released. In
2004
other words, no deadlock problem.
2007
int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
2009
switch (binlog_version)
2014
This can either be 4.x (then a Start_log_event_v3 is only at master
2015
startup so we are sure the master has restarted and cleared his temp
2016
tables; the event always has 'created'>0) or 5.0 (then we have to test
2021
close_temporary_tables(session);
2022
cleanup_load_tmpdir();
2027
Now the older formats; in that case load_tmpdir is cleaned up by the I/O
2031
if (strncmp(rli->relay_log.description_event_for_exec->server_version,
2032
"3.23.57",7) >= 0 && created)
2035
Can distinguish, based on the value of 'created': this event was
2036
generated at master startup.
2038
close_temporary_tables(session);
2041
Otherwise, can't distinguish a Start_log_event generated at
2042
master startup and one generated by master FLUSH LOGS, so cannot
2043
be sure temp tables have to be dropped. So do nothing.
2047
/* this case is impossible */
2053
/***************************************************************************
2054
Format_description_log_event methods
2055
****************************************************************************/
2058
Format_description_log_event 1st ctor.
2060
Ctor. Can be used to create the event to write to the binary log (when the
2061
server starts or when FLUSH LOGS), or to create artificial events to parse
2062
binlogs from MySQL 3.23 or 4.x.
2063
When in a client, only the 2nd use is possible.
2065
@param binlog_version the binlog version for which we want to build
2066
an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x
2067
x>=2 and 4.1) or 4 (MySQL 5.0). Note that the
2068
old 4.0 (binlog version 2) is not supported;
2069
it should not be used for replication with
2073
Format_description_log_event::
2074
Format_description_log_event(uint8_t binlog_ver, const char* server_ver)
2075
:Start_log_event_v3(), event_type_permutation(0)
2077
binlog_version= binlog_ver;
2078
switch (binlog_ver) {
2079
case 4: /* MySQL 5.0 */
2080
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
2081
common_header_len= LOG_EVENT_HEADER_LEN;
2082
number_of_event_types= LOG_EVENT_TYPES;
2083
/* we'll catch my_malloc() error in is_valid() */
2084
post_header_len=(uint8_t*) my_malloc(number_of_event_types*sizeof(uint8_t),
2087
This long list of assignments is not beautiful, but I see no way to
2088
make it nicer, as the right members are #defines, not array members, so
2089
it's impossible to write a loop.
2091
if (post_header_len)
2093
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2094
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN;
2095
post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
2096
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2097
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2098
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2099
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2100
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2101
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
2102
post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
2103
post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN;
2104
post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2105
post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2106
post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
2107
post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1];
2108
post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
2109
post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
2110
post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
2115
case 3: /* 4.0.x x>=2 */
2117
We build an artificial (i.e. not sent by the master) event, which
2118
describes what those old master versions send.
2121
my_stpcpy(server_version, server_ver ? server_ver : "3.23");
2123
my_stpcpy(server_version, server_ver ? server_ver : "4.0");
2124
common_header_len= binlog_ver==1 ? OLD_HEADER_LEN :
2125
LOG_EVENT_MINIMAL_HEADER_LEN;
2127
The first new event in binlog version 4 is Format_desc. So any event type
2128
after that does not exist in older versions. We use the events known by
2129
version 3, even if version 1 had only a subset of them (this is not a
2130
problem: it uses a few bytes for nothing but unifies code; it does not
2131
make the slave detect less corruptions).
2133
number_of_event_types= FORMAT_DESCRIPTION_EVENT - 1;
2134
post_header_len=(uint8_t*) my_malloc(number_of_event_types*sizeof(uint8_t),
2136
if (post_header_len)
2138
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2139
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_MINIMAL_LEN;
2140
post_header_len[STOP_EVENT-1]= 0;
2141
post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN;
2142
post_header_len[INTVAR_EVENT-1]= 0;
2143
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2144
post_header_len[SLAVE_EVENT-1]= 0;
2145
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2146
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2147
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2148
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2149
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
2150
post_header_len[RAND_EVENT-1]= 0;
2151
post_header_len[USER_VAR_EVENT-1]= 0;
2154
default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
2155
post_header_len= 0; /* will make is_valid() fail */
2158
calc_server_version_split();
2163
The problem with this constructor is that the fixed header may have a
2164
length different from this version, but we don't know this length as we
2165
have not read the Format_description_log_event which says it, yet. This
2166
length is in the post-header of the event, but we don't know where the
2169
So this type of event HAS to:
2170
- either have the header's length at the beginning (in the header, at a
2171
fixed position which will never be changed), not in the post-header. That
2172
would make the header be "shifted" compared to other events.
2173
- or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future
2174
versions, so that we know for sure.
2176
I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because
2177
it is sent before Format_description_log_event).
2180
Format_description_log_event::
2181
Format_description_log_event(const char* buf,
2184
Format_description_log_event*
2186
:Start_log_event_v3(buf, description_event), event_type_permutation(0)
2188
buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
2189
if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
2190
return; /* sanity check */
2191
number_of_event_types=
2192
event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1);
2193
/* If alloc fails, we'll detect it in is_valid() */
2194
post_header_len= (uint8_t*) my_memdup((unsigned char*)buf+ST_COMMON_HEADER_LEN_OFFSET+1,
2195
number_of_event_types*
2196
sizeof(*post_header_len), MYF(0));
2197
calc_server_version_split();
2200
In some previous versions, the events were given other event type
2201
id numbers than in the present version. When replicating from such
2202
a version, we therefore set up an array that maps those id numbers
2203
to the id numbers of the present server.
2205
If post_header_len is null, it means malloc failed, and is_valid
2206
will fail, so there is no need to do anything.
2208
The trees in which events have wrong id's are:
2210
mysql-5.1-wl1012.old mysql-5.1-wl2325-5.0-drop6p13-alpha
2211
mysql-5.1-wl2325-5.0-drop6 mysql-5.1-wl2325-5.0
2212
mysql-5.1-wl2325-no-dd
2214
(this was found by grepping for two lines in sequence where the
2215
first matches "FORMAT_DESCRIPTION_EVENT," and the second matches
2216
"TABLE_MAP_EVENT," in log_event.h in all trees)
2218
In these trees, the following server_versions existed since
2219
TABLE_MAP_EVENT was introduced:
2221
5.1.1-a_drop5p3 5.1.1-a_drop5p4 5.1.1-alpha
2222
5.1.2-a_drop5p10 5.1.2-a_drop5p11 5.1.2-a_drop5p12
2223
5.1.2-a_drop5p13 5.1.2-a_drop5p14 5.1.2-a_drop5p15
2224
5.1.2-a_drop5p16 5.1.2-a_drop5p16b 5.1.2-a_drop5p16c
2225
5.1.2-a_drop5p17 5.1.2-a_drop5p4 5.1.2-a_drop5p5
2226
5.1.2-a_drop5p6 5.1.2-a_drop5p7 5.1.2-a_drop5p8
2227
5.1.2-a_drop5p9 5.1.3-a_drop5p17 5.1.3-a_drop5p17b
2228
5.1.3-a_drop5p17c 5.1.4-a_drop5p18 5.1.4-a_drop5p19
2229
5.1.4-a_drop5p20 5.1.4-a_drop6p0 5.1.4-a_drop6p1
2230
5.1.4-a_drop6p2 5.1.5-a_drop5p20 5.2.0-a_drop6p3
2231
5.2.0-a_drop6p4 5.2.0-a_drop6p5 5.2.0-a_drop6p6
2232
5.2.1-a_drop6p10 5.2.1-a_drop6p11 5.2.1-a_drop6p12
2233
5.2.1-a_drop6p6 5.2.1-a_drop6p7 5.2.1-a_drop6p8
2234
5.2.2-a_drop6p13 5.2.2-a_drop6p13-alpha 5.2.2-a_drop6p13b
2237
(this was found by grepping for "mysql," in all historical
2238
versions of configure.in in the trees listed above).
2240
There are 5.1.1-alpha versions that use the new event id's, so we
2241
do not test that version string. So replication from 5.1.1-alpha
2242
with the other event id's to a new version does not work.
2243
Moreover, we can safely ignore the part after drop[56]. This
2244
allows us to simplify the big list above to the following regexes:
2246
5\.1\.[1-5]-a_drop5.*
2248
5\.2\.[0-2]-a_drop6.*
2250
This is what we test for in the 'if' below.
2252
if (post_header_len &&
2253
server_version[0] == '5' && server_version[1] == '.' &&
2254
server_version[3] == '.' &&
2255
strncmp(server_version + 5, "-a_drop", 7) == 0 &&
2256
((server_version[2] == '1' &&
2257
server_version[4] >= '1' && server_version[4] <= '5' &&
2258
server_version[12] == '5') ||
2259
(server_version[2] == '1' &&
2260
server_version[4] == '4' &&
2261
server_version[12] == '6') ||
2262
(server_version[2] == '2' &&
2263
server_version[4] >= '0' && server_version[4] <= '2' &&
2264
server_version[12] == '6')))
2266
if (number_of_event_types != 22)
2268
/* this makes is_valid() return false. */
2269
free(post_header_len);
2270
post_header_len= NULL;
2273
static const uint8_t perm[23]=
2275
UNKNOWN_EVENT, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
2276
INTVAR_EVENT, LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
2277
APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
2279
RAND_EVENT, USER_VAR_EVENT,
2280
FORMAT_DESCRIPTION_EVENT,
2282
PRE_GA_WRITE_ROWS_EVENT,
2283
PRE_GA_UPDATE_ROWS_EVENT,
2284
PRE_GA_DELETE_ROWS_EVENT,
2286
BEGIN_LOAD_QUERY_EVENT,
2287
EXECUTE_LOAD_QUERY_EVENT,
2289
event_type_permutation= perm;
2291
Since we use (permuted) event id's to index the post_header_len
2292
array, we need to permute the post_header_len array too.
2294
uint8_t post_header_len_temp[23];
2295
for (int i= 1; i < 23; i++)
2296
post_header_len_temp[perm[i] - 1]= post_header_len[i - 1];
2297
for (int i= 0; i < 22; i++)
2298
post_header_len[i] = post_header_len_temp[i];
2303
bool Format_description_log_event::write(IO_CACHE* file)
2306
We don't call Start_log_event_v3::write() because this would make 2
2309
unsigned char buff[FORMAT_DESCRIPTION_HEADER_LEN];
2310
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
2311
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
2312
if (!dont_set_created)
2313
created= when= get_time();
2314
int4store(buff + ST_CREATED_OFFSET,created);
2315
buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
2316
memcpy(buff+ST_COMMON_HEADER_LEN_OFFSET+1, post_header_len,
2318
return (write_header(file, sizeof(buff)) ||
2319
my_b_safe_write(file, buff, sizeof(buff)));
2323
int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
2326
As a transaction NEVER spans on 2 or more binlogs:
2327
if we have an active transaction at this point, the master died
2328
while writing the transaction to the binary log, i.e. while
2329
flushing the binlog cache to the binlog. XA guarantees that master has
2330
rolled back. So we roll back.
2331
Note: this event could be sent by the master to inform us of the
2332
format of its binlog; in other words maybe it is not at its
2333
original place when it comes to us; we'll know this by checking
2334
log_pos ("artificial" events have log_pos == 0).
2336
if (!artificial_event && created && session->transaction.all.ha_list)
2338
/* This is not an error (XA is safe), just an information */
2339
rli->report(INFORMATION_LEVEL, 0,
2340
_("Rolling back unfinished transaction (no COMMIT "
2341
"or ROLLBACK in relay log). A probable cause is that "
2342
"the master died while writing the transaction to "
2343
"its binary log, thus rolled back too."));
2344
const_cast<Relay_log_info*>(rli)->cleanup_context(session, 1);
2347
If this event comes from ourselves, there is no cleaning task to
2348
perform, we don't call Start_log_event_v3::do_apply_event()
2349
(this was just to update the log's description event).
2351
if (server_id != ::server_id)
2354
If the event was not requested by the slave i.e. the master sent
2355
it while the slave asked for a position >4, the event will make
2356
rli->group_master_log_pos advance. Say that the slave asked for
2357
position 1000, and the Format_desc event's end is 96. Then in
2358
the beginning of replication rli->group_master_log_pos will be
2359
0, then 96, then jump to first really asked event (which is
2360
>96). So this is ok.
2362
return(Start_log_event_v3::do_apply_event(rli));
2367
int Format_description_log_event::do_update_pos(Relay_log_info *rli)
2369
/* save the information describing this binlog */
2370
delete rli->relay_log.description_event_for_exec;
2371
rli->relay_log.description_event_for_exec= this;
2373
if (server_id == ::server_id)
2376
We only increase the relay log position if we are skipping
2377
events and do not touch any group_* variables, nor flush the
2378
relay log info. If there is a crash, we will have to re-skip
2379
the events again, but that is a minor issue.
2381
If we do not skip stepping the group log position (and the
2382
server id was changed when restarting the server), it might well
2383
be that we start executing at a position that is invalid, e.g.,
2384
at a Rows_log_event or a Query_log_event preceeded by a
2385
Intvar_log_event instead of starting at a Table_map_log_event or
2386
the Intvar_log_event respectively.
2388
rli->inc_event_relay_log_pos();
2393
return Log_event::do_update_pos(rli);
2397
Log_event::enum_skip_reason
2398
Format_description_log_event::do_shall_skip(Relay_log_info *rli __attribute__((unused)))
2400
return Log_event::EVENT_SKIP_NOT;
2405
Splits the event's 'server_version' string into three numeric pieces stored
2406
into 'server_version_split':
2407
X.Y.Zabc (X,Y,Z numbers, a not a digit) -> {X,Y,Z}
2410
'server_version_split' is then used for lookups to find if the server which
2411
created this event has some known bug.
2413
void Format_description_log_event::calc_server_version_split()
2415
char *p= server_version, *r;
2417
for (uint32_t i= 0; i<=2; i++)
2419
number= strtoul(p, &r, 10);
2420
server_version_split[i]= (unsigned char)number;
2421
assert(number < 256); // fit in unsigned char
2423
assert(!((i == 0) && (*r != '.'))); // should be true in practice
2425
p++; // skip the dot
2430
/**************************************************************************
2431
Load_log_event methods
2432
General note about Load_log_event: the binlogging of LOAD DATA INFILE is
2433
going to be changed in 5.0 (or maybe in 5.1; not decided yet).
2434
However, the 5.0 slave could still have to read such events (from a 4.x
2435
master), convert them (which just means maybe expand the header, when 5.0
2436
servers have a UID in events) (remember that whatever is after the header
2437
will be like in 4.x, as this event's format is not modified in 5.0 as we
2438
will use new types of events to log the new LOAD DATA INFILE features).
2439
To be able to read/convert, we just need to not assume that the common
2440
header is of length LOG_EVENT_HEADER_LEN (we must use the description
2442
Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE
2443
between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the
2444
positions displayed in SHOW SLAVE STATUS then are fine too).
2445
**************************************************************************/
2448
Load_log_event::pack_info()
2451
uint32_t Load_log_event::get_query_buffer_length()
2454
5 + db_len + 3 + // "use DB; "
2455
18 + fname_len + 2 + // "LOAD DATA INFILE 'file''"
2457
9 + // " REPLACE or IGNORE "
2458
13 + table_name_len*2 + // "INTO Table `table`"
2459
21 + sql_ex.field_term_len*4 + 2 + // " FIELDS TERMINATED BY 'str'"
2460
23 + sql_ex.enclosed_len*4 + 2 + // " OPTIONALLY ENCLOSED BY 'str'"
2461
12 + sql_ex.escaped_len*4 + 2 + // " ESCAPED BY 'str'"
2462
21 + sql_ex.line_term_len*4 + 2 + // " LINES TERMINATED BY 'str'"
2463
19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'"
2464
15 + 22 + // " IGNORE xxx LINES"
2465
3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)"
2469
void Load_log_event::print_query(bool need_db, char *buf,
2470
char **end, char **fn_start, char **fn_end)
2474
if (need_db && db && db_len)
2476
pos= my_stpcpy(pos, "use `");
2477
memcpy(pos, db, db_len);
2478
pos= my_stpcpy(pos+db_len, "`; ");
2481
pos= my_stpcpy(pos, "LOAD DATA ");
2486
if (check_fname_outside_temp_buf())
2487
pos= my_stpcpy(pos, "LOCAL ");
2488
pos= my_stpcpy(pos, "INFILE '");
2489
memcpy(pos, fname, fname_len);
2490
pos= my_stpcpy(pos+fname_len, "' ");
2492
if (sql_ex.opt_flags & REPLACE_FLAG)
2493
pos= my_stpcpy(pos, " REPLACE ");
2494
else if (sql_ex.opt_flags & IGNORE_FLAG)
2495
pos= my_stpcpy(pos, " IGNORE ");
2497
pos= my_stpcpy(pos ,"INTO");
2502
pos= my_stpcpy(pos ," Table `");
2503
memcpy(pos, table_name, table_name_len);
2504
pos+= table_name_len;
2506
/* We have to create all optinal fields as the default is not empty */
2507
pos= my_stpcpy(pos, "` FIELDS TERMINATED BY ");
2508
pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len);
2509
if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
2510
pos= my_stpcpy(pos, " OPTIONALLY ");
2511
pos= my_stpcpy(pos, " ENCLOSED BY ");
2512
pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len);
2514
pos= my_stpcpy(pos, " ESCAPED BY ");
2515
pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
2517
pos= my_stpcpy(pos, " LINES TERMINATED BY ");
2518
pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
2519
if (sql_ex.line_start_len)
2521
pos= my_stpcpy(pos, " STARTING BY ");
2522
pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
2525
if ((long) skip_lines > 0)
2527
pos= my_stpcpy(pos, " IGNORE ");
2528
pos= int64_t10_to_str((int64_t) skip_lines, pos, 10);
2529
pos= my_stpcpy(pos," LINES ");
2535
const char *field= fields;
2536
pos= my_stpcpy(pos, " (");
2537
for (i = 0; i < num_fields; i++)
2544
memcpy(pos, field, field_lens[i]);
2545
pos+= field_lens[i];
2546
field+= field_lens[i] + 1;
2555
void Load_log_event::pack_info(Protocol *protocol)
2559
if (!(buf= (char*) my_malloc(get_query_buffer_length(), MYF(MY_WME))))
2561
print_query(true, buf, &end, 0, 0);
2562
protocol->store(buf, end-buf, &my_charset_bin);
2568
Load_log_event::write_data_header()
2571
bool Load_log_event::write_data_header(IO_CACHE* file)
2573
char buf[LOAD_HEADER_LEN];
2574
int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
2575
int4store(buf + L_EXEC_TIME_OFFSET, exec_time);
2576
int4store(buf + L_SKIP_LINES_OFFSET, skip_lines);
2577
buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
2578
buf[L_DB_LEN_OFFSET] = (char)db_len;
2579
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
2580
return my_b_safe_write(file, (unsigned char*)buf, LOAD_HEADER_LEN) != 0;
2585
Load_log_event::write_data_body()
2588
bool Load_log_event::write_data_body(IO_CACHE* file)
2590
if (sql_ex.write_data(file))
2592
if (num_fields && fields && field_lens)
2594
if (my_b_safe_write(file, (unsigned char*)field_lens, num_fields) ||
2595
my_b_safe_write(file, (unsigned char*)fields, field_block_len))
2598
return (my_b_safe_write(file, (unsigned char*)table_name, table_name_len + 1) ||
2599
my_b_safe_write(file, (unsigned char*)db, db_len + 1) ||
2600
my_b_safe_write(file, (unsigned char*)fname, fname_len));
2605
Load_log_event::Load_log_event()
2608
Load_log_event::Load_log_event(Session *session_arg, sql_exchange *ex,
2609
const char *db_arg, const char *table_name_arg,
2610
List<Item> &fields_arg,
2611
enum enum_duplicates handle_dup,
2612
bool ignore, bool using_trans)
2613
:Log_event(session_arg,
2614
session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0,
2616
thread_id(session_arg->thread_id),
2617
slave_proxy_id(session_arg->variables.pseudo_thread_id),
2618
num_fields(0),fields(0),
2619
field_lens(0),field_block_len(0),
2620
table_name(table_name_arg ? table_name_arg : ""),
2621
db(db_arg), fname(ex->file_name), local_fname(false)
2625
exec_time = (ulong) (end_time - session_arg->start_time);
2626
/* db can never be a zero pointer in 4.0 */
2627
db_len = (uint32_t) strlen(db);
2628
table_name_len = (uint32_t) strlen(table_name);
2629
fname_len = (fname) ? (uint) strlen(fname) : 0;
2630
sql_ex.field_term = (char*) ex->field_term->ptr();
2631
sql_ex.field_term_len = (uint8_t) ex->field_term->length();
2632
sql_ex.enclosed = (char*) ex->enclosed->ptr();
2633
sql_ex.enclosed_len = (uint8_t) ex->enclosed->length();
2634
sql_ex.line_term = (char*) ex->line_term->ptr();
2635
sql_ex.line_term_len = (uint8_t) ex->line_term->length();
2636
sql_ex.line_start = (char*) ex->line_start->ptr();
2637
sql_ex.line_start_len = (uint8_t) ex->line_start->length();
2638
sql_ex.escaped = (char*) ex->escaped->ptr();
2639
sql_ex.escaped_len = (uint8_t) ex->escaped->length();
2640
sql_ex.opt_flags = 0;
2641
sql_ex.cached_new_format = -1;
2644
sql_ex.opt_flags|= DUMPFILE_FLAG;
2645
if (ex->opt_enclosed)
2646
sql_ex.opt_flags|= OPT_ENCLOSED_FLAG;
2648
sql_ex.empty_flags= 0;
2650
switch (handle_dup) {
2652
sql_ex.opt_flags|= REPLACE_FLAG;
2654
case DUP_UPDATE: // Impossible here
2659
sql_ex.opt_flags|= IGNORE_FLAG;
2661
if (!ex->field_term->length())
2662
sql_ex.empty_flags |= FIELD_TERM_EMPTY;
2663
if (!ex->enclosed->length())
2664
sql_ex.empty_flags |= ENCLOSED_EMPTY;
2665
if (!ex->line_term->length())
2666
sql_ex.empty_flags |= LINE_TERM_EMPTY;
2667
if (!ex->line_start->length())
2668
sql_ex.empty_flags |= LINE_START_EMPTY;
2669
if (!ex->escaped->length())
2670
sql_ex.empty_flags |= ESCAPED_EMPTY;
2672
skip_lines = ex->skip_lines;
2674
List_iterator<Item> li(fields_arg);
2675
field_lens_buf.length(0);
2676
fields_buf.length(0);
2678
while ((item = li++))
2681
unsigned char len = (unsigned char) strlen(item->name);
2682
field_block_len += len + 1;
2683
fields_buf.append(item->name, len + 1);
2684
field_lens_buf.append((char*)&len, 1);
2687
field_lens = (const unsigned char*)field_lens_buf.ptr();
2688
fields = fields_buf.ptr();
2694
The caller must do buf[event_len] = 0 before he starts using the
2697
Load_log_event::Load_log_event(const char *buf, uint32_t event_len,
2698
const Format_description_log_event *description_event)
2699
:Log_event(buf, description_event), num_fields(0), fields(0),
2700
field_lens(0),field_block_len(0),
2701
table_name(0), db(0), fname(0), local_fname(false)
2704
I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0,
2705
4.0->5.0 and 5.0->5.0 and it works.
2708
copy_log_event(buf, event_len,
2709
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
2711
description_event->common_header_len :
2712
LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
2714
/* otherwise it's a derived class, will call copy_log_event() itself */
2720
Load_log_event::copy_log_event()
2723
int Load_log_event::copy_log_event(const char *buf, ulong event_len,
2725
const Format_description_log_event *description_event)
2728
char* buf_end = (char*)buf + event_len;
2729
/* this is the beginning of the post-header */
2730
const char* data_head = buf + description_event->common_header_len;
2731
slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
2732
exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
2733
skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
2734
table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
2735
db_len = (uint)data_head[L_DB_LEN_OFFSET];
2736
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
2738
if ((int) event_len < body_offset)
2741
Sql_ex.init() on success returns the pointer to the first byte after
2742
the sql_ex structure, which is the start of field lengths array.
2744
if (!(field_lens= (unsigned char*)sql_ex.init((char*)buf + body_offset,
2746
buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
2749
data_len = event_len - body_offset;
2750
if (num_fields > data_len) // simple sanity check against corruption
2752
for (uint32_t i = 0; i < num_fields; i++)
2753
field_block_len += (uint)field_lens[i] + 1;
2755
fields = (char*)field_lens + num_fields;
2756
table_name = fields + field_block_len;
2757
db = table_name + table_name_len + 1;
2758
fname = db + db_len + 1;
2759
fname_len = strlen(fname);
2760
// null termination is accomplished by the caller doing buf[event_len]=0
2767
Load_log_event::set_fields()
2770
This function can not use the member variable
2771
for the database, since LOAD DATA INFILE on the slave
2772
can be for a different database than the current one.
2773
This is the reason for the affected_db argument to this method.
2776
void Load_log_event::set_fields(const char* affected_db,
2777
List<Item> &field_list,
2778
Name_resolution_context *context)
2781
const char* field = fields;
2782
for (i= 0; i < num_fields; i++)
2784
field_list.push_back(new Item_field(context,
2785
affected_db, table_name, field));
2786
field+= field_lens[i] + 1;
2792
Does the data loading job when executing a LOAD DATA on the slave.
2796
@param use_rli_only_for_errors If set to 1, rli is provided to
2797
Load_log_event::exec_event only for this
2798
function to have RPL_LOG_NAME and
2799
rli->last_slave_error, both being used by
2800
error reports. rli's position advancing
2801
is skipped (done by the caller which is
2802
Execute_load_log_event::exec_event).
2803
If set to 0, rli is provided for full use,
2804
i.e. for error reports and position
2808
fix this; this can be done by testing rules in
2809
Create_file_log_event::exec_event() and then discarding Append_block and
2812
this is a bug - this needs to be moved to the I/O thread
2820
int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
2821
bool use_rli_only_for_errors)
2824
Query_id &query_id= Query_id::get_query_id();
2825
new_db.length= db_len;
2826
new_db.str= (char *) rpl_filter->get_rewrite_db(db, &new_db.length);
2827
session->set_db(new_db.str, new_db.length);
2828
assert(session->query == 0);
2829
session->query_length= 0; // Should not be needed
2830
session->is_slave_error= 0;
2831
clear_all_errors(session, const_cast<Relay_log_info*>(rli));
2833
/* see Query_log_event::do_apply_event() and BUG#13360 */
2834
assert(!rli->m_table_map.count());
2836
Usually lex_start() is called by mysql_parse(), but we need it here
2837
as the present method does not call mysql_parse().
2840
mysql_reset_session_for_next_command(session);
2842
if (!use_rli_only_for_errors)
2845
Saved for InnoDB, see comment in
2846
Query_log_event::do_apply_event()
2848
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
2852
We test replicate_*_db rules. Note that we have already prepared
2853
the file to load, even if we are going to ignore and delete it
2854
now. So it is possible that we did a lot of disk writes for
2855
nothing. In other words, a big LOAD DATA INFILE on the master will
2856
still consume a lot of space on the slave (space in the relay log
2857
+ space of temp files: twice the space of the file to load...)
2858
even if it will finally be ignored. TODO: fix this; this can be
2859
done by testing rules in Create_file_log_event::do_apply_event()
2860
and then discarding Append_block and al. Another way is do the
2861
filtering in the I/O thread (more efficient: no disk writes at
2865
Note: We do not need to execute reset_one_shot_variables() if this
2867
Reason: The db stored in binlog events is the same for SET and for
2868
its companion query. If the SET is ignored because of
2869
db_ok(), the companion query will also be ignored, and if
2870
the companion query is ignored in the db_ok() test of
2871
::do_apply_event(), then the companion SET also have so
2872
we don't need to reset_one_shot_variables().
2874
if (rpl_filter->db_ok(session->db))
2876
session->set_time((time_t)when);
2877
session->query_id = query_id.next();
2879
Initing session->row_count is not necessary in theory as this variable has no
2880
influence in the case of the slave SQL thread (it is used to generate a
2881
"data truncated" warning but which is absorbed and never gets to the
2882
error log); still we init it to avoid a Valgrind message.
2884
drizzle_reset_errors(session, 0);
2887
memset(&tables, 0, sizeof(tables));
2888
tables.db= session->strmake(session->db, session->db_length);
2889
tables.alias = tables.table_name = (char*) table_name;
2890
tables.lock_type = TL_WRITE;
2893
// the table will be opened in mysql_load
2894
if (rpl_filter->is_on() && !rpl_filter->tables_ok(session->db, &tables))
2896
// TODO: this is a bug - this needs to be moved to the I/O thread
2898
skip_load_data_infile(net);
2904
enum enum_duplicates handle_dup;
2906
char *load_data_query;
2909
Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST
2910
and written to slave's binlog if binlogging is on.
2912
if (!(load_data_query= (char *)session->alloc(get_query_buffer_length() + 1)))
2915
This will set session->fatal_error in case of OOM. So we surely will notice
2916
that something is wrong.
2921
print_query(false, load_data_query, &end, (char **)&session->lex->fname_start,
2922
(char **)&session->lex->fname_end);
2924
session->query_length= end - load_data_query;
2925
session->query= load_data_query;
2927
if (sql_ex.opt_flags & REPLACE_FLAG)
2929
handle_dup= DUP_REPLACE;
2931
else if (sql_ex.opt_flags & IGNORE_FLAG)
2934
handle_dup= DUP_ERROR;
2939
When replication is running fine, if it was DUP_ERROR on the
2940
master then we could choose IGNORE here, because if DUP_ERROR
2941
suceeded on master, and data is identical on the master and slave,
2942
then there should be no uniqueness errors on slave, so IGNORE is
2943
the same as DUP_ERROR. But in the unlikely case of uniqueness errors
2944
(because the data on the master and slave happen to be different
2945
(user error or bug), we want LOAD DATA to print an error message on
2946
the slave to discover the problem.
2948
If reading from net (a 3.23 master), mysql_load() will change this
2951
handle_dup= DUP_ERROR;
2954
We need to set session->lex->sql_command and session->lex->duplicates
2955
since InnoDB tests these variables to decide if this is a LOAD
2956
DATA ... REPLACE INTO ... statement even though mysql_parse()
2957
is not called. This is not needed in 5.0 since there the LOAD
2958
DATA ... statement is replicated using mysql_parse(), which
2959
sets the session->lex fields correctly.
2961
session->lex->sql_command= SQLCOM_LOAD;
2962
session->lex->duplicates= handle_dup;
2964
sql_exchange ex((char*)fname, sql_ex.opt_flags & DUMPFILE_FLAG);
2965
String field_term(sql_ex.field_term,sql_ex.field_term_len,&my_charset_utf8_general_ci);
2966
String enclosed(sql_ex.enclosed,sql_ex.enclosed_len,&my_charset_utf8_general_ci);
2967
String line_term(sql_ex.line_term,sql_ex.line_term_len,&my_charset_utf8_general_ci);
2968
String line_start(sql_ex.line_start,sql_ex.line_start_len,&my_charset_utf8_general_ci);
2969
String escaped(sql_ex.escaped,sql_ex.escaped_len, &my_charset_utf8_general_ci);
2970
ex.field_term= &field_term;
2971
ex.enclosed= &enclosed;
2972
ex.line_term= &line_term;
2973
ex.line_start= &line_start;
2974
ex.escaped= &escaped;
2976
ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
2977
if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
2978
ex.field_term->length(0);
2980
ex.skip_lines = skip_lines;
2981
List<Item> field_list;
2982
session->lex->select_lex.context.resolve_in_table_list_only(&tables);
2983
set_fields(tables.db, field_list, &session->lex->select_lex.context);
2984
session->variables.pseudo_thread_id= thread_id;
2987
// mysql_load will use session->net to read the file
2988
session->net.vio = net->vio;
2990
Make sure the client does not get confused about the packet sequence
2992
session->net.pkt_nr = net->pkt_nr;
2995
It is safe to use tmp_list twice because we are not going to
2996
update it inside mysql_load().
2998
List<Item> tmp_list;
2999
if (mysql_load(session, &ex, &tables, field_list, tmp_list, tmp_list,
3000
handle_dup, ignore, net != 0))
3001
session->is_slave_error= 1;
3002
if (session->cuted_fields)
3004
/* log_pos is the position of the LOAD event in the master log */
3005
sql_print_warning(_("Slave: load data infile on table '%s' at "
3006
"log position %s in log '%s' produced %ld "
3007
"warning(s). Default database: '%s'"),
3009
llstr(log_pos,llbuff), RPL_LOG_NAME,
3010
(ulong) session->cuted_fields,
3011
print_slave_db_safe(session->db));
3014
net->pkt_nr= session->net.pkt_nr;
3020
We will just ask the master to send us /dev/null if we do not
3021
want to load the data.
3022
TODO: this a bug - needs to be done in I/O thread
3025
skip_load_data_infile(net);
3029
session->net.vio = 0;
3030
const char *remember_db= session->db;
3031
pthread_mutex_lock(&LOCK_thread_count);
3032
session->catalog= 0;
3033
session->set_db(NULL, 0); /* will free the current database */
3035
session->query_length= 0;
3036
pthread_mutex_unlock(&LOCK_thread_count);
3037
close_thread_tables(session);
3039
if (session->is_slave_error)
3041
/* this err/sql_errno code is copy-paste from net_send_error() */
3044
if (session->is_error())
3046
err= session->main_da.message();
3047
sql_errno= session->main_da.sql_errno();
3051
sql_errno=ER_UNKNOWN_ERROR;
3054
rli->report(ERROR_LEVEL, sql_errno,
3055
_("Error '%s' running LOAD DATA INFILE on table '%s'. "
3056
"Default database: '%s'"),
3057
err, (char*)table_name, print_slave_db_safe(remember_db));
3058
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
3061
free_root(session->mem_root,MYF(MY_KEEP_PREALLOC));
3063
if (session->is_fatal_error)
3066
snprintf(buf, sizeof(buf),
3067
_("Running LOAD DATA INFILE on table '%-.64s'."
3068
" Default database: '%-.64s'"),
3070
print_slave_db_safe(remember_db));
3072
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
3073
ER(ER_SLAVE_FATAL_ERROR), buf);
3077
return ( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) );
3081
/**************************************************************************
3082
Rotate_log_event methods
3083
**************************************************************************/
3086
Rotate_log_event::pack_info()
3089
void Rotate_log_event::pack_info(Protocol *protocol)
3091
char buf1[256], buf[22];
3092
String tmp(buf1, sizeof(buf1), &my_charset_utf8_general_ci);
3094
tmp.append(new_log_ident, ident_len);
3095
tmp.append(STRING_WITH_LEN(";pos="));
3096
tmp.append(llstr(pos,buf));
3097
protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin);
3102
Rotate_log_event::Rotate_log_event() (2 constructors)
3106
Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
3107
uint32_t ident_len_arg, uint64_t pos_arg,
3109
:Log_event(), new_log_ident(new_log_ident_arg),
3110
pos(pos_arg),ident_len(ident_len_arg ? ident_len_arg :
3111
(uint) strlen(new_log_ident_arg)), flags(flags_arg)
3113
if (flags & DUP_NAME)
3114
new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME));
3119
Rotate_log_event::Rotate_log_event(const char* buf, uint32_t event_len,
3120
const Format_description_log_event* description_event)
3121
:Log_event(buf, description_event) ,new_log_ident(0), flags(DUP_NAME)
3123
// The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
3124
uint8_t header_size= description_event->common_header_len;
3125
uint8_t post_header_len= description_event->post_header_len[ROTATE_EVENT-1];
3126
uint32_t ident_offset;
3127
if (event_len < header_size)
3130
pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
3131
ident_len = (uint)(event_len -
3132
(header_size+post_header_len));
3133
ident_offset = post_header_len;
3134
set_if_smaller(ident_len,FN_REFLEN-1);
3135
new_log_ident= my_strndup(buf + ident_offset, (uint) ident_len, MYF(MY_WME));
3141
Rotate_log_event::write()
3144
bool Rotate_log_event::write(IO_CACHE* file)
3146
char buf[ROTATE_HEADER_LEN];
3147
int8store(buf + R_POS_OFFSET, pos);
3148
return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
3149
my_b_safe_write(file, (unsigned char*)buf, ROTATE_HEADER_LEN) ||
3150
my_b_safe_write(file, (unsigned char*)new_log_ident, (uint) ident_len));
3155
Got a rotate log event from the master.
3157
This is mainly used so that we can later figure out the logname and
3158
position for the master.
3160
We can't rotate the slave's BINlog as this will cause infinitive rotations
3161
in a A -> B -> A setup.
3162
The NOTES below is a wrong comment which will disappear when 4.1 is merged.
3167
int Rotate_log_event::do_update_pos(Relay_log_info *rli)
3169
pthread_mutex_lock(&rli->data_lock);
3170
rli->event_relay_log_pos= my_b_tell(rli->cur_log);
3172
If we are in a transaction or in a group: the only normal case is
3173
when the I/O thread was copying a big transaction, then it was
3174
stopped and restarted: we have this in the relay log:
3182
In that case, we don't want to touch the coordinates which
3183
correspond to the beginning of the transaction. Starting from
3184
5.0.0, there also are some rotates from the slave itself, in the
3185
relay log, which shall not change the group positions.
3187
if ((server_id != ::server_id || rli->replicate_same_server_id) &&
3188
!rli->is_in_group())
3190
rli->group_master_log_name.assign(new_log_ident, ident_len+1);
3191
rli->notify_group_master_log_name_update();
3192
rli->group_master_log_pos= pos;
3193
rli->group_relay_log_name.assign(rli->event_relay_log_name);
3194
rli->notify_group_relay_log_name_update();
3195
rli->group_relay_log_pos= rli->event_relay_log_pos;
3197
Reset session->options and sql_mode etc, because this could be the signal of
3198
a master's downgrade from 5.0 to 4.0.
3199
However, no need to reset description_event_for_exec: indeed, if the next
3200
master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
3201
master is 4.0 then the events are in the slave's format (conversion).
3203
set_slave_thread_options(session);
3204
session->variables.auto_increment_increment=
3205
session->variables.auto_increment_offset= 1;
3207
pthread_mutex_unlock(&rli->data_lock);
3208
pthread_cond_broadcast(&rli->data_cond);
3209
flush_relay_log_info(rli);
3215
Log_event::enum_skip_reason
3216
Rotate_log_event::do_shall_skip(Relay_log_info *rli)
3218
enum_skip_reason reason= Log_event::do_shall_skip(rli);
3221
case Log_event::EVENT_SKIP_NOT:
3222
case Log_event::EVENT_SKIP_COUNT:
3223
return Log_event::EVENT_SKIP_NOT;
3225
case Log_event::EVENT_SKIP_IGNORE:
3226
return Log_event::EVENT_SKIP_IGNORE;
3229
return Log_event::EVENT_SKIP_NOT; // To keep compiler happy
3233
/**************************************************************************
3234
Intvar_log_event methods
3235
**************************************************************************/
3238
Intvar_log_event::pack_info()
3241
void Intvar_log_event::pack_info(Protocol *protocol)
3243
char buf[256], *pos;
3244
pos= strmake(buf, get_var_type_name(), sizeof(buf)-23);
3246
pos= int64_t10_to_str(val, pos, -10);
3247
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3252
Intvar_log_event::Intvar_log_event()
3255
Intvar_log_event::Intvar_log_event(const char* buf,
3256
const Format_description_log_event* description_event)
3257
:Log_event(buf, description_event)
3259
buf+= description_event->common_header_len;
3260
type= buf[I_TYPE_OFFSET];
3261
val= uint8korr(buf+I_VAL_OFFSET);
3266
Intvar_log_event::get_var_type_name()
3269
const char* Intvar_log_event::get_var_type_name()
3272
case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
3273
case INSERT_ID_EVENT: return "INSERT_ID";
3274
default: /* impossible */ return "UNKNOWN";
3280
Intvar_log_event::write()
3283
bool Intvar_log_event::write(IO_CACHE* file)
3285
unsigned char buf[9];
3286
buf[I_TYPE_OFFSET]= (unsigned char) type;
3287
int8store(buf + I_VAL_OFFSET, val);
3288
return (write_header(file, sizeof(buf)) ||
3289
my_b_safe_write(file, buf, sizeof(buf)));
3294
Intvar_log_event::print()
3298
Intvar_log_event::do_apply_event()
3301
int Intvar_log_event::do_apply_event(Relay_log_info const *rli)
3304
We are now in a statement until the associated query log event has
3307
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3310
case LAST_INSERT_ID_EVENT:
3311
session->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1;
3312
session->first_successful_insert_id_in_prev_stmt= val;
3314
case INSERT_ID_EVENT:
3315
session->force_one_auto_inc_interval(val);
3321
int Intvar_log_event::do_update_pos(Relay_log_info *rli)
3323
rli->inc_event_relay_log_pos();
3328
Log_event::enum_skip_reason
3329
Intvar_log_event::do_shall_skip(Relay_log_info *rli)
3332
It is a common error to set the slave skip counter to 1 instead of
3333
2 when recovering from an insert which used a auto increment,
3334
rand, or user var. Therefore, if the slave skip counter is 1, we
3335
just say that this event should be skipped by ignoring it, meaning
3336
that we do not change the value of the slave skip counter since it
3337
will be decreased by the following insert event.
3339
return continue_group(rli);
3343
/**************************************************************************
3344
Rand_log_event methods
3345
**************************************************************************/
3347
void Rand_log_event::pack_info(Protocol *protocol)
3349
char buf1[256], *pos;
3350
pos= my_stpcpy(buf1,"rand_seed1=");
3351
pos= int10_to_str((long) seed1, pos, 10);
3352
pos= my_stpcpy(pos, ",rand_seed2=");
3353
pos= int10_to_str((long) seed2, pos, 10);
3354
protocol->store(buf1, (uint) (pos-buf1), &my_charset_bin);
3358
Rand_log_event::Rand_log_event(const char* buf,
3359
const Format_description_log_event* description_event)
3360
:Log_event(buf, description_event)
3362
buf+= description_event->common_header_len;
3363
seed1= uint8korr(buf+RAND_SEED1_OFFSET);
3364
seed2= uint8korr(buf+RAND_SEED2_OFFSET);
3368
bool Rand_log_event::write(IO_CACHE* file)
3370
unsigned char buf[16];
3371
int8store(buf + RAND_SEED1_OFFSET, seed1);
3372
int8store(buf + RAND_SEED2_OFFSET, seed2);
3373
return (write_header(file, sizeof(buf)) ||
3374
my_b_safe_write(file, buf, sizeof(buf)));
3378
int Rand_log_event::do_apply_event(Relay_log_info const *rli)
3381
We are now in a statement until the associated query log event has
3384
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3386
session->rand.seed1= (ulong) seed1;
3387
session->rand.seed2= (ulong) seed2;
3391
int Rand_log_event::do_update_pos(Relay_log_info *rli)
3393
rli->inc_event_relay_log_pos();
3398
Log_event::enum_skip_reason
3399
Rand_log_event::do_shall_skip(Relay_log_info *rli)
3402
It is a common error to set the slave skip counter to 1 instead of
3403
2 when recovering from an insert which used a auto increment,
3404
rand, or user var. Therefore, if the slave skip counter is 1, we
3405
just say that this event should be skipped by ignoring it, meaning
3406
that we do not change the value of the slave skip counter since it
3407
will be decreased by the following insert event.
3409
return continue_group(rli);
3413
/**************************************************************************
3414
Xid_log_event methods
3415
**************************************************************************/
3417
void Xid_log_event::pack_info(Protocol *protocol)
3419
char buf[128], *pos;
3420
pos= my_stpcpy(buf, "COMMIT /* xid=");
3421
pos= int64_t10_to_str(xid, pos, 10);
3422
pos= my_stpcpy(pos, " */");
3423
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3428
It's ok not to use int8store here,
3429
as long as xid_t::set(uint64_t) and
3430
xid_t::get_my_xid doesn't do it either.
3431
We don't care about actual values of xids as long as
3432
identical numbers compare identically
3436
Xid_log_event(const char* buf,
3437
const Format_description_log_event *description_event)
3438
:Log_event(buf, description_event)
3440
buf+= description_event->common_header_len;
3441
memcpy(&xid, buf, sizeof(xid));
3445
bool Xid_log_event::write(IO_CACHE* file)
3447
return write_header(file, sizeof(xid)) ||
3448
my_b_safe_write(file, (unsigned char*) &xid, sizeof(xid));
3452
int Xid_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
3454
return end_trans(session, COMMIT);
3457
Log_event::enum_skip_reason
3458
Xid_log_event::do_shall_skip(Relay_log_info *rli)
3460
if (rli->slave_skip_counter > 0) {
3461
session->options&= ~OPTION_BEGIN;
3462
return(Log_event::EVENT_SKIP_COUNT);
3464
return(Log_event::do_shall_skip(rli));
3468
/**************************************************************************
3469
User_var_log_event methods
3470
**************************************************************************/
3472
void User_var_log_event::pack_info(Protocol* protocol)
3475
uint32_t val_offset= 4 + name_len;
3476
uint32_t event_len= val_offset;
3480
if (!(buf= (char*) my_malloc(val_offset + 5, MYF(MY_WME))))
3482
my_stpcpy(buf + val_offset, "NULL");
3483
event_len= val_offset + 4;
3490
float8get(real_val, val);
3491
if (!(buf= (char*) my_malloc(val_offset + MY_GCVT_MAX_FIELD_WIDTH + 1,
3494
event_len+= my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, MY_GCVT_MAX_FIELD_WIDTH,
3495
buf + val_offset, NULL);
3498
if (!(buf= (char*) my_malloc(val_offset + 22, MYF(MY_WME))))
3500
event_len= int64_t10_to_str(uint8korr(val), buf + val_offset,-10)-buf;
3502
case DECIMAL_RESULT:
3504
if (!(buf= (char*) my_malloc(val_offset + DECIMAL_MAX_STR_LENGTH,
3507
String str(buf+val_offset, DECIMAL_MAX_STR_LENGTH, &my_charset_bin);
3509
binary2my_decimal(E_DEC_FATAL_ERROR, (unsigned char*) (val+2), &dec, val[0],
3511
my_decimal2string(E_DEC_FATAL_ERROR, &dec, 0, 0, 0, &str);
3512
event_len= str.length() + val_offset;
3516
/* 15 is for 'COLLATE' and other chars */
3517
buf= (char*) my_malloc(event_len+val_len*2+1+2*MY_CS_NAME_SIZE+15,
3519
const CHARSET_INFO *cs;
3522
if (!(cs= get_charset(charset_number, MYF(0))))
3524
my_stpcpy(buf+val_offset, "???");
3529
char *p= strxmov(buf + val_offset, "_", cs->csname, " ", NULL);
3530
p= str_to_hex(p, val, val_len);
3531
p= strxmov(p, " COLLATE ", cs->name, NULL);
3543
memcpy(buf+2, name, name_len);
3544
buf[2+name_len]= '`';
3545
buf[3+name_len]= '=';
3546
protocol->store(buf, event_len, &my_charset_bin);
3551
User_var_log_event::
3552
User_var_log_event(const char* buf,
3553
const Format_description_log_event* description_event)
3554
:Log_event(buf, description_event)
3556
buf+= description_event->common_header_len;
3557
name_len= uint4korr(buf);
3558
name= (char *) buf + UV_NAME_LEN_SIZE;
3559
buf+= UV_NAME_LEN_SIZE + name_len;
3560
is_null= (bool) *buf;
3563
type= STRING_RESULT;
3564
charset_number= my_charset_bin.number;
3570
type= (Item_result) buf[UV_VAL_IS_NULL];
3571
charset_number= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE);
3572
val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3573
UV_CHARSET_NUMBER_SIZE);
3574
val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3575
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE);
3580
bool User_var_log_event::write(IO_CACHE* file)
3582
char buf[UV_NAME_LEN_SIZE];
3583
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3584
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
3585
unsigned char buf2[(8 > DECIMAL_MAX_FIELD_SIZE + 2) ? 8 : DECIMAL_MAX_FIELD_SIZE +2], *pos= buf2;
3586
uint32_t buf1_length;
3589
int4store(buf, name_len);
3591
if ((buf1[0]= is_null))
3594
val_len= 0; // Length of 'pos'
3599
int4store(buf1 + 2, charset_number);
3603
float8store(buf2, *(double*) val);
3606
int8store(buf2, *(int64_t*) val);
3608
case DECIMAL_RESULT:
3610
my_decimal *dec= (my_decimal *)val;
3611
dec->fix_buffer_pointer();
3612
buf2[0]= (char)(dec->intg + dec->frac);
3613
buf2[1]= (char)dec->frac;
3614
decimal2bin((decimal_t*)val, buf2+2, buf2[0], buf2[1]);
3615
val_len= decimal_bin_size(buf2[0], buf2[1]) + 2;
3619
pos= (unsigned char*) val;
3626
int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
3630
/* Length of the whole event */
3631
event_length= sizeof(buf)+ name_len + buf1_length + val_len;
3633
return (write_header(file, event_length) ||
3634
my_b_safe_write(file, (unsigned char*) buf, sizeof(buf)) ||
3635
my_b_safe_write(file, (unsigned char*) name, name_len) ||
3636
my_b_safe_write(file, (unsigned char*) buf1, buf1_length) ||
3637
my_b_safe_write(file, pos, val_len));
3643
User_var_log_event::do_apply_event()
3646
int User_var_log_event::do_apply_event(Relay_log_info const *rli)
3649
const CHARSET_INFO *charset;
3650
if (!(charset= get_charset(charset_number, MYF(MY_WME))))
3652
LEX_STRING user_var_name;
3653
user_var_name.str= name;
3654
user_var_name.length= name_len;
3659
We are now in a statement until the associated query log event has
3662
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3666
it= new Item_null();
3672
float8get(real_val, val);
3673
it= new Item_float(real_val, 0);
3674
val= (char*) &real_val; // Pointer to value in native format
3678
int_val= (int64_t) uint8korr(val);
3679
it= new Item_int(int_val);
3680
val= (char*) &int_val; // Pointer to value in native format
3683
case DECIMAL_RESULT:
3685
Item_decimal *dec= new Item_decimal((unsigned char*) val+2, val[0], val[1]);
3687
val= (char *)dec->val_decimal(NULL);
3688
val_len= sizeof(my_decimal);
3692
it= new Item_string(val, val_len, charset);
3700
Item_func_set_user_var e(user_var_name, it);
3702
Item_func_set_user_var can't substitute something else on its place =>
3703
0 can be passed as last argument (reference on item)
3705
e.fix_fields(session, 0);
3707
A variable can just be considered as a table with
3708
a single record and with a single column. Thus, like
3709
a column value, it could always have IMPLICIT derivation.
3711
e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0);
3712
free_root(session->mem_root,0);
3717
int User_var_log_event::do_update_pos(Relay_log_info *rli)
3719
rli->inc_event_relay_log_pos();
3723
Log_event::enum_skip_reason
3724
User_var_log_event::do_shall_skip(Relay_log_info *rli)
3727
It is a common error to set the slave skip counter to 1 instead
3728
of 2 when recovering from an insert which used a auto increment,
3729
rand, or user var. Therefore, if the slave skip counter is 1, we
3730
just say that this event should be skipped by ignoring it, meaning
3731
that we do not change the value of the slave skip counter since it
3732
will be decreased by the following insert event.
3734
return continue_group(rli);
3738
/**************************************************************************
3739
Slave_log_event methods
3740
**************************************************************************/
3742
void Slave_log_event::pack_info(Protocol *protocol)
3744
char buf[256+HOSTNAME_LENGTH], *pos;
3745
pos= my_stpcpy(buf, "host=");
3746
pos= my_stpncpy(pos, master_host.c_str(), HOSTNAME_LENGTH);
3747
pos= my_stpcpy(pos, ",port=");
3748
pos= int10_to_str((long) master_port, pos, 10);
3749
pos= my_stpcpy(pos, ",log=");
3750
pos= my_stpcpy(pos, master_log.c_str());
3751
pos= my_stpcpy(pos, ",pos=");
3752
pos= int64_t10_to_str(master_pos, pos, 10);
3753
protocol->store(buf, pos-buf, &my_charset_bin);
3759
re-write this better without holding both locks at the same time
3761
Slave_log_event::Slave_log_event(Session* session_arg,
3762
Relay_log_info* rli)
3763
:Log_event(session_arg, 0, 0) , mem_pool(0), master_host(0)
3765
if (!rli->inited) // QQ When can this happen ?
3768
Master_info* mi = rli->mi;
3769
// TODO: re-write this better without holding both locks at the same time
3770
pthread_mutex_lock(&mi->data_lock);
3771
pthread_mutex_lock(&rli->data_lock);
3772
// on OOM, just do not initialize the structure and print the error
3773
if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
3776
master_host.assign(mi->getHostname());
3777
master_log.assign(rli->group_master_log_name);
3778
master_port = mi->getPort();
3779
master_pos = rli->group_master_log_pos;
3782
sql_print_error(_("Out of memory while recording slave event"));
3783
pthread_mutex_unlock(&rli->data_lock);
3784
pthread_mutex_unlock(&mi->data_lock);
3789
Slave_log_event::~Slave_log_event()
3795
int Slave_log_event::get_data_size()
3797
return master_host.length() + master_log.length() + 1 + SL_MASTER_HOST_OFFSET;
3801
bool Slave_log_event::write(IO_CACHE* file)
3803
ulong event_length= get_data_size();
3804
int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
3805
int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
3806
// log and host are already there
3808
return (write_header(file, event_length) ||
3809
my_b_safe_write(file, (unsigned char*) mem_pool, event_length));
3813
void Slave_log_event::init_from_mem_pool()
3815
master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
3816
master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
3818
/* Assign these correctly */
3819
master_host.assign(mem_pool + SL_MASTER_HOST_OFFSET);
3820
master_log.assign();
3825
int Slave_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
3827
if (mysql_bin_log.is_open())
3828
mysql_bin_log.write(this);
3833
/**************************************************************************
3834
Stop_log_event methods
3835
**************************************************************************/
3838
The master stopped. We used to clean up all temporary tables but
3839
this is useless as, as the master has shut down properly, it has
3840
written all DROP TEMPORARY Table (prepared statements' deletion is
3841
TODO only when we binlog prep stmts). We used to clean up
3842
slave_load_tmpdir, but this is useless as it has been cleared at the
3843
end of LOAD DATA INFILE. So we have nothing to do here. The place
3844
were we must do this cleaning is in
3845
Start_log_event_v3::do_apply_event(), not here. Because if we come
3846
here, the master was sane.
3848
int Stop_log_event::do_update_pos(Relay_log_info *rli)
3851
We do not want to update master_log pos because we get a rotate event
3852
before stop, so by now group_master_log_name is set to the next log.
3853
If we updated it, we will have incorrect master coordinates and this
3854
could give false triggers in MASTER_POS_WAIT() that we have reached
3855
the target position when in fact we have not.
3857
if (session->options & OPTION_BEGIN)
3858
rli->inc_event_relay_log_pos();
3861
rli->inc_group_relay_log_pos(0);
3862
flush_relay_log_info(rli);
3868
/**************************************************************************
3869
Create_file_log_event methods
3870
**************************************************************************/
3873
Create_file_log_event ctor
3876
Create_file_log_event::
3877
Create_file_log_event(Session* session_arg, sql_exchange* ex,
3878
const char* db_arg, const char* table_name_arg,
3879
List<Item>& fields_arg, enum enum_duplicates handle_dup,
3881
unsigned char* block_arg, uint32_t block_len_arg, bool using_trans)
3882
:Load_log_event(session_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup, ignore,
3884
fake_base(0), block(block_arg), event_buf(0), block_len(block_len_arg),
3885
file_id(session_arg->file_id = mysql_bin_log.next_file_id())
3887
sql_ex.force_new_format();
3893
Create_file_log_event::write_data_body()
3896
bool Create_file_log_event::write_data_body(IO_CACHE* file)
3899
if ((res= Load_log_event::write_data_body(file)) || fake_base)
3901
return (my_b_safe_write(file, (unsigned char*) "", 1) ||
3902
my_b_safe_write(file, (unsigned char*) block, block_len));
3907
Create_file_log_event::write_data_header()
3910
bool Create_file_log_event::write_data_header(IO_CACHE* file)
3913
unsigned char buf[CREATE_FILE_HEADER_LEN];
3914
if ((res= Load_log_event::write_data_header(file)) || fake_base)
3916
int4store(buf + CF_FILE_ID_OFFSET, file_id);
3917
return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0;
3922
Create_file_log_event::write_base()
3925
bool Create_file_log_event::write_base(IO_CACHE* file)
3928
fake_base= 1; // pretend we are Load event
3935
Create_file_log_event ctor
3938
Create_file_log_event::Create_file_log_event(const char* buf, uint32_t len,
3939
const Format_description_log_event* description_event)
3940
:Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0)
3942
uint32_t block_offset;
3943
uint32_t header_len= description_event->common_header_len;
3944
uint8_t load_header_len= description_event->post_header_len[LOAD_EVENT-1];
3945
uint8_t create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
3946
if (!(event_buf= (char*) my_memdup(buf, len, MYF(MY_WME))) ||
3947
copy_log_event(event_buf,len,
3948
((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
3949
load_header_len + header_len :
3950
(fake_base ? (header_len+load_header_len) :
3951
(header_len+load_header_len) +
3952
create_file_header_len)),
3955
if (description_event->binlog_version!=1)
3957
file_id= uint4korr(buf +
3959
load_header_len + CF_FILE_ID_OFFSET);
3961
Note that it's ok to use get_data_size() below, because it is computed
3962
with values we have already read from this event (because we called
3963
copy_log_event()); we are not using slave's format info to decode
3964
master's format, we are really using master's format info.
3965
Anyway, both formats should be identical (except the common_header_len)
3966
as these Load events are not changed between 4.0 and 5.0 (as logging of
3967
LOAD DATA INFILE does not use Load_log_event in 5.0).
3969
The + 1 is for \0 terminating fname
3971
block_offset= (description_event->common_header_len +
3972
Load_log_event::get_data_size() +
3973
create_file_header_len + 1);
3974
if (len < block_offset)
3976
block = (unsigned char*)buf + block_offset;
3977
block_len = len - block_offset;
3981
sql_ex.force_new_format();
3982
inited_from_old = 1;
3989
Create_file_log_event::pack_info()
3992
void Create_file_log_event::pack_info(Protocol *protocol)
3994
char buf[NAME_LEN*2 + 30 + 21*2], *pos;
3995
pos= my_stpcpy(buf, "db=");
3996
memcpy(pos, db, db_len);
3997
pos= my_stpcpy(pos + db_len, ";table=");
3998
memcpy(pos, table_name, table_name_len);
3999
pos= my_stpcpy(pos + table_name_len, ";file_id=");
4000
pos= int10_to_str((long) file_id, pos, 10);
4001
pos= my_stpcpy(pos, ";block_len=");
4002
pos= int10_to_str((long) block_len, pos, 10);
4003
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
4008
Create_file_log_event::do_apply_event()
4011
int Create_file_log_event::do_apply_event(Relay_log_info const *rli)
4013
char proc_info[17+FN_REFLEN+10], *fname_buf;
4019
memset(&file, 0, sizeof(file));
4020
fname_buf= my_stpcpy(proc_info, "Making temp file ");
4021
ext= slave_load_file_stem(fname_buf, file_id, server_id, ".info");
4022
session->set_proc_info(proc_info);
4023
my_delete(fname_buf, MYF(0)); // old copy may exist already
4024
if ((fd= my_create(fname_buf, CREATE_MODE,
4026
MYF(MY_WME))) < 0 ||
4027
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
4028
MYF(MY_WME|MY_NABP)))
4030
rli->report(ERROR_LEVEL, my_errno,
4031
_("Error in Create_file event: could not open file '%s'"),
4036
// a trick to avoid allocating another buffer
4038
fname_len= (uint) (my_stpcpy(ext, ".data") - fname);
4039
if (write_base(&file))
4041
my_stpcpy(ext, ".info"); // to have it right in the error message
4042
rli->report(ERROR_LEVEL, my_errno,
4043
_("Error in Create_file event: could not write to file '%s'"),
4047
end_io_cache(&file);
4048
my_close(fd, MYF(0));
4050
// fname_buf now already has .data, not .info, because we did our trick
4051
my_delete(fname_buf, MYF(0)); // old copy may exist already
4052
if ((fd= my_create(fname_buf, CREATE_MODE,
4056
rli->report(ERROR_LEVEL, my_errno,
4057
_("Error in Create_file event: could not open file '%s'"),
4061
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
4063
rli->report(ERROR_LEVEL, my_errno,
4064
_("Error in Create_file event: write to '%s' failed"),
4068
error=0; // Everything is ok
4072
end_io_cache(&file);
4074
my_close(fd, MYF(0));
4075
session->set_proc_info(0);
4080
/**************************************************************************
4081
Append_block_log_event methods
4082
**************************************************************************/
4085
Append_block_log_event ctor
4088
Append_block_log_event::Append_block_log_event(Session *session_arg,
4090
unsigned char *block_arg,
4091
uint32_t block_len_arg,
4093
:Log_event(session_arg,0, using_trans), block(block_arg),
4094
block_len(block_len_arg), file_id(session_arg->file_id), db(db_arg)
4100
Append_block_log_event ctor
4103
Append_block_log_event::Append_block_log_event(const char* buf, uint32_t len,
4104
const Format_description_log_event* description_event)
4105
:Log_event(buf, description_event),block(0)
4107
uint8_t common_header_len= description_event->common_header_len;
4108
uint8_t append_block_header_len=
4109
description_event->post_header_len[APPEND_BLOCK_EVENT-1];
4110
uint32_t total_header_len= common_header_len+append_block_header_len;
4111
if (len < total_header_len)
4113
file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET);
4114
block= (unsigned char*)buf + total_header_len;
4115
block_len= len - total_header_len;
4121
Append_block_log_event::write()
4124
bool Append_block_log_event::write(IO_CACHE* file)
4126
unsigned char buf[APPEND_BLOCK_HEADER_LEN];
4127
int4store(buf + AB_FILE_ID_OFFSET, file_id);
4128
return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) ||
4129
my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
4130
my_b_safe_write(file, (unsigned char*) block, block_len));
4135
Append_block_log_event::pack_info()
4138
void Append_block_log_event::pack_info(Protocol *protocol)
4142
length= (uint) sprintf(buf, ";file_id=%u;block_len=%u", file_id,
4144
protocol->store(buf, length, &my_charset_bin);
4149
Append_block_log_event::get_create_or_append()
4152
int Append_block_log_event::get_create_or_append() const
4154
return 0; /* append to the file, fail if not exists */
4158
Append_block_log_event::do_apply_event()
4161
int Append_block_log_event::do_apply_event(Relay_log_info const *rli)
4163
char proc_info[17+FN_REFLEN+10], *fname= proc_info+17;
4167
fname= my_stpcpy(proc_info, "Making temp file ");
4168
slave_load_file_stem(fname, file_id, server_id, ".data");
4169
session->set_proc_info(proc_info);
4170
if (get_create_or_append())
4172
my_delete(fname, MYF(0)); // old copy may exist already
4173
if ((fd= my_create(fname, CREATE_MODE,
4177
rli->report(ERROR_LEVEL, my_errno,
4178
_("Error in %s event: could not create file '%s'"),
4179
get_type_str(), fname);
4183
else if ((fd = my_open(fname, O_WRONLY | O_APPEND,
4186
rli->report(ERROR_LEVEL, my_errno,
4187
_("Error in %s event: could not open file '%s'"),
4188
get_type_str(), fname);
4191
if (my_write(fd, (unsigned char*) block, block_len, MYF(MY_WME+MY_NABP)))
4193
rli->report(ERROR_LEVEL, my_errno,
4194
_("Error in %s event: write to '%s' failed"),
4195
get_type_str(), fname);
4202
my_close(fd, MYF(0));
4203
session->set_proc_info(0);
4208
/**************************************************************************
4209
Delete_file_log_event methods
4210
**************************************************************************/
4213
Delete_file_log_event ctor
4216
Delete_file_log_event::Delete_file_log_event(Session *session_arg, const char* db_arg,
4218
:Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
4223
Delete_file_log_event ctor
4226
Delete_file_log_event::Delete_file_log_event(const char* buf, uint32_t len,
4227
const Format_description_log_event* description_event)
4228
:Log_event(buf, description_event),file_id(0)
4230
uint8_t common_header_len= description_event->common_header_len;
4231
uint8_t delete_file_header_len= description_event->post_header_len[DELETE_FILE_EVENT-1];
4232
if (len < (uint)(common_header_len + delete_file_header_len))
4234
file_id= uint4korr(buf + common_header_len + DF_FILE_ID_OFFSET);
4239
Delete_file_log_event::write()
4242
bool Delete_file_log_event::write(IO_CACHE* file)
4244
unsigned char buf[DELETE_FILE_HEADER_LEN];
4245
int4store(buf + DF_FILE_ID_OFFSET, file_id);
4246
return (write_header(file, sizeof(buf)) ||
4247
my_b_safe_write(file, buf, sizeof(buf)));
4252
Delete_file_log_event::pack_info()
4255
void Delete_file_log_event::pack_info(Protocol *protocol)
4259
length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
4260
protocol->store(buf, (int32_t) length, &my_charset_bin);
4264
Delete_file_log_event::do_apply_event()
// Deletes the two temporary files named from file_id and server_id: first
// the one ending in .data, then the one ending in .info. The pointer
// returned by slave_load_file_stem() is used as the extension position, so
// copying .info there rewrites fname in place. rli is unused.
// my_delete() results are discarded; MY_WME presumably makes my_delete()
// report failures itself.
4267
int Delete_file_log_event::do_apply_event(Relay_log_info const *rli __attribute__((unused)))
4269
char fname[FN_REFLEN+10];
4270
char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
4271
(void) my_delete(fname, MYF(MY_WME));
4272
my_stpcpy(ext, ".info");
4273
(void) my_delete(fname, MYF(MY_WME));
4278
/**************************************************************************
4279
Execute_load_log_event methods
4280
**************************************************************************/
4283
Execute_load_log_event ctor
// Builds the event from a live session: file_id is copied from
// session_arg->file_id and db from db_arg; the Log_event base receives
// session_arg, 0 and using_trans.
4286
Execute_load_log_event::Execute_load_log_event(Session *session_arg,
4289
:Log_event(session_arg, 0, using_trans), file_id(session_arg->file_id), db(db_arg)
4295
Execute_load_log_event ctor
// Decodes the event from the raw buffer buf (len bytes). Header sizes come
// from description_event; post_header_len is indexed by event type minus 1.
// file_id starts at 0 and is only read once the length check passes.
4298
Execute_load_log_event::Execute_load_log_event(const char* buf, uint32_t len,
4299
const Format_description_log_event* description_event)
4300
:Log_event(buf, description_event), file_id(0)
4302
uint8_t common_header_len= description_event->common_header_len;
4303
uint8_t exec_load_header_len= description_event->post_header_len[EXEC_LOAD_EVENT-1];
// Buffer too short for the common header plus this post-header. The
// statement taken in that case is not visible in this listing; presumably
// it returns early and leaves file_id == 0.
4304
if (len < (uint)(common_header_len+exec_load_header_len))
// file_id: 4-byte field (uint4korr) at EL_FILE_ID_OFFSET in the
// post-header, which starts right after the common header.
4306
file_id= uint4korr(buf + common_header_len + EL_FILE_ID_OFFSET);
4311
Execute_load_log_event::write()
// Serialises the event. write_header() is passed sizeof(buf), presumably
// the length of the data that follows. After it, the EXEC_LOAD_HEADER_LEN
// byte post-header holding file_id at EL_FILE_ID_OFFSET is written; that
// write only happens when write_header() returned 0 (short-circuit).
// Returns true if either call returns nonzero.
4314
bool Execute_load_log_event::write(IO_CACHE* file)
4316
unsigned char buf[EXEC_LOAD_HEADER_LEN];
4317
int4store(buf + EL_FILE_ID_OFFSET, file_id);
4318
return (write_header(file, sizeof(buf)) ||
4319
my_b_safe_write(file, buf, sizeof(buf)));
4324
Execute_load_log_event::pack_info()
// Formats the text ;file_id=N (N being file_id) into buf and passes it to
// protocol->store() with the binary character set. The declarations of buf
// and length are not visible in this listing.
4327
void Execute_load_log_event::pack_info(Protocol *protocol)
4331
length= (uint) sprintf(buf, ";file_id=%u", (uint) file_id);
4332
protocol->store(buf, (int32_t) length, &my_charset_bin);
4337
Execute_load_log_event::do_apply_event()
// Replays a deferred load on the slave. It opens the .info file derived
// from file_id and server_id and reads one event from it, which must be a
// NEW_LOAD_EVENT. It runs that Load_log_event in this session, then deletes
// both the .info and the .data file. Declarations, error branches and the
// exit path are partly missing from this listing.
4340
int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
4342
char fname[FN_REFLEN+10];
4347
Load_log_event *lev= 0;
// ext marks the extension inside fname; it is reused below to switch
// from .info to .data.
4349
ext= slave_load_file_stem(fname, file_id, server_id, ".info");
4350
if ((fd = my_open(fname, O_RDONLY,
4351
MYF(MY_WME))) < 0 ||
4352
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
4353
MYF(MY_WME|MY_NABP)))
4355
rli->report(ERROR_LEVEL, my_errno,
4356
_("Error in Exec_load event: could not open file '%s'"),
// Anything other than a readable NEW_LOAD_EVENT is reported as corruption.
4360
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
4361
(pthread_mutex_t*)0,
4362
rli->relay_log.description_event_for_exec)) ||
4363
lev->get_type_code() != NEW_LOAD_EVENT)
4365
rli->report(ERROR_LEVEL, 0,
4366
_("Error in Exec_load event: "
4367
"file '%s' appears corrupted"),
4372
lev->session = session;
4374
lev->do_apply_event should use rli only for errors i.e. should
4375
not advance rli's position.
4377
lev->do_apply_event is the place where the table is loaded (it
4378
calls mysql_load()).
4381
const_cast<Relay_log_info*>(rli)->future_group_master_log_pos= log_pos;
4382
if (lev->do_apply_event(0,rli,1))
4385
We want to indicate the name of the file that could not be loaded
4387
But as we are here we are sure the error is in rli->last_slave_error and
4388
rli->last_slave_errno (example of error: duplicate entry for key), so we
4389
don't want to overwrite it with the filename.
4390
What we want instead is add the filename to the current error message.
4392
char *tmp= my_strdup(rli->last_error().message, MYF(MY_WME));
4395
rli->report(ERROR_LEVEL, rli->last_error().number,
4396
_("%s. Failed executing load from '%s'"),
4403
We have an open file descriptor to the .info file; we need to close it
4404
or Windows will refuse to delete the file in my_delete().
4408
my_close(fd, MYF(0));
4409
end_io_cache(&file);
// Delete the .info file, overwrite the extension (6 bytes: .data plus the
// terminating NUL) and delete the .data file.
4412
(void) my_delete(fname, MYF(MY_WME));
4413
memcpy(ext, ".data", 6);
4414
(void) my_delete(fname, MYF(MY_WME));
// Closing code at the end of the function; the condition guarding it is
// not visible in this listing.
4421
my_close(fd, MYF(0));
4422
end_io_cache(&file);
4428
/**************************************************************************
4429
Begin_load_query_log_event methods
4430
**************************************************************************/
4432
// Builds the event from a live session. A new id from
// mysql_bin_log.next_file_id() is stored both in file_id and in
// session_arg->file_id. Events later constructed from the same session
// copy session_arg->file_id (for example Execute_load_query_log_event),
// so they see the same id.
Begin_load_query_log_event::
4433
Begin_load_query_log_event(Session* session_arg, const char* db_arg, unsigned char* block_arg,
4434
uint32_t block_len_arg, bool using_trans)
4435
:Append_block_log_event(session_arg, db_arg, block_arg, block_len_arg,
4438
file_id= session_arg->file_id= mysql_bin_log.next_file_id();
4442
// Decodes the event from a raw buffer. Decoding is done by the
// Append_block_log_event base constructor; no body statements are visible
// in this listing.
Begin_load_query_log_event::
4443
Begin_load_query_log_event(const char* buf, uint32_t len,
4444
const Format_description_log_event* desc_event)
4445
:Append_block_log_event(buf, len, desc_event)
4450
// Always returns 1, which per the inline comment means this event creates
// the file.
int Begin_load_query_log_event::get_create_or_append() const
4452
return 1; /* create the file */
4456
// Skip policy for this event: the decision is delegated to
// continue_group(rli).
Log_event::enum_skip_reason
4457
Begin_load_query_log_event::do_shall_skip(Relay_log_info *rli)
4460
If the slave skip counter is 1, then we should not start executing
4463
return continue_group(rli);
4467
/**************************************************************************
4468
Execute_load_query_log_event methods
4469
**************************************************************************/
4472
// Builds the event from a live session. The query text, using_trans,
// suppress_use and the killed state go to the Query_log_event base.
// file_id is copied from session_arg->file_id. The filename span within
// the query (fn_pos_start_arg to fn_pos_end_arg) and dup_handling_arg are
// stored as given.
Execute_load_query_log_event::
4473
Execute_load_query_log_event(Session *session_arg, const char* query_arg,
4474
ulong query_length_arg, uint32_t fn_pos_start_arg,
4475
uint32_t fn_pos_end_arg,
4476
enum_load_dup_handling dup_handling_arg,
4477
bool using_trans, bool suppress_use,
4478
Session::killed_state killed_err_arg):
4479
Query_log_event(session_arg, query_arg, query_length_arg, using_trans,
4480
suppress_use, killed_err_arg),
4481
file_id(session_arg->file_id), fn_pos_start(fn_pos_start_arg),
4482
fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
4487
// Decodes the event from a raw buffer. The Query_log_event base parses the
// query part. This constructor then reads fn_pos_start, fn_pos_end,
// dup_handling and file_id at the ELQ_*_OFFSET positions after the common
// header.
// NOTE(review): dup_handling is not in the initialiser list, so it is
// presumably left uninitialised if the constructor returns before reading
// it; confirm.
Execute_load_query_log_event::
4488
Execute_load_query_log_event(const char* buf, uint32_t event_len,
4489
const Format_description_log_event* desc_event):
4490
Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT),
4491
file_id(0), fn_pos_start(0), fn_pos_end(0)
// Nothing more is decoded when the base class rejected the event (the
// statement in this branch is not visible in this listing).
4493
if (!Query_log_event::is_valid())
4496
buf+= desc_event->common_header_len;
4498
fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET);
4499
fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET);
4500
dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET));
// Sanity check: both filename positions must lie within the query and
// dup_handling must not exceed LOAD_DUP_REPLACE.
// NOTE(review): fn_pos_start <= fn_pos_end is not checked, although
// do_apply_event() sizes a buffer with fn_pos_end - fn_pos_start; confirm
// whether it should be rejected here.
4502
if (fn_pos_start > q_len || fn_pos_end > q_len ||
4503
dup_handling > LOAD_DUP_REPLACE)
4506
file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET);
4510
// Size in bytes of the extra post-header this class adds. It matches the
// number of bytes written by write_post_header_for_derived().
ulong Execute_load_query_log_event::get_post_header_size_for_derived()
4512
return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN;
4517
// Writes the extra post-header (EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN bytes).
// Layout: file_id at offset 0, fn_pos_start at 4 and fn_pos_end at 8
// (4 bytes each via int4store), then dup_handling as one byte at offset 12.
// NOTE(review): the decoding constructor reads these fields through the
// ELQ_*_OFFSET constants; both layouts must stay in sync.
// Returns the my_b_safe_write() result.
Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
4519
unsigned char buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
4520
int4store(buf, file_id);
4521
int4store(buf + 4, fn_pos_start);
4522
int4store(buf + 4 + 4, fn_pos_end);
4523
*(buf + 4 + 4 + 4)= (unsigned char) dup_handling;
4524
return my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
4528
// Describes the event as: use `db`; query ;file_id=N. The text is built in a
// heap buffer and passed to protocol->store() with the binary character set.
// The size 9 + db_len + q_len + 10 + 21 presumably covers:
//   - the use prefix and suffix (5 + 3 bytes plus one spare),
//   - the query,
//   - the 10-byte ;file_id= text with its leading space,
//   - the decimal id plus NUL.
// Some lines are not visible in this listing: the db check, the statement
// advancing pos past the copied query, and freeing buf.
void Execute_load_query_log_event::pack_info(Protocol *protocol)
4531
if (!(buf= (char*) my_malloc(9 + db_len + q_len + 10 + 21, MYF(MY_WME))))
4536
pos= my_stpcpy(buf, "use `");
4537
memcpy(pos, db, db_len);
4538
pos= my_stpcpy(pos+db_len, "`; ");
4542
memcpy(pos, query, q_len);
4545
pos= my_stpcpy(pos, " ;file_id=");
4546
pos= int10_to_str((long) file_id, pos, 10);
4547
protocol->store(buf, pos-buf, &my_charset_bin);
4553
// Applies the event on the slave by rewriting the logged query:
//   - the span between fn_pos_start and fn_pos_end (the original file name
//     part) is replaced by INFILE and the local temporary .data file name;
//   - IGNORE or REPLACE is appended according to dup_handling, then INTO.
// The rewritten statement runs through Query_log_event::do_apply_event().
// The temporary file is then deleted, presumably only when no error
// occurred, per the comment text below.
Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli)
// Buffer: the query without the replaced span, plus room for inserted text.
// The constants presumably cover (each with a leading space):
//   - 10 for the INFILE fragment and its quotes,
//   - FN_REFLEN + 10 for the generated file name,
//   - 8 for REPLACE or IGNORE,
//   - 5 for INTO.
// NOTE(review): this arithmetic assumes fn_pos_start <= fn_pos_end <= q_len.
4561
buf= (char*) my_malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
4562
(FN_REFLEN + 10) + 10 + 8 + 5, MYF(MY_WME));
4564
/* Replace filename and LOCAL keyword in query before executing it */
4567
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
4568
ER(ER_SLAVE_FATAL_ERROR),
4569
_("Not enough memory"));
// Copy the query up to the file name, then splice in the local file name;
// fname and fname_end delimit it so it can be deleted afterwards.
4574
memcpy(p, query, fn_pos_start);
4576
fname= (p= strmake(p, STRING_WITH_LEN(" INFILE \'")));
4577
p= slave_load_file_stem(p, file_id, server_id, ".data");
4578
fname_end= p= strchr(p, '\0'); // Safer than p=p+5
4580
switch (dup_handling) {
4581
case LOAD_DUP_IGNORE:
4582
p= strmake(p, STRING_WITH_LEN(" IGNORE"));
4584
case LOAD_DUP_REPLACE:
4585
p= strmake(p, STRING_WITH_LEN(" REPLACE"));
4588
/* Ordinary load data */
// Append INTO and the remainder of the original query after the file name.
4591
p= strmake(p, STRING_WITH_LEN(" INTO"));
4592
p= strmake(p, query+fn_pos_end, q_len-fn_pos_end);
4594
error= Query_log_event::do_apply_event(rli, buf, p-buf);
4596
/* Forging file name for deletion in same buffer */
4600
If there was an error the slave is going to stop, leave the
4601
file so that we can re-execute this event at START SLAVE.
4604
(void) my_delete(fname, MYF(MY_WME));
4611
/**************************************************************************
4613
**************************************************************************/
4616
sql_ex_info::write_data()
// Serialises the field and line options. Two encodings are visible:
//   - one writes the five terminator, enclosure, line-start and escape
//     strings with write_str(), followed by the single opt_flags byte;
//   - the other copies only the first character of each string, plus
//     opt_flags and empty_flags, into an old_sql_ex struct and writes it raw.
// The condition selecting between them is not visible in this listing
// (presumably the new-format flag). Both paths return true when a write
// call returns nonzero.
4619
bool sql_ex_info::write_data(IO_CACHE* file)
4623
return (write_str(file, field_term, (uint) field_term_len) ||
4624
write_str(file, enclosed, (uint) enclosed_len) ||
4625
write_str(file, line_term, (uint) line_term_len) ||
4626
write_str(file, line_start, (uint) line_start_len) ||
4627
write_str(file, escaped, (uint) escaped_len) ||
4628
my_b_safe_write(file,(unsigned char*) &opt_flags,1));
4633
@todo This is sensitive to field padding. We should write a
4634
char[7], not an old_sql_ex. /sven
4637
old_ex.field_term= *field_term;
4638
old_ex.enclosed= *enclosed;
4639
old_ex.line_term= *line_term;
4640
old_ex.line_start= *line_start;
4641
old_ex.escaped= *escaped;
4642
old_ex.opt_flags= opt_flags;
4643
old_ex.empty_flags=empty_flags;
4644
return my_b_safe_write(file, (unsigned char*) &old_ex, sizeof(old_ex)) != 0;
4653
// Parses the options from buf (bounded by buf_end) and records the format
// in cached_new_format.
// New format: five strings read with read_str() (presumably length
// prefixed); the action on a read failure is not visible here.
// Old format: every option is one byte taken directly from buf, after which
// an empty_flags byte marks the options that are actually empty.
// NOTE(review): the visible old-format reads advance buf without comparing
// against buf_end; confirm callers guarantee enough bytes.
const char *sql_ex_info::init(const char *buf, const char *buf_end,
4654
bool use_new_format)
4656
cached_new_format = use_new_format;
4661
The code below assumes that buf will not disappear from
4662
under our feet during the lifetime of the event. This assumption
4663
holds true in the slave thread if the log is in new format, but is not
4664
the case when we have old format because we will be reusing net buffer
4665
to read the actual file before we write out the Create_file event.
4667
if (read_str(&buf, buf_end, &field_term, &field_term_len) ||
4668
read_str(&buf, buf_end, &enclosed, &enclosed_len) ||
4669
read_str(&buf, buf_end, &line_term, &line_term_len) ||
4670
read_str(&buf, buf_end, &line_start, &line_start_len) ||
4671
read_str(&buf, buf_end, &escaped, &escaped_len))
4677
field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
4678
field_term = buf++; // Use first byte in string
4684
empty_flags= *buf++;
4685
if (empty_flags & FIELD_TERM_EMPTY)
4687
if (empty_flags & ENCLOSED_EMPTY)
4689
if (empty_flags & LINE_TERM_EMPTY)
4691
if (empty_flags & LINE_START_EMPTY)
4693
if (empty_flags & ESCAPED_EMPTY)
4700
/**************************************************************************
4701
Rows_log_event member functions
4702
**************************************************************************/
4704
// Builds a row event for table tbl_arg (table id tid) from a live session.
// m_width is the field count of tbl_arg, or 1 for the dummy event described
// below. The NO_FOREIGN_KEY_CHECKS and RELAXED_UNIQUE_CHECKS session
// options are recorded as event flags. m_cols is initialised (using the
// inline m_bitbuf when m_width bits fit in it) and filled from cols.
Rows_log_event::Rows_log_event(Session *session_arg, Table *tbl_arg, ulong tid,
4705
MY_BITMAP const *cols, bool is_transactional)
4706
: Log_event(session_arg, 0, is_transactional),
4710
m_width(tbl_arg ? tbl_arg->s->fields : 1),
4711
m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
4712
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4715
We allow a special form of dummy event when the table, and cols
4716
are null and the table id is UINT32_MAX. This is a temporary
4717
solution, to be able to terminate a started statement in the
4718
binary log: the extraneous events will be removed in the future.
4720
assert((tbl_arg && tbl_arg->s && tid != UINT32_MAX) || (!tbl_arg && !cols && tid == UINT32_MAX));
// Mirror the session check-relaxation options into the event flags.
4722
if (session_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
4723
set_flags(NO_FOREIGN_KEY_CHECKS_F);
4724
if (session_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
4725
set_flags(RELAXED_UNIQUE_CHECKS_F);
4726
/* if bitmap_init fails, caught in is_valid() */
4727
if (likely(!bitmap_init(&m_cols,
4728
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4732
/* Cols can be zero if this is a dummy binrows event */
4733
if (likely(cols != NULL))
4735
memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
4736
create_last_word_mask(&m_cols);
4741
// Needed because bitmap_init() does not set it to null on failure
4747
// Decodes a row event of type event_type from buf (event_len bytes). It
// reads, in order:
//   - the table id (4 bytes when the post-header is 6 bytes long, otherwise
//     6 bytes) and the flags;
//   - the packed column count m_width;
//   - the m_cols bitmap, plus m_cols_ai for UPDATE_ROWS_EVENT;
//   - the remaining row data, copied into a newly allocated m_rows_buf.
// Failures are signalled by leaving bitmap pointers NULL; the comments say
// is_valid() detects this.
Rows_log_event::Rows_log_event(const char *buf, uint32_t event_len,
4748
Log_event_type event_type,
4749
const Format_description_log_event
4751
: Log_event(buf, description_event),
4754
m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
4755
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
4757
uint8_t const common_header_len= description_event->common_header_len;
4758
uint8_t const post_header_len= description_event->post_header_len[event_type-1];
4760
const char *post_start= buf + common_header_len;
4761
post_start+= RW_MAPID_OFFSET;
4762
if (post_header_len == 6)
4764
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
4765
m_table_id= uint4korr(post_start);
4770
m_table_id= (ulong) uint6korr(post_start);
4771
post_start+= RW_FLAGS_OFFSET;
4774
m_flags= uint2korr(post_start);
// The variable part starts after the post-header: packed column count,
// then one bitmap of (m_width + 7) / 8 bytes per image.
4776
unsigned char const *const var_start=
4777
(const unsigned char *)buf + common_header_len + post_header_len;
4778
unsigned char const *const ptr_width= var_start;
4779
unsigned char *ptr_after_width= (unsigned char*) ptr_width;
4780
m_width = net_field_length(&ptr_after_width);
4781
/* if bitmap_init fails, caught in is_valid() */
4782
if (likely(!bitmap_init(&m_cols,
4783
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
4787
memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8);
4788
create_last_word_mask(&m_cols);
4789
ptr_after_width+= (m_width + 7) / 8;
4793
// Needed because bitmap_init() does not set it to null on failure
4794
m_cols.bitmap= NULL;
4798
m_cols_ai.bitmap= m_cols.bitmap; /* See explanation in is_valid() */
4800
if (event_type == UPDATE_ROWS_EVENT)
4802
/* if bitmap_init fails, caught in is_valid() */
4803
if (likely(!bitmap_init(&m_cols_ai,
4804
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
4808
memcpy(m_cols_ai.bitmap, ptr_after_width, (m_width + 7) / 8);
4809
create_last_word_mask(&m_cols_ai);
4810
ptr_after_width+= (m_width + 7) / 8;
4814
// Needed because bitmap_init() does not set it to null on failure
4815
m_cols_ai.bitmap= 0;
4820
const unsigned char* const ptr_rows_data= (const unsigned char*) ptr_after_width;
// NOTE(review): data_size assumes the packed width and bitmaps lie within
// event_len. No bound check is visible here, so a truncated event would make
// this size_t subtraction wrap. Confirm the caller validates it.
4822
size_t const data_size= event_len - (ptr_rows_data - (const unsigned char *) buf);
4824
m_rows_buf= (unsigned char*) my_malloc(data_size, MYF(MY_WME));
4825
if (likely((bool)m_rows_buf))
4827
m_curr_row= m_rows_buf;
4828
m_rows_end= m_rows_buf + data_size;
4829
m_rows_cur= m_rows_end;
4830
memcpy(m_rows_buf, ptr_rows_data, data_size);
4833
m_cols.bitmap= 0; // to not free it
4838
// Frees the m_cols bitmap, unless it points at the inline m_bitbuf, which
// was not heap-allocated. Also frees the row buffer m_rows_buf.
Rows_log_event::~Rows_log_event()
4840
if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
4841
m_cols.bitmap= 0; // so no free in bitmap_free
4842
bitmap_free(&m_cols); // To pair with bitmap_init().
4843
free((unsigned char*)m_rows_buf);
4846
// Returns the size in bytes of the data emitted by write_data_header() and
// write_data_body():
//   - ROWS_HEADER_LEN,
//   - the packed column count,
//   - the m_cols bitmap (plus m_cols_ai for UPDATE_ROWS_EVENT),
//   - the accumulated row data.
int Rows_log_event::get_data_size()
4848
int const type_code= get_type_code();
4850
unsigned char buf[sizeof(m_width)+1];
// write_data_body() stores the column count as
// net_store_length(sbuf, (size_t) m_width), so the packed size must be
// measured from m_width itself. Packing (m_width + 7) / 8 instead
// under-counts by 2 bytes whenever 251 <= m_width <= 2000. The computed
// size would then not match the bytes actually written.
4851
unsigned char *end= net_store_length(buf, (size_t) m_width);
4853
int data_size= ROWS_HEADER_LEN;
4854
data_size+= no_bytes_in_map(&m_cols);
4855
data_size+= end - buf;
4857
if (type_code == UPDATE_ROWS_EVENT)
4858
data_size+= no_bytes_in_map(&m_cols_ai);
4860
data_size+= (m_rows_cur - m_rows_buf);
4865
// Appends length bytes of row_data at m_rows_cur.
// When the free space (m_rows_end - m_rows_cur) is not larger than length,
// the buffer is reallocated to the next multiple of 1024 bytes that holds
// the existing rows plus the new data.
// Returns HA_ERR_OUT_OF_MEM if my_realloc() fails; m_rows_buf is only
// replaced after a successful reallocation.
// Per the comment text below, a zero length returns early; that statement
// is not visible in this listing.
int Rows_log_event::do_add_row_data(unsigned char *row_data, size_t length)
4868
When the table has a primary key, we would probably want, by default, to
4869
log only the primary key value instead of the entire "before image". This
4870
would save binlog space. TODO
4874
If length is zero, there is nothing to write, so we just
4875
return. Note that this is not an optimization, since calling
4876
realloc() with size 0 means free().
4884
assert(m_rows_buf <= m_rows_cur);
4885
assert(!m_rows_buf || (m_rows_end && m_rows_buf <= m_rows_end));
4886
assert(m_rows_cur <= m_rows_end);
4888
/* The cast will always work since m_rows_cur <= m_rows_end */
4889
if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length)
4891
size_t const block_size= 1024;
4892
my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
4893
my_ptrdiff_t const new_alloc=
4894
block_size * ((cur_size + length + block_size - 1) / block_size);
// NOTE(review): new_alloc is narrowed to uint for my_realloc(); confirm row
// buffers can never exceed the range of uint.
4896
unsigned char* const new_buf= (unsigned char*)my_realloc((unsigned char*)m_rows_buf, (uint) new_alloc,
4897
MYF(MY_ALLOW_ZERO_PTR|MY_WME));
4898
if (unlikely(!new_buf))
4899
return(HA_ERR_OUT_OF_MEM);
4901
/* If the memory moved, we need to move the pointers */
4902
if (new_buf != m_rows_buf)
4904
m_rows_buf= new_buf;
4905
m_rows_cur= m_rows_buf + cur_size;
4909
The end pointer should always be changed to point to the end of
4910
the allocated memory.
4912
m_rows_end= m_rows_buf + new_alloc;
4915
assert(m_rows_cur + length <= m_rows_end);
4916
memcpy(m_rows_cur, row_data, length);
4917
m_rows_cur+= length;
4922
// Applies this row event on the slave; rli->sql_session is asserted to be
// session. Outline of the visible code:
//  - A dummy event (m_table_id == UINT32_MAX) only clears tables_to_lock,
//    closes the thread tables and clears the session error.
//  - Otherwise, the tables queued by preceding table map events are locked,
//    reopening them when lock_tables() requests it. They are then checked
//    for compatibility with the master definitions and entered into
//    rli->m_table_map.
//  - Each row from m_curr_row up to m_rows_end is applied with
//    do_exec_row(), bracketed by do_before_row_operations() and
//    do_after_row_operations().
//  - Errors are reported through slave_rows_error_report(). The listed
//    handler errors are reported at WARNING_LEVEL (when log_warnings is on)
//    if SLAVE_EXEC_MODE_IDEMPOTENT is set.
// Several lines of the body (braces, returns, else branches) are not
// visible in this listing.
int Rows_log_event::do_apply_event(Relay_log_info const *rli)
4926
If m_table_id == UINT32_MAX, then we have a dummy event that does not
4927
contain any data. In that case, we just remove all tables in the
4928
tables_to_lock list, close the thread tables, and return with
4931
if (m_table_id == UINT32_MAX)
4934
This one is supposed to be set: just an extra check so that
4935
nothing strange has happened.
4937
assert(get_flags(STMT_END_F));
4939
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
4940
close_thread_tables(session);
4941
session->clear_error();
4946
'session' has been set by exec_relay_log_event(), just before calling
4947
do_apply_event(). We still check here to prevent future coding
4950
assert(rli->sql_session == session);
4953
If there is no locks taken, this is the first binrow event seen
4954
after the table map events. We should then lock all the tables
4955
used in the transaction and proceed with execution of the actual
4960
bool need_reopen= 1; /* To execute the first lap of the loop below */
4963
lock_tables() reads the contents of session->lex, so they must be
4964
initialized. Contrary to in
4965
Table_map_log_event::do_apply_event() we don't call
4966
mysql_init_query() as that may reset the binlog format.
4971
There are a few flags that are replicated with each row event.
4972
Make sure to set/clear them before executing the main body of
4975
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
4976
session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
4978
session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
4980
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
4981
session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
4983
session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
4984
/* A small test to verify that objects have consistent types */
4985
assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
// Lock all queued tables. When lock_tables() fails with need_reopen set,
// the following happens before the lock is retried:
//   - the pending rows event is flushed;
//   - the tables are closed and reopened.
// Other failures are reported and tables_to_lock is cleared (the early
// returns are not visible in this listing).
4988
while ((error= lock_tables(session, rli->tables_to_lock,
4989
rli->tables_to_lock_count, &need_reopen)))
4993
if (session->is_slave_error || session->is_fatal_error)
4996
Error reporting borrowed from Query_log_event with many excessive
4997
simplifications (we don't honour --slave-skip-errors)
4999
uint32_t actual_error= session->main_da.sql_errno();
5000
rli->report(ERROR_LEVEL, actual_error,
5001
_("Error '%s' in %s event: when locking tables"),
5003
? session->main_da.message()
5004
: _("unexpected success or fatal error")),
5006
session->is_fatal_error= 1;
5010
rli->report(ERROR_LEVEL, error,
5011
_("Error in %s event: when locking tables"),
5014
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5019
So we need to reopen the tables.
5021
We need to flush the pending RBR event, since it keeps a
5022
pointer to an open table.
5024
ALTERNATIVE SOLUTION (not implemented): Extract a pointer to
5025
the pending RBR event and reset the table pointer after the
5026
tables has been reopened.
5028
NOTE: For this new scheme there should be no pending event:
5029
need to add code to assert that is the case.
5031
session->binlog_flush_pending_rows_event(false);
5032
TableList *tables= rli->tables_to_lock;
5033
close_tables_for_reopen(session, &tables);
5035
uint32_t tables_count= rli->tables_to_lock_count;
5036
if ((error= open_tables(session, &tables, &tables_count, 0)))
5038
if (session->is_slave_error || session->is_fatal_error)
5041
Error reporting borrowed from Query_log_event with many excessive
5042
simplifications (we don't honour --slave-skip-errors)
5044
uint32_t actual_error= session->main_da.sql_errno();
5045
rli->report(ERROR_LEVEL, actual_error,
5046
_("Error '%s' on reopening tables"),
5048
? session->main_da.message()
5049
: _("unexpected success or fatal error")));
5050
session->is_slave_error= 1;
5052
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5058
When the open and locking succeeded, we check all tables to
5059
ensure that they still have the correct type.
5061
We can use a down cast here since we know that every table added
5062
to the tables_to_lock is a RPL_TableList.
// For each locked table, if compatible_with() reports a mismatch with the
// master definition, the code:
//   - unlocks the tables and flags a slave error;
//   - clears tables_to_lock and returns ERR_BAD_TABLE_DEF.
5066
RPL_TableList *ptr= rli->tables_to_lock;
5067
for ( ; ptr ; ptr= static_cast<RPL_TableList*>(ptr->next_global))
5069
if (ptr->m_tabledef.compatible_with(rli, ptr->table))
5071
mysql_unlock_tables(session, session->lock);
5073
session->is_slave_error= 1;
5074
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5075
return(ERR_BAD_TABLE_DEF);
5081
... and then we add all the tables to the table map and remove
5082
them from tables to lock.
5084
We also invalidate the query cache for all the tables, since
5085
they will now be changed.
5087
TODO [/Matz]: Maybe the query cache should not be invalidated
5088
here? It might be that a table is not changed, even though it
5089
was locked for the statement. We do know that each
5090
Rows_log_event contain at least one row, so after processing one
5091
Rows_log_event, we can invalidate the query cache for the
// Register every locked table in the relay log table map by table id.
5094
for (TableList *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
5096
const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
5102
m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
5107
table == NULL means that this table should not be replicated
5108
(this was set up by Table_map_log_event::do_apply_event()
5109
which tested replicate-* rules).
5113
It's not needed to set_time() but
5114
1) it continues the property that "Time" in SHOW PROCESSLIST shows how
5115
much slave is behind
5116
2) it will be needed when we allow replication from a table with no
5117
TIMESTAMP column to a table with one.
5118
So we call set_time(), like in SBR. Presently it changes nothing.
5120
session->set_time((time_t)when);
5122
There are a few flags that are replicated with each row event.
5123
Make sure to set/clear them before executing the main body of
5126
if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
5127
session->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
5129
session->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
5131
if (get_flags(RELAXED_UNIQUE_CHECKS_F))
5132
session->options|= OPTION_RELAXED_UNIQUE_CHECKS;
5134
session->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
5136
if (slave_allow_batching)
5137
session->options|= OPTION_ALLOW_BATCH;
5139
session->options&= ~OPTION_ALLOW_BATCH;
5141
/* A small test to verify that objects have consistent types */
5142
assert(sizeof(session->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
5145
Now we are in a statement and will stay in a statement until we
5148
We set this flag here, before actually applying any rows, in
5149
case the SQL thread is stopped and we need to detect that we're
5150
inside a statement and halting abruptly might cause problems
5153
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
5155
if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
5156
set_flags(COMPLETE_ROWS_F);
5159
Set tables write and read sets.
5161
Read_set contains all slave columns (in case we are going to fetch
5162
a complete record from slave)
5164
Write_set equals the m_cols bitmap sent from master but it can be
5165
longer if slave has extra columns.
5168
bitmap_set_all(table->read_set);
5169
bitmap_set_all(table->write_set);
5170
if (!get_flags(COMPLETE_ROWS_F))
5171
bitmap_intersect(table->write_set,&m_cols);
5173
this->slave_exec_mode= slave_exec_mode_options; // fix the mode
5175
// Do event specific preparations
5176
error= do_before_row_operations(rli);
// Apply rows one at a time. After each row, m_curr_row advances to
// m_curr_row_end; unpack_current_row() sets that pointer when
// do_exec_row() did not.
5178
// row processing loop
5180
while (error == 0 && m_curr_row < m_rows_end)
5182
/* in_use can have been set to NULL in close_tables_for_reopen */
5183
Session* old_session= table->in_use;
5185
table->in_use= session;
5187
error= do_exec_row(rli);
5189
table->in_use = old_session;
5195
The following list of "idempotent" errors
5196
means that an error from the list might happen
5197
because of idempotent (more than once)
5198
applying of a binlog file.
5199
Notice, that binlog has a ddl operation its
5200
second applying may cause
5202
case HA_ERR_TABLE_DEF_CHANGED:
5203
case HA_ERR_CANNOT_ADD_FOREIGN:
5205
which are not included into to the list.
5207
case HA_ERR_RECORD_CHANGED:
5208
case HA_ERR_RECORD_DELETED:
5209
case HA_ERR_KEY_NOT_FOUND:
5210
case HA_ERR_END_OF_FILE:
5211
case HA_ERR_FOUND_DUPP_KEY:
5212
case HA_ERR_FOUND_DUPP_UNIQUE:
5213
case HA_ERR_FOREIGN_DUPLICATE_KEY:
5214
case HA_ERR_NO_REFERENCED_ROW:
5215
case HA_ERR_ROW_IS_REFERENCED:
5216
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5218
if (global_system_variables.log_warnings)
5219
slave_rows_error_report(WARNING_LEVEL, error, rli, session, table,
5221
RPL_LOG_NAME, (ulong) log_pos);
5227
session->is_slave_error= 1;
5232
If m_curr_row_end was not set during event execution (e.g., because
5233
of errors) we can't proceed to the next row. If the error is transient
5234
(i.e., error==0 at this point) we must call unpack_current_row() to set
5237
if (!m_curr_row_end && !error)
5238
unpack_current_row(rli, &m_cols);
5240
// at this moment m_curr_row_end should be set
5241
assert(error || m_curr_row_end != NULL);
5242
assert(error || m_curr_row < m_curr_row_end);
5243
assert(error || m_curr_row_end <= m_rows_end);
5245
m_curr_row= m_curr_row_end;
5247
} // row processing loop
5249
error= do_after_row_operations(rli, error);
5252
session->options|= OPTION_KEEP_LOG;
5257
We need to delay this clear until here bacause unpack_current_row() uses
5258
master-side table definitions stored in rli.
5260
if (rli->tables_to_lock && get_flags(STMT_END_F))
5261
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
5262
/* reset OPTION_ALLOW_BATCH as not affect later events */
5263
session->options&= ~OPTION_ALLOW_BATCH;
5266
{ /* error has occurred during the transaction */
5267
slave_rows_error_report(ERROR_LEVEL, error, rli, session, table,
5268
get_type_str(), RPL_LOG_NAME, (ulong) log_pos);
5273
If one day we honour --skip-slave-errors in row-based replication, and
5274
the error should be skipped, then we would clear mappings, rollback,
5275
close tables, but the slave SQL thread would not stop and then may
5276
assume the mapping is still available, the tables are still open...
5277
So then we should clear mappings/rollback/close here only if this is a
5279
For now we code, knowing that error is not skippable and so slave SQL
5280
thread is certainly going to stop.
5281
rollback at the caller along with sbr.
5283
session->reset_current_stmt_binlog_row_based();
5284
const_cast<Relay_log_info*>(rli)->cleanup_context(session, error);
5285
session->is_slave_error= 1;
5290
This code would ideally be placed in do_update_pos() instead, but
5291
since we have no access to table there, we do the setting of
5292
last_event_start_time here instead.
5294
if (table && (table->s->primary_key == MAX_KEY) &&
5295
!cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS)
5298
------------ Temporary fix until WL#2975 is implemented ---------
5300
This event is not the last one (no STMT_END_F). If we stop now
5301
(in case of terminate_slave_thread()), how will we restart? We
5302
have to restart from Table_map_log_event, but as this table is
5303
not transactional, the rows already inserted will still be
5304
present, and idempotency is not guaranteed (no PK) so we risk
5305
that repeating leads to double insert. So we desperately try to
5306
continue, hope we'll eventually leave this buggy situation (by
5307
executing the final Rows_log_event). If we are in a hopeless
5308
wait (reached end of last relay log and nothing gets appended
5309
there), we timeout after one minute, and notify DBA about the
5310
problem. When WL#2975 is implemented, just remove the member
5311
Relay_log_info::last_event_start_time and all its occurrences.
5313
const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
5319
// Returns EVENT_SKIP_IGNORE when rli->slave_skip_counter is 1 and this
// event lacks STMT_END_F, so that skipping does not stop in the middle of
// a statement. Otherwise it defers to Log_event::do_shall_skip().
Log_event::enum_skip_reason
5320
Rows_log_event::do_shall_skip(Relay_log_info *rli)
5323
If the slave skip counter is 1 and this event does not end a
5324
statement, then we should not start executing on the next event.
5325
Otherwise, we defer the decision to the normal skipping logic.
5327
if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
5328
return Log_event::EVENT_SKIP_IGNORE;
5330
return Log_event::do_shall_skip(rli);
5334
// Advances the relay log position after this event.
// When the event carries STMT_END_F, it ends the statement:
//   - the pending rows event is flushed and ha_autocommit_or_rollback() is
//     called;
//   - the row-based binlog state and the rli context are reset;
//   - rli->stmt_done() steps the position and the session error is cleared;
//   - a commit failure is reported through rli->report().
// Otherwise, presumably only the event relay log position is increased
// (the else branch is not visible in this listing).
Rows_log_event::do_update_pos(Relay_log_info *rli)
5338
if (get_flags(STMT_END_F))
5341
This is the end of a statement or transaction, so close (and
5342
unlock) the tables we opened when processing the
5343
Table_map_log_event starting the statement.
5345
OBSERVER. This will clear *all* mappings, not only those that
5346
are open for the table. There is not good handle for on-close
5349
NOTE. Even if we have no table ('table' == 0) we still need to be
5350
here, so that we increase the group relay log position. If we didn't, we
5351
could have a group relay log position which lags behind "forever"
5352
(assume the last master's transaction is ignored by the slave because of
5353
replicate-ignore rules).
5355
session->binlog_flush_pending_rows_event(true);
5358
If this event is not in a transaction, the call below will, if some
5359
transactional storage engines are involved, commit the statement into
5360
them and flush the pending event to binlog.
5361
If this event is in a transaction, the call will do nothing, but a
5362
Xid_log_event will come next which will, if some transactional engines
5363
are involved, commit the transaction and flush the pending event to the
5366
error= ha_autocommit_or_rollback(session, 0);
5369
Now what if this is not a transactional engine? we still need to
5370
flush the pending event to the binlog; we did it with
5371
session->binlog_flush_pending_rows_event(). Note that we imitate
5372
what is done for real queries: a call to
5373
ha_autocommit_or_rollback() (sometimes only if involves a
5374
transactional engine), and a call to be sure to have the pending
5378
session->reset_current_stmt_binlog_row_based();
5380
rli->cleanup_context(session, 0);
5384
Indicate that a statement is finished.
5385
Step the group log position if we are not in a transaction,
5386
otherwise increase the event log position.
5388
rli->stmt_done(log_pos, when);
5391
Clear any errors pushed in session->net.last_err* if for example "no key
5392
found" (as this is allowed). This is a safety measure; apparently
5393
those errors (e.g. when executing a Delete_rows_log_event of a
5394
non-existing row, like in rpl_row_mystery22.test,
5395
session->net.last_error = "Can't find record in 't1'" and last_errno=1032)
5396
do not become visible. We still prefer to wipe them out.
5398
session->clear_error();
// NOTE(review): this report dereferences m_table. The comment text above
// says this path is also reached when there is no table, so confirm m_table
// cannot be NULL whenever error is nonzero.
5401
rli->report(ERROR_LEVEL, error,
5402
_("Error in %s event: commit of row events failed, "
5404
get_type_str(), m_table->s->db.str,
5405
m_table->s->table_name.str);
5409
rli->inc_event_relay_log_pos();
5415
// Writes the ROWS_HEADER_LEN-byte post-header:
//   - the 6-byte table id at RW_MAPID_OFFSET,
//   - the 2-byte flags at RW_FLAGS_OFFSET.
// Dummy events (m_table_id == UINT32_MAX) must never be written; this is
// asserted. Returns the my_b_safe_write() result.
bool Rows_log_event::write_data_header(IO_CACHE *file)
5417
unsigned char buf[ROWS_HEADER_LEN]; // No need to init the buffer
5418
assert(m_table_id != UINT32_MAX);
5419
int6store(buf + RW_MAPID_OFFSET, (uint64_t)m_table_id);
5420
int2store(buf + RW_FLAGS_OFFSET, m_flags);
5421
return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
5424
// Writes the event body, in order:
//   - the column count m_width in packed (net_store_length) form,
//   - the m_cols bitmap,
//   - the m_cols_ai bitmap for UPDATE_ROWS_EVENT,
//   - the accumulated row data.
// Writing stops at the first failure because res short-circuits. The
// declaration of res and the final return are not visible in this listing.
bool Rows_log_event::write_data_body(IO_CACHE*file)
5427
Note that this should be the number of *bits*, not the number of
5430
unsigned char sbuf[sizeof(m_width)];
5431
my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
5433
unsigned char *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
5434
assert(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
5436
res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
5438
res= res || my_b_safe_write(file, (unsigned char*) m_cols.bitmap,
5439
no_bytes_in_map(&m_cols));
5441
TODO[refactor write]: Remove the "down cast" here (and elsewhere).
5443
if (get_type_code() == UPDATE_ROWS_EVENT)
5445
res= res || my_b_safe_write(file, (unsigned char*) m_cols_ai.bitmap,
5446
no_bytes_in_map(&m_cols_ai));
5448
res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
5455
/**
  Describe this rows event for the SHOW BINARY LOG information field:
  its table id, followed by " flags: STMT_END_F" when the statement-end
  flag is set.

  @param protocol  Protocol the description is stored into.
*/
void Rows_log_event::pack_info(Protocol *protocol)
{
  char buf[256];

  char const *flagstr= "";
  if (get_flags(STMT_END_F))
    flagstr= " flags: STMT_END_F";

  size_t const bytes= snprintf(buf, sizeof(buf), "table_id: %lu%s",
                               m_table_id, flagstr);
  protocol->store(buf, bytes, &my_charset_bin);
}
5466
/**************************************************************************
5467
Table_map_log_event member functions and support functions
5468
**************************************************************************/
5471
@page How replication of field metadata works.
5473
When a table map is created, the master first calls
5474
Table_map_log_event::save_field_metadata() which calculates how many
5475
values will be in the field metadata. Only those fields that require the
5476
extra data are added. The method also loops through all of the fields in
5477
the table calling the method Field::save_field_metadata() which returns the
5478
values for the field that will be saved in the metadata and replicated to
5479
the slave. Once all fields have been processed, the table map is written to
5480
the binlog adding the size of the field metadata and the field metadata to
5481
the end of the body of the table map.
5483
When a table map is read on the slave, the field metadata is read from the
5484
table map and passed to the table_def class constructor which saves the
5485
field metadata from the table map into an array based on the type of the
5486
field. Field metadata values not present (those fields that do not use extra
5487
data) in the table map are initialized as zero (0). The array size is the
5488
same as the columns for the table on the slave.
5490
Additionally, values saved for field metadata on the master are saved as a
5491
string of bytes (unsigned char) in the binlog. A field may require 1 or more bytes
5492
to store the information. In cases where values require multiple bytes
5493
(e.g. values > 255), the endian-safe methods are used to properly encode
5494
the values on the master and decode them on the slave. When the field
5495
metadata values are captured on the slave, they are stored in an array of
5496
type uint16_t. This allows the least number of casts to prevent casting bugs
5497
when the field metadata is used in comparisons of field attributes. When
5498
the field metadata is used for calculating addresses in pointer math, the
5499
type used is uint32_t.
5503
Save the field metadata based on the real_type of the field.
5504
The metadata saved depends on the type of the field. Some fields
5505
store a single byte for pack_length() while others store two bytes
5506
for field_length (max length).
5511
We may want to consider changing the encoding of the information.
5512
Currently, the code attempts to minimize the number of bytes written to
5513
the tablemap. There are at least two other alternatives; 1) using
5514
net_store_length() to store the data allowing it to choose the number of
5515
bytes that are appropriate thereby making the code much easier to
5516
maintain (only 1 place to change the encoding), or 2) use a fixed number
5517
of bytes for each field. The problem with option 1 is that net_store_length()
5518
will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
5519
for fields like CHAR which can be no larger than 255 characters, the method
5520
will use 3 bytes when the value is > 250. Further, every value that is
5521
encoded using 2 parts (e.g., pack_length, field_length) will be numerically
5522
> 250 and therefore will use 3 bytes for each value. Option 2
5523
is less wasteful for space but does waste 1 byte for every field that does
5526
int Table_map_log_event::save_field_metadata()
5529
for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
5530
index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
5535
Constructor used to build an event for writing to the binary log.
5536
Mats says tbl->s lives longer than this event so it's ok to copy pointers
5537
(tbl->s->db etc) and not pointer content.
5539
Table_map_log_event::Table_map_log_event(Session *session, Table *tbl, ulong tid,
5540
bool is_transactional __attribute__((unused)),
5542
: Log_event(session, 0, true),
5544
m_dbnam(tbl->s->db.str),
5545
m_dblen(m_dbnam ? tbl->s->db.length : 0),
5546
m_tblnam(tbl->s->table_name.str),
5547
m_tbllen(tbl->s->table_name.length),
5548
m_colcnt(tbl->s->fields),
5553
m_field_metadata(0),
5554
m_field_metadata_size(0),
5558
assert(m_table_id != UINT32_MAX);
5560
In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
5561
table.cc / alloc_table_share():
5562
Use the fact the key is db/0/table_name/0
5563
As we rely on this let's assert it.
5565
assert((tbl->s->db.str == 0) ||
5566
(tbl->s->db.str[tbl->s->db.length] == 0));
5567
assert(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
5570
m_data_size= TABLE_MAP_HEADER_LEN;
5571
m_data_size+= m_dblen + 2; // Include length and terminating \0
5572
m_data_size+= m_tbllen + 2; // Include length and terminating \0
5573
m_data_size+= 1 + m_colcnt; // COLCNT and column types
5575
/* If malloc fails, caught in is_valid() */
5576
if ((m_memory= (unsigned char*) my_malloc(m_colcnt, MYF(MY_WME))))
5578
m_coltype= reinterpret_cast<unsigned char*>(m_memory);
5579
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
5580
m_coltype[i]= m_table->field[i]->type();
5584
Calculate a bitmap for the results of maybe_null() for all columns.
5585
The bitmap is used to determine when there is a column from the master
5586
that is not on the slave and is null and thus not in the row data during
5589
uint32_t num_null_bytes= (m_table->s->fields + 7) / 8;
5590
m_data_size+= num_null_bytes;
5591
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
5592
&m_null_bits, num_null_bytes,
5593
&m_field_metadata, (m_colcnt * 2),
5596
memset(m_field_metadata, 0, (m_colcnt * 2));
5599
Create an array for the field metadata and store it.
5601
m_field_metadata_size= save_field_metadata();
5602
assert(m_field_metadata_size <= (m_colcnt * 2));
5605
Now set the size of the data to the size of the field metadata array
5606
plus one or two bytes for number of elements in the field metadata array.
5608
if (m_field_metadata_size > 255)
5609
m_data_size+= m_field_metadata_size + 2;
5611
m_data_size+= m_field_metadata_size + 1;
5613
memset(m_null_bits, 0, num_null_bytes);
5614
for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
5615
if (m_table->field[i]->maybe_null())
5616
m_null_bits[(i / 8)]+= 1 << (i % 8);
5622
Constructor used by slave to read the event from the binary log.
5624
Table_map_log_event::Table_map_log_event(const char *buf, uint32_t event_len,
5625
const Format_description_log_event
5628
: Log_event(buf, description_event),
5630
m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
5631
m_colcnt(0), m_coltype(0),
5632
m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
5633
m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
5634
m_null_bits(0), m_meta_memory(NULL)
5636
unsigned int bytes_read= 0;
5638
uint8_t common_header_len= description_event->common_header_len;
5639
uint8_t post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
5641
/* Read the post-header */
5642
const char *post_start= buf + common_header_len;
5644
post_start+= TM_MAPID_OFFSET;
5645
if (post_header_len == 6)
5647
/* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
5648
m_table_id= uint4korr(post_start);
5653
assert(post_header_len == TABLE_MAP_HEADER_LEN);
5654
m_table_id= (ulong) uint6korr(post_start);
5655
post_start+= TM_FLAGS_OFFSET;
5658
assert(m_table_id != UINT32_MAX);
5660
m_flags= uint2korr(post_start);
5662
/* Read the variable part of the event */
5663
const char *const vpart= buf + common_header_len + post_header_len;
5665
/* Extract the length of the various parts from the buffer */
5666
unsigned char const *const ptr_dblen= (unsigned char const*)vpart + 0;
5667
m_dblen= *(unsigned char*) ptr_dblen;
5669
/* Length of database name + counter + terminating null */
5670
unsigned char const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
5671
m_tbllen= *(unsigned char*) ptr_tbllen;
5673
/* Length of table name + counter + terminating null */
5674
unsigned char const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
5675
unsigned char *ptr_after_colcnt= (unsigned char*) ptr_colcnt;
5676
m_colcnt= net_field_length(&ptr_after_colcnt);
5678
/* Allocate mem for all fields in one go. If fails, caught in is_valid() */
5679
m_memory= (unsigned char*) my_multi_malloc(MYF(MY_WME),
5680
&m_dbnam, (uint) m_dblen + 1,
5681
&m_tblnam, (uint) m_tbllen + 1,
5682
&m_coltype, (uint) m_colcnt,
5687
/* Copy the different parts into their memory */
5688
strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
5689
strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
5690
memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
5692
ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
5693
bytes_read= ptr_after_colcnt - (unsigned char *)buf;
5694
if (bytes_read < event_len)
5696
m_field_metadata_size= net_field_length(&ptr_after_colcnt);
5697
assert(m_field_metadata_size <= (m_colcnt * 2));
5698
uint32_t num_null_bytes= (m_colcnt + 7) / 8;
5699
m_meta_memory= (unsigned char *)my_multi_malloc(MYF(MY_WME),
5700
&m_null_bits, num_null_bytes,
5701
&m_field_metadata, m_field_metadata_size,
5703
memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
5704
ptr_after_colcnt= (unsigned char*)ptr_after_colcnt + m_field_metadata_size;
5705
memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
5712
/*
  Release both allocations the event may own:
    - m_memory: column types, plus the db/table name copies on the
      slave side.
    - m_meta_memory: null bitmap and field metadata.
  Both start out NULL and free(NULL) is a no-op, so it is safe even when
  an allocation never happened or failed.
*/
Table_map_log_event::~Table_map_log_event()
{
  free(m_meta_memory);
  free(m_memory);
}
5719
Return value is an error code, one of:
5721
-1 Failure to open table [from open_tables()]
5723
1 No room for more tables [from set_table()]
5724
2 Out of memory [from set_table()]
5725
3 Wrong table definition
5726
4 Daisy-chaining RBR with SBR not possible
5729
int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
5731
RPL_TableList *table_list;
5732
char *db_mem, *tname_mem;
5733
Query_id &query_id= Query_id::get_query_id();
5736
assert(rli->sql_session == session);
5738
/* Step the query id to mark what columns that are actually used. */
5739
session->query_id= query_id.next();
5741
if (!(memory= my_multi_malloc(MYF(MY_WME),
5742
&table_list, (uint) sizeof(RPL_TableList),
5743
&db_mem, (uint) NAME_LEN + 1,
5744
&tname_mem, (uint) NAME_LEN + 1,
5746
return(HA_ERR_OUT_OF_MEM);
5748
memset(table_list, 0, sizeof(*table_list));
5749
table_list->db = db_mem;
5750
table_list->alias= table_list->table_name = tname_mem;
5751
table_list->lock_type= TL_WRITE;
5752
table_list->next_global= table_list->next_local= 0;
5753
table_list->table_id= m_table_id;
5754
table_list->updating= 1;
5755
my_stpcpy(table_list->db, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
5756
my_stpcpy(table_list->table_name, m_tblnam);
5760
if (!rpl_filter->db_ok(table_list->db) ||
5761
(rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list)))
5768
open_tables() reads the contents of session->lex, so they must be
5769
initialized, so we should call lex_start(); to be even safer, we
5770
call mysql_init_query() which does a more complete set of inits.
5773
mysql_reset_session_for_next_command(session);
5775
Check if the slave is set to use SBR. If so, it should switch
5776
to using RBR until the end of the "statement", i.e., next
5777
STMT_END_F or next error.
5779
if (!session->current_stmt_binlog_row_based &&
5780
mysql_bin_log.is_open() && (session->options & OPTION_BIN_LOG))
5782
session->set_current_stmt_binlog_row_based();
5786
Open the table if it is not already open and add the table to
5787
table map. Note that for any table that should not be
5788
replicated, a filter is needed.
5790
The creation of a new TableList is used to up-cast the
5791
table_list consisting of RPL_TableList items. This will work
5792
since the only case where the argument to open_tables() is
5793
changed, is when session->lex->query_tables == table_list, i.e.,
5794
when the statement requires prelocking. Since this is not
5795
executed when a statement is executed, this case will not occur.
5796
As a precaution, an assertion is added to ensure that the bad
5799
Either way, the memory in the list is *never* released
5800
internally in the open_tables() function, hence we take a copy
5801
of the pointer to make sure that it's not lost.
5804
assert(session->lex->query_tables != table_list);
5805
TableList *tmp_table_list= table_list;
5806
if ((error= open_tables(session, &tmp_table_list, &count, 0)))
5808
if (session->is_slave_error || session->is_fatal_error)
5811
Error reporting borrowed from Query_log_event with many excessive
5812
simplifications (we don't honour --slave-skip-errors)
5814
uint32_t actual_error= session->main_da.sql_errno();
5815
rli->report(ERROR_LEVEL, actual_error,
5816
_("Error '%s' on opening table `%s`.`%s`"),
5818
? session->main_da.message()
5819
: _("unexpected success or fatal error")),
5820
table_list->db, table_list->table_name);
5821
session->is_slave_error= 1;
5826
m_table= table_list->table;
5829
This will fail later otherwise, the 'in_use' field should be
5830
set to the current thread.
5832
assert(m_table->in_use);
5835
Use placement new to construct the table_def instance in the
5836
memory allocated for it inside table_list.
5838
The memory allocated by the table_def structure (i.e., not the
5839
memory allocated *for* the table_def structure) is released
5840
inside Relay_log_info::clear_tables_to_lock() by calling the
5841
table_def destructor explicitly.
5843
new (&table_list->m_tabledef) table_def(m_coltype, m_colcnt,
5844
m_field_metadata, m_field_metadata_size, m_null_bits);
5845
table_list->m_tabledef_valid= true;
5848
We record in the slave's information that the table should be
5849
locked by linking the table into the list of tables to lock.
5851
table_list->next_global= table_list->next_local= rli->tables_to_lock;
5852
const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
5853
const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
5854
/* 'memory' is freed in clear_tables_to_lock */
5864
/*
  Decide whether this table map event is skipped by the slave.

  Even when the slave skip counter is 1, execution should not start at
  this event. The decision is therefore delegated entirely to
  continue_group().
*/
Log_event::enum_skip_reason
Table_map_log_event::do_shall_skip(Relay_log_info *rli)
{
  return continue_group(rli);
}
5874
/*
  Advance the relay-log position past this table map event.

  @return 0
*/
int Table_map_log_event::do_update_pos(Relay_log_info *rli)
{
  rli->inc_event_relay_log_pos();
  return 0;
}
5881
/**
  Write the fixed-size post-header of a table map event: the table id
  (int6store at TM_MAPID_OFFSET) followed by the flags (int2store at
  TM_FLAGS_OFFSET).

  @param file  Cache to write to.
  @return false on success, true if the write failed.
*/
bool Table_map_log_event::write_data_header(IO_CACHE *file)
{
  assert(m_table_id != UINT32_MAX);

  unsigned char header[TABLE_MAP_HEADER_LEN];
  int6store(header + TM_MAPID_OFFSET, static_cast<uint64_t>(m_table_id));
  int2store(header + TM_FLAGS_OFFSET, m_flags);

  bool const failed= my_b_safe_write(file, header, TABLE_MAP_HEADER_LEN);
  return failed;
}
5890
/**
  Write the variable-length body of a table map event.

  Layout, in order:
    - db-name length (1 byte), then the db name and its '\0'
    - table-name length (1 byte), then the table name and its '\0'
    - packed column count, then one type byte per column
    - packed field-metadata size, then the field metadata
    - null-bit map, (m_colcnt + 7) / 8 bytes

  @param file  Cache to write to.
  @return false on success, true if any write failed.
*/
bool Table_map_log_event::write_data_body(IO_CACHE *file)
{
  assert(m_dbnam != NULL);
  assert(m_tblnam != NULL);
  /* We use only one byte per length for storage in event: */
  assert(m_dblen < 128);
  assert(m_tbllen < 128);

  unsigned char const dbuf[]= { (unsigned char) m_dblen };
  unsigned char const tbuf[]= { (unsigned char) m_tbllen };

  /*
    net_store_length() can emit a prefix byte followed by an 8-byte value
    (9 bytes in total). Size both length buffers for that worst case
    rather than for the type being encoded. The asserts are compiled out
    in release builds and cannot prevent an overflow there.
  */
  unsigned char cbuf[sizeof(uint64_t) + 1];
  unsigned char *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
  assert(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));

  /* Store the size of the field metadata. */
  unsigned char mbuf[sizeof(uint64_t) + 1];
  unsigned char *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
  assert(static_cast<size_t>(mbuf_end - mbuf) <= sizeof(mbuf));

  /*
    Every part must be joined with '||'. The first failing write then
    stops the sequence and its failure is reported to the caller. A ','
    anywhere in this chain would silently discard the status of every
    write before it.
  */
  return (my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
          my_b_safe_write(file, (const unsigned char*)m_dbnam, m_dblen+1) ||
          my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
          my_b_safe_write(file, (const unsigned char*)m_tblnam, m_tbllen+1) ||
          my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
          my_b_safe_write(file, m_coltype, m_colcnt) ||
          my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
          my_b_safe_write(file, m_field_metadata, m_field_metadata_size) ||
          my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
}
5924
Print some useful information for the SHOW BINARY LOG information
5928
void Table_map_log_event::pack_info(Protocol *protocol)
5931
size_t bytes= snprintf(buf, sizeof(buf),
5932
"table_id: %lu (%s.%s)",
5933
m_table_id, m_dbnam, m_tblnam);
5934
protocol->store(buf, bytes, &my_charset_bin);
5938
/**************************************************************************
5939
Write_rows_log_event member functions
5940
**************************************************************************/
5943
Constructor used to build an event for writing to the binary log.
5945
Write_rows_log_event::Write_rows_log_event(Session *session_arg, Table *tbl_arg,
5947
bool is_transactional)
5948
: Rows_log_event(session_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
5953
Constructor used by slave to read the event from the binary log.
5955
Write_rows_log_event::Write_rows_log_event(const char *buf, uint32_t event_len,
5956
const Format_description_log_event
5958
: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
5963
Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
5968
todo: to introduce a property for the event (handler?) which forces
5969
applying the event in the replace (idempotent) fashion.
5971
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
5974
We are using REPLACE semantics and not INSERT IGNORE semantics
5975
when writing rows, that is: new rows replace old rows. We need to
5976
inform the storage engine that it should use this behaviour.
5979
/* Tell the storage engine that we are using REPLACE semantics. */
5980
session->lex->duplicates= DUP_REPLACE;
5983
Pretend we're executing a REPLACE command: this is needed for
5984
InnoDB since it is not (properly) checking the
5985
lex->duplicates flag.
5987
session->lex->sql_command= SQLCOM_REPLACE;
5989
Do not raise the error flag in case of hitting to an unique attribute
5991
m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
5994
m_table->file->ha_start_bulk_insert(0);
5996
We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
5997
any TIMESTAMP column with data from the row but instead will use
5998
the event's current time.
5999
As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
6000
columns, we know that all TIMESTAMP columns on slave will receive explicit
6001
data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
6002
When we allow a table without TIMESTAMP to be replicated to a table having
6003
more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
6004
column to be replicated into a BIGINT column and the slave's table has a
6005
TIMESTAMP column, then the slave's TIMESTAMP column will take its value
6006
from set_time() which we called earlier (consistent with SBR). And then in
6007
some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
6008
analyze if explicit data is provided for slave's TIMESTAMP columns).
6010
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
6016
Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6020
if (bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1)
6022
m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
6023
m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
6025
resetting the extra with
6026
table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
6028
explanation: file->reset() performs this duty
6029
ultimately. Still todo: fix
6032
if ((local_error= m_table->file->ha_end_bulk_insert()))
6034
m_table->file->print_error(local_error, MYF(0));
6036
return error? error : local_error;
6041
Check if there are more UNIQUE keys after the given key.
6044
last_uniq_key(Table *table, uint32_t keyno)
6046
while (++keyno < table->s->keys)
6047
if (table->key_info[keyno].flags & HA_NOSAME)
6053
Check if an error is a duplicate key error.
6055
This function is used to check if an error code is one of the
6056
duplicate key errors, i.e., an error code for which it is sensible
6057
to do a <code>get_dup_key()</code> to retrieve the duplicate key.
6059
@param errcode The error code to check.
6061
@return <code>true</code> if the error code is such that
6062
<code>get_dup_key()</code> will return true, <code>false</code>
6066
is_duplicate_key_error(int errcode)
6070
case HA_ERR_FOUND_DUPP_KEY:
6071
case HA_ERR_FOUND_DUPP_UNIQUE:
6078
Write the current row into event's table.
6080
The row is located in the row buffer, pointed by @c m_curr_row member.
6081
Number of columns of the row is stored in @c m_width member (it can be
6082
different from the number of columns in the table to which we insert).
6083
Bitmap @c m_cols indicates which columns are present in the row. It is assumed
6084
that event's table is already open and pointed by @c m_table.
6086
If the same record already exists in the table it can be either overwritten
6087
or an error is reported depending on the value of @c overwrite flag
6088
(error reporting not yet implemented). Note that the matching record can be
6089
different from the row we insert if we use primary keys to identify records in
6092
The row to be inserted can contain values only for selected columns. The
6093
missing columns are filled with default values using @c prepare_record()
6094
function. If a matching record is found in the table and @c overwrite is
6095
true, the missing columns are taken from it.
6097
@param rli Relay log info (needed for row unpacking).
6099
Shall we overwrite if the row already exists or signal
6100
error (currently ignored).
6102
@returns Error code on failure, 0 on success.
6104
This method, if successful, sets @c m_curr_row_end pointer to point at the
6105
next row in the rows buffer. This is done when unpacking the row to be
6108
@note If a matching record is found, it is either updated using
6109
@c ha_update_row() or first deleted and then new record written.
6113
Rows_log_event::write_row(const Relay_log_info *const rli,
6114
const bool overwrite)
6116
assert(m_table != NULL && session != NULL);
6118
Table *table= m_table; // pointer to event's table
6121
auto_afree_ptr<char> key(NULL);
6123
/* fill table->record[0] with default values */
6126
We only check if the columns have default values for non-NDB
6127
engines, for NDB we ignore the check since updates are sent as
6128
writes, causing errors when trying to prepare the record.
6130
TODO[ndb]: Elimiate this hard-coded dependency on NDB. Ideally,
6131
the engine should be able to set a flag that it want the default
6132
values filled in and one flag to handle the case that the default
6133
values should be checked. Maybe these two flags can be combined.
6135
if ((error= prepare_record(table, &m_cols, m_width, true)))
6138
/* unpack row into table->record[0] */
6139
error= unpack_current_row(rli, &m_cols);
6141
// Temporary fix to find out why it fails [/Matz]
6142
memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
6145
Try to write record. If a corresponding record already exists in the table,
6146
we try to change it using ha_update_row() if possible. Otherwise we delete
6147
it and repeat the whole process again.
6149
TODO: Add safety measures against infinite looping.
6152
while ((error= table->file->ha_write_row(table->record[0])))
6154
if (error == HA_ERR_LOCK_DEADLOCK ||
6155
error == HA_ERR_LOCK_WAIT_TIMEOUT ||
6156
(keynum= table->file->get_dup_key(error)) < 0 ||
6160
Deadlock, waiting for lock or just an error from the handler
6161
such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
6162
Retrieval of the duplicate key number may fail
6163
- either because the error was not "duplicate key" error
6164
- or because the information which key is not available
6166
table->file->print_error(error, MYF(0));
6170
We need to retrieve the old row into record[1] to be able to
6171
either update or delete the offending record. We either:
6173
- use rnd_pos() with a row-id (available as dupp_row) to the
6174
offending row, if that is possible (MyISAM and Blackhole), or else
6176
- use index_read_idx() with the key that is duplicated, to
6177
retrieve the offending row.
6179
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
6181
if (table->file->inited && (error= table->file->ha_index_end()))
6183
if ((error= table->file->ha_rnd_init(false)))
6186
error= table->file->rnd_pos(table->record[1], table->file->dup_ref);
6187
table->file->ha_rnd_end();
6190
table->file->print_error(error, MYF(0));
6196
if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
6201
if (key.get() == NULL)
6203
key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
6204
if (key.get() == NULL)
6210
key_copy((unsigned char*)key.get(), table->record[0], table->key_info + keynum,
6212
error= table->file->index_read_idx_map(table->record[1], keynum,
6213
(const unsigned char*)key.get(),
6218
table->file->print_error(error, MYF(0));
6224
Now, record[1] should contain the offending row. That
6225
will enable us to update it or, alternatively, delete it (so
6226
that we can insert the new row afterwards).
6230
If row is incomplete we will use the record found to fill
6233
if (!get_flags(COMPLETE_ROWS_F))
6235
restore_record(table,record[1]);
6236
error= unpack_current_row(rli, &m_cols);
6240
REPLACE is defined as either INSERT or DELETE + INSERT. If
6241
possible, we can replace it with an UPDATE, but that will not
6242
work on InnoDB if FOREIGN KEY checks are necessary.
6244
I (Matz) am not sure of the reason for the last_uniq_key()
6245
check as, but I'm guessing that it's something along the
6248
Suppose that we got the duplicate key to be a key that is not
6249
the last unique key for the table and we perform an update:
6250
then there might be another key for which the unique check will
6251
fail, so we're better off just deleting the row and inserting
6254
if (last_uniq_key(table, keynum) &&
6255
!table->file->referenced_by_foreign_key())
6257
error=table->file->ha_update_row(table->record[1],
6261
case HA_ERR_RECORD_IS_THE_SAME:
6268
table->file->print_error(error, MYF(0));
6275
if ((error= table->file->ha_delete_row(table->record[1])))
6277
table->file->print_error(error, MYF(0));
6280
/* Will retry ha_write_row() with the offending row removed. */
6288
int Rows_log_event::unpack_current_row(const Relay_log_info *const rli,
6289
MY_BITMAP const *cols)
6292
ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT);
6293
int const result= ::unpack_row(rli, m_table, m_width, m_curr_row, cols,
6294
&m_curr_row_end, &m_master_reclength);
6295
if (m_curr_row_end > m_rows_end)
6296
my_error(ER_SLAVE_CORRUPT_EVENT, MYF(0));
6297
ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT);
6303
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6305
assert(m_table != NULL);
6307
write_row(rli, /* if 1 then overwrite */
6308
bit_is_set(slave_exec_mode, SLAVE_EXEC_MODE_IDEMPOTENT) == 1);
6310
if (error && !session->is_error())
6313
my_error(ER_UNKNOWN_ERROR, MYF(0));
6320
/**************************************************************************
6321
Delete_rows_log_event member functions
6322
**************************************************************************/
6325
Compares table->record[0] and table->record[1]
6327
Returns TRUE if different.
6329
static bool record_compare(Table *table)
6332
Need to set the X bit and the filler bits in both records since
6333
there are engines that do not set it correctly.
6335
In addition, since MyISAM checks that one hasn't tampered with the
6336
record, it is necessary to restore the old bytes into the record
6337
after doing the comparison.
6339
TODO[record format ndb]: Remove it once NDB returns correct
6340
records. Check that the other engines also return correct records.
6343
unsigned char saved_x[2], saved_filler[2];
6345
if (table->s->null_bytes > 0)
6347
for (int i = 0 ; i < 2 ; ++i)
6349
saved_x[i]= table->record[i][0];
6350
saved_filler[i]= table->record[i][table->s->null_bytes - 1];
6351
table->record[i][0]|= 1U;
6352
table->record[i][table->s->null_bytes - 1]|=
6353
256U - (1U << table->s->last_null_bit_pos);
6357
if (table->s->blob_fields + table->s->varchar_fields == 0)
6359
result= cmp_record(table,record[1]);
6360
goto record_compare_exit;
6363
/* Compare null bits */
6364
if (memcmp(table->null_flags,
6365
table->null_flags+table->s->rec_buff_length,
6366
table->s->null_bytes))
6368
result= true; // Diff in NULL value
6369
goto record_compare_exit;
6372
/* Compare updated fields */
6373
for (Field **ptr=table->field ; *ptr ; ptr++)
6375
if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
6378
goto record_compare_exit;
6382
record_compare_exit:
6384
Restore the saved bytes.
6386
TODO[record format ndb]: Remove this code once NDB returns the
6387
correct record format.
6389
if (table->s->null_bytes > 0)
6391
for (int i = 0 ; i < 2 ; ++i)
6393
table->record[i][0]= saved_x[i];
6394
table->record[i][table->s->null_bytes - 1]= saved_filler[i];
6402
Locate the current row in event's table.
6404
The current row is pointed by @c m_curr_row. Member @c m_width tells how many
6405
columns are there in the row (this can be different from the number of columns
6406
in the table). It is assumed that event's table is already open and pointed
6409
If a corresponding record is found in the table it is stored in
6410
@c m_table->record[0]. Note that when record is located based on a primary
6411
key, it is possible that the record found differs from the row being located.
6413
If no key is specified or table does not have keys, a table scan is used to
6414
find the row. In that case the row should be complete and contain values for
6415
all columns. However, it can still be shorter than the table, i.e. the table
6416
can contain extra columns not present in the row. It is also possible that
6417
the table has fewer columns than the row being located.
6419
@returns Error code on failure, 0 on success.
6421
@post In case of success @c m_table->record[0] contains the record found.
6422
Also, the internal "cursor" of the table is positioned at the record found.
6424
@note If the engine allows random access of the records, a combination of
6425
@c position() and @c rnd_pos() will be used.
6428
int Rows_log_event::find_row(const Relay_log_info *rli)
6430
assert(m_table && m_table->in_use != NULL);
6432
Table *table= m_table;
6435
/* unpack row - missing fields get default values */
6436
prepare_record(table, &m_cols, m_width, false/* don't check errors */);
6437
error= unpack_current_row(rli, &m_cols);
6439
// Temporary fix to find out why it fails [/Matz]
6440
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
6442
if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
6443
table->s->primary_key < MAX_KEY)
6446
Use a more efficient method to fetch the record given by
6447
table->record[0] if the engine allows it. We first compute a
6448
row reference using the position() member function (it will be
6449
stored in table->file->ref) and the use rnd_pos() to position
6450
the "cursor" (i.e., record[0] in this case) at the correct row.
6452
TODO: Add a check that the correct record has been fetched by
6453
comparing with the original record. Take into account that the
6454
record on the master and slave can be of different
6455
length. Something along these lines should work:
6457
ADD>>> store_record(table,record[1]);
6458
int error= table->file->rnd_pos(table->record[0], table->file->ref);
6459
ADD>>> assert(memcmp(table->record[1], table->record[0],
6460
table->s->reclength) == 0);
6463
int error= table->file->rnd_pos_by_record(table->record[0]);
6464
table->file->ha_rnd_end();
6467
table->file->print_error(error, MYF(0));
6472
// We can't use position() - try other methods.
6475
Save copy of the record in table->record[1]. It might be needed
6476
later if linear search is used to find exact match.
6478
store_record(table,record[1]);
6480
if (table->s->keys > 0)
6482
/* We have a key: search the table using the index */
6483
if (!table->file->inited && (error= table->file->ha_index_init(0, false)))
6485
table->file->print_error(error, MYF(0));
6489
/* Fill key data for the row */
6492
key_copy(m_key, table->record[0], table->key_info, 0);
6495
We need to set the null bytes to ensure that the filler bit are
6496
all set when returning. There are storage engines that just set
6497
the necessary bits on the bytes and don't set the filler bits
6500
my_ptrdiff_t const pos=
6501
table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
6502
table->record[0][pos]= 0xFF;
6504
if ((error= table->file->index_read_map(table->record[0], m_key,
6506
HA_READ_KEY_EXACT)))
6508
table->file->print_error(error, MYF(0));
6509
table->file->ha_index_end();
6514
Below is a minor "optimization". If the key (i.e., key number
6515
0) has the HA_NOSAME flag set, we know that we have found the
6516
correct record (since there can be no duplicates); otherwise, we
6517
have to compare the record with the one found to see if it is
6520
CAVEAT! This behaviour is essential for the replication of,
6521
e.g., the mysql.proc table since the correct record *shall* be
6522
found using the primary key *only*. There shall be no
6523
comparison of non-PK columns to decide if the correct record is
6524
found. I can see no scenario where it would be incorrect to
6525
chose the row to change only using a PK or an UNNI.
6527
if (table->key_info->flags & HA_NOSAME)
6529
table->file->ha_index_end();
6534
In case key is not unique, we still have to iterate over records found
6535
and find the one which is identical to the row given. A copy of the
6536
record we are looking for is stored in record[1].
6538
while (record_compare(table))
6541
We need to set the null bytes to ensure that the filler bit
6542
are all set when returning. There are storage engines that
6543
just set the necessary bits on the bytes and don't set the
6544
filler bits correctly.
6546
TODO[record format ndb]: Remove this code once NDB returns the
6547
correct record format.
6549
if (table->s->null_bytes > 0)
6551
table->record[0][table->s->null_bytes - 1]|=
6552
256U - (1U << table->s->last_null_bit_pos);
6555
if ((error= table->file->index_next(table->record[0])))
6557
table->file->print_error(error, MYF(0));
6558
table->file->ha_index_end();
6564
Have to restart the scan to be able to fetch the next row.
6566
table->file->ha_index_end();
6570
int restart_count= 0; // Number of times scanning has restarted from top
6572
/* We don't have a key: search the table using rnd_next() */
6573
if ((error= table->file->ha_rnd_init(1)))
6575
table->file->print_error(error, MYF(0));
6579
/* Continue until we find the right record or have made a full loop */
6582
error= table->file->rnd_next(table->record[0]);
6587
case HA_ERR_RECORD_DELETED:
6590
case HA_ERR_END_OF_FILE:
6591
if (++restart_count < 2)
6592
table->file->ha_rnd_init(1);
6596
table->file->print_error(error, MYF(0));
6597
table->file->ha_rnd_end();
6601
while (restart_count < 2 && record_compare(table));
6604
Note: above record_compare will take into accout all record fields
6605
which might be incorrect in case a partial row was given in the event
6607
table->file->ha_rnd_end();
6609
assert(error == HA_ERR_END_OF_FILE || error == HA_ERR_RECORD_DELETED || error == 0);
6613
table->default_column_bitmaps();
6616
table->default_column_bitmaps();
6622
Constructor used to build an event for writing to the binary log.
6625
Delete_rows_log_event::Delete_rows_log_event(Session *session_arg, Table *tbl_arg,
6627
bool is_transactional)
6628
: Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
6633
Constructor used by slave to read the event from the binary log.
6635
Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint32_t event_len,
6636
const Format_description_log_event
6638
: Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
6644
Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6646
if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
6647
m_table->s->primary_key < MAX_KEY)
6650
We don't need to allocate any memory for m_key since it is not used.
6655
if (m_table->s->keys > 0)
6657
// Allocate buffer for key searches
6658
m_key= (unsigned char*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
6660
return HA_ERR_OUT_OF_MEM;
6667
Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6670
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
6671
m_table->file->ha_index_or_rnd_end();
6678
int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6681
assert(m_table != NULL);
6683
if (!(error= find_row(rli)))
6686
Delete the record found, located in record[0]
6688
error= m_table->file->ha_delete_row(m_table->record[0]);
6694
/**************************************************************************
6695
Update_rows_log_event member functions
6696
**************************************************************************/
6699
Constructor used to build an event for writing to the binary log.
6701
Update_rows_log_event::Update_rows_log_event(Session *session_arg, Table *tbl_arg,
6703
bool is_transactional)
6704
: Rows_log_event(session_arg, tbl_arg, tid, tbl_arg->read_set, is_transactional)
6706
init(tbl_arg->write_set);
6709
void Update_rows_log_event::init(MY_BITMAP const *cols)
6711
/* if bitmap_init fails, caught in is_valid() */
6712
if (likely(!bitmap_init(&m_cols_ai,
6713
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
6717
/* Cols can be zero if this is a dummy binrows event */
6718
if (likely(cols != NULL))
6720
memcpy(m_cols_ai.bitmap, cols->bitmap, no_bytes_in_map(cols));
6721
create_last_word_mask(&m_cols_ai);
6727
Update_rows_log_event::~Update_rows_log_event()
6729
if (m_cols_ai.bitmap == m_bitbuf_ai) // no my_malloc happened
6730
m_cols_ai.bitmap= 0; // so no free in bitmap_free
6731
bitmap_free(&m_cols_ai); // To pair with bitmap_init().
6736
Constructor used by slave to read the event from the binary log.
6738
Update_rows_log_event::Update_rows_log_event(const char *buf, uint32_t event_len,
6740
Format_description_log_event
6742
: Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
6748
Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
6750
if (m_table->s->keys > 0)
6752
// Allocate buffer for key searches
6753
m_key= (unsigned char*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
6755
return HA_ERR_OUT_OF_MEM;
6758
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
6764
Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
6767
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
6768
m_table->file->ha_index_or_rnd_end();
6769
free(m_key); // Free for multi_malloc
6776
Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
6778
assert(m_table != NULL);
6780
int error= find_row(rli);
6784
We need to read the second image in the event of error to be
6785
able to skip to the next pair of updates
6787
m_curr_row= m_curr_row_end;
6788
unpack_current_row(rli, &m_cols_ai);
6793
This is the situation after locating BI:
6795
===|=== before image ====|=== after image ===|===
6797
m_curr_row m_curr_row_end
6799
BI found in the table is stored in record[0]. We copy it to record[1]
6800
and unpack AI to record[0].
6803
store_record(m_table,record[1]);
6805
m_curr_row= m_curr_row_end;
6806
error= unpack_current_row(rli, &m_cols_ai); // this also updates m_curr_row_end
6809
Now we have the right row to update. The old row (the one we're
6810
looking for) is in record[1] and the new row is in record[0].
6813
// Temporary fix to find out why it fails [/Matz]
6814
memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
6815
memcpy(m_table->write_set->bitmap, m_cols_ai.bitmap, (m_table->write_set->n_bits + 7) / 8);
6817
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
6818
if (error == HA_ERR_RECORD_IS_THE_SAME)
6825
Incident_log_event::Incident_log_event(const char *buf, uint32_t event_len,
6826
const Format_description_log_event *descr_event)
6827
: Log_event(buf, descr_event)
6829
uint8_t const common_header_len=
6830
descr_event->common_header_len;
6831
uint8_t const post_header_len=
6832
descr_event->post_header_len[INCIDENT_EVENT-1];
6834
m_incident= static_cast<Incident>(uint2korr(buf + common_header_len));
6835
char const *ptr= buf + common_header_len + post_header_len;
6836
char const *const str_end= buf + event_len;
6837
uint8_t len= 0; // Assignment to keep compiler happy
6838
const char *str= NULL; // Assignment to keep compiler happy
6839
read_str(&ptr, str_end, &str, &len);
6840
m_message.str= const_cast<char*>(str);
6841
m_message.length= len;
6846
Incident_log_event::~Incident_log_event()
6852
Incident_log_event::description() const
6854
static const char *const description[]= {
6855
"NOTHING", // Not used
6859
assert(0 <= m_incident);
6860
assert((size_t) m_incident <= sizeof(description)/sizeof(*description));
6862
return description[m_incident];
6866
void Incident_log_event::pack_info(Protocol *protocol)
6870
if (m_message.length > 0)
6871
bytes= snprintf(buf, sizeof(buf), "#%d (%s)",
6872
m_incident, description());
6874
bytes= snprintf(buf, sizeof(buf), "#%d (%s): %s",
6875
m_incident, description(), m_message.str);
6876
protocol->store(buf, bytes, &my_charset_bin);
6881
Incident_log_event::do_apply_event(Relay_log_info const *rli)
6883
rli->report(ERROR_LEVEL, ER_SLAVE_INCIDENT,
6884
ER(ER_SLAVE_INCIDENT),
6886
m_message.length > 0 ? m_message.str : "<none>");
6892
Incident_log_event::write_data_header(IO_CACHE *file)
6894
unsigned char buf[sizeof(int16_t)];
6895
int2store(buf, (int16_t) m_incident);
6896
return(my_b_safe_write(file, buf, sizeof(buf)));
6900
Incident_log_event::write_data_body(IO_CACHE *file)
6902
return(write_str(file, m_message.str, m_message.length));
6905
Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint32_t event_len,
6906
const Format_description_log_event* description_event)
6907
:Log_event(buf, description_event)
6909
uint8_t header_size= description_event->common_header_len;
6910
ident_len = event_len - header_size;
6911
set_if_smaller(ident_len,FN_REFLEN-1);
6912
log_ident= buf + header_size;