~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/session.cc

  • Committer: Monty Taylor
  • Date: 2009-12-01 17:50:17 UTC
  • mto: (1235.1.1 push)
  • mto: This revision was merged to the branch mainline in revision 1236.
  • Revision ID: mordred@inaugust.com-20091201175017-o9yed6ssdiolghv4
Renamed instances of HEAP engine to MEMORY. Removed the alias.

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
 * @file Implementation of the Session class and API
22
22
 */
23
23
 
24
 
#include "config.h"
25
 
#include "drizzled/session.h"
26
 
#include "drizzled/session/cache.h"
 
24
#include <drizzled/server_includes.h>
 
25
#include <drizzled/session.h>
27
26
#include <sys/stat.h>
28
 
#include "drizzled/error.h"
29
 
#include "drizzled/gettext.h"
30
 
#include "drizzled/query_id.h"
31
 
#include "drizzled/data_home.h"
32
 
#include "drizzled/sql_base.h"
33
 
#include "drizzled/lock.h"
34
 
#include "drizzled/item/cache.h"
35
 
#include "drizzled/item/float.h"
36
 
#include "drizzled/item/return_int.h"
37
 
#include "drizzled/item/empty_string.h"
38
 
#include "drizzled/show.h"
39
 
#include "drizzled/plugin/client.h"
 
27
#include <mysys/mysys_err.h>
 
28
#include <drizzled/error.h>
 
29
#include <drizzled/gettext.h>
 
30
#include <drizzled/query_id.h>
 
31
#include <drizzled/data_home.h>
 
32
#include <drizzled/sql_base.h>
 
33
#include <drizzled/lock.h>
 
34
#include <drizzled/item/cache.h>
 
35
#include <drizzled/item/float.h>
 
36
#include <drizzled/item/return_int.h>
 
37
#include <drizzled/item/empty_string.h>
 
38
#include <drizzled/show.h>
 
39
#include <drizzled/plugin/client.h>
40
40
#include "drizzled/plugin/scheduler.h"
41
41
#include "drizzled/plugin/authentication.h"
42
 
#include "drizzled/plugin/logging.h"
43
 
#include "drizzled/plugin/transactional_storage_engine.h"
44
 
#include "drizzled/plugin/query_rewrite.h"
45
42
#include "drizzled/probes.h"
46
43
#include "drizzled/table_proto.h"
47
 
#include "drizzled/db.h"
48
 
#include "drizzled/pthread_globals.h"
49
 
#include "drizzled/transaction_services.h"
50
 
#include "drizzled/drizzled.h"
51
 
 
52
 
#include "drizzled/table/instance.h"
53
 
 
54
 
#include "plugin/myisam/myisam.h"
55
 
#include "drizzled/internal/iocache.h"
56
 
#include "drizzled/internal/thread_var.h"
57
 
#include "drizzled/plugin/event_observer.h"
58
 
 
59
 
#include "drizzled/util/functors.h"
60
 
 
61
 
#include "drizzled/display.h"
62
 
 
63
 
#include <fcntl.h>
 
44
 
64
45
#include <algorithm>
65
 
#include <climits>
66
 
#include <boost/filesystem.hpp>
67
 
 
68
 
#include "drizzled/util/backtrace.h"
69
46
 
70
47
using namespace std;
 
48
using namespace drizzled;
71
49
 
72
 
namespace fs=boost::filesystem;
73
 
namespace drizzled
 
50
extern "C"
74
51
{
 
52
  unsigned char *get_var_key(user_var_entry *entry, size_t *length, bool );
 
53
  void free_user_var(user_var_entry *entry);
 
54
}
75
55
 
76
56
/*
77
57
  The following is used to initialise Table_ident with a internal
81
61
char empty_c_string[1]= {0};    /* used for not defined db */
82
62
 
83
63
const char * const Session::DEFAULT_WHERE= "field list";
 
64
extern pthread_key_t THR_Session;
 
65
extern pthread_key_t THR_Mem_root;
 
66
extern uint32_t max_used_connections;
 
67
extern drizzled::atomic<uint32_t> connection_count;
 
68
 
 
69
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
70
/* Used templates */
 
71
template class List<Key>;
 
72
template class List_iterator<Key>;
 
73
template class List<Key_part_spec>;
 
74
template class List_iterator<Key_part_spec>;
 
75
template class List<AlterDrop>;
 
76
template class List_iterator<AlterDrop>;
 
77
template class List<AlterColumn>;
 
78
template class List_iterator<AlterColumn>;
 
79
#endif
 
80
 
 
81
/****************************************************************************
 
82
** User variables
 
83
****************************************************************************/
 
84
unsigned char *get_var_key(user_var_entry *entry, size_t *length, bool )
 
85
{
 
86
  *length= entry->name.length;
 
87
  return (unsigned char*) entry->name.str;
 
88
}
 
89
 
 
90
void free_user_var(user_var_entry *entry)
 
91
{
 
92
  delete entry;
 
93
}
84
94
 
85
95
bool Key_part_spec::operator==(const Key_part_spec& other) const
86
96
{
87
97
  return length == other.length &&
88
98
         field_name.length == other.field_name.length &&
89
 
    !my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
 
99
         !strcmp(field_name.str, other.field_name.str);
90
100
}
91
101
 
92
 
Open_tables_state::Open_tables_state(uint64_t version_arg) :
93
 
  version(version_arg)
 
102
Open_tables_state::Open_tables_state(uint64_t version_arg)
 
103
  :version(version_arg), backups_available(false)
94
104
{
95
 
  open_tables= temporary_tables= derived_tables= NULL;
96
 
  extra_lock= lock= NULL;
 
105
  reset_open_tables_state();
97
106
}
98
107
 
99
108
/*
100
109
  The following functions form part of the C plugin API
101
110
*/
102
 
int mysql_tmpfile(const char *prefix)
 
111
extern "C" int mysql_tmpfile(const char *prefix)
103
112
{
104
113
  char filename[FN_REFLEN];
105
 
  int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
 
114
  File fd = create_temp_file(filename, drizzle_tmpdir, prefix, MYF(MY_WME));
106
115
  if (fd >= 0) {
107
116
    unlink(filename);
108
117
  }
110
119
  return fd;
111
120
}
112
121
 
 
122
extern "C"
113
123
int session_tablespace_op(const Session *session)
114
124
{
115
125
  return test(session->tablespace_op);
123
133
 
124
134
   @see Session::set_proc_info
125
135
 */
126
 
void set_session_proc_info(Session *session, const char *info)
 
136
extern "C" void
 
137
set_session_proc_info(Session *session, const char *info)
127
138
{
128
139
  session->set_proc_info(info);
129
140
}
130
141
 
 
142
extern "C"
131
143
const char *get_session_proc_info(Session *session)
132
144
{
133
145
  return session->get_proc_info();
134
146
}
135
147
 
136
 
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
137
 
{
138
 
  return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
139
 
}
140
 
 
141
 
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
142
 
                                             size_t index)
143
 
{
144
 
  return &ha_data[monitored->getId()].resource_context[index];
145
 
}
146
 
 
 
148
extern "C"
 
149
void **session_ha_data(const Session *session, const plugin::StorageEngine *engine)
 
150
{
 
151
  return (void **) &session->ha_data[engine->slot].ha_ptr;
 
152
}
 
153
 
 
154
extern "C"
147
155
int64_t session_test_options(const Session *session, int64_t test_options)
148
156
{
149
157
  return session->options & test_options;
150
158
}
151
159
 
 
160
extern "C"
152
161
int session_sql_command(const Session *session)
153
162
{
154
163
  return (int) session->lex->sql_command;
155
164
}
156
165
 
157
 
enum_tx_isolation session_tx_isolation(const Session *session)
 
166
extern "C"
 
167
int session_tx_isolation(const Session *session)
158
168
{
159
 
  return (enum_tx_isolation)session->variables.tx_isolation;
 
169
  return (int) session->variables.tx_isolation;
160
170
}
161
171
 
162
 
Session::Session(plugin::Client *client_arg) :
 
172
Session::Session(plugin::Client *client_arg)
 
173
  :
163
174
  Open_tables_state(refresh_version),
164
175
  mem_root(&main_mem_root),
165
 
  xa_id(0),
166
176
  lex(&main_lex),
167
 
  query(new std::string),
168
 
  _schema(new std::string("")),
169
 
  catalog("LOCAL"),
170
177
  client(client_arg),
171
178
  scheduler(NULL),
172
179
  scheduler_arg(NULL),
173
180
  lock_id(&main_lock_id),
174
181
  user_time(0),
175
 
  ha_data(plugin::num_trx_monitored_objects),
176
 
  concurrent_execute_allowed(true),
177
182
  arg_of_last_insert_id_function(false),
178
183
  first_successful_insert_id_in_prev_stmt(0),
179
184
  first_successful_insert_id_in_cur_stmt(0),
180
185
  limit_found_rows(0),
181
 
  _global_read_lock(NONE),
182
 
  _killed(NOT_KILLED),
 
186
  global_read_lock(0),
183
187
  some_tables_deleted(false),
184
188
  no_errors(false),
185
189
  password(false),
191
195
  m_lip(NULL),
192
196
  cached_table(0),
193
197
  transaction_message(NULL),
194
 
  statement_message(NULL),
195
 
  session_event_observers(NULL),
196
 
  use_usage(false)
 
198
  statement_message(NULL)
197
199
{
 
200
  memset(process_list_info, 0, PROCESS_LIST_WIDTH);
198
201
  client->setSession(this);
199
202
 
200
203
  /*
202
205
    the destructor works OK in case of an error. The main_mem_root
203
206
    will be re-initialized in init_for_queries().
204
207
  */
205
 
  memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
 
208
  init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
206
209
  thread_stack= NULL;
207
 
  count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
210
  count_cuted_fields= CHECK_FIELD_IGNORE;
 
211
  killed= NOT_KILLED;
208
212
  col_access= 0;
209
213
  tmp_table= 0;
210
214
  used_tables= 0;
220
224
  thread_id= 0;
221
225
  file_id = 0;
222
226
  query_id= 0;
223
 
  warn_query_id= 0;
 
227
  query= NULL;
 
228
  query_length= 0;
 
229
  warn_id= 0;
 
230
  memset(ha_data, 0, sizeof(ha_data));
 
231
  replication_data= 0;
224
232
  mysys_var= 0;
225
 
  scoreboard_index= -1;
226
233
  dbug_sentry=Session_SENTRY_MAGIC;
227
 
  cleanup_done= abort_on_warning= no_warnings_for_error= false;  
228
 
 
229
 
  /* query_cache init */
230
 
  query_cache_key= "";
231
 
  resultset= NULL;
 
234
  cleanup_done= abort_on_warning= no_warnings_for_error= false;
 
235
  transaction.on= 1;
 
236
  pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
232
237
 
233
238
  /* Variables with default values */
234
239
  proc_info="login";
250
255
  else
251
256
    options &= ~OPTION_BIG_SELECTS;
252
257
 
 
258
  transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= false;
253
259
  open_options=ha_open_options;
254
260
  update_lock_default= TL_WRITE;
255
261
  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
259
265
  memset(&status_var, 0, sizeof(status_var));
260
266
 
261
267
  /* Initialize sub structures */
262
 
  memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
 
268
  init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
 
269
  hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
 
270
            (hash_get_key) get_var_key,
 
271
            (hash_free_key) free_user_var, 0);
263
272
 
264
273
  substitute_null_with_insert_id = false;
265
 
  lock_info.init(); /* safety: will be reset after start */
 
274
  thr_lock_info_init(&lock_info); /* safety: will be reset after start */
266
275
  thr_lock_owner_init(&main_lock_id, &lock_info);
267
276
 
268
277
  m_internal_handler= NULL;
269
 
  
270
 
  plugin::EventObserver::registerSessionEvents(*this); 
271
278
}
272
279
 
273
280
void Session::free_items()
274
281
{
275
282
  Item *next;
276
 
  /* This works because items are allocated with memory::sql_alloc() */
 
283
  /* This works because items are allocated with sql_alloc() */
277
284
  for (; free_list; free_list= next)
278
285
  {
279
286
    next= free_list->next;
302
309
  return false;                                 // 'false', as per coding style
303
310
}
304
311
 
305
 
void Session::setAbort(bool arg)
306
 
{
307
 
  mysys_var->abort= arg;
308
 
}
309
 
 
310
 
void Session::lockOnSys()
311
 
{
312
 
  if (not mysys_var)
313
 
    return;
314
 
 
315
 
  setAbort(true);
316
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
317
 
  if (mysys_var->current_cond)
318
 
  {
319
 
    mysys_var->current_mutex->lock();
320
 
    mysys_var->current_cond->notify_all();
321
 
    mysys_var->current_mutex->unlock();
322
 
  }
323
 
}
324
 
 
325
312
void Session::pop_internal_handler()
326
313
{
327
314
  assert(m_internal_handler != NULL);
328
315
  m_internal_handler= NULL;
329
316
}
330
317
 
331
 
void Session::get_xid(DRIZZLE_XID *xid)
332
 
{
333
 
  *xid = *(DRIZZLE_XID *) &transaction.xid_state.xid;
334
 
}
 
318
#if defined(__cplusplus)
 
319
extern "C" {
 
320
#endif
 
321
 
 
322
void *session_alloc(Session *session, unsigned int size)
 
323
{
 
324
  return session->alloc(size);
 
325
}
 
326
 
 
327
void *session_calloc(Session *session, unsigned int size)
 
328
{
 
329
  return session->calloc(size);
 
330
}
 
331
 
 
332
char *session_strdup(Session *session, const char *str)
 
333
{
 
334
  return session->strdup(str);
 
335
}
 
336
 
 
337
char *session_strmake(Session *session, const char *str, unsigned int size)
 
338
{
 
339
  return session->strmake(str, size);
 
340
}
 
341
 
 
342
void *session_memdup(Session *session, const void* str, unsigned int size)
 
343
{
 
344
  return session->memdup(str, size);
 
345
}
 
346
 
 
347
void session_get_xid(const Session *session, DRIZZLE_XID *xid)
 
348
{
 
349
  *xid = *(DRIZZLE_XID *) &session->transaction.xid_state.xid;
 
350
}
 
351
 
 
352
#if defined(__cplusplus)
 
353
}
 
354
#endif
335
355
 
336
356
/* Do operations that may take a long time */
337
357
 
339
359
{
340
360
  assert(cleanup_done == false);
341
361
 
342
 
  setKilled(KILL_CONNECTION);
 
362
  killed= KILL_CONNECTION;
343
363
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
344
364
  if (transaction.xid_state.xa_state == XA_PREPARED)
345
365
  {
347
367
  }
348
368
#endif
349
369
  {
350
 
    TransactionServices &transaction_services= TransactionServices::singleton();
351
 
    transaction_services.rollbackTransaction(this, true);
 
370
    ha_rollback(this);
352
371
    xid_cache_delete(&transaction.xid_state);
353
372
  }
354
 
 
355
 
  for (UserVars::iterator iter= user_vars.begin();
356
 
       iter != user_vars.end();
357
 
       iter++)
358
 
  {
359
 
    user_var_entry *entry= (*iter).second;
360
 
    delete entry;
361
 
  }
362
 
  user_vars.clear();
363
 
 
364
 
 
 
373
  hash_free(&user_vars);
365
374
  close_temporary_tables();
366
375
 
367
376
  if (global_read_lock)
368
 
  {
369
 
    unlockGlobalReadLock();
370
 
  }
 
377
    unlock_global_read_lock(this);
371
378
 
372
379
  cleanup_done= true;
373
380
}
374
381
 
375
382
Session::~Session()
376
383
{
377
 
  this->checkSentry();
 
384
  Session_CHECK_SENTRY(this);
 
385
  add_to_status(&global_status_var, &status_var);
378
386
 
379
387
  if (client->isConnected())
380
388
  {
381
389
    if (global_system_variables.log_warnings)
382
 
        errmsg_printf(ERRMSG_LVL_WARN, ER(ER_FORCING_CLOSE),internal::my_progname,
 
390
        errmsg_printf(ERRMSG_LVL_WARN, ER(ER_FORCING_CLOSE),my_progname,
383
391
                      thread_id,
384
 
                      (getSecurityContext().getUser().c_str() ?
385
 
                       getSecurityContext().getUser().c_str() : ""));
 
392
                      (security_ctx.user.c_str() ?
 
393
                       security_ctx.user.c_str() : ""));
386
394
    disconnect(0, false);
387
395
  }
388
396
 
396
404
  plugin::StorageEngine::closeConnection(this);
397
405
  plugin_sessionvar_cleanup(this);
398
406
 
399
 
  warn_root.free_root(MYF(0));
 
407
  free_root(&warn_root,MYF(0));
 
408
  free_root(&transaction.mem_root,MYF(0));
400
409
  mysys_var=0;                                  // Safety (shouldn't be needed)
401
410
  dbug_sentry= Session_SENTRY_GONE;
402
411
 
403
 
  main_mem_root.free_root(MYF(0));
404
 
  currentMemRoot().release();
405
 
  currentSession().release();
406
 
 
407
 
  plugin::Logging::postEndDo(this);
408
 
  plugin::EventObserver::deregisterSessionEvents(*this); 
409
 
 
410
 
  for (PropertyMap::iterator iter= life_properties.begin(); iter != life_properties.end(); iter++)
411
 
  {
412
 
    delete (*iter).second;
413
 
  }
414
 
  life_properties.clear();
415
 
}
416
 
 
417
 
void Session::setClient(plugin::Client *client_arg)
418
 
{
419
 
  client= client_arg;
420
 
  client->setSession(this);
421
 
}
422
 
 
423
 
void Session::awake(Session::killed_state_t state_to_set)
424
 
{
425
 
  if ((state_to_set == Session::KILL_QUERY) and (command == COM_SLEEP))
426
 
    return;
427
 
 
428
 
  this->checkSentry();
429
 
 
430
 
  setKilled(state_to_set);
431
 
  scheduler->killSession(this);
432
 
 
 
412
  free_root(&main_mem_root, MYF(0));
 
413
  pthread_setspecific(THR_Session,  0);
 
414
 
 
415
 
 
416
  /* Ensure that no one is using Session */
 
417
  pthread_mutex_unlock(&LOCK_delete);
 
418
  pthread_mutex_destroy(&LOCK_delete);
 
419
}
 
420
 
 
421
/*
 
422
  Add all status variables to another status variable array
 
423
 
 
424
  SYNOPSIS
 
425
   add_to_status()
 
426
   to_var       add to this array
 
427
   from_var     from this array
 
428
 
 
429
  NOTES
 
430
    This function assumes that all variables are long/ulong.
 
431
    If this assumption will change, then we have to explictely add
 
432
    the other variables after the while loop
 
433
*/
 
434
void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var)
 
435
{
 
436
  ulong *end= (ulong*) ((unsigned char*) to_var +
 
437
                        offsetof(STATUS_VAR, last_system_status_var) +
 
438
                        sizeof(ulong));
 
439
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var;
 
440
 
 
441
  while (to != end)
 
442
    *(to++)+= *(from++);
 
443
}
 
444
 
 
445
/*
 
446
  Add the difference between two status variable arrays to another one.
 
447
 
 
448
  SYNOPSIS
 
449
    add_diff_to_status
 
450
    to_var       add to this array
 
451
    from_var     from this array
 
452
    dec_var      minus this array
 
453
 
 
454
  NOTE
 
455
    This function assumes that all variables are long/ulong.
 
456
*/
 
457
void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var,
 
458
                        STATUS_VAR *dec_var)
 
459
{
 
460
  ulong *end= (ulong*) ((unsigned char*) to_var + offsetof(STATUS_VAR,
 
461
                                                  last_system_status_var) +
 
462
                        sizeof(ulong));
 
463
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var, *dec= (ulong*) dec_var;
 
464
 
 
465
  while (to != end)
 
466
    *(to++)+= *(from++) - *(dec++);
 
467
}
 
468
 
 
469
void Session::awake(Session::killed_state state_to_set)
 
470
{
 
471
  Session_CHECK_SENTRY(this);
 
472
  safe_mutex_assert_owner(&LOCK_delete);
 
473
 
 
474
  killed= state_to_set;
433
475
  if (state_to_set != Session::KILL_QUERY)
434
476
  {
 
477
    scheduler->killSession(this);
435
478
    DRIZZLE_CONNECTION_DONE(thread_id);
436
479
  }
437
 
 
438
480
  if (mysys_var)
439
481
  {
440
 
    boost_unique_lock_t scopedLock(mysys_var->mutex);
 
482
    pthread_mutex_lock(&mysys_var->mutex);
441
483
    /*
442
 
      "
443
484
      This broadcast could be up in the air if the victim thread
444
485
      exits the cond in the time between read and broadcast, but that is
445
486
      ok since all we want to do is to make the victim thread get out
460
501
    */
461
502
    if (mysys_var->current_cond && mysys_var->current_mutex)
462
503
    {
463
 
      mysys_var->current_mutex->lock();
464
 
      mysys_var->current_cond->notify_all();
465
 
      mysys_var->current_mutex->unlock();
 
504
      pthread_mutex_lock(mysys_var->current_mutex);
 
505
      pthread_cond_broadcast(mysys_var->current_cond);
 
506
      pthread_mutex_unlock(mysys_var->current_mutex);
466
507
    }
 
508
    pthread_mutex_unlock(&mysys_var->mutex);
467
509
  }
468
510
}
469
511
 
470
512
/*
471
513
  Remember the location of thread info, the structure needed for
472
 
  memory::sql_alloc() and the structure for the net buffer
 
514
  sql_alloc() and the structure for the net buffer
473
515
*/
474
516
bool Session::storeGlobals()
475
517
{
479
521
  */
480
522
  assert(thread_stack);
481
523
 
482
 
  currentSession().release();
483
 
  currentSession().reset(this);
484
 
 
485
 
  currentMemRoot().release();
486
 
  currentMemRoot().reset(&mem_root);
 
524
  if (pthread_setspecific(THR_Session,  this) ||
 
525
      pthread_setspecific(THR_Mem_root, &mem_root))
 
526
    return true;
487
527
 
488
528
  mysys_var=my_thread_var;
489
529
 
492
532
    This allows us to move Session to different threads if needed.
493
533
  */
494
534
  mysys_var->id= thread_id;
 
535
  real_id= pthread_self();                      // For debugging
495
536
 
496
537
  /*
497
538
    We have to call thr_lock_info_init() again here as Session may have been
498
539
    created in another thread
499
540
  */
500
 
  lock_info.init();
501
 
 
 
541
  thr_lock_info_init(&lock_info);
502
542
  return false;
503
543
}
504
544
 
517
557
  set_proc_info(NULL);
518
558
  command= COM_SLEEP;
519
559
  set_time();
 
560
  ha_enable_transaction(this,true);
520
561
 
521
 
  mem_root->reset_root_defaults(variables.query_alloc_block_size,
522
 
                                variables.query_prealloc_size);
 
562
  reset_root_defaults(mem_root, variables.query_alloc_block_size,
 
563
                      variables.query_prealloc_size);
 
564
  reset_root_defaults(&transaction.mem_root,
 
565
                      variables.trans_alloc_block_size,
 
566
                      variables.trans_prealloc_size);
523
567
  transaction.xid_state.xid.null();
524
568
  transaction.xid_state.in_session=1;
525
 
  if (use_usage)
526
 
    resetUsage();
527
569
}
528
570
 
529
571
bool Session::initGlobals()
531
573
  if (storeGlobals())
532
574
  {
533
575
    disconnect(ER_OUT_OF_RESOURCES, true);
534
 
    status_var.aborted_connects++;
 
576
    statistic_increment(aborted_connects, &LOCK_status);
535
577
    return true;
536
578
  }
537
579
  return false;
547
589
 
548
590
  prepareForQueries();
549
591
 
550
 
  while (not client->haveError() && getKilled() != KILL_CONNECTION)
 
592
  while (! client->haveError() && killed != KILL_CONNECTION)
551
593
  {
552
 
    if (not executeStatement())
 
594
    if (! executeStatement())
553
595
      break;
554
596
  }
555
597
 
556
598
  disconnect(0, true);
557
599
}
558
600
 
559
 
bool Session::schedule(Session::shared_ptr &arg)
 
601
bool Session::schedule()
560
602
{
561
 
  arg->scheduler= plugin::Scheduler::getScheduler();
562
 
  assert(arg->scheduler);
563
 
 
564
 
  connection_count.increment();
565
 
 
566
 
  if (connection_count > current_global_counters.max_used_connections)
567
 
  {
568
 
    current_global_counters.max_used_connections= connection_count;
569
 
  }
570
 
 
571
 
  current_global_counters.connections++;
572
 
  arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
573
 
 
574
 
  session::Cache::singleton().insert(arg);
575
 
 
576
 
  if (unlikely(plugin::EventObserver::connectSession(*arg)))
577
 
  {
578
 
    // We should do something about an error...
579
 
  }
580
 
 
581
 
  if (plugin::Scheduler::getScheduler()->addSession(arg))
582
 
  {
583
 
    DRIZZLE_CONNECTION_START(arg->getSessionId());
 
603
  scheduler= plugin::Scheduler::getScheduler();
 
604
  assert(scheduler);
 
605
 
 
606
  ++connection_count;
 
607
 
 
608
  if (connection_count > max_used_connections)
 
609
    max_used_connections= connection_count;
 
610
 
 
611
  thread_id= variables.pseudo_thread_id= global_thread_id++;
 
612
 
 
613
  pthread_mutex_lock(&LOCK_thread_count);
 
614
  session_list.push_back(this);
 
615
  pthread_mutex_unlock(&LOCK_thread_count);
 
616
 
 
617
  if (scheduler->addSession(this))
 
618
  {
 
619
    DRIZZLE_CONNECTION_START(thread_id);
584
620
    char error_message_buff[DRIZZLE_ERRMSG_SIZE];
585
621
 
586
 
    arg->setKilled(Session::KILL_CONNECTION);
 
622
    killed= Session::KILL_CONNECTION;
587
623
 
588
 
    arg->status_var.aborted_connects++;
 
624
    statistic_increment(aborted_connects, &LOCK_status);
589
625
 
590
626
    /* Can't use my_error() since store_globals has not been called. */
591
627
    /* TODO replace will better error message */
592
628
    snprintf(error_message_buff, sizeof(error_message_buff),
593
629
             ER(ER_CANT_CREATE_THREAD), 1);
594
 
    arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
595
 
 
 
630
    client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
596
631
    return true;
597
632
  }
598
633
 
599
634
  return false;
600
635
}
601
636
 
602
 
 
603
 
/*
604
 
  Is this session viewable by the current user?
605
 
*/
606
 
bool Session::isViewable() const
607
 
{
608
 
  return plugin::Authorization::isAuthorized(current_session->getSecurityContext(),
609
 
                                             this,
610
 
                                             false);
611
 
}
612
 
 
613
 
 
614
 
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
615
 
{
616
 
  const char* old_msg = get_proc_info();
617
 
  safe_mutex_assert_owner(mutex);
618
 
  mysys_var->current_mutex = &mutex;
619
 
  mysys_var->current_cond = &cond;
620
 
  this->set_proc_info(msg);
621
 
  return old_msg;
622
 
}
623
 
 
624
 
void Session::exit_cond(const char* old_msg)
625
 
{
626
 
  /*
627
 
    Putting the mutex unlock in exit_cond() ensures that
628
 
    mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
629
 
    locked (if that would not be the case, you'll get a deadlock if someone
630
 
    does a Session::awake() on you).
631
 
  */
632
 
  mysys_var->current_mutex->unlock();
633
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
634
 
  mysys_var->current_mutex = 0;
635
 
  mysys_var->current_cond = 0;
636
 
  this->set_proc_info(old_msg);
637
 
}
638
 
 
639
637
bool Session::authenticate()
640
638
{
641
 
  lex->start(this);
 
639
  lex_start(this);
642
640
  if (client->authenticate())
643
641
    return false;
644
642
 
645
 
  status_var.aborted_connects++;
646
 
 
 
643
  statistic_increment(aborted_connects, &LOCK_status);
647
644
  return true;
648
645
}
649
646
 
650
 
bool Session::checkUser(const std::string &passwd_str,
651
 
                        const std::string &in_db)
 
647
bool Session::checkUser(const char *passwd, uint32_t passwd_len, const char *in_db)
652
648
{
653
 
  bool is_authenticated=
654
 
    plugin::Authentication::isAuthenticated(getSecurityContext(),
655
 
                                            passwd_str);
 
649
  LEX_STRING db_str= { (char *) in_db, in_db ? strlen(in_db) : 0 };
 
650
  bool is_authenticated;
 
651
 
 
652
  if (passwd_len != 0 && passwd_len != SCRAMBLE_LENGTH)
 
653
  {
 
654
    my_error(ER_HANDSHAKE_ERROR, MYF(0), security_ctx.ip.c_str());
 
655
    return false;
 
656
  }
 
657
 
 
658
  is_authenticated= plugin::Authentication::isAuthenticated(this, passwd);
656
659
 
657
660
  if (is_authenticated != true)
658
661
  {
659
 
    status_var.access_denied++;
660
 
    /* isAuthenticated has pushed the error message */
 
662
    my_error(ER_ACCESS_DENIED_ERROR, MYF(0),
 
663
             security_ctx.user.c_str(),
 
664
             security_ctx.ip.c_str(),
 
665
             passwd_len ? ER(ER_YES) : ER(ER_NO));
 
666
 
661
667
    return false;
662
668
  }
663
669
 
 
670
  security_ctx.skip_grants();
 
671
 
664
672
  /* Change database if necessary */
665
 
  if (not in_db.empty())
 
673
  if (in_db && in_db[0])
666
674
  {
667
 
    SchemaIdentifier identifier(in_db);
668
 
    if (mysql_change_db(this, identifier))
 
675
    if (mysql_change_db(this, &db_str, false))
669
676
    {
670
677
      /* mysql_change_db() has pushed the error message. */
671
678
      return false;
672
679
    }
673
680
  }
674
681
  my_ok();
675
 
  password= not passwd_str.empty();
 
682
  password= test(passwd_len);          // remember for error messages
676
683
 
677
684
  /* Ready to handle queries */
678
685
  return true;
694
701
  main_da.reset_diagnostics_area();
695
702
 
696
703
  if (client->readCommand(&l_packet, &packet_length) == false)
697
 
  {
698
 
    return false;
699
 
  }
700
 
 
701
 
  if (getKilled() == KILL_CONNECTION)
702
704
    return false;
703
705
 
704
706
  if (packet_length == 0)
705
707
    return true;
706
708
 
707
 
  l_command= static_cast<enum_server_command>(l_packet[0]);
 
709
  l_command= (enum enum_server_command) (unsigned char) l_packet[0];
708
710
 
709
711
  if (command >= COM_END)
710
712
    command= COM_END;                           // Wrong command
711
713
 
712
714
  assert(packet_length);
713
 
  return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
 
715
  return ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
714
716
}
715
717
 
716
718
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
722
724
    in_packet_length--;
723
725
  }
724
726
  const char *pos= in_packet + in_packet_length; /* Point at end null */
725
 
  while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
 
727
  while (in_packet_length > 0 &&
 
728
         (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
726
729
  {
727
730
    pos--;
728
731
    in_packet_length--;
729
732
  }
730
733
 
731
 
  std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
732
 
  // We can not be entirely sure _schema has a value
733
 
  if (_schema)
734
 
  {
735
 
    plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
736
 
  }
737
 
  query.reset(new_query);
738
 
  _state.reset(new State(in_packet, in_packet_length));
 
734
  /* We must allocate some extra memory for the cached query string */
 
735
  query_length= 0; /* Extra safety: Avoid races */
 
736
  query= (char*) memdup_w_gap((unsigned char*) in_packet, in_packet_length, db.length() + 1);
 
737
  if (! query)
 
738
    return false;
 
739
 
 
740
  query[in_packet_length]=0;
 
741
  query_length= in_packet_length;
739
742
 
740
743
  return true;
741
744
}
744
747
{
745
748
  bool do_release= 0;
746
749
  bool result= true;
747
 
  TransactionServices &transaction_services= TransactionServices::singleton();
748
750
 
749
751
  if (transaction.xid_state.xa_state != XA_NOTR)
750
752
  {
760
762
       * (Which of course should never happen...)
761
763
       */
762
764
      server_status&= ~SERVER_STATUS_IN_TRANS;
763
 
      if (transaction_services.commitTransaction(this, true))
 
765
      if (ha_commit(this))
764
766
        result= false;
765
767
      options&= ~(OPTION_BEGIN);
 
768
      transaction.all.modified_non_trans_table= false;
766
769
      break;
767
770
    case COMMIT_RELEASE:
768
771
      do_release= 1; /* fall through */
777
780
    case ROLLBACK_AND_CHAIN:
778
781
    {
779
782
      server_status&= ~SERVER_STATUS_IN_TRANS;
780
 
      if (transaction_services.rollbackTransaction(this, true))
 
783
      if (ha_rollback(this))
781
784
        result= false;
782
785
      options&= ~(OPTION_BEGIN);
 
786
      transaction.all.modified_non_trans_table= false;
783
787
      if (result == true && (completion == ROLLBACK_AND_CHAIN))
784
788
        result= startTransaction();
785
789
      break;
790
794
  }
791
795
 
792
796
  if (result == false)
793
 
  {
794
797
    my_error(killed_errno(), MYF(0));
795
 
  }
796
798
  else if ((result == true) && do_release)
797
 
  {
798
 
    setKilled(Session::KILL_CONNECTION);
799
 
  }
 
799
    killed= Session::KILL_CONNECTION;
800
800
 
801
801
  return result;
802
802
}
804
804
bool Session::endActiveTransaction()
805
805
{
806
806
  bool result= true;
807
 
  TransactionServices &transaction_services= TransactionServices::singleton();
808
807
 
809
808
  if (transaction.xid_state.xa_state != XA_NOTR)
810
809
  {
814
813
  if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
815
814
  {
816
815
    server_status&= ~SERVER_STATUS_IN_TRANS;
817
 
    if (transaction_services.commitTransaction(this, true))
 
816
    if (ha_commit(this))
818
817
      result= false;
819
818
  }
820
819
  options&= ~(OPTION_BEGIN);
 
820
  transaction.all.modified_non_trans_table= false;
821
821
  return result;
822
822
}
823
823
 
834
834
    options|= OPTION_BEGIN;
835
835
    server_status|= SERVER_STATUS_IN_TRANS;
836
836
 
837
 
    if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
 
837
    if (opt == START_TRANS_OPT_WITH_CONS_SNAPSHOT)
838
838
    {
839
 
      result= false;
 
839
      // TODO make this a loop for all engines, not just this one (Inno only
 
840
      // right now)
 
841
      if (plugin::StorageEngine::startConsistentSnapshot(this))
 
842
      {
 
843
        result= false;
 
844
      }
840
845
    }
841
846
  }
842
847
 
865
870
  free_items();
866
871
  /* Reset where. */
867
872
  where= Session::DEFAULT_WHERE;
868
 
 
869
 
  /* Reset the temporary shares we built */
870
 
  for_each(temporary_shares.begin(),
871
 
           temporary_shares.end(),
872
 
           DeletePtr());
873
 
  temporary_shares.clear();
874
873
}
875
874
 
876
875
/**
884
883
  @return  NULL on failure, or pointer to the LEX_STRING object
885
884
*/
886
885
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
887
 
                                     const std::string &str,
888
 
                                     bool allocate_lex_string)
889
 
{
890
 
  return make_lex_string(lex_str, str.c_str(), str.length(), allocate_lex_string);
891
 
}
892
 
 
893
 
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
894
 
                                     const char* str, uint32_t length,
895
 
                                     bool allocate_lex_string)
 
886
                                 const char* str, uint32_t length,
 
887
                                 bool allocate_lex_string)
896
888
{
897
889
  if (allocate_lex_string)
898
890
    if (!(lex_str= (LEX_STRING *)alloc(sizeof(LEX_STRING))))
899
891
      return 0;
900
 
  if (!(lex_str->str= mem_root->strmake_root(str, length)))
 
892
  if (!(lex_str->str= strmake_root(mem_root, str, length)))
901
893
    return 0;
902
894
  lex_str->length= length;
903
895
  return lex_str;
904
896
}
905
897
 
 
898
/* routings to adding tables to list of changed in transaction tables */
 
899
inline static void list_include(CHANGED_TableList** prev,
 
900
                                CHANGED_TableList* curr,
 
901
                                CHANGED_TableList* new_table)
 
902
{
 
903
  if (new_table)
 
904
  {
 
905
    *prev = new_table;
 
906
    (*prev)->next = curr;
 
907
  }
 
908
}
 
909
 
 
910
/* add table to list of changed in transaction tables */
 
911
 
 
912
void Session::add_changed_table(Table *table)
 
913
{
 
914
  assert((options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
 
915
              table->cursor->has_transactions());
 
916
  add_changed_table(table->s->table_cache_key.str,
 
917
                    (long) table->s->table_cache_key.length);
 
918
}
 
919
 
 
920
 
 
921
void Session::add_changed_table(const char *key, long key_length)
 
922
{
 
923
  CHANGED_TableList **prev_changed = &transaction.changed_tables;
 
924
  CHANGED_TableList *curr = transaction.changed_tables;
 
925
 
 
926
  for (; curr; prev_changed = &(curr->next), curr = curr->next)
 
927
  {
 
928
    int cmp =  (long)curr->key_length - (long)key_length;
 
929
    if (cmp < 0)
 
930
    {
 
931
      list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
932
      return;
 
933
    }
 
934
    else if (cmp == 0)
 
935
    {
 
936
      cmp = memcmp(curr->key, key, curr->key_length);
 
937
      if (cmp < 0)
 
938
      {
 
939
        list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
940
        return;
 
941
      }
 
942
      else if (cmp == 0)
 
943
      {
 
944
        return;
 
945
      }
 
946
    }
 
947
  }
 
948
  *prev_changed = changed_table_dup(key, key_length);
 
949
}
 
950
 
 
951
 
 
952
CHANGED_TableList* Session::changed_table_dup(const char *key, long key_length)
 
953
{
 
954
  CHANGED_TableList* new_table =
 
955
    (CHANGED_TableList*) trans_alloc(ALIGN_SIZE(sizeof(CHANGED_TableList))+
 
956
                                      key_length + 1);
 
957
  if (!new_table)
 
958
  {
 
959
    my_error(EE_OUTOFMEMORY, MYF(ME_BELL),
 
960
             ALIGN_SIZE(sizeof(TableList)) + key_length + 1);
 
961
    killed= KILL_CONNECTION;
 
962
    return 0;
 
963
  }
 
964
 
 
965
  new_table->key= ((char*)new_table)+ ALIGN_SIZE(sizeof(CHANGED_TableList));
 
966
  new_table->next = 0;
 
967
  new_table->key_length = key_length;
 
968
  ::memcpy(new_table->key, key, key_length);
 
969
  return new_table;
 
970
}
 
971
 
 
972
 
906
973
int Session::send_explain_fields(select_result *result)
907
974
{
908
975
  List<Item> field_list;
941
1008
  return (result->send_fields(field_list));
942
1009
}
943
1010
 
944
 
void select_result::send_error(uint32_t errcode, const char *err)
945
 
{
946
 
  my_message(errcode, err, MYF(0));
947
 
}
948
 
 
949
1011
/************************************************************************
950
1012
  Handling writing to file
951
1013
************************************************************************/
955
1017
  my_message(errcode, err, MYF(0));
956
1018
  if (file > 0)
957
1019
  {
958
 
    (void) cache->end_io_cache();
959
 
    (void) internal::my_close(file, MYF(0));
960
 
    (void) internal::my_delete(path.file_string().c_str(), MYF(0));             // Delete file on error
 
1020
    (void) end_io_cache(&cache);
 
1021
    (void) my_close(file, MYF(0));
 
1022
    (void) my_delete(path, MYF(0));             // Delete file on error
961
1023
    file= -1;
962
1024
  }
963
1025
}
965
1027
 
966
1028
bool select_to_file::send_eof()
967
1029
{
968
 
  int error= test(cache->end_io_cache());
969
 
  if (internal::my_close(file, MYF(MY_WME)))
 
1030
  int error= test(end_io_cache(&cache));
 
1031
  if (my_close(file, MYF(MY_WME)))
970
1032
    error= 1;
971
1033
  if (!error)
972
1034
  {
987
1049
  /* In case of error send_eof() may be not called: close the file here. */
988
1050
  if (file >= 0)
989
1051
  {
990
 
    (void) cache->end_io_cache();
991
 
    (void) internal::my_close(file, MYF(0));
 
1052
    (void) end_io_cache(&cache);
 
1053
    (void) my_close(file, MYF(0));
992
1054
    file= -1;
993
1055
  }
994
 
  path= "";
 
1056
  path[0]= '\0';
995
1057
  row_count= 0;
996
1058
}
997
1059
 
998
 
select_to_file::select_to_file(file_exchange *ex)
999
 
  : exchange(ex),
1000
 
    file(-1),
1001
 
    cache(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
1002
 
    row_count(0L)
1003
 
{
1004
 
  path= "";
1005
 
}
1006
1060
 
1007
1061
select_to_file::~select_to_file()
1008
1062
{
1009
 
  cleanup();
 
1063
  if (file >= 0)
 
1064
  {                                     // This only happens in case of error
 
1065
    (void) end_io_cache(&cache);
 
1066
    (void) my_close(file, MYF(0));
 
1067
    file= -1;
 
1068
  }
1010
1069
}
1011
1070
 
1012
1071
/***************************************************************************
1035
1094
*/
1036
1095
 
1037
1096
 
1038
 
static int create_file(Session *session,
1039
 
                       fs::path &target_path,
1040
 
                       file_exchange *exchange,
1041
 
                       internal::IO_CACHE *cache)
 
1097
static File create_file(Session *session, char *path, file_exchange *exchange, IO_CACHE *cache)
1042
1098
{
1043
 
  fs::path to_file(exchange->file_name);
1044
 
  int file;
1045
 
 
1046
 
  if (not to_file.has_root_directory())
 
1099
  File file;
 
1100
  uint32_t option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
 
1101
 
 
1102
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
1103
  option|= MY_REPLACE_DIR;                      // Force use of db directory
 
1104
#endif
 
1105
 
 
1106
  if (!dirname_length(exchange->file_name))
1047
1107
  {
1048
 
    target_path= fs::system_complete(getDataHomeCatalog());
1049
 
    util::string::const_shared_ptr schema(session->schema());
1050
 
    if (schema and not schema->empty())
1051
 
    {
1052
 
      int count_elements= 0;
1053
 
      for (fs::path::iterator iter= to_file.begin();
1054
 
           iter != to_file.end();
1055
 
           ++iter, ++count_elements)
1056
 
      { }
1057
 
 
1058
 
      if (count_elements == 1)
1059
 
      {
1060
 
        target_path /= *schema;
1061
 
      }
1062
 
    }
1063
 
    target_path /= to_file;
 
1108
    strcpy(path, drizzle_real_data_home);
 
1109
    if (! session->db.empty())
 
1110
      strncat(path, session->db.c_str(), FN_REFLEN-strlen(drizzle_real_data_home)-1);
 
1111
    (void) fn_format(path, exchange->file_name, path, "", option);
1064
1112
  }
1065
1113
  else
1066
 
  {
1067
 
    target_path = exchange->file_name;
1068
 
  }
1069
 
 
1070
 
  if (not secure_file_priv.string().empty())
1071
 
  {
1072
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
1073
 
    {
1074
 
      /* Write only allowed to dir or subdir specified by secure_file_priv */
1075
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1076
 
      return -1;
1077
 
    }
1078
 
  }
1079
 
 
1080
 
  if (!access(target_path.file_string().c_str(), F_OK))
 
1114
    (void) fn_format(path, exchange->file_name, drizzle_real_data_home, "", option);
 
1115
 
 
1116
  if (opt_secure_file_priv &&
 
1117
      strncmp(opt_secure_file_priv, path, strlen(opt_secure_file_priv)))
 
1118
  {
 
1119
    /* Write only allowed to dir or subdir specified by secure_file_priv */
 
1120
    my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
1121
    return -1;
 
1122
  }
 
1123
 
 
1124
  if (!access(path, F_OK))
1081
1125
  {
1082
1126
    my_error(ER_FILE_EXISTS_ERROR, MYF(0), exchange->file_name);
1083
1127
    return -1;
1084
1128
  }
1085
1129
  /* Create the file world readable */
1086
 
  if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
 
1130
  if ((file= my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1087
1131
    return file;
 
1132
#ifdef HAVE_FCHMOD
1088
1133
  (void) fchmod(file, 0666);                    // Because of umask()
1089
 
  if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
 
1134
#else
 
1135
  (void) chmod(path, 0666);
 
1136
#endif
 
1137
  if (init_io_cache(cache, file, 0L, WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1090
1138
  {
1091
 
    internal::my_close(file, MYF(0));
1092
 
    internal::my_delete(target_path.file_string().c_str(), MYF(0));  // Delete file on error, it was just created
 
1139
    my_close(file, MYF(0));
 
1140
    my_delete(path, MYF(0));  // Delete file on error, it was just created
1093
1141
    return -1;
1094
1142
  }
1095
1143
  return file;
1103
1151
  bool string_results= false, non_string_results= false;
1104
1152
  unit= u;
1105
1153
  if ((uint32_t) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
1106
 
  {
1107
 
    path= exchange->file_name;
1108
 
  }
 
1154
    strncpy(path,exchange->file_name,FN_REFLEN-1);
1109
1155
 
1110
1156
  /* Check if there is any blobs in data */
1111
1157
  {
1115
1161
    {
1116
1162
      if (item->max_length >= MAX_BLOB_WIDTH)
1117
1163
      {
1118
 
        blob_flag=1;
1119
 
        break;
 
1164
        blob_flag=1;
 
1165
        break;
1120
1166
      }
1121
 
 
1122
1167
      if (item->result_type() == STRING_RESULT)
1123
1168
        string_results= true;
1124
1169
      else
1153
1198
    return 1;
1154
1199
  }
1155
1200
 
1156
 
  if ((file= create_file(session, path, exchange, cache)) < 0)
 
1201
  if ((file= create_file(session, path, exchange, &cache)) < 0)
1157
1202
    return 1;
1158
1203
 
1159
1204
  return 0;
1160
1205
}
1161
1206
 
 
1207
 
 
1208
#define NEED_ESCAPING(x) ((int) (unsigned char) (x) == escape_char    || \
 
1209
                          (enclosed ? (int) (unsigned char) (x) == field_sep_char      \
 
1210
                                    : (int) (unsigned char) (x) == field_term_char) || \
 
1211
                          (int) (unsigned char) (x) == line_sep_char  || \
 
1212
                          !(x))
 
1213
 
1162
1214
bool select_export::send_data(List<Item> &items)
1163
1215
{
1164
1216
  char buff[MAX_FIELD_WIDTH],null_buff[2],space[MAX_FIELD_WIDTH];
1169
1221
  if (unit->offset_limit_cnt)
1170
1222
  {                                             // using limit offset,count
1171
1223
    unit->offset_limit_cnt--;
1172
 
    return false;
 
1224
    return(0);
1173
1225
  }
1174
1226
  row_count++;
1175
1227
  Item *item;
1176
1228
  uint32_t used_length=0,items_left=items.elements;
1177
1229
  List_iterator_fast<Item> li(items);
1178
1230
 
1179
 
  if (my_b_write(cache,(unsigned char*) exchange->line_start->ptr(),
 
1231
  if (my_b_write(&cache,(unsigned char*) exchange->line_start->ptr(),
1180
1232
                 exchange->line_start->length()))
1181
 
    return true;
1182
 
 
 
1233
    goto err;
1183
1234
  while ((item=li++))
1184
1235
  {
1185
1236
    Item_result result_type=item->result_type();
1188
1239
    res=item->str_result(&tmp);
1189
1240
    if (res && enclosed)
1190
1241
    {
1191
 
      if (my_b_write(cache,(unsigned char*) exchange->enclosed->ptr(),
 
1242
      if (my_b_write(&cache,(unsigned char*) exchange->enclosed->ptr(),
1192
1243
                     exchange->enclosed->length()))
1193
 
        return true;
 
1244
        goto err;
1194
1245
    }
1195
1246
    if (!res)
1196
1247
    {                                           // NULL
1200
1251
        {
1201
1252
          null_buff[0]=escape_char;
1202
1253
          null_buff[1]='N';
1203
 
          if (my_b_write(cache,(unsigned char*) null_buff,2))
1204
 
            return true;
 
1254
          if (my_b_write(&cache,(unsigned char*) null_buff,2))
 
1255
            goto err;
1205
1256
        }
1206
 
        else if (my_b_write(cache,(unsigned char*) "NULL",4))
1207
 
          return true;
 
1257
        else if (my_b_write(&cache,(unsigned char*) "NULL",4))
 
1258
          goto err;
1208
1259
      }
1209
1260
      else
1210
1261
      {
1214
1265
    else
1215
1266
    {
1216
1267
      if (fixed_row_size)
1217
 
        used_length= min(res->length(), static_cast<size_t>(item->max_length));
 
1268
        used_length= min(res->length(),item->max_length);
1218
1269
      else
1219
1270
        used_length= res->length();
1220
1271
 
1276
1327
            assert before the loop makes that sure.
1277
1328
          */
1278
1329
 
1279
 
          if ((needs_escaping(*pos, enclosed) ||
 
1330
          if ((NEED_ESCAPING(*pos) ||
1280
1331
               (check_second_byte &&
1281
1332
                my_mbcharlen(character_set_client, (unsigned char) *pos) == 2 &&
1282
1333
                pos + 1 < end &&
1283
 
                needs_escaping(pos[1], enclosed))) &&
 
1334
                NEED_ESCAPING(pos[1]))) &&
1284
1335
              /*
1285
1336
                Don't escape field_term_char by doubling - doubling is only
1286
1337
                valid for ENCLOSED BY characters:
1293
1344
                          is_ambiguous_field_sep) ?
1294
1345
              field_sep_char : escape_char;
1295
1346
            tmp_buff[1]= *pos ? *pos : '0';
1296
 
            if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1297
 
                my_b_write(cache,(unsigned char*) tmp_buff,2))
1298
 
              return true;
 
1347
            if (my_b_write(&cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
 
1348
                my_b_write(&cache,(unsigned char*) tmp_buff,2))
 
1349
              goto err;
1299
1350
            start=pos+1;
1300
1351
          }
1301
1352
        }
1302
 
        if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1303
 
          return true;
 
1353
        if (my_b_write(&cache,(unsigned char*) start,(uint32_t) (pos-start)))
 
1354
          goto err;
1304
1355
      }
1305
 
      else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1306
 
        return true;
 
1356
      else if (my_b_write(&cache,(unsigned char*) res->ptr(),used_length))
 
1357
        goto err;
1307
1358
    }
1308
1359
    if (fixed_row_size)
1309
1360
    {                                           // Fill with space
1318
1369
        uint32_t length=item->max_length-used_length;
1319
1370
        for (; length > sizeof(space) ; length-=sizeof(space))
1320
1371
        {
1321
 
          if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1322
 
            return true;
 
1372
          if (my_b_write(&cache,(unsigned char*) space,sizeof(space)))
 
1373
            goto err;
1323
1374
        }
1324
 
        if (my_b_write(cache,(unsigned char*) space,length))
1325
 
          return true;
 
1375
        if (my_b_write(&cache,(unsigned char*) space,length))
 
1376
          goto err;
1326
1377
      }
1327
1378
    }
1328
1379
    if (res && enclosed)
1329
1380
    {
1330
 
      if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
 
1381
      if (my_b_write(&cache, (unsigned char*) exchange->enclosed->ptr(),
1331
1382
                     exchange->enclosed->length()))
1332
 
        return true;
 
1383
        goto err;
1333
1384
    }
1334
1385
    if (--items_left)
1335
1386
    {
1336
 
      if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
 
1387
      if (my_b_write(&cache, (unsigned char*) exchange->field_term->ptr(),
1337
1388
                     field_term_length))
1338
 
        return true;
 
1389
        goto err;
1339
1390
    }
1340
1391
  }
1341
 
  if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
 
1392
  if (my_b_write(&cache,(unsigned char*) exchange->line_term->ptr(),
1342
1393
                 exchange->line_term->length()))
1343
 
  {
1344
 
    return true;
1345
 
  }
1346
 
 
1347
 
  return false;
 
1394
    goto err;
 
1395
  return(0);
 
1396
err:
 
1397
  return(1);
1348
1398
}
1349
1399
 
1350
1400
 
1357
1407
select_dump::prepare(List<Item> &, Select_Lex_Unit *u)
1358
1408
{
1359
1409
  unit= u;
1360
 
  return (int) ((file= create_file(session, path, exchange, cache)) < 0);
 
1410
  return (int) ((file= create_file(session, path, exchange, &cache)) < 0);
1361
1411
}
1362
1412
 
1363
1413
 
1377
1427
  if (row_count++ > 1)
1378
1428
  {
1379
1429
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
1380
 
    return 1;
 
1430
    goto err;
1381
1431
  }
1382
1432
  while ((item=li++))
1383
1433
  {
1384
1434
    res=item->str_result(&tmp);
1385
1435
    if (!res)                                   // If NULL
1386
1436
    {
1387
 
      if (my_b_write(cache,(unsigned char*) "",1))
1388
 
        return 1;
 
1437
      if (my_b_write(&cache,(unsigned char*) "",1))
 
1438
        goto err;
1389
1439
    }
1390
 
    else if (my_b_write(cache,(unsigned char*) res->ptr(),res->length()))
 
1440
    else if (my_b_write(&cache,(unsigned char*) res->ptr(),res->length()))
1391
1441
    {
1392
 
      my_error(ER_ERROR_ON_WRITE, MYF(0), path.file_string().c_str(), errno);
1393
 
      return 1;
 
1442
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, my_errno);
 
1443
      goto err;
1394
1444
    }
1395
1445
  }
1396
1446
  return(0);
 
1447
err:
 
1448
  return(1);
1397
1449
}
1398
1450
 
1399
1451
 
1451
1503
      switch (val_item->result_type())
1452
1504
      {
1453
1505
      case REAL_RESULT:
1454
 
        op= &select_max_min_finder_subselect::cmp_real;
1455
 
        break;
 
1506
        op= &select_max_min_finder_subselect::cmp_real;
 
1507
        break;
1456
1508
      case INT_RESULT:
1457
 
        op= &select_max_min_finder_subselect::cmp_int;
1458
 
        break;
 
1509
        op= &select_max_min_finder_subselect::cmp_int;
 
1510
        break;
1459
1511
      case STRING_RESULT:
1460
 
        op= &select_max_min_finder_subselect::cmp_str;
1461
 
        break;
 
1512
        op= &select_max_min_finder_subselect::cmp_str;
 
1513
        break;
1462
1514
      case DECIMAL_RESULT:
1463
1515
        op= &select_max_min_finder_subselect::cmp_decimal;
1464
1516
        break;
1465
1517
      case ROW_RESULT:
1466
1518
        // This case should never be choosen
1467
 
        assert(0);
1468
 
        op= 0;
 
1519
        assert(0);
 
1520
        op= 0;
1469
1521
      }
1470
1522
    }
1471
1523
    cache->store(val_item);
1554
1606
void Session::end_statement()
1555
1607
{
1556
1608
  /* Cleanup SQL processing state to reuse this statement in next query. */
1557
 
  lex->end();
1558
 
  query_cache_key= ""; // reset the cache key
1559
 
  resetResultsetMessage();
 
1609
  lex_end(lex);
1560
1610
}
1561
1611
 
1562
1612
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
1563
1613
{
1564
 
  assert(_schema);
1565
 
  if (_schema and _schema->empty())
1566
 
  {
1567
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1568
 
    return true;
1569
 
  }
1570
 
  else if (not _schema)
1571
 
  {
1572
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1573
 
    return true;
1574
 
  }
1575
 
  assert(_schema);
1576
 
 
1577
 
  *p_db= strmake(_schema->c_str(), _schema->size());
1578
 
  *p_db_length= _schema->size();
1579
 
 
 
1614
  if (db.empty())
 
1615
  {
 
1616
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
 
1617
    return true;
 
1618
  }
 
1619
  *p_db= strmake(db.c_str(), db.length());
 
1620
  *p_db_length= db.length();
1580
1621
  return false;
1581
1622
}
1582
1623
 
1591
1632
  quick_group= 1;
1592
1633
  table_charset= 0;
1593
1634
  precomputed_group_by= 0;
 
1635
  bit_fields_as_long= 0;
1594
1636
}
1595
1637
 
1596
1638
void Tmp_Table_Param::cleanup(void)
1615
1657
  memset(&status_var, 0, sizeof(status_var));
1616
1658
}
1617
1659
 
1618
 
 
1619
 
void Session::set_db(const std::string &new_db)
 
1660
void Security_context::skip_grants()
 
1661
{
 
1662
  /* privileges for the user are unknown everything is allowed */
 
1663
}
 
1664
 
 
1665
 
 
1666
/****************************************************************************
 
1667
  Handling of open and locked tables states.
 
1668
 
 
1669
  This is used when we want to open/lock (and then close) some tables when
 
1670
  we already have a set of tables open and locked. We use these methods for
 
1671
  access to mysql.proc table to find definitions of stored routines.
 
1672
****************************************************************************/
 
1673
 
 
1674
void Session::reset_n_backup_open_tables_state(Open_tables_state *backup)
 
1675
{
 
1676
  backup->set_open_tables_state(this);
 
1677
  reset_open_tables_state();
 
1678
  backups_available= false;
 
1679
}
 
1680
 
 
1681
 
 
1682
void Session::restore_backup_open_tables_state(Open_tables_state *backup)
 
1683
{
 
1684
  /*
 
1685
    Before we will throw away current open tables state we want
 
1686
    to be sure that it was properly cleaned up.
 
1687
  */
 
1688
  assert(open_tables == 0 && temporary_tables == 0 &&
 
1689
              derived_tables == 0 &&
 
1690
              lock == 0);
 
1691
  set_open_tables_state(backup);
 
1692
}
 
1693
 
 
1694
 
 
1695
bool Session::set_db(const char *new_db, size_t length)
1620
1696
{
1621
1697
  /* Do not reallocate memory if current chunk is big enough. */
1622
 
  if (new_db.length())
1623
 
  {
1624
 
    _schema.reset(new std::string(new_db));
1625
 
  }
 
1698
  if (length)
 
1699
    db= new_db;
1626
1700
  else
1627
 
  {
1628
 
    _schema.reset(new std::string(""));
1629
 
  }
1630
 
}
1631
 
 
 
1701
    db.clear();
 
1702
 
 
1703
  return false;
 
1704
}
 
1705
 
 
1706
 
 
1707
/**
 
1708
  Check the killed state of a user thread
 
1709
  @param session  user thread
 
1710
  @retval 0 the user thread is active
 
1711
  @retval 1 the user thread has been killed
 
1712
*/
 
1713
extern "C" int session_killed(const Session *session)
 
1714
{
 
1715
  return(session->killed);
 
1716
}
 
1717
 
 
1718
/**
 
1719
  Return the session id of a user session
 
1720
  @param pointer to Session object
 
1721
  @return session's id
 
1722
*/
 
1723
extern "C" unsigned long session_get_thread_id(const Session *session)
 
1724
{
 
1725
  return (unsigned long) session->getSessionId();
 
1726
}
 
1727
 
 
1728
 
 
1729
extern "C"
 
1730
LEX_STRING *session_make_lex_string(Session *session, LEX_STRING *lex_str,
 
1731
                                const char *str, unsigned int size,
 
1732
                                int allocate_lex_string)
 
1733
{
 
1734
  return session->make_lex_string(lex_str, str, size,
 
1735
                              (bool) allocate_lex_string);
 
1736
}
 
1737
 
 
1738
const struct charset_info_st *session_charset(Session *session)
 
1739
{
 
1740
  return(session->charset());
 
1741
}
 
1742
 
 
1743
char **session_query(Session *session)
 
1744
{
 
1745
  return(&session->query);
 
1746
}
 
1747
 
 
1748
int session_non_transactional_update(const Session *session)
 
1749
{
 
1750
  return(session->transaction.all.modified_non_trans_table);
 
1751
}
 
1752
 
 
1753
void session_mark_transaction_to_rollback(Session *session, bool all)
 
1754
{
 
1755
  mark_transaction_to_rollback(session, all);
 
1756
}
1632
1757
 
1633
1758
/**
1634
1759
  Mark transaction to rollback and mark error as fatal to a sub-statement.
1651
1776
  plugin_sessionvar_cleanup(this);
1652
1777
 
1653
1778
  /* If necessary, log any aborted or unauthorized connections */
1654
 
  if (getKilled() || client->wasAborted())
1655
 
  {
1656
 
    status_var.aborted_threads++;
1657
 
  }
 
1779
  if (killed || client->wasAborted())
 
1780
    statistic_increment(aborted_threads, &LOCK_status);
1658
1781
 
1659
1782
  if (client->wasAborted())
1660
1783
  {
1661
 
    if (not getKilled() && variables.log_warnings > 1)
 
1784
    if (! killed && variables.log_warnings > 1)
1662
1785
    {
1663
 
      SecurityContext *sctx= &security_ctx;
 
1786
      Security_context *sctx= &security_ctx;
1664
1787
 
1665
1788
      errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1666
1789
                  , thread_id
1667
 
                  , (_schema->empty() ? "unconnected" : _schema->c_str())
1668
 
                  , sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1669
 
                  , sctx->getIp().c_str()
 
1790
                  , (db.empty() ? "unconnected" : db.c_str())
 
1791
                  , sctx->user.empty() == false ? sctx->user.c_str() : "unauthenticated"
 
1792
                  , sctx->ip.c_str()
1670
1793
                  , (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1671
1794
    }
1672
1795
  }
1673
1796
 
1674
1797
  /* Close out our connection to the client */
1675
1798
  if (should_lock)
1676
 
    session::Cache::singleton().mutex().lock();
1677
 
 
1678
 
  setKilled(Session::KILL_CONNECTION);
1679
 
 
 
1799
    (void) pthread_mutex_lock(&LOCK_thread_count);
 
1800
  killed= Session::KILL_CONNECTION;
1680
1801
  if (client->isConnected())
1681
1802
  {
1682
1803
    if (errcode)
1686
1807
    }
1687
1808
    client->close();
1688
1809
  }
1689
 
 
1690
1810
  if (should_lock)
1691
 
  {
1692
 
    session::Cache::singleton().mutex().unlock();
1693
 
  }
 
1811
    (void) pthread_mutex_unlock(&LOCK_thread_count);
1694
1812
}
1695
1813
 
1696
1814
void Session::reset_for_next_command()
1707
1825
  server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1708
1826
                          SERVER_QUERY_NO_INDEX_USED |
1709
1827
                          SERVER_QUERY_NO_GOOD_INDEX_USED);
 
1828
  /*
 
1829
    If in autocommit mode and not in a transaction, reset
 
1830
    OPTION_STATUS_NO_TRANS_UPDATE to not get warnings
 
1831
    in ha_rollback_trans() about some tables couldn't be rolled back.
 
1832
  */
 
1833
  if (!(options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
 
1834
  {
 
1835
    transaction.all.modified_non_trans_table= false;
 
1836
  }
1710
1837
 
1711
1838
  clear_error();
1712
1839
  main_da.reset_diagnostics_area();
1718
1845
  Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1719
1846
*/
1720
1847
 
1721
 
void Open_tables_state::close_temporary_tables()
 
1848
void Session::close_temporary_tables()
1722
1849
{
1723
1850
  Table *table;
1724
1851
  Table *tmp_next;
1725
1852
 
1726
 
  if (not temporary_tables)
 
1853
  if (!temporary_tables)
1727
1854
    return;
1728
1855
 
1729
1856
  for (table= temporary_tables; table; table= tmp_next)
1730
1857
  {
1731
 
    tmp_next= table->getNext();
1732
 
    nukeTable(table);
 
1858
    tmp_next= table->next;
 
1859
    close_temporary(table);
1733
1860
  }
1734
1861
  temporary_tables= NULL;
1735
1862
}
1738
1865
  unlink from session->temporary tables and close temporary table
1739
1866
*/
1740
1867
 
1741
 
void Open_tables_state::close_temporary_table(Table *table)
 
1868
void Session::close_temporary_table(Table *table)
 
1869
                         
1742
1870
{
1743
 
  if (table->getPrev())
 
1871
  if (table->prev)
1744
1872
  {
1745
 
    table->getPrev()->setNext(table->getNext());
1746
 
    if (table->getPrev()->getNext())
1747
 
    {
1748
 
      table->getNext()->setPrev(table->getPrev());
1749
 
    }
 
1873
    table->prev->next= table->next;
 
1874
    if (table->prev->next)
 
1875
      table->next->prev= table->prev;
1750
1876
  }
1751
1877
  else
1752
1878
  {
1757
1883
      passing non-zero value to end_slave via rli->save_temporary_tables
1758
1884
      when no temp tables opened, see an invariant below.
1759
1885
    */
1760
 
    temporary_tables= table->getNext();
 
1886
    temporary_tables= table->next;
1761
1887
    if (temporary_tables)
1762
 
    {
1763
 
      table->getNext()->setPrev(NULL);
1764
 
    }
 
1888
      table->next->prev= NULL;
1765
1889
  }
1766
 
  nukeTable(table);
 
1890
  close_temporary(table);
1767
1891
}
1768
1892
 
1769
1893
/*
1770
 
  Close and drop a temporary table
 
1894
  Close and delete a temporary table
1771
1895
 
1772
1896
  NOTE
1773
1897
  This dosn't unlink table from session->temporary
1774
1898
  If this is needed, use close_temporary_table()
1775
1899
*/
1776
1900
 
1777
 
void Open_tables_state::nukeTable(Table *table)
 
1901
void Session::close_temporary(Table *table)
1778
1902
{
1779
 
  plugin::StorageEngine *table_type= table->getShare()->db_type();
 
1903
  plugin::StorageEngine *table_type= table->s->db_type();
1780
1904
 
1781
1905
  table->free_io_cache();
1782
 
  table->delete_table();
1783
 
 
1784
 
  TableIdentifier identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1785
 
  rm_temporary_table(table_type, identifier);
1786
 
 
1787
 
  delete table->getMutableShare();
1788
 
 
 
1906
  table->closefrm(false);
 
1907
 
 
1908
  rm_temporary_table(table_type, table->s->path.str);
 
1909
 
 
1910
  table->s->free_table_share();
1789
1911
  /* This makes me sad, but we're allocating it via malloc */
1790
 
  delete table;
 
1912
  free(table);
1791
1913
}
1792
1914
 
1793
1915
/** Clear most status variables. */
1794
1916
extern time_t flush_status_time;
 
1917
extern uint32_t max_used_connections;
1795
1918
 
1796
1919
void Session::refresh_status()
1797
1920
{
 
1921
  pthread_mutex_lock(&LOCK_status);
 
1922
 
 
1923
  /* Add thread's status variabes to global status */
 
1924
  add_to_status(&global_status_var, &status_var);
 
1925
 
1798
1926
  /* Reset thread's status variables */
1799
1927
  memset(&status_var, 0, sizeof(status_var));
1800
1928
 
 
1929
  /* Reset some global variables */
 
1930
  reset_status_vars();
 
1931
 
 
1932
  /* Reset the counters of all key caches (default and named). */
 
1933
  reset_key_cache_counters();
1801
1934
  flush_status_time= time((time_t*) 0);
1802
 
  current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1803
 
  current_global_counters.connections= 0;
 
1935
  max_used_connections= 1; /* We set it to one, because we know we exist */
 
1936
  pthread_mutex_unlock(&LOCK_status);
1804
1937
}
1805
1938
 
1806
1939
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1807
1940
{
1808
 
  return getVariable(std::string(name.str, name.length), create_if_not_exists);
1809
 
}
1810
 
 
1811
 
user_var_entry *Session::getVariable(const std::string  &name, bool create_if_not_exists)
1812
 
{
1813
 
  UserVarsRange ppp= user_vars.equal_range(name);
1814
 
 
1815
 
  for (UserVars::iterator iter= ppp.first;
1816
 
       iter != ppp.second; ++iter)
1817
 
  {
1818
 
    return (*iter).second;
1819
 
  }
1820
 
 
1821
 
  if (not create_if_not_exists)
1822
 
    return NULL;
1823
 
 
1824
1941
  user_var_entry *entry= NULL;
1825
 
  entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1826
 
 
1827
 
  if (entry == NULL)
1828
 
    return NULL;
1829
 
 
1830
 
  std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1831
 
 
1832
 
  if (not returnable.second)
 
1942
 
 
1943
  entry= (user_var_entry*) hash_search(&user_vars, (unsigned char*) name.str, name.length);
 
1944
 
 
1945
  if ((entry == NULL) && create_if_not_exists)
1833
1946
  {
1834
 
    delete entry;
 
1947
    if (!hash_inited(&user_vars))
 
1948
      return NULL;
 
1949
    entry= new (nothrow) user_var_entry(name.str, query_id);
 
1950
 
 
1951
    if (entry == NULL)
 
1952
      return NULL;
 
1953
 
 
1954
    if (my_hash_insert(&user_vars, (unsigned char*) entry))
 
1955
    {
 
1956
      assert(1);
 
1957
      free((char*) entry);
 
1958
      return 0;
 
1959
    }
 
1960
 
1835
1961
  }
1836
1962
 
1837
1963
  return entry;
1838
1964
}
1839
1965
 
1840
 
void Session::setVariable(const std::string &name, const std::string &value)
1841
 
{
1842
 
  user_var_entry *updateable_var= getVariable(name.c_str(), true);
1843
 
 
1844
 
  updateable_var->update_hash(false,
1845
 
                              (void*)value.c_str(),
1846
 
                              static_cast<uint32_t>(value.length()), STRING_RESULT,
1847
 
                              &my_charset_bin,
1848
 
                              DERIVATION_IMPLICIT, false);
1849
 
}
1850
 
 
1851
 
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1852
 
{
1853
 
  for (Table *table= temporary_tables ; table ; table= table->getNext())
 
1966
void Session::mark_temp_tables_as_free_for_reuse()
 
1967
{
 
1968
  for (Table *table= temporary_tables ; table ; table= table->next)
1854
1969
  {
1855
 
    if (table->query_id == getQueryId())
 
1970
    if (table->query_id == query_id)
1856
1971
    {
1857
1972
      table->query_id= 0;
1858
1973
      table->cursor->ha_reset();
1862
1977
 
1863
1978
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1864
1979
{
1865
 
  for (; table ; table= table->getNext())
 
1980
  for (; table ; table= table->next)
1866
1981
  {
1867
 
    if (table->query_id == getQueryId())
 
1982
    if (table->query_id == query_id)
1868
1983
    {
1869
1984
      table->query_id= 0;
1870
1985
      table->cursor->ha_reset();
1883
1998
*/
1884
1999
void Session::close_thread_tables()
1885
2000
{
1886
 
  clearDerivedTables();
 
2001
  Table *table;
 
2002
 
 
2003
  /*
 
2004
    We are assuming here that session->derived_tables contains ONLY derived
 
2005
    tables for this substatement. i.e. instead of approach which uses
 
2006
    query_id matching for determining which of the derived tables belong
 
2007
    to this substatement we rely on the ability of substatements to
 
2008
    save/restore session->derived_tables during their execution.
 
2009
 
 
2010
    TODO: Probably even better approach is to simply associate list of
 
2011
          derived tables with (sub-)statement instead of thread and destroy
 
2012
          them at the end of its execution.
 
2013
  */
 
2014
  if (derived_tables)
 
2015
  {
 
2016
    Table *next;
 
2017
    /*
 
2018
      Close all derived tables generated in queries like
 
2019
      SELECT * FROM (SELECT * FROM t1)
 
2020
    */
 
2021
    for (table= derived_tables ; table ; table= next)
 
2022
    {
 
2023
      next= table->next;
 
2024
      table->free_tmp_table(this);
 
2025
    }
 
2026
    derived_tables= 0;
 
2027
  }
1887
2028
 
1888
2029
  /*
1889
2030
    Mark all temporary tables used by this statement as free for reuse.
1897
2038
    does not belong to statement for which we do close_thread_tables()).
1898
2039
    TODO: This should be fixed in later releases.
1899
2040
   */
 
2041
  if (backups_available == false)
1900
2042
  {
1901
 
    TransactionServices &transaction_services= TransactionServices::singleton();
1902
2043
    main_da.can_overwrite_status= true;
1903
 
    transaction_services.autocommitOrRollback(this, is_error());
 
2044
    ha_autocommit_or_rollback(this, is_error());
1904
2045
    main_da.can_overwrite_status= false;
1905
2046
    transaction.stmt.reset();
1906
2047
  }
1916
2057
      handled either before writing a query log event (inside
1917
2058
      binlog_query()) or when preparing a pending event.
1918
2059
     */
1919
 
    unlockTables(lock);
 
2060
    mysql_unlock_tables(this, lock);
1920
2061
    lock= 0;
1921
2062
  }
1922
2063
  /*
1923
 
    Note that we need to hold table::Cache::singleton().mutex() while changing the
 
2064
    Note that we need to hold LOCK_open while changing the
1924
2065
    open_tables list. Another thread may work on it.
1925
 
    (See: table::Cache::singleton().removeTable(), mysql_wait_completed_table())
 
2066
    (See: remove_table_from_cache(), mysql_wait_completed_table())
1926
2067
    Closing a MERGE child before the parent would be fatal if the
1927
2068
    other thread tries to abort the MERGE lock in between.
1928
2069
  */
1954
2095
    if (open_tables_from_list(&tables, &counter))
1955
2096
      return true;
1956
2097
 
1957
 
    if (not lock_tables(tables, counter, &need_reopen))
 
2098
    if (!lock_tables(tables, counter, &need_reopen))
1958
2099
      break;
1959
 
    if (not need_reopen)
 
2100
    if (!need_reopen)
1960
2101
      return true;
1961
2102
    close_tables_for_reopen(&tables);
1962
2103
  }
1963
2104
  if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
1964
 
       (
 
2105
       (fill_derived_tables() &&
1965
2106
        mysql_handle_derived(lex, &mysql_derived_filling))))
1966
2107
    return true;
1967
2108
 
1968
2109
  return false;
1969
2110
}
1970
2111
 
1971
 
/*
1972
 
  @note "best_effort" is used in cases were if a failure occurred on this
1973
 
  operation it would not be surprising because we are only removing because there
1974
 
  might be an issue (lame engines).
1975
 
*/
1976
 
 
1977
 
bool Open_tables_state::rm_temporary_table(const TableIdentifier &identifier, bool best_effort)
1978
 
{
1979
 
  if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
1980
 
  {
1981
 
    if (not best_effort)
1982
 
    {
1983
 
      std::string path;
1984
 
      identifier.getSQLPath(path);
1985
 
      errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1986
 
                    path.c_str(), errno);
1987
 
    }
1988
 
 
1989
 
    return true;
1990
 
  }
1991
 
 
1992
 
  return false;
1993
 
}
1994
 
 
1995
 
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const TableIdentifier &identifier)
1996
 
{
1997
 
  assert(base);
1998
 
 
1999
 
  if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier))
2000
 
  {
2001
 
    std::string path;
2002
 
    identifier.getSQLPath(path);
2003
 
    errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
2004
 
                  path.c_str(), errno);
2005
 
 
2006
 
    return true;
2007
 
  }
2008
 
 
2009
 
  return false;
2010
 
}
2011
 
 
2012
 
/**
2013
 
  @note this will be removed, I am looking through Hudson to see if it is finding
2014
 
  any tables that are missed during cleanup.
2015
 
*/
2016
 
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
2017
 
{
2018
 
  Table *table;
2019
 
 
2020
 
  if (not temporary_tables)
2021
 
    return;
2022
 
 
2023
 
  cerr << "Begin Run: " << foo << "\n";
2024
 
  for (table= temporary_tables; table; table= table->getNext())
2025
 
  {
2026
 
    bool have_proto= false;
2027
 
 
2028
 
    message::Table *proto= table->getShare()->getTableProto();
2029
 
    if (table->getShare()->getTableProto())
2030
 
      have_proto= true;
2031
 
 
2032
 
    const char *answer= have_proto ? "true" : "false";
2033
 
 
2034
 
    if (have_proto)
2035
 
    {
2036
 
      cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2037
 
      cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2038
 
    }
2039
 
    else
2040
 
    {
2041
 
      cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2042
 
    }
2043
 
  }
2044
 
}
2045
 
 
2046
 
bool Session::TableMessages::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2047
 
{
2048
 
  table_message_cache.insert(make_pair(identifier.getPath(), table_message));
2049
 
 
2050
 
  return true;
2051
 
}
2052
 
 
2053
 
bool Session::TableMessages::removeTableMessage(const TableIdentifier &identifier)
2054
 
{
2055
 
  TableMessageCache::iterator iter;
2056
 
 
2057
 
  iter= table_message_cache.find(identifier.getPath());
2058
 
 
2059
 
  if (iter == table_message_cache.end())
2060
 
    return false;
2061
 
 
2062
 
  table_message_cache.erase(iter);
2063
 
 
2064
 
  return true;
2065
 
}
2066
 
 
2067
 
bool Session::TableMessages::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2068
 
{
2069
 
  TableMessageCache::iterator iter;
2070
 
 
2071
 
  iter= table_message_cache.find(identifier.getPath());
2072
 
 
2073
 
  if (iter == table_message_cache.end())
2074
 
    return false;
2075
 
 
2076
 
  table_message.CopyFrom(((*iter).second));
2077
 
 
2078
 
  return true;
2079
 
}
2080
 
 
2081
 
bool Session::TableMessages::doesTableMessageExist(const TableIdentifier &identifier)
2082
 
{
2083
 
  TableMessageCache::iterator iter;
2084
 
 
2085
 
  iter= table_message_cache.find(identifier.getPath());
2086
 
 
2087
 
  if (iter == table_message_cache.end())
2088
 
  {
2089
 
    return false;
2090
 
  }
2091
 
 
2092
 
  return true;
2093
 
}
2094
 
 
2095
 
bool Session::TableMessages::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
2096
 
{
2097
 
  TableMessageCache::iterator iter;
2098
 
 
2099
 
  table_message_cache[to.getPath()]= table_message_cache[from.getPath()];
2100
 
 
2101
 
  iter= table_message_cache.find(to.getPath());
2102
 
 
2103
 
  if (iter == table_message_cache.end())
2104
 
  {
2105
 
    return false;
2106
 
  }
2107
 
 
2108
 
  (*iter).second.set_schema(to.getSchemaName());
2109
 
  (*iter).second.set_name(to.getTableName());
2110
 
 
2111
 
  return true;
2112
 
}
2113
 
 
2114
 
table::Instance *Session::getInstanceTable()
2115
 
{
2116
 
  temporary_shares.push_back(new table::Instance()); // This will not go into the tableshare cache, so no key is used.
2117
 
 
2118
 
  table::Instance *tmp_share= temporary_shares.back();
2119
 
 
2120
 
  assert(tmp_share);
2121
 
 
2122
 
  return tmp_share;
2123
 
}
2124
 
 
2125
 
 
2126
 
/**
2127
 
  Create a reduced Table object with properly set up Field list from a
2128
 
  list of field definitions.
2129
 
 
2130
 
    The created table doesn't have a table Cursor associated with
2131
 
    it, has no keys, no group/distinct, no copy_funcs array.
2132
 
    The sole purpose of this Table object is to use the power of Field
2133
 
    class to read/write data to/from table->getInsertRecord(). Then one can store
2134
 
    the record in any container (RB tree, hash, etc).
2135
 
    The table is created in Session mem_root, so are the table's fields.
2136
 
    Consequently, if you don't BLOB fields, you don't need to free it.
2137
 
 
2138
 
  @param session         connection handle
2139
 
  @param field_list  list of column definitions
2140
 
 
2141
 
  @return
2142
 
    0 if out of memory, Table object in case of success
2143
 
*/
2144
 
table::Instance *Session::getInstanceTable(List<CreateField> &field_list)
2145
 
{
2146
 
  temporary_shares.push_back(new table::Instance(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2147
 
 
2148
 
  table::Instance *tmp_share= temporary_shares.back();
2149
 
 
2150
 
  assert(tmp_share);
2151
 
 
2152
 
  return tmp_share;
2153
 
}
2154
 
 
2155
 
namespace display  {
2156
 
 
2157
 
static const std::string NONE= "NONE";
2158
 
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2159
 
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2160
 
 
2161
 
const std::string &type(drizzled::Session::global_read_lock_t type)
2162
 
{
2163
 
  switch (type) {
2164
 
    default:
2165
 
    case Session::NONE:
2166
 
      return NONE;
2167
 
    case Session::GOT_GLOBAL_READ_LOCK:
2168
 
      return GOT_GLOBAL_READ_LOCK;
2169
 
    case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2170
 
      return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2171
 
  }
2172
 
}
2173
 
 
2174
 
size_t max_string_length(drizzled::Session::global_read_lock_t)
2175
 
{
2176
 
  return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2177
 
}
2178
 
 
2179
 
} /* namespace display */
2180
 
 
2181
 
} /* namespace drizzled */
 
2112
bool Session::openTables(TableList *tables, uint32_t flags)
 
2113
{
 
2114
  uint32_t counter;
 
2115
  bool ret= fill_derived_tables();
 
2116
  assert(ret == false);
 
2117
  if (open_tables_from_list(&tables, &counter, flags) ||
 
2118
      mysql_handle_derived(lex, &mysql_derived_prepare))
 
2119
    return true;
 
2120
  return false;
 
2121
}
 
2122
 
 
2123
bool Session::rm_temporary_table(plugin::StorageEngine *base, TableIdentifier &identifier)
 
2124
{
 
2125
  bool error= false;
 
2126
 
 
2127
  assert(base);
 
2128
 
 
2129
  if (delete_table_proto_file(identifier.getPath()))
 
2130
    error= true;
 
2131
 
 
2132
  if (base->doDropTable(*this, identifier.getPath()))
 
2133
  {
 
2134
    error= true;
 
2135
    errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
 
2136
                  identifier.getPath(), my_errno);
 
2137
  }
 
2138
  return error;
 
2139
}
 
2140
 
 
2141
bool Session::rm_temporary_table(plugin::StorageEngine *base, const char *path)
 
2142
{
 
2143
  bool error= false;
 
2144
 
 
2145
  assert(base);
 
2146
 
 
2147
  if (delete_table_proto_file(path))
 
2148
    error= true;
 
2149
 
 
2150
  if (base->doDropTable(*this, path))
 
2151
  {
 
2152
    error= true;
 
2153
    errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
 
2154
                  path, my_errno);
 
2155
  }
 
2156
  return error;
 
2157
}
 
2158
 
 
2159