~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_class.cc

  • Committer: Monty Taylor
  • Date: 2008-10-10 23:04:21 UTC
  • mto: (509.1.1 codestyle)
  • mto: This revision was merged to the branch mainline in revision 511.
  • Revision ID: monty@inaugust.com-20081010230421-zohe1eppxievpw8d
RemovedĀ O_NOFOLLOW

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems
5
 
 *
6
 
 *  This program is free software; you can redistribute it and/or modify
7
 
 *  it under the terms of the GNU General Public License as published by
8
 
 *  the Free Software Foundation; version 2 of the License.
9
 
 *
10
 
 *  This program is distributed in the hope that it will be useful,
11
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 *  GNU General Public License for more details.
14
 
 *
15
 
 *  You should have received a copy of the GNU General Public License
16
 
 *  along with this program; if not, write to the Free Software
17
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
 
 */
19
 
 
20
 
/**
21
 
 * @file Implementation of the Session class and API
22
 
 */
23
 
 
24
 
#include "config.h"
25
 
#include <drizzled/session.h>
26
 
#include "drizzled/session_list.h"
 
1
/* Copyright (C) 2000-2006 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/*****************************************************************************
 
18
**
 
19
** This file implements classes defined in sql_class.h
 
20
** Especially the classes to handle a result from a select
 
21
**
 
22
*****************************************************************************/
 
23
#include <drizzled/server_includes.h>
 
24
#include "rpl_rli.h"
 
25
#include "rpl_record.h"
 
26
#include "log_event.h"
27
27
#include <sys/stat.h>
28
 
#include <drizzled/error.h>
29
 
#include <drizzled/gettext.h>
30
 
#include <drizzled/query_id.h>
31
 
#include <drizzled/data_home.h>
32
 
#include <drizzled/sql_base.h>
33
 
#include <drizzled/lock.h>
34
 
#include <drizzled/item/cache.h>
35
 
#include <drizzled/item/float.h>
36
 
#include <drizzled/item/return_int.h>
37
 
#include <drizzled/item/empty_string.h>
38
 
#include <drizzled/show.h>
39
 
#include <drizzled/plugin/client.h>
40
 
#include "drizzled/plugin/scheduler.h"
41
 
#include "drizzled/plugin/authentication.h"
42
 
#include "drizzled/plugin/logging.h"
43
 
#include "drizzled/plugin/transactional_storage_engine.h"
44
 
#include "drizzled/probes.h"
45
 
#include "drizzled/table_proto.h"
46
 
#include "drizzled/db.h"
47
 
#include "drizzled/pthread_globals.h"
48
 
#include "drizzled/transaction_services.h"
49
 
#include "drizzled/drizzled.h"
50
 
 
51
 
#include "drizzled/table_share_instance.h"
52
 
 
53
 
#include "plugin/myisam/myisam.h"
54
 
#include "drizzled/internal/iocache.h"
55
 
#include "drizzled/internal/thread_var.h"
56
 
#include "drizzled/plugin/event_observer.h"
57
 
 
58
 
#include <fcntl.h>
59
 
#include <algorithm>
60
 
#include <climits>
61
 
#include "boost/filesystem.hpp" 
62
 
 
63
 
using namespace std;
64
 
 
65
 
namespace fs=boost::filesystem;
66
 
namespace drizzled
67
 
{
 
28
#include <mysys/thr_alarm.h>
 
29
#include <mysys/mysys_err.h>
 
30
#include <drizzled/drizzled_error_messages.h>
 
31
#include <drizzled/innodb_plugin_extras.h>
68
32
 
69
33
/*
70
34
  The following is used to initialise Table_ident with a internal
73
37
char internal_table_name[2]= "*";
74
38
char empty_c_string[1]= {0};    /* used for not defined db */
75
39
 
76
 
const char * const Session::DEFAULT_WHERE= "field list";
 
40
const char * const THD::DEFAULT_WHERE= "field list";
 
41
 
 
42
 
 
43
/*****************************************************************************
 
44
** Instansiate templates
 
45
*****************************************************************************/
 
46
 
 
47
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
48
/* Used templates */
 
49
template class List<Key>;
 
50
template class List_iterator<Key>;
 
51
template class List<Key_part_spec>;
 
52
template class List_iterator<Key_part_spec>;
 
53
template class List<Alter_drop>;
 
54
template class List_iterator<Alter_drop>;
 
55
template class List<Alter_column>;
 
56
template class List_iterator<Alter_column>;
 
57
#endif
 
58
 
 
59
/****************************************************************************
 
60
** User variables
 
61
****************************************************************************/
 
62
 
 
63
extern "C" unsigned char *get_var_key(user_var_entry *entry, size_t *length,
 
64
                              bool not_used __attribute__((unused)))
 
65
{
 
66
  *length= entry->name.length;
 
67
  return (unsigned char*) entry->name.str;
 
68
}
 
69
 
 
70
extern "C" void free_user_var(user_var_entry *entry)
 
71
{
 
72
  char *pos= (char*) entry+ALIGN_SIZE(sizeof(*entry));
 
73
  if (entry->value && entry->value != pos)
 
74
    free(entry->value);
 
75
  free((char*) entry);
 
76
}
77
77
 
78
78
bool Key_part_spec::operator==(const Key_part_spec& other) const
79
79
{
82
82
         !strcmp(field_name.str, other.field_name.str);
83
83
}
84
84
 
85
 
Open_tables_state::Open_tables_state(uint64_t version_arg) :
86
 
  version(version_arg)
87
 
{
88
 
  open_tables= temporary_tables= derived_tables= NULL;
89
 
  extra_lock= lock= NULL;
 
85
/**
 
86
  Construct an (almost) deep copy of this key. Only those
 
87
  elements that are known to never change are not copied.
 
88
  If out of memory, a partial copy is returned and an error is set
 
89
  in THD.
 
90
*/
 
91
 
 
92
Key::Key(const Key &rhs, MEM_ROOT *mem_root)
 
93
  :type(rhs.type),
 
94
  key_create_info(rhs.key_create_info),
 
95
  columns(rhs.columns, mem_root),
 
96
  name(rhs.name),
 
97
  generated(rhs.generated)
 
98
{
 
99
  list_copy_and_replace_each_value(columns, mem_root);
 
100
}
 
101
 
 
102
/**
 
103
  Construct an (almost) deep copy of this foreign key. Only those
 
104
  elements that are known to never change are not copied.
 
105
  If out of memory, a partial copy is returned and an error is set
 
106
  in THD.
 
107
*/
 
108
 
 
109
Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root)
 
110
  :Key(rhs),
 
111
  ref_table(rhs.ref_table),
 
112
  ref_columns(rhs.ref_columns),
 
113
  delete_opt(rhs.delete_opt),
 
114
  update_opt(rhs.update_opt),
 
115
  match_opt(rhs.match_opt)
 
116
{
 
117
  list_copy_and_replace_each_value(ref_columns, mem_root);
 
118
}
 
119
 
 
120
/*
 
121
  Test if a foreign key (= generated key) is a prefix of the given key
 
122
  (ignoring key name, key type and order of columns)
 
123
 
 
124
  NOTES:
 
125
    This is only used to test if an index for a FOREIGN KEY exists
 
126
 
 
127
  IMPLEMENTATION
 
128
    We only compare field names
 
129
 
 
130
  RETURN
 
131
    0   Generated key is a prefix of other key
 
132
    1   Not equal
 
133
*/
 
134
 
 
135
bool foreign_key_prefix(Key *a, Key *b)
 
136
{
 
137
  /* Ensure that 'a' is the generated key */
 
138
  if (a->generated)
 
139
  {
 
140
    if (b->generated && a->columns.elements > b->columns.elements)
 
141
      std::swap(a, b);                       // Put shorter key in 'a'
 
142
  }
 
143
  else
 
144
  {
 
145
    if (!b->generated)
 
146
      return true;                              // No foreign key
 
147
    std::swap(a, b);                       // Put generated key in 'a'
 
148
  }
 
149
 
 
150
  /* Test if 'a' is a prefix of 'b' */
 
151
  if (a->columns.elements > b->columns.elements)
 
152
    return true;                                // Can't be prefix
 
153
 
 
154
  List_iterator<Key_part_spec> col_it1(a->columns);
 
155
  List_iterator<Key_part_spec> col_it2(b->columns);
 
156
  const Key_part_spec *col1, *col2;
 
157
 
 
158
#ifdef ENABLE_WHEN_INNODB_CAN_HANDLE_SWAPED_FOREIGN_KEY_COLUMNS
 
159
  while ((col1= col_it1++))
 
160
  {
 
161
    bool found= 0;
 
162
    col_it2.rewind();
 
163
    while ((col2= col_it2++))
 
164
    {
 
165
      if (*col1 == *col2)
 
166
      {
 
167
        found= true;
 
168
        break;
 
169
      }
 
170
    }
 
171
    if (!found)
 
172
      return true;                              // Error
 
173
  }
 
174
  return false;                                 // Is prefix
 
175
#else
 
176
  while ((col1= col_it1++))
 
177
  {
 
178
    col2= col_it2++;
 
179
    if (!(*col1 == *col2))
 
180
      return true;
 
181
  }
 
182
  return false;                                 // Is prefix
 
183
#endif
 
184
}
 
185
 
 
186
 
 
187
/****************************************************************************
 
188
** Thread specific functions
 
189
****************************************************************************/
 
190
 
 
191
Open_tables_state::Open_tables_state(ulong version_arg)
 
192
  :version(version_arg), state_flags(0U)
 
193
{
 
194
  reset_open_tables_state();
90
195
}
91
196
 
92
197
/*
93
198
  The following functions form part of the C plugin API
94
199
*/
95
 
int mysql_tmpfile(const char *prefix)
 
200
 
 
201
extern "C" int mysql_tmpfile(const char *prefix)
96
202
{
97
203
  char filename[FN_REFLEN];
98
 
  int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
 
204
  File fd = create_temp_file(filename, mysql_tmpdir, prefix,
 
205
                             O_CREAT | O_EXCL | O_RDWR,
 
206
                             MYF(MY_WME));
99
207
  if (fd >= 0) {
100
208
    unlink(filename);
101
209
  }
103
211
  return fd;
104
212
}
105
213
 
106
 
int session_tablespace_op(const Session *session)
107
 
{
108
 
  return test(session->tablespace_op);
109
 
}
 
214
 
 
215
extern "C"
 
216
int thd_in_lock_tables(const THD *thd)
 
217
{
 
218
  return test(thd->in_lock_tables);
 
219
}
 
220
 
 
221
 
 
222
extern "C"
 
223
int thd_tablespace_op(const THD *thd)
 
224
{
 
225
  return test(thd->tablespace_op);
 
226
}
 
227
 
110
228
 
111
229
/**
112
 
   Set the process info field of the Session structure.
 
230
   Set the process info field of the THD structure.
113
231
 
114
232
   This function is used by plug-ins. Internally, the
115
 
   Session::set_proc_info() function should be used.
 
233
   THD::set_proc_info() function should be used.
116
234
 
117
 
   @see Session::set_proc_info
 
235
   @see THD::set_proc_info
118
236
 */
119
 
void set_session_proc_info(Session *session, const char *info)
120
 
{
121
 
  session->set_proc_info(info);
122
 
}
123
 
 
124
 
const char *get_session_proc_info(Session *session)
125
 
{
126
 
  return session->get_proc_info();
127
 
}
128
 
 
129
 
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
130
 
{
131
 
  return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
132
 
}
133
 
 
134
 
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
135
 
                                             size_t index)
136
 
{
137
 
  return &ha_data[monitored->getId()].resource_context[index];
138
 
}
139
 
 
140
 
int64_t session_test_options(const Session *session, int64_t test_options)
141
 
{
142
 
  return session->options & test_options;
143
 
}
144
 
 
145
 
int session_sql_command(const Session *session)
146
 
{
147
 
  return (int) session->lex->sql_command;
148
 
}
149
 
 
150
 
enum_tx_isolation session_tx_isolation(const Session *session)
151
 
{
152
 
  return (enum_tx_isolation)session->variables.tx_isolation;
153
 
}
154
 
 
155
 
Session::Session(plugin::Client *client_arg) :
156
 
  Open_tables_state(refresh_version),
157
 
  mem_root(&main_mem_root),
158
 
  lex(&main_lex),
159
 
  query(),
160
 
  client(client_arg),
161
 
  scheduler(NULL),
162
 
  scheduler_arg(NULL),
163
 
  lock_id(&main_lock_id),
164
 
  user_time(0),
165
 
  ha_data(plugin::num_trx_monitored_objects),
166
 
  arg_of_last_insert_id_function(false),
167
 
  first_successful_insert_id_in_prev_stmt(0),
168
 
  first_successful_insert_id_in_cur_stmt(0),
169
 
  limit_found_rows(0),
170
 
  global_read_lock(0),
171
 
  some_tables_deleted(false),
172
 
  no_errors(false),
173
 
  password(false),
174
 
  is_fatal_error(false),
175
 
  transaction_rollback_request(false),
176
 
  is_fatal_sub_stmt_error(0),
177
 
  derived_tables_processing(false),
178
 
  tablespace_op(false),
179
 
  m_lip(NULL),
180
 
  cached_table(0),
181
 
  transaction_message(NULL),
182
 
  statement_message(NULL),
183
 
  session_event_observers(NULL),
184
 
  use_usage(false)
185
 
{
186
 
  memset(process_list_info, 0, PROCESS_LIST_WIDTH);
187
 
  client->setSession(this);
 
237
extern "C" void
 
238
set_thd_proc_info(THD *thd, const char *info)
 
239
{
 
240
  thd->set_proc_info(info);
 
241
}
 
242
 
 
243
extern "C"
 
244
const char *get_thd_proc_info(THD *thd)
 
245
{
 
246
  return thd->get_proc_info();
 
247
}
 
248
 
 
249
extern "C"
 
250
void **thd_ha_data(const THD *thd, const struct handlerton *hton)
 
251
{
 
252
  return (void **) &thd->ha_data[hton->slot].ha_ptr;
 
253
}
 
254
 
 
255
extern "C"
 
256
int64_t thd_test_options(const THD *thd, int64_t test_options)
 
257
{
 
258
  return thd->options & test_options;
 
259
}
 
260
 
 
261
extern "C"
 
262
int thd_sql_command(const THD *thd)
 
263
{
 
264
  return (int) thd->lex->sql_command;
 
265
}
 
266
 
 
267
extern "C"
 
268
int thd_tx_isolation(const THD *thd)
 
269
{
 
270
  return (int) thd->variables.tx_isolation;
 
271
}
 
272
 
 
273
extern "C"
 
274
void thd_inc_row_count(THD *thd)
 
275
{
 
276
  thd->row_count++;
 
277
}
 
278
 
 
279
/**
 
280
  Clear this diagnostics area. 
 
281
 
 
282
  Normally called at the end of a statement.
 
283
*/
 
284
 
 
285
void
 
286
Diagnostics_area::reset_diagnostics_area()
 
287
{
 
288
  can_overwrite_status= false;
 
289
  /** Don't take chances in production */
 
290
  m_message[0]= '\0';
 
291
  m_sql_errno= 0;
 
292
  m_server_status= 0;
 
293
  m_affected_rows= 0;
 
294
  m_last_insert_id= 0;
 
295
  m_total_warn_count= 0;
 
296
  is_sent= false;
 
297
  /** Tiny reset in debug mode to see garbage right away */
 
298
  m_status= DA_EMPTY;
 
299
}
 
300
 
 
301
 
 
302
/**
 
303
  Set OK status -- ends commands that do not return a
 
304
  result set, e.g. INSERT/UPDATE/DELETE.
 
305
*/
 
306
 
 
307
void
 
308
Diagnostics_area::set_ok_status(THD *thd, ha_rows affected_rows_arg,
 
309
                                uint64_t last_insert_id_arg,
 
310
                                const char *message_arg)
 
311
{
 
312
  assert(! is_set());
 
313
  /*
 
314
    In production, refuse to overwrite an error or a custom response
 
315
    with an OK packet.
 
316
  */
 
317
  if (is_error() || is_disabled())
 
318
    return;
 
319
  /** Only allowed to report success if has not yet reported an error */
 
320
 
 
321
  m_server_status= thd->server_status;
 
322
  m_total_warn_count= thd->total_warn_count;
 
323
  m_affected_rows= affected_rows_arg;
 
324
  m_last_insert_id= last_insert_id_arg;
 
325
  if (message_arg)
 
326
    strmake(m_message, message_arg, sizeof(m_message) - 1);
 
327
  else
 
328
    m_message[0]= '\0';
 
329
  m_status= DA_OK;
 
330
}
 
331
 
 
332
 
 
333
/**
 
334
  Set EOF status.
 
335
*/
 
336
 
 
337
void
 
338
Diagnostics_area::set_eof_status(THD *thd)
 
339
{
 
340
  /** Only allowed to report eof if has not yet reported an error */
 
341
 
 
342
  assert(! is_set());
 
343
  /*
 
344
    In production, refuse to overwrite an error or a custom response
 
345
    with an EOF packet.
 
346
  */
 
347
  if (is_error() || is_disabled())
 
348
    return;
 
349
 
 
350
  m_server_status= thd->server_status;
 
351
  /*
 
352
    If inside a stored procedure, do not return the total
 
353
    number of warnings, since they are not available to the client
 
354
    anyway.
 
355
  */
 
356
  m_total_warn_count= thd->total_warn_count;
 
357
 
 
358
  m_status= DA_EOF;
 
359
}
 
360
 
 
361
/**
 
362
  Set ERROR status.
 
363
*/
 
364
 
 
365
void
 
366
Diagnostics_area::set_error_status(THD *thd __attribute__((unused)),
 
367
                                   uint32_t sql_errno_arg,
 
368
                                   const char *message_arg)
 
369
{
 
370
  /*
 
371
    Only allowed to report error if has not yet reported a success
 
372
    The only exception is when we flush the message to the client,
 
373
    an error can happen during the flush.
 
374
  */
 
375
  assert(! is_set() || can_overwrite_status);
 
376
  /*
 
377
    In production, refuse to overwrite a custom response with an
 
378
    ERROR packet.
 
379
  */
 
380
  if (is_disabled())
 
381
    return;
 
382
 
 
383
  m_sql_errno= sql_errno_arg;
 
384
  strmake(m_message, message_arg, sizeof(m_message) - 1);
 
385
 
 
386
  m_status= DA_ERROR;
 
387
}
 
388
 
 
389
 
 
390
/**
 
391
  Mark the diagnostics area as 'DISABLED'.
 
392
 
 
393
  This is used in rare cases when the COM_ command at hand sends a response
 
394
  in a custom format. One example is the query cache, another is
 
395
  COM_STMT_PREPARE.
 
396
*/
 
397
 
 
398
void
 
399
Diagnostics_area::disable_status()
 
400
{
 
401
  assert(! is_set());
 
402
  m_status= DA_DISABLED;
 
403
}
 
404
 
 
405
 
 
406
THD::THD()
 
407
   :Statement(&main_lex, &main_mem_root,
 
408
              /* statement id */ 0),
 
409
   Open_tables_state(refresh_version), rli_fake(0),
 
410
   lock_id(&main_lock_id),
 
411
   user_time(0),
 
412
   binlog_table_maps(0), binlog_flags(0UL),
 
413
   arg_of_last_insert_id_function(false),
 
414
   first_successful_insert_id_in_prev_stmt(0),
 
415
   first_successful_insert_id_in_prev_stmt_for_binlog(0),
 
416
   first_successful_insert_id_in_cur_stmt(0),
 
417
   stmt_depends_on_first_successful_insert_id_in_prev_stmt(false),
 
418
   global_read_lock(0),
 
419
   is_fatal_error(0),
 
420
   transaction_rollback_request(0),
 
421
   is_fatal_sub_stmt_error(0),
 
422
   rand_used(0),
 
423
   time_zone_used(0),
 
424
   in_lock_tables(0),
 
425
   bootstrap(0),
 
426
   derived_tables_processing(false),
 
427
   m_lip(NULL)
 
428
{
 
429
  ulong tmp;
188
430
 
189
431
  /*
190
432
    Pass nominal parameters to init_alloc_root only to ensure that
191
433
    the destructor works OK in case of an error. The main_mem_root
192
434
    will be re-initialized in init_for_queries().
193
435
  */
194
 
  memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
195
 
  thread_stack= NULL;
196
 
  count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
436
  init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
 
437
  thread_stack= 0;
 
438
  catalog= (char*)"std"; // the only catalog we have for now
 
439
  main_security_ctx.init();
 
440
  security_ctx= &main_security_ctx;
 
441
  some_tables_deleted=no_errors=password= 0;
 
442
  query_start_used= 0;
 
443
  count_cuted_fields= CHECK_FIELD_IGNORE;
197
444
  killed= NOT_KILLED;
198
 
  col_access= 0;
199
 
  tmp_table= 0;
200
 
  used_tables= 0;
 
445
  col_access=0;
 
446
  is_slave_error= thread_specific_used= false;
 
447
  hash_clear(&handler_tables_hash);
 
448
  tmp_table=0;
 
449
  used_tables=0;
201
450
  cuted_fields= sent_row_count= row_count= 0L;
 
451
  limit_found_rows= 0;
202
452
  row_count_func= -1;
203
453
  statement_id_counter= 0UL;
204
 
  // Must be reset to handle error with Session's created for init of mysqld
 
454
  // Must be reset to handle error with THD's created for init of mysqld
205
455
  lex->current_select= 0;
206
456
  start_time=(time_t) 0;
207
457
  start_utime= 0L;
208
458
  utime_after_lock= 0L;
 
459
  current_linfo =  0;
 
460
  slave_thread = 0;
209
461
  memset(&variables, 0, sizeof(variables));
210
462
  thread_id= 0;
 
463
  one_shot_set= 0;
211
464
  file_id = 0;
212
465
  query_id= 0;
213
 
  warn_query_id= 0;
214
 
  mysys_var= 0;
215
 
  scoreboard_index= -1;
216
 
  dbug_sentry=Session_SENTRY_MAGIC;
217
 
  cleanup_done= abort_on_warning= no_warnings_for_error= false;  
218
 
 
219
 
  /* query_cache init */
220
 
  query_cache_key= "";
221
 
  resultset= NULL;
 
466
  warn_id= 0;
 
467
  db_charset= global_system_variables.collation_database;
 
468
  memset(ha_data, 0, sizeof(ha_data));
 
469
  mysys_var=0;
 
470
  binlog_evt_union.do_union= false;
 
471
  dbug_sentry=THD_SENTRY_MAGIC;
 
472
  net.vio=0;
 
473
  client_capabilities= 0;                       // minimalistic client
 
474
  system_thread= NON_SYSTEM_THREAD;
 
475
  cleanup_done= abort_on_warning= no_warnings_for_error= 0;
 
476
  peer_port= 0;                                 // For SHOW PROCESSLIST
 
477
  transaction.m_pending_rows_event= 0;
 
478
  transaction.on= 1;
 
479
  pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
222
480
 
223
481
  /* Variables with default values */
224
482
  proc_info="login";
225
 
  where= Session::DEFAULT_WHERE;
226
 
  command= COM_CONNECT;
227
 
 
228
 
  plugin_sessionvar_init(this);
 
483
  where= THD::DEFAULT_WHERE;
 
484
  server_id = ::server_id;
 
485
  slave_net = 0;
 
486
  command=COM_CONNECT;
 
487
  *scramble= '\0';
 
488
 
 
489
  init();
 
490
  /* Initialize sub structures */
 
491
  init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
 
492
  user_connect=(USER_CONN *)0;
 
493
  hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
 
494
            (hash_get_key) get_var_key,
 
495
            (hash_free_key) free_user_var, 0);
 
496
 
 
497
  /* For user vars replication*/
 
498
  if (opt_bin_log)
 
499
    my_init_dynamic_array(&user_var_events,
 
500
                          sizeof(BINLOG_USER_VAR_EVENT *), 16, 16);
 
501
  else
 
502
    memset(&user_var_events, 0, sizeof(user_var_events));
 
503
 
 
504
  /* Protocol */
 
505
  protocol= &protocol_text;                     // Default protocol
 
506
  protocol_text.init(this);
 
507
 
 
508
  tablespace_op= false;
 
509
  tmp= sql_rnd();
 
510
  randominit(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id);
 
511
  substitute_null_with_insert_id = false;
 
512
  thr_lock_info_init(&lock_info); /* safety: will be reset after start */
 
513
  thr_lock_owner_init(&main_lock_id, &lock_info);
 
514
 
 
515
  m_internal_handler= NULL;
 
516
}
 
517
 
 
518
 
 
519
void THD::push_internal_handler(Internal_error_handler *handler)
 
520
{
 
521
  /*
 
522
    TODO: The current implementation is limited to 1 handler at a time only.
 
523
    THD and sp_rcontext need to be modified to use a common handler stack.
 
524
  */
 
525
  assert(m_internal_handler == NULL);
 
526
  m_internal_handler= handler;
 
527
}
 
528
 
 
529
 
 
530
bool THD::handle_error(uint32_t sql_errno, const char *message,
 
531
                       DRIZZLE_ERROR::enum_warning_level level)
 
532
{
 
533
  if (m_internal_handler)
 
534
  {
 
535
    return m_internal_handler->handle_error(sql_errno, message, level, this);
 
536
  }
 
537
 
 
538
  return false;                                 // 'false', as per coding style
 
539
}
 
540
 
 
541
 
 
542
void THD::pop_internal_handler()
 
543
{
 
544
  assert(m_internal_handler != NULL);
 
545
  m_internal_handler= NULL;
 
546
}
 
547
 
 
548
extern "C"
 
549
void *thd_alloc(DRIZZLE_THD thd, unsigned int size)
 
550
{
 
551
  return thd->alloc(size);
 
552
}
 
553
 
 
554
extern "C"
 
555
void *thd_calloc(DRIZZLE_THD thd, unsigned int size)
 
556
{
 
557
  return thd->calloc(size);
 
558
}
 
559
 
 
560
extern "C"
 
561
char *thd_strdup(DRIZZLE_THD thd, const char *str)
 
562
{
 
563
  return thd->strdup(str);
 
564
}
 
565
 
 
566
extern "C"
 
567
char *thd_strmake(DRIZZLE_THD thd, const char *str, unsigned int size)
 
568
{
 
569
  return thd->strmake(str, size);
 
570
}
 
571
 
 
572
extern "C"
 
573
LEX_STRING *thd_make_lex_string(THD *thd, LEX_STRING *lex_str,
 
574
                                const char *str, unsigned int size,
 
575
                                int allocate_lex_string)
 
576
{
 
577
  return thd->make_lex_string(lex_str, str, size,
 
578
                              (bool) allocate_lex_string);
 
579
}
 
580
 
 
581
extern "C"
 
582
void *thd_memdup(DRIZZLE_THD thd, const void* str, unsigned int size)
 
583
{
 
584
  return thd->memdup(str, size);
 
585
}
 
586
 
 
587
extern "C"
 
588
void thd_get_xid(const DRIZZLE_THD thd, DRIZZLE_XID *xid)
 
589
{
 
590
  *xid = *(DRIZZLE_XID *) &thd->transaction.xid_state.xid;
 
591
}
 
592
 
 
593
/*
 
594
  Init common variables that has to be reset on start and on change_user
 
595
*/
 
596
 
 
597
void THD::init(void)
 
598
{
 
599
  pthread_mutex_lock(&LOCK_global_system_variables);
 
600
  plugin_thdvar_init(this);
 
601
  variables.time_format= date_time_format_copy((THD*) 0,
 
602
                                               variables.time_format);
 
603
  variables.date_format= date_time_format_copy((THD*) 0,
 
604
                                               variables.date_format);
 
605
  variables.datetime_format= date_time_format_copy((THD*) 0,
 
606
                                                   variables.datetime_format);
229
607
  /*
230
608
    variables= global_system_variables above has reset
231
609
    variables.pseudo_thread_id to 0. We need to correct it here to
232
610
    avoid temporary tables replication failure.
233
611
  */
234
612
  variables.pseudo_thread_id= thread_id;
 
613
  pthread_mutex_unlock(&LOCK_global_system_variables);
235
614
  server_status= SERVER_STATUS_AUTOCOMMIT;
236
 
  options= session_startup_options;
 
615
  options= thd_startup_options;
237
616
 
238
617
  if (variables.max_join_size == HA_POS_ERROR)
239
618
    options |= OPTION_BIG_SELECTS;
240
619
  else
241
620
    options &= ~OPTION_BIG_SELECTS;
242
621
 
 
622
  transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= false;
243
623
  open_options=ha_open_options;
244
 
  update_lock_default= TL_WRITE;
 
624
  update_lock_default= (variables.low_priority_updates ?
 
625
                        TL_WRITE_LOW_PRIORITY :
 
626
                        TL_WRITE);
245
627
  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
246
628
  warn_list.empty();
247
629
  memset(warn_count, 0, sizeof(warn_count));
248
630
  total_warn_count= 0;
 
631
  update_charset();
 
632
  reset_current_stmt_binlog_row_based();
249
633
  memset(&status_var, 0, sizeof(status_var));
250
 
 
251
 
  /* Initialize sub structures */
252
 
  memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
253
 
 
254
 
  substitute_null_with_insert_id = false;
255
 
  lock_info.init(); /* safety: will be reset after start */
256
 
  thr_lock_owner_init(&main_lock_id, &lock_info);
257
 
 
258
 
  m_internal_handler= NULL;
259
 
  
260
 
  plugin::EventObserver::registerSessionEvents(*this); 
261
 
}
262
 
 
263
 
void Session::free_items()
264
 
{
265
 
  Item *next;
266
 
  /* This works because items are allocated with memory::sql_alloc() */
267
 
  for (; free_list; free_list= next)
268
 
  {
269
 
    next= free_list->next;
270
 
    free_list->delete_self();
271
 
  }
272
 
}
273
 
 
274
 
void Session::push_internal_handler(Internal_error_handler *handler)
275
 
{
276
 
  /*
277
 
    TODO: The current implementation is limited to 1 handler at a time only.
278
 
    Session and sp_rcontext need to be modified to use a common handler stack.
279
 
  */
280
 
  assert(m_internal_handler == NULL);
281
 
  m_internal_handler= handler;
282
 
}
283
 
 
284
 
bool Session::handle_error(uint32_t sql_errno, const char *message,
285
 
                       DRIZZLE_ERROR::enum_warning_level level)
286
 
{
287
 
  if (m_internal_handler)
288
 
  {
289
 
    return m_internal_handler->handle_error(sql_errno, message, level, this);
290
 
  }
291
 
 
292
 
  return false;                                 // 'false', as per coding style
293
 
}
294
 
 
295
 
void Session::setAbort(bool arg)
296
 
{
297
 
  mysys_var->abort= arg;
298
 
}
299
 
 
300
 
void Session::lockOnSys()
301
 
{
302
 
  if (not mysys_var)
303
 
    return;
304
 
 
305
 
  setAbort(true);
306
 
  boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
307
 
  if (mysys_var->current_cond)
308
 
  {
309
 
    mysys_var->current_mutex->lock();
310
 
    pthread_cond_broadcast(mysys_var->current_cond->native_handle());
311
 
    mysys_var->current_mutex->unlock();
312
 
  }
313
 
}
314
 
 
315
 
void Session::pop_internal_handler()
316
 
{
317
 
  assert(m_internal_handler != NULL);
318
 
  m_internal_handler= NULL;
319
 
}
320
 
 
321
 
void Session::get_xid(DRIZZLE_XID *xid)
322
 
{
323
 
  *xid = *(DRIZZLE_XID *) &transaction.xid_state.xid;
324
 
}
 
634
}
 
635
 
 
636
 
 
637
/*
 
638
  Init THD for query processing.
 
639
  This has to be called once before we call mysql_parse.
 
640
  See also comments in sql_class.h.
 
641
*/
 
642
 
 
643
void THD::init_for_queries()
 
644
{
 
645
  set_time(); 
 
646
  ha_enable_transaction(this,true);
 
647
 
 
648
  reset_root_defaults(mem_root, variables.query_alloc_block_size,
 
649
                      variables.query_prealloc_size);
 
650
  reset_root_defaults(&transaction.mem_root,
 
651
                      variables.trans_alloc_block_size,
 
652
                      variables.trans_prealloc_size);
 
653
  transaction.xid_state.xid.null();
 
654
  transaction.xid_state.in_thd=1;
 
655
}
 
656
 
325
657
 
326
658
/* Do operations that may take a long time */
327
659
 
328
 
void Session::cleanup(void)
 
660
void THD::cleanup(void)
329
661
{
330
 
  assert(cleanup_done == false);
 
662
  assert(cleanup_done == 0);
331
663
 
332
664
  killed= KILL_CONNECTION;
333
665
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
337
669
  }
338
670
#endif
339
671
  {
340
 
    TransactionServices &transaction_services= TransactionServices::singleton();
341
 
    transaction_services.rollbackTransaction(this, true);
 
672
    ha_rollback(this);
342
673
    xid_cache_delete(&transaction.xid_state);
343
674
  }
344
 
 
345
 
  for (UserVars::iterator iter= user_vars.begin();
346
 
       iter != user_vars.end();
347
 
       iter++)
 
675
  if (locked_tables)
348
676
  {
349
 
    user_var_entry *entry= (*iter).second;
350
 
    delete entry;
 
677
    lock=locked_tables; locked_tables=0;
 
678
    close_thread_tables(this);
351
679
  }
352
 
  user_vars.clear();
353
 
 
354
 
 
355
 
  close_temporary_tables();
356
 
 
 
680
  mysql_ha_cleanup(this);
 
681
  delete_dynamic(&user_var_events);
 
682
  hash_free(&user_vars);
 
683
  close_temporary_tables(this);
 
684
  free((char*) variables.time_format);
 
685
  free((char*) variables.date_format);
 
686
  free((char*) variables.datetime_format);
 
687
  
357
688
  if (global_read_lock)
358
689
    unlock_global_read_lock(this);
359
690
 
360
 
  cleanup_done= true;
 
691
  cleanup_done=1;
 
692
  return;
361
693
}
362
694
 
363
 
Session::~Session()
 
695
THD::~THD()
364
696
{
365
 
  this->checkSentry();
366
 
 
367
 
  if (client->isConnected())
368
 
  {
369
 
    if (global_system_variables.log_warnings)
370
 
        errmsg_printf(ERRMSG_LVL_WARN, ER(ER_FORCING_CLOSE),internal::my_progname,
371
 
                      thread_id,
372
 
                      (getSecurityContext().getUser().c_str() ?
373
 
                       getSecurityContext().getUser().c_str() : ""));
374
 
    disconnect(0, false);
375
 
  }
 
697
  THD_CHECK_SENTRY(this);
 
698
  /* Ensure that no one is using THD */
 
699
  pthread_mutex_lock(&LOCK_delete);
 
700
  pthread_mutex_unlock(&LOCK_delete);
 
701
  add_to_status(&global_status_var, &status_var);
376
702
 
377
703
  /* Close connection */
378
 
  client->close();
379
 
  delete client;
380
 
 
381
 
  if (cleanup_done == false)
 
704
  if (net.vio)
 
705
  {
 
706
    net_close(&net);
 
707
    net_end(&net);
 
708
  }
 
709
  if (!cleanup_done)
382
710
    cleanup();
383
711
 
384
 
  plugin::StorageEngine::closeConnection(this);
385
 
  plugin_sessionvar_cleanup(this);
 
712
  ha_close_connection(this);
 
713
  plugin_thdvar_cleanup(this);
386
714
 
387
 
  warn_root.free_root(MYF(0));
 
715
  main_security_ctx.destroy();
 
716
  if (db)
 
717
  {
 
718
    free(db);
 
719
    db= NULL;
 
720
  }
 
721
  free_root(&warn_root,MYF(0));
 
722
  free_root(&transaction.mem_root,MYF(0));
388
723
  mysys_var=0;                                  // Safety (shouldn't be needed)
389
 
  dbug_sentry= Session_SENTRY_GONE;
390
 
 
391
 
  main_mem_root.free_root(MYF(0));
392
 
  currentMemRoot().release();
393
 
  currentSession().release();
394
 
 
395
 
  plugin::Logging::postEndDo(this);
396
 
  plugin::EventObserver::deregisterSessionEvents(*this); 
397
 
 
398
 
  for (PropertyMap::iterator iter= life_properties.begin(); iter != life_properties.end(); iter++)
 
724
  pthread_mutex_destroy(&LOCK_delete);
 
725
  dbug_sentry= THD_SENTRY_GONE;
 
726
  if (rli_fake)
399
727
  {
400
 
    delete (*iter).second;
 
728
    delete rli_fake;
 
729
    rli_fake= NULL;
401
730
  }
402
 
  life_properties.clear();
403
 
 
404
 
  /* Ensure that no one is using Session */
405
 
  LOCK_delete.unlock();
406
 
}
407
 
 
408
 
void Session::awake(Session::killed_state state_to_set)
409
 
{
410
 
  this->checkSentry();
411
 
  safe_mutex_assert_owner(&LOCK_delete);
 
731
  
 
732
  free_root(&main_mem_root, MYF(0));
 
733
  return;
 
734
}
 
735
 
 
736
 
 
737
/*
 
738
  Add all status variables to another status variable array
 
739
 
 
740
  SYNOPSIS
 
741
   add_to_status()
 
742
   to_var       add to this array
 
743
   from_var     from this array
 
744
 
 
745
  NOTES
 
746
    This function assumes that all variables are long/ulong.
 
747
    If this assumption will change, then we have to explictely add
 
748
    the other variables after the while loop
 
749
*/
 
750
 
 
751
void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var)
 
752
{
 
753
  ulong *end= (ulong*) ((unsigned char*) to_var +
 
754
                        offsetof(STATUS_VAR, last_system_status_var) +
 
755
                        sizeof(ulong));
 
756
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var;
 
757
 
 
758
  while (to != end)
 
759
    *(to++)+= *(from++);
 
760
}
 
761
 
 
762
/*
 
763
  Add the difference between two status variable arrays to another one.
 
764
 
 
765
  SYNOPSIS
 
766
    add_diff_to_status
 
767
    to_var       add to this array
 
768
    from_var     from this array
 
769
    dec_var      minus this array
 
770
  
 
771
  NOTE
 
772
    This function assumes that all variables are long/ulong.
 
773
*/
 
774
 
 
775
void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var,
 
776
                        STATUS_VAR *dec_var)
 
777
{
 
778
  ulong *end= (ulong*) ((unsigned char*) to_var + offsetof(STATUS_VAR,
 
779
                                                  last_system_status_var) +
 
780
                        sizeof(ulong));
 
781
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var, *dec= (ulong*) dec_var;
 
782
 
 
783
  while (to != end)
 
784
    *(to++)+= *(from++) - *(dec++);
 
785
}
 
786
 
 
787
 
 
788
void THD::awake(THD::killed_state state_to_set)
 
789
{
 
790
  THD_CHECK_SENTRY(this);
 
791
  safe_mutex_assert_owner(&LOCK_delete); 
412
792
 
413
793
  killed= state_to_set;
414
 
  if (state_to_set != Session::KILL_QUERY)
 
794
  if (state_to_set != THD::KILL_QUERY)
415
795
  {
416
 
    scheduler->killSession(this);
417
 
    DRIZZLE_CONNECTION_DONE(thread_id);
 
796
    thr_alarm_kill(thread_id);
 
797
    if (!slave_thread)
 
798
      thread_scheduler.post_kill_notification(this);
418
799
  }
419
800
  if (mysys_var)
420
801
  {
421
 
    boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
 
802
    pthread_mutex_lock(&mysys_var->mutex);
 
803
    if (!system_thread)         // Don't abort locks
 
804
      mysys_var->abort=1;
422
805
    /*
423
 
      "
424
806
      This broadcast could be up in the air if the victim thread
425
807
      exits the cond in the time between read and broadcast, but that is
426
808
      ok since all we want to do is to make the victim thread get out
436
818
      current_cond and current_mutex are 0), then the victim will not get
437
819
      a signal and it may wait "forever" on the cond (until
438
820
      we issue a second KILL or the status it's waiting for happens).
439
 
      It's true that we have set its session->killed but it may not
 
821
      It's true that we have set its thd->killed but it may not
440
822
      see it immediately and so may have time to reach the cond_wait().
441
823
    */
442
824
    if (mysys_var->current_cond && mysys_var->current_mutex)
443
825
    {
444
 
      mysys_var->current_mutex->lock();
445
 
      pthread_cond_broadcast(mysys_var->current_cond->native_handle());
446
 
      mysys_var->current_mutex->unlock();
 
826
      pthread_mutex_lock(mysys_var->current_mutex);
 
827
      pthread_cond_broadcast(mysys_var->current_cond);
 
828
      pthread_mutex_unlock(mysys_var->current_mutex);
447
829
    }
 
830
    pthread_mutex_unlock(&mysys_var->mutex);
448
831
  }
 
832
  return;
449
833
}
450
834
 
451
835
/*
452
836
  Remember the location of thread info, the structure needed for
453
 
  memory::sql_alloc() and the structure for the net buffer
 
837
  sql_alloc() and the structure for the net buffer
454
838
*/
455
 
bool Session::storeGlobals()
 
839
 
 
840
bool THD::store_globals()
456
841
{
457
842
  /*
458
843
    Assert that thread_stack is initialized: it's necessary to be able
460
845
  */
461
846
  assert(thread_stack);
462
847
 
463
 
  currentSession().release();
464
 
  currentSession().reset(this);
465
 
 
466
 
  currentMemRoot().release();
467
 
  currentMemRoot().reset(&mem_root);
468
 
 
 
848
  if (my_pthread_setspecific_ptr(THR_THD,  this) ||
 
849
      my_pthread_setspecific_ptr(THR_MALLOC, &mem_root))
 
850
    return 1;
469
851
  mysys_var=my_thread_var;
470
 
 
471
852
  /*
472
853
    Let mysqld define the thread id (not mysys)
473
 
    This allows us to move Session to different threads if needed.
 
854
    This allows us to move THD to different threads if needed.
474
855
  */
475
856
  mysys_var->id= thread_id;
 
857
  real_id= pthread_self();                      // For debugging
476
858
 
477
859
  /*
478
 
    We have to call thr_lock_info_init() again here as Session may have been
 
860
    We have to call thr_lock_info_init() again here as THD may have been
479
861
    created in another thread
480
862
  */
481
 
  lock_info.init();
482
 
 
483
 
  return false;
 
863
  thr_lock_info_init(&lock_info);
 
864
  return 0;
484
865
}
485
866
 
 
867
 
486
868
/*
487
 
  Init Session for query processing.
488
 
  This has to be called once before we call mysql_parse.
489
 
  See also comments in session.h.
 
869
  Cleanup after query.
 
870
 
 
871
  SYNOPSIS
 
872
    THD::cleanup_after_query()
 
873
 
 
874
  DESCRIPTION
 
875
    This function is used to reset thread data to its default state.
 
876
 
 
877
  NOTE
 
878
    This function is not suitable for setting thread data to some
 
879
    non-default values, as there is only one replication thread, so
 
880
    different master threads may overwrite data of each other on
 
881
    slave.
490
882
*/
491
883
 
492
 
void Session::prepareForQueries()
493
 
{
494
 
  if (variables.max_join_size == HA_POS_ERROR)
495
 
    options |= OPTION_BIG_SELECTS;
496
 
 
497
 
  version= refresh_version;
498
 
  set_proc_info(NULL);
499
 
  command= COM_SLEEP;
500
 
  set_time();
501
 
 
502
 
  mem_root->reset_root_defaults(variables.query_alloc_block_size,
503
 
                                variables.query_prealloc_size);
504
 
  transaction.xid_state.xid.null();
505
 
  transaction.xid_state.in_session=1;
506
 
  if (use_usage)
507
 
    resetUsage();
508
 
}
509
 
 
510
 
bool Session::initGlobals()
511
 
{
512
 
  if (storeGlobals())
513
 
  {
514
 
    disconnect(ER_OUT_OF_RESOURCES, true);
515
 
    status_var.aborted_connects++;
516
 
    return true;
517
 
  }
518
 
  return false;
519
 
}
520
 
 
521
 
void Session::run()
522
 
{
523
 
  if (initGlobals() || authenticate())
524
 
  {
525
 
    disconnect(0, true);
526
 
    return;
527
 
  }
528
 
 
529
 
  prepareForQueries();
530
 
 
531
 
  while (! client->haveError() && killed != KILL_CONNECTION)
532
 
  {
533
 
    if (! executeStatement())
534
 
      break;
535
 
  }
536
 
 
537
 
  disconnect(0, true);
538
 
}
539
 
 
540
 
bool Session::schedule()
541
 
{
542
 
  scheduler= plugin::Scheduler::getScheduler();
543
 
  assert(scheduler);
544
 
 
545
 
  connection_count.increment();
546
 
 
547
 
  if (connection_count > current_global_counters.max_used_connections)
548
 
  {
549
 
    current_global_counters.max_used_connections= connection_count;
550
 
  }
551
 
 
552
 
  current_global_counters.connections++;
553
 
  thread_id= variables.pseudo_thread_id= global_thread_id++;
554
 
 
555
 
  LOCK_thread_count.lock();
556
 
  getSessionList().push_back(this);
557
 
  LOCK_thread_count.unlock();
558
 
 
559
 
  if (scheduler->addSession(this))
560
 
  {
561
 
    DRIZZLE_CONNECTION_START(thread_id);
562
 
    char error_message_buff[DRIZZLE_ERRMSG_SIZE];
563
 
 
564
 
    killed= Session::KILL_CONNECTION;
565
 
 
566
 
    status_var.aborted_connects++;
567
 
 
568
 
    /* Can't use my_error() since store_globals has not been called. */
569
 
    /* TODO replace will better error message */
570
 
    snprintf(error_message_buff, sizeof(error_message_buff),
571
 
             ER(ER_CANT_CREATE_THREAD), 1);
572
 
    client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
573
 
    return true;
574
 
  }
575
 
 
576
 
  return false;
577
 
}
578
 
 
579
 
 
580
 
const char* Session::enter_cond(boost::condition_variable &cond, boost::mutex &mutex, const char* msg)
581
 
{
582
 
  const char* old_msg = get_proc_info();
583
 
  safe_mutex_assert_owner(mutex);
584
 
  mysys_var->current_mutex = &mutex;
585
 
  mysys_var->current_cond = &cond;
586
 
  this->set_proc_info(msg);
587
 
  return old_msg;
588
 
}
589
 
 
590
 
void Session::exit_cond(const char* old_msg)
591
 
{
592
 
  /*
593
 
    Putting the mutex unlock in exit_cond() ensures that
594
 
    mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
595
 
    locked (if that would not be the case, you'll get a deadlock if someone
596
 
    does a Session::awake() on you).
597
 
  */
598
 
  mysys_var->current_mutex->unlock();
599
 
  boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
600
 
  mysys_var->current_mutex = 0;
601
 
  mysys_var->current_cond = 0;
602
 
  this->set_proc_info(old_msg);
603
 
}
604
 
 
605
 
bool Session::authenticate()
606
 
{
607
 
  lex_start(this);
608
 
  if (client->authenticate())
609
 
    return false;
610
 
 
611
 
  status_var.aborted_connects++;
612
 
 
613
 
  return true;
614
 
}
615
 
 
616
 
bool Session::checkUser(const char *passwd, uint32_t passwd_len, const char *in_db)
617
 
{
618
 
  const string passwd_str(passwd, passwd_len);
619
 
  bool is_authenticated=
620
 
    plugin::Authentication::isAuthenticated(getSecurityContext(),
621
 
                                            passwd_str);
622
 
 
623
 
  if (is_authenticated != true)
624
 
  {
625
 
    status_var.access_denied++;
626
 
    /* isAuthenticated has pushed the error message */
627
 
    return false;
628
 
  }
629
 
 
630
 
  /* Change database if necessary */
631
 
  if (in_db && in_db[0])
632
 
  {
633
 
    SchemaIdentifier identifier(in_db);
634
 
    if (mysql_change_db(this, identifier))
635
 
    {
636
 
      /* mysql_change_db() has pushed the error message. */
637
 
      return false;
638
 
    }
639
 
  }
640
 
  my_ok();
641
 
  password= test(passwd_len);          // remember for error messages
642
 
 
643
 
  /* Ready to handle queries */
644
 
  return true;
645
 
}
646
 
 
647
 
bool Session::executeStatement()
648
 
{
649
 
  char *l_packet= 0;
650
 
  uint32_t packet_length;
651
 
 
652
 
  enum enum_server_command l_command;
653
 
 
654
 
  /*
655
 
    indicator of uninitialized lex => normal flow of errors handling
656
 
    (see my_message_sql)
657
 
  */
658
 
  lex->current_select= 0;
659
 
  clear_error();
660
 
  main_da.reset_diagnostics_area();
661
 
 
662
 
  if (client->readCommand(&l_packet, &packet_length) == false)
663
 
    return false;
664
 
 
665
 
  if (killed == KILL_CONNECTION)
666
 
    return false;
667
 
 
668
 
  if (packet_length == 0)
669
 
    return true;
670
 
 
671
 
  l_command= (enum enum_server_command) (unsigned char) l_packet[0];
672
 
 
673
 
  if (command >= COM_END)
674
 
    command= COM_END;                           // Wrong command
675
 
 
676
 
  assert(packet_length);
677
 
  return ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
678
 
}
679
 
 
680
 
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
681
 
{
682
 
  /* Remove garbage at start and end of query */
683
 
  while (in_packet_length > 0 && my_isspace(charset(), in_packet[0]))
684
 
  {
685
 
    in_packet++;
686
 
    in_packet_length--;
687
 
  }
688
 
  const char *pos= in_packet + in_packet_length; /* Point at end null */
689
 
  while (in_packet_length > 0 &&
690
 
         (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
691
 
  {
692
 
    pos--;
693
 
    in_packet_length--;
694
 
  }
695
 
 
696
 
  query.assign(in_packet, in_packet + in_packet_length);
697
 
 
698
 
  return true;
699
 
}
700
 
 
701
 
bool Session::endTransaction(enum enum_mysql_completiontype completion)
702
 
{
703
 
  bool do_release= 0;
704
 
  bool result= true;
705
 
  TransactionServices &transaction_services= TransactionServices::singleton();
706
 
 
707
 
  if (transaction.xid_state.xa_state != XA_NOTR)
708
 
  {
709
 
    my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
710
 
    return false;
711
 
  }
712
 
  switch (completion)
713
 
  {
714
 
    case COMMIT:
715
 
      /*
716
 
       * We don't use endActiveTransaction() here to ensure that this works
717
 
       * even if there is a problem with the OPTION_AUTO_COMMIT flag
718
 
       * (Which of course should never happen...)
719
 
       */
720
 
      server_status&= ~SERVER_STATUS_IN_TRANS;
721
 
      if (transaction_services.commitTransaction(this, true))
722
 
        result= false;
723
 
      options&= ~(OPTION_BEGIN);
724
 
      break;
725
 
    case COMMIT_RELEASE:
726
 
      do_release= 1; /* fall through */
727
 
    case COMMIT_AND_CHAIN:
728
 
      result= endActiveTransaction();
729
 
      if (result == true && completion == COMMIT_AND_CHAIN)
730
 
        result= startTransaction();
731
 
      break;
732
 
    case ROLLBACK_RELEASE:
733
 
      do_release= 1; /* fall through */
734
 
    case ROLLBACK:
735
 
    case ROLLBACK_AND_CHAIN:
736
 
    {
737
 
      server_status&= ~SERVER_STATUS_IN_TRANS;
738
 
      if (transaction_services.rollbackTransaction(this, true))
739
 
        result= false;
740
 
      options&= ~(OPTION_BEGIN);
741
 
      if (result == true && (completion == ROLLBACK_AND_CHAIN))
742
 
        result= startTransaction();
743
 
      break;
744
 
    }
745
 
    default:
746
 
      my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
747
 
      return false;
748
 
  }
749
 
 
750
 
  if (result == false)
751
 
    my_error(killed_errno(), MYF(0));
752
 
  else if ((result == true) && do_release)
753
 
    killed= Session::KILL_CONNECTION;
754
 
 
755
 
  return result;
756
 
}
757
 
 
758
 
bool Session::endActiveTransaction()
759
 
{
760
 
  bool result= true;
761
 
  TransactionServices &transaction_services= TransactionServices::singleton();
762
 
 
763
 
  if (transaction.xid_state.xa_state != XA_NOTR)
764
 
  {
765
 
    my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
766
 
    return false;
767
 
  }
768
 
  if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
769
 
  {
770
 
    server_status&= ~SERVER_STATUS_IN_TRANS;
771
 
    if (transaction_services.commitTransaction(this, true))
772
 
      result= false;
773
 
  }
774
 
  options&= ~(OPTION_BEGIN);
775
 
  return result;
776
 
}
777
 
 
778
 
bool Session::startTransaction(start_transaction_option_t opt)
779
 
{
780
 
  bool result= true;
781
 
 
782
 
  if (! endActiveTransaction())
783
 
  {
784
 
    result= false;
785
 
  }
786
 
  else
787
 
  {
788
 
    options|= OPTION_BEGIN;
789
 
    server_status|= SERVER_STATUS_IN_TRANS;
790
 
 
791
 
    if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
792
 
    {
793
 
      result= false;
794
 
    }
795
 
  }
796
 
 
797
 
  return result;
798
 
}
799
 
 
800
 
void Session::cleanup_after_query()
801
 
{
802
 
  /*
803
 
    Reset rand_used so that detection of calls to rand() will save random
 
884
void THD::cleanup_after_query()
 
885
{
 
886
  /*
 
887
    Reset rand_used so that detection of calls to rand() will save random 
804
888
    seeds if needed by the slave.
805
889
  */
806
890
  {
807
891
    /* Forget those values, for next binlogger: */
 
892
    stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
808
893
    auto_inc_intervals_in_cur_stmt_for_binlog.empty();
 
894
    rand_used= 0;
809
895
  }
810
896
  if (first_successful_insert_id_in_cur_stmt > 0)
811
897
  {
812
898
    /* set what LAST_INSERT_ID() will return */
813
 
    first_successful_insert_id_in_prev_stmt= first_successful_insert_id_in_cur_stmt;
 
899
    first_successful_insert_id_in_prev_stmt= 
 
900
      first_successful_insert_id_in_cur_stmt;
814
901
    first_successful_insert_id_in_cur_stmt= 0;
815
902
    substitute_null_with_insert_id= true;
816
903
  }
817
 
  arg_of_last_insert_id_function= false;
 
904
  arg_of_last_insert_id_function= 0;
818
905
  /* Free Items that were created during this execution */
819
906
  free_items();
820
907
  /* Reset where. */
821
 
  where= Session::DEFAULT_WHERE;
822
 
 
823
 
  /* Reset the temporary shares we built */
824
 
  for (std::vector<TableShareInstance *>::iterator iter= temporary_shares.begin();
825
 
       iter != temporary_shares.end(); iter++)
826
 
  {
827
 
    delete *iter;
828
 
  }
829
 
  temporary_shares.clear();
 
908
  where= THD::DEFAULT_WHERE;
830
909
}
831
910
 
 
911
 
832
912
/**
833
913
  Create a LEX_STRING in this connection.
834
914
 
839
919
                              instead of using lex_str value
840
920
  @return  NULL on failure, or pointer to the LEX_STRING object
841
921
*/
842
 
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
843
 
                                     const std::string &str,
844
 
                                     bool allocate_lex_string)
845
 
{
846
 
  return make_lex_string(lex_str, str.c_str(), str.length(), allocate_lex_string);
847
 
}
848
 
 
849
 
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
850
 
                                     const char* str, uint32_t length,
851
 
                                     bool allocate_lex_string)
 
922
LEX_STRING *THD::make_lex_string(LEX_STRING *lex_str,
 
923
                                 const char* str, uint32_t length,
 
924
                                 bool allocate_lex_string)
852
925
{
853
926
  if (allocate_lex_string)
854
927
    if (!(lex_str= (LEX_STRING *)alloc(sizeof(LEX_STRING))))
855
928
      return 0;
856
 
  if (!(lex_str->str= mem_root->strmake_root(str, length)))
 
929
  if (!(lex_str->str= strmake_root(mem_root, str, length)))
857
930
    return 0;
858
931
  lex_str->length= length;
859
932
  return lex_str;
860
933
}
861
934
 
862
 
int Session::send_explain_fields(select_result *result)
 
935
 
 
936
/*
 
937
  Convert a string to another character set
 
938
 
 
939
  SYNOPSIS
 
940
    convert_string()
 
941
    to                          Store new allocated string here
 
942
    to_cs                       New character set for allocated string
 
943
    from                        String to convert
 
944
    from_length                 Length of string to convert
 
945
    from_cs                     Original character set
 
946
 
 
947
  NOTES
 
948
    to will be 0-terminated to make it easy to pass to system funcs
 
949
 
 
950
  RETURN
 
951
    0   ok
 
952
    1   End of memory.
 
953
        In this case to->str will point to 0 and to->length will be 0.
 
954
*/
 
955
 
 
956
bool THD::convert_string(LEX_STRING *to, const CHARSET_INFO * const to_cs,
 
957
                         const char *from, uint32_t from_length,
 
958
                         const CHARSET_INFO * const from_cs)
 
959
{
 
960
  size_t new_length= to_cs->mbmaxlen * from_length;
 
961
  uint32_t dummy_errors;
 
962
  if (!(to->str= (char*) alloc(new_length+1)))
 
963
  {
 
964
    to->length= 0;                              // Safety fix
 
965
    return(1);                          // EOM
 
966
  }
 
967
  to->length= copy_and_convert((char*) to->str, new_length, to_cs,
 
968
                               from, from_length, from_cs, &dummy_errors);
 
969
  to->str[to->length]=0;                        // Safety
 
970
  return(0);
 
971
}
 
972
 
 
973
 
 
974
/*
 
975
  Convert string from source character set to target character set inplace.
 
976
 
 
977
  SYNOPSIS
 
978
    THD::convert_string
 
979
 
 
980
  DESCRIPTION
 
981
    Convert string using convert_buffer - buffer for character set 
 
982
    conversion shared between all protocols.
 
983
 
 
984
  RETURN
 
985
    0   ok
 
986
   !0   out of memory
 
987
*/
 
988
 
 
989
bool THD::convert_string(String *s, const CHARSET_INFO * const from_cs,
 
990
                         const CHARSET_INFO * const to_cs)
 
991
{
 
992
  uint32_t dummy_errors;
 
993
  if (convert_buffer.copy(s->ptr(), s->length(), from_cs, to_cs, &dummy_errors))
 
994
    return true;
 
995
  /* If convert_buffer >> s copying is more efficient long term */
 
996
  if (convert_buffer.alloced_length() >= convert_buffer.length() * 2 ||
 
997
      !s->is_alloced())
 
998
  {
 
999
    return s->copy(convert_buffer);
 
1000
  }
 
1001
  s->swap(convert_buffer);
 
1002
  return false;
 
1003
}
 
1004
 
 
1005
 
 
1006
/*
 
1007
  Update some cache variables when character set changes
 
1008
*/
 
1009
 
 
1010
void THD::update_charset()
 
1011
{
 
1012
  uint32_t not_used;
 
1013
  charset_is_system_charset= !String::needs_conversion(0,charset(),
 
1014
                                                       system_charset_info,
 
1015
                                                       &not_used);
 
1016
  charset_is_collation_connection= 
 
1017
    !String::needs_conversion(0,charset(),variables.collation_connection,
 
1018
                              &not_used);
 
1019
  charset_is_character_set_filesystem= 
 
1020
    !String::needs_conversion(0, charset(),
 
1021
                              variables.character_set_filesystem, &not_used);
 
1022
}
 
1023
 
 
1024
 
 
1025
/* routings to adding tables to list of changed in transaction tables */
 
1026
 
 
1027
inline static void list_include(CHANGED_TableList** prev,
 
1028
                                CHANGED_TableList* curr,
 
1029
                                CHANGED_TableList* new_table)
 
1030
{
 
1031
  if (new_table)
 
1032
  {
 
1033
    *prev = new_table;
 
1034
    (*prev)->next = curr;
 
1035
  }
 
1036
}
 
1037
 
 
1038
/* add table to list of changed in transaction tables */
 
1039
 
 
1040
void THD::add_changed_table(Table *table)
 
1041
{
 
1042
  assert((options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
 
1043
              table->file->has_transactions());
 
1044
  add_changed_table(table->s->table_cache_key.str,
 
1045
                    (long) table->s->table_cache_key.length);
 
1046
  return;
 
1047
}
 
1048
 
 
1049
 
 
1050
void THD::add_changed_table(const char *key, long key_length)
 
1051
{
 
1052
  CHANGED_TableList **prev_changed = &transaction.changed_tables;
 
1053
  CHANGED_TableList *curr = transaction.changed_tables;
 
1054
 
 
1055
  for (; curr; prev_changed = &(curr->next), curr = curr->next)
 
1056
  {
 
1057
    int cmp =  (long)curr->key_length - (long)key_length;
 
1058
    if (cmp < 0)
 
1059
    {
 
1060
      list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
1061
      return;
 
1062
    }
 
1063
    else if (cmp == 0)
 
1064
    {
 
1065
      cmp = memcmp(curr->key, key, curr->key_length);
 
1066
      if (cmp < 0)
 
1067
      {
 
1068
        list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
1069
        return;
 
1070
      }
 
1071
      else if (cmp == 0)
 
1072
      {
 
1073
        return;
 
1074
      }
 
1075
    }
 
1076
  }
 
1077
  *prev_changed = changed_table_dup(key, key_length);
 
1078
  return;
 
1079
}
 
1080
 
 
1081
 
 
1082
CHANGED_TableList* THD::changed_table_dup(const char *key, long key_length)
 
1083
{
 
1084
  CHANGED_TableList* new_table = 
 
1085
    (CHANGED_TableList*) trans_alloc(ALIGN_SIZE(sizeof(CHANGED_TableList))+
 
1086
                                      key_length + 1);
 
1087
  if (!new_table)
 
1088
  {
 
1089
    my_error(EE_OUTOFMEMORY, MYF(ME_BELL),
 
1090
             ALIGN_SIZE(sizeof(TableList)) + key_length + 1);
 
1091
    killed= KILL_CONNECTION;
 
1092
    return 0;
 
1093
  }
 
1094
 
 
1095
  new_table->key= ((char*)new_table)+ ALIGN_SIZE(sizeof(CHANGED_TableList));
 
1096
  new_table->next = 0;
 
1097
  new_table->key_length = key_length;
 
1098
  ::memcpy(new_table->key, key, key_length);
 
1099
  return new_table;
 
1100
}
 
1101
 
 
1102
 
 
1103
int THD::send_explain_fields(select_result *result)
863
1104
{
864
1105
  List<Item> field_list;
865
1106
  Item *item;
894
1135
  }
895
1136
  item->maybe_null= 1;
896
1137
  field_list.push_back(new Item_empty_string("Extra", 255, cs));
897
 
  return (result->send_fields(field_list));
898
 
}
899
 
 
900
 
void select_result::send_error(uint32_t errcode, const char *err)
 
1138
  return (result->send_fields(field_list,
 
1139
                              Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF));
 
1140
}
 
1141
 
 
1142
 
 
1143
struct Item_change_record: public ilink
 
1144
{
 
1145
  Item **place;
 
1146
  Item *old_value;
 
1147
  /* Placement new was hidden by `new' in ilink (TODO: check): */
 
1148
  static void *operator new(size_t size __attribute__((unused)),
 
1149
                            void *mem)
 
1150
    { return mem; }
 
1151
  static void operator delete(void *ptr __attribute__((unused)),
 
1152
                              size_t size __attribute__((unused)))
 
1153
    {}
 
1154
  static void operator delete(void *ptr __attribute__((unused)),
 
1155
                              void *mem __attribute__((unused)))
 
1156
    { /* never called */ }
 
1157
};
 
1158
 
 
1159
 
 
1160
/*
 
1161
  Register an item tree tree transformation, performed by the query
 
1162
  optimizer. We need a pointer to runtime_memroot because it may be !=
 
1163
  thd->mem_root (this may no longer be a true statement)
 
1164
*/
 
1165
 
 
1166
void THD::nocheck_register_item_tree_change(Item **place, Item *old_value,
 
1167
                                            MEM_ROOT *runtime_memroot)
 
1168
{
 
1169
  Item_change_record *change;
 
1170
  /*
 
1171
    Now we use one node per change, which adds some memory overhead,
 
1172
    but still is rather fast as we use alloc_root for allocations.
 
1173
    A list of item tree changes of an average query should be short.
 
1174
  */
 
1175
  void *change_mem= alloc_root(runtime_memroot, sizeof(*change));
 
1176
  if (change_mem == 0)
 
1177
  {
 
1178
    /*
 
1179
      OOM, thd->fatal_error() is called by the error handler of the
 
1180
      memroot. Just return.
 
1181
    */
 
1182
    return;
 
1183
  }
 
1184
  change= new (change_mem) Item_change_record;
 
1185
  change->place= place;
 
1186
  change->old_value= old_value;
 
1187
  change_list.append(change);
 
1188
}
 
1189
 
 
1190
 
 
1191
void THD::rollback_item_tree_changes()
 
1192
{
 
1193
  I_List_iterator<Item_change_record> it(change_list);
 
1194
  Item_change_record *change;
 
1195
 
 
1196
  while ((change= it++))
 
1197
    *change->place= change->old_value;
 
1198
  /* We can forget about changes memory: it's allocated in runtime memroot */
 
1199
  change_list.empty();
 
1200
  return;
 
1201
}
 
1202
 
 
1203
 
 
1204
/*****************************************************************************
 
1205
** Functions to provide a interface to select results
 
1206
*****************************************************************************/
 
1207
 
 
1208
select_result::select_result()
 
1209
{
 
1210
  thd=current_thd;
 
1211
}
 
1212
 
 
1213
void select_result::send_error(uint32_t errcode,const char *err)
901
1214
{
902
1215
  my_message(errcode, err, MYF(0));
903
1216
}
904
1217
 
 
1218
 
 
1219
void select_result::cleanup()
 
1220
{
 
1221
  /* do nothing */
 
1222
}
 
1223
 
 
1224
bool select_result::check_simple_select() const
 
1225
{
 
1226
  my_error(ER_SP_BAD_CURSOR_QUERY, MYF(0));
 
1227
  return true;
 
1228
}
 
1229
 
 
1230
 
 
1231
static String default_line_term("\n",default_charset_info);
 
1232
static String default_escaped("\\",default_charset_info);
 
1233
static String default_field_term("\t",default_charset_info);
 
1234
 
 
1235
sql_exchange::sql_exchange(char *name, bool flag,
 
1236
                           enum enum_filetype filetype_arg)
 
1237
  :file_name(name), opt_enclosed(0), dumpfile(flag), skip_lines(0)
 
1238
{
 
1239
  filetype= filetype_arg;
 
1240
  field_term= &default_field_term;
 
1241
  enclosed=   line_start= &my_empty_string;
 
1242
  line_term=  &default_line_term;
 
1243
  escaped=    &default_escaped;
 
1244
  cs= NULL;
 
1245
}
 
1246
 
 
1247
bool select_send::send_fields(List<Item> &list, uint32_t flags)
 
1248
{
 
1249
  bool res;
 
1250
  if (!(res= thd->protocol->send_fields(&list, flags)))
 
1251
    is_result_set_started= 1;
 
1252
  return res;
 
1253
}
 
1254
 
 
1255
void select_send::abort()
 
1256
{
 
1257
  return;
 
1258
}
 
1259
 
 
1260
 
 
1261
/** 
 
1262
  Cleanup an instance of this class for re-use
 
1263
  at next execution of a prepared statement/
 
1264
  stored procedure statement.
 
1265
*/
 
1266
 
 
1267
void select_send::cleanup()
 
1268
{
 
1269
  is_result_set_started= false;
 
1270
}
 
1271
 
 
1272
/* Send data to client. Returns 0 if ok */
 
1273
 
 
1274
bool select_send::send_data(List<Item> &items)
 
1275
{
 
1276
  if (unit->offset_limit_cnt)
 
1277
  {                                             // using limit offset,count
 
1278
    unit->offset_limit_cnt--;
 
1279
    return 0;
 
1280
  }
 
1281
 
 
1282
  /*
 
1283
    We may be passing the control from mysqld to the client: release the
 
1284
    InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
 
1285
    by thd
 
1286
  */
 
1287
  ha_release_temporary_latches(thd);
 
1288
 
 
1289
  List_iterator_fast<Item> li(items);
 
1290
  Protocol *protocol= thd->protocol;
 
1291
  char buff[MAX_FIELD_WIDTH];
 
1292
  String buffer(buff, sizeof(buff), &my_charset_bin);
 
1293
 
 
1294
  protocol->prepare_for_resend();
 
1295
  Item *item;
 
1296
  while ((item=li++))
 
1297
  {
 
1298
    if (item->send(protocol, &buffer))
 
1299
    {
 
1300
      protocol->free();                         // Free used buffer
 
1301
      my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
 
1302
      break;
 
1303
    }
 
1304
  }
 
1305
  thd->sent_row_count++;
 
1306
  if (thd->is_error())
 
1307
  {
 
1308
    protocol->remove_last_row();
 
1309
    return(1);
 
1310
  }
 
1311
  if (thd->vio_ok())
 
1312
    return(protocol->write());
 
1313
  return(0);
 
1314
}
 
1315
 
 
1316
bool select_send::send_eof()
 
1317
{
 
1318
  /* 
 
1319
    We may be passing the control from mysqld to the client: release the
 
1320
    InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
 
1321
    by thd 
 
1322
  */
 
1323
  ha_release_temporary_latches(thd);
 
1324
 
 
1325
  /* Unlock tables before sending packet to gain some speed */
 
1326
  if (thd->lock)
 
1327
  {
 
1328
    mysql_unlock_tables(thd, thd->lock);
 
1329
    thd->lock=0;
 
1330
  }
 
1331
  ::my_eof(thd);
 
1332
  is_result_set_started= 0;
 
1333
  return false;
 
1334
}
 
1335
 
 
1336
 
905
1337
/************************************************************************
906
1338
  Handling writing to file
907
1339
************************************************************************/
911
1343
  my_message(errcode, err, MYF(0));
912
1344
  if (file > 0)
913
1345
  {
914
 
    (void) end_io_cache(cache);
915
 
    (void) internal::my_close(file, MYF(0));
916
 
    (void) internal::my_delete(path, MYF(0));           // Delete file on error
 
1346
    (void) end_io_cache(&cache);
 
1347
    (void) my_close(file,MYF(0));
 
1348
    (void) my_delete(path,MYF(0));              // Delete file on error
917
1349
    file= -1;
918
1350
  }
919
1351
}
921
1353
 
922
1354
bool select_to_file::send_eof()
923
1355
{
924
 
  int error= test(end_io_cache(cache));
925
 
  if (internal::my_close(file, MYF(MY_WME)))
 
1356
  int error= test(end_io_cache(&cache));
 
1357
  if (my_close(file,MYF(MY_WME)))
926
1358
    error= 1;
927
1359
  if (!error)
928
1360
  {
931
1363
      function, SELECT INTO has to have an own SQLCOM.
932
1364
      TODO: split from SQLCOM_SELECT
933
1365
    */
934
 
    session->my_ok(row_count);
 
1366
    ::my_ok(thd,row_count);
935
1367
  }
936
1368
  file= -1;
937
1369
  return error;
943
1375
  /* In case of error send_eof() may be not called: close the file here. */
944
1376
  if (file >= 0)
945
1377
  {
946
 
    (void) end_io_cache(cache);
947
 
    (void) internal::my_close(file, MYF(0));
 
1378
    (void) end_io_cache(&cache);
 
1379
    (void) my_close(file,MYF(0));
948
1380
    file= -1;
949
1381
  }
950
1382
  path[0]= '\0';
951
1383
  row_count= 0;
952
1384
}
953
1385
 
954
 
select_to_file::select_to_file(file_exchange *ex)
955
 
  : exchange(ex),
956
 
    file(-1),
957
 
    cache(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
958
 
    row_count(0L)
959
 
{
960
 
  path[0]=0;
961
 
}
962
1386
 
963
1387
select_to_file::~select_to_file()
964
1388
{
965
 
  cleanup();
 
1389
  if (file >= 0)
 
1390
  {                                     // This only happens in case of error
 
1391
    (void) end_io_cache(&cache);
 
1392
    (void) my_close(file,MYF(0));
 
1393
    file= -1;
 
1394
  }
966
1395
}
967
1396
 
968
1397
/***************************************************************************
971
1400
 
972
1401
select_export::~select_export()
973
1402
{
974
 
  session->sent_row_count=row_count;
 
1403
  thd->sent_row_count=row_count;
975
1404
}
976
1405
 
977
1406
 
980
1409
 
981
1410
  SYNOPSIS
982
1411
    create_file()
983
 
    session                     Thread handle
 
1412
    thd                 Thread handle
984
1413
    path                File name
985
1414
    exchange            Excange class
986
1415
    cache               IO cache
991
1420
*/
992
1421
 
993
1422
 
994
 
static int create_file(Session *session, char *path, file_exchange *exchange, internal::IO_CACHE *cache)
 
1423
static File create_file(THD *thd, char *path, sql_exchange *exchange,
 
1424
                        IO_CACHE *cache)
995
1425
{
996
 
  int file;
 
1426
  File file;
997
1427
  uint32_t option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
998
1428
 
999
1429
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
1000
1430
  option|= MY_REPLACE_DIR;                      // Force use of db directory
1001
1431
#endif
1002
1432
 
1003
 
  if (!internal::dirname_length(exchange->file_name))
 
1433
  if (!dirname_length(exchange->file_name))
1004
1434
  {
1005
 
    strcpy(path, getDataHomeCatalog().c_str());
1006
 
    strncat(path, "/", 1);
1007
 
    if (! session->db.empty())
1008
 
      strncat(path, session->db.c_str(), FN_REFLEN-getDataHomeCatalog().size());
1009
 
    (void) internal::fn_format(path, exchange->file_name, path, "", option);
 
1435
    strxnmov(path, FN_REFLEN-1, mysql_real_data_home, thd->db ? thd->db : "",
 
1436
             NULL);
 
1437
    (void) fn_format(path, exchange->file_name, path, "", option);
1010
1438
  }
1011
1439
  else
1012
 
    (void) internal::fn_format(path, exchange->file_name, getDataHomeCatalog().c_str(), "", option);
 
1440
    (void) fn_format(path, exchange->file_name, mysql_real_data_home, "", option);
1013
1441
 
1014
 
  if (opt_secure_file_priv)
 
1442
  if (opt_secure_file_priv &&
 
1443
      strncmp(opt_secure_file_priv, path, strlen(opt_secure_file_priv)))
1015
1444
  {
1016
 
    fs::path secure_file_path(fs::system_complete(fs::path(opt_secure_file_priv)));
1017
 
    fs::path target_path(fs::system_complete(fs::path(path)));
1018
 
    if (target_path.file_string().substr(0, secure_file_path.file_string().size()) != secure_file_path.file_string())
1019
 
    {
1020
 
      /* Write only allowed to dir or subdir specified by secure_file_priv */
1021
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1022
 
      return -1;
1023
 
    }
 
1445
    /* Write only allowed to dir or subdir specified by secure_file_priv */
 
1446
    my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
1447
    return -1;
1024
1448
  }
1025
1449
 
1026
1450
  if (!access(path, F_OK))
1029
1453
    return -1;
1030
1454
  }
1031
1455
  /* Create the file world readable */
1032
 
  if ((file= internal::my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
 
1456
  if ((file= my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1033
1457
    return file;
 
1458
#ifdef HAVE_FCHMOD
1034
1459
  (void) fchmod(file, 0666);                    // Because of umask()
1035
 
  if (init_io_cache(cache, file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
 
1460
#else
 
1461
  (void) chmod(path, 0666);
 
1462
#endif
 
1463
  if (init_io_cache(cache, file, 0L, WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1036
1464
  {
1037
 
    internal::my_close(file, MYF(0));
1038
 
    internal::my_delete(path, MYF(0));  // Delete file on error, it was just created
 
1465
    my_close(file, MYF(0));
 
1466
    my_delete(path, MYF(0));  // Delete file on error, it was just created 
1039
1467
    return -1;
1040
1468
  }
1041
1469
  return file;
1043
1471
 
1044
1472
 
1045
1473
int
1046
 
select_export::prepare(List<Item> &list, Select_Lex_Unit *u)
 
1474
select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
1047
1475
{
1048
1476
  bool blob_flag=0;
1049
1477
  bool string_results= false, non_string_results= false;
1050
1478
  unit= u;
1051
 
  if ((uint32_t) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
1052
 
    strncpy(path,exchange->file_name,FN_REFLEN-1);
 
1479
  if ((uint) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
 
1480
    strmake(path,exchange->file_name,FN_REFLEN-1);
1053
1481
 
 
1482
  if ((file= create_file(thd, path, exchange, &cache)) < 0)
 
1483
    return 1;
1054
1484
  /* Check if there is any blobs in data */
1055
1485
  {
1056
1486
    List_iterator_fast<Item> li(list);
1092
1522
      (exchange->opt_enclosed && non_string_results &&
1093
1523
       field_term_length && strchr(NUMERIC_CHARS, field_term_char)))
1094
1524
  {
1095
 
    my_error(ER_AMBIGUOUS_FIELD_TERM, MYF(0));
1096
 
    return 1;
 
1525
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1526
                 ER_AMBIGUOUS_FIELD_TERM, ER(ER_AMBIGUOUS_FIELD_TERM));
 
1527
    is_ambiguous_field_term= true;
1097
1528
  }
1098
 
 
1099
 
  if ((file= create_file(session, path, exchange, cache)) < 0)
1100
 
    return 1;
 
1529
  else
 
1530
    is_ambiguous_field_term= false;
1101
1531
 
1102
1532
  return 0;
1103
1533
}
1104
1534
 
 
1535
 
 
1536
#define NEED_ESCAPING(x) ((int) (unsigned char) (x) == escape_char    || \
 
1537
                          (enclosed ? (int) (unsigned char) (x) == field_sep_char      \
 
1538
                                    : (int) (unsigned char) (x) == field_term_char) || \
 
1539
                          (int) (unsigned char) (x) == line_sep_char  || \
 
1540
                          !(x))
 
1541
 
1105
1542
bool select_export::send_data(List<Item> &items)
1106
1543
{
1107
1544
  char buff[MAX_FIELD_WIDTH],null_buff[2],space[MAX_FIELD_WIDTH];
1119
1556
  uint32_t used_length=0,items_left=items.elements;
1120
1557
  List_iterator_fast<Item> li(items);
1121
1558
 
1122
 
  if (my_b_write(cache,(unsigned char*) exchange->line_start->ptr(),
1123
 
                 exchange->line_start->length()))
 
1559
  if (my_b_write(&cache,(unsigned char*) exchange->line_start->ptr(),
 
1560
                 exchange->line_start->length()))
1124
1561
    goto err;
1125
1562
  while ((item=li++))
1126
1563
  {
1130
1567
    res=item->str_result(&tmp);
1131
1568
    if (res && enclosed)
1132
1569
    {
1133
 
      if (my_b_write(cache,(unsigned char*) exchange->enclosed->ptr(),
1134
 
                     exchange->enclosed->length()))
1135
 
        goto err;
 
1570
      if (my_b_write(&cache,(unsigned char*) exchange->enclosed->ptr(),
 
1571
                     exchange->enclosed->length()))
 
1572
        goto err;
1136
1573
    }
1137
1574
    if (!res)
1138
1575
    {                                           // NULL
1139
1576
      if (!fixed_row_size)
1140
1577
      {
1141
 
        if (escape_char != -1)                  // Use \N syntax
1142
 
        {
1143
 
          null_buff[0]=escape_char;
1144
 
          null_buff[1]='N';
1145
 
          if (my_b_write(cache,(unsigned char*) null_buff,2))
1146
 
            goto err;
1147
 
        }
1148
 
        else if (my_b_write(cache,(unsigned char*) "NULL",4))
1149
 
          goto err;
 
1578
        if (escape_char != -1)                  // Use \N syntax
 
1579
        {
 
1580
          null_buff[0]=escape_char;
 
1581
          null_buff[1]='N';
 
1582
          if (my_b_write(&cache,(unsigned char*) null_buff,2))
 
1583
            goto err;
 
1584
        }
 
1585
        else if (my_b_write(&cache,(unsigned char*) "NULL",4))
 
1586
          goto err;
1150
1587
      }
1151
1588
      else
1152
1589
      {
1153
 
        used_length=0;                          // Fill with space
 
1590
        used_length=0;                          // Fill with space
1154
1591
      }
1155
1592
    }
1156
1593
    else
1157
1594
    {
1158
1595
      if (fixed_row_size)
1159
 
        used_length= min(res->length(),item->max_length);
 
1596
        used_length=cmin(res->length(),item->max_length);
1160
1597
      else
1161
 
        used_length= res->length();
1162
 
 
 
1598
        used_length=res->length();
1163
1599
      if ((result_type == STRING_RESULT || is_unsafe_field_sep) &&
1164
 
          escape_char != -1)
 
1600
           escape_char != -1)
1165
1601
      {
1166
1602
        char *pos, *start, *end;
1167
1603
        const CHARSET_INFO * const res_charset= res->charset();
1168
 
        const CHARSET_INFO * const character_set_client= default_charset_info;
1169
 
 
 
1604
        const CHARSET_INFO * const character_set_client= thd->variables.
 
1605
                                                            character_set_client;
1170
1606
        bool check_second_byte= (res_charset == &my_charset_bin) &&
1171
 
          character_set_client->
1172
 
          escape_with_backslash_is_dangerous;
 
1607
                                 character_set_client->
 
1608
                                 escape_with_backslash_is_dangerous;
1173
1609
        assert(character_set_client->mbmaxlen == 2 ||
1174
 
               !character_set_client->escape_with_backslash_is_dangerous);
1175
 
        for (start=pos=(char*) res->ptr(),end=pos+used_length ;
1176
 
             pos != end ;
1177
 
             pos++)
1178
 
        {
1179
 
          if (use_mb(res_charset))
1180
 
          {
1181
 
            int l;
1182
 
            if ((l=my_ismbchar(res_charset, pos, end)))
1183
 
            {
1184
 
              pos += l-1;
1185
 
              continue;
1186
 
            }
1187
 
          }
 
1610
                    !character_set_client->escape_with_backslash_is_dangerous);
 
1611
        for (start=pos=(char*) res->ptr(),end=pos+used_length ;
 
1612
             pos != end ;
 
1613
             pos++)
 
1614
        {
 
1615
#ifdef USE_MB
 
1616
          if (use_mb(res_charset))
 
1617
          {
 
1618
            int l;
 
1619
            if ((l=my_ismbchar(res_charset, pos, end)))
 
1620
            {
 
1621
              pos += l-1;
 
1622
              continue;
 
1623
            }
 
1624
          }
 
1625
#endif
1188
1626
 
1189
1627
          /*
1190
1628
            Special case when dumping BINARY/VARBINARY/BLOB values
1191
1629
            for the clients with character sets big5, cp932, gbk and sjis,
1192
1630
            which can have the escape character (0x5C "\" by default)
1193
1631
            as the second byte of a multi-byte sequence.
1194
 
 
 
1632
            
1195
1633
            If
1196
1634
            - pos[0] is a valid multi-byte head (e.g 0xEE) and
1197
1635
            - pos[1] is 0x00, which will be escaped as "\0",
1198
 
 
 
1636
            
1199
1637
            then we'll get "0xEE + 0x5C + 0x30" in the output file.
1200
 
 
 
1638
            
1201
1639
            If this file is later loaded using this sequence of commands:
1202
 
 
 
1640
            
1203
1641
            mysql> create table t1 (a varchar(128)) character set big5;
1204
1642
            mysql> LOAD DATA INFILE 'dump.txt' INTO Table t1;
1205
 
 
 
1643
            
1206
1644
            then 0x5C will be misinterpreted as the second byte
1207
1645
            of a multi-byte character "0xEE + 0x5C", instead of
1208
1646
            escape character for 0x00.
1209
 
 
 
1647
            
1210
1648
            To avoid this confusion, we'll escape the multi-byte
1211
1649
            head character too, so the sequence "0xEE + 0x00" will be
1212
1650
            dumped as "0x5C + 0xEE + 0x5C + 0x30".
1213
 
 
 
1651
            
1214
1652
            Note, in the condition below we only check if
1215
1653
            mbcharlen is equal to 2, because there are no
1216
1654
            character sets with mbmaxlen longer than 2
1218
1656
            assert before the loop makes that sure.
1219
1657
          */
1220
1658
 
1221
 
          if ((needs_escaping(*pos, enclosed) ||
 
1659
          if ((NEED_ESCAPING(*pos) ||
1222
1660
               (check_second_byte &&
1223
1661
                my_mbcharlen(character_set_client, (unsigned char) *pos) == 2 &&
1224
1662
                pos + 1 < end &&
1225
 
                needs_escaping(pos[1], enclosed))) &&
 
1663
                NEED_ESCAPING(pos[1]))) &&
1226
1664
              /*
1227
 
                Don't escape field_term_char by doubling - doubling is only
1228
 
                valid for ENCLOSED BY characters:
 
1665
               Don't escape field_term_char by doubling - doubling is only
 
1666
               valid for ENCLOSED BY characters:
1229
1667
              */
1230
1668
              (enclosed || !is_ambiguous_field_term ||
1231
1669
               (int) (unsigned char) *pos != field_term_char))
1232
1670
          {
1233
 
            char tmp_buff[2];
 
1671
            char tmp_buff[2];
1234
1672
            tmp_buff[0]= ((int) (unsigned char) *pos == field_sep_char &&
1235
1673
                          is_ambiguous_field_sep) ?
1236
 
              field_sep_char : escape_char;
1237
 
            tmp_buff[1]= *pos ? *pos : '0';
1238
 
            if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1239
 
                my_b_write(cache,(unsigned char*) tmp_buff,2))
1240
 
              goto err;
1241
 
            start=pos+1;
1242
 
          }
1243
 
        }
1244
 
        if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1245
 
          goto err;
 
1674
                          field_sep_char : escape_char;
 
1675
            tmp_buff[1]= *pos ? *pos : '0';
 
1676
            if (my_b_write(&cache,(unsigned char*) start,(uint) (pos-start)) ||
 
1677
                my_b_write(&cache,(unsigned char*) tmp_buff,2))
 
1678
              goto err;
 
1679
            start=pos+1;
 
1680
          }
 
1681
        }
 
1682
        if (my_b_write(&cache,(unsigned char*) start,(uint) (pos-start)))
 
1683
          goto err;
1246
1684
      }
1247
 
      else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1248
 
        goto err;
 
1685
      else if (my_b_write(&cache,(unsigned char*) res->ptr(),used_length))
 
1686
        goto err;
1249
1687
    }
1250
1688
    if (fixed_row_size)
1251
1689
    {                                           // Fill with space
1252
1690
      if (item->max_length > used_length)
1253
1691
      {
1254
 
        /* QQ:  Fix by adding a my_b_fill() function */
1255
 
        if (!space_inited)
1256
 
        {
1257
 
          space_inited=1;
1258
 
          memset(space, ' ', sizeof(space));
1259
 
        }
1260
 
        uint32_t length=item->max_length-used_length;
1261
 
        for (; length > sizeof(space) ; length-=sizeof(space))
1262
 
        {
1263
 
          if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1264
 
            goto err;
1265
 
        }
1266
 
        if (my_b_write(cache,(unsigned char*) space,length))
1267
 
          goto err;
 
1692
        /* QQ:  Fix by adding a my_b_fill() function */
 
1693
        if (!space_inited)
 
1694
        {
 
1695
          space_inited=1;
 
1696
          memset(space, ' ', sizeof(space));
 
1697
        }
 
1698
        uint32_t length=item->max_length-used_length;
 
1699
        for (; length > sizeof(space) ; length-=sizeof(space))
 
1700
        {
 
1701
          if (my_b_write(&cache,(unsigned char*) space,sizeof(space)))
 
1702
            goto err;
 
1703
        }
 
1704
        if (my_b_write(&cache,(unsigned char*) space,length))
 
1705
          goto err;
1268
1706
      }
1269
1707
    }
1270
1708
    if (res && enclosed)
1271
1709
    {
1272
 
      if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
 
1710
      if (my_b_write(&cache, (unsigned char*) exchange->enclosed->ptr(),
1273
1711
                     exchange->enclosed->length()))
1274
1712
        goto err;
1275
1713
    }
1276
1714
    if (--items_left)
1277
1715
    {
1278
 
      if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
 
1716
      if (my_b_write(&cache, (unsigned char*) exchange->field_term->ptr(),
1279
1717
                     field_term_length))
1280
1718
        goto err;
1281
1719
    }
1282
1720
  }
1283
 
  if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1284
 
                 exchange->line_term->length()))
 
1721
  if (my_b_write(&cache,(unsigned char*) exchange->line_term->ptr(),
 
1722
                 exchange->line_term->length()))
1285
1723
    goto err;
1286
1724
  return(0);
1287
1725
err:
1295
1733
 
1296
1734
 
1297
1735
int
1298
 
select_dump::prepare(List<Item> &, Select_Lex_Unit *u)
 
1736
select_dump::prepare(List<Item> &list __attribute__((unused)),
 
1737
                     SELECT_LEX_UNIT *u)
1299
1738
{
1300
1739
  unit= u;
1301
 
  return (int) ((file= create_file(session, path, exchange, cache)) < 0);
 
1740
  return (int) ((file= create_file(thd, path, exchange, &cache)) < 0);
1302
1741
}
1303
1742
 
1304
1743
 
1315
1754
    unit->offset_limit_cnt--;
1316
1755
    return(0);
1317
1756
  }
1318
 
  if (row_count++ > 1)
 
1757
  if (row_count++ > 1) 
1319
1758
  {
1320
1759
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
1321
1760
    goto err;
1325
1764
    res=item->str_result(&tmp);
1326
1765
    if (!res)                                   // If NULL
1327
1766
    {
1328
 
      if (my_b_write(cache,(unsigned char*) "",1))
 
1767
      if (my_b_write(&cache,(unsigned char*) "",1))
1329
1768
        goto err;
1330
1769
    }
1331
 
    else if (my_b_write(cache,(unsigned char*) res->ptr(),res->length()))
 
1770
    else if (my_b_write(&cache,(unsigned char*) res->ptr(),res->length()))
1332
1771
    {
1333
 
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, errno);
 
1772
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, my_errno);
1334
1773
      goto err;
1335
1774
    }
1336
1775
  }
1371
1810
void select_max_min_finder_subselect::cleanup()
1372
1811
{
1373
1812
  cache= 0;
 
1813
  return;
1374
1814
}
1375
1815
 
1376
1816
 
1477
1917
     sortcmp(val1, val2, cache->collation.collation) < 0);
1478
1918
}
1479
1919
 
1480
 
bool select_exists_subselect::send_data(List<Item> &)
 
1920
bool select_exists_subselect::send_data(List<Item> &items __attribute__((unused)))
1481
1921
{
1482
1922
  Item_exists_subselect *it= (Item_exists_subselect *)item;
1483
1923
  if (unit->offset_limit_cnt)
1490
1930
  return(0);
1491
1931
}
1492
1932
 
 
1933
 
 
1934
/***************************************************************************
 
1935
  Dump of select to variables
 
1936
***************************************************************************/
 
1937
 
 
1938
int select_dumpvar::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
 
1939
{
 
1940
  unit= u;
 
1941
  
 
1942
  if (var_list.elements != list.elements)
 
1943
  {
 
1944
    my_message(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT,
 
1945
               ER(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT), MYF(0));
 
1946
    return 1;
 
1947
  }               
 
1948
  return 0;
 
1949
}
 
1950
 
 
1951
 
 
1952
bool select_dumpvar::check_simple_select() const
 
1953
{
 
1954
  my_error(ER_SP_BAD_CURSOR_SELECT, MYF(0));
 
1955
  return true;
 
1956
}
 
1957
 
 
1958
 
 
1959
void select_dumpvar::cleanup()
 
1960
{
 
1961
  row_count= 0;
 
1962
}
 
1963
 
 
1964
 
 
1965
void Query_arena::free_items()
 
1966
{
 
1967
  Item *next;
 
1968
  /* This works because items are allocated with sql_alloc() */
 
1969
  for (; free_list; free_list= next)
 
1970
  {
 
1971
    next= free_list->next;
 
1972
    free_list->delete_self();
 
1973
  }
 
1974
  /* Postcondition: free_list is 0 */
 
1975
  return;
 
1976
}
 
1977
 
 
1978
 
 
1979
/*
 
1980
  Statement functions
 
1981
*/
 
1982
 
 
1983
Statement::Statement(LEX *lex_arg, MEM_ROOT *mem_root_arg, ulong id_arg)
 
1984
  :Query_arena(mem_root_arg),
 
1985
  id(id_arg),
 
1986
  mark_used_columns(MARK_COLUMNS_READ),
 
1987
  lex(lex_arg),
 
1988
  query(0),
 
1989
  query_length(0),
 
1990
  db(NULL),
 
1991
  db_length(0)
 
1992
{
 
1993
}
 
1994
 
 
1995
 
1493
1996
/*
1494
1997
  Don't free mem_root, as mem_root is freed in the end of dispatch_command
1495
1998
  (once for any command).
1496
1999
*/
1497
 
void Session::end_statement()
 
2000
void THD::end_statement()
1498
2001
{
1499
2002
  /* Cleanup SQL processing state to reuse this statement in next query. */
1500
2003
  lex_end(lex);
1501
 
  query_cache_key= ""; // reset the cache key
1502
 
  resetResultsetMessage();
1503
2004
}
1504
2005
 
1505
 
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
 
2006
 
 
2007
bool THD::copy_db_to(char **p_db, size_t *p_db_length)
1506
2008
{
1507
 
  if (db.empty())
 
2009
  if (db == NULL)
1508
2010
  {
1509
2011
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1510
2012
    return true;
1511
2013
  }
1512
 
  *p_db= strmake(db.c_str(), db.length());
1513
 
  *p_db_length= db.length();
 
2014
  *p_db= strmake(db, db_length);
 
2015
  *p_db_length= db_length;
1514
2016
  return false;
1515
2017
}
1516
2018
 
 
2019
 
 
2020
bool select_dumpvar::send_data(List<Item> &items)
 
2021
{
 
2022
  List_iterator_fast<my_var> var_li(var_list);
 
2023
  List_iterator<Item> it(items);
 
2024
  Item *item;
 
2025
  my_var *mv;
 
2026
 
 
2027
  if (unit->offset_limit_cnt)
 
2028
  {                                             // using limit offset,count
 
2029
    unit->offset_limit_cnt--;
 
2030
    return(0);
 
2031
  }
 
2032
  if (row_count++) 
 
2033
  {
 
2034
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
 
2035
    return(1);
 
2036
  }
 
2037
  while ((mv= var_li++) && (item= it++))
 
2038
  {
 
2039
    if (mv->local == 0)
 
2040
    {
 
2041
      Item_func_set_user_var *suv= new Item_func_set_user_var(mv->s, item);
 
2042
      suv->fix_fields(thd, 0);
 
2043
      suv->check(0);
 
2044
      suv->update();
 
2045
    }
 
2046
  }
 
2047
  return(thd->is_error());
 
2048
}
 
2049
 
 
2050
bool select_dumpvar::send_eof()
 
2051
{
 
2052
  if (! row_count)
 
2053
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
2054
                 ER_SP_FETCH_NO_DATA, ER(ER_SP_FETCH_NO_DATA));
 
2055
  /*
 
2056
    In order to remember the value of affected rows for ROW_COUNT()
 
2057
    function, SELECT INTO has to have an own SQLCOM.
 
2058
    TODO: split from SQLCOM_SELECT
 
2059
  */
 
2060
  ::my_ok(thd,row_count);
 
2061
  return 0;
 
2062
}
 
2063
 
1517
2064
/****************************************************************************
1518
 
  Tmp_Table_Param
 
2065
  TMP_TABLE_PARAM
1519
2066
****************************************************************************/
1520
2067
 
1521
 
void Tmp_Table_Param::init()
 
2068
void TMP_TABLE_PARAM::init()
1522
2069
{
1523
2070
  field_count= sum_func_count= func_count= hidden_field_count= 0;
1524
2071
  group_parts= group_length= group_null_parts= 0;
1525
2072
  quick_group= 1;
1526
2073
  table_charset= 0;
1527
2074
  precomputed_group_by= 0;
 
2075
  bit_fields_as_long= 0;
 
2076
  return;
1528
2077
}
1529
2078
 
1530
 
void Tmp_Table_Param::cleanup(void)
 
2079
 
 
2080
void thd_increment_bytes_sent(ulong length)
1531
2081
{
1532
 
  /* Fix for Intel compiler */
1533
 
  if (copy_field)
1534
 
  {
1535
 
    delete [] copy_field;
1536
 
    save_copy_field= copy_field= 0;
 
2082
  THD *thd=current_thd;
 
2083
  if (likely(thd != 0))
 
2084
  { /* current_thd==0 when close_connection() calls net_send_error() */
 
2085
    thd->status_var.bytes_sent+= length;
1537
2086
  }
1538
2087
}
1539
2088
 
1540
 
void Session::send_kill_message() const
 
2089
 
 
2090
void thd_increment_bytes_received(ulong length)
 
2091
{
 
2092
  current_thd->status_var.bytes_received+= length;
 
2093
}
 
2094
 
 
2095
 
 
2096
void thd_increment_net_big_packet_count(ulong length)
 
2097
{
 
2098
  current_thd->status_var.net_big_packet_count+= length;
 
2099
}
 
2100
 
 
2101
void THD::send_kill_message() const
1541
2102
{
1542
2103
  int err= killed_errno();
1543
2104
  if (err)
1544
2105
    my_message(err, ER(err), MYF(0));
1545
2106
}
1546
2107
 
1547
 
void Session::set_status_var_init()
 
2108
void THD::set_status_var_init()
1548
2109
{
1549
2110
  memset(&status_var, 0, sizeof(status_var));
1550
2111
}
1551
2112
 
1552
2113
 
1553
 
bool Session::set_db(const std::string &new_db)
1554
 
{
1555
 
  /* Do not reallocate memory if current chunk is big enough. */
1556
 
  if (new_db.length())
1557
 
    db= new_db;
1558
 
  else
1559
 
    db.clear();
1560
 
 
1561
 
  return false;
1562
 
}
1563
 
 
1564
 
 
1565
 
 
 
2114
void Security_context::init()
 
2115
{
 
2116
  user= ip= 0;
 
2117
}
 
2118
 
 
2119
 
 
2120
void Security_context::destroy()
 
2121
{
 
2122
  // If not pointer to constant
 
2123
  if (user)
 
2124
  {
 
2125
    free(user);
 
2126
    user= NULL;
 
2127
  }
 
2128
  if (ip)
 
2129
  {
 
2130
    free(ip);
 
2131
    ip= NULL;
 
2132
  }
 
2133
}
 
2134
 
 
2135
 
 
2136
void Security_context::skip_grants()
 
2137
{
 
2138
  /* privileges for the user are unknown everything is allowed */
 
2139
}
 
2140
 
 
2141
 
 
2142
/****************************************************************************
 
2143
  Handling of open and locked tables states.
 
2144
 
 
2145
  This is used when we want to open/lock (and then close) some tables when
 
2146
  we already have a set of tables open and locked. We use these methods for
 
2147
  access to mysql.proc table to find definitions of stored routines.
 
2148
****************************************************************************/
 
2149
 
 
2150
void THD::reset_n_backup_open_tables_state(Open_tables_state *backup)
 
2151
{
 
2152
  backup->set_open_tables_state(this);
 
2153
  reset_open_tables_state();
 
2154
  state_flags|= Open_tables_state::BACKUPS_AVAIL;
 
2155
  return;
 
2156
}
 
2157
 
 
2158
 
 
2159
void THD::restore_backup_open_tables_state(Open_tables_state *backup)
 
2160
{
 
2161
  /*
 
2162
    Before we will throw away current open tables state we want
 
2163
    to be sure that it was properly cleaned up.
 
2164
  */
 
2165
  assert(open_tables == 0 && temporary_tables == 0 &&
 
2166
              handler_tables == 0 && derived_tables == 0 &&
 
2167
              lock == 0 && locked_tables == 0);
 
2168
  set_open_tables_state(backup);
 
2169
  return;
 
2170
}
1566
2171
 
1567
2172
/**
1568
2173
  Check the killed state of a user thread
1569
 
  @param session  user thread
 
2174
  @param thd  user thread
1570
2175
  @retval 0 the user thread is active
1571
2176
  @retval 1 the user thread has been killed
1572
2177
*/
1573
 
int session_killed(const Session *session)
1574
 
{
1575
 
  return(session->killed);
1576
 
}
1577
 
 
1578
 
 
1579
 
const struct charset_info_st *session_charset(Session *session)
1580
 
{
1581
 
  return(session->charset());
1582
 
}
 
2178
extern "C" int thd_killed(const DRIZZLE_THD thd)
 
2179
{
 
2180
  return(thd->killed);
 
2181
}
 
2182
 
 
2183
/**
 
2184
  Return the thread id of a user thread
 
2185
  @param thd user thread
 
2186
  @return thread id
 
2187
*/
 
2188
extern "C" unsigned long thd_get_thread_id(const DRIZZLE_THD thd)
 
2189
{
 
2190
  return((unsigned long)thd->thread_id);
 
2191
}
 
2192
 
 
2193
 
 
2194
#ifdef INNODB_COMPATIBILITY_HOOKS
 
2195
extern "C" const struct charset_info_st *thd_charset(DRIZZLE_THD thd)
 
2196
{
 
2197
  return(thd->charset());
 
2198
}
 
2199
 
 
2200
extern "C" char **thd_query(DRIZZLE_THD thd)
 
2201
{
 
2202
  return(&thd->query);
 
2203
}
 
2204
 
 
2205
extern "C" int thd_slave_thread(const DRIZZLE_THD thd)
 
2206
{
 
2207
  return(thd->slave_thread);
 
2208
}
 
2209
 
 
2210
extern "C" int thd_non_transactional_update(const DRIZZLE_THD thd)
 
2211
{
 
2212
  return(thd->transaction.all.modified_non_trans_table);
 
2213
}
 
2214
 
 
2215
extern "C" int thd_binlog_format(const DRIZZLE_THD thd)
 
2216
{
 
2217
  return (int) thd->variables.binlog_format;
 
2218
}
 
2219
 
 
2220
extern "C" void thd_mark_transaction_to_rollback(DRIZZLE_THD thd, bool all)
 
2221
{
 
2222
  mark_transaction_to_rollback(thd, all);
 
2223
}
 
2224
#endif // INNODB_COMPATIBILITY_HOOKS */
 
2225
 
1583
2226
 
1584
2227
/**
1585
2228
  Mark transaction to rollback and mark error as fatal to a sub-statement.
1586
2229
 
1587
 
  @param  session   Thread handle
 
2230
  @param  thd   Thread handle
1588
2231
  @param  all   true <=> rollback main transaction.
1589
2232
*/
1590
 
void mark_transaction_to_rollback(Session *session, bool all)
1591
 
{
1592
 
  if (session)
1593
 
  {
1594
 
    session->is_fatal_sub_stmt_error= true;
1595
 
    session->transaction_rollback_request= all;
1596
 
  }
1597
 
}
1598
 
 
1599
 
void Session::disconnect(uint32_t errcode, bool should_lock)
1600
 
{
1601
 
  /* Allow any plugins to cleanup their session variables */
1602
 
  plugin_sessionvar_cleanup(this);
1603
 
 
1604
 
  /* If necessary, log any aborted or unauthorized connections */
1605
 
  if (killed || client->wasAborted())
1606
 
  {
1607
 
    status_var.aborted_threads++;
1608
 
  }
1609
 
 
1610
 
  if (client->wasAborted())
1611
 
  {
1612
 
    if (! killed && variables.log_warnings > 1)
1613
 
    {
1614
 
      SecurityContext *sctx= &security_ctx;
1615
 
 
1616
 
      errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1617
 
                  , thread_id
1618
 
                  , (db.empty() ? "unconnected" : db.c_str())
1619
 
                  , sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1620
 
                  , sctx->getIp().c_str()
1621
 
                  , (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1622
 
    }
1623
 
  }
1624
 
 
1625
 
  /* Close out our connection to the client */
1626
 
  if (should_lock)
1627
 
    LOCK_thread_count.lock();
1628
 
  killed= Session::KILL_CONNECTION;
1629
 
  if (client->isConnected())
1630
 
  {
1631
 
    if (errcode)
1632
 
    {
1633
 
      /*my_error(errcode, ER(errcode));*/
1634
 
      client->sendError(errcode, ER(errcode));
1635
 
    }
1636
 
    client->close();
1637
 
  }
1638
 
  if (should_lock)
1639
 
    (void) LOCK_thread_count.unlock();
1640
 
}
1641
 
 
1642
 
void Session::reset_for_next_command()
1643
 
{
1644
 
  free_list= 0;
1645
 
  select_number= 1;
1646
 
  /*
1647
 
    Those two lines below are theoretically unneeded as
1648
 
    Session::cleanup_after_query() should take care of this already.
1649
 
  */
1650
 
  auto_inc_intervals_in_cur_stmt_for_binlog.empty();
1651
 
 
1652
 
  is_fatal_error= false;
1653
 
  server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1654
 
                          SERVER_QUERY_NO_INDEX_USED |
1655
 
                          SERVER_QUERY_NO_GOOD_INDEX_USED);
1656
 
 
1657
 
  clear_error();
1658
 
  main_da.reset_diagnostics_area();
1659
 
  total_warn_count=0;                   // Warnings for this query
1660
 
  sent_row_count= examined_row_count= 0;
1661
 
}
1662
 
 
1663
 
/*
1664
 
  Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1665
 
*/
1666
 
 
1667
 
void Session::close_temporary_tables()
1668
 
{
1669
 
  Table *table;
1670
 
  Table *tmp_next;
1671
 
 
1672
 
  if (not temporary_tables)
1673
 
    return;
1674
 
 
1675
 
  for (table= temporary_tables; table; table= tmp_next)
1676
 
  {
1677
 
    tmp_next= table->getNext();
1678
 
    nukeTable(table);
1679
 
  }
1680
 
  temporary_tables= NULL;
1681
 
}
1682
 
 
1683
 
/*
1684
 
  unlink from session->temporary tables and close temporary table
1685
 
*/
1686
 
 
1687
 
void Session::close_temporary_table(Table *table)
1688
 
{
1689
 
  if (table->getPrev())
1690
 
  {
1691
 
    table->getPrev()->setNext(table->getNext());
1692
 
    if (table->getPrev()->getNext())
1693
 
    {
1694
 
      table->getNext()->setPrev(table->getPrev());
1695
 
    }
1696
 
  }
1697
 
  else
1698
 
  {
1699
 
    /* removing the item from the list */
1700
 
    assert(table == temporary_tables);
1701
 
    /*
1702
 
      slave must reset its temporary list pointer to zero to exclude
1703
 
      passing non-zero value to end_slave via rli->save_temporary_tables
1704
 
      when no temp tables opened, see an invariant below.
1705
 
    */
1706
 
    temporary_tables= table->getNext();
1707
 
    if (temporary_tables)
1708
 
    {
1709
 
      table->getNext()->setPrev(NULL);
1710
 
    }
1711
 
  }
1712
 
  nukeTable(table);
1713
 
}
1714
 
 
1715
 
/*
1716
 
  Close and drop a temporary table
1717
 
 
1718
 
  NOTE
1719
 
  This dosn't unlink table from session->temporary
1720
 
  If this is needed, use close_temporary_table()
1721
 
*/
1722
 
 
1723
 
void Session::nukeTable(Table *table)
1724
 
{
1725
 
  plugin::StorageEngine *table_type= table->getShare()->db_type();
1726
 
 
1727
 
  table->free_io_cache();
1728
 
  table->delete_table();
1729
 
 
1730
 
  TableIdentifier identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1731
 
  rm_temporary_table(table_type, identifier);
1732
 
 
1733
 
  delete table->getMutableShare();
1734
 
 
1735
 
  /* This makes me sad, but we're allocating it via malloc */
1736
 
  delete table;
1737
 
}
1738
 
 
1739
 
/** Clear most status variables. */
1740
 
extern time_t flush_status_time;
1741
 
 
1742
 
void Session::refresh_status()
1743
 
{
1744
 
  /* Reset thread's status variables */
1745
 
  memset(&status_var, 0, sizeof(status_var));
1746
 
 
1747
 
  flush_status_time= time((time_t*) 0);
1748
 
  current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1749
 
  current_global_counters.connections= 0;
1750
 
}
1751
 
 
1752
 
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1753
 
{
1754
 
  user_var_entry *entry= NULL;
1755
 
  UserVarsRange ppp= user_vars.equal_range(std::string(name.str, name.length));
1756
 
 
1757
 
  for (UserVars::iterator iter= ppp.first;
1758
 
         iter != ppp.second; ++iter)
1759
 
  {
1760
 
    entry= (*iter).second;
1761
 
  }
1762
 
 
1763
 
  if ((entry == NULL) && create_if_not_exists)
1764
 
  {
1765
 
    entry= new (nothrow) user_var_entry(name.str, query_id);
1766
 
 
1767
 
    if (entry == NULL)
1768
 
      return NULL;
1769
 
 
1770
 
    std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(std::string(name.str, name.length), entry));
1771
 
 
1772
 
    if (not returnable.second)
1773
 
    {
1774
 
      delete entry;
1775
 
      return NULL;
1776
 
    }
1777
 
  }
1778
 
 
1779
 
  return entry;
1780
 
}
1781
 
 
1782
 
void Session::mark_temp_tables_as_free_for_reuse()
1783
 
{
1784
 
  for (Table *table= temporary_tables ; table ; table= table->getNext())
1785
 
  {
1786
 
    if (table->query_id == query_id)
1787
 
    {
1788
 
      table->query_id= 0;
1789
 
      table->cursor->ha_reset();
1790
 
    }
1791
 
  }
1792
 
}
1793
 
 
1794
 
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1795
 
{
1796
 
  for (; table ; table= table->getNext())
1797
 
  {
1798
 
    if (table->query_id == query_id)
1799
 
    {
1800
 
      table->query_id= 0;
1801
 
      table->cursor->ha_reset();
1802
 
    }
1803
 
  }
1804
 
}
1805
 
 
1806
 
/*
1807
 
  Unlocks tables and frees derived tables.
1808
 
  Put all normal tables used by thread in free list.
1809
 
 
1810
 
  It will only close/mark as free for reuse tables opened by this
1811
 
  substatement, it will also check if we are closing tables after
1812
 
  execution of complete query (i.e. we are on upper level) and will
1813
 
  leave prelocked mode if needed.
1814
 
*/
1815
 
void Session::close_thread_tables()
1816
 
{
1817
 
  if (derived_tables)
1818
 
    derived_tables= NULL; // They should all be invalid by this point
1819
 
 
1820
 
  /*
1821
 
    Mark all temporary tables used by this statement as free for reuse.
1822
 
  */
1823
 
  mark_temp_tables_as_free_for_reuse();
1824
 
  /*
1825
 
    Let us commit transaction for statement. Since in 5.0 we only have
1826
 
    one statement transaction and don't allow several nested statement
1827
 
    transactions this call will do nothing if we are inside of stored
1828
 
    function or trigger (i.e. statement transaction is already active and
1829
 
    does not belong to statement for which we do close_thread_tables()).
1830
 
    TODO: This should be fixed in later releases.
1831
 
   */
1832
 
  {
1833
 
    TransactionServices &transaction_services= TransactionServices::singleton();
1834
 
    main_da.can_overwrite_status= true;
1835
 
    transaction_services.autocommitOrRollback(this, is_error());
1836
 
    main_da.can_overwrite_status= false;
1837
 
    transaction.stmt.reset();
1838
 
  }
1839
 
 
1840
 
  if (lock)
1841
 
  {
1842
 
    /*
1843
 
      For RBR we flush the pending event just before we unlock all the
1844
 
      tables.  This means that we are at the end of a topmost
1845
 
      statement, so we ensure that the STMT_END_F flag is set on the
1846
 
      pending event.  For statements that are *inside* stored
1847
 
      functions, the pending event will not be flushed: that will be
1848
 
      handled either before writing a query log event (inside
1849
 
      binlog_query()) or when preparing a pending event.
1850
 
     */
1851
 
    mysql_unlock_tables(this, lock);
1852
 
    lock= 0;
1853
 
  }
1854
 
  /*
1855
 
    Note that we need to hold LOCK_open while changing the
1856
 
    open_tables list. Another thread may work on it.
1857
 
    (See: remove_table_from_cache(), mysql_wait_completed_table())
1858
 
    Closing a MERGE child before the parent would be fatal if the
1859
 
    other thread tries to abort the MERGE lock in between.
1860
 
  */
1861
 
  if (open_tables)
1862
 
    close_open_tables();
1863
 
}
1864
 
 
1865
 
void Session::close_tables_for_reopen(TableList **tables)
1866
 
{
1867
 
  /*
1868
 
    If table list consists only from tables from prelocking set, table list
1869
 
    for new attempt should be empty, so we have to update list's root pointer.
1870
 
  */
1871
 
  if (lex->first_not_own_table() == *tables)
1872
 
    *tables= 0;
1873
 
  lex->chop_off_not_own_tables();
1874
 
  for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1875
 
    tmp->table= 0;
1876
 
  close_thread_tables();
1877
 
}
1878
 
 
1879
 
bool Session::openTablesLock(TableList *tables)
1880
 
{
1881
 
  uint32_t counter;
1882
 
  bool need_reopen;
1883
 
 
1884
 
  for ( ; ; )
1885
 
  {
1886
 
    if (open_tables_from_list(&tables, &counter))
1887
 
      return true;
1888
 
 
1889
 
    if (not lock_tables(tables, counter, &need_reopen))
1890
 
      break;
1891
 
    if (not need_reopen)
1892
 
      return true;
1893
 
    close_tables_for_reopen(&tables);
1894
 
  }
1895
 
  if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
1896
 
       (fill_derived_tables() &&
1897
 
        mysql_handle_derived(lex, &mysql_derived_filling))))
1898
 
    return true;
1899
 
 
1900
 
  return false;
1901
 
}
1902
 
 
1903
 
bool Session::openTables(TableList *tables, uint32_t flags)
1904
 
{
1905
 
  uint32_t counter;
1906
 
  bool ret= fill_derived_tables();
1907
 
  assert(ret == false);
1908
 
  if (open_tables_from_list(&tables, &counter, flags) ||
1909
 
      mysql_handle_derived(lex, &mysql_derived_prepare))
1910
 
  {
1911
 
    return true;
1912
 
  }
1913
 
  return false;
1914
 
}
1915
 
 
1916
 
/*
1917
 
  @note "best_effort" is used in cases were if a failure occurred on this
1918
 
  operation it would not be surprising because we are only removing because there
1919
 
  might be an issue (lame engines).
1920
 
*/
1921
 
 
1922
 
bool Session::rm_temporary_table(TableIdentifier &identifier, bool best_effort)
1923
 
{
1924
 
  if (plugin::StorageEngine::dropTable(*this, identifier))
1925
 
  {
1926
 
    if (not best_effort)
1927
 
    {
1928
 
      errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1929
 
                    identifier.getSQLPath().c_str(), errno);
1930
 
    }
1931
 
 
1932
 
    return true;
1933
 
  }
1934
 
 
1935
 
  return false;
1936
 
}
1937
 
 
1938
 
bool Session::rm_temporary_table(plugin::StorageEngine *base, TableIdentifier &identifier)
1939
 
{
1940
 
  assert(base);
1941
 
 
1942
 
  if (plugin::StorageEngine::dropTable(*this, *base, identifier))
1943
 
  {
1944
 
    errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1945
 
                  identifier.getSQLPath().c_str(), errno);
1946
 
 
1947
 
    return true;
1948
 
  }
1949
 
 
1950
 
  return false;
1951
 
}
1952
 
 
1953
 
/**
1954
 
  @note this will be removed, I am looking through Hudson to see if it is finding
1955
 
  any tables that are missed during cleanup.
1956
 
*/
1957
 
void Session::dumpTemporaryTableNames(const char *foo)
1958
 
{
1959
 
  Table *table;
1960
 
 
1961
 
  if (not temporary_tables)
1962
 
    return;
1963
 
 
1964
 
  cerr << "Begin Run: " << foo << "\n";
1965
 
  for (table= temporary_tables; table; table= table->getNext())
1966
 
  {
1967
 
    bool have_proto= false;
1968
 
 
1969
 
    message::Table *proto= table->getShare()->getTableProto();
1970
 
    if (table->getShare()->getTableProto())
1971
 
      have_proto= true;
1972
 
 
1973
 
    const char *answer= have_proto ? "true" : "false";
1974
 
 
1975
 
    if (have_proto)
1976
 
    {
1977
 
      cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
1978
 
      cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
1979
 
    }
1980
 
    else
1981
 
      cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
1982
 
  }
1983
 
}
1984
 
 
1985
 
bool Session::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
1986
 
{
1987
 
  table_message_cache.insert(make_pair(identifier.getPath(), table_message));
1988
 
 
1989
 
  return true;
1990
 
}
1991
 
 
1992
 
bool Session::removeTableMessage(const TableIdentifier &identifier)
1993
 
{
1994
 
  TableMessageCache::iterator iter;
1995
 
 
1996
 
  iter= table_message_cache.find(identifier.getPath());
1997
 
 
1998
 
  if (iter == table_message_cache.end())
1999
 
    return false;
2000
 
 
2001
 
  table_message_cache.erase(iter);
2002
 
 
2003
 
  return true;
2004
 
}
2005
 
 
2006
 
bool Session::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2007
 
{
2008
 
  TableMessageCache::iterator iter;
2009
 
 
2010
 
  iter= table_message_cache.find(identifier.getPath());
2011
 
 
2012
 
  if (iter == table_message_cache.end())
2013
 
    return false;
2014
 
 
2015
 
  table_message.CopyFrom(((*iter).second));
2016
 
 
2017
 
  return true;
2018
 
}
2019
 
 
2020
 
bool Session::doesTableMessageExist(const TableIdentifier &identifier)
2021
 
{
2022
 
  TableMessageCache::iterator iter;
2023
 
 
2024
 
  iter= table_message_cache.find(identifier.getPath());
2025
 
 
2026
 
  if (iter == table_message_cache.end())
2027
 
  {
2028
 
    return false;
2029
 
  }
2030
 
 
2031
 
  return true;
2032
 
}
2033
 
 
2034
 
bool Session::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
2035
 
{
2036
 
  TableMessageCache::iterator iter;
2037
 
 
2038
 
  table_message_cache[to.getPath()]= table_message_cache[from.getPath()];
2039
 
 
2040
 
  iter= table_message_cache.find(to.getPath());
2041
 
 
2042
 
  if (iter == table_message_cache.end())
2043
 
  {
2044
 
    return false;
2045
 
  }
2046
 
 
2047
 
  (*iter).second.set_schema(to.getSchemaName());
2048
 
  (*iter).second.set_name(to.getTableName());
2049
 
 
2050
 
  return true;
2051
 
}
2052
 
 
2053
 
TableShareInstance *Session::getTemporaryShare(TableIdentifier::Type type_arg)
2054
 
{
2055
 
  temporary_shares.push_back(new TableShareInstance(type_arg)); // This will not go into the tableshare cache, so no key is used.
2056
 
 
2057
 
  TableShareInstance *tmp_share= temporary_shares.back();
2058
 
 
2059
 
  assert(tmp_share);
2060
 
 
2061
 
  return tmp_share;
2062
 
}
2063
 
 
2064
 
} /* namespace drizzled */
 
2233
 
 
2234
void mark_transaction_to_rollback(THD *thd, bool all)
 
2235
{
 
2236
  if (thd)
 
2237
  {
 
2238
    thd->is_fatal_sub_stmt_error= true;
 
2239
    thd->transaction_rollback_request= all;
 
2240
  }
 
2241
}
 
2242
/***************************************************************************
 
2243
  Handling of XA id cacheing
 
2244
***************************************************************************/
 
2245
 
 
2246
pthread_mutex_t LOCK_xid_cache;
 
2247
HASH xid_cache;
 
2248
 
 
2249
extern "C" unsigned char *xid_get_hash_key(const unsigned char *, size_t *, bool);
 
2250
extern "C" void xid_free_hash(void *);
 
2251
 
 
2252
unsigned char *xid_get_hash_key(const unsigned char *ptr, size_t *length,
 
2253
                        bool not_used __attribute__((unused)))
 
2254
{
 
2255
  *length=((XID_STATE*)ptr)->xid.key_length();
 
2256
  return ((XID_STATE*)ptr)->xid.key();
 
2257
}
 
2258
 
 
2259
void xid_free_hash(void *ptr)
 
2260
{
 
2261
  if (!((XID_STATE*)ptr)->in_thd)
 
2262
    free((unsigned char*)ptr);
 
2263
}
 
2264
 
 
2265
bool xid_cache_init()
 
2266
{
 
2267
  pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST);
 
2268
  return hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
 
2269
                   xid_get_hash_key, xid_free_hash, 0) != 0;
 
2270
}
 
2271
 
 
2272
void xid_cache_free()
 
2273
{
 
2274
  if (hash_inited(&xid_cache))
 
2275
  {
 
2276
    hash_free(&xid_cache);
 
2277
    pthread_mutex_destroy(&LOCK_xid_cache);
 
2278
  }
 
2279
}
 
2280
 
 
2281
XID_STATE *xid_cache_search(XID *xid)
 
2282
{
 
2283
  pthread_mutex_lock(&LOCK_xid_cache);
 
2284
  XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, xid->key(), xid->key_length());
 
2285
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2286
  return res;
 
2287
}
 
2288
 
 
2289
 
 
2290
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
 
2291
{
 
2292
  XID_STATE *xs;
 
2293
  bool res;
 
2294
  pthread_mutex_lock(&LOCK_xid_cache);
 
2295
  if (hash_search(&xid_cache, xid->key(), xid->key_length()))
 
2296
    res=0;
 
2297
  else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
 
2298
    res=1;
 
2299
  else
 
2300
  {
 
2301
    xs->xa_state=xa_state;
 
2302
    xs->xid.set(xid);
 
2303
    xs->in_thd=0;
 
2304
    res=my_hash_insert(&xid_cache, (unsigned char*)xs);
 
2305
  }
 
2306
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2307
  return res;
 
2308
}
 
2309
 
 
2310
 
 
2311
bool xid_cache_insert(XID_STATE *xid_state)
 
2312
{
 
2313
  pthread_mutex_lock(&LOCK_xid_cache);
 
2314
  assert(hash_search(&xid_cache, xid_state->xid.key(),
 
2315
                          xid_state->xid.key_length())==0);
 
2316
  bool res=my_hash_insert(&xid_cache, (unsigned char*)xid_state);
 
2317
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2318
  return res;
 
2319
}
 
2320
 
 
2321
 
 
2322
void xid_cache_delete(XID_STATE *xid_state)
 
2323
{
 
2324
  pthread_mutex_lock(&LOCK_xid_cache);
 
2325
  hash_delete(&xid_cache, (unsigned char *)xid_state);
 
2326
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2327
}
 
2328
 
 
2329
/*
 
2330
  Implementation of interface to write rows to the binary log through the
 
2331
  thread.  The thread is responsible for writing the rows it has
 
2332
  inserted/updated/deleted.
 
2333
*/
 
2334
 
 
2335
 
 
2336
/*
 
2337
  Template member function for ensuring that there is an rows log
 
2338
  event of the apropriate type before proceeding.
 
2339
 
 
2340
  PRE CONDITION:
 
2341
    - Events of type 'RowEventT' have the type code 'type_code'.
 
2342
    
 
2343
  POST CONDITION:
 
2344
    If a non-NULL pointer is returned, the pending event for thread 'thd' will
 
2345
    be an event of type 'RowEventT' (which have the type code 'type_code')
 
2346
    will either empty or have enough space to hold 'needed' bytes.  In
 
2347
    addition, the columns bitmap will be correct for the row, meaning that
 
2348
    the pending event will be flushed if the columns in the event differ from
 
2349
    the columns suppled to the function.
 
2350
 
 
2351
  RETURNS
 
2352
    If no error, a non-NULL pending event (either one which already existed or
 
2353
    the newly created one).
 
2354
    If error, NULL.
 
2355
 */
 
2356
 
 
2357
template <class RowsEventT> Rows_log_event* 
 
2358
THD::binlog_prepare_pending_rows_event(Table* table, uint32_t serv_id,
 
2359
                                       size_t needed,
 
2360
                                       bool is_transactional,
 
2361
                                       RowsEventT *hint __attribute__((unused)))
 
2362
{
 
2363
  /* Pre-conditions */
 
2364
  assert(table->s->table_map_id != UINT32_MAX);
 
2365
 
 
2366
  /* Fetch the type code for the RowsEventT template parameter */
 
2367
  int const type_code= RowsEventT::TYPE_CODE;
 
2368
 
 
2369
  /*
 
2370
    There is no good place to set up the transactional data, so we
 
2371
    have to do it here.
 
2372
  */
 
2373
  if (binlog_setup_trx_data())
 
2374
    return(NULL);
 
2375
 
 
2376
  Rows_log_event* pending= binlog_get_pending_rows_event();
 
2377
 
 
2378
  if (unlikely(pending && !pending->is_valid()))
 
2379
    return(NULL);
 
2380
 
 
2381
  /*
 
2382
    Check if the current event is non-NULL and a write-rows
 
2383
    event. Also check if the table provided is mapped: if it is not,
 
2384
    then we have switched to writing to a new table.
 
2385
    If there is no pending event, we need to create one. If there is a pending
 
2386
    event, but it's not about the same table id, or not of the same type
 
2387
    (between Write, Update and Delete), or not the same affected columns, or
 
2388
    going to be too big, flush this event to disk and create a new pending
 
2389
    event.
 
2390
 
 
2391
    The last test is necessary for the Cluster injector to work
 
2392
    correctly. The reason is that the Cluster can inject two write
 
2393
    rows with different column bitmaps if there is an insert followed
 
2394
    by an update in the same transaction, and these are grouped into a
 
2395
    single epoch/transaction when fed to the injector.
 
2396
 
 
2397
    TODO: Fix the code so that the last test can be removed.
 
2398
  */
 
2399
  if (!pending ||
 
2400
      pending->server_id != serv_id || 
 
2401
      pending->get_table_id() != table->s->table_map_id ||
 
2402
      pending->get_type_code() != type_code || 
 
2403
      pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
 
2404
      !bitmap_cmp(pending->get_cols(), table->write_set))
 
2405
    {
 
2406
    /* Create a new RowsEventT... */
 
2407
    Rows_log_event* const
 
2408
        ev= new RowsEventT(this, table, table->s->table_map_id,
 
2409
                           is_transactional);
 
2410
    if (unlikely(!ev))
 
2411
      return(NULL);
 
2412
    ev->server_id= serv_id; // I don't like this, it's too easy to forget.
 
2413
    /*
 
2414
      flush the pending event and replace it with the newly created
 
2415
      event...
 
2416
    */
 
2417
    if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev)))
 
2418
    {
 
2419
      delete ev;
 
2420
      return(NULL);
 
2421
    }
 
2422
 
 
2423
    return(ev);               /* This is the new pending event */
 
2424
  }
 
2425
  return(pending);        /* This is the current pending event */
 
2426
}
 
2427
 
 
2428
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
2429
/*
 
2430
  Instantiate the versions we need, we have -fno-implicit-template as
 
2431
  compiling option.
 
2432
*/
 
2433
template Rows_log_event*
 
2434
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2435
                                       Write_rows_log_event*);
 
2436
 
 
2437
template Rows_log_event*
 
2438
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2439
                                       Delete_rows_log_event *);
 
2440
 
 
2441
template Rows_log_event* 
 
2442
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2443
                                       Update_rows_log_event *);
 
2444
#endif
 
2445
 
 
2446
namespace {
 
2447
  /**
 
2448
     Class to handle temporary allocation of memory for row data.
 
2449
 
 
2450
     The responsibilities of the class is to provide memory for
 
2451
     packing one or two rows of packed data (depending on what
 
2452
     constructor is called).
 
2453
 
 
2454
     In order to make the allocation more efficient for "simple" rows,
 
2455
     i.e., rows that do not contain any blobs, a pointer to the
 
2456
     allocated memory is of memory is stored in the table structure
 
2457
     for simple rows.  If memory for a table containing a blob field
 
2458
     is requested, only memory for that is allocated, and subsequently
 
2459
     released when the object is destroyed.
 
2460
 
 
2461
   */
 
2462
  class Row_data_memory {
 
2463
  public:
 
2464
    /**
 
2465
      Build an object to keep track of a block-local piece of memory
 
2466
      for storing a row of data.
 
2467
 
 
2468
      @param table
 
2469
      Table where the pre-allocated memory is stored.
 
2470
 
 
2471
      @param length
 
2472
      Length of data that is needed, if the record contain blobs.
 
2473
     */
 
2474
    Row_data_memory(Table *table, size_t const len1)
 
2475
      : m_memory(0)
 
2476
    {
 
2477
      m_alloc_checked= false;
 
2478
      allocate_memory(table, len1);
 
2479
      m_ptr[0]= has_memory() ? m_memory : 0;
 
2480
      m_ptr[1]= 0;
 
2481
    }
 
2482
 
 
2483
    Row_data_memory(Table *table, size_t const len1, size_t const len2)
 
2484
      : m_memory(0)
 
2485
    {
 
2486
      m_alloc_checked= false;
 
2487
      allocate_memory(table, len1 + len2);
 
2488
      m_ptr[0]= has_memory() ? m_memory        : 0;
 
2489
      m_ptr[1]= has_memory() ? m_memory + len1 : 0;
 
2490
    }
 
2491
 
 
2492
    ~Row_data_memory()
 
2493
    {
 
2494
      if (m_memory != 0 && m_release_memory_on_destruction)
 
2495
        free((unsigned char*) m_memory);
 
2496
    }
 
2497
 
 
2498
    /**
 
2499
       Is there memory allocated?
 
2500
 
 
2501
       @retval true There is memory allocated
 
2502
       @retval false Memory allocation failed
 
2503
     */
 
2504
    bool has_memory() const {
 
2505
      m_alloc_checked= true;
 
2506
      return m_memory != 0;
 
2507
    }
 
2508
 
 
2509
    unsigned char *slot(uint32_t s)
 
2510
    {
 
2511
      assert(s < sizeof(m_ptr)/sizeof(*m_ptr));
 
2512
      assert(m_ptr[s] != 0);
 
2513
      assert(m_alloc_checked == true);
 
2514
      return m_ptr[s];
 
2515
    }
 
2516
 
 
2517
  private:
 
2518
    void allocate_memory(Table *const table, size_t const total_length)
 
2519
    {
 
2520
      if (table->s->blob_fields == 0)
 
2521
      {
 
2522
        /*
 
2523
          The maximum length of a packed record is less than this
 
2524
          length. We use this value instead of the supplied length
 
2525
          when allocating memory for records, since we don't know how
 
2526
          the memory will be used in future allocations.
 
2527
 
 
2528
          Since table->s->reclength is for unpacked records, we have
 
2529
          to add two bytes for each field, which can potentially be
 
2530
          added to hold the length of a packed field.
 
2531
        */
 
2532
        size_t const maxlen= table->s->reclength + 2 * table->s->fields;
 
2533
 
 
2534
        /*
 
2535
          Allocate memory for two records if memory hasn't been
 
2536
          allocated. We allocate memory for two records so that it can
 
2537
          be used when processing update rows as well.
 
2538
        */
 
2539
        if (table->write_row_record == 0)
 
2540
          table->write_row_record=
 
2541
            (unsigned char *) alloc_root(&table->mem_root, 2 * maxlen);
 
2542
        m_memory= table->write_row_record;
 
2543
        m_release_memory_on_destruction= false;
 
2544
      }
 
2545
      else
 
2546
      {
 
2547
        m_memory= (unsigned char *) my_malloc(total_length, MYF(MY_WME));
 
2548
        m_release_memory_on_destruction= true;
 
2549
      }
 
2550
    }
 
2551
 
 
2552
    mutable bool m_alloc_checked;
 
2553
    bool m_release_memory_on_destruction;
 
2554
    unsigned char *m_memory;
 
2555
    unsigned char *m_ptr[2];
 
2556
  };
 
2557
}
 
2558
 
 
2559
 
 
2560
int THD::binlog_write_row(Table* table, bool is_trans, 
 
2561
                          unsigned char const *record) 
 
2562
 
2563
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2564
 
 
2565
  /*
 
2566
    Pack records into format for transfer. We are allocating more
 
2567
    memory than needed, but that doesn't matter.
 
2568
  */
 
2569
  Row_data_memory memory(table, table->max_row_length(record));
 
2570
  if (!memory.has_memory())
 
2571
    return HA_ERR_OUT_OF_MEM;
 
2572
 
 
2573
  unsigned char *row_data= memory.slot(0);
 
2574
 
 
2575
  size_t const len= pack_row(table, table->write_set, row_data, record);
 
2576
 
 
2577
  Rows_log_event* const ev=
 
2578
    binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
 
2579
                                      static_cast<Write_rows_log_event*>(0));
 
2580
 
 
2581
  if (unlikely(ev == 0))
 
2582
    return HA_ERR_OUT_OF_MEM;
 
2583
 
 
2584
  return ev->add_row_data(row_data, len);
 
2585
}
 
2586
 
 
2587
int THD::binlog_update_row(Table* table, bool is_trans,
 
2588
                           const unsigned char *before_record,
 
2589
                           const unsigned char *after_record)
 
2590
 
2591
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2592
 
 
2593
  size_t const before_maxlen = table->max_row_length(before_record);
 
2594
  size_t const after_maxlen  = table->max_row_length(after_record);
 
2595
 
 
2596
  Row_data_memory row_data(table, before_maxlen, after_maxlen);
 
2597
  if (!row_data.has_memory())
 
2598
    return HA_ERR_OUT_OF_MEM;
 
2599
 
 
2600
  unsigned char *before_row= row_data.slot(0);
 
2601
  unsigned char *after_row= row_data.slot(1);
 
2602
 
 
2603
  size_t const before_size= pack_row(table, table->read_set, before_row,
 
2604
                                        before_record);
 
2605
  size_t const after_size= pack_row(table, table->write_set, after_row,
 
2606
                                       after_record);
 
2607
 
 
2608
  Rows_log_event* const ev=
 
2609
    binlog_prepare_pending_rows_event(table, server_id,
 
2610
                                      before_size + after_size, is_trans,
 
2611
                                      static_cast<Update_rows_log_event*>(0));
 
2612
 
 
2613
  if (unlikely(ev == 0))
 
2614
    return HA_ERR_OUT_OF_MEM;
 
2615
 
 
2616
  return
 
2617
    ev->add_row_data(before_row, before_size) ||
 
2618
    ev->add_row_data(after_row, after_size);
 
2619
}
 
2620
 
 
2621
int THD::binlog_delete_row(Table* table, bool is_trans, 
 
2622
                           unsigned char const *record)
 
2623
 
2624
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2625
 
 
2626
  /* 
 
2627
     Pack records into format for transfer. We are allocating more
 
2628
     memory than needed, but that doesn't matter.
 
2629
  */
 
2630
  Row_data_memory memory(table, table->max_row_length(record));
 
2631
  if (unlikely(!memory.has_memory()))
 
2632
    return HA_ERR_OUT_OF_MEM;
 
2633
 
 
2634
  unsigned char *row_data= memory.slot(0);
 
2635
 
 
2636
  size_t const len= pack_row(table, table->read_set, row_data, record);
 
2637
 
 
2638
  Rows_log_event* const ev=
 
2639
    binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
 
2640
                                      static_cast<Delete_rows_log_event*>(0));
 
2641
 
 
2642
  if (unlikely(ev == 0))
 
2643
    return HA_ERR_OUT_OF_MEM;
 
2644
 
 
2645
  return ev->add_row_data(row_data, len);
 
2646
}
 
2647
 
 
2648
 
 
2649
int THD::binlog_flush_pending_rows_event(bool stmt_end)
 
2650
{
 
2651
  /*
 
2652
    We shall flush the pending event even if we are not in row-based
 
2653
    mode: it might be the case that we left row-based mode before
 
2654
    flushing anything (e.g., if we have explicitly locked tables).
 
2655
   */
 
2656
  if (!mysql_bin_log.is_open())
 
2657
    return(0);
 
2658
 
 
2659
  /*
 
2660
    Mark the event as the last event of a statement if the stmt_end
 
2661
    flag is set.
 
2662
  */
 
2663
  int error= 0;
 
2664
  if (Rows_log_event *pending= binlog_get_pending_rows_event())
 
2665
  {
 
2666
    if (stmt_end)
 
2667
    {
 
2668
      pending->set_flags(Rows_log_event::STMT_END_F);
 
2669
      pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 
2670
      binlog_table_maps= 0;
 
2671
    }
 
2672
 
 
2673
    error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
 
2674
  }
 
2675
 
 
2676
  return(error);
 
2677
}
 
2678
 
 
2679
 
 
2680
/*
 
2681
  Member function that will log query, either row-based or
 
2682
  statement-based depending on the value of the 'current_stmt_binlog_row_based'
 
2683
  the value of the 'qtype' flag.
 
2684
 
 
2685
  This function should be called after the all calls to ha_*_row()
 
2686
  functions have been issued, but before tables are unlocked and
 
2687
  closed.
 
2688
 
 
2689
  OBSERVE
 
2690
    There shall be no writes to any system table after calling
 
2691
    binlog_query(), so these writes has to be moved to before the call
 
2692
    of binlog_query() for correct functioning.
 
2693
 
 
2694
    This is necessesary not only for RBR, but the master might crash
 
2695
    after binlogging the query but before changing the system tables.
 
2696
    This means that the slave and the master are not in the same state
 
2697
    (after the master has restarted), so therefore we have to
 
2698
    eliminate this problem.
 
2699
 
 
2700
  RETURN VALUE
 
2701
    Error code, or 0 if no error.
 
2702
*/
 
2703
int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
 
2704
                      ulong query_len, bool is_trans, bool suppress_use,
 
2705
                      THD::killed_state killed_status_arg)
 
2706
{
 
2707
  assert(query_arg && mysql_bin_log.is_open());
 
2708
 
 
2709
  if (int error= binlog_flush_pending_rows_event(true))
 
2710
    return(error);
 
2711
 
 
2712
  /*
 
2713
    If we are in statement mode and trying to log an unsafe statement,
 
2714
    we should print a warning.
 
2715
  */
 
2716
  if (lex->is_stmt_unsafe() &&
 
2717
      variables.binlog_format == BINLOG_FORMAT_STMT)
 
2718
  {
 
2719
    assert(this->query != NULL);
 
2720
    push_warning(this, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
2721
                 ER_BINLOG_UNSAFE_STATEMENT,
 
2722
                 ER(ER_BINLOG_UNSAFE_STATEMENT));
 
2723
    if (!(binlog_flags & BINLOG_FLAG_UNSAFE_STMT_PRINTED))
 
2724
    {
 
2725
      char warn_buf[DRIZZLE_ERRMSG_SIZE];
 
2726
      snprintf(warn_buf, DRIZZLE_ERRMSG_SIZE, "%s Statement: %s",
 
2727
               ER(ER_BINLOG_UNSAFE_STATEMENT), this->query);
 
2728
      sql_print_warning(warn_buf);
 
2729
      binlog_flags|= BINLOG_FLAG_UNSAFE_STMT_PRINTED;
 
2730
    }
 
2731
  }
 
2732
 
 
2733
  switch (qtype) {
 
2734
  case THD::ROW_QUERY_TYPE:
 
2735
    if (current_stmt_binlog_row_based)
 
2736
      return(0);
 
2737
    /* Otherwise, we fall through */
 
2738
  case THD::DRIZZLE_QUERY_TYPE:
 
2739
    /*
 
2740
      Using this query type is a conveniece hack, since we have been
 
2741
      moving back and forth between using RBR for replication of
 
2742
      system tables and not using it.
 
2743
 
 
2744
      Make sure to change in check_table_binlog_row_based() according
 
2745
      to how you treat this.
 
2746
    */
 
2747
  case THD::STMT_QUERY_TYPE:
 
2748
    /*
 
2749
      The DRIZZLE_LOG::write() function will set the STMT_END_F flag and
 
2750
      flush the pending rows event if necessary.
 
2751
     */
 
2752
    {
 
2753
      Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use,
 
2754
                            killed_status_arg);
 
2755
      qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 
2756
      /*
 
2757
        Binlog table maps will be irrelevant after a Query_log_event
 
2758
        (they are just removed on the slave side) so after the query
 
2759
        log event is written to the binary log, we pretend that no
 
2760
        table maps were written.
 
2761
       */
 
2762
      int error= mysql_bin_log.write(&qinfo);
 
2763
      binlog_table_maps= 0;
 
2764
      return(error);
 
2765
    }
 
2766
    break;
 
2767
 
 
2768
  case THD::QUERY_TYPE_COUNT:
 
2769
  default:
 
2770
    assert(0 <= qtype && qtype < QUERY_TYPE_COUNT);
 
2771
  }
 
2772
  return(0);
 
2773
}
 
2774
 
 
2775
bool Discrete_intervals_list::append(uint64_t start, uint64_t val,
 
2776
                                 uint64_t incr)
 
2777
{
 
2778
  /* first, see if this can be merged with previous */
 
2779
  if ((head == NULL) || tail->merge_if_contiguous(start, val, incr))
 
2780
  {
 
2781
    /* it cannot, so need to add a new interval */
 
2782
    Discrete_interval *new_interval= new Discrete_interval(start, val, incr);
 
2783
    return(append(new_interval));
 
2784
  }
 
2785
  return(0);
 
2786
}
 
2787
 
 
2788
bool Discrete_intervals_list::append(Discrete_interval *new_interval)
 
2789
{
 
2790
  if (unlikely(new_interval == NULL))
 
2791
    return(1);
 
2792
  if (head == NULL)
 
2793
    head= current= new_interval;
 
2794
  else
 
2795
    tail->next= new_interval;
 
2796
  tail= new_interval;
 
2797
  elements++;
 
2798
  return(0);
 
2799
}