96
41
const char * const Session::DEFAULT_WHERE= "field list";
44
/*****************************************************************************
45
** Instansiate templates
46
*****************************************************************************/
48
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
50
template class List<Key>;
51
template class List_iterator<Key>;
52
template class List<Key_part_spec>;
53
template class List_iterator<Key_part_spec>;
54
template class List<Alter_drop>;
55
template class List_iterator<Alter_drop>;
56
template class List<Alter_column>;
57
template class List_iterator<Alter_column>;
60
/****************************************************************************
62
****************************************************************************/
64
extern "C" unsigned char *get_var_key(user_var_entry *entry, size_t *length,
65
bool not_used __attribute__((unused)))
67
*length= entry->name.length;
68
return (unsigned char*) entry->name.str;
71
extern "C" void free_user_var(user_var_entry *entry)
73
char *pos= (char*) entry+ALIGN_SIZE(sizeof(*entry));
74
if (entry->value && entry->value != pos)
98
79
bool Key_part_spec::operator==(const Key_part_spec& other) const
100
81
return length == other.length &&
101
82
field_name.length == other.field_name.length &&
102
!my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
105
Open_tables_state::Open_tables_state(uint64_t version_arg) :
108
open_tables= temporary_tables= derived_tables= NULL;
109
extra_lock= lock= NULL;
83
!strcmp(field_name.str, other.field_name.str);
87
Construct an (almost) deep copy of this key. Only those
88
elements that are known to never change are not copied.
89
If out of memory, a partial copy is returned and an error is set
93
Key::Key(const Key &rhs, MEM_ROOT *mem_root)
95
key_create_info(rhs.key_create_info),
96
columns(rhs.columns, mem_root),
98
generated(rhs.generated)
100
list_copy_and_replace_each_value(columns, mem_root);
104
Construct an (almost) deep copy of this foreign key. Only those
105
elements that are known to never change are not copied.
106
If out of memory, a partial copy is returned and an error is set
110
Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root)
112
ref_table(rhs.ref_table),
113
ref_columns(rhs.ref_columns),
114
delete_opt(rhs.delete_opt),
115
update_opt(rhs.update_opt),
116
match_opt(rhs.match_opt)
118
list_copy_and_replace_each_value(ref_columns, mem_root);
122
Test if a foreign key (= generated key) is a prefix of the given key
123
(ignoring key name, key type and order of columns)
126
This is only used to test if an index for a FOREIGN KEY exists
129
We only compare field names
132
0 Generated key is a prefix of other key
136
bool foreign_key_prefix(Key *a, Key *b)
138
/* Ensure that 'a' is the generated key */
141
if (b->generated && a->columns.elements > b->columns.elements)
142
std::swap(a, b); // Put shorter key in 'a'
147
return true; // No foreign key
148
std::swap(a, b); // Put generated key in 'a'
151
/* Test if 'a' is a prefix of 'b' */
152
if (a->columns.elements > b->columns.elements)
153
return true; // Can't be prefix
155
List_iterator<Key_part_spec> col_it1(a->columns);
156
List_iterator<Key_part_spec> col_it2(b->columns);
157
const Key_part_spec *col1, *col2;
159
#ifdef ENABLE_WHEN_INNODB_CAN_HANDLE_SWAPED_FOREIGN_KEY_COLUMNS
160
while ((col1= col_it1++))
164
while ((col2= col_it2++))
173
return true; // Error
175
return false; // Is prefix
177
while ((col1= col_it1++))
180
if (!(*col1 == *col2))
183
return false; // Is prefix
189
Check if the foreign key options are compatible with columns
190
on which the FK is created.
196
bool Foreign_key::validate(List<Create_field> &table_fields)
198
Create_field *sql_field;
199
Key_part_spec *column;
200
List_iterator<Key_part_spec> cols(columns);
201
List_iterator<Create_field> it(table_fields);
202
while ((column= cols++))
205
while ((sql_field= it++) &&
206
my_strcasecmp(system_charset_info,
207
column->field_name.str,
208
sql_field->field_name)) {}
211
my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name.str);
214
if (type == Key::FOREIGN_KEY && sql_field->vcol_info)
216
if (delete_opt == FK_OPTION_SET_NULL)
218
my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0),
219
"ON DELETE SET NULL");
222
if (update_opt == FK_OPTION_SET_NULL)
224
my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0),
225
"ON UPDATE SET NULL");
228
if (update_opt == FK_OPTION_CASCADE)
230
my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0),
231
"ON UPDATE CASCADE");
240
/****************************************************************************
241
** Thread specific functions
242
****************************************************************************/
244
Open_tables_state::Open_tables_state(ulong version_arg)
245
:version(version_arg), state_flags(0U)
247
reset_open_tables_state();
113
251
The following functions form part of the C plugin API
115
int tmpfile(const char *prefix)
254
extern "C" int mysql_tmpfile(const char *prefix)
117
256
char filename[FN_REFLEN];
118
int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
257
File fd = create_temp_file(filename, mysql_tmpdir, prefix,
258
O_CREAT | O_EXCL | O_RDWR,
120
261
unlink(filename);
126
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
128
return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
131
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
134
return &ha_data[monitored->getId()].resource_context[index];
269
int session_in_lock_tables(const Session *session)
271
return test(session->in_lock_tables);
276
int session_tablespace_op(const Session *session)
278
return test(session->tablespace_op);
283
Set the process info field of the Session structure.
285
This function is used by plug-ins. Internally, the
286
Session::set_proc_info() function should be used.
288
@see Session::set_proc_info
291
set_session_proc_info(Session *session, const char *info)
293
session->set_proc_info(info);
297
const char *get_session_proc_info(Session *session)
299
return session->get_proc_info();
303
void **session_ha_data(const Session *session, const struct handlerton *hton)
305
return (void **) &session->ha_data[hton->slot].ha_ptr;
137
309
int64_t session_test_options(const Session *session, int64_t test_options)
139
311
return session->options & test_options;
142
Session::Session(plugin::Client *client_arg, catalog::Instance::shared_ptr catalog_arg) :
143
Open_tables_state(refresh_version),
144
mem_root(&main_mem_root),
147
query(new std::string),
148
_schema(new std::string("")),
152
lock_id(&main_lock_id),
154
security_ctx(identifier::User::make_shared()),
155
_where(Session::DEFAULT_WHERE),
156
dbug_sentry(Session_SENTRY_MAGIC),
158
command(COM_CONNECT),
160
_epoch(boost::gregorian::date(1970,1,1)),
161
_connect_time(boost::posix_time::microsec_clock::universal_time()),
163
ha_data(plugin::num_trx_monitored_objects),
166
concurrent_execute_allowed(true),
167
arg_of_last_insert_id_function(false),
168
first_successful_insert_id_in_prev_stmt(0),
169
first_successful_insert_id_in_cur_stmt(0),
171
options(session_startup_options),
174
examined_row_count(0),
178
statement_id_counter(0),
182
_global_read_lock(NONE),
183
count_cuted_fields(CHECK_FIELD_ERROR_FOR_NULL),
185
some_tables_deleted(false),
188
is_fatal_error(false),
189
transaction_rollback_request(false),
190
is_fatal_sub_stmt_error(0),
191
tablespace_op(false),
192
derived_tables_processing(false),
195
transaction_message(NULL),
196
statement_message(NULL),
197
session_event_observers(NULL),
198
_catalog(catalog_arg),
201
client->setSession(this);
315
int session_sql_command(const Session *session)
317
return (int) session->lex->sql_command;
321
int session_tx_isolation(const Session *session)
323
return (int) session->variables.tx_isolation;
327
void session_inc_row_count(Session *session)
329
session->row_count++;
333
Clear this diagnostics area.
335
Normally called at the end of a statement.
339
Diagnostics_area::reset_diagnostics_area()
341
can_overwrite_status= false;
342
/** Don't take chances in production */
348
m_total_warn_count= 0;
350
/** Tiny reset in debug mode to see garbage right away */
356
Set OK status -- ends commands that do not return a
357
result set, e.g. INSERT/UPDATE/DELETE.
361
Diagnostics_area::set_ok_status(Session *session, ha_rows affected_rows_arg,
362
uint64_t last_insert_id_arg,
363
const char *message_arg)
367
In production, refuse to overwrite an error or a custom response
370
if (is_error() || is_disabled())
372
/** Only allowed to report success if has not yet reported an error */
374
m_server_status= session->server_status;
375
m_total_warn_count= session->total_warn_count;
376
m_affected_rows= affected_rows_arg;
377
m_last_insert_id= last_insert_id_arg;
379
strmake(m_message, message_arg, sizeof(m_message) - 1);
391
Diagnostics_area::set_eof_status(Session *session)
393
/** Only allowed to report eof if has not yet reported an error */
397
In production, refuse to overwrite an error or a custom response
400
if (is_error() || is_disabled())
403
m_server_status= session->server_status;
405
If inside a stored procedure, do not return the total
406
number of warnings, since they are not available to the client
409
m_total_warn_count= session->total_warn_count;
419
Diagnostics_area::set_error_status(Session *session __attribute__((unused)),
420
uint32_t sql_errno_arg,
421
const char *message_arg)
424
Only allowed to report error if has not yet reported a success
425
The only exception is when we flush the message to the client,
426
an error can happen during the flush.
428
assert(! is_set() || can_overwrite_status);
430
In production, refuse to overwrite a custom response with an
436
m_sql_errno= sql_errno_arg;
437
strmake(m_message, message_arg, sizeof(m_message) - 1);
444
Mark the diagnostics area as 'DISABLED'.
446
This is used in rare cases when the COM_ command at hand sends a response
447
in a custom format. One example is the query cache, another is
452
Diagnostics_area::disable_status()
455
m_status= DA_DISABLED;
460
:Statement(&main_lex, &main_mem_root,
461
/* statement id */ 0),
462
Open_tables_state(refresh_version), rli_fake(0),
463
lock_id(&main_lock_id),
465
binlog_table_maps(0), binlog_flags(0UL),
466
arg_of_last_insert_id_function(false),
467
first_successful_insert_id_in_prev_stmt(0),
468
first_successful_insert_id_in_prev_stmt_for_binlog(0),
469
first_successful_insert_id_in_cur_stmt(0),
470
stmt_depends_on_first_successful_insert_id_in_prev_stmt(false),
473
transaction_rollback_request(0),
474
is_fatal_sub_stmt_error(0),
478
derived_tables_processing(false),
204
484
Pass nominal parameters to init_alloc_root only to ensure that
205
485
the destructor works OK in case of an error. The main_mem_root
206
486
will be re-initialized in init_for_queries().
208
memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
488
init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
490
catalog= (char*)"std"; // the only catalog we have for now
491
main_security_ctx.init();
492
security_ctx= &main_security_ctx;
493
some_tables_deleted=no_errors=password= 0;
495
count_cuted_fields= CHECK_FIELD_IGNORE;
498
is_slave_error= thread_specific_used= false;
499
hash_clear(&handler_tables_hash);
209
502
cuted_fields= sent_row_count= row_count= 0L;
505
statement_id_counter= 0UL;
210
506
// Must be reset to handle error with Session's created for init of mysqld
211
507
lex->current_select= 0;
508
start_time=(time_t) 0;
510
utime_after_lock= 0L;
212
513
memset(&variables, 0, sizeof(variables));
213
scoreboard_index= -1;
214
cleanup_done= abort_on_warning= no_warnings_for_error= false;
216
/* query_cache init */
519
db_charset= global_system_variables.collation_database;
520
memset(ha_data, 0, sizeof(ha_data));
522
binlog_evt_union.do_union= false;
523
dbug_sentry=Session_SENTRY_MAGIC;
525
client_capabilities= 0; // minimalistic client
526
system_thread= NON_SYSTEM_THREAD;
527
cleanup_done= abort_on_warning= no_warnings_for_error= 0;
528
peer_port= 0; // For SHOW PROCESSLIST
529
transaction.m_pending_rows_event= 0;
531
pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
220
533
/* Variables with default values */
221
534
proc_info="login";
223
plugin_sessionvar_init(this);
225
variables= global_system_variables above has reset
226
variables.pseudo_thread_id to 0. We need to correct it here to
227
avoid temporary tables replication failure.
229
variables.pseudo_thread_id= thread_id;
230
server_status= SERVER_STATUS_AUTOCOMMIT;
232
if (variables.max_join_size == HA_POS_ERROR)
233
options |= OPTION_BIG_SELECTS;
235
options &= ~OPTION_BIG_SELECTS;
237
open_options=ha_open_options;
238
update_lock_default= TL_WRITE;
239
session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
241
memset(warn_count, 0, sizeof(warn_count));
242
memset(&status_var, 0, sizeof(status_var));
535
where= Session::DEFAULT_WHERE;
536
server_id = ::server_id;
244
542
/* Initialize sub structures */
245
memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
543
init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
544
user_connect=(USER_CONN *)0;
545
hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
546
(hash_get_key) get_var_key,
547
(hash_free_key) free_user_var, 0);
549
/* For user vars replication*/
551
my_init_dynamic_array(&user_var_events,
552
sizeof(BINLOG_USER_VAR_EVENT *), 16, 16);
554
memset(&user_var_events, 0, sizeof(user_var_events));
557
protocol= &protocol_text; // Default protocol
558
protocol_text.init(this);
560
const Query_id& query_id= Query_id::get_query_id();
561
tablespace_op= false;
563
randominit(&rand, tmp + (ulong) &rand, tmp + query_id.value());
247
564
substitute_null_with_insert_id = false;
248
lock_info.init(); /* safety: will be reset after start */
565
thr_lock_info_init(&lock_info); /* safety: will be reset after start */
249
566
thr_lock_owner_init(&main_lock_id, &lock_info);
251
568
m_internal_handler= NULL;
253
plugin::EventObserver::registerSessionEvents(*this);
256
void Session::free_items()
259
/* This works because items are allocated with memory::sql_alloc() */
260
for (; free_list; free_list= next)
262
next= free_list->next;
263
free_list->delete_self();
267
572
void Session::push_internal_handler(Internal_error_handler *handler)
465
899
assert(thread_stack);
467
currentSession().release();
468
currentSession().reset(this);
470
currentMemRoot().release();
471
currentMemRoot().reset(&mem_root);
901
if (pthread_setspecific(THR_Session, this) ||
902
pthread_setspecific(THR_MALLOC, &mem_root))
473
904
mysys_var=my_thread_var;
476
906
Let mysqld define the thread id (not mysys)
477
907
This allows us to move Session to different threads if needed.
479
909
mysys_var->id= thread_id;
910
real_id= pthread_self(); // For debugging
482
913
We have to call thr_lock_info_init() again here as Session may have been
483
914
created in another thread
491
Init Session for query processing.
492
This has to be called once before we call mysql_parse.
493
See also comments in session.h.
496
void Session::prepareForQueries()
498
if (variables.max_join_size == HA_POS_ERROR)
499
options |= OPTION_BIG_SELECTS;
501
version= refresh_version;
506
mem_root->reset_root_defaults(variables.query_alloc_block_size,
507
variables.query_prealloc_size);
508
transaction.xid_state.xid.null();
509
transaction.xid_state.in_session=1;
514
bool Session::initGlobals()
518
disconnect(ER_OUT_OF_RESOURCES);
519
status_var.aborted_connects++;
527
if (initGlobals() || authenticate())
535
while (not client->haveError() && getKilled() != KILL_CONNECTION)
537
if (not executeStatement())
544
bool Session::schedule(Session::shared_ptr &arg)
546
arg->scheduler= plugin::Scheduler::getScheduler();
547
assert(arg->scheduler);
551
long current_connections= connection_count;
553
if (current_connections > 0 and static_cast<uint64_t>(current_connections) > current_global_counters.max_used_connections)
555
current_global_counters.max_used_connections= static_cast<uint64_t>(connection_count);
558
current_global_counters.connections++;
559
arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
561
session::Cache::singleton().insert(arg);
563
if (unlikely(plugin::EventObserver::connectSession(*arg)))
565
// We should do something about an error...
568
if (plugin::Scheduler::getScheduler()->addSession(arg))
570
DRIZZLE_CONNECTION_START(arg->getSessionId());
571
char error_message_buff[DRIZZLE_ERRMSG_SIZE];
573
arg->setKilled(Session::KILL_CONNECTION);
575
arg->status_var.aborted_connects++;
577
/* Can't use my_error() since store_globals has not been called. */
578
/* TODO replace will better error message */
579
snprintf(error_message_buff, sizeof(error_message_buff),
580
ER(ER_CANT_CREATE_THREAD), 1);
581
arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
591
Is this session viewable by the current user?
593
bool Session::isViewable(identifier::User::const_reference user_arg) const
595
return plugin::Authorization::isAuthorized(user_arg, *this, false);
599
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
601
const char* old_msg = get_proc_info();
602
safe_mutex_assert_owner(mutex);
603
mysys_var->current_mutex = &mutex;
604
mysys_var->current_cond = &cond;
605
this->set_proc_info(msg);
609
void Session::exit_cond(const char* old_msg)
612
Putting the mutex unlock in exit_cond() ensures that
613
mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
614
locked (if that would not be the case, you'll get a deadlock if someone
615
does a Session::awake() on you).
617
mysys_var->current_mutex->unlock();
618
boost_unique_lock_t scopedLock(mysys_var->mutex);
619
mysys_var->current_mutex = 0;
620
mysys_var->current_cond = 0;
621
this->set_proc_info(old_msg);
624
bool Session::authenticate()
626
if (client->authenticate())
629
status_var.aborted_connects++;
634
bool Session::checkUser(const std::string &passwd_str,
635
const std::string &in_db)
637
bool is_authenticated=
638
plugin::Authentication::isAuthenticated(*user(), passwd_str);
640
if (is_authenticated != true)
642
status_var.access_denied++;
643
/* isAuthenticated has pushed the error message */
647
/* Change database if necessary */
648
if (not in_db.empty())
650
identifier::Schema identifier(in_db);
651
if (schema::change(*this, identifier))
653
/* change_db() has pushed the error message. */
658
password= not passwd_str.empty();
660
/* Ready to handle queries */
664
bool Session::executeStatement()
667
uint32_t packet_length;
669
enum enum_server_command l_command;
672
indicator of uninitialized lex => normal flow of errors handling
675
lex->current_select= 0;
677
main_da.reset_diagnostics_area();
679
if (client->readCommand(&l_packet, &packet_length) == false)
684
if (getKilled() == KILL_CONNECTION)
687
if (packet_length == 0)
690
l_command= static_cast<enum_server_command>(l_packet[0]);
692
if (command >= COM_END)
693
command= COM_END; // Wrong command
695
assert(packet_length);
696
return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
699
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
701
/* Remove garbage at start and end of query */
702
while (in_packet_length > 0 && my_isspace(charset(), in_packet[0]))
707
const char *pos= in_packet + in_packet_length; /* Point at end null */
708
while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
714
std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
715
// We can not be entirely sure _schema has a value
718
plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
720
query.reset(new_query);
721
_state.reset(new session::State(in_packet, in_packet_length));
726
bool Session::endTransaction(enum enum_mysql_completiontype completion)
730
TransactionServices &transaction_services= TransactionServices::singleton();
732
if (transaction.xid_state.xa_state != XA_NOTR)
734
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
741
* We don't use endActiveTransaction() here to ensure that this works
742
* even if there is a problem with the OPTION_AUTO_COMMIT flag
743
* (Which of course should never happen...)
745
server_status&= ~SERVER_STATUS_IN_TRANS;
746
if (transaction_services.commitTransaction(*this, true))
748
options&= ~(OPTION_BEGIN);
751
do_release= 1; /* fall through */
752
case COMMIT_AND_CHAIN:
753
result= endActiveTransaction();
754
if (result == true && completion == COMMIT_AND_CHAIN)
755
result= startTransaction();
757
case ROLLBACK_RELEASE:
758
do_release= 1; /* fall through */
760
case ROLLBACK_AND_CHAIN:
762
server_status&= ~SERVER_STATUS_IN_TRANS;
763
if (transaction_services.rollbackTransaction(*this, true))
765
options&= ~(OPTION_BEGIN);
766
if (result == true && (completion == ROLLBACK_AND_CHAIN))
767
result= startTransaction();
771
my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
777
my_error(static_cast<drizzled::error_t>(killed_errno()), MYF(0));
779
else if ((result == true) && do_release)
781
setKilled(Session::KILL_CONNECTION);
787
bool Session::endActiveTransaction()
790
TransactionServices &transaction_services= TransactionServices::singleton();
792
if (transaction.xid_state.xa_state != XA_NOTR)
794
my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
797
if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
799
server_status&= ~SERVER_STATUS_IN_TRANS;
800
if (transaction_services.commitTransaction(*this, true))
803
options&= ~(OPTION_BEGIN);
807
bool Session::startTransaction(start_transaction_option_t opt)
811
assert(! inTransaction());
813
options|= OPTION_BEGIN;
814
server_status|= SERVER_STATUS_IN_TRANS;
816
if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
916
thr_lock_info_init(&lock_info);
925
Session::cleanup_after_query()
928
This function is used to reset thread data to its default state.
931
This function is not suitable for setting thread data to some
932
non-default values, as there is only one replication thread, so
933
different master threads may overwrite data of each other on
824
937
void Session::cleanup_after_query()
827
Reset rand_used so that detection of calls to rand() will save random
940
Reset rand_used so that detection of calls to rand() will save random
828
941
seeds if needed by the slave.
831
944
/* Forget those values, for next binlogger: */
945
stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
832
946
auto_inc_intervals_in_cur_stmt_for_binlog.empty();
834
949
if (first_successful_insert_id_in_cur_stmt > 0)
836
951
/* set what LAST_INSERT_ID() will return */
837
first_successful_insert_id_in_prev_stmt= first_successful_insert_id_in_cur_stmt;
952
first_successful_insert_id_in_prev_stmt=
953
first_successful_insert_id_in_cur_stmt;
838
954
first_successful_insert_id_in_cur_stmt= 0;
839
955
substitute_null_with_insert_id= true;
842
arg_of_last_insert_id_function= false;
957
arg_of_last_insert_id_function= 0;
844
958
/* Free Items that were created during this execution */
848
_where= Session::DEFAULT_WHERE;
850
/* Reset the temporary shares we built */
851
for_each(temporary_shares.begin(),
852
temporary_shares.end(),
854
temporary_shares.clear();
961
where= Session::DEFAULT_WHERE;
858
966
Create a LEX_STRING in this connection.
920
1189
item->maybe_null= 1;
921
1190
field_list.push_back(new Item_empty_string("Extra", 255, cs));
922
return (result->send_fields(field_list));
925
void select_result::send_error(drizzled::error_t errcode, const char *err)
1191
return (result->send_fields(field_list,
1192
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF));
1196
struct Item_change_record: public ilink
1200
/* Placement new was hidden by `new' in ilink (TODO: check): */
1201
static void *operator new(size_t size __attribute__((unused)),
1204
static void operator delete(void *ptr __attribute__((unused)),
1205
size_t size __attribute__((unused)))
1207
static void operator delete(void *ptr __attribute__((unused)),
1208
void *mem __attribute__((unused)))
1209
{ /* never called */ }
1214
Register an item tree tree transformation, performed by the query
1215
optimizer. We need a pointer to runtime_memroot because it may be !=
1216
session->mem_root (this may no longer be a true statement)
1219
void Session::nocheck_register_item_tree_change(Item **place, Item *old_value,
1220
MEM_ROOT *runtime_memroot)
1222
Item_change_record *change;
1224
Now we use one node per change, which adds some memory overhead,
1225
but still is rather fast as we use alloc_root for allocations.
1226
A list of item tree changes of an average query should be short.
1228
void *change_mem= alloc_root(runtime_memroot, sizeof(*change));
1229
if (change_mem == 0)
1232
OOM, session->fatal_error() is called by the error handler of the
1233
memroot. Just return.
1237
change= new (change_mem) Item_change_record;
1238
change->place= place;
1239
change->old_value= old_value;
1240
change_list.append(change);
1244
void Session::rollback_item_tree_changes()
1246
I_List_iterator<Item_change_record> it(change_list);
1247
Item_change_record *change;
1249
while ((change= it++))
1250
*change->place= change->old_value;
1251
/* We can forget about changes memory: it's allocated in runtime memroot */
1252
change_list.empty();
1257
/*****************************************************************************
1258
** Functions to provide a interface to select results
1259
*****************************************************************************/
1261
select_result::select_result()
1263
session=current_session;
1266
void select_result::send_error(uint32_t errcode,const char *err)
927
1268
my_message(errcode, err, MYF(0));
1272
void select_result::cleanup()
1277
bool select_result::check_simple_select() const
1279
my_error(ER_SP_BAD_CURSOR_QUERY, MYF(0));
1284
static String default_line_term("\n",default_charset_info);
1285
static String default_escaped("\\",default_charset_info);
1286
static String default_field_term("\t",default_charset_info);
1288
sql_exchange::sql_exchange(char *name, bool flag,
1289
enum enum_filetype filetype_arg)
1290
:file_name(name), opt_enclosed(0), dumpfile(flag), skip_lines(0)
1292
filetype= filetype_arg;
1293
field_term= &default_field_term;
1294
enclosed= line_start= &my_empty_string;
1295
line_term= &default_line_term;
1296
escaped= &default_escaped;
1300
bool select_send::send_fields(List<Item> &list, uint32_t flags)
1303
if (!(res= session->protocol->send_fields(&list, flags)))
1304
is_result_set_started= 1;
1308
void select_send::abort()
1315
Cleanup an instance of this class for re-use
1316
at next execution of a prepared statement/
1317
stored procedure statement.
1320
void select_send::cleanup()
1322
is_result_set_started= false;
1325
/* Send data to client. Returns 0 if ok */
1327
bool select_send::send_data(List<Item> &items)
1329
if (unit->offset_limit_cnt)
1330
{ // using limit offset,count
1331
unit->offset_limit_cnt--;
1336
We may be passing the control from mysqld to the client: release the
1337
InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
1340
ha_release_temporary_latches(session);
1342
List_iterator_fast<Item> li(items);
1343
Protocol *protocol= session->protocol;
1344
char buff[MAX_FIELD_WIDTH];
1345
String buffer(buff, sizeof(buff), &my_charset_bin);
1347
protocol->prepare_for_resend();
1351
if (item->send(protocol, &buffer))
1353
protocol->free(); // Free used buffer
1354
my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
1358
session->sent_row_count++;
1359
if (session->is_error())
1361
protocol->remove_last_row();
1364
if (session->vio_ok())
1365
return(protocol->write());
1369
bool select_send::send_eof()
1372
We may be passing the control from mysqld to the client: release the
1373
InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
1376
ha_release_temporary_latches(session);
1378
/* Unlock tables before sending packet to gain some speed */
1381
mysql_unlock_tables(session, session->lock);
1385
is_result_set_started= 0;
930
1390
/************************************************************************
931
1391
Handling writing to file
932
1392
************************************************************************/
934
void select_to_file::send_error(drizzled::error_t errcode,const char *err)
1394
void select_to_file::send_error(uint32_t errcode,const char *err)
936
1396
my_message(errcode, err, MYF(0));
939
(void) cache->end_io_cache();
940
(void) internal::my_close(file, MYF(0));
941
(void) internal::my_delete(path.file_string().c_str(), MYF(0)); // Delete file on error
1399
(void) end_io_cache(&cache);
1400
(void) my_close(file,MYF(0));
1401
(void) my_delete(path,MYF(0)); // Delete file on error
1617
2284
@param session Thread handle
1618
2285
@param all true <=> rollback main transaction.
1620
void Session::markTransactionForRollback(bool all)
1622
is_fatal_sub_stmt_error= true;
1623
transaction_rollback_request= all;
1626
void Session::disconnect(enum error_t errcode)
1628
/* Allow any plugins to cleanup their session variables */
1629
plugin_sessionvar_cleanup(this);
1631
/* If necessary, log any aborted or unauthorized connections */
1632
if (getKilled() || client->wasAborted())
1634
status_var.aborted_threads++;
1637
if (client->wasAborted())
1639
if (not getKilled() && variables.log_warnings > 1)
1641
errmsg_printf(error::WARN, ER(ER_NEW_ABORTING_CONNECTION)
1643
, (_schema->empty() ? "unconnected" : _schema->c_str())
1644
, security_ctx->username().empty() == false ? security_ctx->username().c_str() : "unauthenticated"
1645
, security_ctx->address().c_str()
1646
, (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1650
setKilled(Session::KILL_CONNECTION);
1652
if (client->isConnected())
1654
if (errcode != EE_OK)
1656
/*my_error(errcode, ER(errcode));*/
1657
client->sendError(errcode, ER(errcode));
1663
void Session::reset_for_next_command()
1668
Those two lines below are theoretically unneeded as
1669
Session::cleanup_after_query() should take care of this already.
1671
auto_inc_intervals_in_cur_stmt_for_binlog.empty();
1673
is_fatal_error= false;
1674
server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1675
SERVER_QUERY_NO_INDEX_USED |
1676
SERVER_QUERY_NO_GOOD_INDEX_USED);
1679
main_da.reset_diagnostics_area();
1680
total_warn_count=0; // Warnings for this query
1681
sent_row_count= examined_row_count= 0;
1685
Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1688
void Open_tables_state::close_temporary_tables()
1693
if (not temporary_tables)
1696
for (table= temporary_tables; table; table= tmp_next)
1698
tmp_next= table->getNext();
1701
temporary_tables= NULL;
1705
unlink from session->temporary tables and close temporary table
1708
void Open_tables_state::close_temporary_table(Table *table)
1710
if (table->getPrev())
1712
table->getPrev()->setNext(table->getNext());
1713
if (table->getPrev()->getNext())
1715
table->getNext()->setPrev(table->getPrev());
1720
/* removing the item from the list */
1721
assert(table == temporary_tables);
1723
slave must reset its temporary list pointer to zero to exclude
1724
passing non-zero value to end_slave via rli->save_temporary_tables
1725
when no temp tables opened, see an invariant below.
1727
temporary_tables= table->getNext();
1728
if (temporary_tables)
1730
table->getNext()->setPrev(NULL);
1737
Close and drop a temporary table
1740
This dosn't unlink table from session->temporary
1741
If this is needed, use close_temporary_table()
1744
/**
  Tear down a temporary table: free its IO cache, call delete_table(),
  remove it through its storage engine, then delete the share and the
  Table object.

  @param table  temporary table to destroy; the pointer is deleted here
*/
void Open_tables_state::nukeTable(Table *table)
1746
// The engine is read from the share up front, before any teardown starts.
plugin::StorageEngine *table_type= table->getShare()->db_type();
1748
table->free_io_cache();
1749
table->delete_table();
1751
// The identifier is built from the share's schema name, table name and path.
identifier::Table identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1752
rm_temporary_table(table_type, identifier);
1754
// The share is reached through the table, so it is deleted first.
boost::checked_delete(table->getMutableShare());
1756
boost::checked_delete(table);
1759
/** Clear most status variables. */
1760
// Timestamp of the last status flush; defined in another translation unit.
extern time_t flush_status_time;
1762
/**
  Zero this session's status variables, stamp flush_status_time with the
  current time and reset the global connection counters.
*/
void Session::refresh_status()
1764
/* Reset thread's status variables */
1765
memset(&status_var, 0, sizeof(status_var));
1767
flush_status_time= time((time_t*) 0);
1768
current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1769
current_global_counters.connections= 0;
1772
/**
  LEX_STRING convenience overload: copies name.str/name.length into a
  std::string and forwards to the std::string overload.

  @param name                  variable name (pointer plus explicit length)
  @param create_if_not_exists  forwarded unchanged
*/
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1774
return getVariable(std::string(name.str, name.length), create_if_not_exists);
1777
/**
  Look up a user variable by name in user_vars, optionally creating it.

  An existing entry is returned directly. Otherwise, when creation is
  requested, a new user_var_entry is allocated with nothrow new and
  inserted into user_vars.

  @param name                  variable name used as the map key
  @param create_if_not_exists  whether a missing variable may be created
*/
user_var_entry *Session::getVariable(const std::string &name, bool create_if_not_exists)
1782
UserVars::iterator iter= user_vars.find(name);
1783
if (iter != user_vars.end())
1784
return (*iter).second;
1786
if (not create_if_not_exists)
1789
user_var_entry *entry= NULL;
1790
// nothrow new: allocation failure yields NULL rather than an exception.
entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1795
std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1797
// The map did not accept the pair, so the fresh entry is deleted here.
if (not returnable.second)
1799
boost::checked_delete(entry);
1805
/**
  Set (creating if necessary) the user variable @p name to the string
  @p value.

  @param name   user variable name
  @param value  new value; stored as STRING_RESULT with implicit derivation
*/
void Session::setVariable(const std::string &name, const std::string &value)
1807
// Pass the std::string itself: going through c_str() built a needless
// temporary and would cut the name at an embedded NUL.
user_var_entry *updateable_var= getVariable(name, true);
1810
updateable_var->update_hash(false,
1811
(void*)value.c_str(),
1812
static_cast<uint32_t>(value.length()), STRING_RESULT,
1814
DERIVATION_IMPLICIT, false);
1818
/**
  Visit every temporary table whose query_id equals the current query id
  and reset its cursor with ha_reset().
*/
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1820
for (Table *table= temporary_tables ; table ; table= table->getNext())
1822
// Only tables stamped with the current query id are touched.
if (table->query_id == getQueryId())
1825
table->cursor->ha_reset();
1830
/**
  Starting at @p table, follow getNext() and reset the cursor of every
  table whose query_id equals the current query id.

  @param table  first table of the chain to scan; may be NULL
*/
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1832
for (; table ; table= table->getNext())
1834
// Only tables stamped with the current query id are touched.
if (table->query_id == getQueryId())
1837
table->cursor->ha_reset();
1843
Unlocks tables and frees derived tables.
1844
Put all normal tables used by thread in free list.
1846
It will only close/mark as free for reuse tables opened by this
1847
substatement, it will also check if we are closing tables after
1848
execution of complete query (i.e. we are on upper level) and will
1849
leave prelocked mode if needed.
1851
/**
  End-of-statement table cleanup.

  In order: clears derived tables, marks this statement's temporary tables
  as reusable, runs the statement-level autocommit-or-rollback through
  TransactionServices, resets transaction.stmt and finally calls
  close_open_tables().
*/
void Session::close_thread_tables()
1853
clearDerivedTables();
1856
Mark all temporary tables used by this statement as free for reuse.
1858
mark_temp_tables_as_free_for_reuse();
1860
Let us commit transaction for statement. Since in 5.0 we only have
1861
one statement transaction and don't allow several nested statement
1862
transactions this call will do nothing if we are inside of stored
1863
function or trigger (i.e. statement transaction is already active and
1864
does not belong to statement for which we do close_thread_tables()).
1865
TODO: This should be fixed in later releases.
1868
TransactionServices &transaction_services= TransactionServices::singleton();
1869
// can_overwrite_status is raised only for the duration of the call below --
// presumably so the commit/rollback may replace an already-set diagnostics
// status (TODO confirm).
main_da.can_overwrite_status= true;
1870
transaction_services.autocommitOrRollback(*this, is_error());
1871
main_da.can_overwrite_status= false;
1872
transaction.stmt.reset();
1878
For RBR we flush the pending event just before we unlock all the
1879
tables. This means that we are at the end of a topmost
1880
statement, so we ensure that the STMT_END_F flag is set on the
1881
pending event. For statements that are *inside* stored
1882
functions, the pending event will not be flushed: that will be
1883
handled either before writing a query log event (inside
1884
binlog_query()) or when preparing a pending event.
1890
Note that we need to hold table::Cache::singleton().mutex() while changing the
1891
open_tables list. Another thread may work on it.
1892
(See: table::Cache::singleton().removeTable(), wait_completed_table())
1893
Closing a MERGE child before the parent would be fatal if the
1894
other thread tries to abort the MERGE lock in between.
1897
close_open_tables();
1900
/**
  Close the statement's tables so they can be opened again.

  @param tables  in/out pointer to the head of the global table list;
                 compared against lex->first_not_own_table() and walked via
                 next_global before close_thread_tables() is called
*/
void Session::close_tables_for_reopen(TableList **tables)
1903
If table list consists only from tables from prelocking set, table list
1904
for new attempt should be empty, so we have to update list's root pointer.
1906
if (lex->first_not_own_table() == *tables)
1908
lex->chop_off_not_own_tables();
1909
for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1911
close_thread_tables();
1914
/**
  Open the tables in @p tables, lock them, then prepare and fill derived
  tables.

  When lock_tables() reports failure with need_reopen set, the tables are
  closed through close_tables_for_reopen() -- presumably so the open can be
  attempted again (TODO confirm).

  @param tables  head of the table list to open and lock
*/
bool Session::openTablesLock(TableList *tables)
1921
if (open_tables_from_list(&tables, &counter))
1924
if (not lock_tables(tables, counter, &need_reopen))
1927
if (not need_reopen)
1930
close_tables_for_reopen(&tables);
1933
// Derived tables are handled in two passes: derived_prepare, then derived_filling.
if ((handle_derived(lex, &derived_prepare) || (handle_derived(lex, &derived_filling))))
1940
@note "best_effort" is used in cases where, if a failure occurred on this
1941
operation it would not be surprising because we are only removing because there
1942
might be an issue (lame engines).
1945
/**
  Drop a temporary table through plugin::StorageEngine::dropTable().

  When the drop fails and @p best_effort is false, a warning carrying the
  table's SQL path and the current errno is logged.

  @param identifier   table to drop
  @param best_effort  when true, a failed drop is not logged
*/
bool Open_tables_state::rm_temporary_table(const identifier::Table &identifier, bool best_effort)
1947
// This object is used as a Session: the static_cast relies on the Session
// deriving from Open_tables_state.
if (not plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
1949
if (not best_effort)
1952
identifier.getSQLPath(path);
1953
errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"),
1954
path.c_str(), errno);
1963
/**
  Drop a temporary table through a specific storage engine.

  On failure a warning is logged with the table's SQL path and the error
  code that dropTable() stored in its out-parameter.

  @param base        engine that owns the table; dereferenced, must be non-NULL
  @param identifier  table to drop
*/
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const identifier::Table &identifier)
1965
drizzled::error_t error;
1968
if (not plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier, error))
1971
identifier.getSQLPath(path);
1972
errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"),
1973
path.c_str(), error);
1982
@note this will be removed, I am looking through Hudson to see if it is finding
1983
any tables that are missed during cleanup.
1985
/**
  Debug aid: print every table on temporary_tables to stderr.

  Each table is printed as "schema.name : true|false", where the flag says
  whether the share carries a table message; when it does, the message's
  schema and name are printed too.

  @param foo  label printed in the "Begin Run" header line
*/
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
1989
if (not temporary_tables)
1992
cerr << "Begin Run: " << foo << "\n";
1993
for (table= temporary_tables; table; table= table->getNext())
1995
bool have_proto= false;
1997
message::Table *proto= table->getShare()->getTableMessage();
1998
if (table->getShare()->getTableMessage())
2001
const char *answer= have_proto ? "true" : "false";
2005
cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2006
cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2010
// Label spelling fixed (was "Tabl;e Name") so both branches print the same
// prefix and the output can be searched consistently.
cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2015
/**
  Allocate a new default-constructed table::Singular, record it in
  temporary_shares and fetch it back as the most recently added element.
*/
table::Singular *Session::getInstanceTable()
2017
temporary_shares.push_back(new table::Singular()); // This will not go into the tableshare cache, so no key is used.
2019
table::Singular *tmp_share= temporary_shares.back();
2028
Create a reduced Table object with properly set up Field list from a
2029
list of field definitions.
2031
The created table doesn't have a table Cursor associated with
2032
it, has no keys, no group/distinct, no copy_funcs array.
2033
The sole purpose of this Table object is to use the power of Field
2034
class to read/write data to/from table->getInsertRecord(). Then one can store
2035
the record in any container (RB tree, hash, etc).
2036
The table is created in Session mem_root, so are the table's fields.
2037
Consequently, if you don't use BLOB fields, you don't need to free it.
2039
@param session connection handle
2040
@param field_list list of column definitions
2043
0 if out of memory, Table object in case of success
2045
/**
  Allocate a new table::Singular built from this session and
  @p field_list, record it in temporary_shares and fetch it back as the
  most recently added element.
*/
table::Singular *Session::getInstanceTable(List<CreateField> &field_list)
2047
temporary_shares.push_back(new table::Singular(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2049
table::Singular *tmp_share= temporary_shares.back();
2058
// Display strings for Session::global_read_lock_t values, returned by
// reference from type() below.
static const std::string NONE= "NONE";
2059
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2060
// Longest of the three strings; max_string_length() reports its size.
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2062
/**
  Map a global read lock state to its display string.

  @param type  lock state to describe
  @return reference to one of the file-static display strings
*/
const std::string &type(drizzled::Session::global_read_lock_t type)
2068
case Session::GOT_GLOBAL_READ_LOCK:
2069
return GOT_GLOBAL_READ_LOCK;
2070
case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2071
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2075
/**
  Width needed to show any global read lock display string.

  The unnamed parameter is ignored; the size of the longest string,
  MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT, is returned.
*/
size_t max_string_length(drizzled::Session::global_read_lock_t)
2077
return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2080
} /* namespace display */
2082
} /* namespace drizzled */
2288
/**
  Flag a session so that its transaction gets rolled back.

  @param session  session to flag
  @param all      stored as-is in transaction_rollback_request
*/
void mark_transaction_to_rollback(Session *session, bool all)
2292
session->is_fatal_sub_stmt_error= true;
2293
session->transaction_rollback_request= all;
2296
/***************************************************************************
2297
Handling of XA id caching
2298
***************************************************************************/
2300
// Mutex held around the xid_cache hash operations in the functions below.
pthread_mutex_t LOCK_xid_cache;
2303
// Declared with C linkage; both are handed to hash_init() as callbacks in
// xid_cache_init().
extern "C" unsigned char *xid_get_hash_key(const unsigned char *, size_t *, bool);
2304
extern "C" void xid_free_hash(void *);
2306
/**
  Hash key callback for xid_cache: the key of an XID_STATE is its XID.

  @param ptr       hash element, treated as an XID_STATE
  @param length    out: key length in bytes
  @param not_used  unused
  @return pointer to the key bytes of the element's xid
*/
unsigned char *xid_get_hash_key(const unsigned char *ptr, size_t *length,
2307
bool not_used __attribute__((unused)))
2309
*length=((XID_STATE*)ptr)->xid.key_length();
2310
return ((XID_STATE*)ptr)->xid.key();
2313
/**
  Free callback for xid_cache elements.

  Elements flagged in_session are left alone; all others are released
  with free().

  @param ptr  hash element, treated as an XID_STATE
*/
void xid_free_hash(void *ptr)
2315
if (!((XID_STATE*)ptr)->in_session)
2316
free((unsigned char*)ptr);
2319
/**
  Initialise the XID cache: its mutex and the xid_cache hash.

  @return true when hash_init() returns non-zero, false otherwise
*/
bool xid_cache_init()
2321
pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST);
2322
// Binary charset; keys and element destruction go through the two callbacks.
return hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
2323
xid_get_hash_key, xid_free_hash, 0) != 0;
2326
/**
  Release the XID cache hash and destroy its mutex; the hash_inited()
  check guards against a cache that was never initialised.
*/
void xid_cache_free()
2328
if (hash_inited(&xid_cache))
2330
hash_free(&xid_cache);
2331
pthread_mutex_destroy(&LOCK_xid_cache);
2335
/**
  Look up the cache element whose key matches @p xid, holding
  LOCK_xid_cache for the duration of the search.

  @param xid  transaction id to look for
*/
XID_STATE *xid_cache_search(XID *xid)
2337
pthread_mutex_lock(&LOCK_xid_cache);
2338
XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, xid->key(), xid->key_length());
2339
pthread_mutex_unlock(&LOCK_xid_cache);
2344
/**
  Insert a new element for @p xid into the cache under LOCK_xid_cache.

  The cache is searched for the key first. If it is absent, an XID_STATE
  is allocated with my_malloc() and inserted with the given state.

  @param xid       transaction id to cache
  @param xa_state  state stored in the new element
*/
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
2348
pthread_mutex_lock(&LOCK_xid_cache);
2349
if (hash_search(&xid_cache, xid->key(), xid->key_length()))
2351
else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
2355
xs->xa_state=xa_state;
2358
res=my_hash_insert(&xid_cache, (unsigned char*)xs);
2360
pthread_mutex_unlock(&LOCK_xid_cache);
2365
/**
  Insert a caller-owned XID_STATE into the cache under LOCK_xid_cache.

  The element's xid must not already be cached; this is only checked by
  an assert.

  @param xid_state  element to insert
*/
bool xid_cache_insert(XID_STATE *xid_state)
2367
pthread_mutex_lock(&LOCK_xid_cache);
2368
assert(hash_search(&xid_cache, xid_state->xid.key(),
2369
xid_state->xid.key_length())==0);
2370
bool res=my_hash_insert(&xid_cache, (unsigned char*)xid_state);
2371
pthread_mutex_unlock(&LOCK_xid_cache);
2376
/**
  Remove @p xid_state from the cache under LOCK_xid_cache.

  @param xid_state  element to remove
*/
void xid_cache_delete(XID_STATE *xid_state)
2378
pthread_mutex_lock(&LOCK_xid_cache);
2379
hash_delete(&xid_cache, (unsigned char *)xid_state);
2380
pthread_mutex_unlock(&LOCK_xid_cache);
2384
Implementation of interface to write rows to the binary log through the
2385
thread. The thread is responsible for writing the rows it has
2386
inserted/updated/deleted.
2391
Template member function for ensuring that there is a rows log
2392
event of the appropriate type before proceeding.
2395
- Events of type 'RowEventT' have the type code 'type_code'.
2398
If a non-NULL pointer is returned, the pending event for thread 'session' will
2399
be an event of type 'RowEventT' (which have the type code 'type_code')
2400
will either be empty or have enough space to hold 'needed' bytes. In
2401
addition, the columns bitmap will be correct for the row, meaning that
2402
the pending event will be flushed if the columns in the event differ from
2403
the columns supplied to the function.
2406
If no error, a non-NULL pending event (either one which already existed or
2407
the newly created one).
2411
// Returns the pending rows event to add row data to: either the existing
// pending event, or a newly created RowsEventT that replaces it when the
// existing one does not match (server id, table id, type code, size limit or
// column bitmap). The hint parameter is unused at run time; it only selects
// the template instantiation.
template <class RowsEventT> Rows_log_event*
2412
Session::binlog_prepare_pending_rows_event(Table* table, uint32_t serv_id,
2414
bool is_transactional,
2415
RowsEventT *hint __attribute__((unused)))
2417
/* Pre-conditions */
2418
assert(table->s->table_map_id != UINT32_MAX);
2420
/* Fetch the type code for the RowsEventT template parameter */
2421
int const type_code= RowsEventT::TYPE_CODE;
2424
There is no good place to set up the transactional data, so we
2427
if (binlog_setup_trx_data())
2430
Rows_log_event* pending= binlog_get_pending_rows_event();
2432
if (unlikely(pending && !pending->is_valid()))
2436
Check if the current event is non-NULL and a write-rows
2437
event. Also check if the table provided is mapped: if it is not,
2438
then we have switched to writing to a new table.
2439
If there is no pending event, we need to create one. If there is a pending
2440
event, but it's not about the same table id, or not of the same type
2441
(between Write, Update and Delete), or not the same affected columns, or
2442
going to be too big, flush this event to disk and create a new pending
2445
The last test is necessary for the Cluster injector to work
2446
correctly. The reason is that the Cluster can inject two write
2447
rows with different column bitmaps if there is an insert followed
2448
by an update in the same transaction, and these are grouped into a
2449
single epoch/transaction when fed to the injector.
2451
TODO: Fix the code so that the last test can be removed.
2454
pending->server_id != serv_id ||
2455
pending->get_table_id() != table->s->table_map_id ||
2456
pending->get_type_code() != type_code ||
2457
pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
2458
!bitmap_cmp(pending->get_cols(), table->write_set))
2460
/* Create a new RowsEventT... */
2461
Rows_log_event* const
2462
ev= new RowsEventT(this, table, table->s->table_map_id,
2466
ev->server_id= serv_id; // I don't like this, it's too easy to forget.
2468
flush the pending event and replace it with the newly created
2471
if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev)))
2477
return(ev); /* This is the new pending event */
2479
return(pending); /* This is the current pending event */
2482
// Explicit instantiations of binlog_prepare_pending_rows_event for the three
// row event kinds (write, delete, update); compiled only when
// HAVE_EXPLICIT_TEMPLATE_INSTANTIATION is defined.
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
2484
Instantiate the versions we need, we have -fno-implicit-template as
2487
template Rows_log_event*
2488
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2489
Write_rows_log_event*);
2491
template Rows_log_event*
2492
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2493
Delete_rows_log_event *);
2495
template Rows_log_event*
2496
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
2497
Update_rows_log_event *);
2502
Class to handle temporary allocation of memory for row data.
2504
The responsibility of the class is to provide memory for
2505
packing one or two rows of packed data (depending on what
2506
constructor is called).
2508
In order to make the allocation more efficient for "simple" rows,
2509
i.e., rows that do not contain any blobs, a pointer to the
2510
allocated memory is stored in the table structure
2511
for simple rows. If memory for a table containing a blob field
2512
is requested, only memory for that is allocated, and subsequently
2513
released when the object is destroyed.
2516
// Scratch buffer holding one or two packed rows.
//
// For tables without blob fields the buffer is table->write_row_record,
// allocated once from the table's mem_root and never freed by this class.
// For tables with blob fields the buffer comes from my_malloc() and is freed
// in the destructor. Callers must call has_memory() before slot(); slot()
// asserts that the check happened.
class Row_data_memory {
2519
Build an object to keep track of a block-local piece of memory
2520
for storing a row of data.
2523
Table where the pre-allocated memory is stored.
2526
Length of data that is needed, if the record contain blobs.
2528
Row_data_memory(Table *table, size_t const len1)
2531
m_alloc_checked= false;
2532
allocate_memory(table, len1);
2533
m_ptr[0]= has_memory() ? m_memory : 0;
2537
// Two-row form: the second slot starts len1 bytes after the first.
Row_data_memory(Table *table, size_t const len1, size_t const len2)
2540
m_alloc_checked= false;
2541
allocate_memory(table, len1 + len2);
2542
m_ptr[0]= has_memory() ? m_memory : 0;
2543
m_ptr[1]= has_memory() ? m_memory + len1 : 0;
2548
// Only a buffer obtained from my_malloc() is freed here.
if (m_memory != 0 && m_release_memory_on_destruction)
2549
free((unsigned char*) m_memory);
2553
Is there memory allocated?
2555
@retval true There is memory allocated
2556
@retval false Memory allocation failed
2558
bool has_memory() const {
2559
// Records that the caller checked the allocation; m_alloc_checked is mutable
// so it can be set from this const member.
m_alloc_checked= true;
2560
return m_memory != 0;
2563
unsigned char *slot(uint32_t s)
2565
assert(s < sizeof(m_ptr)/sizeof(*m_ptr));
2566
assert(m_ptr[s] != 0);
2567
assert(m_alloc_checked == true);
2572
void allocate_memory(Table *const table, size_t const total_length)
2574
if (table->s->blob_fields == 0)
2577
The maximum length of a packed record is less than this
2578
length. We use this value instead of the supplied length
2579
when allocating memory for records, since we don't know how
2580
the memory will be used in future allocations.
2582
Since table->s->reclength is for unpacked records, we have
2583
to add two bytes for each field, which can potentially be
2584
added to hold the length of a packed field.
2586
size_t const maxlen= table->s->reclength + 2 * table->s->fields;
2589
Allocate memory for two records if memory hasn't been
2590
allocated. We allocate memory for two records so that it can
2591
be used when processing update rows as well.
2593
if (table->write_row_record == 0)
2594
table->write_row_record=
2595
(unsigned char *) alloc_root(&table->mem_root, 2 * maxlen);
2596
m_memory= table->write_row_record;
2597
m_release_memory_on_destruction= false;
2601
m_memory= (unsigned char *) my_malloc(total_length, MYF(MY_WME));
2602
m_release_memory_on_destruction= true;
2606
mutable bool m_alloc_checked;
2607
bool m_release_memory_on_destruction;
2608
unsigned char *m_memory;
2609
unsigned char *m_ptr[2];
2614
/**
  Pack an inserted row and append it to the pending write-rows event.

  @param table     table the row belongs to
  @param is_trans  forwarded to binlog_prepare_pending_rows_event()
  @param record    row image to pack, using table->write_set
  @return HA_ERR_OUT_OF_MEM when the row buffer or the pending event is
          unavailable, otherwise the result of add_row_data()
*/
int Session::binlog_write_row(Table* table, bool is_trans,
2615
unsigned char const *record)
2617
assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
2620
Pack records into format for transfer. We are allocating more
2621
memory than needed, but that doesn't matter.
2623
Row_data_memory memory(table, table->max_row_length(record));
2624
if (!memory.has_memory())
2625
return HA_ERR_OUT_OF_MEM;
2627
unsigned char *row_data= memory.slot(0);
2629
size_t const len= pack_row(table, table->write_set, row_data, record);
2631
// The null Write_rows_log_event pointer only selects the template instantiation.
Rows_log_event* const ev=
2632
binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
2633
static_cast<Write_rows_log_event*>(0));
2635
if (unlikely(ev == 0))
2636
return HA_ERR_OUT_OF_MEM;
2638
return ev->add_row_data(row_data, len);
2641
/**
  Pack the before and after images of an updated row and append both to
  the pending update-rows event.

  @param table          table the row belongs to
  @param is_trans       forwarded to binlog_prepare_pending_rows_event()
  @param before_record  row image before the update (packed with read_set)
  @param after_record   row image after the update (packed with write_set)
  @return HA_ERR_OUT_OF_MEM when the row buffer or the pending event is
          unavailable
*/
int Session::binlog_update_row(Table* table, bool is_trans,
2642
const unsigned char *before_record,
2643
const unsigned char *after_record)
2645
assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
2647
size_t const before_maxlen = table->max_row_length(before_record);
2648
size_t const after_maxlen = table->max_row_length(after_record);
2650
// One buffer with two slots: slot 0 for the before image, slot 1 for the after image.
Row_data_memory row_data(table, before_maxlen, after_maxlen);
2651
if (!row_data.has_memory())
2652
return HA_ERR_OUT_OF_MEM;
2654
unsigned char *before_row= row_data.slot(0);
2655
unsigned char *after_row= row_data.slot(1);
2657
size_t const before_size= pack_row(table, table->read_set, before_row,
2659
size_t const after_size= pack_row(table, table->write_set, after_row,
2662
Rows_log_event* const ev=
2663
binlog_prepare_pending_rows_event(table, server_id,
2664
before_size + after_size, is_trans,
2665
static_cast<Update_rows_log_event*>(0));
2667
if (unlikely(ev == 0))
2668
return HA_ERR_OUT_OF_MEM;
2671
// Short-circuit: the after image is added only if adding the before image
// returned zero.
ev->add_row_data(before_row, before_size) ||
2672
ev->add_row_data(after_row, after_size);
2675
/**
  Pack a deleted row and append it to the pending delete-rows event.

  @param table     table the row belongs to
  @param is_trans  forwarded to binlog_prepare_pending_rows_event()
  @param record    row image to pack, using table->read_set
  @return HA_ERR_OUT_OF_MEM when the row buffer or the pending event is
          unavailable, otherwise the result of add_row_data()
*/
int Session::binlog_delete_row(Table* table, bool is_trans,
2676
unsigned char const *record)
2678
assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
2681
Pack records into format for transfer. We are allocating more
2682
memory than needed, but that doesn't matter.
2684
Row_data_memory memory(table, table->max_row_length(record));
2685
if (unlikely(!memory.has_memory()))
2686
return HA_ERR_OUT_OF_MEM;
2688
unsigned char *row_data= memory.slot(0);
2690
size_t const len= pack_row(table, table->read_set, row_data, record);
2692
// The null Delete_rows_log_event pointer only selects the template instantiation.
Rows_log_event* const ev=
2693
binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
2694
static_cast<Delete_rows_log_event*>(0));
2696
if (unlikely(ev == 0))
2697
return HA_ERR_OUT_OF_MEM;
2699
return ev->add_row_data(row_data, len);
2703
/**
  Flush the pending rows event, if any, to the binary log.

  The visible code sets STMT_END_F and the table-map-version flag on the
  pending event, zeroes binlog_table_maps, and replaces the pending event
  with none via flush_and_set_pending_rows_event(this, 0).

  @param stmt_end  whether the pending event ends a statement
*/
int Session::binlog_flush_pending_rows_event(bool stmt_end)
2706
We shall flush the pending event even if we are not in row-based
2707
mode: it might be the case that we left row-based mode before
2708
flushing anything (e.g., if we have explicitly locked tables).
2710
if (!mysql_bin_log.is_open())
2714
Mark the event as the last event of a statement if the stmt_end
2718
if (Rows_log_event *pending= binlog_get_pending_rows_event())
2722
pending->set_flags(Rows_log_event::STMT_END_F);
2723
pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
2724
binlog_table_maps= 0;
2727
// Passing 0 as the new event flushes the current one and leaves none pending.
error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
2735
Member function that will log query, either row-based or
2736
statement-based depending on the value of the 'current_stmt_binlog_row_based'
2737
flag and the value of the 'qtype' argument.
2739
This function should be called after the all calls to ha_*_row()
2740
functions have been issued, but before tables are unlocked and
2744
There shall be no writes to any system table after calling
2745
binlog_query(), so these writes have to be moved to before the call
2746
of binlog_query() for correct functioning.
2748
This is necessary not only for RBR, but also because the master might crash
2749
after binlogging the query but before changing the system tables.
2750
This means that the slave and the master are not in the same state
2751
(after the master has restarted), so therefore we have to
2752
eliminate this problem.
2755
Error code, or 0 if no error.
2757
// Log a query to the binary log.
//
// The pending rows event is flushed first (as a statement end). An
// "unsafe statement" warning is pushed when the statement is unsafe and the
// binlog format is statement-based; the matching log line is printed only
// once, guarded by BINLOG_FLAG_UNSAFE_STMT_PRINTED. Depending on qtype, a
// Query_log_event is then written and binlog_table_maps is zeroed.
int Session::binlog_query(Session::enum_binlog_query_type qtype, char const *query_arg,
2758
ulong query_len, bool is_trans, bool suppress_use,
2759
Session::killed_state killed_status_arg)
2761
assert(query_arg && mysql_bin_log.is_open());
2763
if (int error= binlog_flush_pending_rows_event(true))
2767
If we are in statement mode and trying to log an unsafe statement,
2768
we should print a warning.
2770
if (lex->is_stmt_unsafe() &&
2771
variables.binlog_format == BINLOG_FORMAT_STMT)
2773
assert(this->query != NULL);
2774
push_warning(this, DRIZZLE_ERROR::WARN_LEVEL_WARN,
2775
ER_BINLOG_UNSAFE_STATEMENT,
2776
ER(ER_BINLOG_UNSAFE_STATEMENT));
2777
if (!(binlog_flags & BINLOG_FLAG_UNSAFE_STMT_PRINTED))
2779
char warn_buf[DRIZZLE_ERRMSG_SIZE];
2780
// snprintf is bounded by the buffer size, so a long query text is truncated.
snprintf(warn_buf, DRIZZLE_ERRMSG_SIZE, "%s Statement: %s",
2781
ER(ER_BINLOG_UNSAFE_STATEMENT), this->query);
2782
sql_print_warning("%s",warn_buf);
2783
binlog_flags|= BINLOG_FLAG_UNSAFE_STMT_PRINTED;
2788
case Session::ROW_QUERY_TYPE:
2789
if (current_stmt_binlog_row_based)
2791
/* Otherwise, we fall through */
2792
case Session::DRIZZLE_QUERY_TYPE:
2794
Using this query type is a conveniece hack, since we have been
2795
moving back and forth between using RBR for replication of
2796
system tables and not using it.
2798
Make sure to change in check_table_binlog_row_based() according
2799
to how you treat this.
2801
case Session::STMT_QUERY_TYPE:
2803
The DRIZZLE_LOG::write() function will set the STMT_END_F flag and
2804
flush the pending rows event if necessary.
2807
Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use,
2809
qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
2811
Binlog table maps will be irrelevant after a Query_log_event
2812
(they are just removed on the slave side) so after the query
2813
log event is written to the binary log, we pretend that no
2814
table maps were written.
2816
int error= mysql_bin_log.write(&qinfo);
2817
binlog_table_maps= 0;
2822
case Session::QUERY_TYPE_COUNT:
2824
assert(0 <= qtype && qtype < QUERY_TYPE_COUNT);
2829
// Record the interval described by start, val and incr.
//
// When the list is empty, or merge_if_contiguous() on the tail returns
// non-zero (per the comment below, meaning the merge was not possible), a
// new Discrete_interval is allocated and handed to the pointer overload of
// append().
bool Discrete_intervals_list::append(uint64_t start, uint64_t val,
2832
/* first, see if this can be merged with previous */
2833
// head == NULL is tested first, so tail is only dereferenced on a non-empty list.
if ((head == NULL) || tail->merge_if_contiguous(start, val, incr))
2835
/* it cannot, so need to add a new interval */
2836
Discrete_interval *new_interval= new Discrete_interval(start, val, incr);
2837
return(append(new_interval));
2842
bool Discrete_intervals_list::append(Discrete_interval *new_interval)
2844
if (unlikely(new_interval == NULL))
2847
head= current= new_interval;
2849
tail->next= new_interval;