~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/session.cc

  • Committer: Brian Aker
  • Date: 2010-11-06 15:43:10 UTC
  • mfrom: (1908.1.1 merge)
  • Revision ID: brian@tangent.org-20101106154310-g1jpjzwbc53pfc4f
Filesort encapsulation, plus modification to copy contructor

Show diffs side-by-side

added added

removed removed

Lines of Context:
57
57
 
58
58
#include "drizzled/util/functors.h"
59
59
 
60
 
#include "drizzled/display.h"
61
 
 
62
60
#include <fcntl.h>
63
61
#include <algorithm>
64
62
#include <climits>
65
63
#include <boost/filesystem.hpp>
66
64
 
67
 
#include "drizzled/util/backtrace.h"
68
 
 
69
65
using namespace std;
70
66
 
71
67
namespace fs=boost::filesystem;
163
159
  mem_root(&main_mem_root),
164
160
  xa_id(0),
165
161
  lex(&main_lex),
166
 
  query(new std::string),
167
162
  catalog("LOCAL"),
168
163
  client(client_arg),
169
164
  scheduler(NULL),
171
166
  lock_id(&main_lock_id),
172
167
  user_time(0),
173
168
  ha_data(plugin::num_trx_monitored_objects),
174
 
  concurrent_execute_allowed(true),
175
169
  arg_of_last_insert_id_function(false),
176
170
  first_successful_insert_id_in_prev_stmt(0),
177
171
  first_successful_insert_id_in_cur_stmt(0),
178
172
  limit_found_rows(0),
179
 
  _global_read_lock(NONE),
180
 
  _killed(NOT_KILLED),
 
173
  global_read_lock(0),
181
174
  some_tables_deleted(false),
182
175
  no_errors(false),
183
176
  password(false),
193
186
  session_event_observers(NULL),
194
187
  use_usage(false)
195
188
{
 
189
  memset(process_list_info, 0, PROCESS_LIST_WIDTH);
196
190
  client->setSession(this);
197
191
 
198
192
  /*
203
197
  memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
204
198
  thread_stack= NULL;
205
199
  count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
200
  killed= NOT_KILLED;
206
201
  col_access= 0;
207
202
  tmp_table= 0;
208
203
  used_tables= 0;
337
332
{
338
333
  assert(cleanup_done == false);
339
334
 
340
 
  setKilled(KILL_CONNECTION);
 
335
  killed= KILL_CONNECTION;
341
336
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
342
337
  if (transaction.xid_state.xa_state == XA_PREPARED)
343
338
  {
363
358
  close_temporary_tables();
364
359
 
365
360
  if (global_read_lock)
366
 
  {
367
 
    unlockGlobalReadLock();
368
 
  }
 
361
    unlock_global_read_lock(this);
369
362
 
370
363
  cleanup_done= true;
371
364
}
410
403
    delete (*iter).second;
411
404
  }
412
405
  life_properties.clear();
413
 
}
414
 
 
415
 
void Session::setClient(plugin::Client *client_arg)
416
 
{
417
 
  client= client_arg;
418
 
  client->setSession(this);
419
 
}
420
 
 
421
 
void Session::awake(Session::killed_state_t state_to_set)
 
406
 
 
407
  /* Ensure that no one is using Session */
 
408
  LOCK_delete.unlock();
 
409
}
 
