24
24
#include "config.h"
25
#include "drizzled/session.h"
26
#include "drizzled/session/cache.h"
25
#include <drizzled/session.h>
26
#include "drizzled/session_list.h"
27
27
#include <sys/stat.h>
28
#include "drizzled/error.h"
29
#include "drizzled/gettext.h"
30
#include "drizzled/query_id.h"
31
#include "drizzled/data_home.h"
32
#include "drizzled/sql_base.h"
33
#include "drizzled/lock.h"
34
#include "drizzled/item/cache.h"
35
#include "drizzled/item/float.h"
36
#include "drizzled/item/return_int.h"
37
#include "drizzled/item/empty_string.h"
38
#include "drizzled/show.h"
39
#include "drizzled/plugin/client.h"
28
#include <drizzled/error.h>
29
#include <drizzled/gettext.h>
30
#include <drizzled/query_id.h>
31
#include <drizzled/data_home.h>
32
#include <drizzled/sql_base.h>
33
#include <drizzled/lock.h>
34
#include <drizzled/item/cache.h>
35
#include <drizzled/item/float.h>
36
#include <drizzled/item/return_int.h>
37
#include <drizzled/item/empty_string.h>
38
#include <drizzled/show.h>
39
#include <drizzled/plugin/client.h>
40
40
#include "drizzled/plugin/scheduler.h"
41
41
#include "drizzled/plugin/authentication.h"
42
42
#include "drizzled/plugin/logging.h"
43
43
#include "drizzled/plugin/transactional_storage_engine.h"
44
#include "drizzled/plugin/query_rewrite.h"
45
44
#include "drizzled/probes.h"
46
45
#include "drizzled/table_proto.h"
47
46
#include "drizzled/db.h"
49
48
#include "drizzled/transaction_services.h"
50
49
#include "drizzled/drizzled.h"
52
#include "drizzled/table/instance.h"
51
#include "drizzled/table_share_instance.h"
54
53
#include "plugin/myisam/myisam.h"
55
54
#include "drizzled/internal/iocache.h"
56
55
#include "drizzled/internal/thread_var.h"
57
56
#include "drizzled/plugin/event_observer.h"
59
#include "drizzled/util/functors.h"
61
#include "drizzled/display.h"
64
59
#include <algorithm>
66
#include <boost/filesystem.hpp>
68
#include "drizzled/util/backtrace.h"
70
62
using namespace std;
72
namespace fs=boost::filesystem;
162
152
Session::Session(plugin::Client *client_arg) :
163
153
Open_tables_state(refresh_version),
164
154
mem_root(&main_mem_root),
167
query(new std::string),
168
_schema(new std::string("")),
170
157
client(client_arg),
172
159
scheduler_arg(NULL),
173
160
lock_id(&main_lock_id),
175
162
ha_data(plugin::num_trx_monitored_objects),
176
concurrent_execute_allowed(true),
177
163
arg_of_last_insert_id_function(false),
178
164
first_successful_insert_id_in_prev_stmt(0),
179
165
first_successful_insert_id_in_cur_stmt(0),
180
166
limit_found_rows(0),
181
_global_read_lock(NONE),
183
168
some_tables_deleted(false),
184
169
no_errors(false),
316
boost_unique_lock_t scopedLock(mysys_var->mutex);
302
pthread_mutex_lock(&mysys_var->mutex);
317
303
if (mysys_var->current_cond)
319
mysys_var->current_mutex->lock();
320
mysys_var->current_cond->notify_all();
321
mysys_var->current_mutex->unlock();
305
pthread_mutex_lock(mysys_var->current_mutex);
306
pthread_cond_broadcast(mysys_var->current_cond);
307
pthread_mutex_unlock(mysys_var->current_mutex);
309
pthread_mutex_unlock(&mysys_var->mutex);
325
312
void Session::pop_internal_handler()
407
392
plugin::Logging::postEndDo(this);
408
393
plugin::EventObserver::deregisterSessionEvents(*this);
410
for (PropertyMap::iterator iter= life_properties.begin(); iter != life_properties.end(); iter++)
412
delete (*iter).second;
414
life_properties.clear();
417
void Session::setClient(plugin::Client *client_arg)
420
client->setSession(this);
423
void Session::awake(Session::killed_state_t state_to_set)
425
if ((state_to_set == Session::KILL_QUERY) and (command == COM_SLEEP))
395
/* Ensure that no one is using Session */
396
LOCK_delete.unlock();
399
void Session::awake(Session::killed_state state_to_set)
428
401
this->checkSentry();
430
setKilled(state_to_set);
431
scheduler->killSession(this);
402
safe_mutex_assert_owner(&LOCK_delete);
404
killed= state_to_set;
433
405
if (state_to_set != Session::KILL_QUERY)
407
scheduler->killSession(this);
435
408
DRIZZLE_CONNECTION_DONE(thread_id);
440
boost_unique_lock_t scopedLock(mysys_var->mutex);
412
pthread_mutex_lock(&mysys_var->mutex);
443
414
This broadcast could be up in the air if the victim thread
444
415
exits the cond in the time between read and broadcast, but that is
445
416
ok since all we want to do is to make the victim thread get out
571
541
current_global_counters.connections++;
572
arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
574
session::Cache::singleton().insert(arg);
576
if (unlikely(plugin::EventObserver::connectSession(*arg)))
578
// We should do something about an error...
581
if (plugin::Scheduler::getScheduler()->addSession(arg))
583
DRIZZLE_CONNECTION_START(arg->getSessionId());
542
thread_id= variables.pseudo_thread_id= global_thread_id++;
544
LOCK_thread_count.lock();
545
getSessionList().push_back(this);
546
LOCK_thread_count.unlock();
548
if (scheduler->addSession(this))
550
DRIZZLE_CONNECTION_START(thread_id);
584
551
char error_message_buff[DRIZZLE_ERRMSG_SIZE];
586
arg->setKilled(Session::KILL_CONNECTION);
553
killed= Session::KILL_CONNECTION;
588
arg->status_var.aborted_connects++;
555
status_var.aborted_connects++;
590
557
/* Can't use my_error() since store_globals has not been called. */
591
558
/* TODO replace with better error message */
592
559
snprintf(error_message_buff, sizeof(error_message_buff),
593
560
ER(ER_CANT_CREATE_THREAD), 1);
594
arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
561
client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
604
Is this session viewable by the current user?
606
bool Session::isViewable() const
608
return plugin::Authorization::isAuthorized(current_session->getSecurityContext(),
614
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
569
const char* Session::enter_cond(boost::condition_variable &cond, boost::mutex &mutex, const char* msg)
616
571
const char* old_msg = get_proc_info();
617
572
safe_mutex_assert_owner(mutex);
618
mysys_var->current_mutex = &mutex;
619
mysys_var->current_cond = &cond;
573
mysys_var->current_mutex = mutex.native_handle();
574
mysys_var->current_cond = cond.native_handle();
620
575
this->set_proc_info(msg);
629
584
locked (if that would not be the case, you'll get a deadlock if someone
630
585
does a Session::awake() on you).
632
mysys_var->current_mutex->unlock();
633
boost_unique_lock_t scopedLock(mysys_var->mutex);
587
pthread_mutex_unlock(mysys_var->current_mutex);
588
pthread_mutex_lock(&mysys_var->mutex);
634
589
mysys_var->current_mutex = 0;
635
590
mysys_var->current_cond = 0;
636
591
this->set_proc_info(old_msg);
592
pthread_mutex_unlock(&mysys_var->mutex);
639
595
bool Session::authenticate()
642
598
if (client->authenticate())
694
650
main_da.reset_diagnostics_area();
696
652
if (client->readCommand(&l_packet, &packet_length) == false)
701
if (getKilled() == KILL_CONNECTION)
655
if (killed == KILL_CONNECTION)
704
658
if (packet_length == 0)
707
l_command= static_cast<enum_server_command>(l_packet[0]);
661
l_command= (enum enum_server_command) (unsigned char) l_packet[0];
709
663
if (command >= COM_END)
710
664
command= COM_END; // Wrong command
712
666
assert(packet_length);
713
return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
667
return ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
716
670
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
722
676
in_packet_length--;
724
678
const char *pos= in_packet + in_packet_length; /* Point at end null */
725
while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
679
while (in_packet_length > 0 &&
680
(pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
728
683
in_packet_length--;
731
std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
732
// We can not be entirely sure _schema has a value
735
plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
737
query.reset(new_query);
738
_state.reset(new State(in_packet, in_packet_length));
686
query.assign(in_packet, in_packet + in_packet_length);
1038
static int create_file(Session *session,
1039
fs::path &target_path,
1040
file_exchange *exchange,
1041
internal::IO_CACHE *cache)
984
static int create_file(Session *session, char *path, file_exchange *exchange, internal::IO_CACHE *cache)
1043
fs::path to_file(exchange->file_name);
1046
if (not to_file.has_root_directory())
987
uint32_t option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
989
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
990
option|= MY_REPLACE_DIR; // Force use of db directory
993
if (!internal::dirname_length(exchange->file_name))
1048
target_path= fs::system_complete(getDataHomeCatalog());
1049
util::string::const_shared_ptr schema(session->schema());
1050
if (schema and not schema->empty())
1052
int count_elements= 0;
1053
for (fs::path::iterator iter= to_file.begin();
1054
iter != to_file.end();
1055
++iter, ++count_elements)
1058
if (count_elements == 1)
1060
target_path /= *schema;
1063
target_path /= to_file;
995
strcpy(path, data_home_real);
996
if (! session->db.empty())
997
strncat(path, session->db.c_str(), FN_REFLEN-strlen(data_home_real)-1);
998
(void) internal::fn_format(path, exchange->file_name, path, "", option);
1067
target_path = exchange->file_name;
1070
if (not secure_file_priv.string().empty())
1072
if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
1074
/* Write only allowed to dir or subdir specified by secure_file_priv */
1075
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1080
if (!access(target_path.file_string().c_str(), F_OK))
1001
(void) internal::fn_format(path, exchange->file_name, data_home_real, "", option);
1003
if (opt_secure_file_priv &&
1004
strncmp(opt_secure_file_priv, path, strlen(opt_secure_file_priv)))
1006
/* Write only allowed to dir or subdir specified by secure_file_priv */
1007
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1011
if (!access(path, F_OK))
1082
1013
my_error(ER_FILE_EXISTS_ERROR, MYF(0), exchange->file_name);
1085
1016
/* Create the file world readable */
1086
if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1017
if ((file= internal::my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1088
1019
(void) fchmod(file, 0666); // Because of umask()
1089
if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1020
if (init_io_cache(cache, file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1091
1022
internal::my_close(file, MYF(0));
1092
internal::my_delete(target_path.file_string().c_str(), MYF(0)); // Delete file on error, it was just created
1023
internal::my_delete(path, MYF(0)); // Delete file on error, it was just created
1319
1246
for (; length > sizeof(space) ; length-=sizeof(space))
1321
1248
if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1324
1251
if (my_b_write(cache,(unsigned char*) space,length))
1328
1255
if (res && enclosed)
1330
1257
if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
1331
1258
exchange->enclosed->length()))
1334
1261
if (--items_left)
1336
1263
if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
1337
1264
field_term_length))
1341
1268
if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1342
1269
exchange->line_term->length()))
1451
1379
switch (val_item->result_type())
1453
1381
case REAL_RESULT:
1454
op= &select_max_min_finder_subselect::cmp_real;
1382
op= &select_max_min_finder_subselect::cmp_real;
1456
1384
case INT_RESULT:
1457
op= &select_max_min_finder_subselect::cmp_int;
1385
op= &select_max_min_finder_subselect::cmp_int;
1459
1387
case STRING_RESULT:
1460
op= &select_max_min_finder_subselect::cmp_str;
1388
op= &select_max_min_finder_subselect::cmp_str;
1462
1390
case DECIMAL_RESULT:
1463
1391
op= &select_max_min_finder_subselect::cmp_decimal;
1465
1393
case ROW_RESULT:
1466
1394
// This case should never be chosen
1471
1399
cache->store(val_item);
1554
1482
void Session::end_statement()
1556
1484
/* Cleanup SQL processing state to reuse this statement in next query. */
1558
1486
query_cache_key= ""; // reset the cache key
1559
1487
resetResultsetMessage();
1562
1490
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
1565
if (_schema and _schema->empty())
1567
my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1570
else if (not _schema)
1572
my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1577
*p_db= strmake(_schema->c_str(), _schema->size());
1578
*p_db_length= _schema->size();
1494
my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1497
*p_db= strmake(db.c_str(), db.length());
1498
*p_db_length= db.length();
1619
void Session::set_db(const std::string &new_db)
1538
bool Session::set_db(const std::string &new_db)
1621
1540
/* Do not reallocate memory if current chunk is big enough. */
1622
1541
if (new_db.length())
1624
_schema.reset(new std::string(new_db));
1628
_schema.reset(new std::string(""));
1553
Check the killed state of a user thread
1554
@param session user thread
1555
@retval 0 the user thread is active
1556
@retval 1 the user thread has been killed
1558
int session_killed(const Session *session)
1560
return(session->killed);
1564
const struct charset_info_st *session_charset(Session *session)
1566
return(session->charset());
1634
1570
Mark transaction to rollback and mark error as fatal to a sub-statement.
1651
1587
plugin_sessionvar_cleanup(this);
1653
1589
/* If necessary, log any aborted or unauthorized connections */
1654
if (getKilled() || client->wasAborted())
1590
if (killed || client->wasAborted())
1656
1592
status_var.aborted_threads++;
1659
1595
if (client->wasAborted())
1661
if (not getKilled() && variables.log_warnings > 1)
1597
if (! killed && variables.log_warnings > 1)
1663
1599
SecurityContext *sctx= &security_ctx;
1665
1601
errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1667
, (_schema->empty() ? "unconnected" : _schema->c_str())
1603
, (db.empty() ? "unconnected" : db.c_str())
1668
1604
, sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1669
1605
, sctx->getIp().c_str()
1670
1606
, (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1806
1737
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1808
return getVariable(std::string(name.str, name.length), create_if_not_exists);
1811
user_var_entry *Session::getVariable(const std::string &name, bool create_if_not_exists)
1813
UserVarsRange ppp= user_vars.equal_range(name);
1739
user_var_entry *entry= NULL;
1740
UserVarsRange ppp= user_vars.equal_range(std::string(name.str, name.length));
1815
1742
for (UserVars::iterator iter= ppp.first;
1816
iter != ppp.second; ++iter)
1743
iter != ppp.second; ++iter)
1818
return (*iter).second;
1745
entry= (*iter).second;
1821
if (not create_if_not_exists)
1824
user_var_entry *entry= NULL;
1825
entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1830
std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1832
if (not returnable.second)
1748
if ((entry == NULL) && create_if_not_exists)
1750
entry= new (nothrow) user_var_entry(name.str, query_id);
1755
std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(std::string(name.str, name.length), entry));
1757
if (not returnable.second)
1840
void Session::setVariable(const std::string &name, const std::string &value)
1842
user_var_entry *updateable_var= getVariable(name.c_str(), true);
1844
updateable_var->update_hash(false,
1845
(void*)value.c_str(),
1846
static_cast<uint32_t>(value.length()), STRING_RESULT,
1848
DERIVATION_IMPLICIT, false);
1851
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1767
void Session::mark_temp_tables_as_free_for_reuse()
1853
1769
for (Table *table= temporary_tables ; table ; table= table->getNext())
1855
if (table->query_id == getQueryId())
1771
if (table->query_id == query_id)
1857
1773
table->query_id= 0;
1858
1774
table->cursor->ha_reset();
1916
1833
handled either before writing a query log event (inside
1917
1834
binlog_query()) or when preparing a pending event.
1836
mysql_unlock_tables(this, lock);
1923
Note that we need to hold table::Cache::singleton().mutex() while changing the
1840
Note that we need to hold LOCK_open while changing the
1924
1841
open_tables list. Another thread may work on it.
1925
(See: table::Cache::singleton().removeTable(), mysql_wait_completed_table())
1842
(See: remove_table_from_cache(), mysql_wait_completed_table())
1926
1843
Closing a MERGE child before the parent would be fatal if the
1927
1844
other thread tries to abort the MERGE lock in between.
1961
1878
close_tables_for_reopen(&tables);
1963
1880
if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
1881
(fill_derived_tables() &&
1965
1882
mysql_handle_derived(lex, &mysql_derived_filling))))
1888
bool Session::openTables(TableList *tables, uint32_t flags)
1891
bool ret= fill_derived_tables();
1892
assert(ret == false);
1893
if (open_tables_from_list(&tables, &counter, flags) ||
1894
mysql_handle_derived(lex, &mysql_derived_prepare))
1972
1902
@note "best_effort" is used in cases where, if a failure occurred on this
1973
1903
operation it would not be surprising because we are only removing because there
1974
1904
might be an issue (lame engines).
1977
bool Open_tables_state::rm_temporary_table(const TableIdentifier &identifier, bool best_effort)
1907
bool Session::rm_temporary_table(TableIdentifier &identifier, bool best_effort)
1979
if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
1909
if (plugin::StorageEngine::dropTable(*this, identifier))
1981
1911
if (not best_effort)
1984
identifier.getSQLPath(path);
1985
1913
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1986
path.c_str(), errno);
1914
identifier.getSQLPath().c_str(), errno);
1995
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const TableIdentifier &identifier)
1923
bool Session::rm_temporary_table(plugin::StorageEngine *base, TableIdentifier &identifier)
1999
if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier))
1927
if (plugin::StorageEngine::dropTable(*this, *base, identifier))
2002
identifier.getSQLPath(path);
2003
1929
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
2004
path.c_str(), errno);
1930
identifier.getSQLPath().c_str(), errno);
2037
1963
cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2041
1966
cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2046
bool Session::TableMessages::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
1970
bool Session::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2048
1972
table_message_cache.insert(make_pair(identifier.getPath(), table_message));
2053
bool Session::TableMessages::removeTableMessage(const TableIdentifier &identifier)
1977
bool Session::removeTableMessage(const TableIdentifier &identifier)
2055
1979
TableMessageCache::iterator iter;
2114
table::Instance *Session::getInstanceTable()
2116
temporary_shares.push_back(new table::Instance()); // This will not go into the tableshare cache, so no key is used.
2118
table::Instance *tmp_share= temporary_shares.back();
2127
Create a reduced Table object with properly set up Field list from a
2128
list of field definitions.
2130
The created table doesn't have a table Cursor associated with
2131
it, has no keys, no group/distinct, no copy_funcs array.
2132
The sole purpose of this Table object is to use the power of Field
2133
class to read/write data to/from table->getInsertRecord(). Then one can store
2134
the record in any container (RB tree, hash, etc).
2135
The table is created in Session mem_root, so are the table's fields.
2136
Consequently, if you don't use BLOB fields, you don't need to free it.
2138
@param session connection handle
2139
@param field_list list of column definitions
2142
0 if out of memory, Table object in case of success
2144
table::Instance *Session::getInstanceTable(List<CreateField> &field_list)
2146
temporary_shares.push_back(new table::Instance(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2148
table::Instance *tmp_share= temporary_shares.back();
2157
static const std::string NONE= "NONE";
2158
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2159
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2161
const std::string &type(drizzled::Session::global_read_lock_t type)
2167
case Session::GOT_GLOBAL_READ_LOCK:
2168
return GOT_GLOBAL_READ_LOCK;
2169
case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2170
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2174
size_t max_string_length(drizzled::Session::global_read_lock_t)
2176
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2179
} /* namespace display */
2038
TableShareInstance *Session::getTemporaryShare(TableIdentifier::Type type_arg)
2040
temporary_shares.push_back(new TableShareInstance(type_arg)); // This will not go into the tableshare cache, so no key is used.
2042
TableShareInstance *tmp_share= temporary_shares.back();
2181
2049
} /* namespace drizzled */