~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_insert.cc

  • Committer: Brian Aker
  • Date: 2010-07-15 23:18:11 UTC
  • mto: This revision was merged to the branch mainline in revision 1661.
  • Revision ID: brian@gaz-20100715231811-c5erivy1nvwf6bii
Merge in move identifier work.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* Insert of records */
32
32
#include "drizzled/transaction_services.h"
33
33
#include "drizzled/plugin/transactional_storage_engine.h"
34
34
 
35
 
#include "drizzled/table/shell.h"
36
 
 
37
35
namespace drizzled
38
36
{
39
37
 
129
127
      }
130
128
      else
131
129
      {
132
 
        table->setWriteSet(table->timestamp_field->position());
 
130
        table->setWriteSet(table->timestamp_field->field_index);
133
131
      }
134
132
    }
135
133
  }
170
168
      Unmark the timestamp field so that we can check if this is modified
171
169
      by update_fields
172
170
    */
173
 
    timestamp_mark= table->write_set->test(table->timestamp_field->position());
174
 
    table->write_set->reset(table->timestamp_field->position());
 
171
    timestamp_mark= table->write_set->testAndClear(table->timestamp_field->field_index);
175
172
  }
176
173
 
177
174
  /* Check the fields we are going to modify */
189
186
 
190
187
    if (timestamp_mark)
191
188
    {
192
 
      table->setWriteSet(table->timestamp_field->position());
 
189
      table->setWriteSet(table->timestamp_field->field_index);
193
190
    }
194
191
  }
195
192
  return 0;
241
238
  uint32_t value_count;
242
239
  ulong counter = 1;
243
240
  uint64_t id;
244
 
  CopyInfo info;
 
241
  COPY_INFO info;
245
242
  Table *table= 0;
246
243
  List_iterator_fast<List_item> its(values_list);
247
244
  List_item *values;
276
273
                           false,
277
274
                           (fields.elements || !value_count ||
278
275
                            (0) != 0), !ignore))
279
 
  {
280
 
    if (table != NULL)
281
 
      table->cursor->ha_release_auto_increment();
282
 
    if (!joins_freed)
283
 
      free_underlaid_joins(session, &session->lex->select_lex);
284
 
    session->abort_on_warning= 0;
285
 
    DRIZZLE_INSERT_DONE(1, 0);
286
 
    return true;
287
 
  }
 
276
    goto abort;
288
277
 
289
278
  /* mysql_prepare_insert set table_list->table if it was not set */
290
279
  table= table_list->table;
315
304
    if (values->elements != value_count)
316
305
    {
317
306
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
318
 
 
319
 
      if (table != NULL)
320
 
        table->cursor->ha_release_auto_increment();
321
 
      if (!joins_freed)
322
 
        free_underlaid_joins(session, &session->lex->select_lex);
323
 
      session->abort_on_warning= 0;
324
 
      DRIZZLE_INSERT_DONE(1, 0);
325
 
 
326
 
      return true;
 
307
      goto abort;
327
308
    }
328
309
    if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))
329
 
    {
330
 
      if (table != NULL)
331
 
        table->cursor->ha_release_auto_increment();
332
 
      if (!joins_freed)
333
 
        free_underlaid_joins(session, &session->lex->select_lex);
334
 
      session->abort_on_warning= 0;
335
 
      DRIZZLE_INSERT_DONE(1, 0);
336
 
      return true;
337
 
    }
 
310
      goto abort;
338
311
  }
339
312
  its.rewind ();
340
313
 
344
317
  /*
345
318
    Fill in the given fields and dump it to the table cursor
346
319
  */
 
320
  memset(&info, 0, sizeof(info));
347
321
  info.ignore= ignore;
348
322
  info.handle_duplicates=duplic;
349
323
  info.update_fields= &update_fields;
478
452
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
479
453
 
480
454
  if (error)
481
 
  {
482
 
    if (table != NULL)
483
 
      table->cursor->ha_release_auto_increment();
484
 
    if (!joins_freed)
485
 
      free_underlaid_joins(session, &session->lex->select_lex);
486
 
    session->abort_on_warning= 0;
487
 
    DRIZZLE_INSERT_DONE(1, 0);
488
 
    return true;
489
 
  }
490
 
 
 
455
    goto abort;
491
456
  if (values_list.elements == 1 && (!(session->options & OPTION_WARNINGS) ||
492
457
                                    !session->cuted_fields))
493
458
  {
511
476
  session->status_var.inserted_row_count+= session->row_count_func;
512
477
  session->abort_on_warning= 0;
513
478
  DRIZZLE_INSERT_DONE(0, session->row_count_func);
514
 
 
515
479
  return false;
 
480
 
 
481
abort:
 
482
  if (table != NULL)
 
483
    table->cursor->ha_release_auto_increment();
 
484
  if (!joins_freed)
 
485
    free_underlaid_joins(session, &session->lex->select_lex);
 
486
  session->abort_on_warning= 0;
 
487
  DRIZZLE_INSERT_DONE(1, 0);
 
488
  return true;
516
489
}
517
490
 
518
491
 
721
694
     write_record()
722
695
      session   - thread context
723
696
      table - table to which record should be written
724
 
      info  - CopyInfo structure describing handling of duplicates
 
697
      info  - COPY_INFO structure describing handling of duplicates
725
698
              and which is used for counting number of records inserted
726
699
              and deleted.
727
700
 
740
713
*/
741
714
 
742
715
 
743
 
int write_record(Session *session, Table *table,CopyInfo *info)
 
716
int write_record(Session *session, Table *table,COPY_INFO *info)
744
717
{
745
718
  int error;
746
 
  std::vector<unsigned char> key;
747
 
  boost::dynamic_bitset<> *save_read_set, *save_write_set;
 
719
  char *key=0;
 
720
  MyBitmap *save_read_set, *save_write_set;
748
721
  uint64_t prev_insert_id= table->cursor->next_insert_id;
749
722
  uint64_t insert_id_for_cur_row= 0;
750
723
 
755
728
 
756
729
  if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE)
757
730
  {
758
 
    while ((error=table->cursor->insertRecord(table->getInsertRecord())))
 
731
    while ((error=table->cursor->insertRecord(table->record[0])))
759
732
    {
760
733
      uint32_t key_nr;
761
734
      /*
803
776
        goto err;
804
777
      if (table->cursor->getEngine()->check_flag(HTON_BIT_DUPLICATE_POS))
805
778
      {
806
 
        if (table->cursor->rnd_pos(table->getUpdateRecord(),table->cursor->dup_ref))
 
779
        if (table->cursor->rnd_pos(table->record[1],table->cursor->dup_ref))
807
780
          goto err;
808
781
      }
809
782
      else
814
787
          goto err;
815
788
        }
816
789
 
817
 
        if (not key.size())
 
790
        if (!key)
818
791
        {
819
 
          key.resize(table->getShare()->max_unique_length);
 
792
          if (!(key=(char*) malloc(table->getShare()->max_unique_length)))
 
793
          {
 
794
            error=ENOMEM;
 
795
            goto err;
 
796
          }
820
797
        }
821
 
        key_copy(&key[0], table->getInsertRecord(), table->key_info+key_nr, 0);
822
 
        if ((error=(table->cursor->index_read_idx_map(table->getUpdateRecord(),key_nr,
823
 
                                                    &key[0], HA_WHOLE_KEY,
 
798
        key_copy((unsigned char*) key,table->record[0],table->key_info+key_nr,0);
 
799
        if ((error=(table->cursor->index_read_idx_map(table->record[1],key_nr,
 
800
                                                    (unsigned char*) key, HA_WHOLE_KEY,
824
801
                                                    HA_READ_KEY_EXACT))))
825
802
          goto err;
826
803
      }
831
808
          that matches, is updated. If update causes a conflict again,
832
809
          an error is returned
833
810
        */
834
 
        assert(table->insert_values.size());
 
811
        assert(table->insert_values != NULL);
835
812
        table->storeRecordAsInsert();
836
813
        table->restoreRecord();
837
814
        assert(info->update_fields->elements ==
846
823
          table->cursor->adjust_next_insert_id_after_explicit_value(
847
824
            table->next_number_field->val_int());
848
825
        info->touched++;
849
 
 
850
 
        if (! table->records_are_comparable() || table->compare_records())
 
826
        if ((table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ) &&
 
827
             !bitmap_is_subset(table->write_set, table->read_set)) ||
 
828
            table->compare_record())
851
829
        {
852
 
          if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
853
 
                                                table->getInsertRecord())) &&
 
830
          if ((error=table->cursor->updateRecord(table->record[1],
 
831
                                                table->record[0])) &&
854
832
              error != HA_ERR_RECORD_IS_THE_SAME)
855
833
          {
856
834
            if (info->ignore &&
904
882
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
905
883
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
906
884
        {
907
 
          if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
908
 
                                                table->getInsertRecord())) &&
 
885
          if ((error=table->cursor->updateRecord(table->record[1],
 
886
                                                table->record[0])) &&
909
887
              error != HA_ERR_RECORD_IS_THE_SAME)
910
888
            goto err;
911
889
          if (error != HA_ERR_RECORD_IS_THE_SAME)
921
899
        }
922
900
        else
923
901
        {
924
 
          if ((error=table->cursor->deleteRecord(table->getUpdateRecord())))
 
902
          if ((error=table->cursor->deleteRecord(table->record[1])))
925
903
            goto err;
926
904
          info->deleted++;
927
905
          if (!table->cursor->has_transactions())
937
915
    */
938
916
    if (table->read_set != save_read_set ||
939
917
        table->write_set != save_write_set)
940
 
      table->column_bitmaps_set(*save_read_set, *save_write_set);
 
918
      table->column_bitmaps_set(save_read_set, save_write_set);
941
919
  }
942
 
  else if ((error=table->cursor->insertRecord(table->getInsertRecord())))
 
920
  else if ((error=table->cursor->insertRecord(table->record[0])))
943
921
  {
944
922
    if (!info->ignore ||
945
923
        table->cursor->is_fatal_error(error, HA_CHECK_DUP))
953
931
  session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
954
932
 
955
933
gok_or_after_err:
 
934
  if (key)
 
935
    free(key);
956
936
  if (!table->cursor->has_transactions())
957
937
    session->transaction.stmt.markModifiedNonTransData();
958
938
  return(0);
966
946
 
967
947
before_err:
968
948
  table->cursor->restore_auto_increment(prev_insert_id);
969
 
  table->column_bitmaps_set(*save_read_set, *save_write_set);
970
 
  return 1;
 
949
  if (key)
 
950
    free(key);
 
951
  table->column_bitmaps_set(save_read_set, save_write_set);
 
952
  return(1);
971
953
}
972
954
 
973
955
 
1068
1050
                             List<Item> *update_fields,
1069
1051
                             List<Item> *update_values,
1070
1052
                             enum_duplicates duplic,
1071
 
                             bool ignore_check_option_errors) :
1072
 
  table_list(table_list_par), table(table_par), fields(fields_par),
1073
 
  autoinc_value_of_last_inserted_row(0),
1074
 
  insert_into_view(table_list_par && 0 != 0)
 
1053
                             bool ignore_check_option_errors)
 
1054
  :table_list(table_list_par), table(table_par), fields(fields_par),
 
1055
   autoinc_value_of_last_inserted_row(0),
 
1056
   insert_into_view(table_list_par && 0 != 0)
1075
1057
{
 
1058
  memset(&info, 0, sizeof(info));
1076
1059
  info.handle_duplicates= duplic;
1077
1060
  info.ignore= ignore_check_option_errors;
1078
1061
  info.update_fields= update_fields;
1452
1435
      items        in     List of items which should be used to produce rest
1453
1436
                          of fields for the table (corresponding fields will
1454
1437
                          be added to the end of alter_info->create_list)
1455
 
      lock         out    Pointer to the DrizzleLock object for table created
 
1438
      lock         out    Pointer to the DRIZZLE_LOCK object for table created
1456
1439
                          (or open temporary table) will be returned in this
1457
1440
                          parameter. Since this table is not included in
1458
1441
                          Session::lock caller is responsible for explicitly
1484
1467
                                      AlterInfo *alter_info,
1485
1468
                                      List<Item> *items,
1486
1469
                                      bool is_if_not_exists,
1487
 
                                      DrizzleLock **lock,
 
1470
                                      DRIZZLE_LOCK **lock,
1488
1471
                                      TableIdentifier &identifier)
1489
1472
{
 
1473
  Table tmp_table;              // Used during 'CreateField()'
1490
1474
  TableShare share(message::Table::INTERNAL);
 
1475
  Table *table= 0;
1491
1476
  uint32_t select_field_count= items->elements;
1492
1477
  /* Add selected items to field list */
1493
1478
  List_iterator_fast<Item> it(*items);
1503
1488
      create_info->table_existed= 1;            // Mark that table existed
1504
1489
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1505
1490
                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
1506
 
                          create_table->getTableName());
 
1491
                          create_table->table_name);
1507
1492
      return create_table->table;
1508
1493
    }
1509
1494
 
1510
 
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
 
1495
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
1511
1496
    return NULL;
1512
1497
  }
1513
1498
 
 
1499
  tmp_table.alias= 0;
 
1500
  tmp_table.timestamp_field= 0;
 
1501
  tmp_table.setShare(&share);
 
1502
 
 
1503
  tmp_table.getMutableShare()->db_create_options= 0;
 
1504
  tmp_table.getMutableShare()->blob_ptr_size= portable_sizeof_char_ptr;
 
1505
 
 
1506
  if (not table_proto.engine().name().compare("MyISAM"))
 
1507
    tmp_table.getMutableShare()->db_low_byte_first= true;
 
1508
  else if (not table_proto.engine().name().compare("MEMORY"))
 
1509
    tmp_table.getMutableShare()->db_low_byte_first= true;
 
1510
 
 
1511
  tmp_table.null_row= false;
 
1512
  tmp_table.maybe_null= false;
 
1513
 
 
1514
  while ((item=it++))
1514
1515
  {
1515
 
    table::Shell tmp_table(share);              // Used during 'CreateField()'
1516
 
    tmp_table.timestamp_field= 0;
1517
 
 
1518
 
    tmp_table.getMutableShare()->db_create_options= 0;
1519
 
    tmp_table.getMutableShare()->blob_ptr_size= portable_sizeof_char_ptr;
1520
 
 
1521
 
    if (not table_proto.engine().name().compare("MyISAM"))
1522
 
      tmp_table.getMutableShare()->db_low_byte_first= true;
1523
 
    else if (not table_proto.engine().name().compare("MEMORY"))
1524
 
      tmp_table.getMutableShare()->db_low_byte_first= true;
1525
 
 
1526
 
    tmp_table.null_row= false;
1527
 
    tmp_table.maybe_null= false;
1528
 
 
1529
 
    tmp_table.in_use= session;
1530
 
 
1531
 
    while ((item=it++))
1532
 
    {
1533
 
      CreateField *cr_field;
1534
 
      Field *field, *def_field;
1535
 
      if (item->type() == Item::FUNC_ITEM)
1536
 
      {
1537
 
        if (item->result_type() != STRING_RESULT)
1538
 
        {
1539
 
          field= item->tmp_table_field(&tmp_table);
1540
 
        }
1541
 
        else
1542
 
        {
1543
 
          field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1544
 
        }
1545
 
      }
 
1516
    CreateField *cr_field;
 
1517
    Field *field, *def_field;
 
1518
    if (item->type() == Item::FUNC_ITEM)
 
1519
      if (item->result_type() != STRING_RESULT)
 
1520
        field= item->tmp_table_field(&tmp_table);
1546
1521
      else
1547
 
      {
1548
 
        field= create_tmp_field(session, &tmp_table, item, item->type(),
1549
 
                                (Item ***) 0, &tmp_field, &def_field, false,
1550
 
                                false, false, 0);
1551
 
      }
1552
 
 
1553
 
      if (!field ||
1554
 
          !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
1555
 
                                            ((Item_field *)item)->field :
1556
 
                                            (Field*) 0))))
1557
 
      {
1558
 
        return NULL;
1559
 
      }
1560
 
 
1561
 
      if (item->maybe_null)
1562
 
      {
1563
 
        cr_field->flags &= ~NOT_NULL_FLAG;
1564
 
      }
1565
 
 
1566
 
      alter_info->create_list.push_back(cr_field);
1567
 
    }
 
1522
        field= item->tmp_table_field_from_field_type(&tmp_table, 0);
 
1523
    else
 
1524
      field= create_tmp_field(session, &tmp_table, item, item->type(),
 
1525
                              (Item ***) 0, &tmp_field, &def_field, false,
 
1526
                              false, false, 0);
 
1527
    if (!field ||
 
1528
        !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
 
1529
                                           ((Item_field *)item)->field :
 
1530
                                           (Field*) 0))))
 
1531
      return NULL;
 
1532
    if (item->maybe_null)
 
1533
      cr_field->flags &= ~NOT_NULL_FLAG;
 
1534
    alter_info->create_list.push_back(cr_field);
1568
1535
  }
1569
1536
 
1570
1537
  /*
1574
1541
    creating base table on which name we have exclusive lock. So code below
1575
1542
    should not cause deadlocks or races.
1576
1543
  */
1577
 
  Table *table= 0;
1578
1544
  {
1579
1545
    if (not mysql_create_table_no_lock(session,
1580
1546
                                       identifier,
1592
1558
          or it was created via different mysqld front-end to the
1593
1559
          cluster. We don't have many options but to throw an error.
1594
1560
        */
1595
 
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
 
1561
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
1596
1562
        return NULL;
1597
1563
      }
1598
1564
 
1599
1565
      if (not identifier.isTmp())
1600
1566
      {
1601
 
        /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1602
 
        boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
1603
 
 
1604
 
        if (create_table->table)
 
1567
        pthread_mutex_lock(&LOCK_open); /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
 
1568
        if (session->reopen_name_locked_table(create_table, false))
1605
1569
        {
1606
 
          table::Concurrent *concurrent_table= static_cast<table::Concurrent *>(create_table->table);
1607
 
 
1608
 
          if (concurrent_table->reopen_name_locked_table(create_table, session))
1609
 
          {
1610
 
            plugin::StorageEngine::dropTable(*session, identifier);
1611
 
          }
1612
 
          else
1613
 
          {
1614
 
            table= create_table->table;
1615
 
          }
 
1570
          quick_rm_table(*session, identifier);
1616
1571
        }
1617
1572
        else
1618
 
        {
1619
 
          plugin::StorageEngine::dropTable(*session, identifier);
1620
 
        }
 
1573
          table= create_table->table;
 
1574
        pthread_mutex_unlock(&LOCK_open);
1621
1575
      }
1622
1576
      else
1623
1577
      {
1630
1584
            it preparable for open. But let us do close_temporary_table() here
1631
1585
            just in case.
1632
1586
          */
1633
 
          session->drop_temporary_table(identifier);
 
1587
          session->drop_temporary_table(create_table);
1634
1588
        }
1635
1589
      }
1636
1590
    }
1637
 
    if (not table)                                   // open failed
 
1591
    if (!table)                                   // open failed
1638
1592
      return NULL;
1639
1593
  }
1640
1594
 
1641
1595
  table->reginfo.lock_type=TL_WRITE;
1642
 
  if (! ((*lock)= session->lockTables(&table, 1, DRIZZLE_LOCK_IGNORE_FLUSH, &not_used)))
 
1596
  if (! ((*lock)= mysql_lock_tables(session, &table, 1,
 
1597
                                    DRIZZLE_LOCK_IGNORE_FLUSH, &not_used)))
1643
1598
  {
1644
1599
    if (*lock)
1645
1600
    {
1646
 
      session->unlockTables(*lock);
 
1601
      mysql_unlock_tables(session, *lock);
1647
1602
      *lock= 0;
1648
1603
    }
1649
1604
 
1659
1614
int
1660
1615
select_create::prepare(List<Item> &values, Select_Lex_Unit *u)
1661
1616
{
1662
 
  DrizzleLock *extra_lock= NULL;
 
1617
  DRIZZLE_LOCK *extra_lock= NULL;
1663
1618
  /*
1664
1619
    For replication, the CREATE-SELECT statement is written
1665
1620
    in two pieces: the first transaction message contains 
1704
1659
 
1705
1660
  /* Mark all fields that are given values */
1706
1661
  for (Field **f= field ; *f ; f++)
1707
 
  {
1708
 
    table->setWriteSet((*f)->position());
1709
 
  }
 
1662
    table->setWriteSet((*f)->field_index);
1710
1663
 
1711
1664
  /* Don't set timestamp if used */
1712
1665
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
1777
1730
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1778
1731
    if (m_plock)
1779
1732
    {
1780
 
      session->unlockTables(*m_plock);
 
1733
      mysql_unlock_tables(session, *m_plock);
1781
1734
      *m_plock= NULL;
1782
1735
      m_plock= NULL;
1783
1736
    }
1807
1760
 
1808
1761
  if (m_plock)
1809
1762
  {
1810
 
    session->unlockTables(*m_plock);
 
1763
    mysql_unlock_tables(session, *m_plock);
1811
1764
    *m_plock= NULL;
1812
1765
    m_plock= NULL;
1813
1766
  }