~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to sql/sql_insert.cc

  • Committer: brian
  • Date: 2008-06-25 05:29:13 UTC
  • Revision ID: brian@localhost.localdomain-20080625052913-6upwo0jsrl4lnapl
clean slate

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2000-2006 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
/* Insert of records */
 
18
 
 
19
/*
 
20
  INSERT DELAYED
 
21
 
 
22
  Drizzle has a different form of DELAYED then MySQL. DELAYED is just
 
23
  a hint to the the sorage engine (which can then do whatever it likes.
 
24
*/
 
25
 
 
26
#include "mysql_priv.h"
 
27
#include "sql_select.h"
 
28
#include "sql_show.h"
 
29
#include "slave.h"
 
30
#include "rpl_mi.h"
 
31
 
 
32
/* Define to force use of my_malloc() if the allocated memory block is big */
 
33
 
 
34
#ifndef HAVE_ALLOCA
 
35
#define my_safe_alloca(size, min_length) my_alloca(size)
 
36
#define my_safe_afree(ptr, size, min_length) my_afree(ptr)
 
37
#else
 
38
#define my_safe_alloca(size, min_length) ((size <= min_length) ? my_alloca(size) : my_malloc(size,MYF(0)))
 
39
#define my_safe_afree(ptr, size, min_length) if (size > min_length) my_free(ptr,MYF(0))
 
40
#endif
 
41
 
 
42
 
 
43
 
 
44
/*
 
45
  Check if insert fields are correct.
 
46
 
 
47
  SYNOPSIS
 
48
    check_insert_fields()
 
49
    thd                         The current thread.
 
50
    table                       The table for insert.
 
51
    fields                      The insert fields.
 
52
    values                      The insert values.
 
53
    check_unique                If duplicate values should be rejected.
 
54
 
 
55
  NOTE
 
56
    Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type
 
57
    or leaves it as is, depending on if timestamp should be updated or
 
58
    not.
 
59
 
 
60
  RETURN
 
61
    0           OK
 
62
    -1          Error
 
63
*/
 
64
 
 
65
static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
 
66
                               List<Item> &fields, List<Item> &values,
 
67
                               bool check_unique, table_map *map)
 
68
{
 
69
  TABLE *table= table_list->table;
 
70
 
 
71
  if (fields.elements == 0 && values.elements != 0)
 
72
  {
 
73
    if (values.elements != table->s->fields)
 
74
    {
 
75
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
 
76
      return -1;
 
77
    }
 
78
    clear_timestamp_auto_bits(table->timestamp_field_type,
 
79
                              TIMESTAMP_AUTO_SET_ON_INSERT);
 
80
    /*
 
81
      No fields are provided so all fields must be provided in the values.
 
82
      Thus we set all bits in the write set.
 
83
    */
 
84
    bitmap_set_all(table->write_set);
 
85
  }
 
86
  else
 
87
  {                                             // Part field list
 
88
    SELECT_LEX *select_lex= &thd->lex->select_lex;
 
89
    Name_resolution_context *context= &select_lex->context;
 
90
    Name_resolution_context_state ctx_state;
 
91
    int res;
 
92
 
 
93
    if (fields.elements != values.elements)
 
94
    {
 
95
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
 
96
      return -1;
 
97
    }
 
98
 
 
99
    thd->dup_field= 0;
 
100
    select_lex->no_wrap_view_item= TRUE;
 
101
 
 
102
    /* Save the state of the current name resolution context. */
 
103
    ctx_state.save_state(context, table_list);
 
104
 
 
105
    /*
 
106
      Perform name resolution only in the first table - 'table_list',
 
107
      which is the table that is inserted into.
 
108
    */
 
109
    table_list->next_local= 0;
 
110
    context->resolve_in_table_list_only(table_list);
 
111
    res= setup_fields(thd, 0, fields, MARK_COLUMNS_WRITE, 0, 0);
 
112
 
 
113
    /* Restore the current context. */
 
114
    ctx_state.restore_state(context, table_list);
 
115
    thd->lex->select_lex.no_wrap_view_item= FALSE;
 
116
 
 
117
    if (res)
 
118
      return -1;
 
119
 
 
120
    if (check_unique && thd->dup_field)
 
121
    {
 
122
      my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name);
 
123
      return -1;
 
124
    }
 
125
    if (table->timestamp_field) // Don't automaticly set timestamp if used
 
126
    {
 
127
      if (bitmap_is_set(table->write_set,
 
128
                        table->timestamp_field->field_index))
 
129
        clear_timestamp_auto_bits(table->timestamp_field_type,
 
130
                                  TIMESTAMP_AUTO_SET_ON_INSERT);
 
131
      else
 
132
      {
 
133
        bitmap_set_bit(table->write_set,
 
134
                       table->timestamp_field->field_index);
 
135
      }
 
136
    }
 
137
  }
 
138
 
 
139
  return 0;
 
140
}
 
141
 
 
142
 
 
143
/*
 
144
  Check update fields for the timestamp field.
 
145
 
 
146
  SYNOPSIS
 
147
    check_update_fields()
 
148
    thd                         The current thread.
 
149
    insert_table_list           The insert table list.
 
150
    table                       The table for update.
 
151
    update_fields               The update fields.
 
152
 
 
153
  NOTE
 
154
    If the update fields include the timestamp field,
 
155
    remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type.
 
156
 
 
157
  RETURN
 
158
    0           OK
 
159
    -1          Error
 
160
*/
 
161
 
 
162
static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
 
163
                               List<Item> &update_fields, table_map *map)
 
164
{
 
165
  TABLE *table= insert_table_list->table;
 
166
  my_bool timestamp_mark= false;
 
167
 
 
168
  if (table->timestamp_field)
 
169
  {
 
170
    /*
 
171
      Unmark the timestamp field so that we can check if this is modified
 
172
      by update_fields
 
173
    */
 
174
    timestamp_mark= bitmap_test_and_clear(table->write_set,
 
175
                                          table->timestamp_field->field_index);
 
176
  }
 
177
 
 
178
  /* Check the fields we are going to modify */
 
179
  if (setup_fields(thd, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0))
 
180
    return -1;
 
181
 
 
182
  if (table->timestamp_field)
 
183
  {
 
184
    /* Don't set timestamp column if this is modified. */
 
185
    if (bitmap_is_set(table->write_set,
 
186
                      table->timestamp_field->field_index))
 
187
      clear_timestamp_auto_bits(table->timestamp_field_type,
 
188
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
 
189
    if (timestamp_mark)
 
190
      bitmap_set_bit(table->write_set,
 
191
                     table->timestamp_field->field_index);
 
192
  }
 
193
  return 0;
 
194
}
 
195
 
 
196
 
 
197
/**
 
198
  Upgrade table-level lock of INSERT statement to TL_WRITE if
 
199
  a more concurrent lock is infeasible for some reason. This is
 
200
  necessary for engines without internal locking support (MyISAM).
 
201
  An engine with internal locking implementation might later
 
202
  downgrade the lock in handler::store_lock() method.
 
203
*/
 
204
 
 
205
static
 
206
void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
 
207
                       enum_duplicates duplic,
 
208
                       bool is_multi_insert)
 
209
{
 
210
  if (duplic == DUP_UPDATE ||
 
211
      (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
 
212
  {
 
213
    *lock_type= TL_WRITE_DEFAULT;
 
214
    return;
 
215
  }
 
216
}
 
217
 
 
218
 
 
219
/**
 
220
  INSERT statement implementation
 
221
 
 
222
  @note Like implementations of other DDL/DML in MySQL, this function
 
223
  relies on the caller to close the thread tables. This is done in the
 
224
  end of dispatch_command().
 
225
*/
 
226
 
 
227
bool mysql_insert(THD *thd,TABLE_LIST *table_list,
 
228
                  List<Item> &fields,
 
229
                  List<List_item> &values_list,
 
230
                  List<Item> &update_fields,
 
231
                  List<Item> &update_values,
 
232
                  enum_duplicates duplic,
 
233
                  bool ignore)
 
234
{
 
235
  int error;
 
236
  bool transactional_table, joins_freed= FALSE;
 
237
  bool changed;
 
238
  bool was_insert_delayed= (table_list->lock_type ==  TL_WRITE_DELAYED);
 
239
  uint value_count;
 
240
  ulong counter = 1;
 
241
  ulonglong id;
 
242
  COPY_INFO info;
 
243
  TABLE *table= 0;
 
244
  List_iterator_fast<List_item> its(values_list);
 
245
  List_item *values;
 
246
  Name_resolution_context *context;
 
247
  Name_resolution_context_state ctx_state;
 
248
  thr_lock_type lock_type;
 
249
  Item *unused_conds= 0;
 
250
  DBUG_ENTER("mysql_insert");
 
251
 
 
252
  /*
 
253
    Upgrade lock type if the requested lock is incompatible with
 
254
    the current connection mode or table operation.
 
255
  */
 
256
  upgrade_lock_type(thd, &table_list->lock_type, duplic,
 
257
                    values_list.elements > 1);
 
258
 
 
259
  /*
 
260
    We can't write-delayed into a table locked with LOCK TABLES:
 
261
    this will lead to a deadlock, since the delayed thread will
 
262
    never be able to get a lock on the table. QQQ: why not
 
263
    upgrade the lock here instead?
 
264
  */
 
265
  if (table_list->lock_type == TL_WRITE_DELAYED && thd->locked_tables &&
 
266
      find_locked_table(thd, table_list->db, table_list->table_name))
 
267
  {
 
268
    my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
 
269
             table_list->table_name);
 
270
    DBUG_RETURN(TRUE);
 
271
  }
 
272
 
 
273
  {
 
274
    if (open_and_lock_tables(thd, table_list))
 
275
      DBUG_RETURN(TRUE);
 
276
  }
 
277
  lock_type= table_list->lock_type;
 
278
 
 
279
  thd_proc_info(thd, "init");
 
280
  thd->used_tables=0;
 
281
  values= its++;
 
282
  value_count= values->elements;
 
283
 
 
284
  if (mysql_prepare_insert(thd, table_list, table, fields, values,
 
285
                           update_fields, update_values, duplic, &unused_conds,
 
286
                           FALSE,
 
287
                           (fields.elements || !value_count ||
 
288
                            (0) != 0), !ignore))
 
289
    goto abort;
 
290
 
 
291
  /* mysql_prepare_insert set table_list->table if it was not set */
 
292
  table= table_list->table;
 
293
 
 
294
  context= &thd->lex->select_lex.context;
 
295
  /*
 
296
    These three asserts test the hypothesis that the resetting of the name
 
297
    resolution context below is not necessary at all since the list of local
 
298
    tables for INSERT always consists of one table.
 
299
  */
 
300
  DBUG_ASSERT(!table_list->next_local);
 
301
  DBUG_ASSERT(!context->table_list->next_local);
 
302
  DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);
 
303
 
 
304
  /* Save the state of the current name resolution context. */
 
305
  ctx_state.save_state(context, table_list);
 
306
 
 
307
  /*
 
308
    Perform name resolution only in the first table - 'table_list',
 
309
    which is the table that is inserted into.
 
310
  */
 
311
  table_list->next_local= 0;
 
312
  context->resolve_in_table_list_only(table_list);
 
313
 
 
314
  while ((values= its++))
 
315
  {
 
316
    counter++;
 
317
    if (values->elements != value_count)
 
318
    {
 
319
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
 
320
      goto abort;
 
321
    }
 
322
    if (setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0))
 
323
      goto abort;
 
324
  }
 
325
  its.rewind ();
 
326
 
 
327
  /* Restore the current context. */
 
328
  ctx_state.restore_state(context, table_list);
 
329
 
 
330
  /*
 
331
    Fill in the given fields and dump it to the table file
 
332
  */
 
333
  bzero((char*) &info,sizeof(info));
 
334
  info.ignore= ignore;
 
335
  info.handle_duplicates=duplic;
 
336
  info.update_fields= &update_fields;
 
337
  info.update_values= &update_values;
 
338
 
 
339
  /*
 
340
    Count warnings for all inserts.
 
341
    For single line insert, generate an error if try to set a NOT NULL field
 
342
    to NULL.
 
343
  */
 
344
  thd->count_cuted_fields= ((values_list.elements == 1 &&
 
345
                             !ignore) ?
 
346
                            CHECK_FIELD_ERROR_FOR_NULL :
 
347
                            CHECK_FIELD_WARN);
 
348
  thd->cuted_fields = 0L;
 
349
  table->next_number_field=table->found_next_number_field;
 
350
 
 
351
#ifdef HAVE_REPLICATION
 
352
  if (thd->slave_thread &&
 
353
      (info.handle_duplicates == DUP_UPDATE) &&
 
354
      (table->next_number_field != NULL) &&
 
355
      rpl_master_has_bug(&active_mi->rli, 24432))
 
356
    goto abort;
 
357
#endif
 
358
 
 
359
  error=0;
 
360
  thd_proc_info(thd, "update");
 
361
  if (duplic == DUP_REPLACE)
 
362
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
363
  if (duplic == DUP_UPDATE)
 
364
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
365
  {
 
366
    if (duplic != DUP_ERROR || ignore)
 
367
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
368
    table->file->ha_start_bulk_insert(values_list.elements);
 
369
  }
 
370
 
 
371
 
 
372
  thd->abort_on_warning= !ignore;
 
373
 
 
374
  table->mark_columns_needed_for_insert();
 
375
 
 
376
  while ((values= its++))
 
377
  {
 
378
    if (fields.elements || !value_count)
 
379
    {
 
380
      restore_record(table,s->default_values);  // Get empty record
 
381
      if (fill_record(thd, fields, *values, 0))
 
382
      {
 
383
        if (values_list.elements != 1 && ! thd->is_error())
 
384
        {
 
385
          info.records++;
 
386
          continue;
 
387
        }
 
388
        /*
 
389
          TODO: set thd->abort_on_warning if values_list.elements == 1
 
390
          and check that all items return warning in case of problem with
 
391
          storing field.
 
392
        */
 
393
        error=1;
 
394
        break;
 
395
      }
 
396
    }
 
397
    else
 
398
    {
 
399
      if (thd->used_tables)                     // Column used in values()
 
400
        restore_record(table,s->default_values);        // Get empty record
 
401
      else
 
402
      {
 
403
        /*
 
404
          Fix delete marker. No need to restore rest of record since it will
 
405
          be overwritten by fill_record() anyway (and fill_record() does not
 
406
          use default values in this case).
 
407
        */
 
408
        table->record[0][0]= table->s->default_values[0];
 
409
      }
 
410
      if (fill_record(thd, table->field, *values, 0))
 
411
      {
 
412
        if (values_list.elements != 1 && ! thd->is_error())
 
413
        {
 
414
          info.records++;
 
415
          continue;
 
416
        }
 
417
        error=1;
 
418
        break;
 
419
      }
 
420
    }
 
421
 
 
422
    error=write_record(thd, table ,&info);
 
423
    if (error)
 
424
      break;
 
425
    thd->row_count++;
 
426
  }
 
427
 
 
428
  free_underlaid_joins(thd, &thd->lex->select_lex);
 
429
  joins_freed= TRUE;
 
430
 
 
431
  /*
 
432
    Now all rows are inserted.  Time to update logs and sends response to
 
433
    user
 
434
  */
 
435
  {
 
436
    /*
 
437
      Do not do this release if this is a delayed insert, it would steal
 
438
      auto_inc values from the delayed_insert thread as they share TABLE.
 
439
    */
 
440
    table->file->ha_release_auto_increment();
 
441
    if (table->file->ha_end_bulk_insert() && !error)
 
442
    {
 
443
      table->file->print_error(my_errno,MYF(0));
 
444
      error=1;
 
445
    }
 
446
    if (duplic != DUP_ERROR || ignore)
 
447
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
448
 
 
449
    transactional_table= table->file->has_transactions();
 
450
 
 
451
    if ((changed= (info.copied || info.deleted || info.updated)))
 
452
    {
 
453
      /*
 
454
        Invalidate the table in the query cache if something changed.
 
455
        For the transactional algorithm to work the invalidation must be
 
456
        before binlog writing and ha_autocommit_or_rollback
 
457
      */
 
458
    }
 
459
    if ((changed && error <= 0) || thd->transaction.stmt.modified_non_trans_table || was_insert_delayed)
 
460
    {
 
461
      if (mysql_bin_log.is_open())
 
462
      {
 
463
        if (error <= 0)
 
464
        {
 
465
          /*
 
466
            [Guilhem wrote] Temporary errors may have filled
 
467
            thd->net.last_error/errno.  For example if there has
 
468
            been a disk full error when writing the row, and it was
 
469
            MyISAM, then thd->net.last_error/errno will be set to
 
470
            "disk full"... and the my_pwrite() will wait until free
 
471
            space appears, and so when it finishes then the
 
472
            write_row() was entirely successful
 
473
          */
 
474
          /* todo: consider removing */
 
475
          thd->clear_error();
 
476
        }
 
477
        /* bug#22725:
 
478
 
 
479
        A query which per-row-loop can not be interrupted with
 
480
        KILLED, like INSERT, and that does not invoke stored
 
481
        routines can be binlogged with neglecting the KILLED error.
 
482
        
 
483
        If there was no error (error == zero) until after the end of
 
484
        inserting loop the KILLED flag that appeared later can be
 
485
        disregarded since previously possible invocation of stored
 
486
        routines did not result in any error due to the KILLED.  In
 
487
        such case the flag is ignored for constructing binlog event.
 
488
        */
 
489
        DBUG_ASSERT(thd->killed != THD::KILL_BAD_DATA || error > 0);
 
490
        if (thd->binlog_query(THD::ROW_QUERY_TYPE,
 
491
                              thd->query, thd->query_length,
 
492
                              transactional_table, FALSE,
 
493
                              (error>0) ? thd->killed : THD::NOT_KILLED) &&
 
494
            transactional_table)
 
495
        {
 
496
          error=1;
 
497
        }
 
498
      }
 
499
      if (thd->transaction.stmt.modified_non_trans_table)
 
500
        thd->transaction.all.modified_non_trans_table= TRUE;
 
501
    }
 
502
    DBUG_ASSERT(transactional_table || !changed || 
 
503
                thd->transaction.stmt.modified_non_trans_table);
 
504
 
 
505
  }
 
506
  thd_proc_info(thd, "end");
 
507
  /*
 
508
    We'll report to the client this id:
 
509
    - if the table contains an autoincrement column and we successfully
 
510
    inserted an autogenerated value, the autogenerated value.
 
511
    - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
 
512
    called, X.
 
513
    - if the table contains an autoincrement column, and some rows were
 
514
    inserted, the id of the last "inserted" row (if IGNORE, that value may not
 
515
    have been really inserted but ignored).
 
516
  */
 
517
  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
 
518
    thd->first_successful_insert_id_in_cur_stmt :
 
519
    (thd->arg_of_last_insert_id_function ?
 
520
     thd->first_successful_insert_id_in_prev_stmt :
 
521
     ((table->next_number_field && info.copied) ?
 
522
     table->next_number_field->val_int() : 0));
 
523
  table->next_number_field=0;
 
524
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
 
525
  table->auto_increment_field_not_null= FALSE;
 
526
  if (duplic == DUP_REPLACE)
 
527
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
528
 
 
529
  if (error)
 
530
    goto abort;
 
531
  if (values_list.elements == 1 && (!(thd->options & OPTION_WARNINGS) ||
 
532
                                    !thd->cuted_fields))
 
533
  {
 
534
    thd->row_count_func= info.copied + info.deleted +
 
535
                         ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
 
536
                          info.touched : info.updated);
 
537
    my_ok(thd, (ulong) thd->row_count_func, id);
 
538
  }
 
539
  else
 
540
  {
 
541
    char buff[160];
 
542
    ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
 
543
                     info.touched : info.updated);
 
544
    if (ignore)
 
545
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
 
546
              (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
 
547
    else
 
548
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
 
549
              (ulong) (info.deleted + updated), (ulong) thd->cuted_fields);
 
550
    thd->row_count_func= info.copied + info.deleted + updated;
 
551
    ::my_ok(thd, (ulong) thd->row_count_func, id, buff);
 
552
  }
 
553
  thd->abort_on_warning= 0;
 
554
  MYSQL_INSERT_END();
 
555
  DBUG_RETURN(FALSE);
 
556
 
 
557
abort:
 
558
  if (table != NULL)
 
559
    table->file->ha_release_auto_increment();
 
560
  if (!joins_freed)
 
561
    free_underlaid_joins(thd, &thd->lex->select_lex);
 
562
  thd->abort_on_warning= 0;
 
563
  MYSQL_INSERT_END();
 
564
  DBUG_RETURN(TRUE);
 
565
}
 
566
 
 
567
 
 
568
/*
 
569
  Check if table can be updated
 
570
 
 
571
  SYNOPSIS
 
572
     mysql_prepare_insert_check_table()
 
573
     thd                Thread handle
 
574
     table_list         Table list
 
575
     fields             List of fields to be updated
 
576
     where              Pointer to where clause
 
577
     select_insert      Check is making for SELECT ... INSERT
 
578
 
 
579
   RETURN
 
580
     FALSE ok
 
581
     TRUE  ERROR
 
582
*/
 
583
 
 
584
static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
 
585
                                             List<Item> &fields,
 
586
                                             bool select_insert)
 
587
{
 
588
  DBUG_ENTER("mysql_prepare_insert_check_table");
 
589
 
 
590
  /*
 
591
     first table in list is the one we'll INSERT into, requires INSERT_ACL.
 
592
     all others require SELECT_ACL only. the ACL requirement below is for
 
593
     new leaves only anyway (view-constituents), so check for SELECT rather
 
594
     than INSERT.
 
595
  */
 
596
 
 
597
  if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
 
598
                                    &thd->lex->select_lex.top_join_list,
 
599
                                    table_list,
 
600
                                    &thd->lex->select_lex.leaf_tables,
 
601
                                    select_insert))
 
602
    DBUG_RETURN(TRUE);
 
603
 
 
604
  DBUG_RETURN(FALSE);
 
605
}
 
606
 
 
607
 
 
608
/*
 
609
  Prepare items in INSERT statement
 
610
 
 
611
  SYNOPSIS
 
612
    mysql_prepare_insert()
 
613
    thd                 Thread handler
 
614
    table_list          Global/local table list
 
615
    table               Table to insert into (can be NULL if table should
 
616
                        be taken from table_list->table)    
 
617
    where               Where clause (for insert ... select)
 
618
    select_insert       TRUE if INSERT ... SELECT statement
 
619
    check_fields        TRUE if need to check that all INSERT fields are 
 
620
                        given values.
 
621
    abort_on_warning    whether to report if some INSERT field is not 
 
622
                        assigned as an error (TRUE) or as a warning (FALSE).
 
623
 
 
624
  TODO (in far future)
 
625
    In cases of:
 
626
    INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
 
627
    ON DUPLICATE KEY ...
 
628
    we should be able to refer to sum1 in the ON DUPLICATE KEY part
 
629
 
 
630
  WARNING
 
631
    You MUST set table->insert_values to 0 after calling this function
 
632
    before releasing the table object.
 
633
  
 
634
  RETURN VALUE
 
635
    FALSE OK
 
636
    TRUE  error
 
637
*/
 
638
 
 
639
bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
 
640
                          TABLE *table, List<Item> &fields, List_item *values,
 
641
                          List<Item> &update_fields, List<Item> &update_values,
 
642
                          enum_duplicates duplic,
 
643
                          COND **where, bool select_insert,
 
644
                          bool check_fields, bool abort_on_warning)
 
645
{
 
646
  SELECT_LEX *select_lex= &thd->lex->select_lex;
 
647
  Name_resolution_context *context= &select_lex->context;
 
648
  Name_resolution_context_state ctx_state;
 
649
  bool insert_into_view= (0 != 0);
 
650
  bool res= 0;
 
651
  table_map map= 0;
 
652
  DBUG_ENTER("mysql_prepare_insert");
 
653
  DBUG_PRINT("enter", ("table_list 0x%lx, table 0x%lx, view %d",
 
654
                       (ulong)table_list, (ulong)table,
 
655
                       (int)insert_into_view));
 
656
  /* INSERT should have a SELECT or VALUES clause */
 
657
  DBUG_ASSERT (!select_insert || !values);
 
658
 
 
659
  /*
 
660
    For subqueries in VALUES() we should not see the table in which we are
 
661
    inserting (for INSERT ... SELECT this is done by changing table_list,
 
662
    because INSERT ... SELECT share SELECT_LEX it with SELECT.
 
663
  */
 
664
  if (!select_insert)
 
665
  {
 
666
    for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
 
667
         un;
 
668
         un= un->next_unit())
 
669
    {
 
670
      for (SELECT_LEX *sl= un->first_select();
 
671
           sl;
 
672
           sl= sl->next_select())
 
673
      {
 
674
        sl->context.outer_context= 0;
 
675
      }
 
676
    }
 
677
  }
 
678
 
 
679
  if (duplic == DUP_UPDATE)
 
680
  {
 
681
    /* it should be allocated before Item::fix_fields() */
 
682
    if (table_list->set_insert_values(thd->mem_root))
 
683
      DBUG_RETURN(TRUE);
 
684
  }
 
685
 
 
686
  if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
 
687
    DBUG_RETURN(TRUE);
 
688
 
 
689
 
 
690
  /* Prepare the fields in the statement. */
 
691
  if (values)
 
692
  {
 
693
    /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
 
694
    DBUG_ASSERT (!select_lex->group_list.elements);
 
695
 
 
696
    /* Save the state of the current name resolution context. */
 
697
    ctx_state.save_state(context, table_list);
 
698
 
 
699
    /*
 
700
      Perform name resolution only in the first table - 'table_list',
 
701
      which is the table that is inserted into.
 
702
     */
 
703
    table_list->next_local= 0;
 
704
    context->resolve_in_table_list_only(table_list);
 
705
 
 
706
    res= check_insert_fields(thd, context->table_list, fields, *values,
 
707
                             !insert_into_view, &map) ||
 
708
      setup_fields(thd, 0, *values, MARK_COLUMNS_READ, 0, 0);
 
709
 
 
710
    if (!res && check_fields)
 
711
    {
 
712
      bool saved_abort_on_warning= thd->abort_on_warning;
 
713
      thd->abort_on_warning= abort_on_warning;
 
714
      res= check_that_all_fields_are_given_values(thd, 
 
715
                                                  table ? table : 
 
716
                                                  context->table_list->table,
 
717
                                                  context->table_list);
 
718
      thd->abort_on_warning= saved_abort_on_warning;
 
719
    }
 
720
 
 
721
    if (!res && duplic == DUP_UPDATE)
 
722
    {
 
723
      select_lex->no_wrap_view_item= TRUE;
 
724
      res= check_update_fields(thd, context->table_list, update_fields, &map);
 
725
      select_lex->no_wrap_view_item= FALSE;
 
726
    }
 
727
 
 
728
    /* Restore the current context. */
 
729
    ctx_state.restore_state(context, table_list);
 
730
 
 
731
    if (!res)
 
732
      res= setup_fields(thd, 0, update_values, MARK_COLUMNS_READ, 0, 0);
 
733
  }
 
734
 
 
735
  if (res)
 
736
    DBUG_RETURN(res);
 
737
 
 
738
  if (!table)
 
739
    table= table_list->table;
 
740
 
 
741
  if (!select_insert)
 
742
  {
 
743
    Item *fake_conds= 0;
 
744
    TABLE_LIST *duplicate;
 
745
    if ((duplicate= unique_table(thd, table_list, table_list->next_global, 1)))
 
746
    {
 
747
      update_non_unique_table_error(table_list, "INSERT", duplicate);
 
748
      DBUG_RETURN(TRUE);
 
749
    }
 
750
    select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
 
751
    select_lex->first_execution= 0;
 
752
  }
 
753
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
 
754
    table->prepare_for_position();
 
755
  DBUG_RETURN(FALSE);
 
756
}
 
757
 
 
758
 
 
759
        /* Check if there is more uniq keys after field */
 
760
 
 
761
static int last_uniq_key(TABLE *table,uint keynr)
 
762
{
 
763
  while (++keynr < table->s->keys)
 
764
    if (table->key_info[keynr].flags & HA_NOSAME)
 
765
      return 0;
 
766
  return 1;
 
767
}
 
768
 
 
769
 
 
770
/*
 
771
  Write a record to table with optional deleting of conflicting records,
 
772
  invoke proper triggers if needed.
 
773
 
 
774
  SYNOPSIS
 
775
     write_record()
 
776
      thd   - thread context
 
777
      table - table to which record should be written
 
778
      info  - COPY_INFO structure describing handling of duplicates
 
779
              and which is used for counting number of records inserted
 
780
              and deleted.
 
781
 
 
782
  NOTE
 
783
    Once this record will be written to table after insert trigger will
 
784
    be invoked. If instead of inserting new record we will update old one
 
785
    then both on update triggers will work instead. Similarly both on
 
786
    delete triggers will be invoked if we will delete conflicting records.
 
787
 
 
788
    Sets thd->transaction.stmt.modified_non_trans_table to TRUE if table which is updated didn't have
 
789
    transactions.
 
790
 
 
791
  RETURN VALUE
 
792
    0     - success
 
793
    non-0 - error
 
794
*/
 
795
 
 
796
 
 
797
int write_record(THD *thd, TABLE *table,COPY_INFO *info)
 
798
{
 
799
  int error;
 
800
  char *key=0;
 
801
  MY_BITMAP *save_read_set, *save_write_set;
 
802
  ulonglong prev_insert_id= table->file->next_insert_id;
 
803
  ulonglong insert_id_for_cur_row= 0;
 
804
  DBUG_ENTER("write_record");
 
805
 
 
806
  info->records++;
 
807
  save_read_set=  table->read_set;
 
808
  save_write_set= table->write_set;
 
809
 
 
810
  if (info->handle_duplicates == DUP_REPLACE ||
 
811
      info->handle_duplicates == DUP_UPDATE)
 
812
  {
 
813
    while ((error=table->file->ha_write_row(table->record[0])))
 
814
    {
 
815
      uint key_nr;
 
816
      /*
 
817
        If we do more than one iteration of this loop, from the second one the
 
818
        row will have an explicit value in the autoinc field, which was set at
 
819
        the first call of handler::update_auto_increment(). So we must save
 
820
        the autogenerated value to avoid thd->insert_id_for_cur_row to become
 
821
        0.
 
822
      */
 
823
      if (table->file->insert_id_for_cur_row > 0)
 
824
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
 
825
      else
 
826
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
 
827
      bool is_duplicate_key_error;
 
828
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
 
829
        goto err;
 
830
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
 
831
      if (!is_duplicate_key_error)
 
832
      {
 
833
        /*
 
834
          We come here when we had an ignorable error which is not a duplicate
 
835
          key error. In this we ignore error if ignore flag is set, otherwise
 
836
          report error as usual. We will not do any duplicate key processing.
 
837
        */
 
838
        if (info->ignore)
 
839
          goto gok_or_after_err; /* Ignoring a not fatal error, return 0 */
 
840
        goto err;
 
841
      }
 
842
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
 
843
      {
 
844
        error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
 
845
        goto err;
 
846
      }
 
847
      /* Read all columns for the row we are going to replace */
 
848
      table->use_all_columns();
 
849
      /*
 
850
        Don't allow REPLACE to replace a row when a auto_increment column
 
851
        was used.  This ensures that we don't get a problem when the
 
852
        whole range of the key has been used.
 
853
      */
 
854
      if (info->handle_duplicates == DUP_REPLACE &&
 
855
          table->next_number_field &&
 
856
          key_nr == table->s->next_number_index &&
 
857
          (insert_id_for_cur_row > 0))
 
858
        goto err;
 
859
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
 
860
      {
 
861
        if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
 
862
          goto err;
 
863
      }
 
864
      else
 
865
      {
 
866
        if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
 
867
        {
 
868
          error=my_errno;
 
869
          goto err;
 
870
        }
 
871
 
 
872
        if (!key)
 
873
        {
 
874
          if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
 
875
                                           MAX_KEY_LENGTH)))
 
876
          {
 
877
            error=ENOMEM;
 
878
            goto err;
 
879
          }
 
880
        }
 
881
        key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0);
 
882
        if ((error=(table->file->index_read_idx_map(table->record[1],key_nr,
 
883
                                                    (uchar*) key, HA_WHOLE_KEY,
 
884
                                                    HA_READ_KEY_EXACT))))
 
885
          goto err;
 
886
      }
 
887
      if (info->handle_duplicates == DUP_UPDATE)
 
888
      {
 
889
        /*
 
890
          We don't check for other UNIQUE keys - the first row
 
891
          that matches, is updated. If update causes a conflict again,
 
892
          an error is returned
 
893
        */
 
894
        DBUG_ASSERT(table->insert_values != NULL);
 
895
        store_record(table,insert_values);
 
896
        restore_record(table,record[1]);
 
897
        DBUG_ASSERT(info->update_fields->elements ==
 
898
                    info->update_values->elements);
 
899
        if (fill_record(thd, *info->update_fields,
 
900
                                                 *info->update_values,
 
901
                                                 info->ignore))
 
902
          goto before_err;
 
903
 
 
904
        table->file->restore_auto_increment(prev_insert_id);
 
905
        if (table->next_number_field)
 
906
          table->file->adjust_next_insert_id_after_explicit_value(
 
907
            table->next_number_field->val_int());
 
908
        info->touched++;
 
909
        if ((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ &&
 
910
             !bitmap_is_subset(table->write_set, table->read_set)) ||
 
911
            compare_record(table))
 
912
        {
 
913
          if ((error=table->file->ha_update_row(table->record[1],
 
914
                                                table->record[0])) &&
 
915
              error != HA_ERR_RECORD_IS_THE_SAME)
 
916
          {
 
917
            if (info->ignore &&
 
918
                !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
919
            {
 
920
              goto gok_or_after_err;
 
921
            }
 
922
            goto err;
 
923
          }
 
924
 
 
925
          if (error != HA_ERR_RECORD_IS_THE_SAME)
 
926
            info->updated++;
 
927
          else
 
928
            error= 0;
 
929
          /*
 
930
            If ON DUP KEY UPDATE updates a row instead of inserting one, it's
 
931
            like a regular UPDATE statement: it should not affect the value of a
 
932
            next SELECT LAST_INSERT_ID() or mysql_insert_id().
 
933
            Except if LAST_INSERT_ID(#) was in the INSERT query, which is
 
934
            handled separately by THD::arg_of_last_insert_id_function.
 
935
          */
 
936
          insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
 
937
          info->copied++;
 
938
        }
 
939
 
 
940
        if (table->next_number_field)
 
941
          table->file->adjust_next_insert_id_after_explicit_value(
 
942
            table->next_number_field->val_int());
 
943
        info->touched++;
 
944
 
 
945
        goto gok_or_after_err;
 
946
      }
 
947
      else /* DUP_REPLACE */
 
948
      {
 
949
        /*
 
950
          The manual defines the REPLACE semantics that it is either
 
951
          an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
 
952
          InnoDB do not function in the defined way if we allow MySQL
 
953
          to convert the latter operation internally to an UPDATE.
 
954
          We also should not perform this conversion if we have 
 
955
          timestamp field with ON UPDATE which is different from DEFAULT.
 
956
          Another case when conversion should not be performed is when
 
957
          we have ON DELETE trigger on table so user may notice that
 
958
          we cheat here. Note that it is ok to do such conversion for
 
959
          tables which have ON UPDATE but have no ON DELETE triggers,
 
960
          we just should not expose this fact to users by invoking
 
961
          ON UPDATE triggers.
 
962
        */
 
963
        if (last_uniq_key(table,key_nr) &&
 
964
            !table->file->referenced_by_foreign_key() &&
 
965
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
 
966
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
 
967
        {
 
968
          if ((error=table->file->ha_update_row(table->record[1],
 
969
                                                table->record[0])) &&
 
970
              error != HA_ERR_RECORD_IS_THE_SAME)
 
971
            goto err;
 
972
          if (error != HA_ERR_RECORD_IS_THE_SAME)
 
973
            info->deleted++;
 
974
          else
 
975
            error= 0;
 
976
          thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
 
977
          /*
 
978
            Since we pretend that we have done insert we should call
 
979
            its after triggers.
 
980
          */
 
981
          goto after_n_copied_inc;
 
982
        }
 
983
        else
 
984
        {
 
985
          if ((error=table->file->ha_delete_row(table->record[1])))
 
986
            goto err;
 
987
          info->deleted++;
 
988
          if (!table->file->has_transactions())
 
989
            thd->transaction.stmt.modified_non_trans_table= TRUE;
 
990
          /* Let us attempt do write_row() once more */
 
991
        }
 
992
      }
 
993
    }
 
994
    thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
 
995
    /*
 
996
      Restore column maps if they where replaced during an duplicate key
 
997
      problem.
 
998
    */
 
999
    if (table->read_set != save_read_set ||
 
1000
        table->write_set != save_write_set)
 
1001
      table->column_bitmaps_set(save_read_set, save_write_set);
 
1002
  }
 
1003
  else if ((error=table->file->ha_write_row(table->record[0])))
 
1004
  {
 
1005
    if (!info->ignore ||
 
1006
        table->file->is_fatal_error(error, HA_CHECK_DUP))
 
1007
      goto err;
 
1008
    table->file->restore_auto_increment(prev_insert_id);
 
1009
    goto gok_or_after_err;
 
1010
  }
 
1011
 
 
1012
after_n_copied_inc:
 
1013
  info->copied++;
 
1014
  thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
 
1015
 
 
1016
gok_or_after_err:
 
1017
  if (key)
 
1018
    my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
 
1019
  if (!table->file->has_transactions())
 
1020
    thd->transaction.stmt.modified_non_trans_table= TRUE;
 
1021
  DBUG_RETURN(0);
 
1022
 
 
1023
err:
 
1024
  info->last_errno= error;
 
1025
  /* current_select is NULL if this is a delayed insert */
 
1026
  if (thd->lex->current_select)
 
1027
    thd->lex->current_select->no_error= 0;        // Give error
 
1028
  table->file->print_error(error,MYF(0));
 
1029
  
 
1030
before_err:
 
1031
  table->file->restore_auto_increment(prev_insert_id);
 
1032
  if (key)
 
1033
    my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
 
1034
  table->column_bitmaps_set(save_read_set, save_write_set);
 
1035
  DBUG_RETURN(1);
 
1036
}
 
1037
 
 
1038
 
 
1039
/******************************************************************************
 
1040
  Check that all fields with arn't null_fields are used
 
1041
******************************************************************************/
 
1042
 
 
1043
int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
 
1044
                                           TABLE_LIST *table_list)
 
1045
{
 
1046
  int err= 0;
 
1047
  MY_BITMAP *write_set= entry->write_set;
 
1048
 
 
1049
  for (Field **field=entry->field ; *field ; field++)
 
1050
  {
 
1051
    if (!bitmap_is_set(write_set, (*field)->field_index) &&
 
1052
        ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
 
1053
        ((*field)->real_type() != MYSQL_TYPE_ENUM))
 
1054
    {
 
1055
      bool view= FALSE;
 
1056
      if (table_list)
 
1057
      {
 
1058
        table_list= table_list->top_table();
 
1059
        view= test(0);
 
1060
      }
 
1061
      {
 
1062
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
 
1063
                            ER_NO_DEFAULT_FOR_FIELD,
 
1064
                            ER(ER_NO_DEFAULT_FOR_FIELD),
 
1065
                            (*field)->field_name);
 
1066
      }
 
1067
      err= 1;
 
1068
    }
 
1069
  }
 
1070
  return thd->abort_on_warning ? err : 0;
 
1071
}
 
1072
 
 
1073
/***************************************************************************
 
1074
  Store records in INSERT ... SELECT *
 
1075
***************************************************************************/
 
