117
287
@see Session::set_proc_info
119
void set_session_proc_info(Session *session, const char *info)
290
set_session_proc_info(Session *session, const char *info)
121
292
session->set_proc_info(info);
124
296
const char *get_session_proc_info(Session *session)
126
298
return session->get_proc_info();
129
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
131
return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
134
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
137
return &ha_data[monitored->getId()].resource_context[index];
302
void **session_ha_data(const Session *session, const struct handlerton *hton)
304
return (void **) &session->ha_data[hton->slot].ha_ptr;
140
308
int64_t session_test_options(const Session *session, int64_t test_options)
142
310
return session->options & test_options;
145
314
int session_sql_command(const Session *session)
147
316
return (int) session->lex->sql_command;
150
enum_tx_isolation session_tx_isolation(const Session *session)
152
return (enum_tx_isolation)session->variables.tx_isolation;
155
Session::Session(plugin::Client *client_arg) :
156
Open_tables_state(refresh_version),
157
mem_root(&main_mem_root),
163
lock_id(&main_lock_id),
165
ha_data(plugin::num_trx_monitored_objects),
166
arg_of_last_insert_id_function(false),
167
first_successful_insert_id_in_prev_stmt(0),
168
first_successful_insert_id_in_cur_stmt(0),
171
some_tables_deleted(false),
174
is_fatal_error(false),
175
transaction_rollback_request(false),
176
is_fatal_sub_stmt_error(0),
177
derived_tables_processing(false),
178
tablespace_op(false),
181
transaction_message(NULL),
182
statement_message(NULL),
183
session_event_observers(NULL),
186
memset(process_list_info, 0, PROCESS_LIST_WIDTH);
187
client->setSession(this);
320
int session_tx_isolation(const Session *session)
322
return (int) session->variables.tx_isolation;
326
void session_inc_row_count(Session *session)
328
session->row_count++;
332
Clear this diagnostics area.
334
Normally called at the end of a statement.
338
Diagnostics_area::reset_diagnostics_area()
340
can_overwrite_status= false;
341
/** Don't take chances in production */
347
m_total_warn_count= 0;
349
/** Tiny reset in debug mode to see garbage right away */
355
Set OK status -- ends commands that do not return a
356
result set, e.g. INSERT/UPDATE/DELETE.
360
Diagnostics_area::set_ok_status(Session *session, ha_rows affected_rows_arg,
361
uint64_t last_insert_id_arg,
362
const char *message_arg)
366
In production, refuse to overwrite an error or a custom response
369
if (is_error() || is_disabled())
371
/** Only allowed to report success if has not yet reported an error */
373
m_server_status= session->server_status;
374
m_total_warn_count= session->total_warn_count;
375
m_affected_rows= affected_rows_arg;
376
m_last_insert_id= last_insert_id_arg;
378
strmake(m_message, message_arg, sizeof(m_message) - 1);
390
Diagnostics_area::set_eof_status(Session *session)
392
/** Only allowed to report eof if has not yet reported an error */
396
In production, refuse to overwrite an error or a custom response
399
if (is_error() || is_disabled())
402
m_server_status= session->server_status;
404
If inside a stored procedure, do not return the total
405
number of warnings, since they are not available to the client
408
m_total_warn_count= session->total_warn_count;
418
Diagnostics_area::set_error_status(Session *session __attribute__((unused)),
419
uint32_t sql_errno_arg,
420
const char *message_arg)
423
Only allowed to report error if has not yet reported a success
424
The only exception is when we flush the message to the client,
425
an error can happen during the flush.
427
assert(! is_set() || can_overwrite_status);
429
In production, refuse to overwrite a custom response with an
435
m_sql_errno= sql_errno_arg;
436
strmake(m_message, message_arg, sizeof(m_message) - 1);
443
Mark the diagnostics area as 'DISABLED'.
445
This is used in rare cases when the COM_ command at hand sends a response
446
in a custom format. One example is the query cache, another is
451
Diagnostics_area::disable_status()
454
m_status= DA_DISABLED;
459
:Statement(&main_lex, &main_mem_root,
460
/* statement id */ 0),
461
Open_tables_state(refresh_version), rli_fake(0),
462
lock_id(&main_lock_id),
464
binlog_table_maps(0), binlog_flags(0UL),
465
arg_of_last_insert_id_function(false),
466
first_successful_insert_id_in_prev_stmt(0),
467
first_successful_insert_id_in_prev_stmt_for_binlog(0),
468
first_successful_insert_id_in_cur_stmt(0),
469
stmt_depends_on_first_successful_insert_id_in_prev_stmt(false),
472
transaction_rollback_request(0),
473
is_fatal_sub_stmt_error(0),
477
derived_tables_processing(false),
190
483
Pass nominal parameters to init_alloc_root only to ensure that
191
484
the destructor works OK in case of an error. The main_mem_root
192
485
will be re-initialized in init_for_queries().
194
memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
196
count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
487
init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
489
catalog= (char*)"std"; // the only catalog we have for now
490
main_security_ctx.init();
491
security_ctx= &main_security_ctx;
492
some_tables_deleted=no_errors=password= 0;
494
count_cuted_fields= CHECK_FIELD_IGNORE;
197
495
killed= NOT_KILLED;
497
is_slave_error= thread_specific_used= false;
498
hash_clear(&handler_tables_hash);
201
501
cuted_fields= sent_row_count= row_count= 0L;
202
503
row_count_func= -1;
203
504
statement_id_counter= 0UL;
204
505
// Must be reset to handle error with Session's created for init of mysqld
461
897
assert(thread_stack);
463
currentSession().release();
464
currentSession().reset(this);
466
currentMemRoot().release();
467
currentMemRoot().reset(&mem_root);
899
if (pthread_setspecific(THR_Session, this) ||
900
pthread_setspecific(THR_MALLOC, &mem_root))
469
902
mysys_var=my_thread_var;
472
904
Let mysqld define the thread id (not mysys)
473
905
This allows us to move Session to different threads if needed.
475
907
mysys_var->id= thread_id;
908
real_id= pthread_self(); // For debugging
478
911
We have to call thr_lock_info_init() again here as Session may have been
479
912
created in another thread
914
thr_lock_info_init(&lock_info);
487
Init Session for query processing.
488
This has to be called once before we call mysql_parse.
489
See also comments in session.h.
923
Session::cleanup_after_query()
926
This function is used to reset thread data to its default state.
929
This function is not suitable for setting thread data to some
930
non-default values, as there is only one replication thread, so
931
different master threads may overwrite data of each other on
492
void Session::prepareForQueries()
494
if (variables.max_join_size == HA_POS_ERROR)
495
options |= OPTION_BIG_SELECTS;
497
version= refresh_version;
502
mem_root->reset_root_defaults(variables.query_alloc_block_size,
503
variables.query_prealloc_size);
504
transaction.xid_state.xid.null();
505
transaction.xid_state.in_session=1;
510
bool Session::initGlobals()
514
disconnect(ER_OUT_OF_RESOURCES, true);
515
status_var.aborted_connects++;
523
if (initGlobals() || authenticate())
531
while (! client->haveError() && killed != KILL_CONNECTION)
533
if (! executeStatement())
540
bool Session::schedule()
542
scheduler= plugin::Scheduler::getScheduler();
545
connection_count.increment();
547
if (connection_count > current_global_counters.max_used_connections)
549
current_global_counters.max_used_connections= connection_count;
552
current_global_counters.connections++;
553
thread_id= variables.pseudo_thread_id= global_thread_id++;
555
LOCK_thread_count.lock();
556
getSessionList().push_back(this);
557
LOCK_thread_count.unlock();
559
if (scheduler->addSession(this))
561
DRIZZLE_CONNECTION_START(thread_id);
562
char error_message_buff[DRIZZLE_ERRMSG_SIZE];
564
killed= Session::KILL_CONNECTION;
566
status_var.aborted_connects++;
568
/* Can't use my_error() since store_globals has not been called. */
569
/* TODO replace will better error message */
570
snprintf(error_message_buff, sizeof(error_message_buff),
571
ER(ER_CANT_CREATE_THREAD), 1);
572
client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
580
const char* Session::enter_cond(boost::condition_variable &cond, boost::mutex &mutex, const char* msg)
582
const char* old_msg = get_proc_info();
583
safe_mutex_assert_owner(mutex);
584
mysys_var->current_mutex = &mutex;
585
mysys_var->current_cond = &cond;
586
this->set_proc_info(msg);
590
void Session::exit_cond(const char* old_msg)
593
Putting the mutex unlock in exit_cond() ensures that
594
mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
595
locked (if that would not be the case, you'll get a deadlock if someone
596
does a Session::awake() on you).
598
mysys_var->current_mutex->unlock();
599
boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
600
mysys_var->current_mutex = 0;
601
mysys_var->current_cond = 0;
602
this->set_proc_info(old_msg);
605
bool Session::authenticate()
608
if (client->authenticate())
611
status_var.aborted_connects++;
616
bool Session::checkUser(const char *passwd, uint32_t passwd_len, const char *in_db)
618
const string passwd_str(passwd, passwd_len);
619
bool is_authenticated=
620
plugin::Authentication::isAuthenticated(getSecurityContext(),
623
if (is_authenticated != true)
625
status_var.access_denied++;
626
/* isAuthenticated has pushed the error message */
630
/* Change database if necessary */
631
if (in_db && in_db[0])
633
SchemaIdentifier identifier(in_db);
634
if (mysql_change_db(this, identifier))
636
/* mysql_change_db() has pushed the error message. */
641
password= test(passwd_len); // remember for error messages
643
/* Ready to handle queries */
647
bool Session::executeStatement()
650
uint32_t packet_length;
652
enum enum_server_command l_command;
655
indicator of uninitialized lex => normal flow of errors handling
658
lex->current_select= 0;
660
main_da.reset_diagnostics_area();
662
if (client->readCommand(&l_packet, &packet_length) == false)
665
if (killed == KILL_CONNECTION)
668
if (packet_length == 0)
671
l_command= (enum enum_server_command) (unsigned char) l_packet[0];
673
if (command >= COM_END)
674
command= COM_END; // Wrong command
676
assert(packet_length);
677
return ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
680
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
682
/* Remove garbage at start and end of query */
683
while (in_packet_length > 0 && my_isspace(charset(), in_packet[0]))
688
const char *pos= in_packet + in_packet_length; /* Point at end null */
689
while (in_packet_length > 0 &&
690
(pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
696
query.assign(in_packet, in_packet + in_packet_length);
701
bool Session::endTransaction(enum enum_mysql_completiontype completion)
705
TransactionServices &transaction_services= TransactionServices::singleton();
707
if (transaction.xid_state.xa_state != XA_NOTR)
709
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
716
* We don't use endActiveTransaction() here to ensure that this works
717
* even if there is a problem with the OPTION_AUTO_COMMIT flag
718
* (Which of course should never happen...)
720
server_status&= ~SERVER_STATUS_IN_TRANS;
721
if (transaction_services.commitTransaction(this, true))
723
options&= ~(OPTION_BEGIN);
726
do_release= 1; /* fall through */
727
case COMMIT_AND_CHAIN:
728
result= endActiveTransaction();
729
if (result == true && completion == COMMIT_AND_CHAIN)
730
result= startTransaction();
732
case ROLLBACK_RELEASE:
733
do_release= 1; /* fall through */
735
case ROLLBACK_AND_CHAIN:
737
server_status&= ~SERVER_STATUS_IN_TRANS;
738
if (transaction_services.rollbackTransaction(this, true))
740
options&= ~(OPTION_BEGIN);
741
if (result == true && (completion == ROLLBACK_AND_CHAIN))
742
result= startTransaction();
746
my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
751
my_error(killed_errno(), MYF(0));
752
else if ((result == true) && do_release)
753
killed= Session::KILL_CONNECTION;
758
bool Session::endActiveTransaction()
761
TransactionServices &transaction_services= TransactionServices::singleton();
763
if (transaction.xid_state.xa_state != XA_NOTR)
765
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
768
if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
770
server_status&= ~SERVER_STATUS_IN_TRANS;
771
if (transaction_services.commitTransaction(this, true))
774
options&= ~(OPTION_BEGIN);
778
bool Session::startTransaction(start_transaction_option_t opt)
782
if (! endActiveTransaction())
788
options|= OPTION_BEGIN;
789
server_status|= SERVER_STATUS_IN_TRANS;
791
if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
800
935
void Session::cleanup_after_query()
803
Reset rand_used so that detection of calls to rand() will save random
938
Reset rand_used so that detection of calls to rand() will save random
804
939
seeds if needed by the slave.
807
942
/* Forget those values, for next binlogger: */
943
stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
808
944
auto_inc_intervals_in_cur_stmt_for_binlog.empty();
810
947
if (first_successful_insert_id_in_cur_stmt > 0)
812
949
/* set what LAST_INSERT_ID() will return */
813
first_successful_insert_id_in_prev_stmt= first_successful_insert_id_in_cur_stmt;
950
first_successful_insert_id_in_prev_stmt=
951
first_successful_insert_id_in_cur_stmt;
814
952
first_successful_insert_id_in_cur_stmt= 0;
815
953
substitute_null_with_insert_id= true;
817
arg_of_last_insert_id_function= false;
955
arg_of_last_insert_id_function= 0;
818
956
/* Free Items that were created during this execution */
820
958
/* Reset where. */
821
959
where= Session::DEFAULT_WHERE;
823
/* Reset the temporary shares we built */
824
for (std::vector<TableShareInstance *>::iterator iter= temporary_shares.begin();
825
iter != temporary_shares.end(); iter++)
829
temporary_shares.clear();
833
964
Create a LEX_STRING in this connection.
1595
2291
session->transaction_rollback_request= all;
1599
void Session::disconnect(uint32_t errcode, bool should_lock)
1601
/* Allow any plugins to cleanup their session variables */
1602
plugin_sessionvar_cleanup(this);
1604
/* If necessary, log any aborted or unauthorized connections */
1605
if (killed || client->wasAborted())
1607
status_var.aborted_threads++;
1610
if (client->wasAborted())
1612
if (! killed && variables.log_warnings > 1)
1614
SecurityContext *sctx= &security_ctx;
1616
errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1618
, (db.empty() ? "unconnected" : db.c_str())
1619
, sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1620
, sctx->getIp().c_str()
1621
, (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1625
/* Close out our connection to the client */
1627
LOCK_thread_count.lock();
1628
killed= Session::KILL_CONNECTION;
1629
if (client->isConnected())
1633
/*my_error(errcode, ER(errcode));*/
1634
client->sendError(errcode, ER(errcode));
1639
(void) LOCK_thread_count.unlock();
1642
void Session::reset_for_next_command()
1647
Those two lines below are theoretically unneeded as
1648
Session::cleanup_after_query() should take care of this already.
1650
auto_inc_intervals_in_cur_stmt_for_binlog.empty();
1652
is_fatal_error= false;
1653
server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1654
SERVER_QUERY_NO_INDEX_USED |
1655
SERVER_QUERY_NO_GOOD_INDEX_USED);
1658
main_da.reset_diagnostics_area();
1659
total_warn_count=0; // Warnings for this query
1660
sent_row_count= examined_row_count= 0;
1664
Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1667
void Session::close_temporary_tables()
1672
if (not temporary_tables)
1675
for (table= temporary_tables; table; table= tmp_next)
1677
tmp_next= table->getNext();
1680
temporary_tables= NULL;
1684
unlink from session->temporary tables and close temporary table
1687
void Session::close_temporary_table(Table *table)
1689
if (table->getPrev())
1691
table->getPrev()->setNext(table->getNext());
1692
if (table->getPrev()->getNext())
1694
table->getNext()->setPrev(table->getPrev());
1699
/* removing the item from the list */
1700
assert(table == temporary_tables);
1702
slave must reset its temporary list pointer to zero to exclude
1703
passing non-zero value to end_slave via rli->save_temporary_tables
1704
when no temp tables opened, see an invariant below.
1706
temporary_tables= table->getNext();
1707
if (temporary_tables)
1709
table->getNext()->setPrev(NULL);
1716
Close and drop a temporary table
1719
This dosn't unlink table from session->temporary
1720
If this is needed, use close_temporary_table()
1723
void Session::nukeTable(Table *table)
1725
plugin::StorageEngine *table_type= table->getShare()->db_type();
1727
table->free_io_cache();
1728
table->delete_table();
1730
TableIdentifier identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1731
rm_temporary_table(table_type, identifier);
1733
delete table->getMutableShare();
1735
/* This makes me sad, but we're allocating it via malloc */
1739
/** Clear most status variables. */
1740
extern time_t flush_status_time;
1742
void Session::refresh_status()
1744
/* Reset thread's status variables */
1745
memset(&status_var, 0, sizeof(status_var));
1747
flush_status_time= time((time_t*) 0);
1748
current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1749
current_global_counters.connections= 0;
1752
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1754
user_var_entry *entry= NULL;
1755
UserVarsRange ppp= user_vars.equal_range(std::string(name.str, name.length));
1757
for (UserVars::iterator iter= ppp.first;
1758
iter != ppp.second; ++iter)
1760
entry= (*iter).second;
1763
if ((entry == NULL) && create_if_not_exists)
1765
entry= new (nothrow) user_var_entry(name.str, query_id);
1770
std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(std::string(name.str, name.length), entry));
1772
if (not returnable.second)
1782
void Session::mark_temp_tables_as_free_for_reuse()
1784
for (Table *table= temporary_tables ; table ; table= table->getNext())
1786
if (table->query_id == query_id)
1789
table->cursor->ha_reset();
1794
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1796
for (; table ; table= table->getNext())
1798
if (table->query_id == query_id)
1801
table->cursor->ha_reset();
1807
Unlocks tables and frees derived tables.
1808
Put all normal tables used by thread in free list.
1810
It will only close/mark as free for reuse tables opened by this
1811
substatement, it will also check if we are closing tables after
1812
execution of complete query (i.e. we are on upper level) and will
1813
leave prelocked mode if needed.
1815
void Session::close_thread_tables()
1818
derived_tables= NULL; // They should all be invalid by this point
1821
Mark all temporary tables used by this statement as free for reuse.
1823
mark_temp_tables_as_free_for_reuse();
1825
Let us commit transaction for statement. Since in 5.0 we only have
1826
one statement transaction and don't allow several nested statement
1827
transactions this call will do nothing if we are inside of stored
1828
function or trigger (i.e. statement transaction is already active and
1829
does not belong to statement for which we do close_thread_tables()).
1830
TODO: This should be fixed in later releases.
1833
TransactionServices &transaction_services= TransactionServices::singleton();
1834
main_da.can_overwrite_status= true;
1835
transaction_services.autocommitOrRollback(this, is_error());
1836
main_da.can_overwrite_status= false;
1837
transaction.stmt.reset();
1843
For RBR we flush the pending event just before we unlock all the
1844
tables. This means that we are at the end of a topmost
1845
statement, so we ensure that the STMT_END_F flag is set on the
1846
pending event. For statements that are *inside* stored
1847
functions, the pending event will not be flushed: that will be
1848
handled either before writing a query log event (inside
1849
binlog_query()) or when preparing a pending event.
1851
mysql_unlock_tables(this, lock);
1855
Note that we need to hold LOCK_open while changing the
1856
open_tables list. Another thread may work on it.
1857
(See: remove_table_from_cache(), mysql_wait_completed_table())
1858
Closing a MERGE child before the parent would be fatal if the
1859
other thread tries to abort the MERGE lock in between.
1862
close_open_tables();
1865
void Session::close_tables_for_reopen(TableList **tables)
1868
If table list consists only from tables from prelocking set, table list
1869
for new attempt should be empty, so we have to update list's root pointer.
1871
if (lex->first_not_own_table() == *tables)
1873
lex->chop_off_not_own_tables();
1874
for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1876
close_thread_tables();
1879
bool Session::openTablesLock(TableList *tables)
1886
if (open_tables_from_list(&tables, &counter))
1889
if (not lock_tables(tables, counter, &need_reopen))
1891
if (not need_reopen)
1893
close_tables_for_reopen(&tables);
1895
if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
1896
(fill_derived_tables() &&
1897
mysql_handle_derived(lex, &mysql_derived_filling))))
1903
bool Session::openTables(TableList *tables, uint32_t flags)
1906
bool ret= fill_derived_tables();
1907
assert(ret == false);
1908
if (open_tables_from_list(&tables, &counter, flags) ||
1909
mysql_handle_derived(lex, &mysql_derived_prepare))
1917
@note "best_effort" is used in cases were if a failure occurred on this
1918
operation it would not be surprising because we are only removing because there
1919
might be an issue (lame engines).
1922
bool Session::rm_temporary_table(TableIdentifier &identifier, bool best_effort)
1924
if (plugin::StorageEngine::dropTable(*this, identifier))
1926
if (not best_effort)
1928
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1929
identifier.getSQLPath().c_str(), errno);
1938
bool Session::rm_temporary_table(plugin::StorageEngine *base, TableIdentifier &identifier)
1942
if (plugin::StorageEngine::dropTable(*this, *base, identifier))
1944
errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1945
identifier.getSQLPath().c_str(), errno);
1954
@note this will be removed, I am looking through Hudson to see if it is finding
1955
any tables that are missed during cleanup.
1957
void Session::dumpTemporaryTableNames(const char *foo)
1961
if (not temporary_tables)
1964
cerr << "Begin Run: " << foo << "\n";
1965
for (table= temporary_tables; table; table= table->getNext())
1967
bool have_proto= false;
1969
message::Table *proto= table->getShare()->getTableProto();
1970
if (table->getShare()->getTableProto())
1973
const char *answer= have_proto ? "true" : "false";
1977
cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
1978
cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
1981
cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
1985
bool Session::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
1987
table_message_cache.insert(make_pair(identifier.getPath(), table_message));
1992
bool Session::removeTableMessage(const TableIdentifier &identifier)
1994
TableMessageCache::iterator iter;
1996
iter= table_message_cache.find(identifier.getPath());
1998
if (iter == table_message_cache.end())
2001
table_message_cache.erase(iter);
2006
bool Session::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2008
TableMessageCache::iterator iter;
2010
iter= table_message_cache.find(identifier.getPath());
2012
if (iter == table_message_cache.end())
2015
table_message.CopyFrom(((*iter).second));
2020
bool Session::doesTableMessageExist(const TableIdentifier &identifier)
2022
TableMessageCache::iterator iter;
2024
iter= table_message_cache.find(identifier.getPath());
2026
if (iter == table_message_cache.end())
2034
bool Session::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
2036
TableMessageCache::iterator iter;
2038
table_message_cache[to.getPath()]= table_message_cache[from.getPath()];
2040
iter= table_message_cache.find(to.getPath());
2042
if (iter == table_message_cache.end())
2047
(*iter).second.set_schema(to.getSchemaName());
2048
(*iter).second.set_name(to.getTableName());
2053
TableShareInstance *Session::getTemporaryShare(TableIdentifier::Type type_arg)
2055
temporary_shares.push_back(new TableShareInstance(type_arg)); // This will not go into the tableshare cache, so no key is used.
2057
TableShareInstance *tmp_share= temporary_shares.back();
2064
} /* namespace drizzled */
2294
/***************************************************************************
2295
Handling of XA id cacheing
2296
***************************************************************************/
2298
pthread_mutex_t LOCK_xid_cache;
2301
extern "C" unsigned char *xid_get_hash_key(const unsigned char *, size_t *, bool);
2302
extern "C" void xid_free_hash(void *);
2304
unsigned char *xid_get_hash_key(const unsigned char *ptr, size_t *length,
2305
bool not_used __attribute__((unused)))
2307
*length=((XID_STATE*)ptr)->xid.key_length();
2308
return ((XID_STATE*)ptr)->xid.key();
2311
void xid_free_hash(void *ptr)
2313
if (!((XID_STATE*)ptr)->in_session)
2314
free((unsigned char*)ptr);
2317
bool xid_cache_init()
2319
pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST);
2320
return hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
2321
xid_get_hash_key, xid_free_hash, 0) != 0;
2324
void xid_cache_free()
2326
if (hash_inited(&xid_cache))
2328
hash_free(&xid_cache);
2329
pthread_mutex_destroy(&LOCK_xid_cache);
2333
XID_STATE *xid_cache_search(XID *xid)
2335
pthread_mutex_lock(&LOCK_xid_cache);
2336
XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, xid->key(), xid->key_length());
2337
pthread_mutex_unlock(&LOCK_xid_cache);
2342
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
2346
pthread_mutex_lock(&LOCK_xid_cache);
2347
if (hash_search(&xid_cache, xid->key(), xid->key_length()))
2349
else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
2353
xs->xa_state=xa_state;
2356
res=my_hash_insert(&xid_cache, (unsigned char*)xs);
2358
pthread_mutex_unlock(&LOCK_xid_cache);
2363
bool xid_cache_insert(XID_STATE *xid_state)
2365
pthread_mutex_lock(&LOCK_xid_cache);
2366
assert(hash_search(&xid_cache, xid_state->xid.key(),
2367
xid_state->xid.key_length())==0);
2368
bool res=my_hash_insert(&xid_cache, (unsigned char*)xid_state);
2369
pthread_mutex_unlock(&LOCK_xid_cache);
2374
void xid_cache_delete(XID_STATE *xid_state)
2376
pthread_mutex_lock(&LOCK_xid_cache);
2377
hash_delete(&xid_cache, (unsigned char *)xid_state);
2378
pthread_mutex_unlock(&LOCK_xid_cache);
2382
Implementation of interface to write rows to the binary log through the
2383
thread. The thread is responsible for writing the rows it has
2384
inserted/updated/deleted.
2389
Template member function for ensuring that there is an rows log
2390
event of the apropriate type before proceeding.
2393
- Events of type 'RowEventT' have the type code 'type_code'.
2396
If a non-NULL pointer is returned, the pending event for thread 'session' will
2397
be an event of type 'RowEventT' (which have the type code 'type_code')
2398
will either empty or have enough space to hold 'needed' bytes. In
2399
addition, the columns bitmap will be correct for the row, meaning that
2400
the pending event will be flushed if the columns in the event differ from
2401
the columns suppled to the function.
2404
If no error, a non-NULL pending event (either one which already existed or
2405
the newly created one).
2409
template <class RowsEventT> Rows_log_event*
2410
Session::binlog_prepare_pending_rows_event(Table* table, uint32_t serv_id,
2412
bool is_transactional,
2413
RowsEventT *hint __attribute__((unused)))
2415
/* Pre-conditions */
2416
assert(table->s->table_map_id != UINT32_MAX);
2418
/* Fetch the type code for the RowsEventT template parameter */
2419
int const type_code= RowsEventT::TYPE_CODE;
2422
There is no good place to set up the transactional data, so we
2425
if (binlog_setup_trx_data())
2428
Rows_log_event* pending= binlog_get_pending_rows_event();
2430
if (unlikely(pending && !pending->is_valid()))
2434
Check if the current event is non-NULL and a write-rows
2435
event. Also check if the table provided is mapped: if it is not,
2436
then we have switched to writing to a new table.
2437
If there is no pending event, we need to create one. If there is a pending
2438
event, but it's not about the same table id, or not of the same type
2439
(between Write, Update and Delete), or not the same affected columns, or
2440
going to be too big, flush this event to disk and create a new pending
2443
The last test is necessary for the Cluster injector to work
2444
correctly. The reason is that the Cluster can inject two write
2445
rows with different column bitmaps if there is an insert followed
2446
by an update in the same transaction, and these are grouped into a
2447
single epoch/transaction when fed to the injector.
2449
TODO: Fix the code so that the last test can be removed.
2452
pending->server_id != serv_id ||
2453
pending->get_table_id() != table->s->table_map_id ||
2454
pending->get_type_code() != type_code ||
2455
pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
2456
!bitmap_cmp(pending->get_cols(), table->write_set))
2458
/* Create a new RowsEventT... */
2459
Rows_log_event* const
2460
ev= new RowsEventT(this, table, table->s->table_map_id,
2464
ev->server_id= serv_id; // I don't like this, it's too easy to forget.
2466
flush the pending event and replace it with the newly created
2469
if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev)))
2475
return(ev); /* This is the new pending event */
2477
return(pending); /* This is the current pending event */
2480
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
2482
Instantiate the versions we need, we have -fno-implicit-template as
2485
template Rows_log_event*
2486
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2487
Write_rows_log_event*);
2489
template Rows_log_event*
2490
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2491
Delete_rows_log_event *);
2493
template Rows_log_event*
2494
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2495
Update_rows_log_event *);
2500
Class to handle temporary allocation of memory for row data.
2502
The responsibilities of the class is to provide memory for
2503
packing one or two rows of packed data (depending on what
2504
constructor is called).
2506
In order to make the allocation more efficient for "simple" rows,
2507
i.e., rows that do not contain any blobs, a pointer to the
2508
allocated memory is of memory is stored in the table structure
2509
for simple rows. If memory for a table containing a blob field
2510
is requested, only memory for that is allocated, and subsequently
2511
released when the object is destroyed.
2514
class Row_data_memory {
2517
Build an object to keep track of a block-local piece of memory
2518
for storing a row of data.
2521
Table where the pre-allocated memory is stored.
2524
Length of data that is needed, if the record contain blobs.
2526
Row_data_memory(Table *table, size_t const len1)
2529
m_alloc_checked= false;
2530
allocate_memory(table, len1);
2531
m_ptr[0]= has_memory() ? m_memory : 0;
2535
Row_data_memory(Table *table, size_t const len1, size_t const len2)
2538
m_alloc_checked= false;
2539
allocate_memory(table, len1 + len2);
2540
m_ptr[0]= has_memory() ? m_memory : 0;
2541
m_ptr[1]= has_memory() ? m_memory + len1 : 0;
2546
if (m_memory != 0 && m_release_memory_on_destruction)
2547
free((unsigned char*) m_memory);
2551
Is there memory allocated?
2553
@retval true There is memory allocated
2554
@retval false Memory allocation failed
2556
bool has_memory() const {
2557
m_alloc_checked= true;
2558
return m_memory != 0;
2561
unsigned char *slot(uint32_t s)
2563
assert(s < sizeof(m_ptr)/sizeof(*m_ptr));
2564
assert(m_ptr[s] != 0);
2565
assert(m_alloc_checked == true);
2570
void allocate_memory(Table *const table, size_t const total_length)
2572
if (table->s->blob_fields == 0)
2575
The maximum length of a packed record is less than this
2576
length. We use this value instead of the supplied length
2577
when allocating memory for records, since we don't know how
2578
the memory will be used in future allocations.
2580
Since table->s->reclength is for unpacked records, we have
2581
to add two bytes for each field, which can potentially be
2582
added to hold the length of a packed field.
2584
size_t const maxlen= table->s->reclength + 2 * table->s->fields;
2587
Allocate memory for two records if memory hasn't been
2588
allocated. We allocate memory for two records so that it can
2589
be used when processing update rows as well.
2591
if (table->write_row_record == 0)
2592
table->write_row_record=
2593
(unsigned char *) alloc_root(&table->mem_root, 2 * maxlen);
2594
m_memory= table->write_row_record;
2595
m_release_memory_on_destruction= false;
2599
m_memory= (unsigned char *) my_malloc(total_length, MYF(MY_WME));
2600
m_release_memory_on_destruction= true;
2604
mutable bool m_alloc_checked;
2605
bool m_release_memory_on_destruction;
2606
unsigned char *m_memory;
2607
unsigned char *m_ptr[2];
2612
int Session::binlog_write_row(Table* table, bool is_trans,
2613
unsigned char const *record)
2615
assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
2618
Pack records into format for transfer. We are allocating more
2619
memory than needed, but that doesn't matter.
2621
Row_data_memory memory(table, table->max_row_length(record));
2622
if (!memory.has_memory())
2623
return HA_ERR_OUT_OF_MEM;
2625
unsigned char *row_data= memory.slot(0);
2627
size_t const len= pack_row(table, table->write_set, row_data, record);
2629
Rows_log_event* const ev=
2630
binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
2631
static_cast<Write_rows_log_event*>(0));
2633
if (unlikely(ev == 0))
2634
return HA_ERR_OUT_OF_MEM;
2636
return ev->add_row_data(row_data, len);
2639
int Session::binlog_update_row(Table* table, bool is_trans,
2640
const unsigned char *before_record,
2641
const unsigned char *after_record)
2643
assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
2645
size_t const before_maxlen = table->max_row_length(before_record);
2646
size_t const after_maxlen = table->max_row_length(after_record);
2648
Row_data_memory row_data(table, before_maxlen, after_maxlen);
2649
if (!row_data.has_memory())
2650
return HA_ERR_OUT_OF_MEM;
2652
unsigned char *before_row= row_data.slot(0);
2653
unsigned char *after_row= row_data.slot(1);
2655
size_t const before_size= pack_row(table, table->read_set, before_row,
2657
size_t const after_size= pack_row(table, table->write_set, after_row,
2660
Rows_log_event* const ev=
2661
binlog_prepare_pending_rows_event(table, server_id,
2662
before_size + after_size, is_trans,
2663
static_cast<Update_rows_log_event*>(0));
2665
if (unlikely(ev == 0))
2666
return HA_ERR_OUT_OF_MEM;
2669
ev->add_row_data(before_row, before_size) ||
2670
ev->add_row_data(after_row, after_size);
2673
int Session::binlog_delete_row(Table* table, bool is_trans,
2674
unsigned char const *record)
2676
assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
2679
Pack records into format for transfer. We are allocating more
2680
memory than needed, but that doesn't matter.
2682
Row_data_memory memory(table, table->max_row_length(record));
2683
if (unlikely(!memory.has_memory()))
2684
return HA_ERR_OUT_OF_MEM;
2686
unsigned char *row_data= memory.slot(0);
2688
size_t const len= pack_row(table, table->read_set, row_data, record);
2690
Rows_log_event* const ev=
2691
binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
2692
static_cast<Delete_rows_log_event*>(0));
2694
if (unlikely(ev == 0))
2695
return HA_ERR_OUT_OF_MEM;
2697
return ev->add_row_data(row_data, len);
2701
int Session::binlog_flush_pending_rows_event(bool stmt_end)
2704
We shall flush the pending event even if we are not in row-based
2705
mode: it might be the case that we left row-based mode before
2706
flushing anything (e.g., if we have explicitly locked tables).
2708
if (!mysql_bin_log.is_open())
2712
Mark the event as the last event of a statement if the stmt_end
2716
if (Rows_log_event *pending= binlog_get_pending_rows_event())
2720
pending->set_flags(Rows_log_event::STMT_END_F);
2721
pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
2722
binlog_table_maps= 0;
2725
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
2733
Member function that will log query, either row-based or
2734
statement-based depending on the value of the 'current_stmt_binlog_row_based'
2735
the value of the 'qtype' flag.
2737
This function should be called after the all calls to ha_*_row()
2738
functions have been issued, but before tables are unlocked and
2742
There shall be no writes to any system table after calling
2743
binlog_query(), so these writes has to be moved to before the call
2744
of binlog_query() for correct functioning.
2746
This is necessesary not only for RBR, but the master might crash
2747
after binlogging the query but before changing the system tables.
2748
This means that the slave and the master are not in the same state
2749
(after the master has restarted), so therefore we have to
2750
eliminate this problem.
2753
Error code, or 0 if no error.
2755
int Session::binlog_query(Session::enum_binlog_query_type qtype, char const *query_arg,
2756
ulong query_len, bool is_trans, bool suppress_use,
2757
Session::killed_state killed_status_arg)
2759
assert(query_arg && mysql_bin_log.is_open());
2761
if (int error= binlog_flush_pending_rows_event(true))
2765
If we are in statement mode and trying to log an unsafe statement,
2766
we should print a warning.
2768
if (lex->is_stmt_unsafe() &&
2769
variables.binlog_format == BINLOG_FORMAT_STMT)
2771
assert(this->query != NULL);
2772
push_warning(this, DRIZZLE_ERROR::WARN_LEVEL_WARN,
2773
ER_BINLOG_UNSAFE_STATEMENT,
2774
ER(ER_BINLOG_UNSAFE_STATEMENT));
2775
if (!(binlog_flags & BINLOG_FLAG_UNSAFE_STMT_PRINTED))
2777
char warn_buf[DRIZZLE_ERRMSG_SIZE];
2778
snprintf(warn_buf, DRIZZLE_ERRMSG_SIZE, "%s Statement: %s",
2779
ER(ER_BINLOG_UNSAFE_STATEMENT), this->query);
2780
sql_print_warning("%s",warn_buf);
2781
binlog_flags|= BINLOG_FLAG_UNSAFE_STMT_PRINTED;
2786
case Session::ROW_QUERY_TYPE:
2787
if (current_stmt_binlog_row_based)
2789
/* Otherwise, we fall through */
2790
case Session::DRIZZLE_QUERY_TYPE:
2792
Using this query type is a conveniece hack, since we have been
2793
moving back and forth between using RBR for replication of
2794
system tables and not using it.
2796
Make sure to change in check_table_binlog_row_based() according
2797
to how you treat this.
2799
case Session::STMT_QUERY_TYPE:
2801
The DRIZZLE_LOG::write() function will set the STMT_END_F flag and
2802
flush the pending rows event if necessary.
2805
Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use,
2807
qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
2809
Binlog table maps will be irrelevant after a Query_log_event
2810
(they are just removed on the slave side) so after the query
2811
log event is written to the binary log, we pretend that no
2812
table maps were written.
2814
int error= mysql_bin_log.write(&qinfo);
2815
binlog_table_maps= 0;
2820
case Session::QUERY_TYPE_COUNT:
2822
assert(0 <= qtype && qtype < QUERY_TYPE_COUNT);
2827
bool Discrete_intervals_list::append(uint64_t start, uint64_t val,
2830
/* first, see if this can be merged with previous */
2831
if ((head == NULL) || tail->merge_if_contiguous(start, val, incr))
2833
/* it cannot, so need to add a new interval */
2834
Discrete_interval *new_interval= new Discrete_interval(start, val, incr);
2835
return(append(new_interval));
2840
bool Discrete_intervals_list::append(Discrete_interval *new_interval)
2842
if (unlikely(new_interval == NULL))
2845
head= current= new_interval;
2847
tail->next= new_interval;