~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_insert.cc

  • Committer: Monty Taylor
  • Date: 2011-02-13 17:26:39 UTC
  • mfrom: (2157.2.2 give-in-to-pkg-config)
  • mto: This revision was merged to the branch mainline in revision 2166.
  • Revision ID: mordred@inaugust.com-20110213172639-nhy7i72sfhoq13ms
Merged in pkg-config fixes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
15
 
16
16
 
17
17
/* Insert of records */
18
18
 
19
 
#include <drizzled/server_includes.h>
 
19
#include "config.h"
 
20
#include <cstdio>
20
21
#include <drizzled/sql_select.h>
21
22
#include <drizzled/show.h>
22
23
#include <drizzled/error.h>
24
25
#include <drizzled/probes.h>
25
26
#include <drizzled/sql_base.h>
26
27
#include <drizzled/sql_load.h>
27
 
#include <drizzled/field/timestamp.h>
 
28
#include <drizzled/field/epoch.h>
28
29
#include <drizzled/lock.h>
29
 
 
 
30
#include "drizzled/sql_table.h"
 
31
#include "drizzled/pthread_globals.h"
 
32
#include "drizzled/transaction_services.h"
 
33
#include "drizzled/plugin/transactional_storage_engine.h"
 
34
 
 
35
#include "drizzled/table/shell.h"
 
36
 
 
37
namespace drizzled
 
38
{
 
39
 
 
40
extern plugin::StorageEngine *heap_engine;
 
41
extern plugin::StorageEngine *myisam_engine;
30
42
 
31
43
/*
32
44
  Check if insert fields are correct.
58
70
 
59
71
  if (fields.elements == 0 && values.elements != 0)
60
72
  {
61
 
    if (values.elements != table->s->fields)
 
73
    if (values.elements != table->getShare()->sizeFields())
62
74
    {
63
75
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
64
76
      return -1;
111
123
    if (table->timestamp_field) // Don't automaticly set timestamp if used
112
124
    {
113
125
      if (table->timestamp_field->isWriteSet())
 
126
      {
114
127
        clear_timestamp_auto_bits(table->timestamp_field_type,
115
128
                                  TIMESTAMP_AUTO_SET_ON_INSERT);
 
129
      }
116
130
      else
117
131
      {
118
 
        table->setWriteSet(table->timestamp_field->field_index);
 
132
        table->setWriteSet(table->timestamp_field->position());
119
133
      }
120
134
    }
121
135
  }
156
170
      Unmark the timestamp field so that we can check if this is modified
157
171
      by update_fields
158
172
    */
159
 
    timestamp_mark= bitmap_test_and_clear(table->write_set,
160
 
                                          table->timestamp_field->field_index);
 
173
    timestamp_mark= table->write_set->test(table->timestamp_field->position());
 
174
    table->write_set->reset(table->timestamp_field->position());
161
175
  }
162
176
 
163
177
  /* Check the fields we are going to modify */
168
182
  {
169
183
    /* Don't set timestamp column if this is modified. */
170
184
    if (table->timestamp_field->isWriteSet())
 
185
    {
171
186
      clear_timestamp_auto_bits(table->timestamp_field_type,
172
187
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
 
188
    }
 
189
 
173
190
    if (timestamp_mark)
174
 
      table->setWriteSet(table->timestamp_field->field_index);
 
191
    {
 
192
      table->setWriteSet(table->timestamp_field->position());
 
193
    }
175
194
  }
176
195
  return 0;
177
196
}
208
227
  end of dispatch_command().
209
228
*/
210
229
 
211
 
bool mysql_insert(Session *session,TableList *table_list,
 
230
bool insert_query(Session *session,TableList *table_list,
212
231
                  List<Item> &fields,
213
232
                  List<List_item> &values_list,
214
233
                  List<Item> &update_fields,
222
241
  uint32_t value_count;
223
242
  ulong counter = 1;
224
243
  uint64_t id;
225
 
  COPY_INFO info;
 
244
  CopyInfo info;
226
245
  Table *table= 0;
227
246
  List_iterator_fast<List_item> its(values_list);
228
247
  List_item *values;
240
259
                    values_list.elements > 1);
241
260
 
242
261
  if (session->openTablesLock(table_list))
 
262
  {
 
263
    DRIZZLE_INSERT_DONE(1, 0);
243
264
    return true;
 
265
  }
244
266
 
245
267
  lock_type= table_list->lock_type;
246
268
 
249
271
  values= its++;
250
272
  value_count= values->elements;
251
273
 
252
 
  if (mysql_prepare_insert(session, table_list, table, fields, values,
 
274
  if (prepare_insert(session, table_list, table, fields, values,
253
275
                           update_fields, update_values, duplic, &unused_conds,
254
276
                           false,
255
277
                           (fields.elements || !value_count ||
256
278
                            (0) != 0), !ignore))
257
 
    goto abort;
 
279
  {
 
280
    if (table != NULL)
 
281
      table->cursor->ha_release_auto_increment();
 
282
    if (!joins_freed)
 
283
      free_underlaid_joins(session, &session->lex->select_lex);
 
284
    session->setAbortOnWarning(false);
 
285
    DRIZZLE_INSERT_DONE(1, 0);
 
286
    return true;
 
287
  }
258
288
 
259
289
  /* mysql_prepare_insert set table_list->table if it was not set */
260
290
  table= table_list->table;
285
315
    if (values->elements != value_count)
286
316
    {
287
317
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
288
 
      goto abort;
 
318
 
 
319
      if (table != NULL)
 
320
        table->cursor->ha_release_auto_increment();
 
321
      if (!joins_freed)
 
322
        free_underlaid_joins(session, &session->lex->select_lex);
 
323
      session->setAbortOnWarning(false);
 
324
      DRIZZLE_INSERT_DONE(1, 0);
 
325
 
 
326
      return true;
289
327
    }
290
328
    if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))
291
 
      goto abort;
 
329
    {
 
330
      if (table != NULL)
 
331
        table->cursor->ha_release_auto_increment();
 
332
      if (!joins_freed)
 
333
        free_underlaid_joins(session, &session->lex->select_lex);
 
334
      session->setAbortOnWarning(false);
 
335
      DRIZZLE_INSERT_DONE(1, 0);
 
336
      return true;
 
337
    }
292
338
  }
293
339
  its.rewind ();
294
340
 
296
342
  ctx_state.restore_state(context, table_list);
297
343
 
298
344
  /*
299
 
    Fill in the given fields and dump it to the table file
 
345
    Fill in the given fields and dump it to the table cursor
300
346
  */
301
 
  memset(&info, 0, sizeof(info));
302
347
  info.ignore= ignore;
303
348
  info.handle_duplicates=duplic;
304
349
  info.update_fields= &update_fields;
309
354
    For single line insert, generate an error if try to set a NOT NULL field
310
355
    to NULL.
311
356
  */
312
 
  session->count_cuted_fields= ((values_list.elements == 1 &&
313
 
                             !ignore) ?
314
 
                            CHECK_FIELD_ERROR_FOR_NULL :
315
 
                            CHECK_FIELD_WARN);
 
357
  session->count_cuted_fields= ignore ? CHECK_FIELD_WARN : CHECK_FIELD_ERROR_FOR_NULL;
 
358
 
316
359
  session->cuted_fields = 0L;
317
360
  table->next_number_field=table->found_next_number_field;
318
361
 
319
362
  error=0;
320
363
  session->set_proc_info("update");
321
364
  if (duplic == DUP_REPLACE)
322
 
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
365
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
323
366
  if (duplic == DUP_UPDATE)
324
 
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
367
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
325
368
  {
326
369
    if (duplic != DUP_ERROR || ignore)
327
 
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
328
 
    table->file->ha_start_bulk_insert(values_list.elements);
 
370
      table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
371
    table->cursor->ha_start_bulk_insert(values_list.elements);
329
372
  }
330
373
 
331
374
 
332
 
  session->abort_on_warning= !ignore;
 
375
  session->setAbortOnWarning(not ignore);
333
376
 
334
377
  table->mark_columns_needed_for_insert();
335
378
 
338
381
    if (fields.elements || !value_count)
339
382
    {
340
383
      table->restoreRecordAsDefault();  // Get empty record
341
 
      if (fill_record(session, fields, *values, 0))
 
384
      if (fill_record(session, fields, *values))
342
385
      {
343
386
        if (values_list.elements != 1 && ! session->is_error())
344
387
        {
356
399
    }
357
400
    else
358
401
    {
359
 
      if (session->used_tables)                 // Column used in values()
360
 
        table->restoreRecordAsDefault();        // Get empty record
361
 
      else
362
 
      {
363
 
        /*
364
 
          Fix delete marker. No need to restore rest of record since it will
365
 
          be overwritten by fill_record() anyway (and fill_record() does not
366
 
          use default values in this case).
367
 
        */
368
 
        table->record[0][0]= table->s->default_values[0];
369
 
      }
370
 
      if (fill_record(session, table->field, *values, 0))
 
402
      table->restoreRecordAsDefault();  // Get empty record
 
403
 
 
404
      if (fill_record(session, table->getFields(), *values))
371
405
      {
372
406
        if (values_list.elements != 1 && ! session->is_error())
373
407
        {
380
414
    }
381
415
 
382
416
    // Release latches in case bulk insert takes a long time
383
 
    ha_release_temporary_latches(session);
 
417
    plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
384
418
 
385
419
    error=write_record(session, table ,&info);
386
420
    if (error)
400
434
      Do not do this release if this is a delayed insert, it would steal
401
435
      auto_inc values from the delayed_insert thread as they share Table.
402
436
    */
403
 
    table->file->ha_release_auto_increment();
404
 
    if (table->file->ha_end_bulk_insert() && !error)
 
437
    table->cursor->ha_release_auto_increment();
 
438
    if (table->cursor->ha_end_bulk_insert() && !error)
405
439
    {
406
 
      table->file->print_error(my_errno,MYF(0));
 
440
      table->print_error(errno,MYF(0));
407
441
      error=1;
408
442
    }
409
443
    if (duplic != DUP_ERROR || ignore)
410
 
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
444
      table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
411
445
 
412
 
    transactional_table= table->file->has_transactions();
 
446
    transactional_table= table->cursor->has_transactions();
413
447
 
414
448
    changed= (info.copied || info.deleted || info.updated);
415
 
    if ((changed && error <= 0) || session->transaction.stmt.modified_non_trans_table)
 
449
    if ((changed && error <= 0) || session->transaction.stmt.hasModifiedNonTransData())
416
450
    {
417
 
      if (session->transaction.stmt.modified_non_trans_table)
418
 
        session->transaction.all.modified_non_trans_table= true;
 
451
      if (session->transaction.stmt.hasModifiedNonTransData())
 
452
        session->transaction.all.markModifiedNonTransData();
419
453
    }
420
 
    assert(transactional_table || !changed || session->transaction.stmt.modified_non_trans_table);
 
454
    assert(transactional_table || !changed || session->transaction.stmt.hasModifiedNonTransData());
421
455
 
422
456
  }
423
457
  session->set_proc_info("end");
441
475
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
442
476
  table->auto_increment_field_not_null= false;
443
477
  if (duplic == DUP_REPLACE)
444
 
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
478
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
445
479
 
446
480
  if (error)
447
 
    goto abort;
 
481
  {
 
482
    if (table != NULL)
 
483
      table->cursor->ha_release_auto_increment();
 
484
    if (!joins_freed)
 
485
      free_underlaid_joins(session, &session->lex->select_lex);
 
486
    session->setAbortOnWarning(false);
 
487
    DRIZZLE_INSERT_DONE(1, 0);
 
488
    return true;
 
489
  }
 
490
 
448
491
  if (values_list.elements == 1 && (!(session->options & OPTION_WARNINGS) ||
449
492
                                    !session->cuted_fields))
450
493
  {
451
494
    session->row_count_func= info.copied + info.deleted + info.updated;
452
 
    session->my_ok((ulong) session->row_count_func,
 
495
    session->my_ok((ulong) session->rowCount(),
453
496
                   info.copied + info.deleted + info.touched, id);
454
497
  }
455
498
  else
456
499
  {
457
500
    char buff[160];
458
501
    if (ignore)
459
 
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
 
502
      snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
460
503
              (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
461
504
    else
462
 
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
 
505
      snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
463
506
              (ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
464
507
    session->row_count_func= info.copied + info.deleted + info.updated;
465
 
    session->my_ok((ulong) session->row_count_func,
 
508
    session->my_ok((ulong) session->rowCount(),
466
509
                   info.copied + info.deleted + info.touched, id, buff);
467
510
  }
468
 
  session->abort_on_warning= 0;
469
 
  DRIZZLE_INSERT_END();
470
 
  return(false);
 
511
  session->status_var.inserted_row_count+= session->rowCount();
 
512
  session->setAbortOnWarning(false);
 
513
  DRIZZLE_INSERT_DONE(0, session->rowCount());
471
514
 
472
 
abort:
473
 
  if (table != NULL)
474
 
    table->file->ha_release_auto_increment();
475
 
  if (!joins_freed)
476
 
    free_underlaid_joins(session, &session->lex->select_lex);
477
 
  session->abort_on_warning= 0;
478
 
  DRIZZLE_INSERT_END();
479
 
  return(true);
 
515
  return false;
480
516
}
481
517
 
482
518
 
484
520
  Check if table can be updated
485
521
 
486
522
  SYNOPSIS
487
 
     mysql_prepare_insert_check_table()
 
523
     prepare_insert_check_table()
488
524
     session            Thread handle
489
525
     table_list         Table list
490
526
     fields             List of fields to be updated
496
532
     true  ERROR
497
533
*/
498
534
 
499
 
static bool mysql_prepare_insert_check_table(Session *session, TableList *table_list,
 
535
static bool prepare_insert_check_table(Session *session, TableList *table_list,
500
536
                                             List<Item> &,
501
537
                                             bool select_insert)
502
538
{
524
560
  Prepare items in INSERT statement
525
561
 
526
562
  SYNOPSIS
527
 
    mysql_prepare_insert()
 
563
    prepare_insert()
528
564
    session                     Thread handler
529
565
    table_list          Global/local table list
530
566
    table               Table to insert into (can be NULL if table should
551
587
    true  error
552
588
*/
553
589
 
554
 
bool mysql_prepare_insert(Session *session, TableList *table_list,
 
590
bool prepare_insert(Session *session, TableList *table_list,
555
591
                          Table *table, List<Item> &fields, List_item *values,
556
592
                          List<Item> &update_fields, List<Item> &update_values,
557
593
                          enum_duplicates duplic,
574
610
    inserting (for INSERT ... SELECT this is done by changing table_list,
575
611
    because INSERT ... SELECT share Select_Lex it with SELECT.
576
612
  */
577
 
  if (!select_insert)
 
613
  if (not select_insert)
578
614
  {
579
615
    for (Select_Lex_Unit *un= select_lex->first_inner_unit();
580
616
         un;
596
632
      return(true);
597
633
  }
598
634
 
599
 
  if (mysql_prepare_insert_check_table(session, table_list, fields, select_insert))
 
635
  if (prepare_insert_check_table(session, table_list, fields, select_insert))
600
636
    return(true);
601
637
 
602
638
 
622
658
 
623
659
    if (!res && check_fields)
624
660
    {
625
 
      bool saved_abort_on_warning= session->abort_on_warning;
626
 
      session->abort_on_warning= abort_on_warning;
 
661
      bool saved_abort_on_warning= session->abortOnWarning();
 
662
 
 
663
      session->setAbortOnWarning(abort_on_warning);
627
664
      res= check_that_all_fields_are_given_values(session,
628
665
                                                  table ? table :
629
666
                                                  context->table_list->table,
630
667
                                                  context->table_list);
631
 
      session->abort_on_warning= saved_abort_on_warning;
 
668
      session->setAbortOnWarning(saved_abort_on_warning);
632
669
    }
633
670
 
634
671
    if (!res && duplic == DUP_UPDATE)
639
676
    /* Restore the current context. */
640
677
    ctx_state.restore_state(context, table_list);
641
678
 
642
 
    if (!res)
 
679
    if (not res)
643
680
      res= setup_fields(session, 0, update_values, MARK_COLUMNS_READ, 0, 0);
644
681
  }
645
682
 
646
683
  if (res)
647
684
    return(res);
648
685
 
649
 
  if (!table)
 
686
  if (not table)
650
687
    table= table_list->table;
651
688
 
652
 
  if (!select_insert)
 
689
  if (not select_insert)
653
690
  {
654
691
    TableList *duplicate;
655
 
    if ((duplicate= unique_table(session, table_list, table_list->next_global, 1)))
 
692
    if ((duplicate= unique_table(table_list, table_list->next_global, true)))
656
693
    {
657
694
      my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->alias);
658
695
 
659
696
      return true;
660
697
    }
661
698
  }
 
699
 
662
700
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
663
701
    table->prepare_for_position();
664
702
 
670
708
 
671
709
static int last_uniq_key(Table *table,uint32_t keynr)
672
710
{
673
 
  while (++keynr < table->s->keys)
 
711
  while (++keynr < table->getShare()->sizeKeys())
674
712
    if (table->key_info[keynr].flags & HA_NOSAME)
675
713
      return 0;
676
714
  return 1;
685
723
     write_record()
686
724
      session   - thread context
687
725
      table - table to which record should be written
688
 
      info  - COPY_INFO structure describing handling of duplicates
 
726
      info  - CopyInfo structure describing handling of duplicates
689
727
              and which is used for counting number of records inserted
690
728
              and deleted.
691
729
 
695
733
    then both on update triggers will work instead. Similarly both on
696
734
    delete triggers will be invoked if we will delete conflicting records.
697
735
 
698
 
    Sets session->transaction.stmt.modified_non_trans_table to true if table which is updated didn't have
 
736
    Sets session->transaction.stmt.modified_non_trans_data to true if table which is updated didn't have
699
737
    transactions.
700
738
 
701
739
  RETURN VALUE
704
742
*/
705
743
 
706
744
 
707
 
int write_record(Session *session, Table *table,COPY_INFO *info)
 
745
int write_record(Session *session, Table *table,CopyInfo *info)
708
746
{
709
747
  int error;
710
 
  char *key=0;
711
 
  MY_BITMAP *save_read_set, *save_write_set;
712
 
  uint64_t prev_insert_id= table->file->next_insert_id;
 
748
  std::vector<unsigned char> key;
 
749
  boost::dynamic_bitset<> *save_read_set, *save_write_set;
 
750
  uint64_t prev_insert_id= table->cursor->next_insert_id;
713
751
  uint64_t insert_id_for_cur_row= 0;
714
752
 
715
753
 
719
757
 
720
758
  if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE)
721
759
  {
722
 
    while ((error=table->file->ha_write_row(table->record[0])))
 
760
    while ((error=table->cursor->insertRecord(table->getInsertRecord())))
723
761
    {
724
762
      uint32_t key_nr;
725
763
      /*
729
767
        the autogenerated value to avoid session->insert_id_for_cur_row to become
730
768
        0.
731
769
      */
732
 
      if (table->file->insert_id_for_cur_row > 0)
733
 
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
 
770
      if (table->cursor->insert_id_for_cur_row > 0)
 
771
        insert_id_for_cur_row= table->cursor->insert_id_for_cur_row;
734
772
      else
735
 
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
 
773
        table->cursor->insert_id_for_cur_row= insert_id_for_cur_row;
736
774
      bool is_duplicate_key_error;
737
 
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
 
775
      if (table->cursor->is_fatal_error(error, HA_CHECK_DUP))
738
776
        goto err;
739
 
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
 
777
      is_duplicate_key_error= table->cursor->is_fatal_error(error, 0);
740
778
      if (!is_duplicate_key_error)
741
779
      {
742
780
        /*
748
786
          goto gok_or_after_err; /* Ignoring a not fatal error, return 0 */
749
787
        goto err;
750
788
      }
751
 
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
 
789
      if ((int) (key_nr = table->get_dup_key(error)) < 0)
752
790
      {
753
791
        error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
754
792
        goto err;
762
800
      */
763
801
      if (info->handle_duplicates == DUP_REPLACE &&
764
802
          table->next_number_field &&
765
 
          key_nr == table->s->next_number_index &&
 
803
          key_nr == table->getShare()->next_number_index &&
766
804
          (insert_id_for_cur_row > 0))
767
805
        goto err;
768
 
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
 
806
      if (table->cursor->getEngine()->check_flag(HTON_BIT_DUPLICATE_POS))
769
807
      {
770
 
        if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
 
808
        if (table->cursor->rnd_pos(table->getUpdateRecord(),table->cursor->dup_ref))
771
809
          goto err;
772
810
      }
773
811
      else
774
812
      {
775
 
        if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
 
813
        if (table->cursor->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
776
814
        {
777
 
          error=my_errno;
 
815
          error=errno;
778
816
          goto err;
779
817
        }
780
818
 
781
 
        if (!key)
 
819
        if (not key.size())
782
820
        {
783
 
          if (!(key=(char*) malloc(table->s->max_unique_length)))
784
 
          {
785
 
            error=ENOMEM;
786
 
            goto err;
787
 
          }
 
821
          key.resize(table->getShare()->max_unique_length);
788
822
        }
789
 
        key_copy((unsigned char*) key,table->record[0],table->key_info+key_nr,0);
790
 
        if ((error=(table->file->index_read_idx_map(table->record[1],key_nr,
791
 
                                                    (unsigned char*) key, HA_WHOLE_KEY,
 
823
        key_copy(&key[0], table->getInsertRecord(), table->key_info+key_nr, 0);
 
824
        if ((error=(table->cursor->index_read_idx_map(table->getUpdateRecord(),key_nr,
 
825
                                                    &key[0], HA_WHOLE_KEY,
792
826
                                                    HA_READ_KEY_EXACT))))
793
827
          goto err;
794
828
      }
799
833
          that matches, is updated. If update causes a conflict again,
800
834
          an error is returned
801
835
        */
802
 
        assert(table->insert_values != NULL);
 
836
        assert(table->insert_values.size());
803
837
        table->storeRecordAsInsert();
804
838
        table->restoreRecord();
805
839
        assert(info->update_fields->elements ==
809
843
                                                 info->ignore))
810
844
          goto before_err;
811
845
 
812
 
        table->file->restore_auto_increment(prev_insert_id);
 
846
        table->cursor->restore_auto_increment(prev_insert_id);
813
847
        if (table->next_number_field)
814
 
          table->file->adjust_next_insert_id_after_explicit_value(
 
848
          table->cursor->adjust_next_insert_id_after_explicit_value(
815
849
            table->next_number_field->val_int());
816
850
        info->touched++;
817
 
        if ((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ &&
818
 
             !bitmap_is_subset(table->write_set, table->read_set)) ||
819
 
            table->compare_record())
 
851
 
 
852
        if (! table->records_are_comparable() || table->compare_records())
820
853
        {
821
 
          if ((error=table->file->ha_update_row(table->record[1],
822
 
                                                table->record[0])) &&
 
854
          if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
 
855
                                                table->getInsertRecord())) &&
823
856
              error != HA_ERR_RECORD_IS_THE_SAME)
824
857
          {
825
858
            if (info->ignore &&
826
 
                !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
859
                !table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
827
860
            {
828
861
              goto gok_or_after_err;
829
862
            }
837
870
          /*
838
871
            If ON DUP KEY UPDATE updates a row instead of inserting one, it's
839
872
            like a regular UPDATE statement: it should not affect the value of a
840
 
            next SELECT LAST_INSERT_ID() or mysql_insert_id().
 
873
            next SELECT LAST_INSERT_ID() or insert_id().
841
874
            Except if LAST_INSERT_ID(#) was in the INSERT query, which is
842
875
            handled separately by Session::arg_of_last_insert_id_function.
843
876
          */
844
 
          insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
 
877
          insert_id_for_cur_row= table->cursor->insert_id_for_cur_row= 0;
845
878
          info->copied++;
846
879
        }
847
880
 
848
881
        if (table->next_number_field)
849
 
          table->file->adjust_next_insert_id_after_explicit_value(
 
882
          table->cursor->adjust_next_insert_id_after_explicit_value(
850
883
            table->next_number_field->val_int());
851
884
        info->touched++;
852
885
 
869
902
          ON UPDATE triggers.
870
903
        */
871
904
        if (last_uniq_key(table,key_nr) &&
872
 
            !table->file->referenced_by_foreign_key() &&
 
905
            !table->cursor->referenced_by_foreign_key() &&
873
906
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
874
907
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
875
908
        {
876
 
          if ((error=table->file->ha_update_row(table->record[1],
877
 
                                                table->record[0])) &&
 
909
          if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
 
910
                                                table->getInsertRecord())) &&
878
911
              error != HA_ERR_RECORD_IS_THE_SAME)
879
912
            goto err;
880
913
          if (error != HA_ERR_RECORD_IS_THE_SAME)
881
914
            info->deleted++;
882
915
          else
883
916
            error= 0;
884
 
          session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
 
917
          session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
885
918
          /*
886
919
            Since we pretend that we have done insert we should call
887
920
            its after triggers.
890
923
        }
891
924
        else
892
925
        {
893
 
          if ((error=table->file->ha_delete_row(table->record[1])))
 
926
          if ((error=table->cursor->deleteRecord(table->getUpdateRecord())))
894
927
            goto err;
895
928
          info->deleted++;
896
 
          if (!table->file->has_transactions())
897
 
            session->transaction.stmt.modified_non_trans_table= true;
 
929
          if (!table->cursor->has_transactions())
 
930
            session->transaction.stmt.markModifiedNonTransData();
898
931
          /* Let us attempt do write_row() once more */
899
932
        }
900
933
      }
901
934
    }
902
 
    session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
 
935
    session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
903
936
    /*
904
937
      Restore column maps if they where replaced during an duplicate key
905
938
      problem.
906
939
    */
907
940
    if (table->read_set != save_read_set ||
908
941
        table->write_set != save_write_set)
909
 
      table->column_bitmaps_set(save_read_set, save_write_set);
 
942
      table->column_bitmaps_set(*save_read_set, *save_write_set);
910
943
  }
911
 
  else if ((error=table->file->ha_write_row(table->record[0])))
 
944
  else if ((error=table->cursor->insertRecord(table->getInsertRecord())))
912
945
  {
913
946
    if (!info->ignore ||
914
 
        table->file->is_fatal_error(error, HA_CHECK_DUP))
 
947
        table->cursor->is_fatal_error(error, HA_CHECK_DUP))
915
948
      goto err;
916
 
    table->file->restore_auto_increment(prev_insert_id);
 
949
    table->cursor->restore_auto_increment(prev_insert_id);
917
950
    goto gok_or_after_err;
918
951
  }
919
952
 
920
953
after_n_copied_inc:
921
954
  info->copied++;
922
 
  session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
 
955
  session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
923
956
 
924
957
gok_or_after_err:
925
 
  if (key)
926
 
    free(key);
927
 
  if (!table->file->has_transactions())
928
 
    session->transaction.stmt.modified_non_trans_table= true;
 
958
  if (!table->cursor->has_transactions())
 
959
    session->transaction.stmt.markModifiedNonTransData();
929
960
  return(0);
930
961
 
931
962
err:
933
964
  /* current_select is NULL if this is a delayed insert */
934
965
  if (session->lex->current_select)
935
966
    session->lex->current_select->no_error= 0;        // Give error
936
 
  table->file->print_error(error,MYF(0));
 
967
  table->print_error(error,MYF(0));
937
968
 
938
969
before_err:
939
 
  table->file->restore_auto_increment(prev_insert_id);
940
 
  if (key)
941
 
    free(key);
942
 
  table->column_bitmaps_set(save_read_set, save_write_set);
943
 
  return(1);
 
970
  table->cursor->restore_auto_increment(prev_insert_id);
 
971
  table->column_bitmaps_set(*save_read_set, *save_write_set);
 
972
  return 1;
944
973
}
945
974
 
946
975
 
953
982
{
954
983
  int err= 0;
955
984
 
956
 
  for (Field **field=entry->field ; *field ; field++)
 
985
  for (Field **field=entry->getFields() ; *field ; field++)
957
986
  {
958
987
    if (((*field)->isWriteSet()) == false)
959
988
    {
987
1016
      }
988
1017
    }
989
1018
  }
990
 
  return session->abort_on_warning ? err : 0;
 
1019
  return session->abortOnWarning() ? err : 0;
991
1020
}
992
1021
 
993
1022
/***************************************************************************
999
1028
  make insert specific preparation and checks after opening tables
1000
1029
 
1001
1030
  SYNOPSIS
1002
 
    mysql_insert_select_prepare()
 
1031
    insert_select_prepare()
1003
1032
    session         thread handler
1004
1033
 
1005
1034
  RETURN
1007
1036
    true  Error
1008
1037
*/
1009
1038
 
1010
 
bool mysql_insert_select_prepare(Session *session)
 
1039
bool insert_select_prepare(Session *session)
1011
1040
{
1012
1041
  LEX *lex= session->lex;
1013
1042
  Select_Lex *select_lex= &lex->select_lex;
1017
1046
    clause if table is VIEW
1018
1047
  */
1019
1048
 
1020
 
  if (mysql_prepare_insert(session, lex->query_tables,
 
1049
  if (prepare_insert(session, lex->query_tables,
1021
1050
                           lex->query_tables->table, lex->field_list, 0,
1022
1051
                           lex->update_list, lex->value_list,
1023
1052
                           lex->duplicates,
1041
1070
                             List<Item> *update_fields,
1042
1071
                             List<Item> *update_values,
1043
1072
                             enum_duplicates duplic,
1044
 
                             bool ignore_check_option_errors)
1045
 
  :table_list(table_list_par), table(table_par), fields(fields_par),
1046
 
   autoinc_value_of_last_inserted_row(0),
1047
 
   insert_into_view(table_list_par && 0 != 0)
 
1073
                             bool ignore_check_option_errors) :
 
1074
  table_list(table_list_par), table(table_par), fields(fields_par),
 
1075
  autoinc_value_of_last_inserted_row(0),
 
1076
  insert_into_view(table_list_par && 0 != 0)
1048
1077
{
1049
 
  memset(&info, 0, sizeof(info));
1050
1078
  info.handle_duplicates= duplic;
1051
1079
  info.ignore= ignore_check_option_errors;
1052
1080
  info.update_fields= update_fields;
1077
1105
 
1078
1106
  if (!res && fields->elements)
1079
1107
  {
1080
 
    bool saved_abort_on_warning= session->abort_on_warning;
1081
 
    session->abort_on_warning= !info.ignore;
 
1108
    bool saved_abort_on_warning= session->abortOnWarning();
 
1109
    session->setAbortOnWarning(not info.ignore);
1082
1110
    res= check_that_all_fields_are_given_values(session, table_list->table,
1083
1111
                                                table_list);
1084
 
    session->abort_on_warning= saved_abort_on_warning;
 
1112
    session->setAbortOnWarning(saved_abort_on_warning);
1085
1113
  }
1086
1114
 
1087
1115
  if (info.handle_duplicates == DUP_UPDATE && !res)
1151
1179
    Is table which we are changing used somewhere in other parts of
1152
1180
    query
1153
1181
  */
1154
 
  if (unique_table(session, table_list, table_list->next_global, 0))
 
1182
  if (unique_table(table_list, table_list->next_global))
1155
1183
  {
1156
1184
    /* Using same table for INSERT and SELECT */
1157
1185
    lex->current_select->options|= OPTION_BUFFER_RESULT;
1168
1196
      We won't start bulk inserts at all if this statement uses functions or
1169
1197
      should invoke triggers since they may access to the same table too.
1170
1198
    */
1171
 
    table->file->ha_start_bulk_insert((ha_rows) 0);
 
1199
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
1172
1200
  }
1173
1201
  table->restoreRecordAsDefault();              // Get empty record
1174
1202
  table->next_number_field=table->found_next_number_field;
1175
1203
 
1176
1204
  session->cuted_fields=0;
 
1205
 
1177
1206
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1178
 
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1207
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1208
 
1179
1209
  if (info.handle_duplicates == DUP_REPLACE)
1180
 
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
1210
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
1211
 
1181
1212
  if (info.handle_duplicates == DUP_UPDATE)
1182
 
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1183
 
  session->abort_on_warning= !info.ignore;
 
1213
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1214
 
 
1215
  session->setAbortOnWarning(not info.ignore);
1184
1216
  table->mark_columns_needed_for_insert();
1185
1217
 
1186
1218
 
1208
1240
{
1209
1241
 
1210
1242
  if (session->lex->current_select->options & OPTION_BUFFER_RESULT)
1211
 
    table->file->ha_start_bulk_insert((ha_rows) 0);
 
1243
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
1212
1244
  return(0);
1213
1245
}
1214
1246
 
1226
1258
  {
1227
1259
    table->next_number_field=0;
1228
1260
    table->auto_increment_field_not_null= false;
1229
 
    table->file->ha_reset();
 
1261
    table->cursor->ha_reset();
1230
1262
  }
1231
1263
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
1232
 
  session->abort_on_warning= 0;
 
1264
  session->setAbortOnWarning(false);
1233
1265
  return;
1234
1266
}
1235
1267
 
1242
1274
  if (unit->offset_limit_cnt)
1243
1275
  {                                             // using limit offset,count
1244
1276
    unit->offset_limit_cnt--;
1245
 
    return(0);
 
1277
    return false;
1246
1278
  }
1247
1279
 
1248
1280
  session->count_cuted_fields= CHECK_FIELD_WARN;        // Calculate cuted fields
1249
1281
  store_values(values);
1250
1282
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
1251
1283
  if (session->is_error())
1252
 
    return(1);
 
1284
    return true;
1253
1285
 
1254
1286
  // Release latches in case bulk insert takes a long time
1255
 
  ha_release_temporary_latches(session);
 
1287
  plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
1256
1288
 
1257
1289
  error= write_record(session, table, &info);
 
1290
  table->auto_increment_field_not_null= false;
1258
1291
 
1259
1292
  if (!error)
1260
1293
  {
1293
1326
void select_insert::store_values(List<Item> &values)
1294
1327
{
1295
1328
  if (fields->elements)
1296
 
    fill_record(session, *fields, values, 1);
 
1329
    fill_record(session, *fields, values, true);
1297
1330
  else
1298
 
    fill_record(session, table->field, values, 1);
 
1331
    fill_record(session, table->getFields(), values, true);
1299
1332
}
1300
1333
 
1301
 
void select_insert::send_error(uint32_t errcode,const char *err)
 
1334
void select_insert::send_error(drizzled::error_t errcode,const char *err)
1302
1335
{
1303
 
 
1304
 
 
1305
1336
  my_message(errcode, err, MYF(0));
1306
 
 
1307
 
  return;
1308
1337
}
1309
1338
 
1310
1339
 
1311
1340
bool select_insert::send_eof()
1312
1341
{
1313
1342
  int error;
1314
 
  bool const trans_table= table->file->has_transactions();
 
1343
  bool const trans_table= table->cursor->has_transactions();
1315
1344
  uint64_t id;
1316
1345
  bool changed;
1317
1346
 
1318
 
  error= table->file->ha_end_bulk_insert();
1319
 
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1320
 
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1347
  error= table->cursor->ha_end_bulk_insert();
 
1348
  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1349
  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1321
1350
 
1322
1351
  if ((changed= (info.copied || info.deleted || info.updated)))
1323
1352
  {
1324
1353
    /*
1325
1354
      We must invalidate the table in the query cache before binlog writing
1326
 
      and ha_autocommit_or_rollback.
 
1355
      and autocommitOrRollback.
1327
1356
    */
1328
 
    if (session->transaction.stmt.modified_non_trans_table)
1329
 
      session->transaction.all.modified_non_trans_table= true;
 
1357
    if (session->transaction.stmt.hasModifiedNonTransData())
 
1358
      session->transaction.all.markModifiedNonTransData();
1330
1359
  }
1331
1360
  assert(trans_table || !changed ||
1332
 
              session->transaction.stmt.modified_non_trans_table);
 
1361
              session->transaction.stmt.hasModifiedNonTransData());
1333
1362
 
1334
 
  table->file->ha_release_auto_increment();
 
1363
  table->cursor->ha_release_auto_increment();
1335
1364
 
1336
1365
  if (error)
1337
1366
  {
1338
 
    table->file->print_error(error,MYF(0));
1339
 
    return(1);
 
1367
    table->print_error(error,MYF(0));
 
1368
    DRIZZLE_INSERT_SELECT_DONE(error, 0);
 
1369
    return 1;
1340
1370
  }
1341
1371
  char buff[160];
1342
1372
  if (info.ignore)
1343
 
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
 
1373
    snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
1344
1374
            (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
1345
1375
  else
1346
 
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
 
1376
    snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
1347
1377
            (ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
1348
1378
  session->row_count_func= info.copied + info.deleted + info.updated;
1349
1379
 
1352
1382
    (session->arg_of_last_insert_id_function ?
1353
1383
     session->first_successful_insert_id_in_prev_stmt :
1354
1384
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
1355
 
  session->my_ok((ulong) session->row_count_func,
 
1385
  session->my_ok((ulong) session->rowCount(),
1356
1386
                 info.copied + info.deleted + info.touched, id, buff);
1357
 
  return(0);
 
1387
  session->status_var.inserted_row_count+= session->rowCount(); 
 
1388
  DRIZZLE_INSERT_SELECT_DONE(0, session->rowCount());
 
1389
  return 0;
1358
1390
}
1359
1391
 
1360
1392
void select_insert::abort() {
1370
1402
  {
1371
1403
    bool changed, transactional_table;
1372
1404
 
1373
 
    table->file->ha_end_bulk_insert();
 
1405
    table->cursor->ha_end_bulk_insert();
1374
1406
 
1375
1407
    /*
1376
1408
      If at least one row has been inserted/modified and will stay in
1387
1419
      zero, so no check for that is made.
1388
1420
    */
1389
1421
    changed= (info.copied || info.deleted || info.updated);
1390
 
    transactional_table= table->file->has_transactions();
 
1422
    transactional_table= table->cursor->has_transactions();
1391
1423
    assert(transactional_table || !changed ||
1392
 
                session->transaction.stmt.modified_non_trans_table);
1393
 
    table->file->ha_release_auto_increment();
 
1424
                session->transaction.stmt.hasModifiedNonTransData());
 
1425
    table->cursor->ha_release_auto_increment();
 
1426
  }
 
1427
 
 
1428
  if (DRIZZLE_INSERT_SELECT_DONE_ENABLED())
 
1429
  {
 
1430
    DRIZZLE_INSERT_SELECT_DONE(0, info.copied + info.deleted + info.updated);
1394
1431
  }
1395
1432
 
1396
1433
  return;
1417
1454
      items        in     List of items which should be used to produce rest
1418
1455
                          of fields for the table (corresponding fields will
1419
1456
                          be added to the end of alter_info->create_list)
1420
 
      lock         out    Pointer to the DRIZZLE_LOCK object for table created
 
1457
      lock         out    Pointer to the DrizzleLock object for table created
1421
1458
                          (or open temporary table) will be returned in this
1422
1459
                          parameter. Since this table is not included in
1423
1460
                          Session::lock caller is responsible for explicitly
1445
1482
 
1446
1483
static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
1447
1484
                                      TableList *create_table,
1448
 
                                      drizzled::message::Table *table_proto,
1449
 
                                      Alter_info *alter_info,
 
1485
                                      message::Table &table_proto,
 
1486
                                      AlterInfo *alter_info,
1450
1487
                                      List<Item> *items,
1451
 
                                      DRIZZLE_LOCK **lock)
 
1488
                                      bool is_if_not_exists,
 
1489
                                      DrizzleLock **lock,
 
1490
                                      identifier::Table::const_reference identifier)
1452
1491
{
1453
 
  Table tmp_table;              // Used during 'CreateField()'
1454
 
  TableShare share;
1455
 
  Table *table= 0;
 
1492
  TableShare share(message::Table::INTERNAL);
1456
1493
  uint32_t select_field_count= items->elements;
1457
1494
  /* Add selected items to field list */
1458
1495
  List_iterator_fast<Item> it(*items);
1459
1496
  Item *item;
1460
1497
  Field *tmp_field;
1461
 
  bool not_used;
1462
1498
 
1463
 
  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
1464
 
      create_table->table->db_stat)
 
1499
  if (not (identifier.isTmp()) && create_table->table->db_stat)
1465
1500
  {
1466
1501
    /* Table already exists and was open at openTablesLock() stage. */
1467
 
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
 
1502
    if (is_if_not_exists)
1468
1503
    {
1469
1504
      create_info->table_existed= 1;            // Mark that table existed
1470
1505
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1471
1506
                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
1472
 
                          create_table->table_name);
 
1507
                          create_table->getTableName());
1473
1508
      return create_table->table;
1474
1509
    }
1475
1510
 
1476
 
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
 
1511
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1477
1512
    return NULL;
1478
1513
  }
1479
1514
 
1480
 
  tmp_table.alias= 0;
1481
 
  tmp_table.timestamp_field= 0;
1482
 
  tmp_table.s= &share;
1483
 
 
1484
 
  tmp_table.s->db_create_options=0;
1485
 
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
1486
 
  tmp_table.s->db_low_byte_first=
1487
 
        test(create_info->db_type == myisam_engine ||
1488
 
             create_info->db_type == heap_engine);
1489
 
  tmp_table.null_row= false;
1490
 
  tmp_table.maybe_null= false;
1491
 
 
1492
 
  while ((item=it++))
1493
1515
  {
1494
 
    CreateField *cr_field;
1495
 
    Field *field, *def_field;
1496
 
    if (item->type() == Item::FUNC_ITEM)
1497
 
      if (item->result_type() != STRING_RESULT)
1498
 
        field= item->tmp_table_field(&tmp_table);
 
1516
    table::Shell tmp_table(share);              // Used during 'CreateField()'
 
1517
 
 
1518
    if (not table_proto.engine().name().compare("MyISAM"))
 
1519
      tmp_table.getMutableShare()->db_low_byte_first= true;
 
1520
    else if (not table_proto.engine().name().compare("MEMORY"))
 
1521
      tmp_table.getMutableShare()->db_low_byte_first= true;
 
1522
 
 
1523
    tmp_table.in_use= session;
 
1524
 
 
1525
    while ((item=it++))
 
1526
    {
 
1527
      CreateField *cr_field;
 
1528
      Field *field, *def_field;
 
1529
      if (item->type() == Item::FUNC_ITEM)
 
1530
      {
 
1531
        if (item->result_type() != STRING_RESULT)
 
1532
        {
 
1533
          field= item->tmp_table_field(&tmp_table);
 
1534
        }
 
1535
        else
 
1536
        {
 
1537
          field= item->tmp_table_field_from_field_type(&tmp_table, 0);
 
1538
        }
 
1539
      }
1499
1540
      else
1500
 
        field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1501
 
    else
1502
 
      field= create_tmp_field(session, &tmp_table, item, item->type(),
1503
 
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
1504
 
                              0);
1505
 
    if (!field ||
1506
 
        !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
1507
 
                                           ((Item_field *)item)->field :
1508
 
                                           (Field*) 0))))
1509
 
      return NULL;
1510
 
    if (item->maybe_null)
1511
 
      cr_field->flags &= ~NOT_NULL_FLAG;
1512
 
    alter_info->create_list.push_back(cr_field);
 
1541
      {
 
1542
        field= create_tmp_field(session, &tmp_table, item, item->type(),
 
1543
                                (Item ***) 0, &tmp_field, &def_field, false,
 
1544
                                false, false, 0);
 
1545
      }
 
1546
 
 
1547
      if (!field ||
 
1548
          !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
 
1549
                                            ((Item_field *)item)->field :
 
1550
                                            (Field*) 0))))
 
1551
      {
 
1552
        return NULL;
 
1553
      }
 
1554
 
 
1555
      if (item->maybe_null)
 
1556
      {
 
1557
        cr_field->flags &= ~NOT_NULL_FLAG;
 
1558
      }
 
1559
 
 
1560
      alter_info->create_list.push_back(cr_field);
 
1561
    }
1513
1562
  }
1514
1563
 
1515
1564
  /*
1519
1568
    creating base table on which name we have exclusive lock. So code below
1520
1569
    should not cause deadlocks or races.
1521
1570
  */
 
1571
  Table *table= 0;
1522
1572
  {
1523
 
    if (!mysql_create_table_no_lock(session, create_table->db,
1524
 
                                    create_table->table_name,
1525
 
                                    create_info,
1526
 
                                    table_proto,
1527
 
                                    alter_info, false,
1528
 
                                    select_field_count))
 
1573
    if (not create_table_no_lock(session,
 
1574
                                 identifier,
 
1575
                                 create_info,
 
1576
                                 table_proto,
 
1577
                                 alter_info,
 
1578
                                 false,
 
1579
                                 select_field_count,
 
1580
                                 is_if_not_exists))
1529
1581
    {
1530
 
      if (create_info->table_existed &&
1531
 
          !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
 
1582
      if (create_info->table_existed && not identifier.isTmp())
1532
1583
      {
1533
1584
        /*
1534
1585
          This means that someone created table underneath server
1535
1586
          or it was created via different mysqld front-end to the
1536
1587
          cluster. We don't have much options but throw an error.
1537
1588
        */
1538
 
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
 
1589
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1539
1590
        return NULL;
1540
1591
      }
1541
1592
 
1542
 
      if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
 
1593
      if (not identifier.isTmp())
1543
1594
      {
1544
 
        pthread_mutex_lock(&LOCK_open); /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1545
 
        if (session->reopen_name_locked_table(create_table, false))
 
1595
        /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
 
1596
        boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
 
1597
 
 
1598
        if (create_table->table)
1546
1599
        {
1547
 
          quick_rm_table(create_info->db_type, create_table->db,
1548
 
                         create_table->table_name, false);
 
1600
          table::Concurrent *concurrent_table= static_cast<table::Concurrent *>(create_table->table);
 
1601
 
 
1602
          if (concurrent_table->reopen_name_locked_table(create_table, session))
 
1603
          {
 
1604
            (void)plugin::StorageEngine::dropTable(*session, identifier);
 
1605
          }
 
1606
          else
 
1607
          {
 
1608
            table= create_table->table;
 
1609
          }
1549
1610
        }
1550
1611
        else
1551
 
          table= create_table->table;
1552
 
        pthread_mutex_unlock(&LOCK_open);
 
1612
        {
 
1613
          (void)plugin::StorageEngine::dropTable(*session, identifier);
 
1614
        }
1553
1615
      }
1554
1616
      else
1555
1617
      {
1556
 
        if (!(table= session->openTable(create_table, (bool*) 0,
1557
 
                                         DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1558
 
            !create_info->table_existed)
 
1618
        if (not (table= session->openTable(create_table, (bool*) 0,
 
1619
                                           DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
 
1620
            not create_info->table_existed)
1559
1621
        {
1560
1622
          /*
1561
1623
            This shouldn't happen as creation of temporary table should make
1562
1624
            it preparable for open. But let us do close_temporary_table() here
1563
1625
            just in case.
1564
1626
          */
1565
 
          session->drop_temporary_table(create_table);
 
1627
          session->drop_temporary_table(identifier);
1566
1628
        }
1567
1629
      }
1568
1630
    }
1569
 
    if (!table)                                   // open failed
 
1631
    if (not table)                                   // open failed
1570
1632
      return NULL;
1571
1633
  }
1572
1634
 
1573
1635
  table->reginfo.lock_type=TL_WRITE;
1574
 
  if (! ((*lock)= mysql_lock_tables(session, &table, 1,
1575
 
                                    DRIZZLE_LOCK_IGNORE_FLUSH, &not_used)))
 
1636
  if (not ((*lock)= session->lockTables(&table, 1, DRIZZLE_LOCK_IGNORE_FLUSH)))
1576
1637
  {
1577
1638
    if (*lock)
1578
1639
    {
1579
 
      mysql_unlock_tables(session, *lock);
 
1640
      session->unlockTables(*lock);
1580
1641
      *lock= 0;
1581
1642
    }
1582
1643
 
1583
 
    if (!create_info->table_existed)
1584
 
      session->drop_open_table(table, create_table->db, create_table->table_name);
 
1644
    if (not create_info->table_existed)
 
1645
      session->drop_open_table(table, identifier);
1585
1646
    return NULL;
1586
1647
  }
1587
1648
 
1592
1653
int
1593
1654
select_create::prepare(List<Item> &values, Select_Lex_Unit *u)
1594
1655
{
1595
 
  DRIZZLE_LOCK *extra_lock= NULL;
 
1656
  DrizzleLock *extra_lock= NULL;
1596
1657
  /*
1597
 
    For row-based replication, the CREATE-SELECT statement is written
1598
 
    in two pieces: the first one contain the CREATE TABLE statement
1599
 
    necessary to create the table and the second part contain the rows
1600
 
    that should go into the table.
1601
 
 
1602
 
    For non-temporary tables, the start of the CREATE-SELECT
1603
 
    implicitly commits the previous transaction, and all events
1604
 
    forming the statement will be stored the transaction cache. At end
1605
 
    of the statement, the entire statement is committed as a
1606
 
    transaction, and all events are written to the binary log.
1607
 
 
1608
 
    On the master, the table is locked for the duration of the
1609
 
    statement, but since the CREATE part is replicated as a simple
1610
 
    statement, there is no way to lock the table for accesses on the
1611
 
    slave.  Hence, we have to hold on to the CREATE part of the
1612
 
    statement until the statement has finished.
 
1658
    For replication, the CREATE-SELECT statement is written
 
1659
    in two pieces: the first transaction messsage contains 
 
1660
    the CREATE TABLE statement as a CreateTableStatement message
 
1661
    necessary to create the table.
 
1662
    
 
1663
    The second transaction message contains all the InsertStatement
 
1664
    and associated InsertRecords that should go into the table.
1613
1665
   */
1614
1666
 
1615
1667
  unit= u;
1616
1668
 
1617
 
  /*
1618
 
    Start a statement transaction before the create if we are using
1619
 
    row-based replication for the statement.  If we are creating a
1620
 
    temporary table, we need to start a statement transaction.
1621
 
  */
1622
 
 
1623
 
  if (!(table= create_table_from_items(session, create_info, create_table,
1624
 
                                       table_proto,
1625
 
                                       alter_info, &values,
1626
 
                                       &extra_lock)))
 
1669
  if (not (table= create_table_from_items(session, create_info, create_table,
 
1670
                                          table_proto,
 
1671
                                          alter_info, &values,
 
1672
                                          is_if_not_exists,
 
1673
                                          &extra_lock, identifier)))
 
1674
  {
1627
1675
    return(-1);                         // abort() deletes table
 
1676
  }
1628
1677
 
1629
1678
  if (extra_lock)
1630
1679
  {
1631
1680
    assert(m_plock == NULL);
1632
1681
 
1633
 
    if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
 
1682
    if (identifier.isTmp())
1634
1683
      m_plock= &m_lock;
1635
1684
    else
1636
1685
      m_plock= &session->extra_lock;
1638
1687
    *m_plock= extra_lock;
1639
1688
  }
1640
1689
 
1641
 
  if (table->s->fields < values.elements)
 
1690
  if (table->getShare()->sizeFields() < values.elements)
1642
1691
  {
1643
1692
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
1644
1693
    return(-1);
1645
1694
  }
1646
1695
 
1647
1696
 /* First field to copy */
1648
 
  field= table->field+table->s->fields - values.elements;
 
1697
  field= table->getFields() + table->getShare()->sizeFields() - values.elements;
1649
1698
 
1650
1699
  /* Mark all fields that are given values */
1651
1700
  for (Field **f= field ; *f ; f++)
1652
 
    table->setWriteSet((*f)->field_index);
 
1701
  {
 
1702
    table->setWriteSet((*f)->position());
 
1703
  }
1653
1704
 
1654
1705
  /* Don't set timestamp if used */
1655
1706
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
1658
1709
  table->restoreRecordAsDefault();      // Get empty record
1659
1710
  session->cuted_fields=0;
1660
1711
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1661
 
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1712
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1713
 
1662
1714
  if (info.handle_duplicates == DUP_REPLACE)
1663
 
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
1715
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
1716
 
1664
1717
  if (info.handle_duplicates == DUP_UPDATE)
1665
 
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1666
 
  table->file->ha_start_bulk_insert((ha_rows) 0);
1667
 
  session->abort_on_warning= !info.ignore;
 
1718
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1719
 
 
1720
  table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1721
  session->setAbortOnWarning(not info.ignore);
1668
1722
  if (check_that_all_fields_are_given_values(session, table, table_list))
1669
1723
    return(1);
 
1724
 
1670
1725
  table->mark_columns_needed_for_insert();
1671
 
  table->file->extra(HA_EXTRA_WRITE_CACHE);
 
1726
  table->cursor->extra(HA_EXTRA_WRITE_CACHE);
1672
1727
  return(0);
1673
1728
}
1674
1729
 
1675
1730
void select_create::store_values(List<Item> &values)
1676
1731
{
1677
 
  fill_record(session, field, values, 1);
 
1732
  fill_record(session, field, values, true);
1678
1733
}
1679
1734
 
1680
1735
 
1681
 
void select_create::send_error(uint32_t errcode,const char *err)
 
1736
void select_create::send_error(drizzled::error_t errcode,const char *err)
1682
1737
{
1683
 
 
1684
 
 
1685
1738
  /*
1686
1739
    This will execute any rollbacks that are necessary before writing
1687
1740
    the transcation cache.
1694
1747
 
1695
1748
  */
1696
1749
  select_insert::send_error(errcode, err);
1697
 
 
1698
 
  return;
1699
1750
}
1700
1751
 
1701
1752
 
1711
1762
      tables.  This can fail, but we should unlock the table
1712
1763
      nevertheless.
1713
1764
    */
1714
 
    if (!table->s->tmp_table)
 
1765
    if (!table->getShare()->getType())
1715
1766
    {
1716
 
      ha_autocommit_or_rollback(session, 0);
 
1767
      TransactionServices &transaction_services= TransactionServices::singleton();
 
1768
      transaction_services.autocommitOrRollback(*session, 0);
1717
1769
      (void) session->endActiveTransaction();
1718
1770
    }
1719
1771
 
1720
 
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1721
 
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1772
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1773
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1722
1774
    if (m_plock)
1723
1775
    {
1724
 
      mysql_unlock_tables(session, *m_plock);
 
1776
      session->unlockTables(*m_plock);
1725
1777
      *m_plock= NULL;
1726
1778
      m_plock= NULL;
1727
1779
    }
1732
1784
 
1733
1785
void select_create::abort()
1734
1786
{
1735
 
 
1736
 
 
1737
1787
  /*
1738
1788
    In select_insert::abort() we roll back the statement, including
1739
1789
    truncating the transaction cache of the binary log. To do this, we
1750
1800
    log state.
1751
1801
  */
1752
1802
  select_insert::abort();
1753
 
  session->transaction.stmt.modified_non_trans_table= false;
1754
 
 
1755
1803
 
1756
1804
  if (m_plock)
1757
1805
  {
1758
 
    mysql_unlock_tables(session, *m_plock);
 
1806
    session->unlockTables(*m_plock);
1759
1807
    *m_plock= NULL;
1760
1808
    m_plock= NULL;
1761
1809
  }
1762
1810
 
1763
1811
  if (table)
1764
1812
  {
1765
 
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1766
 
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1767
 
    if (!create_info->table_existed)
1768
 
      session->drop_open_table(table, create_table->db, create_table->table_name);
 
1813
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1814
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1815
    if (not create_info->table_existed)
 
1816
      session->drop_open_table(table, identifier);
1769
1817
    table= NULL;                                    // Safety
1770
1818
  }
1771
1819
}
1772
1820
 
1773
 
 
1774
 
/*****************************************************************************
1775
 
  Instansiate templates
1776
 
*****************************************************************************/
1777
 
 
1778
 
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
1779
 
template class List_iterator_fast<List_item>;
1780
 
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */
 
1821
} /* namespace drizzled */