~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/session.cc

  • Committer: Brian Aker
  • Date: 2009-03-25 18:24:15 UTC
  • mto: This revision was merged to the branch mainline in revision 963.
  • Revision ID: brian@tangent.org-20090325182415-opf2720c1hidtfgk
Cut down on shutdown loop of plugins (cutting stuff out in order to simplify
old lock system).

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
18
 */
19
19
 
20
 
/**
21
 
 * @file Implementation of the Session class and API
22
 
 */
23
 
 
24
 
#include "config.h"
25
 
#include "drizzled/session.h"
26
 
#include "drizzled/session/cache.h"
 
20
#include <drizzled/server_includes.h>
 
21
#include <drizzled/session.h>
27
22
#include <sys/stat.h>
28
 
#include "drizzled/error.h"
29
 
#include "drizzled/gettext.h"
30
 
#include "drizzled/query_id.h"
31
 
#include "drizzled/data_home.h"
32
 
#include "drizzled/sql_base.h"
33
 
#include "drizzled/lock.h"
34
 
#include "drizzled/item/cache.h"
35
 
#include "drizzled/item/float.h"
36
 
#include "drizzled/item/return_int.h"
37
 
#include "drizzled/item/empty_string.h"
38
 
#include "drizzled/show.h"
39
 
#include "drizzled/plugin/client.h"
40
 
#include "drizzled/plugin/scheduler.h"
41
 
#include "drizzled/plugin/authentication.h"
42
 
#include "drizzled/plugin/logging.h"
43
 
#include "drizzled/plugin/transactional_storage_engine.h"
44
 
#include "drizzled/plugin/query_rewrite.h"
45
 
#include "drizzled/probes.h"
46
 
#include "drizzled/table_proto.h"
47
 
#include "drizzled/db.h"
48
 
#include "drizzled/pthread_globals.h"
49
 
#include "drizzled/transaction_services.h"
50
 
#include "drizzled/drizzled.h"
51
 
 
52
 
#include "drizzled/table/instance.h"
53
 
 
54
 
#include "plugin/myisam/myisam.h"
55
 
#include "drizzled/internal/iocache.h"
56
 
#include "drizzled/internal/thread_var.h"
57
 
#include "drizzled/plugin/event_observer.h"
58
 
 
59
 
#include "drizzled/util/functors.h"
60
 
 
61
 
#include "drizzled/display.h"
62
 
 
63
 
#include <fcntl.h>
64
 
#include <algorithm>
65
 
#include <climits>
66
 
#include <boost/filesystem.hpp>
67
 
 
68
 
#include "drizzled/util/backtrace.h"
69
 
 
70
 
using namespace std;
71
 
 
72
 
namespace fs=boost::filesystem;
73
 
namespace drizzled
74
 
{
 
23
#include <mysys/mysys_err.h>
 
24
#include <drizzled/error.h>
 
25
#include <drizzled/query_id.h>
 
26
#include <drizzled/data_home.h>
 
27
#include <drizzled/sql_base.h>
 
28
#include <drizzled/lock.h>
 
29
#include <drizzled/item/cache.h>
 
30
#include <drizzled/item/float.h>
 
31
#include <drizzled/item/return_int.h>
 
32
#include <drizzled/item/empty_string.h>
 
33
#include <drizzled/show.h>
 
34
#include <drizzled/scheduling.h>
 
35
#include <libdrizzleclient/errmsg.h>
75
36
 
76
37
/*
77
38
  The following is used to initialise Table_ident with a internal
81
42
char empty_c_string[1]= {0};    /* used for not defined db */
82
43
 
83
44
const char * const Session::DEFAULT_WHERE= "field list";
 
45
extern pthread_key_t THR_Session;
 
46
extern pthread_key_t THR_Mem_root;
 
47
 
 
48
/*****************************************************************************
 
49
** Instansiate templates
 
50
*****************************************************************************/
 
51
 
 
52
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
53
/* Used templates */
 
54
template class List<Key>;
 
55
template class List_iterator<Key>;
 
56
template class List<Key_part_spec>;
 
57
template class List_iterator<Key_part_spec>;
 
58
template class List<Alter_drop>;
 
59
template class List_iterator<Alter_drop>;
 
60
template class List<Alter_column>;
 
61
template class List_iterator<Alter_column>;
 
62
#endif
 
63
 
 
64
/****************************************************************************
 
65
** User variables
 
66
****************************************************************************/
 
67
extern "C" unsigned char *get_var_key(user_var_entry *entry, size_t *length,
 
68
                              bool )
 
69
{
 
70
  *length= entry->name.length;
 
71
  return (unsigned char*) entry->name.str;
 
72
}
 
73
 
 
74
extern "C" void free_user_var(user_var_entry *entry)
 
75
{
 
76
  char *pos= (char*) entry+ALIGN_SIZE(sizeof(*entry));
 
77
  if (entry->value && entry->value != pos)
 
78
    free(entry->value);
 
79
  free((char*) entry);
 
80
}
84
81
 
85
82
bool Key_part_spec::operator==(const Key_part_spec& other) const
86
83
{
87
84
  return length == other.length &&
88
85
         field_name.length == other.field_name.length &&
89
 
    !my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
 
86
         !strcmp(field_name.str, other.field_name.str);
90
87
}
91
88
 
92
 
Open_tables_state::Open_tables_state(uint64_t version_arg) :
93
 
  version(version_arg)
 
89
/****************************************************************************
 
90
** Thread specific functions
 
91
****************************************************************************/
 
92
 
 
93
Open_tables_state::Open_tables_state(ulong version_arg)
 
94
  :version(version_arg), state_flags(0U)
94
95
{
95
 
  open_tables= temporary_tables= derived_tables= NULL;
96
 
  extra_lock= lock= NULL;
 
96
  reset_open_tables_state();
97
97
}
98
98
 
99
99
/*
100
100
  The following functions form part of the C plugin API
101
101
*/
102
 
int mysql_tmpfile(const char *prefix)
 
102
 
 
103
extern "C" int mysql_tmpfile(const char *prefix)
103
104
{
104
105
  char filename[FN_REFLEN];
105
 
  int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
 
106
  File fd = create_temp_file(filename, drizzle_tmpdir, prefix,
 
107
                             O_CREAT | O_EXCL | O_RDWR,
 
108
                             MYF(MY_WME));
106
109
  if (fd >= 0) {
107
110
    unlink(filename);
108
111
  }
110
113
  return fd;
111
114
}
112
115
 
 
116
 
 
117
extern "C"
 
118
int session_in_lock_tables(const Session *session)
 
119
{
 
120
  return test(session->in_lock_tables);
 
121
}
 
122
 
 
123
 
 
124
extern "C"
113
125
int session_tablespace_op(const Session *session)
114
126
{
115
127
  return test(session->tablespace_op);
116
128
}
117
129
 
 
130
 
118
131
/**
119
132
   Set the process info field of the Session structure.
120
133
 
123
136
 
124
137
   @see Session::set_proc_info
125
138
 */
126
 
void set_session_proc_info(Session *session, const char *info)
 
139
extern "C" void
 
140
set_session_proc_info(Session *session, const char *info)
127
141
{
128
142
  session->set_proc_info(info);
129
143
}
130
144
 
 
145
extern "C"
131
146
const char *get_session_proc_info(Session *session)
132
147
{
133
148
  return session->get_proc_info();
134
149
}
135
150
 
136
 
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
137
 
{
138
 
  return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
139
 
}
140
 
 
141
 
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
142
 
                                             size_t index)
143
 
{
144
 
  return &ha_data[monitored->getId()].resource_context[index];
145
 
}
146
 
 
 
151
extern "C"
 
152
void **session_ha_data(const Session *session, const struct handlerton *hton)
 
153
{
 
154
  return (void **) &session->ha_data[hton->slot].ha_ptr;
 
155
}
 
156
 
 
157
extern "C"
147
158
int64_t session_test_options(const Session *session, int64_t test_options)
148
159
{
149
160
  return session->options & test_options;
150
161
}
151
162
 
 
163
extern "C"
152
164
int session_sql_command(const Session *session)
153
165
{
154
166
  return (int) session->lex->sql_command;
155
167
}
156
168
 
157
 
enum_tx_isolation session_tx_isolation(const Session *session)
158
 
{
159
 
  return (enum_tx_isolation)session->variables.tx_isolation;
160
 
}
161
 
 
162
 
Session::Session(plugin::Client *client_arg) :
163
 
  Open_tables_state(refresh_version),
164
 
  mem_root(&main_mem_root),
165
 
  xa_id(0),
166
 
  lex(&main_lex),
167
 
  query(new std::string),
168
 
  _schema(new std::string("")),
169
 
  catalog("LOCAL"),
170
 
  client(client_arg),
171
 
  scheduler(NULL),
172
 
  scheduler_arg(NULL),
173
 
  lock_id(&main_lock_id),
174
 
  user_time(0),
175
 
  ha_data(plugin::num_trx_monitored_objects),
176
 
  concurrent_execute_allowed(true),
177
 
  arg_of_last_insert_id_function(false),
178
 
  first_successful_insert_id_in_prev_stmt(0),
179
 
  first_successful_insert_id_in_cur_stmt(0),
180
 
  limit_found_rows(0),
181
 
  _global_read_lock(NONE),
182
 
  _killed(NOT_KILLED),
183
 
  some_tables_deleted(false),
184
 
  no_errors(false),
185
 
  password(false),
186
 
  is_fatal_error(false),
187
 
  transaction_rollback_request(false),
188
 
  is_fatal_sub_stmt_error(0),
189
 
  derived_tables_processing(false),
190
 
  tablespace_op(false),
191
 
  m_lip(NULL),
192
 
  cached_table(0),
193
 
  transaction_message(NULL),
194
 
  statement_message(NULL),
195
 
  session_event_observers(NULL),
196
 
  use_usage(false)
197
 
{
198
 
  client->setSession(this);
 
169
extern "C"
 
170
int session_tx_isolation(const Session *session)
 
171
{
 
172
  return (int) session->variables.tx_isolation;
 
173
}
 
174
 
 
175
extern "C"
 
176
void session_inc_row_count(Session *session)
 
177
{
 
178
  session->row_count++;
 
179
}
 
180
 
 
181
Session::Session()
 
182
   :Statement(&main_lex, &main_mem_root,
 
183
              /* statement id */ 0),
 
184
   Open_tables_state(refresh_version),
 
185
   lock_id(&main_lock_id),
 
186
   user_time(0),
 
187
   arg_of_last_insert_id_function(false),
 
188
   first_successful_insert_id_in_prev_stmt(0),
 
189
   first_successful_insert_id_in_cur_stmt(0),
 
190
   global_read_lock(0),
 
191
   is_fatal_error(0),
 
192
   transaction_rollback_request(0),
 
193
   is_fatal_sub_stmt_error(0),
 
194
   in_lock_tables(0),
 
195
   derived_tables_processing(false),
 
196
   m_lip(NULL),
 
197
   scheduler(0)
 
198
{
 
199
  uint64_t tmp;
 
200
 
 
201
  memset(process_list_info, 0, PROCESS_LIST_WIDTH);
199
202
 
200
203
  /*
201
204
    Pass nominal parameters to init_alloc_root only to ensure that
202
205
    the destructor works OK in case of an error. The main_mem_root
203
206
    will be re-initialized in init_for_queries().
204
207
  */
205
 
  memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
206
 
  thread_stack= NULL;
207
 
  count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
208
 
  col_access= 0;
209
 
  tmp_table= 0;
210
 
  used_tables= 0;
 
208
  init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
 
209
  thread_stack= 0;
 
210
  catalog= (char*)"std"; // the only catalog we have for now
 
211
  some_tables_deleted=no_errors=password= 0;
 
212
  count_cuted_fields= CHECK_FIELD_IGNORE;
 
213
  killed= NOT_KILLED;
 
214
  col_access=0;
 
215
  thread_specific_used= false;
 
216
  tmp_table=0;
 
217
  used_tables=0;
211
218
  cuted_fields= sent_row_count= row_count= 0L;
 
219
  limit_found_rows= 0;
212
220
  row_count_func= -1;
213
221
  statement_id_counter= 0UL;
214
222
  // Must be reset to handle error with Session's created for init of mysqld
220
228
  thread_id= 0;
221
229
  file_id = 0;
222
230
  query_id= 0;
223
 
  warn_query_id= 0;
224
 
  mysys_var= 0;
225
 
  scoreboard_index= -1;
 
231
  warn_id= 0;
 
232
  db_charset= global_system_variables.collation_database;
 
233
  memset(ha_data, 0, sizeof(ha_data));
 
234
  replication_data= 0;
 
235
  mysys_var=0;
226
236
  dbug_sentry=Session_SENTRY_MAGIC;
227
 
  cleanup_done= abort_on_warning= no_warnings_for_error= false;  
228
 
 
229
 
  /* query_cache init */
230
 
  query_cache_key= "";
231
 
  resultset= NULL;
 
237
  net.vio= 0;
 
238
  client_capabilities= 0;                       // minimalistic client
 
239
  system_thread= NON_SYSTEM_THREAD;
 
240
  cleanup_done= abort_on_warning= no_warnings_for_error= false;
 
241
  peer_port= 0;                                 // For SHOW PROCESSLIST
 
242
  transaction.on= 1;
 
243
  pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
232
244
 
233
245
  /* Variables with default values */
234
246
  proc_info="login";
235
247
  where= Session::DEFAULT_WHERE;
236
 
  command= COM_CONNECT;
237
 
 
238
 
  plugin_sessionvar_init(this);
239
 
  /*
240
 
    variables= global_system_variables above has reset
241
 
    variables.pseudo_thread_id to 0. We need to correct it here to
242
 
    avoid temporary tables replication failure.
243
 
  */
244
 
  variables.pseudo_thread_id= thread_id;
245
 
  server_status= SERVER_STATUS_AUTOCOMMIT;
246
 
  options= session_startup_options;
247
 
 
248
 
  if (variables.max_join_size == HA_POS_ERROR)
249
 
    options |= OPTION_BIG_SELECTS;
250
 
  else
251
 
    options &= ~OPTION_BIG_SELECTS;
252
 
 
253
 
  open_options=ha_open_options;
254
 
  update_lock_default= TL_WRITE;
255
 
  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
256
 
  warn_list.empty();
257
 
  memset(warn_count, 0, sizeof(warn_count));
258
 
  total_warn_count= 0;
259
 
  memset(&status_var, 0, sizeof(status_var));
260
 
 
 
248
  server_id = ::server_id;
 
249
  command=COM_CONNECT;
 
250
  *scramble= '\0';
 
251
 
 
252
  init();
261
253
  /* Initialize sub structures */
262
 
  memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
263
 
 
 
254
  init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
 
255
  hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
 
256
            (hash_get_key) get_var_key,
 
257
            (hash_free_key) free_user_var, 0);
 
258
 
 
259
  /* Protocol */
 
260
  protocol= &protocol_text;                     // Default protocol
 
261
  protocol_text.init(this);
 
262
 
 
263
  const Query_id& local_query_id= Query_id::get_query_id();
 
264
  tablespace_op= false;
 
265
  tmp= sql_rnd();
 
266
  drizzleclient_randominit(&rand, tmp + (uint64_t) &rand,
 
267
                           tmp + (uint64_t)local_query_id.value());
264
268
  substitute_null_with_insert_id = false;
265
 
  lock_info.init(); /* safety: will be reset after start */
 
269
  thr_lock_info_init(&lock_info); /* safety: will be reset after start */
266
270
  thr_lock_owner_init(&main_lock_id, &lock_info);
267
271
 
268
272
  m_internal_handler= NULL;
269
 
  
270
 
  plugin::EventObserver::registerSessionEvents(*this); 
271
273
}
272
274
 
273
 
void Session::free_items()
274
 
{
275
 
  Item *next;
276
 
  /* This works because items are allocated with memory::sql_alloc() */
277
 
  for (; free_list; free_list= next)
278
 
  {
279
 
    next= free_list->next;
280
 
    free_list->delete_self();
281
 
  }
282
 
}
283
275
 
284
276
void Session::push_internal_handler(Internal_error_handler *handler)
285
277
{
291
283
  m_internal_handler= handler;
292
284
}
293
285
 
 
286
 
294
287
bool Session::handle_error(uint32_t sql_errno, const char *message,
295
288
                       DRIZZLE_ERROR::enum_warning_level level)
296
289
{
302
295
  return false;                                 // 'false', as per coding style
303
296
}
304
297
 
305
 
void Session::setAbort(bool arg)
306
 
{
307
 
  mysys_var->abort= arg;
308
 
}
309
 
 
310
 
void Session::lockOnSys()
311
 
{
312
 
  if (not mysys_var)
313
 
    return;
314
 
 
315
 
  setAbort(true);
316
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
317
 
  if (mysys_var->current_cond)
318
 
  {
319
 
    mysys_var->current_mutex->lock();
320
 
    mysys_var->current_cond->notify_all();
321
 
    mysys_var->current_mutex->unlock();
322
 
  }
323
 
}
324
298
 
325
299
void Session::pop_internal_handler()
326
300
{
328
302
  m_internal_handler= NULL;
329
303
}
330
304
 
331
 
void Session::get_xid(DRIZZLE_XID *xid)
332
 
{
333
 
  *xid = *(DRIZZLE_XID *) &transaction.xid_state.xid;
334
 
}
 
305
#if defined(__cplusplus)
 
306
extern "C" {
 
307
#endif
 
308
 
 
309
void *session_alloc(Session *session, unsigned int size)
 
310
{
 
311
  return session->alloc(size);
 
312
}
 
313
 
 
314
void *session_calloc(Session *session, unsigned int size)
 
315
{
 
316
  return session->calloc(size);
 
317
}
 
318
 
 
319
char *session_strdup(Session *session, const char *str)
 
320
{
 
321
  return session->strdup(str);
 
322
}
 
323
 
 
324
char *session_strmake(Session *session, const char *str, unsigned int size)
 
325
{
 
326
  return session->strmake(str, size);
 
327
}
 
328
 
 
329
void *session_memdup(Session *session, const void* str, unsigned int size)
 
330
{
 
331
  return session->memdup(str, size);
 
332
}
 
333
 
 
334
void session_get_xid(const Session *session, DRIZZLE_XID *xid)
 
335
{
 
336
  *xid = *(DRIZZLE_XID *) &session->transaction.xid_state.xid;
 
337
}
 
338
 
 
339
#if defined(__cplusplus)
 
340
}
 
341
#endif
 
342
 
 
343
/*
 
344
  Init common variables that has to be reset on start and on change_user
 
345
*/
 
346
 
 
347
void Session::init(void)
 
348
{
 
349
  pthread_mutex_lock(&LOCK_global_system_variables);
 
350
  plugin_sessionvar_init(this);
 
351
  /*
 
352
    variables= global_system_variables above has reset
 
353
    variables.pseudo_thread_id to 0. We need to correct it here to
 
354
    avoid temporary tables replication failure.
 
355
  */
 
356
  variables.pseudo_thread_id= thread_id;
 
357
  pthread_mutex_unlock(&LOCK_global_system_variables);
 
358
  server_status= SERVER_STATUS_AUTOCOMMIT;
 
359
  options= session_startup_options;
 
360
 
 
361
  if (variables.max_join_size == HA_POS_ERROR)
 
362
    options |= OPTION_BIG_SELECTS;
 
363
  else
 
364
    options &= ~OPTION_BIG_SELECTS;
 
365
 
 
366
  transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= false;
 
367
  open_options=ha_open_options;
 
368
  update_lock_default= TL_WRITE;
 
369
  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
 
370
  warn_list.empty();
 
371
  memset(warn_count, 0, sizeof(warn_count));
 
372
  total_warn_count= 0;
 
373
  update_charset();
 
374
  memset(&status_var, 0, sizeof(status_var));
 
375
}
 
376
 
 
377
 
 
378
/*
 
379
  Init Session for query processing.
 
380
  This has to be called once before we call mysql_parse.
 
381
  See also comments in session.h.
 
382
*/
 
383
 
 
384
void Session::init_for_queries()
 
385
{
 
386
  set_time();
 
387
  ha_enable_transaction(this,true);
 
388
 
 
389
  reset_root_defaults(mem_root, variables.query_alloc_block_size,
 
390
                      variables.query_prealloc_size);
 
391
  reset_root_defaults(&transaction.mem_root,
 
392
                      variables.trans_alloc_block_size,
 
393
                      variables.trans_prealloc_size);
 
394
  transaction.xid_state.xid.null();
 
395
  transaction.xid_state.in_session=1;
 
396
}
 
397
 
335
398
 
336
399
/* Do operations that may take a long time */
337
400
 
339
402
{
340
403
  assert(cleanup_done == false);
341
404
 
342
 
  setKilled(KILL_CONNECTION);
 
405
  killed= KILL_CONNECTION;
343
406
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
344
407
  if (transaction.xid_state.xa_state == XA_PREPARED)
345
408
  {
347
410
  }
348
411
#endif
349
412
  {
350
 
    TransactionServices &transaction_services= TransactionServices::singleton();
351
 
    transaction_services.rollbackTransaction(this, true);
 
413
    ha_rollback(this);
352
414
    xid_cache_delete(&transaction.xid_state);
353
415
  }
354
 
 
355
 
  for (UserVars::iterator iter= user_vars.begin();
356
 
       iter != user_vars.end();
357
 
       iter++)
 
416
  if (locked_tables)
358
417
  {
359
 
    user_var_entry *entry= (*iter).second;
360
 
    delete entry;
 
418
    lock=locked_tables; locked_tables=0;
 
419
    close_thread_tables(this);
361
420
  }
362
 
  user_vars.clear();
363
 
 
364
 
 
 
421
  hash_free(&user_vars);
365
422
  close_temporary_tables();
366
423
 
367
424
  if (global_read_lock)
368
 
  {
369
 
    unlockGlobalReadLock();
370
 
  }
 
425
    unlock_global_read_lock(this);
371
426
 
372
427
  cleanup_done= true;
 
428
  return;
373
429
}
374
430
 
375
431
Session::~Session()
376
432
{
377
 
  this->checkSentry();
 
433
  Session_CHECK_SENTRY(this);
 
434
  add_to_status(&global_status_var, &status_var);
378
435
 
379
 
  if (client->isConnected())
 
436
  if (drizzleclient_vio_ok())
380
437
  {
381
438
    if (global_system_variables.log_warnings)
382
 
        errmsg_printf(ERRMSG_LVL_WARN, ER(ER_FORCING_CLOSE),internal::my_progname,
 
439
        errmsg_printf(ERRMSG_LVL_WARN, ER(ER_FORCING_CLOSE),my_progname,
383
440
                      thread_id,
384
 
                      (getSecurityContext().getUser().c_str() ?
385
 
                       getSecurityContext().getUser().c_str() : ""));
 
441
                      (security_ctx.user.c_str() ?
 
442
                       security_ctx.user.c_str() : ""));
386
443
    disconnect(0, false);
387
444
  }
388
445
 
389
446
  /* Close connection */
390
 
  client->close();
391
 
  delete client;
392
 
 
 
447
  if (net.vio)
 
448
  {
 
449
    drizzleclient_net_close(&net);
 
450
    drizzleclient_net_end(&net);
 
451
  }
393
452
  if (cleanup_done == false)
394
453
    cleanup();
395
454
 
396
 
  plugin::StorageEngine::closeConnection(this);
 
455
  ha_close_connection(this);
397
456
  plugin_sessionvar_cleanup(this);
398
457
 
399
 
  warn_root.free_root(MYF(0));
 
458
  if (db)
 
459
  {
 
460
    free(db);
 
461
    db= NULL;
 
462
  }
 
463
  free_root(&warn_root,MYF(0));
 
464
  free_root(&transaction.mem_root,MYF(0));
400
465
  mysys_var=0;                                  // Safety (shouldn't be needed)
401
466
  dbug_sentry= Session_SENTRY_GONE;
402
467
 
403
 
  main_mem_root.free_root(MYF(0));
404
 
  currentMemRoot().release();
405
 
  currentSession().release();
406
 
 
407
 
  plugin::Logging::postEndDo(this);
408
 
  plugin::EventObserver::deregisterSessionEvents(*this); 
409
 
 
410
 
  for (PropertyMap::iterator iter= life_properties.begin(); iter != life_properties.end(); iter++)
411
 
  {
412
 
    delete (*iter).second;
413
 
  }
414
 
  life_properties.clear();
415
 
}
416
 
 
417
 
void Session::setClient(plugin::Client *client_arg)
418
 
{
419
 
  client= client_arg;
420
 
  client->setSession(this);
421
 
}
422
 
 
423
 
void Session::awake(Session::killed_state_t state_to_set)
424
 
{
425
 
  if ((state_to_set == Session::KILL_QUERY) and (command == COM_SLEEP))
426
 
    return;
427
 
 
428
 
  this->checkSentry();
429
 
 
430
 
  setKilled(state_to_set);
431
 
  scheduler->killSession(this);
432
 
 
 
468
  free_root(&main_mem_root, MYF(0));
 
469
  pthread_setspecific(THR_Session,  0);
 
470
 
 
471
 
 
472
  /* Ensure that no one is using Session */
 
473
  pthread_mutex_unlock(&LOCK_delete);
 
474
  pthread_mutex_destroy(&LOCK_delete);
 
475
}
 
476
 
 
477
 
 
478
/*
 
479
  Add all status variables to another status variable array
 
480
 
 
481
  SYNOPSIS
 
482
   add_to_status()
 
483
   to_var       add to this array
 
484
   from_var     from this array
 
485
 
 
486
  NOTES
 
487
    This function assumes that all variables are long/ulong.
 
488
    If this assumption will change, then we have to explictely add
 
489
    the other variables after the while loop
 
490
*/
 
491
 
 
492
void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var)
 
493
{
 
494
  ulong *end= (ulong*) ((unsigned char*) to_var +
 
495
                        offsetof(STATUS_VAR, last_system_status_var) +
 
496
                        sizeof(ulong));
 
497
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var;
 
498
 
 
499
  while (to != end)
 
500
    *(to++)+= *(from++);
 
501
}
 
502
 
 
503
/*
 
504
  Add the difference between two status variable arrays to another one.
 
505
 
 
506
  SYNOPSIS
 
507
    add_diff_to_status
 
508
    to_var       add to this array
 
509
    from_var     from this array
 
510
    dec_var      minus this array
 
511
 
 
512
  NOTE
 
513
    This function assumes that all variables are long/ulong.
 
514
*/
 
515
 
 
516
void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var,
 
517
                        STATUS_VAR *dec_var)
 
518
{
 
519
  ulong *end= (ulong*) ((unsigned char*) to_var + offsetof(STATUS_VAR,
 
520
                                                  last_system_status_var) +
 
521
                        sizeof(ulong));
 
522
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var, *dec= (ulong*) dec_var;
 
523
 
 
524
  while (to != end)
 
525
    *(to++)+= *(from++) - *(dec++);
 
526
}
 
527
 
 
528
 
 
529
void Session::awake(Session::killed_state state_to_set)
 
530
{
 
531
  Session_CHECK_SENTRY(this);
 
532
  safe_mutex_assert_owner(&LOCK_delete);
 
533
  Scheduler &thread_scheduler= get_thread_scheduler();
 
534
 
 
535
  killed= state_to_set;
433
536
  if (state_to_set != Session::KILL_QUERY)
434
537
  {
435
 
    DRIZZLE_CONNECTION_DONE(thread_id);
 
538
    thread_scheduler.post_kill_notification(this);
436
539
  }
437
 
 
438
540
  if (mysys_var)
439
541
  {
440
 
    boost_unique_lock_t scopedLock(mysys_var->mutex);
 
542
    pthread_mutex_lock(&mysys_var->mutex);
 
543
    if (!system_thread)         // Don't abort locks
 
544
      mysys_var->abort=1;
441
545
    /*
442
 
      "
443
546
      This broadcast could be up in the air if the victim thread
444
547
      exits the cond in the time between read and broadcast, but that is
445
548
      ok since all we want to do is to make the victim thread get out
460
563
    */
461
564
    if (mysys_var->current_cond && mysys_var->current_mutex)
462
565
    {
463
 
      mysys_var->current_mutex->lock();
464
 
      mysys_var->current_cond->notify_all();
465
 
      mysys_var->current_mutex->unlock();
 
566
      pthread_mutex_lock(mysys_var->current_mutex);
 
567
      pthread_cond_broadcast(mysys_var->current_cond);
 
568
      pthread_mutex_unlock(mysys_var->current_mutex);
466
569
    }
 
570
    pthread_mutex_unlock(&mysys_var->mutex);
467
571
  }
 
572
  return;
468
573
}
469
574
 
470
575
/*
471
576
  Remember the location of thread info, the structure needed for
472
 
  memory::sql_alloc() and the structure for the net buffer
 
577
  sql_alloc() and the structure for the net buffer
473
578
*/
474
 
bool Session::storeGlobals()
 
579
bool Session::store_globals()
475
580
{
476
581
  /*
477
582
    Assert that thread_stack is initialized: it's necessary to be able
479
584
  */
480
585
  assert(thread_stack);
481
586
 
482
 
  currentSession().release();
483
 
  currentSession().reset(this);
484
 
 
485
 
  currentMemRoot().release();
486
 
  currentMemRoot().reset(&mem_root);
487
 
 
 
587
  if (pthread_setspecific(THR_Session,  this) ||
 
588
      pthread_setspecific(THR_Mem_root, &mem_root))
 
589
    return 1;
488
590
  mysys_var=my_thread_var;
489
591
 
490
592
  /*
492
594
    This allows us to move Session to different threads if needed.
493
595
  */
494
596
  mysys_var->id= thread_id;
 
597
  real_id= pthread_self();                      // For debugging
495
598
 
496
599
  /*
497
600
    We have to call thr_lock_info_init() again here as Session may have been
498
601
    created in another thread
499
602
  */
500
 
  lock_info.init();
501
 
 
502
 
  return false;
 
603
  thr_lock_info_init(&lock_info);
 
604
  return 0;
503
605
}
504
606
 
505
 
/*
506
 
  Init Session for query processing.
507
 
  This has to be called once before we call mysql_parse.
508
 
  See also comments in session.h.
509
 
*/
510
 
 
511
607
void Session::prepareForQueries()
512
608
{
513
609
  if (variables.max_join_size == HA_POS_ERROR)
514
610
    options |= OPTION_BIG_SELECTS;
 
611
  if (client_capabilities & CLIENT_COMPRESS)
 
612
    net.compress= true;
515
613
 
516
614
  version= refresh_version;
517
615
  set_proc_info(NULL);
518
616
  command= COM_SLEEP;
519
617
  set_time();
 
618
  init_for_queries();
520
619
 
521
 
  mem_root->reset_root_defaults(variables.query_alloc_block_size,
522
 
                                variables.query_prealloc_size);
523
 
  transaction.xid_state.xid.null();
524
 
  transaction.xid_state.in_session=1;
525
 
  if (use_usage)
526
 
    resetUsage();
 
620
  /* In the past this would only run of the user did not have SUPER_ACL */
 
621
  if (sys_init_connect.value_length)
 
622
  {
 
623
    execute_init_command(this, &sys_init_connect, &LOCK_sys_init_connect);
 
624
    if (is_error())
 
625
    {
 
626
      Security_context *sctx= &security_ctx;
 
627
      killed= Session::KILL_CONNECTION;
 
628
      errmsg_printf(ERRMSG_LVL_WARN
 
629
                  , ER(ER_NEW_ABORTING_CONNECTION)
 
630
                  , thread_id
 
631
                  , (db ? db : "unconnected")
 
632
                  , sctx->user.empty() == false ? sctx->user.c_str() : "unauthenticated"
 
633
                  , sctx->ip.c_str(), "init_connect command failed");
 
634
      errmsg_printf(ERRMSG_LVL_WARN, "%s", main_da.message());
 
635
    }
 
636
    set_proc_info(NULL);
 
637
    set_time();
 
638
    init_for_queries();
 
639
  }
527
640
}
528
641
 
529
642
bool Session::initGlobals()
530
643
{
531
 
  if (storeGlobals())
 
644
  if (store_globals())
532
645
  {
533
646
    disconnect(ER_OUT_OF_RESOURCES, true);
534
 
    status_var.aborted_connects++;
535
 
    return true;
536
 
  }
537
 
  return false;
538
 
}
539
 
 
540
 
void Session::run()
541
 
{
542
 
  if (initGlobals() || authenticate())
543
 
  {
544
 
    disconnect(0, true);
545
 
    return;
546
 
  }
547
 
 
548
 
  prepareForQueries();
549
 
 
550
 
  while (not client->haveError() && getKilled() != KILL_CONNECTION)
551
 
  {
552
 
    if (not executeStatement())
553
 
      break;
554
 
  }
555
 
 
556
 
  disconnect(0, true);
557
 
}
558
 
 
559
 
bool Session::schedule(Session::shared_ptr &arg)
560
 
{
561
 
  arg->scheduler= plugin::Scheduler::getScheduler();
562
 
  assert(arg->scheduler);
563
 
 
564
 
  connection_count.increment();
565
 
 
566
 
  if (connection_count > current_global_counters.max_used_connections)
567
 
  {
568
 
    current_global_counters.max_used_connections= connection_count;
569
 
  }
570
 
 
571
 
  current_global_counters.connections++;
572
 
  arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
573
 
 
574
 
  session::Cache::singleton().insert(arg);
575
 
 
576
 
  if (unlikely(plugin::EventObserver::connectSession(*arg)))
577
 
  {
578
 
    // We should do something about an error...
579
 
  }
580
 
 
581
 
  if (plugin::Scheduler::getScheduler()->addSession(arg))
582
 
  {
583
 
    DRIZZLE_CONNECTION_START(arg->getSessionId());
584
 
    char error_message_buff[DRIZZLE_ERRMSG_SIZE];
585
 
 
586
 
    arg->setKilled(Session::KILL_CONNECTION);
587
 
 
588
 
    arg->status_var.aborted_connects++;
589
 
 
590
 
    /* Can't use my_error() since store_globals has not been called. */
591
 
    /* TODO replace will better error message */
592
 
    snprintf(error_message_buff, sizeof(error_message_buff),
593
 
             ER(ER_CANT_CREATE_THREAD), 1);
594
 
    arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
595
 
 
596
 
    return true;
597
 
  }
598
 
 
599
 
  return false;
600
 
}
601
 
 
602
 
 
603
 
/*
604
 
  Is this session viewable by the current user?
605
 
*/
606
 
bool Session::isViewable() const
607
 
{
608
 
  return plugin::Authorization::isAuthorized(current_session->getSecurityContext(),
609
 
                                             this,
610
 
                                             false);
611
 
}
612
 
 
613
 
 
614
 
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
615
 
{
616
 
  const char* old_msg = get_proc_info();
617
 
  safe_mutex_assert_owner(mutex);
618
 
  mysys_var->current_mutex = &mutex;
619
 
  mysys_var->current_cond = &cond;
620
 
  this->set_proc_info(msg);
621
 
  return old_msg;
622
 
}
623
 
 
624
 
void Session::exit_cond(const char* old_msg)
625
 
{
626
 
  /*
627
 
    Putting the mutex unlock in exit_cond() ensures that
628
 
    mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
629
 
    locked (if that would not be the case, you'll get a deadlock if someone
630
 
    does a Session::awake() on you).
631
 
  */
632
 
  mysys_var->current_mutex->unlock();
633
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
634
 
  mysys_var->current_mutex = 0;
635
 
  mysys_var->current_cond = 0;
636
 
  this->set_proc_info(old_msg);
 
647
    statistic_increment(aborted_connects, &LOCK_status);
 
648
    Scheduler &thread_scheduler= get_thread_scheduler();
 
649
    thread_scheduler.end_thread(this, 0);
 
650
    return false;
 
651
  }
 
652
  return true;
637
653
}
638
654
 
639
655
bool Session::authenticate()
640
656
{
641
 
  lex->start(this);
642
 
  if (client->authenticate())
 
657
  /* Use "connect_timeout" value during connection phase */
 
658
  drizzleclient_net_set_read_timeout(&net, connect_timeout);
 
659
  drizzleclient_net_set_write_timeout(&net, connect_timeout);
 
660
 
 
661
  lex_start(this);
 
662
 
 
663
  bool connection_is_valid= check_connection();
 
664
  drizzleclient_net_end_statement(this);
 
665
 
 
666
  if (! connection_is_valid)
 
667
  {     
 
668
    /* We got wrong permissions from check_connection() */
 
669
    statistic_increment(aborted_connects, &LOCK_status);
643
670
    return false;
644
 
 
645
 
  status_var.aborted_connects++;
646
 
 
 
671
  }
 
672
 
 
673
  /* Connect completed, set read/write timeouts back to default */
 
674
  drizzleclient_net_set_read_timeout(&net, variables.net_read_timeout);
 
675
  drizzleclient_net_set_write_timeout(&net, variables.net_write_timeout);
647
676
  return true;
648
677
}
649
678
 
650
 
bool Session::checkUser(const std::string &passwd_str,
651
 
                        const std::string &in_db)
652
 
{
653
 
  bool is_authenticated=
654
 
    plugin::Authentication::isAuthenticated(getSecurityContext(),
655
 
                                            passwd_str);
 
679
bool Session::check_connection()
 
680
{
 
681
  uint32_t pkt_len= 0;
 
682
  char *end;
 
683
 
 
684
  // TCP/IP connection
 
685
  {
 
686
    char ip[NI_MAXHOST];
 
687
 
 
688
    if (drizzleclient_net_peer_addr(&net, ip, &peer_port, NI_MAXHOST))
 
689
    {
 
690
      my_error(ER_BAD_HOST_ERROR, MYF(0), security_ctx.ip.c_str());
 
691
      return false;
 
692
    }
 
693
 
 
694
    security_ctx.ip.assign(ip);
 
695
  }
 
696
  drizzleclient_net_keepalive(&net, true);
 
697
 
 
698
  uint32_t server_capabilites;
 
699
  {
 
700
    /* buff[] needs to big enough to hold the server_version variable */
 
701
    char buff[SERVER_VERSION_LENGTH + SCRAMBLE_LENGTH + 64];
 
702
 
 
703
    server_capabilites= CLIENT_BASIC_FLAGS;
 
704
 
 
705
#ifdef HAVE_COMPRESS
 
706
    server_capabilites|= CLIENT_COMPRESS;
 
707
#endif /* HAVE_COMPRESS */
 
708
 
 
709
    end= buff + strlen(server_version);
 
710
    if ((end - buff) >= SERVER_VERSION_LENGTH)
 
711
      end= buff + (SERVER_VERSION_LENGTH - 1);
 
712
    memcpy(buff, server_version, end - buff);
 
713
    *end= 0;
 
714
    end++;
 
715
 
 
716
    int4store((unsigned char*) end, thread_id);
 
717
    end+= 4;
 
718
    /*
 
719
      So as check_connection is the only entry point to authorization
 
720
      procedure, scramble is set here. This gives us new scramble for
 
721
      each handshake.
 
722
    */
 
723
    drizzleclient_create_random_string(scramble, SCRAMBLE_LENGTH, &rand);
 
724
    /*
 
725
      Old clients does not understand long scrambles, but can ignore packet
 
726
      tail: that's why first part of the scramble is placed here, and second
 
727
      part at the end of packet.
 
728
    */
 
729
    end= strncpy(end, scramble, SCRAMBLE_LENGTH_323);
 
730
    end+= SCRAMBLE_LENGTH_323;
 
731
 
 
732
    *end++= 0; /* an empty byte for some reason */
 
733
 
 
734
    int2store(end, server_capabilites);
 
735
    /* write server characteristics: up to 16 bytes allowed */
 
736
    end[2]=(char) default_charset_info->number;
 
737
    int2store(end+3, server_status);
 
738
    memset(end+5, 0, 13);
 
739
    end+= 18;
 
740
    /* write scramble tail */
 
741
    size_t scramble_len= SCRAMBLE_LENGTH - SCRAMBLE_LENGTH_323;
 
742
    end= strncpy(end, scramble + SCRAMBLE_LENGTH_323, scramble_len);
 
743
    end+= scramble_len;
 
744
 
 
745
    *end++= 0; /* an empty byte for some reason */
 
746
 
 
747
    /* At this point we write connection message and read reply */
 
748
    if (drizzleclient_net_write_command(&net
 
749
          , (unsigned char) protocol_version
 
750
          , (unsigned char*) ""
 
751
          , 0
 
752
          , (unsigned char*) buff
 
753
          , (size_t) (end-buff)) 
 
754
        ||      (pkt_len= drizzleclient_net_read(&net)) == packet_error 
 
755
        || pkt_len < MIN_HANDSHAKE_SIZE)
 
756
    {
 
757
      my_error(ER_HANDSHAKE_ERROR, MYF(0), security_ctx.ip.c_str());
 
758
      return false;
 
759
    }
 
760
  }
 
761
  if (packet.alloc(variables.net_buffer_length))
 
762
    return false; /* The error is set by alloc(). */
 
763
 
 
764
  client_capabilities= uint2korr(net.read_pos);
 
765
 
 
766
 
 
767
  client_capabilities|= ((uint32_t) uint2korr(net.read_pos + 2)) << 16;
 
768
  max_client_packet_length= uint4korr(net.read_pos + 4);
 
769
  update_charset();
 
770
  end= (char*) net.read_pos + 32;
 
771
 
 
772
  /*
 
773
    Disable those bits which are not supported by the server.
 
774
    This is a precautionary measure, if the client lies. See Bug#27944.
 
775
  */
 
776
  client_capabilities&= server_capabilites;
 
777
 
 
778
  if (end >= (char*) net.read_pos + pkt_len + 2)
 
779
  {
 
780
    my_error(ER_HANDSHAKE_ERROR, MYF(0), security_ctx.ip.c_str());
 
781
    return false;
 
782
  }
 
783
 
 
784
  net.return_status= &server_status;
 
785
 
 
786
  char *user= end;
 
787
  char *passwd= strchr(user, '\0')+1;
 
788
  uint32_t user_len= passwd - user - 1;
 
789
  char *l_db= passwd;
 
790
  char db_buff[NAME_LEN + 1];           // buffer to store db in utf8
 
791
  char user_buff[USERNAME_LENGTH + 1];  // buffer to store user in utf8
 
792
  uint32_t dummy_errors;
 
793
 
 
794
  /*
 
795
    Old clients send null-terminated string as password; new clients send
 
796
    the size (1 byte) + string (not null-terminated). Hence in case of empty
 
797
    password both send '\0'.
 
798
 
 
799
    This strlen() can't be easily deleted without changing protocol.
 
800
 
 
801
    Cast *passwd to an unsigned char, so that it doesn't extend the sign for
 
802
    *passwd > 127 and become 2**32-127+ after casting to uint.
 
803
  */
 
804
  uint32_t passwd_len= client_capabilities & CLIENT_SECURE_CONNECTION ?
 
805
    (unsigned char)(*passwd++) : strlen(passwd);
 
806
  l_db= client_capabilities & CLIENT_CONNECT_WITH_DB ? l_db + passwd_len + 1 : 0;
 
807
 
 
808
  /* strlen() can't be easily deleted without changing protocol */
 
809
  uint32_t db_len= l_db ? strlen(l_db) : 0;
 
810
 
 
811
  if (passwd + passwd_len + db_len > (char *) net.read_pos + pkt_len)
 
812
  {
 
813
    my_error(ER_HANDSHAKE_ERROR, MYF(0), security_ctx.ip.c_str());
 
814
    return false;
 
815
  }
 
816
 
 
817
  /* Since 4.1 all database names are stored in utf8 */
 
818
  if (l_db)
 
819
  {
 
820
    db_buff[copy_and_convert(db_buff, sizeof(db_buff)-1,
 
821
                             system_charset_info,
 
822
                             l_db, db_len,
 
823
                             charset(), &dummy_errors)]= 0;
 
824
    l_db= db_buff;
 
825
  }
 
826
 
 
827
  user_buff[user_len= copy_and_convert(user_buff, sizeof(user_buff)-1,
 
828
                                       system_charset_info, user, user_len,
 
829
                                       charset(), &dummy_errors)]= '\0';
 
830
  user= user_buff;
 
831
 
 
832
  /* If username starts and ends in "'", chop them off */
 
833
  if (user_len > 1 && user[0] == '\'' && user[user_len - 1] == '\'')
 
834
  {
 
835
    user[user_len-1]= 0;
 
836
    user++;
 
837
    user_len-= 2;
 
838
  }
 
839
 
 
840
  security_ctx.user.assign(user);
 
841
 
 
842
  return check_user(passwd, passwd_len, l_db);
 
843
}
 
844
 
 
845
bool Session::check_user(const char *passwd, uint32_t passwd_len, const char *in_db)
 
846
{
 
847
  LEX_STRING db_str= { (char *) in_db, in_db ? strlen(in_db) : 0 };
 
848
  bool is_authenticated;
 
849
 
 
850
  /*
 
851
    Clear session->db as it points to something, that will be freed when
 
852
    connection is closed. We don't want to accidentally free a wrong
 
853
    pointer if connect failed. Also in case of 'CHANGE USER' failure,
 
854
    current database will be switched to 'no database selected'.
 
855
  */
 
856
  reset_db(NULL, 0);
 
857
 
 
858
  if (passwd_len != 0 && passwd_len != SCRAMBLE_LENGTH)
 
859
  {
 
860
    my_error(ER_HANDSHAKE_ERROR, MYF(0), security_ctx.ip.c_str());
 
861
    return false;
 
862
  }
 
863
 
 
864
  is_authenticated= authenticate_user(this, passwd);
656
865
 
657
866
  if (is_authenticated != true)
658
867
  {
659
 
    status_var.access_denied++;
660
 
    /* isAuthenticated has pushed the error message */
 
868
    my_error(ER_ACCESS_DENIED_ERROR, MYF(0),
 
869
             security_ctx.user.c_str(),
 
870
             security_ctx.ip.c_str(),
 
871
             passwd_len ? ER(ER_YES) : ER(ER_NO));
 
872
 
661
873
    return false;
662
874
  }
663
875
 
 
876
  security_ctx.skip_grants();
 
877
 
664
878
  /* Change database if necessary */
665
 
  if (not in_db.empty())
 
879
  if (in_db && in_db[0])
666
880
  {
667
 
    SchemaIdentifier identifier(in_db);
668
 
    if (mysql_change_db(this, identifier))
 
881
    if (mysql_change_db(this, &db_str, false))
669
882
    {
670
883
      /* mysql_change_db() has pushed the error message. */
671
884
      return false;
672
885
    }
673
886
  }
674
887
  my_ok();
675
 
  password= not passwd_str.empty();
 
888
  password= test(passwd_len);          // remember for error messages
676
889
 
677
890
  /* Ready to handle queries */
678
891
  return true;
680
893
 
681
894
bool Session::executeStatement()
682
895
{
 
896
  bool return_value;
683
897
  char *l_packet= 0;
684
898
  uint32_t packet_length;
685
899
 
690
904
    (see my_message_sql)
691
905
  */
692
906
  lex->current_select= 0;
693
 
  clear_error();
 
907
 
 
908
  /*
 
909
    This thread will do a blocking read from the client which
 
910
    will be interrupted when the next command is received from
 
911
    the client, the connection is closed or "net_wait_timeout"
 
912
    number of seconds has passed
 
913
  */
 
914
  drizzleclient_net_set_read_timeout(&net, variables.net_wait_timeout);
 
915
 
 
916
  /*
 
917
    XXX: this code is here only to clear possible errors of init_connect.
 
918
    Consider moving to init_connect() instead.
 
919
  */
 
920
  clear_error();                                // Clear error message
694
921
  main_da.reset_diagnostics_area();
695
922
 
696
 
  if (client->readCommand(&l_packet, &packet_length) == false)
697
 
  {
698
 
    return false;
699
 
  }
700
 
 
701
 
  if (getKilled() == KILL_CONNECTION)
702
 
    return false;
703
 
 
704
 
  if (packet_length == 0)
705
 
    return true;
706
 
 
707
 
  l_command= static_cast<enum_server_command>(l_packet[0]);
 
923
  net_new_transaction(&net);
 
924
 
 
925
  packet_length= drizzleclient_net_read(&net);
 
926
  if (packet_length == packet_error)
 
927
  {
 
928
    /* Check if we can continue without closing the connection */
 
929
 
 
930
    if(net.last_errno== CR_NET_PACKET_TOO_LARGE)
 
931
      my_error(ER_NET_PACKET_TOO_LARGE, MYF(0));
 
932
    /* Assert is invalid for dirty connection shutdown
 
933
     *     assert(session->is_error());
 
934
     */
 
935
    drizzleclient_net_end_statement(this);
 
936
 
 
937
    if (net.error != 3)
 
938
    {
 
939
      return_value= false;                       // We have to close it.
 
940
      goto out;
 
941
    }
 
942
 
 
943
    net.error= 0;
 
944
    return_value= true;
 
945
    goto out;
 
946
  }
 
947
 
 
948
  l_packet= (char*) net.read_pos;
 
949
  /*
 
950
    'packet_length' contains length of data, as it was stored in packet
 
951
    header. In case of malformed header, drizzleclient_net_read returns zero.
 
952
    If packet_length is not zero, drizzleclient_net_read ensures that the returned
 
953
    number of bytes was actually read from network.
 
954
    There is also an extra safety measure in drizzleclient_net_read:
 
955
    it sets packet[packet_length]= 0, but only for non-zero packets.
 
956
  */
 
957
  if (packet_length == 0)                       /* safety */
 
958
  {
 
959
    /* Initialize with COM_SLEEP packet */
 
960
    l_packet[0]= (unsigned char) COM_SLEEP;
 
961
    packet_length= 1;
 
962
  }
 
963
  /* Do not rely on drizzleclient_net_read, extra safety against programming errors. */
 
964
  l_packet[packet_length]= '\0';                  /* safety */
 
965
 
 
966
  l_command= (enum enum_server_command) (unsigned char) l_packet[0];
708
967
 
709
968
  if (command >= COM_END)
710
969
    command= COM_END;                           // Wrong command
711
970
 
 
971
  /* Restore read timeout value */
 
972
  drizzleclient_net_set_read_timeout(&net, variables.net_read_timeout);
 
973
 
712
974
  assert(packet_length);
713
 
  return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
 
975
  return_value= ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
 
976
 
 
977
out:
 
978
  return return_value;
714
979
}
715
980
 
716
981
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
722
987
    in_packet_length--;
723
988
  }
724
989
  const char *pos= in_packet + in_packet_length; /* Point at end null */
725
 
  while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
 
990
  while (in_packet_length > 0 &&
 
991
         (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
726
992
  {
727
993
    pos--;
728
994
    in_packet_length--;
729
995
  }
730
996
 
731
 
  std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
732
 
  // We can not be entirely sure _schema has a value
733
 
  if (_schema)
734
 
  {
735
 
    plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
736
 
  }
737
 
  query.reset(new_query);
738
 
  _state.reset(new State(in_packet, in_packet_length));
 
997
  /* We must allocate some extra memory for the cached query string */
 
998
  query_length= 0; /* Extra safety: Avoid races */
 
999
  query= (char*) memdup_w_gap((unsigned char*) in_packet, in_packet_length, db_length + 1);
 
1000
  if (! query)
 
1001
    return false;
 
1002
 
 
1003
  query[in_packet_length]=0;
 
1004
  query_length= in_packet_length;
 
1005
 
 
1006
  /* Reclaim some memory */
 
1007
  packet.shrink(variables.net_buffer_length);
 
1008
  convert_buffer.shrink(variables.net_buffer_length);
739
1009
 
740
1010
  return true;
741
1011
}
744
1014
{
745
1015
  bool do_release= 0;
746
1016
  bool result= true;
747
 
  TransactionServices &transaction_services= TransactionServices::singleton();
748
1017
 
749
1018
  if (transaction.xid_state.xa_state != XA_NOTR)
750
1019
  {
751
1020
    my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
752
1021
    return false;
753
1022
  }
754
 
  switch (completion)
 
1023
  switch (completion) 
755
1024
  {
756
1025
    case COMMIT:
757
1026
      /*
760
1029
       * (Which of course should never happen...)
761
1030
       */
762
1031
      server_status&= ~SERVER_STATUS_IN_TRANS;
763
 
      if (transaction_services.commitTransaction(this, true))
 
1032
      if (ha_commit(this))
764
1033
        result= false;
765
 
      options&= ~(OPTION_BEGIN);
 
1034
      options&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
 
1035
      transaction.all.modified_non_trans_table= false;
766
1036
      break;
767
1037
    case COMMIT_RELEASE:
768
1038
      do_release= 1; /* fall through */
777
1047
    case ROLLBACK_AND_CHAIN:
778
1048
    {
779
1049
      server_status&= ~SERVER_STATUS_IN_TRANS;
780
 
      if (transaction_services.rollbackTransaction(this, true))
 
1050
      if (ha_rollback(this))
781
1051
        result= false;
782
 
      options&= ~(OPTION_BEGIN);
 
1052
      options&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
 
1053
      transaction.all.modified_non_trans_table= false;
783
1054
      if (result == true && (completion == ROLLBACK_AND_CHAIN))
784
1055
        result= startTransaction();
785
1056
      break;
790
1061
  }
791
1062
 
792
1063
  if (result == false)
793
 
  {
794
1064
    my_error(killed_errno(), MYF(0));
795
 
  }
796
1065
  else if ((result == true) && do_release)
797
 
  {
798
 
    setKilled(Session::KILL_CONNECTION);
799
 
  }
 
1066
    killed= Session::KILL_CONNECTION;
800
1067
 
801
1068
  return result;
802
1069
}
804
1071
bool Session::endActiveTransaction()
805
1072
{
806
1073
  bool result= true;
807
 
  TransactionServices &transaction_services= TransactionServices::singleton();
808
1074
 
809
1075
  if (transaction.xid_state.xa_state != XA_NOTR)
810
1076
  {
811
1077
    my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
812
1078
    return false;
813
1079
  }
814
 
  if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
 
1080
  if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN | OPTION_TABLE_LOCK))
815
1081
  {
 
1082
    /* Safety if one did "drop table" on locked tables */
 
1083
    if (! locked_tables)
 
1084
      options&= ~OPTION_TABLE_LOCK;
816
1085
    server_status&= ~SERVER_STATUS_IN_TRANS;
817
 
    if (transaction_services.commitTransaction(this, true))
 
1086
    if (ha_commit(this))
818
1087
      result= false;
819
1088
  }
820
 
  options&= ~(OPTION_BEGIN);
 
1089
  options&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
 
1090
  transaction.all.modified_non_trans_table= false;
821
1091
  return result;
822
1092
}
823
1093
 
824
 
bool Session::startTransaction(start_transaction_option_t opt)
 
1094
bool Session::startTransaction()
825
1095
{
826
1096
  bool result= true;
827
1097
 
 
1098
  if (locked_tables)
 
1099
  {
 
1100
    lock= locked_tables;
 
1101
    locked_tables= 0;                   // Will be automatically closed
 
1102
    close_thread_tables(this);                  // Free tables
 
1103
  }
828
1104
  if (! endActiveTransaction())
829
 
  {
830
1105
    result= false;
831
 
  }
832
1106
  else
833
1107
  {
834
1108
    options|= OPTION_BEGIN;
835
1109
    server_status|= SERVER_STATUS_IN_TRANS;
836
 
 
837
 
    if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
838
 
    {
839
 
      result= false;
840
 
    }
 
1110
    if (lex->start_transaction_opt & DRIZZLE_START_TRANS_OPT_WITH_CONS_SNAPSHOT)
 
1111
      if (ha_start_consistent_snapshot(this))
 
1112
        result= false;
841
1113
  }
842
 
 
843
1114
  return result;
844
1115
}
845
1116
 
 
1117
/*
 
1118
  Cleanup after query.
 
1119
 
 
1120
  SYNOPSIS
 
1121
    Session::cleanup_after_query()
 
1122
 
 
1123
  DESCRIPTION
 
1124
    This function is used to reset thread data to its default state.
 
1125
 
 
1126
  NOTE
 
1127
    This function is not suitable for setting thread data to some
 
1128
    non-default values, as there is only one replication thread, so
 
1129
    different master threads may overwrite data of each other on
 
1130
    slave.
 
1131
*/
846
1132
void Session::cleanup_after_query()
847
1133
{
848
1134
  /*
856
1142
  if (first_successful_insert_id_in_cur_stmt > 0)
857
1143
  {
858
1144
    /* set what LAST_INSERT_ID() will return */
859
 
    first_successful_insert_id_in_prev_stmt= first_successful_insert_id_in_cur_stmt;
 
1145
    first_successful_insert_id_in_prev_stmt=
 
1146
      first_successful_insert_id_in_cur_stmt;
860
1147
    first_successful_insert_id_in_cur_stmt= 0;
861
1148
    substitute_null_with_insert_id= true;
862
1149
  }
863
 
  arg_of_last_insert_id_function= false;
 
1150
  arg_of_last_insert_id_function= 0;
864
1151
  /* Free Items that were created during this execution */
865
1152
  free_items();
866
1153
  /* Reset where. */
867
1154
  where= Session::DEFAULT_WHERE;
868
 
 
869
 
  /* Reset the temporary shares we built */
870
 
  for_each(temporary_shares.begin(),
871
 
           temporary_shares.end(),
872
 
           DeletePtr());
873
 
  temporary_shares.clear();
874
1155
}
875
1156
 
 
1157
 
876
1158
/**
877
1159
  Create a LEX_STRING in this connection.
878
1160
 
884
1166
  @return  NULL on failure, or pointer to the LEX_STRING object
885
1167
*/
886
1168
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
887
 
                                     const std::string &str,
888
 
                                     bool allocate_lex_string)
889
 
{
890
 
  return make_lex_string(lex_str, str.c_str(), str.length(), allocate_lex_string);
891
 
}
892
 
 
893
 
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
894
 
                                     const char* str, uint32_t length,
895
 
                                     bool allocate_lex_string)
 
1169
                                 const char* str, uint32_t length,
 
1170
                                 bool allocate_lex_string)
896
1171
{
897
1172
  if (allocate_lex_string)
898
1173
    if (!(lex_str= (LEX_STRING *)alloc(sizeof(LEX_STRING))))
899
1174
      return 0;
900
 
  if (!(lex_str->str= mem_root->strmake_root(str, length)))
 
1175
  if (!(lex_str->str= strmake_root(mem_root, str, length)))
901
1176
    return 0;
902
1177
  lex_str->length= length;
903
1178
  return lex_str;
904
1179
}
905
1180
 
 
1181
 
 
1182
/*
 
1183
  Convert a string to another character set
 
1184
 
 
1185
  SYNOPSIS
 
1186
    convert_string()
 
1187
    to                          Store new allocated string here
 
1188
    to_cs                       New character set for allocated string
 
1189
    from                        String to convert
 
1190
    from_length                 Length of string to convert
 
1191
    from_cs                     Original character set
 
1192
 
 
1193
  NOTES
 
1194
    to will be 0-terminated to make it easy to pass to system funcs
 
1195
 
 
1196
  RETURN
 
1197
    0   ok
 
1198
    1   End of memory.
 
1199
        In this case to->str will point to 0 and to->length will be 0.
 
1200
*/
 
1201
 
 
1202
bool Session::convert_string(LEX_STRING *to, const CHARSET_INFO * const to_cs,
 
1203
                         const char *from, uint32_t from_length,
 
1204
                         const CHARSET_INFO * const from_cs)
 
1205
{
 
1206
  size_t new_length= to_cs->mbmaxlen * from_length;
 
1207
  uint32_t dummy_errors;
 
1208
  if (!(to->str= (char*) alloc(new_length+1)))
 
1209
  {
 
1210
    to->length= 0;                              // Safety fix
 
1211
    return(1);                          // EOM
 
1212
  }
 
1213
  to->length= copy_and_convert((char*) to->str, new_length, to_cs,
 
1214
                               from, from_length, from_cs, &dummy_errors);
 
1215
  to->str[to->length]=0;                        // Safety
 
1216
  return(0);
 
1217
}
 
1218
 
 
1219
 
 
1220
/*
 
1221
  Convert string from source character set to target character set inplace.
 
1222
 
 
1223
  SYNOPSIS
 
1224
    Session::convert_string
 
1225
 
 
1226
  DESCRIPTION
 
1227
    Convert string using convert_buffer - buffer for character set
 
1228
    conversion shared between all protocols.
 
1229
 
 
1230
  RETURN
 
1231
    0   ok
 
1232
   !0   out of memory
 
1233
*/
 
1234
 
 
1235
bool Session::convert_string(String *s, const CHARSET_INFO * const from_cs,
 
1236
                         const CHARSET_INFO * const to_cs)
 
1237
{
 
1238
  uint32_t dummy_errors;
 
1239
  if (convert_buffer.copy(s->ptr(), s->length(), from_cs, to_cs, &dummy_errors))
 
1240
    return true;
 
1241
  /* If convert_buffer >> s copying is more efficient long term */
 
1242
  if (convert_buffer.alloced_length() >= convert_buffer.length() * 2 ||
 
1243
      !s->is_alloced())
 
1244
  {
 
1245
    return s->copy(convert_buffer);
 
1246
  }
 
1247
  s->swap(convert_buffer);
 
1248
  return false;
 
1249
}
 
1250
 
 
1251
 
 
1252
/*
 
1253
  Update some cache variables when character set changes
 
1254
*/
 
1255
 
 
1256
void Session::update_charset()
 
1257
{
 
1258
  uint32_t not_used;
 
1259
  charset_is_system_charset= !String::needs_conversion(0,charset(),
 
1260
                                                       system_charset_info,
 
1261
                                                       &not_used);
 
1262
  charset_is_collation_connection=
 
1263
    !String::needs_conversion(0,charset(),variables.getCollation(),
 
1264
                              &not_used);
 
1265
  charset_is_character_set_filesystem=
 
1266
    !String::needs_conversion(0, charset(),
 
1267
                              variables.character_set_filesystem, &not_used);
 
1268
}
 
1269
 
 
1270
 
 
1271
/* routings to adding tables to list of changed in transaction tables */
 
1272
 
 
1273
inline static void list_include(CHANGED_TableList** prev,
 
1274
                                CHANGED_TableList* curr,
 
1275
                                CHANGED_TableList* new_table)
 
1276
{
 
1277
  if (new_table)
 
1278
  {
 
1279
    *prev = new_table;
 
1280
    (*prev)->next = curr;
 
1281
  }
 
1282
}
 
1283
 
 
1284
/* add table to list of changed in transaction tables */
 
1285
 
 
1286
void Session::add_changed_table(Table *table)
 
1287
{
 
1288
  assert((options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
 
1289
              table->file->has_transactions());
 
1290
  add_changed_table(table->s->table_cache_key.str,
 
1291
                    (long) table->s->table_cache_key.length);
 
1292
  return;
 
1293
}
 
1294
 
 
1295
 
 
1296
void Session::add_changed_table(const char *key, long key_length)
 
1297
{
 
1298
  CHANGED_TableList **prev_changed = &transaction.changed_tables;
 
1299
  CHANGED_TableList *curr = transaction.changed_tables;
 
1300
 
 
1301
  for (; curr; prev_changed = &(curr->next), curr = curr->next)
 
1302
  {
 
1303
    int cmp =  (long)curr->key_length - (long)key_length;
 
1304
    if (cmp < 0)
 
1305
    {
 
1306
      list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
1307
      return;
 
1308
    }
 
1309
    else if (cmp == 0)
 
1310
    {
 
1311
      cmp = memcmp(curr->key, key, curr->key_length);
 
1312
      if (cmp < 0)
 
1313
      {
 
1314
        list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
1315
        return;
 
1316
      }
 
1317
      else if (cmp == 0)
 
1318
      {
 
1319
        return;
 
1320
      }
 
1321
    }
 
1322
  }
 
1323
  *prev_changed = changed_table_dup(key, key_length);
 
1324
  return;
 
1325
}
 
1326
 
 
1327
 
 
1328
CHANGED_TableList* Session::changed_table_dup(const char *key, long key_length)
 
1329
{
 
1330
  CHANGED_TableList* new_table =
 
1331
    (CHANGED_TableList*) trans_alloc(ALIGN_SIZE(sizeof(CHANGED_TableList))+
 
1332
                                      key_length + 1);
 
1333
  if (!new_table)
 
1334
  {
 
1335
    my_error(EE_OUTOFMEMORY, MYF(ME_BELL),
 
1336
             ALIGN_SIZE(sizeof(TableList)) + key_length + 1);
 
1337
    killed= KILL_CONNECTION;
 
1338
    return 0;
 
1339
  }
 
1340
 
 
1341
  new_table->key= ((char*)new_table)+ ALIGN_SIZE(sizeof(CHANGED_TableList));
 
1342
  new_table->next = 0;
 
1343
  new_table->key_length = key_length;
 
1344
  ::memcpy(new_table->key, key, key_length);
 
1345
  return new_table;
 
1346
}
 
1347
 
 
1348
 
906
1349
int Session::send_explain_fields(select_result *result)
907
1350
{
908
1351
  List<Item> field_list;
938
1381
  }
939
1382
  item->maybe_null= 1;
940
1383
  field_list.push_back(new Item_empty_string("Extra", 255, cs));
941
 
  return (result->send_fields(field_list));
942
 
}
943
 
 
944
 
void select_result::send_error(uint32_t errcode, const char *err)
945
 
{
946
 
  my_message(errcode, err, MYF(0));
 
1384
  return (result->send_fields(field_list,
 
1385
                              Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF));
947
1386
}
948
1387
 
949
1388
/************************************************************************
955
1394
  my_message(errcode, err, MYF(0));
956
1395
  if (file > 0)
957
1396
  {
958
 
    (void) cache->end_io_cache();
959
 
    (void) internal::my_close(file, MYF(0));
960
 
    (void) internal::my_delete(path.file_string().c_str(), MYF(0));             // Delete file on error
 
1397
    (void) end_io_cache(&cache);
 
1398
    (void) my_close(file,MYF(0));
 
1399
    (void) my_delete(path,MYF(0));              // Delete file on error
961
1400
    file= -1;
962
1401
  }
963
1402
}
965
1404
 
966
1405
bool select_to_file::send_eof()
967
1406
{
968
 
  int error= test(cache->end_io_cache());
969
 
  if (internal::my_close(file, MYF(MY_WME)))
 
1407
  int error= test(end_io_cache(&cache));
 
1408
  if (my_close(file,MYF(MY_WME)))
970
1409
    error= 1;
971
1410
  if (!error)
972
1411
  {
987
1426
  /* In case of error send_eof() may be not called: close the file here. */
988
1427
  if (file >= 0)
989
1428
  {
990
 
    (void) cache->end_io_cache();
991
 
    (void) internal::my_close(file, MYF(0));
 
1429
    (void) end_io_cache(&cache);
 
1430
    (void) my_close(file,MYF(0));
992
1431
    file= -1;
993
1432
  }
994
 
  path= "";
 
1433
  path[0]= '\0';
995
1434
  row_count= 0;
996
1435
}
997
1436
 
998
 
select_to_file::select_to_file(file_exchange *ex)
999
 
  : exchange(ex),
1000
 
    file(-1),
1001
 
    cache(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
1002
 
    row_count(0L)
1003
 
{
1004
 
  path= "";
1005
 
}
1006
1437
 
1007
1438
select_to_file::~select_to_file()
1008
1439
{
1009
 
  cleanup();
 
1440
  if (file >= 0)
 
1441
  {                                     // This only happens in case of error
 
1442
    (void) end_io_cache(&cache);
 
1443
    (void) my_close(file,MYF(0));
 
1444
    file= -1;
 
1445
  }
1010
1446
}
1011
1447
 
1012
1448
/***************************************************************************
1035
1471
*/
1036
1472
 
1037
1473
 
1038
 
static int create_file(Session *session,
1039
 
                       fs::path &target_path,
1040
 
                       file_exchange *exchange,
1041
 
                       internal::IO_CACHE *cache)
 
1474
static File create_file(Session *session, char *path, file_exchange *exchange, IO_CACHE *cache)
1042
1475
{
1043
 
  fs::path to_file(exchange->file_name);
1044
 
  int file;
1045
 
 
1046
 
  if (not to_file.has_root_directory())
 
1476
  File file;
 
1477
  uint32_t option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
 
1478
 
 
1479
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
1480
  option|= MY_REPLACE_DIR;                      // Force use of db directory
 
1481
#endif
 
1482
 
 
1483
  if (!dirname_length(exchange->file_name))
1047
1484
  {
1048
 
    target_path= fs::system_complete(getDataHomeCatalog());
1049
 
    util::string::const_shared_ptr schema(session->schema());
1050
 
    if (schema and not schema->empty())
1051
 
    {
1052
 
      int count_elements= 0;
1053
 
      for (fs::path::iterator iter= to_file.begin();
1054
 
           iter != to_file.end();
1055
 
           ++iter, ++count_elements)
1056
 
      { }
1057
 
 
1058
 
      if (count_elements == 1)
1059
 
      {
1060
 
        target_path /= *schema;
1061
 
      }
1062
 
    }
1063
 
    target_path /= to_file;
 
1485
    strcpy(path, drizzle_real_data_home);
 
1486
    if (session->db)
 
1487
      strncat(path, session->db, FN_REFLEN-strlen(drizzle_real_data_home)-1);
 
1488
    (void) fn_format(path, exchange->file_name, path, "", option);
1064
1489
  }
1065
1490
  else
1066
 
  {
1067
 
    target_path = exchange->file_name;
1068
 
  }
1069
 
 
1070
 
  if (not secure_file_priv.string().empty())
1071
 
  {
1072
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
1073
 
    {
1074
 
      /* Write only allowed to dir or subdir specified by secure_file_priv */
1075
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1076
 
      return -1;
1077
 
    }
1078
 
  }
1079
 
 
1080
 
  if (!access(target_path.file_string().c_str(), F_OK))
 
1491
    (void) fn_format(path, exchange->file_name, drizzle_real_data_home, "", option);
 
1492
 
 
1493
  if (opt_secure_file_priv &&
 
1494
      strncmp(opt_secure_file_priv, path, strlen(opt_secure_file_priv)))
 
1495
  {
 
1496
    /* Write only allowed to dir or subdir specified by secure_file_priv */
 
1497
    my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
1498
    return -1;
 
1499
  }
 
1500
 
 
1501
  if (!access(path, F_OK))
1081
1502
  {
1082
1503
    my_error(ER_FILE_EXISTS_ERROR, MYF(0), exchange->file_name);
1083
1504
    return -1;
1084
1505
  }
1085
1506
  /* Create the file world readable */
1086
 
  if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
 
1507
  if ((file= my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1087
1508
    return file;
 
1509
#ifdef HAVE_FCHMOD
1088
1510
  (void) fchmod(file, 0666);                    // Because of umask()
1089
 
  if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
 
1511
#else
 
1512
  (void) chmod(path, 0666);
 
1513
#endif
 
1514
  if (init_io_cache(cache, file, 0L, WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1090
1515
  {
1091
 
    internal::my_close(file, MYF(0));
1092
 
    internal::my_delete(target_path.file_string().c_str(), MYF(0));  // Delete file on error, it was just created
 
1516
    my_close(file, MYF(0));
 
1517
    my_delete(path, MYF(0));  // Delete file on error, it was just created
1093
1518
    return -1;
1094
1519
  }
1095
1520
  return file;
1103
1528
  bool string_results= false, non_string_results= false;
1104
1529
  unit= u;
1105
1530
  if ((uint32_t) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
1106
 
  {
1107
 
    path= exchange->file_name;
1108
 
  }
 
1531
    strncpy(path,exchange->file_name,FN_REFLEN-1);
1109
1532
 
1110
1533
  /* Check if there is any blobs in data */
1111
1534
  {
1115
1538
    {
1116
1539
      if (item->max_length >= MAX_BLOB_WIDTH)
1117
1540
      {
1118
 
        blob_flag=1;
1119
 
        break;
 
1541
        blob_flag=1;
 
1542
        break;
1120
1543
      }
1121
 
 
1122
1544
      if (item->result_type() == STRING_RESULT)
1123
1545
        string_results= true;
1124
1546
      else
1153
1575
    return 1;
1154
1576
  }
1155
1577
 
1156
 
  if ((file= create_file(session, path, exchange, cache)) < 0)
 
1578
  if ((file= create_file(session, path, exchange, &cache)) < 0)
1157
1579
    return 1;
1158
1580
 
1159
1581
  return 0;
1160
1582
}
1161
1583
 
 
1584
 
 
1585
#define NEED_ESCAPING(x) ((int) (unsigned char) (x) == escape_char    || \
 
1586
                          (enclosed ? (int) (unsigned char) (x) == field_sep_char      \
 
1587
                                    : (int) (unsigned char) (x) == field_term_char) || \
 
1588
                          (int) (unsigned char) (x) == line_sep_char  || \
 
1589
                          !(x))
 
1590
 
1162
1591
bool select_export::send_data(List<Item> &items)
1163
1592
{
1164
1593
  char buff[MAX_FIELD_WIDTH],null_buff[2],space[MAX_FIELD_WIDTH];
1169
1598
  if (unit->offset_limit_cnt)
1170
1599
  {                                             // using limit offset,count
1171
1600
    unit->offset_limit_cnt--;
1172
 
    return false;
 
1601
    return(0);
1173
1602
  }
1174
1603
  row_count++;
1175
1604
  Item *item;
1176
1605
  uint32_t used_length=0,items_left=items.elements;
1177
1606
  List_iterator_fast<Item> li(items);
1178
1607
 
1179
 
  if (my_b_write(cache,(unsigned char*) exchange->line_start->ptr(),
1180
 
                 exchange->line_start->length()))
1181
 
    return true;
1182
 
 
 
1608
  if (my_b_write(&cache,(unsigned char*) exchange->line_start->ptr(),
 
1609
                 exchange->line_start->length()))
 
1610
    goto err;
1183
1611
  while ((item=li++))
1184
1612
  {
1185
1613
    Item_result result_type=item->result_type();
1188
1616
    res=item->str_result(&tmp);
1189
1617
    if (res && enclosed)
1190
1618
    {
1191
 
      if (my_b_write(cache,(unsigned char*) exchange->enclosed->ptr(),
1192
 
                     exchange->enclosed->length()))
1193
 
        return true;
 
1619
      if (my_b_write(&cache,(unsigned char*) exchange->enclosed->ptr(),
 
1620
                     exchange->enclosed->length()))
 
1621
        goto err;
1194
1622
    }
1195
1623
    if (!res)
1196
1624
    {                                           // NULL
1197
1625
      if (!fixed_row_size)
1198
1626
      {
1199
 
        if (escape_char != -1)                  // Use \N syntax
1200
 
        {
1201
 
          null_buff[0]=escape_char;
1202
 
          null_buff[1]='N';
1203
 
          if (my_b_write(cache,(unsigned char*) null_buff,2))
1204
 
            return true;
1205
 
        }
1206
 
        else if (my_b_write(cache,(unsigned char*) "NULL",4))
1207
 
          return true;
 
1627
        if (escape_char != -1)                  // Use \N syntax
 
1628
        {
 
1629
          null_buff[0]=escape_char;
 
1630
          null_buff[1]='N';
 
1631
          if (my_b_write(&cache,(unsigned char*) null_buff,2))
 
1632
            goto err;
 
1633
        }
 
1634
        else if (my_b_write(&cache,(unsigned char*) "NULL",4))
 
1635
          goto err;
1208
1636
      }
1209
1637
      else
1210
1638
      {
1211
 
        used_length=0;                          // Fill with space
 
1639
        used_length=0;                          // Fill with space
1212
1640
      }
1213
1641
    }
1214
1642
    else
1215
1643
    {
1216
1644
      if (fixed_row_size)
1217
 
        used_length= min(res->length(), static_cast<size_t>(item->max_length));
 
1645
        used_length=cmin(res->length(),item->max_length);
1218
1646
      else
1219
 
        used_length= res->length();
1220
 
 
 
1647
        used_length=res->length();
1221
1648
      if ((result_type == STRING_RESULT || is_unsafe_field_sep) &&
1222
 
          escape_char != -1)
 
1649
           escape_char != -1)
1223
1650
      {
1224
1651
        char *pos, *start, *end;
1225
1652
        const CHARSET_INFO * const res_charset= res->charset();
1226
1653
        const CHARSET_INFO * const character_set_client= default_charset_info;
1227
1654
 
1228
1655
        bool check_second_byte= (res_charset == &my_charset_bin) &&
1229
 
          character_set_client->
1230
 
          escape_with_backslash_is_dangerous;
 
1656
                                 character_set_client->
 
1657
                                 escape_with_backslash_is_dangerous;
1231
1658
        assert(character_set_client->mbmaxlen == 2 ||
1232
1659
               !character_set_client->escape_with_backslash_is_dangerous);
1233
 
        for (start=pos=(char*) res->ptr(),end=pos+used_length ;
1234
 
             pos != end ;
1235
 
             pos++)
1236
 
        {
1237
 
          if (use_mb(res_charset))
1238
 
          {
1239
 
            int l;
1240
 
            if ((l=my_ismbchar(res_charset, pos, end)))
1241
 
            {
1242
 
              pos += l-1;
1243
 
              continue;
1244
 
            }
1245
 
          }
 
1660
        for (start=pos=(char*) res->ptr(),end=pos+used_length ;
 
1661
             pos != end ;
 
1662
             pos++)
 
1663
        {
 
1664
#ifdef USE_MB
 
1665
          if (use_mb(res_charset))
 
1666
          {
 
1667
            int l;
 
1668
            if ((l=my_ismbchar(res_charset, pos, end)))
 
1669
            {
 
1670
              pos += l-1;
 
1671
              continue;
 
1672
            }
 
1673
          }
 
1674
#endif
1246
1675
 
1247
1676
          /*
1248
1677
            Special case when dumping BINARY/VARBINARY/BLOB values
1276
1705
            assert before the loop makes that sure.
1277
1706
          */
1278
1707
 
1279
 
          if ((needs_escaping(*pos, enclosed) ||
 
1708
          if ((NEED_ESCAPING(*pos) ||
1280
1709
               (check_second_byte &&
1281
1710
                my_mbcharlen(character_set_client, (unsigned char) *pos) == 2 &&
1282
1711
                pos + 1 < end &&
1283
 
                needs_escaping(pos[1], enclosed))) &&
 
1712
                NEED_ESCAPING(pos[1]))) &&
1284
1713
              /*
1285
 
                Don't escape field_term_char by doubling - doubling is only
1286
 
                valid for ENCLOSED BY characters:
 
1714
               Don't escape field_term_char by doubling - doubling is only
 
1715
               valid for ENCLOSED BY characters:
1287
1716
              */
1288
1717
              (enclosed || !is_ambiguous_field_term ||
1289
1718
               (int) (unsigned char) *pos != field_term_char))
1290
1719
          {
1291
 
            char tmp_buff[2];
 
1720
            char tmp_buff[2];
1292
1721
            tmp_buff[0]= ((int) (unsigned char) *pos == field_sep_char &&
1293
1722
                          is_ambiguous_field_sep) ?
1294
 
              field_sep_char : escape_char;
1295
 
            tmp_buff[1]= *pos ? *pos : '0';
1296
 
            if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1297
 
                my_b_write(cache,(unsigned char*) tmp_buff,2))
1298
 
              return true;
1299
 
            start=pos+1;
1300
 
          }
1301
 
        }
1302
 
        if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1303
 
          return true;
 
1723
                          field_sep_char : escape_char;
 
1724
            tmp_buff[1]= *pos ? *pos : '0';
 
1725
            if (my_b_write(&cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
 
1726
                my_b_write(&cache,(unsigned char*) tmp_buff,2))
 
1727
              goto err;
 
1728
            start=pos+1;
 
1729
          }
 
1730
        }
 
1731
        if (my_b_write(&cache,(unsigned char*) start,(uint32_t) (pos-start)))
 
1732
          goto err;
1304
1733
      }
1305
 
      else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1306
 
        return true;
 
1734
      else if (my_b_write(&cache,(unsigned char*) res->ptr(),used_length))
 
1735
        goto err;
1307
1736
    }
1308
1737
    if (fixed_row_size)
1309
1738
    {                                           // Fill with space
1310
1739
      if (item->max_length > used_length)
1311
1740
      {
1312
 
        /* QQ:  Fix by adding a my_b_fill() function */
1313
 
        if (!space_inited)
1314
 
        {
1315
 
          space_inited=1;
1316
 
          memset(space, ' ', sizeof(space));
1317
 
        }
1318
 
        uint32_t length=item->max_length-used_length;
1319
 
        for (; length > sizeof(space) ; length-=sizeof(space))
1320
 
        {
1321
 
          if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1322
 
            return true;
1323
 
        }
1324
 
        if (my_b_write(cache,(unsigned char*) space,length))
1325
 
          return true;
 
1741
        /* QQ:  Fix by adding a my_b_fill() function */
 
1742
        if (!space_inited)
 
1743
        {
 
1744
          space_inited=1;
 
1745
          memset(space, ' ', sizeof(space));
 
1746
        }
 
1747
        uint32_t length=item->max_length-used_length;
 
1748
        for (; length > sizeof(space) ; length-=sizeof(space))
 
1749
        {
 
1750
          if (my_b_write(&cache,(unsigned char*) space,sizeof(space)))
 
1751
            goto err;
 
1752
        }
 
1753
        if (my_b_write(&cache,(unsigned char*) space,length))
 
1754
          goto err;
1326
1755
      }
1327
1756
    }
1328
1757
    if (res && enclosed)
1329
1758
    {
1330
 
      if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
 
1759
      if (my_b_write(&cache, (unsigned char*) exchange->enclosed->ptr(),
1331
1760
                     exchange->enclosed->length()))
1332
 
        return true;
 
1761
        goto err;
1333
1762
    }
1334
1763
    if (--items_left)
1335
1764
    {
1336
 
      if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
 
1765
      if (my_b_write(&cache, (unsigned char*) exchange->field_term->ptr(),
1337
1766
                     field_term_length))
1338
 
        return true;
 
1767
        goto err;
1339
1768
    }
1340
1769
  }
1341
 
  if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1342
 
                 exchange->line_term->length()))
1343
 
  {
1344
 
    return true;
1345
 
  }
1346
 
 
1347
 
  return false;
 
1770
  if (my_b_write(&cache,(unsigned char*) exchange->line_term->ptr(),
 
1771
                 exchange->line_term->length()))
 
1772
    goto err;
 
1773
  return(0);
 
1774
err:
 
1775
  return(1);
1348
1776
}
1349
1777
 
1350
1778
 
1357
1785
select_dump::prepare(List<Item> &, Select_Lex_Unit *u)
1358
1786
{
1359
1787
  unit= u;
1360
 
  return (int) ((file= create_file(session, path, exchange, cache)) < 0);
 
1788
  return (int) ((file= create_file(session, path, exchange, &cache)) < 0);
1361
1789
}
1362
1790
 
1363
1791
 
1377
1805
  if (row_count++ > 1)
1378
1806
  {
1379
1807
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
1380
 
    return 1;
 
1808
    goto err;
1381
1809
  }
1382
1810
  while ((item=li++))
1383
1811
  {
1384
1812
    res=item->str_result(&tmp);
1385
1813
    if (!res)                                   // If NULL
1386
1814
    {
1387
 
      if (my_b_write(cache,(unsigned char*) "",1))
1388
 
        return 1;
 
1815
      if (my_b_write(&cache,(unsigned char*) "",1))
 
1816
        goto err;
1389
1817
    }
1390
 
    else if (my_b_write(cache,(unsigned char*) res->ptr(),res->length()))
 
1818
    else if (my_b_write(&cache,(unsigned char*) res->ptr(),res->length()))
1391
1819
    {
1392
 
      my_error(ER_ERROR_ON_WRITE, MYF(0), path.file_string().c_str(), errno);
1393
 
      return 1;
 
1820
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, my_errno);
 
1821
      goto err;
1394
1822
    }
1395
1823
  }
1396
1824
  return(0);
 
1825
err:
 
1826
  return(1);
1397
1827
}
1398
1828
 
1399
1829
 
1428
1858
void select_max_min_finder_subselect::cleanup()
1429
1859
{
1430
1860
  cache= 0;
 
1861
  return;
1431
1862
}
1432
1863
 
1433
1864
 
1451
1882
      switch (val_item->result_type())
1452
1883
      {
1453
1884
      case REAL_RESULT:
1454
 
        op= &select_max_min_finder_subselect::cmp_real;
1455
 
        break;
 
1885
        op= &select_max_min_finder_subselect::cmp_real;
 
1886
        break;
1456
1887
      case INT_RESULT:
1457
 
        op= &select_max_min_finder_subselect::cmp_int;
1458
 
        break;
 
1888
        op= &select_max_min_finder_subselect::cmp_int;
 
1889
        break;
1459
1890
      case STRING_RESULT:
1460
 
        op= &select_max_min_finder_subselect::cmp_str;
1461
 
        break;
 
1891
        op= &select_max_min_finder_subselect::cmp_str;
 
1892
        break;
1462
1893
      case DECIMAL_RESULT:
1463
1894
        op= &select_max_min_finder_subselect::cmp_decimal;
1464
1895
        break;
1465
1896
      case ROW_RESULT:
1466
1897
        // This case should never be choosen
1467
 
        assert(0);
1468
 
        op= 0;
 
1898
        assert(0);
 
1899
        op= 0;
1469
1900
      }
1470
1901
    }
1471
1902
    cache->store(val_item);
1547
1978
  return(0);
1548
1979
}
1549
1980
 
 
1981
 
 
1982
/*
 
1983
  Statement functions
 
1984
*/
 
1985
 
 
1986
Statement::Statement(LEX *lex_arg, MEM_ROOT *mem_root_arg, ulong id_arg)
 
1987
  :Query_arena(mem_root_arg),
 
1988
  id(id_arg),
 
1989
  mark_used_columns(MARK_COLUMNS_READ),
 
1990
  lex(lex_arg),
 
1991
  query(0),
 
1992
  query_length(0),
 
1993
  db(NULL),
 
1994
  db_length(0)
 
1995
{
 
1996
}
 
1997
 
 
1998
 
1550
1999
/*
1551
2000
  Don't free mem_root, as mem_root is freed in the end of dispatch_command
1552
2001
  (once for any command).
1554
2003
void Session::end_statement()
1555
2004
{
1556
2005
  /* Cleanup SQL processing state to reuse this statement in next query. */
1557
 
  lex->end();
1558
 
  query_cache_key= ""; // reset the cache key
1559
 
  resetResultsetMessage();
 
2006
  lex_end(lex);
1560
2007
}
1561
2008
 
 
2009
 
1562
2010
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
1563
2011
{
1564
 
  assert(_schema);
1565
 
  if (_schema and _schema->empty())
1566
 
  {
1567
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1568
 
    return true;
1569
 
  }
1570
 
  else if (not _schema)
1571
 
  {
1572
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1573
 
    return true;
1574
 
  }
1575
 
  assert(_schema);
1576
 
 
1577
 
  *p_db= strmake(_schema->c_str(), _schema->size());
1578
 
  *p_db_length= _schema->size();
1579
 
 
 
2012
  if (db == NULL)
 
2013
  {
 
2014
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
 
2015
    return true;
 
2016
  }
 
2017
  *p_db= strmake(db, db_length);
 
2018
  *p_db_length= db_length;
1580
2019
  return false;
1581
2020
}
1582
2021
 
 
2022
 
1583
2023
/****************************************************************************
1584
2024
  Tmp_Table_Param
1585
2025
****************************************************************************/
1591
2031
  quick_group= 1;
1592
2032
  table_charset= 0;
1593
2033
  precomputed_group_by= 0;
 
2034
  bit_fields_as_long= 0;
 
2035
  return;
1594
2036
}
1595
2037
 
1596
2038
void Tmp_Table_Param::cleanup(void)
1603
2045
  }
1604
2046
}
1605
2047
 
 
2048
 
 
2049
void session_increment_bytes_sent(ulong length)
 
2050
{
 
2051
  Session *session=current_session;
 
2052
  if (likely(session != 0))
 
2053
  { /* current_session==0 when disconnect() calls net_send_error() */
 
2054
    session->status_var.bytes_sent+= length;
 
2055
  }
 
2056
}
 
2057
 
 
2058
 
 
2059
void session_increment_bytes_received(ulong length)
 
2060
{
 
2061
  current_session->status_var.bytes_received+= length;
 
2062
}
 
2063
 
 
2064
 
 
2065
void session_increment_net_big_packet_count(ulong length)
 
2066
{
 
2067
  current_session->status_var.net_big_packet_count+= length;
 
2068
}
 
2069
 
1606
2070
void Session::send_kill_message() const
1607
2071
{
1608
2072
  int err= killed_errno();
1615
2079
  memset(&status_var, 0, sizeof(status_var));
1616
2080
}
1617
2081
 
1618
 
 
1619
 
void Session::set_db(const std::string &new_db)
 
2082
void Security_context::skip_grants()
 
2083
{
 
2084
  /* privileges for the user are unknown everything is allowed */
 
2085
}
 
2086
 
 
2087
 
 
2088
/****************************************************************************
 
2089
  Handling of open and locked tables states.
 
2090
 
 
2091
  This is used when we want to open/lock (and then close) some tables when
 
2092
  we already have a set of tables open and locked. We use these methods for
 
2093
  access to mysql.proc table to find definitions of stored routines.
 
2094
****************************************************************************/
 
2095
 
 
2096
void Session::reset_n_backup_open_tables_state(Open_tables_state *backup)
 
2097
{
 
2098
  backup->set_open_tables_state(this);
 
2099
  reset_open_tables_state();
 
2100
  state_flags|= Open_tables_state::BACKUPS_AVAIL;
 
2101
  return;
 
2102
}
 
2103
 
 
2104
 
 
2105
void Session::restore_backup_open_tables_state(Open_tables_state *backup)
 
2106
{
 
2107
  /*
 
2108
    Before we will throw away current open tables state we want
 
2109
    to be sure that it was properly cleaned up.
 
2110
  */
 
2111
  assert(open_tables == 0 && temporary_tables == 0 &&
 
2112
              handler_tables == 0 && derived_tables == 0 &&
 
2113
              lock == 0 && locked_tables == 0);
 
2114
  set_open_tables_state(backup);
 
2115
  return;
 
2116
}
 
2117
 
 
2118
 
 
2119
bool Session::set_db(const char *new_db, size_t new_db_len)
1620
2120
{
1621
2121
  /* Do not reallocate memory if current chunk is big enough. */
1622
 
  if (new_db.length())
1623
 
  {
1624
 
    _schema.reset(new std::string(new_db));
1625
 
  }
 
2122
  if (db && new_db && db_length >= new_db_len)
 
2123
    memcpy(db, new_db, new_db_len+1);
1626
2124
  else
1627
2125
  {
1628
 
    _schema.reset(new std::string(""));
 
2126
    if (db)
 
2127
      free(db);
 
2128
    if (new_db)
 
2129
    {
 
2130
      db= (char *)malloc(new_db_len + 1);
 
2131
      if (db != NULL)
 
2132
      {
 
2133
        memcpy(db, new_db, new_db_len);
 
2134
        db[new_db_len]= 0;
 
2135
      }
 
2136
    }
 
2137
    else
 
2138
      db= NULL;
1629
2139
  }
 
2140
  db_length= db ? new_db_len : 0;
 
2141
  return new_db && !db;
 
2142
}
 
2143
 
 
2144
 
 
2145
/**
 
2146
  Check the killed state of a user thread
 
2147
  @param session  user thread
 
2148
  @retval 0 the user thread is active
 
2149
  @retval 1 the user thread has been killed
 
2150
*/
 
2151
extern "C" int session_killed(const Session *session)
 
2152
{
 
2153
  return(session->killed);
 
2154
}
 
2155
 
 
2156
/**
 
2157
  Return the thread id of a user thread
 
2158
  @param session user thread
 
2159
  @return thread id
 
2160
*/
 
2161
extern "C" unsigned long session_get_thread_id(const Session *session)
 
2162
{
 
2163
  return((unsigned long)session->thread_id);
 
2164
}
 
2165
 
 
2166
 
 
2167
extern "C"
 
2168
LEX_STRING *session_make_lex_string(Session *session, LEX_STRING *lex_str,
 
2169
                                const char *str, unsigned int size,
 
2170
                                int allocate_lex_string)
 
2171
{
 
2172
  return session->make_lex_string(lex_str, str, size,
 
2173
                              (bool) allocate_lex_string);
 
2174
}
 
2175
 
 
2176
extern "C" const struct charset_info_st *session_charset(Session *session)
 
2177
{
 
2178
  return(session->charset());
 
2179
}
 
2180
 
 
2181
extern "C" char **session_query(Session *session)
 
2182
{
 
2183
  return(&session->query);
 
2184
}
 
2185
 
 
2186
extern "C" int session_non_transactional_update(const Session *session)
 
2187
{
 
2188
  return(session->transaction.all.modified_non_trans_table);
 
2189
}
 
2190
 
 
2191
extern "C" void session_mark_transaction_to_rollback(Session *session, bool all)
 
2192
{
 
2193
  mark_transaction_to_rollback(session, all);
1630
2194
}
1631
2195
 
1632
2196
 
1636
2200
  @param  session   Thread handle
1637
2201
  @param  all   true <=> rollback main transaction.
1638
2202
*/
 
2203
 
1639
2204
void mark_transaction_to_rollback(Session *session, bool all)
1640
2205
{
1641
2206
  if (session)
1651
2216
  plugin_sessionvar_cleanup(this);
1652
2217
 
1653
2218
  /* If necessary, log any aborted or unauthorized connections */
1654
 
  if (getKilled() || client->wasAborted())
1655
 
  {
1656
 
    status_var.aborted_threads++;
1657
 
  }
 
2219
  if (killed || (net.error && net.vio != 0))
 
2220
    statistic_increment(aborted_threads, &LOCK_status);
1658
2221
 
1659
 
  if (client->wasAborted())
 
2222
  if (net.error && net.vio != 0)
1660
2223
  {
1661
 
    if (not getKilled() && variables.log_warnings > 1)
 
2224
    if (! killed && variables.log_warnings > 1)
1662
2225
    {
1663
 
      SecurityContext *sctx= &security_ctx;
 
2226
      Security_context *sctx= &security_ctx;
1664
2227
 
1665
2228
      errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1666
2229
                  , thread_id
1667
 
                  , (_schema->empty() ? "unconnected" : _schema->c_str())
1668
 
                  , sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1669
 
                  , sctx->getIp().c_str()
 
2230
                  , (db ? db : "unconnected")
 
2231
                  , sctx->user.empty() == false ? sctx->user.c_str() : "unauthenticated"
 
2232
                  , sctx->ip.c_str()
1670
2233
                  , (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1671
2234
    }
1672
2235
  }
1673
2236
 
1674
2237
  /* Close out our connection to the client */
 
2238
  st_vio *vio;
1675
2239
  if (should_lock)
1676
 
    session::Cache::singleton().mutex().lock();
1677
 
 
1678
 
  setKilled(Session::KILL_CONNECTION);
1679
 
 
1680
 
  if (client->isConnected())
 
2240
    (void) pthread_mutex_lock(&LOCK_thread_count);
 
2241
  killed= Session::KILL_CONNECTION;
 
2242
  if ((vio= net.vio) != 0)
1681
2243
  {
1682
2244
    if (errcode)
1683
 
    {
1684
 
      /*my_error(errcode, ER(errcode));*/
1685
 
      client->sendError(errcode, ER(errcode));
1686
 
    }
1687
 
    client->close();
 
2245
      net_send_error(this, errcode, ER(errcode)); /* purecov: inspected */
 
2246
    drizzleclient_net_close(&net);              /* vio is freed in delete session */
1688
2247
  }
1689
 
 
1690
2248
  if (should_lock)
1691
 
  {
1692
 
    session::Cache::singleton().mutex().unlock();
1693
 
  }
 
2249
    (void) pthread_mutex_unlock(&LOCK_thread_count);
1694
2250
}
1695
2251
 
 
2252
/**
 
2253
 Reset Session part responsible for command processing state.
 
2254
 
 
2255
   This needs to be called before execution of every statement
 
2256
   (prepared or conventional).
 
2257
   It is not called by substatements of routines.
 
2258
 
 
2259
  @todo
 
2260
   Make it a method of Session and align its name with the rest of
 
2261
   reset/end/start/init methods.
 
2262
  @todo
 
2263
   Call it after we use Session for queries, not before.
 
2264
*/
 
2265
 
1696
2266
void Session::reset_for_next_command()
1697
2267
{
1698
2268
  free_list= 0;
1703
2273
  */
1704
2274
  auto_inc_intervals_in_cur_stmt_for_binlog.empty();
1705
2275
 
1706
 
  is_fatal_error= false;
 
2276
  is_fatal_error= 0;
1707
2277
  server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1708
2278
                          SERVER_QUERY_NO_INDEX_USED |
1709
2279
                          SERVER_QUERY_NO_GOOD_INDEX_USED);
 
2280
  /*
 
2281
    If in autocommit mode and not in a transaction, reset
 
2282
    OPTION_STATUS_NO_TRANS_UPDATE | OPTION_KEEP_LOG to not get warnings
 
2283
    in ha_rollback_trans() about some tables couldn't be rolled back.
 
2284
  */
 
2285
  if (!(options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
 
2286
  {
 
2287
    options&= ~OPTION_KEEP_LOG;
 
2288
    transaction.all.modified_non_trans_table= false;
 
2289
  }
 
2290
  thread_specific_used= false;
1710
2291
 
1711
2292
  clear_error();
1712
2293
  main_da.reset_diagnostics_area();
1713
2294
  total_warn_count=0;                   // Warnings for this query
1714
2295
  sent_row_count= examined_row_count= 0;
 
2296
 
 
2297
  return;
1715
2298
}
1716
2299
 
1717
2300
/*
1718
2301
  Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
 
2302
  creates one DROP TEMPORARY Table binlog event for each pseudo-thread
1719
2303
*/
1720
2304
 
1721
 
void Open_tables_state::close_temporary_tables()
 
2305
void Session::close_temporary_tables()
1722
2306
{
1723
2307
  Table *table;
1724
2308
  Table *tmp_next;
1725
2309
 
1726
 
  if (not temporary_tables)
 
2310
  if (!temporary_tables)
1727
2311
    return;
1728
2312
 
1729
2313
  for (table= temporary_tables; table; table= tmp_next)
1730
2314
  {
1731
 
    tmp_next= table->getNext();
1732
 
    nukeTable(table);
1733
 
  }
1734
 
  temporary_tables= NULL;
1735
 
}
1736
 
 
1737
 
/*
1738
 
  unlink from session->temporary tables and close temporary table
1739
 
*/
1740
 
 
1741
 
void Open_tables_state::close_temporary_table(Table *table)
1742
 
{
1743
 
  if (table->getPrev())
1744
 
  {
1745
 
    table->getPrev()->setNext(table->getNext());
1746
 
    if (table->getPrev()->getNext())
1747
 
    {
1748
 
      table->getNext()->setPrev(table->getPrev());
1749
 
    }
1750
 
  }
1751
 
  else
1752
 
  {
1753
 
    /* removing the item from the list */
1754
 
    assert(table == temporary_tables);
1755
 
    /*
1756
 
      slave must reset its temporary list pointer to zero to exclude
1757
 
      passing non-zero value to end_slave via rli->save_temporary_tables
1758
 
      when no temp tables opened, see an invariant below.
1759
 
    */
1760
 
    temporary_tables= table->getNext();
1761
 
    if (temporary_tables)
1762
 
    {
1763
 
      table->getNext()->setPrev(NULL);
1764
 
    }
1765
 
  }
1766
 
  nukeTable(table);
1767
 
}
1768
 
 
1769
 
/*
1770
 
  Close and drop a temporary table
1771
 
 
1772
 
  NOTE
1773
 
  This dosn't unlink table from session->temporary
1774
 
  If this is needed, use close_temporary_table()
1775
 
*/
1776
 
 
1777
 
void Open_tables_state::nukeTable(Table *table)
1778
 
{
1779
 
  plugin::StorageEngine *table_type= table->getShare()->db_type();
1780
 
 
1781
 
  table->free_io_cache();
1782
 
  table->delete_table();
1783
 
 
1784
 
  TableIdentifier identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1785
 
  rm_temporary_table(table_type, identifier);
1786
 
 
1787
 
  delete table->getMutableShare();
1788
 
 
1789
 
  /* This makes me sad, but we're allocating it via malloc */
1790
 
  delete table;
1791
 
}
 
2315
    tmp_next= table->next;
 
2316
    close_temporary(table, 1, 1);
 
2317
  }
 
2318
  temporary_tables= 0;
 
2319
 
 
2320
  return;
 
2321
}
 
2322
 
1792
2323
 
1793
2324
/** Clear most status variables. */
1794
2325
extern time_t flush_status_time;
 
2326
extern uint32_t max_used_connections;
1795
2327
 
1796
2328
void Session::refresh_status()
1797
2329
{
 
2330
  pthread_mutex_lock(&LOCK_status);
 
2331
 
 
2332
  /* Add thread's status variabes to global status */
 
2333
  add_to_status(&global_status_var, &status_var);
 
2334
 
1798
2335
  /* Reset thread's status variables */
1799
2336
  memset(&status_var, 0, sizeof(status_var));
1800
2337
 
 
2338
  /* Reset some global variables */
 
2339
  reset_status_vars();
 
2340
 
 
2341
  /* Reset the counters of all key caches (default and named). */
 
2342
  process_key_caches(reset_key_cache_counters);
1801
2343
  flush_status_time= time((time_t*) 0);
1802
 
  current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1803
 
  current_global_counters.connections= 0;
1804
 
}
1805
 
 
1806
 
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1807
 
{
1808
 
  return getVariable(std::string(name.str, name.length), create_if_not_exists);
1809
 
}
1810
 
 
1811
 
user_var_entry *Session::getVariable(const std::string  &name, bool create_if_not_exists)
1812
 
{
1813
 
  UserVarsRange ppp= user_vars.equal_range(name);
1814
 
 
1815
 
  for (UserVars::iterator iter= ppp.first;
1816
 
       iter != ppp.second; ++iter)
1817
 
  {
1818
 
    return (*iter).second;
1819
 
  }
1820
 
 
1821
 
  if (not create_if_not_exists)
1822
 
    return NULL;
1823
 
 
1824
 
  user_var_entry *entry= NULL;
1825
 
  entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1826
 
 
1827
 
  if (entry == NULL)
1828
 
    return NULL;
1829
 
 
1830
 
  std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1831
 
 
1832
 
  if (not returnable.second)
1833
 
  {
1834
 
    delete entry;
1835
 
  }
1836
 
 
1837
 
  return entry;
1838
 
}
1839
 
 
1840
 
void Session::setVariable(const std::string &name, const std::string &value)
1841
 
{
1842
 
  user_var_entry *updateable_var= getVariable(name.c_str(), true);
1843
 
 
1844
 
  updateable_var->update_hash(false,
1845
 
                              (void*)value.c_str(),
1846
 
                              static_cast<uint32_t>(value.length()), STRING_RESULT,
1847
 
                              &my_charset_bin,
1848
 
                              DERIVATION_IMPLICIT, false);
1849
 
}
1850
 
 
1851
 
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1852
 
{
1853
 
  for (Table *table= temporary_tables ; table ; table= table->getNext())
1854
 
  {
1855
 
    if (table->query_id == getQueryId())
1856
 
    {
1857
 
      table->query_id= 0;
1858
 
      table->cursor->ha_reset();
1859
 
    }
1860
 
  }
1861
 
}
1862
 
 
1863
 
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1864
 
{
1865
 
  for (; table ; table= table->getNext())
1866
 
  {
1867
 
    if (table->query_id == getQueryId())
1868
 
    {
1869
 
      table->query_id= 0;
1870
 
      table->cursor->ha_reset();
1871
 
    }
1872
 
  }
1873
 
}
1874
 
 
1875
 
/*
1876
 
  Unlocks tables and frees derived tables.
1877
 
  Put all normal tables used by thread in free list.
1878
 
 
1879
 
  It will only close/mark as free for reuse tables opened by this
1880
 
  substatement, it will also check if we are closing tables after
1881
 
  execution of complete query (i.e. we are on upper level) and will
1882
 
  leave prelocked mode if needed.
1883
 
*/
1884
 
void Session::close_thread_tables()
1885
 
{
1886
 
  clearDerivedTables();
1887
 
 
1888
 
  /*
1889
 
    Mark all temporary tables used by this statement as free for reuse.
1890
 
  */
1891
 
  mark_temp_tables_as_free_for_reuse();
1892
 
  /*
1893
 
    Let us commit transaction for statement. Since in 5.0 we only have
1894
 
    one statement transaction and don't allow several nested statement
1895
 
    transactions this call will do nothing if we are inside of stored
1896
 
    function or trigger (i.e. statement transaction is already active and
1897
 
    does not belong to statement for which we do close_thread_tables()).
1898
 
    TODO: This should be fixed in later releases.
1899
 
   */
1900
 
  {
1901
 
    TransactionServices &transaction_services= TransactionServices::singleton();
1902
 
    main_da.can_overwrite_status= true;
1903
 
    transaction_services.autocommitOrRollback(this, is_error());
1904
 
    main_da.can_overwrite_status= false;
1905
 
    transaction.stmt.reset();
1906
 
  }
1907
 
 
1908
 
  if (lock)
1909
 
  {
1910
 
    /*
1911
 
      For RBR we flush the pending event just before we unlock all the
1912
 
      tables.  This means that we are at the end of a topmost
1913
 
      statement, so we ensure that the STMT_END_F flag is set on the
1914
 
      pending event.  For statements that are *inside* stored
1915
 
      functions, the pending event will not be flushed: that will be
1916
 
      handled either before writing a query log event (inside
1917
 
      binlog_query()) or when preparing a pending event.
1918
 
     */
1919
 
    unlockTables(lock);
1920
 
    lock= 0;
1921
 
  }
1922
 
  /*
1923
 
    Note that we need to hold table::Cache::singleton().mutex() while changing the
1924
 
    open_tables list. Another thread may work on it.
1925
 
    (See: table::Cache::singleton().removeTable(), mysql_wait_completed_table())
1926
 
    Closing a MERGE child before the parent would be fatal if the
1927
 
    other thread tries to abort the MERGE lock in between.
1928
 
  */
1929
 
  if (open_tables)
1930
 
    close_open_tables();
1931
 
}
1932
 
 
1933
 
void Session::close_tables_for_reopen(TableList **tables)
1934
 
{
1935
 
  /*
1936
 
    If table list consists only from tables from prelocking set, table list
1937
 
    for new attempt should be empty, so we have to update list's root pointer.
1938
 
  */
1939
 
  if (lex->first_not_own_table() == *tables)
1940
 
    *tables= 0;
1941
 
  lex->chop_off_not_own_tables();
1942
 
  for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1943
 
    tmp->table= 0;
1944
 
  close_thread_tables();
1945
 
}
1946
 
 
1947
 
bool Session::openTablesLock(TableList *tables)
1948
 
{
1949
 
  uint32_t counter;
1950
 
  bool need_reopen;
1951
 
 
1952
 
  for ( ; ; )
1953
 
  {
1954
 
    if (open_tables_from_list(&tables, &counter))
1955
 
      return true;
1956
 
 
1957
 
    if (not lock_tables(tables, counter, &need_reopen))
1958
 
      break;
1959
 
    if (not need_reopen)
1960
 
      return true;
1961
 
    close_tables_for_reopen(&tables);
1962
 
  }
1963
 
  if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
1964
 
       (
1965
 
        mysql_handle_derived(lex, &mysql_derived_filling))))
1966
 
    return true;
1967
 
 
1968
 
  return false;
1969
 
}
1970
 
 
1971
 
/*
1972
 
  @note "best_effort" is used in cases were if a failure occurred on this
1973
 
  operation it would not be surprising because we are only removing because there
1974
 
  might be an issue (lame engines).
1975
 
*/
1976
 
 
1977
 
bool Open_tables_state::rm_temporary_table(const TableIdentifier &identifier, bool best_effort)
1978
 
{
1979
 
  if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
1980
 
  {
1981
 
    if (not best_effort)
1982
 
    {
1983
 
      std::string path;
1984
 
      identifier.getSQLPath(path);
1985
 
      errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1986
 
                    path.c_str(), errno);
1987
 
    }
1988
 
 
1989
 
    return true;
1990
 
  }
1991
 
 
1992
 
  return false;
1993
 
}
1994
 
 
1995
 
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const TableIdentifier &identifier)
1996
 
{
1997
 
  assert(base);
1998
 
 
1999
 
  if (plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier))
2000
 
  {
2001
 
    std::string path;
2002
 
    identifier.getSQLPath(path);
2003
 
    errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
2004
 
                  path.c_str(), errno);
2005
 
 
2006
 
    return true;
2007
 
  }
2008
 
 
2009
 
  return false;
2010
 
}
2011
 
 
2012
 
/**
2013
 
  @note this will be removed, I am looking through Hudson to see if it is finding
2014
 
  any tables that are missed during cleanup.
2015
 
*/
2016
 
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
2017
 
{
2018
 
  Table *table;
2019
 
 
2020
 
  if (not temporary_tables)
2021
 
    return;
2022
 
 
2023
 
  cerr << "Begin Run: " << foo << "\n";
2024
 
  for (table= temporary_tables; table; table= table->getNext())
2025
 
  {
2026
 
    bool have_proto= false;
2027
 
 
2028
 
    message::Table *proto= table->getShare()->getTableProto();
2029
 
    if (table->getShare()->getTableProto())
2030
 
      have_proto= true;
2031
 
 
2032
 
    const char *answer= have_proto ? "true" : "false";
2033
 
 
2034
 
    if (have_proto)
2035
 
    {
2036
 
      cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2037
 
      cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2038
 
    }
2039
 
    else
2040
 
    {
2041
 
      cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2042
 
    }
2043
 
  }
2044
 
}
2045
 
 
2046
 
bool Session::TableMessages::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2047
 
{
2048
 
  table_message_cache.insert(make_pair(identifier.getPath(), table_message));
2049
 
 
2050
 
  return true;
2051
 
}
2052
 
 
2053
 
bool Session::TableMessages::removeTableMessage(const TableIdentifier &identifier)
2054
 
{
2055
 
  TableMessageCache::iterator iter;
2056
 
 
2057
 
  iter= table_message_cache.find(identifier.getPath());
2058
 
 
2059
 
  if (iter == table_message_cache.end())
2060
 
    return false;
2061
 
 
2062
 
  table_message_cache.erase(iter);
2063
 
 
2064
 
  return true;
2065
 
}
2066
 
 
2067
 
bool Session::TableMessages::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2068
 
{
2069
 
  TableMessageCache::iterator iter;
2070
 
 
2071
 
  iter= table_message_cache.find(identifier.getPath());
2072
 
 
2073
 
  if (iter == table_message_cache.end())
2074
 
    return false;
2075
 
 
2076
 
  table_message.CopyFrom(((*iter).second));
2077
 
 
2078
 
  return true;
2079
 
}
2080
 
 
2081
 
bool Session::TableMessages::doesTableMessageExist(const TableIdentifier &identifier)
2082
 
{
2083
 
  TableMessageCache::iterator iter;
2084
 
 
2085
 
  iter= table_message_cache.find(identifier.getPath());
2086
 
 
2087
 
  if (iter == table_message_cache.end())
2088
 
  {
2089
 
    return false;
2090
 
  }
2091
 
 
2092
 
  return true;
2093
 
}
2094
 
 
2095
 
bool Session::TableMessages::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
2096
 
{
2097
 
  TableMessageCache::iterator iter;
2098
 
 
2099
 
  table_message_cache[to.getPath()]= table_message_cache[from.getPath()];
2100
 
 
2101
 
  iter= table_message_cache.find(to.getPath());
2102
 
 
2103
 
  if (iter == table_message_cache.end())
2104
 
  {
2105
 
    return false;
2106
 
  }
2107
 
 
2108
 
  (*iter).second.set_schema(to.getSchemaName());
2109
 
  (*iter).second.set_name(to.getTableName());
2110
 
 
2111
 
  return true;
2112
 
}
2113
 
 
2114
 
table::Instance *Session::getInstanceTable()
2115
 
{
2116
 
  temporary_shares.push_back(new table::Instance()); // This will not go into the tableshare cache, so no key is used.
2117
 
 
2118
 
  table::Instance *tmp_share= temporary_shares.back();
2119
 
 
2120
 
  assert(tmp_share);
2121
 
 
2122
 
  return tmp_share;
2123
 
}
2124
 
 
2125
 
 
2126
 
/**
2127
 
  Create a reduced Table object with properly set up Field list from a
2128
 
  list of field definitions.
2129
 
 
2130
 
    The created table doesn't have a table Cursor associated with
2131
 
    it, has no keys, no group/distinct, no copy_funcs array.
2132
 
    The sole purpose of this Table object is to use the power of Field
2133
 
    class to read/write data to/from table->getInsertRecord(). Then one can store
2134
 
    the record in any container (RB tree, hash, etc).
2135
 
    The table is created in Session mem_root, so are the table's fields.
2136
 
    Consequently, if you don't BLOB fields, you don't need to free it.
2137
 
 
2138
 
  @param session         connection handle
2139
 
  @param field_list  list of column definitions
2140
 
 
2141
 
  @return
2142
 
    0 if out of memory, Table object in case of success
2143
 
*/
2144
 
table::Instance *Session::getInstanceTable(List<CreateField> &field_list)
2145
 
{
2146
 
  temporary_shares.push_back(new table::Instance(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2147
 
 
2148
 
  table::Instance *tmp_share= temporary_shares.back();
2149
 
 
2150
 
  assert(tmp_share);
2151
 
 
2152
 
  return tmp_share;
2153
 
}
2154
 
 
2155
 
namespace display  {
2156
 
 
2157
 
static const std::string NONE= "NONE";
2158
 
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2159
 
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2160
 
 
2161
 
const std::string &type(drizzled::Session::global_read_lock_t type)
2162
 
{
2163
 
  switch (type) {
2164
 
    default:
2165
 
    case Session::NONE:
2166
 
      return NONE;
2167
 
    case Session::GOT_GLOBAL_READ_LOCK:
2168
 
      return GOT_GLOBAL_READ_LOCK;
2169
 
    case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2170
 
      return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2171
 
  }
2172
 
}
2173
 
 
2174
 
size_t max_string_length(drizzled::Session::global_read_lock_t)
2175
 
{
2176
 
  return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2177
 
}
2178
 
 
2179
 
} /* namespace display */
2180
 
 
2181
 
} /* namespace drizzled */
 
2344
  max_used_connections= 1; /* We set it to one, because we know we exist */
 
2345
  pthread_mutex_unlock(&LOCK_status);
 
2346
}