~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_insert.cc

  • Committer: Brian Aker
  • Date: 2008-12-19 07:02:38 UTC
  • Revision ID: brian@tangent.org-20081219070238-569uxp3vsr6r37v1
Updated/fix to foreign key test.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
/* Insert of records */
18
18
 
19
 
#include "config.h"
 
19
/*
 
20
  INSERT DELAYED
 
21
 
 
22
  Drizzle has a different form of DELAYED than MySQL. DELAYED is just
 
23
  a hint to the storage engine (which can then do whatever it likes).
 
24
*/
 
25
#include <drizzled/server_includes.h>
20
26
#include <drizzled/sql_select.h>
21
27
#include <drizzled/show.h>
 
28
#include <drizzled/replication/mi.h>
22
29
#include <drizzled/error.h>
23
30
#include <drizzled/name_resolution_context_state.h>
 
31
#include <drizzled/slave.h>
 
32
#include <drizzled/sql_parse.h>
24
33
#include <drizzled/probes.h>
 
34
#include <drizzled/tableop_hooks.h>
25
35
#include <drizzled/sql_base.h>
26
36
#include <drizzled/sql_load.h>
27
37
#include <drizzled/field/timestamp.h>
28
38
#include <drizzled/lock.h>
29
 
#include "drizzled/sql_table.h"
30
 
#include "drizzled/pthread_globals.h"
31
 
#include "drizzled/transaction_services.h"
32
 
#include "drizzled/plugin/transactional_storage_engine.h"
33
 
 
34
 
namespace drizzled
35
 
{
36
 
 
37
 
extern plugin::StorageEngine *heap_engine;
38
 
extern plugin::StorageEngine *myisam_engine;
 
39
 
39
40
 
40
41
/*
41
42
  Check if insert fields are correct.
61
62
static int check_insert_fields(Session *session, TableList *table_list,
62
63
                               List<Item> &fields, List<Item> &values,
63
64
                               bool check_unique,
64
 
                               table_map *)
 
65
                               table_map *map __attribute__((unused)))
65
66
{
66
67
  Table *table= table_list->table;
67
68
 
78
79
      No fields are provided so all fields must be provided in the values.
79
80
      Thus we set all bits in the write set.
80
81
    */
81
 
    table->setWriteSet();
 
82
    bitmap_set_all(table->write_set);
82
83
  }
83
84
  else
84
85
  {                                             // Part field list
85
 
    Select_Lex *select_lex= &session->lex->select_lex;
 
86
    SELECT_LEX *select_lex= &session->lex->select_lex;
86
87
    Name_resolution_context *context= &select_lex->context;
87
88
    Name_resolution_context_state ctx_state;
88
89
    int res;
119
120
    }
120
121
    if (table->timestamp_field) // Don't automaticly set timestamp if used
121
122
    {
122
 
      if (table->timestamp_field->isWriteSet())
 
123
      if (bitmap_is_set(table->write_set,
 
124
                        table->timestamp_field->field_index))
123
125
        clear_timestamp_auto_bits(table->timestamp_field_type,
124
126
                                  TIMESTAMP_AUTO_SET_ON_INSERT);
125
127
      else
126
128
      {
127
 
        table->setWriteSet(table->timestamp_field->field_index);
 
129
        bitmap_set_bit(table->write_set,
 
130
                       table->timestamp_field->field_index);
128
131
      }
129
132
    }
 
133
    /* Mark all virtual columns for write*/
 
134
    if (table->vfield)
 
135
      table->mark_virtual_columns();
130
136
  }
131
137
 
132
138
  return 0;
154
160
 
155
161
static int check_update_fields(Session *session, TableList *insert_table_list,
156
162
                               List<Item> &update_fields,
157
 
                               table_map *)
 
163
                               table_map *map __attribute__((unused)))
158
164
{
159
165
  Table *table= insert_table_list->table;
160
166
  bool timestamp_mark= false;
165
171
      Unmark the timestamp field so that we can check if this is modified
166
172
      by update_fields
167
173
    */
168
 
    timestamp_mark= table->write_set->testAndClear(table->timestamp_field->field_index);
 
174
    timestamp_mark= bitmap_test_and_clear(table->write_set,
 
175
                                          table->timestamp_field->field_index);
169
176
  }
170
177
 
171
178
  /* Check the fields we are going to modify */
175
182
  if (table->timestamp_field)
176
183
  {
177
184
    /* Don't set timestamp column if this is modified. */
178
 
    if (table->timestamp_field->isWriteSet())
 
185
    if (bitmap_is_set(table->write_set,
 
186
                      table->timestamp_field->field_index))
179
187
      clear_timestamp_auto_bits(table->timestamp_field_type,
180
188
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
181
189
    if (timestamp_mark)
182
 
      table->setWriteSet(table->timestamp_field->field_index);
 
190
      bitmap_set_bit(table->write_set,
 
191
                     table->timestamp_field->field_index);
183
192
  }
184
193
  return 0;
185
194
}
194
203
*/
195
204
 
196
205
static
197
 
void upgrade_lock_type(Session *,
 
206
void upgrade_lock_type(Session *session __attribute__((unused)),
198
207
                       thr_lock_type *lock_type,
199
208
                       enum_duplicates duplic,
200
 
                       bool )
 
209
                       bool is_multi_insert __attribute__((unused)))
201
210
{
202
211
  if (duplic == DUP_UPDATE ||
203
212
      (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
227
236
  int error;
228
237
  bool transactional_table, joins_freed= false;
229
238
  bool changed;
 
239
  bool was_insert_delayed= (table_list->lock_type ==  TL_WRITE_DELAYED);
230
240
  uint32_t value_count;
231
241
  ulong counter = 1;
232
242
  uint64_t id;
247
257
  upgrade_lock_type(session, &table_list->lock_type, duplic,
248
258
                    values_list.elements > 1);
249
259
 
250
 
  if (session->openTablesLock(table_list))
 
260
  /*
 
261
    We can't write-delayed into a table locked with LOCK TABLES:
 
262
    this will lead to a deadlock, since the delayed thread will
 
263
    never be able to get a lock on the table. QQQ: why not
 
264
    upgrade the lock here instead?
 
265
  */
 
266
  if (table_list->lock_type == TL_WRITE_DELAYED && session->locked_tables &&
 
267
      find_locked_table(session, table_list->db, table_list->table_name))
251
268
  {
252
 
    DRIZZLE_INSERT_DONE(1, 0);
253
 
    return true;
 
269
    my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
 
270
             table_list->table_name);
 
271
    return(true);
254
272
  }
255
273
 
 
274
  {
 
275
    if (open_and_lock_tables(session, table_list))
 
276
      return(true);
 
277
  }
256
278
  lock_type= table_list->lock_type;
257
279
 
258
280
  session->set_proc_info("init");
307
329
  ctx_state.restore_state(context, table_list);
308
330
 
309
331
  /*
310
 
    Fill in the given fields and dump it to the table cursor
 
332
    Fill in the given fields and dump it to the table file
311
333
  */
312
334
  memset(&info, 0, sizeof(info));
313
335
  info.ignore= ignore;
321
343
    to NULL.
322
344
  */
323
345
  session->count_cuted_fields= ((values_list.elements == 1 &&
324
 
                                 !ignore) ?
325
 
                                CHECK_FIELD_ERROR_FOR_NULL :
326
 
                                CHECK_FIELD_WARN);
 
346
                             !ignore) ?
 
347
                            CHECK_FIELD_ERROR_FOR_NULL :
 
348
                            CHECK_FIELD_WARN);
