   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

/*
  Single-table and multi-table updates of tables.
  Multi-table updates were introduced by Sinisa & Monty.
*/
#include "drizzled/sql_select.h"
#include "drizzled/error.h"
#include "drizzled/probes.h"
#include "drizzled/sql_base.h"
#include "drizzled/field/timestamp.h"
#include "drizzled/sql_parse.h"
#include "drizzled/optimizer/range.h"
#include "drizzled/records.h"
#include "drizzled/internal/my_sys.h"
#include "drizzled/internal/iocache.h"
#include "drizzled/transaction_services.h"
#include "drizzled/filesort.h"

#include <boost/dynamic_bitset.hpp>

using namespace std;

/**
  Re-read record if more columns are needed for error message.

  If we got a duplicate key error, we want to write an error
  message containing the value of the duplicate key. If we do not have
  all fields of the key value in getInsertRecord(), we need to re-read the
  record with a proper read_set.

  @param[in] error   error number
*/
static void prepare_record_for_error_message(int error, Table *table)
{
  Field **field_p= NULL;
  Field *field= NULL;
  uint32_t keynr= 0;

  /*
    Only duplicate key errors print the key value.
    If the storage engine always reads all columns, we have the value already.
  */
  if ((error != HA_ERR_FOUND_DUPP_KEY) ||
      ! (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ)))
    return;

  /*
    Get the number of the offended index.
    We will see MAX_KEY if the engine cannot determine the affected index.
  */
  if ((keynr= table->get_dup_key(error)) >= MAX_KEY)
    return;

  /* Create unique_map with all fields used by that index. */
  boost::dynamic_bitset<> unique_map(table->getShare()->sizeFields()); /* Fields in offended unique. */
  table->mark_columns_used_by_index_no_reset(keynr, unique_map);

  /* Subtract read_set and write_set. */
  unique_map-= *table->read_set;
  unique_map-= *table->write_set;
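
  /*
    At this point unique_map holds exactly those columns of the offended
    unique key whose current values were neither read nor written by this
    statement, i.e. the columns that still have to be fetched before the
    key value can be printed.
  */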

  /*
    If the unique index uses columns that are neither in read_set
    nor in write_set, we must re-read the record.
    Otherwise there is no need to do anything.
  */
  if (unique_map.none())
    return;

  /* Get identifier of last read record into table->cursor->ref. */
  table->cursor->position(table->getInsertRecord());
  /* Add all fields used by unique index to read_set. */
  *table->read_set|= unique_map;
  /* Read the record that is identified by table->cursor->ref. */
  (void) table->cursor->rnd_pos(table->getUpdateRecord(), table->cursor->ref);

  /* Copy the newly read columns into the new record. */
  for (field_p= table->getFields(); (field= *field_p); field_p++)
  {
    if (unique_map.test(field->position()))
      field->copy_from_tmp(table->getShare()->rec_buff_length);
  }

  return;
}
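
/*
  A hypothetical scenario (not from the original source): with
  CREATE TABLE t (a INT, b INT, c INT, UNIQUE KEY (a, b)) and the statement
  UPDATE t SET c= c + 1, an engine flagged HTON_BIT_PARTIAL_COLUMN_READ may
  have fetched only column c. If the update then hits a duplicate key on
  (a, b), the function above re-reads a and b so the error message can show
  the offending key value.
*/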


int mysql_update(Session *session, TableList *table_list,
                 List<Item> &fields, List<Item> &values, COND *conds,
                 uint32_t order_num, Order *order,
                 ha_rows limit, enum enum_duplicates, bool ignore)
{
  bool using_limit= limit != HA_POS_ERROR;
  bool used_key_is_modified;
  bool transactional_table;
  int error;
  uint used_index= MAX_KEY, dup_key_found;
  bool need_sort= true;
  ha_rows updated, found;
  key_map old_covering_keys;
  Table *table;
  optimizer::SqlSelect *select= NULL;
  ReadRecord info;
  Select_Lex *select_lex= &session->lex->select_lex;
  uint64_t id;
  List<Item> all_fields;
  Session::killed_state_t killed_status= Session::NOT_KILLED;

  DRIZZLE_UPDATE_START(session->getQueryString()->c_str());

  if (session->openTablesLock(table_list))
  {
    DRIZZLE_UPDATE_DONE(1, 0, 0);
    return 1;
  }

  table= table_list->table;

  /* Calculate "table->covering_keys" based on the WHERE */
  table->covering_keys= table->getShare()->keys_in_use;
  table->quick_keys.reset();

  if (mysql_prepare_update(session, table_list, &conds, order_num, order))
  {
    DRIZZLE_UPDATE_DONE(1, 0, 0);
    goto abort;
  }

  old_covering_keys= table->covering_keys; // Keys used in WHERE

  /* Check the fields we are going to modify */
  if (setup_fields_with_no_wrap(session, 0, fields, MARK_COLUMNS_WRITE, 0, 0))
  {
    DRIZZLE_UPDATE_DONE(1, 0, 0);
    goto abort;
  }

  if (table->timestamp_field)
  {
    // Don't auto-set the timestamp column if the statement modifies it explicitly
    if (table->timestamp_field->isWriteSet())
    {
      table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
    }
    else
    {
      if (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
          table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH)
      {
        table->setWriteSet(table->timestamp_field->position());
      }
    }
  }
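
  /*
    A hypothetical illustration: for a table with a column
    "ts TIMESTAMP ... ON UPDATE CURRENT_TIMESTAMP", the statement
    "UPDATE t SET c= 1" must add ts to the write set so the auto-generated
    value can be stored, whereas "UPDATE t SET ts= NOW(), c= 1" sets
    TIMESTAMP_NO_AUTO_SET above so the explicit value is not overwritten.
  */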

  if (setup_fields(session, 0, values, MARK_COLUMNS_READ, 0, 0))
  {
    free_underlaid_joins(session, select_lex);
    DRIZZLE_UPDATE_DONE(1, 0, 0);
    goto abort;
  }

  if (select_lex->inner_refs_list.elements &&
      fix_inner_refs(session, all_fields, select_lex, select_lex->ref_pointer_array))
  {
    DRIZZLE_UPDATE_DONE(1, 0, 0);
    goto abort;
  }

  /*
    If a timestamp field that is auto-set on UPDATE is present, force the
    table handler to retrieve the write-only fields so that records can be
    compared and data changes detected.
  */
  if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ) &&
      table->timestamp_field &&
      (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
       table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
  {
    *table->read_set|= *table->write_set;
  }

  // Don't count on usage of 'only index' when calculating which key to use
  table->covering_keys.reset();

  /* Update the table->cursor->stats.records number */
  table->cursor->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);

  select= optimizer::make_select(table, 0, 0, conds, 0, &error);
  if (error || !limit ||
      (select && select->check_quick(session, false, limit)))
  {
    delete select;
    /* Resetting the Diagnostic area to prevent ... */
    session->main_da.reset_diagnostics_area();
    free_underlaid_joins(session, select_lex);
    if (error)
    {
      DRIZZLE_UPDATE_DONE(1, 0, 0);
      goto abort; // Error in where
    }
    DRIZZLE_UPDATE_DONE(0, 0, 0);
    session->my_ok(); // No matching records
    return 0;
  }

  if (!select && limit != HA_POS_ERROR)
  {
    if ((used_index= optimizer::get_index_for_order(table, order, limit)) != MAX_KEY)
      need_sort= false;
  }

  /* If running in safe sql mode, don't allow updates without keys */
  if (table->quick_keys.none())
  {
    session->server_status|= SERVER_QUERY_NO_INDEX_USED;
  }

  table->mark_columns_needed_for_update();

  if (used_index == MAX_KEY || (select && select->quick))
  {
    info.init_read_record(session, table, select, 0, true);
  }
  else
  {
    info.init_read_record_idx(session, table, 1, used_index);
  }
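
  /*
    Row retrieval strategy: with a quick (range) select or a full scan, rows
    come back in whatever order init_read_record() produces and may still
    need a sort; when an index was chosen solely to satisfy the requested
    ordering, init_read_record_idx() returns rows in index order and
    need_sort stays false.
  */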

  session->set_proc_info("Searching rows for update");
  ha_rows tmp_limit= limit;

  while (not (error= info.read_record(&info)) && not session->getKilled())
  {
    if (!(select && select->skip_record()))
    {
      if (table->cursor->was_semi_consistent_read())
        continue; /* repeat the read of the same row if it still exists */

      table->cursor->position(table->getInsertRecord());
      if (my_b_write(&tempfile, table->cursor->ref,
                     table->cursor->ref_length))
      {
        error= 1;
        break;
      }
    }
  }
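
  /*
    This first pass only collects the identifiers (cursor->ref) of the
    matching rows into tempfile. Buffering the row ids before changing
    anything makes it safe to update columns of the key being scanned:
    the scan order cannot be disturbed by the updates themselves.
  */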

  session->cuted_fields= 0L;
  session->set_proc_info("Updating");

  transactional_table= table->cursor->has_transactions();
  session->abort_on_warning= test(!ignore);

  /*
    Assure that we can use position()
    if we need to create an error message.
  */
  if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ))
    table->prepare_for_position();

  while (not (error= info.read_record(&info)) && not session->getKilled())
  {
    if (not (select && select->skip_record()))
    {
      if (table->cursor->was_semi_consistent_read())
        continue; /* repeat the read of the same row if it still exists */

      table->storeRecord();
      if (fill_record(session, fields, values))
        break;

      found++;
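
      /*
        compare_records() can optimize away the write only when the old and
        new row images are comparable: the engine returned all columns, or
        every updated column was also read. With partial column reads the
        update below is performed unconditionally.
      */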
      if (! table->records_are_comparable() || table->compare_records())
      {
        /* Non-batched update */
        error= table->cursor->updateRecord(table->getUpdateRecord(),
                                           table->getInsertRecord());

        table->auto_increment_field_not_null= false;
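
        /*
          HA_ERR_RECORD_IS_THE_SAME is not a real failure: the engine merely
          noticed that the new row image is identical to the old one, so the
          row is treated as successfully processed without counting it as
          updated.
        */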
        if (!error || error == HA_ERR_RECORD_IS_THE_SAME)
        {
          if (error != HA_ERR_RECORD_IS_THE_SAME)
            updated++;
          else
            error= 0;
        }
        else if (! ignore ||
                 table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
        {
          /*
            If (ignore && error is ignorable) we don't have to
            do anything; otherwise...
          */
          myf flags= 0;

          if (table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
            flags|= ME_FATALERROR; /* Other handler errors are fatal */

          prepare_record_for_error_message(error, table);
          table->print_error(error, MYF(flags));
          error= 1;
          break;
        }
      }

      if (!--limit && using_limit)
      {
        error= -1; // Simulate end of cursor
        break;
      }
    }
    else
    {
      table->cursor->unlock_row();
    }
    session->row_count++;
  }

  dup_key_found= 0;
  /*
    It's assumed that if an error was set in combination with an effective
    killed status then the error is due to killing.
  */
  killed_status= session->getKilled(); // get the status of the volatile
  // simulated killing after the loop must be ineffective for binlogging
  error= (killed_status == Session::NOT_KILLED) ? error : 1;

  updated-= dup_key_found;
  table->cursor->try_semi_consistent_read(0);
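
  /*
    Semi-consistent reads let the engine hand back the last committed version
    of a row that does not match the WHERE clause instead of waiting for a
    row lock; a row read that way is re-read under a lock (the
    was_semi_consistent_read() checks above) before being updated. The call
    above switches that mode off again now that the scan is finished.
  */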

  if (!transactional_table && updated > 0)
    session->transaction.stmt.markModifiedNonTransData();

  info.end_read_record();

  session->set_proc_info("end");
  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);

  /*
    error < 0 means really no error at all: we processed all rows until the
    last one without error. error > 0 means an error (e.g. unique key
    violation and no IGNORE or REPLACE). error == 0 is also an error (if
    preparing the record or invoking before triggers fails). See
    autocommitOrRollback(error>=0) and return(error>=0) below.
    Sometimes we want to binlog even if we updated no rows, in case the user
    used it to be sure master and slave are in the same state.
  */
  if ((error < 0) || session->transaction.stmt.hasModifiedNonTransData())
  {
    if (session->transaction.stmt.hasModifiedNonTransData())
      session->transaction.all.markModifiedNonTransData();
  }
  assert(transactional_table || !updated || session->transaction.stmt.hasModifiedNonTransData());

  free_underlaid_joins(session, select_lex);

  /* If LAST_INSERT_ID(X) was used, report X */
  char buff[STRING_BUFFER_USUAL_SIZE];
  snprintf(buff, sizeof(buff), ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
           (ulong) session->cuted_fields);
  session->row_count_func= updated;
  /* Resetting the Diagnostic area to prevent ... */
  session->main_da.reset_diagnostics_area();
  session->my_ok((ulong) session->row_count_func, found, id, buff);
  session->status_var.updated_row_count+= session->row_count_func;
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL; /* calc cuted fields */
  session->abort_on_warning= 0;
  DRIZZLE_UPDATE_DONE((error >= 0 || session->is_error()), found, updated);
  return ((error >= 0 || session->is_error()) ? 1 : 0);