12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
18
18
Single table and multi table updates of tables.
19
Multi-table updates were introduced by Sinisa & Monty
21
#include "drizzled/server_includes.h"
23
22
#include "drizzled/sql_select.h"
24
23
#include "drizzled/error.h"
25
24
#include "drizzled/probes.h"
26
25
#include "drizzled/sql_base.h"
27
#include "drizzled/field/epoch.h"
26
#include "drizzled/field/timestamp.h"
28
27
#include "drizzled/sql_parse.h"
29
28
#include "drizzled/optimizer/range.h"
30
29
#include "drizzled/records.h"
31
#include "drizzled/internal/my_sys.h"
32
#include "drizzled/internal/iocache.h"
33
#include "drizzled/transaction_services.h"
34
#include "drizzled/filesort.h"
35
#include "drizzled/plugin/storage_engine.h"
37
#include <boost/dynamic_bitset.hpp>
40
33
using namespace std;
34
using namespace drizzled;
46
37
Re-read record if more columns are needed for error message.
48
39
If we got a duplicate key error, we want to write an error
49
40
message containing the value of the duplicate key. If we do not have
50
all fields of the key value in getInsertRecord(), we need to re-read the
41
all fields of the key value in record[0], we need to re-read the
51
42
record with a proper read_set.
53
44
@param[in] error error number
57
48
static void prepare_record_for_error_message(int error, Table *table)
59
Field **field_p= NULL;
53
MyBitmap unique_map; /* Fields in offended unique. */
54
my_bitmap_map unique_map_buf[bitmap_buffer_size(MAX_FIELDS)];
64
57
Only duplicate key errors print the key value.
65
58
If the storage engine always reads all columns, we already have the value.
67
60
if ((error != HA_ERR_FOUND_DUPP_KEY) ||
68
! (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ)))
61
!(table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ)))
78
71
/* Create unique_map with all fields used by that index. */
79
boost::dynamic_bitset<> unique_map(table->getShare()->sizeFields()); /* Fields in offended unique. */
80
table->mark_columns_used_by_index_no_reset(keynr, unique_map);
72
unique_map.init(unique_map_buf, table->s->fields);
73
table->mark_columns_used_by_index_no_reset(keynr, &unique_map);
82
75
/* Subtract read_set and write_set. */
83
unique_map-= *table->read_set;
84
unique_map-= *table->write_set;
76
bitmap_subtract(&unique_map, table->read_set);
77
bitmap_subtract(&unique_map, table->write_set);
87
80
If the unique index uses columns that are neither in read_set
88
81
nor in write_set, we must re-read the record.
89
82
Otherwise no need to do anything.
91
if (unique_map.none())
84
if (unique_map.isClearAll())
94
87
/* Get identifier of last read record into table->cursor->ref. */
95
table->cursor->position(table->getInsertRecord());
88
table->cursor->position(table->record[0]);
96
89
/* Add all fields used by unique index to read_set. */
97
*table->read_set|= unique_map;
90
bitmap_union(table->read_set, &unique_map);
98
91
/* Read record that is identified by table->cursor->ref. */
99
(void) table->cursor->rnd_pos(table->getUpdateRecord(), table->cursor->ref);
92
(void) table->cursor->rnd_pos(table->record[1], table->cursor->ref);
100
93
/* Copy the newly read columns into the new record. */
101
for (field_p= table->getFields(); (field= *field_p); field_p++)
103
if (unique_map.test(field->position()))
105
field->copy_from_tmp(table->getShare()->rec_buff_length);
94
for (field_p= table->field; (field= *field_p); field_p++)
95
if (unique_map.isBitSet(field->field_index))
96
field->copy_from_tmp(table->s->rec_buff_length);
114
103
Process usual UPDATE
118
107
session thread handler
119
108
fields fields for update
120
109
values values of fields for update
121
110
conds WHERE clause expression
122
order_num number of elements in ORDER BY clause
111
order_num number of elements in ORDER BY clause
123
112
order ORDER BY clause list
124
113
limit limit clause
125
114
handle_duplicates how to handle duplicates
132
int update_query(Session *session, TableList *table_list,
121
int mysql_update(Session *session, TableList *table_list,
133
122
List<Item> &fields, List<Item> &values, COND *conds,
134
uint32_t order_num, Order *order,
123
uint32_t order_num, order_st *order,
135
124
ha_rows limit, enum enum_duplicates,
138
127
bool using_limit= limit != HA_POS_ERROR;
139
128
bool used_key_is_modified;
140
129
bool transactional_table;
130
bool can_compare_record;
142
132
uint used_index= MAX_KEY, dup_key_found;
143
133
bool need_sort= true;
144
134
ha_rows updated, found;
145
135
key_map old_covering_keys;
147
137
optimizer::SqlSelect *select= NULL;
149
139
Select_Lex *select_lex= &session->lex->select_lex;
151
141
List<Item> all_fields;
152
Session::killed_state_t killed_status= Session::NOT_KILLED;
142
Session::killed_state killed_status= Session::NOT_KILLED;
154
DRIZZLE_UPDATE_START(session->getQueryString()->c_str());
144
DRIZZLE_UPDATE_START(session->query);
155
145
if (session->openTablesLock(table_list))
157
147
DRIZZLE_UPDATE_DONE(1, 0, 0);
162
152
table= table_list->table;
164
154
/* Calculate "table->covering_keys" based on the WHERE */
165
table->covering_keys= table->getShare()->keys_in_use;
155
table->covering_keys= table->s->keys_in_use;
166
156
table->quick_keys.reset();
168
if (prepare_update(session, table_list, &conds, order_num, order))
170
DRIZZLE_UPDATE_DONE(1, 0, 0);
158
if (mysql_prepare_update(session, table_list, &conds, order_num, order))
174
161
old_covering_keys= table->covering_keys; // Keys used in WHERE
175
162
/* Check the fields we are going to modify */
176
163
if (setup_fields_with_no_wrap(session, 0, fields, MARK_COLUMNS_WRITE, 0, 0))
178
DRIZZLE_UPDATE_DONE(1, 0, 0);
182
165
if (table->timestamp_field)
184
167
// Don't set timestamp column if this is modified
185
168
if (table->timestamp_field->isWriteSet())
187
169
table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
191
172
if (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
192
173
table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH)
194
table->setWriteSet(table->timestamp_field->position());
174
table->setWriteSet(table->timestamp_field->field_index);
199
178
if (setup_fields(session, 0, values, MARK_COLUMNS_READ, 0, 0))
201
180
free_underlaid_joins(session, select_lex);
202
DRIZZLE_UPDATE_DONE(1, 0, 0);
207
184
if (select_lex->inner_refs_list.elements &&
208
185
fix_inner_refs(session, all_fields, select_lex, select_lex->ref_pointer_array))
210
187
DRIZZLE_UPDATE_DONE(1, 0, 0);
228
205
table->timestamp_field &&
229
206
(table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
230
207
table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
232
*table->read_set|= *table->write_set;
208
bitmap_union(table->read_set, table->write_set);
234
209
// Don't count on usage of 'only index' when calculating which key to use
235
210
table->covering_keys.reset();
249
224
session->main_da.reset_diagnostics_area();
250
225
free_underlaid_joins(session, select_lex);
251
if (error || session->is_error())
253
DRIZZLE_UPDATE_DONE(1, 0, 0);
227
goto abort; // Error in where
256
228
DRIZZLE_UPDATE_DONE(0, 0, 0);
257
229
session->my_ok(); // No matching records
284
256
if (used_index == MAX_KEY) // no index for sort order
285
257
used_index= table->cursor->key_used_on_scan;
286
258
if (used_index != MAX_KEY)
287
used_key_is_modified= is_key_used(table, used_index, *table->write_set);
259
used_key_is_modified= is_key_used(table, used_index, table->write_set);
313
285
NOTE: filesort will call table->prepare_for_position()
315
287
uint32_t length= 0;
316
SortField *sortorder;
288
SORT_FIELD *sortorder;
317
289
ha_rows examined_rows;
318
FileSort filesort(*session);
320
table->sort.io_cache= new internal::IO_CACHE;
291
table->sort.io_cache = new IO_CACHE;
292
memset(table->sort.io_cache, 0, sizeof(IO_CACHE));
322
294
if (!(sortorder=make_unireg_sortorder(order, &length, NULL)) ||
323
(table->sort.found_records= filesort.run(table, sortorder, length,
325
examined_rows)) == HA_POS_ERROR)
295
(table->sort.found_records= filesort(session, table, sortorder, length,
341
314
update these in a separate loop based on the pointer.
344
internal::IO_CACHE tempfile;
345
if (tempfile.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
318
if (open_cached_file(&tempfile, drizzle_tmpdir,TEMP_PREFIX,
319
DISK_BUFFER_SIZE, MYF(MY_WME)))
350
322
/* If quick select is used, initialize it before retrieving rows. */
351
323
if (select && select->quick && select->quick->reset())
366
338
if (used_index == MAX_KEY || (select && select->quick))
368
if ((error= info.init_read_record(session, table, select, 0, true)))
339
init_read_record(&info,session,table,select,0,1);
373
if ((error= info.init_read_record_idx(session, table, 1, used_index)))
341
init_read_record_idx(&info, session, table, 1, used_index);
377
343
session->set_proc_info("Searching rows for update");
378
344
ha_rows tmp_limit= limit;
380
while (not(error= info.read_record(&info)) && not session->getKilled())
346
while (!(error=info.read_record(&info)) && !session->killed)
382
348
if (!(select && select->skip_record()))
384
350
if (table->cursor->was_semi_consistent_read())
385
351
continue; /* repeat the read of the same row if it still exists */
387
table->cursor->position(table->getInsertRecord());
353
table->cursor->position(table->record[0]);
388
354
if (my_b_write(&tempfile,table->cursor->ref,
389
355
table->cursor->ref_length))
401
367
table->cursor->unlock_row();
403
if (session->getKilled() && not error)
369
if (session->killed && !error)
404
370
error= 1; // Aborted
405
371
limit= tmp_limit;
406
372
table->cursor->try_semi_consistent_read(0);
407
info.end_read_record();
373
end_read_record(&info);
409
375
/* Change select to use tempfile */
420
select= new optimizer::SqlSelect();
386
select= new optimizer::SqlSelect;
421
387
select->head=table;
423
if (tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
389
if (reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
425
// Read row ptrs from this cursor
426
memcpy(select->file, &tempfile, sizeof(tempfile));
391
select->file= tempfile; // Read row ptrs from this cursor
437
402
if (select && select->quick && select->quick->reset())
439
404
table->cursor->try_semi_consistent_read(1);
440
if ((error= info.init_read_record(session, table, select, 0, true)))
405
init_read_record(&info,session,table,select,0,1);
445
407
updated= found= 0;
456
418
session->set_proc_info("Updating");
458
420
transactional_table= table->cursor->has_transactions();
459
session->setAbortOnWarning(test(!ignore));
421
session->abort_on_warning= test(!ignore);
462
424
Assure that we can use position()
465
427
if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ))
466
428
table->prepare_for_position();
468
while (not (error=info.read_record(&info)) && not session->getKilled())
431
We can use compare_record() to optimize away updates if
432
the table handler is returning all columns OR if
433
all updated columns are read
435
can_compare_record= (!(table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ)) ||
436
bitmap_is_subset(table->write_set, table->read_set));
438
while (!(error=info.read_record(&info)) && !session->killed)
470
if (not (select && select->skip_record()))
440
if (!(select && select->skip_record()))
472
442
if (table->cursor->was_semi_consistent_read())
473
443
continue; /* repeat the read of the same row if it still exists */
481
if (! table->records_are_comparable() || table->compare_records())
451
if (!can_compare_record || table->compare_record())
483
453
/* Non-batched update */
484
error= table->cursor->updateRecord(table->getUpdateRecord(),
485
table->getInsertRecord());
487
table->auto_increment_field_not_null= false;
454
error= table->cursor->ha_update_row(table->record[1],
489
456
if (!error || error == HA_ERR_RECORD_IS_THE_SAME)
491
458
if (error != HA_ERR_RECORD_IS_THE_SAME)
497
464
table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
500
467
If (ignore && error is ignorable) we don't have to
501
468
do anything; otherwise...
506
473
flags|= ME_FATALERROR; /* Other handler errors are fatal */
508
475
prepare_record_for_error_message(error, table);
509
table->print_error(error,MYF(flags));
476
table->print_error(error,MYF(flags));
515
482
if (!--limit && using_limit)
531
498
It's assumed that if an error was set in combination with an effective
532
499
killed status then the error is due to killing.
534
killed_status= session->getKilled(); // get the status of the volatile
501
killed_status= session->killed; // get the status of the volatile
535
502
// simulated killing after the loop must be ineffective for binlogging
536
503
error= (killed_status == Session::NOT_KILLED)? error : 1;
539
506
table->cursor->try_semi_consistent_read(0);
541
508
if (!transactional_table && updated > 0)
542
session->transaction.stmt.markModifiedNonTransData();
509
session->transaction.stmt.modified_non_trans_table= true;
544
info.end_read_record();
511
end_read_record(&info);
546
513
session->set_proc_info("end");
547
514
table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
551
518
last one without error. error > 0 means an error (e.g. unique key
552
519
violation and no IGNORE or REPLACE). error == 0 is also an error (if
553
520
preparing the record or invoking before triggers fails). See
554
autocommitOrRollback(error>=0) and return(error>=0) below.
521
ha_autocommit_or_rollback(error>=0) and return(error>=0) below.
555
522
Sometimes we want to binlog even if we updated no rows, in case user used
556
523
it to be sure master and slave are in same state.
558
if ((error < 0) || session->transaction.stmt.hasModifiedNonTransData())
525
if ((error < 0) || session->transaction.stmt.modified_non_trans_table)
560
if (session->transaction.stmt.hasModifiedNonTransData())
561
session->transaction.all.markModifiedNonTransData();
527
if (session->transaction.stmt.modified_non_trans_table)
528
session->transaction.all.modified_non_trans_table= true;
563
assert(transactional_table || !updated || session->transaction.stmt.hasModifiedNonTransData());
530
assert(transactional_table || !updated || session->transaction.stmt.modified_non_trans_table);
564
531
free_underlaid_joins(session, select_lex);
566
533
/* If LAST_INSERT_ID(X) was used, report X */
572
539
char buff[STRING_BUFFER_USUAL_SIZE];
573
snprintf(buff, sizeof(buff), ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
540
sprintf(buff, ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
574
541
(ulong) session->cuted_fields);
575
542
session->row_count_func= updated;
580
547
session->main_da.reset_diagnostics_area();
581
session->my_ok((ulong) session->rowCount(), found, id, buff);
582
session->status_var.updated_row_count+= session->rowCount();
548
session->my_ok((ulong) session->row_count_func, found, id, buff);
584
session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL; /* calc cuted fields */
585
session->setAbortOnWarning(false);
550
session->count_cuted_fields= CHECK_FIELD_IGNORE; /* calc cuted fields */
551
session->abort_on_warning= 0;
586
552
DRIZZLE_UPDATE_DONE((error >= 0 || session->is_error()), found, updated);
587
553
return ((error >= 0 || session->is_error()) ? 1 : 0);
591
table->print_error(error,MYF(0));
594
557
free_underlaid_joins(session, select_lex);
595
558
if (table->key_read)
597
560
table->key_read=0;
598
561
table->cursor->extra(HA_EXTRA_NO_KEYREAD);
600
session->setAbortOnWarning(false);
563
session->abort_on_warning= 0;
602
566
DRIZZLE_UPDATE_DONE(1, 0, 0);
607
571
Prepare items in UPDATE statement
574
mysql_prepare_update()
611
575
session - thread handler
612
576
table_list - global/local table list
613
577
conds - conditions
614
order_num - number of ORDER BY list entries
615
order - ORDER BY clause list
578
order_num - number of ORDER BY list entries
579
order - ORDER BY clause list
621
bool prepare_update(Session *session, TableList *table_list,
622
Item **conds, uint32_t order_num, Order *order)
585
bool mysql_prepare_update(Session *session, TableList *table_list,
586
Item **conds, uint32_t order_num, order_st *order)
624
588
List<Item> all_fields;
625
589
Select_Lex *select_lex= &session->lex->select_lex;
642
606
TableList *duplicate;
643
607
if ((duplicate= unique_table(table_list, table_list->next_global)))
645
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
609
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
653
} /* namespace drizzled */