~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_update.cc

  • Committer: Padraig O'Sullivan
  • Date: 2009-03-21 01:02:23 UTC
  • mto: (960.2.5 mordred)
  • mto: This revision was merged to the branch mainline in revision 961.
  • Revision ID: osullivan.padraig@gmail.com-20090321010223-j8cph7eeyt1u3xol
Fixed function object to ensure it correctly returns a boolean type since
memcmp returns an integer. Added some more comments.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/*
18
18
  Single table and multi table updates of tables.
19
19
  Multi-table updates were introduced by Sinisa & Monty
20
20
*/
21
 
#include "config.h"
22
 
#include "drizzled/sql_select.h"
23
 
#include "drizzled/error.h"
24
 
#include "drizzled/probes.h"
25
 
#include "drizzled/sql_base.h"
26
 
#include "drizzled/field/timestamp.h"
27
 
#include "drizzled/sql_parse.h"
28
 
#include "drizzled/optimizer/range.h"
29
 
#include "drizzled/records.h"
30
 
#include "drizzled/internal/my_sys.h"
31
 
#include "drizzled/internal/iocache.h"
32
 
#include "drizzled/transaction_services.h"
33
 
#include "drizzled/filesort.h"
34
 
 
35
 
#include <boost/dynamic_bitset.hpp>
36
 
#include <list>
37
 
 
38
 
using namespace std;
39
 
 
40
 
namespace drizzled
 
21
#include <drizzled/server_includes.h>
 
22
#include <drizzled/sql_select.h>
 
23
#include <drizzled/error.h>
 
24
#include <drizzled/probes.h>
 
25
#include <drizzled/sql_base.h>
 
26
#include <drizzled/field/timestamp.h>
 
27
 
 
28
/*
 
29
  check that all fields are real fields
 
30
 
 
31
  SYNOPSIS
 
32
    check_fields()
 
33
    session             thread handler
 
34
    items           Items for check
 
35
 
 
36
  RETURN
 
37
    true  Items can't be used in UPDATE
 
38
    false Items are OK
 
39
*/
 
40
 
 
41
static bool check_fields(Session *session, List<Item> &items)
41
42
{
 
43
  List_iterator<Item> it(items);
 
44
  Item *item;
 
45
  Item_field *field;
 
46
 
 
47
  while ((item= it++))
 
48
  {
 
49
    if (!(field= item->filed_for_view_update()))
 
50
    {
 
51
      /* item has name, because it comes from VIEW SELECT list */
 
52
      my_error(ER_NONUPDATEABLE_COLUMN, MYF(0), item->name);
 
53
      return true;
 
54
    }
 
55
    /*
 
56
      we make temporary copy of Item_field, to avoid influence of changing
 
57
      result_field on Item_ref which refer on this field
 
58
    */
 
59
    session->change_item_tree(it.ref(), new Item_field(session, field));
 
60
  }
 
61
  return false;
 
62
}
 
63
 
42
64
 
43
65
/**
44
66
  Re-read record if more columns are needed for error message.
45
67
 
46
68
  If we got a duplicate key error, we want to write an error
47
69
  message containing the value of the duplicate key. If we do not have
48
 
  all fields of the key value in getInsertRecord(), we need to re-read the
 
70
  all fields of the key value in record[0], we need to re-read the
49
71
  record with a proper read_set.
50
72
 
51
73
  @param[in] error   error number
54
76
 
55
77
static void prepare_record_for_error_message(int error, Table *table)
56
78
{
57
 
  Field **field_p= NULL;
58
 
  Field *field= NULL;
59
 
  uint32_t keynr= 0;
 
79
  Field **field_p;
 
80
  Field *field;
 
81
  uint32_t keynr;
 
82
  MY_BITMAP unique_map; /* Fields in offended unique. */
 
83
  my_bitmap_map unique_map_buf[bitmap_buffer_size(MAX_FIELDS)];
60
84
 
61
85
  /*
62
86
    Only duplicate key errors print the key value.
63
87
    If storage engine does always read all columns, we have the value alraedy.
64
88
  */
65
89
  if ((error != HA_ERR_FOUND_DUPP_KEY) ||
66
 
      ! (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ)))
 
90
      !(table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ))
67
91
    return;
68
92
 
69
93
  /*
70
94
    Get the number of the offended index.
71
95
    We will see MAX_KEY if the engine cannot determine the affected index.
72
96
  */
73
 
  if ((keynr= table->get_dup_key(error)) >= MAX_KEY)
 
97
  if ((keynr= table->file->get_dup_key(error)) >= MAX_KEY)
74
98
    return;
75
99
 
76
100
  /* Create unique_map with all fields used by that index. */
77
 
  boost::dynamic_bitset<> unique_map(table->getShare()->sizeFields()); /* Fields in offended unique. */
78
 
  table->mark_columns_used_by_index_no_reset(keynr, unique_map);
 
101
  bitmap_init(&unique_map, unique_map_buf, table->s->fields, false);
 
102
  table->mark_columns_used_by_index_no_reset(keynr, &unique_map);
79
103
 
80
104
  /* Subtract read_set and write_set. */
81
 
  unique_map-= *table->read_set;
82
 
  unique_map-= *table->write_set;
 
105
  bitmap_subtract(&unique_map, table->read_set);
 
106
  bitmap_subtract(&unique_map, table->write_set);
83
107
 
84
108
  /*
85
109
    If the unique index uses columns that are neither in read_set
86
110
    nor in write_set, we must re-read the record.
87
111
    Otherwise no need to do anything.
88
112
  */
89
 
  if (unique_map.none())
 
113
  if (bitmap_is_clear_all(&unique_map))
90
114
    return;
91
115
 
92
 
  /* Get identifier of last read record into table->cursor->ref. */
93
 
  table->cursor->position(table->getInsertRecord());
 
116
  /* Get identifier of last read record into table->file->ref. */
 
117
  table->file->position(table->record[0]);
94
118
  /* Add all fields used by unique index to read_set. */
95
 
  *table->read_set|= unique_map;
96
 
  /* Read record that is identified by table->cursor->ref. */
97
 
  (void) table->cursor->rnd_pos(table->getUpdateRecord(), table->cursor->ref);
 
119
  bitmap_union(table->read_set, &unique_map);
 
120
  /* Tell the engine about the new set. */
 
121
  table->file->column_bitmaps_signal();
 
122
  /* Read record that is identified by table->file->ref. */
 
123
  (void) table->file->rnd_pos(table->record[1], table->file->ref);
98
124
  /* Copy the newly read columns into the new record. */
99
 
  for (field_p= table->getFields(); (field= *field_p); field_p++)
100
 
  {
101
 
    if (unique_map.test(field->position()))
102
 
    {
103
 
      field->copy_from_tmp(table->getShare()->rec_buff_length);
104
 
    }
105
 
  }
 
125
  for (field_p= table->field; (field= *field_p); field_p++)
 
126
    if (bitmap_is_set(&unique_map, field->field_index))
 
127
      field->copy_from_tmp(table->s->rec_buff_length);
106
128
 
107
129
  return;
108
130
}
117
139
    fields              fields for update
118
140
    values              values of fields for update
119
141
    conds               WHERE clause expression
120
 
    order_num           number of elemen in ORDER BY clause
 
142
    order_num           number of elemen in order_st BY clause
121
143
    order               order_st BY clause list
122
144
    limit               limit clause
123
145
    handle_duplicates   how to handle duplicates
129
151
 
130
152
int mysql_update(Session *session, TableList *table_list,
131
153
                 List<Item> &fields, List<Item> &values, COND *conds,
132
 
                 uint32_t order_num, Order *order,
 
154
                 uint32_t order_num, order_st *order,
133
155
                 ha_rows limit, enum enum_duplicates,
134
156
                 bool ignore)
