1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008 Sun Microsystems
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; version 2 of the License.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
1
/* Copyright (C) 2000-2004 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
20
17
#include <drizzled/server_includes.h>
21
#include <drizzled/log_event.h>
22
#include <drizzled/replication/rli.h>
23
#include <drizzled/replication/mi.h>
24
#include <libdrizzle/libdrizzle.h>
25
#include <mysys/hash.h>
26
#include <drizzled/replication/utility.h>
27
#include <drizzled/replication/record.h>
20
#include "rpl_filter.h"
21
#include "rpl_utility.h"
22
#include "rpl_record.h"
28
23
#include <mysys/my_dir.h>
29
24
#include <drizzled/error.h>
30
25
#include <libdrizzle/pack.h>
31
#include <drizzled/sql_parse.h>
32
#include <drizzled/sql_base.h>
33
#include <drizzled/sql_load.h>
34
#include <drizzled/item/return_int.h>
35
#include <drizzled/item/empty_string.h>
37
27
#include <algorithm>
40
29
#include <mysys/base64.h>
41
30
#include <mysys/my_bitmap.h>
1178
1175
int4store(start, flags2);
1178
if (sql_mode_inited)
1180
*start++= Q_SQL_MODE_CODE;
1181
int8store(start, (uint64_t)sql_mode);
1184
if (catalog_len) // i.e. this var is inited (false for 4.0 events)
1186
write_str_with_code_and_len((char **)(&start),
1187
catalog, catalog_len, Q_CATALOG_NZ_CODE);
1189
In 5.0.x where x<4 masters we used to store the end zero here. This was
1190
a waste of one byte so we don't do it in x>=4 masters. We change code to
1191
Q_CATALOG_NZ_CODE, because re-using the old code would make x<4 slaves
1192
of this x>=4 master segfault (expecting a zero when there is
1193
none). Remaining compatibility problems are: the older slave will not
1194
find the catalog; but it will not crash, and it's not an issue
1195
that it does not find the catalog as catalogs were not used in these
1196
older MySQL versions (we store it in binlog and read it from relay log
1197
but do nothing useful with it). What is an issue is that the older slave
1198
will stop processing the Q_* blocks (and jumps to the db/query) as soon
1199
as it sees unknown Q_CATALOG_NZ_CODE; so it will not be able to read
1200
Q_AUTO_INCREMENT*, Q_CHARSET and so replication will fail silently in
1201
various ways. Documented that you should not mix alpha/beta versions if
1202
they are not exactly the same version, with example of 5.0.3->5.0.2 and
1203
5.0.4->5.0.3. If replication is from older to new, the new will
1204
recognize Q_CATALOG_CODE and have no problem.
1207
if (auto_increment_increment != 1 || auto_increment_offset != 1)
1209
*start++= Q_AUTO_INCREMENT;
1210
int2store(start, auto_increment_increment);
1211
int2store(start+2, auto_increment_offset);
1216
*start++= Q_CHARSET_CODE;
1217
memcpy(start, charset, 6);
1222
/* In the TZ sys table, column Name is of length 64 so this should be ok */
1223
assert(time_zone_len <= MAX_TIME_ZONE_NAME_LENGTH);
1224
*start++= Q_TIME_ZONE_CODE;
1225
*start++= time_zone_len;
1226
memcpy(start, time_zone_str, time_zone_len);
1227
start+= time_zone_len;
1181
1229
if (lc_time_names_number)
1183
1231
assert(lc_time_names_number <= 0xFFFF);
1251
1299
The value for local `killed_status' can be supplied by caller.
1253
1301
Query_log_event::Query_log_event(Session* session_arg, const char* query_arg,
1254
ulong query_length, bool using_trans,
1302
ulong query_length, bool using_trans,
1256
1304
Session::killed_state killed_status_arg)
1257
:Log_event(session_arg,
1258
(session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0) |
1259
(suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1261
data_buf(0), query(query_arg), catalog(session_arg->catalog),
1262
db(session_arg->db), q_len((uint32_t) query_length),
1263
thread_id(session_arg->thread_id),
1264
/* save the original thread id; we already know the server id */
1265
slave_proxy_id(session_arg->variables.pseudo_thread_id),
1266
flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1268
auto_increment_increment(session_arg->variables.auto_increment_increment),
1269
auto_increment_offset(session_arg->variables.auto_increment_offset),
1270
lc_time_names_number(session_arg->variables.lc_time_names->number),
1305
:Log_event(session_arg,
1306
(session_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
1308
(suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
1310
data_buf(0), query(query_arg), catalog(session_arg->catalog),
1311
db(session_arg->db), q_len((uint32_t) query_length),
1312
thread_id(session_arg->thread_id),
1313
/* save the original thread id; we already know the server id */
1314
slave_proxy_id(session_arg->variables.pseudo_thread_id),
1315
flags2_inited(1), sql_mode_inited(1), charset_inited(1),
1317
auto_increment_increment(session_arg->variables.auto_increment_increment),
1318
auto_increment_offset(session_arg->variables.auto_increment_offset),
1319
lc_time_names_number(session_arg->variables.lc_time_names->number),
1271
1320
charset_database_number(0)
1273
1322
time_t end_time;
1300
1349
we will probably want to reclaim the 29 bits. So we need the &.
1302
1351
flags2= (uint32_t) (session_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
1352
assert(session_arg->variables.character_set_client->number < 256*256);
1353
assert(session_arg->variables.collation_connection->number < 256*256);
1303
1354
assert(session_arg->variables.collation_server->number < 256*256);
1355
assert(session_arg->variables.character_set_client->mbminlen == 1);
1356
int2store(charset, session_arg->variables.character_set_client->number);
1357
int2store(charset+2, session_arg->variables.collation_connection->number);
1304
1358
int2store(charset+4, session_arg->variables.collation_server->number);
1308
static void copy_str_and_move(const char **src,
1309
Log_event::Byte **dst,
1359
if (session_arg->time_zone_used)
1362
Note that our event becomes dependent on the Time_zone object
1363
representing the time zone. Fortunately such objects are never deleted
1364
or changed during mysqld's lifetime.
1366
time_zone_len= session_arg->variables.time_zone->get_name()->length();
1367
time_zone_str= session_arg->variables.time_zone->get_name()->ptr();
1374
/* 2 utility functions for the next method */
1377
Read a string with length from memory.
1379
This function reads the string-with-length stored at
1380
<code>src</code> and extract the length into <code>*len</code> and
1381
a pointer to the start of the string into <code>*dst</code>. The
1382
string can then be copied using <code>memcpy()</code> with the
1383
number of bytes given in <code>*len</code>.
1385
@param src Pointer to variable holding a pointer to the memory to
1386
read the string from.
1387
@param dst Pointer to variable holding a pointer where the actual
1388
string starts. Starting from this position, the string
1389
can be copied using @c memcpy().
1390
@param len Pointer to variable where the length will be stored.
1391
@param end One-past-the-end of the memory where the string is
1394
@return Zero if the entire string can be copied successfully,
1395
@c UINT_MAX if the length could not be read from memory
1396
(that is, if <code>*src >= end</code>), otherwise the
1397
number of bytes that are missing to read the full
1398
string, which happens when <code>*dst + *len >= end</code>.
1401
get_str_len_and_pointer(const Log_event::Byte **src,
1404
const Log_event::Byte *end)
1407
return -1; // Will be UINT_MAX in two-complement arithmetics
1408
uint32_t length= **src;
1411
if (*src + length >= end)
1412
return *src + length - end + 1; // Number of bytes missing
1413
*dst= (char *)*src + 1; // Will be copied later
1420
static void copy_str_and_move(const char **src,
1421
Log_event::Byte **dst,
1312
1424
memcpy(*dst, *src, len);
2112
case 3: /* 4.0.x x>=2 */
2114
We build an artificial (i.e. not sent by the master) event, which
2115
describes what those old master versions send.
2118
my_stpcpy(server_version, server_ver ? server_ver : "3.23");
2120
my_stpcpy(server_version, server_ver ? server_ver : "4.0");
2121
common_header_len= binlog_ver==1 ? OLD_HEADER_LEN :
2122
LOG_EVENT_MINIMAL_HEADER_LEN;
2124
The first new event in binlog version 4 is Format_desc. So any event type
2125
after that does not exist in older versions. We use the events known by
2126
version 3, even if version 1 had only a subset of them (this is not a
2127
problem: it uses a few bytes for nothing but unifies code; it does not
2128
make the slave detect less corruptions).
2130
number_of_event_types= FORMAT_DESCRIPTION_EVENT - 1;
2131
post_header_len=(uint8_t*) my_malloc(number_of_event_types*sizeof(uint8_t),
2133
if (post_header_len)
2135
post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
2136
post_header_len[QUERY_EVENT-1]= QUERY_HEADER_MINIMAL_LEN;
2137
post_header_len[STOP_EVENT-1]= 0;
2138
post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN;
2139
post_header_len[INTVAR_EVENT-1]= 0;
2140
post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
2141
post_header_len[SLAVE_EVENT-1]= 0;
2142
post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
2143
post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
2144
post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
2145
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
2146
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
2147
post_header_len[RAND_EVENT-1]= 0;
2148
post_header_len[USER_VAR_EVENT-1]= 0;
1937
2151
default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
2152
post_header_len= 0; /* will make is_valid() fail */
1940
2155
calc_server_version_split();
2254
2471
if (need_db && db && db_len)
2256
pos= strcpy(pos, "use `")+5;
2473
pos= my_stpcpy(pos, "use `");
2257
2474
memcpy(pos, db, db_len);
2258
pos= strcpy(pos+db_len, "`; ")+3;
2475
pos= my_stpcpy(pos+db_len, "`; ");
2261
pos= strcpy(pos, "LOAD DATA ")+10;
2478
pos= my_stpcpy(pos, "LOAD DATA ");
2264
2481
*fn_start= pos;
2266
2483
if (check_fname_outside_temp_buf())
2267
pos= strcpy(pos, "LOCAL ")+6;
2268
pos= strcpy(pos, "INFILE '")+8;
2484
pos= my_stpcpy(pos, "LOCAL ");
2485
pos= my_stpcpy(pos, "INFILE '");
2269
2486
memcpy(pos, fname, fname_len);
2270
pos= strcpy(pos+fname_len, "' ")+2;
2487
pos= my_stpcpy(pos+fname_len, "' ");
2272
2489
if (sql_ex.opt_flags & REPLACE_FLAG)
2273
pos= strcpy(pos, " REPLACE ")+9;
2490
pos= my_stpcpy(pos, " REPLACE ");
2274
2491
else if (sql_ex.opt_flags & IGNORE_FLAG)
2275
pos= strcpy(pos, " IGNORE ")+8;
2492
pos= my_stpcpy(pos, " IGNORE ");
2277
pos= strcpy(pos ,"INTO")+4;
2494
pos= my_stpcpy(pos ,"INTO");
2282
pos= strcpy(pos ," Table `")+8;
2499
pos= my_stpcpy(pos ," Table `");
2283
2500
memcpy(pos, table_name, table_name_len);
2284
2501
pos+= table_name_len;
2286
2503
/* We have to create all optional fields as the default is not empty */
2287
pos= strcpy(pos, "` FIELDS TERMINATED BY ")+23;
2504
pos= my_stpcpy(pos, "` FIELDS TERMINATED BY ");
2288
2505
pos= pretty_print_str(pos, sql_ex.field_term, sql_ex.field_term_len);
2289
2506
if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG)
2290
pos= strcpy(pos, " OPTIONALLY ")+12;
2291
pos= strcpy(pos, " ENCLOSED BY ")+13;
2507
pos= my_stpcpy(pos, " OPTIONALLY ");
2508
pos= my_stpcpy(pos, " ENCLOSED BY ");
2292
2509
pos= pretty_print_str(pos, sql_ex.enclosed, sql_ex.enclosed_len);
2294
pos= strcpy(pos, " ESCAPED BY ")+12;
2511
pos= my_stpcpy(pos, " ESCAPED BY ");
2295
2512
pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
2297
pos= strcpy(pos, " LINES TERMINATED BY ")+21;
2514
pos= my_stpcpy(pos, " LINES TERMINATED BY ");
2298
2515
pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
2299
2516
if (sql_ex.line_start_len)
2301
pos= strcpy(pos, " STARTING BY ")+13;
2518
pos= my_stpcpy(pos, " STARTING BY ");
2302
2519
pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
2305
2522
if ((long) skip_lines > 0)
2307
pos= strcpy(pos, " IGNORE ")+8;
2524
pos= my_stpcpy(pos, " IGNORE ");
2308
2525
pos= int64_t10_to_str((int64_t) skip_lines, pos, 10);
2309
pos= strcpy(pos," LINES ")+7;
2526
pos= my_stpcpy(pos," LINES ");
2312
2529
if (num_fields)
2315
2532
const char *field= fields;
2316
pos= strcpy(pos, " (")+2;
2533
pos= my_stpcpy(pos, " (");
2317
2534
for (i = 0; i < num_fields; i++)
3005
3230
/**************************************************************************
3231
Intvar_log_event methods
3232
**************************************************************************/
3235
Intvar_log_event::pack_info()
3238
void Intvar_log_event::pack_info(Protocol *protocol)
3240
char buf[256], *pos;
3241
pos= strmake(buf, get_var_type_name(), sizeof(buf)-23);
3243
pos= int64_t10_to_str(val, pos, -10);
3244
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3249
Intvar_log_event::Intvar_log_event()
3252
Intvar_log_event::Intvar_log_event(const char* buf,
3253
const Format_description_log_event* description_event)
3254
:Log_event(buf, description_event)
3256
buf+= description_event->common_header_len;
3257
type= buf[I_TYPE_OFFSET];
3258
val= uint8korr(buf+I_VAL_OFFSET);
3263
Intvar_log_event::get_var_type_name()
3266
const char* Intvar_log_event::get_var_type_name()
3269
case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
3270
case INSERT_ID_EVENT: return "INSERT_ID";
3271
default: /* impossible */ return "UNKNOWN";
3277
Intvar_log_event::write()
3280
bool Intvar_log_event::write(IO_CACHE* file)
3282
unsigned char buf[9];
3283
buf[I_TYPE_OFFSET]= (unsigned char) type;
3284
int8store(buf + I_VAL_OFFSET, val);
3285
return (write_header(file, sizeof(buf)) ||
3286
my_b_safe_write(file, buf, sizeof(buf)));
3291
Intvar_log_event::print()
3295
Intvar_log_event::do_apply_event()
3298
int Intvar_log_event::do_apply_event(Relay_log_info const *rli)
3301
We are now in a statement until the associated query log event has
3304
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3307
case LAST_INSERT_ID_EVENT:
3308
session->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1;
3309
session->first_successful_insert_id_in_prev_stmt= val;
3311
case INSERT_ID_EVENT:
3312
session->force_one_auto_inc_interval(val);
3318
int Intvar_log_event::do_update_pos(Relay_log_info *rli)
3320
rli->inc_event_relay_log_pos();
3325
Log_event::enum_skip_reason
3326
Intvar_log_event::do_shall_skip(Relay_log_info *rli)
3329
It is a common error to set the slave skip counter to 1 instead of
3330
2 when recovering from an insert which used an auto increment,
3331
rand, or user var. Therefore, if the slave skip counter is 1, we
3332
just say that this event should be skipped by ignoring it, meaning
3333
that we do not change the value of the slave skip counter since it
3334
will be decreased by the following insert event.
3336
return continue_group(rli);
3340
/**************************************************************************
3341
Rand_log_event methods
3342
**************************************************************************/
3344
void Rand_log_event::pack_info(Protocol *protocol)
3346
char buf1[256], *pos;
3347
pos= my_stpcpy(buf1,"rand_seed1=");
3348
pos= int10_to_str((long) seed1, pos, 10);
3349
pos= my_stpcpy(pos, ",rand_seed2=");
3350
pos= int10_to_str((long) seed2, pos, 10);
3351
protocol->store(buf1, (uint) (pos-buf1), &my_charset_bin);
3355
Rand_log_event::Rand_log_event(const char* buf,
3356
const Format_description_log_event* description_event)
3357
:Log_event(buf, description_event)
3359
buf+= description_event->common_header_len;
3360
seed1= uint8korr(buf+RAND_SEED1_OFFSET);
3361
seed2= uint8korr(buf+RAND_SEED2_OFFSET);
3365
bool Rand_log_event::write(IO_CACHE* file)
3367
unsigned char buf[16];
3368
int8store(buf + RAND_SEED1_OFFSET, seed1);
3369
int8store(buf + RAND_SEED2_OFFSET, seed2);
3370
return (write_header(file, sizeof(buf)) ||
3371
my_b_safe_write(file, buf, sizeof(buf)));
3375
int Rand_log_event::do_apply_event(Relay_log_info const *rli)
3378
We are now in a statement until the associated query log event has
3381
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3383
session->rand.seed1= (ulong) seed1;
3384
session->rand.seed2= (ulong) seed2;
3388
int Rand_log_event::do_update_pos(Relay_log_info *rli)
3390
rli->inc_event_relay_log_pos();
3395
Log_event::enum_skip_reason
3396
Rand_log_event::do_shall_skip(Relay_log_info *rli)
3399
It is a common error to set the slave skip counter to 1 instead of
3400
2 when recovering from an insert which used an auto increment,
3401
rand, or user var. Therefore, if the slave skip counter is 1, we
3402
just say that this event should be skipped by ignoring it, meaning
3403
that we do not change the value of the slave skip counter since it
3404
will be decreased by the following insert event.
3406
return continue_group(rli);
3410
/**************************************************************************
3006
3411
Xid_log_event methods
3007
3412
**************************************************************************/
3009
3414
void Xid_log_event::pack_info(Protocol *protocol)
3011
3416
char buf[128], *pos;
3012
pos= strcpy(buf, "COMMIT /* xid=")+14;
3417
pos= my_stpcpy(buf, "COMMIT /* xid=");
3013
3418
pos= int64_t10_to_str(xid, pos, 10);
3014
pos= strcpy(pos, " */")+3;
3419
pos= my_stpcpy(pos, " */");
3015
3420
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
3020
3425
It's ok not to use int8store here,
3021
as long as XID::set(uint64_t) and
3022
XID::get_my_xid doesn't do it either.
3426
as long as xid_t::set(uint64_t) and
3427
xid_t::get_my_xid doesn't do it either.
3023
3428
We don't care about actual values of xids as long as
3024
3429
identical numbers compare identically
3060
3465
/**************************************************************************
3466
User_var_log_event methods
3467
**************************************************************************/
3469
void User_var_log_event::pack_info(Protocol* protocol)
3472
uint32_t val_offset= 4 + name_len;
3473
uint32_t event_len= val_offset;
3477
if (!(buf= (char*) my_malloc(val_offset + 5, MYF(MY_WME))))
3479
my_stpcpy(buf + val_offset, "NULL");
3480
event_len= val_offset + 4;
3487
float8get(real_val, val);
3488
if (!(buf= (char*) my_malloc(val_offset + MY_GCVT_MAX_FIELD_WIDTH + 1,
3491
event_len+= my_gcvt(real_val, MY_GCVT_ARG_DOUBLE, MY_GCVT_MAX_FIELD_WIDTH,
3492
buf + val_offset, NULL);
3495
if (!(buf= (char*) my_malloc(val_offset + 22, MYF(MY_WME))))
3497
event_len= int64_t10_to_str(uint8korr(val), buf + val_offset,-10)-buf;
3499
case DECIMAL_RESULT:
3501
if (!(buf= (char*) my_malloc(val_offset + DECIMAL_MAX_STR_LENGTH,
3504
String str(buf+val_offset, DECIMAL_MAX_STR_LENGTH, &my_charset_bin);
3506
binary2my_decimal(E_DEC_FATAL_ERROR, (unsigned char*) (val+2), &dec, val[0],
3508
my_decimal2string(E_DEC_FATAL_ERROR, &dec, 0, 0, 0, &str);
3509
event_len= str.length() + val_offset;
3513
/* 15 is for 'COLLATE' and other chars */
3514
buf= (char*) my_malloc(event_len+val_len*2+1+2*MY_CS_NAME_SIZE+15,
3516
const CHARSET_INFO *cs;
3519
if (!(cs= get_charset(charset_number, MYF(0))))
3521
my_stpcpy(buf+val_offset, "???");
3526
char *p= strxmov(buf + val_offset, "_", cs->csname, " ", NULL);
3527
p= str_to_hex(p, val, val_len);
3528
p= strxmov(p, " COLLATE ", cs->name, NULL);
3540
memcpy(buf+2, name, name_len);
3541
buf[2+name_len]= '`';
3542
buf[3+name_len]= '=';
3543
protocol->store(buf, event_len, &my_charset_bin);
3548
User_var_log_event::
3549
User_var_log_event(const char* buf,
3550
const Format_description_log_event* description_event)
3551
:Log_event(buf, description_event)
3553
buf+= description_event->common_header_len;
3554
name_len= uint4korr(buf);
3555
name= (char *) buf + UV_NAME_LEN_SIZE;
3556
buf+= UV_NAME_LEN_SIZE + name_len;
3557
is_null= (bool) *buf;
3560
type= STRING_RESULT;
3561
charset_number= my_charset_bin.number;
3567
type= (Item_result) buf[UV_VAL_IS_NULL];
3568
charset_number= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE);
3569
val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3570
UV_CHARSET_NUMBER_SIZE);
3571
val= (char *) (buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3572
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE);
3577
bool User_var_log_event::write(IO_CACHE* file)
3579
char buf[UV_NAME_LEN_SIZE];
3580
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
3581
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
3582
unsigned char buf2[(8 > DECIMAL_MAX_FIELD_SIZE + 2) ? 8 : DECIMAL_MAX_FIELD_SIZE +2], *pos= buf2;
3583
uint32_t buf1_length;
3586
int4store(buf, name_len);
3588
if ((buf1[0]= is_null))
3591
val_len= 0; // Length of 'pos'
3596
int4store(buf1 + 2, charset_number);
3600
float8store(buf2, *(double*) val);
3603
int8store(buf2, *(int64_t*) val);
3605
case DECIMAL_RESULT:
3607
my_decimal *dec= (my_decimal *)val;
3608
dec->fix_buffer_pointer();
3609
buf2[0]= (char)(dec->intg + dec->frac);
3610
buf2[1]= (char)dec->frac;
3611
decimal2bin((decimal_t*)val, buf2+2, buf2[0], buf2[1]);
3612
val_len= decimal_bin_size(buf2[0], buf2[1]) + 2;
3616
pos= (unsigned char*) val;
3623
int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
3627
/* Length of the whole event */
3628
event_length= sizeof(buf)+ name_len + buf1_length + val_len;
3630
return (write_header(file, event_length) ||
3631
my_b_safe_write(file, (unsigned char*) buf, sizeof(buf)) ||
3632
my_b_safe_write(file, (unsigned char*) name, name_len) ||
3633
my_b_safe_write(file, (unsigned char*) buf1, buf1_length) ||
3634
my_b_safe_write(file, pos, val_len));
3640
User_var_log_event::do_apply_event()
3643
int User_var_log_event::do_apply_event(Relay_log_info const *rli)
3646
const CHARSET_INFO *charset;
3647
if (!(charset= get_charset(charset_number, MYF(MY_WME))))
3649
LEX_STRING user_var_name;
3650
user_var_name.str= name;
3651
user_var_name.length= name_len;
3656
We are now in a statement until the associated query log event has
3659
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
3663
it= new Item_null();
3669
float8get(real_val, val);
3670
it= new Item_float(real_val, 0);
3671
val= (char*) &real_val; // Pointer to value in native format
3675
int_val= (int64_t) uint8korr(val);
3676
it= new Item_int(int_val);
3677
val= (char*) &int_val; // Pointer to value in native format
3680
case DECIMAL_RESULT:
3682
Item_decimal *dec= new Item_decimal((unsigned char*) val+2, val[0], val[1]);
3684
val= (char *)dec->val_decimal(NULL);
3685
val_len= sizeof(my_decimal);
3689
it= new Item_string(val, val_len, charset);
3697
Item_func_set_user_var e(user_var_name, it);
3699
Item_func_set_user_var can't substitute something else on its place =>
3700
0 can be passed as last argument (reference on item)
3702
e.fix_fields(session, 0);
3704
A variable can just be considered as a table with
3705
a single record and with a single column. Thus, like
3706
a column value, it could always have IMPLICIT derivation.
3708
e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0);
3709
free_root(session->mem_root,0);
3714
int User_var_log_event::do_update_pos(Relay_log_info *rli)
3716
rli->inc_event_relay_log_pos();
3720
Log_event::enum_skip_reason
3721
User_var_log_event::do_shall_skip(Relay_log_info *rli)
3724
It is a common error to set the slave skip counter to 1 instead
3725
of 2 when recovering from an insert which used an auto increment,
3726
rand, or user var. Therefore, if the slave skip counter is 1, we
3727
just say that this event should be skipped by ignoring it, meaning
3728
that we do not change the value of the slave skip counter since it
3729
will be decreased by the following insert event.
3731
return continue_group(rli);
3735
/**************************************************************************
3061
3736
Slave_log_event methods
3062
3737
**************************************************************************/
3064
3739
void Slave_log_event::pack_info(Protocol *protocol)
3066
ostringstream stream;
3067
stream << "host=" << master_host << ",port=" << master_port;
3068
stream << ",log=" << master_log << ",pos=" << master_pos;
3070
protocol->store(stream.str().c_str(), stream.str().length(),
3741
char buf[256+HOSTNAME_LENGTH], *pos;
3742
pos= my_stpcpy(buf, "host=");
3743
pos= my_stpncpy(pos, master_host.c_str(), HOSTNAME_LENGTH);
3744
pos= my_stpcpy(pos, ",port=");
3745
pos= int10_to_str((long) master_port, pos, 10);
3746
pos= my_stpcpy(pos, ",log=");
3747
pos= my_stpcpy(pos, master_log.c_str());
3748
pos= my_stpcpy(pos, ",pos=");
3749
pos= int64_t10_to_str(master_pos, pos, 10);
3750
protocol->store(buf, pos-buf, &my_charset_bin);
4748
5468
@page How replication of field metadata works.
4750
When a table map is created, the master first calls
4751
Table_map_log_event::save_field_metadata() which calculates how many
4752
values will be in the field metadata. Only those fields that require the
4753
extra data are added. The method also loops through all of the fields in
5470
When a table map is created, the master first calls
5471
Table_map_log_event::save_field_metadata() which calculates how many
5472
values will be in the field metadata. Only those fields that require the
5473
extra data are added. The method also loops through all of the fields in
4754
5474
the table calling the method Field::save_field_metadata() which returns the
4755
5475
values for the field that will be saved in the metadata and replicated to
4756
5476
the slave. Once all fields have been processed, the table map is written to
4757
5477
the binlog adding the size of the field metadata and the field metadata to
4758
5478
the end of the body of the table map.
4760
When a table map is read on the slave, the field metadata is read from the
4761
table map and passed to the table_def class constructor which saves the
4762
field metadata from the table map into an array based on the type of the
4763
field. Field metadata values not present (those fields that do not use extra
4764
data) in the table map are initialized as zero (0). The array size is the
5480
When a table map is read on the slave, the field metadata is read from the
5481
table map and passed to the table_def class constructor which saves the
5482
field metadata from the table map into an array based on the type of the
5483
field. Field metadata values not present (those fields that do not use extra
5484
data) in the table map are initialized as zero (0). The array size is the
4765
5485
same as the columns for the table on the slave.
4767
Additionally, values saved for field metadata on the master are saved as a
5487
Additionally, values saved for field metadata on the master are saved as a
4768
5488
string of bytes (unsigned char) in the binlog. A field may require 1 or more bytes
4769
to store the information. In cases where values require multiple bytes
4770
(e.g. values > 255), the endian-safe methods are used to properly encode
5489
to store the information. In cases where values require multiple bytes
5490
(e.g. values > 255), the endian-safe methods are used to properly encode
4771
5491
the values on the master and decode them on the slave. When the field
4772
5492
metadata values are captured on the slave, they are stored in an array of
4773
5493
type uint16_t. This allows the least number of casts to prevent casting bugs
4774
5494
when the field metadata is used in comparisons of field attributes. When
4775
5495
the field metadata is used for calculating addresses in pointer math, the
4776
type used is uint32_t.
5496
type used is uint32_t.
5337
6075
Write the current row into event's table.
5339
6077
The row is located in the row buffer, pointed by @c m_curr_row member.
5340
Number of columns of the row is stored in @c m_width member (it can be
5341
different from the number of columns in the table to which we insert).
5342
Bitmap @c m_cols indicates which columns are present in the row. It is assumed
6078
Number of columns of the row is stored in @c m_width member (it can be
6079
different from the number of columns in the table to which we insert).
6080
Bitmap @c m_cols indicates which columns are present in the row. It is assumed
5343
6081
that event's table is already open and pointed by @c m_table.
5345
If the same record already exists in the table it can be either overwritten
5346
or an error is reported depending on the value of @c overwrite flag
6083
If the same record already exists in the table it can be either overwritten
6084
or an error is reported depending on the value of @c overwrite flag
5347
6085
(error reporting not yet implemented). Note that the matching record can be
5348
6086
different from the row we insert if we use primary keys to identify records in
5351
The row to be inserted can contain values only for selected columns. The
5352
missing columns are filled with default values using @c prepare_record()
6089
The row to be inserted can contain values only for selected columns. The
6090
missing columns are filled with default values using @c prepare_record()
5353
6091
function. If a matching record is found in the table and @c overwrite is
5354
6092
true, the missing columns are taken from it.
5356
6094
@param rli Relay log info (needed for row unpacking).
5358
Shall we overwrite if the row already exists or signal
6096
Shall we overwrite if the row already exists or signal
5359
6097
error (currently ignored).
5361
6099
@returns Error code on failure, 0 on success.
5363
6101
This method, if successful, sets @c m_curr_row_end pointer to point at the
5364
next row in the rows buffer. This is done when unpacking the row to be
6102
next row in the rows buffer. This is done when unpacking the row to be
5367
@note If a matching record is found, it is either updated using
6105
@note If a matching record is found, it is either updated using
5368
6106
@c ha_update_row() or first deleted and then new record written.
5372
6110
Rows_log_event::write_row(const Relay_log_info *const rli,
5653
6399
Locate the current row in event's table.
5655
The current row is pointed by @c m_curr_row. Member @c m_width tells how many
5656
columns are there in the row (this can be different from the number of columns
5657
in the table). It is assumed that event's table is already open and pointed
6401
The current row is pointed by @c m_curr_row. Member @c m_width tells how many
6402
columns are there in the row (this can be different from the number of columns
6403
in the table). It is assumed that event's table is already open and pointed
5660
If a corresponding record is found in the table it is stored in
5661
@c m_table->record[0]. Note that when record is located based on a primary
6406
If a corresponding record is found in the table it is stored in
6407
@c m_table->record[0]. Note that when record is located based on a primary
5662
6408
key, it is possible that the record found differs from the row being located.
5664
If no key is specified or table does not have keys, a table scan is used to
6410
If no key is specified or table does not have keys, a table scan is used to
5665
6411
find the row. In that case the row should be complete and contain values for
5666
all columns. However, it can still be shorter than the table, i.e. the table
5667
can contain extra columns not present in the row. It is also possible that
5668
the table has fewer columns than the row being located.
5670
@returns Error code on failure, 0 on success.
5672
@post In case of success @c m_table->record[0] contains the record found.
6412
all columns. However, it can still be shorter than the table, i.e. the table
6413
can contain extra columns not present in the row. It is also possible that
6414
the table has fewer columns than the row being located.
6416
@returns Error code on failure, 0 on success.
6418
@post In case of success @c m_table->record[0] contains the record found.
5673
6419
Also, the internal "cursor" of the table is positioned at the record found.
5675
6421
@note If the engine allows random access of the records, a combination of
5676
@c position() and @c rnd_pos() will be used.
6422
@c position() and @c rnd_pos() will be used.
5679
6425
int Rows_log_event::find_row(const Relay_log_info *rli)