17
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21
* @file Implementation of the Session class and API
25
#include "drizzled/session.h"
26
#include "drizzled/session/cache.h"
21
/*****************************************************************************
23
** This file implements classes defined in session.h
24
** Especially the classes to handle a result from a select
26
*****************************************************************************/
27
#include <drizzled/server_includes.h>
28
#include <drizzled/session.h>
27
29
#include <sys/stat.h>
28
#include "drizzled/error.h"
29
#include "drizzled/gettext.h"
30
#include "drizzled/query_id.h"
31
#include "drizzled/data_home.h"
32
#include "drizzled/sql_base.h"
33
#include "drizzled/lock.h"
34
#include "drizzled/item/cache.h"
35
#include "drizzled/item/float.h"
36
#include "drizzled/item/return_int.h"
37
#include "drizzled/item/empty_string.h"
38
#include "drizzled/show.h"
39
#include "drizzled/plugin/client.h"
40
#include "drizzled/plugin/scheduler.h"
41
#include "drizzled/plugin/authentication.h"
42
#include "drizzled/plugin/logging.h"
43
#include "drizzled/plugin/transactional_storage_engine.h"
44
#include "drizzled/plugin/query_rewrite.h"
45
#include "drizzled/probes.h"
46
#include "drizzled/table_proto.h"
47
#include "drizzled/db.h"
48
#include "drizzled/pthread_globals.h"
49
#include "drizzled/transaction_services.h"
50
#include "drizzled/drizzled.h"
52
#include "drizzled/table/instance.h"
54
#include "plugin/myisam/myisam.h"
55
#include "drizzled/internal/iocache.h"
56
#include "drizzled/internal/thread_var.h"
57
#include "drizzled/plugin/event_observer.h"
59
#include "drizzled/util/functors.h"
61
#include "drizzled/display.h"
66
#include <boost/filesystem.hpp>
68
#include "drizzled/util/backtrace.h"
72
namespace fs=boost::filesystem;
30
#include <mysys/mysys_err.h>
31
#include <drizzled/error.h>
32
#include <drizzled/query_id.h>
33
#include <drizzled/data_home.h>
34
#include <drizzled/sql_base.h>
35
#include <drizzled/lock.h>
36
#include <drizzled/item/cache.h>
37
#include <drizzled/item/float.h>
38
#include <drizzled/item/return_int.h>
39
#include <drizzled/item/empty_string.h>
40
#include <drizzled/show.h>
41
#include <drizzled/plugin_scheduling.h>
43
extern scheduling_st thread_scheduler;
77
45
The following is used to initialise Table_ident with a internal
81
49
char empty_c_string[1]= {0}; /* used for not defined db */
83
51
const char * const Session::DEFAULT_WHERE= "field list";
52
extern pthread_key_t THR_Session;
53
extern pthread_key_t THR_Mem_root;
56
/*****************************************************************************
57
** Instansiate templates
58
*****************************************************************************/
60
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
62
template class List<Key>;
63
template class List_iterator<Key>;
64
template class List<Key_part_spec>;
65
template class List_iterator<Key_part_spec>;
66
template class List<Alter_drop>;
67
template class List_iterator<Alter_drop>;
68
template class List<Alter_column>;
69
template class List_iterator<Alter_column>;
73
/****************************************************************************
75
****************************************************************************/
77
extern "C" unsigned char *get_var_key(user_var_entry *entry, size_t *length,
80
*length= entry->name.length;
81
return (unsigned char*) entry->name.str;
84
extern "C" void free_user_var(user_var_entry *entry)
86
char *pos= (char*) entry+ALIGN_SIZE(sizeof(*entry));
87
if (entry->value && entry->value != pos)
85
92
bool Key_part_spec::operator==(const Key_part_spec& other) const
87
94
return length == other.length &&
88
95
field_name.length == other.field_name.length &&
89
!my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
92
Open_tables_state::Open_tables_state(uint64_t version_arg) :
95
open_tables= temporary_tables= derived_tables= NULL;
96
extra_lock= lock= NULL;
96
!strcmp(field_name.str, other.field_name.str);
100
Construct an (almost) deep copy of this key. Only those
101
elements that are known to never change are not copied.
102
If out of memory, a partial copy is returned and an error is set
106
Key::Key(const Key &rhs, MEM_ROOT *mem_root)
108
key_create_info(rhs.key_create_info),
109
columns(rhs.columns, mem_root),
111
generated(rhs.generated)
113
list_copy_and_replace_each_value(columns, mem_root);
117
Construct an (almost) deep copy of this foreign key. Only those
118
elements that are known to never change are not copied.
119
If out of memory, a partial copy is returned and an error is set
123
Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root)
125
ref_table(rhs.ref_table),
126
ref_columns(rhs.ref_columns),
127
delete_opt(rhs.delete_opt),
128
update_opt(rhs.update_opt),
129
match_opt(rhs.match_opt)
131
list_copy_and_replace_each_value(ref_columns, mem_root);
135
Test if a foreign key (= generated key) is a prefix of the given key
136
(ignoring key name, key type and order of columns)
139
This is only used to test if an index for a FOREIGN KEY exists
142
We only compare field names
145
0 Generated key is a prefix of other key
149
bool foreign_key_prefix(Key *a, Key *b)
151
/* Ensure that 'a' is the generated key */
154
if (b->generated && a->columns.elements > b->columns.elements)
155
std::swap(a, b); // Put shorter key in 'a'
160
return true; // No foreign key
161
std::swap(a, b); // Put generated key in 'a'
164
/* Test if 'a' is a prefix of 'b' */
165
if (a->columns.elements > b->columns.elements)
166
return true; // Can't be prefix
168
List_iterator<Key_part_spec> col_it1(a->columns);
169
List_iterator<Key_part_spec> col_it2(b->columns);
170
const Key_part_spec *col1, *col2;
172
#ifdef ENABLE_WHEN_INNODB_CAN_HANDLE_SWAPED_FOREIGN_KEY_COLUMNS
173
while ((col1= col_it1++))
177
while ((col2= col_it2++))
186
return true; // Error
188
return false; // Is prefix
190
while ((col1= col_it1++))
193
if (!(*col1 == *col2))
196
return false; // Is prefix
202
Check if the foreign key options are compatible with columns
203
on which the FK is created.
209
bool Foreign_key::validate(List<Create_field> &table_fields)
211
Create_field *sql_field;
212
Key_part_spec *column;
213
List_iterator<Key_part_spec> cols(columns);
214
List_iterator<Create_field> it(table_fields);
215
while ((column= cols++))
218
while ((sql_field= it++) &&
219
my_strcasecmp(system_charset_info,
220
column->field_name.str,
221
sql_field->field_name)) {}
224
my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name.str);
227
if (type == Key::FOREIGN_KEY && sql_field->vcol_info)
229
if (delete_opt == FK_OPTION_SET_NULL)
231
my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0),
232
"ON DELETE SET NULL");
235
if (update_opt == FK_OPTION_SET_NULL)
237
my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0),
238
"ON UPDATE SET NULL");
241
if (update_opt == FK_OPTION_CASCADE)
243
my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0),
244
"ON UPDATE CASCADE");
253
/****************************************************************************
254
** Thread specific functions
255
****************************************************************************/
257
Open_tables_state::Open_tables_state(ulong version_arg)
258
:version(version_arg), state_flags(0U)
260
reset_open_tables_state();
100
264
The following functions form part of the C plugin API
102
int mysql_tmpfile(const char *prefix)
267
extern "C" int mysql_tmpfile(const char *prefix)
104
269
char filename[FN_REFLEN];
105
int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
270
File fd = create_temp_file(filename, drizzle_tmpdir, prefix,
271
O_CREAT | O_EXCL | O_RDWR,
107
274
unlink(filename);
124
301
@see Session::set_proc_info
126
void set_session_proc_info(Session *session, const char *info)
304
set_session_proc_info(Session *session, const char *info)
128
306
session->set_proc_info(info);
131
310
const char *get_session_proc_info(Session *session)
133
312
return session->get_proc_info();
136
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
138
return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
141
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
144
return &ha_data[monitored->getId()].resource_context[index];
316
void **session_ha_data(const Session *session, const struct handlerton *hton)
318
return (void **) &session->ha_data[hton->slot].ha_ptr;
147
322
int64_t session_test_options(const Session *session, int64_t test_options)
149
324
return session->options & test_options;
152
328
int session_sql_command(const Session *session)
154
330
return (int) session->lex->sql_command;
157
enum_tx_isolation session_tx_isolation(const Session *session)
159
return (enum_tx_isolation)session->variables.tx_isolation;
162
Session::Session(plugin::Client *client_arg) :
163
Open_tables_state(refresh_version),
164
mem_root(&main_mem_root),
167
query(new std::string),
168
_schema(new std::string("")),
173
lock_id(&main_lock_id),
175
ha_data(plugin::num_trx_monitored_objects),
176
concurrent_execute_allowed(true),
177
arg_of_last_insert_id_function(false),
178
first_successful_insert_id_in_prev_stmt(0),
179
first_successful_insert_id_in_cur_stmt(0),
181
_global_read_lock(NONE),
183
some_tables_deleted(false),
186
is_fatal_error(false),
187
transaction_rollback_request(false),
188
is_fatal_sub_stmt_error(0),
189
derived_tables_processing(false),
190
tablespace_op(false),
193
transaction_message(NULL),
194
statement_message(NULL),
195
session_event_observers(NULL),
198
client->setSession(this);
334
int session_tx_isolation(const Session *session)
336
return (int) session->variables.tx_isolation;
340
void session_inc_row_count(Session *session)
342
session->row_count++;
346
Clear this diagnostics area.
348
Normally called at the end of a statement.
352
Diagnostics_area::reset_diagnostics_area()
354
can_overwrite_status= false;
355
/** Don't take chances in production */
361
m_total_warn_count= 0;
363
/** Tiny reset in debug mode to see garbage right away */
369
Set OK status -- ends commands that do not return a
370
result set, e.g. INSERT/UPDATE/DELETE.
374
Diagnostics_area::set_ok_status(Session *session, ha_rows affected_rows_arg,
375
uint64_t last_insert_id_arg,
376
const char *message_arg)
380
In production, refuse to overwrite an error or a custom response
383
if (is_error() || is_disabled())
385
/** Only allowed to report success if has not yet reported an error */
387
m_server_status= session->server_status;
388
m_total_warn_count= session->total_warn_count;
389
m_affected_rows= affected_rows_arg;
390
m_last_insert_id= last_insert_id_arg;
392
strncpy(m_message, message_arg, sizeof(m_message) - 1);
404
Diagnostics_area::set_eof_status(Session *session)
406
/** Only allowed to report eof if has not yet reported an error */
410
In production, refuse to overwrite an error or a custom response
413
if (is_error() || is_disabled())
416
m_server_status= session->server_status;
418
If inside a stored procedure, do not return the total
419
number of warnings, since they are not available to the client
422
m_total_warn_count= session->total_warn_count;
432
Diagnostics_area::set_error_status(Session *,
433
uint32_t sql_errno_arg,
434
const char *message_arg)
437
Only allowed to report error if has not yet reported a success
438
The only exception is when we flush the message to the client,
439
an error can happen during the flush.
441
assert(! is_set() || can_overwrite_status);
443
In production, refuse to overwrite a custom response with an
449
m_sql_errno= sql_errno_arg;
450
strncpy(m_message, message_arg, sizeof(m_message) - 1);
457
Mark the diagnostics area as 'DISABLED'.
459
This is used in rare cases when the COM_ command at hand sends a response
460
in a custom format. One example is the query cache, another is
465
Diagnostics_area::disable_status()
468
m_status= DA_DISABLED;
473
:Statement(&main_lex, &main_mem_root,
474
/* statement id */ 0),
475
Open_tables_state(refresh_version),
476
lock_id(&main_lock_id),
478
arg_of_last_insert_id_function(false),
479
first_successful_insert_id_in_prev_stmt(0),
480
first_successful_insert_id_in_cur_stmt(0),
483
transaction_rollback_request(0),
484
is_fatal_sub_stmt_error(0),
486
derived_tables_processing(false),
201
493
Pass nominal parameters to init_alloc_root only to ensure that
202
494
the destructor works OK in case of an error. The main_mem_root
203
495
will be re-initialized in init_for_queries().
205
memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
207
count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
497
init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
499
catalog= (char*)"std"; // the only catalog we have for now
500
some_tables_deleted=no_errors=password= 0;
501
count_cuted_fields= CHECK_FIELD_IGNORE;
504
thread_specific_used= false;
505
hash_clear(&handler_tables_hash);
211
508
cuted_fields= sent_row_count= row_count= 0L;
212
510
row_count_func= -1;
213
511
statement_id_counter= 0UL;
214
512
// Must be reset to handle error with Session's created for init of mysqld
225
scoreboard_index= -1;
522
db_charset= global_system_variables.collation_database;
523
memset(ha_data, 0, sizeof(ha_data));
226
526
dbug_sentry=Session_SENTRY_MAGIC;
227
cleanup_done= abort_on_warning= no_warnings_for_error= false;
229
/* query_cache init */
528
client_capabilities= 0; // minimalistic client
529
system_thread= NON_SYSTEM_THREAD;
530
cleanup_done= abort_on_warning= no_warnings_for_error= 0;
531
peer_port= 0; // For SHOW PROCESSLIST
533
pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
233
535
/* Variables with default values */
234
536
proc_info="login";
235
537
where= Session::DEFAULT_WHERE;
236
command= COM_CONNECT;
238
plugin_sessionvar_init(this);
240
variables= global_system_variables above has reset
241
variables.pseudo_thread_id to 0. We need to correct it here to
242
avoid temporary tables replication failure.
244
variables.pseudo_thread_id= thread_id;
245
server_status= SERVER_STATUS_AUTOCOMMIT;
246
options= session_startup_options;
248
if (variables.max_join_size == HA_POS_ERROR)
249
options |= OPTION_BIG_SELECTS;
251
options &= ~OPTION_BIG_SELECTS;
253
open_options=ha_open_options;
254
update_lock_default= TL_WRITE;
255
session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
257
memset(warn_count, 0, sizeof(warn_count));
259
memset(&status_var, 0, sizeof(status_var));
538
server_id = ::server_id;
261
543
/* Initialize sub structures */
262
memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
544
init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
545
user_connect=(USER_CONN *)0;
546
hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
547
(hash_get_key) get_var_key,
548
(hash_free_key) free_user_var, 0);
551
protocol= &protocol_text; // Default protocol
552
protocol_text.init(this);
554
const Query_id& local_query_id= Query_id::get_query_id();
555
tablespace_op= false;
557
drizzleclient_randominit(&rand, tmp + (uint64_t) &rand,
558
tmp + (uint64_t)local_query_id.value());
264
559
substitute_null_with_insert_id = false;
265
lock_info.init(); /* safety: will be reset after start */
560
thr_lock_info_init(&lock_info); /* safety: will be reset after start */
266
561
thr_lock_owner_init(&main_lock_id, &lock_info);
268
563
m_internal_handler= NULL;
270
plugin::EventObserver::registerSessionEvents(*this);
273
void Session::free_items()
276
/* This works because items are allocated with memory::sql_alloc() */
277
for (; free_list; free_list= next)
279
next= free_list->next;
280
free_list->delete_self();
284
567
void Session::push_internal_handler(Internal_error_handler *handler)
328
593
m_internal_handler= NULL;
331
void Session::get_xid(DRIZZLE_XID *xid)
333
*xid = *(DRIZZLE_XID *) &transaction.xid_state.xid;
596
#if defined(__cplusplus)
600
void *session_alloc(Session *session, unsigned int size)
602
return session->alloc(size);
605
void *session_calloc(Session *session, unsigned int size)
607
return session->calloc(size);
610
char *session_strdup(Session *session, const char *str)
612
return session->strdup(str);
615
char *session_strmake(Session *session, const char *str, unsigned int size)
617
return session->strmake(str, size);
620
void *session_memdup(Session *session, const void* str, unsigned int size)
622
return session->memdup(str, size);
625
void session_get_xid(const Session *session, DRIZZLE_XID *xid)
627
*xid = *(DRIZZLE_XID *) &session->transaction.xid_state.xid;
630
#if defined(__cplusplus)
635
Init common variables that has to be reset on start and on change_user
638
void Session::init(void)
640
pthread_mutex_lock(&LOCK_global_system_variables);
641
plugin_sessionvar_init(this);
643
variables= global_system_variables above has reset
644
variables.pseudo_thread_id to 0. We need to correct it here to
645
avoid temporary tables replication failure.
647
variables.pseudo_thread_id= thread_id;
648
pthread_mutex_unlock(&LOCK_global_system_variables);
649
server_status= SERVER_STATUS_AUTOCOMMIT;
650
options= session_startup_options;
652
if (variables.max_join_size == HA_POS_ERROR)
653
options |= OPTION_BIG_SELECTS;
655
options &= ~OPTION_BIG_SELECTS;
657
transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= false;
658
open_options=ha_open_options;
659
update_lock_default= TL_WRITE;
660
session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
662
memset(warn_count, 0, sizeof(warn_count));
665
memset(&status_var, 0, sizeof(status_var));
670
Init Session for query processing.
671
This has to be called once before we call mysql_parse.
672
See also comments in session.h.
675
void Session::init_for_queries()
678
ha_enable_transaction(this,true);
680
reset_root_defaults(mem_root, variables.query_alloc_block_size,
681
variables.query_prealloc_size);
682
reset_root_defaults(&transaction.mem_root,
683
variables.trans_alloc_block_size,
684
variables.trans_prealloc_size);
685
transaction.xid_state.xid.null();
686
transaction.xid_state.in_session=1;
336
690
/* Do operations that may take a long time */
338
692
void Session::cleanup(void)
340
assert(cleanup_done == false);
694
assert(cleanup_done == 0);
342
setKilled(KILL_CONNECTION);
696
killed= KILL_CONNECTION;
343
697
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
344
698
if (transaction.xid_state.xa_state == XA_PREPARED)
350
TransactionServices &transaction_services= TransactionServices::singleton();
351
transaction_services.rollbackTransaction(this, true);
352
705
xid_cache_delete(&transaction.xid_state);
355
for (UserVars::iterator iter= user_vars.begin();
356
iter != user_vars.end();
359
user_var_entry *entry= (*iter).second;
709
lock=locked_tables; locked_tables=0;
710
close_thread_tables(this);
712
mysql_ha_cleanup(this);
713
hash_free(&user_vars);
365
714
close_temporary_tables();
367
716
if (global_read_lock)
369
unlockGlobalReadLock();
717
unlock_global_read_lock(this);
375
723
Session::~Session()
379
if (client->isConnected())
381
if (global_system_variables.log_warnings)
382
errmsg_printf(ERRMSG_LVL_WARN, ER(ER_FORCING_CLOSE),internal::my_progname,
384
(getSecurityContext().getUser().c_str() ?
385
getSecurityContext().getUser().c_str() : ""));
386
disconnect(0, false);
725
Session_CHECK_SENTRY(this);
726
/* Ensure that no one is using Session */
727
pthread_mutex_lock(&LOCK_delete);
728
pthread_mutex_unlock(&LOCK_delete);
729
add_to_status(&global_status_var, &status_var);
389
731
/* Close connection */
393
if (cleanup_done == false)
734
drizzleclient_net_close(&net);
735
drizzleclient_net_end(&net);
396
plugin::StorageEngine::closeConnection(this);
740
ha_close_connection(this);
397
741
plugin_sessionvar_cleanup(this);
399
warn_root.free_root(MYF(0));
748
free_root(&warn_root,MYF(0));
749
free_root(&transaction.mem_root,MYF(0));
400
750
mysys_var=0; // Safety (shouldn't be needed)
751
pthread_mutex_destroy(&LOCK_delete);
401
752
dbug_sentry= Session_SENTRY_GONE;
403
main_mem_root.free_root(MYF(0));
404
currentMemRoot().release();
405
currentSession().release();
407
plugin::Logging::postEndDo(this);
408
plugin::EventObserver::deregisterSessionEvents(*this);
410
for (PropertyMap::iterator iter= life_properties.begin(); iter != life_properties.end(); iter++)
412
delete (*iter).second;
414
life_properties.clear();
417
void Session::setClient(plugin::Client *client_arg)
420
client->setSession(this);
423
void Session::awake(Session::killed_state_t state_to_set)
425
if ((state_to_set == Session::KILL_QUERY) and (command == COM_SLEEP))
430
setKilled(state_to_set);
431
scheduler->killSession(this);
754
free_root(&main_mem_root, MYF(0));
755
pthread_setspecific(THR_Session, 0);
761
Add all status variables to another status variable array
765
to_var add to this array
766
from_var from this array
769
This function assumes that all variables are long/ulong.
770
If this assumption will change, then we have to explictely add
771
the other variables after the while loop
774
void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var)
776
ulong *end= (ulong*) ((unsigned char*) to_var +
777
offsetof(STATUS_VAR, last_system_status_var) +
779
ulong *to= (ulong*) to_var, *from= (ulong*) from_var;
786
Add the difference between two status variable arrays to another one.
790
to_var add to this array
791
from_var from this array
792
dec_var minus this array
795
This function assumes that all variables are long/ulong.
798
void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var,
801
ulong *end= (ulong*) ((unsigned char*) to_var + offsetof(STATUS_VAR,
802
last_system_status_var) +
804
ulong *to= (ulong*) to_var, *from= (ulong*) from_var, *dec= (ulong*) dec_var;
807
*(to++)+= *(from++) - *(dec++);
811
void Session::awake(Session::killed_state state_to_set)
813
Session_CHECK_SENTRY(this);
814
safe_mutex_assert_owner(&LOCK_delete);
816
killed= state_to_set;
433
817
if (state_to_set != Session::KILL_QUERY)
435
DRIZZLE_CONNECTION_DONE(thread_id);
819
thread_scheduler.post_kill_notification(this);
440
boost_unique_lock_t scopedLock(mysys_var->mutex);
823
pthread_mutex_lock(&mysys_var->mutex);
824
if (!system_thread) // Don't abort locks
443
827
This broadcast could be up in the air if the victim thread
444
828
exits the cond in the time between read and broadcast, but that is
445
829
ok since all we want to do is to make the victim thread get out
480
867
assert(thread_stack);
482
currentSession().release();
483
currentSession().reset(this);
485
currentMemRoot().release();
486
currentMemRoot().reset(&mem_root);
869
if (pthread_setspecific(THR_Session, this) ||
870
pthread_setspecific(THR_Mem_root, &mem_root))
488
872
mysys_var=my_thread_var;
491
874
Let mysqld define the thread id (not mysys)
492
875
This allows us to move Session to different threads if needed.
494
877
mysys_var->id= thread_id;
878
real_id= pthread_self(); // For debugging
497
881
We have to call thr_lock_info_init() again here as Session may have been
498
882
created in another thread
506
Init Session for query processing.
507
This has to be called once before we call mysql_parse.
508
See also comments in session.h.
511
void Session::prepareForQueries()
513
if (variables.max_join_size == HA_POS_ERROR)
514
options |= OPTION_BIG_SELECTS;
516
version= refresh_version;
521
mem_root->reset_root_defaults(variables.query_alloc_block_size,
522
variables.query_prealloc_size);
523
transaction.xid_state.xid.null();
524
transaction.xid_state.in_session=1;
529
bool Session::initGlobals()
533
disconnect(ER_OUT_OF_RESOURCES, true);
534
status_var.aborted_connects++;
542
if (initGlobals() || authenticate())
550
while (not client->haveError() && getKilled() != KILL_CONNECTION)
552
if (not executeStatement())
559
bool Session::schedule(Session::shared_ptr &arg)
561
arg->scheduler= plugin::Scheduler::getScheduler();
562
assert(arg->scheduler);
564
connection_count.increment();
566
if (connection_count > current_global_counters.max_used_connections)
568
current_global_counters.max_used_connections= connection_count;
571
current_global_counters.connections++;
572
arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
574
session::Cache::singleton().insert(arg);
576
if (unlikely(plugin::EventObserver::connectSession(*arg)))
578
// We should do something about an error...
581
if (plugin::Scheduler::getScheduler()->addSession(arg))
583
DRIZZLE_CONNECTION_START(arg->getSessionId());
584
char error_message_buff[DRIZZLE_ERRMSG_SIZE];
586
arg->setKilled(Session::KILL_CONNECTION);
588
arg->status_var.aborted_connects++;
590
/* Can't use my_error() since store_globals has not been called. */
591
/* TODO replace will better error message */
592
snprintf(error_message_buff, sizeof(error_message_buff),
593
ER(ER_CANT_CREATE_THREAD), 1);
594
arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
604
Is this session viewable by the current user?
606
bool Session::isViewable() const
608
return plugin::Authorization::isAuthorized(current_session->getSecurityContext(),
614
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
616
const char* old_msg = get_proc_info();
617
safe_mutex_assert_owner(mutex);
618
mysys_var->current_mutex = &mutex;
619
mysys_var->current_cond = &cond;
620
this->set_proc_info(msg);
624
void Session::exit_cond(const char* old_msg)
627
Putting the mutex unlock in exit_cond() ensures that
628
mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
629
locked (if that would not be the case, you'll get a deadlock if someone
630
does a Session::awake() on you).
632
mysys_var->current_mutex->unlock();
633
boost_unique_lock_t scopedLock(mysys_var->mutex);
634
mysys_var->current_mutex = 0;
635
mysys_var->current_cond = 0;
636
this->set_proc_info(old_msg);
639
bool Session::authenticate()
642
if (client->authenticate())
645
status_var.aborted_connects++;
650
bool Session::checkUser(const std::string &passwd_str,
651
const std::string &in_db)
653
bool is_authenticated=
654
plugin::Authentication::isAuthenticated(getSecurityContext(),
657
if (is_authenticated != true)
659
status_var.access_denied++;
660
/* isAuthenticated has pushed the error message */
664
/* Change database if necessary */
665
if (not in_db.empty())
667
SchemaIdentifier identifier(in_db);
668
if (mysql_change_db(this, identifier))
670
/* mysql_change_db() has pushed the error message. */
675
password= not passwd_str.empty();
677
/* Ready to handle queries */
681
bool Session::executeStatement()
684
uint32_t packet_length;
686
enum enum_server_command l_command;
689
indicator of uninitialized lex => normal flow of errors handling
692
lex->current_select= 0;
694
main_da.reset_diagnostics_area();
696
if (client->readCommand(&l_packet, &packet_length) == false)
701
if (getKilled() == KILL_CONNECTION)
704
if (packet_length == 0)
707
l_command= static_cast<enum_server_command>(l_packet[0]);
709
if (command >= COM_END)
710
command= COM_END; // Wrong command
712
assert(packet_length);
713
return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
716
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
718
/* Remove garbage at start and end of query */
719
while (in_packet_length > 0 && my_isspace(charset(), in_packet[0]))
724
const char *pos= in_packet + in_packet_length; /* Point at end null */
725
while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
731
std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
732
// We can not be entirely sure _schema has a value
735
plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
737
query.reset(new_query);
738
_state.reset(new State(in_packet, in_packet_length));
743
bool Session::endTransaction(enum enum_mysql_completiontype completion)
747
TransactionServices &transaction_services= TransactionServices::singleton();
749
if (transaction.xid_state.xa_state != XA_NOTR)
751
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
758
* We don't use endActiveTransaction() here to ensure that this works
759
* even if there is a problem with the OPTION_AUTO_COMMIT flag
760
* (Which of course should never happen...)
762
server_status&= ~SERVER_STATUS_IN_TRANS;
763
if (transaction_services.commitTransaction(this, true))
765
options&= ~(OPTION_BEGIN);
768
do_release= 1; /* fall through */
769
case COMMIT_AND_CHAIN:
770
result= endActiveTransaction();
771
if (result == true && completion == COMMIT_AND_CHAIN)
772
result= startTransaction();
774
case ROLLBACK_RELEASE:
775
do_release= 1; /* fall through */
777
case ROLLBACK_AND_CHAIN:
779
server_status&= ~SERVER_STATUS_IN_TRANS;
780
if (transaction_services.rollbackTransaction(this, true))
782
options&= ~(OPTION_BEGIN);
783
if (result == true && (completion == ROLLBACK_AND_CHAIN))
784
result= startTransaction();
788
my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
794
my_error(killed_errno(), MYF(0));
796
else if ((result == true) && do_release)
798
setKilled(Session::KILL_CONNECTION);
804
bool Session::endActiveTransaction()
807
TransactionServices &transaction_services= TransactionServices::singleton();
809
if (transaction.xid_state.xa_state != XA_NOTR)
811
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
814
if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
816
server_status&= ~SERVER_STATUS_IN_TRANS;
817
if (transaction_services.commitTransaction(this, true))
820
options&= ~(OPTION_BEGIN);
824
bool Session::startTransaction(start_transaction_option_t opt)
828
if (! endActiveTransaction())
834
options|= OPTION_BEGIN;
835
server_status|= SERVER_STATUS_IN_TRANS;
837
if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
884
thr_lock_info_init(&lock_info);
893
Session::cleanup_after_query()
896
This function is used to reset thread data to its default state.
899
This function is not suitable for setting thread data to some
900
non-default values, as there is only one replication thread, so
901
different master threads may overwrite data of each other on
846
905
void Session::cleanup_after_query()
884
939
@return NULL on failure, or pointer to the LEX_STRING object
886
941
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
887
const std::string &str,
888
bool allocate_lex_string)
890
return make_lex_string(lex_str, str.c_str(), str.length(), allocate_lex_string);
893
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
894
const char* str, uint32_t length,
895
bool allocate_lex_string)
942
const char* str, uint32_t length,
943
bool allocate_lex_string)
897
945
if (allocate_lex_string)
898
946
if (!(lex_str= (LEX_STRING *)alloc(sizeof(LEX_STRING))))
900
if (!(lex_str->str= mem_root->strmake_root(str, length)))
948
if (!(lex_str->str= strmake_root(mem_root, str, length)))
902
950
lex_str->length= length;
956
Convert a string to another character set
960
to Store new allocated string here
961
to_cs New character set for allocated string
962
from String to convert
963
from_length Length of string to convert
964
from_cs Original character set
967
to will be 0-terminated to make it easy to pass to system funcs
972
In this case to->str will point to 0 and to->length will be 0.
975
bool Session::convert_string(LEX_STRING *to, const CHARSET_INFO * const to_cs,
976
const char *from, uint32_t from_length,
977
const CHARSET_INFO * const from_cs)
979
size_t new_length= to_cs->mbmaxlen * from_length;
980
uint32_t dummy_errors;
981
if (!(to->str= (char*) alloc(new_length+1)))
983
to->length= 0; // Safety fix
986
to->length= copy_and_convert((char*) to->str, new_length, to_cs,
987
from, from_length, from_cs, &dummy_errors);
988
to->str[to->length]=0; // Safety
994
Convert string from source character set to target character set inplace.
997
Session::convert_string
1000
Convert string using convert_buffer - buffer for character set
1001
conversion shared between all protocols.
1008
bool Session::convert_string(String *s, const CHARSET_INFO * const from_cs,
1009
const CHARSET_INFO * const to_cs)
1011
uint32_t dummy_errors;
1012
if (convert_buffer.copy(s->ptr(), s->length(), from_cs, to_cs, &dummy_errors))
1014
/* If convert_buffer >> s copying is more efficient long term */
1015
if (convert_buffer.alloced_length() >= convert_buffer.length() * 2 ||
1018
return s->copy(convert_buffer);
1020
s->swap(convert_buffer);
1026
Update some cache variables when character set changes
1029
void Session::update_charset()
1032
charset_is_system_charset= !String::needs_conversion(0,charset(),
1033
system_charset_info,
1035
charset_is_collation_connection=
1036
!String::needs_conversion(0,charset(),variables.getCollation(),
1038
charset_is_character_set_filesystem=
1039
!String::needs_conversion(0, charset(),
1040
variables.character_set_filesystem, ¬_used);
1044
/* routings to adding tables to list of changed in transaction tables */
1046
inline static void list_include(CHANGED_TableList** prev,
1047
CHANGED_TableList* curr,
1048
CHANGED_TableList* new_table)
1053
(*prev)->next = curr;
1057
/* add table to list of changed in transaction tables */
1059
void Session::add_changed_table(Table *table)
1061
assert((options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
1062
table->file->has_transactions());
1063
add_changed_table(table->s->table_cache_key.str,
1064
(long) table->s->table_cache_key.length);
1069
void Session::add_changed_table(const char *key, long key_length)
1071
CHANGED_TableList **prev_changed = &transaction.changed_tables;
1072
CHANGED_TableList *curr = transaction.changed_tables;
1074
for (; curr; prev_changed = &(curr->next), curr = curr->next)
1076
int cmp = (long)curr->key_length - (long)key_length;
1079
list_include(prev_changed, curr, changed_table_dup(key, key_length));
1084
cmp = memcmp(curr->key, key, curr->key_length);
1087
list_include(prev_changed, curr, changed_table_dup(key, key_length));
1096
*prev_changed = changed_table_dup(key, key_length);
1101
CHANGED_TableList* Session::changed_table_dup(const char *key, long key_length)
1103
CHANGED_TableList* new_table =
1104
(CHANGED_TableList*) trans_alloc(ALIGN_SIZE(sizeof(CHANGED_TableList))+
1108
my_error(EE_OUTOFMEMORY, MYF(ME_BELL),
1109
ALIGN_SIZE(sizeof(TableList)) + key_length + 1);
1110
killed= KILL_CONNECTION;
1114
new_table->key= ((char*)new_table)+ ALIGN_SIZE(sizeof(CHANGED_TableList));
1115
new_table->next = 0;
1116
new_table->key_length = key_length;
1117
::memcpy(new_table->key, key, key_length);
906
1122
int Session::send_explain_fields(select_result *result)
908
1124
List<Item> field_list;
1038
static int create_file(Session *session,
1039
fs::path &target_path,
1040
file_exchange *exchange,
1041
internal::IO_CACHE *cache)
1309
static File create_file(Session *session, char *path, file_exchange *exchange, IO_CACHE *cache)
1043
fs::path to_file(exchange->file_name);
1046
if (not to_file.has_root_directory())
1312
uint32_t option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
1314
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
1315
option|= MY_REPLACE_DIR; // Force use of db directory
1318
if (!dirname_length(exchange->file_name))
1048
target_path= fs::system_complete(getDataHomeCatalog());
1049
util::string::const_shared_ptr schema(session->schema());
1050
if (schema and not schema->empty())
1052
int count_elements= 0;
1053
for (fs::path::iterator iter= to_file.begin();
1054
iter != to_file.end();
1055
++iter, ++count_elements)
1058
if (count_elements == 1)
1060
target_path /= *schema;
1063
target_path /= to_file;
1320
strcpy(path, drizzle_real_data_home);
1322
strncat(path, session->db, FN_REFLEN-strlen(drizzle_real_data_home)-1);
1323
(void) fn_format(path, exchange->file_name, path, "", option);
1067
target_path = exchange->file_name;
1070
if (not secure_file_priv.string().empty())
1072
if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
1074
/* Write only allowed to dir or subdir specified by secure_file_priv */
1075
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1080
if (!access(target_path.file_string().c_str(), F_OK))
1326
(void) fn_format(path, exchange->file_name, drizzle_real_data_home, "", option);
1328
if (opt_secure_file_priv &&
1329
strncmp(opt_secure_file_priv, path, strlen(opt_secure_file_priv)))
1331
/* Write only allowed to dir or subdir specified by secure_file_priv */
1332
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1336
if (!access(path, F_OK))
1082
1338
my_error(ER_FILE_EXISTS_ERROR, MYF(0), exchange->file_name);
1085
1341
/* Create the file world readable */
1086
if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1342
if ((file= my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1088
1345
(void) fchmod(file, 0666); // Because of umask()
1089
if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1347
(void) chmod(path, 0666);
1349
if (init_io_cache(cache, file, 0L, WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1091
internal::my_close(file, MYF(0));
1092
internal::my_delete(target_path.file_string().c_str(), MYF(0)); // Delete file on error, it was just created
1351
my_close(file, MYF(0));
1352
my_delete(path, MYF(0)); // Delete file on error, it was just created
1276
1540
assert before the loop makes that sure.
1279
if ((needs_escaping(*pos, enclosed) ||
1543
if ((NEED_ESCAPING(*pos) ||
1280
1544
(check_second_byte &&
1281
1545
my_mbcharlen(character_set_client, (unsigned char) *pos) == 2 &&
1282
1546
pos + 1 < end &&
1283
needs_escaping(pos[1], enclosed))) &&
1547
NEED_ESCAPING(pos[1]))) &&
1285
Don't escape field_term_char by doubling - doubling is only
1286
valid for ENCLOSED BY characters:
1549
Don't escape field_term_char by doubling - doubling is only
1550
valid for ENCLOSED BY characters:
1288
1552
(enclosed || !is_ambiguous_field_term ||
1289
1553
(int) (unsigned char) *pos != field_term_char))
1292
1556
tmp_buff[0]= ((int) (unsigned char) *pos == field_sep_char &&
1293
1557
is_ambiguous_field_sep) ?
1294
field_sep_char : escape_char;
1295
tmp_buff[1]= *pos ? *pos : '0';
1296
if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1297
my_b_write(cache,(unsigned char*) tmp_buff,2))
1302
if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1558
field_sep_char : escape_char;
1559
tmp_buff[1]= *pos ? *pos : '0';
1560
if (my_b_write(&cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1561
my_b_write(&cache,(unsigned char*) tmp_buff,2))
1566
if (my_b_write(&cache,(unsigned char*) start,(uint32_t) (pos-start)))
1305
else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1569
else if (my_b_write(&cache,(unsigned char*) res->ptr(),used_length))
1308
1572
if (fixed_row_size)
1309
1573
{ // Fill with space
1310
1574
if (item->max_length > used_length)
1312
/* QQ: Fix by adding a my_b_fill() function */
1316
memset(space, ' ', sizeof(space));
1318
uint32_t length=item->max_length-used_length;
1319
for (; length > sizeof(space) ; length-=sizeof(space))
1321
if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1324
if (my_b_write(cache,(unsigned char*) space,length))
1576
/* QQ: Fix by adding a my_b_fill() function */
1580
memset(space, ' ', sizeof(space));
1582
uint32_t length=item->max_length-used_length;
1583
for (; length > sizeof(space) ; length-=sizeof(space))
1585
if (my_b_write(&cache,(unsigned char*) space,sizeof(space)))
1588
if (my_b_write(&cache,(unsigned char*) space,length))
1328
1592
if (res && enclosed)
1330
if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
1594
if (my_b_write(&cache, (unsigned char*) exchange->enclosed->ptr(),
1331
1595
exchange->enclosed->length()))
1334
1598
if (--items_left)
1336
if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
1600
if (my_b_write(&cache, (unsigned char*) exchange->field_term->ptr(),
1337
1601
field_term_length))
1341
if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1342
exchange->line_term->length()))
1605
if (my_b_write(&cache,(unsigned char*) exchange->line_term->ptr(),
1606
exchange->line_term->length()))
1615
1914
memset(&status_var, 0, sizeof(status_var));
1619
void Session::set_db(const std::string &new_db)
1917
void Security_context::skip_grants()
1919
/* privileges for the user are unknown everything is allowed */
1923
/****************************************************************************
1924
Handling of open and locked tables states.
1926
This is used when we want to open/lock (and then close) some tables when
1927
we already have a set of tables open and locked. We use these methods for
1928
access to mysql.proc table to find definitions of stored routines.
1929
****************************************************************************/
1931
void Session::reset_n_backup_open_tables_state(Open_tables_state *backup)
1933
backup->set_open_tables_state(this);
1934
reset_open_tables_state();
1935
state_flags|= Open_tables_state::BACKUPS_AVAIL;
1940
void Session::restore_backup_open_tables_state(Open_tables_state *backup)
1943
Before we will throw away current open tables state we want
1944
to be sure that it was properly cleaned up.
1946
assert(open_tables == 0 && temporary_tables == 0 &&
1947
handler_tables == 0 && derived_tables == 0 &&
1948
lock == 0 && locked_tables == 0);
1949
set_open_tables_state(backup);
1954
bool Session::set_db(const char *new_db, size_t new_db_len)
1621
1956
/* Do not reallocate memory if current chunk is big enough. */
1622
if (new_db.length())
1624
_schema.reset(new std::string(new_db));
1957
if (db && new_db && db_length >= new_db_len)
1958
memcpy(db, new_db, new_db_len+1);
1628
_schema.reset(new std::string(""));
1965
db= (char *)malloc(new_db_len + 1);
1968
memcpy(db, new_db, new_db_len);
1975
db_length= db ? new_db_len : 0;
1976
return new_db && !db;
1981
Check the killed state of a user thread
1982
@param session user thread
1983
@retval 0 the user thread is active
1984
@retval 1 the user thread has been killed
1986
extern "C" int session_killed(const Session *session)
1988
return(session->killed);
1992
Return the thread id of a user thread
1993
@param session user thread
1996
extern "C" unsigned long session_get_thread_id(const Session *session)
1998
return((unsigned long)session->thread_id);
2003
LEX_STRING *session_make_lex_string(Session *session, LEX_STRING *lex_str,
2004
const char *str, unsigned int size,
2005
int allocate_lex_string)
2007
return session->make_lex_string(lex_str, str, size,
2008
(bool) allocate_lex_string);
2011
extern "C" const struct charset_info_st *session_charset(Session *session)
2013
return(session->charset());
2016
extern "C" char **session_query(Session *session)
2018
return(&session->query);
2021
extern "C" int session_non_transactional_update(const Session *session)
2023
return(session->transaction.all.modified_non_trans_table);
2026
extern "C" void session_mark_transaction_to_rollback(Session *session, bool all)
2028
mark_transaction_to_rollback(session, all);
1644
2044
session->transaction_rollback_request= all;
1648
void Session::disconnect(uint32_t errcode, bool should_lock)
1650
/* Allow any plugins to cleanup their session variables */
1651
plugin_sessionvar_cleanup(this);
1653
/* If necessary, log any aborted or unauthorized connections */
1654
if (getKilled() || client->wasAborted())
1656
status_var.aborted_threads++;
1659
if (client->wasAborted())
1661
if (not getKilled() && variables.log_warnings > 1)
1663
SecurityContext *sctx= &security_ctx;
1665
errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1667
, (_schema->empty() ? "unconnected" : _schema->c_str())
1668
, sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1669
, sctx->getIp().c_str()
1670
, (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1674
/* Close out our connection to the client */
2047
/***************************************************************************
2048
Handling of XA id cacheing
2049
***************************************************************************/
2051
pthread_mutex_t LOCK_xid_cache;
2054
extern "C" unsigned char *xid_get_hash_key(const unsigned char *, size_t *, bool);
2055
extern "C" void xid_free_hash(void *);
2057
unsigned char *xid_get_hash_key(const unsigned char *ptr, size_t *length,
2060
*length=((XID_STATE*)ptr)->xid.key_length();
2061
return ((XID_STATE*)ptr)->xid.key();
2064
void xid_free_hash(void *ptr)
2066
if (!((XID_STATE*)ptr)->in_session)
2067
free((unsigned char*)ptr);
2070
bool xid_cache_init()
2072
pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST);
2073
return hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
2074
xid_get_hash_key, xid_free_hash, 0) != 0;
2077
void xid_cache_free()
2079
if (hash_inited(&xid_cache))
2081
hash_free(&xid_cache);
2082
pthread_mutex_destroy(&LOCK_xid_cache);
2086
XID_STATE *xid_cache_search(XID *xid)
2088
pthread_mutex_lock(&LOCK_xid_cache);
2089
XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, xid->key(), xid->key_length());
2090
pthread_mutex_unlock(&LOCK_xid_cache);
2095
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
2099
pthread_mutex_lock(&LOCK_xid_cache);
2100
if (hash_search(&xid_cache, xid->key(), xid->key_length()))
2102
else if (!(xs=(XID_STATE *)malloc(sizeof(*xs))))
2106
xs->xa_state=xa_state;
2109
res=my_hash_insert(&xid_cache, (unsigned char*)xs);
2111
pthread_mutex_unlock(&LOCK_xid_cache);
2116
bool xid_cache_insert(XID_STATE *xid_state)
2118
pthread_mutex_lock(&LOCK_xid_cache);
2119
assert(hash_search(&xid_cache, xid_state->xid.key(),
2120
xid_state->xid.key_length())==0);
2121
bool res=my_hash_insert(&xid_cache, (unsigned char*)xid_state);
2122
pthread_mutex_unlock(&LOCK_xid_cache);
2127
void xid_cache_delete(XID_STATE *xid_state)
2129
pthread_mutex_lock(&LOCK_xid_cache);
2130
hash_delete(&xid_cache, (unsigned char *)xid_state);
2131
pthread_mutex_unlock(&LOCK_xid_cache);
2136
Class to handle temporary allocation of memory for row data.
2138
The responsibilities of the class is to provide memory for
2139
packing one or two rows of packed data (depending on what
2140
constructor is called).
2142
In order to make the allocation more efficient for "simple" rows,
2143
i.e., rows that do not contain any blobs, a pointer to the
2144
allocated memory is of memory is stored in the table structure
2145
for simple rows. If memory for a table containing a blob field
2146
is requested, only memory for that is allocated, and subsequently
2147
released when the object is destroyed.
2150
class Row_data_memory {
2153
Build an object to keep track of a block-local piece of memory
2154
for storing a row of data.
2157
Table where the pre-allocated memory is stored.
2160
Length of data that is needed, if the record contain blobs.
2162
Row_data_memory(Table *table, size_t const len1)
2165
m_alloc_checked= false;
2166
allocate_memory(table, len1);
2167
m_ptr[0]= has_memory() ? m_memory : 0;
2171
Row_data_memory(Table *table, size_t const len1, size_t const len2)
2174
m_alloc_checked= false;
2175
allocate_memory(table, len1 + len2);
2176
m_ptr[0]= has_memory() ? m_memory : 0;
2177
m_ptr[1]= has_memory() ? m_memory + len1 : 0;
2182
if (m_memory != 0 && m_release_memory_on_destruction)
2183
free((unsigned char*) m_memory);
2187
Is there memory allocated?
2189
@retval true There is memory allocated
2190
@retval false Memory allocation failed
2192
bool has_memory() const {
2193
m_alloc_checked= true;
2194
return m_memory != 0;
2197
unsigned char *slot(uint32_t s)
2199
assert(s < sizeof(m_ptr)/sizeof(*m_ptr));
2200
assert(m_ptr[s] != 0);
2201
assert(m_alloc_checked == true);
2206
void allocate_memory(Table *const table, size_t const total_length)
2208
if (table->s->blob_fields == 0)
2211
The maximum length of a packed record is less than this
2212
length. We use this value instead of the supplied length
2213
when allocating memory for records, since we don't know how
2214
the memory will be used in future allocations.
2216
Since table->s->reclength is for unpacked records, we have
2217
to add two bytes for each field, which can potentially be
2218
added to hold the length of a packed field.
2220
size_t const maxlen= table->s->reclength + 2 * table->s->fields;
2223
Allocate memory for two records if memory hasn't been
2224
allocated. We allocate memory for two records so that it can
2225
be used when processing update rows as well.
2227
if (table->write_row_record == 0)
2228
table->write_row_record=
2229
(unsigned char *) alloc_root(&table->mem_root, 2 * maxlen);
2230
m_memory= table->write_row_record;
2231
m_release_memory_on_destruction= false;
2235
m_memory= (unsigned char *) malloc(total_length);
2236
m_release_memory_on_destruction= true;
2240
mutable bool m_alloc_checked;
2241
bool m_release_memory_on_destruction;
2242
unsigned char *m_memory;
2243
unsigned char *m_ptr[2];
2250
@param session Thread handle
2251
@param errcode Error code to print to console
2252
@param should_lock 1 if we have have to lock LOCK_thread_count
2255
For the connection that is doing shutdown, this is called twice
2257
void Session::close_connection(uint32_t errcode, bool should_lock)
1675
2260
if (should_lock)
1676
session::Cache::singleton().mutex().lock();
1678
setKilled(Session::KILL_CONNECTION);
1680
if (client->isConnected())
2261
(void) pthread_mutex_lock(&LOCK_thread_count);
2262
killed= Session::KILL_CONNECTION;
2263
if ((vio= net.vio) != 0)
1684
/*my_error(errcode, ER(errcode));*/
1685
client->sendError(errcode, ER(errcode));
2266
net_send_error(this, errcode, ER(errcode)); /* purecov: inspected */
2267
drizzleclient_net_close(&net); /* vio is freed in delete session */
1690
2269
if (should_lock)
1692
session::Cache::singleton().mutex().unlock();
2270
(void) pthread_mutex_unlock(&LOCK_thread_count);
2276
Reset Session part responsible for command processing state.
2278
This needs to be called before execution of every statement
2279
(prepared or conventional).
2280
It is not called by substatements of routines.
2283
Make it a method of Session and align its name with the rest of
2284
reset/end/start/init methods.
2286
Call it after we use Session for queries, not before.
1696
2289
void Session::reset_for_next_command()
1704
2297
auto_inc_intervals_in_cur_stmt_for_binlog.empty();
1706
is_fatal_error= false;
1707
2300
server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1708
2301
SERVER_QUERY_NO_INDEX_USED |
1709
2302
SERVER_QUERY_NO_GOOD_INDEX_USED);
2304
If in autocommit mode and not in a transaction, reset
2305
OPTION_STATUS_NO_TRANS_UPDATE | OPTION_KEEP_LOG to not get warnings
2306
in ha_rollback_trans() about some tables couldn't be rolled back.
2308
if (!(options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
2310
options&= ~OPTION_KEEP_LOG;
2311
transaction.all.modified_non_trans_table= false;
2313
thread_specific_used= false;
1712
2316
main_da.reset_diagnostics_area();
1713
2317
total_warn_count=0; // Warnings for this query
1714
2318
sent_row_count= examined_row_count= 0;
2325
return true if the table was created explicitly.
2327
inline bool is_user_table(Table * table)
2329
const char *name= table->s->table_name.str;
2330
return strncmp(name, TMP_FILE_PREFIX, TMP_FILE_PREFIX_LENGTH);
1718
2334
Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
2335
creates one DROP TEMPORARY Table binlog event for each pseudo-thread
1721
void Open_tables_state::close_temporary_tables()
2338
void Session::close_temporary_tables()
1724
2341
Table *tmp_next;
1726
if (not temporary_tables)
2343
if (!temporary_tables)
1729
2346
for (table= temporary_tables; table; table= tmp_next)
1731
tmp_next= table->getNext();
1734
temporary_tables= NULL;
1738
unlink from session->temporary tables and close temporary table
1741
void Open_tables_state::close_temporary_table(Table *table)
1743
if (table->getPrev())
1745
table->getPrev()->setNext(table->getNext());
1746
if (table->getPrev()->getNext())
1748
table->getNext()->setPrev(table->getPrev());
1753
/* removing the item from the list */
1754
assert(table == temporary_tables);
1756
slave must reset its temporary list pointer to zero to exclude
1757
passing non-zero value to end_slave via rli->save_temporary_tables
1758
when no temp tables opened, see an invariant below.
1760
temporary_tables= table->getNext();
1761
if (temporary_tables)
1763
table->getNext()->setPrev(NULL);
1770
Close and drop a temporary table
1773
This dosn't unlink table from session->temporary
1774
If this is needed, use close_temporary_table()
1777
void Open_tables_state::nukeTable(Table *table)
1779
plugin::StorageEngine *table_type= table->getShare()->db_type();
1781
table->free_io_cache();
1782
table->delete_table();
1784
TableIdentifier identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1785
rm_temporary_table(table_type, identifier);
1787
delete table->getMutableShare();
1789
/* This makes me sad, but we're allocating it via malloc */
2348
tmp_next= table->next;
2349
close_temporary(table, 1, 1);
2351
temporary_tables= 0;
1793
2357
/** Clear most status variables. */
1794
2358
extern time_t flush_status_time;
2359
extern uint32_t max_used_connections;
1796
2361
void Session::refresh_status()
2363
pthread_mutex_lock(&LOCK_status);
2365
/* Add thread's status variabes to global status */
2366
add_to_status(&global_status_var, &status_var);
1798
2368
/* Reset thread's status variables */
1799
2369
memset(&status_var, 0, sizeof(status_var));
2371
/* Reset some global variables */
2372
reset_status_vars();
2374
/* Reset the counters of all key caches (default and named). */
2375
process_key_caches(reset_key_cache_counters);
1801
2376
flush_status_time= time((time_t*) 0);
1802
current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1803
current_global_counters.connections= 0;
1806
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1808
return getVariable(std::string(name.str, name.length), create_if_not_exists);
1811
user_var_entry *Session::getVariable(const std::string &name, bool create_if_not_exists)
1813
UserVarsRange ppp= user_vars.equal_range(name);
1815
for (UserVars::iterator iter= ppp.first;
1816
iter != ppp.second; ++iter)
1818
return (*iter).second;
1821
if (not create_if_not_exists)
1824
user_var_entry *entry= NULL;
1825
entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1830
std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1832
if (not returnable.second)
1840
void Session::setVariable(const std::string &name, const std::string &value)
1842
user_var_entry *updateable_var= getVariable(name.c_str(), true);
1844
updateable_var->update_hash(false,
1845
(void*)value.c_str(),
1846
static_cast<uint32_t>(value.length()), STRING_RESULT,
1848
DERIVATION_IMPLICIT, false);
1851
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1853
for (Table *table= temporary_tables ; table ; table= table->getNext())
1855
if (table->query_id == getQueryId())
1858
table->cursor->ha_reset();
1863
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1865
for (; table ; table= table->getNext())
1867
if (table->query_id == getQueryId())
1870
table->cursor->ha_reset();
1876
Unlocks tables and frees derived tables.
1877
Put all normal tables used by thread in free list.
1879
It will only close/mark as free for reuse tables opened by this
1880
substatement, it will also check if we are closing tables after
1881
execution of complete query (i.e. we are on upper level) and will
1882
leave prelocked mode if needed.
1884
void Session::close_thread_tables()
1886
clearDerivedTables();
1889
Mark all temporary tables used by this statement as free for reuse.
1891
mark_temp_tables_as_free_for_reuse();
1893
Let us commit transaction for statement. Since in 5.0 we only have
1894
one statement transaction and don't allow several nested statement
1895
transactions this call will do nothing if we are inside of stored
1896
function or trigger (i.e. statement transaction is already active and
1897
does not belong to statement for which we do close_thread_tables()).
1898
TODO: This should be fixed in later releases.
1901
TransactionServices &transaction_services= TransactionServices::singleton();
1902
main_da.can_overwrite_status= true;
1903
transaction_services.autocommitOrRollback(this, is_error());
1904
main_da.can_overwrite_status= false;
1905
transaction.stmt.reset();
1911
For RBR we flush the pending event just before we unlock all the
1912
tables. This means that we are at the end of a topmost
1913
statement, so we ensure that the STMT_END_F flag is set on the
1914
pending event. For statements that are *inside* stored
1915
functions, the pending event will not be flushed: that will be
1916
handled either before writing a query log event (inside
1917
binlog_query()) or when preparing a pending event.
1923
Note that we need to hold table::Cache::singleton().mutex() while changing the
1924
open_tables list. Another thread may work on it.
1925
(See: table::Cache::singleton().removeTable(), mysql_wait_completed_table())
1926
Closing a MERGE child before the parent would be fatal if the
1927
other thread tries to abort the MERGE lock in between.
1930
close_open_tables();
1933
void Session::close_tables_for_reopen(TableList **tables)
1936
If table list consists only from tables from prelocking set, table list
1937
for new attempt should be empty, so we have to update list's root pointer.
1939
if (lex->first_not_own_table() == *tables)
1941
lex->chop_off_not_own_tables();
1942
for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1944
close_thread_tables();
1947
bool Session::openTablesLock(TableList *tables)
1954
if (open_tables_from_list(&tables, &counter))
1957
if (not lock_tables(tables, counter, &need_reopen))
1959
if (not need_reopen)
1961
close_tables_for_reopen(&tables);
1963
if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
1965
mysql_handle_derived(lex, &mysql_derived_filling))))
1972
@note "best_effort" is used in cases were if a failure occurred on this
1973
operation it would not be surprising because we are only removing because there
1974
might be an issue (lame engines).
1977
bool Open_tables_state::rm_temporary_table(const TableIdentifier &identifier, bool best_effort)
1979
if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
1981
if (not best_effort)
1984
identifier.getSQLPath(path);
1985
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1986
path.c_str(), errno);
1995
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const TableIdentifier &identifier)
1999
if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier))
2002
identifier.getSQLPath(path);
2003
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
2004
path.c_str(), errno);
2013
@note this will be removed, I am looking through Hudson to see if it is finding
2014
any tables that are missed during cleanup.
2016
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
2020
if (not temporary_tables)
2023
cerr << "Begin Run: " << foo << "\n";
2024
for (table= temporary_tables; table; table= table->getNext())
2026
bool have_proto= false;
2028
message::Table *proto= table->getShare()->getTableProto();
2029
if (table->getShare()->getTableProto())
2032
const char *answer= have_proto ? "true" : "false";
2036
cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2037
cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2041
cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2046
bool Session::TableMessages::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2048
table_message_cache.insert(make_pair(identifier.getPath(), table_message));
2053
bool Session::TableMessages::removeTableMessage(const TableIdentifier &identifier)
2055
TableMessageCache::iterator iter;
2057
iter= table_message_cache.find(identifier.getPath());
2059
if (iter == table_message_cache.end())
2062
table_message_cache.erase(iter);
2067
bool Session::TableMessages::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2069
TableMessageCache::iterator iter;
2071
iter= table_message_cache.find(identifier.getPath());
2073
if (iter == table_message_cache.end())
2076
table_message.CopyFrom(((*iter).second));
2081
bool Session::TableMessages::doesTableMessageExist(const TableIdentifier &identifier)
2083
TableMessageCache::iterator iter;
2085
iter= table_message_cache.find(identifier.getPath());
2087
if (iter == table_message_cache.end())
2095
bool Session::TableMessages::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
2097
TableMessageCache::iterator iter;
2099
table_message_cache[to.getPath()]= table_message_cache[from.getPath()];
2101
iter= table_message_cache.find(to.getPath());
2103
if (iter == table_message_cache.end())
2108
(*iter).second.set_schema(to.getSchemaName());
2109
(*iter).second.set_name(to.getTableName());
2114
table::Instance *Session::getInstanceTable()
2116
temporary_shares.push_back(new table::Instance()); // This will not go into the tableshare cache, so no key is used.
2118
table::Instance *tmp_share= temporary_shares.back();
2127
Create a reduced Table object with properly set up Field list from a
2128
list of field definitions.
2130
The created table doesn't have a table Cursor associated with
2131
it, has no keys, no group/distinct, no copy_funcs array.
2132
The sole purpose of this Table object is to use the power of Field
2133
class to read/write data to/from table->getInsertRecord(). Then one can store
2134
the record in any container (RB tree, hash, etc).
2135
The table is created in Session mem_root, so are the table's fields.
2136
Consequently, if you don't BLOB fields, you don't need to free it.
2138
@param session connection handle
2139
@param field_list list of column definitions
2142
0 if out of memory, Table object in case of success
2144
table::Instance *Session::getInstanceTable(List<CreateField> &field_list)
2146
temporary_shares.push_back(new table::Instance(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2148
table::Instance *tmp_share= temporary_shares.back();
2157
static const std::string NONE= "NONE";
2158
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2159
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2161
const std::string &type(drizzled::Session::global_read_lock_t type)
2167
case Session::GOT_GLOBAL_READ_LOCK:
2168
return GOT_GLOBAL_READ_LOCK;
2169
case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2170
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2174
size_t max_string_length(drizzled::Session::global_read_lock_t)
2176
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2179
} /* namespace display */
2181
} /* namespace drizzled */
2377
max_used_connections= 1; /* We set it to one, because we know we exist */
2378
pthread_mutex_unlock(&LOCK_status);