~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/session.cc

  • Committer: Brian Aker
  • Date: 2010-09-22 22:25:29 UTC
  • mto: (1791.1.1 drizzle)
  • mto: This revision was merged to the branch mainline in revision 1792.
  • Revision ID: brian@tangent.org-20100922222529-geo4wggmu5ntqa5k
Current boost work (more conversion).

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
 */
23
23
 
24
24
#include "config.h"
25
 
#include "drizzled/session.h"
26
 
#include "drizzled/session/cache.h"
 
25
#include <drizzled/session.h>
 
26
#include "drizzled/session_list.h"
27
27
#include <sys/stat.h>
28
 
#include "drizzled/error.h"
29
 
#include "drizzled/gettext.h"
30
 
#include "drizzled/query_id.h"
31
 
#include "drizzled/data_home.h"
32
 
#include "drizzled/sql_base.h"
33
 
#include "drizzled/lock.h"
34
 
#include "drizzled/item/cache.h"
35
 
#include "drizzled/item/float.h"
36
 
#include "drizzled/item/return_int.h"
37
 
#include "drizzled/item/empty_string.h"
38
 
#include "drizzled/show.h"
39
 
#include "drizzled/plugin/client.h"
 
28
#include <drizzled/error.h>
 
29
#include <drizzled/gettext.h>
 
30
#include <drizzled/query_id.h>
 
31
#include <drizzled/data_home.h>
 
32
#include <drizzled/sql_base.h>
 
33
#include <drizzled/lock.h>
 
34
#include <drizzled/item/cache.h>
 
35
#include <drizzled/item/float.h>
 
36
#include <drizzled/item/return_int.h>
 
37
#include <drizzled/item/empty_string.h>
 
38
#include <drizzled/show.h>
 
39
#include <drizzled/plugin/client.h>
40
40
#include "drizzled/plugin/scheduler.h"
41
41
#include "drizzled/plugin/authentication.h"
42
42
#include "drizzled/plugin/logging.h"
43
43
#include "drizzled/plugin/transactional_storage_engine.h"
44
 
#include "drizzled/plugin/query_rewrite.h"
45
44
#include "drizzled/probes.h"
46
45
#include "drizzled/table_proto.h"
47
46
#include "drizzled/db.h"
49
48
#include "drizzled/transaction_services.h"
50
49
#include "drizzled/drizzled.h"
51
50
 
52
 
#include "drizzled/table/instance.h"
 
51
#include "drizzled/table_share_instance.h"
53
52
 
54
53
#include "plugin/myisam/myisam.h"
55
54
#include "drizzled/internal/iocache.h"
56
55
#include "drizzled/internal/thread_var.h"
57
56
#include "drizzled/plugin/event_observer.h"
58
57
 
59
 
#include "drizzled/util/functors.h"
60
 
 
61
 
#include "drizzled/display.h"
62
 
 
63
58
#include <fcntl.h>
64
59
#include <algorithm>
65
60
#include <climits>
66
 
#include <boost/filesystem.hpp>
67
 
 
68
 
#include "drizzled/util/backtrace.h"
69
61
 
70
62
using namespace std;
71
 
 
72
 
namespace fs=boost::filesystem;
73
63
namespace drizzled
74
64
{
75
65
 
86
76
{
87
77
  return length == other.length &&
88
78
         field_name.length == other.field_name.length &&
89
 
    !my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
 
79
         !strcmp(field_name.str, other.field_name.str);
90
80
}
91
81
 
92
82
Open_tables_state::Open_tables_state(uint64_t version_arg) :
162
152
Session::Session(plugin::Client *client_arg) :
163
153
  Open_tables_state(refresh_version),
164
154
  mem_root(&main_mem_root),
165
 
  xa_id(0),
166
155
  lex(&main_lex),
167
 
  query(new std::string),
168
 
  _schema(new std::string("")),
169
 
  catalog("LOCAL"),
 
156
  query(),
170
157
  client(client_arg),
171
158
  scheduler(NULL),
172
159
  scheduler_arg(NULL),
173
160
  lock_id(&main_lock_id),
174
161
  user_time(0),
175
162
  ha_data(plugin::num_trx_monitored_objects),
176
 
  concurrent_execute_allowed(true),
177
163
  arg_of_last_insert_id_function(false),
178
164
  first_successful_insert_id_in_prev_stmt(0),
179
165
  first_successful_insert_id_in_cur_stmt(0),
180
166
  limit_found_rows(0),
181
 
  _global_read_lock(NONE),
182
 
  _killed(NOT_KILLED),
 
167
  global_read_lock(0),
183
168
  some_tables_deleted(false),
184
169
  no_errors(false),
185
170
  password(false),
192
177
  cached_table(0),
193
178
  transaction_message(NULL),
194
179
  statement_message(NULL),
195
 
  session_event_observers(NULL),
196
 
  use_usage(false)
 
180
  session_event_observers(NULL)
197
181
{
 
182
  memset(process_list_info, 0, PROCESS_LIST_WIDTH);
198
183
  client->setSession(this);
199
184
 
200
185
  /*
205
190
  memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
206
191
  thread_stack= NULL;
207
192
  count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
193
  killed= NOT_KILLED;
208
194
  col_access= 0;
209
195
  tmp_table= 0;
210
196
  used_tables= 0;
313
299
    return;
314
300
 
315
301
  setAbort(true);
316
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
 
302
  boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
317
303
  if (mysys_var->current_cond)
318
304
  {
319
305
    mysys_var->current_mutex->lock();
320
 
    mysys_var->current_cond->notify_all();
 
306
    pthread_cond_broadcast(mysys_var->current_cond->native_handle());
321
307
    mysys_var->current_mutex->unlock();
322
308
  }
323
309
}
339
325
{
340
326
  assert(cleanup_done == false);
341
327
 
342
 
  setKilled(KILL_CONNECTION);
 
328
  killed= KILL_CONNECTION;
343
329
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
344
330
  if (transaction.xid_state.xa_state == XA_PREPARED)
345
331
  {
365
351
  close_temporary_tables();
366
352
 
367
353
  if (global_read_lock)
368
 
  {
369
 
    unlockGlobalReadLock();
370
 
  }
 
354
    unlock_global_read_lock(this);
371
355
 
372
356
  cleanup_done= true;
373
357
}
407
391
  plugin::Logging::postEndDo(this);
408
392
  plugin::EventObserver::deregisterSessionEvents(*this); 
409
393
 
410
 
  for (PropertyMap::iterator iter= life_properties.begin(); iter != life_properties.end(); iter++)
411
 
  {
412
 
    delete (*iter).second;
413
 
  }
414
 
  life_properties.clear();
415
 
}
416
 
 
417
 
void Session::setClient(plugin::Client *client_arg)
418
 
{
419
 
  client= client_arg;
420
 
  client->setSession(this);
421
 
}
422
 
 
423
 
void Session::awake(Session::killed_state_t state_to_set)
424
 
{
425
 
  if ((state_to_set == Session::KILL_QUERY) and (command == COM_SLEEP))
426
 
    return;
427
 
 
 
394
  /* Ensure that no one is using Session */
 
395
  LOCK_delete.unlock();
 
396
}
 
397
 
 
398
void Session::awake(Session::killed_state state_to_set)
 
399
{
428
400
  this->checkSentry();
429
 
 
430
 
  setKilled(state_to_set);
431
 
  scheduler->killSession(this);
432
 
 
 
401
  safe_mutex_assert_owner(&LOCK_delete);
 
402
 
 
403
  killed= state_to_set;
433
404
  if (state_to_set != Session::KILL_QUERY)
434
405
  {
 
406
    scheduler->killSession(this);
435
407
    DRIZZLE_CONNECTION_DONE(thread_id);
436
408
  }
437
 
 
438
409
  if (mysys_var)
439
410
  {
440
 
    boost_unique_lock_t scopedLock(mysys_var->mutex);
 
411
    boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
441
412
    /*
442
413
      "
443
414
      This broadcast could be up in the air if the victim thread
461
432
    if (mysys_var->current_cond && mysys_var->current_mutex)
462
433
    {
463
434
      mysys_var->current_mutex->lock();
464
 
      mysys_var->current_cond->notify_all();
 
435
      pthread_cond_broadcast(mysys_var->current_cond->native_handle());
465
436
      mysys_var->current_mutex->unlock();
466
437
    }
467
438
  }
522
493
                                variables.query_prealloc_size);
523
494
  transaction.xid_state.xid.null();
524
495
  transaction.xid_state.in_session=1;
525
 
  if (use_usage)
526
 
    resetUsage();
527
496
}
528
497
 
529
498
bool Session::initGlobals()
547
516
 
548
517
  prepareForQueries();
549
518
 
550
 
  while (not client->haveError() && getKilled() != KILL_CONNECTION)
 
519
  while (! client->haveError() && killed != KILL_CONNECTION)
551
520
  {
552
 
    if (not executeStatement())
 
521
    if (! executeStatement())
553
522
      break;
554
523
  }
555
524
 
556
525
  disconnect(0, true);
557
526
}
558
527
 
559
 
bool Session::schedule(Session::shared_ptr &arg)
 
528
bool Session::schedule()
560
529
{
561
 
  arg->scheduler= plugin::Scheduler::getScheduler();
562
 
  assert(arg->scheduler);
 
530
  scheduler= plugin::Scheduler::getScheduler();
 
531
  assert(scheduler);
563
532
 
564
533
  connection_count.increment();
565
534
 
569
538
  }
570
539
 
571
540
  current_global_counters.connections++;
572
 
  arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
573
 
 
574
 
  session::Cache::singleton().insert(arg);
575
 
 
576
 
  if (unlikely(plugin::EventObserver::connectSession(*arg)))
577
 
  {
578
 
    // We should do something about an error...
579
 
  }
580
 
 
581
 
  if (plugin::Scheduler::getScheduler()->addSession(arg))
582
 
  {
583
 
    DRIZZLE_CONNECTION_START(arg->getSessionId());
 
541
  thread_id= variables.pseudo_thread_id= global_thread_id++;
 
542
 
 
543
  LOCK_thread_count.lock();
 
544
  getSessionList().push_back(this);
 
545
  LOCK_thread_count.unlock();
 
546
 
 
547
  if (scheduler->addSession(this))
 
548
  {
 
549
    DRIZZLE_CONNECTION_START(thread_id);
584
550
    char error_message_buff[DRIZZLE_ERRMSG_SIZE];
585
551
 
586
 
    arg->setKilled(Session::KILL_CONNECTION);
 
552
    killed= Session::KILL_CONNECTION;
587
553
 
588
 
    arg->status_var.aborted_connects++;
 
554
    status_var.aborted_connects++;
589
555
 
590
556
    /* Can't use my_error() since store_globals has not been called. */
591
557
    /* TODO replace will better error message */
592
558
    snprintf(error_message_buff, sizeof(error_message_buff),
593
559
             ER(ER_CANT_CREATE_THREAD), 1);
594
 
    arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
595
 
 
 
560
    client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
596
561
    return true;
597
562
  }
598
563
 
600
565
}
601
566
 
602
567
 
603
 
/*
604
 
  Is this session viewable by the current user?
605
 
*/
606
 
bool Session::isViewable() const
607
 
{
608
 
  return plugin::Authorization::isAuthorized(current_session->getSecurityContext(),
609
 
                                             this,
610
 
                                             false);
611
 
}
612
 
 
613
 
 
614
 
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
 
568
const char* Session::enter_cond(boost::condition_variable &cond, boost::mutex &mutex, const char* msg)
615
569
{
616
570
  const char* old_msg = get_proc_info();
617
571
  safe_mutex_assert_owner(mutex);
630
584
    does a Session::awake() on you).
631
585
  */
632
586
  mysys_var->current_mutex->unlock();
633
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
 
587
  boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
634
588
  mysys_var->current_mutex = 0;
635
589
  mysys_var->current_cond = 0;
636
590
  this->set_proc_info(old_msg);
638
592
 
639
593
bool Session::authenticate()
640
594
{
641
 
  lex->start(this);
 
595
  lex_start(this);
642
596
  if (client->authenticate())
643
597
    return false;
644
598
 
647
601
  return true;
648
602
}
649
603
 
650
 
bool Session::checkUser(const std::string &passwd_str,
651
 
                        const std::string &in_db)
 
604
bool Session::checkUser(const char *passwd, uint32_t passwd_len, const char *in_db)
652
605
{
 
606
  const string passwd_str(passwd, passwd_len);
653
607
  bool is_authenticated=
654
608
    plugin::Authentication::isAuthenticated(getSecurityContext(),
655
609
                                            passwd_str);
662
616
  }
663
617
 
664
618
  /* Change database if necessary */
665
 
  if (not in_db.empty())
 
619
  if (in_db && in_db[0])
666
620
  {
667
621
    SchemaIdentifier identifier(in_db);
668
622
    if (mysql_change_db(this, identifier))
672
626
    }
673
627
  }
674
628
  my_ok();
675
 
  password= not passwd_str.empty();
 
629
  password= test(passwd_len);          // remember for error messages
676
630
 
677
631
  /* Ready to handle queries */
678
632
  return true;
694
648
  main_da.reset_diagnostics_area();
695
649
 
696
650
  if (client->readCommand(&l_packet, &packet_length) == false)
697
 
  {
698
651
    return false;
699
 
  }
700
652
 
701
 
  if (getKilled() == KILL_CONNECTION)
 
653
  if (killed == KILL_CONNECTION)
702
654
    return false;
703
655
 
704
656
  if (packet_length == 0)
705
657
    return true;
706
658
 
707
 
  l_command= static_cast<enum_server_command>(l_packet[0]);
 
659
  l_command= (enum enum_server_command) (unsigned char) l_packet[0];
708
660
 
709
661
  if (command >= COM_END)
710
662
    command= COM_END;                           // Wrong command
711
663
 
712
664
  assert(packet_length);
713
 
  return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
 
665
  return ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
714
666
}
715
667
 
716
668
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
722
674
    in_packet_length--;
723
675
  }
724
676
  const char *pos= in_packet + in_packet_length; /* Point at end null */
725
 
  while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
 
677
  while (in_packet_length > 0 &&
 
678
         (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
726
679
  {
727
680
    pos--;
728
681
    in_packet_length--;
729
682
  }
730
683
 
731
 
  std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
732
 
  // We can not be entirely sure _schema has a value
733
 
  if (_schema)
734
 
  {
735
 
    plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
736
 
  }
737
 
  query.reset(new_query);
738
 
  _state.reset(new State(in_packet, in_packet_length));
 
684
  query.assign(in_packet, in_packet + in_packet_length);
739
685
 
740
686
  return true;
741
687
}
790
736
  }
791
737
 
792
738
  if (result == false)
793
 
  {
794
739
    my_error(killed_errno(), MYF(0));
795
 
  }
796
740
  else if ((result == true) && do_release)
797
 
  {
798
 
    setKilled(Session::KILL_CONNECTION);
799
 
  }
 
741
    killed= Session::KILL_CONNECTION;
800
742
 
801
743
  return result;
802
744
}
867
809
  where= Session::DEFAULT_WHERE;
868
810
 
869
811
  /* Reset the temporary shares we built */
870
 
  for_each(temporary_shares.begin(),
871
 
           temporary_shares.end(),
872
 
           DeletePtr());
 
812
  for (std::vector<TableShareInstance *>::iterator iter= temporary_shares.begin();
 
813
       iter != temporary_shares.end(); iter++)
 
814
  {
 
815
    delete *iter;
 
816
  }
873
817
  temporary_shares.clear();
874
818
}
875
819
 
955
899
  my_message(errcode, err, MYF(0));
956
900
  if (file > 0)
957
901
  {
958
 
    (void) cache->end_io_cache();
 
902
    (void) end_io_cache(cache);
959
903
    (void) internal::my_close(file, MYF(0));
960
 
    (void) internal::my_delete(path.file_string().c_str(), MYF(0));             // Delete file on error
 
904
    (void) internal::my_delete(path, MYF(0));           // Delete file on error
961
905
    file= -1;
962
906
  }
963
907
}
965
909
 
