~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/session.cc

  • Committer: Brian Aker
  • Date: 2010-05-17 23:12:39 UTC
  • Revision ID: brian@gaz-20100517231239-kgcnn613z0gga7ie
Code shuffle on ReadRecord

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
 */
23
23
 
24
24
#include "config.h"
25
 
#include "drizzled/session.h"
26
 
#include "drizzled/session/cache.h"
 
25
#include <drizzled/session.h>
 
26
#include "drizzled/session_list.h"
27
27
#include <sys/stat.h>
28
 
#include "drizzled/error.h"
29
 
#include "drizzled/gettext.h"
30
 
#include "drizzled/query_id.h"
31
 
#include "drizzled/data_home.h"
32
 
#include "drizzled/sql_base.h"
33
 
#include "drizzled/lock.h"
34
 
#include "drizzled/item/cache.h"
35
 
#include "drizzled/item/float.h"
36
 
#include "drizzled/item/return_int.h"
37
 
#include "drizzled/item/empty_string.h"
38
 
#include "drizzled/show.h"
39
 
#include "drizzled/plugin/client.h"
 
28
#include <drizzled/error.h>
 
29
#include <drizzled/gettext.h>
 
30
#include <drizzled/query_id.h>
 
31
#include <drizzled/data_home.h>
 
32
#include <drizzled/sql_base.h>
 
33
#include <drizzled/lock.h>
 
34
#include <drizzled/item/cache.h>
 
35
#include <drizzled/item/float.h>
 
36
#include <drizzled/item/return_int.h>
 
37
#include <drizzled/item/empty_string.h>
 
38
#include <drizzled/show.h>
 
39
#include <drizzled/plugin/client.h>
40
40
#include "drizzled/plugin/scheduler.h"
41
41
#include "drizzled/plugin/authentication.h"
42
42
#include "drizzled/plugin/logging.h"
43
43
#include "drizzled/plugin/transactional_storage_engine.h"
44
 
#include "drizzled/plugin/query_rewrite.h"
45
44
#include "drizzled/probes.h"
46
45
#include "drizzled/table_proto.h"
47
46
#include "drizzled/db.h"
49
48
#include "drizzled/transaction_services.h"
50
49
#include "drizzled/drizzled.h"
51
50
 
52
 
#include "drizzled/table/instance.h"
53
 
 
54
51
#include "plugin/myisam/myisam.h"
55
52
#include "drizzled/internal/iocache.h"
56
 
#include "drizzled/internal/thread_var.h"
57
 
#include "drizzled/plugin/event_observer.h"
58
 
 
59
 
#include "drizzled/util/functors.h"
60
 
 
61
 
#include "drizzled/display.h"
62
53
 
63
54
#include <fcntl.h>
64
55
#include <algorithm>
65
56
#include <climits>
66
 
#include <boost/filesystem.hpp>
67
 
 
68
 
#include "drizzled/util/backtrace.h"
69
57
 
70
58
using namespace std;
71
 
 
72
 
namespace fs=boost::filesystem;
73
59
namespace drizzled
74
60
{
75
61
 
81
67
char empty_c_string[1]= {0};    /* used for not defined db */
82
68
 
83
69
const char * const Session::DEFAULT_WHERE= "field list";
 
70
extern pthread_key_t THR_Session;
 
71
extern pthread_key_t THR_Mem_root;
 
72
 
 
73
 
 
74
/****************************************************************************
 
75
** User variables
 
76
****************************************************************************/
 
77
static unsigned char *get_var_key(user_var_entry *entry, size_t *length, bool)
 
78
{
 
79
  *length= entry->name.length;
 
80
  return (unsigned char*) entry->name.str;
 
81
}
 
82
 
 
83
static void free_user_var(user_var_entry *entry)
 
84
{
 
85
  delete entry;
 
86
}
84
87
 
85
88
bool Key_part_spec::operator==(const Key_part_spec& other) const
86
89
{
87
90
  return length == other.length &&
88
91
         field_name.length == other.field_name.length &&
89
 
    !my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
 
92
         !strcmp(field_name.str, other.field_name.str);
90
93
}
91
94
 
92
 
Open_tables_state::Open_tables_state(uint64_t version_arg) :
93
 
  version(version_arg)
 
95
Open_tables_state::Open_tables_state(uint64_t version_arg)
 
96
  :version(version_arg), backups_available(false)
94
97
{
95
 
  open_tables= temporary_tables= derived_tables= NULL;
96
 
  extra_lock= lock= NULL;
 
98
  reset_open_tables_state();
97
99
}
98
100
 
99
101
/*
102
104
int mysql_tmpfile(const char *prefix)
103
105
{
104
106
  char filename[FN_REFLEN];
105
 
  int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
 
107
  int fd = internal::create_temp_file(filename, drizzle_tmpdir, prefix, MYF(MY_WME));
106
108
  if (fd >= 0) {
107
109
    unlink(filename);
108
110
  }
154
156
  return (int) session->lex->sql_command;
155
157
}
156
158
 
157
 
enum_tx_isolation session_tx_isolation(const Session *session)
 
159
int session_tx_isolation(const Session *session)
158
160
{
159
 
  return (enum_tx_isolation)session->variables.tx_isolation;
 
161
  return (int) session->variables.tx_isolation;
160
162
}
161
163
 
162
 
Session::Session(plugin::Client *client_arg) :
 
164
Session::Session(plugin::Client *client_arg)
 
165
  :
163
166
  Open_tables_state(refresh_version),
164
167
  mem_root(&main_mem_root),
165
 
  xa_id(0),
166
168
  lex(&main_lex),
167
 
  query(new std::string),
168
 
  _schema(new std::string("")),
169
 
  catalog("LOCAL"),
 
169
  query(),
170
170
  client(client_arg),
171
171
  scheduler(NULL),
172
172
  scheduler_arg(NULL),
173
173
  lock_id(&main_lock_id),
174
174
  user_time(0),
175
175
  ha_data(plugin::num_trx_monitored_objects),
176
 
  concurrent_execute_allowed(true),
177
176
  arg_of_last_insert_id_function(false),
178
177
  first_successful_insert_id_in_prev_stmt(0),
179
178
  first_successful_insert_id_in_cur_stmt(0),
180
179
  limit_found_rows(0),
181
 
  _global_read_lock(NONE),
182
 
  _killed(NOT_KILLED),
 
180
  global_read_lock(0),
183
181
  some_tables_deleted(false),
184
182
  no_errors(false),
185
183
  password(false),
191
189
  m_lip(NULL),
192
190
  cached_table(0),
193
191
  transaction_message(NULL),
194
 
  statement_message(NULL),
195
 
  session_event_observers(NULL),
196
 
  use_usage(false)
 
192
  statement_message(NULL)
197
193
{
 
194
  memset(process_list_info, 0, PROCESS_LIST_WIDTH);
198
195
  client->setSession(this);
199
196
 
200
197
  /*
204
201
  */
205
202
  memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
206
203
  thread_stack= NULL;
207
 
  count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
204
  count_cuted_fields= CHECK_FIELD_IGNORE;
 
205
  killed= NOT_KILLED;
208
206
  col_access= 0;
209
207
  tmp_table= 0;
210
208
  used_tables= 0;
222
220
  query_id= 0;
223
221
  warn_query_id= 0;
224
222
  mysys_var= 0;
225
 
  scoreboard_index= -1;
226
223
  dbug_sentry=Session_SENTRY_MAGIC;
227
 
  cleanup_done= abort_on_warning= no_warnings_for_error= false;  
228
 
 
229
 
  /* query_cache init */
230
 
  query_cache_key= "";
231
 
  resultset= NULL;
 
224
  cleanup_done= abort_on_warning= no_warnings_for_error= false;
 
225
  pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
232
226
 
233
227
  /* Variables with default values */
234
228
  proc_info="login";
260
254
 
261
255
  /* Initialize sub structures */
262
256
  memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
 
257
  hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
 
258
            (hash_get_key) get_var_key,
 
259
            (hash_free_key) free_user_var, 0);
263
260
 
264
261
  substitute_null_with_insert_id = false;
265
 
  lock_info.init(); /* safety: will be reset after start */
 
262
  thr_lock_info_init(&lock_info); /* safety: will be reset after start */
266
263
  thr_lock_owner_init(&main_lock_id, &lock_info);
267
264
 
268
265
  m_internal_handler= NULL;
269
 
  
270
 
  plugin::EventObserver::registerSessionEvents(*this); 
271
266
}
272
267
 
273
268
void Session::free_items()
302
297
  return false;                                 // 'false', as per coding style
303
298
}
304
299
 
305
 
void Session::setAbort(bool arg)
306
 
{
307
 
  mysys_var->abort= arg;
308
 
}
309
 
 
310
 
void Session::lockOnSys()
311
 
{
312
 
  if (not mysys_var)
313
 
    return;
314
 
 
315
 
  setAbort(true);
316
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
317
 
  if (mysys_var->current_cond)
318
 
  {
319
 
    mysys_var->current_mutex->lock();
320
 
    mysys_var->current_cond->notify_all();
321
 
    mysys_var->current_mutex->unlock();
322
 
  }
323
 
}
324
 
 
325
300
void Session::pop_internal_handler()
326
301
{
327
302
  assert(m_internal_handler != NULL);
339
314
{
340
315
  assert(cleanup_done == false);
341
316
 
342
 
  setKilled(KILL_CONNECTION);
 
317
  killed= KILL_CONNECTION;
343
318
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
344
319
  if (transaction.xid_state.xa_state == XA_PREPARED)
345
320
  {
351
326
    transaction_services.rollbackTransaction(this, true);
352
327
    xid_cache_delete(&transaction.xid_state);
353
328
  }
354
 
 
355
 
  for (UserVars::iterator iter= user_vars.begin();
356
 
       iter != user_vars.end();
357
 
       iter++)
358
 
  {
359
 
    user_var_entry *entry= (*iter).second;
360
 
    delete entry;
361
 
  }
362
 
  user_vars.clear();
363
 
 
364
 
 
 
329
  hash_free(&user_vars);
365
330
  close_temporary_tables();
366
331
 
367
332
  if (global_read_lock)
368
 
  {
369
 
    unlockGlobalReadLock();
370
 
  }
 
333
    unlock_global_read_lock(this);
371
334
 
372
335
  cleanup_done= true;
373
336
}
375
338
Session::~Session()
376
339
{
377
340
  this->checkSentry();
 
341
  add_to_status(&global_status_var, &status_var);
378
342
 
379
343
  if (client->isConnected())
380
344
  {
401
365
  dbug_sentry= Session_SENTRY_GONE;
402
366
 
403
367
  main_mem_root.free_root(MYF(0));
404
 
  currentMemRoot().release();
405
 
  currentSession().release();
 
368
  pthread_setspecific(THR_Session,  0);
406
369
 
407
370
  plugin::Logging::postEndDo(this);
408
 
  plugin::EventObserver::deregisterSessionEvents(*this); 
409
 
 
410
 
  for (PropertyMap::iterator iter= life_properties.begin(); iter != life_properties.end(); iter++)
411
 
  {
412
 
    delete (*iter).second;
413
 
  }
414
 
  life_properties.clear();
415
 
}
416
 
 
417
 
void Session::setClient(plugin::Client *client_arg)
418
 
{
419
 
  client= client_arg;
420
 
  client->setSession(this);
421
 
}
422
 
 
423
 
void Session::awake(Session::killed_state_t state_to_set)
424
 
{
425
 
  if ((state_to_set == Session::KILL_QUERY) and (command == COM_SLEEP))
426
 
    return;
427
 
 
 
371
 
 
372
  /* Ensure that no one is using Session */
 
373
  pthread_mutex_unlock(&LOCK_delete);
 
374
  pthread_mutex_destroy(&LOCK_delete);
 
375
}
 
376
 
 
377
/*
 
378
  Add all status variables to another status variable array
 
379
 
 
380
  SYNOPSIS
 
381
   add_to_status()
 
382
   to_var       add to this array
 
383
   from_var     from this array
 
384
 
 
385
  NOTES
 
386
    This function assumes that all variables are long/ulong.
 
387
    If this assumption will change, then we have to explictely add
 
388
    the other variables after the while loop
 
389
*/
 
390
void add_to_status(system_status_var *to_var, system_status_var *from_var)
 
391
{
 
392
  ulong *end= (ulong*) ((unsigned char*) to_var +
 
393
                        offsetof(system_status_var, last_system_status_var) +
 
394
                        sizeof(ulong));
 
395
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var;
 
396
 
 
397
  while (to != end)
 
398
    *(to++)+= *(from++);
 
399
}
 
400
 
 
401
/*
 
402
  Add the difference between two status variable arrays to another one.
 
403
 
 
404
  SYNOPSIS
 
405
    add_diff_to_status
 
406
    to_var       add to this array
 
407
    from_var     from this array
 
408
    dec_var      minus this array
 
409
 
 
410
  NOTE
 
411
    This function assumes that all variables are long/ulong.
 
412
*/
 
413
void add_diff_to_status(system_status_var *to_var, system_status_var *from_var,
 
414
                        system_status_var *dec_var)
 
415
{
 
416
  ulong *end= (ulong*) ((unsigned char*) to_var + offsetof(system_status_var,
 
417
                                                  last_system_status_var) +
 
418
                        sizeof(ulong));
 
419
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var, *dec= (ulong*) dec_var;
 
420
 
 
421
  while (to != end)
 
422
    *(to++)+= *(from++) - *(dec++);
 
423
}
 
424
 
 
425
void Session::awake(Session::killed_state state_to_set)
 
426
{
428
427
  this->checkSentry();
429
 
 
430
 
  setKilled(state_to_set);
431
 
  scheduler->killSession(this);
432
 
 
 
428
  safe_mutex_assert_owner(&LOCK_delete);
 
429
 
 
430
  killed= state_to_set;
433
431
  if (state_to_set != Session::KILL_QUERY)
434
432
  {
 
433
    scheduler->killSession(this);
435
434
    DRIZZLE_CONNECTION_DONE(thread_id);
436
435
  }
437
 
 
438
436
  if (mysys_var)
439
437
  {
440
 
    boost_unique_lock_t scopedLock(mysys_var->mutex);
 
438
    pthread_mutex_lock(&mysys_var->mutex);
441
439
    /*
442
 
      "
443
440
      This broadcast could be up in the air if the victim thread
444
441
      exits the cond in the time between read and broadcast, but that is
445
442
      ok since all we want to do is to make the victim thread get out
460
457
    */
461
458
    if (mysys_var->current_cond && mysys_var->current_mutex)
462
459
    {
463
 
      mysys_var->current_mutex->lock();
464
 
      mysys_var->current_cond->notify_all();
465
 
      mysys_var->current_mutex->unlock();
 
460
      pthread_mutex_lock(mysys_var->current_mutex);
 
461
      pthread_cond_broadcast(mysys_var->current_cond);
 
462
      pthread_mutex_unlock(mysys_var->current_mutex);
466
463
    }
 
464
    pthread_mutex_unlock(&mysys_var->mutex);
467
465
  }
468
466
}
469
467
 
479
477
  */
480
478
  assert(thread_stack);
481
479
 
482
 
  currentSession().release();
483
 
  currentSession().reset(this);
484
 
 
485
 
  currentMemRoot().release();
486
 
  currentMemRoot().reset(&mem_root);
 
480
  if (pthread_setspecific(THR_Session,  this) ||
 
481
      pthread_setspecific(THR_Mem_root, &mem_root))
 
482
    return true;
487
483
 
488
484
  mysys_var=my_thread_var;
489
485
 
492
488
    This allows us to move Session to different threads if needed.
493
489
  */
494
490
  mysys_var->id= thread_id;
 
491
  real_id= pthread_self();                      // For debugging
495
492
 
496
493
  /*
497
494
    We have to call thr_lock_info_init() again here as Session may have been
498
495
    created in another thread
499
496
  */
500
 
  lock_info.init();
501
 
 
 
497
  thr_lock_info_init(&lock_info);
502
498
  return false;
503
499
}
504
500
 
522
518
                                variables.query_prealloc_size);
