~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_update.cc

  • Committer: Monty Taylor
  • Date: 2010-06-20 19:25:46 UTC
  • mto: (1626.1.2 build)
  • mto: This revision was merged to the branch mainline in revision 1633.
  • Revision ID: mordred@inaugust.com-20100620192546-vb496u81mnxepu9u
Replaced terrible casting with strdup+free.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/*
18
18
  Single table and multi table updates of tables.
 
19
  Multi-table updates were introduced by Sinisa & Monty
19
20
*/
20
 
 
21
 
#include <config.h>
22
 
 
23
 
#include <drizzled/sql_select.h>
24
 
#include <drizzled/error.h>
25
 
#include <drizzled/probes.h>
26
 
#include <drizzled/sql_base.h>
27
 
#include <drizzled/field/epoch.h>
28
 
#include <drizzled/sql_parse.h>
29
 
#include <drizzled/optimizer/range.h>
30
 
#include <drizzled/records.h>
31
 
#include <drizzled/internal/my_sys.h>
32
 
#include <drizzled/internal/iocache.h>
33
 
#include <drizzled/transaction_services.h>
34
 
#include <drizzled/filesort.h>
35
 
#include <drizzled/plugin/storage_engine.h>
36
 
 
37
 
#include <boost/dynamic_bitset.hpp>
 
21
#include "config.h"
 
22
#include "drizzled/sql_select.h"
 
23
#include "drizzled/error.h"
 
24
#include "drizzled/probes.h"
 
25
#include "drizzled/sql_base.h"
 
26
#include "drizzled/field/timestamp.h"
 
27
#include "drizzled/sql_parse.h"
 
28
#include "drizzled/optimizer/range.h"
 
29
#include "drizzled/records.h"
 
30
#include "drizzled/internal/my_sys.h"
 
31
#include "drizzled/internal/iocache.h"
 
32
 
38
33
#include <list>
39
34
 
40
35
using namespace std;
47
42
 
48
43
  If we got a duplicate key error, we want to write an error
49
44
  message containing the value of the duplicate key. If we do not have
50
 
  all fields of the key value in getInsertRecord(), we need to re-read the
 
45
  all fields of the key value in record[0], we need to re-read the
51
46
  record with a proper read_set.
52
47
 
53
48
  @param[in] error   error number
56
51
 
57
52
static void prepare_record_for_error_message(int error, Table *table)
58
53
{
59
 
  Field **field_p= NULL;
60
 
  Field *field= NULL;
61
 
  uint32_t keynr= 0;
 
54
  Field **field_p;
 
55
  Field *field;
 
56
  uint32_t keynr;
 
57
  MyBitmap unique_map; /* Fields in offended unique. */
 
58
  my_bitmap_map unique_map_buf[bitmap_buffer_size(MAX_FIELDS)];
62
59
 
63
60
  /*
64
61
    Only duplicate key errors print the key value.
65
62
    If storage engine does always read all columns, we have the value alraedy.
66
63
  */
67
64
  if ((error != HA_ERR_FOUND_DUPP_KEY) ||
68
 
      ! (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ)))
 
65
      !(table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ)))
69
66
    return;
70
67
 
71
68
  /*
76
73
    return;
77
74
 
78
75
  /* Create unique_map with all fields used by that index. */
79
 
  boost::dynamic_bitset<> unique_map(table->getShare()->sizeFields()); /* Fields in offended unique. */
80
 
  table->mark_columns_used_by_index_no_reset(keynr, unique_map);
 
76
  unique_map.init(unique_map_buf, table->getMutableShare()->sizeFields());
 
77
  table->mark_columns_used_by_index_no_reset(keynr, &unique_map);
81
78
 
82
79
  /* Subtract read_set and write_set. */
83
 
  unique_map-= *table->read_set;
84
 
  unique_map-= *table->write_set;
 
80
  bitmap_subtract(&unique_map, table->read_set);
 
81
  bitmap_subtract(&unique_map, table->write_set);
85
82
 
86
83
  /*
87
84
    If the unique index uses columns that are neither in read_set
88
85
    nor in write_set, we must re-read the record.
89
86
    Otherwise no need to do anything.
90
87
  */
91
 
  if (unique_map.none())
 
88
  if (unique_map.isClearAll())
92
89
    return;
93
90
 
94
91
  /* Get identifier of last read record into table->cursor->ref. */
95
 
  table->cursor->position(table->getInsertRecord());
 
92
  table->cursor->position(table->record[0]);
96
93
  /* Add all fields used by unique index to read_set. */
97
 
  *table->read_set|= unique_map;
 
94
  bitmap_union(table->read_set, &unique_map);
98
95
  /* Read record that is identified by table->cursor->ref. */
99
 
  (void) table->cursor->rnd_pos(table->getUpdateRecord(), table->cursor->ref);
 
96
  (void) table->cursor->rnd_pos(table->record[1], table->cursor->ref);
100
97
  /* Copy the newly read columns into the new record. */
101
98
  for (field_p= table->getFields(); (field= *field_p); field_p++)
102
99
  {
103
 
    if (unique_map.test(field->position()))
 
100
    if (unique_map.isBitSet(field->field_index))
104
101
    {
105
102
      field->copy_from_tmp(table->getShare()->rec_buff_length);
106
103
    }
107
104
  }
108
105
 
 
106
 
109
107
  return;
110
108
}
111
109
 
114
112
  Process usual UPDATE
115
113
 
116
114
  SYNOPSIS
117
 
    update_query()
 
115
    mysql_update()
118
116
    session                     thread handler
119
117
    fields              fields for update
120
118
    values              values of fields for update
129
127
    1  - error
130
128
*/
131
129
 
132
 
int update_query(Session *session, TableList *table_list,
 
130
int mysql_update(Session *session, TableList *table_list,
133
131
                 List<Item> &fields, List<Item> &values, COND *conds,
134
 
                 uint32_t order_num, Order *order,
 
132
                 uint32_t order_num, order_st *order,
135
133
                 ha_rows limit, enum enum_duplicates,
136
134
                 bool ignore)
137
135
{
138
136
  bool          using_limit= limit != HA_POS_ERROR;
139
137
  bool          used_key_is_modified;
140
138
  bool          transactional_table;
141
 
  int           error= 0;
 
139
  bool          can_compare_record;
 
140
  int           error;
142
141
  uint          used_index= MAX_KEY, dup_key_found;
143
142
  bool          need_sort= true;
144
143
  ha_rows       updated, found;
146
145
  Table         *table;
147
146
  optimizer::SqlSelect *select= NULL;
148
147
  ReadRecord    info;
149
 
  Select_Lex    *select_lex= &session->getLex()->select_lex;
 
148
  Select_Lex    *select_lex= &session->lex->select_lex;
150
149
  uint64_t     id;
151
150
  List<Item> all_fields;
152
 
  Session::killed_state_t killed_status= Session::NOT_KILLED;
 
151
  Session::killed_state killed_status= Session::NOT_KILLED;
153
152
 
154
 
  DRIZZLE_UPDATE_START(session->getQueryString()->c_str());
 
153
  DRIZZLE_UPDATE_START(session->query.c_str());
155
154
  if (session->openTablesLock(table_list))
156
155
  {
157
156
    DRIZZLE_UPDATE_DONE(1, 0, 0);
165
164
  table->covering_keys= table->getShare()->keys_in_use;
166
165
  table->quick_keys.reset();
167
166
 
168
 
  if (prepare_update(session, table_list, &conds, order_num, order))
169
 
  {
170
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
171
 
    return 1;
172
 
  }
 
167
  if (mysql_prepare_update(session, table_list, &conds, order_num, order))
 
168
    goto abort;
173
169
 
174
170
  old_covering_keys= table->covering_keys;              // Keys used in WHERE
175
171
  /* Check the fields we are going to modify */
176
172
  if (setup_fields_with_no_wrap(session, 0, fields, MARK_COLUMNS_WRITE, 0, 0))
177
 
  {
178
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
179
 
    return 1;
180
 
  }
181
 
 
 
173
    goto abort;
182
174
  if (table->timestamp_field)
183
175
  {
184
176
    // Don't set timestamp column if this is modified
191
183
      if (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
192
184
          table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH)
193
185
      {
194
 
        table->setWriteSet(table->timestamp_field->position());
 
186
        table->setWriteSet(table->timestamp_field->field_index);
195
187
      }
196
188
    }
197
189
  }
199
191
  if (setup_fields(session, 0, values, MARK_COLUMNS_READ, 0, 0))
200
192
  {
201
193
    free_underlaid_joins(session, select_lex);
202
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
203
 
 
204
 
    return 1;
 
194
    goto abort;
205
195
  }
206
196
 
207
197
  if (select_lex->inner_refs_list.elements &&
208
198
    fix_inner_refs(session, all_fields, select_lex, select_lex->ref_pointer_array))
209
199
  {
210
200
    DRIZZLE_UPDATE_DONE(1, 0, 0);
211
 
    return 1;
 
201
    return -1;
212
202
  }
213
203
 
214
204
  if (conds)
229
219
      (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
230
220
       table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
231
221
  {
232
 
    *table->read_set|= *table->write_set;
 
222
    bitmap_union(table->read_set, table->write_set);
233
223
  }
234
224
  // Don't count on usage of 'only index' when calculating which key to use
235
225
  table->covering_keys.reset();
248
238
     */
249
239
    session->main_da.reset_diagnostics_area();
250
240
    free_underlaid_joins(session, select_lex);
251
 
    if (error || session->is_error())
252
 
    {
253
 
      DRIZZLE_UPDATE_DONE(1, 0, 0);
254
 
      return 1;
255
 
    }
 
241
    if (error)
 
242
      goto abort;                               // Error in where
256
243
    DRIZZLE_UPDATE_DONE(0, 0, 0);
257
244
    session->my_ok();                           // No matching records
258
245
    return 0;
276
263
  {
277
264
    used_index= select->quick->index;
278
265
    used_key_is_modified= (!select->quick->unique_key_range() &&
279
 
                          select->quick->is_keys_used(*table->write_set));
 
266
                          select->quick->is_keys_used(table->write_set));
280
267
  }
281
268
  else
282
269
  {
284
271
    if (used_index == MAX_KEY)                  // no index for sort order
285
272
      used_index= table->cursor->key_used_on_scan;
286
273
    if (used_index != MAX_KEY)
287
 
      used_key_is_modified= is_key_used(table, used_index, *table->write_set);
 
274
      used_key_is_modified= is_key_used(table, used_index, table->write_set);
288
275
  }
289
276
 
290
277
 
313
300
        NOTE: filesort will call table->prepare_for_position()
314
301
      */
315
302
      uint32_t         length= 0;
316
 
      SortField  *sortorder;
 
303
      SORT_FIELD  *sortorder;
317
304
      ha_rows examined_rows;
318
 
      FileSort filesort(*session);
319
305
 
320
 
      table->sort.io_cache= new internal::IO_CACHE;
 
306
      table->sort.io_cache = new internal::IO_CACHE;
321
307
 
322
308
      if (!(sortorder=make_unireg_sortorder(order, &length, NULL)) ||
323
 
          (table->sort.found_records= filesort.run(table, sortorder, length,
324
 
                                                   select, limit, 1,
325
 
                                                   examined_rows)) == HA_POS_ERROR)
 
309
          (table->sort.found_records= filesort(session, table, sortorder, length,
 
310
                                               select, limit, 1,
 
311
                                               &examined_rows))
 
312
          == HA_POS_ERROR)
326
313
      {
327
314
        goto err;
328
315
      }
330
317
        Filesort has already found and selected the rows we want to update,
331
318
        so we don't need the where clause
332
319
      */
333
 
      safe_delete(select);
 
320
      delete select;
 
321
      select= 0;
334
322
    }
335
323
    else
336
324
    {
341
329
      */
342
330
 
343
331
      internal::IO_CACHE tempfile;
344
 
      if (tempfile.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
345
 
      {
 
332
      if (open_cached_file(&tempfile, drizzle_tmpdir.c_str(),TEMP_PREFIX,
 
333
                           DISK_BUFFER_SIZE, MYF(MY_WME)))
346
334
        goto err;
347
 
      }
348
335
 
349
336
      /* If quick select is used, initialize it before retrieving rows. */
350
337
      if (select && select->quick && select->quick->reset())
364
351
 
365
352
      if (used_index == MAX_KEY || (select && select->quick))
366
353
      {
367
 
        if ((error= info.init_read_record(session, table, select, 0, true)))
368
 
          goto err;
 
354
        info.init_read_record(session, table, select, 0, true);
369
355
      }
370
356
      else
371
357
      {
372
 
        if ((error= info.init_read_record_idx(session, table, 1, used_index)))
373
 
          goto err;
 
358
        info.init_read_record_idx(session, table, 1, used_index);
374
359
      }
375
360
 
376
361
      session->set_proc_info("Searching rows for update");
377
362
      ha_rows tmp_limit= limit;
378
363
 
379
 
      while (not(error= info.read_record(&info)) && not session->getKilled())
 
364
      while (!(error=info.read_record(&info)) && !session->killed)
380
365
      {
381
366
        if (!(select && select->skip_record()))
382
367
        {
383
368
          if (table->cursor->was_semi_consistent_read())
384
369
            continue;  /* repeat the read of the same row if it still exists */
385
370
 
386
 
          table->cursor->position(table->getInsertRecord());
 
371
          table->cursor->position(table->record[0]);
387
372
          if (my_b_write(&tempfile,table->cursor->ref,
388
373
                         table->cursor->ref_length))
389
374
          {
399
384
        else
400
385
          table->cursor->unlock_row();
401
386
      }
402
 
      if (session->getKilled() && not error)
 
387
      if (session->killed && !error)
403
388
        error= 1;                               // Aborted
404
389
      limit= tmp_limit;
405
390
      table->cursor->try_semi_consistent_read(0);
408
393
      /* Change select to use tempfile */
409
394
      if (select)
410
395
      {
411
 
        safe_delete(select->quick);
 
396
        delete select->quick;
412
397
        if (select->free_cond)
413
398
          delete select->cond;
 
399
        select->quick=0;
414
400
        select->cond=0;
415
401
      }
416
402
      else
417
403
      {
418
 
        select= new optimizer::SqlSelect();
 
404
        select= new optimizer::SqlSelect;
419
405
        select->head=table;
420
406
      }
421
 
      if (tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
 
407
      if (reinit_io_cache(&tempfile,internal::READ_CACHE,0L,0,0))
422
408
        error=1;
423
409
      // Read row ptrs from this cursor
424
410
      memcpy(select->file, &tempfile, sizeof(tempfile));
435
421
  if (select && select->quick && select->quick->reset())
436
422
    goto err;
437
423
  table->cursor->try_semi_consistent_read(1);
438
 
  if ((error= info.init_read_record(session, table, select, 0, true)))
439
 
  {
440
 
    goto err;
441
 
  }
 
424
  info.init_read_record(session, table, select, 0, true);
442
425
 
443
426
  updated= found= 0;
444
427
  /*
454
437
  session->set_proc_info("Updating");
455
438
 
456
439
  transactional_table= table->cursor->has_transactions();
457
 
  session->setAbortOnWarning(test(!ignore));
 
440
  session->abort_on_warning= test(!ignore);
458
441
 
459
442
  /*
460
443
    Assure that we can use position()
463
446
  if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ))
464
447
    table->prepare_for_position();
465
448
 
466
 
  while (not (error=info.read_record(&info)) && not session->getKilled())
 
449
  /*
 
450
    We can use compare_record() to optimize away updates if
 
451
    the table handler is returning all columns OR if
 
452
    if all updated columns are read
 
453
  */
 
454
  can_compare_record= (!(table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ)) ||
 
455
                       bitmap_is_subset(table->write_set, table->read_set));
 
456
 
 
457
  while (!(error=info.read_record(&info)) && !session->killed)
467
458
  {
468
 
    if (not (select && select->skip_record()))
 
459
    if (!(select && select->skip_record()))
469
460
    {
470
461
      if (table->cursor->was_semi_consistent_read())
471
462
        continue;  /* repeat the read of the same row if it still exists */
476
467
 
477
468
      found++;
478
469
 
479
 
      if (! table->records_are_comparable() || table->compare_records())
 
470
      if (!can_compare_record || table->compare_record())
480
471
      {
481
472
        /* Non-batched update */
482
 
        error= table->cursor->updateRecord(table->getUpdateRecord(),
483
 
                                            table->getInsertRecord());
 
473
        error= table->cursor->updateRecord(table->record[1],
 
474
                                            table->record[0]);
484
475
 
485
476
        table->auto_increment_field_not_null= false;
486
477
 
487
478
        if (!error || error == HA_ERR_RECORD_IS_THE_SAME)
488
 
        {
 
479
        {
489
480
          if (error != HA_ERR_RECORD_IS_THE_SAME)
490
481
            updated++;
491
482
          else
492
483
            error= 0;
493
 
        }
494
 
        else if (! ignore ||
 
484
        }
 
485
        else if (! ignore ||
495
486
                 table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
496
 
        {
 
487
        {
497
488
          /*
498
489
            If (ignore && error is ignorable) we don't have to
499
490
            do anything; otherwise...
504
495
            flags|= ME_FATALERROR; /* Other handler errors are fatal */
505
496
 
506
497
          prepare_record_for_error_message(error, table);
507
 
          table->print_error(error,MYF(flags));
508
 
          error= 1;
509
 
          break;
510
 
        }
 
498
          table->print_error(error,MYF(flags));
 
499
          error= 1;
 
500
          break;
 
501
        }
511
502
      }
512
503
 
513
504
      if (!--limit && using_limit)
529
520
    It's assumed that if an error was set in combination with an effective
530
521
    killed status then the error is due to killing.
531
522
  */
532
 
  killed_status= session->getKilled(); // get the status of the volatile
 
523
  killed_status= session->killed; // get the status of the volatile
533
524
  // simulated killing after the loop must be ineffective for binlogging
534
525
  error= (killed_status == Session::NOT_KILLED)?  error : 1;
535
526
 
576
567
     * lp bug# 439719
577
568
     */
578
569
    session->main_da.reset_diagnostics_area();
579
 
    session->my_ok((ulong) session->rowCount(), found, id, buff);
580
 
    session->status_var.updated_row_count+= session->rowCount();
 
570
    session->my_ok((ulong) session->row_count_func, found, id, buff);
581
571
  }
582
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;              /* calc cuted fields */
583
 
  session->setAbortOnWarning(false);
 
572
  session->count_cuted_fields= CHECK_FIELD_IGNORE;              /* calc cuted fields */
 
573
  session->abort_on_warning= 0;
584
574
  DRIZZLE_UPDATE_DONE((error >= 0 || session->is_error()), found, updated);
585
575
  return ((error >= 0 || session->is_error()) ? 1 : 0);
586
576
 
587
577
err:
588
 
  if (error != 0)
589
 
    table->print_error(error,MYF(0));
590
 
 
591
578
  delete select;
592
579
  free_underlaid_joins(session, select_lex);
593
580
  if (table->key_read)
595
582
    table->key_read=0;
596
583
    table->cursor->extra(HA_EXTRA_NO_KEYREAD);
597
584
  }
598
 
  session->setAbortOnWarning(false);
 
585
  session->abort_on_warning= 0;
599
586
 
 
587
abort:
600
588
  DRIZZLE_UPDATE_DONE(1, 0, 0);
601
589
  return 1;
602
590
}
605
593
  Prepare items in UPDATE statement
606
594
 
607
595
  SYNOPSIS
608
 
    prepare_update()
 
596
    mysql_prepare_update()
609
597
    session                     - thread handler
610
598
    table_list          - global/local table list
611
599
    conds               - conditions
616
604
    false OK
617
605
    true  error
618
606
*/
619
 
bool prepare_update(Session *session, TableList *table_list,
620
 
                         Item **conds, uint32_t order_num, Order *order)
 
607
bool mysql_prepare_update(Session *session, TableList *table_list,
 
608
                         Item **conds, uint32_t order_num, order_st *order)
621
609
{
622
610
  List<Item> all_fields;
623
 
  Select_Lex *select_lex= &session->getLex()->select_lex;
 
611
  Select_Lex *select_lex= &session->lex->select_lex;
624
612
 
625
 
  session->getLex()->allow_sum_func= 0;
 
613
  session->lex->allow_sum_func= 0;
626
614
 
627
615
  if (setup_tables_and_check_access(session, &select_lex->context,
628
616
                                    &select_lex->top_join_list,
640
628
    TableList *duplicate;
641
629
    if ((duplicate= unique_table(table_list, table_list->next_global)))
642
630
    {
643
 
      my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
 
631
      my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
644
632
      return true;
645
633
    }
646
634
  }