966
910
bool select_to_file::send_eof()
967
911
{
968
 
  int error= test(cache->end_io_cache());
 
912
  int error= test(end_io_cache(cache));
969
913
  if (internal::my_close(file, MYF(MY_WME)))
970
914
    error= 1;
971
915
  if (!error)
987
931
  /* In case of error send_eof() may be not called: close the file here. */
988
932
  if (file >= 0)
989
933
  {
990
 
    (void) cache->end_io_cache();
 
934
    (void) end_io_cache(cache);
991
935
    (void) internal::my_close(file, MYF(0));
992
936
    file= -1;
993
937
  }
994
 
  path= "";
 
938
  path[0]= '\0';
995
939
  row_count= 0;
996
940
}
997
941
 
1001
945
    cache(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
1002
946
    row_count(0L)
1003
947
{
1004
 
  path= "";
 
948
  path[0]=0;
1005
949
}
1006
950
 
1007
951
select_to_file::~select_to_file()
1035
979
*/
1036
980
 
1037
981
 
1038
 
static int create_file(Session *session,
1039
 
                       fs::path &target_path,
1040
 
                       file_exchange *exchange,
1041
 
                       internal::IO_CACHE *cache)
 
982
static int create_file(Session *session, char *path, file_exchange *exchange, internal::IO_CACHE *cache)
1042
983
{
1043
 
  fs::path to_file(exchange->file_name);
1044
984
  int file;
1045
 
 
1046
 
  if (not to_file.has_root_directory())
 
985
  uint32_t option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
 
986
 
 
987
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
988
  option|= MY_REPLACE_DIR;                      // Force use of db directory
 
989
#endif
 
990
 
 
991
  if (!internal::dirname_length(exchange->file_name))
1047
992
  {
1048
 
    target_path= fs::system_complete(getDataHomeCatalog());
1049
 
    util::string::const_shared_ptr schema(session->schema());
1050
 
    if (schema and not schema->empty())
1051
 
    {
1052
 
      int count_elements= 0;
1053
 
      for (fs::path::iterator iter= to_file.begin();
1054
 
           iter != to_file.end();
1055
 
           ++iter, ++count_elements)
1056
 
      { }
1057
 
 
1058
 
      if (count_elements == 1)
1059
 
      {
1060
 
        target_path /= *schema;
1061
 
      }
1062
 
    }
1063
 
    target_path /= to_file;
 
993
    strcpy(path, data_home_real);
 
994
    if (! session->db.empty())
 
995
      strncat(path, session->db.c_str(), FN_REFLEN-strlen(data_home_real)-1);
 
996
    (void) internal::fn_format(path, exchange->file_name, path, "", option);
1064
997
  }
1065
998
  else
1066
 
  {
1067
 
    target_path = exchange->file_name;
1068
 
  }
1069
 
 
1070
 
  if (not secure_file_priv.string().empty())
1071
 
  {
1072
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
1073
 
    {
1074
 
      /* Write only allowed to dir or subdir specified by secure_file_priv */
1075
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1076
 
      return -1;
1077
 
    }
1078
 
  }
1079
 
 
1080
 
  if (!access(target_path.file_string().c_str(), F_OK))
 
999
    (void) internal::fn_format(path, exchange->file_name, data_home_real, "", option);
 
1000
 
 
1001
  if (opt_secure_file_priv &&
 
1002
      strncmp(opt_secure_file_priv, path, strlen(opt_secure_file_priv)))
 
1003
  {
 
1004
    /* Write only allowed to dir or subdir specified by secure_file_priv */
 
1005
    my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
1006
    return -1;
 
1007
  }
 
1008
 
 
1009
  if (!access(path, F_OK))
1081
1010
  {
1082
1011
    my_error(ER_FILE_EXISTS_ERROR, MYF(0), exchange->file_name);
1083
1012
    return -1;
1084
1013
  }
1085
1014
  /* Create the file world readable */
1086
 
  if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
 
1015
  if ((file= internal::my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1087
1016
    return file;
1088
1017
  (void) fchmod(file, 0666);                    // Because of umask()
1089
 
  if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
 
1018
  if (init_io_cache(cache, file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1090
1019
  {
1091
1020
    internal::my_close(file, MYF(0));
1092
 
    internal::my_delete(target_path.file_string().c_str(), MYF(0));  // Delete file on error, it was just created
 
1021
    internal::my_delete(path, MYF(0));  // Delete file on error, it was just created
1093
1022
    return -1;
1094
1023
  }
1095
1024
  return file;
1103
1032
  bool string_results= false, non_string_results= false;
1104
1033
  unit= u;
1105
1034
  if ((uint32_t) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
1106
 
  {
1107
 
    path= exchange->file_name;
1108
 
  }
 
1035
    strncpy(path,exchange->file_name,FN_REFLEN-1);
1109
1036
 
1110
1037
  /* Check if there is any blobs in data */
1111
1038
  {
1115
1042
    {
1116
1043
      if (item->max_length >= MAX_BLOB_WIDTH)
1117
1044
      {
1118
 
        blob_flag=1;
1119
 
        break;
 
1045
        blob_flag=1;
 
1046
        break;
1120
1047
      }
1121
 
 
1122
1048
      if (item->result_type() == STRING_RESULT)
1123
1049
        string_results= true;
1124
1050
      else
1169
1095
  if (unit->offset_limit_cnt)
1170
1096
  {                                             // using limit offset,count
1171
1097
    unit->offset_limit_cnt--;
1172
 
    return false;
 
1098
    return(0);
1173
1099
  }
1174
1100
  row_count++;
1175
1101
  Item *item;
1178
1104
 
1179
1105
  if (my_b_write(cache,(unsigned char*) exchange->line_start->ptr(),
1180
1106
                 exchange->line_start->length()))
1181
 
    return true;
1182
 
 
 
1107
    goto err;
1183
1108
  while ((item=li++))
1184
1109
  {
1185
1110
    Item_result result_type=item->result_type();
1190
1115
    {
1191
1116
      if (my_b_write(cache,(unsigned char*) exchange->enclosed->ptr(),
1192
1117
                     exchange->enclosed->length()))
1193
 
        return true;
 
1118
        goto err;
1194
1119
    }
1195
1120
    if (!res)
1196
1121
    {                                           // NULL
1201
1126
          null_buff[0]=escape_char;
1202
1127
          null_buff[1]='N';
1203
1128
          if (my_b_write(cache,(unsigned char*) null_buff,2))
1204
 
            return true;
 
1129
            goto err;
1205
1130
        }
1206
1131
        else if (my_b_write(cache,(unsigned char*) "NULL",4))
1207
 
          return true;
 
1132
          goto err;
1208
1133
      }
1209
1134
      else
1210
1135
      {
1214
1139
    else
1215
1140
    {
1216
1141
      if (fixed_row_size)
1217
 
        used_length= min(res->length(), static_cast<size_t>(item->max_length));
 
1142
        used_length= min(res->length(),item->max_length);
1218
1143
      else
1219
1144
        used_length= res->length();
1220
1145
 
1295
1220
            tmp_buff[1]= *pos ? *pos : '0';
1296
1221
            if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1297
1222
                my_b_write(cache,(unsigned char*) tmp_buff,2))
1298
 
              return true;
 
1223
              goto err;
1299
1224
            start=pos+1;
1300
1225
          }
1301
1226
        }
1302
1227
        if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1303
 
          return true;
 
1228
          goto err;
1304
1229
      }
1305
1230
      else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1306
 
        return true;
 
1231
        goto err;
1307
1232
    }
1308
1233
    if (fixed_row_size)
1309
1234
    {                                           // Fill with space
1319
1244
        for (; length > sizeof(space) ; length-=sizeof(space))
1320
1245
        {
1321
1246
          if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1322
 
            return true;
 
1247
            goto err;
1323
1248
        }
1324
1249
        if (my_b_write(cache,(unsigned char*) space,length))
1325
 
          return true;
 
1250
          goto err;
1326
1251
      }
1327
1252
    }
1328
1253
    if (res && enclosed)
1329
1254
    {
1330
1255
      if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
1331
1256
                     exchange->enclosed->length()))
1332
 
        return true;
 
1257
        goto err;
1333
1258
    }
1334
1259
    if (--items_left)
1335
1260
    {
1336
1261
      if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
1337
1262
                     field_term_length))
1338
 
        return true;
 
1263
        goto err;
1339
1264
    }
1340
1265
  }
1341
1266
  if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1342
1267
                 exchange->line_term->length()))
1343
 
  {
1344
 
    return true;
1345
 
  }
1346
 
 
1347
 
  return false;
 
1268
    goto err;
 
1269
  return(0);
 
1270
err:
 
1271
  return(1);
1348
1272
}
1349
1273
 
1350
1274
 
1377
1301
  if (row_count++ > 1)
1378
1302
  {
1379
1303
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
1380
 
    return 1;
 
1304
    goto err;
1381
1305
  }
1382
1306
  while ((item=li++))
1383
1307
  {
1385
1309
    if (!res)                                   // If NULL
1386
1310
    {
1387
1311
      if (my_b_write(cache,(unsigned char*) "",1))
1388
 
        return 1;
 
1312
        goto err;
1389
1313
    }
1390
1314
    else if (my_b_write(cache,(unsigned char*) res->ptr(),res->length()))
1391
1315
    {
1392
 
      my_error(ER_ERROR_ON_WRITE, MYF(0), path.file_string().c_str(), errno);
1393
 
      return 1;
 
1316
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, errno);
 
1317
      goto err;
1394
1318
    }
1395
1319
  }
1396
1320
  return(0);
 
1321
err:
 
1322
  return(1);
1397
1323
}
1398
1324
 
1399
1325
 
1451
1377
      switch (val_item->result_type())
1452
1378
      {
1453
1379
      case REAL_RESULT:
1454
 
        op= &select_max_min_finder_subselect::cmp_real;
1455
 
        break;
 
1380
        op= &select_max_min_finder_subselect::cmp_real;
 
1381
        break;
1456
1382
      case INT_RESULT:
1457
 
        op= &select_max_min_finder_subselect::cmp_int;
1458
 
        break;
 
1383
        op= &select_max_min_finder_subselect::cmp_int;
 
1384
        break;
1459
1385
      case STRING_RESULT:
1460
 
        op= &select_max_min_finder_subselect::cmp_str;
1461
 
        break;
 
1386
        op= &select_max_min_finder_subselect::cmp_str;
 
1387
        break;
1462
1388
      case DECIMAL_RESULT:
1463
1389
        op= &select_max_min_finder_subselect::cmp_decimal;
1464
1390
        break;
1465
1391
      case ROW_RESULT:
1466
1392
        // This case should never be choosen
1467
 
        assert(0);
1468
 
        op= 0;
 
1393
        assert(0);
 
1394
        op= 0;
1469
1395
      }
1470
1396
    }
1471
1397
    cache->store(val_item);
1554
1480
void Session::end_statement()
1555
1481
{
1556
1482
  /* Cleanup SQL processing state to reuse this statement in next query. */
1557
 
  lex->end();
 
1483
  lex_end(lex);
1558
1484
  query_cache_key= ""; // reset the cache key
1559
1485
  resetResultsetMessage();
1560
1486
}
1561
1487
 
1562
1488
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
1563
1489
{
1564
 
  assert(_schema);
1565
 
  if (_schema and _schema->empty())
1566
 
  {
1567
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1568
 
    return true;
1569
 
  }
1570
 
  else if (not _schema)
1571
 
  {
1572
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1573
 
    return true;
1574
 
  }
1575
 
  assert(_schema);
1576
 
 
1577
 
  *p_db= strmake(_schema->c_str(), _schema->size());
1578
 
  *p_db_length= _schema->size();
1579
 
 
 
1490
  if (db.empty())
 
1491
  {
 
1492
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
 
1493
    return true;
 
1494
  }
 
1495
  *p_db= strmake(db.c_str(), db.length());
 
1496
  *p_db_length= db.length();
1580
1497
  return false;
1581
1498
}
1582
1499
 
1616
1533
}
1617
1534
 
1618
1535
 
1619
 
void Session::set_db(const std::string &new_db)
 
1536
bool Session::set_db(const std::string &new_db)
1620
1537
{
1621
1538
  /* Do not reallocate memory if current chunk is big enough. */
1622
1539
  if (new_db.length())
1623
 
  {
1624
 
    _schema.reset(new std::string(new_db));
1625
 
  }
 
1540
    db= new_db;
1626
1541
  else
1627
 
  {
1628
 
    _schema.reset(new std::string(""));
1629
 
  }
1630
 
}
1631
 
 
 
1542
    db.clear();
 
1543
 
 
1544
  return false;
 
1545
}
 
1546
 
 
1547
 
 
1548
 
 
1549
 
 
1550
/**
 
1551
  Check the killed state of a user thread
 
1552
  @param session  user thread
 
1553
  @retval 0 the user thread is active
 
1554
  @retval 1 the user thread has been killed
 
1555
*/
 
1556
int session_killed(const Session *session)
 
1557
{
 
1558
  return(session->killed);
 
1559
}
 
1560
 
 
1561
 
 
1562
const struct charset_info_st *session_charset(Session *session)
 
1563
{
 
1564
  return(session->charset());
 
1565
}
1632
1566
 
1633
1567
/**
1634
1568
  Mark transaction to rollback and mark error as fatal to a sub-statement.
1651
1585
  plugin_sessionvar_cleanup(this);
1652
1586
 
1653
1587
  /* If necessary, log any aborted or unauthorized connections */
1654
 
  if (getKilled() || client->wasAborted())
 
1588
  if (killed || client->wasAborted())
1655
1589
  {
1656
1590
    status_var.aborted_threads++;
1657
1591
  }
1658
1592
 
1659
1593
  if (client->wasAborted())
1660
1594
  {
1661
 
    if (not getKilled() && variables.log_warnings > 1)
 
1595
    if (! killed && variables.log_warnings > 1)
1662
1596
    {
1663
1597
      SecurityContext *sctx= &security_ctx;
1664
1598
 
1665
1599
      errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1666
1600
                  , thread_id
1667
 
                  , (_schema->empty() ? "unconnected" : _schema->c_str())
 
1601
                  , (db.empty() ? "unconnected" : db.c_str())
1668
1602
                  , sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1669
1603
                  , sctx->getIp().c_str()
1670
1604
                  , (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1673
1607
 
1674
1608
  /* Close out our connection to the client */
1675
1609
  if (should_lock)
1676
 
    session::Cache::singleton().mutex().lock();
1677
 
 
1678
 
  setKilled(Session::KILL_CONNECTION);
1679
 
 
 
1610
    LOCK_thread_count.lock();
 
1611
  killed= Session::KILL_CONNECTION;
1680
1612
  if (client->isConnected())
1681
1613
  {
1682
1614
    if (errcode)
1686
1618
    }
1687
1619
    client->close();
1688
1620
  }
1689
 
 
1690
1621
  if (should_lock)
1691
 
  {
1692
 
    session::Cache::singleton().mutex().unlock();
1693
 
  }
 
1622
    (void) LOCK_thread_count.unlock();
1694
1623
}
1695
1624
 
1696
1625
void Session::reset_for_next_command()
1718
1647
  Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1719
1648
*/
1720
1649
 
1721
 
void Open_tables_state::close_temporary_tables()
 
1650
void Session::close_temporary_tables()
1722
1651
{
1723
1652
  Table *table;
1724
1653
  Table *tmp_next;
1738
1667
  unlink from session->temporary tables and close temporary table
1739
1668
*/
1740
1669
 
1741
 
void Open_tables_state::close_temporary_table(Table *table)
 
1670
void Session::close_temporary_table(Table *table)
1742
1671
{
1743
1672
  if (table->getPrev())
1744
1673
  {
1774
1703
  If this is needed, use close_temporary_table()
1775
1704
*/
1776
1705
 
1777
 
void Open_tables_state::nukeTable(Table *table)
 
1706
void Session::nukeTable(Table *table)
1778
1707
{
1779
1708
  plugin::StorageEngine *table_type= table->getShare()->db_type();
1780
1709
 
1805
1734
 
1806
1735
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1807
1736
{
1808
 
  return getVariable(std::string(name.str, name.length), create_if_not_exists);
1809
 
}
1810
 
 
1811
 
user_var_entry *Session::getVariable(const std::string  &name, bool create_if_not_exists)
1812
 
{
1813
 
  UserVarsRange ppp= user_vars.equal_range(name);
 
1737
  user_var_entry *entry= NULL;
 
1738
  UserVarsRange ppp= user_vars.equal_range(std::string(name.str, name.length));
1814
1739
 
1815
1740
  for (UserVars::iterator iter= ppp.first;
1816
 
       iter != ppp.second; ++iter)
 
1741
         iter != ppp.second; ++iter)
1817
1742
  {
1818
 
    return (*iter).second;
 
1743
    entry= (*iter).second;
1819
1744
  }
1820
1745
 
1821
 
  if (not create_if_not_exists)
1822
 
    return NULL;
1823
 
 
1824
 
  user_var_entry *entry= NULL;
1825
 
  entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1826
 
 
1827
 
  if (entry == NULL)
1828
 
    return NULL;
1829
 
 
1830
 
  std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1831
 
 
1832
 
  if (not returnable.second)
 
1746
  if ((entry == NULL) && create_if_not_exists)
1833
1747
  {
1834
 
    delete entry;
 
1748
    entry= new (nothrow) user_var_entry(name.str, query_id);
 
1749
 
 
1750
    if (entry == NULL)
 
1751
      return NULL;
 
1752
 
 
1753
    std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(std::string(name.str, name.length), entry));
 
1754
 
 
1755
    if (not returnable.second)
 
1756
    {
 
1757
      delete entry;
 
1758
      return NULL;
 
1759
    }
1835
1760
  }
1836
1761
 
1837
1762
  return entry;
1838
1763
}
1839
1764
 
1840
 
void Session::setVariable(const std::string &name, const std::string &value)
1841
 
{
1842
 
  user_var_entry *updateable_var= getVariable(name.c_str(), true);
1843
 
 
1844
 
  updateable_var->update_hash(false,
1845
 
                              (void*)value.c_str(),
1846
 
                              static_cast<uint32_t>(value.length()), STRING_RESULT,
1847
 
                              &my_charset_bin,
1848
 
                              DERIVATION_IMPLICIT, false);
1849
 
}
1850
 
 
1851
 
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
 
1765
void Session::mark_temp_tables_as_free_for_reuse()
1852
1766
{
1853
1767
  for (Table *table= temporary_tables ; table ; table= table->getNext())
1854
1768
  {
1855
 
    if (table->query_id == getQueryId())
 
1769
    if (table->query_id == query_id)
1856
1770
    {
1857
1771
      table->query_id= 0;
1858
1772
      table->cursor->ha_reset();
1864
1778
{
1865
1779
  for (; table ; table= table->getNext())
1866
1780
  {
1867
 
    if (table->query_id == getQueryId())
 
1781
    if (table->query_id == query_id)
1868
1782
    {
1869
1783
      table->query_id= 0;
1870
1784
      table->cursor->ha_reset();
1883
1797
*/
1884
1798
void Session::close_thread_tables()
1885
1799
{
1886
 
  clearDerivedTables();
 
1800
  if (derived_tables)
 
1801
    derived_tables= NULL; // They should all be invalid by this point
1887
1802
 
1888
1803
  /*
1889
1804
    Mark all temporary tables used by this statement as free for reuse.
1916
1831
      handled either before writing a query log event (inside
1917
1832
      binlog_query()) or when preparing a pending event.
1918
1833
     */
1919
 
    unlockTables(lock);
 
1834
    mysql_unlock_tables(this, lock);
1920
1835
    lock= 0;
1921
1836
  }
1922
1837
  /*
1923
 
    Note that we need to hold table::Cache::singleton().mutex() while changing the
 
1838
    Note that we need to hold LOCK_open while changing the
1924
1839
    open_tables list. Another thread may work on it.
1925
 
    (See: table::Cache::singleton().removeTable(), mysql_wait_completed_table())
 
1840
    (See: remove_table_from_cache(), mysql_wait_completed_table())
1926
1841
    Closing a MERGE child before the parent would be fatal if the
1927
1842
    other thread tries to abort the MERGE lock in between.
1928
1843
  */
1961
1876
    close_tables_for_reopen(&tables);
1962
1877
  }
1963
1878
  if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
1964
 
       (
 
1879
       (fill_derived_tables() &&
1965
1880
        mysql_handle_derived(lex, &mysql_derived_filling))))
1966
1881
    return true;
1967
1882
 
1968
1883
  return false;
1969
1884
}
1970
1885
 
 
1886
bool Session::openTables(TableList *tables, uint32_t flags)
 
1887
{
 
1888
  uint32_t counter;
 
1889
  bool ret= fill_derived_tables();
 
1890
  assert(ret == false);
 
1891
  if (open_tables_from_list(&tables, &counter, flags) ||
 
1892
      mysql_handle_derived(lex, &mysql_derived_prepare))
 
1893
  {
 
1894
    return true;
 
1895
  }
 
1896
  return false;
 
1897
}
 
1898
 
1971
1899
/*
1972
1900
  @note "best_effort" is used in cases were if a failure occurred on this
1973
1901
  operation it would not be surprising because we are only removing because there
1974
1902
  might be an issue (lame engines).
1975
1903
*/
1976
1904
 
1977
 
bool Open_tables_state::rm_temporary_table(const TableIdentifier &identifier, bool best_effort)
 
1905
bool Session::rm_temporary_table(TableIdentifier &identifier, bool best_effort)
1978
1906
{
1979
 
  if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
 
1907
  if (plugin::StorageEngine::dropTable(*this, identifier))
1980
1908
  {
1981
1909
    if (not best_effort)
1982
1910
    {
1983
 
      std::string path;
1984
 
      identifier.getSQLPath(path);
1985
1911
      errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1986
 
                    path.c_str(), errno);
 
1912
                    identifier.getSQLPath().c_str(), errno);
1987
1913
    }
1988
1914
 
1989
1915
    return true;
1992
1918
  return false;
1993
1919
}
1994
1920
 
1995
 
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const TableIdentifier &identifier)
 
1921
bool Session::rm_temporary_table(plugin::StorageEngine *base, TableIdentifier &identifier)
1996
1922
{
1997
1923
  assert(base);
1998
1924
 
1999
 
  if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier))
 
1925
  if (plugin::StorageEngine::dropTable(*this, *base, identifier))
2000
1926
  {
2001
 
    std::string path;
2002
 
    identifier.getSQLPath(path);
2003
1927
    errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
2004
 
                  path.c_str(), errno);
 
1928
                  identifier.getSQLPath().c_str(), errno);
2005
1929
 
2006
1930
    return true;
2007
1931
  }
2013
1937
  @note this will be removed, I am looking through Hudson to see if it is finding
2014
1938
  any tables that are missed during cleanup.
2015
1939
*/
2016
 
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
 
1940
void Session::dumpTemporaryTableNames(const char *foo)
2017
1941
{
2018
1942
  Table *table;
2019
1943
 
2037
1961
      cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2038
1962
    }
2039
1963
    else
2040
 
    {
2041
1964
      cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2042
 
    }
2043
1965
  }
2044
1966
}
2045
1967
 
2046
 
bool Session::TableMessages::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
 
1968
bool Session::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2047
1969
{
2048
1970
  table_message_cache.insert(make_pair(identifier.getPath(), table_message));
2049
1971
 
2050
1972
  return true;
2051
1973
}
2052
1974
 
2053
 
bool Session::TableMessages::removeTableMessage(const TableIdentifier &identifier)
 
1975
bool Session::removeTableMessage(const TableIdentifier &identifier)
2054
1976
{
2055
1977
  TableMessageCache::iterator iter;
2056
1978
 
2064
1986
  return true;
2065
1987
}
2066
1988
 
2067
 
bool Session::TableMessages::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
 
1989
bool Session::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2068
1990
{
2069
1991
  TableMessageCache::iterator iter;
2070
1992
 
2078
2000
  return true;
2079
2001
}
2080
2002
 
2081
 
bool Session::TableMessages::doesTableMessageExist(const TableIdentifier &identifier)
 
2003
bool Session::doesTableMessageExist(const TableIdentifier &identifier)
2082
2004
{
2083
2005
  TableMessageCache::iterator iter;
2084
2006
 
2092
2014
  return true;
2093
2015
}
2094
2016
 
2095
 
bool Session::TableMessages::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
 
2017
bool Session::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
2096
2018
{
2097
2019
  TableMessageCache::iterator iter;
2098
2020
 
2111
2033
  return true;
2112
2034
}
2113
2035
 
2114
 
table::Instance *Session::getInstanceTable()
2115
 
{
2116
 
  temporary_shares.push_back(new table::Instance()); // This will not go into the tableshare cache, so no key is used.
2117
 
 
2118
 
  table::Instance *tmp_share= temporary_shares.back();
2119
 
 
2120
 
  assert(tmp_share);
2121
 
 
2122
 
  return tmp_share;
2123
 
}
2124
 
 
2125
 
 
2126
 
/**
2127
 
  Create a reduced Table object with properly set up Field list from a
2128
 
  list of field definitions.
2129
 
 
2130
 
    The created table doesn't have a table Cursor associated with
2131
 
    it, has no keys, no group/distinct, no copy_funcs array.
2132
 
    The sole purpose of this Table object is to use the power of Field
2133
 
    class to read/write data to/from table->getInsertRecord(). Then one can store
2134
 
    the record in any container (RB tree, hash, etc).
2135
 
    The table is created in Session mem_root, so are the table's fields.
2136
 
    Consequently, if you don't BLOB fields, you don't need to free it.
2137
 
 
2138
 
  @param session         connection handle
2139
 
  @param field_list  list of column definitions
2140
 
 
2141
 
  @return
2142
 
    0 if out of memory, Table object in case of success
2143
 
*/
2144
 
table::Instance *Session::getInstanceTable(List<CreateField> &field_list)
2145
 
{
2146
 
  temporary_shares.push_back(new table::Instance(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2147
 
 
2148
 
  table::Instance *tmp_share= temporary_shares.back();
2149
 
 
2150
 
  assert(tmp_share);
2151
 
 
2152
 
  return tmp_share;
2153
 
}
2154
 
 
2155
 
namespace display  {
2156
 
 
2157
 
static const std::string NONE= "NONE";
2158
 
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2159
 
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2160
 
 
2161
 
const std::string &type(drizzled::Session::global_read_lock_t type)
2162
 
{
2163
 
  switch (type) {
2164
 
    default:
2165
 
    case Session::NONE:
2166
 
      return NONE;
2167
 
    case Session::GOT_GLOBAL_READ_LOCK:
2168
 
      return GOT_GLOBAL_READ_LOCK;
2169
 
    case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2170
 
      return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2171
 
  }
2172
 
}
2173
 
 
2174
 
size_t max_string_length(drizzled::Session::global_read_lock_t)
2175
 
{
2176
 
  return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2177
 
}
2178
 
 
2179
 
} /* namespace display */
 
2036
TableShareInstance *Session::getTemporaryShare(TableIdentifier::Type type_arg)
 
2037
{
 
2038
  temporary_shares.push_back(new TableShareInstance(type_arg)); // This will not go into the tableshare cache, so no key is used.
 
2039
 
 
2040
  TableShareInstance *tmp_share= temporary_shares.back();
 
2041
 
 
2042
  assert(tmp_share);
 
2043
 
 
2044
  return tmp_share;
 
2045
}
2180
2046
 
2181
2047
} /* namespace drizzled */