24
24
#include "config.h"
25
#include "drizzled/session.h"
26
#include "drizzled/session/cache.h"
25
#include <drizzled/session.h>
26
#include "drizzled/session_list.h"
27
27
#include <sys/stat.h>
28
#include "drizzled/error.h"
29
#include "drizzled/gettext.h"
30
#include "drizzled/query_id.h"
31
#include "drizzled/data_home.h"
32
#include "drizzled/sql_base.h"
33
#include "drizzled/lock.h"
34
#include "drizzled/item/cache.h"
35
#include "drizzled/item/float.h"
36
#include "drizzled/item/return_int.h"
37
#include "drizzled/item/empty_string.h"
38
#include "drizzled/show.h"
39
#include "drizzled/plugin/client.h"
28
#include <drizzled/error.h>
29
#include <drizzled/gettext.h>
30
#include <drizzled/query_id.h>
31
#include <drizzled/data_home.h>
32
#include <drizzled/sql_base.h>
33
#include <drizzled/lock.h>
34
#include <drizzled/item/cache.h>
35
#include <drizzled/item/float.h>
36
#include <drizzled/item/return_int.h>
37
#include <drizzled/item/empty_string.h>
38
#include <drizzled/show.h>
39
#include <drizzled/plugin/client.h>
40
40
#include "drizzled/plugin/scheduler.h"
41
41
#include "drizzled/plugin/authentication.h"
42
42
#include "drizzled/plugin/logging.h"
43
43
#include "drizzled/plugin/transactional_storage_engine.h"
44
#include "drizzled/plugin/query_rewrite.h"
45
44
#include "drizzled/probes.h"
46
45
#include "drizzled/table_proto.h"
47
46
#include "drizzled/db.h"
49
48
#include "drizzled/transaction_services.h"
50
49
#include "drizzled/drizzled.h"
52
#include "drizzled/table/instance.h"
51
#include "drizzled/table_share_instance.h"
54
53
#include "plugin/myisam/myisam.h"
55
54
#include "drizzled/internal/iocache.h"
56
55
#include "drizzled/internal/thread_var.h"
57
56
#include "drizzled/plugin/event_observer.h"
59
#include "drizzled/util/functors.h"
61
#include "drizzled/display.h"
64
59
#include <algorithm>
66
#include <boost/filesystem.hpp>
68
#include "drizzled/util/backtrace.h"
61
#include "boost/filesystem.hpp"
70
63
using namespace std;
87
80
return length == other.length &&
88
81
field_name.length == other.field_name.length &&
89
!my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
82
!strcmp(field_name.str, other.field_name.str);
92
85
Open_tables_state::Open_tables_state(uint64_t version_arg) :
162
155
Session::Session(plugin::Client *client_arg) :
163
156
Open_tables_state(refresh_version),
164
157
mem_root(&main_mem_root),
167
query(new std::string),
168
_schema(new std::string("")),
170
160
client(client_arg),
172
162
scheduler_arg(NULL),
173
163
lock_id(&main_lock_id),
175
165
ha_data(plugin::num_trx_monitored_objects),
176
concurrent_execute_allowed(true),
177
166
arg_of_last_insert_id_function(false),
178
167
first_successful_insert_id_in_prev_stmt(0),
179
168
first_successful_insert_id_in_cur_stmt(0),
180
169
limit_found_rows(0),
181
_global_read_lock(NONE),
183
171
some_tables_deleted(false),
184
172
no_errors(false),
412
400
delete (*iter).second;
414
402
life_properties.clear();
417
void Session::setClient(plugin::Client *client_arg)
420
client->setSession(this);
423
void Session::awake(Session::killed_state_t state_to_set)
425
if ((state_to_set == Session::KILL_QUERY) and (command == COM_SLEEP))
404
/* Ensure that no one is using Session */
405
LOCK_delete.unlock();
408
void Session::awake(Session::killed_state state_to_set)
428
410
this->checkSentry();
430
setKilled(state_to_set);
431
scheduler->killSession(this);
411
safe_mutex_assert_owner(&LOCK_delete);
413
killed= state_to_set;
433
414
if (state_to_set != Session::KILL_QUERY)
416
scheduler->killSession(this);
435
417
DRIZZLE_CONNECTION_DONE(thread_id);
440
421
boost_unique_lock_t scopedLock(mysys_var->mutex);
548
529
prepareForQueries();
550
while (not client->haveError() && getKilled() != KILL_CONNECTION)
531
while (! client->haveError() && killed != KILL_CONNECTION)
552
if (not executeStatement())
533
if (! executeStatement())
556
537
disconnect(0, true);
559
bool Session::schedule(Session::shared_ptr &arg)
540
bool Session::schedule()
561
arg->scheduler= plugin::Scheduler::getScheduler();
562
assert(arg->scheduler);
542
scheduler= plugin::Scheduler::getScheduler();
564
545
connection_count.increment();
571
552
current_global_counters.connections++;
572
arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
574
session::Cache::singleton().insert(arg);
576
if (unlikely(plugin::EventObserver::connectSession(*arg)))
553
thread_id= variables.pseudo_thread_id= global_thread_id++;
555
LOCK_thread_count.lock();
556
getSessionList().push_back(this);
557
LOCK_thread_count.unlock();
559
if (unlikely(plugin::EventObserver::connectSession(*this)))
578
561
// We should do something about an error...
581
if (plugin::Scheduler::getScheduler()->addSession(arg))
564
if (scheduler->addSession(this))
583
DRIZZLE_CONNECTION_START(arg->getSessionId());
566
DRIZZLE_CONNECTION_START(thread_id);
584
567
char error_message_buff[DRIZZLE_ERRMSG_SIZE];
586
arg->setKilled(Session::KILL_CONNECTION);
569
killed= Session::KILL_CONNECTION;
588
arg->status_var.aborted_connects++;
571
status_var.aborted_connects++;
590
573
/* Can't use my_error() since store_globals has not been called. */
591
574
/* TODO replace with better error message */
592
575
snprintf(error_message_buff, sizeof(error_message_buff),
593
576
ER(ER_CANT_CREATE_THREAD), 1);
594
arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
577
client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
604
Is this session viewable by the current user?
606
bool Session::isViewable() const
608
return plugin::Authorization::isAuthorized(current_session->getSecurityContext(),
614
585
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
616
587
const char* old_msg = get_proc_info();
650
bool Session::checkUser(const std::string &passwd_str,
651
const std::string &in_db)
621
bool Session::checkUser(const char *passwd, uint32_t passwd_len, const char *in_db)
623
const string passwd_str(passwd, passwd_len);
653
624
bool is_authenticated=
654
625
plugin::Authentication::isAuthenticated(getSecurityContext(),
694
665
main_da.reset_diagnostics_area();
696
667
if (client->readCommand(&l_packet, &packet_length) == false)
701
if (getKilled() == KILL_CONNECTION)
670
if (killed == KILL_CONNECTION)
704
673
if (packet_length == 0)
707
l_command= static_cast<enum_server_command>(l_packet[0]);
676
l_command= (enum enum_server_command) (unsigned char) l_packet[0];
709
678
if (command >= COM_END)
710
679
command= COM_END; // Wrong command
712
681
assert(packet_length);
713
return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
682
return ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
716
685
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
722
691
in_packet_length--;
724
693
const char *pos= in_packet + in_packet_length; /* Point at end null */
725
while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
694
while (in_packet_length > 0 &&
695
(pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
728
698
in_packet_length--;
731
std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
732
// We can not be entirely sure _schema has a value
735
plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
737
query.reset(new_query);
738
_state.reset(new State(in_packet, in_packet_length));
701
query.assign(in_packet, in_packet + in_packet_length);
867
826
where= Session::DEFAULT_WHERE;
869
828
/* Reset the temporary shares we built */
870
for_each(temporary_shares.begin(),
871
temporary_shares.end(),
829
for (std::vector<TableShareInstance *>::iterator iter= temporary_shares.begin();
830
iter != temporary_shares.end(); iter++)
873
834
temporary_shares.clear();
955
916
my_message(errcode, err, MYF(0));
958
(void) cache->end_io_cache();
919
(void) end_io_cache(cache);
959
920
(void) internal::my_close(file, MYF(0));
960
921
(void) internal::my_delete(path.file_string().c_str(), MYF(0)); // Delete file on error
1046
1007
if (not to_file.has_root_directory())
1048
1009
target_path= fs::system_complete(getDataHomeCatalog());
1049
util::string::const_shared_ptr schema(session->schema());
1050
if (schema and not schema->empty())
1010
if (not session->db.empty())
1052
1012
int count_elements= 0;
1053
1013
for (fs::path::iterator iter= to_file.begin();
1086
1046
if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1088
1048
(void) fchmod(file, 0666); // Because of umask()
1089
if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1049
if (init_io_cache(cache, file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1091
1051
internal::my_close(file, MYF(0));
1092
1052
internal::my_delete(target_path.file_string().c_str(), MYF(0)); // Delete file on error, it was just created
1295
1253
tmp_buff[1]= *pos ? *pos : '0';
1296
1254
if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1297
1255
my_b_write(cache,(unsigned char*) tmp_buff,2))
1302
1260
if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1305
1263
else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1308
1266
if (fixed_row_size)
1309
1267
{ // Fill with space
1319
1277
for (; length > sizeof(space) ; length-=sizeof(space))
1321
1279
if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1324
1282
if (my_b_write(cache,(unsigned char*) space,length))
1328
1286
if (res && enclosed)
1330
1288
if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
1331
1289
exchange->enclosed->length()))
1334
1292
if (--items_left)
1336
1294
if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
1337
1295
field_term_length))
1341
1299
if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1342
1300
exchange->line_term->length()))
1451
1408
switch (val_item->result_type())
1453
1410
case REAL_RESULT:
1454
op= &select_max_min_finder_subselect::cmp_real;
1411
op= &select_max_min_finder_subselect::cmp_real;
1456
1413
case INT_RESULT:
1457
op= &select_max_min_finder_subselect::cmp_int;
1414
op= &select_max_min_finder_subselect::cmp_int;
1459
1416
case STRING_RESULT:
1460
op= &select_max_min_finder_subselect::cmp_str;
1417
op= &select_max_min_finder_subselect::cmp_str;
1462
1419
case DECIMAL_RESULT:
1463
1420
op= &select_max_min_finder_subselect::cmp_decimal;
1465
1422
case ROW_RESULT:
1466
1423
// This case should never be chosen
1471
1428
cache->store(val_item);
1554
1511
void Session::end_statement()
1556
1513
/* Cleanup SQL processing state to reuse this statement in next query. */
1558
1515
query_cache_key= ""; // reset the cache key
1559
1516
resetResultsetMessage();
1562
1519
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
1565
if (_schema and _schema->empty())
1567
my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1570
else if (not _schema)
1572
my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1577
*p_db= strmake(_schema->c_str(), _schema->size());
1578
*p_db_length= _schema->size();
1523
my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1526
*p_db= strmake(db.c_str(), db.length());
1527
*p_db_length= db.length();
1619
void Session::set_db(const std::string &new_db)
1567
bool Session::set_db(const std::string &new_db)
1621
1569
/* Do not reallocate memory if current chunk is big enough. */
1622
1570
if (new_db.length())
1624
_schema.reset(new std::string(new_db));
1628
_schema.reset(new std::string(""));
1582
Check the killed state of a user thread
1583
@param session user thread
1584
@retval 0 the user thread is active
1585
@retval 1 the user thread has been killed
1587
int session_killed(const Session *session)
1589
return(session->killed);
1593
const struct charset_info_st *session_charset(Session *session)
1595
return(session->charset());
1634
1599
Mark transaction to rollback and mark error as fatal to a sub-statement.
1651
1616
plugin_sessionvar_cleanup(this);
1653
1618
/* If necessary, log any aborted or unauthorized connections */
1654
if (getKilled() || client->wasAborted())
1619
if (killed || client->wasAborted())
1656
1621
status_var.aborted_threads++;
1659
1624
if (client->wasAborted())
1661
if (not getKilled() && variables.log_warnings > 1)
1626
if (! killed && variables.log_warnings > 1)
1663
1628
SecurityContext *sctx= &security_ctx;
1665
1630
errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1667
, (_schema->empty() ? "unconnected" : _schema->c_str())
1632
, (db.empty() ? "unconnected" : db.c_str())
1668
1633
, sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1669
1634
, sctx->getIp().c_str()
1670
1635
, (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1674
1639
/* Close out our connection to the client */
1675
1640
if (should_lock)
1676
session::Cache::singleton().mutex().lock();
1678
setKilled(Session::KILL_CONNECTION);
1641
LOCK_thread_count.lock();
1642
killed= Session::KILL_CONNECTION;
1680
1643
if (client->isConnected())
1806
1766
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1808
return getVariable(std::string(name.str, name.length), create_if_not_exists);
1811
user_var_entry *Session::getVariable(const std::string &name, bool create_if_not_exists)
1813
UserVarsRange ppp= user_vars.equal_range(name);
1768
user_var_entry *entry= NULL;
1769
UserVarsRange ppp= user_vars.equal_range(std::string(name.str, name.length));
1815
1771
for (UserVars::iterator iter= ppp.first;
1816
iter != ppp.second; ++iter)
1772
iter != ppp.second; ++iter)
1818
return (*iter).second;
1774
entry= (*iter).second;
1821
if (not create_if_not_exists)
1824
user_var_entry *entry= NULL;
1825
entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1830
std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1832
if (not returnable.second)
1777
if ((entry == NULL) && create_if_not_exists)
1779
entry= new (nothrow) user_var_entry(name.str, query_id);
1784
std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(std::string(name.str, name.length), entry));
1786
if (not returnable.second)
1840
void Session::setVariable(const std::string &name, const std::string &value)
1842
user_var_entry *updateable_var= getVariable(name.c_str(), true);
1844
updateable_var->update_hash(false,
1845
(void*)value.c_str(),
1846
static_cast<uint32_t>(value.length()), STRING_RESULT,
1848
DERIVATION_IMPLICIT, false);
1851
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1796
void Session::mark_temp_tables_as_free_for_reuse()
1853
1798
for (Table *table= temporary_tables ; table ; table= table->getNext())
1855
if (table->query_id == getQueryId())
1800
if (table->query_id == query_id)
1857
1802
table->query_id= 0;
1858
1803
table->cursor->ha_reset();
1916
1862
handled either before writing a query log event (inside
1917
1863
binlog_query()) or when preparing a pending event.
1865
mysql_unlock_tables(this, lock);
1923
Note that we need to hold table::Cache::singleton().mutex() while changing the
1869
Note that we need to hold LOCK_open while changing the
1924
1870
open_tables list. Another thread may work on it.
1925
(See: table::Cache::singleton().removeTable(), mysql_wait_completed_table())
1871
(See: remove_table_from_cache(), mysql_wait_completed_table())
1926
1872
Closing a MERGE child before the parent would be fatal if the
1927
1873
other thread tries to abort the MERGE lock in between.
1974
1920
might be an issue (lame engines).
1977
bool Open_tables_state::rm_temporary_table(const TableIdentifier &identifier, bool best_effort)
1923
bool Session::rm_temporary_table(TableIdentifier &identifier, bool best_effort)
1979
if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
1925
if (plugin::StorageEngine::dropTable(*this, identifier))
1981
1927
if (not best_effort)
1984
identifier.getSQLPath(path);
1985
1929
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1986
path.c_str(), errno);
1930
identifier.getSQLPath().c_str(), errno);
1995
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const TableIdentifier &identifier)
1939
bool Session::rm_temporary_table(plugin::StorageEngine *base, TableIdentifier &identifier)
1999
if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier))
1943
if (plugin::StorageEngine::dropTable(*this, *base, identifier))
2002
identifier.getSQLPath(path);
2003
1945
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
2004
path.c_str(), errno);
1946
identifier.getSQLPath().c_str(), errno);
2037
1979
cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2041
1982
cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2046
bool Session::TableMessages::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
1986
bool Session::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2048
1988
table_message_cache.insert(make_pair(identifier.getPath(), table_message));
2053
bool Session::TableMessages::removeTableMessage(const TableIdentifier &identifier)
1993
bool Session::removeTableMessage(const TableIdentifier &identifier)
2055
1995
TableMessageCache::iterator iter;
2114
table::Instance *Session::getInstanceTable()
2116
temporary_shares.push_back(new table::Instance()); // This will not go into the tableshare cache, so no key is used.
2118
table::Instance *tmp_share= temporary_shares.back();
2127
Create a reduced Table object with properly set up Field list from a
2128
list of field definitions.
2130
The created table doesn't have a table Cursor associated with
2131
it, has no keys, no group/distinct, no copy_funcs array.
2132
The sole purpose of this Table object is to use the power of Field
2133
class to read/write data to/from table->getInsertRecord(). Then one can store
2134
the record in any container (RB tree, hash, etc).
2135
The table is created in Session mem_root, so are the table's fields.
2136
Consequently, if you don't use BLOB fields, you don't need to free it.
2138
@param session connection handle
2139
@param field_list list of column definitions
2142
0 if out of memory, Table object in case of success
2144
table::Instance *Session::getInstanceTable(List<CreateField> &field_list)
2146
temporary_shares.push_back(new table::Instance(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2148
table::Instance *tmp_share= temporary_shares.back();
2157
static const std::string NONE= "NONE";
2158
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2159
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2161
const std::string &type(drizzled::Session::global_read_lock_t type)
2167
case Session::GOT_GLOBAL_READ_LOCK:
2168
return GOT_GLOBAL_READ_LOCK;
2169
case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2170
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2174
size_t max_string_length(drizzled::Session::global_read_lock_t)
2176
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2179
} /* namespace display */
2054
TableShareInstance *Session::getTemporaryShare(TableIdentifier::Type type_arg)
2056
temporary_shares.push_back(new TableShareInstance(type_arg)); // This will not go into the tableshare cache, so no key is used.
2058
TableShareInstance *tmp_share= temporary_shares.back();
2181
2065
} /* namespace drizzled */