82
82
!strcmp(field_name.str, other.field_name.str);
85
Open_tables_state::Open_tables_state(uint64_t version_arg) :
88
open_tables= temporary_tables= derived_tables= NULL;
89
extra_lock= lock= NULL;
86
Construct an (almost) deep copy of this key. Only those
87
elements that are known to never change are not copied.
88
If out of memory, a partial copy is returned and an error is set
92
Key::Key(const Key &rhs, MEM_ROOT *mem_root)
94
key_create_info(rhs.key_create_info),
95
columns(rhs.columns, mem_root),
97
generated(rhs.generated)
99
list_copy_and_replace_each_value(columns, mem_root);
103
Construct an (almost) deep copy of this foreign key. Only those
104
elements that are known to never change are not copied.
105
If out of memory, a partial copy is returned and an error is set
109
Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root)
111
ref_table(rhs.ref_table),
112
ref_columns(rhs.ref_columns),
113
delete_opt(rhs.delete_opt),
114
update_opt(rhs.update_opt),
115
match_opt(rhs.match_opt)
117
list_copy_and_replace_each_value(ref_columns, mem_root);
121
Test if a foreign key (= generated key) is a prefix of the given key
122
(ignoring key name, key type and order of columns)
125
This is only used to test if an index for a FOREIGN KEY exists
128
We only compare field names
131
0 Generated key is a prefix of other key
135
bool foreign_key_prefix(Key *a, Key *b)
137
/* Ensure that 'a' is the generated key */
140
if (b->generated && a->columns.elements > b->columns.elements)
141
std::swap(a, b); // Put shorter key in 'a'
146
return true; // No foreign key
147
std::swap(a, b); // Put generated key in 'a'
150
/* Test if 'a' is a prefix of 'b' */
151
if (a->columns.elements > b->columns.elements)
152
return true; // Can't be prefix
154
List_iterator<Key_part_spec> col_it1(a->columns);
155
List_iterator<Key_part_spec> col_it2(b->columns);
156
const Key_part_spec *col1, *col2;
158
#ifdef ENABLE_WHEN_INNODB_CAN_HANDLE_SWAPED_FOREIGN_KEY_COLUMNS
159
while ((col1= col_it1++))
163
while ((col2= col_it2++))
172
return true; // Error
174
return false; // Is prefix
176
while ((col1= col_it1++))
179
if (!(*col1 == *col2))
182
return false; // Is prefix
187
/****************************************************************************
188
** Thread specific functions
189
****************************************************************************/
191
Open_tables_state::Open_tables_state(ulong version_arg)
192
:version(version_arg), state_flags(0U)
194
reset_open_tables_state();
93
198
The following functions form part of the C plugin API
95
int mysql_tmpfile(const char *prefix)
201
extern "C" int mysql_tmpfile(const char *prefix)
97
203
char filename[FN_REFLEN];
98
int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
204
File fd = create_temp_file(filename, mysql_tmpdir, prefix,
205
O_CREAT | O_EXCL | O_RDWR | O_TEMPORARY,
209
This can be removed once the following bug is fixed:
210
Bug #28903 create_temp_file() doesn't honor O_TEMPORARY option
211
(file not removed) (Unix)
100
213
unlink(filename);
106
int session_tablespace_op(const Session *session)
108
return test(session->tablespace_op);
221
int thd_in_lock_tables(const THD *thd)
223
return test(thd->in_lock_tables);
228
int thd_tablespace_op(const THD *thd)
230
return test(thd->tablespace_op);
112
Set the process info field of the Session structure.
235
Set the process info field of the THD structure.
114
237
This function is used by plug-ins. Internally, the
115
Session::set_proc_info() function should be used.
238
THD::set_proc_info() function should be used.
117
@see Session::set_proc_info
240
@see THD::set_proc_info
119
void set_session_proc_info(Session *session, const char *info)
121
session->set_proc_info(info);
124
const char *get_session_proc_info(Session *session)
126
return session->get_proc_info();
129
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
131
return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
134
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
137
return &ha_data[monitored->getId()].resource_context[index];
140
int64_t session_test_options(const Session *session, int64_t test_options)
142
return session->options & test_options;
145
int session_sql_command(const Session *session)
147
return (int) session->lex->sql_command;
150
enum_tx_isolation session_tx_isolation(const Session *session)
152
return (enum_tx_isolation)session->variables.tx_isolation;
155
Session::Session(plugin::Client *client_arg) :
156
Open_tables_state(refresh_version),
157
mem_root(&main_mem_root),
163
lock_id(&main_lock_id),
165
ha_data(plugin::num_trx_monitored_objects),
166
arg_of_last_insert_id_function(false),
167
first_successful_insert_id_in_prev_stmt(0),
168
first_successful_insert_id_in_cur_stmt(0),
171
some_tables_deleted(false),
174
is_fatal_error(false),
175
transaction_rollback_request(false),
176
is_fatal_sub_stmt_error(0),
177
derived_tables_processing(false),
178
tablespace_op(false),
181
transaction_message(NULL),
182
statement_message(NULL),
183
session_event_observers(NULL),
186
memset(process_list_info, 0, PROCESS_LIST_WIDTH);
187
client->setSession(this);
243
set_thd_proc_info(THD *thd, const char *info)
245
thd->set_proc_info(info);
249
const char *get_thd_proc_info(THD *thd)
251
return thd->get_proc_info();
255
void **thd_ha_data(const THD *thd, const struct handlerton *hton)
257
return (void **) &thd->ha_data[hton->slot].ha_ptr;
261
int64_t thd_test_options(const THD *thd, int64_t test_options)
263
return thd->options & test_options;
267
int thd_sql_command(const THD *thd)
269
return (int) thd->lex->sql_command;
273
int thd_tx_isolation(const THD *thd)
275
return (int) thd->variables.tx_isolation;
279
void thd_inc_row_count(THD *thd)
285
Clear this diagnostics area.
287
Normally called at the end of a statement.
291
Diagnostics_area::reset_diagnostics_area()
293
can_overwrite_status= false;
294
/** Don't take chances in production */
300
m_total_warn_count= 0;
302
/** Tiny reset in debug mode to see garbage right away */
308
Set OK status -- ends commands that do not return a
309
result set, e.g. INSERT/UPDATE/DELETE.
313
Diagnostics_area::set_ok_status(THD *thd, ha_rows affected_rows_arg,
314
uint64_t last_insert_id_arg,
315
const char *message_arg)
319
In production, refuse to overwrite an error or a custom response
322
if (is_error() || is_disabled())
324
/** Only allowed to report success if has not yet reported an error */
326
m_server_status= thd->server_status;
327
m_total_warn_count= thd->total_warn_count;
328
m_affected_rows= affected_rows_arg;
329
m_last_insert_id= last_insert_id_arg;
331
strmake(m_message, message_arg, sizeof(m_message) - 1);
343
Diagnostics_area::set_eof_status(THD *thd)
345
/** Only allowed to report eof if has not yet reported an error */
349
In production, refuse to overwrite an error or a custom response
352
if (is_error() || is_disabled())
355
m_server_status= thd->server_status;
357
If inside a stored procedure, do not return the total
358
number of warnings, since they are not available to the client
361
m_total_warn_count= thd->total_warn_count;
371
Diagnostics_area::set_error_status(THD *thd __attribute__((unused)),
372
uint32_t sql_errno_arg,
373
const char *message_arg)
376
Only allowed to report error if has not yet reported a success
377
The only exception is when we flush the message to the client,
378
an error can happen during the flush.
380
assert(! is_set() || can_overwrite_status);
382
In production, refuse to overwrite a custom response with an
388
m_sql_errno= sql_errno_arg;
389
strmake(m_message, message_arg, sizeof(m_message) - 1);
396
Mark the diagnostics area as 'DISABLED'.
398
This is used in rare cases when the COM_ command at hand sends a response
399
in a custom format. One example is the query cache, another is
404
Diagnostics_area::disable_status()
407
m_status= DA_DISABLED;
412
:Statement(&main_lex, &main_mem_root,
413
/* statement id */ 0),
414
Open_tables_state(refresh_version), rli_fake(0),
415
lock_id(&main_lock_id),
417
binlog_table_maps(0), binlog_flags(0UL),
418
arg_of_last_insert_id_function(false),
419
first_successful_insert_id_in_prev_stmt(0),
420
first_successful_insert_id_in_prev_stmt_for_binlog(0),
421
first_successful_insert_id_in_cur_stmt(0),
422
stmt_depends_on_first_successful_insert_id_in_prev_stmt(false),
425
transaction_rollback_request(0),
426
is_fatal_sub_stmt_error(0),
431
derived_tables_processing(false),
190
437
Pass nominal parameters to init_alloc_root only to ensure that
191
438
the destructor works OK in case of an error. The main_mem_root
192
439
will be re-initialized in init_for_queries().
194
memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
196
count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
441
init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
443
catalog= (char*)"std"; // the only catalog we have for now
444
main_security_ctx.init();
445
security_ctx= &main_security_ctx;
446
some_tables_deleted=no_errors=password= 0;
448
count_cuted_fields= CHECK_FIELD_IGNORE;
197
449
killed= NOT_KILLED;
451
is_slave_error= thread_specific_used= false;
452
hash_clear(&handler_tables_hash);
201
455
cuted_fields= sent_row_count= row_count= 0L;
202
457
row_count_func= -1;
203
458
statement_id_counter= 0UL;
204
// Must be reset to handle error with Session's created for init of mysqld
459
// Must be reset to handle error with THD's created for init of mysqld
205
460
lex->current_select= 0;
206
461
start_time=(time_t) 0;
208
463
utime_after_lock= 0L;
209
466
memset(&variables, 0, sizeof(variables));
215
scoreboard_index= -1;
216
dbug_sentry=Session_SENTRY_MAGIC;
217
cleanup_done= abort_on_warning= no_warnings_for_error= false;
219
/* query_cache init */
472
db_charset= global_system_variables.collation_database;
473
memset(ha_data, 0, sizeof(ha_data));
475
binlog_evt_union.do_union= false;
476
dbug_sentry=THD_SENTRY_MAGIC;
478
client_capabilities= 0; // minimalistic client
479
system_thread= NON_SYSTEM_THREAD;
480
cleanup_done= abort_on_warning= no_warnings_for_error= 0;
481
peer_port= 0; // For SHOW PROCESSLIST
482
transaction.m_pending_rows_event= 0;
484
pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
223
486
/* Variables with default values */
224
487
proc_info="login";
225
where= Session::DEFAULT_WHERE;
226
command= COM_CONNECT;
228
plugin_sessionvar_init(this);
488
where= THD::DEFAULT_WHERE;
489
server_id = ::server_id;
495
/* Initialize sub structures */
496
init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
497
user_connect=(USER_CONN *)0;
498
hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
499
(hash_get_key) get_var_key,
500
(hash_free_key) free_user_var, 0);
502
/* For user vars replication*/
504
my_init_dynamic_array(&user_var_events,
505
sizeof(BINLOG_USER_VAR_EVENT *), 16, 16);
507
memset(&user_var_events, 0, sizeof(user_var_events));
510
protocol= &protocol_text; // Default protocol
511
protocol_text.init(this);
513
tablespace_op= false;
515
randominit(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id);
516
substitute_null_with_insert_id = false;
517
thr_lock_info_init(&lock_info); /* safety: will be reset after start */
518
thr_lock_owner_init(&main_lock_id, &lock_info);
520
m_internal_handler= NULL;
524
void THD::push_internal_handler(Internal_error_handler *handler)
527
TODO: The current implementation is limited to 1 handler at a time only.
528
THD and sp_rcontext need to be modified to use a common handler stack.
530
assert(m_internal_handler == NULL);
531
m_internal_handler= handler;
535
bool THD::handle_error(uint32_t sql_errno, const char *message,
536
DRIZZLE_ERROR::enum_warning_level level)
538
if (m_internal_handler)
540
return m_internal_handler->handle_error(sql_errno, message, level, this);
543
return false; // 'false', as per coding style
547
void THD::pop_internal_handler()
549
assert(m_internal_handler != NULL);
550
m_internal_handler= NULL;
554
void *thd_alloc(DRIZZLE_THD thd, unsigned int size)
556
return thd->alloc(size);
560
void *thd_calloc(DRIZZLE_THD thd, unsigned int size)
562
return thd->calloc(size);
566
char *thd_strdup(DRIZZLE_THD thd, const char *str)
568
return thd->strdup(str);
572
char *thd_strmake(DRIZZLE_THD thd, const char *str, unsigned int size)
574
return thd->strmake(str, size);
578
LEX_STRING *thd_make_lex_string(THD *thd, LEX_STRING *lex_str,
579
const char *str, unsigned int size,
580
int allocate_lex_string)
582
return thd->make_lex_string(lex_str, str, size,
583
(bool) allocate_lex_string);
587
void *thd_memdup(DRIZZLE_THD thd, const void* str, unsigned int size)
589
return thd->memdup(str, size);
593
void thd_get_xid(const DRIZZLE_THD thd, DRIZZLE_XID *xid)
595
*xid = *(DRIZZLE_XID *) &thd->transaction.xid_state.xid;
599
Init common variables that has to be reset on start and on change_user
604
pthread_mutex_lock(&LOCK_global_system_variables);
605
plugin_thdvar_init(this);
606
variables.time_format= date_time_format_copy((THD*) 0,
607
variables.time_format);
608
variables.date_format= date_time_format_copy((THD*) 0,
609
variables.date_format);
610
variables.datetime_format= date_time_format_copy((THD*) 0,
611
variables.datetime_format);
230
613
variables= global_system_variables above has reset
231
614
variables.pseudo_thread_id to 0. We need to correct it here to
232
615
avoid temporary tables replication failure.
234
617
variables.pseudo_thread_id= thread_id;
618
pthread_mutex_unlock(&LOCK_global_system_variables);
235
619
server_status= SERVER_STATUS_AUTOCOMMIT;
236
options= session_startup_options;
620
options= thd_startup_options;
238
622
if (variables.max_join_size == HA_POS_ERROR)
239
623
options |= OPTION_BIG_SELECTS;
241
625
options &= ~OPTION_BIG_SELECTS;
627
transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= false;
243
628
open_options=ha_open_options;
244
update_lock_default= TL_WRITE;
629
update_lock_default= (variables.low_priority_updates ?
630
TL_WRITE_LOW_PRIORITY :
245
632
session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
246
633
warn_list.empty();
247
634
memset(warn_count, 0, sizeof(warn_count));
248
635
total_warn_count= 0;
637
reset_current_stmt_binlog_row_based();
249
638
memset(&status_var, 0, sizeof(status_var));
251
/* Initialize sub structures */
252
memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
254
substitute_null_with_insert_id = false;
255
lock_info.init(); /* safety: will be reset after start */
256
thr_lock_owner_init(&main_lock_id, &lock_info);
258
m_internal_handler= NULL;
260
plugin::EventObserver::registerSessionEvents(*this);
263
void Session::free_items()
266
/* This works because items are allocated with memory::sql_alloc() */
267
for (; free_list; free_list= next)
269
next= free_list->next;
270
free_list->delete_self();
274
void Session::push_internal_handler(Internal_error_handler *handler)
277
TODO: The current implementation is limited to 1 handler at a time only.
278
Session and sp_rcontext need to be modified to use a common handler stack.
280
assert(m_internal_handler == NULL);
281
m_internal_handler= handler;
284
bool Session::handle_error(uint32_t sql_errno, const char *message,
285
DRIZZLE_ERROR::enum_warning_level level)
287
if (m_internal_handler)
289
return m_internal_handler->handle_error(sql_errno, message, level, this);
292
return false; // 'false', as per coding style
295
void Session::setAbort(bool arg)
297
mysys_var->abort= arg;
300
void Session::lockOnSys()
306
boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
307
if (mysys_var->current_cond)
309
mysys_var->current_mutex->lock();
310
pthread_cond_broadcast(mysys_var->current_cond->native_handle());
311
mysys_var->current_mutex->unlock();
315
void Session::pop_internal_handler()
317
assert(m_internal_handler != NULL);
318
m_internal_handler= NULL;
321
void Session::get_xid(DRIZZLE_XID *xid)
323
*xid = *(DRIZZLE_XID *) &transaction.xid_state.xid;
643
Init THD for query processing.
644
This has to be called once before we call mysql_parse.
645
See also comments in sql_class.h.
648
void THD::init_for_queries()
651
ha_enable_transaction(this,true);
653
reset_root_defaults(mem_root, variables.query_alloc_block_size,
654
variables.query_prealloc_size);
655
reset_root_defaults(&transaction.mem_root,
656
variables.trans_alloc_block_size,
657
variables.trans_prealloc_size);
658
transaction.xid_state.xid.null();
659
transaction.xid_state.in_thd=1;
326
663
/* Do operations that may take a long time */
328
void Session::cleanup(void)
665
void THD::cleanup(void)
330
assert(cleanup_done == false);
667
assert(cleanup_done == 0);
332
669
killed= KILL_CONNECTION;
333
670
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
461
851
assert(thread_stack);
463
currentSession().release();
464
currentSession().reset(this);
466
currentMemRoot().release();
467
currentMemRoot().reset(&mem_root);
853
if (my_pthread_setspecific_ptr(THR_THD, this) ||
854
my_pthread_setspecific_ptr(THR_MALLOC, &mem_root))
469
856
mysys_var=my_thread_var;
472
858
Let mysqld define the thread id (not mysys)
473
This allows us to move Session to different threads if needed.
859
This allows us to move THD to different threads if needed.
475
861
mysys_var->id= thread_id;
862
real_id= pthread_self(); // For debugging
478
We have to call thr_lock_info_init() again here as Session may have been
865
We have to call thr_lock_info_init() again here as THD may have been
479
866
created in another thread
868
thr_lock_info_init(&lock_info);
487
Init Session for query processing.
488
This has to be called once before we call mysql_parse.
489
See also comments in session.h.
877
THD::cleanup_after_query()
880
This function is used to reset thread data to its default state.
883
This function is not suitable for setting thread data to some
884
non-default values, as there is only one replication thread, so
885
different master threads may overwrite data of each other on
492
void Session::prepareForQueries()
494
if (variables.max_join_size == HA_POS_ERROR)
495
options |= OPTION_BIG_SELECTS;
497
version= refresh_version;
502
mem_root->reset_root_defaults(variables.query_alloc_block_size,
503
variables.query_prealloc_size);
504
transaction.xid_state.xid.null();
505
transaction.xid_state.in_session=1;
510
bool Session::initGlobals()
514
disconnect(ER_OUT_OF_RESOURCES, true);
515
status_var.aborted_connects++;
523
if (initGlobals() || authenticate())
531
while (! client->haveError() && killed != KILL_CONNECTION)
533
if (! executeStatement())
540
bool Session::schedule()
542
scheduler= plugin::Scheduler::getScheduler();
545
connection_count.increment();
547
if (connection_count > current_global_counters.max_used_connections)
549
current_global_counters.max_used_connections= connection_count;
552
current_global_counters.connections++;
553
thread_id= variables.pseudo_thread_id= global_thread_id++;
555
LOCK_thread_count.lock();
556
getSessionList().push_back(this);
557
LOCK_thread_count.unlock();
559
if (scheduler->addSession(this))
561
DRIZZLE_CONNECTION_START(thread_id);
562
char error_message_buff[DRIZZLE_ERRMSG_SIZE];
564
killed= Session::KILL_CONNECTION;
566
status_var.aborted_connects++;
568
/* Can't use my_error() since store_globals has not been called. */
569
/* TODO replace will better error message */
570
snprintf(error_message_buff, sizeof(error_message_buff),
571
ER(ER_CANT_CREATE_THREAD), 1);
572
client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
580
const char* Session::enter_cond(boost::condition_variable &cond, boost::mutex &mutex, const char* msg)
582
const char* old_msg = get_proc_info();
583
safe_mutex_assert_owner(mutex);
584
mysys_var->current_mutex = &mutex;
585
mysys_var->current_cond = &cond;
586
this->set_proc_info(msg);
590
void Session::exit_cond(const char* old_msg)
593
Putting the mutex unlock in exit_cond() ensures that
594
mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
595
locked (if that would not be the case, you'll get a deadlock if someone
596
does a Session::awake() on you).
598
mysys_var->current_mutex->unlock();
599
boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
600
mysys_var->current_mutex = 0;
601
mysys_var->current_cond = 0;
602
this->set_proc_info(old_msg);
605
bool Session::authenticate()
608
if (client->authenticate())
611
status_var.aborted_connects++;
616
bool Session::checkUser(const char *passwd, uint32_t passwd_len, const char *in_db)
618
const string passwd_str(passwd, passwd_len);
619
bool is_authenticated=
620
plugin::Authentication::isAuthenticated(getSecurityContext(),
623
if (is_authenticated != true)
625
status_var.access_denied++;
626
/* isAuthenticated has pushed the error message */
630
/* Change database if necessary */
631
if (in_db && in_db[0])
633
SchemaIdentifier identifier(in_db);
634
if (mysql_change_db(this, identifier))
636
/* mysql_change_db() has pushed the error message. */
641
password= test(passwd_len); // remember for error messages
643
/* Ready to handle queries */
647
bool Session::executeStatement()
650
uint32_t packet_length;
652
enum enum_server_command l_command;
655
indicator of uninitialized lex => normal flow of errors handling
658
lex->current_select= 0;
660
main_da.reset_diagnostics_area();
662
if (client->readCommand(&l_packet, &packet_length) == false)
665
if (killed == KILL_CONNECTION)
668
if (packet_length == 0)
671
l_command= (enum enum_server_command) (unsigned char) l_packet[0];
673
if (command >= COM_END)
674
command= COM_END; // Wrong command
676
assert(packet_length);
677
return ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
680
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
682
/* Remove garbage at start and end of query */
683
while (in_packet_length > 0 && my_isspace(charset(), in_packet[0]))
688
const char *pos= in_packet + in_packet_length; /* Point at end null */
689
while (in_packet_length > 0 &&
690
(pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
696
query.assign(in_packet, in_packet + in_packet_length);
701
bool Session::endTransaction(enum enum_mysql_completiontype completion)
705
TransactionServices &transaction_services= TransactionServices::singleton();
707
if (transaction.xid_state.xa_state != XA_NOTR)
709
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
716
* We don't use endActiveTransaction() here to ensure that this works
717
* even if there is a problem with the OPTION_AUTO_COMMIT flag
718
* (Which of course should never happen...)
720
server_status&= ~SERVER_STATUS_IN_TRANS;
721
if (transaction_services.commitTransaction(this, true))
723
options&= ~(OPTION_BEGIN);
726
do_release= 1; /* fall through */
727
case COMMIT_AND_CHAIN:
728
result= endActiveTransaction();
729
if (result == true && completion == COMMIT_AND_CHAIN)
730
result= startTransaction();
732
case ROLLBACK_RELEASE:
733
do_release= 1; /* fall through */
735
case ROLLBACK_AND_CHAIN:
737
server_status&= ~SERVER_STATUS_IN_TRANS;
738
if (transaction_services.rollbackTransaction(this, true))
740
options&= ~(OPTION_BEGIN);
741
if (result == true && (completion == ROLLBACK_AND_CHAIN))
742
result= startTransaction();
746
my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
751
my_error(killed_errno(), MYF(0));
752
else if ((result == true) && do_release)
753
killed= Session::KILL_CONNECTION;
758
bool Session::endActiveTransaction()
761
TransactionServices &transaction_services= TransactionServices::singleton();
763
if (transaction.xid_state.xa_state != XA_NOTR)
765
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
768
if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
770
server_status&= ~SERVER_STATUS_IN_TRANS;
771
if (transaction_services.commitTransaction(this, true))
774
options&= ~(OPTION_BEGIN);
778
bool Session::startTransaction(start_transaction_option_t opt)
782
if (! endActiveTransaction())
788
options|= OPTION_BEGIN;
789
server_status|= SERVER_STATUS_IN_TRANS;
791
if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
800
void Session::cleanup_after_query()
803
Reset rand_used so that detection of calls to rand() will save random
889
void THD::cleanup_after_query()
892
Reset rand_used so that detection of calls to rand() will save random
804
893
seeds if needed by the slave.
807
896
/* Forget those values, for next binlogger: */
897
stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
808
898
auto_inc_intervals_in_cur_stmt_for_binlog.empty();
810
901
if (first_successful_insert_id_in_cur_stmt > 0)
812
903
/* set what LAST_INSERT_ID() will return */
813
first_successful_insert_id_in_prev_stmt= first_successful_insert_id_in_cur_stmt;
904
first_successful_insert_id_in_prev_stmt=
905
first_successful_insert_id_in_cur_stmt;
814
906
first_successful_insert_id_in_cur_stmt= 0;
815
907
substitute_null_with_insert_id= true;
817
arg_of_last_insert_id_function= false;
909
arg_of_last_insert_id_function= 0;
818
910
/* Free Items that were created during this execution */
820
912
/* Reset where. */
821
where= Session::DEFAULT_WHERE;
823
/* Reset the temporary shares we built */
824
for (std::vector<TableShareInstance *>::iterator iter= temporary_shares.begin();
825
iter != temporary_shares.end(); iter++)
829
temporary_shares.clear();
913
where= THD::DEFAULT_WHERE;
833
918
Create a LEX_STRING in this connection.
1939
/***************************************************************************
1940
Dump of select to variables
1941
***************************************************************************/
1943
int select_dumpvar::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
1947
if (var_list.elements != list.elements)
1949
my_message(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT,
1950
ER(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT), MYF(0));
1957
bool select_dumpvar::check_simple_select() const
1959
my_error(ER_SP_BAD_CURSOR_SELECT, MYF(0));
1964
void select_dumpvar::cleanup()
1970
void Query_arena::free_items()
1973
/* This works because items are allocated with sql_alloc() */
1974
for (; free_list; free_list= next)
1976
next= free_list->next;
1977
free_list->delete_self();
1979
/* Postcondition: free_list is 0 */
1988
Statement::Statement(LEX *lex_arg, MEM_ROOT *mem_root_arg, ulong id_arg)
1989
:Query_arena(mem_root_arg),
1991
mark_used_columns(MARK_COLUMNS_READ),
1494
2002
Don't free mem_root, as mem_root is freed in the end of dispatch_command
1495
2003
(once for any command).
1497
void Session::end_statement()
2005
void THD::end_statement()
1499
2007
/* Cleanup SQL processing state to reuse this statement in next query. */
1501
query_cache_key= ""; // reset the cache key
1502
resetResultsetMessage();
1505
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
2012
bool THD::copy_db_to(char **p_db, size_t *p_db_length)
1509
2016
my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1512
*p_db= strmake(db.c_str(), db.length());
1513
*p_db_length= db.length();
2019
*p_db= strmake(db, db_length);
2020
*p_db_length= db_length;
2025
bool select_dumpvar::send_data(List<Item> &items)
2027
List_iterator_fast<my_var> var_li(var_list);
2028
List_iterator<Item> it(items);
2032
if (unit->offset_limit_cnt)
2033
{ // using limit offset,count
2034
unit->offset_limit_cnt--;
2039
my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
2042
while ((mv= var_li++) && (item= it++))
2046
Item_func_set_user_var *suv= new Item_func_set_user_var(mv->s, item);
2047
suv->fix_fields(thd, 0);
2052
return(thd->is_error());
2055
bool select_dumpvar::send_eof()
2058
push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
2059
ER_SP_FETCH_NO_DATA, ER(ER_SP_FETCH_NO_DATA));
2061
In order to remember the value of affected rows for ROW_COUNT()
2062
function, SELECT INTO has to have an own SQLCOM.
2063
TODO: split from SQLCOM_SELECT
2065
::my_ok(thd,row_count);
1517
2069
/****************************************************************************
1519
2071
****************************************************************************/
1521
void Tmp_Table_Param::init()
2073
void TMP_TABLE_PARAM::init()
1523
2075
field_count= sum_func_count= func_count= hidden_field_count= 0;
1524
2076
group_parts= group_length= group_null_parts= 0;
1525
2077
quick_group= 1;
1526
2078
table_charset= 0;
1527
2079
precomputed_group_by= 0;
2080
bit_fields_as_long= 0;
1530
void Tmp_Table_Param::cleanup(void)
2085
void thd_increment_bytes_sent(ulong length)
1532
/* Fix for Intel compiler */
1535
delete [] copy_field;
1536
save_copy_field= copy_field= 0;
2087
THD *thd=current_thd;
2088
if (likely(thd != 0))
2089
{ /* current_thd==0 when close_connection() calls net_send_error() */
2090
thd->status_var.bytes_sent+= length;
1540
void Session::send_kill_message() const
2095
void thd_increment_bytes_received(ulong length)
2097
current_thd->status_var.bytes_received+= length;
2101
void thd_increment_net_big_packet_count(ulong length)
2103
current_thd->status_var.net_big_packet_count+= length;
2106
void THD::send_kill_message() const
1542
2108
int err= killed_errno();
1544
2110
my_message(err, ER(err), MYF(0));
1547
void Session::set_status_var_init()
2113
void THD::set_status_var_init()
1549
2115
memset(&status_var, 0, sizeof(status_var));
1553
bool Session::set_db(const std::string &new_db)
1555
/* Do not reallocate memory if current chunk is big enough. */
1556
if (new_db.length())
2119
void Security_context::init()
2125
void Security_context::destroy()
2127
// If not pointer to constant
2141
void Security_context::skip_grants()
2143
/* privileges for the user are unknown everything is allowed */
2147
/****************************************************************************
2148
Handling of open and locked tables states.
2150
This is used when we want to open/lock (and then close) some tables when
2151
we already have a set of tables open and locked. We use these methods for
2152
access to mysql.proc table to find definitions of stored routines.
2153
****************************************************************************/
2155
void THD::reset_n_backup_open_tables_state(Open_tables_state *backup)
2157
backup->set_open_tables_state(this);
2158
reset_open_tables_state();
2159
state_flags|= Open_tables_state::BACKUPS_AVAIL;
2164
void THD::restore_backup_open_tables_state(Open_tables_state *backup)
2167
Before we will throw away current open tables state we want
2168
to be sure that it was properly cleaned up.
2170
assert(open_tables == 0 && temporary_tables == 0 &&
2171
handler_tables == 0 && derived_tables == 0 &&
2172
lock == 0 && locked_tables == 0);
2173
set_open_tables_state(backup);
1568
2178
Check the killed state of a user thread
1569
@param session user thread
2179
@param thd user thread
1570
2180
@retval 0 the user thread is active
1571
2181
@retval 1 the user thread has been killed
1573
int session_killed(const Session *session)
1575
return(session->killed);
1579
const struct charset_info_st *session_charset(Session *session)
1581
return(session->charset());
2183
extern "C" int thd_killed(const DRIZZLE_THD thd)
2185
return(thd->killed);
2189
Return the thread id of a user thread
2190
@param thd user thread
2193
extern "C" unsigned long thd_get_thread_id(const DRIZZLE_THD thd)
2195
return((unsigned long)thd->thread_id);
2199
#ifdef INNODB_COMPATIBILITY_HOOKS
2200
extern "C" const struct charset_info_st *thd_charset(DRIZZLE_THD thd)
2202
return(thd->charset());
2205
extern "C" char **thd_query(DRIZZLE_THD thd)
2207
return(&thd->query);
2210
extern "C" int thd_slave_thread(const DRIZZLE_THD thd)
2212
return(thd->slave_thread);
2215
extern "C" int thd_non_transactional_update(const DRIZZLE_THD thd)
2217
return(thd->transaction.all.modified_non_trans_table);
2220
extern "C" int thd_binlog_format(const DRIZZLE_THD thd)
2222
return (int) thd->variables.binlog_format;
2225
extern "C" void thd_mark_transaction_to_rollback(DRIZZLE_THD thd, bool all)
2227
mark_transaction_to_rollback(thd, all);
2229
#endif // INNODB_COMPATIBILITY_HOOKS */
1585
2233
Mark transaction to rollback and mark error as fatal to a sub-statement.
1587
@param session Thread handle
2235
@param thd Thread handle
1588
2236
@param all true <=> rollback main transaction.
1590
void mark_transaction_to_rollback(Session *session, bool all)
1594
session->is_fatal_sub_stmt_error= true;
1595
session->transaction_rollback_request= all;
1599
void Session::disconnect(uint32_t errcode, bool should_lock)
1601
/* Allow any plugins to cleanup their session variables */
1602
plugin_sessionvar_cleanup(this);
1604
/* If necessary, log any aborted or unauthorized connections */
1605
if (killed || client->wasAborted())
1607
status_var.aborted_threads++;
1610
if (client->wasAborted())
1612
if (! killed && variables.log_warnings > 1)
1614
SecurityContext *sctx= &security_ctx;
1616
errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1618
, (db.empty() ? "unconnected" : db.c_str())
1619
, sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1620
, sctx->getIp().c_str()
1621
, (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1625
/* Close out our connection to the client */
1627
LOCK_thread_count.lock();
1628
killed= Session::KILL_CONNECTION;
1629
if (client->isConnected())
1633
/*my_error(errcode, ER(errcode));*/
1634
client->sendError(errcode, ER(errcode));
1639
(void) LOCK_thread_count.unlock();
1642
void Session::reset_for_next_command()
1647
Those two lines below are theoretically unneeded as
1648
Session::cleanup_after_query() should take care of this already.
1650
auto_inc_intervals_in_cur_stmt_for_binlog.empty();
1652
is_fatal_error= false;
1653
server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1654
SERVER_QUERY_NO_INDEX_USED |
1655
SERVER_QUERY_NO_GOOD_INDEX_USED);
1658
main_da.reset_diagnostics_area();
1659
total_warn_count=0; // Warnings for this query
1660
sent_row_count= examined_row_count= 0;
1664
Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1667
void Session::close_temporary_tables()
1672
if (not temporary_tables)
1675
for (table= temporary_tables; table; table= tmp_next)
1677
tmp_next= table->getNext();
1680
temporary_tables= NULL;
1684
unlink from session->temporary tables and close temporary table
1687
void Session::close_temporary_table(Table *table)
1689
if (table->getPrev())
1691
table->getPrev()->setNext(table->getNext());
1692
if (table->getPrev()->getNext())
1694
table->getNext()->setPrev(table->getPrev());
1699
/* removing the item from the list */
1700
assert(table == temporary_tables);
1702
slave must reset its temporary list pointer to zero to exclude
1703
passing non-zero value to end_slave via rli->save_temporary_tables
1704
when no temp tables opened, see an invariant below.
1706
temporary_tables= table->getNext();
1707
if (temporary_tables)
1709
table->getNext()->setPrev(NULL);
1716
Close and drop a temporary table
1719
This doesn't unlink table from session->temporary
1720
If this is needed, use close_temporary_table()
1723
void Session::nukeTable(Table *table)
1725
plugin::StorageEngine *table_type= table->getShare()->db_type();
1727
table->free_io_cache();
1728
table->delete_table();
1730
TableIdentifier identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1731
rm_temporary_table(table_type, identifier);
1733
delete table->getMutableShare();
1735
/* This makes me sad, but we're allocating it via malloc */
1739
/** Clear most status variables. */
1740
extern time_t flush_status_time;
1742
void Session::refresh_status()
1744
/* Reset thread's status variables */
1745
memset(&status_var, 0, sizeof(status_var));
1747
flush_status_time= time((time_t*) 0);
1748
current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1749
current_global_counters.connections= 0;
1752
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1754
user_var_entry *entry= NULL;
1755
UserVarsRange ppp= user_vars.equal_range(std::string(name.str, name.length));
1757
for (UserVars::iterator iter= ppp.first;
1758
iter != ppp.second; ++iter)
1760
entry= (*iter).second;
1763
if ((entry == NULL) && create_if_not_exists)
1765
entry= new (nothrow) user_var_entry(name.str, query_id);
1770
std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(std::string(name.str, name.length), entry));
1772
if (not returnable.second)
1782
void Session::mark_temp_tables_as_free_for_reuse()
1784
for (Table *table= temporary_tables ; table ; table= table->getNext())
1786
if (table->query_id == query_id)
1789
table->cursor->ha_reset();
1794
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1796
for (; table ; table= table->getNext())
1798
if (table->query_id == query_id)
1801
table->cursor->ha_reset();
1807
Unlocks tables and frees derived tables.
1808
Put all normal tables used by thread in free list.
1810
It will only close/mark as free for reuse tables opened by this
1811
substatement, it will also check if we are closing tables after
1812
execution of complete query (i.e. we are on upper level) and will
1813
leave prelocked mode if needed.
1815
void Session::close_thread_tables()
1818
derived_tables= NULL; // They should all be invalid by this point
1821
Mark all temporary tables used by this statement as free for reuse.
1823
mark_temp_tables_as_free_for_reuse();
1825
Let us commit transaction for statement. Since in 5.0 we only have
1826
one statement transaction and don't allow several nested statement
1827
transactions this call will do nothing if we are inside of stored
1828
function or trigger (i.e. statement transaction is already active and
1829
does not belong to statement for which we do close_thread_tables()).
1830
TODO: This should be fixed in later releases.
1833
TransactionServices &transaction_services= TransactionServices::singleton();
1834
main_da.can_overwrite_status= true;
1835
transaction_services.autocommitOrRollback(this, is_error());
1836
main_da.can_overwrite_status= false;
1837
transaction.stmt.reset();
1843
For RBR we flush the pending event just before we unlock all the
1844
tables. This means that we are at the end of a topmost
1845
statement, so we ensure that the STMT_END_F flag is set on the
1846
pending event. For statements that are *inside* stored
1847
functions, the pending event will not be flushed: that will be
1848
handled either before writing a query log event (inside
1849
binlog_query()) or when preparing a pending event.
1851
mysql_unlock_tables(this, lock);
1855
Note that we need to hold LOCK_open while changing the
1856
open_tables list. Another thread may work on it.
1857
(See: remove_table_from_cache(), mysql_wait_completed_table())
1858
Closing a MERGE child before the parent would be fatal if the
1859
other thread tries to abort the MERGE lock in between.
1862
close_open_tables();
1865
void Session::close_tables_for_reopen(TableList **tables)
1868
If table list consists only of tables from prelocking set, table list
1869
for new attempt should be empty, so we have to update list's root pointer.
1871
if (lex->first_not_own_table() == *tables)
1873
lex->chop_off_not_own_tables();
1874
for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1876
close_thread_tables();
1879
bool Session::openTablesLock(TableList *tables)
1886
if (open_tables_from_list(&tables, &counter))
1889
if (not lock_tables(tables, counter, &need_reopen))
1891
if (not need_reopen)
1893
close_tables_for_reopen(&tables);
1895
if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
1896
(fill_derived_tables() &&
1897
mysql_handle_derived(lex, &mysql_derived_filling))))
1903
bool Session::openTables(TableList *tables, uint32_t flags)
1906
bool ret= fill_derived_tables();
1907
assert(ret == false);
1908
if (open_tables_from_list(&tables, &counter, flags) ||
1909
mysql_handle_derived(lex, &mysql_derived_prepare))
1917
@note "best_effort" is used in cases where, if a failure occurred on this
1918
operation it would not be surprising because we are only removing because there
1919
might be an issue (lame engines).
1922
bool Session::rm_temporary_table(TableIdentifier &identifier, bool best_effort)
1924
if (plugin::StorageEngine::dropTable(*this, identifier))
1926
if (not best_effort)
1928
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1929
identifier.getSQLPath().c_str(), errno);
1938
bool Session::rm_temporary_table(plugin::StorageEngine *base, TableIdentifier &identifier)
1942
if (plugin::StorageEngine::dropTable(*this, *base, identifier))
1944
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1945
identifier.getSQLPath().c_str(), errno);
1954
@note this will be removed, I am looking through Hudson to see if it is finding
1955
any tables that are missed during cleanup.
1957
void Session::dumpTemporaryTableNames(const char *foo)
1961
if (not temporary_tables)
1964
cerr << "Begin Run: " << foo << "\n";
1965
for (table= temporary_tables; table; table= table->getNext())
1967
bool have_proto= false;
1969
message::Table *proto= table->getShare()->getTableProto();
1970
if (table->getShare()->getTableProto())
1973
const char *answer= have_proto ? "true" : "false";
1977
cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
1978
cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
1981
cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
1985
bool Session::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
1987
table_message_cache.insert(make_pair(identifier.getPath(), table_message));
1992
bool Session::removeTableMessage(const TableIdentifier &identifier)
1994
TableMessageCache::iterator iter;
1996
iter= table_message_cache.find(identifier.getPath());
1998
if (iter == table_message_cache.end())
2001
table_message_cache.erase(iter);
2006
bool Session::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2008
TableMessageCache::iterator iter;
2010
iter= table_message_cache.find(identifier.getPath());
2012
if (iter == table_message_cache.end())
2015
table_message.CopyFrom(((*iter).second));
2020
bool Session::doesTableMessageExist(const TableIdentifier &identifier)
2022
TableMessageCache::iterator iter;
2024
iter= table_message_cache.find(identifier.getPath());
2026
if (iter == table_message_cache.end())
2034
bool Session::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
2036
TableMessageCache::iterator iter;
2038
table_message_cache[to.getPath()]= table_message_cache[from.getPath()];
2040
iter= table_message_cache.find(to.getPath());
2042
if (iter == table_message_cache.end())
2047
(*iter).second.set_schema(to.getSchemaName());
2048
(*iter).second.set_name(to.getTableName());
2053
TableShareInstance *Session::getTemporaryShare(TableIdentifier::Type type_arg)
2055
temporary_shares.push_back(new TableShareInstance(type_arg)); // This will not go into the tableshare cache, so no key is used.
2057
TableShareInstance *tmp_share= temporary_shares.back();
2064
} /* namespace drizzled */
2239
void mark_transaction_to_rollback(THD *thd, bool all)
2243
thd->is_fatal_sub_stmt_error= true;
2244
thd->transaction_rollback_request= all;
2247
/***************************************************************************
2248
Handling of XA id caching
2249
***************************************************************************/
2251
pthread_mutex_t LOCK_xid_cache;
2254
extern "C" unsigned char *xid_get_hash_key(const unsigned char *, size_t *, bool);
2255
extern "C" void xid_free_hash(void *);
2257
unsigned char *xid_get_hash_key(const unsigned char *ptr, size_t *length,
2258
bool not_used __attribute__((unused)))
2260
*length=((XID_STATE*)ptr)->xid.key_length();
2261
return ((XID_STATE*)ptr)->xid.key();
2264
void xid_free_hash(void *ptr)
2266
if (!((XID_STATE*)ptr)->in_thd)
2267
free((unsigned char*)ptr);
2270
bool xid_cache_init()
2272
pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST);
2273
return hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
2274
xid_get_hash_key, xid_free_hash, 0) != 0;
2277
void xid_cache_free()
2279
if (hash_inited(&xid_cache))
2281
hash_free(&xid_cache);
2282
pthread_mutex_destroy(&LOCK_xid_cache);
2286
XID_STATE *xid_cache_search(XID *xid)
2288
pthread_mutex_lock(&LOCK_xid_cache);
2289
XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, xid->key(), xid->key_length());
2290
pthread_mutex_unlock(&LOCK_xid_cache);
2295
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
2299
pthread_mutex_lock(&LOCK_xid_cache);
2300
if (hash_search(&xid_cache, xid->key(), xid->key_length()))
2302
else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
2306
xs->xa_state=xa_state;
2309
res=my_hash_insert(&xid_cache, (unsigned char*)xs);
2311
pthread_mutex_unlock(&LOCK_xid_cache);
2316
bool xid_cache_insert(XID_STATE *xid_state)
2318
pthread_mutex_lock(&LOCK_xid_cache);
2319
assert(hash_search(&xid_cache, xid_state->xid.key(),
2320
xid_state->xid.key_length())==0);
2321
bool res=my_hash_insert(&xid_cache, (unsigned char*)xid_state);
2322
pthread_mutex_unlock(&LOCK_xid_cache);
2327
void xid_cache_delete(XID_STATE *xid_state)
2329
pthread_mutex_lock(&LOCK_xid_cache);
2330
hash_delete(&xid_cache, (unsigned char *)xid_state);
2331
pthread_mutex_unlock(&LOCK_xid_cache);
2335
Implementation of interface to write rows to the binary log through the
2336
thread. The thread is responsible for writing the rows it has
2337
inserted/updated/deleted.
2342
Template member function for ensuring that there is a rows log
2343
event of the appropriate type before proceeding.
2346
- Events of type 'RowEventT' have the type code 'type_code'.
2349
If a non-NULL pointer is returned, the pending event for thread 'thd' will
2350
be an event of type 'RowEventT' (which have the type code 'type_code')
2351
and will either be empty or have enough space to hold 'needed' bytes. In
2352
addition, the columns bitmap will be correct for the row, meaning that
2353
the pending event will be flushed if the columns in the event differ from
2354
the columns supplied to the function.
2357
If no error, a non-NULL pending event (either one which already existed or
2358
the newly created one).
2362
template <class RowsEventT> Rows_log_event*
2363
THD::binlog_prepare_pending_rows_event(Table* table, uint32_t serv_id,
2365
bool is_transactional,
2366
RowsEventT *hint __attribute__((unused)))
2368
/* Pre-conditions */
2369
assert(table->s->table_map_id != UINT32_MAX);
2371
/* Fetch the type code for the RowsEventT template parameter */
2372
int const type_code= RowsEventT::TYPE_CODE;
2375
There is no good place to set up the transactional data, so we
2378
if (binlog_setup_trx_data())
2381
Rows_log_event* pending= binlog_get_pending_rows_event();
2383
if (unlikely(pending && !pending->is_valid()))
2387
Check if the current event is non-NULL and a write-rows
2388
event. Also check if the table provided is mapped: if it is not,
2389
then we have switched to writing to a new table.
2390
If there is no pending event, we need to create one. If there is a pending
2391
event, but it's not about the same table id, or not of the same type
2392
(between Write, Update and Delete), or not the same affected columns, or
2393
going to be too big, flush this event to disk and create a new pending
2396
The last test is necessary for the Cluster injector to work
2397
correctly. The reason is that the Cluster can inject two write
2398
rows with different column bitmaps if there is an insert followed
2399
by an update in the same transaction, and these are grouped into a
2400
single epoch/transaction when fed to the injector.
2402
TODO: Fix the code so that the last test can be removed.
2405
pending->server_id != serv_id ||
2406
pending->get_table_id() != table->s->table_map_id ||
2407
pending->get_type_code() != type_code ||
2408
pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
2409
!bitmap_cmp(pending->get_cols(), table->write_set))
2411
/* Create a new RowsEventT... */
2412
Rows_log_event* const
2413
ev= new RowsEventT(this, table, table->s->table_map_id,
2417
ev->server_id= serv_id; // I don't like this, it's too easy to forget.
2419
flush the pending event and replace it with the newly created
2422
if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev)))
2428
return(ev); /* This is the new pending event */
2430
return(pending); /* This is the current pending event */
2433
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
2435
Instantiate the versions we need, we have -fno-implicit-template as
2438
template Rows_log_event*
2439
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2440
Write_rows_log_event*);
2442
template Rows_log_event*
2443
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2444
Delete_rows_log_event *);
2446
template Rows_log_event*
2447
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2448
Update_rows_log_event *);
2453
Class to handle temporary allocation of memory for row data.
2455
The responsibility of the class is to provide memory for
2456
packing one or two rows of packed data (depending on what
2457
constructor is called).
2459
In order to make the allocation more efficient for "simple" rows,
2460
i.e., rows that do not contain any blobs, a pointer to the
2461
allocated memory is stored in the table structure
2462
for simple rows. If memory for a table containing a blob field
2463
is requested, only memory for that is allocated, and subsequently
2464
released when the object is destroyed.
2467
class Row_data_memory {
2470
Build an object to keep track of a block-local piece of memory
2471
for storing a row of data.
2474
Table where the pre-allocated memory is stored.
2477
Length of data that is needed, if the record contain blobs.
2479
Row_data_memory(Table *table, size_t const len1)
2482
m_alloc_checked= false;
2483
allocate_memory(table, len1);
2484
m_ptr[0]= has_memory() ? m_memory : 0;
2488
Row_data_memory(Table *table, size_t const len1, size_t const len2)
2491
m_alloc_checked= false;
2492
allocate_memory(table, len1 + len2);
2493
m_ptr[0]= has_memory() ? m_memory : 0;
2494
m_ptr[1]= has_memory() ? m_memory + len1 : 0;
2499
if (m_memory != 0 && m_release_memory_on_destruction)
2500
free((unsigned char*) m_memory);
2504
Is there memory allocated?
2506
@retval true There is memory allocated
2507
@retval false Memory allocation failed
2509
bool has_memory() const {
2510
m_alloc_checked= true;
2511
return m_memory != 0;
2514
unsigned char *slot(uint32_t s)
2516
assert(s < sizeof(m_ptr)/sizeof(*m_ptr));
2517
assert(m_ptr[s] != 0);
2518
assert(m_alloc_checked == true);
2523
void allocate_memory(Table *const table, size_t const total_length)
2525
if (table->s->blob_fields == 0)
2528
The maximum length of a packed record is less than this
2529
length. We use this value instead of the supplied length
2530
when allocating memory for records, since we don't know how
2531
the memory will be used in future allocations.
2533
Since table->s->reclength is for unpacked records, we have
2534
to add two bytes for each field, which can potentially be
2535
added to hold the length of a packed field.
2537
size_t const maxlen= table->s->reclength + 2 * table->s->fields;
2540
Allocate memory for two records if memory hasn't been
2541
allocated. We allocate memory for two records so that it can
2542
be used when processing update rows as well.
2544
if (table->write_row_record == 0)
2545
table->write_row_record=
2546
(unsigned char *) alloc_root(&table->mem_root, 2 * maxlen);
2547
m_memory= table->write_row_record;
2548
m_release_memory_on_destruction= false;
2552
m_memory= (unsigned char *) my_malloc(total_length, MYF(MY_WME));
2553
m_release_memory_on_destruction= true;
2557
mutable bool m_alloc_checked;
2558
bool m_release_memory_on_destruction;
2559
unsigned char *m_memory;
2560
unsigned char *m_ptr[2];
2565
int THD::binlog_write_row(Table* table, bool is_trans,
2566
unsigned char const *record)
2568
assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
2571
Pack records into format for transfer. We are allocating more
2572
memory than needed, but that doesn't matter.
2574
Row_data_memory memory(table, table->max_row_length(record));
2575
if (!memory.has_memory())
2576
return HA_ERR_OUT_OF_MEM;
2578
unsigned char *row_data= memory.slot(0);
2580
size_t const len= pack_row(table, table->write_set, row_data, record);
2582
Rows_log_event* const ev=
2583
binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
2584
static_cast<Write_rows_log_event*>(0));
2586
if (unlikely(ev == 0))
2587
return HA_ERR_OUT_OF_MEM;
2589
return ev->add_row_data(row_data, len);
2592
int THD::binlog_update_row(Table* table, bool is_trans,
2593
const unsigned char *before_record,
2594
const unsigned char *after_record)
2596
assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
2598
size_t const before_maxlen = table->max_row_length(before_record);
2599
size_t const after_maxlen = table->max_row_length(after_record);
2601
Row_data_memory row_data(table, before_maxlen, after_maxlen);
2602
if (!row_data.has_memory())
2603
return HA_ERR_OUT_OF_MEM;
2605
unsigned char *before_row= row_data.slot(0);
2606
unsigned char *after_row= row_data.slot(1);
2608
size_t const before_size= pack_row(table, table->read_set, before_row,
2610
size_t const after_size= pack_row(table, table->write_set, after_row,
2613
Rows_log_event* const ev=
2614
binlog_prepare_pending_rows_event(table, server_id,
2615
before_size + after_size, is_trans,
2616
static_cast<Update_rows_log_event*>(0));
2618
if (unlikely(ev == 0))
2619
return HA_ERR_OUT_OF_MEM;
2622
ev->add_row_data(before_row, before_size) ||
2623
ev->add_row_data(after_row, after_size);
2626
int THD::binlog_delete_row(Table* table, bool is_trans,
2627
unsigned char const *record)
2629
assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
2632
Pack records into format for transfer. We are allocating more
2633
memory than needed, but that doesn't matter.
2635
Row_data_memory memory(table, table->max_row_length(record));
2636
if (unlikely(!memory.has_memory()))
2637
return HA_ERR_OUT_OF_MEM;
2639
unsigned char *row_data= memory.slot(0);
2641
size_t const len= pack_row(table, table->read_set, row_data, record);
2643
Rows_log_event* const ev=
2644
binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
2645
static_cast<Delete_rows_log_event*>(0));
2647
if (unlikely(ev == 0))
2648
return HA_ERR_OUT_OF_MEM;
2650
return ev->add_row_data(row_data, len);
2654
int THD::binlog_flush_pending_rows_event(bool stmt_end)
2657
We shall flush the pending event even if we are not in row-based
2658
mode: it might be the case that we left row-based mode before
2659
flushing anything (e.g., if we have explicitly locked tables).
2661
if (!mysql_bin_log.is_open())
2665
Mark the event as the last event of a statement if the stmt_end
2669
if (Rows_log_event *pending= binlog_get_pending_rows_event())
2673
pending->set_flags(Rows_log_event::STMT_END_F);
2674
pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
2675
binlog_table_maps= 0;
2678
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
2686
Member function that will log query, either row-based or
2687
statement-based depending on the value of the 'current_stmt_binlog_row_based'
2688
and the value of the 'qtype' flag.
2690
This function should be called after the all calls to ha_*_row()
2691
functions have been issued, but before tables are unlocked and
2695
There shall be no writes to any system table after calling
2696
binlog_query(), so these writes have to be moved to before the call
2697
of binlog_query() for correct functioning.
2699
This is necessary not only for RBR, but the master might crash
2700
after binlogging the query but before changing the system tables.
2701
This means that the slave and the master are not in the same state
2702
(after the master has restarted), so therefore we have to
2703
eliminate this problem.
2706
Error code, or 0 if no error.
2708
int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
2709
ulong query_len, bool is_trans, bool suppress_use,
2710
THD::killed_state killed_status_arg)
2712
assert(query_arg && mysql_bin_log.is_open());
2714
if (int error= binlog_flush_pending_rows_event(true))
2718
If we are in statement mode and trying to log an unsafe statement,
2719
we should print a warning.
2721
if (lex->is_stmt_unsafe() &&
2722
variables.binlog_format == BINLOG_FORMAT_STMT)
2724
assert(this->query != NULL);
2725
push_warning(this, DRIZZLE_ERROR::WARN_LEVEL_WARN,
2726
ER_BINLOG_UNSAFE_STATEMENT,
2727
ER(ER_BINLOG_UNSAFE_STATEMENT));
2728
if (!(binlog_flags & BINLOG_FLAG_UNSAFE_STMT_PRINTED))
2730
char warn_buf[DRIZZLE_ERRMSG_SIZE];
2731
snprintf(warn_buf, DRIZZLE_ERRMSG_SIZE, "%s Statement: %s",
2732
ER(ER_BINLOG_UNSAFE_STATEMENT), this->query);
2733
sql_print_warning(warn_buf);
2734
binlog_flags|= BINLOG_FLAG_UNSAFE_STMT_PRINTED;
2739
case THD::ROW_QUERY_TYPE:
2740
if (current_stmt_binlog_row_based)
2742
/* Otherwise, we fall through */
2743
case THD::DRIZZLE_QUERY_TYPE:
2745
Using this query type is a convenience hack, since we have been
2746
moving back and forth between using RBR for replication of
2747
system tables and not using it.
2749
Make sure to change in check_table_binlog_row_based() according
2750
to how you treat this.
2752
case THD::STMT_QUERY_TYPE:
2754
The DRIZZLE_LOG::write() function will set the STMT_END_F flag and
2755
flush the pending rows event if necessary.
2758
Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use,
2760
qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
2762
Binlog table maps will be irrelevant after a Query_log_event
2763
(they are just removed on the slave side) so after the query
2764
log event is written to the binary log, we pretend that no
2765
table maps were written.
2767
int error= mysql_bin_log.write(&qinfo);
2768
binlog_table_maps= 0;
2773
case THD::QUERY_TYPE_COUNT:
2775
assert(0 <= qtype && qtype < QUERY_TYPE_COUNT);
2780
bool Discrete_intervals_list::append(uint64_t start, uint64_t val,
2783
/* first, see if this can be merged with previous */
2784
if ((head == NULL) || tail->merge_if_contiguous(start, val, incr))
2786
/* it cannot, so need to add a new interval */
2787
Discrete_interval *new_interval= new Discrete_interval(start, val, incr);
2788
return(append(new_interval));
2793
bool Discrete_intervals_list::append(Discrete_interval *new_interval)
2795
if (unlikely(new_interval == NULL))
2798
head= current= new_interval;
2800
tail->next= new_interval;