327
349
  session->cuted_fields = 0L;
328
350
  table->next_number_field=table->found_next_number_field;
329
351
 
 
352
  if (session->slave_thread &&
 
353
      (info.handle_duplicates == DUP_UPDATE) &&
 
354
      (table->next_number_field != NULL) &&
 
355
      rpl_master_has_bug(&active_mi->rli, 24432))
 
356
    goto abort;
 
357
 
330
358
  error=0;
331
359
  session->set_proc_info("update");
332
360
  if (duplic == DUP_REPLACE)
333
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
361
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
334
362
  if (duplic == DUP_UPDATE)
335
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
363
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
336
364
  {
337
365
    if (duplic != DUP_ERROR || ignore)
338
 
      table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
339
 
    table->cursor->ha_start_bulk_insert(values_list.elements);
 
366
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
367
    table->file->ha_start_bulk_insert(values_list.elements);
340
368
  }
341
369
 
342
370
 
348
376
  {
349
377
    if (fields.elements || !value_count)
350
378
    {
351
 
      table->restoreRecordAsDefault();  // Get empty record
352
 
      if (fill_record(session, fields, *values))
 
379
      restore_record(table,s->default_values);  // Get empty record
 
380
      if (fill_record(session, fields, *values, 0))
353
381
      {
354
382
        if (values_list.elements != 1 && ! session->is_error())
355
383
        {
367
395
    }
368
396
    else
369
397
    {
370
 
      table->restoreRecordAsDefault();  // Get empty record
371
 
 
372
 
      if (fill_record(session, table->field, *values))
 
398
      if (session->used_tables)                 // Column used in values()
 
399
        restore_record(table,s->default_values);        // Get empty record
 
400
      else
 
401
      {
 
402
        /*
 
403
          Fix delete marker. No need to restore rest of record since it will
 
404
          be overwritten by fill_record() anyway (and fill_record() does not
 
405
          use default values in this case).
 
406
        */
 
407
        table->record[0][0]= table->s->default_values[0];
 
408
      }
 
409
      if (fill_record(session, table->field, *values, 0))
373
410
      {
374
411
        if (values_list.elements != 1 && ! session->is_error())
375
412
        {
381
418
      }
382
419
    }
383
420
 
384
 
    // Release latches in case bulk insert takes a long time
385
 
    plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
386
 
 
387
421
    error=write_record(session, table ,&info);
388
422
    if (error)
389
423
      break;
402
436
      Do not do this release if this is a delayed insert, it would steal
403
437
      auto_inc values from the delayed_insert thread as they share Table.
404
438
    */
405
 
    table->cursor->ha_release_auto_increment();
406
 
    if (table->cursor->ha_end_bulk_insert() && !error)
 
439
    table->file->ha_release_auto_increment();
 
440
    if (table->file->ha_end_bulk_insert() && !error)
407
441
    {
408
 
      table->print_error(errno,MYF(0));
 
442
      table->file->print_error(my_errno,MYF(0));
409
443
      error=1;
410
444
    }
411
445
    if (duplic != DUP_ERROR || ignore)
412
 
      table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
413
 
 
414
 
    transactional_table= table->cursor->has_transactions();
415
 
 
416
 
    changed= (info.copied || info.deleted || info.updated);
417
 
    if ((changed && error <= 0) || session->transaction.stmt.hasModifiedNonTransData())
418
 
    {
419
 
      if (session->transaction.stmt.hasModifiedNonTransData())
420
 
        session->transaction.all.markModifiedNonTransData();
421
 
    }
422
 
    assert(transactional_table || !changed || session->transaction.stmt.hasModifiedNonTransData());
 
446
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
447
 
 
448
    transactional_table= table->file->has_transactions();
 
449
 
 
450
    if ((changed= (info.copied || info.deleted || info.updated)))
 
451
    {
 
452
      /*
 
453
        Invalidate the table in the query cache if something changed.
 
454
        For the transactional algorithm to work the invalidation must be
 
455
        before binlog writing and ha_autocommit_or_rollback
 
456
      */
 
457
    }
 
458
    if ((changed && error <= 0) || session->transaction.stmt.modified_non_trans_table || was_insert_delayed)
 
459
    {
 
460
      if (session->transaction.stmt.modified_non_trans_table)
 
461
        session->transaction.all.modified_non_trans_table= true;
 
462
    }
 
463
    assert(transactional_table || !changed ||
 
464
                session->transaction.stmt.modified_non_trans_table);
423
465
 
424
466
  }
425
467
  session->set_proc_info("end");
443
485
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
444
486
  table->auto_increment_field_not_null= false;
445
487
  if (duplic == DUP_REPLACE)
446
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
488
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
447
489
 
448
490
  if (error)
449
491
    goto abort;
450
492
  if (values_list.elements == 1 && (!(session->options & OPTION_WARNINGS) ||
451
493
                                    !session->cuted_fields))
452
494
  {
453
 
    session->row_count_func= info.copied + info.deleted + info.updated;
454
 
    session->my_ok((ulong) session->row_count_func,
455
 
                   info.copied + info.deleted + info.touched, id);
 
495
    session->row_count_func= info.copied + info.deleted +
 
496
                         ((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
497
                          info.touched : info.updated);
 
498
    my_ok(session, (ulong) session->row_count_func, id);
456
499
  }
457
500
  else
458
501
  {
459
502
    char buff[160];
 
503
    ha_rows updated=((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
504
                     info.touched : info.updated);
460
505
    if (ignore)
461
 
      snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
506
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
462
507
              (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
463
508
    else
464
 
      snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
465
 
              (ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
466
 
    session->row_count_func= info.copied + info.deleted + info.updated;
467
 
    session->my_ok((ulong) session->row_count_func,
468
 
                   info.copied + info.deleted + info.touched, id, buff);
 
509
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
 
510
              (ulong) (info.deleted + updated), (ulong) session->cuted_fields);
 
511
    session->row_count_func= info.copied + info.deleted + updated;
 
512
    ::my_ok(session, (ulong) session->row_count_func, id, buff);
469
513
  }
470
514
  session->abort_on_warning= 0;
471
 
  DRIZZLE_INSERT_DONE(0, session->row_count_func);
472
 
  return false;
 
515
  DRIZZLE_INSERT_END();
 
516
  return(false);
473
517
 
474
518
abort:
475
519
  if (table != NULL)
476
 
    table->cursor->ha_release_auto_increment();
 
520
    table->file->ha_release_auto_increment();
477
521
  if (!joins_freed)
478
522
    free_underlaid_joins(session, &session->lex->select_lex);
479
523
  session->abort_on_warning= 0;
480
 
  DRIZZLE_INSERT_DONE(1, 0);
481
 
  return true;
 
524
  DRIZZLE_INSERT_END();
 
525
  return(true);
482
526
}
483
527
 
484
528
 
499
543
*/
500
544
 
501
545
static bool mysql_prepare_insert_check_table(Session *session, TableList *table_list,
502
 
                                             List<Item> &,
 
546
                                             List<Item> &fields __attribute__((unused)),
503
547
                                             bool select_insert)
504
548
{
505
549
 
557
601
                          Table *table, List<Item> &fields, List_item *values,
558
602
                          List<Item> &update_fields, List<Item> &update_values,
559
603
                          enum_duplicates duplic,
560
 
                          COND **,
 
604
                          COND **where __attribute__((unused)),
561
605
                          bool select_insert,
562
606
                          bool check_fields, bool abort_on_warning)
563
607
{
564
 
  Select_Lex *select_lex= &session->lex->select_lex;
 
608
  SELECT_LEX *select_lex= &session->lex->select_lex;
565
609
  Name_resolution_context *context= &select_lex->context;
566
610
  Name_resolution_context_state ctx_state;
567
611
  bool insert_into_view= (0 != 0);
574
618
  /*
575
619
    For subqueries in VALUES() we should not see the table in which we are
576
620
    inserting (for INSERT ... SELECT this is done by changing table_list,
577
 
    because INSERT ... SELECT shares its Select_Lex with SELECT).
 
621
    because INSERT ... SELECT shares its SELECT_LEX with SELECT).
578
622
  */
579
623
  if (!select_insert)
580
624
  {
581
 
    for (Select_Lex_Unit *un= select_lex->first_inner_unit();
 
625
    for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
582
626
         un;
583
627
         un= un->next_unit())
584
628
    {
585
 
      for (Select_Lex *sl= un->first_select();
 
629
      for (SELECT_LEX *sl= un->first_select();
586
630
           sl;
587
631
           sl= sl->next_select())
588
632
      {
654
698
  if (!select_insert)
655
699
  {
656
700
    TableList *duplicate;
657
 
    if ((duplicate= unique_table(table_list, table_list->next_global, true)))
 
701
    if ((duplicate= unique_table(session, table_list, table_list->next_global, 1)))
658
702
    {
659
 
      my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->alias);
660
 
 
661
 
      return true;
 
703
      update_non_unique_table_error(table_list, "INSERT", duplicate);
 
704
      return(true);
662
705
    }
663
706
  }
664
707
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
665
708
    table->prepare_for_position();
666
 
 
667
 
  return false;
 
709
  return(false);
668
710
}
669
711
 
670
712
 
697
739
    then both on update triggers will work instead. Similarly both on
698
740
    delete triggers will be invoked if we will delete conflicting records.
699
741
 
700
 
    Sets session->transaction.stmt.modified_non_trans_data to true if table which is updated didn't have
 
742
    Sets session->transaction.stmt.modified_non_trans_table to true if table which is updated didn't have
701
743
    transactions.
702
744
 
703
745
  RETURN VALUE
710
752
{
711
753
  int error;
712
754
  char *key=0;
713
 
  MyBitmap *save_read_set, *save_write_set;
714
 
  uint64_t prev_insert_id= table->cursor->next_insert_id;
 
755
  MY_BITMAP *save_read_set, *save_write_set;
 
756
  uint64_t prev_insert_id= table->file->next_insert_id;
715
757
  uint64_t insert_id_for_cur_row= 0;
716
758
 
717
759
 
719
761
  save_read_set=  table->read_set;
720
762
  save_write_set= table->write_set;
721
763
 
722
 
  if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE)
 
764
  if (info->handle_duplicates == DUP_REPLACE ||
 
765
      info->handle_duplicates == DUP_UPDATE)
723
766
  {
724
 
    while ((error=table->cursor->ha_write_row(table->record[0])))
 
767
    while ((error=table->file->ha_write_row(table->record[0])))
725
768
    {
726
769
      uint32_t key_nr;
727
770
      /*
731
774
        the autogenerated value to avoid session->insert_id_for_cur_row to become
732
775
        0.
733
776
      */
734
 
      if (table->cursor->insert_id_for_cur_row > 0)
735
 
        insert_id_for_cur_row= table->cursor->insert_id_for_cur_row;
 
777
      if (table->file->insert_id_for_cur_row > 0)
 
778
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
736
779
      else
737
 
        table->cursor->insert_id_for_cur_row= insert_id_for_cur_row;
 
780
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
738
781
      bool is_duplicate_key_error;
739
 
      if (table->cursor->is_fatal_error(error, HA_CHECK_DUP))
 
782
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
740
783
        goto err;
741
 
      is_duplicate_key_error= table->cursor->is_fatal_error(error, 0);
 
784
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
742
785
      if (!is_duplicate_key_error)
743
786
      {
744
787
        /*
750
793
          goto gok_or_after_err; /* Ignoring a not fatal error, return 0 */
751
794
        goto err;
752
795
      }
753
 
      if ((int) (key_nr = table->get_dup_key(error)) < 0)
 
796
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
754
797
      {
755
798
        error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
756
799
        goto err;
767
810
          key_nr == table->s->next_number_index &&
768
811
          (insert_id_for_cur_row > 0))
769
812
        goto err;
770
 
      if (table->cursor->getEngine()->check_flag(HTON_BIT_DUPLICATE_POS))
 
813
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
771
814
      {
772
 
        if (table->cursor->rnd_pos(table->record[1],table->cursor->dup_ref))
 
815
        if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
773
816
          goto err;
774
817
      }
775
818
      else
776
819
      {
777
 
        if (table->cursor->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
 
820
        if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
778
821
        {
779
 
          error=errno;
 
822
          error=my_errno;
780
823
          goto err;
781
824
        }
782
825
 
789
832
          }
790
833
        }
791
834
        key_copy((unsigned char*) key,table->record[0],table->key_info+key_nr,0);
792
 
        if ((error=(table->cursor->index_read_idx_map(table->record[1],key_nr,
 
835
        if ((error=(table->file->index_read_idx_map(table->record[1],key_nr,
793
836
                                                    (unsigned char*) key, HA_WHOLE_KEY,
794
837
                                                    HA_READ_KEY_EXACT))))
795
838
          goto err;
802
845
          an error is returned
803
846
        */
804
847
        assert(table->insert_values != NULL);
805
 
        table->storeRecordAsInsert();
806
 
        table->restoreRecord();
 
848
        store_record(table,insert_values);
 
849
        restore_record(table,record[1]);
807
850
        assert(info->update_fields->elements ==
808
851
                    info->update_values->elements);
809
852
        if (fill_record(session, *info->update_fields,
811
854
                                                 info->ignore))
812
855
          goto before_err;
813
856
 
814
 
        table->cursor->restore_auto_increment(prev_insert_id);
 
857
        table->file->restore_auto_increment(prev_insert_id);
815
858
        if (table->next_number_field)
816
 
          table->cursor->adjust_next_insert_id_after_explicit_value(
 
859
          table->file->adjust_next_insert_id_after_explicit_value(
817
860
            table->next_number_field->val_int());
818
861
        info->touched++;
819
 
        if ((table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ) &&
 
862
        if ((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ &&
820
863
             !bitmap_is_subset(table->write_set, table->read_set)) ||
821
864
            table->compare_record())
822
865
        {
823
 
          if ((error=table->cursor->ha_update_row(table->record[1],
 
866
          if ((error=table->file->ha_update_row(table->record[1],
824
867
                                                table->record[0])) &&
825
868
              error != HA_ERR_RECORD_IS_THE_SAME)
826
869
          {
827
870
            if (info->ignore &&
828
 
                !table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
871
                !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
829
872
            {
830
873
              goto gok_or_after_err;
831
874
            }
843
886
            Except if LAST_INSERT_ID(#) was in the INSERT query, which is
844
887
            handled separately by Session::arg_of_last_insert_id_function.
845
888
          */
846
 
          insert_id_for_cur_row= table->cursor->insert_id_for_cur_row= 0;
 
889
          insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
847
890
          info->copied++;
848
891
        }
849
892
 
850
893
        if (table->next_number_field)
851
 
          table->cursor->adjust_next_insert_id_after_explicit_value(
 
894
          table->file->adjust_next_insert_id_after_explicit_value(
852
895
            table->next_number_field->val_int());
853
896
        info->touched++;
854
897
 
871
914
          ON UPDATE triggers.
872
915
        */
873
916
        if (last_uniq_key(table,key_nr) &&
874
 
            !table->cursor->referenced_by_foreign_key() &&
 
917
            !table->file->referenced_by_foreign_key() &&
875
918
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
876
919
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
877
920
        {
878
 
          if ((error=table->cursor->ha_update_row(table->record[1],
 
921
          if ((error=table->file->ha_update_row(table->record[1],
879
922
                                                table->record[0])) &&
880
923
              error != HA_ERR_RECORD_IS_THE_SAME)
881
924
            goto err;
883
926
            info->deleted++;
884
927
          else
885
928
            error= 0;
886
 
          session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
929
          session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
887
930
          /*
888
931
            Since we pretend that we have done insert we should call
889
932
            its after triggers.
892
935
        }
893
936
        else
894
937
        {
895
 
          if ((error=table->cursor->ha_delete_row(table->record[1])))
 
938
          if ((error=table->file->ha_delete_row(table->record[1])))
896
939
            goto err;
897
940
          info->deleted++;
898
 
          if (!table->cursor->has_transactions())
899
 
            session->transaction.stmt.markModifiedNonTransData();
 
941
          if (!table->file->has_transactions())
 
942
            session->transaction.stmt.modified_non_trans_table= true;
900
943
          /* Let us attempt to do write_row() once more */
901
944
        }
902
945
      }
903
946
    }
904
 
    session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
947
    session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
905
948
    /*
906
949
      Restore column maps if they were replaced during a duplicate key
907
950
      problem.
910
953
        table->write_set != save_write_set)
911
954
      table->column_bitmaps_set(save_read_set, save_write_set);
912
955
  }
913
 
  else if ((error=table->cursor->ha_write_row(table->record[0])))
 
956
  else if ((error=table->file->ha_write_row(table->record[0])))
914
957
  {
915
958
    if (!info->ignore ||
916
 
        table->cursor->is_fatal_error(error, HA_CHECK_DUP))
 
959
        table->file->is_fatal_error(error, HA_CHECK_DUP))
917
960
      goto err;
918
 
    table->cursor->restore_auto_increment(prev_insert_id);
 
961
    table->file->restore_auto_increment(prev_insert_id);
919
962
    goto gok_or_after_err;
920
963
  }
921
964
 
922
965
after_n_copied_inc:
923
966
  info->copied++;
924
 
  session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
967
  session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
925
968
 
926
969
gok_or_after_err:
927
970
  if (key)
928
971
    free(key);
929
 
  if (!table->cursor->has_transactions())
930
 
    session->transaction.stmt.markModifiedNonTransData();
 
972
  if (!table->file->has_transactions())
 
973
    session->transaction.stmt.modified_non_trans_table= true;
931
974
  return(0);
932
975
 
933
976
err:
935
978
  /* current_select is NULL if this is a delayed insert */
936
979
  if (session->lex->current_select)
937
980
    session->lex->current_select->no_error= 0;        // Give error
938
 
  table->print_error(error,MYF(0));
 
981
  table->file->print_error(error,MYF(0));
939
982
 
940
983
before_err:
941
 
  table->cursor->restore_auto_increment(prev_insert_id);
 
984
  table->file->restore_auto_increment(prev_insert_id);
942
985
  if (key)
943
986
    free(key);
944
987
  table->column_bitmaps_set(save_read_set, save_write_set);
954
997
                                           TableList *)
955
998
{
956
999
  int err= 0;
 
1000
  MY_BITMAP *write_set= entry->write_set;
957
1001
 
958
1002
  for (Field **field=entry->field ; *field ; field++)
959
1003
  {
960
 
    if (((*field)->isWriteSet()) == false)
 
1004
    if (!bitmap_is_set(write_set, (*field)->field_index))
961
1005
    {
962
1006
      /*
963
1007
       * If the field doesn't have any default value
1012
1056
bool mysql_insert_select_prepare(Session *session)
1013
1057
{
1014
1058
  LEX *lex= session->lex;
1015
 
  Select_Lex *select_lex= &lex->select_lex;
 
1059
  SELECT_LEX *select_lex= &lex->select_lex;
1016
1060
 
1017
1061
  /*
1018
 
    Select_Lex does not belong to the INSERT statement, so we can't add a WHERE
 
1062
    SELECT_LEX does not belong to the INSERT statement, so we can't add a WHERE
1019
1063
    clause if table is VIEW
1020
1064
  */
1021
1065
 
1057
1101
 
1058
1102
 
1059
1103
int
1060
 
select_insert::prepare(List<Item> &values, Select_Lex_Unit *u)
 
1104
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
1061
1105
{
1062
1106
  LEX *lex= session->lex;
1063
1107
  int res;
1064
1108
  table_map map= 0;
1065
 
  Select_Lex *lex_current_select_save= lex->current_select;
 
1109
  SELECT_LEX *lex_current_select_save= lex->current_select;
1066
1110
 
1067
1111
 
1068
1112
  unit= u;
1153
1197
    Is table which we are changing used somewhere in other parts of
1154
1198
    query
1155
1199
  */
1156
 
  if (unique_table(table_list, table_list->next_global))
 
1200
  if (unique_table(session, table_list, table_list->next_global, 0))
1157
1201
  {
1158
1202
    /* Using same table for INSERT and SELECT */
1159
1203
    lex->current_select->options|= OPTION_BUFFER_RESULT;
1170
1214
      We won't start bulk inserts at all if this statement uses functions or
1171
1215
      should invoke triggers since they may access the same table too.
1172
1216
    */
1173
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1217
    table->file->ha_start_bulk_insert((ha_rows) 0);
1174
1218
  }
1175
 
  table->restoreRecordAsDefault();              // Get empty record
 
1219
  restore_record(table,s->default_values);              // Get empty record
1176
1220
  table->next_number_field=table->found_next_number_field;
1177
1221
 
 
1222
  if (session->slave_thread &&
 
1223
      (info.handle_duplicates == DUP_UPDATE) &&
 
1224
      (table->next_number_field != NULL) &&
 
1225
      rpl_master_has_bug(&active_mi->rli, 24432))
 
1226
    return(1);
 
1227
 
1178
1228
  session->cuted_fields=0;
1179
1229
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1180
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1230
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1181
1231
  if (info.handle_duplicates == DUP_REPLACE)
1182
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
1232
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1183
1233
  if (info.handle_duplicates == DUP_UPDATE)
1184
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1234
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1185
1235
  session->abort_on_warning= !info.ignore;
1186
1236
  table->mark_columns_needed_for_insert();
1187
1237
 
1210
1260
{
1211
1261
 
1212
1262
  if (session->lex->current_select->options & OPTION_BUFFER_RESULT)
1213
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1263
    table->file->ha_start_bulk_insert((ha_rows) 0);
1214
1264
  return(0);
1215
1265
}
1216
1266
 
1228
1278
  {
1229
1279
    table->next_number_field=0;
1230
1280
    table->auto_increment_field_not_null= false;
1231
 
    table->cursor->ha_reset();
 
1281
    table->file->ha_reset();
1232
1282
  }
1233
1283
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
1234
1284
  session->abort_on_warning= 0;
1253
1303
  if (session->is_error())
1254
1304
    return(1);
1255
1305
 
1256
 
  // Release latches in case bulk insert takes a long time
1257
 
  plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
1258
 
 
1259
1306
  error= write_record(session, table, &info);
1260
1307
 
1261
1308
  if (!error)
1270
1317
        originally touched by INSERT ... SELECT, so we have to restore
1271
1318
        their original values for the next row.
1272
1319
      */
1273
 
      table->restoreRecordAsDefault();
 
1320
      restore_record(table, s->default_values);
1274
1321
    }
1275
1322
    if (table->next_number_field)
1276
1323
    {
1295
1342
void select_insert::store_values(List<Item> &values)
1296
1343
{
1297
1344
  if (fields->elements)
1298
 
    fill_record(session, *fields, values, true);
 
1345
    fill_record(session, *fields, values, 1);
1299
1346
  else
1300
 
    fill_record(session, table->field, values, true);
 
1347
    fill_record(session, table->field, values, 1);
1301
1348
}
1302
1349
 
1303
1350
void select_insert::send_error(uint32_t errcode,const char *err)
1313
1360
bool select_insert::send_eof()
1314
1361
{
1315
1362
  int error;
1316
 
  bool const trans_table= table->cursor->has_transactions();
 
1363
  bool const trans_table= table->file->has_transactions();
1317
1364
  uint64_t id;
1318
1365
  bool changed;
1319
1366
 
1320
 
  error= table->cursor->ha_end_bulk_insert();
1321
 
  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1322
 
  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1367
  error= table->file->ha_end_bulk_insert();
 
1368
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1369
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1323
1370
 
1324
1371
  if ((changed= (info.copied || info.deleted || info.updated)))
1325
1372
  {
1327
1374
      We must invalidate the table in the query cache before binlog writing
1328
1375
      and ha_autocommit_or_rollback.
1329
1376
    */
1330
 
    if (session->transaction.stmt.hasModifiedNonTransData())
1331
 
      session->transaction.all.markModifiedNonTransData();
 
1377
    if (session->transaction.stmt.modified_non_trans_table)
 
1378
      session->transaction.all.modified_non_trans_table= true;
1332
1379
  }
1333
1380
  assert(trans_table || !changed ||
1334
 
              session->transaction.stmt.hasModifiedNonTransData());
 
1381
              session->transaction.stmt.modified_non_trans_table);
1335
1382
 
1336
 
  table->cursor->ha_release_auto_increment();
 
1383
  table->file->ha_release_auto_increment();
1337
1384
 
1338
1385
  if (error)
1339
1386
  {
1340
 
    table->print_error(error,MYF(0));
1341
 
    DRIZZLE_INSERT_SELECT_DONE(error, 0);
1342
 
    return 1;
 
1387
    table->file->print_error(error,MYF(0));
 
1388
    return(1);
1343
1389
  }
1344
1390
  char buff[160];
1345
1391
  if (info.ignore)
1346
 
    snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
1392
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1347
1393
            (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
1348
1394
  else
1349
 
    snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
1395
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1350
1396
            (ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
1351
 
  session->row_count_func= info.copied + info.deleted + info.updated;
 
1397
  session->row_count_func= info.copied + info.deleted +
 
1398
                       ((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
1399
                        info.touched : info.updated);
1352
1400
 
1353
1401
  id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
1354
1402
    session->first_successful_insert_id_in_cur_stmt :
1355
1403
    (session->arg_of_last_insert_id_function ?
1356
1404
     session->first_successful_insert_id_in_prev_stmt :
1357
1405
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
1358
 
  session->my_ok((ulong) session->row_count_func,
1359
 
                 info.copied + info.deleted + info.touched, id, buff);
1360
 
  DRIZZLE_INSERT_SELECT_DONE(0, session->row_count_func);
1361
 
  return 0;
 
1406
  ::my_ok(session, (ulong) session->row_count_func, id, buff);
 
1407
  return(0);
1362
1408
}
1363
1409
 
1364
1410
void select_insert::abort() {
1374
1420
  {
1375
1421
    bool changed, transactional_table;
1376
1422
 
1377
 
    table->cursor->ha_end_bulk_insert();
 
1423
    table->file->ha_end_bulk_insert();
1378
1424
 
1379
1425
    /*
1380
1426
      If at least one row has been inserted/modified and will stay in
1391
1437
      zero, so no check for that is made.
1392
1438
    */
1393
1439
    changed= (info.copied || info.deleted || info.updated);
1394
 
    transactional_table= table->cursor->has_transactions();
 
1440
    transactional_table= table->file->has_transactions();
1395
1441
    assert(transactional_table || !changed ||
1396
 
                session->transaction.stmt.hasModifiedNonTransData());
1397
 
    table->cursor->ha_release_auto_increment();
1398
 
  }
1399
 
 
1400
 
  if (DRIZZLE_INSERT_SELECT_DONE_ENABLED())
1401
 
  {
1402
 
    DRIZZLE_INSERT_SELECT_DONE(0, info.copied + info.deleted + info.updated);
 
1442
                session->transaction.stmt.modified_non_trans_table);
 
1443
    table->file->ha_release_auto_increment();
1403
1444
  }
1404
1445
 
1405
1446
  return;
1436
1477
  NOTES
1437
1478
    This function behaves differently for base and temporary tables:
1438
1479
    - For base table we assume that either table exists and was pre-opened
1439
 
      and locked at openTablesLock() stage (and in this case we just
 
1480
      and locked at open_and_lock_tables() stage (and in this case we just
1440
1481
      emit error or warning and return pre-opened Table object) or special
1441
1482
      placeholder was put in table cache that guarantees that this table
1442
1483
      won't be created or opened until the placeholder will be removed
1454
1495
 
1455
1496
static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
1456
1497
                                      TableList *create_table,
1457
 
                                      message::Table &table_proto,
1458
 
                                      AlterInfo *alter_info,
 
1498
                                      Alter_info *alter_info,
1459
1499
                                      List<Item> *items,
1460
 
                                      bool is_if_not_exists,
1461
1500
                                      DRIZZLE_LOCK **lock,
1462
 
                                      TableIdentifier &identifier)
 
1501
                                      Tableop_hooks *hooks)
1463
1502
{
1464
 
  Table tmp_table;              // Used during 'CreateField()'
1465
 
  TableShare share;
 
1503
  Table tmp_table;              // Used during 'Create_field()'
 
1504
  TABLE_SHARE share;
1466
1505
  Table *table= 0;
1467
1506
  uint32_t select_field_count= items->elements;
1468
1507
  /* Add selected items to field list */
1471
1510
  Field *tmp_field;
1472
1511
  bool not_used;
1473
1512
 
1474
 
  if (not (identifier.isTmp()) && create_table->table->db_stat)
 
1513
  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
 
1514
      create_table->table->db_stat)
1475
1515
  {
1476
 
    /* Table already exists and was open at openTablesLock() stage. */
1477
 
    if (is_if_not_exists)
 
1516
    /* Table already exists and was open at open_and_lock_tables() stage. */
 
1517
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
1478
1518
    {
1479
1519
      create_info->table_existed= 1;            // Mark that table existed
1480
1520
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1481
1521
                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
1482
1522
                          create_table->table_name);
1483
 
      return create_table->table;
 
1523
      return(create_table->table);
1484
1524
    }
1485
1525
 
1486
1526
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
1487
 
    return NULL;
 
1527
    return(0);
1488
1528
  }
1489
1529
 
1490
1530
  tmp_table.alias= 0;
1491
1531
  tmp_table.timestamp_field= 0;
1492
1532
  tmp_table.s= &share;
 
1533
  init_tmp_table_share(session, &share, "", 0, "", "");
1493
1534
 
1494
1535
  tmp_table.s->db_create_options=0;
1495
1536
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
1496
 
 
1497
 
  if (not table_proto.engine().name().compare("MyISAM"))
1498
 
    tmp_table.s->db_low_byte_first= true;
1499
 
  else if (not table_proto.engine().name().compare("MEMORY"))
1500
 
    tmp_table.s->db_low_byte_first= true;
1501
 
 
 
1537
  tmp_table.s->db_low_byte_first=
 
1538
        test(create_info->db_type == myisam_hton ||
 
1539
             create_info->db_type == heap_hton);
1502
1540
  tmp_table.null_row= false;
1503
1541
  tmp_table.maybe_null= false;
1504
1542
 
1505
1543
  while ((item=it++))
1506
1544
  {
1507
 
    CreateField *cr_field;
 
1545
    Create_field *cr_field;
1508
1546
    Field *field, *def_field;
1509
1547
    if (item->type() == Item::FUNC_ITEM)
1510
1548
      if (item->result_type() != STRING_RESULT)
1513
1551
        field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1514
1552
    else
1515
1553
      field= create_tmp_field(session, &tmp_table, item, item->type(),
1516
 
                              (Item ***) 0, &tmp_field, &def_field, false,
1517
 
                              false, false, 0);
 
1554
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
 
1555
                              0);
1518
1556
    if (!field ||
1519
 
        !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
 
1557
        !(cr_field=new Create_field(field,(item->type() == Item::FIELD_ITEM ?
1520
1558
                                           ((Item_field *)item)->field :
1521
1559
                                           (Field*) 0))))
1522
 
      return NULL;
 
1560
      return(0);
1523
1561
    if (item->maybe_null)
1524
1562
      cr_field->flags &= ~NOT_NULL_FLAG;
1525
1563
    alter_info->create_list.push_back(cr_field);
1531
1569
    Note that we are either creating (or opening an existing) temporary table or
1532
1570
    creating base table on which name we have exclusive lock. So code below
1533
1571
    should not cause deadlocks or races.
 
1572
 
 
1573
    We don't log the statement, it will be logged later.
 
1574
 
 
1575
    If this is a HEAP table, the automatic DELETE FROM which is written to the
 
1576
    binlog when a HEAP table is opened for the first time since startup, must
 
1577
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
 
1578
    don't want to delete from it) 2) it would be written before the CREATE
 
1579
    Table, which is a wrong order. So we keep binary logging disabled when we
 
1580
    open_table().
1534
1581
  */
1535
1582
  {
1536
 
    if (not mysql_create_table_no_lock(session,
1537
 
                                       identifier,
1538
 
                                       create_info,
1539
 
                                       table_proto,
1540
 
                                       alter_info,
1541
 
                                       false,
1542
 
                                       select_field_count,
1543
 
                                       is_if_not_exists))
 
1583
    tmp_disable_binlog(session);
 
1584
    if (!mysql_create_table_no_lock(session, create_table->db,
 
1585
                                    create_table->table_name,
 
1586
                                    create_info, alter_info, 0,
 
1587
                                    select_field_count, true))
1544
1588
    {
1545
 
      if (create_info->table_existed && not identifier.isTmp())
 
1589
      if (create_info->table_existed &&
 
1590
          !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1546
1591
      {
1547
1592
        /*
1548
1593
          This means that someone created table underneath server
1550
1595
          cluster. We don't have many options but to throw an error.
1551
1596
        */
1552
1597
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
1553
 
        return NULL;
 
1598
        return(0);
1554
1599
      }
1555
1600
 
1556
 
      if (not identifier.isTmp())
 
1601
      if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1557
1602
      {
1558
 
        pthread_mutex_lock(&LOCK_open); /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1559
 
        if (session->reopen_name_locked_table(create_table, false))
 
1603
        pthread_mutex_lock(&LOCK_open);
 
1604
        if (reopen_name_locked_table(session, create_table, false))
1560
1605
        {
1561
 
          quick_rm_table(*session, identifier);
 
1606
          quick_rm_table(create_info->db_type, create_table->db,
 
1607
                         table_case_name(create_info, create_table->table_name),
 
1608
                         0);
1562
1609
        }
1563
1610
        else
1564
1611
          table= create_table->table;
1566
1613
      }
1567
1614
      else
1568
1615
      {
1569
 
        if (not (table= session->openTable(create_table, (bool*) 0,
1570
 
                                           DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1571
 
            not create_info->table_existed)
 
1616
        if (!(table= open_table(session, create_table, (bool*) 0,
 
1617
                                DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
 
1618
            !create_info->table_existed)
1572
1619
        {
1573
1620
          /*
1574
1621
            This shouldn't happen as creation of temporary table should make
1575
1622
            it preparable for open. But let us do close_temporary_table() here
1576
1623
            just in case.
1577
1624
          */
1578
 
          session->drop_temporary_table(create_table);
 
1625
          drop_temporary_table(session, create_table);
1579
1626
        }
1580
1627
      }
1581
1628
    }
 
1629
    reenable_binlog(session);
1582
1630
    if (!table)                                   // open failed
1583
 
      return NULL;
 
1631
      return(0);
1584
1632
  }
1585
1633
 
1586
1634
  table->reginfo.lock_type=TL_WRITE;
 
1635
  hooks->prelock(&table, 1);                    // Call prelock hooks
1587
1636
  if (! ((*lock)= mysql_lock_tables(session, &table, 1,
1588
 
                                    DRIZZLE_LOCK_IGNORE_FLUSH, &not_used)))
 
1637
                                    DRIZZLE_LOCK_IGNORE_FLUSH, &not_used)) ||
 
1638
        hooks->postlock(&table, 1))
1589
1639
  {
1590
1640
    if (*lock)
1591
1641
    {
1593
1643
      *lock= 0;
1594
1644
    }
1595
1645
 
1596
 
    if (not create_info->table_existed)
1597
 
      session->drop_open_table(table, identifier);
1598
 
    return NULL;
 
1646
    if (!create_info->table_existed)
 
1647
      drop_open_table(session, table, create_table->db, create_table->table_name);
 
1648
    return(0);
1599
1649
  }
1600
 
 
1601
 
  return table;
 
1650
  return(table);
1602
1651
}
1603
1652
 
1604
1653
 
1605
1654
int
1606
 
select_create::prepare(List<Item> &values, Select_Lex_Unit *u)
 
1655
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
1607
1656
{
1608
1657
  DRIZZLE_LOCK *extra_lock= NULL;
 
1658
 
 
1659
 
 
1660
  Tableop_hooks *hook_ptr= NULL;
1609
1661
  /*
1610
 
    For replication, the CREATE-SELECT statement is written
1611
 
    in two pieces: the first transaction message contains
1612
 
    the CREATE TABLE statement as a CreateTableStatement message
1613
 
    necessary to create the table.
1614
 
    
1615
 
    The second transaction message contains all the InsertStatement
1616
 
    and associated InsertRecords that should go into the table.
 
1662
    For row-based replication, the CREATE-SELECT statement is written
 
1663
    in two pieces: the first one contains the CREATE TABLE statement
 
1664
    necessary to create the table and the second part contains the rows
 
1665
    that should go into the table.
 
1666
 
 
1667
    For non-temporary tables, the start of the CREATE-SELECT
 
1668
    implicitly commits the previous transaction, and all events
 
1669
    forming the statement will be stored in the transaction cache. At end
 
1670
    of the statement, the entire statement is committed as a
 
1671
    transaction, and all events are written to the binary log.
 
1672
 
 
1673
    On the master, the table is locked for the duration of the
 
1674
    statement, but since the CREATE part is replicated as a simple
 
1675
    statement, there is no way to lock the table for accesses on the
 
1676
    slave.  Hence, we have to hold on to the CREATE part of the
 
1677
    statement until the statement has finished.
1617
1678
   */
 
1679
  class MY_HOOKS : public Tableop_hooks {
 
1680
  public:
 
1681
    MY_HOOKS(select_create *x, TableList *create_table,
 
1682
             TableList *select_tables)
 
1683
      : ptr(x), all_tables(*create_table)
 
1684
      {
 
1685
        all_tables.next_global= select_tables;
 
1686
      }
 
1687
 
 
1688
  private:
 
1689
    virtual int do_postlock(Table **tables, uint32_t count)
 
1690
    {
 
1691
      Table const *const table = *tables;
 
1692
      if (drizzle_bin_log.is_open()
 
1693
          && !table->s->tmp_table
 
1694
          && !ptr->get_create_info()->table_existed)
 
1695
      {
 
1696
        ptr->binlog_show_create_table(tables, count);
 
1697
      }
 
1698
      return 0;
 
1699
    }
 
1700
 
 
1701
    select_create *ptr;
 
1702
    TableList all_tables;
 
1703
  };
 
1704
 
 
1705
  MY_HOOKS hooks(this, create_table, select_tables);
 
1706
  hook_ptr= &hooks;
1618
1707
 
1619
1708
  unit= u;
1620
1709
 
1621
 
  if (not (table= create_table_from_items(session, create_info, create_table,
1622
 
                                          table_proto,
1623
 
                                          alter_info, &values,
1624
 
                                          is_if_not_exists,
1625
 
                                          &extra_lock, identifier)))
 
1710
  /*
 
1711
    Start a statement transaction before the create if we are using
 
1712
    row-based replication for the statement.  If we are creating a
 
1713
    temporary table, we need to start a statement transaction.
 
1714
  */
 
1715
  if ((session->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0
 
1716
      && drizzle_bin_log.is_open())
 
1717
  {
 
1718
  }
 
1719
 
 
1720
  if (!(table= create_table_from_items(session, create_info, create_table,
 
1721
                                       alter_info, &values,
 
1722
                                       &extra_lock, hook_ptr)))
1626
1723
    return(-1);                         // abort() deletes table
1627
1724
 
1628
1725
  if (extra_lock)
1629
1726
  {
1630
1727
    assert(m_plock == NULL);
1631
1728
 
1632
 
    if (identifier.isTmp())
 
1729
    if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
1633
1730
      m_plock= &m_lock;
1634
1731
    else
1635
1732
      m_plock= &session->extra_lock;
1648
1745
 
1649
1746
  /* Mark all fields that are given values */
1650
1747
  for (Field **f= field ; *f ; f++)
1651
 
    table->setWriteSet((*f)->field_index);
 
1748
    bitmap_set_bit(table->write_set, (*f)->field_index);
1652
1749
 
1653
1750
  /* Don't set timestamp if used */
1654
1751
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
1655
1752
  table->next_number_field=table->found_next_number_field;
1656
1753
 
1657
 
  table->restoreRecordAsDefault();      // Get empty record
 
1754
  restore_record(table,s->default_values);      // Get empty record
1658
1755
  session->cuted_fields=0;
1659
1756
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1660
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1757
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1661
1758
  if (info.handle_duplicates == DUP_REPLACE)
1662
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
1759
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1663
1760
  if (info.handle_duplicates == DUP_UPDATE)
1664
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1665
 
  table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1761
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1762
  table->file->ha_start_bulk_insert((ha_rows) 0);
1666
1763
  session->abort_on_warning= !info.ignore;
1667
1764
  if (check_that_all_fields_are_given_values(session, table, table_list))
1668
1765
    return(1);
1669
1766
  table->mark_columns_needed_for_insert();
1670
 
  table->cursor->extra(HA_EXTRA_WRITE_CACHE);
 
1767
  table->file->extra(HA_EXTRA_WRITE_CACHE);
1671
1768
  return(0);
1672
1769
}
1673
1770
 
 
1771
void
 
1772
select_create::binlog_show_create_table(Table **tables, uint32_t count)
 
1773
{
 
1774
  /*
 
1775
    Note 1: In RBR mode, we generate a CREATE TABLE statement for the
 
1776
    created table by calling store_create_info() (behaves as SHOW
 
1777
    CREATE TABLE).  In the event of an error, nothing should be
 
1778
    written to the binary log, even if the table is non-transactional;
 
1779
    therefore we pretend that the generated CREATE TABLE statement is
 
1780
    for a transactional table.  The event will then be put in the
 
1781
    transaction cache, and any subsequent events (e.g., table-map
 
1782
    events and binrow events) will also be put there.  We can then use
 
1783
    ha_autocommit_or_rollback() to either throw away the entire
 
1784
    kaboodle of events, or write them to the binary log.
 
1785
 
 
1786
    We write the CREATE TABLE statement here and not in prepare()
 
1787
    since there potentially are sub-selects or accesses to information
 
1788
    schema that will do a close_thread_tables(), destroying the
 
1789
    statement transaction cache.
 
1790
  */
 
1791
  assert(tables && *tables && count > 0);
 
1792
 
 
1793
  char buf[2048];
 
1794
  String query(buf, sizeof(buf), system_charset_info);
 
1795
  int result;
 
1796
  TableList tmp_table_list;
 
1797
 
 
1798
  memset(&tmp_table_list, 0, sizeof(tmp_table_list));
 
1799
  tmp_table_list.table = *tables;
 
1800
  query.length(0);      // Have to zero it since constructor doesn't
 
1801
 
 
1802
  result= store_create_info(session, &tmp_table_list, &query, create_info);
 
1803
  assert(result == 0); /* store_create_info() always return 0 */
 
1804
}
 
1805
 
1674
1806
void select_create::store_values(List<Item> &values)
1675
1807
{
1676
 
  fill_record(session, field, values, true);
 
1808
  fill_record(session, field, values, 1);
1677
1809
}
1678
1810
 
1679
1811
 
1680
1812
void select_create::send_error(uint32_t errcode,const char *err)
1681
1813
{
 
1814
 
 
1815
 
1682
1816
  /*
1683
1817
    This will execute any rollbacks that are necessary before writing
1684
1818
    the transaction cache.
1690
1824
    written to the binary log.
1691
1825
 
1692
1826
  */
 
1827
  tmp_disable_binlog(session);
1693
1828
  select_insert::send_error(errcode, err);
 
1829
  reenable_binlog(session);
1694
1830
 
1695
1831
  return;
1696
1832
}
1710
1846
    */
1711
1847
    if (!table->s->tmp_table)
1712
1848
    {
1713
 
      TransactionServices &transaction_services= TransactionServices::singleton();
1714
 
      transaction_services.ha_autocommit_or_rollback(session, 0);
1715
 
      (void) session->endActiveTransaction();
 
1849
      ha_autocommit_or_rollback(session, 0);
 
1850
      end_active_trans(session);
1716
1851
    }
1717
1852
 
1718
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1719
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1853
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1854
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1720
1855
    if (m_plock)
1721
1856
    {
1722
1857
      mysql_unlock_tables(session, *m_plock);
1730
1865
 
1731
1866
void select_create::abort()
1732
1867
{
 
1868
 
 
1869
 
1733
1870
  /*
1734
1871
    In select_insert::abort() we roll back the statement, including
1735
1872
    truncating the transaction cache of the binary log. To do this, we
1745
1882
    of the table succeeded or not, since we need to reset the binary
1746
1883
    log state.
1747
1884
  */
 
1885
  tmp_disable_binlog(session);
1748
1886
  select_insert::abort();
 
1887
  session->transaction.stmt.modified_non_trans_table= false;
 
1888
  reenable_binlog(session);
 
1889
 
1749
1890
 
1750
1891
  if (m_plock)
1751
1892
  {
1756
1897
 
1757
1898
  if (table)
1758
1899
  {
1759
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1760
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1761
 
    if (not create_info->table_existed)
1762
 
      session->drop_open_table(table, identifier);
1763
 
    table= NULL;                                    // Safety
 
1900
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1901
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1902
    if (!create_info->table_existed)
 
1903
      drop_open_table(session, table, create_table->db, create_table->table_name);
 
1904
    table=0;                                    // Safety
1764
1905
  }
 
1906
  return;
1765
1907
}
1766
1908
 
1767
 
} /* namespace drizzled */
 
1909
 
 
1910
/*****************************************************************************
 
1911
  Instantiate templates
 
1912
*****************************************************************************/
 
1913
 
 
1914
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
1915
template class List_iterator_fast<List_item>;
 
1916
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */