91
46
const char * const Session::DEFAULT_WHERE= "field list";
49
/*****************************************************************************
50
** Instansiate templates
51
*****************************************************************************/
53
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
55
template class List<Key>;
56
template class List_iterator<Key>;
57
template class List<Key_part_spec>;
58
template class List_iterator<Key_part_spec>;
59
template class List<Alter_drop>;
60
template class List_iterator<Alter_drop>;
61
template class List<Alter_column>;
62
template class List_iterator<Alter_column>;
65
/****************************************************************************
67
****************************************************************************/
69
extern "C" unsigned char *get_var_key(user_var_entry *entry, size_t *length,
70
bool not_used __attribute__((unused)))
72
*length= entry->name.length;
73
return (unsigned char*) entry->name.str;
76
extern "C" void free_user_var(user_var_entry *entry)
78
char *pos= (char*) entry+ALIGN_SIZE(sizeof(*entry));
79
if (entry->value && entry->value != pos)
93
84
bool Key_part_spec::operator==(const Key_part_spec& other) const
95
86
return length == other.length &&
96
87
field_name.length == other.field_name.length &&
97
!my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
100
Open_tables_state::Open_tables_state(uint64_t version_arg) :
103
open_tables= temporary_tables= derived_tables= NULL;
104
extra_lock= lock= NULL;
88
!strcmp(field_name.str, other.field_name.str);
92
Construct an (almost) deep copy of this key. Only those
93
elements that are known to never change are not copied.
94
If out of memory, a partial copy is returned and an error is set
98
Key::Key(const Key &rhs, MEM_ROOT *mem_root)
100
key_create_info(rhs.key_create_info),
101
columns(rhs.columns, mem_root),
103
generated(rhs.generated)
105
list_copy_and_replace_each_value(columns, mem_root);
109
Construct an (almost) deep copy of this foreign key. Only those
110
elements that are known to never change are not copied.
111
If out of memory, a partial copy is returned and an error is set
115
Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root)
117
ref_table(rhs.ref_table),
118
ref_columns(rhs.ref_columns),
119
delete_opt(rhs.delete_opt),
120
update_opt(rhs.update_opt),
121
match_opt(rhs.match_opt)
123
list_copy_and_replace_each_value(ref_columns, mem_root);
127
Test if a foreign key (= generated key) is a prefix of the given key
128
(ignoring key name, key type and order of columns)
131
This is only used to test if an index for a FOREIGN KEY exists
134
We only compare field names
137
0 Generated key is a prefix of other key
141
bool foreign_key_prefix(Key *a, Key *b)
143
/* Ensure that 'a' is the generated key */
146
if (b->generated && a->columns.elements > b->columns.elements)
147
std::swap(a, b); // Put shorter key in 'a'
152
return true; // No foreign key
153
std::swap(a, b); // Put generated key in 'a'
156
/* Test if 'a' is a prefix of 'b' */
157
if (a->columns.elements > b->columns.elements)
158
return true; // Can't be prefix
160
List_iterator<Key_part_spec> col_it1(a->columns);
161
List_iterator<Key_part_spec> col_it2(b->columns);
162
const Key_part_spec *col1, *col2;
164
#ifdef ENABLE_WHEN_INNODB_CAN_HANDLE_SWAPED_FOREIGN_KEY_COLUMNS
165
while ((col1= col_it1++))
169
while ((col2= col_it2++))
178
return true; // Error
180
return false; // Is prefix
182
while ((col1= col_it1++))
185
if (!(*col1 == *col2))
188
return false; // Is prefix
194
Check if the foreign key options are compatible with columns
195
on which the FK is created.
201
bool Foreign_key::validate(List<Create_field> &table_fields)
203
Create_field *sql_field;
204
Key_part_spec *column;
205
List_iterator<Key_part_spec> cols(columns);
206
List_iterator<Create_field> it(table_fields);
207
while ((column= cols++))
210
while ((sql_field= it++) &&
211
my_strcasecmp(system_charset_info,
212
column->field_name.str,
213
sql_field->field_name)) {}
216
my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name.str);
219
if (type == Key::FOREIGN_KEY && sql_field->vcol_info)
221
if (delete_opt == FK_OPTION_SET_NULL)
223
my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0),
224
"ON DELETE SET NULL");
227
if (update_opt == FK_OPTION_SET_NULL)
229
my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0),
230
"ON UPDATE SET NULL");
233
if (update_opt == FK_OPTION_CASCADE)
235
my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0),
236
"ON UPDATE CASCADE");
245
/****************************************************************************
246
** Thread specific functions
247
****************************************************************************/
249
Open_tables_state::Open_tables_state(ulong version_arg)
250
:version(version_arg), state_flags(0U)
252
reset_open_tables_state();
108
256
The following functions form part of the C plugin API
110
int tmpfile(const char *prefix)
259
extern "C" int mysql_tmpfile(const char *prefix)
112
261
char filename[FN_REFLEN];
113
int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
262
File fd = create_temp_file(filename, drizzle_tmpdir, prefix,
263
O_CREAT | O_EXCL | O_RDWR,
115
266
unlink(filename);
121
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
123
return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
126
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
129
return &ha_data[monitored->getId()].resource_context[index];
274
int session_in_lock_tables(const Session *session)
276
return test(session->in_lock_tables);
281
int session_tablespace_op(const Session *session)
283
return test(session->tablespace_op);
288
Set the process info field of the Session structure.
290
This function is used by plug-ins. Internally, the
291
Session::set_proc_info() function should be used.
293
@see Session::set_proc_info
296
set_session_proc_info(Session *session, const char *info)
298
session->set_proc_info(info);
302
const char *get_session_proc_info(Session *session)
304
return session->get_proc_info();
308
void **session_ha_data(const Session *session, const struct handlerton *hton)
310
return (void **) &session->ha_data[hton->slot].ha_ptr;
132
314
int64_t session_test_options(const Session *session, int64_t test_options)
134
316
return session->options & test_options;
137
Session::Session(plugin::Client *client_arg, catalog::Instance::shared_ptr catalog_arg) :
138
Open_tables_state(refresh_version),
139
mem_root(&main_mem_root),
142
query(new std::string),
143
_schema(new std::string("")),
147
lock_id(&main_lock_id),
149
security_ctx(identifier::User::make_shared()),
150
_where(Session::DEFAULT_WHERE),
151
dbug_sentry(Session_SENTRY_MAGIC),
153
command(COM_CONNECT),
155
_epoch(boost::gregorian::date(1970,1,1)),
156
_connect_time(boost::posix_time::microsec_clock::universal_time()),
158
ha_data(plugin::num_trx_monitored_objects),
161
concurrent_execute_allowed(true),
162
arg_of_last_insert_id_function(false),
163
first_successful_insert_id_in_prev_stmt(0),
164
first_successful_insert_id_in_cur_stmt(0),
166
options(session_startup_options),
169
examined_row_count(0),
173
statement_id_counter(0),
177
_global_read_lock(NONE),
178
count_cuted_fields(CHECK_FIELD_ERROR_FOR_NULL),
180
some_tables_deleted(false),
183
is_fatal_error(false),
184
transaction_rollback_request(false),
185
is_fatal_sub_stmt_error(0),
186
tablespace_op(false),
187
derived_tables_processing(false),
190
transaction_message(NULL),
191
statement_message(NULL),
192
session_event_observers(NULL),
193
_catalog(catalog_arg),
196
client->setSession(this);
320
int session_sql_command(const Session *session)
322
return (int) session->lex->sql_command;
326
int session_tx_isolation(const Session *session)
328
return (int) session->variables.tx_isolation;
332
void session_inc_row_count(Session *session)
334
session->row_count++;
338
Clear this diagnostics area.
340
Normally called at the end of a statement.
344
Diagnostics_area::reset_diagnostics_area()
346
can_overwrite_status= false;
347
/** Don't take chances in production */
353
m_total_warn_count= 0;
355
/** Tiny reset in debug mode to see garbage right away */
361
Set OK status -- ends commands that do not return a
362
result set, e.g. INSERT/UPDATE/DELETE.
366
Diagnostics_area::set_ok_status(Session *session, ha_rows affected_rows_arg,
367
uint64_t last_insert_id_arg,
368
const char *message_arg)
372
In production, refuse to overwrite an error or a custom response
375
if (is_error() || is_disabled())
377
/** Only allowed to report success if has not yet reported an error */
379
m_server_status= session->server_status;
380
m_total_warn_count= session->total_warn_count;
381
m_affected_rows= affected_rows_arg;
382
m_last_insert_id= last_insert_id_arg;
384
strmake(m_message, message_arg, sizeof(m_message) - 1);
396
Diagnostics_area::set_eof_status(Session *session)
398
/** Only allowed to report eof if has not yet reported an error */
402
In production, refuse to overwrite an error or a custom response
405
if (is_error() || is_disabled())
408
m_server_status= session->server_status;
410
If inside a stored procedure, do not return the total
411
number of warnings, since they are not available to the client
414
m_total_warn_count= session->total_warn_count;
424
Diagnostics_area::set_error_status(Session *session __attribute__((unused)),
425
uint32_t sql_errno_arg,
426
const char *message_arg)
429
Only allowed to report error if has not yet reported a success
430
The only exception is when we flush the message to the client,
431
an error can happen during the flush.
433
assert(! is_set() || can_overwrite_status);
435
In production, refuse to overwrite a custom response with an
441
m_sql_errno= sql_errno_arg;
442
strmake(m_message, message_arg, sizeof(m_message) - 1);
449
Mark the diagnostics area as 'DISABLED'.
451
This is used in rare cases when the COM_ command at hand sends a response
452
in a custom format. One example is the query cache, another is
457
Diagnostics_area::disable_status()
460
m_status= DA_DISABLED;
465
:Statement(&main_lex, &main_mem_root,
466
/* statement id */ 0),
467
Open_tables_state(refresh_version), rli_fake(0),
468
lock_id(&main_lock_id),
470
binlog_table_maps(0), binlog_flags(0UL),
471
arg_of_last_insert_id_function(false),
472
first_successful_insert_id_in_prev_stmt(0),
473
first_successful_insert_id_in_prev_stmt_for_binlog(0),
474
first_successful_insert_id_in_cur_stmt(0),
475
stmt_depends_on_first_successful_insert_id_in_prev_stmt(false),
478
transaction_rollback_request(0),
479
is_fatal_sub_stmt_error(0),
483
derived_tables_processing(false),
199
489
Pass nominal parameters to init_alloc_root only to ensure that
200
490
the destructor works OK in case of an error. The main_mem_root
201
491
will be re-initialized in init_for_queries().
203
memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
493
init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
495
catalog= (char*)"std"; // the only catalog we have for now
496
main_security_ctx.init();
497
security_ctx= &main_security_ctx;
498
some_tables_deleted=no_errors=password= 0;
500
count_cuted_fields= CHECK_FIELD_IGNORE;
503
is_slave_error= thread_specific_used= false;
504
hash_clear(&handler_tables_hash);
204
507
cuted_fields= sent_row_count= row_count= 0L;
510
statement_id_counter= 0UL;
205
511
// Must be reset to handle error with Session's created for init of mysqld
206
512
lex->current_select= 0;
513
start_time=(time_t) 0;
515
utime_after_lock= 0L;
207
518
memset(&variables, 0, sizeof(variables));
208
scoreboard_index= -1;
209
cleanup_done= abort_on_warning= no_warnings_for_error= false;
211
/* query_cache init */
524
db_charset= global_system_variables.collation_database;
525
memset(ha_data, 0, sizeof(ha_data));
527
binlog_evt_union.do_union= false;
528
dbug_sentry=Session_SENTRY_MAGIC;
530
client_capabilities= 0; // minimalistic client
531
system_thread= NON_SYSTEM_THREAD;
532
cleanup_done= abort_on_warning= no_warnings_for_error= 0;
533
peer_port= 0; // For SHOW PROCESSLIST
534
transaction.m_pending_rows_event= 0;
536
pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
215
538
/* Variables with default values */
216
539
proc_info="login";
218
plugin_sessionvar_init(this);
220
variables= global_system_variables above has reset
221
variables.pseudo_thread_id to 0. We need to correct it here to
222
avoid temporary tables replication failure.
224
variables.pseudo_thread_id= thread_id;
225
server_status= SERVER_STATUS_AUTOCOMMIT;
227
if (variables.max_join_size == HA_POS_ERROR)
228
options |= OPTION_BIG_SELECTS;
230
options &= ~OPTION_BIG_SELECTS;
232
open_options=ha_open_options;
233
update_lock_default= TL_WRITE;
234
session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
236
memset(warn_count, 0, sizeof(warn_count));
237
memset(&status_var, 0, sizeof(status_var));
540
where= Session::DEFAULT_WHERE;
541
server_id = ::server_id;
239
547
/* Initialize sub structures */
240
memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
548
init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
549
user_connect=(USER_CONN *)0;
550
hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
551
(hash_get_key) get_var_key,
552
(hash_free_key) free_user_var, 0);
554
/* For user vars replication*/
556
my_init_dynamic_array(&user_var_events,
557
sizeof(BINLOG_USER_VAR_EVENT *), 16, 16);
559
memset(&user_var_events, 0, sizeof(user_var_events));
562
protocol= &protocol_text; // Default protocol
563
protocol_text.init(this);
565
const Query_id& query_id= Query_id::get_query_id();
566
tablespace_op= false;
568
randominit(&rand, tmp + (ulong) &rand, tmp + query_id.value());
242
569
substitute_null_with_insert_id = false;
243
lock_info.init(); /* safety: will be reset after start */
570
thr_lock_info_init(&lock_info); /* safety: will be reset after start */
244
571
thr_lock_owner_init(&main_lock_id, &lock_info);
246
573
m_internal_handler= NULL;
248
plugin::EventObserver::registerSessionEvents(*this);
251
void Session::free_items()
254
/* This works because items are allocated with memory::sql_alloc() */
255
for (; free_list; free_list= next)
257
next= free_list->next;
258
free_list->delete_self();
262
577
void Session::push_internal_handler(Internal_error_handler *handler)
460
897
assert(thread_stack);
462
currentSession().release();
463
currentSession().reset(this);
465
currentMemRoot().release();
466
currentMemRoot().reset(&mem_root);
899
if (pthread_setspecific(THR_Session, this) ||
900
pthread_setspecific(THR_MALLOC, &mem_root))
468
902
mysys_var=my_thread_var;
471
904
Let mysqld define the thread id (not mysys)
472
905
This allows us to move Session to different threads if needed.
474
907
mysys_var->id= thread_id;
908
real_id= pthread_self(); // For debugging
477
911
We have to call thr_lock_info_init() again here as Session may have been
478
912
created in another thread
486
Init Session for query processing.
487
This has to be called once before we call mysql_parse.
488
See also comments in session.h.
491
void Session::prepareForQueries()
493
if (variables.max_join_size == HA_POS_ERROR)
494
options |= OPTION_BIG_SELECTS;
496
version= refresh_version;
501
mem_root->reset_root_defaults(variables.query_alloc_block_size,
502
variables.query_prealloc_size);
503
transaction.xid_state.xid.null();
504
transaction.xid_state.in_session=1;
509
bool Session::initGlobals()
513
disconnect(ER_OUT_OF_RESOURCES);
514
status_var.aborted_connects++;
522
if (initGlobals() || authenticate())
530
while (not client->haveError() && getKilled() != KILL_CONNECTION)
532
if (not executeStatement())
539
bool Session::schedule(Session::shared_ptr &arg)
541
arg->scheduler= plugin::Scheduler::getScheduler();
542
assert(arg->scheduler);
546
long current_connections= connection_count;
548
if (current_connections > 0 and static_cast<uint64_t>(current_connections) > current_global_counters.max_used_connections)
550
current_global_counters.max_used_connections= static_cast<uint64_t>(connection_count);
553
current_global_counters.connections++;
554
arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
556
session::Cache::singleton().insert(arg);
558
if (unlikely(plugin::EventObserver::connectSession(*arg)))
560
// We should do something about an error...
563
if (plugin::Scheduler::getScheduler()->addSession(arg))
565
DRIZZLE_CONNECTION_START(arg->getSessionId());
566
char error_message_buff[DRIZZLE_ERRMSG_SIZE];
568
arg->setKilled(Session::KILL_CONNECTION);
570
arg->status_var.aborted_connects++;
572
/* Can't use my_error() since store_globals has not been called. */
573
/* TODO replace will better error message */
574
snprintf(error_message_buff, sizeof(error_message_buff),
575
ER(ER_CANT_CREATE_THREAD), 1);
576
arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
586
Is this session viewable by the current user?
588
bool Session::isViewable(identifier::User::const_reference user_arg) const
590
return plugin::Authorization::isAuthorized(user_arg, this, false);
594
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
596
const char* old_msg = get_proc_info();
597
safe_mutex_assert_owner(mutex);
598
mysys_var->current_mutex = &mutex;
599
mysys_var->current_cond = &cond;
600
this->set_proc_info(msg);
604
void Session::exit_cond(const char* old_msg)
607
Putting the mutex unlock in exit_cond() ensures that
608
mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
609
locked (if that would not be the case, you'll get a deadlock if someone
610
does a Session::awake() on you).
612
mysys_var->current_mutex->unlock();
613
boost_unique_lock_t scopedLock(mysys_var->mutex);
614
mysys_var->current_mutex = 0;
615
mysys_var->current_cond = 0;
616
this->set_proc_info(old_msg);
619
bool Session::authenticate()
621
if (client->authenticate())
624
status_var.aborted_connects++;
629
bool Session::checkUser(const std::string &passwd_str,
630
const std::string &in_db)
632
bool is_authenticated=
633
plugin::Authentication::isAuthenticated(user(), passwd_str);
635
if (is_authenticated != true)
637
status_var.access_denied++;
638
/* isAuthenticated has pushed the error message */
642
/* Change database if necessary */
643
if (not in_db.empty())
645
identifier::Schema identifier(in_db);
646
if (change_db(this, identifier))
648
/* change_db() has pushed the error message. */
653
password= not passwd_str.empty();
655
/* Ready to handle queries */
659
bool Session::executeStatement()
662
uint32_t packet_length;
664
enum enum_server_command l_command;
667
indicator of uninitialized lex => normal flow of errors handling
670
lex->current_select= 0;
672
main_da.reset_diagnostics_area();
674
if (client->readCommand(&l_packet, &packet_length) == false)
679
if (getKilled() == KILL_CONNECTION)
682
if (packet_length == 0)
685
l_command= static_cast<enum_server_command>(l_packet[0]);
687
if (command >= COM_END)
688
command= COM_END; // Wrong command
690
assert(packet_length);
691
return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
694
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
696
/* Remove garbage at start and end of query */
697
while (in_packet_length > 0 && my_isspace(charset(), in_packet[0]))
702
const char *pos= in_packet + in_packet_length; /* Point at end null */
703
while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
709
std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
710
// We can not be entirely sure _schema has a value
713
plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
715
query.reset(new_query);
716
_state.reset(new session::State(in_packet, in_packet_length));
721
bool Session::endTransaction(enum enum_mysql_completiontype completion)
725
TransactionServices &transaction_services= TransactionServices::singleton();
727
if (transaction.xid_state.xa_state != XA_NOTR)
729
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
736
* We don't use endActiveTransaction() here to ensure that this works
737
* even if there is a problem with the OPTION_AUTO_COMMIT flag
738
* (Which of course should never happen...)
740
server_status&= ~SERVER_STATUS_IN_TRANS;
741
if (transaction_services.commitTransaction(*this, true))
743
options&= ~(OPTION_BEGIN);
746
do_release= 1; /* fall through */
747
case COMMIT_AND_CHAIN:
748
result= endActiveTransaction();
749
if (result == true && completion == COMMIT_AND_CHAIN)
750
result= startTransaction();
752
case ROLLBACK_RELEASE:
753
do_release= 1; /* fall through */
755
case ROLLBACK_AND_CHAIN:
757
server_status&= ~SERVER_STATUS_IN_TRANS;
758
if (transaction_services.rollbackTransaction(*this, true))
760
options&= ~(OPTION_BEGIN);
761
if (result == true && (completion == ROLLBACK_AND_CHAIN))
762
result= startTransaction();
766
my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
772
my_error(static_cast<drizzled::error_t>(killed_errno()), MYF(0));
774
else if ((result == true) && do_release)
776
setKilled(Session::KILL_CONNECTION);
782
bool Session::endActiveTransaction()
785
TransactionServices &transaction_services= TransactionServices::singleton();
787
if (transaction.xid_state.xa_state != XA_NOTR)
789
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
792
if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
794
server_status&= ~SERVER_STATUS_IN_TRANS;
795
if (transaction_services.commitTransaction(*this, true))
798
options&= ~(OPTION_BEGIN);
802
bool Session::startTransaction(start_transaction_option_t opt)
806
assert(! inTransaction());
808
options|= OPTION_BEGIN;
809
server_status|= SERVER_STATUS_IN_TRANS;
811
if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
914
thr_lock_info_init(&lock_info);
923
Session::cleanup_after_query()
926
This function is used to reset thread data to its default state.
929
This function is not suitable for setting thread data to some
930
non-default values, as there is only one replication thread, so
931
different master threads may overwrite data of each other on
819
935
void Session::cleanup_after_query()
822
Reset rand_used so that detection of calls to rand() will save random
938
Reset rand_used so that detection of calls to rand() will save random
823
939
seeds if needed by the slave.
826
942
/* Forget those values, for next binlogger: */
943
stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
827
944
auto_inc_intervals_in_cur_stmt_for_binlog.empty();
829
947
if (first_successful_insert_id_in_cur_stmt > 0)
831
949
/* set what LAST_INSERT_ID() will return */
832
first_successful_insert_id_in_prev_stmt= first_successful_insert_id_in_cur_stmt;
950
first_successful_insert_id_in_prev_stmt=
951
first_successful_insert_id_in_cur_stmt;
833
952
first_successful_insert_id_in_cur_stmt= 0;
834
953
substitute_null_with_insert_id= true;
837
arg_of_last_insert_id_function= false;
955
arg_of_last_insert_id_function= 0;
839
956
/* Free Items that were created during this execution */
843
_where= Session::DEFAULT_WHERE;
845
/* Reset the temporary shares we built */
846
for_each(temporary_shares.begin(),
847
temporary_shares.end(),
849
temporary_shares.clear();
959
where= Session::DEFAULT_WHERE;
853
964
Create a LEX_STRING in this connection.
915
1187
item->maybe_null= 1;
916
1188
field_list.push_back(new Item_empty_string("Extra", 255, cs));
917
return (result->send_fields(field_list));
920
void select_result::send_error(drizzled::error_t errcode, const char *err)
1189
return (result->send_fields(field_list,
1190
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF));
1194
struct Item_change_record: public ilink
1198
/* Placement new was hidden by `new' in ilink (TODO: check): */
1199
static void *operator new(size_t size __attribute__((unused)),
1202
static void operator delete(void *ptr __attribute__((unused)),
1203
size_t size __attribute__((unused)))
1205
static void operator delete(void *ptr __attribute__((unused)),
1206
void *mem __attribute__((unused)))
1207
{ /* never called */ }
1212
Register an item tree tree transformation, performed by the query
1213
optimizer. We need a pointer to runtime_memroot because it may be !=
1214
session->mem_root (this may no longer be a true statement)
1217
void Session::nocheck_register_item_tree_change(Item **place, Item *old_value,
1218
MEM_ROOT *runtime_memroot)
1220
Item_change_record *change;
1222
Now we use one node per change, which adds some memory overhead,
1223
but still is rather fast as we use alloc_root for allocations.
1224
A list of item tree changes of an average query should be short.
1226
void *change_mem= alloc_root(runtime_memroot, sizeof(*change));
1227
if (change_mem == 0)
1230
OOM, session->fatal_error() is called by the error handler of the
1231
memroot. Just return.
1235
change= new (change_mem) Item_change_record;
1236
change->place= place;
1237
change->old_value= old_value;
1238
change_list.append(change);
1242
void Session::rollback_item_tree_changes()
1244
I_List_iterator<Item_change_record> it(change_list);
1245
Item_change_record *change;
1247
while ((change= it++))
1248
*change->place= change->old_value;
1249
/* We can forget about changes memory: it's allocated in runtime memroot */
1250
change_list.empty();
1255
/*****************************************************************************
1256
** Functions to provide a interface to select results
1257
*****************************************************************************/
1259
select_result::select_result()
1261
session=current_session;
1264
void select_result::send_error(uint32_t errcode,const char *err)
922
1266
my_message(errcode, err, MYF(0));
1270
void select_result::cleanup()
1275
bool select_result::check_simple_select() const
1277
my_error(ER_SP_BAD_CURSOR_QUERY, MYF(0));
1282
static String default_line_term("\n",default_charset_info);
1283
static String default_escaped("\\",default_charset_info);
1284
static String default_field_term("\t",default_charset_info);
1286
sql_exchange::sql_exchange(char *name, bool flag,
1287
enum enum_filetype filetype_arg)
1288
:file_name(name), opt_enclosed(0), dumpfile(flag), skip_lines(0)
1290
filetype= filetype_arg;
1291
field_term= &default_field_term;
1292
enclosed= line_start= &my_empty_string;
1293
line_term= &default_line_term;
1294
escaped= &default_escaped;
1298
bool select_send::send_fields(List<Item> &list, uint32_t flags)
1301
if (!(res= session->protocol->send_fields(&list, flags)))
1302
is_result_set_started= 1;
1306
void select_send::abort()
1313
Cleanup an instance of this class for re-use
1314
at next execution of a prepared statement/
1315
stored procedure statement.
1318
void select_send::cleanup()
1320
is_result_set_started= false;
1323
/* Send data to client. Returns 0 if ok */
1325
bool select_send::send_data(List<Item> &items)
1327
if (unit->offset_limit_cnt)
1328
{ // using limit offset,count
1329
unit->offset_limit_cnt--;
1334
We may be passing the control from mysqld to the client: release the
1335
InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
1338
ha_release_temporary_latches(session);
1340
List_iterator_fast<Item> li(items);
1341
Protocol *protocol= session->protocol;
1342
char buff[MAX_FIELD_WIDTH];
1343
String buffer(buff, sizeof(buff), &my_charset_bin);
1345
protocol->prepare_for_resend();
1349
if (item->send(protocol, &buffer))
1351
protocol->free(); // Free used buffer
1352
my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
1356
session->sent_row_count++;
1357
if (session->is_error())
1359
protocol->remove_last_row();
1362
if (session->vio_ok())
1363
return(protocol->write());
1367
bool select_send::send_eof()
1370
We may be passing the control from mysqld to the client: release the
1371
InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
1374
ha_release_temporary_latches(session);
1376
/* Unlock tables before sending packet to gain some speed */
1379
mysql_unlock_tables(session, session->lock);
1383
is_result_set_started= 0;
925
1388
/************************************************************************
926
1389
Handling writing to file
927
1390
************************************************************************/
929
void select_to_file::send_error(drizzled::error_t errcode,const char *err)
1392
void select_to_file::send_error(uint32_t errcode,const char *err)
931
1394
my_message(errcode, err, MYF(0));
934
(void) cache->end_io_cache();
935
(void) internal::my_close(file, MYF(0));
936
(void) internal::my_delete(path.file_string().c_str(), MYF(0)); // Delete file on error
1397
(void) end_io_cache(&cache);
1398
(void) my_close(file,MYF(0));
1399
(void) my_delete(path,MYF(0)); // Delete file on error
1612
2289
@param session Thread handle
1613
2290
@param all true <=> rollback main transaction.
1615
void Session::markTransactionForRollback(bool all)
1617
is_fatal_sub_stmt_error= true;
1618
transaction_rollback_request= all;
1621
void Session::disconnect(enum error_t errcode)
1623
/* Allow any plugins to cleanup their session variables */
1624
plugin_sessionvar_cleanup(this);
1626
/* If necessary, log any aborted or unauthorized connections */
1627
if (getKilled() || client->wasAborted())
1629
status_var.aborted_threads++;
1632
if (client->wasAborted())
1634
if (not getKilled() && variables.log_warnings > 1)
1636
errmsg_printf(error::WARN, ER(ER_NEW_ABORTING_CONNECTION)
1638
, (_schema->empty() ? "unconnected" : _schema->c_str())
1639
, security_ctx->username().empty() == false ? security_ctx->username().c_str() : "unauthenticated"
1640
, security_ctx->address().c_str()
1641
, (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1645
setKilled(Session::KILL_CONNECTION);
1647
if (client->isConnected())
1649
if (errcode != EE_OK)
1651
/*my_error(errcode, ER(errcode));*/
1652
client->sendError(errcode, ER(errcode));
1658
void Session::reset_for_next_command()
1663
Those two lines below are theoretically unneeded as
1664
Session::cleanup_after_query() should take care of this already.
1666
auto_inc_intervals_in_cur_stmt_for_binlog.empty();
1668
is_fatal_error= false;
1669
server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1670
SERVER_QUERY_NO_INDEX_USED |
1671
SERVER_QUERY_NO_GOOD_INDEX_USED);
1674
main_da.reset_diagnostics_area();
1675
total_warn_count=0; // Warnings for this query
1676
sent_row_count= examined_row_count= 0;
1680
Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1683
void Open_tables_state::close_temporary_tables()
1688
if (not temporary_tables)
1691
for (table= temporary_tables; table; table= tmp_next)
1693
tmp_next= table->getNext();
1696
temporary_tables= NULL;
1700
unlink from session->temporary tables and close temporary table
1703
void Open_tables_state::close_temporary_table(Table *table)
1705
if (table->getPrev())
1707
table->getPrev()->setNext(table->getNext());
1708
if (table->getPrev()->getNext())
1710
table->getNext()->setPrev(table->getPrev());
1715
/* removing the item from the list */
1716
assert(table == temporary_tables);
1718
slave must reset its temporary list pointer to zero to exclude
1719
passing non-zero value to end_slave via rli->save_temporary_tables
1720
when no temp tables opened, see an invariant below.
1722
temporary_tables= table->getNext();
1723
if (temporary_tables)
1725
table->getNext()->setPrev(NULL);
1732
Close and drop a temporary table
1735
This dosn't unlink table from session->temporary
1736
If this is needed, use close_temporary_table()
1739
void Open_tables_state::nukeTable(Table *table)
1741
plugin::StorageEngine *table_type= table->getShare()->db_type();
1743
table->free_io_cache();
1744
table->delete_table();
1746
identifier::Table identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1747
rm_temporary_table(table_type, identifier);
1749
boost::checked_delete(table->getMutableShare());
1751
boost::checked_delete(table);
1754
/** Clear most status variables. */
1755
extern time_t flush_status_time;
1757
void Session::refresh_status()
1759
/* Reset thread's status variables */
1760
memset(&status_var, 0, sizeof(status_var));
1762
flush_status_time= time((time_t*) 0);
1763
current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1764
current_global_counters.connections= 0;
1767
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1769
return getVariable(std::string(name.str, name.length), create_if_not_exists);
1772
user_var_entry *Session::getVariable(const std::string &name, bool create_if_not_exists)
1777
UserVars::iterator iter= user_vars.find(name);
1778
if (iter != user_vars.end())
1779
return (*iter).second;
1781
if (not create_if_not_exists)
1784
user_var_entry *entry= NULL;
1785
entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1790
std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1792
if (not returnable.second)
1794
boost::checked_delete(entry);
1800
void Session::setVariable(const std::string &name, const std::string &value)
1802
user_var_entry *updateable_var= getVariable(name.c_str(), true);
1805
updateable_var->update_hash(false,
1806
(void*)value.c_str(),
1807
static_cast<uint32_t>(value.length()), STRING_RESULT,
1809
DERIVATION_IMPLICIT, false);
1813
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1815
for (Table *table= temporary_tables ; table ; table= table->getNext())
1817
if (table->query_id == getQueryId())
1820
table->cursor->ha_reset();
1825
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1827
for (; table ; table= table->getNext())
1829
if (table->query_id == getQueryId())
1832
table->cursor->ha_reset();
1838
Unlocks tables and frees derived tables.
1839
Put all normal tables used by thread in free list.
1841
It will only close/mark as free for reuse tables opened by this
1842
substatement, it will also check if we are closing tables after
1843
execution of complete query (i.e. we are on upper level) and will
1844
leave prelocked mode if needed.
1846
void Session::close_thread_tables()
1848
clearDerivedTables();
1851
Mark all temporary tables used by this statement as free for reuse.
1853
mark_temp_tables_as_free_for_reuse();
1855
Let us commit transaction for statement. Since in 5.0 we only have
1856
one statement transaction and don't allow several nested statement
1857
transactions this call will do nothing if we are inside of stored
1858
function or trigger (i.e. statement transaction is already active and
1859
does not belong to statement for which we do close_thread_tables()).
1860
TODO: This should be fixed in later releases.
1863
TransactionServices &transaction_services= TransactionServices::singleton();
1864
main_da.can_overwrite_status= true;
1865
transaction_services.autocommitOrRollback(*this, is_error());
1866
main_da.can_overwrite_status= false;
1867
transaction.stmt.reset();
1873
For RBR we flush the pending event just before we unlock all the
1874
tables. This means that we are at the end of a topmost
1875
statement, so we ensure that the STMT_END_F flag is set on the
1876
pending event. For statements that are *inside* stored
1877
functions, the pending event will not be flushed: that will be
1878
handled either before writing a query log event (inside
1879
binlog_query()) or when preparing a pending event.
1885
Note that we need to hold table::Cache::singleton().mutex() while changing the
1886
open_tables list. Another thread may work on it.
1887
(See: table::Cache::singleton().removeTable(), wait_completed_table())
1888
Closing a MERGE child before the parent would be fatal if the
1889
other thread tries to abort the MERGE lock in between.
1892
close_open_tables();
1895
void Session::close_tables_for_reopen(TableList **tables)
1898
If table list consists only from tables from prelocking set, table list
1899
for new attempt should be empty, so we have to update list's root pointer.
1901
if (lex->first_not_own_table() == *tables)
1903
lex->chop_off_not_own_tables();
1904
for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1906
close_thread_tables();
1909
bool Session::openTablesLock(TableList *tables)
1916
if (open_tables_from_list(&tables, &counter))
1919
if (not lock_tables(tables, counter, &need_reopen))
1922
if (not need_reopen)
1925
close_tables_for_reopen(&tables);
1928
if ((handle_derived(lex, &derived_prepare) || (handle_derived(lex, &derived_filling))))
1935
@note "best_effort" is used in cases were if a failure occurred on this
1936
operation it would not be surprising because we are only removing because there
1937
might be an issue (lame engines).
1940
bool Open_tables_state::rm_temporary_table(const identifier::Table &identifier, bool best_effort)
1942
if (not plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
1944
if (not best_effort)
1947
identifier.getSQLPath(path);
1948
errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"),
1949
path.c_str(), errno);
1958
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const identifier::Table &identifier)
1960
drizzled::error_t error;
1963
if (not plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier, error))
1966
identifier.getSQLPath(path);
1967
errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"),
1968
path.c_str(), error);
1977
@note this will be removed, I am looking through Hudson to see if it is finding
1978
any tables that are missed during cleanup.
1980
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
1984
if (not temporary_tables)
1987
cerr << "Begin Run: " << foo << "\n";
1988
for (table= temporary_tables; table; table= table->getNext())
1990
bool have_proto= false;
1992
message::Table *proto= table->getShare()->getTableMessage();
1993
if (table->getShare()->getTableMessage())
1996
const char *answer= have_proto ? "true" : "false";
2000
cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2001
cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2005
cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2010
table::Singular *Session::getInstanceTable()
2012
temporary_shares.push_back(new table::Singular()); // This will not go into the tableshare cache, so no key is used.
2014
table::Singular *tmp_share= temporary_shares.back();
2023
Create a reduced Table object with properly set up Field list from a
2024
list of field definitions.
2026
The created table doesn't have a table Cursor associated with
2027
it, has no keys, no group/distinct, no copy_funcs array.
2028
The sole purpose of this Table object is to use the power of Field
2029
class to read/write data to/from table->getInsertRecord(). Then one can store
2030
the record in any container (RB tree, hash, etc).
2031
The table is created in Session mem_root, so are the table's fields.
2032
Consequently, if you don't BLOB fields, you don't need to free it.
2034
@param session connection handle
2035
@param field_list list of column definitions
2038
0 if out of memory, Table object in case of success
2040
table::Singular *Session::getInstanceTable(List<CreateField> &field_list)
2042
temporary_shares.push_back(new table::Singular(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2044
table::Singular *tmp_share= temporary_shares.back();
2053
static const std::string NONE= "NONE";
2054
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2055
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2057
const std::string &type(drizzled::Session::global_read_lock_t type)
2063
case Session::GOT_GLOBAL_READ_LOCK:
2064
return GOT_GLOBAL_READ_LOCK;
2065
case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2066
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2070
size_t max_string_length(drizzled::Session::global_read_lock_t)
2072
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2075
} /* namespace display */
2077
} /* namespace drizzled */
2293
void mark_transaction_to_rollback(Session *session, bool all)
2297
session->is_fatal_sub_stmt_error= true;
2298
session->transaction_rollback_request= all;
2301
/***************************************************************************
2302
Handling of XA id cacheing
2303
***************************************************************************/
2305
pthread_mutex_t LOCK_xid_cache;
2308
extern "C" unsigned char *xid_get_hash_key(const unsigned char *, size_t *, bool);
2309
extern "C" void xid_free_hash(void *);
2311
unsigned char *xid_get_hash_key(const unsigned char *ptr, size_t *length,
2312
bool not_used __attribute__((unused)))
2314
*length=((XID_STATE*)ptr)->xid.key_length();
2315
return ((XID_STATE*)ptr)->xid.key();
2318
void xid_free_hash(void *ptr)
2320
if (!((XID_STATE*)ptr)->in_session)
2321
free((unsigned char*)ptr);
2324
bool xid_cache_init()
2326
pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST);
2327
return hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
2328
xid_get_hash_key, xid_free_hash, 0) != 0;
2331
void xid_cache_free()
2333
if (hash_inited(&xid_cache))
2335
hash_free(&xid_cache);
2336
pthread_mutex_destroy(&LOCK_xid_cache);
2340
XID_STATE *xid_cache_search(XID *xid)
2342
pthread_mutex_lock(&LOCK_xid_cache);
2343
XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, xid->key(), xid->key_length());
2344
pthread_mutex_unlock(&LOCK_xid_cache);
2349
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
2353
pthread_mutex_lock(&LOCK_xid_cache);
2354
if (hash_search(&xid_cache, xid->key(), xid->key_length()))
2356
else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
2360
xs->xa_state=xa_state;
2363
res=my_hash_insert(&xid_cache, (unsigned char*)xs);
2365
pthread_mutex_unlock(&LOCK_xid_cache);
2370
bool xid_cache_insert(XID_STATE *xid_state)
2372
pthread_mutex_lock(&LOCK_xid_cache);
2373
assert(hash_search(&xid_cache, xid_state->xid.key(),
2374
xid_state->xid.key_length())==0);
2375
bool res=my_hash_insert(&xid_cache, (unsigned char*)xid_state);
2376
pthread_mutex_unlock(&LOCK_xid_cache);
2381
void xid_cache_delete(XID_STATE *xid_state)
2383
pthread_mutex_lock(&LOCK_xid_cache);
2384
hash_delete(&xid_cache, (unsigned char *)xid_state);
2385
pthread_mutex_unlock(&LOCK_xid_cache);
2389
Implementation of interface to write rows to the binary log through the
2390
thread. The thread is responsible for writing the rows it has
2391
inserted/updated/deleted.
2396
Template member function for ensuring that there is an rows log
2397
event of the apropriate type before proceeding.
2400
- Events of type 'RowEventT' have the type code 'type_code'.
2403
If a non-NULL pointer is returned, the pending event for thread 'session' will
2404
be an event of type 'RowEventT' (which have the type code 'type_code')
2405
will either empty or have enough space to hold 'needed' bytes. In
2406
addition, the columns bitmap will be correct for the row, meaning that
2407
the pending event will be flushed if the columns in the event differ from
2408
the columns suppled to the function.
2411
If no error, a non-NULL pending event (either one which already existed or
2412
the newly created one).
2416
template <class RowsEventT> Rows_log_event*
2417
Session::binlog_prepare_pending_rows_event(Table* table, uint32_t serv_id,
2419
bool is_transactional,
2420
RowsEventT *hint __attribute__((unused)))
2422
/* Pre-conditions */
2423
assert(table->s->table_map_id != UINT32_MAX);
2425
/* Fetch the type code for the RowsEventT template parameter */
2426
int const type_code= RowsEventT::TYPE_CODE;
2429
There is no good place to set up the transactional data, so we
2432
if (binlog_setup_trx_data())
2435
Rows_log_event* pending= binlog_get_pending_rows_event();
2437
if (unlikely(pending && !pending->is_valid()))
2441
Check if the current event is non-NULL and a write-rows
2442
event. Also check if the table provided is mapped: if it is not,
2443
then we have switched to writing to a new table.
2444
If there is no pending event, we need to create one. If there is a pending
2445
event, but it's not about the same table id, or not of the same type
2446
(between Write, Update and Delete), or not the same affected columns, or
2447
going to be too big, flush this event to disk and create a new pending
2450
The last test is necessary for the Cluster injector to work
2451
correctly. The reason is that the Cluster can inject two write
2452
rows with different column bitmaps if there is an insert followed
2453
by an update in the same transaction, and these are grouped into a
2454
single epoch/transaction when fed to the injector.
2456
TODO: Fix the code so that the last test can be removed.
2459
pending->server_id != serv_id ||
2460
pending->get_table_id() != table->s->table_map_id ||
2461
pending->get_type_code() != type_code ||
2462
pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
2463
!bitmap_cmp(pending->get_cols(), table->write_set))
2465
/* Create a new RowsEventT... */
2466
Rows_log_event* const
2467
ev= new RowsEventT(this, table, table->s->table_map_id,
2471
ev->server_id= serv_id; // I don't like this, it's too easy to forget.
2473
flush the pending event and replace it with the newly created
2476
if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev)))
2482
return(ev); /* This is the new pending event */
2484
return(pending); /* This is the current pending event */
2487
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
2489
Instantiate the versions we need, we have -fno-implicit-template as
2492
template Rows_log_event*
2493
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2494
Write_rows_log_event*);
2496
template Rows_log_event*
2497
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2498
Delete_rows_log_event *);
2500
template Rows_log_event*
2501
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2502
Update_rows_log_event *);
2507
Class to handle temporary allocation of memory for row data.
2509
The responsibilities of the class is to provide memory for
2510
packing one or two rows of packed data (depending on what
2511
constructor is called).
2513
In order to make the allocation more efficient for "simple" rows,
2514
i.e., rows that do not contain any blobs, a pointer to the
2515
allocated memory is of memory is stored in the table structure
2516
for simple rows. If memory for a table containing a blob field
2517
is requested, only memory for that is allocated, and subsequently
2518
released when the object is destroyed.
2521
class Row_data_memory {
2524
Build an object to keep track of a block-local piece of memory
2525
for storing a row of data.
2528
Table where the pre-allocated memory is stored.
2531
Length of data that is needed, if the record contain blobs.
2533
Row_data_memory(Table *table, size_t const len1)
2536
m_alloc_checked= false;
2537
allocate_memory(table, len1);
2538
m_ptr[0]= has_memory() ? m_memory : 0;
2542
Row_data_memory(Table *table, size_t const len1, size_t const len2)
2545
m_alloc_checked= false;
2546
allocate_memory(table, len1 + len2);
2547
m_ptr[0]= has_memory() ? m_memory : 0;
2548
m_ptr[1]= has_memory() ? m_memory + len1 : 0;
2553
if (m_memory != 0 && m_release_memory_on_destruction)
2554
free((unsigned char*) m_memory);
2558
Is there memory allocated?
2560
@retval true There is memory allocated
2561
@retval false Memory allocation failed
2563
bool has_memory() const {
2564
m_alloc_checked= true;
2565
return m_memory != 0;
2568
unsigned char *slot(uint32_t s)
2570
assert(s < sizeof(m_ptr)/sizeof(*m_ptr));
2571
assert(m_ptr[s] != 0);
2572
assert(m_alloc_checked == true);
2577
void allocate_memory(Table *const table, size_t const total_length)
2579
if (table->s->blob_fields == 0)
2582
The maximum length of a packed record is less than this
2583
length. We use this value instead of the supplied length
2584
when allocating memory for records, since we don't know how
2585
the memory will be used in future allocations.
2587
Since table->s->reclength is for unpacked records, we have
2588
to add two bytes for each field, which can potentially be
2589
added to hold the length of a packed field.
2591
size_t const maxlen= table->s->reclength + 2 * table->s->fields;
2594
Allocate memory for two records if memory hasn't been
2595
allocated. We allocate memory for two records so that it can
2596
be used when processing update rows as well.
2598
if (table->write_row_record == 0)
2599
table->write_row_record=
2600
(unsigned char *) alloc_root(&table->mem_root, 2 * maxlen);
2601
m_memory= table->write_row_record;
2602
m_release_memory_on_destruction= false;
2606
m_memory= (unsigned char *) my_malloc(total_length, MYF(MY_WME));
2607
m_release_memory_on_destruction= true;
2611
mutable bool m_alloc_checked;
2612
bool m_release_memory_on_destruction;
2613
unsigned char *m_memory;
2614
unsigned char *m_ptr[2];
2619
int Session::binlog_write_row(Table* table, bool is_trans,
2620
unsigned char const *record)
2622
assert(mysql_bin_log.is_open());
2625
Pack records into format for transfer. We are allocating more
2626
memory than needed, but that doesn't matter.
2628
Row_data_memory memory(table, table->max_row_length(record));
2629
if (!memory.has_memory())
2630
return HA_ERR_OUT_OF_MEM;
2632
unsigned char *row_data= memory.slot(0);
2634
size_t const len= pack_row(table, table->write_set, row_data, record);
2636
Rows_log_event* const ev=
2637
binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
2638
static_cast<Write_rows_log_event*>(0));
2640
if (unlikely(ev == 0))
2641
return HA_ERR_OUT_OF_MEM;
2643
return ev->add_row_data(row_data, len);
2646
int Session::binlog_update_row(Table* table, bool is_trans,
2647
const unsigned char *before_record,
2648
const unsigned char *after_record)
2650
assert(mysql_bin_log.is_open());
2652
size_t const before_maxlen = table->max_row_length(before_record);
2653
size_t const after_maxlen = table->max_row_length(after_record);
2655
Row_data_memory row_data(table, before_maxlen, after_maxlen);
2656
if (!row_data.has_memory())
2657
return HA_ERR_OUT_OF_MEM;
2659
unsigned char *before_row= row_data.slot(0);
2660
unsigned char *after_row= row_data.slot(1);
2662
size_t const before_size= pack_row(table, table->read_set, before_row,
2664
size_t const after_size= pack_row(table, table->write_set, after_row,
2667
Rows_log_event* const ev=
2668
binlog_prepare_pending_rows_event(table, server_id,
2669
before_size + after_size, is_trans,
2670
static_cast<Update_rows_log_event*>(0));
2672
if (unlikely(ev == 0))
2673
return HA_ERR_OUT_OF_MEM;
2676
ev->add_row_data(before_row, before_size) ||
2677
ev->add_row_data(after_row, after_size);
2680
int Session::binlog_delete_row(Table* table, bool is_trans,
2681
unsigned char const *record)
2683
assert(mysql_bin_log.is_open());
2686
Pack records into format for transfer. We are allocating more
2687
memory than needed, but that doesn't matter.
2689
Row_data_memory memory(table, table->max_row_length(record));
2690
if (unlikely(!memory.has_memory()))
2691
return HA_ERR_OUT_OF_MEM;
2693
unsigned char *row_data= memory.slot(0);
2695
size_t const len= pack_row(table, table->read_set, row_data, record);
2697
Rows_log_event* const ev=
2698
binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
2699
static_cast<Delete_rows_log_event*>(0));
2701
if (unlikely(ev == 0))
2702
return HA_ERR_OUT_OF_MEM;
2704
return ev->add_row_data(row_data, len);
2708
int Session::binlog_flush_pending_rows_event(bool stmt_end)
2711
We shall flush the pending event even if we are not in row-based
2712
mode: it might be the case that we left row-based mode before
2713
flushing anything (e.g., if we have explicitly locked tables).
2715
if (!mysql_bin_log.is_open())
2719
Mark the event as the last event of a statement if the stmt_end
2723
if (Rows_log_event *pending= binlog_get_pending_rows_event())
2727
pending->set_flags(Rows_log_event::STMT_END_F);
2728
pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
2729
binlog_table_maps= 0;
2732
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
2740
Member function that will log query, either row-based or
2741
statement-based depending on the value of the 'current_stmt_binlog_row_based'
2742
the value of the 'qtype' flag.
2744
This function should be called after the all calls to ha_*_row()
2745
functions have been issued, but before tables are unlocked and
2749
There shall be no writes to any system table after calling
2750
binlog_query(), so these writes has to be moved to before the call
2751
of binlog_query() for correct functioning.
2753
This is necessesary not only for RBR, but the master might crash
2754
after binlogging the query but before changing the system tables.
2755
This means that the slave and the master are not in the same state
2756
(after the master has restarted), so therefore we have to
2757
eliminate this problem.
2760
Error code, or 0 if no error.
2762
int Session::binlog_query(Session::enum_binlog_query_type qtype, char const *query_arg,
2763
ulong query_len, bool is_trans, bool suppress_use,
2764
Session::killed_state killed_status_arg)
2766
assert(query_arg && mysql_bin_log.is_open());
2768
if (int error= binlog_flush_pending_rows_event(true))
2772
If we are in statement mode and trying to log an unsafe statement,
2773
we should print a warning.
2775
if (lex->is_stmt_unsafe() &&
2776
variables.binlog_format == BINLOG_FORMAT_STMT)
2778
assert(this->query != NULL);
2779
push_warning(this, DRIZZLE_ERROR::WARN_LEVEL_WARN,
2780
ER_BINLOG_UNSAFE_STATEMENT,
2781
ER(ER_BINLOG_UNSAFE_STATEMENT));
2782
if (!(binlog_flags & BINLOG_FLAG_UNSAFE_STMT_PRINTED))
2784
char warn_buf[DRIZZLE_ERRMSG_SIZE];
2785
snprintf(warn_buf, DRIZZLE_ERRMSG_SIZE, "%s Statement: %s",
2786
ER(ER_BINLOG_UNSAFE_STATEMENT), this->query);
2787
sql_print_warning("%s",warn_buf);
2788
binlog_flags|= BINLOG_FLAG_UNSAFE_STMT_PRINTED;
2793
case Session::ROW_QUERY_TYPE:
2795
case Session::DRIZZLE_QUERY_TYPE:
2797
Using this query type is a conveniece hack, since we have been
2798
moving back and forth between using RBR for replication of
2799
system tables and not using it.
2801
Make sure to change in check_table_binlog_row_based() according
2802
to how you treat this.
2804
case Session::STMT_QUERY_TYPE:
2806
The DRIZZLE_LOG::write() function will set the STMT_END_F flag and
2807
flush the pending rows event if necessary.
2810
Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use,
2812
qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
2814
Binlog table maps will be irrelevant after a Query_log_event
2815
(they are just removed on the slave side) so after the query
2816
log event is written to the binary log, we pretend that no
2817
table maps were written.
2819
int error= mysql_bin_log.write(&qinfo);
2820
binlog_table_maps= 0;
2825
case Session::QUERY_TYPE_COUNT:
2827
assert(0 <= qtype && qtype < QUERY_TYPE_COUNT);
2832
bool Discrete_intervals_list::append(uint64_t start, uint64_t val,
2835
/* first, see if this can be merged with previous */
2836
if ((head == NULL) || tail->merge_if_contiguous(start, val, incr))
2838
/* it cannot, so need to add a new interval */
2839
Discrete_interval *new_interval= new Discrete_interval(start, val, incr);
2840
return(append(new_interval));
2845
bool Discrete_intervals_list::append(Discrete_interval *new_interval)
2847
if (unlikely(new_interval == NULL))
2850
head= current= new_interval;
2852
tail->next= new_interval;
2861
@param session Thread handle
2862
@param errcode Error code to print to console
2863
@param lock 1 if we have have to lock LOCK_thread_count
2866
For the connection that is doing shutdown, this is called twice
2868
void close_connection(Session *session, uint32_t errcode, bool lock)
2872
(void) pthread_mutex_lock(&LOCK_thread_count);
2873
session->killed= Session::KILL_CONNECTION;
2874
if ((vio= session->net.vio) != 0)
2877
net_send_error(session, errcode, ER(errcode)); /* purecov: inspected */
2878
net_close(&(session->net)); /* vio is freed in delete session */
2881
(void) pthread_mutex_unlock(&LOCK_thread_count);