~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_class.cc

Cleanup around SAFEMALLOC

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2008 Sun Microsystems
5
 
 *
6
 
 *  This program is free software; you can redistribute it and/or modify
7
 
 *  it under the terms of the GNU General Public License as published by
8
 
 *  the Free Software Foundation; version 2 of the License.
9
 
 *
10
 
 *  This program is distributed in the hope that it will be useful,
11
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 *  GNU General Public License for more details.
14
 
 *
15
 
 *  You should have received a copy of the GNU General Public License
16
 
 *  along with this program; if not, write to the Free Software
17
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
 
 */
19
 
 
20
 
/**
21
 
 * @file Implementation of the Session class and API
22
 
 */
23
 
 
24
 
#include "config.h"
25
 
#include <drizzled/session.h>
26
 
#include "drizzled/session_list.h"
 
1
/* Copyright (C) 2000-2006 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/*****************************************************************************
 
18
**
 
19
** This file implements classes defined in sql_class.h
 
20
** Especially the classes to handle a result from a select
 
21
**
 
22
*****************************************************************************/
 
23
#include <drizzled/server_includes.h>
 
24
#include "rpl_rli.h"
 
25
#include "rpl_record.h"
 
26
#include "log_event.h"
27
27
#include <sys/stat.h>
28
 
#include <drizzled/error.h>
29
 
#include <drizzled/gettext.h>
30
 
#include <drizzled/query_id.h>
31
 
#include <drizzled/data_home.h>
32
 
#include <drizzled/sql_base.h>
33
 
#include <drizzled/lock.h>
34
 
#include <drizzled/item/cache.h>
35
 
#include <drizzled/item/float.h>
36
 
#include <drizzled/item/return_int.h>
37
 
#include <drizzled/item/empty_string.h>
38
 
#include <drizzled/show.h>
39
 
#include <drizzled/plugin/client.h>
40
 
#include "drizzled/plugin/scheduler.h"
41
 
#include "drizzled/plugin/authentication.h"
42
 
#include "drizzled/plugin/logging.h"
43
 
#include "drizzled/plugin/transactional_storage_engine.h"
44
 
#include "drizzled/probes.h"
45
 
#include "drizzled/table_proto.h"
46
 
#include "drizzled/db.h"
47
 
#include "drizzled/pthread_globals.h"
48
 
#include "drizzled/transaction_services.h"
49
 
 
50
 
#include "plugin/myisam/myisam.h"
51
 
#include "drizzled/internal/iocache.h"
52
 
 
53
 
#include <fcntl.h>
54
 
#include <algorithm>
55
 
#include <climits>
56
 
 
57
 
using namespace std;
58
 
namespace drizzled
59
 
{
60
 
 
61
 
extern "C"
62
 
{
63
 
  unsigned char *get_var_key(user_var_entry *entry, size_t *length, bool );
64
 
  void free_user_var(user_var_entry *entry);
65
 
}
 
28
#include <mysys/thr_alarm.h>
 
29
#include <mysys/mysys_err.h>
 
30
#include <drizzled/drizzled_error_messages.h>
66
31
 
67
32
/*
68
33
  The following is used to initialise Table_ident with a internal
71
36
char internal_table_name[2]= "*";
72
37
char empty_c_string[1]= {0};    /* used for not defined db */
73
38
 
74
 
const char * const Session::DEFAULT_WHERE= "field list";
75
 
extern pthread_key_t THR_Session;
76
 
extern pthread_key_t THR_Mem_root;
77
 
extern uint32_t max_used_connections;
78
 
extern atomic<uint32_t> connection_count;
79
 
 
 
39
const char * const THD::DEFAULT_WHERE= "field list";
 
40
 
 
41
 
 
42
/*****************************************************************************
 
43
** Instansiate templates
 
44
*****************************************************************************/
 
45
 
 
46
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
47
/* Used templates */
 
48
template class List<Key>;
 
49
template class List_iterator<Key>;
 
50
template class List<Key_part_spec>;
 
51
template class List_iterator<Key_part_spec>;
 
52
template class List<Alter_drop>;
 
53
template class List_iterator<Alter_drop>;
 
54
template class List<Alter_column>;
 
55
template class List_iterator<Alter_column>;
 
56
#endif
80
57
 
81
58
/****************************************************************************
82
59
** User variables
83
60
****************************************************************************/
84
 
unsigned char *get_var_key(user_var_entry *entry, size_t *length, bool )
 
61
 
 
62
extern "C" uchar *get_var_key(user_var_entry *entry, size_t *length,
 
63
                              bool not_used __attribute__((unused)))
85
64
{
86
65
  *length= entry->name.length;
87
 
  return (unsigned char*) entry->name.str;
 
66
  return (uchar*) entry->name.str;
88
67
}
89
68
 
90
 
void free_user_var(user_var_entry *entry)
 
69
extern "C" void free_user_var(user_var_entry *entry)
91
70
{
92
 
  delete entry;
 
71
  char *pos= (char*) entry+ALIGN_SIZE(sizeof(*entry));
 
72
  if (entry->value && entry->value != pos)
 
73
    my_free(entry->value, MYF(0));
 
74
  my_free((char*) entry,MYF(0));
93
75
}
94
76
 
95
77
bool Key_part_spec::operator==(const Key_part_spec& other) const
99
81
         !strcmp(field_name.str, other.field_name.str);
100
82
}
101
83
 
102
 
Open_tables_state::Open_tables_state(uint64_t version_arg)
103
 
  :version(version_arg), backups_available(false)
 
84
/**
 
85
  Construct an (almost) deep copy of this key. Only those
 
86
  elements that are known to never change are not copied.
 
87
  If out of memory, a partial copy is returned and an error is set
 
88
  in THD.
 
89
*/
 
90
 
 
91
Key::Key(const Key &rhs, MEM_ROOT *mem_root)
 
92
  :type(rhs.type),
 
93
  key_create_info(rhs.key_create_info),
 
94
  columns(rhs.columns, mem_root),
 
95
  name(rhs.name),
 
96
  generated(rhs.generated)
 
97
{
 
98
  list_copy_and_replace_each_value(columns, mem_root);
 
99
}
 
100
 
 
101
/**
 
102
  Construct an (almost) deep copy of this foreign key. Only those
 
103
  elements that are known to never change are not copied.
 
104
  If out of memory, a partial copy is returned and an error is set
 
105
  in THD.
 
106
*/
 
107
 
 
108
Foreign_key::Foreign_key(const Foreign_key &rhs, MEM_ROOT *mem_root)
 
109
  :Key(rhs),
 
110
  ref_table(rhs.ref_table),
 
111
  ref_columns(rhs.ref_columns),
 
112
  delete_opt(rhs.delete_opt),
 
113
  update_opt(rhs.update_opt),
 
114
  match_opt(rhs.match_opt)
 
115
{
 
116
  list_copy_and_replace_each_value(ref_columns, mem_root);
 
117
}
 
118
 
 
119
/*
 
120
  Test if a foreign key (= generated key) is a prefix of the given key
 
121
  (ignoring key name, key type and order of columns)
 
122
 
 
123
  NOTES:
 
124
    This is only used to test if an index for a FOREIGN KEY exists
 
125
 
 
126
  IMPLEMENTATION
 
127
    We only compare field names
 
128
 
 
129
  RETURN
 
130
    0   Generated key is a prefix of other key
 
131
    1   Not equal
 
132
*/
 
133
 
 
134
bool foreign_key_prefix(Key *a, Key *b)
 
135
{
 
136
  /* Ensure that 'a' is the generated key */
 
137
  if (a->generated)
 
138
  {
 
139
    if (b->generated && a->columns.elements > b->columns.elements)
 
140
      std::swap(a, b);                       // Put shorter key in 'a'
 
141
  }
 
142
  else
 
143
  {
 
144
    if (!b->generated)
 
145
      return true;                              // No foreign key
 
146
    std::swap(a, b);                       // Put generated key in 'a'
 
147
  }
 
148
 
 
149
  /* Test if 'a' is a prefix of 'b' */
 
150
  if (a->columns.elements > b->columns.elements)
 
151
    return true;                                // Can't be prefix
 
152
 
 
153
  List_iterator<Key_part_spec> col_it1(a->columns);
 
154
  List_iterator<Key_part_spec> col_it2(b->columns);
 
155
  const Key_part_spec *col1, *col2;
 
156
 
 
157
#ifdef ENABLE_WHEN_INNODB_CAN_HANDLE_SWAPED_FOREIGN_KEY_COLUMNS
 
158
  while ((col1= col_it1++))
 
159
  {
 
160
    bool found= 0;
 
161
    col_it2.rewind();
 
162
    while ((col2= col_it2++))
 
163
    {
 
164
      if (*col1 == *col2)
 
165
      {
 
166
        found= true;
 
167
        break;
 
168
      }
 
169
    }
 
170
    if (!found)
 
171
      return true;                              // Error
 
172
  }
 
173
  return false;                                 // Is prefix
 
174
#else
 
175
  while ((col1= col_it1++))
 
176
  {
 
177
    col2= col_it2++;
 
178
    if (!(*col1 == *col2))
 
179
      return true;
 
180
  }
 
181
  return false;                                 // Is prefix
 
182
#endif
 
183
}
 
184
 
 
185
 
 
186
/****************************************************************************
 
187
** Thread specific functions
 
188
****************************************************************************/
 
189
 
 
190
Open_tables_state::Open_tables_state(ulong version_arg)
 
191
  :version(version_arg), state_flags(0U)
104
192
{
105
193
  reset_open_tables_state();
106
194
}
108
196
/*
109
197
  The following functions form part of the C plugin API
110
198
*/
 
199
 
111
200
extern "C" int mysql_tmpfile(const char *prefix)
112
201
{
113
202
  char filename[FN_REFLEN];
114
 
  int fd = internal::create_temp_file(filename, drizzle_tmpdir, prefix, MYF(MY_WME));
 
203
  File fd = create_temp_file(filename, mysql_tmpdir, prefix,
 
204
                             O_CREAT | O_EXCL | O_RDWR | O_TEMPORARY,
 
205
                             MYF(MY_WME));
115
206
  if (fd >= 0) {
 
207
    /*
 
208
      This can be removed once the following bug is fixed:
 
209
      Bug #28903  create_temp_file() doesn't honor O_TEMPORARY option
 
210
                  (file not removed) (Unix)
 
211
    */
116
212
    unlink(filename);
117
213
  }
118
214
 
119
215
  return fd;
120
216
}
121
217
 
122
 
extern "C"
123
 
int session_tablespace_op(const Session *session)
124
 
{
125
 
  return test(session->tablespace_op);
126
 
}
127
 
 
128
 
/**
129
 
   Set the process info field of the Session structure.
130
 
 
131
 
   This function is used by plug-ins. Internally, the
132
 
   Session::set_proc_info() function should be used.
133
 
 
134
 
   @see Session::set_proc_info
135
 
 */
136
 
extern "C" void
137
 
set_session_proc_info(Session *session, const char *info)
138
 
{
139
 
  session->set_proc_info(info);
140
 
}
141
 
 
142
 
extern "C"
143
 
const char *get_session_proc_info(Session *session)
144
 
{
145
 
  return session->get_proc_info();
146
 
}
147
 
 
148
 
void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
149
 
{
150
 
  return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
151
 
}
152
 
 
153
 
ResourceContext *Session::getResourceContext(const plugin::MonitoredInTransaction *monitored,
154
 
                                             size_t index)
155
 
{
156
 
  return &ha_data[monitored->getId()].resource_context[index];
157
 
}
158
 
 
159
 
extern "C"
160
 
int64_t session_test_options(const Session *session, int64_t test_options)
161
 
{
162
 
  return session->options & test_options;
163
 
}
164
 
 
165
 
extern "C"
166
 
int session_sql_command(const Session *session)
167
 
{
168
 
  return (int) session->lex->sql_command;
169
 
}
170
 
 
171
 
extern "C"
172
 
int session_tx_isolation(const Session *session)
173
 
{
174
 
  return (int) session->variables.tx_isolation;
175
 
}
176
 
 
177
 
Session::Session(plugin::Client *client_arg)
178
 
  :
179
 
  Open_tables_state(refresh_version),
180
 
  mem_root(&main_mem_root),
181
 
  lex(&main_lex),
182
 
  query(),
183
 
  client(client_arg),
184
 
  scheduler(NULL),
185
 
  scheduler_arg(NULL),
186
 
  lock_id(&main_lock_id),
187
 
  user_time(0),
188
 
  ha_data(plugin::num_trx_monitored_objects),
189
 
  arg_of_last_insert_id_function(false),
190
 
  first_successful_insert_id_in_prev_stmt(0),
191
 
  first_successful_insert_id_in_cur_stmt(0),
192
 
  limit_found_rows(0),
193
 
  global_read_lock(0),
194
 
  some_tables_deleted(false),
195
 
  no_errors(false),
196
 
  password(false),
197
 
  is_fatal_error(false),
198
 
  transaction_rollback_request(false),
199
 
  is_fatal_sub_stmt_error(0),
200
 
  derived_tables_processing(false),
201
 
  tablespace_op(false),
202
 
  m_lip(NULL),
203
 
  cached_table(0),
204
 
  transaction_message(NULL),
205
 
  statement_message(NULL)
206
 
{
207
 
  memset(process_list_info, 0, PROCESS_LIST_WIDTH);
208
 
  client->setSession(this);
 
218
 
 
219
extern "C"
 
220
int thd_in_lock_tables(const THD *thd)
 
221
{
 
222
  return test(thd->in_lock_tables);
 
223
}
 
224
 
 
225
 
 
226
extern "C"
 
227
int thd_tablespace_op(const THD *thd)
 
228
{
 
229
  return test(thd->tablespace_op);
 
230
}
 
231
 
 
232
 
 
233
extern "C"
 
234
const char *set_thd_proc_info(THD *thd, const char *info,
 
235
                              const char *calling_function __attribute__((unused)),
 
236
                              const char *calling_file __attribute__((unused)),
 
237
                              const unsigned int calling_line __attribute__((unused)))
 
238
{
 
239
  const char *old_info= thd->get_proc_info();
 
240
  thd->set_proc_info(info);
 
241
  return old_info;
 
242
}
 
243
 
 
244
extern "C"
 
245
void **thd_ha_data(const THD *thd, const struct handlerton *hton)
 
246
{
 
247
  return (void **) &thd->ha_data[hton->slot].ha_ptr;
 
248
}
 
249
 
 
250
extern "C"
 
251
int64_t thd_test_options(const THD *thd, int64_t test_options)
 
252
{
 
253
  return thd->options & test_options;
 
254
}
 
255
 
 
256
extern "C"
 
257
int thd_sql_command(const THD *thd)
 
258
{
 
259
  return (int) thd->lex->sql_command;
 
260
}
 
261
 
 
262
extern "C"
 
263
int thd_tx_isolation(const THD *thd)
 
264
{
 
265
  return (int) thd->variables.tx_isolation;
 
266
}
 
267
 
 
268
extern "C"
 
269
void thd_inc_row_count(THD *thd)
 
270
{
 
271
  thd->row_count++;
 
272
}
 
273
 
 
274
/**
 
275
  Clear this diagnostics area. 
 
276
 
 
277
  Normally called at the end of a statement.
 
278
*/
 
279
 
 
280
void
 
281
Diagnostics_area::reset_diagnostics_area()
 
282
{
 
283
  can_overwrite_status= false;
 
284
  /** Don't take chances in production */
 
285
  m_message[0]= '\0';
 
286
  m_sql_errno= 0;
 
287
  m_server_status= 0;
 
288
  m_affected_rows= 0;
 
289
  m_last_insert_id= 0;
 
290
  m_total_warn_count= 0;
 
291
  is_sent= false;
 
292
  /** Tiny reset in debug mode to see garbage right away */
 
293
  m_status= DA_EMPTY;
 
294
}
 
295
 
 
296
 
 
297
/**
 
298
  Set OK status -- ends commands that do not return a
 
299
  result set, e.g. INSERT/UPDATE/DELETE.
 
300
*/
 
301
 
 
302
void
 
303
Diagnostics_area::set_ok_status(THD *thd, ha_rows affected_rows_arg,
 
304
                                uint64_t last_insert_id_arg,
 
305
                                const char *message_arg)
 
306
{
 
307
  assert(! is_set());
 
308
  /*
 
309
    In production, refuse to overwrite an error or a custom response
 
310
    with an OK packet.
 
311
  */
 
312
  if (is_error() || is_disabled())
 
313
    return;
 
314
  /** Only allowed to report success if has not yet reported an error */
 
315
 
 
316
  m_server_status= thd->server_status;
 
317
  m_total_warn_count= thd->total_warn_count;
 
318
  m_affected_rows= affected_rows_arg;
 
319
  m_last_insert_id= last_insert_id_arg;
 
320
  if (message_arg)
 
321
    strmake(m_message, message_arg, sizeof(m_message) - 1);
 
322
  else
 
323
    m_message[0]= '\0';
 
324
  m_status= DA_OK;
 
325
}
 
326
 
 
327
 
 
328
/**
 
329
  Set EOF status.
 
330
*/
 
331
 
 
332
void
 
333
Diagnostics_area::set_eof_status(THD *thd)
 
334
{
 
335
  /** Only allowed to report eof if has not yet reported an error */
 
336
 
 
337
  assert(! is_set());
 
338
  /*
 
339
    In production, refuse to overwrite an error or a custom response
 
340
    with an EOF packet.
 
341
  */
 
342
  if (is_error() || is_disabled())
 
343
    return;
 
344
 
 
345
  m_server_status= thd->server_status;
 
346
  /*
 
347
    If inside a stored procedure, do not return the total
 
348
    number of warnings, since they are not available to the client
 
349
    anyway.
 
350
  */
 
351
  m_total_warn_count= thd->total_warn_count;
 
352
 
 
353
  m_status= DA_EOF;
 
354
}
 
355
 
 
356
/**
 
357
  Set ERROR status.
 
358
*/
 
359
 
 
360
void
 
361
Diagnostics_area::set_error_status(THD *thd __attribute__((unused)),
 
362
                                   uint sql_errno_arg,
 
363
                                   const char *message_arg)
 
364
{
 
365
  /*
 
366
    Only allowed to report error if has not yet reported a success
 
367
    The only exception is when we flush the message to the client,
 
368
    an error can happen during the flush.
 
369
  */
 
370
  assert(! is_set() || can_overwrite_status);
 
371
  /*
 
372
    In production, refuse to overwrite a custom response with an
 
373
    ERROR packet.
 
374
  */
 
375
  if (is_disabled())
 
376
    return;
 
377
 
 
378
  m_sql_errno= sql_errno_arg;
 
379
  strmake(m_message, message_arg, sizeof(m_message) - 1);
 
380
 
 
381
  m_status= DA_ERROR;
 
382
}
 
383
 
 
384
 
 
385
/**
 
386
  Mark the diagnostics area as 'DISABLED'.
 
387
 
 
388
  This is used in rare cases when the COM_ command at hand sends a response
 
389
  in a custom format. One example is the query cache, another is
 
390
  COM_STMT_PREPARE.
 
391
*/
 
392
 
 
393
void
 
394
Diagnostics_area::disable_status()
 
395
{
 
396
  assert(! is_set());
 
397
  m_status= DA_DISABLED;
 
398
}
 
399
 
 
400
 
 
401
THD::THD()
 
402
   :Statement(&main_lex, &main_mem_root, CONVENTIONAL_EXECUTION,
 
403
              /* statement id */ 0),
 
404
   Open_tables_state(refresh_version), rli_fake(0),
 
405
   lock_id(&main_lock_id),
 
406
   user_time(0), in_sub_stmt(0),
 
407
   binlog_table_maps(0), binlog_flags(0UL),
 
408
   arg_of_last_insert_id_function(false),
 
409
   first_successful_insert_id_in_prev_stmt(0),
 
410
   first_successful_insert_id_in_prev_stmt_for_binlog(0),
 
411
   first_successful_insert_id_in_cur_stmt(0),
 
412
   stmt_depends_on_first_successful_insert_id_in_prev_stmt(false),
 
413
   global_read_lock(0),
 
414
   is_fatal_error(0),
 
415
   transaction_rollback_request(0),
 
416
   is_fatal_sub_stmt_error(0),
 
417
   rand_used(0),
 
418
   time_zone_used(0),
 
419
   in_lock_tables(0),
 
420
   bootstrap(0),
 
421
   derived_tables_processing(false),
 
422
   m_lip(NULL)
 
423
{
 
424
  ulong tmp;
209
425
 
210
426
  /*
211
427
    Pass nominal parameters to init_alloc_root only to ensure that
212
428
    the destructor works OK in case of an error. The main_mem_root
213
429
    will be re-initialized in init_for_queries().
214
430
  */
215
 
  memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
216
 
  thread_stack= NULL;
 
431
  init_sql_alloc(&main_mem_root, ALLOC_ROOT_MIN_BLOCK_SIZE, 0);
 
432
  stmt_arena= this;
 
433
  thread_stack= 0;
 
434
  catalog= (char*)"std"; // the only catalog we have for now
 
435
  main_security_ctx.init();
 
436
  security_ctx= &main_security_ctx;
 
437
  some_tables_deleted=no_errors=password= 0;
 
438
  query_start_used= 0;
217
439
  count_cuted_fields= CHECK_FIELD_IGNORE;
218
440
  killed= NOT_KILLED;
219
 
  col_access= 0;
220
 
  tmp_table= 0;
221
 
  used_tables= 0;
 
441
  col_access=0;
 
442
  is_slave_error= thread_specific_used= false;
 
443
  hash_clear(&handler_tables_hash);
 
444
  tmp_table=0;
 
445
  used_tables=0;
222
446
  cuted_fields= sent_row_count= row_count= 0L;
 
447
  limit_found_rows= 0;
223
448
  row_count_func= -1;
224
449
  statement_id_counter= 0UL;
225
 
  // Must be reset to handle error with Session's created for init of mysqld
 
450
  // Must be reset to handle error with THD's created for init of mysqld
226
451
  lex->current_select= 0;
227
452
  start_time=(time_t) 0;
228
453
  start_utime= 0L;
229
454
  utime_after_lock= 0L;
 
455
  current_linfo =  0;
 
456
  slave_thread = 0;
230
457
  memset(&variables, 0, sizeof(variables));
231
458
  thread_id= 0;
 
459
  one_shot_set= 0;
232
460
  file_id = 0;
233
461
  query_id= 0;
234
 
  warn_query_id= 0;
235
 
  mysys_var= 0;
236
 
  dbug_sentry=Session_SENTRY_MAGIC;
237
 
  cleanup_done= abort_on_warning= no_warnings_for_error= false;
 
462
  warn_id= 0;
 
463
  db_charset= global_system_variables.collation_database;
 
464
  memset(ha_data, 0, sizeof(ha_data));
 
465
  mysys_var=0;
 
466
  binlog_evt_union.do_union= false;
 
467
  enable_slow_log= 0;
 
468
  dbug_sentry=THD_SENTRY_MAGIC;
 
469
  net.vio=0;
 
470
  client_capabilities= 0;                       // minimalistic client
 
471
  system_thread= NON_SYSTEM_THREAD;
 
472
  cleanup_done= abort_on_warning= no_warnings_for_error= 0;
 
473
  peer_port= 0;                                 // For SHOW PROCESSLIST
 
474
  transaction.m_pending_rows_event= 0;
 
475
  transaction.on= 1;
 
476
#ifdef SIGNAL_WITH_VIO_CLOSE
 
477
  active_vio = 0;
 
478
#endif
238
479
  pthread_mutex_init(&LOCK_delete, MY_MUTEX_INIT_FAST);
239
480
 
240
481
  /* Variables with default values */
241
482
  proc_info="login";
242
 
  where= Session::DEFAULT_WHERE;
243
 
  command= COM_CONNECT;
244
 
 
245
 
  plugin_sessionvar_init(this);
246
 
  /*
247
 
    variables= global_system_variables above has reset
248
 
    variables.pseudo_thread_id to 0. We need to correct it here to
249
 
    avoid temporary tables replication failure.
250
 
  */
251
 
  variables.pseudo_thread_id= thread_id;
252
 
  server_status= SERVER_STATUS_AUTOCOMMIT;
253
 
  options= session_startup_options;
254
 
 
255
 
  if (variables.max_join_size == HA_POS_ERROR)
256
 
    options |= OPTION_BIG_SELECTS;
257
 
  else
258
 
    options &= ~OPTION_BIG_SELECTS;
259
 
 
260
 
  open_options=ha_open_options;
261
 
  update_lock_default= TL_WRITE;
262
 
  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
263
 
  warn_list.empty();
264
 
  memset(warn_count, 0, sizeof(warn_count));
265
 
  total_warn_count= 0;
266
 
  memset(&status_var, 0, sizeof(status_var));
267
 
 
 
483
  where= THD::DEFAULT_WHERE;
 
484
  server_id = ::server_id;
 
485
  slave_net = 0;
 
486
  command=COM_CONNECT;
 
487
  *scramble= '\0';
 
488
 
 
489
  init();
268
490
  /* Initialize sub structures */
269
 
  memory::init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
 
491
  init_sql_alloc(&warn_root, WARN_ALLOC_BLOCK_SIZE, WARN_ALLOC_PREALLOC_SIZE);
 
492
  user_connect=(USER_CONN *)0;
270
493
  hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
271
494
            (hash_get_key) get_var_key,
272
495
            (hash_free_key) free_user_var, 0);
273
496
 
 
497
  /* For user vars replication*/
 
498
  if (opt_bin_log)
 
499
    my_init_dynamic_array(&user_var_events,
 
500
                          sizeof(BINLOG_USER_VAR_EVENT *), 16, 16);
 
501
  else
 
502
    memset(&user_var_events, 0, sizeof(user_var_events));
 
503
 
 
504
  /* Protocol */
 
505
  protocol= &protocol_text;                     // Default protocol
 
506
  protocol_text.init(this);
 
507
 
 
508
  tablespace_op= false;
 
509
  tmp= sql_rnd();
 
510
  randominit(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id);
274
511
  substitute_null_with_insert_id = false;
275
512
  thr_lock_info_init(&lock_info); /* safety: will be reset after start */
276
513
  thr_lock_owner_init(&main_lock_id, &lock_info);
278
515
  m_internal_handler= NULL;
279
516
}
280
517
 
281
 
void Session::free_items()
282
 
{
283
 
  Item *next;
284
 
  /* This works because items are allocated with memory::sql_alloc() */
285
 
  for (; free_list; free_list= next)
286
 
  {
287
 
    next= free_list->next;
288
 
    free_list->delete_self();
289
 
  }
290
 
}
291
518
 
292
 
void Session::push_internal_handler(Internal_error_handler *handler)
 
519
void THD::push_internal_handler(Internal_error_handler *handler)
293
520
{
294
521
  /*
295
522
    TODO: The current implementation is limited to 1 handler at a time only.
296
 
    Session and sp_rcontext need to be modified to use a common handler stack.
 
523
    THD and sp_rcontext need to be modified to use a common handler stack.
297
524
  */
298
525
  assert(m_internal_handler == NULL);
299
526
  m_internal_handler= handler;
300
527
}
301
528
 
302
 
bool Session::handle_error(uint32_t sql_errno, const char *message,
 
529
 
 
530
bool THD::handle_error(uint sql_errno, const char *message,
303
531
                       DRIZZLE_ERROR::enum_warning_level level)
304
532
{
305
533
  if (m_internal_handler)
310
538
  return false;                                 // 'false', as per coding style
311
539
}
312
540
 
313
 
void Session::pop_internal_handler()
 
541
 
 
542
void THD::pop_internal_handler()
314
543
{
315
544
  assert(m_internal_handler != NULL);
316
545
  m_internal_handler= NULL;
317
546
}
318
547
 
319
 
#if defined(__cplusplus)
320
 
extern "C" {
321
 
#endif
322
 
 
323
 
void *session_alloc(Session *session, unsigned int size)
324
 
{
325
 
  return session->alloc(size);
326
 
}
327
 
 
328
 
void *session_calloc(Session *session, unsigned int size)
329
 
{
330
 
  return session->calloc(size);
331
 
}
332
 
 
333
 
char *session_strdup(Session *session, const char *str)
334
 
{
335
 
  return session->strdup(str);
336
 
}
337
 
 
338
 
char *session_strmake(Session *session, const char *str, unsigned int size)
339
 
{
340
 
  return session->strmake(str, size);
341
 
}
342
 
 
343
 
void *session_memdup(Session *session, const void* str, unsigned int size)
344
 
{
345
 
  return session->memdup(str, size);
346
 
}
347
 
 
348
 
void session_get_xid(const Session *session, DRIZZLE_XID *xid)
349
 
{
350
 
  *xid = *(DRIZZLE_XID *) &session->transaction.xid_state.xid;
351
 
}
352
 
 
353
 
#if defined(__cplusplus)
354
 
}
355
 
#endif
 
548
extern "C"
 
549
void *thd_alloc(DRIZZLE_THD thd, unsigned int size)
 
550
{
 
551
  return thd->alloc(size);
 
552
}
 
553
 
 
554
extern "C"
 
555
void *thd_calloc(DRIZZLE_THD thd, unsigned int size)
 
556
{
 
557
  return thd->calloc(size);
 
558
}
 
559
 
 
560
extern "C"
 
561
char *thd_strdup(DRIZZLE_THD thd, const char *str)
 
562
{
 
563
  return thd->strdup(str);
 
564
}
 
565
 
 
566
extern "C"
 
567
char *thd_strmake(DRIZZLE_THD thd, const char *str, unsigned int size)
 
568
{
 
569
  return thd->strmake(str, size);
 
570
}
 
571
 
 
572
extern "C"
 
573
LEX_STRING *thd_make_lex_string(THD *thd, LEX_STRING *lex_str,
 
574
                                const char *str, unsigned int size,
 
575
                                int allocate_lex_string)
 
576
{
 
577
  return thd->make_lex_string(lex_str, str, size,
 
578
                              (bool) allocate_lex_string);
 
579
}
 
580
 
 
581
extern "C"
 
582
void *thd_memdup(DRIZZLE_THD thd, const void* str, unsigned int size)
 
583
{
 
584
  return thd->memdup(str, size);
 
585
}
 
586
 
 
587
extern "C"
 
588
void thd_get_xid(const DRIZZLE_THD thd, DRIZZLE_XID *xid)
 
589
{
 
590
  *xid = *(DRIZZLE_XID *) &thd->transaction.xid_state.xid;
 
591
}
 
592
 
 
593
/*
 
594
  Init common variables that has to be reset on start and on change_user
 
595
*/
 
596
 
 
597
void THD::init(void)
 
598
{
 
599
  pthread_mutex_lock(&LOCK_global_system_variables);
 
600
  plugin_thdvar_init(this);
 
601
  variables.time_format= date_time_format_copy((THD*) 0,
 
602
                                               variables.time_format);
 
603
  variables.date_format= date_time_format_copy((THD*) 0,
 
604
                                               variables.date_format);
 
605
  variables.datetime_format= date_time_format_copy((THD*) 0,
 
606
                                                   variables.datetime_format);
 
607
  /*
 
608
    variables= global_system_variables above has reset
 
609
    variables.pseudo_thread_id to 0. We need to correct it here to
 
610
    avoid temporary tables replication failure.
 
611
  */
 
612
  variables.pseudo_thread_id= thread_id;
 
613
  pthread_mutex_unlock(&LOCK_global_system_variables);
 
614
  server_status= SERVER_STATUS_AUTOCOMMIT;
 
615
  options= thd_startup_options;
 
616
 
 
617
  if (variables.max_join_size == HA_POS_ERROR)
 
618
    options |= OPTION_BIG_SELECTS;
 
619
  else
 
620
    options &= ~OPTION_BIG_SELECTS;
 
621
 
 
622
  transaction.all.modified_non_trans_table= transaction.stmt.modified_non_trans_table= false;
 
623
  open_options=ha_open_options;
 
624
  update_lock_default= (variables.low_priority_updates ?
 
625
                        TL_WRITE_LOW_PRIORITY :
 
626
                        TL_WRITE);
 
627
  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
 
628
  warn_list.empty();
 
629
  memset(warn_count, 0, sizeof(warn_count));
 
630
  total_warn_count= 0;
 
631
  update_charset();
 
632
  reset_current_stmt_binlog_row_based();
 
633
  memset(&status_var, 0, sizeof(status_var));
 
634
}
 
635
 
 
636
 
 
637
/*
 
638
  Init THD for query processing.
 
639
  This has to be called once before we call mysql_parse.
 
640
  See also comments in sql_class.h.
 
641
*/
 
642
 
 
643
void THD::init_for_queries()
 
644
{
 
645
  set_time(); 
 
646
  ha_enable_transaction(this,true);
 
647
 
 
648
  reset_root_defaults(mem_root, variables.query_alloc_block_size,
 
649
                      variables.query_prealloc_size);
 
650
  reset_root_defaults(&transaction.mem_root,
 
651
                      variables.trans_alloc_block_size,
 
652
                      variables.trans_prealloc_size);
 
653
  transaction.xid_state.xid.null();
 
654
  transaction.xid_state.in_thd=1;
 
655
}
 
656
 
356
657
 
357
658
/* Do operations that may take a long time */
358
659
 
359
 
void Session::cleanup(void)
 
660
void THD::cleanup(void)
360
661
{
361
 
  assert(cleanup_done == false);
 
662
  assert(cleanup_done == 0);
362
663
 
363
664
  killed= KILL_CONNECTION;
364
665
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
368
669
  }
369
670
#endif
370
671
  {
371
 
    TransactionServices &transaction_services= TransactionServices::singleton();
372
 
    transaction_services.ha_rollback_trans(this, true);
 
672
    ha_rollback(this);
373
673
    xid_cache_delete(&transaction.xid_state);
374
674
  }
 
675
  if (locked_tables)
 
676
  {
 
677
    lock=locked_tables; locked_tables=0;
 
678
    close_thread_tables(this);
 
679
  }
 
680
  mysql_ha_cleanup(this);
 
681
  delete_dynamic(&user_var_events);
375
682
  hash_free(&user_vars);
376
 
  close_temporary_tables();
377
 
 
 
683
  close_temporary_tables(this);
 
684
  my_free((char*) variables.time_format, MYF(MY_ALLOW_ZERO_PTR));
 
685
  my_free((char*) variables.date_format, MYF(MY_ALLOW_ZERO_PTR));
 
686
  my_free((char*) variables.datetime_format, MYF(MY_ALLOW_ZERO_PTR));
 
687
  
378
688
  if (global_read_lock)
379
689
    unlock_global_read_lock(this);
380
690
 
381
 
  cleanup_done= true;
 
691
  cleanup_done=1;
 
692
  return;
382
693
}
383
694
 
384
 
Session::~Session()
 
695
THD::~THD()
385
696
{
386
 
  this->checkSentry();
 
697
  THD_CHECK_SENTRY(this);
 
698
  /* Ensure that no one is using THD */
 
699
  pthread_mutex_lock(&LOCK_delete);
 
700
  pthread_mutex_unlock(&LOCK_delete);
387
701
  add_to_status(&global_status_var, &status_var);
388
702
 
389
 
  if (client->isConnected())
390
 
  {
391
 
    if (global_system_variables.log_warnings)
392
 
        errmsg_printf(ERRMSG_LVL_WARN, ER(ER_FORCING_CLOSE),internal::my_progname,
393
 
                      thread_id,
394
 
                      (getSecurityContext().getUser().c_str() ?
395
 
                       getSecurityContext().getUser().c_str() : ""));
396
 
    disconnect(0, false);
397
 
  }
398
 
 
399
703
  /* Close connection */
400
 
  client->close();
401
 
  delete client;
402
 
 
403
 
  if (cleanup_done == false)
 
704
  if (net.vio)
 
705
  {
 
706
    vio_delete(net.vio);
 
707
    net_end(&net);
 
708
  }
 
709
  if (!cleanup_done)
404
710
    cleanup();
405
711
 
406
 
  plugin::StorageEngine::closeConnection(this);
407
 
  plugin_sessionvar_cleanup(this);
 
712
  ha_close_connection(this);
 
713
  plugin_thdvar_cleanup(this);
408
714
 
 
715
  main_security_ctx.destroy();
 
716
  safeFree(db);
409
717
  free_root(&warn_root,MYF(0));
 
718
  free_root(&transaction.mem_root,MYF(0));
410
719
  mysys_var=0;                                  // Safety (shouldn't be needed)
411
 
  dbug_sentry= Session_SENTRY_GONE;
412
 
 
 
720
  pthread_mutex_destroy(&LOCK_delete);
 
721
  dbug_sentry= THD_SENTRY_GONE;
 
722
  if (rli_fake)
 
723
  {
 
724
    delete rli_fake;
 
725
    rli_fake= NULL;
 
726
  }
 
727
  
413
728
  free_root(&main_mem_root, MYF(0));
414
 
  pthread_setspecific(THR_Session,  0);
415
 
 
416
 
  plugin::Logging::postEndDo(this);
417
 
 
418
 
  /* Ensure that no one is using Session */
419
 
  pthread_mutex_unlock(&LOCK_delete);
420
 
  pthread_mutex_destroy(&LOCK_delete);
 
729
  return;
421
730
}
422
731
 
 
732
 
423
733
/*
424
734
  Add all status variables to another status variable array
425
735
 
433
743
    If this assumption will change, then we have to explictely add
434
744
    the other variables after the while loop
435
745
*/
436
 
void add_to_status(system_status_var *to_var, system_status_var *from_var)
 
746
 
 
747
void add_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var)
437
748
{
438
 
  ulong *end= (ulong*) ((unsigned char*) to_var +
439
 
                        offsetof(system_status_var, last_system_status_var) +
 
749
  ulong *end= (ulong*) ((uchar*) to_var +
 
750
                        offsetof(STATUS_VAR, last_system_status_var) +
440
751
                        sizeof(ulong));
441
752
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var;
442
753
 
452
763
    to_var       add to this array
453
764
    from_var     from this array
454
765
    dec_var      minus this array
455
 
 
 
766
  
456
767
  NOTE
457
768
    This function assumes that all variables are long/ulong.
458
769
*/
459
 
void add_diff_to_status(system_status_var *to_var, system_status_var *from_var,
460
 
                        system_status_var *dec_var)
 
770
 
 
771
void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var,
 
772
                        STATUS_VAR *dec_var)
461
773
{
462
 
  ulong *end= (ulong*) ((unsigned char*) to_var + offsetof(system_status_var,
 
774
  ulong *end= (ulong*) ((uchar*) to_var + offsetof(STATUS_VAR,
463
775
                                                  last_system_status_var) +
464
776
                        sizeof(ulong));
465
777
  ulong *to= (ulong*) to_var, *from= (ulong*) from_var, *dec= (ulong*) dec_var;
468
780
    *(to++)+= *(from++) - *(dec++);
469
781
}
470
782
 
471
 
void Session::awake(Session::killed_state state_to_set)
 
783
 
 
784
void THD::awake(THD::killed_state state_to_set)
472
785
{
473
 
  this->checkSentry();
474
 
  safe_mutex_assert_owner(&LOCK_delete);
 
786
  THD_CHECK_SENTRY(this);
 
787
  safe_mutex_assert_owner(&LOCK_delete); 
475
788
 
476
789
  killed= state_to_set;
477
 
  if (state_to_set != Session::KILL_QUERY)
 
790
  if (state_to_set != THD::KILL_QUERY)
478
791
  {
479
 
    scheduler->killSession(this);
480
 
    DRIZZLE_CONNECTION_DONE(thread_id);
 
792
    thr_alarm_kill(thread_id);
 
793
    if (!slave_thread)
 
794
      thread_scheduler.post_kill_notification(this);
 
795
#ifdef SIGNAL_WITH_VIO_CLOSE
 
796
    if (this != current_thd)
 
797
    {
 
798
      /*
 
799
        In addition to a signal, let's close the socket of the thread that
 
800
        is being killed. This is to make sure it does not block if the
 
801
        signal is lost. This needs to be done only on platforms where
 
802
        signals are not a reliable interruption mechanism.
 
803
 
 
804
        If we're killing ourselves, we know that we're not blocked, so this
 
805
        hack is not used.
 
806
      */
 
807
 
 
808
      close_active_vio();
 
809
    }
 
810
#endif    
481
811
  }
482
812
  if (mysys_var)
483
813
  {
484
814
    pthread_mutex_lock(&mysys_var->mutex);
 
815
    if (!system_thread)         // Don't abort locks
 
816
      mysys_var->abort=1;
485
817
    /*
486
818
      This broadcast could be up in the air if the victim thread
487
819
      exits the cond in the time between read and broadcast, but that is
498
830
      current_cond and current_mutex are 0), then the victim will not get
499
831
      a signal and it may wait "forever" on the cond (until
500
832
      we issue a second KILL or the status it's waiting for happens).
501
 
      It's true that we have set its session->killed but it may not
 
833
      It's true that we have set its thd->killed but it may not
502
834
      see it immediately and so may have time to reach the cond_wait().
503
835
    */
504
836
    if (mysys_var->current_cond && mysys_var->current_mutex)
509
841
    }
510
842
    pthread_mutex_unlock(&mysys_var->mutex);
511
843
  }
 
844
  return;
512
845
}
513
846
 
514
847
/*
515
848
  Remember the location of thread info, the structure needed for
516
 
  memory::sql_alloc() and the structure for the net buffer
 
849
  sql_alloc() and the structure for the net buffer
517
850
*/
518
 
bool Session::storeGlobals()
 
851
 
 
852
bool THD::store_globals()
519
853
{
520
854
  /*
521
855
    Assert that thread_stack is initialized: it's necessary to be able
523
857
  */
524
858
  assert(thread_stack);
525
859
 
526
 
  if (pthread_setspecific(THR_Session,  this) ||
527
 
      pthread_setspecific(THR_Mem_root, &mem_root))
528
 
    return true;
529
 
 
 
860
  if (my_pthread_setspecific_ptr(THR_THD,  this) ||
 
861
      my_pthread_setspecific_ptr(THR_MALLOC, &mem_root))
 
862
    return 1;
530
863
  mysys_var=my_thread_var;
531
 
 
532
864
  /*
533
865
    Let mysqld define the thread id (not mysys)
534
 
    This allows us to move Session to different threads if needed.
 
866
    This allows us to move THD to different threads if needed.
535
867
  */
536
868
  mysys_var->id= thread_id;
537
869
  real_id= pthread_self();                      // For debugging
538
870
 
539
871
  /*
540
 
    We have to call thr_lock_info_init() again here as Session may have been
 
872
    We have to call thr_lock_info_init() again here as THD may have been
541
873
    created in another thread
542
874
  */
543
875
  thr_lock_info_init(&lock_info);
544
 
  return false;
 
876
  return 0;
545
877
}
546
878
 
 
879
 
547
880
/*
548
 
  Init Session for query processing.
549
 
  This has to be called once before we call mysql_parse.
550
 
  See also comments in session.h.
 
881
  Cleanup after query.
 
882
 
 
883
  SYNOPSIS
 
884
    THD::cleanup_after_query()
 
885
 
 
886
  DESCRIPTION
 
887
    This function is used to reset thread data to its default state.
 
888
 
 
889
  NOTE
 
890
    This function is not suitable for setting thread data to some
 
891
    non-default values, as there is only one replication thread, so
 
892
    different master threads may overwrite data of each other on
 
893
    slave.
551
894
*/
552
895
 
553
 
void Session::prepareForQueries()
554
 
{
555
 
  if (variables.max_join_size == HA_POS_ERROR)
556
 
    options |= OPTION_BIG_SELECTS;
557
 
 
558
 
  version= refresh_version;
559
 
  set_proc_info(NULL);
560
 
  command= COM_SLEEP;
561
 
  set_time();
562
 
 
563
 
  reset_root_defaults(mem_root, variables.query_alloc_block_size,
564
 
                      variables.query_prealloc_size);
565
 
  transaction.xid_state.xid.null();
566
 
  transaction.xid_state.in_session=1;
567
 
}
568
 
 
569
 
bool Session::initGlobals()
570
 
{
571
 
  if (storeGlobals())
572
 
  {
573
 
    disconnect(ER_OUT_OF_RESOURCES, true);
574
 
    statistic_increment(aborted_connects, &LOCK_status);
575
 
    return true;
576
 
  }
577
 
  return false;
578
 
}
579
 
 
580
 
void Session::run()
581
 
{
582
 
  if (initGlobals() || authenticate())
583
 
  {
584
 
    disconnect(0, true);
585
 
    return;
586
 
  }
587
 
 
588
 
  prepareForQueries();
589
 
 
590
 
  while (! client->haveError() && killed != KILL_CONNECTION)
591
 
  {
592
 
    if (! executeStatement())
593
 
      break;
594
 
  }
595
 
 
596
 
  disconnect(0, true);
597
 
}
598
 
 
599
 
bool Session::schedule()
600
 
{
601
 
  scheduler= plugin::Scheduler::getScheduler();
602
 
  assert(scheduler);
603
 
 
604
 
  connection_count.increment();
605
 
 
606
 
  if (connection_count > max_used_connections)
607
 
    max_used_connections= connection_count;
608
 
 
609
 
  thread_id= variables.pseudo_thread_id= global_thread_id++;
610
 
 
611
 
  pthread_mutex_lock(&LOCK_thread_count);
612
 
  getSessionList().push_back(this);
613
 
  pthread_mutex_unlock(&LOCK_thread_count);
614
 
 
615
 
  if (scheduler->addSession(this))
616
 
  {
617
 
    DRIZZLE_CONNECTION_START(thread_id);
618
 
    char error_message_buff[DRIZZLE_ERRMSG_SIZE];
619
 
 
620
 
    killed= Session::KILL_CONNECTION;
621
 
 
622
 
    statistic_increment(aborted_connects, &LOCK_status);
623
 
 
624
 
    /* Can't use my_error() since store_globals has not been called. */
625
 
    /* TODO replace will better error message */
626
 
    snprintf(error_message_buff, sizeof(error_message_buff),
627
 
             ER(ER_CANT_CREATE_THREAD), 1);
628
 
    client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
629
 
    return true;
630
 
  }
631
 
 
632
 
  return false;
633
 
}
634
 
 
635
 
 
636
 
const char* Session::enter_cond(pthread_cond_t *cond,
637
 
                                pthread_mutex_t* mutex,
638
 
                                const char* msg)
639
 
{
640
 
  const char* old_msg = get_proc_info();
641
 
  safe_mutex_assert_owner(mutex);
642
 
  mysys_var->current_mutex = mutex;
643
 
  mysys_var->current_cond = cond;
644
 
  this->set_proc_info(msg);
645
 
  return old_msg;
646
 
}
647
 
 
648
 
void Session::exit_cond(const char* old_msg)
649
 
{
650
 
  /*
651
 
    Putting the mutex unlock in exit_cond() ensures that
652
 
    mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
653
 
    locked (if that would not be the case, you'll get a deadlock if someone
654
 
    does a Session::awake() on you).
655
 
  */
656
 
  pthread_mutex_unlock(mysys_var->current_mutex);
657
 
  pthread_mutex_lock(&mysys_var->mutex);
658
 
  mysys_var->current_mutex = 0;
659
 
  mysys_var->current_cond = 0;
660
 
  this->set_proc_info(old_msg);
661
 
  pthread_mutex_unlock(&mysys_var->mutex);
662
 
}
663
 
 
664
 
bool Session::authenticate()
665
 
{
666
 
  lex_start(this);
667
 
  if (client->authenticate())
668
 
    return false;
669
 
 
670
 
  statistic_increment(aborted_connects, &LOCK_status);
671
 
  return true;
672
 
}
673
 
 
674
 
bool Session::checkUser(const char *passwd, uint32_t passwd_len, const char *in_db)
675
 
{
676
 
  const string passwd_str(passwd, passwd_len);
677
 
  bool is_authenticated=
678
 
    plugin::Authentication::isAuthenticated(getSecurityContext(),
679
 
                                            passwd_str);
680
 
 
681
 
  if (is_authenticated != true)
682
 
  {
683
 
    /* isAuthenticated has pushed the error message */
684
 
    return false;
685
 
  }
686
 
 
687
 
  /* Change database if necessary */
688
 
  if (in_db && in_db[0])
689
 
  {
690
 
    SchemaIdentifier identifier(in_db);
691
 
    if (mysql_change_db(this, identifier))
692
 
    {
693
 
      /* mysql_change_db() has pushed the error message. */
694
 
      return false;
695
 
    }
696
 
  }
697
 
  my_ok();
698
 
  password= test(passwd_len);          // remember for error messages
699
 
 
700
 
  /* Ready to handle queries */
701
 
  return true;
702
 
}
703
 
 
704
 
bool Session::executeStatement()
705
 
{
706
 
  char *l_packet= 0;
707
 
  uint32_t packet_length;
708
 
 
709
 
  enum enum_server_command l_command;
710
 
 
711
 
  /*
712
 
    indicator of uninitialized lex => normal flow of errors handling
713
 
    (see my_message_sql)
714
 
  */
715
 
  lex->current_select= 0;
716
 
  clear_error();
717
 
  main_da.reset_diagnostics_area();
718
 
 
719
 
  if (client->readCommand(&l_packet, &packet_length) == false)
720
 
    return false;
721
 
 
722
 
  if (packet_length == 0)
723
 
    return true;
724
 
 
725
 
  l_command= (enum enum_server_command) (unsigned char) l_packet[0];
726
 
 
727
 
  if (command >= COM_END)
728
 
    command= COM_END;                           // Wrong command
729
 
 
730
 
  assert(packet_length);
731
 
  return ! dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
732
 
}
733
 
 
734
 
bool Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
735
 
{
736
 
  /* Remove garbage at start and end of query */
737
 
  while (in_packet_length > 0 && my_isspace(charset(), in_packet[0]))
738
 
  {
739
 
    in_packet++;
740
 
    in_packet_length--;
741
 
  }
742
 
  const char *pos= in_packet + in_packet_length; /* Point at end null */
743
 
  while (in_packet_length > 0 &&
744
 
         (pos[-1] == ';' || my_isspace(charset() ,pos[-1])))
745
 
  {
746
 
    pos--;
747
 
    in_packet_length--;
748
 
  }
749
 
 
750
 
  query.assign(in_packet, in_packet + in_packet_length);
751
 
 
752
 
  return true;
753
 
}
754
 
 
755
 
bool Session::endTransaction(enum enum_mysql_completiontype completion)
756
 
{
757
 
  bool do_release= 0;
758
 
  bool result= true;
759
 
  TransactionServices &transaction_services= TransactionServices::singleton();
760
 
 
761
 
  if (transaction.xid_state.xa_state != XA_NOTR)
762
 
  {
763
 
    my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
764
 
    return false;
765
 
  }
766
 
  switch (completion)
767
 
  {
768
 
    case COMMIT:
769
 
      /*
770
 
       * We don't use endActiveTransaction() here to ensure that this works
771
 
       * even if there is a problem with the OPTION_AUTO_COMMIT flag
772
 
       * (Which of course should never happen...)
773
 
       */
774
 
      server_status&= ~SERVER_STATUS_IN_TRANS;
775
 
      if (transaction_services.ha_commit_trans(this, true))
776
 
        result= false;
777
 
      options&= ~(OPTION_BEGIN);
778
 
      break;
779
 
    case COMMIT_RELEASE:
780
 
      do_release= 1; /* fall through */
781
 
    case COMMIT_AND_CHAIN:
782
 
      result= endActiveTransaction();
783
 
      if (result == true && completion == COMMIT_AND_CHAIN)
784
 
        result= startTransaction();
785
 
      break;
786
 
    case ROLLBACK_RELEASE:
787
 
      do_release= 1; /* fall through */
788
 
    case ROLLBACK:
789
 
    case ROLLBACK_AND_CHAIN:
790
 
    {
791
 
      server_status&= ~SERVER_STATUS_IN_TRANS;
792
 
      if (transaction_services.ha_rollback_trans(this, true))
793
 
        result= false;
794
 
      options&= ~(OPTION_BEGIN);
795
 
      if (result == true && (completion == ROLLBACK_AND_CHAIN))
796
 
        result= startTransaction();
797
 
      break;
798
 
    }
799
 
    default:
800
 
      my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
801
 
      return false;
802
 
  }
803
 
 
804
 
  if (result == false)
805
 
    my_error(killed_errno(), MYF(0));
806
 
  else if ((result == true) && do_release)
807
 
    killed= Session::KILL_CONNECTION;
808
 
 
809
 
  return result;
810
 
}
811
 
 
812
 
bool Session::endActiveTransaction()
813
 
{
814
 
  bool result= true;
815
 
  TransactionServices &transaction_services= TransactionServices::singleton();
816
 
 
817
 
  if (transaction.xid_state.xa_state != XA_NOTR)
818
 
  {
819
 
    my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
820
 
    return false;
821
 
  }
822
 
  if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
823
 
  {
824
 
    server_status&= ~SERVER_STATUS_IN_TRANS;
825
 
    if (transaction_services.ha_commit_trans(this, true))
826
 
      result= false;
827
 
  }
828
 
  options&= ~(OPTION_BEGIN);
829
 
  return result;
830
 
}
831
 
 
832
 
bool Session::startTransaction(start_transaction_option_t opt)
833
 
{
834
 
  bool result= true;
835
 
 
836
 
  if (! endActiveTransaction())
837
 
  {
838
 
    result= false;
839
 
  }
840
 
  else
841
 
  {
842
 
    options|= OPTION_BEGIN;
843
 
    server_status|= SERVER_STATUS_IN_TRANS;
844
 
 
845
 
    if (plugin::TransactionalStorageEngine::notifyStartTransaction(this, opt))
846
 
    {
847
 
      result= false;
848
 
    }
849
 
  }
850
 
 
851
 
  return result;
852
 
}
853
 
 
854
 
void Session::cleanup_after_query()
855
 
{
856
 
  /*
857
 
    Reset rand_used so that detection of calls to rand() will save random
 
896
void THD::cleanup_after_query()
 
897
{
 
898
  /*
 
899
    Reset rand_used so that detection of calls to rand() will save random 
858
900
    seeds if needed by the slave.
 
901
 
 
902
    Do not reset rand_used if inside a stored function or trigger because 
 
903
    only the call to these operations is logged. Thus only the calling 
 
904
    statement needs to detect rand() calls made by its substatements. These
 
905
    substatements must not set rand_used to 0 because it would remove the
 
906
    detection of rand() by the calling statement. 
859
907
  */
 
908
  if (!in_sub_stmt) /* stored functions and triggers are a special case */
860
909
  {
861
910
    /* Forget those values, for next binlogger: */
 
911
    stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
862
912
    auto_inc_intervals_in_cur_stmt_for_binlog.empty();
 
913
    rand_used= 0;
863
914
  }
864
915
  if (first_successful_insert_id_in_cur_stmt > 0)
865
916
  {
866
917
    /* set what LAST_INSERT_ID() will return */
867
 
    first_successful_insert_id_in_prev_stmt= first_successful_insert_id_in_cur_stmt;
 
918
    first_successful_insert_id_in_prev_stmt= 
 
919
      first_successful_insert_id_in_cur_stmt;
868
920
    first_successful_insert_id_in_cur_stmt= 0;
869
921
    substitute_null_with_insert_id= true;
870
922
  }
871
 
  arg_of_last_insert_id_function= false;
 
923
  arg_of_last_insert_id_function= 0;
872
924
  /* Free Items that were created during this execution */
873
925
  free_items();
874
926
  /* Reset where. */
875
 
  where= Session::DEFAULT_WHERE;
 
927
  where= THD::DEFAULT_WHERE;
876
928
}
877
929
 
 
930
 
878
931
/**
879
932
  Create a LEX_STRING in this connection.
880
933
 
885
938
                              instead of using lex_str value
886
939
  @return  NULL on failure, or pointer to the LEX_STRING object
887
940
*/
888
 
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
889
 
                                     const std::string &str,
890
 
                                     bool allocate_lex_string)
891
 
{
892
 
  return make_lex_string(lex_str, str.c_str(), str.length(), allocate_lex_string);
893
 
}
894
 
 
895
 
LEX_STRING *Session::make_lex_string(LEX_STRING *lex_str,
896
 
                                     const char* str, uint32_t length,
897
 
                                     bool allocate_lex_string)
 
941
LEX_STRING *THD::make_lex_string(LEX_STRING *lex_str,
 
942
                                 const char* str, uint length,
 
943
                                 bool allocate_lex_string)
898
944
{
899
945
  if (allocate_lex_string)
900
946
    if (!(lex_str= (LEX_STRING *)alloc(sizeof(LEX_STRING))))
905
951
  return lex_str;
906
952
}
907
953
 
908
 
int Session::send_explain_fields(select_result *result)
 
954
 
 
955
/*
 
956
  Convert a string to another character set
 
957
 
 
958
  SYNOPSIS
 
959
    convert_string()
 
960
    to                          Store new allocated string here
 
961
    to_cs                       New character set for allocated string
 
962
    from                        String to convert
 
963
    from_length                 Length of string to convert
 
964
    from_cs                     Original character set
 
965
 
 
966
  NOTES
 
967
    to will be 0-terminated to make it easy to pass to system funcs
 
968
 
 
969
  RETURN
 
970
    0   ok
 
971
    1   End of memory.
 
972
        In this case to->str will point to 0 and to->length will be 0.
 
973
*/
 
974
 
 
975
bool THD::convert_string(LEX_STRING *to, const CHARSET_INFO * const to_cs,
 
976
                         const char *from, uint from_length,
 
977
                         const CHARSET_INFO * const from_cs)
 
978
{
 
979
  size_t new_length= to_cs->mbmaxlen * from_length;
 
980
  uint dummy_errors;
 
981
  if (!(to->str= (char*) alloc(new_length+1)))
 
982
  {
 
983
    to->length= 0;                              // Safety fix
 
984
    return(1);                          // EOM
 
985
  }
 
986
  to->length= copy_and_convert((char*) to->str, new_length, to_cs,
 
987
                               from, from_length, from_cs, &dummy_errors);
 
988
  to->str[to->length]=0;                        // Safety
 
989
  return(0);
 
990
}
 
991
 
 
992
 
 
993
/*
 
994
  Convert string from source character set to target character set inplace.
 
995
 
 
996
  SYNOPSIS
 
997
    THD::convert_string
 
998
 
 
999
  DESCRIPTION
 
1000
    Convert string using convert_buffer - buffer for character set 
 
1001
    conversion shared between all protocols.
 
1002
 
 
1003
  RETURN
 
1004
    0   ok
 
1005
   !0   out of memory
 
1006
*/
 
1007
 
 
1008
bool THD::convert_string(String *s, const CHARSET_INFO * const from_cs,
 
1009
                         const CHARSET_INFO * const to_cs)
 
1010
{
 
1011
  uint dummy_errors;
 
1012
  if (convert_buffer.copy(s->ptr(), s->length(), from_cs, to_cs, &dummy_errors))
 
1013
    return true;
 
1014
  /* If convert_buffer >> s copying is more efficient long term */
 
1015
  if (convert_buffer.alloced_length() >= convert_buffer.length() * 2 ||
 
1016
      !s->is_alloced())
 
1017
  {
 
1018
    return s->copy(convert_buffer);
 
1019
  }
 
1020
  s->swap(convert_buffer);
 
1021
  return false;
 
1022
}
 
1023
 
 
1024
 
 
1025
/*
 
1026
  Update some cache variables when character set changes
 
1027
*/
 
1028
 
 
1029
void THD::update_charset()
 
1030
{
 
1031
  uint32_t not_used;
 
1032
  charset_is_system_charset= !String::needs_conversion(0,charset(),
 
1033
                                                       system_charset_info,
 
1034
                                                       &not_used);
 
1035
  charset_is_collation_connection= 
 
1036
    !String::needs_conversion(0,charset(),variables.collation_connection,
 
1037
                              &not_used);
 
1038
  charset_is_character_set_filesystem= 
 
1039
    !String::needs_conversion(0, charset(),
 
1040
                              variables.character_set_filesystem, &not_used);
 
1041
}
 
1042
 
 
1043
 
 
1044
/* routings to adding tables to list of changed in transaction tables */
 
1045
 
 
1046
inline static void list_include(CHANGED_TableList** prev,
 
1047
                                CHANGED_TableList* curr,
 
1048
                                CHANGED_TableList* new_table)
 
1049
{
 
1050
  if (new_table)
 
1051
  {
 
1052
    *prev = new_table;
 
1053
    (*prev)->next = curr;
 
1054
  }
 
1055
}
 
1056
 
 
1057
/* add table to list of changed in transaction tables */
 
1058
 
 
1059
void THD::add_changed_table(Table *table)
 
1060
{
 
1061
  assert((options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
 
1062
              table->file->has_transactions());
 
1063
  add_changed_table(table->s->table_cache_key.str,
 
1064
                    (long) table->s->table_cache_key.length);
 
1065
  return;
 
1066
}
 
1067
 
 
1068
 
 
1069
void THD::add_changed_table(const char *key, long key_length)
 
1070
{
 
1071
  CHANGED_TableList **prev_changed = &transaction.changed_tables;
 
1072
  CHANGED_TableList *curr = transaction.changed_tables;
 
1073
 
 
1074
  for (; curr; prev_changed = &(curr->next), curr = curr->next)
 
1075
  {
 
1076
    int cmp =  (long)curr->key_length - (long)key_length;
 
1077
    if (cmp < 0)
 
1078
    {
 
1079
      list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
1080
      return;
 
1081
    }
 
1082
    else if (cmp == 0)
 
1083
    {
 
1084
      cmp = memcmp(curr->key, key, curr->key_length);
 
1085
      if (cmp < 0)
 
1086
      {
 
1087
        list_include(prev_changed, curr, changed_table_dup(key, key_length));
 
1088
        return;
 
1089
      }
 
1090
      else if (cmp == 0)
 
1091
      {
 
1092
        return;
 
1093
      }
 
1094
    }
 
1095
  }
 
1096
  *prev_changed = changed_table_dup(key, key_length);
 
1097
  return;
 
1098
}
 
1099
 
 
1100
 
 
1101
CHANGED_TableList* THD::changed_table_dup(const char *key, long key_length)
 
1102
{
 
1103
  CHANGED_TableList* new_table = 
 
1104
    (CHANGED_TableList*) trans_alloc(ALIGN_SIZE(sizeof(CHANGED_TableList))+
 
1105
                                      key_length + 1);
 
1106
  if (!new_table)
 
1107
  {
 
1108
    my_error(EE_OUTOFMEMORY, MYF(ME_BELL),
 
1109
             ALIGN_SIZE(sizeof(TableList)) + key_length + 1);
 
1110
    killed= KILL_CONNECTION;
 
1111
    return 0;
 
1112
  }
 
1113
 
 
1114
  new_table->key= ((char*)new_table)+ ALIGN_SIZE(sizeof(CHANGED_TableList));
 
1115
  new_table->next = 0;
 
1116
  new_table->key_length = key_length;
 
1117
  ::memcpy(new_table->key, key, key_length);
 
1118
  return new_table;
 
1119
}
 
1120
 
 
1121
 
 
1122
int THD::send_explain_fields(select_result *result)
909
1123
{
910
1124
  List<Item> field_list;
911
1125
  Item *item;
940
1154
  }
941
1155
  item->maybe_null= 1;
942
1156
  field_list.push_back(new Item_empty_string("Extra", 255, cs));
943
 
  return (result->send_fields(field_list));
944
 
}
945
 
 
946
 
void select_result::send_error(uint32_t errcode, const char *err)
 
1157
  return (result->send_fields(field_list,
 
1158
                              Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF));
 
1159
}
 
