~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_insert.cc

  • Committer: Nathan Williams
  • Date: 2009-06-05 22:51:06 UTC
  • mto: This revision was merged to the branch mainline in revision 1063.
  • Revision ID: nathanlws@gmail.com-20090605225106-8xrsftpf50tdpumn
No actual code changes. Changed Create_field to CreateField to be consistent with coding standards.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* Insert of records */
18
18
 
19
 
#include "config.h"
20
 
#include <cstdio>
 
19
#include <drizzled/server_includes.h>
21
20
#include <drizzled/sql_select.h>
22
21
#include <drizzled/show.h>
23
22
#include <drizzled/error.h>
25
24
#include <drizzled/probes.h>
26
25
#include <drizzled/sql_base.h>
27
26
#include <drizzled/sql_load.h>
28
 
#include <drizzled/field/epoch.h>
 
27
#include <drizzled/field/timestamp.h>
29
28
#include <drizzled/lock.h>
30
 
#include "drizzled/sql_table.h"
31
 
#include "drizzled/pthread_globals.h"
32
 
#include "drizzled/transaction_services.h"
33
 
#include "drizzled/plugin/transactional_storage_engine.h"
34
 
 
35
 
#include "drizzled/table/shell.h"
36
 
 
37
 
namespace drizzled
38
 
{
39
 
 
40
 
extern plugin::StorageEngine *heap_engine;
41
 
extern plugin::StorageEngine *myisam_engine;
 
29
 
42
30
 
43
31
/*
44
32
  Check if insert fields are correct.
70
58
 
71
59
  if (fields.elements == 0 && values.elements != 0)
72
60
  {
73
 
    if (values.elements != table->getShare()->sizeFields())
 
61
    if (values.elements != table->s->fields)
74
62
    {
75
63
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
76
64
      return -1;
123
111
    if (table->timestamp_field) // Don't automatically set timestamp if used
124
112
    {
125
113
      if (table->timestamp_field->isWriteSet())
126
 
      {
127
114
        clear_timestamp_auto_bits(table->timestamp_field_type,
128
115
                                  TIMESTAMP_AUTO_SET_ON_INSERT);
129
 
      }
130
116
      else
131
117
      {
132
 
        table->setWriteSet(table->timestamp_field->position());
 
118
        table->setWriteSet(table->timestamp_field->field_index);
133
119
      }
134
120
    }
135
121
  }
170
156
      Unmark the timestamp field so that we can check if this is modified
171
157
      by update_fields
172
158
    */
173
 
    timestamp_mark= table->write_set->test(table->timestamp_field->position());
174
 
    table->write_set->reset(table->timestamp_field->position());
 
159
    timestamp_mark= bitmap_test_and_clear(table->write_set,
 
160
                                          table->timestamp_field->field_index);
175
161
  }
176
162
 
177
163
  /* Check the fields we are going to modify */
182
168
  {
183
169
    /* Don't set timestamp column if this is modified. */
184
170
    if (table->timestamp_field->isWriteSet())
185
 
    {
186
171
      clear_timestamp_auto_bits(table->timestamp_field_type,
187
172
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
188
 
    }
189
 
 
190
173
    if (timestamp_mark)
191
 
    {
192
 
      table->setWriteSet(table->timestamp_field->position());
193
 
    }
 
174
      table->setWriteSet(table->timestamp_field->field_index);
194
175
  }
195
176
  return 0;
196
177
}
227
208
  end of dispatch_command().
228
209
*/
229
210
 
230
 
