~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_update.cc

  • Committer: Monty Taylor
  • Date: 2008-11-16 23:47:43 UTC
  • mto: (584.1.10 devel)
  • mto: This revision was merged to the branch mainline in revision 589.
  • Revision ID: monty@inaugust.com-20081116234743-c38gmv0pa2kdefaj
BrokeĀ outĀ cached_item.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
  Single table and multi table updates of tables.
19
19
  Multi-table updates were introduced by Sinisa & Monty
20
20
*/
21
 
#include "config.h"
22
 
#include "drizzled/sql_select.h"
23
 
#include "drizzled/error.h"
24
 
#include "drizzled/probes.h"
25
 
#include "drizzled/sql_base.h"
26
 
#include "drizzled/field/timestamp.h"
27
 
#include "drizzled/sql_parse.h"
28
 
#include "drizzled/optimizer/range.h"
29
 
#include "drizzled/records.h"
30
 
#include "drizzled/internal/my_sys.h"
31
 
#include "drizzled/internal/iocache.h"
32
 
 
33
 
#include <list>
34
 
 
35
 
using namespace std;
36
 
 
37
 
namespace drizzled
 
21
#include <drizzled/server_includes.h>
 
22
#include <drizzled/sql_select.h>
 
23
#include <drizzled/error.h>
 
24
#include <drizzled/probes.h>
 
25
 
 
26
/*
 
27
  check that all fields are real fields
 
28
 
 
29
  SYNOPSIS
 
30
    check_fields()
 
31
    session             thread handler
 
32
    items           Items for check
 
33
 
 
34
  RETURN
 
35
    true  Items can't be used in UPDATE
 
36
    false Items are OK
 
37
*/
 
38
 
 
39
static bool check_fields(Session *session, List<Item> &items)
38
40
{
 
41
  List_iterator<Item> it(items);
 
42
  Item *item;
 
43
  Item_field *field;
 
44
 
 
45
  while ((item= it++))
 
46
  {
 
47
    if (!(field= item->filed_for_view_update()))
 
48
    {
 
49
      /* item has name, because it comes from VIEW SELECT list */
 
50
      my_error(ER_NONUPDATEABLE_COLUMN, MYF(0), item->name);
 
51
      return true;
 
52
    }
 
53
    /*
 
54
      we make temporary copy of Item_field, to avoid influence of changing
 
55
      result_field on Item_ref which refer on this field
 
56
    */
 
57
    session->change_item_tree(it.ref(), new Item_field(session, field));
 
58
  }
 
59
  return false;
 
60
}
 
61
 
39
62
 
40
63
/**
41
64
  Re-read record if more columns are needed for error message.
54
77
  Field **field_p;
55
78
  Field *field;
56
79
  uint32_t keynr;
57
 
  MyBitmap unique_map; /* Fields in offended unique. */
 
80
  MY_BITMAP unique_map; /* Fields in offended unique. */
58
81
  my_bitmap_map unique_map_buf[bitmap_buffer_size(MAX_FIELDS)];
59
 
 
 
82
  
60
83
  /*
61
84
    Only duplicate key errors print the key value.
62
85
    If storage engine does always read all columns, we have the value alraedy.
63
86
  */
64
87
  if ((error != HA_ERR_FOUND_DUPP_KEY) ||
65
 
      !(table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ)))
 
88
      !(table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ))
66
89
    return;
67
90
 
68
91
  /*
69
92
    Get the number of the offended index.
70
93
    We will see MAX_KEY if the engine cannot determine the affected index.
71
94
  */
72
 
  if ((keynr= table->get_dup_key(error)) >= MAX_KEY)
 
95
  if ((keynr= table->file->get_dup_key(error)) >= MAX_KEY)
73
96
    return;
74
97
 
75
98
  /* Create unique_map with all fields used by that index. */
76
 
  unique_map.init(unique_map_buf, table->s->fields);
 
99
  bitmap_init(&unique_map, unique_map_buf, table->s->fields, false);
77
100
  table->mark_columns_used_by_index_no_reset(keynr, &unique_map);
78
101
 
79
102
  /* Subtract read_set and write_set. */
85
108
    nor in write_set, we must re-read the record.
86
109
    Otherwise no need to do anything.
87
110
  */
88
 
  if (unique_map.isClearAll())
 
111
  if (bitmap_is_clear_all(&unique_map))
89
112
    return;
90
113
 
91
 
  /* Get identifier of last read record into table->cursor->ref. */
92
 
  table->cursor->position(table->record[0]);
 
114
  /* Get identifier of last read record into table->file->ref. */
 
115
  table->file->position(table->record[0]);
93
116
  /* Add all fields used by unique index to read_set. */
94
117
  bitmap_union(table->read_set, &unique_map);
95
 
  /* Read record that is identified by table->cursor->ref. */
96
 
  (void) table->cursor->rnd_pos(table->record[1], table->cursor->ref);
 
118
  /* Tell the engine about the new set. */
 
119
  table->file->column_bitmaps_signal();
 
120
  /* Read record that is identified by table->file->ref. */
 
121
  (void) table->file->rnd_pos(table->record[1], table->file->ref);
97
122
  /* Copy the newly read columns into the new record. */
98
123
  for (field_p= table->field; (field= *field_p); field_p++)
99
 
    if (unique_map.isBitSet(field->field_index))
 
124
    if (bitmap_is_set(&unique_map, field->field_index))
100
125
      field->copy_from_tmp(table->s->rec_buff_length);
101
126
 
102
127
  return;
112
137
    fields              fields for update
113
138
    values              values of fields for update
114
139
    conds               WHERE clause expression
115
 
    order_num           number of elemen in ORDER BY clause
 
140
    order_num           number of elemen in order_st BY clause
116
141
    order               order_st BY clause list
117
142
    limit               limit clause
118
143
    handle_duplicates   how to handle duplicates
119
144
 
120
145
  RETURN
121
146
    0  - OK
 
147
    2  - privilege check and openning table passed, but we need to convert to
 
148
         multi-update because of view substitution
122
149
    1  - error
123
150
*/
124
151
 
129
156
                 bool ignore)
130
157
{
131
158
  bool          using_limit= limit != HA_POS_ERROR;
132
 
  bool          used_key_is_modified;
133
 
  bool          transactional_table;
 
159
  bool          safe_update= test(session->options & OPTION_SAFE_UPDATES);
 
160
  bool          used_key_is_modified, transactional_table, will_batch;
134
161
  bool          can_compare_record;
135
 
  int           error;
 
162
  int           error, loc_error;
136
163
  uint          used_index= MAX_KEY, dup_key_found;
137
164
  bool          need_sort= true;
 
165
  uint32_t          table_count= 0;
138
166
  ha_rows       updated, found;
139
167
  key_map       old_covering_keys;
140
168
  Table         *table;
141
 
  optimizer::SqlSelect *select= NULL;
 
169
  SQL_SELECT    *select;
142
170
  READ_RECORD   info;
143
 
  Select_Lex    *select_lex= &session->lex->select_lex;
 
171
  SELECT_LEX    *select_lex= &session->lex->select_lex;
 
172
  bool          need_reopen;
144
173
  uint64_t     id;
145
174
  List<Item> all_fields;
146
175
  Session::killed_state killed_status= Session::NOT_KILLED;
147
 
 
148
 
  DRIZZLE_UPDATE_START(session->query.c_str());
149
 
  if (session->openTablesLock(table_list))
 
176
  
 
177
  for ( ; ; )
150
178
  {
151
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
152
 
    return 1;
 
179
    if (open_tables(session, &table_list, &table_count, 0))
 
180
      return(1);
 
181
 
 
182
    if (!lock_tables(session, table_list, table_count, &need_reopen))
 
183
      break;
 
184
    if (!need_reopen)
 
185
      return(1);
 
186
    close_tables_for_reopen(session, &table_list);
153
187
  }
154
188
 
 
189
  if (mysql_handle_derived(session->lex, &mysql_derived_prepare) ||
 
190
      (session->fill_derived_tables() &&
 
191
       mysql_handle_derived(session->lex, &mysql_derived_filling)))
 
192
    return(1);
 
193
 
 
194
  DRIZZLE_UPDATE_START();
155
195
  session->set_proc_info("init");
156
196
  table= table_list->table;
157
197
 
158
198
  /* Calculate "table->covering_keys" based on the WHERE */
159
199
  table->covering_keys= table->s->keys_in_use;
160
 
  table->quick_keys.reset();
 
200
  table->quick_keys.clear_all();
161
201
 
162
202
  if (mysql_prepare_update(session, table_list, &conds, order_num, order))
163
203
    goto abort;
165
205
  old_covering_keys= table->covering_keys;              // Keys used in WHERE
166
206
  /* Check the fields we are going to modify */
167
207
  if (setup_fields_with_no_wrap(session, 0, fields, MARK_COLUMNS_WRITE, 0, 0))
168
 
    goto abort;
 
208
    goto abort;                               /* purecov: inspected */
169
209
  if (table->timestamp_field)
170
210
  {
171
211
    // Don't set timestamp column if this is modified
172
 
    if (table->timestamp_field->isWriteSet())
 
212
    if (bitmap_is_set(table->write_set,
 
213
                      table->timestamp_field->field_index))
173
214
      table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
174
215
    else
175
216
    {
176
217
      if (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
177
218
          table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH)
178
 
        table->setWriteSet(table->timestamp_field->field_index);
 
219
        bitmap_set_bit(table->write_set,
 
220
                       table->timestamp_field->field_index);
179
221
    }
180
222
  }
181
223
 
182
224
  if (setup_fields(session, 0, values, MARK_COLUMNS_READ, 0, 0))
183
225
  {
184
226
    free_underlaid_joins(session, select_lex);
185
 
    goto abort;
 
227
    goto abort;                               /* purecov: inspected */
186
228
  }
187
229
 
188
230
  if (select_lex->inner_refs_list.elements &&
189
231
    fix_inner_refs(session, all_fields, select_lex, select_lex->ref_pointer_array))
190
232
  {
191
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
192
 
    return -1;
 
233
    DRIZZLE_UPDATE_END();
 
234
    return(-1);
193
235
  }
194
236
 
195
237
  if (conds)
205
247
    update force the table handler to retrieve write-only fields to be able
206
248
    to compare records and detect data change.
207
249
  */
208
 
  if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ) &&
 
250
  if (table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ &&
209
251
      table->timestamp_field &&
210
252
      (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
211
253
       table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
212
254
    bitmap_union(table->read_set, table->write_set);
213
255
  // Don't count on usage of 'only index' when calculating which key to use
214
 
  table->covering_keys.reset();
215
 
 
216
 
  /* Update the table->cursor->stats.records number */
217
 
  table->cursor->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
218
 
 
219
 
  select= optimizer::make_select(table, 0, 0, conds, 0, &error);
 
256
  table->covering_keys.clear_all();
 
257
 
 
258
  /* Update the table->file->stats.records number */
 
259
  table->file->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
 
260
 
 
261
  select= make_select(table, 0, 0, conds, 0, &error);
220
262
  if (error || !limit ||
221
 
      (select && select->check_quick(session, false, limit)))
 
263
      (select && select->check_quick(session, safe_update, limit)))
222
264
  {
223
265
    delete select;
224
 
    /**
225
 
     * Resetting the Diagnostic area to prevent
226
 
     * lp bug# 439719
227
 
     */
228
 
    session->main_da.reset_diagnostics_area();
229
266
    free_underlaid_joins(session, select_lex);
230
267
    if (error)
231
268
      goto abort;                               // Error in where
232
 
    DRIZZLE_UPDATE_DONE(0, 0, 0);
233
 
    session->my_ok();                           // No matching records
234
 
    return 0;
 
269
    DRIZZLE_UPDATE_END();
 
270
    my_ok(session);                             // No matching records
 
271
    return(0);
235
272
  }
236
273
  if (!select && limit != HA_POS_ERROR)
237
274
  {
238
 
    if ((used_index= optimizer::get_index_for_order(table, order, limit)) != MAX_KEY)
 
275
    if ((used_index= get_index_for_order(table, order, limit)) != MAX_KEY)
239
276
      need_sort= false;
240
277
  }
241
278
  /* If running in safe sql mode, don't allow updates without keys */
242
 
  if (table->quick_keys.none())
 
279
  if (table->quick_keys.is_clear_all())
243
280
  {
244
281
    session->server_status|=SERVER_QUERY_NO_INDEX_USED;
 
282
    if (safe_update && !using_limit)
 
283
    {
 
284
      my_message(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE,
 
285
                 ER(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE), MYF(0));
 
286
      goto err;
 
287
    }
245
288
  }
246
289
 
247
290
  table->mark_columns_needed_for_update();
248
291
 
249
292
  /* Check if we are modifying a key that we are used to search with */
250
 
 
 
293
  
251
294
  if (select && select->quick)
252
295
  {
253
296
    used_index= select->quick->index;
258
301
  {
259
302
    used_key_is_modified= 0;
260
303
    if (used_index == MAX_KEY)                  // no index for sort order
261
 
      used_index= table->cursor->key_used_on_scan;
 
304
      used_index= table->file->key_used_on_scan;
262
305
    if (used_index != MAX_KEY)
263
306
      used_key_is_modified= is_key_used(table, used_index, table->write_set);
264
307
  }
270
313
      We can't update table directly;  We must first search after all
271
314
      matching rows before updating the table!
272
315
    */
273
 
    if (used_index < MAX_KEY && old_covering_keys.test(used_index))
 
316
    if (used_index < MAX_KEY && old_covering_keys.is_set(used_index))
274
317
    {
275
318
      table->key_read=1;
276
319
      table->mark_columns_used_by_index(used_index);
292
335
      SORT_FIELD  *sortorder;
293
336
      ha_rows examined_rows;
294
337
 
295
 
      table->sort.io_cache = new internal::IO_CACHE;
296
 
      memset(table->sort.io_cache, 0, sizeof(internal::IO_CACHE));
297
 
 
 
338
      table->sort.io_cache = (IO_CACHE *) my_malloc(sizeof(IO_CACHE),
 
339
                                                    MYF(MY_FAE | MY_ZEROFILL));
298
340
      if (!(sortorder=make_unireg_sortorder(order, &length, NULL)) ||
299
341
          (table->sort.found_records= filesort(session, table, sortorder, length,
300
342
                                               select, limit, 1,
318
360
        update these in a separate loop based on the pointer.
319
361
      */
320
362
 
321
 
      internal::IO_CACHE tempfile;
 
363
      IO_CACHE tempfile;
322
364
      if (open_cached_file(&tempfile, drizzle_tmpdir,TEMP_PREFIX,
323
365
                           DISK_BUFFER_SIZE, MYF(MY_WME)))
324
366
        goto err;
326
368
      /* If quick select is used, initialize it before retrieving rows. */
327
369
      if (select && select->quick && select->quick->reset())
328
370
        goto err;
329
 
      table->cursor->try_semi_consistent_read(1);
 
371
      table->file->try_semi_consistent_read(1);
330
372
 
331
373
      /*
332
374
        When we get here, we have one of the following options:
351
393
      {
352
394
        if (!(select && select->skip_record()))
353
395
        {
354
 
          if (table->cursor->was_semi_consistent_read())
 
396
          if (table->file->was_semi_consistent_read())
355
397
            continue;  /* repeat the read of the same row if it still exists */
356
398
 
357
 
          table->cursor->position(table->record[0]);
358
 
          if (my_b_write(&tempfile,table->cursor->ref,
359
 
                         table->cursor->ref_length))
 
399
          table->file->position(table->record[0]);
 
400
          if (my_b_write(&tempfile,table->file->ref,
 
401
                         table->file->ref_length))
360
402
          {
361
 
            error=1;
362
 
            break;
 
403
            error=1; /* purecov: inspected */
 
404
            break; /* purecov: inspected */
363
405
          }
364
406
          if (!--limit && using_limit)
365
407
          {
368
410
          }
369
411
        }
370
412
        else
371
 
          table->cursor->unlock_row();
 
413
          table->file->unlock_row();
372
414
      }
373
415
      if (session->killed && !error)
374
416
        error= 1;                               // Aborted
375
417
      limit= tmp_limit;
376
 
      table->cursor->try_semi_consistent_read(0);
 
418
      table->file->try_semi_consistent_read(0);
377
419
      end_read_record(&info);
378
 
 
 
420
     
379
421
      /* Change select to use tempfile */
380
422
      if (select)
381
423
      {
387
429
      }
388
430
      else
389
431
      {
390
 
        select= new optimizer::SqlSelect;
 
432
        select= new SQL_SELECT;
391
433
        select->head=table;
392
434
      }
393
 
      if (reinit_io_cache(&tempfile,internal::READ_CACHE,0L,0,0))
394
 
        error=1;
395
 
      // Read row ptrs from this cursor
396
 
      memcpy(select->file, &tempfile, sizeof(tempfile));
 
435
      if (reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
 
436
        error=1; /* purecov: inspected */
 
437
      select->file=tempfile;                    // Read row ptrs from this file
397
438
      if (error >= 0)
398
439
        goto err;
399
440
    }
402
443
  }
403
444
 
404
445
  if (ignore)
405
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
406
 
 
 
446
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
447
  
407
448
  if (select && select->quick && select->quick->reset())
408
449
    goto err;
409
 
  table->cursor->try_semi_consistent_read(1);
 
450
  table->file->try_semi_consistent_read(1);
410
451
  init_read_record(&info,session,table,select,0,1);
411
452
 
412
453
  updated= found= 0;
413
 
  /*
414
 
   * Per the SQL standard, inserting NULL into a NOT NULL
415
 
   * field requires an error to be thrown.
416
 
   *
417
 
   * @NOTE
418
 
   *
419
 
   * NULL check and handling occurs in field_conv.cc
420
 
   */
421
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
422
 
  session->cuted_fields= 0L;
 
454
  /* Generate an error when trying to set a NOT NULL field to NULL. */
 
455
  session->count_cuted_fields= ignore ? CHECK_FIELD_WARN
 
456
                                  : CHECK_FIELD_ERROR_FOR_NULL;
 
457
  session->cuted_fields=0L;
423
458
  session->set_proc_info("Updating");
424
459
 
425
 
  transactional_table= table->cursor->has_transactions();
 
460
  transactional_table= table->file->has_transactions();
426
461
  session->abort_on_warning= test(!ignore);
 
462
  will_batch= !table->file->start_bulk_update();
427
463
 
428
464
  /*
429
465
    Assure that we can use position()
430
466
    if we need to create an error message.
431
467
  */
432
 
  if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ))
 
468
  if (table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ)
433
469
    table->prepare_for_position();
434
470
 
435
471
  /*
437
473
    the table handler is returning all columns OR if
438
474
    if all updated columns are read
439
475
  */
440
 
  can_compare_record= (!(table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ)) ||
 
476
  can_compare_record= (!(table->file->ha_table_flags() &
 
477
                         HA_PARTIAL_COLUMN_READ) ||
441
478
                       bitmap_is_subset(table->write_set, table->read_set));
442
479
 
443
480
  while (!(error=info.read_record(&info)) && !session->killed)
444
481
  {
445
482
    if (!(select && select->skip_record()))
446
483
    {
447
 
      if (table->cursor->was_semi_consistent_read())
 
484
      if (table->file->was_semi_consistent_read())
448
485
        continue;  /* repeat the read of the same row if it still exists */
449
486
 
450
 
      table->storeRecord();
451
 
      if (fill_record(session, fields, values))
452
 
        break;
 
487
      store_record(table,record[1]);
 
488
      if (fill_record(session, fields, values, 0))
 
489
        break; /* purecov: inspected */
453
490
 
454
491
      found++;
455
492
 
456
493
      if (!can_compare_record || table->compare_record())
457
494
      {
458
 
        /* Non-batched update */
459
 
        error= table->cursor->ha_update_row(table->record[1],
 
495
        if (will_batch)
 
496
        {
 
497
          /*
 
498
            Typically a batched handler can execute the batched jobs when:
 
499
            1) When specifically told to do so
 
500
            2) When it is not a good idea to batch anymore
 
501
            3) When it is necessary to send batch for other reasons
 
502
               (One such reason is when READ's must be performed)
 
503
 
 
504
            1) is covered by exec_bulk_update calls.
 
505
            2) and 3) is handled by the bulk_update_row method.
 
506
            
 
507
            bulk_update_row can execute the updates including the one
 
508
            defined in the bulk_update_row or not including the row
 
509
            in the call. This is up to the handler implementation and can
 
510
            vary from call to call.
 
511
 
 
512
            The dup_key_found reports the number of duplicate keys found
 
513
            in those updates actually executed. It only reports those if
 
514
            the extra call with HA_EXTRA_IGNORE_DUP_KEY have been issued.
 
515
            If this hasn't been issued it returns an error code and can
 
516
            ignore this number. Thus any handler that implements batching
 
517
            for UPDATE IGNORE must also handle this extra call properly.
 
518
 
 
519
            If a duplicate key is found on the record included in this
 
520
            call then it should be included in the count of dup_key_found
 
521
            and error should be set to 0 (only if these errors are ignored).
 
522
          */
 
523
          error= table->file->ha_bulk_update_row(table->record[1],
 
524
                                                 table->record[0],
 
525
                                                 &dup_key_found);
 
526
          limit+= dup_key_found;
 
527
          updated-= dup_key_found;
 
528
        }
 
529
        else
 
530
        {
 
531
          /* Non-batched update */
 
532
          error= table->file->ha_update_row(table->record[1],
460
533
                                            table->record[0]);
 
534
        }
461
535
        if (!error || error == HA_ERR_RECORD_IS_THE_SAME)
462
536
        {
463
537
          if (error != HA_ERR_RECORD_IS_THE_SAME)
465
539
          else
466
540
            error= 0;
467
541
        }
468
 
        else if (! ignore ||
469
 
                 table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
542
        else if (!ignore ||
 
543
                 table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
470
544
        {
471
545
          /*
472
546
            If (ignore && error is ignorable) we don't have to
474
548
          */
475
549
          myf flags= 0;
476
550
 
477
 
          if (table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
551
          if (table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
478
552
            flags|= ME_FATALERROR; /* Other handler errors are fatal */
479
553
 
480
554
          prepare_record_for_error_message(error, table);
481
 
          table->print_error(error,MYF(flags));
 
555
          table->file->print_error(error,MYF(flags));
482
556
          error= 1;
483
557
          break;
484
558
        }
486
560
 
487
561
      if (!--limit && using_limit)
488
562
      {
489
 
        error= -1;                              // Simulate end of cursor
490
 
        break;
 
563
        /*
 
564
          We have reached end-of-file in most common situations where no
 
565
          batching has occurred and if batching was supposed to occur but
 
566
          no updates were made and finally when the batch execution was
 
567
          performed without error and without finding any duplicate keys.
 
568
          If the batched updates were performed with errors we need to
 
569
          check and if no error but duplicate key's found we need to
 
570
          continue since those are not counted for in limit.
 
571
        */
 
572
        if (will_batch &&
 
573
            ((error= table->file->exec_bulk_update(&dup_key_found)) ||
 
574
             dup_key_found))
 
575
        {
 
576
          if (error)
 
577
          {
 
578
            /* purecov: begin inspected */
 
579
            /*
 
580
              The handler should not report error of duplicate keys if they
 
581
              are ignored. This is a requirement on batching handlers.
 
582
            */
 
583
            prepare_record_for_error_message(error, table);
 
584
            table->file->print_error(error,MYF(0));
 
585
            error= 1;
 
586
            break;
 
587
            /* purecov: end */
 
588
          }
 
589
          /*
 
590
            Either an error was found and we are ignoring errors or there
 
591
            were duplicate keys found. In both cases we need to correct
 
592
            the counters and continue the loop.
 
593
          */
 
594
          limit= dup_key_found; //limit is 0 when we get here so need to +
 
595
          updated-= dup_key_found;
 
596
        }
 
597
        else
 
598
        {
 
599
          error= -1;                            // Simulate end of file
 
600
          break;
 
601
        }
491
602
      }
492
603
    }
493
604
    else
494
 
      table->cursor->unlock_row();
 
605
      table->file->unlock_row();
495
606
    session->row_count++;
496
607
  }
497
608
  dup_key_found= 0;
507
618
  // simulated killing after the loop must be ineffective for binlogging
508
619
  error= (killed_status == Session::NOT_KILLED)?  error : 1;
509
620
 
510
 
  updated-= dup_key_found;
511
 
  table->cursor->try_semi_consistent_read(0);
 
621
  if (error &&
 
622
      will_batch &&
 
623
      (loc_error= table->file->exec_bulk_update(&dup_key_found)))
 
624
    /*
 
625
      An error has occurred when a batched update was performed and returned
 
626
      an error indication. It cannot be an allowed duplicate key error since
 
627
      we require the batching handler to treat this as a normal behavior.
 
628
 
 
629
      Otherwise we simply remove the number of duplicate keys records found
 
630
      in the batched update.
 
631
    */
 
632
  {
 
633
    /* purecov: begin inspected */
 
634
    prepare_record_for_error_message(loc_error, table);
 
635
    table->file->print_error(loc_error,MYF(ME_FATALERROR));
 
636
    error= 1;
 
637
    /* purecov: end */
 
638
  }
 
639
  else
 
640
    updated-= dup_key_found;
 
641
  if (will_batch)
 
642
    table->file->end_bulk_update();
 
643
  table->file->try_semi_consistent_read(0);
512
644
 
513
645
  if (!transactional_table && updated > 0)
514
 
    session->transaction.stmt.markModifiedNonTransData();
 
646
    session->transaction.stmt.modified_non_trans_table= true;
515
647
 
516
648
  end_read_record(&info);
517
649
  delete select;
518
650
  session->set_proc_info("end");
519
 
  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
651
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
520
652
 
521
653
  /*
522
654
    error < 0 means really no error at all: we processed all rows until the
527
659
    Sometimes we want to binlog even if we updated no rows, in case user used
528
660
    it to be sure master and slave are in same state.
529
661
  */
530
 
  if ((error < 0) || session->transaction.stmt.hasModifiedNonTransData())
 
662
  if ((error < 0) || session->transaction.stmt.modified_non_trans_table)
531
663
  {
532
 
    if (session->transaction.stmt.hasModifiedNonTransData())
533
 
      session->transaction.all.markModifiedNonTransData();
 
664
    if (mysql_bin_log.is_open())
 
665
    {
 
666
      if (error < 0)
 
667
        session->clear_error();
 
668
      if (session->binlog_query(Session::ROW_QUERY_TYPE,
 
669
                            session->query, session->query_length,
 
670
                            transactional_table, false, killed_status) &&
 
671
          transactional_table)
 
672
      {
 
673
        error=1;                                // Rollback update
 
674
      }
 
675
    }
 
676
    if (session->transaction.stmt.modified_non_trans_table)
 
677
      session->transaction.all.modified_non_trans_table= true;
534
678
  }
535
 
  assert(transactional_table || !updated || session->transaction.stmt.hasModifiedNonTransData());
 
679
  assert(transactional_table || !updated || session->transaction.stmt.modified_non_trans_table);
536
680
  free_underlaid_joins(session, select_lex);
537
681
 
538
682
  /* If LAST_INSERT_ID(X) was used, report X */
539
683
  id= session->arg_of_last_insert_id_function ?
540
684
    session->first_successful_insert_id_in_prev_stmt : 0;
541
685
 
 
686
  DRIZZLE_UPDATE_END();
542
687
  if (error < 0)
543
688
  {
544
689
    char buff[STRING_BUFFER_USUAL_SIZE];
545
 
    snprintf(buff, sizeof(buff), ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
 
690
    sprintf(buff, ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
546
691
            (ulong) session->cuted_fields);
547
 
    session->row_count_func= updated;
548
 
    /**
549
 
     * Resetting the Diagnostic area to prevent
550
 
     * lp bug# 439719
551
 
     */
552
 
    session->main_da.reset_diagnostics_area();
553
 
    session->my_ok((ulong) session->row_count_func, found, id, buff);
 
692
    session->row_count_func=
 
693
      (session->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated;
 
694
    my_ok(session, (ulong) session->row_count_func, id, buff);
554
695
  }
555
696
  session->count_cuted_fields= CHECK_FIELD_IGNORE;              /* calc cuted fields */
556
697
  session->abort_on_warning= 0;
557
 
  DRIZZLE_UPDATE_DONE((error >= 0 || session->is_error()), found, updated);
558
 
  return ((error >= 0 || session->is_error()) ? 1 : 0);
 
698
  return((error >= 0 || session->is_error()) ? 1 : 0);
559
699
 
560
700
err:
561
701
  delete select;
563
703
  if (table->key_read)
564
704
  {
565
705
    table->key_read=0;
566
 
    table->cursor->extra(HA_EXTRA_NO_KEYREAD);
 
706
    table->file->extra(HA_EXTRA_NO_KEYREAD);
567
707
  }
568
708
  session->abort_on_warning= 0;
569
709
 
570
710
abort:
571
 
  DRIZZLE_UPDATE_DONE(1, 0, 0);
572
 
  return 1;
 
711
  DRIZZLE_UPDATE_END();
 
712
  return(1);
573
713
}
574
714
 
575
715
/*
580
720
    session                     - thread handler
581
721
    table_list          - global/local table list
582
722
    conds               - conditions
583
 
    order_num           - number of ORDER BY list entries
584
 
    order               - ORDER BY clause list
 
723
    order_num           - number of order_st BY list entries
 
724
    order               - order_st BY clause list
585
725
 
586
726
  RETURN VALUE
587
727
    false OK
591
731
                         Item **conds, uint32_t order_num, order_st *order)
592
732
{
593
733
  List<Item> all_fields;
594
 
  Select_Lex *select_lex= &session->lex->select_lex;
 
734
  SELECT_LEX *select_lex= &session->lex->select_lex;
 
735
  
 
736
  /*
 
737
    Statement-based replication of UPDATE ... LIMIT is not safe as order of
 
738
    rows is not defined, so in mixed mode we go to row-based.
 
739
 
 
740
    Note that we may consider a statement as safe if order_st BY primary_key
 
741
    is present. However it may confuse users to see very similiar statements
 
742
    replicated differently.
 
743
  */
 
744
  if (session->lex->current_select->select_limit)
 
745
  {
 
746
    session->lex->set_stmt_unsafe();
 
747
    session->set_current_stmt_binlog_row_based();
 
748
  }
595
749
 
596
750
  session->lex->allow_sum_func= 0;
597
751
 
598
 
  if (setup_tables_and_check_access(session, &select_lex->context,
 
752
  if (setup_tables_and_check_access(session, &select_lex->context, 
599
753
                                    &select_lex->top_join_list,
600
754
                                    table_list,
601
755
                                    &select_lex->leaf_tables,
602
756
                                    false) ||
603
 
      session->setup_conds(table_list, conds) ||
 
757
      setup_conds(session, table_list, select_lex->leaf_tables, conds) ||
604
758
      select_lex->setup_ref_array(session, order_num) ||
605
759
      setup_order(session, select_lex->ref_pointer_array,
606
760
                  table_list, all_fields, all_fields, order))
607
 
    return true;
 
761
    return(true);
608
762
 
609
763
  /* Check that we are not using table that we are updating in a sub select */
610
764
  {
611
765
    TableList *duplicate;
612
 
    if ((duplicate= unique_table(table_list, table_list->next_global)))
 
766
    if ((duplicate= unique_table(session, table_list, table_list->next_global, 0)))
613
767
    {
 
768
      update_non_unique_table_error(table_list, "UPDATE", duplicate);
614
769
      my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
615
 
      return true;
616
 
    }
617
 
  }
618
 
 
 
770
      return(true);
 
771
    }
 
772
  }
 
773
 
 
774
  return(false);
 
775
}
 
776
 
 
777
 
 
778
/***************************************************************************
 
779
  Update multiple tables from join 
 
780
***************************************************************************/
 
781
 
 
782
/*
 
783
  Get table map for list of Item_field
 
784
*/
 
785
 
 
786
static table_map get_table_map(List<Item> *items)
 
787
{
 
788
  List_iterator_fast<Item> item_it(*items);
 
789
  Item_field *item;
 
790
  table_map map= 0;
 
791
 
 
792
  while ((item= (Item_field *) item_it++)) 
 
793
    map|= item->used_tables();
 
794
  return map;
 
795
}
 
796
 
 
797
 
 
798
/*
 
799
  make update specific preparation and checks after opening tables
 
800
 
 
801
  SYNOPSIS
 
802
    mysql_multi_update_prepare()
 
803
    session         thread handler
 
804
 
 
805
  RETURN
 
806
    false OK
 
807
    true  Error
 
808
*/
 
809
 
 
810
int mysql_multi_update_prepare(Session *session)
 
811
{
 
812
  LEX *lex= session->lex;
 
813
  TableList *table_list= lex->query_tables;
 
814
  TableList *tl, *leaves;
 
815
  List<Item> *fields= &lex->select_lex.item_list;
 
816
  table_map tables_for_update;
 
817
  bool update_view= 0;
 
818
  /*
 
819
    if this multi-update was converted from usual update, here is table
 
820
    counter else junk will be assigned here, but then replaced with real
 
821
    count in open_tables()
 
822
  */
 
823
  uint32_t  table_count= lex->table_count;
 
824
  const bool using_lock_tables= session->locked_tables != 0;
 
825
  bool original_multiupdate= (session->lex->sql_command == SQLCOM_UPDATE_MULTI);
 
826
  bool need_reopen= false;
 
827
  
 
828
 
 
829
  /* following need for prepared statements, to run next time multi-update */
 
830
  session->lex->sql_command= SQLCOM_UPDATE_MULTI;
 
831
 
 
832
reopen_tables:
 
833
 
 
834
  /* open tables and create derived ones, but do not lock and fill them */
 
835
  if (((original_multiupdate || need_reopen) &&
 
836
       open_tables(session, &table_list, &table_count, 0)) ||
 
837
      mysql_handle_derived(lex, &mysql_derived_prepare))
 
838
    return(true);
 
839
  /*
 
840
    setup_tables() need for VIEWs. JOIN::prepare() will call setup_tables()
 
841
    second time, but this call will do nothing (there are check for second
 
842
    call in setup_tables()).
 
843
  */
 
844
 
 
845
  if (setup_tables_and_check_access(session, &lex->select_lex.context,
 
846
                                    &lex->select_lex.top_join_list,
 
847
                                    table_list,
 
848
                                    &lex->select_lex.leaf_tables, false))
 
849
    return(true);
 
850
 
 
851
  if (setup_fields_with_no_wrap(session, 0, *fields, MARK_COLUMNS_WRITE, 0, 0))
 
852
    return(true);
 
853
 
 
854
  if (update_view && check_fields(session, *fields))
 
855
  {
 
856
    return(true);
 
857
  }
 
858
 
 
859
  tables_for_update= get_table_map(fields);
 
860
 
 
861
  /*
 
862
    Setup timestamp handling and locking mode
 
863
  */
 
864
  leaves= lex->select_lex.leaf_tables;
 
865
  for (tl= leaves; tl; tl= tl->next_leaf)
 
866
  {
 
867
    Table *table= tl->table;
 
868
    /* Only set timestamp column if this is not modified */
 
869
    if (table->timestamp_field &&
 
870
        bitmap_is_set(table->write_set,
 
871
                      table->timestamp_field->field_index))
 
872
      table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
 
873
 
 
874
    /* if table will be updated then check that it is unique */
 
875
    if (table->map & tables_for_update)
 
876
    {
 
877
      table->mark_columns_needed_for_update();
 
878
      /*
 
879
        If table will be updated we should not downgrade lock for it and
 
880
        leave it as is.
 
881
      */
 
882
    }
 
883
    else
 
884
    {
 
885
      /*
 
886
        If we are using the binary log, we need TL_READ_NO_INSERT to get
 
887
        correct order of statements. Otherwise, we use a TL_READ lock to
 
888
        improve performance.
 
889
      */
 
890
      tl->lock_type= using_update_log ? TL_READ_NO_INSERT : TL_READ;
 
891
      tl->updating= 0;
 
892
      /* Update Table::lock_type accordingly. */
 
893
      if (!tl->placeholder() && !using_lock_tables)
 
894
        tl->table->reginfo.lock_type= tl->lock_type;
 
895
    }
 
896
  }
 
897
 
 
898
  /* now lock and fill tables */
 
899
  if (lock_tables(session, table_list, table_count, &need_reopen))
 
900
  {
 
901
    if (!need_reopen)
 
902
      return(true);
 
903
 
 
904
    /*
 
905
      We have to reopen tables since some of them were altered or dropped
 
906
      during lock_tables() or something was done with their triggers.
 
907
      Let us do some cleanups to be able do setup_table() and setup_fields()
 
908
      once again.
 
909
    */
 
910
    List_iterator_fast<Item> it(*fields);
 
911
    Item *item;
 
912
    while ((item= it++))
 
913
      item->cleanup();
 
914
 
 
915
    /* We have to cleanup translation tables of views. */
 
916
    for (TableList *tbl= table_list; tbl; tbl= tbl->next_global)
 
917
      tbl->cleanup_items();
 
918
 
 
919
    close_tables_for_reopen(session, &table_list);
 
920
    goto reopen_tables;
 
921
  }
 
922
 
 
923
  /*
 
924
    Check that we are not using table that we are updating, but we should
 
925
    skip all tables of UPDATE SELECT itself
 
926
  */
 
927
  lex->select_lex.exclude_from_table_unique_test= true;
 
928
  /* We only need SELECT privilege for columns in the values list */
 
929
  for (tl= leaves; tl; tl= tl->next_leaf)
 
930
  {
 
931
    if (tl->lock_type != TL_READ &&
 
932
        tl->lock_type != TL_READ_NO_INSERT)
 
933
    {
 
934
      TableList *duplicate;
 
935
      if ((duplicate= unique_table(session, tl, table_list, 0)))
 
936
      {
 
937
        update_non_unique_table_error(table_list, "UPDATE", duplicate);
 
938
        return(true);
 
939
      }
 
940
    }
 
941
  }
 
942
  /*
 
943
    Set exclude_from_table_unique_test value back to false. It is needed for
 
944
    further check in multi_update::prepare whether to use record cache.
 
945
  */
 
946
  lex->select_lex.exclude_from_table_unique_test= false;
 
947
 
 
948
  if (session->fill_derived_tables() &&
 
949
      mysql_handle_derived(lex, &mysql_derived_filling))
 
950
    return(true);
 
951
 
 
952
  return (false);
 
953
}
 
954
 
 
955
 
 
956
/*
 
957
  Setup multi-update handling and call SELECT to do the join
 
958
*/
 
959
 
 
960
bool mysql_multi_update(Session *session,
 
961
                        TableList *table_list,
 
962
                        List<Item> *fields,
 
963
                        List<Item> *values,
 
964
                        COND *conds,
 
965
                        uint64_t options,
 
966
                        enum enum_duplicates handle_duplicates, bool ignore,
 
967
                        SELECT_LEX_UNIT *unit, SELECT_LEX *select_lex)
 
968
{
 
969
  multi_update *result;
 
970
  bool res;
 
971
  
 
972
  if (!(result= new multi_update(table_list,
 
973
                                 session->lex->select_lex.leaf_tables,
 
974
                                 fields, values,
 
975
                                 handle_duplicates, ignore)))
 
976
    return(true);
 
977
 
 
978
  session->abort_on_warning= true;
 
979
 
 
980
  List<Item> total_list;
 
981
  res= mysql_select(session, &select_lex->ref_pointer_array,
 
982
                      table_list, select_lex->with_wild,
 
983
                      total_list,
 
984
                      conds, 0, (order_st *) NULL, (order_st *)NULL, (Item *) NULL,
 
985
                      (order_st *)NULL,
 
986
                      options | SELECT_NO_JOIN_CACHE | SELECT_NO_UNLOCK |
 
987
                      OPTION_SETUP_TABLES_DONE,
 
988
                      result, unit, select_lex);
 
989
  res|= session->is_error();
 
990
  if (unlikely(res))
 
991
  {
 
992
    /* If we had a another error reported earlier then this will be ignored */
 
993
    result->send_error(ER_UNKNOWN_ERROR, ER(ER_UNKNOWN_ERROR));
 
994
    result->abort();
 
995
  }
 
996
  delete result;
 
997
  session->abort_on_warning= 0;
 
998
  return(false);
 
999
}
 
1000
 
 
1001
 
 
1002
multi_update::multi_update(TableList *table_list,
 
1003
                           TableList *leaves_list,
 
1004
                           List<Item> *field_list, List<Item> *value_list,
 
1005
                           enum enum_duplicates handle_duplicates_arg,
 
1006
                           bool ignore_arg)
 
1007
  :all_tables(table_list), leaves(leaves_list), update_tables(0),
 
1008
   tmp_tables(0), updated(0), found(0), fields(field_list),
 
1009
   values(value_list), table_count(0), copy_field(0),
 
1010
   handle_duplicates(handle_duplicates_arg), do_update(1), trans_safe(1),
 
1011
   transactional_tables(0), ignore(ignore_arg), error_handled(0)
 
1012
{}
 
1013
 
 
1014
 
 
1015
/*
 
1016
  Connect fields with tables and create list of tables that are updated
 
1017
*/
 
1018
 
 
1019
int multi_update::prepare(List<Item> &,
 
1020
                          SELECT_LEX_UNIT *)
 
1021
{
 
1022
  TableList *table_ref;
 
1023
  SQL_LIST update;
 
1024
  table_map tables_to_update;
 
1025
  Item_field *item;
 
1026
  List_iterator_fast<Item> field_it(*fields);
 
1027
  List_iterator_fast<Item> value_it(*values);
 
1028
  uint32_t i, max_fields;
 
1029
  uint32_t leaf_table_count= 0;
 
1030
  
 
1031
  session->count_cuted_fields= CHECK_FIELD_WARN;
 
1032
  session->cuted_fields=0L;
 
1033
  session->set_proc_info("updating main table");
 
1034
 
 
1035
  tables_to_update= get_table_map(fields);
 
1036
 
 
1037
  if (!tables_to_update)
 
1038
  {
 
1039
    my_message(ER_NO_TABLES_USED, ER(ER_NO_TABLES_USED), MYF(0));
 
1040
    return(1);
 
1041
  }
 
1042
 
 
1043
  /*
 
1044
    We have to check values after setup_tables to get covering_keys right in
 
1045
    reference tables
 
1046
  */
 
1047
 
 
1048
  if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))
 
1049
    return(1);
 
1050
 
 
1051
  /*
 
1052
    Save tables beeing updated in update_tables
 
1053
    update_table->shared is position for table
 
1054
    Don't use key read on tables that are updated
 
1055
  */
 
1056
 
 
1057
  update.empty();
 
1058
  for (table_ref= leaves; table_ref; table_ref= table_ref->next_leaf)
 
1059
  {
 
1060
    /* TODO: add support of view of join support */
 
1061
    Table *table=table_ref->table;
 
1062
    leaf_table_count++;
 
1063
    if (tables_to_update & table->map)
 
1064
    {
 
1065
      TableList *tl= (TableList*) session->memdup((char*) table_ref,
 
1066
                                                sizeof(*tl));
 
1067
      if (!tl)
 
1068
        return(1);
 
1069
      update.link_in_list((unsigned char*) tl, (unsigned char**) &tl->next_local);
 
1070
      tl->shared= table_count++;
 
1071
      table->no_keyread=1;
 
1072
      table->covering_keys.clear_all();
 
1073
      table->pos_in_table_list= tl;
 
1074
    }
 
1075
  }
 
1076
 
 
1077
 
 
1078
  table_count=  update.elements;
 
1079
  update_tables= (TableList*) update.first;
 
1080
 
 
1081
  tmp_tables = (Table**) session->calloc(sizeof(Table *) * table_count);
 
1082
  tmp_table_param = (TMP_TABLE_PARAM*) session->calloc(sizeof(TMP_TABLE_PARAM) *
 
1083
                                                   table_count);
 
1084
  fields_for_table= (List_item **) session->alloc(sizeof(List_item *) *
 
1085
                                              table_count);
 
1086
  values_for_table= (List_item **) session->alloc(sizeof(List_item *) *
 
1087
                                              table_count);
 
1088
  if (session->is_fatal_error)
 
1089
    return(1);
 
1090
  for (i=0 ; i < table_count ; i++)
 
1091
  {
 
1092
    fields_for_table[i]= new List_item;
 
1093
    values_for_table[i]= new List_item;
 
1094
  }
 
1095
  if (session->is_fatal_error)
 
1096
    return(1);
 
1097
 
 
1098
  /* Split fields into fields_for_table[] and values_by_table[] */
 
1099
 
 
1100
  while ((item= (Item_field *) field_it++))
 
1101
  {
 
1102
    Item *value= value_it++;
 
1103
    uint32_t offset= item->field->table->pos_in_table_list->shared;
 
1104
    fields_for_table[offset]->push_back(item);
 
1105
    values_for_table[offset]->push_back(value);
 
1106
  }
 
1107
  if (session->is_fatal_error)
 
1108
    return(1);
 
1109
 
 
1110
  /* Allocate copy fields */
 
1111
  max_fields=0;
 
1112
  for (i=0 ; i < table_count ; i++)
 
1113
    set_if_bigger(max_fields, fields_for_table[i]->elements + leaf_table_count);
 
1114
  copy_field= new Copy_field[max_fields];
 
1115
  return(session->is_fatal_error != 0);
 
1116
}
 
1117
 
 
1118
 
 
1119
/*
 
1120
  Check if table is safe to update on fly
 
1121
 
 
1122
  SYNOPSIS
 
1123
    safe_update_on_fly()
 
1124
    session                 Thread handler
 
1125
    join_tab            How table is used in join
 
1126
    all_tables          List of tables
 
1127
 
 
1128
  NOTES
 
1129
    We can update the first table in join on the fly if we know that
 
1130
    a row in this table will never be read twice. This is true under
 
1131
    the following conditions:
 
1132
 
 
1133
    - We are doing a table scan and the data is in a separate file (MyISAM) or
 
1134
      if we don't update a clustered key.
 
1135
 
 
1136
    - We are doing a range scan and we don't update the scan key or
 
1137
      the primary key for a clustered table handler.
 
1138
 
 
1139
    - Table is not joined to itself.
 
1140
 
 
1141
    This function gets information about fields to be updated from
 
1142
    the Table::write_set bitmap.
 
1143
 
 
1144
  WARNING
 
1145
    This code is a bit dependent of how make_join_readinfo() works.
 
1146
 
 
1147
  RETURN
 
1148
    0           Not safe to update
 
1149
    1           Safe to update
 
1150
*/
 
1151
 
 
1152
static bool safe_update_on_fly(Session *session, JOIN_TAB *join_tab,
 
1153
                               TableList *table_ref, TableList *all_tables)
 
1154
{
 
1155
  Table *table= join_tab->table;
 
1156
  if (unique_table(session, table_ref, all_tables, 0))
 
1157
    return 0;
 
1158
  switch (join_tab->type) {
 
1159
  case JT_SYSTEM:
 
1160
  case JT_CONST:
 
1161
  case JT_EQ_REF:
 
1162
    return true;                                // At most one matching row
 
1163
  case JT_REF:
 
1164
  case JT_REF_OR_NULL:
 
1165
    return !is_key_used(table, join_tab->ref.key, table->write_set);
 
1166
  case JT_ALL:
 
1167
    /* If range search on index */
 
1168
    if (join_tab->quick)
 
1169
      return !join_tab->quick->is_keys_used(table->write_set);
 
1170
    /* If scanning in clustered key */
 
1171
    if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_IN_READ_INDEX) &&
 
1172
        table->s->primary_key < MAX_KEY)
 
1173
      return !is_key_used(table, table->s->primary_key, table->write_set);
 
1174
    return true;
 
1175
  default:
 
1176
    break;                                      // Avoid compler warning
 
1177
  }
619
1178
  return false;
620
 
}
621
 
 
622
 
} /* namespace drizzled */
 
1179
 
 
1180
}
 
1181
 
 
1182
 
 
1183
/*
 
1184
  Initialize table for multi table
 
1185
 
 
1186
  IMPLEMENTATION
 
1187
    - Update first table in join on the fly, if possible
 
1188
    - Create temporary tables to store changed values for all other tables
 
1189
      that are updated (and main_table if the above doesn't hold).
 
1190
*/
 
1191
 
 
1192
bool
 
1193
multi_update::initialize_tables(JOIN *join)
 
1194
{
 
1195
  TableList *table_ref;
 
1196
  
 
1197
  if ((session->options & OPTION_SAFE_UPDATES) && error_if_full_join(join))
 
1198
    return(1);
 
1199
  main_table=join->join_tab->table;
 
1200
  table_to_update= 0;
 
1201
 
 
1202
  /* Any update has at least one pair (field, value) */
 
1203
  assert(fields->elements);
 
1204
 
 
1205
  /* Create a temporary table for keys to all tables, except main table */
 
1206
  for (table_ref= update_tables; table_ref; table_ref= table_ref->next_local)
 
1207
  {
 
1208
    Table *table=table_ref->table;
 
1209
    uint32_t cnt= table_ref->shared;
 
1210
    List<Item> temp_fields;
 
1211
    order_st     group;
 
1212
    TMP_TABLE_PARAM *tmp_param;
 
1213
 
 
1214
    table->mark_columns_needed_for_update();
 
1215
    if (ignore)
 
1216
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1217
    if (table == main_table)                    // First table in join
 
1218
    {
 
1219
      if (safe_update_on_fly(session, join->join_tab, table_ref, all_tables))
 
1220
      {
 
1221
        table_to_update= main_table;            // Update table on the fly
 
1222
        continue;
 
1223
      }
 
1224
    }
 
1225
    table->prepare_for_position();
 
1226
 
 
1227
    tmp_param= tmp_table_param+cnt;
 
1228
 
 
1229
    /*
 
1230
      Create a temporary table to store all fields that are changed for this
 
1231
      table. The first field in the temporary table is a pointer to the
 
1232
      original row so that we can find and update it. For the updatable
 
1233
      VIEW a few following fields are rowids of tables used in the CHECK
 
1234
      OPTION condition.
 
1235
    */
 
1236
 
 
1237
    List_iterator_fast<Table> tbl_it(unupdated_check_opt_tables);
 
1238
    Table *tbl= table;
 
1239
    do
 
1240
    {
 
1241
      Field_string *field= new Field_string(tbl->file->ref_length, 0,
 
1242
                                            tbl->alias, &my_charset_bin);
 
1243
#ifdef OLD
 
1244
      Field_varstring *field= new Field_varstring(tbl->file->ref_length, 0,
 
1245
                                                  tbl->alias, tbl->s, &my_charset_bin);
 
1246
#endif
 
1247
      if (!field)
 
1248
        return(1);
 
1249
      field->init(tbl);
 
1250
      /*
 
1251
        The field will be converted to varstring when creating tmp table if
 
1252
        table to be updated was created by mysql 4.1. Deny this.
 
1253
      */
 
1254
      Item_field *ifield= new Item_field((Field *) field);
 
1255
      if (!ifield)
 
1256
         return(1);
 
1257
      ifield->maybe_null= 0;
 
1258
      if (temp_fields.push_back(ifield))
 
1259
        return(1);
 
1260
    } while ((tbl= tbl_it++));
 
1261
 
 
1262
    temp_fields.concat(fields_for_table[cnt]);
 
1263
 
 
1264
    /* Make an unique key over the first field to avoid duplicated updates */
 
1265
    memset(&group, 0, sizeof(group));
 
1266
    group.asc= 1;
 
1267
    group.item= (Item**) temp_fields.head_ref();
 
1268
 
 
1269
    tmp_param->quick_group=1;
 
1270
    tmp_param->field_count=temp_fields.elements;
 
1271
    tmp_param->group_parts=1;
 
1272
    tmp_param->group_length= table->file->ref_length;
 
1273
    if (!(tmp_tables[cnt]=create_tmp_table(session,
 
1274
                                           tmp_param,
 
1275
                                           temp_fields,
 
1276
                                           (order_st*) &group, 0, 0,
 
1277
                                           TMP_TABLE_ALL_COLUMNS,
 
1278
                                           HA_POS_ERROR,
 
1279
                                           (char *) "")))
 
1280
      return(1);
 
1281
    tmp_tables[cnt]->file->extra(HA_EXTRA_WRITE_CACHE);
 
1282
  }
 
1283
  return(0);
 
1284
}
 
1285
 
 
1286
 
 
1287
multi_update::~multi_update()
 
1288
{
 
1289
  TableList *table;
 
1290
  for (table= update_tables ; table; table= table->next_local)
 
1291
  {
 
1292
    table->table->no_keyread= table->table->no_cache= 0;
 
1293
    if (ignore)
 
1294
      table->table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1295
  }
 
1296
 
 
1297
  if (tmp_tables)
 
1298
  {
 
1299
    for (uint32_t cnt = 0; cnt < table_count; cnt++)
 
1300
    {
 
1301
      if (tmp_tables[cnt])
 
1302
      {
 
1303
        tmp_tables[cnt]->free_tmp_table(session);
 
1304
        tmp_table_param[cnt].cleanup();
 
1305
      }
 
1306
    }
 
1307
  }
 
1308
  if (copy_field)
 
1309
    delete [] copy_field;
 
1310
  session->count_cuted_fields= CHECK_FIELD_IGNORE;              // Restore this setting
 
1311
  assert(trans_safe || !updated ||
 
1312
              session->transaction.all.modified_non_trans_table);
 
1313
}
 
1314
 
 
1315
 
 
1316
bool multi_update::send_data(List<Item> &)
 
1317
{
 
1318
  TableList *cur_table;
 
1319
 
 
1320
  for (cur_table= update_tables; cur_table; cur_table= cur_table->next_local)
 
1321
  {
 
1322
    Table *table= cur_table->table;
 
1323
    uint32_t offset= cur_table->shared;
 
1324
    /*
 
1325
      Check if we are using outer join and we didn't find the row
 
1326
      or if we have already updated this row in the previous call to this
 
1327
      function.
 
1328
 
 
1329
      The same row may be presented here several times in a join of type
 
1330
      UPDATE t1 FROM t1,t2 SET t1.a=t2.a
 
1331
 
 
1332
      In this case we will do the update for the first found row combination.
 
1333
      The join algorithm guarantees that we will not find the a row in
 
1334
      t1 several times.
 
1335
    */
 
1336
    if (table->status & (STATUS_NULL_ROW | STATUS_UPDATED))
 
1337
      continue;
 
1338
 
 
1339
    /*
 
1340
      We can use compare_record() to optimize away updates if
 
1341
      the table handler is returning all columns OR if
 
1342
      if all updated columns are read
 
1343
    */
 
1344
    if (table == table_to_update)
 
1345
    {
 
1346
      bool can_compare_record;
 
1347
      can_compare_record= (!(table->file->ha_table_flags() &
 
1348
                             HA_PARTIAL_COLUMN_READ) ||
 
1349
                           bitmap_is_subset(table->write_set,
 
1350
                                            table->read_set));
 
1351
      table->status|= STATUS_UPDATED;
 
1352
      store_record(table,record[1]);
 
1353
      if (fill_record(session, *fields_for_table[offset],
 
1354
                      *values_for_table[offset], 0))
 
1355
        return(1);
 
1356
 
 
1357
      found++;
 
1358
      if (!can_compare_record || table->compare_record())
 
1359
      {
 
1360
        int error;
 
1361
        if (!updated++)
 
1362
        {
 
1363
          /*
 
1364
            Inform the main table that we are going to update the table even
 
1365
            while we may be scanning it.  This will flush the read cache
 
1366
            if it's used.
 
1367
          */
 
1368
          main_table->file->extra(HA_EXTRA_PREPARE_FOR_UPDATE);
 
1369
        }
 
1370
        if ((error=table->file->ha_update_row(table->record[1],
 
1371
                                              table->record[0])) &&
 
1372
            error != HA_ERR_RECORD_IS_THE_SAME)
 
1373
        {
 
1374
          updated--;
 
1375
          if (!ignore ||
 
1376
              table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
1377
          {
 
1378
            /*
 
1379
              If (ignore && error == is ignorable) we don't have to
 
1380
              do anything; otherwise...
 
1381
            */
 
1382
            myf flags= 0;
 
1383
 
 
1384
            if (table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
1385
              flags|= ME_FATALERROR; /* Other handler errors are fatal */
 
1386
 
 
1387
            prepare_record_for_error_message(error, table);
 
1388
            table->file->print_error(error,MYF(flags));
 
1389
            return(1);
 
1390
          }
 
1391
        }
 
1392
        else
 
1393
        {
 
1394
          if (error == HA_ERR_RECORD_IS_THE_SAME)
 
1395
          {
 
1396
            error= 0;
 
1397
            updated--;
 
1398
          }
 
1399
          /* non-transactional or transactional table got modified   */
 
1400
          /* either multi_update class' flag is raised in its branch */
 
1401
          if (table->file->has_transactions())
 
1402
            transactional_tables= 1;
 
1403
          else
 
1404
          {
 
1405
            trans_safe= 0;
 
1406
            session->transaction.stmt.modified_non_trans_table= true;
 
1407
          }
 
1408
        }
 
1409
      }
 
1410
    }
 
1411
    else
 
1412
    {
 
1413
      int error;
 
1414
      Table *tmp_table= tmp_tables[offset];
 
1415
      /*
 
1416
       For updatable VIEW store rowid of the updated table and
 
1417
       rowids of tables used in the CHECK OPTION condition.
 
1418
      */
 
1419
      uint32_t field_num= 0;
 
1420
      List_iterator_fast<Table> tbl_it(unupdated_check_opt_tables);
 
1421
      Table *tbl= table;
 
1422
      do
 
1423
      {
 
1424
        tbl->file->position(tbl->record[0]);
 
1425
        memcpy(tmp_table->field[field_num]->ptr,
 
1426
               tbl->file->ref, tbl->file->ref_length);
 
1427
        field_num++;
 
1428
      } while ((tbl= tbl_it++));
 
1429
 
 
1430
      /* Store regular updated fields in the row. */
 
1431
      fill_record(session,
 
1432
                  tmp_table->field + 1 + unupdated_check_opt_tables.elements,
 
1433
                  *values_for_table[offset], 1);
 
1434
 
 
1435
      /* Write row, ignoring duplicated updates to a row */
 
1436
      error= tmp_table->file->ha_write_row(tmp_table->record[0]);
 
1437
      if (error != HA_ERR_FOUND_DUPP_KEY && error != HA_ERR_FOUND_DUPP_UNIQUE)
 
1438
      {
 
1439
        if (error &&
 
1440
            create_myisam_from_heap(session, tmp_table,
 
1441
                                         tmp_table_param[offset].start_recinfo,
 
1442
                                         &tmp_table_param[offset].recinfo,
 
1443
                                         error, 1))
 
1444
        {
 
1445
          do_update=0;
 
1446
          return(1);                    // Not a table_is_full error
 
1447
        }
 
1448
        found++;
 
1449
      }
 
1450
    }
 
1451
  }
 
1452
  return(0);
 
1453
}
 
1454
 
 
1455
 
 
1456
void multi_update::send_error(uint32_t errcode,const char *err)
 
1457
{
 
1458
  /* First send error what ever it is ... */
 
1459
  my_error(errcode, MYF(0), err);
 
1460
}
 
1461
 
 
1462
 
 
1463
void multi_update::abort()
 
1464
{
 
1465
  /* the error was handled or nothing deleted and no side effects return */
 
1466
  if (error_handled ||
 
1467
      (!session->transaction.stmt.modified_non_trans_table && !updated))
 
1468
    return;
 
1469
  /*
 
1470
    If all tables that has been updated are trans safe then just do rollback.
 
1471
    If not attempt to do remaining updates.
 
1472
  */
 
1473
 
 
1474
  if (! trans_safe)
 
1475
  {
 
1476
    assert(session->transaction.stmt.modified_non_trans_table);
 
1477
    if (do_update && table_count > 1)
 
1478
    {
 
1479
      /* Add warning here */
 
1480
      /* 
 
1481
         todo/fixme: do_update() is never called with the arg 1.
 
1482
         should it change the signature to become argless?
 
1483
      */
 
1484
      do_updates();
 
1485
    }
 
1486
  }
 
1487
  if (session->transaction.stmt.modified_non_trans_table)
 
1488
  {
 
1489
    /*
 
1490
      The query has to binlog because there's a modified non-transactional table
 
1491
      either from the query's list or via a stored routine: bug#13270,23333
 
1492
    */
 
1493
    if (mysql_bin_log.is_open())
 
1494
    {
 
1495
      /*
 
1496
        Session::killed status might not have been set ON at time of an error
 
1497
        got caught and if happens later the killed error is written
 
1498
        into repl event.
 
1499
      */
 
1500
      session->binlog_query(Session::ROW_QUERY_TYPE,
 
1501
                        session->query, session->query_length,
 
1502
                        transactional_tables, false);
 
1503
    }
 
1504
    session->transaction.all.modified_non_trans_table= true;
 
1505
  }
 
1506
  assert(trans_safe || !updated || session->transaction.stmt.modified_non_trans_table);
 
1507
}
 
1508
 
 
1509
 
 
1510
int multi_update::do_updates()
 
1511
{
 
1512
  TableList *cur_table;
 
1513
  int local_error= 0;
 
1514
  ha_rows org_updated;
 
1515
  Table *table, *tmp_table;
 
1516
  List_iterator_fast<Table> check_opt_it(unupdated_check_opt_tables);
 
1517
  
 
1518
  do_update= 0;                                 // Don't retry this function
 
1519
  if (!found)
 
1520
    return(0);
 
1521
  for (cur_table= update_tables; cur_table; cur_table= cur_table->next_local)
 
1522
  {
 
1523
    bool can_compare_record;
 
1524
    uint32_t offset= cur_table->shared;
 
1525
 
 
1526
    table = cur_table->table;
 
1527
    if (table == table_to_update)
 
1528
      continue;                                 // Already updated
 
1529
    org_updated= updated;
 
1530
    tmp_table= tmp_tables[cur_table->shared];
 
1531
    tmp_table->file->extra(HA_EXTRA_CACHE);     // Change to read cache
 
1532
    (void) table->file->ha_rnd_init(0);
 
1533
    table->file->extra(HA_EXTRA_NO_CACHE);
 
1534
 
 
1535
    check_opt_it.rewind();
 
1536
    while(Table *tbl= check_opt_it++)
 
1537
    {
 
1538
      if (tbl->file->ha_rnd_init(1))
 
1539
        goto err;
 
1540
      tbl->file->extra(HA_EXTRA_CACHE);
 
1541
    }
 
1542
 
 
1543
    /*
 
1544
      Setup copy functions to copy fields from temporary table
 
1545
    */
 
1546
    List_iterator_fast<Item> field_it(*fields_for_table[offset]);
 
1547
    Field **field= tmp_table->field + 
 
1548
                   1 + unupdated_check_opt_tables.elements; // Skip row pointers
 
1549
    Copy_field *copy_field_ptr= copy_field, *copy_field_end;
 
1550
    for ( ; *field ; field++)
 
1551
    {
 
1552
      Item_field *item= (Item_field* ) field_it++;
 
1553
      (copy_field_ptr++)->set(item->field, *field, 0);
 
1554
    }
 
1555
    copy_field_end=copy_field_ptr;
 
1556
 
 
1557
    if ((local_error = tmp_table->file->ha_rnd_init(1)))
 
1558
      goto err;
 
1559
 
 
1560
    can_compare_record= (!(table->file->ha_table_flags() &
 
1561
                           HA_PARTIAL_COLUMN_READ) ||
 
1562
                         bitmap_is_subset(table->write_set,
 
1563
                                          table->read_set));
 
1564
 
 
1565
    for (;;)
 
1566
    {
 
1567
      if (session->killed && trans_safe)
 
1568
        goto err;
 
1569
      if ((local_error=tmp_table->file->rnd_next(tmp_table->record[0])))
 
1570
      {
 
1571
        if (local_error == HA_ERR_END_OF_FILE)
 
1572
          break;
 
1573
        if (local_error == HA_ERR_RECORD_DELETED)
 
1574
          continue;                             // May happen on dup key
 
1575
        goto err;
 
1576
      }
 
1577
 
 
1578
      /* call rnd_pos() using rowids from temporary table */
 
1579
      check_opt_it.rewind();
 
1580
      Table *tbl= table;
 
1581
      uint32_t field_num= 0;
 
1582
      do
 
1583
      {
 
1584
        if((local_error=
 
1585
              tbl->file->rnd_pos(tbl->record[0],
 
1586
                                (unsigned char *) tmp_table->field[field_num]->ptr)))
 
1587
          goto err;
 
1588
        field_num++;
 
1589
      } while((tbl= check_opt_it++));
 
1590
 
 
1591
      table->status|= STATUS_UPDATED;
 
1592
      store_record(table,record[1]);
 
1593
 
 
1594
      /* Copy data from temporary table to current table */
 
1595
      for (copy_field_ptr=copy_field;
 
1596
           copy_field_ptr != copy_field_end;
 
1597
           copy_field_ptr++)
 
1598
        (*copy_field_ptr->do_copy)(copy_field_ptr);
 
1599
 
 
1600
      if (!can_compare_record || table->compare_record())
 
1601
      {
 
1602
        if ((local_error=table->file->ha_update_row(table->record[1],
 
1603
                                                    table->record[0])) &&
 
1604
            local_error != HA_ERR_RECORD_IS_THE_SAME)
 
1605
        {
 
1606
          if (!ignore ||
 
1607
              table->file->is_fatal_error(local_error, HA_CHECK_DUP_KEY))
 
1608
            goto err;
 
1609
        }
 
1610
        if (local_error != HA_ERR_RECORD_IS_THE_SAME)
 
1611
          updated++;
 
1612
        else
 
1613
          local_error= 0;
 
1614
      }
 
1615
    }
 
1616
 
 
1617
    if (updated != org_updated)
 
1618
    {
 
1619
      if (table->file->has_transactions())
 
1620
        transactional_tables= 1;
 
1621
      else
 
1622
      {
 
1623
        trans_safe= 0;                          // Can't do safe rollback
 
1624
        session->transaction.stmt.modified_non_trans_table= true;
 
1625
      }
 
1626
    }
 
1627
    (void) table->file->ha_rnd_end();
 
1628
    (void) tmp_table->file->ha_rnd_end();
 
1629
    check_opt_it.rewind();
 
1630
    while (Table *tbl= check_opt_it++)
 
1631
        tbl->file->ha_rnd_end();
 
1632
 
 
1633
  }
 
1634
  return(0);
 
1635
 
 
1636
err:
 
1637
  {
 
1638
    prepare_record_for_error_message(local_error, table);
 
1639
    table->file->print_error(local_error,MYF(ME_FATALERROR));
 
1640
  }
 
1641
 
 
1642
  (void) table->file->ha_rnd_end();
 
1643
  (void) tmp_table->file->ha_rnd_end();
 
1644
  check_opt_it.rewind();
 
1645
  while (Table *tbl= check_opt_it++)
 
1646
      tbl->file->ha_rnd_end();
 
1647
 
 
1648
  if (updated != org_updated)
 
1649
  {
 
1650
    if (table->file->has_transactions())
 
1651
      transactional_tables= 1;
 
1652
    else
 
1653
    {
 
1654
      trans_safe= 0;
 
1655
      session->transaction.stmt.modified_non_trans_table= true;
 
1656
    }
 
1657
  }
 
1658
  return(1);
 
1659
}
 
1660
 
 
1661
 
 
1662
/* out: 1 if error, 0 if success */
 
1663
 
 
1664
bool multi_update::send_eof()
 
1665
{
 
1666
  char buff[STRING_BUFFER_USUAL_SIZE];
 
1667
  uint64_t id;
 
1668
  Session::killed_state killed_status= Session::NOT_KILLED;
 
1669
  
 
1670
  session->set_proc_info("updating reference tables");
 
1671
 
 
1672
  /* 
 
1673
     Does updates for the last n - 1 tables, returns 0 if ok;
 
1674
     error takes into account killed status gained in do_updates()
 
1675
  */
 
1676
  int local_error = (table_count) ? do_updates() : 0;
 
1677
  /*
 
1678
    if local_error is not set ON until after do_updates() then
 
1679
    later carried out killing should not affect binlogging.
 
1680
  */
 
1681
  killed_status= (local_error == 0)? Session::NOT_KILLED : session->killed;
 
1682
  session->set_proc_info("end");
 
1683
 
 
1684
  /*
 
1685
    Write the SQL statement to the binlog if we updated
 
1686
    rows and we succeeded or if we updated some non
 
1687
    transactional tables.
 
1688
    
 
1689
    The query has to binlog because there's a modified non-transactional table
 
1690
    either from the query's list or via a stored routine: bug#13270,23333
 
1691
  */
 
1692
 
 
1693
  assert(trans_safe || !updated || 
 
1694
              session->transaction.stmt.modified_non_trans_table);
 
1695
  if (local_error == 0 || session->transaction.stmt.modified_non_trans_table)
 
1696
  {
 
1697
    if (mysql_bin_log.is_open())
 
1698
    {
 
1699
      if (local_error == 0)
 
1700
        session->clear_error();
 
1701
      if (session->binlog_query(Session::ROW_QUERY_TYPE,
 
1702
                            session->query, session->query_length,
 
1703
                            transactional_tables, false, killed_status) &&
 
1704
          trans_safe)
 
1705
      {
 
1706
        local_error= 1;                         // Rollback update
 
1707
      }
 
1708
    }
 
1709
    if (session->transaction.stmt.modified_non_trans_table)
 
1710
      session->transaction.all.modified_non_trans_table= true;
 
1711
  }
 
1712
  if (local_error != 0)
 
1713
    error_handled= true; // to force early leave from ::send_error()
 
1714
 
 
1715
  if (local_error > 0) // if the above log write did not fail ...
 
1716
  {
 
1717
    /* Safety: If we haven't got an error before (can happen in do_updates) */
 
1718
    my_message(ER_UNKNOWN_ERROR, "An error occured in multi-table update",
 
1719
               MYF(0));
 
1720
    return(true);
 
1721
  }
 
1722
 
 
1723
  id= session->arg_of_last_insert_id_function ?
 
1724
    session->first_successful_insert_id_in_prev_stmt : 0;
 
1725
  sprintf(buff, ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
 
1726
          (ulong) session->cuted_fields);
 
1727
  session->row_count_func=
 
1728
    (session->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated;
 
1729
  ::my_ok(session, (ulong) session->row_count_func, id, buff);
 
1730
  return(false);
 
1731
}