~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_class.cc

code clean move Item_func_num1 and Item_func_connection_id to functions directory

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems
5
 
 *
6
 
 *  This program is free software; you can redistribute it and/or modify
7
 
 *  it under the terms of the GNU General Public License as published by
8
 
 *  the Free Software Foundation; version 2 of the License.
9
 
 *
10
 
 *  This program is distributed in the hope that it will be useful,
11
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 *  GNU General Public License for more details.
14
 
 *
15
 
 *  You should have received a copy of the GNU General Public License
16
 
 *  along with this program; if not, write to the Free Software
17
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
 
 */
19
 
 
20
 
/**
21
 
 * @file Implementation of the Session class and API
22
 
 */
23
 
 
24
 
#include "config.h"
25
 
#include <drizzled/session.h>
26
 
#include "drizzled/session_list.h"
 
1
/* Copyright (C) 2000-2006 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/*****************************************************************************
 
18
**
 
19
** This file implements classes defined in sql_class.h
 
20
** Especially the classes to handle a result from a select
 
21
**
 
22
*****************************************************************************/
 
23
#include <drizzled/server_includes.h>
 
24
#include "rpl_rli.h"
 
25
#include "rpl_record.h"
 
26
#include "log_event.h"
27
27
#include <sys/stat.h>
28
 
#include <drizzled/error.h>
29
 
#include <drizzled/gettext.h>
30
 
#include <drizzled/query_id.h>
31
 
#include <drizzled/data_home.h>
32
 
#include <drizzled/sql_base.h>
33
 
#include <drizzled/lock.h>
34
 
#include <drizzled/item/cache.h>
35
 
#include <drizzled/item/float.h>
36
 
#include <drizzled/item/return_int.h>
37
 
#include <drizzled/item/empty_string.h>
38
 
#include <drizzled/show.h>
39
 
#include <drizzled/plugin/client.h>
40
 
#include "drizzled/plugin/scheduler.h"
41
 
#include "drizzled/plugin/authentication.h"
42
 
#include "drizzled/plugin/logging.h"
43
 
#include "drizzled/plugin/transactional_storage_engine.h"
44
 
#include "drizzled/probes.h"
45
 
#include "drizzled/table_proto.h"
46
 
#include "drizzled/db.h"
47
 
#include "drizzled/pthread_globals.h"
48
 
#include "drizzled/transaction_services.h"
49
 
#include "drizzled/drizzled.h"
50
 
 
51
 
#include "drizzled/table_share_instance.h"
52
 
 
53
 
#include "plugin/myisam/myisam.h"
54
 
#include "drizzled/internal/iocache.h"
55
 
#include "drizzled/internal/thread_var.h"
56
 
#include "drizzled/plugin/event_observer.h"
57
 
 
58
 
#include <fcntl.h>
59
 
#include <algorithm>
60
 
#include <climits>
61
 
#include "boost/filesystem.hpp" 
62
 
 
63
 
using namespace std;
64
 
 
65
 
namespace fs=boost::filesystem;
66
 
namespace drizzled
67
 
{
 
28
#include <mysys/thr_alarm.h>
 
29
#include <mysys/mysys_err.h>
 
30
#include <drizzled/drizzled_error_messages.h>
 
31
#include <drizzled/innodb_plugin_extras.h>
68
32
 
69
33
/*
70
34
  The following is used to initialise Table_ident with a internal
73
37
char internal_table_name[2]= "*";
74
38
char empty_c_string[1]= {0};    /* used for not defined db */
75
39
 
76
 
const char * const Session::DEFAULT_WHERE= "field list";
 
40
const char * const THD::DEFAULT_WHERE= "field list";
 
41
 
 
42
 
 
43
/*****************************************************************************
 
44
** Instansiate templates
 
45
*****************************************************************************/
 
46
 
 
47
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
48
/* Used templates */
 
49
template class List<Key>;
 
50
template class List_iterator<Key>;
 
51
template class List<Key_part_spec>;
 
52
template class List_iterator<Key_part_spec>;
 
53
template class List<Alter_drop>;
 
54
template class List_iterator<Alter_drop>;
 
55
template class List<Alter_column>;
 
56
template class List_iterator<Alter_column>;
 
57
#endif
 
58
 
 
59
/****************************************************************************
 
60
** User variables
 
61
****************************************************************************/
 
62
 
 
63
extern "C" unsigned char *get_var_key(user_var_entry *entry, size_t *length,
 
64
                              bool not_used __attribute__((unused)))
 
65
{
 
66
  *length= entry->name.length;
 
67
  return (unsigned char*) entry->name.str;
 
68
}
 
69
 
 
70
extern "C" void free_user_var(user_var_entry *entry)
 
71
{
 
72
  char *pos= (char*) entry+ALIGN_SIZE(sizeof(*entry));
 
73
  if (entry->value && entry->value != pos)
 
74
    free(entry->value);
 
75
  free((char*) entry);
 
76
}
77
77
 
78
78
bool Key_part_spec::operator==(const Key_part_spec& other) const
79
79
{
82
82
         !strcmp(field_name.str, other.field_name.str);
83
83
}
84
84
 
85
 
Open_tables_state::Open_tables_state(uint64_t version_arg) :
86
 
  version(version_arg)
87
 
{
88
 
  open_tables= temporary_tables= derived_tables= NULL;
89
 
  extra_lock= lock= NULL;
 
85
/**
 
86
  Construct an (almost) deep copy of this key. Only those
 
87
  elements that are known to never change are not copied.
 
88
  If out of memory, a partial copy is returned and an error is set
 
89
  in THD.
 
90
*/
 
91
 
 
92
Key::Key(const Key &rhs, MEM_ROOT *mem_root)
 
93
  :type(rhs.type),
 
94
  key_create_info(rhs.key_create_info),
 
95
  columns(rhs.columns, mem_root),
 
96
  name(rhs.name),
 
97
  generated(rhs.generated)
 
98
{
 
99
  list_copy_and_replace_each_value(columns, mem_root);
 
100
}
 
101
 
 
102
/**
 
103
  Construct an (almost) deep copy of this foreign key. Only those
 
104
  elements that are known to never change are not copied.
 
105
  If out of memory, a partial copy is returned and an error is set
 
106
  in THD.
 
107
*/
 
108
 
 
109
Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root)
 
110
  :Key(rhs),
 
111
  ref_table(rhs.ref_table),
 
112
  ref_columns(rhs.ref_columns),
 
113
  delete_opt(rhs.delete_opt),
 
114
  update_opt(rhs.update_opt),
 
115
  match_opt(rhs.match_opt)
 
116
{
 
117
  list_copy_and_replace_each_value(ref_columns, mem_root);
 
118
}
 
119
 
 
120
/*
 
121
  Test if a foreign key (= generated key) is a prefix of the given key
 
122
  (ignoring key name, key type and order of columns)
 
123
 
 
124
  NOTES:
 
125
    This is only used to test if an index for a FOREIGN KEY exists
 
126
 
 
127
  IMPLEMENTATION
 
128
    We only compare field names
 
129
 
 
130
  RETURN
 
131
    0   Generated key is a prefix of other key
 
132
    1   Not equal
 
133
*/
 
134
 
 
135
bool foreign_key_prefix(Key *a, Key *b)
 
136
{
 
137
  /* Ensure that 'a' is the generated key */
 
138
  if (a->generated)
 
139
  {
 
140
    if (b->generated && a->columns.elements > b->columns.elements)
 
141
      std::swap(a, b);                       // Put shorter key in 'a'
 
142
  }
 
143
  else
 
144
  {
 
145
    if (!b->generated)
 
146
      return true;                              // No foreign key
 
147
    std::swap(a, b);                       // Put generated key in 'a'
 
148
  }
 
149
 
 
150
  /* Test if 'a' is a prefix of 'b' */
 
151
  if (a->columns.elements > b->columns.elements)
 
152
    return true;                                // Can't be prefix
 
153
 
 
154
  List_iterator<Key_part_spec> col_it1(a->columns);
 
155
  List_iterator<Key_part_spec> col_it2(b->columns);
 
156
  const Key_part_spec *col1, *col2;
 
157
 
 
158
#ifdef ENABLE_WHEN_INNODB_CAN_HANDLE_SWAPED_FOREIGN_KEY_COLUMNS
 
159
  while ((col1= col_it1++))
 
160
  {
 
161
    bool found= 0;
 
162
    col_it2.rewind();
 
163
    while ((col2= col_it2++))
 
164
    {
 
165
      if (*col1 == *col2)
 
166
      {
 
167
        found= true;
 
168
        break;
 
169
      }
 
170
    }
 
171
    if (!found)
 
172
      return true;                              // Error
 
173
  }
 
174
  return false;                                 // Is prefix
 
175
#else
 
176
  while ((col1= col_it1++))
 
177
  {
 
178
    col2= col_it2++;
 
179
    if (!(*col1 == *col2))
 
180
      return true;
 
181
  }
 
182
  return false;                                 // Is prefix
 
183
#endif
 
184
}
 
185
 
 
186
 
 
187
/****************************************************************************
 
188
** Thread specific functions
 
189
****************************************************************************/
 
190
 
 
191
Open_tables_state::Open_tables_state(ulong version_arg)
 
192
  :version(version_arg), state_flags(0U)
 
193
{
 
194
  reset_open_tables_state();
90
195
}
91
196
 
92
197
/*
93
198
  The following functions form part of the C plugin API
94
199
*/
95
 
int mysql_tmpfile(const char *prefix)
 
200
 
 
201
extern "C" int mysql_tmpfile(const char *prefix)
96
202
{
97
203
  char filename[FN_REFLEN];
98
 
  int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
 
204
  File fd = create_temp_file(filename, mysql_tmpdir, prefix,
 
205
                             O_CREAT | O_EXCL | O_RDWR | O_TEMPORARY,
 
206
                             MYF(MY_WME));
99
207
  if (fd >= 0) {
 
208
    /*
 
209
      This can be removed once the following bug is fixed:
 
210
      Bug #28903  create_temp_file() doesn't honor O_TEMPORARY option
 
211
                  (file not removed) (Unix)
 
212
    */
100
213
    unlink(filename);
101
214
  }
102
215
 
103
216
  return fd;
104
217
}
105
218
 
106
 
int session_tablespace_op(const Session *session)
107
 
{
108
 
  return test(session->tablespace_op);
109
 
}
 
219
 
 
220
extern "C"
 
221
int thd_in_lock_tables(const THD *thd)
 
222
{
 
223
  return test(thd->in_lock_tables);
 
224
}
 
225
 
 
226
 
 
227
extern "C"
 
228
int thd_tablespace_op(const THD *thd)
 
229
{
 
230
  return test(thd->tablespace_op);
 
231
}
 
232
 
110
233
 
111
234
/**
112
 
   Set the process info field of the Session structure.
 
235
   Set the process info field of the THD structure.
113
236
 
114
237
   This function is used by plug-ins. Internally, the
115
 
   Session::set_proc_info() function should be used.
 
238
   THD::set_proc_info() function should be used.
116
239
 
117
 
   @see Session::set_proc_info
 
240
   @see THD::set_proc_info
118
241
 */
119
 
void set_session_proc_info(Session *session, const char *info)
120
 
{
121
 
  session->set_proc_info(info);
122
 
}
123
 
 
124
 
const char *get_session_proc_info(Session *session)
125
 
{
126
 
  return session->get_proc_info();
127
 
}
128
 
 
129
 
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
130
 
{
131
 
  return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
132
 
}
133
 
 
134
 
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
135
 
                                             size_t index)
136
 
{
137
 
  return &ha_data[monitored->getId()].resource_context[index];
138
 
}
139
 
 
140
 
int64_t session_test_options(const Session *session, int64_t test_options)
141
 
{
142
 
  return session->options & test_options;
143
 
}
144
 
 
145
 
int session_sql_command(const Session *session)
146
 
{
147
 
  return (int) session->lex->sql_command;
148
 
}
149
 
 
150
 
enum_tx_isolation session_tx_isolation(const Session *session)
151
 
{
152
 
  return (enum_tx_isolation)session->variables.tx_isolation;
153
 
}
154
 
 
155
 
Session::Session(plugin::Client *client_arg) :
156
 
  Open_tables_state(refresh_version),
157
 
  mem_root(&main_mem_root),
158
 
  lex(&main_lex),
159
 
  query(),
160
 
  client(client_arg),
161
 
  scheduler(NULL),
162
 
  scheduler_arg(NULL),
163
 
  lock_id(&main_lock_id),
164
 
  user_time(0),
165
 
  ha_data(plugin::num_trx_monitored_objects),
166
 
  arg_of_last_insert_id_function(false),
167
 
  first_successful_insert_id_in_prev_stmt(0),
168
 
  first_successful_insert_id_in_cur_stmt(0),
169
 
  limit_found_rows(0),
170
 
  global_read_lock(0),
171
 
  some_tables_deleted(false),
172
 
  no_errors(false),
173
 
  password(false),
174
 
  is_fatal_error(false),
175
 
  transaction_rollback_request(false),
176
 
  is_fatal_sub_stmt_error(0),
177
 
  derived_tables_processing(false),
178
 
  tablespace_op(false),
179
 
  m_lip(NULL),
180
 
  cached_table(0),
181
 
  transaction_message(NULL),
182
 
  statement_message(NULL),
183
 
  session_event_observers(NULL),
184
 
  use_usage(false)
185
 
{
186
 
  memset(process_list_info, 0, PROCESS_LIST_WIDTH);
187
 
  client->setSession(this);
 
242
extern "C" void
 
243
set_thd_proc_info(THD *thd, const char *info)
 
244
{
 
245
  thd->set_proc_info(info);
 
246
}
 
247
 
 
248
extern "C"
 
249
const char *get_thd_proc_info(THD *thd)
 
250
{
 
251
  return thd->get_proc_info();
 
252
}
 
253
 
 
254
extern "C"
 
255
void **thd_ha_data(const THD *thd, const struct handlerton *hton)
 
256
{
 
257
  return (void **) &thd->ha_data[hton->slot].ha_ptr;
 
258
}
 
259
 
 
260
extern "C"
 
261
int64_t thd_test_options(const THD *thd, int64_t test_options)
 
262
{
 
263
  return thd->options & test_options;
 
264
}
 
265
 
 
266
extern "C"
 
267
int thd_sql_command(const THD *thd)
 
268
{
 
269
  return (int) thd->lex->sql_command;
 
270
}
 
271
 
 
272
extern "C"
 
273
int thd_tx_isolation(const THD *thd)
 
274
{
 
275
  return (int) thd->variables.tx_isolation;
 
276
}
 
277
 
 
278
extern "C"
 
279
void thd_inc_row_count(THD *thd)
 
280
{
 
281
  thd->row_count++;
 
282
}
 
283
 
 
284
/**
 
285
  Clear this diagnostics area. 
 
286
 
 
287
  Normally called at the end of a statement.
 
288
*/
 
289
 
 
290
void
 
291
Diagnostics_area::reset_diagnostics_area()
 
292
{
 
293
  can_overwrite_status= false;
 
294
  /** Don't take chances in production */
 
295
  m_message[0]= '\0';
 
296
  m_sql_errno= 0;
 
297
  m_server_status= 0;
 
298
  m_affected_rows= 0;
 
299
  m_last_insert_id= 0;
 
300
  m_total_warn_count= 0;
 
301
  is_sent= false;
 
302
  /** Tiny reset in debug mode to see garbage right away */
 
303
  m_status= DA_EMPTY;
 
304
}
 
305
 
 
306
 
 
307
/**
 
308
  Set OK status -- ends commands that do not return a
 
309
  result set, e.g. INSERT/UPDATE/DELETE.
 
310
*/
 
311
 
 
312
void
 
313
Diagnostics_area::set_ok_status(THD *thd, ha_rows affected_rows_arg,
 
314
                                uint64_t last_insert_id_arg,
 
315
                                const char *message_arg)
 
316
{
 
317
  assert(! is_set());
 
318
  /*
 
319
    In production, refuse to overwrite an error or a custom response
 
320
    with an OK packet.
 
321
  */
 
322
  if (is_error() || is_disabled())
 
323
    return;
 
324
  /** Only allowed to report success if has not yet reported an error */
 
325
 
 
326
  m_server_status= thd->server_status;
 
327
  m_total_warn_count= thd->total_warn_count;
 
328
  m_affected_rows= affected_rows_arg;
 
329
  m_last_insert_id= last_insert_id_arg;
 
330
  if (message_arg)
 
331
    strmake(m_message, message_arg, sizeof(m_message) - 1);
 
332
  else
 
333
    m_message[0]= '\0';
 
334
  m_status= DA_OK;
 
335
}
 
336
 
 
337
 
 
338
/**
 
339
  Set EOF status.
 
340
*/
 
341
 
 
342
void
 
343
Diagnostics_area::set_eof_status(THD *thd)
 
344
{
 
345
  /** Only allowed to report eof if has not yet reported an error */
 
346
 
 
347
  assert(! is_set());
 
348
  /*
 
349
    In production, refuse to overwrite an error or a custom response
 
350
    with an EOF packet.
 
351
  */
 
352
  if (is_error() || is_disabled())
 
353
    return;
 
354
 
 
355
  m_server_status= thd->server_status;
 
356
  /*
 
357
    If inside a stored procedure, do not return the total
 
358
    number of warnings, since they are not available to the client
 
359
    anyway.
 
360
  */
 
361
  m_total_warn_count= thd->total_warn_count;
 
362
 
 
363
  m_status= DA_EOF;
 
364
}
 
365
 
 
366
/**
 
367
  Set ERROR status.
 
368
*/
 
369
 
 
370
void
 
371
Diagnostics_area::set_error_status(THD *thd __attribute__((unused)),
 
372
                                   uint32_t sql_errno_arg,
 
373
                                   const char *message_arg)
 
374
{
 
375
  /*
 
376
    Only allowed to report error if has not yet reported a success
 
377
    The only exception is when we flush the message to the client,
 
378
    an error can happen during the flush.
 
379
  */
 
380
  assert(! is_set() || can_overwrite_status);
 
381
  /*
 
382
    In production, refuse to overwrite a custom response with an
 
383
    ERROR packet.
 
384
  */
 
385
  if (is_disabled())
 
386
    return;
 
387
 
 
388
  m_sql_errno= sql_errno_arg;
 
389
  strmake(m_message, message_arg, sizeof(m_message) - 1);
 
390
 
 
391
  m_status= DA_ERROR;
 
392
}
 
393
 
 
394
 
 
395
/**
 
396
  Mark the diagnostics area as 'DISABLED'.
 
397
 
 
398
  This is used in rare cases when the COM_ command at hand sends a response
 
399
  in a custom format. One example is the query cache, another is
 
400
  COM_STMT_PREPARE.
 
401
*/
 
402
 
 
403
void
 
404
Diagnostics_area::disable_status()
 
405
{
 
406
  assert(! is_set());
 
407
  m_status= DA_DISABLED;
 
408
}
 
409
 
 
410
 
 
411
THD::THD()
 
412
   :Statement(&main_lex, &main_mem_root,
 
413
              /* statement id */ 0),
 
414
   Open_tables_state(refresh_version), rli_fake(0),
 
415
   lock_id(&main_lock_id),
 
416
   user_time(0),
 
417
   binlog_table_maps(0), binlog_flags(0UL),
 
418
   arg_of_last_insert_id_function(false),
 
419
   first_successful_insert_id_in_prev_stmt(0),
 
420
   first_successful_insert_id_in_prev_stmt_for_binlog(0),
 
421
   first_successful_insert_id_in_cur_stmt(0),
 
422
   stmt_depends_on_first_successful_insert_id_in_prev_stmt(false),
 
423
   global_read_lock(0),
 
424
   is_fatal_error(0),
 
425
   transaction_rollback_request(0),
 
426
   is_fatal_sub_stmt_error(0),
 
427
   rand_used(0),
 
428
   time_zone_used(0),
 
429
   in_lock_tables(0),
 
430
   bootstrap(0),
 
431
   derived_tables_processing(false),
 
432
   m_lip(NULL)
 
433
{
 
434
  ulong tmp;
188
435
 
189
436
  /*
190
437
    Pass nominal parameters to init_alloc_root only to ensure that
191
438
    the destructor works OK in case of an error. The main_mem_root
192
439
    will be re-initialized in init_for_queries().
193
440
  */
194
 
  memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
195
 
  thread_stack= NULL;
196
 
  count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
441
  init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
 
442
  thread_stack= 0;
 
443
  catalog= (char*)"std"; // the only catalog we have for now
 
444
  main_security_ctx.init();
 
445
  security_ctx= &main_security_ctx;
 
446
  some_tables_deleted=no_errors=password= 0;
 
447
  query_start_used= 0;
 
448
  count_cuted_fields= CHECK_FIELD_IGNORE;
197
449
  killed= NOT_KILLED;
198
 
  col_access= 0;
199
 
  tmp_table= 0;
200
 
  used_tables= 0;
 
450
  col_access=0;
 
451
  is_slave_error= thread_specific_used= false;
 
452
  hash_clear(&handler_tables_hash);
 
453
  tmp_table=0;
 
454
  used_tables=0;
201
455
  cuted_fields= sent_row_count= row_count= 0L;
 
456
  limit_found_rows= 0;
202
457
  row_count_func= -1;
203
458
  statement_id_counter= 0UL;
204
 
  // Must be reset to handle error with Session's created for init of mysqld
 
459
  // Must be reset to handle error with THD's created for init of mysqld
205
460
  lex->current_select= 0;
206
461
  start_time=(time_t) 0;
207
462
  start_utime= 0L;
208
463
  utime_after_lock= 0L;
 
464
  current_linfo =  0;
 
465
  slave_thread = 0;
209
466
  memset(&variables, 0, sizeof(variables));
210
467
  thread_id= 0;
 
468
  one_shot_set= 0;
211
469
  file_id = 0;
212
470
  query_id= 0;
213
 
  warn_query_id= 0;
214
 
  mysys_var= 0;
215
 
  scoreboard_index= -1;
216
 
  dbug_sentry=Session_SENTRY_MAGIC;
217
 
  cleanup_done= abort_on_warning= no_warnings_for_error= false;  
218
 
 
219
 
  /* query_cache init */
220
 
  query_cache_key= "";
221
 
  resultset= NULL;
 
471
  warn_id= 0;
 
472
  db_charset= global_system_variables.collation_database;
 
473
  memset(ha_data, 0, sizeof(ha_data));
 
474
  mysys_var=0;
 
475
  binlog_evt_union.do_union= false;
 
476
  dbug_sentry=THD_SENTRY_MAGIC;
 
477
  net.vio=0;
 
478
  client_capabilities= 0;                       // minimalistic client
 
479
  system_thread= NON_SYSTEM_THREAD;
 
480
  cleanup_done= abort_on_warning= no_warnings_for_error= 0;
 
481
  peer_port= 0;                                 // For SHOW PROCESSLIST
 
482
  transaction.m_pending_rows_event= 0;
 
483
  transaction.on= 1;
 
484
  pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
222
485
 
223
486
  /* Variables with default values */
224
487
  proc_info="login";
225
 
  where= Session::DEFAULT_WHERE;
226
 
  command= COM_CONNECT;
227
 
 
228
 
  plugin_sessionvar_init(this);
 
488
  where= THD::DEFAULT_WHERE;
 
489
  server_id = ::server_id;
 
490
  slave_net = 0;
 
491
  command=COM_CONNECT;
 
492
  *scramble= '\0';
 
493
 
 
494
  init();
 
495
  /* Initialize sub structures */
 
496
  init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
 
497
  user_connect=(USER_CONN *)0;
 
498
  hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
 
499
            (hash_get_key) get_var_key,
 
500
            (hash_free_key) free_user_var, 0);
 
501
 
 
502
  /* For user vars replication*/
 
503
  if (opt_bin_log)
 
504
    my_init_dynamic_array(&user_var_events,
 
505
                          sizeof(BINLOG_USER_VAR_EVENT *), 16, 16);
 
506
  else
 
507
    memset(&user_var_events, 0, sizeof(user_var_events));
 
508
 
 
509
  /* Protocol */
 
510
  protocol= &protocol_text;                     // Default protocol
 
511
  protocol_text.init(this);
 
512
 
 
513
  tablespace_op= false;
 
514
  tmp= sql_rnd();
 
515
  randominit(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id);
 
516
  substitute_null_with_insert_id = false;
 
517
  thr_lock_info_init(&lock_info); /* safety: will be reset after start */
 
518
  thr_lock_owner_init(&main_lock_id, &lock_info);
 
519
 
 
520
  m_internal_handler= NULL;
 
521
}
 
522
 
 
523
 
 
524
void THD::push_internal_handler(Internal_error_handler *handler)
 
525
{
 
526
  /*
 
527
    TODO: The current implementation is limited to 1 handler at a time only.
 
528
    THD and sp_rcontext need to be modified to use a common handler stack.
 
529
  */
 
530
  assert(m_internal_handler == NULL);
 
531
  m_internal_handler= handler;
 
532
}
 
533
 
 
534
 
 
535
bool THD::handle_error(uint32_t sql_errno, const char *message,
 
536
                       DRIZZLE_ERROR::enum_warning_level level)
 
537
{
 
538
  if (m_internal_handler)
 
539
  {
 
540
    return m_internal_handler->handle_error(sql_errno, message, level, this);
 
541
  }
 
542
 
 
543
  return false;                                 // 'false', as per coding style
 
544
}
 
545
 
 
546
 
 
547
void THD::pop_internal_handler()
 
548
{
 
549
  assert(m_internal_handler != NULL);
 
550
  m_internal_handler= NULL;
 
551
}
 
552
 
 
553
extern "C"
 
554
void *thd_alloc(DRIZZLE_THD thd, unsigned int size)
 
555
{
 
556
  return thd->alloc(size);
 
557
}
 
558
 
 
559
extern "C"
 
560
void *thd_calloc(DRIZZLE_THD thd, unsigned int size)
 
561
{
 
562
  return thd->calloc(size);
 
563
}
 
564
 
 
565
extern "C"
 
566
char *thd_strdup(DRIZZLE_THD thd, const char *str)
 
567
{
 
568
  return thd->strdup(str);
 
569
}
 
570
 
 
571
extern "C"
 
572
char *thd_strmake(DRIZZLE_THD thd, const char *str, unsigned int size)
 
573
{
 
574
  return thd->strmake(str, size);
 
575
}
 
576
 
 
577
extern "C"
 
578
LEX_STRING *thd_make_lex_string(THD *thd, LEX_STRING *lex_str,
 
579
                                const char *str, unsigned int size,
 
580
                                int allocate_lex_string)
 
581
{
 
582
  return thd->make_lex_string(lex_str, str, size,
 
583
                              (bool) allocate_lex_string);
 
584
}
 
585
 
 
586
extern "C"
 
587
void *thd_memdup(DRIZZLE_THD thd, const void* str, unsigned int size)
 
588
{
 
589
  return thd->memdup(str, size);
 
590
}
 
591
 
 
592
extern "C"
 
593
void thd_get_xid(const DRIZZLE_THD thd, DRIZZLE_XID *xid)
 
594
{
 
595
  *xid = *(DRIZZLE_XID *) &thd->transaction.xid_state.xid;
 
596
}
 
597
 
 
598
/*
 
599
  Init common variables that has to be reset on start and on change_user
 
600
*/
 
601
 
 
602
void THD::init(void)
 
603
{
 
604
  pthread_mutex_lock(&LOCK_global_system_variables);
 
605
  plugin_thdvar_init(this);
 
606
  variables.time_format= date_time_format_copy((THD*) 0,
 
607
                                               variables.time_format);
 
608
  variables.date_format= date_time_format_copy((THD*) 0,
 
609
                                               variables.date_format);
 
610
  variables.datetime_format= date_time_format_copy((THD*) 0,
 
611
                                                   variables.datetime_format);
229
612
  /*
230
613
    variables= global_system_variables above has reset
231
614
    variables.pseudo_thread_id to 0. We need to correct it here to
232
615
    avoid temporary tables replication failure.
233
616
  */
234
617
  variables.pseudo_thread_id= thread_id;
 
618
  pthread_mutex_unlock(&LOCK_global_system_variables);
235
619
  server_status= SERVER_STATUS_AUTOCOMMIT;
236
 
  options= session_startup_options;
 
620
  options= thd_startup_options;
237
621
 
238
622
  if (variables.max_join_size == HA_POS_ERROR)
239
623
    options |= OPTION_BIG_SELECTS;
240
624
  else
241
625
    options &= ~OPTION_BIG_SELECTS;
242
626
 
 
627
  transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= false;
243
628
  open_options=ha_open_options;
244
 
  update_lock_default= TL_WRITE;
 
629
  update_lock_default= (variables.low_priority_updates ?
 
630
                        TL_WRITE_LOW_PRIORITY :
 
631
                        TL_WRITE);
245
632
  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
246
633
  warn_list.empty();
247
634
  memset(warn_count, 0, sizeof(warn_count));
248
635
  total_warn_count= 0;
 
636
  update_charset();
 
637
  reset_current_stmt_binlog_row_based();
249
638
  memset(&status_var, 0, sizeof(status_var));
250
 
 
251
 
  /* Initialize sub structures */
252
 
  memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
253
 
 
254
 
  substitute_null_with_insert_id = false;
255
 
  lock_info.init(); /* safety: will be reset after start */
256
 
  thr_lock_owner_init(&main_lock_id, &lock_info);
257
 
 
258
 
  m_internal_handler= NULL;
259
 
  
260
 
  plugin::EventObserver::registerSessionEvents(*this); 
261
 
}
262
 
 
263
 
void Session::free_items()
264
 
{
265
 
  Item *next;
266
 
  /* This works because items are allocated with memory::sql_alloc() */
267
 
  for (; free_list; free_list= next)
268
 
  {
269
 
    next= free_list->next;
270
 
    free_list->delete_self();
271
 
  }
272
 
}
273
 
 
274
 
void Session::push_internal_handler(Internal_error_handler *handler)
275
 
{
276
 
  /*
277
 
    TODO: The current implementation is limited to 1 handler at a time only.
278
 
    Session and sp_rcontext need to be modified to use a common handler stack.
279
 
  */
280
 
  assert(m_internal_handler == NULL);
281
 
  m_internal_handler= handler;
282
 
}
283
 
 
284
 
bool Session::handle_error(uint32_t sql_errno, const char *message,
285
 
                       DRIZZLE_ERROR::enum_warning_level level)
286
 
{
287
 
  if (m_internal_handler)
288
 
  {
289
 
    return m_internal_handler->handle_error(sql_errno, message, level, this);
290
 
  }
291
 
 
292
 
  return false;                                 // 'false', as per coding style
293
 
}
294
 
 
295
 
void Session::setAbort(bool arg)
296
 
{
297
 
  mysys_var->abort= arg;
298
 
}
299
 
 
300
 
void Session::lockOnSys()
301
 
{
302
 
  if (not mysys_var)
303
 
    return;
304
 
 
305
 
  setAbort(true);
306
 
  boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
307
 
  if (mysys_var->current_cond)
308
 
  {
309
 
    mysys_var->current_mutex->lock();
310
 
    pthread_cond_broadcast(mysys_var->current_cond->native_handle());
311
 
    mysys_var->current_mutex->unlock();
312
 
  }
313
 
}
314
 
 
315
 
void Session::pop_internal_handler()
316
 
{
317
 
  assert(m_internal_handler != NULL);
318
 
  m_internal_handler= NULL;
319
 
}
320
 
 
321
 
void Session::get_xid(DRIZZLE_XID *xid)
322
 
{
323
 
  *xid = *(DRIZZLE_XID *) &transaction.xid_state.xid;
324
 
}
 
639
}
 
640
 
 
641
 
 
642
/*
 
643
  Init THD for query processing.
 
644
  This has to be called once before we call mysql_parse.
 
645
  See also comments in sql_class.h.
 
646
*/
 
647
 
 
648
void THD::init_for_queries()
 
649
{
 
650
  set_time(); 
 
651
  ha_enable_transaction(this,true);
 
652
 
 
653
  reset_root_defaults(mem_root, variables.query_alloc_block_size,
 
654
                      variables.query_prealloc_size);
 
655
  reset_root_defaults(&transaction.mem_root,
 
656
                      variables.trans_alloc_block_size,
 
657
                      variables.trans_prealloc_size);
 
658
  transaction.xid_state.xid.null();
 
659
  transaction.xid_state.in_thd=1;
 
660
}
 
661
 
325
662
 
326
663
/* Do operations that may take a long time */
327
664
 
328
 
void Session::cleanup(void)
 
665
void THD::cleanup(void)
329
666
{
330
 
  assert(cleanup_done == false);
 
667
  assert(cleanup_done == 0);
331
668
 
332
669
  killed= KILL_CONNECTION;
333
670
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
337
674
  }
338
675
#endif
339
676
  {
340
 
    TransactionServices &transaction_services= TransactionServices::singleton();
341
 
    transaction_services.rollbackTransaction(this, true);
 
677
    ha_rollback(this);
342
678
    xid_cache_delete(&transaction.xid_state);
343
679
  }
344
 
 
345
 
  for (UserVars::iterator iter= user_vars.begin();
346
 
       iter != user_vars.end();
347
 
       iter++)
 
680
  if (locked_tables)
348
681
  {
349
 
    user_var_entry *entry= (*iter).second;
350
 
    delete entry;
 
682
    lock=locked_tables; locked_tables=0;
 
683
    close_thread_tables(this);
351
684
  }
352
 
  user_vars.clear();
353
 
 
354
 
 
355
 
  close_temporary_tables();
356
 
 
 
685
  mysql_ha_cleanup(this);
 
686
  delete_dynamic(&user_var_events);
 
687
  hash_free(&user_vars);
 
688
  close_temporary_tables(this);
 
689
  free((char*) variables.time_format);
 
690
  free((char*) variables.date_format);
 
691
  free((char*) variables.datetime_format);
 
692
  
357
693
  if (global_read_lock)
358
694
    unlock_global_read_lock(this);
359
695
 
360
 
  cleanup_done= true;
 
696
  cleanup_done=1;
 
697
  return;
361
698
}
362
699
 
363
 
Session::~Session()
 
700
THD::~THD()
364
701
{
365
 
  this->checkSentry();
366
 
 
367
 
  if (client->isConnected())
368
 
  {
369
 
    if (global_system_variables.log_warnings)
370
 
        errmsg_printf(ERRMSG_LVL_WARN, ER(ER_FORCING_CLOSE),internal::my_progname,
371
 
                      thread_id,
372
 
                      (getSecurityContext().getUser().c_str() ?
373
 
                       getSecurityContext().getUser().c_str() : ""));
374
 
    disconnect(0, false);
375
 
  }
 
702
  THD_CHECK_SENTRY(this);
 
703
  /* Ensure that no one is using THD */
 
704
  pthread_mutex_lock(&LOCK_delete);
 
705
  pthread_mutex_unlock(&LOCK_delete);
 
706
  add_to_status(&global_status_var, &status_var);
376
707
 
377
708
  /* Close connection */
378
 
  client->close();
379
 
  delete client;
380
 
 
381
 
  if (cleanup_done == false)
 
709
  if (net.vio)
 
710
  {
 
711
    net_close(&net);
 
712
    net_end(&net);
 
713
  }
 
714
  if (!cleanup_done)
382
715
    cleanup();
383
716
 
384
 
  plugin::StorageEngine::closeConnection(this);
385
 
  plugin_sessionvar_cleanup(this);
 
717
  ha_close_connection(this);
 
718
  plugin_thdvar_cleanup(this);
386
719
 
387
 
  warn_root.free_root(MYF(0));
 
720
  main_security_ctx.destroy();
 
721
  if (db)
 
722
  {
 
723
    free(db);
 
724
    db= NULL;
 
725
  }
 
726
  free_root(&warn_root,MYF(0));
 
727
  free_root(&transaction.mem_root,MYF(0));
388
728
  mysys_var=0;                                  // Safety (shouldn't be needed)
389
 
  dbug_sentry= Session_SENTRY_GONE;
390
 
 
391
 
  main_mem_root.free_root(MYF(0));
392
 
  currentMemRoot().release();
393
 
  currentSession().release();
394
 
 
395
 
  plugin::Logging::postEndDo(this);
396
 
  plugin::EventObserver::deregisterSessionEvents(*this); 
397
 
 
398
 
  for (PropertyMap::iterator iter= life_properties.begin(); iter != life_properties.end(); iter++)
 
729
  pthread_mutex_destroy(&LOCK_delete);
 
730
  dbug_sentry= THD_SENTRY_GONE;
 
731
  if (rli_fake)
399
732
  {
400
 
    delete (*iter).second;
 
733
    delete rli_fake;
 
734
    rli_fake= NULL;
401
735
  }
402
 
  life_properties.clear();
403
 
 
404
 
  /* Ensure that no one is using Session */
405
 
  LOCK_delete.unlock();
406
 
}
407
 
 
408
 
void Session::awake(Session::killed_state state_to_set)
409
 
{
410
 
  this->checkSentry();
411
 
  safe_mutex_assert_owner(&LOCK_delete);
 
736
  
 
737
  free_root(&main_mem_root, MYF(0));
 
738
  return;
 
739
}
 
740
 
 
741
 
 
742
/*
 
743
  Add all status variables to another status variable array
 
744
 
 
745
  SYNOPSIS
 
746
   add_to_status()
 
747
   to_var       add to this array
 
748
   from_var     from this array
 
749
 
 
750
  NOTES
 
751
    This function assumes that all variables are long/ulong.
 
752
    If this assumption will change, then we have to explictely add
 
753
    the other variables after the while loop
 
754
*/
 
755
 
 
756
void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var)
 
757
{
 
758
  ulong *end= (ulong*) ((unsigned char*) to_var +
 
759
                        offsetof(STATUS_VAR, last_system_status_var) +
 
760
                        sizeof(ulong));
 
761
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var;
 
762
 
 
763
  while (to != end)
 
764
    *(to++)+= *(from++);
 
765
}
 
766
 
 
767
/*
 
768
  Add the difference between two status variable arrays to another one.
 
769
 
 
770
  SYNOPSIS
 
771
    add_diff_to_status
 
772
    to_var       add to this array
 
773
    from_var     from this array
 
774
    dec_var      minus this array
 
775
  
 
776
  NOTE
 
777
    This function assumes that all variables are long/ulong.
 
778
*/
 
779
 
 
780
void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var,
 
781
                        STATUS_VAR *dec_var)
 
782
{
 
783
  ulong *end= (ulong*) ((unsigned char*) to_var + offsetof(STATUS_VAR,
 
784
                                                  last_system_status_var) +
 
785
                        sizeof(ulong));
 
786
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var, *dec= (ulong*) dec_var;
 
787
 
 
788
  while (to != end)
 
789
    *(to++)+= *(from++) - *(dec++);
 
790
}
 
791
 
 
792
 
 
793
void THD::awake(THD::killed_state state_to_set)
 
794
{
 
795
  THD_CHECK_SENTRY(this);
 
796
  safe_mutex_assert_owner(&LOCK_delete); 
412
797
 
413
798
  killed= state_to_set;
414
 
  if (state_to_set != Session::KILL_QUERY)
 
799
  if (state_to_set != THD::KILL_QUERY)
415
800
  {
416
 
    scheduler->killSession(this);
417
 
    DRIZZLE_CONNECTION_DONE(thread_id);
 
801
    thr_alarm_kill(thread_id);
 
802
    if (!slave_thread)
 
803
      thread_scheduler.post_kill_notification(this);
418
804
  }
419
805
  if (mysys_var)
420
806
  {
421
 
    boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
 
807
    pthread_mutex_lock(&mysys_var->mutex);
 
808
    if (!system_thread)         // Don't abort locks
 
809
      mysys_var->abort=1;
422
810
    /*
423
 
      "
424
811
      This broadcast could be up in the air if the victim thread
425
812
      exits the cond in the time between read and broadcast, but that is
426
813
      ok since all we want to do is to make the victim thread get out
436
823
      current_cond and current_mutex are 0), then the victim will not get
437
824
      a signal and it may wait "forever" on the cond (until
438
825
      we issue a second KILL or the status it's waiting for happens).
439
 
      It's true that we have set its session->killed but it may not
 
826
      It's true that we have set its thd->killed but it may not
440
827
      see it immediately and so may have time to reach the cond_wait().
441
828
    */
442
829
    if (mysys_var->current_cond && mysys_var->current_mutex)
443
830
    {
444
 
      mysys_var->current_mutex->lock();
445
 
      pthread_cond_broadcast(mysys_var->current_cond->native_handle());
446
 
      mysys_var->current_mutex->unlock();
 
831
      pthread_mutex_lock(mysys_var->current_mutex);
 
832
      pthread_cond_broadcast(mysys_var->current_cond);
 
833
      pthread_mutex_unlock(mysys_var->current_mutex);
447
834
    }
 
835
    pthread_mutex_unlock(&mysys_var->mutex);
448
836
  }
 
837
  return;
449
838
}
450
839
 
451
840
/*
452
841
  Remember the location of thread info, the structure needed for
453
 
  memory::sql_alloc() and the structure for the net buffer
 
842
  sql_alloc() and the structure for the net buffer
454
843
*/
455
 
bool Session::storeGlobals()
 
844
 
 
845
bool THD::store_globals()
456
846
{
457
847
  /*
458
848
    Assert that thread_stack is initialized: it's necessary to be able
460
850
  */
461
851
  assert(thread_stack);
462
852
 
463
 
  currentSession().release();
464
 
  currentSession().reset(this);
465
 
 
466
 
  currentMemRoot().release();
467
 
  currentMemRoot().reset(&mem_root);
468
 
 
 
853
  if (my_pthread_setspecific_ptr(THR_THD,  this) ||
 
854
      my_pthread_setspecific_ptr(THR_MALLOC, &mem_root))
 
855
    return 1;
469
856
  mysys_var=my_thread_var;
470
 
 
471
857
  /*
472
858
    Let mysqld define the thread id (not mysys)
473
 
    This allows us to move Session to different threads if needed.
 
859
    This allows us to move THD to different threads if needed.
474
860
  */
475
861
  mysys_var->id= thread_id;
 
862
  real_id= pthread_self();                      // For debugging
476
863
 
477
864
  /*
478
 
    We have to call thr_lock_info_init() again here as Session may have been
 
865
    We have to call thr_lock_info_init() again here as THD may have been
479
866
    created in another thread
480
867
  */
481
 
  lock_info.init();
482
 
 
483
 
  return false;
 
868
  thr_lock_info_init(&lock_info);
 
869
  return 0;
484
870
}
485
871
 
 
872
 
486
873
/*
487
 
  Init Session for query processing.
488
 
  This has to be called once before we call mysql_parse.
489
 
  See also comments in session.h.
 
874
  Cleanup after query.
 
875
 
 
876
  SYNOPSIS
 
877
    THD::cleanup_after_query()
 
878
 
 
879
  DESCRIPTION
 
880
    This function is used to reset thread data to its default state.
 
881
 
 
882
  NOTE
 
883
    This function is not suitable for setting thread data to some
 
884
    non-default values, as there is only one replication thread, so
 
885
    different master threads may overwrite data of each other on
 
886
    slave.
490
887
*/
491
888
 
492
 
void Session::prepareForQueries()
493
 
{
494
 
  if (variables.max_join_size == HA_POS_ERROR)
495
 
    options |= OPTION_BIG_SELECTS;
496
 
 
497
 
  version= refresh_version;
498
 
  set_proc_info(NULL);
499
 
  command= COM_SLEEP;
500
 
  set_time();
501
 
 
502
 
  mem_root->reset_root_defaults(variables.query_alloc_block_size,
503
 
                                variables.query_prealloc_size);
504
 
  transaction.xid_state.xid.null();
505
 
  transaction.xid_state.in_session=1;
506
 
  if (use_usage)
507
 
    resetUsage();
508
 
}
509
 
 
510
 
bool Session::initGlobals()
511
 
{
512
 
  if (storeGlobals())
513
 
  {
514
 
    disconnect(ER_OUT_OF_RESOURCES, true);
515
 
    status_var.aborted_connects++;
516
 
    return true;
517
 
  }
518
 
  return false;
519
 
}
520
 
 
521
 
void Session::run()
522
 
{
523
 
  if (initGlobals() || authenticate())
524
 
  {
525
 
    disconnect(0, true);
526
 
    return;
527
 
  }
528
 
 
529
 
  prepareForQueries();
530
 
 
531
 
  while (! client->haveError() && killed != KILL_CONNECTION)
532
 
  {
533
 
    if (! executeStatement())
534
 
      break;
535
 
  }
536
 
 
537
 
  disconnect(0, true);
538
 
}
539
 
 
540
 
bool Session::schedule()
541
 
{
542
 
  scheduler= plugin::Scheduler::getScheduler();
543
 
  assert(scheduler);
544
 
 
545
 
  connection_count.increment();
546
 
 
547
 
  if (connection_count > current_global_counters.max_used_connections)
548
 
  {
549
 
    current_global_counters.max_used_connections= connection_count;
550
 
  }
551
 
 
552
 
  current_global_counters.connections++;
553
 
  thread_id= variables.pseudo_thread_id= global_thread_id++;
554
 
 
555
 
  LOCK_thread_count.lock();
556
 
  getSessionList().push_back(this);
557
 
  LOCK_thread_count.unlock();
558
 
 
559
 
  if (scheduler->addSession(this))
560
 
  {
561
 
    DRIZZLE_CONNECTION_START(thread_id);
562
 
    char error_message_buff[DRIZZLE_ERRMSG_SIZE];
563
 
 
564
 
    killed= Session::KILL_CONNECTION;
565
 
 
566
 
    status_var.aborted_connects++;
567
 
 
568
 
    /* Can't use my_error() since store_globals has not been called. */
569
 
    /* TODO replace will better error message */
570
 
    snprintf(error_message_buff, sizeof(error_message_buff),
571
 
             ER(ER_CANT_CREATE_THREAD), 1);
572
 
    client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
573
 
    return true;
574
 
  }
575
 
 
576
 
  return false;
577
 
}
578
 
 
579
 
 
580
 
const char* Session::enter_cond(boost::condition_variable &cond, boost::mutex &mutex, const char* msg)
581
 
{
582
 
  const char* old_msg = get_proc_info();
583
 
  safe_mutex_assert_owner(mutex);
584
 
  mysys_var->current_mutex = &mutex;
585
 
  mysys_var->current_cond = &cond;
586
 
  this->set_proc_info(msg);
587
 
  return old_msg;
588
 
}
589
 
 
590
 
void Session::exit_cond(const char* old_msg)
591
 
{
592
 
  /*
593
 
    Putting the mutex unlock in exit_cond() ensures that
594
 
    mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
595
 
    locked (if that would not be the case, you'll get a deadlock if someone
596
 
    does a Session::awake() on you).
597
 
  */
598
 
  mysys_var->current_mutex->unlock();
599
 
  boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
600
 
  mysys_var->current_mutex = 0;
601
 
  mysys_var->current_cond = 0;
602
 
  this->set_proc_info(old_msg);
603
 
}
604
 
 
605
 
bool Session::authenticate()
606
 
{
607
 
  lex_start(this);
608
 
  if (client->authenticate())
609
 
    return false;
610
 
 
611
 
  status_var.aborted_connects++;
612
 
 
613
 
  return true;
614
 
}
615
 
 
616
 
bool Session::checkUser(const char *passwd, uint32_t passwd_len, const char *in_db)
617
 
{
618
 
  const string passwd_str(passwd, passwd_len);
619
 
  bool is_authenticated=
620
 
    plugin::Authentication::isAuthenticated(getSecurityContext(),
621
 
                                            passwd_str);
622
 
 
623
 
  if (is_authenticated != true)
624
 
  {
625
 
    status_var.access_denied++;
626
 
    /* isAuthenticated has pushed the error message */
627
 
    return false;
628
 
  }
629
 
 
630
 
  /* Change database if necessary */
631
 
  if (in_db && in_db[0])
632
 
  {
633
 
    SchemaIdentifier identifier(in_db);
634
 
    if (mysql_change_db(this, identifier))
635
 
    {
636
 
      /* mysql_change_db() has pushed the error message. */
637
 
      return false;
638
 
    }
639
 
  }
640
 
  my_ok();
641
 
  password= test(passwd_len);          // remember for error messages
642
 
 
643
 
  /* Ready to handle queries */
644
 
  return true;
645
 
}
646
 
 
647
 
bool Session::executeStatement()
648
 
{
649
 
  char *l_packet= 0;
650
 
  uint32_t packet_length;
651
 
 
652
 
  enum enum_server_command l_command;
653
 
 
654
 
  /*
655
 
    indicator of uninitialized lex => normal flow of errors handling
656
 
    (see my_message_sql)
657
 
  */
658
 
  lex->current_select= 0;
659
 
  clear_error();
660
 
  main_da.reset_diagnostics_area();
661
 
 
662
 
  if (client->readCommand(&l_packet, &packet_length) == false)
663
 
    return false;
664
 
 
665
 
  if (killed == KILL_CONNECTION)
666
 
    return false;
667
 
 
668
 
  if (packet_length == 0)
669
 
    return true;
670
 
 
671
 
  l_command= (enum enum_server_command) (unsigned char) l_packet[0];
672
 
 
673
 
  if (command >= COM_END)
674
 
    command= COM_END;                           // Wrong command
675
 
 
676
 
  assert(packet_length);
677
 
  return ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
678
 
}
679
 
 
680
 
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
681
 
{
682
 
  /* Remove garbage at start and end of query */
683
 
  while (in_packet_length > 0 && my_isspace(charset(), in_packet[0]))
684
 
  {
685
 
    in_packet++;
686
 
    in_packet_length--;
687
 
  }
688
 
  const char *pos= in_packet + in_packet_length; /* Point at end null */
689
 
  while (in_packet_length > 0 &&
690
 
         (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
691
 
  {
692
 
    pos--;
693
 
    in_packet_length--;
694
 
  }
695
 
 
696
 
  query.assign(in_packet, in_packet + in_packet_length);
697
 
 
698
 
  return true;
699
 
}
700
 
 
701
 
bool Session::endTransaction(enum enum_mysql_completiontype completion)
702
 
{
703
 
  bool do_release= 0;
704
 
  bool result= true;
705
 
  TransactionServices &transaction_services= TransactionServices::singleton();
706
 
 
707
 
  if (transaction.xid_state.xa_state != XA_NOTR)
708
 
  {
709
 
    my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
710
 
    return false;
711
 
  }
712
 
  switch (completion)
713
 
  {
714
 
    case COMMIT:
715
 
      /*
716
 
       * We don't use endActiveTransaction() here to ensure that this works
717
 
       * even if there is a problem with the OPTION_AUTO_COMMIT flag
718
 
       * (Which of course should never happen...)
719
 
       */
720
 
      server_status&= ~SERVER_STATUS_IN_TRANS;
721
 
      if (transaction_services.commitTransaction(this, true))
722
 
        result= false;
723
 
      options&= ~(OPTION_BEGIN);
724
 
      break;
725
 
    case COMMIT_RELEASE:
726
 
      do_release= 1; /* fall through */
727
 
    case COMMIT_AND_CHAIN:
728
 
      result= endActiveTransaction();
729
 
      if (result == true && completion == COMMIT_AND_CHAIN)
730
 
        result= startTransaction();
731
 
      break;
732
 
    case ROLLBACK_RELEASE:
733
 
      do_release= 1; /* fall through */
734
 
    case ROLLBACK:
735
 
    case ROLLBACK_AND_CHAIN:
736
 
    {
737
 
      server_status&= ~SERVER_STATUS_IN_TRANS;
738
 
      if (transaction_services.rollbackTransaction(this, true))
739
 
        result= false;
740
 
      options&= ~(OPTION_BEGIN);
741
 
      if (result == true && (completion == ROLLBACK_AND_CHAIN))
742
 
        result= startTransaction();
743
 
      break;
744
 
    }
745
 
    default:
746
 
      my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
747
 
      return false;
748
 
  }
749
 
 
750
 
  if (result == false)
751
 
    my_error(killed_errno(), MYF(0));
752
 
  else if ((result == true) && do_release)
753
 
    killed= Session::KILL_CONNECTION;
754
 
 
755
 
  return result;
756
 
}
757
 
 
758
 
bool Session::endActiveTransaction()
759
 
{
760
 
  bool result= true;
761
 
  TransactionServices &transaction_services= TransactionServices::singleton();
762
 
 
763
 
  if (transaction.xid_state.xa_state != XA_NOTR)
764
 
  {
765
 
    my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
766
 
    return false;
767
 
  }
768
 
  if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
769
 
  {
770
 
    server_status&= ~SERVER_STATUS_IN_TRANS;
771
 
    if (transaction_services.commitTransaction(this, true))
772
 
      result= false;
773
 
  }
774
 
  options&= ~(OPTION_BEGIN);
775
 
  return result;
776
 
}
777
 
 
778
 
bool Session::startTransaction(start_transaction_option_t opt)
779
 
{
780
 
  bool result= true;
781
 
 
782
 
  if (! endActiveTransaction())
783
 
  {
784
 
    result= false;
785
 
  }
786
 
  else
787
 
  {
788
 
    options|= OPTION_BEGIN;
789
 
    server_status|= SERVER_STATUS_IN_TRANS;
790
 
 
791
 
    if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
792
 
    {
793
 
      result= false;
794
 
    }
795
 
  }
796
 
 
797
 
  return result;
798
 
}
799
 
 
800
 
void Session::cleanup_after_query()
801
 
{
802
 
  /*
803
 
    Reset rand_used so that detection of calls to rand() will save random
 
889
void THD::cleanup_after_query()
 
890
{
 
891
  /*
 
892
    Reset rand_used so that detection of calls to rand() will save random 
804
893
    seeds if needed by the slave.
805
894
  */
806
895
  {
807
896
    /* Forget those values, for next binlogger: */
 
897
    stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
808
898
    auto_inc_intervals_in_cur_stmt_for_binlog.empty();
 
899
    rand_used= 0;
809
900
  }
810
901
  if (first_successful_insert_id_in_cur_stmt > 0)
811
902
  {
812
903
    /* set what LAST_INSERT_ID() will return */
813
 
    first_successful_insert_id_in_prev_stmt= first_successful_insert_id_in_cur_stmt;
 
904
    first_successful_insert_id_in_prev_stmt= 
 
905
      first_successful_insert_id_in_cur_stmt;
814
906
    first_successful_insert_id_in_cur_stmt= 0;
815
907
    substitute_null_with_insert_id= true;
816
908
  }
817
 
  arg_of_last_insert_id_function= false;
 
909
  arg_of_last_insert_id_function= 0;
818
910
  /* Free Items that were created during this execution */
819
911
  free_items();
820
912
  /* Reset where. */
821
 
  where= Session::DEFAULT_WHERE;
822
 
 
823
 
  /* Reset the temporary shares we built */
824
 
  for (std::vector<TableShareInstance *>::iterator iter= temporary_shares.begin();
825
 
       iter != temporary_shares.end(); iter++)
826
 
  {
827
 
    delete *iter;
828
 
  }
829
 
  temporary_shares.clear();
 
913
  where= THD::DEFAULT_WHERE;
830
914
}
831
915
 
 
916
 
832
917
/**
833
918
  Create a LEX_STRING in this connection.
834
919
 
839
924
                              instead of using lex_str value
840
925
  @return  NULL on failure, or pointer to the LEX_STRING object
841
926
*/
842
 
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
843
 
                                     const std::string &str,
844
 
                                     bool allocate_lex_string)
845
 
{
846
 
  return make_lex_string(lex_str, str.c_str(), str.length(), allocate_lex_string);
847
 
}
848
 
 
849
 
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
850
 
                                     const char* str, uint32_t length,
851
 
                                     bool allocate_lex_string)
 
927
LEX_STRING *THD::make_lex_string(LEX_STRING *lex_str,
 
928
                                 const char* str, uint32_t length,
 
929
                                 bool allocate_lex_string)
852
930
{
853
931
  if (allocate_lex_string)
854
932
    if (!(lex_str= (LEX_STRING *)alloc(sizeof(LEX_STRING))))
855
933
      return 0;
856
 
  if (!(lex_str->str= mem_root->strmake_root(str, length)))
 
934
  if (!(lex_str->str= strmake_root(mem_root, str, length)))
857
935
    return 0;
858
936
  lex_str->length= length;
859
937
  return lex_str;
860
938
}
861
939
 
862
 
int Session::send_explain_fields(select_result *result)
 
940
 
 
941
/*
 
942
  Convert a string to another character set
 
943
 
 
944
  SYNOPSIS
 
945
    convert_string()
 
946
    to                          Store new allocated string here
 
947
    to_cs                       New character set for allocated string
 
948
    from                        String to convert
 
949
    from_length                 Length of string to convert
 
950
    from_cs                     Original character set
 
951
 
 
952
  NOTES
 
953
    to will be 0-terminated to make it easy to pass to system funcs
 
954
 
 
955
  RETURN
 
956
    0   ok
 
957
    1   End of memory.
 
958
        In this case to->str will point to 0 and to->length will be 0.
 
959
*/
 
960
 
 
961
bool THD::convert_string(LEX_STRING *to, const CHARSET_INFO * const to_cs,
 
962
                         const char *from, uint32_t from_length,
 
963
                         const CHARSET_INFO * const from_cs)
 
964
{
 
965
  size_t new_length= to_cs->mbmaxlen * from_length;
 
966
  uint32_t dummy_errors;
 
967
  if (!(to->str= (char*) alloc(new_length+1)))
 
968
  {
 
969
    to->length= 0;                              // Safety fix
 
970
    return(1);                          // EOM
 
971
  }
 
972
  to->length= copy_and_convert((char*) to->str, new_length, to_cs,
 
973
                               from, from_length, from_cs, &dummy_errors);
 
974
  to->str[to->length]=0;                        // Safety
 
975
  return(0);
 
976
}
 
977
 
 
978
 
 
979
/*
 
980
  Convert string from source character set to target character set inplace.
 
981
 
 
982
  SYNOPSIS
 
983
    THD::convert_string
 
984
 
 
985
  DESCRIPTION
 
986
    Convert string using convert_buffer - buffer for character set 
 
987
    conversion shared between all protocols.
 
988
 
 
989
  RETURN
 
990
    0   ok
 
991
   !0   out of memory
 
992
*/
 
993
 
 
994
bool THD::convert_string(String *s, const CHARSET_INFO * const from_cs,
 
995
                         const CHARSET_INFO * const to_cs)
 
996
{
 
997
  uint32_t dummy_errors;
 
998
  if (convert_buffer.copy(s->ptr(), s->length(), from_cs, to_cs, &dummy_errors))
 
999
    return true;
 
1000
  /* If convert_buffer >> s copying is more efficient long term */
 
1001
  if (convert_buffer.alloced_length() >= convert_buffer.length() * 2 ||
 
1002
      !s->is_alloced())
 
1003
  {
 
1004
    return s->copy(convert_buffer);
 
1005
  }
 
1006
  s->swap(convert_buffer);
 
1007
  return false;
 
1008
}
 
1009
 
 
1010
 
 
1011
/*
 
1012
  Update some cache variables when character set changes
 
1013
*/
 
1014
 
 
1015
void THD::update_charset()
 
1016
{
 
1017
  uint32_t not_used;
 
1018
  charset_is_system_charset= !String::needs_conversion(0,charset(),
 
1019
                                                       system_charset_info,
 
1020
                                                       &not_used);
 
1021
  charset_is_collation_connection= 
 
1022
    !String::needs_conversion(0,charset(),variables.collation_connection,
 
1023
                              &not_used);
 
1024
  charset_is_character_set_filesystem= 
 
1025
    !String::needs_conversion(0, charset(),
 
1026
                              variables.character_set_filesystem, &not_used);
 
1027
}
 
1028
 
 
1029
 
 
1030
/* routings to adding tables to list of changed in transaction tables */
 
1031
 
 
1032
inline static void list_include(CHANGED_TableList** prev,
 
1033
                                CHANGED_TableList* curr,
 
1034
                                CHANGED_TableList* new_table)
 
1035
{
 
1036
  if (new_table)
 
1037
  {
 
1038
    *prev = new_table;
 
1039
    (*prev)->next = curr;
 
1040
  }
 
1041
}
 
1042
 
 
1043
/* add table to list of changed in transaction tables */
 
1044
 
 
1045
void THD::add_changed_table(Table *table)
 
1046
{
 
1047
  assert((options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
 
1048
              table->file->has_transactions());
 
1049
  add_changed_table(table->s->table_cache_key.str,
 
1050
                    (long) table->s->table_cache_key.length);
 
1051
  return;
 
1052
}
 
1053
 
 
1054
 
 
1055
void THD::add_changed_table(const char *key, long key_length)
 
1056
{
 
1057
  CHANGED_TableList **prev_changed = &transaction.changed_tables;
 
1058
  CHANGED_TableList *curr = transaction.changed_tables;
 
1059
 
 
1060
  for (; curr; prev_changed = &(curr->next), curr = curr->next)
 
1061
  {
 
1062
    int cmp =  (long)curr->key_length - (long)key_length;
 
1063
    if (cmp < 0)
 
1064
    {
 
1065
      list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
1066
      return;
 
1067
    }
 
1068
    else if (cmp == 0)
 
1069
    {
 
1070
      cmp = memcmp(curr->key, key, curr->key_length);
 
1071
      if (cmp < 0)
 
1072
      {
 
1073
        list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
1074
        return;
 
1075
      }
 
1076
      else if (cmp == 0)
 
1077
      {
 
1078
        return;
 
1079
      }
 
1080
    }
 
1081
  }
 
1082
  *prev_changed = changed_table_dup(key, key_length);
 
1083
  return;
 
1084
}
 
1085
 
 
1086
 
 
1087
CHANGED_TableList* THD::changed_table_dup(const char *key, long key_length)
 
1088
{
 
1089
  CHANGED_TableList* new_table = 
 
1090
    (CHANGED_TableList*) trans_alloc(ALIGN_SIZE(sizeof(CHANGED_TableList))+
 
1091
                                      key_length + 1);
 
1092
  if (!new_table)
 
1093
  {
 
1094
    my_error(EE_OUTOFMEMORY, MYF(ME_BELL),
 
1095
             ALIGN_SIZE(sizeof(TableList)) + key_length + 1);
 
1096
    killed= KILL_CONNECTION;
 
1097
    return 0;
 
1098
  }
 
1099
 
 
1100
  new_table->key= ((char*)new_table)+ ALIGN_SIZE(sizeof(CHANGED_TableList));
 
1101
  new_table->next = 0;
 
1102
  new_table->key_length = key_length;
 
1103
  ::memcpy(new_table->key, key, key_length);
 
1104
  return new_table;
 
1105
}
 
1106
 
 
1107
 
 
1108
int THD::send_explain_fields(select_result *result)
863
1109
{
864
1110
  List<Item> field_list;
865
1111
  Item *item;
894
1140
  }
895
1141
  item->maybe_null= 1;
896
1142
  field_list.push_back(new Item_empty_string("Extra", 255, cs));
897
 
  return (result->send_fields(field_list));
898
 
}
899
 
 
900
 
void select_result::send_error(uint32_t errcode, const char *err)
 
1143
  return (result->send_fields(field_list,
 
1144
                              Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF));
 
1145
}
 
1146
 
 
1147
 
 
1148
struct Item_change_record: public ilink
 
1149
{
 
1150
  Item **place;
 
1151
  Item *old_value;
 
1152
  /* Placement new was hidden by `new' in ilink (TODO: check): */
 
1153
  static void *operator new(size_t size __attribute__((unused)),
 
1154
                            void *mem)
 
1155
    { return mem; }
 
1156
  static void operator delete(void *ptr __attribute__((unused)),
 
1157
                              size_t size __attribute__((unused)))
 
1158
    {}
 
1159
  static void operator delete(void *ptr __attribute__((unused)),
 
1160
                              void *mem __attribute__((unused)))
 
1161
    { /* never called */ }
 
1162
};
 
1163
 
 
1164
 
 
1165
/*
 
1166
  Register an item tree tree transformation, performed by the query
 
1167
  optimizer. We need a pointer to runtime_memroot because it may be !=
 
1168
  thd->mem_root (this may no longer be a true statement)
 
1169
*/
 
1170
 
 
1171
void THD::nocheck_register_item_tree_change(Item **place, Item *old_value,
 
1172
                                            MEM_ROOT *runtime_memroot)
 
1173
{
 
1174
  Item_change_record *change;
 
1175
  /*
 
1176
    Now we use one node per change, which adds some memory overhead,
 
1177
    but still is rather fast as we use alloc_root for allocations.
 
1178
    A list of item tree changes of an average query should be short.
 
1179
  */
 
1180
  void *change_mem= alloc_root(runtime_memroot, sizeof(*change));
 
1181
  if (change_mem == 0)
 
1182
  {
 
1183
    /*
 
1184
      OOM, thd->fatal_error() is called by the error handler of the
 
1185
      memroot. Just return.
 
1186
    */
 
1187
    return;
 
1188
  }
 
1189
  change= new (change_mem) Item_change_record;
 
1190
  change->place= place;
 
1191
  change->old_value= old_value;
 
1192
  change_list.append(change);
 
1193
}
 
1194
 
 
1195
 
 
1196
void THD::rollback_item_tree_changes()
 
1197
{
 
1198
  I_List_iterator<Item_change_record> it(change_list);
 
1199
  Item_change_record *change;
 
1200
 
 
1201
  while ((change= it++))
 
1202
    *change->place= change->old_value;
 
1203
  /* We can forget about changes memory: it's allocated in runtime memroot */
 
1204
  change_list.empty();
 
1205
  return;
 
1206
}
 
1207
 
 
1208
 
 
1209
/*****************************************************************************
 
1210
** Functions to provide a interface to select results
 
1211
*****************************************************************************/
 
1212
 
 
1213
select_result::select_result()
 
1214
{
 
1215
  thd=current_thd;
 
1216
}
 
1217
 
 
1218
void select_result::send_error(uint32_t errcode,const char *err)
901
1219
{
902
1220
  my_message(errcode, err, MYF(0));
903
1221
}
904
1222
 
 
1223
 
 
1224
void select_result::cleanup()
 
1225
{
 
1226
  /* do nothing */
 
1227
}
 
1228
 
 
1229
bool select_result::check_simple_select() const
 
1230
{
 
1231
  my_error(ER_SP_BAD_CURSOR_QUERY, MYF(0));
 
1232
  return true;
 
1233
}
 
1234
 
 
1235
 
 
1236
static String default_line_term("\n",default_charset_info);
 
1237
static String default_escaped("\\",default_charset_info);
 
1238
static String default_field_term("\t",default_charset_info);
 
1239
 
 
1240
sql_exchange::sql_exchange(char *name, bool flag,
 
1241
                           enum enum_filetype filetype_arg)
 
1242
  :file_name(name), opt_enclosed(0), dumpfile(flag), skip_lines(0)
 
1243
{
 
1244
  filetype= filetype_arg;
 
1245
  field_term= &default_field_term;
 
1246
  enclosed=   line_start= &my_empty_string;
 
1247
  line_term=  &default_line_term;
 
1248
  escaped=    &default_escaped;
 
1249
  cs= NULL;
 
1250
}
 
1251
 
 
1252
bool select_send::send_fields(List<Item> &list, uint32_t flags)
 
1253
{
 
1254
  bool res;
 
1255
  if (!(res= thd->protocol->send_fields(&list, flags)))
 
1256
    is_result_set_started= 1;
 
1257
  return res;
 
1258
}
 
1259
 
 
1260
void select_send::abort()
 
1261
{
 
1262
  return;
 
1263
}
 
1264
 
 
1265
 
 
1266
/** 
 
1267
  Cleanup an instance of this class for re-use
 
1268
  at next execution of a prepared statement/
 
1269
  stored procedure statement.
 
1270
*/
 
1271
 
 
1272
void select_send::cleanup()
 
1273
{
 
1274
  is_result_set_started= false;
 
1275
}
 
1276
 
 
1277
/* Send data to client. Returns 0 if ok */
 
1278
 
 
1279
bool select_send::send_data(List<Item> &items)
 
1280
{
 
1281
  if (unit->offset_limit_cnt)
 
1282
  {                                             // using limit offset,count
 
1283
    unit->offset_limit_cnt--;
 
1284
    return 0;
 
1285
  }
 
1286
 
 
1287
  /*
 
1288
    We may be passing the control from mysqld to the client: release the
 
1289
    InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
 
1290
    by thd
 
1291
  */
 
1292
  ha_release_temporary_latches(thd);
 
1293
 
 
1294
  List_iterator_fast<Item> li(items);
 
1295
  Protocol *protocol= thd->protocol;
 
1296
  char buff[MAX_FIELD_WIDTH];
 
1297
  String buffer(buff, sizeof(buff), &my_charset_bin);
 
1298
 
 
1299
  protocol->prepare_for_resend();
 
1300
  Item *item;
 
1301
  while ((item=li++))
 
1302
  {
 
1303
    if (item->send(protocol, &buffer))
 
1304
    {
 
1305
      protocol->free();                         // Free used buffer
 
1306
      my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
 
1307
      break;
 
1308
    }
 
1309
  }
 
1310
  thd->sent_row_count++;
 
1311
  if (thd->is_error())
 
1312
  {
 
1313
    protocol->remove_last_row();
 
1314
    return(1);
 
1315
  }
 
1316
  if (thd->vio_ok())
 
1317
    return(protocol->write());
 
1318
  return(0);
 
1319
}
 
1320
 
 
1321
bool select_send::send_eof()
 
1322
{
 
1323
  /* 
 
1324
    We may be passing the control from mysqld to the client: release the
 
1325
    InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
 
1326
    by thd 
 
1327
  */
 
1328
  ha_release_temporary_latches(thd);
 
1329
 
 
1330
  /* Unlock tables before sending packet to gain some speed */
 
1331
  if (thd->lock)
 
1332
  {
 
1333
    mysql_unlock_tables(thd, thd->lock);
 
1334
    thd->lock=0;
 
1335
  }
 
1336
  ::my_eof(thd);
 
1337
  is_result_set_started= 0;
 
1338
  return false;
 
1339
}
 
1340
 
 
1341
 
905
1342
/************************************************************************
906
1343
  Handling writing to file
907
1344
************************************************************************/
911
1348
  my_message(errcode, err, MYF(0));
912
1349
  if (file > 0)
913
1350
  {
914
 
    (void) end_io_cache(cache);
915
 
    (void) internal::my_close(file, MYF(0));
916
 
    (void) internal::my_delete(path, MYF(0));           // Delete file on error
 
1351
    (void) end_io_cache(&cache);
 
1352
    (void) my_close(file,MYF(0));
 
1353
    (void) my_delete(path,MYF(0));              // Delete file on error
917
1354
    file= -1;
918
1355
  }
919
1356
}
921
1358
 
922
1359
bool select_to_file::send_eof()
923
1360
{
924
 
  int error= test(end_io_cache(cache));
925
 
  if (internal::my_close(file, MYF(MY_WME)))
 
1361
  int error= test(end_io_cache(&cache));
 
1362
  if (my_close(file,MYF(MY_WME)))
926
1363
    error= 1;
927
1364
  if (!error)
928
1365
  {
931
1368
      function, SELECT INTO has to have an own SQLCOM.
932
1369
      TODO: split from SQLCOM_SELECT
933
1370
    */
934
 
    session->my_ok(row_count);
 
1371
    ::my_ok(thd,row_count);
935
1372
  }
936
1373
  file= -1;
937
1374
  return error;
943
1380
  /* In case of error send_eof() may be not called: close the file here. */
944
1381
  if (file >= 0)
945
1382
  {
946
 
    (void) end_io_cache(cache);
947
 
    (void) internal::my_close(file, MYF(0));
 
1383
    (void) end_io_cache(&cache);
 
1384
    (void) my_close(file,MYF(0));
948
1385
    file= -1;
949
1386
  }
950
1387
  path[0]= '\0';
951
1388
  row_count= 0;
952
1389
}
953
1390
 
954
 
select_to_file::select_to_file(file_exchange *ex)
955
 
  : exchange(ex),
956
 
    file(-1),
957
 
    cache(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
958
 
    row_count(0L)
959
 
{
960
 
  path[0]=0;
961
 
}
962
1391
 
963
1392
select_to_file::~select_to_file()
964
1393
{
965
 
  cleanup();
 
1394
  if (file >= 0)
 
1395
  {                                     // This only happens in case of error
 
1396
    (void) end_io_cache(&cache);
 
1397
    (void) my_close(file,MYF(0));
 
1398
    file= -1;
 
1399
  }
966
1400
}
967
1401
 
968
1402
/***************************************************************************
971
1405
 
972
1406
select_export::~select_export()
973
1407
{
974
 
  session->sent_row_count=row_count;
 
1408
  thd->sent_row_count=row_count;
975
1409
}
976
1410
 
977
1411
 
980
1414
 
981
1415
  SYNOPSIS
982
1416
    create_file()
983
 
    session                     Thread handle
 
1417
    thd                 Thread handle
984
1418
    path                File name
985
1419
    exchange            Excange class
986
1420
    cache               IO cache
991
1425
*/
992
1426
 
993
1427
 
994
 
static int create_file(Session *session, char *path, file_exchange *exchange, internal::IO_CACHE *cache)
 
1428
static File create_file(THD *thd, char *path, sql_exchange *exchange,
 
1429
                        IO_CACHE *cache)
995
1430
{
996
 
  int file;
 
1431
  File file;
997
1432
  uint32_t option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
998
1433
 
999
1434
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
1000
1435
  option|= MY_REPLACE_DIR;                      // Force use of db directory
1001
1436
#endif
1002
1437
 
1003
 
  if (!internal::dirname_length(exchange->file_name))
 
1438
  if (!dirname_length(exchange->file_name))
1004
1439
  {
1005
 
    strcpy(path, getDataHomeCatalog().c_str());
1006
 
    strncat(path, "/", 1);
1007
 
    if (! session->db.empty())
1008
 
      strncat(path, session->db.c_str(), FN_REFLEN-getDataHomeCatalog().size());
1009
 
    (void) internal::fn_format(path, exchange->file_name, path, "", option);
 
1440
    strxnmov(path, FN_REFLEN-1, mysql_real_data_home, thd->db ? thd->db : "",
 
1441
             NULL);
 
1442
    (void) fn_format(path, exchange->file_name, path, "", option);
1010
1443
  }
1011
1444
  else
1012
 
    (void) internal::fn_format(path, exchange->file_name, getDataHomeCatalog().c_str(), "", option);
 
1445
    (void) fn_format(path, exchange->file_name, mysql_real_data_home, "", option);
1013
1446
 
1014
 
  if (opt_secure_file_priv)
 
1447
  if (opt_secure_file_priv &&
 
1448
      strncmp(opt_secure_file_priv, path, strlen(opt_secure_file_priv)))
1015
1449
  {
1016
 
    fs::path secure_file_path(fs::system_complete(fs::path(opt_secure_file_priv)));
1017
 
    fs::path target_path(fs::system_complete(fs::path(path)));
1018
 
    if (target_path.file_string().substr(0, secure_file_path.file_string().size()) != secure_file_path.file_string())
1019
 
    {
1020
 
      /* Write only allowed to dir or subdir specified by secure_file_priv */
1021
 
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1022
 
      return -1;
1023
 
    }
 
1450
    /* Write only allowed to dir or subdir specified by secure_file_priv */
 
1451
    my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
 
1452
    return -1;
1024
1453
  }
1025
1454
 
1026
1455
  if (!access(path, F_OK))
1029
1458
    return -1;
1030
1459
  }
1031
1460
  /* Create the file world readable */
1032
 
  if ((file= internal::my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
 
1461
  if ((file= my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1033
1462
    return file;
 
1463
#ifdef HAVE_FCHMOD
1034
1464
  (void) fchmod(file, 0666);                    // Because of umask()
1035
 
  if (init_io_cache(cache, file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
 
1465
#else
 
1466
  (void) chmod(path, 0666);
 
1467
#endif
 
1468
  if (init_io_cache(cache, file, 0L, WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1036
1469
  {
1037
 
    internal::my_close(file, MYF(0));
1038
 
    internal::my_delete(path, MYF(0));  // Delete file on error, it was just created
 
1470
    my_close(file, MYF(0));
 
1471
    my_delete(path, MYF(0));  // Delete file on error, it was just created 
1039
1472
    return -1;
1040
1473
  }
1041
1474
  return file;
1043
1476
 
1044
1477
 
1045
1478
int
1046
 
select_export::prepare(List<Item> &list, Select_Lex_Unit *u)
 
1479
select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
1047
1480
{
1048
1481
  bool blob_flag=0;
1049
1482
  bool string_results= false, non_string_results= false;
1050
1483
  unit= u;
1051
 
  if ((uint32_t) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
1052
 
    strncpy(path,exchange->file_name,FN_REFLEN-1);
 
1484
  if ((uint) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
 
1485
    strmake(path,exchange->file_name,FN_REFLEN-1);
1053
1486
 
 
1487
  if ((file= create_file(thd, path, exchange, &cache)) < 0)
 
1488
    return 1;
1054
1489
  /* Check if there is any blobs in data */
1055
1490
  {
1056
1491
    List_iterator_fast<Item> li(list);
1092
1527
      (exchange->opt_enclosed && non_string_results &&
1093
1528
       field_term_length && strchr(NUMERIC_CHARS, field_term_char)))
1094
1529
  {
1095
 
    my_error(ER_AMBIGUOUS_FIELD_TERM, MYF(0));
1096
 
    return 1;
 
1530
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1531
                 ER_AMBIGUOUS_FIELD_TERM, ER(ER_AMBIGUOUS_FIELD_TERM));
 
1532
    is_ambiguous_field_term= true;
1097
1533
  }
1098
 
 
1099
 
  if ((file= create_file(session, path, exchange, cache)) < 0)
1100
 
    return 1;
 
1534
  else
 
1535
    is_ambiguous_field_term= false;
1101
1536
 
1102
1537
  return 0;
1103
1538
}
1104
1539
 
 
1540
 
 
1541
#define NEED_ESCAPING(x) ((int) (unsigned char) (x) == escape_char    || \
 
1542
                          (enclosed ? (int) (unsigned char) (x) == field_sep_char      \
 
1543
                                    : (int) (unsigned char) (x) == field_term_char) || \
 
1544
                          (int) (unsigned char) (x) == line_sep_char  || \
 
1545
                          !(x))
 
1546
 
1105
1547
bool select_export::send_data(List<Item> &items)
1106
1548
{
1107
1549
  char buff[MAX_FIELD_WIDTH],null_buff[2],space[MAX_FIELD_WIDTH];
1119
1561
  uint32_t used_length=0,items_left=items.elements;
1120
1562
  List_iterator_fast<Item> li(items);
1121
1563
 
1122
 
  if (my_b_write(cache,(unsigned char*) exchange->line_start->ptr(),
1123
 
                 exchange->line_start->length()))
 
1564
  if (my_b_write(&cache,(unsigned char*) exchange->line_start->ptr(),
 
1565
                 exchange->line_start->length()))
1124
1566
    goto err;
1125
1567
  while ((item=li++))
1126
1568
  {
1130
1572
    res=item->str_result(&tmp);
1131
1573
    if (res && enclosed)
1132
1574
    {
1133
 
      if (my_b_write(cache,(unsigned char*) exchange->enclosed->ptr(),
1134
 
                     exchange->enclosed->length()))
1135
 
        goto err;
 
1575
      if (my_b_write(&cache,(unsigned char*) exchange->enclosed->ptr(),
 
1576
                     exchange->enclosed->length()))
 
1577
        goto err;
1136
1578
    }
1137
1579
    if (!res)
1138
1580
    {                                           // NULL
1139
1581
      if (!fixed_row_size)
1140
1582
      {
1141
 
        if (escape_char != -1)                  // Use \N syntax
1142
 
        {
1143
 
          null_buff[0]=escape_char;
1144
 
          null_buff[1]='N';
1145
 
          if (my_b_write(cache,(unsigned char*) null_buff,2))
1146
 
            goto err;
1147
 
        }
1148
 
        else if (my_b_write(cache,(unsigned char*) "NULL",4))
1149
 
          goto err;
 
1583
        if (escape_char != -1)                  // Use \N syntax
 
1584
        {
 
1585
          null_buff[0]=escape_char;
 
1586
          null_buff[1]='N';
 
1587
          if (my_b_write(&cache,(unsigned char*) null_buff,2))
 
1588
            goto err;
 
1589
        }
 
1590
        else if (my_b_write(&cache,(unsigned char*) "NULL",4))
 
1591
          goto err;
1150
1592
      }
1151
1593
      else
1152
1594
      {
1153
 
        used_length=0;                          // Fill with space
 
1595
        used_length=0;                          // Fill with space
1154
1596
      }
1155
1597
    }
1156
1598
    else
1157
1599
    {
1158
1600
      if (fixed_row_size)
1159
 
        used_length= min(res->length(),item->max_length);
 
1601
        used_length=cmin(res->length(),item->max_length);
1160
1602
      else
1161
 
        used_length= res->length();
1162
 
 
 
1603
        used_length=res->length();
1163
1604
      if ((result_type == STRING_RESULT || is_unsafe_field_sep) &&
1164
 
          escape_char != -1)
 
1605
           escape_char != -1)
1165
1606
      {
1166
1607
        char *pos, *start, *end;
1167
1608
        const CHARSET_INFO * const res_charset= res->charset();
1168
 
        const CHARSET_INFO * const character_set_client= default_charset_info;
1169
 
 
 
1609
        const CHARSET_INFO * const character_set_client= thd->variables.
 
1610
                                                            character_set_client;
1170
1611
        bool check_second_byte= (res_charset == &my_charset_bin) &&
1171
 
          character_set_client->
1172
 
          escape_with_backslash_is_dangerous;
 
1612
                                 character_set_client->
 
1613
                                 escape_with_backslash_is_dangerous;
1173
1614
        assert(character_set_client->mbmaxlen == 2 ||
1174
 
               !character_set_client->escape_with_backslash_is_dangerous);
1175
 
        for (start=pos=(char*) res->ptr(),end=pos+used_length ;
1176
 
             pos != end ;
1177
 
             pos++)
1178
 
        {
1179
 
          if (use_mb(res_charset))
1180
 
          {
1181
 
            int l;
1182
 
            if ((l=my_ismbchar(res_charset, pos, end)))
1183
 
            {
1184
 
              pos += l-1;
1185
 
              continue;
1186
 
            }
1187
 
          }
 
1615
                    !character_set_client->escape_with_backslash_is_dangerous);
 
1616
        for (start=pos=(char*) res->ptr(),end=pos+used_length ;
 
1617
             pos != end ;
 
1618
             pos++)
 
1619
        {
 
1620
#ifdef USE_MB
 
1621
          if (use_mb(res_charset))
 
1622
          {
 
1623
            int l;
 
1624
            if ((l=my_ismbchar(res_charset, pos, end)))
 
1625
            {
 
1626
              pos += l-1;
 
1627
              continue;
 
1628
            }
 
1629
          }
 
1630
#endif
1188
1631
 
1189
1632
          /*
1190
1633
            Special case when dumping BINARY/VARBINARY/BLOB values
1191
1634
            for the clients with character sets big5, cp932, gbk and sjis,
1192
1635
            which can have the escape character (0x5C "\" by default)
1193
1636
            as the second byte of a multi-byte sequence.
1194
 
 
 
1637
            
1195
1638
            If
1196
1639
            - pos[0] is a valid multi-byte head (e.g 0xEE) and
1197
1640
            - pos[1] is 0x00, which will be escaped as "\0",
1198
 
 
 
1641
            
1199
1642
            then we'll get "0xEE + 0x5C + 0x30" in the output file.
1200
 
 
 
1643
            
1201
1644
            If this file is later loaded using this sequence of commands:
1202
 
 
 
1645
            
1203
1646
            mysql> create table t1 (a varchar(128)) character set big5;
1204
1647
            mysql> LOAD DATA INFILE 'dump.txt' INTO Table t1;
1205
 
 
 
1648
            
1206
1649
            then 0x5C will be misinterpreted as the second byte
1207
1650
            of a multi-byte character "0xEE + 0x5C", instead of
1208
1651
            escape character for 0x00.
1209
 
 
 
1652
            
1210
1653
            To avoid this confusion, we'll escape the multi-byte
1211
1654
            head character too, so the sequence "0xEE + 0x00" will be
1212
1655
            dumped as "0x5C + 0xEE + 0x5C + 0x30".
1213
 
 
 
1656
            
1214
1657
            Note, in the condition below we only check if
1215
1658
            mbcharlen is equal to 2, because there are no
1216
1659
            character sets with mbmaxlen longer than 2
1218
1661
            assert before the loop makes that sure.
1219
1662
          */
1220
1663
 
1221
 
          if ((needs_escaping(*pos, enclosed) ||
 
1664
          if ((NEED_ESCAPING(*pos) ||
1222
1665
               (check_second_byte &&
1223
1666
                my_mbcharlen(character_set_client, (unsigned char) *pos) == 2 &&
1224
1667
                pos + 1 < end &&
1225
 
                needs_escaping(pos[1], enclosed))) &&
 
1668
                NEED_ESCAPING(pos[1]))) &&
1226
1669
              /*
1227
 
                Don't escape field_term_char by doubling - doubling is only
1228
 
                valid for ENCLOSED BY characters:
 
1670
               Don't escape field_term_char by doubling - doubling is only
 
1671
               valid for ENCLOSED BY characters:
1229
1672
              */
1230
1673
              (enclosed || !is_ambiguous_field_term ||
1231
1674
               (int) (unsigned char) *pos != field_term_char))
1232
1675
          {
1233
 
            char tmp_buff[2];
 
1676
            char tmp_buff[2];
1234
1677
            tmp_buff[0]= ((int) (unsigned char) *pos == field_sep_char &&
1235
1678
                          is_ambiguous_field_sep) ?
1236
 
              field_sep_char : escape_char;
1237
 
            tmp_buff[1]= *pos ? *pos : '0';
1238
 
            if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1239
 
                my_b_write(cache,(unsigned char*) tmp_buff,2))
1240
 
              goto err;
1241
 
            start=pos+1;
1242
 
          }
1243
 
        }
1244
 
        if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1245
 
          goto err;
 
1679
                          field_sep_char : escape_char;
 
1680
            tmp_buff[1]= *pos ? *pos : '0';
 
1681
            if (my_b_write(&cache,(unsigned char*) start,(uint) (pos-start)) ||
 
1682
                my_b_write(&cache,(unsigned char*) tmp_buff,2))
 
1683
              goto err;
 
1684
            start=pos+1;
 
1685
          }
 
1686
        }
 
1687
        if (my_b_write(&cache,(unsigned char*) start,(uint) (pos-start)))
 
1688
          goto err;
1246
1689
      }
1247
 
      else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1248
 
        goto err;
 
1690
      else if (my_b_write(&cache,(unsigned char*) res->ptr(),used_length))
 
1691
        goto err;
1249
1692
    }
1250
1693
    if (fixed_row_size)
1251
1694
    {                                           // Fill with space
1252
1695
      if (item->max_length > used_length)
1253
1696
      {
1254
 
        /* QQ:  Fix by adding a my_b_fill() function */
1255
 
        if (!space_inited)
1256
 
        {
1257
 
          space_inited=1;
1258
 
          memset(space, ' ', sizeof(space));
1259
 
        }
1260
 
        uint32_t length=item->max_length-used_length;
1261
 
        for (; length > sizeof(space) ; length-=sizeof(space))
1262
 
        {
1263
 
          if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1264
 
            goto err;
1265
 
        }
1266
 
        if (my_b_write(cache,(unsigned char*) space,length))
1267
 
          goto err;
 
1697
        /* QQ:  Fix by adding a my_b_fill() function */
 
1698
        if (!space_inited)
 
1699
        {
 
1700
          space_inited=1;
 
1701
          memset(space, ' ', sizeof(space));
 
1702
        }
 
1703
        uint32_t length=item->max_length-used_length;
 
1704
        for (; length > sizeof(space) ; length-=sizeof(space))
 
1705
        {
 
1706
          if (my_b_write(&cache,(unsigned char*) space,sizeof(space)))
 
1707
            goto err;
 
1708
        }
 
1709
        if (my_b_write(&cache,(unsigned char*) space,length))
 
1710
          goto err;
1268
1711
      }
1269
1712
    }
1270
1713
    if (res && enclosed)
1271
1714
    {
1272
 
      if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
 
1715
      if (my_b_write(&cache, (unsigned char*) exchange->enclosed->ptr(),
1273
1716
                     exchange->enclosed->length()))
1274
1717
        goto err;
1275
1718
    }
1276
1719
    if (--items_left)
1277
1720
    {
1278
 
      if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
 
1721
      if (my_b_write(&cache, (unsigned char*) exchange->field_term->ptr(),
1279
1722
                     field_term_length))
1280
1723
        goto err;
1281
1724
    }
1282
1725
  }
1283
 
  if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1284
 
                 exchange->line_term->length()))
 
1726
  if (my_b_write(&cache,(unsigned char*) exchange->line_term->ptr(),
 
1727
                 exchange->line_term->length()))
1285
1728
    goto err;
1286
1729
  return(0);
1287
1730
err:
1295
1738
 
1296
1739
 
1297
1740
int
1298
 
select_dump::prepare(List<Item> &, Select_Lex_Unit *u)
 
1741
select_dump::prepare(List<Item> &list __attribute__((unused)),
 
1742
                     SELECT_LEX_UNIT *u)
1299
1743
{
1300
1744
  unit= u;
1301
 
  return (int) ((file= create_file(session, path, exchange, cache)) < 0);
 
1745
  return (int) ((file= create_file(thd, path, exchange, &cache)) < 0);
1302
1746
}
1303
1747
 
1304
1748
 
1315
1759
    unit->offset_limit_cnt--;
1316
1760
    return(0);
1317
1761
  }
1318
 
  if (row_count++ > 1)
 
1762
  if (row_count++ > 1) 
1319
1763
  {
1320
1764
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
1321
1765
    goto err;
1325
1769
    res=item->str_result(&tmp);
1326
1770
    if (!res)                                   // If NULL
1327
1771
    {
1328
 
      if (my_b_write(cache,(unsigned char*) "",1))
 
1772
      if (my_b_write(&cache,(unsigned char*) "",1))
1329
1773
        goto err;
1330
1774
    }
1331
 
    else if (my_b_write(cache,(unsigned char*) res->ptr(),res->length()))
 
1775
    else if (my_b_write(&cache,(unsigned char*) res->ptr(),res->length()))
1332
1776
    {
1333
 
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, errno);
 
1777
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, my_errno);
1334
1778
      goto err;
1335
1779
    }
1336
1780
  }
1371
1815
void select_max_min_finder_subselect::cleanup()
1372
1816
{
1373
1817
  cache= 0;
 
1818
  return;
1374
1819
}
1375
1820
 
1376
1821
 
1477
1922
     sortcmp(val1, val2, cache->collation.collation) < 0);
1478
1923
}
1479
1924
 
1480
 
bool select_exists_subselect::send_data(List<Item> &)
 
1925
bool select_exists_subselect::send_data(List<Item> &items __attribute__((unused)))
1481
1926
{
1482
1927
  Item_exists_subselect *it= (Item_exists_subselect *)item;
1483
1928
  if (unit->offset_limit_cnt)
1490
1935
  return(0);
1491
1936
}
1492
1937
 
 
1938
 
 
1939
/***************************************************************************
 
1940
  Dump of select to variables
 
1941
***************************************************************************/
 
1942
 
 
1943
int select_dumpvar::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
 
1944
{
 
1945
  unit= u;
 
1946
  
 
1947
  if (var_list.elements != list.elements)
 
1948
  {
 
1949
    my_message(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT,
 
1950
               ER(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT), MYF(0));
 
1951
    return 1;
 
1952
  }               
 
1953
  return 0;
 
1954
}
 
1955
 
 
1956
 
 
1957
bool select_dumpvar::check_simple_select() const
 
1958
{
 
1959
  my_error(ER_SP_BAD_CURSOR_SELECT, MYF(0));
 
1960
  return true;
 
1961
}
 
1962
 
 
1963
 
 
1964
void select_dumpvar::cleanup()
 
1965
{
 
1966
  row_count= 0;
 
1967
}
 
1968
 
 
1969
 
 
1970
void Query_arena::free_items()
 
1971
{
 
1972
  Item *next;
 
1973
  /* This works because items are allocated with sql_alloc() */
 
1974
  for (; free_list; free_list= next)
 
1975
  {
 
1976
    next= free_list->next;
 
1977
    free_list->delete_self();
 
1978
  }
 
1979
  /* Postcondition: free_list is 0 */
 
1980
  return;
 
1981
}
 
1982
 
 
1983
 
 
1984
/*
 
1985
  Statement functions
 
1986
*/
 
1987
 
 
1988
Statement::Statement(LEX *lex_arg, MEM_ROOT *mem_root_arg, ulong id_arg)
 
1989
  :Query_arena(mem_root_arg),
 
1990
  id(id_arg),
 
1991
  mark_used_columns(MARK_COLUMNS_READ),
 
1992
  lex(lex_arg),
 
1993
  query(0),
 
1994
  query_length(0),
 
1995
  db(NULL),
 
1996
  db_length(0)
 
1997
{
 
1998
}
 
1999
 
 
2000
 
1493
2001
/*
1494
2002
  Don't free mem_root, as mem_root is freed in the end of dispatch_command
1495
2003
  (once for any command).
1496
2004
*/
1497
 
void Session::end_statement()
 
2005
void THD::end_statement()
1498
2006
{
1499
2007
  /* Cleanup SQL processing state to reuse this statement in next query. */
1500
2008
  lex_end(lex);
1501
 
  query_cache_key= ""; // reset the cache key
1502
 
  resetResultsetMessage();
1503
2009
}
1504
2010
 
1505
 
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
 
2011
 
 
2012
bool THD::copy_db_to(char **p_db, size_t *p_db_length)
1506
2013
{
1507
 
  if (db.empty())
 
2014
  if (db == NULL)
1508
2015
  {
1509
2016
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1510
2017
    return true;
1511
2018
  }
1512
 
  *p_db= strmake(db.c_str(), db.length());
1513
 
  *p_db_length= db.length();
 
2019
  *p_db= strmake(db, db_length);
 
2020
  *p_db_length= db_length;
1514
2021
  return false;
1515
2022
}
1516
2023
 
 
2024
 
 
2025
bool select_dumpvar::send_data(List<Item> &items)
 
2026
{
 
2027
  List_iterator_fast<my_var> var_li(var_list);
 
2028
  List_iterator<Item> it(items);
 
2029
  Item *item;
 
2030
  my_var *mv;
 
2031
 
 
2032
  if (unit->offset_limit_cnt)
 
2033
  {                                             // using limit offset,count
 
2034
    unit->offset_limit_cnt--;
 
2035
    return(0);
 
2036
  }
 
2037
  if (row_count++) 
 
2038
  {
 
2039
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
 
2040
    return(1);
 
2041
  }
 
2042
  while ((mv= var_li++) && (item= it++))
 
2043
  {
 
2044
    if (mv->local == 0)
 
2045
    {
 
2046
      Item_func_set_user_var *suv= new Item_func_set_user_var(mv->s, item);
 
2047
      suv->fix_fields(thd, 0);
 
2048
      suv->check(0);
 
2049
      suv->update();
 
2050
    }
 
2051
  }
 
2052
  return(thd->is_error());
 
2053
}
 
2054
 
 
2055
bool select_dumpvar::send_eof()
 
2056
{
 
2057
  if (! row_count)
 
2058
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
2059
                 ER_SP_FETCH_NO_DATA, ER(ER_SP_FETCH_NO_DATA));
 
2060
  /*
 
2061
    In order to remember the value of affected rows for ROW_COUNT()
 
2062
    function, SELECT INTO has to have an own SQLCOM.
 
2063
    TODO: split from SQLCOM_SELECT
 
2064
  */
 
2065
  ::my_ok(thd,row_count);
 
2066
  return 0;
 
2067
}
 
2068
 
1517
2069
/****************************************************************************
1518
 
  Tmp_Table_Param
 
2070
  TMP_TABLE_PARAM
1519
2071
****************************************************************************/
1520
2072
 
1521
 
void Tmp_Table_Param::init()
 
2073
void TMP_TABLE_PARAM::init()
1522
2074
{
1523
2075
  field_count= sum_func_count= func_count= hidden_field_count= 0;
1524
2076
  group_parts= group_length= group_null_parts= 0;
1525
2077
  quick_group= 1;
1526
2078
  table_charset= 0;
1527
2079
  precomputed_group_by= 0;
 
2080
  bit_fields_as_long= 0;
 
2081
  return;
1528
2082
}
1529
2083
 
1530
 
void Tmp_Table_Param::cleanup(void)
 
2084
 
 
2085
void thd_increment_bytes_sent(ulong length)
1531
2086
{
1532
 
  /* Fix for Intel compiler */
1533
 
  if (copy_field)
1534
 
  {
1535
 
    delete [] copy_field;
1536
 
    save_copy_field= copy_field= 0;
 
2087
  THD *thd=current_thd;
 
2088
  if (likely(thd != 0))
 
2089
  { /* current_thd==0 when close_connection() calls net_send_error() */
 
2090
    thd->status_var.bytes_sent+= length;
1537
2091
  }
1538
2092
}
1539
2093
 
1540
 
void Session::send_kill_message() const
 
2094
 
 
2095
void thd_increment_bytes_received(ulong length)
 
2096
{
 
2097
  current_thd->status_var.bytes_received+= length;
 
2098
}
 
2099
 
 
2100
 
 
2101
void thd_increment_net_big_packet_count(ulong length)
 
2102
{
 
2103
  current_thd->status_var.net_big_packet_count+= length;
 
2104
}
 
2105
 
 
2106
void THD::send_kill_message() const
1541
2107
{
1542
2108
  int err= killed_errno();
1543
2109
  if (err)
1544
2110
    my_message(err, ER(err), MYF(0));
1545
2111
}
1546
2112
 
1547
 
void Session::set_status_var_init()
 
2113
void THD::set_status_var_init()
1548
2114
{
1549
2115
  memset(&status_var, 0, sizeof(status_var));
1550
2116
}
1551
2117
 
1552
2118
 
1553
 
bool Session::set_db(const std::string &new_db)
1554
 
{
1555
 
  /* Do not reallocate memory if current chunk is big enough. */
1556
 
  if (new_db.length())
1557
 
    db= new_db;
1558
 
  else
1559
 
    db.clear();
1560
 
 
1561
 
  return false;
1562
 
}
1563
 
 
1564
 
 
1565
 
 
 
2119
void Security_context::init()
 
2120
{
 
2121
  user= ip= 0;
 
2122
}
 
2123
 
 
2124
 
 
2125
void Security_context::destroy()
 
2126
{
 
2127
  // If not pointer to constant
 
2128
  if (user)
 
2129
  {
 
2130
    free(user);
 
2131
    user= NULL;
 
2132
  }
 
2133
  if (ip)
 
2134
  {
 
2135
    free(ip);
 
2136
    ip= NULL;
 
2137
  }
 
2138
}
 
2139
 
 
2140
 
 
2141
void Security_context::skip_grants()
 
2142
{
 
2143
  /* privileges for the user are unknown everything is allowed */
 
2144
}
 
2145
 
 
2146
 
 
2147
/****************************************************************************
 
2148
  Handling of open and locked tables states.
 
2149
 
 
2150
  This is used when we want to open/lock (and then close) some tables when
 
2151
  we already have a set of tables open and locked. We use these methods for
 
2152
  access to mysql.proc table to find definitions of stored routines.
 
2153
****************************************************************************/
 
2154
 
 
2155
void THD::reset_n_backup_open_tables_state(Open_tables_state *backup)
 
2156
{
 
2157
  backup->set_open_tables_state(this);
 
2158
  reset_open_tables_state();
 
2159
  state_flags|= Open_tables_state::BACKUPS_AVAIL;
 
2160
  return;
 
2161
}
 
2162
 
 
2163
 
 
2164
void THD::restore_backup_open_tables_state(Open_tables_state *backup)
 
2165
{
 
2166
  /*
 
2167
    Before we will throw away current open tables state we want
 
2168
    to be sure that it was properly cleaned up.
 
2169
  */
 
2170
  assert(open_tables == 0 && temporary_tables == 0 &&
 
2171
              handler_tables == 0 && derived_tables == 0 &&
 
2172
              lock == 0 && locked_tables == 0);
 
2173
  set_open_tables_state(backup);
 
2174
  return;
 
2175
}
1566
2176
 
1567
2177
/**
1568
2178
  Check the killed state of a user thread
1569
 
  @param session  user thread
 
2179
  @param thd  user thread
1570
2180
  @retval 0 the user thread is active
1571
2181
  @retval 1 the user thread has been killed
1572
2182
*/
1573
 
int session_killed(const Session *session)
1574
 
{
1575
 
  return(session->killed);
1576
 
}
1577
 
 
1578
 
 
1579
 
const struct charset_info_st *session_charset(Session *session)
1580
 
{
1581
 
  return(session->charset());
1582
 
}
 
2183
extern "C" int thd_killed(const DRIZZLE_THD thd)
 
2184
{
 
2185
  return(thd->killed);
 
2186
}
 
2187
 
 
2188
/**
 
2189
  Return the thread id of a user thread
 
2190
  @param thd user thread
 
2191
  @return thread id
 
2192
*/
 
2193
extern "C" unsigned long thd_get_thread_id(const DRIZZLE_THD thd)
 
2194
{
 
2195
  return((unsigned long)thd->thread_id);
 
2196
}
 
2197
 
 
2198
 
 
2199
#ifdef INNODB_COMPATIBILITY_HOOKS
 
2200
extern "C" const struct charset_info_st *thd_charset(DRIZZLE_THD thd)
 
2201
{
 
2202
  return(thd->charset());
 
2203
}
 
2204
 
 
2205
extern "C" char **thd_query(DRIZZLE_THD thd)
 
2206
{
 
2207
  return(&thd->query);
 
2208
}
 
2209
 
 
2210
extern "C" int thd_slave_thread(const DRIZZLE_THD thd)
 
2211
{
 
2212
  return(thd->slave_thread);
 
2213
}
 
2214
 
 
2215
extern "C" int thd_non_transactional_update(const DRIZZLE_THD thd)
 
2216
{
 
2217
  return(thd->transaction.all.modified_non_trans_table);
 
2218
}
 
2219
 
 
2220
extern "C" int thd_binlog_format(const DRIZZLE_THD thd)
 
2221
{
 
2222
  return (int) thd->variables.binlog_format;
 
2223
}
 
2224
 
 
2225
extern "C" void thd_mark_transaction_to_rollback(DRIZZLE_THD thd, bool all)
 
2226
{
 
2227
  mark_transaction_to_rollback(thd, all);
 
2228
}
 
2229
#endif // INNODB_COMPATIBILITY_HOOKS */
 
2230
 
1583
2231
 
1584
2232
/**
1585
2233
  Mark transaction to rollback and mark error as fatal to a sub-statement.
1586
2234
 
1587
 
  @param  session   Thread handle
 
2235
  @param  thd   Thread handle
1588
2236
  @param  all   true <=> rollback main transaction.
1589
2237
*/
1590
 
void mark_transaction_to_rollback(Session *session, bool all)
1591
 
{
1592
 
  if (session)
1593
 
  {
1594
 
    session->is_fatal_sub_stmt_error= true;
1595
 
    session->transaction_rollback_request= all;
1596
 
  }
1597
 
}
1598
 
 
1599
 
void Session::disconnect(uint32_t errcode, bool should_lock)
1600
 
{
1601
 
  /* Allow any plugins to cleanup their session variables */
1602
 
  plugin_sessionvar_cleanup(this);
1603
 
 
1604
 
  /* If necessary, log any aborted or unauthorized connections */
1605
 
  if (killed || client->wasAborted())
1606
 
  {
1607
 
    status_var.aborted_threads++;
1608
 
  }
1609
 
 
1610
 
  if (client->wasAborted())
1611
 
  {
1612
 
    if (! killed && variables.log_warnings > 1)
1613
 
    {
1614
 
      SecurityContext *sctx= &security_ctx;
1615
 
 
1616
 
      errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1617
 
                  , thread_id
1618
 
                  , (db.empty() ? "unconnected" : db.c_str())
1619
 
                  , sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1620
 
                  , sctx->getIp().c_str()
1621
 
                  , (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1622
 
    }
1623
 
  }
1624
 
 
1625
 
  /* Close out our connection to the client */
1626
 
  if (should_lock)
1627
 
    LOCK_thread_count.lock();
1628
 
  killed= Session::KILL_CONNECTION;
1629
 
  if (client->isConnected())
1630
 
  {
1631
 
    if (errcode)
1632
 
    {
1633
 
      /*my_error(errcode, ER(errcode));*/
1634
 
      client->sendError(errcode, ER(errcode));
1635
 
    }
1636
 
    client->close();
1637
 
  }
1638
 
  if (should_lock)
1639
 
    (void) LOCK_thread_count.unlock();
1640
 
}
1641
 
 
1642
 
void Session::reset_for_next_command()
1643
 
{
1644
 
  free_list= 0;
1645
 
  select_number= 1;
1646
 
  /*
1647
 
    Those two lines below are theoretically unneeded as
1648
 
    Session::cleanup_after_query() should take care of this already.
1649
 
  */
1650
 
  auto_inc_intervals_in_cur_stmt_for_binlog.empty();
1651
 
 
1652
 
  is_fatal_error= false;
1653
 
  server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1654
 
                          SERVER_QUERY_NO_INDEX_USED |
1655
 
                          SERVER_QUERY_NO_GOOD_INDEX_USED);
1656
 
 
1657
 
  clear_error();
1658
 
  main_da.reset_diagnostics_area();
1659
 
  total_warn_count=0;                   // Warnings for this query
1660
 
  sent_row_count= examined_row_count= 0;
1661
 
}
1662
 
 
1663
 
/*
1664
 
  Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1665
 
*/
1666
 
 
1667
 
void Session::close_temporary_tables()
1668
 
{
1669
 
  Table *table;
1670
 
  Table *tmp_next;
1671
 
 
1672
 
  if (not temporary_tables)
1673
 
    return;
1674
 
 
1675
 
  for (table= temporary_tables; table; table= tmp_next)
1676
 
  {
1677
 
    tmp_next= table->getNext();
1678
 
    nukeTable(table);
1679
 
  }
1680
 
  temporary_tables= NULL;
1681
 
}
1682
 
 
1683
 
/*
1684
 
  unlink from session->temporary tables and close temporary table
1685
 
*/
1686
 
 
1687
 
void Session::close_temporary_table(Table *table)
1688
 
{
1689
 
  if (table->getPrev())
1690
 
  {
1691
 
    table->getPrev()->setNext(table->getNext());
1692
 
    if (table->getPrev()->getNext())
1693
 
    {
1694
 
      table->getNext()->setPrev(table->getPrev());
1695
 
    }
1696
 
  }
1697
 
  else
1698
 
  {
1699
 
    /* removing the item from the list */
1700
 
    assert(table == temporary_tables);
1701
 
    /*
1702
 
      slave must reset its temporary list pointer to zero to exclude
1703
 
      passing non-zero value to end_slave via rli->save_temporary_tables
1704
 
      when no temp tables opened, see an invariant below.
1705
 
    */
1706
 
    temporary_tables= table->getNext();
1707
 
    if (temporary_tables)
1708
 
    {
1709
 
      table->getNext()->setPrev(NULL);
1710
 
    }
1711
 
  }
1712
 
  nukeTable(table);
1713
 
}
1714
 
 
1715
 
/*
1716
 
  Close and drop a temporary table
1717
 
 
1718
 
  NOTE
1719
 
  This dosn't unlink table from session->temporary
1720
 
  If this is needed, use close_temporary_table()
1721
 
*/
1722
 
 
1723
 
void Session::nukeTable(Table *table)
1724
 
{
1725
 
  plugin::StorageEngine *table_type= table->getShare()->db_type();
1726
 
 
1727
 
  table->free_io_cache();
1728
 
  table->delete_table();
1729
 
 
1730
 
  TableIdentifier identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath());
1731
 
  rm_temporary_table(table_type, identifier);
1732
 
 
1733
 
  delete table->getMutableShare();
1734
 
 
1735
 
  /* This makes me sad, but we're allocating it via malloc */
1736
 
  delete table;
1737
 
}
1738
 
 
1739
 
/** Clear most status variables. */
1740
 
extern time_t flush_status_time;
1741
 
 
1742
 
void Session::refresh_status()
1743
 
{
1744
 
  /* Reset thread's status variables */
1745
 
  memset(&status_var, 0, sizeof(status_var));
1746
 
 
1747
 
  flush_status_time= time((time_t*) 0);
1748
 
  current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1749
 
  current_global_counters.connections= 0;
1750
 
}
1751
 
 
1752
 
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1753
 
{
1754
 
  user_var_entry *entry= NULL;
1755
 
  UserVarsRange ppp= user_vars.equal_range(std::string(name.str, name.length));
1756
 
 
1757
 
  for (UserVars::iterator iter= ppp.first;
1758
 
         iter != ppp.second; ++iter)
1759
 
  {
1760
 
    entry= (*iter).second;
1761
 
  }
1762
 
 
1763
 
  if ((entry == NULL) && create_if_not_exists)
1764
 
  {
1765
 
    entry= new (nothrow) user_var_entry(name.str, query_id);
1766
 
 
1767
 
    if (entry == NULL)
1768
 
      return NULL;
1769
 
 
1770
 
    std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(std::string(name.str, name.length), entry));
1771
 
 
1772
 
    if (not returnable.second)
1773
 
    {
1774
 
      delete entry;
1775
 
      return NULL;
1776
 
    }
1777
 
  }
1778
 
 
1779
 
  return entry;
1780
 
}
1781
 
 
1782
 
void Session::mark_temp_tables_as_free_for_reuse()
1783
 
{
1784
 
  for (Table *table= temporary_tables ; table ; table= table->getNext())
1785
 
  {
1786
 
    if (table->query_id == query_id)
1787
 
    {
1788
 
      table->query_id= 0;
1789
 
      table->cursor->ha_reset();
1790
 
    }
1791
 
  }
1792
 
}
1793
 
 
1794
 
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1795
 
{
1796
 
  for (; table ; table= table->getNext())
1797
 
  {
1798
 
    if (table->query_id == query_id)
1799
 
    {
1800
 
      table->query_id= 0;
1801
 
      table->cursor->ha_reset();
1802
 
    }
1803
 
  }
1804
 
}
1805
 
 
1806
 
/*
1807
 
  Unlocks tables and frees derived tables.
1808
 
  Put all normal tables used by thread in free list.
1809
 
 
1810
 
  It will only close/mark as free for reuse tables opened by this
1811
 
  substatement, it will also check if we are closing tables after
1812
 
  execution of complete query (i.e. we are on upper level) and will
1813
 
  leave prelocked mode if needed.
1814
 
*/
1815
 
void Session::close_thread_tables()
1816
 
{
1817
 
  if (derived_tables)
1818
 
    derived_tables= NULL; // They should all be invalid by this point
1819
 
 
1820
 
  /*
1821
 
    Mark all temporary tables used by this statement as free for reuse.
1822
 
  */
1823
 
  mark_temp_tables_as_free_for_reuse();
1824
 
  /*
1825
 
    Let us commit transaction for statement. Since in 5.0 we only have
1826
 
    one statement transaction and don't allow several nested statement
1827
 
    transactions this call will do nothing if we are inside of stored
1828
 
    function or trigger (i.e. statement transaction is already active and
1829
 
    does not belong to statement for which we do close_thread_tables()).
1830
 
    TODO: This should be fixed in later releases.
1831
 
   */
1832
 
  {
1833
 
    TransactionServices &transaction_services= TransactionServices::singleton();
1834
 
    main_da.can_overwrite_status= true;
1835
 
    transaction_services.autocommitOrRollback(this, is_error());
1836
 
    main_da.can_overwrite_status= false;
1837
 
    transaction.stmt.reset();
1838
 
  }
1839
 
 
1840
 
  if (lock)
1841
 
  {
1842
 
    /*
1843
 
      For RBR we flush the pending event just before we unlock all the
1844
 
      tables.  This means that we are at the end of a topmost
1845
 
      statement, so we ensure that the STMT_END_F flag is set on the
1846
 
      pending event.  For statements that are *inside* stored
1847
 
      functions, the pending event will not be flushed: that will be
1848
 
      handled either before writing a query log event (inside
1849
 
      binlog_query()) or when preparing a pending event.
1850
 
     */
1851
 
    mysql_unlock_tables(this, lock);
1852
 
    lock= 0;
1853
 
  }
1854
 
  /*
1855
 
    Note that we need to hold LOCK_open while changing the
1856
 
    open_tables list. Another thread may work on it.
1857
 
    (See: remove_table_from_cache(), mysql_wait_completed_table())
1858
 
    Closing a MERGE child before the parent would be fatal if the
1859
 
    other thread tries to abort the MERGE lock in between.
1860
 
  */
1861
 
  if (open_tables)
1862
 
    close_open_tables();
1863
 
}
1864
 
 
1865
 
void Session::close_tables_for_reopen(TableList **tables)
1866
 
{
1867
 
  /*
1868
 
    If table list consists only from tables from prelocking set, table list
1869
 
    for new attempt should be empty, so we have to update list's root pointer.
1870
 
  */
1871
 
  if (lex->first_not_own_table() == *tables)
1872
 
    *tables= 0;
1873
 
  lex->chop_off_not_own_tables();
1874
 
  for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1875
 
    tmp->table= 0;
1876
 
  close_thread_tables();
1877
 
}
1878
 
 
1879
 
bool Session::openTablesLock(TableList *tables)
1880
 
{
1881
 
  uint32_t counter;
1882
 
  bool need_reopen;
1883
 
 
1884
 
  for ( ; ; )
1885
 
  {
1886
 
    if (open_tables_from_list(&tables, &counter))
1887
 
      return true;
1888
 
 
1889
 
    if (not lock_tables(tables, counter, &need_reopen))
1890
 
      break;
1891
 
    if (not need_reopen)
1892
 
      return true;
1893
 
    close_tables_for_reopen(&tables);
1894
 
  }
1895
 
  if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
1896
 
       (fill_derived_tables() &&
1897
 
        mysql_handle_derived(lex, &mysql_derived_filling))))
1898
 
    return true;
1899
 
 
1900
 
  return false;
1901
 
}
1902
 
 
1903
 
bool Session::openTables(TableList *tables, uint32_t flags)
1904
 
{
1905
 
  uint32_t counter;
1906
 
  bool ret= fill_derived_tables();
1907
 
  assert(ret == false);
1908
 
  if (open_tables_from_list(&tables, &counter, flags) ||
1909
 
      mysql_handle_derived(lex, &mysql_derived_prepare))
1910
 
  {
1911
 
    return true;
1912
 
  }
1913
 
  return false;
1914
 
}
1915
 
 
1916
 
/*
1917
 
  @note "best_effort" is used in cases were if a failure occurred on this
1918
 
  operation it would not be surprising because we are only removing because there
1919
 
  might be an issue (lame engines).
1920
 
*/
1921
 
 
1922
 
bool Session::rm_temporary_table(TableIdentifier &identifier, bool best_effort)
1923
 
{
1924
 
  if (plugin::StorageEngine::dropTable(*this, identifier))
1925
 
  {
1926
 
    if (not best_effort)
1927
 
    {
1928
 
      errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1929
 
                    identifier.getSQLPath().c_str(), errno);
1930
 
    }
1931
 
 
1932
 
    return true;
1933
 
  }
1934
 
 
1935
 
  return false;
1936
 
}
1937
 
 
1938
 
bool Session::rm_temporary_table(plugin::StorageEngine *base, TableIdentifier &identifier)
1939
 
{
1940
 
  assert(base);
1941
 
 
1942
 
  if (plugin::StorageEngine::dropTable(*this, *base, identifier))
1943
 
  {
1944
 
    errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
1945
 
                  identifier.getSQLPath().c_str(), errno);
1946
 
 
1947
 
    return true;
1948
 
  }
1949
 
 
1950
 
  return false;
1951
 
}
1952
 
 
1953
 
/**
1954
 
  @note this will be removed, I am looking through Hudson to see if it is finding
1955
 
  any tables that are missed during cleanup.
1956
 
*/
1957
 
void Session::dumpTemporaryTableNames(const char *foo)
1958
 
{
1959
 
  Table *table;
1960
 
 
1961
 
  if (not temporary_tables)
1962
 
    return;
1963
 
 
1964
 
  cerr << "Begin Run: " << foo << "\n";
1965
 
  for (table= temporary_tables; table; table= table->getNext())
1966
 
  {
1967
 
    bool have_proto= false;
1968
 
 
1969
 
    message::Table *proto= table->getShare()->getTableProto();
1970
 
    if (table->getShare()->getTableProto())
1971
 
      have_proto= true;
1972
 
 
1973
 
    const char *answer= have_proto ? "true" : "false";
1974
 
 
1975
 
    if (have_proto)
1976
 
    {
1977
 
      cerr << "\tTable Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
1978
 
      cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
1979
 
    }
1980
 
    else
1981
 
      cerr << "\tTabl;e Name " << table->getShare()->getSchemaName() << "." << table->getShare()->getTableName() << " : " << answer << "\n";
1982
 
  }
1983
 
}
1984
 
 
1985
 
bool Session::storeTableMessage(const TableIdentifier &identifier, message::Table &table_message)
1986
 
{
1987
 
  table_message_cache.insert(make_pair(identifier.getPath(), table_message));
1988
 
 
1989
 
  return true;
1990
 
}
1991
 
 
1992
 
bool Session::removeTableMessage(const TableIdentifier &identifier)
1993
 
{
1994
 
  TableMessageCache::iterator iter;
1995
 
 
1996
 
  iter= table_message_cache.find(identifier.getPath());
1997
 
 
1998
 
  if (iter == table_message_cache.end())
1999
 
    return false;
2000
 
 
2001
 
  table_message_cache.erase(iter);
2002
 
 
2003
 
  return true;
2004
 
}
2005
 
 
2006
 
bool Session::getTableMessage(const TableIdentifier &identifier, message::Table &table_message)
2007
 
{
2008
 
  TableMessageCache::iterator iter;
2009
 
 
2010
 
  iter= table_message_cache.find(identifier.getPath());
2011
 
 
2012
 
  if (iter == table_message_cache.end())
2013
 
    return false;
2014
 
 
2015
 
  table_message.CopyFrom(((*iter).second));
2016
 
 
2017
 
  return true;
2018
 
}
2019
 
 
2020
 
bool Session::doesTableMessageExist(const TableIdentifier &identifier)
2021
 
{
2022
 
  TableMessageCache::iterator iter;
2023
 
 
2024
 
  iter= table_message_cache.find(identifier.getPath());
2025
 
 
2026
 
  if (iter == table_message_cache.end())
2027
 
  {
2028
 
    return false;
2029
 
  }
2030
 
 
2031
 
  return true;
2032
 
}
2033
 
 
2034
 
bool Session::renameTableMessage(const TableIdentifier &from, const TableIdentifier &to)
2035
 
{
2036
 
  TableMessageCache::iterator iter;
2037
 
 
2038
 
  table_message_cache[to.getPath()]= table_message_cache[from.getPath()];
2039
 
 
2040
 
  iter= table_message_cache.find(to.getPath());
2041
 
 
2042
 
  if (iter == table_message_cache.end())
2043
 
  {
2044
 
    return false;
2045
 
  }
2046
 
 
2047
 
  (*iter).second.set_schema(to.getSchemaName());
2048
 
  (*iter).second.set_name(to.getTableName());
2049
 
 
2050
 
  return true;
2051
 
}
2052
 
 
2053
 
TableShareInstance *Session::getTemporaryShare(TableIdentifier::Type type_arg)
2054
 
{
2055
 
  temporary_shares.push_back(new TableShareInstance(type_arg)); // This will not go into the tableshare cache, so no key is used.
2056
 
 
2057
 
  TableShareInstance *tmp_share= temporary_shares.back();
2058
 
 
2059
 
  assert(tmp_share);
2060
 
 
2061
 
  return tmp_share;
2062
 
}
2063
 
 
2064
 
} /* namespace drizzled */
 
2238
 
 
2239
void mark_transaction_to_rollback(THD *thd, bool all)
 
2240
{
 
2241
  if (thd)
 
2242
  {
 
2243
    thd->is_fatal_sub_stmt_error= true;
 
2244
    thd->transaction_rollback_request= all;
 
2245
  }
 
2246
}
 
2247
/***************************************************************************
 
2248
  Handling of XA id cacheing
 
2249
***************************************************************************/
 
2250
 
 
2251
pthread_mutex_t LOCK_xid_cache;
 
2252
HASH xid_cache;
 
2253
 
 
2254
extern "C" unsigned char *xid_get_hash_key(const unsigned char *, size_t *, bool);
 
2255
extern "C" void xid_free_hash(void *);
 
2256
 
 
2257
unsigned char *xid_get_hash_key(const unsigned char *ptr, size_t *length,
 
2258
                        bool not_used __attribute__((unused)))
 
2259
{
 
2260
  *length=((XID_STATE*)ptr)->xid.key_length();
 
2261
  return ((XID_STATE*)ptr)->xid.key();
 
2262
}
 
2263
 
 
2264
void xid_free_hash(void *ptr)
 
2265
{
 
2266
  if (!((XID_STATE*)ptr)->in_thd)
 
2267
    free((unsigned char*)ptr);
 
2268
}
 
2269
 
 
2270
bool xid_cache_init()
 
2271
{
 
2272
  pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST);
 
2273
  return hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
 
2274
                   xid_get_hash_key, xid_free_hash, 0) != 0;
 
2275
}
 
2276
 
 
2277
void xid_cache_free()
 
2278
{
 
2279
  if (hash_inited(&xid_cache))
 
2280
  {
 
2281
    hash_free(&xid_cache);
 
2282
    pthread_mutex_destroy(&LOCK_xid_cache);
 
2283
  }
 
2284
}
 
2285
 
 
2286
XID_STATE *xid_cache_search(XID *xid)
 
2287
{
 
2288
  pthread_mutex_lock(&LOCK_xid_cache);
 
2289
  XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, xid->key(), xid->key_length());
 
2290
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2291
  return res;
 
2292
}
 
2293
 
 
2294
 
 
2295
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
 
2296
{
 
2297
  XID_STATE *xs;
 
2298
  bool res;
 
2299
  pthread_mutex_lock(&LOCK_xid_cache);
 
2300
  if (hash_search(&xid_cache, xid->key(), xid->key_length()))
 
2301
    res=0;
 
2302
  else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
 
2303
    res=1;
 
2304
  else
 
2305
  {
 
2306
    xs->xa_state=xa_state;
 
2307
    xs->xid.set(xid);
 
2308
    xs->in_thd=0;
 
2309
    res=my_hash_insert(&xid_cache, (unsigned char*)xs);
 
2310
  }
 
2311
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2312
  return res;
 
2313
}
 
2314
 
 
2315
 
 
2316
bool xid_cache_insert(XID_STATE *xid_state)
 
2317
{
 
2318
  pthread_mutex_lock(&LOCK_xid_cache);
 
2319
  assert(hash_search(&xid_cache, xid_state->xid.key(),
 
2320
                          xid_state->xid.key_length())==0);
 
2321
  bool res=my_hash_insert(&xid_cache, (unsigned char*)xid_state);
 
2322
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2323
  return res;
 
2324
}
 
2325
 
 
2326
 
 
2327
void xid_cache_delete(XID_STATE *xid_state)
 
2328
{
 
2329
  pthread_mutex_lock(&LOCK_xid_cache);
 
2330
  hash_delete(&xid_cache, (unsigned char *)xid_state);
 
2331
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2332
}
 
2333
 
 
2334
/*
 
2335
  Implementation of interface to write rows to the binary log through the
 
2336
  thread.  The thread is responsible for writing the rows it has
 
2337
  inserted/updated/deleted.
 
2338
*/
 
2339
 
 
2340
 
 
2341
/*
 
2342
  Template member function for ensuring that there is an rows log
 
2343
  event of the apropriate type before proceeding.
 
2344
 
 
2345
  PRE CONDITION:
 
2346
    - Events of type 'RowEventT' have the type code 'type_code'.
 
2347
    
 
2348
  POST CONDITION:
 
2349
    If a non-NULL pointer is returned, the pending event for thread 'thd' will
 
2350
    be an event of type 'RowEventT' (which have the type code 'type_code')
 
2351
    will either empty or have enough space to hold 'needed' bytes.  In
 
2352
    addition, the columns bitmap will be correct for the row, meaning that
 
2353
    the pending event will be flushed if the columns in the event differ from
 
2354
    the columns suppled to the function.
 
2355
 
 
2356
  RETURNS
 
2357
    If no error, a non-NULL pending event (either one which already existed or
 
2358
    the newly created one).
 
2359
    If error, NULL.
 
2360
 */
 
2361
 
 
2362
template <class RowsEventT> Rows_log_event* 
 
2363
THD::binlog_prepare_pending_rows_event(Table* table, uint32_t serv_id,
 
2364
                                       size_t needed,
 
2365
                                       bool is_transactional,
 
2366
                                       RowsEventT *hint __attribute__((unused)))
 
2367
{
 
2368
  /* Pre-conditions */
 
2369
  assert(table->s->table_map_id != UINT32_MAX);
 
2370
 
 
2371
  /* Fetch the type code for the RowsEventT template parameter */
 
2372
  int const type_code= RowsEventT::TYPE_CODE;
 
2373
 
 
2374
  /*
 
2375
    There is no good place to set up the transactional data, so we
 
2376
    have to do it here.
 
2377
  */
 
2378
  if (binlog_setup_trx_data())
 
2379
    return(NULL);
 
2380
 
 
2381
  Rows_log_event* pending= binlog_get_pending_rows_event();
 
2382
 
 
2383
  if (unlikely(pending && !pending->is_valid()))
 
2384
    return(NULL);
 
2385
 
 
2386
  /*
 
2387
    Check if the current event is non-NULL and a write-rows
 
2388
    event. Also check if the table provided is mapped: if it is not,
 
2389
    then we have switched to writing to a new table.
 
2390
    If there is no pending event, we need to create one. If there is a pending
 
2391
    event, but it's not about the same table id, or not of the same type
 
2392
    (between Write, Update and Delete), or not the same affected columns, or
 
2393
    going to be too big, flush this event to disk and create a new pending
 
2394
    event.
 
2395
 
 
2396
    The last test is necessary for the Cluster injector to work
 
2397
    correctly. The reason is that the Cluster can inject two write
 
2398
    rows with different column bitmaps if there is an insert followed
 
2399
    by an update in the same transaction, and these are grouped into a
 
2400
    single epoch/transaction when fed to the injector.
 
2401
 
 
2402
    TODO: Fix the code so that the last test can be removed.
 
2403
  */
 
2404
  if (!pending ||
 
2405
      pending->server_id != serv_id || 
 
2406
      pending->get_table_id() != table->s->table_map_id ||
 
2407
      pending->get_type_code() != type_code || 
 
2408
      pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
 
2409
      !bitmap_cmp(pending->get_cols(), table->write_set))
 
2410
    {
 
2411
    /* Create a new RowsEventT... */
 
2412
    Rows_log_event* const
 
2413
        ev= new RowsEventT(this, table, table->s->table_map_id,
 
2414
                           is_transactional);
 
2415
    if (unlikely(!ev))
 
2416
      return(NULL);
 
2417
    ev->server_id= serv_id; // I don't like this, it's too easy to forget.
 
2418
    /*
 
2419
      flush the pending event and replace it with the newly created
 
2420
      event...
 
2421
    */
 
2422
    if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev)))
 
2423
    {
 
2424
      delete ev;
 
2425
      return(NULL);
 
2426
    }
 
2427
 
 
2428
    return(ev);               /* This is the new pending event */
 
2429
  }
 
2430
  return(pending);        /* This is the current pending event */
 
2431
}
 
2432
 
 
2433
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
2434
/*
 
2435
  Instantiate the versions we need, we have -fno-implicit-template as
 
2436
  compiling option.
 
2437
*/
 
2438
template Rows_log_event*
 
2439
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2440
                                       Write_rows_log_event*);
 
2441
 
 
2442
template Rows_log_event*
 
2443
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2444
                                       Delete_rows_log_event *);
 
2445
 
 
2446
template Rows_log_event* 
 
2447
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2448
                                       Update_rows_log_event *);
 
2449
#endif
 
2450
 
 
2451
namespace {
 
2452
  /**
 
2453
     Class to handle temporary allocation of memory for row data.
 
2454
 
 
2455
     The responsibilities of the class is to provide memory for
 
2456
     packing one or two rows of packed data (depending on what
 
2457
     constructor is called).
 
2458
 
 
2459
     In order to make the allocation more efficient for "simple" rows,
 
2460
     i.e., rows that do not contain any blobs, a pointer to the
 
2461
     allocated memory is of memory is stored in the table structure
 
2462
     for simple rows.  If memory for a table containing a blob field
 
2463
     is requested, only memory for that is allocated, and subsequently
 
2464
     released when the object is destroyed.
 
2465
 
 
2466
   */
 
2467
  class Row_data_memory {
 
2468
  public:
 
2469
    /**
 
2470
      Build an object to keep track of a block-local piece of memory
 
2471
      for storing a row of data.
 
2472
 
 
2473
      @param table
 
2474
      Table where the pre-allocated memory is stored.
 
2475
 
 
2476
      @param length
 
2477
      Length of data that is needed, if the record contain blobs.
 
2478
     */
 
2479
    Row_data_memory(Table *table, size_t const len1)
 
2480
      : m_memory(0)
 
2481
    {
 
2482
      m_alloc_checked= false;
 
2483
      allocate_memory(table, len1);
 
2484
      m_ptr[0]= has_memory() ? m_memory : 0;
 
2485
      m_ptr[1]= 0;
 
2486
    }
 
2487
 
 
2488
    Row_data_memory(Table *table, size_t const len1, size_t const len2)
 
2489
      : m_memory(0)
 
2490
    {
 
2491
      m_alloc_checked= false;
 
2492
      allocate_memory(table, len1 + len2);
 
2493
      m_ptr[0]= has_memory() ? m_memory        : 0;
 
2494
      m_ptr[1]= has_memory() ? m_memory + len1 : 0;
 
2495
    }
 
2496
 
 
2497
    ~Row_data_memory()
 
2498
    {
 
2499
      if (m_memory != 0 && m_release_memory_on_destruction)
 
2500
        free((unsigned char*) m_memory);
 
2501
    }
 
2502
 
 
2503
    /**
 
2504
       Is there memory allocated?
 
2505
 
 
2506
       @retval true There is memory allocated
 
2507
       @retval false Memory allocation failed
 
2508
     */
 
2509
    bool has_memory() const {
 
2510
      m_alloc_checked= true;
 
2511
      return m_memory != 0;
 
2512
    }
 
2513
 
 
2514
    unsigned char *slot(uint32_t s)
 
2515
    {
 
2516
      assert(s < sizeof(m_ptr)/sizeof(*m_ptr));
 
2517
      assert(m_ptr[s] != 0);
 
2518
      assert(m_alloc_checked == true);
 
2519
      return m_ptr[s];
 
2520
    }
 
2521
 
 
2522
  private:
 
2523
    void allocate_memory(Table *const table, size_t const total_length)
 
2524
    {
 
2525
      if (table->s->blob_fields == 0)
 
2526
      {
 
2527
        /*
 
2528
          The maximum length of a packed record is less than this
 
2529
          length. We use this value instead of the supplied length
 
2530
          when allocating memory for records, since we don't know how
 
2531
          the memory will be used in future allocations.
 
2532
 
 
2533
          Since table->s->reclength is for unpacked records, we have
 
2534
          to add two bytes for each field, which can potentially be
 
2535
          added to hold the length of a packed field.
 
2536
        */
 
2537
        size_t const maxlen= table->s->reclength + 2 * table->s->fields;
 
2538
 
 
2539
        /*
 
2540
          Allocate memory for two records if memory hasn't been
 
2541
          allocated. We allocate memory for two records so that it can
 
2542
          be used when processing update rows as well.
 
2543
        */
 
2544
        if (table->write_row_record == 0)
 
2545
          table->write_row_record=
 
2546
            (unsigned char *) alloc_root(&table->mem_root, 2 * maxlen);
 
2547
        m_memory= table->write_row_record;
 
2548
        m_release_memory_on_destruction= false;
 
2549
      }
 
2550
      else
 
2551
      {
 
2552
        m_memory= (unsigned char *) my_malloc(total_length, MYF(MY_WME));
 
2553
        m_release_memory_on_destruction= true;
 
2554
      }
 
2555
    }
 
2556
 
 
2557
    mutable bool m_alloc_checked;
 
2558
    bool m_release_memory_on_destruction;
 
2559
    unsigned char *m_memory;
 
2560
    unsigned char *m_ptr[2];
 
2561
  };
 
2562
}
 
2563
 
 
2564
 
 
2565
int THD::binlog_write_row(Table* table, bool is_trans, 
 
2566
                          unsigned char const *record) 
 
2567
 
2568
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2569
 
 
2570
  /*
 
2571
    Pack records into format for transfer. We are allocating more
 
2572
    memory than needed, but that doesn't matter.
 
2573
  */
 
2574
  Row_data_memory memory(table, table->max_row_length(record));
 
2575
  if (!memory.has_memory())
 
2576
    return HA_ERR_OUT_OF_MEM;
 
2577
 
 
2578
  unsigned char *row_data= memory.slot(0);
 
2579
 
 
2580
  size_t const len= pack_row(table, table->write_set, row_data, record);
 
2581
 
 
2582
  Rows_log_event* const ev=
 
2583
    binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
 
2584
                                      static_cast<Write_rows_log_event*>(0));
 
2585
 
 
2586
  if (unlikely(ev == 0))
 
2587
    return HA_ERR_OUT_OF_MEM;
 
2588
 
 
2589
  return ev->add_row_data(row_data, len);
 
2590
}
 
2591
 
 
2592
int THD::binlog_update_row(Table* table, bool is_trans,
 
2593
                           const unsigned char *before_record,
 
2594
                           const unsigned char *after_record)
 
2595
 
2596
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2597
 
 
2598
  size_t const before_maxlen = table->max_row_length(before_record);
 
2599
  size_t const after_maxlen  = table->max_row_length(after_record);
 
2600
 
 
2601
  Row_data_memory row_data(table, before_maxlen, after_maxlen);
 
2602
  if (!row_data.has_memory())
 
2603
    return HA_ERR_OUT_OF_MEM;
 
2604
 
 
2605
  unsigned char *before_row= row_data.slot(0);
 
2606
  unsigned char *after_row= row_data.slot(1);
 
2607
 
 
2608
  size_t const before_size= pack_row(table, table->read_set, before_row,
 
2609
                                        before_record);
 
2610
  size_t const after_size= pack_row(table, table->write_set, after_row,
 
2611
                                       after_record);
 
2612
 
 
2613
  Rows_log_event* const ev=
 
2614
    binlog_prepare_pending_rows_event(table, server_id,
 
2615
                                      before_size + after_size, is_trans,
 
2616
                                      static_cast<Update_rows_log_event*>(0));
 
2617
 
 
2618
  if (unlikely(ev == 0))
 
2619
    return HA_ERR_OUT_OF_MEM;
 
2620
 
 
2621
  return
 
2622
    ev->add_row_data(before_row, before_size) ||
 
2623
    ev->add_row_data(after_row, after_size);
 
2624
}
 
2625
 
 
2626
int THD::binlog_delete_row(Table* table, bool is_trans, 
 
2627
                           unsigned char const *record)
 
2628
 
2629
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2630
 
 
2631
  /* 
 
2632
     Pack records into format for transfer. We are allocating more
 
2633
     memory than needed, but that doesn't matter.
 
2634
  */
 
2635
  Row_data_memory memory(table, table->max_row_length(record));
 
2636
  if (unlikely(!memory.has_memory()))
 
2637
    return HA_ERR_OUT_OF_MEM;
 
2638
 
 
2639
  unsigned char *row_data= memory.slot(0);
 
2640
 
 
2641
  size_t const len= pack_row(table, table->read_set, row_data, record);
 
2642
 
 
2643
  Rows_log_event* const ev=
 
2644
    binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
 
2645
                                      static_cast<Delete_rows_log_event*>(0));
 
2646
 
 
2647
  if (unlikely(ev == 0))
 
2648
    return HA_ERR_OUT_OF_MEM;
 
2649
 
 
2650
  return ev->add_row_data(row_data, len);
 
2651
}
 
2652
 
 
2653
 
 
2654
int THD::binlog_flush_pending_rows_event(bool stmt_end)
 
2655
{
 
2656
  /*
 
2657
    We shall flush the pending event even if we are not in row-based
 
2658
    mode: it might be the case that we left row-based mode before
 
2659
    flushing anything (e.g., if we have explicitly locked tables).
 
2660
   */
 
2661
  if (!mysql_bin_log.is_open())
 
2662
    return(0);
 
2663
 
 
2664
  /*
 
2665
    Mark the event as the last event of a statement if the stmt_end
 
2666
    flag is set.
 
2667
  */
 
2668
  int error= 0;
 
2669
  if (Rows_log_event *pending= binlog_get_pending_rows_event())
 
2670
  {
 
2671
    if (stmt_end)
 
2672
    {
 
2673
      pending->set_flags(Rows_log_event::STMT_END_F);
 
2674
      pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 
2675
      binlog_table_maps= 0;
 
2676
    }
 
2677
 
 
2678
    error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
 
2679
  }
 
2680
 
 
2681
  return(error);
 
2682
}
 
2683
 
 
2684
 
 
2685
/*
 
2686
  Member function that will log query, either row-based or
 
2687
  statement-based depending on the value of the 'current_stmt_binlog_row_based'
 
2688
  the value of the 'qtype' flag.
 
2689
 
 
2690
  This function should be called after the all calls to ha_*_row()
 
2691
  functions have been issued, but before tables are unlocked and
 
2692
  closed.
 
2693
 
 
2694
  OBSERVE
 
2695
    There shall be no writes to any system table after calling
 
2696
    binlog_query(), so these writes has to be moved to before the call
 
2697
    of binlog_query() for correct functioning.
 
2698
 
 
2699
    This is necessesary not only for RBR, but the master might crash
 
2700
    after binlogging the query but before changing the system tables.
 
2701
    This means that the slave and the master are not in the same state
 
2702
    (after the master has restarted), so therefore we have to
 
2703
    eliminate this problem.
 
2704
 
 
2705
  RETURN VALUE
 
2706
    Error code, or 0 if no error.
 
2707
*/
 
2708
int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
 
2709
                      ulong query_len, bool is_trans, bool suppress_use,
 
2710
                      THD::killed_state killed_status_arg)
 
2711
{
 
2712
  assert(query_arg && mysql_bin_log.is_open());
 
2713
 
 
2714
  if (int error= binlog_flush_pending_rows_event(true))
 
2715
    return(error);
 
2716
 
 
2717
  /*
 
2718
    If we are in statement mode and trying to log an unsafe statement,
 
2719
    we should print a warning.
 
2720
  */
 
2721
  if (lex->is_stmt_unsafe() &&
 
2722
      variables.binlog_format == BINLOG_FORMAT_STMT)
 
2723
  {
 
2724
    assert(this->query != NULL);
 
2725
    push_warning(this, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
2726
                 ER_BINLOG_UNSAFE_STATEMENT,
 
2727
                 ER(ER_BINLOG_UNSAFE_STATEMENT));
 
2728
    if (!(binlog_flags & BINLOG_FLAG_UNSAFE_STMT_PRINTED))
 
2729
    {
 
2730
      char warn_buf[DRIZZLE_ERRMSG_SIZE];
 
2731
      snprintf(warn_buf, DRIZZLE_ERRMSG_SIZE, "%s Statement: %s",
 
2732
               ER(ER_BINLOG_UNSAFE_STATEMENT), this->query);
 
2733
      sql_print_warning(warn_buf);
 
2734
      binlog_flags|= BINLOG_FLAG_UNSAFE_STMT_PRINTED;
 
2735
    }
 
2736
  }
 
2737
 
 
2738
  switch (qtype) {
 
2739
  case THD::ROW_QUERY_TYPE:
 
2740
    if (current_stmt_binlog_row_based)
 
2741
      return(0);
 
2742
    /* Otherwise, we fall through */
 
2743
  case THD::DRIZZLE_QUERY_TYPE:
 
2744
    /*
 
2745
      Using this query type is a conveniece hack, since we have been
 
2746
      moving back and forth between using RBR for replication of
 
2747
      system tables and not using it.
 
2748
 
 
2749
      Make sure to change in check_table_binlog_row_based() according
 
2750
      to how you treat this.
 
2751
    */
 
2752
  case THD::STMT_QUERY_TYPE:
 
2753
    /*
 
2754
      The DRIZZLE_LOG::write() function will set the STMT_END_F flag and
 
2755
      flush the pending rows event if necessary.
 
2756
     */
 
2757
    {
 
2758
      Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use,
 
2759
                            killed_status_arg);
 
2760
      qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 
2761
      /*
 
2762
        Binlog table maps will be irrelevant after a Query_log_event
 
2763
        (they are just removed on the slave side) so after the query
 
2764
        log event is written to the binary log, we pretend that no
 
2765
        table maps were written.
 
2766
       */
 
2767
      int error= mysql_bin_log.write(&qinfo);
 
2768
      binlog_table_maps= 0;
 
2769
      return(error);
 
2770
    }
 
2771
    break;
 
2772
 
 
2773
  case THD::QUERY_TYPE_COUNT:
 
2774
  default:
 
2775
    assert(0 <= qtype && qtype < QUERY_TYPE_COUNT);
 
2776
  }
 
2777
  return(0);
 
2778
}
 
2779
 
 
2780
bool Discrete_intervals_list::append(uint64_t start, uint64_t val,
 
2781
                                 uint64_t incr)
 
2782
{
 
2783
  /* first, see if this can be merged with previous */
 
2784
  if ((head == NULL) || tail->merge_if_contiguous(start, val, incr))
 
2785
  {
 
2786
    /* it cannot, so need to add a new interval */
 
2787
    Discrete_interval *new_interval= new Discrete_interval(start, val, incr);
 
2788
    return(append(new_interval));
 
2789
  }
 
2790
  return(0);
 
2791
}
 
2792
 
 
2793
bool Discrete_intervals_list::append(Discrete_interval *new_interval)
 
2794
{
 
2795
  if (unlikely(new_interval == NULL))
 
2796
    return(1);
 
2797
  if (head == NULL)
 
2798
    head= current= new_interval;
 
2799
  else
 
2800
    tail->next= new_interval;
 
2801
  tail= new_interval;
 
2802
  elements++;
 
2803
  return(0);
 
2804
}