135
157
{
136
158
  bool          using_limit= limit != HA_POS_ERROR;
137
 
  bool          used_key_is_modified;
138
 
  bool          transactional_table;
139
 
  int           error;
 
159
  bool          safe_update= test(session->options & OPTION_SAFE_UPDATES);
 
160
  bool          used_key_is_modified, transactional_table, will_batch;
 
161
  bool          can_compare_record;
 
162
  int           error, loc_error;
140
163
  uint          used_index= MAX_KEY, dup_key_found;
141
164
  bool          need_sort= true;
 
165
  uint32_t          table_count= 0;
142
166
  ha_rows       updated, found;
143
167
  key_map       old_covering_keys;
144
168
  Table         *table;
145
 
  optimizer::SqlSelect *select= NULL;
146
 
  ReadRecord    info;
 
169
  SQL_SELECT    *select;
 
170
  READ_RECORD   info;
147
171
  Select_Lex    *select_lex= &session->lex->select_lex;
 
172
  bool          need_reopen;
148
173
  uint64_t     id;
149
174
  List<Item> all_fields;
150
 
  Session::killed_state_t killed_status= Session::NOT_KILLED;
 
175
  Session::killed_state killed_status= Session::NOT_KILLED;
151
176
 
152
 
  DRIZZLE_UPDATE_START(session->getQueryString()->c_str());
153
 
  if (session->openTablesLock(table_list))
 
177
  for ( ; ; )
154
178
  {
155
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
156
 
    return 1;
 
179
    if (open_tables(session, &table_list, &table_count, 0))
 
180
      return(1);
 
181
 
 
182
    if (!lock_tables(session, table_list, table_count, &need_reopen))
 
183
      break;
 
184
    if (!need_reopen)
 
185
      return(1);
 
186
    close_tables_for_reopen(session, &table_list);
157
187
  }
158
188
 
 
189
  if (mysql_handle_derived(session->lex, &mysql_derived_prepare) ||
 
190
      (session->fill_derived_tables() &&
 
191
       mysql_handle_derived(session->lex, &mysql_derived_filling)))
 
192
    return(1);
 
193
 
 
194
  DRIZZLE_UPDATE_START();
159
195
  session->set_proc_info("init");
160
196
  table= table_list->table;
161
197
 
162
198
  /* Calculate "table->covering_keys" based on the WHERE */
163
 
  table->covering_keys= table->getShare()->keys_in_use;
164
 
  table->quick_keys.reset();
 
199
  table->covering_keys= table->s->keys_in_use;
 
200
  table->quick_keys.clear_all();
165
201
 
166
202
  if (mysql_prepare_update(session, table_list, &conds, order_num, order))
167
 
  {
168
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
169
 
    return 1;
170
 
  }
 
203
    goto abort;
171
204
 
172
205
  old_covering_keys= table->covering_keys;              // Keys used in WHERE
173
206
  /* Check the fields we are going to modify */
174
207
  if (setup_fields_with_no_wrap(session, 0, fields, MARK_COLUMNS_WRITE, 0, 0))
175
 
  {
176
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
177
 
    return 1;
178
 
  }
179
 
 
 
208
    goto abort;                               /* purecov: inspected */
180
209
  if (table->timestamp_field)
181
210
  {
182
211
    // Don't set timestamp column if this is modified
183
 
    if (table->timestamp_field->isWriteSet())
184
 
    {
 
212
    if (bitmap_is_set(table->write_set,
 
213
                      table->timestamp_field->field_index))
185
214
      table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
186
 
    }
187
215
    else
188
216
    {
189
217
      if (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
190
218
          table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH)
191
 
      {
192
 
        table->setWriteSet(table->timestamp_field->position());
193
 
      }
 
219
        bitmap_set_bit(table->write_set,
 
220
                       table->timestamp_field->field_index);
194
221
    }
195
222
  }
196
223
 
197
224
  if (setup_fields(session, 0, values, MARK_COLUMNS_READ, 0, 0))
198
225
  {
199
226
    free_underlaid_joins(session, select_lex);
200
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
201
 
 
202
 
    return 1;
 
227
    goto abort;                               /* purecov: inspected */
203
228
  }
204
229
 
205
230
  if (select_lex->inner_refs_list.elements &&
206
231
    fix_inner_refs(session, all_fields, select_lex, select_lex->ref_pointer_array))
207
232
  {
208
 
    DRIZZLE_UPDATE_DONE(1, 0, 0);
209
 
    return -1;
 
233
    DRIZZLE_UPDATE_END();
 
234
    return(-1);
210
235
  }
211
236
 
212
237
  if (conds)
222
247
    update force the table handler to retrieve write-only fields to be able
223
248
    to compare records and detect data change.
224
249
  */
225
 
  if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ) &&
 
250
  if (table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ &&
226
251
      table->timestamp_field &&
227
252
      (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
228
253
       table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
229
 
  {
230
 
    *table->read_set|= *table->write_set;
231
 
  }
 
254
    bitmap_union(table->read_set, table->write_set);
232
255
  // Don't count on usage of 'only index' when calculating which key to use
233
 
  table->covering_keys.reset();
234
 
 
235
 
  /* Update the table->cursor->stats.records number */
236
 
  table->cursor->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
237
 
 
238
 
  select= optimizer::make_select(table, 0, 0, conds, 0, &error);
 
256
  table->covering_keys.clear_all();
 
257
 
 
258
  /* Update the table->file->stats.records number */
 
259
  table->file->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
 
260
 
 
261
  select= make_select(table, 0, 0, conds, 0, &error);
239
262
  if (error || !limit ||
240
 
      (select && select->check_quick(session, false, limit)))
 
263
      (select && select->check_quick(session, safe_update, limit)))
241
264
  {
242
265
    delete select;
243
 
    /**
244
 
     * Resetting the Diagnostic area to prevent
245
 
     * lp bug# 439719
246
 
     */
247
 
    session->main_da.reset_diagnostics_area();
248
266
    free_underlaid_joins(session, select_lex);
249
267
    if (error)
250
 
    {
251
 
      DRIZZLE_UPDATE_DONE(1, 0, 0);
252
 
      return 1;
253
 
    }
254
 
    DRIZZLE_UPDATE_DONE(0, 0, 0);
 
268
      goto abort;                               // Error in where
 
269
    DRIZZLE_UPDATE_END();
255
270
    session->my_ok();                           // No matching records
256
 
    return 0;
 
271
    return(0);
257
272
  }
258
273
  if (!select && limit != HA_POS_ERROR)
259
274
  {
260
 
    if ((used_index= optimizer::get_index_for_order(table, order, limit)) != MAX_KEY)
 
275
    if ((used_index= get_index_for_order(table, order, limit)) != MAX_KEY)
261
276
      need_sort= false;
262
277
  }
263
278
  /* If running in safe sql mode, don't allow updates without keys */
264
 
  if (table->quick_keys.none())
 
279
  if (table->quick_keys.is_clear_all())
265
280
  {
266
281
    session->server_status|=SERVER_QUERY_NO_INDEX_USED;
 
282
    if (safe_update && !using_limit)
 
283
    {
 
284
      my_message(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE,
 
285
                 ER(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE), MYF(0));
 
286
      goto err;
 
287
    }
267
288
  }
268
289
 
269
290
  table->mark_columns_needed_for_update();
274
295
  {
275
296
    used_index= select->quick->index;
276
297
    used_key_is_modified= (!select->quick->unique_key_range() &&
277
 
                          select->quick->is_keys_used(*table->write_set));
 
298
                          select->quick->is_keys_used(table->write_set));
278
299
  }
279
300
  else
280
301
  {
281
302
    used_key_is_modified= 0;
282
303
    if (used_index == MAX_KEY)                  // no index for sort order
283
 
      used_index= table->cursor->key_used_on_scan;
 
304
      used_index= table->file->key_used_on_scan;
284
305
    if (used_index != MAX_KEY)
285
 
      used_key_is_modified= is_key_used(table, used_index, *table->write_set);
 
306
      used_key_is_modified= is_key_used(table, used_index, table->write_set);
286
307
  }
287
308
 
288
309
 
292
313
      We can't update table directly;  We must first search after all
293
314
      matching rows before updating the table!
294
315
    */
295
 
    if (used_index < MAX_KEY && old_covering_keys.test(used_index))
 
316
    if (used_index < MAX_KEY && old_covering_keys.is_set(used_index))
296
317
    {
297
318
      table->key_read=1;
298
319
      table->mark_columns_used_by_index(used_index);
311
332
        NOTE: filesort will call table->prepare_for_position()
312
333
      */
313
334
      uint32_t         length= 0;
314
 
      SortField  *sortorder;
 
335
      SORT_FIELD  *sortorder;
315
336
      ha_rows examined_rows;
316
 
      FileSort filesort(*session);
317
337
 
318
 
      table->sort.io_cache= new internal::IO_CACHE;
 
338
      table->sort.io_cache = new IO_CACHE;
 
339
      memset(table->sort.io_cache, 0, sizeof(IO_CACHE));
319
340
 
320
341
      if (!(sortorder=make_unireg_sortorder(order, &length, NULL)) ||
321
 
          (table->sort.found_records= filesort.run(table, sortorder, length,
322
 
                                                   select, limit, 1,
323
 
                                                   examined_rows)) == HA_POS_ERROR)
 
342
          (table->sort.found_records= filesort(session, table, sortorder, length,
 
343
                                               select, limit, 1,
 
344
                                               &examined_rows))
 
345
          == HA_POS_ERROR)
324
346
      {
325
347
        goto err;
326
348
      }
339
361
        update these in a separate loop based on the pointer.
340
362
      */
341
363
 
342
 
      internal::IO_CACHE tempfile;
343
 
      if (tempfile.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
344
 
      {
 
364
      IO_CACHE tempfile;
 
365
      if (open_cached_file(&tempfile, drizzle_tmpdir,TEMP_PREFIX,
 
366
                           DISK_BUFFER_SIZE, MYF(MY_WME)))
345
367
        goto err;
346
 
      }
347
368
 
348
369
      /* If quick select is used, initialize it before retrieving rows. */
349
370
      if (select && select->quick && select->quick->reset())
350
371
        goto err;
351
 
      table->cursor->try_semi_consistent_read(1);
 
372
      table->file->try_semi_consistent_read(1);
352
373
 
353
374
      /*
354
375
        When we get here, we have one of the following options:
362
383
      */
363
384
 
364
385
      if (used_index == MAX_KEY || (select && select->quick))
365
 
      {
366
 
        info.init_read_record(session, table, select, 0, true);
367
 
      }
 
386
        init_read_record(&info,session,table,select,0,1);
368
387
      else
369
 
      {
370
 
        info.init_read_record_idx(session, table, 1, used_index);
371
 
      }
 
388
        init_read_record_idx(&info, session, table, 1, used_index);
372
389
 
373
390
      session->set_proc_info("Searching rows for update");
374
391
      ha_rows tmp_limit= limit;
375
392
 
376
 
      while (not(error= info.read_record(&info)) && not session->getKilled())
 
393
      while (!(error=info.read_record(&info)) && !session->killed)
377
394
      {
378
395
        if (!(select && select->skip_record()))
379
396
        {
380
 
          if (table->cursor->was_semi_consistent_read())
 
397
          if (table->file->was_semi_consistent_read())
381
398
            continue;  /* repeat the read of the same row if it still exists */
382
399
 
383
 
          table->cursor->position(table->getInsertRecord());
384
 
          if (my_b_write(&tempfile,table->cursor->ref,
385
 
                         table->cursor->ref_length))
 
400
          table->file->position(table->record[0]);
 
401
          if (my_b_write(&tempfile,table->file->ref,
 
402
                         table->file->ref_length))
386
403
          {
387
 
            error=1;
388
 
            break;
 
404
            error=1; /* purecov: inspected */
 
405
            break; /* purecov: inspected */
389
406
          }
390
407
          if (!--limit && using_limit)
391
408
          {
394
411
          }
395
412
        }
396
413
        else
397
 
          table->cursor->unlock_row();
 
414
          table->file->unlock_row();
398
415
      }
399
 
      if (session->getKilled() && not error)
 
416
      if (session->killed && !error)
400
417
        error= 1;                               // Aborted
401
418
      limit= tmp_limit;
402
 
      table->cursor->try_semi_consistent_read(0);
403
 
      info.end_read_record();
 
419
      table->file->try_semi_consistent_read(0);
 
420
      end_read_record(&info);
404
421
 
405
422
      /* Change select to use tempfile */
406
423
      if (select)
413
430
      }
414
431
      else
415
432
      {
416
 
        select= new optimizer::SqlSelect;
 
433
        select= new SQL_SELECT;
417
434
        select->head=table;
418
435
      }
419
 
      if (tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
420
 
        error=1;
421
 
      // Read row ptrs from this cursor
422
 
      memcpy(select->file, &tempfile, sizeof(tempfile));
 
436
      if (reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
 
437
        error=1; /* purecov: inspected */
 
438
      select->file=tempfile;                    // Read row ptrs from this file
423
439
      if (error >= 0)
424
440
        goto err;
425
441
    }
428
444
  }
429
445
 
430
446
  if (ignore)
431
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
447
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
432
448
 
433
449
  if (select && select->quick && select->quick->reset())
434
450
    goto err;
435
 
  table->cursor->try_semi_consistent_read(1);
436
 
  info.init_read_record(session, table, select, 0, true);
 
451
  table->file->try_semi_consistent_read(1);
 
452
  init_read_record(&info,session,table,select,0,1);
437
453
 
438
454
  updated= found= 0;
439
455
  /*
448
464
  session->cuted_fields= 0L;
449
465
  session->set_proc_info("Updating");
450
466
 
451
 
  transactional_table= table->cursor->has_transactions();
 
467
  transactional_table= table->file->has_transactions();
452
468
  session->abort_on_warning= test(!ignore);
 
469
  will_batch= !table->file->start_bulk_update();
453
470
 
454
471
  /*
455
472
    Assure that we can use position()
456
473
    if we need to create an error message.
457
474
  */
458
 
  if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ))
 
475
  if (table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ)
459
476
    table->prepare_for_position();
460
477
 
461
 
  while (not (error=info.read_record(&info)) && not session->getKilled())
 
478
  /*
 
479
    We can use compare_record() to optimize away updates if
 
480
    the table handler is returning all columns OR if
 
481
    if all updated columns are read
 
482
  */
 
483
  can_compare_record= (!(table->file->ha_table_flags() &
 
484
                         HA_PARTIAL_COLUMN_READ) ||
 
485
                       bitmap_is_subset(table->write_set, table->read_set));
 
486
 
 
487
  while (!(error=info.read_record(&info)) && !session->killed)
462
488
  {
463
 
    if (not (select && select->skip_record()))
 
489
    if (!(select && select->skip_record()))
464
490
    {
465
 
      if (table->cursor->was_semi_consistent_read())
 
491
      if (table->file->was_semi_consistent_read())
466
492
        continue;  /* repeat the read of the same row if it still exists */
467
493
 
468
 
      table->storeRecord();
469
 
      if (fill_record(session, fields, values))
470
 
        break;
 
494
      store_record(table,record[1]);
 
495
      if (fill_record(session, fields, values, 0))
 
496
        break; /* purecov: inspected */
471
497
 
472
498
      found++;
473
499
 
474
 
      if (! table->records_are_comparable() || table->compare_records())
 
500
      if (!can_compare_record || table->compare_record())
475
501
      {
476
 
        /* Non-batched update */
477
 
        error= table->cursor->updateRecord(table->getUpdateRecord(),
478
 
                                            table->getInsertRecord());
479
 
 
480
 
        table->auto_increment_field_not_null= false;
481
 
 
 
502
        if (will_batch)
 
503
        {
 
504
          /*
 
505
            Typically a batched handler can execute the batched jobs when:
 
506
            1) When specifically told to do so
 
507
            2) When it is not a good idea to batch anymore
 
508
            3) When it is necessary to send batch for other reasons
 
509
               (One such reason is when READ's must be performed)
 
510
 
 
511
            1) is covered by exec_bulk_update calls.
 
512
            2) and 3) is handled by the bulk_update_row method.
 
513
 
 
514
            bulk_update_row can execute the updates including the one
 
515
            defined in the bulk_update_row or not including the row
 
516
            in the call. This is up to the handler implementation and can
 
517
            vary from call to call.
 
518
 
 
519
            The dup_key_found reports the number of duplicate keys found
 
520
            in those updates actually executed. It only reports those if
 
521
            the extra call with HA_EXTRA_IGNORE_DUP_KEY have been issued.
 
522
            If this hasn't been issued it returns an error code and can
 
523
            ignore this number. Thus any handler that implements batching
 
524
            for UPDATE IGNORE must also handle this extra call properly.
 
525
 
 
526
            If a duplicate key is found on the record included in this
 
527
            call then it should be included in the count of dup_key_found
 
528
            and error should be set to 0 (only if these errors are ignored).
 
529
          */
 
530
          error= table->file->ha_bulk_update_row(table->record[1],
 
531
                                                 table->record[0],
 
532
                                                 &dup_key_found);
 
533
          limit+= dup_key_found;
 
534
          updated-= dup_key_found;
 
535
        }
 
536
        else
 
537
        {
 
538
          /* Non-batched update */
 
539
          error= table->file->ha_update_row(table->record[1],
 
540
                                            table->record[0]);
 
541
        }
482
542
        if (!error || error == HA_ERR_RECORD_IS_THE_SAME)
483
 
        {
 
543
        {
484
544
          if (error != HA_ERR_RECORD_IS_THE_SAME)
485
545
            updated++;
486
546
          else
487
547
            error= 0;
488
 
        }
489
 
        else if (! ignore ||
490
 
                 table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
491
 
        {
 
548
        }
 
549
        else if (!ignore ||
 
550
                 table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
551
        {
492
552
          /*
493
553
            If (ignore && error is ignorable) we don't have to
494
554
            do anything; otherwise...
495
555
          */
496
556
          myf flags= 0;
497
557
 
498
 
          if (table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
558
          if (table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
499
559
            flags|= ME_FATALERROR; /* Other handler errors are fatal */
500
560
 
501
561
          prepare_record_for_error_message(error, table);
502
 
          table->print_error(error,MYF(flags));
503
 
          error= 1;
504
 
          break;
505
 
        }
 
562
          table->file->print_error(error,MYF(flags));
 
563
          error= 1;
 
564
          break;
 
565
        }
506
566
      }
507
567
 
508
568
      if (!--limit && using_limit)
509
569
      {
510
 
        error= -1;                              // Simulate end of cursor
511
 
        break;
 
570
        /*
 
571
          We have reached end-of-file in most common situations where no
 
572
          batching has occurred and if batching was supposed to occur but
 
573
          no updates were made and finally when the batch execution was
 
574
          performed without error and without finding any duplicate keys.
 
575
          If the batched updates were performed with errors we need to
 
576
          check and if no error but duplicate key's found we need to
 
577
          continue since those are not counted for in limit.
 
578
        */
 
579
        if (will_batch &&
 
580
            ((error= table->file->exec_bulk_update(&dup_key_found)) ||
 
581
             dup_key_found))
 
582
        {
 
583
          if (error)
 
584
          {
 
585
            /* purecov: begin inspected */
 
586
            /*
 
587
              The handler should not report error of duplicate keys if they
 
588
              are ignored. This is a requirement on batching handlers.
 
589
            */
 
590
            prepare_record_for_error_message(error, table);
 
591
            table->file->print_error(error,MYF(0));
 
592
            error= 1;
 
593
            break;
 
594
            /* purecov: end */
 
595
          }
 
596
          /*
 
597
            Either an error was found and we are ignoring errors or there
 
598
            were duplicate keys found. In both cases we need to correct
 
599
            the counters and continue the loop.
 
600
          */
 
601
          limit= dup_key_found; //limit is 0 when we get here so need to +
 
602
          updated-= dup_key_found;
 
603
        }
 
604
        else
 
605
        {
 
606
          error= -1;                            // Simulate end of file
 
607
          break;
 
608
        }
512
609
      }
513
610
    }
514
611
    else
515
 
      table->cursor->unlock_row();
 
612
      table->file->unlock_row();
516
613
    session->row_count++;
517
614
  }
518
615
  dup_key_found= 0;
524
621
    It's assumed that if an error was set in combination with an effective
525
622
    killed status then the error is due to killing.
526
623
  */
527
 
  killed_status= session->getKilled(); // get the status of the volatile
 
624
  killed_status= session->killed; // get the status of the volatile
528
625
  // simulated killing after the loop must be ineffective for binlogging
529
626
  error= (killed_status == Session::NOT_KILLED)?  error : 1;
530
627
 
531
 
  updated-= dup_key_found;
532
 
  table->cursor->try_semi_consistent_read(0);
 
628
  if (error &&
 
629
      will_batch &&
 
630
      (loc_error= table->file->exec_bulk_update(&dup_key_found)))
 
631
    /*
 
632
      An error has occurred when a batched update was performed and returned
 
633
      an error indication. It cannot be an allowed duplicate key error since
 
634
      we require the batching handler to treat this as a normal behavior.
 
635
 
 
636
      Otherwise we simply remove the number of duplicate keys records found
 
637
      in the batched update.
 
638
    */
 
639
  {
 
640
    /* purecov: begin inspected */
 
641
    prepare_record_for_error_message(loc_error, table);
 
642
    table->file->print_error(loc_error,MYF(ME_FATALERROR));
 
643
    error= 1;
 
644
    /* purecov: end */
 
645
  }
 
646
  else
 
647
    updated-= dup_key_found;
 
648
  if (will_batch)
 
649
    table->file->end_bulk_update();
 
650
  table->file->try_semi_consistent_read(0);
533
651
 
534
652
  if (!transactional_table && updated > 0)
535
 
    session->transaction.stmt.markModifiedNonTransData();
 
653
    session->transaction.stmt.modified_non_trans_table= true;
536
654
 
537
 
  info.end_read_record();
 
655
  end_read_record(&info);
538
656
  delete select;
539
657
  session->set_proc_info("end");
540
 
  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
658
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
541
659
 
542
660
  /*
543
661
    error < 0 means really no error at all: we processed all rows until the
544
662
    last one without error. error > 0 means an error (e.g. unique key
545
663
    violation and no IGNORE or REPLACE). error == 0 is also an error (if
546
664
    preparing the record or invoking before triggers fails). See
547
 
    autocommitOrRollback(error>=0) and return(error>=0) below.
 
665
    ha_autocommit_or_rollback(error>=0) and return(error>=0) below.
548
666
    Sometimes we want to binlog even if we updated no rows, in case user used
549
667
    it to be sure master and slave are in same state.
550
668
  */
551
 
  if ((error < 0) || session->transaction.stmt.hasModifiedNonTransData())
 
669
  if ((error < 0) || session->transaction.stmt.modified_non_trans_table)
552
670
  {
553
 
    if (session->transaction.stmt.hasModifiedNonTransData())
554
 
      session->transaction.all.markModifiedNonTransData();
 
671
    if (session->transaction.stmt.modified_non_trans_table)
 
672
      session->transaction.all.modified_non_trans_table= true;
555
673
  }
556
 
  assert(transactional_table || !updated || session->transaction.stmt.hasModifiedNonTransData());
 
674
  assert(transactional_table || !updated || session->transaction.stmt.modified_non_trans_table);
557
675
  free_underlaid_joins(session, select_lex);
558
676
 
559
677
  /* If LAST_INSERT_ID(X) was used, report X */
560
678
  id= session->arg_of_last_insert_id_function ?
561
679
    session->first_successful_insert_id_in_prev_stmt : 0;
562
680
 
 
681
  DRIZZLE_UPDATE_END();
563
682
  if (error < 0)
564
683
  {
565
684
    char buff[STRING_BUFFER_USUAL_SIZE];
566
 
    snprintf(buff, sizeof(buff), ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
 
685
    sprintf(buff, ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
567
686
            (ulong) session->cuted_fields);
568
 
    session->row_count_func= updated;
569
 
    /**
570
 
     * Resetting the Diagnostic area to prevent
571
 
     * lp bug# 439719
572
 
     */
573
 
    session->main_da.reset_diagnostics_area();
574
 
    session->my_ok((ulong) session->row_count_func, found, id, buff);
575
 
    session->status_var.updated_row_count+= session->row_count_func;
 
687
    session->row_count_func=
 
688
      (session->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated;
 
689
    session->my_ok((ulong) session->row_count_func, id, buff);
576
690
  }
577
 
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;              /* calc cuted fields */
 
691
  session->count_cuted_fields= CHECK_FIELD_IGNORE;              /* calc cuted fields */
578
692
  session->abort_on_warning= 0;
579
 
  DRIZZLE_UPDATE_DONE((error >= 0 || session->is_error()), found, updated);
580
 
  return ((error >= 0 || session->is_error()) ? 1 : 0);
 
693
  return((error >= 0 || session->is_error()) ? 1 : 0);
581
694
 
582
695
err:
583
696
  delete select;
585
698
  if (table->key_read)
586
699
  {
587
700
    table->key_read=0;
588
 
    table->cursor->extra(HA_EXTRA_NO_KEYREAD);
 
701
    table->file->extra(HA_EXTRA_NO_KEYREAD);
589
702
  }
590
703
  session->abort_on_warning= 0;
591
704
 
592
 
  DRIZZLE_UPDATE_DONE(1, 0, 0);
593
 
  return 1;
 
705
abort:
 
706
  DRIZZLE_UPDATE_END();
 
707
  return(1);
594
708
}
595
709
 
596
710
/*
601
715
    session                     - thread handler
602
716
    table_list          - global/local table list
603
717
    conds               - conditions
604
 
    order_num           - number of ORDER BY list entries
605
 
    order               - ORDER BY clause list
 
718
    order_num           - number of order_st BY list entries
 
719
    order               - order_st BY clause list
606
720
 
607
721
  RETURN VALUE
608
722
    false OK
609
723
    true  error
610
724
*/
611
725
bool mysql_prepare_update(Session *session, TableList *table_list,
612
 
                         Item **conds, uint32_t order_num, Order *order)
 
726
                         Item **conds, uint32_t order_num, order_st *order)
613
727
{
614
728
  List<Item> all_fields;
615
729
  Select_Lex *select_lex= &session->lex->select_lex;
621
735
                                    table_list,
622
736
                                    &select_lex->leaf_tables,
623
737
                                    false) ||
624
 
      session->setup_conds(table_list, conds) ||
 
738
      setup_conds(session, table_list, select_lex->leaf_tables, conds) ||
625
739
      select_lex->setup_ref_array(session, order_num) ||
626
740
      setup_order(session, select_lex->ref_pointer_array,
627
741
                  table_list, all_fields, all_fields, order))
630
744
  /* Check that we are not using table that we are updating in a sub select */
631
745
  {
632
746
    TableList *duplicate;
633
 
    if ((duplicate= unique_table(table_list, table_list->next_global)))
634
 
    {
635
 
      my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
636
 
      return true;
637
 
    }
638
 
  }
639
 
 
640
 
  return false;
641
 
}
642
 
 
643
 
} /* namespace drizzled */
 
747
    if ((duplicate= unique_table(session, table_list, table_list->next_global, 0)))
 
748
    {
 
749
      update_non_unique_table_error(table_list, "UPDATE", duplicate);
 
750
      my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->table_name);
 
751
      return true;
 
752
    }
 
753
  }
 
754
 
 
755
  return false;
 
756
}
 
757
 
 
758
 
 
759
/***************************************************************************
 
760
  Update multiple tables from join
 
761
***************************************************************************/
 
762
 
 
763
/*
 
764
  Get table map for list of Item_field
 
765
*/
 
766
 
 
767
static table_map get_table_map(List<Item> *items)
 
768
{
 
769
  List_iterator_fast<Item> item_it(*items);
 
770
  Item_field *item;
 
771
  table_map map= 0;
 
772
 
 
773
  while ((item= (Item_field *) item_it++))
 
774
    map|= item->used_tables();
 
775
  return map;
 
776
}
 
777
 
 
778
 
 
779
/*
 
780
  make update specific preparation and checks after opening tables
 
781
 
 
782
  SYNOPSIS
 
783
    mysql_multi_update_prepare()
 
784
    session         thread handler
 
785
 
 
786
  RETURN
 
787
    false OK
 
788
    true  Error
 
789
*/
 
790
 
 
791
int mysql_multi_update_prepare(Session *session)
 
792
{
 
793
  LEX *lex= session->lex;
 
794
  TableList *table_list= lex->query_tables;
 
795
  TableList *tl, *leaves;
 
796
  List<Item> *fields= &lex->select_lex.item_list;
 
797
  table_map tables_for_update;
 
798
  bool update_view= 0;
 
799
  /*
 
800
    if this multi-update was converted from usual update, here is table
 
801
    counter else junk will be assigned here, but then replaced with real
 
802
    count in open_tables()
 
803
  */
 
804
  uint32_t  table_count= lex->table_count;
 
805
  const bool using_lock_tables= session->locked_tables != 0;
 
806
  bool original_multiupdate= (session->lex->sql_command == SQLCOM_UPDATE_MULTI);
 
807
  bool need_reopen= false;
 
808
 
 
809
 
 
810
  /* following need for prepared statements, to run next time multi-update */
 
811
  session->lex->sql_command= SQLCOM_UPDATE_MULTI;
 
812
 
 
813
reopen_tables:
 
814
 
 
815
  /* open tables and create derived ones, but do not lock and fill them */
 
816
  if (((original_multiupdate || need_reopen) &&
 
817
       open_tables(session, &table_list, &table_count, 0)) ||
 
818
      mysql_handle_derived(lex, &mysql_derived_prepare))
 
819
    return true;
 
820
  /*
 
821
    setup_tables() need for VIEWs. JOIN::prepare() will call setup_tables()
 
822
    second time, but this call will do nothing (there are check for second
 
823
    call in setup_tables()).
 
824
  */
 
825
 
 
826
  if (setup_tables_and_check_access(session, &lex->select_lex.context,
 
827
                                    &lex->select_lex.top_join_list,
 
828
                                    table_list,
 
829
                                    &lex->select_lex.leaf_tables, false))
 
830
    return true;
 
831
 
 
832
  if (setup_fields_with_no_wrap(session, 0, *fields, MARK_COLUMNS_WRITE, 0, 0))
 
833
    return true;
 
834
 
 
835
  if (update_view && check_fields(session, *fields))
 
836
  {
 
837
    return true;
 
838
  }
 
839
 
 
840
  tables_for_update= get_table_map(fields);
 
841
 
 
842
  /*
 
843
    Setup timestamp handling and locking mode
 
844
  */
 
845
  leaves= lex->select_lex.leaf_tables;
 
846
  for (tl= leaves; tl; tl= tl->next_leaf)
 
847
  {
 
848
    Table *table= tl->table;
 
849
    /* Only set timestamp column if this is not modified */
 
850
    if (table->timestamp_field &&
 
851
        bitmap_is_set(table->write_set,
 
852
                      table->timestamp_field->field_index))
 
853
      table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
 
854
 
 
855
    /* if table will be updated then check that it is unique */
 
856
    if (table->map & tables_for_update)
 
857
    {
 
858
      table->mark_columns_needed_for_update();
 
859
      /*
 
860
        If table will be updated we should not downgrade lock for it and
 
861
        leave it as is.
 
862
      */
 
863
    }
 
864
    else
 
865
    {
 
866
      /*
 
867
        If we are using the binary log, we need TL_READ_NO_INSERT to get
 
868
        correct order of statements. Otherwise, we use a TL_READ lock to
 
869
        improve performance.
 
870
      */
 
871
      tl->lock_type= TL_READ;
 
872
      tl->updating= 0;
 
873
      /* Update Table::lock_type accordingly. */
 
874
      if (!tl->placeholder() && !using_lock_tables)
 
875
        tl->table->reginfo.lock_type= tl->lock_type;
 
876
    }
 
877
  }
 
878
 
 
879
  /* now lock and fill tables */
 
880
  if (lock_tables(session, table_list, table_count, &need_reopen))
 
881
  {
 
882
    if (!need_reopen)
 
883
      return true;
 
884
 
 
885
    /*
 
886
      We have to reopen tables since some of them were altered or dropped
 
887
      during lock_tables() or something was done with their triggers.
 
888
      Let us do some cleanups to be able do setup_table() and setup_fields()
 
889
      once again.
 
890
    */
 
891
    List_iterator_fast<Item> it(*fields);
 
892
    Item *item;
 
893
    while ((item= it++))
 
894
      item->cleanup();
 
895
 
 
896
    /* We have to cleanup translation tables of views. */
 
897
    for (TableList *tbl= table_list; tbl; tbl= tbl->next_global)
 
898
      tbl->cleanup_items();
 
899
 
 
900
    close_tables_for_reopen(session, &table_list);
 
901
    goto reopen_tables;
 
902
  }
 
903
 
 
904
  /*
 
905
    Check that we are not using table that we are updating, but we should
 
906
    skip all tables of UPDATE SELECT itself
 
907
  */
 
908
  lex->select_lex.exclude_from_table_unique_test= true;
 
909
  /* We only need SELECT privilege for columns in the values list */
 
910
  for (tl= leaves; tl; tl= tl->next_leaf)
 
911
  {
 
912
    if (tl->lock_type != TL_READ &&
 
913
        tl->lock_type != TL_READ_NO_INSERT)
 
914
    {
 
915
      TableList *duplicate;
 
916
      if ((duplicate= unique_table(session, tl, table_list, 0)))
 
917
      {
 
918
        update_non_unique_table_error(table_list, "UPDATE", duplicate);
 
919
        return true;
 
920
      }
 
921
    }
 
922
  }
 
923
  /*
 
924
    Set exclude_from_table_unique_test value back to false. It is needed for
 
925
    further check in multi_update::prepare whether to use record cache.
 
926
  */
 
927
  lex->select_lex.exclude_from_table_unique_test= false;
 
928
 
 
929
  if (session->fill_derived_tables() &&
 
930
      mysql_handle_derived(lex, &mysql_derived_filling))
 
931
    return true;
 
932
 
 
933
  return (false);
 
934
}
 
935
 
 
936
 
 
937
/*
 
938
  Setup multi-update handling and call SELECT to do the join
 
939
*/
 
940
 
 
941
bool mysql_multi_update(Session *session,
 
942
                        TableList *table_list,
 
943
                        List<Item> *fields,
 
944
                        List<Item> *values,
 
945
                        COND *conds,
 
946
                        uint64_t options,
 
947
                        enum enum_duplicates handle_duplicates, bool ignore,
 
948
                        Select_Lex_Unit *unit, Select_Lex *select_lex)
 
949
{
 
950
  multi_update *result;
 
951
  bool res;
 
952
 
 
953
  if (!(result= new multi_update(table_list,
 
954
                                 session->lex->select_lex.leaf_tables,
 
955
                                 fields, values,
 
956
                                 handle_duplicates, ignore)))
 
957
    return true;
 
958
 
 
959
  session->abort_on_warning= true;
 
960
 
 
961
  List<Item> total_list;
 
962
  res= mysql_select(session, &select_lex->ref_pointer_array,
 
963
                      table_list, select_lex->with_wild,
 
964
                      total_list,
 
965
                      conds, 0, (order_st *) NULL, (order_st *)NULL, (Item *) NULL,
 
966
                      options | SELECT_NO_JOIN_CACHE | SELECT_NO_UNLOCK |
 
967
                      OPTION_SETUP_TABLES_DONE,
 
968
                      result, unit, select_lex);
 
969
  res|= session->is_error();
 
970
  if (unlikely(res))
 
971
  {
 
972
    /* If we had a another error reported earlier then this will be ignored */
 
973
    result->send_error(ER_UNKNOWN_ERROR, ER(ER_UNKNOWN_ERROR));
 
974
    result->abort();
 
975
  }
 
976
  delete result;
 
977
  session->abort_on_warning= 0;
 
978
  return false;
 
979
}
 
980
 
 
981
 
 
982
multi_update::multi_update(TableList *table_list,
 
983
                           TableList *leaves_list,
 
984
                           List<Item> *field_list, List<Item> *value_list,
 
985
                           enum enum_duplicates handle_duplicates_arg,
 
986
                           bool ignore_arg)
 
987
  :all_tables(table_list), leaves(leaves_list), update_tables(0),
 
988
   tmp_tables(0), updated(0), found(0), fields(field_list),
 
989
   values(value_list), table_count(0), copy_field(0),
 
990
   handle_duplicates(handle_duplicates_arg), do_update(1), trans_safe(1),
 
991
   transactional_tables(0), ignore(ignore_arg), error_handled(0)
 
992
{}
 
993
 
 
994
 
 
995
/*
 
996
  Connect fields with tables and create list of tables that are updated
 
997
*/
 
998
 
 
999
int multi_update::prepare(List<Item> &,
 
1000
                          Select_Lex_Unit *)
 
1001
{
 
1002
  TableList *table_ref;
 
1003
  SQL_LIST update;
 
1004
  table_map tables_to_update;
 
1005
  Item_field *item;
 
1006
  List_iterator_fast<Item> field_it(*fields);
 
1007
  List_iterator_fast<Item> value_it(*values);
 
1008
  uint32_t i, max_fields;
 
1009
  uint32_t leaf_table_count= 0;
 
1010
 
 
1011
  session->count_cuted_fields= CHECK_FIELD_WARN;
 
1012
  session->cuted_fields=0L;
 
1013
  session->set_proc_info("updating main table");
 
1014
 
 
1015
  tables_to_update= get_table_map(fields);
 
1016
 
 
1017
  if (!tables_to_update)
 
1018
  {
 
1019
    my_message(ER_NO_TABLES_USED, ER(ER_NO_TABLES_USED), MYF(0));
 
1020
    return(1);
 
1021
  }
 
1022
 
 
1023
  /*
 
1024
    We have to check values after setup_tables to get covering_keys right in
 
1025
    reference tables
 
1026
  */
 
1027
 
 
1028
  if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))
 
1029
    return(1);
 
1030
 
 
1031
  /*
 
1032
    Save tables beeing updated in update_tables
 
1033
    update_table->shared is position for table
 
1034
    Don't use key read on tables that are updated
 
1035
  */
 
1036
 
 
1037
  update.empty();
 
1038
  for (table_ref= leaves; table_ref; table_ref= table_ref->next_leaf)
 
1039
  {
 
1040
    /* TODO: add support of view of join support */
 
1041
    Table *table=table_ref->table;
 
1042
    leaf_table_count++;
 
1043
    if (tables_to_update & table->map)
 
1044
    {
 
1045
      TableList *tl= (TableList*) session->memdup((char*) table_ref,
 
1046
                                                sizeof(*tl));
 
1047
      if (!tl)
 
1048
        return(1);
 
1049
      update.link_in_list((unsigned char*) tl, (unsigned char**) &tl->next_local);
 
1050
      tl->shared= table_count++;
 
1051
      table->no_keyread=1;
 
1052
      table->covering_keys.clear_all();
 
1053
      table->pos_in_table_list= tl;
 
1054
    }
 
1055
  }
 
1056
 
 
1057
 
 
1058
  table_count=  update.elements;
 
1059
  update_tables= (TableList*) update.first;
 
1060
 
 
1061
  tmp_tables = (Table**) session->calloc(sizeof(Table *) * table_count);
 
1062
  tmp_table_param = (Tmp_Table_Param*) session->calloc(sizeof(Tmp_Table_Param) *
 
1063
                                                   table_count);
 
1064
  fields_for_table= (List_item **) session->alloc(sizeof(List_item *) *
 
1065
                                              table_count);
 
1066
  values_for_table= (List_item **) session->alloc(sizeof(List_item *) *
 
1067
                                              table_count);
 
1068
  if (session->is_fatal_error)
 
1069
    return(1);
 
1070
  for (i=0 ; i < table_count ; i++)
 
1071
  {
 
1072
    fields_for_table[i]= new List_item;
 
1073
    values_for_table[i]= new List_item;
 
1074
  }
 
1075
  if (session->is_fatal_error)
 
1076
    return(1);
 
1077
 
 
1078
  /* Split fields into fields_for_table[] and values_by_table[] */
 
1079
 
 
1080
  while ((item= (Item_field *) field_it++))
 
1081
  {
 
1082
    Item *value= value_it++;
 
1083
    uint32_t offset= item->field->table->pos_in_table_list->shared;
 
1084
    fields_for_table[offset]->push_back(item);
 
1085
    values_for_table[offset]->push_back(value);
 
1086
  }
 
1087
  if (session->is_fatal_error)
 
1088
    return(1);
 
1089
 
 
1090
  /* Allocate copy fields */
 
1091
  max_fields=0;
 
1092
  for (i=0 ; i < table_count ; i++)
 
1093
    set_if_bigger(max_fields, fields_for_table[i]->elements + leaf_table_count);
 
1094
  copy_field= new Copy_field[max_fields];
 
1095
  return(session->is_fatal_error != 0);
 
1096
}
 
1097
 
 
1098
 
 
1099
/*
 
1100
  Check if table is safe to update on fly
 
1101
 
 
1102
  SYNOPSIS
 
1103
    safe_update_on_fly()
 
1104
    session                 Thread handler
 
1105
    join_tab            How table is used in join
 
1106
    all_tables          List of tables
 
1107
 
 
1108
  NOTES
 
1109
    We can update the first table in join on the fly if we know that
 
1110
    a row in this table will never be read twice. This is true under
 
1111
    the following conditions:
 
1112
 
 
1113
    - We are doing a table scan and the data is in a separate file (MyISAM) or
 
1114
      if we don't update a clustered key.
 
1115
 
 
1116
    - We are doing a range scan and we don't update the scan key or
 
1117
      the primary key for a clustered table handler.
 
1118
 
 
1119
    - Table is not joined to itself.
 
1120
 
 
1121
    This function gets information about fields to be updated from
 
1122
    the Table::write_set bitmap.
 
1123
 
 
1124
  WARNING
 
1125
    This code is a bit dependent of how make_join_readinfo() works.
 
1126
 
 
1127
  RETURN
 
1128
    0           Not safe to update
 
1129
    1           Safe to update
 
1130
*/
 
1131
 
 
1132
static bool safe_update_on_fly(Session *session, JOIN_TAB *join_tab,
 
1133
                               TableList *table_ref, TableList *all_tables)
 
1134
{
 
1135
  Table *table= join_tab->table;
 
1136
  if (unique_table(session, table_ref, all_tables, 0))
 
1137
    return 0;
 
1138
  switch (join_tab->type) {
 
1139
  case JT_SYSTEM:
 
1140
  case JT_CONST:
 
1141
  case JT_EQ_REF:
 
1142
    return true;                                // At most one matching row
 
1143
  case JT_REF:
 
1144
  case JT_REF_OR_NULL:
 
1145
    return !is_key_used(table, join_tab->ref.key, table->write_set);
 
1146
  case JT_ALL:
 
1147
    /* If range search on index */
 
1148
    if (join_tab->quick)
 
1149
      return !join_tab->quick->is_keys_used(table->write_set);
 
1150
    /* If scanning in clustered key */
 
1151
    if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_IN_READ_INDEX) &&
 
1152
        table->s->primary_key < MAX_KEY)
 
1153
      return !is_key_used(table, table->s->primary_key, table->write_set);
 
1154
    return true;
 
1155
  default:
 
1156
    break;                                      // Avoid compler warning
 
1157
  }
 
1158
  return false;
 
1159
 
 
1160
}
 
1161
 
 
1162
 
 
1163
/*
 
1164
  Initialize table for multi table
 
1165
 
 
1166
  IMPLEMENTATION
 
1167
    - Update first table in join on the fly, if possible
 
1168
    - Create temporary tables to store changed values for all other tables
 
1169
      that are updated (and main_table if the above doesn't hold).
 
1170
*/
 
1171
 
 
1172
bool
 
1173
multi_update::initialize_tables(JOIN *join)
 
1174
{
 
1175
  TableList *table_ref;
 
1176
 
 
1177
  if ((session->options & OPTION_SAFE_UPDATES) && error_if_full_join(join))
 
1178
    return(1);
 
1179
  main_table=join->join_tab->table;
 
1180
  table_to_update= 0;
 
1181
 
 
1182
  /* Any update has at least one pair (field, value) */
 
1183
  assert(fields->elements);
 
1184
 
 
1185
  /* Create a temporary table for keys to all tables, except main table */
 
1186
  for (table_ref= update_tables; table_ref; table_ref= table_ref->next_local)
 
1187
  {
 
1188
    Table *table=table_ref->table;
 
1189
    uint32_t cnt= table_ref->shared;
 
1190
    List<Item> temp_fields;
 
1191
    order_st     group;
 
1192
    Tmp_Table_Param *tmp_param;
 
1193
 
 
1194
    table->mark_columns_needed_for_update();
 
1195
    if (ignore)
 
1196
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1197
    if (table == main_table)                    // First table in join
 
1198
    {
 
1199
      if (safe_update_on_fly(session, join->join_tab, table_ref, all_tables))
 
1200
      {
 
1201
        table_to_update= main_table;            // Update table on the fly
 
1202
        continue;
 
1203
      }
 
1204
    }
 
1205
    table->prepare_for_position();
 
1206
 
 
1207
    tmp_param= tmp_table_param+cnt;
 
1208
 
 
1209
    /*
 
1210
      Create a temporary table to store all fields that are changed for this
 
1211
      table. The first field in the temporary table is a pointer to the
 
1212
      original row so that we can find and update it. For the updatable
 
1213
      VIEW a few following fields are rowids of tables used in the CHECK
 
1214
      OPTION condition.
 
1215
    */
 
1216
 
 
1217
    List_iterator_fast<Table> tbl_it(unupdated_check_opt_tables);
 
1218
    Table *tbl= table;
 
1219
    do
 
1220
    {
 
1221
      Field_varstring *field= new Field_varstring(tbl->file->ref_length, 0,
 
1222
                                                  tbl->alias, tbl->s, &my_charset_bin);
 
1223
      if (!field)
 
1224
        return(1);
 
1225
      field->init(tbl);
 
1226
      /*
 
1227
        The field will be converted to varstring when creating tmp table if
 
1228
        table to be updated was created by mysql 4.1. Deny this.
 
1229
      */
 
1230
      Item_field *ifield= new Item_field((Field *) field);
 
1231
      if (!ifield)
 
1232
         return(1);
 
1233
      ifield->maybe_null= 0;
 
1234
      if (temp_fields.push_back(ifield))
 
1235
        return(1);
 
1236
    } while ((tbl= tbl_it++));
 
1237
 
 
1238
    temp_fields.concat(fields_for_table[cnt]);
 
1239
 
 
1240
    /* Make an unique key over the first field to avoid duplicated updates */
 
1241
    memset(&group, 0, sizeof(group));
 
1242
    group.asc= 1;
 
1243
    group.item= (Item**) temp_fields.head_ref();
 
1244
 
 
1245
    tmp_param->quick_group=1;
 
1246
    tmp_param->field_count=temp_fields.elements;
 
1247
    tmp_param->group_parts=1;
 
1248
    tmp_param->group_length= table->file->ref_length;
 
1249
    if (!(tmp_tables[cnt]=create_tmp_table(session,
 
1250
                                           tmp_param,
 
1251
                                           temp_fields,
 
1252
                                           (order_st*) &group, 0, 0,
 
1253
                                           TMP_TABLE_ALL_COLUMNS,
 
1254
                                           HA_POS_ERROR,
 
1255
                                           (char *) "")))
 
1256
      return(1);
 
1257
    tmp_tables[cnt]->file->extra(HA_EXTRA_WRITE_CACHE);
 
1258
  }
 
1259
  return(0);
 
1260
}
 
1261
 
 
1262
 
 
1263
multi_update::~multi_update()
 
1264
{
 
1265
  TableList *table;
 
1266
  for (table= update_tables ; table; table= table->next_local)
 
1267
  {
 
1268
    table->table->no_keyread= table->table->no_cache= 0;
 
1269
    if (ignore)
 
1270
      table->table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1271
  }
 
1272
 
 
1273
  if (tmp_tables)
 
1274
  {
 
1275
    for (uint32_t cnt = 0; cnt < table_count; cnt++)
 
1276
    {
 
1277
      if (tmp_tables[cnt])
 
1278
      {
 
1279
        tmp_tables[cnt]->free_tmp_table(session);
 
1280
        tmp_table_param[cnt].cleanup();
 
1281
      }
 
1282
    }
 
1283
  }
 
1284
  if (copy_field)
 
1285
    delete [] copy_field;
 
1286
  session->count_cuted_fields= CHECK_FIELD_IGNORE;              // Restore this setting
 
1287
  assert(trans_safe || !updated ||
 
1288
              session->transaction.all.modified_non_trans_table);
 
1289
}
 
1290
 
 
1291
 
 
1292
bool multi_update::send_data(List<Item> &)
 
1293
{
 
1294
  TableList *cur_table;
 
1295
 
 
1296
  for (cur_table= update_tables; cur_table; cur_table= cur_table->next_local)
 
1297
  {
 
1298
    Table *table= cur_table->table;
 
1299
    uint32_t offset= cur_table->shared;
 
1300
    /*
 
1301
      Check if we are using outer join and we didn't find the row
 
1302
      or if we have already updated this row in the previous call to this
 
1303
      function.
 
1304
 
 
1305
      The same row may be presented here several times in a join of type
 
1306
      UPDATE t1 FROM t1,t2 SET t1.a=t2.a
 
1307
 
 
1308
      In this case we will do the update for the first found row combination.
 
1309
      The join algorithm guarantees that we will not find the a row in
 
1310
      t1 several times.
 
1311
    */
 
1312
    if (table->status & (STATUS_NULL_ROW | STATUS_UPDATED))
 
1313
      continue;
 
1314
 
 
1315
    /*
 
1316
      We can use compare_record() to optimize away updates if
 
1317
      the table handler is returning all columns OR if
 
1318
      if all updated columns are read
 
1319
    */
 
1320
    if (table == table_to_update)
 
1321
    {
 
1322
      bool can_compare_record;
 
1323
      can_compare_record= (!(table->file->ha_table_flags() &
 
1324
                             HA_PARTIAL_COLUMN_READ) ||
 
1325
                           bitmap_is_subset(table->write_set,
 
1326
                                            table->read_set));
 
1327
      table->status|= STATUS_UPDATED;
 
1328
      store_record(table,record[1]);
 
1329
      if (fill_record(session, *fields_for_table[offset],
 
1330
                      *values_for_table[offset], 0))
 
1331
        return(1);
 
1332
 
 
1333
      found++;
 
1334
      if (!can_compare_record || table->compare_record())
 
1335
      {
 
1336
        int error;
 
1337
        if (!updated++)
 
1338
        {
 
1339
          /*
 
1340
            Inform the main table that we are going to update the table even
 
1341
            while we may be scanning it.  This will flush the read cache
 
1342
            if it's used.
 
1343
          */
 
1344
          main_table->file->extra(HA_EXTRA_PREPARE_FOR_UPDATE);
 
1345
        }
 
1346
        if ((error=table->file->ha_update_row(table->record[1],
 
1347
                                              table->record[0])) &&
 
1348
            error != HA_ERR_RECORD_IS_THE_SAME)
 
1349
        {
 
1350
          updated--;
 
1351
          if (!ignore ||
 
1352
              table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
1353
          {
 
1354
            /*
 
1355
              If (ignore && error == is ignorable) we don't have to
 
1356
              do anything; otherwise...
 
1357
            */
 
1358
            myf flags= 0;
 
1359
 
 
1360
            if (table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
1361
              flags|= ME_FATALERROR; /* Other handler errors are fatal */
 
1362
 
 
1363
            prepare_record_for_error_message(error, table);
 
1364
            table->file->print_error(error,MYF(flags));
 
1365
            return(1);
 
1366
          }
 
1367
        }
 
1368
        else
 
1369
        {
 
1370
          if (error == HA_ERR_RECORD_IS_THE_SAME)
 
1371
          {
 
1372
            error= 0;
 
1373
            updated--;
 
1374
          }
 
1375
          /* non-transactional or transactional table got modified   */
 
1376
          /* either multi_update class' flag is raised in its branch */
 
1377
          if (table->file->has_transactions())
 
1378
            transactional_tables= 1;
 
1379
          else
 
1380
          {
 
1381
            trans_safe= 0;
 
1382
            session->transaction.stmt.modified_non_trans_table= true;
 
1383
          }
 
1384
        }
 
1385
      }
 
1386
    }
 
1387
    else
 
1388
    {
 
1389
      int error;
 
1390
      Table *tmp_table= tmp_tables[offset];
 
1391
      /*
 
1392
       For updatable VIEW store rowid of the updated table and
 
1393
       rowids of tables used in the CHECK OPTION condition.
 
1394
      */
 
1395
      uint32_t field_num= 0;
 
1396
      List_iterator_fast<Table> tbl_it(unupdated_check_opt_tables);
 
1397
      Table *tbl= table;
 
1398
      do
 
1399
      {
 
1400
        tbl->file->position(tbl->record[0]);
 
1401
        Field_varstring *ref_field=
 
1402
          reinterpret_cast<Field_varstring *>(tmp_table->field[field_num]);
 
1403
        ref_field->store((char *)tbl->file->ref, tbl->file->ref_length,
 
1404
                         &my_charset_bin);
 
1405
        field_num++;
 
1406
      } while ((tbl= tbl_it++));
 
1407
 
 
1408
      /* Store regular updated fields in the row. */
 
1409
      fill_record(session,
 
1410
                  tmp_table->field + 1 + unupdated_check_opt_tables.elements,
 
1411
                  *values_for_table[offset], 1);
 
1412
 
 
1413
      /* Write row, ignoring duplicated updates to a row */
 
1414
      error= tmp_table->file->ha_write_row(tmp_table->record[0]);
 
1415
      if (error != HA_ERR_FOUND_DUPP_KEY && error != HA_ERR_FOUND_DUPP_UNIQUE)
 
1416
      {
 
1417
        if (error &&
 
1418
            create_myisam_from_heap(session, tmp_table,
 
1419
                                         tmp_table_param[offset].start_recinfo,
 
1420
                                         &tmp_table_param[offset].recinfo,
 
1421
                                         error, 1))
 
1422
        {
 
1423
          do_update=0;
 
1424
          return(1);                    // Not a table_is_full error
 
1425
        }
 
1426
        found++;
 
1427
      }
 
1428
    }
 
1429
  }
 
1430
  return(0);
 
1431
}
 
1432
 
 
1433
 
 
1434
void multi_update::send_error(uint32_t errcode,const char *err)
 
1435
{
 
1436
  /* First send error what ever it is ... */
 
1437
  my_error(errcode, MYF(0), err);
 
1438
}
 
1439
 
 
1440
 
 
1441
void multi_update::abort()
 
1442
{
 
1443
  /* the error was handled or nothing deleted and no side effects return */
 
1444
  if (error_handled ||
 
1445
      (!session->transaction.stmt.modified_non_trans_table && !updated))
 
1446
    return;
 
1447
  /*
 
1448
    If all tables that has been updated are trans safe then just do rollback.
 
1449
    If not attempt to do remaining updates.
 
1450
  */
 
1451
 
 
1452
  if (! trans_safe)
 
1453
  {
 
1454
    assert(session->transaction.stmt.modified_non_trans_table);
 
1455
    if (do_update && table_count > 1)
 
1456
    {
 
1457
      /* Add warning here */
 
1458
      /*
 
1459
         todo/fixme: do_update() is never called with the arg 1.
 
1460
         should it change the signature to become argless?
 
1461
      */
 
1462
      do_updates();
 
1463
    }
 
1464
  }
 
1465
  if (session->transaction.stmt.modified_non_trans_table)
 
1466
  {
 
1467
    session->transaction.all.modified_non_trans_table= true;
 
1468
  }
 
1469
  assert(trans_safe || !updated || session->transaction.stmt.modified_non_trans_table);
 
1470
}
 
1471
 
 
1472
 
 
1473
int multi_update::do_updates()
 
1474
{
 
1475
  TableList *cur_table;
 
1476
  int local_error= 0;
 
1477
  ha_rows org_updated;
 
1478
  Table *table, *tmp_table;
 
1479
  List_iterator_fast<Table> check_opt_it(unupdated_check_opt_tables);
 
1480
 
 
1481
  do_update= 0;                                 // Don't retry this function
 
1482
  if (!found)
 
1483
    return(0);
 
1484
  for (cur_table= update_tables; cur_table; cur_table= cur_table->next_local)
 
1485
  {
 
1486
    bool can_compare_record;
 
1487
    uint32_t offset= cur_table->shared;
 
1488
 
 
1489
    table = cur_table->table;
 
1490
    if (table == table_to_update)
 
1491
      continue;                                 // Already updated
 
1492
    org_updated= updated;
 
1493
    tmp_table= tmp_tables[cur_table->shared];
 
1494
    tmp_table->file->extra(HA_EXTRA_CACHE);     // Change to read cache
 
1495
    (void) table->file->ha_rnd_init(0);
 
1496
    table->file->extra(HA_EXTRA_NO_CACHE);
 
1497
 
 
1498
    check_opt_it.rewind();
 
1499
    while(Table *tbl= check_opt_it++)
 
1500
    {
 
1501
      if (tbl->file->ha_rnd_init(1))
 
1502
        goto err;
 
1503
      tbl->file->extra(HA_EXTRA_CACHE);
 
1504
    }
 
1505
 
 
1506
    /*
 
1507
      Setup copy functions to copy fields from temporary table
 
1508
    */
 
1509
    List_iterator_fast<Item> field_it(*fields_for_table[offset]);
 
1510
    Field **field= tmp_table->field +
 
1511
                   1 + unupdated_check_opt_tables.elements; // Skip row pointers
 
1512
    Copy_field *copy_field_ptr= copy_field, *copy_field_end;
 
1513
    for ( ; *field ; field++)
 
1514
    {
 
1515
      Item_field *item= (Item_field* ) field_it++;
 
1516
      (copy_field_ptr++)->set(item->field, *field, 0);
 
1517
    }
 
1518
    copy_field_end=copy_field_ptr;
 
1519
 
 
1520
    if ((local_error = tmp_table->file->ha_rnd_init(1)))
 
1521
      goto err;
 
1522
 
 
1523
    can_compare_record= (!(table->file->ha_table_flags() &
 
1524
                           HA_PARTIAL_COLUMN_READ) ||
 
1525
                         bitmap_is_subset(table->write_set,
 
1526
                                          table->read_set));
 
1527
 
 
1528
    for (;;)
 
1529
    {
 
1530
      if (session->killed && trans_safe)
 
1531
        goto err;
 
1532
      if ((local_error=tmp_table->file->rnd_next(tmp_table->record[0])))
 
1533
      {
 
1534
        if (local_error == HA_ERR_END_OF_FILE)
 
1535
          break;
 
1536
        if (local_error == HA_ERR_RECORD_DELETED)
 
1537
          continue;                             // May happen on dup key
 
1538
        goto err;
 
1539
      }
 
1540
 
 
1541
      /* call rnd_pos() using rowids from temporary table */
 
1542
      check_opt_it.rewind();
 
1543
      Table *tbl= table;
 
1544
      uint32_t field_num= 0;
 
1545
      do
 
1546
      {
 
1547
        Field_varstring *ref_field=
 
1548
          reinterpret_cast<Field_varstring *>(tmp_table->field[field_num]);
 
1549
        if((local_error=
 
1550
              tbl->file->rnd_pos(tbl->record[0],
 
1551
                                (unsigned char *) ref_field->ptr
 
1552
                                 + ref_field->length_bytes)))
 
1553
          goto err;
 
1554
        field_num++;
 
1555
      } while((tbl= check_opt_it++));
 
1556
 
 
1557
      table->status|= STATUS_UPDATED;
 
1558
      store_record(table,record[1]);
 
1559
 
 
1560
      /* Copy data from temporary table to current table */
 
1561
      for (copy_field_ptr=copy_field;
 
1562
           copy_field_ptr != copy_field_end;
 
1563
           copy_field_ptr++)
 
1564
        (*copy_field_ptr->do_copy)(copy_field_ptr);
 
1565
 
 
1566
      if (!can_compare_record || table->compare_record())
 
1567
      {
 
1568
        if ((local_error=table->file->ha_update_row(table->record[1],
 
1569
                                                    table->record[0])) &&
 
1570
            local_error != HA_ERR_RECORD_IS_THE_SAME)
 
1571
        {
 
1572
          if (!ignore ||
 
1573
              table->file->is_fatal_error(local_error, HA_CHECK_DUP_KEY))
 
1574
            goto err;
 
1575
        }
 
1576
        if (local_error != HA_ERR_RECORD_IS_THE_SAME)
 
1577
          updated++;
 
1578
        else
 
1579
          local_error= 0;
 
1580
      }
 
1581
    }
 
1582
 
 
1583
    if (updated != org_updated)
 
1584
    {
 
1585
      if (table->file->has_transactions())
 
1586
        transactional_tables= 1;
 
1587
      else
 
1588
      {
 
1589
        trans_safe= 0;                          // Can't do safe rollback
 
1590
        session->transaction.stmt.modified_non_trans_table= true;
 
1591
      }
 
1592
    }
 
1593
    (void) table->file->ha_rnd_end();
 
1594
    (void) tmp_table->file->ha_rnd_end();
 
1595
    check_opt_it.rewind();
 
1596
    while (Table *tbl= check_opt_it++)
 
1597
        tbl->file->ha_rnd_end();
 
1598
 
 
1599
  }
 
1600
  return(0);
 
1601
 
 
1602
err:
 
1603
  {
 
1604
    prepare_record_for_error_message(local_error, table);
 
1605
    table->file->print_error(local_error,MYF(ME_FATALERROR));
 
1606
  }
 
1607
 
 
1608
  (void) table->file->ha_rnd_end();
 
1609
  (void) tmp_table->file->ha_rnd_end();
 
1610
  check_opt_it.rewind();
 
1611
  while (Table *tbl= check_opt_it++)
 
1612
      tbl->file->ha_rnd_end();
 
1613
 
 
1614
  if (updated != org_updated)
 
1615
  {
 
1616
    if (table->file->has_transactions())
 
1617
      transactional_tables= 1;
 
1618
    else
 
1619
    {
 
1620
      trans_safe= 0;
 
1621
      session->transaction.stmt.modified_non_trans_table= true;
 
1622
    }
 
1623
  }
 
1624
  return(1);
 
1625
}
 
1626
 
 
1627
 
 
1628
/* out: 1 if error, 0 if success */
 
1629
 
 
1630
bool multi_update::send_eof()
 
1631
{
 
1632
  char buff[STRING_BUFFER_USUAL_SIZE];
 
1633
  uint64_t id;
 
1634
  Session::killed_state killed_status= Session::NOT_KILLED;
 
1635
 
 
1636
  session->set_proc_info("updating reference tables");
 
1637
 
 
1638
  /*
 
1639
     Does updates for the last n - 1 tables, returns 0 if ok;
 
1640
     error takes into account killed status gained in do_updates()
 
1641
  */
 
1642
  int local_error = (table_count) ? do_updates() : 0;
 
1643
  /*
 
1644
    if local_error is not set ON until after do_updates() then
 
1645
    later carried out killing should not affect binlogging.
 
1646
  */
 
1647
  killed_status= (local_error == 0)? Session::NOT_KILLED : session->killed;
 
1648
  session->set_proc_info("end");
 
1649
 
 
1650
  /*
 
1651
    Write the SQL statement to the binlog if we updated
 
1652
    rows and we succeeded or if we updated some non
 
1653
    transactional tables.
 
1654
 
 
1655
    The query has to binlog because there's a modified non-transactional table
 
1656
    either from the query's list or via a stored routine: bug#13270,23333
 
1657
  */
 
1658
 
 
1659
  assert(trans_safe || !updated ||
 
1660
              session->transaction.stmt.modified_non_trans_table);
 
1661
  if (local_error == 0 || session->transaction.stmt.modified_non_trans_table)
 
1662
  {
 
1663
    if (session->transaction.stmt.modified_non_trans_table)
 
1664
      session->transaction.all.modified_non_trans_table= true;
 
1665
  }
 
1666
  if (local_error != 0)
 
1667
    error_handled= true; // to force early leave from ::send_error()
 
1668
 
 
1669
  if (local_error > 0) // if the above log write did not fail ...
 
1670
  {
 
1671
    /* Safety: If we haven't got an error before (can happen in do_updates) */
 
1672
    my_message(ER_UNKNOWN_ERROR, "An error occured in multi-table update",
 
1673
               MYF(0));
 
1674
    return true;
 
1675
  }
 
1676
 
 
1677
  id= session->arg_of_last_insert_id_function ?
 
1678
    session->first_successful_insert_id_in_prev_stmt : 0;
 
1679
  sprintf(buff, ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
 
1680
          (ulong) session->cuted_fields);
 
1681
  session->row_count_func=
 
1682
    (session->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated;
 
1683
  session->my_ok((ulong) session->row_count_func, id, buff);
 
1684
 
 
1685
  return false;
 
1686
}