~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_class.cc

  • Committer: Brian Aker
  • Date: 2008-10-29 13:46:43 UTC
  • Revision ID: brian@tangent.org-20081029134643-z6jcwjvyruhk2vlu
Updates for ignore file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems, Inc.
5
 
 *
6
 
 *  This program is free software; you can redistribute it and/or modify
7
 
 *  it under the terms of the GNU General Public License as published by
8
 
 *  the Free Software Foundation; version 2 of the License.
9
 
 *
10
 
 *  This program is distributed in the hope that it will be useful,
11
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 *  GNU General Public License for more details.
14
 
 *
15
 
 *  You should have received a copy of the GNU General Public License
16
 
 *  along with this program; if not, write to the Free Software
17
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
 
 */
19
 
 
20
 
/**
21
 
 * @file Implementation of the Session class and API
22
 
 */
23
 
 
24
 
#include <config.h>
25
 
 
26
 
#include <drizzled/copy_field.h>
27
 
#include <drizzled/data_home.h>
28
 
#include <drizzled/display.h>
29
 
#include <drizzled/drizzled.h>
 
1
/* Copyright (C) 2000-2006 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/*****************************************************************************
 
18
**
 
19
** This file implements classes defined in sql_class.h
 
20
** Especially the classes to handle a result from a select
 
21
**
 
22
*****************************************************************************/
 
23
#include <drizzled/server_includes.h>
 
24
#include "rpl_rli.h"
 
25
#include "rpl_record.h"
 
26
#include "log_event.h"
 
27
#include <sys/stat.h>
 
28
#include <mysys/thr_alarm.h>
 
29
#include <mysys/mysys_err.h>
30
30
#include <drizzled/error.h>
31
 
#include <drizzled/gettext.h>
32
 
#include <drizzled/identifier.h>
33
 
#include <drizzled/internal/iocache.h>
34
 
#include <drizzled/internal/thread_var.h>
35
 
#include <drizzled/internal_error_handler.h>
36
 
#include <drizzled/item/cache.h>
37
 
#include <drizzled/item/empty_string.h>
38
 
#include <drizzled/item/float.h>
39
 
#include <drizzled/item/return_int.h>
40
 
#include <drizzled/lock.h>
41
 
#include <drizzled/plugin/authentication.h>
42
 
#include <drizzled/plugin/client.h>
43
 
#include <drizzled/plugin/event_observer.h>
44
 
#include <drizzled/plugin/logging.h>
45
 
#include <drizzled/plugin/query_rewrite.h>
46
 
#include <drizzled/plugin/scheduler.h>
47
 
#include <drizzled/plugin/transactional_storage_engine.h>
48
 
#include <drizzled/probes.h>
49
 
#include <drizzled/pthread_globals.h>
 
31
#include <drizzled/innodb_plugin_extras.h>
50
32
#include <drizzled/query_id.h>
51
 
#include <drizzled/refresh_version.h>
52
 
#include <drizzled/select_dump.h>
53
 
#include <drizzled/select_exists_subselect.h>
54
 
#include <drizzled/select_export.h>
55
 
#include <drizzled/select_max_min_finder_subselect.h>
56
 
#include <drizzled/select_singlerow_subselect.h>
57
 
#include <drizzled/select_subselect.h>
58
 
#include <drizzled/select_to_file.h>
59
 
#include <drizzled/session.h>
60
 
#include <drizzled/session/cache.h>
61
 
#include <drizzled/show.h>
62
 
#include <drizzled/sql_base.h>
63
 
#include <drizzled/table/singular.h>
64
 
#include <drizzled/table_proto.h>
65
 
#include <drizzled/tmp_table_param.h>
66
 
#include <drizzled/transaction_services.h>
67
 
#include <drizzled/user_var_entry.h>
68
 
#include <drizzled/util/functors.h>
69
 
#include <plugin/myisam/myisam.h>
70
 
 
71
 
#include <algorithm>
72
 
#include <climits>
73
 
#include <fcntl.h>
74
 
#include <sys/stat.h>
75
 
 
76
 
#include <boost/filesystem.hpp>
77
 
#include <boost/checked_delete.hpp>
78
 
 
79
 
#include <drizzled/util/backtrace.h>
80
 
 
81
 
#include <drizzled/schema.h>
82
 
 
83
 
using namespace std;
84
 
 
85
 
namespace fs=boost::filesystem;
86
 
namespace drizzled
87
 
{
88
33
 
89
34
/*
90
35
  The following is used to initialise Table_ident with a internal
95
40
 
96
41
const char * const Session::DEFAULT_WHERE= "field list";
97
42
 
 
43
 
 
44
/*****************************************************************************
 
45
** Instansiate templates
 
46
*****************************************************************************/
 
47
 
 
48
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
49
/* Used templates */
 
50
template class List<Key>;
 
51
template class List_iterator<Key>;
 
52
template class List<Key_part_spec>;
 
53
template class List_iterator<Key_part_spec>;
 
54
template class List<Alter_drop>;
 
55
template class List_iterator<Alter_drop>;
 
56
template class List<Alter_column>;
 
57
template class List_iterator<Alter_column>;
 
58
#endif
 
59
 
 
60
/****************************************************************************
 
61
** User variables
 
62
****************************************************************************/
 
63
 
 
64
extern "C" unsigned char *get_var_key(user_var_entry *entry, size_t *length,
 
65
                              bool not_used __attribute__((unused)))
 
66
{
 
67
  *length= entry->name.length;
 
68
  return (unsigned char*) entry->name.str;
 
69
}
 
70
 
 
71
extern "C" void free_user_var(user_var_entry *entry)
 
72
{
 
73
  char *pos= (char*) entry+ALIGN_SIZE(sizeof(*entry));
 
74
  if (entry->value && entry->value != pos)
 
75
    free(entry->value);
 
76
  free((char*) entry);
 
77
}
 
78
 
98
79
bool Key_part_spec::operator==(const Key_part_spec& other) const
99
80
{
100
81
  return length == other.length &&
101
82
         field_name.length == other.field_name.length &&
102
 
    !my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
103
 
}
104
 
 
105
 
Open_tables_state::Open_tables_state(uint64_t version_arg) :
106
 
  version(version_arg)
107
 
{
108
 
  open_tables= temporary_tables= derived_tables= NULL;
109
 
  extra_lock= lock= NULL;
 
83
         !strcmp(field_name.str, other.field_name.str);
 
84
}
 
85
 
 
86
/**
 
87
  Construct an (almost) deep copy of this key. Only those
 
88
  elements that are known to never change are not copied.
 
89
  If out of memory, a partial copy is returned and an error is set
 
90
  in Session.
 
91
*/
 
92
 
 
93
Key::Key(const Key &rhs, MEM_ROOT *mem_root)
 
94
  :type(rhs.type),
 
95
  key_create_info(rhs.key_create_info),
 
96
  columns(rhs.columns, mem_root),
 
97
  name(rhs.name),
 
98
  generated(rhs.generated)
 
99
{
 
100
  list_copy_and_replace_each_value(columns, mem_root);
 
101
}
 
102
 
 
103
/**
 
104
  Construct an (almost) deep copy of this foreign key. Only those
 
105
  elements that are known to never change are not copied.
 
106
  If out of memory, a partial copy is returned and an error is set
 
107
  in Session.
 
108
*/
 
109
 
 
110
Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root)
 
111
  :Key(rhs),
 
112
  ref_table(rhs.ref_table),
 
113
  ref_columns(rhs.ref_columns),
 
114
  delete_opt(rhs.delete_opt),
 
115
  update_opt(rhs.update_opt),
 
116
  match_opt(rhs.match_opt)
 
117
{
 
118
  list_copy_and_replace_each_value(ref_columns, mem_root);
 
119
}
 
120
 
 
121
/*
 
122
  Test if a foreign key (= generated key) is a prefix of the given key
 
123
  (ignoring key name, key type and order of columns)
 
124
 
 
125
  NOTES:
 
126
    This is only used to test if an index for a FOREIGN KEY exists
 
127
 
 
128
  IMPLEMENTATION
 
129
    We only compare field names
 
130
 
 
131
  RETURN
 
132
    0   Generated key is a prefix of other key
 
133
    1   Not equal
 
134
*/
 
135
 
 
136
bool foreign_key_prefix(Key *a, Key *b)
 
137
{
 
138
  /* Ensure that 'a' is the generated key */
 
139
  if (a->generated)
 
140
  {
 
141
    if (b->generated && a->columns.elements > b->columns.elements)
 
142
      std::swap(a, b);                       // Put shorter key in 'a'
 
143
  }
 
144
  else
 
145
  {
 
146
    if (!b->generated)
 
147
      return true;                              // No foreign key
 
148
    std::swap(a, b);                       // Put generated key in 'a'
 
149
  }
 
150
 
 
151
  /* Test if 'a' is a prefix of 'b' */
 
152
  if (a->columns.elements > b->columns.elements)
 
153
    return true;                                // Can't be prefix
 
154
 
 
155
  List_iterator<Key_part_spec> col_it1(a->columns);
 
156
  List_iterator<Key_part_spec> col_it2(b->columns);
 
157
  const Key_part_spec *col1, *col2;
 
158
 
 
159
#ifdef ENABLE_WHEN_INNODB_CAN_HANDLE_SWAPED_FOREIGN_KEY_COLUMNS
 
160
  while ((col1= col_it1++))
 
161
  {
 
162
    bool found= 0;
 
163
    col_it2.rewind();
 
164
    while ((col2= col_it2++))
 
165
    {
 
166
      if (*col1 == *col2)
 
167
      {
 
168
        found= true;
 
169
        break;
 
170
      }
 
171
    }
 
172
    if (!found)
 
173
      return true;                              // Error
 
174
  }
 
175
  return false;                                 // Is prefix
 
176
#else
 
177
  while ((col1= col_it1++))
 
178
  {
 
179
    col2= col_it2++;
 
180
    if (!(*col1 == *col2))
 
181
      return true;
 
182
  }
 
183
  return false;                                 // Is prefix
 
184
#endif
 
185
}
 
186
 
 
187
 
 
188
/*
 
189
  Check if the foreign key options are compatible with columns
 
190
  on which the FK is created.
 
191
 
 
192
  RETURN
 
193
    0   Key valid
 
194
    1   Key invalid
 
195
*/
 
196
bool Foreign_key::validate(List<Create_field> &table_fields)
 
197
{
 
198
  Create_field  *sql_field;
 
199
  Key_part_spec *column;
 
200
  List_iterator<Key_part_spec> cols(columns);
 
201
  List_iterator<Create_field> it(table_fields);
 
202
  while ((column= cols++))
 
203
  {
 
204
    it.rewind();
 
205
    while ((sql_field= it++) &&
 
206
           my_strcasecmp(system_charset_info,
 
207
                         column->field_name.str,
 
208
                         sql_field->field_name)) {}
 
209
    if (!sql_field)
 
210
    {
 
211
      my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name.str);
 
212
      return true;
 
213
    }
 
214
    if (type == Key::FOREIGN_KEY && sql_field->vcol_info)
 
215
    {
 
216
      if (delete_opt == FK_OPTION_SET_NULL)
 
217
      {
 
218
        my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0), 
 
219
                 "ON DELETE SET NULL");
 
220
        return true;
 
221
      }
 
222
      if (update_opt == FK_OPTION_SET_NULL)
 
223
      {
 
224
        my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0), 
 
225
                 "ON UPDATE SET NULL");
 
226
        return true;
 
227
      }
 
228
      if (update_opt == FK_OPTION_CASCADE)
 
229
      {
 
230
        my_error(ER_WRONG_FK_OPTION_FOR_VIRTUAL_COLUMN, MYF(0), 
 
231
                 "ON UPDATE CASCADE");
 
232
        return true;
 
233
      }
 
234
    }
 
235
  }
 
236
  return false;
 
237
}
 
238
 
 
239
 
 
240
/****************************************************************************
 
241
** Thread specific functions
 
242
****************************************************************************/
 
243
 
 
244
Open_tables_state::Open_tables_state(ulong version_arg)
 
245
  :version(version_arg), state_flags(0U)
 
246
{
 
247
  reset_open_tables_state();
110
248
}
111
249
 
112
250
/*
113
251
  The following functions form part of the C plugin API
114
252
*/
115
 
int tmpfile(const char *prefix)
 
253
 
 
254
extern "C" int mysql_tmpfile(const char *prefix)
116
255
{
117
256
  char filename[FN_REFLEN];
118
 
  int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
 
257
  File fd = create_temp_file(filename, mysql_tmpdir, prefix,
 
258
                             O_CREAT | O_EXCL | O_RDWR,
 
259
                             MYF(MY_WME));
119
260
  if (fd >= 0) {
120
261
    unlink(filename);
121
262
  }
123
264
  return fd;
124
265
}
125
266
 
126
 
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
127
 
{
128
 
  return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
129
 
}
130
 
 
131
 
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
132
 
                                             size_t index)
133
 
{
134
 
  return &ha_data[monitored->getId()].resource_context[index];
135
 
}
136
 
 
 
267
 
 
268
extern "C"
 
269
int session_in_lock_tables(const Session *session)
 
270
{
 
271
  return test(session->in_lock_tables);
 
272
}
 
273
 
 
274
 
 
275
extern "C"
 
276
int session_tablespace_op(const Session *session)
 
277
{
 
278
  return test(session->tablespace_op);
 
279
}
 
280
 
 
281
 
 
282
/**
 
283
   Set the process info field of the Session structure.
 
284
 
 
285
   This function is used by plug-ins. Internally, the
 
286
   Session::set_proc_info() function should be used.
 
287
 
 
288
   @see Session::set_proc_info
 
289
 */
 
290
extern "C" void
 
291
set_session_proc_info(Session *session, const char *info)
 
292
{
 
293
  session->set_proc_info(info);
 
294
}
 
295
 
 
296
extern "C"
 
297
const char *get_session_proc_info(Session *session)
 
298
{
 
299
  return session->get_proc_info();
 
300
}
 
301
 
 
302
extern "C"
 
303
void **session_ha_data(const Session *session, const struct handlerton *hton)
 
304
{
 
305
  return (void **) &session->ha_data[hton->slot].ha_ptr;
 
306
}
 
307
 
 
308
extern "C"
137
309
int64_t session_test_options(const Session *session, int64_t test_options)
138
310
{
139
311
  return session->options & test_options;
140
312
}
141
313
 
142
 
Session::Session(plugin::Client *client_arg, catalog::Instance::shared_ptr catalog_arg) :
143
 
  Open_tables_state(refresh_version),
144
 
  mem_root(&main_mem_root),
145
 
  xa_id(0),
146
 
  lex(&main_lex),
147
 
  query(new std::string),
148
 
  _schema(new std::string("")),
149
 
  client(client_arg),
150
 
  scheduler(NULL),
151
 
  scheduler_arg(NULL),
152
 
  lock_id(&main_lock_id),
153
 
  thread_stack(NULL),
154
 
  security_ctx(identifier::User::make_shared()),
155
 
  _where(Session::DEFAULT_WHERE),
156
 
  dbug_sentry(Session_SENTRY_MAGIC),
157
 
  mysys_var(0),
158
 
  command(COM_CONNECT),
159
 
  file_id(0),
160
 
  _epoch(boost::gregorian::date(1970,1,1)),
161
 
  _connect_time(boost::posix_time::microsec_clock::universal_time()),
162
 
  utime_after_lock(0),
163
 
  ha_data(plugin::num_trx_monitored_objects),
164
 
  query_id(0),
165
 
  warn_query_id(0),
166
 
  concurrent_execute_allowed(true),
167
 
  arg_of_last_insert_id_function(false),
168
 
  first_successful_insert_id_in_prev_stmt(0),
169
 
  first_successful_insert_id_in_cur_stmt(0),
170
 
  limit_found_rows(0),
171
 
  options(session_startup_options),
172
 
  row_count_func(-1),
173
 
  sent_row_count(0),
174
 
  examined_row_count(0),
175
 
  used_tables(0),
176
 
  total_warn_count(0),
177
 
  col_access(0),
178
 
  statement_id_counter(0),
179
 
  row_count(0),
180
 
  thread_id(0),
181
 
  tmp_table(0),
182
 
  _global_read_lock(NONE),
183
 
  count_cuted_fields(CHECK_FIELD_ERROR_FOR_NULL),
184
 
  _killed(NOT_KILLED),
185
 
  some_tables_deleted(false),
186
 
  no_errors(false),
187
 
  password(false),
188
 
  is_fatal_error(false),
189
 
  transaction_rollback_request(false),
190
 
  is_fatal_sub_stmt_error(0),
191
 
  tablespace_op(false),
192
 
  derived_tables_processing(false),
193
 
  m_lip(NULL),
194
 
  cached_table(0),
195
 
  transaction_message(NULL),
196
 
  statement_message(NULL),
197
 
  session_event_observers(NULL),
198
 
  _catalog(catalog_arg),
199
 
  use_usage(false)
200
 
