77
static int send_file(THD *thd)
78
static int send_file(Session *session)
80
NET* net = &session->net;
80
81
int fd = -1, error = 1;
82
83
char fname[FN_REFLEN+1];
83
84
const char *errmsg = 0;
85
86
unsigned long packet_len;
86
uchar buf[IO_SIZE]; // It's safe to alloc this
87
unsigned char buf[IO_SIZE]; // It's safe to alloc this
89
90
The client might be slow loading the data, give him wait_timeout to do
92
93
old_timeout= net->read_timeout;
93
my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
94
my_net_set_read_timeout(net, session->variables.net_wait_timeout);
96
97
We need net_flush here because the client will not know it needs to send
243
244
my_message(errmsg, ER(errmsg), MYF(0));
251
bool purge_master_logs(THD* thd, const char* to_log)
252
bool purge_master_logs(Session* session, const char* to_log)
253
254
char search_file_name[FN_REFLEN];
254
255
if (!mysql_bin_log.is_open())
260
261
mysql_bin_log.make_log_name(search_file_name, to_log);
261
return purge_error_message(thd,
262
return purge_error_message(session,
262
263
mysql_bin_log.purge_logs(search_file_name, 0, 1,
267
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
268
bool purge_master_logs_before_date(Session* session, time_t purge_time)
269
270
if (!mysql_bin_log.is_open())
274
return purge_error_message(thd,
275
return purge_error_message(session,
275
276
mysql_bin_log.purge_logs_before_date(purge_time));
309
310
An auxiliary function for calling in mysql_binlog_send
310
311
to initialize the heartbeat timeout in waiting for a binlogged event.
312
@param[in] thd THD to access a user variable
313
@param[in] session Session to access a user variable
314
315
@return heartbeat period, a uint64_t of nanoseconds,
315
316
or zero if heartbeat was not demanded by slave
317
static uint64_t get_heartbeat_period(THD * thd)
318
static uint64_t get_heartbeat_period(Session * session)
320
321
LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")};
321
322
user_var_entry *entry=
322
(user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
323
(user_var_entry*) hash_search(&session->user_vars, (unsigned char*) name.str,
324
325
return entry? entry->val_int(&null_value) : 0;
376
377
TODO: Clean up loop to only have one call to send_file()
379
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
380
void mysql_binlog_send(Session* session, char* log_ident, my_off_t pos,
383
384
char *log_file_name = linfo.log_file_name;
384
385
char search_file_name[FN_REFLEN], *name;
387
String* packet = &thd->packet;
388
String* packet = &session->packet;
389
390
const char *errmsg = "Unknown error";
390
NET* net = &thd->net;
391
NET* net = &session->net;
391
392
pthread_mutex_t *log_lock;
392
393
bool binlog_can_be_corrupted= false;
804
805
end_io_cache(&log);
805
806
(void)my_close(file, MYF(MY_WME));
808
thd_proc_info(thd, "Waiting to finalize termination");
809
session->set_proc_info("Waiting to finalize termination");
809
810
pthread_mutex_lock(&LOCK_thread_count);
810
thd->current_linfo = 0;
811
session->current_linfo = 0;
811
812
pthread_mutex_unlock(&LOCK_thread_count);
815
thd_proc_info(thd, "Waiting to finalize termination");
816
session->set_proc_info("Waiting to finalize termination");
816
817
end_io_cache(&log);
818
819
Exclude iteration through thread list
819
820
this is needed for purge_logs() - it will iterate through
820
thread list and update thd->current_linfo->index_file_offset
821
thread list and update session->current_linfo->index_file_offset
821
822
this mutex will make sure that it never tries to update our linfo
822
823
after we return from this stack frame
824
825
pthread_mutex_lock(&LOCK_thread_count);
825
thd->current_linfo = 0;
826
session->current_linfo = 0;
826
827
pthread_mutex_unlock(&LOCK_thread_count);
828
829
(void) my_close(file, MYF(MY_WME));
845
846
don't want to touch the other thread), so set the bit to 0 for the
848
if (thd->lex->slave_thd_opt)
849
thread_mask&= thd->lex->slave_thd_opt;
849
if (session->lex->slave_session_opt)
850
thread_mask&= session->lex->slave_session_opt;
850
851
if (thread_mask) //some threads are stopped, start them
852
if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
853
if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
854
854
slave_errno=ER_MASTER_INFO;
855
else if (server_id_supplied && *mi->host)
855
else if (server_id_supplied && *mi->getHostname())
858
858
If we will start SQL thread we will care about UNTIL options If
864
864
pthread_mutex_lock(&mi->rli.data_lock);
866
if (thd->lex->mi.pos)
866
if (session->lex->mi.pos)
868
868
mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
869
mi->rli.until_log_pos= thd->lex->mi.pos;
869
mi->rli.until_log_pos= session->lex->mi.pos;
871
We don't check thd->lex->mi.log_file_name for NULL here
871
We don't check session->lex->mi.log_file_name for NULL here
872
872
since it is checked in sql_yacc.yy
874
strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
874
strmake(mi->rli.until_log_name, session->lex->mi.log_file_name,
875
875
sizeof(mi->rli.until_log_name)-1);
877
else if (thd->lex->mi.relay_log_pos)
877
else if (session->lex->mi.relay_log_pos)
879
879
mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
880
mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
881
strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
880
mi->rli.until_log_pos= session->lex->mi.relay_log_pos;
881
strmake(mi->rli.until_log_name, session->lex->mi.relay_log_name,
882
882
sizeof(mi->rli.until_log_name)-1);
911
911
/* Issuing warning when started without --skip-slave-start */
912
912
if (!opt_skip_slave_start)
913
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
913
push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
914
914
ER_MISSING_SKIP_SLAVE,
915
915
ER(ER_MISSING_SKIP_SLAVE));
918
918
pthread_mutex_unlock(&mi->rli.data_lock);
920
else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
921
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
920
else if (session->lex->mi.pos || session->lex->mi.relay_log_pos)
921
push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
922
922
ER(ER_UNTIL_COND_IGNORED));
924
924
if (!slave_errno)
1140
thd_proc_info(thd, "Changing master");
1141
LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1138
session->set_proc_info("Changing master");
1139
LEX_MASTER_INFO* lex_mi= &session->lex->mi;
1142
1140
// TODO: see if needs re-write
1143
if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
1141
if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
1146
1143
my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1147
1144
unlock_slave_threads(mi);
1162
1159
if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1164
mi->master_log_name[0] = 0;
1165
mi->master_log_pos= BIN_LOG_HEADER_SIZE;
1168
1162
if (lex_mi->log_file_name)
1169
strmake(mi->master_log_name, lex_mi->log_file_name,
1170
sizeof(mi->master_log_name)-1);
1163
mi->setLogName(lex_mi->log_file_name);
1171
1164
if (lex_mi->pos)
1173
mi->master_log_pos= lex_mi->pos;
1166
mi->setLogPosition(lex_mi->pos);
1176
1169
if (lex_mi->host)
1177
strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1170
mi->setHost(lex_mi->host, lex_mi->port);
1178
1171
if (lex_mi->user)
1179
strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
1172
mi->setUsername(lex_mi->user);
1180
1173
if (lex_mi->password)
1181
strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
1183
mi->port = lex_mi->port;
1174
mi->setPassword(lex_mi->password);
1184
1175
if (lex_mi->connect_retry)
1185
1176
mi->connect_retry = lex_mi->connect_retry;
1186
1177
if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1187
1178
mi->heartbeat_period = lex_mi->heartbeat_period;
1189
mi->heartbeat_period= (float) min((double)SLAVE_MAX_HEARTBEAT_PERIOD,
1180
mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
1190
1181
(slave_net_timeout/2.0));
1191
mi->received_heartbeats= 0LL; // counter lives until master is CHANGEd
1192
if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1193
mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
1195
if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1196
mi->ssl_verify_server_cert=
1197
(lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
1200
strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
1201
if (lex_mi->ssl_capath)
1202
strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
1203
if (lex_mi->ssl_cert)
1204
strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
1205
if (lex_mi->ssl_cipher)
1206
strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
1207
if (lex_mi->ssl_key)
1208
strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
1209
if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
1210
lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
1211
lex_mi->ssl_verify_server_cert )
1212
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1213
ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1182
mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
1215
1184
if (lex_mi->relay_log_name)
1217
1186
need_relay_log_purge= 0;
1218
strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
1219
sizeof(mi->rli.group_relay_log_name)-1);
1220
strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
1221
sizeof(mi->rli.event_relay_log_name)-1);
1187
mi->rli.event_relay_log_name.assign(lex_mi->relay_log_name);
1224
1190
if (lex_mi->relay_log_pos)
1249
1215
Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1250
not initialized), so we use a max().
1216
not initialized), so we use a cmax().
1251
1217
What happens to mi->rli.master_log_pos during the initialization stages
1252
1218
of replication is not 100% clear, so we guard against problems using
1255
mi->master_log_pos = ((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1256
? BIN_LOG_HEADER_SIZE
1257
: mi->rli.group_master_log_pos);
1258
strmake(mi->master_log_name, mi->rli.group_master_log_name,
1259
sizeof(mi->master_log_name)-1);
1221
mi->setLogPosition(((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1222
? BIN_LOG_HEADER_SIZE
1223
: mi->rli.group_master_log_pos));
1224
mi->setLogName(mi->rli.group_master_log_name.c_str());
1262
1227
Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1263
1228
a slave before).
1265
if (flush_master_info(mi, 0))
1267
1232
my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1268
1233
unlock_slave_threads(mi);
1307
1272
''/0: we have lost all copies of the original good coordinates.
1308
1273
That's why we always save good coords in rli.
1310
mi->rli.group_master_log_pos= mi->master_log_pos;
1311
strmake(mi->rli.group_master_log_name,mi->master_log_name,
1312
sizeof(mi->rli.group_master_log_name)-1);
1275
mi->rli.group_master_log_pos= mi->getLogPosition();
1276
mi->rli.group_master_log_name.assign(mi->getLogName());
1314
if (!mi->rli.group_master_log_name[0]) // uninitialized case
1315
mi->rli.group_master_log_pos=0;
1278
if (mi->rli.group_master_log_name.size() == 0) // uninitialized case
1279
mi->rli.group_master_log_pos= 0;
1317
1281
pthread_mutex_lock(&mi->rli.data_lock);
1318
1282
mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1344
1308
ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
1347
return mysql_bin_log.reset_logs(thd);
1311
return mysql_bin_log.reset_logs(session);
1350
1314
int cmp_master_pos(const char* log_file_name1, uint64_t log_pos1,
1351
1315
const char* log_file_name2, uint64_t log_pos2)
1354
uint log_file_name1_len= strlen(log_file_name1);
1355
uint log_file_name2_len= strlen(log_file_name2);
1318
uint32_t log_file_name1_len= strlen(log_file_name1);
1319
uint32_t log_file_name2_len= strlen(log_file_name2);
1357
1321
// We assume that both log names match up to '.'
1358
1322
if (log_file_name1_len == log_file_name2_len)
1368
bool mysql_show_binlog_events(THD* thd)
1370
Protocol *protocol= thd->protocol;
1371
List<Item> field_list;
1372
const char *errmsg= 0;
1377
Log_event::init_show_field_list(&field_list);
1378
if (protocol->send_fields(&field_list,
1379
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1382
Format_description_log_event *description_event= new
1383
Format_description_log_event(3); /* MySQL 4.0 by default */
1386
Wait for handlers to insert any pending information
1387
into the binlog. For e.g. ndb which updates the binlog asynchronously
1388
this is needed so that the user sees all its own commands in the binlog
1390
ha_binlog_wait(thd);
1392
if (mysql_bin_log.is_open())
1394
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1395
SELECT_LEX_UNIT *unit= &thd->lex->unit;
1396
ha_rows event_count, limit_start, limit_end;
1397
my_off_t pos = max((uint64_t)BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1398
char search_file_name[FN_REFLEN], *name;
1399
const char *log_file_name = lex_mi->log_file_name;
1400
pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1404
unit->set_limit(thd->lex->current_select);
1405
limit_start= unit->offset_limit_cnt;
1406
limit_end= unit->select_limit_cnt;
1408
name= search_file_name;
1410
mysql_bin_log.make_log_name(search_file_name, log_file_name);
1412
name=0; // Find first log
1414
linfo.index_file_offset = 0;
1416
if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1418
errmsg = "Could not find target log";
1422
pthread_mutex_lock(&LOCK_thread_count);
1423
thd->current_linfo = &linfo;
1424
pthread_mutex_unlock(&LOCK_thread_count);
1426
if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
1430
to account binlog event header size
1432
thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
1434
pthread_mutex_lock(log_lock);
1437
open_binlog() sought to position 4.
1438
Read the first event in case it's a Format_description_log_event, to
1439
know the format. If there's no such event, we are 3.23 or 4.x. This
1440
code, like before, can't read 3.23 binlogs.
1441
This code will fail on a mixed relay log (one which has Format_desc then
1442
Rotate then Format_desc).
1444
ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
1447
if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
1449
delete description_event;
1450
description_event= (Format_description_log_event*) ev;
1456
my_b_seek(&log, pos);
1458
if (!description_event->is_valid())
1460
errmsg="Invalid Format_description event; could be out of memory";
1464
for (event_count = 0;
1465
(ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
1466
description_event)); )
1468
if (event_count >= limit_start &&
1469
ev->net_send(protocol, linfo.log_file_name, pos))
1471
errmsg = "Net error";
1473
pthread_mutex_unlock(log_lock);
1477
pos = my_b_tell(&log);
1480
if (++event_count >= limit_end)
1484
if (event_count < limit_end && log.error)
1486
errmsg = "Wrong offset or I/O error";
1487
pthread_mutex_unlock(log_lock);
1491
pthread_mutex_unlock(log_lock);
1497
delete description_event;
1501
(void) my_close(file, MYF(MY_WME));
1505
my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
1506
"SHOW BINLOG EVENTS", errmsg);
1510
pthread_mutex_lock(&LOCK_thread_count);
1511
thd->current_linfo = 0;
1512
pthread_mutex_unlock(&LOCK_thread_count);
1517
bool show_binlog_info(THD* thd)
1519
Protocol *protocol= thd->protocol;
1332
bool show_binlog_info(Session* session)
1334
Protocol *protocol= session->protocol;
1520
1335
List<Item> field_list;
1521
1336
field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1522
1337
field_list.push_back(new Item_return_int("Position",20,
1642
1457
int log_loaded_block(IO_CACHE* file)
1644
1459
LOAD_FILE_INFO *lf_info;
1646
1461
/* buffer contains position where we started last read */
1647
uchar* buffer= (uchar*) my_b_get_buffer_start(file);
1648
uint max_event_size= current_thd->variables.max_allowed_packet;
1462
unsigned char* buffer= (unsigned char*) my_b_get_buffer_start(file);
1463
uint32_t max_event_size= current_session->variables.max_allowed_packet;
1649
1464
lf_info= (LOAD_FILE_INFO*) file->arg;
1650
if (lf_info->thd->current_stmt_binlog_row_based)
1465
if (lf_info->session->current_stmt_binlog_row_based)
1652
1467
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1653
1468
lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1656
1471
for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1657
buffer += min(block_len, max_event_size),
1658
block_len -= min(block_len, max_event_size))
1472
buffer += cmin(block_len, max_event_size),
1473
block_len -= cmin(block_len, max_event_size))
1660
1475
lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1661
1476
if (lf_info->wrote_create_file)
1663
Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
1664
min(block_len, max_event_size),
1478
Append_block_log_event a(lf_info->session, lf_info->session->db, buffer,
1479
cmin(block_len, max_event_size),
1665
1480
lf_info->log_delayed);
1666
1481
mysql_bin_log.write(&a);
1670
Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1485
Begin_load_query_log_event b(lf_info->session, lf_info->session->db,
1672
min(block_len, max_event_size),
1487
cmin(block_len, max_event_size),
1673
1488
lf_info->log_delayed);
1674
1489
mysql_bin_log.write(&b);
1675
1490
lf_info->wrote_create_file= 1;
1703
1518
sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
1704
1519
ulong *value_ptr)
1705
1520
:sys_var_long_ptr(chain, name_arg,value_ptr) {}
1706
bool update(THD *thd, set_var *var);
1521
bool update(Session *session, set_var *var);
1709
static void fix_slave_net_timeout(THD *thd,
1524
static void fix_slave_net_timeout(Session *session,
1710
1525
enum_var_type type __attribute__((unused)))
1712
#ifdef HAVE_REPLICATION
1713
1527
pthread_mutex_lock(&LOCK_active_mi);
1714
1528
if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1715
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1529
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1716
1530
ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1717
1531
"The currect value for master_heartbeat_period"
1718
1532
" exceeds the new value of `slave_net_timeout' sec."
1719
1533
" A sensible value for the period should be"
1720
1534
" less than the timeout.");
1721
1535
pthread_mutex_unlock(&LOCK_active_mi);