12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
/* Insert of records */
19
19
#include "config.h"
21
20
#include <drizzled/sql_select.h>
22
21
#include <drizzled/show.h>
23
22
#include <drizzled/error.h>
25
24
#include <drizzled/probes.h>
26
25
#include <drizzled/sql_base.h>
27
26
#include <drizzled/sql_load.h>
28
#include <drizzled/field/epoch.h>
27
#include <drizzled/field/timestamp.h>
29
28
#include <drizzled/lock.h>
30
29
#include "drizzled/sql_table.h"
31
30
#include "drizzled/pthread_globals.h"
32
31
#include "drizzled/transaction_services.h"
33
32
#include "drizzled/plugin/transactional_storage_engine.h"
35
#include "drizzled/table/shell.h"
272
261
value_count= values->elements;
274
if (prepare_insert(session, table_list, table, fields, values,
263
if (mysql_prepare_insert(session, table_list, table, fields, values,
275
264
update_fields, update_values, duplic, &unused_conds,
277
266
(fields.elements || !value_count ||
278
267
(0) != 0), !ignore))
281
table->cursor->ha_release_auto_increment();
283
free_underlaid_joins(session, &session->lex->select_lex);
284
session->setAbortOnWarning(false);
285
DRIZZLE_INSERT_DONE(1, 0);
289
270
/* mysql_prepare_insert set table_list->table if it was not set */
290
271
table= table_list->table;
315
296
if (values->elements != value_count)
317
298
my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
320
table->cursor->ha_release_auto_increment();
322
free_underlaid_joins(session, &session->lex->select_lex);
323
session->setAbortOnWarning(false);
324
DRIZZLE_INSERT_DONE(1, 0);
328
301
if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))
331
table->cursor->ha_release_auto_increment();
333
free_underlaid_joins(session, &session->lex->select_lex);
334
session->setAbortOnWarning(false);
335
DRIZZLE_INSERT_DONE(1, 0);
478
446
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
483
table->cursor->ha_release_auto_increment();
485
free_underlaid_joins(session, &session->lex->select_lex);
486
session->setAbortOnWarning(false);
487
DRIZZLE_INSERT_DONE(1, 0);
491
450
if (values_list.elements == 1 && (!(session->options & OPTION_WARNINGS) ||
492
451
!session->cuted_fields))
494
453
session->row_count_func= info.copied + info.deleted + info.updated;
495
session->my_ok((ulong) session->rowCount(),
454
session->my_ok((ulong) session->row_count_func,
496
455
info.copied + info.deleted + info.touched, id);
502
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
461
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
503
462
(ulong) (info.records - info.copied), (ulong) session->cuted_fields);
505
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
464
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
506
465
(ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
507
466
session->row_count_func= info.copied + info.deleted + info.updated;
508
session->my_ok((ulong) session->rowCount(),
467
session->my_ok((ulong) session->row_count_func,
509
468
info.copied + info.deleted + info.touched, id, buff);
511
session->status_var.inserted_row_count+= session->rowCount();
512
session->setAbortOnWarning(false);
513
DRIZZLE_INSERT_DONE(0, session->rowCount());
470
session->abort_on_warning= 0;
471
DRIZZLE_INSERT_DONE(0, session->row_count_func);
476
table->cursor->ha_release_auto_increment();
478
free_underlaid_joins(session, &session->lex->select_lex);
479
session->abort_on_warning= 0;
480
DRIZZLE_INSERT_DONE(1, 0);
659
625
if (!res && check_fields)
661
bool saved_abort_on_warning= session->abortOnWarning();
663
session->setAbortOnWarning(abort_on_warning);
627
bool saved_abort_on_warning= session->abort_on_warning;
628
session->abort_on_warning= abort_on_warning;
664
629
res= check_that_all_fields_are_given_values(session,
666
631
context->table_list->table,
667
632
context->table_list);
668
session->setAbortOnWarning(saved_abort_on_warning);
633
session->abort_on_warning= saved_abort_on_warning;
671
636
if (!res && duplic == DUP_UPDATE)
745
int write_record(Session *session, Table *table,CopyInfo *info)
709
int write_record(Session *session, Table *table,COPY_INFO *info)
748
std::vector<unsigned char> key;
749
boost::dynamic_bitset<> *save_read_set, *save_write_set;
713
MyBitmap *save_read_set, *save_write_set;
750
714
uint64_t prev_insert_id= table->cursor->next_insert_id;
751
715
uint64_t insert_id_for_cur_row= 0;
801
765
if (info->handle_duplicates == DUP_REPLACE &&
802
766
table->next_number_field &&
803
key_nr == table->getShare()->next_number_index &&
767
key_nr == table->s->next_number_index &&
804
768
(insert_id_for_cur_row > 0))
806
770
if (table->cursor->getEngine()->check_flag(HTON_BIT_DUPLICATE_POS))
808
if (table->cursor->rnd_pos(table->getUpdateRecord(),table->cursor->dup_ref))
772
if (table->cursor->rnd_pos(table->record[1],table->cursor->dup_ref))
821
key.resize(table->getShare()->max_unique_length);
785
if (!(key=(char*) malloc(table->s->max_unique_length)))
823
key_copy(&key[0], table->getInsertRecord(), table->key_info+key_nr, 0);
824
if ((error=(table->cursor->index_read_idx_map(table->getUpdateRecord(),key_nr,
825
&key[0], HA_WHOLE_KEY,
791
key_copy((unsigned char*) key,table->record[0],table->key_info+key_nr,0);
792
if ((error=(table->cursor->index_read_idx_map(table->record[1],key_nr,
793
(unsigned char*) key, HA_WHOLE_KEY,
826
794
HA_READ_KEY_EXACT))))
848
816
table->cursor->adjust_next_insert_id_after_explicit_value(
849
817
table->next_number_field->val_int());
852
if (! table->records_are_comparable() || table->compare_records())
819
if ((table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ) &&
820
!bitmap_is_subset(table->write_set, table->read_set)) ||
821
table->compare_record())
854
if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
855
table->getInsertRecord())) &&
823
if ((error=table->cursor->ha_update_row(table->record[1],
824
table->record[0])) &&
856
825
error != HA_ERR_RECORD_IS_THE_SAME)
858
827
if (info->ignore &&
1070
1043
List<Item> *update_fields,
1071
1044
List<Item> *update_values,
1072
1045
enum_duplicates duplic,
1073
bool ignore_check_option_errors) :
1074
table_list(table_list_par), table(table_par), fields(fields_par),
1075
autoinc_value_of_last_inserted_row(0),
1076
insert_into_view(table_list_par && 0 != 0)
1046
bool ignore_check_option_errors)
1047
:table_list(table_list_par), table(table_par), fields(fields_par),
1048
autoinc_value_of_last_inserted_row(0),
1049
insert_into_view(table_list_par && 0 != 0)
1051
memset(&info, 0, sizeof(info));
1078
1052
info.handle_duplicates= duplic;
1079
1053
info.ignore= ignore_check_option_errors;
1080
1054
info.update_fields= update_fields;
1106
1080
if (!res && fields->elements)
1108
bool saved_abort_on_warning= session->abortOnWarning();
1109
session->setAbortOnWarning(not info.ignore);
1082
bool saved_abort_on_warning= session->abort_on_warning;
1083
session->abort_on_warning= !info.ignore;
1110
1084
res= check_that_all_fields_are_given_values(session, table_list->table,
1112
session->setAbortOnWarning(saved_abort_on_warning);
1086
session->abort_on_warning= saved_abort_on_warning;
1115
1089
if (info.handle_duplicates == DUP_UPDATE && !res)
1202
1176
table->next_number_field=table->found_next_number_field;
1204
1178
session->cuted_fields=0;
1206
1179
if (info.ignore || info.handle_duplicates != DUP_ERROR)
1207
1180
table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1209
1181
if (info.handle_duplicates == DUP_REPLACE)
1210
1182
table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1212
1183
if (info.handle_duplicates == DUP_UPDATE)
1213
1184
table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1215
session->setAbortOnWarning(not info.ignore);
1185
session->abort_on_warning= !info.ignore;
1216
1186
table->mark_columns_needed_for_insert();
1274
1244
if (unit->offset_limit_cnt)
1275
1245
{ // using limit offset,count
1276
1246
unit->offset_limit_cnt--;
1280
1250
session->count_cuted_fields= CHECK_FIELD_WARN; // Calculate cuted fields
1281
1251
store_values(values);
1282
1252
session->count_cuted_fields= CHECK_FIELD_IGNORE;
1283
1253
if (session->is_error())
1286
1256
// Release latches in case bulk insert takes a long time
1287
1257
plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
1289
1259
error= write_record(session, table, &info);
1290
table->auto_increment_field_not_null= false;
1328
1297
if (fields->elements)
1329
1298
fill_record(session, *fields, values, true);
1331
fill_record(session, table->getFields(), values, true);
1300
fill_record(session, table->field, values, true);
1334
void select_insert::send_error(drizzled::error_t errcode,const char *err)
1303
void select_insert::send_error(uint32_t errcode,const char *err)
1336
1307
my_message(errcode, err, MYF(0));
1371
1344
char buff[160];
1372
1345
if (info.ignore)
1373
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
1346
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1374
1347
(ulong) (info.records - info.copied), (ulong) session->cuted_fields);
1376
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
1349
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1377
1350
(ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
1378
1351
session->row_count_func= info.copied + info.deleted + info.updated;
1382
1355
(session->arg_of_last_insert_id_function ?
1383
1356
session->first_successful_insert_id_in_prev_stmt :
1384
1357
(info.copied ? autoinc_value_of_last_inserted_row : 0));
1385
session->my_ok((ulong) session->rowCount(),
1358
session->my_ok((ulong) session->row_count_func,
1386
1359
info.copied + info.deleted + info.touched, id, buff);
1387
session->status_var.inserted_row_count+= session->rowCount();
1388
DRIZZLE_INSERT_SELECT_DONE(0, session->rowCount());
1360
DRIZZLE_INSERT_SELECT_DONE(0, session->row_count_func);
1483
1455
static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
1484
1456
TableList *create_table,
1485
message::Table &table_proto,
1457
message::Table *table_proto,
1486
1458
AlterInfo *alter_info,
1487
1459
List<Item> *items,
1488
1460
bool is_if_not_exists,
1490
identifier::Table::const_reference identifier)
1461
DRIZZLE_LOCK **lock)
1492
TableShare share(message::Table::INTERNAL);
1463
Table tmp_table; // Used during 'CreateField()'
1493
1466
uint32_t select_field_count= items->elements;
1494
1467
/* Add selected items to field list */
1495
1468
List_iterator_fast<Item> it(*items);
1497
1470
Field *tmp_field;
1499
if (not (identifier.isTmp()) && create_table->table->db_stat)
1473
bool lex_identified_temp_table= (table_proto->type() == message::Table::TEMPORARY);
1475
if (!(lex_identified_temp_table) &&
1476
create_table->table->db_stat)
1501
1478
/* Table already exists and was open at openTablesLock() stage. */
1502
1479
if (is_if_not_exists)
1504
1481
create_info->table_existed= 1; // Mark that table existed
1505
1482
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1506
1483
ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
1507
create_table->getTableName());
1484
create_table->table_name);
1508
1485
return create_table->table;
1511
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1488
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
1493
tmp_table.timestamp_field= 0;
1494
tmp_table.s= &share;
1496
tmp_table.s->db_create_options=0;
1497
tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
1498
tmp_table.s->db_low_byte_first=
1499
test(create_info->db_type == myisam_engine ||
1500
create_info->db_type == heap_engine);
1501
tmp_table.null_row= false;
1502
tmp_table.maybe_null= false;
1516
table::Shell tmp_table(share); // Used during 'CreateField()'
1518
if (not table_proto.engine().name().compare("MyISAM"))
1519
tmp_table.getMutableShare()->db_low_byte_first= true;
1520
else if (not table_proto.engine().name().compare("MEMORY"))
1521
tmp_table.getMutableShare()->db_low_byte_first= true;
1523
tmp_table.in_use= session;
1527
CreateField *cr_field;
1528
Field *field, *def_field;
1529
if (item->type() == Item::FUNC_ITEM)
1531
if (item->result_type() != STRING_RESULT)
1533
field= item->tmp_table_field(&tmp_table);
1537
field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1506
CreateField *cr_field;
1507
Field *field, *def_field;
1508
if (item->type() == Item::FUNC_ITEM)
1509
if (item->result_type() != STRING_RESULT)
1510
field= item->tmp_table_field(&tmp_table);
1542
field= create_tmp_field(session, &tmp_table, item, item->type(),
1543
(Item ***) 0, &tmp_field, &def_field, false,
1548
!(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
1549
((Item_field *)item)->field :
1555
if (item->maybe_null)
1557
cr_field->flags &= ~NOT_NULL_FLAG;
1560
alter_info->create_list.push_back(cr_field);
1512
field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1514
field= create_tmp_field(session, &tmp_table, item, item->type(),
1515
(Item ***) 0, &tmp_field, &def_field, false,
1518
!(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
1519
((Item_field *)item)->field :
1522
if (item->maybe_null)
1523
cr_field->flags &= ~NOT_NULL_FLAG;
1524
alter_info->create_list.push_back(cr_field);
1527
TableIdentifier identifier(create_table->db,
1528
create_table->table_name,
1529
lex_identified_temp_table ? TEMP_TABLE :
1565
1534
Create and lock table.
1568
1537
creating base table on which name we have exclusive lock. So code below
1569
1538
should not cause deadlocks or races.
1573
if (not create_table_no_lock(session,
1541
if (!mysql_create_table_no_lock(session,
1582
if (create_info->table_existed && not identifier.isTmp())
1550
if (create_info->table_existed &&
1551
!(lex_identified_temp_table))
1585
1554
This means that someone created table underneath server
1586
1555
or it was created via different mysqld front-end to the
1587
1556
cluster. We don't have many options but to throw an error.
1589
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1558
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
1593
if (not identifier.isTmp())
1562
if (!(lex_identified_temp_table))
1595
/* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1596
boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
1598
if (create_table->table)
1564
pthread_mutex_lock(&LOCK_open); /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1565
if (session->reopen_name_locked_table(create_table, false))
1600
table::Concurrent *concurrent_table= static_cast<table::Concurrent *>(create_table->table);
1602
if (concurrent_table->reopen_name_locked_table(create_table, session))
1604
(void)plugin::StorageEngine::dropTable(*session, identifier);
1608
table= create_table->table;
1567
quick_rm_table(*session, identifier);
1613
(void)plugin::StorageEngine::dropTable(*session, identifier);
1570
table= create_table->table;
1571
pthread_mutex_unlock(&LOCK_open);
1618
if (not (table= session->openTable(create_table, (bool*) 0,
1619
DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1620
not create_info->table_existed)
1575
if (!(table= session->openTable(create_table, (bool*) 0,
1576
DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1577
!create_info->table_existed)
1623
1580
This shouldn't happen as creation of temporary table should make
1624
1581
it preparable for open. But let us do close_temporary_table() here
1627
session->drop_temporary_table(identifier);
1584
session->drop_temporary_table(create_table);
1631
if (not table) // open failed
1588
if (!table) // open failed
1635
1592
table->reginfo.lock_type=TL_WRITE;
1636
if (not ((*lock)= session->lockTables(&table, 1, DRIZZLE_LOCK_IGNORE_FLUSH)))
1593
if (! ((*lock)= mysql_lock_tables(session, &table, 1,
1594
DRIZZLE_LOCK_IGNORE_FLUSH, ¬_used)))
1640
session->unlockTables(*lock);
1598
mysql_unlock_tables(session, *lock);
1644
if (not create_info->table_existed)
1645
session->drop_open_table(table, identifier);
1602
if (!create_info->table_existed)
1603
session->drop_open_table(table, create_table->db, create_table->table_name);
1654
1612
select_create::prepare(List<Item> &values, Select_Lex_Unit *u)
1656
DrizzleLock *extra_lock= NULL;
1614
bool lex_identified_temp_table= (table_proto->type() == message::Table::TEMPORARY);
1616
DRIZZLE_LOCK *extra_lock= NULL;
1658
For replication, the CREATE-SELECT statement is written
1659
in two pieces: the first transaction message contains
1660
the CREATE TABLE statement as a CreateTableStatement message
1661
necessary to create the table.
1663
The second transaction message contains all the InsertStatement
1664
and associated InsertRecords that should go into the table.
1618
For row-based replication, the CREATE-SELECT statement is written
1619
in two pieces: the first one contains the CREATE TABLE statement
1620
necessary to create the table and the second part contains the rows
1621
that should go into the table.
1623
For non-temporary tables, the start of the CREATE-SELECT
1624
implicitly commits the previous transaction, and all events
1625
forming the statement will be stored in the transaction cache. At end
1626
of the statement, the entire statement is committed as a
1627
transaction, and all events are written to the binary log.
1629
On the master, the table is locked for the duration of the
1630
statement, but since the CREATE part is replicated as a simple
1631
statement, there is no way to lock the table for accesses on the
1632
slave. Hence, we have to hold on to the CREATE part of the
1633
statement until the statement has finished.
1669
if (not (table= create_table_from_items(session, create_info, create_table,
1671
alter_info, &values,
1673
&extra_lock, identifier)))
1639
Start a statement transaction before the create if we are using
1640
row-based replication for the statement. If we are creating a
1641
temporary table, we need to start a statement transaction.
1644
if (!(table= create_table_from_items(session, create_info, create_table,
1646
alter_info, &values,
1675
1649
return(-1); // abort() deletes table
1678
1651
if (extra_lock)
1680
1653
assert(m_plock == NULL);
1682
if (identifier.isTmp())
1655
if (lex_identified_temp_table)
1683
1656
m_plock= &m_lock;
1685
1658
m_plock= &session->extra_lock;
1687
1660
*m_plock= extra_lock;
1690
if (table->getShare()->sizeFields() < values.elements)
1663
if (table->s->fields < values.elements)
1692
1665
my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
1696
1669
/* First field to copy */
1697
field= table->getFields() + table->getShare()->sizeFields() - values.elements;
1670
field= table->field+table->s->fields - values.elements;
1699
1672
/* Mark all fields that are given values */
1700
1673
for (Field **f= field ; *f ; f++)
1702
table->setWriteSet((*f)->position());
1674
table->setWriteSet((*f)->field_index);
1705
1676
/* Don't set timestamp if used */
1706
1677
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
1710
1681
session->cuted_fields=0;
1711
1682
if (info.ignore || info.handle_duplicates != DUP_ERROR)
1712
1683
table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1714
1684
if (info.handle_duplicates == DUP_REPLACE)
1715
1685
table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1717
1686
if (info.handle_duplicates == DUP_UPDATE)
1718
1687
table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1720
1688
table->cursor->ha_start_bulk_insert((ha_rows) 0);
1721
session->setAbortOnWarning(not info.ignore);
1689
session->abort_on_warning= !info.ignore;
1722
1690
if (check_that_all_fields_are_given_values(session, table, table_list))
1725
1692
table->mark_columns_needed_for_insert();
1726
1693
table->cursor->extra(HA_EXTRA_WRITE_CACHE);