24
24
#include "config.h"
25
#include "drizzled/session.h"
26
#include "drizzled/session/cache.h"
25
#include <drizzled/session.h>
26
#include "drizzled/session_list.h"
27
27
#include <sys/stat.h>
28
#include "drizzled/error.h"
29
#include "drizzled/gettext.h"
30
#include "drizzled/query_id.h"
31
#include "drizzled/data_home.h"
32
#include "drizzled/sql_base.h"
33
#include "drizzled/lock.h"
34
#include "drizzled/item/cache.h"
35
#include "drizzled/item/float.h"
36
#include "drizzled/item/return_int.h"
37
#include "drizzled/item/empty_string.h"
38
#include "drizzled/show.h"
39
#include "drizzled/plugin/client.h"
28
#include <drizzled/error.h>
29
#include <drizzled/gettext.h>
30
#include <drizzled/query_id.h>
31
#include <drizzled/data_home.h>
32
#include <drizzled/sql_base.h>
33
#include <drizzled/lock.h>
34
#include <drizzled/item/cache.h>
35
#include <drizzled/item/float.h>
36
#include <drizzled/item/return_int.h>
37
#include <drizzled/item/empty_string.h>
38
#include <drizzled/show.h>
39
#include <drizzled/plugin/client.h>
40
40
#include "drizzled/plugin/scheduler.h"
41
41
#include "drizzled/plugin/authentication.h"
42
42
#include "drizzled/plugin/logging.h"
43
43
#include "drizzled/plugin/transactional_storage_engine.h"
44
#include "drizzled/plugin/query_rewrite.h"
45
44
#include "drizzled/probes.h"
46
45
#include "drizzled/table_proto.h"
47
46
#include "drizzled/db.h"
59
58
#include "drizzled/util/functors.h"
61
#include "drizzled/display.h"
64
61
#include <algorithm>
66
#include <boost/filesystem.hpp>
68
#include "drizzled/util/backtrace.h"
63
#include "boost/filesystem.hpp"
70
65
using namespace std;
87
82
return length == other.length &&
88
83
field_name.length == other.field_name.length &&
89
!my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
84
!strcmp(field_name.str, other.field_name.str);
92
87
Open_tables_state::Open_tables_state(uint64_t version_arg) :
162
157
Session::Session(plugin::Client *client_arg) :
163
158
Open_tables_state(refresh_version),
164
159
mem_root(&main_mem_root),
167
query(new std::string),
168
_schema(new std::string("")),
170
162
client(client_arg),
172
164
scheduler_arg(NULL),
173
165
lock_id(&main_lock_id),
175
167
ha_data(plugin::num_trx_monitored_objects),
176
concurrent_execute_allowed(true),
177
168
arg_of_last_insert_id_function(false),
178
169
first_successful_insert_id_in_prev_stmt(0),
179
170
first_successful_insert_id_in_cur_stmt(0),
180
171
limit_found_rows(0),
181
_global_read_lock(NONE),
183
173
some_tables_deleted(false),
184
174
no_errors(false),
195
185
session_event_observers(NULL),
188
memset(process_list_info, 0, PROCESS_LIST_WIDTH);
198
189
client->setSession(this);
412
402
delete (*iter).second;
414
404
life_properties.clear();
417
void Session::setClient(plugin::Client *client_arg)
420
client->setSession(this);
423
void Session::awake(Session::killed_state_t state_to_set)
425
if ((state_to_set == Session::KILL_QUERY) and (command == COM_SLEEP))
406
/* Ensure that no one is using Session */
407
LOCK_delete.unlock();
410
void Session::awake(Session::killed_state state_to_set)
428
412
this->checkSentry();
430
setKilled(state_to_set);
431
scheduler->killSession(this);
413
safe_mutex_assert_owner(&LOCK_delete);
415
killed= state_to_set;
433
416
if (state_to_set != Session::KILL_QUERY)
418
scheduler->killSession(this);
435
419
DRIZZLE_CONNECTION_DONE(thread_id);
440
423
boost_unique_lock_t scopedLock(mysys_var->mutex);
548
531
prepareForQueries();
550
while (not client->haveError() && getKilled() != KILL_CONNECTION)
533
while (! client->haveError() && killed != KILL_CONNECTION)
552
if (not executeStatement())
535
if (! executeStatement())
556
539
disconnect(0, true);
559
bool Session::schedule(Session::shared_ptr &arg)
542
bool Session::schedule()
561
arg->scheduler= plugin::Scheduler::getScheduler();
562
assert(arg->scheduler);
544
scheduler= plugin::Scheduler::getScheduler();
564
547
connection_count.increment();
571
554
current_global_counters.connections++;
572
arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
574
session::Cache::singleton().insert(arg);
576
if (unlikely(plugin::EventObserver::connectSession(*arg)))
578
// We should do something about an error...
581
if (plugin::Scheduler::getScheduler()->addSession(arg))
583
DRIZZLE_CONNECTION_START(arg->getSessionId());
555
thread_id= variables.pseudo_thread_id= global_thread_id++;
558
boost::mutex::scoped_lock scoped(LOCK_thread_count);
559
getSessionList().push_back(this);
562
if (unlikely(plugin::EventObserver::connectSession(*this)))
564
// We should do something about an error...
567
if (unlikely(plugin::EventObserver::connectSession(*this)))
569
// We should do something about an error...
572
if (scheduler->addSession(this))
574
DRIZZLE_CONNECTION_START(thread_id);
584
575
char error_message_buff[DRIZZLE_ERRMSG_SIZE];
586
arg->setKilled(Session::KILL_CONNECTION);
577
killed= Session::KILL_CONNECTION;
588
arg->status_var.aborted_connects++;
579
status_var.aborted_connects++;
590
581
/* Can't use my_error() since store_globals has not been called. */
591
582
/* TODO replace with better error message */
592
583
snprintf(error_message_buff, sizeof(error_message_buff),
593
584
ER(ER_CANT_CREATE_THREAD), 1);
594
arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
585
client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
604
Is this session viewable by the current user?
606
bool Session::isViewable() const
608
return plugin::Authorization::isAuthorized(current_session->getSecurityContext(),
614
593
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
616
595
const char* old_msg = get_proc_info();
694
673
main_da.reset_diagnostics_area();
696
675
if (client->readCommand(&l_packet, &packet_length) == false)
701
if (getKilled() == KILL_CONNECTION)
678
if (killed == KILL_CONNECTION)
704
681
if (packet_length == 0)
707
l_command= static_cast<enum_server_command>(l_packet[0]);
684
l_command= (enum enum_server_command) (unsigned char) l_packet[0];
709
686
if (command >= COM_END)
710
687
command= COM_END; // Wrong command
712
689
assert(packet_length);
713
return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
690
return ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
716
693
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
722
699
in_packet_length--;
724
701
const char *pos= in_packet + in_packet_length; /* Point at end null */
725
while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
702
while (in_packet_length > 0 &&
703
(pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
728
706
in_packet_length--;
731
std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
732
// We can not be entirely sure _schema has a value
735
plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
737
query.reset(new_query);
738
_state.reset(new State(in_packet, in_packet_length));
709
query.assign(in_packet, in_packet + in_packet_length);
792
763
if (result == false)
794
764
my_error(killed_errno(), MYF(0));
796
765
else if ((result == true) && do_release)
798
setKilled(Session::KILL_CONNECTION);
766
killed= Session::KILL_CONNECTION;
955
922
my_message(errcode, err, MYF(0));
958
(void) cache->end_io_cache();
925
(void) end_io_cache(cache);
959
926
(void) internal::my_close(file, MYF(0));
960
927
(void) internal::my_delete(path.file_string().c_str(), MYF(0)); // Delete file on error
966
933
bool select_to_file::send_eof()
968
int error= test(cache->end_io_cache());
935
int error= test(end_io_cache(cache));
969
936
if (internal::my_close(file, MYF(MY_WME)))
987
954
/* In case of error send_eof() may be not called: close the file here. */
990
(void) cache->end_io_cache();
957
(void) end_io_cache(cache);
991
958
(void) internal::my_close(file, MYF(0));
1046
1013
if (not to_file.has_root_directory())
1048
1015
target_path= fs::system_complete(getDataHomeCatalog());
1049
util::string::const_shared_ptr schema(session->schema());
1050
if (schema and not schema->empty())
1016
if (not session->db.empty())
1052
1018
int count_elements= 0;
1053
1019
for (fs::path::iterator iter= to_file.begin();
1086
1052
if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1088
1054
(void) fchmod(file, 0666); // Because of umask()
1089
if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1055
if (init_io_cache(cache, file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1091
1057
internal::my_close(file, MYF(0));
1092
1058
internal::my_delete(target_path.file_string().c_str(), MYF(0)); // Delete file on error, it was just created
1201
1165
null_buff[0]=escape_char;
1202
1166
null_buff[1]='N';
1203
1167
if (my_b_write(cache,(unsigned char*) null_buff,2))
1206
1170
else if (my_b_write(cache,(unsigned char*) "NULL",4))
1295
1259
tmp_buff[1]= *pos ? *pos : '0';
1296
1260
if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1297
1261
my_b_write(cache,(unsigned char*) tmp_buff,2))
1302
1266
if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1305
1269
else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1308
1272
if (fixed_row_size)
1309
1273
{ // Fill with space
1319
1283
for (; length > sizeof(space) ; length-=sizeof(space))
1321
1285
if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1324
1288
if (my_b_write(cache,(unsigned char*) space,length))
1328
1292
if (res && enclosed)
1330
1294
if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
1331
1295
exchange->enclosed->length()))
1334
1298
if (--items_left)
1336
1300
if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
1337
1301
field_term_length))
1341
1305
if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1342
1306
exchange->line_term->length()))
1554
1517
void Session::end_statement()
1556
1519
/* Cleanup SQL processing state to reuse this statement in next query. */
1558
1521
query_cache_key= ""; // reset the cache key
1559
1522
resetResultsetMessage();
1562
1525
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
1565
if (_schema and _schema->empty())
1567
my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1570
else if (not _schema)
1572
my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1577
*p_db= strmake(_schema->c_str(), _schema->size());
1578
*p_db_length= _schema->size();
1529
my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1532
*p_db= strmake(db.c_str(), db.length());
1533
*p_db_length= db.length();
1619
void Session::set_db(const std::string &new_db)
1573
bool Session::set_db(const std::string &new_db)
1621
1575
/* Do not reallocate memory if current chunk is big enough. */
1622
1576
if (new_db.length())
1624
_schema.reset(new std::string(new_db));
1628
_schema.reset(new std::string(""));
1588
Check the killed state of a user thread
1589
@param session user thread
1590
@retval 0 the user thread is active
1591
@retval 1 the user thread has been killed
1593
int session_killed(const Session *session)
1595
return(session->killed);
1599
const struct charset_info_st *session_charset(Session *session)
1601
return(session->charset());
1634
1605
Mark transaction to rollback and mark error as fatal to a sub-statement.
1651
1622
plugin_sessionvar_cleanup(this);
1653
1624
/* If necessary, log any aborted or unauthorized connections */
1654
if (getKilled() || client->wasAborted())
1625
if (killed || client->wasAborted())
1656
1627
status_var.aborted_threads++;
1659
1630
if (client->wasAborted())
1661
if (not getKilled() && variables.log_warnings > 1)
1632
if (! killed && variables.log_warnings > 1)
1663
1634
SecurityContext *sctx= &security_ctx;
1665
1636
errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1667
, (_schema->empty() ? "unconnected" : _schema->c_str())
1638
, (db.empty() ? "unconnected" : db.c_str())
1668
1639
, sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1669
1640
, sctx->getIp().c_str()
1670
1641
, (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1674
1645
/* Close out our connection to the client */
1675
1646
if (should_lock)
1676
session::Cache::singleton().mutex().lock();
1678
setKilled(Session::KILL_CONNECTION);
1647
LOCK_thread_count.lock();
1648
killed= Session::KILL_CONNECTION;
1680
1649
if (client->isConnected())
1774
1740
If this is needed, use close_temporary_table()
1777
void Open_tables_state::nukeTable(Table *table)
1743
void Session::nukeTable(Table *table)
1779
1745
plugin::StorageEngine *table_type= table->getShare()->db_type();
1848
1814
DERIVATION_IMPLICIT, false);
1851
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1817
void Session::mark_temp_tables_as_free_for_reuse()
1853
1819
for (Table *table= temporary_tables ; table ; table= table->getNext())
1855
if (table->query_id == getQueryId())
1821
if (table->query_id == query_id)
1857
1823
table->query_id= 0;
1858
1824
table->cursor->ha_reset();
1916
1883
handled either before writing a query log event (inside
1917
1884
binlog_query()) or when preparing a pending event.
1886
mysql_unlock_tables(this, lock);
1923
Note that we need to hold table::Cache::singleton().mutex() while changing the
1890
Note that we need to hold LOCK_open while changing the
1924
1891
open_tables list. Another thread may work on it.
1925
(See: table::Cache::singleton().removeTable(), mysql_wait_completed_table())
1892
(See: remove_table_from_cache(), mysql_wait_completed_table())
1926
1893
Closing a MERGE child before the parent would be fatal if the
1927
1894
other thread tries to abort the MERGE lock in between.
1974
1941
might be an issue (lame engines).
1977
bool Open_tables_state::rm_temporary_table(const TableIdentifier &identifier, bool best_effort)
1944
bool Session::rm_temporary_table(TableIdentifier &identifier, bool best_effort)
1979
if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
1946
if (plugin::StorageEngine::dropTable(*this, identifier))
1981
1948
if (not best_effort)
1984
identifier.getSQLPath(path);
1985
1950
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1986
path.c_str(), errno);
1951
identifier.getSQLPath().c_str(), errno);
1995
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const TableIdentifier &identifier)
1960
bool Session::rm_temporary_table(plugin::StorageEngine *base, TableIdentifier &identifier)
1999
if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier))
1964
if (plugin::StorageEngine::dropTable(*this, *base, identifier))
2002
identifier.getSQLPath(path);
2003
1966
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
2004
path.c_str(), errno);
1967
identifier.getSQLPath().c_str(), errno);
2013
1976
@note this will be removed, I am looking through Hudson to see if it is finding
2014
1977
any tables that are missed during cleanup.
2016
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
1979
void Session::dumpTemporaryTableNames(const char *foo)
2037
2000
cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2041
2003
cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2046
bool Session::TableMessages::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2007
bool Session::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2048
2009
table_message_cache.insert(make_pair(identifier.getPath(), table_message));
2053
bool Session::TableMessages::removeTableMessage(const TableIdentifier &identifier)
2014
bool Session::removeTableMessage(const TableIdentifier &identifier)
2055
2016
TableMessageCache::iterator iter;
2067
bool Session::TableMessages::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2028
bool Session::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2069
2030
TableMessageCache::iterator iter;
2095
bool Session::TableMessages::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
2056
bool Session::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
2097
2058
TableMessageCache::iterator iter;
2122
2083
return tmp_share;
2127
Create a reduced Table object with properly set up Field list from a
2128
list of field definitions.
2130
The created table doesn't have a table Cursor associated with
2131
it, has no keys, no group/distinct, no copy_funcs array.
2132
The sole purpose of this Table object is to use the power of Field
2133
class to read/write data to/from table->getInsertRecord(). Then one can store
2134
the record in any container (RB tree, hash, etc).
2135
The table is created in Session mem_root, so are the table's fields.
2136
Consequently, if you don't use BLOB fields, you don't need to free it.
2138
@param session connection handle
2139
@param field_list list of column definitions
2142
0 if out of memory, Table object in case of success
2144
table::Instance *Session::getInstanceTable(List<CreateField> &field_list)
2146
temporary_shares.push_back(new table::Instance(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2148
table::Instance *tmp_share= temporary_shares.back();
2157
static const std::string NONE= "NONE";
2158
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2159
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2161
const std::string &type(drizzled::Session::global_read_lock_t type)
2167
case Session::GOT_GLOBAL_READ_LOCK:
2168
return GOT_GLOBAL_READ_LOCK;
2169
case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2170
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2174
size_t max_string_length(drizzled::Session::global_read_lock_t)
2176
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2179
} /* namespace display */
2181
2086
} /* namespace drizzled */