12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
/* Insert of records */
22
Drizzle has a different form of DELAYED then MySQL. DELAYED is just
23
a hint to the the sorage engine (which can then do whatever it likes.
25
#include <drizzled/server_includes.h>
21
26
#include <drizzled/sql_select.h>
22
#include <drizzled/show.h>
23
#include <drizzled/error.h>
24
#include <drizzled/name_resolution_context_state.h>
25
#include <drizzled/probes.h>
26
#include <drizzled/sql_base.h>
27
#include <drizzled/sql_load.h>
28
#include <drizzled/field/epoch.h>
29
#include <drizzled/lock.h>
30
#include "drizzled/sql_table.h"
31
#include "drizzled/pthread_globals.h"
32
#include "drizzled/transaction_services.h"
33
#include "drizzled/plugin/transactional_storage_engine.h"
35
#include "drizzled/table/shell.h"
40
extern plugin::StorageEngine *heap_engine;
41
extern plugin::StorageEngine *myisam_engine;
27
#include <drizzled/sql_show.h>
29
#include <drizzled/drizzled_error_messages.h>
31
/* Define to force use of my_malloc() if the allocated memory block is big */
34
#define my_safe_alloca(size, min_length) my_alloca(size)
35
#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
37
#define my_safe_alloca(size, min_length) ((size <= min_length) ? my_alloca(size) : malloc(size))
38
#define my_safe_afree(ptr, size, min_length) if (size > min_length) free(ptr)
44
44
Check if insert fields are correct.
47
47
check_insert_fields()
48
session The current thread.
48
thd The current thread.
49
49
table The table for insert.
50
50
fields The insert fields.
51
51
values The insert values.
249
250
Name_resolution_context_state ctx_state;
250
251
thr_lock_type lock_type;
251
252
Item *unused_conds= 0;
255
256
Upgrade lock type if the requested lock is incompatible with
256
257
the current connection mode or table operation.
258
upgrade_lock_type(session, &table_list->lock_type, duplic,
259
upgrade_lock_type(thd, &table_list->lock_type, duplic,
259
260
values_list.elements > 1);
261
if (session->openTablesLock(table_list))
263
We can't write-delayed into a table locked with LOCK TABLES:
264
this will lead to a deadlock, since the delayed thread will
265
never be able to get a lock on the table. QQQ: why not
266
upgrade the lock here instead?
268
if (table_list->lock_type == TL_WRITE_DELAYED && thd->locked_tables &&
269
find_locked_table(thd, table_list->db, table_list->table_name))
263
DRIZZLE_INSERT_DONE(1, 0);
271
my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
272
table_list->table_name);
277
if (open_and_lock_tables(thd, table_list))
267
280
lock_type= table_list->lock_type;
269
session->set_proc_info("init");
270
session->used_tables=0;
282
thd_proc_info(thd, "init");
272
285
value_count= values->elements;
274
if (prepare_insert(session, table_list, table, fields, values,
287
if (mysql_prepare_insert(thd, table_list, table, fields, values,
275
288
update_fields, update_values, duplic, &unused_conds,
277
290
(fields.elements || !value_count ||
278
291
(0) != 0), !ignore))
281
table->cursor->ha_release_auto_increment();
283
free_underlaid_joins(session, &session->lex->select_lex);
284
session->setAbortOnWarning(false);
285
DRIZZLE_INSERT_DONE(1, 0);
289
294
/* mysql_prepare_insert set table_list->table if it was not set */
290
295
table= table_list->table;
292
context= &session->lex->select_lex.context;
297
context= &thd->lex->select_lex.context;
294
299
These three asserts test the hypothesis that the resetting of the name
295
300
resolution context below is not necessary at all since the list of local
434
438
Do not do this release if this is a delayed insert, it would steal
435
439
auto_inc values from the delayed_insert thread as they share Table.
437
table->cursor->ha_release_auto_increment();
438
if (table->cursor->ha_end_bulk_insert() && !error)
441
table->file->ha_release_auto_increment();
442
if (table->file->ha_end_bulk_insert() && !error)
440
table->print_error(errno,MYF(0));
444
table->file->print_error(my_errno,MYF(0));
443
447
if (duplic != DUP_ERROR || ignore)
444
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
446
transactional_table= table->cursor->has_transactions();
448
changed= (info.copied || info.deleted || info.updated);
449
if ((changed && error <= 0) || session->transaction.stmt.hasModifiedNonTransData())
451
if (session->transaction.stmt.hasModifiedNonTransData())
452
session->transaction.all.markModifiedNonTransData();
454
assert(transactional_table || !changed || session->transaction.stmt.hasModifiedNonTransData());
448
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
450
transactional_table= table->file->has_transactions();
452
if ((changed= (info.copied || info.deleted || info.updated)))
455
Invalidate the table in the query cache if something changed.
456
For the transactional algorithm to work the invalidation must be
457
before binlog writing and ha_autocommit_or_rollback
460
if ((changed && error <= 0) || thd->transaction.stmt.modified_non_trans_table || was_insert_delayed)
462
if (mysql_bin_log.is_open())
467
[Guilhem wrote] Temporary errors may have filled
468
thd->net.last_error/errno. For example if there has
469
been a disk full error when writing the row, and it was
470
MyISAM, then thd->net.last_error/errno will be set to
471
"disk full"... and the my_pwrite() will wait until free
472
space appears, and so when it finishes then the
473
write_row() was entirely successful
475
/* todo: consider removing */
480
A query which per-row-loop can not be interrupted with
481
KILLED, like INSERT, and that does not invoke stored
482
routines can be binlogged with neglecting the KILLED error.
484
If there was no error (error == zero) until after the end of
485
inserting loop the KILLED flag that appeared later can be
486
disregarded since previously possible invocation of stored
487
routines did not result in any error due to the KILLED. In
488
such case the flag is ignored for constructing binlog event.
490
assert(thd->killed != THD::KILL_BAD_DATA || error > 0);
491
if (thd->binlog_query(THD::ROW_QUERY_TYPE,
492
thd->query, thd->query_length,
493
transactional_table, false,
494
(error>0) ? thd->killed : THD::NOT_KILLED) &&
500
if (thd->transaction.stmt.modified_non_trans_table)
501
thd->transaction.all.modified_non_trans_table= true;
503
assert(transactional_table || !changed ||
504
thd->transaction.stmt.modified_non_trans_table);
457
session->set_proc_info("end");
507
thd_proc_info(thd, "end");
459
509
We'll report to the client this id:
460
510
- if the table contains an autoincrement column and we successfully
465
515
inserted, the id of the last "inserted" row (if IGNORE, that value may not
466
516
have been really inserted but ignored).
468
id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
469
session->first_successful_insert_id_in_cur_stmt :
470
(session->arg_of_last_insert_id_function ?
471
session->first_successful_insert_id_in_prev_stmt :
518
id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
519
thd->first_successful_insert_id_in_cur_stmt :
520
(thd->arg_of_last_insert_id_function ?
521
thd->first_successful_insert_id_in_prev_stmt :
472
522
((table->next_number_field && info.copied) ?
473
523
table->next_number_field->val_int() : 0));
474
524
table->next_number_field=0;
475
session->count_cuted_fields= CHECK_FIELD_IGNORE;
525
thd->count_cuted_fields= CHECK_FIELD_IGNORE;
476
526
table->auto_increment_field_not_null= false;
477
527
if (duplic == DUP_REPLACE)
478
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
528
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
483
table->cursor->ha_release_auto_increment();
485
free_underlaid_joins(session, &session->lex->select_lex);
486
session->setAbortOnWarning(false);
487
DRIZZLE_INSERT_DONE(1, 0);
491
if (values_list.elements == 1 && (!(session->options & OPTION_WARNINGS) ||
492
!session->cuted_fields))
494
session->row_count_func= info.copied + info.deleted + info.updated;
495
session->my_ok((ulong) session->rowCount(),
496
info.copied + info.deleted + info.touched, id);
532
if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
535
thd->row_count_func= info.copied + info.deleted +
536
((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
537
info.touched : info.updated);
538
my_ok(thd, (ulong) thd->row_count_func, id);
543
ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
544
info.touched : info.updated);
502
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
503
(ulong) (info.records - info.copied), (ulong) session->cuted_fields);
546
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
547
(ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
505
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
506
(ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
507
session->row_count_func= info.copied + info.deleted + info.updated;
508
session->my_ok((ulong) session->rowCount(),
509
info.copied + info.deleted + info.touched, id, buff);
549
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
550
(ulong) (info.deleted + updated), (ulong) thd->cuted_fields);
551
thd->row_count_func= info.copied + info.deleted + updated;
552
::my_ok(thd, (ulong) thd->row_count_func, id, buff);
511
session->status_var.inserted_row_count+= session->rowCount();
512
session->setAbortOnWarning(false);
513
DRIZZLE_INSERT_DONE(0, session->rowCount());
554
thd->abort_on_warning= 0;
555
DRIZZLE_INSERT_END();
560
table->file->ha_release_auto_increment();
562
free_underlaid_joins(thd, &thd->lex->select_lex);
563
thd->abort_on_warning= 0;
564
DRIZZLE_INSERT_END();
652
702
table_list->next_local= 0;
653
703
context->resolve_in_table_list_only(table_list);
655
res= check_insert_fields(session, context->table_list, fields, *values,
705
res= check_insert_fields(thd, context->table_list, fields, *values,
656
706
!insert_into_view, &map) ||
657
setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0);
707
setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0);
659
709
if (!res && check_fields)
661
bool saved_abort_on_warning= session->abortOnWarning();
663
session->setAbortOnWarning(abort_on_warning);
664
res= check_that_all_fields_are_given_values(session,
711
bool saved_abort_on_warning= thd->abort_on_warning;
712
thd->abort_on_warning= abort_on_warning;
713
res= check_that_all_fields_are_given_values(thd,
666
715
context->table_list->table,
667
716
context->table_list);
668
session->setAbortOnWarning(saved_abort_on_warning);
717
thd->abort_on_warning= saved_abort_on_warning;
671
720
if (!res && duplic == DUP_UPDATE)
673
res= check_update_fields(session, context->table_list, update_fields, &map);
722
res= check_update_fields(thd, context->table_list, update_fields, &map);
676
725
/* Restore the current context. */
677
726
ctx_state.restore_state(context, table_list);
680
res= setup_fields(session, 0, update_values, MARK_COLUMNS_READ, 0, 0);
729
res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0);
687
736
table= table_list->table;
689
if (not select_insert)
691
740
TableList *duplicate;
692
if ((duplicate= unique_table(table_list, table_list->next_global, true)))
741
if ((duplicate= unique_table(thd, table_list, table_list->next_global, 1)))
694
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->alias);
743
update_non_unique_table_error(table_list, "INSERT", duplicate);
700
747
if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
701
748
table->prepare_for_position();
926
if ((error=table->cursor->deleteRecord(table->getUpdateRecord())))
979
if ((error=table->file->ha_delete_row(table->record[1])))
929
if (!table->cursor->has_transactions())
930
session->transaction.stmt.markModifiedNonTransData();
982
if (!table->file->has_transactions())
983
thd->transaction.stmt.modified_non_trans_table= true;
931
984
/* Let us attempt do write_row() once more */
935
session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
988
thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
937
990
Restore column maps if they where replaced during an duplicate key
940
993
if (table->read_set != save_read_set ||
941
994
table->write_set != save_write_set)
942
table->column_bitmaps_set(*save_read_set, *save_write_set);
995
table->column_bitmaps_set(save_read_set, save_write_set);
944
else if ((error=table->cursor->insertRecord(table->getInsertRecord())))
997
else if ((error=table->file->ha_write_row(table->record[0])))
946
999
if (!info->ignore ||
947
table->cursor->is_fatal_error(error, HA_CHECK_DUP))
1000
table->file->is_fatal_error(error, HA_CHECK_DUP))
949
table->cursor->restore_auto_increment(prev_insert_id);
1002
table->file->restore_auto_increment(prev_insert_id);
950
1003
goto gok_or_after_err;
953
1006
after_n_copied_inc:
955
session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
1008
thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
957
1010
gok_or_after_err:
958
if (!table->cursor->has_transactions())
959
session->transaction.stmt.markModifiedNonTransData();
1012
my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
1013
if (!table->file->has_transactions())
1014
thd->transaction.stmt.modified_non_trans_table= true;
963
1018
info->last_errno= error;
964
1019
/* current_select is NULL if this is a delayed insert */
965
if (session->lex->current_select)
966
session->lex->current_select->no_error= 0; // Give error
967
table->print_error(error,MYF(0));
1020
if (thd->lex->current_select)
1021
thd->lex->current_select->no_error= 0; // Give error
1022
table->file->print_error(error,MYF(0));
970
table->cursor->restore_auto_increment(prev_insert_id);
971
table->column_bitmaps_set(*save_read_set, *save_write_set);
1025
table->file->restore_auto_increment(prev_insert_id);
1027
my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1028
table->column_bitmaps_set(save_read_set, save_write_set);
1326
1384
void select_insert::store_values(List<Item> &values)
1328
1386
if (fields->elements)
1329
fill_record(session, *fields, values, true);
1387
fill_record(thd, *fields, values, 1);
1331
fill_record(session, table->getFields(), values, true);
1389
fill_record(thd, table->field, values, 1);
1334
void select_insert::send_error(drizzled::error_t errcode,const char *err)
1392
void select_insert::send_error(uint32_t errcode,const char *err)
1336
1396
my_message(errcode, err, MYF(0));
1340
1402
bool select_insert::send_eof()
1343
bool const trans_table= table->cursor->has_transactions();
1405
bool const trans_table= table->file->has_transactions();
1347
error= table->cursor->ha_end_bulk_insert();
1348
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1349
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1408
THD::killed_state killed_status= thd->killed;
1410
error= table->file->ha_end_bulk_insert();
1411
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1412
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1351
1414
if ((changed= (info.copied || info.deleted || info.updated)))
1354
1417
We must invalidate the table in the query cache before binlog writing
1355
and autocommitOrRollback.
1418
and ha_autocommit_or_rollback.
1357
if (session->transaction.stmt.hasModifiedNonTransData())
1358
session->transaction.all.markModifiedNonTransData();
1420
if (thd->transaction.stmt.modified_non_trans_table)
1421
thd->transaction.all.modified_non_trans_table= true;
1360
assert(trans_table || !changed ||
1361
session->transaction.stmt.hasModifiedNonTransData());
1423
assert(trans_table || !changed ||
1424
thd->transaction.stmt.modified_non_trans_table);
1363
table->cursor->ha_release_auto_increment();
1427
Write to binlog before commiting transaction. No statement will
1428
be written by the binlog_query() below in RBR mode. All the
1429
events are in the transaction cache and will be written when
1430
ha_autocommit_or_rollback() is issued below.
1432
if (mysql_bin_log.is_open())
1436
thd->binlog_query(THD::ROW_QUERY_TYPE,
1437
thd->query, thd->query_length,
1438
trans_table, false, killed_status);
1440
table->file->ha_release_auto_increment();
1367
table->print_error(error,MYF(0));
1368
DRIZZLE_INSERT_SELECT_DONE(error, 0);
1444
table->file->print_error(error,MYF(0));
1371
1447
char buff[160];
1372
1448
if (info.ignore)
1373
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
1374
(ulong) (info.records - info.copied), (ulong) session->cuted_fields);
1449
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1450
(ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
1376
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
1377
(ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
1378
session->row_count_func= info.copied + info.deleted + info.updated;
1452
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1453
(ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
1454
thd->row_count_func= info.copied + info.deleted +
1455
((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
1456
info.touched : info.updated);
1380
id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
1381
session->first_successful_insert_id_in_cur_stmt :
1382
(session->arg_of_last_insert_id_function ?
1383
session->first_successful_insert_id_in_prev_stmt :
1458
id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
1459
thd->first_successful_insert_id_in_cur_stmt :
1460
(thd->arg_of_last_insert_id_function ?
1461
thd->first_successful_insert_id_in_prev_stmt :
1384
1462
(info.copied ? autoinc_value_of_last_inserted_row : 0));
1385
session->my_ok((ulong) session->rowCount(),
1386
info.copied + info.deleted + info.touched, id, buff);
1387
session->status_var.inserted_row_count+= session->rowCount();
1388
DRIZZLE_INSERT_SELECT_DONE(0, session->rowCount());
1463
::my_ok(thd, (ulong) thd->row_count_func, id, buff);
1392
1467
void select_insert::abort() {
1396
1471
If the creation of the table failed (due to a syntax error, for
1397
1472
example), no table will have been opened and therefore 'table'
1483
static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
1561
static Table *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
1484
1562
TableList *create_table,
1485
message::Table &table_proto,
1486
AlterInfo *alter_info,
1563
Alter_info *alter_info,
1487
1564
List<Item> *items,
1488
bool is_if_not_exists,
1490
identifier::Table::const_reference identifier)
1565
DRIZZLE_LOCK **lock,
1566
TABLEOP_HOOKS *hooks)
1492
TableShare share(message::Table::INTERNAL);
1568
Table tmp_table; // Used during 'Create_field()'
1493
1571
uint32_t select_field_count= items->elements;
1494
1572
/* Add selected items to field list */
1495
1573
List_iterator_fast<Item> it(*items);
1497
1575
Field *tmp_field;
1499
if (not (identifier.isTmp()) && create_table->table->db_stat)
1578
if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
1579
create_table->table->db_stat)
1501
/* Table already exists and was open at openTablesLock() stage. */
1502
if (is_if_not_exists)
1581
/* Table already exists and was open at open_and_lock_tables() stage. */
1582
if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
1504
1584
create_info->table_existed= 1; // Mark that table existed
1505
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1585
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1506
1586
ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
1507
create_table->getTableName());
1508
return create_table->table;
1587
create_table->table_name);
1588
return(create_table->table);
1511
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1591
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
1596
tmp_table.timestamp_field= 0;
1597
tmp_table.s= &share;
1598
init_tmp_table_share(thd, &share, "", 0, "", "");
1600
tmp_table.s->db_create_options=0;
1601
tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
1602
tmp_table.s->db_low_byte_first=
1603
test(create_info->db_type == myisam_hton ||
1604
create_info->db_type == heap_hton);
1605
tmp_table.null_row= false;
1606
tmp_table.maybe_null= false;
1516
table::Shell tmp_table(share); // Used during 'CreateField()'
1518
if (not table_proto.engine().name().compare("MyISAM"))
1519
tmp_table.getMutableShare()->db_low_byte_first= true;
1520
else if (not table_proto.engine().name().compare("MEMORY"))
1521
tmp_table.getMutableShare()->db_low_byte_first= true;
1523
tmp_table.in_use= session;
1527
CreateField *cr_field;
1528
Field *field, *def_field;
1529
if (item->type() == Item::FUNC_ITEM)
1531
if (item->result_type() != STRING_RESULT)
1533
field= item->tmp_table_field(&tmp_table);
1537
field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1610
Create_field *cr_field;
1611
Field *field, *def_field;
1612
if (item->type() == Item::FUNC_ITEM)
1613
if (item->result_type() != STRING_RESULT)
1614
field= item->tmp_table_field(&tmp_table);
1542
field= create_tmp_field(session, &tmp_table, item, item->type(),
1543
(Item ***) 0, &tmp_field, &def_field, false,
1548
!(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
1549
((Item_field *)item)->field :
1555
if (item->maybe_null)
1557
cr_field->flags &= ~NOT_NULL_FLAG;
1560
alter_info->create_list.push_back(cr_field);
1616
field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1618
field= create_tmp_field(thd, &tmp_table, item, item->type(),
1619
(Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
1622
!(cr_field=new Create_field(field,(item->type() == Item::FIELD_ITEM ?
1623
((Item_field *)item)->field :
1626
if (item->maybe_null)
1627
cr_field->flags &= ~NOT_NULL_FLAG;
1628
alter_info->create_list.push_back(cr_field);
1567
1634
Note that we either creating (or opening existing) temporary table or
1568
1635
creating base table on which name we have exclusive lock. So code below
1569
1636
should not cause deadlocks or races.
1638
We don't log the statement, it will be logged later.
1640
If this is a HEAP table, the automatic DELETE FROM which is written to the
1641
binlog when a HEAP table is opened for the first time since startup, must
1642
not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
1643
don't want to delete from it) 2) it would be written before the CREATE
1644
Table, which is a wrong order. So we keep binary logging disabled when we
1573
if (not create_table_no_lock(session,
1648
tmp_disable_binlog(thd);
1649
if (!mysql_create_table_no_lock(thd, create_table->db,
1650
create_table->table_name,
1651
create_info, alter_info, 0,
1652
select_field_count))
1582
if (create_info->table_existed && not identifier.isTmp())
1654
if (create_info->table_existed &&
1655
!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1585
1658
This means that someone created table underneath server
1586
1659
or it was created via different mysqld front-end to the
1587
1660
cluster. We don't have much options but throw an error.
1589
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1662
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
1593
if (not identifier.isTmp())
1666
if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1595
/* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1596
boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
1598
if (create_table->table)
1668
pthread_mutex_lock(&LOCK_open);
1669
if (reopen_name_locked_table(thd, create_table, false))
1600
table::Concurrent *concurrent_table= static_cast<table::Concurrent *>(create_table->table);
1602
if (concurrent_table->reopen_name_locked_table(create_table, session))
1604
(void)plugin::StorageEngine::dropTable(*session, identifier);
1608
table= create_table->table;
1671
quick_rm_table(create_info->db_type, create_table->db,
1672
table_case_name(create_info, create_table->table_name),
1613
(void)plugin::StorageEngine::dropTable(*session, identifier);
1676
table= create_table->table;
1677
pthread_mutex_unlock(&LOCK_open);
1618
if (not (table= session->openTable(create_table, (bool*) 0,
1619
DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1620
not create_info->table_existed)
1681
if (!(table= open_table(thd, create_table, (bool*) 0,
1682
DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1683
!create_info->table_existed)
1623
1686
This shouldn't happen as creation of temporary table should make
1624
1687
it preparable for open. But let us do close_temporary_table() here
1627
session->drop_temporary_table(identifier);
1690
drop_temporary_table(thd, create_table);
1631
if (not table) // open failed
1694
reenable_binlog(thd);
1695
if (!table) // open failed
1635
1699
table->reginfo.lock_type=TL_WRITE;
1636
if (not ((*lock)= session->lockTables(&table, 1, DRIZZLE_LOCK_IGNORE_FLUSH)))
1700
hooks->prelock(&table, 1); // Call prelock hooks
1701
if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
1702
DRIZZLE_LOCK_IGNORE_FLUSH, ¬_used)) ||
1703
hooks->postlock(&table, 1))
1640
session->unlockTables(*lock);
1707
mysql_unlock_tables(thd, *lock);
1644
if (not create_info->table_existed)
1645
session->drop_open_table(table, identifier);
1711
if (!create_info->table_existed)
1712
drop_open_table(thd, table, create_table->db, create_table->table_name);
1654
select_create::prepare(List<Item> &values, Select_Lex_Unit *u)
1720
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
1656
DrizzleLock *extra_lock= NULL;
1722
DRIZZLE_LOCK *extra_lock= NULL;
1725
TABLEOP_HOOKS *hook_ptr= NULL;
1658
For replication, the CREATE-SELECT statement is written
1659
in two pieces: the first transaction messsage contains
1660
the CREATE TABLE statement as a CreateTableStatement message
1661
necessary to create the table.
1663
The second transaction message contains all the InsertStatement
1664
and associated InsertRecords that should go into the table.
1727
For row-based replication, the CREATE-SELECT statement is written
1728
in two pieces: the first one contain the CREATE TABLE statement
1729
necessary to create the table and the second part contain the rows
1730
that should go into the table.
1732
For non-temporary tables, the start of the CREATE-SELECT
1733
implicitly commits the previous transaction, and all events
1734
forming the statement will be stored the transaction cache. At end
1735
of the statement, the entire statement is committed as a
1736
transaction, and all events are written to the binary log.
1738
On the master, the table is locked for the duration of the
1739
statement, but since the CREATE part is replicated as a simple
1740
statement, there is no way to lock the table for accesses on the
1741
slave. Hence, we have to hold on to the CREATE part of the
1742
statement until the statement has finished.
1744
class MY_HOOKS : public TABLEOP_HOOKS {
1746
MY_HOOKS(select_create *x, TableList *create_table,
1747
TableList *select_tables)
1748
: ptr(x), all_tables(*create_table)
1750
all_tables.next_global= select_tables;
1754
virtual int do_postlock(Table **tables, uint32_t count)
1756
THD *thd= const_cast<THD*>(ptr->get_thd());
1757
if (int error= decide_logging_format(thd, &all_tables))
1760
Table const *const table = *tables;
1761
if (thd->current_stmt_binlog_row_based &&
1762
!table->s->tmp_table &&
1763
!ptr->get_create_info()->table_existed)
1765
ptr->binlog_show_create_table(tables, count);
1771
TableList all_tables;
1774
MY_HOOKS hooks(this, create_table, select_tables);
1669
if (not (table= create_table_from_items(session, create_info, create_table,
1671
alter_info, &values,
1673
&extra_lock, identifier)))
1780
Start a statement transaction before the create if we are using
1781
row-based replication for the statement. If we are creating a
1782
temporary table, we need to start a statement transaction.
1784
if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
1785
thd->current_stmt_binlog_row_based)
1787
thd->binlog_start_trans_and_stmt();
1790
if (!(table= create_table_from_items(thd, create_info, create_table,
1791
alter_info, &values,
1792
&extra_lock, hook_ptr)))
1675
1793
return(-1); // abort() deletes table
1678
1795
if (extra_lock)
1680
1797
assert(m_plock == NULL);
1682
if (identifier.isTmp())
1799
if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
1683
1800
m_plock= &m_lock;
1685
m_plock= &session->extra_lock;
1802
m_plock= &thd->extra_lock;
1687
1804
*m_plock= extra_lock;
1690
if (table->getShare()->sizeFields() < values.elements)
1807
if (table->s->fields < values.elements)
1692
1809
my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
1696
1813
/* First field to copy */
1697
field= table->getFields() + table->getShare()->sizeFields() - values.elements;
1814
field= table->field+table->s->fields - values.elements;
1699
1816
/* Mark all fields that are given values */
1700
1817
for (Field **f= field ; *f ; f++)
1702
table->setWriteSet((*f)->position());
1818
bitmap_set_bit(table->write_set, (*f)->field_index);
1705
1820
/* Don't set timestamp if used */
1706
1821
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
1707
1822
table->next_number_field=table->found_next_number_field;
1709
table->restoreRecordAsDefault(); // Get empty record
1710
session->cuted_fields=0;
1824
restore_record(table,s->default_values); // Get empty record
1825
thd->cuted_fields=0;
1711
1826
if (info.ignore || info.handle_duplicates != DUP_ERROR)
1712
table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1827
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1714
1828
if (info.handle_duplicates == DUP_REPLACE)
1715
table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1829
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1717
1830
if (info.handle_duplicates == DUP_UPDATE)
1718
table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1720
table->cursor->ha_start_bulk_insert((ha_rows) 0);
1721
session->setAbortOnWarning(not info.ignore);
1722
if (check_that_all_fields_are_given_values(session, table, table_list))
1831
table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1832
table->file->ha_start_bulk_insert((ha_rows) 0);
1833
thd->abort_on_warning= !info.ignore;
1834
if (check_that_all_fields_are_given_values(thd, table, table_list))
1725
1836
table->mark_columns_needed_for_insert();
1726
table->cursor->extra(HA_EXTRA_WRITE_CACHE);
1837
table->file->extra(HA_EXTRA_WRITE_CACHE);
1842
select_create::binlog_show_create_table(Table **tables, uint32_t count)
1845
Note 1: In RBR mode, we generate a CREATE TABLE statement for the
1846
created table by calling store_create_info() (behaves as SHOW
1847
CREATE TABLE). In the event of an error, nothing should be
1848
written to the binary log, even if the table is non-transactional;
1849
therefore we pretend that the generated CREATE TABLE statement is
1850
for a transactional table. The event will then be put in the
1851
transaction cache, and any subsequent events (e.g., table-map
1852
events and binrow events) will also be put there. We can then use
1853
ha_autocommit_or_rollback() to either throw away the entire
1854
kaboodle of events, or write them to the binary log.
1856
We write the CREATE TABLE statement here and not in prepare()
1857
since there potentially are sub-selects or accesses to information
1858
schema that will do a close_thread_tables(), destroying the
1859
statement transaction cache.
1861
assert(thd->current_stmt_binlog_row_based);
1862
assert(tables && *tables && count > 0);
1865
String query(buf, sizeof(buf), system_charset_info);
1867
TableList tmp_table_list;
1869
memset(&tmp_table_list, 0, sizeof(tmp_table_list));
1870
tmp_table_list.table = *tables;
1871
query.length(0); // Have to zero it since constructor doesn't
1873
result= store_create_info(thd, &tmp_table_list, &query, create_info);
1874
assert(result == 0); /* store_create_info() always return 0 */
1876
thd->binlog_query(THD::STMT_QUERY_TYPE,
1877
query.ptr(), query.length(),
1878
/* is_trans */ true,
1879
/* suppress_use */ false);
1730
1882
void select_create::store_values(List<Item> &values)
1732
fill_record(session, field, values, true);
1884
fill_record(thd, field, values, 1);
1736
void select_create::send_error(drizzled::error_t errcode,const char *err)
1888
void select_create::send_error(uint32_t errcode,const char *err)
1739
1893
This will execute any rollbacks that are necessary before writing
1740
1894
the transcation cache.