~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_insert.cc

  • Committer: Monty Taylor
  • Date: 2008-11-07 00:15:51 UTC
  • mto: This revision was merged to the branch mainline in revision 579.
  • Revision ID: monty@inaugust.com-20081107001551-8vxb6sf1ti0i5p09
Cleaned up some headers for PCH.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* Insert of records */
18
18
 
19
 
#include "config.h"
20
 
#include <cstdio>
 
19
/*
 
20
  INSERT DELAYED
 
21
 
 
22
  Drizzle has a different form of DELAYED then MySQL. DELAYED is just
 
23
  a hint to the the sorage engine (which can then do whatever it likes.
 
24
*/
 
25
#include <drizzled/server_includes.h>
21
26
#include <drizzled/sql_select.h>
22
 
#include <drizzled/show.h>
 
27
#include <drizzled/sql_show.h>
 
28
#include <drizzled/rpl_mi.h>
23
29
#include <drizzled/error.h>
24
 
#include <drizzled/name_resolution_context_state.h>
 
30
#include <drizzled/slave.h>
 
31
#include <drizzled/sql_parse.h>
25
32
#include <drizzled/probes.h>
26
 
#include <drizzled/sql_base.h>
27
 
#include <drizzled/sql_load.h>
28
 
#include <drizzled/field/epoch.h>
29
 
#include <drizzled/lock.h>
30
 
#include "drizzled/sql_table.h"
31
 
#include "drizzled/pthread_globals.h"
32
 
#include "drizzled/transaction_services.h"
33
 
#include "drizzled/plugin/transactional_storage_engine.h"
34
 
 
35
 
#include "drizzled/table/shell.h"
36
 
 
37
 
namespace drizzled
38
 
{
39
 
 
40
 
extern plugin::StorageEngine *heap_engine;
41
 
extern plugin::StorageEngine *myisam_engine;
 
33
#include <drizzled/tableop_hooks.h>
 
34
 
 
35
/* Define to force use of my_malloc() if the allocated memory block is big */
 
36
 
 
37
#ifndef HAVE_ALLOCA
 
38
#define my_safe_alloca(size, min_length) my_alloca(size)
 
39
#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
 
40
#else
 
41
#define my_safe_alloca(size, min_length) ((size <= min_length) ? my_alloca(size) : malloc(size))
 
42
#define my_safe_afree(ptr, size, min_length) if (size > min_length) free(ptr)
 
43
#endif
 
44
 
 
45
 
42
46
 
43
47
/*
44
48
  Check if insert fields are correct.
64
68
static int check_insert_fields(Session *session, TableList *table_list,
65
69
                               List<Item> &fields, List<Item> &values,
66
70
                               bool check_unique,
67
 
                               table_map *)
 
71
                               table_map *map __attribute__((unused)))
68
72
{
69
73
  Table *table= table_list->table;
70
74
 
71
75
  if (fields.elements == 0 && values.elements != 0)
72
76
  {
73
 
    if (values.elements != table->getShare()->sizeFields())
 
77
    if (values.elements != table->s->fields)
74
78
    {
75
79
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
76
80
      return -1;
81
85
      No fields are provided so all fields must be provided in the values.
82
86
      Thus we set all bits in the write set.
83
87
    */
84
 
    table->setWriteSet();
 
88
    bitmap_set_all(table->write_set);
85
89
  }
86
90
  else
87
91
  {                                             // Part field list
88
 
    Select_Lex *select_lex= &session->lex->select_lex;
 
92
    SELECT_LEX *select_lex= &session->lex->select_lex;
89
93
    Name_resolution_context *context= &select_lex->context;
90
94
    Name_resolution_context_state ctx_state;
91
95
    int res;
122
126
    }
123
127
    if (table->timestamp_field) // Don't automaticly set timestamp if used
124
128
    {
125
 
      if (table->timestamp_field->isWriteSet())
126
 
      {
 
129
      if (bitmap_is_set(table->write_set,
 
130
                        table->timestamp_field->field_index))
127
131
        clear_timestamp_auto_bits(table->timestamp_field_type,
128
132
                                  TIMESTAMP_AUTO_SET_ON_INSERT);
129
 
      }
130
133
      else
131
134
      {
132
 
        table->setWriteSet(table->timestamp_field->position());
 
135
        bitmap_set_bit(table->write_set,
 
136
                       table->timestamp_field->field_index);
133
137
      }
134
138
    }
 
139
    /* Mark all virtual columns for write*/
 
140
    if (table->vfield)
 
141
      table->mark_virtual_columns();
135
142
  }
136
143
 
137
144
  return 0;
159
166
 
160
167
static int check_update_fields(Session *session, TableList *insert_table_list,
161
168
                               List<Item> &update_fields,
162
 
                               table_map *)
 
169
                               table_map *map __attribute__((unused)))
163
170
{
164
171
  Table *table= insert_table_list->table;
165
172
  bool timestamp_mark= false;
170
177
      Unmark the timestamp field so that we can check if this is modified
171
178
      by update_fields
172
179
    */
173
 
    timestamp_mark= table->write_set->test(table->timestamp_field->position());
174
 
    table->write_set->reset(table->timestamp_field->position());
 
180
    timestamp_mark= bitmap_test_and_clear(table->write_set,
 
181
                                          table->timestamp_field->field_index);
175
182
  }
176
183
 
177
184
  /* Check the fields we are going to modify */
181
188
  if (table->timestamp_field)
182
189
  {
183
190
    /* Don't set timestamp column if this is modified. */
184
 
    if (table->timestamp_field->isWriteSet())
185
 
    {
 
191
    if (bitmap_is_set(table->write_set,
 
192
                      table->timestamp_field->field_index))
186
193
      clear_timestamp_auto_bits(table->timestamp_field_type,
187
194
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
188
 
    }
189
 
 
190
195
    if (timestamp_mark)
191
 
    {
192
 
      table->setWriteSet(table->timestamp_field->position());
193
 
    }
 
196
      bitmap_set_bit(table->write_set,
 
197
                     table->timestamp_field->field_index);
194
198
  }
195
199
  return 0;
196
200
}
205
209
*/
206
210
 
207
211
static
208
 
void upgrade_lock_type(Session *,
 
212
void upgrade_lock_type(Session *session __attribute__((unused)),
209
213
                       thr_lock_type *lock_type,
210
214
                       enum_duplicates duplic,
211
 
                       bool )
 
215
                       bool is_multi_insert __attribute__((unused)))
212
216
{
213
217
  if (duplic == DUP_UPDATE ||
214
218
      (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
227
231
  end of dispatch_command().
228
232
*/
229
233
 
230
 
bool insert_query(Session *session,TableList *table_list,
 
234
bool mysql_insert(Session *session,TableList *table_list,
231
235
                  List<Item> &fields,
232
236
                  List<List_item> &values_list,
233
237
                  List<Item> &update_fields,
238
242
  int error;
239
243
  bool transactional_table, joins_freed= false;
240
244
  bool changed;
 
245
  bool was_insert_delayed= (table_list->lock_type ==  TL_WRITE_DELAYED);
241
246
  uint32_t value_count;
242
247
  ulong counter = 1;
243
248
  uint64_t id;
244
 
  CopyInfo info;
 
249
  COPY_INFO info;
245
250
  Table *table= 0;
246
251
  List_iterator_fast<List_item> its(values_list);
247
252
  List_item *values;
249
254
  Name_resolution_context_state ctx_state;
250
255
  thr_lock_type lock_type;
251
256
  Item *unused_conds= 0;
252
 
 
 
257
  
253
258
 
254
259
  /*
255
260
    Upgrade lock type if the requested lock is incompatible with
258
263
  upgrade_lock_type(session, &table_list->lock_type, duplic,
259
264
                    values_list.elements > 1);
260
265
 
261
 
  if (session->openTablesLock(table_list))
 
266
  /*
 
267
    We can't write-delayed into a table locked with LOCK TABLES:
 
268
    this will lead to a deadlock, since the delayed thread will
 
269
    never be able to get a lock on the table. QQQ: why not
 
270
    upgrade the lock here instead?
 
271
  */
 
272
  if (table_list->lock_type == TL_WRITE_DELAYED && session->locked_tables &&
 
273
      find_locked_table(session, table_list->db, table_list->table_name))
262
274
  {
263
 
    DRIZZLE_INSERT_DONE(1, 0);
264
 
    return true;
 
275
    my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
 
276
             table_list->table_name);
 
277
    return(true);
265
278
  }
266
279
 
 
280
  {
 
281
    if (open_and_lock_tables(session, table_list))
 
282
      return(true);
 
283
  }
267
284
  lock_type= table_list->lock_type;
268
285
 
269
286
  session->set_proc_info("init");
271
288
  values= its++;
272
289
  value_count= values->elements;
273
290
 
274
 
  if (prepare_insert(session, table_list, table, fields, values,
 
291
  if (mysql_prepare_insert(session, table_list, table, fields, values,
275
292
                           update_fields, update_values, duplic, &unused_conds,
276
293
                           false,
277
294
                           (fields.elements || !value_count ||
278
295
                            (0) != 0), !ignore))
279
 
  {
280
 
    if (table != NULL)
281
 
      table->cursor->ha_release_auto_increment();
282
 
    if (!joins_freed)
283
 
      free_underlaid_joins(session, &session->lex->select_lex);
284
 
    session->setAbortOnWarning(false);
285
 
    DRIZZLE_INSERT_DONE(1, 0);
286
 
    return true;
287
 
  }
 
296
    goto abort;
288
297
 
289
298
  /* mysql_prepare_insert set table_list->table if it was not set */
290
299
  table= table_list->table;
315
324
    if (values->elements != value_count)
316
325
    {
317
326
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
318
 
 
319
 
      if (table != NULL)
320
 
        table->cursor->ha_release_auto_increment();
321
 
      if (!joins_freed)
322
 
        free_underlaid_joins(session, &session->lex->select_lex);
323
 
      session->setAbortOnWarning(false);
324
 
      DRIZZLE_INSERT_DONE(1, 0);
325
 
 
326
 
      return true;
 
327
      goto abort;
327
328
    }
328
329
    if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))
329
 
    {
330
 
      if (table != NULL)
331
 
        table->cursor->ha_release_auto_increment();
332
 
      if (!joins_freed)
333
 
        free_underlaid_joins(session, &session->lex->select_lex);
334
 
      session->setAbortOnWarning(false);
335
 
      DRIZZLE_INSERT_DONE(1, 0);
336
 
      return true;
337
 
    }
 
330
      goto abort;
338
331
  }
339
332
  its.rewind ();
340
 
 
 
333
 
341
334
  /* Restore the current context. */
342
335
  ctx_state.restore_state(context, table_list);
343
336
 
344
337
  /*
345
 
    Fill in the given fields and dump it to the table cursor
 
338
    Fill in the given fields and dump it to the table file
346
339
  */
 
340
  memset(&info, 0, sizeof(info));
347
341
  info.ignore= ignore;
348
342
  info.handle_duplicates=duplic;
349
343
  info.update_fields= &update_fields;
354
348
    For single line insert, generate an error if try to set a NOT NULL field
355
349
    to NULL.
356
350
  */
357
 
  session->count_cuted_fields= ignore ? CHECK_FIELD_WARN : CHECK_FIELD_ERROR_FOR_NULL;
358
 
 
 
351
  session->count_cuted_fields= ((values_list.elements == 1 &&
 
352
                             !ignore) ?
 
353
                            CHECK_FIELD_ERROR_FOR_NULL :
 
354
                            CHECK_FIELD_WARN);
359
355
  session->cuted_fields = 0L;
360
356
  table->next_number_field=table->found_next_number_field;
361
357
 
 
358
  if (session->slave_thread &&
 
359
      (info.handle_duplicates == DUP_UPDATE) &&
 
360
      (table->next_number_field != NULL) &&
 
361
      rpl_master_has_bug(&active_mi->rli, 24432))
 
362
    goto abort;
 
363
 
362
364
  error=0;
363
365
  session->set_proc_info("update");
364
366
  if (duplic == DUP_REPLACE)
365
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
367
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
366
368
  if (duplic == DUP_UPDATE)
367
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
369
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
368
370
  {
369
371
    if (duplic != DUP_ERROR || ignore)
370
 
      table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
371
 
    table->cursor->ha_start_bulk_insert(values_list.elements);
 
372
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
373
    table->file->ha_start_bulk_insert(values_list.elements);
372
374
  }
373
375
 
374
376
 
375
 
  session->setAbortOnWarning(not ignore);
 
377
  session->abort_on_warning= !ignore;
376
378
 
377
379
  table->mark_columns_needed_for_insert();
378
380
 
380
382
  {
381
383
    if (fields.elements || !value_count)
382
384
    {
383
 
      table->restoreRecordAsDefault();  // Get empty record
384
 
      if (fill_record(session, fields, *values))
 
385
      restore_record(table,s->default_values);  // Get empty record
 
386
      if (fill_record(session, fields, *values, 0))
385
387
      {
386
388
        if (values_list.elements != 1 && ! session->is_error())
387
389
        {
399
401
    }
400
402
    else
401
403
    {
402
 
      table->restoreRecordAsDefault();  // Get empty record
403
 
 
404
 
      if (fill_record(session, table->getFields(), *values))
 
404
      if (session->used_tables)                 // Column used in values()
 
405
        restore_record(table,s->default_values);        // Get empty record
 
406
      else
 
407
      {
 
408
        /*
 
409
          Fix delete marker. No need to restore rest of record since it will
 
410
          be overwritten by fill_record() anyway (and fill_record() does not
 
411
          use default values in this case).
 
412
        */
 
413
        table->record[0][0]= table->s->default_values[0];
 
414
      }
 
415
      if (fill_record(session, table->field, *values, 0))
405
416
      {
406
417
        if (values_list.elements != 1 && ! session->is_error())
407
418
        {
413
424
      }
414
425
    }
415
426
 
416
 
    // Release latches in case bulk insert takes a long time
417
 
    plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
418
 
 
419
427
    error=write_record(session, table ,&info);
420
428
    if (error)
421
429
      break;
434
442
      Do not do this release if this is a delayed insert, it would steal
435
443
      auto_inc values from the delayed_insert thread as they share Table.
436
444
    */
437
 
    table->cursor->ha_release_auto_increment();
438
 
    if (table->cursor->ha_end_bulk_insert() && !error)
 
445
    table->file->ha_release_auto_increment();
 
446
    if (table->file->ha_end_bulk_insert() && !error)
439
447
    {
440
 
      table->print_error(errno,MYF(0));
 
448
      table->file->print_error(my_errno,MYF(0));
441
449
      error=1;
442
450
    }
443
451
    if (duplic != DUP_ERROR || ignore)
444
 
      table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
445
 
 
446
 
    transactional_table= table->cursor->has_transactions();
447
 
 
448
 
    changed= (info.copied || info.deleted || info.updated);
449
 
    if ((changed && error <= 0) || session->transaction.stmt.hasModifiedNonTransData())
450
 
    {
451
 
      if (session->transaction.stmt.hasModifiedNonTransData())
452
 
        session->transaction.all.markModifiedNonTransData();
453
 
    }
454
 
    assert(transactional_table || !changed || session->transaction.stmt.hasModifiedNonTransData());
 
452
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
453
 
 
454
    transactional_table= table->file->has_transactions();
 
455
 
 
456
    if ((changed= (info.copied || info.deleted || info.updated)))
 
457
    {
 
458
      /*
 
459
        Invalidate the table in the query cache if something changed.
 
460
        For the transactional algorithm to work the invalidation must be
 
461
        before binlog writing and ha_autocommit_or_rollback
 
462
      */
 
463
    }
 
464
    if ((changed && error <= 0) || session->transaction.stmt.modified_non_trans_table || was_insert_delayed)
 
465
    {
 
466
      if (mysql_bin_log.is_open())
 
467
      {
 
468
        if (error <= 0)
 
469
        {
 
470
          /*
 
471
            [Guilhem wrote] Temporary errors may have filled
 
472
            session->net.last_error/errno.  For example if there has
 
473
            been a disk full error when writing the row, and it was
 
474
            MyISAM, then session->net.last_error/errno will be set to
 
475
            "disk full"... and the my_pwrite() will wait until free
 
476
            space appears, and so when it finishes then the
 
477
            write_row() was entirely successful
 
478
          */
 
479
          /* todo: consider removing */
 
480
          session->clear_error();
 
481
        }
 
482
        /* bug#22725:
 
483
 
 
484
        A query which per-row-loop can not be interrupted with
 
485
        KILLED, like INSERT, and that does not invoke stored
 
486
        routines can be binlogged with neglecting the KILLED error.
 
487
        
 
488
        If there was no error (error == zero) until after the end of
 
489
        inserting loop the KILLED flag that appeared later can be
 
490
        disregarded since previously possible invocation of stored
 
491
        routines did not result in any error due to the KILLED.  In
 
492
        such case the flag is ignored for constructing binlog event.
 
493
        */
 
494
        assert(session->killed != Session::KILL_BAD_DATA || error > 0);
 
495
        if (session->binlog_query(Session::ROW_QUERY_TYPE,
 
496
                              session->query, session->query_length,
 
497
                              transactional_table, false,
 
498
                              (error>0) ? session->killed : Session::NOT_KILLED) &&
 
499
            transactional_table)
 
500
        {
 
501
          error=1;
 
502
        }
 
503
      }
 
504
      if (session->transaction.stmt.modified_non_trans_table)
 
505
        session->transaction.all.modified_non_trans_table= true;
 
506
    }
 
507
    assert(transactional_table || !changed || 
 
508
                session->transaction.stmt.modified_non_trans_table);
455
509
 
456
510
  }
457
511
  session->set_proc_info("end");
475
529
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
476
530
  table->auto_increment_field_not_null= false;
477
531
  if (duplic == DUP_REPLACE)
478
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
532
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
479
533
 
480
534
  if (error)
481
 
  {
482
 
    if (table != NULL)
483
 
      table->cursor->ha_release_auto_increment();
484
 
    if (!joins_freed)
485
 
      free_underlaid_joins(session, &session->lex->select_lex);
486
 
    session->setAbortOnWarning(false);
487
 
    DRIZZLE_INSERT_DONE(1, 0);
488
 
    return true;
489
 
  }
490
 
 
 
535
    goto abort;
491
536
  if (values_list.elements == 1 && (!(session->options & OPTION_WARNINGS) ||
492
537
                                    !session->cuted_fields))
493
538
  {
494
 
    session->row_count_func= info.copied + info.deleted + info.updated;
495
 
    session->my_ok((ulong) session->rowCount(),
496
 
                   info.copied + info.deleted + info.touched, id);
 
539
    session->row_count_func= info.copied + info.deleted +
 
540
                         ((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
541
                          info.touched : info.updated);
 
542
    my_ok(session, (ulong) session->row_count_func, id);
497
543
  }
498
544
  else
499
545
  {
500
546
    char buff[160];
 
547
    ha_rows updated=((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
548
                     info.touched : info.updated);
501
549
    if (ignore)
502
 
      snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
550
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
503
551
              (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
504
552
    else
505
 
      snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
506
 
              (ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
507
 
    session->row_count_func= info.copied + info.deleted + info.updated;
508
 
    session->my_ok((ulong) session->rowCount(),
509
 
                   info.copied + info.deleted + info.touched, id, buff);
 
553
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
 
554
              (ulong) (info.deleted + updated), (ulong) session->cuted_fields);
 
555
    session->row_count_func= info.copied + info.deleted + updated;
 
556
    ::my_ok(session, (ulong) session->row_count_func, id, buff);
510
557
  }
511
 
  session->status_var.inserted_row_count+= session->rowCount();
512
 
  session->setAbortOnWarning(false);
513
 
  DRIZZLE_INSERT_DONE(0, session->rowCount());
 
558
  session->abort_on_warning= 0;
 
559
  DRIZZLE_INSERT_END();
 
560
  return(false);
514
561
 
515
 
  return false;
 
562
abort:
 
563
  if (table != NULL)
 
564
    table->file->ha_release_auto_increment();
 
565
  if (!joins_freed)
 
566
    free_underlaid_joins(session, &session->lex->select_lex);
 
567
  session->abort_on_warning= 0;
 
568
  DRIZZLE_INSERT_END();
 
569
  return(true);
516
570
}
517
571
 
518
572
 
520
574
  Check if table can be updated
521
575
 
522
576
  SYNOPSIS
523
 
     prepare_insert_check_table()
 
577
     mysql_prepare_insert_check_table()
524
578
     session            Thread handle
525
579
     table_list         Table list
526
580
     fields             List of fields to be updated
532
586
     true  ERROR
533
587
*/
534
588
 
535
 
static bool prepare_insert_check_table(Session *session, TableList *table_list,
536
 
                                             List<Item> &,
 
589
static bool mysql_prepare_insert_check_table(Session *session, TableList *table_list,
 
590
                                             List<Item> &fields __attribute__((unused)),
537
591
                                             bool select_insert)
538
592
{
539
 
 
 
593
  
540
594
 
541
595
  /*
542
596
     first table in list is the one we'll INSERT into, requires INSERT_ACL.
560
614
  Prepare items in INSERT statement
561
615
 
562
616
  SYNOPSIS
563
 
    prepare_insert()
 
617
    mysql_prepare_insert()
564
618
    session                     Thread handler
565
619
    table_list          Global/local table list
566
620
    table               Table to insert into (can be NULL if table should
567
 
                        be taken from table_list->table)
 
621
                        be taken from table_list->table)    
568
622
    where               Where clause (for insert ... select)
569
623
    select_insert       true if INSERT ... SELECT statement
570
 
    check_fields        true if need to check that all INSERT fields are
 
624
    check_fields        true if need to check that all INSERT fields are 
571
625
                        given values.
572
 
    abort_on_warning    whether to report if some INSERT field is not
 
626
    abort_on_warning    whether to report if some INSERT field is not 
573
627
                        assigned as an error (true) or as a warning (false).
574
628
 
575
629
  TODO (in far future)
581
635
  WARNING
582
636
    You MUST set table->insert_values to 0 after calling this function
583
637
    before releasing the table object.
584
 
 
 
638
  
585
639
  RETURN VALUE
586
640
    false OK
587
641
    true  error
588
642
*/
589
643
 
590
 
bool prepare_insert(Session *session, TableList *table_list,
 
644
bool mysql_prepare_insert(Session *session, TableList *table_list,
591
645
                          Table *table, List<Item> &fields, List_item *values,
592
646
                          List<Item> &update_fields, List<Item> &update_values,
593
647
                          enum_duplicates duplic,
594
 
                          COND **,
 
648
                          COND **where __attribute__((unused)),
595
649
                          bool select_insert,
596
650
                          bool check_fields, bool abort_on_warning)
597
651
{
598
 
  Select_Lex *select_lex= &session->lex->select_lex;
 
652
  SELECT_LEX *select_lex= &session->lex->select_lex;
599
653
  Name_resolution_context *context= &select_lex->context;
600
654
  Name_resolution_context_state ctx_state;
601
655
  bool insert_into_view= (0 != 0);
602
656
  bool res= 0;
603
657
  table_map map= 0;
604
 
 
 
658
  
605
659
  /* INSERT should have a SELECT or VALUES clause */
606
660
  assert (!select_insert || !values);
607
661
 
608
662
  /*
609
663
    For subqueries in VALUES() we should not see the table in which we are
610
664
    inserting (for INSERT ... SELECT this is done by changing table_list,
611
 
    because INSERT ... SELECT share Select_Lex it with SELECT.
 
665
    because INSERT ... SELECT share SELECT_LEX it with SELECT.
612
666
  */
613
 
  if (not select_insert)
 
667
  if (!select_insert)
614
668
  {
615
 
    for (Select_Lex_Unit *un= select_lex->first_inner_unit();
 
669
    for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
616
670
         un;
617
671
         un= un->next_unit())
618
672
    {
619
 
      for (Select_Lex *sl= un->first_select();
 
673
      for (SELECT_LEX *sl= un->first_select();
620
674
           sl;
621
675
           sl= sl->next_select())
622
676
      {
632
686
      return(true);
633
687
  }
634
688
 
635
 
  if (prepare_insert_check_table(session, table_list, fields, select_insert))
 
689
  if (mysql_prepare_insert_check_table(session, table_list, fields, select_insert))
636
690
    return(true);
637
691
 
638
692
 
658
712
 
659
713
    if (!res && check_fields)
660
714
    {
661
 
      bool saved_abort_on_warning= session->abortOnWarning();
662
 
 
663
 
      session->setAbortOnWarning(abort_on_warning);
664
 
      res= check_that_all_fields_are_given_values(session,
665
 
                                                  table ? table :
 
715
      bool saved_abort_on_warning= session->abort_on_warning;
 
716
      session->abort_on_warning= abort_on_warning;
 
717
      res= check_that_all_fields_are_given_values(session, 
 
718
                                                  table ? table : 
666
719
                                                  context->table_list->table,
667
720
                                                  context->table_list);
668
 
      session->setAbortOnWarning(saved_abort_on_warning);
 
721
      session->abort_on_warning= saved_abort_on_warning;
669
722
    }
670
723
 
671
724
    if (!res && duplic == DUP_UPDATE)
676
729
    /* Restore the current context. */
677
730
    ctx_state.restore_state(context, table_list);
678
731
 
679
 
    if (not res)
 
732
    if (!res)
680
733
      res= setup_fields(session, 0, update_values, MARK_COLUMNS_READ, 0, 0);
681
734
  }
682
735
 
683
736
  if (res)
684
737
    return(res);
685
738
 
686
 
  if (not table)
 
739
  if (!table)
687
740
    table= table_list->table;
688
741
 
689
 
  if (not select_insert)
 
742
  if (!select_insert)
690
743
  {
691
744
    TableList *duplicate;
692
 
    if ((duplicate= unique_table(table_list, table_list->next_global, true)))
 
745
    if ((duplicate= unique_table(session, table_list, table_list->next_global, 1)))
693
746
    {
694
 
      my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->alias);
695
 
 
696
 
      return true;
 
747
      update_non_unique_table_error(table_list, "INSERT", duplicate);
 
748
      return(true);
697
749
    }
698
750
  }
699
 
 
700
751
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
701
752
    table->prepare_for_position();
702
 
 
703
 
  return false;
 
753
  return(false);
704
754
}
705
755
 
706
756
 
708
758
 
709
759
static int last_uniq_key(Table *table,uint32_t keynr)
710
760
{
711
 
  while (++keynr < table->getShare()->sizeKeys())
 
761
  while (++keynr < table->s->keys)
712
762
    if (table->key_info[keynr].flags & HA_NOSAME)
713
763
      return 0;
714
764
  return 1;
723
773
     write_record()
724
774
      session   - thread context
725
775
      table - table to which record should be written
726
 
      info  - CopyInfo structure describing handling of duplicates
 
776
      info  - COPY_INFO structure describing handling of duplicates
727
777
              and which is used for counting number of records inserted
728
778
              and deleted.
729
779
 
733
783
    then both on update triggers will work instead. Similarly both on
734
784
    delete triggers will be invoked if we will delete conflicting records.
735
785
 
736
 
    Sets session->transaction.stmt.modified_non_trans_data to true if table which is updated didn't have
 
786
    Sets session->transaction.stmt.modified_non_trans_table to true if table which is updated didn't have
737
787
    transactions.
738
788
 
739
789
  RETURN VALUE
742
792
*/
743
793
 
744
794
 
745
 
int write_record(Session *session, Table *table,CopyInfo *info)
 
795
int write_record(Session *session, Table *table,COPY_INFO *info)
746
796
{
747
797
  int error;
748
 
  std::vector<unsigned char> key;
749
 
  boost::dynamic_bitset<> *save_read_set, *save_write_set;
750
 
  uint64_t prev_insert_id= table->cursor->next_insert_id;
 
798
  char *key=0;
 
799
  MY_BITMAP *save_read_set, *save_write_set;
 
800
  uint64_t prev_insert_id= table->file->next_insert_id;
751
801
  uint64_t insert_id_for_cur_row= 0;
752
 
 
 
802
  
753
803
 
754
804
  info->records++;
755
805
  save_read_set=  table->read_set;
756
806
  save_write_set= table->write_set;
757
807
 
758
 
  if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE)
 
808
  if (info->handle_duplicates == DUP_REPLACE ||
 
809
      info->handle_duplicates == DUP_UPDATE)
759
810
  {
760
 
    while ((error=table->cursor->insertRecord(table->getInsertRecord())))
 
811
    while ((error=table->file->ha_write_row(table->record[0])))
761
812
    {
762
813
      uint32_t key_nr;
763
814
      /*
767
818
        the autogenerated value to avoid session->insert_id_for_cur_row to become
768
819
        0.
769
820
      */
770
 
      if (table->cursor->insert_id_for_cur_row > 0)
771
 
        insert_id_for_cur_row= table->cursor->insert_id_for_cur_row;
 
821
      if (table->file->insert_id_for_cur_row > 0)
 
822
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
772
823
      else
773
 
        table->cursor->insert_id_for_cur_row= insert_id_for_cur_row;
 
824
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
774
825
      bool is_duplicate_key_error;
775
 
      if (table->cursor->is_fatal_error(error, HA_CHECK_DUP))
 
826
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
776
827
        goto err;
777
 
      is_duplicate_key_error= table->cursor->is_fatal_error(error, 0);
 
828
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
778
829
      if (!is_duplicate_key_error)
779
830
      {
780
831
        /*
786
837
          goto gok_or_after_err; /* Ignoring a not fatal error, return 0 */
787
838
        goto err;
788
839
      }
789
 
      if ((int) (key_nr = table->get_dup_key(error)) < 0)
 
840
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
790
841
      {
791
842
        error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
792
843
        goto err;
800
851
      */
801
852
      if (info->handle_duplicates == DUP_REPLACE &&
802
853
          table->next_number_field &&
803
 
          key_nr == table->getShare()->next_number_index &&
 
854
          key_nr == table->s->next_number_index &&
804
855
          (insert_id_for_cur_row > 0))
805
856
        goto err;
806
 
      if (table->cursor->getEngine()->check_flag(HTON_BIT_DUPLICATE_POS))
 
857
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
807
858
      {
808
 
        if (table->cursor->rnd_pos(table->getUpdateRecord(),table->cursor->dup_ref))
 
859
        if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
809
860
          goto err;
810
861
      }
811
862
      else
812
863
      {
813
 
        if (table->cursor->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
 
864
        if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
814
865
        {
815
 
          error=errno;
 
866
          error=my_errno;
816
867
          goto err;
817
868
        }
818
869
 
819
 
        if (not key.size())
 
870
        if (!key)
820
871
        {
821
 
          key.resize(table->getShare()->max_unique_length);
 
872
          if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
 
873
                                           MAX_KEY_LENGTH)))
 
874
          {
 
875
            error=ENOMEM;
 
876
            goto err;
 
877
          }
822
878
        }
823
 
        key_copy(&key[0], table->getInsertRecord(), table->key_info+key_nr, 0);
824
 
        if ((error=(table->cursor->index_read_idx_map(table->getUpdateRecord(),key_nr,
825
 
                                                    &key[0], HA_WHOLE_KEY,
 
879
        key_copy((unsigned char*) key,table->record[0],table->key_info+key_nr,0);
 
880
        if ((error=(table->file->index_read_idx_map(table->record[1],key_nr,
 
881
                                                    (unsigned char*) key, HA_WHOLE_KEY,
826
882
                                                    HA_READ_KEY_EXACT))))
827
883
          goto err;
828
884
      }
833
889
          that matches, is updated. If update causes a conflict again,
834
890
          an error is returned
835
891
        */
836
 
        assert(table->insert_values.size());
837
 
        table->storeRecordAsInsert();
838
 
        table->restoreRecord();
 
892
        assert(table->insert_values != NULL);
 
893
        store_record(table,insert_values);
 
894
        restore_record(table,record[1]);
839
895
        assert(info->update_fields->elements ==
840
896
                    info->update_values->elements);
841
897
        if (fill_record(session, *info->update_fields,
843
899
                                                 info->ignore))
844
900
          goto before_err;
845
901
 
846
 
        table->cursor->restore_auto_increment(prev_insert_id);
 
902
        table->file->restore_auto_increment(prev_insert_id);
847
903
        if (table->next_number_field)
848
 
          table->cursor->adjust_next_insert_id_after_explicit_value(
 
904
          table->file->adjust_next_insert_id_after_explicit_value(
849
905
            table->next_number_field->val_int());
850
906
        info->touched++;
851
 
 
852
 
        if (! table->records_are_comparable() || table->compare_records())
 
907
        if ((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ &&
 
908
             !bitmap_is_subset(table->write_set, table->read_set)) ||
 
909
            table->compare_record())
853
910
        {
854
 
          if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
855
 
                                                table->getInsertRecord())) &&
 
911
          if ((error=table->file->ha_update_row(table->record[1],
 
912
                                                table->record[0])) &&
856
913
              error != HA_ERR_RECORD_IS_THE_SAME)
857
914
          {
858
915
            if (info->ignore &&
859
 
                !table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
916
                !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
860
917
            {
861
918
              goto gok_or_after_err;
862
919
            }
870
927
          /*
871
928
            If ON DUP KEY UPDATE updates a row instead of inserting one, it's
872
929
            like a regular UPDATE statement: it should not affect the value of a
873
 
            next SELECT LAST_INSERT_ID() or insert_id().
 
930
            next SELECT LAST_INSERT_ID() or mysql_insert_id().
874
931
            Except if LAST_INSERT_ID(#) was in the INSERT query, which is
875
932
            handled separately by Session::arg_of_last_insert_id_function.
876
933
          */
877
 
          insert_id_for_cur_row= table->cursor->insert_id_for_cur_row= 0;
 
934
          insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
878
935
          info->copied++;
879
936
        }
880
937
 
881
938
        if (table->next_number_field)
882
 
          table->cursor->adjust_next_insert_id_after_explicit_value(
 
939
          table->file->adjust_next_insert_id_after_explicit_value(
883
940
            table->next_number_field->val_int());
884
941
        info->touched++;
885
942
 
892
949
          an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
893
950
          InnoDB do not function in the defined way if we allow MySQL
894
951
          to convert the latter operation internally to an UPDATE.
895
 
          We also should not perform this conversion if we have
 
952
          We also should not perform this conversion if we have 
896
953
          timestamp field with ON UPDATE which is different from DEFAULT.
897
954
          Another case when conversion should not be performed is when
898
955
          we have ON DELETE trigger on table so user may notice that
902
959
          ON UPDATE triggers.
903
960
        */
904
961
        if (last_uniq_key(table,key_nr) &&
905
 
            !table->cursor->referenced_by_foreign_key() &&
 
962
            !table->file->referenced_by_foreign_key() &&
906
963
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
907
964
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
908
965
        {
909
 
          if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
910
 
                                                table->getInsertRecord())) &&
 
966
          if ((error=table->file->ha_update_row(table->record[1],
 
967
                                                table->record[0])) &&
911
968
              error != HA_ERR_RECORD_IS_THE_SAME)
912
969
            goto err;
913
970
          if (error != HA_ERR_RECORD_IS_THE_SAME)
914
971
            info->deleted++;
915
972
          else
916
973
            error= 0;
917
 
          session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
974
          session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
918
975
          /*
919
976
            Since we pretend that we have done insert we should call
920
977
            its after triggers.
923
980
        }
924
981
        else
925
982
        {
926
 
          if ((error=table->cursor->deleteRecord(table->getUpdateRecord())))
 
983
          if ((error=table->file->ha_delete_row(table->record[1])))
927
984
            goto err;
928
985
          info->deleted++;
929
 
          if (!table->cursor->has_transactions())
930
 
            session->transaction.stmt.markModifiedNonTransData();
 
986
          if (!table->file->has_transactions())
 
987
            session->transaction.stmt.modified_non_trans_table= true;
931
988
          /* Let us attempt do write_row() once more */
932
989
        }
933
990
      }
934
991
    }
935
 
    session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
992
    session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
936
993
    /*
937
994
      Restore column maps if they where replaced during an duplicate key
938
995
      problem.
939
996
    */
940
997
    if (table->read_set != save_read_set ||
941
998
        table->write_set != save_write_set)
942
 
      table->column_bitmaps_set(*save_read_set, *save_write_set);
 
999
      table->column_bitmaps_set(save_read_set, save_write_set);
943
1000
  }
944
 
  else if ((error=table->cursor->insertRecord(table->getInsertRecord())))
 
1001
  else if ((error=table->file->ha_write_row(table->record[0])))
945
1002
  {
946
1003
    if (!info->ignore ||
947
 
        table->cursor->is_fatal_error(error, HA_CHECK_DUP))
 
1004
        table->file->is_fatal_error(error, HA_CHECK_DUP))
948
1005
      goto err;
949
 
    table->cursor->restore_auto_increment(prev_insert_id);
 
1006
    table->file->restore_auto_increment(prev_insert_id);
950
1007
    goto gok_or_after_err;
951
1008
  }
952
1009
 
953
1010
after_n_copied_inc:
954
1011
  info->copied++;
955
 
  session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
1012
  session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
956
1013
 
957
1014
gok_or_after_err:
958
 
  if (!table->cursor->has_transactions())
959
 
    session->transaction.stmt.markModifiedNonTransData();
 
1015
  if (key)
 
1016
    my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
 
1017
  if (!table->file->has_transactions())
 
1018
    session->transaction.stmt.modified_non_trans_table= true;
960
1019
  return(0);
961
1020
 
962
1021
err:
964
1023
  /* current_select is NULL if this is a delayed insert */
965
1024
  if (session->lex->current_select)
966
1025
    session->lex->current_select->no_error= 0;        // Give error
967
 
  table->print_error(error,MYF(0));
968
 
 
 
1026
  table->file->print_error(error,MYF(0));
 
1027
  
969
1028
before_err:
970
 
  table->cursor->restore_auto_increment(prev_insert_id);
971
 
  table->column_bitmaps_set(*save_read_set, *save_write_set);
972
 
  return 1;
 
1029
  table->file->restore_auto_increment(prev_insert_id);
 
1030
  if (key)
 
1031
    my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
 
1032
  table->column_bitmaps_set(save_read_set, save_write_set);
 
1033
  return(1);
973
1034
}
974
1035
 
975
1036
 
978
1039
******************************************************************************/
979
1040
 
980
1041
int check_that_all_fields_are_given_values(Session *session, Table *entry,
981
 
                                           TableList *)
 
1042
                                           TableList *table_list)
982
1043
{
983
1044
  int err= 0;
 
1045
  MY_BITMAP *write_set= entry->write_set;
984
1046
 
985
 
  for (Field **field=entry->getFields() ; *field ; field++)
 
1047
  for (Field **field=entry->field ; *field ; field++)
986
1048
  {
987
 
    if (((*field)->isWriteSet()) == false)
988
 
    {
989
 
      /*
990
 
       * If the field doesn't have any default value
991
 
       * and there is no actual value specified in the
992
 
       * INSERT statement, throw error ER_NO_DEFAULT_FOR_FIELD.
993
 
       */
994
 
      if (((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
 
1049
    if (!bitmap_is_set(write_set, (*field)->field_index) &&
 
1050
        ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
995
1051
        ((*field)->real_type() != DRIZZLE_TYPE_ENUM))
996
 
      {
997
 
        my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), (*field)->field_name);
998
 
        err= 1;
999
 
      }
1000
 
    }
1001
 
    else
1002
1052
    {
1003
 
      /*
1004
 
       * However, if an actual NULL value was specified
1005
 
       * for the field and the field is a NOT NULL field, 
1006
 
       * throw ER_BAD_NULL_ERROR.
1007
 
       *
1008
 
       * Per the SQL standard, inserting NULL into a NOT NULL
1009
 
       * field requires an error to be thrown.
1010
 
       */
1011
 
      if (((*field)->flags & NOT_NULL_FLAG) &&
1012
 
          (*field)->is_null())
1013
 
      {
1014
 
        my_error(ER_BAD_NULL_ERROR, MYF(0), (*field)->field_name);
1015
 
        err= 1;
1016
 
      }
 
1053
      bool view= false;
 
1054
      if (table_list)
 
1055
      {
 
1056
        table_list= table_list->top_table();
 
1057
        view= test(0);
 
1058
      }
 
1059
      {
 
1060
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
 
1061
                            ER_NO_DEFAULT_FOR_FIELD,
 
1062
                            ER(ER_NO_DEFAULT_FOR_FIELD),
 
1063
                            (*field)->field_name);
 
1064
      }
 
1065
      err= 1;
1017
1066
    }
1018
1067
  }
1019
 
  return session->abortOnWarning() ? err : 0;
 
1068
  return session->abort_on_warning ? err : 0;
1020
1069
}
1021
1070
 
1022
1071
/***************************************************************************
1028
1077
  make insert specific preparation and checks after opening tables
1029
1078
 
1030
1079
  SYNOPSIS
1031
 
    insert_select_prepare()
 
1080
    mysql_insert_select_prepare()
1032
1081
    session         thread handler
1033
1082
 
1034
1083
  RETURN
1036
1085
    true  Error
1037
1086
*/
1038
1087
 
1039
 
bool insert_select_prepare(Session *session)
 
1088
bool mysql_insert_select_prepare(Session *session)
1040
1089
{
1041
1090
  LEX *lex= session->lex;
1042
 
  Select_Lex *select_lex= &lex->select_lex;
1043
 
 
1044
 
  /*
1045
 
    Select_Lex do not belong to INSERT statement, so we can't add WHERE
 
1091
  SELECT_LEX *select_lex= &lex->select_lex;
 
1092
  
 
1093
 
 
1094
  /*
 
1095
    Statement-based replication of INSERT ... SELECT ... LIMIT is not safe
 
1096
    as order of rows is not defined, so in mixed mode we go to row-based.
 
1097
 
 
1098
    Note that we may consider a statement as safe if ORDER BY primary_key
 
1099
    is present or we SELECT a constant. However it may confuse users to
 
1100
    see very similiar statements replicated differently.
 
1101
  */
 
1102
  if (lex->current_select->select_limit)
 
1103
  {
 
1104
    lex->set_stmt_unsafe();
 
1105
    session->set_current_stmt_binlog_row_based_if_mixed();
 
1106
  }
 
1107
  /*
 
1108
    SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
1046
1109
    clause if table is VIEW
1047
1110
  */
1048
 
 
1049
 
  if (prepare_insert(session, lex->query_tables,
 
1111
  
 
1112
  if (mysql_prepare_insert(session, lex->query_tables,
1050
1113
                           lex->query_tables->table, lex->field_list, 0,
1051
1114
                           lex->update_list, lex->value_list,
1052
1115
                           lex->duplicates,
1070
1133
                             List<Item> *update_fields,
1071
1134
                             List<Item> *update_values,
1072
1135
                             enum_duplicates duplic,
1073
 
                             bool ignore_check_option_errors) :
1074
 
  table_list(table_list_par), table(table_par), fields(fields_par),
1075
 
  autoinc_value_of_last_inserted_row(0),
1076
 
  insert_into_view(table_list_par && 0 != 0)
 
1136
                             bool ignore_check_option_errors)
 
1137
  :table_list(table_list_par), table(table_par), fields(fields_par),
 
1138
   autoinc_value_of_last_inserted_row(0),
 
1139
   insert_into_view(table_list_par && 0 != 0)
1077
1140
{
 
1141
  memset(&info, 0, sizeof(info));
1078
1142
  info.handle_duplicates= duplic;
1079
1143
  info.ignore= ignore_check_option_errors;
1080
1144
  info.update_fields= update_fields;
1083
1147
 
1084
1148
 
1085
1149
int
1086
 
select_insert::prepare(List<Item> &values, Select_Lex_Unit *u)
 
1150
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
1087
1151
{
1088
1152
  LEX *lex= session->lex;
1089
1153
  int res;
1090
1154
  table_map map= 0;
1091
 
  Select_Lex *lex_current_select_save= lex->current_select;
1092
 
 
 
1155
  SELECT_LEX *lex_current_select_save= lex->current_select;
 
1156
  
1093
1157
 
1094
1158
  unit= u;
1095
1159
 
1105
1169
 
1106
1170
  if (!res && fields->elements)
1107
1171
  {
1108
 
    bool saved_abort_on_warning= session->abortOnWarning();
1109
 
    session->setAbortOnWarning(not info.ignore);
1110
 
    res= check_that_all_fields_are_given_values(session, table_list->table,
 
1172
    bool saved_abort_on_warning= session->abort_on_warning;
 
1173
    session->abort_on_warning= !info.ignore;
 
1174
    res= check_that_all_fields_are_given_values(session, table_list->table, 
1111
1175
                                                table_list);
1112
 
    session->setAbortOnWarning(saved_abort_on_warning);
 
1176
    session->abort_on_warning= saved_abort_on_warning;
1113
1177
  }
1114
1178
 
1115
1179
  if (info.handle_duplicates == DUP_UPDATE && !res)
1127
1191
    res= res || check_update_fields(session, context->table_list,
1128
1192
                                    *info.update_fields, &map);
1129
1193
    /*
1130
 
      When we are not using GROUP BY and there are no ungrouped aggregate functions
 
1194
      When we are not using GROUP BY and there are no ungrouped aggregate functions 
1131
1195
      we can refer to other tables in the ON DUPLICATE KEY part.
1132
1196
      We use next_name_resolution_table descructively, so check it first (views?)
1133
1197
    */
1138
1202
        We must make a single context out of the two separate name resolution contexts :
1139
1203
        the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
1140
1204
        To do that we must concatenate the two lists
1141
 
      */
1142
 
      table_list->next_name_resolution_table=
 
1205
      */  
 
1206
      table_list->next_name_resolution_table= 
1143
1207
        ctx_state.get_first_name_resolution_table();
1144
1208
 
1145
1209
    res= res || setup_fields(session, 0, *info.update_values,
1179
1243
    Is table which we are changing used somewhere in other parts of
1180
1244
    query
1181
1245
  */
1182
 
  if (unique_table(table_list, table_list->next_global))
 
1246
  if (unique_table(session, table_list, table_list->next_global, 0))
1183
1247
  {
1184
1248
    /* Using same table for INSERT and SELECT */
1185
1249
    lex->current_select->options|= OPTION_BUFFER_RESULT;
1188
1252
  else if (!(lex->current_select->options & OPTION_BUFFER_RESULT))
1189
1253
  {
1190
1254
    /*
1191
 
      We must not yet prepare the result table if it is the same as one of the
1192
 
      source tables (INSERT SELECT). The preparation may disable
 
1255
      We must not yet prepare the result table if it is the same as one of the 
 
1256
      source tables (INSERT SELECT). The preparation may disable 
1193
1257
      indexes on the result table, which may be used during the select, if it
1194
1258
      is the same table (Bug #6034). Do the preparation after the select phase
1195
1259
      in select_insert::prepare2().
1196
1260
      We won't start bulk inserts at all if this statement uses functions or
1197
1261
      should invoke triggers since they may access to the same table too.
1198
1262
    */
1199
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1263
    table->file->ha_start_bulk_insert((ha_rows) 0);
1200
1264
  }
1201
 
  table->restoreRecordAsDefault();              // Get empty record
 
1265
  restore_record(table,s->default_values);              // Get empty record
1202
1266
  table->next_number_field=table->found_next_number_field;
1203
1267
 
 
1268
  if (session->slave_thread &&
 
1269
      (info.handle_duplicates == DUP_UPDATE) &&
 
1270
      (table->next_number_field != NULL) &&
 
1271
      rpl_master_has_bug(&active_mi->rli, 24432))
 
1272
    return(1);
 
1273
 
1204
1274
  session->cuted_fields=0;
1205
 
 
1206
1275
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1207
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1208
 
 
 
1276
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1209
1277
  if (info.handle_duplicates == DUP_REPLACE)
1210
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1211
 
 
 
1278
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1212
1279
  if (info.handle_duplicates == DUP_UPDATE)
1213
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1214
 
 
1215
 
  session->setAbortOnWarning(not info.ignore);
 
1280
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1281
  session->abort_on_warning= !info.ignore;
1216
1282
  table->mark_columns_needed_for_insert();
1217
1283
 
1218
1284
 
1231
1297
    If the result table is the same as one of the source tables (INSERT SELECT),
1232
1298
    the result table is not finally prepared at the join prepair phase.
1233
1299
    Do the final preparation now.
1234
 
 
 
1300
                       
1235
1301
  RETURN
1236
1302
    0   OK
1237
1303
*/
1238
1304
 
1239
1305
int select_insert::prepare2(void)
1240
1306
{
1241
 
 
 
1307
  
1242
1308
  if (session->lex->current_select->options & OPTION_BUFFER_RESULT)
1243
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1309
    table->file->ha_start_bulk_insert((ha_rows) 0);
1244
1310
  return(0);
1245
1311
}
1246
1312
 
1253
1319
 
1254
1320
select_insert::~select_insert()
1255
1321
{
1256
 
 
 
1322
  
1257
1323
  if (table)
1258
1324
  {
1259
1325
    table->next_number_field=0;
1260
1326
    table->auto_increment_field_not_null= false;
1261
 
    table->cursor->ha_reset();
 
1327
    table->file->ha_reset();
1262
1328
  }
1263
1329
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
1264
 
  session->setAbortOnWarning(false);
 
1330
  session->abort_on_warning= 0;
1265
1331
  return;
1266
1332
}
1267
1333
 
1268
1334
 
1269
1335
bool select_insert::send_data(List<Item> &values)
1270
1336
{
1271
 
 
 
1337
  
1272
1338
  bool error=0;
1273
1339
 
1274
1340
  if (unit->offset_limit_cnt)
1275
1341
  {                                             // using limit offset,count
1276
1342
    unit->offset_limit_cnt--;
1277
 
    return false;
 
1343
    return(0);
1278
1344
  }
1279
1345
 
1280
1346
  session->count_cuted_fields= CHECK_FIELD_WARN;        // Calculate cuted fields
1281
1347
  store_values(values);
1282
1348
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
1283
1349
  if (session->is_error())
1284
 
    return true;
1285
 
 
1286
 
  // Release latches in case bulk insert takes a long time
1287
 
  plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
 
1350
    return(1);
1288
1351
 
1289
1352
  error= write_record(session, table, &info);
1290
 
  table->auto_increment_field_not_null= false;
1291
 
 
 
1353
    
1292
1354
  if (!error)
1293
1355
  {
1294
1356
    if (info.handle_duplicates == DUP_UPDATE)
1296
1358
      /*
1297
1359
        Restore fields of the record since it is possible that they were
1298
1360
        changed by ON DUPLICATE KEY UPDATE clause.
1299
 
 
 
1361
    
1300
1362
        If triggers exist then whey can modify some fields which were not
1301
1363
        originally touched by INSERT ... SELECT, so we have to restore
1302
1364
        their original values for the next row.
1303
1365
      */
1304
 
      table->restoreRecordAsDefault();
 
1366
      restore_record(table, s->default_values);
1305
1367
    }
1306
1368
    if (table->next_number_field)
1307
1369
    {
1310
1372
        value we just saw, we may need to send it to client in the end.
1311
1373
      */
1312
1374
      if (session->first_successful_insert_id_in_cur_stmt == 0) // optimization
1313
 
        autoinc_value_of_last_inserted_row=
 
1375
        autoinc_value_of_last_inserted_row= 
1314
1376
          table->next_number_field->val_int();
1315
1377
      /*
1316
1378
        Clear auto-increment field for the next record, if triggers are used
1326
1388
void select_insert::store_values(List<Item> &values)
1327
1389
{
1328
1390
  if (fields->elements)
1329
 
    fill_record(session, *fields, values, true);
 
1391
    fill_record(session, *fields, values, 1);
1330
1392
  else
1331
 
    fill_record(session, table->getFields(), values, true);
 
1393
    fill_record(session, table->field, values, 1);
1332
1394
}
1333
1395
 
1334
 
void select_insert::send_error(drizzled::error_t errcode,const char *err)
 
1396
void select_insert::send_error(uint32_t errcode,const char *err)
1335
1397
{
 
1398
  
 
1399
 
1336
1400
  my_message(errcode, err, MYF(0));
 
1401
 
 
1402
  return;
1337
1403
}
1338
1404
 
1339
1405
 
1340
1406
bool select_insert::send_eof()
1341
1407
{
1342
1408
  int error;
1343
 
  bool const trans_table= table->cursor->has_transactions();
 
1409
  bool const trans_table= table->file->has_transactions();
1344
1410
  uint64_t id;
1345
1411
  bool changed;
1346
 
 
1347
 
  error= table->cursor->ha_end_bulk_insert();
1348
 
  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1349
 
  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1412
  Session::killed_state killed_status= session->killed;
 
1413
  
 
1414
  error= table->file->ha_end_bulk_insert();
 
1415
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1416
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1350
1417
 
1351
1418
  if ((changed= (info.copied || info.deleted || info.updated)))
1352
1419
  {
1353
1420
    /*
1354
1421
      We must invalidate the table in the query cache before binlog writing
1355
 
      and autocommitOrRollback.
 
1422
      and ha_autocommit_or_rollback.
1356
1423
    */
1357
 
    if (session->transaction.stmt.hasModifiedNonTransData())
1358
 
      session->transaction.all.markModifiedNonTransData();
 
1424
    if (session->transaction.stmt.modified_non_trans_table)
 
1425
      session->transaction.all.modified_non_trans_table= true;
1359
1426
  }
1360
 
  assert(trans_table || !changed ||
1361
 
              session->transaction.stmt.hasModifiedNonTransData());
 
1427
  assert(trans_table || !changed || 
 
1428
              session->transaction.stmt.modified_non_trans_table);
1362
1429
 
1363
 
  table->cursor->ha_release_auto_increment();
 
1430
  /*
 
1431
    Write to binlog before commiting transaction.  No statement will
 
1432
    be written by the binlog_query() below in RBR mode.  All the
 
1433
    events are in the transaction cache and will be written when
 
1434
    ha_autocommit_or_rollback() is issued below.
 
1435
  */
 
1436
  if (mysql_bin_log.is_open())
 
1437
  {
 
1438
    if (!error)
 
1439
      session->clear_error();
 
1440
    session->binlog_query(Session::ROW_QUERY_TYPE,
 
1441
                      session->query, session->query_length,
 
1442
                      trans_table, false, killed_status);
 
1443
  }
 
1444
  table->file->ha_release_auto_increment();
1364
1445
 
1365
1446
  if (error)
1366
1447
  {
1367
 
    table->print_error(error,MYF(0));
1368
 
    DRIZZLE_INSERT_SELECT_DONE(error, 0);
1369
 
    return 1;
 
1448
    table->file->print_error(error,MYF(0));
 
1449
    return(1);
1370
1450
  }
1371
1451
  char buff[160];
1372
1452
  if (info.ignore)
1373
 
    snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
1453
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1374
1454
            (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
1375
1455
  else
1376
 
    snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
1456
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1377
1457
            (ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
1378
 
  session->row_count_func= info.copied + info.deleted + info.updated;
 
1458
  session->row_count_func= info.copied + info.deleted +
 
1459
                       ((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
1460
                        info.touched : info.updated);
1379
1461
 
1380
1462
  id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
1381
1463
    session->first_successful_insert_id_in_cur_stmt :
1382
1464
    (session->arg_of_last_insert_id_function ?
1383
1465
     session->first_successful_insert_id_in_prev_stmt :
1384
1466
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
1385
 
  session->my_ok((ulong) session->rowCount(),
1386
 
                 info.copied + info.deleted + info.touched, id, buff);
1387
 
  session->status_var.inserted_row_count+= session->rowCount(); 
1388
 
  DRIZZLE_INSERT_SELECT_DONE(0, session->rowCount());
1389
 
  return 0;
 
1467
  ::my_ok(session, (ulong) session->row_count_func, id, buff);
 
1468
  return(0);
1390
1469
}
1391
1470
 
1392
1471
void select_insert::abort() {
1393
1472
 
1394
 
 
 
1473
  
1395
1474
  /*
1396
1475
    If the creation of the table failed (due to a syntax error, for
1397
1476
    example), no table will have been opened and therefore 'table'
1402
1481
  {
1403
1482
    bool changed, transactional_table;
1404
1483
 
1405
 
    table->cursor->ha_end_bulk_insert();
 
1484
    table->file->ha_end_bulk_insert();
1406
1485
 
1407
1486
    /*
1408
1487
      If at least one row has been inserted/modified and will stay in
1419
1498
      zero, so no check for that is made.
1420
1499
    */
1421
1500
    changed= (info.copied || info.deleted || info.updated);
1422
 
    transactional_table= table->cursor->has_transactions();
 
1501
    transactional_table= table->file->has_transactions();
 
1502
    if (session->transaction.stmt.modified_non_trans_table)
 
1503
    {
 
1504
        if (mysql_bin_log.is_open())
 
1505
          session->binlog_query(Session::ROW_QUERY_TYPE, session->query, session->query_length,
 
1506
                            transactional_table, false);
 
1507
        if (!session->current_stmt_binlog_row_based && !can_rollback_data())
 
1508
          session->transaction.all.modified_non_trans_table= true;
 
1509
    }
1423
1510
    assert(transactional_table || !changed ||
1424
 
                session->transaction.stmt.hasModifiedNonTransData());
1425
 
    table->cursor->ha_release_auto_increment();
1426
 
  }
1427
 
 
1428
 
  if (DRIZZLE_INSERT_SELECT_DONE_ENABLED())
1429
 
  {
1430
 
    DRIZZLE_INSERT_SELECT_DONE(0, info.copied + info.deleted + info.updated);
 
1511
                session->transaction.stmt.modified_non_trans_table);
 
1512
    table->file->ha_release_auto_increment();
1431
1513
  }
1432
1514
 
1433
1515
  return;
1454
1536
      items        in     List of items which should be used to produce rest
1455
1537
                          of fields for the table (corresponding fields will
1456
1538
                          be added to the end of alter_info->create_list)
1457
 
      lock         out    Pointer to the DrizzleLock object for table created
 
1539
      lock         out    Pointer to the DRIZZLE_LOCK object for table created
1458
1540
                          (or open temporary table) will be returned in this
1459
1541
                          parameter. Since this table is not included in
1460
1542
                          Session::lock caller is responsible for explicitly
1464
1546
  NOTES
1465
1547
    This function behaves differently for base and temporary tables:
1466
1548
    - For base table we assume that either table exists and was pre-opened
1467
 
      and locked at openTablesLock() stage (and in this case we just
 
1549
      and locked at open_and_lock_tables() stage (and in this case we just
1468
1550
      emit error or warning and return pre-opened Table object) or special
1469
1551
      placeholder was put in table cache that guarantees that this table
1470
1552
      won't be created or opened until the placeholder will be removed
1482
1564
 
1483
1565
static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
1484
1566
                                      TableList *create_table,
1485
 
                                      message::Table &table_proto,
1486
 
                                      AlterInfo *alter_info,
 
1567
                                      Alter_info *alter_info,
1487
1568
                                      List<Item> *items,
1488
 
                                      bool is_if_not_exists,
1489
 
                                      DrizzleLock **lock,
1490
 
                                      identifier::Table::const_reference identifier)
 
1569
                                      DRIZZLE_LOCK **lock,
 
1570
                                      Tableop_hooks *hooks)
1491
1571
{
1492
 
  TableShare share(message::Table::INTERNAL);
 
1572
  Table tmp_table;              // Used during 'Create_field()'
 
1573
  TABLE_SHARE share;
 
1574
  Table *table= 0;
1493
1575
  uint32_t select_field_count= items->elements;
1494
1576
  /* Add selected items to field list */
1495
1577
  List_iterator_fast<Item> it(*items);
1496
1578
  Item *item;
1497
1579
  Field *tmp_field;
 
1580
  bool not_used;
1498
1581
 
1499
 
  if (not (identifier.isTmp()) && create_table->table->db_stat)
 
1582
  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
 
1583
      create_table->table->db_stat)
1500
1584
  {
1501
 
    /* Table already exists and was open at openTablesLock() stage. */
1502
 
    if (is_if_not_exists)
 
1585
    /* Table already exists and was open at open_and_lock_tables() stage. */
 
1586
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
1503
1587
    {
1504
1588
      create_info->table_existed= 1;            // Mark that table existed
1505
1589
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1506
1590
                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
1507
 
                          create_table->getTableName());
1508
 
      return create_table->table;
 
1591
                          create_table->table_name);
 
1592
      return(create_table->table);
1509
1593
    }
1510
1594
 
1511
 
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1512
 
    return NULL;
 
1595
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
 
1596
    return(0);
1513
1597
  }
1514
1598
 
 
1599
  tmp_table.alias= 0;
 
1600
  tmp_table.timestamp_field= 0;
 
1601
  tmp_table.s= &share;
 
1602
  init_tmp_table_share(session, &share, "", 0, "", "");
 
1603
 
 
1604
  tmp_table.s->db_create_options=0;
 
1605
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
 
1606
  tmp_table.s->db_low_byte_first= 
 
1607
        test(create_info->db_type == myisam_hton ||
 
1608
             create_info->db_type == heap_hton);
 
1609
  tmp_table.null_row= false;
 
1610
  tmp_table.maybe_null= false;
 
1611
 
 
1612
  while ((item=it++))
1515
1613
  {
1516
 
    table::Shell tmp_table(share);              // Used during 'CreateField()'
1517
 
 
1518
 
    if (not table_proto.engine().name().compare("MyISAM"))
1519
 
      tmp_table.getMutableShare()->db_low_byte_first= true;
1520
 
    else if (not table_proto.engine().name().compare("MEMORY"))
1521
 
      tmp_table.getMutableShare()->db_low_byte_first= true;
1522
 
 
1523
 
    tmp_table.in_use= session;
1524
 
 
1525
 
    while ((item=it++))
1526
 
    {
1527
 
      CreateField *cr_field;
1528
 
      Field *field, *def_field;
1529
 
      if (item->type() == Item::FUNC_ITEM)
1530
 
      {
1531
 
        if (item->result_type() != STRING_RESULT)
1532
 
        {
1533
 
          field= item->tmp_table_field(&tmp_table);
1534
 
        }
1535
 
        else
1536
 
        {
1537
 
          field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1538
 
        }
1539
 
      }
 
1614
    Create_field *cr_field;
 
1615
    Field *field, *def_field;
 
1616
    if (item->type() == Item::FUNC_ITEM)
 
1617
      if (item->result_type() != STRING_RESULT)
 
1618
        field= item->tmp_table_field(&tmp_table);
1540
1619
      else
1541
 
      {
1542
 
        field= create_tmp_field(session, &tmp_table, item, item->type(),
1543
 
                                (Item ***) 0, &tmp_field, &def_field, false,
1544
 
                                false, false, 0);
1545
 
      }
1546
 
 
1547
 
      if (!field ||
1548
 
          !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
1549
 
                                            ((Item_field *)item)->field :
1550
 
                                            (Field*) 0))))
1551
 
      {
1552
 
        return NULL;
1553
 
      }
1554
 
 
1555
 
      if (item->maybe_null)
1556
 
      {
1557
 
        cr_field->flags &= ~NOT_NULL_FLAG;
1558
 
      }
1559
 
 
1560
 
      alter_info->create_list.push_back(cr_field);
1561
 
    }
 
1620
        field= item->tmp_table_field_from_field_type(&tmp_table, 0);
 
1621
    else
 
1622
      field= create_tmp_field(session, &tmp_table, item, item->type(),
 
1623
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
 
1624
                              0);
 
1625
    if (!field ||
 
1626
        !(cr_field=new Create_field(field,(item->type() == Item::FIELD_ITEM ?
 
1627
                                           ((Item_field *)item)->field :
 
1628
                                           (Field*) 0))))
 
1629
      return(0);
 
1630
    if (item->maybe_null)
 
1631
      cr_field->flags &= ~NOT_NULL_FLAG;
 
1632
    alter_info->create_list.push_back(cr_field);
1562
1633
  }
1563
1634
 
1564
1635
  /*
1567
1638
    Note that we either creating (or opening existing) temporary table or
1568
1639
    creating base table on which name we have exclusive lock. So code below
1569
1640
    should not cause deadlocks or races.
 
1641
 
 
1642
    We don't log the statement, it will be logged later.
 
1643
 
 
1644
    If this is a HEAP table, the automatic DELETE FROM which is written to the
 
1645
    binlog when a HEAP table is opened for the first time since startup, must
 
1646
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
 
1647
    don't want to delete from it) 2) it would be written before the CREATE
 
1648
    Table, which is a wrong order. So we keep binary logging disabled when we
 
1649
    open_table().
1570
1650
  */
1571
 
  Table *table= 0;
1572
1651
  {
1573
 
    if (not create_table_no_lock(session,
1574
 
                                 identifier,
1575
 
                                 create_info,
1576
 
                                 table_proto,
1577
 
                                 alter_info,
1578
 
                                 false,
1579
 
                                 select_field_count,
1580
 
                                 is_if_not_exists))
 
1652
    tmp_disable_binlog(session);
 
1653
    if (!mysql_create_table_no_lock(session, create_table->db,
 
1654
                                    create_table->table_name,
 
1655
                                    create_info, alter_info, 0,
 
1656
                                    select_field_count, true))
1581
1657
    {
1582
 
      if (create_info->table_existed && not identifier.isTmp())
 
1658
      if (create_info->table_existed &&
 
1659
          !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1583
1660
      {
1584
1661
        /*
1585
1662
          This means that someone created table underneath server
1586
1663
          or it was created via different mysqld front-end to the
1587
1664
          cluster. We don't have much options but throw an error.
1588
1665
        */
1589
 
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1590
 
        return NULL;
 
1666
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
 
1667
        return(0);
1591
1668
      }
1592
1669
 
1593
 
      if (not identifier.isTmp())
 
1670
      if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1594
1671
      {
1595
 
        /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1596
 
        boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
1597
 
 
1598
 
        if (create_table->table)
 
1672
        pthread_mutex_lock(&LOCK_open);
 
1673
        if (reopen_name_locked_table(session, create_table, false))
1599
1674
        {
1600
 
          table::Concurrent *concurrent_table= static_cast<table::Concurrent *>(create_table->table);
1601
 
 
1602
 
          if (concurrent_table->reopen_name_locked_table(create_table, session))
1603
 
          {
1604
 
            (void)plugin::StorageEngine::dropTable(*session, identifier);
1605
 
          }
1606
 
          else
1607
 
          {
1608
 
            table= create_table->table;
1609
 
          }
 
1675
          quick_rm_table(create_info->db_type, create_table->db,
 
1676
                         table_case_name(create_info, create_table->table_name),
 
1677
                         0);
1610
1678
        }
1611
1679
        else
1612
 
        {
1613
 
          (void)plugin::StorageEngine::dropTable(*session, identifier);
1614
 
        }
 
1680
          table= create_table->table;
 
1681
        pthread_mutex_unlock(&LOCK_open);
1615
1682
      }
1616
1683
      else
1617
1684
      {
1618
 
        if (not (table= session->openTable(create_table, (bool*) 0,
1619
 
                                           DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1620
 
            not create_info->table_existed)
 
1685
        if (!(table= open_table(session, create_table, (bool*) 0,
 
1686
                                DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
 
1687
            !create_info->table_existed)
1621
1688
        {
1622
1689
          /*
1623
1690
            This shouldn't happen as creation of temporary table should make
1624
1691
            it preparable for open. But let us do close_temporary_table() here
1625
1692
            just in case.
1626
1693
          */
1627
 
          session->drop_temporary_table(identifier);
 
1694
          drop_temporary_table(session, create_table);
1628
1695
        }
1629
1696
      }
1630
1697
    }
1631
 
    if (not table)                                   // open failed
1632
 
      return NULL;
 
1698
    reenable_binlog(session);
 
1699
    if (!table)                                   // open failed
 
1700
      return(0);
1633
1701
  }
1634
1702
 
1635
1703
  table->reginfo.lock_type=TL_WRITE;
1636
 
  if (not ((*lock)= session->lockTables(&table, 1, DRIZZLE_LOCK_IGNORE_FLUSH)))
 
1704
  hooks->prelock(&table, 1);                    // Call prelock hooks
 
1705
  if (! ((*lock)= mysql_lock_tables(session, &table, 1,
 
1706
                                    DRIZZLE_LOCK_IGNORE_FLUSH, &not_used)) ||
 
1707
        hooks->postlock(&table, 1))
1637
1708
  {
1638
1709
    if (*lock)
1639
1710
    {
1640
 
      session->unlockTables(*lock);
 
1711
      mysql_unlock_tables(session, *lock);
1641
1712
      *lock= 0;
1642
1713
    }
1643
1714
 
1644
 
    if (not create_info->table_existed)
1645
 
      session->drop_open_table(table, identifier);
1646
 
    return NULL;
 
1715
    if (!create_info->table_existed)
 
1716
      drop_open_table(session, table, create_table->db, create_table->table_name);
 
1717
    return(0);
1647
1718
  }
1648
 
 
1649
 
  return table;
 
1719
  return(table);
1650
1720
}
1651
1721
 
1652
1722
 
1653
1723
int
1654
 
select_create::prepare(List<Item> &values, Select_Lex_Unit *u)
 
1724
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
1655
1725
{
1656
 
  DrizzleLock *extra_lock= NULL;
 
1726
  DRIZZLE_LOCK *extra_lock= NULL;
 
1727
  
 
1728
 
 
1729
  Tableop_hooks *hook_ptr= NULL;
1657
1730
  /*
1658
 
    For replication, the CREATE-SELECT statement is written
1659
 
    in two pieces: the first transaction messsage contains 
1660
 
    the CREATE TABLE statement as a CreateTableStatement message
1661
 
    necessary to create the table.
1662
 
    
1663
 
    The second transaction message contains all the InsertStatement
1664
 
    and associated InsertRecords that should go into the table.
 
1731
    For row-based replication, the CREATE-SELECT statement is written
 
1732
    in two pieces: the first one contain the CREATE TABLE statement
 
1733
    necessary to create the table and the second part contain the rows
 
1734
    that should go into the table.
 
1735
 
 
1736
    For non-temporary tables, the start of the CREATE-SELECT
 
1737
    implicitly commits the previous transaction, and all events
 
1738
    forming the statement will be stored the transaction cache. At end
 
1739
    of the statement, the entire statement is committed as a
 
1740
    transaction, and all events are written to the binary log.
 
1741
 
 
1742
    On the master, the table is locked for the duration of the
 
1743
    statement, but since the CREATE part is replicated as a simple
 
1744
    statement, there is no way to lock the table for accesses on the
 
1745
    slave.  Hence, we have to hold on to the CREATE part of the
 
1746
    statement until the statement has finished.
1665
1747
   */
 
1748
  class MY_HOOKS : public Tableop_hooks {
 
1749
  public:
 
1750
    MY_HOOKS(select_create *x, TableList *create_table,
 
1751
             TableList *select_tables)
 
1752
      : ptr(x), all_tables(*create_table)
 
1753
      {
 
1754
        all_tables.next_global= select_tables;
 
1755
      }
 
1756
 
 
1757
  private:
 
1758
    virtual int do_postlock(Table **tables, uint32_t count)
 
1759
    {
 
1760
      Session *session= const_cast<Session*>(ptr->get_session());
 
1761
      if (int error= decide_logging_format(session, &all_tables))
 
1762
        return error;
 
1763
 
 
1764
      Table const *const table = *tables;
 
1765
      if (session->current_stmt_binlog_row_based  &&
 
1766
          !table->s->tmp_table &&
 
1767
          !ptr->get_create_info()->table_existed)
 
1768
      {
 
1769
        ptr->binlog_show_create_table(tables, count);
 
1770
      }
 
1771
      return 0;
 
1772
    }
 
1773
 
 
1774
    select_create *ptr;
 
1775
    TableList all_tables;
 
1776
  };
 
1777
 
 
1778
  MY_HOOKS hooks(this, create_table, select_tables);
 
1779
  hook_ptr= &hooks;
1666
1780
 
1667
1781
  unit= u;
1668
1782
 
1669
 
  if (not (table= create_table_from_items(session, create_info, create_table,
1670
 
                                          table_proto,
1671
 
                                          alter_info, &values,
1672
 
                                          is_if_not_exists,
1673
 
                                          &extra_lock, identifier)))
 
1783
  /*
 
1784
    Start a statement transaction before the create if we are using
 
1785
    row-based replication for the statement.  If we are creating a
 
1786
    temporary table, we need to start a statement transaction.
 
1787
  */
 
1788
  if ((session->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
 
1789
      session->current_stmt_binlog_row_based)
1674
1790
  {
 
1791
    session->binlog_start_trans_and_stmt();
 
1792
  }
 
1793
 
 
1794
  if (!(table= create_table_from_items(session, create_info, create_table,
 
1795
                                       alter_info, &values,
 
1796
                                       &extra_lock, hook_ptr)))
1675
1797
    return(-1);                         // abort() deletes table
1676
 
  }
1677
1798
 
1678
1799
  if (extra_lock)
1679
1800
  {
1680
1801
    assert(m_plock == NULL);
1681
1802
 
1682
 
    if (identifier.isTmp())
 
1803
    if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
1683
1804
      m_plock= &m_lock;
1684
1805
    else
1685
1806
      m_plock= &session->extra_lock;
1687
1808
    *m_plock= extra_lock;
1688
1809
  }
1689
1810
 
1690
 
  if (table->getShare()->sizeFields() < values.elements)
 
1811
  if (table->s->fields < values.elements)
1691
1812
  {
1692
1813
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
1693
1814
    return(-1);
1694
1815
  }
1695
1816
 
1696
1817
 /* First field to copy */
1697
 
  field= table->getFields() + table->getShare()->sizeFields() - values.elements;
 
1818
  field= table->field+table->s->fields - values.elements;
1698
1819
 
1699
1820
  /* Mark all fields that are given values */
1700
1821
  for (Field **f= field ; *f ; f++)
1701
 
  {
1702
 
    table->setWriteSet((*f)->position());
1703
 
  }
 
1822
    bitmap_set_bit(table->write_set, (*f)->field_index);
1704
1823
 
1705
1824
  /* Don't set timestamp if used */
1706
1825
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
1707
1826
  table->next_number_field=table->found_next_number_field;
1708
1827
 
1709
 
  table->restoreRecordAsDefault();      // Get empty record
 
1828
  restore_record(table,s->default_values);      // Get empty record
1710
1829
  session->cuted_fields=0;
1711
1830
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1712
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1713
 
 
 
1831
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1714
1832
  if (info.handle_duplicates == DUP_REPLACE)
1715
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1716
 
 
 
1833
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1717
1834
  if (info.handle_duplicates == DUP_UPDATE)
1718
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1719
 
 
1720
 
  table->cursor->ha_start_bulk_insert((ha_rows) 0);
1721
 
  session->setAbortOnWarning(not info.ignore);
 
1835
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1836
  table->file->ha_start_bulk_insert((ha_rows) 0);
 
1837
  session->abort_on_warning= !info.ignore;
1722
1838
  if (check_that_all_fields_are_given_values(session, table, table_list))
1723
1839
    return(1);
1724
 
 
1725
1840
  table->mark_columns_needed_for_insert();
1726
 
  table->cursor->extra(HA_EXTRA_WRITE_CACHE);
 
1841
  table->file->extra(HA_EXTRA_WRITE_CACHE);
1727
1842
  return(0);
1728
1843
}
1729
1844
 
 
1845
void
 
1846
select_create::binlog_show_create_table(Table **tables, uint32_t count)
 
1847
{
 
1848
  /*
 
1849
    Note 1: In RBR mode, we generate a CREATE TABLE statement for the
 
1850
    created table by calling store_create_info() (behaves as SHOW
 
1851
    CREATE TABLE).  In the event of an error, nothing should be
 
1852
    written to the binary log, even if the table is non-transactional;
 
1853
    therefore we pretend that the generated CREATE TABLE statement is
 
1854
    for a transactional table.  The event will then be put in the
 
1855
    transaction cache, and any subsequent events (e.g., table-map
 
1856
    events and binrow events) will also be put there.  We can then use
 
1857
    ha_autocommit_or_rollback() to either throw away the entire
 
1858
    kaboodle of events, or write them to the binary log.
 
1859
 
 
1860
    We write the CREATE TABLE statement here and not in prepare()
 
1861
    since there potentially are sub-selects or accesses to information
 
1862
    schema that will do a close_thread_tables(), destroying the
 
1863
    statement transaction cache.
 
1864
  */
 
1865
  assert(session->current_stmt_binlog_row_based);
 
1866
  assert(tables && *tables && count > 0);
 
1867
 
 
1868
  char buf[2048];
 
1869
  String query(buf, sizeof(buf), system_charset_info);
 
1870
  int result;
 
1871
  TableList tmp_table_list;
 
1872
 
 
1873
  memset(&tmp_table_list, 0, sizeof(tmp_table_list));
 
1874
  tmp_table_list.table = *tables;
 
1875
  query.length(0);      // Have to zero it since constructor doesn't
 
1876
 
 
1877
  result= store_create_info(session, &tmp_table_list, &query, create_info);
 
1878
  assert(result == 0); /* store_create_info() always return 0 */
 
1879
 
 
1880
  session->binlog_query(Session::STMT_QUERY_TYPE,
 
1881
                    query.ptr(), query.length(),
 
1882
                    /* is_trans */ true,
 
1883
                    /* suppress_use */ false);
 
1884
}
 
1885
 
1730
1886
void select_create::store_values(List<Item> &values)
1731
1887
{
1732
 
  fill_record(session, field, values, true);
 
1888
  fill_record(session, field, values, 1);
1733
1889
}
1734
1890
 
1735
1891
 
1736
 
void select_create::send_error(drizzled::error_t errcode,const char *err)
 
1892
void select_create::send_error(uint32_t errcode,const char *err)
1737
1893
{
 
1894
  
 
1895
 
1738
1896
  /*
1739
1897
    This will execute any rollbacks that are necessary before writing
1740
1898
    the transcation cache.
1746
1904
    written to the binary log.
1747
1905
 
1748
1906
  */
 
1907
  tmp_disable_binlog(session);
1749
1908
  select_insert::send_error(errcode, err);
 
1909
  reenable_binlog(session);
 
1910
 
 
1911
  return;
1750
1912
}
1751
1913
 
1752
1914
 
1762
1924
      tables.  This can fail, but we should unlock the table
1763
1925
      nevertheless.
1764
1926
    */
1765
 
    if (!table->getShare()->getType())
 
1927
    if (!table->s->tmp_table)
1766
1928
    {
1767
 
      TransactionServices &transaction_services= TransactionServices::singleton();
1768
 
      transaction_services.autocommitOrRollback(*session, 0);
1769
 
      (void) session->endActiveTransaction();
 
1929
      ha_autocommit_or_rollback(session, 0);
 
1930
      end_active_trans(session);
1770
1931
    }
1771
1932
 
1772
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1773
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1933
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1934
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1774
1935
    if (m_plock)
1775
1936
    {
1776
 
      session->unlockTables(*m_plock);
 
1937
      mysql_unlock_tables(session, *m_plock);
1777
1938
      *m_plock= NULL;
1778
1939
      m_plock= NULL;
1779
1940
    }
1784
1945
 
1785
1946
void select_create::abort()
1786
1947
{
 
1948
  
 
1949
 
1787
1950
  /*
1788
1951
    In select_insert::abort() we roll back the statement, including
1789
1952
    truncating the transaction cache of the binary log. To do this, we
1799
1962
    of the table succeeded or not, since we need to reset the binary
1800
1963
    log state.
1801
1964
  */
 
1965
  tmp_disable_binlog(session);
1802
1966
  select_insert::abort();
 
1967
  session->transaction.stmt.modified_non_trans_table= false;
 
1968
  reenable_binlog(session);
 
1969
 
1803
1970
 
1804
1971
  if (m_plock)
1805
1972
  {
1806
 
    session->unlockTables(*m_plock);
 
1973
    mysql_unlock_tables(session, *m_plock);
1807
1974
    *m_plock= NULL;
1808
1975
    m_plock= NULL;
1809
1976
  }
1810
1977
 
1811
1978
  if (table)
1812
1979
  {
1813
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1814
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1815
 
    if (not create_info->table_existed)
1816
 
      session->drop_open_table(table, identifier);
1817
 
    table= NULL;                                    // Safety
 
1980
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1981
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1982
    if (!create_info->table_existed)
 
1983
      drop_open_table(session, table, create_table->db, create_table->table_name);
 
1984
    table=0;                                    // Safety
1818
1985
  }
 
1986
  return;
1819
1987
}
1820
1988
 
1821
 
} /* namespace drizzled */
 
1989
 
 
1990
/*****************************************************************************
 
1991
  Instansiate templates
 
1992
*****************************************************************************/
 
1993
 
 
1994
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
1995
template class List_iterator_fast<List_item>;
 
1996
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */