~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_update.cc

  • Committer: Brian Aker
  • Date: 2009-01-24 09:43:35 UTC
  • Revision ID: brian@gir-3.local-20090124094335-6qdtvc35gl5fvivz
Adding in an example singe thread scheduler

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/*
18
18
  Single table and multi table updates of tables.
19
19
  Multi-table updates were introduced by Sinisa & Monty
20
20
*/
21
 
#include "config.h"
22
 
#include "drizzled/sql_select.h"
23
 
#include "drizzled/error.h"
24
 
#include "drizzled/probes.h"
25
 
#include "drizzled/sql_base.h"
26
 
#include "drizzled/field/timestamp.h"
27
 
#include "drizzled/sql_parse.h"
28
 
#include "drizzled/optimizer/range.h"
29
 
#include "drizzled/records.h"
30
 
#include "drizzled/internal/my_sys.h"
31
 
#include "drizzled/internal/iocache.h"
32
 
#include "drizzled/transaction_services.h"
33
 
#include "drizzled/filesort.h"
34
 
 
35
 
#include <boost/dynamic_bitset.hpp>
36
 
#include <list>
37
 
 
38
 
using namespace std;
39
 
 
40
 
namespace drizzled
 
21
#include <drizzled/server_includes.h>
 
22
#include <drizzled/sql_select.h>
 
23
#include <drizzled/error.h>
 
24
#include <drizzled/probes.h>
 
25
#include <drizzled/sql_base.h>
 
26
#include <drizzled/field/timestamp.h>
 
27
 
 
28
/*
 
29
  check that all fields are real fields
 
30
 
 
31
  SYNOPSIS
 
32
    check_fields()
 
33
    session             thread handler
 
34
    items           Items for check
 
35
 
 
36
  RETURN
 
37
    true  Items can't be used in UPDATE
 
38
    false Items are OK
 
39
*/
 
40
 
 
41
static bool check_fields(Session *session, List<Item> &items)
41
42
{
 
43
  List_iterator<Item> it(items);
 
44
  Item *item;
 
45
  Item_field *field;
 
46
 
 
47
  while ((item= it++))
 
48
  {
 
49
    if (!(field= item->filed_for_view_update()))
 
50
    {
 
51
      /* item has name, because it comes from VIEW SELECT list */
 
52
      my_error(ER_NONUPDATEABLE_COLUMN, MYF(0), item->name);
 
53
      return true;
 
54
    }
 
55
    /*
 
56
      we make temporary copy of Item_field, to avoid influence of changing
 
57
      result_field on Item_ref which refer on this field
 
58
    */
 
59
    session->change_item_tree(it.ref(), new Item_field(session, field));
 
60
  }
 
61
  return false;
 
62
}
 
63
 
42
64
 
43
65
/**
44
66
  Re-read record if more columns are needed for error message.
45
67
 
46
68
  If we got a duplicate key error, we want to write an error
47
69
  message containing the value of the duplicate key. If we do not have
48
 
  all fields of the key value in getInsertRecord(), we need to re-read the
 
70
  all fields of the key value in record[0], we need to re-read the
49
71
  record with a proper read_set.
50
72
 
51
73
  @param[in] error   error number
54
76
 
55
77
static void prepare_record_for_error_message(int error, Table *table)
56
78
{
57
 
  Field **field_p= NULL;
58
 
  Field *field= NULL;
59
 
  uint32_t keynr= 0;
 
79
  Field **field_p;
 
80
  Field *field;
 
81
  uint32_t keynr;
 
82
  MY_BITMAP unique_map; /* Fields in offended unique. */
 
83
  my_bitmap_map unique_map_buf[bitmap_buffer_size(MAX_FIELDS)];
60
84
 
61
85
  /*
62
86
    Only duplicate key errors print the key value.
63
87
    If storage engine does always read all columns, we have the value alraedy.
64
88
  */
65
89
  if ((error != HA_ERR_FOUND_DUPP_KEY) ||
66
 
      ! (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ)))
 
90
      !(table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ))
67
91
    return;
68
92
 
69
93
  /*
70
94
    Get the number of the offended index.
71
95
    We will see MAX_KEY if the engine cannot determine the affected index.
72
96
  */
73
 
  if ((keynr= table->get_dup_key(error)) >= MAX_KEY)
 
97
  if ((keynr= table->file->get_dup_key(error)) >= MAX_KEY)
74
98
    return;
75
99
 
76
100
  /* Create unique_map with all fields used by that index. */
77
 
  boost::dynamic_bitset<> unique_map(table->getShare()->sizeFields()); /* Fields in offended unique. */
78
 
  table->mark_columns_used_by_index_no_reset(keynr, unique_map);
 
101
  bitmap_init(&unique_map, unique_map_buf, table->s->fields, false);
 
102
  table->mark_columns_used_by_index_no_reset(keynr, &unique_map);
79
103
 
80
104
  /* Subtract read_set and write_set. */
81
 
  unique_map-= *table->read_set;
82
 
  unique_map-= *table->write_set;
 
105
  bitmap_subtract(&unique_map, table->read_set);
 
106
  bitmap_subtract(&unique_map, table->write_set);
83
107
 
84
108
  /*
85
109
    If the unique index uses columns that are neither in read_set
86
110
    nor in write_set, we must re-read the record.
87
111
    Otherwise no need to do anything.
88
112
  */
89
 
  if (unique_map.none())
 
113
  if (bitmap_is_clear_all(&unique_map))
90
114
    return;
91
115
 
92
 
  /* Get identifier of last read record into table->cursor->ref. */
93
 
  table->cursor->position(table->getInsertRecord());
 
116
  /* Get identifier of last read record into table->file->ref. */
 
117
  table->file->position(table->record[0]);
94
118
  /* Add all fields used by unique index to read_set. */
95
 
  *table->read_set|= unique_map;
96
 
  /* Read record that is identified by table->cursor->ref. */
97
 
  (void) table->cursor->rnd_pos(table->getUpdateRecord(), table->cursor->ref);
 
119
  bitmap_union(table->read_set, &unique_map);
 
120
  /* Tell the engine about the new set. */
 
121
  table->file->column_bitmaps_signal();
 
122
  /* Read record that is identified by table->file->ref. */
 
123
  (void) table->file->rnd_pos(table->record[1], table->file->ref);
98
124
  /* Copy the newly read columns into the new record. */
99
 
  for (field_p= table->getFields(); (field= *field_p); field_p++)
100
 
  {
101
 
    if (unique_map.test(field->position()))
102
 
    {
103
 
      field->copy_from_tmp(table->getShare()->rec_buff_length);
104
 
    }
105
 
  }
 
125
  for (field_p= table->field; (field= *field_p); field_p++)
 
126
    if (bitmap_is_set(&unique_map, field->field_index))
 
127
      field->copy_from_tmp(table->s->rec_buff_length);
106
128
 
107
129
  return;
108
130
}
117
139
    fields              fields for update
118
140
    values              values of fields for update
119
141
    conds               WHERE clause expression
120
 
    order_num           number of elemen in ORDER BY clause
 
142
    order_num           number of elemen in order_st BY clause
121
143
    order               order_st BY clause list
122
144
    limit               limit clause
123
145
    handle_duplicates   how to handle duplicates
124
146
 
125
147
  RETURN
126
148
    0  - OK
 
149
    2  - privilege check and openning table passed, but we need to convert to
 
150
         multi-update because of view substitution
127
151
    1  - error
128
152
*/
129
153
 
130
154
int mysql_update(Session *session, TableList *table_list,
131
155
                 List<Item> &fields, List<Item> &values, COND *conds,
132
 
                 uint32_t order_num, Order *order,
 
156
                 uint32_t order_num, order_st *order,
133
157
                 ha_rows limit, enum enum_duplicates,
134
158
                 bool ignore)
135
159
{
136
160
  bool          using_limit= limit != HA_POS_ERROR;
137
 
  bool          used_key_is_modified;
138
 
  bool          transactional_table;
139
 
  int           error;
 
161
  bool          safe_update= test(session->options & OPTION_SAFE_UPDATES);
 
162
  bool          used_key_is_modified, transactional_table, will_batch;
 
163
  bool          can_compare_record;
 
164
  int           error, loc_error;
140
165
  uint          used_index= MAX_KEY, dup_key_found;
141
166
  bool          need_sort= true;
 
167
  uint32_t          table_count= 0;
142
168
  ha_rows       updated, found;
143
169
  key_map       old_covering_keys;
144
170
  Table         *table;
145
 
  optimizer::SqlSelect *select= NULL;
146
 
  ReadRecord    info;
147
 
  Select_Lex    *select_lex= &session->lex->select_lex;
 
171
  SQL_SELECT    *select;
 
172
  READ_RECORD   info;
 
173
  SELECT_LEX    *select_lex= &session->lex->select_lex;
 
174
  bool          need_reopen;
148
175
  uint64_t     id;
149
176
  List<Item> all_fields;
150
 
  Session::killed_state_t killed_status= Session::NOT_KILLED;
 
177
  Session::killed_state killed_status= Session::NOT_KILLED;
151
178
 
152
 
  DRIZZLE_UPDATE_START(session->getQueryString()->c_str());
153
 
  if (session->openTablesLock(table_list))
 
179
  for ( ; ; )
154
180
  {
155
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
156
 
    return 1;
 
181
    if (open_tables(session, &table_list, &table_count, 0))
 
182
      return(1);
 
183
 
 
184
    if (!lock_tables(session, table_list, table_count, &need_reopen))
 
185
      break;
 
186
    if (!need_reopen)
 
187
      return(1);
 
188
    close_tables_for_reopen(session, &table_list);
157
189
  }
158
190
 
 
191
  if (mysql_handle_derived(session->lex, &mysql_derived_prepare) ||
 
192
      (session->fill_derived_tables() &&
 
193
       mysql_handle_derived(session->lex, &mysql_derived_filling)))
 
194
    return(1);
 
195
 
 
196
  DRIZZLE_UPDATE_START();
159
197
  session->set_proc_info("init");
160
198
  table= table_list->table;
161
199
 
162
200
  /* Calculate "table->covering_keys" based on the WHERE */
163
 
  table->covering_keys= table->getShare()->keys_in_use;
164
 
  table->quick_keys.reset();
 
201
  table->covering_keys= table->s->keys_in_use;
 
202
  table->quick_keys.clear_all();
165
203
 
166
204
  if (mysql_prepare_update(session, table_list, &conds, order_num, order))
167
 
  {
168
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
169
 
    return 1;
170
 
  }
 
205
    goto abort;
171
206
 
172
207
  old_covering_keys= table->covering_keys;              // Keys used in WHERE
173
208
  /* Check the fields we are going to modify */
174
209
  if (setup_fields_with_no_wrap(session, 0, fields, MARK_COLUMNS_WRITE, 0, 0))
175
 
  {
176
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
177
 
    return 1;
178
 
  }
179
 
 
 
210
    goto abort;                               /* purecov: inspected */
180
211
  if (table->timestamp_field)
181
212
  {
182
213
    // Don't set timestamp column if this is modified
183
 
    if (table->timestamp_field->isWriteSet())
184
 
    {
 
214
    if (bitmap_is_set(table->write_set,
 
215
                      table->timestamp_field->field_index))
185
216
      table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
186
 
    }
187
217
    else
188
218
    {
189
219
      if (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
190
220
          table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH)
191
 
      {
192
 
        table->setWriteSet(table->timestamp_field->position());
193
 
      }
 
221
        bitmap_set_bit(table->write_set,
 
222
                       table->timestamp_field->field_index);
194
223
    }
195
224
  }
196
225
 
197
226
  if (setup_fields(session, 0, values, MARK_COLUMNS_READ, 0, 0))
198
227
  {
199
228
    free_underlaid_joins(session, select_lex);
200
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
201
 
 
202
 
    return 1;
 
229
    goto abort;                               /* purecov: inspected */
203
230
  }
204
231
 
205
232
  if (select_lex->inner_refs_list.elements &&
206
233
    fix_inner_refs(session, all_fields, select_lex, select_lex->ref_pointer_array))
207
234
  {
208
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
209
 
    return -1;
 
235
    DRIZZLE_UPDATE_END();
 
236
    return(-1);
210
237
  }
211
238
 
212
239
  if (conds)
222
249
    update force the table handler to retrieve write-only fields to be able
223
250
    to compare records and detect data change.
224
251
  */
225
 
  if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ) &&
 
252
  if (table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ &&
226
253
      table->timestamp_field &&
227
254
      (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
228
255
       table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
229
 
  {
230
 
    *table->read_set|= *table->write_set;
231
 
  }
 
256
    bitmap_union(table->read_set, table->write_set);
232
257
  // Don't count on usage of 'only index' when calculating which key to use
233
 
  table->covering_keys.reset();
234
 
 
235
 
  /* Update the table->cursor->stats.records number */
236
 
  table->cursor->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
237
 
 
238
 
  select= optimizer::make_select(table, 0, 0, conds, 0, &error);
 
258
  table->covering_keys.clear_all();
 
259
 
 
260
  /* Update the table->file->stats.records number */
 
261
  table->file->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
 
262
 
 
263
  select= make_select(table, 0, 0, conds, 0, &error);
239
264
  if (error || !limit ||
240
 
      (select && select->check_quick(session, false, limit)))
 
265
      (select && select->check_quick(session, safe_update, limit)))
241
266
  {
242
267
    delete select;
243
 
    /**
244
 
     * Resetting the Diagnostic area to prevent
245
 
     * lp bug# 439719
246
 
     */
247
 
    session->main_da.reset_diagnostics_area();
248
268
    free_underlaid_joins(session, select_lex);
249
269
    if (error)
250
 
    {
251
 
      DRIZZLE_UPDATE_DONE(1, 0, 0);
252
 
      return 1;
253
 
    }
254
 
    DRIZZLE_UPDATE_DONE(0, 0, 0);
255
 
    session->my_ok();                           // No matching records
256
 
    return 0;
 
270
      goto abort;                               // Error in where
 
271
    DRIZZLE_UPDATE_END();
 
272
    my_ok(session);                             // No matching records
 
273
    return(0);
257
274
  }
258
275
  if (!select && limit != HA_POS_ERROR)
259
276
  {
260
 
    if ((used_index= optimizer::get_index_for_order(table, order, limit)) != MAX_KEY)
 
277
    if ((used_index= get_index_for_order(table, order, limit)) != MAX_KEY)
261
278
      need_sort= false;
262
279
  }
263
280
  /* If running in safe sql mode, don't allow updates without keys */
264
 
  if (table->quick_keys.none())
 
281
  if (table->quick_keys.is_clear_all())
265
282
  {
266
283
    session->server_status|=SERVER_QUERY_NO_INDEX_USED;
 
284
    if (safe_update && !using_limit)
 
285
    {
 
286
      my_message(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE,
 
287
                 ER(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE), MYF(0));
 
288
      goto err;
 
289
    }
267
290
  }
268
291
 
269
292
  table->mark_columns_needed_for_update();
274
297
  {
275
298
    used_index= select->quick->index;
276
299
    used_key_is_modified= (!select->quick->unique_key_range() &&
277
 
                          select->quick->is_keys_used(*table->write_set));
 
300
                          select->quick->is_keys_used(table->write_set));
278
301
  }
279
302
  else
280
303
  {
281
304
    used_key_is_modified= 0;
282
305
    if (used_index == MAX_KEY)                  // no index for sort order
283
 
      used_index= table->cursor->key_used_on_scan;
 
306
      used_index= table->file->key_used_on_scan;
284
307
    if (used_index != MAX_KEY)
285
 
      used_key_is_modified= is_key_used(table, used_index, *table->write_set);
 
308
      used_key_is_modified= is_key_used(table, used_index, table->write_set);
286
309
  }
287
310
 
288
311
 
292
315
      We can't update table directly;  We must first search after all
293
316
      matching rows before updating the table!
294
317
    */
295
 
    if (used_index < MAX_KEY && old_covering_keys.test(used_index))
 
318
    if (used_index < MAX_KEY && old_covering_keys.is_set(used_index))
296
319
    {
297
320
      table->key_read=1;
298
321
      table->mark_columns_used_by_index(used_index);
311
334
        NOTE: filesort will call table->prepare_for_position()
312
335
      */
313
336
      uint32_t         length= 0;
314
 
      SortField  *sortorder;
 
337
      SORT_FIELD  *sortorder;
315
338
      ha_rows examined_rows;
316
 
      FileSort filesort(*session);
317
339
 
318
 
      table->sort.io_cache= new internal::IO_CACHE;
 
340
      table->sort.io_cache = new IO_CACHE;
 
341
      memset(table->sort.io_cache, 0, sizeof(IO_CACHE));
319
342
 
320
343
      if (!(sortorder=make_unireg_sortorder(order, &length, NULL)) ||
321
 
          (table->sort.found_records= filesort.run(table, sortorder, length,
322
 
                                                   select, limit, 1,
323
 
                                                   examined_rows)) == HA_POS_ERROR)
 
344
          (table->sort.found_records= filesort(session, table, sortorder, length,
 
345
                                               select, limit, 1,
 
346
                                               &examined_rows))
 
347
          == HA_POS_ERROR)
324
348
      {
325
349
        goto err;
326
350
      }
339
363
        update these in a separate loop based on the pointer.
340
364
      */
341
365
 
342
 
      internal::IO_CACHE tempfile;
343
 
      if (tempfile.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
344
 
      {
 
366
      IO_CACHE tempfile;
 
367
      if (open_cached_file(&tempfile, drizzle_tmpdir,TEMP_PREFIX,
 
368
                           DISK_BUFFER_SIZE, MYF(MY_WME)))
345
369
        goto err;
346
 
      }
347
370
 
348
371
      /* If quick select is used, initialize it before retrieving rows. */
349
372
      if (select && select->quick && select->quick->reset())
350
373
        goto err;
351
 
      table->cursor->try_semi_consistent_read(1);
 
374
      table->file->try_semi_consistent_read(1);
352
375
 
353
376
      /*
354
377
        When we get here, we have one of the following options:
362
385
      */
363
386
 
364
387
      if (used_index == MAX_KEY || (select && select->quick))
365
 
      {
366
 
        info.init_read_record(session, table, select, 0, true);
367
 
      }
 
388
        init_read_record(&info,session,table,select,0,1);
368
389
      else
369
 
      {
370
 
        info.init_read_record_idx(session, table, 1, used_index);
371
 
      }
 
390
        init_read_record_idx(&info, session, table, 1, used_index);
372
391
 
373
392
      session->set_proc_info("Searching rows for update");
374
393
      ha_rows tmp_limit= limit;
375
394
 
376
 
      while (not(error= info.read_record(&info)) && not session->getKilled())
 
395
      while (!(error=info.read_record(&info)) && !session->killed)
377
396
      {
378
397
        if (!(select && select->skip_record()))
379
398
        {
380
 
          if (table->cursor->was_semi_consistent_read())
 
399
          if (table->file->was_semi_consistent_read())
381
400
            continue;  /* repeat the read of the same row if it still exists */
382
401
 
383
 
          table->cursor->position(table->getInsertRecord());
384
 
          if (my_b_write(&tempfile,table->cursor->ref,
385
 
                         table->cursor->ref_length))
 
402
          table->file->position(table->record[0]);
 
403
          if (my_b_write(&tempfile,table->file->ref,
 
404
                         table->file->ref_length))
386
405
          {
387
 
            error=1;
388
 
            break;
 
406
            error=1; /* purecov: inspected */
 
407
            break; /* purecov: inspected */
389
408
          }
390
409
          if (!--limit && using_limit)
391
410
          {
394
413
          }
395
414
        }
396
415
        else
397
 
          table->cursor->unlock_row();
 
416
          table->file->unlock_row();
398
417
      }
399
 
      if (session->getKilled() && not error)
 
418
      if (session->killed && !error)
400
419
        error= 1;                               // Aborted
401
420
      limit= tmp_limit;
402
 
      table->cursor->try_semi_consistent_read(0);
403
 
      info.end_read_record();
 
421
      table->file->try_semi_consistent_read(0);
 
422
      end_read_record(&info);
404
423
 
405
424
      /* Change select to use tempfile */
406
425
      if (select)
413
432
      }
414
433
      else
415
434
      {
416
 
        select= new optimizer::SqlSelect;
 
435
        select= new SQL_SELECT;
417
436
        select->head=table;
418
437
      }
419
 
      if (tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
420
 
        error=1;
421
 
      // Read row ptrs from this cursor
422
 
      memcpy(select->file, &tempfile, sizeof(tempfile));
 
438
      if (reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
 
439
        error=1; /* purecov: inspected */
 
440
      select->file=tempfile;                    // Read row ptrs from this file
423
441
      if (error >= 0)
424
442
        goto err;
425
443
    }
428
446
  }
429
447
 
430
448
  if (ignore)
431
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
449
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
432
450
 
433
451
  if (select && select->quick && select->quick->reset())
434
452
    goto err;
435
 
  table->cursor->try_semi_consistent_read(1);
436
 
  info.init_read_record(session, table, select, 0, true);
 
453
  table->file->try_semi_consistent_read(1);
 
454
  init_read_record(&info,session,table,select,0,1);
437
455
 
438
456
  updated= found= 0;
439
457
  /*
448
466
  session->cuted_fields= 0L;
449
467
  session->set_proc_info("Updating");
450
468
 
451
 
  transactional_table= table->cursor->has_transactions();
 
469
  transactional_table= table->file->has_transactions();
452
470
  session->abort_on_warning= test(!ignore);
 
471
  will_batch= !table->file->start_bulk_update();
453
472
 
454
473
  /*
455
474
    Assure that we can use position()
456
475
    if we need to create an error message.
457
476
  */
458
 
  if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ))
 
477
  if (table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ)
459
478
    table->prepare_for_position();
460
479
 
461
 
  while (not (error=info.read_record(&info)) && not session->getKilled())
 
480
  /*
 
481
    We can use compare_record() to optimize away updates if
 
482
    the table handler is returning all columns OR if
 
483
    if all updated columns are read
 
484
  */
 
485
  can_compare_record= (!(table->file->ha_table_flags() &
 
486
                         HA_PARTIAL_COLUMN_READ) ||
 
487
                       bitmap_is_subset(table->write_set, table->read_set));
 
488
 
 
489
  while (!(error=info.read_record(&info)) && !session->killed)
