~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/session.cc

  • Committer: Monty Taylor
  • Date: 2010-01-12 21:34:24 UTC
  • mto: This revision was merged to the branch mainline in revision 1268.
  • Revision ID: mordred@inaugust.com-20100112213424-6mslywtlca49mvnk
Updated to pandora-buld v0.94

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems, Inc.
 
4
 *  Copyright (C) 2008 Sun Microsystems
5
5
 *
6
6
 *  This program is free software; you can redistribute it and/or modify
7
7
 *  it under the terms of the GNU General Public License as published by
21
21
 * @file Implementation of the Session class and API
22
22
 */
23
23
 
24
 
#include <config.h>
25
 
 
26
 
#include <drizzled/copy_field.h>
27
 
#include <drizzled/data_home.h>
28
 
#include <drizzled/display.h>
29
 
#include <drizzled/drizzled.h>
 
24
#include "config.h"
 
25
#include <drizzled/session.h>
 
26
#include "drizzled/session_list.h"
 
27
#include <sys/stat.h>
 
28
#include "drizzled/my_error.h"
30
29
#include <drizzled/error.h>
31
30
#include <drizzled/gettext.h>
32
 
#include <drizzled/identifier.h>
33
 
#include <drizzled/internal/iocache.h>
34
 
#include <drizzled/internal/thread_var.h>
35
 
#include <drizzled/internal_error_handler.h>
 
31
#include <drizzled/query_id.h>
 
32
#include <drizzled/data_home.h>
 
33
#include <drizzled/sql_base.h>
 
34
#include <drizzled/lock.h>
36
35
#include <drizzled/item/cache.h>
37
 
#include <drizzled/item/empty_string.h>
38
36
#include <drizzled/item/float.h>
39
37
#include <drizzled/item/return_int.h>
40
 
#include <drizzled/lock.h>
41
 
#include <drizzled/plugin/authentication.h>
 
38
#include <drizzled/item/empty_string.h>
 
39
#include <drizzled/show.h>
42
40
#include <drizzled/plugin/client.h>
43
 
#include <drizzled/plugin/event_observer.h>
44
 
#include <drizzled/plugin/logging.h>
45
 
#include <drizzled/plugin/query_rewrite.h>
46
 
#include <drizzled/plugin/scheduler.h>
47
 
#include <drizzled/plugin/transactional_storage_engine.h>
48
 
#include <drizzled/probes.h>
49
 
#include <drizzled/pthread_globals.h>
50
 
#include <drizzled/query_id.h>
51
 
#include <drizzled/refresh_version.h>
52
 
#include <drizzled/select_dump.h>
53
 
#include <drizzled/select_exists_subselect.h>
54
 
#include <drizzled/select_export.h>
55
 
#include <drizzled/select_max_min_finder_subselect.h>
56
 
#include <drizzled/select_singlerow_subselect.h>
57
 
#include <drizzled/select_subselect.h>
58
 
#include <drizzled/select_to_file.h>
59
 
#include <drizzled/session.h>
60
 
#include <drizzled/session/cache.h>
61
 
#include <drizzled/show.h>
62
 
#include <drizzled/sql_base.h>
63
 
#include <drizzled/table/singular.h>
64
 
#include <drizzled/table_proto.h>
65
 
#include <drizzled/tmp_table_param.h>
66
 
#include <drizzled/transaction_services.h>
67
 
#include <drizzled/user_var_entry.h>
68
 
#include <drizzled/util/functors.h>
69
 
#include <plugin/myisam/myisam.h>
70
 
 
 
41
#include "drizzled/plugin/scheduler.h"
 
42
#include "drizzled/plugin/authentication.h"
 
43
#include "drizzled/probes.h"
 
44
#include "drizzled/table_proto.h"
 
45
#include "drizzled/db.h"
 
46
#include "drizzled/pthread_globals.h"
 
47
 
 
48
#include "plugin/myisam/myisam.h"
 
49
#include "drizzled/internal/iocache.h"
 
50
 
 
51
#include <fcntl.h>
71
52
#include <algorithm>
72
53
#include <climits>
73
 
#include <fcntl.h>
74
 
#include <sys/stat.h>
75
 
 
76
 
#include <boost/filesystem.hpp>
77
 
#include <boost/checked_delete.hpp>
78
 
 
79
 
#include <drizzled/util/backtrace.h>
80
 
 
81
 
#include <drizzled/schema.h>
82
54
 
83
55
using namespace std;
 
56
using namespace drizzled;
84
57
 
85
 
namespace fs=boost::filesystem;
86
 
namespace drizzled
 
58
extern "C"
87
59
{
 
60
  unsigned char *get_var_key(user_var_entry *entry, size_t *length, bool );
 
61
  void free_user_var(user_var_entry *entry);
 
62
}
88
63
 
89
64
/*
90
65
  The following is used to initialise Table_ident with a internal
94
69
char empty_c_string[1]= {0};    /* used for not defined db */
95
70
 
96
71
const char * const Session::DEFAULT_WHERE= "field list";
 
72
extern pthread_key_t THR_Session;
 
73
extern pthread_key_t THR_Mem_root;
 
74
extern uint32_t max_used_connections;
 
75
extern drizzled::atomic<uint32_t> connection_count;
 
76
 
 
77
 
 
78
/****************************************************************************
 
79
** User variables
 
80
****************************************************************************/
 
81
unsigned char *get_var_key(user_var_entry *entry, size_t *length, bool )
 
82
{
 
83
  *length= entry->name.length;
 
84
  return (unsigned char*) entry->name.str;
 
85
}
 
86
 
 
87
void free_user_var(user_var_entry *entry)
 
88
{
 
89
  delete entry;
 
90
}
97
91
 
98
92
bool Key_part_spec::operator==(const Key_part_spec& other) const
99
93
{
100
94
  return length == other.length &&
101
95
         field_name.length == other.field_name.length &&
102
 
    !my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
 
96
         !strcmp(field_name.str, other.field_name.str);
103
97
}
104
98
 
105
 
Open_tables_state::Open_tables_state(uint64_t version_arg) :
106
 
  version(version_arg)
 
99
Open_tables_state::Open_tables_state(uint64_t version_arg)
 
100
  :version(version_arg), backups_available(false)
107
101
{
108
 
  open_tables= temporary_tables= derived_tables= NULL;
109
 
  extra_lock= lock= NULL;
 
102
  reset_open_tables_state();
110
103
}
111
104
 
112
105
/*
113
106
  The following functions form part of the C plugin API
114
107
*/
115
 
int tmpfile(const char *prefix)
 
108
extern "C" int mysql_tmpfile(const char *prefix)
116
109
{
117
110
  char filename[FN_REFLEN];
118
 
  int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
 
111
  int fd = create_temp_file(filename, drizzle_tmpdir, prefix, MYF(MY_WME));
119
112
  if (fd >= 0) {
120
113
    unlink(filename);
121
114
  }
123
116
  return fd;
124
117
}
125
118
 
126
 
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
127
 
{
128
 
  return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
129
 
}
130
 
 
131
 
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
132
 
                                             size_t index)
133
 
{
134
 
  return &ha_data[monitored->getId()].resource_context[index];
135
 
}
136
 
 
 
119
extern "C"
 
120
int session_tablespace_op(const Session *session)
 
121
{
 
122
  return test(session->tablespace_op);
 
123
}
 
124
 
 
125
/**
 
126
   Set the process info field of the Session structure.
 
127
 
 
128
   This function is used by plug-ins. Internally, the
 
129
   Session::set_proc_info() function should be used.
 
130
 
 
131
   @see Session::set_proc_info
 
132
 */
 
133
extern "C" void
 
134
set_session_proc_info(Session *session, const char *info)
 
135
{
 
136
  session->set_proc_info(info);
 
137
}
 
138
 
 
139
extern "C"
 
140
const char *get_session_proc_info(Session *session)
 
141
{
 
142
  return session->get_proc_info();
 
143
}
 
144
 
 
145
void **Session::getEngineData(const plugin::StorageEngine *engine)
 
146
{
 
147
  return static_cast<void **>(&ha_data[engine->slot].ha_ptr);
 
148
}
 
149
 
 
150
Ha_trx_info *Session::getEngineInfo(const plugin::StorageEngine *engine,
 
151
                                    size_t index)
 
152
{
 
153
  return &ha_data[engine->getSlot()].ha_info[index];
 
154
}
 
155
 
 
156
extern "C"
137
157
int64_t session_test_options(const Session *session, int64_t test_options)
138
158
{
139
159
  return session->options & test_options;
140
160
}
141
161
 
142
 
Session::Session(plugin::Client *client_arg, catalog::Instance::shared_ptr catalog_arg) :
 
162
extern "C"
 
163
int session_sql_command(const Session *session)
 
164
{
 
165
  return (int) session->lex->sql_command;
 
166
}
 
167
 
 
168
extern "C"
 
169
int session_tx_isolation(const Session *session)
 
170
{
 
171
  return (int) session->variables.tx_isolation;
 
172
}
 
173
 
 
174
Session::Session(plugin::Client *client_arg)
 
175
  :
143
176
  Open_tables_state(refresh_version),
144
177
  mem_root(&main_mem_root),
145
 
  xa_id(0),
146
178
  lex(&main_lex),
147
 
  query(new std::string),
148
 
  _schema(new std::string("")),
149
179
  client(client_arg),
150
180
  scheduler(NULL),
151
181
  scheduler_arg(NULL),
152
182
  lock_id(&main_lock_id),
153
 
  thread_stack(NULL),
154
 
  security_ctx(identifier::User::make_shared()),
155
 
  _where(Session::DEFAULT_WHERE),
156
 
  dbug_sentry(Session_SENTRY_MAGIC),
157
 
  mysys_var(0),
158
 
  command(COM_CONNECT),
159
 
  file_id(0),
160
 
  _epoch(boost::gregorian::date(1970,1,1)),
161
 
  _connect_time(boost::posix_time::microsec_clock::universal_time()),
162
 
  utime_after_lock(0),
163
 
  ha_data(plugin::num_trx_monitored_objects),
164
 
  query_id(0),
165
 
  warn_query_id(0),
166
 
  concurrent_execute_allowed(true),
 
183
  user_time(0),
167
184
  arg_of_last_insert_id_function(false),
168
185
  first_successful_insert_id_in_prev_stmt(0),
169
186
  first_successful_insert_id_in_cur_stmt(0),
170
187
  limit_found_rows(0),
171
 
  options(session_startup_options),
172
 
  row_count_func(-1),
173
 
  sent_row_count(0),
174
 
  examined_row_count(0),
175
 
  used_tables(0),
176
 
  total_warn_count(0),
177
 
  col_access(0),
178
 
  statement_id_counter(0),
179
 
  row_count(0),
180
 
  thread_id(0),
181
 
  tmp_table(0),
182
 
  _global_read_lock(NONE),
183
 
  count_cuted_fields(CHECK_FIELD_ERROR_FOR_NULL),
184
 
  _killed(NOT_KILLED),
 
188
  global_read_lock(0),
185
189
  some_tables_deleted(false),
186
190
  no_errors(false),
187
191
  password(false),
188
192
  is_fatal_error(false),
189
193
  transaction_rollback_request(false),
190
194
  is_fatal_sub_stmt_error(0),
 
195
  derived_tables_processing(false),
191
196
  tablespace_op(false),
192
 
  derived_tables_processing(false),
193
197
  m_lip(NULL),
194
198
  cached_table(0),
195
199
  transaction_message(NULL),
196
 
  statement_message(NULL),
197
 
  session_event_observers(NULL),
198
 
  _catalog(catalog_arg),
199
 
  use_usage(false)
 
200
  statement_message(NULL)
200
201
{
 
202
  memset(process_list_info, 0, PROCESS_LIST_WIDTH);
201
203
  client->setSession(this);
202
204
 
203
205
  /*
206
208
    will be re-initialized in init_for_queries().
207
209
  */
208
210
  memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
 
211
  thread_stack= NULL;
 
212
  count_cuted_fields= CHECK_FIELD_IGNORE;
 
213
  killed= NOT_KILLED;
 
214
  col_access= 0;
 
215
  tmp_table= 0;
 
216
  used_tables= 0;
209
217
  cuted_fields= sent_row_count= row_count= 0L;
 
218
  row_count_func= -1;
 
219
  statement_id_counter= 0UL;
210
220
  // Must be reset to handle error with Session's created for init of mysqld
211
221
  lex->current_select= 0;
 
222
  start_time=(time_t) 0;
 
223
  start_utime= 0L;
 
224
  utime_after_lock= 0L;
212
225
  memset(&variables, 0, sizeof(variables));
213
 
  scoreboard_index= -1;
214
 
  cleanup_done= abort_on_warning= no_warnings_for_error= false;  
215
 
 
216
 
  /* query_cache init */
217
 
  query_cache_key= "";
218
 
  resultset= NULL;
 
226
  thread_id= 0;
 
227
  file_id = 0;
 
228
  query_id= 0;
 
229
  query= NULL;
 
230
  query_length= 0;
 
231
  warn_id= 0;
 
232
  memset(ha_data, 0, sizeof(ha_data));
 
233
  replication_data= 0;
 
234
  mysys_var= 0;
 
235
  dbug_sentry=Session_SENTRY_MAGIC;
 
236
  cleanup_done= abort_on_warning= no_warnings_for_error= false;
 
237
  transaction.on= 1;
 
238
  pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
219
239
 
220
240
  /* Variables with default values */
221
241
  proc_info="login";
 
242
  where= Session::DEFAULT_WHERE;
 
243
  command= COM_CONNECT;
222
244
 
223
245
  plugin_sessionvar_init(this);
224
246
  /*
228
250
  */
229
251
  variables.pseudo_thread_id= thread_id;
230
252
  server_status= SERVER_STATUS_AUTOCOMMIT;
 
253
  options= session_startup_options;
231
254
 
232
255
  if (variables.max_join_size == HA_POS_ERROR)
233
256
    options |= OPTION_BIG_SELECTS;
234
257
  else
235
258
    options &= ~OPTION_BIG_SELECTS;
236
259
 
 
260
  transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= false;
237
261
  open_options=ha_open_options;
238
262
  update_lock_default= TL_WRITE;
239
263
  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
240
 
  warn_list.clear();
 
264
  warn_list.empty();
241
265
  memset(warn_count, 0, sizeof(warn_count));
 
266
  total_warn_count= 0;
242
267
  memset(&status_var, 0, sizeof(status_var));
243
268
 
244
269
  /* Initialize sub structures */
245
270
  memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
 
271
  hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
 
272
            (hash_get_key) get_var_key,
 
273
            (hash_free_key) free_user_var, 0);
246
274
 
247
275
  substitute_null_with_insert_id = false;
248
 
  lock_info.init(); /* safety: will be reset after start */
 
276
  thr_lock_info_init(&lock_info); /* safety: will be reset after start */
249
277
  thr_lock_owner_init(&main_lock_id, &lock_info);
250
278
 
251
279
  m_internal_handler= NULL;
252
 
  
253
 
  plugin::EventObserver::registerSessionEvents(*this); 
254
280
}
255
281
 
256
282
void Session::free_items()
274
300
  m_internal_handler= handler;
275
301
}
276
302
 
277
 
bool Session::handle_error(drizzled::error_t sql_errno, const char *message,
278
 
                           DRIZZLE_ERROR::enum_warning_level level)
 
303
bool Session::handle_error(uint32_t sql_errno, const char *message,
 
304
                       DRIZZLE_ERROR::enum_warning_level level)
279
305
{
280
306
  if (m_internal_handler)
281
307
  {
285
311
  return false;                                 // 'false', as per coding style
286
312
}
287
313
 
288
 
void Session::setAbort(bool arg)
289
 
{
290
 
  mysys_var->abort= arg;
291
 
}
292
 
 
293
 
void Session::lockOnSys()
294
 
{
295
 
  if (not mysys_var)
296
 
    return;
297
 
 
298
 
  setAbort(true);
299
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
300
 
  if (mysys_var->current_cond)
301
 
  {
302
 
    mysys_var->current_mutex->lock();
303
 
    mysys_var->current_cond->notify_all();
304
 
    mysys_var->current_mutex->unlock();
305
 
  }
306
 
}
307
 
 
308
314
void Session::pop_internal_handler()
309
315
{
310
316
  assert(m_internal_handler != NULL);
311
317
  m_internal_handler= NULL;
312
318
}
313
319
 
314
 
void Session::get_xid(DrizzleXid *xid)
315
 
{
316
 
  *xid = *(DrizzleXid *) &transaction.xid_state.xid;
317
 
}
 
320
#if defined(__cplusplus)
 
321
extern "C" {
 
322
#endif
 
323
 
 
324
void *session_alloc(Session *session, unsigned int size)
 
325
{
 
326
  return session->alloc(size);
 
327
}
 
328
 
 
329
void *session_calloc(Session *session, unsigned int size)
 
330
{
 
331
  return session->calloc(size);
 
332
}
 
333
 
 
334
char *session_strdup(Session *session, const char *str)
 
335
{
 
336
  return session->strdup(str);
 
337
}
 
338
 
 
339
char *session_strmake(Session *session, const char *str, unsigned int size)
 
340
{
 
341
  return session->strmake(str, size);
 
342
}
 
343
 
 
344
void *session_memdup(Session *session, const void* str, unsigned int size)
 
345
{
 
346
  return session->memdup(str, size);
 
347
}
 
348
 
 
349
void session_get_xid(const Session *session, DRIZZLE_XID *xid)
 
350
{
 
351
  *xid = *(DRIZZLE_XID *) &session->transaction.xid_state.xid;
 
352
}
 
353
 
 
354
#if defined(__cplusplus)
 
355
}
 
356
#endif
318
357
 
319
358
/* Do operations that may take a long time */
320
359
 
322
361
{
323
362
  assert(cleanup_done == false);
324
363
 
325
 
  setKilled(KILL_CONNECTION);
 
364
  killed= KILL_CONNECTION;
326
365
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
327
366
  if (transaction.xid_state.xa_state == XA_PREPARED)
328
367
  {
330
369
  }
331
370
#endif
332
371
  {
333
 
    TransactionServices &transaction_services= TransactionServices::singleton();
334
 
    transaction_services.rollbackTransaction(*this, true);
 
372
    ha_rollback(this);
335
373
    xid_cache_delete(&transaction.xid_state);
336
374
  }
337
 
 
338
 
  for (UserVars::iterator iter= user_vars.begin();
339
 
       iter != user_vars.end();
340
 
       iter++)
341
 
  {
342
 
    user_var_entry *entry= (*iter).second;
343
 
    boost::checked_delete(entry);
344
 
  }
345
 
  user_vars.clear();
346
 
 
347
 
 
 
375
  hash_free(&user_vars);
348
376
  close_temporary_tables();
349
377
 
350
378
  if (global_read_lock)
351
 
  {
352
 
    unlockGlobalReadLock();
353
 
  }
 
379
    unlock_global_read_lock(this);
354
380
 
355
381
  cleanup_done= true;
356
382
}
357
383
 
358
384
Session::~Session()
359
385
{
360
 
  this->checkSentry();
 
386
  Session_CHECK_SENTRY(this);
 
387
  add_to_status(&global_status_var, &status_var);
361
388
 
362
 
  if (client and client->isConnected())
 
389
  if (client->isConnected())
363
390
  {
364
 
    assert(security_ctx);
365
391
    if (global_system_variables.log_warnings)
366
 
    {
367
 
      errmsg_printf(error::WARN, ER(ER_FORCING_CLOSE),
368
 
                    internal::my_progname,
369
 
                    thread_id,
370
 
                    security_ctx->username().c_str());
371
 
    }
372
 
 
373
 
    disconnect();
 
392
        errmsg_printf(ERRMSG_LVL_WARN, ER(ER_FORCING_CLOSE),my_progname,
 
393
                      thread_id,
 
394
                      (security_ctx.user.c_str() ?
 
395
                       security_ctx.user.c_str() : ""));
 
396
    disconnect(0, false);
374
397
  }
375
398
 
376
399
  /* Close connection */
377
 
  if (client)
378
 
  {
379
 
    client->close();
380
 
    boost::checked_delete(client);
381
 
    client= NULL;
382
 
  }
 
400
  client->close();
 
401
  delete client;
383
402
 
384
403
  if (cleanup_done == false)
385
404
    cleanup();
387
406
  plugin::StorageEngine::closeConnection(this);
388
407
  plugin_sessionvar_cleanup(this);
389
408
 
390
 
  warn_root.free_root(MYF(0));
 
409
  free_root(&warn_root,MYF(0));
 
410
  free_root(&transaction.mem_root,MYF(0));
391
411
  mysys_var=0;                                  // Safety (shouldn't be needed)
392
412
  dbug_sentry= Session_SENTRY_GONE;
393
413
 
394
 
  main_mem_root.free_root(MYF(0));
395
 
  currentMemRoot().release();
396
 
  currentSession().release();
397
 
 
398
 
  plugin::Logging::postEndDo(this);
399
 
  plugin::EventObserver::deregisterSessionEvents(*this); 
400
 
}
401
 
 
402
 
void Session::setClient(plugin::Client *client_arg)
403
 
{
404
 
  client= client_arg;
405
 
  client->setSession(this);
406
 
}
407
 
 
408
 
void Session::awake(Session::killed_state_t state_to_set)
409
 
{
410
 
  if ((state_to_set == Session::KILL_QUERY) and (command == COM_SLEEP))
411
 
    return;
412
 
 
413
 
  this->checkSentry();
414
 
 
415
 
  setKilled(state_to_set);
416
 
  scheduler->killSession(this);
417
 
 
 
414
  free_root(&main_mem_root, MYF(0));
 
415
  pthread_setspecific(THR_Session,  0);
 
416
 
 
417
 
 
418
  /* Ensure that no one is using Session */
 
419
  pthread_mutex_unlock(&LOCK_delete);
 
420
  pthread_mutex_destroy(&LOCK_delete);
 
421
}
 
422
 
 
423
/*
 
424
  Add all status variables to another status variable array
 
425
 
 
426
  SYNOPSIS
 
427
   add_to_status()
 
428
   to_var       add to this array
 
429
   from_var     from this array
 
430
 
 
431
  NOTES
 
432
    This function assumes that all variables are long/ulong.
 
433
    If this assumption will change, then we have to explictely add
 
434
    the other variables after the while loop
 
435
*/
 
436
void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var)
 
437
{
 
438
  ulong *end= (ulong*) ((unsigned char*) to_var +
 
439
                        offsetof(STATUS_VAR, last_system_status_var) +
 
440
                        sizeof(ulong));
 
441
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var;
 
442
 
 
443
  while (to != end)
 
444
    *(to++)+= *(from++);
 
445
}
 
446
 
 
447
/*
 
448
  Add the difference between two status variable arrays to another one.
 
449
 
 
450
  SYNOPSIS
 
451
    add_diff_to_status
 
452
    to_var       add to this array
 
453
    from_var     from this array
 
454
    dec_var      minus this array
 
455
 
 
456
  NOTE
 
457
    This function assumes that all variables are long/ulong.
 
458
*/
 
459
void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var,
 
460
                        STATUS_VAR *dec_var)
 
461
{
 
462
  ulong *end= (ulong*) ((unsigned char*) to_var + offsetof(STATUS_VAR,
 
463
                                                  last_system_status_var) +
 
464
                        sizeof(ulong));
 
465
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var, *dec= (ulong*) dec_var;
 
466
 
 
467
  while (to != end)
 
468
    *(to++)+= *(from++) - *(dec++);
 
469
}
 
470
 
 
471
void Session::awake(Session::killed_state state_to_set)
 
472
{
 
473
  Session_CHECK_SENTRY(this);
 
474
  safe_mutex_assert_owner(&LOCK_delete);
 
475
 
 
476
  killed= state_to_set;
418
477
  if (state_to_set != Session::KILL_QUERY)
419
478
  {
 
479
    scheduler->killSession(this);
420
480
    DRIZZLE_CONNECTION_DONE(thread_id);
421
481
  }
422
 
 
423
482
  if (mysys_var)
424
483
  {
425
 
    boost_unique_lock_t scopedLock(mysys_var->mutex);
 
484
    pthread_mutex_lock(&mysys_var->mutex);
426
485
    /*
427
 
      "
428
486
      This broadcast could be up in the air if the victim thread
429
487
      exits the cond in the time between read and broadcast, but that is
430
488
      ok since all we want to do is to make the victim thread get out
445
503
    */
446
504
    if (mysys_var->current_cond && mysys_var->current_mutex)
447
505
    {
448
 
      mysys_var->current_mutex->lock();
449
 
      mysys_var->current_cond->notify_all();
450
 
      mysys_var->current_mutex->unlock();
 
506
      pthread_mutex_lock(mysys_var->current_mutex);
 
507
      pthread_cond_broadcast(mysys_var->current_cond);
 
508
      pthread_mutex_unlock(mysys_var->current_mutex);
451
509
    }
 
510
    pthread_mutex_unlock(&mysys_var->mutex);
452
511
  }
453
512
}
454
513
 
464
523
  */
465
524
  assert(thread_stack);
466
525
 
467
 
  currentSession().release();
468
 
  currentSession().reset(this);
469
 
 
470
 
  currentMemRoot().release();
471
 
  currentMemRoot().reset(&mem_root);
 
526
  if (pthread_setspecific(THR_Session,  this) ||
 
527
      pthread_setspecific(THR_Mem_root, &mem_root))
 
528
    return true;
472
529
 
473
530
  mysys_var=my_thread_var;
474
531
 
477
534
    This allows us to move Session to different threads if needed.
478
535
  */
479
536
  mysys_var->id= thread_id;
 
537
  real_id= pthread_self();                      // For debugging
480
538
 
481
539
  /*
482
540
    We have to call thr_lock_info_init() again here as Session may have been
483
541
    created in another thread
484
542
  */
485
 
  lock_info.init();
486
 
 
 
543
  thr_lock_info_init(&lock_info);
487
544
  return false;
488
545
}
489
546
 
502
559
  set_proc_info(NULL);
503
560
  command= COM_SLEEP;
504
561
  set_time();
 
562
  ha_enable_transaction(this,true);
505
563
 
506
 
  mem_root->reset_root_defaults(variables.query_alloc_block_size,
507
 
                                variables.query_prealloc_size);
 
564
  reset_root_defaults(mem_root, variables.query_alloc_block_size,
 
565
                      variables.query_prealloc_size);
 
566
  reset_root_defaults(&transaction.mem_root,
 
567
                      variables.trans_alloc_block_size,
 
568
                      variables.trans_prealloc_size);
508
569
  transaction.xid_state.xid.null();
509
570
  transaction.xid_state.in_session=1;
510
 
  if (use_usage)
511
 
    resetUsage();
512
571
}
513
572
 
514
573
bool Session::initGlobals()
515
574
{
516
575
  if (storeGlobals())
517
576
  {
518
 
    disconnect(ER_OUT_OF_RESOURCES);
519
 
    status_var.aborted_connects++;
 
577
    disconnect(ER_OUT_OF_RESOURCES, true);
 
578
    statistic_increment(aborted_connects, &LOCK_status);
520
579
    return true;
521
580
  }
522
581
  return false;
526
585
{
527
586
  if (initGlobals() || authenticate())
528
587
  {
529
 
    disconnect();
 
588
    disconnect(0, true);
530
589
    return;
531
590
  }
532
591
 
533
592
  prepareForQueries();
534
593
 
535
 
  while (not client->haveError() && getKilled() != KILL_CONNECTION)
 
594
  while (! client->haveError() && killed != KILL_CONNECTION)
536
595
  {
537
 
    if (not executeStatement())
 
596
    if (! executeStatement())
538
597
      break;
539
598
  }
540
599
 
541
 
  disconnect();
 
600
  disconnect(0, true);
542
601
}
543
602
 
544
 
bool Session::schedule(Session::shared_ptr &arg)
 
603
bool Session::schedule()
545
604
{
546
 
  arg->scheduler= plugin::Scheduler::getScheduler();
547
 
  assert(arg->scheduler);
548
 
 
549
 
  ++connection_count;
550
 
 
551
 
  long current_connections= connection_count;
552
 
 
553
 
  if (current_connections > 0 and static_cast<uint64_t>(current_connections) > current_global_counters.max_used_connections)
554
 
  {
555
 
    current_global_counters.max_used_connections= static_cast<uint64_t>(connection_count);
556
 
  }
557
 
 
558
 
  current_global_counters.connections++;
559
 
  arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
560
 
 
561
 
  session::Cache::singleton().insert(arg);
562
 
 
563
 
  if (unlikely(plugin::EventObserver::connectSession(*arg)))
564
 
  {
565
 
    // We should do something about an error...
566
 
  }
567
 
 
568
 
  if (plugin::Scheduler::getScheduler()->addSession(arg))
569
 
  {
570
 
    DRIZZLE_CONNECTION_START(arg->getSessionId());
 
605
  scheduler= plugin::Scheduler::getScheduler();
 
606
  assert(scheduler);
 
607
 
 
608
  connection_count.increment();
 
609
 
 
610
  if (connection_count > max_used_connections)
 
611
    max_used_connections= connection_count;
 
612
 
 
613
  thread_id= variables.pseudo_thread_id= global_thread_id++;
 
614
 
 
615
  pthread_mutex_lock(&LOCK_thread_count);
 
616
  getSessionList().push_back(this);
 
617
  pthread_mutex_unlock(&LOCK_thread_count);
 
618
 
 
619
  if (scheduler->addSession(this))
 
620
  {
 
621
    DRIZZLE_CONNECTION_START(thread_id);
571
622
    char error_message_buff[DRIZZLE_ERRMSG_SIZE];
572
623
 
573
 
    arg->setKilled(Session::KILL_CONNECTION);
 
624
    killed= Session::KILL_CONNECTION;
574
625
 
575
 
    arg->status_var.aborted_connects++;
 
626
    statistic_increment(aborted_connects, &LOCK_status);
576
627
 
577
628
    /* Can't use my_error() since store_globals has not been called. */
578
629
    /* TODO replace will better error message */
579
630
    snprintf(error_message_buff, sizeof(error_message_buff),
580
631
             ER(ER_CANT_CREATE_THREAD), 1);
581
 
    arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
582
 
 
 
632
    client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
583
633
    return true;
584
634
  }
585
635
 
587
637
}
588
638
 
589
639
 
590
 
/*
591
 
  Is this session viewable by the current user?
592
 
*/
593
 
bool Session::isViewable(identifier::User::const_reference user_arg) const
594
 
{
595
 
  return plugin::Authorization::isAuthorized(user_arg, *this, false);
596
 
}
597
 
 
598
 
 
599
 
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
 
640
const char* Session::enter_cond(pthread_cond_t *cond,
 
641
                                pthread_mutex_t* mutex,
 
642
                                const char* msg)
600
643
{
601
644
  const char* old_msg = get_proc_info();
602
645
  safe_mutex_assert_owner(mutex);
603
 
  mysys_var->current_mutex = &mutex;
604
 
  mysys_var->current_cond = &cond;
 
646
  mysys_var->current_mutex = mutex;
 
647
  mysys_var->current_cond = cond;
605
648
  this->set_proc_info(msg);
606
649
  return old_msg;
607
650
}
614
657
    locked (if that would not be the case, you'll get a deadlock if someone
615
658
    does a Session::awake() on you).
616
659
  */
617
 
  mysys_var->current_mutex->unlock();
618
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
 
660
  pthread_mutex_unlock(mysys_var->current_mutex);
 
661
  pthread_mutex_lock(&mysys_var->mutex);
619
662
  mysys_var->current_mutex = 0;
620
663
  mysys_var->current_cond = 0;
621
664
  this->set_proc_info(old_msg);
 
665
  pthread_mutex_unlock(&mysys_var->mutex);
622
666
}
623
667
 
624
668
bool Session::authenticate()
625
669
{
 
670
  lex_start(this);
626
671
  if (client->authenticate())
627
672
    return false;
628
673
 
629
 
  status_var.aborted_connects++;
630
 
 
 
674
  statistic_increment(aborted_connects, &LOCK_status);
631
675
  return true;
632
676
}
633
677
 
634
 
bool Session::checkUser(const std::string &passwd_str,
635
 
                        const std::string &in_db)
 
678
bool Session::checkUser(const char *passwd, uint32_t passwd_len, const char *in_db)
636
679
{
637
 
  bool is_authenticated=
638
 
    plugin::Authentication::isAuthenticated(*user(), passwd_str);
 
680
  bool is_authenticated;
 
681
 
 
682
  if (passwd_len != 0 && passwd_len != SCRAMBLE_LENGTH)
 
683
  {
 
684
    my_error(ER_HANDSHAKE_ERROR, MYF(0), security_ctx.ip.c_str());
 
685
    return false;
 
686
  }
 
687
 
 
688
  is_authenticated= plugin::Authentication::isAuthenticated(this, passwd);
639
689
 
640
690
  if (is_authenticated != true)
641
691
  {
642
 
    status_var.access_denied++;
643
 
    /* isAuthenticated has pushed the error message */
 
692
    my_error(ER_ACCESS_DENIED_ERROR, MYF(0),
 
693
             security_ctx.user.c_str(),
 
694
             security_ctx.ip.c_str(),
 
695
             passwd_len ? ER(ER_YES) : ER(ER_NO));
 
696
 
644
697
    return false;
645
698
  }
646
699
 
 
700
  security_ctx.skip_grants();
 
701
 
647
702
  /* Change database if necessary */
648
 
  if (not in_db.empty())
 
703
  if (in_db && in_db[0])
649
704
  {
650
 
    identifier::Schema identifier(in_db);
651
 
    if (schema::change(*this, identifier))
 
705
    const string database_name_string(in_db);
 
706
    NonNormalisedDatabaseName database_name(database_name_string);
 
707
    NormalisedDatabaseName normalised_database_name(database_name);
 
708
 
 
709
    if (mysql_change_db(this, normalised_database_name, false))
652
710
    {
653
 
      /* change_db() has pushed the error message. */
 
711
      /* mysql_change_db() has pushed the error message. */
654
712
      return false;
655
713
    }
656
714
  }
657
715
  my_ok();
658
 
  password= not passwd_str.empty();
 
716
  password= test(passwd_len);          // remember for error messages
659
717
 
660
718
  /* Ready to handle queries */
661
719
  return true;
677
735
  main_da.reset_diagnostics_area();
678
736
 
679
737
  if (client->readCommand(&l_packet, &packet_length) == false)
680
 
  {
681
 
    return false;
682
 
  }
683
 
 
684
 
  if (getKilled() == KILL_CONNECTION)
685
738
    return false;
686
739
 
687
740
  if (packet_length == 0)
688
741
    return true;
689
742
 
690
 
  l_command= static_cast<enum_server_command>(l_packet[0]);
 
743
  l_command= (enum enum_server_command) (unsigned char) l_packet[0];
691
744
 
692
745
  if (command >= COM_END)
693
746
    command= COM_END;                           // Wrong command
694
747
 
695
748
  assert(packet_length);
696
 
  return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
 
749
  return ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
697
750
}
698
751
 
699
752
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
705
758
    in_packet_length--;
706
759
  }
707
760
  const char *pos= in_packet + in_packet_length; /* Point at end null */
708
 
  while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
 
761
  while (in_packet_length > 0 &&
 
762
         (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
709
763
  {
710
764
    pos--;
711
765
    in_packet_length--;
712
766
  }
713
767
 
714
 
  std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
715
 
  // We can not be entirely sure _schema has a value
716
 
  if (_schema)
717
 
  {
718
 
    plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
719
 
  }
720
 
  query.reset(new_query);
721
 
  _state.reset(new session::State(in_packet, in_packet_length));
 
768
  /* We must allocate some extra memory for the cached query string */
 
769
  query_length= 0; /* Extra safety: Avoid races */
 
770
  query= (char*) memdup_w_gap((unsigned char*) in_packet, in_packet_length, db.length() + 1);
 
771
  if (! query)
 
772
    return false;
 
773
 
 
774
  query[in_packet_length]=0;
 
775
  query_length= in_packet_length;
722
776
 
723
777
  return true;
724
778
}
727
781
{
728
782
  bool do_release= 0;
729
783
  bool result= true;
730
 
  TransactionServices &transaction_services= TransactionServices::singleton();
731
784
 
732
785
  if (transaction.xid_state.xa_state != XA_NOTR)
733
786
  {
743
796
       * (Which of course should never happen...)
744
797
       */
745
798
      server_status&= ~SERVER_STATUS_IN_TRANS;
746
 
      if (transaction_services.commitTransaction(*this, true))
 
799
      if (ha_commit(this))
747
800
        result= false;
748
801
      options&= ~(OPTION_BEGIN);
 
802
      transaction.all.modified_non_trans_table= false;
749
803
      break;
750
804
    case COMMIT_RELEASE:
751
805
      do_release= 1; /* fall through */
760
814
    case ROLLBACK_AND_CHAIN:
761
815
    {
762
816
      server_status&= ~SERVER_STATUS_IN_TRANS;
763
 
      if (transaction_services.rollbackTransaction(*this, true))
 
817
      if (ha_rollback(this))
764
818
        result= false;
765
819
      options&= ~(OPTION_BEGIN);
 
820
      transaction.all.modified_non_trans_table= false;
766
821
      if (result == true && (completion == ROLLBACK_AND_CHAIN))
767
822
        result= startTransaction();
768
823
      break;
773
828
  }
774
829
 
775
830
  if (result == false)
776
 
  {
777
 
    my_error(static_cast<drizzled::error_t>(killed_errno()), MYF(0));
778
 
  }
 
831
    my_error(killed_errno(), MYF(0));
779
832
  else if ((result == true) && do_release)
780
 
  {
781
 
    setKilled(Session::KILL_CONNECTION);
782
 
  }
 
833
    killed= Session::KILL_CONNECTION;
783
834
 
784
835
  return result;
785
836
}
787
838
bool Session::endActiveTransaction()
788
839
{
789
840
  bool result= true;
790
 
  TransactionServices &transaction_services= TransactionServices::singleton();
791
841
 
792
842
  if (transaction.xid_state.xa_state != XA_NOTR)
793
843
  {
797
847
  if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
798
848
  {
799
849
    server_status&= ~SERVER_STATUS_IN_TRANS;
800
 
    if (transaction_services.commitTransaction(*this, true))
 
850
    if (ha_commit(this))
801
851
      result= false;
802
852
  }
803
853
  options&= ~(OPTION_BEGIN);
 
854
  transaction.all.modified_non_trans_table= false;
804
855
  return result;
805
856
}
806
857
 
808
859
{
809
860
  bool result= true;
810
861
 
811
 
  assert(! inTransaction());
812
 
 
813
 
  options|= OPTION_BEGIN;
814
 
  server_status|= SERVER_STATUS_IN_TRANS;
815
 
 
816
 
  if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
 
862
  if (! endActiveTransaction())
817
863
  {
818
864
    result= false;
819
865
  }
 
866
  else
 
867
  {
 
868
    options|= OPTION_BEGIN;
 
869
    server_status|= SERVER_STATUS_IN_TRANS;
 
870
 
 
871
    if (opt == START_TRANS_OPT_WITH_CONS_SNAPSHOT)
 
872
    {
 
873
      // TODO make this a loop for all engines, not just this one (Inno only
 
874
      // right now)
 
875
      if (plugin::StorageEngine::startConsistentSnapshot(this))
 
876
      {
 
877
        result= false;
 
878
      }
 
879
    }
 
880
  }
820
881
 
821
882
  return result;
822
883
}
838
899
    first_successful_insert_id_in_cur_stmt= 0;
839
900
    substitute_null_with_insert_id= true;
840
901
  }
841
 
 
842
902
  arg_of_last_insert_id_function= false;
843
 
 
844
903
  /* Free Items that were created during this execution */
845
904
  free_items();
846
 
 
847
 
  /* Reset _where. */
848
 
  _where= Session::DEFAULT_WHERE;
849
 
 
850
 
  /* Reset the temporary shares we built */
851
 
  for_each(temporary_shares.begin(),
852
 
           temporary_shares.end(),
853
 
           DeletePtr());
854
 
  temporary_shares.clear();
 
905
  /* Reset where. */
 
906
  where= Session::DEFAULT_WHERE;
855
907
}
856
908
 
857
909
/**
865
917
  @return  NULL on failure, or pointer to the LEX_STRING object
866
918
*/
867
919
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
868
 
                                     const std::string &str,
869
 
                                     bool allocate_lex_string)
870
 
{
871
 
  return make_lex_string(lex_str, str.c_str(), str.length(), allocate_lex_string);
872
 
}
873
 
 
874
 
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
875
 
                                     const char* str, uint32_t length,
876
 
                                     bool allocate_lex_string)
 
920
                                 const char* str, uint32_t length,
 
921
                                 bool allocate_lex_string)
877
922
{
878
923
  if (allocate_lex_string)
879
 
    if (!(lex_str= (LEX_STRING *)getMemRoot()->allocate(sizeof(LEX_STRING))))
 
924
    if (!(lex_str= (LEX_STRING *)alloc(sizeof(LEX_STRING))))
880
925
      return 0;
881
 
  if (!(lex_str->str= mem_root->strmake_root(str, length)))
 
926
  if (!(lex_str->str= strmake_root(mem_root, str, length)))
882
927
    return 0;
883
928
  lex_str->length= length;
884
929
  return lex_str;
885
930
}
886
931
 
 
932
/* routings to adding tables to list of changed in transaction tables */
 
933
inline static void list_include(CHANGED_TableList** prev,
 
934
                                CHANGED_TableList* curr,
 
935
                                CHANGED_TableList* new_table)
 
936
{
 
937
  if (new_table)
 
938
  {
 
939
    *prev = new_table;
 
940
    (*prev)->next = curr;
 
941
  }
 
942
}
 
943
 
 
944
/* add table to list of changed in transaction tables */
 
945
 
 
946
void Session::add_changed_table(Table *table)
 
947
{
 
948
  assert((options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
 
949
              table->cursor->has_transactions());
 
950
  add_changed_table(table->s->table_cache_key.str,
 
951
                    (long) table->s->table_cache_key.length);
 
952
}
 
953
 
 
954
 
 
955
void Session::add_changed_table(const char *key, long key_length)
 
956
{
 
957
  CHANGED_TableList **prev_changed = &transaction.changed_tables;
 
958
  CHANGED_TableList *curr = transaction.changed_tables;
 
959
 
 
960
  for (; curr; prev_changed = &(curr->next), curr = curr->next)
 
961
  {
 
962
    int cmp =  (long)curr->key_length - (long)key_length;
 
963
    if (cmp < 0)
 
964
    {
 
965
      list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
966
      return;
 
967
    }
 
968
    else if (cmp == 0)
 
969
    {
 
970
      cmp = memcmp(curr->key, key, curr->key_length);
 
971
      if (cmp < 0)
 
972
      {
 
973
        list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
974
        return;
 
975
      }
 
976
      else if (cmp == 0)
 
977
      {
 
978
        return;
 
979
      }
 
980
    }
 
981
  }
 
982
  *prev_changed = changed_table_dup(key, key_length);
 
983
}
 
984
 
 
985
 
 
986
CHANGED_TableList* Session::changed_table_dup(const char *key, long key_length)
 
987
{
 
988
  CHANGED_TableList* new_table =
 
989
    (CHANGED_TableList*) trans_alloc(ALIGN_SIZE(sizeof(CHANGED_TableList))+
 
990
                                      key_length + 1);
 
991
  if (!new_table)
 
992
  {
 
993
    my_error(EE_OUTOFMEMORY, MYF(ME_BELL),
 
994
             ALIGN_SIZE(sizeof(TableList)) + key_length + 1);
 
995
    killed= KILL_CONNECTION;
 
996
    return 0;
 
997
  }
 
998
 
 
999
  new_table->key= ((char*)new_table)+ ALIGN_SIZE(sizeof(CHANGED_TableList));
 
1000
  new_table->next = 0;
 
1001
  new_table->key_length = key_length;
 
1002
  ::memcpy(new_table->key, key, key_length);
 
1003
  return new_table;
 
1004
}
 
1005
 
 
1006
 
887
1007
int Session::send_explain_fields(select_result *result)
888
1008
{
889
1009
  List<Item> field_list;
922
1042
  return (result->send_fields(field_list));
923
1043
}
924
1044
 
925
 
void select_result::send_error(drizzled::error_t errcode, const char *err)
 
1045
void select_result::send_error(uint32_t errcode, const char *err)
926
1046
{
927
1047
  my_message(errcode, err, MYF(0));
928
1048
}
931
1051
  Handling writing to file
932
1052
************************************************************************/
933
1053
 
934
 
void select_to_file::send_error(drizzled::error_t errcode,const char *err)
 
1054
void select_to_file::send_error(uint32_t errcode,const char *err)
935
1055
{
936
1056
  my_message(errcode, err, MYF(0));
937
1057
  if (file > 0)
938
1058
  {
939
 
    (void) cache->end_io_cache();
940
 
    (void) internal::my_close(file, MYF(0));
941
 
    (void) internal::my_delete(path.file_string().c_str(), MYF(0));             // Delete file on error
 
1059
    (void) end_io_cache(cache);
 
1060
    (void) my_close(file, MYF(0));
 
1061
    (void) my_delete(path, MYF(0));             // Delete file on error
942
1062
    file= -1;
943
1063
  }
944
1064
}
946
1066
 
947
1067
bool select_to_file::send_eof()
948
1068
{
949
 
  int error= test(cache->end_io_cache());
950
 
  if (internal::my_close(file, MYF(MY_WME)))
 
1069
  int error= test(end_io_cache(cache));
 
1070
  if (my_close(file, MYF(MY_WME)))
951
1071
    error= 1;
952
1072
  if (!error)
953
1073
  {
968
1088
  /* In case of error send_eof() may be not called: close the file here. */
969
1089
  if (file >= 0)
970
1090
  {
971
 
    (void) cache->end_io_cache();
972
 
    (void) internal::my_close(file, MYF(0));
 
1091
    (void) end_io_cache(cache);
 
1092
    (void) my_close(file, MYF(0));
973
1093
    file= -1;
974
1094
  }
975
 
  path= "";
 
1095
  path[0]= '\0';
976
1096
  row_count= 0;
977
1097
}
978
1098
 
979
1099
select_to_file::select_to_file(file_exchange *ex)
980
1100
  : exchange(ex),
981
1101
    file(-1),
982
 
    cache(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
 
1102
    cache(static_cast<IO_CACHE *>(memory::sql_calloc(sizeof(IO_CACHE)))),
983
1103
    row_count(0L)
984
1104
{
985
 
  path= "";
 
1105
  path[0]=0;
986
1106
}
987
1107
 
988
1108
select_to_file::~select_to_file()
1016
1136
*/
1017
1137
 
1018
1138
 
1019
 
static int create_file(Session *session,
1020
 
                       fs::path &target_path,
1021
 
                       file_exchange *exchange,
1022
 
                       internal::IO_CACHE *cache)
 
1139
static int create_file(Session *session, char *path, file_exchange *exchange, IO_CACHE *cache)
1023
1140
{
1024
 
  fs::path to_file(exchange->file_name);
1025
1141
  int file;
1026
 
 
1027
 
  if (not to_file.has_root_directory())
 
1142
  uint32_t option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
 
1143
 
 
1144
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
1145
  option|= MY_REPLACE_DIR;                      // Force use of db directory
 
1146
#endif
 
1147
 
 
1148
  if (!dirname_length(exchange->file_name))
1028
1149
  {
1029
 
    target_path= fs::system_complete(getDataHomeCatalog());
1030
 
    util::string::const_shared_ptr schema(session->schema());
1031
 
    if (schema and not schema->empty())
1032
 
    {
1033
 
      int count_elements= 0;
1034
 
      for (fs::path::iterator iter= to_file.begin();
1035
 
           iter != to_file.end();
1036
 
           ++iter, ++count_elements)
1037
 
      { }
1038
 
 
1039
 
      if (count_elements == 1)
1040
 
      {
1041
 
        target_path /= *schema;
1042
 
      }
1043
 
    }
1044
 
    target_path /= to_file;
 
1150
    strcpy(path, drizzle_real_data_home);
 
1151
    if (! session->db.empty())
 
1152
      strncat(path, session->db.c_str(), FN_REFLEN-strlen(drizzle_real_data_home)-1);
 
1153
    (void) fn_format(path, exchange->file_name, path, "", option);
1045
1154
  }
1046
1155
  else
1047
 
  {
1048
 
    target_path = exchange->file_name;
1049
 
  }
1050
 
 
1051
 
  if (not secure_file_priv.string().empty())
1052
 
  {
1053
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
1054
 
    {
1055
 
      /* Write only allowed to dir or subdir specified by secure_file_priv */
1056
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1057
 
      return -1;
1058
 
    }
1059
 
  }
1060
 
 
1061
 
  if (!access(target_path.file_string().c_str(), F_OK))
 
1156
    (void) fn_format(path, exchange->file_name, drizzle_real_data_home, "", option);
 
1157
 
 
1158
  if (opt_secure_file_priv &&
 
1159
      strncmp(opt_secure_file_priv, path, strlen(opt_secure_file_priv)))
 
1160
  {
 
1161
    /* Write only allowed to dir or subdir specified by secure_file_priv */
 
1162
    my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
1163
    return -1;
 
1164
  }
 
1165
 
 
1166
  if (!access(path, F_OK))
1062
1167
  {
1063
1168
    my_error(ER_FILE_EXISTS_ERROR, MYF(0), exchange->file_name);
1064
1169
    return -1;
1065
1170
  }
1066
1171
  /* Create the file world readable */
1067
 
  if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
 
1172
  if ((file= my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1068
1173
    return file;
1069
1174
  (void) fchmod(file, 0666);                    // Because of umask()
1070
 
  if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
 
1175
  if (init_io_cache(cache, file, 0L, WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1071
1176
  {
1072
 
    internal::my_close(file, MYF(0));
1073
 
    internal::my_delete(target_path.file_string().c_str(), MYF(0));  // Delete file on error, it was just created
 
1177
    my_close(file, MYF(0));
 
1178
    my_delete(path, MYF(0));  // Delete file on error, it was just created
1074
1179
    return -1;
1075
1180
  }
1076
1181
  return file;
1084
1189
  bool string_results= false, non_string_results= false;
1085
1190
  unit= u;
1086
1191
  if ((uint32_t) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
1087
 
  {
1088
 
    path= exchange->file_name;
1089
 
  }
 
1192
    strncpy(path,exchange->file_name,FN_REFLEN-1);
1090
1193
 
1091
1194
  /* Check if there is any blobs in data */
1092
1195
  {
1093
 
    List<Item>::iterator li(list.begin());
 
1196
    List_iterator_fast<Item> li(list);
1094
1197
    Item *item;
1095
1198
    while ((item=li++))
1096
1199
    {
1097
1200
      if (item->max_length >= MAX_BLOB_WIDTH)
1098
1201
      {
1099
 
        blob_flag=1;
1100
 
        break;
 
1202
        blob_flag=1;
 
1203
        break;
1101
1204
      }
1102
 
 
1103
1205
      if (item->result_type() == STRING_RESULT)
1104
1206
        string_results= true;
1105
1207
      else
1140
1242
  return 0;
1141
1243
}
1142
1244
 
 
1245
 
 
1246
#define NEED_ESCAPING(x) ((int) (unsigned char) (x) == escape_char    || \
 
1247
                          (enclosed ? (int) (unsigned char) (x) == field_sep_char      \
 
1248
                                    : (int) (unsigned char) (x) == field_term_char) || \
 
1249
                          (int) (unsigned char) (x) == line_sep_char  || \
 
1250
                          !(x))
 
1251
 
1143
1252
bool select_export::send_data(List<Item> &items)
1144
1253
{
1145
1254
  char buff[MAX_FIELD_WIDTH],null_buff[2],space[MAX_FIELD_WIDTH];
1150
1259
  if (unit->offset_limit_cnt)
1151
1260
  {                                             // using limit offset,count
1152
1261
    unit->offset_limit_cnt--;
1153
 
    return false;
 
1262
    return(0);
1154
1263
  }
1155
1264
  row_count++;
1156
1265
  Item *item;
1157
1266
  uint32_t used_length=0,items_left=items.elements;
1158
 
  List<Item>::iterator li(items.begin());
 
1267
  List_iterator_fast<Item> li(items);
1159
1268
 
1160
1269
  if (my_b_write(cache,(unsigned char*) exchange->line_start->ptr(),
1161
1270
                 exchange->line_start->length()))
1162
 
    return true;
1163
 
 
 
1271
    goto err;
1164
1272
  while ((item=li++))
1165
1273
  {
1166
1274
    Item_result result_type=item->result_type();
1171
1279
    {
1172
1280
      if (my_b_write(cache,(unsigned char*) exchange->enclosed->ptr(),
1173
1281
                     exchange->enclosed->length()))
1174
 
        return true;
 
1282
        goto err;
1175
1283
    }
1176
1284
    if (!res)
1177
1285
    {                                           // NULL
1182
1290
          null_buff[0]=escape_char;
1183
1291
          null_buff[1]='N';
1184
1292
          if (my_b_write(cache,(unsigned char*) null_buff,2))
1185
 
            return true;
 
1293
            goto err;
1186
1294
        }
1187
1295
        else if (my_b_write(cache,(unsigned char*) "NULL",4))
1188
 
          return true;
 
1296
          goto err;
1189
1297
      }
1190
1298
      else
1191
1299
      {
1195
1303
    else
1196
1304
    {
1197
1305
      if (fixed_row_size)
1198
 
        used_length= min(res->length(), static_cast<size_t>(item->max_length));
 
1306
        used_length= min(res->length(),item->max_length);
1199
1307
      else
1200
1308
        used_length= res->length();
1201
1309
 
1257
1365
            assert before the loop makes that sure.
1258
1366
          */
1259
1367
 
1260
 
          if ((needs_escaping(*pos, enclosed) ||
 
1368
          if ((NEED_ESCAPING(*pos) ||
1261
1369
               (check_second_byte &&
1262
1370
                my_mbcharlen(character_set_client, (unsigned char) *pos) == 2 &&
1263
1371
                pos + 1 < end &&
1264
 
                needs_escaping(pos[1], enclosed))) &&
 
1372
                NEED_ESCAPING(pos[1]))) &&
1265
1373
              /*
1266
1374
                Don't escape field_term_char by doubling - doubling is only
1267
1375
                valid for ENCLOSED BY characters:
1276
1384
            tmp_buff[1]= *pos ? *pos : '0';
1277
1385
            if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1278
1386
                my_b_write(cache,(unsigned char*) tmp_buff,2))
1279
 
              return true;
 
1387
              goto err;
1280
1388
            start=pos+1;
1281
1389
          }
1282
1390
        }
1283
1391
        if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1284
 
          return true;
 
1392
          goto err;
1285
1393
      }
1286
1394
      else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1287
 
        return true;
 
1395
        goto err;
1288
1396
    }
1289
1397
    if (fixed_row_size)
1290
1398
    {                                           // Fill with space
1300
1408
        for (; length > sizeof(space) ; length-=sizeof(space))
1301
1409
        {
1302
1410
          if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1303
 
            return true;
 
1411
            goto err;
1304
1412
        }
1305
1413
        if (my_b_write(cache,(unsigned char*) space,length))
1306
 
          return true;
 
1414
          goto err;
1307
1415
      }
1308
1416
    }
1309
1417
    if (res && enclosed)
1310
1418
    {
1311
1419
      if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
1312
1420
                     exchange->enclosed->length()))
1313
 
        return true;
 
1421
        goto err;
1314
1422
    }
1315
1423
    if (--items_left)
1316
1424
    {
1317
1425
      if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
1318
1426
                     field_term_length))
1319
 
        return true;
 
1427
        goto err;
1320
1428
    }
1321
1429
  }
1322
1430
  if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1323
1431
                 exchange->line_term->length()))
1324
 
  {
1325
 
    return true;
1326
 
  }
1327
 
 
1328
 
  return false;
 
1432
    goto err;
 
1433
  return(0);
 
1434
err:
 
1435
  return(1);
1329
1436
}
1330
1437
 
1331
1438
 
1344
1451
 
1345
1452
bool select_dump::send_data(List<Item> &items)
1346
1453
{
1347
 
  List<Item>::iterator li(items.begin());
 
1454
  List_iterator_fast<Item> li(items);
1348
1455
  char buff[MAX_FIELD_WIDTH];
1349
1456
  String tmp(buff,sizeof(buff),&my_charset_bin),*res;
1350
1457
  tmp.length(0);
1358
1465
  if (row_count++ > 1)
1359
1466
  {
1360
1467
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
1361
 
    return 1;
 
1468
    goto err;
1362
1469
  }
1363
1470
  while ((item=li++))
1364
1471
  {
1366
1473
    if (!res)                                   // If NULL
1367
1474
    {
1368
1475
      if (my_b_write(cache,(unsigned char*) "",1))
1369
 
        return 1;
 
1476
        goto err;
1370
1477
    }
1371
1478
    else if (my_b_write(cache,(unsigned char*) res->ptr(),res->length()))
1372
1479
    {
1373
 
      my_error(ER_ERROR_ON_WRITE, MYF(0), path.file_string().c_str(), errno);
1374
 
      return 1;
 
1480
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, errno);
 
1481
      goto err;
1375
1482
    }
1376
1483
  }
1377
1484
  return(0);
 
1485
err:
 
1486
  return(1);
1378
1487
}
1379
1488
 
1380
1489
 
1397
1506
    unit->offset_limit_cnt--;
1398
1507
    return(0);
1399
1508
  }
1400
 
  List<Item>::iterator li(items.begin());
 
1509
  List_iterator_fast<Item> li(items);
1401
1510
  Item *val_item;
1402
1511
  for (uint32_t i= 0; (val_item= li++); i++)
1403
1512
    it->store(i, val_item);
1415
1524
bool select_max_min_finder_subselect::send_data(List<Item> &items)
1416
1525
{
1417
1526
  Item_maxmin_subselect *it= (Item_maxmin_subselect *)item;
1418
 
  List<Item>::iterator li(items.begin());
 
1527
  List_iterator_fast<Item> li(items);
1419
1528
  Item *val_item= li++;
1420
1529
  it->register_value();
1421
1530
  if (it->assigned())
1432
1541
      switch (val_item->result_type())
1433
1542
      {
1434
1543
      case REAL_RESULT:
1435
 
        op= &select_max_min_finder_subselect::cmp_real;
1436
 
        break;
 
1544
        op= &select_max_min_finder_subselect::cmp_real;
 
1545
        break;
1437
1546
      case INT_RESULT:
1438
 
        op= &select_max_min_finder_subselect::cmp_int;
1439
 
        break;
 
1547
        op= &select_max_min_finder_subselect::cmp_int;
 
1548
        break;
1440
1549
      case STRING_RESULT:
1441
 
        op= &select_max_min_finder_subselect::cmp_str;
1442
 
        break;
 
1550
        op= &select_max_min_finder_subselect::cmp_str;
 
1551
        break;
1443
1552
      case DECIMAL_RESULT:
1444
1553
        op= &select_max_min_finder_subselect::cmp_decimal;
1445
1554
        break;
1446
1555
      case ROW_RESULT:
1447
1556
        // This case should never be choosen
1448
 
        assert(0);
1449
 
        op= 0;
 
1557
        assert(0);
 
1558
        op= 0;
1450
1559
      }
1451
1560
    }
1452
1561
    cache->store(val_item);
1485
1594
bool select_max_min_finder_subselect::cmp_decimal()
1486
1595
{
1487
1596
  Item *maxmin= ((Item_singlerow_subselect *)item)->element_index(0);
1488
 
  type::Decimal cval, *cvalue= cache->val_decimal(&cval);
1489
 
  type::Decimal mval, *mvalue= maxmin->val_decimal(&mval);
 
1597
  my_decimal cval, *cvalue= cache->val_decimal(&cval);
 
1598
  my_decimal mval, *mvalue= maxmin->val_decimal(&mval);
1490
1599
  if (fmax)
1491
1600
    return (cache->null_value && !maxmin->null_value) ||
1492
1601
      (!cache->null_value && !maxmin->null_value &&
1493
 
       class_decimal_cmp(cvalue, mvalue) > 0) ;
 
1602
       my_decimal_cmp(cvalue, mvalue) > 0) ;
1494
1603
  return (maxmin->null_value && !cache->null_value) ||
1495
1604
    (!cache->null_value && !maxmin->null_value &&
1496
 
     class_decimal_cmp(cvalue,mvalue) < 0);
 
1605
     my_decimal_cmp(cvalue,mvalue) < 0);
1497
1606
}
1498
1607
 
1499
1608
bool select_max_min_finder_subselect::cmp_str()
1535
1644
void Session::end_statement()
1536
1645
{
1537
1646
  /* Cleanup SQL processing state to reuse this statement in next query. */
1538
 
  lex->end();
1539
 
  query_cache_key= ""; // reset the cache key
1540
 
  resetResultsetMessage();
 
1647
  lex_end(lex);
1541
1648
}
1542
1649
 
1543
1650
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
1544
1651
{
1545
 
  assert(_schema);
1546
 
  if (_schema and _schema->empty())
1547
 
  {
1548
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1549
 
    return true;
1550
 
  }
1551
 
  else if (not _schema)
1552
 
  {
1553
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1554
 
    return true;
1555
 
  }
1556
 
  assert(_schema);
1557
 
 
1558
 
  *p_db= strmake(_schema->c_str(), _schema->size());
1559
 
  *p_db_length= _schema->size();
1560
 
 
 
1652
  if (db.empty())
 
1653
  {
 
1654
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
 
1655
    return true;
 
1656
  }
 
1657
  *p_db= strmake(db.c_str(), db.length());
 
1658
  *p_db_length= db.length();
1561
1659
  return false;
1562
1660
}
1563
1661
 
1572
1670
  quick_group= 1;
1573
1671
  table_charset= 0;
1574
1672
  precomputed_group_by= 0;
 
1673
  bit_fields_as_long= 0;
1575
1674
}
1576
1675
 
1577
1676
void Tmp_Table_Param::cleanup(void)
1579
1678
  /* Fix for Intel compiler */
1580
1679
  if (copy_field)
1581
1680
  {
1582
 
    boost::checked_array_delete(copy_field);
1583
 
    save_copy_field= save_copy_field_end= copy_field= copy_field_end= 0;
 
1681
    delete [] copy_field;
 
1682
    save_copy_field= copy_field= 0;
1584
1683
  }
1585
1684
}
1586
1685
 
1587
1686
void Session::send_kill_message() const
1588
1687
{
1589
 
  drizzled::error_t err= static_cast<drizzled::error_t>(killed_errno());
1590
 
  if (err != EE_OK)
 
1688
  int err= killed_errno();
 
1689
  if (err)
1591
1690
    my_message(err, ER(err), MYF(0));
1592
1691
}
1593
1692
 
1596
1695
  memset(&status_var, 0, sizeof(status_var));
1597
1696
}
1598
1697
 
1599
 
 
1600
 
void Session::set_db(const std::string &new_db)
1601
 
{
1602
 
  /* Do not reallocate memory if current chunk is big enough. */
1603
 
  if (new_db.length())
1604
 
  {
1605
 
    _schema.reset(new std::string(new_db));
1606
 
  }
1607
 
  else
1608
 
  {
1609
 
    _schema.reset(new std::string(""));
1610
 
  }
1611
 
}
1612
 
 
 
1698
void Security_context::skip_grants()
 
1699
{
 
1700
  /* privileges for the user are unknown everything is allowed */
 
1701
}
 
1702
 
 
1703
 
 
1704
/****************************************************************************
 
1705
  Handling of open and locked tables states.
 
1706
 
 
1707
  This is used when we want to open/lock (and then close) some tables when
 
1708
  we already have a set of tables open and locked. We use these methods for
 
1709
  access to mysql.proc table to find definitions of stored routines.
 
1710
****************************************************************************/
 
1711
 
 
1712
void Session::reset_n_backup_open_tables_state(Open_tables_state *backup)
 
1713
{
 
1714
  backup->set_open_tables_state(this);
 
1715
  reset_open_tables_state();
 
1716
  backups_available= false;
 
1717
}
 
1718
 
 
1719
 
 
1720
void Session::restore_backup_open_tables_state(Open_tables_state *backup)
 
1721
{
 
1722
  /*
 
1723
    Before we will throw away current open tables state we want
 
1724
    to be sure that it was properly cleaned up.
 
1725
  */
 
1726
  assert(open_tables == 0 && temporary_tables == 0 &&
 
1727
              derived_tables == 0 &&
 
1728
              lock == 0);
 
1729
  set_open_tables_state(backup);
 
1730
}
 
1731
 
 
1732
 
 
1733
bool Session::set_db(const NormalisedDatabaseName &new_db)
 
1734
{
 
1735
  db= new_db.to_string();
 
1736
 
 
1737
  return false;
 
1738
}
 
1739
 
 
1740
void Session::clear_db()
 
1741
{
 
1742
  db.clear();
 
1743
}
 
1744
 
 
1745
 
 
1746
/**
 
1747
  Check the killed state of a user thread
 
1748
  @param session  user thread
 
1749
  @retval 0 the user thread is active
 
1750
  @retval 1 the user thread has been killed
 
1751
*/
 
1752
extern "C" int session_killed(const Session *session)
 
1753
{
 
1754
  return(session->killed);
 
1755
}
 
1756
 
 
1757
/**
 
1758
  Return the session id of a user session
 
1759
  @param pointer to Session object
 
1760
  @return session's id
 
1761
*/
 
1762
extern "C" unsigned long session_get_thread_id(const Session *session)
 
1763
{
 
1764
  return (unsigned long) session->getSessionId();
 
1765
}
 
1766
 
 
1767
 
 
1768
const struct charset_info_st *session_charset(Session *session)
 
1769
{
 
1770
  return(session->charset());
 
1771
}
 
1772
 
 
1773
char **session_query(Session *session)
 
1774
{
 
1775
  return(&session->query);
 
1776
}
 
1777
 
 
1778
int session_non_transactional_update(const Session *session)
 
1779
{
 
1780
  return(session->transaction.all.modified_non_trans_table);
 
1781
}
 
1782
 
 
1783
void session_mark_transaction_to_rollback(Session *session, bool all)
 
1784
{
 
1785
  mark_transaction_to_rollback(session, all);
 
1786
}
1613
1787
 
1614
1788
/**
1615
1789
  Mark transaction to rollback and mark error as fatal to a sub-statement.
1617
1791
  @param  session   Thread handle
1618
1792
  @param  all   true <=> rollback main transaction.
1619
1793
*/
1620
 
void Session::markTransactionForRollback(bool all)
 
1794
void mark_transaction_to_rollback(Session *session, bool all)
1621
1795
{
1622
 
  is_fatal_sub_stmt_error= true;
1623
 
  transaction_rollback_request= all;
 
1796
  if (session)
 
1797
  {
 
1798
    session->is_fatal_sub_stmt_error= true;
 
1799
    session->transaction_rollback_request= all;
 
1800
  }
1624
1801
}
1625
1802
 
1626
 
void Session::disconnect(enum error_t errcode)
 
1803
void Session::disconnect(uint32_t errcode, bool should_lock)
1627
1804
{
1628
1805
  /* Allow any plugins to cleanup their session variables */
1629
1806
  plugin_sessionvar_cleanup(this);
1630
1807
 
1631
1808
  /* If necessary, log any aborted or unauthorized connections */
1632
 
  if (getKilled() || client->wasAborted())
1633
 
  {
1634
 
    status_var.aborted_threads++;
1635
 
  }
 
1809
  if (killed || client->wasAborted())
 
1810
    statistic_increment(aborted_threads, &LOCK_status);
1636
1811
 
1637
1812
  if (client->wasAborted())
1638
1813
  {
1639
 
    if (not getKilled() && variables.log_warnings > 1)
 
1814
    if (! killed && variables.log_warnings > 1)
1640
1815
    {
1641
 
      errmsg_printf(error::WARN, ER(ER_NEW_ABORTING_CONNECTION)
 
1816
      Security_context *sctx= &security_ctx;
 
1817
 
 
1818
      errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1642
1819
                  , thread_id
1643
 
                  , (_schema->empty() ? "unconnected" : _schema->c_str())
1644
 
                  , security_ctx->username().empty() == false ? security_ctx->username().c_str() : "unauthenticated"
1645
 
                  , security_ctx->address().c_str()
 
1820
                  , (db.empty() ? "unconnected" : db.c_str())
 
1821
                  , sctx->user.empty() == false ? sctx->user.c_str() : "unauthenticated"
 
1822
                  , sctx->ip.c_str()
1646
1823
                  , (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1647
1824
    }
1648
1825
  }
1649
1826
 
1650
 
  setKilled(Session::KILL_CONNECTION);
1651
 
 
 
1827
  /* Close out our connection to the client */
 
1828
  if (should_lock)
 
1829
    (void) pthread_mutex_lock(&LOCK_thread_count);
 
1830
  killed= Session::KILL_CONNECTION;
1652
1831
  if (client->isConnected())
1653
1832
  {
1654
 
    if (errcode != EE_OK)
 
1833
    if (errcode)
1655
1834
    {
1656
1835
      /*my_error(errcode, ER(errcode));*/
1657
1836
      client->sendError(errcode, ER(errcode));
1658
1837
    }
1659
1838
    client->close();
1660
1839
  }
 
1840
  if (should_lock)
 
1841
    (void) pthread_mutex_unlock(&LOCK_thread_count);
1661
1842
}
1662
1843
 
1663
1844
void Session::reset_for_next_command()
1674
1855
  server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1675
1856
                          SERVER_QUERY_NO_INDEX_USED |
1676
1857
                          SERVER_QUERY_NO_GOOD_INDEX_USED);
 
1858
  /*
 
1859
    If in autocommit mode and not in a transaction, reset
 
1860
    OPTION_STATUS_NO_TRANS_UPDATE to not get warnings
 
1861
    in ha_rollback_trans() about some tables couldn't be rolled back.
 
1862
  */
 
1863
  if (!(options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
 
1864
  {
 
1865
    transaction.all.modified_non_trans_table= false;
 
1866
  }
1677
1867
 
1678
1868
  clear_error();
1679
1869
  main_da.reset_diagnostics_area();
1685
1875
  Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1686
1876
*/
1687
1877
 
1688
 
void Open_tables_state::close_temporary_tables()
 
1878
void Session::close_temporary_tables()
1689
1879
{
1690
1880
  Table *table;
1691
1881
  Table *tmp_next;
1692
1882
 
1693
 
  if (not temporary_tables)
 
1883
  if (!temporary_tables)
1694
1884
    return;
1695
1885
 
1696
1886
  for (table= temporary_tables; table; table= tmp_next)
1697
1887
  {
1698
 
    tmp_next= table->getNext();
1699
 
    nukeTable(table);
 
1888
    tmp_next= table->next;
 
1889
    close_temporary(table);
1700
1890
  }
1701
1891
  temporary_tables= NULL;
1702
1892
}
1705
1895
  unlink from session->temporary tables and close temporary table
1706
1896
*/
1707
1897
 
1708
 
void Open_tables_state::close_temporary_table(Table *table)
 
1898
void Session::close_temporary_table(Table *table)
1709
1899
{
1710
 
  if (table->getPrev())
 
1900
  if (table->prev)
1711
1901
  {
1712
 
    table->getPrev()->setNext(table->getNext());
1713
 
    if (table->getPrev()->getNext())
1714
 
    {
1715
 
      table->getNext()->setPrev(table->getPrev());
1716
 
    }
 
1902
    table->prev->next= table->next;
 
1903
    if (table->prev->next)
 
1904
      table->next->prev= table->prev;
1717
1905
  }
1718
1906
  else
1719
1907
  {
1724
1912
      passing non-zero value to end_slave via rli->save_temporary_tables
1725
1913
      when no temp tables opened, see an invariant below.
1726
1914
    */
1727
 
    temporary_tables= table->getNext();
 
1915
    temporary_tables= table->next;
1728
1916
    if (temporary_tables)
1729
 
    {
1730
 
      table->getNext()->setPrev(NULL);
1731
 
    }
 
1917
      table->next->prev= NULL;
1732
1918
  }
1733
 
  nukeTable(table);
 
1919
  close_temporary(table);
1734
1920
}
1735
1921
 
1736
1922
/*
1737
 
  Close and drop a temporary table
 
1923
  Close and delete a temporary table
1738
1924
 
1739
1925
  NOTE
1740
1926
  This dosn't unlink table from session->temporary
1741
1927
  If this is needed, use close_temporary_table()
1742
1928
*/
1743
1929
 
1744
 
void Open_tables_state::nukeTable(Table *table)
 
1930
void Session::close_temporary(Table *table)
1745
1931
{
1746
 
  plugin::StorageEngine *table_type= table->getShare()->db_type();
 
1932
  plugin::StorageEngine *table_type= table->s->db_type();
1747
1933
 
1748
1934
  table->free_io_cache();
1749
 
  table->delete_table();
1750
 
 
1751
 
  identifier::Table identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1752
 
  rm_temporary_table(table_type, identifier);
1753
 
 
1754
 
  boost::checked_delete(table->getMutableShare());
1755
 
 
1756
 
  boost::checked_delete(table);
 
1935
  table->closefrm(false);
 
1936
 
 
1937
  rm_temporary_table(table_type, table->s->path.str);
 
1938
 
 
1939
  table->s->free_table_share();
 
1940
 
 
1941
  /* This makes me sad, but we're allocating it via malloc */
 
1942
  free(table);
1757
1943
}
1758
1944
 
1759
1945
/** Clear most status variables. */
1760
1946
extern time_t flush_status_time;
 
1947
extern uint32_t max_used_connections;
1761
1948
 
1762
1949
void Session::refresh_status()
1763
1950
{
 
1951
  pthread_mutex_lock(&LOCK_status);
 
1952
 
 
1953
  /* Add thread's status variabes to global status */
 
1954
  add_to_status(&global_status_var, &status_var);
 
1955
 
1764
1956
  /* Reset thread's status variables */
1765
1957
  memset(&status_var, 0, sizeof(status_var));
1766
1958
 
 
1959
  /* Reset some global variables */
 
1960
  reset_status_vars();
 
1961
 
 
1962
  /* Reset the counters of all key caches (default and named). */
 
1963
  reset_key_cache_counters();
1767
1964
  flush_status_time= time((time_t*) 0);
1768
 
  current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1769
 
  current_global_counters.connections= 0;
 
1965
  max_used_connections= 1; /* We set it to one, because we know we exist */
 
1966
  pthread_mutex_unlock(&LOCK_status);
1770
1967
}
1771
1968
 
1772
1969
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1773
1970
{
1774
 
  return getVariable(std::string(name.str, name.length), create_if_not_exists);
1775
 
}
1776
 
 
1777
 
user_var_entry *Session::getVariable(const std::string  &name, bool create_if_not_exists)
1778
 
{
1779
 
  if (cleanup_done)
1780
 
    return NULL;
1781
 
 
1782
 
  UserVars::iterator iter= user_vars.find(name);
1783
 
  if (iter != user_vars.end())
1784
 
    return (*iter).second;
1785
 
 
1786
 
  if (not create_if_not_exists)
1787
 
    return NULL;
1788
 
 
1789
1971
  user_var_entry *entry= NULL;
1790
 
  entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1791
 
 
1792
 
  if (entry == NULL)
1793
 
    return NULL;
1794
 
 
1795
 
  std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1796
 
 
1797
 
  if (not returnable.second)
 
1972
 
 
1973
  entry= (user_var_entry*) hash_search(&user_vars, (unsigned char*) name.str, name.length);
 
1974
 
 
1975
  if ((entry == NULL) && create_if_not_exists)
1798
1976
  {
1799
 
    boost::checked_delete(entry);
 
1977
    if (!hash_inited(&user_vars))
 
1978
      return NULL;
 
1979
    entry= new (nothrow) user_var_entry(name.str, query_id);
 
1980
 
 
1981
    if (entry == NULL)
 
1982
      return NULL;
 
1983
 
 
1984
    if (my_hash_insert(&user_vars, (unsigned char*) entry))
 
1985
    {
 
1986
      assert(1);
 
1987
      free((char*) entry);
 
1988
      return 0;
 
1989
    }
 
1990
 
1800
1991
  }
1801
1992
 
1802
1993
  return entry;
1803
1994
}
1804
1995
 
1805
 
void Session::setVariable(const std::string &name, const std::string &value)
1806
 
{
1807
 
  user_var_entry *updateable_var= getVariable(name.c_str(), true);
1808
 
  if (updateable_var)
1809
 
  {
1810
 
    updateable_var->update_hash(false,
1811
 
                                (void*)value.c_str(),
1812
 
                                static_cast<uint32_t>(value.length()), STRING_RESULT,
1813
 
                                &my_charset_bin,
1814
 
                                DERIVATION_IMPLICIT, false);
1815
 
  }
1816
 
}
1817
 
 
1818
 
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1819
 
{
1820
 
  for (Table *table= temporary_tables ; table ; table= table->getNext())
1821
 
  {
1822
 
    if (table->query_id == getQueryId())
 
1996
void Session::mark_temp_tables_as_free_for_reuse()
 
1997
{
 
1998
  for (Table *table= temporary_tables ; table ; table= table->next)
 
1999
  {
 
2000
    if (table->query_id == query_id)
1823
2001
    {
1824
2002
      table->query_id= 0;
1825
2003
      table->cursor->ha_reset();
1829
2007
 
1830
2008
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1831
2009
{
1832
 
  for (; table ; table= table->getNext())
 
2010
  for (; table ; table= table->next)
1833
2011
  {
1834
 
    if (table->query_id == getQueryId())
 
2012
    if (table->query_id == query_id)
1835
2013
    {
1836
2014
      table->query_id= 0;
1837
2015
      table->cursor->ha_reset();
1850
2028
*/
1851
2029
void Session::close_thread_tables()
1852
2030
{
1853
 
  clearDerivedTables();
 
2031
  Table *table;
 
2032
 
 
2033
  /*
 
2034
    We are assuming here that session->derived_tables contains ONLY derived
 
2035
    tables for this substatement. i.e. instead of approach which uses
 
2036
    query_id matching for determining which of the derived tables belong
 
2037
    to this substatement we rely on the ability of substatements to
 
2038
    save/restore session->derived_tables during their execution.
 
2039
 
 
2040
    TODO: Probably even better approach is to simply associate list of
 
2041
          derived tables with (sub-)statement instead of thread and destroy
 
2042
          them at the end of its execution.
 
2043
  */
 
2044
  if (derived_tables)
 
2045
  {
 
2046
    Table *next;
 
2047
    /*
 
2048
      Close all derived tables generated in queries like
 
2049
      SELECT * FROM (SELECT * FROM t1)
 
2050
    */
 
2051
    for (table= derived_tables ; table ; table= next)
 
2052
    {
 
2053
      next= table->next;
 
2054
      table->free_tmp_table(this);
 
2055
    }
 
2056
    derived_tables= 0;
 
2057
  }
1854
2058
 
1855
2059
  /*
1856
2060
    Mark all temporary tables used by this statement as free for reuse.
1864
2068
    does not belong to statement for which we do close_thread_tables()).
1865
2069
    TODO: This should be fixed in later releases.
1866
2070
   */
 
2071
  if (backups_available == false)
1867
2072
  {
1868
 
    TransactionServices &transaction_services= TransactionServices::singleton();
1869
2073
    main_da.can_overwrite_status= true;
1870
 
    transaction_services.autocommitOrRollback(*this, is_error());
 
2074
    ha_autocommit_or_rollback(this, is_error());
1871
2075
    main_da.can_overwrite_status= false;
1872
2076
    transaction.stmt.reset();
1873
2077
  }
1883
2087
      handled either before writing a query log event (inside
1884
2088
      binlog_query()) or when preparing a pending event.
1885
2089
     */
1886
 
    unlockTables(lock);
 
2090
    mysql_unlock_tables(this, lock);
1887
2091
    lock= 0;
1888
2092
  }
1889
2093
  /*
1890
 
    Note that we need to hold table::Cache::singleton().mutex() while changing the
 
2094
    Note that we need to hold LOCK_open while changing the
1891
2095
    open_tables list. Another thread may work on it.
1892
 
    (See: table::Cache::singleton().removeTable(), wait_completed_table())
 
2096
    (See: remove_table_from_cache(), mysql_wait_completed_table())
1893
2097
    Closing a MERGE child before the parent would be fatal if the
1894
2098
    other thread tries to abort the MERGE lock in between.
1895
2099
  */
1921
2125
    if (open_tables_from_list(&tables, &counter))
1922
2126
      return true;
1923
2127
 
1924
 
    if (not lock_tables(tables, counter, &need_reopen))
 
2128
    if (!lock_tables(tables, counter, &need_reopen))
1925
2129
      break;
1926
 
 
1927
 
    if (not need_reopen)
 
2130
    if (!need_reopen)
1928
2131
      return true;
1929
 
 
1930
2132
    close_tables_for_reopen(&tables);
1931
2133
  }
1932
 
 
1933
 
  if ((handle_derived(lex, &derived_prepare) || (handle_derived(lex, &derived_filling))))
1934
 
    return true;
1935
 
 
1936
 
  return false;
1937
 
}
1938
 
 
1939
 
/*
1940
 
  @note "best_effort" is used in cases were if a failure occurred on this
1941
 
  operation it would not be surprising because we are only removing because there
1942
 
  might be an issue (lame engines).
1943
 
*/
1944
 
 
1945
 
bool Open_tables_state::rm_temporary_table(const identifier::Table &identifier, bool best_effort)
1946
 
{
1947
 
  if (not plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
1948
 
  {
1949
 
    if (not best_effort)
1950
 
    {
1951
 
      std::string path;
1952
 
      identifier.getSQLPath(path);
1953
 
      errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"),
1954
 
                    path.c_str(), errno);
1955
 
    }
1956
 
 
1957
 
    return true;
1958
 
  }
1959
 
 
1960
 
  return false;
1961
 
}
1962
 
 
1963
 
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const identifier::Table &identifier)
1964
 
{
1965
 
  drizzled::error_t error;
1966
 
  assert(base);
1967
 
 
1968
 
  if (not plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier, error))
1969
 
  {
1970
 
    std::string path;
1971
 
    identifier.getSQLPath(path);
1972
 
    errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"),
1973
 
                  path.c_str(), error);
1974
 
 
1975
 
    return true;
1976
 
  }
1977
 
 
1978
 
  return false;
1979
 
}
1980
 
 
1981
 
/**
1982
 
  @note this will be removed, I am looking through Hudson to see if it is finding
1983
 
  any tables that are missed during cleanup.
1984
 
*/
1985
 
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
1986
 
{
1987
 
  Table *table;
1988
 
 
1989
 
  if (not temporary_tables)
1990
 
    return;
1991
 
 
1992
 
  cerr << "Begin Run: " << foo << "\n";
1993
 
  for (table= temporary_tables; table; table= table->getNext())
1994
 
  {
1995
 
    bool have_proto= false;
1996
 
 
1997
 
    message::Table *proto= table->getShare()->getTableMessage();
1998
 
    if (table->getShare()->getTableMessage())
1999
 
      have_proto= true;
2000
 
 
2001
 
    const char *answer= have_proto ? "true" : "false";
2002
 
 
2003
 
    if (have_proto)
2004
 
    {
2005
 
      cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2006
 
      cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2007
 
    }
2008
 
    else
2009
 
    {
2010
 
      cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2011
 
    }
2012
 
  }
2013
 
}
2014
 
 
2015
 
table::Singular *Session::getInstanceTable()
2016
 
{
2017
 
  temporary_shares.push_back(new table::Singular()); // This will not go into the tableshare cache, so no key is used.
2018
 
 
2019
 
  table::Singular *tmp_share= temporary_shares.back();
2020
 
 
2021
 
  assert(tmp_share);
2022
 
 
2023
 
  return tmp_share;
2024
 
}
2025
 
 
2026
 
 
2027
 
/**
2028
 
  Create a reduced Table object with properly set up Field list from a
2029
 
  list of field definitions.
2030
 
 
2031
 
    The created table doesn't have a table Cursor associated with
2032
 
    it, has no keys, no group/distinct, no copy_funcs array.
2033
 
    The sole purpose of this Table object is to use the power of Field
2034
 
    class to read/write data to/from table->getInsertRecord(). Then one can store
2035
 
    the record in any container (RB tree, hash, etc).
2036
 
    The table is created in Session mem_root, so are the table's fields.
2037
 
    Consequently, if you don't BLOB fields, you don't need to free it.
2038
 
 
2039
 
  @param session         connection handle
2040
 
  @param field_list  list of column definitions
2041
 
 
2042
 
  @return
2043
 
    0 if out of memory, Table object in case of success
2044
 
*/
2045
 
table::Singular *Session::getInstanceTable(List<CreateField> &field_list)
2046
 
{
2047
 
  temporary_shares.push_back(new table::Singular(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2048
 
 
2049
 
  table::Singular *tmp_share= temporary_shares.back();
2050
 
 
2051
 
  assert(tmp_share);
2052
 
 
2053
 
  return tmp_share;
2054
 
}
2055
 
 
2056
 
namespace display  {
2057
 
 
2058
 
static const std::string NONE= "NONE";
2059
 
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2060
 
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2061
 
 
2062
 
const std::string &type(drizzled::Session::global_read_lock_t type)
2063
 
{
2064
 
  switch (type) {
2065
 
    default:
2066
 
    case Session::NONE:
2067
 
      return NONE;
2068
 
    case Session::GOT_GLOBAL_READ_LOCK:
2069
 
      return GOT_GLOBAL_READ_LOCK;
2070
 
    case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2071
 
      return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2072
 
  }
2073
 
}
2074
 
 
2075
 
size_t max_string_length(drizzled::Session::global_read_lock_t)
2076
 
{
2077
 
  return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2078
 
}
2079
 
 
2080
 
} /* namespace display */
2081
 
 
2082
 
} /* namespace drizzled */
 
2134
  if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
 
2135
       (fill_derived_tables() &&
 
2136
        mysql_handle_derived(lex, &mysql_derived_filling))))
 
2137
    return true;
 
2138
 
 
2139
  return false;
 
2140
}
 
2141
 
 
2142
bool Session::openTables(TableList *tables, uint32_t flags)
 
2143
{
 
2144
  uint32_t counter;
 
2145
  bool ret= fill_derived_tables();
 
2146
  assert(ret == false);
 
2147
  if (open_tables_from_list(&tables, &counter, flags) ||
 
2148
      mysql_handle_derived(lex, &mysql_derived_prepare))
 
2149
    return true;
 
2150
  return false;
 
2151
}
 
2152
 
 
2153
bool Session::rm_temporary_table(plugin::StorageEngine *base, TableIdentifier &identifier)
 
2154
{
 
2155
  bool error= false;
 
2156
 
 
2157
  assert(base);
 
2158
 
 
2159
  if (plugin::StorageEngine::deleteDefinitionFromPath(identifier))
 
2160
    error= true;
 
2161
 
 
2162
  if (base->doDropTable(*this, identifier.getPath()))
 
2163
  {
 
2164
    error= true;
 
2165
    errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
 
2166
                  identifier.getPath(), errno);
 
2167
  }
 
2168
  return error;
 
2169
}
 
2170
 
 
2171
bool Session::rm_temporary_table(plugin::StorageEngine *base, const char *path)
 
2172
{
 
2173
  bool error= false;
 
2174
 
 
2175
  assert(base);
 
2176
 
 
2177
  if (delete_table_proto_file(path))
 
2178
    error= true;
 
2179
 
 
2180
  if (base->doDropTable(*this, path))
 
2181
  {
 
2182
    error= true;
 
2183
    errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
 
2184
                  path, errno);
 
2185
  }
 
2186
  return error;
 
2187
}