12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
/* Insert of records */
22
Drizzle has a different form of DELAYED than MySQL. DELAYED is just
23
a hint to the storage engine (which can then do whatever it likes).
25
#include <drizzled/server_includes.h>
21
26
#include <drizzled/sql_select.h>
22
#include <drizzled/show.h>
27
#include <drizzled/sql_show.h>
28
#include <drizzled/rpl_mi.h>
23
29
#include <drizzled/error.h>
24
#include <drizzled/name_resolution_context_state.h>
30
#include <drizzled/slave.h>
31
#include <drizzled/sql_parse.h>
25
32
#include <drizzled/probes.h>
26
#include <drizzled/sql_base.h>
27
#include <drizzled/sql_load.h>
28
#include <drizzled/field/epoch.h>
29
#include <drizzled/lock.h>
30
#include "drizzled/sql_table.h"
31
#include "drizzled/pthread_globals.h"
32
#include "drizzled/transaction_services.h"
33
#include "drizzled/plugin/transactional_storage_engine.h"
35
#include "drizzled/table/shell.h"
40
extern plugin::StorageEngine *heap_engine;
41
extern plugin::StorageEngine *myisam_engine;
33
#include <drizzled/tableop_hooks.h>
35
/* Define to force use of my_malloc() if the allocated memory block is big */
38
#define my_safe_alloca(size, min_length) my_alloca(size)
39
#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
41
#define my_safe_alloca(size, min_length) ((size <= min_length) ? my_alloca(size) : malloc(size))
42
#define my_safe_afree(ptr, size, min_length) if (size > min_length) free(ptr)
44
48
Check if insert fields are correct.
354
348
For single line insert, generate an error if try to set a NOT NULL field
357
session->count_cuted_fields= ignore ? CHECK_FIELD_WARN : CHECK_FIELD_ERROR_FOR_NULL;
351
session->count_cuted_fields= ((values_list.elements == 1 &&
353
CHECK_FIELD_ERROR_FOR_NULL :
359
355
session->cuted_fields = 0L;
360
356
table->next_number_field=table->found_next_number_field;
358
if (session->slave_thread &&
359
(info.handle_duplicates == DUP_UPDATE) &&
360
(table->next_number_field != NULL) &&
361
rpl_master_has_bug(&active_mi->rli, 24432))
363
365
session->set_proc_info("update");
364
366
if (duplic == DUP_REPLACE)
365
table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
367
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
366
368
if (duplic == DUP_UPDATE)
367
table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
369
table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
369
371
if (duplic != DUP_ERROR || ignore)
370
table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
371
table->cursor->ha_start_bulk_insert(values_list.elements);
372
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
373
table->file->ha_start_bulk_insert(values_list.elements);
375
session->setAbortOnWarning(not ignore);
377
session->abort_on_warning= !ignore;
377
379
table->mark_columns_needed_for_insert();
434
442
Do not do this release if this is a delayed insert, it would steal
435
443
auto_inc values from the delayed_insert thread as they share Table.
437
table->cursor->ha_release_auto_increment();
438
if (table->cursor->ha_end_bulk_insert() && !error)
445
table->file->ha_release_auto_increment();
446
if (table->file->ha_end_bulk_insert() && !error)
440
table->print_error(errno,MYF(0));
448
table->file->print_error(my_errno,MYF(0));
443
451
if (duplic != DUP_ERROR || ignore)
444
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
446
transactional_table= table->cursor->has_transactions();
448
changed= (info.copied || info.deleted || info.updated);
449
if ((changed && error <= 0) || session->transaction.stmt.hasModifiedNonTransData())
451
if (session->transaction.stmt.hasModifiedNonTransData())
452
session->transaction.all.markModifiedNonTransData();
454
assert(transactional_table || !changed || session->transaction.stmt.hasModifiedNonTransData());
452
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
454
transactional_table= table->file->has_transactions();
456
if ((changed= (info.copied || info.deleted || info.updated)))
459
Invalidate the table in the query cache if something changed.
460
For the transactional algorithm to work the invalidation must be
461
before binlog writing and ha_autocommit_or_rollback
464
if ((changed && error <= 0) || session->transaction.stmt.modified_non_trans_table || was_insert_delayed)
466
if (mysql_bin_log.is_open())
471
[Guilhem wrote] Temporary errors may have filled
472
session->net.last_error/errno. For example if there has
473
been a disk full error when writing the row, and it was
474
MyISAM, then session->net.last_error/errno will be set to
475
"disk full"... and the my_pwrite() will wait until free
476
space appears, and so when it finishes then the
477
write_row() was entirely successful
479
/* todo: consider removing */
480
session->clear_error();
484
A query which per-row-loop can not be interrupted with
485
KILLED, like INSERT, and that does not invoke stored
486
routines can be binlogged with neglecting the KILLED error.
488
If there was no error (error == zero) until after the end of
489
inserting loop the KILLED flag that appeared later can be
490
disregarded since previously possible invocation of stored
491
routines did not result in any error due to the KILLED. In
492
such case the flag is ignored for constructing binlog event.
494
assert(session->killed != Session::KILL_BAD_DATA || error > 0);
495
if (session->binlog_query(Session::ROW_QUERY_TYPE,
496
session->query, session->query_length,
497
transactional_table, false,
498
(error>0) ? session->killed : Session::NOT_KILLED) &&
504
if (session->transaction.stmt.modified_non_trans_table)
505
session->transaction.all.modified_non_trans_table= true;
507
assert(transactional_table || !changed ||
508
session->transaction.stmt.modified_non_trans_table);
457
511
session->set_proc_info("end");
475
529
session->count_cuted_fields= CHECK_FIELD_IGNORE;
476
530
table->auto_increment_field_not_null= false;
477
531
if (duplic == DUP_REPLACE)
478
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
532
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
483
table->cursor->ha_release_auto_increment();
485
free_underlaid_joins(session, &session->lex->select_lex);
486
session->setAbortOnWarning(false);
487
DRIZZLE_INSERT_DONE(1, 0);
491
536
if (values_list.elements == 1 && (!(session->options & OPTION_WARNINGS) ||
492
537
!session->cuted_fields))
494
session->row_count_func= info.copied + info.deleted + info.updated;
495
session->my_ok((ulong) session->rowCount(),
496
info.copied + info.deleted + info.touched, id);
539
session->row_count_func= info.copied + info.deleted +
540
((session->client_capabilities & CLIENT_FOUND_ROWS) ?
541
info.touched : info.updated);
542
my_ok(session, (ulong) session->row_count_func, id);
547
ha_rows updated=((session->client_capabilities & CLIENT_FOUND_ROWS) ?
548
info.touched : info.updated);
502
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
550
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
503
551
(ulong) (info.records - info.copied), (ulong) session->cuted_fields);
505
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
506
(ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
507
session->row_count_func= info.copied + info.deleted + info.updated;
508
session->my_ok((ulong) session->rowCount(),
509
info.copied + info.deleted + info.touched, id, buff);
553
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
554
(ulong) (info.deleted + updated), (ulong) session->cuted_fields);
555
session->row_count_func= info.copied + info.deleted + updated;
556
::my_ok(session, (ulong) session->row_count_func, id, buff);
511
session->status_var.inserted_row_count+= session->rowCount();
512
session->setAbortOnWarning(false);
513
DRIZZLE_INSERT_DONE(0, session->rowCount());
558
session->abort_on_warning= 0;
559
DRIZZLE_INSERT_END();
564
table->file->ha_release_auto_increment();
566
free_underlaid_joins(session, &session->lex->select_lex);
567
session->abort_on_warning= 0;
568
DRIZZLE_INSERT_END();
582
636
You MUST set table->insert_values to 0 after calling this function
583
637
before releasing the table object.
590
bool prepare_insert(Session *session, TableList *table_list,
644
bool mysql_prepare_insert(Session *session, TableList *table_list,
591
645
Table *table, List<Item> &fields, List_item *values,
592
646
List<Item> &update_fields, List<Item> &update_values,
593
647
enum_duplicates duplic,
648
COND **where __attribute__((unused)),
595
649
bool select_insert,
596
650
bool check_fields, bool abort_on_warning)
598
Select_Lex *select_lex= &session->lex->select_lex;
652
SELECT_LEX *select_lex= &session->lex->select_lex;
599
653
Name_resolution_context *context= &select_lex->context;
600
654
Name_resolution_context_state ctx_state;
601
655
bool insert_into_view= (0 != 0);
603
657
table_map map= 0;
605
659
/* INSERT should have a SELECT or VALUES clause */
606
660
assert (!select_insert || !values);
609
663
For subqueries in VALUES() we should not see the table in which we are
610
664
inserting (for INSERT ... SELECT this is done by changing table_list,
611
because INSERT ... SELECT share Select_Lex it with SELECT.
665
because INSERT ... SELECT share SELECT_LEX it with SELECT.
613
if (not select_insert)
615
for (Select_Lex_Unit *un= select_lex->first_inner_unit();
669
for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
617
671
un= un->next_unit())
619
for (Select_Lex *sl= un->first_select();
673
for (SELECT_LEX *sl= un->first_select();
621
675
sl= sl->next_select())
926
if ((error=table->cursor->deleteRecord(table->getUpdateRecord())))
983
if ((error=table->file->ha_delete_row(table->record[1])))
929
if (!table->cursor->has_transactions())
930
session->transaction.stmt.markModifiedNonTransData();
986
if (!table->file->has_transactions())
987
session->transaction.stmt.modified_non_trans_table= true;
931
988
/* Let us attempt to do write_row() once more */
935
session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
992
session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
937
994
Restore column maps if they were replaced during a duplicate key
940
997
if (table->read_set != save_read_set ||
941
998
table->write_set != save_write_set)
942
table->column_bitmaps_set(*save_read_set, *save_write_set);
999
table->column_bitmaps_set(save_read_set, save_write_set);
944
else if ((error=table->cursor->insertRecord(table->getInsertRecord())))
1001
else if ((error=table->file->ha_write_row(table->record[0])))
946
1003
if (!info->ignore ||
947
table->cursor->is_fatal_error(error, HA_CHECK_DUP))
1004
table->file->is_fatal_error(error, HA_CHECK_DUP))
949
table->cursor->restore_auto_increment(prev_insert_id);
1006
table->file->restore_auto_increment(prev_insert_id);
950
1007
goto gok_or_after_err;
953
1010
after_n_copied_inc:
955
session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
1012
session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
957
1014
gok_or_after_err:
958
if (!table->cursor->has_transactions())
959
session->transaction.stmt.markModifiedNonTransData();
1016
my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
1017
if (!table->file->has_transactions())
1018
session->transaction.stmt.modified_non_trans_table= true;
978
1039
******************************************************************************/
980
1041
int check_that_all_fields_are_given_values(Session *session, Table *entry,
1042
TableList *table_list)
1045
MY_BITMAP *write_set= entry->write_set;
985
for (Field **field=entry->getFields() ; *field ; field++)
1047
for (Field **field=entry->field ; *field ; field++)
987
if (((*field)->isWriteSet()) == false)
990
* If the field doesn't have any default value
991
* and there is no actual value specified in the
992
* INSERT statement, throw error ER_NO_DEFAULT_FOR_FIELD.
994
if (((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
1049
if (!bitmap_is_set(write_set, (*field)->field_index) &&
1050
((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
995
1051
((*field)->real_type() != DRIZZLE_TYPE_ENUM))
997
my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), (*field)->field_name);
1004
* However, if an actual NULL value was specified
1005
* for the field and the field is a NOT NULL field,
1006
* throw ER_BAD_NULL_ERROR.
1008
* Per the SQL standard, inserting NULL into a NOT NULL
1009
* field requires an error to be thrown.
1011
if (((*field)->flags & NOT_NULL_FLAG) &&
1012
(*field)->is_null())
1014
my_error(ER_BAD_NULL_ERROR, MYF(0), (*field)->field_name);
1056
table_list= table_list->top_table();
1060
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
1061
ER_NO_DEFAULT_FOR_FIELD,
1062
ER(ER_NO_DEFAULT_FOR_FIELD),
1063
(*field)->field_name);
1019
return session->abortOnWarning() ? err : 0;
1068
return session->abort_on_warning ? err : 0;
1022
1071
/***************************************************************************
1188
1252
else if (!(lex->current_select->options & OPTION_BUFFER_RESULT))
1191
We must not yet prepare the result table if it is the same as one of the
1192
source tables (INSERT SELECT). The preparation may disable
1255
We must not yet prepare the result table if it is the same as one of the
1256
source tables (INSERT SELECT). The preparation may disable
1193
1257
indexes on the result table, which may be used during the select, if it
1194
1258
is the same table (Bug #6034). Do the preparation after the select phase
1195
1259
in select_insert::prepare2().
1196
1260
We won't start bulk inserts at all if this statement uses functions or
1197
1261
should invoke triggers since they may access the same table too.
1199
table->cursor->ha_start_bulk_insert((ha_rows) 0);
1263
table->file->ha_start_bulk_insert((ha_rows) 0);
1201
table->restoreRecordAsDefault(); // Get empty record
1265
restore_record(table,s->default_values); // Get empty record
1202
1266
table->next_number_field=table->found_next_number_field;
1268
if (session->slave_thread &&
1269
(info.handle_duplicates == DUP_UPDATE) &&
1270
(table->next_number_field != NULL) &&
1271
rpl_master_has_bug(&active_mi->rli, 24432))
1204
1274
session->cuted_fields=0;
1206
1275
if (info.ignore || info.handle_duplicates != DUP_ERROR)
1207
table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1276
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1209
1277
if (info.handle_duplicates == DUP_REPLACE)
1210
table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1278
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1212
1279
if (info.handle_duplicates == DUP_UPDATE)
1213
table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1215
session->setAbortOnWarning(not info.ignore);
1280
table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1281
session->abort_on_warning= !info.ignore;
1216
1282
table->mark_columns_needed_for_insert();
1326
1388
void select_insert::store_values(List<Item> &values)
1328
1390
if (fields->elements)
1329
fill_record(session, *fields, values, true);
1391
fill_record(session, *fields, values, 1);
1331
fill_record(session, table->getFields(), values, true);
1393
fill_record(session, table->field, values, 1);
1334
void select_insert::send_error(drizzled::error_t errcode,const char *err)
1396
void select_insert::send_error(uint32_t errcode,const char *err)
1336
1400
my_message(errcode, err, MYF(0));
1340
1406
bool select_insert::send_eof()
1343
bool const trans_table= table->cursor->has_transactions();
1409
bool const trans_table= table->file->has_transactions();
1347
error= table->cursor->ha_end_bulk_insert();
1348
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1349
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1412
Session::killed_state killed_status= session->killed;
1414
error= table->file->ha_end_bulk_insert();
1415
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1416
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1351
1418
if ((changed= (info.copied || info.deleted || info.updated)))
1354
1421
We must invalidate the table in the query cache before binlog writing
1355
and autocommitOrRollback.
1422
and ha_autocommit_or_rollback.
1357
if (session->transaction.stmt.hasModifiedNonTransData())
1358
session->transaction.all.markModifiedNonTransData();
1424
if (session->transaction.stmt.modified_non_trans_table)
1425
session->transaction.all.modified_non_trans_table= true;
1360
assert(trans_table || !changed ||
1361
session->transaction.stmt.hasModifiedNonTransData());
1427
assert(trans_table || !changed ||
1428
session->transaction.stmt.modified_non_trans_table);
1363
table->cursor->ha_release_auto_increment();
1431
Write to binlog before committing transaction. No statement will
1432
be written by the binlog_query() below in RBR mode. All the
1433
events are in the transaction cache and will be written when
1434
ha_autocommit_or_rollback() is issued below.
1436
if (mysql_bin_log.is_open())
1439
session->clear_error();
1440
session->binlog_query(Session::ROW_QUERY_TYPE,
1441
session->query, session->query_length,
1442
trans_table, false, killed_status);
1444
table->file->ha_release_auto_increment();
1367
table->print_error(error,MYF(0));
1368
DRIZZLE_INSERT_SELECT_DONE(error, 0);
1448
table->file->print_error(error,MYF(0));
1371
1451
char buff[160];
1372
1452
if (info.ignore)
1373
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
1453
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1374
1454
(ulong) (info.records - info.copied), (ulong) session->cuted_fields);
1376
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
1456
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1377
1457
(ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
1378
session->row_count_func= info.copied + info.deleted + info.updated;
1458
session->row_count_func= info.copied + info.deleted +
1459
((session->client_capabilities & CLIENT_FOUND_ROWS) ?
1460
info.touched : info.updated);
1380
1462
id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
1381
1463
session->first_successful_insert_id_in_cur_stmt :
1382
1464
(session->arg_of_last_insert_id_function ?
1383
1465
session->first_successful_insert_id_in_prev_stmt :
1384
1466
(info.copied ? autoinc_value_of_last_inserted_row : 0));
1385
session->my_ok((ulong) session->rowCount(),
1386
info.copied + info.deleted + info.touched, id, buff);
1387
session->status_var.inserted_row_count+= session->rowCount();
1388
DRIZZLE_INSERT_SELECT_DONE(0, session->rowCount());
1467
::my_ok(session, (ulong) session->row_count_func, id, buff);
1392
1471
void select_insert::abort() {
1396
1475
If the creation of the table failed (due to a syntax error, for
1397
1476
example), no table will have been opened and therefore 'table'
1483
1565
static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
1484
1566
TableList *create_table,
1485
message::Table &table_proto,
1486
AlterInfo *alter_info,
1567
Alter_info *alter_info,
1487
1568
List<Item> *items,
1488
bool is_if_not_exists,
1490
identifier::Table::const_reference identifier)
1569
DRIZZLE_LOCK **lock,
1570
Tableop_hooks *hooks)
1492
TableShare share(message::Table::INTERNAL);
1572
Table tmp_table; // Used during 'Create_field()'
1493
1575
uint32_t select_field_count= items->elements;
1494
1576
/* Add selected items to field list */
1495
1577
List_iterator_fast<Item> it(*items);
1497
1579
Field *tmp_field;
1499
if (not (identifier.isTmp()) && create_table->table->db_stat)
1582
if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
1583
create_table->table->db_stat)
1501
/* Table already exists and was open at openTablesLock() stage. */
1502
if (is_if_not_exists)
1585
/* Table already exists and was open at open_and_lock_tables() stage. */
1586
if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
1504
1588
create_info->table_existed= 1; // Mark that table existed
1505
1589
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1506
1590
ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
1507
create_table->getTableName());
1508
return create_table->table;
1591
create_table->table_name);
1592
return(create_table->table);
1511
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1595
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
1600
tmp_table.timestamp_field= 0;
1601
tmp_table.s= &share;
1602
init_tmp_table_share(session, &share, "", 0, "", "");
1604
tmp_table.s->db_create_options=0;
1605
tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
1606
tmp_table.s->db_low_byte_first=
1607
test(create_info->db_type == myisam_hton ||
1608
create_info->db_type == heap_hton);
1609
tmp_table.null_row= false;
1610
tmp_table.maybe_null= false;
1516
table::Shell tmp_table(share); // Used during 'CreateField()'
1518
if (not table_proto.engine().name().compare("MyISAM"))
1519
tmp_table.getMutableShare()->db_low_byte_first= true;
1520
else if (not table_proto.engine().name().compare("MEMORY"))
1521
tmp_table.getMutableShare()->db_low_byte_first= true;
1523
tmp_table.in_use= session;
1527
CreateField *cr_field;
1528
Field *field, *def_field;
1529
if (item->type() == Item::FUNC_ITEM)
1531
if (item->result_type() != STRING_RESULT)
1533
field= item->tmp_table_field(&tmp_table);
1537
field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1614
Create_field *cr_field;
1615
Field *field, *def_field;
1616
if (item->type() == Item::FUNC_ITEM)
1617
if (item->result_type() != STRING_RESULT)
1618
field= item->tmp_table_field(&tmp_table);
1542
field= create_tmp_field(session, &tmp_table, item, item->type(),
1543
(Item ***) 0, &tmp_field, &def_field, false,
1548
!(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
1549
((Item_field *)item)->field :
1555
if (item->maybe_null)
1557
cr_field->flags &= ~NOT_NULL_FLAG;
1560
alter_info->create_list.push_back(cr_field);
1620
field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1622
field= create_tmp_field(session, &tmp_table, item, item->type(),
1623
(Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
1626
!(cr_field=new Create_field(field,(item->type() == Item::FIELD_ITEM ?
1627
((Item_field *)item)->field :
1630
if (item->maybe_null)
1631
cr_field->flags &= ~NOT_NULL_FLAG;
1632
alter_info->create_list.push_back(cr_field);
1567
1638
Note that we are either creating (or opening an existing) temporary table or
1568
1639
creating base table on which name we have exclusive lock. So code below
1569
1640
should not cause deadlocks or races.
1642
We don't log the statement, it will be logged later.
1644
If this is a HEAP table, the automatic DELETE FROM which is written to the
1645
binlog when a HEAP table is opened for the first time since startup, must
1646
not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
1647
don't want to delete from it) 2) it would be written before the CREATE
1648
Table, which is a wrong order. So we keep binary logging disabled when we
1573
if (not create_table_no_lock(session,
1652
tmp_disable_binlog(session);
1653
if (!mysql_create_table_no_lock(session, create_table->db,
1654
create_table->table_name,
1655
create_info, alter_info, 0,
1656
select_field_count, true))
1582
if (create_info->table_existed && not identifier.isTmp())
1658
if (create_info->table_existed &&
1659
!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1585
1662
This means that someone created table underneath server
1586
1663
or it was created via different mysqld front-end to the
1587
1664
cluster. We don't have many options but to throw an error.
1589
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1666
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
1593
if (not identifier.isTmp())
1670
if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1595
/* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1596
boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
1598
if (create_table->table)
1672
pthread_mutex_lock(&LOCK_open);
1673
if (reopen_name_locked_table(session, create_table, false))
1600
table::Concurrent *concurrent_table= static_cast<table::Concurrent *>(create_table->table);
1602
if (concurrent_table->reopen_name_locked_table(create_table, session))
1604
(void)plugin::StorageEngine::dropTable(*session, identifier);
1608
table= create_table->table;
1675
quick_rm_table(create_info->db_type, create_table->db,
1676
table_case_name(create_info, create_table->table_name),
1613
(void)plugin::StorageEngine::dropTable(*session, identifier);
1680
table= create_table->table;
1681
pthread_mutex_unlock(&LOCK_open);
1618
if (not (table= session->openTable(create_table, (bool*) 0,
1619
DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1620
not create_info->table_existed)
1685
if (!(table= open_table(session, create_table, (bool*) 0,
1686
DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1687
!create_info->table_existed)
1623
1690
This shouldn't happen as creation of temporary table should make
1624
1691
it preparable for open. But let us do close_temporary_table() here
1627
session->drop_temporary_table(identifier);
1694
drop_temporary_table(session, create_table);
1631
if (not table) // open failed
1698
reenable_binlog(session);
1699
if (!table) // open failed
1635
1703
table->reginfo.lock_type=TL_WRITE;
1636
if (not ((*lock)= session->lockTables(&table, 1, DRIZZLE_LOCK_IGNORE_FLUSH)))
1704
hooks->prelock(&table, 1); // Call prelock hooks
1705
if (! ((*lock)= mysql_lock_tables(session, &table, 1,
1706
DRIZZLE_LOCK_IGNORE_FLUSH, ¬_used)) ||
1707
hooks->postlock(&table, 1))
1640
session->unlockTables(*lock);
1711
mysql_unlock_tables(session, *lock);
1644
if (not create_info->table_existed)
1645
session->drop_open_table(table, identifier);
1715
if (!create_info->table_existed)
1716
drop_open_table(session, table, create_table->db, create_table->table_name);
1654
select_create::prepare(List<Item> &values, Select_Lex_Unit *u)
1724
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
1656
DrizzleLock *extra_lock= NULL;
1726
DRIZZLE_LOCK *extra_lock= NULL;
1729
Tableop_hooks *hook_ptr= NULL;
1658
For replication, the CREATE-SELECT statement is written
1659
in two pieces: the first transaction message contains
1660
the CREATE TABLE statement as a CreateTableStatement message
1661
necessary to create the table.
1663
The second transaction message contains all the InsertStatement
1664
and associated InsertRecords that should go into the table.
1731
For row-based replication, the CREATE-SELECT statement is written
1732
in two pieces: the first one contains the CREATE TABLE statement
1733
necessary to create the table and the second part contains the rows
1734
that should go into the table.
1736
For non-temporary tables, the start of the CREATE-SELECT
1737
implicitly commits the previous transaction, and all events
1738
forming the statement will be stored in the transaction cache. At the end
1739
of the statement, the entire statement is committed as a
1740
transaction, and all events are written to the binary log.
1742
On the master, the table is locked for the duration of the
1743
statement, but since the CREATE part is replicated as a simple
1744
statement, there is no way to lock the table for accesses on the
1745
slave. Hence, we have to hold on to the CREATE part of the
1746
statement until the statement has finished.
1748
class MY_HOOKS : public Tableop_hooks {
1750
MY_HOOKS(select_create *x, TableList *create_table,
1751
TableList *select_tables)
1752
: ptr(x), all_tables(*create_table)
1754
all_tables.next_global= select_tables;
1758
virtual int do_postlock(Table **tables, uint32_t count)
1760
Session *session= const_cast<Session*>(ptr->get_session());
1761
if (int error= decide_logging_format(session, &all_tables))
1764
Table const *const table = *tables;
1765
if (session->current_stmt_binlog_row_based &&
1766
!table->s->tmp_table &&
1767
!ptr->get_create_info()->table_existed)
1769
ptr->binlog_show_create_table(tables, count);
1775
TableList all_tables;
1778
MY_HOOKS hooks(this, create_table, select_tables);
1669
if (not (table= create_table_from_items(session, create_info, create_table,
1671
alter_info, &values,
1673
&extra_lock, identifier)))
1784
Start a statement transaction before the create if we are using
1785
row-based replication for the statement. If we are creating a
1786
temporary table, we need to start a statement transaction.
1788
if ((session->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
1789
session->current_stmt_binlog_row_based)
1791
session->binlog_start_trans_and_stmt();
1794
if (!(table= create_table_from_items(session, create_info, create_table,
1795
alter_info, &values,
1796
&extra_lock, hook_ptr)))
1675
1797
return(-1); // abort() deletes table
1678
1799
if (extra_lock)
1680
1801
assert(m_plock == NULL);
1682
if (identifier.isTmp())
1803
if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
1683
1804
m_plock= &m_lock;
1685
1806
m_plock= &session->extra_lock;
1687
1808
*m_plock= extra_lock;
1690
if (table->getShare()->sizeFields() < values.elements)
1811
if (table->s->fields < values.elements)
1692
1813
my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
1696
1817
/* First field to copy */
1697
field= table->getFields() + table->getShare()->sizeFields() - values.elements;
1818
field= table->field+table->s->fields - values.elements;
1699
1820
/* Mark all fields that are given values */
1700
1821
for (Field **f= field ; *f ; f++)
1702
table->setWriteSet((*f)->position());
1822
bitmap_set_bit(table->write_set, (*f)->field_index);
1705
1824
/* Don't set timestamp if used */
1706
1825
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
1707
1826
table->next_number_field=table->found_next_number_field;
1709
table->restoreRecordAsDefault(); // Get empty record
1828
restore_record(table,s->default_values); // Get empty record
1710
1829
session->cuted_fields=0;
1711
1830
if (info.ignore || info.handle_duplicates != DUP_ERROR)
1712
table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1831
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1714
1832
if (info.handle_duplicates == DUP_REPLACE)
1715
table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1833
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1717
1834
if (info.handle_duplicates == DUP_UPDATE)
1718
table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1720
table->cursor->ha_start_bulk_insert((ha_rows) 0);
1721
session->setAbortOnWarning(not info.ignore);
1835
table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1836
table->file->ha_start_bulk_insert((ha_rows) 0);
1837
session->abort_on_warning= !info.ignore;
1722
1838
if (check_that_all_fields_are_given_values(session, table, table_list))
1725
1840
table->mark_columns_needed_for_insert();
1726
table->cursor->extra(HA_EXTRA_WRITE_CACHE);
1841
table->file->extra(HA_EXTRA_WRITE_CACHE);
1846
select_create::binlog_show_create_table(Table **tables, uint32_t count)
1849
Note 1: In RBR mode, we generate a CREATE TABLE statement for the
1850
created table by calling store_create_info() (behaves as SHOW
1851
CREATE TABLE). In the event of an error, nothing should be
1852
written to the binary log, even if the table is non-transactional;
1853
therefore we pretend that the generated CREATE TABLE statement is
1854
for a transactional table. The event will then be put in the
1855
transaction cache, and any subsequent events (e.g., table-map
1856
events and binrow events) will also be put there. We can then use
1857
ha_autocommit_or_rollback() to either throw away the entire
1858
kaboodle of events, or write them to the binary log.
1860
We write the CREATE TABLE statement here and not in prepare()
1861
since there potentially are sub-selects or accesses to information
1862
schema that will do a close_thread_tables(), destroying the
1863
statement transaction cache.
1865
assert(session->current_stmt_binlog_row_based);
1866
assert(tables && *tables && count > 0);
1869
String query(buf, sizeof(buf), system_charset_info);
1871
TableList tmp_table_list;
1873
memset(&tmp_table_list, 0, sizeof(tmp_table_list));
1874
tmp_table_list.table = *tables;
1875
query.length(0); // Have to zero it since constructor doesn't
1877
result= store_create_info(session, &tmp_table_list, &query, create_info);
1878
assert(result == 0); /* store_create_info() always return 0 */
1880
session->binlog_query(Session::STMT_QUERY_TYPE,
1881
query.ptr(), query.length(),
1882
/* is_trans */ true,
1883
/* suppress_use */ false);
1730
1886
void select_create::store_values(List<Item> &values)
1732
fill_record(session, field, values, true);
1888
fill_record(session, field, values, 1);
1736
void select_create::send_error(drizzled::error_t errcode,const char *err)
1892
void select_create::send_error(uint32_t errcode,const char *err)
1739
1897
This will execute any rollbacks that are necessary before writing
1740
1898
the transaction cache.