{
201
 
  client->setSession(this);
 
314
extern "C"
 
315
int session_sql_command(const Session *session)
 
316
{
 
317
  return (int) session->lex->sql_command;
 
318
}
 
319
 
 
320
extern "C"
 
321
int session_tx_isolation(const Session *session)
 
322
{
 
323
  return (int) session->variables.tx_isolation;
 
324
}
 
325
 
 
326
extern "C"
 
327
void session_inc_row_count(Session *session)
 
328
{
 
329
  session->row_count++;
 
330
}
 
331
 
 
332
/**
 
333
  Clear this diagnostics area. 
 
334
 
 
335
  Normally called at the end of a statement.
 
336
*/
 
337
 
 
338
void
 
339
Diagnostics_area::reset_diagnostics_area()
 
340
{
 
341
  can_overwrite_status= false;
 
342
  /** Don't take chances in production */
 
343
  m_message[0]= '\0';
 
344
  m_sql_errno= 0;
 
345
  m_server_status= 0;
 
346
  m_affected_rows= 0;
 
347
  m_last_insert_id= 0;
 
348
  m_total_warn_count= 0;
 
349
  is_sent= false;
 
350
  /** Tiny reset in debug mode to see garbage right away */
 
351
  m_status= DA_EMPTY;
 
352
}
 
353
 
 
354
 
 
355
/**
 
356
  Set OK status -- ends commands that do not return a
 
357
  result set, e.g. INSERT/UPDATE/DELETE.
 
358
*/
 
359
 
 
360
void
 
361
Diagnostics_area::set_ok_status(Session *session, ha_rows affected_rows_arg,
 
362
                                uint64_t last_insert_id_arg,
 
363
                                const char *message_arg)
 
364
{
 
365
  assert(! is_set());
 
366
  /*
 
367
    In production, refuse to overwrite an error or a custom response
 
368
    with an OK packet.
 
369
  */
 
370
  if (is_error() || is_disabled())
 
371
    return;
 
372
  /** Only allowed to report success if has not yet reported an error */
 
373
 
 
374
  m_server_status= session->server_status;
 
375
  m_total_warn_count= session->total_warn_count;
 
376
  m_affected_rows= affected_rows_arg;
 
377
  m_last_insert_id= last_insert_id_arg;
 
378
  if (message_arg)
 
379
    strmake(m_message, message_arg, sizeof(m_message) - 1);
 
380
  else
 
381
    m_message[0]= '\0';
 
382
  m_status= DA_OK;
 
383
}
 
384
 
 
385
 
 
386
/**
 
387
  Set EOF status.
 
388
*/
 
389
 
 
390
void
 
391
Diagnostics_area::set_eof_status(Session *session)
 
392
{
 
393
  /** Only allowed to report eof if has not yet reported an error */
 
394
 
 
395
  assert(! is_set());
 
396
  /*
 
397
    In production, refuse to overwrite an error or a custom response
 
398
    with an EOF packet.
 
399
  */
 
400
  if (is_error() || is_disabled())
 
401
    return;
 
402
 
 
403
  m_server_status= session->server_status;
 
404
  /*
 
405
    If inside a stored procedure, do not return the total
 
406
    number of warnings, since they are not available to the client
 
407
    anyway.
 
408
  */
 
409
  m_total_warn_count= session->total_warn_count;
 
410
 
 
411
  m_status= DA_EOF;
 
412
}
 
413
 
 
414
/**
 
415
  Set ERROR status.
 
416
*/
 
417
 
 
418
void
 
419
Diagnostics_area::set_error_status(Session *session __attribute__((unused)),
 
420
                                   uint32_t sql_errno_arg,
 
421
                                   const char *message_arg)
 
422
{
 
423
  /*
 
424
    Only allowed to report error if has not yet reported a success
 
425
    The only exception is when we flush the message to the client,
 
426
    an error can happen during the flush.
 
427
  */
 
428
  assert(! is_set() || can_overwrite_status);
 
429
  /*
 
430
    In production, refuse to overwrite a custom response with an
 
431
    ERROR packet.
 
432
  */
 
433
  if (is_disabled())
 
434
    return;
 
435
 
 
436
  m_sql_errno= sql_errno_arg;
 
437
  strmake(m_message, message_arg, sizeof(m_message) - 1);
 
438
 
 
439
  m_status= DA_ERROR;
 
440
}
 
441
 
 
442
 
 
443
/**
 
444
  Mark the diagnostics area as 'DISABLED'.
 
445
 
 
446
  This is used in rare cases when the COM_ command at hand sends a response
 
447
  in a custom format. One example is the query cache, another is
 
448
  COM_STMT_PREPARE.
 
449
*/
 
450
 
 
451
void
 
452
Diagnostics_area::disable_status()
 
453
{
 
454
  assert(! is_set());
 
455
  m_status= DA_DISABLED;
 
456
}
 
457
 
 
458
 
 
459
Session::Session()
 
460
   :Statement(&main_lex, &main_mem_root,
 
461
              /* statement id */ 0),
 
462
   Open_tables_state(refresh_version), rli_fake(0),
 
463
   lock_id(&main_lock_id),
 
464
   user_time(0),
 
465
   binlog_table_maps(0), binlog_flags(0UL),
 
466
   arg_of_last_insert_id_function(false),
 
467
   first_successful_insert_id_in_prev_stmt(0),
 
468
   first_successful_insert_id_in_prev_stmt_for_binlog(0),
 
469
   first_successful_insert_id_in_cur_stmt(0),
 
470
   stmt_depends_on_first_successful_insert_id_in_prev_stmt(false),
 
471
   global_read_lock(0),
 
472
   is_fatal_error(0),
 
473
   transaction_rollback_request(0),
 
474
   is_fatal_sub_stmt_error(0),
 
475
   rand_used(0),
 
476
   time_zone_used(0),
 
477
   in_lock_tables(0),
 
478
   derived_tables_processing(false),
 
479
   m_lip(NULL)
 
480
{
 
481
  ulong tmp;
202
482
 
203
483
  /*
204
484
    Pass nominal parameters to init_alloc_root only to ensure that
205
485
    the destructor works OK in case of an error. The main_mem_root
206
486
    will be re-initialized in init_for_queries().
207
487
  */
208
 
  memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
 
488
  init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
 
489
  thread_stack= 0;
 
490
  catalog= (char*)"std"; // the only catalog we have for now
 
491
  main_security_ctx.init();
 
492
  security_ctx= &main_security_ctx;
 
493
  some_tables_deleted=no_errors=password= 0;
 
494
  query_start_used= 0;
 
495
  count_cuted_fields= CHECK_FIELD_IGNORE;
 
496
  killed= NOT_KILLED;
 
497
  col_access=0;
 
498
  is_slave_error= thread_specific_used= false;
 
499
  hash_clear(&handler_tables_hash);
 
500
  tmp_table=0;
 
501
  used_tables=0;
209
502
  cuted_fields= sent_row_count= row_count= 0L;
 
503
  limit_found_rows= 0;
 
504
  row_count_func= -1;
 
505
  statement_id_counter= 0UL;
210
506
  // Must be reset to handle error with Session's created for init of mysqld
211
507
  lex->current_select= 0;
 
508
  start_time=(time_t) 0;
 
509
  start_utime= 0L;
 
510
  utime_after_lock= 0L;
 
511
  current_linfo =  0;
 
512
  slave_thread = 0;
212
513
  memset(&variables, 0, sizeof(variables));
213
 
  scoreboard_index= -1;
214
 
  cleanup_done= abort_on_warning= no_warnings_for_error= false;  
215
 
 
216
 
  /* query_cache init */
217
 
  query_cache_key= "";
218
 
  resultset= NULL;
 
514
  thread_id= 0;
 
515
  one_shot_set= 0;
 
516
  file_id = 0;
 
517
  query_id= 0;
 
518
  warn_id= 0;
 
519
  db_charset= global_system_variables.collation_database;
 
520
  memset(ha_data, 0, sizeof(ha_data));
 
521
  mysys_var=0;
 
522
  binlog_evt_union.do_union= false;
 
523
  dbug_sentry=Session_SENTRY_MAGIC;
 
524
  net.vio=0;
 
525
  client_capabilities= 0;                       // minimalistic client
 
526
  system_thread= NON_SYSTEM_THREAD;
 
527
  cleanup_done= abort_on_warning= no_warnings_for_error= 0;
 
528
  peer_port= 0;                                 // For SHOW PROCESSLIST
 
529
  transaction.m_pending_rows_event= 0;
 
530
  transaction.on= 1;
 
531
  pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
219
532
 
220
533
  /* Variables with default values */
221
534
  proc_info="login";
222
 
 
223
 
  plugin_sessionvar_init(this);
224
 
  /*
225
 
    variables= global_system_variables above has reset
226
 
    variables.pseudo_thread_id to 0. We need to correct it here to
227
 
    avoid temporary tables replication failure.
228
 
  */
229
 
  variables.pseudo_thread_id= thread_id;
230
 
  server_status= SERVER_STATUS_AUTOCOMMIT;
231
 
 
232
 
  if (variables.max_join_size == HA_POS_ERROR)
233
 
    options |= OPTION_BIG_SELECTS;
234
 
  else
235
 
    options &= ~OPTION_BIG_SELECTS;
236
 
 
237
 
  open_options=ha_open_options;
238
 
  update_lock_default= TL_WRITE;
239
 
  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
240
 
  warn_list.clear();
241
 
  memset(warn_count, 0, sizeof(warn_count));
242
 
  memset(&status_var, 0, sizeof(status_var));
243
 
 
 
535
  where= Session::DEFAULT_WHERE;
 
536
  server_id = ::server_id;
 
537
  slave_net = 0;
 
538
  command=COM_CONNECT;
 
539
  *scramble= '\0';
 
540
 
 
541
  init();
244
542
  /* Initialize sub structures */
245
 
  memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
246
 
 
 
543
  init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
 
544
  user_connect=(USER_CONN *)0;
 
545
  hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
 
546
            (hash_get_key) get_var_key,
 
547
            (hash_free_key) free_user_var, 0);
 
548
 
 
549
  /* For user vars replication*/
 
550
  if (opt_bin_log)
 
551
    my_init_dynamic_array(&user_var_events,
 
552
                          sizeof(BINLOG_USER_VAR_EVENT *), 16, 16);
 
553
  else
 
554
    memset(&user_var_events, 0, sizeof(user_var_events));
 
555
 
 
556
  /* Protocol */
 
557
  protocol= &protocol_text;                     // Default protocol
 
558
  protocol_text.init(this);
 
559
 
 
560
  const Query_id& query_id= Query_id::get_query_id();
 
561
  tablespace_op= false;
 
562
  tmp= sql_rnd();
 
563
  randominit(&rand, tmp + (ulong) &rand, tmp + query_id.value());
247
564
  substitute_null_with_insert_id = false;
248
 
  lock_info.init(); /* safety: will be reset after start */
 
565
  thr_lock_info_init(&lock_info); /* safety: will be reset after start */
249
566
  thr_lock_owner_init(&main_lock_id, &lock_info);
250
567
 
251
568
  m_internal_handler= NULL;
252
 
  
253
 
  plugin::EventObserver::registerSessionEvents(*this); 
254
569
}
255
570
 
256
 
void Session::free_items()
257
 
{
258
 
  Item *next;
259
 
  /* This works because items are allocated with memory::sql_alloc() */
260
 
  for (; free_list; free_list= next)
261
 
  {
262
 
    next= free_list->next;
263
 
    free_list->delete_self();
264
 
  }
265
 
}
266
571
 
267
572
void Session::push_internal_handler(Internal_error_handler *handler)
268
573
{
274
579
  m_internal_handler= handler;
275
580
}
276
581
 
277
 
bool Session::handle_error(drizzled::error_t sql_errno, const char *message,
278
 
                           DRIZZLE_ERROR::enum_warning_level level)
 
582
 
 
583
bool Session::handle_error(uint32_t sql_errno, const char *message,
 
584
                       DRIZZLE_ERROR::enum_warning_level level)
279
585
{
280
586
  if (m_internal_handler)
281
587
  {
285
591
  return false;                                 // 'false', as per coding style
286
592
}
287
593
 
288
 
void Session::setAbort(bool arg)
289
 
{
290
 
  mysys_var->abort= arg;
291
 
}
292
 
 
293
 
void Session::lockOnSys()
294
 
{
295
 
  if (not mysys_var)
296
 
    return;
297
 
 
298
 
  setAbort(true);
299
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
300
 
  if (mysys_var->current_cond)
301
 
  {
302
 
    mysys_var->current_mutex->lock();
303
 
    mysys_var->current_cond->notify_all();
304
 
    mysys_var->current_mutex->unlock();
305
 
  }
306
 
}
307
594
 
308
595
void Session::pop_internal_handler()
309
596
{
311
598
  m_internal_handler= NULL;
312
599
}
313
600
 
314
 
void Session::get_xid(DrizzleXid *xid)
315
 
{
316
 
  *xid = *(DrizzleXid *) &transaction.xid_state.xid;
317
 
}
 
601
extern "C"
 
602
void *session_alloc(Session *session, unsigned int size)
 
603
{
 
604
  return session->alloc(size);
 
605
}
 
606
 
 
607
extern "C"
 
608
void *session_calloc(Session *session, unsigned int size)
 
609
{
 
610
  return session->calloc(size);
 
611
}
 
612
 
 
613
extern "C"
 
614
char *session_strdup(Session *session, const char *str)
 
615
{
 
616
  return session->strdup(str);
 
617
}
 
618
 
 
619
extern "C"
 
620
char *session_strmake(Session *session, const char *str, unsigned int size)
 
621
{
 
622
  return session->strmake(str, size);
 
623
}
 
624
 
 
625
extern "C"
 
626
LEX_STRING *session_make_lex_string(Session *session, LEX_STRING *lex_str,
 
627
                                const char *str, unsigned int size,
 
628
                                int allocate_lex_string)
 
629
{
 
630
  return session->make_lex_string(lex_str, str, size,
 
631
                              (bool) allocate_lex_string);
 
632
}
 
633
 
 
634
extern "C"
 
635
void *session_memdup(Session *session, const void* str, unsigned int size)
 
636
{
 
637
  return session->memdup(str, size);
 
638
}
 
639
 
 
640
extern "C"
 
641
void session_get_xid(const Session *session, DRIZZLE_XID *xid)
 
642
{
 
643
  *xid = *(DRIZZLE_XID *) &session->transaction.xid_state.xid;
 
644
}
 
645
 
 
646
/*
 
647
  Init common variables that has to be reset on start and on change_user
 
648
*/
 
649
 
 
650
void Session::init(void)
 
651
{
 
652
  pthread_mutex_lock(&LOCK_global_system_variables);
 
653
  plugin_sessionvar_init(this);
 
654
  variables.time_format= date_time_format_copy((Session*) 0,
 
655
                                               variables.time_format);
 
656
  variables.date_format= date_time_format_copy((Session*) 0,
 
657
                                               variables.date_format);
 
658
  variables.datetime_format= date_time_format_copy((Session*) 0,
 
659
                                                   variables.datetime_format);
 
660
  /*
 
661
    variables= global_system_variables above has reset
 
662
    variables.pseudo_thread_id to 0. We need to correct it here to
 
663
    avoid temporary tables replication failure.
 
664
  */
 
665
  variables.pseudo_thread_id= thread_id;
 
666
  pthread_mutex_unlock(&LOCK_global_system_variables);
 
667
  server_status= SERVER_STATUS_AUTOCOMMIT;
 
668
  options= session_startup_options;
 
669
 
 
670
  if (variables.max_join_size == HA_POS_ERROR)
 
671
    options |= OPTION_BIG_SELECTS;
 
672
  else
 
673
    options &= ~OPTION_BIG_SELECTS;
 
674
 
 
675
  transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= false;
 
676
  open_options=ha_open_options;
 
677
  update_lock_default= (variables.low_priority_updates ?
 
678
                        TL_WRITE_LOW_PRIORITY :
 
679
                        TL_WRITE);
 
680
  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
 
681
  warn_list.empty();
 
682
  memset(warn_count, 0, sizeof(warn_count));
 
683
  total_warn_count= 0;
 
684
  update_charset();
 
685
  reset_current_stmt_binlog_row_based();
 
686
  memset(&status_var, 0, sizeof(status_var));
 
687
}
 
688
 
 
689
 
 
690
/*
 
691
  Init Session for query processing.
 
692
  This has to be called once before we call mysql_parse.
 
693
  See also comments in sql_class.h.
 
694
*/
 
695
 
 
696
void Session::init_for_queries()
 
697
{
 
698
  set_time(); 
 
699
  ha_enable_transaction(this,true);
 
700
 
 
701
  reset_root_defaults(mem_root, variables.query_alloc_block_size,
 
702
                      variables.query_prealloc_size);
 
703
  reset_root_defaults(&transaction.mem_root,
 
704
                      variables.trans_alloc_block_size,
 
705
                      variables.trans_prealloc_size);
 
706
  transaction.xid_state.xid.null();
 
707
  transaction.xid_state.in_session=1;
 
708
}
 
709
 
318
710
 
319
711
/* Do operations that may take a long time */
320
712
 
321
713
void Session::cleanup(void)
322
714
{
323
 
  assert(cleanup_done == false);
 
715
  assert(cleanup_done == 0);
324
716
 
325
 
  setKilled(KILL_CONNECTION);
 
717
  killed= KILL_CONNECTION;
326
718
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
327
719
  if (transaction.xid_state.xa_state == XA_PREPARED)
328
720
  {
330
722
  }
331
723
#endif
332
724
  {
333
 
    TransactionServices &transaction_services= TransactionServices::singleton();
334
 
    transaction_services.rollbackTransaction(*this, true);
 
725
    ha_rollback(this);
335
726
    xid_cache_delete(&transaction.xid_state);
336
727
  }
337
 
 
338
 
  for (UserVars::iterator iter= user_vars.begin();
339
 
       iter != user_vars.end();
340
 
       iter++)
 
728
  if (locked_tables)
341
729
  {
342
 
    user_var_entry *entry= (*iter).second;
343
 
    boost::checked_delete(entry);
 
730
    lock=locked_tables; locked_tables=0;
 
731
    close_thread_tables(this);
344
732
  }
345
 
  user_vars.clear();
346
 
 
347
 
 
348
 
  close_temporary_tables();
349
 
 
 
733
  mysql_ha_cleanup(this);
 
734
  delete_dynamic(&user_var_events);
 
735
  hash_free(&user_vars);
 
736
  close_temporary_tables(this);
 
737
  free((char*) variables.time_format);
 
738
  free((char*) variables.date_format);
 
739
  free((char*) variables.datetime_format);
 
740
  
350
741
  if (global_read_lock)
351
 
  {
352
 
    unlockGlobalReadLock();
353
 
  }
 
742
    unlock_global_read_lock(this);
354
743
 
355
 
  cleanup_done= true;
 
744
  cleanup_done=1;
 
745
  return;
356
746
}
357
747
 
358
748
Session::~Session()
359
749
{
360
 
  this->checkSentry();
361
 
 
362
 
  if (client and client->isConnected())
363
 
  {
364
 
    assert(security_ctx);
365
 
    if (global_system_variables.log_warnings)
366
 
    {
367
 
      errmsg_printf(error::WARN, ER(ER_FORCING_CLOSE),
368
 
                    internal::my_progname,
369
 
                    thread_id,
370
 
                    security_ctx->username().c_str());
371
 
    }
372
 
 
373
 
    disconnect();
374
 
  }
 
750
  Session_CHECK_SENTRY(this);
 
751
  /* Ensure that no one is using Session */
 
752
  pthread_mutex_lock(&LOCK_delete);
 
753
  pthread_mutex_unlock(&LOCK_delete);
 
754
  add_to_status(&global_status_var, &status_var);
375
755
 
376
756
  /* Close connection */
377
 
  if (client)
 
757
  if (net.vio)
378
758
  {
379
 
    client->close();
380
 
    boost::checked_delete(client);
381
 
    client= NULL;
 
759
    net_close(&net);
 
760
    net_end(&net);
382
761
  }
383
 
 
384
 
  if (cleanup_done == false)
 
762
  if (!cleanup_done)
385
763
    cleanup();
386
764
 
387
 
  plugin::StorageEngine::closeConnection(this);
 
765
  ha_close_connection(this);
388
766
  plugin_sessionvar_cleanup(this);
389
767
 
390
 
  warn_root.free_root(MYF(0));
 
768
  main_security_ctx.destroy();
 
769
  if (db)
 
770
  {
 
771
    free(db);
 
772
    db= NULL;
 
773
  }
 
774
  free_root(&warn_root,MYF(0));
 
775
  free_root(&transaction.mem_root,MYF(0));
391
776
  mysys_var=0;                                  // Safety (shouldn't be needed)
 
777
  pthread_mutex_destroy(&LOCK_delete);
392
778
  dbug_sentry= Session_SENTRY_GONE;
393
 
 
394
 
  main_mem_root.free_root(MYF(0));
395
 
  currentMemRoot().release();
396
 
  currentSession().release();
397
 
 
398
 
  plugin::Logging::postEndDo(this);
399
 
  plugin::EventObserver::deregisterSessionEvents(*this); 
400
 
}
401
 
 
402
 
void Session::setClient(plugin::Client *client_arg)
403
 
{
404
 
  client= client_arg;
405
 
  client->setSession(this);
406
 
}
407
 
 
408
 
void Session::awake(Session::killed_state_t state_to_set)
409
 
{
410
 
  if ((state_to_set == Session::KILL_QUERY) and (command == COM_SLEEP))
411
 
    return;
412
 
 
413
 
  this->checkSentry();
414
 
 
415
 
  setKilled(state_to_set);
416
 
  scheduler->killSession(this);
417
 
 
 
779
  if (rli_fake)
 
780
  {
 
781
    delete rli_fake;
 
782
    rli_fake= NULL;
 
783
  }
 
784
  
 
785
  free_root(&main_mem_root, MYF(0));
 
786
  return;
 
787
}
 
788
 
 
789
 
 
790
/*
 
791
  Add all status variables to another status variable array
 
792
 
 
793
  SYNOPSIS
 
794
   add_to_status()
 
795
   to_var       add to this array
 
796
   from_var     from this array
 
797
 
 
798
  NOTES
 
799
    This function assumes that all variables are long/ulong.
 
800
    If this assumption will change, then we have to explictely add
 
801
    the other variables after the while loop
 
802
*/
 
803
 
 
804
void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var)
 
805
{
 
806
  ulong *end= (ulong*) ((unsigned char*) to_var +
 
807
                        offsetof(STATUS_VAR, last_system_status_var) +
 
808
                        sizeof(ulong));
 
809
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var;
 
810
 
 
811
  while (to != end)
 
812
    *(to++)+= *(from++);
 
813
}
 
814
 
 
815
/*
 
816
  Add the difference between two status variable arrays to another one.
 
817
 
 
818
  SYNOPSIS
 
819
    add_diff_to_status
 
820
    to_var       add to this array
 
821
    from_var     from this array
 
822
    dec_var      minus this array
 
823
  
 
824
  NOTE
 
825
    This function assumes that all variables are long/ulong.
 
826
*/
 
827
 
 
828
void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var,
 
829
                        STATUS_VAR *dec_var)
 
830
{
 
831
  ulong *end= (ulong*) ((unsigned char*) to_var + offsetof(STATUS_VAR,
 
832
                                                  last_system_status_var) +
 
833
                        sizeof(ulong));
 
834
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var, *dec= (ulong*) dec_var;
 
835
 
 
836
  while (to != end)
 
837
    *(to++)+= *(from++) - *(dec++);
 
838
}
 
839
 
 
840
 
 
841
void Session::awake(Session::killed_state state_to_set)
 
842
{
 
843
  Session_CHECK_SENTRY(this);
 
844
  safe_mutex_assert_owner(&LOCK_delete); 
 
845
 
 
846
  killed= state_to_set;
418
847
  if (state_to_set != Session::KILL_QUERY)
419
848
  {
420
 
    DRIZZLE_CONNECTION_DONE(thread_id);
 
849
    thr_alarm_kill(thread_id);
 
850
    if (!slave_thread)
 
851
      thread_scheduler.post_kill_notification(this);
421
852
  }
422
 
 
423
853
  if (mysys_var)
424
854
  {
425
 
    boost_unique_lock_t scopedLock(mysys_var->mutex);
 
855
    pthread_mutex_lock(&mysys_var->mutex);
 
856
    if (!system_thread)         // Don't abort locks
 
857
      mysys_var->abort=1;
426
858
    /*
427
 
      "
428
859
      This broadcast could be up in the air if the victim thread
429
860
      exits the cond in the time between read and broadcast, but that is
430
861
      ok since all we want to do is to make the victim thread get out
445
876
    */
446
877
    if (mysys_var->current_cond && mysys_var->current_mutex)
447
878
    {
448
 
      mysys_var->current_mutex->lock();
449
 
      mysys_var->current_cond->notify_all();
450
 
      mysys_var->current_mutex->unlock();
 
879
      pthread_mutex_lock(mysys_var->current_mutex);
 
880
      pthread_cond_broadcast(mysys_var->current_cond);
 
881
      pthread_mutex_unlock(mysys_var->current_mutex);
451
882
    }
 
883
    pthread_mutex_unlock(&mysys_var->mutex);
452
884
  }
 
885
  return;
453
886
}
454
887
 
455
888
/*
456
889
  Remember the location of thread info, the structure needed for
457
 
  memory::sql_alloc() and the structure for the net buffer
 
890
  sql_alloc() and the structure for the net buffer
458
891
*/
459
 
bool Session::storeGlobals()
 
892
 
 
893
bool Session::store_globals()
460
894
{
461
895
  /*
462
896
    Assert that thread_stack is initialized: it's necessary to be able
464
898
  */
465
899
  assert(thread_stack);
466
900
 
467
 
  currentSession().release();
468
 
  currentSession().reset(this);
469
 
 
470
 
  currentMemRoot().release();
471
 
  currentMemRoot().reset(&mem_root);
472
 
 
 
901
  if (pthread_setspecific(THR_Session,  this) ||
 
902
      pthread_setspecific(THR_MALLOC, &mem_root))
 
903
    return 1;
473
904
  mysys_var=my_thread_var;
474
 
 
475
905
  /*
476
906
    Let mysqld define the thread id (not mysys)
477
907
    This allows us to move Session to different threads if needed.
478
908
  */
479
909
  mysys_var->id= thread_id;
 
910
  real_id= pthread_self();                      // For debugging
480
911
 
481
912
  /*
482
913
    We have to call thr_lock_info_init() again here as Session may have been
483
914
    created in another thread
484
915
  */
485
 
  lock_info.init();
486
 
 
487
 
  return false;
488
 
}
489
 
 
490
 
/*
491
 
  Init Session for query processing.
492
 
  This has to be called once before we call mysql_parse.
493
 
  See also comments in session.h.
494
 
*/
495
 
 
496
 
void Session::prepareForQueries()
497
 
{
498
 
  if (variables.max_join_size == HA_POS_ERROR)
499
 
    options |= OPTION_BIG_SELECTS;
500
 
 
501
 
  version= refresh_version;
502
 
  set_proc_info(NULL);
503
 
  command= COM_SLEEP;
504
 
  set_time();
505
 
 
506
 
  mem_root->reset_root_defaults(variables.query_alloc_block_size,
507
 
                                variables.query_prealloc_size);
508
 
  transaction.xid_state.xid.null();
509
 
  transaction.xid_state.in_session=1;
510
 
  if (use_usage)
511
 
    resetUsage();
512
 
}
513
 
 
514
 
bool Session::initGlobals()
515
 
{
516
 
  if (storeGlobals())
517
 
  {
518
 
    disconnect(ER_OUT_OF_RESOURCES);
519
 
    status_var.aborted_connects++;
520
 
    return true;
521
 
  }
522
 
  return false;
523
 
}
524
 
 
525
 
void Session::run()
526
 
{
527
 
  if (initGlobals() || authenticate())
528
 
  {
529
 
    disconnect();
530
 
    return;
531
 
  }
532
 
 
533
 
  prepareForQueries();
534
 
 
535
 
  while (not client->haveError() && getKilled() != KILL_CONNECTION)
536
 
  {
537
 
    if (not executeStatement())
538
 
      break;
539
 
  }
540
 
 
541
 
  disconnect();
542
 
}
543
 
 
544
 
bool Session::schedule(Session::shared_ptr &arg)
545
 
{
546
 
  arg->scheduler= plugin::Scheduler::getScheduler();
547
 
  assert(arg->scheduler);
548
 
 
549
 
  ++connection_count;
550
 
 
551
 
  long current_connections= connection_count;
552
 
 
553
 
  if (current_connections > 0 and static_cast<uint64_t>(current_connections) > current_global_counters.max_used_connections)
554
 
  {
555
 
    current_global_counters.max_used_connections= static_cast<uint64_t>(connection_count);
556
 
  }
557
 
 
558
 
  current_global_counters.connections++;
559
 
  arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
560
 
 
561
 
  session::Cache::singleton().insert(arg);
562
 
 
563
 
  if (unlikely(plugin::EventObserver::connectSession(*arg)))
564
 
  {
565
 
    // We should do something about an error...
566
 
  }
567
 
 
568
 
  if (plugin::Scheduler::getScheduler()->addSession(arg))
569
 
  {
570
 
    DRIZZLE_CONNECTION_START(arg->getSessionId());
571
 
    char error_message_buff[DRIZZLE_ERRMSG_SIZE];
572
 
 
573
 
    arg->setKilled(Session::KILL_CONNECTION);
574
 
 
575
 
    arg->status_var.aborted_connects++;
576
 
 
577
 
    /* Can't use my_error() since store_globals has not been called. */
578
 
    /* TODO replace will better error message */
579
 
    snprintf(error_message_buff, sizeof(error_message_buff),
580
 
             ER(ER_CANT_CREATE_THREAD), 1);
581
 
    arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
582
 
 
583
 
    return true;
584
 
  }
585
 
 
586
 
  return false;
587
 
}
588
 
 
589
 
 
590
 
/*
591
 
  Is this session viewable by the current user?
592
 
*/
593
 
bool Session::isViewable(identifier::User::const_reference user_arg) const
594
 
{
595
 
  return plugin::Authorization::isAuthorized(user_arg, *this, false);
596
 
}
597
 
 
598
 
 
599
 
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
600
 
{
601
 
  const char* old_msg = get_proc_info();
602
 
  safe_mutex_assert_owner(mutex);
603
 
  mysys_var->current_mutex = &mutex;
604
 
  mysys_var->current_cond = &cond;
605
 
  this->set_proc_info(msg);
606
 
  return old_msg;
607
 
}
608
 
 
609
 
void Session::exit_cond(const char* old_msg)
610
 
{
611
 
  /*
612
 
    Putting the mutex unlock in exit_cond() ensures that
613
 
    mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
614
 
    locked (if that would not be the case, you'll get a deadlock if someone
615
 
    does a Session::awake() on you).
616
 
  */
617
 
  mysys_var->current_mutex->unlock();
618
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
619
 
  mysys_var->current_mutex = 0;
620
 
  mysys_var->current_cond = 0;
621
 
  this->set_proc_info(old_msg);
622
 
}
623
 
 
624
 
bool Session::authenticate()
625
 
{
626
 
  if (client->authenticate())
627
 
    return false;
628
 
 
629
 
  status_var.aborted_connects++;
630
 
 
631
 
  return true;
632
 
}
633
 
 
634
 
bool Session::checkUser(const std::string &passwd_str,
635
 
                        const std::string &in_db)
636
 
{
637
 
  bool is_authenticated=
638
 
    plugin::Authentication::isAuthenticated(*user(), passwd_str);
639
 
 
640
 
  if (is_authenticated != true)
641
 
  {
642
 
    status_var.access_denied++;
643
 
    /* isAuthenticated has pushed the error message */
644
 
    return false;
645
 
  }
646
 
 
647
 
  /* Change database if necessary */
648
 
  if (not in_db.empty())
649
 
  {
650
 
    identifier::Schema identifier(in_db);
651
 
    if (schema::change(*this, identifier))
652
 
    {
653
 
      /* change_db() has pushed the error message. */
654
 
      return false;
655
 
    }
656
 
  }
657
 
  my_ok();
658
 
  password= not passwd_str.empty();
659
 
 
660
 
  /* Ready to handle queries */
661
 
  return true;
662
 
}
663
 
 
664
 
bool Session::executeStatement()
665
 
{
666
 
  char *l_packet= 0;
667
 
  uint32_t packet_length;
668
 
 
669
 
  enum enum_server_command l_command;
670
 
 
671
 
  /*
672
 
    indicator of uninitialized lex => normal flow of errors handling
673
 
    (see my_message_sql)
674
 
  */
675
 
  lex->current_select= 0;
676
 
  clear_error();
677
 
  main_da.reset_diagnostics_area();
678
 
 
679
 
  if (client->readCommand(&l_packet, &packet_length) == false)
680
 
  {
681
 
    return false;
682
 
  }
683
 
 
684
 
  if (getKilled() == KILL_CONNECTION)
685
 
    return false;
686
 
 
687
 
  if (packet_length == 0)
688
 
    return true;
689
 
 
690
 
  l_command= static_cast<enum_server_command>(l_packet[0]);
691
 
 
692
 
  if (command >= COM_END)
693
 
    command= COM_END;                           // Wrong command
694
 
 
695
 
  assert(packet_length);
696
 
  return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
697
 
}
698
 
 
699
 
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
700
 
{
701
 
  /* Remove garbage at start and end of query */
702
 
  while (in_packet_length > 0 && my_isspace(charset(), in_packet[0]))
703
 
  {
704
 
    in_packet++;
705
 
    in_packet_length--;
706
 
  }
707
 
  const char *pos= in_packet + in_packet_length; /* Point at end null */
708
 
  while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
709
 
  {
710
 
    pos--;
711
 
    in_packet_length--;
712
 
  }
713
 
 
714
 
  std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
715
 
  // We can not be entirely sure _schema has a value
716
 
  if (_schema)
717
 
  {
718
 
    plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
719
 
  }
720
 
  query.reset(new_query);
721
 
  _state.reset(new session::State(in_packet, in_packet_length));
722
 
 
723
 
  return true;
724
 
}
725
 
 
726
 
bool Session::endTransaction(enum enum_mysql_completiontype completion)
727
 
{
728
 
  bool do_release= 0;
729
 
  bool result= true;
730
 
  TransactionServices &transaction_services= TransactionServices::singleton();
731
 
 
732
 
  if (transaction.xid_state.xa_state != XA_NOTR)
733
 
  {
734
 
    my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
735
 
    return false;
736
 
  }
737
 
  switch (completion)
738
 
  {
739
 
    case COMMIT:
740
 
      /*
741
 
       * We don't use endActiveTransaction() here to ensure that this works
742
 
       * even if there is a problem with the OPTION_AUTO_COMMIT flag
743
 
       * (Which of course should never happen...)
744
 
       */
745
 
      server_status&= ~SERVER_STATUS_IN_TRANS;
746
 
      if (transaction_services.commitTransaction(*this, true))
747
 
        result= false;
748
 
      options&= ~(OPTION_BEGIN);
749
 
      break;
750
 
    case COMMIT_RELEASE:
751
 
      do_release= 1; /* fall through */
752
 
    case COMMIT_AND_CHAIN:
753
 
      result= endActiveTransaction();
754
 
      if (result == true && completion == COMMIT_AND_CHAIN)
755
 
        result= startTransaction();
756
 
      break;
757
 
    case ROLLBACK_RELEASE:
758
 
      do_release= 1; /* fall through */
759
 
    case ROLLBACK:
760
 
    case ROLLBACK_AND_CHAIN:
761
 
    {
762
 
      server_status&= ~SERVER_STATUS_IN_TRANS;
763
 
      if (transaction_services.rollbackTransaction(*this, true))
764
 
        result= false;
765
 
      options&= ~(OPTION_BEGIN);
766
 
      if (result == true && (completion == ROLLBACK_AND_CHAIN))
767
 
        result= startTransaction();
768
 
      break;
769
 
    }
770
 
    default:
771
 
      my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
772
 
      return false;
773
 
  }
774
 
 
775
 
  if (result == false)
776
 
  {
777
 
    my_error(static_cast<drizzled::error_t>(killed_errno()), MYF(0));
778
 
  }
779
 
  else if ((result == true) && do_release)
780
 
  {
781
 
    setKilled(Session::KILL_CONNECTION);
782
 
  }
783
 
 
784
 
  return result;
785
 
}
786
 
 
787
 
bool Session::endActiveTransaction()
788
 
{
789
 
  bool result= true;
790
 
  TransactionServices &transaction_services= TransactionServices::singleton();
791
 
 
792
 
  if (transaction.xid_state.xa_state != XA_NOTR)
793
 
  {
794
 
    my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
795
 
    return false;
796
 
  }
797
 
  if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
798
 
  {
799
 
    server_status&= ~SERVER_STATUS_IN_TRANS;
800
 
    if (transaction_services.commitTransaction(*this, true))
801
 
      result= false;
802
 
  }
803
 
  options&= ~(OPTION_BEGIN);
804
 
  return result;
805
 
}
806
 
 
807
 
bool Session::startTransaction(start_transaction_option_t opt)
808
 
{
809
 
  bool result= true;
810
 
 
811
 
  assert(! inTransaction());
812
 
 
813
 
  options|= OPTION_BEGIN;
814
 
  server_status|= SERVER_STATUS_IN_TRANS;
815
 
 
816
 
  if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
817
 
  {
818
 
    result= false;
819
 
  }
820
 
 
821
 
  return result;
822
 
}
 
916
  thr_lock_info_init(&lock_info);
 
917
  return 0;
 
918
}
 
919
 
 
920
 
 
921
/*
 
922
  Cleanup after query.
 
923
 
 
924
  SYNOPSIS
 
925
    Session::cleanup_after_query()
 
926
 
 
927
  DESCRIPTION
 
928
    This function is used to reset thread data to its default state.
 
929
 
 
930
  NOTE
 
931
    This function is not suitable for setting thread data to some
 
932
    non-default values, as there is only one replication thread, so
 
933
    different master threads may overwrite data of each other on
 
934
    slave.
 
935
*/
823
936
 
824
937
void Session::cleanup_after_query()
825
938
{
826
939
  /*
827
 
    Reset rand_used so that detection of calls to rand() will save random
 
940
    Reset rand_used so that detection of calls to rand() will save random 
828
941
    seeds if needed by the slave.
829
942
  */
830
943
  {
831
944
    /* Forget those values, for next binlogger: */
 
945
    stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
832
946
    auto_inc_intervals_in_cur_stmt_for_binlog.empty();
 
947
    rand_used= 0;
833
948
  }
834
949
  if (first_successful_insert_id_in_cur_stmt > 0)
835
950
  {
836
951
    /* set what LAST_INSERT_ID() will return */
837
 
    first_successful_insert_id_in_prev_stmt= first_successful_insert_id_in_cur_stmt;
 
952
    first_successful_insert_id_in_prev_stmt= 
 
953
      first_successful_insert_id_in_cur_stmt;
838
954
    first_successful_insert_id_in_cur_stmt= 0;
839
955
    substitute_null_with_insert_id= true;
840
956
  }
841
 
 
842
 
  arg_of_last_insert_id_function= false;
843
 
 
 
957
  arg_of_last_insert_id_function= 0;
844
958
  /* Free Items that were created during this execution */
845
959
  free_items();
846
 
 
847
 
  /* Reset _where. */
848
 
  _where= Session::DEFAULT_WHERE;
849
 
 
850
 
  /* Reset the temporary shares we built */
851
 
  for_each(temporary_shares.begin(),
852
 
           temporary_shares.end(),
853
 
           DeletePtr());
854
 
  temporary_shares.clear();
 
960
  /* Reset where. */
 
961
  where= Session::DEFAULT_WHERE;
855
962
}
856
963
 
 
964
 
857
965
/**
858
966
  Create a LEX_STRING in this connection.
859
967
 
865
973
  @return  NULL on failure, or pointer to the LEX_STRING object
866
974
*/
867
975
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
868
 
                                     const std::string &str,
869
 
                                     bool allocate_lex_string)
870
 
{
871
 
  return make_lex_string(lex_str, str.c_str(), str.length(), allocate_lex_string);
872
 
}
873
 
 
874
 
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
875
 
                                     const char* str, uint32_t length,
876
 
                                     bool allocate_lex_string)
 
976
                                 const char* str, uint32_t length,
 
977
                                 bool allocate_lex_string)
877
978
{
878
979
  if (allocate_lex_string)
879
 
    if (!(lex_str= (LEX_STRING *)getMemRoot()->allocate(sizeof(LEX_STRING))))
 
980
    if (!(lex_str= (LEX_STRING *)alloc(sizeof(LEX_STRING))))
880
981
      return 0;
881
 
  if (!(lex_str->str= mem_root->strmake_root(str, length)))
 
982
  if (!(lex_str->str= strmake_root(mem_root, str, length)))
882
983
    return 0;
883
984
  lex_str->length= length;
884
985
  return lex_str;
885
986
}
886
987
 
 
988
 
 
989
/*
 
990
  Convert a string to another character set
 
991
 
 
992
  SYNOPSIS
 
993
    convert_string()
 
994
    to                          Store new allocated string here
 
995
    to_cs                       New character set for allocated string
 
996
    from                        String to convert
 
997
    from_length                 Length of string to convert
 
998
    from_cs                     Original character set
 
999
 
 
1000
  NOTES
 
1001
    to will be 0-terminated to make it easy to pass to system funcs
 
1002
 
 
1003
  RETURN
 
1004
    0   ok
 
1005
    1   End of memory.
 
1006
        In this case to->str will point to 0 and to->length will be 0.
 
1007
*/
 
1008
 
 
1009
bool Session::convert_string(LEX_STRING *to, const CHARSET_INFO * const to_cs,
 
1010
                         const char *from, uint32_t from_length,
 
1011
                         const CHARSET_INFO * const from_cs)
 
1012
{
 
1013
  size_t new_length= to_cs->mbmaxlen * from_length;
 
1014
  uint32_t dummy_errors;
 
1015
  if (!(to->str= (char*) alloc(new_length+1)))
 
1016
  {
 
1017
    to->length= 0;                              // Safety fix
 
1018
    return(1);                          // EOM
 
1019
  }
 
1020
  to->length= copy_and_convert((char*) to->str, new_length, to_cs,
 
1021
                               from, from_length, from_cs, &dummy_errors);
 
1022
  to->str[to->length]=0;                        // Safety
 
1023
  return(0);
 
1024
}
 
1025
 
 
1026
 
 
1027
/*
 
1028
  Convert string from source character set to target character set inplace.
 
1029
 
 
1030
  SYNOPSIS
 
1031
    Session::convert_string
 
1032
 
 
1033
  DESCRIPTION
 
1034
    Convert string using convert_buffer - buffer for character set 
 
1035
    conversion shared between all protocols.
 
1036
 
 
1037
  RETURN
 
1038
    0   ok
 
1039
   !0   out of memory
 
1040
*/
 
1041
 
 
1042
bool Session::convert_string(String *s, const CHARSET_INFO * const from_cs,
 
1043
                         const CHARSET_INFO * const to_cs)
 
1044
{
 
1045
  uint32_t dummy_errors;
 
1046
  if (convert_buffer.copy(s->ptr(), s->length(), from_cs, to_cs, &dummy_errors))
 
1047
    return true;
 
1048
  /* If convert_buffer >> s copying is more efficient long term */
 
1049
  if (convert_buffer.alloced_length() >= convert_buffer.length() * 2 ||
 
1050
      !s->is_alloced())
 
1051
  {
 
1052
    return s->copy(convert_buffer);
 
1053
  }
 
1054
  s->swap(convert_buffer);
 
1055
  return false;
 
1056
}
 
1057
 
 
1058
 
 
1059
/*
 
1060
  Update some cache variables when character set changes
 
1061
*/
 
1062
 
 
1063
void Session::update_charset()
 
1064
{
 
1065
  uint32_t not_used;
 
1066
  charset_is_system_charset= !String::needs_conversion(0,charset(),
 
1067
                                                       system_charset_info,
 
1068
                                                       &not_used);
 
1069
  charset_is_collation_connection= 
 
1070
    !String::needs_conversion(0,charset(),variables.collation_connection,
 
1071
                              &not_used);
 
1072
  charset_is_character_set_filesystem= 
 
1073
    !String::needs_conversion(0, charset(),
 
1074
                              variables.character_set_filesystem, &not_used);
 
1075
}
 
1076
 
 
1077
 
 
1078
/* routings to adding tables to list of changed in transaction tables */
 
1079
 
 
1080
inline static void list_include(CHANGED_TableList** prev,
 
1081
                                CHANGED_TableList* curr,
 
1082
                                CHANGED_TableList* new_table)
 
1083
{
 
1084
  if (new_table)
 
1085
  {
 
1086
    *prev = new_table;
 
1087
    (*prev)->next = curr;
 
1088
  }
 
1089
}
 
1090
 
 
1091
/* add table to list of changed in transaction tables */
 
1092
 
 
1093
void Session::add_changed_table(Table *table)
 
1094
{
 
1095
  assert((options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
 
1096
              table->file->has_transactions());
 
1097
  add_changed_table(table->s->table_cache_key.str,
 
1098
                    (long) table->s->table_cache_key.length);
 
1099
  return;
 
1100
}
 
1101
 
 
1102
 
 
1103
void Session::add_changed_table(const char *key, long key_length)
 
1104
{
 
1105
  CHANGED_TableList **prev_changed = &transaction.changed_tables;
 
1106
  CHANGED_TableList *curr = transaction.changed_tables;
 
1107
 
 
1108
  for (; curr; prev_changed = &(curr->next), curr = curr->next)
 
1109
  {
 
1110
    int cmp =  (long)curr->key_length - (long)key_length;
 
1111
    if (cmp < 0)
 
1112
    {
 
1113
      list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
1114
      return;
 
1115
    }
 
1116
    else if (cmp == 0)
 
1117
    {
 
1118
      cmp = memcmp(curr->key, key, curr->key_length);
 
1119
      if (cmp < 0)
 
1120
      {
 
1121
        list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
1122
        return;
 
1123
      }
 
1124
      else if (cmp == 0)
 
1125
      {
 
1126
        return;
 
1127
      }
 
1128
    }
 
1129
  }
 
1130
  *prev_changed = changed_table_dup(key, key_length);
 
1131
  return;
 
1132
}
 
1133
 
 
1134
 
 
1135
CHANGED_TableList* Session::changed_table_dup(const char *key, long key_length)
 
1136
{
 
1137
  CHANGED_TableList* new_table = 
 
1138
    (CHANGED_TableList*) trans_alloc(ALIGN_SIZE(sizeof(CHANGED_TableList))+
 
1139
                                      key_length + 1);
 
1140
  if (!new_table)
 
1141
  {
 
1142
    my_error(EE_OUTOFMEMORY, MYF(ME_BELL),
 
1143
             ALIGN_SIZE(sizeof(TableList)) + key_length + 1);
 
1144
    killed= KILL_CONNECTION;
 
1145
    return 0;
 
1146
  }
 
1147
 
 
1148
  new_table->key= ((char*)new_table)+ ALIGN_SIZE(sizeof(CHANGED_TableList));
 
1149
  new_table->next = 0;
 
1150
  new_table->key_length = key_length;
 
1151
  ::memcpy(new_table->key, key, key_length);
 
1152
  return new_table;
 
1153
}
 
1154
 
 
1155
 
887
1156
int Session::send_explain_fields(select_result *result)
888
1157
{
889
1158
  List<Item> field_list;
919
1188
  }
920
1189
  item->maybe_null= 1;
921
1190
  field_list.push_back(new Item_empty_string("Extra", 255, cs));
922
 
  return (result->send_fields(field_list));
923
 
}
924
 
 
925
 
void select_result::send_error(drizzled::error_t errcode, const char *err)
 
1191
  return (result->send_fields(field_list,
 
1192
                              Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF));
 
1193
}
 
1194
 
 
1195
 
 
1196
struct Item_change_record: public ilink
 
1197
{
 
1198
  Item **place;
 
1199
  Item *old_value;
 
1200
  /* Placement new was hidden by `new' in ilink (TODO: check): */
 
1201
  static void *operator new(size_t size __attribute__((unused)),
 
1202
                            void *mem)
 
1203
    { return mem; }
 
1204
  static void operator delete(void *ptr __attribute__((unused)),
 
1205
                              size_t size __attribute__((unused)))
 
1206
    {}
 
1207
  static void operator delete(void *ptr __attribute__((unused)),
 
1208
                              void *mem __attribute__((unused)))
 
1209
    { /* never called */ }
 
1210
};
 
1211
 
 
1212
 
 
1213
/*
 
1214
  Register an item tree tree transformation, performed by the query
 
1215
  optimizer. We need a pointer to runtime_memroot because it may be !=
 
1216
  session->mem_root (this may no longer be a true statement)
 
1217
*/
 
1218
 
 
1219
void Session::nocheck_register_item_tree_change(Item **place, Item *old_value,
 
1220
                                            MEM_ROOT *runtime_memroot)
 
1221
{
 
1222
  Item_change_record *change;
 
1223
  /*
 
1224
    Now we use one node per change, which adds some memory overhead,
 
1225
    but still is rather fast as we use alloc_root for allocations.
 
1226
    A list of item tree changes of an average query should be short.
 
1227
  */
 
1228
  void *change_mem= alloc_root(runtime_memroot, sizeof(*change));
 
1229
  if (change_mem == 0)
 
1230
  {
 
1231
    /*
 
1232
      OOM, session->fatal_error() is called by the error handler of the
 
1233
      memroot. Just return.
 
1234
    */
 
1235
    return;
 
1236
  }
 
1237
  change= new (change_mem) Item_change_record;
 
1238
  change->place= place;
 
1239
  change->old_value= old_value;
 
1240
  change_list.append(change);
 
1241
}
 
1242
 
 
1243
 
 
1244
void Session::rollback_item_tree_changes()
 
1245
{
 
1246
  I_List_iterator<Item_change_record> it(change_list);
 
1247
  Item_change_record *change;
 
1248
 
 
1249
  while ((change= it++))
 
1250
    *change->place= change->old_value;
 
1251
  /* We can forget about changes memory: it's allocated in runtime memroot */
 
1252
  change_list.empty();
 
1253
  return;
 
1254
}
 
1255
 
 
1256
 
 
1257
/*****************************************************************************
 
1258
** Functions to provide a interface to select results
 
1259
*****************************************************************************/
 
1260
 
 
1261
select_result::select_result()
 
1262
{
 
1263
  session=current_session;
 
1264
}
 
1265
 
 
1266
void select_result::send_error(uint32_t errcode,const char *err)
926
1267
{
927
1268
  my_message(errcode, err, MYF(0));
928
1269
}
929
1270
 
 
1271
 
 
1272
void select_result::cleanup()
 
1273
{
 
1274
  /* do nothing */
 
1275
}
 
1276
 
 
1277
bool select_result::check_simple_select() const
 
1278
{
 
1279
  my_error(ER_SP_BAD_CURSOR_QUERY, MYF(0));
 
1280
  return true;
 
1281
}
 
1282
 
 
1283
 
 
1284
static String default_line_term("\n",default_charset_info);
 
1285
static String default_escaped("\\",default_charset_info);
 
1286
static String default_field_term("\t",default_charset_info);
 
1287
 
 
1288
sql_exchange::sql_exchange(char *name, bool flag,
 
1289
                           enum enum_filetype filetype_arg)
 
1290
  :file_name(name), opt_enclosed(0), dumpfile(flag), skip_lines(0)
 
1291
{
 
1292
  filetype= filetype_arg;
 
1293
  field_term= &default_field_term;
 
1294
  enclosed=   line_start= &my_empty_string;
 
1295
  line_term=  &default_line_term;
 
1296
  escaped=    &default_escaped;
 
1297
  cs= NULL;
 
1298
}
 
1299
 
 
1300
bool select_send::send_fields(List<Item> &list, uint32_t flags)
 
1301
{
 
1302
  bool res;
 
1303
  if (!(res= session->protocol->send_fields(&list, flags)))
 
1304
    is_result_set_started= 1;
 
1305
  return res;
 
1306
}
 
1307
 
 
1308
void select_send::abort()
 
1309
{
 
1310
  return;
 
1311
}
 
1312
 
 
1313
 
 
1314
/** 
 
1315
  Cleanup an instance of this class for re-use
 
1316
  at next execution of a prepared statement/
 
1317
  stored procedure statement.
 
1318
*/
 
1319
 
 
1320
void select_send::cleanup()
 
1321
{
 
1322
  is_result_set_started= false;
 
1323
}
 
1324
 
 
1325
/* Send data to client. Returns 0 if ok */
 
1326
 
 
1327
bool select_send::send_data(List<Item> &items)
 
1328
{
 
1329
  if (unit->offset_limit_cnt)
 
1330
  {                                             // using limit offset,count
 
1331
    unit->offset_limit_cnt--;
 
1332
    return 0;
 
1333
  }
 
1334
 
 
1335
  /*
 
1336
    We may be passing the control from mysqld to the client: release the
 
1337
    InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
 
1338
    by session
 
1339
  */
 
1340
  ha_release_temporary_latches(session);
 
1341
 
 
1342
  List_iterator_fast<Item> li(items);
 
1343
  Protocol *protocol= session->protocol;
 
1344
  char buff[MAX_FIELD_WIDTH];
 
1345
  String buffer(buff, sizeof(buff), &my_charset_bin);
 
1346
 
 
1347
  protocol->prepare_for_resend();
 
1348
  Item *item;
 
1349
  while ((item=li++))
 
1350
  {
 
1351
    if (item->send(protocol, &buffer))
 
1352
    {
 
1353
      protocol->free();                         // Free used buffer
 
1354
      my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
 
1355
      break;
 
1356
    }
 
1357
  }
 
1358
  session->sent_row_count++;
 
1359
  if (session->is_error())
 
1360
  {
 
1361
    protocol->remove_last_row();
 
1362
    return(1);
 
1363
  }
 
1364
  if (session->vio_ok())
 
1365
    return(protocol->write());
 
1366
  return(0);
 
1367
}
 
1368
 
 
1369
bool select_send::send_eof()
 
1370
{
 
1371
  /* 
 
1372
    We may be passing the control from mysqld to the client: release the
 
1373
    InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
 
1374
    by session 
 
1375
  */
 
1376
  ha_release_temporary_latches(session);
 
1377
 
 
1378
  /* Unlock tables before sending packet to gain some speed */
 
1379
  if (session->lock)
 
1380
  {
 
1381
    mysql_unlock_tables(session, session->lock);
 
1382
    session->lock=0;
 
1383
  }
 
1384
  ::my_eof(session);
 
1385
  is_result_set_started= 0;
 
1386
  return false;
 
1387
}
 
1388
 
 
1389
 
930
1390
/************************************************************************
931
1391
  Handling writing to file
932
1392
************************************************************************/
933
1393
 
934
 
void select_to_file::send_error(drizzled::error_t errcode,const char *err)
 
1394
void select_to_file::send_error(uint32_t errcode,const char *err)
935
1395
{
936
1396
  my_message(errcode, err, MYF(0));
937
1397
  if (file > 0)
938
1398
  {
939
 
    (void) cache->end_io_cache();
940
 
    (void) internal::my_close(file, MYF(0));
941
 
    (void) internal::my_delete(path.file_string().c_str(), MYF(0));             // Delete file on error
 
1399
    (void) end_io_cache(&cache);
 
1400
    (void) my_close(file,MYF(0));
 
1401
    (void) my_delete(path,MYF(0));              // Delete file on error
942
1402
    file= -1;
943
1403
  }
944
1404
}
946
1406
 
947
1407
bool select_to_file::send_eof()
948
1408
{
949
 
  int error= test(cache->end_io_cache());
950
 
  if (internal::my_close(file, MYF(MY_WME)))
 
1409
  int error= test(end_io_cache(&cache));
 
1410
  if (my_close(file,MYF(MY_WME)))
951
1411
    error= 1;
952
1412
  if (!error)
953
1413
  {
956
1416
      function, SELECT INTO has to have an own SQLCOM.
957
1417
      TODO: split from SQLCOM_SELECT
958
1418
    */
959
 
    session->my_ok(row_count);
 
1419
    ::my_ok(session,row_count);
960
1420
  }
961
1421
  file= -1;
962
1422
  return error;
968
1428
  /* In case of error send_eof() may be not called: close the file here. */
969
1429
  if (file >= 0)
970
1430
  {
971
 
    (void) cache->end_io_cache();
972
 
    (void) internal::my_close(file, MYF(0));
 
1431
    (void) end_io_cache(&cache);
 
1432
    (void) my_close(file,MYF(0));
973
1433
    file= -1;
974
1434
  }
975
 
  path= "";
 
1435
  path[0]= '\0';
976
1436
  row_count= 0;
977
1437
}
978
1438
 
979
 
select_to_file::select_to_file(file_exchange *ex)
980
 
  : exchange(ex),
981
 
    file(-1),
982
 
    cache(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
983
 
    row_count(0L)
984
 
{
985
 
  path= "";
986
 
}
987
1439
 
988
1440
select_to_file::~select_to_file()
989
1441
{
990
 
  cleanup();
 
1442
  if (file >= 0)
 
1443
  {                                     // This only happens in case of error
 
1444
    (void) end_io_cache(&cache);
 
1445
    (void) my_close(file,MYF(0));
 
1446
    file= -1;
 
1447
  }
991
1448
}
992
1449
 
993
1450
/***************************************************************************
1016
1473
*/
1017
1474
 
1018
1475
 
1019
 
static int create_file(Session *session,
1020
 
                       fs::path &target_path,
1021
 
                       file_exchange *exchange,
1022
 
                       internal::IO_CACHE *cache)
 
1476
static File create_file(Session *session, char *path, sql_exchange *exchange,
 
1477
                        IO_CACHE *cache)
1023
1478
{
1024
 
  fs::path to_file(exchange->file_name);
1025
 
  int file;
1026
 
 
1027
 
  if (not to_file.has_root_directory())
 
1479
  File file;
 
1480
  uint32_t option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
 
1481
 
 
1482
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
1483
  option|= MY_REPLACE_DIR;                      // Force use of db directory
 
1484
#endif
 
1485
 
 
1486
  if (!dirname_length(exchange->file_name))
1028
1487
  {
1029
 
    target_path= fs::system_complete(getDataHomeCatalog());
1030
 
    util::string::const_shared_ptr schema(session->schema());
1031
 
    if (schema and not schema->empty())
1032
 
    {
1033
 
      int count_elements= 0;
1034
 
      for (fs::path::iterator iter= to_file.begin();
1035
 
           iter != to_file.end();
1036
 
           ++iter, ++count_elements)
1037
 
      { }
1038
 
 
1039
 
      if (count_elements == 1)
1040
 
      {
1041
 
        target_path /= *schema;
1042
 
      }
1043
 
    }
1044
 
    target_path /= to_file;
 
1488
    strcpy(path, mysql_real_data_home);
 
1489
    if (session->db)
 
1490
      strncat(path, session->db, FN_REFLEN-strlen(mysql_real_data_home)-1);
 
1491
    (void) fn_format(path, exchange->file_name, path, "", option);
1045
1492
  }
1046
1493
  else
1047
 
  {
1048
 
    target_path = exchange->file_name;
1049
 
  }
1050
 
 
1051
 
  if (not secure_file_priv.string().empty())
1052
 
  {
1053
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
1054
 
    {
1055
 
      /* Write only allowed to dir or subdir specified by secure_file_priv */
1056
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1057
 
      return -1;
1058
 
    }
1059
 
  }
1060
 
 
1061
 
  if (!access(target_path.file_string().c_str(), F_OK))
 
1494
    (void) fn_format(path, exchange->file_name, mysql_real_data_home, "", option);
 
1495
 
 
1496
  if (opt_secure_file_priv &&
 
1497
      strncmp(opt_secure_file_priv, path, strlen(opt_secure_file_priv)))
 
1498
  {
 
1499
    /* Write only allowed to dir or subdir specified by secure_file_priv */
 
1500
    my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
1501
    return -1;
 
1502
  }
 
1503
 
 
1504
  if (!access(path, F_OK))
1062
1505
  {
1063
1506
    my_error(ER_FILE_EXISTS_ERROR, MYF(0), exchange->file_name);
1064
1507
    return -1;
1065
1508
  }
1066
1509
  /* Create the file world readable */
1067
 
  if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
 
1510
  if ((file= my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1068
1511
    return file;
 
1512
#ifdef HAVE_FCHMOD
1069
1513
  (void) fchmod(file, 0666);                    // Because of umask()
1070
 
  if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
 
1514
#else
 
1515
  (void) chmod(path, 0666);
 
1516
#endif
 
1517
  if (init_io_cache(cache, file, 0L, WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1071
1518
  {
1072
 
    internal::my_close(file, MYF(0));
1073
 
    internal::my_delete(target_path.file_string().c_str(), MYF(0));  // Delete file on error, it was just created
 
1519
    my_close(file, MYF(0));
 
1520
    my_delete(path, MYF(0));  // Delete file on error, it was just created 
1074
1521
    return -1;
1075
1522
  }
1076
1523
  return file;
1078
1525
 
1079
1526
 
1080
1527
int
1081
 
select_export::prepare(List<Item> &list, Select_Lex_Unit *u)
 
1528
select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
1082
1529
{
1083
1530
  bool blob_flag=0;
1084
1531
  bool string_results= false, non_string_results= false;
1085
1532
  unit= u;
1086
 
  if ((uint32_t) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
1087
 
  {
1088
 
    path= exchange->file_name;
1089
 
  }
 
1533
  if ((uint) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
 
1534
    strmake(path,exchange->file_name,FN_REFLEN-1);
1090
1535
 
 
1536
  if ((file= create_file(session, path, exchange, &cache)) < 0)
 
1537
    return 1;
1091
1538
  /* Check if there is any blobs in data */
1092
1539
  {
1093
 
    List<Item>::iterator li(list.begin());
 
1540
    List_iterator_fast<Item> li(list);
1094
1541
    Item *item;
1095
1542
    while ((item=li++))
1096
1543
    {
1097
1544
      if (item->max_length >= MAX_BLOB_WIDTH)
1098
1545
      {
1099
 
        blob_flag=1;
1100
 
        break;
 
1546
        blob_flag=1;
 
1547
        break;
1101
1548
      }
1102
 
 
1103
1549
      if (item->result_type() == STRING_RESULT)
1104
1550
        string_results= true;
1105
1551
      else
1130
1576
      (exchange->opt_enclosed && non_string_results &&
1131
1577
       field_term_length && strchr(NUMERIC_CHARS, field_term_char)))
1132
1578
  {
1133
 
    my_error(ER_AMBIGUOUS_FIELD_TERM, MYF(0));
1134
 
    return 1;
 
1579
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1580
                 ER_AMBIGUOUS_FIELD_TERM, ER(ER_AMBIGUOUS_FIELD_TERM));
 
1581
    is_ambiguous_field_term= true;
1135
1582
  }
1136
 
 
1137
 
  if ((file= create_file(session, path, exchange, cache)) < 0)
1138
 
    return 1;
 
1583
  else
 
1584
    is_ambiguous_field_term= false;
1139
1585
 
1140
1586
  return 0;
1141
1587
}
1142
1588
 
 
1589
 
 
1590
#define NEED_ESCAPING(x) ((int) (unsigned char) (x) == escape_char    || \
 
1591
                          (enclosed ? (int) (unsigned char) (x) == field_sep_char      \
 
1592
                                    : (int) (unsigned char) (x) == field_term_char) || \
 
1593
                          (int) (unsigned char) (x) == line_sep_char  || \
 
1594
                          !(x))
 
1595
 
1143
1596
bool select_export::send_data(List<Item> &items)
1144
1597
{
1145
1598
  char buff[MAX_FIELD_WIDTH],null_buff[2],space[MAX_FIELD_WIDTH];
1150
1603
  if (unit->offset_limit_cnt)
1151
1604
  {                                             // using limit offset,count
1152
1605
    unit->offset_limit_cnt--;
1153
 
    return false;
 
1606
    return(0);
1154
1607
  }
1155
1608
  row_count++;
1156
1609
  Item *item;
1157
1610
  uint32_t used_length=0,items_left=items.elements;
1158
 
  List<Item>::iterator li(items.begin());
1159
 
 
1160
 
  if (my_b_write(cache,(unsigned char*) exchange->line_start->ptr(),
1161
 
                 exchange->line_start->length()))
1162
 
    return true;
1163
 
 
 
1611
  List_iterator_fast<Item> li(items);
 
1612
 
 
1613
  if (my_b_write(&cache,(unsigned char*) exchange->line_start->ptr(),
 
1614
                 exchange->line_start->length()))
 
1615
    goto err;
1164
1616
  while ((item=li++))
1165
1617
  {
1166
1618
    Item_result result_type=item->result_type();
1169
1621
    res=item->str_result(&tmp);
1170
1622
    if (res && enclosed)
1171
1623
    {
1172
 
      if (my_b_write(cache,(unsigned char*) exchange->enclosed->ptr(),
1173
 
                     exchange->enclosed->length()))
1174
 
        return true;
 
1624
      if (my_b_write(&cache,(unsigned char*) exchange->enclosed->ptr(),
 
1625
                     exchange->enclosed->length()))
 
1626
        goto err;
1175
1627
    }
1176
1628
    if (!res)
1177
1629
    {                                           // NULL
1178
1630
      if (!fixed_row_size)
1179
1631
      {
1180
 
        if (escape_char != -1)                  // Use \N syntax
1181
 
        {
1182
 
          null_buff[0]=escape_char;
1183
 
          null_buff[1]='N';
1184
 
          if (my_b_write(cache,(unsigned char*) null_buff,2))
1185
 
            return true;
1186
 
        }
1187
 
        else if (my_b_write(cache,(unsigned char*) "NULL",4))
1188
 
          return true;
 
1632
        if (escape_char != -1)                  // Use \N syntax
 
1633
        {
 
1634
          null_buff[0]=escape_char;
 
1635
          null_buff[1]='N';
 
1636
          if (my_b_write(&cache,(unsigned char*) null_buff,2))
 
1637
            goto err;
 
1638
        }
 
1639
        else if (my_b_write(&cache,(unsigned char*) "NULL",4))
 
1640
          goto err;
1189
1641
      }
1190
1642
      else
1191
1643
      {
1192
 
        used_length=0;                          // Fill with space
 
1644
        used_length=0;                          // Fill with space
1193
1645
      }
1194
1646
    }
1195
1647
    else
1196
1648
    {
1197
1649
      if (fixed_row_size)
1198
 
        used_length= min(res->length(), static_cast<size_t>(item->max_length));
 
1650
        used_length=cmin(res->length(),item->max_length);
1199
1651
      else
1200
 
        used_length= res->length();
1201
 
 
 
1652
        used_length=res->length();
1202
1653
      if ((result_type == STRING_RESULT || is_unsafe_field_sep) &&
1203
 
          escape_char != -1)
 
1654
           escape_char != -1)
1204
1655
      {
1205
1656
        char *pos, *start, *end;
1206
1657
        const CHARSET_INFO * const res_charset= res->charset();
1207
 
        const CHARSET_INFO * const character_set_client= default_charset_info;
1208
 
 
 
1658
        const CHARSET_INFO * const character_set_client= session->variables.
 
1659
                                                            character_set_client;
1209
1660
        bool check_second_byte= (res_charset == &my_charset_bin) &&
1210
 
          character_set_client->
1211
 
          escape_with_backslash_is_dangerous;
 
1661
                                 character_set_client->
 
1662
                                 escape_with_backslash_is_dangerous;
1212
1663
        assert(character_set_client->mbmaxlen == 2 ||
1213
 
               !character_set_client->escape_with_backslash_is_dangerous);
1214
 
        for (start=pos=(char*) res->ptr(),end=pos+used_length ;
1215
 
             pos != end ;
1216
 
             pos++)
1217
 
        {
1218
 
          if (use_mb(res_charset))
1219
 
          {
1220
 
            int l;
1221
 
            if ((l=my_ismbchar(res_charset, pos, end)))
1222
 
            {
1223
 
              pos += l-1;
1224
 
              continue;
1225
 
            }
1226
 
          }
 
1664
                    !character_set_client->escape_with_backslash_is_dangerous);
 
1665
        for (start=pos=(char*) res->ptr(),end=pos+used_length ;
 
1666
             pos != end ;
 
1667
             pos++)
 
1668
        {
 
1669
#ifdef USE_MB
 
1670
          if (use_mb(res_charset))
 
1671
          {
 
1672
            int l;
 
1673
            if ((l=my_ismbchar(res_charset, pos, end)))
 
1674
            {
 
1675
              pos += l-1;
 
1676
              continue;
 
1677
            }
 
1678
          }
 
1679
#endif
1227
1680
 
1228
1681
          /*
1229
1682
            Special case when dumping BINARY/VARBINARY/BLOB values
1230
1683
            for the clients with character sets big5, cp932, gbk and sjis,
1231
1684
            which can have the escape character (0x5C "\" by default)
1232
1685
            as the second byte of a multi-byte sequence.
1233
 
 
 
1686
            
1234
1687
            If
1235
1688
            - pos[0] is a valid multi-byte head (e.g 0xEE) and
1236
1689
            - pos[1] is 0x00, which will be escaped as "\0",
1237
 
 
 
1690
            
1238
1691
            then we'll get "0xEE + 0x5C + 0x30" in the output file.
1239
 
 
 
1692
            
1240
1693
            If this file is later loaded using this sequence of commands:
1241
 
 
 
1694
            
1242
1695
            mysql> create table t1 (a varchar(128)) character set big5;
1243
1696
            mysql> LOAD DATA INFILE 'dump.txt' INTO Table t1;
1244
 
 
 
1697
            
1245
1698
            then 0x5C will be misinterpreted as the second byte
1246
1699
            of a multi-byte character "0xEE + 0x5C", instead of
1247
1700
            escape character for 0x00.
1248
 
 
 
1701
            
1249
1702
            To avoid this confusion, we'll escape the multi-byte
1250
1703
            head character too, so the sequence "0xEE + 0x00" will be
1251
1704
            dumped as "0x5C + 0xEE + 0x5C + 0x30".
1252
 
 
 
1705
            
1253
1706
            Note, in the condition below we only check if
1254
1707
            mbcharlen is equal to 2, because there are no
1255
1708
            character sets with mbmaxlen longer than 2
1257
1710
            assert before the loop makes that sure.
1258
1711
          */
1259
1712
 
1260
 
          if ((needs_escaping(*pos, enclosed) ||
 
1713
          if ((NEED_ESCAPING(*pos) ||
1261
1714
               (check_second_byte &&
1262
1715
                my_mbcharlen(character_set_client, (unsigned char) *pos) == 2 &&
1263
1716
                pos + 1 < end &&
1264
 
                needs_escaping(pos[1], enclosed))) &&
 
1717
                NEED_ESCAPING(pos[1]))) &&
1265
1718
              /*
1266
 
                Don't escape field_term_char by doubling - doubling is only
1267
 
                valid for ENCLOSED BY characters:
 
1719
               Don't escape field_term_char by doubling - doubling is only
 
1720
               valid for ENCLOSED BY characters:
1268
1721
              */
1269
1722
              (enclosed || !is_ambiguous_field_term ||
1270
1723
               (int) (unsigned char) *pos != field_term_char))
1271
1724
          {
1272
 
            char tmp_buff[2];
 
1725
            char tmp_buff[2];
1273
1726
            tmp_buff[0]= ((int) (unsigned char) *pos == field_sep_char &&
1274
1727
                          is_ambiguous_field_sep) ?
1275
 
              field_sep_char : escape_char;
1276
 
            tmp_buff[1]= *pos ? *pos : '0';
1277
 
            if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1278
 
                my_b_write(cache,(unsigned char*) tmp_buff,2))
1279
 
              return true;
1280
 
            start=pos+1;
1281
 
          }
1282
 
        }
1283
 
        if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1284
 
          return true;
 
1728
                          field_sep_char : escape_char;
 
1729
            tmp_buff[1]= *pos ? *pos : '0';
 
1730
            if (my_b_write(&cache,(unsigned char*) start,(uint) (pos-start)) ||
 
1731
                my_b_write(&cache,(unsigned char*) tmp_buff,2))
 
1732
              goto err;
 
1733
            start=pos+1;
 
1734
          }
 
1735
        }
 
1736
        if (my_b_write(&cache,(unsigned char*) start,(uint) (pos-start)))
 
1737
          goto err;
1285
1738
      }
1286
 
      else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1287
 
        return true;
 
1739
      else if (my_b_write(&cache,(unsigned char*) res->ptr(),used_length))
 
1740
        goto err;
1288
1741
    }
1289
1742
    if (fixed_row_size)
1290
1743
    {                                           // Fill with space
1291
1744
      if (item->max_length > used_length)
1292
1745
      {
1293
 
        /* QQ:  Fix by adding a my_b_fill() function */
1294
 
        if (!space_inited)
1295
 
        {
1296
 
          space_inited=1;
1297
 
          memset(space, ' ', sizeof(space));
1298
 
        }
1299
 
        uint32_t length=item->max_length-used_length;
1300
 
        for (; length > sizeof(space) ; length-=sizeof(space))
1301
 
        {
1302
 
          if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1303
 
            return true;
1304
 
        }
1305
 
        if (my_b_write(cache,(unsigned char*) space,length))
1306
 
          return true;
 
1746
        /* QQ:  Fix by adding a my_b_fill() function */
 
1747
        if (!space_inited)
 
1748
        {
 
1749
          space_inited=1;
 
1750
          memset(space, ' ', sizeof(space));
 
1751
        }
 
1752
        uint32_t length=item->max_length-used_length;
 
1753
        for (; length > sizeof(space) ; length-=sizeof(space))
 
1754
        {
 
1755
          if (my_b_write(&cache,(unsigned char*) space,sizeof(space)))
 
1756
            goto err;
 
1757
        }
 
1758
        if (my_b_write(&cache,(unsigned char*) space,length))
 
1759
          goto err;
1307
1760
      }
1308
1761
    }
1309
1762
    if (res && enclosed)
1310
1763
    {
1311
 
      if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
 
1764
      if (my_b_write(&cache, (unsigned char*) exchange->enclosed->ptr(),
1312
1765
                     exchange->enclosed->length()))
1313
 
        return true;
 
1766
        goto err;
1314
1767
    }
1315
1768
    if (--items_left)
1316
1769
    {
1317
 
      if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
 
1770
      if (my_b_write(&cache, (unsigned char*) exchange->field_term->ptr(),
1318
1771
                     field_term_length))
1319
 
        return true;
 
1772
        goto err;
1320
1773
    }
1321
1774
  }
1322
 
  if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1323
 
                 exchange->line_term->length()))
1324
 
  {
1325
 
    return true;
1326
 
  }
1327
 
 
1328
 
  return false;
 
1775
  if (my_b_write(&cache,(unsigned char*) exchange->line_term->ptr(),
 
1776
                 exchange->line_term->length()))
 
1777
    goto err;
 
1778
  return(0);
 
1779
err:
 
1780
  return(1);
1329
1781
}
1330
1782
 
1331
1783
 
1335
1787
 
1336
1788
 
1337
1789
int
1338
 
select_dump::prepare(List<Item> &, Select_Lex_Unit *u)
 
1790
select_dump::prepare(List<Item> &list __attribute__((unused)),
 
1791
                     SELECT_LEX_UNIT *u)
1339
1792
{
1340
1793
  unit= u;
1341
 
  return (int) ((file= create_file(session, path, exchange, cache)) < 0);
 
1794
  return (int) ((file= create_file(session, path, exchange, &cache)) < 0);
1342
1795
}
1343
1796
 
1344
1797
 
1345
1798
bool select_dump::send_data(List<Item> &items)
1346
1799
{
1347
 
  List<Item>::iterator li(items.begin());
 
1800
  List_iterator_fast<Item> li(items);
1348
1801
  char buff[MAX_FIELD_WIDTH];
1349
1802
  String tmp(buff,sizeof(buff),&my_charset_bin),*res;
1350
1803
  tmp.length(0);
1355
1808
    unit->offset_limit_cnt--;
1356
1809
    return(0);
1357
1810
  }
1358
 
  if (row_count++ > 1)
 
1811
  if (row_count++ > 1) 
1359
1812
  {
1360
1813
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
1361
 
    return 1;
 
1814
    goto err;
1362
1815
  }
1363
1816
  while ((item=li++))
1364
1817
  {
1365
1818
    res=item->str_result(&tmp);
1366
1819
    if (!res)                                   // If NULL
1367
1820
    {
1368
 
      if (my_b_write(cache,(unsigned char*) "",1))
1369
 
        return 1;
 
1821
      if (my_b_write(&cache,(unsigned char*) "",1))
 
1822
        goto err;
1370
1823
    }
1371
 
    else if (my_b_write(cache,(unsigned char*) res->ptr(),res->length()))
 
1824
    else if (my_b_write(&cache,(unsigned char*) res->ptr(),res->length()))
1372
1825
    {
1373
 
      my_error(ER_ERROR_ON_WRITE, MYF(0), path.file_string().c_str(), errno);
1374
 
      return 1;
 
1826
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, my_errno);
 
1827
      goto err;
1375
1828
    }
1376
1829
  }
1377
1830
  return(0);
 
1831
err:
 
1832
  return(1);
1378
1833
}
1379
1834
 
1380
1835
 
1397
1852
    unit->offset_limit_cnt--;
1398
1853
    return(0);
1399
1854
  }
1400
 
  List<Item>::iterator li(items.begin());
 
1855
  List_iterator_fast<Item> li(items);
1401
1856
  Item *val_item;
1402
1857
  for (uint32_t i= 0; (val_item= li++); i++)
1403
1858
    it->store(i, val_item);
1409
1864
void select_max_min_finder_subselect::cleanup()
1410
1865
{
1411
1866
  cache= 0;
 
1867
  return;
1412
1868
}
1413
1869
 
1414
1870
 
1415
1871
bool select_max_min_finder_subselect::send_data(List<Item> &items)
1416
1872
{
1417
1873
  Item_maxmin_subselect *it= (Item_maxmin_subselect *)item;
1418
 
  List<Item>::iterator li(items.begin());
 
1874
  List_iterator_fast<Item> li(items);
1419
1875
  Item *val_item= li++;
1420
1876
  it->register_value();
1421
1877
  if (it->assigned())
1432
1888
      switch (val_item->result_type())
1433
1889
      {
1434
1890
      case REAL_RESULT:
1435
 
        op= &select_max_min_finder_subselect::cmp_real;
1436
 
        break;
 
1891
        op= &select_max_min_finder_subselect::cmp_real;
 
1892
        break;
1437
1893
      case INT_RESULT:
1438
 
        op= &select_max_min_finder_subselect::cmp_int;
1439
 
        break;
 
1894
        op= &select_max_min_finder_subselect::cmp_int;
 
1895
        break;
1440
1896
      case STRING_RESULT:
1441
 
        op= &select_max_min_finder_subselect::cmp_str;
1442
 
        break;
 
1897
        op= &select_max_min_finder_subselect::cmp_str;
 
1898
        break;
1443
1899
      case DECIMAL_RESULT:
1444
1900
        op= &select_max_min_finder_subselect::cmp_decimal;
1445
1901
        break;
1446
1902
      case ROW_RESULT:
1447
1903
        // This case should never be choosen
1448
 
        assert(0);
1449
 
        op= 0;
 
1904
        assert(0);
 
1905
        op= 0;
1450
1906
      }
1451
1907
    }
1452
1908
    cache->store(val_item);
1485
1941
bool select_max_min_finder_subselect::cmp_decimal()
1486
1942
{
1487
1943
  Item *maxmin= ((Item_singlerow_subselect *)item)->element_index(0);
1488
 
  type::Decimal cval, *cvalue= cache->val_decimal(&cval);
1489
 
  type::Decimal mval, *mvalue= maxmin->val_decimal(&mval);
 
1944
  my_decimal cval, *cvalue= cache->val_decimal(&cval);
 
1945
  my_decimal mval, *mvalue= maxmin->val_decimal(&mval);
1490
1946
  if (fmax)
1491
1947
    return (cache->null_value && !maxmin->null_value) ||
1492
1948
      (!cache->null_value && !maxmin->null_value &&
1493
 
       class_decimal_cmp(cvalue, mvalue) > 0) ;
 
1949
       my_decimal_cmp(cvalue, mvalue) > 0) ;
1494
1950
  return (maxmin->null_value && !cache->null_value) ||
1495
1951
    (!cache->null_value && !maxmin->null_value &&
1496
 
     class_decimal_cmp(cvalue,mvalue) < 0);
 
1952
     my_decimal_cmp(cvalue,mvalue) < 0);
1497
1953
}
1498
1954
 
1499
1955
bool select_max_min_finder_subselect::cmp_str()
1515
1971
     sortcmp(val1, val2, cache->collation.collation) < 0);
1516
1972
}
1517
1973
 
1518
 
bool select_exists_subselect::send_data(List<Item> &)
 
1974
bool select_exists_subselect::send_data(List<Item> &items __attribute__((unused)))
1519
1975
{
1520
1976
  Item_exists_subselect *it= (Item_exists_subselect *)item;
1521
1977
  if (unit->offset_limit_cnt)
1528
1984
  return(0);
1529
1985
}
1530
1986
 
 
1987
 
 
1988
/***************************************************************************
 
1989
  Dump of select to variables
 
1990
***************************************************************************/
 
1991
 
 
1992
int select_dumpvar::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
 
1993
{
 
1994
  unit= u;
 
1995
  
 
1996
  if (var_list.elements != list.elements)
 
1997
  {
 
1998
    my_message(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT,
 
1999
               ER(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT), MYF(0));
 
2000
    return 1;
 
2001
  }               
 
2002
  return 0;
 
2003
}
 
2004
 
 
2005
 
 
2006
bool select_dumpvar::check_simple_select() const
 
2007
{
 
2008
  my_error(ER_SP_BAD_CURSOR_SELECT, MYF(0));
 
2009
  return true;
 
2010
}
 
2011
 
 
2012
 
 
2013
void select_dumpvar::cleanup()
 
2014
{
 
2015
  row_count= 0;
 
2016
}
 
2017
 
 
2018
 
 
2019
void Query_arena::free_items()
 
2020
{
 
2021
  Item *next;
 
2022
  /* This works because items are allocated with sql_alloc() */
 
2023
  for (; free_list; free_list= next)
 
2024
  {
 
2025
    next= free_list->next;
 
2026
    free_list->delete_self();
 
2027
  }
 
2028
  /* Postcondition: free_list is 0 */
 
2029
  return;
 
2030
}
 
2031
 
 
2032
 
 
2033
/*
 
2034
  Statement functions
 
2035
*/
 
2036
 
 
2037
Statement::Statement(LEX *lex_arg, MEM_ROOT *mem_root_arg, ulong id_arg)
 
2038
  :Query_arena(mem_root_arg),
 
2039
  id(id_arg),
 
2040
  mark_used_columns(MARK_COLUMNS_READ),
 
2041
  lex(lex_arg),
 
2042
  query(0),
 
2043
  query_length(0),
 
2044
  db(NULL),
 
2045
  db_length(0)
 
2046
{
 
2047
}
 
2048
 
 
2049
 
1531
2050
/*
1532
2051
  Don't free mem_root, as mem_root is freed in the end of dispatch_command
1533
2052
  (once for any command).
1535
2054
void Session::end_statement()
1536
2055
{
1537
2056
  /* Cleanup SQL processing state to reuse this statement in next query. */
1538
 
  lex->end();
1539
 
  query_cache_key= ""; // reset the cache key
1540
 
  resetResultsetMessage();
 
2057
  lex_end(lex);
1541
2058
}
1542
2059
 
 
2060
 
1543
2061
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
1544
2062
{
1545
 
  assert(_schema);
1546
 
  if (_schema and _schema->empty())
1547
 
  {
1548
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1549
 
    return true;
1550
 
  }
1551
 
  else if (not _schema)
1552
 
  {
1553
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1554
 
    return true;
1555
 
  }
1556
 
  assert(_schema);
1557
 
 
1558
 
  *p_db= strmake(_schema->c_str(), _schema->size());
1559
 
  *p_db_length= _schema->size();
1560
 
 
 
2063
  if (db == NULL)
 
2064
  {
 
2065
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
 
2066
    return true;
 
2067
  }
 
2068
  *p_db= strmake(db, db_length);
 
2069
  *p_db_length= db_length;
1561
2070
  return false;
1562
2071
}
1563
2072
 
 
2073
 
 
2074
bool select_dumpvar::send_data(List<Item> &items)
 
2075
{
 
2076
  List_iterator_fast<my_var> var_li(var_list);
 
2077
  List_iterator<Item> it(items);
 
2078
  Item *item;
 
2079
  my_var *mv;
 
2080
 
 
2081
  if (unit->offset_limit_cnt)
 
2082
  {                                             // using limit offset,count
 
2083
    unit->offset_limit_cnt--;
 
2084
    return(0);
 
2085
  }
 
2086
  if (row_count++) 
 
2087
  {
 
2088
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
 
2089
    return(1);
 
2090
  }
 
2091
  while ((mv= var_li++) && (item= it++))
 
2092
  {
 
2093
    if (mv->local == 0)
 
2094
    {
 
2095
      Item_func_set_user_var *suv= new Item_func_set_user_var(mv->s, item);
 
2096
      suv->fix_fields(session, 0);
 
2097
      suv->check(0);
 
2098
      suv->update();
 
2099
    }
 
2100
  }
 
2101
  return(session->is_error());
 
2102
}
 
2103
 
 
2104
bool select_dumpvar::send_eof()
 
2105
{
 
2106
  if (! row_count)
 
2107
    push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
2108
                 ER_SP_FETCH_NO_DATA, ER(ER_SP_FETCH_NO_DATA));
 
2109
  /*
 
2110
    In order to remember the value of affected rows for ROW_COUNT()
 
2111
    function, SELECT INTO has to have an own SQLCOM.
 
2112
    TODO: split from SQLCOM_SELECT
 
2113
  */
 
2114
  ::my_ok(session,row_count);
 
2115
  return 0;
 
2116
}
 
2117
 
1564
2118
/****************************************************************************
1565
 
  Tmp_Table_Param
 
2119
  TMP_TABLE_PARAM
1566
2120
****************************************************************************/
1567
2121
 
1568
 
void Tmp_Table_Param::init()
 
2122
void TMP_TABLE_PARAM::init()
1569
2123
{
1570
2124
  field_count= sum_func_count= func_count= hidden_field_count= 0;
1571
2125
  group_parts= group_length= group_null_parts= 0;
1572
2126
  quick_group= 1;
1573
2127
  table_charset= 0;
1574
2128
  precomputed_group_by= 0;
 
2129
  bit_fields_as_long= 0;
 
2130
  return;
1575
2131
}
1576
2132
 
1577
 
void Tmp_Table_Param::cleanup(void)
 
2133
 
 
2134
void session_increment_bytes_sent(ulong length)
1578
2135
{
1579
 
  /* Fix for Intel compiler */
1580
 
  if (copy_field)
1581
 
  {
1582
 
    boost::checked_array_delete(copy_field);
1583
 
    save_copy_field= save_copy_field_end= copy_field= copy_field_end= 0;
 
2136
  Session *session=current_session;
 
2137
  if (likely(session != 0))
 
2138
  { /* current_session==0 when close_connection() calls net_send_error() */
 
2139
    session->status_var.bytes_sent+= length;
1584
2140
  }
1585
2141
}
1586
2142
 
 
2143
 
 
2144
void session_increment_bytes_received(ulong length)
 
2145
{
 
2146
  current_session->status_var.bytes_received+= length;
 
2147
}
 
2148
 
 
2149
 
 
2150
void session_increment_net_big_packet_count(ulong length)
 
2151
{
 
2152
  current_session->status_var.net_big_packet_count+= length;
 
2153
}
 
2154
 
1587
2155
void Session::send_kill_message() const
1588
2156
{
1589
 
  drizzled::error_t err= static_cast<drizzled::error_t>(killed_errno());
1590
 
  if (err != EE_OK)
 
2157
  int err= killed_errno();
 
2158
  if (err)
1591
2159
    my_message(err, ER(err), MYF(0));
1592
2160
}
1593
2161
 
1597
2165
}
1598
2166
 
1599
2167
 
1600
 
void Session::set_db(const std::string &new_db)
1601
 
{
1602
 
  /* Do not reallocate memory if current chunk is big enough. */
1603
 
  if (new_db.length())
1604
 
  {
1605
 
    _schema.reset(new std::string(new_db));
1606
 
  }
1607
 
  else
1608
 
  {
1609
 
    _schema.reset(new std::string(""));
1610
 
  }
1611
 
}
 
2168
void Security_context::init()
 
2169
{
 
2170
  user= ip= 0;
 
2171
}
 
2172
 
 
2173
 
 
2174
void Security_context::destroy()
 
2175
{
 
2176
  // If not pointer to constant
 
2177
  if (user)
 
2178
  {
 
2179
    free(user);
 
2180
    user= NULL;
 
2181
  }
 
2182
  if (ip)
 
2183
  {
 
2184
    free(ip);
 
2185
    ip= NULL;
 
2186
  }
 
2187
}
 
2188
 
 
2189
 
 
2190
void Security_context::skip_grants()
 
2191
{
 
2192
  /* privileges for the user are unknown everything is allowed */
 
2193
}
 
2194
 
 
2195
 
 
2196
/****************************************************************************
 
2197
  Handling of open and locked tables states.
 
2198
 
 
2199
  This is used when we want to open/lock (and then close) some tables when
 
2200
  we already have a set of tables open and locked. We use these methods for
 
2201
  access to mysql.proc table to find definitions of stored routines.
 
2202
****************************************************************************/
 
2203
 
 
2204
void Session::reset_n_backup_open_tables_state(Open_tables_state *backup)
 
2205
{
 
2206
  backup->set_open_tables_state(this);
 
2207
  reset_open_tables_state();
 
2208
  state_flags|= Open_tables_state::BACKUPS_AVAIL;
 
2209
  return;
 
2210
}
 
2211
 
 
2212
 
 
2213
void Session::restore_backup_open_tables_state(Open_tables_state *backup)
 
2214
{
 
2215
  /*
 
2216
    Before we will throw away current open tables state we want
 
2217
    to be sure that it was properly cleaned up.
 
2218
  */
 
2219
  assert(open_tables == 0 && temporary_tables == 0 &&
 
2220
              handler_tables == 0 && derived_tables == 0 &&
 
2221
              lock == 0 && locked_tables == 0);
 
2222
  set_open_tables_state(backup);
 
2223
  return;
 
2224
}
 
2225
 
 
2226
/**
 
2227
  Check the killed state of a user thread
 
2228
  @param session  user thread
 
2229
  @retval 0 the user thread is active
 
2230
  @retval 1 the user thread has been killed
 
2231
*/
 
2232
extern "C" int session_killed(const Session *session)
 
2233
{
 
2234
  return(session->killed);
 
2235
}
 
2236
 
 
2237
/**
 
2238
  Return the thread id of a user thread
 
2239
  @param session user thread
 
2240
  @return thread id
 
2241
*/
 
2242
extern "C" unsigned long session_get_thread_id(const Session *session)
 
2243
{
 
2244
  return((unsigned long)session->thread_id);
 
2245
}
 
2246
 
 
2247
 
 
2248
#ifdef INNODB_COMPATIBILITY_HOOKS
 
2249
extern "C" const struct charset_info_st *session_charset(Session *session)
 
2250
{
 
2251
  return(session->charset());
 
2252
}
 
2253
 
 
2254
extern "C" char **session_query(Session *session)
 
2255
{
 
2256
  return(&session->query);
 
2257
}
 
2258
 
 
2259
extern "C" int session_slave_thread(const Session *session)
 
2260
{
 
2261
  return(session->slave_thread);
 
2262
}
 
2263
 
 
2264
extern "C" int session_non_transactional_update(const Session *session)
 
2265
{
 
2266
  return(session->transaction.all.modified_non_trans_table);
 
2267
}
 
2268
 
 
2269
extern "C" int session_binlog_format(const Session *session)
 
2270
{
 
2271
  return (int) session->variables.binlog_format;
 
2272
}
 
2273
 
 
2274
extern "C" void session_mark_transaction_to_rollback(Session *session, bool all)
 
2275
{
 
2276
  mark_transaction_to_rollback(session, all);
 
2277
}
 
2278
#endif // INNODB_COMPATIBILITY_HOOKS */
1612
2279
 
1613
2280
 
1614
2281
/**
1617
2284
  @param  session   Thread handle
1618
2285
  @param  all   true <=> rollback main transaction.
1619
2286
*/
1620
 
void Session::markTransactionForRollback(bool all)
1621
 
{
1622
 
  is_fatal_sub_stmt_error= true;
1623
 
  transaction_rollback_request= all;
1624
 
}
1625
 
 
1626
 
void Session::disconnect(enum error_t errcode)
1627
 
{
1628
 
  /* Allow any plugins to cleanup their session variables */
1629
 
  plugin_sessionvar_cleanup(this);
1630
 
 
1631
 
  /* If necessary, log any aborted or unauthorized connections */
1632
 
  if (getKilled() || client->wasAborted())
1633
 
  {
1634
 
    status_var.aborted_threads++;
1635
 
  }
1636
 
 
1637
 
  if (client->wasAborted())
1638
 
  {
1639
 
    if (not getKilled() && variables.log_warnings > 1)
1640
 
    {
1641
 
      errmsg_printf(error::WARN, ER(ER_NEW_ABORTING_CONNECTION)
1642
 
                  , thread_id
1643
 
                  , (_schema->empty() ? "unconnected" : _schema->c_str())
1644
 
                  , security_ctx->username().empty() == false ? security_ctx->username().c_str() : "unauthenticated"
1645
 
                  , security_ctx->address().c_str()
1646
 
                  , (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1647
 
    }
1648
 
  }
1649
 
 
1650
 
  setKilled(Session::KILL_CONNECTION);
1651
 
 
1652
 
  if (client->isConnected())
1653
 
  {
1654
 
    if (errcode != EE_OK)
1655
 
    {
1656
 
      /*my_error(errcode, ER(errcode));*/
1657
 
      client->sendError(errcode, ER(errcode));
1658
 
    }
1659
 
    client->close();
1660
 
  }
1661
 
}
1662
 
 
1663
 
void Session::reset_for_next_command()
1664
 
{
1665
 
  free_list= 0;
1666
 
  select_number= 1;
1667
 
  /*
1668
 
    Those two lines below are theoretically unneeded as
1669
 
    Session::cleanup_after_query() should take care of this already.
1670
 
  */
1671
 
  auto_inc_intervals_in_cur_stmt_for_binlog.empty();
1672
 
 
1673
 
  is_fatal_error= false;
1674
 
  server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1675
 
                          SERVER_QUERY_NO_INDEX_USED |
1676
 
                          SERVER_QUERY_NO_GOOD_INDEX_USED);
1677
 
 
1678
 
  clear_error();
1679
 
  main_da.reset_diagnostics_area();
1680
 
  total_warn_count=0;                   // Warnings for this query
1681
 
  sent_row_count= examined_row_count= 0;
1682
 
}
1683
 
 
1684
 
/*
1685
 
  Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1686
 
*/
1687
 
 
1688
 
void Open_tables_state::close_temporary_tables()
1689
 
{
1690
 
  Table *table;
1691
 
  Table *tmp_next;
1692
 
 
1693
 
  if (not temporary_tables)
1694
 
    return;
1695
 
 
1696
 
  for (table= temporary_tables; table; table= tmp_next)
1697
 
  {
1698
 
    tmp_next= table->getNext();
1699
 
    nukeTable(table);
1700
 
  }
1701
 
  temporary_tables= NULL;
1702
 
}
1703
 
 
1704
 
/*
1705
 
  unlink from session->temporary tables and close temporary table
1706
 
*/
1707
 
 
1708
 
void Open_tables_state::close_temporary_table(Table *table)
1709
 
{
1710
 
  if (table->getPrev())
1711
 
  {
1712
 
    table->getPrev()->setNext(table->getNext());
1713
 
    if (table->getPrev()->getNext())
1714
 
    {
1715
 
      table->getNext()->setPrev(table->getPrev());
1716
 
    }
1717
 
  }
1718
 
  else
1719
 
  {
1720
 
    /* removing the item from the list */
1721
 
    assert(table == temporary_tables);
1722
 
    /*
1723
 
      slave must reset its temporary list pointer to zero to exclude
1724
 
      passing non-zero value to end_slave via rli->save_temporary_tables
1725
 
      when no temp tables opened, see an invariant below.
1726
 
    */
1727
 
    temporary_tables= table->getNext();
1728
 
    if (temporary_tables)
1729
 
    {
1730
 
      table->getNext()->setPrev(NULL);
1731
 
    }
1732
 
  }
1733
 
  nukeTable(table);
1734
 
}
1735
 
 
1736
 
/*
1737
 
  Close and drop a temporary table
1738
 
 
1739
 
  NOTE
1740
 
  This dosn't unlink table from session->temporary
1741
 
  If this is needed, use close_temporary_table()
1742
 
*/
1743
 
 
1744
 
void Open_tables_state::nukeTable(Table *table)
1745
 
{
1746
 
  plugin::StorageEngine *table_type= table->getShare()->db_type();
1747
 
 
1748
 
  table->free_io_cache();
1749
 
  table->delete_table();
1750
 
 
1751
 
  identifier::Table identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1752
 
  rm_temporary_table(table_type, identifier);
1753
 
 
1754
 
  boost::checked_delete(table->getMutableShare());
1755
 
 
1756
 
  boost::checked_delete(table);
1757
 
}
1758
 
 
1759
 
/** Clear most status variables. */
1760
 
extern time_t flush_status_time;
1761
 
 
1762
 
void Session::refresh_status()
1763
 
{
1764
 
  /* Reset thread's status variables */
1765
 
  memset(&status_var, 0, sizeof(status_var));
1766
 
 
1767
 
  flush_status_time= time((time_t*) 0);
1768
 
  current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1769
 
  current_global_counters.connections= 0;
1770
 
}
1771
 
 
1772
 
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1773
 
{
1774
 
  return getVariable(std::string(name.str, name.length), create_if_not_exists);
1775
 
}
1776
 
 
1777
 
user_var_entry *Session::getVariable(const std::string  &name, bool create_if_not_exists)
1778
 
{
1779
 
  if (cleanup_done)
1780
 
    return NULL;
1781
 
 
1782
 
  UserVars::iterator iter= user_vars.find(name);
1783
 
  if (iter != user_vars.end())
1784
 
    return (*iter).second;
1785
 
 
1786
 
  if (not create_if_not_exists)
1787
 
    return NULL;
1788
 
 
1789
 
  user_var_entry *entry= NULL;
1790
 
  entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1791
 
 
1792
 
  if (entry == NULL)
1793
 
    return NULL;
1794
 
 
1795
 
  std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1796
 
 
1797
 
  if (not returnable.second)
1798
 
  {
1799
 
    boost::checked_delete(entry);
1800
 
  }
1801
 
 
1802
 
  return entry;
1803
 
}
1804
 
 
1805
 
void Session::setVariable(const std::string &name, const std::string &value)
1806
 
{
1807
 
  user_var_entry *updateable_var= getVariable(name.c_str(), true);
1808
 
  if (updateable_var)
1809
 
  {
1810
 
    updateable_var->update_hash(false,
1811
 
                                (void*)value.c_str(),
1812
 
                                static_cast<uint32_t>(value.length()), STRING_RESULT,
1813
 
                                &my_charset_bin,
1814
 
                                DERIVATION_IMPLICIT, false);
1815
 
  }
1816
 
}
1817
 
 
1818
 
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1819
 
{
1820
 
  for (Table *table= temporary_tables ; table ; table= table->getNext())
1821
 
  {
1822
 
    if (table->query_id == getQueryId())
1823
 
    {
1824
 
      table->query_id= 0;
1825
 
      table->cursor->ha_reset();
1826
 
    }
1827
 
  }
1828
 
}
1829
 
 
1830
 
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1831
 
{
1832
 
  for (; table ; table= table->getNext())
1833
 
  {
1834
 
    if (table->query_id == getQueryId())
1835
 
    {
1836
 
      table->query_id= 0;
1837
 
      table->cursor->ha_reset();
1838
 
    }
1839
 
  }
1840
 
}
1841
 
 
1842
 
/*
1843
 
  Unlocks tables and frees derived tables.
1844
 
  Put all normal tables used by thread in free list.
1845
 
 
1846
 
  It will only close/mark as free for reuse tables opened by this
1847
 
  substatement, it will also check if we are closing tables after
1848
 
  execution of complete query (i.e. we are on upper level) and will
1849
 
  leave prelocked mode if needed.
1850
 
*/
1851
 
void Session::close_thread_tables()
1852
 
{
1853
 
  clearDerivedTables();
1854
 
 
1855
 
  /*
1856
 
    Mark all temporary tables used by this statement as free for reuse.
1857
 
  */
1858
 
  mark_temp_tables_as_free_for_reuse();
1859
 
  /*
1860
 
    Let us commit transaction for statement. Since in 5.0 we only have
1861
 
    one statement transaction and don't allow several nested statement
1862
 
    transactions this call will do nothing if we are inside of stored
1863
 
    function or trigger (i.e. statement transaction is already active and
1864
 
    does not belong to statement for which we do close_thread_tables()).
1865
 
    TODO: This should be fixed in later releases.
1866
 
   */
1867
 
  {
1868
 
    TransactionServices &transaction_services= TransactionServices::singleton();
1869
 
    main_da.can_overwrite_status= true;
1870
 
    transaction_services.autocommitOrRollback(*this, is_error());
1871
 
    main_da.can_overwrite_status= false;
1872
 
    transaction.stmt.reset();
1873
 
  }
1874
 
 
1875
 
  if (lock)
1876
 
  {
1877
 
    /*
1878
 
      For RBR we flush the pending event just before we unlock all the
1879
 
      tables.  This means that we are at the end of a topmost
1880
 
      statement, so we ensure that the STMT_END_F flag is set on the
1881
 
      pending event.  For statements that are *inside* stored
1882
 
      functions, the pending event will not be flushed: that will be
1883
 
      handled either before writing a query log event (inside
1884
 
      binlog_query()) or when preparing a pending event.
1885
 
     */
1886
 
    unlockTables(lock);
1887
 
    lock= 0;
1888
 
  }
1889
 
  /*
1890
 
    Note that we need to hold table::Cache::singleton().mutex() while changing the
1891
 
    open_tables list. Another thread may work on it.
1892
 
    (See: table::Cache::singleton().removeTable(), wait_completed_table())
1893
 
    Closing a MERGE child before the parent would be fatal if the
1894
 
    other thread tries to abort the MERGE lock in between.
1895
 
  */
1896
 
  if (open_tables)
1897
 
    close_open_tables();
1898
 
}
1899
 
 
1900
 
void Session::close_tables_for_reopen(TableList **tables)
1901
 
{
1902
 
  /*
1903
 
    If table list consists only from tables from prelocking set, table list
1904
 
    for new attempt should be empty, so we have to update list's root pointer.
1905
 
  */
1906
 
  if (lex->first_not_own_table() == *tables)
1907
 
    *tables= 0;
1908
 
  lex->chop_off_not_own_tables();
1909
 
  for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1910
 
    tmp->table= 0;
1911
 
  close_thread_tables();
1912
 
}
1913
 
 
1914
 
bool Session::openTablesLock(TableList *tables)
1915
 
{
1916
 
  uint32_t counter;
1917
 
  bool need_reopen;
1918
 
 
1919
 
  for ( ; ; )
1920
 
  {
1921
 
    if (open_tables_from_list(&tables, &counter))
1922
 
      return true;
1923
 
 
1924
 
    if (not lock_tables(tables, counter, &need_reopen))
1925
 
      break;
1926
 
 
1927
 
    if (not need_reopen)
1928
 
      return true;
1929
 
 
1930
 
    close_tables_for_reopen(&tables);
1931
 
  }
1932
 
 
1933
 
  if ((handle_derived(lex, &derived_prepare) || (handle_derived(lex, &derived_filling))))
1934
 
    return true;
1935
 
 
1936
 
  return false;
1937
 
}
1938
 
 
1939
 
/*
1940
 
  @note "best_effort" is used in cases were if a failure occurred on this
1941
 
  operation it would not be surprising because we are only removing because there
1942
 
  might be an issue (lame engines).
1943
 
*/
1944
 
 
1945
 
bool Open_tables_state::rm_temporary_table(const identifier::Table &identifier, bool best_effort)
1946
 
{
1947
 
  if (not plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
1948
 
  {
1949
 
    if (not best_effort)
1950
 
    {
1951
 
      std::string path;
1952
 
      identifier.getSQLPath(path);
1953
 
      errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"),
1954
 
                    path.c_str(), errno);
1955
 
    }
1956
 
 
1957
 
    return true;
1958
 
  }
1959
 
 
1960
 
  return false;
1961
 
}
1962
 
 
1963
 
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const identifier::Table &identifier)
1964
 
{
1965
 
  drizzled::error_t error;
1966
 
  assert(base);
1967
 
 
1968
 
  if (not plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier, error))
1969
 
  {
1970
 
    std::string path;
1971
 
    identifier.getSQLPath(path);
1972
 
    errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"),
1973
 
                  path.c_str(), error);
1974
 
 
1975
 
    return true;
1976
 
  }
1977
 
 
1978
 
  return false;
1979
 
}
1980
 
 
1981
 
/**
1982
 
  @note this will be removed, I am looking through Hudson to see if it is finding
1983
 
  any tables that are missed during cleanup.
1984
 
*/
1985
 
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
1986
 
{
1987
 
  Table *table;
1988
 
 
1989
 
  if (not temporary_tables)
1990
 
    return;
1991
 
 
1992
 
  cerr << "Begin Run: " << foo << "\n";
1993
 
  for (table= temporary_tables; table; table= table->getNext())
1994
 
  {
1995
 
    bool have_proto= false;
1996
 
 
1997
 
    message::Table *proto= table->getShare()->getTableMessage();
1998
 
    if (table->getShare()->getTableMessage())
1999
 
      have_proto= true;
2000
 
 
2001
 
    const char *answer= have_proto ? "true" : "false";
2002
 
 
2003
 
    if (have_proto)
2004
 
    {
2005
 
      cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2006
 
      cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2007
 
    }
2008
 
    else
2009
 
    {
2010
 
      cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2011
 
    }
2012
 
  }
2013
 
}
2014
 
 
2015
 
table::Singular *Session::getInstanceTable()
2016
 
{
2017
 
  temporary_shares.push_back(new table::Singular()); // This will not go into the tableshare cache, so no key is used.
2018
 
 
2019
 
  table::Singular *tmp_share= temporary_shares.back();
2020
 
 
2021
 
  assert(tmp_share);
2022
 
 
2023
 
  return tmp_share;
2024
 
}
2025
 
 
2026
 
 
2027
 
/**
2028
 
  Create a reduced Table object with properly set up Field list from a
2029
 
  list of field definitions.
2030
 
 
2031
 
    The created table doesn't have a table Cursor associated with
2032
 
    it, has no keys, no group/distinct, no copy_funcs array.
2033
 
    The sole purpose of this Table object is to use the power of Field
2034
 
    class to read/write data to/from table->getInsertRecord(). Then one can store
2035
 
    the record in any container (RB tree, hash, etc).
2036
 
    The table is created in Session mem_root, so are the table's fields.
2037
 
    Consequently, if you don't BLOB fields, you don't need to free it.
2038
 
 
2039
 
  @param session         connection handle
2040
 
  @param field_list  list of column definitions
2041
 
 
2042
 
  @return
2043
 
    0 if out of memory, Table object in case of success
2044
 
*/
2045
 
table::Singular *Session::getInstanceTable(List<CreateField> &field_list)
2046
 
{
2047
 
  temporary_shares.push_back(new table::Singular(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2048
 
 
2049
 
  table::Singular *tmp_share= temporary_shares.back();
2050
 
 
2051
 
  assert(tmp_share);
2052
 
 
2053
 
  return tmp_share;
2054
 
}
2055
 
 
2056
 
namespace display  {
2057
 
 
2058
 
static const std::string NONE= "NONE";
2059
 
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2060
 
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2061
 
 
2062
 
const std::string &type(drizzled::Session::global_read_lock_t type)
2063
 
{
2064
 
  switch (type) {
2065
 
    default:
2066
 
    case Session::NONE:
2067
 
      return NONE;
2068
 
    case Session::GOT_GLOBAL_READ_LOCK:
2069
 
      return GOT_GLOBAL_READ_LOCK;
2070
 
    case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2071
 
      return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2072
 
  }
2073
 
}
2074
 
 
2075
 
size_t max_string_length(drizzled::Session::global_read_lock_t)
2076
 
{
2077
 
  return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2078
 
}
2079
 
 
2080
 
} /* namespace display */
2081
 
 
2082
 
} /* namespace drizzled */
 
2287
 
 
2288
void mark_transaction_to_rollback(Session *session, bool all)
 
2289
{
 
2290
  if (session)
 
2291
  {
 
2292
    session->is_fatal_sub_stmt_error= true;
 
2293
    session->transaction_rollback_request= all;
 
2294
  }
 
2295
}
 
2296
/***************************************************************************
 
2297
  Handling of XA id cacheing
 
2298
***************************************************************************/
 
2299
 
 
2300
pthread_mutex_t LOCK_xid_cache;
 
2301
HASH xid_cache;
 
2302
 
 
2303
extern "C" unsigned char *xid_get_hash_key(const unsigned char *, size_t *, bool);
 
2304
extern "C" void xid_free_hash(void *);
 
2305
 
 
2306
unsigned char *xid_get_hash_key(const unsigned char *ptr, size_t *length,
 
2307
                        bool not_used __attribute__((unused)))
 
2308
{
 
2309
  *length=((XID_STATE*)ptr)->xid.key_length();
 
2310
  return ((XID_STATE*)ptr)->xid.key();
 
2311
}
 
2312
 
 
2313
void xid_free_hash(void *ptr)
 
2314
{
 
2315
  if (!((XID_STATE*)ptr)->in_session)
 
2316
    free((unsigned char*)ptr);
 
2317
}
 
2318
 
 
2319
bool xid_cache_init()
 
2320
{
 
2321
  pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST);
 
2322
  return hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
 
2323
                   xid_get_hash_key, xid_free_hash, 0) != 0;
 
2324
}
 
2325
 
 
2326
void xid_cache_free()
 
2327
{
 
2328
  if (hash_inited(&xid_cache))
 
2329
  {
 
2330
    hash_free(&xid_cache);
 
2331
    pthread_mutex_destroy(&LOCK_xid_cache);
 
2332
  }
 
2333
}
 
2334
 
 
2335
XID_STATE *xid_cache_search(XID *xid)
 
2336
{
 
2337
  pthread_mutex_lock(&LOCK_xid_cache);
 
2338
  XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, xid->key(), xid->key_length());
 
2339
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2340
  return res;
 
2341
}
 
2342
 
 
2343
 
 
2344
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
 
2345
{
 
2346
  XID_STATE *xs;
 
2347
  bool res;
 
2348
  pthread_mutex_lock(&LOCK_xid_cache);
 
2349
  if (hash_search(&xid_cache, xid->key(), xid->key_length()))
 
2350
    res=0;
 
2351
  else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
 
2352
    res=1;
 
2353
  else
 
2354
  {
 
2355
    xs->xa_state=xa_state;
 
2356
    xs->xid.set(xid);
 
2357
    xs->in_session=0;
 
2358
    res=my_hash_insert(&xid_cache, (unsigned char*)xs);
 
2359
  }
 
2360
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2361
  return res;
 
2362
}
 
2363
 
 
2364
 
 
2365
bool xid_cache_insert(XID_STATE *xid_state)
 
2366
{
 
2367
  pthread_mutex_lock(&LOCK_xid_cache);
 
2368
  assert(hash_search(&xid_cache, xid_state->xid.key(),
 
2369
                          xid_state->xid.key_length())==0);
 
2370
  bool res=my_hash_insert(&xid_cache, (unsigned char*)xid_state);
 
2371
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2372
  return res;
 
2373
}
 
2374
 
 
2375
 
 
2376
void xid_cache_delete(XID_STATE *xid_state)
 
2377
{
 
2378
  pthread_mutex_lock(&LOCK_xid_cache);
 
2379
  hash_delete(&xid_cache, (unsigned char *)xid_state);
 
2380
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2381
}
 
2382
 
 
2383
/*
 
2384
  Implementation of interface to write rows to the binary log through the
 
2385
  thread.  The thread is responsible for writing the rows it has
 
2386
  inserted/updated/deleted.
 
2387
*/
 
2388
 
 
2389
 
 
2390
/*
 
2391
  Template member function for ensuring that there is an rows log
 
2392
  event of the apropriate type before proceeding.
 
2393
 
 
2394
  PRE CONDITION:
 
2395
    - Events of type 'RowEventT' have the type code 'type_code'.
 
2396
    
 
2397
  POST CONDITION:
 
2398
    If a non-NULL pointer is returned, the pending event for thread 'session' will
 
2399
    be an event of type 'RowEventT' (which have the type code 'type_code')
 
2400
    will either empty or have enough space to hold 'needed' bytes.  In
 
2401
    addition, the columns bitmap will be correct for the row, meaning that
 
2402
    the pending event will be flushed if the columns in the event differ from
 
2403
    the columns suppled to the function.
 
2404
 
 
2405
  RETURNS
 
2406
    If no error, a non-NULL pending event (either one which already existed or
 
2407
    the newly created one).
 
2408
    If error, NULL.
 
2409
 */
 
2410
 
 
2411
template <class RowsEventT> Rows_log_event* 
 
2412
Session::binlog_prepare_pending_rows_event(Table* table, uint32_t serv_id,
 
2413
                                       size_t needed,
 
2414
                                       bool is_transactional,
 
2415
                                       RowsEventT *hint __attribute__((unused)))
 
2416
{
 
2417
  /* Pre-conditions */
 
2418
  assert(table->s->table_map_id != UINT32_MAX);
 
2419
 
 
2420
  /* Fetch the type code for the RowsEventT template parameter */
 
2421
  int const type_code= RowsEventT::TYPE_CODE;
 
2422
 
 
2423
  /*
 
2424
    There is no good place to set up the transactional data, so we
 
2425
    have to do it here.
 
2426
  */
 
2427
  if (binlog_setup_trx_data())
 
2428
    return(NULL);
 
2429
 
 
2430
  Rows_log_event* pending= binlog_get_pending_rows_event();
 
2431
 
 
2432
  if (unlikely(pending && !pending->is_valid()))
 
2433
    return(NULL);
 
2434
 
 
2435
  /*
 
2436
    Check if the current event is non-NULL and a write-rows
 
2437
    event. Also check if the table provided is mapped: if it is not,
 
2438
    then we have switched to writing to a new table.
 
2439
    If there is no pending event, we need to create one. If there is a pending
 
2440
    event, but it's not about the same table id, or not of the same type
 
2441
    (between Write, Update and Delete), or not the same affected columns, or
 
2442
    going to be too big, flush this event to disk and create a new pending
 
2443
    event.
 
2444
 
 
2445
    The last test is necessary for the Cluster injector to work
 
2446
    correctly. The reason is that the Cluster can inject two write
 
2447
    rows with different column bitmaps if there is an insert followed
 
2448
    by an update in the same transaction, and these are grouped into a
 
2449
    single epoch/transaction when fed to the injector.
 
2450
 
 
2451
    TODO: Fix the code so that the last test can be removed.
 
2452
  */
 
2453
  if (!pending ||
 
2454
      pending->server_id != serv_id || 
 
2455
      pending->get_table_id() != table->s->table_map_id ||
 
2456
      pending->get_type_code() != type_code || 
 
2457
      pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
 
2458
      !bitmap_cmp(pending->get_cols(), table->write_set))
 
2459
    {
 
2460
    /* Create a new RowsEventT... */
 
2461
    Rows_log_event* const
 
2462
        ev= new RowsEventT(this, table, table->s->table_map_id,
 
2463
                           is_transactional);
 
2464
    if (unlikely(!ev))
 
2465
      return(NULL);
 
2466
    ev->server_id= serv_id; // I don't like this, it's too easy to forget.
 
2467
    /*
 
2468
      flush the pending event and replace it with the newly created
 
2469
      event...
 
2470
    */
 
2471
    if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev)))
 
2472
    {
 
2473
      delete ev;
 
2474
      return(NULL);
 
2475
    }
 
2476
 
 
2477
    return(ev);               /* This is the new pending event */
 
2478
  }
 
2479
  return(pending);        /* This is the current pending event */
 
2480
}
 
2481
 
 
2482
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
2483
/*
 
2484
  Instantiate the versions we need, we have -fno-implicit-template as
 
2485
  compiling option.
 
2486
*/
 
2487
template Rows_log_event*
 
2488
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2489
                                       Write_rows_log_event*);
 
2490
 
 
2491
template Rows_log_event*
 
2492
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2493
                                       Delete_rows_log_event *);
 
2494
 
 
2495
template Rows_log_event* 
 
2496
Session::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2497
                                       Update_rows_log_event *);
 
2498
#endif
 
2499
 
 
2500
namespace {
 
2501
  /**
 
2502
     Class to handle temporary allocation of memory for row data.
 
2503
 
 
2504
     The responsibilities of the class is to provide memory for
 
2505
     packing one or two rows of packed data (depending on what
 
2506
     constructor is called).
 
2507
 
 
2508
     In order to make the allocation more efficient for "simple" rows,
 
2509
     i.e., rows that do not contain any blobs, a pointer to the
 
2510
     allocated memory is of memory is stored in the table structure
 
2511
     for simple rows.  If memory for a table containing a blob field
 
2512
     is requested, only memory for that is allocated, and subsequently
 
2513
     released when the object is destroyed.
 
2514
 
 
2515
   */
 
2516
  class Row_data_memory {
 
2517
  public:
 
2518
    /**
 
2519
      Build an object to keep track of a block-local piece of memory
 
2520
      for storing a row of data.
 
2521
 
 
2522
      @param table
 
2523
      Table where the pre-allocated memory is stored.
 
2524
 
 
2525
      @param length
 
2526
      Length of data that is needed, if the record contain blobs.
 
2527
     */
 
2528
    Row_data_memory(Table *table, size_t const len1)
 
2529
      : m_memory(0)
 
2530
    {
 
2531
      m_alloc_checked= false;
 
2532
      allocate_memory(table, len1);
 
2533
      m_ptr[0]= has_memory() ? m_memory : 0;
 
2534
      m_ptr[1]= 0;
 
2535
    }
 
2536
 
 
2537
    Row_data_memory(Table *table, size_t const len1, size_t const len2)
 
2538
      : m_memory(0)
 
2539
    {
 
2540
      m_alloc_checked= false;
 
2541
      allocate_memory(table, len1 + len2);
 
2542
      m_ptr[0]= has_memory() ? m_memory        : 0;
 
2543
      m_ptr[1]= has_memory() ? m_memory + len1 : 0;
 
2544
    }
 
2545
 
 
2546
    ~Row_data_memory()
 
2547
    {
 
2548
      if (m_memory != 0 && m_release_memory_on_destruction)
 
2549
        free((unsigned char*) m_memory);
 
2550
    }
 
2551
 
 
2552
    /**
 
2553
       Is there memory allocated?
 
2554
 
 
2555
       @retval true There is memory allocated
 
2556
       @retval false Memory allocation failed
 
2557
     */
 
2558
    bool has_memory() const {
 
2559
      m_alloc_checked= true;
 
2560
      return m_memory != 0;
 
2561
    }
 
2562
 
 
2563
    unsigned char *slot(uint32_t s)
 
2564
    {
 
2565
      assert(s < sizeof(m_ptr)/sizeof(*m_ptr));
 
2566
      assert(m_ptr[s] != 0);
 
2567
      assert(m_alloc_checked == true);
 
2568
      return m_ptr[s];
 
2569
    }
 
2570
 
 
2571
  private:
 
2572
    void allocate_memory(Table *const table, size_t const total_length)
 
2573
    {
 
2574
      if (table->s->blob_fields == 0)
 
2575
      {
 
2576
        /*
 
2577
          The maximum length of a packed record is less than this
 
2578
          length. We use this value instead of the supplied length
 
2579
          when allocating memory for records, since we don't know how
 
2580
          the memory will be used in future allocations.
 
2581
 
 
2582
          Since table->s->reclength is for unpacked records, we have
 
2583
          to add two bytes for each field, which can potentially be
 
2584
          added to hold the length of a packed field.
 
2585
        */
 
2586
        size_t const maxlen= table->s->reclength + 2 * table->s->fields;
 
2587
 
 
2588
        /*
 
2589
          Allocate memory for two records if memory hasn't been
 
2590
          allocated. We allocate memory for two records so that it can
 
2591
          be used when processing update rows as well.
 
2592
        */
 
2593
        if (table->write_row_record == 0)
 
2594
          table->write_row_record=
 
2595
            (unsigned char *) alloc_root(&table->mem_root, 2 * maxlen);
 
2596
        m_memory= table->write_row_record;
 
2597
        m_release_memory_on_destruction= false;
 
2598
      }
 
2599
      else
 
2600
      {
 
2601
        m_memory= (unsigned char *) my_malloc(total_length, MYF(MY_WME));
 
2602
        m_release_memory_on_destruction= true;
 
2603
      }
 
2604
    }
 
2605
 
 
2606
    mutable bool m_alloc_checked;
 
2607
    bool m_release_memory_on_destruction;
 
2608
    unsigned char *m_memory;
 
2609
    unsigned char *m_ptr[2];
 
2610
  };
 
2611
}
 
2612
 
 
2613
 
 
2614
int Session::binlog_write_row(Table* table, bool is_trans, 
 
2615
                          unsigned char const *record) 
 
2616
 
2617
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2618
 
 
2619
  /*
 
2620
    Pack records into format for transfer. We are allocating more
 
2621
    memory than needed, but that doesn't matter.
 
2622
  */
 
2623
  Row_data_memory memory(table, table->max_row_length(record));
 
2624
  if (!memory.has_memory())
 
2625
    return HA_ERR_OUT_OF_MEM;
 
2626
 
 
2627
  unsigned char *row_data= memory.slot(0);
 
2628
 
 
2629
  size_t const len= pack_row(table, table->write_set, row_data, record);
 
2630
 
 
2631
  Rows_log_event* const ev=
 
2632
    binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
 
2633
                                      static_cast<Write_rows_log_event*>(0));
 
2634
 
 
2635
  if (unlikely(ev == 0))
 
2636
    return HA_ERR_OUT_OF_MEM;
 
2637
 
 
2638
  return ev->add_row_data(row_data, len);
 
2639
}
 
2640
 
 
2641
int Session::binlog_update_row(Table* table, bool is_trans,
 
2642
                           const unsigned char *before_record,
 
2643
                           const unsigned char *after_record)
 
2644
 
2645
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2646
 
 
2647
  size_t const before_maxlen = table->max_row_length(before_record);
 
2648
  size_t const after_maxlen  = table->max_row_length(after_record);
 
2649
 
 
2650
  Row_data_memory row_data(table, before_maxlen, after_maxlen);
 
2651
  if (!row_data.has_memory())
 
2652
    return HA_ERR_OUT_OF_MEM;
 
2653
 
 
2654
  unsigned char *before_row= row_data.slot(0);
 
2655
  unsigned char *after_row= row_data.slot(1);
 
2656
 
 
2657
  size_t const before_size= pack_row(table, table->read_set, before_row,
 
2658
                                        before_record);
 
2659
  size_t const after_size= pack_row(table, table->write_set, after_row,
 
2660
                                       after_record);
 
2661
 
 
2662
  Rows_log_event* const ev=
 
2663
    binlog_prepare_pending_rows_event(table, server_id,
 
2664
                                      before_size + after_size, is_trans,
 
2665
                                      static_cast<Update_rows_log_event*>(0));
 
2666
 
 
2667
  if (unlikely(ev == 0))
 
2668
    return HA_ERR_OUT_OF_MEM;
 
2669
 
 
2670
  return
 
2671
    ev->add_row_data(before_row, before_size) ||
 
2672
    ev->add_row_data(after_row, after_size);
 
2673
}
 
2674
 
 
2675
int Session::binlog_delete_row(Table* table, bool is_trans, 
 
2676
                           unsigned char const *record)
 
2677
 
2678
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2679
 
 
2680
  /* 
 
2681
     Pack records into format for transfer. We are allocating more
 
2682
     memory than needed, but that doesn't matter.
 
2683
  */
 
2684
  Row_data_memory memory(table, table->max_row_length(record));
 
2685
  if (unlikely(!memory.has_memory()))
 
2686
    return HA_ERR_OUT_OF_MEM;
 
2687
 
 
2688
  unsigned char *row_data= memory.slot(0);
 
2689
 
 
2690
  size_t const len= pack_row(table, table->read_set, row_data, record);
 
2691
 
 
2692
  Rows_log_event* const ev=
 
2693
    binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
 
2694
                                      static_cast<Delete_rows_log_event*>(0));
 
2695
 
 
2696
  if (unlikely(ev == 0))
 
2697
    return HA_ERR_OUT_OF_MEM;
 
2698
 
 
2699
  return ev->add_row_data(row_data, len);
 
2700
}
 
2701
 
 
2702
 
 
2703
int Session::binlog_flush_pending_rows_event(bool stmt_end)
 
2704
{
 
2705
  /*
 
2706
    We shall flush the pending event even if we are not in row-based
 
2707
    mode: it might be the case that we left row-based mode before
 
2708
    flushing anything (e.g., if we have explicitly locked tables).
 
2709
   */
 
2710
  if (!mysql_bin_log.is_open())
 
2711
    return(0);
 
2712
 
 
2713
  /*
 
2714
    Mark the event as the last event of a statement if the stmt_end
 
2715
    flag is set.
 
2716
  */
 
2717
  int error= 0;
 
2718
  if (Rows_log_event *pending= binlog_get_pending_rows_event())
 
2719
  {
 
2720
    if (stmt_end)
 
2721
    {
 
2722
      pending->set_flags(Rows_log_event::STMT_END_F);
 
2723
      pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 
2724
      binlog_table_maps= 0;
 
2725
    }
 
2726
 
 
2727
    error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
 
2728
  }
 
2729
 
 
2730
  return(error);
 
2731
}
 
2732
 
 
2733
 
 
2734
/*
 
2735
  Member function that will log query, either row-based or
 
2736
  statement-based depending on the value of the 'current_stmt_binlog_row_based'
 
2737
  the value of the 'qtype' flag.
 
2738
 
 
2739
  This function should be called after the all calls to ha_*_row()
 
2740
  functions have been issued, but before tables are unlocked and
 
2741
  closed.
 
2742
 
 
2743
  OBSERVE
 
2744
    There shall be no writes to any system table after calling
 
2745
    binlog_query(), so these writes has to be moved to before the call
 
2746
    of binlog_query() for correct functioning.
 
2747
 
 
2748
    This is necessesary not only for RBR, but the master might crash
 
2749
    after binlogging the query but before changing the system tables.
 
2750
    This means that the slave and the master are not in the same state
 
2751
    (after the master has restarted), so therefore we have to
 
2752
    eliminate this problem.
 
2753
 
 
2754
  RETURN VALUE
 
2755
    Error code, or 0 if no error.
 
2756
*/
 
2757
int Session::binlog_query(Session::enum_binlog_query_type qtype, char const *query_arg,
 
2758
                      ulong query_len, bool is_trans, bool suppress_use,
 
2759
                      Session::killed_state killed_status_arg)
 
2760
{
 
2761
  assert(query_arg && mysql_bin_log.is_open());
 
2762
 
 
2763
  if (int error= binlog_flush_pending_rows_event(true))
 
2764
    return(error);
 
2765
 
 
2766
  /*
 
2767
    If we are in statement mode and trying to log an unsafe statement,
 
2768
    we should print a warning.
 
2769
  */
 
2770
  if (lex->is_stmt_unsafe() &&
 
2771
      variables.binlog_format == BINLOG_FORMAT_STMT)
 
2772
  {
 
2773
    assert(this->query != NULL);
 
2774
    push_warning(this, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
2775
                 ER_BINLOG_UNSAFE_STATEMENT,
 
2776
                 ER(ER_BINLOG_UNSAFE_STATEMENT));
 
2777
    if (!(binlog_flags & BINLOG_FLAG_UNSAFE_STMT_PRINTED))
 
2778
    {
 
2779
      char warn_buf[DRIZZLE_ERRMSG_SIZE];
 
2780
      snprintf(warn_buf, DRIZZLE_ERRMSG_SIZE, "%s Statement: %s",
 
2781
               ER(ER_BINLOG_UNSAFE_STATEMENT), this->query);
 
2782
      sql_print_warning("%s",warn_buf);
 
2783
      binlog_flags|= BINLOG_FLAG_UNSAFE_STMT_PRINTED;
 
2784
    }
 
2785
  }
 
2786
 
 
2787
  switch (qtype) {
 
2788
  case Session::ROW_QUERY_TYPE:
 
2789
    if (current_stmt_binlog_row_based)
 
2790
      return(0);
 
2791
    /* Otherwise, we fall through */
 
2792
  case Session::DRIZZLE_QUERY_TYPE:
 
2793
    /*
 
2794
      Using this query type is a conveniece hack, since we have been
 
2795
      moving back and forth between using RBR for replication of
 
2796
      system tables and not using it.
 
2797
 
 
2798
      Make sure to change in check_table_binlog_row_based() according
 
2799
      to how you treat this.
 
2800
    */
 
2801
  case Session::STMT_QUERY_TYPE:
 
2802
    /*
 
2803
      The DRIZZLE_LOG::write() function will set the STMT_END_F flag and
 
2804
      flush the pending rows event if necessary.
 
2805
     */
 
2806
    {
 
2807
      Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use,
 
2808
                            killed_status_arg);
 
2809
      qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 
2810
      /*
 
2811
        Binlog table maps will be irrelevant after a Query_log_event
 
2812
        (they are just removed on the slave side) so after the query
 
2813
        log event is written to the binary log, we pretend that no
 
2814
        table maps were written.
 
2815
       */
 
2816
      int error= mysql_bin_log.write(&qinfo);
 
2817
      binlog_table_maps= 0;
 
2818
      return(error);
 
2819
    }
 
2820
    break;
 
2821
 
 
2822
  case Session::QUERY_TYPE_COUNT:
 
2823
  default:
 
2824
    assert(0 <= qtype && qtype < QUERY_TYPE_COUNT);
 
2825
  }
 
2826
  return(0);
 
2827
}
 
2828
 
 
2829
bool Discrete_intervals_list::append(uint64_t start, uint64_t val,
 
2830
                                 uint64_t incr)
 
2831
{
 
2832
  /* first, see if this can be merged with previous */
 
2833
  if ((head == NULL) || tail->merge_if_contiguous(start, val, incr))
 
2834
  {
 
2835
    /* it cannot, so need to add a new interval */
 
2836
    Discrete_interval *new_interval= new Discrete_interval(start, val, incr);
 
2837
    return(append(new_interval));
 
2838
  }
 
2839
  return(0);
 
2840
}
 
2841
 
 
2842
bool Discrete_intervals_list::append(Discrete_interval *new_interval)
 
2843
{
 
2844
  if (unlikely(new_interval == NULL))
 
2845
    return(1);
 
2846
  if (head == NULL)
 
2847
    head= current= new_interval;
 
2848
  else
 
2849
    tail->next= new_interval;
 
2850
  tail= new_interval;
 
2851
  elements++;
 
2852
  return(0);
 
2853
}