~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_class.cc

  • Committer: Monty Taylor
  • Date: 2008-09-16 00:00:48 UTC
  • mto: This revision was merged to the branch mainline in revision 391.
  • Revision ID: monty@inaugust.com-20080916000048-3rvrv3gv9l0ad3gs
Fixed copyright headers in drizzled/

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems, Inc.
5
 
 *
6
 
 *  This program is free software; you can redistribute it and/or modify
7
 
 *  it under the terms of the GNU General Public License as published by
8
 
 *  the Free Software Foundation; version 2 of the License.
9
 
 *
10
 
 *  This program is distributed in the hope that it will be useful,
11
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 *  GNU General Public License for more details.
14
 
 *
15
 
 *  You should have received a copy of the GNU General Public License
16
 
 *  along with this program; if not, write to the Free Software
17
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
 
 */
19
 
 
20
 
/**
21
 
 * @file Implementation of the Session class and API
22
 
 */
23
 
 
24
 
#include "config.h"
25
 
 
26
 
#include <drizzled/copy_field.h>
27
 
#include "drizzled/session.h"
28
 
#include "drizzled/session/cache.h"
29
 
#include "drizzled/error.h"
30
 
#include "drizzled/gettext.h"
31
 
#include "drizzled/query_id.h"
32
 
#include "drizzled/data_home.h"
33
 
#include "drizzled/sql_base.h"
34
 
#include "drizzled/lock.h"
35
 
#include "drizzled/item/cache.h"
36
 
#include "drizzled/item/float.h"
37
 
#include "drizzled/item/return_int.h"
38
 
#include "drizzled/item/empty_string.h"
39
 
#include "drizzled/show.h"
40
 
#include "drizzled/plugin/client.h"
41
 
#include "drizzled/plugin/scheduler.h"
42
 
#include "drizzled/plugin/authentication.h"
43
 
#include "drizzled/plugin/logging.h"
44
 
#include "drizzled/plugin/transactional_storage_engine.h"
45
 
#include "drizzled/plugin/query_rewrite.h"
46
 
#include "drizzled/probes.h"
47
 
#include "drizzled/table_proto.h"
48
 
#include "drizzled/db.h"
49
 
#include "drizzled/pthread_globals.h"
50
 
#include "drizzled/transaction_services.h"
51
 
#include "drizzled/drizzled.h"
52
 
 
53
 
#include "drizzled/identifier.h"
54
 
 
55
 
#include <drizzled/refresh_version.h>
56
 
 
57
 
#include "drizzled/table/singular.h"
58
 
 
59
 
#include "plugin/myisam/myisam.h"
60
 
#include "drizzled/internal/iocache.h"
61
 
#include "drizzled/internal/thread_var.h"
62
 
#include "drizzled/plugin/event_observer.h"
63
 
 
64
 
#include "drizzled/util/functors.h"
65
 
 
66
 
#include "drizzled/display.h"
67
 
 
68
 
#include <algorithm>
69
 
#include <climits>
70
 
#include <fcntl.h>
 
1
/* Copyright (C) 2000-2006 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/*****************************************************************************
 
18
**
 
19
** This file implements classes defined in sql_class.h
 
20
** Especially the classes to handle a result from a select
 
21
**
 
22
*****************************************************************************/
 
23
#include <drizzled/server_includes.h>
 
24
#include "rpl_rli.h"
 
25
#include "rpl_record.h"
 
26
#include "log_event.h"
71
27
#include <sys/stat.h>
72
 
 
73
 
#include <boost/filesystem.hpp>
74
 
#include <boost/checked_delete.hpp>
75
 
 
76
 
#include "drizzled/util/backtrace.h"
77
 
 
78
 
using namespace std;
79
 
 
80
 
namespace fs=boost::filesystem;
81
 
namespace drizzled
82
 
{
 
28
#include <mysys/thr_alarm.h>
 
29
#include <mysys/mysys_err.h>
 
30
#include <drizzled/drizzled_error_messages.h>
83
31
 
84
32
/*
85
33
  The following is used to initialise Table_ident with a internal
88
36
char internal_table_name[2]= "*";
89
37
char empty_c_string[1]= {0};    /* used for not defined db */
90
38
 
91
 
const char * const Session::DEFAULT_WHERE= "field list";
 
39
const char * const THD::DEFAULT_WHERE= "field list";
 
40
 
 
41
 
 
42
/*****************************************************************************
 
43
** Instansiate templates
 
44
*****************************************************************************/
 
45
 
 
46
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
47
/* Used templates */
 
48
template class List<Key>;
 
49
template class List_iterator<Key>;
 
50
template class List<Key_part_spec>;
 
51
template class List_iterator<Key_part_spec>;
 
52
template class List<Alter_drop>;
 
53
template class List_iterator<Alter_drop>;
 
54
template class List<Alter_column>;
 
55
template class List_iterator<Alter_column>;
 
56
#endif
 
57
 
 
58
/****************************************************************************
 
59
** User variables
 
60
****************************************************************************/
 
61
 
 
62
extern "C" uchar *get_var_key(user_var_entry *entry, size_t *length,
 
63
                              bool not_used __attribute__((unused)))
 
64
{
 
65
  *length= entry->name.length;
 
66
  return (uchar*) entry->name.str;
 
67
}
 
68
 
 
69
extern "C" void free_user_var(user_var_entry *entry)
 
70
{
 
71
  char *pos= (char*) entry+ALIGN_SIZE(sizeof(*entry));
 
72
  if (entry->value && entry->value != pos)
 
73
    my_free(entry->value, MYF(0));
 
74
  my_free((char*) entry,MYF(0));
 
75
}
92
76
 
93
77
bool Key_part_spec::operator==(const Key_part_spec& other) const
94
78
{
95
79
  return length == other.length &&
96
80
         field_name.length == other.field_name.length &&
97
 
    !my_strcasecmp(system_charset_info, field_name.str, other.field_name.str);
98
 
}
99
 
 
100
 
Open_tables_state::Open_tables_state(uint64_t version_arg) :
101
 
  version(version_arg)
102
 
{
103
 
  open_tables= temporary_tables= derived_tables= NULL;
104
 
  extra_lock= lock= NULL;
 
81
         !strcmp(field_name.str, other.field_name.str);
 
82
}
 
83
 
 
84
/**
 
85
  Construct an (almost) deep copy of this key. Only those
 
86
  elements that are known to never change are not copied.
 
87
  If out of memory, a partial copy is returned and an error is set
 
88
  in THD.
 
89
*/
 
90
 
 
91
Key::Key(const Key &rhs, MEM_ROOT *mem_root)
 
92
  :type(rhs.type),
 
93
  key_create_info(rhs.key_create_info),
 
94
  columns(rhs.columns, mem_root),
 
95
  name(rhs.name),
 
96
  generated(rhs.generated)
 
97
{
 
98
  list_copy_and_replace_each_value(columns, mem_root);
 
99
}
 
100
 
 
101
/**
 
102
  Construct an (almost) deep copy of this foreign key. Only those
 
103
  elements that are known to never change are not copied.
 
104
  If out of memory, a partial copy is returned and an error is set
 
105
  in THD.
 
106
*/
 
107
 
 
108
Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root)
 
109
  :Key(rhs),
 
110
  ref_table(rhs.ref_table),
 
111
  ref_columns(rhs.ref_columns),
 
112
  delete_opt(rhs.delete_opt),
 
113
  update_opt(rhs.update_opt),
 
114
  match_opt(rhs.match_opt)
 
115
{
 
116
  list_copy_and_replace_each_value(ref_columns, mem_root);
 
117
}
 
118
 
 
119
/*
 
120
  Test if a foreign key (= generated key) is a prefix of the given key
 
121
  (ignoring key name, key type and order of columns)
 
122
 
 
123
  NOTES:
 
124
    This is only used to test if an index for a FOREIGN KEY exists
 
125
 
 
126
  IMPLEMENTATION
 
127
    We only compare field names
 
128
 
 
129
  RETURN
 
130
    0   Generated key is a prefix of other key
 
131
    1   Not equal
 
132
*/
 
133
 
 
134
bool foreign_key_prefix(Key *a, Key *b)
 
135
{
 
136
  /* Ensure that 'a' is the generated key */
 
137
  if (a->generated)
 
138
  {
 
139
    if (b->generated && a->columns.elements > b->columns.elements)
 
140
      std::swap(a, b);                       // Put shorter key in 'a'
 
141
  }
 
142
  else
 
143
  {
 
144
    if (!b->generated)
 
145
      return true;                              // No foreign key
 
146
    std::swap(a, b);                       // Put generated key in 'a'
 
147
  }
 
148
 
 
149
  /* Test if 'a' is a prefix of 'b' */
 
150
  if (a->columns.elements > b->columns.elements)
 
151
    return true;                                // Can't be prefix
 
152
 
 
153
  List_iterator<Key_part_spec> col_it1(a->columns);
 
154
  List_iterator<Key_part_spec> col_it2(b->columns);
 
155
  const Key_part_spec *col1, *col2;
 
156
 
 
157
#ifdef ENABLE_WHEN_INNODB_CAN_HANDLE_SWAPED_FOREIGN_KEY_COLUMNS
 
158
  while ((col1= col_it1++))
 
159
  {
 
160
    bool found= 0;
 
161
    col_it2.rewind();
 
162
    while ((col2= col_it2++))
 
163
    {
 
164
      if (*col1 == *col2)
 
165
      {
 
166
        found= true;
 
167
        break;
 
168
      }
 
169
    }
 
170
    if (!found)
 
171
      return true;                              // Error
 
172
  }
 
173
  return false;                                 // Is prefix
 
174
#else
 
175
  while ((col1= col_it1++))
 
176
  {
 
177
    col2= col_it2++;
 
178
    if (!(*col1 == *col2))
 
179
      return true;
 
180
  }
 
181
  return false;                                 // Is prefix
 
182
#endif
 
183
}
 
184
 
 
185
 
 
186
/****************************************************************************
 
187
** Thread specific functions
 
188
****************************************************************************/
 
189
 
 
190
Open_tables_state::Open_tables_state(ulong version_arg)
 
191
  :version(version_arg), state_flags(0U)
 
192
{
 
193
  reset_open_tables_state();
105
194
}
106
195
 
107
196
/*
108
197
  The following functions form part of the C plugin API
109
198
*/
110
 
int tmpfile(const char *prefix)
 
199
 
 
200
extern "C" int mysql_tmpfile(const char *prefix)
111
201
{
112
202
  char filename[FN_REFLEN];
113
 
  int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
 
203
  File fd = create_temp_file(filename, mysql_tmpdir, prefix,
 
204
                             O_CREAT | O_EXCL | O_RDWR | O_TEMPORARY,
 
205
                             MYF(MY_WME));
114
206
  if (fd >= 0) {
 
207
    /*
 
208
      This can be removed once the following bug is fixed:
 
209
      Bug #28903  create_temp_file() doesn't honor O_TEMPORARY option
 
210
                  (file not removed) (Unix)
 
211
    */
115
212
    unlink(filename);
116
213
  }
117
214
 
118
215
  return fd;
119
216
}
120
217
 
121
 
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
122
 
{
123
 
  return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
124
 
}
125
 
 
126
 
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
127
 
                                             size_t index)
128
 
{
129
 
  return &ha_data[monitored->getId()].resource_context[index];
130
 
}
131
 
 
132
 
int64_t session_test_options(const Session *session, int64_t test_options)
133
 
{
134
 
  return session->options & test_options;
135
 
}
136
 
 
137
 
Session::Session(plugin::Client *client_arg, catalog::Instance::shared_ptr catalog_arg) :
138
 
  Open_tables_state(refresh_version),
139
 
  mem_root(&main_mem_root),
140
 
  xa_id(0),
141
 
  lex(&main_lex),
142
 
  query(new std::string),
143
 
  _schema(new std::string("")),
144
 
  client(client_arg),
145
 
  scheduler(NULL),
146
 
  scheduler_arg(NULL),
147
 
  lock_id(&main_lock_id),
148
 
  thread_stack(NULL),
149
 
  security_ctx(identifier::User::make_shared()),
150
 
  _where(Session::DEFAULT_WHERE),
151
 
  dbug_sentry(Session_SENTRY_MAGIC),
152
 
  mysys_var(0),
153
 
  command(COM_CONNECT),
154
 
  file_id(0),
155
 
  _epoch(boost::gregorian::date(1970,1,1)),
156
 
  _connect_time(boost::posix_time::microsec_clock::universal_time()),
157
 
  utime_after_lock(0),
158
 
  ha_data(plugin::num_trx_monitored_objects),
159
 
  query_id(0),
160
 
  warn_query_id(0),
161
 
  concurrent_execute_allowed(true),
162
 
  arg_of_last_insert_id_function(false),
163
 
  first_successful_insert_id_in_prev_stmt(0),
164
 
  first_successful_insert_id_in_cur_stmt(0),
165
 
  limit_found_rows(0),
166
 
  options(session_startup_options),
167
 
  row_count_func(-1),
168
 
  sent_row_count(0),
169
 
  examined_row_count(0),
170
 
  used_tables(0),
171
 
  total_warn_count(0),
172
 
  col_access(0),
173
 
  statement_id_counter(0),
174
 
  row_count(0),
175
 
  thread_id(0),
176
 
  tmp_table(0),
177
 
  _global_read_lock(NONE),
178
 
  count_cuted_fields(CHECK_FIELD_ERROR_FOR_NULL),
179
 
  _killed(NOT_KILLED),
180
 
  some_tables_deleted(false),
181
 
  no_errors(false),
182
 
  password(false),
183
 
  is_fatal_error(false),
184
 
  transaction_rollback_request(false),
185
 
  is_fatal_sub_stmt_error(0),
186
 
  tablespace_op(false),
187
 
  derived_tables_processing(false),
188
 
  m_lip(NULL),
189
 
  cached_table(0),
190
 
  transaction_message(NULL),
191
 
  statement_message(NULL),
192
 
  session_event_observers(NULL),
193
 
  _catalog(catalog_arg),
194
 
  use_usage(false)
195
 
{
196
 
  client->setSession(this);
 
218
 
 
219
extern "C"
 
220
int thd_in_lock_tables(const THD *thd)
 
221
{
 
222
  return test(thd->in_lock_tables);
 
223
}
 
224
 
 
225
 
 
226
extern "C"
 
227
int thd_tablespace_op(const THD *thd)
 
228
{
 
229
  return test(thd->tablespace_op);
 
230
}
 
231
 
 
232
 
 
233
extern "C"
 
234
const char *set_thd_proc_info(THD *thd, const char *info,
 
235
                              const char *calling_function __attribute__((unused)),
 
236
                              const char *calling_file __attribute__((unused)),
 
237
                              const unsigned int calling_line __attribute__((unused)))
 
238
{
 
239
  const char *old_info= thd->get_proc_info();
 
240
  thd->set_proc_info(info);
 
241
  return old_info;
 
242
}
 
243
 
 
244
extern "C"
 
245
void **thd_ha_data(const THD *thd, const struct handlerton *hton)
 
246
{
 
247
  return (void **) &thd->ha_data[hton->slot].ha_ptr;
 
248
}
 
249
 
 
250
extern "C"
 
251
int64_t thd_test_options(const THD *thd, int64_t test_options)
 
252
{
 
253
  return thd->options & test_options;
 
254
}
 
255
 
 
256
extern "C"
 
257
int thd_sql_command(const THD *thd)
 
258
{
 
259
  return (int) thd->lex->sql_command;
 
260
}
 
261
 
 
262
extern "C"
 
263
int thd_tx_isolation(const THD *thd)
 
264
{
 
265
  return (int) thd->variables.tx_isolation;
 
266
}
 
267
 
 
268
extern "C"
 
269
void thd_inc_row_count(THD *thd)
 
270
{
 
271
  thd->row_count++;
 
272
}
 
273
 
 
274
/**
 
275
  Clear this diagnostics area. 
 
276
 
 
277
  Normally called at the end of a statement.
 
278
*/
 
279
 
 
280
void
 
281
Diagnostics_area::reset_diagnostics_area()
 
282
{
 
283
  can_overwrite_status= false;
 
284
  /** Don't take chances in production */
 
285
  m_message[0]= '\0';
 
286
  m_sql_errno= 0;
 
287
  m_server_status= 0;
 
288
  m_affected_rows= 0;
 
289
  m_last_insert_id= 0;
 
290
  m_total_warn_count= 0;
 
291
  is_sent= false;
 
292
  /** Tiny reset in debug mode to see garbage right away */
 
293
  m_status= DA_EMPTY;
 
294
}
 
295
 
 
296
 
 
297
/**
 
298
  Set OK status -- ends commands that do not return a
 
299
  result set, e.g. INSERT/UPDATE/DELETE.
 
300
*/
 
301
 
 
302
void
 
303
Diagnostics_area::set_ok_status(THD *thd, ha_rows affected_rows_arg,
 
304
                                uint64_t last_insert_id_arg,
 
305
                                const char *message_arg)
 
306
{
 
307
  assert(! is_set());
 
308
  /*
 
309
    In production, refuse to overwrite an error or a custom response
 
310
    with an OK packet.
 
311
  */
 
312
  if (is_error() || is_disabled())
 
313
    return;
 
314
  /** Only allowed to report success if has not yet reported an error */
 
315
 
 
316
  m_server_status= thd->server_status;
 
317
  m_total_warn_count= thd->total_warn_count;
 
318
  m_affected_rows= affected_rows_arg;
 
319
  m_last_insert_id= last_insert_id_arg;
 
320
  if (message_arg)
 
321
    strmake(m_message, message_arg, sizeof(m_message) - 1);
 
322
  else
 
323
    m_message[0]= '\0';
 
324
  m_status= DA_OK;
 
325
}
 
326
 
 
327
 
 
328
/**
 
329
  Set EOF status.
 
330
*/
 
331
 
 
332
void
 
333
Diagnostics_area::set_eof_status(THD *thd)
 
334
{
 
335
  /** Only allowed to report eof if has not yet reported an error */
 
336
 
 
337
  assert(! is_set());
 
338
  /*
 
339
    In production, refuse to overwrite an error or a custom response
 
340
    with an EOF packet.
 
341
  */
 
342
  if (is_error() || is_disabled())
 
343
    return;
 
344
 
 
345
  m_server_status= thd->server_status;
 
346
  /*
 
347
    If inside a stored procedure, do not return the total
 
348
    number of warnings, since they are not available to the client
 
349
    anyway.
 
350
  */
 
351
  m_total_warn_count= thd->total_warn_count;
 
352
 
 
353
  m_status= DA_EOF;
 
354
}
 
355
 
 
356
/**
 
357
  Set ERROR status.
 
358
*/
 
359
 
 
360
void
 
361
Diagnostics_area::set_error_status(THD *thd __attribute__((unused)),
 
362
                                   uint sql_errno_arg,
 
363
                                   const char *message_arg)
 
364
{
 
365
  /*
 
366
    Only allowed to report error if has not yet reported a success
 
367
    The only exception is when we flush the message to the client,
 
368
    an error can happen during the flush.
 
369
  */
 
370
  assert(! is_set() || can_overwrite_status);
 
371
  /*
 
372
    In production, refuse to overwrite a custom response with an
 
373
    ERROR packet.
 
374
  */
 
375
  if (is_disabled())
 
376
    return;
 
377
 
 
378
  m_sql_errno= sql_errno_arg;
 
379
  strmake(m_message, message_arg, sizeof(m_message) - 1);
 
380
 
 
381
  m_status= DA_ERROR;
 
382
}
 
383
 
 
384
 
 
385
/**
 
386
  Mark the diagnostics area as 'DISABLED'.
 
387
 
 
388
  This is used in rare cases when the COM_ command at hand sends a response
 
389
  in a custom format. One example is the query cache, another is
 
390
  COM_STMT_PREPARE.
 
391
*/
 
392
 
 
393
void
 
394
Diagnostics_area::disable_status()
 
395
{
 
396
  assert(! is_set());
 
397
  m_status= DA_DISABLED;
 
398
}
 
399
 
 
400
 
 
401
THD::THD()
 
402
   :Statement(&main_lex, &main_mem_root, CONVENTIONAL_EXECUTION,
 
403
              /* statement id */ 0),
 
404
   Open_tables_state(refresh_version), rli_fake(0),
 
405
   lock_id(&main_lock_id),
 
406
   user_time(0), in_sub_stmt(0),
 
407
   binlog_table_maps(0), binlog_flags(0UL),
 
408
   arg_of_last_insert_id_function(false),
 
409
   first_successful_insert_id_in_prev_stmt(0),
 
410
   first_successful_insert_id_in_prev_stmt_for_binlog(0),
 
411
   first_successful_insert_id_in_cur_stmt(0),
 
412
   stmt_depends_on_first_successful_insert_id_in_prev_stmt(false),
 
413
   global_read_lock(0),
 
414
   is_fatal_error(0),
 
415
   transaction_rollback_request(0),
 
416
   is_fatal_sub_stmt_error(0),
 
417
   rand_used(0),
 
418
   time_zone_used(0),
 
419
   in_lock_tables(0),
 
420
   bootstrap(0),
 
421
   derived_tables_processing(false),
 
422
   m_lip(NULL)
 
423
{
 
424
  ulong tmp;
197
425
 
198
426
  /*
199
427
    Pass nominal parameters to init_alloc_root only to ensure that
200
428
    the destructor works OK in case of an error. The main_mem_root
201
429
    will be re-initialized in init_for_queries().
202
430
  */
203
 
  memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
 
431
  init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
 
432
  stmt_arena= this;
 
433
  thread_stack= 0;
 
434
  catalog= (char*)"std"; // the only catalog we have for now
 
435
  main_security_ctx.init();
 
436
  security_ctx= &main_security_ctx;
 
437
  some_tables_deleted=no_errors=password= 0;
 
438
  query_start_used= 0;
 
439
  count_cuted_fields= CHECK_FIELD_IGNORE;
 
440
  killed= NOT_KILLED;
 
441
  col_access=0;
 
442
  is_slave_error= thread_specific_used= false;
 
443
  hash_clear(&handler_tables_hash);
 
444
  tmp_table=0;
 
445
  used_tables=0;
204
446
  cuted_fields= sent_row_count= row_count= 0L;
205
 
  // Must be reset to handle error with Session's created for init of mysqld
 
447
  limit_found_rows= 0;
 
448
  row_count_func= -1;
 
449
  statement_id_counter= 0UL;
 
450
  // Must be reset to handle error with THD's created for init of mysqld
206
451
  lex->current_select= 0;
 
452
  start_time=(time_t) 0;
 
453
  start_utime= 0L;
 
454
  utime_after_lock= 0L;
 
455
  current_linfo =  0;
 
456
  slave_thread = 0;
207
457
  memset(&variables, 0, sizeof(variables));
208
 
  scoreboard_index= -1;
209
 
  cleanup_done= abort_on_warning= no_warnings_for_error= false;  
210
 
 
211
 
  /* query_cache init */
212
 
  query_cache_key= "";
213
 
  resultset= NULL;
 
458
  thread_id= 0;
 
459
  one_shot_set= 0;
 
460
  file_id = 0;
 
461
  query_id= 0;
 
462
  warn_id= 0;
 
463
  db_charset= global_system_variables.collation_database;
 
464
  memset(ha_data, 0, sizeof(ha_data));
 
465
  mysys_var=0;
 
466
  binlog_evt_union.do_union= false;
 
467
  enable_slow_log= 0;
 
468
  dbug_sentry=THD_SENTRY_MAGIC;
 
469
  net.vio=0;
 
470
  client_capabilities= 0;                       // minimalistic client
 
471
  system_thread= NON_SYSTEM_THREAD;
 
472
  cleanup_done= abort_on_warning= no_warnings_for_error= 0;
 
473
  peer_port= 0;                                 // For SHOW PROCESSLIST
 
474
  transaction.m_pending_rows_event= 0;
 
475
  transaction.on= 1;
 
476
  pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
214
477
 
215
478
  /* Variables with default values */
216
479
  proc_info="login";
217
 
 
218
 
  plugin_sessionvar_init(this);
 
480
  where= THD::DEFAULT_WHERE;
 
481
  server_id = ::server_id;
 
482
  slave_net = 0;
 
483
  command=COM_CONNECT;
 
484
  *scramble= '\0';
 
485
 
 
486
  init();
 
487
  /* Initialize sub structures */
 
488
  init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
 
489
  user_connect=(USER_CONN *)0;
 
490
  hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
 
491
            (hash_get_key) get_var_key,
 
492
            (hash_free_key) free_user_var, 0);
 
493
 
 
494
  /* For user vars replication*/
 
495
  if (opt_bin_log)
 
496
    my_init_dynamic_array(&user_var_events,
 
497
                          sizeof(BINLOG_USER_VAR_EVENT *), 16, 16);
 
498
  else
 
499
    memset(&user_var_events, 0, sizeof(user_var_events));
 
500
 
 
501
  /* Protocol */
 
502
  protocol= &protocol_text;                     // Default protocol
 
503
  protocol_text.init(this);
 
504
 
 
505
  tablespace_op= false;
 
506
  tmp= sql_rnd();
 
507
  randominit(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id);
 
508
  substitute_null_with_insert_id = false;
 
509
  thr_lock_info_init(&lock_info); /* safety: will be reset after start */
 
510
  thr_lock_owner_init(&main_lock_id, &lock_info);
 
511
 
 
512
  m_internal_handler= NULL;
 
513
}
 
514
 
 
515
 
 
516
void THD::push_internal_handler(Internal_error_handler *handler)
 
517
{
 
518
  /*
 
519
    TODO: The current implementation is limited to 1 handler at a time only.
 
520
    THD and sp_rcontext need to be modified to use a common handler stack.
 
521
  */
 
522
  assert(m_internal_handler == NULL);
 
523
  m_internal_handler= handler;
 
524
}
 
525
 
 
526
 
 
527
bool THD::handle_error(uint sql_errno, const char *message,
 
528
                       DRIZZLE_ERROR::enum_warning_level level)
 
529
{
 
530
  if (m_internal_handler)
 
531
  {
 
532
    return m_internal_handler->handle_error(sql_errno, message, level, this);
 
533
  }
 
534
 
 
535
  return false;                                 // 'false', as per coding style
 
536
}
 
537
 
 
538
 
 
539
void THD::pop_internal_handler()
 
540
{
 
541
  assert(m_internal_handler != NULL);
 
542
  m_internal_handler= NULL;
 
543
}
 
544
 
 
545
extern "C"
 
546
void *thd_alloc(DRIZZLE_THD thd, unsigned int size)
 
547
{
 
548
  return thd->alloc(size);
 
549
}
 
550
 
 
551
extern "C"
 
552
void *thd_calloc(DRIZZLE_THD thd, unsigned int size)
 
553
{
 
554
  return thd->calloc(size);
 
555
}
 
556
 
 
557
extern "C"
 
558
char *thd_strdup(DRIZZLE_THD thd, const char *str)
 
559
{
 
560
  return thd->strdup(str);
 
561
}
 
562
 
 
563
extern "C"
 
564
char *thd_strmake(DRIZZLE_THD thd, const char *str, unsigned int size)
 
565
{
 
566
  return thd->strmake(str, size);
 
567
}
 
568
 
 
569
extern "C"
 
570
LEX_STRING *thd_make_lex_string(THD *thd, LEX_STRING *lex_str,
 
571
                                const char *str, unsigned int size,
 
572
                                int allocate_lex_string)
 
573
{
 
574
  return thd->make_lex_string(lex_str, str, size,
 
575
                              (bool) allocate_lex_string);
 
576
}
 
577
 
 
578
extern "C"
 
579
void *thd_memdup(DRIZZLE_THD thd, const void* str, unsigned int size)
 
580
{
 
581
  return thd->memdup(str, size);
 
582
}
 
583
 
 
584
extern "C"
 
585
void thd_get_xid(const DRIZZLE_THD thd, DRIZZLE_XID *xid)
 
586
{
 
587
  *xid = *(DRIZZLE_XID *) &thd->transaction.xid_state.xid;
 
588
}
 
589
 
 
590
/*
 
591
  Init common variables that has to be reset on start and on change_user
 
592
*/
 
593
 
 
594
void THD::init(void)
 
595
{
 
596
  pthread_mutex_lock(&LOCK_global_system_variables);
 
597
  plugin_thdvar_init(this);
 
598
  variables.time_format= date_time_format_copy((THD*) 0,
 
599
                                               variables.time_format);
 
600
  variables.date_format= date_time_format_copy((THD*) 0,
 
601
                                               variables.date_format);
 
602
  variables.datetime_format= date_time_format_copy((THD*) 0,
 
603
                                                   variables.datetime_format);
219
604
  /*
220
605
    variables= global_system_variables above has reset
221
606
    variables.pseudo_thread_id to 0. We need to correct it here to
222
607
    avoid temporary tables replication failure.
223
608
  */
224
609
  variables.pseudo_thread_id= thread_id;
 
610
  pthread_mutex_unlock(&LOCK_global_system_variables);
225
611
  server_status= SERVER_STATUS_AUTOCOMMIT;
 
612
  options= thd_startup_options;
226
613
 
227
614
  if (variables.max_join_size == HA_POS_ERROR)
228
615
    options |= OPTION_BIG_SELECTS;
229
616
  else
230
617
    options &= ~OPTION_BIG_SELECTS;
231
618
 
 
619
  transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= false;
232
620
  open_options=ha_open_options;
233
 
  update_lock_default= TL_WRITE;
 
621
  update_lock_default= (variables.low_priority_updates ?
 
622
                        TL_WRITE_LOW_PRIORITY :
 
623
                        TL_WRITE);
234
624
  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
235
625
  warn_list.empty();
236
626
  memset(warn_count, 0, sizeof(warn_count));
 
627
  total_warn_count= 0;
 
628
  update_charset();
 
629
  reset_current_stmt_binlog_row_based();
237
630
  memset(&status_var, 0, sizeof(status_var));
238
 
 
239
 
  /* Initialize sub structures */
240
 
  memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
241
 
 
242
 
  substitute_null_with_insert_id = false;
243
 
  lock_info.init(); /* safety: will be reset after start */
244
 
  thr_lock_owner_init(&main_lock_id, &lock_info);
245
 
 
246
 
  m_internal_handler= NULL;
247
 
  
248
 
  plugin::EventObserver::registerSessionEvents(*this); 
249
 
}
250
 
 
251
 
void Session::free_items()
252
 
{
253
 
  Item *next;
254
 
  /* This works because items are allocated with memory::sql_alloc() */
255
 
  for (; free_list; free_list= next)
256
 
  {
257
 
    next= free_list->next;
258
 
    free_list->delete_self();
259
 
  }
260
 
}
261
 
 
262
 
void Session::push_internal_handler(Internal_error_handler *handler)
263
 
{
264
 
  /*
265
 
    TODO: The current implementation is limited to 1 handler at a time only.
266
 
    Session and sp_rcontext need to be modified to use a common handler stack.
267
 
  */
268
 
  assert(m_internal_handler == NULL);
269
 
  m_internal_handler= handler;
270
 
}
271
 
 
272
 
bool Session::handle_error(drizzled::error_t sql_errno, const char *message,
273
 
                           DRIZZLE_ERROR::enum_warning_level level)
274
 
{
275
 
  if (m_internal_handler)
276
 
  {
277
 
    return m_internal_handler->handle_error(sql_errno, message, level, this);
278
 
  }
279
 
 
280
 
  return false;                                 // 'false', as per coding style
281
 
}
282
 
 
283
 
void Session::setAbort(bool arg)
284
 
{
285
 
  mysys_var->abort= arg;
286
 
}
287
 
 
288
 
void Session::lockOnSys()
289
 
{
290
 
  if (not mysys_var)
291
 
    return;
292
 
 
293
 
  setAbort(true);
294
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
295
 
  if (mysys_var->current_cond)
296
 
  {
297
 
    mysys_var->current_mutex->lock();
298
 
    mysys_var->current_cond->notify_all();
299
 
    mysys_var->current_mutex->unlock();
300
 
  }
301
 
}
302
 
 
303
 
void Session::pop_internal_handler()
304
 
{
305
 
  assert(m_internal_handler != NULL);
306
 
  m_internal_handler= NULL;
307
 
}
308
 
 
309
 
void Session::get_xid(DRIZZLE_XID *xid)
310
 
{
311
 
  *xid = *(DRIZZLE_XID *) &transaction.xid_state.xid;
312
 
}
 
631
}
 
632
 
 
633
 
 
634
/*
 
635
  Init THD for query processing.
 
636
  This has to be called once before we call mysql_parse.
 
637
  See also comments in sql_class.h.
 
638
*/
 
639
 
 
640
void THD::init_for_queries()
 
641
{
 
642
  set_time(); 
 
643
  ha_enable_transaction(this,true);
 
644
 
 
645
  reset_root_defaults(mem_root, variables.query_alloc_block_size,
 
646
                      variables.query_prealloc_size);
 
647
  reset_root_defaults(&transaction.mem_root,
 
648
                      variables.trans_alloc_block_size,
 
649
                      variables.trans_prealloc_size);
 
650
  transaction.xid_state.xid.null();
 
651
  transaction.xid_state.in_thd=1;
 
652
}
 
653
 
313
654
 
314
655
/* Do operations that may take a long time */
315
656
 
316
 
void Session::cleanup(void)
 
657
void THD::cleanup(void)
317
658
{
318
 
  assert(cleanup_done == false);
 
659
  assert(cleanup_done == 0);
319
660
 
320
 
  setKilled(KILL_CONNECTION);
 
661
  killed= KILL_CONNECTION;
321
662
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
322
663
  if (transaction.xid_state.xa_state == XA_PREPARED)
323
664
  {
325
666
  }
326
667
#endif
327
668
  {
328
 
    TransactionServices &transaction_services= TransactionServices::singleton();
329
 
    transaction_services.rollbackTransaction(*this, true);
 
669
    ha_rollback(this);
330
670
    xid_cache_delete(&transaction.xid_state);
331
671
  }
332
 
 
333
 
  for (UserVars::iterator iter= user_vars.begin();
334
 
       iter != user_vars.end();
335
 
       iter++)
 
672
  if (locked_tables)
336
673
  {
337
 
    user_var_entry *entry= (*iter).second;
338
 
    boost::checked_delete(entry);
 
674
    lock=locked_tables; locked_tables=0;
 
675
    close_thread_tables(this);
339
676
  }
340
 
  user_vars.clear();
341
 
 
342
 
 
343
 
  close_temporary_tables();
344
 
 
 
677
  mysql_ha_cleanup(this);
 
678
  delete_dynamic(&user_var_events);
 
679
  hash_free(&user_vars);
 
680
  close_temporary_tables(this);
 
681
  my_free((char*) variables.time_format, MYF(MY_ALLOW_ZERO_PTR));
 
682
  my_free((char*) variables.date_format, MYF(MY_ALLOW_ZERO_PTR));
 
683
  my_free((char*) variables.datetime_format, MYF(MY_ALLOW_ZERO_PTR));
 
684
  
345
685
  if (global_read_lock)
346
 
  {
347
 
    unlockGlobalReadLock();
348
 
  }
 
686
    unlock_global_read_lock(this);
349
687
 
350
 
  cleanup_done= true;
 
688
  cleanup_done=1;
 
689
  return;
351
690
}
352
691
 
353
 
Session::~Session()
 
692
THD::~THD()
354
693
{
355
 
  this->checkSentry();
356
 
 
357
 
  if (client and client->isConnected())
358
 
  {
359
 
    assert(security_ctx);
360
 
    if (global_system_variables.log_warnings)
361
 
    {
362
 
      errmsg_printf(error::WARN, ER(ER_FORCING_CLOSE),
363
 
                    internal::my_progname,
364
 
                    thread_id,
365
 
                    security_ctx->username().c_str());
366
 
    }
367
 
 
368
 
    disconnect();
369
 
  }
 
694
  THD_CHECK_SENTRY(this);
 
695
  /* Ensure that no one is using THD */
 
696
  pthread_mutex_lock(&LOCK_delete);
 
697
  pthread_mutex_unlock(&LOCK_delete);
 
698
  add_to_status(&global_status_var, &status_var);
370
699
 
371
700
  /* Close connection */
372
 
  if (client)
 
701
  if (net.vio)
373
702
  {
374
 
    client->close();
375
 
    boost::checked_delete(client);
376
 
    client= NULL;
 
703
    net_close(&net);
 
704
    net_end(&net);
377
705
  }
378
 
 
379
 
  if (cleanup_done == false)
 
706
  if (!cleanup_done)
380
707
    cleanup();
381
708
 
382
 
  plugin::StorageEngine::closeConnection(this);
383
 
  plugin_sessionvar_cleanup(this);
 
709
  ha_close_connection(this);
 
710
  plugin_thdvar_cleanup(this);
384
711
 
385
 
  warn_root.free_root(MYF(0));
 
712
  main_security_ctx.destroy();
 
713
  safeFree(db);
 
714
  free_root(&warn_root,MYF(0));
 
715
  free_root(&transaction.mem_root,MYF(0));
386
716
  mysys_var=0;                                  // Safety (shouldn't be needed)
387
 
  dbug_sentry= Session_SENTRY_GONE;
388
 
 
389
 
  main_mem_root.free_root(MYF(0));
390
 
  currentMemRoot().release();
391
 
  currentSession().release();
392
 
 
393
 
  plugin::Logging::postEndDo(this);
394
 
  plugin::EventObserver::deregisterSessionEvents(*this); 
395
 
}
396
 
 
397
 
void Session::setClient(plugin::Client *client_arg)
398
 
{
399
 
  client= client_arg;
400
 
  client->setSession(this);
401
 
}
402
 
 
403
 
void Session::awake(Session::killed_state_t state_to_set)
404
 
{
405
 
  if ((state_to_set == Session::KILL_QUERY) and (command == COM_SLEEP))
406
 
    return;
407
 
 
408
 
  this->checkSentry();
409
 
 
410
 
  setKilled(state_to_set);
411
 
  scheduler->killSession(this);
412
 
 
413
 
  if (state_to_set != Session::KILL_QUERY)
414
 
  {
415
 
    DRIZZLE_CONNECTION_DONE(thread_id);
416
 
  }
417
 
 
 
717
  pthread_mutex_destroy(&LOCK_delete);
 
718
  dbug_sentry= THD_SENTRY_GONE;
 
719
  if (rli_fake)
 
720
  {
 
721
    delete rli_fake;
 
722
    rli_fake= NULL;
 
723
  }
 
724
  
 
725
  free_root(&main_mem_root, MYF(0));
 
726
  return;
 
727
}
 
728
 
 
729
 
 
730
/*
 
731
  Add all status variables to another status variable array
 
732
 
 
733
  SYNOPSIS
 
734
   add_to_status()
 
735
   to_var       add to this array
 
736
   from_var     from this array
 
737
 
 
738
  NOTES
 
739
    This function assumes that all variables are long/ulong.
 
740
    If this assumption will change, then we have to explictely add
 
741
    the other variables after the while loop
 
742
*/
 
743
 
 
744
void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var)
 
745
{
 
746
  ulong *end= (ulong*) ((uchar*) to_var +
 
747
                        offsetof(STATUS_VAR, last_system_status_var) +
 
748
                        sizeof(ulong));
 
749
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var;
 
750
 
 
751
  while (to != end)
 
752
    *(to++)+= *(from++);
 
753
}
 
754
 
 
755
/*
 
756
  Add the difference between two status variable arrays to another one.
 
757
 
 
758
  SYNOPSIS
 
759
    add_diff_to_status
 
760
    to_var       add to this array
 
761
    from_var     from this array
 
762
    dec_var      minus this array
 
763
  
 
764
  NOTE
 
765
    This function assumes that all variables are long/ulong.
 
766
*/
 
767
 
 
768
void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var,
 
769
                        STATUS_VAR *dec_var)
 
770
{
 
771
  ulong *end= (ulong*) ((uchar*) to_var + offsetof(STATUS_VAR,
 
772
                                                  last_system_status_var) +
 
773
                        sizeof(ulong));
 
774
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var, *dec= (ulong*) dec_var;
 
775
 
 
776
  while (to != end)
 
777
    *(to++)+= *(from++) - *(dec++);
 
778
}
 
779
 
 
780
 
 
781
void THD::awake(THD::killed_state state_to_set)
 
782
{
 
783
  THD_CHECK_SENTRY(this);
 
784
  safe_mutex_assert_owner(&LOCK_delete); 
 
785
 
 
786
  killed= state_to_set;
 
787
  if (state_to_set != THD::KILL_QUERY)
 
788
  {
 
789
    thr_alarm_kill(thread_id);
 
790
    if (!slave_thread)
 
791
      thread_scheduler.post_kill_notification(this);
 
792
  }
418
793
  if (mysys_var)
419
794
  {
420
 
    boost_unique_lock_t scopedLock(mysys_var->mutex);
 
795
    pthread_mutex_lock(&mysys_var->mutex);
 
796
    if (!system_thread)         // Don't abort locks
 
797
      mysys_var->abort=1;
421
798
    /*
422
 
      "
423
799
      This broadcast could be up in the air if the victim thread
424
800
      exits the cond in the time between read and broadcast, but that is
425
801
      ok since all we want to do is to make the victim thread get out
435
811
      current_cond and current_mutex are 0), then the victim will not get
436
812
      a signal and it may wait "forever" on the cond (until
437
813
      we issue a second KILL or the status it's waiting for happens).
438
 
      It's true that we have set its session->killed but it may not
 
814
      It's true that we have set its thd->killed but it may not
439
815
      see it immediately and so may have time to reach the cond_wait().
440
816
    */
441
817
    if (mysys_var->current_cond && mysys_var->current_mutex)
442
818
    {
443
 
      mysys_var->current_mutex->lock();
444
 
      mysys_var->current_cond->notify_all();
445
 
      mysys_var->current_mutex->unlock();
 
819
      pthread_mutex_lock(mysys_var->current_mutex);
 
820
      pthread_cond_broadcast(mysys_var->current_cond);
 
821
      pthread_mutex_unlock(mysys_var->current_mutex);
446
822
    }
 
823
    pthread_mutex_unlock(&mysys_var->mutex);
447
824
  }
 
825
  return;
448
826
}
449
827
 
450
828
/*
451
829
  Remember the location of thread info, the structure needed for
452
 
  memory::sql_alloc() and the structure for the net buffer
 
830
  sql_alloc() and the structure for the net buffer
453
831
*/
454
 
bool Session::storeGlobals()
 
832
 
 
833
bool THD::store_globals()
455
834
{
456
835
  /*
457
836
    Assert that thread_stack is initialized: it's necessary to be able
459
838
  */
460
839
  assert(thread_stack);
461
840
 
462
 
  currentSession().release();
463
 
  currentSession().reset(this);
464
 
 
465
 
  currentMemRoot().release();
466
 
  currentMemRoot().reset(&mem_root);
467
 
 
 
841
  if (my_pthread_setspecific_ptr(THR_THD,  this) ||
 
842
      my_pthread_setspecific_ptr(THR_MALLOC, &mem_root))
 
843
    return 1;
468
844
  mysys_var=my_thread_var;
469
 
 
470
845
  /*
471
846
    Let mysqld define the thread id (not mysys)
472
 
    This allows us to move Session to different threads if needed.
 
847
    This allows us to move THD to different threads if needed.
473
848
  */
474
849
  mysys_var->id= thread_id;
 
850
  real_id= pthread_self();                      // For debugging
475
851
 
476
852
  /*
477
 
    We have to call thr_lock_info_init() again here as Session may have been
 
853
    We have to call thr_lock_info_init() again here as THD may have been
478
854
    created in another thread
479
855
  */
480
 
  lock_info.init();
481
 
 
482
 
  return false;
483
 
}
484
 
 
485
 
/*
486
 
  Init Session for query processing.
487
 
  This has to be called once before we call mysql_parse.
488
 
  See also comments in session.h.
489
 
*/
490
 
 
491
 
void Session::prepareForQueries()
492
 
{
493
 
  if (variables.max_join_size == HA_POS_ERROR)
494
 
    options |= OPTION_BIG_SELECTS;
495
 
 
496
 
  version= refresh_version;
497
 
  set_proc_info(NULL);
498
 
  command= COM_SLEEP;
499
 
  set_time();
500
 
 
501
 
  mem_root->reset_root_defaults(variables.query_alloc_block_size,
502
 
                                variables.query_prealloc_size);
503
 
  transaction.xid_state.xid.null();
504
 
  transaction.xid_state.in_session=1;
505
 
  if (use_usage)
506
 
    resetUsage();
507
 
}
508
 
 
509
 
bool Session::initGlobals()
510
 
{
511
 
  if (storeGlobals())
512
 
  {
513
 
    disconnect(ER_OUT_OF_RESOURCES);
514
 
    status_var.aborted_connects++;
515
 
    return true;
516
 
  }
517
 
  return false;
518
 
}
519
 
 
520
 
void Session::run()
521
 
{
522
 
  if (initGlobals() || authenticate())
523
 
  {
524
 
    disconnect();
525
 
    return;
526
 
  }
527
 
 
528
 
  prepareForQueries();
529
 
 
530
 
  while (not client->haveError() && getKilled() != KILL_CONNECTION)
531
 
  {
532
 
    if (not executeStatement())
533
 
      break;
534
 
  }
535
 
 
536
 
  disconnect();
537
 
}
538
 
 
539
 
bool Session::schedule(Session::shared_ptr &arg)
540
 
{
541
 
  arg->scheduler= plugin::Scheduler::getScheduler();
542
 
  assert(arg->scheduler);
543
 
 
544
 
  ++connection_count;
545
 
 
546
 
  long current_connections= connection_count;
547
 
 
548
 
  if (current_connections > 0 and static_cast<uint64_t>(current_connections) > current_global_counters.max_used_connections)
549
 
  {
550
 
    current_global_counters.max_used_connections= static_cast<uint64_t>(connection_count);
551
 
  }
552
 
 
553
 
  current_global_counters.connections++;
554
 
  arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
555
 
 
556
 
  session::Cache::singleton().insert(arg);
557
 
 
558
 
  if (unlikely(plugin::EventObserver::connectSession(*arg)))
559
 
  {
560
 
    // We should do something about an error...
561
 
  }
562
 
 
563
 
  if (plugin::Scheduler::getScheduler()->addSession(arg))
564
 
  {
565
 
    DRIZZLE_CONNECTION_START(arg->getSessionId());
566
 
    char error_message_buff[DRIZZLE_ERRMSG_SIZE];
567
 
 
568
 
    arg->setKilled(Session::KILL_CONNECTION);
569
 
 
570
 
    arg->status_var.aborted_connects++;
571
 
 
572
 
    /* Can't use my_error() since store_globals has not been called. */
573
 
    /* TODO replace will better error message */
574
 
    snprintf(error_message_buff, sizeof(error_message_buff),
575
 
             ER(ER_CANT_CREATE_THREAD), 1);
576
 
    arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
577
 
 
578
 
    return true;
579
 
  }
580
 
 
581
 
  return false;
582
 
}
583
 
 
584
 
 
585
 
/*
586
 
  Is this session viewable by the current user?
587
 
*/
588
 
bool Session::isViewable(identifier::User::const_reference user_arg) const
589
 
{
590
 
  return plugin::Authorization::isAuthorized(user_arg, this, false);
591
 
}
592
 
 
593
 
 
594
 
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
595
 
{
596
 
  const char* old_msg = get_proc_info();
597
 
  safe_mutex_assert_owner(mutex);
598
 
  mysys_var->current_mutex = &mutex;
599
 
  mysys_var->current_cond = &cond;
600
 
  this->set_proc_info(msg);
601
 
  return old_msg;
602
 
}
603
 
 
604
 
void Session::exit_cond(const char* old_msg)
605
 
{
606
 
  /*
607
 
    Putting the mutex unlock in exit_cond() ensures that
608
 
    mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
609
 
    locked (if that would not be the case, you'll get a deadlock if someone
610
 
    does a Session::awake() on you).
611
 
  */
612
 
  mysys_var->current_mutex->unlock();
613
 
  boost_unique_lock_t scopedLock(mysys_var->mutex);
614
 
  mysys_var->current_mutex = 0;
615
 
  mysys_var->current_cond = 0;
616
 
  this->set_proc_info(old_msg);
617
 
}
618
 
 
619
 
bool Session::authenticate()
620
 
{
621
 
  if (client->authenticate())
622
 
    return false;
623
 
 
624
 
  status_var.aborted_connects++;
625
 
 
626
 
  return true;
627
 
}
628
 
 
629
 
bool Session::checkUser(const std::string &passwd_str,
630
 
                        const std::string &in_db)
631
 
{
632
 
  bool is_authenticated=
633
 
    plugin::Authentication::isAuthenticated(user(), passwd_str);
634
 
 
635
 
  if (is_authenticated != true)
636
 
  {
637
 
    status_var.access_denied++;
638
 
    /* isAuthenticated has pushed the error message */
639
 
    return false;
640
 
  }
641
 
 
642
 
  /* Change database if necessary */
643
 
  if (not in_db.empty())
644
 
  {
645
 
    identifier::Schema identifier(in_db);
646
 
    if (change_db(this, identifier))
647
 
    {
648
 
      /* change_db() has pushed the error message. */
649
 
      return false;
650
 
    }
651
 
  }
652
 
  my_ok();
653
 
  password= not passwd_str.empty();
654
 
 
655
 
  /* Ready to handle queries */
656
 
  return true;
657
 
}
658
 
 
659
 
bool Session::executeStatement()
660
 
{
661
 
  char *l_packet= 0;
662
 
  uint32_t packet_length;
663
 
 
664
 
  enum enum_server_command l_command;
665
 
 
666
 
  /*
667
 
    indicator of uninitialized lex => normal flow of errors handling
668
 
    (see my_message_sql)
669
 
  */
670
 
  lex->current_select= 0;
671
 
  clear_error();
672
 
  main_da.reset_diagnostics_area();
673
 
 
674
 
  if (client->readCommand(&l_packet, &packet_length) == false)
675
 
  {
676
 
    return false;
677
 
  }
678
 
 
679
 
  if (getKilled() == KILL_CONNECTION)
680
 
    return false;
681
 
 
682
 
  if (packet_length == 0)
683
 
    return true;
684
 
 
685
 
  l_command= static_cast<enum_server_command>(l_packet[0]);
686
 
 
687
 
  if (command >= COM_END)
688
 
    command= COM_END;                           // Wrong command
689
 
 
690
 
  assert(packet_length);
691
 
  return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
692
 
}
693
 
 
694
 
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
695
 
{
696
 
  /* Remove garbage at start and end of query */
697
 
  while (in_packet_length > 0 && my_isspace(charset(), in_packet[0]))
698
 
  {
699
 
    in_packet++;
700
 
    in_packet_length--;
701
 
  }
702
 
  const char *pos= in_packet + in_packet_length; /* Point at end null */
703
 
  while (in_packet_length > 0 && (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
704
 
  {
705
 
    pos--;
706
 
    in_packet_length--;
707
 
  }
708
 
 
709
 
  std::string *new_query= new std::string(in_packet, in_packet + in_packet_length);
710
 
  // We can not be entirely sure _schema has a value
711
 
  if (_schema)
712
 
  {
713
 
    plugin::QueryRewriter::rewriteQuery(*_schema, *new_query);
714
 
  }
715
 
  query.reset(new_query);
716
 
  _state.reset(new session::State(in_packet, in_packet_length));
717
 
 
718
 
  return true;
719
 
}
720
 
 
721
 
bool Session::endTransaction(enum enum_mysql_completiontype completion)
722
 
{
723
 
  bool do_release= 0;
724
 
  bool result= true;
725
 
  TransactionServices &transaction_services= TransactionServices::singleton();
726
 
 
727
 
  if (transaction.xid_state.xa_state != XA_NOTR)
728
 
  {
729
 
    my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
730
 
    return false;
731
 
  }
732
 
  switch (completion)
733
 
  {
734
 
    case COMMIT:
735
 
      /*
736
 
       * We don't use endActiveTransaction() here to ensure that this works
737
 
       * even if there is a problem with the OPTION_AUTO_COMMIT flag
738
 
       * (Which of course should never happen...)
739
 
       */
740
 
      server_status&= ~SERVER_STATUS_IN_TRANS;
741
 
      if (transaction_services.commitTransaction(*this, true))
742
 
        result= false;
743
 
      options&= ~(OPTION_BEGIN);
744
 
      break;
745
 
    case COMMIT_RELEASE:
746
 
      do_release= 1; /* fall through */
747
 
    case COMMIT_AND_CHAIN:
748
 
      result= endActiveTransaction();
749
 
      if (result == true && completion == COMMIT_AND_CHAIN)
750
 
        result= startTransaction();
751
 
      break;
752
 
    case ROLLBACK_RELEASE:
753
 
      do_release= 1; /* fall through */
754
 
    case ROLLBACK:
755
 
    case ROLLBACK_AND_CHAIN:
756
 
    {
757
 
      server_status&= ~SERVER_STATUS_IN_TRANS;
758
 
      if (transaction_services.rollbackTransaction(*this, true))
759
 
        result= false;
760
 
      options&= ~(OPTION_BEGIN);
761
 
      if (result == true && (completion == ROLLBACK_AND_CHAIN))
762
 
        result= startTransaction();
763
 
      break;
764
 
    }
765
 
    default:
766
 
      my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
767
 
      return false;
768
 
  }
769
 
 
770
 
  if (result == false)
771
 
  {
772
 
    my_error(static_cast<drizzled::error_t>(killed_errno()), MYF(0));
773
 
  }
774
 
  else if ((result == true) && do_release)
775
 
  {
776
 
    setKilled(Session::KILL_CONNECTION);
777
 
  }
778
 
 
779
 
  return result;
780
 
}
781
 
 
782
 
bool Session::endActiveTransaction()
783
 
{
784
 
  bool result= true;
785
 
  TransactionServices &transaction_services= TransactionServices::singleton();
786
 
 
787
 
  if (transaction.xid_state.xa_state != XA_NOTR)
788
 
  {
789
 
    my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
790
 
    return false;
791
 
  }
792
 
  if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
793
 
  {
794
 
    server_status&= ~SERVER_STATUS_IN_TRANS;
795
 
    if (transaction_services.commitTransaction(*this, true))
796
 
      result= false;
797
 
  }
798
 
  options&= ~(OPTION_BEGIN);
799
 
  return result;
800
 
}
801
 
 
802
 
bool Session::startTransaction(start_transaction_option_t opt)
803
 
{
804
 
  bool result= true;
805
 
 
806
 
  assert(! inTransaction());
807
 
 
808
 
  options|= OPTION_BEGIN;
809
 
  server_status|= SERVER_STATUS_IN_TRANS;
810
 
 
811
 
  if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
812
 
  {
813
 
    result= false;
814
 
  }
815
 
 
816
 
  return result;
817
 
}
818
 
 
819
 
void Session::cleanup_after_query()
820
 
{
821
 
  /*
822
 
    Reset rand_used so that detection of calls to rand() will save random
 
856
  thr_lock_info_init(&lock_info);
 
857
  return 0;
 
858
}
 
859
 
 
860
 
 
861
/*
 
862
  Cleanup after query.
 
863
 
 
864
  SYNOPSIS
 
865
    THD::cleanup_after_query()
 
866
 
 
867
  DESCRIPTION
 
868
    This function is used to reset thread data to its default state.
 
869
 
 
870
  NOTE
 
871
    This function is not suitable for setting thread data to some
 
872
    non-default values, as there is only one replication thread, so
 
873
    different master threads may overwrite data of each other on
 
874
    slave.
 
875
*/
 
876
 
 
877
void THD::cleanup_after_query()
 
878
{
 
879
  /*
 
880
    Reset rand_used so that detection of calls to rand() will save random 
823
881
    seeds if needed by the slave.
 
882
 
 
883
    Do not reset rand_used if inside a stored function or trigger because 
 
884
    only the call to these operations is logged. Thus only the calling 
 
885
    statement needs to detect rand() calls made by its substatements. These
 
886
    substatements must not set rand_used to 0 because it would remove the
 
887
    detection of rand() by the calling statement. 
824
888
  */
 
889
  if (!in_sub_stmt) /* stored functions and triggers are a special case */
825
890
  {
826
891
    /* Forget those values, for next binlogger: */
 
892
    stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
827
893
    auto_inc_intervals_in_cur_stmt_for_binlog.empty();
 
894
    rand_used= 0;
828
895
  }
829
896
  if (first_successful_insert_id_in_cur_stmt > 0)
830
897
  {
831
898
    /* set what LAST_INSERT_ID() will return */
832
 
    first_successful_insert_id_in_prev_stmt= first_successful_insert_id_in_cur_stmt;
 
899
    first_successful_insert_id_in_prev_stmt= 
 
900
      first_successful_insert_id_in_cur_stmt;
833
901
    first_successful_insert_id_in_cur_stmt= 0;
834
902
    substitute_null_with_insert_id= true;
835
903
  }
836
 
 
837
 
  arg_of_last_insert_id_function= false;
838
 
 
 
904
  arg_of_last_insert_id_function= 0;
839
905
  /* Free Items that were created during this execution */
840
906
  free_items();
841
 
 
842
 
  /* Reset _where. */
843
 
  _where= Session::DEFAULT_WHERE;
844
 
 
845
 
  /* Reset the temporary shares we built */
846
 
  for_each(temporary_shares.begin(),
847
 
           temporary_shares.end(),
848
 
           DeletePtr());
849
 
  temporary_shares.clear();
 
907
  /* Reset where. */
 
908
  where= THD::DEFAULT_WHERE;
850
909
}
851
910
 
 
911
 
852
912
/**
853
913
  Create a LEX_STRING in this connection.
854
914
 
859
919
                              instead of using lex_str value
860
920
  @return  NULL on failure, or pointer to the LEX_STRING object
861
921
*/
862
 
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
863
 
                                     const std::string &str,
864
 
                                     bool allocate_lex_string)
865
 
{
866
 
  return make_lex_string(lex_str, str.c_str(), str.length(), allocate_lex_string);
867
 
}
868
 
 
869
 
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
870
 
                                     const char* str, uint32_t length,
871
 
                                     bool allocate_lex_string)
 
922
LEX_STRING *THD::make_lex_string(LEX_STRING *lex_str,
 
923
                                 const char* str, uint length,
 
924
                                 bool allocate_lex_string)
872
925
{
873
926
  if (allocate_lex_string)
874
 
    if (!(lex_str= (LEX_STRING *)getMemRoot()->allocate(sizeof(LEX_STRING))))
 
927
    if (!(lex_str= (LEX_STRING *)alloc(sizeof(LEX_STRING))))
875
928
      return 0;
876
 
  if (!(lex_str->str= mem_root->strmake_root(str, length)))
 
929
  if (!(lex_str->str= strmake_root(mem_root, str, length)))
877
930
    return 0;
878
931
  lex_str->length= length;
879
932
  return lex_str;
880
933
}
881
934
 
882
 
int Session::send_explain_fields(select_result *result)
 
935
 
 
936
/*
 
937
  Convert a string to another character set
 
938
 
 
939
  SYNOPSIS
 
940
    convert_string()
 
941
    to                          Store new allocated string here
 
942
    to_cs                       New character set for allocated string
 
943
    from                        String to convert
 
944
    from_length                 Length of string to convert
 
945
    from_cs                     Original character set
 
946
 
 
947
  NOTES
 
948
    to will be 0-terminated to make it easy to pass to system funcs
 
949
 
 
950
  RETURN
 
951
    0   ok
 
952
    1   End of memory.
 
953
        In this case to->str will point to 0 and to->length will be 0.
 
954
*/
 
955
 
 
956
bool THD::convert_string(LEX_STRING *to, const CHARSET_INFO * const to_cs,
 
957
                         const char *from, uint from_length,
 
958
                         const CHARSET_INFO * const from_cs)
 
959
{
 
960
  size_t new_length= to_cs->mbmaxlen * from_length;
 
961
  uint dummy_errors;
 
962
  if (!(to->str= (char*) alloc(new_length+1)))
 
963
  {
 
964
    to->length= 0;                              // Safety fix
 
965
    return(1);                          // EOM
 
966
  }
 
967
  to->length= copy_and_convert((char*) to->str, new_length, to_cs,
 
968
                               from, from_length, from_cs, &dummy_errors);
 
969
  to->str[to->length]=0;                        // Safety
 
970
  return(0);
 
971
}
 
972
 
 
973
 
 
974
/*
 
975
  Convert string from source character set to target character set inplace.
 
976
 
 
977
  SYNOPSIS
 
978
    THD::convert_string
 
979
 
 
980
  DESCRIPTION
 
981
    Convert string using convert_buffer - buffer for character set 
 
982
    conversion shared between all protocols.
 
983
 
 
984
  RETURN
 
985
    0   ok
 
986
   !0   out of memory
 
987
*/
 
988
 
 
989
bool THD::convert_string(String *s, const CHARSET_INFO * const from_cs,
 
990
                         const CHARSET_INFO * const to_cs)
 
991
{
 
992
  uint dummy_errors;
 
993
  if (convert_buffer.copy(s->ptr(), s->length(), from_cs, to_cs, &dummy_errors))
 
994
    return true;
 
995
  /* If convert_buffer >> s copying is more efficient long term */
 
996
  if (convert_buffer.alloced_length() >= convert_buffer.length() * 2 ||
 
997
      !s->is_alloced())
 
998
  {
 
999
    return s->copy(convert_buffer);
 
1000
  }
 
1001
  s->swap(convert_buffer);
 
1002
  return false;
 
1003
}
 
1004
 
 
1005
 
 
1006
/*
 
1007
  Update some cache variables when character set changes
 
1008
*/
 
1009
 
 
1010
void THD::update_charset()
 
1011
{
 
1012
  uint32_t not_used;
 
1013
  charset_is_system_charset= !String::needs_conversion(0,charset(),
 
1014
                                                       system_charset_info,
 
1015
                                                       &not_used);
 
1016
  charset_is_collation_connection= 
 
1017
    !String::needs_conversion(0,charset(),variables.collation_connection,
 
1018
                              &not_used);
 
1019
  charset_is_character_set_filesystem= 
 
1020
    !String::needs_conversion(0, charset(),
 
1021
                              variables.character_set_filesystem, &not_used);
 
1022
}
 
1023
 
 
1024
 
 
1025
/* routings to adding tables to list of changed in transaction tables */
 
1026
 
 
1027
inline static void list_include(CHANGED_TableList** prev,
 
1028
                                CHANGED_TableList* curr,
 
1029
                                CHANGED_TableList* new_table)
 
1030
{
 
1031
  if (new_table)
 
1032
  {
 
1033
    *prev = new_table;
 
1034
    (*prev)->next = curr;
 
1035
  }
 
1036
}
 
1037
 
 
1038
/* add table to list of changed in transaction tables */
 
1039
 
 
1040
void THD::add_changed_table(Table *table)
 
1041
{
 
1042
  assert((options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
 
1043
              table->file->has_transactions());
 
1044
  add_changed_table(table->s->table_cache_key.str,
 
1045
                    (long) table->s->table_cache_key.length);
 
1046
  return;
 
1047
}
 
1048
 
 
1049
 
 
1050
void THD::add_changed_table(const char *key, long key_length)
 
1051
{
 
1052
  CHANGED_TableList **prev_changed = &transaction.changed_tables;
 
1053
  CHANGED_TableList *curr = transaction.changed_tables;
 
1054
 
 
1055
  for (; curr; prev_changed = &(curr->next), curr = curr->next)
 
1056
  {
 
1057
    int cmp =  (long)curr->key_length - (long)key_length;
 
1058
    if (cmp < 0)
 
1059
    {
 
1060
      list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
1061
      return;
 
1062
    }
 
1063
    else if (cmp == 0)
 
1064
    {
 
1065
      cmp = memcmp(curr->key, key, curr->key_length);
 
1066
      if (cmp < 0)
 
1067
      {
 
1068
        list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
1069
        return;
 
1070
      }
 
1071
      else if (cmp == 0)
 
1072
      {
 
1073
        return;
 
1074
      }
 
1075
    }
 
1076
  }
 
1077
  *prev_changed = changed_table_dup(key, key_length);
 
1078
  return;
 
1079
}
 
1080
 
 
1081
 
 
1082
CHANGED_TableList* THD::changed_table_dup(const char *key, long key_length)
 
1083
{
 
1084
  CHANGED_TableList* new_table = 
 
1085
    (CHANGED_TableList*) trans_alloc(ALIGN_SIZE(sizeof(CHANGED_TableList))+
 
1086
                                      key_length + 1);
 
1087
  if (!new_table)
 
1088
  {
 
1089
    my_error(EE_OUTOFMEMORY, MYF(ME_BELL),
 
1090
             ALIGN_SIZE(sizeof(TableList)) + key_length + 1);
 
1091
    killed= KILL_CONNECTION;
 
1092
    return 0;
 
1093
  }
 
1094
 
 
1095
  new_table->key= ((char*)new_table)+ ALIGN_SIZE(sizeof(CHANGED_TableList));
 
1096
  new_table->next = 0;
 
1097
  new_table->key_length = key_length;
 
1098
  ::memcpy(new_table->key, key, key_length);
 
1099
  return new_table;
 
1100
}
 
1101
 
 
1102
 
 
1103
int THD::send_explain_fields(select_result *result)
883
1104
{
884
1105
  List<Item> field_list;
885
1106
  Item *item;
914
1135
  }
915
1136
  item->maybe_null= 1;
916
1137
  field_list.push_back(new Item_empty_string("Extra", 255, cs));
917
 
  return (result->send_fields(field_list));
918
 
}
919
 
 
920
 
void select_result::send_error(drizzled::error_t errcode, const char *err)
 
1138
  return (result->send_fields(field_list,
 
1139
                              Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF));
 
1140
}
 
1141
 
 
1142
 
 
1143
struct Item_change_record: public ilink
 
1144
{
 
1145
  Item **place;
 
1146
  Item *old_value;
 
1147
  /* Placement new was hidden by `new' in ilink (TODO: check): */
 
1148
  static void *operator new(size_t size __attribute__((unused)),
 
1149
                            void *mem)
 
1150
    { return mem; }
 
1151
  static void operator delete(void *ptr __attribute__((unused)),
 
1152
                              size_t size __attribute__((unused)))
 
1153
    {}
 
1154
  static void operator delete(void *ptr __attribute__((unused)),
 
1155
                              void *mem __attribute__((unused)))
 
1156
    { /* never called */ }
 
1157
};
 
1158
 
 
1159
 
 
1160
/*
 
1161
  Register an item tree tree transformation, performed by the query
 
1162
  optimizer. We need a pointer to runtime_memroot because it may be !=
 
1163
  thd->mem_root (due to possible set_n_backup_active_arena called for thd).
 
1164
*/
 
1165
 
 
1166
void THD::nocheck_register_item_tree_change(Item **place, Item *old_value,
 
1167
                                            MEM_ROOT *runtime_memroot)
 
1168
{
 
1169
  Item_change_record *change;
 
1170
  /*
 
1171
    Now we use one node per change, which adds some memory overhead,
 
1172
    but still is rather fast as we use alloc_root for allocations.
 
1173
    A list of item tree changes of an average query should be short.
 
1174
  */
 
1175
  void *change_mem= alloc_root(runtime_memroot, sizeof(*change));
 
1176
  if (change_mem == 0)
 
1177
  {
 
1178
    /*
 
1179
      OOM, thd->fatal_error() is called by the error handler of the
 
1180
      memroot. Just return.
 
1181
    */
 
1182
    return;
 
1183
  }
 
1184
  change= new (change_mem) Item_change_record;
 
1185
  change->place= place;
 
1186
  change->old_value= old_value;
 
1187
  change_list.append(change);
 
1188
}
 
1189
 
 
1190
 
 
1191
void THD::rollback_item_tree_changes()
 
1192
{
 
1193
  I_List_iterator<Item_change_record> it(change_list);
 
1194
  Item_change_record *change;
 
1195
 
 
1196
  while ((change= it++))
 
1197
    *change->place= change->old_value;
 
1198
  /* We can forget about changes memory: it's allocated in runtime memroot */
 
1199
  change_list.empty();
 
1200
  return;
 
1201
}
 
1202
 
 
1203
 
 
1204
/*****************************************************************************
 
1205
** Functions to provide a interface to select results
 
1206
*****************************************************************************/
 
1207
 
 
1208
select_result::select_result()
 
1209
{
 
1210
  thd=current_thd;
 
1211
}
 
1212
 
 
1213
void select_result::send_error(uint errcode,const char *err)
921
1214
{
922
1215
  my_message(errcode, err, MYF(0));
923
1216
}
924
1217
 
 
1218
 
 
1219
void select_result::cleanup()
 
1220
{
 
1221
  /* do nothing */
 
1222
}
 
1223
 
 
1224
bool select_result::check_simple_select() const
 
1225
{
 
1226
  my_error(ER_SP_BAD_CURSOR_QUERY, MYF(0));
 
1227
  return true;
 
1228
}
 
1229
 
 
1230
 
 
1231
static String default_line_term("\n",default_charset_info);
 
1232
static String default_escaped("\\",default_charset_info);
 
1233
static String default_field_term("\t",default_charset_info);
 
1234
 
 
1235
sql_exchange::sql_exchange(char *name, bool flag,
 
1236
                           enum enum_filetype filetype_arg)
 
1237
  :file_name(name), opt_enclosed(0), dumpfile(flag), skip_lines(0)
 
1238
{
 
1239
  filetype= filetype_arg;
 
1240
  field_term= &default_field_term;
 
1241
  enclosed=   line_start= &my_empty_string;
 
1242
  line_term=  &default_line_term;
 
1243
  escaped=    &default_escaped;
 
1244
  cs= NULL;
 
1245
}
 
1246
 
 
1247
bool select_send::send_fields(List<Item> &list, uint flags)
 
1248
{
 
1249
  bool res;
 
1250
  if (!(res= thd->protocol->send_fields(&list, flags)))
 
1251
    is_result_set_started= 1;
 
1252
  return res;
 
1253
}
 
1254
 
 
1255
void select_send::abort()
 
1256
{
 
1257
  return;
 
1258
}
 
1259
 
 
1260
 
 
1261
/** 
 
1262
  Cleanup an instance of this class for re-use
 
1263
  at next execution of a prepared statement/
 
1264
  stored procedure statement.
 
1265
*/
 
1266
 
 
1267
void select_send::cleanup()
 
1268
{
 
1269
  is_result_set_started= false;
 
1270
}
 
1271
 
 
1272
/* Send data to client. Returns 0 if ok */
 
1273
 
 
1274
bool select_send::send_data(List<Item> &items)
 
1275
{
 
1276
  if (unit->offset_limit_cnt)
 
1277
  {                                             // using limit offset,count
 
1278
    unit->offset_limit_cnt--;
 
1279
    return 0;
 
1280
  }
 
1281
 
 
1282
  /*
 
1283
    We may be passing the control from mysqld to the client: release the
 
1284
    InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
 
1285
    by thd
 
1286
  */
 
1287
  ha_release_temporary_latches(thd);
 
1288
 
 
1289
  List_iterator_fast<Item> li(items);
 
1290
  Protocol *protocol= thd->protocol;
 
1291
  char buff[MAX_FIELD_WIDTH];
 
1292
  String buffer(buff, sizeof(buff), &my_charset_bin);
 
1293
 
 
1294
  protocol->prepare_for_resend();
 
1295
  Item *item;
 
1296
  while ((item=li++))
 
1297
  {
 
1298
    if (item->send(protocol, &buffer))
 
1299
    {
 
1300
      protocol->free();                         // Free used buffer
 
1301
      my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
 
1302
      break;
 
1303
    }
 
1304
  }
 
1305
  thd->sent_row_count++;
 
1306
  if (thd->is_error())
 
1307
  {
 
1308
    protocol->remove_last_row();
 
1309
    return(1);
 
1310
  }
 
1311
  if (thd->vio_ok())
 
1312
    return(protocol->write());
 
1313
  return(0);
 
1314
}
 
1315
 
 
1316
bool select_send::send_eof()
 
1317
{
 
1318
  /* 
 
1319
    We may be passing the control from mysqld to the client: release the
 
1320
    InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
 
1321
    by thd 
 
1322
  */
 
1323
  ha_release_temporary_latches(thd);
 
1324
 
 
1325
  /* Unlock tables before sending packet to gain some speed */
 
1326
  if (thd->lock)
 
1327
  {
 
1328
    mysql_unlock_tables(thd, thd->lock);
 
1329
    thd->lock=0;
 
1330
  }
 
1331
  ::my_eof(thd);
 
1332
  is_result_set_started= 0;
 
1333
  return false;
 
1334
}
 
1335
 
 
1336
 
925
1337
/************************************************************************
926
1338
  Handling writing to file
927
1339
************************************************************************/
928
1340
 
929
 
void select_to_file::send_error(drizzled::error_t errcode,const char *err)
 
1341
void select_to_file::send_error(uint errcode,const char *err)
930
1342
{
931
1343
  my_message(errcode, err, MYF(0));
932
1344
  if (file > 0)
933
1345
  {
934
 
    (void) cache->end_io_cache();
935
 
    (void) internal::my_close(file, MYF(0));
936
 
    (void) internal::my_delete(path.file_string().c_str(), MYF(0));             // Delete file on error
 
1346
    (void) end_io_cache(&cache);
 
1347
    (void) my_close(file,MYF(0));
 
1348
    (void) my_delete(path,MYF(0));              // Delete file on error
937
1349
    file= -1;
938
1350
  }
939
1351
}
941
1353
 
942
1354
bool select_to_file::send_eof()
943
1355
{
944
 
  int error= test(cache->end_io_cache());
945
 
  if (internal::my_close(file, MYF(MY_WME)))
 
1356
  int error= test(end_io_cache(&cache));
 
1357
  if (my_close(file,MYF(MY_WME)))
946
1358
    error= 1;
947
1359
  if (!error)
948
1360
  {
951
1363
      function, SELECT INTO has to have an own SQLCOM.
952
1364
      TODO: split from SQLCOM_SELECT
953
1365
    */
954
 
    session->my_ok(row_count);
 
1366
    ::my_ok(thd,row_count);
955
1367
  }
956
1368
  file= -1;
957
1369
  return error;
963
1375
  /* In case of error send_eof() may be not called: close the file here. */
964
1376
  if (file >= 0)
965
1377
  {
966
 
    (void) cache->end_io_cache();
967
 
    (void) internal::my_close(file, MYF(0));
 
1378
    (void) end_io_cache(&cache);
 
1379
    (void) my_close(file,MYF(0));
968
1380
    file= -1;
969
1381
  }
970
 
  path= "";
 
1382
  path[0]= '\0';
971
1383
  row_count= 0;
972
1384
}
973
1385
 
974
 
select_to_file::select_to_file(file_exchange *ex)
975
 
  : exchange(ex),
976
 
    file(-1),
977
 
    cache(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
978
 
    row_count(0L)
979
 
{
980
 
  path= "";
981
 
}
982
1386
 
983
1387
select_to_file::~select_to_file()
984
1388
{
985
 
  cleanup();
 
1389
  if (file >= 0)
 
1390
  {                                     // This only happens in case of error
 
1391
    (void) end_io_cache(&cache);
 
1392
    (void) my_close(file,MYF(0));
 
1393
    file= -1;
 
1394
  }
986
1395
}
987
1396
 
988
1397
/***************************************************************************
991
1400
 
992
1401
select_export::~select_export()
993
1402
{
994
 
  session->sent_row_count=row_count;
 
1403
  thd->sent_row_count=row_count;
995
1404
}
996
1405
 
997
1406
 
1000
1409
 
1001
1410
  SYNOPSIS
1002
1411
    create_file()
1003
 
    session                     Thread handle
 
1412
    thd                 Thread handle
1004
1413
    path                File name
1005
1414
    exchange            Excange class
1006
1415
    cache               IO cache
1011
1420
*/
1012
1421
 
1013
1422
 
1014
 
static int create_file(Session *session,
1015
 
                       fs::path &target_path,
1016
 
                       file_exchange *exchange,
1017
 
                       internal::IO_CACHE *cache)
 
1423
static File create_file(THD *thd, char *path, sql_exchange *exchange,
 
1424
                        IO_CACHE *cache)
1018
1425
{
1019
 
  fs::path to_file(exchange->file_name);
1020
 
  int file;
1021
 
 
1022
 
  if (not to_file.has_root_directory())
 
1426
  File file;
 
1427
  uint option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
 
1428
 
 
1429
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
 
1430
  option|= MY_REPLACE_DIR;                      // Force use of db directory
 
1431
#endif
 
1432
 
 
1433
  if (!dirname_length(exchange->file_name))
1023
1434
  {
1024
 
    target_path= fs::system_complete(getDataHomeCatalog());
1025
 
    util::string::const_shared_ptr schema(session->schema());
1026
 
    if (schema and not schema->empty())
1027
 
    {
1028
 
      int count_elements= 0;
1029
 
      for (fs::path::iterator iter= to_file.begin();
1030
 
           iter != to_file.end();
1031
 
           ++iter, ++count_elements)
1032
 
      { }
1033
 
 
1034
 
      if (count_elements == 1)
1035
 
      {
1036
 
        target_path /= *schema;
1037
 
      }
1038
 
    }
1039
 
    target_path /= to_file;
 
1435
    strxnmov(path, FN_REFLEN-1, mysql_real_data_home, thd->db ? thd->db : "",
 
1436
             NullS);
 
1437
    (void) fn_format(path, exchange->file_name, path, "", option);
1040
1438
  }
1041
1439
  else
1042
 
  {
1043
 
    target_path = exchange->file_name;
1044
 
  }
1045
 
 
1046
 
  if (not secure_file_priv.string().empty())
1047
 
  {
1048
 
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
1049
 
    {
1050
 
      /* Write only allowed to dir or subdir specified by secure_file_priv */
1051
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1052
 
      return -1;
1053
 
    }
1054
 
  }
1055
 
 
1056
 
  if (!access(target_path.file_string().c_str(), F_OK))
 
1440
    (void) fn_format(path, exchange->file_name, mysql_real_data_home, "", option);
 
1441
 
 
1442
  if (opt_secure_file_priv &&
 
1443
      strncmp(opt_secure_file_priv, path, strlen(opt_secure_file_priv)))
 
1444
  {
 
1445
    /* Write only allowed to dir or subdir specified by secure_file_priv */
 
1446
    my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
1447
    return -1;
 
1448
  }
 
1449
 
 
1450
  if (!access(path, F_OK))
1057
1451
  {
1058
1452
    my_error(ER_FILE_EXISTS_ERROR, MYF(0), exchange->file_name);
1059
1453
    return -1;
1060
1454
  }
1061
1455
  /* Create the file world readable */
1062
 
  if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
 
1456
  if ((file= my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1063
1457
    return file;
 
1458
#ifdef HAVE_FCHMOD
1064
1459
  (void) fchmod(file, 0666);                    // Because of umask()
1065
 
  if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
 
1460
#else
 
1461
  (void) chmod(path, 0666);
 
1462
#endif
 
1463
  if (init_io_cache(cache, file, 0L, WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1066
1464
  {
1067
 
    internal::my_close(file, MYF(0));
1068
 
    internal::my_delete(target_path.file_string().c_str(), MYF(0));  // Delete file on error, it was just created
 
1465
    my_close(file, MYF(0));
 
1466
    my_delete(path, MYF(0));  // Delete file on error, it was just created 
1069
1467
    return -1;
1070
1468
  }
1071
1469
  return file;
1073
1471
 
1074
1472
 
1075
1473
int
1076
 
select_export::prepare(List<Item> &list, Select_Lex_Unit *u)
 
1474
select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
1077
1475
{
1078
1476
  bool blob_flag=0;
1079
1477
  bool string_results= false, non_string_results= false;
1080
1478
  unit= u;
1081
 
  if ((uint32_t) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
1082
 
  {
1083
 
    path= exchange->file_name;
1084
 
  }
 
1479
  if ((uint) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
 
1480
    strmake(path,exchange->file_name,FN_REFLEN-1);
1085
1481
 
 
1482
  if ((file= create_file(thd, path, exchange, &cache)) < 0)
 
1483
    return 1;
1086
1484
  /* Check if there is any blobs in data */
1087
1485
  {
1088
1486
    List_iterator_fast<Item> li(list);
1091
1489
    {
1092
1490
      if (item->max_length >= MAX_BLOB_WIDTH)
1093
1491
      {
1094
 
        blob_flag=1;
1095
 
        break;
 
1492
        blob_flag=1;
 
1493
        break;
1096
1494
      }
1097
 
 
1098
1495
      if (item->result_type() == STRING_RESULT)
1099
1496
        string_results= true;
1100
1497
      else
1103
1500
  }
1104
1501
  field_term_length=exchange->field_term->length();
1105
1502
  field_term_char= field_term_length ?
1106
 
                   (int) (unsigned char) (*exchange->field_term)[0] : INT_MAX;
 
1503
                   (int) (uchar) (*exchange->field_term)[0] : INT_MAX;
1107
1504
  if (!exchange->line_term->length())
1108
1505
    exchange->line_term=exchange->field_term;   // Use this if it exists
1109
1506
  field_sep_char= (exchange->enclosed->length() ?
1110
 
                  (int) (unsigned char) (*exchange->enclosed)[0] : field_term_char);
 
1507
                  (int) (uchar) (*exchange->enclosed)[0] : field_term_char);
1111
1508
  escape_char=  (exchange->escaped->length() ?
1112
 
                (int) (unsigned char) (*exchange->escaped)[0] : -1);
 
1509
                (int) (uchar) (*exchange->escaped)[0] : -1);
1113
1510
  is_ambiguous_field_sep= test(strchr(ESCAPE_CHARS, field_sep_char));
1114
1511
  is_unsafe_field_sep= test(strchr(NUMERIC_CHARS, field_sep_char));
1115
1512
  line_sep_char= (exchange->line_term->length() ?
1116
 
                 (int) (unsigned char) (*exchange->line_term)[0] : INT_MAX);
 
1513
                 (int) (uchar) (*exchange->line_term)[0] : INT_MAX);
1117
1514
  if (!field_term_length)
1118
1515
    exchange->opt_enclosed=0;
1119
1516
  if (!exchange->enclosed->length())
1125
1522
      (exchange->opt_enclosed && non_string_results &&
1126
1523
       field_term_length && strchr(NUMERIC_CHARS, field_term_char)))
1127
1524
  {
1128
 
    my_error(ER_AMBIGUOUS_FIELD_TERM, MYF(0));
1129
 
    return 1;
 
1525
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1526
                 ER_AMBIGUOUS_FIELD_TERM, ER(ER_AMBIGUOUS_FIELD_TERM));
 
1527
    is_ambiguous_field_term= true;
1130
1528
  }
1131
 
 
1132
 
  if ((file= create_file(session, path, exchange, cache)) < 0)
1133
 
    return 1;
 
1529
  else
 
1530
    is_ambiguous_field_term= false;
1134
1531
 
1135
1532
  return 0;
1136
1533
}
1137
1534
 
 
1535
 
 
1536
#define NEED_ESCAPING(x) ((int) (uchar) (x) == escape_char    || \
 
1537
                          (enclosed ? (int) (uchar) (x) == field_sep_char      \
 
1538
                                    : (int) (uchar) (x) == field_term_char) || \
 
1539
                          (int) (uchar) (x) == line_sep_char  || \
 
1540
                          !(x))
 
1541
 
1138
1542
bool select_export::send_data(List<Item> &items)
1139
1543
{
1140
1544
  char buff[MAX_FIELD_WIDTH],null_buff[2],space[MAX_FIELD_WIDTH];
1145
1549
  if (unit->offset_limit_cnt)
1146
1550
  {                                             // using limit offset,count
1147
1551
    unit->offset_limit_cnt--;
1148
 
    return false;
 
1552
    return(0);
1149
1553
  }
1150
1554
  row_count++;
1151
1555
  Item *item;
1152
 
  uint32_t used_length=0,items_left=items.elements;
 
1556
  uint used_length=0,items_left=items.elements;
1153
1557
  List_iterator_fast<Item> li(items);
1154
1558
 
1155
 
  if (my_b_write(cache,(unsigned char*) exchange->line_start->ptr(),
1156
 
                 exchange->line_start->length()))
1157
 
    return true;
1158
 
 
 
1559
  if (my_b_write(&cache,(uchar*) exchange->line_start->ptr(),
 
1560
                 exchange->line_start->length()))
 
1561
    goto err;
1159
1562
  while ((item=li++))
1160
1563
  {
1161
1564
    Item_result result_type=item->result_type();
1164
1567
    res=item->str_result(&tmp);
1165
1568
    if (res && enclosed)
1166
1569
    {
1167
 
      if (my_b_write(cache,(unsigned char*) exchange->enclosed->ptr(),
1168
 
                     exchange->enclosed->length()))
1169
 
        return true;
 
1570
      if (my_b_write(&cache,(uchar*) exchange->enclosed->ptr(),
 
1571
                     exchange->enclosed->length()))
 
1572
        goto err;
1170
1573
    }
1171
1574
    if (!res)
1172
1575
    {                                           // NULL
1173
1576
      if (!fixed_row_size)
1174
1577
      {
1175
 
        if (escape_char != -1)                  // Use \N syntax
1176
 
        {
1177
 
          null_buff[0]=escape_char;
1178
 
          null_buff[1]='N';
1179
 
          if (my_b_write(cache,(unsigned char*) null_buff,2))
1180
 
            return true;
1181
 
        }
1182
 
        else if (my_b_write(cache,(unsigned char*) "NULL",4))
1183
 
          return true;
 
1578
        if (escape_char != -1)                  // Use \N syntax
 
1579
        {
 
1580
          null_buff[0]=escape_char;
 
1581
          null_buff[1]='N';
 
1582
          if (my_b_write(&cache,(uchar*) null_buff,2))
 
1583
            goto err;
 
1584
        }
 
1585
        else if (my_b_write(&cache,(uchar*) "NULL",4))
 
1586
          goto err;
1184
1587
      }
1185
1588
      else
1186
1589
      {
1187
 
        used_length=0;                          // Fill with space
 
1590
        used_length=0;                          // Fill with space
1188
1591
      }
1189
1592
    }
1190
1593
    else
1191
1594
    {
1192
1595
      if (fixed_row_size)
1193
 
        used_length= min(res->length(), static_cast<size_t>(item->max_length));
 
1596
        used_length=min(res->length(),item->max_length);
1194
1597
      else
1195
 
        used_length= res->length();
1196
 
 
 
1598
        used_length=res->length();
1197
1599
      if ((result_type == STRING_RESULT || is_unsafe_field_sep) &&
1198
 
          escape_char != -1)
 
1600
           escape_char != -1)
1199
1601
      {
1200
1602
        char *pos, *start, *end;
1201
1603
        const CHARSET_INFO * const res_charset= res->charset();
1202
 
        const CHARSET_INFO * const character_set_client= default_charset_info;
1203
 
 
 
1604
        const CHARSET_INFO * const character_set_client= thd->variables.
 
1605
                                                            character_set_client;
1204
1606
        bool check_second_byte= (res_charset == &my_charset_bin) &&
1205
 
          character_set_client->
1206
 
          escape_with_backslash_is_dangerous;
 
1607
                                 character_set_client->
 
1608
                                 escape_with_backslash_is_dangerous;
1207
1609
        assert(character_set_client->mbmaxlen == 2 ||
1208
 
               !character_set_client->escape_with_backslash_is_dangerous);
1209
 
        for (start=pos=(char*) res->ptr(),end=pos+used_length ;
1210
 
             pos != end ;
1211
 
             pos++)
1212
 
        {
1213
 
          if (use_mb(res_charset))
1214
 
          {
1215
 
            int l;
1216
 
            if ((l=my_ismbchar(res_charset, pos, end)))
1217
 
            {
1218
 
              pos += l-1;
1219
 
              continue;
1220
 
            }
1221
 
          }
 
1610
                    !character_set_client->escape_with_backslash_is_dangerous);
 
1611
        for (start=pos=(char*) res->ptr(),end=pos+used_length ;
 
1612
             pos != end ;
 
1613
             pos++)
 
1614
        {
 
1615
#ifdef USE_MB
 
1616
          if (use_mb(res_charset))
 
1617
          {
 
1618
            int l;
 
1619
            if ((l=my_ismbchar(res_charset, pos, end)))
 
1620
            {
 
1621
              pos += l-1;
 
1622
              continue;
 
1623
            }
 
1624
          }
 
1625
#endif
1222
1626
 
1223
1627
          /*
1224
1628
            Special case when dumping BINARY/VARBINARY/BLOB values
1225
1629
            for the clients with character sets big5, cp932, gbk and sjis,
1226
1630
            which can have the escape character (0x5C "\" by default)
1227
1631
            as the second byte of a multi-byte sequence.
1228
 
 
 
1632
            
1229
1633
            If
1230
1634
            - pos[0] is a valid multi-byte head (e.g 0xEE) and
1231
1635
            - pos[1] is 0x00, which will be escaped as "\0",
1232
 
 
 
1636
            
1233
1637
            then we'll get "0xEE + 0x5C + 0x30" in the output file.
1234
 
 
 
1638
            
1235
1639
            If this file is later loaded using this sequence of commands:
1236
 
 
 
1640
            
1237
1641
            mysql> create table t1 (a varchar(128)) character set big5;
1238
1642
            mysql> LOAD DATA INFILE 'dump.txt' INTO Table t1;
1239
 
 
 
1643
            
1240
1644
            then 0x5C will be misinterpreted as the second byte
1241
1645
            of a multi-byte character "0xEE + 0x5C", instead of
1242
1646
            escape character for 0x00.
1243
 
 
 
1647
            
1244
1648
            To avoid this confusion, we'll escape the multi-byte
1245
1649
            head character too, so the sequence "0xEE + 0x00" will be
1246
1650
            dumped as "0x5C + 0xEE + 0x5C + 0x30".
1247
 
 
 
1651
            
1248
1652
            Note, in the condition below we only check if
1249
1653
            mbcharlen is equal to 2, because there are no
1250
1654
            character sets with mbmaxlen longer than 2
1252
1656
            assert before the loop makes that sure.
1253
1657
          */
1254
1658
 
1255
 
          if ((needs_escaping(*pos, enclosed) ||
 
1659
          if ((NEED_ESCAPING(*pos) ||
1256
1660
               (check_second_byte &&
1257
 
                my_mbcharlen(character_set_client, (unsigned char) *pos) == 2 &&
 
1661
                my_mbcharlen(character_set_client, (uchar) *pos) == 2 &&
1258
1662
                pos + 1 < end &&
1259
 
                needs_escaping(pos[1], enclosed))) &&
 
1663
                NEED_ESCAPING(pos[1]))) &&
1260
1664
              /*
1261
 
                Don't escape field_term_char by doubling - doubling is only
1262
 
                valid for ENCLOSED BY characters:
 
1665
               Don't escape field_term_char by doubling - doubling is only
 
1666
               valid for ENCLOSED BY characters:
1263
1667
              */
1264
1668
              (enclosed || !is_ambiguous_field_term ||
1265
 
               (int) (unsigned char) *pos != field_term_char))
 
1669
               (int) (uchar) *pos != field_term_char))
1266
1670
          {
1267
 
            char tmp_buff[2];
1268
 
            tmp_buff[0]= ((int) (unsigned char) *pos == field_sep_char &&
 
1671
            char tmp_buff[2];
 
1672
            tmp_buff[0]= ((int) (uchar) *pos == field_sep_char &&
1269
1673
                          is_ambiguous_field_sep) ?
1270
 
              field_sep_char : escape_char;
1271
 
            tmp_buff[1]= *pos ? *pos : '0';
1272
 
            if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1273
 
                my_b_write(cache,(unsigned char*) tmp_buff,2))
1274
 
              return true;
1275
 
            start=pos+1;
1276
 
          }
1277
 
        }
1278
 
        if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1279
 
          return true;
 
1674
                          field_sep_char : escape_char;
 
1675
            tmp_buff[1]= *pos ? *pos : '0';
 
1676
            if (my_b_write(&cache,(uchar*) start,(uint) (pos-start)) ||
 
1677
                my_b_write(&cache,(uchar*) tmp_buff,2))
 
1678
              goto err;
 
1679
            start=pos+1;
 
1680
          }
 
1681
        }
 
1682
        if (my_b_write(&cache,(uchar*) start,(uint) (pos-start)))
 
1683
          goto err;
1280
1684
      }
1281
 
      else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1282
 
        return true;
 
1685
      else if (my_b_write(&cache,(uchar*) res->ptr(),used_length))
 
1686
        goto err;
1283
1687
    }
1284
1688
    if (fixed_row_size)
1285
1689
    {                                           // Fill with space
1286
1690
      if (item->max_length > used_length)
1287
1691
      {
1288
 
        /* QQ:  Fix by adding a my_b_fill() function */
1289
 
        if (!space_inited)
1290
 
        {
1291
 
          space_inited=1;
1292
 
          memset(space, ' ', sizeof(space));
1293
 
        }
1294
 
        uint32_t length=item->max_length-used_length;
1295
 
        for (; length > sizeof(space) ; length-=sizeof(space))
1296
 
        {
1297
 
          if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1298
 
            return true;
1299
 
        }
1300
 
        if (my_b_write(cache,(unsigned char*) space,length))
1301
 
          return true;
 
1692
        /* QQ:  Fix by adding a my_b_fill() function */
 
1693
        if (!space_inited)
 
1694
        {
 
1695
          space_inited=1;
 
1696
          memset(space, ' ', sizeof(space));
 
1697
        }
 
1698
        uint length=item->max_length-used_length;
 
1699
        for (; length > sizeof(space) ; length-=sizeof(space))
 
1700
        {
 
1701
          if (my_b_write(&cache,(uchar*) space,sizeof(space)))
 
1702
            goto err;
 
1703
        }
 
1704
        if (my_b_write(&cache,(uchar*) space,length))
 
1705
          goto err;
1302
1706
      }
1303
1707
    }
1304
1708
    if (res && enclosed)
1305
1709
    {
1306
 
      if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
 
1710
      if (my_b_write(&cache, (uchar*) exchange->enclosed->ptr(),
1307
1711
                     exchange->enclosed->length()))
1308
 
        return true;
 
1712
        goto err;
1309
1713
    }
1310
1714
    if (--items_left)
1311
1715
    {
1312
 
      if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
 
1716
      if (my_b_write(&cache, (uchar*) exchange->field_term->ptr(),
1313
1717
                     field_term_length))
1314
 
        return true;
 
1718
        goto err;
1315
1719
    }
1316
1720
  }
1317
 
  if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1318
 
                 exchange->line_term->length()))
1319
 
  {
1320
 
    return true;
1321
 
  }
1322
 
 
1323
 
  return false;
 
1721
  if (my_b_write(&cache,(uchar*) exchange->line_term->ptr(),
 
1722
                 exchange->line_term->length()))
 
1723
    goto err;
 
1724
  return(0);
 
1725
err:
 
1726
  return(1);
1324
1727
}
1325
1728
 
1326
1729
 
1330
1733
 
1331
1734
 
1332
1735
int
1333
 
select_dump::prepare(List<Item> &, Select_Lex_Unit *u)
 
1736
select_dump::prepare(List<Item> &list __attribute__((unused)),
 
1737
                     SELECT_LEX_UNIT *u)
1334
1738
{
1335
1739
  unit= u;
1336
 
  return (int) ((file= create_file(session, path, exchange, cache)) < 0);
 
1740
  return (int) ((file= create_file(thd, path, exchange, &cache)) < 0);
1337
1741
}
1338
1742
 
1339
1743
 
1350
1754
    unit->offset_limit_cnt--;
1351
1755
    return(0);
1352
1756
  }
1353
 
  if (row_count++ > 1)
 
1757
  if (row_count++ > 1) 
1354
1758
  {
1355
1759
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
1356
 
    return 1;
 
1760
    goto err;
1357
1761
  }
1358
1762
  while ((item=li++))
1359
1763
  {
1360
1764
    res=item->str_result(&tmp);
1361
1765
    if (!res)                                   // If NULL
1362
1766
    {
1363
 
      if (my_b_write(cache,(unsigned char*) "",1))
1364
 
        return 1;
 
1767
      if (my_b_write(&cache,(uchar*) "",1))
 
1768
        goto err;
1365
1769
    }
1366
 
    else if (my_b_write(cache,(unsigned char*) res->ptr(),res->length()))
 
1770
    else if (my_b_write(&cache,(uchar*) res->ptr(),res->length()))
1367
1771
    {
1368
 
      my_error(ER_ERROR_ON_WRITE, MYF(0), path.file_string().c_str(), errno);
1369
 
      return 1;
 
1772
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, my_errno);
 
1773
      goto err;
1370
1774
    }
1371
1775
  }
1372
1776
  return(0);
 
1777
err:
 
1778
  return(1);
1373
1779
}
1374
1780
 
1375
1781
 
1394
1800
  }
1395
1801
  List_iterator_fast<Item> li(items);
1396
1802
  Item *val_item;
1397
 
  for (uint32_t i= 0; (val_item= li++); i++)
 
1803
  for (uint i= 0; (val_item= li++); i++)
1398
1804
    it->store(i, val_item);
1399
1805
  it->assigned(1);
1400
1806
  return(0);
1404
1810
void select_max_min_finder_subselect::cleanup()
1405
1811
{
1406
1812
  cache= 0;
 
1813
  return;
1407
1814
}
1408
1815
 
1409
1816
 
1427
1834
      switch (val_item->result_type())
1428
1835
      {
1429
1836
      case REAL_RESULT:
1430
 
        op= &select_max_min_finder_subselect::cmp_real;
1431
 
        break;
 
1837
        op= &select_max_min_finder_subselect::cmp_real;
 
1838
        break;
1432
1839
      case INT_RESULT:
1433
 
        op= &select_max_min_finder_subselect::cmp_int;
1434
 
        break;
 
1840
        op= &select_max_min_finder_subselect::cmp_int;
 
1841
        break;
1435
1842
      case STRING_RESULT:
1436
 
        op= &select_max_min_finder_subselect::cmp_str;
1437
 
        break;
 
1843
        op= &select_max_min_finder_subselect::cmp_str;
 
1844
        break;
1438
1845
      case DECIMAL_RESULT:
1439
1846
        op= &select_max_min_finder_subselect::cmp_decimal;
1440
1847
        break;
1441
1848
      case ROW_RESULT:
1442
1849
        // This case should never be choosen
1443
 
        assert(0);
1444
 
        op= 0;
 
1850
        assert(0);
 
1851
        op= 0;
1445
1852
      }
1446
1853
    }
1447
1854
    cache->store(val_item);
1480
1887
bool select_max_min_finder_subselect::cmp_decimal()
1481
1888
{
1482
1889
  Item *maxmin= ((Item_singlerow_subselect *)item)->element_index(0);
1483
 
  type::Decimal cval, *cvalue= cache->val_decimal(&cval);
1484
 
  type::Decimal mval, *mvalue= maxmin->val_decimal(&mval);
 
1890
  my_decimal cval, *cvalue= cache->val_decimal(&cval);
 
1891
  my_decimal mval, *mvalue= maxmin->val_decimal(&mval);
1485
1892
  if (fmax)
1486
1893
    return (cache->null_value && !maxmin->null_value) ||
1487
1894
      (!cache->null_value && !maxmin->null_value &&
1488
 
       class_decimal_cmp(cvalue, mvalue) > 0) ;
 
1895
       my_decimal_cmp(cvalue, mvalue) > 0) ;
1489
1896
  return (maxmin->null_value && !cache->null_value) ||
1490
1897
    (!cache->null_value && !maxmin->null_value &&
1491
 
     class_decimal_cmp(cvalue,mvalue) < 0);
 
1898
     my_decimal_cmp(cvalue,mvalue) < 0);
1492
1899
}
1493
1900
 
1494
1901
bool select_max_min_finder_subselect::cmp_str()
1510
1917
     sortcmp(val1, val2, cache->collation.collation) < 0);
1511
1918
}
1512
1919
 
1513
 
bool select_exists_subselect::send_data(List<Item> &)
 
1920
bool select_exists_subselect::send_data(List<Item> &items __attribute__((unused)))
1514
1921
{
1515
1922
  Item_exists_subselect *it= (Item_exists_subselect *)item;
1516
1923
  if (unit->offset_limit_cnt)
1523
1930
  return(0);
1524
1931
}
1525
1932
 
 
1933
 
 
1934
/***************************************************************************
 
1935
  Dump of select to variables
 
1936
***************************************************************************/
 
1937
 
 
1938
int select_dumpvar::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
 
1939
{
 
1940
  unit= u;
 
1941
  
 
1942
  if (var_list.elements != list.elements)
 
1943
  {
 
1944
    my_message(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT,
 
1945
               ER(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT), MYF(0));
 
1946
    return 1;
 
1947
  }               
 
1948
  return 0;
 
1949
}
 
1950
 
 
1951
 
 
1952
bool select_dumpvar::check_simple_select() const
 
1953
{
 
1954
  my_error(ER_SP_BAD_CURSOR_SELECT, MYF(0));
 
1955
  return true;
 
1956
}
 
1957
 
 
1958
 
 
1959
void select_dumpvar::cleanup()
 
1960
{
 
1961
  row_count= 0;
 
1962
}
 
1963
 
 
1964
 
 
1965
void Query_arena::free_items()
 
1966
{
 
1967
  Item *next;
 
1968
  /* This works because items are allocated with sql_alloc() */
 
1969
  for (; free_list; free_list= next)
 
1970
  {
 
1971
    next= free_list->next;
 
1972
    free_list->delete_self();
 
1973
  }
 
1974
  /* Postcondition: free_list is 0 */
 
1975
  return;
 
1976
}
 
1977
 
 
1978
 
 
1979
void Query_arena::set_query_arena(Query_arena *set)
 
1980
{
 
1981
  mem_root= set->mem_root;
 
1982
  free_list= set->free_list;
 
1983
  state= set->state;
 
1984
}
 
1985
 
 
1986
 
 
1987
void Query_arena::cleanup_stmt()
 
1988
{
 
1989
  assert("not implemented");
 
1990
}
 
1991
 
1526
1992
/*
1527
 
  Don't free mem_root, as mem_root is freed in the end of dispatch_command
1528
 
  (once for any command).
 
1993
  Statement functions
1529
1994
*/
1530
 
void Session::end_statement()
 
1995
 
 
1996
Statement::Statement(LEX *lex_arg, MEM_ROOT *mem_root_arg,
 
1997
                     enum enum_state state_arg, ulong id_arg)
 
1998
  :Query_arena(mem_root_arg, state_arg),
 
1999
  id(id_arg),
 
2000
  mark_used_columns(MARK_COLUMNS_READ),
 
2001
  lex(lex_arg),
 
2002
  query(0),
 
2003
  query_length(0),
 
2004
  db(NULL),
 
2005
  db_length(0)
 
2006
{
 
2007
  name.str= NULL;
 
2008
}
 
2009
 
 
2010
 
 
2011
void Statement::set_statement(Statement *stmt)
 
2012
{
 
2013
  id=             stmt->id;
 
2014
  mark_used_columns=   stmt->mark_used_columns;
 
2015
  lex=            stmt->lex;
 
2016
  query=          stmt->query;
 
2017
  query_length=   stmt->query_length;
 
2018
}
 
2019
 
 
2020
 
 
2021
void
 
2022
Statement::set_n_backup_statement(Statement *stmt, Statement *backup)
 
2023
{
 
2024
  backup->set_statement(this);
 
2025
  set_statement(stmt);
 
2026
  return;
 
2027
}
 
2028
 
 
2029
 
 
2030
void Statement::restore_backup_statement(Statement *stmt, Statement *backup)
 
2031
{
 
2032
  stmt->set_statement(this);
 
2033
  set_statement(backup);
 
2034
  return;
 
2035
}
 
2036
 
 
2037
 
 
2038
void THD::end_statement()
1531
2039
{
1532
2040
  /* Cleanup SQL processing state to reuse this statement in next query. */
1533
 
  lex->end();
1534
 
  query_cache_key= ""; // reset the cache key
1535
 
  resetResultsetMessage();
1536
 
}
1537
 
 
1538
 
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
1539
 
{
1540
 
  assert(_schema);
1541
 
  if (_schema and _schema->empty())
1542
 
  {
1543
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1544
 
    return true;
1545
 
  }
1546
 
  else if (not _schema)
1547
 
  {
1548
 
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1549
 
    return true;
1550
 
  }
1551
 
  assert(_schema);
1552
 
 
1553
 
  *p_db= strmake(_schema->c_str(), _schema->size());
1554
 
  *p_db_length= _schema->size();
1555
 
 
 
2041
  lex_end(lex);
 
2042
  delete lex->result;
 
2043
  lex->result= 0;
 
2044
  /* Note that free_list is freed in cleanup_after_query() */
 
2045
 
 
2046
  /*
 
2047
    Don't free mem_root, as mem_root is freed in the end of dispatch_command
 
2048
    (once for any command).
 
2049
  */
 
2050
}
 
2051
 
 
2052
 
 
2053
void THD::set_n_backup_active_arena(Query_arena *set, Query_arena *backup)
 
2054
{
 
2055
  assert(backup->is_backup_arena == false);
 
2056
 
 
2057
  backup->set_query_arena(this);
 
2058
  set_query_arena(set);
 
2059
  backup->is_backup_arena= true;
 
2060
  return;
 
2061
}
 
2062
 
 
2063
 
 
2064
void THD::restore_active_arena(Query_arena *set, Query_arena *backup)
 
2065
{
 
2066
  assert(backup->is_backup_arena);
 
2067
  set->set_query_arena(this);
 
2068
  set_query_arena(backup);
 
2069
  backup->is_backup_arena= false;
 
2070
  return;
 
2071
}
 
2072
 
 
2073
 
 
2074
bool THD::copy_db_to(char **p_db, size_t *p_db_length)
 
2075
{
 
2076
  if (db == NULL)
 
2077
  {
 
2078
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
 
2079
    return true;
 
2080
  }
 
2081
  *p_db= strmake(db, db_length);
 
2082
  *p_db_length= db_length;
1556
2083
  return false;
1557
2084
}
1558
2085
 
 
2086
 
 
2087
bool select_dumpvar::send_data(List<Item> &items)
 
2088
{
 
2089
  List_iterator_fast<my_var> var_li(var_list);
 
2090
  List_iterator<Item> it(items);
 
2091
  Item *item;
 
2092
  my_var *mv;
 
2093
 
 
2094
  if (unit->offset_limit_cnt)
 
2095
  {                                             // using limit offset,count
 
2096
    unit->offset_limit_cnt--;
 
2097
    return(0);
 
2098
  }
 
2099
  if (row_count++) 
 
2100
  {
 
2101
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
 
2102
    return(1);
 
2103
  }
 
2104
  while ((mv= var_li++) && (item= it++))
 
2105
  {
 
2106
    if (mv->local == 0)
 
2107
    {
 
2108
      Item_func_set_user_var *suv= new Item_func_set_user_var(mv->s, item);
 
2109
      suv->fix_fields(thd, 0);
 
2110
      suv->check(0);
 
2111
      suv->update();
 
2112
    }
 
2113
  }
 
2114
  return(thd->is_error());
 
2115
}
 
2116
 
 
2117
bool select_dumpvar::send_eof()
 
2118
{
 
2119
  if (! row_count)
 
2120
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
2121
                 ER_SP_FETCH_NO_DATA, ER(ER_SP_FETCH_NO_DATA));
 
2122
  /*
 
2123
    In order to remember the value of affected rows for ROW_COUNT()
 
2124
    function, SELECT INTO has to have an own SQLCOM.
 
2125
    TODO: split from SQLCOM_SELECT
 
2126
  */
 
2127
  ::my_ok(thd,row_count);
 
2128
  return 0;
 
2129
}
 
2130
 
1559
2131
/****************************************************************************
1560
 
  Tmp_Table_Param
 
2132
  TMP_TABLE_PARAM
1561
2133
****************************************************************************/
1562
2134
 
1563
 
void Tmp_Table_Param::init()
 
2135
void TMP_TABLE_PARAM::init()
1564
2136
{
1565
2137
  field_count= sum_func_count= func_count= hidden_field_count= 0;
1566
2138
  group_parts= group_length= group_null_parts= 0;
1567
2139
  quick_group= 1;
1568
2140
  table_charset= 0;
1569
2141
  precomputed_group_by= 0;
 
2142
  bit_fields_as_long= 0;
 
2143
  return;
1570
2144
}
1571
2145
 
1572
 
void Tmp_Table_Param::cleanup(void)
 
2146
 
 
2147
void thd_increment_bytes_sent(ulong length)
1573
2148
{
1574
 
  /* Fix for Intel compiler */
1575
 
  if (copy_field)
1576
 
  {
1577
 
    boost::checked_array_delete(copy_field);
1578
 
    save_copy_field= save_copy_field_end= copy_field= copy_field_end= 0;
 
2149
  THD *thd=current_thd;
 
2150
  if (likely(thd != 0))
 
2151
  { /* current_thd==0 when close_connection() calls net_send_error() */
 
2152
    thd->status_var.bytes_sent+= length;
1579
2153
  }
1580
2154
}
1581
2155
 
1582
 
void Session::send_kill_message() const
1583
 
{
1584
 
  drizzled::error_t err= static_cast<drizzled::error_t>(killed_errno());
1585
 
  if (err != EE_OK)
 
2156
 
 
2157
void thd_increment_bytes_received(ulong length)
 
2158
{
 
2159
  current_thd->status_var.bytes_received+= length;
 
2160
}
 
2161
 
 
2162
 
 
2163
void thd_increment_net_big_packet_count(ulong length)
 
2164
{
 
2165
  current_thd->status_var.net_big_packet_count+= length;
 
2166
}
 
2167
 
 
2168
void THD::send_kill_message() const
 
2169
{
 
2170
  int err= killed_errno();
 
2171
  if (err)
1586
2172
    my_message(err, ER(err), MYF(0));
1587
2173
}
1588
2174
 
1589
 
void Session::set_status_var_init()
 
2175
void THD::set_status_var_init()
1590
2176
{
1591
2177
  memset(&status_var, 0, sizeof(status_var));
1592
2178
}
1593
2179
 
1594
2180
 
1595
 
void Session::set_db(const std::string &new_db)
1596
 
{
1597
 
  /* Do not reallocate memory if current chunk is big enough. */
1598
 
  if (new_db.length())
1599
 
  {
1600
 
    _schema.reset(new std::string(new_db));
1601
 
  }
1602
 
  else
1603
 
  {
1604
 
    _schema.reset(new std::string(""));
1605
 
  }
1606
 
}
 
2181
void Security_context::init()
 
2182
{
 
2183
  user= ip= 0;
 
2184
}
 
2185
 
 
2186
 
 
2187
void Security_context::destroy()
 
2188
{
 
2189
  // If not pointer to constant
 
2190
  safeFree(user);
 
2191
  safeFree(ip);
 
2192
}
 
2193
 
 
2194
 
 
2195
void Security_context::skip_grants()
 
2196
{
 
2197
  /* privileges for the user are unknown everything is allowed */
 
2198
}
 
2199
 
 
2200
 
 
2201
/****************************************************************************
 
2202
  Handling of open and locked tables states.
 
2203
 
 
2204
  This is used when we want to open/lock (and then close) some tables when
 
2205
  we already have a set of tables open and locked. We use these methods for
 
2206
  access to mysql.proc table to find definitions of stored routines.
 
2207
****************************************************************************/
 
2208
 
 
2209
void THD::reset_n_backup_open_tables_state(Open_tables_state *backup)
 
2210
{
 
2211
  backup->set_open_tables_state(this);
 
2212
  reset_open_tables_state();
 
2213
  state_flags|= Open_tables_state::BACKUPS_AVAIL;
 
2214
  return;
 
2215
}
 
2216
 
 
2217
 
 
2218
void THD::restore_backup_open_tables_state(Open_tables_state *backup)
 
2219
{
 
2220
  /*
 
2221
    Before we will throw away current open tables state we want
 
2222
    to be sure that it was properly cleaned up.
 
2223
  */
 
2224
  assert(open_tables == 0 && temporary_tables == 0 &&
 
2225
              handler_tables == 0 && derived_tables == 0 &&
 
2226
              lock == 0 && locked_tables == 0);
 
2227
  set_open_tables_state(backup);
 
2228
  return;
 
2229
}
 
2230
 
 
2231
/**
 
2232
  Check the killed state of a user thread
 
2233
  @param thd  user thread
 
2234
  @retval 0 the user thread is active
 
2235
  @retval 1 the user thread has been killed
 
2236
*/
 
2237
extern "C" int thd_killed(const DRIZZLE_THD thd)
 
2238
{
 
2239
  return(thd->killed);
 
2240
}
 
2241
 
 
2242
/**
 
2243
  Return the thread id of a user thread
 
2244
  @param thd user thread
 
2245
  @return thread id
 
2246
*/
 
2247
extern "C" unsigned long thd_get_thread_id(const DRIZZLE_THD thd)
 
2248
{
 
2249
  return((unsigned long)thd->thread_id);
 
2250
}
 
2251
 
 
2252
 
 
2253
#ifdef INNODB_COMPATIBILITY_HOOKS
 
2254
extern "C" const struct charset_info_st *thd_charset(DRIZZLE_THD thd)
 
2255
{
 
2256
  return(thd->charset());
 
2257
}
 
2258
 
 
2259
extern "C" char **thd_query(DRIZZLE_THD thd)
 
2260
{
 
2261
  return(&thd->query);
 
2262
}
 
2263
 
 
2264
extern "C" int thd_slave_thread(const DRIZZLE_THD thd)
 
2265
{
 
2266
  return(thd->slave_thread);
 
2267
}
 
2268
 
 
2269
extern "C" int thd_non_transactional_update(const DRIZZLE_THD thd)
 
2270
{
 
2271
  return(thd->transaction.all.modified_non_trans_table);
 
2272
}
 
2273
 
 
2274
extern "C" int thd_binlog_format(const DRIZZLE_THD thd)
 
2275
{
 
2276
  return (int) thd->variables.binlog_format;
 
2277
}
 
2278
 
 
2279
extern "C" void thd_mark_transaction_to_rollback(DRIZZLE_THD thd, bool all)
 
2280
{
 
2281
  mark_transaction_to_rollback(thd, all);
 
2282
}
 
2283
#endif // INNODB_COMPATIBILITY_HOOKS */
1607
2284
 
1608
2285
 
1609
2286
/**
1610
2287
  Mark transaction to rollback and mark error as fatal to a sub-statement.
1611
2288
 
1612
 
  @param  session   Thread handle
 
2289
  @param  thd   Thread handle
1613
2290
  @param  all   true <=> rollback main transaction.
1614
2291
*/
1615
 
void Session::markTransactionForRollback(bool all)
1616
 
{
1617
 
  is_fatal_sub_stmt_error= true;
1618
 
  transaction_rollback_request= all;
1619
 
}
1620
 
 
1621
 
void Session::disconnect(enum error_t errcode)
1622
 
{
1623
 
  /* Allow any plugins to cleanup their session variables */
1624
 
  plugin_sessionvar_cleanup(this);
1625
 
 
1626
 
  /* If necessary, log any aborted or unauthorized connections */
1627
 
  if (getKilled() || client->wasAborted())
1628
 
  {
1629
 
    status_var.aborted_threads++;
1630
 
  }
1631
 
 
1632
 
  if (client->wasAborted())
1633
 
  {
1634
 
    if (not getKilled() && variables.log_warnings > 1)
1635
 
    {
1636
 
      errmsg_printf(error::WARN, ER(ER_NEW_ABORTING_CONNECTION)
1637
 
                  , thread_id
1638
 
                  , (_schema->empty() ? "unconnected" : _schema->c_str())
1639
 
                  , security_ctx->username().empty() == false ? security_ctx->username().c_str() : "unauthenticated"
1640
 
                  , security_ctx->address().c_str()
1641
 
                  , (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1642
 
    }
1643
 
  }
1644
 
 
1645
 
  setKilled(Session::KILL_CONNECTION);
1646
 
 
1647
 
  if (client->isConnected())
1648
 
  {
1649
 
    if (errcode != EE_OK)
1650
 
    {
1651
 
      /*my_error(errcode, ER(errcode));*/
1652
 
      client->sendError(errcode, ER(errcode));
1653
 
    }
1654
 
    client->close();
1655
 
  }
1656
 
}
1657
 
 
1658
 
void Session::reset_for_next_command()
1659
 
{
1660
 
  free_list= 0;
1661
 
  select_number= 1;
1662
 
  /*
1663
 
    Those two lines below are theoretically unneeded as
1664
 
    Session::cleanup_after_query() should take care of this already.
1665
 
  */
1666
 
  auto_inc_intervals_in_cur_stmt_for_binlog.empty();
1667
 
 
1668
 
  is_fatal_error= false;
1669
 
  server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1670
 
                          SERVER_QUERY_NO_INDEX_USED |
1671
 
                          SERVER_QUERY_NO_GOOD_INDEX_USED);
1672
 
 
1673
 
  clear_error();
1674
 
  main_da.reset_diagnostics_area();
1675
 
  total_warn_count=0;                   // Warnings for this query
1676
 
  sent_row_count= examined_row_count= 0;
1677
 
}
1678
 
 
1679
 
/*
1680
 
  Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1681
 
*/
1682
 
 
1683
 
void Open_tables_state::close_temporary_tables()
1684
 
{
1685
 
  Table *table;
1686
 
  Table *tmp_next;
1687
 
 
1688
 
  if (not temporary_tables)
1689
 
    return;
1690
 
 
1691
 
  for (table= temporary_tables; table; table= tmp_next)
1692
 
  {
1693
 
    tmp_next= table->getNext();
1694
 
    nukeTable(table);
1695
 
  }
1696
 
  temporary_tables= NULL;
1697
 
}
1698
 
 
1699
 
/*
1700
 
  unlink from session->temporary tables and close temporary table
1701
 
*/
1702
 
 
1703
 
void Open_tables_state::close_temporary_table(Table *table)
1704
 
{
1705
 
  if (table->getPrev())
1706
 
  {
1707
 
    table->getPrev()->setNext(table->getNext());
1708
 
    if (table->getPrev()->getNext())
1709
 
    {
1710
 
      table->getNext()->setPrev(table->getPrev());
1711
 
    }
1712
 
  }
1713
 
  else
1714
 
  {
1715
 
    /* removing the item from the list */
1716
 
    assert(table == temporary_tables);
1717
 
    /*
1718
 
      slave must reset its temporary list pointer to zero to exclude
1719
 
      passing non-zero value to end_slave via rli->save_temporary_tables
1720
 
      when no temp tables opened, see an invariant below.
1721
 
    */
1722
 
    temporary_tables= table->getNext();
1723
 
    if (temporary_tables)
1724
 
    {
1725
 
      table->getNext()->setPrev(NULL);
1726
 
    }
1727
 
  }
1728
 
  nukeTable(table);
1729
 
}
1730
 
 
1731
 
/*
1732
 
  Close and drop a temporary table
1733
 
 
1734
 
  NOTE
1735
 
  This dosn't unlink table from session->temporary
1736
 
  If this is needed, use close_temporary_table()
1737
 
*/
1738
 
 
1739
 
void Open_tables_state::nukeTable(Table *table)
1740
 
{
1741
 
  plugin::StorageEngine *table_type= table->getShare()->db_type();
1742
 
 
1743
 
  table->free_io_cache();
1744
 
  table->delete_table();
1745
 
 
1746
 
  identifier::Table identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1747
 
  rm_temporary_table(table_type, identifier);
1748
 
 
1749
 
  boost::checked_delete(table->getMutableShare());
1750
 
 
1751
 
  boost::checked_delete(table);
1752
 
}
1753
 
 
1754
 
/** Clear most status variables. */
1755
 
extern time_t flush_status_time;
1756
 
 
1757
 
void Session::refresh_status()
1758
 
{
1759
 
  /* Reset thread's status variables */
1760
 
  memset(&status_var, 0, sizeof(status_var));
1761
 
 
1762
 
  flush_status_time= time((time_t*) 0);
1763
 
  current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1764
 
  current_global_counters.connections= 0;
1765
 
}
1766
 
 
1767
 
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1768
 
{
1769
 
  return getVariable(std::string(name.str, name.length), create_if_not_exists);
1770
 
}
1771
 
 
1772
 
user_var_entry *Session::getVariable(const std::string  &name, bool create_if_not_exists)
1773
 
{
1774
 
  if (cleanup_done)
1775
 
    return NULL;
1776
 
 
1777
 
  UserVars::iterator iter= user_vars.find(name);
1778
 
  if (iter != user_vars.end())
1779
 
    return (*iter).second;
1780
 
 
1781
 
  if (not create_if_not_exists)
1782
 
    return NULL;
1783
 
 
1784
 
  user_var_entry *entry= NULL;
1785
 
  entry= new (nothrow) user_var_entry(name.c_str(), query_id);
1786
 
 
1787
 
  if (entry == NULL)
1788
 
    return NULL;
1789
 
 
1790
 
  std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1791
 
 
1792
 
  if (not returnable.second)
1793
 
  {
1794
 
    boost::checked_delete(entry);
1795
 
  }
1796
 
 
1797
 
  return entry;
1798
 
}
1799
 
 
1800
 
void Session::setVariable(const std::string &name, const std::string &value)
1801
 
{
1802
 
  user_var_entry *updateable_var= getVariable(name.c_str(), true);
1803
 
  if (updateable_var)
1804
 
  {
1805
 
    updateable_var->update_hash(false,
1806
 
                                (void*)value.c_str(),
1807
 
                                static_cast<uint32_t>(value.length()), STRING_RESULT,
1808
 
                                &my_charset_bin,
1809
 
                                DERIVATION_IMPLICIT, false);
1810
 
  }
1811
 
}
1812
 
 
1813
 
void Open_tables_state::mark_temp_tables_as_free_for_reuse()
1814
 
{
1815
 
  for (Table *table= temporary_tables ; table ; table= table->getNext())
1816
 
  {
1817
 
    if (table->query_id == getQueryId())
1818
 
    {
1819
 
      table->query_id= 0;
1820
 
      table->cursor->ha_reset();
1821
 
    }
1822
 
  }
1823
 
}
1824
 
 
1825
 
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1826
 
{
1827
 
  for (; table ; table= table->getNext())
1828
 
  {
1829
 
    if (table->query_id == getQueryId())
1830
 
    {
1831
 
      table->query_id= 0;
1832
 
      table->cursor->ha_reset();
1833
 
    }
1834
 
  }
1835
 
}
1836
 
 
1837
 
/*
1838
 
  Unlocks tables and frees derived tables.
1839
 
  Put all normal tables used by thread in free list.
1840
 
 
1841
 
  It will only close/mark as free for reuse tables opened by this
1842
 
  substatement, it will also check if we are closing tables after
1843
 
  execution of complete query (i.e. we are on upper level) and will
1844
 
  leave prelocked mode if needed.
1845
 
*/
1846
 
void Session::close_thread_tables()
1847
 
{
1848
 
  clearDerivedTables();
1849
 
 
1850
 
  /*
1851
 
    Mark all temporary tables used by this statement as free for reuse.
1852
 
  */
1853
 
  mark_temp_tables_as_free_for_reuse();
1854
 
  /*
1855
 
    Let us commit transaction for statement. Since in 5.0 we only have
1856
 
    one statement transaction and don't allow several nested statement
1857
 
    transactions this call will do nothing if we are inside of stored
1858
 
    function or trigger (i.e. statement transaction is already active and
1859
 
    does not belong to statement for which we do close_thread_tables()).
1860
 
    TODO: This should be fixed in later releases.
1861
 
   */
1862
 
  {
1863
 
    TransactionServices &transaction_services= TransactionServices::singleton();
1864
 
    main_da.can_overwrite_status= true;
1865
 
    transaction_services.autocommitOrRollback(*this, is_error());
1866
 
    main_da.can_overwrite_status= false;
1867
 
    transaction.stmt.reset();
1868
 
  }
1869
 
 
1870
 
  if (lock)
1871
 
  {
1872
 
    /*
1873
 
      For RBR we flush the pending event just before we unlock all the
1874
 
      tables.  This means that we are at the end of a topmost
1875
 
      statement, so we ensure that the STMT_END_F flag is set on the
1876
 
      pending event.  For statements that are *inside* stored
1877
 
      functions, the pending event will not be flushed: that will be
1878
 
      handled either before writing a query log event (inside
1879
 
      binlog_query()) or when preparing a pending event.
1880
 
     */
1881
 
    unlockTables(lock);
1882
 
    lock= 0;
1883
 
  }
1884
 
  /*
1885
 
    Note that we need to hold table::Cache::singleton().mutex() while changing the
1886
 
    open_tables list. Another thread may work on it.
1887
 
    (See: table::Cache::singleton().removeTable(), wait_completed_table())
1888
 
    Closing a MERGE child before the parent would be fatal if the
1889
 
    other thread tries to abort the MERGE lock in between.
1890
 
  */
1891
 
  if (open_tables)
1892
 
    close_open_tables();
1893
 
}
1894
 
 
1895
 
void Session::close_tables_for_reopen(TableList **tables)
1896
 
{
1897
 
  /*
1898
 
    If table list consists only from tables from prelocking set, table list
1899
 
    for new attempt should be empty, so we have to update list's root pointer.
1900
 
  */
1901
 
  if (lex->first_not_own_table() == *tables)
1902
 
    *tables= 0;
1903
 
  lex->chop_off_not_own_tables();
1904
 
  for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1905
 
    tmp->table= 0;
1906
 
  close_thread_tables();
1907
 
}
1908
 
 
1909
 
bool Session::openTablesLock(TableList *tables)
1910
 
{
1911
 
  uint32_t counter;
1912
 
  bool need_reopen;
1913
 
 
1914
 
  for ( ; ; )
1915
 
  {
1916
 
    if (open_tables_from_list(&tables, &counter))
1917
 
      return true;
1918
 
 
1919
 
    if (not lock_tables(tables, counter, &need_reopen))
1920
 
      break;
1921
 
 
1922
 
    if (not need_reopen)
1923
 
      return true;
1924
 
 
1925
 
    close_tables_for_reopen(&tables);
1926
 
  }
1927
 
 
1928
 
  if ((handle_derived(lex, &derived_prepare) || (handle_derived(lex, &derived_filling))))
1929
 
    return true;
1930
 
 
1931
 
  return false;
1932
 
}
1933
 
 
1934
 
/*
1935
 
  @note "best_effort" is used in cases were if a failure occurred on this
1936
 
  operation it would not be surprising because we are only removing because there
1937
 
  might be an issue (lame engines).
1938
 
*/
1939
 
 
1940
 
bool Open_tables_state::rm_temporary_table(const identifier::Table &identifier, bool best_effort)
1941
 
{
1942
 
  if (not plugin::StorageEngine::dropTable(*static_cast<Session *>(this), identifier))
1943
 
  {
1944
 
    if (not best_effort)
1945
 
    {
1946
 
      std::string path;
1947
 
      identifier.getSQLPath(path);
1948
 
      errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"),
1949
 
                    path.c_str(), errno);
1950
 
    }
1951
 
 
1952
 
    return true;
1953
 
  }
1954
 
 
1955
 
  return false;
1956
 
}
1957
 
 
1958
 
bool Open_tables_state::rm_temporary_table(plugin::StorageEngine *base, const identifier::Table &identifier)
1959
 
{
1960
 
  drizzled::error_t error;
1961
 
  assert(base);
1962
 
 
1963
 
  if (not plugin::StorageEngine::dropTable(*static_cast<Session *>(this), *base, identifier, error))
1964
 
  {
1965
 
    std::string path;
1966
 
    identifier.getSQLPath(path);
1967
 
    errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"),
1968
 
                  path.c_str(), error);
1969
 
 
1970
 
    return true;
1971
 
  }
1972
 
 
1973
 
  return false;
1974
 
}
1975
 
 
1976
 
/**
1977
 
  @note this will be removed, I am looking through Hudson to see if it is finding
1978
 
  any tables that are missed during cleanup.
1979
 
*/
1980
 
void Open_tables_state::dumpTemporaryTableNames(const char *foo)
1981
 
{
1982
 
  Table *table;
1983
 
 
1984
 
  if (not temporary_tables)
1985
 
    return;
1986
 
 
1987
 
  cerr << "Begin Run: " << foo << "\n";
1988
 
  for (table= temporary_tables; table; table= table->getNext())
1989
 
  {
1990
 
    bool have_proto= false;
1991
 
 
1992
 
    message::Table *proto= table->getShare()->getTableMessage();
1993
 
    if (table->getShare()->getTableMessage())
1994
 
      have_proto= true;
1995
 
 
1996
 
    const char *answer= have_proto ? "true" : "false";
1997
 
 
1998
 
    if (have_proto)
1999
 
    {
2000
 
      cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2001
 
      cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2002
 
    }
2003
 
    else
2004
 
    {
2005
 
      cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
2006
 
    }
2007
 
  }
2008
 
}
2009
 
 
2010
 
table::Singular *Session::getInstanceTable()
2011
 
{
2012
 
  temporary_shares.push_back(new table::Singular()); // This will not go into the tableshare cache, so no key is used.
2013
 
 
2014
 
  table::Singular *tmp_share= temporary_shares.back();
2015
 
 
2016
 
  assert(tmp_share);
2017
 
 
2018
 
  return tmp_share;
2019
 
}
2020
 
 
2021
 
 
2022
 
/**
2023
 
  Create a reduced Table object with properly set up Field list from a
2024
 
  list of field definitions.
2025
 
 
2026
 
    The created table doesn't have a table Cursor associated with
2027
 
    it, has no keys, no group/distinct, no copy_funcs array.
2028
 
    The sole purpose of this Table object is to use the power of Field
2029
 
    class to read/write data to/from table->getInsertRecord(). Then one can store
2030
 
    the record in any container (RB tree, hash, etc).
2031
 
    The table is created in Session mem_root, so are the table's fields.
2032
 
    Consequently, if you don't BLOB fields, you don't need to free it.
2033
 
 
2034
 
  @param session         connection handle
2035
 
  @param field_list  list of column definitions
2036
 
 
2037
 
  @return
2038
 
    0 if out of memory, Table object in case of success
2039
 
*/
2040
 
table::Singular *Session::getInstanceTable(List<CreateField> &field_list)
2041
 
{
2042
 
  temporary_shares.push_back(new table::Singular(this, field_list)); // This will not go into the tableshare cache, so no key is used.
2043
 
 
2044
 
  table::Singular *tmp_share= temporary_shares.back();
2045
 
 
2046
 
  assert(tmp_share);
2047
 
 
2048
 
  return tmp_share;
2049
 
}
2050
 
 
2051
 
namespace display  {
2052
 
 
2053
 
static const std::string NONE= "NONE";
2054
 
static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
2055
 
static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
2056
 
 
2057
 
const std::string &type(drizzled::Session::global_read_lock_t type)
2058
 
{
2059
 
  switch (type) {
2060
 
    default:
2061
 
    case Session::NONE:
2062
 
      return NONE;
2063
 
    case Session::GOT_GLOBAL_READ_LOCK:
2064
 
      return GOT_GLOBAL_READ_LOCK;
2065
 
    case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
2066
 
      return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
2067
 
  }
2068
 
}
2069
 
 
2070
 
size_t max_string_length(drizzled::Session::global_read_lock_t)
2071
 
{
2072
 
  return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT.size();
2073
 
}
2074
 
 
2075
 
} /* namespace display */
2076
 
 
2077
 
} /* namespace drizzled */
 
2292
 
 
2293
void mark_transaction_to_rollback(THD *thd, bool all)
 
2294
{
 
2295
  if (thd)
 
2296
  {
 
2297
    thd->is_fatal_sub_stmt_error= true;
 
2298
    thd->transaction_rollback_request= all;
 
2299
  }
 
2300
}
 
2301
/***************************************************************************
 
2302
  Handling of XA id cacheing
 
2303
***************************************************************************/
 
2304
 
 
2305
pthread_mutex_t LOCK_xid_cache;
 
2306
HASH xid_cache;
 
2307
 
 
2308
extern "C" uchar *xid_get_hash_key(const uchar *, size_t *, bool);
 
2309
extern "C" void xid_free_hash(void *);
 
2310
 
 
2311
uchar *xid_get_hash_key(const uchar *ptr, size_t *length,
 
2312
                        bool not_used __attribute__((unused)))
 
2313
{
 
2314
  *length=((XID_STATE*)ptr)->xid.key_length();
 
2315
  return ((XID_STATE*)ptr)->xid.key();
 
2316
}
 
2317
 
 
2318
void xid_free_hash(void *ptr)
 
2319
{
 
2320
  if (!((XID_STATE*)ptr)->in_thd)
 
2321
    my_free((uchar*)ptr, MYF(0));
 
2322
}
 
2323
 
 
2324
bool xid_cache_init()
 
2325
{
 
2326
  pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST);
 
2327
  return hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
 
2328
                   xid_get_hash_key, xid_free_hash, 0) != 0;
 
2329
}
 
2330
 
 
2331
void xid_cache_free()
 
2332
{
 
2333
  if (hash_inited(&xid_cache))
 
2334
  {
 
2335
    hash_free(&xid_cache);
 
2336
    pthread_mutex_destroy(&LOCK_xid_cache);
 
2337
  }
 
2338
}
 
2339
 
 
2340
XID_STATE *xid_cache_search(XID *xid)
 
2341
{
 
2342
  pthread_mutex_lock(&LOCK_xid_cache);
 
2343
  XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, xid->key(), xid->key_length());
 
2344
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2345
  return res;
 
2346
}
 
2347
 
 
2348
 
 
2349
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
 
2350
{
 
2351
  XID_STATE *xs;
 
2352
  bool res;
 
2353
  pthread_mutex_lock(&LOCK_xid_cache);
 
2354
  if (hash_search(&xid_cache, xid->key(), xid->key_length()))
 
2355
    res=0;
 
2356
  else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
 
2357
    res=1;
 
2358
  else
 
2359
  {
 
2360
    xs->xa_state=xa_state;
 
2361
    xs->xid.set(xid);
 
2362
    xs->in_thd=0;
 
2363
    res=my_hash_insert(&xid_cache, (uchar*)xs);
 
2364
  }
 
2365
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2366
  return res;
 
2367
}
 
2368
 
 
2369
 
 
2370
bool xid_cache_insert(XID_STATE *xid_state)
 
2371
{
 
2372
  pthread_mutex_lock(&LOCK_xid_cache);
 
2373
  assert(hash_search(&xid_cache, xid_state->xid.key(),
 
2374
                          xid_state->xid.key_length())==0);
 
2375
  bool res=my_hash_insert(&xid_cache, (uchar*)xid_state);
 
2376
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2377
  return res;
 
2378
}
 
2379
 
 
2380
 
 
2381
void xid_cache_delete(XID_STATE *xid_state)
 
2382
{
 
2383
  pthread_mutex_lock(&LOCK_xid_cache);
 
2384
  hash_delete(&xid_cache, (uchar *)xid_state);
 
2385
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2386
}
 
2387
 
 
2388
/*
 
2389
  Implementation of interface to write rows to the binary log through the
 
2390
  thread.  The thread is responsible for writing the rows it has
 
2391
  inserted/updated/deleted.
 
2392
*/
 
2393
 
 
2394
#ifndef DRIZZLE_CLIENT
 
2395
 
 
2396
/*
 
2397
  Template member function for ensuring that there is an rows log
 
2398
  event of the apropriate type before proceeding.
 
2399
 
 
2400
  PRE CONDITION:
 
2401
    - Events of type 'RowEventT' have the type code 'type_code'.
 
2402
    
 
2403
  POST CONDITION:
 
2404
    If a non-NULL pointer is returned, the pending event for thread 'thd' will
 
2405
    be an event of type 'RowEventT' (which have the type code 'type_code')
 
2406
    will either empty or have enough space to hold 'needed' bytes.  In
 
2407
    addition, the columns bitmap will be correct for the row, meaning that
 
2408
    the pending event will be flushed if the columns in the event differ from
 
2409
    the columns suppled to the function.
 
2410
 
 
2411
  RETURNS
 
2412
    If no error, a non-NULL pending event (either one which already existed or
 
2413
    the newly created one).
 
2414
    If error, NULL.
 
2415
 */
 
2416
 
 
2417
template <class RowsEventT> Rows_log_event* 
 
2418
THD::binlog_prepare_pending_rows_event(Table* table, uint32_t serv_id,
 
2419
                                       size_t needed,
 
2420
                                       bool is_transactional,
 
2421
                                       RowsEventT *hint __attribute__((unused)))
 
2422
{
 
2423
  /* Pre-conditions */
 
2424
  assert(table->s->table_map_id != UINT32_MAX);
 
2425
 
 
2426
  /* Fetch the type code for the RowsEventT template parameter */
 
2427
  int const type_code= RowsEventT::TYPE_CODE;
 
2428
 
 
2429
  /*
 
2430
    There is no good place to set up the transactional data, so we
 
2431
    have to do it here.
 
2432
  */
 
2433
  if (binlog_setup_trx_data())
 
2434
    return(NULL);
 
2435
 
 
2436
  Rows_log_event* pending= binlog_get_pending_rows_event();
 
2437
 
 
2438
  if (unlikely(pending && !pending->is_valid()))
 
2439
    return(NULL);
 
2440
 
 
2441
  /*
 
2442
    Check if the current event is non-NULL and a write-rows
 
2443
    event. Also check if the table provided is mapped: if it is not,
 
2444
    then we have switched to writing to a new table.
 
2445
    If there is no pending event, we need to create one. If there is a pending
 
2446
    event, but it's not about the same table id, or not of the same type
 
2447
    (between Write, Update and Delete), or not the same affected columns, or
 
2448
    going to be too big, flush this event to disk and create a new pending
 
2449
    event.
 
2450
 
 
2451
    The last test is necessary for the Cluster injector to work
 
2452
    correctly. The reason is that the Cluster can inject two write
 
2453
    rows with different column bitmaps if there is an insert followed
 
2454
    by an update in the same transaction, and these are grouped into a
 
2455
    single epoch/transaction when fed to the injector.
 
2456
 
 
2457
    TODO: Fix the code so that the last test can be removed.
 
2458
  */
 
2459
  if (!pending ||
 
2460
      pending->server_id != serv_id || 
 
2461
      pending->get_table_id() != table->s->table_map_id ||
 
2462
      pending->get_type_code() != type_code || 
 
2463
      pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
 
2464
      !bitmap_cmp(pending->get_cols(), table->write_set))
 
2465
    {
 
2466
    /* Create a new RowsEventT... */
 
2467
    Rows_log_event* const
 
2468
        ev= new RowsEventT(this, table, table->s->table_map_id,
 
2469
                           is_transactional);
 
2470
    if (unlikely(!ev))
 
2471
      return(NULL);
 
2472
    ev->server_id= serv_id; // I don't like this, it's too easy to forget.
 
2473
    /*
 
2474
      flush the pending event and replace it with the newly created
 
2475
      event...
 
2476
    */
 
2477
    if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev)))
 
2478
    {
 
2479
      delete ev;
 
2480
      return(NULL);
 
2481
    }
 
2482
 
 
2483
    return(ev);               /* This is the new pending event */
 
2484
  }
 
2485
  return(pending);        /* This is the current pending event */
 
2486
}
 
2487
 
 
2488
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
2489
/*
 
2490
  Instantiate the versions we need, we have -fno-implicit-template as
 
2491
  compiling option.
 
2492
*/
 
2493
template Rows_log_event*
 
2494
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2495
                                       Write_rows_log_event*);
 
2496
 
 
2497
template Rows_log_event*
 
2498
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2499
                                       Delete_rows_log_event *);
 
2500
 
 
2501
template Rows_log_event* 
 
2502
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2503
                                       Update_rows_log_event *);
 
2504
#endif
 
2505
 
 
2506
namespace {
 
2507
  /**
 
2508
     Class to handle temporary allocation of memory for row data.
 
2509
 
 
2510
     The responsibilities of the class is to provide memory for
 
2511
     packing one or two rows of packed data (depending on what
 
2512
     constructor is called).
 
2513
 
 
2514
     In order to make the allocation more efficient for "simple" rows,
 
2515
     i.e., rows that do not contain any blobs, a pointer to the
 
2516
     allocated memory is of memory is stored in the table structure
 
2517
     for simple rows.  If memory for a table containing a blob field
 
2518
     is requested, only memory for that is allocated, and subsequently
 
2519
     released when the object is destroyed.
 
2520
 
 
2521
   */
 
2522
  class Row_data_memory {
 
2523
  public:
 
2524
    /**
 
2525
      Build an object to keep track of a block-local piece of memory
 
2526
      for storing a row of data.
 
2527
 
 
2528
      @param table
 
2529
      Table where the pre-allocated memory is stored.
 
2530
 
 
2531
      @param length
 
2532
      Length of data that is needed, if the record contain blobs.
 
2533
     */
 
2534
    Row_data_memory(Table *table, size_t const len1)
 
2535
      : m_memory(0)
 
2536
    {
 
2537
      m_alloc_checked= false;
 
2538
      allocate_memory(table, len1);
 
2539
      m_ptr[0]= has_memory() ? m_memory : 0;
 
2540
      m_ptr[1]= 0;
 
2541
    }
 
2542
 
 
2543
    Row_data_memory(Table *table, size_t const len1, size_t const len2)
 
2544
      : m_memory(0)
 
2545
    {
 
2546
      m_alloc_checked= false;
 
2547
      allocate_memory(table, len1 + len2);
 
2548
      m_ptr[0]= has_memory() ? m_memory        : 0;
 
2549
      m_ptr[1]= has_memory() ? m_memory + len1 : 0;
 
2550
    }
 
2551
 
 
2552
    ~Row_data_memory()
 
2553
    {
 
2554
      if (m_memory != 0 && m_release_memory_on_destruction)
 
2555
        my_free((uchar*) m_memory, MYF(MY_WME));
 
2556
    }
 
2557
 
 
2558
    /**
 
2559
       Is there memory allocated?
 
2560
 
 
2561
       @retval true There is memory allocated
 
2562
       @retval false Memory allocation failed
 
2563
     */
 
2564
    bool has_memory() const {
 
2565
      m_alloc_checked= true;
 
2566
      return m_memory != 0;
 
2567
    }
 
2568
 
 
2569
    uchar *slot(uint s)
 
2570
    {
 
2571
      assert(s < sizeof(m_ptr)/sizeof(*m_ptr));
 
2572
      assert(m_ptr[s] != 0);
 
2573
      assert(m_alloc_checked == true);
 
2574
      return m_ptr[s];
 
2575
    }
 
2576
 
 
2577
  private:
 
2578
    void allocate_memory(Table *const table, size_t const total_length)
 
2579
    {
 
2580
      if (table->s->blob_fields == 0)
 
2581
      {
 
2582
        /*
 
2583
          The maximum length of a packed record is less than this
 
2584
          length. We use this value instead of the supplied length
 
2585
          when allocating memory for records, since we don't know how
 
2586
          the memory will be used in future allocations.
 
2587
 
 
2588
          Since table->s->reclength is for unpacked records, we have
 
2589
          to add two bytes for each field, which can potentially be
 
2590
          added to hold the length of a packed field.
 
2591
        */
 
2592
        size_t const maxlen= table->s->reclength + 2 * table->s->fields;
 
2593
 
 
2594
        /*
 
2595
          Allocate memory for two records if memory hasn't been
 
2596
          allocated. We allocate memory for two records so that it can
 
2597
          be used when processing update rows as well.
 
2598
        */
 
2599
        if (table->write_row_record == 0)
 
2600
          table->write_row_record=
 
2601
            (uchar *) alloc_root(&table->mem_root, 2 * maxlen);
 
2602
        m_memory= table->write_row_record;
 
2603
        m_release_memory_on_destruction= false;
 
2604
      }
 
2605
      else
 
2606
      {
 
2607
        m_memory= (uchar *) my_malloc(total_length, MYF(MY_WME));
 
2608
        m_release_memory_on_destruction= true;
 
2609
      }
 
2610
    }
 
2611
 
 
2612
    mutable bool m_alloc_checked;
 
2613
    bool m_release_memory_on_destruction;
 
2614
    uchar *m_memory;
 
2615
    uchar *m_ptr[2];
 
2616
  };
 
2617
}
 
2618
 
 
2619
 
 
2620
int THD::binlog_write_row(Table* table, bool is_trans, 
 
2621
                          uchar const *record) 
 
2622
 
2623
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2624
 
 
2625
  /*
 
2626
    Pack records into format for transfer. We are allocating more
 
2627
    memory than needed, but that doesn't matter.
 
2628
  */
 
2629
  Row_data_memory memory(table, table->max_row_length(record));
 
2630
  if (!memory.has_memory())
 
2631
    return HA_ERR_OUT_OF_MEM;
 
2632
 
 
2633
  uchar *row_data= memory.slot(0);
 
2634
 
 
2635
  size_t const len= pack_row(table, table->write_set, row_data, record);
 
2636
 
 
2637
  Rows_log_event* const ev=
 
2638
    binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
 
2639
                                      static_cast<Write_rows_log_event*>(0));
 
2640
 
 
2641
  if (unlikely(ev == 0))
 
2642
    return HA_ERR_OUT_OF_MEM;
 
2643
 
 
2644
  return ev->add_row_data(row_data, len);
 
2645
}
 
2646
 
 
2647
int THD::binlog_update_row(Table* table, bool is_trans,
 
2648
                           const uchar *before_record,
 
2649
                           const uchar *after_record)
 
2650
 
2651
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2652
 
 
2653
  size_t const before_maxlen = table->max_row_length(before_record);
 
2654
  size_t const after_maxlen  = table->max_row_length(after_record);
 
2655
 
 
2656
  Row_data_memory row_data(table, before_maxlen, after_maxlen);
 
2657
  if (!row_data.has_memory())
 
2658
    return HA_ERR_OUT_OF_MEM;
 
2659
 
 
2660
  uchar *before_row= row_data.slot(0);
 
2661
  uchar *after_row= row_data.slot(1);
 
2662
 
 
2663
  size_t const before_size= pack_row(table, table->read_set, before_row,
 
2664
                                        before_record);
 
2665
  size_t const after_size= pack_row(table, table->write_set, after_row,
 
2666
                                       after_record);
 
2667
 
 
2668
  Rows_log_event* const ev=
 
2669
    binlog_prepare_pending_rows_event(table, server_id,
 
2670
                                      before_size + after_size, is_trans,
 
2671
                                      static_cast<Update_rows_log_event*>(0));
 
2672
 
 
2673
  if (unlikely(ev == 0))
 
2674
    return HA_ERR_OUT_OF_MEM;
 
2675
 
 
2676
  return
 
2677
    ev->add_row_data(before_row, before_size) ||
 
2678
    ev->add_row_data(after_row, after_size);
 
2679
}
 
2680
 
 
2681
int THD::binlog_delete_row(Table* table, bool is_trans, 
 
2682
                           uchar const *record)
 
2683
 
2684
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2685
 
 
2686
  /* 
 
2687
     Pack records into format for transfer. We are allocating more
 
2688
     memory than needed, but that doesn't matter.
 
2689
  */
 
2690
  Row_data_memory memory(table, table->max_row_length(record));
 
2691
  if (unlikely(!memory.has_memory()))
 
2692
    return HA_ERR_OUT_OF_MEM;
 
2693
 
 
2694
  uchar *row_data= memory.slot(0);
 
2695
 
 
2696
  size_t const len= pack_row(table, table->read_set, row_data, record);
 
2697
 
 
2698
  Rows_log_event* const ev=
 
2699
    binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
 
2700
                                      static_cast<Delete_rows_log_event*>(0));
 
2701
 
 
2702
  if (unlikely(ev == 0))
 
2703
    return HA_ERR_OUT_OF_MEM;
 
2704
 
 
2705
  return ev->add_row_data(row_data, len);
 
2706
}
 
2707
 
 
2708
 
 
2709
int THD::binlog_flush_pending_rows_event(bool stmt_end)
 
2710
{
 
2711
  /*
 
2712
    We shall flush the pending event even if we are not in row-based
 
2713
    mode: it might be the case that we left row-based mode before
 
2714
    flushing anything (e.g., if we have explicitly locked tables).
 
2715
   */
 
2716
  if (!mysql_bin_log.is_open())
 
2717
    return(0);
 
2718
 
 
2719
  /*
 
2720
    Mark the event as the last event of a statement if the stmt_end
 
2721
    flag is set.
 
2722
  */
 
2723
  int error= 0;
 
2724
  if (Rows_log_event *pending= binlog_get_pending_rows_event())
 
2725
  {
 
2726
    if (stmt_end)
 
2727
    {
 
2728
      pending->set_flags(Rows_log_event::STMT_END_F);
 
2729
      pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 
2730
      binlog_table_maps= 0;
 
2731
    }
 
2732
 
 
2733
    error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
 
2734
  }
 
2735
 
 
2736
  return(error);
 
2737
}
 
2738
 
 
2739
 
 
2740
/*
 
2741
  Member function that will log query, either row-based or
 
2742
  statement-based depending on the value of the 'current_stmt_binlog_row_based'
 
2743
  the value of the 'qtype' flag.
 
2744
 
 
2745
  This function should be called after the all calls to ha_*_row()
 
2746
  functions have been issued, but before tables are unlocked and
 
2747
  closed.
 
2748
 
 
2749
  OBSERVE
 
2750
    There shall be no writes to any system table after calling
 
2751
    binlog_query(), so these writes has to be moved to before the call
 
2752
    of binlog_query() for correct functioning.
 
2753
 
 
2754
    This is necessesary not only for RBR, but the master might crash
 
2755
    after binlogging the query but before changing the system tables.
 
2756
    This means that the slave and the master are not in the same state
 
2757
    (after the master has restarted), so therefore we have to
 
2758
    eliminate this problem.
 
2759
 
 
2760
  RETURN VALUE
 
2761
    Error code, or 0 if no error.
 
2762
*/
 
2763
int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
 
2764
                      ulong query_len, bool is_trans, bool suppress_use,
 
2765
                      THD::killed_state killed_status_arg)
 
2766
{
 
2767
  assert(query_arg && mysql_bin_log.is_open());
 
2768
 
 
2769
  if (int error= binlog_flush_pending_rows_event(true))
 
2770
    return(error);
 
2771
 
 
2772
  /*
 
2773
    If we are in statement mode and trying to log an unsafe statement,
 
2774
    we should print a warning.
 
2775
  */
 
2776
  if (lex->is_stmt_unsafe() &&
 
2777
      variables.binlog_format == BINLOG_FORMAT_STMT)
 
2778
  {
 
2779
    assert(this->query != NULL);
 
2780
    push_warning(this, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
2781
                 ER_BINLOG_UNSAFE_STATEMENT,
 
2782
                 ER(ER_BINLOG_UNSAFE_STATEMENT));
 
2783
    if (!(binlog_flags & BINLOG_FLAG_UNSAFE_STMT_PRINTED))
 
2784
    {
 
2785
      char warn_buf[DRIZZLE_ERRMSG_SIZE];
 
2786
      snprintf(warn_buf, DRIZZLE_ERRMSG_SIZE, "%s Statement: %s",
 
2787
               ER(ER_BINLOG_UNSAFE_STATEMENT), this->query);
 
2788
      sql_print_warning(warn_buf);
 
2789
      binlog_flags|= BINLOG_FLAG_UNSAFE_STMT_PRINTED;
 
2790
    }
 
2791
  }
 
2792
 
 
2793
  switch (qtype) {
 
2794
  case THD::ROW_QUERY_TYPE:
 
2795
    if (current_stmt_binlog_row_based)
 
2796
      return(0);
 
2797
    /* Otherwise, we fall through */
 
2798
  case THD::DRIZZLE_QUERY_TYPE:
 
2799
    /*
 
2800
      Using this query type is a conveniece hack, since we have been
 
2801
      moving back and forth between using RBR for replication of
 
2802
      system tables and not using it.
 
2803
 
 
2804
      Make sure to change in check_table_binlog_row_based() according
 
2805
      to how you treat this.
 
2806
    */
 
2807
  case THD::STMT_QUERY_TYPE:
 
2808
    /*
 
2809
      The DRIZZLE_LOG::write() function will set the STMT_END_F flag and
 
2810
      flush the pending rows event if necessary.
 
2811
     */
 
2812
    {
 
2813
      Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use,
 
2814
                            killed_status_arg);
 
2815
      qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 
2816
      /*
 
2817
        Binlog table maps will be irrelevant after a Query_log_event
 
2818
        (they are just removed on the slave side) so after the query
 
2819
        log event is written to the binary log, we pretend that no
 
2820
        table maps were written.
 
2821
       */
 
2822
      int error= mysql_bin_log.write(&qinfo);
 
2823
      binlog_table_maps= 0;
 
2824
      return(error);
 
2825
    }
 
2826
    break;
 
2827
 
 
2828
  case THD::QUERY_TYPE_COUNT:
 
2829
  default:
 
2830
    assert(0 <= qtype && qtype < QUERY_TYPE_COUNT);
 
2831
  }
 
2832
  return(0);
 
2833
}
 
2834
 
 
2835
bool Discrete_intervals_list::append(uint64_t start, uint64_t val,
 
2836
                                 uint64_t incr)
 
2837
{
 
2838
  /* first, see if this can be merged with previous */
 
2839
  if ((head == NULL) || tail->merge_if_contiguous(start, val, incr))
 
2840
  {
 
2841
    /* it cannot, so need to add a new interval */
 
2842
    Discrete_interval *new_interval= new Discrete_interval(start, val, incr);
 
2843
    return(append(new_interval));
 
2844
  }
 
2845
  return(0);
 
2846
}
 
2847
 
 
2848
bool Discrete_intervals_list::append(Discrete_interval *new_interval)
 
2849
{
 
2850
  if (unlikely(new_interval == NULL))
 
2851
    return(1);
 
2852
  if (head == NULL)
 
2853
    head= current= new_interval;
 
2854
  else
 
2855
    tail->next= new_interval;
 
2856
  tail= new_interval;
 
2857
  elements++;
 
2858
  return(0);
 
2859
}
 
2860
 
 
2861
#endif /* !defined(DRIZZLE_CLIENT) */