410
 
 
411
void Session::awake(Session::killed_state state_to_set)
422
412
{
423
413
  this->checkSentry();
424
 
 
425
 
  setKilled(state_to_set);
426
 
  scheduler->killSession(this);
427
 
 
 
414
  safe_mutex_assert_owner(&LOCK_delete);
 
415
 
 
416
  killed= state_to_set;
428
417
  if (state_to_set != Session::KILL_QUERY)
429
418
  {
 
419
    scheduler->killSession(this);
430
420
    DRIZZLE_CONNECTION_DONE(thread_id);
431
421
  }
432
 
 
433
422
  if (mysys_var)
434
423
  {
435
424
    boost_unique_lock_t scopedLock(mysys_var->mutex);
542
531
 
543
532
  prepareForQueries();
544
533
 
545
 
  while (not client->haveError() && getKilled() != KILL_CONNECTION)
 
534
  while (! client->haveError() && killed != KILL_CONNECTION)
546
535
  {
547
 
    if (not executeStatement())
 
536
    if (! executeStatement())
548
537
      break;
549
538
  }
550
539
 
551
540
  disconnect(0, true);
552
541
}
553
542
 
554
 
bool Session::schedule(Session::shared_ptr &arg)
 
543
bool Session::schedule()
555
544
{
556
 
  arg->scheduler= plugin::Scheduler::getScheduler();
557
 
  assert(arg->scheduler);
 
545
  scheduler= plugin::Scheduler::getScheduler();
 
546
  assert(scheduler);
558
547
 
559
548
  connection_count.increment();
560
549
 
564
553
  }
565
554
 
566
555
  current_global_counters.connections++;
567
 
  arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
568
 
 
569
 
  session::Cache::singleton().insert(arg);
570
 
 
571
 
  if (unlikely(plugin::EventObserver::connectSession(*arg)))
572
 
  {
573
 
    // We should do something about an error...
574
 
  }
575
 
 
576
 
  if (plugin::Scheduler::getScheduler()->addSession(arg))
577
 
  {
578
 
    DRIZZLE_CONNECTION_START(arg->getSessionId());
 
556
  thread_id= variables.pseudo_thread_id= global_thread_id++;
 
557
 
 
558
  {
 
559
    boost::mutex::scoped_lock scoped(LOCK_thread_count);
 
560
    getSessionList().push_back(this);
 
561
  }
 
562
 
 
563
  if (unlikely(plugin::EventObserver::connectSession(*this)))
 
564
  {
 
565
    // We should do something about an error...
 
566
  }
 
567
 
 
568
  if (unlikely(plugin::EventObserver::connectSession(*this)))
 
569
  {
 
570
    // We should do something about an error...
 
571
  }
 
572
 
 
573
  if (scheduler->addSession(this))
 
574
  {
 
575
    DRIZZLE_CONNECTION_START(thread_id);
579
576
    char error_message_buff[DRIZZLE_ERRMSG_SIZE];
580
577
 
581
 
    arg->setKilled(Session::KILL_CONNECTION);
 
578
    killed= Session::KILL_CONNECTION;
582
579
 
583
 
    arg->status_var.aborted_connects++;
 
580
    status_var.aborted_connects++;
584
581
 
585
582
    /* Can't use my_error() since store_globals has not been called. */
586
583
    /* TODO replace will better error message */
587
584
    snprintf(error_message_buff, sizeof(error_message_buff),
588
585
             ER(ER_CANT_CREATE_THREAD), 1);
589
 
    arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
590
 
 
 
586
    client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
591
587
    return true;
592
588
  }
593
589
 
595
591
}
596
592
 
597
593
 
598
 
/*
599
 
  Is this session viewable by the current user?
600
 
*/
601
 
bool Session::isViewable() const
602
 
{
603
 
  return plugin::Authorization::isAuthorized(current_session->getSecurityContext(),
604
 
                                             this,
605
 
                                             false);
606
 
}
607
 
 
608
 
 
609
594
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
610
595
{
611
596
  const char* old_msg = get_proc_info();
689
674
  main_da.reset_diagnostics_area();
690
675
 
691
676
  if (client->readCommand(&l_packet, &packet_length) == false)
692
 
  {
693
677
    return false;
694
 
  }
695
678
 
696
 
  if (getKilled() == KILL_CONNECTION)
 
679
  if (killed == KILL_CONNECTION)
697
680
    return false;
698
681
 
699
682
  if (packet_length == 0)
700
683
    return true;
701
684
 
702
 
  l_command= static_cast<enum_server_command>(l_packet[0]);
 
685
  l_command= (enum enum_server_command) (unsigned char) l_packet[0];
703
686
 
704
687
  if (command >= COM_END)
705
688
    command= COM_END;                           // Wrong command
706
689
 
707
690
  assert(packet_length);
708
 
  return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
 
691
  return ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
709
692
}
710
693
 
711
694
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
717
700
    in_packet_length--;
718
701
  }
719
702
  const char *pos= in_packet + in_packet_length; /* Point at end null */
720
 
  while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
 
703
  while (in_packet_length > 0 &&
 
704
         (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
721
705
  {
722
706
    pos--;
723
707
    in_packet_length--;
724
708
  }
725
709
 
726
 
  query.reset(new std::string(in_packet, in_packet + in_packet_length));
 
710
  query.assign(in_packet, in_packet + in_packet_length);
727
711
 
728
712
  return true;
729
713
}
778
762
  }
779
763
 
780
764
  if (result == false)
781
 
  {
782
765
    my_error(killed_errno(), MYF(0));
783
 
  }
784
766
  else if ((result == true) && do_release)
785
 
  {
786
 
    setKilled(Session::KILL_CONNECTION);
787
 
  }
 
767
    killed= Session::KILL_CONNECTION;
788
768
 
789
769
  return result;
790
770
}
943
923
  my_message(errcode, err, MYF(0));
944
924
  if (file > 0)
945
925
  {
946
 
    (void) cache->end_io_cache();
 
926
    (void) end_io_cache(cache);
947
927
    (void) internal::my_close(file, MYF(0));
948
928
    (void) internal::my_delete(path.file_string().c_str(), MYF(0));             // Delete file on error
949
929
    file= -1;
953
933
 
954
934
bool select_to_file::send_eof()
955
935
{
956
 
  int error= test(cache->end_io_cache());
 
936
  int error= test(end_io_cache(cache));
957
937
  if (internal::my_close(file, MYF(MY_WME)))
958
938
    error= 1;
959
939
  if (!error)
975
955
  /* In case of error send_eof() may be not called: close the file here. */
976
956
  if (file >= 0)
977
957
  {
978
 
    (void) cache->end_io_cache();
 
958
    (void) end_io_cache(cache);
979
959
    (void) internal::my_close(file, MYF(0));
980
960
    file= -1;
981
961
  }
1073
1053
  if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1074
1054
    return file;
1075
1055
  (void) fchmod(file, 0666);                    // Because of umask()
1076
 
  if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
 
1056
  if (init_io_cache(cache, file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1077
1057
  {
1078
1058
    internal::my_close(file, MYF(0));
1079
1059
    internal::my_delete(target_path.file_string().c_str(), MYF(0));  // Delete file on error, it was just created
1605
1585
}
1606
1586
 
1607
1587
 
 
1588
 
 
1589
 
 
1590
/**
 
1591
  Check the killed state of a user thread
 
1592
  @param session  user thread
 
1593
  @retval 0 the user thread is active
 
1594
  @retval 1 the user thread has been killed
 
1595
*/
 
1596
int session_killed(const Session *session)
 
1597
{
 
1598
  return(session->killed);
 
1599
}
 
1600
 
 
1601
 
 
1602
const struct charset_info_st *session_charset(Session *session)
 
1603
{
 
1604
  return(session->charset());
 
1605
}
 
1606
 
1608
1607
/**
1609
1608
  Mark transaction to rollback and mark error as fatal to a sub-statement.
1610
1609
 
1626
1625
  plugin_sessionvar_cleanup(this);
1627
1626
 
1628
1627
  /* If necessary, log any aborted or unauthorized connections */
1629
 
  if (getKilled() || client->wasAborted())
 
1628
  if (killed || client->wasAborted())
1630
1629
  {
1631
1630
    status_var.aborted_threads++;
1632
1631
  }
1633
1632
 
1634
1633
  if (client->wasAborted())
1635
1634
  {
1636
 
    if (not getKilled() && variables.log_warnings > 1)
 
1635
    if (! killed && variables.log_warnings > 1)
1637
1636
    {
1638
1637
      SecurityContext *sctx= &security_ctx;
1639
1638
 
1648
1647
 
1649
1648
  /* Close out our connection to the client */
1650
1649
  if (should_lock)
1651
 
    session::Cache::singleton().mutex().lock();
1652
 
 
1653
 
  setKilled(Session::KILL_CONNECTION);
1654
 
 
 
1650
    LOCK_thread_count.lock();
 
1651
  killed= Session::KILL_CONNECTION;
1655
1652
  if (client->isConnected())
1656
1653
  {
1657
1654
    if (errcode)
1661
1658
    }
1662
1659
    client->close();
1663
1660
  }
1664
 
 
1665
1661
  if (should_lock)
1666
 
  {
1667
 
    session::Cache::singleton().mutex().unlock();
1668
 
  }
 
1662
    (void) LOCK_thread_count.unlock();
1669
1663
}
1670
1664
 
1671
1665
void Session::reset_for_next_command()
1693
1687
  Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1694
1688
*/
1695
1689
 
1696
 
void Open_tables_state::close_temporary_tables()
 
1690
void Session::close_temporary_tables()
1697
1691
{
1698
1692
  Table *table;
1699
1693
  Table *tmp_next;
1713
1707
  unlink from session->temporary tables and close temporary table
1714
1708
*/
1715
1709
 
1716
 
void Open_tables_state::close_temporary_table(Table *table)
 
1710
void Session::close_temporary_table(Table *table)
1717
1711
{
1718
1712
  if (table->getPrev())
1719
1713
  {
1749
1743
  If this is needed, use close_temporary_table()
1750
1744
*/
1751
1745
 
1752
 
void Open_tables_state::nukeTable(Table *table)
 
1746
void Session::nukeTable(Table *table)
1753
1747
{
1754
1748
  plugin::StorageEngine *table_type= table->getShare()->db_type();
1755
1749
 
1823
1817
                              DERIVATION_IMPLICIT, false);
1824
1818
}
1825
1819
 
1826
 
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
 
1820
void Session::mark_temp_tables_as_free_for_reuse()
1827
1821
{
1828
1822
  for (Table *table= temporary_tables ; table ; table= table->getNext())
1829
1823
  {
1830
 
    if (table->query_id == getQueryId())
 
1824
    if (table->query_id == query_id)
1831
1825
    {
1832
1826
      table->query_id= 0;
1833
1827
      table->cursor->ha_reset();
1839
1833
{
1840
1834
  for (; table ; table= table->getNext())
1841
1835
  {
1842
 
    if (table->query_id == getQueryId())
 
1836
    if (table->query_id == query_id)
1843
1837
    {
1844
1838
      table->query_id= 0;
1845
1839
      table->cursor->ha_reset();
1858
1852
*/
1859
1853
void Session::close_thread_tables()
1860
1854
{
1861
 
  clearDerivedTables();
 
1855
  if (derived_tables)
 
1856
    derived_tables= NULL; // They should all be invalid by this point
1862
1857
 
1863
1858
  /*
1864
1859
    Mark all temporary tables used by this statement as free for reuse.
1891
1886
      handled either before writing a query log event (inside
1892
1887
      binlog_query()) or when preparing a pending event.
1893
1888
     */
1894
 
    unlockTables(lock);
 
1889
    mysql_unlock_tables(this, lock);
1895
1890
    lock= 0;
1896
1891
  }
1897
1892
  /*
1898
 
    Note that we need to hold table::Cache::singleton().mutex() while changing the
 
1893
    Note that we need to hold LOCK_open while changing the
1899
1894
    open_tables list. Another thread may work on it.
1900
1895
    (See: table::Cache::singleton().removeTable(), mysql_wait_completed_table())
1901
1896
    Closing a MERGE child before the parent would be fatal if the
1949
1944
  might be an issue (lame engines).
1950
1945
*/
1951
1946
 
1952
 
bool Open_tables_state::rm_temporary_table(const TableIdentifier &identifier, bool best_effort)
 
1947
bool Session::rm_temporary_table(TableIdentifier &identifier, bool best_effort)
1953
1948
{
1954
 
  if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
 
1949
  if (plugin::StorageEngine::dropTable(*this, identifier))
1955
1950
  {
1956
1951
    if (not best_effort)
1957
1952
    {
1958
 
      std::string path;
1959
 
      identifier.getSQLPath(path);
1960
1953
      errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1961
 
                    path.c_str(), errno);
 
1954
                    identifier.getSQLPath().c_str(), errno);
1962
1955
    }
1963
1956
 
1964
1957
    return true;
1967
1960
  return false;
1968
1961
}
1969
1962
 
1970
 
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const TableIdentifier &identifier)
 
1963
bool Session::rm_temporary_table(plugin::StorageEngine *base, TableIdentifier &identifier)
1971
1964
{
1972
1965
  assert(base);
1973
1966
 
1974
 
  if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier))
 
1967
  if (plugin::StorageEngine::dropTable(*this, *base, identifier))
1975
1968
  {
1976
 
    std::string path;
1977
 
    identifier.getSQLPath(path);
1978
1969
    errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1979
 
                  path.c_str(), errno);
 
1970
                  identifier.getSQLPath().c_str(), errno);
1980
1971
 
1981
1972
    return true;
1982
1973
  }
1988
1979
  @note this will be removed, I am looking through Hudson to see if it is finding
1989
1980
  any tables that are missed during cleanup.
1990
1981
*/
1991
 
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
 
1982
void Session::dumpTemporaryTableNames(const char *foo)
1992
1983
{
1993
1984
  Table *table;
1994
1985
 
2016
2007
  }
2017
2008
}
2018
2009
 
2019
 
bool Session::TableMessages::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
 
2010
bool Session::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2020
2011
{
2021
2012
  table_message_cache.insert(make_pair(identifier.getPath(), table_message));
2022
2013
 
2023
2014
  return true;
2024
2015
}
2025
2016
 
2026
 
bool Session::TableMessages::removeTableMessage(const TableIdentifier &identifier)
 
2017
bool Session::removeTableMessage(const TableIdentifier &identifier)
2027
2018
{
2028
2019
  TableMessageCache::iterator iter;
2029
2020
 
2037
2028
  return true;
2038
2029
}
2039
2030
 
2040
 
bool Session::TableMessages::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
 
2031
bool Session::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2041
2032
{
2042
2033
  TableMessageCache::iterator iter;
2043
2034
 
2051
2042
  return true;
2052
2043
}
2053
2044
 
2054
 
bool Session::TableMessages::doesTableMessageExist(const TableIdentifier &identifier)
 
2045
bool Session::doesTableMessageExist(const TableIdentifier &identifier)
2055
2046
{
2056
2047
  TableMessageCache::iterator iter;
2057
2048
 
2065
2056
  return true;
2066
2057
}
2067
2058
 
2068
 
bool Session::TableMessages::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
 
2059
bool Session::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
2069
2060
{
2070
2061
  TableMessageCache::iterator iter;
2071
2062
 
2125
2116
  return tmp_share;
2126
2117
}
2127
2118
 
2128
 
namespace display  {
2129
 
 
2130
 
static const std::string NONE= "NONE";
2131
 
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2132
 
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2133
 
 
2134
 
const std::string &type(drizzled::Session::global_read_lock_t type)
2135
 
{
2136
 
  switch (type) {
2137
 
    default:
2138
 
    case Session::NONE:
2139
 
      return NONE;
2140
 
    case Session::GOT_GLOBAL_READ_LOCK:
2141
 
      return GOT_GLOBAL_READ_LOCK;
2142
 
    case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2143
 
      return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2144
 
  }
2145
 
}
2146
 
 
2147
 
size_t max_string_length(drizzled::Session::global_read_lock_t)
2148
 
{
2149
 
  return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2150
 
}
2151
 
 
2152
 
} /* namespace display */
2153
 
 
2154
2119
} /* namespace drizzled */