12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
/* Insert of records */
22
Drizzle has a different form of DELAYED then MySQL. DELAYED is just
23
a hint to the the sorage engine (which can then do whatever it likes.
25
#include <drizzled/server_includes.h>
21
26
#include <drizzled/sql_select.h>
22
#include <drizzled/show.h>
23
#include <drizzled/error.h>
24
#include <drizzled/name_resolution_context_state.h>
25
#include <drizzled/probes.h>
26
#include <drizzled/sql_base.h>
27
#include <drizzled/sql_load.h>
28
#include <drizzled/field/timestamp.h>
29
#include <drizzled/lock.h>
30
#include "drizzled/sql_table.h"
31
#include "drizzled/pthread_globals.h"
32
#include "drizzled/transaction_services.h"
33
#include "drizzled/plugin/transactional_storage_engine.h"
35
#include "drizzled/table/shell.h"
40
extern plugin::StorageEngine *heap_engine;
41
extern plugin::StorageEngine *myisam_engine;
27
#include <drizzled/sql_show.h>
29
#include <drizzled/drizzled_error_messages.h>
31
/* Define to force use of my_malloc() if the allocated memory block is big */
34
#define my_safe_alloca(size, min_length) my_alloca(size)
35
#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
37
#define my_safe_alloca(size, min_length) ((size <= min_length) ? my_alloca(size) : my_malloc(size,MYF(0)))
38
#define my_safe_afree(ptr, size, min_length) if (size > min_length) my_free(ptr,MYF(0))
44
44
Check if insert fields are correct.
47
47
check_insert_fields()
48
session The current thread.
48
thd The current thread.
49
49
table The table for insert.
50
50
fields The insert fields.
51
51
values The insert values.
249
247
Name_resolution_context_state ctx_state;
250
248
thr_lock_type lock_type;
251
249
Item *unused_conds= 0;
255
253
Upgrade lock type if the requested lock is incompatible with
256
254
the current connection mode or table operation.
258
upgrade_lock_type(session, &table_list->lock_type, duplic,
256
upgrade_lock_type(thd, &table_list->lock_type, duplic,
259
257
values_list.elements > 1);
261
if (session->openTablesLock(table_list))
260
We can't write-delayed into a table locked with LOCK TABLES:
261
this will lead to a deadlock, since the delayed thread will
262
never be able to get a lock on the table. QQQ: why not
263
upgrade the lock here instead?
265
if (table_list->lock_type == TL_WRITE_DELAYED && thd->locked_tables &&
266
find_locked_table(thd, table_list->db, table_list->table_name))
263
DRIZZLE_INSERT_DONE(1, 0);
268
my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
269
table_list->table_name);
274
if (open_and_lock_tables(thd, table_list))
267
277
lock_type= table_list->lock_type;
269
session->set_proc_info("init");
270
session->used_tables=0;
279
thd_proc_info(thd, "init");
272
282
value_count= values->elements;
274
if (mysql_prepare_insert(session, table_list, table, fields, values,
284
if (mysql_prepare_insert(thd, table_list, table, fields, values,
275
285
update_fields, update_values, duplic, &unused_conds,
277
287
(fields.elements || !value_count ||
278
288
(0) != 0), !ignore))
281
table->cursor->ha_release_auto_increment();
283
free_underlaid_joins(session, &session->lex->select_lex);
284
session->abort_on_warning= 0;
285
DRIZZLE_INSERT_DONE(1, 0);
289
291
/* mysql_prepare_insert set table_list->table if it was not set */
290
292
table= table_list->table;
292
context= &session->lex->select_lex.context;
294
context= &thd->lex->select_lex.context;
294
296
These three asserts test the hypothesis that the resetting of the name
295
297
resolution context below is not necessary at all since the list of local
434
437
Do not do this release if this is a delayed insert, it would steal
435
438
auto_inc values from the delayed_insert thread as they share Table.
437
table->cursor->ha_release_auto_increment();
438
if (table->cursor->ha_end_bulk_insert() && !error)
440
table->file->ha_release_auto_increment();
441
if (table->file->ha_end_bulk_insert() && !error)
440
table->print_error(errno,MYF(0));
443
table->file->print_error(my_errno,MYF(0));
443
446
if (duplic != DUP_ERROR || ignore)
444
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
446
transactional_table= table->cursor->has_transactions();
448
changed= (info.copied || info.deleted || info.updated);
449
if ((changed && error <= 0) || session->transaction.stmt.hasModifiedNonTransData())
451
if (session->transaction.stmt.hasModifiedNonTransData())
452
session->transaction.all.markModifiedNonTransData();
454
assert(transactional_table || !changed || session->transaction.stmt.hasModifiedNonTransData());
447
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
449
transactional_table= table->file->has_transactions();
451
if ((changed= (info.copied || info.deleted || info.updated)))
454
Invalidate the table in the query cache if something changed.
455
For the transactional algorithm to work the invalidation must be
456
before binlog writing and ha_autocommit_or_rollback
459
if ((changed && error <= 0) || thd->transaction.stmt.modified_non_trans_table || was_insert_delayed)
461
if (mysql_bin_log.is_open())
466
[Guilhem wrote] Temporary errors may have filled
467
thd->net.last_error/errno. For example if there has
468
been a disk full error when writing the row, and it was
469
MyISAM, then thd->net.last_error/errno will be set to
470
"disk full"... and the my_pwrite() will wait until free
471
space appears, and so when it finishes then the
472
write_row() was entirely successful
474
/* todo: consider removing */
479
A query which per-row-loop can not be interrupted with
480
KILLED, like INSERT, and that does not invoke stored
481
routines can be binlogged with neglecting the KILLED error.
483
If there was no error (error == zero) until after the end of
484
inserting loop the KILLED flag that appeared later can be
485
disregarded since previously possible invocation of stored
486
routines did not result in any error due to the KILLED. In
487
such case the flag is ignored for constructing binlog event.
489
assert(thd->killed != THD::KILL_BAD_DATA || error > 0);
490
if (thd->binlog_query(THD::ROW_QUERY_TYPE,
491
thd->query, thd->query_length,
492
transactional_table, false,
493
(error>0) ? thd->killed : THD::NOT_KILLED) &&
499
if (thd->transaction.stmt.modified_non_trans_table)
500
thd->transaction.all.modified_non_trans_table= true;
502
assert(transactional_table || !changed ||
503
thd->transaction.stmt.modified_non_trans_table);
457
session->set_proc_info("end");
506
thd_proc_info(thd, "end");
459
508
We'll report to the client this id:
460
509
- if the table contains an autoincrement column and we successfully
465
514
inserted, the id of the last "inserted" row (if IGNORE, that value may not
466
515
have been really inserted but ignored).
468
id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
469
session->first_successful_insert_id_in_cur_stmt :
470
(session->arg_of_last_insert_id_function ?
471
session->first_successful_insert_id_in_prev_stmt :
517
id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
518
thd->first_successful_insert_id_in_cur_stmt :
519
(thd->arg_of_last_insert_id_function ?
520
thd->first_successful_insert_id_in_prev_stmt :
472
521
((table->next_number_field && info.copied) ?
473
522
table->next_number_field->val_int() : 0));
474
523
table->next_number_field=0;
475
session->count_cuted_fields= CHECK_FIELD_IGNORE;
524
thd->count_cuted_fields= CHECK_FIELD_IGNORE;
476
525
table->auto_increment_field_not_null= false;
477
526
if (duplic == DUP_REPLACE)
478
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
527
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
483
table->cursor->ha_release_auto_increment();
485
free_underlaid_joins(session, &session->lex->select_lex);
486
session->abort_on_warning= 0;
487
DRIZZLE_INSERT_DONE(1, 0);
491
if (values_list.elements == 1 && (!(session->options & OPTION_WARNINGS) ||
492
!session->cuted_fields))
494
session->row_count_func= info.copied + info.deleted + info.updated;
495
session->my_ok((ulong) session->row_count_func,
496
info.copied + info.deleted + info.touched, id);
531
if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
534
thd->row_count_func= info.copied + info.deleted +
535
((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
536
info.touched : info.updated);
537
my_ok(thd, (ulong) thd->row_count_func, id);
542
ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
543
info.touched : info.updated);
502
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
503
(ulong) (info.records - info.copied), (ulong) session->cuted_fields);
545
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
546
(ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
505
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
506
(ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
507
session->row_count_func= info.copied + info.deleted + info.updated;
508
session->my_ok((ulong) session->row_count_func,
509
info.copied + info.deleted + info.touched, id, buff);
548
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
549
(ulong) (info.deleted + updated), (ulong) thd->cuted_fields);
550
thd->row_count_func= info.copied + info.deleted + updated;
551
::my_ok(thd, (ulong) thd->row_count_func, id, buff);
511
session->status_var.inserted_row_count+= session->row_count_func;
512
session->abort_on_warning= 0;
513
DRIZZLE_INSERT_DONE(0, session->row_count_func);
553
thd->abort_on_warning= 0;
554
DRIZZLE_INSERT_END();
559
table->file->ha_release_auto_increment();
561
free_underlaid_joins(thd, &thd->lex->select_lex);
562
thd->abort_on_warning= 0;
563
DRIZZLE_INSERT_END();
924
if ((error=table->cursor->deleteRecord(table->getUpdateRecord())))
979
if ((error=table->file->ha_delete_row(table->record[1])))
927
if (!table->cursor->has_transactions())
928
session->transaction.stmt.markModifiedNonTransData();
982
if (!table->file->has_transactions())
983
thd->transaction.stmt.modified_non_trans_table= true;
929
984
/* Let us attempt do write_row() once more */
933
session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
988
thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
935
990
Restore column maps if they where replaced during an duplicate key
938
993
if (table->read_set != save_read_set ||
939
994
table->write_set != save_write_set)
940
table->column_bitmaps_set(*save_read_set, *save_write_set);
995
table->column_bitmaps_set(save_read_set, save_write_set);
942
else if ((error=table->cursor->insertRecord(table->getInsertRecord())))
997
else if ((error=table->file->ha_write_row(table->record[0])))
944
999
if (!info->ignore ||
945
table->cursor->is_fatal_error(error, HA_CHECK_DUP))
1000
table->file->is_fatal_error(error, HA_CHECK_DUP))
947
table->cursor->restore_auto_increment(prev_insert_id);
1002
table->file->restore_auto_increment(prev_insert_id);
948
1003
goto gok_or_after_err;
951
1006
after_n_copied_inc:
953
session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
1008
thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
955
1010
gok_or_after_err:
956
if (!table->cursor->has_transactions())
957
session->transaction.stmt.markModifiedNonTransData();
1012
my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
1013
if (!table->file->has_transactions())
1014
thd->transaction.stmt.modified_non_trans_table= true;
961
1018
info->last_errno= error;
962
1019
/* current_select is NULL if this is a delayed insert */
963
if (session->lex->current_select)
964
session->lex->current_select->no_error= 0; // Give error
965
table->print_error(error,MYF(0));
1020
if (thd->lex->current_select)
1021
thd->lex->current_select->no_error= 0; // Give error
1022
table->file->print_error(error,MYF(0));
968
table->cursor->restore_auto_increment(prev_insert_id);
969
table->column_bitmaps_set(*save_read_set, *save_write_set);
1025
table->file->restore_auto_increment(prev_insert_id);
1027
my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1028
table->column_bitmaps_set(save_read_set, save_write_set);
1338
1404
bool select_insert::send_eof()
1341
bool const trans_table= table->cursor->has_transactions();
1407
bool const trans_table= table->file->has_transactions();
1345
error= table->cursor->ha_end_bulk_insert();
1346
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1347
table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1410
THD::killed_state killed_status= thd->killed;
1412
error= table->file->ha_end_bulk_insert();
1413
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1414
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1349
1416
if ((changed= (info.copied || info.deleted || info.updated)))
1352
1419
We must invalidate the table in the query cache before binlog writing
1353
and autocommitOrRollback.
1420
and ha_autocommit_or_rollback.
1355
if (session->transaction.stmt.hasModifiedNonTransData())
1356
session->transaction.all.markModifiedNonTransData();
1422
if (thd->transaction.stmt.modified_non_trans_table)
1423
thd->transaction.all.modified_non_trans_table= true;
1358
assert(trans_table || !changed ||
1359
session->transaction.stmt.hasModifiedNonTransData());
1425
assert(trans_table || !changed ||
1426
thd->transaction.stmt.modified_non_trans_table);
1361
table->cursor->ha_release_auto_increment();
1429
Write to binlog before commiting transaction. No statement will
1430
be written by the binlog_query() below in RBR mode. All the
1431
events are in the transaction cache and will be written when
1432
ha_autocommit_or_rollback() is issued below.
1434
if (mysql_bin_log.is_open())
1438
thd->binlog_query(THD::ROW_QUERY_TYPE,
1439
thd->query, thd->query_length,
1440
trans_table, false, killed_status);
1442
table->file->ha_release_auto_increment();
1365
table->print_error(error,MYF(0));
1366
DRIZZLE_INSERT_SELECT_DONE(error, 0);
1446
table->file->print_error(error,MYF(0));
1369
1449
char buff[160];
1370
1450
if (info.ignore)
1371
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
1372
(ulong) (info.records - info.copied), (ulong) session->cuted_fields);
1451
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1452
(ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
1374
snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
1375
(ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
1376
session->row_count_func= info.copied + info.deleted + info.updated;
1454
sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1455
(ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
1456
thd->row_count_func= info.copied + info.deleted +
1457
((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
1458
info.touched : info.updated);
1378
id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
1379
session->first_successful_insert_id_in_cur_stmt :
1380
(session->arg_of_last_insert_id_function ?
1381
session->first_successful_insert_id_in_prev_stmt :
1460
id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
1461
thd->first_successful_insert_id_in_cur_stmt :
1462
(thd->arg_of_last_insert_id_function ?
1463
thd->first_successful_insert_id_in_prev_stmt :
1382
1464
(info.copied ? autoinc_value_of_last_inserted_row : 0));
1383
session->my_ok((ulong) session->row_count_func,
1384
info.copied + info.deleted + info.touched, id, buff);
1385
session->status_var.inserted_row_count+= session->row_count_func;
1386
DRIZZLE_INSERT_SELECT_DONE(0, session->row_count_func);
1465
::my_ok(thd, (ulong) thd->row_count_func, id, buff);
1390
1469
void select_insert::abort() {
1394
1473
If the creation of the table failed (due to a syntax error, for
1395
1474
example), no table will have been opened and therefore 'table'
1481
static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
1563
static Table *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
1482
1564
TableList *create_table,
1483
message::Table &table_proto,
1484
AlterInfo *alter_info,
1565
Alter_info *alter_info,
1485
1566
List<Item> *items,
1486
bool is_if_not_exists,
1488
TableIdentifier &identifier)
1567
DRIZZLE_LOCK **lock,
1568
TABLEOP_HOOKS *hooks)
1490
TableShare share(message::Table::INTERNAL);
1491
uint32_t select_field_count= items->elements;
1570
Table tmp_table; // Used during 'Create_field()'
1573
uint select_field_count= items->elements;
1492
1574
/* Add selected items to field list */
1493
1575
List_iterator_fast<Item> it(*items);
1495
1577
Field *tmp_field;
1498
if (not (identifier.isTmp()) && create_table->table->db_stat)
1580
if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
1581
create_table->table->db_stat)
1500
/* Table already exists and was open at openTablesLock() stage. */
1501
if (is_if_not_exists)
1583
/* Table already exists and was open at open_and_lock_tables() stage. */
1584
if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
1503
1586
create_info->table_existed= 1; // Mark that table existed
1504
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1587
push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1505
1588
ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
1506
create_table->getTableName());
1507
return create_table->table;
1589
create_table->table_name);
1590
return(create_table->table);
1510
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1593
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
1598
tmp_table.timestamp_field= 0;
1599
tmp_table.s= &share;
1600
init_tmp_table_share(thd, &share, "", 0, "", "");
1602
tmp_table.s->db_create_options=0;
1603
tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
1604
tmp_table.s->db_low_byte_first=
1605
test(create_info->db_type == myisam_hton ||
1606
create_info->db_type == heap_hton);
1607
tmp_table.null_row= false;
1608
tmp_table.maybe_null= false;
1515
table::Shell tmp_table(share); // Used during 'CreateField()'
1516
tmp_table.timestamp_field= 0;
1518
tmp_table.getMutableShare()->db_create_options= 0;
1519
tmp_table.getMutableShare()->blob_ptr_size= portable_sizeof_char_ptr;
1521
if (not table_proto.engine().name().compare("MyISAM"))
1522
tmp_table.getMutableShare()->db_low_byte_first= true;
1523
else if (not table_proto.engine().name().compare("MEMORY"))
1524
tmp_table.getMutableShare()->db_low_byte_first= true;
1526
tmp_table.null_row= false;
1527
tmp_table.maybe_null= false;
1529
tmp_table.in_use= session;
1533
CreateField *cr_field;
1534
Field *field, *def_field;
1535
if (item->type() == Item::FUNC_ITEM)
1537
if (item->result_type() != STRING_RESULT)
1539
field= item->tmp_table_field(&tmp_table);
1543
field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1612
Create_field *cr_field;
1613
Field *field, *def_field;
1614
if (item->type() == Item::FUNC_ITEM)
1615
if (item->result_type() != STRING_RESULT)
1616
field= item->tmp_table_field(&tmp_table);
1548
field= create_tmp_field(session, &tmp_table, item, item->type(),
1549
(Item ***) 0, &tmp_field, &def_field, false,
1554
!(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
1555
((Item_field *)item)->field :
1561
if (item->maybe_null)
1563
cr_field->flags &= ~NOT_NULL_FLAG;
1566
alter_info->create_list.push_back(cr_field);
1618
field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1620
field= create_tmp_field(thd, &tmp_table, item, item->type(),
1621
(Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
1624
!(cr_field=new Create_field(field,(item->type() == Item::FIELD_ITEM ?
1625
((Item_field *)item)->field :
1628
if (item->maybe_null)
1629
cr_field->flags &= ~NOT_NULL_FLAG;
1630
alter_info->create_list.push_back(cr_field);
1573
1636
Note that we either creating (or opening existing) temporary table or
1574
1637
creating base table on which name we have exclusive lock. So code below
1575
1638
should not cause deadlocks or races.
1640
We don't log the statement, it will be logged later.
1642
If this is a HEAP table, the automatic DELETE FROM which is written to the
1643
binlog when a HEAP table is opened for the first time since startup, must
1644
not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
1645
don't want to delete from it) 2) it would be written before the CREATE
1646
Table, which is a wrong order. So we keep binary logging disabled when we
1579
if (not mysql_create_table_no_lock(session,
1650
tmp_disable_binlog(thd);
1651
if (!mysql_create_table_no_lock(thd, create_table->db,
1652
create_table->table_name,
1653
create_info, alter_info, 0,
1654
select_field_count))
1588
if (create_info->table_existed && not identifier.isTmp())
1656
if (create_info->table_existed &&
1657
!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1591
1660
This means that someone created table underneath server
1592
1661
or it was created via different mysqld front-end to the
1593
1662
cluster. We don't have much options but throw an error.
1595
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1664
my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
1599
if (not identifier.isTmp())
1668
if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1601
/* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1602
boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
1604
if (create_table->table)
1670
VOID(pthread_mutex_lock(&LOCK_open));
1671
if (reopen_name_locked_table(thd, create_table, false))
1606
table::Concurrent *concurrent_table= static_cast<table::Concurrent *>(create_table->table);
1608
if (concurrent_table->reopen_name_locked_table(create_table, session))
1610
plugin::StorageEngine::dropTable(*session, identifier);
1614
table= create_table->table;
1673
quick_rm_table(create_info->db_type, create_table->db,
1674
table_case_name(create_info, create_table->table_name),
1619
plugin::StorageEngine::dropTable(*session, identifier);
1678
table= create_table->table;
1679
VOID(pthread_mutex_unlock(&LOCK_open));
1624
if (not (table= session->openTable(create_table, (bool*) 0,
1625
DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1626
not create_info->table_existed)
1683
if (!(table= open_table(thd, create_table, (bool*) 0,
1684
DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1685
!create_info->table_existed)
1629
1688
This shouldn't happen as creation of temporary table should make
1630
1689
it preparable for open. But let us do close_temporary_table() here
1633
session->drop_temporary_table(identifier);
1692
drop_temporary_table(thd, create_table);
1637
if (not table) // open failed
1696
reenable_binlog(thd);
1697
if (!table) // open failed
1641
1701
table->reginfo.lock_type=TL_WRITE;
1642
if (! ((*lock)= session->lockTables(&table, 1, DRIZZLE_LOCK_IGNORE_FLUSH, ¬_used)))
1702
hooks->prelock(&table, 1); // Call prelock hooks
1703
if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
1704
DRIZZLE_LOCK_IGNORE_FLUSH, ¬_used)) ||
1705
hooks->postlock(&table, 1))
1646
session->unlockTables(*lock);
1709
mysql_unlock_tables(thd, *lock);
1650
if (not create_info->table_existed)
1651
session->drop_open_table(table, identifier);
1713
if (!create_info->table_existed)
1714
drop_open_table(thd, table, create_table->db, create_table->table_name);
1660
select_create::prepare(List<Item> &values, Select_Lex_Unit *u)
1722
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
1662
DrizzleLock *extra_lock= NULL;
1724
DRIZZLE_LOCK *extra_lock= NULL;
1727
TABLEOP_HOOKS *hook_ptr= NULL;
1664
For replication, the CREATE-SELECT statement is written
1665
in two pieces: the first transaction messsage contains
1666
the CREATE TABLE statement as a CreateTableStatement message
1667
necessary to create the table.
1669
The second transaction message contains all the InsertStatement
1670
and associated InsertRecords that should go into the table.
1729
For row-based replication, the CREATE-SELECT statement is written
1730
in two pieces: the first one contain the CREATE TABLE statement
1731
necessary to create the table and the second part contain the rows
1732
that should go into the table.
1734
For non-temporary tables, the start of the CREATE-SELECT
1735
implicitly commits the previous transaction, and all events
1736
forming the statement will be stored the transaction cache. At end
1737
of the statement, the entire statement is committed as a
1738
transaction, and all events are written to the binary log.
1740
On the master, the table is locked for the duration of the
1741
statement, but since the CREATE part is replicated as a simple
1742
statement, there is no way to lock the table for accesses on the
1743
slave. Hence, we have to hold on to the CREATE part of the
1744
statement until the statement has finished.
1746
class MY_HOOKS : public TABLEOP_HOOKS {
1748
MY_HOOKS(select_create *x, TableList *create_table,
1749
TableList *select_tables)
1750
: ptr(x), all_tables(*create_table)
1752
all_tables.next_global= select_tables;
1756
virtual int do_postlock(Table **tables, uint count)
1758
THD *thd= const_cast<THD*>(ptr->get_thd());
1759
if (int error= decide_logging_format(thd, &all_tables))
1762
Table const *const table = *tables;
1763
if (thd->current_stmt_binlog_row_based &&
1764
!table->s->tmp_table &&
1765
!ptr->get_create_info()->table_existed)
1767
ptr->binlog_show_create_table(tables, count);
1773
TableList all_tables;
1776
MY_HOOKS hooks(this, create_table, select_tables);
1675
if (not (table= create_table_from_items(session, create_info, create_table,
1677
alter_info, &values,
1679
&extra_lock, identifier)))
1782
Start a statement transaction before the create if we are using
1783
row-based replication for the statement. If we are creating a
1784
temporary table, we need to start a statement transaction.
1786
if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
1787
thd->current_stmt_binlog_row_based)
1789
thd->binlog_start_trans_and_stmt();
1792
if (!(table= create_table_from_items(thd, create_info, create_table,
1793
alter_info, &values,
1794
&extra_lock, hook_ptr)))
1681
1795
return(-1); // abort() deletes table
1684
1797
if (extra_lock)
1686
1799
assert(m_plock == NULL);
1688
if (identifier.isTmp())
1801
if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
1689
1802
m_plock= &m_lock;
1691
m_plock= &session->extra_lock;
1804
m_plock= &thd->extra_lock;
1693
1806
*m_plock= extra_lock;
1696
if (table->getShare()->sizeFields() < values.elements)
1809
if (table->s->fields < values.elements)
1698
1811
my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
1702
1815
/* First field to copy */
1703
field= table->getFields() + table->getShare()->sizeFields() - values.elements;
1816
field= table->field+table->s->fields - values.elements;
1705
1818
/* Mark all fields that are given values */
1706
1819
for (Field **f= field ; *f ; f++)
1708
table->setWriteSet((*f)->position());
1820
bitmap_set_bit(table->write_set, (*f)->field_index);
1711
1822
/* Don't set timestamp if used */
1712
1823
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
1713
1824
table->next_number_field=table->found_next_number_field;
1715
table->restoreRecordAsDefault(); // Get empty record
1716
session->cuted_fields=0;
1826
restore_record(table,s->default_values); // Get empty record
1827
thd->cuted_fields=0;
1717
1828
if (info.ignore || info.handle_duplicates != DUP_ERROR)
1718
table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1829
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1719
1830
if (info.handle_duplicates == DUP_REPLACE)
1720
table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1831
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1721
1832
if (info.handle_duplicates == DUP_UPDATE)
1722
table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1723
table->cursor->ha_start_bulk_insert((ha_rows) 0);
1724
session->abort_on_warning= !info.ignore;
1725
if (check_that_all_fields_are_given_values(session, table, table_list))
1833
table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1834
table->file->ha_start_bulk_insert((ha_rows) 0);
1835
thd->abort_on_warning= !info.ignore;
1836
if (check_that_all_fields_are_given_values(thd, table, table_list))
1727
1838
table->mark_columns_needed_for_insert();
1728
table->cursor->extra(HA_EXTRA_WRITE_CACHE);
1839
table->file->extra(HA_EXTRA_WRITE_CACHE);
1844
select_create::binlog_show_create_table(Table **tables, uint count)
1847
Note 1: In RBR mode, we generate a CREATE TABLE statement for the
1848
created table by calling store_create_info() (behaves as SHOW
1849
CREATE TABLE). In the event of an error, nothing should be
1850
written to the binary log, even if the table is non-transactional;
1851
therefore we pretend that the generated CREATE TABLE statement is
1852
for a transactional table. The event will then be put in the
1853
transaction cache, and any subsequent events (e.g., table-map
1854
events and binrow events) will also be put there. We can then use
1855
ha_autocommit_or_rollback() to either throw away the entire
1856
kaboodle of events, or write them to the binary log.
1858
We write the CREATE TABLE statement here and not in prepare()
1859
since there potentially are sub-selects or accesses to information
1860
schema that will do a close_thread_tables(), destroying the
1861
statement transaction cache.
1863
assert(thd->current_stmt_binlog_row_based);
1864
assert(tables && *tables && count > 0);
1867
String query(buf, sizeof(buf), system_charset_info);
1869
TableList tmp_table_list;
1871
memset(&tmp_table_list, 0, sizeof(tmp_table_list));
1872
tmp_table_list.table = *tables;
1873
query.length(0); // Have to zero it since constructor doesn't
1875
result= store_create_info(thd, &tmp_table_list, &query, create_info);
1876
assert(result == 0); /* store_create_info() always return 0 */
1878
thd->binlog_query(THD::STMT_QUERY_TYPE,
1879
query.ptr(), query.length(),
1880
/* is_trans */ true,
1881
/* suppress_use */ false);
1732
1884
void select_create::store_values(List<Item> &values)
1734
fill_record(session, field, values, true);
1886
fill_record(thd, field, values, 1);
1738
void select_create::send_error(uint32_t errcode,const char *err)
1890
void select_create::send_error(uint errcode,const char *err)
1741
1895
This will execute any rollbacks that are necessary before writing
1742
1896
the transcation cache.