462
490
  {
463
 
    if (not (select && select->skip_record()))
 
491
    if (!(select && select->skip_record()))
464
492
    {
465
 
      if (table->cursor->was_semi_consistent_read())
 
493
      if (table->file->was_semi_consistent_read())
466
494
        continue;  /* repeat the read of the same row if it still exists */
467
495
 
468
 
      table->storeRecord();
469
 
      if (fill_record(session, fields, values))
470
 
        break;
 
496
      store_record(table,record[1]);
 
497
      if (fill_record(session, fields, values, 0))
 
498
        break; /* purecov: inspected */
471
499
 
472
500
      found++;
473
501
 
474
 
      if (! table->records_are_comparable() || table->compare_records())
 
502
      if (!can_compare_record || table->compare_record())
475
503
      {
476
 
        /* Non-batched update */
477
 
        error= table->cursor->updateRecord(table->getUpdateRecord(),
478
 
                                            table->getInsertRecord());
479
 
 
480
 
        table->auto_increment_field_not_null= false;
481
 
 
 
504
        if (will_batch)
 
505
        {
 
506
          /*
 
507
            Typically a batched handler can execute the batched jobs when:
 
508
            1) When specifically told to do so
 
509
            2) When it is not a good idea to batch anymore
 
510
            3) When it is necessary to send batch for other reasons
 
511
               (One such reason is when READ's must be performed)
 
512
 
 
513
            1) is covered by exec_bulk_update calls.
 
514
            2) and 3) is handled by the bulk_update_row method.
 
515
 
 
516
            bulk_update_row can execute the updates including the one
 
517
            defined in the bulk_update_row or not including the row
 
518
            in the call. This is up to the handler implementation and can
 
519
            vary from call to call.
 
520
 
 
521
            The dup_key_found reports the number of duplicate keys found
 
522
            in those updates actually executed. It only reports those if
 
523
            the extra call with HA_EXTRA_IGNORE_DUP_KEY have been issued.
 
524
            If this hasn't been issued it returns an error code and can
 
525
            ignore this number. Thus any handler that implements batching
 
526
            for UPDATE IGNORE must also handle this extra call properly.
 
527
 
 
528
            If a duplicate key is found on the record included in this
 
529
            call then it should be included in the count of dup_key_found
 
530
            and error should be set to 0 (only if these errors are ignored).
 
531
          */
 
532
          error= table->file->ha_bulk_update_row(table->record[1],
 
533
                                                 table->record[0],
 
534
                                                 &dup_key_found);
 
535
          limit+= dup_key_found;
 
536
          updated-= dup_key_found;
 
537
        }
 
538
        else
 
539
        {
 
540
          /* Non-batched update */
 
541
          error= table->file->ha_update_row(table->record[1],
 
542
                                            table->record[0]);
 
543
        }
482
544
        if (!error || error == HA_ERR_RECORD_IS_THE_SAME)
483
 
        {
 
545
        {
484
546
          if (error != HA_ERR_RECORD_IS_THE_SAME)
485
547
            updated++;
486
548
          else
487
549
            error= 0;
488
 
        }
489
 
        else if (! ignore ||
490
 
                 table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
491
 
        {
 
550
        }
 
551
        else if (!ignore ||
 
552
                 table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
553
        {
492
554
          /*
493
555
            If (ignore && error is ignorable) we don't have to
494
556
            do anything; otherwise...
495
557
          */
496
558
          myf flags= 0;
497
559
 
498
 
          if (table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
560
          if (table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
499
561
            flags|= ME_FATALERROR; /* Other handler errors are fatal */
500
562
 
501
563
          prepare_record_for_error_message(error, table);
502
 
          table->print_error(error,MYF(flags));
503
 
          error= 1;
504
 
          break;
505
 
        }
 
564
          table->file->print_error(error,MYF(flags));
 
565
          error= 1;
 
566
          break;
 
567
        }
506
568
      }
507
569
 
508
570
      if (!--limit && using_limit)
509
571
      {
510
 
        error= -1;                              // Simulate end of cursor
511
 
        break;
 
572
        /*
 
573
          We have reached end-of-file in most common situations where no
 
574
          batching has occurred and if batching was supposed to occur but
 
575
          no updates were made and finally when the batch execution was
 
576
          performed without error and without finding any duplicate keys.
 
577
          If the batched updates were performed with errors we need to
 
578
          check and if no error but duplicate key's found we need to
 
579
          continue since those are not counted for in limit.
 
580
        */
 
581
        if (will_batch &&
 
582
            ((error= table->file->exec_bulk_update(&dup_key_found)) ||
 
583
             dup_key_found))
 
584
        {
 
585
          if (error)
 
586
          {
 
587
            /* purecov: begin inspected */
 
588
            /*
 
589
              The handler should not report error of duplicate keys if they
 
590
              are ignored. This is a requirement on batching handlers.
 
591
            */
 
592
            prepare_record_for_error_message(error, table);
 
593
            table->file->print_error(error,MYF(0));
 
594
            error= 1;
 
595
            break;
 
596
            /* purecov: end */
 
597
          }
 
598
          /*
 
599
            Either an error was found and we are ignoring errors or there
 
600
            were duplicate keys found. In both cases we need to correct
 
601
            the counters and continue the loop.
 
602
          */
 
603
          limit= dup_key_found; //limit is 0 when we get here so need to +
 
604
          updated-= dup_key_found;
 
605
        }
 
606
        else
 
607
        {
 
608
          error= -1;                            // Simulate end of file
 
609
          break;
 
610
        }
512
611
      }
513
612
    }
514
613
    else
515
 
      table->cursor->unlock_row();
 
614
      table->file->unlock_row();
516
615
    session->row_count++;
517
616
  }
518
617
  dup_key_found= 0;
524
623
    It's assumed that if an error was set in combination with an effective
525
624
    killed status then the error is due to killing.
526
625
  */
527
 
  killed_status= session->getKilled(); // get the status of the volatile
 
626
  killed_status= session->killed; // get the status of the volatile
528
627
  // simulated killing after the loop must be ineffective for binlogging
529
628
  error= (killed_status == Session::NOT_KILLED)?  error : 1;
530
629
 
531
 
  updated-= dup_key_found;
532
 
  table->cursor->try_semi_consistent_read(0);
 
630
  if (error &&
 
631
      will_batch &&
 
632
      (loc_error= table->file->exec_bulk_update(&dup_key_found)))
 
633
    /*
 
634
      An error has occurred when a batched update was performed and returned
 
635
      an error indication. It cannot be an allowed duplicate key error since
 
636
      we require the batching handler to treat this as a normal behavior.
 
637
 
 
638
      Otherwise we simply remove the number of duplicate keys records found
 
639
      in the batched update.
 
640
    */
 
641
  {
 
642
    /* purecov: begin inspected */
 
643
    prepare_record_for_error_message(loc_error, table);
 
644
    table->file->print_error(loc_error,MYF(ME_FATALERROR));
 
645
    error= 1;
 
646
    /* purecov: end */
 
647
  }
 
648
  else
 
649
    updated-= dup_key_found;
 
650
  if (will_batch)
 
651
    table->file->end_bulk_update();
 
652
  table->file->try_semi_consistent_read(0);
533
653
 
534
654
  if (!transactional_table && updated > 0)
535
 
    session->transaction.stmt.markModifiedNonTransData();
 
655
    session->transaction.stmt.modified_non_trans_table= true;
536
656
 
537
 
  info.end_read_record();
 
657
  end_read_record(&info);
538
658
  delete select;
539
659
  session->set_proc_info("end");
540
 
  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
660
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
541
661
 
542
662
  /*
543
663
    error < 0 means really no error at all: we processed all rows until the
544
664
    last one without error. error > 0 means an error (e.g. unique key
545
665
    violation and no IGNORE or REPLACE). error == 0 is also an error (if
546
666
    preparing the record or invoking before triggers fails). See
547
 
    autocommitOrRollback(error>=0) and return(error>=0) below.
 
667
    ha_autocommit_or_rollback(error>=0) and return(error>=0) below.
548
668
    Sometimes we want to binlog even if we updated no rows, in case user used
549
669
    it to be sure master and slave are in same state.
550
670
  */
551
 
  if ((error < 0) || session->transaction.stmt.hasModifiedNonTransData())
 
671
  if ((error < 0) || session->transaction.stmt.modified_non_trans_table)
552
672
  {
553
 
    if (session->transaction.stmt.hasModifiedNonTransData())
554
 
      session->transaction.all.markModifiedNonTransData();
 
673
    if (session->transaction.stmt.modified_non_trans_table)
 
674
      session->transaction.all.modified_non_trans_table= true;
555
675
  }
556
 
  assert(transactional_table || !updated || session->transaction.stmt.hasModifiedNonTransData());
 
676
  assert(transactional_table || !updated || session->transaction.stmt.modified_non_trans_table);
557
677
  free_underlaid_joins(session, select_lex);
558
678
 
559
679
  /* If LAST_INSERT_ID(X) was used, report X */
560
680
  id= session->arg_of_last_insert_id_function ?
561
681
    session->first_successful_insert_id_in_prev_stmt : 0;
562
682
 
 
683
  DRIZZLE_UPDATE_END();
563
684
  if (error < 0)
564
685
  {
565
686
    char buff[STRING_BUFFER_USUAL_SIZE];
566
 
    snprintf(buff, sizeof(buff), ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
 
687
    sprintf(buff, ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
567
688
            (ulong) session->cuted_fields);
568
 
    session->row_count_func= updated;
569
 
    /**
570
 
     * Resetting the Diagnostic area to prevent
571
 
     * lp bug# 439719
572
 
     */
573
 
    session->main_da.reset_diagnostics_area();
574
 
    session->my_ok((ulong) session->row_count_func, found, id, buff);
575
 
    session->status_var.updated_row_count+= session->row_count_func;
 
689
    session->row_count_func=
 
690
      (session->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated;
 
691
    my_ok(session, (ulong) session->row_count_func, id, buff);
576
692
  }
577
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;              /* calc cuted fields */
 
693
  session->count_cuted_fields= CHECK_FIELD_IGNORE;              /* calc cuted fields */
578
694
  session->abort_on_warning= 0;
579
 
  DRIZZLE_UPDATE_DONE((error >= 0 || session->is_error()), found, updated);
580
 
  return ((error >= 0 || session->is_error()) ? 1 : 0);
 
695
  return((error >= 0 || session->is_error()) ? 1 : 0);
581
696
 
582
697
err:
583
698
  delete select;
585
700
  if (table->key_read)
586
701
  {
587
702
    table->key_read=0;
588
 
    table->cursor->extra(HA_EXTRA_NO_KEYREAD);
 
703
    table->file->extra(HA_EXTRA_NO_KEYREAD);
589
704
  }
590
705
  session->abort_on_warning= 0;
591
706
 
592
 
  DRIZZLE_UPDATE_DONE(1, 0, 0);
593
 
  return 1;
 
707
abort:
 
708
  DRIZZLE_UPDATE_END();
 
709
  return(1);
594
710
}
595
711
 
596
712
/*
601
717
    session                     - thread handler
602
718
    table_list          - global/local table list
603
719
    conds               - conditions
604
 
    order_num           - number of ORDER BY list entries
605
 
    order               - ORDER BY clause list
 
720
    order_num           - number of order_st BY list entries
 
721
    order               - order_st BY clause list
606
722
 
607
723
  RETURN VALUE
608
724
    false OK
609
725
    true  error
610
726
*/
611
727
bool mysql_prepare_update(Session *session, TableList *table_list,
612
 
                         Item **conds, uint32_t order_num, Order *order)
 
728
                         Item **conds, uint32_t order_num, order_st *order)
613
729
{
614
730
  List<Item> all_fields;
615
 
  Select_Lex *select_lex= &session->lex->select_lex;
 
731
  SELECT_LEX *select_lex= &session->lex->select_lex;
616
732
 
617
733
  session->lex->allow_sum_func= 0;
618
734
 
621
737
                                    table_list,
622
738
                                    &select_lex->leaf_tables,
623
739
                                    false) ||
624
 
      session->setup_conds(table_list, conds) ||
 
740
      setup_conds(session, table_list, select_lex->leaf_tables, conds) ||
625
741
      select_lex->setup_ref_array(session, order_num) ||
626
742
      setup_order(session, select_lex->ref_pointer_array,
627
743
                  table_list, all_fields, all_fields, order))
628
 
    return true;
 
744
    return(true);
629
745
 
630
746
  /* Check that we are not using table that we are updating in a sub select */
631
747
  {
632
748
    TableList *duplicate;
633
 
    if ((duplicate= unique_table(table_list, table_list->next_global)))
634
 
    {
635
 
      my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
636
 
      return true;
637
 
    }
638
 
  }
639
 
 
 
749
    if ((duplicate= unique_table(session, table_list, table_list->next_global, 0)))
 
750
    {
 
751
      update_non_unique_table_error(table_list, "UPDATE", duplicate);
 
752
      my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
 
753
      return(true);
 
754
    }
 
755
  }
 
756
 
 
757
  return(false);
 
758
}
 
759
 
 
760
 
 
761
/***************************************************************************
 
762
  Update multiple tables from join
 
763
***************************************************************************/
 
764
 
 
765
/*
 
766
  Get table map for list of Item_field
 
767
*/
 
768
 
 
769
static table_map get_table_map(List<Item> *items)
 
770
{
 
771
  List_iterator_fast<Item> item_it(*items);
 
772
  Item_field *item;
 
773
  table_map map= 0;
 
774
 
 
775
  while ((item= (Item_field *) item_it++))
 
776
    map|= item->used_tables();
 
777
  return map;
 
778
}
 
779
 
 
780
 
 
781
/*
 
782
  make update specific preparation and checks after opening tables
 
783
 
 
784
  SYNOPSIS
 
785
    mysql_multi_update_prepare()
 
786
    session         thread handler
 
787
 
 
788
  RETURN
 
789
    false OK
 
790
    true  Error
 
791
*/
 
792
 
 
793
int mysql_multi_update_prepare(Session *session)
 
794
{
 
795
  LEX *lex= session->lex;
 
796
  TableList *table_list= lex->query_tables;
 
797
  TableList *tl, *leaves;
 
798
  List<Item> *fields= &lex->select_lex.item_list;
 
799
  table_map tables_for_update;
 
800
  bool update_view= 0;
 
801
  /*
 
802
    if this multi-update was converted from usual update, here is table
 
803
    counter else junk will be assigned here, but then replaced with real
 
804
    count in open_tables()
 
805
  */
 
806
  uint32_t  table_count= lex->table_count;
 
807
  const bool using_lock_tables= session->locked_tables != 0;
 
808
  bool original_multiupdate= (session->lex->sql_command == SQLCOM_UPDATE_MULTI);
 
809
  bool need_reopen= false;
 
810
 
 
811
 
 
812
  /* following need for prepared statements, to run next time multi-update */
 
813
  session->lex->sql_command= SQLCOM_UPDATE_MULTI;
 
814
 
 
815
reopen_tables:
 
816
 
 
817
  /* open tables and create derived ones, but do not lock and fill them */
 
818
  if (((original_multiupdate || need_reopen) &&
 
819
       open_tables(session, &table_list, &table_count, 0)) ||
 
820
      mysql_handle_derived(lex, &mysql_derived_prepare))
 
821
    return(true);
 
822
  /*
 
823
    setup_tables() need for VIEWs. JOIN::prepare() will call setup_tables()
 
824
    second time, but this call will do nothing (there are check for second
 
825
    call in setup_tables()).
 
826
  */
 
827
 
 
828
  if (setup_tables_and_check_access(session, &lex->select_lex.context,
 
829
                                    &lex->select_lex.top_join_list,
 
830
                                    table_list,
 
831
                                    &lex->select_lex.leaf_tables, false))
 
832
    return(true);
 
833
 
 
834
  if (setup_fields_with_no_wrap(session, 0, *fields, MARK_COLUMNS_WRITE, 0, 0))
 
835
    return(true);
 
836
 
 
837
  if (update_view && check_fields(session, *fields))
 
838
  {
 
839
    return(true);
 
840
  }
 
841
 
 
842
  tables_for_update= get_table_map(fields);
 
843
 
 
844
  /*
 
845
    Setup timestamp handling and locking mode
 
846
  */
 
847
  leaves= lex->select_lex.leaf_tables;
 
848
  for (tl= leaves; tl; tl= tl->next_leaf)
 
849
  {
 
850
    Table *table= tl->table;
 
851
    /* Only set timestamp column if this is not modified */
 
852
    if (table->timestamp_field &&
 
853
        bitmap_is_set(table->write_set,
 
854
                      table->timestamp_field->field_index))
 
855
      table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
 
856
 
 
857
    /* if table will be updated then check that it is unique */
 
858
    if (table->map & tables_for_update)
 
859
    {
 
860
      table->mark_columns_needed_for_update();
 
861
      /*
 
862
        If table will be updated we should not downgrade lock for it and
 
863
        leave it as is.
 
864
      */
 
865
    }
 
866
    else
 
867
    {
 
868
      /*
 
869
        If we are using the binary log, we need TL_READ_NO_INSERT to get
 
870
        correct order of statements. Otherwise, we use a TL_READ lock to
 
871
        improve performance.
 
872
      */
 
873
      tl->lock_type= TL_READ;
 
874
      tl->updating= 0;
 
875
      /* Update Table::lock_type accordingly. */
 
876
      if (!tl->placeholder() && !using_lock_tables)
 
877
        tl->table->reginfo.lock_type= tl->lock_type;
 
878
    }
 
879
  }
 
880
 
 
881
  /* now lock and fill tables */
 
882
  if (lock_tables(session, table_list, table_count, &need_reopen))
 
883
  {
 
884
    if (!need_reopen)
 
885
      return(true);
 
886
 
 
887
    /*
 
888
      We have to reopen tables since some of them were altered or dropped
 
889
      during lock_tables() or something was done with their triggers.
 
890
      Let us do some cleanups to be able do setup_table() and setup_fields()
 
891
      once again.
 
892
    */
 
893
    List_iterator_fast<Item> it(*fields);
 
894
    Item *item;
 
895
    while ((item= it++))
 
896
      item->cleanup();
 
897
 
 
898
    /* We have to cleanup translation tables of views. */
 
899
    for (TableList *tbl= table_list; tbl; tbl= tbl->next_global)
 
900
      tbl->cleanup_items();
 
901
 
 
902
    close_tables_for_reopen(session, &table_list);
 
903
    goto reopen_tables;
 
904
  }
 
905
 
 
906
  /*
 
907
    Check that we are not using table that we are updating, but we should
 
908
    skip all tables of UPDATE SELECT itself
 
909
  */
 
910
  lex->select_lex.exclude_from_table_unique_test= true;
 
911
  /* We only need SELECT privilege for columns in the values list */
 
912
  for (tl= leaves; tl; tl= tl->next_leaf)
 
913
  {
 
914
    if (tl->lock_type != TL_READ &&
 
915
        tl->lock_type != TL_READ_NO_INSERT)
 
916
    {
 
917
      TableList *duplicate;
 
918
      if ((duplicate= unique_table(session, tl, table_list, 0)))
 
919
      {
 
920
        update_non_unique_table_error(table_list, "UPDATE", duplicate);
 
921
        return(true);
 
922
      }
 
923
    }
 
924
  }
 
925
  /*
 
926
    Set exclude_from_table_unique_test value back to false. It is needed for
 
927
    further check in multi_update::prepare whether to use record cache.
 
928
  */
 
929
  lex->select_lex.exclude_from_table_unique_test= false;
 
930
 
 
931
  if (session->fill_derived_tables() &&
 
932
      mysql_handle_derived(lex, &mysql_derived_filling))
 
933
    return(true);
 
934
 
 
935
  return (false);
 
936
}
 
937
 
 
938
 
 
939
/*
 
940
  Setup multi-update handling and call SELECT to do the join
 
941
*/
 
942
 
 
943
bool mysql_multi_update(Session *session,
 
944
                        TableList *table_list,
 
945
                        List<Item> *fields,
 
946
                        List<Item> *values,
 
947
                        COND *conds,
 
948
                        uint64_t options,
 
949
                        enum enum_duplicates handle_duplicates, bool ignore,
 
950
                        SELECT_LEX_UNIT *unit, SELECT_LEX *select_lex)
 
951
{
 
952
  multi_update *result;
 
953
  bool res;
 
954
 
 
955
  if (!(result= new multi_update(table_list,
 
956
                                 session->lex->select_lex.leaf_tables,
 
957
                                 fields, values,
 
958
                                 handle_duplicates, ignore)))
 
959
    return(true);
 
960
 
 
961
  session->abort_on_warning= true;
 
962
 
 
963
  List<Item> total_list;
 
964
  res= mysql_select(session, &select_lex->ref_pointer_array,
 
965
                      table_list, select_lex->with_wild,
 
966
                      total_list,
 
967
                      conds, 0, (order_st *) NULL, (order_st *)NULL, (Item *) NULL,
 
968
                      (order_st *)NULL,
 
969
                      options | SELECT_NO_JOIN_CACHE | SELECT_NO_UNLOCK |
 
970
                      OPTION_SETUP_TABLES_DONE,
 
971
                      result, unit, select_lex);
 
972
  res|= session->is_error();
 
973
  if (unlikely(res))
 
974
  {
 
975
    /* If we had a another error reported earlier then this will be ignored */
 
976
    result->send_error(ER_UNKNOWN_ERROR, ER(ER_UNKNOWN_ERROR));
 
977
    result->abort();
 
978
  }
 
979
  delete result;
 
980
  session->abort_on_warning= 0;
 
981
  return(false);
 
982
}
 
983
 
 
984
 
 
985
multi_update::multi_update(TableList *table_list,
 
986
                           TableList *leaves_list,
 
987
                           List<Item> *field_list, List<Item> *value_list,
 
988
                           enum enum_duplicates handle_duplicates_arg,
 
989
                           bool ignore_arg)
 
990
  :all_tables(table_list), leaves(leaves_list), update_tables(0),
 
991
   tmp_tables(0), updated(0), found(0), fields(field_list),
 
992
   values(value_list), table_count(0), copy_field(0),
 
993
   handle_duplicates(handle_duplicates_arg), do_update(1), trans_safe(1),
 
994
   transactional_tables(0), ignore(ignore_arg), error_handled(0)
 
995
{}
 
996
 
 
997
 
 
998
/*
 
999
  Connect fields with tables and create list of tables that are updated
 
1000
*/
 
1001
 
 
1002
int multi_update::prepare(List<Item> &,
 
1003
                          SELECT_LEX_UNIT *)
 
1004
{
 
1005
  TableList *table_ref;
 
1006
  SQL_LIST update;
 
1007
  table_map tables_to_update;
 
1008
  Item_field *item;
 
1009
  List_iterator_fast<Item> field_it(*fields);
 
1010
  List_iterator_fast<Item> value_it(*values);
 
1011
  uint32_t i, max_fields;
 
1012
  uint32_t leaf_table_count= 0;
 
1013
 
 
1014
  session->count_cuted_fields= CHECK_FIELD_WARN;
 
1015
  session->cuted_fields=0L;
 
1016
  session->set_proc_info("updating main table");
 
1017
 
 
1018
  tables_to_update= get_table_map(fields);
 
1019
 
 
1020
  if (!tables_to_update)
 
1021
  {
 
1022
    my_message(ER_NO_TABLES_USED, ER(ER_NO_TABLES_USED), MYF(0));
 
1023
    return(1);
 
1024
  }
 
1025
 
 
1026
  /*
 
1027
    We have to check values after setup_tables to get covering_keys right in
 
1028
    reference tables
 
1029
  */
 
1030
 
 
1031
  if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))
 
1032
    return(1);
 
1033
 
 
1034
  /*
 
1035
    Save tables beeing updated in update_tables
 
1036
    update_table->shared is position for table
 
1037
    Don't use key read on tables that are updated
 
1038
  */
 
1039
 
 
1040
  update.empty();
 
1041
  for (table_ref= leaves; table_ref; table_ref= table_ref->next_leaf)
 
1042
  {
 
1043
    /* TODO: add support of view of join support */
 
1044
    Table *table=table_ref->table;
 
1045
    leaf_table_count++;
 
1046
    if (tables_to_update & table->map)
 
1047
    {
 
1048
      TableList *tl= (TableList*) session->memdup((char*) table_ref,
 
1049
                                                sizeof(*tl));
 
1050
      if (!tl)
 
1051
        return(1);
 
1052
      update.link_in_list((unsigned char*) tl, (unsigned char**) &tl->next_local);
 
1053
      tl->shared= table_count++;
 
1054
      table->no_keyread=1;
 
1055
      table->covering_keys.clear_all();
 
1056
      table->pos_in_table_list= tl;
 
1057
    }
 
1058
  }
 
1059
 
 
1060
 
 
1061
  table_count=  update.elements;
 
1062
  update_tables= (TableList*) update.first;
 
1063
 
 
1064
  tmp_tables = (Table**) session->calloc(sizeof(Table *) * table_count);
 
1065
  tmp_table_param = (TMP_TABLE_PARAM*) session->calloc(sizeof(TMP_TABLE_PARAM) *
 
1066
                                                   table_count);
 
1067
  fields_for_table= (List_item **) session->alloc(sizeof(List_item *) *
 
1068
                                              table_count);
 
1069
  values_for_table= (List_item **) session->alloc(sizeof(List_item *) *
 
1070
                                              table_count);
 
1071
  if (session->is_fatal_error)
 
1072
    return(1);
 
1073
  for (i=0 ; i < table_count ; i++)
 
1074
  {
 
1075
    fields_for_table[i]= new List_item;
 
1076
    values_for_table[i]= new List_item;
 
1077
  }
 
1078
  if (session->is_fatal_error)
 
1079
    return(1);
 
1080
 
 
1081
  /* Split fields into fields_for_table[] and values_by_table[] */
 
1082
 
 
1083
  while ((item= (Item_field *) field_it++))
 
1084
  {
 
1085
    Item *value= value_it++;
 
1086
    uint32_t offset= item->field->table->pos_in_table_list->shared;
 
1087
    fields_for_table[offset]->push_back(item);
 
1088
    values_for_table[offset]->push_back(value);
 
1089
  }
 
1090
  if (session->is_fatal_error)
 
1091
    return(1);
 
1092
 
 
1093
  /* Allocate copy fields */
 
1094
  max_fields=0;
 
1095
  for (i=0 ; i < table_count ; i++)
 
1096
    set_if_bigger(max_fields, fields_for_table[i]->elements + leaf_table_count);
 
1097
  copy_field= new Copy_field[max_fields];
 
1098
  return(session->is_fatal_error != 0);
 
1099
}
 
1100
 
 
1101
 
 
1102
/*
 
1103
  Check if table is safe to update on fly
 
1104
 
 
1105
  SYNOPSIS
 
1106
    safe_update_on_fly()
 
1107
    session                 Thread handler
 
1108
    join_tab            How table is used in join
 
1109
    all_tables          List of tables
 
1110
 
 
1111
  NOTES
 
1112
    We can update the first table in join on the fly if we know that
 
1113
    a row in this table will never be read twice. This is true under
 
1114
    the following conditions:
 
1115
 
 
1116
    - We are doing a table scan and the data is in a separate file (MyISAM) or
 
1117
      if we don't update a clustered key.
 
1118
 
 
1119
    - We are doing a range scan and we don't update the scan key or
 
1120
      the primary key for a clustered table handler.
 
1121
 
 
1122
    - Table is not joined to itself.
 
1123
 
 
1124
    This function gets information about fields to be updated from
 
1125
    the Table::write_set bitmap.
 
1126
 
 
1127
  WARNING
 
1128
    This code is a bit dependent of how make_join_readinfo() works.
 
1129
 
 
1130
  RETURN
 
1131
    0           Not safe to update
 
1132
    1           Safe to update
 
1133
*/
 
1134
 
 
1135
static bool safe_update_on_fly(Session *session, JOIN_TAB *join_tab,
 
1136
                               TableList *table_ref, TableList *all_tables)
 
1137
{
 
1138
  Table *table= join_tab->table;
 
1139
  if (unique_table(session, table_ref, all_tables, 0))
 
1140
    return 0;
 
1141
  switch (join_tab->type) {
 
1142
  case JT_SYSTEM:
 
1143
  case JT_CONST:
 
1144
  case JT_EQ_REF:
 
1145
    return true;                                // At most one matching row
 
1146
  case JT_REF:
 
1147
  case JT_REF_OR_NULL:
 
1148
    return !is_key_used(table, join_tab->ref.key, table->write_set);
 
1149
  case JT_ALL:
 
1150
    /* If range search on index */
 
1151
    if (join_tab->quick)
 
1152
      return !join_tab->quick->is_keys_used(table->write_set);
 
1153
    /* If scanning in clustered key */
 
1154
    if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_IN_READ_INDEX) &&
 
1155
        table->s->primary_key < MAX_KEY)
 
1156
      return !is_key_used(table, table->s->primary_key, table->write_set);
 
1157
    return true;
 
1158
  default:
 
1159
    break;                                      // Avoid compler warning
 
1160
  }
640
1161
  return false;
641
 
}
642
 
 
643
 
} /* namespace drizzled */
 
1162
 
 
1163
}
 
1164
 
 
1165
 
 
1166
/*
 
1167
  Initialize table for multi table
 
1168
 
 
1169
  IMPLEMENTATION
 
1170
    - Update first table in join on the fly, if possible
 
1171
    - Create temporary tables to store changed values for all other tables
 
1172
      that are updated (and main_table if the above doesn't hold).
 
1173
*/
 
1174
 
 
1175
bool
 
1176
multi_update::initialize_tables(JOIN *join)
 
1177
{
 
1178
  TableList *table_ref;
 
1179
 
 
1180
  if ((session->options & OPTION_SAFE_UPDATES) && error_if_full_join(join))
 
1181
    return(1);
 
1182
  main_table=join->join_tab->table;
 
1183
  table_to_update= 0;
 
1184
 
 
1185
  /* Any update has at least one pair (field, value) */
 
1186
  assert(fields->elements);
 
1187
 
 
1188
  /* Create a temporary table for keys to all tables, except main table */
 
1189
  for (table_ref= update_tables; table_ref; table_ref= table_ref->next_local)
 
1190
  {
 
1191
    Table *table=table_ref->table;
 
1192
    uint32_t cnt= table_ref->shared;
 
1193
    List<Item> temp_fields;
 
1194
    order_st     group;
 
1195
    TMP_TABLE_PARAM *tmp_param;
 
1196
 
 
1197
    table->mark_columns_needed_for_update();
 
1198
    if (ignore)
 
1199
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1200
    if (table == main_table)                    // First table in join
 
1201
    {
 
1202
      if (safe_update_on_fly(session, join->join_tab, table_ref, all_tables))
 
1203
      {
 
1204
        table_to_update= main_table;            // Update table on the fly
 
1205
        continue;
 
1206
      }
 
1207
    }
 
1208
    table->prepare_for_position();
 
1209
 
 
1210
    tmp_param= tmp_table_param+cnt;
 
1211
 
 
1212
    /*
 
1213
      Create a temporary table to store all fields that are changed for this
 
1214
      table. The first field in the temporary table is a pointer to the
 
1215
      original row so that we can find and update it. For the updatable
 
1216
      VIEW a few following fields are rowids of tables used in the CHECK
 
1217
      OPTION condition.
 
1218
    */
 
1219
 
 
1220
    List_iterator_fast<Table> tbl_it(unupdated_check_opt_tables);
 
1221
    Table *tbl= table;
 
1222
    do
 
1223
    {
 
1224
      Field_varstring *field= new Field_varstring(tbl->file->ref_length, 0,
 
1225
                                                  tbl->alias, tbl->s, &my_charset_bin);
 
1226
      if (!field)
 
1227
        return(1);
 
1228
      field->init(tbl);
 
1229
      /*
 
1230
        The field will be converted to varstring when creating tmp table if
 
1231
        table to be updated was created by mysql 4.1. Deny this.
 
1232
      */
 
1233
      Item_field *ifield= new Item_field((Field *) field);
 
1234
      if (!ifield)
 
1235
         return(1);
 
1236
      ifield->maybe_null= 0;
 
1237
      if (temp_fields.push_back(ifield))
 
1238
        return(1);
 
1239
    } while ((tbl= tbl_it++));
 
1240
 
 
1241
    temp_fields.concat(fields_for_table[cnt]);
 
1242
 
 
1243
    /* Make an unique key over the first field to avoid duplicated updates */
 
1244
    memset(&group, 0, sizeof(group));
 
1245
    group.asc= 1;
 
1246
    group.item= (Item**) temp_fields.head_ref();
 
1247
 
 
1248
    tmp_param->quick_group=1;
 
1249
    tmp_param->field_count=temp_fields.elements;
 
1250
    tmp_param->group_parts=1;
 
1251
    tmp_param->group_length= table->file->ref_length;
 
1252
    if (!(tmp_tables[cnt]=create_tmp_table(session,
 
1253
                                           tmp_param,
 
1254
                                           temp_fields,
 
1255
                                           (order_st*) &group, 0, 0,
 
1256
                                           TMP_TABLE_ALL_COLUMNS,
 
1257
                                           HA_POS_ERROR,
 
1258
                                           (char *) "")))
 
1259
      return(1);
 
1260
    tmp_tables[cnt]->file->extra(HA_EXTRA_WRITE_CACHE);
 
1261
  }
 
1262
  return(0);
 
1263
}
 
1264
 
 
1265
 
 
1266
multi_update::~multi_update()
 
1267
{
 
1268
  TableList *table;
 
1269
  for (table= update_tables ; table; table= table->next_local)
 
1270
  {
 
1271
    table->table->no_keyread= table->table->no_cache= 0;
 
1272
    if (ignore)
 
1273
      table->table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1274
  }
 
1275
 
 
1276
  if (tmp_tables)
 
1277
  {
 
1278
    for (uint32_t cnt = 0; cnt < table_count; cnt++)
 
1279
    {
 
1280
      if (tmp_tables[cnt])
 
1281
      {
 
1282
        tmp_tables[cnt]->free_tmp_table(session);
 
1283
        tmp_table_param[cnt].cleanup();
 
1284
      }
 
1285
    }
 
1286
  }
 
1287
  if (copy_field)
 
1288
    delete [] copy_field;
 
1289
  session->count_cuted_fields= CHECK_FIELD_IGNORE;              // Restore this setting
 
1290
  assert(trans_safe || !updated ||
 
1291
              session->transaction.all.modified_non_trans_table);
 
1292
}
 
1293
 
 
1294
 
 
1295
bool multi_update::send_data(List<Item> &)
 
1296
{
 
1297
  TableList *cur_table;
 
1298
 
 
1299
  for (cur_table= update_tables; cur_table; cur_table= cur_table->next_local)
 
1300
  {
 
1301
    Table *table= cur_table->table;
 
1302
    uint32_t offset= cur_table->shared;
 
1303
    /*
 
1304
      Check if we are using outer join and we didn't find the row
 
1305
      or if we have already updated this row in the previous call to this
 
1306
      function.
 
1307
 
 
1308
      The same row may be presented here several times in a join of type
 
1309
      UPDATE t1 FROM t1,t2 SET t1.a=t2.a
 
1310
 
 
1311
      In this case we will do the update for the first found row combination.
 
1312
      The join algorithm guarantees that we will not find the a row in
 
1313
      t1 several times.
 
1314
    */
 
1315
    if (table->status & (STATUS_NULL_ROW | STATUS_UPDATED))
 
1316
      continue;
 
1317
 
 
1318
    /*
 
1319
      We can use compare_record() to optimize away updates if
 
1320
      the table handler is returning all columns OR if
 
1321
      if all updated columns are read
 
1322
    */
 
1323
    if (table == table_to_update)
 
1324
    {
 
1325
      bool can_compare_record;
 
1326
      can_compare_record= (!(table->file->ha_table_flags() &
 
1327
                             HA_PARTIAL_COLUMN_READ) ||
 
1328
                           bitmap_is_subset(table->write_set,
 
1329
                                            table->read_set));
 
1330
      table->status|= STATUS_UPDATED;
 
1331
      store_record(table,record[1]);
 
1332
      if (fill_record(session, *fields_for_table[offset],
 
1333
                      *values_for_table[offset], 0))
 
1334
        return(1);
 
1335
 
 
1336
      found++;
 
1337
      if (!can_compare_record || table->compare_record())
 
1338
      {
 
1339
        int error;
 
1340
        if (!updated++)
 
1341
        {
 
1342
          /*
 
1343
            Inform the main table that we are going to update the table even
 
1344
            while we may be scanning it.  This will flush the read cache
 
1345
            if it's used.
 
1346
          */
 
1347
          main_table->file->extra(HA_EXTRA_PREPARE_FOR_UPDATE);
 
1348
        }
 
1349
        if ((error=table->file->ha_update_row(table->record[1],
 
1350
                                              table->record[0])) &&
 
1351
            error != HA_ERR_RECORD_IS_THE_SAME)
 
1352
        {
 
1353
          updated--;
 
1354
          if (!ignore ||
 
1355
              table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
1356
          {
 
1357
            /*
 
1358
              If (ignore && error == is ignorable) we don't have to
 
1359
              do anything; otherwise...
 
1360
            */
 
1361
            myf flags= 0;
 
1362
 
 
1363
            if (table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
1364
              flags|= ME_FATALERROR; /* Other handler errors are fatal */
 
1365
 
 
1366
            prepare_record_for_error_message(error, table);
 
1367
            table->file->print_error(error,MYF(flags));
 
1368
            return(1);
 
1369
          }
 
1370
        }
 
1371
        else
 
1372
        {
 
1373
          if (error == HA_ERR_RECORD_IS_THE_SAME)
 
1374
          {
 
1375
            error= 0;
 
1376
            updated--;
 
1377
          }
 
1378
          /* non-transactional or transactional table got modified   */
 
1379
          /* either multi_update class' flag is raised in its branch */
 
1380
          if (table->file->has_transactions())
 
1381
            transactional_tables= 1;
 
1382
          else
 
1383
          {
 
1384
            trans_safe= 0;
 
1385
            session->transaction.stmt.modified_non_trans_table= true;
 
1386
          }
 
1387
        }
 
1388
      }
 
1389
    }
 
1390
    else
 
1391
    {
 
1392
      int error;
 
1393
      Table *tmp_table= tmp_tables[offset];
 
1394
      /*
 
1395
       For updatable VIEW store rowid of the updated table and
 
1396
       rowids of tables used in the CHECK OPTION condition.
 
1397
      */
 
1398
      uint32_t field_num= 0;
 
1399
      List_iterator_fast<Table> tbl_it(unupdated_check_opt_tables);
 
1400
      Table *tbl= table;
 
1401
      do
 
1402
      {
 
1403
        tbl->file->position(tbl->record[0]);
 
1404
        Field_varstring *ref_field=
 
1405
          reinterpret_cast<Field_varstring *>(tmp_table->field[field_num]);
 
1406
        ref_field->store((char *)tbl->file->ref, tbl->file->ref_length,
 
1407
                         &my_charset_bin);
 
1408
        field_num++;
 
1409
      } while ((tbl= tbl_it++));
 
1410
 
 
1411
      /* Store regular updated fields in the row. */
 
1412
      fill_record(session,
 
1413
                  tmp_table->field + 1 + unupdated_check_opt_tables.elements,
 
1414
                  *values_for_table[offset], 1);
 
1415
 
 
1416
      /* Write row, ignoring duplicated updates to a row */
 
1417
      error= tmp_table->file->ha_write_row(tmp_table->record[0]);
 
1418
      if (error != HA_ERR_FOUND_DUPP_KEY && error != HA_ERR_FOUND_DUPP_UNIQUE)
 
1419
      {
 
1420
        if (error &&
 
1421
            create_myisam_from_heap(session, tmp_table,
 
1422
                                         tmp_table_param[offset].start_recinfo,
 
1423
                                         &tmp_table_param[offset].recinfo,
 
1424
                                         error, 1))
 
1425
        {
 
1426
          do_update=0;
 
1427
          return(1);                    // Not a table_is_full error
 
1428
        }
 
1429
        found++;
 
1430
      }
 
1431
    }
 
1432
  }
 
1433
  return(0);
 
1434
}
 
1435
 
 
1436
 
 
1437
void multi_update::send_error(uint32_t errcode,const char *err)
 
1438
{
 
1439
  /* First send error what ever it is ... */
 
1440
  my_error(errcode, MYF(0), err);
 
1441
}
 
1442
 
 
1443
 
 
1444
void multi_update::abort()
 
1445
{
 
1446
  /* the error was handled or nothing deleted and no side effects return */
 
1447
  if (error_handled ||
 
1448
      (!session->transaction.stmt.modified_non_trans_table && !updated))
 
1449
    return;
 
1450
  /*
 
1451
    If all tables that has been updated are trans safe then just do rollback.
 
1452
    If not attempt to do remaining updates.
 
1453
  */
 
1454
 
 
1455
  if (! trans_safe)
 
1456
  {
 
1457
    assert(session->transaction.stmt.modified_non_trans_table);
 
1458
    if (do_update && table_count > 1)
 
1459
    {
 
1460
      /* Add warning here */
 
1461
      /*
 
1462
         todo/fixme: do_update() is never called with the arg 1.
 
1463
         should it change the signature to become argless?
 
1464
      */
 
1465
      do_updates();
 
1466
    }
 
1467
  }
 
1468
  if (session->transaction.stmt.modified_non_trans_table)
 
1469
  {
 
1470
    session->transaction.all.modified_non_trans_table= true;
 
1471
  }
 
1472
  assert(trans_safe || !updated || session->transaction.stmt.modified_non_trans_table);
 
1473
}
 
1474
 
 
1475
 
 
1476
int multi_update::do_updates()
 
1477
{
 
1478
  TableList *cur_table;
 
1479
  int local_error= 0;
 
1480
  ha_rows org_updated;
 
1481
  Table *table, *tmp_table;
 
1482
  List_iterator_fast<Table> check_opt_it(unupdated_check_opt_tables);
 
1483
 
 
1484
  do_update= 0;                                 // Don't retry this function
 
1485
  if (!found)
 
1486
    return(0);
 
1487
  for (cur_table= update_tables; cur_table; cur_table= cur_table->next_local)
 
1488
  {
 
1489
    bool can_compare_record;
 
1490
    uint32_t offset= cur_table->shared;
 
1491
 
 
1492
    table = cur_table->table;
 
1493
    if (table == table_to_update)
 
1494
      continue;                                 // Already updated
 
1495
    org_updated= updated;
 
1496
    tmp_table= tmp_tables[cur_table->shared];
 
1497
    tmp_table->file->extra(HA_EXTRA_CACHE);     // Change to read cache
 
1498
    (void) table->file->ha_rnd_init(0);
 
1499
    table->file->extra(HA_EXTRA_NO_CACHE);
 
1500
 
 
1501
    check_opt_it.rewind();
 
1502
    while(Table *tbl= check_opt_it++)
 
1503
    {
 
1504
      if (tbl->file->ha_rnd_init(1))
 
1505
        goto err;
 
1506
      tbl->file->extra(HA_EXTRA_CACHE);
 
1507
    }
 
1508
 
 
1509
    /*
 
1510
      Setup copy functions to copy fields from temporary table
 
1511
    */
 
1512
    List_iterator_fast<Item> field_it(*fields_for_table[offset]);
 
1513
    Field **field= tmp_table->field +
 
1514
                   1 + unupdated_check_opt_tables.elements; // Skip row pointers
 
1515
    Copy_field *copy_field_ptr= copy_field, *copy_field_end;
 
1516
    for ( ; *field ; field++)
 
1517
    {
 
1518
      Item_field *item= (Item_field* ) field_it++;
 
1519
      (copy_field_ptr++)->set(item->field, *field, 0);
 
1520
    }
 
1521
    copy_field_end=copy_field_ptr;
 
1522
 
 
1523
    if ((local_error = tmp_table->file->ha_rnd_init(1)))
 
1524
      goto err;
 
1525
 
 
1526
    can_compare_record= (!(table->file->ha_table_flags() &
 
1527
                           HA_PARTIAL_COLUMN_READ) ||
 
1528
                         bitmap_is_subset(table->write_set,
 
1529
                                          table->read_set));
 
1530
 
 
1531
    for (;;)
 
1532
    {
 
1533
      if (session->killed && trans_safe)
 
1534
        goto err;
 
1535
      if ((local_error=tmp_table->file->rnd_next(tmp_table->record[0])))
 
1536
      {
 
1537
        if (local_error == HA_ERR_END_OF_FILE)
 
1538
          break;
 
1539
        if (local_error == HA_ERR_RECORD_DELETED)
 
1540
          continue;                             // May happen on dup key
 
1541
        goto err;
 
1542
      }
 
1543
 
 
1544
      /* call rnd_pos() using rowids from temporary table */
 
1545
      check_opt_it.rewind();
 
1546
      Table *tbl= table;
 
1547
      uint32_t field_num= 0;
 
1548
      do
 
1549
      {
 
1550
        Field_varstring *ref_field=
 
1551
          reinterpret_cast<Field_varstring *>(tmp_table->field[field_num]);
 
1552
        if((local_error=
 
1553
              tbl->file->rnd_pos(tbl->record[0],
 
1554
                                (unsigned char *) ref_field->ptr
 
1555
                                 + ref_field->length_bytes)))
 
1556
          goto err;
 
1557
        field_num++;
 
1558
      } while((tbl= check_opt_it++));
 
1559
 
 
1560
      table->status|= STATUS_UPDATED;
 
1561
      store_record(table,record[1]);
 
1562
 
 
1563
      /* Copy data from temporary table to current table */
 
1564
      for (copy_field_ptr=copy_field;
 
1565
           copy_field_ptr != copy_field_end;
 
1566
           copy_field_ptr++)
 
1567
        (*copy_field_ptr->do_copy)(copy_field_ptr);
 
1568
 
 
1569
      if (!can_compare_record || table->compare_record())
 
1570
      {
 
1571
        if ((local_error=table->file->ha_update_row(table->record[1],
 
1572
                                                    table->record[0])) &&
 
1573
            local_error != HA_ERR_RECORD_IS_THE_SAME)
 
1574
        {
 
1575
          if (!ignore ||
 
1576
              table->file->is_fatal_error(local_error, HA_CHECK_DUP_KEY))
 
1577
            goto err;
 
1578
        }
 
1579
        if (local_error != HA_ERR_RECORD_IS_THE_SAME)
 
1580
          updated++;
 
1581
        else
 
1582
          local_error= 0;
 
1583
      }
 
1584
    }
 
1585
 
 
1586
    if (updated != org_updated)
 
1587
    {
 
1588
      if (table->file->has_transactions())
 
1589
        transactional_tables= 1;
 
1590
      else
 
1591
      {
 
1592
        trans_safe= 0;                          // Can't do safe rollback
 
1593
        session->transaction.stmt.modified_non_trans_table= true;
 
1594
      }
 
1595
    }
 
1596
    (void) table->file->ha_rnd_end();
 
1597
    (void) tmp_table->file->ha_rnd_end();
 
1598
    check_opt_it.rewind();
 
1599
    while (Table *tbl= check_opt_it++)
 
1600
        tbl->file->ha_rnd_end();
 
1601
 
 
1602
  }
 
1603
  return(0);
 
1604
 
 
1605
err:
 
1606
  {
 
1607
    prepare_record_for_error_message(local_error, table);
 
1608
    table->file->print_error(local_error,MYF(ME_FATALERROR));
 
1609
  }
 
1610
 
 
1611
  (void) table->file->ha_rnd_end();
 
1612
  (void) tmp_table->file->ha_rnd_end();
 
1613
  check_opt_it.rewind();
 
1614
  while (Table *tbl= check_opt_it++)
 
1615
      tbl->file->ha_rnd_end();
 
1616
 
 
1617
  if (updated != org_updated)
 
1618
  {
 
1619
    if (table->file->has_transactions())
 
1620
      transactional_tables= 1;
 
1621
    else
 
1622
    {
 
1623
      trans_safe= 0;
 
1624
      session->transaction.stmt.modified_non_trans_table= true;
 
1625
    }
 
1626
  }
 
1627
  return(1);
 
1628
}
 
1629
 
 
1630
 
 
1631
/* out: 1 if error, 0 if success */
 
1632
 
 
1633
bool multi_update::send_eof()
 
1634
{
 
1635
  char buff[STRING_BUFFER_USUAL_SIZE];
 
1636
  uint64_t id;
 
1637
  Session::killed_state killed_status= Session::NOT_KILLED;
 
1638
 
 
1639
  session->set_proc_info("updating reference tables");
 
1640
 
 
1641
  /*
 
1642
     Does updates for the last n - 1 tables, returns 0 if ok;
 
1643
     error takes into account killed status gained in do_updates()
 
1644
  */
 
1645
  int local_error = (table_count) ? do_updates() : 0;
 
1646
  /*
 
1647
    if local_error is not set ON until after do_updates() then
 
1648
    later carried out killing should not affect binlogging.
 
1649
  */
 
1650
  killed_status= (local_error == 0)? Session::NOT_KILLED : session->killed;
 
1651
  session->set_proc_info("end");
 
1652
 
 
1653
  /*
 
1654
    Write the SQL statement to the binlog if we updated
 
1655
    rows and we succeeded or if we updated some non
 
1656
    transactional tables.
 
1657
 
 
1658
    The query has to binlog because there's a modified non-transactional table
 
1659
    either from the query's list or via a stored routine: bug#13270,23333
 
1660
  */
 
1661
 
 
1662
  assert(trans_safe || !updated ||
 
1663
              session->transaction.stmt.modified_non_trans_table);
 
1664
  if (local_error == 0 || session->transaction.stmt.modified_non_trans_table)
 
1665
  {
 
1666
    if (session->transaction.stmt.modified_non_trans_table)
 
1667
      session->transaction.all.modified_non_trans_table= true;
 
1668
  }
 
1669
  if (local_error != 0)
 
1670
    error_handled= true; // to force early leave from ::send_error()
 
1671
 
 
1672
  if (local_error > 0) // if the above log write did not fail ...
 
1673
  {
 
1674
    /* Safety: If we haven't got an error before (can happen in do_updates) */
 
1675
    my_message(ER_UNKNOWN_ERROR, "An error occured in multi-table update",
 
1676
               MYF(0));
 
1677
    return(true);
 
1678
  }
 
1679
 
 
1680
  id= session->arg_of_last_insert_id_function ?
 
1681
    session->first_successful_insert_id_in_prev_stmt : 0;
 
1682
  sprintf(buff, ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
 
1683
          (ulong) session->cuted_fields);
 
1684
  session->row_count_func=
 
1685
    (session->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated;
 
1686
  ::my_ok(session, (ulong) session->row_count_func, id, buff);
 
1687
  return(false);
 
1688
}