523
519
  transaction.xid_state.xid.null();
524
520
  transaction.xid_state.in_session=1;
525
 
  if (use_usage)
526
 
    resetUsage();
527
521
}
528
522
 
529
523
bool Session::initGlobals()
531
525
  if (storeGlobals())
532
526
  {
533
527
    disconnect(ER_OUT_OF_RESOURCES, true);
534
 
    status_var.aborted_connects++;
 
528
    statistic_increment(aborted_connects, &LOCK_status);
535
529
    return true;
536
530
  }
537
531
  return false;
547
541
 
548
542
  prepareForQueries();
549
543
 
550
 
  while (not client->haveError() && getKilled() != KILL_CONNECTION)
 
544
  while (! client->haveError() && killed != KILL_CONNECTION)
551
545
  {
552
 
    if (not executeStatement())
 
546
    if (! executeStatement())
553
547
      break;
554
548
  }
555
549
 
556
550
  disconnect(0, true);
557
551
}
558
552
 
559
 
bool Session::schedule(Session::shared_ptr &arg)
 
553
bool Session::schedule()
560
554
{
561
 
  arg->scheduler= plugin::Scheduler::getScheduler();
562
 
  assert(arg->scheduler);
 
555
  scheduler= plugin::Scheduler::getScheduler();
 
556
  assert(scheduler);
563
557
 
564
558
  connection_count.increment();
565
559
 
566
 
  if (connection_count > current_global_counters.max_used_connections)
567
 
  {
568
 
    current_global_counters.max_used_connections= connection_count;
569
 
  }
570
 
 
571
 
  current_global_counters.connections++;
572
 
  arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
573
 
 
574
 
  session::Cache::singleton().insert(arg);
575
 
 
576
 
  if (unlikely(plugin::EventObserver::connectSession(*arg)))
577
 
  {
578
 
    // We should do something about an error...
579
 
  }
580
 
 
581
 
  if (plugin::Scheduler::getScheduler()->addSession(arg))
582
 
  {
583
 
    DRIZZLE_CONNECTION_START(arg->getSessionId());
 
560
  if (connection_count > max_used_connections)
 
561
    max_used_connections= connection_count;
 
562
 
 
563
  thread_id= variables.pseudo_thread_id= global_thread_id++;
 
564
 
 
565
  pthread_mutex_lock(&LOCK_thread_count);
 
566
  getSessionList().push_back(this);
 
567
  pthread_mutex_unlock(&LOCK_thread_count);
 
568
 
 
569
  if (scheduler->addSession(this))
 
570
  {
 
571
    DRIZZLE_CONNECTION_START(thread_id);
584
572
    char error_message_buff[DRIZZLE_ERRMSG_SIZE];
585
573
 
586
 
    arg->setKilled(Session::KILL_CONNECTION);
 
574
    killed= Session::KILL_CONNECTION;
587
575
 
588
 
    arg->status_var.aborted_connects++;
 
576
    statistic_increment(aborted_connects, &LOCK_status);
589
577
 
590
578
    /* Can't use my_error() since store_globals has not been called. */
591
579
    /* TODO replace will better error message */
592
580
    snprintf(error_message_buff, sizeof(error_message_buff),
593
581
             ER(ER_CANT_CREATE_THREAD), 1);
594
 
    arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
595
 
 
 
582
    client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
596
583
    return true;
597
584
  }
598
585
 
600
587
}
601
588
 
602
589
 
603
 
/*
604
 
  Is this session viewable by the current user?
605
 
*/
606
 
bool Session::isViewable() const
607
 
{
608
 
  return plugin::Authorization::isAuthorized(current_session->getSecurityContext(),
609
 
                                             this,
610
 
                                             false);
611
 
}
612
 
 
613
 
 
614
 
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
 
590
const char* Session::enter_cond(pthread_cond_t *cond,
 
591
                                pthread_mutex_t* mutex,
 
592
                                const char* msg)
615
593
{
616
594
  const char* old_msg = get_proc_info();
617
595
  safe_mutex_assert_owner(mutex);
618
 
  mysys_var->current_mutex = &mutex;
619
 
  mysys_var->current_cond = &cond;
 
596
  mysys_var->current_mutex = mutex;
 
597
  mysys_var->current_cond = cond;
620
598
  this->set_proc_info(msg);
621
599
  return old_msg;
622
600
}
629
607
    locked (if that would not be the case, you'll get a deadlock if someone
630
608
    does a Session::awake() on you).
631
609
  */
632
 
  mysys_var->current_mutex->unlock();
633
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
 
610
  pthread_mutex_unlock(mysys_var->current_mutex);
 
611
  pthread_mutex_lock(&mysys_var->mutex);
634
612
  mysys_var->current_mutex = 0;
635
613
  mysys_var->current_cond = 0;
636
614
  this->set_proc_info(old_msg);
 
615
  pthread_mutex_unlock(&mysys_var->mutex);
637
616
}
638
617
 
639
618
bool Session::authenticate()
640
619
{
641
 
  lex->start(this);
 
620
  lex_start(this);
642
621
  if (client->authenticate())
643
622
    return false;
644
623
 
645
 
  status_var.aborted_connects++;
646
 
 
 
624
  statistic_increment(aborted_connects, &LOCK_status);
647
625
  return true;
648
626
}
649
627
 
650
 
bool Session::checkUser(const std::string &passwd_str,
651
 
                        const std::string &in_db)
 
628
bool Session::checkUser(const char *passwd, uint32_t passwd_len, const char *in_db)
652
629
{
 
630
  const string passwd_str(passwd, passwd_len);
653
631
  bool is_authenticated=
654
632
    plugin::Authentication::isAuthenticated(getSecurityContext(),
655
633
                                            passwd_str);
656
634
 
657
635
  if (is_authenticated != true)
658
636
  {
659
 
    status_var.access_denied++;
660
637
    /* isAuthenticated has pushed the error message */
661
638
    return false;
662
639
  }
663
640
 
664
641
  /* Change database if necessary */
665
 
  if (not in_db.empty())
 
642
  if (in_db && in_db[0])
666
643
  {
667
644
    SchemaIdentifier identifier(in_db);
668
645
    if (mysql_change_db(this, identifier))
672
649
    }
673
650
  }
674
651
  my_ok();
675
 
  password= not passwd_str.empty();
 
652
  password= test(passwd_len);          // remember for error messages
676
653
 
677
654
  /* Ready to handle queries */
678
655
  return true;
694
671
  main_da.reset_diagnostics_area();
695
672
 
696
673
  if (client->readCommand(&l_packet, &packet_length) == false)
697
 
  {
698
 
    return false;
699
 
  }
700
 
 
701
 
  if (getKilled() == KILL_CONNECTION)
702
674
    return false;
703
675
 
704
676
  if (packet_length == 0)
705
677
    return true;
706
678
 
707
 
  l_command= static_cast<enum_server_command>(l_packet[0]);
 
679
  l_command= (enum enum_server_command) (unsigned char) l_packet[0];
708
680
 
709
681
  if (command >= COM_END)
710
682
    command= COM_END;                           // Wrong command
711
683
 
712
684
  assert(packet_length);
713
 
  return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
 
685
  return ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
714
686
}
715
687
 
716
688
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
722
694
    in_packet_length--;
723
695
  }
724
696
  const char *pos= in_packet + in_packet_length; /* Point at end null */
725
 
  while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
 
697
  while (in_packet_length > 0 &&
 
698
         (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
726
699
  {
727
700
    pos--;
728
701
    in_packet_length--;
729
702
  }
730
703
 
731
 
  std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
732
 
  // We can not be entirely sure _schema has a value
733
 
  if (_schema)
734
 
  {
735
 
    plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
736
 
  }
737
 
  query.reset(new_query);
738
 
  _state.reset(new State(in_packet, in_packet_length));
 
704
  query.assign(in_packet, in_packet + in_packet_length);
739
705
 
740
706
  return true;
741
707
}
790
756
  }
791
757
 
792
758
  if (result == false)
793
 
  {
794
759
    my_error(killed_errno(), MYF(0));
795
 
  }
796
760
  else if ((result == true) && do_release)
797
 
  {
798
 
    setKilled(Session::KILL_CONNECTION);
799
 
  }
 
761
    killed= Session::KILL_CONNECTION;
800
762
 
801
763
  return result;
802
764
}
865
827
  free_items();
866
828
  /* Reset where. */
867
829
  where= Session::DEFAULT_WHERE;
868
 
 
869
 
  /* Reset the temporary shares we built */
870
 
  for_each(temporary_shares.begin(),
871
 
           temporary_shares.end(),
872
 
           DeletePtr());
873
 
  temporary_shares.clear();
874
830
}
875
831
 
876
832
/**
955
911
  my_message(errcode, err, MYF(0));
956
912
  if (file > 0)
957
913
  {
958
 
    (void) cache->end_io_cache();
 
914
    (void) end_io_cache(cache);
959
915
    (void) internal::my_close(file, MYF(0));
960
 
    (void) internal::my_delete(path.file_string().c_str(), MYF(0));             // Delete file on error
 
916
    (void) internal::my_delete(path, MYF(0));           // Delete file on error
961
917
    file= -1;
962
918
  }
963
919
}
965
921
 
966
922
bool select_to_file::send_eof()
967
923
{
968
 
  int error= test(cache->end_io_cache());
 
924
  int error= test(end_io_cache(cache));
969
925
  if (internal::my_close(file, MYF(MY_WME)))
970
926
    error= 1;
971
927
  if (!error)
987
943
  /* In case of error send_eof() may be not called: close the file here. */
988
944
  if (file >= 0)
989
945
  {
990
 
    (void) cache->end_io_cache();
 
946
    (void) end_io_cache(cache);
991
947
    (void) internal::my_close(file, MYF(0));
992
948
    file= -1;
993
949
  }
994
 
  path= "";
 
950
  path[0]= '\0';
995
951
  row_count= 0;
996
952
}
997
953
 
1001
957
    cache(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
1002
958
    row_count(0L)
1003
959
{
1004
 
  path= "";
 
960
  path[0]=0;
1005
961
}
1006
962
 
1007
963
select_to_file::~select_to_file()
1035
991
*/
1036
992
 
1037
993
 
1038
 
static int create_file(Session *session,
1039
 
                       fs::path &target_path,
1040
 
                       file_exchange *exchange,
1041
 
                       internal::IO_CACHE *cache)
 
994
static int create_file(Session *session, char *path, file_exchange *exchange, internal::IO_CACHE *cache)
1042
995
{
1043
 
  fs::path to_file(exchange->file_name);
1044
996
  int file;
1045
 
 
1046
 
  if (not to_file.has_root_directory())
 
997
  uint32_t option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
 
998
 
 
999
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
1000
  option|= MY_REPLACE_DIR;                      // Force use of db directory
 
1001
#endif
 
1002
 
 
1003
  if (!internal::dirname_length(exchange->file_name))
1047
1004
  {
1048
 
    target_path= fs::system_complete(getDataHomeCatalog());
1049
 
    util::string::const_shared_ptr schema(session->schema());
1050
 
    if (schema and not schema->empty())
1051
 
    {
1052
 
      int count_elements= 0;
1053
 
      for (fs::path::iterator iter= to_file.begin();
1054
 
           iter != to_file.end();
1055
 
           ++iter, ++count_elements)
1056
 
      { }
1057
 
 
1058
 
      if (count_elements == 1)
1059
 
      {
1060
 
        target_path /= *schema;
1061
 
      }
1062
 
    }
1063
 
    target_path /= to_file;
 
1005
    strcpy(path, data_home_real);
 
1006
    if (! session->db.empty())
 
1007
      strncat(path, session->db.c_str(), FN_REFLEN-strlen(data_home_real)-1);
 
1008
    (void) internal::fn_format(path, exchange->file_name, path, "", option);
1064
1009
  }
1065
1010
  else
1066
 
  {
1067
 
    target_path = exchange->file_name;
1068
 
  }
1069
 
 
1070
 
  if (not secure_file_priv.string().empty())
1071
 
  {
1072
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
1073
 
    {
1074
 
      /* Write only allowed to dir or subdir specified by secure_file_priv */
1075
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1076
 
      return -1;
1077
 
    }
1078
 
  }
1079
 
 
1080
 
  if (!access(target_path.file_string().c_str(), F_OK))
 
1011
    (void) internal::fn_format(path, exchange->file_name, data_home_real, "", option);
 
1012
 
 
1013
  if (opt_secure_file_priv &&
 
1014
      strncmp(opt_secure_file_priv, path, strlen(opt_secure_file_priv)))
 
1015
  {
 
1016
    /* Write only allowed to dir or subdir specified by secure_file_priv */
 
1017
    my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
1018
    return -1;
 
1019
  }
 
1020
 
 
1021
  if (!access(path, F_OK))
1081
1022
  {
1082
1023
    my_error(ER_FILE_EXISTS_ERROR, MYF(0), exchange->file_name);
1083
1024
    return -1;
1084
1025
  }
1085
1026
  /* Create the file world readable */
1086
 
  if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
 
1027
  if ((file= internal::my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1087
1028
    return file;
1088
1029
  (void) fchmod(file, 0666);                    // Because of umask()
1089
 
  if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
 
1030
  if (init_io_cache(cache, file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1090
1031
  {
1091
1032
    internal::my_close(file, MYF(0));
1092
 
    internal::my_delete(target_path.file_string().c_str(), MYF(0));  // Delete file on error, it was just created
 
1033
    internal::my_delete(path, MYF(0));  // Delete file on error, it was just created
1093
1034
    return -1;
1094
1035
  }
1095
1036
  return file;
1103
1044
  bool string_results= false, non_string_results= false;
1104
1045
  unit= u;
1105
1046
  if ((uint32_t) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
1106
 
  {
1107
 
    path= exchange->file_name;
1108
 
  }
 
1047
    strncpy(path,exchange->file_name,FN_REFLEN-1);
1109
1048
 
1110
1049
  /* Check if there is any blobs in data */
1111
1050
  {
1115
1054
    {
1116
1055
      if (item->max_length >= MAX_BLOB_WIDTH)
1117
1056
      {
1118
 
        blob_flag=1;
1119
 
        break;
 
1057
        blob_flag=1;
 
1058
        break;
1120
1059
      }
1121
 
 
1122
1060
      if (item->result_type() == STRING_RESULT)
1123
1061
        string_results= true;
1124
1062
      else
1169
1107
  if (unit->offset_limit_cnt)
1170
1108
  {                                             // using limit offset,count
1171
1109
    unit->offset_limit_cnt--;
1172
 
    return false;
 
1110
    return(0);
1173
1111
  }
1174
1112
  row_count++;
1175
1113
  Item *item;
1178
1116
 
1179
1117
  if (my_b_write(cache,(unsigned char*) exchange->line_start->ptr(),
1180
1118
                 exchange->line_start->length()))
1181
 
    return true;
1182
 
 
 
1119
    goto err;
1183
1120
  while ((item=li++))
1184
1121
  {
1185
1122
    Item_result result_type=item->result_type();
1190
1127
    {
1191
1128
      if (my_b_write(cache,(unsigned char*) exchange->enclosed->ptr(),
1192
1129
                     exchange->enclosed->length()))
1193
 
        return true;
 
1130
        goto err;
1194
1131
    }
1195
1132
    if (!res)
1196
1133
    {                                           // NULL
1201
1138
          null_buff[0]=escape_char;
1202
1139
          null_buff[1]='N';
1203
1140
          if (my_b_write(cache,(unsigned char*) null_buff,2))
1204
 
            return true;
 
1141
            goto err;
1205
1142
        }
1206
1143
        else if (my_b_write(cache,(unsigned char*) "NULL",4))
1207
 
          return true;
 
1144
          goto err;
1208
1145
      }
1209
1146
      else
1210
1147
      {
1214
1151
    else
1215
1152
    {
1216
1153
      if (fixed_row_size)
1217
 
        used_length= min(res->length(), static_cast<size_t>(item->max_length));
 
1154
        used_length= min(res->length(),item->max_length);
1218
1155
      else
1219
1156
        used_length= res->length();
1220
1157
 
1295
1232
            tmp_buff[1]= *pos ? *pos : '0';
1296
1233
            if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1297
1234
                my_b_write(cache,(unsigned char*) tmp_buff,2))
1298
 
              return true;
 
1235
              goto err;
1299
1236
            start=pos+1;
1300
1237
          }
1301
1238
        }
1302
1239
        if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1303
 
          return true;
 
1240
          goto err;
1304
1241
      }
1305
1242
      else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1306
 
        return true;
 
1243
        goto err;
1307
1244
    }
1308
1245
    if (fixed_row_size)
1309
1246
    {                                           // Fill with space
1319
1256
        for (; length > sizeof(space) ; length-=sizeof(space))
1320
1257
        {
1321
1258
          if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1322
 
            return true;
 
1259
            goto err;
1323
1260
        }
1324
1261
        if (my_b_write(cache,(unsigned char*) space,length))
1325
 
          return true;
 
1262
          goto err;
1326
1263
      }
1327
1264
    }
1328
1265
    if (res && enclosed)
1329
1266
    {
1330
1267
      if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
1331
1268
                     exchange->enclosed->length()))
1332
 
        return true;
 
1269
        goto err;
1333
1270
    }
1334
1271
    if (--items_left)
1335
1272
    {
1336
1273
      if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
1337
1274
                     field_term_length))
1338
 
        return true;
 
1275
        goto err;
1339
1276
    }
1340
1277
  }
1341
1278
  if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1342
1279
                 exchange->line_term->length()))
1343
 
  {
1344
 
    return true;
1345
 
  }
1346
 
 
1347
 
  return false;
 
1280
    goto err;
 
1281
  return(0);
 
1282
err:
 
1283
  return(1);
1348
1284
}
1349
1285
 
1350
1286
 
1377
1313
  if (row_count++ > 1)
1378
1314
  {
1379
1315
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
1380
 
    return 1;
 
1316
    goto err;
1381
1317
  }
1382
1318
  while ((item=li++))
1383
1319
  {
1385
1321
    if (!res)                                   // If NULL
1386
1322
    {
1387
1323
      if (my_b_write(cache,(unsigned char*) "",1))
1388
 
        return 1;
 
1324
        goto err;
1389
1325
    }
1390
1326
    else if (my_b_write(cache,(unsigned char*) res->ptr(),res->length()))
1391
1327
    {
1392
 
      my_error(ER_ERROR_ON_WRITE, MYF(0), path.file_string().c_str(), errno);
1393
 
      return 1;
 
1328
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, errno);
 
1329
      goto err;
1394
1330
    }
1395
1331
  }
1396
1332
  return(0);
 
1333
err:
 
1334
  return(1);
1397
1335
}
1398
1336
 
1399
1337
 
1451
1389
      switch (val_item->result_type())
1452
1390
      {
1453
1391
      case REAL_RESULT:
1454
 
        op= &select_max_min_finder_subselect::cmp_real;
1455
 
        break;
 
1392
        op= &select_max_min_finder_subselect::cmp_real;
 
1393
        break;
1456
1394
      case INT_RESULT:
1457
 
        op= &select_max_min_finder_subselect::cmp_int;
1458
 
        break;
 
1395
        op= &select_max_min_finder_subselect::cmp_int;
 
1396
        break;
1459
1397
      case STRING_RESULT:
1460
 
        op= &select_max_min_finder_subselect::cmp_str;
1461
 
        break;
 
1398
        op= &select_max_min_finder_subselect::cmp_str;
 
1399
        break;
1462
1400
      case DECIMAL_RESULT:
1463
1401
        op= &select_max_min_finder_subselect::cmp_decimal;
1464
1402
        break;
1465
1403
      case ROW_RESULT:
1466
1404
        // This case should never be choosen
1467
 
        assert(0);
1468
 
        op= 0;
 
1405
        assert(0);
 
1406
        op= 0;
1469
1407
      }
1470
1408
    }
1471
1409
    cache->store(val_item);
1554
1492
void Session::end_statement()
1555
1493
{
1556
1494
  /* Cleanup SQL processing state to reuse this statement in next query. */
1557
 
  lex->end();
1558
 
  query_cache_key= ""; // reset the cache key
1559
 
  resetResultsetMessage();
 
1495
  lex_end(lex);
1560
1496
}
1561
1497
 
1562
1498
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
1563
1499
{
1564
 
  assert(_schema);
1565
 
  if (_schema and _schema->empty())
1566
 
  {
1567
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1568
 
    return true;
1569
 
  }
1570
 
  else if (not _schema)
1571
 
  {
1572
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1573
 
    return true;
1574
 
  }
1575
 
  assert(_schema);
1576
 
 
1577
 
  *p_db= strmake(_schema->c_str(), _schema->size());
1578
 
  *p_db_length= _schema->size();
1579
 
 
 
1500
  if (db.empty())
 
1501
  {
 
1502
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
 
1503
    return true;
 
1504
  }
 
1505
  *p_db= strmake(db.c_str(), db.length());
 
1506
  *p_db_length= db.length();
1580
1507
  return false;
1581
1508
}
1582
1509
 
1616
1543
}
1617
1544
 
1618
1545
 
1619
 
void Session::set_db(const std::string &new_db)
 
1546
/****************************************************************************
 
1547
  Handling of open and locked tables states.
 
1548
 
 
1549
  This is used when we want to open/lock (and then close) some tables when
 
1550
  we already have a set of tables open and locked. We use these methods for
 
1551
  access to mysql.proc table to find definitions of stored routines.
 
1552
****************************************************************************/
 
1553
 
 
1554
void Session::reset_n_backup_open_tables_state(Open_tables_state *backup)
 
1555
{
 
1556
  backup->set_open_tables_state(this);
 
1557
  reset_open_tables_state();
 
1558
  backups_available= false;
 
1559
}
 
1560
 
 
1561
 
 
1562
void Session::restore_backup_open_tables_state(Open_tables_state *backup)
 
1563
{
 
1564
  /*
 
1565
    Before we will throw away current open tables state we want
 
1566
    to be sure that it was properly cleaned up.
 
1567
  */
 
1568
  assert(open_tables == 0 && temporary_tables == 0 &&
 
1569
              derived_tables == 0 &&
 
1570
              lock == 0);
 
1571
  set_open_tables_state(backup);
 
1572
}
 
1573
 
 
1574
bool Session::set_db(const std::string &new_db)
1620
1575
{
1621
1576
  /* Do not reallocate memory if current chunk is big enough. */
1622
1577
  if (new_db.length())
1623
 
  {
1624
 
    _schema.reset(new std::string(new_db));
1625
 
  }
 
1578
    db= new_db;
1626
1579
  else
1627
 
  {
1628
 
    _schema.reset(new std::string(""));
1629
 
  }
1630
 
}
1631
 
 
 
1580
    db.clear();
 
1581
 
 
1582
  return false;
 
1583
}
 
1584
 
 
1585
 
 
1586
 
 
1587
 
 
1588
/**
 
1589
  Check the killed state of a user thread
 
1590
  @param session  user thread
 
1591
  @retval 0 the user thread is active
 
1592
  @retval 1 the user thread has been killed
 
1593
*/
 
1594
int session_killed(const Session *session)
 
1595
{
 
1596
  return(session->killed);
 
1597
}
 
1598
 
 
1599
 
 
1600
const struct charset_info_st *session_charset(Session *session)
 
1601
{
 
1602
  return(session->charset());
 
1603
}
1632
1604
 
1633
1605
/**
1634
1606
  Mark transaction to rollback and mark error as fatal to a sub-statement.
1651
1623
  plugin_sessionvar_cleanup(this);
1652
1624
 
1653
1625
  /* If necessary, log any aborted or unauthorized connections */
1654
 
  if (getKilled() || client->wasAborted())
1655
 
  {
1656
 
    status_var.aborted_threads++;
1657
 
  }
 
1626
  if (killed || client->wasAborted())
 
1627
    statistic_increment(aborted_threads, &LOCK_status);
1658
1628
 
1659
1629
  if (client->wasAborted())
1660
1630
  {
1661
 
    if (not getKilled() && variables.log_warnings > 1)
 
1631
    if (! killed && variables.log_warnings > 1)
1662
1632
    {
1663
1633
      SecurityContext *sctx= &security_ctx;
1664
1634
 
1665
1635
      errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1666
1636
                  , thread_id
1667
 
                  , (_schema->empty() ? "unconnected" : _schema->c_str())
 
1637
                  , (db.empty() ? "unconnected" : db.c_str())
1668
1638
                  , sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1669
1639
                  , sctx->getIp().c_str()
1670
1640
                  , (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1673
1643
 
1674
1644
  /* Close out our connection to the client */
1675
1645
  if (should_lock)
1676
 
    session::Cache::singleton().mutex().lock();
1677
 
 
1678
 
  setKilled(Session::KILL_CONNECTION);
1679
 
 
 
1646
    (void) pthread_mutex_lock(&LOCK_thread_count);
 
1647
  killed= Session::KILL_CONNECTION;
1680
1648
  if (client->isConnected())
1681
1649
  {
1682
1650
    if (errcode)
1686
1654
    }
1687
1655
    client->close();
1688
1656
  }
1689
 
 
1690
1657
  if (should_lock)
1691
 
  {
1692
 
    session::Cache::singleton().mutex().unlock();
1693
 
  }
 
1658
    (void) pthread_mutex_unlock(&LOCK_thread_count);
1694
1659
}
1695
1660
 
1696
1661
void Session::reset_for_next_command()
1718
1683
  Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1719
1684
*/
1720
1685
 
1721
 
void Open_tables_state::close_temporary_tables()
 
1686
void Session::close_temporary_tables()
1722
1687
{
1723
1688
  Table *table;
1724
1689
  Table *tmp_next;
1728
1693
 
1729
1694
  for (table= temporary_tables; table; table= tmp_next)
1730
1695
  {
1731
 
    tmp_next= table->getNext();
 
1696
    tmp_next= table->next;
1732
1697
    nukeTable(table);
1733
1698
  }
1734
1699
  temporary_tables= NULL;
1738
1703
  unlink from session->temporary tables and close temporary table
1739
1704
*/
1740
1705
 
1741
 
void Open_tables_state::close_temporary_table(Table *table)
 
1706
void Session::close_temporary_table(Table *table)
1742
1707
{
1743
 
  if (table->getPrev())
 
1708
  if (table->prev)
1744
1709
  {
1745
 
    table->getPrev()->setNext(table->getNext());
1746
 
    if (table->getPrev()->getNext())
1747
 
    {
1748
 
      table->getNext()->setPrev(table->getPrev());
1749
 
    }
 
1710
    table->prev->next= table->next;
 
1711
    if (table->prev->next)
 
1712
      table->next->prev= table->prev;
1750
1713
  }
1751
1714
  else
1752
1715
  {
1757
1720
      passing non-zero value to end_slave via rli->save_temporary_tables
1758
1721
      when no temp tables opened, see an invariant below.
1759
1722
    */
1760
 
    temporary_tables= table->getNext();
 
1723
    temporary_tables= table->next;
1761
1724
    if (temporary_tables)
1762
 
    {
1763
 
      table->getNext()->setPrev(NULL);
1764
 
    }
 
1725
      table->next->prev= NULL;
1765
1726
  }
1766
1727
  nukeTable(table);
1767
1728
}
1774
1735
  If this is needed, use close_temporary_table()
1775
1736
*/
1776
1737
 
1777
 
void Open_tables_state::nukeTable(Table *table)
 
1738
void Session::nukeTable(Table *table)
1778
1739
{
1779
 
  plugin::StorageEngine *table_type= table->getShare()->db_type();
 
1740
  plugin::StorageEngine *table_type= table->s->db_type();
1780
1741
 
1781
1742
  table->free_io_cache();
1782
 
  table->delete_table();
 
1743
  table->delete_table(false);
1783
1744
 
1784
 
  TableIdentifier identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
 
1745
  TableIdentifier identifier(table->s->getSchemaName(), table->s->getTableName(), table->s->getPath());
1785
1746
  rm_temporary_table(table_type, identifier);
1786
1747
 
1787
 
  delete table->getMutableShare();
 
1748
  delete table->s;
1788
1749
 
1789
1750
  /* This makes me sad, but we're allocating it via malloc */
1790
 
  delete table;
 
1751
  free(table);
1791
1752
}
1792
1753
 
1793
1754
/** Clear most status variables. */
1794
1755
extern time_t flush_status_time;
 
1756
extern uint32_t max_used_connections;
1795
1757
 
1796
1758
void Session::refresh_status()
1797
1759
{
 
1760
  pthread_mutex_lock(&LOCK_status);
 
1761
 
 
1762
  /* Add thread's status variabes to global status */
 
1763
  add_to_status(&global_status_var, &status_var);
 
1764
 
1798
1765
  /* Reset thread's status variables */
1799
1766
  memset(&status_var, 0, sizeof(status_var));
1800
1767
 
 
1768
  /* Reset some global variables */
 
1769
  reset_status_vars();
 
1770
 
 
1771
  /* Reset the counters of all key caches (default and named). */
 
1772
  reset_key_cache_counters();
1801
1773
  flush_status_time= time((time_t*) 0);
1802
 
  current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1803
 
  current_global_counters.connections= 0;
 
1774
  max_used_connections= 1; /* We set it to one, because we know we exist */
 
1775
  pthread_mutex_unlock(&LOCK_status);
1804
1776
}
1805
1777
 
1806
1778
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1807
1779
{
1808
 
  return getVariable(std::string(name.str, name.length), create_if_not_exists);
1809
 
}
1810
 
 
1811
 
user_var_entry *Session::getVariable(const std::string  &name, bool create_if_not_exists)
1812
 
{
1813
 
  UserVarsRange ppp= user_vars.equal_range(name);
1814
 
 
1815
 
  for (UserVars::iterator iter= ppp.first;
1816
 
       iter != ppp.second; ++iter)
1817
 
  {
1818
 
    return (*iter).second;
1819
 
  }
1820
 
 
1821
 
  if (not create_if_not_exists)
1822
 
    return NULL;
1823
 
 
1824
1780
  user_var_entry *entry= NULL;
1825
 
  entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1826
 
 
1827
 
  if (entry == NULL)
1828
 
    return NULL;
1829
 
 
1830
 
  std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1831
 
 
1832
 
  if (not returnable.second)
 
1781
 
 
1782
  entry= (user_var_entry*) hash_search(&user_vars, (unsigned char*) name.str, name.length);
 
1783
 
 
1784
  if ((entry == NULL) && create_if_not_exists)
1833
1785
  {
1834
 
    delete entry;
 
1786
    if (!hash_inited(&user_vars))
 
1787
      return NULL;
 
1788
    entry= new (nothrow) user_var_entry(name.str, query_id);
 
1789
 
 
1790
    if (entry == NULL)
 
1791
      return NULL;
 
1792
 
 
1793
    if (my_hash_insert(&user_vars, (unsigned char*) entry))
 
1794
    {
 
1795
      assert(1);
 
1796
      delete entry;
 
1797
      return 0;
 
1798
    }
 
1799
 
1835
1800
  }
1836
1801
 
1837
1802
  return entry;
1838
1803
}
1839
1804
 
1840
 
void Session::setVariable(const std::string &name, const std::string &value)
1841
 
{
1842
 
  user_var_entry *updateable_var= getVariable(name.c_str(), true);
1843
 
 
1844
 
  updateable_var->update_hash(false,
1845
 
                              (void*)value.c_str(),
1846
 
                              static_cast<uint32_t>(value.length()), STRING_RESULT,
1847
 
                              &my_charset_bin,
1848
 
                              DERIVATION_IMPLICIT, false);
1849
 
}
1850
 
 
1851
 
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1852
 
{
1853
 
  for (Table *table= temporary_tables ; table ; table= table->getNext())
 
1805
void Session::mark_temp_tables_as_free_for_reuse()
 
1806
{
 
1807
  for (Table *table= temporary_tables ; table ; table= table->next)
1854
1808
  {
1855
 
    if (table->query_id == getQueryId())
 
1809
    if (table->query_id == query_id)
1856
1810
    {
1857
1811
      table->query_id= 0;
1858
1812
      table->cursor->ha_reset();
1862
1816
 
1863
1817
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1864
1818
{
1865
 
  for (; table ; table= table->getNext())
 
1819
  for (; table ; table= table->next)
1866
1820
  {
1867
 
    if (table->query_id == getQueryId())
 
1821
    if (table->query_id == query_id)
1868
1822
    {
1869
1823
      table->query_id= 0;
1870
1824
      table->cursor->ha_reset();
1883
1837
*/
1884
1838
void Session::close_thread_tables()
1885
1839
{
1886
 
  clearDerivedTables();
 
1840
  Table *table;
 
1841
 
 
1842
  /*
 
1843
    We are assuming here that session->derived_tables contains ONLY derived
 
1844
    tables for this substatement. i.e. instead of approach which uses
 
1845
    query_id matching for determining which of the derived tables belong
 
1846
    to this substatement we rely on the ability of substatements to
 
1847
    save/restore session->derived_tables during their execution.
 
1848
 
 
1849
    TODO: Probably even better approach is to simply associate list of
 
1850
          derived tables with (sub-)statement instead of thread and destroy
 
1851
          them at the end of its execution.
 
1852
  */
 
1853
  if (derived_tables)
 
1854
  {
 
1855
    Table *next;
 
1856
    /*
 
1857
      Close all derived tables generated in queries like
 
1858
      SELECT * FROM (SELECT * FROM t1)
 
1859
    */
 
1860
    for (table= derived_tables ; table ; table= next)
 
1861
    {
 
1862
      next= table->next;
 
1863
      table->free_tmp_table(this);
 
1864
    }
 
1865
    derived_tables= 0;
 
1866
  }
1887
1867
 
1888
1868
  /*
1889
1869
    Mark all temporary tables used by this statement as free for reuse.
1897
1877
    does not belong to statement for which we do close_thread_tables()).
1898
1878
    TODO: This should be fixed in later releases.
1899
1879
   */
 
1880
  if (backups_available == false)
1900
1881
  {
1901
1882
    TransactionServices &transaction_services= TransactionServices::singleton();
1902
1883
    main_da.can_overwrite_status= true;
1916
1897
      handled either before writing a query log event (inside
1917
1898
      binlog_query()) or when preparing a pending event.
1918
1899
     */
1919
 
    unlockTables(lock);
 
1900
    mysql_unlock_tables(this, lock);
1920
1901
    lock= 0;
1921
1902
  }
1922
1903
  /*
1923
 
    Note that we need to hold table::Cache::singleton().mutex() while changing the
 
1904
    Note that we need to hold LOCK_open while changing the
1924
1905
    open_tables list. Another thread may work on it.
1925
 
    (See: table::Cache::singleton().removeTable(), mysql_wait_completed_table())
 
1906
    (See: remove_table_from_cache(), mysql_wait_completed_table())
1926
1907
    Closing a MERGE child before the parent would be fatal if the
1927
1908
    other thread tries to abort the MERGE lock in between.
1928
1909
  */
1961
1942
    close_tables_for_reopen(&tables);
1962
1943
  }
1963
1944
  if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
1964
 
       (
 
1945
       (fill_derived_tables() &&
1965
1946
        mysql_handle_derived(lex, &mysql_derived_filling))))
1966
1947
    return true;
1967
1948
 
1968
1949
  return false;
1969
1950
}
1970
1951
 
1971
 
/*
1972
 
  @note "best_effort" is used in cases were if a failure occurred on this
1973
 
  operation it would not be surprising because we are only removing because there
1974
 
  might be an issue (lame engines).
1975
 
*/
 
1952
bool Session::openTables(TableList *tables, uint32_t flags)
 
1953
{
 
1954
  uint32_t counter;
 
1955
  bool ret= fill_derived_tables();
 
1956
  assert(ret == false);
 
1957
  if (open_tables_from_list(&tables, &counter, flags) ||
 
1958
      mysql_handle_derived(lex, &mysql_derived_prepare))
 
1959
    return true;
 
1960
  return false;
 
1961
}
1976
1962
 
1977
 
bool Open_tables_state::rm_temporary_table(const TableIdentifier &identifier, bool best_effort)
 
1963
bool Session::rm_temporary_table(TableIdentifier &identifier)
1978
1964
{
1979
 
  if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
 
1965
  if (plugin::StorageEngine::dropTable(*this, identifier))
1980
1966
  {
1981
 
    if (not best_effort)
1982
 
    {
1983
 
      std::string path;
1984
 
      identifier.getSQLPath(path);
1985
 
      errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1986
 
                    path.c_str(), errno);
1987
 
    }
 
1967
    errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
 
1968
                  identifier.getSQLPath().c_str(), errno);
 
1969
    dumpTemporaryTableNames("rm_temporary_table()");
1988
1970
 
1989
1971
    return true;
1990
1972
  }
1992
1974
  return false;
1993
1975
}
1994
1976
 
1995
 
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const TableIdentifier &identifier)
 
1977
bool Session::rm_temporary_table(plugin::StorageEngine *base, TableIdentifier &identifier)
1996
1978
{
1997
1979
  assert(base);
1998
1980
 
1999
 
  if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier))
 
1981
  if (plugin::StorageEngine::dropTable(*this, *base, identifier))
2000
1982
  {
2001
 
    std::string path;
2002
 
    identifier.getSQLPath(path);
2003
1983
    errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
2004
 
                  path.c_str(), errno);
 
1984
                  identifier.getSQLPath().c_str(), errno);
 
1985
    dumpTemporaryTableNames("rm_temporary_table()");
2005
1986
 
2006
1987
    return true;
2007
1988
  }
2013
1994
  @note this will be removed, I am looking through Hudson to see if it is finding
2014
1995
  any tables that are missed during cleanup.
2015
1996
*/
2016
 
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
 
1997
void Session::dumpTemporaryTableNames(const char *foo)
2017
1998
{
2018
1999
  Table *table;
2019
2000
 
2021
2002
    return;
2022
2003
 
2023
2004
  cerr << "Begin Run: " << foo << "\n";
2024
 
  for (table= temporary_tables; table; table= table->getNext())
 
2005
  for (table= temporary_tables; table; table= table->next)
2025
2006
  {
2026
2007
    bool have_proto= false;
2027
2008
 
2028
 
    message::Table *proto= table->getShare()->getTableProto();
2029
 
    if (table->getShare()->getTableProto())
 
2009
    message::Table *proto= table->s->getTableProto();
 
2010
    if (table->s->getTableProto())
2030
2011
      have_proto= true;
2031
2012
 
2032
2013
    const char *answer= have_proto ? "true" : "false";
2033
2014
 
2034
2015
    if (have_proto)
2035
2016
    {
2036
 
      cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
 
2017
      cerr << "\tTable Name " << table->s->getSchemaName() << "." << table->s->getTableName() << " : " << answer << "\n";
2037
2018
      cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2038
2019
    }
2039
2020
    else
2040
 
    {
2041
 
      cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2042
 
    }
 
2021
      cerr << "\tTabl;e Name " << table->s->getSchemaName() << "." << table->s->getTableName() << " : " << answer << "\n";
2043
2022
  }
2044
2023
}
2045
2024
 
2046
 
bool Session::TableMessages::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
 
2025
bool Session::storeTableMessage(TableIdentifier &identifier, message::Table &table_message)
2047
2026
{
2048
2027
  table_message_cache.insert(make_pair(identifier.getPath(), table_message));
2049
2028
 
2050
2029
  return true;
2051
2030
}
2052
2031
 
2053
 
bool Session::TableMessages::removeTableMessage(const TableIdentifier &identifier)
 
2032
bool Session::removeTableMessage(TableIdentifier &identifier)
2054
2033
{
2055
2034
  TableMessageCache::iterator iter;
2056
2035
 
2064
2043
  return true;
2065
2044
}
2066
2045
 
2067
 
bool Session::TableMessages::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
 
2046
bool Session::getTableMessage(TableIdentifier &identifier, message::Table &table_message)
2068
2047
{
2069
2048
  TableMessageCache::iterator iter;
2070
2049
 
2078
2057
  return true;
2079
2058
}
2080
2059
 
2081
 
bool Session::TableMessages::doesTableMessageExist(const TableIdentifier &identifier)
 
2060
bool Session::doesTableMessageExist(TableIdentifier &identifier)
2082
2061
{
2083
2062
  TableMessageCache::iterator iter;
2084
2063
 
2092
2071
  return true;
2093
2072
}
2094
2073
 
2095
 
bool Session::TableMessages::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
 
2074
bool Session::renameTableMessage(TableIdentifier &from, TableIdentifier &to)
2096
2075
{
2097
2076
  TableMessageCache::iterator iter;
2098
2077
 
2111
2090
  return true;
2112
2091
}
2113
2092
 
2114
 
table::Instance *Session::getInstanceTable()
2115
 
{
2116
 
  temporary_shares.push_back(new table::Instance()); // This will not go into the tableshare cache, so no key is used.
2117
 
 
2118
 
  table::Instance *tmp_share= temporary_shares.back();
2119
 
 
2120
 
  assert(tmp_share);
2121
 
 
2122
 
  return tmp_share;
2123
 
}
2124
 
 
2125
 
 
2126
 
/**
2127
 
  Create a reduced Table object with properly set up Field list from a
2128
 
  list of field definitions.
2129
 
 
2130
 
    The created table doesn't have a table Cursor associated with
2131
 
    it, has no keys, no group/distinct, no copy_funcs array.
2132
 
    The sole purpose of this Table object is to use the power of Field
2133
 
    class to read/write data to/from table->getInsertRecord(). Then one can store
2134
 
    the record in any container (RB tree, hash, etc).
2135
 
    The table is created in Session mem_root, so are the table's fields.
2136
 
    Consequently, if you don't BLOB fields, you don't need to free it.
2137
 
 
2138
 
  @param session         connection handle
2139
 
  @param field_list  list of column definitions
2140
 
 
2141
 
  @return
2142
 
    0 if out of memory, Table object in case of success
2143
 
*/
2144
 
table::Instance *Session::getInstanceTable(List<CreateField> &field_list)
2145
 
{
2146
 
  temporary_shares.push_back(new table::Instance(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2147
 
 
2148
 
  table::Instance *tmp_share= temporary_shares.back();
2149
 
 
2150
 
  assert(tmp_share);
2151
 
 
2152
 
  return tmp_share;
2153
 
}
2154
 
 
2155
 
namespace display  {
2156
 
 
2157
 
static const std::string NONE= "NONE";
2158
 
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2159
 
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2160
 
 
2161
 
const std::string &type(drizzled::Session::global_read_lock_t type)
2162
 
{
2163
 
  switch (type) {
2164
 
    default:
2165
 
    case Session::NONE:
2166
 
      return NONE;
2167
 
    case Session::GOT_GLOBAL_READ_LOCK:
2168
 
      return GOT_GLOBAL_READ_LOCK;
2169
 
    case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2170
 
      return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2171
 
  }
2172
 
}
2173
 
 
2174
 
size_t max_string_length(drizzled::Session::global_read_lock_t)
2175
 
{
2176
 
  return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2177
 
}
2178
 
 
2179
 
} /* namespace display */
2180
 
 
2181
2093
} /* namespace drizzled */