bool insert_query(Session *session,TableList *table_list,
 
211
bool mysql_insert(Session *session,TableList *table_list,
231
212
                  List<Item> &fields,
232
213
                  List<List_item> &values_list,
233
214
                  List<Item> &update_fields,
241
222
  uint32_t value_count;
242
223
  ulong counter = 1;
243
224
  uint64_t id;
244
 
  CopyInfo info;
 
225
  COPY_INFO info;
245
226
  Table *table= 0;
246
227
  List_iterator_fast<List_item> its(values_list);
247
228
  List_item *values;
258
239
  upgrade_lock_type(session, &table_list->lock_type, duplic,
259
240
                    values_list.elements > 1);
260
241
 
261
 
  if (session->openTablesLock(table_list))
262
 
  {
263
 
    DRIZZLE_INSERT_DONE(1, 0);
264
 
    return true;
265
 
  }
 
242
  if (open_and_lock_tables(session, table_list))
 
243
    return(true);
266
244
 
267
245
  lock_type= table_list->lock_type;
268
246
 
271
249
  values= its++;
272
250
  value_count= values->elements;
273
251
 
274
 
  if (prepare_insert(session, table_list, table, fields, values,
 
252
  if (mysql_prepare_insert(session, table_list, table, fields, values,
275
253
                           update_fields, update_values, duplic, &unused_conds,
276
254
                           false,
277
255
                           (fields.elements || !value_count ||
278
256
                            (0) != 0), !ignore))
279
 
  {
280
 
    if (table != NULL)
281
 
      table->cursor->ha_release_auto_increment();
282
 
    if (!joins_freed)
283
 
      free_underlaid_joins(session, &session->lex->select_lex);
284
 
    session->setAbortOnWarning(false);
285
 
    DRIZZLE_INSERT_DONE(1, 0);
286
 
    return true;
287
 
  }
 
257
    goto abort;
288
258
 
289
259
  /* mysql_prepare_insert set table_list->table if it was not set */
290
260
  table= table_list->table;
315
285
    if (values->elements != value_count)
316
286
    {
317
287
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
318
 
 
319
 
      if (table != NULL)
320
 
        table->cursor->ha_release_auto_increment();
321
 
      if (!joins_freed)
322
 
        free_underlaid_joins(session, &session->lex->select_lex);
323
 
      session->setAbortOnWarning(false);
324
 
      DRIZZLE_INSERT_DONE(1, 0);
325
 
 
326
 
      return true;
 
288
      goto abort;
327
289
    }
328
290
    if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))
329
 
    {
330
 
      if (table != NULL)
331
 
        table->cursor->ha_release_auto_increment();
332
 
      if (!joins_freed)
333
 
        free_underlaid_joins(session, &session->lex->select_lex);
334
 
      session->setAbortOnWarning(false);
335
 
      DRIZZLE_INSERT_DONE(1, 0);
336
 
      return true;
337
 
    }
 
291
      goto abort;
338
292
  }
339
293
  its.rewind ();
340
294
 
342
296
  ctx_state.restore_state(context, table_list);
343
297
 
344
298
  /*
345
 
    Fill in the given fields and dump it to the table cursor
 
299
    Fill in the given fields and dump it to the table file
346
300
  */
 
301
  memset(&info, 0, sizeof(info));
347
302
  info.ignore= ignore;
348
303
  info.handle_duplicates=duplic;
349
304
  info.update_fields= &update_fields;
354
309
    For single line insert, generate an error if we try to set a NOT NULL field
355
310
    to NULL.
356
311
  */
357
 
  session->count_cuted_fields= ignore ? CHECK_FIELD_WARN : CHECK_FIELD_ERROR_FOR_NULL;
358
 
 
 
312
  session->count_cuted_fields= ((values_list.elements == 1 &&
 
313
                             !ignore) ?
 
314
                            CHECK_FIELD_ERROR_FOR_NULL :
 
315
                            CHECK_FIELD_WARN);
359
316
  session->cuted_fields = 0L;
360
317
  table->next_number_field=table->found_next_number_field;
361
318
 
362
319
  error=0;
363
320
  session->set_proc_info("update");
364
321
  if (duplic == DUP_REPLACE)
365
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
322
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
366
323
  if (duplic == DUP_UPDATE)
367
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
324
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
368
325
  {
369
326
    if (duplic != DUP_ERROR || ignore)
370
 
      table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
371
 
    table->cursor->ha_start_bulk_insert(values_list.elements);
 
327
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
328
    table->file->ha_start_bulk_insert(values_list.elements);
372
329
  }
373
330
 
374
331
 
375
 
  session->setAbortOnWarning(not ignore);
 
332
  session->abort_on_warning= !ignore;
376
333
 
377
334
  table->mark_columns_needed_for_insert();
378
335
 
381
338
    if (fields.elements || !value_count)
382
339
    {
383
340
      table->restoreRecordAsDefault();  // Get empty record
384
 
      if (fill_record(session, fields, *values))
 
341
      if (fill_record(session, fields, *values, 0))
385
342
      {
386
343
        if (values_list.elements != 1 && ! session->is_error())
387
344
        {
399
356
    }
400
357
    else
401
358
    {
402
 
      table->restoreRecordAsDefault();  // Get empty record
403
 
 
404
 
      if (fill_record(session, table->getFields(), *values))
 
359
      if (session->used_tables)                 // Column used in values()
 
360
        table->restoreRecordAsDefault();        // Get empty record
 
361
      else
 
362
      {
 
363
        /*
 
364
          Fix delete marker. No need to restore rest of record since it will
 
365
          be overwritten by fill_record() anyway (and fill_record() does not
 
366
          use default values in this case).
 
367
        */
 
368
        table->record[0][0]= table->s->default_values[0];
 
369
      }
 
370
      if (fill_record(session, table->field, *values, 0))
405
371
      {
406
372
        if (values_list.elements != 1 && ! session->is_error())
407
373
        {
414
380
    }
415
381
 
416
382
    // Release latches in case bulk insert takes a long time
417
 
    plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
 
383
    ha_release_temporary_latches(session);
418
384
 
419
385
    error=write_record(session, table ,&info);
420
386
    if (error)
434
400
      Do not do this release if this is a delayed insert, it would steal
435
401
      auto_inc values from the delayed_insert thread as they share Table.
436
402
    */
437
 
    table->cursor->ha_release_auto_increment();
438
 
    if (table->cursor->ha_end_bulk_insert() && !error)
 
403
    table->file->ha_release_auto_increment();
 
404
    if (table->file->ha_end_bulk_insert() && !error)
439
405
    {
440
 
      table->print_error(errno,MYF(0));
 
406
      table->file->print_error(my_errno,MYF(0));
441
407
      error=1;
442
408
    }
443
409
    if (duplic != DUP_ERROR || ignore)
444
 
      table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
410
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
445
411
 
446
 
    transactional_table= table->cursor->has_transactions();
 
412
    transactional_table= table->file->has_transactions();
447
413
 
448
414
    changed= (info.copied || info.deleted || info.updated);
449
 
    if ((changed && error <= 0) || session->transaction.stmt.hasModifiedNonTransData())
 
415
    if ((changed && error <= 0) || session->transaction.stmt.modified_non_trans_table)
450
416
    {
451
 
      if (session->transaction.stmt.hasModifiedNonTransData())
452
 
        session->transaction.all.markModifiedNonTransData();
 
417
      if (session->transaction.stmt.modified_non_trans_table)
 
418
        session->transaction.all.modified_non_trans_table= true;
453
419
    }
454
 
    assert(transactional_table || !changed || session->transaction.stmt.hasModifiedNonTransData());
 
420
    assert(transactional_table || !changed || session->transaction.stmt.modified_non_trans_table);
455
421
 
456
422
  }
457
423
  session->set_proc_info("end");
475
441
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
476
442
  table->auto_increment_field_not_null= false;
477
443
  if (duplic == DUP_REPLACE)
478
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
444
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
479
445
 
480
446
  if (error)
481
 
  {
482
 
    if (table != NULL)
483
 
      table->cursor->ha_release_auto_increment();
484
 
    if (!joins_freed)
485
 
      free_underlaid_joins(session, &session->lex->select_lex);
486
 
    session->setAbortOnWarning(false);
487
 
    DRIZZLE_INSERT_DONE(1, 0);
488
 
    return true;
489
 
  }
490
 
 
 
447
    goto abort;
491
448
  if (values_list.elements == 1 && (!(session->options & OPTION_WARNINGS) ||
492
449
                                    !session->cuted_fields))
493
450
  {
494
 
    session->row_count_func= info.copied + info.deleted + info.updated;
495
 
    session->my_ok((ulong) session->rowCount(),
496
 
                   info.copied + info.deleted + info.touched, id);
 
451
    session->row_count_func= info.copied + info.deleted +
 
452
                         ((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
453
                          info.touched : info.updated);
 
454
    session->my_ok((ulong) session->row_count_func, id);
497
455
  }
498
456
  else
499
457
  {
500
458
    char buff[160];
 
459
    ha_rows updated=((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
460
                     info.touched : info.updated);
501
461
    if (ignore)
502
 
      snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
462
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
503
463
              (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
504
464
    else
505
 
      snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
506
 
              (ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
507
 
    session->row_count_func= info.copied + info.deleted + info.updated;
508
 
    session->my_ok((ulong) session->rowCount(),
509
 
                   info.copied + info.deleted + info.touched, id, buff);
 
465
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
 
466
              (ulong) (info.deleted + updated), (ulong) session->cuted_fields);
 
467
    session->row_count_func= info.copied + info.deleted + updated;
 
468
    session->my_ok((ulong) session->row_count_func, id, buff);
510
469
  }
511
 
  session->status_var.inserted_row_count+= session->rowCount();
512
 
  session->setAbortOnWarning(false);
513
 
  DRIZZLE_INSERT_DONE(0, session->rowCount());
 
470
  session->abort_on_warning= 0;
 
471
  DRIZZLE_INSERT_END();
 
472
  return(false);
514
473
 
515
 
  return false;
 
474
abort:
 
475
  if (table != NULL)
 
476
    table->file->ha_release_auto_increment();
 
477
  if (!joins_freed)
 
478
    free_underlaid_joins(session, &session->lex->select_lex);
 
479
  session->abort_on_warning= 0;
 
480
  DRIZZLE_INSERT_END();
 
481
  return(true);
516
482
}
517
483
 
518
484
 
520
486
  Check if table can be updated
521
487
 
522
488
  SYNOPSIS
523
 
     prepare_insert_check_table()
 
489
     mysql_prepare_insert_check_table()
524
490
     session            Thread handle
525
491
     table_list         Table list
526
492
     fields             List of fields to be updated
532
498
     true  ERROR
533
499
*/
534
500
 
535
 
static bool prepare_insert_check_table(Session *session, TableList *table_list,
 
501
static bool mysql_prepare_insert_check_table(Session *session, TableList *table_list,
536
502
                                             List<Item> &,
537
503
                                             bool select_insert)
538
504
{
560
526
  Prepare items in INSERT statement
561
527
 
562
528
  SYNOPSIS
563
 
    prepare_insert()
 
529
    mysql_prepare_insert()
564
530
    session                     Thread handler
565
531
    table_list          Global/local table list
566
532
    table               Table to insert into (can be NULL if table should
587
553
    true  error
588
554
*/
589
555
 
590
 
bool prepare_insert(Session *session, TableList *table_list,
 
556
bool mysql_prepare_insert(Session *session, TableList *table_list,
591
557
                          Table *table, List<Item> &fields, List_item *values,
592
558
                          List<Item> &update_fields, List<Item> &update_values,
593
559
                          enum_duplicates duplic,
610
576
    inserting (for INSERT ... SELECT this is done by changing table_list,
611
577
    because INSERT ... SELECT shares its Select_Lex with SELECT).
612
578
  */
613
 
  if (not select_insert)
 
579
  if (!select_insert)
614
580
  {
615
581
    for (Select_Lex_Unit *un= select_lex->first_inner_unit();
616
582
         un;
632
598
      return(true);
633
599
  }
634
600
 
635
 
  if (prepare_insert_check_table(session, table_list, fields, select_insert))
 
601
  if (mysql_prepare_insert_check_table(session, table_list, fields, select_insert))
636
602
    return(true);
637
603
 
638
604
 
658
624
 
659
625
    if (!res && check_fields)
660
626
    {
661
 
      bool saved_abort_on_warning= session->abortOnWarning();
662
 
 
663
 
      session->setAbortOnWarning(abort_on_warning);
 
627
      bool saved_abort_on_warning= session->abort_on_warning;
 
628
      session->abort_on_warning= abort_on_warning;
664
629
      res= check_that_all_fields_are_given_values(session,
665
630
                                                  table ? table :
666
631
                                                  context->table_list->table,
667
632
                                                  context->table_list);
668
 
      session->setAbortOnWarning(saved_abort_on_warning);
 
633
      session->abort_on_warning= saved_abort_on_warning;
669
634
    }
670
635
 
671
636
    if (!res && duplic == DUP_UPDATE)
676
641
    /* Restore the current context. */
677
642
    ctx_state.restore_state(context, table_list);
678
643
 
679
 
    if (not res)
 
644
    if (!res)
680
645
      res= setup_fields(session, 0, update_values, MARK_COLUMNS_READ, 0, 0);
681
646
  }
682
647
 
683
648
  if (res)
684
649
    return(res);
685
650
 
686
 
  if (not table)
 
651
  if (!table)
687
652
    table= table_list->table;
688
653
 
689
 
  if (not select_insert)
 
654
  if (!select_insert)
690
655
  {
691
656
    TableList *duplicate;
692
 
    if ((duplicate= unique_table(table_list, table_list->next_global, true)))
 
657
    if ((duplicate= unique_table(session, table_list, table_list->next_global, 1)))
693
658
    {
694
 
      my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->alias);
695
 
 
696
 
      return true;
 
659
      update_non_unique_table_error(table_list, "INSERT", duplicate);
 
660
      return(true);
697
661
    }
698
662
  }
699
 
 
700
663
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
701
664
    table->prepare_for_position();
702
 
 
703
 
  return false;
 
665
  return(false);
704
666
}
705
667
 
706
668
 
708
670
 
709
671
static int last_uniq_key(Table *table,uint32_t keynr)
710
672
{
711
 
  while (++keynr < table->getShare()->sizeKeys())
 
673
  while (++keynr < table->s->keys)
712
674
    if (table->key_info[keynr].flags & HA_NOSAME)
713
675
      return 0;
714
676
  return 1;
723
685
     write_record()
724
686
      session   - thread context
725
687
      table - table to which record should be written
726
 
      info  - CopyInfo structure describing handling of duplicates
 
688
      info  - COPY_INFO structure describing handling of duplicates
727
689
              and which is used for counting number of records inserted
728
690
              and deleted.
729
691
 
733
695
    then both on update triggers will work instead. Similarly both on
734
696
    delete triggers will be invoked if we will delete conflicting records.
735
697
 
736
 
    Sets session->transaction.stmt.modified_non_trans_data to true if table which is updated didn't have
 
698
    Sets session->transaction.stmt.modified_non_trans_table to true if table which is updated didn't have
737
699
    transactions.
738
700
 
739
701
  RETURN VALUE
742
704
*/
743
705
 
744
706
 
745
 
int write_record(Session *session, Table *table,CopyInfo *info)
 
707
int write_record(Session *session, Table *table,COPY_INFO *info)
746
708
{
747
709
  int error;
748
 
  std::vector<unsigned char> key;
749
 
  boost::dynamic_bitset<> *save_read_set, *save_write_set;
750
 
  uint64_t prev_insert_id= table->cursor->next_insert_id;
 
710
  char *key=0;
 
711
  MY_BITMAP *save_read_set, *save_write_set;
 
712
  uint64_t prev_insert_id= table->file->next_insert_id;
751
713
  uint64_t insert_id_for_cur_row= 0;
752
714
 
753
715
 
757
719
 
758
720
  if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE)
759
721
  {
760
 
    while ((error=table->cursor->insertRecord(table->getInsertRecord())))
 
722
    while ((error=table->file->ha_write_row(table->record[0])))
761
723
    {
762
724
      uint32_t key_nr;
763
725
      /*
767
729
        the autogenerated value to avoid session->insert_id_for_cur_row to become
768
730
        0.
769
731
      */
770
 
      if (table->cursor->insert_id_for_cur_row > 0)
771
 
        insert_id_for_cur_row= table->cursor->insert_id_for_cur_row;
 
732
      if (table->file->insert_id_for_cur_row > 0)
 
733
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
772
734
      else
773
 
        table->cursor->insert_id_for_cur_row= insert_id_for_cur_row;
 
735
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
774
736
      bool is_duplicate_key_error;
775
 
      if (table->cursor->is_fatal_error(error, HA_CHECK_DUP))
 
737
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
776
738
        goto err;
777
 
      is_duplicate_key_error= table->cursor->is_fatal_error(error, 0);
 
739
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
778
740
      if (!is_duplicate_key_error)
779
741
      {
780
742
        /*
786
748
          goto gok_or_after_err; /* Ignoring a not fatal error, return 0 */
787
749
        goto err;
788
750
      }
789
 
      if ((int) (key_nr = table->get_dup_key(error)) < 0)
 
751
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
790
752
      {
791
753
        error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
792
754
        goto err;
800
762
      */
801
763
      if (info->handle_duplicates == DUP_REPLACE &&
802
764
          table->next_number_field &&
803
 
          key_nr == table->getShare()->next_number_index &&
 
765
          key_nr == table->s->next_number_index &&
804
766
          (insert_id_for_cur_row > 0))
805
767
        goto err;
806
 
      if (table->cursor->getEngine()->check_flag(HTON_BIT_DUPLICATE_POS))
 
768
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
807
769
      {
808
 
        if (table->cursor->rnd_pos(table->getUpdateRecord(),table->cursor->dup_ref))
 
770
        if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
809
771
          goto err;
810
772
      }
811
773
      else
812
774
      {
813
 
        if (table->cursor->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
 
775
        if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
814
776
        {
815
 
          error=errno;
 
777
          error=my_errno;
816
778
          goto err;
817
779
        }
818
780
 
819
 
        if (not key.size())
 
781
        if (!key)
820
782
        {
821
 
          key.resize(table->getShare()->max_unique_length);
 
783
          if (!(key=(char*) malloc(table->s->max_unique_length)))
 
784
          {
 
785
            error=ENOMEM;
 
786
            goto err;
 
787
          }
822
788
        }
823
 
        key_copy(&key[0], table->getInsertRecord(), table->key_info+key_nr, 0);
824
 
        if ((error=(table->cursor->index_read_idx_map(table->getUpdateRecord(),key_nr,
825
 
                                                    &key[0], HA_WHOLE_KEY,
 
789
        key_copy((unsigned char*) key,table->record[0],table->key_info+key_nr,0);
 
790
        if ((error=(table->file->index_read_idx_map(table->record[1],key_nr,
 
791
                                                    (unsigned char*) key, HA_WHOLE_KEY,
826
792
                                                    HA_READ_KEY_EXACT))))
827
793
          goto err;
828
794
      }
833
799
          that matches, is updated. If update causes a conflict again,
834
800
          an error is returned
835
801
        */
836
 
        assert(table->insert_values.size());
 
802
        assert(table->insert_values != NULL);
837
803
        table->storeRecordAsInsert();
838
804
        table->restoreRecord();
839
805
        assert(info->update_fields->elements ==
843
809
                                                 info->ignore))
844
810
          goto before_err;
845
811
 
846
 
        table->cursor->restore_auto_increment(prev_insert_id);
 
812
        table->file->restore_auto_increment(prev_insert_id);
847
813
        if (table->next_number_field)
848
 
          table->cursor->adjust_next_insert_id_after_explicit_value(
 
814
          table->file->adjust_next_insert_id_after_explicit_value(
849
815
            table->next_number_field->val_int());
850
816
        info->touched++;
851
 
 
852
 
        if (! table->records_are_comparable() || table->compare_records())
 
817
        if ((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ &&
 
818
             !bitmap_is_subset(table->write_set, table->read_set)) ||
 
819
            table->compare_record())
853
820
        {
854
 
          if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
855
 
                                                table->getInsertRecord())) &&
 
821
          if ((error=table->file->ha_update_row(table->record[1],
 
822
                                                table->record[0])) &&
856
823
              error != HA_ERR_RECORD_IS_THE_SAME)
857
824
          {
858
825
            if (info->ignore &&
859
 
                !table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
826
                !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
860
827
            {
861
828
              goto gok_or_after_err;
862
829
            }
870
837
          /*
871
838
            If ON DUP KEY UPDATE updates a row instead of inserting one, it's
872
839
            like a regular UPDATE statement: it should not affect the value of a
873
 
            next SELECT LAST_INSERT_ID() or insert_id().
 
840
            next SELECT LAST_INSERT_ID() or mysql_insert_id().
874
841
            Except if LAST_INSERT_ID(#) was in the INSERT query, which is
875
842
            handled separately by Session::arg_of_last_insert_id_function.
876
843
          */
877
 
          insert_id_for_cur_row= table->cursor->insert_id_for_cur_row= 0;
 
844
          insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
878
845
          info->copied++;
879
846
        }
880
847
 
881
848
        if (table->next_number_field)
882
 
          table->cursor->adjust_next_insert_id_after_explicit_value(
 
849
          table->file->adjust_next_insert_id_after_explicit_value(
883
850
            table->next_number_field->val_int());
884
851
        info->touched++;
885
852
 
902
869
          ON UPDATE triggers.
903
870
        */
904
871
        if (last_uniq_key(table,key_nr) &&
905
 
            !table->cursor->referenced_by_foreign_key() &&
 
872
            !table->file->referenced_by_foreign_key() &&
906
873
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
907
874
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
908
875
        {
909
 
          if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
910
 
                                                table->getInsertRecord())) &&
 
876
          if ((error=table->file->ha_update_row(table->record[1],
 
877
                                                table->record[0])) &&
911
878
              error != HA_ERR_RECORD_IS_THE_SAME)
912
879
            goto err;
913
880
          if (error != HA_ERR_RECORD_IS_THE_SAME)
914
881
            info->deleted++;
915
882
          else
916
883
            error= 0;
917
 
          session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
884
          session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
918
885
          /*
919
886
            Since we pretend that we have done insert we should call
920
887
            its after triggers.
923
890
        }
924
891
        else
925
892
        {
926
 
          if ((error=table->cursor->deleteRecord(table->getUpdateRecord())))
 
893
          if ((error=table->file->ha_delete_row(table->record[1])))
927
894
            goto err;
928
895
          info->deleted++;
929
 
          if (!table->cursor->has_transactions())
930
 
            session->transaction.stmt.markModifiedNonTransData();
 
896
          if (!table->file->has_transactions())
 
897
            session->transaction.stmt.modified_non_trans_table= true;
931
898
          /* Let us attempt to do write_row() once more */
932
899
        }
933
900
      }
934
901
    }
935
 
    session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
902
    session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
936
903
    /*
937
904
      Restore column maps if they were replaced during a duplicate key
938
905
      problem.
939
906
    */
940
907
    if (table->read_set != save_read_set ||
941
908
        table->write_set != save_write_set)
942
 
      table->column_bitmaps_set(*save_read_set, *save_write_set);
 
909
      table->column_bitmaps_set(save_read_set, save_write_set);
943
910
  }
944
 
  else if ((error=table->cursor->insertRecord(table->getInsertRecord())))
 
911
  else if ((error=table->file->ha_write_row(table->record[0])))
945
912
  {
946
913
    if (!info->ignore ||
947
 
        table->cursor->is_fatal_error(error, HA_CHECK_DUP))
 
914
        table->file->is_fatal_error(error, HA_CHECK_DUP))
948
915
      goto err;
949
 
    table->cursor->restore_auto_increment(prev_insert_id);
 
916
    table->file->restore_auto_increment(prev_insert_id);
950
917
    goto gok_or_after_err;
951
918
  }
952
919
 
953
920
after_n_copied_inc:
954
921
  info->copied++;
955
 
  session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
922
  session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
956
923
 
957
924
gok_or_after_err:
958
 
  if (!table->cursor->has_transactions())
959
 
    session->transaction.stmt.markModifiedNonTransData();
 
925
  if (key)
 
926
    free(key);
 
927
  if (!table->file->has_transactions())
 
928
    session->transaction.stmt.modified_non_trans_table= true;
960
929
  return(0);
961
930
 
962
931
err:
964
933
  /* current_select is NULL if this is a delayed insert */
965
934
  if (session->lex->current_select)
966
935
    session->lex->current_select->no_error= 0;        // Give error
967
 
  table->print_error(error,MYF(0));
 
936
  table->file->print_error(error,MYF(0));
968
937
 
969
938
before_err:
970
 
  table->cursor->restore_auto_increment(prev_insert_id);
971
 
  table->column_bitmaps_set(*save_read_set, *save_write_set);
972
 
  return 1;
 
939
  table->file->restore_auto_increment(prev_insert_id);
 
940
  if (key)
 
941
    free(key);
 
942
  table->column_bitmaps_set(save_read_set, save_write_set);
 
943
  return(1);
973
944
}
974
945
 
975
946
 
982
953
{
983
954
  int err= 0;
984
955
 
985
 
  for (Field **field=entry->getFields() ; *field ; field++)
 
956
  for (Field **field=entry->field ; *field ; field++)
986
957
  {
987
958
    if (((*field)->isWriteSet()) == false)
988
959
    {
1016
987
      }
1017
988
    }
1018
989
  }
1019
 
  return session->abortOnWarning() ? err : 0;
 
990
  return session->abort_on_warning ? err : 0;
1020
991
}
1021
992
 
1022
993
/***************************************************************************
1028
999
  make insert specific preparation and checks after opening tables
1029
1000
 
1030
1001
  SYNOPSIS
1031
 
    insert_select_prepare()
 
1002
    mysql_insert_select_prepare()
1032
1003
    session         thread handler
1033
1004
 
1034
1005
  RETURN
1036
1007
    true  Error
1037
1008
*/
1038
1009
 
1039
 
bool insert_select_prepare(Session *session)
 
1010
bool mysql_insert_select_prepare(Session *session)
1040
1011
{
1041
1012
  LEX *lex= session->lex;
1042
1013
  Select_Lex *select_lex= &lex->select_lex;
1046
1017
    clause if table is VIEW
1047
1018
  */
1048
1019
 
1049
 
  if (prepare_insert(session, lex->query_tables,
 
1020
  if (mysql_prepare_insert(session, lex->query_tables,
1050
1021
                           lex->query_tables->table, lex->field_list, 0,
1051
1022
                           lex->update_list, lex->value_list,
1052
1023
                           lex->duplicates,
1070
1041
                             List<Item> *update_fields,
1071
1042
                             List<Item> *update_values,
1072
1043
                             enum_duplicates duplic,
1073
 
                             bool ignore_check_option_errors) :
1074
 
  table_list(table_list_par), table(table_par), fields(fields_par),
1075
 
  autoinc_value_of_last_inserted_row(0),
1076
 
  insert_into_view(table_list_par && 0 != 0)
 
1044
                             bool ignore_check_option_errors)
 
1045
  :table_list(table_list_par), table(table_par), fields(fields_par),
 
1046
   autoinc_value_of_last_inserted_row(0),
 
1047
   insert_into_view(table_list_par && 0 != 0)
1077
1048
{
 
1049
  memset(&info, 0, sizeof(info));
1078
1050
  info.handle_duplicates= duplic;
1079
1051
  info.ignore= ignore_check_option_errors;
1080
1052
  info.update_fields= update_fields;
1105
1077
 
1106
1078
  if (!res && fields->elements)
1107
1079
  {
1108
 
    bool saved_abort_on_warning= session->abortOnWarning();
1109
 
    session->setAbortOnWarning(not info.ignore);
 
1080
    bool saved_abort_on_warning= session->abort_on_warning;
 
1081
    session->abort_on_warning= !info.ignore;
1110
1082
    res= check_that_all_fields_are_given_values(session, table_list->table,
1111
1083
                                                table_list);
1112
 
    session->setAbortOnWarning(saved_abort_on_warning);
 
1084
    session->abort_on_warning= saved_abort_on_warning;
1113
1085
  }
1114
1086
 
1115
1087
  if (info.handle_duplicates == DUP_UPDATE && !res)
1179
1151
    Is table which we are changing used somewhere in other parts of
1180
1152
    query
1181
1153
  */
1182
 
  if (unique_table(table_list, table_list->next_global))
 
1154
  if (unique_table(session, table_list, table_list->next_global, 0))
1183
1155
  {
1184
1156
    /* Using same table for INSERT and SELECT */
1185
1157
    lex->current_select->options|= OPTION_BUFFER_RESULT;
1196
1168
      We won't start bulk inserts at all if this statement uses functions or
1197
1169
      should invoke triggers since they may access to the same table too.
1198
1170
    */
1199
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1171
    table->file->ha_start_bulk_insert((ha_rows) 0);
1200
1172
  }
1201
1173
  table->restoreRecordAsDefault();              // Get empty record
1202
1174
  table->next_number_field=table->found_next_number_field;
1203
1175
 
1204
1176
  session->cuted_fields=0;
1205
 
 
1206
1177
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1207
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1208
 
 
 
1178
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1209
1179
  if (info.handle_duplicates == DUP_REPLACE)
1210
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1211
 
 
 
1180
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1212
1181
  if (info.handle_duplicates == DUP_UPDATE)
1213
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1214
 
 
1215
 
  session->setAbortOnWarning(not info.ignore);
 
1182
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1183
  session->abort_on_warning= !info.ignore;
1216
1184
  table->mark_columns_needed_for_insert();
1217
1185
 
1218
1186
 
1240
1208
{
1241
1209
 
1242
1210
  if (session->lex->current_select->options & OPTION_BUFFER_RESULT)
1243
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1211
    table->file->ha_start_bulk_insert((ha_rows) 0);
1244
1212
  return(0);
1245
1213
}
1246
1214
 
1258
1226
  {
1259
1227
    table->next_number_field=0;
1260
1228
    table->auto_increment_field_not_null= false;
1261
 
    table->cursor->ha_reset();
 
1229
    table->file->ha_reset();
1262
1230
  }
1263
1231
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
1264
 
  session->setAbortOnWarning(false);
 
1232
  session->abort_on_warning= 0;
1265
1233
  return;
1266
1234
}
1267
1235
 
1274
1242
  if (unit->offset_limit_cnt)
1275
1243
  {                                             // using limit offset,count
1276
1244
    unit->offset_limit_cnt--;
1277
 
    return false;
 
1245
    return(0);
1278
1246
  }
1279
1247
 
1280
1248
  session->count_cuted_fields= CHECK_FIELD_WARN;        // Calculate cuted fields
1281
1249
  store_values(values);
1282
1250
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
1283
1251
  if (session->is_error())
1284
 
    return true;
 
1252
    return(1);
1285
1253
 
1286
1254
  // Release latches in case bulk insert takes a long time
1287
 
  plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
 
1255
  ha_release_temporary_latches(session);
1288
1256
 
1289
1257
  error= write_record(session, table, &info);
1290
 
  table->auto_increment_field_not_null= false;
1291
1258
 
1292
1259
  if (!error)
1293
1260
  {
1326
1293
void select_insert::store_values(List<Item> &values)
1327
1294
{
1328
1295
  if (fields->elements)
1329
 
    fill_record(session, *fields, values, true);
 
1296
    fill_record(session, *fields, values, 1);
1330
1297
  else
1331
 
    fill_record(session, table->getFields(), values, true);
 
1298
    fill_record(session, table->field, values, 1);
1332
1299
}
1333
1300
 
1334
 
void select_insert::send_error(drizzled::error_t errcode,const char *err)
 
1301
void select_insert::send_error(uint32_t errcode,const char *err)
1335
1302
{
 
1303
 
 
1304
 
1336
1305
  my_message(errcode, err, MYF(0));
 
1306
 
 
1307
  return;
1337
1308
}
1338
1309
 
1339
1310
 
1340
1311
bool select_insert::send_eof()
1341
1312
{
1342
1313
  int error;
1343
 
  bool const trans_table= table->cursor->has_transactions();
 
1314
  bool const trans_table= table->file->has_transactions();
1344
1315
  uint64_t id;
1345
1316
  bool changed;
1346
1317
 
1347
 
  error= table->cursor->ha_end_bulk_insert();
1348
 
  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1349
 
  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1318
  error= table->file->ha_end_bulk_insert();
 
1319
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1320
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1350
1321
 
1351
1322
  if ((changed= (info.copied || info.deleted || info.updated)))
1352
1323
  {
1353
1324
    /*
1354
1325
      We must invalidate the table in the query cache before binlog writing
1355
 
      and autocommitOrRollback.
 
1326
      and ha_autocommit_or_rollback.
1356
1327
    */
1357
 
    if (session->transaction.stmt.hasModifiedNonTransData())
1358
 
      session->transaction.all.markModifiedNonTransData();
 
1328
    if (session->transaction.stmt.modified_non_trans_table)
 
1329
      session->transaction.all.modified_non_trans_table= true;
1359
1330
  }
1360
1331
  assert(trans_table || !changed ||
1361
 
              session->transaction.stmt.hasModifiedNonTransData());
 
1332
              session->transaction.stmt.modified_non_trans_table);
1362
1333
 
1363
 
  table->cursor->ha_release_auto_increment();
 
1334
  table->file->ha_release_auto_increment();
1364
1335
 
1365
1336
  if (error)
1366
1337
  {
1367
 
    table->print_error(error,MYF(0));
1368
 
    DRIZZLE_INSERT_SELECT_DONE(error, 0);
1369
 
    return 1;
 
1338
    table->file->print_error(error,MYF(0));
 
1339
    return(1);
1370
1340
  }
1371
1341
  char buff[160];
1372
1342
  if (info.ignore)
1373
 
    snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
1343
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1374
1344
            (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
1375
1345
  else
1376
 
    snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
1346
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1377
1347
            (ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
1378
 
  session->row_count_func= info.copied + info.deleted + info.updated;
 
1348
  session->row_count_func= info.copied + info.deleted +
 
1349
                       ((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
1350
                        info.touched : info.updated);
1379
1351
 
1380
1352
  id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
1381
1353
    session->first_successful_insert_id_in_cur_stmt :
1382
1354
    (session->arg_of_last_insert_id_function ?
1383
1355
     session->first_successful_insert_id_in_prev_stmt :
1384
1356
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
1385
 
  session->my_ok((ulong) session->rowCount(),
1386
 
                 info.copied + info.deleted + info.touched, id, buff);
1387
 
  session->status_var.inserted_row_count+= session->rowCount(); 
1388
 
  DRIZZLE_INSERT_SELECT_DONE(0, session->rowCount());
1389
 
  return 0;
 
1357
  session->my_ok((ulong) session->row_count_func, id, buff);
 
1358
  return(0);
1390
1359
}
1391
1360
 
1392
1361
void select_insert::abort() {
1402
1371
  {
1403
1372
    bool changed, transactional_table;
1404
1373
 
1405
 
    table->cursor->ha_end_bulk_insert();
 
1374
    table->file->ha_end_bulk_insert();
1406
1375
 
1407
1376
    /*
1408
1377
      If at least one row has been inserted/modified and will stay in
1419
1388
      zero, so no check for that is made.
1420
1389
    */
1421
1390
    changed= (info.copied || info.deleted || info.updated);
1422
 
    transactional_table= table->cursor->has_transactions();
 
1391
    transactional_table= table->file->has_transactions();
1423
1392
    assert(transactional_table || !changed ||
1424
 
                session->transaction.stmt.hasModifiedNonTransData());
1425
 
    table->cursor->ha_release_auto_increment();
1426
 
  }
1427
 
 
1428
 
  if (DRIZZLE_INSERT_SELECT_DONE_ENABLED())
1429
 
  {
1430
 
    DRIZZLE_INSERT_SELECT_DONE(0, info.copied + info.deleted + info.updated);
 
1393
                session->transaction.stmt.modified_non_trans_table);
 
1394
    table->file->ha_release_auto_increment();
1431
1395
  }
1432
1396
 
1433
1397
  return;
1454
1418
      items        in     List of items which should be used to produce rest
1455
1419
                          of fields for the table (corresponding fields will
1456
1420
                          be added to the end of alter_info->create_list)
1457
 
      lock         out    Pointer to the DrizzleLock object for table created
 
1421
      lock         out    Pointer to the DRIZZLE_LOCK object for table created
1458
1422
                          (or open temporary table) will be returned in this
1459
1423
                          parameter. Since this table is not included in
1460
1424
                          Session::lock caller is responsible for explicitly
1464
1428
  NOTES
1465
1429
    This function behaves differently for base and temporary tables:
1466
1430
    - For base table we assume that either table exists and was pre-opened
1467
 
      and locked at openTablesLock() stage (and in this case we just
 
1431
      and locked at open_and_lock_tables() stage (and in this case we just
1468
1432
      emit error or warning and return pre-opened Table object) or special
1469
1433
      placeholder was put in table cache that guarantees that this table
1470
1434
      won't be created or opened until the placeholder will be removed
1482
1446
 
1483
1447
static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
1484
1448
                                      TableList *create_table,
1485
 
                                      message::Table &table_proto,
1486
 
                                      AlterInfo *alter_info,
 
1449
                                      Alter_info *alter_info,
1487
1450
                                      List<Item> *items,
1488
 
                                      bool is_if_not_exists,
1489
 
                                      DrizzleLock **lock,
1490
 
                                      identifier::Table::const_reference identifier)
 
1451
                                      DRIZZLE_LOCK **lock)
1491
1452
{
1492
 
  TableShare share(message::Table::INTERNAL);
 
1453
  Table tmp_table;              // Used during 'CreateField()'
 
1454
  TableShare share;
 
1455
  Table *table= 0;
1493
1456
  uint32_t select_field_count= items->elements;
1494
1457
  /* Add selected items to field list */
1495
1458
  List_iterator_fast<Item> it(*items);
1496
1459
  Item *item;
1497
1460
  Field *tmp_field;
 
1461
  bool not_used;
1498
1462
 
1499
 
  if (not (identifier.isTmp()) && create_table->table->db_stat)
 
1463
  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
 
1464
      create_table->table->db_stat)
1500
1465
  {
1501
 
    /* Table already exists and was open at openTablesLock() stage. */
1502
 
    if (is_if_not_exists)
 
1466
    /* Table already exists and was open at open_and_lock_tables() stage. */
 
1467
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
1503
1468
    {
1504
1469
      create_info->table_existed= 1;            // Mark that table existed
1505
1470
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1506
1471
                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
1507
 
                          create_table->getTableName());
1508
 
      return create_table->table;
 
1472
                          create_table->table_name);
 
1473
      return(create_table->table);
1509
1474
    }
1510
1475
 
1511
 
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1512
 
    return NULL;
 
1476
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
 
1477
    return(0);
1513
1478
  }
1514
1479
 
 
1480
  tmp_table.alias= 0;
 
1481
  tmp_table.timestamp_field= 0;
 
1482
  tmp_table.s= &share;
 
1483
 
 
1484
  tmp_table.s->db_create_options=0;
 
1485
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
 
1486
  tmp_table.s->db_low_byte_first=
 
1487
        test(create_info->db_type == myisam_engine ||
 
1488
             create_info->db_type == heap_engine);
 
1489
  tmp_table.null_row= false;
 
1490
  tmp_table.maybe_null= false;
 
1491
 
 
1492
  while ((item=it++))
1515
1493
  {
1516
 
    table::Shell tmp_table(share);              // Used during 'CreateField()'
1517
 
 
1518
 
    if (not table_proto.engine().name().compare("MyISAM"))
1519
 
      tmp_table.getMutableShare()->db_low_byte_first= true;
1520
 
    else if (not table_proto.engine().name().compare("MEMORY"))
1521
 
      tmp_table.getMutableShare()->db_low_byte_first= true;
1522
 
 
1523
 
    tmp_table.in_use= session;
1524
 
 
1525
 
    while ((item=it++))
1526
 
    {
1527
 
      CreateField *cr_field;
1528
 
      Field *field, *def_field;
1529
 
      if (item->type() == Item::FUNC_ITEM)
1530
 
      {
1531
 
        if (item->result_type() != STRING_RESULT)
1532
 
        {
1533
 
          field= item->tmp_table_field(&tmp_table);
1534
 
        }
1535
 
        else
1536
 
        {
1537
 
          field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1538
 
        }
1539
 
      }
 
1494
    CreateField *cr_field;
 
1495
    Field *field, *def_field;
 
1496
    if (item->type() == Item::FUNC_ITEM)
 
1497
      if (item->result_type() != STRING_RESULT)
 
1498
        field= item->tmp_table_field(&tmp_table);
1540
1499
      else
1541
 
      {
1542
 
        field= create_tmp_field(session, &tmp_table, item, item->type(),
1543
 
                                (Item ***) 0, &tmp_field, &def_field, false,
1544
 
                                false, false, 0);
1545
 
      }
1546
 
 
1547
 
      if (!field ||
1548
 
          !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
1549
 
                                            ((Item_field *)item)->field :
1550
 
                                            (Field*) 0))))
1551
 
      {
1552
 
        return NULL;
1553
 
      }
1554
 
 
1555
 
      if (item->maybe_null)
1556
 
      {
1557
 
        cr_field->flags &= ~NOT_NULL_FLAG;
1558
 
      }
1559
 
 
1560
 
      alter_info->create_list.push_back(cr_field);
1561
 
    }
 
1500
        field= item->tmp_table_field_from_field_type(&tmp_table, 0);
 
1501
    else
 
1502
      field= create_tmp_field(session, &tmp_table, item, item->type(),
 
1503
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
 
1504
                              0);
 
1505
    if (!field ||
 
1506
        !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
 
1507
                                           ((Item_field *)item)->field :
 
1508
                                           (Field*) 0))))
 
1509
      return(0);
 
1510
    if (item->maybe_null)
 
1511
      cr_field->flags &= ~NOT_NULL_FLAG;
 
1512
    alter_info->create_list.push_back(cr_field);
1562
1513
  }
1563
1514
 
1564
1515
  /*
1568
1519
    creating base table on which name we have exclusive lock. So code below
1569
1520
    should not cause deadlocks or races.
1570
1521
  */
1571
 
  Table *table= 0;
1572
1522
  {
1573
 
    if (not create_table_no_lock(session,
1574
 
                                 identifier,
1575
 
                                 create_info,
1576
 
                                 table_proto,
1577
 
                                 alter_info,
1578
 
                                 false,
1579
 
                                 select_field_count,
1580
 
                                 is_if_not_exists))
 
1523
    drizzled::message::Table table_proto;
 
1524
    table_proto.set_name(create_table->table_name);
 
1525
    table_proto.set_type(drizzled::message::Table::STANDARD);
 
1526
 
 
1527
    if (!mysql_create_table_no_lock(session, create_table->db,
 
1528
                                    create_table->table_name,
 
1529
                                    create_info,
 
1530
                                    &table_proto,
 
1531
                                    alter_info, false,
 
1532
                                    select_field_count))
1581
1533
    {
1582
 
      if (create_info->table_existed && not identifier.isTmp())
 
1534
      if (create_info->table_existed &&
 
1535
          !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1583
1536
      {
1584
1537
        /*
1585
1538
          This means that someone created table underneath server
1586
1539
          or it was created via different mysqld front-end to the
1587
1540
          cluster. We don't have much options but throw an error.
1588
1541
        */
1589
 
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1590
 
        return NULL;
 
1542
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
 
1543
        return(0);
1591
1544
      }
1592
1545
 
1593
 
      if (not identifier.isTmp())
 
1546
      if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1594
1547
      {
1595
 
        /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1596
 
        boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
1597
 
 
1598
 
        if (create_table->table)
 
1548
        pthread_mutex_lock(&LOCK_open); /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
 
1549
        if (reopen_name_locked_table(session, create_table, false))
1599
1550
        {
1600
 
          table::Concurrent *concurrent_table= static_cast<table::Concurrent *>(create_table->table);
1601
 
 
1602
 
          if (concurrent_table->reopen_name_locked_table(create_table, session))
1603
 
          {
1604
 
            (void)plugin::StorageEngine::dropTable(*session, identifier);
1605
 
          }
1606
 
          else
1607
 
          {
1608
 
            table= create_table->table;
1609
 
          }
 
1551
          quick_rm_table(create_info->db_type, create_table->db,
 
1552
                         create_table->table_name, false);
1610
1553
        }
1611
1554
        else
1612
 
        {
1613
 
          (void)plugin::StorageEngine::dropTable(*session, identifier);
1614
 
        }
 
1555
          table= create_table->table;
 
1556
        pthread_mutex_unlock(&LOCK_open);
1615
1557
      }
1616
1558
      else
1617
1559
      {
1618
 
        if (not (table= session->openTable(create_table, (bool*) 0,
1619
 
                                           DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1620
 
            not create_info->table_existed)
 
1560
        if (!(table= open_table(session, create_table, (bool*) 0,
 
1561
                                DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
 
1562
            !create_info->table_existed)
1621
1563
        {
1622
1564
          /*
1623
1565
            This shouldn't happen as creation of temporary table should make
1624
1566
            it preparable for open. But let us do close_temporary_table() here
1625
1567
            just in case.
1626
1568
          */
1627
 
          session->drop_temporary_table(identifier);
 
1569
          drop_temporary_table(session, create_table);
1628
1570
        }
1629
1571
      }
1630
1572
    }
1631
 
    if (not table)                                   // open failed
1632
 
      return NULL;
 
1573
    if (!table)                                   // open failed
 
1574
      return(0);
1633
1575
  }
1634
1576
 
1635
1577
  table->reginfo.lock_type=TL_WRITE;
1636
 
  if (not ((*lock)= session->lockTables(&table, 1, DRIZZLE_LOCK_IGNORE_FLUSH)))
 
1578
  if (! ((*lock)= mysql_lock_tables(session, &table, 1,
 
1579
                                    DRIZZLE_LOCK_IGNORE_FLUSH, &not_used)))
1637
1580
  {
1638
1581
    if (*lock)
1639
1582
    {
1640
 
      session->unlockTables(*lock);
 
1583
      mysql_unlock_tables(session, *lock);
1641
1584
      *lock= 0;
1642
1585
    }
1643
1586
 
1644
 
    if (not create_info->table_existed)
1645
 
      session->drop_open_table(table, identifier);
1646
 
    return NULL;
 
1587
    if (!create_info->table_existed)
 
1588
      drop_open_table(session, table, create_table->db, create_table->table_name);
 
1589
    return(0);
1647
1590
  }
1648
 
 
1649
 
  return table;
 
1591
  return(table);
1650
1592
}
1651
1593
 
1652
1594
 
1653
1595
int
1654
1596
select_create::prepare(List<Item> &values, Select_Lex_Unit *u)
1655
1597
{
1656
 
  DrizzleLock *extra_lock= NULL;
 
1598
  DRIZZLE_LOCK *extra_lock= NULL;
1657
1599
  /*
1658
 
    For replication, the CREATE-SELECT statement is written
1659
 
    in two pieces: the first transaction messsage contains 
1660
 
    the CREATE TABLE statement as a CreateTableStatement message
1661
 
    necessary to create the table.
1662
 
    
1663
 
    The second transaction message contains all the InsertStatement
1664
 
    and associated InsertRecords that should go into the table.
 
1600
    For row-based replication, the CREATE-SELECT statement is written
 
1601
    in two pieces: the first one contain the CREATE TABLE statement
 
1602
    necessary to create the table and the second part contain the rows
 
1603
    that should go into the table.
 
1604
 
 
1605
    For non-temporary tables, the start of the CREATE-SELECT
 
1606
    implicitly commits the previous transaction, and all events
 
1607
    forming the statement will be stored the transaction cache. At end
 
1608
    of the statement, the entire statement is committed as a
 
1609
    transaction, and all events are written to the binary log.
 
1610
 
 
1611
    On the master, the table is locked for the duration of the
 
1612
    statement, but since the CREATE part is replicated as a simple
 
1613
    statement, there is no way to lock the table for accesses on the
 
1614
    slave.  Hence, we have to hold on to the CREATE part of the
 
1615
    statement until the statement has finished.
1665
1616
   */
1666
1617
 
1667
1618
  unit= u;
1668
1619
 
1669
 
  if (not (table= create_table_from_items(session, create_info, create_table,
1670
 
                                          table_proto,
1671
 
                                          alter_info, &values,
1672
 
                                          is_if_not_exists,
1673
 
                                          &extra_lock, identifier)))
1674
 
  {
 
1620
  /*
 
1621
    Start a statement transaction before the create if we are using
 
1622
    row-based replication for the statement.  If we are creating a
 
1623
    temporary table, we need to start a statement transaction.
 
1624
  */
 
1625
 
 
1626
  if (!(table= create_table_from_items(session, create_info, create_table,
 
1627
                                       alter_info, &values,
 
1628
                                       &extra_lock)))
1675
1629
    return(-1);                         // abort() deletes table
1676
 
  }
1677
1630
 
1678
1631
  if (extra_lock)
1679
1632
  {
1680
1633
    assert(m_plock == NULL);
1681
1634
 
1682
 
    if (identifier.isTmp())
 
1635
    if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
1683
1636
      m_plock= &m_lock;
1684
1637
    else
1685
1638
      m_plock= &session->extra_lock;
1687
1640
    *m_plock= extra_lock;
1688
1641
  }
1689
1642
 
1690
 
  if (table->getShare()->sizeFields() < values.elements)
 
1643
  if (table->s->fields < values.elements)
1691
1644
  {
1692
1645
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
1693
1646
    return(-1);
1694
1647
  }
1695
1648
 
1696
1649
 /* First field to copy */
1697
 
  field= table->getFields() + table->getShare()->sizeFields() - values.elements;
 
1650
  field= table->field+table->s->fields - values.elements;
1698
1651
 
1699
1652
  /* Mark all fields that are given values */
1700
1653
  for (Field **f= field ; *f ; f++)
1701
 
  {
1702
 
    table->setWriteSet((*f)->position());
1703
 
  }
 
1654
    table->setWriteSet((*f)->field_index);
1704
1655
 
1705
1656
  /* Don't set timestamp if used */
1706
1657
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
1709
1660
  table->restoreRecordAsDefault();      // Get empty record
1710
1661
  session->cuted_fields=0;
1711
1662
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1712
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1713
 
 
 
1663
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1714
1664
  if (info.handle_duplicates == DUP_REPLACE)
1715
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1716
 
 
 
1665
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1717
1666
  if (info.handle_duplicates == DUP_UPDATE)
1718
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1719
 
 
1720
 
  table->cursor->ha_start_bulk_insert((ha_rows) 0);
1721
 
  session->setAbortOnWarning(not info.ignore);
 
1667
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1668
  table->file->ha_start_bulk_insert((ha_rows) 0);
 
1669
  session->abort_on_warning= !info.ignore;
1722
1670
  if (check_that_all_fields_are_given_values(session, table, table_list))
1723
1671
    return(1);
1724
 
 
1725
1672
  table->mark_columns_needed_for_insert();
1726
 
  table->cursor->extra(HA_EXTRA_WRITE_CACHE);
 
1673
  table->file->extra(HA_EXTRA_WRITE_CACHE);
1727
1674
  return(0);
1728
1675
}
1729
1676
 
1730
1677
void select_create::store_values(List<Item> &values)
1731
1678
{
1732
 
  fill_record(session, field, values, true);
 
1679
  fill_record(session, field, values, 1);
1733
1680
}
1734
1681
 
1735
1682
 
1736
 
void select_create::send_error(drizzled::error_t errcode,const char *err)
 
1683
void select_create::send_error(uint32_t errcode,const char *err)
1737
1684
{
 
1685
 
 
1686
 
1738
1687
  /*
1739
1688
    This will execute any rollbacks that are necessary before writing
1740
1689
    the transcation cache.
1747
1696
 
1748
1697
  */
1749
1698
  select_insert::send_error(errcode, err);
 
1699
 
 
1700
  return;
1750
1701
}
1751
1702
 
1752
1703
 
1762
1713
      tables.  This can fail, but we should unlock the table
1763
1714
      nevertheless.
1764
1715
    */
1765
 
    if (!table->getShare()->getType())
 
1716
    if (!table->s->tmp_table)
1766
1717
    {
1767
 
      TransactionServices &transaction_services= TransactionServices::singleton();
1768
 
      transaction_services.autocommitOrRollback(*session, 0);
 
1718
      ha_autocommit_or_rollback(session, 0);
1769
1719
      (void) session->endActiveTransaction();
1770
1720
    }
1771
1721
 
1772
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1773
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1722
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1723
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1774
1724
    if (m_plock)
1775
1725
    {
1776
 
      session->unlockTables(*m_plock);
 
1726
      mysql_unlock_tables(session, *m_plock);
1777
1727
      *m_plock= NULL;
1778
1728
      m_plock= NULL;
1779
1729
    }
1784
1734
 
1785
1735
void select_create::abort()
1786
1736
{
 
1737
 
 
1738
 
1787
1739
  /*
1788
1740
    In select_insert::abort() we roll back the statement, including
1789
1741
    truncating the transaction cache of the binary log. To do this, we
1800
1752
    log state.
1801
1753
  */
1802
1754
  select_insert::abort();
 
1755
  session->transaction.stmt.modified_non_trans_table= false;
 
1756
 
1803
1757
 
1804
1758
  if (m_plock)
1805
1759
  {
1806
 
    session->unlockTables(*m_plock);
 
1760
    mysql_unlock_tables(session, *m_plock);
1807
1761
    *m_plock= NULL;
1808
1762
    m_plock= NULL;
1809
1763
  }
1810
1764
 
1811
1765
  if (table)
1812
1766
  {
1813
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1814
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1815
 
    if (not create_info->table_existed)
1816
 
      session->drop_open_table(table, identifier);
1817
 
    table= NULL;                                    // Safety
 
1767
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1768
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1769
    if (!create_info->table_existed)
 
1770
      drop_open_table(session, table, create_table->db, create_table->table_name);
 
1771
    table=0;                                    // Safety
1818
1772
  }
 
1773
  return;
1819
1774
}
1820
1775
 
1821
 
} /* namespace drizzled */
 
1776
 
 
1777
/*****************************************************************************
 
1778
  Instansiate templates
 
1779
*****************************************************************************/
 
1780
 
 
1781
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
1782
template class List_iterator_fast<List_item>;
 
1783
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */