/***************************************************************************
  Update multiple tables from join
***************************************************************************/

/*
  Get a table map for a list of Item_field items.
*/
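/*
  A table_map is a bitmask with one bit per table in the statement; the map
  built here is the OR of used_tables() over every field in the SET list.
  For example (illustrative only), UPDATE t1,t2 SET t1.a=1, t2.b=2 yields a
  map with the bits of both t1 and t2 set.
*/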
static table_map get_table_map(List<Item> *items)
{
  List_iterator_fast<Item> item_it(*items);
  Item_field *item;
  table_map map= 0;

  while ((item= (Item_field *) item_it++))
    map|= item->used_tables();
  return map;
}
/*
  Make update-specific preparations and checks after opening tables.

  mysql_multi_update_prepare()
    session         thread handler
*/

int mysql_multi_update_prepare(Session *session)
  LEX *lex= session->lex;
  TableList *table_list= lex->query_tables;
  TableList *tl, *leaves;
  List<Item> *fields= &lex->select_lex.item_list;
  table_map tables_for_update;
  /*
    If this multi-update was converted from a usual update, this is the
    table counter; otherwise junk is assigned here, but it is then replaced
    with the real count in open_tables().
  */
  uint32_t table_count= lex->table_count;
  bool original_multiupdate= (session->lex->sql_command == SQLCOM_UPDATE_MULTI);
  bool need_reopen= false;

  /* The following is needed for prepared statements, to run the multi-update next time */
  session->lex->sql_command= SQLCOM_UPDATE_MULTI;
  /* Open tables and create derived ones, but do not lock or fill them yet */
  if (((original_multiupdate || need_reopen) &&
       session->open_tables_from_list(&table_list, &table_count, false)) ||
      mysql_handle_derived(lex, &mysql_derived_prepare))

  /*
    setup_tables() is needed for VIEWs. JOIN::prepare() will call
    setup_tables() a second time, but that call will do nothing (there is a
    check for the second call in setup_tables()).
  */
  if (setup_tables_and_check_access(session, &lex->select_lex.context,
                                    &lex->select_lex.top_join_list,
                                    &lex->select_lex.leaf_tables, false))

  if (setup_fields_with_no_wrap(session, 0, *fields, MARK_COLUMNS_WRITE, 0, 0))

  tables_for_update= get_table_map(fields);

  /* Set up timestamp handling and the locking mode */
  leaves= lex->select_lex.leaf_tables;
  for (tl= leaves; tl; tl= tl->next_leaf)
    Table *table= tl->table;
    /* Only auto-set the timestamp column if it is not modified explicitly */
    if (table->timestamp_field && table->timestamp_field->isWriteSet())
      table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
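    /*
      With auto-set disabled the value assigned in the SET clause is kept;
      otherwise the handler would overwrite the timestamp column on update.
    */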
    /* if the table will be updated then check that it is unique */
    if (table->map & tables_for_update)
      table->mark_columns_needed_for_update();
      /*
        If the table will be updated we should not downgrade its lock;
        leave it as is.
      */

      /*
        If we are using the binary log, we need TL_READ_NO_INSERT to get the
        correct order of statements. Otherwise, we use a TL_READ lock to
        improve performance.
      */
      tl->lock_type= TL_READ;

    /* Update Table::lock_type accordingly. */
    if (!tl->placeholder())
      tl->table->reginfo.lock_type= tl->lock_type;

  /* now lock and fill tables */
  if (lock_tables(session, table_list, table_count, &need_reopen))

    /*
      We have to reopen tables since some of them were altered or dropped
      during lock_tables(), or something was done with their triggers.
      Let us do some cleanups so that setup_tables() and setup_fields()
      can be run once again.
    */
    List_iterator_fast<Item> it(*fields);
    session->close_tables_for_reopen(&table_list);
  /*
    Check that we are not using a table that we are updating, but we should
    skip all tables of the UPDATE ... SELECT itself.
  */
  lex->select_lex.exclude_from_table_unique_test= true;
  /* We only need SELECT privilege for columns in the values list */
  for (tl= leaves; tl; tl= tl->next_leaf)
    if (tl->lock_type != TL_READ &&
        tl->lock_type != TL_READ_NO_INSERT)
      TableList *duplicate;
      if ((duplicate= unique_table(session, tl, table_list, 0)))
        my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->alias);

  /*
    Set the exclude_from_table_unique_test value back to false. It is needed
    for a further check in multi_update::prepare() on whether to use the
    record cache.
  */
  lex->select_lex.exclude_from_table_unique_test= false;

  if (session->fill_derived_tables() &&
      mysql_handle_derived(lex, &mysql_derived_filling))
/*
  Set up multi-update handling and call SELECT to do the join.
*/

bool mysql_multi_update(Session *session,
                        TableList *table_list,
                        enum enum_duplicates handle_duplicates, bool ignore,
                        Select_Lex_Unit *unit, Select_Lex *select_lex)
  multi_update *result;

  if (!(result= new multi_update(table_list,
                                 session->lex->select_lex.leaf_tables,
                                 handle_duplicates, ignore)))

  session->abort_on_warning= true;

  List<Item> total_list;
  res= mysql_select(session, &select_lex->ref_pointer_array,
                    table_list, select_lex->with_wild,
                    conds, 0, (order_st *) NULL, (order_st *) NULL, (Item *) NULL,
                    options | SELECT_NO_JOIN_CACHE | SELECT_NO_UNLOCK |
                    OPTION_SETUP_TABLES_DONE,
                    result, unit, select_lex);
  res|= session->is_error();

    /* If we had another error reported earlier then this one will be ignored */
    result->send_error(ER_UNKNOWN_ERROR, ER(ER_UNKNOWN_ERROR));

  session->abort_on_warning= false;
multi_update::multi_update(TableList *table_list,
                           TableList *leaves_list,
                           List<Item> *field_list, List<Item> *value_list,
                           enum enum_duplicates handle_duplicates_arg,
  :all_tables(table_list), leaves(leaves_list),
   tmp_tables(0), updated(0), found(0), fields(field_list),
   values(value_list), table_count(0), copy_field(0),
   handle_duplicates(handle_duplicates_arg), do_update(1), trans_safe(1),
   transactional_tables(0), ignore(ignore_arg), error_handled(0)

/*
  Connect fields with tables and create the list of tables that are updated.
*/

int multi_update::prepare(List<Item> &,
  TableList *table_ref;
  table_map tables_to_update;
  List_iterator_fast<Item> field_it(*fields);
  List_iterator_fast<Item> value_it(*values);
  uint32_t i, max_fields;
  uint32_t leaf_table_count= 0;

  session->count_cuted_fields= CHECK_FIELD_WARN;
  session->cuted_fields= 0L;
  session->set_proc_info("updating main table");

  tables_to_update= get_table_map(fields);

  if (!tables_to_update)
    my_message(ER_NO_TABLES_USED, ER(ER_NO_TABLES_USED), MYF(0));
  /*
    We have to check values after setup_tables() to get covering_keys right
    in the reference tables.
  */
  if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))

  /*
    Save the tables being updated in update_tables:
    update_table->shared is the position of the table.
    Don't use key read on tables that are updated.
  */
  for (table_ref= leaves; table_ref; table_ref= table_ref->next_leaf)
    /* TODO: add support for updatable views over joins */
    Table *table= table_ref->table;
    if (tables_to_update & table->map)
      TableList *tl= (TableList*) session->memdup((char*) table_ref,
      update_tables.push_back(tl);
      tl->shared= table_count++;
      table->no_keyread= 1;
      table->covering_keys.reset();
      table->pos_in_table_list= tl;

  table_count= update_tables.size();

  tmp_tables= (Table**) session->calloc(sizeof(Table *) * table_count);
  tmp_table_param= (Tmp_Table_Param*) session->calloc(sizeof(Tmp_Table_Param) *
  fields_for_table= (List_item **) session->alloc(sizeof(List_item *) *
  values_for_table= (List_item **) session->alloc(sizeof(List_item *) *
  if (session->is_fatal_error)

  for (i= 0; i < table_count; i++)
    fields_for_table[i]= new List_item;
    values_for_table[i]= new List_item;
  if (session->is_fatal_error)

  /* Split fields into fields_for_table[] and values_for_table[] */
  while ((item= (Item_field *) field_it++))
    Item *value= value_it++;
    uint32_t offset= item->field->table->pos_in_table_list->shared;
    fields_for_table[offset]->push_back(item);
    values_for_table[offset]->push_back(value);
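  /*
    After this loop fields_for_table[i] and values_for_table[i] hold the SET
    columns and the corresponding values that belong to updated table i
    (indexed by its 'shared' position). For example (illustrative only),
    UPDATE t1,t2 SET t1.a=1, t2.b=2 puts (a,1) in t1's lists and (b,2) in
    t2's lists.
  */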
  if (session->is_fatal_error)

  /* Allocate copy fields */
  max_fields= 0;
  for (i= 0; i < table_count; i++)
    set_if_bigger(max_fields, fields_for_table[i]->elements + leaf_table_count);
  copy_field= new CopyField[max_fields];
  return (session->is_fatal_error != 0);
/*
  Check if a table is safe to update on the fly.

  safe_update_on_fly()
    session         Thread handler
    join_tab        How the table is used in the join
    all_tables      List of tables

  We can update the first table in the join on the fly if we know that
  a row in this table will never be read twice. This is true under
  the following conditions:

  - We are doing a table scan and the data is in a separate file (MyISAM),
    or we don't update a clustered key.

  - We are doing a range scan and we don't update the scan key or
    the primary key for a clustered table handler.

  - The table is not joined to itself.

  This function gets information about the fields to be updated from
  the Table::write_set bitmap.

  This code is somewhat dependent on how make_join_readinfo() works.

  RETURN
    0   Not safe to update
    1   Safe to update
*/
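/*
  For example, an UPDATE whose first join table is read with a range scan on
  a key that the statement also modifies cannot be applied on the fly: the
  changed key could make the scan revisit the row. In that case the changes
  are buffered in a temporary table and applied afterwards (see do_updates()).
*/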
static bool safe_update_on_fly(Session *session, JoinTable *join_tab,
                               TableList *table_ref, TableList *all_tables)
  Table *table= join_tab->table;
  if (unique_table(session, table_ref, all_tables, 0))

  switch (join_tab->type) {
    return true;                        // At most one matching row
  case JT_REF_OR_NULL:
    return !is_key_used(table, join_tab->ref.key, table->write_set);
    /* If range search on index */
    if (join_tab->quick)
      return !join_tab->quick->is_keys_used(table->write_set);
    /* If scanning in clustered key */
    if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_IN_READ_INDEX) &&
        table->s->primary_key < MAX_KEY)
      return !is_key_used(table, table->s->primary_key, table->write_set);
    break;                              // Avoid compiler warning
/*
  Initialize tables for the multi-table update.

  - Update the first table in the join on the fly, if possible.
  - Create temporary tables to store the changed values for all other tables
    that are updated (and main_table if the above doesn't hold).
*/

multi_update::initialize_tables(JOIN *join)
  if ((session->options & OPTION_SAFE_UPDATES) && error_if_full_join(join))
  main_table= join->join_tab->table;

  /* Any update has at least one pair (field, value) */
  assert(fields->elements);

  /* Create a temporary table for keys to all tables, except the main table */
  for (list<TableList*>::iterator it= update_tables.begin();
       it != update_tables.end();
    Table *table= (*it)->table;
    uint32_t cnt= (*it)->shared;
    List<Item> temp_fields;
    Tmp_Table_Param *tmp_param;

    table->mark_columns_needed_for_update();
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
    if (table == main_table)            // First table in join
      if (safe_update_on_fly(session, join->join_tab, (*it), all_tables))
        table_to_update= main_table;    // Update table on the fly

    table->prepare_for_position();

    tmp_param= tmp_table_param + cnt;

    /*
      Create a temporary table to store all fields that are changed for this
      table. The first field in the temporary table is a pointer to the
      original row so that we can find and update it. For an updatable
      VIEW the few following fields are rowids of the tables used in the
      CHECK OPTION condition.
    */
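    /*
      So (assuming this layout) each temporary-table row looks like:
        [rowid of the updated table][rowids of CHECK OPTION tables ...]
        [new values of the changed columns of this table]
    */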
    List_iterator_fast<Table> tbl_it(unupdated_check_opt_tables);
      Field_varstring *field= new Field_varstring(tbl->file->ref_length, 0,
                                                  tbl->alias, tbl->s, &my_charset_bin);

      /*
        The field will be converted to a varstring when creating the tmp
        table if the table to be updated was created by MySQL 4.1. Deny this.
      */
      Item_field *ifield= new Item_field((Field *) field);
      ifield->maybe_null= 0;
      if (temp_fields.push_back(ifield))
    } while ((tbl= tbl_it++));

    temp_fields.concat(fields_for_table[cnt]);

    /* Make a unique key over the first field to avoid duplicated updates */
    memset(&group, 0, sizeof(group));
    group.item= (Item**) temp_fields.head_ref();

    tmp_param->quick_group= 1;
    tmp_param->field_count= temp_fields.elements;
    tmp_param->group_parts= 1;
    tmp_param->group_length= table->file->ref_length;
    if (!(tmp_tables[cnt]= create_tmp_table(session,
                                            (order_st*) &group, 0, 0,
                                            TMP_TABLE_ALL_COLUMNS,
    tmp_tables[cnt]->file->extra(HA_EXTRA_WRITE_CACHE);
multi_update::~multi_update()
  for (list<TableList*>::iterator it= update_tables.begin();
       it != update_tables.end();
    table->table->no_keyread= table->table->no_cache= 0;
    table->table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);

  for (uint32_t cnt= 0; cnt < table_count; cnt++)
    if (tmp_tables[cnt])
      tmp_tables[cnt]->free_tmp_table(session);
      tmp_table_param[cnt].cleanup();

  delete [] copy_field;
  session->count_cuted_fields= CHECK_FIELD_IGNORE;      // Restore this setting
  assert(trans_safe || !updated ||
         session->transaction.all.modified_non_trans_table);
bool multi_update::send_data(List<Item> &)
  for (list<TableList*>::iterator it= update_tables.begin();
       it != update_tables.end();
    Table *table= (*it)->table;
    uint32_t offset= (*it)->shared;

    /*
      Check if we are using an outer join and we didn't find the row,
      or if we have already updated this row in a previous call to this
      function.

      The same row may be presented here several times in a join of type
      UPDATE t1,t2 SET t1.a=t2.a. In this case we will do the update for the
      first found row combination. The join algorithm guarantees that we
      will not find a row in t1 several times.
    */
    if (table->status & (STATUS_NULL_ROW | STATUS_UPDATED))

    /*
      We can use compare_record() to optimize away updates if
      the table handler is returning all columns OR if
      all updated columns are also read.
    */
    if (table == table_to_update)
      bool can_compare_record;
      can_compare_record= (!(table->file->ha_table_flags() &
                             HA_PARTIAL_COLUMN_READ) ||
                           bitmap_is_subset(table->write_set,
      table->status|= STATUS_UPDATED;
      table->storeRecord();
      if (fill_record(session, *fields_for_table[offset],
                      *values_for_table[offset], 0))
      if (!can_compare_record || table->compare_record())

        /*
          Inform the main table that we are going to update the table even
          while we may be scanning it. This will flush the read cache
          if it is used.
        */
        main_table->file->extra(HA_EXTRA_PREPARE_FOR_UPDATE);
        if ((error= table->file->ha_update_row(table->record[1],
                                               table->record[0])) &&
            error != HA_ERR_RECORD_IS_THE_SAME)
            table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))

            /*
              If (ignore && the error is ignorable) we don't have to
              do anything; otherwise...
            */
            if (table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
              flags|= ME_FATALERROR;    /* Other handler errors are fatal */
            prepare_record_for_error_message(error, table);
            table->file->print_error(error, MYF(flags));
        if (error == HA_ERR_RECORD_IS_THE_SAME)

        /*
          Either a non-transactional or a transactional table got modified;
          in either case the corresponding multi_update flag is raised in
          its branch below.
        */
        if (table->file->has_transactions())
          transactional_tables= 1;
          session->transaction.stmt.modified_non_trans_table= true;
      Table *tmp_table= tmp_tables[offset];
      /*
        For an updatable VIEW, store the rowid of the updated table and
        the rowids of the tables used in the CHECK OPTION condition.
      */
      uint32_t field_num= 0;
      List_iterator_fast<Table> tbl_it(unupdated_check_opt_tables);
        tbl->file->position(tbl->record[0]);
        Field_varstring *ref_field=
          reinterpret_cast<Field_varstring *>(tmp_table->field[field_num]);
        ref_field->store((char *) tbl->file->ref, tbl->file->ref_length,
      } while ((tbl= tbl_it++));

      /* Store the regular updated fields in the row. */
      fill_record(session,
                  tmp_table->field + 1 + unupdated_check_opt_tables.elements,
                  *values_for_table[offset], 1);

      /* Write the row, ignoring duplicated updates to the same row */
      error= tmp_table->file->ha_write_row(tmp_table->record[0]);
      if (error != HA_ERR_FOUND_DUPP_KEY && error != HA_ERR_FOUND_DUPP_UNIQUE)
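        /*
          A duplicate-key error just means the same target row was already
          queued, so it is ignored. Any other error is handled as "temporary
          table is full": create_myisam_from_heap() converts the in-memory
          tmp table to an on-disk MyISAM table so collection can continue,
          and if even that fails the write error is real.
        */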
        create_myisam_from_heap(session, tmp_table,
                                tmp_table_param[offset].start_recinfo,
                                &tmp_table_param[offset].recinfo,
          return(1);                    // Not a table_is_full error
void multi_update::send_error(uint32_t errcode, const char *err)
  /* First send the error, whatever it is ... */
  my_error(errcode, MYF(0), err);

void multi_update::abort()
  /* The error was handled, or nothing was updated and there are no side effects: return */
  if (error_handled ||
      (!session->transaction.stmt.modified_non_trans_table && !updated))

  /*
    If all tables that have been updated are transaction-safe then just do a
    rollback. If not, attempt to do the remaining updates.
  */

  assert(session->transaction.stmt.modified_non_trans_table);
  if (do_update && table_count > 1)
    /* Add warning here */
    /*
      todo/fixme: do_update() is never called with the arg 1.
      Should the signature change to become argument-less?
    */
  if (session->transaction.stmt.modified_non_trans_table)
    session->transaction.all.modified_non_trans_table= true;
  assert(trans_safe || !updated || session->transaction.stmt.modified_non_trans_table);
int multi_update::do_updates()
  TableList *cur_table;
  ha_rows org_updated;
  Table *table, *tmp_table;
  List_iterator_fast<Table> check_opt_it(unupdated_check_opt_tables);

  do_update= 0;                         // Don't retry this function

  for (list<TableList*>::iterator it= update_tables.begin();
       it != update_tables.end();
    bool can_compare_record;
    uint32_t offset= cur_table->shared;

    table= cur_table->table;
    if (table == table_to_update)
      continue;                         // Already updated
    org_updated= updated;
    tmp_table= tmp_tables[cur_table->shared];
    tmp_table->file->extra(HA_EXTRA_CACHE);     // Change to read cache
    (void) table->file->ha_rnd_init(0);
    table->file->extra(HA_EXTRA_NO_CACHE);

    check_opt_it.rewind();
    while (Table *tbl= check_opt_it++)
      if (tbl->file->ha_rnd_init(1))
      tbl->file->extra(HA_EXTRA_CACHE);
    /*
      Set up the copy functions that copy fields from the temporary table.
    */
    List_iterator_fast<Item> field_it(*fields_for_table[offset]);
    Field **field= tmp_table->field +
                   1 + unupdated_check_opt_tables.elements;    // Skip row pointers
    CopyField *copy_field_ptr= copy_field, *copy_field_end;
    for ( ; *field; field++)
      Item_field *item= (Item_field *) field_it++;
      (copy_field_ptr++)->set(item->field, *field, 0);
    copy_field_end= copy_field_ptr;
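    /*
      copy_field[0 .. copy_field_end) now maps each stored tmp-table column
      back onto the corresponding field of the table being updated; the
      copies are applied below for every buffered row.
    */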
    if ((local_error= tmp_table->file->ha_rnd_init(1)))

    can_compare_record= (!(table->file->ha_table_flags() &
                           HA_PARTIAL_COLUMN_READ) ||
                         bitmap_is_subset(table->write_set,

      if (session->killed && trans_safe)
      if ((local_error= tmp_table->file->rnd_next(tmp_table->record[0])))
        if (local_error == HA_ERR_END_OF_FILE)
        if (local_error == HA_ERR_RECORD_DELETED)
          continue;                     // May happen on dup key

      /* call rnd_pos() using rowids from temporary table */
      check_opt_it.rewind();
        uint32_t field_num= 0;
          Field_varstring *ref_field=
            reinterpret_cast<Field_varstring *>(tmp_table->field[field_num]);
          tbl->file->rnd_pos(tbl->record[0],
                             (unsigned char *) ref_field->ptr
                             + ref_field->length_bytes)))
        } while ((tbl= check_opt_it++));

      table->status|= STATUS_UPDATED;
      table->storeRecord();

      /* Copy data from temporary table to current table */
      for (copy_field_ptr= copy_field;
           copy_field_ptr != copy_field_end;
        (*copy_field_ptr->do_copy)(copy_field_ptr);

      if (!can_compare_record || table->compare_record())
        if ((local_error= table->file->ha_update_row(table->record[1],
                                                     table->record[0])) &&
            local_error != HA_ERR_RECORD_IS_THE_SAME)
            table->file->is_fatal_error(local_error, HA_CHECK_DUP_KEY))
        if (local_error != HA_ERR_RECORD_IS_THE_SAME)

    if (updated != org_updated)
      if (table->file->has_transactions())
        transactional_tables= 1;
        trans_safe= 0;                  // Can't do safe rollback
        session->transaction.stmt.modified_non_trans_table= true;

    (void) table->file->ha_rnd_end();
    (void) tmp_table->file->ha_rnd_end();
    check_opt_it.rewind();
    while (Table *tbl= check_opt_it++)
      tbl->file->ha_rnd_end();

  prepare_record_for_error_message(local_error, table);
  table->file->print_error(local_error, MYF(ME_FATALERROR));
  (void) table->file->ha_rnd_end();
  (void) tmp_table->file->ha_rnd_end();
  check_opt_it.rewind();
  while (Table *tbl= check_opt_it++)
    tbl->file->ha_rnd_end();
  if (updated != org_updated)
    if (table->file->has_transactions())
      transactional_tables= 1;
      session->transaction.stmt.modified_non_trans_table= true;
/* out: 1 if error, 0 if success */

bool multi_update::send_eof()
  char buff[STRING_BUFFER_USUAL_SIZE];
  Session::killed_state killed_status= Session::NOT_KILLED;
  session->set_proc_info("updating reference tables");

  /*
    Do the updates for the last n - 1 tables; returns 0 if ok.
    The error takes into account the killed status gained in do_updates().
  */
  int local_error= (table_count) ? do_updates() : 0;
  /*
    If local_error is not set until after do_updates(), then a kill carried
    out later should not affect binlogging.
  */
  killed_status= (local_error == 0) ? Session::NOT_KILLED : session->killed;
  session->set_proc_info("end");

  /*
    Write the SQL statement to the binlog if we updated rows and we
    succeeded, or if we updated some non-transactional tables.
    The query has to go to the binlog because a non-transactional table was
    modified, either from the query's table list or via a stored routine:
    bug#13270, bug#23333.
  */

  assert(trans_safe || !updated ||
         session->transaction.stmt.modified_non_trans_table);
  if (local_error == 0 || session->transaction.stmt.modified_non_trans_table)
    if (session->transaction.stmt.modified_non_trans_table)
      session->transaction.all.modified_non_trans_table= true;

  if (local_error != 0)
    error_handled= true;                // to force early leave from ::send_error()

  if (local_error > 0)                  // if the above log write did not fail ...
    /* Safety: if we haven't got an error before (can happen in do_updates) */
    my_message(ER_UNKNOWN_ERROR, "An error occurred in multi-table update",

  id= session->arg_of_last_insert_id_function ?
      session->first_successful_insert_id_in_prev_stmt : 0;
  sprintf(buff, ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
          (ulong) session->cuted_fields);
  session->row_count_func=
    (session->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated;
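  /*
    With CLIENT_FOUND_ROWS the client asked to be told how many rows matched
    rather than how many were actually changed, so 'found' is reported
    instead of 'updated'.
  */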
  session->my_ok((ulong) session->row_count_func, id, buff);