12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
18
18
Single table and multi table updates of tables.
19
Multi-table updates were introduced by Sinisa & Monty
21
#include <drizzled/server_includes.h>
23
22
#include <drizzled/sql_select.h>
24
23
#include <drizzled/error.h>
25
24
#include <drizzled/probes.h>
26
25
#include <drizzled/sql_base.h>
27
#include <drizzled/field/epoch.h>
26
#include <drizzled/field/timestamp.h>
28
27
#include <drizzled/sql_parse.h>
29
#include <drizzled/optimizer/range.h>
30
#include <drizzled/records.h>
31
#include <drizzled/internal/my_sys.h>
32
#include <drizzled/internal/iocache.h>
33
#include <drizzled/transaction_services.h>
34
#include <drizzled/filesort.h>
35
#include <drizzled/plugin/storage_engine.h>
37
#include <boost/dynamic_bitset.hpp>
40
31
using namespace std;
46
35
Re-read record if more columns are needed for error message.
48
37
If we got a duplicate key error, we want to write an error
49
38
message containing the value of the duplicate key. If we do not have
50
all fields of the key value in getInsertRecord(), we need to re-read the
39
all fields of the key value in record[0], we need to re-read the
51
40
record with a proper read_set.
53
42
@param[in] error error number
57
46
static void prepare_record_for_error_message(int error, Table *table)
59
Field **field_p= NULL;
51
MY_BITMAP unique_map; /* Fields in offended unique. */
52
my_bitmap_map unique_map_buf[bitmap_buffer_size(MAX_FIELDS)];
64
55
Only duplicate key errors print the key value.
65
56
If the storage engine always reads all columns, we already have the value.
67
58
if ((error != HA_ERR_FOUND_DUPP_KEY) ||
68
! (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ)))
59
!(table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ))
72
63
Get the number of the offended index.
73
64
We will see MAX_KEY if the engine cannot determine the affected index.
75
if ((keynr= table->get_dup_key(error)) >= MAX_KEY)
66
if ((keynr= table->file->get_dup_key(error)) >= MAX_KEY)
78
69
/* Create unique_map with all fields used by that index. */
79
boost::dynamic_bitset<> unique_map(table->getShare()->sizeFields()); /* Fields in offended unique. */
80
table->mark_columns_used_by_index_no_reset(keynr, unique_map);
70
bitmap_init(&unique_map, unique_map_buf, table->s->fields);
71
table->mark_columns_used_by_index_no_reset(keynr, &unique_map);
82
73
/* Subtract read_set and write_set. */
83
unique_map-= *table->read_set;
84
unique_map-= *table->write_set;
74
bitmap_subtract(&unique_map, table->read_set);
75
bitmap_subtract(&unique_map, table->write_set);
87
78
If the unique index uses columns that are neither in read_set
88
79
nor in write_set, we must re-read the record.
89
80
Otherwise no need to do anything.
91
if (unique_map.none())
82
if (bitmap_is_clear_all(&unique_map))
94
/* Get identifier of last read record into table->cursor->ref. */
95
table->cursor->position(table->getInsertRecord());
85
/* Get identifier of last read record into table->file->ref. */
86
table->file->position(table->record[0]);
96
87
/* Add all fields used by unique index to read_set. */
97
*table->read_set|= unique_map;
98
/* Read record that is identified by table->cursor->ref. */
99
(void) table->cursor->rnd_pos(table->getUpdateRecord(), table->cursor->ref);
88
bitmap_union(table->read_set, &unique_map);
89
/* Read record that is identified by table->file->ref. */
90
(void) table->file->rnd_pos(table->record[1], table->file->ref);
100
91
/* Copy the newly read columns into the new record. */
101
for (field_p= table->getFields(); (field= *field_p); field_p++)
103
if (unique_map.test(field->position()))
105
field->copy_from_tmp(table->getShare()->rec_buff_length);
92
for (field_p= table->field; (field= *field_p); field_p++)
93
if (bitmap_is_set(&unique_map, field->field_index))
94
field->copy_from_tmp(table->s->rec_buff_length);
132
int update_query(Session *session, TableList *table_list,
119
int mysql_update(Session *session, TableList *table_list,
133
120
List<Item> &fields, List<Item> &values, COND *conds,
134
uint32_t order_num, Order *order,
121
uint32_t order_num, order_st *order,
135
122
ha_rows limit, enum enum_duplicates,
138
125
bool using_limit= limit != HA_POS_ERROR;
139
bool used_key_is_modified;
140
bool transactional_table;
126
bool safe_update= test(session->options & OPTION_SAFE_UPDATES);
127
bool used_key_is_modified, transactional_table, will_batch;
128
bool can_compare_record;
129
int error, loc_error;
142
130
uint used_index= MAX_KEY, dup_key_found;
143
131
bool need_sort= true;
144
132
ha_rows updated, found;
145
133
key_map old_covering_keys;
147
optimizer::SqlSelect *select= NULL;
149
Select_Lex *select_lex= &session->getLex()->select_lex;
137
Select_Lex *select_lex= &session->lex->select_lex;
151
139
List<Item> all_fields;
152
Session::killed_state_t killed_status= Session::NOT_KILLED;
140
Session::killed_state killed_status= Session::NOT_KILLED;
154
DRIZZLE_UPDATE_START(session->getQueryString()->c_str());
142
DRIZZLE_UPDATE_START();
155
143
if (session->openTablesLock(table_list))
157
DRIZZLE_UPDATE_DONE(1, 0, 0);
161
146
session->set_proc_info("init");
162
147
table= table_list->table;
164
149
/* Calculate "table->covering_keys" based on the WHERE */
165
table->covering_keys= table->getShare()->keys_in_use;
150
table->covering_keys= table->s->keys_in_use;
166
151
table->quick_keys.reset();
168
if (prepare_update(session, table_list, &conds, order_num, order))
170
DRIZZLE_UPDATE_DONE(1, 0, 0);
153
if (mysql_prepare_update(session, table_list, &conds, order_num, order))
174
156
old_covering_keys= table->covering_keys; // Keys used in WHERE
175
157
/* Check the fields we are going to modify */
176
158
if (setup_fields_with_no_wrap(session, 0, fields, MARK_COLUMNS_WRITE, 0, 0))
178
DRIZZLE_UPDATE_DONE(1, 0, 0);
159
goto abort; /* purecov: inspected */
182
160
if (table->timestamp_field)
184
162
// Don't set timestamp column if this is modified
185
163
if (table->timestamp_field->isWriteSet())
187
164
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
191
167
if (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
192
168
table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH)
194
table->setWriteSet(table->timestamp_field->position());
169
table->setWriteSet(table->timestamp_field->field_index);
199
173
if (setup_fields(session, 0, values, MARK_COLUMNS_READ, 0, 0))
201
175
free_underlaid_joins(session, select_lex);
202
DRIZZLE_UPDATE_DONE(1, 0, 0);
176
goto abort; /* purecov: inspected */
207
179
if (select_lex->inner_refs_list.elements &&
208
180
fix_inner_refs(session, all_fields, select_lex, select_lex->ref_pointer_array))
210
DRIZZLE_UPDATE_DONE(1, 0, 0);
182
DRIZZLE_UPDATE_END();
224
196
update force the table handler to retrieve write-only fields to be able
225
197
to compare records and detect data change.
227
if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ) &&
199
if (table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ &&
228
200
table->timestamp_field &&
229
201
(table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
230
202
table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
232
*table->read_set|= *table->write_set;
203
bitmap_union(table->read_set, table->write_set);
234
204
// Don't count on usage of 'only index' when calculating which key to use
235
205
table->covering_keys.reset();
237
/* Update the table->cursor->stats.records number */
238
table->cursor->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
207
/* Update the table->file->stats.records number */
208
table->file->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
240
select= optimizer::make_select(table, 0, 0, conds, 0, &error);
210
select= make_select(table, 0, 0, conds, 0, &error);
241
211
if (error || !limit ||
242
(select && select->check_quick(session, false, limit)))
212
(select && select->check_quick(session, safe_update, limit)))
246
* Resetting the Diagnostic area to prevent
249
session->main_da.reset_diagnostics_area();
250
215
free_underlaid_joins(session, select_lex);
251
if (error || session->is_error())
253
DRIZZLE_UPDATE_DONE(1, 0, 0);
256
DRIZZLE_UPDATE_DONE(0, 0, 0);
217
goto abort; // Error in where
218
DRIZZLE_UPDATE_END();
257
219
session->my_ok(); // No matching records
260
222
if (!select && limit != HA_POS_ERROR)
262
if ((used_index= optimizer::get_index_for_order(table, order, limit)) != MAX_KEY)
224
if ((used_index= get_index_for_order(table, order, limit)) != MAX_KEY)
263
225
need_sort= false;
265
227
/* If running in safe sql mode, don't allow updates without keys */
266
228
if (table->quick_keys.none())
268
230
session->server_status|=SERVER_QUERY_NO_INDEX_USED;
231
if (safe_update && !using_limit)
233
my_message(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE,
234
ER(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE), MYF(0));
271
239
table->mark_columns_needed_for_update();
365
334
if (used_index == MAX_KEY || (select && select->quick))
367
if ((error= info.init_read_record(session, table, select, 0, true)))
335
init_read_record(&info,session,table,select,0,1);
372
if ((error= info.init_read_record_idx(session, table, 1, used_index)))
337
init_read_record_idx(&info, session, table, 1, used_index);
376
339
session->set_proc_info("Searching rows for update");
377
340
ha_rows tmp_limit= limit;
379
while (not(error= info.read_record(&info)) && not session->getKilled())
342
while (!(error=info.read_record(&info)) && !session->killed)
381
344
if (!(select && select->skip_record()))
383
if (table->cursor->was_semi_consistent_read())
346
if (table->file->was_semi_consistent_read())
384
347
continue; /* repeat the read of the same row if it still exists */
386
table->cursor->position(table->getInsertRecord());
387
if (my_b_write(&tempfile,table->cursor->ref,
388
table->cursor->ref_length))
349
table->file->position(table->record[0]);
350
if (my_b_write(&tempfile,table->file->ref,
351
table->file->ref_length))
353
error=1; /* purecov: inspected */
354
break; /* purecov: inspected */
393
356
if (!--limit && using_limit)
400
table->cursor->unlock_row();
363
table->file->unlock_row();
402
if (session->getKilled() && not error)
365
if (session->killed && !error)
403
366
error= 1; // Aborted
404
367
limit= tmp_limit;
405
table->cursor->try_semi_consistent_read(0);
406
info.end_read_record();
368
table->file->try_semi_consistent_read(0);
369
end_read_record(&info);
408
371
/* Change select to use tempfile */
411
safe_delete(select->quick);
374
delete select->quick;
412
375
if (select->free_cond)
413
376
delete select->cond;
418
select= new optimizer::SqlSelect();
382
select= new SQL_SELECT;
419
383
select->head=table;
421
if (tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
423
// Read row ptrs from this cursor
424
memcpy(select->file, &tempfile, sizeof(tempfile));
385
if (reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
386
error=1; /* purecov: inspected */
387
select->file=tempfile; // Read row ptrs from this file
453
413
session->cuted_fields= 0L;
454
414
session->set_proc_info("Updating");
456
transactional_table= table->cursor->has_transactions();
457
session->setAbortOnWarning(test(!ignore));
416
transactional_table= table->file->has_transactions();
417
session->abort_on_warning= test(!ignore);
418
will_batch= !table->file->start_bulk_update();
460
421
Assure that we can use position()
461
422
if we need to create an error message.
463
if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ))
424
if (table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ)
464
425
table->prepare_for_position();
466
while (not (error=info.read_record(&info)) && not session->getKilled())
428
We can use compare_record() to optimize away updates if
429
the table handler is returning all columns OR if
430
all updated columns are read
432
can_compare_record= (!(table->file->ha_table_flags() &
433
HA_PARTIAL_COLUMN_READ) ||
434
bitmap_is_subset(table->write_set, table->read_set));
436
while (!(error=info.read_record(&info)) && !session->killed)
468
if (not (select && select->skip_record()))
438
if (!(select && select->skip_record()))
470
if (table->cursor->was_semi_consistent_read())
440
if (table->file->was_semi_consistent_read())
471
441
continue; /* repeat the read of the same row if it still exists */
473
443
table->storeRecord();
474
if (fill_record(session, fields, values))
444
if (fill_record(session, fields, values, 0))
445
break; /* purecov: inspected */
479
if (! table->records_are_comparable() || table->compare_records())
449
if (!can_compare_record || table->compare_record())
481
/* Non-batched update */
482
error= table->cursor->updateRecord(table->getUpdateRecord(),
483
table->getInsertRecord());
485
table->auto_increment_field_not_null= false;
454
Typically a batched handler can execute the batched jobs when:
455
1) When specifically told to do so
456
2) When it is not a good idea to batch anymore
457
3) When it is necessary to send batch for other reasons
458
(One such reason is when READ's must be performed)
460
1) is covered by exec_bulk_update calls.
461
2) and 3) are handled by the bulk_update_row method.
463
bulk_update_row can execute the updates including the one
464
defined in the bulk_update_row or not including the row
465
in the call. This is up to the handler implementation and can
466
vary from call to call.
468
The dup_key_found reports the number of duplicate keys found
469
in those updates actually executed. It only reports those if
470
the extra call with HA_EXTRA_IGNORE_DUP_KEY has been issued.
471
If this hasn't been issued it returns an error code and can
472
ignore this number. Thus any handler that implements batching
473
for UPDATE IGNORE must also handle this extra call properly.
475
If a duplicate key is found on the record included in this
476
call then it should be included in the count of dup_key_found
477
and error should be set to 0 (only if these errors are ignored).
479
error= table->file->ha_bulk_update_row(table->record[1],
482
limit+= dup_key_found;
483
updated-= dup_key_found;
487
/* Non-batched update */
488
error= table->file->ha_update_row(table->record[1],
487
491
if (!error || error == HA_ERR_RECORD_IS_THE_SAME)
489
493
if (error != HA_ERR_RECORD_IS_THE_SAME)
495
table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
499
table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
498
502
If (ignore && error is ignorable) we don't have to
499
503
do anything; otherwise...
503
if (table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
507
if (table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
504
508
flags|= ME_FATALERROR; /* Other handler errors are fatal */
506
510
prepare_record_for_error_message(error, table);
507
table->print_error(error,MYF(flags));
511
table->file->print_error(error,MYF(flags));
513
517
if (!--limit && using_limit)
515
error= -1; // Simulate end of cursor
520
We have reached end-of-file in most common situations where no
521
batching has occurred and if batching was supposed to occur but
522
no updates were made and finally when the batch execution was
523
performed without error and without finding any duplicate keys.
524
If the batched updates were performed with errors we need to
525
check and if no error but duplicate keys found we need to
526
continue since those are not counted for in limit.
529
((error= table->file->exec_bulk_update(&dup_key_found)) ||
534
/* purecov: begin inspected */
536
The handler should not report error of duplicate keys if they
537
are ignored. This is a requirement on batching handlers.
539
prepare_record_for_error_message(error, table);
540
table->file->print_error(error,MYF(0));
546
Either an error was found and we are ignoring errors or there
547
were duplicate keys found. In both cases we need to correct
548
the counters and continue the loop.
550
limit= dup_key_found; //limit is 0 when we get here so need to +
551
updated-= dup_key_found;
555
error= -1; // Simulate end of file
520
table->cursor->unlock_row();
561
table->file->unlock_row();
521
562
session->row_count++;
523
564
dup_key_found= 0;
529
570
It's assumed that if an error was set in combination with an effective
530
571
killed status then the error is due to killing.
532
killed_status= session->getKilled(); // get the status of the volatile
573
killed_status= session->killed; // get the status of the volatile
533
574
// simulated killing after the loop must be ineffective for binlogging
534
575
error= (killed_status == Session::NOT_KILLED)? error : 1;
536
updated-= dup_key_found;
537
table->cursor->try_semi_consistent_read(0);
579
(loc_error= table->file->exec_bulk_update(&dup_key_found)))
581
An error has occurred when a batched update was performed and returned
582
an error indication. It cannot be an allowed duplicate key error since
583
we require the batching handler to treat this as a normal behavior.
585
Otherwise we simply remove the number of duplicate keys records found
586
in the batched update.
589
/* purecov: begin inspected */
590
prepare_record_for_error_message(loc_error, table);
591
table->file->print_error(loc_error,MYF(ME_FATALERROR));
596
updated-= dup_key_found;
598
table->file->end_bulk_update();
599
table->file->try_semi_consistent_read(0);
539
601
if (!transactional_table && updated > 0)
540
session->transaction.stmt.markModifiedNonTransData();
602
session->transaction.stmt.modified_non_trans_table= true;
542
info.end_read_record();
604
end_read_record(&info);
544
606
session->set_proc_info("end");
545
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
607
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
548
610
error < 0 means really no error at all: we processed all rows until the
549
611
last one without error. error > 0 means an error (e.g. unique key
550
612
violation and no IGNORE or REPLACE). error == 0 is also an error (if
551
613
preparing the record or invoking before triggers fails). See
552
autocommitOrRollback(error>=0) and return(error>=0) below.
614
ha_autocommit_or_rollback(error>=0) and return(error>=0) below.
553
615
Sometimes we want to binlog even if we updated no rows, in case user used
554
616
it to be sure master and slave are in same state.
556
if ((error < 0) || session->transaction.stmt.hasModifiedNonTransData())
618
if ((error < 0) || session->transaction.stmt.modified_non_trans_table)
558
if (session->transaction.stmt.hasModifiedNonTransData())
559
session->transaction.all.markModifiedNonTransData();
620
if (session->transaction.stmt.modified_non_trans_table)
621
session->transaction.all.modified_non_trans_table= true;
561
assert(transactional_table || !updated || session->transaction.stmt.hasModifiedNonTransData());
623
assert(transactional_table || !updated || session->transaction.stmt.modified_non_trans_table);
562
624
free_underlaid_joins(session, select_lex);
564
626
/* If LAST_INSERT_ID(X) was used, report X */
565
627
id= session->arg_of_last_insert_id_function ?
566
628
session->first_successful_insert_id_in_prev_stmt : 0;
630
DRIZZLE_UPDATE_END();
570
633
char buff[STRING_BUFFER_USUAL_SIZE];
571
snprintf(buff, sizeof(buff), ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
634
sprintf(buff, ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
572
635
(ulong) session->cuted_fields);
573
636
session->row_count_func= updated;
575
* Resetting the Diagnostic area to prevent
578
session->main_da.reset_diagnostics_area();
579
session->my_ok((ulong) session->rowCount(), found, id, buff);
580
session->status_var.updated_row_count+= session->rowCount();
637
session->my_ok((ulong) session->row_count_func, found, id, buff);
582
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL; /* calc cuted fields */
583
session->setAbortOnWarning(false);
584
DRIZZLE_UPDATE_DONE((error >= 0 || session->is_error()), found, updated);
585
return ((error >= 0 || session->is_error()) ? 1 : 0);
639
session->count_cuted_fields= CHECK_FIELD_IGNORE; /* calc cuted fields */
640
session->abort_on_warning= 0;
641
return((error >= 0 || session->is_error()) ? 1 : 0);
589
table->print_error(error,MYF(0));
592
645
free_underlaid_joins(session, select_lex);
593
646
if (table->key_read)
595
648
table->key_read=0;
596
table->cursor->extra(HA_EXTRA_NO_KEYREAD);
649
table->file->extra(HA_EXTRA_NO_KEYREAD);
598
session->setAbortOnWarning(false);
651
session->abort_on_warning= 0;
600
DRIZZLE_UPDATE_DONE(1, 0, 0);
654
DRIZZLE_UPDATE_END();
605
659
Prepare items in UPDATE statement
662
mysql_prepare_update()
609
663
session - thread handler
610
664
table_list - global/local table list
611
665
conds - conditions
612
order_num - number of ORDER BY list entries
613
order - ORDER BY clause list
666
order_num - number of ORDER BY list entries
667
order - ORDER BY clause list
619
bool prepare_update(Session *session, TableList *table_list,
620
Item **conds, uint32_t order_num, Order *order)
673
bool mysql_prepare_update(Session *session, TableList *table_list,
674
Item **conds, uint32_t order_num, order_st *order)
622
676
List<Item> all_fields;
623
Select_Lex *select_lex= &session->getLex()->select_lex;
677
Select_Lex *select_lex= &session->lex->select_lex;
625
session->getLex()->allow_sum_func= 0;
679
session->lex->allow_sum_func= 0;
627
681
if (setup_tables_and_check_access(session, &select_lex->context,
628
682
&select_lex->top_join_list,