78
static int send_file(Session *session)
77
static int send_file(THD *thd)
80
NET* net = &session->net;
81
80
int fd = -1, error = 1;
83
82
char fname[FN_REFLEN+1];
84
83
const char *errmsg = 0;
86
85
unsigned long packet_len;
87
unsigned char buf[IO_SIZE]; // It's safe to alloc this
86
uchar buf[IO_SIZE]; // It's safe to alloc this
90
89
The client might be slow loading the data, give him wait_timeout to do
93
92
old_timeout= net->read_timeout;
94
my_net_set_read_timeout(net, session->variables.net_wait_timeout);
93
my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
97
96
We need net_flush here because the client will not know it needs to send
244
243
my_message(errmsg, ER(errmsg), MYF(0));
252
bool purge_master_logs(Session* session, const char* to_log)
251
bool purge_master_logs(THD* thd, const char* to_log)
254
253
char search_file_name[FN_REFLEN];
255
254
if (!mysql_bin_log.is_open())
261
260
mysql_bin_log.make_log_name(search_file_name, to_log);
262
return purge_error_message(session,
261
return purge_error_message(thd,
263
262
mysql_bin_log.purge_logs(search_file_name, 0, 1,
268
bool purge_master_logs_before_date(Session* session, time_t purge_time)
267
bool purge_master_logs_before_date(THD* thd, time_t purge_time)
270
269
if (!mysql_bin_log.is_open())
275
return purge_error_message(session,
274
return purge_error_message(thd,
276
275
mysql_bin_log.purge_logs_before_date(purge_time));
310
309
An auxiliary function for calling in mysql_binlog_send
311
310
to initialize the heartbeat timeout in waiting for a binlogged event.
313
@param[in] session Session to access a user variable
312
@param[in] thd THD to access a user variable
315
314
@return heartbeat period as a uint64_t number of nanoseconds,
316
315
or zero if heartbeat was not demanded by slave
318
static uint64_t get_heartbeat_period(Session * session)
317
static uint64_t get_heartbeat_period(THD * thd)
321
320
LEX_STRING name= { C_STRING_WITH_LEN("master_heartbeat_period")};
322
321
user_var_entry *entry=
323
(user_var_entry*) hash_search(&session->user_vars, (unsigned char*) name.str,
322
(user_var_entry*) hash_search(&thd->user_vars, (uchar*) name.str,
325
324
return entry? entry->val_int(&null_value) : 0;
377
376
TODO: Clean up loop to only have one call to send_file()
380
void mysql_binlog_send(Session* session, char* log_ident, my_off_t pos,
379
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
384
383
char *log_file_name = linfo.log_file_name;
385
384
char search_file_name[FN_REFLEN], *name;
388
String* packet = &session->packet;
387
String* packet = &thd->packet;
390
389
const char *errmsg = "Unknown error";
391
NET* net = &session->net;
390
NET* net = &thd->net;
392
391
pthread_mutex_t *log_lock;
393
392
bool binlog_can_be_corrupted= false;
805
804
end_io_cache(&log);
806
805
(void)my_close(file, MYF(MY_WME));
809
session->set_proc_info("Waiting to finalize termination");
808
thd_proc_info(thd, "Waiting to finalize termination");
810
809
pthread_mutex_lock(&LOCK_thread_count);
811
session->current_linfo = 0;
810
thd->current_linfo = 0;
812
811
pthread_mutex_unlock(&LOCK_thread_count);
816
session->set_proc_info("Waiting to finalize termination");
815
thd_proc_info(thd, "Waiting to finalize termination");
817
816
end_io_cache(&log);
819
818
Exclude iteration through thread list
820
819
this is needed for purge_logs() - it will iterate through
821
thread list and update session->current_linfo->index_file_offset
820
thread list and update thd->current_linfo->index_file_offset
822
821
this mutex will make sure that it never tried to update our linfo
823
822
after we return from this stack frame
825
824
pthread_mutex_lock(&LOCK_thread_count);
826
session->current_linfo = 0;
825
thd->current_linfo = 0;
827
826
pthread_mutex_unlock(&LOCK_thread_count);
829
828
(void) my_close(file, MYF(MY_WME));
846
845
don't want to touch the other thread), so set the bit to 0 for the
849
if (session->lex->slave_session_opt)
850
thread_mask&= session->lex->slave_session_opt;
848
if (thd->lex->slave_thd_opt)
849
thread_mask&= thd->lex->slave_thd_opt;
851
850
if (thread_mask) //some threads are stopped, start them
853
if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
852
if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
854
854
slave_errno=ER_MASTER_INFO;
855
else if (server_id_supplied && *mi->getHostname())
855
else if (server_id_supplied && *mi->host)
858
858
If we will start SQL thread we will care about UNTIL options If
864
864
pthread_mutex_lock(&mi->rli.data_lock);
866
if (session->lex->mi.pos)
866
if (thd->lex->mi.pos)
868
868
mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
869
mi->rli.until_log_pos= session->lex->mi.pos;
869
mi->rli.until_log_pos= thd->lex->mi.pos;
871
We don't check session->lex->mi.log_file_name for NULL here
871
We don't check thd->lex->mi.log_file_name for NULL here
872
872
since it is checked in sql_yacc.yy
874
strmake(mi->rli.until_log_name, session->lex->mi.log_file_name,
874
strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
875
875
sizeof(mi->rli.until_log_name)-1);
877
else if (session->lex->mi.relay_log_pos)
877
else if (thd->lex->mi.relay_log_pos)
879
879
mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
880
mi->rli.until_log_pos= session->lex->mi.relay_log_pos;
881
strmake(mi->rli.until_log_name, session->lex->mi.relay_log_name,
880
mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
881
strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
882
882
sizeof(mi->rli.until_log_name)-1);
911
911
/* Issue a warning when started without --skip-slave-start */
912
912
if (!opt_skip_slave_start)
913
push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
913
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
914
914
ER_MISSING_SKIP_SLAVE,
915
915
ER(ER_MISSING_SKIP_SLAVE));
918
918
pthread_mutex_unlock(&mi->rli.data_lock);
920
else if (session->lex->mi.pos || session->lex->mi.relay_log_pos)
921
push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
920
else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
921
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
922
922
ER(ER_UNTIL_COND_IGNORED));
924
924
if (!slave_errno)
1138
session->set_proc_info("Changing master");
1139
LEX_MASTER_INFO* lex_mi= &session->lex->mi;
1140
thd_proc_info(thd, "Changing master");
1141
LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
1140
1142
// TODO: see if needs re-write
1141
if (mi->init_master_info(master_info_file, relay_log_info_file, thread_mask))
1143
if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
1143
1146
my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
1144
1147
unlock_slave_threads(mi);
1159
1162
if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
1164
mi->master_log_name[0] = 0;
1165
mi->master_log_pos= BIN_LOG_HEADER_SIZE;
1162
1168
if (lex_mi->log_file_name)
1163
mi->setLogName(lex_mi->log_file_name);
1169
strmake(mi->master_log_name, lex_mi->log_file_name,
1170
sizeof(mi->master_log_name)-1);
1164
1171
if (lex_mi->pos)
1166
mi->setLogPosition(lex_mi->pos);
1173
mi->master_log_pos= lex_mi->pos;
1169
1176
if (lex_mi->host)
1170
mi->setHost(lex_mi->host, lex_mi->port);
1177
strmake(mi->host, lex_mi->host, sizeof(mi->host)-1);
1171
1178
if (lex_mi->user)
1172
mi->setUsername(lex_mi->user);
1179
strmake(mi->user, lex_mi->user, sizeof(mi->user)-1);
1173
1180
if (lex_mi->password)
1174
mi->setPassword(lex_mi->password);
1181
strmake(mi->password, lex_mi->password, sizeof(mi->password)-1);
1183
mi->port = lex_mi->port;
1175
1184
if (lex_mi->connect_retry)
1176
1185
mi->connect_retry = lex_mi->connect_retry;
1177
1186
if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1178
1187
mi->heartbeat_period = lex_mi->heartbeat_period;
1180
mi->heartbeat_period= (float) cmin((double)SLAVE_MAX_HEARTBEAT_PERIOD,
1189
mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
1181
1190
(slave_net_timeout/2.0));
1182
mi->received_heartbeats= 0L; // counter lives until master is CHANGEd
1191
mi->received_heartbeats= 0LL; // counter lives until master is CHANGEd
1192
if (lex_mi->ssl != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1193
mi->ssl= (lex_mi->ssl == LEX_MASTER_INFO::LEX_MI_ENABLE);
1195
if (lex_mi->ssl_verify_server_cert != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
1196
mi->ssl_verify_server_cert=
1197
(lex_mi->ssl_verify_server_cert == LEX_MASTER_INFO::LEX_MI_ENABLE);
1200
strmake(mi->ssl_ca, lex_mi->ssl_ca, sizeof(mi->ssl_ca)-1);
1201
if (lex_mi->ssl_capath)
1202
strmake(mi->ssl_capath, lex_mi->ssl_capath, sizeof(mi->ssl_capath)-1);
1203
if (lex_mi->ssl_cert)
1204
strmake(mi->ssl_cert, lex_mi->ssl_cert, sizeof(mi->ssl_cert)-1);
1205
if (lex_mi->ssl_cipher)
1206
strmake(mi->ssl_cipher, lex_mi->ssl_cipher, sizeof(mi->ssl_cipher)-1);
1207
if (lex_mi->ssl_key)
1208
strmake(mi->ssl_key, lex_mi->ssl_key, sizeof(mi->ssl_key)-1);
1209
if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
1210
lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
1211
lex_mi->ssl_verify_server_cert )
1212
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1213
ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
1184
1215
if (lex_mi->relay_log_name)
1186
1217
need_relay_log_purge= 0;
1187
mi->rli.event_relay_log_name.assign(lex_mi->relay_log_name);
1218
strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
1219
sizeof(mi->rli.group_relay_log_name)-1);
1220
strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
1221
sizeof(mi->rli.event_relay_log_name)-1);
1190
1224
if (lex_mi->relay_log_pos)
1215
1249
Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
1216
not initialized), so we use a cmax().
1250
not initialized), so we use a max().
1217
1251
What happens to mi->rli.master_log_pos during the initialization stages
1218
1252
of replication is not 100% clear, so we guard against problems using
1221
mi->setLogPosition(((BIN_LOG_HEADER_SIZE > mi->rli.group_master_log_pos)
1222
? BIN_LOG_HEADER_SIZE
1223
: mi->rli.group_master_log_pos));
1224
mi->setLogName(mi->rli.group_master_log_name.c_str());
1255
mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
1256
mi->rli.group_master_log_pos);
1257
strmake(mi->master_log_name, mi->rli.group_master_log_name,
1258
sizeof(mi->master_log_name)-1);
1227
1261
Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
1228
1262
a slave before).
1264
if (flush_master_info(mi, 0))
1232
1266
my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
1233
1267
unlock_slave_threads(mi);
1272
1306
''/0: we have lost all copies of the original good coordinates.
1273
1307
That's why we always save good coords in rli.
1275
mi->rli.group_master_log_pos= mi->getLogPosition();
1276
mi->rli.group_master_log_name.assign(mi->getLogName());
1309
mi->rli.group_master_log_pos= mi->master_log_pos;
1310
strmake(mi->rli.group_master_log_name,mi->master_log_name,
1311
sizeof(mi->rli.group_master_log_name)-1);
1278
if (mi->rli.group_master_log_name.size() == 0) // uninitialized case
1279
mi->rli.group_master_log_pos= 0;
1313
if (!mi->rli.group_master_log_name[0]) // uninitialized case
1314
mi->rli.group_master_log_pos=0;
1281
1316
pthread_mutex_lock(&mi->rli.data_lock);
1282
1317
mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
1308
1343
ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
1311
return mysql_bin_log.reset_logs(session);
1346
return mysql_bin_log.reset_logs(thd);
1314
1349
int cmp_master_pos(const char* log_file_name1, uint64_t log_pos1,
1315
1350
const char* log_file_name2, uint64_t log_pos2)
1318
uint32_t log_file_name1_len= strlen(log_file_name1);
1319
uint32_t log_file_name2_len= strlen(log_file_name2);
1353
uint log_file_name1_len= strlen(log_file_name1);
1354
uint log_file_name2_len= strlen(log_file_name2);
1321
1356
// We assume that both log names match up to '.'
1322
1357
if (log_file_name1_len == log_file_name2_len)
1332
bool show_binlog_info(Session* session)
1334
Protocol *protocol= session->protocol;
1367
bool mysql_show_binlog_events(THD* thd)
1369
Protocol *protocol= thd->protocol;
1370
List<Item> field_list;
1371
const char *errmsg= 0;
1376
Log_event::init_show_field_list(&field_list);
1377
if (protocol->send_fields(&field_list,
1378
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
1381
Format_description_log_event *description_event= new
1382
Format_description_log_event(3); /* MySQL 4.0 by default */
1385
Wait for handlers to insert any pending information
1386
into the binlog. For e.g. ndb which updates the binlog asynchronously
1387
this is needed so that the user sees all its own commands in the binlog
1389
ha_binlog_wait(thd);
1391
if (mysql_bin_log.is_open())
1393
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
1394
SELECT_LEX_UNIT *unit= &thd->lex->unit;
1395
ha_rows event_count, limit_start, limit_end;
1396
my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
1397
char search_file_name[FN_REFLEN], *name;
1398
const char *log_file_name = lex_mi->log_file_name;
1399
pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
1403
unit->set_limit(thd->lex->current_select);
1404
limit_start= unit->offset_limit_cnt;
1405
limit_end= unit->select_limit_cnt;
1407
name= search_file_name;
1409
mysql_bin_log.make_log_name(search_file_name, log_file_name);
1411
name=0; // Find first log
1413
linfo.index_file_offset = 0;
1415
if (mysql_bin_log.find_log_pos(&linfo, name, 1))
1417
errmsg = "Could not find target log";
1421
pthread_mutex_lock(&LOCK_thread_count);
1422
thd->current_linfo = &linfo;
1423
pthread_mutex_unlock(&LOCK_thread_count);
1425
if ((file=open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
1429
to account binlog event header size
1431
thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
1433
pthread_mutex_lock(log_lock);
1436
open_binlog() sought to position 4.
1437
Read the first event in case it's a Format_description_log_event, to
1438
know the format. If there's no such event, we are 3.23 or 4.x. This
1439
code, like before, can't read 3.23 binlogs.
1440
This code will fail on a mixed relay log (one which has Format_desc then
1441
Rotate then Format_desc).
1443
ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
1446
if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
1448
delete description_event;
1449
description_event= (Format_description_log_event*) ev;
1455
my_b_seek(&log, pos);
1457
if (!description_event->is_valid())
1459
errmsg="Invalid Format_description event; could be out of memory";
1463
for (event_count = 0;
1464
(ev = Log_event::read_log_event(&log,(pthread_mutex_t*) 0,
1465
description_event)); )
1467
if (event_count >= limit_start &&
1468
ev->net_send(protocol, linfo.log_file_name, pos))
1470
errmsg = "Net error";
1472
pthread_mutex_unlock(log_lock);
1476
pos = my_b_tell(&log);
1479
if (++event_count >= limit_end)
1483
if (event_count < limit_end && log.error)
1485
errmsg = "Wrong offset or I/O error";
1486
pthread_mutex_unlock(log_lock);
1490
pthread_mutex_unlock(log_lock);
1496
delete description_event;
1500
(void) my_close(file, MYF(MY_WME));
1504
my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
1505
"SHOW BINLOG EVENTS", errmsg);
1509
pthread_mutex_lock(&LOCK_thread_count);
1510
thd->current_linfo = 0;
1511
pthread_mutex_unlock(&LOCK_thread_count);
1516
bool show_binlog_info(THD* thd)
1518
Protocol *protocol= thd->protocol;
1335
1519
List<Item> field_list;
1336
1520
field_list.push_back(new Item_empty_string("File", FN_REFLEN));
1337
1521
field_list.push_back(new Item_return_int("Position",20,
1457
1641
int log_loaded_block(IO_CACHE* file)
1459
1643
LOAD_FILE_INFO *lf_info;
1461
1645
/* buffer contains position where we started last read */
1462
unsigned char* buffer= (unsigned char*) my_b_get_buffer_start(file);
1463
uint32_t max_event_size= current_session->variables.max_allowed_packet;
1646
uchar* buffer= (uchar*) my_b_get_buffer_start(file);
1647
uint max_event_size= current_thd->variables.max_allowed_packet;
1464
1648
lf_info= (LOAD_FILE_INFO*) file->arg;
1465
if (lf_info->session->current_stmt_binlog_row_based)
1649
if (lf_info->thd->current_stmt_binlog_row_based)
1467
1651
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
1468
1652
lf_info->last_pos_in_file >= my_b_get_pos_in_file(file))
1471
1655
for (block_len= my_b_get_bytes_in_buffer(file); block_len > 0;
1472
buffer += cmin(block_len, max_event_size),
1473
block_len -= cmin(block_len, max_event_size))
1656
buffer += min(block_len, max_event_size),
1657
block_len -= min(block_len, max_event_size))
1475
1659
lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
1476
1660
if (lf_info->wrote_create_file)
1478
Append_block_log_event a(lf_info->session, lf_info->session->db, buffer,
1479
cmin(block_len, max_event_size),
1662
Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
1663
min(block_len, max_event_size),
1480
1664
lf_info->log_delayed);
1481
1665
mysql_bin_log.write(&a);
1485
Begin_load_query_log_event b(lf_info->session, lf_info->session->db,
1669
Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
1487
cmin(block_len, max_event_size),
1671
min(block_len, max_event_size),
1488
1672
lf_info->log_delayed);
1489
1673
mysql_bin_log.write(&b);
1490
1674
lf_info->wrote_create_file= 1;
1518
1702
sys_var_sync_binlog_period(sys_var_chain *chain, const char *name_arg,
1519
1703
ulong *value_ptr)
1520
1704
:sys_var_long_ptr(chain, name_arg,value_ptr) {}
1521
bool update(Session *session, set_var *var);
1705
bool update(THD *thd, set_var *var);
1524
static void fix_slave_net_timeout(Session *session,
1708
static void fix_slave_net_timeout(THD *thd,
1525
1709
enum_var_type type __attribute__((unused)))
1711
#ifdef HAVE_REPLICATION
1527
1712
pthread_mutex_lock(&LOCK_active_mi);
1528
1713
if (active_mi && slave_net_timeout < active_mi->heartbeat_period)
1529
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1714
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1530
1715
ER_SLAVE_HEARTBEAT_VALUE_OUT_OF_RANGE,
1531
1716
"The currect value for master_heartbeat_period"
1532
1717
" exceeds the new value of `slave_net_timeout' sec."
1533
1718
" A sensible value for the period should be"
1534
1719
" less than the timeout.");
1535
1720
pthread_mutex_unlock(&LOCK_active_mi);