1076
 
 
1077
 
 
1078
/*
 
1079
  make insert specific preparation and checks after opening tables
 
1080
 
 
1081
  SYNOPSIS
 
1082
    mysql_insert_select_prepare()
 
1083
    thd         thread handler
 
1084
 
 
1085
  RETURN
 
1086
    FALSE OK
 
1087
    TRUE  Error
 
1088
*/
 
1089
 
 
1090
bool mysql_insert_select_prepare(THD *thd)
 
1091
{
 
1092
  LEX *lex= thd->lex;
 
1093
  SELECT_LEX *select_lex= &lex->select_lex;
 
1094
  TABLE_LIST *first_select_leaf_table;
 
1095
  DBUG_ENTER("mysql_insert_select_prepare");
 
1096
 
 
1097
  /*
 
1098
    Statement-based replication of INSERT ... SELECT ... LIMIT is not safe
 
1099
    as order of rows is not defined, so in mixed mode we go to row-based.
 
1100
 
 
1101
    Note that we may consider a statement as safe if ORDER BY primary_key
 
1102
    is present or we SELECT a constant. However it may confuse users to
 
1103
    see very similiar statements replicated differently.
 
1104
  */
 
1105
  if (lex->current_select->select_limit)
 
1106
  {
 
1107
    lex->set_stmt_unsafe();
 
1108
    thd->set_current_stmt_binlog_row_based_if_mixed();
 
1109
  }
 
1110
  /*
 
1111
    SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
 
1112
    clause if table is VIEW
 
1113
  */
 
1114
  
 
1115
  if (mysql_prepare_insert(thd, lex->query_tables,
 
1116
                           lex->query_tables->table, lex->field_list, 0,
 
1117
                           lex->update_list, lex->value_list,
 
1118
                           lex->duplicates,
 
1119
                           &select_lex->where, TRUE, FALSE, FALSE))
 
1120
    DBUG_RETURN(TRUE);
 
1121
 
 
1122
  /*
 
1123
    exclude first table from leaf tables list, because it belong to
 
1124
    INSERT
 
1125
  */
 
1126
  DBUG_ASSERT(select_lex->leaf_tables != 0);
 
1127
  lex->leaf_tables_insert= select_lex->leaf_tables;
 
1128
  /* skip all leaf tables belonged to view where we are insert */
 
1129
  for (first_select_leaf_table= select_lex->leaf_tables->next_leaf;
 
1130
       first_select_leaf_table &&
 
1131
       first_select_leaf_table->belong_to_view &&
 
1132
       first_select_leaf_table->belong_to_view ==
 
1133
       lex->leaf_tables_insert->belong_to_view;
 
1134
       first_select_leaf_table= first_select_leaf_table->next_leaf)
 
1135
  {}
 
1136
  select_lex->leaf_tables= first_select_leaf_table;
 
1137
  DBUG_RETURN(FALSE);
 
1138
}
 
1139
 
 
1140
 
 
1141
select_insert::select_insert(TABLE_LIST *table_list_par, TABLE *table_par,
 
1142
                             List<Item> *fields_par,
 
1143
                             List<Item> *update_fields,
 
1144
                             List<Item> *update_values,
 
1145
                             enum_duplicates duplic,
 
1146
                             bool ignore_check_option_errors)
 
1147
  :table_list(table_list_par), table(table_par), fields(fields_par),
 
1148
   autoinc_value_of_last_inserted_row(0),
 
1149
   insert_into_view(table_list_par && 0 != 0)
 
1150
{
 
1151
  bzero((char*) &info,sizeof(info));
 
1152
  info.handle_duplicates= duplic;
 
1153
  info.ignore= ignore_check_option_errors;
 
1154
  info.update_fields= update_fields;
 
1155
  info.update_values= update_values;
 
1156
}
 
1157
 
 
1158
 
 
1159
int
 
1160
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
 
1161
{
 
1162
  LEX *lex= thd->lex;
 
1163
  int res;
 
1164
  table_map map= 0;
 
1165
  SELECT_LEX *lex_current_select_save= lex->current_select;
 
1166
  DBUG_ENTER("select_insert::prepare");
 
1167
 
 
1168
  unit= u;
 
1169
 
 
1170
  /*
 
1171
    Since table in which we are going to insert is added to the first
 
1172
    select, LEX::current_select should point to the first select while
 
1173
    we are fixing fields from insert list.
 
1174
  */
 
1175
  lex->current_select= &lex->select_lex;
 
1176
  res= check_insert_fields(thd, table_list, *fields, values,
 
1177
                           !insert_into_view, &map) ||
 
1178
       setup_fields(thd, 0, values, MARK_COLUMNS_READ, 0, 0);
 
1179
 
 
1180
  if (!res && fields->elements)
 
1181
  {
 
1182
    bool saved_abort_on_warning= thd->abort_on_warning;
 
1183
    thd->abort_on_warning= !info.ignore;
 
1184
    res= check_that_all_fields_are_given_values(thd, table_list->table, 
 
1185
                                                table_list);
 
1186
    thd->abort_on_warning= saved_abort_on_warning;
 
1187
  }
 
1188
 
 
1189
  if (info.handle_duplicates == DUP_UPDATE && !res)
 
1190
  {
 
1191
    Name_resolution_context *context= &lex->select_lex.context;
 
1192
    Name_resolution_context_state ctx_state;
 
1193
 
 
1194
    /* Save the state of the current name resolution context. */
 
1195
    ctx_state.save_state(context, table_list);
 
1196
 
 
1197
    /* Perform name resolution only in the first table - 'table_list'. */
 
1198
    table_list->next_local= 0;
 
1199
    context->resolve_in_table_list_only(table_list);
 
1200
 
 
1201
    lex->select_lex.no_wrap_view_item= TRUE;
 
1202
    res= res || check_update_fields(thd, context->table_list,
 
1203
                                    *info.update_fields, &map);
 
1204
    lex->select_lex.no_wrap_view_item= FALSE;
 
1205
    /*
 
1206
      When we are not using GROUP BY and there are no ungrouped aggregate functions 
 
1207
      we can refer to other tables in the ON DUPLICATE KEY part.
 
1208
      We use next_name_resolution_table descructively, so check it first (views?)
 
1209
    */
 
1210
    DBUG_ASSERT (!table_list->next_name_resolution_table);
 
1211
    if (lex->select_lex.group_list.elements == 0 &&
 
1212
        !lex->select_lex.with_sum_func)
 
1213
      /*
 
1214
        We must make a single context out of the two separate name resolution contexts :
 
1215
        the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
 
1216
        To do that we must concatenate the two lists
 
1217
      */  
 
1218
      table_list->next_name_resolution_table= 
 
1219
        ctx_state.get_first_name_resolution_table();
 
1220
 
 
1221
    res= res || setup_fields(thd, 0, *info.update_values,
 
1222
                             MARK_COLUMNS_READ, 0, 0);
 
1223
    if (!res)
 
1224
    {
 
1225
      /*
 
1226
        Traverse the update values list and substitute fields from the
 
1227
        select for references (Item_ref objects) to them. This is done in
 
1228
        order to get correct values from those fields when the select
 
1229
        employs a temporary table.
 
1230
      */
 
1231
      List_iterator<Item> li(*info.update_values);
 
1232
      Item *item;
 
1233
 
 
1234
      while ((item= li++))
 
1235
      {
 
1236
        item->transform(&Item::update_value_transformer,
 
1237
                        (uchar*)lex->current_select);
 
1238
      }
 
1239
    }
 
1240
 
 
1241
    /* Restore the current context. */
 
1242
    ctx_state.restore_state(context, table_list);
 
1243
  }
 
1244
 
 
1245
  lex->current_select= lex_current_select_save;
 
1246
  if (res)
 
1247
    DBUG_RETURN(1);
 
1248
  /*
 
1249
    if it is INSERT into join view then check_insert_fields already found
 
1250
    real table for insert
 
1251
  */
 
1252
  table= table_list->table;
 
1253
 
 
1254
  /*
 
1255
    Is table which we are changing used somewhere in other parts of
 
1256
    query
 
1257
  */
 
1258
  if (unique_table(thd, table_list, table_list->next_global, 0))
 
1259
  {
 
1260
    /* Using same table for INSERT and SELECT */
 
1261
    lex->current_select->options|= OPTION_BUFFER_RESULT;
 
1262
    lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
 
1263
  }
 
1264
  else if (!(lex->current_select->options & OPTION_BUFFER_RESULT))
 
1265
  {
 
1266
    /*
 
1267
      We must not yet prepare the result table if it is the same as one of the 
 
1268
      source tables (INSERT SELECT). The preparation may disable 
 
1269
      indexes on the result table, which may be used during the select, if it
 
1270
      is the same table (Bug #6034). Do the preparation after the select phase
 
1271
      in select_insert::prepare2().
 
1272
      We won't start bulk inserts at all if this statement uses functions or
 
1273
      should invoke triggers since they may access to the same table too.
 
1274
    */
 
1275
    table->file->ha_start_bulk_insert((ha_rows) 0);
 
1276
  }
 
1277
  restore_record(table,s->default_values);              // Get empty record
 
1278
  table->next_number_field=table->found_next_number_field;
 
1279
 
 
1280
#ifdef HAVE_REPLICATION
 
1281
  if (thd->slave_thread &&
 
1282
      (info.handle_duplicates == DUP_UPDATE) &&
 
1283
      (table->next_number_field != NULL) &&
 
1284
      rpl_master_has_bug(&active_mi->rli, 24432))
 
1285
    DBUG_RETURN(1);
 
1286
#endif
 
1287
 
 
1288
  thd->cuted_fields=0;
 
1289
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
 
1290
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1291
  if (info.handle_duplicates == DUP_REPLACE)
 
1292
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
1293
  if (info.handle_duplicates == DUP_UPDATE)
 
1294
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1295
  thd->abort_on_warning= !info.ignore;
 
1296
  table->mark_columns_needed_for_insert();
 
1297
 
 
1298
 
 
1299
  DBUG_RETURN(res);
 
1300
}
 
1301
 
 
1302
 
 
1303
/*
 
1304
  Finish the preparation of the result table.
 
1305
 
 
1306
  SYNOPSIS
 
1307
    select_insert::prepare2()
 
1308
    void
 
1309
 
 
1310
  DESCRIPTION
 
1311
    If the result table is the same as one of the source tables (INSERT SELECT),
 
1312
    the result table is not finally prepared at the join prepair phase.
 
1313
    Do the final preparation now.
 
1314
                       
 
1315
  RETURN
 
1316
    0   OK
 
1317
*/
 
1318
 
 
1319
int select_insert::prepare2(void)
 
1320
{
 
1321
  DBUG_ENTER("select_insert::prepare2");
 
1322
  if (thd->lex->current_select->options & OPTION_BUFFER_RESULT)
 
1323
    table->file->ha_start_bulk_insert((ha_rows) 0);
 
1324
  DBUG_RETURN(0);
 
1325
}
 
1326
 
 
1327
 
 
1328
void select_insert::cleanup()
 
1329
{
 
1330
  /* select_insert/select_create are never re-used in prepared statement */
 
1331
  DBUG_ASSERT(0);
 
1332
}
 
1333
 
 
1334
select_insert::~select_insert()
 
1335
{
 
1336
  DBUG_ENTER("~select_insert");
 
1337
  if (table)
 
1338
  {
 
1339
    table->next_number_field=0;
 
1340
    table->auto_increment_field_not_null= FALSE;
 
1341
    table->file->ha_reset();
 
1342
  }
 
1343
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
 
1344
  thd->abort_on_warning= 0;
 
1345
  DBUG_VOID_RETURN;
 
1346
}
 
1347
 
 
1348
 
 
1349
bool select_insert::send_data(List<Item> &values)
 
1350
{
 
1351
  DBUG_ENTER("select_insert::send_data");
 
1352
  bool error=0;
 
1353
 
 
1354
  if (unit->offset_limit_cnt)
 
1355
  {                                             // using limit offset,count
 
1356
    unit->offset_limit_cnt--;
 
1357
    DBUG_RETURN(0);
 
1358
  }
 
1359
 
 
1360
  thd->count_cuted_fields= CHECK_FIELD_WARN;    // Calculate cuted fields
 
1361
  store_values(values);
 
1362
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
 
1363
  if (thd->is_error())
 
1364
    DBUG_RETURN(1);
 
1365
 
 
1366
  error= write_record(thd, table, &info);
 
1367
    
 
1368
  if (!error)
 
1369
  {
 
1370
    if (info.handle_duplicates == DUP_UPDATE)
 
1371
    {
 
1372
      /*
 
1373
        Restore fields of the record since it is possible that they were
 
1374
        changed by ON DUPLICATE KEY UPDATE clause.
 
1375
    
 
1376
        If triggers exist then whey can modify some fields which were not
 
1377
        originally touched by INSERT ... SELECT, so we have to restore
 
1378
        their original values for the next row.
 
1379
      */
 
1380
      restore_record(table, s->default_values);
 
1381
    }
 
1382
    if (table->next_number_field)
 
1383
    {
 
1384
      /*
 
1385
        If no value has been autogenerated so far, we need to remember the
 
1386
        value we just saw, we may need to send it to client in the end.
 
1387
      */
 
1388
      if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
 
1389
        autoinc_value_of_last_inserted_row= 
 
1390
          table->next_number_field->val_int();
 
1391
      /*
 
1392
        Clear auto-increment field for the next record, if triggers are used
 
1393
        we will clear it twice, but this should be cheap.
 
1394
      */
 
1395
      table->next_number_field->reset();
 
1396
    }
 
1397
  }
 
1398
  DBUG_RETURN(error);
 
1399
}
 
1400
 
 
1401
 
 
1402
void select_insert::store_values(List<Item> &values)
 
1403
{
 
1404
  if (fields->elements)
 
1405
    fill_record(thd, *fields, values, 1);
 
1406
  else
 
1407
    fill_record(thd, table->field, values, 1);
 
1408
}
 
1409
 
 
1410
void select_insert::send_error(uint errcode,const char *err)
 
1411
{
 
1412
  DBUG_ENTER("select_insert::send_error");
 
1413
 
 
1414
  my_message(errcode, err, MYF(0));
 
1415
 
 
1416
  DBUG_VOID_RETURN;
 
1417
}
 
1418
 
 
1419
 
 
1420
bool select_insert::send_eof()
 
1421
{
 
1422
  int error;
 
1423
  bool const trans_table= table->file->has_transactions();
 
1424
  ulonglong id;
 
1425
  bool changed;
 
1426
  THD::killed_state killed_status= thd->killed;
 
1427
  DBUG_ENTER("select_insert::send_eof");
 
1428
  DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
 
1429
                       trans_table, table->file->table_type()));
 
1430
 
 
1431
  error= table->file->ha_end_bulk_insert();
 
1432
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1433
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1434
 
 
1435
  if ((changed= (info.copied || info.deleted || info.updated)))
 
1436
  {
 
1437
    /*
 
1438
      We must invalidate the table in the query cache before binlog writing
 
1439
      and ha_autocommit_or_rollback.
 
1440
    */
 
1441
    if (thd->transaction.stmt.modified_non_trans_table)
 
1442
      thd->transaction.all.modified_non_trans_table= TRUE;
 
1443
  }
 
1444
  DBUG_ASSERT(trans_table || !changed || 
 
1445
              thd->transaction.stmt.modified_non_trans_table);
 
1446
 
 
1447
  /*
 
1448
    Write to binlog before commiting transaction.  No statement will
 
1449
    be written by the binlog_query() below in RBR mode.  All the
 
1450
    events are in the transaction cache and will be written when
 
1451
    ha_autocommit_or_rollback() is issued below.
 
1452
  */
 
1453
  if (mysql_bin_log.is_open())
 
1454
  {
 
1455
    if (!error)
 
1456
      thd->clear_error();
 
1457
    thd->binlog_query(THD::ROW_QUERY_TYPE,
 
1458
                      thd->query, thd->query_length,
 
1459
                      trans_table, FALSE, killed_status);
 
1460
  }
 
1461
  table->file->ha_release_auto_increment();
 
1462
 
 
1463
  if (error)
 
1464
  {
 
1465
    table->file->print_error(error,MYF(0));
 
1466
    DBUG_RETURN(1);
 
1467
  }
 
1468
  char buff[160];
 
1469
  if (info.ignore)
 
1470
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
 
1471
            (ulong) (info.records - info.copied), (ulong) thd->cuted_fields);
 
1472
  else
 
1473
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
 
1474
            (ulong) (info.deleted+info.updated), (ulong) thd->cuted_fields);
 
1475
  thd->row_count_func= info.copied + info.deleted +
 
1476
                       ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
 
1477
                        info.touched : info.updated);
 
1478
 
 
1479
  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
 
1480
    thd->first_successful_insert_id_in_cur_stmt :
 
1481
    (thd->arg_of_last_insert_id_function ?
 
1482
     thd->first_successful_insert_id_in_prev_stmt :
 
1483
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
 
1484
  ::my_ok(thd, (ulong) thd->row_count_func, id, buff);
 
1485
  DBUG_RETURN(0);
 
1486
}
 
1487
 
 
1488
void select_insert::abort() {
 
1489
 
 
1490
  DBUG_ENTER("select_insert::abort");
 
1491
  /*
 
1492
    If the creation of the table failed (due to a syntax error, for
 
1493
    example), no table will have been opened and therefore 'table'
 
1494
    will be NULL. In that case, we still need to execute the rollback
 
1495
    and the end of the function.
 
1496
   */
 
1497
  if (table)
 
1498
  {
 
1499
    bool changed, transactional_table;
 
1500
 
 
1501
    table->file->ha_end_bulk_insert();
 
1502
 
 
1503
    /*
 
1504
      If at least one row has been inserted/modified and will stay in
 
1505
      the table (the table doesn't have transactions) we must write to
 
1506
      the binlog (and the error code will make the slave stop).
 
1507
 
 
1508
      For many errors (example: we got a duplicate key error while
 
1509
      inserting into a MyISAM table), no row will be added to the table,
 
1510
      so passing the error to the slave will not help since there will
 
1511
      be an error code mismatch (the inserts will succeed on the slave
 
1512
      with no error).
 
1513
 
 
1514
      If table creation failed, the number of rows modified will also be
 
1515
      zero, so no check for that is made.
 
1516
    */
 
1517
    changed= (info.copied || info.deleted || info.updated);
 
1518
    transactional_table= table->file->has_transactions();
 
1519
    if (thd->transaction.stmt.modified_non_trans_table)
 
1520
    {
 
1521
        if (mysql_bin_log.is_open())
 
1522
          thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length,
 
1523
                            transactional_table, FALSE);
 
1524
        if (!thd->current_stmt_binlog_row_based && !can_rollback_data())
 
1525
          thd->transaction.all.modified_non_trans_table= TRUE;
 
1526
    }
 
1527
    DBUG_ASSERT(transactional_table || !changed ||
 
1528
                thd->transaction.stmt.modified_non_trans_table);
 
1529
    table->file->ha_release_auto_increment();
 
1530
  }
 
1531
 
 
1532
  DBUG_VOID_RETURN;
 
1533
}
 
1534
 
 
1535
 
 
1536
/***************************************************************************
 
1537
  CREATE TABLE (SELECT) ...
 
1538
***************************************************************************/
 
1539
 
 
1540
/*
 
1541
  Create table from lists of fields and items (or just return TABLE
 
1542
  object for pre-opened existing table).
 
1543
 
 
1544
  SYNOPSIS
 
1545
    create_table_from_items()
 
1546
      thd          in     Thread object
 
1547
      create_info  in     Create information (like MAX_ROWS, ENGINE or
 
1548
                          temporary table flag)
 
1549
      create_table in     Pointer to TABLE_LIST object providing database
 
1550
                          and name for table to be created or to be open
 
1551
      alter_info   in/out Initial list of columns and indexes for the table
 
1552
                          to be created
 
1553
      items        in     List of items which should be used to produce rest
 
1554
                          of fields for the table (corresponding fields will
 
1555
                          be added to the end of alter_info->create_list)
 
1556
      lock         out    Pointer to the MYSQL_LOCK object for table created
 
1557
                          (or open temporary table) will be returned in this
 
1558
                          parameter. Since this table is not included in
 
1559
                          THD::lock caller is responsible for explicitly
 
1560
                          unlocking this table.
 
1561
      hooks
 
1562
 
 
1563
  NOTES
 
1564
    This function behaves differently for base and temporary tables:
 
1565
    - For base table we assume that either table exists and was pre-opened
 
1566
      and locked at open_and_lock_tables() stage (and in this case we just
 
1567
      emit error or warning and return pre-opened TABLE object) or special
 
1568
      placeholder was put in table cache that guarantees that this table
 
1569
      won't be created or opened until the placeholder will be removed
 
1570
      (so there is an exclusive lock on this table).
 
1571
    - We don't pre-open existing temporary table, instead we either open
 
1572
      or create and then open table in this function.
 
1573
 
 
1574
    Since this function contains some logic specific to CREATE TABLE ...
 
1575
    SELECT it should be changed before it can be used in other contexts.
 
1576
 
 
1577
  RETURN VALUES
 
1578
    non-zero  Pointer to TABLE object for table created or opened
 
1579
    0         Error
 
1580
*/
 
1581
 
 
1582
static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
 
1583
                                      TABLE_LIST *create_table,
 
1584
                                      Alter_info *alter_info,
 
1585
                                      List<Item> *items,
 
1586
                                      MYSQL_LOCK **lock,
 
1587
                                      TABLEOP_HOOKS *hooks)
 
1588
{
 
1589
  TABLE tmp_table;              // Used during 'Create_field()'
 
1590
  TABLE_SHARE share;
 
1591
  TABLE *table= 0;
 
1592
  uint select_field_count= items->elements;
 
1593
  /* Add selected items to field list */
 
1594
  List_iterator_fast<Item> it(*items);
 
1595
  Item *item;
 
1596
  Field *tmp_field;
 
1597
  bool not_used;
 
1598
  DBUG_ENTER("create_table_from_items");
 
1599
 
 
1600
  DBUG_EXECUTE_IF("sleep_create_select_before_check_if_exists", my_sleep(6000000););
 
1601
 
 
1602
  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
 
1603
      create_table->table->db_stat)
 
1604
  {
 
1605
    /* Table already exists and was open at open_and_lock_tables() stage. */
 
1606
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
 
1607
    {
 
1608
      create_info->table_existed= 1;            // Mark that table existed
 
1609
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
 
1610
                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
 
1611
                          create_table->table_name);
 
1612
      DBUG_RETURN(create_table->table);
 
1613
    }
 
1614
 
 
1615
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
 
1616
    DBUG_RETURN(0);
 
1617
  }
 
1618
 
 
1619
  tmp_table.alias= 0;
 
1620
  tmp_table.timestamp_field= 0;
 
1621
  tmp_table.s= &share;
 
1622
  init_tmp_table_share(thd, &share, "", 0, "", "");
 
1623
 
 
1624
  tmp_table.s->db_create_options=0;
 
1625
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
 
1626
  tmp_table.s->db_low_byte_first= 
 
1627
        test(create_info->db_type == myisam_hton ||
 
1628
             create_info->db_type == heap_hton);
 
1629
  tmp_table.null_row=tmp_table.maybe_null=0;
 
1630
 
 
1631
  while ((item=it++))
 
1632
  {
 
1633
    Create_field *cr_field;
 
1634
    Field *field, *def_field;
 
1635
    if (item->type() == Item::FUNC_ITEM)
 
1636
      if (item->result_type() != STRING_RESULT)
 
1637
        field= item->tmp_table_field(&tmp_table);
 
1638
      else
 
1639
        field= item->tmp_table_field_from_field_type(&tmp_table, 0);
 
1640
    else
 
1641
      field= create_tmp_field(thd, &tmp_table, item, item->type(),
 
1642
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
 
1643
                              0);
 
1644
    if (!field ||
 
1645
        !(cr_field=new Create_field(field,(item->type() == Item::FIELD_ITEM ?
 
1646
                                           ((Item_field *)item)->field :
 
1647
                                           (Field*) 0))))
 
1648
      DBUG_RETURN(0);
 
1649
    if (item->maybe_null)
 
1650
      cr_field->flags &= ~NOT_NULL_FLAG;
 
1651
    alter_info->create_list.push_back(cr_field);
 
1652
  }
 
1653
 
 
1654
  DBUG_EXECUTE_IF("sleep_create_select_before_create", my_sleep(6000000););
 
1655
 
 
1656
  /*
 
1657
    Create and lock table.
 
1658
 
 
1659
    Note that we either creating (or opening existing) temporary table or
 
1660
    creating base table on which name we have exclusive lock. So code below
 
1661
    should not cause deadlocks or races.
 
1662
 
 
1663
    We don't log the statement, it will be logged later.
 
1664
 
 
1665
    If this is a HEAP table, the automatic DELETE FROM which is written to the
 
1666
    binlog when a HEAP table is opened for the first time since startup, must
 
1667
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
 
1668
    don't want to delete from it) 2) it would be written before the CREATE
 
1669
    TABLE, which is a wrong order. So we keep binary logging disabled when we
 
1670
    open_table().
 
1671
  */
 
1672
  {
 
1673
    tmp_disable_binlog(thd);
 
1674
    if (!mysql_create_table_no_lock(thd, create_table->db,
 
1675
                                    create_table->table_name,
 
1676
                                    create_info, alter_info, 0,
 
1677
                                    select_field_count))
 
1678
    {
 
1679
      if (create_info->table_existed &&
 
1680
          !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
 
1681
      {
 
1682
        /*
 
1683
          This means that someone created table underneath server
 
1684
          or it was created via different mysqld front-end to the
 
1685
          cluster. We don't have much options but throw an error.
 
1686
        */
 
1687
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
 
1688
        DBUG_RETURN(0);
 
1689
      }
 
1690
 
 
1691
      DBUG_EXECUTE_IF("sleep_create_select_before_open", my_sleep(6000000););
 
1692
 
 
1693
      if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
 
1694
      {
 
1695
        VOID(pthread_mutex_lock(&LOCK_open));
 
1696
        if (reopen_name_locked_table(thd, create_table, FALSE))
 
1697
        {
 
1698
          quick_rm_table(create_info->db_type, create_table->db,
 
1699
                         table_case_name(create_info, create_table->table_name),
 
1700
                         0);
 
1701
        }
 
1702
        else
 
1703
          table= create_table->table;
 
1704
        VOID(pthread_mutex_unlock(&LOCK_open));
 
1705
      }
 
1706
      else
 
1707
      {
 
1708
        if (!(table= open_table(thd, create_table, thd->mem_root, (bool*) 0,
 
1709
                                MYSQL_OPEN_TEMPORARY_ONLY)) &&
 
1710
            !create_info->table_existed)
 
1711
        {
 
1712
          /*
 
1713
            This shouldn't happen as creation of temporary table should make
 
1714
            it preparable for open. But let us do close_temporary_table() here
 
1715
            just in case.
 
1716
          */
 
1717
          drop_temporary_table(thd, create_table);
 
1718
        }
 
1719
      }
 
1720
    }
 
1721
    reenable_binlog(thd);
 
1722
    if (!table)                                   // open failed
 
1723
      DBUG_RETURN(0);
 
1724
  }
 
1725
 
 
1726
  DBUG_EXECUTE_IF("sleep_create_select_before_lock", my_sleep(6000000););
 
1727
 
 
1728
  table->reginfo.lock_type=TL_WRITE;
 
1729
  hooks->prelock(&table, 1);                    // Call prelock hooks
 
1730
  if (! ((*lock)= mysql_lock_tables(thd, &table, 1,
 
1731
                                    MYSQL_LOCK_IGNORE_FLUSH, &not_used)) ||
 
1732
        hooks->postlock(&table, 1))
 
1733
  {
 
1734
    if (*lock)
 
1735
    {
 
1736
      mysql_unlock_tables(thd, *lock);
 
1737
      *lock= 0;
 
1738
    }
 
1739
 
 
1740
    if (!create_info->table_existed)
 
1741
      drop_open_table(thd, table, create_table->db, create_table->table_name);
 
1742
    DBUG_RETURN(0);
 
1743
  }
 
1744
  DBUG_RETURN(table);
 
1745
}
 
1746
 
 
1747
 
 
1748
int
 
1749
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
 
1750
{
 
1751
  MYSQL_LOCK *extra_lock= NULL;
 
1752
  DBUG_ENTER("select_create::prepare");
 
1753
 
 
1754
  TABLEOP_HOOKS *hook_ptr= NULL;
 
1755
  /*
 
1756
    For row-based replication, the CREATE-SELECT statement is written
 
1757
    in two pieces: the first one contain the CREATE TABLE statement
 
1758
    necessary to create the table and the second part contain the rows
 
1759
    that should go into the table.
 
1760
 
 
1761
    For non-temporary tables, the start of the CREATE-SELECT
 
1762
    implicitly commits the previous transaction, and all events
 
1763
    forming the statement will be stored the transaction cache. At end
 
1764
    of the statement, the entire statement is committed as a
 
1765
    transaction, and all events are written to the binary log.
 
1766
 
 
1767
    On the master, the table is locked for the duration of the
 
1768
    statement, but since the CREATE part is replicated as a simple
 
1769
    statement, there is no way to lock the table for accesses on the
 
1770
    slave.  Hence, we have to hold on to the CREATE part of the
 
1771
    statement until the statement has finished.
 
1772
   */
 
1773
  class MY_HOOKS : public TABLEOP_HOOKS {
 
1774
  public:
 
1775
    MY_HOOKS(select_create *x, TABLE_LIST *create_table,
 
1776
             TABLE_LIST *select_tables)
 
1777
      : ptr(x), all_tables(*create_table)
 
1778
      {
 
1779
        all_tables.next_global= select_tables;
 
1780
      }
 
1781
 
 
1782
  private:
 
1783
    virtual int do_postlock(TABLE **tables, uint count)
 
1784
    {
 
1785
      THD *thd= const_cast<THD*>(ptr->get_thd());
 
1786
      if (int error= decide_logging_format(thd, &all_tables))
 
1787
        return error;
 
1788
 
 
1789
      TABLE const *const table = *tables;
 
1790
      if (thd->current_stmt_binlog_row_based  &&
 
1791
          !table->s->tmp_table &&
 
1792
          !ptr->get_create_info()->table_existed)
 
1793
      {
 
1794
        ptr->binlog_show_create_table(tables, count);
 
1795
      }
 
1796
      return 0;
 
1797
    }
 
1798
 
 
1799
    select_create *ptr;
 
1800
    TABLE_LIST all_tables;
 
1801
  };
 
1802
 
 
1803
  MY_HOOKS hooks(this, create_table, select_tables);
 
1804
  hook_ptr= &hooks;
 
1805
 
 
1806
  unit= u;
 
1807
 
 
1808
  /*
 
1809
    Start a statement transaction before the create if we are using
 
1810
    row-based replication for the statement.  If we are creating a
 
1811
    temporary table, we need to start a statement transaction.
 
1812
  */
 
1813
  if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 &&
 
1814
      thd->current_stmt_binlog_row_based)
 
1815
  {
 
1816
    thd->binlog_start_trans_and_stmt();
 
1817
  }
 
1818
 
 
1819
  if (!(table= create_table_from_items(thd, create_info, create_table,
 
1820
                                       alter_info, &values,
 
1821
                                       &extra_lock, hook_ptr)))
 
1822
    DBUG_RETURN(-1);                            // abort() deletes table
 
1823
 
 
1824
  if (extra_lock)
 
1825
  {
 
1826
    DBUG_ASSERT(m_plock == NULL);
 
1827
 
 
1828
    if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
 
1829
      m_plock= &m_lock;
 
1830
    else
 
1831
      m_plock= &thd->extra_lock;
 
1832
 
 
1833
    *m_plock= extra_lock;
 
1834
  }
 
1835
 
 
1836
  if (table->s->fields < values.elements)
 
1837
  {
 
1838
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
 
1839
    DBUG_RETURN(-1);
 
1840
  }
 
1841
 
 
1842
 /* First field to copy */
 
1843
  field= table->field+table->s->fields - values.elements;
 
1844
 
 
1845
  /* Mark all fields that are given values */
 
1846
  for (Field **f= field ; *f ; f++)
 
1847
    bitmap_set_bit(table->write_set, (*f)->field_index);
 
1848
 
 
1849
  /* Don't set timestamp if used */
 
1850
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
 
1851
  table->next_number_field=table->found_next_number_field;
 
1852
 
 
1853
  restore_record(table,s->default_values);      // Get empty record
 
1854
  thd->cuted_fields=0;
 
1855
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
 
1856
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1857
  if (info.handle_duplicates == DUP_REPLACE)
 
1858
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
1859
  if (info.handle_duplicates == DUP_UPDATE)
 
1860
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1861
  table->file->ha_start_bulk_insert((ha_rows) 0);
 
1862
  thd->abort_on_warning= !info.ignore;
 
1863
  if (check_that_all_fields_are_given_values(thd, table, table_list))
 
1864
    DBUG_RETURN(1);
 
1865
  table->mark_columns_needed_for_insert();
 
1866
  table->file->extra(HA_EXTRA_WRITE_CACHE);
 
1867
  DBUG_RETURN(0);
 
1868
}
 
1869
 
 
1870
void
 
1871
select_create::binlog_show_create_table(TABLE **tables, uint count)
 
1872
{
 
1873
  /*
 
1874
    Note 1: In RBR mode, we generate a CREATE TABLE statement for the
 
1875
    created table by calling store_create_info() (behaves as SHOW
 
1876
    CREATE TABLE).  In the event of an error, nothing should be
 
1877
    written to the binary log, even if the table is non-transactional;
 
1878
    therefore we pretend that the generated CREATE TABLE statement is
 
1879
    for a transactional table.  The event will then be put in the
 
1880
    transaction cache, and any subsequent events (e.g., table-map
 
1881
    events and binrow events) will also be put there.  We can then use
 
1882
    ha_autocommit_or_rollback() to either throw away the entire
 
1883
    kaboodle of events, or write them to the binary log.
 
1884
 
 
1885
    We write the CREATE TABLE statement here and not in prepare()
 
1886
    since there potentially are sub-selects or accesses to information
 
1887
    schema that will do a close_thread_tables(), destroying the
 
1888
    statement transaction cache.
 
1889
  */
 
1890
  DBUG_ASSERT(thd->current_stmt_binlog_row_based);
 
1891
  DBUG_ASSERT(tables && *tables && count > 0);
 
1892
 
 
1893
  char buf[2048];
 
1894
  String query(buf, sizeof(buf), system_charset_info);
 
1895
  int result;
 
1896
  TABLE_LIST tmp_table_list;
 
1897
 
 
1898
  memset(&tmp_table_list, 0, sizeof(tmp_table_list));
 
1899
  tmp_table_list.table = *tables;
 
1900
  query.length(0);      // Have to zero it since constructor doesn't
 
1901
 
 
1902
  result= store_create_info(thd, &tmp_table_list, &query, create_info);
 
1903
  DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
 
1904
 
 
1905
  thd->binlog_query(THD::STMT_QUERY_TYPE,
 
1906
                    query.ptr(), query.length(),
 
1907
                    /* is_trans */ TRUE,
 
1908
                    /* suppress_use */ FALSE);
 
1909
}
 
1910
 
 
1911
void select_create::store_values(List<Item> &values)
 
1912
{
 
1913
  fill_record(thd, field, values, 1);
 
1914
}
 
1915
 
 
1916
 
 
1917
void select_create::send_error(uint errcode,const char *err)
 
1918
{
 
1919
  DBUG_ENTER("select_create::send_error");
 
1920
 
 
1921
  DBUG_PRINT("info",
 
1922
             ("Current statement %s row-based",
 
1923
              thd->current_stmt_binlog_row_based ? "is" : "is NOT"));
 
1924
  DBUG_PRINT("info",
 
1925
             ("Current table (at 0x%lu) %s a temporary (or non-existant) table",
 
1926
              (ulong) table,
 
1927
              table && !table->s->tmp_table ? "is NOT" : "is"));
 
1928
  DBUG_PRINT("info",
 
1929
             ("Table %s prior to executing this statement",
 
1930
              get_create_info()->table_existed ? "existed" : "did not exist"));
 
1931
 
 
1932
  /*
 
1933
    This will execute any rollbacks that are necessary before writing
 
1934
    the transcation cache.
 
1935
 
 
1936
    We disable the binary log since nothing should be written to the
 
1937
    binary log.  This disabling is important, since we potentially do
 
1938
    a "roll back" of non-transactional tables by removing the table,
 
1939
    and the actual rollback might generate events that should not be
 
1940
    written to the binary log.
 
1941
 
 
1942
  */
 
1943
  tmp_disable_binlog(thd);
 
1944
  select_insert::send_error(errcode, err);
 
1945
  reenable_binlog(thd);
 
1946
 
 
1947
  DBUG_VOID_RETURN;
 
1948
}
 
1949
 
 
1950
 
 
1951
bool select_create::send_eof()
 
1952
{
 
1953
  bool tmp=select_insert::send_eof();
 
1954
  if (tmp)
 
1955
    abort();
 
1956
  else
 
1957
  {
 
1958
    /*
 
1959
      Do an implicit commit at end of statement for non-temporary
 
1960
      tables.  This can fail, but we should unlock the table
 
1961
      nevertheless.
 
1962
    */
 
1963
    if (!table->s->tmp_table)
 
1964
    {
 
1965
      ha_autocommit_or_rollback(thd, 0);
 
1966
      end_active_trans(thd);
 
1967
    }
 
1968
 
 
1969
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1970
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1971
    if (m_plock)
 
1972
    {
 
1973
      mysql_unlock_tables(thd, *m_plock);
 
1974
      *m_plock= NULL;
 
1975
      m_plock= NULL;
 
1976
    }
 
1977
  }
 
1978
  return tmp;
 
1979
}
 
1980
 
 
1981
 
 
1982
void select_create::abort()
 
1983
{
 
1984
  DBUG_ENTER("select_create::abort");
 
1985
 
 
1986
  /*
 
1987
    In select_insert::abort() we roll back the statement, including
 
1988
    truncating the transaction cache of the binary log. To do this, we
 
1989
    pretend that the statement is transactional, even though it might
 
1990
    be the case that it was not.
 
1991
 
 
1992
    We roll back the statement prior to deleting the table and prior
 
1993
    to releasing the lock on the table, since there might be potential
 
1994
    for failure if the rollback is executed after the drop or after
 
1995
    unlocking the table.
 
1996
 
 
1997
    We also roll back the statement regardless of whether the creation
 
1998
    of the table succeeded or not, since we need to reset the binary
 
1999
    log state.
 
2000
  */
 
2001
  tmp_disable_binlog(thd);
 
2002
  select_insert::abort();
 
2003
  thd->transaction.stmt.modified_non_trans_table= FALSE;
 
2004
  reenable_binlog(thd);
 
2005
 
 
2006
 
 
2007
  if (m_plock)
 
2008
  {
 
2009
    mysql_unlock_tables(thd, *m_plock);
 
2010
    *m_plock= NULL;
 
2011
    m_plock= NULL;
 
2012
  }
 
2013
 
 
2014
  if (table)
 
2015
  {
 
2016
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
2017
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
2018
    if (!create_info->table_existed)
 
2019
      drop_open_table(thd, table, create_table->db, create_table->table_name);
 
2020
    table=0;                                    // Safety
 
2021
  }
 
2022
  DBUG_VOID_RETURN;
 
2023
}
 
2024
 
 
2025
 
 
2026
/*****************************************************************************
 
2027
  Instansiate templates
 
2028
*****************************************************************************/
 
2029
 
 
2030
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
2031
template class List_iterator_fast<List_item>;
 
2032
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */