~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_class.cc

  • Committer: Monty Taylor
  • Date: 2008-11-15 18:39:51 UTC
  • mto: (584.1.7 devel)
  • mto: This revision was merged to the branch mainline in revision 588.
  • Revision ID: monty@inaugust.com-20081115183951-jo2v3abwdu24lnwq
Split out hybrid_type_traits.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems, Inc.
 
4
 *  Copyright (C) 2008 Sun Microsystems
5
5
 *
6
6
 *  This program is free software; you can redistribute it and/or modify
7
7
 *  it under the terms of the GNU General Public License as published by
17
17
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
18
 */
19
19
 
20
 
/**
21
 
 * @file Implementation of the Session class and API
22
 
 */
23
 
 
24
 
#include "config.h"
25
 
 
26
 
#include <drizzled/copy_field.h>
27
 
#include "drizzled/session.h"
28
 
#include "drizzled/session/cache.h"
29
 
#include "drizzled/error.h"
30
 
#include "drizzled/gettext.h"
31
 
#include "drizzled/query_id.h"
32
 
#include "drizzled/data_home.h"
33
 
#include "drizzled/sql_base.h"
34
 
#include "drizzled/lock.h"
35
 
#include "drizzled/item/cache.h"
36
 
#include "drizzled/item/float.h"
37
 
#include "drizzled/item/return_int.h"
38
 
#include "drizzled/item/empty_string.h"
39
 
#include "drizzled/show.h"
40
 
#include "drizzled/plugin/client.h"
41
 
#include "drizzled/plugin/scheduler.h"
42
 
#include "drizzled/plugin/authentication.h"
43
 
#include "drizzled/plugin/logging.h"
44
 
#include "drizzled/plugin/transactional_storage_engine.h"
45
 
#include "drizzled/plugin/query_rewrite.h"
46
 
#include "drizzled/probes.h"
47
 
#include "drizzled/table_proto.h"
48
 
#include "drizzled/db.h"
49
 
#include "drizzled/pthread_globals.h"
50
 
#include "drizzled/transaction_services.h"
51
 
#include "drizzled/drizzled.h"
52
 
 
53
 
#include "drizzled/identifier.h"
54
 
 
55
 
#include <drizzled/refresh_version.h>
56
 
 
57
 
#include "drizzled/table/singular.h"
58
 
 
59
 
#include "plugin/myisam/myisam.h"
60
 
#include "drizzled/internal/iocache.h"
61
 
#include "drizzled/internal/thread_var.h"
62
 
#include "drizzled/plugin/event_observer.h"
63
 
 
64
 
#include "drizzled/util/functors.h"
65
 
 
66
 
#include "drizzled/display.h"
67
 
 
68
 
#include <algorithm>
69
 
#include <climits>
70
 
#include <fcntl.h>
 
20
 
 
21
/*****************************************************************************
 
22
**
 
23
** This file implements classes defined in sql_class.h
 
24
** Especially the classes to handle a result from a select
 
25
**
 
26
*****************************************************************************/
 
27
#include <drizzled/server_includes.h>
 
28
#include <drizzled/replication/rli.h>
 
29
#include <drizzled/replication/record.h>
 
30
#include <drizzled/log_event.h>
71
31
#include <sys/stat.h>
72
 
 
73
 
#include <boost/filesystem.hpp>
74
 
#include <boost/checked_delete.hpp>
75
 
 
76
 
#include "drizzled/util/backtrace.h"
77
 
 
78
 
using namespace std;
79
 
 
80
 
namespace fs=boost::filesystem;
81
 
namespace drizzled
82
 
{
83
 
 
 
32
#include <mysys/thr_alarm.h>
 
33
#include <mysys/mysys_err.h>
 
34
#include <drizzled/error.h>
 
35
#include <drizzled/query_id.h>
 
36
#include <drizzled/data_home.h>
 
37
 
 
38
extern scheduler_functions thread_scheduler;
84
39
/*
85
40
  The following is used to initialise Table_ident with a internal
86
41
  table name
90
45
 
91
46
const char * const Session::DEFAULT_WHERE= "field list";
92
47
 
 
48
 
 
49
/*****************************************************************************
 
50
** Instansiate templates
 
51
*****************************************************************************/
 
52
 
 
53
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
54
/* Used templates */
 
55
template class List<Key>;
 
56
template class List_iterator<Key>;
 
57
template class List<Key_part_spec>;
 
58
template class List_iterator<Key_part_spec>;
 
59
template class List<Alter_drop>;
 
60
template class List_iterator<Alter_drop>;
 
61
template class List<Alter_column>;
 
62
template class List_iterator<Alter_column>;
 
63
#endif
 
64
 
 
65
/****************************************************************************
 
66
** User variables
 
67
****************************************************************************/
 
68
 
 
69
extern "C" unsigned char *get_var_key(user_var_entry *entry, size_t *length,
 
70
                              bool not_used __attribute__((unused)))
 
71
{
 
72
  *length= entry->name.length;
 
73
  return (unsigned char*) entry->name.str;
 
74
}
 
75
 
 
76
extern "C" void free_user_var(user_var_entry *entry)
 
77
{
 
78
  char *pos= (char*) entry+ALIGN_SIZE(sizeof(*entry));
 
79
  if (entry->value && entry->value != pos)
 
80
    free(entry->value);
 
81
  free((char*) entry);
 
82
}
 
83
 
93
84
bool Key_part_spec::operator==(const Key_part_spec& other) const
94
85
{
95
86
  return length == other.length &&
96
87
         field_name.length == other.field_name.length &&
97
 
    !my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
98
 
}
99
 
 
100
 
Open_tables_state::Open_tables_state(uint64_t version_arg) :
101
 
  version(version_arg)
102
 
{
103
 
  open_tables= temporary_tables= derived_tables= NULL;
104
 
  extra_lock= lock= NULL;
 
88
         !strcmp(field_name.str, other.field_name.str);
 
89
}
 
90
 
 
91
/**
 
92
  Construct an (almost) deep copy of this key. Only those
 
93
  elements that are known to never change are not copied.
 
94
  If out of memory, a partial copy is returned and an error is set
 
95
  in Session.
 
96
*/
 
97
 
 
98
Key::Key(const Key &rhs, MEM_ROOT *mem_root)
 
99
  :type(rhs.type),
 
100
  key_create_info(rhs.key_create_info),
 
101
  columns(rhs.columns, mem_root),
 
102
  name(rhs.name),
 
103
  generated(rhs.generated)
 
104
{
 
105
  list_copy_and_replace_each_value(columns, mem_root);
 
106
}
 
107
 
 
108
/**
 
109
  Construct an (almost) deep copy of this foreign key. Only those
 
110
  elements that are known to never change are not copied.
 
111
  If out of memory, a partial copy is returned and an error is set
 
112
  in Session.
 
113
*/
 
114
 
 
115
Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root)
 
116
  :Key(rhs),
 
117
  ref_table(rhs.ref_table),
 
118
  ref_columns(rhs.ref_columns),
 
119
  delete_opt(rhs.delete_opt),
 
120
  update_opt(rhs.update_opt),
 
121
  match_opt(rhs.match_opt)
 
122
{
 
123
  list_copy_and_replace_each_value(ref_columns, mem_root);
 
124
}
 
125
 
 
126
/*
 
127
  Test if a foreign key (= generated key) is a prefix of the given key
 
128
  (ignoring key name, key type and order of columns)
 
129
 
 
130
  NOTES:
 
131
    This is only used to test if an index for a FOREIGN KEY exists
 
132
 
 
133
  IMPLEMENTATION
 
134
    We only compare field names
 
135
 
 
136
  RETURN
 
137
    0   Generated key is a prefix of other key
 
138
    1   Not equal
 
139
*/
 
140
 
 
141
bool foreign_key_prefix(Key *a, Key *b)
 
142
{
 
143
  /* Ensure that 'a' is the generated key */
 
144
  if (a->generated)
 
145
  {
 
146
    if (b->generated && a->columns.elements > b->columns.elements)
 
147
      std::swap(a, b);                       // Put shorter key in 'a'
 
148
  }
 
149
  else
 
150
  {
 
151
    if (!b->generated)
 
152
      return true;                              // No foreign key
 
153
    std::swap(a, b);                       // Put generated key in 'a'
 
154
  }
 
155
 
 
156
  /* Test if 'a' is a prefix of 'b' */
 
157
  if (a->columns.elements > b->columns.elements)
 
158
    return true;                                // Can't be prefix
 
159
 
 
160
  List_iterator<Key_part_spec> col_it1(a->columns);
 
161
  List_iterator<Key_part_spec> col_it2(b->columns);
 
162
  const Key_part_spec *col1, *col2;
 
163
 
 
164
#ifdef ENABLE_WHEN_INNODB_CAN_HANDLE_SWAPED_FOREIGN_KEY_COLUMNS
 
165
  while ((col1= col_it1++))
 
166
  {
 
167
    bool found= 0;
 
168
    col_it2.rewind();
 
169
    while ((col2= col_it2++))
 
170
    {
 
171
      if (*col1 == *col2)
 
172
      {
 
173
        found= true;
 
174
        break;
 
175
      }
 
176
    }
 
177
    if (!found)
 
178
      return true;                              // Error
 
179
  }
 
180
  return false;                                 // Is prefix
 
181
#else
 
182
  while ((col1= col_it1++))
 
183
  {
 
184
    col2= col_it2++;
 
185
    if (!(*col1 == *col2))
 
186
      return true;
 
187
  }
 
188
  return false;                                 // Is prefix
 
189
#endif
 
190
}
 
191
 
 
192
 
 
193
/*
 
194
  Check if the foreign key options are compatible with columns
 
195
  on which the FK is created.
 
196
 
 
197
  RETURN
 
198
    0   Key valid
 
199
    1   Key invalid
 
200
*/
 
201
bool Foreign_key::validate(List<Create_field> &table_fields)
 
202
{
 
203
  Create_field  *sql_field;
 
204
  Key_part_spec *column;
 
205
  List_iterator<Key_part_spec> cols(columns);
 
206
  List_iterator<Create_field> it(table_fields);
 
207
  while ((column= cols++))
 
208
  {
 
209
    it.rewind();
 
210
    while ((sql_field= it++) &&
 
211
           my_strcasecmp(system_charset_info,
 
212
                         column->field_name.str,
 
213
                         sql_field->field_name)) {}
 
214
    if (!sql_field)
 
215
    {
 
216
      my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name.str);
 
217
      return true;
 
218
    }
 
219
    if (type == Key::FOREIGN_KEY && sql_field->vcol_info)
 
220
    {
 
221
      if (delete_opt == FK_OPTION_SET_NULL)
 
222
      {
 
223
        my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0), 
 
224
                 "ON DELETE SET NULL");
 
225
        return true;
 
226
      }
 
227
      if (update_opt == FK_OPTION_SET_NULL)
 
228
      {
 
229
        my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0), 
 
230
                 "ON UPDATE SET NULL");
 
231
        return true;
 
232
      }
 
233
      if (update_opt == FK_OPTION_CASCADE)
 
234
      {
 
235
        my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0), 
 
236
                 "ON UPDATE CASCADE");
 
237
        return true;
 
238
      }
 
239
    }
 
240
  }
 
241
  return false;
 
242
}
 
243
 
 
244
 
 
245
/****************************************************************************
 
246
** Thread specific functions
 
247
****************************************************************************/
 
248
 
 
249
Open_tables_state::Open_tables_state(ulong version_arg)
 
250
  :version(version_arg), state_flags(0U)
 
251
{
 
252
  reset_open_tables_state();
105
253
}
106
254
 
107
255
/*
108
256
  The following functions form part of the C plugin API
109
257
*/
110
 
int tmpfile(const char *prefix)
 
258
 
 
259
extern "C" int mysql_tmpfile(const char *prefix)
111
260
{
112
261
  char filename[FN_REFLEN];
113
 
  int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
 
262
  File fd = create_temp_file(filename, drizzle_tmpdir, prefix,
 
263
                             O_CREAT | O_EXCL | O_RDWR,
 
264
                             MYF(MY_WME));
114
265
  if (fd >= 0) {
115
266
    unlink(filename);
116
267
  }
118
269
  return fd;
119
270
}
120
271
 
121
 
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
122
 
{
123
 
  return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
124
 
}
125
 
 
126
 
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
127
 
                                             size_t index)
128
 
{
129
 
  return &ha_data[monitored->getId()].resource_context[index];
130
 
}
131
 
 
 
272
 
 
273
extern "C"
 
274
int session_in_lock_tables(const Session *session)
 
275
{
 
276
  return test(session->in_lock_tables);
 
277
}
 
278
 
 
279
 
 
280
extern "C"
 
281
int session_tablespace_op(const Session *session)
 
282
{
 
283
  return test(session->tablespace_op);
 
284
}
 
285
 
 
286
 
 
287
/**
 
288
   Set the process info field of the Session structure.
 
289
 
 
290
   This function is used by plug-ins. Internally, the
 
291
   Session::set_proc_info() function should be used.
 
292
 
 
293
   @see Session::set_proc_info
 
294
 */
 
295
extern "C" void
 
296
set_session_proc_info(Session *session, const char *info)
 
297
{
 
298
  session->set_proc_info(info);
 
299
}
 
300
 
 
301
extern "C"
 
302
const char *get_session_proc_info(Session *session)
 
303
{
 
304
  return session->get_proc_info();
 
305
}
 
306
 
 
307
extern "C"
 
308
void **session_ha_data(const Session *session, const struct handlerton *hton)
 
309
{
 
310
  return (void **) &session->ha_data[hton->slot].ha_ptr;
 
311
}
 
312
 
 
313
extern "C"
132
314
int64_t session_test_options(const Session *session, int64_t test_options)
133
315
{
134
316
  return session->options & test_options;
135
317
}
136
318
 
137
 
Session::Session(plugin::Client *client_arg, catalog::Instance::shared_ptr catalog_arg) :
138
 
  Open_tables_state(refresh_version),
139
 
  mem_root(&main_mem_root),
140
 
  xa_id(0),
141
 
  lex(&main_lex),
142
 
  query(new std::string),
143
 
  _schema(new std::string("")),
144
 
  client(client_arg),
145
 
  scheduler(NULL),
146
 
  scheduler_arg(NULL),
147
 
  lock_id(&main_lock_id),
148
 
  thread_stack(NULL),
149
 
  security_ctx(identifier::User::make_shared()),
150
 
  _where(Session::DEFAULT_WHERE),
151
 
  dbug_sentry(Session_SENTRY_MAGIC),
152
 
  mysys_var(0),
153
 
  command(COM_CONNECT),
154
 
  file_id(0),
155
 
  _epoch(boost::gregorian::date(1970,1,1)),
156
 
  _connect_time(boost::posix_time::microsec_clock::universal_time()),
157
 
  utime_after_lock(0),
158
 
  ha_data(plugin::num_trx_monitored_objects),
159
 
  query_id(0),
160
 
  warn_query_id(0),
161
 
  concurrent_execute_allowed(true),
162
 
  arg_of_last_insert_id_function(false),
163
 
  first_successful_insert_id_in_prev_stmt(0),
164
 
  first_successful_insert_id_in_cur_stmt(0),
165
 
  limit_found_rows(0),
166
 
  options(session_startup_options),
167
 
  row_count_func(-1),
168
 
  sent_row_count(0),
169
 
  examined_row_count(0),
170
 
  used_tables(0),
171
 
  total_warn_count(0),
172
 
  col_access(0),
173
 
  statement_id_counter(0),
174
 
  row_count(0),
175
 
  thread_id(0),
176
 
  tmp_table(0),
177
 
  _global_read_lock(NONE),
178
 
  count_cuted_fields(CHECK_FIELD_ERROR_FOR_NULL),
179
 
  _killed(NOT_KILLED),
180
 
  some_tables_deleted(false),
181
 
  no_errors(false),
182
 
  password(false),
183
 
  is_fatal_error(false),
184
 
  transaction_rollback_request(false),
185
 
  is_fatal_sub_stmt_error(0),
186
 
  tablespace_op(false),
187
 
  derived_tables_processing(false),
188
 
  m_lip(NULL),
189
 
  cached_table(0),
190
 
  transaction_message(NULL),
191
 
  statement_message(NULL),
192
 
  session_event_observers(NULL),
193
 
  _catalog(catalog_arg),
194
 
  use_usage(false)
195
 
{
196
 
  client->setSession(this);
 
319
extern "C"
 
320
int session_sql_command(const Session *session)
 
321
{
 
322
  return (int) session->lex->sql_command;
 
323
}
 
324
 
 
325
extern "C"
 
326
int session_tx_isolation(const Session *session)
 
327
{
 
328
  return (int) session->variables.tx_isolation;
 
329
}
 
330
 
 
331
extern "C"
 
332
void session_inc_row_count(Session *session)
 
333
{
 
334
  session->row_count++;
 
335
}
 
336
 
 
337
/**
 
338
  Clear this diagnostics area. 
 
339
 
 
340
  Normally called at the end of a statement.
 
341
*/
 
342
 
 
343
void
 
344
Diagnostics_area::reset_diagnostics_area()
 
345
{
 
346
  can_overwrite_status= false;
 
347
  /** Don't take chances in production */
 
348
  m_message[0]= '\0';
 
349
  m_sql_errno= 0;
 
350
  m_server_status= 0;
 
351
  m_affected_rows= 0;
 
352
  m_last_insert_id= 0;
 
353
  m_total_warn_count= 0;
 
354
  is_sent= false;
 
355
  /** Tiny reset in debug mode to see garbage right away */
 
356
  m_status= DA_EMPTY;
 
357
}
 
358
 
 
359
 
 
360
/**
 
361
  Set OK status -- ends commands that do not return a
 
362
  result set, e.g. INSERT/UPDATE/DELETE.
 
363
*/
 
364
 
 
365
void
 
366
Diagnostics_area::set_ok_status(Session *session, ha_rows affected_rows_arg,
 
367
                                uint64_t last_insert_id_arg,
 
368
                                const char *message_arg)
 
369
{
 
370
  assert(! is_set());
 
371
  /*
 
372
    In production, refuse to overwrite an error or a custom response
 
373
    with an OK packet.
 
374
  */
 
375
  if (is_error() || is_disabled())
 
376
    return;
 
377
  /** Only allowed to report success if has not yet reported an error */
 
378
 
 
379
  m_server_status= session->server_status;
 
380
  m_total_warn_count= session->total_warn_count;
 
381
  m_affected_rows= affected_rows_arg;
 
382
  m_last_insert_id= last_insert_id_arg;
 
383
  if (message_arg)
 
384
    strmake(m_message, message_arg, sizeof(m_message) - 1);
 
385
  else
 
386
    m_message[0]= '\0';
 
387
  m_status= DA_OK;
 
388
}
 
389
 
 
390
 
 
391
/**
 
392
  Set EOF status.
 
393
*/
 
394
 
 
395
void
 
396
Diagnostics_area::set_eof_status(Session *session)
 
397
{
 
398
  /** Only allowed to report eof if has not yet reported an error */
 
399
 
 
400
  assert(! is_set());
 
401
  /*
 
402
    In production, refuse to overwrite an error or a custom response
 
403
    with an EOF packet.
 
404
  */
 
405
  if (is_error() || is_disabled())
 
406
    return;
 
407
 
 
408
  m_server_status= session->server_status;
 
409
  /*
 
410
    If inside a stored procedure, do not return the total
 
411
    number of warnings, since they are not available to the client
 
412
    anyway.
 
413
  */
 
414
  m_total_warn_count= session->total_warn_count;
 
415
 
 
416
  m_status= DA_EOF;
 
417
}
 
418
 
 
419
/**
 
420
  Set ERROR status.
 
421
*/
 
422
 
 
423
void
 
424
Diagnostics_area::set_error_status(Session *session __attribute__((unused)),
 
425
                                   uint32_t sql_errno_arg,
 
426
                                   const char *message_arg)
 
427
{
 
428
  /*
 
429
    Only allowed to report error if has not yet reported a success
 
430
    The only exception is when we flush the message to the client,
 
431
    an error can happen during the flush.
 
432
  */
 
433
  assert(! is_set() || can_overwrite_status);
 
434
  /*
 
435
    In production, refuse to overwrite a custom response with an
 
436
    ERROR packet.
 
437
  */
 
438
  if (is_disabled())
 
439
    return;
 
440
 
 
441
  m_sql_errno= sql_errno_arg;
 
442
  strmake(m_message, message_arg, sizeof(m_message) - 1);
 
443
 
 
444
  m_status= DA_ERROR;
 
445
}
 
446
 
 
447
 
 
448
/**
 
449
  Mark the diagnostics area as 'DISABLED'.
 
450
 
 
451
  This is used in rare cases when the COM_ command at hand sends a response
 
452
  in a custom format. One example is the query cache, another is
 
453
  COM_STMT_PREPARE.
 
454
*/
 
455
 
 
456
void
 
457
Diagnostics_area::disable_status()
 
458
{
 
459
  assert(! is_set());
 
460
  m_status= DA_DISABLED;
 
461
}
 
462
 
 
463
 
 
464
Session::Session()
 
465
   :Statement(&main_lex, &main_mem_root,
 
466
              /* statement id */ 0),
 
467
   Open_tables_state(refresh_version), rli_fake(0),
 
468
   lock_id(&main_lock_id),
 
469
   user_time(0),
 
470
   binlog_table_maps(0), binlog_flags(0UL),
 
471
   arg_of_last_insert_id_function(false),
 
472
   first_successful_insert_id_in_prev_stmt(0),
 
473
   first_successful_insert_id_in_prev_stmt_for_binlog(0),
 
474
   first_successful_insert_id_in_cur_stmt(0),
 
475
   stmt_depends_on_first_successful_insert_id_in_prev_stmt(false),
 
476
   global_read_lock(0),
 
477
   is_fatal_error(0),
 
478
   transaction_rollback_request(0),
 
479
   is_fatal_sub_stmt_error(0),
 
480
   rand_used(0),
 
481
   time_zone_used(0),
 
482
   in_lock_tables(0),
 
483
   derived_tables_processing(false),
 
484
   m_lip(NULL)
 
485
{
 
486
  ulong tmp;
197
487
 
198
488
  /*
199
489
    Pass nominal parameters to init_alloc_root only to ensure that
200
490
    the destructor works OK in case of an error. The main_mem_root
201
491
    will be re-initialized in init_for_queries().
202
492
  */
203
 
  memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
 
493
  init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
 
494
  thread_stack= 0;
 
495
  catalog= (char*)"std"; // the only catalog we have for now
 
496
  main_security_ctx.init();
 
497
  security_ctx= &main_security_ctx;
 
498
  some_tables_deleted=no_errors=password= 0;
 
499
  query_start_used= 0;
 
500
  count_cuted_fields= CHECK_FIELD_IGNORE;
 
501
  killed= NOT_KILLED;
 
502
  col_access=0;
 
503
  is_slave_error= thread_specific_used= false;
 
504
  hash_clear(&handler_tables_hash);
 
505
  tmp_table=0;
 
506
  used_tables=0;
204
507
  cuted_fields= sent_row_count= row_count= 0L;
 
508
  limit_found_rows= 0;
 
509
  row_count_func= -1;
 
510
  statement_id_counter= 0UL;
205
511
  // Must be reset to handle error with Session's created for init of mysqld
206
512
  lex->current_select= 0;
 
513
  start_time=(time_t) 0;
 
514
  start_utime= 0L;
 
515
  utime_after_lock= 0L;
 
516
  current_linfo =  0;
 
517
  slave_thread = 0;
207
518
  memset(&variables, 0, sizeof(variables));
208
 
  scoreboard_index= -1;
209
 
  cleanup_done= abort_on_warning= no_warnings_for_error= false;  
210
 
 
211
 
  /* query_cache init */
212
 
  query_cache_key= "";
213
 
  resultset= NULL;
 
519
  thread_id= 0;
 
520
  one_shot_set= 0;
 
521
  file_id = 0;
 
522
  query_id= 0;
 
523
  warn_id= 0;
 
524
  db_charset= global_system_variables.collation_database;
 
525
  memset(ha_data, 0, sizeof(ha_data));
 
526
  mysys_var=0;
 
527
  binlog_evt_union.do_union= false;
 
528
  dbug_sentry=Session_SENTRY_MAGIC;
 
529
  net.vio=0;
 
530
  client_capabilities= 0;                       // minimalistic client
 
531
  system_thread= NON_SYSTEM_THREAD;
 
532
  cleanup_done= abort_on_warning= no_warnings_for_error= 0;
 
533
  peer_port= 0;                                 // For SHOW PROCESSLIST
 
534
  transaction.m_pending_rows_event= 0;
 
535
  transaction.on= 1;
 
536
  pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
214
537
 
215
538
  /* Variables with default values */
216
539
  proc_info="login";
217
 
 
218
 
  plugin_sessionvar_init(this);
219
 
  /*
220
 
    variables= global_system_variables above has reset
221
 
    variables.pseudo_thread_id to 0. We need to correct it here to
222
 
    avoid temporary tables replication failure.
223
 
  */
224
 
  variables.pseudo_thread_id= thread_id;
225
 
  server_status= SERVER_STATUS_AUTOCOMMIT;
226
 
 
227
 
  if (variables.max_join_size == HA_POS_ERROR)
228
 
    options |= OPTION_BIG_SELECTS;
229
 
  else
230
 
    options &= ~OPTION_BIG_SELECTS;
231
 
 
232
 
  open_options=ha_open_options;
233
 
  update_lock_default= TL_WRITE;
234
 
  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
235
 
  warn_list.empty();
236
 
  memset(warn_count, 0, sizeof(warn_count));
237
 
  memset(&status_var, 0, sizeof(status_var));
238
 
 
 
540
  where= Session::DEFAULT_WHERE;
 
541
  server_id = ::server_id;
 
542
  slave_net = 0;
 
543
  command=COM_CONNECT;
 
544
  *scramble= '\0';
 
545
 
 
546
  init();
239
547
  /* Initialize sub structures */
240
 
  memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
241
 
 
 
548
  init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
 
549
  user_connect=(USER_CONN *)0;
 
550
  hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
 
551
            (hash_get_key) get_var_key,
 
552
            (hash_free_key) free_user_var, 0);
 
553
 
 
554
  /* For user vars replication*/
 
555
  if (opt_bin_log)
 
556
    my_init_dynamic_array(&user_var_events,
 
557
                          sizeof(BINLOG_USER_VAR_EVENT *), 16, 16);
 
558
  else
 
559
    memset(&user_var_events, 0, sizeof(user_var_events));
 
560
 
 
561
  /* Protocol */
 
562
  protocol= &protocol_text;                     // Default protocol
 
563
  protocol_text.init(this);
 
564
 
 
565
  const Query_id& query_id= Query_id::get_query_id();
 
566
  tablespace_op= false;
 
567
  tmp= sql_rnd();
 
568
  randominit(&rand, tmp + (ulong) &rand, tmp + query_id.value());
242
569
  substitute_null_with_insert_id = false;
243
 
  lock_info.init(); /* safety: will be reset after start */
 
570
  thr_lock_info_init(&lock_info); /* safety: will be reset after start */
244
571
  thr_lock_owner_init(&main_lock_id, &lock_info);
245
572
 
246
573
  m_internal_handler= NULL;
247
 
  
248
 
  plugin::EventObserver::registerSessionEvents(*this); 
249
574
}
250
575
 
251
 
void Session::free_items()
252
 
{
253
 
  Item *next;
254
 
  /* This works because items are allocated with memory::sql_alloc() */
255
 
  for (; free_list; free_list= next)
256
 
  {
257
 
    next= free_list->next;
258
 
    free_list->delete_self();
259
 
  }
260
 
}
261
576
 
262
577
void Session::push_internal_handler(Internal_error_handler *handler)
263
578
{
269
584
  m_internal_handler= handler;
270
585
}
271
586
 
272
 
bool Session::handle_error(drizzled::error_t sql_errno, const char *message,
273
 
                           DRIZZLE_ERROR::enum_warning_level level)
 
587
 
 
588
bool Session::handle_error(uint32_t sql_errno, const char *message,
 
589
                       DRIZZLE_ERROR::enum_warning_level level)
274
590
{
275
591
  if (m_internal_handler)
276
592
  {
280
596
  return false;                                 // 'false', as per coding style
281
597
}
282
598
 
283
 
void Session::setAbort(bool arg)
284
 
{
285
 
  mysys_var->abort= arg;
286
 
}
287
 
 
288
 
void Session::lockOnSys()
289
 
{
290
 
  if (not mysys_var)
291
 
    return;
292
 
 
293
 
  setAbort(true);
294
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
295
 
  if (mysys_var->current_cond)
296
 
  {
297
 
    mysys_var->current_mutex->lock();
298
 
    mysys_var->current_cond->notify_all();
299
 
    mysys_var->current_mutex->unlock();
300
 
  }
301
 
}
302
599
 
303
600
void Session::pop_internal_handler()
304
601
{
306
603
  m_internal_handler= NULL;
307
604
}
308
605
 
309
 
void Session::get_xid(DRIZZLE_XID *xid)
310
 
{
311
 
  *xid = *(DRIZZLE_XID *) &transaction.xid_state.xid;
312
 
}
 
606
#if defined(__cplusplus)
 
607
extern "C" {
 
608
#endif
 
609
 
 
610
void *session_alloc(Session *session, unsigned int size)
 
611
{
 
612
  return session->alloc(size);
 
613
}
 
614
 
 
615
void *session_calloc(Session *session, unsigned int size)
 
616
{
 
617
  return session->calloc(size);
 
618
}
 
619
 
 
620
char *session_strdup(Session *session, const char *str)
 
621
{
 
622
  return session->strdup(str);
 
623
}
 
624
 
 
625
char *session_strmake(Session *session, const char *str, unsigned int size)
 
626
{
 
627
  return session->strmake(str, size);
 
628
}
 
629
 
 
630
void *session_memdup(Session *session, const void* str, unsigned int size)
 
631
{
 
632
  return session->memdup(str, size);
 
633
}
 
634
 
 
635
void session_get_xid(const Session *session, DRIZZLE_XID *xid)
 
636
{
 
637
  *xid = *(DRIZZLE_XID *) &session->transaction.xid_state.xid;
 
638
}
 
639
 
 
640
#if defined(__cplusplus)
 
641
}
 
642
#endif
 
643
 
 
644
/*
 
645
  Init common variables that has to be reset on start and on change_user
 
646
*/
 
647
 
 
648
void Session::init(void)
 
649
{
 
650
  pthread_mutex_lock(&LOCK_global_system_variables);
 
651
  plugin_sessionvar_init(this);
 
652
  variables.time_format= date_time_format_copy((Session*) 0,
 
653
                                               variables.time_format);
 
654
  variables.date_format= date_time_format_copy((Session*) 0,
 
655
                                               variables.date_format);
 
656
  variables.datetime_format= date_time_format_copy((Session*) 0,
 
657
                                                   variables.datetime_format);
 
658
  /*
 
659
    variables= global_system_variables above has reset
 
660
    variables.pseudo_thread_id to 0. We need to correct it here to
 
661
    avoid temporary tables replication failure.
 
662
  */
 
663
  variables.pseudo_thread_id= thread_id;
 
664
  pthread_mutex_unlock(&LOCK_global_system_variables);
 
665
  server_status= SERVER_STATUS_AUTOCOMMIT;
 
666
  options= session_startup_options;
 
667
 
 
668
  if (variables.max_join_size == HA_POS_ERROR)
 
669
    options |= OPTION_BIG_SELECTS;
 
670
  else
 
671
    options &= ~OPTION_BIG_SELECTS;
 
672
 
 
673
  transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= false;
 
674
  open_options=ha_open_options;
 
675
  update_lock_default= (variables.low_priority_updates ?
 
676
                        TL_WRITE_LOW_PRIORITY :
 
677
                        TL_WRITE);
 
678
  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
 
679
  warn_list.empty();
 
680
  memset(warn_count, 0, sizeof(warn_count));
 
681
  total_warn_count= 0;
 
682
  update_charset();
 
683
  reset_current_stmt_binlog_row_based();
 
684
  memset(&status_var, 0, sizeof(status_var));
 
685
}
 
686
 
 
687
 
 
688
/*
 
689
  Init Session for query processing.
 
690
  This has to be called once before we call mysql_parse.
 
691
  See also comments in sql_class.h.
 
692
*/
 
693
 
 
694
void Session::init_for_queries()
 
695
{
 
696
  set_time(); 
 
697
  ha_enable_transaction(this,true);
 
698
 
 
699
  reset_root_defaults(mem_root, variables.query_alloc_block_size,
 
700
                      variables.query_prealloc_size);
 
701
  reset_root_defaults(&transaction.mem_root,
 
702
                      variables.trans_alloc_block_size,
 
703
                      variables.trans_prealloc_size);
 
704
  transaction.xid_state.xid.null();
 
705
  transaction.xid_state.in_session=1;
 
706
}
 
707
 
313
708
 
314
709
/* Do operations that may take a long time */
315
710
 
316
711
void Session::cleanup(void)
317
712
{
318
 
  assert(cleanup_done == false);
 
713
  assert(cleanup_done == 0);
319
714
 
320
 
  setKilled(KILL_CONNECTION);
 
715
  killed= KILL_CONNECTION;
321
716
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
322
717
  if (transaction.xid_state.xa_state == XA_PREPARED)
323
718
  {
325
720
  }
326
721
#endif
327
722
  {
328
 
    TransactionServices &transaction_services= TransactionServices::singleton();
329
 
    transaction_services.rollbackTransaction(*this, true);
 
723
    ha_rollback(this);
330
724
    xid_cache_delete(&transaction.xid_state);
331
725
  }
332
 
 
333
 
  for (UserVars::iterator iter= user_vars.begin();
334
 
       iter != user_vars.end();
335
 
       iter++)
 
726
  if (locked_tables)
336
727
  {
337
 
    user_var_entry *entry= (*iter).second;
338
 
    boost::checked_delete(entry);
 
728
    lock=locked_tables; locked_tables=0;
 
729
    close_thread_tables(this);
339
730
  }
340
 
  user_vars.clear();
341
 
 
342
 
 
343
 
  close_temporary_tables();
344
 
 
 
731
  mysql_ha_cleanup(this);
 
732
  delete_dynamic(&user_var_events);
 
733
  hash_free(&user_vars);
 
734
  close_temporary_tables(this);
 
735
  free((char*) variables.time_format);
 
736
  free((char*) variables.date_format);
 
737
  free((char*) variables.datetime_format);
 
738
  
345
739
  if (global_read_lock)
346
 
  {
347
 
    unlockGlobalReadLock();
348
 
  }
 
740
    unlock_global_read_lock(this);
349
741
 
350
 
  cleanup_done= true;
 
742
  cleanup_done=1;
 
743
  return;
351
744
}
352
745
 
353
746
Session::~Session()
354
747
{
355
 
  this->checkSentry();
356
 
 
357
 
  if (client and client->isConnected())
358
 
  {
359
 
    assert(security_ctx);
360
 
    if (global_system_variables.log_warnings)
361
 
    {
362
 
      errmsg_printf(error::WARN, ER(ER_FORCING_CLOSE),
363
 
                    internal::my_progname,
364
 
                    thread_id,
365
 
                    security_ctx->username().c_str());
366
 
    }
367
 
 
368
 
    disconnect();
369
 
  }
 
748
  Session_CHECK_SENTRY(this);
 
749
  /* Ensure that no one is using Session */
 
750
  pthread_mutex_lock(&LOCK_delete);
 
751
  pthread_mutex_unlock(&LOCK_delete);
 
752
  add_to_status(&global_status_var, &status_var);
370
753
 
371
754
  /* Close connection */
372
 
  if (client)
 
755
  if (net.vio)
373
756
  {
374
 
    client->close();
375
 
    boost::checked_delete(client);
376
 
    client= NULL;
 
757
    net_close(&net);
 
758
    net_end(&net);
377
759
  }
378
 
 
379
 
  if (cleanup_done == false)
 
760
  if (!cleanup_done)
380
761
    cleanup();
381
762
 
382
 
  plugin::StorageEngine::closeConnection(this);
 
763
  ha_close_connection(this);
383
764
  plugin_sessionvar_cleanup(this);
384
765
 
385
 
  warn_root.free_root(MYF(0));
 
766
  main_security_ctx.destroy();
 
767
  if (db)
 
768
  {
 
769
    free(db);
 
770
    db= NULL;
 
771
  }
 
772
  free_root(&warn_root,MYF(0));
 
773
  free_root(&transaction.mem_root,MYF(0));
386
774
  mysys_var=0;                                  // Safety (shouldn't be needed)
 
775
  pthread_mutex_destroy(&LOCK_delete);
387
776
  dbug_sentry= Session_SENTRY_GONE;
388
 
 
389
 
  main_mem_root.free_root(MYF(0));
390
 
  currentMemRoot().release();
391
 
  currentSession().release();
392
 
 
393
 
  plugin::Logging::postEndDo(this);
394
 
  plugin::EventObserver::deregisterSessionEvents(*this); 
395
 
}
396
 
 
397
 
void Session::setClient(plugin::Client *client_arg)
398
 
{
399
 
  client= client_arg;
400
 
  client->setSession(this);
401
 
}
402
 
 
403
 
void Session::awake(Session::killed_state_t state_to_set)
404
 
{
405
 
  if ((state_to_set == Session::KILL_QUERY) and (command == COM_SLEEP))
406
 
    return;
407
 
 
408
 
  this->checkSentry();
409
 
 
410
 
  setKilled(state_to_set);
411
 
  scheduler->killSession(this);
412
 
 
 
777
  if (rli_fake)
 
778
  {
 
779
    delete rli_fake;
 
780
    rli_fake= NULL;
 
781
  }
 
782
  
 
783
  free_root(&main_mem_root, MYF(0));
 
784
  return;
 
785
}
 
786
 
 
787
 
 
788
/*
 
789
  Add all status variables to another status variable array
 
790
 
 
791
  SYNOPSIS
 
792
   add_to_status()
 
793
   to_var       add to this array
 
794
   from_var     from this array
 
795
 
 
796
  NOTES
 
797
    This function assumes that all variables are long/ulong.
 
798
    If this assumption will change, then we have to explictely add
 
799
    the other variables after the while loop
 
800
*/
 
801
 
 
802
void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var)
 
803
{
 
804
  ulong *end= (ulong*) ((unsigned char*) to_var +
 
805
                        offsetof(STATUS_VAR, last_system_status_var) +
 
806
                        sizeof(ulong));
 
807
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var;
 
808
 
 
809
  while (to != end)
 
810
    *(to++)+= *(from++);
 
811
}
 
812
 
 
813
/*
 
814
  Add the difference between two status variable arrays to another one.
 
815
 
 
816
  SYNOPSIS
 
817
    add_diff_to_status
 
818
    to_var       add to this array
 
819
    from_var     from this array
 
820
    dec_var      minus this array
 
821
  
 
822
  NOTE
 
823
    This function assumes that all variables are long/ulong.
 
824
*/
 
825
 
 
826
void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var,
 
827
                        STATUS_VAR *dec_var)
 
828
{
 
829
  ulong *end= (ulong*) ((unsigned char*) to_var + offsetof(STATUS_VAR,
 
830
                                                  last_system_status_var) +
 
831
                        sizeof(ulong));
 
832
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var, *dec= (ulong*) dec_var;
 
833
 
 
834
  while (to != end)
 
835
    *(to++)+= *(from++) - *(dec++);
 
836
}
 
837
 
 
838
 
 
839
void Session::awake(Session::killed_state state_to_set)
 
840
{
 
841
  Session_CHECK_SENTRY(this);
 
842
  safe_mutex_assert_owner(&LOCK_delete); 
 
843
 
 
844
  killed= state_to_set;
413
845
  if (state_to_set != Session::KILL_QUERY)
414
846
  {
415
 
    DRIZZLE_CONNECTION_DONE(thread_id);
 
847
    thr_alarm_kill(thread_id);
 
848
    if (!slave_thread)
 
849
      thread_scheduler.post_kill_notification(this);
416
850
  }
417
 
 
418
851
  if (mysys_var)
419
852
  {
420
 
    boost_unique_lock_t scopedLock(mysys_var->mutex);
 
853
    pthread_mutex_lock(&mysys_var->mutex);
 
854
    if (!system_thread)         // Don't abort locks
 
855
      mysys_var->abort=1;
421
856
    /*
422
 
      "
423
857
      This broadcast could be up in the air if the victim thread
424
858
      exits the cond in the time between read and broadcast, but that is
425
859
      ok since all we want to do is to make the victim thread get out
440
874
    */
441
875
    if (mysys_var->current_cond && mysys_var->current_mutex)
442
876
    {
443
 
      mysys_var->current_mutex->lock();
444
 
      mysys_var->current_cond->notify_all();
445
 
      mysys_var->current_mutex->unlock();
 
877
      pthread_mutex_lock(mysys_var->current_mutex);
 
878
      pthread_cond_broadcast(mysys_var->current_cond);
 
879
      pthread_mutex_unlock(mysys_var->current_mutex);
446
880
    }
 
881
    pthread_mutex_unlock(&mysys_var->mutex);
447
882
  }
 
883
  return;
448
884
}
449
885
 
450
886
/*
451
887
  Remember the location of thread info, the structure needed for
452
 
  memory::sql_alloc() and the structure for the net buffer
 
888
  sql_alloc() and the structure for the net buffer
453
889
*/
454
 
bool Session::storeGlobals()
 
890
 
 
891
bool Session::store_globals()
455
892
{
456
893
  /*
457
894
    Assert that thread_stack is initialized: it's necessary to be able
459
896
  */
460
897
  assert(thread_stack);
461
898
 
462
 
  currentSession().release();
463
 
  currentSession().reset(this);
464
 
 
465
 
  currentMemRoot().release();
466
 
  currentMemRoot().reset(&mem_root);
467
 
 
 
899
  if (pthread_setspecific(THR_Session,  this) ||
 
900
      pthread_setspecific(THR_MALLOC, &mem_root))
 
901
    return 1;
468
902
  mysys_var=my_thread_var;
469
 
 
470
903
  /*
471
904
    Let mysqld define the thread id (not mysys)
472
905
    This allows us to move Session to different threads if needed.
473
906
  */
474
907
  mysys_var->id= thread_id;
 
908
  real_id= pthread_self();                      // For debugging
475
909
 
476
910
  /*
477
911
    We have to call thr_lock_info_init() again here as Session may have been
478
912
    created in another thread
479
913
  */
480
 
  lock_info.init();
481
 
 
482
 
  return false;
483
 
}
484
 
 
485
 
/*
486
 
  Init Session for query processing.
487
 
  This has to be called once before we call mysql_parse.
488
 
  See also comments in session.h.
489
 
*/
490
 
 
491
 
void Session::prepareForQueries()
492
 
{
493
 
  if (variables.max_join_size == HA_POS_ERROR)
494
 
    options |= OPTION_BIG_SELECTS;
495
 
 
496
 
  version= refresh_version;
497
 
  set_proc_info(NULL);
498
 
  command= COM_SLEEP;
499
 
  set_time();
500
 
 
501
 
  mem_root->reset_root_defaults(variables.query_alloc_block_size,
502
 
                                variables.query_prealloc_size);
503
 
  transaction.xid_state.xid.null();
504
 
  transaction.xid_state.in_session=1;
505
 
  if (use_usage)
506
 
    resetUsage();
507
 
}
508
 
 
509
 
bool Session::initGlobals()
510
 
{
511
 
  if (storeGlobals())
512
 
  {
513
 
    disconnect(ER_OUT_OF_RESOURCES);
514
 
    status_var.aborted_connects++;
515
 
    return true;
516
 
  }
517
 
  return false;
518
 
}
519
 
 
520
 
void Session::run()
521
 
{
522
 
  if (initGlobals() || authenticate())
523
 
  {
524
 
    disconnect();
525
 
    return;
526
 
  }
527
 
 
528
 
  prepareForQueries();
529
 
 
530
 
  while (not client->haveError() && getKilled() != KILL_CONNECTION)
531
 
  {
532
 
    if (not executeStatement())
533
 
      break;
534
 
  }
535
 
 
536
 
  disconnect();
537
 
}
538
 
 
539
 
bool Session::schedule(Session::shared_ptr &arg)
540
 
{
541
 
  arg->scheduler= plugin::Scheduler::getScheduler();
542
 
  assert(arg->scheduler);
543
 
 
544
 
  ++connection_count;
545
 
 
546
 
  long current_connections= connection_count;
547
 
 
548
 
  if (current_connections > 0 and static_cast<uint64_t>(current_connections) > current_global_counters.max_used_connections)
549
 
  {
550
 
    current_global_counters.max_used_connections= static_cast<uint64_t>(connection_count);
551
 
  }
552
 
 
553
 
  current_global_counters.connections++;
554
 
  arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
555
 
 
556
 
  session::Cache::singleton().insert(arg);
557
 
 
558
 
  if (unlikely(plugin::EventObserver::connectSession(*arg)))
559
 
  {
560
 
    // We should do something about an error...
561
 
  }
562
 
 
563
 
  if (plugin::Scheduler::getScheduler()->addSession(arg))
564
 
  {
565
 
    DRIZZLE_CONNECTION_START(arg->getSessionId());
566
 
    char error_message_buff[DRIZZLE_ERRMSG_SIZE];
567
 
 
568
 
    arg->setKilled(Session::KILL_CONNECTION);
569
 
 
570
 
    arg->status_var.aborted_connects++;
571
 
 
572
 
    /* Can't use my_error() since store_globals has not been called. */
573
 
    /* TODO replace will better error message */
574
 
    snprintf(error_message_buff, sizeof(error_message_buff),
575
 
             ER(ER_CANT_CREATE_THREAD), 1);
576
 
    arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
577
 
 
578
 
    return true;
579
 
  }
580
 
 
581
 
  return false;
582
 
}
583
 
 
584
 
 
585
 
/*
586
 
  Is this session viewable by the current user?
587
 
*/
588
 
bool Session::isViewable(identifier::User::const_reference user_arg) const
589
 
{
590
 
  return plugin::Authorization::isAuthorized(user_arg, this, false);
591
 
}
592
 
 
593
 
 
594
 
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
595
 
{
596
 
  const char* old_msg = get_proc_info();
597
 
  safe_mutex_assert_owner(mutex);
598
 
  mysys_var->current_mutex = &mutex;
599
 
  mysys_var->current_cond = &cond;
600
 
  this->set_proc_info(msg);
601
 
  return old_msg;
602
 
}
603
 
 
604
 
void Session::exit_cond(const char* old_msg)
605
 
{
606
 
  /*
607
 
    Putting the mutex unlock in exit_cond() ensures that
608
 
    mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
609
 
    locked (if that would not be the case, you'll get a deadlock if someone
610
 
    does a Session::awake() on you).
611
 
  */
612
 
  mysys_var->current_mutex->unlock();
613
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
614
 
  mysys_var->current_mutex = 0;
615
 
  mysys_var->current_cond = 0;
616
 
  this->set_proc_info(old_msg);
617
 
}
618
 
 
619
 
bool Session::authenticate()
620
 
{
621
 
  if (client->authenticate())
622
 
    return false;
623
 
 
624
 
  status_var.aborted_connects++;
625
 
 
626
 
  return true;
627
 
}
628
 
 
629
 
bool Session::checkUser(const std::string &passwd_str,
630
 
                        const std::string &in_db)
631
 
{
632
 
  bool is_authenticated=
633
 
    plugin::Authentication::isAuthenticated(user(), passwd_str);
634
 
 
635
 
  if (is_authenticated != true)
636
 
  {
637
 
    status_var.access_denied++;
638
 
    /* isAuthenticated has pushed the error message */
639
 
    return false;
640
 
  }
641
 
 
642
 
  /* Change database if necessary */
643
 
  if (not in_db.empty())
644
 
  {
645
 
    identifier::Schema identifier(in_db);
646
 
    if (change_db(this, identifier))
647
 
    {
648
 
      /* change_db() has pushed the error message. */
649
 
      return false;
650
 
    }
651
 
  }
652
 
  my_ok();
653
 
  password= not passwd_str.empty();
654
 
 
655
 
  /* Ready to handle queries */
656
 
  return true;
657
 
}
658
 
 
659
 
bool Session::executeStatement()
660
 
{
661
 
  char *l_packet= 0;
662
 
  uint32_t packet_length;
663
 
 
664
 
  enum enum_server_command l_command;
665
 
 
666
 
  /*
667
 
    indicator of uninitialized lex => normal flow of errors handling
668
 
    (see my_message_sql)
669
 
  */
670
 
  lex->current_select= 0;
671
 
  clear_error();
672
 
  main_da.reset_diagnostics_area();
673
 
 
674
 
  if (client->readCommand(&l_packet, &packet_length) == false)
675
 
  {
676
 
    return false;
677
 
  }
678
 
 
679
 
  if (getKilled() == KILL_CONNECTION)
680
 
    return false;
681
 
 
682
 
  if (packet_length == 0)
683
 
    return true;
684
 
 
685
 
  l_command= static_cast<enum_server_command>(l_packet[0]);
686
 
 
687
 
  if (command >= COM_END)
688
 
    command= COM_END;                           // Wrong command
689
 
 
690
 
  assert(packet_length);
691
 
  return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
692
 
}
693
 
 
694
 
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
695
 
{
696
 
  /* Remove garbage at start and end of query */
697
 
  while (in_packet_length > 0 && my_isspace(charset(), in_packet[0]))
698
 
  {
699
 
    in_packet++;
700
 
    in_packet_length--;
701
 
  }
702
 
  const char *pos= in_packet + in_packet_length; /* Point at end null */
703
 
  while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
704
 
  {
705
 
    pos--;
706
 
    in_packet_length--;
707
 
  }
708
 
 
709
 
  std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
710
 
  // We can not be entirely sure _schema has a value
711
 
  if (_schema)
712
 
  {
713
 
    plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
714
 
  }
715
 
  query.reset(new_query);
716
 
  _state.reset(new session::State(in_packet, in_packet_length));
717
 
 
718
 
  return true;
719
 
}
720
 
 
721
 
bool Session::endTransaction(enum enum_mysql_completiontype completion)
722
 
{
723
 
  bool do_release= 0;
724
 
  bool result= true;
725
 
  TransactionServices &transaction_services= TransactionServices::singleton();
726
 
 
727
 
  if (transaction.xid_state.xa_state != XA_NOTR)
728
 
  {
729
 
    my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
730
 
    return false;
731
 
  }
732
 
  switch (completion)
733
 
  {
734
 
    case COMMIT:
735
 
      /*
736
 
       * We don't use endActiveTransaction() here to ensure that this works
737
 
       * even if there is a problem with the OPTION_AUTO_COMMIT flag
738
 
       * (Which of course should never happen...)
739
 
       */
740
 
      server_status&= ~SERVER_STATUS_IN_TRANS;
741
 
      if (transaction_services.commitTransaction(*this, true))
742
 
        result= false;
743
 
      options&= ~(OPTION_BEGIN);
744
 
      break;
745
 
    case COMMIT_RELEASE:
746
 
      do_release= 1; /* fall through */
747
 
    case COMMIT_AND_CHAIN:
748
 
      result= endActiveTransaction();
749
 
      if (result == true && completion == COMMIT_AND_CHAIN)
750
 
        result= startTransaction();
751
 
      break;
752
 
    case ROLLBACK_RELEASE:
753
 
      do_release= 1; /* fall through */
754
 
    case ROLLBACK:
755
 
    case ROLLBACK_AND_CHAIN:
756
 
    {
757
 
      server_status&= ~SERVER_STATUS_IN_TRANS;
758
 
      if (transaction_services.rollbackTransaction(*this, true))
759
 
        result= false;
760
 
      options&= ~(OPTION_BEGIN);
761
 
      if (result == true && (completion == ROLLBACK_AND_CHAIN))
762
 
        result= startTransaction();
763
 
      break;
764
 
    }
765
 
    default:
766
 
      my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
767
 
      return false;
768
 
  }
769
 
 
770
 
  if (result == false)
771
 
  {
772
 
    my_error(static_cast<drizzled::error_t>(killed_errno()), MYF(0));
773
 
  }
774
 
  else if ((result == true) && do_release)
775
 
  {
776
 
    setKilled(Session::KILL_CONNECTION);
777
 
  }
778
 
 
779
 
  return result;
780
 
}
781
 
 
782
 
bool Session::endActiveTransaction()
783
 
{
784
 
  bool result= true;
785
 
  TransactionServices &transaction_services= TransactionServices::singleton();
786
 
 
787
 
  if (transaction.xid_state.xa_state != XA_NOTR)
788
 
  {
789
 
    my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
790
 
    return false;
791
 
  }
792
 
  if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
793
 
  {
794
 
    server_status&= ~SERVER_STATUS_IN_TRANS;
795
 
    if (transaction_services.commitTransaction(*this, true))
796
 
      result= false;
797
 
  }
798
 
  options&= ~(OPTION_BEGIN);
799
 
  return result;
800
 
}
801
 
 
802
 
bool Session::startTransaction(start_transaction_option_t opt)
803
 
{
804
 
  bool result= true;
805
 
 
806
 
  assert(! inTransaction());
807
 
 
808
 
  options|= OPTION_BEGIN;
809
 
  server_status|= SERVER_STATUS_IN_TRANS;
810
 
 
811
 
  if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
812
 
  {
813
 
    result= false;
814
 
  }
815
 
 
816
 
  return result;
817
 
}
 
914
  thr_lock_info_init(&lock_info);
 
915
  return 0;
 
916
}
 
917
 
 
918
 
 
919
/*
 
920
  Cleanup after query.
 
921
 
 
922
  SYNOPSIS
 
923
    Session::cleanup_after_query()
 
924
 
 
925
  DESCRIPTION
 
926
    This function is used to reset thread data to its default state.
 
927
 
 
928
  NOTE
 
929
    This function is not suitable for setting thread data to some
 
930
    non-default values, as there is only one replication thread, so
 
931
    different master threads may overwrite data of each other on
 
932
    slave.
 
933
*/
818
934
 
819
935
void Session::cleanup_after_query()
820
936
{
821
937
  /*
822
 
    Reset rand_used so that detection of calls to rand() will save random
 
938
    Reset rand_used so that detection of calls to rand() will save random 
823
939
    seeds if needed by the slave.
824
940
  */
825
941
  {
826
942
    /* Forget those values, for next binlogger: */
 
943
    stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
827
944
    auto_inc_intervals_in_cur_stmt_for_binlog.empty();
 
945
    rand_used= 0;
828
946
  }
829
947
  if (first_successful_insert_id_in_cur_stmt > 0)
830
948
  {
831
949
    /* set what LAST_INSERT_ID() will return */
832
 
    first_successful_insert_id_in_prev_stmt= first_successful_insert_id_in_cur_stmt;
 
950
    first_successful_insert_id_in_prev_stmt= 
 
951
      first_successful_insert_id_in_cur_stmt;
833
952
    first_successful_insert_id_in_cur_stmt= 0;
834
953
    substitute_null_with_insert_id= true;
835
954
  }
836
 
 
837
 
  arg_of_last_insert_id_function= false;
838
 
 
 
955
  arg_of_last_insert_id_function= 0;
839
956
  /* Free Items that were created during this execution */
840
957
  free_items();
841
 
 
842
 
  /* Reset _where. */
843
 
  _where= Session::DEFAULT_WHERE;
844
 
 
845
 
  /* Reset the temporary shares we built */
846
 
  for_each(temporary_shares.begin(),
847
 
           temporary_shares.end(),
848
 
           DeletePtr());
849
 
  temporary_shares.clear();
 
958
  /* Reset where. */
 
959
  where= Session::DEFAULT_WHERE;
850
960
}
851
961
 
 
962
 
852
963
/**
853
964
  Create a LEX_STRING in this connection.
854
965
 
860
971
  @return  NULL on failure, or pointer to the LEX_STRING object
861
972
*/
862
973
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
863
 
                                     const std::string &str,
864
 
                                     bool allocate_lex_string)
865
 
{
866
 
  return make_lex_string(lex_str, str.c_str(), str.length(), allocate_lex_string);
867
 
}
868
 
 
869
 
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
870
 
                                     const char* str, uint32_t length,
871
 
                                     bool allocate_lex_string)
 
974
                                 const char* str, uint32_t length,
 
975
                                 bool allocate_lex_string)
872
976
{
873
977
  if (allocate_lex_string)
874
 
    if (!(lex_str= (LEX_STRING *)getMemRoot()->allocate(sizeof(LEX_STRING))))
 
978
    if (!(lex_str= (LEX_STRING *)alloc(sizeof(LEX_STRING))))
875
979
      return 0;
876
 
  if (!(lex_str->str= mem_root->strmake_root(str, length)))
 
980
  if (!(lex_str->str= strmake_root(mem_root, str, length)))
877
981
    return 0;
878
982
  lex_str->length= length;
879
983
  return lex_str;
880
984
}
881
985
 
 
986
 
 
987
/*
 
988
  Convert a string to another character set
 
989
 
 
990
  SYNOPSIS
 
991
    convert_string()
 
992
    to                          Store new allocated string here
 
993
    to_cs                       New character set for allocated string
 
994
    from                        String to convert
 
995
    from_length                 Length of string to convert
 
996
    from_cs                     Original character set
 
997
 
 
998
  NOTES
 
999
    to will be 0-terminated to make it easy to pass to system funcs
 
1000
 
 
1001
  RETURN
 
1002
    0   ok
 
1003
    1   End of memory.
 
1004
        In this case to->str will point to 0 and to->length will be 0.
 
1005
*/
 
1006
 
 
1007
bool Session::convert_string(LEX_STRING *to, const CHARSET_INFO * const to_cs,
 
1008
                         const char *from, uint32_t from_length,
 
1009
                         const CHARSET_INFO * const from_cs)
 
1010
{
 
1011
  size_t new_length= to_cs->mbmaxlen * from_length;
 
1012
  uint32_t dummy_errors;
 
1013
  if (!(to->str= (char*) alloc(new_length+1)))
 
1014
  {
 
1015
    to->length= 0;                              // Safety fix
 
1016
    return(1);                          // EOM
 
1017
  }
 
1018
  to->length= copy_and_convert((char*) to->str, new_length, to_cs,
 
1019
                               from, from_length, from_cs, &dummy_errors);
 
1020
  to->str[to->length]=0;                        // Safety
 
1021
  return(0);
 
1022
}
 
1023
 
 
1024
 
 
1025
/*
 
1026
  Convert string from source character set to target character set inplace.
 
1027
 
 
1028
  SYNOPSIS
 
1029
    Session::convert_string
 
1030
 
 
1031
  DESCRIPTION
 
1032
    Convert string using convert_buffer - buffer for character set 
 
1033
    conversion shared between all protocols.
 
1034
 
 
1035
  RETURN
 
1036
    0   ok
 
1037
   !0   out of memory
 
1038
*/
 
1039
 
 
1040
bool Session::convert_string(String *s, const CHARSET_INFO * const from_cs,
 
1041
                         const CHARSET_INFO * const to_cs)
 
1042
{
 
1043
  uint32_t dummy_errors;
 
1044
  if (convert_buffer.copy(s->ptr(), s->length(), from_cs, to_cs, &dummy_errors))
 
1045
    return true;
 
1046
  /* If convert_buffer >> s copying is more efficient long term */
 
1047
  if (convert_buffer.alloced_length() >= convert_buffer.length() * 2 ||
 
1048
      !s->is_alloced())
 
1049
  {
 
1050
    return s->copy(convert_buffer);
 
1051
  }
 
1052
  s->swap(convert_buffer);
 
1053
  return false;
 
1054
}
 
1055
 
 
1056
 
 
1057
/*
 
1058
  Update some cache variables when character set changes
 
1059
*/
 
1060
 
 
1061
void Session::update_charset()
 
1062
{
 
1063
  uint32_t not_used;
 
1064
  charset_is_system_charset= !String::needs_conversion(0,charset(),
 
1065
                                                       system_charset_info,
 
1066
                                                       &not_used);
 
1067
  charset_is_collation_connection= 
 
1068
    !String::needs_conversion(0,charset(),variables.collation_connection,
 
1069
                              &not_used);
 
1070
  charset_is_character_set_filesystem= 
 
1071
    !String::needs_conversion(0, charset(),
 
1072
                              variables.character_set_filesystem, &not_used);
 
1073
}
 
1074
 
 
1075
 
 
1076
/* routings to adding tables to list of changed in transaction tables */
 
1077
 
 
1078
inline static void list_include(CHANGED_TableList** prev,
 
1079
                                CHANGED_TableList* curr,
 
1080
                                CHANGED_TableList* new_table)
 
1081
{
 
1082
  if (new_table)
 
1083
  {
 
1084
    *prev = new_table;
 
1085
    (*prev)->next = curr;
 
1086
  }
 
1087
}
 
1088
 
 
1089
/* add table to list of changed in transaction tables */
 
1090
 
 
1091
void Session::add_changed_table(Table *table)
 
1092
{
 
1093
  assert((options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
 
1094
              table->file->has_transactions());
 
1095
  add_changed_table(table->s->table_cache_key.str,
 
1096
                    (long) table->s->table_cache_key.length);
 
1097
  return;
 
1098
}
 
1099
 
 
1100
 
 
1101
void Session::add_changed_table(const char *key, long key_length)
 
1102
{
 
1103
  CHANGED_TableList **prev_changed = &transaction.changed_tables;
 
1104
  CHANGED_TableList *curr = transaction.changed_tables;
 
1105
 
 
1106
  for (; curr; prev_changed = &(curr->next), curr = curr->next)
 
1107
  {
 
1108
    int cmp =  (long)curr->key_length - (long)key_length;
 
1109
    if (cmp < 0)
 
1110
    {
 
1111
      list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
1112
      return;
 
1113
    }
 
1114
    else if (cmp == 0)
 
1115
    {
 
1116
      cmp = memcmp(curr->key, key, curr->key_length);
 
1117
      if (cmp < 0)
 
1118
      {
 
1119
        list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
1120
        return;
 
1121
      }
 
1122
      else if (cmp == 0)
 
1123
      {
 
1124
        return;
 
1125
      }
 
1126
    }
 
1127
  }
 
1128
  *prev_changed = changed_table_dup(key, key_length);
 
1129
  return;
 
1130
}
 
1131
 
 
1132
 
 
1133
CHANGED_TableList* Session::changed_table_dup(const char *key, long key_length)
 
1134
{
 
1135
  CHANGED_TableList* new_table = 
 
1136
    (CHANGED_TableList*) trans_alloc(ALIGN_SIZE(sizeof(CHANGED_TableList))+
 
1137
                                      key_length + 1);
 
1138
  if (!new_table)
 
1139
  {
 
1140
    my_error(EE_OUTOFMEMORY, MYF(ME_BELL),
 
1141
             ALIGN_SIZE(sizeof(TableList)) + key_length + 1);
 
1142
    killed= KILL_CONNECTION;
 
1143
    return 0;
 
1144
  }
 
1145
 
 
1146
  new_table->key= ((char*)new_table)+ ALIGN_SIZE(sizeof(CHANGED_TableList));
 
1147
  new_table->next = 0;
 
1148
  new_table->key_length = key_length;
 
1149
  ::memcpy(new_table->key, key, key_length);
 
1150
  return new_table;
 
1151
}
 
1152
 
 
1153
 
882
1154
int Session::send_explain_fields(select_result *result)
883
1155
{
884
1156
  List<Item> field_list;
914
1186
  }
915
1187
  item->maybe_null= 1;
916
1188
  field_list.push_back(new Item_empty_string("Extra", 255, cs));
917
 
  return (result->send_fields(field_list));
918
 
}
919
 
 
920
 
void select_result::send_error(drizzled::error_t errcode, const char *err)
 
1189
  return (result->send_fields(field_list,
 
1190
                              Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF));
 
1191
}
 
1192
 
 
1193
 
 
1194
struct Item_change_record: public ilink
 
1195
{
 
1196
  Item **place;
 
1197
  Item *old_value;
 
1198
  /* Placement new was hidden by `new' in ilink (TODO: check): */
 
1199
  static void *operator new(size_t size __attribute__((unused)),
 
1200
                            void *mem)
 
1201
    { return mem; }
 
1202
  static void operator delete(void *ptr __attribute__((unused)),
 
1203
                              size_t size __attribute__((unused)))
 
1204
    {}
 
1205
  static void operator delete(void *ptr __attribute__((unused)),
 
1206
                              void *mem __attribute__((unused)))
 
1207
    { /* never called */ }
 
1208
};
 
1209
 
 
1210
 
 
1211
/*
 
1212
  Register an item tree tree transformation, performed by the query
 
1213
  optimizer. We need a pointer to runtime_memroot because it may be !=
 
1214
  session->mem_root (this may no longer be a true statement)
 
1215
*/
 
1216
 
 
1217
void Session::nocheck_register_item_tree_change(Item **place, Item *old_value,
 
1218
                                            MEM_ROOT *runtime_memroot)
 
1219
{
 
1220
  Item_change_record *change;
 
1221
  /*
 
1222
    Now we use one node per change, which adds some memory overhead,
 
1223
    but still is rather fast as we use alloc_root for allocations.
 
1224
    A list of item tree changes of an average query should be short.
 
1225
  */
 
1226
  void *change_mem= alloc_root(runtime_memroot, sizeof(*change));
 
1227
  if (change_mem == 0)
 
1228
  {
 
1229
    /*
 
1230
      OOM, session->fatal_error() is called by the error handler of the
 
1231
      memroot. Just return.
 
1232
    */
 
1233
    return;
 
1234
  }
 
1235
  change= new (change_mem) Item_change_record;
 
1236
  change->place= place;
 
1237
  change->old_value= old_value;
 
1238
  change_list.append(change);
 
1239
}
 
1240
 
 
1241
 
 
1242
void Session::rollback_item_tree_changes()
 
1243
{
 
1244
  I_List_iterator<Item_change_record> it(change_list);
 
1245
  Item_change_record *change;
 
1246
 
 
1247
  while ((change= it++))
 
1248
    *change->place= change->old_value;
 
1249
  /* We can forget about changes memory: it's allocated in runtime memroot */
 
1250
  change_list.empty();
 
1251
  return;
 
1252
}
 
1253
 
 
1254
 
 
1255
/*****************************************************************************
 
1256
** Functions to provide a interface to select results
 
1257
*****************************************************************************/
 
1258
 
 
1259
select_result::select_result()
 
1260
{
 
1261
  session=current_session;
 
1262
}
 
1263
 
 
1264
void select_result::send_error(uint32_t errcode,const char *err)
921
1265
{
922
1266
  my_message(errcode, err, MYF(0));
923
1267
}
924
1268
 
 
1269
 
 
1270
void select_result::cleanup()
 
1271
{
 
1272
  /* do nothing */
 
1273
}
 
1274
 
 
1275
bool select_result::check_simple_select() const
 
1276
{
 
1277
  my_error(ER_SP_BAD_CURSOR_QUERY, MYF(0));
 
1278
  return true;
 
1279
}
 
1280
 
 
1281
 
 
1282
static String default_line_term("\n",default_charset_info);
 
1283
static String default_escaped("\\",default_charset_info);
 
1284
static String default_field_term("\t",default_charset_info);
 
1285
 
 
1286
sql_exchange::sql_exchange(char *name, bool flag,
 
1287
                           enum enum_filetype filetype_arg)
 
1288
  :file_name(name), opt_enclosed(0), dumpfile(flag), skip_lines(0)
 
1289
{
 
1290
  filetype= filetype_arg;
 
1291
  field_term= &default_field_term;
 
1292
  enclosed=   line_start= &my_empty_string;
 
1293
  line_term=  &default_line_term;
 
1294
  escaped=    &default_escaped;
 
1295
  cs= NULL;
 
1296
}
 
1297
 
 
1298
bool select_send::send_fields(List<Item> &list, uint32_t flags)
 
1299
{
 
1300
  bool res;
 
1301
  if (!(res= session->protocol->send_fields(&list, flags)))
 
1302
    is_result_set_started= 1;
 
1303
  return res;
 
1304
}
 
1305
 
 
1306
void select_send::abort()
 
1307
{
 
1308
  return;
 
1309
}
 
1310
 
 
1311
 
 
1312
/** 
 
1313
  Cleanup an instance of this class for re-use
 
1314
  at next execution of a prepared statement/
 
1315
  stored procedure statement.
 
1316
*/
 
1317
 
 
1318
void select_send::cleanup()
 
1319
{
 
1320
  is_result_set_started= false;
 
1321
}
 
1322
 
 
1323
/* Send data to client. Returns 0 if ok */
 
1324
 
 
1325
bool select_send::send_data(List<Item> &items)
 
1326
{
 
1327
  if (unit->offset_limit_cnt)
 
1328
  {                                             // using limit offset,count
 
1329
    unit->offset_limit_cnt--;
 
1330
    return 0;
 
1331
  }
 
1332
 
 
1333
  /*
 
1334
    We may be passing the control from mysqld to the client: release the
 
1335
    InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
 
1336
    by session
 
1337
  */
 
1338
  ha_release_temporary_latches(session);
 
1339
 
 
1340
  List_iterator_fast<Item> li(items);
 
1341
  Protocol *protocol= session->protocol;
 
1342
  char buff[MAX_FIELD_WIDTH];
 
1343
  String buffer(buff, sizeof(buff), &my_charset_bin);
 
1344
 
 
1345
  protocol->prepare_for_resend();
 
1346
  Item *item;
 
1347
  while ((item=li++))
 
1348
  {
 
1349
    if (item->send(protocol, &buffer))
 
1350
    {
 
1351
      protocol->free();                         // Free used buffer
 
1352
      my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
 
1353
      break;
 
1354
    }
 
1355
  }
 
1356
  session->sent_row_count++;
 
1357
  if (session->is_error())
 
1358
  {
 
1359
    protocol->remove_last_row();
 
1360
    return(1);
 
1361
  }
 
1362
  if (session->vio_ok())
 
1363
    return(protocol->write());
 
1364
  return(0);
 
1365
}
 
1366
 
 
1367
bool select_send::send_eof()
 
1368
{
 
1369
  /* 
 
1370
    We may be passing the control from mysqld to the client: release the
 
1371
    InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
 
1372
    by session 
 
1373
  */
 
1374
  ha_release_temporary_latches(session);
 
1375
 
 
1376
  /* Unlock tables before sending packet to gain some speed */
 
1377
  if (session->lock)
 
1378
  {
 
1379
    mysql_unlock_tables(session, session->lock);
 
1380
    session->lock=0;
 
1381
  }
 
1382
  ::my_eof(session);
 
1383
  is_result_set_started= 0;
 
1384
  return false;
 
1385
}
 
1386
 
 
1387
 
925
1388
/************************************************************************
926
1389
  Handling writing to file
927
1390
************************************************************************/
928
1391
 
929
 
void select_to_file::send_error(drizzled::error_t errcode,const char *err)
 
1392
void select_to_file::send_error(uint32_t errcode,const char *err)
930
1393
{
931
1394
  my_message(errcode, err, MYF(0));
932
1395
  if (file > 0)
933
1396
  {
934
 
    (void) cache->end_io_cache();
935
 
    (void) internal::my_close(file, MYF(0));
936
 
    (void) internal::my_delete(path.file_string().c_str(), MYF(0));             // Delete file on error
 
1397
    (void) end_io_cache(&cache);
 
1398
    (void) my_close(file,MYF(0));
 
1399
    (void) my_delete(path,MYF(0));              // Delete file on error
937
1400
    file= -1;
938
1401
  }
939
1402
}
941
1404
 
942
1405
bool select_to_file::send_eof()
943
1406
{
944
 
  int error= test(cache->end_io_cache());
945
 
  if (internal::my_close(file, MYF(MY_WME)))
 
1407
  int error= test(end_io_cache(&cache));
 
1408
  if (my_close(file,MYF(MY_WME)))
946
1409
    error= 1;
947
1410
  if (!error)
948
1411
  {
951
1414
      function, SELECT INTO has to have an own SQLCOM.
952
1415
      TODO: split from SQLCOM_SELECT
953
1416
    */
954
 
    session->my_ok(row_count);
 
1417
    ::my_ok(session,row_count);
955
1418
  }
956
1419
  file= -1;
957
1420
  return error;
963
1426
  /* In case of error send_eof() may be not called: close the file here. */
964
1427
  if (file >= 0)
965
1428
  {
966
 
    (void) cache->end_io_cache();
967
 
    (void) internal::my_close(file, MYF(0));
 
1429
    (void) end_io_cache(&cache);
 
1430
    (void) my_close(file,MYF(0));
968
1431
    file= -1;
969
1432
  }
970
 
  path= "";
 
1433
  path[0]= '\0';
971
1434
  row_count= 0;
972
1435
}
973
1436
 
974
 
select_to_file::select_to_file(file_exchange *ex)
975
 
  : exchange(ex),
976
 
    file(-1),
977
 
    cache(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
978
 
    row_count(0L)
979
 
{
980
 
  path= "";
981
 
}
982
1437
 
983
1438
select_to_file::~select_to_file()
984
1439
{
985
 
  cleanup();
 
1440
  if (file >= 0)
 
1441
  {                                     // This only happens in case of error
 
1442
    (void) end_io_cache(&cache);
 
1443
    (void) my_close(file,MYF(0));
 
1444
    file= -1;
 
1445
  }
986
1446
}
987
1447
 
988
1448
/***************************************************************************
1011
1471
*/
1012
1472
 
1013
1473
 
1014
 
static int create_file(Session *session,
1015
 
                       fs::path &target_path,
1016
 
                       file_exchange *exchange,
1017
 
                       internal::IO_CACHE *cache)
 
1474
static File create_file(Session *session, char *path, sql_exchange *exchange,
 
1475
                        IO_CACHE *cache)
1018
1476
{
1019
 
  fs::path to_file(exchange->file_name);
1020
 
  int file;
1021
 
 
1022
 
  if (not to_file.has_root_directory())
 
1477
  File file;
 
1478
  uint32_t option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
 
1479
 
 
1480
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
1481
  option|= MY_REPLACE_DIR;                      // Force use of db directory
 
1482
#endif
 
1483
 
 
1484
  if (!dirname_length(exchange->file_name))
1023
1485
  {
1024
 
    target_path= fs::system_complete(getDataHomeCatalog());
1025
 
    util::string::const_shared_ptr schema(session->schema());
1026
 
    if (schema and not schema->empty())
1027
 
    {
1028
 
      int count_elements= 0;
1029
 
      for (fs::path::iterator iter= to_file.begin();
1030
 
           iter != to_file.end();
1031
 
           ++iter, ++count_elements)
1032
 
      { }
1033
 
 
1034
 
      if (count_elements == 1)
1035
 
      {
1036
 
        target_path /= *schema;
1037
 
      }
1038
 
    }
1039
 
    target_path /= to_file;
 
1486
    strcpy(path, drizzle_real_data_home);
 
1487
    if (session->db)
 
1488
      strncat(path, session->db, FN_REFLEN-strlen(drizzle_real_data_home)-1);
 
1489
    (void) fn_format(path, exchange->file_name, path, "", option);
1040
1490
  }
1041
1491
  else
1042
 
  {
1043
 
    target_path = exchange->file_name;
1044
 
  }
1045
 
 
1046
 
  if (not secure_file_priv.string().empty())
1047
 
  {
1048
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
1049
 
    {
1050
 
      /* Write only allowed to dir or subdir specified by secure_file_priv */
1051
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1052
 
      return -1;
1053
 
    }
1054
 
  }
1055
 
 
1056
 
  if (!access(target_path.file_string().c_str(), F_OK))
 
1492
    (void) fn_format(path, exchange->file_name, drizzle_real_data_home, "", option);
 
1493
 
 
1494
  if (opt_secure_file_priv &&
 
1495
      strncmp(opt_secure_file_priv, path, strlen(opt_secure_file_priv)))
 
1496
  {
 
1497
    /* Write only allowed to dir or subdir specified by secure_file_priv */
 
1498
    my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
1499
    return -1;
 
1500
  }
 
1501
 
 
1502
  if (!access(path, F_OK))
1057
1503
  {
1058
1504
    my_error(ER_FILE_EXISTS_ERROR, MYF(0), exchange->file_name);
1059
1505
    return -1;
1060
1506
  }
1061
1507
  /* Create the file world readable */
1062
 
  if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
 
1508
  if ((file= my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1063
1509
    return file;
 
1510
#ifdef HAVE_FCHMOD
1064
1511
  (void) fchmod(file, 0666);                    // Because of umask()
1065
 
  if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
 
1512
#else
 
1513
  (void) chmod(path, 0666);
 
1514
#endif
 
1515
  if (init_io_cache(cache, file, 0L, WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1066
1516
  {
1067
 
    internal::my_close(file, MYF(0));
1068
 
    internal::my_delete(target_path.file_string().c_str(), MYF(0));  // Delete file on error, it was just created
 
1517
    my_close(file, MYF(0));
 
1518
    my_delete(path, MYF(0));  // Delete file on error, it was just created 
1069
1519
    return -1;
1070
1520
  }
1071
1521
  return file;
1073
1523
 
1074
1524
 
1075
1525
int
1076
 
select_export::prepare(List<Item> &list, Select_Lex_Unit *u)
 
1526
select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
1077
1527
{
1078
1528
  bool blob_flag=0;
1079
1529
  bool string_results= false, non_string_results= false;
1080
1530
  unit= u;
1081
 
  if ((uint32_t) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
1082
 
  {
1083
 
    path= exchange->file_name;
1084
 
  }
 
1531
  if ((uint) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
 
1532
    strmake(path,exchange->file_name,FN_REFLEN-1);
1085
1533
 
 
1534
  if ((file= create_file(session, path, exchange, &cache)) < 0)
 
1535
    return 1;
1086
1536
  /* Check if there is any blobs in data */
1087
1537
  {
1088
1538
    List_iterator_fast<Item> li(list);
1091
1541
    {
1092
1542
      if (item->max_length >= MAX_BLOB_WIDTH)
1093
1543
      {
1094
 
        blob_flag=1;
1095
 
        break;
 
1544
        blob_flag=1;
 
1545
        break;
1096
1546
      }
1097
 
 
1098
1547
      if (item->result_type() == STRING_RESULT)
1099
1548
        string_results= true;
1100
1549
      else
1125
1574
      (exchange->opt_enclosed && non_string_results &&
1126
1575
       field_term_length && strchr(NUMERIC_CHARS, field_term_char)))
1127
1576
  {
1128
 
    my_error(ER_AMBIGUOUS_FIELD_TERM, MYF(0));
1129
 
    return 1;
 
1577
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1578
                 ER_AMBIGUOUS_FIELD_TERM, ER(ER_AMBIGUOUS_FIELD_TERM));
 
1579
    is_ambiguous_field_term= true;
1130
1580
  }
1131
 
 
1132
 
  if ((file= create_file(session, path, exchange, cache)) < 0)
1133
 
    return 1;
 
1581
  else
 
1582
    is_ambiguous_field_term= false;
1134
1583
 
1135
1584
  return 0;
1136
1585
}
1137
1586
 
 
1587
 
 
1588
#define NEED_ESCAPING(x) ((int) (unsigned char) (x) == escape_char    || \
 
1589
                          (enclosed ? (int) (unsigned char) (x) == field_sep_char      \
 
1590
                                    : (int) (unsigned char) (x) == field_term_char) || \
 
1591
                          (int) (unsigned char) (x) == line_sep_char  || \
 
1592
                          !(x))
 
1593
 
1138
1594
bool select_export::send_data(List<Item> &items)
1139
1595
{
1140
1596
  char buff[MAX_FIELD_WIDTH],null_buff[2],space[MAX_FIELD_WIDTH];
1145
1601
  if (unit->offset_limit_cnt)
1146
1602
  {                                             // using limit offset,count
1147
1603
    unit->offset_limit_cnt--;
1148
 
    return false;
 
1604
    return(0);
1149
1605
  }
1150
1606
  row_count++;
1151
1607
  Item *item;
1152
1608
  uint32_t used_length=0,items_left=items.elements;
1153
1609
  List_iterator_fast<Item> li(items);
1154
1610
 
1155
 
  if (my_b_write(cache,(unsigned char*) exchange->line_start->ptr(),
1156
 
                 exchange->line_start->length()))
1157
 
    return true;
1158
 
 
 
1611
  if (my_b_write(&cache,(unsigned char*) exchange->line_start->ptr(),
 
1612
                 exchange->line_start->length()))
 
1613
    goto err;
1159
1614
  while ((item=li++))
1160
1615
  {
1161
1616
    Item_result result_type=item->result_type();
1164
1619
    res=item->str_result(&tmp);
1165
1620
    if (res && enclosed)
1166
1621
    {
1167
 
      if (my_b_write(cache,(unsigned char*) exchange->enclosed->ptr(),
1168
 
                     exchange->enclosed->length()))
1169
 
        return true;
 
1622
      if (my_b_write(&cache,(unsigned char*) exchange->enclosed->ptr(),
 
1623
                     exchange->enclosed->length()))
 
1624
        goto err;
1170
1625
    }
1171
1626
    if (!res)
1172
1627
    {                                           // NULL
1173
1628
      if (!fixed_row_size)
1174
1629
      {
1175
 
        if (escape_char != -1)                  // Use \N syntax
1176
 
        {
1177
 
          null_buff[0]=escape_char;
1178
 
          null_buff[1]='N';
1179
 
          if (my_b_write(cache,(unsigned char*) null_buff,2))
1180
 
            return true;
1181
 
        }
1182
 
        else if (my_b_write(cache,(unsigned char*) "NULL",4))
1183
 
          return true;
 
1630
        if (escape_char != -1)                  // Use \N syntax
 
1631
        {
 
1632
          null_buff[0]=escape_char;
 
1633
          null_buff[1]='N';
 
1634
          if (my_b_write(&cache,(unsigned char*) null_buff,2))
 
1635
            goto err;
 
1636
        }
 
1637
        else if (my_b_write(&cache,(unsigned char*) "NULL",4))
 
1638
          goto err;
1184
1639
      }
1185
1640
      else
1186
1641
      {
1187
 
        used_length=0;                          // Fill with space
 
1642
        used_length=0;                          // Fill with space
1188
1643
      }
1189
1644
    }
1190
1645
    else
1191
1646
    {
1192
1647
      if (fixed_row_size)
1193
 
        used_length= min(res->length(), static_cast<size_t>(item->max_length));
 
1648
        used_length=cmin(res->length(),item->max_length);
1194
1649
      else
1195
 
        used_length= res->length();
1196
 
 
 
1650
        used_length=res->length();
1197
1651
      if ((result_type == STRING_RESULT || is_unsafe_field_sep) &&
1198
 
          escape_char != -1)
 
1652
           escape_char != -1)
1199
1653
      {
1200
1654
        char *pos, *start, *end;
1201
1655
        const CHARSET_INFO * const res_charset= res->charset();
1202
 
        const CHARSET_INFO * const character_set_client= default_charset_info;
1203
 
 
 
1656
        const CHARSET_INFO * const character_set_client= session->variables.
 
1657
                                                            character_set_client;
1204
1658
        bool check_second_byte= (res_charset == &my_charset_bin) &&
1205
 
          character_set_client->
1206
 
          escape_with_backslash_is_dangerous;
 
1659
                                 character_set_client->
 
1660
                                 escape_with_backslash_is_dangerous;
1207
1661
        assert(character_set_client->mbmaxlen == 2 ||
1208
 
               !character_set_client->escape_with_backslash_is_dangerous);
1209
 
        for (start=pos=(char*) res->ptr(),end=pos+used_length ;
1210
 
             pos != end ;
1211
 
             pos++)
1212
 
        {
1213
 
          if (use_mb(res_charset))
1214
 
          {
1215
 
            int l;
1216
 
            if ((l=my_ismbchar(res_charset, pos, end)))
1217
 
            {
1218
 
              pos += l-1;
1219
 
              continue;
1220
 
            }
1221
 
          }
 
1662
                    !character_set_client->escape_with_backslash_is_dangerous);
 
1663
        for (start=pos=(char*) res->ptr(),end=pos+used_length ;
 
1664
             pos != end ;
 
1665
             pos++)
 
1666
        {
 
1667
#ifdef USE_MB
 
1668
          if (use_mb(res_charset))
 
1669
          {
 
1670
            int l;
 
1671
            if ((l=my_ismbchar(res_charset, pos, end)))
 
1672
            {
 
1673
              pos += l-1;
 
1674
              continue;
 
1675
            }
 
1676
          }
 
1677
#endif
1222
1678
 
1223
1679
          /*
1224
1680
            Special case when dumping BINARY/VARBINARY/BLOB values
1225
1681
            for the clients with character sets big5, cp932, gbk and sjis,
1226
1682
            which can have the escape character (0x5C "\" by default)
1227
1683
            as the second byte of a multi-byte sequence.
1228
 
 
 
1684
            
1229
1685
            If
1230
1686
            - pos[0] is a valid multi-byte head (e.g 0xEE) and
1231
1687
            - pos[1] is 0x00, which will be escaped as "\0",
1232
 
 
 
1688
            
1233
1689
            then we'll get "0xEE + 0x5C + 0x30" in the output file.
1234
 
 
 
1690
            
1235
1691
            If this file is later loaded using this sequence of commands:
1236
 
 
 
1692
            
1237
1693
            mysql> create table t1 (a varchar(128)) character set big5;
1238
1694
            mysql> LOAD DATA INFILE 'dump.txt' INTO Table t1;
1239
 
 
 
1695
            
1240
1696
            then 0x5C will be misinterpreted as the second byte
1241
1697
            of a multi-byte character "0xEE + 0x5C", instead of
1242
1698
            escape character for 0x00.
1243
 
 
 
1699
            
1244
1700
            To avoid this confusion, we'll escape the multi-byte
1245
1701
            head character too, so the sequence "0xEE + 0x00" will be
1246
1702
            dumped as "0x5C + 0xEE + 0x5C + 0x30".
1247
 
 
 
1703
            
1248
1704
            Note, in the condition below we only check if
1249
1705
            mbcharlen is equal to 2, because there are no
1250
1706
            character sets with mbmaxlen longer than 2
1252
1708
            assert before the loop makes that sure.
1253
1709
          */
1254
1710
 
1255
 
          if ((needs_escaping(*pos, enclosed) ||
 
1711
          if ((NEED_ESCAPING(*pos) ||
1256
1712
               (check_second_byte &&
1257
1713
                my_mbcharlen(character_set_client, (unsigned char) *pos) == 2 &&
1258
1714
                pos + 1 < end &&
1259
 
                needs_escaping(pos[1], enclosed))) &&
 
1715
                NEED_ESCAPING(pos[1]))) &&
1260
1716
              /*
1261
 
                Don't escape field_term_char by doubling - doubling is only
1262
 
                valid for ENCLOSED BY characters:
 
1717
               Don't escape field_term_char by doubling - doubling is only
 
1718
               valid for ENCLOSED BY characters:
1263
1719
              */
1264
1720
              (enclosed || !is_ambiguous_field_term ||
1265
1721
               (int) (unsigned char) *pos != field_term_char))
1266
1722
          {
1267
 
            char tmp_buff[2];
 
1723
            char tmp_buff[2];
1268
1724
            tmp_buff[0]= ((int) (unsigned char) *pos == field_sep_char &&
1269
1725
                          is_ambiguous_field_sep) ?
1270
 
              field_sep_char : escape_char;
1271
 
            tmp_buff[1]= *pos ? *pos : '0';
1272
 
            if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1273
 
                my_b_write(cache,(unsigned char*) tmp_buff,2))
1274
 
              return true;
1275
 
            start=pos+1;
1276
 
          }
1277
 
        }
1278
 
        if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1279
 
          return true;
 
1726
                          field_sep_char : escape_char;
 
1727
            tmp_buff[1]= *pos ? *pos : '0';
 
1728
            if (my_b_write(&cache,(unsigned char*) start,(uint) (pos-start)) ||
 
1729
                my_b_write(&cache,(unsigned char*) tmp_buff,2))
 
1730
              goto err;
 
1731
            start=pos+1;
 
1732
          }
 
1733
        }
 
1734
        if (my_b_write(&cache,(unsigned char*) start,(uint) (pos-start)))
 
1735
          goto err;
1280
1736
      }
1281
 
      else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1282
 
        return true;
 
1737
      else if (my_b_write(&cache,(unsigned char*) res->ptr(),used_length))
 
1738
        goto err;
1283
1739
    }
1284
1740
    if (fixed_row_size)
1285
1741
    {                                           // Fill with space
1286
1742
      if (item->max_length > used_length)
1287
1743
      {
1288
 
        /* QQ:  Fix by adding a my_b_fill() function */
1289
 
        if (!space_inited)
1290
 
        {
1291
 
          space_inited=1;
1292
 
          memset(space, ' ', sizeof(space));
1293
 
        }
1294
 
        uint32_t length=item->max_length-used_length;
1295
 
        for (; length > sizeof(space) ; length-=sizeof(space))
1296
 
        {
1297
 
          if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1298
 
            return true;
1299
 
        }
1300
 
        if (my_b_write(cache,(unsigned char*) space,length))
1301
 
          return true;
 
1744
        /* QQ:  Fix by adding a my_b_fill() function */
 
1745
        if (!space_inited)
 
1746
        {
 
1747
          space_inited=1;
 
1748
          memset(space, ' ', sizeof(space));
 
1749
        }
 
1750
        uint32_t length=item->max_length-used_length;
 
1751
        for (; length > sizeof(space) ; length-=sizeof(space))
 
1752
        {
 
1753
          if (my_b_write(&cache,(unsigned char*) space,sizeof(space)))
 
1754
            goto err;
 
1755
        }
 
1756
        if (my_b_write(&cache,(unsigned char*) space,length))
 
1757
          goto err;
1302
1758
      }
1303
1759
    }
1304
1760
    if (res && enclosed)
1305
1761
    {
1306
 
      if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
 
1762
      if (my_b_write(&cache, (unsigned char*) exchange->enclosed->ptr(),
1307
1763
                     exchange->enclosed->length()))
1308
 
        return true;
 
1764
        goto err;
1309
1765
    }
1310
1766
    if (--items_left)
1311
1767
    {
1312
 
      if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
 
1768
      if (my_b_write(&cache, (unsigned char*) exchange->field_term->ptr(),
1313
1769
                     field_term_length))
1314
 
        return true;
 
1770
        goto err;
1315
1771
    }
1316
1772
  }
1317
 
  if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1318
 
                 exchange->line_term->length()))
1319
 
  {
1320
 
    return true;
1321
 
  }
1322
 
 
1323
 
  return false;
 
1773
  if (my_b_write(&cache,(unsigned char*) exchange->line_term->ptr(),
 
1774
                 exchange->line_term->length()))
 
1775
    goto err;
 
1776
  return(0);
 
1777
err:
 
1778
  return(1);
1324
1779
}
1325
1780
 
1326
1781
 
1330
1785
 
1331
1786
 
1332
1787
int
1333
 
select_dump::prepare(List<Item> &, Select_Lex_Unit *u)
 
1788
select_dump::prepare(List<Item> &list __attribute__((unused)),
 
1789
                     SELECT_LEX_UNIT *u)
1334
1790
{
1335
1791
  unit= u;
1336
 
  return (int) ((file= create_file(session, path, exchange, cache)) < 0);
 
1792
  return (int) ((file= create_file(session, path, exchange, &cache)) < 0);
1337
1793
}
1338
1794
 
1339
1795
 
1350
1806
    unit->offset_limit_cnt--;
1351
1807
    return(0);
1352
1808
  }
1353
 
  if (row_count++ > 1)
 
1809
  if (row_count++ > 1) 
1354
1810
  {
1355
1811
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
1356
 
    return 1;
 
1812
    goto err;
1357
1813
  }
1358
1814
  while ((item=li++))
1359
1815
  {
1360
1816
    res=item->str_result(&tmp);
1361
1817
    if (!res)                                   // If NULL
1362
1818
    {
1363
 
      if (my_b_write(cache,(unsigned char*) "",1))
1364
 
        return 1;
 
1819
      if (my_b_write(&cache,(unsigned char*) "",1))
 
1820
        goto err;
1365
1821
    }
1366
 
    else if (my_b_write(cache,(unsigned char*) res->ptr(),res->length()))
 
1822
    else if (my_b_write(&cache,(unsigned char*) res->ptr(),res->length()))
1367
1823
    {
1368
 
      my_error(ER_ERROR_ON_WRITE, MYF(0), path.file_string().c_str(), errno);
1369
 
      return 1;
 
1824
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, my_errno);
 
1825
      goto err;
1370
1826
    }
1371
1827
  }
1372
1828
  return(0);
 
1829
err:
 
1830
  return(1);
1373
1831
}
1374
1832
 
1375
1833
 
1404
1862
void select_max_min_finder_subselect::cleanup()
1405
1863
{
1406
1864
  cache= 0;
 
1865
  return;
1407
1866
}
1408
1867
 
1409
1868
 
1427
1886
      switch (val_item->result_type())
1428
1887
      {
1429
1888
      case REAL_RESULT:
1430
 
        op= &select_max_min_finder_subselect::cmp_real;
1431
 
        break;
 
1889
        op= &select_max_min_finder_subselect::cmp_real;
 
1890
        break;
1432
1891
      case INT_RESULT:
1433
 
        op= &select_max_min_finder_subselect::cmp_int;
1434
 
        break;
 
1892
        op= &select_max_min_finder_subselect::cmp_int;
 
1893
        break;
1435
1894
      case STRING_RESULT:
1436
 
        op= &select_max_min_finder_subselect::cmp_str;
1437
 
        break;
 
1895
        op= &select_max_min_finder_subselect::cmp_str;
 
1896
        break;
1438
1897
      case DECIMAL_RESULT:
1439
1898
        op= &select_max_min_finder_subselect::cmp_decimal;
1440
1899
        break;
1441
1900
      case ROW_RESULT:
1442
1901
        // This case should never be choosen
1443
 
        assert(0);
1444
 
        op= 0;
 
1902
        assert(0);
 
1903
        op= 0;
1445
1904
      }
1446
1905
    }
1447
1906
    cache->store(val_item);
1480
1939
bool select_max_min_finder_subselect::cmp_decimal()
1481
1940
{
1482
1941
  Item *maxmin= ((Item_singlerow_subselect *)item)->element_index(0);
1483
 
  type::Decimal cval, *cvalue= cache->val_decimal(&cval);
1484
 
  type::Decimal mval, *mvalue= maxmin->val_decimal(&mval);
 
1942
  my_decimal cval, *cvalue= cache->val_decimal(&cval);
 
1943
  my_decimal mval, *mvalue= maxmin->val_decimal(&mval);
1485
1944
  if (fmax)
1486
1945
    return (cache->null_value && !maxmin->null_value) ||
1487
1946
      (!cache->null_value && !maxmin->null_value &&
1488
 
       class_decimal_cmp(cvalue, mvalue) > 0) ;
 
1947
       my_decimal_cmp(cvalue, mvalue) > 0) ;
1489
1948
  return (maxmin->null_value && !cache->null_value) ||
1490
1949
    (!cache->null_value && !maxmin->null_value &&
1491
 
     class_decimal_cmp(cvalue,mvalue) < 0);
 
1950
     my_decimal_cmp(cvalue,mvalue) < 0);
1492
1951
}
1493
1952
 
1494
1953
bool select_max_min_finder_subselect::cmp_str()
1510
1969
     sortcmp(val1, val2, cache->collation.collation) < 0);
1511
1970
}
1512
1971
 
1513
 
bool select_exists_subselect::send_data(List<Item> &)
 
1972
bool select_exists_subselect::send_data(List<Item> &items __attribute__((unused)))
1514
1973
{
1515
1974
  Item_exists_subselect *it= (Item_exists_subselect *)item;
1516
1975
  if (unit->offset_limit_cnt)
1523
1982
  return(0);
1524
1983
}
1525
1984
 
 
1985
 
 
1986
/***************************************************************************
 
1987
  Dump of select to variables
 
1988
***************************************************************************/
 
1989
 
 
1990
int select_dumpvar::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
 
1991
{
 
1992
  unit= u;
 
1993
  
 
1994
  if (var_list.elements != list.elements)
 
1995
  {
 
1996
    my_message(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT,
 
1997
               ER(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT), MYF(0));
 
1998
    return 1;
 
1999
  }               
 
2000
  return 0;
 
2001
}
 
2002
 
 
2003
 
 
2004
bool select_dumpvar::check_simple_select() const
 
2005
{
 
2006
  my_error(ER_SP_BAD_CURSOR_SELECT, MYF(0));
 
2007
  return true;
 
2008
}
 
2009
 
 
2010
 
 
2011
void select_dumpvar::cleanup()
 
2012
{
 
2013
  row_count= 0;
 
2014
}
 
2015
 
 
2016
 
 
2017
void Query_arena::free_items()
 
2018
{
 
2019
  Item *next;
 
2020
  /* This works because items are allocated with sql_alloc() */
 
2021
  for (; free_list; free_list= next)
 
2022
  {
 
2023
    next= free_list->next;
 
2024
    free_list->delete_self();
 
2025
  }
 
2026
  /* Postcondition: free_list is 0 */
 
2027
  return;
 
2028
}
 
2029
 
 
2030
 
 
2031
/*
 
2032
  Statement functions
 
2033
*/
 
2034
 
 
2035
Statement::Statement(LEX *lex_arg, MEM_ROOT *mem_root_arg, ulong id_arg)
 
2036
  :Query_arena(mem_root_arg),
 
2037
  id(id_arg),
 
2038
  mark_used_columns(MARK_COLUMNS_READ),
 
2039
  lex(lex_arg),
 
2040
  query(0),
 
2041
  query_length(0),
 
2042
  db(NULL),
 
2043
  db_length(0)
 
2044
{
 
2045
}
 
2046
 
 
2047
 
1526
2048
/*
1527
2049
  Don't free mem_root, as mem_root is freed in the end of dispatch_command
1528
2050
  (once for any command).
1530
2052
void Session::end_statement()
1531
2053
{
1532
2054
  /* Cleanup SQL processing state to reuse this statement in next query. */
1533
 
  lex->end();
1534
 
  query_cache_key= ""; // reset the cache key
1535
 
  resetResultsetMessage();
 
2055
  lex_end(lex);
1536
2056
}
1537
2057
 
 
2058
 
1538
2059
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
1539
2060
{
1540
 
  assert(_schema);
1541
 
  if (_schema and _schema->empty())
1542
 
  {
1543
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1544
 
    return true;
1545
 
  }
1546
 
  else if (not _schema)
1547
 
  {
1548
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1549
 
    return true;
1550
 
  }
1551
 
  assert(_schema);
1552
 
 
1553
 
  *p_db= strmake(_schema->c_str(), _schema->size());
1554
 
  *p_db_length= _schema->size();
1555
 
 
 
2061
  if (db == NULL)
 
2062
  {
 
2063
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
 
2064
    return true;
 
2065
  }
 
2066
  *p_db= strmake(db, db_length);
 
2067
  *p_db_length= db_length;
1556
2068
  return false;
1557
2069
}
1558
2070
 
 
2071
 
 
2072
bool select_dumpvar::send_data(List<Item> &items)
 
2073
{
 
2074
  List_iterator_fast<my_var> var_li(var_list);
 
2075
  List_iterator<Item> it(items);
 
2076
  Item *item;
 
2077
  my_var *mv;
 
2078
 
 
2079
  if (unit->offset_limit_cnt)
 
2080
  {                                             // using limit offset,count
 
2081
    unit->offset_limit_cnt--;
 
2082
    return(0);
 
2083
  }
 
2084
  if (row_count++) 
 
2085
  {
 
2086
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
 
2087
    return(1);
 
2088
  }
 
2089
  while ((mv= var_li++) && (item= it++))
 
2090
  {
 
2091
    if (mv->local == 0)
 
2092
    {
 
2093
      Item_func_set_user_var *suv= new Item_func_set_user_var(mv->s, item);
 
2094
      suv->fix_fields(session, 0);
 
2095
      suv->check(0);
 
2096
      suv->update();
 
2097
    }
 
2098
  }
 
2099
  return(session->is_error());
 
2100
}
 
2101
 
 
2102
bool select_dumpvar::send_eof()
 
2103
{
 
2104
  if (! row_count)
 
2105
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
2106
                 ER_SP_FETCH_NO_DATA, ER(ER_SP_FETCH_NO_DATA));
 
2107
  /*
 
2108
    In order to remember the value of affected rows for ROW_COUNT()
 
2109
    function, SELECT INTO has to have an own SQLCOM.
 
2110
    TODO: split from SQLCOM_SELECT
 
2111
  */
 
2112
  ::my_ok(session,row_count);
 
2113
  return 0;
 
2114
}
 
2115
 
1559
2116
/****************************************************************************
1560
 
  Tmp_Table_Param
 
2117
  TMP_TABLE_PARAM
1561
2118
****************************************************************************/
1562
2119
 
1563
 
void Tmp_Table_Param::init()
 
2120
void TMP_TABLE_PARAM::init()
1564
2121
{
1565
2122
  field_count= sum_func_count= func_count= hidden_field_count= 0;
1566
2123
  group_parts= group_length= group_null_parts= 0;
1567
2124
  quick_group= 1;
1568
2125
  table_charset= 0;
1569
2126
  precomputed_group_by= 0;
 
2127
  bit_fields_as_long= 0;
 
2128
  return;
1570
2129
}
1571
2130
 
1572
 
void Tmp_Table_Param::cleanup(void)
 
2131
 
 
2132
void session_increment_bytes_sent(ulong length)
1573
2133
{
1574
 
  /* Fix for Intel compiler */
1575
 
  if (copy_field)
1576
 
  {
1577
 
    boost::checked_array_delete(copy_field);
1578
 
    save_copy_field= save_copy_field_end= copy_field= copy_field_end= 0;
 
2134
  Session *session=current_session;
 
2135
  if (likely(session != 0))
 
2136
  { /* current_session==0 when close_connection() calls net_send_error() */
 
2137
    session->status_var.bytes_sent+= length;
1579
2138
  }
1580
2139
}
1581
2140
 
 
2141
 
 
2142
void session_increment_bytes_received(ulong length)
 
2143
{
 
2144
  current_session->status_var.bytes_received+= length;
 
2145
}
 
2146
 
 
2147
 
 
2148
void session_increment_net_big_packet_count(ulong length)
 
2149
{
 
2150
  current_session->status_var.net_big_packet_count+= length;
 
2151
}
 
2152
 
1582
2153
void Session::send_kill_message() const
1583
2154
{
1584
 
  drizzled::error_t err= static_cast<drizzled::error_t>(killed_errno());
1585
 
  if (err != EE_OK)
 
2155
  int err= killed_errno();
 
2156
  if (err)
1586
2157
    my_message(err, ER(err), MYF(0));
1587
2158
}
1588
2159
 
1592
2163
}
1593
2164
 
1594
2165
 
1595
 
void Session::set_db(const std::string &new_db)
1596
 
{
1597
 
  /* Do not reallocate memory if current chunk is big enough. */
1598
 
  if (new_db.length())
1599
 
  {
1600
 
    _schema.reset(new std::string(new_db));
1601
 
  }
1602
 
  else
1603
 
  {
1604
 
    _schema.reset(new std::string(""));
1605
 
  }
 
2166
void Security_context::init()
 
2167
{
 
2168
  user= ip= 0;
 
2169
}
 
2170
 
 
2171
 
 
2172
void Security_context::destroy()
 
2173
{
 
2174
  // If not pointer to constant
 
2175
  if (user)
 
2176
  {
 
2177
    free(user);
 
2178
    user= NULL;
 
2179
  }
 
2180
  if (ip)
 
2181
  {
 
2182
    free(ip);
 
2183
    ip= NULL;
 
2184
  }
 
2185
}
 
2186
 
 
2187
 
 
2188
void Security_context::skip_grants()
 
2189
{
 
2190
  /* privileges for the user are unknown everything is allowed */
 
2191
}
 
2192
 
 
2193
 
 
2194
/****************************************************************************
 
2195
  Handling of open and locked tables states.
 
2196
 
 
2197
  This is used when we want to open/lock (and then close) some tables when
 
2198
  we already have a set of tables open and locked. We use these methods for
 
2199
  access to mysql.proc table to find definitions of stored routines.
 
2200
****************************************************************************/
 
2201
 
 
2202
void Session::reset_n_backup_open_tables_state(Open_tables_state *backup)
 
2203
{
 
2204
  backup->set_open_tables_state(this);
 
2205
  reset_open_tables_state();
 
2206
  state_flags|= Open_tables_state::BACKUPS_AVAIL;
 
2207
  return;
 
2208
}
 
2209
 
 
2210
 
 
2211
void Session::restore_backup_open_tables_state(Open_tables_state *backup)
 
2212
{
 
2213
  /*
 
2214
    Before we will throw away current open tables state we want
 
2215
    to be sure that it was properly cleaned up.
 
2216
  */
 
2217
  assert(open_tables == 0 && temporary_tables == 0 &&
 
2218
              handler_tables == 0 && derived_tables == 0 &&
 
2219
              lock == 0 && locked_tables == 0);
 
2220
  set_open_tables_state(backup);
 
2221
  return;
 
2222
}
 
2223
 
 
2224
/**
 
2225
  Check the killed state of a user thread
 
2226
  @param session  user thread
 
2227
  @retval 0 the user thread is active
 
2228
  @retval 1 the user thread has been killed
 
2229
*/
 
2230
extern "C" int session_killed(const Session *session)
 
2231
{
 
2232
  return(session->killed);
 
2233
}
 
2234
 
 
2235
/**
 
2236
  Return the thread id of a user thread
 
2237
  @param session user thread
 
2238
  @return thread id
 
2239
*/
 
2240
extern "C" unsigned long session_get_thread_id(const Session *session)
 
2241
{
 
2242
  return((unsigned long)session->thread_id);
 
2243
}
 
2244
 
 
2245
 
 
2246
extern "C"
 
2247
LEX_STRING *session_make_lex_string(Session *session, LEX_STRING *lex_str,
 
2248
                                const char *str, unsigned int size,
 
2249
                                int allocate_lex_string)
 
2250
{
 
2251
  return session->make_lex_string(lex_str, str, size,
 
2252
                              (bool) allocate_lex_string);
 
2253
}
 
2254
 
 
2255
extern "C" const struct charset_info_st *session_charset(Session *session)
 
2256
{
 
2257
  return(session->charset());
 
2258
}
 
2259
 
 
2260
extern "C" char **session_query(Session *session)
 
2261
{
 
2262
  return(&session->query);
 
2263
}
 
2264
 
 
2265
extern "C" int session_slave_thread(const Session *session)
 
2266
{
 
2267
  return(session->slave_thread);
 
2268
}
 
2269
 
 
2270
extern "C" int session_non_transactional_update(const Session *session)
 
2271
{
 
2272
  return(session->transaction.all.modified_non_trans_table);
 
2273
}
 
2274
 
 
2275
extern "C" int session_binlog_format(const Session *session)
 
2276
{
 
2277
  return (int) session->variables.binlog_format;
 
2278
}
 
2279
 
 
2280
extern "C" void session_mark_transaction_to_rollback(Session *session, bool all)
 
2281
{
 
2282
  mark_transaction_to_rollback(session, all);
1606
2283
}
1607
2284
 
1608
2285
 
1612
2289
  @param  session   Thread handle
1613
2290
  @param  all   true <=> rollback main transaction.
1614
2291
*/
1615
 
void Session::markTransactionForRollback(bool all)
1616
 
{
1617
 
  is_fatal_sub_stmt_error= true;
1618
 
  transaction_rollback_request= all;
1619
 
}
1620
 
 
1621
 
void Session::disconnect(enum error_t errcode)
1622
 
{
1623
 
  /* Allow any plugins to cleanup their session variables */
1624
 
  plugin_sessionvar_cleanup(this);
1625
 
 
1626
 
  /* If necessary, log any aborted or unauthorized connections */
1627
 
  if (getKilled() || client->wasAborted())
1628
 
  {
1629
 
    status_var.aborted_threads++;
1630
 
  }
1631
 
 
1632
 
  if (client->wasAborted())
1633
 
  {
1634
 
    if (not getKilled() && variables.log_warnings > 1)
1635
 
    {
1636
 
      errmsg_printf(error::WARN, ER(ER_NEW_ABORTING_CONNECTION)
1637
 
                  , thread_id
1638
 
                  , (_schema->empty() ? "unconnected" : _schema->c_str())
1639
 
                  , security_ctx->username().empty() == false ? security_ctx->username().c_str() : "unauthenticated"
1640
 
                  , security_ctx->address().c_str()
1641
 
                  , (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1642
 
    }
1643
 
  }
1644
 
 
1645
 
  setKilled(Session::KILL_CONNECTION);
1646
 
 
1647
 
  if (client->isConnected())
1648
 
  {
1649
 
    if (errcode != EE_OK)
1650
 
    {
1651
 
      /*my_error(errcode, ER(errcode));*/
1652
 
      client->sendError(errcode, ER(errcode));
1653
 
    }
1654
 
    client->close();
1655
 
  }
1656
 
}
1657
 
 
1658
 
void Session::reset_for_next_command()
1659
 
{
1660
 
  free_list= 0;
1661
 
  select_number= 1;
1662
 
  /*
1663
 
    Those two lines below are theoretically unneeded as
1664
 
    Session::cleanup_after_query() should take care of this already.
1665
 
  */
1666
 
  auto_inc_intervals_in_cur_stmt_for_binlog.empty();
1667
 
 
1668
 
  is_fatal_error= false;
1669
 
  server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1670
 
                          SERVER_QUERY_NO_INDEX_USED |
1671
 
                          SERVER_QUERY_NO_GOOD_INDEX_USED);
1672
 
 
1673
 
  clear_error();
1674
 
  main_da.reset_diagnostics_area();
1675
 
  total_warn_count=0;                   // Warnings for this query
1676
 
  sent_row_count= examined_row_count= 0;
1677
 
}
1678
 
 
1679
 
/*
1680
 
  Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1681
 
*/
1682
 
 
1683
 
void Open_tables_state::close_temporary_tables()
1684
 
{
1685
 
  Table *table;
1686
 
  Table *tmp_next;
1687
 
 
1688
 
  if (not temporary_tables)
1689
 
    return;
1690
 
 
1691
 
  for (table= temporary_tables; table; table= tmp_next)
1692
 
  {
1693
 
    tmp_next= table->getNext();
1694
 
    nukeTable(table);
1695
 
  }
1696
 
  temporary_tables= NULL;
1697
 
}
1698
 
 
1699
 
/*
1700
 
  unlink from session->temporary tables and close temporary table
1701
 
*/
1702
 
 
1703
 
void Open_tables_state::close_temporary_table(Table *table)
1704
 
{
1705
 
  if (table->getPrev())
1706
 
  {
1707
 
    table->getPrev()->setNext(table->getNext());
1708
 
    if (table->getPrev()->getNext())
1709
 
    {
1710
 
      table->getNext()->setPrev(table->getPrev());
1711
 
    }
1712
 
  }
1713
 
  else
1714
 
  {
1715
 
    /* removing the item from the list */
1716
 
    assert(table == temporary_tables);
1717
 
    /*
1718
 
      slave must reset its temporary list pointer to zero to exclude
1719
 
      passing non-zero value to end_slave via rli->save_temporary_tables
1720
 
      when no temp tables opened, see an invariant below.
1721
 
    */
1722
 
    temporary_tables= table->getNext();
1723
 
    if (temporary_tables)
1724
 
    {
1725
 
      table->getNext()->setPrev(NULL);
1726
 
    }
1727
 
  }
1728
 
  nukeTable(table);
1729
 
}
1730
 
 
1731
 
/*
1732
 
  Close and drop a temporary table
1733
 
 
1734
 
  NOTE
1735
 
  This dosn't unlink table from session->temporary
1736
 
  If this is needed, use close_temporary_table()
1737
 
*/
1738
 
 
1739
 
void Open_tables_state::nukeTable(Table *table)
1740
 
{
1741
 
  plugin::StorageEngine *table_type= table->getShare()->db_type();
1742
 
 
1743
 
  table->free_io_cache();
1744
 
  table->delete_table();
1745
 
 
1746
 
  identifier::Table identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1747
 
  rm_temporary_table(table_type, identifier);
1748
 
 
1749
 
  boost::checked_delete(table->getMutableShare());
1750
 
 
1751
 
  boost::checked_delete(table);
1752
 
}
1753
 
 
1754
 
/** Clear most status variables. */
1755
 
extern time_t flush_status_time;
1756
 
 
1757
 
void Session::refresh_status()
1758
 
{
1759
 
  /* Reset thread's status variables */
1760
 
  memset(&status_var, 0, sizeof(status_var));
1761
 
 
1762
 
  flush_status_time= time((time_t*) 0);
1763
 
  current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1764
 
  current_global_counters.connections= 0;
1765
 
}
1766
 
 
1767
 
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1768
 
{
1769
 
  return getVariable(std::string(name.str, name.length), create_if_not_exists);
1770
 
}
1771
 
 
1772
 
user_var_entry *Session::getVariable(const std::string  &name, bool create_if_not_exists)
1773
 
{
1774
 
  if (cleanup_done)
1775
 
    return NULL;
1776
 
 
1777
 
  UserVars::iterator iter= user_vars.find(name);
1778
 
  if (iter != user_vars.end())
1779
 
    return (*iter).second;
1780
 
 
1781
 
  if (not create_if_not_exists)
1782
 
    return NULL;
1783
 
 
1784
 
  user_var_entry *entry= NULL;
1785
 
  entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1786
 
 
1787
 
  if (entry == NULL)
1788
 
    return NULL;
1789
 
 
1790
 
  std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1791
 
 
1792
 
  if (not returnable.second)
1793
 
  {
1794
 
    boost::checked_delete(entry);
1795
 
  }
1796
 
 
1797
 
  return entry;
1798
 
}
1799
 
 
1800
 
void Session::setVariable(const std::string &name, const std::string &value)
1801
 
{
1802
 
  user_var_entry *updateable_var= getVariable(name.c_str(), true);
1803
 
  if (updateable_var)
1804
 
  {
1805
 
    updateable_var->update_hash(false,
1806
 
                                (void*)value.c_str(),
1807
 
                                static_cast<uint32_t>(value.length()), STRING_RESULT,
1808
 
                                &my_charset_bin,
1809
 
                                DERIVATION_IMPLICIT, false);
1810
 
  }
1811
 
}
1812
 
 
1813
 
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1814
 
{
1815
 
  for (Table *table= temporary_tables ; table ; table= table->getNext())
1816
 
  {
1817
 
    if (table->query_id == getQueryId())
1818
 
    {
1819
 
      table->query_id= 0;
1820
 
      table->cursor->ha_reset();
1821
 
    }
1822
 
  }
1823
 
}
1824
 
 
1825
 
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1826
 
{
1827
 
  for (; table ; table= table->getNext())
1828
 
  {
1829
 
    if (table->query_id == getQueryId())
1830
 
    {
1831
 
      table->query_id= 0;
1832
 
      table->cursor->ha_reset();
1833
 
    }
1834
 
  }
1835
 
}
1836
 
 
1837
 
/*
1838
 
  Unlocks tables and frees derived tables.
1839
 
  Put all normal tables used by thread in free list.
1840
 
 
1841
 
  It will only close/mark as free for reuse tables opened by this
1842
 
  substatement, it will also check if we are closing tables after
1843
 
  execution of complete query (i.e. we are on upper level) and will
1844
 
  leave prelocked mode if needed.
1845
 
*/
1846
 
void Session::close_thread_tables()
1847
 
{
1848
 
  clearDerivedTables();
1849
 
 
1850
 
  /*
1851
 
    Mark all temporary tables used by this statement as free for reuse.
1852
 
  */
1853
 
  mark_temp_tables_as_free_for_reuse();
1854
 
  /*
1855
 
    Let us commit transaction for statement. Since in 5.0 we only have
1856
 
    one statement transaction and don't allow several nested statement
1857
 
    transactions this call will do nothing if we are inside of stored
1858
 
    function or trigger (i.e. statement transaction is already active and
1859
 
    does not belong to statement for which we do close_thread_tables()).
1860
 
    TODO: This should be fixed in later releases.
1861
 
   */
1862
 
  {
1863
 
    TransactionServices &transaction_services= TransactionServices::singleton();
1864
 
    main_da.can_overwrite_status= true;
1865
 
    transaction_services.autocommitOrRollback(*this, is_error());
1866
 
    main_da.can_overwrite_status= false;
1867
 
    transaction.stmt.reset();
1868
 
  }
1869
 
 
1870
 
  if (lock)
1871
 
  {
1872
 
    /*
1873
 
      For RBR we flush the pending event just before we unlock all the
1874
 
      tables.  This means that we are at the end of a topmost
1875
 
      statement, so we ensure that the STMT_END_F flag is set on the
1876
 
      pending event.  For statements that are *inside* stored
1877
 
      functions, the pending event will not be flushed: that will be
1878
 
      handled either before writing a query log event (inside
1879
 
      binlog_query()) or when preparing a pending event.
1880
 
     */
1881
 
    unlockTables(lock);
1882
 
    lock= 0;
1883
 
  }
1884
 
  /*
1885
 
    Note that we need to hold table::Cache::singleton().mutex() while changing the
1886
 
    open_tables list. Another thread may work on it.
1887
 
    (See: table::Cache::singleton().removeTable(), wait_completed_table())
1888
 
    Closing a MERGE child before the parent would be fatal if the
1889
 
    other thread tries to abort the MERGE lock in between.
1890
 
  */
1891
 
  if (open_tables)
1892
 
    close_open_tables();
1893
 
}
1894
 
 
1895
 
void Session::close_tables_for_reopen(TableList **tables)
1896
 
{
1897
 
  /*
1898
 
    If table list consists only from tables from prelocking set, table list
1899
 
    for new attempt should be empty, so we have to update list's root pointer.
1900
 
  */
1901
 
  if (lex->first_not_own_table() == *tables)
1902
 
    *tables= 0;
1903
 
  lex->chop_off_not_own_tables();
1904
 
  for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1905
 
    tmp->table= 0;
1906
 
  close_thread_tables();
1907
 
}
1908
 
 
1909
 
bool Session::openTablesLock(TableList *tables)
1910
 
{
1911
 
  uint32_t counter;
1912
 
  bool need_reopen;
1913
 
 
1914
 
  for ( ; ; )
1915
 
  {
1916
 
    if (open_tables_from_list(&tables, &counter))
1917
 
      return true;
1918
 
 
1919
 
    if (not lock_tables(tables, counter, &need_reopen))
1920
 
      break;
1921
 
 
1922
 
    if (not need_reopen)
1923
 
      return true;
1924
 
 
1925
 
    close_tables_for_reopen(&tables);
1926
 
  }
1927
 
 
1928
 
  if ((handle_derived(lex, &derived_prepare) || (handle_derived(lex, &derived_filling))))
1929
 
    return true;
1930
 
 
1931
 
  return false;
1932
 
}
1933
 
 
1934
 
/*
1935
 
  @note "best_effort" is used in cases were if a failure occurred on this
1936
 
  operation it would not be surprising because we are only removing because there
1937
 
  might be an issue (lame engines).
1938
 
*/
1939
 
 
1940
 
bool Open_tables_state::rm_temporary_table(const identifier::Table &identifier, bool best_effort)
1941
 
{
1942
 
  if (not plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
1943
 
  {
1944
 
    if (not best_effort)
1945
 
    {
1946
 
      std::string path;
1947
 
      identifier.getSQLPath(path);
1948
 
      errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"),
1949
 
                    path.c_str(), errno);
1950
 
    }
1951
 
 
1952
 
    return true;
1953
 
  }
1954
 
 
1955
 
  return false;
1956
 
}
1957
 
 
1958
 
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const identifier::Table &identifier)
1959
 
{
1960
 
  drizzled::error_t error;
1961
 
  assert(base);
1962
 
 
1963
 
  if (not plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier, error))
1964
 
  {
1965
 
    std::string path;
1966
 
    identifier.getSQLPath(path);
1967
 
    errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"),
1968
 
                  path.c_str(), error);
1969
 
 
1970
 
    return true;
1971
 
  }
1972
 
 
1973
 
  return false;
1974
 
}
1975
 
 
1976
 
/**
1977
 
  @note this will be removed, I am looking through Hudson to see if it is finding
1978
 
  any tables that are missed during cleanup.
1979
 
*/
1980
 
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
1981
 
{
1982
 
  Table *table;
1983
 
 
1984
 
  if (not temporary_tables)
1985
 
    return;
1986
 
 
1987
 
  cerr << "Begin Run: " << foo << "\n";
1988
 
  for (table= temporary_tables; table; table= table->getNext())
1989
 
  {
1990
 
    bool have_proto= false;
1991
 
 
1992
 
    message::Table *proto= table->getShare()->getTableMessage();
1993
 
    if (table->getShare()->getTableMessage())
1994
 
      have_proto= true;
1995
 
 
1996
 
    const char *answer= have_proto ? "true" : "false";
1997
 
 
1998
 
    if (have_proto)
1999
 
    {
2000
 
      cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2001
 
      cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2002
 
    }
2003
 
    else
2004
 
    {
2005
 
      cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2006
 
    }
2007
 
  }
2008
 
}
2009
 
 
2010
 
table::Singular *Session::getInstanceTable()
2011
 
{
2012
 
  temporary_shares.push_back(new table::Singular()); // This will not go into the tableshare cache, so no key is used.
2013
 
 
2014
 
  table::Singular *tmp_share= temporary_shares.back();
2015
 
 
2016
 
  assert(tmp_share);
2017
 
 
2018
 
  return tmp_share;
2019
 
}
2020
 
 
2021
 
 
2022
 
/**
2023
 
  Create a reduced Table object with properly set up Field list from a
2024
 
  list of field definitions.
2025
 
 
2026
 
    The created table doesn't have a table Cursor associated with
2027
 
    it, has no keys, no group/distinct, no copy_funcs array.
2028
 
    The sole purpose of this Table object is to use the power of Field
2029
 
    class to read/write data to/from table->getInsertRecord(). Then one can store
2030
 
    the record in any container (RB tree, hash, etc).
2031
 
    The table is created in Session mem_root, so are the table's fields.
2032
 
    Consequently, if you don't BLOB fields, you don't need to free it.
2033
 
 
2034
 
  @param session         connection handle
2035
 
  @param field_list  list of column definitions
2036
 
 
2037
 
  @return
2038
 
    0 if out of memory, Table object in case of success
2039
 
*/
2040
 
table::Singular *Session::getInstanceTable(List<CreateField> &field_list)
2041
 
{
2042
 
  temporary_shares.push_back(new table::Singular(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2043
 
 
2044
 
  table::Singular *tmp_share= temporary_shares.back();
2045
 
 
2046
 
  assert(tmp_share);
2047
 
 
2048
 
  return tmp_share;
2049
 
}
2050
 
 
2051
 
namespace display  {
2052
 
 
2053
 
static const std::string NONE= "NONE";
2054
 
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2055
 
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2056
 
 
2057
 
const std::string &type(drizzled::Session::global_read_lock_t type)
2058
 
{
2059
 
  switch (type) {
2060
 
    default:
2061
 
    case Session::NONE:
2062
 
      return NONE;
2063
 
    case Session::GOT_GLOBAL_READ_LOCK:
2064
 
      return GOT_GLOBAL_READ_LOCK;
2065
 
    case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2066
 
      return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2067
 
  }
2068
 
}
2069
 
 
2070
 
size_t max_string_length(drizzled::Session::global_read_lock_t)
2071
 
{
2072
 
  return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2073
 
}
2074
 
 
2075
 
} /* namespace display */
2076
 
 
2077
 
} /* namespace drizzled */
 
2292
 
 
2293
void mark_transaction_to_rollback(Session *session, bool all)
 
2294
{
 
2295
  if (session)
 
2296
  {
 
2297
    session->is_fatal_sub_stmt_error= true;
 
2298
    session->transaction_rollback_request= all;
 
2299
  }
 
2300
}
 
2301
/***************************************************************************
 
2302
  Handling of XA id cacheing
 
2303
***************************************************************************/
 
2304
 
 
2305
pthread_mutex_t LOCK_xid_cache;
 
2306
HASH xid_cache;
 
2307
 
 
2308
extern "C" unsigned char *xid_get_hash_key(const unsigned char *, size_t *, bool);
 
2309
extern "C" void xid_free_hash(void *);
 
2310
 
 
2311
unsigned char *xid_get_hash_key(const unsigned char *ptr, size_t *length,
 
2312
                        bool not_used __attribute__((unused)))
 
2313
{
 
2314
  *length=((XID_STATE*)ptr)->xid.key_length();
 
2315
  return ((XID_STATE*)ptr)->xid.key();
 
2316
}
 
2317
 
 
2318
void xid_free_hash(void *ptr)
 
2319
{
 
2320
  if (!((XID_STATE*)ptr)->in_session)
 
2321
    free((unsigned char*)ptr);
 
2322
}
 
2323
 
 
2324
bool xid_cache_init()
 
2325
{
 
2326
  pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST);
 
2327
  return hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
 
2328
                   xid_get_hash_key, xid_free_hash, 0) != 0;
 
2329
}
 
2330
 
 
2331
void xid_cache_free()
 
2332
{
 
2333
  if (hash_inited(&xid_cache))
 
2334
  {
 
2335
    hash_free(&xid_cache);
 
2336
    pthread_mutex_destroy(&LOCK_xid_cache);
 
2337
  }
 
2338
}
 
2339
 
 
2340
XID_STATE *xid_cache_search(XID *xid)
 
2341
{
 
2342
  pthread_mutex_lock(&LOCK_xid_cache);
 
2343
  XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, xid->key(), xid->key_length());
 
2344
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2345
  return res;
 
2346
}
 
2347
 
 
2348
 
 
2349
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
 
2350
{
 
2351
  XID_STATE *xs;
 
2352
  bool res;
 
2353
  pthread_mutex_lock(&LOCK_xid_cache);
 
2354
  if (hash_search(&xid_cache, xid->key(), xid->key_length()))
 
2355
    res=0;
 
2356
  else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
 
2357
    res=1;
 
2358
  else
 
2359
  {
 
2360
    xs->xa_state=xa_state;
 
2361
    xs->xid.set(xid);
 
2362
    xs->in_session=0;
 
2363
    res=my_hash_insert(&xid_cache, (unsigned char*)xs);
 
2364
  }
 
2365
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2366
  return res;
 
2367
}
 
2368
 
 
2369
 
 
2370
bool xid_cache_insert(XID_STATE *xid_state)
 
2371
{
 
2372
  pthread_mutex_lock(&LOCK_xid_cache);
 
2373
  assert(hash_search(&xid_cache, xid_state->xid.key(),
 
2374
                          xid_state->xid.key_length())==0);
 
2375
  bool res=my_hash_insert(&xid_cache, (unsigned char*)xid_state);
 
2376
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2377
  return res;
 
2378
}
 
2379
 
 
2380
 
 
2381
void xid_cache_delete(XID_STATE *xid_state)
 
2382
{
 
2383
  pthread_mutex_lock(&LOCK_xid_cache);
 
2384
  hash_delete(&xid_cache, (unsigned char *)xid_state);
 
2385
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2386
}
 
2387
 
 
2388
/*
 
2389
  Implementation of interface to write rows to the binary log through the
 
2390
  thread.  The thread is responsible for writing the rows it has
 
2391
  inserted/updated/deleted.
 
2392
*/
 
2393
 
 
2394
 
 
2395
/*
 
2396
  Template member function for ensuring that there is an rows log
 
2397
  event of the apropriate type before proceeding.
 
2398
 
 
2399
  PRE CONDITION:
 
2400
    - Events of type 'RowEventT' have the type code 'type_code'.
 
2401
    
 
2402
  POST CONDITION:
 
2403
    If a non-NULL pointer is returned, the pending event for thread 'session' will
 
2404
    be an event of type 'RowEventT' (which have the type code 'type_code')
 
2405
    will either empty or have enough space to hold 'needed' bytes.  In
 
2406
    addition, the columns bitmap will be correct for the row, meaning that
 
2407
    the pending event will be flushed if the columns in the event differ from
 
2408
    the columns suppled to the function.
 
2409
 
 
2410
  RETURNS
 
2411
    If no error, a non-NULL pending event (either one which already existed or
 
2412
    the newly created one).
 
2413
    If error, NULL.
 
2414
 */
 
2415
 
 
2416
template <class RowsEventT> Rows_log_event* 
 
2417
Session::binlog_prepare_pending_rows_event(Table* table, uint32_t serv_id,
 
2418
                                       size_t needed,
 
2419
                                       bool is_transactional,
 
2420
                                       RowsEventT *hint __attribute__((unused)))
 
2421
{
 
2422
  /* Pre-conditions */
 
2423
  assert(table->s->table_map_id != UINT32_MAX);
 
2424
 
 
2425
  /* Fetch the type code for the RowsEventT template parameter */
 
2426
  int const type_code= RowsEventT::TYPE_CODE;
 
2427
 
 
2428
  /*
 
2429
    There is no good place to set up the transactional data, so we
 
2430
    have to do it here.
 
2431
  */
 
2432
  if (binlog_setup_trx_data())
 
2433
    return(NULL);
 
2434
 
 
2435
  Rows_log_event* pending= binlog_get_pending_rows_event();
 
2436
 
 
2437
  if (unlikely(pending && !pending->is_valid()))
 
2438
    return(NULL);
 
2439
 
 
2440
  /*
 
2441
    Check if the current event is non-NULL and a write-rows
 
2442
    event. Also check if the table provided is mapped: if it is not,
 
2443
    then we have switched to writing to a new table.
 
2444
    If there is no pending event, we need to create one. If there is a pending
 
2445
    event, but it's not about the same table id, or not of the same type
 
2446
    (between Write, Update and Delete), or not the same affected columns, or
 
2447
    going to be too big, flush this event to disk and create a new pending
 
2448
    event.
 
2449
 
 
2450
    The last test is necessary for the Cluster injector to work
 
2451
    correctly. The reason is that the Cluster can inject two write
 
2452
    rows with different column bitmaps if there is an insert followed
 
2453
    by an update in the same transaction, and these are grouped into a
 
2454
    single epoch/transaction when fed to the injector.
 
2455
 
 
2456
    TODO: Fix the code so that the last test can be removed.
 
2457
  */
 
2458
  if (!pending ||
 
2459
      pending->server_id != serv_id || 
 
2460
      pending->get_table_id() != table->s->table_map_id ||
 
2461
      pending->get_type_code() != type_code || 
 
2462
      pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
 
2463
      !bitmap_cmp(pending->get_cols(), table->write_set))
 
2464
    {
 
2465
    /* Create a new RowsEventT... */
 
2466
    Rows_log_event* const
 
2467
        ev= new RowsEventT(this, table, table->s->table_map_id,
 
2468
                           is_transactional);
 
2469
    if (unlikely(!ev))
 
2470
      return(NULL);
 
2471
    ev->server_id= serv_id; // I don't like this, it's too easy to forget.
 
2472
    /*
 
2473
      flush the pending event and replace it with the newly created
 
2474
      event...
 
2475
    */
 
2476
    if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev)))
 
2477
    {
 
2478
      delete ev;
 
2479
      return(NULL);
 
2480
    }
 
2481
 
 
2482
    return(ev);               /* This is the new pending event */
 
2483
  }
 
2484
  return(pending);        /* This is the current pending event */
 
2485
}
 
2486
 
 
2487
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
2488
/*
 
2489
  Instantiate the versions we need, we have -fno-implicit-template as
 
2490
  compiling option.
 
2491
*/
 
2492
template Rows_log_event*
 
2493
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2494
                                       Write_rows_log_event*);
 
2495
 
 
2496
template Rows_log_event*
 
2497
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2498
                                       Delete_rows_log_event *);
 
2499
 
 
2500
template Rows_log_event* 
 
2501
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2502
                                       Update_rows_log_event *);
 
2503
#endif
 
2504
 
 
2505
namespace {
 
2506
  /**
 
2507
     Class to handle temporary allocation of memory for row data.
 
2508
 
 
2509
     The responsibilities of the class is to provide memory for
 
2510
     packing one or two rows of packed data (depending on what
 
2511
     constructor is called).
 
2512
 
 
2513
     In order to make the allocation more efficient for "simple" rows,
 
2514
     i.e., rows that do not contain any blobs, a pointer to the
 
2515
     allocated memory is of memory is stored in the table structure
 
2516
     for simple rows.  If memory for a table containing a blob field
 
2517
     is requested, only memory for that is allocated, and subsequently
 
2518
     released when the object is destroyed.
 
2519
 
 
2520
   */
 
2521
  class Row_data_memory {
 
2522
  public:
 
2523
    /**
 
2524
      Build an object to keep track of a block-local piece of memory
 
2525
      for storing a row of data.
 
2526
 
 
2527
      @param table
 
2528
      Table where the pre-allocated memory is stored.
 
2529
 
 
2530
      @param length
 
2531
      Length of data that is needed, if the record contain blobs.
 
2532
     */
 
2533
    Row_data_memory(Table *table, size_t const len1)
 
2534
      : m_memory(0)
 
2535
    {
 
2536
      m_alloc_checked= false;
 
2537
      allocate_memory(table, len1);
 
2538
      m_ptr[0]= has_memory() ? m_memory : 0;
 
2539
      m_ptr[1]= 0;
 
2540
    }
 
2541
 
 
2542
    Row_data_memory(Table *table, size_t const len1, size_t const len2)
 
2543
      : m_memory(0)
 
2544
    {
 
2545
      m_alloc_checked= false;
 
2546
      allocate_memory(table, len1 + len2);
 
2547
      m_ptr[0]= has_memory() ? m_memory        : 0;
 
2548
      m_ptr[1]= has_memory() ? m_memory + len1 : 0;
 
2549
    }
 
2550
 
 
2551
    ~Row_data_memory()
 
2552
    {
 
2553
      if (m_memory != 0 && m_release_memory_on_destruction)
 
2554
        free((unsigned char*) m_memory);
 
2555
    }
 
2556
 
 
2557
    /**
 
2558
       Is there memory allocated?
 
2559
 
 
2560
       @retval true There is memory allocated
 
2561
       @retval false Memory allocation failed
 
2562
     */
 
2563
    bool has_memory() const {
 
2564
      m_alloc_checked= true;
 
2565
      return m_memory != 0;
 
2566
    }
 
2567
 
 
2568
    unsigned char *slot(uint32_t s)
 
2569
    {
 
2570
      assert(s < sizeof(m_ptr)/sizeof(*m_ptr));
 
2571
      assert(m_ptr[s] != 0);
 
2572
      assert(m_alloc_checked == true);
 
2573
      return m_ptr[s];
 
2574
    }
 
2575
 
 
2576
  private:
 
2577
    void allocate_memory(Table *const table, size_t const total_length)
 
2578
    {
 
2579
      if (table->s->blob_fields == 0)
 
2580
      {
 
2581
        /*
 
2582
          The maximum length of a packed record is less than this
 
2583
          length. We use this value instead of the supplied length
 
2584
          when allocating memory for records, since we don't know how
 
2585
          the memory will be used in future allocations.
 
2586
 
 
2587
          Since table->s->reclength is for unpacked records, we have
 
2588
          to add two bytes for each field, which can potentially be
 
2589
          added to hold the length of a packed field.
 
2590
        */
 
2591
        size_t const maxlen= table->s->reclength + 2 * table->s->fields;
 
2592
 
 
2593
        /*
 
2594
          Allocate memory for two records if memory hasn't been
 
2595
          allocated. We allocate memory for two records so that it can
 
2596
          be used when processing update rows as well.
 
2597
        */
 
2598
        if (table->write_row_record == 0)
 
2599
          table->write_row_record=
 
2600
            (unsigned char *) alloc_root(&table->mem_root, 2 * maxlen);
 
2601
        m_memory= table->write_row_record;
 
2602
        m_release_memory_on_destruction= false;
 
2603
      }
 
2604
      else
 
2605
      {
 
2606
        m_memory= (unsigned char *) my_malloc(total_length, MYF(MY_WME));
 
2607
        m_release_memory_on_destruction= true;
 
2608
      }
 
2609
    }
 
2610
 
 
2611
    mutable bool m_alloc_checked;
 
2612
    bool m_release_memory_on_destruction;
 
2613
    unsigned char *m_memory;
 
2614
    unsigned char *m_ptr[2];
 
2615
  };
 
2616
}
 
2617
 
 
2618
 
 
2619
int Session::binlog_write_row(Table* table, bool is_trans, 
 
2620
                          unsigned char const *record) 
 
2621
 
2622
  assert(mysql_bin_log.is_open());
 
2623
 
 
2624
  /*
 
2625
    Pack records into format for transfer. We are allocating more
 
2626
    memory than needed, but that doesn't matter.
 
2627
  */
 
2628
  Row_data_memory memory(table, table->max_row_length(record));
 
2629
  if (!memory.has_memory())
 
2630
    return HA_ERR_OUT_OF_MEM;
 
2631
 
 
2632
  unsigned char *row_data= memory.slot(0);
 
2633
 
 
2634
  size_t const len= pack_row(table, table->write_set, row_data, record);
 
2635
 
 
2636
  Rows_log_event* const ev=
 
2637
    binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
 
2638
                                      static_cast<Write_rows_log_event*>(0));
 
2639
 
 
2640
  if (unlikely(ev == 0))
 
2641
    return HA_ERR_OUT_OF_MEM;
 
2642
 
 
2643
  return ev->add_row_data(row_data, len);
 
2644
}
 
2645
 
 
2646
int Session::binlog_update_row(Table* table, bool is_trans,
 
2647
                           const unsigned char *before_record,
 
2648
                           const unsigned char *after_record)
 
2649
 
2650
  assert(mysql_bin_log.is_open());
 
2651
 
 
2652
  size_t const before_maxlen = table->max_row_length(before_record);
 
2653
  size_t const after_maxlen  = table->max_row_length(after_record);
 
2654
 
 
2655
  Row_data_memory row_data(table, before_maxlen, after_maxlen);
 
2656
  if (!row_data.has_memory())
 
2657
    return HA_ERR_OUT_OF_MEM;
 
2658
 
 
2659
  unsigned char *before_row= row_data.slot(0);
 
2660
  unsigned char *after_row= row_data.slot(1);
 
2661
 
 
2662
  size_t const before_size= pack_row(table, table->read_set, before_row,
 
2663
                                        before_record);
 
2664
  size_t const after_size= pack_row(table, table->write_set, after_row,
 
2665
                                       after_record);
 
2666
 
 
2667
  Rows_log_event* const ev=
 
2668
    binlog_prepare_pending_rows_event(table, server_id,
 
2669
                                      before_size + after_size, is_trans,
 
2670
                                      static_cast<Update_rows_log_event*>(0));
 
2671
 
 
2672
  if (unlikely(ev == 0))
 
2673
    return HA_ERR_OUT_OF_MEM;
 
2674
 
 
2675
  return
 
2676
    ev->add_row_data(before_row, before_size) ||
 
2677
    ev->add_row_data(after_row, after_size);
 
2678
}
 
2679
 
 
2680
int Session::binlog_delete_row(Table* table, bool is_trans, 
 
2681
                           unsigned char const *record)
 
2682
 
2683
  assert(mysql_bin_log.is_open());
 
2684
 
 
2685
  /* 
 
2686
     Pack records into format for transfer. We are allocating more
 
2687
     memory than needed, but that doesn't matter.
 
2688
  */
 
2689
  Row_data_memory memory(table, table->max_row_length(record));
 
2690
  if (unlikely(!memory.has_memory()))
 
2691
    return HA_ERR_OUT_OF_MEM;
 
2692
 
 
2693
  unsigned char *row_data= memory.slot(0);
 
2694
 
 
2695
  size_t const len= pack_row(table, table->read_set, row_data, record);
 
2696
 
 
2697
  Rows_log_event* const ev=
 
2698
    binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
 
2699
                                      static_cast<Delete_rows_log_event*>(0));
 
2700
 
 
2701
  if (unlikely(ev == 0))
 
2702
    return HA_ERR_OUT_OF_MEM;
 
2703
 
 
2704
  return ev->add_row_data(row_data, len);
 
2705
}
 
2706
 
 
2707
 
 
2708
int Session::binlog_flush_pending_rows_event(bool stmt_end)
 
2709
{
 
2710
  /*
 
2711
    We shall flush the pending event even if we are not in row-based
 
2712
    mode: it might be the case that we left row-based mode before
 
2713
    flushing anything (e.g., if we have explicitly locked tables).
 
2714
   */
 
2715
  if (!mysql_bin_log.is_open())
 
2716
    return(0);
 
2717
 
 
2718
  /*
 
2719
    Mark the event as the last event of a statement if the stmt_end
 
2720
    flag is set.
 
2721
  */
 
2722
  int error= 0;
 
2723
  if (Rows_log_event *pending= binlog_get_pending_rows_event())
 
2724
  {
 
2725
    if (stmt_end)
 
2726
    {
 
2727
      pending->set_flags(Rows_log_event::STMT_END_F);
 
2728
      pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 
2729
      binlog_table_maps= 0;
 
2730
    }
 
2731
 
 
2732
    error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
 
2733
  }
 
2734
 
 
2735
  return(error);
 
2736
}
 
2737
 
 
2738
 
 
2739
/*
 
2740
  Member function that will log query, either row-based or
 
2741
  statement-based depending on the value of the 'current_stmt_binlog_row_based'
 
2742
  the value of the 'qtype' flag.
 
2743
 
 
2744
  This function should be called after the all calls to ha_*_row()
 
2745
  functions have been issued, but before tables are unlocked and
 
2746
  closed.
 
2747
 
 
2748
  OBSERVE
 
2749
    There shall be no writes to any system table after calling
 
2750
    binlog_query(), so these writes has to be moved to before the call
 
2751
    of binlog_query() for correct functioning.
 
2752
 
 
2753
    This is necessesary not only for RBR, but the master might crash
 
2754
    after binlogging the query but before changing the system tables.
 
2755
    This means that the slave and the master are not in the same state
 
2756
    (after the master has restarted), so therefore we have to
 
2757
    eliminate this problem.
 
2758
 
 
2759
  RETURN VALUE
 
2760
    Error code, or 0 if no error.
 
2761
*/
 
2762
int Session::binlog_query(Session::enum_binlog_query_type qtype, char const *query_arg,
 
2763
                      ulong query_len, bool is_trans, bool suppress_use,
 
2764
                      Session::killed_state killed_status_arg)
 
2765
{
 
2766
  assert(query_arg && mysql_bin_log.is_open());
 
2767
 
 
2768
  if (int error= binlog_flush_pending_rows_event(true))
 
2769
    return(error);
 
2770
 
 
2771
  /*
 
2772
    If we are in statement mode and trying to log an unsafe statement,
 
2773
    we should print a warning.
 
2774
  */
 
2775
  if (lex->is_stmt_unsafe() &&
 
2776
      variables.binlog_format == BINLOG_FORMAT_STMT)
 
2777
  {
 
2778
    assert(this->query != NULL);
 
2779
    push_warning(this, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
2780
                 ER_BINLOG_UNSAFE_STATEMENT,
 
2781
                 ER(ER_BINLOG_UNSAFE_STATEMENT));
 
2782
    if (!(binlog_flags & BINLOG_FLAG_UNSAFE_STMT_PRINTED))
 
2783
    {
 
2784
      char warn_buf[DRIZZLE_ERRMSG_SIZE];
 
2785
      snprintf(warn_buf, DRIZZLE_ERRMSG_SIZE, "%s Statement: %s",
 
2786
               ER(ER_BINLOG_UNSAFE_STATEMENT), this->query);
 
2787
      sql_print_warning("%s",warn_buf);
 
2788
      binlog_flags|= BINLOG_FLAG_UNSAFE_STMT_PRINTED;
 
2789
    }
 
2790
  }
 
2791
 
 
2792
  switch (qtype) {
 
2793
  case Session::ROW_QUERY_TYPE:
 
2794
    return(0);
 
2795
  case Session::DRIZZLE_QUERY_TYPE:
 
2796
    /*
 
2797
      Using this query type is a conveniece hack, since we have been
 
2798
      moving back and forth between using RBR for replication of
 
2799
      system tables and not using it.
 
2800
 
 
2801
      Make sure to change in check_table_binlog_row_based() according
 
2802
      to how you treat this.
 
2803
    */
 
2804
  case Session::STMT_QUERY_TYPE:
 
2805
    /*
 
2806
      The DRIZZLE_LOG::write() function will set the STMT_END_F flag and
 
2807
      flush the pending rows event if necessary.
 
2808
     */
 
2809
    {
 
2810
      Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use,
 
2811
                            killed_status_arg);
 
2812
      qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 
2813
      /*
 
2814
        Binlog table maps will be irrelevant after a Query_log_event
 
2815
        (they are just removed on the slave side) so after the query
 
2816
        log event is written to the binary log, we pretend that no
 
2817
        table maps were written.
 
2818
       */
 
2819
      int error= mysql_bin_log.write(&qinfo);
 
2820
      binlog_table_maps= 0;
 
2821
      return(error);
 
2822
    }
 
2823
    break;
 
2824
 
 
2825
  case Session::QUERY_TYPE_COUNT:
 
2826
  default:
 
2827
    assert(0 <= qtype && qtype < QUERY_TYPE_COUNT);
 
2828
  }
 
2829
  return(0);
 
2830
}
 
2831
 
 
2832
bool Discrete_intervals_list::append(uint64_t start, uint64_t val,
 
2833
                                 uint64_t incr)
 
2834
{
 
2835
  /* first, see if this can be merged with previous */
 
2836
  if ((head == NULL) || tail->merge_if_contiguous(start, val, incr))
 
2837
  {
 
2838
    /* it cannot, so need to add a new interval */
 
2839
    Discrete_interval *new_interval= new Discrete_interval(start, val, incr);
 
2840
    return(append(new_interval));
 
2841
  }
 
2842
  return(0);
 
2843
}
 
2844
 
 
2845
bool Discrete_intervals_list::append(Discrete_interval *new_interval)
 
2846
{
 
2847
  if (unlikely(new_interval == NULL))
 
2848
    return(1);
 
2849
  if (head == NULL)
 
2850
    head= current= new_interval;
 
2851
  else
 
2852
    tail->next= new_interval;
 
2853
  tail= new_interval;
 
2854
  elements++;
 
2855
  return(0);
 
2856
}
 
2857
 
 
2858
/**
 
2859
  Close a connection.
 
2860
 
 
2861
  @param session                Thread handle
 
2862
  @param errcode        Error code to print to console
 
2863
  @param lock           1 if we have have to lock LOCK_thread_count
 
2864
 
 
2865
  @note
 
2866
    For the connection that is doing shutdown, this is called twice
 
2867
*/
 
2868
void close_connection(Session *session, uint32_t errcode, bool lock)
 
2869
{
 
2870
  st_vio *vio;
 
2871
  if (lock)
 
2872
    (void) pthread_mutex_lock(&LOCK_thread_count);
 
2873
  session->killed= Session::KILL_CONNECTION;
 
2874
  if ((vio= session->net.vio) != 0)
 
2875
  {
 
2876
    if (errcode)
 
2877
      net_send_error(session, errcode, ER(errcode)); /* purecov: inspected */
 
2878
    net_close(&(session->net));         /* vio is freed in delete session */
 
2879
  }
 
2880
  if (lock)
 
2881
    (void) pthread_mutex_unlock(&LOCK_thread_count);
 
2882
  return;;
 
2883
}