~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_insert.cc

  • Committer: Brian Aker
  • Date: 2009-07-16 22:37:01 UTC
  • mto: This revision was merged to the branch mainline in revision 1100.
  • Revision ID: brian@gaz-20090716223701-vbbbo8dmgd2ljqqo
Refactor TableShare has to be behind class.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
/* Insert of records */
18
18
 
19
 
#include "config.h"
 
19
#include <drizzled/server_includes.h>
20
20
#include <drizzled/sql_select.h>
21
21
#include <drizzled/show.h>
22
22
#include <drizzled/error.h>
26
26
#include <drizzled/sql_load.h>
27
27
#include <drizzled/field/timestamp.h>
28
28
#include <drizzled/lock.h>
29
 
#include "drizzled/sql_table.h"
30
 
#include "drizzled/pthread_globals.h"
31
 
#include "drizzled/transaction_services.h"
32
 
#include "drizzled/plugin/transactional_storage_engine.h"
33
 
 
34
 
namespace drizzled
35
 
{
36
 
 
37
 
extern plugin::StorageEngine *heap_engine;
38
 
extern plugin::StorageEngine *myisam_engine;
 
29
 
39
30
 
40
31
/*
41
32
  Check if insert fields are correct.
165
156
      Unmark the timestamp field so that we can check if this is modified
166
157
      by update_fields
167
158
    */
168
 
    timestamp_mark= table->write_set->testAndClear(table->timestamp_field->field_index);
 
159
    timestamp_mark= bitmap_test_and_clear(table->write_set,
 
160
                                          table->timestamp_field->field_index);
169
161
  }
170
162
 
171
163
  /* Check the fields we are going to modify */
247
239
  upgrade_lock_type(session, &table_list->lock_type, duplic,
248
240
                    values_list.elements > 1);
249
241
 
250
 
  if (session->openTablesLock(table_list))
251
 
  {
252
 
    DRIZZLE_INSERT_DONE(1, 0);
253
 
    return true;
254
 
  }
 
242
  if (session->open_and_lock_tables(table_list))
 
243
    return(true);
255
244
 
256
245
  lock_type= table_list->lock_type;
257
246
 
307
296
  ctx_state.restore_state(context, table_list);
308
297
 
309
298
  /*
310
 
    Fill in the given fields and dump it to the table cursor
 
299
    Fill in the given fields and dump it to the table file
311
300
  */
312
301
  memset(&info, 0, sizeof(info));
313
302
  info.ignore= ignore;
321
310
    to NULL.
322
311
  */
323
312
  session->count_cuted_fields= ((values_list.elements == 1 &&
324
 
                                 !ignore) ?
325
 
                                CHECK_FIELD_ERROR_FOR_NULL :
326
 
                                CHECK_FIELD_WARN);
 
313
                             !ignore) ?
 
314
                            CHECK_FIELD_ERROR_FOR_NULL :
 
315
                            CHECK_FIELD_WARN);
327
316
  session->cuted_fields = 0L;
328
317
  table->next_number_field=table->found_next_number_field;
329
318
 
330
319
  error=0;
331
320
  session->set_proc_info("update");
332
321
  if (duplic == DUP_REPLACE)
333
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
322
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
334
323
  if (duplic == DUP_UPDATE)
335
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
324
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
336
325
  {
337
326
    if (duplic != DUP_ERROR || ignore)
338
 
      table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
339
 
    table->cursor->ha_start_bulk_insert(values_list.elements);
 
327
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
328
    table->file->ha_start_bulk_insert(values_list.elements);
340
329
  }
341
330
 
342
331
 
349
338
    if (fields.elements || !value_count)
350
339
    {
351
340
      table->restoreRecordAsDefault();  // Get empty record
352
 
      if (fill_record(session, fields, *values))
 
341
      if (fill_record(session, fields, *values, 0))
353
342
      {
354
343
        if (values_list.elements != 1 && ! session->is_error())
355
344
        {
367
356
    }
368
357
    else
369
358
    {
370
 
      table->restoreRecordAsDefault();  // Get empty record
371
 
 
372
 
      if (fill_record(session, table->field, *values))
 
359
      if (session->used_tables)                 // Column used in values()
 
360
        table->restoreRecordAsDefault();        // Get empty record
 
361
      else
 
362
      {
 
363
        /*
 
364
          Fix delete marker. No need to restore rest of record since it will
 
365
          be overwritten by fill_record() anyway (and fill_record() does not
 
366
          use default values in this case).
 
367
        */
 
368
        table->record[0][0]= table->s->default_values[0];
 
369
      }
 
370
      if (fill_record(session, table->field, *values, 0))
373
371
      {
374
372
        if (values_list.elements != 1 && ! session->is_error())
375
373
        {
382
380
    }
383
381
 
384
382
    // Release latches in case bulk insert takes a long time
385
 
    plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
 
383
    ha_release_temporary_latches(session);
386
384
 
387
385
    error=write_record(session, table ,&info);
388
386
    if (error)
402
400
      Do not do this release if this is a delayed insert, it would steal
403
401
      auto_inc values from the delayed_insert thread as they share Table.
404
402
    */
405
 
    table->cursor->ha_release_auto_increment();
406
 
    if (table->cursor->ha_end_bulk_insert() && !error)
 
403
    table->file->ha_release_auto_increment();
 
404
    if (table->file->ha_end_bulk_insert() && !error)
407
405
    {
408
 
      table->print_error(errno,MYF(0));
 
406
      table->file->print_error(my_errno,MYF(0));
409
407
      error=1;
410
408
    }
411
409
    if (duplic != DUP_ERROR || ignore)
412
 
      table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
410
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
413
411
 
414
 
    transactional_table= table->cursor->has_transactions();
 
412
    transactional_table= table->file->has_transactions();
415
413
 
416
414
    changed= (info.copied || info.deleted || info.updated);
417
 
    if ((changed && error <= 0) || session->transaction.stmt.hasModifiedNonTransData())
 
415
    if ((changed && error <= 0) || session->transaction.stmt.modified_non_trans_table)
418
416
    {
419
 
      if (session->transaction.stmt.hasModifiedNonTransData())
420
 
        session->transaction.all.markModifiedNonTransData();
 
417
      if (session->transaction.stmt.modified_non_trans_table)
 
418
        session->transaction.all.modified_non_trans_table= true;
421
419
    }
422
 
    assert(transactional_table || !changed || session->transaction.stmt.hasModifiedNonTransData());
 
420
    assert(transactional_table || !changed || session->transaction.stmt.modified_non_trans_table);
423
421
 
424
422
  }
425
423
  session->set_proc_info("end");
443
441
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
444
442
  table->auto_increment_field_not_null= false;
445
443
  if (duplic == DUP_REPLACE)
446
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
444
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
447
445
 
448
446
  if (error)
449
447
    goto abort;
450
448
  if (values_list.elements == 1 && (!(session->options & OPTION_WARNINGS) ||
451
449
                                    !session->cuted_fields))
452
450
  {
453
 
    session->row_count_func= info.copied + info.deleted + info.updated;
454
 
    session->my_ok((ulong) session->row_count_func,
455
 
                   info.copied + info.deleted + info.touched, id);
 
451
    session->row_count_func= info.copied + info.deleted +
 
452
                         ((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
453
                          info.touched : info.updated);
 
454
    session->my_ok((ulong) session->row_count_func, id);
456
455
  }
457
456
  else
458
457
  {
459
458
    char buff[160];
 
459
    ha_rows updated=((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
460
                     info.touched : info.updated);
460
461
    if (ignore)
461
 
      snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
462
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
462
463
              (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
463
464
    else
464
 
      snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
465
 
              (ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
466
 
    session->row_count_func= info.copied + info.deleted + info.updated;
467
 
    session->my_ok((ulong) session->row_count_func,
468
 
                   info.copied + info.deleted + info.touched, id, buff);
 
465
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
 
466
              (ulong) (info.deleted + updated), (ulong) session->cuted_fields);
 
467
    session->row_count_func= info.copied + info.deleted + updated;
 
468
    session->my_ok((ulong) session->row_count_func, id, buff);
469
469
  }
470
470
  session->abort_on_warning= 0;
471
 
  DRIZZLE_INSERT_DONE(0, session->row_count_func);
472
 
  return false;
 
471
  DRIZZLE_INSERT_END();
 
472
  return(false);
473
473
 
474
474
abort:
475
475
  if (table != NULL)
476
 
    table->cursor->ha_release_auto_increment();
 
476
    table->file->ha_release_auto_increment();
477
477
  if (!joins_freed)
478
478
    free_underlaid_joins(session, &session->lex->select_lex);
479
479
  session->abort_on_warning= 0;
480
 
  DRIZZLE_INSERT_DONE(1, 0);
481
 
  return true;
 
480
  DRIZZLE_INSERT_END();
 
481
  return(true);
482
482
}
483
483
 
484
484
 
654
654
  if (!select_insert)
655
655
  {
656
656
    TableList *duplicate;
657
 
    if ((duplicate= unique_table(table_list, table_list->next_global, true)))
 
657
    if ((duplicate= unique_table(session, table_list, table_list->next_global, 1)))
658
658
    {
659
659
      my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->alias);
660
660
 
697
697
    then both on update triggers will work instead. Similarly both on
698
698
    delete triggers will be invoked if we will delete conflicting records.
699
699
 
700
 
    Sets session->transaction.stmt.modified_non_trans_data to true if table which is updated didn't have
 
700
    Sets session->transaction.stmt.modified_non_trans_table to true if table which is updated didn't have
701
701
    transactions.
702
702
 
703
703
  RETURN VALUE
710
710
{
711
711
  int error;
712
712
  char *key=0;
713
 
  MyBitmap *save_read_set, *save_write_set;
714
 
  uint64_t prev_insert_id= table->cursor->next_insert_id;
 
713
  MY_BITMAP *save_read_set, *save_write_set;
 
714
  uint64_t prev_insert_id= table->file->next_insert_id;
715
715
  uint64_t insert_id_for_cur_row= 0;
716
716
 
717
717
 
721
721
 
722
722
  if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE)
723
723
  {
724
 
    while ((error=table->cursor->ha_write_row(table->record[0])))
 
724
    while ((error=table->file->ha_write_row(table->record[0])))
725
725
    {
726
726
      uint32_t key_nr;
727
727
      /*
731
731
        the autogenerated value to avoid session->insert_id_for_cur_row to become
732
732
        0.
733
733
      */
734
 
      if (table->cursor->insert_id_for_cur_row > 0)
735
 
        insert_id_for_cur_row= table->cursor->insert_id_for_cur_row;
 
734
      if (table->file->insert_id_for_cur_row > 0)
 
735
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
736
736
      else
737
 
        table->cursor->insert_id_for_cur_row= insert_id_for_cur_row;
 
737
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
738
738
      bool is_duplicate_key_error;
739
 
      if (table->cursor->is_fatal_error(error, HA_CHECK_DUP))
 
739
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
740
740
        goto err;
741
 
      is_duplicate_key_error= table->cursor->is_fatal_error(error, 0);
 
741
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
742
742
      if (!is_duplicate_key_error)
743
743
      {
744
744
        /*
750
750
          goto gok_or_after_err; /* Ignoring a not fatal error, return 0 */
751
751
        goto err;
752
752
      }
753
 
      if ((int) (key_nr = table->get_dup_key(error)) < 0)
 
753
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
754
754
      {
755
755
        error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
756
756
        goto err;
767
767
          key_nr == table->s->next_number_index &&
768
768
          (insert_id_for_cur_row > 0))
769
769
        goto err;
770
 
      if (table->cursor->getEngine()->check_flag(HTON_BIT_DUPLICATE_POS))
 
770
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
771
771
      {
772
 
        if (table->cursor->rnd_pos(table->record[1],table->cursor->dup_ref))
 
772
        if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
773
773
          goto err;
774
774
      }
775
775
      else
776
776
      {
777
 
        if (table->cursor->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
 
777
        if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
778
778
        {
779
 
          error=errno;
 
779
          error=my_errno;
780
780
          goto err;
781
781
        }
782
782
 
789
789
          }
790
790
        }
791
791
        key_copy((unsigned char*) key,table->record[0],table->key_info+key_nr,0);
792
 
        if ((error=(table->cursor->index_read_idx_map(table->record[1],key_nr,
 
792
        if ((error=(table->file->index_read_idx_map(table->record[1],key_nr,
793
793
                                                    (unsigned char*) key, HA_WHOLE_KEY,
794
794
                                                    HA_READ_KEY_EXACT))))
795
795
          goto err;
811
811
                                                 info->ignore))
812
812
          goto before_err;
813
813
 
814
 
        table->cursor->restore_auto_increment(prev_insert_id);
 
814
        table->file->restore_auto_increment(prev_insert_id);
815
815
        if (table->next_number_field)
816
 
          table->cursor->adjust_next_insert_id_after_explicit_value(
 
816
          table->file->adjust_next_insert_id_after_explicit_value(
817
817
            table->next_number_field->val_int());
818
818
        info->touched++;
819
 
        if ((table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ) &&
 
819
        if ((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ &&
820
820
             !bitmap_is_subset(table->write_set, table->read_set)) ||
821
821
            table->compare_record())
822
822
        {
823
 
          if ((error=table->cursor->ha_update_row(table->record[1],
 
823
          if ((error=table->file->ha_update_row(table->record[1],
824
824
                                                table->record[0])) &&
825
825
              error != HA_ERR_RECORD_IS_THE_SAME)
826
826
          {
827
827
            if (info->ignore &&
828
 
                !table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
828
                !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
829
829
            {
830
830
              goto gok_or_after_err;
831
831
            }
843
843
            Except if LAST_INSERT_ID(#) was in the INSERT query, which is
844
844
            handled separately by Session::arg_of_last_insert_id_function.
845
845
          */
846
 
          insert_id_for_cur_row= table->cursor->insert_id_for_cur_row= 0;
 
846
          insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
847
847
          info->copied++;
848
848
        }
849
849
 
850
850
        if (table->next_number_field)
851
 
          table->cursor->adjust_next_insert_id_after_explicit_value(
 
851
          table->file->adjust_next_insert_id_after_explicit_value(
852
852
            table->next_number_field->val_int());
853
853
        info->touched++;
854
854
 
871
871
          ON UPDATE triggers.
872
872
        */
873
873
        if (last_uniq_key(table,key_nr) &&
874
 
            !table->cursor->referenced_by_foreign_key() &&
 
874
            !table->file->referenced_by_foreign_key() &&
875
875
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
876
876
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
877
877
        {
878
 
          if ((error=table->cursor->ha_update_row(table->record[1],
 
878
          if ((error=table->file->ha_update_row(table->record[1],
879
879
                                                table->record[0])) &&
880
880
              error != HA_ERR_RECORD_IS_THE_SAME)
881
881
            goto err;
883
883
            info->deleted++;
884
884
          else
885
885
            error= 0;
886
 
          session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
886
          session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
887
887
          /*
888
888
            Since we pretend that we have done insert we should call
889
889
            its after triggers.
892
892
        }
893
893
        else
894
894
        {
895
 
          if ((error=table->cursor->ha_delete_row(table->record[1])))
 
895
          if ((error=table->file->ha_delete_row(table->record[1])))
896
896
            goto err;
897
897
          info->deleted++;
898
 
          if (!table->cursor->has_transactions())
899
 
            session->transaction.stmt.markModifiedNonTransData();
 
898
          if (!table->file->has_transactions())
 
899
            session->transaction.stmt.modified_non_trans_table= true;
900
900
          /* Let us attempt do write_row() once more */
901
901
        }
902
902
      }
903
903
    }
904
 
    session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
904
    session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
905
905
    /*
906
906
      Restore column maps if they were replaced during a duplicate key
907
907
      problem.
910
910
        table->write_set != save_write_set)
911
911
      table->column_bitmaps_set(save_read_set, save_write_set);
912
912
  }
913
 
  else if ((error=table->cursor->ha_write_row(table->record[0])))
 
913
  else if ((error=table->file->ha_write_row(table->record[0])))
914
914
  {
915
915
    if (!info->ignore ||
916
 
        table->cursor->is_fatal_error(error, HA_CHECK_DUP))
 
916
        table->file->is_fatal_error(error, HA_CHECK_DUP))
917
917
      goto err;
918
 
    table->cursor->restore_auto_increment(prev_insert_id);
 
918
    table->file->restore_auto_increment(prev_insert_id);
919
919
    goto gok_or_after_err;
920
920
  }
921
921
 
922
922
after_n_copied_inc:
923
923
  info->copied++;
924
 
  session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
924
  session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
925
925
 
926
926
gok_or_after_err:
927
927
  if (key)
928
928
    free(key);
929
 
  if (!table->cursor->has_transactions())
930
 
    session->transaction.stmt.markModifiedNonTransData();
 
929
  if (!table->file->has_transactions())
 
930
    session->transaction.stmt.modified_non_trans_table= true;
931
931
  return(0);
932
932
 
933
933
err:
935
935
  /* current_select is NULL if this is a delayed insert */
936
936
  if (session->lex->current_select)
937
937
    session->lex->current_select->no_error= 0;        // Give error
938
 
  table->print_error(error,MYF(0));
 
938
  table->file->print_error(error,MYF(0));
939
939
 
940
940
before_err:
941
 
  table->cursor->restore_auto_increment(prev_insert_id);
 
941
  table->file->restore_auto_increment(prev_insert_id);
942
942
  if (key)
943
943
    free(key);
944
944
  table->column_bitmaps_set(save_read_set, save_write_set);
1153
1153
    Is table which we are changing used somewhere in other parts of
1154
1154
    query
1155
1155
  */
1156
 
  if (unique_table(table_list, table_list->next_global))
 
1156
  if (unique_table(session, table_list, table_list->next_global, 0))
1157
1157
  {
1158
1158
    /* Using same table for INSERT and SELECT */
1159
1159
    lex->current_select->options|= OPTION_BUFFER_RESULT;
1170
1170
      We won't start bulk inserts at all if this statement uses functions or
1171
1171
      should invoke triggers since they may access the same table too.
1172
1172
    */
1173
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1173
    table->file->ha_start_bulk_insert((ha_rows) 0);
1174
1174
  }
1175
1175
  table->restoreRecordAsDefault();              // Get empty record
1176
1176
  table->next_number_field=table->found_next_number_field;
1177
1177
 
1178
1178
  session->cuted_fields=0;
1179
1179
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1180
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1180
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1181
1181
  if (info.handle_duplicates == DUP_REPLACE)
1182
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
1182
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1183
1183
  if (info.handle_duplicates == DUP_UPDATE)
1184
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1184
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1185
1185
  session->abort_on_warning= !info.ignore;
1186
1186
  table->mark_columns_needed_for_insert();
1187
1187
 
1210
1210
{
1211
1211
 
1212
1212
  if (session->lex->current_select->options & OPTION_BUFFER_RESULT)
1213
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1213
    table->file->ha_start_bulk_insert((ha_rows) 0);
1214
1214
  return(0);
1215
1215
}
1216
1216
 
1228
1228
  {
1229
1229
    table->next_number_field=0;
1230
1230
    table->auto_increment_field_not_null= false;
1231
 
    table->cursor->ha_reset();
 
1231
    table->file->ha_reset();
1232
1232
  }
1233
1233
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
1234
1234
  session->abort_on_warning= 0;
1254
1254
    return(1);
1255
1255
 
1256
1256
  // Release latches in case bulk insert takes a long time
1257
 
  plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
 
1257
  ha_release_temporary_latches(session);
1258
1258
 
1259
1259
  error= write_record(session, table, &info);
1260
1260
 
1295
1295
void select_insert::store_values(List<Item> &values)
1296
1296
{
1297
1297
  if (fields->elements)
1298
 
    fill_record(session, *fields, values, true);
 
1298
    fill_record(session, *fields, values, 1);
1299
1299
  else
1300
 
    fill_record(session, table->field, values, true);
 
1300
    fill_record(session, table->field, values, 1);
1301
1301
}
1302
1302
 
1303
1303
void select_insert::send_error(uint32_t errcode,const char *err)
1313
1313
bool select_insert::send_eof()
1314
1314
{
1315
1315
  int error;
1316
 
  bool const trans_table= table->cursor->has_transactions();
 
1316
  bool const trans_table= table->file->has_transactions();
1317
1317
  uint64_t id;
1318
1318
  bool changed;
1319
1319
 
1320
 
  error= table->cursor->ha_end_bulk_insert();
1321
 
  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1322
 
  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1320
  error= table->file->ha_end_bulk_insert();
 
1321
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1322
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1323
1323
 
1324
1324
  if ((changed= (info.copied || info.deleted || info.updated)))
1325
1325
  {
1327
1327
      We must invalidate the table in the query cache before binlog writing
1328
1328
      and ha_autocommit_or_rollback.
1329
1329
    */
1330
 
    if (session->transaction.stmt.hasModifiedNonTransData())
1331
 
      session->transaction.all.markModifiedNonTransData();
 
1330
    if (session->transaction.stmt.modified_non_trans_table)
 
1331
      session->transaction.all.modified_non_trans_table= true;
1332
1332
  }
1333
1333
  assert(trans_table || !changed ||
1334
 
              session->transaction.stmt.hasModifiedNonTransData());
 
1334
              session->transaction.stmt.modified_non_trans_table);
1335
1335
 
1336
 
  table->cursor->ha_release_auto_increment();
 
1336
  table->file->ha_release_auto_increment();
1337
1337
 
1338
1338
  if (error)
1339
1339
  {
1340
 
    table->print_error(error,MYF(0));
1341
 
    DRIZZLE_INSERT_SELECT_DONE(error, 0);
1342
 
    return 1;
 
1340
    table->file->print_error(error,MYF(0));
 
1341
    return(1);
1343
1342
  }
1344
1343
  char buff[160];
1345
1344
  if (info.ignore)
1346
 
    snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
1345
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1347
1346
            (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
1348
1347
  else
1349
 
    snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
1348
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1350
1349
            (ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
1351
 
  session->row_count_func= info.copied + info.deleted + info.updated;
 
1350
  session->row_count_func= info.copied + info.deleted +
 
1351
                       ((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
1352
                        info.touched : info.updated);
1352
1353
 
1353
1354
  id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
1354
1355
    session->first_successful_insert_id_in_cur_stmt :
1355
1356
    (session->arg_of_last_insert_id_function ?
1356
1357
     session->first_successful_insert_id_in_prev_stmt :
1357
1358
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
1358
 
  session->my_ok((ulong) session->row_count_func,
1359
 
                 info.copied + info.deleted + info.touched, id, buff);
1360
 
  DRIZZLE_INSERT_SELECT_DONE(0, session->row_count_func);
1361
 
  return 0;
 
1359
  session->my_ok((ulong) session->row_count_func, id, buff);
 
1360
  return(0);
1362
1361
}
1363
1362
 
1364
1363
void select_insert::abort() {
1374
1373
  {
1375
1374
    bool changed, transactional_table;
1376
1375
 
1377
 
    table->cursor->ha_end_bulk_insert();
 
1376
    table->file->ha_end_bulk_insert();
1378
1377
 
1379
1378
    /*
1380
1379
      If at least one row has been inserted/modified and will stay in
1391
1390
      zero, so no check for that is made.
1392
1391
    */
1393
1392
    changed= (info.copied || info.deleted || info.updated);
1394
 
    transactional_table= table->cursor->has_transactions();
 
1393
    transactional_table= table->file->has_transactions();
1395
1394
    assert(transactional_table || !changed ||
1396
 
                session->transaction.stmt.hasModifiedNonTransData());
1397
 
    table->cursor->ha_release_auto_increment();
1398
 
  }
1399
 
 
1400
 
  if (DRIZZLE_INSERT_SELECT_DONE_ENABLED())
1401
 
  {
1402
 
    DRIZZLE_INSERT_SELECT_DONE(0, info.copied + info.deleted + info.updated);
 
1395
                session->transaction.stmt.modified_non_trans_table);
 
1396
    table->file->ha_release_auto_increment();
1403
1397
  }
1404
1398
 
1405
1399
  return;
1436
1430
  NOTES
1437
1431
    This function behaves differently for base and temporary tables:
1438
1432
    - For base table we assume that either table exists and was pre-opened
1439
 
      and locked at openTablesLock() stage (and in this case we just
 
1433
      and locked at open_and_lock_tables() stage (and in this case we just
1440
1434
      emit error or warning and return pre-opened Table object) or special
1441
1435
      placeholder was put in table cache that guarantees that this table
1442
1436
      won't be created or opened until the placeholder will be removed
1454
1448
 
1455
1449
static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
1456
1450
                                      TableList *create_table,
1457
 
                                      message::Table &table_proto,
1458
 
                                      AlterInfo *alter_info,
 
1451
                                      drizzled::message::Table *table_proto,
 
1452
                                      Alter_info *alter_info,
1459
1453
                                      List<Item> *items,
1460
 
                                      bool is_if_not_exists,
1461
 
                                      DRIZZLE_LOCK **lock,
1462
 
                                      TableIdentifier &identifier)
 
1454
                                      DRIZZLE_LOCK **lock)
1463
1455
{
1464
1456
  Table tmp_table;              // Used during 'CreateField()'
1465
1457
  TableShare share;
1471
1463
  Field *tmp_field;
1472
1464
  bool not_used;
1473
1465
 
1474
 
  if (not (identifier.isTmp()) && create_table->table->db_stat)
 
1466
  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
 
1467
      create_table->table->db_stat)
1475
1468
  {
1476
 
    /* Table already exists and was open at openTablesLock() stage. */
1477
 
    if (is_if_not_exists)
 
1469
    /* Table already exists and was open at open_and_lock_tables() stage. */
 
1470
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
1478
1471
    {
1479
1472
      create_info->table_existed= 1;            // Mark that table existed
1480
1473
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1493
1486
 
1494
1487
  tmp_table.s->db_create_options=0;
1495
1488
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
1496
 
 
1497
 
  if (not table_proto.engine().name().compare("MyISAM"))
1498
 
    tmp_table.s->db_low_byte_first= true;
1499
 
  else if (not table_proto.engine().name().compare("MEMORY"))
1500
 
    tmp_table.s->db_low_byte_first= true;
1501
 
 
 
1489
  tmp_table.s->db_low_byte_first=
 
1490
        test(create_info->db_type == myisam_engine ||
 
1491
             create_info->db_type == heap_engine);
1502
1492
  tmp_table.null_row= false;
1503
1493
  tmp_table.maybe_null= false;
1504
1494
 
1513
1503
        field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1514
1504
    else
1515
1505
      field= create_tmp_field(session, &tmp_table, item, item->type(),
1516
 
                              (Item ***) 0, &tmp_field, &def_field, false,
1517
 
                              false, false, 0);
 
1506
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
 
1507
                              0);
1518
1508
    if (!field ||
1519
1509
        !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
1520
1510
                                           ((Item_field *)item)->field :
1533
1523
    should not cause deadlocks or races.
1534
1524
  */
1535
1525
  {
1536
 
    if (not mysql_create_table_no_lock(session,
1537
 
                                       identifier,
1538
 
                                       create_info,
1539
 
                                       table_proto,
1540
 
                                       alter_info,
1541
 
                                       false,
1542
 
                                       select_field_count,
1543
 
                                       is_if_not_exists))
 
1526
    if (!mysql_create_table_no_lock(session, create_table->db,
 
1527
                                    create_table->table_name,
 
1528
                                    create_info,
 
1529
                                    table_proto,
 
1530
                                    alter_info, false,
 
1531
                                    select_field_count))
1544
1532
    {
1545
 
      if (create_info->table_existed && not identifier.isTmp())
 
1533
      if (create_info->table_existed &&
 
1534
          !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1546
1535
      {
1547
1536
        /*
1548
1537
          This means that someone created table underneath server
1553
1542
        return NULL;
1554
1543
      }
1555
1544
 
1556
 
      if (not identifier.isTmp())
 
1545
      if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1557
1546
      {
1558
1547
        pthread_mutex_lock(&LOCK_open); /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1559
1548
        if (session->reopen_name_locked_table(create_table, false))
1560
1549
        {
1561
 
          quick_rm_table(*session, identifier);
 
1550
          quick_rm_table(create_info->db_type, create_table->db,
 
1551
                         create_table->table_name, false);
1562
1552
        }
1563
1553
        else
1564
1554
          table= create_table->table;
1566
1556
      }
1567
1557
      else
1568
1558
      {
1569
 
        if (not (table= session->openTable(create_table, (bool*) 0,
1570
 
                                           DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1571
 
            not create_info->table_existed)
 
1559
        if (!(table= session->open_table(create_table, (bool*) 0,
 
1560
                                         DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
 
1561
            !create_info->table_existed)
1572
1562
        {
1573
1563
          /*
1574
1564
            This shouldn't happen as creation of temporary table should make
1593
1583
      *lock= 0;
1594
1584
    }
1595
1585
 
1596
 
    if (not create_info->table_existed)
1597
 
      session->drop_open_table(table, identifier);
 
1586
    if (!create_info->table_existed)
 
1587
      session->drop_open_table(table, create_table->db, create_table->table_name);
1598
1588
    return NULL;
1599
1589
  }
1600
1590
 
1607
1597
{
1608
1598
  DRIZZLE_LOCK *extra_lock= NULL;
1609
1599
  /*
1610
 
    For replication, the CREATE-SELECT statement is written
1611
 
    in two pieces: the first transaction message contains
1612
 
    the CREATE TABLE statement as a CreateTableStatement message
1613
 
    necessary to create the table.
1614
 
    
1615
 
    The second transaction message contains all the InsertStatement
1616
 
    and associated InsertRecords that should go into the table.
 
1600
    For row-based replication, the CREATE-SELECT statement is written
 
1601
    in two pieces: the first one contains the CREATE TABLE statement
 
1602
    necessary to create the table and the second part contains the rows
 
1603
    that should go into the table.
 
1604
 
 
1605
    For non-temporary tables, the start of the CREATE-SELECT
 
1606
    implicitly commits the previous transaction, and all events
 
1607
    forming the statement will be stored in the transaction cache. At the end
 
1608
    of the statement, the entire statement is committed as a
 
1609
    transaction, and all events are written to the binary log.
 
1610
 
 
1611
    On the master, the table is locked for the duration of the
 
1612
    statement, but since the CREATE part is replicated as a simple
 
1613
    statement, there is no way to lock the table for accesses on the
 
1614
    slave.  Hence, we have to hold on to the CREATE part of the
 
1615
    statement until the statement has finished.
1617
1616
   */
1618
1617
 
1619
1618
  unit= u;
1620
1619
 
1621
 
  if (not (table= create_table_from_items(session, create_info, create_table,
1622
 
                                          table_proto,
1623
 
                                          alter_info, &values,
1624
 
                                          is_if_not_exists,
1625
 
                                          &extra_lock, identifier)))
 
1620
  /*
 
1621
    Start a statement transaction before the create if we are using
 
1622
    row-based replication for the statement.  If we are creating a
 
1623
    temporary table, we need to start a statement transaction.
 
1624
  */
 
1625
 
 
1626
  if (!(table= create_table_from_items(session, create_info, create_table,
 
1627
                                       table_proto,
 
1628
                                       alter_info, &values,
 
1629
                                       &extra_lock)))
1626
1630
    return(-1);                         // abort() deletes table
1627
1631
 
1628
1632
  if (extra_lock)
1629
1633
  {
1630
1634
    assert(m_plock == NULL);
1631
1635
 
1632
 
    if (identifier.isTmp())
 
1636
    if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
1633
1637
      m_plock= &m_lock;
1634
1638
    else
1635
1639
      m_plock= &session->extra_lock;
1657
1661
  table->restoreRecordAsDefault();      // Get empty record
1658
1662
  session->cuted_fields=0;
1659
1663
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1660
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1664
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1661
1665
  if (info.handle_duplicates == DUP_REPLACE)
1662
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
1666
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1663
1667
  if (info.handle_duplicates == DUP_UPDATE)
1664
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1665
 
  table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1668
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1669
  table->file->ha_start_bulk_insert((ha_rows) 0);
1666
1670
  session->abort_on_warning= !info.ignore;
1667
1671
  if (check_that_all_fields_are_given_values(session, table, table_list))
1668
1672
    return(1);
1669
1673
  table->mark_columns_needed_for_insert();
1670
 
  table->cursor->extra(HA_EXTRA_WRITE_CACHE);
 
1674
  table->file->extra(HA_EXTRA_WRITE_CACHE);
1671
1675
  return(0);
1672
1676
}
1673
1677
 
1674
1678
void select_create::store_values(List<Item> &values)
1675
1679
{
1676
 
  fill_record(session, field, values, true);
 
1680
  fill_record(session, field, values, 1);
1677
1681
}
1678
1682
 
1679
1683
 
1680
1684
void select_create::send_error(uint32_t errcode,const char *err)
1681
1685
{
 
1686
 
 
1687
 
1682
1688
  /*
1683
1689
    This will execute any rollbacks that are necessary before writing
1684
1690
    the transaction cache.
1710
1716
    */
1711
1717
    if (!table->s->tmp_table)
1712
1718
    {
1713
 
      TransactionServices &transaction_services= TransactionServices::singleton();
1714
 
      transaction_services.ha_autocommit_or_rollback(session, 0);
 
1719
      ha_autocommit_or_rollback(session, 0);
1715
1720
      (void) session->endActiveTransaction();
1716
1721
    }
1717
1722
 
1718
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1719
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1723
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1724
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1720
1725
    if (m_plock)
1721
1726
    {
1722
1727
      mysql_unlock_tables(session, *m_plock);
1730
1735
 
1731
1736
void select_create::abort()
1732
1737
{
 
1738
 
 
1739
 
1733
1740
  /*
1734
1741
    In select_insert::abort() we roll back the statement, including
1735
1742
    truncating the transaction cache of the binary log. To do this, we
1746
1753
    log state.
1747
1754
  */
1748
1755
  select_insert::abort();
 
1756
  session->transaction.stmt.modified_non_trans_table= false;
 
1757
 
1749
1758
 
1750
1759
  if (m_plock)
1751
1760
  {
1756
1765
 
1757
1766
  if (table)
1758
1767
  {
1759
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1760
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1761
 
    if (not create_info->table_existed)
1762
 
      session->drop_open_table(table, identifier);
 
1768
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1769
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1770
    if (!create_info->table_existed)
 
1771
      session->drop_open_table(table, create_table->db, create_table->table_name);
1763
1772
    table= NULL;                                    // Safety
1764
1773
  }
1765
1774
}
1766
1775
 
1767
 
} /* namespace drizzled */
 
1776
 
 
1777
/*****************************************************************************
 
1778
  Instantiate templates
 
1779
*****************************************************************************/
 
1780
 
 
1781
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
1782
template class List_iterator_fast<List_item>;
 
1783
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */