109
197
The following functions form part of the C plugin API
111
200
extern "C" int mysql_tmpfile(const char *prefix)
113
202
char filename[FN_REFLEN];
114
int fd = internal::create_temp_file(filename, drizzle_tmpdir, prefix, MYF(MY_WME));
203
File fd = create_temp_file(filename, mysql_tmpdir, prefix,
204
O_CREAT | O_EXCL | O_RDWR | O_TEMPORARY,
208
This can be removed once the following bug is fixed:
209
Bug #28903 create_temp_file() doesn't honor O_TEMPORARY option
210
(file not removed) (Unix)
116
212
unlink(filename);
123
int session_tablespace_op(const Session *session)
125
return test(session->tablespace_op);
129
Set the process info field of the Session structure.
131
This function is used by plug-ins. Internally, the
132
Session::set_proc_info() function should be used.
134
@see Session::set_proc_info
137
set_session_proc_info(Session *session, const char *info)
139
session->set_proc_info(info);
143
const char *get_session_proc_info(Session *session)
145
return session->get_proc_info();
148
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
150
return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
153
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
156
return &ha_data[monitored->getId()].resource_context[index];
160
int64_t session_test_options(const Session *session, int64_t test_options)
162
return session->options & test_options;
166
int session_sql_command(const Session *session)
168
return (int) session->lex->sql_command;
172
int session_tx_isolation(const Session *session)
174
return (int) session->variables.tx_isolation;
177
Session::Session(plugin::Client *client_arg)
179
Open_tables_state(refresh_version),
180
mem_root(&main_mem_root),
186
lock_id(&main_lock_id),
188
ha_data(plugin::num_trx_monitored_objects),
189
arg_of_last_insert_id_function(false),
190
first_successful_insert_id_in_prev_stmt(0),
191
first_successful_insert_id_in_cur_stmt(0),
194
some_tables_deleted(false),
197
is_fatal_error(false),
198
transaction_rollback_request(false),
199
is_fatal_sub_stmt_error(0),
200
derived_tables_processing(false),
201
tablespace_op(false),
204
transaction_message(NULL),
205
statement_message(NULL)
207
memset(process_list_info, 0, PROCESS_LIST_WIDTH);
208
client->setSession(this);
220
int thd_in_lock_tables(const THD *thd)
222
return test(thd->in_lock_tables);
227
int thd_tablespace_op(const THD *thd)
229
return test(thd->tablespace_op);
234
const char *set_thd_proc_info(THD *thd, const char *info,
235
const char *calling_function __attribute__((unused)),
236
const char *calling_file __attribute__((unused)),
237
const unsigned int calling_line __attribute__((unused)))
239
const char *old_info= thd->get_proc_info();
240
thd->set_proc_info(info);
245
void **thd_ha_data(const THD *thd, const struct handlerton *hton)
247
return (void **) &thd->ha_data[hton->slot].ha_ptr;
251
int64_t thd_test_options(const THD *thd, int64_t test_options)
253
return thd->options & test_options;
257
int thd_sql_command(const THD *thd)
259
return (int) thd->lex->sql_command;
263
int thd_tx_isolation(const THD *thd)
265
return (int) thd->variables.tx_isolation;
269
void thd_inc_row_count(THD *thd)
275
Clear this diagnostics area.
277
Normally called at the end of a statement.
281
Diagnostics_area::reset_diagnostics_area()
283
can_overwrite_status= false;
284
/** Don't take chances in production */
290
m_total_warn_count= 0;
292
/** Tiny reset in debug mode to see garbage right away */
298
Set OK status -- ends commands that do not return a
299
result set, e.g. INSERT/UPDATE/DELETE.
303
Diagnostics_area::set_ok_status(THD *thd, ha_rows affected_rows_arg,
304
uint64_t last_insert_id_arg,
305
const char *message_arg)
309
In production, refuse to overwrite an error or a custom response
312
if (is_error() || is_disabled())
314
/** Only allowed to report success if has not yet reported an error */
316
m_server_status= thd->server_status;
317
m_total_warn_count= thd->total_warn_count;
318
m_affected_rows= affected_rows_arg;
319
m_last_insert_id= last_insert_id_arg;
321
strmake(m_message, message_arg, sizeof(m_message) - 1);
333
Diagnostics_area::set_eof_status(THD *thd)
335
/** Only allowed to report eof if has not yet reported an error */
339
In production, refuse to overwrite an error or a custom response
342
if (is_error() || is_disabled())
345
m_server_status= thd->server_status;
347
If inside a stored procedure, do not return the total
348
number of warnings, since they are not available to the client
351
m_total_warn_count= thd->total_warn_count;
361
Diagnostics_area::set_error_status(THD *thd __attribute__((unused)),
363
const char *message_arg)
366
Only allowed to report error if has not yet reported a success
367
The only exception is when we flush the message to the client,
368
an error can happen during the flush.
370
assert(! is_set() || can_overwrite_status);
372
In production, refuse to overwrite a custom response with an
378
m_sql_errno= sql_errno_arg;
379
strmake(m_message, message_arg, sizeof(m_message) - 1);
386
Mark the diagnostics area as 'DISABLED'.
388
This is used in rare cases when the COM_ command at hand sends a response
389
in a custom format. One example is the query cache, another is
394
Diagnostics_area::disable_status()
397
m_status= DA_DISABLED;
402
:Statement(&main_lex, &main_mem_root, CONVENTIONAL_EXECUTION,
403
/* statement id */ 0),
404
Open_tables_state(refresh_version), rli_fake(0),
405
lock_id(&main_lock_id),
406
user_time(0), in_sub_stmt(0),
407
binlog_table_maps(0), binlog_flags(0UL),
408
arg_of_last_insert_id_function(false),
409
first_successful_insert_id_in_prev_stmt(0),
410
first_successful_insert_id_in_prev_stmt_for_binlog(0),
411
first_successful_insert_id_in_cur_stmt(0),
412
stmt_depends_on_first_successful_insert_id_in_prev_stmt(false),
415
transaction_rollback_request(0),
416
is_fatal_sub_stmt_error(0),
421
derived_tables_processing(false),
211
427
Pass nominal parameters to init_alloc_root only to ensure that
212
428
the destructor works OK in case of an error. The main_mem_root
213
429
will be re-initialized in init_for_queries().
215
memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
431
init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
434
catalog= (char*)"std"; // the only catalog we have for now
435
main_security_ctx.init();
436
security_ctx= &main_security_ctx;
437
some_tables_deleted=no_errors=password= 0;
217
439
count_cuted_fields= CHECK_FIELD_IGNORE;
218
440
killed= NOT_KILLED;
442
is_slave_error= thread_specific_used= false;
443
hash_clear(&handler_tables_hash);
222
446
cuted_fields= sent_row_count= row_count= 0L;
223
448
row_count_func= -1;
224
449
statement_id_counter= 0UL;
225
// Must be reset to handle error with Session's created for init of mysqld
450
// Must be reset to handle error with THD's created for init of mysqld
226
451
lex->current_select= 0;
227
452
start_time=(time_t) 0;
229
454
utime_after_lock= 0L;
230
457
memset(&variables, 0, sizeof(variables));
236
dbug_sentry=Session_SENTRY_MAGIC;
237
cleanup_done= abort_on_warning= no_warnings_for_error= false;
463
db_charset= global_system_variables.collation_database;
464
memset(ha_data, 0, sizeof(ha_data));
466
binlog_evt_union.do_union= false;
468
dbug_sentry=THD_SENTRY_MAGIC;
470
client_capabilities= 0; // minimalistic client
471
system_thread= NON_SYSTEM_THREAD;
472
cleanup_done= abort_on_warning= no_warnings_for_error= 0;
473
peer_port= 0; // For SHOW PROCESSLIST
474
transaction.m_pending_rows_event= 0;
476
#ifdef SIGNAL_WITH_VIO_CLOSE
238
479
pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
240
481
/* Variables with default values */
241
482
proc_info="login";
242
where= Session::DEFAULT_WHERE;
243
command= COM_CONNECT;
245
plugin_sessionvar_init(this);
247
variables= global_system_variables above has reset
248
variables.pseudo_thread_id to 0. We need to correct it here to
249
avoid temporary tables replication failure.
251
variables.pseudo_thread_id= thread_id;
252
server_status= SERVER_STATUS_AUTOCOMMIT;
253
options= session_startup_options;
255
if (variables.max_join_size == HA_POS_ERROR)
256
options |= OPTION_BIG_SELECTS;
258
options &= ~OPTION_BIG_SELECTS;
260
open_options=ha_open_options;
261
update_lock_default= TL_WRITE;
262
session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
264
memset(warn_count, 0, sizeof(warn_count));
266
memset(&status_var, 0, sizeof(status_var));
483
where= THD::DEFAULT_WHERE;
484
server_id = ::server_id;
268
490
/* Initialize sub structures */
269
memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
491
init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
492
user_connect=(USER_CONN *)0;
270
493
hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
271
494
(hash_get_key) get_var_key,
272
495
(hash_free_key) free_user_var, 0);
497
/* For user vars replication*/
499
my_init_dynamic_array(&user_var_events,
500
sizeof(BINLOG_USER_VAR_EVENT *), 16, 16);
502
memset(&user_var_events, 0, sizeof(user_var_events));
505
protocol= &protocol_text; // Default protocol
506
protocol_text.init(this);
508
tablespace_op= false;
510
randominit(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id);
274
511
substitute_null_with_insert_id = false;
275
512
thr_lock_info_init(&lock_info); /* safety: will be reset after start */
276
513
thr_lock_owner_init(&main_lock_id, &lock_info);
524
858
assert(thread_stack);
526
if (pthread_setspecific(THR_Session, this) ||
527
pthread_setspecific(THR_Mem_root, &mem_root))
860
if (my_pthread_setspecific_ptr(THR_THD, this) ||
861
my_pthread_setspecific_ptr(THR_MALLOC, &mem_root))
530
863
mysys_var=my_thread_var;
533
865
Let mysqld define the thread id (not mysys)
534
This allows us to move Session to different threads if needed.
866
This allows us to move THD to different threads if needed.
536
868
mysys_var->id= thread_id;
537
869
real_id= pthread_self(); // For debugging
540
We have to call thr_lock_info_init() again here as Session may have been
872
We have to call thr_lock_info_init() again here as THD may have been
541
873
created in another thread
543
875
thr_lock_info_init(&lock_info);
548
Init Session for query processing.
549
This has to be called once before we call mysql_parse.
550
See also comments in session.h.
884
THD::cleanup_after_query()
887
This function is used to reset thread data to its default state.
890
This function is not suitable for setting thread data to some
891
non-default values, as there is only one replication thread, so
892
different master threads may overwrite data of each other on
553
void Session::prepareForQueries()
555
if (variables.max_join_size == HA_POS_ERROR)
556
options |= OPTION_BIG_SELECTS;
558
version= refresh_version;
563
reset_root_defaults(mem_root, variables.query_alloc_block_size,
564
variables.query_prealloc_size);
565
transaction.xid_state.xid.null();
566
transaction.xid_state.in_session=1;
569
bool Session::initGlobals()
573
disconnect(ER_OUT_OF_RESOURCES, true);
574
statistic_increment(aborted_connects, &LOCK_status);
582
if (initGlobals() || authenticate())
590
while (! client->haveError() && killed != KILL_CONNECTION)
592
if (! executeStatement())
599
bool Session::schedule()
601
scheduler= plugin::Scheduler::getScheduler();
604
connection_count.increment();
606
if (connection_count > max_used_connections)
607
max_used_connections= connection_count;
609
thread_id= variables.pseudo_thread_id= global_thread_id++;
611
pthread_mutex_lock(&LOCK_thread_count);
612
getSessionList().push_back(this);
613
pthread_mutex_unlock(&LOCK_thread_count);
615
if (scheduler->addSession(this))
617
DRIZZLE_CONNECTION_START(thread_id);
618
char error_message_buff[DRIZZLE_ERRMSG_SIZE];
620
killed= Session::KILL_CONNECTION;
622
statistic_increment(aborted_connects, &LOCK_status);
624
/* Can't use my_error() since store_globals has not been called. */
625
/* TODO replace will better error message */
626
snprintf(error_message_buff, sizeof(error_message_buff),
627
ER(ER_CANT_CREATE_THREAD), 1);
628
client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
636
const char* Session::enter_cond(pthread_cond_t *cond,
637
pthread_mutex_t* mutex,
640
const char* old_msg = get_proc_info();
641
safe_mutex_assert_owner(mutex);
642
mysys_var->current_mutex = mutex;
643
mysys_var->current_cond = cond;
644
this->set_proc_info(msg);
648
void Session::exit_cond(const char* old_msg)
651
Putting the mutex unlock in exit_cond() ensures that
652
mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
653
locked (if that would not be the case, you'll get a deadlock if someone
654
does a Session::awake() on you).
656
pthread_mutex_unlock(mysys_var->current_mutex);
657
pthread_mutex_lock(&mysys_var->mutex);
658
mysys_var->current_mutex = 0;
659
mysys_var->current_cond = 0;
660
this->set_proc_info(old_msg);
661
pthread_mutex_unlock(&mysys_var->mutex);
664
bool Session::authenticate()
667
if (client->authenticate())
670
statistic_increment(aborted_connects, &LOCK_status);
674
bool Session::checkUser(const char *passwd, uint32_t passwd_len, const char *in_db)
676
const string passwd_str(passwd, passwd_len);
677
bool is_authenticated=
678
plugin::Authentication::isAuthenticated(getSecurityContext(),
681
if (is_authenticated != true)
683
/* isAuthenticated has pushed the error message */
687
/* Change database if necessary */
688
if (in_db && in_db[0])
690
SchemaIdentifier identifier(in_db);
691
if (mysql_change_db(this, identifier))
693
/* mysql_change_db() has pushed the error message. */
698
password= test(passwd_len); // remember for error messages
700
/* Ready to handle queries */
704
bool Session::executeStatement()
707
uint32_t packet_length;
709
enum enum_server_command l_command;
712
indicator of uninitialized lex => normal flow of errors handling
715
lex->current_select= 0;
717
main_da.reset_diagnostics_area();
719
if (client->readCommand(&l_packet, &packet_length) == false)
722
if (packet_length == 0)
725
l_command= (enum enum_server_command) (unsigned char) l_packet[0];
727
if (command >= COM_END)
728
command= COM_END; // Wrong command
730
assert(packet_length);
731
return ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
734
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
736
/* Remove garbage at start and end of query */
737
while (in_packet_length > 0 && my_isspace(charset(), in_packet[0]))
742
const char *pos= in_packet + in_packet_length; /* Point at end null */
743
while (in_packet_length > 0 &&
744
(pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
750
query.assign(in_packet, in_packet + in_packet_length);
755
bool Session::endTransaction(enum enum_mysql_completiontype completion)
759
TransactionServices &transaction_services= TransactionServices::singleton();
761
if (transaction.xid_state.xa_state != XA_NOTR)
763
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
770
* We don't use endActiveTransaction() here to ensure that this works
771
* even if there is a problem with the OPTION_AUTO_COMMIT flag
772
* (Which of course should never happen...)
774
server_status&= ~SERVER_STATUS_IN_TRANS;
775
if (transaction_services.ha_commit_trans(this, true))
777
options&= ~(OPTION_BEGIN);
780
do_release= 1; /* fall through */
781
case COMMIT_AND_CHAIN:
782
result= endActiveTransaction();
783
if (result == true && completion == COMMIT_AND_CHAIN)
784
result= startTransaction();
786
case ROLLBACK_RELEASE:
787
do_release= 1; /* fall through */
789
case ROLLBACK_AND_CHAIN:
791
server_status&= ~SERVER_STATUS_IN_TRANS;
792
if (transaction_services.ha_rollback_trans(this, true))
794
options&= ~(OPTION_BEGIN);
795
if (result == true && (completion == ROLLBACK_AND_CHAIN))
796
result= startTransaction();
800
my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
805
my_error(killed_errno(), MYF(0));
806
else if ((result == true) && do_release)
807
killed= Session::KILL_CONNECTION;
812
bool Session::endActiveTransaction()
815
TransactionServices &transaction_services= TransactionServices::singleton();
817
if (transaction.xid_state.xa_state != XA_NOTR)
819
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
822
if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
824
server_status&= ~SERVER_STATUS_IN_TRANS;
825
if (transaction_services.ha_commit_trans(this, true))
828
options&= ~(OPTION_BEGIN);
832
bool Session::startTransaction(start_transaction_option_t opt)
836
if (! endActiveTransaction())
842
options|= OPTION_BEGIN;
843
server_status|= SERVER_STATUS_IN_TRANS;
845
if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
854
void Session::cleanup_after_query()
857
Reset rand_used so that detection of calls to rand() will save random
896
void THD::cleanup_after_query()
899
Reset rand_used so that detection of calls to rand() will save random
858
900
seeds if needed by the slave.
902
Do not reset rand_used if inside a stored function or trigger because
903
only the call to these operations is logged. Thus only the calling
904
statement needs to detect rand() calls made by its substatements. These
905
substatements must not set rand_used to 0 because it would remove the
906
detection of rand() by the calling statement.
908
if (!in_sub_stmt) /* stored functions and triggers are a special case */
861
910
/* Forget those values, for next binlogger: */
911
stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
862
912
auto_inc_intervals_in_cur_stmt_for_binlog.empty();
864
915
if (first_successful_insert_id_in_cur_stmt > 0)
866
917
/* set what LAST_INSERT_ID() will return */
867
first_successful_insert_id_in_prev_stmt= first_successful_insert_id_in_cur_stmt;
918
first_successful_insert_id_in_prev_stmt=
919
first_successful_insert_id_in_cur_stmt;
868
920
first_successful_insert_id_in_cur_stmt= 0;
869
921
substitute_null_with_insert_id= true;
871
arg_of_last_insert_id_function= false;
923
arg_of_last_insert_id_function= 0;
872
924
/* Free Items that were created during this execution */
874
926
/* Reset where. */
875
where= Session::DEFAULT_WHERE;
927
where= THD::DEFAULT_WHERE;
879
932
Create a LEX_STRING in this connection.
941
1155
item->maybe_null= 1;
942
1156
field_list.push_back(new Item_empty_string("Extra", 255, cs));
943
return (result->send_fields(field_list));
946
void select_result::send_error(uint32_t errcode, const char *err)
1157
return (result->send_fields(field_list,
1158
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF));
1161
#ifdef SIGNAL_WITH_VIO_CLOSE
1162
void THD::close_active_vio()
1164
safe_mutex_assert_owner(&LOCK_delete);
1167
vio_close(active_vio);
1175
struct Item_change_record: public ilink
1179
/* Placement new was hidden by `new' in ilink (TODO: check): */
1180
static void *operator new(size_t size __attribute__((unused)),
1183
static void operator delete(void *ptr __attribute__((unused)),
1184
size_t size __attribute__((unused)))
1186
static void operator delete(void *ptr __attribute__((unused)),
1187
void *mem __attribute__((unused)))
1188
{ /* never called */ }
1193
Register an item tree tree transformation, performed by the query
1194
optimizer. We need a pointer to runtime_memroot because it may be !=
1195
thd->mem_root (due to possible set_n_backup_active_arena called for thd).
1198
void THD::nocheck_register_item_tree_change(Item **place, Item *old_value,
1199
MEM_ROOT *runtime_memroot)
1201
Item_change_record *change;
1203
Now we use one node per change, which adds some memory overhead,
1204
but still is rather fast as we use alloc_root for allocations.
1205
A list of item tree changes of an average query should be short.
1207
void *change_mem= alloc_root(runtime_memroot, sizeof(*change));
1208
if (change_mem == 0)
1211
OOM, thd->fatal_error() is called by the error handler of the
1212
memroot. Just return.
1216
change= new (change_mem) Item_change_record;
1217
change->place= place;
1218
change->old_value= old_value;
1219
change_list.append(change);
1223
void THD::rollback_item_tree_changes()
1225
I_List_iterator<Item_change_record> it(change_list);
1226
Item_change_record *change;
1228
while ((change= it++))
1229
*change->place= change->old_value;
1230
/* We can forget about changes memory: it's allocated in runtime memroot */
1231
change_list.empty();
1237
Check that the endpoint is still available.
1240
bool THD::vio_is_connected()
1244
/* End of input is signaled by poll if the socket is aborted. */
1245
if (vio_poll_read(net.vio, 0))
1248
/* Socket is aborted if signaled but no data is available. */
1249
if (vio_peek_read(net.vio, &bytes))
1252
return bytes ? true : false;
1256
/*****************************************************************************
1257
** Functions to provide a interface to select results
1258
*****************************************************************************/
1260
select_result::select_result()
1265
void select_result::send_error(uint errcode,const char *err)
948
1267
my_message(errcode, err, MYF(0));
1271
void select_result::cleanup()
1276
bool select_result::check_simple_select() const
1278
my_error(ER_SP_BAD_CURSOR_QUERY, MYF(0));
1283
static String default_line_term("\n",default_charset_info);
1284
static String default_escaped("\\",default_charset_info);
1285
static String default_field_term("\t",default_charset_info);
1287
sql_exchange::sql_exchange(char *name, bool flag,
1288
enum enum_filetype filetype_arg)
1289
:file_name(name), opt_enclosed(0), dumpfile(flag), skip_lines(0)
1291
filetype= filetype_arg;
1292
field_term= &default_field_term;
1293
enclosed= line_start= &my_empty_string;
1294
line_term= &default_line_term;
1295
escaped= &default_escaped;
1299
bool select_send::send_fields(List<Item> &list, uint flags)
1302
if (!(res= thd->protocol->send_fields(&list, flags)))
1303
is_result_set_started= 1;
1307
void select_send::abort()
1314
Cleanup an instance of this class for re-use
1315
at next execution of a prepared statement/
1316
stored procedure statement.
1319
void select_send::cleanup()
1321
is_result_set_started= false;
1324
/* Send data to client. Returns 0 if ok */
1326
bool select_send::send_data(List<Item> &items)
1328
if (unit->offset_limit_cnt)
1329
{ // using limit offset,count
1330
unit->offset_limit_cnt--;
1335
We may be passing the control from mysqld to the client: release the
1336
InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
1339
ha_release_temporary_latches(thd);
1341
List_iterator_fast<Item> li(items);
1342
Protocol *protocol= thd->protocol;
1343
char buff[MAX_FIELD_WIDTH];
1344
String buffer(buff, sizeof(buff), &my_charset_bin);
1346
protocol->prepare_for_resend();
1350
if (item->send(protocol, &buffer))
1352
protocol->free(); // Free used buffer
1353
my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
1357
thd->sent_row_count++;
1358
if (thd->is_error())
1360
protocol->remove_last_row();
1364
return(protocol->write());
1368
bool select_send::send_eof()
1371
We may be passing the control from mysqld to the client: release the
1372
InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
1375
ha_release_temporary_latches(thd);
1377
/* Unlock tables before sending packet to gain some speed */
1380
mysql_unlock_tables(thd, thd->lock);
1384
is_result_set_started= 0;
951
1389
/************************************************************************
952
1390
Handling writing to file
953
1391
************************************************************************/
955
void select_to_file::send_error(uint32_t errcode,const char *err)
1393
void select_to_file::send_error(uint errcode,const char *err)
957
1395
my_message(errcode, err, MYF(0));
960
(void) end_io_cache(cache);
961
(void) internal::my_close(file, MYF(0));
962
(void) internal::my_delete(path, MYF(0)); // Delete file on error
1398
(void) end_io_cache(&cache);
1399
(void) my_close(file,MYF(0));
1400
(void) my_delete(path,MYF(0)); // Delete file on error
1597
2258
access to mysql.proc table to find definitions of stored routines.
1598
2259
****************************************************************************/
1600
void Session::reset_n_backup_open_tables_state(Open_tables_state *backup)
2261
void THD::reset_n_backup_open_tables_state(Open_tables_state *backup)
1602
2263
backup->set_open_tables_state(this);
1603
2264
reset_open_tables_state();
1604
backups_available= false;
2265
state_flags|= Open_tables_state::BACKUPS_AVAIL;
1608
void Session::restore_backup_open_tables_state(Open_tables_state *backup)
2270
void THD::restore_backup_open_tables_state(Open_tables_state *backup)
1611
2273
Before we will throw away current open tables state we want
1612
2274
to be sure that it was properly cleaned up.
1614
2276
assert(open_tables == 0 && temporary_tables == 0 &&
1615
derived_tables == 0 &&
2277
handler_tables == 0 && derived_tables == 0 &&
2278
lock == 0 && locked_tables == 0);
1617
2279
set_open_tables_state(backup);
1620
bool Session::set_db(const std::string &new_db)
1622
/* Do not reallocate memory if current chunk is big enough. */
1623
if (new_db.length())
1635
2284
Check the killed state of a user thread
1636
@param session user thread
2285
@param thd user thread
1637
2286
@retval 0 the user thread is active
1638
2287
@retval 1 the user thread has been killed
1640
extern "C" int session_killed(const Session *session)
2289
extern "C" int thd_killed(const DRIZZLE_THD thd)
1642
return(session->killed);
2291
return(thd->killed);
1646
Return the session id of a user session
1647
@param pointer to Session object
1648
@return session's id
2295
Return the thread id of a user thread
2296
@param thd user thread
1650
extern "C" unsigned long session_get_thread_id(const Session *session)
1652
return (unsigned long) session->getSessionId();
1656
const struct charset_info_st *session_charset(Session *session)
1658
return(session->charset());
1661
int session_non_transactional_update(const Session *session)
1663
return(session->transaction.all.hasModifiedNonTransData());
1666
void session_mark_transaction_to_rollback(Session *session, bool all)
1668
mark_transaction_to_rollback(session, all);
2299
extern "C" unsigned long thd_get_thread_id(const DRIZZLE_THD thd)
2301
return((unsigned long)thd->thread_id);
2305
#ifdef INNODB_COMPATIBILITY_HOOKS
2306
extern "C" const struct charset_info_st *thd_charset(DRIZZLE_THD thd)
2308
return(thd->charset());
2311
extern "C" char **thd_query(DRIZZLE_THD thd)
2313
return(&thd->query);
2316
extern "C" int thd_slave_thread(const DRIZZLE_THD thd)
2318
return(thd->slave_thread);
2321
extern "C" int thd_non_transactional_update(const DRIZZLE_THD thd)
2323
return(thd->transaction.all.modified_non_trans_table);
2326
extern "C" int thd_binlog_format(const DRIZZLE_THD thd)
2328
return (int) thd->variables.binlog_format;
2331
extern "C" void thd_mark_transaction_to_rollback(DRIZZLE_THD thd, bool all)
2333
mark_transaction_to_rollback(thd, all);
2335
#endif // INNODB_COMPATIBILITY_HOOKS */
1672
2339
Mark transaction to rollback and mark error as fatal to a sub-statement.
1674
@param session Thread handle
2341
@param thd Thread handle
1675
2342
@param all true <=> rollback main transaction.
1677
void mark_transaction_to_rollback(Session *session, bool all)
1681
session->is_fatal_sub_stmt_error= true;
1682
session->transaction_rollback_request= all;
1686
void Session::disconnect(uint32_t errcode, bool should_lock)
1688
/* Allow any plugins to cleanup their session variables */
1689
plugin_sessionvar_cleanup(this);
1691
/* If necessary, log any aborted or unauthorized connections */
1692
if (killed || client->wasAborted())
1693
statistic_increment(aborted_threads, &LOCK_status);
1695
if (client->wasAborted())
1697
if (! killed && variables.log_warnings > 1)
1699
SecurityContext *sctx= &security_ctx;
1701
errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1703
, (db.empty() ? "unconnected" : db.c_str())
1704
, sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1705
, sctx->getIp().c_str()
1706
, (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1710
/* Close out our connection to the client */
1712
(void) pthread_mutex_lock(&LOCK_thread_count);
1713
killed= Session::KILL_CONNECTION;
1714
if (client->isConnected())
1718
/*my_error(errcode, ER(errcode));*/
1719
client->sendError(errcode, ER(errcode));
1724
(void) pthread_mutex_unlock(&LOCK_thread_count);
1727
void Session::reset_for_next_command()
1732
Those two lines below are theoretically unneeded as
1733
Session::cleanup_after_query() should take care of this already.
1735
auto_inc_intervals_in_cur_stmt_for_binlog.empty();
1737
is_fatal_error= false;
1738
server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1739
SERVER_QUERY_NO_INDEX_USED |
1740
SERVER_QUERY_NO_GOOD_INDEX_USED);
1743
main_da.reset_diagnostics_area();
1744
total_warn_count=0; // Warnings for this query
1745
sent_row_count= examined_row_count= 0;
1749
Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1752
void Session::close_temporary_tables()
1757
if (not temporary_tables)
1760
for (table= temporary_tables; table; table= tmp_next)
1762
tmp_next= table->next;
1765
temporary_tables= NULL;
1769
unlink from session->temporary tables and close temporary table
1772
void Session::close_temporary_table(Table *table)
1776
table->prev->next= table->next;
1777
if (table->prev->next)
1778
table->next->prev= table->prev;
1782
/* removing the item from the list */
1783
assert(table == temporary_tables);
1785
slave must reset its temporary list pointer to zero to exclude
1786
passing non-zero value to end_slave via rli->save_temporary_tables
1787
when no temp tables opened, see an invariant below.
1789
temporary_tables= table->next;
1790
if (temporary_tables)
1791
table->next->prev= NULL;
1797
Close and drop a temporary table
1800
This dosn't unlink table from session->temporary
1801
If this is needed, use close_temporary_table()
1804
void Session::nukeTable(Table *table)
1806
plugin::StorageEngine *table_type= table->s->db_type();
1808
table->free_io_cache();
1809
table->closefrm(false);
1811
TableIdentifier identifier(table->s->getSchemaName(), table->s->table_name.str, table->s->path.str);
1812
rm_temporary_table(table_type, identifier);
1814
table->s->free_table_share();
1816
/* This makes me sad, but we're allocating it via malloc */
1820
/** Clear most status variables. */
1821
extern time_t flush_status_time;
1822
extern uint32_t max_used_connections;
1824
void Session::refresh_status()
1826
pthread_mutex_lock(&LOCK_status);
1828
/* Add thread's status variabes to global status */
1829
add_to_status(&global_status_var, &status_var);
1831
/* Reset thread's status variables */
1832
memset(&status_var, 0, sizeof(status_var));
1834
/* Reset some global variables */
1835
reset_status_vars();
1837
/* Reset the counters of all key caches (default and named). */
1838
reset_key_cache_counters();
1839
flush_status_time= time((time_t*) 0);
1840
max_used_connections= 1; /* We set it to one, because we know we exist */
1841
pthread_mutex_unlock(&LOCK_status);
1844
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1846
user_var_entry *entry= NULL;
1848
entry= (user_var_entry*) hash_search(&user_vars, (unsigned char*) name.str, name.length);
1850
if ((entry == NULL) && create_if_not_exists)
1852
if (!hash_inited(&user_vars))
1854
entry= new (nothrow) user_var_entry(name.str, query_id);
1859
if (my_hash_insert(&user_vars, (unsigned char*) entry))
1862
free((char*) entry);
1871
void Session::mark_temp_tables_as_free_for_reuse()
1873
for (Table *table= temporary_tables ; table ; table= table->next)
1875
if (table->query_id == query_id)
1878
table->cursor->ha_reset();
1883
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1885
for (; table ; table= table->next)
1887
if (table->query_id == query_id)
1890
table->cursor->ha_reset();
1896
Unlocks tables and frees derived tables.
1897
Put all normal tables used by thread in free list.
1899
It will only close/mark as free for reuse tables opened by this
1900
substatement, it will also check if we are closing tables after
1901
execution of complete query (i.e. we are on upper level) and will
1902
leave prelocked mode if needed.
1904
void Session::close_thread_tables()
1909
We are assuming here that session->derived_tables contains ONLY derived
1910
tables for this substatement. i.e. instead of approach which uses
1911
query_id matching for determining which of the derived tables belong
1912
to this substatement we rely on the ability of substatements to
1913
save/restore session->derived_tables during their execution.
1915
TODO: Probably even better approach is to simply associate list of
1916
derived tables with (sub-)statement instead of thread and destroy
1917
them at the end of its execution.
1923
Close all derived tables generated in queries like
1924
SELECT * FROM (SELECT * FROM t1)
1926
for (table= derived_tables ; table ; table= next)
1929
table->free_tmp_table(this);
1935
Mark all temporary tables used by this statement as free for reuse.
1937
mark_temp_tables_as_free_for_reuse();
1939
Let us commit transaction for statement. Since in 5.0 we only have
1940
one statement transaction and don't allow several nested statement
1941
transactions this call will do nothing if we are inside of stored
1942
function or trigger (i.e. statement transaction is already active and
1943
does not belong to statement for which we do close_thread_tables()).
1944
TODO: This should be fixed in later releases.
1946
if (backups_available == false)
1948
TransactionServices &transaction_services= TransactionServices::singleton();
1949
main_da.can_overwrite_status= true;
1950
transaction_services.ha_autocommit_or_rollback(this, is_error());
1951
main_da.can_overwrite_status= false;
1952
transaction.stmt.reset();
1958
For RBR we flush the pending event just before we unlock all the
1959
tables. This means that we are at the end of a topmost
1960
statement, so we ensure that the STMT_END_F flag is set on the
1961
pending event. For statements that are *inside* stored
1962
functions, the pending event will not be flushed: that will be
1963
handled either before writing a query log event (inside
1964
binlog_query()) or when preparing a pending event.
1966
mysql_unlock_tables(this, lock);
1970
Note that we need to hold LOCK_open while changing the
1971
open_tables list. Another thread may work on it.
1972
(See: remove_table_from_cache(), mysql_wait_completed_table())
1973
Closing a MERGE child before the parent would be fatal if the
1974
other thread tries to abort the MERGE lock in between.
1977
close_open_tables();
1980
void Session::close_tables_for_reopen(TableList **tables)
1983
If table list consists only from tables from prelocking set, table list
1984
for new attempt should be empty, so we have to update list's root pointer.
1986
if (lex->first_not_own_table() == *tables)
1988
lex->chop_off_not_own_tables();
1989
for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1991
close_thread_tables();
1994
bool Session::openTablesLock(TableList *tables)
2001
if (open_tables_from_list(&tables, &counter))
2004
if (not lock_tables(tables, counter, &need_reopen))
2006
if (not need_reopen)
2008
close_tables_for_reopen(&tables);
2010
if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
2011
(fill_derived_tables() &&
2012
mysql_handle_derived(lex, &mysql_derived_filling))))
2018
bool Session::openTables(TableList *tables, uint32_t flags)
2021
bool ret= fill_derived_tables();
2022
assert(ret == false);
2023
if (open_tables_from_list(&tables, &counter, flags) ||
2024
mysql_handle_derived(lex, &mysql_derived_prepare))
2029
bool Session::rm_temporary_table(TableIdentifier &identifier)
2031
if (plugin::StorageEngine::dropTable(*this, identifier))
2033
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
2034
identifier.getSQLPath().c_str(), errno);
2035
dumpTemporaryTableNames("rm_temporary_table()");
2043
bool Session::rm_temporary_table(plugin::StorageEngine *base, TableIdentifier &identifier)
2047
if (plugin::StorageEngine::dropTable(*this, *base, identifier))
2049
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
2050
identifier.getSQLPath().c_str(), errno);
2051
dumpTemporaryTableNames("rm_temporary_table()");
2060
@note this will be removed, I am looking through Hudson to see if it is finding
2061
any tables that are missed during cleanup.
2063
void Session::dumpTemporaryTableNames(const char *foo)
2067
if (not temporary_tables)
2070
cerr << "Begin Run: " << foo << "\n";
2071
for (table= temporary_tables; table; table= table->next)
2073
bool have_proto= false;
2075
message::Table *proto= table->s->getTableProto();
2076
if (table->s->getTableProto())
2079
const char *answer= have_proto ? "true" : "false";
2083
cerr << "\tTable Name " << table->s->getSchemaName() << "." << table->s->table_name.str << " : " << answer << "\n";
2084
cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2087
cerr << "\tTabl;e Name " << table->s->getSchemaName() << "." << table->s->table_name.str << " : " << answer << "\n";
2091
bool Session::storeTableMessage(TableIdentifier &identifier, message::Table &table_message)
2093
table_message_cache.insert(make_pair(identifier.getPath(), table_message));
2098
bool Session::removeTableMessage(TableIdentifier &identifier)
2100
TableMessageCache::iterator iter;
2102
iter= table_message_cache.find(identifier.getPath());
2104
if (iter == table_message_cache.end())
2107
table_message_cache.erase(iter);
2112
bool Session::getTableMessage(TableIdentifier &identifier, message::Table &table_message)
2114
TableMessageCache::iterator iter;
2116
iter= table_message_cache.find(identifier.getPath());
2118
if (iter == table_message_cache.end())
2121
table_message.CopyFrom(((*iter).second));
2126
bool Session::doesTableMessageExist(TableIdentifier &identifier)
2128
TableMessageCache::iterator iter;
2130
iter= table_message_cache.find(identifier.getPath());
2132
if (iter == table_message_cache.end())
2140
bool Session::renameTableMessage(TableIdentifier &from, TableIdentifier &to)
2142
TableMessageCache::iterator iter;
2144
table_message_cache[to.getPath()]= table_message_cache[from.getPath()];
2146
iter= table_message_cache.find(to.getPath());
2148
if (iter == table_message_cache.end())
2153
(*iter).second.set_schema(to.getSchemaName());
2154
(*iter).second.set_name(to.getTableName());
2159
} /* namespace drizzled */
2345
void mark_transaction_to_rollback(THD *thd, bool all)
2349
thd->is_fatal_sub_stmt_error= true;
2350
thd->transaction_rollback_request= all;
2353
/***************************************************************************
2354
Handling of XA id caching
2355
***************************************************************************/
2357
pthread_mutex_t LOCK_xid_cache;
2360
extern "C" uchar *xid_get_hash_key(const uchar *, size_t *, bool);
2361
extern "C" void xid_free_hash(void *);
2363
uchar *xid_get_hash_key(const uchar *ptr, size_t *length,
2364
bool not_used __attribute__((unused)))
2366
*length=((XID_STATE*)ptr)->xid.key_length();
2367
return ((XID_STATE*)ptr)->xid.key();
2370
void xid_free_hash(void *ptr)
2372
if (!((XID_STATE*)ptr)->in_thd)
2373
my_free((uchar*)ptr, MYF(0));
2376
bool xid_cache_init()
2378
pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST);
2379
return hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
2380
xid_get_hash_key, xid_free_hash, 0) != 0;
2383
void xid_cache_free()
2385
if (hash_inited(&xid_cache))
2387
hash_free(&xid_cache);
2388
pthread_mutex_destroy(&LOCK_xid_cache);
2392
XID_STATE *xid_cache_search(XID *xid)
2394
pthread_mutex_lock(&LOCK_xid_cache);
2395
XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, xid->key(), xid->key_length());
2396
pthread_mutex_unlock(&LOCK_xid_cache);
2401
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
2405
pthread_mutex_lock(&LOCK_xid_cache);
2406
if (hash_search(&xid_cache, xid->key(), xid->key_length()))
2408
else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
2412
xs->xa_state=xa_state;
2415
res=my_hash_insert(&xid_cache, (uchar*)xs);
2417
pthread_mutex_unlock(&LOCK_xid_cache);
2422
bool xid_cache_insert(XID_STATE *xid_state)
2424
pthread_mutex_lock(&LOCK_xid_cache);
2425
assert(hash_search(&xid_cache, xid_state->xid.key(),
2426
xid_state->xid.key_length())==0);
2427
bool res=my_hash_insert(&xid_cache, (uchar*)xid_state);
2428
pthread_mutex_unlock(&LOCK_xid_cache);
2433
void xid_cache_delete(XID_STATE *xid_state)
2435
pthread_mutex_lock(&LOCK_xid_cache);
2436
hash_delete(&xid_cache, (uchar *)xid_state);
2437
pthread_mutex_unlock(&LOCK_xid_cache);
2441
Implementation of interface to write rows to the binary log through the
2442
thread. The thread is responsible for writing the rows it has
2443
inserted/updated/deleted.
2446
#ifndef DRIZZLE_CLIENT
2449
Template member function for ensuring that there is a rows log
2450
event of the appropriate type before proceeding.
2453
- Events of type 'RowEventT' have the type code 'type_code'.
2456
If a non-NULL pointer is returned, the pending event for thread 'thd' will
2457
be an event of type 'RowEventT' (which has the type code 'type_code')
2458
and will either be empty or have enough space to hold 'needed' bytes. In
2459
addition, the columns bitmap will be correct for the row, meaning that
2460
the pending event will be flushed if the columns in the event differ from
2461
the columns supplied to the function.
2464
If no error, a non-NULL pending event (either one which already existed or
2465
the newly created one).
2469
template <class RowsEventT> Rows_log_event*
2470
THD::binlog_prepare_pending_rows_event(Table* table, uint32_t serv_id,
2472
bool is_transactional,
2473
RowsEventT *hint __attribute__((unused)))
2475
/* Pre-conditions */
2476
assert(table->s->table_map_id != UINT32_MAX);
2478
/* Fetch the type code for the RowsEventT template parameter */
2479
int const type_code= RowsEventT::TYPE_CODE;
2482
There is no good place to set up the transactional data, so we
2485
if (binlog_setup_trx_data())
2488
Rows_log_event* pending= binlog_get_pending_rows_event();
2490
if (unlikely(pending && !pending->is_valid()))
2494
Check if the current event is non-NULL and a write-rows
2495
event. Also check if the table provided is mapped: if it is not,
2496
then we have switched to writing to a new table.
2497
If there is no pending event, we need to create one. If there is a pending
2498
event, but it's not about the same table id, or not of the same type
2499
(between Write, Update and Delete), or not the same affected columns, or
2500
going to be too big, flush this event to disk and create a new pending
2503
The last test is necessary for the Cluster injector to work
2504
correctly. The reason is that the Cluster can inject two write
2505
rows with different column bitmaps if there is an insert followed
2506
by an update in the same transaction, and these are grouped into a
2507
single epoch/transaction when fed to the injector.
2509
TODO: Fix the code so that the last test can be removed.
2512
pending->server_id != serv_id ||
2513
pending->get_table_id() != table->s->table_map_id ||
2514
pending->get_type_code() != type_code ||
2515
pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
2516
!bitmap_cmp(pending->get_cols(), table->write_set))
2518
/* Create a new RowsEventT... */
2519
Rows_log_event* const
2520
ev= new RowsEventT(this, table, table->s->table_map_id,
2524
ev->server_id= serv_id; // I don't like this, it's too easy to forget.
2526
flush the pending event and replace it with the newly created
2529
if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev)))
2535
return(ev); /* This is the new pending event */
2537
return(pending); /* This is the current pending event */
2540
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
2542
Instantiate the versions we need, we have -fno-implicit-template as
2545
template Rows_log_event*
2546
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2547
Write_rows_log_event*);
2549
template Rows_log_event*
2550
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2551
Delete_rows_log_event *);
2553
template Rows_log_event*
2554
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2555
Update_rows_log_event *);
2560
Class to handle temporary allocation of memory for row data.
2562
The responsibility of the class is to provide memory for
2563
packing one or two rows of packed data (depending on what
2564
constructor is called).
2566
In order to make the allocation more efficient for "simple" rows,
2567
i.e., rows that do not contain any blobs, a pointer to the
2568
allocated memory is stored in the table structure
2569
for simple rows. If memory for a table containing a blob field
2570
is requested, only memory for that is allocated, and subsequently
2571
released when the object is destroyed.
2574
class Row_data_memory {
2577
Build an object to keep track of a block-local piece of memory
2578
for storing a row of data.
2581
Table where the pre-allocated memory is stored.
2584
Length of data that is needed, if the record contains blobs.
2586
Row_data_memory(Table *table, size_t const len1)
2589
m_alloc_checked= false;
2590
allocate_memory(table, len1);
2591
m_ptr[0]= has_memory() ? m_memory : 0;
2595
Row_data_memory(Table *table, size_t const len1, size_t const len2)
2598
m_alloc_checked= false;
2599
allocate_memory(table, len1 + len2);
2600
m_ptr[0]= has_memory() ? m_memory : 0;
2601
m_ptr[1]= has_memory() ? m_memory + len1 : 0;
2606
if (m_memory != 0 && m_release_memory_on_destruction)
2607
my_free((uchar*) m_memory, MYF(MY_WME));
2611
Is there memory allocated?
2613
@retval true There is memory allocated
2614
@retval false Memory allocation failed
2616
bool has_memory() const {
2617
m_alloc_checked= true;
2618
return m_memory != 0;
2623
assert(s < sizeof(m_ptr)/sizeof(*m_ptr));
2624
assert(m_ptr[s] != 0);
2625
assert(m_alloc_checked == true);
2630
void allocate_memory(Table *const table, size_t const total_length)
2632
if (table->s->blob_fields == 0)
2635
The maximum length of a packed record is less than this
2636
length. We use this value instead of the supplied length
2637
when allocating memory for records, since we don't know how
2638
the memory will be used in future allocations.
2640
Since table->s->reclength is for unpacked records, we have
2641
to add two bytes for each field, which can potentially be
2642
added to hold the length of a packed field.
2644
size_t const maxlen= table->s->reclength + 2 * table->s->fields;
2647
Allocate memory for two records if memory hasn't been
2648
allocated. We allocate memory for two records so that it can
2649
be used when processing update rows as well.
2651
if (table->write_row_record == 0)
2652
table->write_row_record=
2653
(uchar *) alloc_root(&table->mem_root, 2 * maxlen);
2654
m_memory= table->write_row_record;
2655
m_release_memory_on_destruction= false;
2659
m_memory= (uchar *) my_malloc(total_length, MYF(MY_WME));
2660
m_release_memory_on_destruction= true;
2664
mutable bool m_alloc_checked;
2665
bool m_release_memory_on_destruction;
2672
int THD::binlog_write_row(Table* table, bool is_trans,
2673
uchar const *record)
2675
assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
2678
Pack records into format for transfer. We are allocating more
2679
memory than needed, but that doesn't matter.
2681
Row_data_memory memory(table, table->max_row_length(record));
2682
if (!memory.has_memory())
2683
return HA_ERR_OUT_OF_MEM;
2685
uchar *row_data= memory.slot(0);
2687
size_t const len= pack_row(table, table->write_set, row_data, record);
2689
Rows_log_event* const ev=
2690
binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
2691
static_cast<Write_rows_log_event*>(0));
2693
if (unlikely(ev == 0))
2694
return HA_ERR_OUT_OF_MEM;
2696
return ev->add_row_data(row_data, len);
2699
int THD::binlog_update_row(Table* table, bool is_trans,
2700
const uchar *before_record,
2701
const uchar *after_record)
2703
assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
2705
size_t const before_maxlen = table->max_row_length(before_record);
2706
size_t const after_maxlen = table->max_row_length(after_record);
2708
Row_data_memory row_data(table, before_maxlen, after_maxlen);
2709
if (!row_data.has_memory())
2710
return HA_ERR_OUT_OF_MEM;
2712
uchar *before_row= row_data.slot(0);
2713
uchar *after_row= row_data.slot(1);
2715
size_t const before_size= pack_row(table, table->read_set, before_row,
2717
size_t const after_size= pack_row(table, table->write_set, after_row,
2720
Rows_log_event* const ev=
2721
binlog_prepare_pending_rows_event(table, server_id,
2722
before_size + after_size, is_trans,
2723
static_cast<Update_rows_log_event*>(0));
2725
if (unlikely(ev == 0))
2726
return HA_ERR_OUT_OF_MEM;
2729
ev->add_row_data(before_row, before_size) ||
2730
ev->add_row_data(after_row, after_size);
2733
int THD::binlog_delete_row(Table* table, bool is_trans,
2734
uchar const *record)
2736
assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
2739
Pack records into format for transfer. We are allocating more
2740
memory than needed, but that doesn't matter.
2742
Row_data_memory memory(table, table->max_row_length(record));
2743
if (unlikely(!memory.has_memory()))
2744
return HA_ERR_OUT_OF_MEM;
2746
uchar *row_data= memory.slot(0);
2748
size_t const len= pack_row(table, table->read_set, row_data, record);
2750
Rows_log_event* const ev=
2751
binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
2752
static_cast<Delete_rows_log_event*>(0));
2754
if (unlikely(ev == 0))
2755
return HA_ERR_OUT_OF_MEM;
2757
return ev->add_row_data(row_data, len);
2761
int THD::binlog_flush_pending_rows_event(bool stmt_end)
2764
We shall flush the pending event even if we are not in row-based
2765
mode: it might be the case that we left row-based mode before
2766
flushing anything (e.g., if we have explicitly locked tables).
2768
if (!mysql_bin_log.is_open())
2772
Mark the event as the last event of a statement if the stmt_end
2776
if (Rows_log_event *pending= binlog_get_pending_rows_event())
2780
pending->set_flags(Rows_log_event::STMT_END_F);
2781
pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
2782
binlog_table_maps= 0;
2785
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
2793
Member function that will log query, either row-based or
2794
statement-based depending on the value of the 'current_stmt_binlog_row_based'
2795
and the value of the 'qtype' flag.
2797
This function should be called after the all calls to ha_*_row()
2798
functions have been issued, but before tables are unlocked and
2802
There shall be no writes to any system table after calling
2803
binlog_query(), so these writes have to be moved to before the call
2804
of binlog_query() for correct functioning.
2806
This is necessary not only for RBR, but the master might crash
2807
after binlogging the query but before changing the system tables.
2808
This means that the slave and the master are not in the same state
2809
(after the master has restarted), so therefore we have to
2810
eliminate this problem.
2813
Error code, or 0 if no error.
2815
int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
2816
ulong query_len, bool is_trans, bool suppress_use,
2817
THD::killed_state killed_status_arg)
2819
assert(query_arg && mysql_bin_log.is_open());
2821
if (int error= binlog_flush_pending_rows_event(true))
2825
If we are in statement mode and trying to log an unsafe statement,
2826
we should print a warning.
2828
if (lex->is_stmt_unsafe() &&
2829
variables.binlog_format == BINLOG_FORMAT_STMT)
2831
assert(this->query != NULL);
2832
push_warning(this, DRIZZLE_ERROR::WARN_LEVEL_WARN,
2833
ER_BINLOG_UNSAFE_STATEMENT,
2834
ER(ER_BINLOG_UNSAFE_STATEMENT));
2835
if (!(binlog_flags & BINLOG_FLAG_UNSAFE_STMT_PRINTED))
2837
char warn_buf[DRIZZLE_ERRMSG_SIZE];
2838
snprintf(warn_buf, DRIZZLE_ERRMSG_SIZE, "%s Statement: %s",
2839
ER(ER_BINLOG_UNSAFE_STATEMENT), this->query);
2840
sql_print_warning(warn_buf);
2841
binlog_flags|= BINLOG_FLAG_UNSAFE_STMT_PRINTED;
2846
case THD::ROW_QUERY_TYPE:
2847
if (current_stmt_binlog_row_based)
2849
/* Otherwise, we fall through */
2850
case THD::DRIZZLE_QUERY_TYPE:
2852
Using this query type is a convenience hack, since we have been
2853
moving back and forth between using RBR for replication of
2854
system tables and not using it.
2856
Make sure to change in check_table_binlog_row_based() according
2857
to how you treat this.
2859
case THD::STMT_QUERY_TYPE:
2861
The DRIZZLE_LOG::write() function will set the STMT_END_F flag and
2862
flush the pending rows event if necessary.
2865
Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use,
2867
qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
2869
Binlog table maps will be irrelevant after a Query_log_event
2870
(they are just removed on the slave side) so after the query
2871
log event is written to the binary log, we pretend that no
2872
table maps were written.
2874
int error= mysql_bin_log.write(&qinfo);
2875
binlog_table_maps= 0;
2880
case THD::QUERY_TYPE_COUNT:
2882
assert(0 <= qtype && qtype < QUERY_TYPE_COUNT);
2887
bool Discrete_intervals_list::append(uint64_t start, uint64_t val,
2890
/* first, see if this can be merged with previous */
2891
if ((head == NULL) || tail->merge_if_contiguous(start, val, incr))
2893
/* it cannot, so need to add a new interval */
2894
Discrete_interval *new_interval= new Discrete_interval(start, val, incr);
2895
return(append(new_interval));
2900
bool Discrete_intervals_list::append(Discrete_interval *new_interval)
2902
if (unlikely(new_interval == NULL))
2905
head= current= new_interval;
2907
tail->next= new_interval;
2913
#endif /* !defined(DRIZZLE_CLIENT) */