~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_insert.cc

Merge Stewart's dead code removal

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
/* Insert of records */
18
18
 
19
 
#include "config.h"
 
19
#include <drizzled/server_includes.h>
20
20
#include <drizzled/sql_select.h>
21
21
#include <drizzled/show.h>
22
22
#include <drizzled/error.h>
26
26
#include <drizzled/sql_load.h>
27
27
#include <drizzled/field/timestamp.h>
28
28
#include <drizzled/lock.h>
29
 
#include "drizzled/sql_table.h"
30
 
#include "drizzled/pthread_globals.h"
31
 
#include "drizzled/transaction_services.h"
32
 
#include "drizzled/plugin/transactional_storage_engine.h"
33
 
 
34
 
namespace drizzled
35
 
{
36
 
 
37
 
extern plugin::StorageEngine *heap_engine;
38
 
extern plugin::StorageEngine *myisam_engine;
 
29
 
 
30
using namespace drizzled;
39
31
 
40
32
/*
41
33
  Check if insert fields are correct.
307
299
  ctx_state.restore_state(context, table_list);
308
300
 
309
301
  /*
310
 
    Fill in the given fields and dump it to the table cursor
 
302
    Fill in the given fields and dump it to the table file
311
303
  */
312
304
  memset(&info, 0, sizeof(info));
313
305
  info.ignore= ignore;
321
313
    to NULL.
322
314
  */
323
315
  session->count_cuted_fields= ((values_list.elements == 1 &&
324
 
                                 !ignore) ?
325
 
                                CHECK_FIELD_ERROR_FOR_NULL :
326
 
                                CHECK_FIELD_WARN);
 
316
                             !ignore) ?
 
317
                            CHECK_FIELD_ERROR_FOR_NULL :
 
318
                            CHECK_FIELD_WARN);
327
319
  session->cuted_fields = 0L;
328
320
  table->next_number_field=table->found_next_number_field;
329
321
 
330
322
  error=0;
331
323
  session->set_proc_info("update");
332
324
  if (duplic == DUP_REPLACE)
333
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
325
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
334
326
  if (duplic == DUP_UPDATE)
335
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
327
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
336
328
  {
337
329
    if (duplic != DUP_ERROR || ignore)
338
 
      table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
339
 
    table->cursor->ha_start_bulk_insert(values_list.elements);
 
330
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
331
    table->file->ha_start_bulk_insert(values_list.elements);
340
332
  }
341
333
 
342
334
 
349
341
    if (fields.elements || !value_count)
350
342
    {
351
343
      table->restoreRecordAsDefault();  // Get empty record
352
 
      if (fill_record(session, fields, *values))
 
344
      if (fill_record(session, fields, *values, 0))
353
345
      {
354
346
        if (values_list.elements != 1 && ! session->is_error())
355
347
        {
367
359
    }
368
360
    else
369
361
    {
370
 
      table->restoreRecordAsDefault();  // Get empty record
371
 
 
372
 
      if (fill_record(session, table->field, *values))
 
362
      if (session->used_tables)                 // Column used in values()
 
363
        table->restoreRecordAsDefault();        // Get empty record
 
364
      else
 
365
      {
 
366
        /*
 
367
          Fix delete marker. No need to restore rest of record since it will
 
368
          be overwritten by fill_record() anyway (and fill_record() does not
 
369
          use default values in this case).
 
370
        */
 
371
        table->record[0][0]= table->s->default_values[0];
 
372
      }
 
373
      if (fill_record(session, table->field, *values, 0))
373
374
      {
374
375
        if (values_list.elements != 1 && ! session->is_error())
375
376
        {
382
383
    }
383
384
 
384
385
    // Release latches in case bulk insert takes a long time
385
 
    plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
 
386
    plugin::StorageEngine::releaseTemporaryLatches(session);
386
387
 
387
388
    error=write_record(session, table ,&info);
388
389
    if (error)
402
403
      Do not do this release if this is a delayed insert, it would steal
403
404
      auto_inc values from the delayed_insert thread as they share Table.
404
405
    */
405
 
    table->cursor->ha_release_auto_increment();
406
 
    if (table->cursor->ha_end_bulk_insert() && !error)
 
406
    table->file->ha_release_auto_increment();
 
407
    if (table->file->ha_end_bulk_insert() && !error)
407
408
    {
408
 
      table->print_error(errno,MYF(0));
 
409
      table->file->print_error(my_errno,MYF(0));
409
410
      error=1;
410
411
    }
411
412
    if (duplic != DUP_ERROR || ignore)
412
 
      table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
413
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
413
414
 
414
 
    transactional_table= table->cursor->has_transactions();
 
415
    transactional_table= table->file->has_transactions();
415
416
 
416
417
    changed= (info.copied || info.deleted || info.updated);
417
 
    if ((changed && error <= 0) || session->transaction.stmt.hasModifiedNonTransData())
 
418
    if ((changed && error <= 0) || session->transaction.stmt.modified_non_trans_table)
418
419
    {
419
 
      if (session->transaction.stmt.hasModifiedNonTransData())
420
 
        session->transaction.all.markModifiedNonTransData();
 
420
      if (session->transaction.stmt.modified_non_trans_table)
 
421
        session->transaction.all.modified_non_trans_table= true;
421
422
    }
422
 
    assert(transactional_table || !changed || session->transaction.stmt.hasModifiedNonTransData());
 
423
    assert(transactional_table || !changed || session->transaction.stmt.modified_non_trans_table);
423
424
 
424
425
  }
425
426
  session->set_proc_info("end");
443
444
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
444
445
  table->auto_increment_field_not_null= false;
445
446
  if (duplic == DUP_REPLACE)
446
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
447
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
447
448
 
448
449
  if (error)
449
450
    goto abort;
458
459
  {
459
460
    char buff[160];
460
461
    if (ignore)
461
 
      snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
462
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
462
463
              (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
463
464
    else
464
 
      snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
465
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
465
466
              (ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
466
467
    session->row_count_func= info.copied + info.deleted + info.updated;
467
468
    session->my_ok((ulong) session->row_count_func,
473
474
 
474
475
abort:
475
476
  if (table != NULL)
476
 
    table->cursor->ha_release_auto_increment();
 
477
    table->file->ha_release_auto_increment();
477
478
  if (!joins_freed)
478
479
    free_underlaid_joins(session, &session->lex->select_lex);
479
480
  session->abort_on_warning= 0;
654
655
  if (!select_insert)
655
656
  {
656
657
    TableList *duplicate;
657
 
    if ((duplicate= unique_table(table_list, table_list->next_global, true)))
 
658
    if ((duplicate= unique_table(session, table_list, table_list->next_global, 1)))
658
659
    {
659
660
      my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->alias);
660
661
 
697
698
    then both on update triggers will work instead. Similarly both on
698
699
    delete triggers will be invoked if we will delete conflicting records.
699
700
 
700
 
    Sets session->transaction.stmt.modified_non_trans_data to true if table which is updated didn't have
 
701
    Sets session->transaction.stmt.modified_non_trans_table to true if table which is updated didn't have
701
702
    transactions.
702
703
 
703
704
  RETURN VALUE
711
712
  int error;
712
713
  char *key=0;
713
714
  MyBitmap *save_read_set, *save_write_set;
714
 
  uint64_t prev_insert_id= table->cursor->next_insert_id;
 
715
  uint64_t prev_insert_id= table->file->next_insert_id;
715
716
  uint64_t insert_id_for_cur_row= 0;
716
717
 
717
718
 
721
722
 
722
723
  if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE)
723
724
  {
724
 
    while ((error=table->cursor->ha_write_row(table->record[0])))
 
725
    while ((error=table->file->ha_write_row(table->record[0])))
725
726
    {
726
727
      uint32_t key_nr;
727
728
      /*
731
732
        the autogenerated value to avoid session->insert_id_for_cur_row to become
732
733
        0.
733
734
      */
734
 
      if (table->cursor->insert_id_for_cur_row > 0)
735
 
        insert_id_for_cur_row= table->cursor->insert_id_for_cur_row;
 
735
      if (table->file->insert_id_for_cur_row > 0)
 
736
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
736
737
      else
737
 
        table->cursor->insert_id_for_cur_row= insert_id_for_cur_row;
 
738
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
738
739
      bool is_duplicate_key_error;
739
 
      if (table->cursor->is_fatal_error(error, HA_CHECK_DUP))
 
740
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
740
741
        goto err;
741
 
      is_duplicate_key_error= table->cursor->is_fatal_error(error, 0);
 
742
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
742
743
      if (!is_duplicate_key_error)
743
744
      {
744
745
        /*
750
751
          goto gok_or_after_err; /* Ignoring a not fatal error, return 0 */
751
752
        goto err;
752
753
      }
753
 
      if ((int) (key_nr = table->get_dup_key(error)) < 0)
 
754
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
754
755
      {
755
756
        error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
756
757
        goto err;
767
768
          key_nr == table->s->next_number_index &&
768
769
          (insert_id_for_cur_row > 0))
769
770
        goto err;
770
 
      if (table->cursor->getEngine()->check_flag(HTON_BIT_DUPLICATE_POS))
 
771
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
771
772
      {
772
 
        if (table->cursor->rnd_pos(table->record[1],table->cursor->dup_ref))
 
773
        if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
773
774
          goto err;
774
775
      }
775
776
      else
776
777
      {
777
 
        if (table->cursor->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
 
778
        if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
778
779
        {
779
 
          error=errno;
 
780
          error=my_errno;
780
781
          goto err;
781
782
        }
782
783
 
789
790
          }
790
791
        }
791
792
        key_copy((unsigned char*) key,table->record[0],table->key_info+key_nr,0);
792
 
        if ((error=(table->cursor->index_read_idx_map(table->record[1],key_nr,
 
793
        if ((error=(table->file->index_read_idx_map(table->record[1],key_nr,
793
794
                                                    (unsigned char*) key, HA_WHOLE_KEY,
794
795
                                                    HA_READ_KEY_EXACT))))
795
796
          goto err;
811
812
                                                 info->ignore))
812
813
          goto before_err;
813
814
 
814
 
        table->cursor->restore_auto_increment(prev_insert_id);
 
815
        table->file->restore_auto_increment(prev_insert_id);
815
816
        if (table->next_number_field)
816
 
          table->cursor->adjust_next_insert_id_after_explicit_value(
 
817
          table->file->adjust_next_insert_id_after_explicit_value(
817
818
            table->next_number_field->val_int());
818
819
        info->touched++;
819
 
        if ((table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ) &&
 
820
        if ((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ &&
820
821
             !bitmap_is_subset(table->write_set, table->read_set)) ||
821
822
            table->compare_record())
822
823
        {
823
 
          if ((error=table->cursor->ha_update_row(table->record[1],
 
824
          if ((error=table->file->ha_update_row(table->record[1],
824
825
                                                table->record[0])) &&
825
826
              error != HA_ERR_RECORD_IS_THE_SAME)
826
827
          {
827
828
            if (info->ignore &&
828
 
                !table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
829
                !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
829
830
            {
830
831
              goto gok_or_after_err;
831
832
            }
843
844
            Except if LAST_INSERT_ID(#) was in the INSERT query, which is
844
845
            handled separately by Session::arg_of_last_insert_id_function.
845
846
          */
846
 
          insert_id_for_cur_row= table->cursor->insert_id_for_cur_row= 0;
 
847
          insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
847
848
          info->copied++;
848
849
        }
849
850
 
850
851
        if (table->next_number_field)
851
 
          table->cursor->adjust_next_insert_id_after_explicit_value(
 
852
          table->file->adjust_next_insert_id_after_explicit_value(
852
853
            table->next_number_field->val_int());
853
854
        info->touched++;
854
855
 
871
872
          ON UPDATE triggers.
872
873
        */
873
874
        if (last_uniq_key(table,key_nr) &&
874
 
            !table->cursor->referenced_by_foreign_key() &&
 
875
            !table->file->referenced_by_foreign_key() &&
875
876
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
876
877
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
877
878
        {
878
 
          if ((error=table->cursor->ha_update_row(table->record[1],
 
879
          if ((error=table->file->ha_update_row(table->record[1],
879
880
                                                table->record[0])) &&
880
881
              error != HA_ERR_RECORD_IS_THE_SAME)
881
882
            goto err;
883
884
            info->deleted++;
884
885
          else
885
886
            error= 0;
886
 
          session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
887
          session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
887
888
          /*
888
889
            Since we pretend that we have done insert we should call
889
890
            its after triggers.
892
893
        }
893
894
        else
894
895
        {
895
 
          if ((error=table->cursor->ha_delete_row(table->record[1])))
 
896
          if ((error=table->file->ha_delete_row(table->record[1])))
896
897
            goto err;
897
898
          info->deleted++;
898
 
          if (!table->cursor->has_transactions())
899
 
            session->transaction.stmt.markModifiedNonTransData();
 
899
          if (!table->file->has_transactions())
 
900
            session->transaction.stmt.modified_non_trans_table= true;
900
901
          /* Let us attempt do write_row() once more */
901
902
        }
902
903
      }
903
904
    }
904
 
    session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
905
    session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
905
906
    /*
906
907
      Restore column maps if they where replaced during an duplicate key
907
908
      problem.
910
911
        table->write_set != save_write_set)
911
912
      table->column_bitmaps_set(save_read_set, save_write_set);
912
913
  }
913
 
  else if ((error=table->cursor->ha_write_row(table->record[0])))
 
914
  else if ((error=table->file->ha_write_row(table->record[0])))
914
915
  {
915
916
    if (!info->ignore ||
916
 
        table->cursor->is_fatal_error(error, HA_CHECK_DUP))
 
917
        table->file->is_fatal_error(error, HA_CHECK_DUP))
917
918
      goto err;
918
 
    table->cursor->restore_auto_increment(prev_insert_id);
 
919
    table->file->restore_auto_increment(prev_insert_id);
919
920
    goto gok_or_after_err;
920
921
  }
921
922
 
922
923
after_n_copied_inc:
923
924
  info->copied++;
924
 
  session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
925
  session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
925
926
 
926
927
gok_or_after_err:
927
928
  if (key)
928
929
    free(key);
929
 
  if (!table->cursor->has_transactions())
930
 
    session->transaction.stmt.markModifiedNonTransData();
 
930
  if (!table->file->has_transactions())
 
931
    session->transaction.stmt.modified_non_trans_table= true;
931
932
  return(0);
932
933
 
933
934
err:
935
936
  /* current_select is NULL if this is a delayed insert */
936
937
  if (session->lex->current_select)
937
938
    session->lex->current_select->no_error= 0;        // Give error
938
 
  table->print_error(error,MYF(0));
 
939
  table->file->print_error(error,MYF(0));
939
940
 
940
941
before_err:
941
 
  table->cursor->restore_auto_increment(prev_insert_id);
 
942
  table->file->restore_auto_increment(prev_insert_id);
942
943
  if (key)
943
944
    free(key);
944
945
  table->column_bitmaps_set(save_read_set, save_write_set);
1153
1154
    Is table which we are changing used somewhere in other parts of
1154
1155
    query
1155
1156
  */
1156
 
  if (unique_table(table_list, table_list->next_global))
 
1157
  if (unique_table(session, table_list, table_list->next_global, 0))
1157
1158
  {
1158
1159
    /* Using same table for INSERT and SELECT */
1159
1160
    lex->current_select->options|= OPTION_BUFFER_RESULT;
1170
1171
      We won't start bulk inserts at all if this statement uses functions or
1171
1172
      should invoke triggers since they may access to the same table too.
1172
1173
    */
1173
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1174
    table->file->ha_start_bulk_insert((ha_rows) 0);
1174
1175
  }
1175
1176
  table->restoreRecordAsDefault();              // Get empty record
1176
1177
  table->next_number_field=table->found_next_number_field;
1177
1178
 
1178
1179
  session->cuted_fields=0;
1179
1180
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1180
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1181
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1181
1182
  if (info.handle_duplicates == DUP_REPLACE)
1182
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
1183
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1183
1184
  if (info.handle_duplicates == DUP_UPDATE)
1184
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1185
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1185
1186
  session->abort_on_warning= !info.ignore;
1186
1187
  table->mark_columns_needed_for_insert();
1187
1188
 
1210
1211
{
1211
1212
 
1212
1213
  if (session->lex->current_select->options & OPTION_BUFFER_RESULT)
1213
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1214
    table->file->ha_start_bulk_insert((ha_rows) 0);
1214
1215
  return(0);
1215
1216
}
1216
1217
 
1228
1229
  {
1229
1230
    table->next_number_field=0;
1230
1231
    table->auto_increment_field_not_null= false;
1231
 
    table->cursor->ha_reset();
 
1232
    table->file->ha_reset();
1232
1233
  }
1233
1234
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
1234
1235
  session->abort_on_warning= 0;
1254
1255
    return(1);
1255
1256
 
1256
1257
  // Release latches in case bulk insert takes a long time
1257
 
  plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
 
1258
  plugin::StorageEngine::releaseTemporaryLatches(session);
1258
1259
 
1259
1260
  error= write_record(session, table, &info);
1260
1261
 
1295
1296
void select_insert::store_values(List<Item> &values)
1296
1297
{
1297
1298
  if (fields->elements)
1298
 
    fill_record(session, *fields, values, true);
 
1299
    fill_record(session, *fields, values, 1);
1299
1300
  else
1300
 
    fill_record(session, table->field, values, true);
 
1301
    fill_record(session, table->field, values, 1);
1301
1302
}
1302
1303
 
1303
1304
void select_insert::send_error(uint32_t errcode,const char *err)
1313
1314
bool select_insert::send_eof()
1314
1315
{
1315
1316
  int error;
1316
 
  bool const trans_table= table->cursor->has_transactions();
 
1317
  bool const trans_table= table->file->has_transactions();
1317
1318
  uint64_t id;
1318
1319
  bool changed;
1319
1320
 
1320
 
  error= table->cursor->ha_end_bulk_insert();
1321
 
  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1322
 
  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1321
  error= table->file->ha_end_bulk_insert();
 
1322
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1323
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1323
1324
 
1324
1325
  if ((changed= (info.copied || info.deleted || info.updated)))
1325
1326
  {
1327
1328
      We must invalidate the table in the query cache before binlog writing
1328
1329
      and ha_autocommit_or_rollback.
1329
1330
    */
1330
 
    if (session->transaction.stmt.hasModifiedNonTransData())
1331
 
      session->transaction.all.markModifiedNonTransData();
 
1331
    if (session->transaction.stmt.modified_non_trans_table)
 
1332
      session->transaction.all.modified_non_trans_table= true;
1332
1333
  }
1333
1334
  assert(trans_table || !changed ||
1334
 
              session->transaction.stmt.hasModifiedNonTransData());
 
1335
              session->transaction.stmt.modified_non_trans_table);
1335
1336
 
1336
 
  table->cursor->ha_release_auto_increment();
 
1337
  table->file->ha_release_auto_increment();
1337
1338
 
1338
1339
  if (error)
1339
1340
  {
1340
 
    table->print_error(error,MYF(0));
 
1341
    table->file->print_error(error,MYF(0));
1341
1342
    DRIZZLE_INSERT_SELECT_DONE(error, 0);
1342
1343
    return 1;
1343
1344
  }
1344
1345
  char buff[160];
1345
1346
  if (info.ignore)
1346
 
    snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
1347
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1347
1348
            (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
1348
1349
  else
1349
 
    snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
1350
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1350
1351
            (ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
1351
1352
  session->row_count_func= info.copied + info.deleted + info.updated;
1352
1353
 
1374
1375
  {
1375
1376
    bool changed, transactional_table;
1376
1377
 
1377
 
    table->cursor->ha_end_bulk_insert();
 
1378
    table->file->ha_end_bulk_insert();
1378
1379
 
1379
1380
    /*
1380
1381
      If at least one row has been inserted/modified and will stay in
1391
1392
      zero, so no check for that is made.
1392
1393
    */
1393
1394
    changed= (info.copied || info.deleted || info.updated);
1394
 
    transactional_table= table->cursor->has_transactions();
 
1395
    transactional_table= table->file->has_transactions();
1395
1396
    assert(transactional_table || !changed ||
1396
 
                session->transaction.stmt.hasModifiedNonTransData());
1397
 
    table->cursor->ha_release_auto_increment();
 
1397
                session->transaction.stmt.modified_non_trans_table);
 
1398
    table->file->ha_release_auto_increment();
1398
1399
  }
1399
1400
 
1400
1401
  if (DRIZZLE_INSERT_SELECT_DONE_ENABLED())
1454
1455
 
1455
1456
static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
1456
1457
                                      TableList *create_table,
1457
 
                                      message::Table &table_proto,
 
1458
                                      message::Table *table_proto,
1458
1459
                                      AlterInfo *alter_info,
1459
1460
                                      List<Item> *items,
1460
 
                                      bool is_if_not_exists,
1461
 
                                      DRIZZLE_LOCK **lock,
1462
 
                                      TableIdentifier &identifier)
 
1461
                                      DRIZZLE_LOCK **lock)
1463
1462
{
1464
1463
  Table tmp_table;              // Used during 'CreateField()'
1465
1464
  TableShare share;
1471
1470
  Field *tmp_field;
1472
1471
  bool not_used;
1473
1472
 
1474
 
  if (not (identifier.isTmp()) && create_table->table->db_stat)
 
1473
  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
 
1474
      create_table->table->db_stat)
1475
1475
  {
1476
1476
    /* Table already exists and was open at openTablesLock() stage. */
1477
 
    if (is_if_not_exists)
 
1477
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
1478
1478
    {
1479
1479
      create_info->table_existed= 1;            // Mark that table existed
1480
1480
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1493
1493
 
1494
1494
  tmp_table.s->db_create_options=0;
1495
1495
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
1496
 
 
1497
 
  if (not table_proto.engine().name().compare("MyISAM"))
1498
 
    tmp_table.s->db_low_byte_first= true;
1499
 
  else if (not table_proto.engine().name().compare("MEMORY"))
1500
 
    tmp_table.s->db_low_byte_first= true;
1501
 
 
 
1496
  tmp_table.s->db_low_byte_first=
 
1497
        test(create_info->db_type == myisam_engine ||
 
1498
             create_info->db_type == heap_engine);
1502
1499
  tmp_table.null_row= false;
1503
1500
  tmp_table.maybe_null= false;
1504
1501
 
1513
1510
        field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1514
1511
    else
1515
1512
      field= create_tmp_field(session, &tmp_table, item, item->type(),
1516
 
                              (Item ***) 0, &tmp_field, &def_field, false,
1517
 
                              false, false, 0);
 
1513
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
 
1514
                              0);
1518
1515
    if (!field ||
1519
1516
        !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
1520
1517
                                           ((Item_field *)item)->field :
1533
1530
    should not cause deadlocks or races.
1534
1531
  */
1535
1532
  {
1536
 
    if (not mysql_create_table_no_lock(session,
1537
 
                                       identifier,
1538
 
                                       create_info,
1539
 
                                       table_proto,
1540
 
                                       alter_info,
1541
 
                                       false,
1542
 
                                       select_field_count,
1543
 
                                       is_if_not_exists))
 
1533
    if (!mysql_create_table_no_lock(session, create_table->db,
 
1534
                                    create_table->table_name,
 
1535
                                    create_info,
 
1536
                                    table_proto,
 
1537
                                    alter_info, false,
 
1538
                                    select_field_count))
1544
1539
    {
1545
 
      if (create_info->table_existed && not identifier.isTmp())
 
1540
      if (create_info->table_existed &&
 
1541
          !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1546
1542
      {
1547
1543
        /*
1548
1544
          This means that someone created table underneath server
1553
1549
        return NULL;
1554
1550
      }
1555
1551
 
1556
 
      if (not identifier.isTmp())
 
1552
      if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1557
1553
      {
1558
1554
        pthread_mutex_lock(&LOCK_open); /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1559
1555
        if (session->reopen_name_locked_table(create_table, false))
1560
1556
        {
1561
 
          quick_rm_table(*session, identifier);
 
1557
          quick_rm_table(create_info->db_type, create_table->db,
 
1558
                         create_table->table_name, false);
1562
1559
        }
1563
1560
        else
1564
1561
          table= create_table->table;
1566
1563
      }
1567
1564
      else
1568
1565
      {
1569
 
        if (not (table= session->openTable(create_table, (bool*) 0,
1570
 
                                           DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1571
 
            not create_info->table_existed)
 
1566
        if (!(table= session->openTable(create_table, (bool*) 0,
 
1567
                                         DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
 
1568
            !create_info->table_existed)
1572
1569
        {
1573
1570
          /*
1574
1571
            This shouldn't happen as creation of temporary table should make
1593
1590
      *lock= 0;
1594
1591
    }
1595
1592
 
1596
 
    if (not create_info->table_existed)
1597
 
      session->drop_open_table(table, identifier);
 
1593
    if (!create_info->table_existed)
 
1594
      session->drop_open_table(table, create_table->db, create_table->table_name);
1598
1595
    return NULL;
1599
1596
  }
1600
1597
 
1607
1604
{
1608
1605
  DRIZZLE_LOCK *extra_lock= NULL;
1609
1606
  /*
1610
 
    For replication, the CREATE-SELECT statement is written
1611
 
    in two pieces: the first transaction messsage contains 
1612
 
    the CREATE TABLE statement as a CreateTableStatement message
1613
 
    necessary to create the table.
1614
 
    
1615
 
    The second transaction message contains all the InsertStatement
1616
 
    and associated InsertRecords that should go into the table.
 
1607
    For row-based replication, the CREATE-SELECT statement is written
 
1608
    in two pieces: the first one contain the CREATE TABLE statement
 
1609
    necessary to create the table and the second part contain the rows
 
1610
    that should go into the table.
 
1611
 
 
1612
    For non-temporary tables, the start of the CREATE-SELECT
 
1613
    implicitly commits the previous transaction, and all events
 
1614
    forming the statement will be stored the transaction cache. At end
 
1615
    of the statement, the entire statement is committed as a
 
1616
    transaction, and all events are written to the binary log.
 
1617
 
 
1618
    On the master, the table is locked for the duration of the
 
1619
    statement, but since the CREATE part is replicated as a simple
 
1620
    statement, there is no way to lock the table for accesses on the
 
1621
    slave.  Hence, we have to hold on to the CREATE part of the
 
1622
    statement until the statement has finished.
1617
1623
   */
1618
1624
 
1619
1625
  unit= u;
1620
1626
 
1621
 
  if (not (table= create_table_from_items(session, create_info, create_table,
1622
 
                                          table_proto,
1623
 
                                          alter_info, &values,
1624
 
                                          is_if_not_exists,
1625
 
                                          &extra_lock, identifier)))
 
1627
  /*
 
1628
    Start a statement transaction before the create if we are using
 
1629
    row-based replication for the statement.  If we are creating a
 
1630
    temporary table, we need to start a statement transaction.
 
1631
  */
 
1632
 
 
1633
  if (!(table= create_table_from_items(session, create_info, create_table,
 
1634
                                       table_proto,
 
1635
                                       alter_info, &values,
 
1636
                                       &extra_lock)))
1626
1637
    return(-1);                         // abort() deletes table
1627
1638
 
1628
1639
  if (extra_lock)
1629
1640
  {
1630
1641
    assert(m_plock == NULL);
1631
1642
 
1632
 
    if (identifier.isTmp())
 
1643
    if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
1633
1644
      m_plock= &m_lock;
1634
1645
    else
1635
1646
      m_plock= &session->extra_lock;
1657
1668
  table->restoreRecordAsDefault();      // Get empty record
1658
1669
  session->cuted_fields=0;
1659
1670
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1660
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1671
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1661
1672
  if (info.handle_duplicates == DUP_REPLACE)
1662
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
1673
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1663
1674
  if (info.handle_duplicates == DUP_UPDATE)
1664
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1665
 
  table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1675
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1676
  table->file->ha_start_bulk_insert((ha_rows) 0);
1666
1677
  session->abort_on_warning= !info.ignore;
1667
1678
  if (check_that_all_fields_are_given_values(session, table, table_list))
1668
1679
    return(1);
1669
1680
  table->mark_columns_needed_for_insert();
1670
 
  table->cursor->extra(HA_EXTRA_WRITE_CACHE);
 
1681
  table->file->extra(HA_EXTRA_WRITE_CACHE);
1671
1682
  return(0);
1672
1683
}
1673
1684
 
1674
1685
void select_create::store_values(List<Item> &values)
1675
1686
{
1676
 
  fill_record(session, field, values, true);
 
1687
  fill_record(session, field, values, 1);
1677
1688
}
1678
1689
 
1679
1690
 
1680
1691
void select_create::send_error(uint32_t errcode,const char *err)
1681
1692
{
 
1693
 
 
1694
 
1682
1695
  /*
1683
1696
    This will execute any rollbacks that are necessary before writing
1684
1697
    the transcation cache.
1710
1723
    */
1711
1724
    if (!table->s->tmp_table)
1712
1725
    {
1713
 
      TransactionServices &transaction_services= TransactionServices::singleton();
1714
 
      transaction_services.ha_autocommit_or_rollback(session, 0);
 
1726
      ha_autocommit_or_rollback(session, 0);
1715
1727
      (void) session->endActiveTransaction();
1716
1728
    }
1717
1729
 
1718
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1719
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1730
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1731
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1720
1732
    if (m_plock)
1721
1733
    {
1722
1734
      mysql_unlock_tables(session, *m_plock);
1730
1742
 
1731
1743
void select_create::abort()
1732
1744
{
 
1745
 
 
1746
 
1733
1747
  /*
1734
1748
    In select_insert::abort() we roll back the statement, including
1735
1749
    truncating the transaction cache of the binary log. To do this, we
1746
1760
    log state.
1747
1761
  */
1748
1762
  select_insert::abort();
 
1763
  session->transaction.stmt.modified_non_trans_table= false;
 
1764
 
1749
1765
 
1750
1766
  if (m_plock)
1751
1767
  {
1756
1772
 
1757
1773
  if (table)
1758
1774
  {
1759
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1760
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1761
 
    if (not create_info->table_existed)
1762
 
      session->drop_open_table(table, identifier);
 
1775
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1776
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1777
    if (!create_info->table_existed)
 
1778
      session->drop_open_table(table, create_table->db, create_table->table_name);
1763
1779
    table= NULL;                                    // Safety
1764
1780
  }
1765
1781
}
1766
1782
 
1767
 
} /* namespace drizzled */
 
1783
 
 
1784
/*****************************************************************************
 
1785
  Instansiate templates
 
1786
*****************************************************************************/
 
1787
 
 
1788
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
1789
template class List_iterator_fast<List_item>;
 
1790
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */