21
21
* @file Implementation of the Session class and API
25
#include "drizzled/session.h"
26
#include "drizzled/session/cache.h"
24
#include <drizzled/server_includes.h>
25
#include <drizzled/session.h>
27
26
#include <sys/stat.h>
28
#include "drizzled/error.h"
29
#include "drizzled/gettext.h"
30
#include "drizzled/query_id.h"
31
#include "drizzled/data_home.h"
32
#include "drizzled/sql_base.h"
33
#include "drizzled/lock.h"
34
#include "drizzled/item/cache.h"
35
#include "drizzled/item/float.h"
36
#include "drizzled/item/return_int.h"
37
#include "drizzled/item/empty_string.h"
38
#include "drizzled/show.h"
39
#include "drizzled/plugin/client.h"
40
#include "drizzled/plugin/scheduler.h"
41
#include "drizzled/plugin/authentication.h"
42
#include "drizzled/plugin/logging.h"
43
#include "drizzled/plugin/transactional_storage_engine.h"
44
#include "drizzled/plugin/query_rewrite.h"
45
#include "drizzled/probes.h"
46
#include "drizzled/table_proto.h"
47
#include "drizzled/db.h"
48
#include "drizzled/pthread_globals.h"
49
#include "drizzled/transaction_services.h"
50
#include "drizzled/drizzled.h"
52
#include "drizzled/table/instance.h"
54
#include "plugin/myisam/myisam.h"
55
#include "drizzled/internal/iocache.h"
56
#include "drizzled/internal/thread_var.h"
57
#include "drizzled/plugin/event_observer.h"
59
#include "drizzled/util/functors.h"
61
#include "drizzled/display.h"
66
#include <boost/filesystem.hpp>
68
#include "drizzled/util/backtrace.h"
72
namespace fs=boost::filesystem;
27
#include <mysys/mysys_err.h>
28
#include <drizzled/error.h>
29
#include <drizzled/query_id.h>
30
#include <drizzled/data_home.h>
31
#include <drizzled/sql_base.h>
32
#include <drizzled/lock.h>
33
#include <drizzled/item/cache.h>
34
#include <drizzled/item/float.h>
35
#include <drizzled/item/return_int.h>
36
#include <drizzled/item/empty_string.h>
37
#include <drizzled/show.h>
38
#include <drizzled/scheduling.h>
77
41
The following is used to initialise Table_ident with a internal
81
45
char empty_c_string[1]= {0}; /* used for not defined db */
83
47
const char * const Session::DEFAULT_WHERE= "field list";
48
extern pthread_key_t THR_Session;
49
extern pthread_key_t THR_Mem_root;
51
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
53
template class List<Key>;
54
template class List_iterator<Key>;
55
template class List<Key_part_spec>;
56
template class List_iterator<Key_part_spec>;
57
template class List<Alter_drop>;
58
template class List_iterator<Alter_drop>;
59
template class List<Alter_column>;
60
template class List_iterator<Alter_column>;
63
/****************************************************************************
65
****************************************************************************/
66
extern "C" unsigned char *get_var_key(user_var_entry *entry, size_t *length,
69
*length= entry->name.length;
70
return (unsigned char*) entry->name.str;
73
extern "C" void free_user_var(user_var_entry *entry)
75
char *pos= (char*) entry+ALIGN_SIZE(sizeof(*entry));
76
if (entry->value && entry->value != pos)
85
81
bool Key_part_spec::operator==(const Key_part_spec& other) const
87
83
return length == other.length &&
88
84
field_name.length == other.field_name.length &&
89
!my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
85
!strcmp(field_name.str, other.field_name.str);
92
Open_tables_state::Open_tables_state(uint64_t version_arg) :
88
Open_tables_state::Open_tables_state(ulong version_arg)
89
:version(version_arg), state_flags(0U)
95
open_tables= temporary_tables= derived_tables= NULL;
96
extra_lock= lock= NULL;
91
reset_open_tables_state();
100
95
The following functions form part of the C plugin API
102
int mysql_tmpfile(const char *prefix)
97
extern "C" int mysql_tmpfile(const char *prefix)
104
99
char filename[FN_REFLEN];
105
int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
100
File fd = create_temp_file(filename, drizzle_tmpdir, prefix, MYF(MY_WME));
107
102
unlink(filename);
124
120
@see Session::set_proc_info
126
void set_session_proc_info(Session *session, const char *info)
123
set_session_proc_info(Session *session, const char *info)
128
125
session->set_proc_info(info);
131
129
const char *get_session_proc_info(Session *session)
133
131
return session->get_proc_info();
136
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
138
return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
141
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
144
return &ha_data[monitored->getId()].resource_context[index];
135
void **session_ha_data(const Session *session, const struct StorageEngine *engine)
137
return (void **) &session->ha_data[engine->slot].ha_ptr;
147
141
int64_t session_test_options(const Session *session, int64_t test_options)
149
143
return session->options & test_options;
152
147
int session_sql_command(const Session *session)
154
149
return (int) session->lex->sql_command;
157
enum_tx_isolation session_tx_isolation(const Session *session)
159
return (enum_tx_isolation)session->variables.tx_isolation;
162
Session::Session(plugin::Client *client_arg) :
153
int session_tx_isolation(const Session *session)
155
return (int) session->variables.tx_isolation;
159
void session_inc_row_count(Session *session)
161
session->row_count++;
164
Session::Session(Protocol *protocol_arg)
166
Statement(&main_lex, &main_mem_root, /* statement id */ 0),
163
167
Open_tables_state(refresh_version),
164
mem_root(&main_mem_root),
167
query(new std::string),
168
_schema(new std::string("")),
173
168
lock_id(&main_lock_id),
175
ha_data(plugin::num_trx_monitored_objects),
176
concurrent_execute_allowed(true),
177
170
arg_of_last_insert_id_function(false),
178
171
first_successful_insert_id_in_prev_stmt(0),
179
172
first_successful_insert_id_in_cur_stmt(0),
180
173
limit_found_rows(0),
181
_global_read_lock(NONE),
183
175
some_tables_deleted(false),
184
176
no_errors(false),
259
253
memset(&status_var, 0, sizeof(status_var));
261
255
/* Initialize sub structures */
262
memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
256
init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
257
hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
258
(hash_get_key) get_var_key,
259
(hash_free_key) free_user_var, 0);
262
protocol= protocol_arg;
263
protocol->setSession(this);
265
const Query_id& local_query_id= Query_id::get_query_id();
267
protocol->setRandom(tmp + (uint64_t) &protocol,
268
tmp + (uint64_t)local_query_id.value());
264
269
substitute_null_with_insert_id = false;
265
lock_info.init(); /* safety: will be reset after start */
270
thr_lock_info_init(&lock_info); /* safety: will be reset after start */
266
271
thr_lock_owner_init(&main_lock_id, &lock_info);
268
273
m_internal_handler= NULL;
270
plugin::EventObserver::registerSessionEvents(*this);
273
void Session::free_items()
276
void Statement::free_items()
276
/* This works because items are allocated with memory::sql_alloc() */
279
/* This works because items are allocated with sql_alloc() */
277
280
for (; free_list; free_list= next)
279
282
next= free_list->next;
302
305
return false; // 'false', as per coding style
305
void Session::setAbort(bool arg)
307
mysys_var->abort= arg;
310
void Session::lockOnSys()
316
boost_unique_lock_t scopedLock(mysys_var->mutex);
317
if (mysys_var->current_cond)
319
mysys_var->current_mutex->lock();
320
mysys_var->current_cond->notify_all();
321
mysys_var->current_mutex->unlock();
325
308
void Session::pop_internal_handler()
327
310
assert(m_internal_handler != NULL);
328
311
m_internal_handler= NULL;
331
void Session::get_xid(DRIZZLE_XID *xid)
333
*xid = *(DRIZZLE_XID *) &transaction.xid_state.xid;
314
#if defined(__cplusplus)
318
void *session_alloc(Session *session, unsigned int size)
320
return session->alloc(size);
323
void *session_calloc(Session *session, unsigned int size)
325
return session->calloc(size);
328
char *session_strdup(Session *session, const char *str)
330
return session->strdup(str);
333
char *session_strmake(Session *session, const char *str, unsigned int size)
335
return session->strmake(str, size);
338
void *session_memdup(Session *session, const void* str, unsigned int size)
340
return session->memdup(str, size);
343
void session_get_xid(const Session *session, DRIZZLE_XID *xid)
345
*xid = *(DRIZZLE_XID *) &session->transaction.xid_state.xid;
348
#if defined(__cplusplus)
353
Init Session for query processing.
354
This has to be called once before we call mysql_parse.
355
See also comments in session.h.
358
void Session::init_for_queries()
361
ha_enable_transaction(this,true);
363
reset_root_defaults(mem_root, variables.query_alloc_block_size,
364
variables.query_prealloc_size);
365
reset_root_defaults(&transaction.mem_root,
366
variables.trans_alloc_block_size,
367
variables.trans_prealloc_size);
368
transaction.xid_state.xid.null();
369
transaction.xid_state.in_session=1;
336
373
/* Do operations that may take a long time */
350
TransactionServices &transaction_services= TransactionServices::singleton();
351
transaction_services.rollbackTransaction(this, true);
352
388
xid_cache_delete(&transaction.xid_state);
355
for (UserVars::iterator iter= user_vars.begin();
356
iter != user_vars.end();
359
user_var_entry *entry= (*iter).second;
390
hash_free(&user_vars);
365
391
close_temporary_tables();
367
393
if (global_read_lock)
369
unlockGlobalReadLock();
394
unlock_global_read_lock(this);
372
396
cleanup_done= true;
375
399
Session::~Session()
401
Session_CHECK_SENTRY(this);
402
add_to_status(&global_status_var, &status_var);
379
if (client->isConnected())
404
if (protocol->isConnected())
381
406
if (global_system_variables.log_warnings)
382
errmsg_printf(ERRMSG_LVL_WARN, ER(ER_FORCING_CLOSE),internal::my_progname,
407
errmsg_printf(ERRMSG_LVL_WARN, ER(ER_FORCING_CLOSE),my_progname,
384
(getSecurityContext().getUser().c_str() ?
385
getSecurityContext().getUser().c_str() : ""));
409
(security_ctx.user.c_str() ?
410
security_ctx.user.c_str() : ""));
386
411
disconnect(0, false);
389
414
/* Close connection */
393
418
if (cleanup_done == false)
396
plugin::StorageEngine::closeConnection(this);
421
ha_close_connection(this);
397
422
plugin_sessionvar_cleanup(this);
399
warn_root.free_root(MYF(0));
429
free_root(&warn_root,MYF(0));
430
free_root(&transaction.mem_root,MYF(0));
400
431
mysys_var=0; // Safety (shouldn't be needed)
401
432
dbug_sentry= Session_SENTRY_GONE;
403
main_mem_root.free_root(MYF(0));
404
currentMemRoot().release();
405
currentSession().release();
407
plugin::Logging::postEndDo(this);
408
plugin::EventObserver::deregisterSessionEvents(*this);
410
for (PropertyMap::iterator iter= life_properties.begin(); iter != life_properties.end(); iter++)
412
delete (*iter).second;
414
life_properties.clear();
417
void Session::setClient(plugin::Client *client_arg)
420
client->setSession(this);
423
void Session::awake(Session::killed_state_t state_to_set)
425
if ((state_to_set == Session::KILL_QUERY) and (command == COM_SLEEP))
430
setKilled(state_to_set);
431
scheduler->killSession(this);
434
free_root(&main_mem_root, MYF(0));
435
pthread_setspecific(THR_Session, 0);
438
/* Ensure that no one is using Session */
439
pthread_mutex_unlock(&LOCK_delete);
440
pthread_mutex_destroy(&LOCK_delete);
444
Add all status variables to another status variable array
448
to_var add to this array
449
from_var from this array
452
This function assumes that all variables are long/ulong.
453
If this assumption will change, then we have to explictely add
454
the other variables after the while loop
456
void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var)
458
ulong *end= (ulong*) ((unsigned char*) to_var +
459
offsetof(STATUS_VAR, last_system_status_var) +
461
ulong *to= (ulong*) to_var, *from= (ulong*) from_var;
468
Add the difference between two status variable arrays to another one.
472
to_var add to this array
473
from_var from this array
474
dec_var minus this array
477
This function assumes that all variables are long/ulong.
479
void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var,
482
ulong *end= (ulong*) ((unsigned char*) to_var + offsetof(STATUS_VAR,
483
last_system_status_var) +
485
ulong *to= (ulong*) to_var, *from= (ulong*) from_var, *dec= (ulong*) dec_var;
488
*(to++)+= *(from++) - *(dec++);
491
void Session::awake(Session::killed_state state_to_set)
493
Session_CHECK_SENTRY(this);
494
safe_mutex_assert_owner(&LOCK_delete);
495
Scheduler &thread_scheduler= get_thread_scheduler();
497
killed= state_to_set;
433
498
if (state_to_set != Session::KILL_QUERY)
435
DRIZZLE_CONNECTION_DONE(thread_id);
500
thread_scheduler.post_kill_notification(this);
440
boost_unique_lock_t scopedLock(mysys_var->mutex);
504
pthread_mutex_lock(&mysys_var->mutex);
443
506
This broadcast could be up in the air if the victim thread
444
507
exits the cond in the time between read and broadcast, but that is
445
508
ok since all we want to do is to make the victim thread get out
492
553
This allows us to move Session to different threads if needed.
494
555
mysys_var->id= thread_id;
556
real_id= pthread_self(); // For debugging
497
559
We have to call thr_lock_info_init() again here as Session may have been
498
560
created in another thread
562
thr_lock_info_init(&lock_info);
506
Init Session for query processing.
507
This has to be called once before we call mysql_parse.
508
See also comments in session.h.
511
566
void Session::prepareForQueries()
513
568
if (variables.max_join_size == HA_POS_ERROR)
514
569
options |= OPTION_BIG_SELECTS;
570
if (client_capabilities & CLIENT_COMPRESS)
572
protocol->enableCompression();
516
575
version= refresh_version;
517
576
set_proc_info(NULL);
518
577
command= COM_SLEEP;
521
mem_root->reset_root_defaults(variables.query_alloc_block_size,
522
variables.query_prealloc_size);
523
transaction.xid_state.xid.null();
524
transaction.xid_state.in_session=1;
529
582
bool Session::initGlobals()
533
586
disconnect(ER_OUT_OF_RESOURCES, true);
534
status_var.aborted_connects++;
542
if (initGlobals() || authenticate())
550
while (not client->haveError() && getKilled() != KILL_CONNECTION)
552
if (not executeStatement())
559
bool Session::schedule(Session::shared_ptr &arg)
561
arg->scheduler= plugin::Scheduler::getScheduler();
562
assert(arg->scheduler);
564
connection_count.increment();
566
if (connection_count > current_global_counters.max_used_connections)
568
current_global_counters.max_used_connections= connection_count;
571
current_global_counters.connections++;
572
arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
574
session::Cache::singleton().insert(arg);
576
if (unlikely(plugin::EventObserver::connectSession(*arg)))
578
// We should do something about an error...
581
if (plugin::Scheduler::getScheduler()->addSession(arg))
583
DRIZZLE_CONNECTION_START(arg->getSessionId());
584
char error_message_buff[DRIZZLE_ERRMSG_SIZE];
586
arg->setKilled(Session::KILL_CONNECTION);
588
arg->status_var.aborted_connects++;
590
/* Can't use my_error() since store_globals has not been called. */
591
/* TODO replace will better error message */
592
snprintf(error_message_buff, sizeof(error_message_buff),
593
ER(ER_CANT_CREATE_THREAD), 1);
594
arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
604
Is this session viewable by the current user?
606
bool Session::isViewable() const
608
return plugin::Authorization::isAuthorized(current_session->getSecurityContext(),
614
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
616
const char* old_msg = get_proc_info();
617
safe_mutex_assert_owner(mutex);
618
mysys_var->current_mutex = &mutex;
619
mysys_var->current_cond = &cond;
620
this->set_proc_info(msg);
624
void Session::exit_cond(const char* old_msg)
627
Putting the mutex unlock in exit_cond() ensures that
628
mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
629
locked (if that would not be the case, you'll get a deadlock if someone
630
does a Session::awake() on you).
632
mysys_var->current_mutex->unlock();
633
boost_unique_lock_t scopedLock(mysys_var->mutex);
634
mysys_var->current_mutex = 0;
635
mysys_var->current_cond = 0;
636
this->set_proc_info(old_msg);
587
statistic_increment(aborted_connects, &LOCK_status);
588
Scheduler &thread_scheduler= get_thread_scheduler();
589
thread_scheduler.end_thread(this, 0);
639
595
bool Session::authenticate()
642
if (client->authenticate())
598
if (protocol->authenticate())
601
statistic_increment(aborted_connects, &LOCK_status);
605
bool Session::checkUser(const char *passwd, uint32_t passwd_len, const char *in_db)
607
LEX_STRING db_str= { (char *) in_db, in_db ? strlen(in_db) : 0 };
608
bool is_authenticated;
611
Clear session->db as it points to something, that will be freed when
612
connection is closed. We don't want to accidentally free a wrong
613
pointer if connect failed. Also in case of 'CHANGE USER' failure,
614
current database will be switched to 'no database selected'.
618
if (passwd_len != 0 && passwd_len != SCRAMBLE_LENGTH)
620
my_error(ER_HANDSHAKE_ERROR, MYF(0), security_ctx.ip.c_str());
645
status_var.aborted_connects++;
650
bool Session::checkUser(const std::string &passwd_str,
651
const std::string &in_db)
653
bool is_authenticated=
654
plugin::Authentication::isAuthenticated(getSecurityContext(),
624
is_authenticated= authenticate_user(this, passwd);
657
626
if (is_authenticated != true)
659
status_var.access_denied++;
660
/* isAuthenticated has pushed the error message */
628
my_error(ER_ACCESS_DENIED_ERROR, MYF(0),
629
security_ctx.user.c_str(),
630
security_ctx.ip.c_str(),
631
passwd_len ? ER(ER_YES) : ER(ER_NO));
636
security_ctx.skip_grants();
664
638
/* Change database if necessary */
665
if (not in_db.empty())
639
if (in_db && in_db[0])
667
SchemaIdentifier identifier(in_db);
668
if (mysql_change_db(this, identifier))
641
if (mysql_change_db(this, &db_str, false))
670
643
/* mysql_change_db() has pushed the error message. */
675
password= not passwd_str.empty();
648
password= test(passwd_len); // remember for error messages
677
650
/* Ready to handle queries */
884
841
@return NULL on failure, or pointer to the LEX_STRING object
886
843
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
887
const std::string &str,
888
bool allocate_lex_string)
890
return make_lex_string(lex_str, str.c_str(), str.length(), allocate_lex_string);
893
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
894
const char* str, uint32_t length,
895
bool allocate_lex_string)
844
const char* str, uint32_t length,
845
bool allocate_lex_string)
897
847
if (allocate_lex_string)
898
848
if (!(lex_str= (LEX_STRING *)alloc(sizeof(LEX_STRING))))
900
if (!(lex_str->str= mem_root->strmake_root(str, length)))
850
if (!(lex_str->str= strmake_root(mem_root, str, length)))
902
852
lex_str->length= length;
856
/* routings to adding tables to list of changed in transaction tables */
857
inline static void list_include(CHANGED_TableList** prev,
858
CHANGED_TableList* curr,
859
CHANGED_TableList* new_table)
864
(*prev)->next = curr;
868
/* add table to list of changed in transaction tables */
870
void Session::add_changed_table(Table *table)
872
assert((options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
873
table->file->has_transactions());
874
add_changed_table(table->s->table_cache_key.str,
875
(long) table->s->table_cache_key.length);
879
void Session::add_changed_table(const char *key, long key_length)
881
CHANGED_TableList **prev_changed = &transaction.changed_tables;
882
CHANGED_TableList *curr = transaction.changed_tables;
884
for (; curr; prev_changed = &(curr->next), curr = curr->next)
886
int cmp = (long)curr->key_length - (long)key_length;
889
list_include(prev_changed, curr, changed_table_dup(key, key_length));
894
cmp = memcmp(curr->key, key, curr->key_length);
897
list_include(prev_changed, curr, changed_table_dup(key, key_length));
906
*prev_changed = changed_table_dup(key, key_length);
910
CHANGED_TableList* Session::changed_table_dup(const char *key, long key_length)
912
CHANGED_TableList* new_table =
913
(CHANGED_TableList*) trans_alloc(ALIGN_SIZE(sizeof(CHANGED_TableList))+
917
my_error(EE_OUTOFMEMORY, MYF(ME_BELL),
918
ALIGN_SIZE(sizeof(TableList)) + key_length + 1);
919
killed= KILL_CONNECTION;
923
new_table->key= ((char*)new_table)+ ALIGN_SIZE(sizeof(CHANGED_TableList));
925
new_table->key_length = key_length;
926
::memcpy(new_table->key, key, key_length);
906
931
int Session::send_explain_fields(select_result *result)
908
933
List<Item> field_list;
1038
static int create_file(Session *session,
1039
fs::path &target_path,
1040
file_exchange *exchange,
1041
internal::IO_CACHE *cache)
1056
static File create_file(Session *session, char *path, file_exchange *exchange, IO_CACHE *cache)
1043
fs::path to_file(exchange->file_name);
1046
if (not to_file.has_root_directory())
1059
uint32_t option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
1061
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
1062
option|= MY_REPLACE_DIR; // Force use of db directory
1065
if (!dirname_length(exchange->file_name))
1048
target_path= fs::system_complete(getDataHomeCatalog());
1049
util::string::const_shared_ptr schema(session->schema());
1050
if (schema and not schema->empty())
1052
int count_elements= 0;
1053
for (fs::path::iterator iter= to_file.begin();
1054
iter != to_file.end();
1055
++iter, ++count_elements)
1058
if (count_elements == 1)
1060
target_path /= *schema;
1063
target_path /= to_file;
1067
strcpy(path, drizzle_real_data_home);
1069
strncat(path, session->db, FN_REFLEN-strlen(drizzle_real_data_home)-1);
1070
(void) fn_format(path, exchange->file_name, path, "", option);
1067
target_path = exchange->file_name;
1070
if (not secure_file_priv.string().empty())
1072
if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
1074
/* Write only allowed to dir or subdir specified by secure_file_priv */
1075
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1080
if (!access(target_path.file_string().c_str(), F_OK))
1073
(void) fn_format(path, exchange->file_name, drizzle_real_data_home, "", option);
1075
if (opt_secure_file_priv &&
1076
strncmp(opt_secure_file_priv, path, strlen(opt_secure_file_priv)))
1078
/* Write only allowed to dir or subdir specified by secure_file_priv */
1079
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1083
if (!access(path, F_OK))
1082
1085
my_error(ER_FILE_EXISTS_ERROR, MYF(0), exchange->file_name);
1085
1088
/* Create the file world readable */
1086
if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1089
if ((file= my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1088
1092
(void) fchmod(file, 0666); // Because of umask()
1089
if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1094
(void) chmod(path, 0666);
1096
if (init_io_cache(cache, file, 0L, WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1091
internal::my_close(file, MYF(0));
1092
internal::my_delete(target_path.file_string().c_str(), MYF(0)); // Delete file on error, it was just created
1098
my_close(file, MYF(0));
1099
my_delete(path, MYF(0)); // Delete file on error, it was just created
1188
1198
res=item->str_result(&tmp);
1189
1199
if (res && enclosed)
1191
if (my_b_write(cache,(unsigned char*) exchange->enclosed->ptr(),
1192
exchange->enclosed->length()))
1201
if (my_b_write(&cache,(unsigned char*) exchange->enclosed->ptr(),
1202
exchange->enclosed->length()))
1197
1207
if (!fixed_row_size)
1199
if (escape_char != -1) // Use \N syntax
1201
null_buff[0]=escape_char;
1203
if (my_b_write(cache,(unsigned char*) null_buff,2))
1206
else if (my_b_write(cache,(unsigned char*) "NULL",4))
1209
if (escape_char != -1) // Use \N syntax
1211
null_buff[0]=escape_char;
1213
if (my_b_write(&cache,(unsigned char*) null_buff,2))
1216
else if (my_b_write(&cache,(unsigned char*) "NULL",4))
1211
used_length=0; // Fill with space
1221
used_length=0; // Fill with space
1216
1226
if (fixed_row_size)
1217
used_length= min(res->length(), static_cast<size_t>(item->max_length));
1227
used_length=cmin(res->length(),item->max_length);
1219
used_length= res->length();
1229
used_length=res->length();
1221
1230
if ((result_type == STRING_RESULT || is_unsafe_field_sep) &&
1224
1233
char *pos, *start, *end;
1225
1234
const CHARSET_INFO * const res_charset= res->charset();
1226
1235
const CHARSET_INFO * const character_set_client= default_charset_info;
1228
1237
bool check_second_byte= (res_charset == &my_charset_bin) &&
1229
character_set_client->
1230
escape_with_backslash_is_dangerous;
1238
character_set_client->
1239
escape_with_backslash_is_dangerous;
1231
1240
assert(character_set_client->mbmaxlen == 2 ||
1232
1241
!character_set_client->escape_with_backslash_is_dangerous);
1233
for (start=pos=(char*) res->ptr(),end=pos+used_length ;
1237
if (use_mb(res_charset))
1240
if ((l=my_ismbchar(res_charset, pos, end)))
1242
for (start=pos=(char*) res->ptr(),end=pos+used_length ;
1247
if (use_mb(res_charset))
1250
if ((l=my_ismbchar(res_charset, pos, end)))
1248
1259
Special case when dumping BINARY/VARBINARY/BLOB values
1276
1287
assert before the loop makes that sure.
1279
if ((needs_escaping(*pos, enclosed) ||
1290
if ((NEED_ESCAPING(*pos) ||
1280
1291
(check_second_byte &&
1281
1292
my_mbcharlen(character_set_client, (unsigned char) *pos) == 2 &&
1282
1293
pos + 1 < end &&
1283
needs_escaping(pos[1], enclosed))) &&
1294
NEED_ESCAPING(pos[1]))) &&
1285
Don't escape field_term_char by doubling - doubling is only
1286
valid for ENCLOSED BY characters:
1296
Don't escape field_term_char by doubling - doubling is only
1297
valid for ENCLOSED BY characters:
1288
1299
(enclosed || !is_ambiguous_field_term ||
1289
1300
(int) (unsigned char) *pos != field_term_char))
1292
1303
tmp_buff[0]= ((int) (unsigned char) *pos == field_sep_char &&
1293
1304
is_ambiguous_field_sep) ?
1294
field_sep_char : escape_char;
1295
tmp_buff[1]= *pos ? *pos : '0';
1296
if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1297
my_b_write(cache,(unsigned char*) tmp_buff,2))
1302
if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1305
field_sep_char : escape_char;
1306
tmp_buff[1]= *pos ? *pos : '0';
1307
if (my_b_write(&cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1308
my_b_write(&cache,(unsigned char*) tmp_buff,2))
1313
if (my_b_write(&cache,(unsigned char*) start,(uint32_t) (pos-start)))
1305
else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1316
else if (my_b_write(&cache,(unsigned char*) res->ptr(),used_length))
1308
1319
if (fixed_row_size)
1309
1320
{ // Fill with space
1310
1321
if (item->max_length > used_length)
1312
/* QQ: Fix by adding a my_b_fill() function */
1316
memset(space, ' ', sizeof(space));
1318
uint32_t length=item->max_length-used_length;
1319
for (; length > sizeof(space) ; length-=sizeof(space))
1321
if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1324
if (my_b_write(cache,(unsigned char*) space,length))
1323
/* QQ: Fix by adding a my_b_fill() function */
1327
memset(space, ' ', sizeof(space));
1329
uint32_t length=item->max_length-used_length;
1330
for (; length > sizeof(space) ; length-=sizeof(space))
1332
if (my_b_write(&cache,(unsigned char*) space,sizeof(space)))
1335
if (my_b_write(&cache,(unsigned char*) space,length))
1328
1339
if (res && enclosed)
1330
if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
1341
if (my_b_write(&cache, (unsigned char*) exchange->enclosed->ptr(),
1331
1342
exchange->enclosed->length()))
1334
1345
if (--items_left)
1336
if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
1347
if (my_b_write(&cache, (unsigned char*) exchange->field_term->ptr(),
1337
1348
field_term_length))
1341
if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1342
exchange->line_term->length()))
1352
if (my_b_write(&cache,(unsigned char*) exchange->line_term->ptr(),
1353
exchange->line_term->length()))
1615
1639
memset(&status_var, 0, sizeof(status_var));
1619
void Session::set_db(const std::string &new_db)
1642
void Security_context::skip_grants()
1644
/* privileges for the user are unknown everything is allowed */
1648
/****************************************************************************
1649
Handling of open and locked tables states.
1651
This is used when we want to open/lock (and then close) some tables when
1652
we already have a set of tables open and locked. We use these methods for
1653
access to mysql.proc table to find definitions of stored routines.
1654
****************************************************************************/
1656
void Session::reset_n_backup_open_tables_state(Open_tables_state *backup)
1658
backup->set_open_tables_state(this);
1659
reset_open_tables_state();
1660
state_flags|= Open_tables_state::BACKUPS_AVAIL;
1664
void Session::restore_backup_open_tables_state(Open_tables_state *backup)
1667
Before we will throw away current open tables state we want
1668
to be sure that it was properly cleaned up.
1670
assert(open_tables == 0 && temporary_tables == 0 &&
1671
derived_tables == 0 &&
1673
set_open_tables_state(backup);
1677
bool Session::set_db(const char *new_db, size_t new_db_len)
1621
1679
/* Do not reallocate memory if current chunk is big enough. */
1622
if (new_db.length())
1624
_schema.reset(new std::string(new_db));
1680
if (db && new_db && db_length >= new_db_len)
1681
memcpy(db, new_db, new_db_len+1);
1628
_schema.reset(new std::string(""));
1688
db= (char *)malloc(new_db_len + 1);
1691
memcpy(db, new_db, new_db_len);
1698
db_length= db ? new_db_len : 0;
1699
return new_db && !db;
1704
Check the killed state of a user thread
1705
@param session user thread
1706
@retval 0 the user thread is active
1707
@retval 1 the user thread has been killed
1709
extern "C" int session_killed(const Session *session)
1711
return(session->killed);
1715
Return the thread id of a user thread
1716
@param session user thread
1719
extern "C" unsigned long session_get_thread_id(const Session *session)
1721
return((unsigned long)session->thread_id);
1726
LEX_STRING *session_make_lex_string(Session *session, LEX_STRING *lex_str,
1727
const char *str, unsigned int size,
1728
int allocate_lex_string)
1730
return session->make_lex_string(lex_str, str, size,
1731
(bool) allocate_lex_string);
1734
extern "C" const struct charset_info_st *session_charset(Session *session)
1736
return(session->charset());
1739
extern "C" char **session_query(Session *session)
1741
return(&session->query);
1744
extern "C" int session_non_transactional_update(const Session *session)
1746
return(session->transaction.all.modified_non_trans_table);
1749
extern "C" void session_mark_transaction_to_rollback(Session *session, bool all)
1751
mark_transaction_to_rollback(session, all);
1634
1755
Mark transaction to rollback and mark error as fatal to a sub-statement.
1651
1772
plugin_sessionvar_cleanup(this);
1653
1774
/* If necessary, log any aborted or unauthorized connections */
1654
if (getKilled() || client->wasAborted())
1656
status_var.aborted_threads++;
1775
if (killed || protocol->wasAborted())
1776
statistic_increment(aborted_threads, &LOCK_status);
1659
if (client->wasAborted())
1778
if (protocol->wasAborted())
1661
if (not getKilled() && variables.log_warnings > 1)
1780
if (! killed && variables.log_warnings > 1)
1663
SecurityContext *sctx= &security_ctx;
1782
Security_context *sctx= &security_ctx;
1665
1784
errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1667
, (_schema->empty() ? "unconnected" : _schema->c_str())
1668
, sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1669
, sctx->getIp().c_str()
1786
, (db ? db : "unconnected")
1787
, sctx->user.empty() == false ? sctx->user.c_str() : "unauthenticated"
1670
1789
, (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1674
1793
/* Close out our connection to the client */
1675
1794
if (should_lock)
1676
session::Cache::singleton().mutex().lock();
1678
setKilled(Session::KILL_CONNECTION);
1680
if (client->isConnected())
1795
(void) pthread_mutex_lock(&LOCK_thread_count);
1796
killed= Session::KILL_CONNECTION;
1797
if (protocol->isConnected())
1684
1801
/*my_error(errcode, ER(errcode));*/
1685
client->sendError(errcode, ER(errcode));
1802
protocol->sendError(errcode, ER(errcode)); /* purecov: inspected */
1690
1806
if (should_lock)
1692
session::Cache::singleton().mutex().unlock();
1807
(void) pthread_mutex_unlock(&LOCK_thread_count);
1696
1810
void Session::reset_for_next_command()
1718
1842
Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1843
creates one DROP TEMPORARY Table binlog event for each pseudo-thread
1721
void Open_tables_state::close_temporary_tables()
1846
void Session::close_temporary_tables()
1724
1849
Table *tmp_next;
1726
if (not temporary_tables)
1851
if (!temporary_tables)
1729
1854
for (table= temporary_tables; table; table= tmp_next)
1731
tmp_next= table->getNext();
1856
tmp_next= table->next;
1857
close_temporary(table, true, true);
1734
1859
temporary_tables= NULL;
1738
unlink from session->temporary tables and close temporary table
1741
void Open_tables_state::close_temporary_table(Table *table)
1743
if (table->getPrev())
1745
table->getPrev()->setNext(table->getNext());
1746
if (table->getPrev()->getNext())
1748
table->getNext()->setPrev(table->getPrev());
1753
/* removing the item from the list */
1754
assert(table == temporary_tables);
1756
slave must reset its temporary list pointer to zero to exclude
1757
passing non-zero value to end_slave via rli->save_temporary_tables
1758
when no temp tables opened, see an invariant below.
1760
temporary_tables= table->getNext();
1761
if (temporary_tables)
1763
table->getNext()->setPrev(NULL);
1770
Close and drop a temporary table
1773
This dosn't unlink table from session->temporary
1774
If this is needed, use close_temporary_table()
1777
void Open_tables_state::nukeTable(Table *table)
1779
plugin::StorageEngine *table_type= table->getShare()->db_type();
1781
table->free_io_cache();
1782
table->delete_table();
1784
TableIdentifier identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1785
rm_temporary_table(table_type, identifier);
1787
delete table->getMutableShare();
1789
/* This makes me sad, but we're allocating it via malloc */
1793
1863
/** Clear most status variables. */
1794
1864
extern time_t flush_status_time;
1865
extern uint32_t max_used_connections;
1796
1867
void Session::refresh_status()
1869
pthread_mutex_lock(&LOCK_status);
1871
/* Add thread's status variabes to global status */
1872
add_to_status(&global_status_var, &status_var);
1798
1874
/* Reset thread's status variables */
1799
1875
memset(&status_var, 0, sizeof(status_var));
1877
/* Reset some global variables */
1878
reset_status_vars();
1880
/* Reset the counters of all key caches (default and named). */
1881
process_key_caches(reset_key_cache_counters);
1801
1882
flush_status_time= time((time_t*) 0);
1802
current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1803
current_global_counters.connections= 0;
1883
max_used_connections= 1; /* We set it to one, because we know we exist */
1884
pthread_mutex_unlock(&LOCK_status);
1887
#define extra_size sizeof(double)
1806
1889
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1808
return getVariable(std::string(name.str, name.length), create_if_not_exists);
1811
user_var_entry *Session::getVariable(const std::string &name, bool create_if_not_exists)
1813
UserVarsRange ppp= user_vars.equal_range(name);
1815
for (UserVars::iterator iter= ppp.first;
1816
iter != ppp.second; ++iter)
1818
return (*iter).second;
1821
if (not create_if_not_exists)
1824
1891
user_var_entry *entry= NULL;
1825
entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1830
std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1832
if (not returnable.second)
1893
assert(name.length == strlen (name.str));
1894
entry= (user_var_entry*) hash_search(&user_vars, (unsigned char*) name.str, name.length);
1896
if ((entry == NULL) && create_if_not_exists)
1898
uint32_t size=ALIGN_SIZE(sizeof(user_var_entry))+name.length+1+extra_size;
1899
if (!hash_inited(&user_vars))
1901
if (!(entry = (user_var_entry*) malloc(size)))
1903
entry->name.str=(char*) entry+ ALIGN_SIZE(sizeof(user_var_entry))+
1905
entry->name.length=name.length;
1908
entry->update_query_id=0;
1909
entry->collation.set(NULL, DERIVATION_IMPLICIT);
1910
entry->unsigned_flag= 0;
1912
If we are here, we were called from a SET or a query which sets a
1913
variable. Imagine it is this:
1914
INSERT INTO t SELECT @a:=10, @a:=@a+1.
1915
Then when we have a Item_func_get_user_var (because of the @a+1) so we
1916
think we have to write the value of @a to the binlog. But before that,
1917
we have a Item_func_set_user_var to create @a (@a:=10), in this we mark
1918
the variable as "already logged" (line below) so that it won't be logged
1919
by Item_func_get_user_var (because that's not necessary).
1921
entry->used_query_id= query_id;
1922
entry->type=STRING_RESULT;
1923
memcpy(entry->name.str, name.str, name.length+1);
1924
if (my_hash_insert(&user_vars, (unsigned char*) entry))
1927
free((char*) entry);
1840
void Session::setVariable(const std::string &name, const std::string &value)
1842
user_var_entry *updateable_var= getVariable(name.c_str(), true);
1844
updateable_var->update_hash(false,
1845
(void*)value.c_str(),
1846
static_cast<uint32_t>(value.length()), STRING_RESULT,
1848
DERIVATION_IMPLICIT, false);
1851
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1853
for (Table *table= temporary_tables ; table ; table= table->getNext())
1936
void Session::mark_temp_tables_as_free_for_reuse()
1938
for (Table *table= temporary_tables ; table ; table= table->next)
1855
if (table->query_id == getQueryId())
1940
if (table->query_id == query_id)
1857
1942
table->query_id= 0;
1858
table->cursor->ha_reset();
1943
table->file->ha_reset();
1863
1948
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1865
for (; table ; table= table->getNext())
1950
for (; table ; table= table->next)
1867
if (table->query_id == getQueryId())
1952
if (table->query_id == query_id)
1869
1954
table->query_id= 0;
1870
table->cursor->ha_reset();
1955
table->file->ha_reset();
1944
2055
close_thread_tables();
1947
bool Session::openTablesLock(TableList *tables)
2058
int Session::open_and_lock_tables(TableList *tables)
1949
2060
uint32_t counter;
1950
2061
bool need_reopen;
1954
if (open_tables_from_list(&tables, &counter))
2065
if (open_tables_from_list(&tables, &counter, 0))
1957
if (not lock_tables(tables, counter, &need_reopen))
2068
if (!lock_tables(this, tables, counter, &need_reopen))
1959
if (not need_reopen)
1961
2072
close_tables_for_reopen(&tables);
1963
2074
if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
2075
(fill_derived_tables() &&
1965
2076
mysql_handle_derived(lex, &mysql_derived_filling))))
1972
@note "best_effort" is used in cases were if a failure occurred on this
1973
operation it would not be surprising because we are only removing because there
1974
might be an issue (lame engines).
1977
bool Open_tables_state::rm_temporary_table(const TableIdentifier &identifier, bool best_effort)
1979
if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
1981
if (not best_effort)
1984
identifier.getSQLPath(path);
1985
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1986
path.c_str(), errno);
1995
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const TableIdentifier &identifier)
1999
if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier))
2002
identifier.getSQLPath(path);
2003
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
2004
path.c_str(), errno);
2013
@note this will be removed, I am looking through Hudson to see if it is finding
2014
any tables that are missed during cleanup.
2016
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
2020
if (not temporary_tables)
2023
cerr << "Begin Run: " << foo << "\n";
2024
for (table= temporary_tables; table; table= table->getNext())
2026
bool have_proto= false;
2028
message::Table *proto= table->getShare()->getTableProto();
2029
if (table->getShare()->getTableProto())
2032
const char *answer= have_proto ? "true" : "false";
2036
cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2037
cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2041
cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2046
bool Session::TableMessages::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2048
table_message_cache.insert(make_pair(identifier.getPath(), table_message));
2053
bool Session::TableMessages::removeTableMessage(const TableIdentifier &identifier)
2055
TableMessageCache::iterator iter;
2057
iter= table_message_cache.find(identifier.getPath());
2059
if (iter == table_message_cache.end())
2062
table_message_cache.erase(iter);
2067
bool Session::TableMessages::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2069
TableMessageCache::iterator iter;
2071
iter= table_message_cache.find(identifier.getPath());
2073
if (iter == table_message_cache.end())
2076
table_message.CopyFrom(((*iter).second));
2081
bool Session::TableMessages::doesTableMessageExist(const TableIdentifier &identifier)
2083
TableMessageCache::iterator iter;
2085
iter= table_message_cache.find(identifier.getPath());
2087
if (iter == table_message_cache.end())
2095
bool Session::TableMessages::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
2097
TableMessageCache::iterator iter;
2099
table_message_cache[to.getPath()]= table_message_cache[from.getPath()];
2101
iter= table_message_cache.find(to.getPath());
2103
if (iter == table_message_cache.end())
2108
(*iter).second.set_schema(to.getSchemaName());
2109
(*iter).second.set_name(to.getTableName());
2114
table::Instance *Session::getInstanceTable()
2116
temporary_shares.push_back(new table::Instance()); // This will not go into the tableshare cache, so no key is used.
2118
table::Instance *tmp_share= temporary_shares.back();
2127
Create a reduced Table object with properly set up Field list from a
2128
list of field definitions.
2130
The created table doesn't have a table Cursor associated with
2131
it, has no keys, no group/distinct, no copy_funcs array.
2132
The sole purpose of this Table object is to use the power of Field
2133
class to read/write data to/from table->getInsertRecord(). Then one can store
2134
the record in any container (RB tree, hash, etc).
2135
The table is created in Session mem_root, so are the table's fields.
2136
Consequently, if you don't BLOB fields, you don't need to free it.
2138
@param session connection handle
2139
@param field_list list of column definitions
2142
0 if out of memory, Table object in case of success
2144
table::Instance *Session::getInstanceTable(List<CreateField> &field_list)
2146
temporary_shares.push_back(new table::Instance(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2148
table::Instance *tmp_share= temporary_shares.back();
2157
static const std::string NONE= "NONE";
2158
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2159
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2161
const std::string &type(drizzled::Session::global_read_lock_t type)
2167
case Session::GOT_GLOBAL_READ_LOCK:
2168
return GOT_GLOBAL_READ_LOCK;
2169
case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2170
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2174
size_t max_string_length(drizzled::Session::global_read_lock_t)
2176
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2179
} /* namespace display */
2181
} /* namespace drizzled */
2077
return 1; /* purecov: inspected */
2082
bool Session::open_normal_and_derived_tables(TableList *tables, uint32_t flags)
2085
assert(!(fill_derived_tables()));
2086
if (open_tables_from_list(&tables, &counter, flags) ||
2087
mysql_handle_derived(lex, &mysql_derived_prepare))
2088
return true; /* purecov: inspected */