1160
 
 
1161
#ifdef SIGNAL_WITH_VIO_CLOSE
 
1162
void THD::close_active_vio()
 
1163
{
 
1164
  safe_mutex_assert_owner(&LOCK_delete); 
 
1165
  if (active_vio)
 
1166
  {
 
1167
    vio_close(active_vio);
 
1168
    active_vio = 0;
 
1169
  }
 
1170
  return;
 
1171
}
 
1172
#endif
 
1173
 
 
1174
 
 
1175
struct Item_change_record: public ilink
 
1176
{
 
1177
  Item **place;
 
1178
  Item *old_value;
 
1179
  /* Placement new was hidden by `new' in ilink (TODO: check): */
 
1180
  static void *operator new(size_t size __attribute__((unused)),
 
1181
                            void *mem)
 
1182
    { return mem; }
 
1183
  static void operator delete(void *ptr __attribute__((unused)),
 
1184
                              size_t size __attribute__((unused)))
 
1185
    {}
 
1186
  static void operator delete(void *ptr __attribute__((unused)),
 
1187
                              void *mem __attribute__((unused)))
 
1188
    { /* never called */ }
 
1189
};
 
1190
 
 
1191
 
 
1192
/*
 
1193
  Register an item tree tree transformation, performed by the query
 
1194
  optimizer. We need a pointer to runtime_memroot because it may be !=
 
1195
  thd->mem_root (due to possible set_n_backup_active_arena called for thd).
 
1196
*/
 
1197
 
 
1198
void THD::nocheck_register_item_tree_change(Item **place, Item *old_value,
 
1199
                                            MEM_ROOT *runtime_memroot)
 
1200
{
 
1201
  Item_change_record *change;
 
1202
  /*
 
1203
    Now we use one node per change, which adds some memory overhead,
 
1204
    but still is rather fast as we use alloc_root for allocations.
 
1205
    A list of item tree changes of an average query should be short.
 
1206
  */
 
1207
  void *change_mem= alloc_root(runtime_memroot, sizeof(*change));
 
1208
  if (change_mem == 0)
 
1209
  {
 
1210
    /*
 
1211
      OOM, thd->fatal_error() is called by the error handler of the
 
1212
      memroot. Just return.
 
1213
    */
 
1214
    return;
 
1215
  }
 
1216
  change= new (change_mem) Item_change_record;
 
1217
  change->place= place;
 
1218
  change->old_value= old_value;
 
1219
  change_list.append(change);
 
1220
}
 
1221
 
 
1222
 
 
1223
void THD::rollback_item_tree_changes()
 
1224
{
 
1225
  I_List_iterator<Item_change_record> it(change_list);
 
1226
  Item_change_record *change;
 
1227
 
 
1228
  while ((change= it++))
 
1229
    *change->place= change->old_value;
 
1230
  /* We can forget about changes memory: it's allocated in runtime memroot */
 
1231
  change_list.empty();
 
1232
  return;
 
1233
}
 
1234
 
 
1235
 
 
1236
/**
 
1237
  Check that the endpoint is still available.
 
1238
*/
 
1239
 
 
1240
bool THD::vio_is_connected()
 
1241
{
 
1242
  uint bytes= 0;
 
1243
 
 
1244
  /* End of input is signaled by poll if the socket is aborted. */
 
1245
  if (vio_poll_read(net.vio, 0))
 
1246
    return true;
 
1247
 
 
1248
  /* Socket is aborted if signaled but no data is available. */
 
1249
  if (vio_peek_read(net.vio, &bytes))
 
1250
    return true;
 
1251
 
 
1252
  return bytes ? true : false;
 
1253
}
 
1254
 
 
1255
 
 
1256
/*****************************************************************************
 
1257
** Functions to provide a interface to select results
 
1258
*****************************************************************************/
 
1259
 
 
1260
select_result::select_result()
 
1261
{
 
1262
  thd=current_thd;
 
1263
}
 
1264
 
 
1265
void select_result::send_error(uint errcode,const char *err)
947
1266
{
948
1267
  my_message(errcode, err, MYF(0));
949
1268
}
950
1269
 
 
1270
 
 
1271
void select_result::cleanup()
 
1272
{
 
1273
  /* do nothing */
 
1274
}
 
1275
 
 
1276
bool select_result::check_simple_select() const
 
1277
{
 
1278
  my_error(ER_SP_BAD_CURSOR_QUERY, MYF(0));
 
1279
  return true;
 
1280
}
 
1281
 
 
1282
 
 
1283
static String default_line_term("\n",default_charset_info);
 
1284
static String default_escaped("\\",default_charset_info);
 
1285
static String default_field_term("\t",default_charset_info);
 
1286
 
 
1287
sql_exchange::sql_exchange(char *name, bool flag,
 
1288
                           enum enum_filetype filetype_arg)
 
1289
  :file_name(name), opt_enclosed(0), dumpfile(flag), skip_lines(0)
 
1290
{
 
1291
  filetype= filetype_arg;
 
1292
  field_term= &default_field_term;
 
1293
  enclosed=   line_start= &my_empty_string;
 
1294
  line_term=  &default_line_term;
 
1295
  escaped=    &default_escaped;
 
1296
  cs= NULL;
 
1297
}
 
1298
 
 
1299
bool select_send::send_fields(List<Item> &list, uint flags)
 
1300
{
 
1301
  bool res;
 
1302
  if (!(res= thd->protocol->send_fields(&list, flags)))
 
1303
    is_result_set_started= 1;
 
1304
  return res;
 
1305
}
 
1306
 
 
1307
void select_send::abort()
 
1308
{
 
1309
  return;
 
1310
}
 
1311
 
 
1312
 
 
1313
/** 
 
1314
  Cleanup an instance of this class for re-use
 
1315
  at next execution of a prepared statement/
 
1316
  stored procedure statement.
 
1317
*/
 
1318
 
 
1319
void select_send::cleanup()
 
1320
{
 
1321
  is_result_set_started= false;
 
1322
}
 
1323
 
 
1324
/* Send data to client. Returns 0 if ok */
 
1325
 
 
1326
bool select_send::send_data(List<Item> &items)
 
1327
{
 
1328
  if (unit->offset_limit_cnt)
 
1329
  {                                             // using limit offset,count
 
1330
    unit->offset_limit_cnt--;
 
1331
    return 0;
 
1332
  }
 
1333
 
 
1334
  /*
 
1335
    We may be passing the control from mysqld to the client: release the
 
1336
    InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
 
1337
    by thd
 
1338
  */
 
1339
  ha_release_temporary_latches(thd);
 
1340
 
 
1341
  List_iterator_fast<Item> li(items);
 
1342
  Protocol *protocol= thd->protocol;
 
1343
  char buff[MAX_FIELD_WIDTH];
 
1344
  String buffer(buff, sizeof(buff), &my_charset_bin);
 
1345
 
 
1346
  protocol->prepare_for_resend();
 
1347
  Item *item;
 
1348
  while ((item=li++))
 
1349
  {
 
1350
    if (item->send(protocol, &buffer))
 
1351
    {
 
1352
      protocol->free();                         // Free used buffer
 
1353
      my_message(ER_OUT_OF_RESOURCES, ER(ER_OUT_OF_RESOURCES), MYF(0));
 
1354
      break;
 
1355
    }
 
1356
  }
 
1357
  thd->sent_row_count++;
 
1358
  if (thd->is_error())
 
1359
  {
 
1360
    protocol->remove_last_row();
 
1361
    return(1);
 
1362
  }
 
1363
  if (thd->vio_ok())
 
1364
    return(protocol->write());
 
1365
  return(0);
 
1366
}
 
1367
 
 
1368
bool select_send::send_eof()
 
1369
{
 
1370
  /* 
 
1371
    We may be passing the control from mysqld to the client: release the
 
1372
    InnoDB adaptive hash S-latch to avoid thread deadlocks if it was reserved
 
1373
    by thd 
 
1374
  */
 
1375
  ha_release_temporary_latches(thd);
 
1376
 
 
1377
  /* Unlock tables before sending packet to gain some speed */
 
1378
  if (thd->lock)
 
1379
  {
 
1380
    mysql_unlock_tables(thd, thd->lock);
 
1381
    thd->lock=0;
 
1382
  }
 
1383
  ::my_eof(thd);
 
1384
  is_result_set_started= 0;
 
1385
  return false;
 
1386
}
 
1387
 
 
1388
 
951
1389
/************************************************************************
952
1390
  Handling writing to file
953
1391
************************************************************************/
954
1392
 
955
 
void select_to_file::send_error(uint32_t errcode,const char *err)
 
1393
void select_to_file::send_error(uint errcode,const char *err)
956
1394
{
957
1395
  my_message(errcode, err, MYF(0));
958
1396
  if (file > 0)
959
1397
  {
960
 
    (void) end_io_cache(cache);
961
 
    (void) internal::my_close(file, MYF(0));
962
 
    (void) internal::my_delete(path, MYF(0));           // Delete file on error
 
1398
    (void) end_io_cache(&cache);
 
1399
    (void) my_close(file,MYF(0));
 
1400
    (void) my_delete(path,MYF(0));              // Delete file on error
963
1401
    file= -1;
964
1402
  }
965
1403
}
967
1405
 
968
1406
bool select_to_file::send_eof()
969
1407
{
970
 
  int error= test(end_io_cache(cache));
971
 
  if (internal::my_close(file, MYF(MY_WME)))
 
1408
  int error= test(end_io_cache(&cache));
 
1409
  if (my_close(file,MYF(MY_WME)))
972
1410
    error= 1;
973
1411
  if (!error)
974
1412
  {
977
1415
      function, SELECT INTO has to have an own SQLCOM.
978
1416
      TODO: split from SQLCOM_SELECT
979
1417
    */
980
 
    session->my_ok(row_count);
 
1418
    ::my_ok(thd,row_count);
981
1419
  }
982
1420
  file= -1;
983
1421
  return error;
989
1427
  /* In case of error send_eof() may be not called: close the file here. */
990
1428
  if (file >= 0)
991
1429
  {
992
 
    (void) end_io_cache(cache);
993
 
    (void) internal::my_close(file, MYF(0));
 
1430
    (void) end_io_cache(&cache);
 
1431
    (void) my_close(file,MYF(0));
994
1432
    file= -1;
995
1433
  }
996
1434
  path[0]= '\0';
997
1435
  row_count= 0;
998
1436
}
999
1437
 
1000
 
select_to_file::select_to_file(file_exchange *ex)
1001
 
  : exchange(ex),
1002
 
    file(-1),
1003
 
    cache(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
1004
 
    row_count(0L)
1005
 
{
1006
 
  path[0]=0;
1007
 
}
1008
1438
 
1009
1439
select_to_file::~select_to_file()
1010
1440
{
1011
 
  cleanup();
 
1441
  if (file >= 0)
 
1442
  {                                     // This only happens in case of error
 
1443
    (void) end_io_cache(&cache);
 
1444
    (void) my_close(file,MYF(0));
 
1445
    file= -1;
 
1446
  }
1012
1447
}
1013
1448
 
1014
1449
/***************************************************************************
1017
1452
 
1018
1453
select_export::~select_export()
1019
1454
{
1020
 
  session->sent_row_count=row_count;
 
1455
  thd->sent_row_count=row_count;
1021
1456
}
1022
1457
 
1023
1458
 
1026
1461
 
1027
1462
  SYNOPSIS
1028
1463
    create_file()
1029
 
    session                     Thread handle
 
1464
    thd                 Thread handle
1030
1465
    path                File name
1031
1466
    exchange            Excange class
1032
1467
    cache               IO cache
1037
1472
*/
1038
1473
 
1039
1474
 
1040
 
static int create_file(Session *session, char *path, file_exchange *exchange, internal::IO_CACHE *cache)
 
1475
static File create_file(THD *thd, char *path, sql_exchange *exchange,
 
1476
                        IO_CACHE *cache)
1041
1477
{
1042
 
  int file;
1043
 
  uint32_t option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
 
1478
  File file;
 
1479
  uint option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
1044
1480
 
1045
1481
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
1046
1482
  option|= MY_REPLACE_DIR;                      // Force use of db directory
1047
1483
#endif
1048
1484
 
1049
 
  if (!internal::dirname_length(exchange->file_name))
 
1485
  if (!dirname_length(exchange->file_name))
1050
1486
  {
1051
 
    strcpy(path, drizzle_real_data_home);
1052
 
    if (! session->db.empty())
1053
 
      strncat(path, session->db.c_str(), FN_REFLEN-strlen(drizzle_real_data_home)-1);
1054
 
    (void) internal::fn_format(path, exchange->file_name, path, "", option);
 
1487
    strxnmov(path, FN_REFLEN-1, mysql_real_data_home, thd->db ? thd->db : "",
 
1488
             NullS);
 
1489
    (void) fn_format(path, exchange->file_name, path, "", option);
1055
1490
  }
1056
1491
  else
1057
 
    (void) internal::fn_format(path, exchange->file_name, drizzle_real_data_home, "", option);
 
1492
    (void) fn_format(path, exchange->file_name, mysql_real_data_home, "", option);
1058
1493
 
1059
1494
  if (opt_secure_file_priv &&
1060
1495
      strncmp(opt_secure_file_priv, path, strlen(opt_secure_file_priv)))
1070
1505
    return -1;
1071
1506
  }
1072
1507
  /* Create the file world readable */
1073
 
  if ((file= internal::my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
 
1508
  if ((file= my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1074
1509
    return file;
 
1510
#ifdef HAVE_FCHMOD
1075
1511
  (void) fchmod(file, 0666);                    // Because of umask()
1076
 
  if (init_io_cache(cache, file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
 
1512
#else
 
1513
  (void) chmod(path, 0666);
 
1514
#endif
 
1515
  if (init_io_cache(cache, file, 0L, WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1077
1516
  {
1078
 
    internal::my_close(file, MYF(0));
1079
 
    internal::my_delete(path, MYF(0));  // Delete file on error, it was just created
 
1517
    my_close(file, MYF(0));
 
1518
    my_delete(path, MYF(0));  // Delete file on error, it was just created 
1080
1519
    return -1;
1081
1520
  }
1082
1521
  return file;
1084
1523
 
1085
1524
 
1086
1525
int
1087
 
select_export::prepare(List<Item> &list, Select_Lex_Unit *u)
 
1526
select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
1088
1527
{
1089
1528
  bool blob_flag=0;
1090
1529
  bool string_results= false, non_string_results= false;
1091
1530
  unit= u;
1092
 
  if ((uint32_t) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
1093
 
    strncpy(path,exchange->file_name,FN_REFLEN-1);
 
1531
  if ((uint) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
 
1532
    strmake(path,exchange->file_name,FN_REFLEN-1);
1094
1533
 
 
1534
  if ((file= create_file(thd, path, exchange, &cache)) < 0)
 
1535
    return 1;
1095
1536
  /* Check if there is any blobs in data */
1096
1537
  {
1097
1538
    List_iterator_fast<Item> li(list);
1111
1552
  }
1112
1553
  field_term_length=exchange->field_term->length();
1113
1554
  field_term_char= field_term_length ?
1114
 
                   (int) (unsigned char) (*exchange->field_term)[0] : INT_MAX;
 
1555
                   (int) (uchar) (*exchange->field_term)[0] : INT_MAX;
1115
1556
  if (!exchange->line_term->length())
1116
1557
    exchange->line_term=exchange->field_term;   // Use this if it exists
1117
1558
  field_sep_char= (exchange->enclosed->length() ?
1118
 
                  (int) (unsigned char) (*exchange->enclosed)[0] : field_term_char);
 
1559
                  (int) (uchar) (*exchange->enclosed)[0] : field_term_char);
1119
1560
  escape_char=  (exchange->escaped->length() ?
1120
 
                (int) (unsigned char) (*exchange->escaped)[0] : -1);
 
1561
                (int) (uchar) (*exchange->escaped)[0] : -1);
1121
1562
  is_ambiguous_field_sep= test(strchr(ESCAPE_CHARS, field_sep_char));
1122
1563
  is_unsafe_field_sep= test(strchr(NUMERIC_CHARS, field_sep_char));
1123
1564
  line_sep_char= (exchange->line_term->length() ?
1124
 
                 (int) (unsigned char) (*exchange->line_term)[0] : INT_MAX);
 
1565
                 (int) (uchar) (*exchange->line_term)[0] : INT_MAX);
1125
1566
  if (!field_term_length)
1126
1567
    exchange->opt_enclosed=0;
1127
1568
  if (!exchange->enclosed->length())
1133
1574
      (exchange->opt_enclosed && non_string_results &&
1134
1575
       field_term_length && strchr(NUMERIC_CHARS, field_term_char)))
1135
1576
  {
1136
 
    my_error(ER_AMBIGUOUS_FIELD_TERM, MYF(0));
1137
 
    return 1;
 
1577
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1578
                 ER_AMBIGUOUS_FIELD_TERM, ER(ER_AMBIGUOUS_FIELD_TERM));
 
1579
    is_ambiguous_field_term= true;
1138
1580
  }
1139
 
 
1140
 
  if ((file= create_file(session, path, exchange, cache)) < 0)
1141
 
    return 1;
 
1581
  else
 
1582
    is_ambiguous_field_term= false;
1142
1583
 
1143
1584
  return 0;
1144
1585
}
1145
1586
 
 
1587
 
 
1588
#define NEED_ESCAPING(x) ((int) (uchar) (x) == escape_char    || \
 
1589
                          (enclosed ? (int) (uchar) (x) == field_sep_char      \
 
1590
                                    : (int) (uchar) (x) == field_term_char) || \
 
1591
                          (int) (uchar) (x) == line_sep_char  || \
 
1592
                          !(x))
 
1593
 
1146
1594
bool select_export::send_data(List<Item> &items)
1147
1595
{
1148
1596
  char buff[MAX_FIELD_WIDTH],null_buff[2],space[MAX_FIELD_WIDTH];
1157
1605
  }
1158
1606
  row_count++;
1159
1607
  Item *item;
1160
 
  uint32_t used_length=0,items_left=items.elements;
 
1608
  uint used_length=0,items_left=items.elements;
1161
1609
  List_iterator_fast<Item> li(items);
1162
1610
 
1163
 
  if (my_b_write(cache,(unsigned char*) exchange->line_start->ptr(),
1164
 
                 exchange->line_start->length()))
 
1611
  if (my_b_write(&cache,(uchar*) exchange->line_start->ptr(),
 
1612
                 exchange->line_start->length()))
1165
1613
    goto err;
1166
1614
  while ((item=li++))
1167
1615
  {
1171
1619
    res=item->str_result(&tmp);
1172
1620
    if (res && enclosed)
1173
1621
    {
1174
 
      if (my_b_write(cache,(unsigned char*) exchange->enclosed->ptr(),
1175
 
                     exchange->enclosed->length()))
1176
 
        goto err;
 
1622
      if (my_b_write(&cache,(uchar*) exchange->enclosed->ptr(),
 
1623
                     exchange->enclosed->length()))
 
1624
        goto err;
1177
1625
    }
1178
1626
    if (!res)
1179
1627
    {                                           // NULL
1180
1628
      if (!fixed_row_size)
1181
1629
      {
1182
 
        if (escape_char != -1)                  // Use \N syntax
1183
 
        {
1184
 
          null_buff[0]=escape_char;
1185
 
          null_buff[1]='N';
1186
 
          if (my_b_write(cache,(unsigned char*) null_buff,2))
1187
 
            goto err;
1188
 
        }
1189
 
        else if (my_b_write(cache,(unsigned char*) "NULL",4))
1190
 
          goto err;
 
1630
        if (escape_char != -1)                  // Use \N syntax
 
1631
        {
 
1632
          null_buff[0]=escape_char;
 
1633
          null_buff[1]='N';
 
1634
          if (my_b_write(&cache,(uchar*) null_buff,2))
 
1635
            goto err;
 
1636
        }
 
1637
        else if (my_b_write(&cache,(uchar*) "NULL",4))
 
1638
          goto err;
1191
1639
      }
1192
1640
      else
1193
1641
      {
1194
 
        used_length=0;                          // Fill with space
 
1642
        used_length=0;                          // Fill with space
1195
1643
      }
1196
1644
    }
1197
1645
    else
1198
1646
    {
1199
1647
      if (fixed_row_size)
1200
 
        used_length= min(res->length(),item->max_length);
 
1648
        used_length=min(res->length(),item->max_length);
1201
1649
      else
1202
 
        used_length= res->length();
1203
 
 
 
1650
        used_length=res->length();
1204
1651
      if ((result_type == STRING_RESULT || is_unsafe_field_sep) &&
1205
 
          escape_char != -1)
 
1652
           escape_char != -1)
1206
1653
      {
1207
1654
        char *pos, *start, *end;
1208
1655
        const CHARSET_INFO * const res_charset= res->charset();
1209
 
        const CHARSET_INFO * const character_set_client= default_charset_info;
1210
 
 
 
1656
        const CHARSET_INFO * const character_set_client= thd->variables.
 
1657
                                                            character_set_client;
1211
1658
        bool check_second_byte= (res_charset == &my_charset_bin) &&
1212
 
          character_set_client->
1213
 
          escape_with_backslash_is_dangerous;
 
1659
                                 character_set_client->
 
1660
                                 escape_with_backslash_is_dangerous;
1214
1661
        assert(character_set_client->mbmaxlen == 2 ||
1215
 
               !character_set_client->escape_with_backslash_is_dangerous);
1216
 
        for (start=pos=(char*) res->ptr(),end=pos+used_length ;
1217
 
             pos != end ;
1218
 
             pos++)
1219
 
        {
1220
 
          if (use_mb(res_charset))
1221
 
          {
1222
 
            int l;
1223
 
            if ((l=my_ismbchar(res_charset, pos, end)))
1224
 
            {
1225
 
              pos += l-1;
1226
 
              continue;
1227
 
            }
1228
 
          }
 
1662
                    !character_set_client->escape_with_backslash_is_dangerous);
 
1663
        for (start=pos=(char*) res->ptr(),end=pos+used_length ;
 
1664
             pos != end ;
 
1665
             pos++)
 
1666
        {
 
1667
#ifdef USE_MB
 
1668
          if (use_mb(res_charset))
 
1669
          {
 
1670
            int l;
 
1671
            if ((l=my_ismbchar(res_charset, pos, end)))
 
1672
            {
 
1673
              pos += l-1;
 
1674
              continue;
 
1675
            }
 
1676
          }
 
1677
#endif
1229
1678
 
1230
1679
          /*
1231
1680
            Special case when dumping BINARY/VARBINARY/BLOB values
1232
1681
            for the clients with character sets big5, cp932, gbk and sjis,
1233
1682
            which can have the escape character (0x5C "\" by default)
1234
1683
            as the second byte of a multi-byte sequence.
1235
 
 
 
1684
            
1236
1685
            If
1237
1686
            - pos[0] is a valid multi-byte head (e.g 0xEE) and
1238
1687
            - pos[1] is 0x00, which will be escaped as "\0",
1239
 
 
 
1688
            
1240
1689
            then we'll get "0xEE + 0x5C + 0x30" in the output file.
1241
 
 
 
1690
            
1242
1691
            If this file is later loaded using this sequence of commands:
1243
 
 
 
1692
            
1244
1693
            mysql> create table t1 (a varchar(128)) character set big5;
1245
1694
            mysql> LOAD DATA INFILE 'dump.txt' INTO Table t1;
1246
 
 
 
1695
            
1247
1696
            then 0x5C will be misinterpreted as the second byte
1248
1697
            of a multi-byte character "0xEE + 0x5C", instead of
1249
1698
            escape character for 0x00.
1250
 
 
 
1699
            
1251
1700
            To avoid this confusion, we'll escape the multi-byte
1252
1701
            head character too, so the sequence "0xEE + 0x00" will be
1253
1702
            dumped as "0x5C + 0xEE + 0x5C + 0x30".
1254
 
 
 
1703
            
1255
1704
            Note, in the condition below we only check if
1256
1705
            mbcharlen is equal to 2, because there are no
1257
1706
            character sets with mbmaxlen longer than 2
1259
1708
            assert before the loop makes that sure.
1260
1709
          */
1261
1710
 
1262
 
          if ((needs_escaping(*pos, enclosed) ||
 
1711
          if ((NEED_ESCAPING(*pos) ||
1263
1712
               (check_second_byte &&
1264
 
                my_mbcharlen(character_set_client, (unsigned char) *pos) == 2 &&
 
1713
                my_mbcharlen(character_set_client, (uchar) *pos) == 2 &&
1265
1714
                pos + 1 < end &&
1266
 
                needs_escaping(pos[1], enclosed))) &&
 
1715
                NEED_ESCAPING(pos[1]))) &&
1267
1716
              /*
1268
 
                Don't escape field_term_char by doubling - doubling is only
1269
 
                valid for ENCLOSED BY characters:
 
1717
               Don't escape field_term_char by doubling - doubling is only
 
1718
               valid for ENCLOSED BY characters:
1270
1719
              */
1271
1720
              (enclosed || !is_ambiguous_field_term ||
1272
 
               (int) (unsigned char) *pos != field_term_char))
 
1721
               (int) (uchar) *pos != field_term_char))
1273
1722
          {
1274
 
            char tmp_buff[2];
1275
 
            tmp_buff[0]= ((int) (unsigned char) *pos == field_sep_char &&
 
1723
            char tmp_buff[2];
 
1724
            tmp_buff[0]= ((int) (uchar) *pos == field_sep_char &&
1276
1725
                          is_ambiguous_field_sep) ?
1277
 
              field_sep_char : escape_char;
1278
 
            tmp_buff[1]= *pos ? *pos : '0';
1279
 
            if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1280
 
                my_b_write(cache,(unsigned char*) tmp_buff,2))
1281
 
              goto err;
1282
 
            start=pos+1;
1283
 
          }
1284
 
        }
1285
 
        if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1286
 
          goto err;
 
1726
                          field_sep_char : escape_char;
 
1727
            tmp_buff[1]= *pos ? *pos : '0';
 
1728
            if (my_b_write(&cache,(uchar*) start,(uint) (pos-start)) ||
 
1729
                my_b_write(&cache,(uchar*) tmp_buff,2))
 
1730
              goto err;
 
1731
            start=pos+1;
 
1732
          }
 
1733
        }
 
1734
        if (my_b_write(&cache,(uchar*) start,(uint) (pos-start)))
 
1735
          goto err;
1287
1736
      }
1288
 
      else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1289
 
        goto err;
 
1737
      else if (my_b_write(&cache,(uchar*) res->ptr(),used_length))
 
1738
        goto err;
1290
1739
    }
1291
1740
    if (fixed_row_size)
1292
1741
    {                                           // Fill with space
1293
1742
      if (item->max_length > used_length)
1294
1743
      {
1295
 
        /* QQ:  Fix by adding a my_b_fill() function */
1296
 
        if (!space_inited)
1297
 
        {
1298
 
          space_inited=1;
1299
 
          memset(space, ' ', sizeof(space));
1300
 
        }
1301
 
        uint32_t length=item->max_length-used_length;
1302
 
        for (; length > sizeof(space) ; length-=sizeof(space))
1303
 
        {
1304
 
          if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1305
 
            goto err;
1306
 
        }
1307
 
        if (my_b_write(cache,(unsigned char*) space,length))
1308
 
          goto err;
 
1744
        /* QQ:  Fix by adding a my_b_fill() function */
 
1745
        if (!space_inited)
 
1746
        {
 
1747
          space_inited=1;
 
1748
          memset(space, ' ', sizeof(space));
 
1749
        }
 
1750
        uint length=item->max_length-used_length;
 
1751
        for (; length > sizeof(space) ; length-=sizeof(space))
 
1752
        {
 
1753
          if (my_b_write(&cache,(uchar*) space,sizeof(space)))
 
1754
            goto err;
 
1755
        }
 
1756
        if (my_b_write(&cache,(uchar*) space,length))
 
1757
          goto err;
1309
1758
      }
1310
1759
    }
1311
1760
    if (res && enclosed)
1312
1761
    {
1313
 
      if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
 
1762
      if (my_b_write(&cache, (uchar*) exchange->enclosed->ptr(),
1314
1763
                     exchange->enclosed->length()))
1315
1764
        goto err;
1316
1765
    }
1317
1766
    if (--items_left)
1318
1767
    {
1319
 
      if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
 
1768
      if (my_b_write(&cache, (uchar*) exchange->field_term->ptr(),
1320
1769
                     field_term_length))
1321
1770
        goto err;
1322
1771
    }
1323
1772
  }
1324
 
  if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1325
 
                 exchange->line_term->length()))
 
1773
  if (my_b_write(&cache,(uchar*) exchange->line_term->ptr(),
 
1774
                 exchange->line_term->length()))
1326
1775
    goto err;
1327
1776
  return(0);
1328
1777
err:
1336
1785
 
1337
1786
 
1338
1787
int
1339
 
select_dump::prepare(List<Item> &, Select_Lex_Unit *u)
 
1788
select_dump::prepare(List<Item> &list __attribute__((unused)),
 
1789
                     SELECT_LEX_UNIT *u)
1340
1790
{
1341
1791
  unit= u;
1342
 
  return (int) ((file= create_file(session, path, exchange, cache)) < 0);
 
1792
  return (int) ((file= create_file(thd, path, exchange, &cache)) < 0);
1343
1793
}
1344
1794
 
1345
1795
 
1356
1806
    unit->offset_limit_cnt--;
1357
1807
    return(0);
1358
1808
  }
1359
 
  if (row_count++ > 1)
 
1809
  if (row_count++ > 1) 
1360
1810
  {
1361
1811
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
1362
1812
    goto err;
1366
1816
    res=item->str_result(&tmp);
1367
1817
    if (!res)                                   // If NULL
1368
1818
    {
1369
 
      if (my_b_write(cache,(unsigned char*) "",1))
 
1819
      if (my_b_write(&cache,(uchar*) "",1))
1370
1820
        goto err;
1371
1821
    }
1372
 
    else if (my_b_write(cache,(unsigned char*) res->ptr(),res->length()))
 
1822
    else if (my_b_write(&cache,(uchar*) res->ptr(),res->length()))
1373
1823
    {
1374
 
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, errno);
 
1824
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, my_errno);
1375
1825
      goto err;
1376
1826
    }
1377
1827
  }
1402
1852
  }
1403
1853
  List_iterator_fast<Item> li(items);
1404
1854
  Item *val_item;
1405
 
  for (uint32_t i= 0; (val_item= li++); i++)
 
1855
  for (uint i= 0; (val_item= li++); i++)
1406
1856
    it->store(i, val_item);
1407
1857
  it->assigned(1);
1408
1858
  return(0);
1412
1862
void select_max_min_finder_subselect::cleanup()
1413
1863
{
1414
1864
  cache= 0;
 
1865
  return;
1415
1866
}
1416
1867
 
1417
1868
 
1518
1969
     sortcmp(val1, val2, cache->collation.collation) < 0);
1519
1970
}
1520
1971
 
1521
 
bool select_exists_subselect::send_data(List<Item> &)
 
1972
bool select_exists_subselect::send_data(List<Item> &items __attribute__((unused)))
1522
1973
{
1523
1974
  Item_exists_subselect *it= (Item_exists_subselect *)item;
1524
1975
  if (unit->offset_limit_cnt)
1531
1982
  return(0);
1532
1983
}
1533
1984
 
 
1985
 
 
1986
/***************************************************************************
 
1987
  Dump of select to variables
 
1988
***************************************************************************/
 
1989
 
 
1990
int select_dumpvar::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
 
1991
{
 
1992
  unit= u;
 
1993
  
 
1994
  if (var_list.elements != list.elements)
 
1995
  {
 
1996
    my_message(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT,
 
1997
               ER(ER_WRONG_NUMBER_OF_COLUMNS_IN_SELECT), MYF(0));
 
1998
    return 1;
 
1999
  }               
 
2000
  return 0;
 
2001
}
 
2002
 
 
2003
 
 
2004
bool select_dumpvar::check_simple_select() const
 
2005
{
 
2006
  my_error(ER_SP_BAD_CURSOR_SELECT, MYF(0));
 
2007
  return true;
 
2008
}
 
2009
 
 
2010
 
 
2011
void select_dumpvar::cleanup()
 
2012
{
 
2013
  row_count= 0;
 
2014
}
 
2015
 
 
2016
 
 
2017
void Query_arena::free_items()
 
2018
{
 
2019
  Item *next;
 
2020
  /* This works because items are allocated with sql_alloc() */
 
2021
  for (; free_list; free_list= next)
 
2022
  {
 
2023
    next= free_list->next;
 
2024
    free_list->delete_self();
 
2025
  }
 
2026
  /* Postcondition: free_list is 0 */
 
2027
  return;
 
2028
}
 
2029
 
 
2030
 
 
2031
void Query_arena::set_query_arena(Query_arena *set)
 
2032
{
 
2033
  mem_root= set->mem_root;
 
2034
  free_list= set->free_list;
 
2035
  state= set->state;
 
2036
}
 
2037
 
 
2038
 
 
2039
void Query_arena::cleanup_stmt()
 
2040
{
 
2041
  assert("not implemented");
 
2042
}
 
2043
 
1534
2044
/*
1535
 
  Don't free mem_root, as mem_root is freed in the end of dispatch_command
1536
 
  (once for any command).
 
2045
  Statement functions
1537
2046
*/
1538
 
void Session::end_statement()
 
2047
 
 
2048
Statement::Statement(LEX *lex_arg, MEM_ROOT *mem_root_arg,
 
2049
                     enum enum_state state_arg, ulong id_arg)
 
2050
  :Query_arena(mem_root_arg, state_arg),
 
2051
  id(id_arg),
 
2052
  mark_used_columns(MARK_COLUMNS_READ),
 
2053
  lex(lex_arg),
 
2054
  query(0),
 
2055
  query_length(0),
 
2056
  db(NULL),
 
2057
  db_length(0)
 
2058
{
 
2059
  name.str= NULL;
 
2060
}
 
2061
 
 
2062
 
 
2063
void Statement::set_statement(Statement *stmt)
 
2064
{
 
2065
  id=             stmt->id;
 
2066
  mark_used_columns=   stmt->mark_used_columns;
 
2067
  lex=            stmt->lex;
 
2068
  query=          stmt->query;
 
2069
  query_length=   stmt->query_length;
 
2070
}
 
2071
 
 
2072
 
 
2073
void
 
2074
Statement::set_n_backup_statement(Statement *stmt, Statement *backup)
 
2075
{
 
2076
  backup->set_statement(this);
 
2077
  set_statement(stmt);
 
2078
  return;
 
2079
}
 
2080
 
 
2081
 
 
2082
void Statement::restore_backup_statement(Statement *stmt, Statement *backup)
 
2083
{
 
2084
  stmt->set_statement(this);
 
2085
  set_statement(backup);
 
2086
  return;
 
2087
}
 
2088
 
 
2089
 
 
2090
void THD::end_statement()
1539
2091
{
1540
2092
  /* Cleanup SQL processing state to reuse this statement in next query. */
1541
2093
  lex_end(lex);
1542
 
}
1543
 
 
1544
 
bool Session::copy_db_to(char **p_db, size_t *p_db_length)
1545
 
{
1546
 
  if (db.empty())
 
2094
  delete lex->result;
 
2095
  lex->result= 0;
 
2096
  /* Note that free_list is freed in cleanup_after_query() */
 
2097
 
 
2098
  /*
 
2099
    Don't free mem_root, as mem_root is freed in the end of dispatch_command
 
2100
    (once for any command).
 
2101
  */
 
2102
}
 
2103
 
 
2104
 
 
2105
void THD::set_n_backup_active_arena(Query_arena *set, Query_arena *backup)
 
2106
{
 
2107
  assert(backup->is_backup_arena == false);
 
2108
 
 
2109
  backup->set_query_arena(this);
 
2110
  set_query_arena(set);
 
2111
  backup->is_backup_arena= true;
 
2112
  return;
 
2113
}
 
2114
 
 
2115
 
 
2116
void THD::restore_active_arena(Query_arena *set, Query_arena *backup)
 
2117
{
 
2118
  assert(backup->is_backup_arena);
 
2119
  set->set_query_arena(this);
 
2120
  set_query_arena(backup);
 
2121
  backup->is_backup_arena= false;
 
2122
  return;
 
2123
}
 
2124
 
 
2125
 
 
2126
bool THD::copy_db_to(char **p_db, size_t *p_db_length)
 
2127
{
 
2128
  if (db == NULL)
1547
2129
  {
1548
2130
    my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1549
2131
    return true;
1550
2132
  }
1551
 
  *p_db= strmake(db.c_str(), db.length());
1552
 
  *p_db_length= db.length();
 
2133
  *p_db= strmake(db, db_length);
 
2134
  *p_db_length= db_length;
1553
2135
  return false;
1554
2136
}
1555
2137
 
 
2138
 
 
2139
bool select_dumpvar::send_data(List<Item> &items)
 
2140
{
 
2141
  List_iterator_fast<my_var> var_li(var_list);
 
2142
  List_iterator<Item> it(items);
 
2143
  Item *item;
 
2144
  my_var *mv;
 
2145
 
 
2146
  if (unit->offset_limit_cnt)
 
2147
  {                                             // using limit offset,count
 
2148
    unit->offset_limit_cnt--;
 
2149
    return(0);
 
2150
  }
 
2151
  if (row_count++) 
 
2152
  {
 
2153
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
 
2154
    return(1);
 
2155
  }
 
2156
  while ((mv= var_li++) && (item= it++))
 
2157
  {
 
2158
    if (mv->local == 0)
 
2159
    {
 
2160
      Item_func_set_user_var *suv= new Item_func_set_user_var(mv->s, item);
 
2161
      suv->fix_fields(thd, 0);
 
2162
      suv->check(0);
 
2163
      suv->update();
 
2164
    }
 
2165
  }
 
2166
  return(thd->is_error());
 
2167
}
 
2168
 
 
2169
bool select_dumpvar::send_eof()
 
2170
{
 
2171
  if (! row_count)
 
2172
    push_warning(thd, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
2173
                 ER_SP_FETCH_NO_DATA, ER(ER_SP_FETCH_NO_DATA));
 
2174
  /*
 
2175
    In order to remember the value of affected rows for ROW_COUNT()
 
2176
    function, SELECT INTO has to have an own SQLCOM.
 
2177
    TODO: split from SQLCOM_SELECT
 
2178
  */
 
2179
  ::my_ok(thd,row_count);
 
2180
  return 0;
 
2181
}
 
2182
 
1556
2183
/****************************************************************************
1557
 
  Tmp_Table_Param
 
2184
  TMP_TABLE_PARAM
1558
2185
****************************************************************************/
1559
2186
 
1560
 
void Tmp_Table_Param::init()
 
2187
void TMP_TABLE_PARAM::init()
1561
2188
{
1562
2189
  field_count= sum_func_count= func_count= hidden_field_count= 0;
1563
2190
  group_parts= group_length= group_null_parts= 0;
1564
2191
  quick_group= 1;
1565
2192
  table_charset= 0;
1566
2193
  precomputed_group_by= 0;
 
2194
  bit_fields_as_long= 0;
 
2195
  return;
1567
2196
}
1568
2197
 
1569
 
void Tmp_Table_Param::cleanup(void)
 
2198
 
 
2199
void thd_increment_bytes_sent(ulong length)
1570
2200
{
1571
 
  /* Fix for Intel compiler */
1572
 
  if (copy_field)
1573
 
  {
1574
 
    delete [] copy_field;
1575
 
    save_copy_field= copy_field= 0;
 
2201
  THD *thd=current_thd;
 
2202
  if (likely(thd != 0))
 
2203
  { /* current_thd==0 when close_connection() calls net_send_error() */
 
2204
    thd->status_var.bytes_sent+= length;
1576
2205
  }
1577
2206
}
1578
2207
 
1579
 
void Session::send_kill_message() const
 
2208
 
 
2209
void thd_increment_bytes_received(ulong length)
 
2210
{
 
2211
  current_thd->status_var.bytes_received+= length;
 
2212
}
 
2213
 
 
2214
 
 
2215
void thd_increment_net_big_packet_count(ulong length)
 
2216
{
 
2217
  current_thd->status_var.net_big_packet_count+= length;
 
2218
}
 
2219
 
 
2220
void THD::send_kill_message() const
1580
2221
{
1581
2222
  int err= killed_errno();
1582
2223
  if (err)
1583
2224
    my_message(err, ER(err), MYF(0));
1584
2225
}
1585
2226
 
1586
 
void Session::set_status_var_init()
 
2227
void THD::set_status_var_init()
1587
2228
{
1588
2229
  memset(&status_var, 0, sizeof(status_var));
1589
2230
}
1590
2231
 
1591
2232
 
 
2233
void Security_context::init()
 
2234
{
 
2235
  user= ip= 0;
 
2236
}
 
2237
 
 
2238
 
 
2239
void Security_context::destroy()
 
2240
{
 
2241
  // If not pointer to constant
 
2242
  safeFree(user);
 
2243
  safeFree(ip);
 
2244
}
 
2245
 
 
2246
 
 
2247
void Security_context::skip_grants()
 
2248
{
 
2249
  /* privileges for the user are unknown everything is allowed */
 
2250
}
 
2251
 
 
2252
 
1592
2253
/****************************************************************************
1593
2254
  Handling of open and locked tables states.
1594
2255
 
1597
2258
  access to mysql.proc table to find definitions of stored routines.
1598
2259
****************************************************************************/
1599
2260
 
1600
 
void Session::reset_n_backup_open_tables_state(Open_tables_state *backup)
 
2261
void THD::reset_n_backup_open_tables_state(Open_tables_state *backup)
1601
2262
{
1602
2263
  backup->set_open_tables_state(this);
1603
2264
  reset_open_tables_state();
1604
 
  backups_available= false;
 
2265
  state_flags|= Open_tables_state::BACKUPS_AVAIL;
 
2266
  return;
1605
2267
}
1606
2268
 
1607
2269
 
1608
 
void Session::restore_backup_open_tables_state(Open_tables_state *backup)
 
2270
void THD::restore_backup_open_tables_state(Open_tables_state *backup)
1609
2271
{
1610
2272
  /*
1611
2273
    Before we will throw away current open tables state we want
1612
2274
    to be sure that it was properly cleaned up.
1613
2275
  */
1614
2276
  assert(open_tables == 0 && temporary_tables == 0 &&
1615
 
              derived_tables == 0 &&
1616
 
              lock == 0);
 
2277
              handler_tables == 0 && derived_tables == 0 &&
 
2278
              lock == 0 && locked_tables == 0);
1617
2279
  set_open_tables_state(backup);
1618
 
}
1619
 
 
1620
 
bool Session::set_db(const std::string &new_db)
1621
 
{
1622
 
  /* Do not reallocate memory if current chunk is big enough. */
1623
 
  if (new_db.length())
1624
 
    db= new_db;
1625
 
  else
1626
 
    db.clear();
1627
 
 
1628
 
  return false;
1629
 
}
1630
 
 
1631
 
 
1632
 
 
 
2280
  return;
 
2281
}
1633
2282
 
1634
2283
/**
1635
2284
  Check the killed state of a user thread
1636
 
  @param session  user thread
 
2285
  @param thd  user thread
1637
2286
  @retval 0 the user thread is active
1638
2287
  @retval 1 the user thread has been killed
1639
2288
*/
1640
 
extern "C" int session_killed(const Session *session)
 
2289
extern "C" int thd_killed(const DRIZZLE_THD thd)
1641
2290
{
1642
 
  return(session->killed);
 
2291
  return(thd->killed);
1643
2292
}
1644
2293
 
1645
2294
/**
1646
 
  Return the session id of a user session
1647
 
  @param pointer to Session object
1648
 
  @return session's id
 
2295
  Return the thread id of a user thread
 
2296
  @param thd user thread
 
2297
  @return thread id
1649
2298
*/
1650
 
extern "C" unsigned long session_get_thread_id(const Session *session)
1651
 
{
1652
 
  return (unsigned long) session->getSessionId();
1653
 
}
1654
 
 
1655
 
 
1656
 
const struct charset_info_st *session_charset(Session *session)
1657
 
{
1658
 
  return(session->charset());
1659
 
}
1660
 
 
1661
 
int session_non_transactional_update(const Session *session)
1662
 
{
1663
 
  return(session->transaction.all.hasModifiedNonTransData());
1664
 
}
1665
 
 
1666
 
void session_mark_transaction_to_rollback(Session *session, bool all)
1667
 
{
1668
 
  mark_transaction_to_rollback(session, all);
1669
 
}
 
2299
extern "C" unsigned long thd_get_thread_id(const DRIZZLE_THD thd)
 
2300
{
 
2301
  return((unsigned long)thd->thread_id);
 
2302
}
 
2303
 
 
2304
 
 
2305
#ifdef INNODB_COMPATIBILITY_HOOKS
 
2306
extern "C" const struct charset_info_st *thd_charset(DRIZZLE_THD thd)
 
2307
{
 
2308
  return(thd->charset());
 
2309
}
 
2310
 
 
2311
extern "C" char **thd_query(DRIZZLE_THD thd)
 
2312
{
 
2313
  return(&thd->query);
 
2314
}
 
2315
 
 
2316
extern "C" int thd_slave_thread(const DRIZZLE_THD thd)
 
2317
{
 
2318
  return(thd->slave_thread);
 
2319
}
 
2320
 
 
2321
extern "C" int thd_non_transactional_update(const DRIZZLE_THD thd)
 
2322
{
 
2323
  return(thd->transaction.all.modified_non_trans_table);
 
2324
}
 
2325
 
 
2326
extern "C" int thd_binlog_format(const DRIZZLE_THD thd)
 
2327
{
 
2328
  return (int) thd->variables.binlog_format;
 
2329
}
 
2330
 
 
2331
extern "C" void thd_mark_transaction_to_rollback(DRIZZLE_THD thd, bool all)
 
2332
{
 
2333
  mark_transaction_to_rollback(thd, all);
 
2334
}
 
2335
#endif // INNODB_COMPATIBILITY_HOOKS */
 
2336
 
1670
2337
 
1671
2338
/**
1672
2339
  Mark transaction to rollback and mark error as fatal to a sub-statement.
1673
2340
 
1674
 
  @param  session   Thread handle
 
2341
  @param  thd   Thread handle
1675
2342
  @param  all   true <=> rollback main transaction.
1676
2343
*/
1677
 
void mark_transaction_to_rollback(Session *session, bool all)
1678
 
{
1679
 
  if (session)
1680
 
  {
1681
 
    session->is_fatal_sub_stmt_error= true;
1682
 
    session->transaction_rollback_request= all;
1683
 
  }
1684
 
}
1685
 
 
1686
 
void Session::disconnect(uint32_t errcode, bool should_lock)
1687
 
{
1688
 
  /* Allow any plugins to cleanup their session variables */
1689
 
  plugin_sessionvar_cleanup(this);
1690
 
 
1691
 
  /* If necessary, log any aborted or unauthorized connections */
1692
 
  if (killed || client->wasAborted())
1693
 
    statistic_increment(aborted_threads, &LOCK_status);
1694
 
 
1695
 
  if (client->wasAborted())
1696
 
  {
1697
 
    if (! killed && variables.log_warnings > 1)
1698
 
    {
1699
 
      SecurityContext *sctx= &security_ctx;
1700
 
 
1701
 
      errmsg_printf(ERRMSG_LVL_WARN, ER(ER_NEW_ABORTING_CONNECTION)
1702
 
                  , thread_id
1703
 
                  , (db.empty() ? "unconnected" : db.c_str())
1704
 
                  , sctx->getUser().empty() == false ? sctx->getUser().c_str() : "unauthenticated"
1705
 
                  , sctx->getIp().c_str()
1706
 
                  , (main_da.is_error() ? main_da.message() : ER(ER_UNKNOWN_ERROR)));
1707
 
    }
1708
 
  }
1709
 
 
1710
 
  /* Close out our connection to the client */
1711
 
  if (should_lock)
1712
 
    (void) pthread_mutex_lock(&LOCK_thread_count);
1713
 
  killed= Session::KILL_CONNECTION;
1714
 
  if (client->isConnected())
1715
 
  {
1716
 
    if (errcode)
1717
 
    {
1718
 
      /*my_error(errcode, ER(errcode));*/
1719
 
      client->sendError(errcode, ER(errcode));
1720
 
    }
1721
 
    client->close();
1722
 
  }
1723
 
  if (should_lock)
1724
 
    (void) pthread_mutex_unlock(&LOCK_thread_count);
1725
 
}
1726
 
 
1727
 
void Session::reset_for_next_command()
1728
 
{
1729
 
  free_list= 0;
1730
 
  select_number= 1;
1731
 
  /*
1732
 
    Those two lines below are theoretically unneeded as
1733
 
    Session::cleanup_after_query() should take care of this already.
1734
 
  */
1735
 
  auto_inc_intervals_in_cur_stmt_for_binlog.empty();
1736
 
 
1737
 
  is_fatal_error= false;
1738
 
  server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1739
 
                          SERVER_QUERY_NO_INDEX_USED |
1740
 
                          SERVER_QUERY_NO_GOOD_INDEX_USED);
1741
 
 
1742
 
  clear_error();
1743
 
  main_da.reset_diagnostics_area();
1744
 
  total_warn_count=0;                   // Warnings for this query
1745
 
  sent_row_count= examined_row_count= 0;
1746
 
}
1747
 
 
1748
 
/*
1749
 
  Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1750
 
*/
1751
 
 
1752
 
void Session::close_temporary_tables()
1753
 
{
1754
 
  Table *table;
1755
 
  Table *tmp_next;
1756
 
 
1757
 
  if (not temporary_tables)
1758
 
    return;
1759
 
 
1760
 
  for (table= temporary_tables; table; table= tmp_next)
1761
 
  {
1762
 
    tmp_next= table->next;
1763
 
    nukeTable(table);
1764
 
  }
1765
 
  temporary_tables= NULL;
1766
 
}
1767
 
 
1768
 
/*
1769
 
  unlink from session->temporary tables and close temporary table
1770
 
*/
1771
 
 
1772
 
void Session::close_temporary_table(Table *table)
1773
 
{
1774
 
  if (table->prev)
1775
 
  {
1776
 
    table->prev->next= table->next;
1777
 
    if (table->prev->next)
1778
 
      table->next->prev= table->prev;
1779
 
  }
1780
 
  else
1781
 
  {
1782
 
    /* removing the item from the list */
1783
 
    assert(table == temporary_tables);
1784
 
    /*
1785
 
      slave must reset its temporary list pointer to zero to exclude
1786
 
      passing non-zero value to end_slave via rli->save_temporary_tables
1787
 
      when no temp tables opened, see an invariant below.
1788
 
    */
1789
 
    temporary_tables= table->next;
1790
 
    if (temporary_tables)
1791
 
      table->next->prev= NULL;
1792
 
  }
1793
 
  nukeTable(table);
1794
 
}
1795
 
 
1796
 
/*
1797
 
  Close and drop a temporary table
1798
 
 
1799
 
  NOTE
1800
 
  This dosn't unlink table from session->temporary
1801
 
  If this is needed, use close_temporary_table()
1802
 
*/
1803
 
 
1804
 
void Session::nukeTable(Table *table)
1805
 
{
1806
 
  plugin::StorageEngine *table_type= table->s->db_type();
1807
 
 
1808
 
  table->free_io_cache();
1809
 
  table->closefrm(false);
1810
 
 
1811
 
  TableIdentifier identifier(table->s->getSchemaName(), table->s->table_name.str, table->s->path.str);
1812
 
  rm_temporary_table(table_type, identifier);
1813
 
 
1814
 
  table->s->free_table_share();
1815
 
 
1816
 
  /* This makes me sad, but we're allocating it via malloc */
1817
 
  free(table);
1818
 
}
1819
 
 
1820
 
/** Clear most status variables. */
1821
 
extern time_t flush_status_time;
1822
 
extern uint32_t max_used_connections;
1823
 
 
1824
 
void Session::refresh_status()
1825
 
{
1826
 
  pthread_mutex_lock(&LOCK_status);
1827
 
 
1828
 
  /* Add thread's status variabes to global status */
1829
 
  add_to_status(&global_status_var, &status_var);
1830
 
 
1831
 
  /* Reset thread's status variables */
1832
 
  memset(&status_var, 0, sizeof(status_var));
1833
 
 
1834
 
  /* Reset some global variables */
1835
 
  reset_status_vars();
1836
 
 
1837
 
  /* Reset the counters of all key caches (default and named). */
1838
 
  reset_key_cache_counters();
1839
 
  flush_status_time= time((time_t*) 0);
1840
 
  max_used_connections= 1; /* We set it to one, because we know we exist */
1841
 
  pthread_mutex_unlock(&LOCK_status);
1842
 
}
1843
 
 
1844
 
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1845
 
{
1846
 
  user_var_entry *entry= NULL;
1847
 
 
1848
 
  entry= (user_var_entry*) hash_search(&user_vars, (unsigned char*) name.str, name.length);
1849
 
 
1850
 
  if ((entry == NULL) && create_if_not_exists)
1851
 
  {
1852
 
    if (!hash_inited(&user_vars))
1853
 
      return NULL;
1854
 
    entry= new (nothrow) user_var_entry(name.str, query_id);
1855
 
 
1856
 
    if (entry == NULL)
1857
 
      return NULL;
1858
 
 
1859
 
    if (my_hash_insert(&user_vars, (unsigned char*) entry))
1860
 
    {
1861
 
      assert(1);
1862
 
      free((char*) entry);
1863
 
      return 0;
1864
 
    }
1865
 
 
1866
 
  }
1867
 
 
1868
 
  return entry;
1869
 
}
1870
 
 
1871
 
void Session::mark_temp_tables_as_free_for_reuse()
1872
 
{
1873
 
  for (Table *table= temporary_tables ; table ; table= table->next)
1874
 
  {
1875
 
    if (table->query_id == query_id)
1876
 
    {
1877
 
      table->query_id= 0;
1878
 
      table->cursor->ha_reset();
1879
 
    }
1880
 
  }
1881
 
}
1882
 
 
1883
 
void Session::mark_used_tables_as_free_for_reuse(Table *table)
1884
 
{
1885
 
  for (; table ; table= table->next)
1886
 
  {
1887
 
    if (table->query_id == query_id)
1888
 
    {
1889
 
      table->query_id= 0;
1890
 
      table->cursor->ha_reset();
1891
 
    }
1892
 
  }
1893
 
}
1894
 
 
1895
 
/*
1896
 
  Unlocks tables and frees derived tables.
1897
 
  Put all normal tables used by thread in free list.
1898
 
 
1899
 
  It will only close/mark as free for reuse tables opened by this
1900
 
  substatement, it will also check if we are closing tables after
1901
 
  execution of complete query (i.e. we are on upper level) and will
1902
 
  leave prelocked mode if needed.
1903
 
*/
1904
 
void Session::close_thread_tables()
1905
 
{
1906
 
  Table *table;
1907
 
 
1908
 
  /*
1909
 
    We are assuming here that session->derived_tables contains ONLY derived
1910
 
    tables for this substatement. i.e. instead of approach which uses
1911
 
    query_id matching for determining which of the derived tables belong
1912
 
    to this substatement we rely on the ability of substatements to
1913
 
    save/restore session->derived_tables during their execution.
1914
 
 
1915
 
    TODO: Probably even better approach is to simply associate list of
1916
 
          derived tables with (sub-)statement instead of thread and destroy
1917
 
          them at the end of its execution.
1918
 
  */
1919
 
  if (derived_tables)
1920
 
  {
1921
 
    Table *next;
1922
 
    /*
1923
 
      Close all derived tables generated in queries like
1924
 
      SELECT * FROM (SELECT * FROM t1)
1925
 
    */
1926
 
    for (table= derived_tables ; table ; table= next)
1927
 
    {
1928
 
      next= table->next;
1929
 
      table->free_tmp_table(this);
1930
 
    }
1931
 
    derived_tables= 0;
1932
 
  }
1933
 
 
1934
 
  /*
1935
 
    Mark all temporary tables used by this statement as free for reuse.
1936
 
  */
1937
 
  mark_temp_tables_as_free_for_reuse();
1938
 
  /*
1939
 
    Let us commit transaction for statement. Since in 5.0 we only have
1940
 
    one statement transaction and don't allow several nested statement
1941
 
    transactions this call will do nothing if we are inside of stored
1942
 
    function or trigger (i.e. statement transaction is already active and
1943
 
    does not belong to statement for which we do close_thread_tables()).
1944
 
    TODO: This should be fixed in later releases.
1945
 
   */
1946
 
  if (backups_available == false)
1947
 
  {
1948
 
    TransactionServices &transaction_services= TransactionServices::singleton();
1949
 
    main_da.can_overwrite_status= true;
1950
 
    transaction_services.ha_autocommit_or_rollback(this, is_error());
1951
 
    main_da.can_overwrite_status= false;
1952
 
    transaction.stmt.reset();
1953
 
  }
1954
 
 
1955
 
  if (lock)
1956
 
  {
1957
 
    /*
1958
 
      For RBR we flush the pending event just before we unlock all the
1959
 
      tables.  This means that we are at the end of a topmost
1960
 
      statement, so we ensure that the STMT_END_F flag is set on the
1961
 
      pending event.  For statements that are *inside* stored
1962
 
      functions, the pending event will not be flushed: that will be
1963
 
      handled either before writing a query log event (inside
1964
 
      binlog_query()) or when preparing a pending event.
1965
 
     */
1966
 
    mysql_unlock_tables(this, lock);
1967
 
    lock= 0;
1968
 
  }
1969
 
  /*
1970
 
    Note that we need to hold LOCK_open while changing the
1971
 
    open_tables list. Another thread may work on it.
1972
 
    (See: remove_table_from_cache(), mysql_wait_completed_table())
1973
 
    Closing a MERGE child before the parent would be fatal if the
1974
 
    other thread tries to abort the MERGE lock in between.
1975
 
  */
1976
 
  if (open_tables)
1977
 
    close_open_tables();
1978
 
}
1979
 
 
1980
 
void Session::close_tables_for_reopen(TableList **tables)
1981
 
{
1982
 
  /*
1983
 
    If table list consists only from tables from prelocking set, table list
1984
 
    for new attempt should be empty, so we have to update list's root pointer.
1985
 
  */
1986
 
  if (lex->first_not_own_table() == *tables)
1987
 
    *tables= 0;
1988
 
  lex->chop_off_not_own_tables();
1989
 
  for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1990
 
    tmp->table= 0;
1991
 
  close_thread_tables();
1992
 
}
1993
 
 
1994
 
bool Session::openTablesLock(TableList *tables)
1995
 
{
1996
 
  uint32_t counter;
1997
 
  bool need_reopen;
1998
 
 
1999
 
  for ( ; ; )
2000
 
  {
2001
 
    if (open_tables_from_list(&tables, &counter))
2002
 
      return true;
2003
 
 
2004
 
    if (not lock_tables(tables, counter, &need_reopen))
2005
 
      break;
2006
 
    if (not need_reopen)
2007
 
      return true;
2008
 
    close_tables_for_reopen(&tables);
2009
 
  }
2010
 
  if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
2011
 
       (fill_derived_tables() &&
2012
 
        mysql_handle_derived(lex, &mysql_derived_filling))))
2013
 
    return true;
2014
 
 
2015
 
  return false;
2016
 
}
2017
 
 
2018
 
bool Session::openTables(TableList *tables, uint32_t flags)
2019
 
{
2020
 
  uint32_t counter;
2021
 
  bool ret= fill_derived_tables();
2022
 
  assert(ret == false);
2023
 
  if (open_tables_from_list(&tables, &counter, flags) ||
2024
 
      mysql_handle_derived(lex, &mysql_derived_prepare))
2025
 
    return true;
2026
 
  return false;
2027
 
}
2028
 
 
2029
 
bool Session::rm_temporary_table(TableIdentifier &identifier)
2030
 
{
2031
 
  if (plugin::StorageEngine::dropTable(*this, identifier))
2032
 
  {
2033
 
    errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
2034
 
                  identifier.getSQLPath().c_str(), errno);
2035
 
    dumpTemporaryTableNames("rm_temporary_table()");
2036
 
 
2037
 
    return true;
2038
 
  }
2039
 
 
2040
 
  return false;
2041
 
}
2042
 
 
2043
 
bool Session::rm_temporary_table(plugin::StorageEngine *base, TableIdentifier &identifier)
2044
 
{
2045
 
  assert(base);
2046
 
 
2047
 
  if (plugin::StorageEngine::dropTable(*this, *base, identifier))
2048
 
  {
2049
 
    errmsg_printf(ERRMSG_LVL_WARN, _("Could not remove temporary table: '%s', error: %d"),
2050
 
                  identifier.getSQLPath().c_str(), errno);
2051
 
    dumpTemporaryTableNames("rm_temporary_table()");
2052
 
 
2053
 
    return true;
2054
 
  }
2055
 
 
2056
 
  return false;
2057
 
}
2058
 
 
2059
 
/**
2060
 
  @note this will be removed, I am looking through Hudson to see if it is finding
2061
 
  any tables that are missed during cleanup.
2062
 
*/
2063
 
void Session::dumpTemporaryTableNames(const char *foo)
2064
 
{
2065
 
  Table *table;
2066
 
 
2067
 
  if (not temporary_tables)
2068
 
    return;
2069
 
 
2070
 
  cerr << "Begin Run: " << foo << "\n";
2071
 
  for (table= temporary_tables; table; table= table->next)
2072
 
  {
2073
 
    bool have_proto= false;
2074
 
 
2075
 
    message::Table *proto= table->s->getTableProto();
2076
 
    if (table->s->getTableProto())
2077
 
      have_proto= true;
2078
 
 
2079
 
    const char *answer= have_proto ? "true" : "false";
2080
 
 
2081
 
    if (have_proto)
2082
 
    {
2083
 
      cerr << "\tTable Name " << table->s->getSchemaName() << "." << table->s->table_name.str << " : " << answer << "\n";
2084
 
      cerr << "\t\t Proto " << proto->schema() << " " << proto->name() << "\n";
2085
 
    }
2086
 
    else
2087
 
      cerr << "\tTabl;e Name " << table->s->getSchemaName() << "." << table->s->table_name.str << " : " << answer << "\n";
2088
 
  }
2089
 
}
2090
 
 
2091
 
bool Session::storeTableMessage(TableIdentifier &identifier, message::Table &table_message)
2092
 
{
2093
 
  table_message_cache.insert(make_pair(identifier.getPath(), table_message));
2094
 
 
2095
 
  return true;
2096
 
}
2097
 
 
2098
 
bool Session::removeTableMessage(TableIdentifier &identifier)
2099
 
{
2100
 
  TableMessageCache::iterator iter;
2101
 
 
2102
 
  iter= table_message_cache.find(identifier.getPath());
2103
 
 
2104
 
  if (iter == table_message_cache.end())
2105
 
    return false;
2106
 
 
2107
 
  table_message_cache.erase(iter);
2108
 
 
2109
 
  return true;
2110
 
}
2111
 
 
2112
 
bool Session::getTableMessage(TableIdentifier &identifier, message::Table &table_message)
2113
 
{
2114
 
  TableMessageCache::iterator iter;
2115
 
 
2116
 
  iter= table_message_cache.find(identifier.getPath());
2117
 
 
2118
 
  if (iter == table_message_cache.end())
2119
 
    return false;
2120
 
 
2121
 
  table_message.CopyFrom(((*iter).second));
2122
 
 
2123
 
  return true;
2124
 
}
2125
 
 
2126
 
bool Session::doesTableMessageExist(TableIdentifier &identifier)
2127
 
{
2128
 
  TableMessageCache::iterator iter;
2129
 
 
2130
 
  iter= table_message_cache.find(identifier.getPath());
2131
 
 
2132
 
  if (iter == table_message_cache.end())
2133
 
  {
2134
 
    return false;
2135
 
  }
2136
 
 
2137
 
  return true;
2138
 
}
2139
 
 
2140
 
bool Session::renameTableMessage(TableIdentifier &from, TableIdentifier &to)
2141
 
{
2142
 
  TableMessageCache::iterator iter;
2143
 
 
2144
 
  table_message_cache[to.getPath()]= table_message_cache[from.getPath()];
2145
 
 
2146
 
  iter= table_message_cache.find(to.getPath());
2147
 
 
2148
 
  if (iter == table_message_cache.end())
2149
 
  {
2150
 
    return false;
2151
 
  }
2152
 
 
2153
 
  (*iter).second.set_schema(to.getSchemaName());
2154
 
  (*iter).second.set_name(to.getTableName());
2155
 
 
2156
 
  return true;
2157
 
}
2158
 
 
2159
 
} /* namespace drizzled */
 
2344
 
 
2345
void mark_transaction_to_rollback(THD *thd, bool all)
 
2346
{
 
2347
  if (thd)
 
2348
  {
 
2349
    thd->is_fatal_sub_stmt_error= true;
 
2350
    thd->transaction_rollback_request= all;
 
2351
  }
 
2352
}
 
2353
/***************************************************************************
 
2354
  Handling of XA id cacheing
 
2355
***************************************************************************/
 
2356
 
 
2357
pthread_mutex_t LOCK_xid_cache;
 
2358
HASH xid_cache;
 
2359
 
 
2360
extern "C" uchar *xid_get_hash_key(const uchar *, size_t *, bool);
 
2361
extern "C" void xid_free_hash(void *);
 
2362
 
 
2363
uchar *xid_get_hash_key(const uchar *ptr, size_t *length,
 
2364
                        bool not_used __attribute__((unused)))
 
2365
{
 
2366
  *length=((XID_STATE*)ptr)->xid.key_length();
 
2367
  return ((XID_STATE*)ptr)->xid.key();
 
2368
}
 
2369
 
 
2370
void xid_free_hash(void *ptr)
 
2371
{
 
2372
  if (!((XID_STATE*)ptr)->in_thd)
 
2373
    my_free((uchar*)ptr, MYF(0));
 
2374
}
 
2375
 
 
2376
bool xid_cache_init()
 
2377
{
 
2378
  pthread_mutex_init(&LOCK_xid_cache, MY_MUTEX_INIT_FAST);
 
2379
  return hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
 
2380
                   xid_get_hash_key, xid_free_hash, 0) != 0;
 
2381
}
 
2382
 
 
2383
void xid_cache_free()
 
2384
{
 
2385
  if (hash_inited(&xid_cache))
 
2386
  {
 
2387
    hash_free(&xid_cache);
 
2388
    pthread_mutex_destroy(&LOCK_xid_cache);
 
2389
  }
 
2390
}
 
2391
 
 
2392
XID_STATE *xid_cache_search(XID *xid)
 
2393
{
 
2394
  pthread_mutex_lock(&LOCK_xid_cache);
 
2395
  XID_STATE *res=(XID_STATE *)hash_search(&xid_cache, xid->key(), xid->key_length());
 
2396
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2397
  return res;
 
2398
}
 
2399
 
 
2400
 
 
2401
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
 
2402
{
 
2403
  XID_STATE *xs;
 
2404
  bool res;
 
2405
  pthread_mutex_lock(&LOCK_xid_cache);
 
2406
  if (hash_search(&xid_cache, xid->key(), xid->key_length()))
 
2407
    res=0;
 
2408
  else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
 
2409
    res=1;
 
2410
  else
 
2411
  {
 
2412
    xs->xa_state=xa_state;
 
2413
    xs->xid.set(xid);
 
2414
    xs->in_thd=0;
 
2415
    res=my_hash_insert(&xid_cache, (uchar*)xs);
 
2416
  }
 
2417
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2418
  return res;
 
2419
}
 
2420
 
 
2421
 
 
2422
bool xid_cache_insert(XID_STATE *xid_state)
 
2423
{
 
2424
  pthread_mutex_lock(&LOCK_xid_cache);
 
2425
  assert(hash_search(&xid_cache, xid_state->xid.key(),
 
2426
                          xid_state->xid.key_length())==0);
 
2427
  bool res=my_hash_insert(&xid_cache, (uchar*)xid_state);
 
2428
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2429
  return res;
 
2430
}
 
2431
 
 
2432
 
 
2433
void xid_cache_delete(XID_STATE *xid_state)
 
2434
{
 
2435
  pthread_mutex_lock(&LOCK_xid_cache);
 
2436
  hash_delete(&xid_cache, (uchar *)xid_state);
 
2437
  pthread_mutex_unlock(&LOCK_xid_cache);
 
2438
}
 
2439
 
 
2440
/*
 
2441
  Implementation of interface to write rows to the binary log through the
 
2442
  thread.  The thread is responsible for writing the rows it has
 
2443
  inserted/updated/deleted.
 
2444
*/
 
2445
 
 
2446
#ifndef DRIZZLE_CLIENT
 
2447
 
 
2448
/*
 
2449
  Template member function for ensuring that there is an rows log
 
2450
  event of the apropriate type before proceeding.
 
2451
 
 
2452
  PRE CONDITION:
 
2453
    - Events of type 'RowEventT' have the type code 'type_code'.
 
2454
    
 
2455
  POST CONDITION:
 
2456
    If a non-NULL pointer is returned, the pending event for thread 'thd' will
 
2457
    be an event of type 'RowEventT' (which have the type code 'type_code')
 
2458
    will either empty or have enough space to hold 'needed' bytes.  In
 
2459
    addition, the columns bitmap will be correct for the row, meaning that
 
2460
    the pending event will be flushed if the columns in the event differ from
 
2461
    the columns suppled to the function.
 
2462
 
 
2463
  RETURNS
 
2464
    If no error, a non-NULL pending event (either one which already existed or
 
2465
    the newly created one).
 
2466
    If error, NULL.
 
2467
 */
 
2468
 
 
2469
template <class RowsEventT> Rows_log_event* 
 
2470
THD::binlog_prepare_pending_rows_event(Table* table, uint32_t serv_id,
 
2471
                                       size_t needed,
 
2472
                                       bool is_transactional,
 
2473
                                       RowsEventT *hint __attribute__((unused)))
 
2474
{
 
2475
  /* Pre-conditions */
 
2476
  assert(table->s->table_map_id != UINT32_MAX);
 
2477
 
 
2478
  /* Fetch the type code for the RowsEventT template parameter */
 
2479
  int const type_code= RowsEventT::TYPE_CODE;
 
2480
 
 
2481
  /*
 
2482
    There is no good place to set up the transactional data, so we
 
2483
    have to do it here.
 
2484
  */
 
2485
  if (binlog_setup_trx_data())
 
2486
    return(NULL);
 
2487
 
 
2488
  Rows_log_event* pending= binlog_get_pending_rows_event();
 
2489
 
 
2490
  if (unlikely(pending && !pending->is_valid()))
 
2491
    return(NULL);
 
2492
 
 
2493
  /*
 
2494
    Check if the current event is non-NULL and a write-rows
 
2495
    event. Also check if the table provided is mapped: if it is not,
 
2496
    then we have switched to writing to a new table.
 
2497
    If there is no pending event, we need to create one. If there is a pending
 
2498
    event, but it's not about the same table id, or not of the same type
 
2499
    (between Write, Update and Delete), or not the same affected columns, or
 
2500
    going to be too big, flush this event to disk and create a new pending
 
2501
    event.
 
2502
 
 
2503
    The last test is necessary for the Cluster injector to work
 
2504
    correctly. The reason is that the Cluster can inject two write
 
2505
    rows with different column bitmaps if there is an insert followed
 
2506
    by an update in the same transaction, and these are grouped into a
 
2507
    single epoch/transaction when fed to the injector.
 
2508
 
 
2509
    TODO: Fix the code so that the last test can be removed.
 
2510
  */
 
2511
  if (!pending ||
 
2512
      pending->server_id != serv_id || 
 
2513
      pending->get_table_id() != table->s->table_map_id ||
 
2514
      pending->get_type_code() != type_code || 
 
2515
      pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
 
2516
      !bitmap_cmp(pending->get_cols(), table->write_set))
 
2517
    {
 
2518
    /* Create a new RowsEventT... */
 
2519
    Rows_log_event* const
 
2520
        ev= new RowsEventT(this, table, table->s->table_map_id,
 
2521
                           is_transactional);
 
2522
    if (unlikely(!ev))
 
2523
      return(NULL);
 
2524
    ev->server_id= serv_id; // I don't like this, it's too easy to forget.
 
2525
    /*
 
2526
      flush the pending event and replace it with the newly created
 
2527
      event...
 
2528
    */
 
2529
    if (unlikely(mysql_bin_log.flush_and_set_pending_rows_event(this, ev)))
 
2530
    {
 
2531
      delete ev;
 
2532
      return(NULL);
 
2533
    }
 
2534
 
 
2535
    return(ev);               /* This is the new pending event */
 
2536
  }
 
2537
  return(pending);        /* This is the current pending event */
 
2538
}
 
2539
 
 
2540
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
2541
/*
 
2542
  Instantiate the versions we need, we have -fno-implicit-template as
 
2543
  compiling option.
 
2544
*/
 
2545
template Rows_log_event*
 
2546
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2547
                                       Write_rows_log_event*);
 
2548
 
 
2549
template Rows_log_event*
 
2550
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2551
                                       Delete_rows_log_event *);
 
2552
 
 
2553
template Rows_log_event* 
 
2554
THD::binlog_prepare_pending_rows_event(Table*, uint32_t, size_t, bool,
 
2555
                                       Update_rows_log_event *);
 
2556
#endif
 
2557
 
 
2558
namespace {
 
2559
  /**
 
2560
     Class to handle temporary allocation of memory for row data.
 
2561
 
 
2562
     The responsibilities of the class is to provide memory for
 
2563
     packing one or two rows of packed data (depending on what
 
2564
     constructor is called).
 
2565
 
 
2566
     In order to make the allocation more efficient for "simple" rows,
 
2567
     i.e., rows that do not contain any blobs, a pointer to the
 
2568
     allocated memory is of memory is stored in the table structure
 
2569
     for simple rows.  If memory for a table containing a blob field
 
2570
     is requested, only memory for that is allocated, and subsequently
 
2571
     released when the object is destroyed.
 
2572
 
 
2573
   */
 
2574
  class Row_data_memory {
 
2575
  public:
 
2576
    /**
 
2577
      Build an object to keep track of a block-local piece of memory
 
2578
      for storing a row of data.
 
2579
 
 
2580
      @param table
 
2581
      Table where the pre-allocated memory is stored.
 
2582
 
 
2583
      @param length
 
2584
      Length of data that is needed, if the record contain blobs.
 
2585
     */
 
2586
    Row_data_memory(Table *table, size_t const len1)
 
2587
      : m_memory(0)
 
2588
    {
 
2589
      m_alloc_checked= false;
 
2590
      allocate_memory(table, len1);
 
2591
      m_ptr[0]= has_memory() ? m_memory : 0;
 
2592
      m_ptr[1]= 0;
 
2593
    }
 
2594
 
 
2595
    Row_data_memory(Table *table, size_t const len1, size_t const len2)
 
2596
      : m_memory(0)
 
2597
    {
 
2598
      m_alloc_checked= false;
 
2599
      allocate_memory(table, len1 + len2);
 
2600
      m_ptr[0]= has_memory() ? m_memory        : 0;
 
2601
      m_ptr[1]= has_memory() ? m_memory + len1 : 0;
 
2602
    }
 
2603
 
 
2604
    ~Row_data_memory()
 
2605
    {
 
2606
      if (m_memory != 0 && m_release_memory_on_destruction)
 
2607
        my_free((uchar*) m_memory, MYF(MY_WME));
 
2608
    }
 
2609
 
 
2610
    /**
 
2611
       Is there memory allocated?
 
2612
 
 
2613
       @retval true There is memory allocated
 
2614
       @retval false Memory allocation failed
 
2615
     */
 
2616
    bool has_memory() const {
 
2617
      m_alloc_checked= true;
 
2618
      return m_memory != 0;
 
2619
    }
 
2620
 
 
2621
    uchar *slot(uint s)
 
2622
    {
 
2623
      assert(s < sizeof(m_ptr)/sizeof(*m_ptr));
 
2624
      assert(m_ptr[s] != 0);
 
2625
      assert(m_alloc_checked == true);
 
2626
      return m_ptr[s];
 
2627
    }
 
2628
 
 
2629
  private:
 
2630
    void allocate_memory(Table *const table, size_t const total_length)
 
2631
    {
 
2632
      if (table->s->blob_fields == 0)
 
2633
      {
 
2634
        /*
 
2635
          The maximum length of a packed record is less than this
 
2636
          length. We use this value instead of the supplied length
 
2637
          when allocating memory for records, since we don't know how
 
2638
          the memory will be used in future allocations.
 
2639
 
 
2640
          Since table->s->reclength is for unpacked records, we have
 
2641
          to add two bytes for each field, which can potentially be
 
2642
          added to hold the length of a packed field.
 
2643
        */
 
2644
        size_t const maxlen= table->s->reclength + 2 * table->s->fields;
 
2645
 
 
2646
        /*
 
2647
          Allocate memory for two records if memory hasn't been
 
2648
          allocated. We allocate memory for two records so that it can
 
2649
          be used when processing update rows as well.
 
2650
        */
 
2651
        if (table->write_row_record == 0)
 
2652
          table->write_row_record=
 
2653
            (uchar *) alloc_root(&table->mem_root, 2 * maxlen);
 
2654
        m_memory= table->write_row_record;
 
2655
        m_release_memory_on_destruction= false;
 
2656
      }
 
2657
      else
 
2658
      {
 
2659
        m_memory= (uchar *) my_malloc(total_length, MYF(MY_WME));
 
2660
        m_release_memory_on_destruction= true;
 
2661
      }
 
2662
    }
 
2663
 
 
2664
    mutable bool m_alloc_checked;
 
2665
    bool m_release_memory_on_destruction;
 
2666
    uchar *m_memory;
 
2667
    uchar *m_ptr[2];
 
2668
  };
 
2669
}
 
2670
 
 
2671
 
 
2672
int THD::binlog_write_row(Table* table, bool is_trans, 
 
2673
                          uchar const *record) 
 
2674
 
2675
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2676
 
 
2677
  /*
 
2678
    Pack records into format for transfer. We are allocating more
 
2679
    memory than needed, but that doesn't matter.
 
2680
  */
 
2681
  Row_data_memory memory(table, table->max_row_length(record));
 
2682
  if (!memory.has_memory())
 
2683
    return HA_ERR_OUT_OF_MEM;
 
2684
 
 
2685
  uchar *row_data= memory.slot(0);
 
2686
 
 
2687
  size_t const len= pack_row(table, table->write_set, row_data, record);
 
2688
 
 
2689
  Rows_log_event* const ev=
 
2690
    binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
 
2691
                                      static_cast<Write_rows_log_event*>(0));
 
2692
 
 
2693
  if (unlikely(ev == 0))
 
2694
    return HA_ERR_OUT_OF_MEM;
 
2695
 
 
2696
  return ev->add_row_data(row_data, len);
 
2697
}
 
2698
 
 
2699
int THD::binlog_update_row(Table* table, bool is_trans,
 
2700
                           const uchar *before_record,
 
2701
                           const uchar *after_record)
 
2702
 
2703
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2704
 
 
2705
  size_t const before_maxlen = table->max_row_length(before_record);
 
2706
  size_t const after_maxlen  = table->max_row_length(after_record);
 
2707
 
 
2708
  Row_data_memory row_data(table, before_maxlen, after_maxlen);
 
2709
  if (!row_data.has_memory())
 
2710
    return HA_ERR_OUT_OF_MEM;
 
2711
 
 
2712
  uchar *before_row= row_data.slot(0);
 
2713
  uchar *after_row= row_data.slot(1);
 
2714
 
 
2715
  size_t const before_size= pack_row(table, table->read_set, before_row,
 
2716
                                        before_record);
 
2717
  size_t const after_size= pack_row(table, table->write_set, after_row,
 
2718
                                       after_record);
 
2719
 
 
2720
  Rows_log_event* const ev=
 
2721
    binlog_prepare_pending_rows_event(table, server_id,
 
2722
                                      before_size + after_size, is_trans,
 
2723
                                      static_cast<Update_rows_log_event*>(0));
 
2724
 
 
2725
  if (unlikely(ev == 0))
 
2726
    return HA_ERR_OUT_OF_MEM;
 
2727
 
 
2728
  return
 
2729
    ev->add_row_data(before_row, before_size) ||
 
2730
    ev->add_row_data(after_row, after_size);
 
2731
}
 
2732
 
 
2733
int THD::binlog_delete_row(Table* table, bool is_trans, 
 
2734
                           uchar const *record)
 
2735
 
2736
  assert(current_stmt_binlog_row_based && mysql_bin_log.is_open());
 
2737
 
 
2738
  /* 
 
2739
     Pack records into format for transfer. We are allocating more
 
2740
     memory than needed, but that doesn't matter.
 
2741
  */
 
2742
  Row_data_memory memory(table, table->max_row_length(record));
 
2743
  if (unlikely(!memory.has_memory()))
 
2744
    return HA_ERR_OUT_OF_MEM;
 
2745
 
 
2746
  uchar *row_data= memory.slot(0);
 
2747
 
 
2748
  size_t const len= pack_row(table, table->read_set, row_data, record);
 
2749
 
 
2750
  Rows_log_event* const ev=
 
2751
    binlog_prepare_pending_rows_event(table, server_id, len, is_trans,
 
2752
                                      static_cast<Delete_rows_log_event*>(0));
 
2753
 
 
2754
  if (unlikely(ev == 0))
 
2755
    return HA_ERR_OUT_OF_MEM;
 
2756
 
 
2757
  return ev->add_row_data(row_data, len);
 
2758
}
 
2759
 
 
2760
 
 
2761
int THD::binlog_flush_pending_rows_event(bool stmt_end)
 
2762
{
 
2763
  /*
 
2764
    We shall flush the pending event even if we are not in row-based
 
2765
    mode: it might be the case that we left row-based mode before
 
2766
    flushing anything (e.g., if we have explicitly locked tables).
 
2767
   */
 
2768
  if (!mysql_bin_log.is_open())
 
2769
    return(0);
 
2770
 
 
2771
  /*
 
2772
    Mark the event as the last event of a statement if the stmt_end
 
2773
    flag is set.
 
2774
  */
 
2775
  int error= 0;
 
2776
  if (Rows_log_event *pending= binlog_get_pending_rows_event())
 
2777
  {
 
2778
    if (stmt_end)
 
2779
    {
 
2780
      pending->set_flags(Rows_log_event::STMT_END_F);
 
2781
      pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 
2782
      binlog_table_maps= 0;
 
2783
    }
 
2784
 
 
2785
    error= mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
 
2786
  }
 
2787
 
 
2788
  return(error);
 
2789
}
 
2790
 
 
2791
 
 
2792
/*
 
2793
  Member function that will log query, either row-based or
 
2794
  statement-based depending on the value of the 'current_stmt_binlog_row_based'
 
2795
  the value of the 'qtype' flag.
 
2796
 
 
2797
  This function should be called after the all calls to ha_*_row()
 
2798
  functions have been issued, but before tables are unlocked and
 
2799
  closed.
 
2800
 
 
2801
  OBSERVE
 
2802
    There shall be no writes to any system table after calling
 
2803
    binlog_query(), so these writes has to be moved to before the call
 
2804
    of binlog_query() for correct functioning.
 
2805
 
 
2806
    This is necessesary not only for RBR, but the master might crash
 
2807
    after binlogging the query but before changing the system tables.
 
2808
    This means that the slave and the master are not in the same state
 
2809
    (after the master has restarted), so therefore we have to
 
2810
    eliminate this problem.
 
2811
 
 
2812
  RETURN VALUE
 
2813
    Error code, or 0 if no error.
 
2814
*/
 
2815
int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
 
2816
                      ulong query_len, bool is_trans, bool suppress_use,
 
2817
                      THD::killed_state killed_status_arg)
 
2818
{
 
2819
  assert(query_arg && mysql_bin_log.is_open());
 
2820
 
 
2821
  if (int error= binlog_flush_pending_rows_event(true))
 
2822
    return(error);
 
2823
 
 
2824
  /*
 
2825
    If we are in statement mode and trying to log an unsafe statement,
 
2826
    we should print a warning.
 
2827
  */
 
2828
  if (lex->is_stmt_unsafe() &&
 
2829
      variables.binlog_format == BINLOG_FORMAT_STMT)
 
2830
  {
 
2831
    assert(this->query != NULL);
 
2832
    push_warning(this, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
2833
                 ER_BINLOG_UNSAFE_STATEMENT,
 
2834
                 ER(ER_BINLOG_UNSAFE_STATEMENT));
 
2835
    if (!(binlog_flags & BINLOG_FLAG_UNSAFE_STMT_PRINTED))
 
2836
    {
 
2837
      char warn_buf[DRIZZLE_ERRMSG_SIZE];
 
2838
      snprintf(warn_buf, DRIZZLE_ERRMSG_SIZE, "%s Statement: %s",
 
2839
               ER(ER_BINLOG_UNSAFE_STATEMENT), this->query);
 
2840
      sql_print_warning(warn_buf);
 
2841
      binlog_flags|= BINLOG_FLAG_UNSAFE_STMT_PRINTED;
 
2842
    }
 
2843
  }
 
2844
 
 
2845
  switch (qtype) {
 
2846
  case THD::ROW_QUERY_TYPE:
 
2847
    if (current_stmt_binlog_row_based)
 
2848
      return(0);
 
2849
    /* Otherwise, we fall through */
 
2850
  case THD::DRIZZLE_QUERY_TYPE:
 
2851
    /*
 
2852
      Using this query type is a conveniece hack, since we have been
 
2853
      moving back and forth between using RBR for replication of
 
2854
      system tables and not using it.
 
2855
 
 
2856
      Make sure to change in check_table_binlog_row_based() according
 
2857
      to how you treat this.
 
2858
    */
 
2859
  case THD::STMT_QUERY_TYPE:
 
2860
    /*
 
2861
      The DRIZZLE_LOG::write() function will set the STMT_END_F flag and
 
2862
      flush the pending rows event if necessary.
 
2863
     */
 
2864
    {
 
2865
      Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use,
 
2866
                            killed_status_arg);
 
2867
      qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 
2868
      /*
 
2869
        Binlog table maps will be irrelevant after a Query_log_event
 
2870
        (they are just removed on the slave side) so after the query
 
2871
        log event is written to the binary log, we pretend that no
 
2872
        table maps were written.
 
2873
       */
 
2874
      int error= mysql_bin_log.write(&qinfo);
 
2875
      binlog_table_maps= 0;
 
2876
      return(error);
 
2877
    }
 
2878
    break;
 
2879
 
 
2880
  case THD::QUERY_TYPE_COUNT:
 
2881
  default:
 
2882
    assert(0 <= qtype && qtype < QUERY_TYPE_COUNT);
 
2883
  }
 
2884
  return(0);
 
2885
}
 
2886
 
 
2887
bool Discrete_intervals_list::append(uint64_t start, uint64_t val,
 
2888
                                 uint64_t incr)
 
2889
{
 
2890
  /* first, see if this can be merged with previous */
 
2891
  if ((head == NULL) || tail->merge_if_contiguous(start, val, incr))
 
2892
  {
 
2893
    /* it cannot, so need to add a new interval */
 
2894
    Discrete_interval *new_interval= new Discrete_interval(start, val, incr);
 
2895
    return(append(new_interval));
 
2896
  }
 
2897
  return(0);
 
2898
}
 
2899
 
 
2900
bool Discrete_intervals_list::append(Discrete_interval *new_interval)
 
2901
{
 
2902
  if (unlikely(new_interval == NULL))
 
2903
    return(1);
 
2904
  if (head == NULL)
 
2905
    head= current= new_interval;
 
2906
  else
 
2907
    tail->next= new_interval;
 
2908
  tail= new_interval;
 
2909
  elements++;
 
2910
  return(0);
 
2911
}
 
2912
 
 
2913
#endif /* !defined(DRIZZLE_CLIENT) */