48
83
const char * const Session::DEFAULT_WHERE= "field list";
51
/*****************************************************************************
52
** Instansiate templates
53
*****************************************************************************/
55
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
57
template class List<Key>;
58
template class List_iterator<Key>;
59
template class List<Key_part_spec>;
60
template class List_iterator<Key_part_spec>;
61
template class List<Alter_drop>;
62
template class List_iterator<Alter_drop>;
63
template class List<Alter_column>;
64
template class List_iterator<Alter_column>;
67
/****************************************************************************
69
****************************************************************************/
71
extern "C" unsigned char *get_var_key(user_var_entry *entry, size_t *length,
72
bool not_used __attribute__((unused)))
74
*length= entry->name.length;
75
return (unsigned char*) entry->name.str;
78
extern "C" void free_user_var(user_var_entry *entry)
80
char *pos= (char*) entry+ALIGN_SIZE(sizeof(*entry));
81
if (entry->value && entry->value != pos)
86
85
bool Key_part_spec::operator==(const Key_part_spec& other) const
88
87
return length == other.length &&
89
88
field_name.length == other.field_name.length &&
90
!strcmp(field_name.str, other.field_name.str);
94
Construct an (almost) deep copy of this key. Only those
95
elements that are known to never change are not copied.
96
If out of memory, a partial copy is returned and an error is set
100
Key::Key(const Key &rhs, MEM_ROOT *mem_root)
102
key_create_info(rhs.key_create_info),
103
columns(rhs.columns, mem_root),
105
generated(rhs.generated)
107
list_copy_and_replace_each_value(columns, mem_root);
111
Construct an (almost) deep copy of this foreign key. Only those
112
elements that are known to never change are not copied.
113
If out of memory, a partial copy is returned and an error is set
117
Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root)
119
ref_table(rhs.ref_table),
120
ref_columns(rhs.ref_columns),
121
delete_opt(rhs.delete_opt),
122
update_opt(rhs.update_opt),
123
match_opt(rhs.match_opt)
125
list_copy_and_replace_each_value(ref_columns, mem_root);
129
Test if a foreign key (= generated key) is a prefix of the given key
130
(ignoring key name, key type and order of columns)
133
This is only used to test if an index for a FOREIGN KEY exists
136
We only compare field names
139
0 Generated key is a prefix of other key
143
bool foreign_key_prefix(Key *a, Key *b)
145
/* Ensure that 'a' is the generated key */
148
if (b->generated && a->columns.elements > b->columns.elements)
149
std::swap(a, b); // Put shorter key in 'a'
154
return true; // No foreign key
155
std::swap(a, b); // Put generated key in 'a'
158
/* Test if 'a' is a prefix of 'b' */
159
if (a->columns.elements > b->columns.elements)
160
return true; // Can't be prefix
162
List_iterator<Key_part_spec> col_it1(a->columns);
163
List_iterator<Key_part_spec> col_it2(b->columns);
164
const Key_part_spec *col1, *col2;
166
#ifdef ENABLE_WHEN_INNODB_CAN_HANDLE_SWAPED_FOREIGN_KEY_COLUMNS
167
while ((col1= col_it1++))
171
while ((col2= col_it2++))
180
return true; // Error
182
return false; // Is prefix
184
while ((col1= col_it1++))
187
if (!(*col1 == *col2))
190
return false; // Is prefix
196
Check if the foreign key options are compatible with columns
197
on which the FK is created.
203
bool Foreign_key::validate(List<Create_field> &table_fields)
205
Create_field *sql_field;
206
Key_part_spec *column;
207
List_iterator<Key_part_spec> cols(columns);
208
List_iterator<Create_field> it(table_fields);
209
while ((column= cols++))
212
while ((sql_field= it++) &&
213
my_strcasecmp(system_charset_info,
214
column->field_name.str,
215
sql_field->field_name)) {}
218
my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name.str);
221
if (type == Key::FOREIGN_KEY && sql_field->vcol_info)
223
if (delete_opt == FK_OPTION_SET_NULL)
225
my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0),
226
"ON DELETE SET NULL");
229
if (update_opt == FK_OPTION_SET_NULL)
231
my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0),
232
"ON UPDATE SET NULL");
235
if (update_opt == FK_OPTION_CASCADE)
237
my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0),
238
"ON UPDATE CASCADE");
247
/****************************************************************************
248
** Thread specific functions
249
****************************************************************************/
251
Open_tables_state::Open_tables_state(ulong version_arg)
252
:version(version_arg), state_flags(0U)
254
reset_open_tables_state();
89
!my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
92
Open_tables_state::Open_tables_state(uint64_t version_arg) :
95
open_tables= temporary_tables= derived_tables= NULL;
96
extra_lock= lock= NULL;
258
100
The following functions form part of the C plugin API
261
extern "C" int mysql_tmpfile(const char *prefix)
102
int mysql_tmpfile(const char *prefix)
263
104
char filename[FN_REFLEN];
264
File fd = create_temp_file(filename, drizzle_tmpdir, prefix,
265
O_CREAT | O_EXCL | O_RDWR,
105
int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
268
107
unlink(filename);
295
124
@see Session::set_proc_info
298
set_session_proc_info(Session *session, const char *info)
126
void set_session_proc_info(Session *session, const char *info)
300
128
session->set_proc_info(info);
304
131
const char *get_session_proc_info(Session *session)
306
133
return session->get_proc_info();
310
void **session_ha_data(const Session *session, const struct handlerton *hton)
312
return (void **) &session->ha_data[hton->slot].ha_ptr;
136
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
138
return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
141
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
144
return &ha_data[monitored->getId()].resource_context[index];
316
147
int64_t session_test_options(const Session *session, int64_t test_options)
318
149
return session->options & test_options;
322
152
int session_sql_command(const Session *session)
324
154
return (int) session->lex->sql_command;
328
int session_tx_isolation(const Session *session)
330
return (int) session->variables.tx_isolation;
334
void session_inc_row_count(Session *session)
336
session->row_count++;
340
Clear this diagnostics area.
342
Normally called at the end of a statement.
346
Diagnostics_area::reset_diagnostics_area()
348
can_overwrite_status= false;
349
/** Don't take chances in production */
355
m_total_warn_count= 0;
357
/** Tiny reset in debug mode to see garbage right away */
363
Set OK status -- ends commands that do not return a
364
result set, e.g. INSERT/UPDATE/DELETE.
368
Diagnostics_area::set_ok_status(Session *session, ha_rows affected_rows_arg,
369
uint64_t last_insert_id_arg,
370
const char *message_arg)
374
In production, refuse to overwrite an error or a custom response
377
if (is_error() || is_disabled())
379
/** Only allowed to report success if has not yet reported an error */
381
m_server_status= session->server_status;
382
m_total_warn_count= session->total_warn_count;
383
m_affected_rows= affected_rows_arg;
384
m_last_insert_id= last_insert_id_arg;
386
strmake(m_message, message_arg, sizeof(m_message) - 1);
398
Diagnostics_area::set_eof_status(Session *session)
400
/** Only allowed to report eof if has not yet reported an error */
404
In production, refuse to overwrite an error or a custom response
407
if (is_error() || is_disabled())
410
m_server_status= session->server_status;
412
If inside a stored procedure, do not return the total
413
number of warnings, since they are not available to the client
416
m_total_warn_count= session->total_warn_count;
426
Diagnostics_area::set_error_status(Session *session __attribute__((unused)),
427
uint32_t sql_errno_arg,
428
const char *message_arg)
431
Only allowed to report error if has not yet reported a success
432
The only exception is when we flush the message to the client,
433
an error can happen during the flush.
435
assert(! is_set() || can_overwrite_status);
437
In production, refuse to overwrite a custom response with an
443
m_sql_errno= sql_errno_arg;
444
strmake(m_message, message_arg, sizeof(m_message) - 1);
451
Mark the diagnostics area as 'DISABLED'.
453
This is used in rare cases when the COM_ command at hand sends a response
454
in a custom format. One example is the query cache, another is
459
Diagnostics_area::disable_status()
462
m_status= DA_DISABLED;
467
:Statement(&main_lex, &main_mem_root,
468
/* statement id */ 0),
469
Open_tables_state(refresh_version), rli_fake(0),
470
lock_id(&main_lock_id),
472
binlog_table_maps(0), binlog_flags(0UL),
473
arg_of_last_insert_id_function(false),
474
first_successful_insert_id_in_prev_stmt(0),
475
first_successful_insert_id_in_cur_stmt(0),
478
transaction_rollback_request(0),
479
is_fatal_sub_stmt_error(0),
481
derived_tables_processing(false),
157
enum_tx_isolation session_tx_isolation(const Session *session)
159
return (enum_tx_isolation)session->variables.tx_isolation;
162
Session::Session(plugin::Client *client_arg) :
163
Open_tables_state(refresh_version),
164
mem_root(&main_mem_root),
167
query(new std::string),
168
_schema(new std::string("")),
173
lock_id(&main_lock_id),
175
ha_data(plugin::num_trx_monitored_objects),
176
concurrent_execute_allowed(true),
177
arg_of_last_insert_id_function(false),
178
first_successful_insert_id_in_prev_stmt(0),
179
first_successful_insert_id_in_cur_stmt(0),
181
_global_read_lock(NONE),
183
some_tables_deleted(false),
186
is_fatal_error(false),
187
transaction_rollback_request(false),
188
is_fatal_sub_stmt_error(0),
189
derived_tables_processing(false),
190
tablespace_op(false),
193
transaction_message(NULL),
194
statement_message(NULL),
195
session_event_observers(NULL),
198
client->setSession(this);
487
201
Pass nominal parameters to init_alloc_root only to ensure that
488
202
the destructor works OK in case of an error. The main_mem_root
489
203
will be re-initialized in init_for_queries().
491
init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
493
catalog= (char*)"std"; // the only catalog we have for now
494
main_security_ctx.init();
495
security_ctx= &main_security_ctx;
496
some_tables_deleted=no_errors=password= 0;
498
count_cuted_fields= CHECK_FIELD_IGNORE;
501
is_slave_error= thread_specific_used= false;
502
hash_clear(&handler_tables_hash);
205
memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
207
count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
505
211
cuted_fields= sent_row_count= row_count= 0L;
507
212
row_count_func= -1;
508
213
statement_id_counter= 0UL;
509
214
// Must be reset to handle error with Session's created for init of mysqld
893
477
assert(thread_stack);
895
if (pthread_setspecific(THR_Session, this) ||
896
pthread_setspecific(THR_MALLOC, &mem_root))
479
currentSession().release();
480
currentSession().reset(this);
482
currentMemRoot().release();
483
currentMemRoot().reset(&mem_root);
898
485
mysys_var=my_thread_var;
900
488
Let mysqld define the thread id (not mysys)
901
489
This allows us to move Session to different threads if needed.
903
491
mysys_var->id= thread_id;
904
real_id= pthread_self(); // For debugging
907
494
We have to call thr_lock_info_init() again here as Session may have been
908
495
created in another thread
910
thr_lock_info_init(&lock_info);
919
Session::cleanup_after_query()
922
This function is used to reset thread data to its default state.
925
This function is not suitable for setting thread data to some
926
non-default values, as there is only one replication thread, so
927
different master threads may overwrite data of each other on
503
Init Session for query processing.
504
This has to be called once before we call mysql_parse.
505
See also comments in session.h.
508
void Session::prepareForQueries()
510
if (variables.max_join_size == HA_POS_ERROR)
511
options |= OPTION_BIG_SELECTS;
513
version= refresh_version;
518
mem_root->reset_root_defaults(variables.query_alloc_block_size,
519
variables.query_prealloc_size);
520
transaction.xid_state.xid.null();
521
transaction.xid_state.in_session=1;
526
bool Session::initGlobals()
530
disconnect(ER_OUT_OF_RESOURCES, true);
531
status_var.aborted_connects++;
539
if (initGlobals() || authenticate())
547
while (not client->haveError() && getKilled() != KILL_CONNECTION)
549
if (not executeStatement())
556
bool Session::schedule(Session::shared_ptr &arg)
558
arg->scheduler= plugin::Scheduler::getScheduler();
559
assert(arg->scheduler);
561
connection_count.increment();
563
if (connection_count > current_global_counters.max_used_connections)
565
current_global_counters.max_used_connections= connection_count;
568
current_global_counters.connections++;
569
arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
571
session::Cache::singleton().insert(arg);
573
if (unlikely(plugin::EventObserver::connectSession(*arg)))
575
// We should do something about an error...
578
if (plugin::Scheduler::getScheduler()->addSession(arg))
580
DRIZZLE_CONNECTION_START(arg->getSessionId());
581
char error_message_buff[DRIZZLE_ERRMSG_SIZE];
583
arg->setKilled(Session::KILL_CONNECTION);
585
arg->status_var.aborted_connects++;
587
/* Can't use my_error() since store_globals has not been called. */
588
/* TODO replace will better error message */
589
snprintf(error_message_buff, sizeof(error_message_buff),
590
ER(ER_CANT_CREATE_THREAD), 1);
591
arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
601
Is this session viewable by the current user?
603
bool Session::isViewable() const
605
return plugin::Authorization::isAuthorized(current_session->getSecurityContext(),
611
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
613
const char* old_msg = get_proc_info();
614
safe_mutex_assert_owner(mutex);
615
mysys_var->current_mutex = &mutex;
616
mysys_var->current_cond = &cond;
617
this->set_proc_info(msg);
621
void Session::exit_cond(const char* old_msg)
624
Putting the mutex unlock in exit_cond() ensures that
625
mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
626
locked (if that would not be the case, you'll get a deadlock if someone
627
does a Session::awake() on you).
629
mysys_var->current_mutex->unlock();
630
boost_unique_lock_t scopedLock(mysys_var->mutex);
631
mysys_var->current_mutex = 0;
632
mysys_var->current_cond = 0;
633
this->set_proc_info(old_msg);
636
bool Session::authenticate()
639
if (client->authenticate())
642
status_var.aborted_connects++;
647
bool Session::checkUser(const std::string &passwd_str,
648
const std::string &in_db)
650
bool is_authenticated=
651
plugin::Authentication::isAuthenticated(getSecurityContext(),
654
if (is_authenticated != true)
656
status_var.access_denied++;
657
/* isAuthenticated has pushed the error message */
661
/* Change database if necessary */
662
if (not in_db.empty())
664
SchemaIdentifier identifier(in_db);
665
if (mysql_change_db(this, identifier))
667
/* mysql_change_db() has pushed the error message. */
672
password= not passwd_str.empty();
674
/* Ready to handle queries */
678
bool Session::executeStatement()
681
uint32_t packet_length;
683
enum enum_server_command l_command;
686
indicator of uninitialized lex => normal flow of errors handling
689
lex->current_select= 0;
691
main_da.reset_diagnostics_area();
693
if (client->readCommand(&l_packet, &packet_length) == false)
698
if (getKilled() == KILL_CONNECTION)
701
if (packet_length == 0)
704
l_command= static_cast<enum_server_command>(l_packet[0]);
706
if (command >= COM_END)
707
command= COM_END; // Wrong command
709
assert(packet_length);
710
return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
713
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
715
/* Remove garbage at start and end of query */
716
while (in_packet_length > 0 && my_isspace(charset(), in_packet[0]))
721
const char *pos= in_packet + in_packet_length; /* Point at end null */
722
while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
728
std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
729
// We can not be entirely sure _schema has a value
732
plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
734
query.reset(new_query);
735
_state.reset(new State(in_packet, in_packet_length));
740
bool Session::endTransaction(enum enum_mysql_completiontype completion)
744
TransactionServices &transaction_services= TransactionServices::singleton();
746
if (transaction.xid_state.xa_state != XA_NOTR)
748
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
755
* We don't use endActiveTransaction() here to ensure that this works
756
* even if there is a problem with the OPTION_AUTO_COMMIT flag
757
* (Which of course should never happen...)
759
server_status&= ~SERVER_STATUS_IN_TRANS;
760
if (transaction_services.commitTransaction(this, true))
762
options&= ~(OPTION_BEGIN);
765
do_release= 1; /* fall through */
766
case COMMIT_AND_CHAIN:
767
result= endActiveTransaction();
768
if (result == true && completion == COMMIT_AND_CHAIN)
769
result= startTransaction();
771
case ROLLBACK_RELEASE:
772
do_release= 1; /* fall through */
774
case ROLLBACK_AND_CHAIN:
776
server_status&= ~SERVER_STATUS_IN_TRANS;
777
if (transaction_services.rollbackTransaction(this, true))
779
options&= ~(OPTION_BEGIN);
780
if (result == true && (completion == ROLLBACK_AND_CHAIN))
781
result= startTransaction();
785
my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
791
my_error(killed_errno(), MYF(0));
793
else if ((result == true) && do_release)
795
setKilled(Session::KILL_CONNECTION);
801
bool Session::endActiveTransaction()
804
TransactionServices &transaction_services= TransactionServices::singleton();
806
if (transaction.xid_state.xa_state != XA_NOTR)
808
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
811
if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
813
server_status&= ~SERVER_STATUS_IN_TRANS;
814
if (transaction_services.commitTransaction(this, true))
817
options&= ~(OPTION_BEGIN);
821
bool Session::startTransaction(start_transaction_option_t opt)
825
if (! endActiveTransaction())
831
options|= OPTION_BEGIN;
832
server_status|= SERVER_STATUS_IN_TRANS;
834
if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
931
843
void Session::cleanup_after_query()
934
Reset rand_used so that detection of calls to rand() will save random
846
Reset rand_used so that detection of calls to rand() will save random
935
847
seeds if needed by the slave.
2297
1640
session->transaction_rollback_request= all;
2300
/***************************************************************************
2301
Handling of XA id cacheing
2302
***************************************************************************/
2304
pthread_mutex_t LOCK_xid_cache;
2307
extern "C" unsigned char *xid_get_hash_key(const unsigned char *, size_t *, bool);
2308
extern "C" void xid_free_hash(void *);
2310
unsigned char *xid_get_hash_key(const unsigned char *ptr, size_t *length,
2311
bool not_used __attribute__((unused)))
2313
*length=((XID_STATE*)ptr)->xid.key_length();
2314
return ((XID_STATE*)ptr)->xid.key();
2317
void xid_free_hash(void *ptr)
2319
if (!((XID_STATE*)ptr)->in_session)
2320
free((unsigned char*)ptr);
2323
bool xid_cache_init()
2325
pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST);
2326
return hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
2327
xid_get_hash_key, xid_free_hash, 0) != 0;
2330
void xid_cache_free()
2332
if (hash_inited(&xid_cache))
2334
hash_free(&xid_cache);
2335
pthread_mutex_destroy(&LOCK_xid_cache);
2339
XID_STATE *xid_cache_search(XID *xid)
2341
pthread_mutex_lock(&LOCK_xid_cache);
2342
XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, xid->key(), xid->key_length());
2343
pthread_mutex_unlock(&LOCK_xid_cache);
2348
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
2352
pthread_mutex_lock(&LOCK_xid_cache);
2353
if (hash_search(&xid_cache, xid->key(), xid->key_length()))
2355
else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
2359
xs->xa_state=xa_state;
2362
res=my_hash_insert(&xid_cache, (unsigned char*)xs);
2364
pthread_mutex_unlock(&LOCK_xid_cache);
2369
bool xid_cache_insert(XID_STATE *xid_state)
2371
pthread_mutex_lock(&LOCK_xid_cache);
2372
assert(hash_search(&xid_cache, xid_state->xid.key(),
2373
xid_state->xid.key_length())==0);
2374
bool res=my_hash_insert(&xid_cache, (unsigned char*)xid_state);
2375
pthread_mutex_unlock(&LOCK_xid_cache);
2380
void xid_cache_delete(XID_STATE *xid_state)
2382
pthread_mutex_lock(&LOCK_xid_cache);
2383
hash_delete(&xid_cache, (unsigned char *)xid_state);
2384
pthread_mutex_unlock(&LOCK_xid_cache);
2388
Implementation of interface to write rows to the binary log through the
2389
thread. The thread is responsible for writing the rows it has
2390
inserted/updated/deleted.
2395
Template member function for ensuring that there is an rows log
2396
event of the apropriate type before proceeding.
2399
- Events of type 'RowEventT' have the type code 'type_code'.
2402
If a non-NULL pointer is returned, the pending event for thread 'session' will
2403
be an event of type 'RowEventT' (which have the type code 'type_code')
2404
will either empty or have enough space to hold 'needed' bytes. In
2405
addition, the columns bitmap will be correct for the row, meaning that
2406
the pending event will be flushed if the columns in the event differ from
2407
the columns suppled to the function.
2410
If no error, a non-NULL pending event (either one which already existed or
2411
the newly created one).
2415
template <class RowsEventT> Rows_log_event*
2416
Session::binlog_prepare_pending_rows_event(Table* table, uint32_t serv_id,
2418
bool is_transactional,
2419
RowsEventT *hint __attribute__((unused)))
2421
/* Pre-conditions */
2422
assert(table->s->table_map_id != UINT32_MAX);
2424
/* Fetch the type code for the RowsEventT template parameter */
2425
int const type_code= RowsEventT::TYPE_CODE;
2428
There is no good place to set up the transactional data, so we
2431
if (binlog_setup_trx_data())
2434
Rows_log_event* pending= binlog_get_pending_rows_event();
2436
if (unlikely(pending && !pending->is_valid()))
2440
Check if the current event is non-NULL and a write-rows
2441
event. Also check if the table provided is mapped: if it is not,
2442
then we have switched to writing to a new table.
2443
If there is no pending event, we need to create one. If there is a pending
2444
event, but it's not about the same table id, or not of the same type
2445
(between Write, Update and Delete), or not the same affected columns, or
2446
going to be too big, flush this event to disk and create a new pending
2449
The last test is necessary for the Cluster injector to work
2450
correctly. The reason is that the Cluster can inject two write
2451
rows with different column bitmaps if there is an insert followed
2452
by an update in the same transaction, and these are grouped into a
2453
single epoch/transaction when fed to the injector.
2455
TODO: Fix the code so that the last test can be removed.
2458
pending->server_id != serv_id ||
2459
pending->get_table_id() != table->s->table_map_id ||
2460
pending->get_type_code() != type_code ||
2461
pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
2462
!bitmap_cmp(pending->get_cols(), table->write_set))
2464
/* Create a new RowsEventT... */
2465
Rows_log_event* const
2466
ev= new RowsEventT(this, table, table->s->table_map_id,
2470
ev->server_id= serv_id; // I don't like this, it's too easy to forget.
2472
flush the pending event and replace it with the newly created
2475
if (unlikely(drizzle_bin_log.flush_and_set_pending_rows_event(this, ev)))
2481
return(ev); /* This is the new pending event */
2483
return(pending); /* This is the current pending event */
2486
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
2488
Instantiate the versions we need, we have -fno-implicit-template as
2491
template Rows_log_event*
2492
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2493
Write_rows_log_event*);
2495
template Rows_log_event*
2496
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2497
Delete_rows_log_event *);
2499
template Rows_log_event*
2500
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2501
Update_rows_log_event *);
2506
Class to handle temporary allocation of memory for row data.
2508
The responsibilities of the class is to provide memory for
2509
packing one or two rows of packed data (depending on what
2510
constructor is called).
2512
In order to make the allocation more efficient for "simple" rows,
2513
i.e., rows that do not contain any blobs, a pointer to the
2514
allocated memory is of memory is stored in the table structure
2515
for simple rows. If memory for a table containing a blob field
2516
is requested, only memory for that is allocated, and subsequently
2517
released when the object is destroyed.
2520
class Row_data_memory {
2523
Build an object to keep track of a block-local piece of memory
2524
for storing a row of data.
2527
Table where the pre-allocated memory is stored.
2530
Length of data that is needed, if the record contain blobs.
2532
Row_data_memory(Table *table, size_t const len1)
2535
m_alloc_checked= false;
2536
allocate_memory(table, len1);
2537
m_ptr[0]= has_memory() ? m_memory : 0;
2541
Row_data_memory(Table *table, size_t const len1, size_t const len2)
2544
m_alloc_checked= false;
2545
allocate_memory(table, len1 + len2);
2546
m_ptr[0]= has_memory() ? m_memory : 0;
2547
m_ptr[1]= has_memory() ? m_memory + len1 : 0;
2552
if (m_memory != 0 && m_release_memory_on_destruction)
2553
free((unsigned char*) m_memory);
2557
Is there memory allocated?
2559
@retval true There is memory allocated
2560
@retval false Memory allocation failed
2562
bool has_memory() const {
2563
m_alloc_checked= true;
2564
return m_memory != 0;
2567
unsigned char *slot(uint32_t s)
2569
assert(s < sizeof(m_ptr)/sizeof(*m_ptr));
2570
assert(m_ptr[s] != 0);
2571
assert(m_alloc_checked == true);
2576
void allocate_memory(Table *const table, size_t const total_length)
2578
if (table->s->blob_fields == 0)
2581
The maximum length of a packed record is less than this
2582
length. We use this value instead of the supplied length
2583
when allocating memory for records, since we don't know how
2584
the memory will be used in future allocations.
2586
Since table->s->reclength is for unpacked records, we have
2587
to add two bytes for each field, which can potentially be
2588
added to hold the length of a packed field.
2590
size_t const maxlen= table->s->reclength + 2 * table->s->fields;
2593
Allocate memory for two records if memory hasn't been
2594
allocated. We allocate memory for two records so that it can
2595
be used when processing update rows as well.
2597
if (table->write_row_record == 0)
2598
table->write_row_record=
2599
(unsigned char *) alloc_root(&table->mem_root, 2 * maxlen);
2600
m_memory= table->write_row_record;
2601
m_release_memory_on_destruction= false;
2605
m_memory= (unsigned char *) my_malloc(total_length, MYF(MY_WME));
2606
m_release_memory_on_destruction= true;
2610
mutable bool m_alloc_checked;
2611
bool m_release_memory_on_destruction;
2612
unsigned char *m_memory;
2613
unsigned char *m_ptr[2];
2618
int Session::binlog_write_row(Table* table, bool is_trans,
2619
unsigned char const *record)
2621
assert(drizzle_bin_log.is_open());
2624
Pack records into format for transfer. We are allocating more
2625
memory than needed, but that doesn't matter.
2627
Row_data_memory memory(table, table->max_row_length(record));
2628
if (!memory.has_memory())
2629
return HA_ERR_OUT_OF_MEM;
2631
unsigned char *row_data= memory.slot(0);
2633
size_t const len= pack_row(table, table->write_set, row_data, record);
2635
Rows_log_event* const ev=
2636
binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
2637
static_cast<Write_rows_log_event*>(0));
2639
if (unlikely(ev == 0))
2640
return HA_ERR_OUT_OF_MEM;
2642
return ev->add_row_data(row_data, len);
2645
int Session::binlog_update_row(Table* table, bool is_trans,
2646
const unsigned char *before_record,
2647
const unsigned char *after_record)
2649
assert(drizzle_bin_log.is_open());
2651
size_t const before_maxlen = table->max_row_length(before_record);
2652
size_t const after_maxlen = table->max_row_length(after_record);
2654
Row_data_memory row_data(table, before_maxlen, after_maxlen);
2655
if (!row_data.has_memory())
2656
return HA_ERR_OUT_OF_MEM;
2658
unsigned char *before_row= row_data.slot(0);
2659
unsigned char *after_row= row_data.slot(1);
2661
size_t const before_size= pack_row(table, table->read_set, before_row,
2663
size_t const after_size= pack_row(table, table->write_set, after_row,
2666
Rows_log_event* const ev=
2667
binlog_prepare_pending_rows_event(table, server_id,
2668
before_size + after_size, is_trans,
2669
static_cast<Update_rows_log_event*>(0));
2671
if (unlikely(ev == 0))
2672
return HA_ERR_OUT_OF_MEM;
2675
ev->add_row_data(before_row, before_size) ||
2676
ev->add_row_data(after_row, after_size);
2679
int Session::binlog_delete_row(Table* table, bool is_trans,
2680
unsigned char const *record)
2682
assert(drizzle_bin_log.is_open());
2685
Pack records into format for transfer. We are allocating more
2686
memory than needed, but that doesn't matter.
2688
Row_data_memory memory(table, table->max_row_length(record));
2689
if (unlikely(!memory.has_memory()))
2690
return HA_ERR_OUT_OF_MEM;
2692
unsigned char *row_data= memory.slot(0);
2694
size_t const len= pack_row(table, table->read_set, row_data, record);
2696
Rows_log_event* const ev=
2697
binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
2698
static_cast<Delete_rows_log_event*>(0));
2700
if (unlikely(ev == 0))
2701
return HA_ERR_OUT_OF_MEM;
2703
return ev->add_row_data(row_data, len);
2707
int Session::binlog_flush_pending_rows_event(bool stmt_end)
2710
We shall flush the pending event even if we are not in row-based
2711
mode: it might be the case that we left row-based mode before
2712
flushing anything (e.g., if we have explicitly locked tables).
2714
if (!drizzle_bin_log.is_open())
2718
Mark the event as the last event of a statement if the stmt_end
2722
if (Rows_log_event *pending= binlog_get_pending_rows_event())
2726
pending->set_flags(Rows_log_event::STMT_END_F);
2727
pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
2728
binlog_table_maps= 0;
2731
error= drizzle_bin_log.flush_and_set_pending_rows_event(this, 0);
2739
Member function that will log query, either row-based or
2740
statement-based depending on the value of the 'current_stmt_binlog_row_based'
2741
the value of the 'qtype' flag.
2743
This function should be called after the all calls to ha_*_row()
2744
functions have been issued, but before tables are unlocked and
2748
There shall be no writes to any system table after calling
2749
binlog_query(), so these writes has to be moved to before the call
2750
of binlog_query() for correct functioning.
2752
This is necessesary not only for RBR, but the master might crash
2753
after binlogging the query but before changing the system tables.
2754
This means that the slave and the master are not in the same state
2755
(after the master has restarted), so therefore we have to
2756
eliminate this problem.
2759
Error code, or 0 if no error.
2761
int Session::binlog_query(Session::enum_binlog_query_type qtype, char const *query_arg,
2762
ulong query_len, bool is_trans, bool suppress_use,
2763
Session::killed_state killed_status_arg)
2765
assert(query_arg && drizzle_bin_log.is_open());
2767
if (int error= binlog_flush_pending_rows_event(true))
2771
case Session::ROW_QUERY_TYPE:
2773
case Session::DRIZZLE_QUERY_TYPE:
2775
Using this query type is a conveniece hack, since we have been
2776
moving back and forth between using RBR for replication of
2777
system tables and not using it.
2779
Make sure to change in check_table_binlog_row_based() according
2780
to how you treat this.
2782
case Session::STMT_QUERY_TYPE:
2784
The DRIZZLE_LOG::write() function will set the STMT_END_F flag and
2785
flush the pending rows event if necessary.
2788
Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use,
2790
qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
2792
Binlog table maps will be irrelevant after a Query_log_event
2793
(they are just removed on the slave side) so after the query
2794
log event is written to the binary log, we pretend that no
2795
table maps were written.
2797
int error= drizzle_bin_log.write(&qinfo);
2798
binlog_table_maps= 0;
2803
case Session::QUERY_TYPE_COUNT:
2805
assert(0 <= qtype && qtype < QUERY_TYPE_COUNT);
2810
bool Discrete_intervals_list::append(uint64_t start, uint64_t val,
2813
/* first, see if this can be merged with previous */
2814
if ((head == NULL) || tail->merge_if_contiguous(start, val, incr))
2816
/* it cannot, so need to add a new interval */
2817
Discrete_interval *new_interval= new Discrete_interval(start, val, incr);
2818
return(append(new_interval));
2823
bool Discrete_intervals_list::append(Discrete_interval *new_interval)
2825
if (unlikely(new_interval == NULL))
2828
head= current= new_interval;
2830
tail->next= new_interval;
2839
@param session Thread handle
2840
@param errcode Error code to print to console
2841
@param lock 1 if we have have to lock LOCK_thread_count
2844
For the connection that is doing shutdown, this is called twice
2846
void close_connection(Session *session, uint32_t errcode, bool lock)
2850
(void) pthread_mutex_lock(&LOCK_thread_count);
2851
session->killed= Session::KILL_CONNECTION;
2852
if ((vio= session->net.vio) != 0)
1644
void Session::disconnect(uint32_t errcode, bool should_lock)
1646
/* Allow any plugins to cleanup their session variables */
1647
plugin_sessionvar_cleanup(this);
1649
/* If necessary, log any aborted or unauthorized connections */
1650
if (getKilled() || client->wasAborted())
1652
status_var.aborted_threads++;
1655
if (client->wasAborted())
1657
if (not getKilled() && variables.log_warnings > 1)
1659
SecurityContext *sctx= &security_ctx;
1661
errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1663
, (_schema->empty() ? "unconnected" : _schema->c_str())
1664
, sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1665
, sctx->getIp().c_str()
1666
, (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1670
/* Close out our connection to the client */
1672
session::Cache::singleton().mutex().lock();
1674
setKilled(Session::KILL_CONNECTION);
1676
if (client->isConnected())
2855
net_send_error(session, errcode, ER(errcode)); /* purecov: inspected */
2856
net_close(&(session->net)); /* vio is freed in delete session */
1680
/*my_error(errcode, ER(errcode));*/
1681
client->sendError(errcode, ER(errcode));
1688
session::Cache::singleton().mutex().unlock();
1692
void Session::reset_for_next_command()
1697
Those two lines below are theoretically unneeded as
1698
Session::cleanup_after_query() should take care of this already.
1700
auto_inc_intervals_in_cur_stmt_for_binlog.empty();
1702
is_fatal_error= false;
1703
server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1704
SERVER_QUERY_NO_INDEX_USED |
1705
SERVER_QUERY_NO_GOOD_INDEX_USED);
1708
main_da.reset_diagnostics_area();
1709
total_warn_count=0; // Warnings for this query
1710
sent_row_count= examined_row_count= 0;
1714
Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1717
void Open_tables_state::close_temporary_tables()
1722
if (not temporary_tables)
1725
for (table= temporary_tables; table; table= tmp_next)
1727
tmp_next= table->getNext();
1730
temporary_tables= NULL;
1734
unlink from session->temporary tables and close temporary table
1737
void Open_tables_state::close_temporary_table(Table *table)
1739
if (table->getPrev())
1741
table->getPrev()->setNext(table->getNext());
1742
if (table->getPrev()->getNext())
1744
table->getNext()->setPrev(table->getPrev());
1749
/* removing the item from the list */
1750
assert(table == temporary_tables);
1752
slave must reset its temporary list pointer to zero to exclude
1753
passing non-zero value to end_slave via rli->save_temporary_tables
1754
when no temp tables opened, see an invariant below.
1756
temporary_tables= table->getNext();
1757
if (temporary_tables)
1759
table->getNext()->setPrev(NULL);
1766
Close and drop a temporary table
1769
This dosn't unlink table from session->temporary
1770
If this is needed, use close_temporary_table()
1773
void Open_tables_state::nukeTable(Table *table)
1775
plugin::StorageEngine *table_type= table->getShare()->db_type();
1777
table->free_io_cache();
1778
table->delete_table();
1780
TableIdentifier identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1781
rm_temporary_table(table_type, identifier);
1783
delete table->getMutableShare();
1785
/* This makes me sad, but we're allocating it via malloc */
1789
/** Clear most status variables. */
1790
extern time_t flush_status_time;
1792
void Session::refresh_status()
1794
/* Reset thread's status variables */
1795
memset(&status_var, 0, sizeof(status_var));
1797
flush_status_time= time((time_t*) 0);
1798
current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1799
current_global_counters.connections= 0;
1802
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1804
return getVariable(std::string(name.str, name.length), create_if_not_exists);
1807
user_var_entry *Session::getVariable(const std::string &name, bool create_if_not_exists)
1809
UserVarsRange ppp= user_vars.equal_range(name);
1811
for (UserVars::iterator iter= ppp.first;
1812
iter != ppp.second; ++iter)
1814
return (*iter).second;
1817
if (not create_if_not_exists)
1820
user_var_entry *entry= NULL;
1821
entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1826
std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1828
if (not returnable.second)
1836
void Session::setVariable(const std::string &name, const std::string &value)
1838
user_var_entry *updateable_var= getVariable(name.c_str(), true);
1840
updateable_var->update_hash(false,
1841
(void*)value.c_str(),
1842
static_cast<uint32_t>(value.length()), STRING_RESULT,
1844
DERIVATION_IMPLICIT, false);
1847
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1849
for (Table *table= temporary_tables ; table ; table= table->getNext())
1851
if (table->query_id == getQueryId())
1854
table->cursor->ha_reset();
1859
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1861
for (; table ; table= table->getNext())
1863
if (table->query_id == getQueryId())
1866
table->cursor->ha_reset();
1872
Unlocks tables and frees derived tables.
1873
Put all normal tables used by thread in free list.
1875
It will only close/mark as free for reuse tables opened by this
1876
substatement, it will also check if we are closing tables after
1877
execution of complete query (i.e. we are on upper level) and will
1878
leave prelocked mode if needed.
1880
void Session::close_thread_tables()
1882
clearDerivedTables();
1885
Mark all temporary tables used by this statement as free for reuse.
1887
mark_temp_tables_as_free_for_reuse();
1889
Let us commit transaction for statement. Since in 5.0 we only have
1890
one statement transaction and don't allow several nested statement
1891
transactions this call will do nothing if we are inside of stored
1892
function or trigger (i.e. statement transaction is already active and
1893
does not belong to statement for which we do close_thread_tables()).
1894
TODO: This should be fixed in later releases.
1897
TransactionServices &transaction_services= TransactionServices::singleton();
1898
main_da.can_overwrite_status= true;
1899
transaction_services.autocommitOrRollback(this, is_error());
1900
main_da.can_overwrite_status= false;
1901
transaction.stmt.reset();
2859
(void) pthread_mutex_unlock(&LOCK_thread_count);
1907
For RBR we flush the pending event just before we unlock all the
1908
tables. This means that we are at the end of a topmost
1909
statement, so we ensure that the STMT_END_F flag is set on the
1910
pending event. For statements that are *inside* stored
1911
functions, the pending event will not be flushed: that will be
1912
handled either before writing a query log event (inside
1913
binlog_query()) or when preparing a pending event.
1919
Note that we need to hold table::Cache::singleton().mutex() while changing the
1920
open_tables list. Another thread may work on it.
1921
(See: table::Cache::singleton().removeTable(), mysql_wait_completed_table())
1922
Closing a MERGE child before the parent would be fatal if the
1923
other thread tries to abort the MERGE lock in between.
1926
close_open_tables();
1929
void Session::close_tables_for_reopen(TableList **tables)
1932
If table list consists only from tables from prelocking set, table list
1933
for new attempt should be empty, so we have to update list's root pointer.
1935
if (lex->first_not_own_table() == *tables)
1937
lex->chop_off_not_own_tables();
1938
for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1940
close_thread_tables();
1943
bool Session::openTablesLock(TableList *tables)
1950
if (open_tables_from_list(&tables, &counter))
1953
if (not lock_tables(tables, counter, &need_reopen))
1955
if (not need_reopen)
1957
close_tables_for_reopen(&tables);
1959
if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
1961
mysql_handle_derived(lex, &mysql_derived_filling))))
1968
@note "best_effort" is used in cases were if a failure occurred on this
1969
operation it would not be surprising because we are only removing because there
1970
might be an issue (lame engines).
1973
bool Open_tables_state::rm_temporary_table(const TableIdentifier &identifier, bool best_effort)
1975
if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
1977
if (not best_effort)
1980
identifier.getSQLPath(path);
1981
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1982
path.c_str(), errno);
1991
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const TableIdentifier &identifier)
1995
if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier))
1998
identifier.getSQLPath(path);
1999
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
2000
path.c_str(), errno);
2009
@note this will be removed, I am looking through Hudson to see if it is finding
2010
any tables that are missed during cleanup.
2012
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
2016
if (not temporary_tables)
2019
cerr << "Begin Run: " << foo << "\n";
2020
for (table= temporary_tables; table; table= table->getNext())
2022
bool have_proto= false;
2024
message::Table *proto= table->getShare()->getTableProto();
2025
if (table->getShare()->getTableProto())
2028
const char *answer= have_proto ? "true" : "false";
2032
cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2033
cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2037
cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2042
bool Session::TableMessages::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2044
table_message_cache.insert(make_pair(identifier.getPath(), table_message));
2049
bool Session::TableMessages::removeTableMessage(const TableIdentifier &identifier)
2051
TableMessageCache::iterator iter;
2053
iter= table_message_cache.find(identifier.getPath());
2055
if (iter == table_message_cache.end())
2058
table_message_cache.erase(iter);
2063
bool Session::TableMessages::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2065
TableMessageCache::iterator iter;
2067
iter= table_message_cache.find(identifier.getPath());
2069
if (iter == table_message_cache.end())
2072
table_message.CopyFrom(((*iter).second));
2077
bool Session::TableMessages::doesTableMessageExist(const TableIdentifier &identifier)
2079
TableMessageCache::iterator iter;
2081
iter= table_message_cache.find(identifier.getPath());
2083
if (iter == table_message_cache.end())
2091
bool Session::TableMessages::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
2093
TableMessageCache::iterator iter;
2095
table_message_cache[to.getPath()]= table_message_cache[from.getPath()];
2097
iter= table_message_cache.find(to.getPath());
2099
if (iter == table_message_cache.end())
2104
(*iter).second.set_schema(to.getSchemaName());
2105
(*iter).second.set_name(to.getTableName());
2110
table::Instance *Session::getInstanceTable()
2112
temporary_shares.push_back(new table::Instance()); // This will not go into the tableshare cache, so no key is used.
2114
table::Instance *tmp_share= temporary_shares.back();
2123
Create a reduced Table object with properly set up Field list from a
2124
list of field definitions.
2126
The created table doesn't have a table Cursor associated with
2127
it, has no keys, no group/distinct, no copy_funcs array.
2128
The sole purpose of this Table object is to use the power of Field
2129
class to read/write data to/from table->getInsertRecord(). Then one can store
2130
the record in any container (RB tree, hash, etc).
2131
The table is created in Session mem_root, so are the table's fields.
2132
Consequently, if you don't BLOB fields, you don't need to free it.
2134
@param session connection handle
2135
@param field_list list of column definitions
2138
0 if out of memory, Table object in case of success
2140
table::Instance *Session::getInstanceTable(List<CreateField> &field_list)
2142
temporary_shares.push_back(new table::Instance(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2144
table::Instance *tmp_share= temporary_shares.back();
2153
static const std::string NONE= "NONE";
2154
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2155
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2157
const std::string &type(drizzled::Session::global_read_lock_t type)
2163
case Session::GOT_GLOBAL_READ_LOCK:
2164
return GOT_GLOBAL_READ_LOCK;
2165
case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2166
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2170
size_t max_string_length(drizzled::Session::global_read_lock_t)
2172
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2175
} /* namespace display */
2177
} /* namespace drizzled */