~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_insert.cc

  • Committer: Brian Aker
  • Date: 2008-12-23 07:19:26 UTC
  • Revision ID: brian@tangent.org-20081223071926-69z2ugpftfz1lfnm
Remove dead variables.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* Insert of records */
18
18
 
19
 
#include "config.h"
20
 
#include <cstdio>
 
19
/*
 
20
  INSERT DELAYED
 
21
 
 
22
  Drizzle has a different form of DELAYED than MySQL. DELAYED is just
 
23
  a hint to the storage engine (which can then do whatever it likes).
 
24
*/
 
25
#include <drizzled/server_includes.h>
21
26
#include <drizzled/sql_select.h>
22
27
#include <drizzled/show.h>
 
28
#include <drizzled/replication/mi.h>
23
29
#include <drizzled/error.h>
24
30
#include <drizzled/name_resolution_context_state.h>
 
31
#include <drizzled/slave.h>
 
32
#include <drizzled/sql_parse.h>
25
33
#include <drizzled/probes.h>
 
34
#include <drizzled/tableop_hooks.h>
26
35
#include <drizzled/sql_base.h>
27
36
#include <drizzled/sql_load.h>
28
37
#include <drizzled/field/timestamp.h>
29
38
#include <drizzled/lock.h>
30
 
#include "drizzled/sql_table.h"
31
 
#include "drizzled/pthread_globals.h"
32
 
#include "drizzled/transaction_services.h"
33
 
#include "drizzled/plugin/transactional_storage_engine.h"
34
 
 
35
 
#include "drizzled/table/shell.h"
36
 
 
37
 
namespace drizzled
38
 
{
39
 
 
40
 
extern plugin::StorageEngine *heap_engine;
41
 
extern plugin::StorageEngine *myisam_engine;
 
39
 
42
40
 
43
41
/*
44
42
  Check if insert fields are correct.
64
62
static int check_insert_fields(Session *session, TableList *table_list,
65
63
                               List<Item> &fields, List<Item> &values,
66
64
                               bool check_unique,
67
 
                               table_map *)
 
65
                               table_map *map __attribute__((unused)))
68
66
{
69
67
  Table *table= table_list->table;
70
68
 
71
69
  if (fields.elements == 0 && values.elements != 0)
72
70
  {
73
 
    if (values.elements != table->getShare()->sizeFields())
 
71
    if (values.elements != table->s->fields)
74
72
    {
75
73
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
76
74
      return -1;
81
79
      No fields are provided so all fields must be provided in the values.
82
80
      Thus we set all bits in the write set.
83
81
    */
84
 
    table->setWriteSet();
 
82
    bitmap_set_all(table->write_set);
85
83
  }
86
84
  else
87
85
  {                                             // Part field list
88
 
    Select_Lex *select_lex= &session->lex->select_lex;
 
86
    SELECT_LEX *select_lex= &session->lex->select_lex;
89
87
    Name_resolution_context *context= &select_lex->context;
90
88
    Name_resolution_context_state ctx_state;
91
89
    int res;
122
120
    }
123
121
    if (table->timestamp_field) // Don't automatically set timestamp if used
124
122
    {
125
 
      if (table->timestamp_field->isWriteSet())
126
 
      {
 
123
      if (bitmap_is_set(table->write_set,
 
124
                        table->timestamp_field->field_index))
127
125
        clear_timestamp_auto_bits(table->timestamp_field_type,
128
126
                                  TIMESTAMP_AUTO_SET_ON_INSERT);
129
 
      }
130
127
      else
131
128
      {
132
 
        table->setWriteSet(table->timestamp_field->position());
 
129
        bitmap_set_bit(table->write_set,
 
130
                       table->timestamp_field->field_index);
133
131
      }
134
132
    }
 
133
    /* Mark all virtual columns for write*/
 
134
    if (table->vfield)
 
135
      table->mark_virtual_columns();
135
136
  }
136
137
 
137
138
  return 0;
159
160
 
160
161
static int check_update_fields(Session *session, TableList *insert_table_list,
161
162
                               List<Item> &update_fields,
162
 
                               table_map *)
 
163
                               table_map *map __attribute__((unused)))
163
164
{
164
165
  Table *table= insert_table_list->table;
165
166
  bool timestamp_mark= false;
170
171
      Unmark the timestamp field so that we can check if this is modified
171
172
      by update_fields
172
173
    */
173
 
    timestamp_mark= table->write_set->test(table->timestamp_field->position());
174
 
    table->write_set->reset(table->timestamp_field->position());
 
174
    timestamp_mark= bitmap_test_and_clear(table->write_set,
 
175
                                          table->timestamp_field->field_index);
175
176
  }
176
177
 
177
178
  /* Check the fields we are going to modify */
181
182
  if (table->timestamp_field)
182
183
  {
183
184
    /* Don't set timestamp column if this is modified. */
184
 
    if (table->timestamp_field->isWriteSet())
185
 
    {
 
185
    if (bitmap_is_set(table->write_set,
 
186
                      table->timestamp_field->field_index))
186
187
      clear_timestamp_auto_bits(table->timestamp_field_type,
187
188
                                TIMESTAMP_AUTO_SET_ON_UPDATE);
188
 
    }
189
 
 
190
189
    if (timestamp_mark)
191
 
    {
192
 
      table->setWriteSet(table->timestamp_field->position());
193
 
    }
 
190
      bitmap_set_bit(table->write_set,
 
191
                     table->timestamp_field->field_index);
194
192
  }
195
193
  return 0;
196
194
}
205
203
*/
206
204
 
207
205
static
208
 
void upgrade_lock_type(Session *,
 
206
void upgrade_lock_type(Session *session __attribute__((unused)),
209
207
                       thr_lock_type *lock_type,
210
208
                       enum_duplicates duplic,
211
 
                       bool )
 
209
                       bool is_multi_insert __attribute__((unused)))
212
210
{
213
211
  if (duplic == DUP_UPDATE ||
214
212
      (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
238
236
  int error;
239
237
  bool transactional_table, joins_freed= false;
240
238
  bool changed;
 
239
  bool was_insert_delayed= (table_list->lock_type ==  TL_WRITE_DELAYED);
241
240
  uint32_t value_count;
242
241
  ulong counter = 1;
243
242
  uint64_t id;
244
 
  CopyInfo info;
 
243
  COPY_INFO info;
245
244
  Table *table= 0;
246
245
  List_iterator_fast<List_item> its(values_list);
247
246
  List_item *values;
258
257
  upgrade_lock_type(session, &table_list->lock_type, duplic,
259
258
                    values_list.elements > 1);
260
259
 
261
 
  if (session->openTablesLock(table_list))
 
260
  /*
 
261
    We can't write-delayed into a table locked with LOCK TABLES:
 
262
    this will lead to a deadlock, since the delayed thread will
 
263
    never be able to get a lock on the table. QQQ: why not
 
264
    upgrade the lock here instead?
 
265
  */
 
266
  if (table_list->lock_type == TL_WRITE_DELAYED && session->locked_tables &&
 
267
      find_locked_table(session, table_list->db, table_list->table_name))
262
268
  {
263
 
    DRIZZLE_INSERT_DONE(1, 0);
264
 
    return true;
 
269
    my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
 
270
             table_list->table_name);
 
271
    return(true);
265
272
  }
266
273
 
 
274
  {
 
275
    if (open_and_lock_tables(session, table_list))
 
276
      return(true);
 
277
  }
267
278
  lock_type= table_list->lock_type;
268
279
 
269
280
  session->set_proc_info("init");
276
287
                           false,
277
288
                           (fields.elements || !value_count ||
278
289
                            (0) != 0), !ignore))
279
 
  {
280
 
    if (table != NULL)
281
 
      table->cursor->ha_release_auto_increment();
282
 
    if (!joins_freed)
283
 
      free_underlaid_joins(session, &session->lex->select_lex);
284
 
    session->abort_on_warning= 0;
285
 
    DRIZZLE_INSERT_DONE(1, 0);
286
 
    return true;
287
 
  }
 
290
    goto abort;
288
291
 
289
292
  /* mysql_prepare_insert set table_list->table if it was not set */
290
293
  table= table_list->table;
315
318
    if (values->elements != value_count)
316
319
    {
317
320
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
318
 
 
319
 
      if (table != NULL)
320
 
        table->cursor->ha_release_auto_increment();
321
 
      if (!joins_freed)
322
 
        free_underlaid_joins(session, &session->lex->select_lex);
323
 
      session->abort_on_warning= 0;
324
 
      DRIZZLE_INSERT_DONE(1, 0);
325
 
 
326
 
      return true;
 
321
      goto abort;
327
322
    }
328
323
    if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))
329
 
    {
330
 
      if (table != NULL)
331
 
        table->cursor->ha_release_auto_increment();
332
 
      if (!joins_freed)
333
 
        free_underlaid_joins(session, &session->lex->select_lex);
334
 
      session->abort_on_warning= 0;
335
 
      DRIZZLE_INSERT_DONE(1, 0);
336
 
      return true;
337
 
    }
 
324
      goto abort;
338
325
  }
339
326
  its.rewind ();
340
327
 
342
329
  ctx_state.restore_state(context, table_list);
343
330
 
344
331
  /*
345
 
    Fill in the given fields and dump it to the table cursor
 
332
    Fill in the given fields and dump it to the table file
346
333
  */
 
334
  memset(&info, 0, sizeof(info));
347
335
  info.ignore= ignore;
348
336
  info.handle_duplicates=duplic;
349
337
  info.update_fields= &update_fields;
354
342
    For single line insert, generate an error if try to set a NOT NULL field
355
343
    to NULL.
356
344
  */
357
 
  session->count_cuted_fields= ignore ? CHECK_FIELD_WARN : CHECK_FIELD_ERROR_FOR_NULL;
358
 
 
 
345
  session->count_cuted_fields= ((values_list.elements == 1 &&
 
346
                             !ignore) ?
 
347
                            CHECK_FIELD_ERROR_FOR_NULL :
 
348
                            CHECK_FIELD_WARN);
359
349
  session->cuted_fields = 0L;
360
350
  table->next_number_field=table->found_next_number_field;
361
351
 
 
352
  if (session->slave_thread &&
 
353
      (info.handle_duplicates == DUP_UPDATE) &&
 
354
      (table->next_number_field != NULL) &&
 
355
      rpl_master_has_bug(&active_mi->rli, 24432))
 
356
    goto abort;
 
357
 
362
358
  error=0;
363
359
  session->set_proc_info("update");
364
360
  if (duplic == DUP_REPLACE)
365
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
361
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
366
362
  if (duplic == DUP_UPDATE)
367
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
363
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
368
364
  {
369
365
    if (duplic != DUP_ERROR || ignore)
370
 
      table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
371
 
    table->cursor->ha_start_bulk_insert(values_list.elements);
 
366
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
367
    table->file->ha_start_bulk_insert(values_list.elements);
372
368
  }
373
369
 
374
370
 
380
376
  {
381
377
    if (fields.elements || !value_count)
382
378
    {
383
 
      table->restoreRecordAsDefault();  // Get empty record
384
 
      if (fill_record(session, fields, *values))
 
379
      restore_record(table,s->default_values);  // Get empty record
 
380
      if (fill_record(session, fields, *values, 0))
385
381
      {
386
382
        if (values_list.elements != 1 && ! session->is_error())
387
383
        {
399
395
    }
400
396
    else
401
397
    {
402
 
      table->restoreRecordAsDefault();  // Get empty record
403
 
 
404
 
      if (fill_record(session, table->getFields(), *values))
 
398
      if (session->used_tables)                 // Column used in values()
 
399
        restore_record(table,s->default_values);        // Get empty record
 
400
      else
 
401
      {
 
402
        /*
 
403
          Fix delete marker. No need to restore rest of record since it will
 
404
          be overwritten by fill_record() anyway (and fill_record() does not
 
405
          use default values in this case).
 
406
        */
 
407
        table->record[0][0]= table->s->default_values[0];
 
408
      }
 
409
      if (fill_record(session, table->field, *values, 0))
405
410
      {
406
411
        if (values_list.elements != 1 && ! session->is_error())
407
412
        {
413
418
      }
414
419
    }
415
420
 
416
 
    // Release latches in case bulk insert takes a long time
417
 
    plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
418
 
 
419
421
    error=write_record(session, table ,&info);
420
422
    if (error)
421
423
      break;
434
436
      Do not do this release if this is a delayed insert, it would steal
435
437
      auto_inc values from the delayed_insert thread as they share Table.
436
438
    */
437
 
    table->cursor->ha_release_auto_increment();
438
 
    if (table->cursor->ha_end_bulk_insert() && !error)
 
439
    table->file->ha_release_auto_increment();
 
440
    if (table->file->ha_end_bulk_insert() && !error)
439
441
    {
440
 
      table->print_error(errno,MYF(0));
 
442
      table->file->print_error(my_errno,MYF(0));
441
443
      error=1;
442
444
    }
443
445
    if (duplic != DUP_ERROR || ignore)
444
 
      table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
445
 
 
446
 
    transactional_table= table->cursor->has_transactions();
447
 
 
448
 
    changed= (info.copied || info.deleted || info.updated);
449
 
    if ((changed && error <= 0) || session->transaction.stmt.hasModifiedNonTransData())
450
 
    {
451
 
      if (session->transaction.stmt.hasModifiedNonTransData())
452
 
        session->transaction.all.markModifiedNonTransData();
453
 
    }
454
 
    assert(transactional_table || !changed || session->transaction.stmt.hasModifiedNonTransData());
 
446
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
447
 
 
448
    transactional_table= table->file->has_transactions();
 
449
 
 
450
    if ((changed= (info.copied || info.deleted || info.updated)))
 
451
    {
 
452
      /*
 
453
        Invalidate the table in the query cache if something changed.
 
454
        For the transactional algorithm to work the invalidation must be
 
455
        before binlog writing and ha_autocommit_or_rollback
 
456
      */
 
457
    }
 
458
    if ((changed && error <= 0) || session->transaction.stmt.modified_non_trans_table || was_insert_delayed)
 
459
    {
 
460
      if (session->transaction.stmt.modified_non_trans_table)
 
461
        session->transaction.all.modified_non_trans_table= true;
 
462
    }
 
463
    assert(transactional_table || !changed ||
 
464
                session->transaction.stmt.modified_non_trans_table);
455
465
 
456
466
  }
457
467
  session->set_proc_info("end");
475
485
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
476
486
  table->auto_increment_field_not_null= false;
477
487
  if (duplic == DUP_REPLACE)
478
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
488
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
479
489
 
480
490
  if (error)
481
 
  {
482
 
    if (table != NULL)
483
 
      table->cursor->ha_release_auto_increment();
484
 
    if (!joins_freed)
485
 
      free_underlaid_joins(session, &session->lex->select_lex);
486
 
    session->abort_on_warning= 0;
487
 
    DRIZZLE_INSERT_DONE(1, 0);
488
 
    return true;
489
 
  }
490
 
 
 
491
    goto abort;
491
492
  if (values_list.elements == 1 && (!(session->options & OPTION_WARNINGS) ||
492
493
                                    !session->cuted_fields))
493
494
  {
494
 
    session->row_count_func= info.copied + info.deleted + info.updated;
495
 
    session->my_ok((ulong) session->row_count_func,
496
 
                   info.copied + info.deleted + info.touched, id);
 
495
    session->row_count_func= info.copied + info.deleted +
 
496
                         ((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
497
                          info.touched : info.updated);
 
498
    my_ok(session, (ulong) session->row_count_func, id);
497
499
  }
498
500
  else
499
501
  {
500
502
    char buff[160];
 
503
    ha_rows updated=((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
504
                     info.touched : info.updated);
501
505
    if (ignore)
502
 
      snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
506
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
503
507
              (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
504
508
    else
505
 
      snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
506
 
              (ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
507
 
    session->row_count_func= info.copied + info.deleted + info.updated;
508
 
    session->my_ok((ulong) session->row_count_func,
509
 
                   info.copied + info.deleted + info.touched, id, buff);
 
509
      sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
 
510
              (ulong) (info.deleted + updated), (ulong) session->cuted_fields);
 
511
    session->row_count_func= info.copied + info.deleted + updated;
 
512
    ::my_ok(session, (ulong) session->row_count_func, id, buff);
510
513
  }
511
 
  session->status_var.inserted_row_count+= session->row_count_func;
512
514
  session->abort_on_warning= 0;
513
 
  DRIZZLE_INSERT_DONE(0, session->row_count_func);
 
515
  DRIZZLE_INSERT_END();
 
516
  return(false);
514
517
 
515
 
  return false;
 
518
abort:
 
519
  if (table != NULL)
 
520
    table->file->ha_release_auto_increment();
 
521
  if (!joins_freed)
 
522
    free_underlaid_joins(session, &session->lex->select_lex);
 
523
  session->abort_on_warning= 0;
 
524
  DRIZZLE_INSERT_END();
 
525
  return(true);
516
526
}
517
527
 
518
528
 
533
543
*/
534
544
 
535
545
static bool mysql_prepare_insert_check_table(Session *session, TableList *table_list,
536
 
                                             List<Item> &,
 
546
                                             List<Item> &fields __attribute__((unused)),
537
547
                                             bool select_insert)
538
548
{
539
549
 
591
601
                          Table *table, List<Item> &fields, List_item *values,
592
602
                          List<Item> &update_fields, List<Item> &update_values,
593
603
                          enum_duplicates duplic,
594
 
                          COND **,
 
604
                          COND **where __attribute__((unused)),
595
605
                          bool select_insert,
596
606
                          bool check_fields, bool abort_on_warning)
597
607
{
598
 
  Select_Lex *select_lex= &session->lex->select_lex;
 
608
  SELECT_LEX *select_lex= &session->lex->select_lex;
599
609
  Name_resolution_context *context= &select_lex->context;
600
610
  Name_resolution_context_state ctx_state;
601
611
  bool insert_into_view= (0 != 0);
608
618
  /*
609
619
    For subqueries in VALUES() we should not see the table in which we are
610
620
    inserting (for INSERT ... SELECT this is done by changing table_list,
611
 
    because INSERT ... SELECT share Select_Lex it with SELECT.
 
621
    because INSERT ... SELECT share SELECT_LEX it with SELECT.
612
622
  */
613
623
  if (!select_insert)
614
624
  {
615
 
    for (Select_Lex_Unit *un= select_lex->first_inner_unit();
 
625
    for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
616
626
         un;
617
627
         un= un->next_unit())
618
628
    {
619
 
      for (Select_Lex *sl= un->first_select();
 
629
      for (SELECT_LEX *sl= un->first_select();
620
630
           sl;
621
631
           sl= sl->next_select())
622
632
      {
688
698
  if (!select_insert)
689
699
  {
690
700
    TableList *duplicate;
691
 
    if ((duplicate= unique_table(table_list, table_list->next_global, true)))
 
701
    if ((duplicate= unique_table(session, table_list, table_list->next_global, 1)))
692
702
    {
693
 
      my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->alias);
694
 
 
695
 
      return true;
 
703
      update_non_unique_table_error(table_list, "INSERT", duplicate);
 
704
      return(true);
696
705
    }
697
706
  }
698
707
  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
699
708
    table->prepare_for_position();
700
 
 
701
 
  return false;
 
709
  return(false);
702
710
}
703
711
 
704
712
 
706
714
 
707
715
static int last_uniq_key(Table *table,uint32_t keynr)
708
716
{
709
 
  while (++keynr < table->getShare()->sizeKeys())
 
717
  while (++keynr < table->s->keys)
710
718
    if (table->key_info[keynr].flags & HA_NOSAME)
711
719
      return 0;
712
720
  return 1;
721
729
     write_record()
722
730
      session   - thread context
723
731
      table - table to which record should be written
724
 
      info  - CopyInfo structure describing handling of duplicates
 
732
      info  - COPY_INFO structure describing handling of duplicates
725
733
              and which is used for counting number of records inserted
726
734
              and deleted.
727
735
 
731
739
    then both on update triggers will work instead. Similarly both on
732
740
    delete triggers will be invoked if we will delete conflicting records.
733
741
 
734
 
    Sets session->transaction.stmt.modified_non_trans_data to true if table which is updated didn't have
 
742
    Sets session->transaction.stmt.modified_non_trans_table to true if table which is updated didn't have
735
743
    transactions.
736
744
 
737
745
  RETURN VALUE
740
748
*/
741
749
 
742
750
 
743
 
int write_record(Session *session, Table *table,CopyInfo *info)
 
751
int write_record(Session *session, Table *table,COPY_INFO *info)
744
752
{
745
753
  int error;
746
 
  std::vector<unsigned char> key;
747
 
  boost::dynamic_bitset<> *save_read_set, *save_write_set;
748
 
  uint64_t prev_insert_id= table->cursor->next_insert_id;
 
754
  char *key=0;
 
755
  MY_BITMAP *save_read_set, *save_write_set;
 
756
  uint64_t prev_insert_id= table->file->next_insert_id;
749
757
  uint64_t insert_id_for_cur_row= 0;
750
758
 
751
759
 
753
761
  save_read_set=  table->read_set;
754
762
  save_write_set= table->write_set;
755
763
 
756
 
  if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE)
 
764
  if (info->handle_duplicates == DUP_REPLACE ||
 
765
      info->handle_duplicates == DUP_UPDATE)
757
766
  {
758
 
    while ((error=table->cursor->insertRecord(table->getInsertRecord())))
 
767
    while ((error=table->file->ha_write_row(table->record[0])))
759
768
    {
760
769
      uint32_t key_nr;
761
770
      /*
765
774
        the autogenerated value to avoid session->insert_id_for_cur_row to become
766
775
        0.
767
776
      */
768
 
      if (table->cursor->insert_id_for_cur_row > 0)
769
 
        insert_id_for_cur_row= table->cursor->insert_id_for_cur_row;
 
777
      if (table->file->insert_id_for_cur_row > 0)
 
778
        insert_id_for_cur_row= table->file->insert_id_for_cur_row;
770
779
      else
771
 
        table->cursor->insert_id_for_cur_row= insert_id_for_cur_row;
 
780
        table->file->insert_id_for_cur_row= insert_id_for_cur_row;
772
781
      bool is_duplicate_key_error;
773
 
      if (table->cursor->is_fatal_error(error, HA_CHECK_DUP))
 
782
      if (table->file->is_fatal_error(error, HA_CHECK_DUP))
774
783
        goto err;
775
 
      is_duplicate_key_error= table->cursor->is_fatal_error(error, 0);
 
784
      is_duplicate_key_error= table->file->is_fatal_error(error, 0);
776
785
      if (!is_duplicate_key_error)
777
786
      {
778
787
        /*
784
793
          goto gok_or_after_err; /* Ignoring a not fatal error, return 0 */
785
794
        goto err;
786
795
      }
787
 
      if ((int) (key_nr = table->get_dup_key(error)) < 0)
 
796
      if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
788
797
      {
789
798
        error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
790
799
        goto err;
798
807
      */
799
808
      if (info->handle_duplicates == DUP_REPLACE &&
800
809
          table->next_number_field &&
801
 
          key_nr == table->getShare()->next_number_index &&
 
810
          key_nr == table->s->next_number_index &&
802
811
          (insert_id_for_cur_row > 0))
803
812
        goto err;
804
 
      if (table->cursor->getEngine()->check_flag(HTON_BIT_DUPLICATE_POS))
 
813
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
805
814
      {
806
 
        if (table->cursor->rnd_pos(table->getUpdateRecord(),table->cursor->dup_ref))
 
815
        if (table->file->rnd_pos(table->record[1],table->file->dup_ref))
807
816
          goto err;
808
817
      }
809
818
      else
810
819
      {
811
 
        if (table->cursor->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
 
820
        if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
812
821
        {
813
 
          error=errno;
 
822
          error=my_errno;
814
823
          goto err;
815
824
        }
816
825
 
817
 
        if (not key.size())
 
826
        if (!key)
818
827
        {
819
 
          key.resize(table->getShare()->max_unique_length);
 
828
          if (!(key=(char*) malloc(table->s->max_unique_length)))
 
829
          {
 
830
            error=ENOMEM;
 
831
            goto err;
 
832
          }
820
833
        }
821
 
        key_copy(&key[0], table->getInsertRecord(), table->key_info+key_nr, 0);
822
 
        if ((error=(table->cursor->index_read_idx_map(table->getUpdateRecord(),key_nr,
823
 
                                                    &key[0], HA_WHOLE_KEY,
 
834
        key_copy((unsigned char*) key,table->record[0],table->key_info+key_nr,0);
 
835
        if ((error=(table->file->index_read_idx_map(table->record[1],key_nr,
 
836
                                                    (unsigned char*) key, HA_WHOLE_KEY,
824
837
                                                    HA_READ_KEY_EXACT))))
825
838
          goto err;
826
839
      }
831
844
          that matches, is updated. If update causes a conflict again,
832
845
          an error is returned
833
846
        */
834
 
        assert(table->insert_values.size());
835
 
        table->storeRecordAsInsert();
836
 
        table->restoreRecord();
 
847
        assert(table->insert_values != NULL);
 
848
        store_record(table,insert_values);
 
849
        restore_record(table,record[1]);
837
850
        assert(info->update_fields->elements ==
838
851
                    info->update_values->elements);
839
852
        if (fill_record(session, *info->update_fields,
841
854
                                                 info->ignore))
842
855
          goto before_err;
843
856
 
844
 
        table->cursor->restore_auto_increment(prev_insert_id);
 
857
        table->file->restore_auto_increment(prev_insert_id);
845
858
        if (table->next_number_field)
846
 
          table->cursor->adjust_next_insert_id_after_explicit_value(
 
859
          table->file->adjust_next_insert_id_after_explicit_value(
847
860
            table->next_number_field->val_int());
848
861
        info->touched++;
849
 
 
850
 
        if (! table->records_are_comparable() || table->compare_records())
 
862
        if ((table->file->ha_table_flags() & HA_PARTIAL_COLUMN_READ &&
 
863
             !bitmap_is_subset(table->write_set, table->read_set)) ||
 
864
            table->compare_record())
851
865
        {
852
 
          if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
853
 
                                                table->getInsertRecord())) &&
 
866
          if ((error=table->file->ha_update_row(table->record[1],
 
867
                                                table->record[0])) &&
854
868
              error != HA_ERR_RECORD_IS_THE_SAME)
855
869
          {
856
870
            if (info->ignore &&
857
 
                !table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
 
871
                !table->file->is_fatal_error(error, HA_CHECK_DUP_KEY))
858
872
            {
859
873
              goto gok_or_after_err;
860
874
            }
872
886
            Except if LAST_INSERT_ID(#) was in the INSERT query, which is
873
887
            handled separately by Session::arg_of_last_insert_id_function.
874
888
          */
875
 
          insert_id_for_cur_row= table->cursor->insert_id_for_cur_row= 0;
 
889
          insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
876
890
          info->copied++;
877
891
        }
878
892
 
879
893
        if (table->next_number_field)
880
 
          table->cursor->adjust_next_insert_id_after_explicit_value(
 
894
          table->file->adjust_next_insert_id_after_explicit_value(
881
895
            table->next_number_field->val_int());
882
896
        info->touched++;
883
897
 
900
914
          ON UPDATE triggers.
901
915
        */
902
916
        if (last_uniq_key(table,key_nr) &&
903
 
            !table->cursor->referenced_by_foreign_key() &&
 
917
            !table->file->referenced_by_foreign_key() &&
904
918
            (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
905
919
             table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
906
920
        {
907
 
          if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
908
 
                                                table->getInsertRecord())) &&
 
921
          if ((error=table->file->ha_update_row(table->record[1],
 
922
                                                table->record[0])) &&
909
923
              error != HA_ERR_RECORD_IS_THE_SAME)
910
924
            goto err;
911
925
          if (error != HA_ERR_RECORD_IS_THE_SAME)
912
926
            info->deleted++;
913
927
          else
914
928
            error= 0;
915
 
          session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
929
          session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
916
930
          /*
917
931
            Since we pretend that we have done insert we should call
918
932
            its after triggers.
921
935
        }
922
936
        else
923
937
        {
924
 
          if ((error=table->cursor->deleteRecord(table->getUpdateRecord())))
 
938
          if ((error=table->file->ha_delete_row(table->record[1])))
925
939
            goto err;
926
940
          info->deleted++;
927
 
          if (!table->cursor->has_transactions())
928
 
            session->transaction.stmt.markModifiedNonTransData();
 
941
          if (!table->file->has_transactions())
 
942
            session->transaction.stmt.modified_non_trans_table= true;
929
943
          /* Let us attempt to do write_row() once more */
930
944
        }
931
945
      }
932
946
    }
933
 
    session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
947
    session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
934
948
    /*
935
949
      Restore column maps if they were replaced during a duplicate key
936
950
      problem.
937
951
    */
938
952
    if (table->read_set != save_read_set ||
939
953
        table->write_set != save_write_set)
940
 
      table->column_bitmaps_set(*save_read_set, *save_write_set);
 
954
      table->column_bitmaps_set(save_read_set, save_write_set);
941
955
  }
942
 
  else if ((error=table->cursor->insertRecord(table->getInsertRecord())))
 
956
  else if ((error=table->file->ha_write_row(table->record[0])))
943
957
  {
944
958
    if (!info->ignore ||
945
 
        table->cursor->is_fatal_error(error, HA_CHECK_DUP))
 
959
        table->file->is_fatal_error(error, HA_CHECK_DUP))
946
960
      goto err;
947
 
    table->cursor->restore_auto_increment(prev_insert_id);
 
961
    table->file->restore_auto_increment(prev_insert_id);
948
962
    goto gok_or_after_err;
949
963
  }
950
964
 
951
965
after_n_copied_inc:
952
966
  info->copied++;
953
 
  session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
 
967
  session->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
954
968
 
955
969
gok_or_after_err:
956
 
  if (!table->cursor->has_transactions())
957
 
    session->transaction.stmt.markModifiedNonTransData();
 
970
  if (key)
 
971
    free(key);
 
972
  if (!table->file->has_transactions())
 
973
    session->transaction.stmt.modified_non_trans_table= true;
958
974
  return(0);
959
975
 
960
976
err:
962
978
  /* current_select is NULL if this is a delayed insert */
963
979
  if (session->lex->current_select)
964
980
    session->lex->current_select->no_error= 0;        // Give error
965
 
  table->print_error(error,MYF(0));
 
981
  table->file->print_error(error,MYF(0));
966
982
 
967
983
before_err:
968
 
  table->cursor->restore_auto_increment(prev_insert_id);
969
 
  table->column_bitmaps_set(*save_read_set, *save_write_set);
970
 
  return 1;
 
984
  table->file->restore_auto_increment(prev_insert_id);
 
985
  if (key)
 
986
    free(key);
 
987
  table->column_bitmaps_set(save_read_set, save_write_set);
 
988
  return(1);
971
989
}
972
990
 
973
991
 
979
997
                                           TableList *)
980
998
{
981
999
  int err= 0;
 
1000
  MY_BITMAP *write_set= entry->write_set;
982
1001
 
983
 
  for (Field **field=entry->getFields() ; *field ; field++)
 
1002
  for (Field **field=entry->field ; *field ; field++)
984
1003
  {
985
 
    if (((*field)->isWriteSet()) == false)
 
1004
    if (!bitmap_is_set(write_set, (*field)->field_index))
986
1005
    {
987
1006
      /*
988
1007
       * If the field doesn't have any default value
1037
1056
bool mysql_insert_select_prepare(Session *session)
1038
1057
{
1039
1058
  LEX *lex= session->lex;
1040
 
  Select_Lex *select_lex= &lex->select_lex;
 
1059
  SELECT_LEX *select_lex= &lex->select_lex;
1041
1060
 
1042
1061
  /*
1043
 
    Select_Lex do not belong to INSERT statement, so we can't add WHERE
 
1062
    SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
1044
1063
    clause if table is VIEW
1045
1064
  */
1046
1065
 
1068
1087
                             List<Item> *update_fields,
1069
1088
                             List<Item> *update_values,
1070
1089
                             enum_duplicates duplic,
1071
 
                             bool ignore_check_option_errors) :
1072
 
  table_list(table_list_par), table(table_par), fields(fields_par),
1073
 
  autoinc_value_of_last_inserted_row(0),
1074
 
  insert_into_view(table_list_par && 0 != 0)
 
1090
                             bool ignore_check_option_errors)
 
1091
  :table_list(table_list_par), table(table_par), fields(fields_par),
 
1092
   autoinc_value_of_last_inserted_row(0),
 
1093
   insert_into_view(table_list_par && 0 != 0)
1075
1094
{
 
1095
  memset(&info, 0, sizeof(info));
1076
1096
  info.handle_duplicates= duplic;
1077
1097
  info.ignore= ignore_check_option_errors;
1078
1098
  info.update_fields= update_fields;
1081
1101
 
1082
1102
 
1083
1103
int
1084
 
select_insert::prepare(List<Item> &values, Select_Lex_Unit *u)
 
1104
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
1085
1105
{
1086
1106
  LEX *lex= session->lex;
1087
1107
  int res;
1088
1108
  table_map map= 0;
1089
 
  Select_Lex *lex_current_select_save= lex->current_select;
 
1109
  SELECT_LEX *lex_current_select_save= lex->current_select;
1090
1110
 
1091
1111
 
1092
1112
  unit= u;
1177
1197
    Is table which we are changing used somewhere in other parts of
1178
1198
    query
1179
1199
  */
1180
 
  if (unique_table(table_list, table_list->next_global))
 
1200
  if (unique_table(session, table_list, table_list->next_global, 0))
1181
1201
  {
1182
1202
    /* Using same table for INSERT and SELECT */
1183
1203
    lex->current_select->options|= OPTION_BUFFER_RESULT;
1194
1214
      We won't start bulk inserts at all if this statement uses functions or
1195
1215
      should invoke triggers since they may access the same table too.
1196
1216
    */
1197
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1217
    table->file->ha_start_bulk_insert((ha_rows) 0);
1198
1218
  }
1199
 
  table->restoreRecordAsDefault();              // Get empty record
 
1219
  restore_record(table,s->default_values);              // Get empty record
1200
1220
  table->next_number_field=table->found_next_number_field;
1201
1221
 
 
1222
  if (session->slave_thread &&
 
1223
      (info.handle_duplicates == DUP_UPDATE) &&
 
1224
      (table->next_number_field != NULL) &&
 
1225
      rpl_master_has_bug(&active_mi->rli, 24432))
 
1226
    return(1);
 
1227
 
1202
1228
  session->cuted_fields=0;
1203
1229
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1204
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1230
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1205
1231
  if (info.handle_duplicates == DUP_REPLACE)
1206
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
1232
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1207
1233
  if (info.handle_duplicates == DUP_UPDATE)
1208
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1234
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1209
1235
  session->abort_on_warning= !info.ignore;
1210
1236
  table->mark_columns_needed_for_insert();
1211
1237
 
1234
1260
{
1235
1261
 
1236
1262
  if (session->lex->current_select->options & OPTION_BUFFER_RESULT)
1237
 
    table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1263
    table->file->ha_start_bulk_insert((ha_rows) 0);
1238
1264
  return(0);
1239
1265
}
1240
1266
 
1252
1278
  {
1253
1279
    table->next_number_field=0;
1254
1280
    table->auto_increment_field_not_null= false;
1255
 
    table->cursor->ha_reset();
 
1281
    table->file->ha_reset();
1256
1282
  }
1257
1283
  session->count_cuted_fields= CHECK_FIELD_IGNORE;
1258
1284
  session->abort_on_warning= 0;
1277
1303
  if (session->is_error())
1278
1304
    return(1);
1279
1305
 
1280
 
  // Release latches in case bulk insert takes a long time
1281
 
  plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
1282
 
 
1283
1306
  error= write_record(session, table, &info);
1284
 
  table->auto_increment_field_not_null= false;
1285
1307
 
1286
1308
  if (!error)
1287
1309
  {
1295
1317
        originally touched by INSERT ... SELECT, so we have to restore
1296
1318
        their original values for the next row.
1297
1319
      */
1298
 
      table->restoreRecordAsDefault();
 
1320
      restore_record(table, s->default_values);
1299
1321
    }
1300
1322
    if (table->next_number_field)
1301
1323
    {
1320
1342
void select_insert::store_values(List<Item> &values)
1321
1343
{
1322
1344
  if (fields->elements)
1323
 
    fill_record(session, *fields, values, true);
 
1345
    fill_record(session, *fields, values, 1);
1324
1346
  else
1325
 
    fill_record(session, table->getFields(), values, true);
 
1347
    fill_record(session, table->field, values, 1);
1326
1348
}
1327
1349
 
1328
1350
void select_insert::send_error(uint32_t errcode,const char *err)
1338
1360
bool select_insert::send_eof()
1339
1361
{
1340
1362
  int error;
1341
 
  bool const trans_table= table->cursor->has_transactions();
 
1363
  bool const trans_table= table->file->has_transactions();
1342
1364
  uint64_t id;
1343
1365
  bool changed;
1344
1366
 
1345
 
  error= table->cursor->ha_end_bulk_insert();
1346
 
  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1347
 
  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1367
  error= table->file->ha_end_bulk_insert();
 
1368
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1369
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1348
1370
 
1349
1371
  if ((changed= (info.copied || info.deleted || info.updated)))
1350
1372
  {
1351
1373
    /*
1352
1374
      We must invalidate the table in the query cache before binlog writing
1353
 
      and autocommitOrRollback.
 
1375
      and ha_autocommit_or_rollback.
1354
1376
    */
1355
 
    if (session->transaction.stmt.hasModifiedNonTransData())
1356
 
      session->transaction.all.markModifiedNonTransData();
 
1377
    if (session->transaction.stmt.modified_non_trans_table)
 
1378
      session->transaction.all.modified_non_trans_table= true;
1357
1379
  }
1358
1380
  assert(trans_table || !changed ||
1359
 
              session->transaction.stmt.hasModifiedNonTransData());
 
1381
              session->transaction.stmt.modified_non_trans_table);
1360
1382
 
1361
 
  table->cursor->ha_release_auto_increment();
 
1383
  table->file->ha_release_auto_increment();
1362
1384
 
1363
1385
  if (error)
1364
1386
  {
1365
 
    table->print_error(error,MYF(0));
1366
 
    DRIZZLE_INSERT_SELECT_DONE(error, 0);
1367
 
    return 1;
 
1387
    table->file->print_error(error,MYF(0));
 
1388
    return(1);
1368
1389
  }
1369
1390
  char buff[160];
1370
1391
  if (info.ignore)
1371
 
    snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
1392
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1372
1393
            (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
1373
1394
  else
1374
 
    snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
 
1395
    sprintf(buff, ER(ER_INSERT_INFO), (ulong) info.records,
1375
1396
            (ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
1376
 
  session->row_count_func= info.copied + info.deleted + info.updated;
 
1397
  session->row_count_func= info.copied + info.deleted +
 
1398
                       ((session->client_capabilities & CLIENT_FOUND_ROWS) ?
 
1399
                        info.touched : info.updated);
1377
1400
 
1378
1401
  id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
1379
1402
    session->first_successful_insert_id_in_cur_stmt :
1380
1403
    (session->arg_of_last_insert_id_function ?
1381
1404
     session->first_successful_insert_id_in_prev_stmt :
1382
1405
     (info.copied ? autoinc_value_of_last_inserted_row : 0));
1383
 
  session->my_ok((ulong) session->row_count_func,
1384
 
                 info.copied + info.deleted + info.touched, id, buff);
1385
 
  session->status_var.inserted_row_count+= session->row_count_func; 
1386
 
  DRIZZLE_INSERT_SELECT_DONE(0, session->row_count_func);
1387
 
  return 0;
 
1406
  ::my_ok(session, (ulong) session->row_count_func, id, buff);
 
1407
  return(0);
1388
1408
}
1389
1409
 
1390
1410
void select_insert::abort() {
1400
1420
  {
1401
1421
    bool changed, transactional_table;
1402
1422
 
1403
 
    table->cursor->ha_end_bulk_insert();
 
1423
    table->file->ha_end_bulk_insert();
1404
1424
 
1405
1425
    /*
1406
1426
      If at least one row has been inserted/modified and will stay in
1417
1437
      zero, so no check for that is made.
1418
1438
    */
1419
1439
    changed= (info.copied || info.deleted || info.updated);
1420
 
    transactional_table= table->cursor->has_transactions();
 
1440
    transactional_table= table->file->has_transactions();
1421
1441
    assert(transactional_table || !changed ||
1422
 
                session->transaction.stmt.hasModifiedNonTransData());
1423
 
    table->cursor->ha_release_auto_increment();
1424
 
  }
1425
 
 
1426
 
  if (DRIZZLE_INSERT_SELECT_DONE_ENABLED())
1427
 
  {
1428
 
    DRIZZLE_INSERT_SELECT_DONE(0, info.copied + info.deleted + info.updated);
 
1442
                session->transaction.stmt.modified_non_trans_table);
 
1443
    table->file->ha_release_auto_increment();
1429
1444
  }
1430
1445
 
1431
1446
  return;
1452
1467
      items        in     List of items which should be used to produce rest
1453
1468
                          of fields for the table (corresponding fields will
1454
1469
                          be added to the end of alter_info->create_list)
1455
 
      lock         out    Pointer to the DrizzleLock object for table created
 
1470
      lock         out    Pointer to the DRIZZLE_LOCK object for table created
1456
1471
                          (or open temporary table) will be returned in this
1457
1472
                          parameter. Since this table is not included in
1458
1473
                          Session::lock caller is responsible for explicitly
1462
1477
  NOTES
1463
1478
    This function behaves differently for base and temporary tables:
1464
1479
    - For base table we assume that either table exists and was pre-opened
1465
 
      and locked at openTablesLock() stage (and in this case we just
 
1480
      and locked at open_and_lock_tables() stage (and in this case we just
1466
1481
      emit error or warning and return pre-opened Table object) or special
1467
1482
      placeholder was put in table cache that guarantees that this table
1468
1483
      won't be created or opened until the placeholder will be removed
1480
1495
 
1481
1496
static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
1482
1497
                                      TableList *create_table,
1483
 
                                      message::Table &table_proto,
1484
 
                                      AlterInfo *alter_info,
 
1498
                                      Alter_info *alter_info,
1485
1499
                                      List<Item> *items,
1486
 
                                      bool is_if_not_exists,
1487
 
                                      DrizzleLock **lock,
1488
 
                                      TableIdentifier &identifier)
 
1500
                                      DRIZZLE_LOCK **lock,
 
1501
                                      Tableop_hooks *hooks)
1489
1502
{
1490
 
  TableShare share(message::Table::INTERNAL);
 
1503
  Table tmp_table;              // Used during 'Create_field()'
 
1504
  TABLE_SHARE share;
 
1505
  Table *table= 0;
1491
1506
  uint32_t select_field_count= items->elements;
1492
1507
  /* Add selected items to field list */
1493
1508
  List_iterator_fast<Item> it(*items);
1495
1510
  Field *tmp_field;
1496
1511
  bool not_used;
1497
1512
 
1498
 
  if (not (identifier.isTmp()) && create_table->table->db_stat)
 
1513
  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
 
1514
      create_table->table->db_stat)
1499
1515
  {
1500
 
    /* Table already exists and was open at openTablesLock() stage. */
1501
 
    if (is_if_not_exists)
 
1516
    /* Table already exists and was open at open_and_lock_tables() stage. */
 
1517
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
1502
1518
    {
1503
1519
      create_info->table_existed= 1;            // Mark that table existed
1504
1520
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1505
1521
                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
1506
 
                          create_table->getTableName());
1507
 
      return create_table->table;
 
1522
                          create_table->table_name);
 
1523
      return(create_table->table);
1508
1524
    }
1509
1525
 
1510
 
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1511
 
    return NULL;
 
1526
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
 
1527
    return(0);
1512
1528
  }
1513
1529
 
 
1530
  tmp_table.alias= 0;
 
1531
  tmp_table.timestamp_field= 0;
 
1532
  tmp_table.s= &share;
 
1533
  init_tmp_table_share(session, &share, "", 0, "", "");
 
1534
 
 
1535
  tmp_table.s->db_create_options=0;
 
1536
  tmp_table.s->blob_ptr_size= portable_sizeof_char_ptr;
 
1537
  tmp_table.s->db_low_byte_first=
 
1538
        test(create_info->db_type == myisam_hton ||
 
1539
             create_info->db_type == heap_hton);
 
1540
  tmp_table.null_row= false;
 
1541
  tmp_table.maybe_null= false;
 
1542
 
 
1543
  while ((item=it++))
1514
1544
  {
1515
 
    table::Shell tmp_table(share);              // Used during 'CreateField()'
1516
 
    tmp_table.timestamp_field= 0;
1517
 
 
1518
 
    tmp_table.getMutableShare()->db_create_options= 0;
1519
 
    tmp_table.getMutableShare()->blob_ptr_size= portable_sizeof_char_ptr;
1520
 
 
1521
 
    if (not table_proto.engine().name().compare("MyISAM"))
1522
 
      tmp_table.getMutableShare()->db_low_byte_first= true;
1523
 
    else if (not table_proto.engine().name().compare("MEMORY"))
1524
 
      tmp_table.getMutableShare()->db_low_byte_first= true;
1525
 
 
1526
 
    tmp_table.null_row= false;
1527
 
    tmp_table.maybe_null= false;
1528
 
 
1529
 
    tmp_table.in_use= session;
1530
 
 
1531
 
    while ((item=it++))
1532
 
    {
1533
 
      CreateField *cr_field;
1534
 
      Field *field, *def_field;
1535
 
      if (item->type() == Item::FUNC_ITEM)
1536
 
      {
1537
 
        if (item->result_type() != STRING_RESULT)
1538
 
        {
1539
 
          field= item->tmp_table_field(&tmp_table);
1540
 
        }
1541
 
        else
1542
 
        {
1543
 
          field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1544
 
        }
1545
 
      }
 
1545
    Create_field *cr_field;
 
1546
    Field *field, *def_field;
 
1547
    if (item->type() == Item::FUNC_ITEM)
 
1548
      if (item->result_type() != STRING_RESULT)
 
1549
        field= item->tmp_table_field(&tmp_table);
1546
1550
      else
1547
 
      {
1548
 
        field= create_tmp_field(session, &tmp_table, item, item->type(),
1549
 
                                (Item ***) 0, &tmp_field, &def_field, false,
1550
 
                                false, false, 0);
1551
 
      }
1552
 
 
1553
 
      if (!field ||
1554
 
          !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
1555
 
                                            ((Item_field *)item)->field :
1556
 
                                            (Field*) 0))))
1557
 
      {
1558
 
        return NULL;
1559
 
      }
1560
 
 
1561
 
      if (item->maybe_null)
1562
 
      {
1563
 
        cr_field->flags &= ~NOT_NULL_FLAG;
1564
 
      }
1565
 
 
1566
 
      alter_info->create_list.push_back(cr_field);
1567
 
    }
 
1551
        field= item->tmp_table_field_from_field_type(&tmp_table, 0);
 
1552
    else
 
1553
      field= create_tmp_field(session, &tmp_table, item, item->type(),
 
1554
                              (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0,
 
1555
                              0);
 
1556
    if (!field ||
 
1557
        !(cr_field=new Create_field(field,(item->type() == Item::FIELD_ITEM ?
 
1558
                                           ((Item_field *)item)->field :
 
1559
                                           (Field*) 0))))
 
1560
      return(0);
 
1561
    if (item->maybe_null)
 
1562
      cr_field->flags &= ~NOT_NULL_FLAG;
 
1563
    alter_info->create_list.push_back(cr_field);
1568
1564
  }
1569
1565
 
1570
1566
  /*
1573
1569
    Note that we either creating (or opening existing) temporary table or
1574
1570
    creating base table on which name we have exclusive lock. So code below
1575
1571
    should not cause deadlocks or races.
 
1572
 
 
1573
    We don't log the statement, it will be logged later.
 
1574
 
 
1575
    If this is a HEAP table, the automatic DELETE FROM which is written to the
 
1576
    binlog when a HEAP table is opened for the first time since startup, must
 
1577
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
 
1578
    don't want to delete from it) 2) it would be written before the CREATE
 
1579
    Table, which is a wrong order. So we keep binary logging disabled when we
 
1580
    open_table().
1576
1581
  */
1577
 
  Table *table= 0;
1578
1582
  {
1579
 
    if (not mysql_create_table_no_lock(session,
1580
 
                                       identifier,
1581
 
                                       create_info,
1582
 
                                       table_proto,
1583
 
                                       alter_info,
1584
 
                                       false,
1585
 
                                       select_field_count,
1586
 
                                       is_if_not_exists))
 
1583
    tmp_disable_binlog(session);
 
1584
    if (!mysql_create_table_no_lock(session, create_table->db,
 
1585
                                    create_table->table_name,
 
1586
                                    create_info, alter_info, 0,
 
1587
                                    select_field_count, true))
1587
1588
    {
1588
 
      if (create_info->table_existed && not identifier.isTmp())
 
1589
      if (create_info->table_existed &&
 
1590
          !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1589
1591
      {
1590
1592
        /*
1591
1593
          This means that someone created table underneath server
1592
1594
          or it was created via different mysqld front-end to the
1593
1595
          cluster. We don't have many options but to throw an error.
1594
1596
        */
1595
 
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1596
 
        return NULL;
 
1597
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->table_name);
 
1598
        return(0);
1597
1599
      }
1598
1600
 
1599
 
      if (not identifier.isTmp())
 
1601
      if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
1600
1602
      {
1601
 
        /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1602
 
        boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
1603
 
 
1604
 
        if (create_table->table)
 
1603
        pthread_mutex_lock(&LOCK_open);
 
1604
        if (reopen_name_locked_table(session, create_table, false))
1605
1605
        {
1606
 
          table::Concurrent *concurrent_table= static_cast<table::Concurrent *>(create_table->table);
1607
 
 
1608
 
          if (concurrent_table->reopen_name_locked_table(create_table, session))
1609
 
          {
1610
 
            plugin::StorageEngine::dropTable(*session, identifier);
1611
 
          }
1612
 
          else
1613
 
          {
1614
 
            table= create_table->table;
1615
 
          }
 
1606
          quick_rm_table(create_info->db_type, create_table->db,
 
1607
                         table_case_name(create_info, create_table->table_name),
 
1608
                         0);
1616
1609
        }
1617
1610
        else
1618
 
        {
1619
 
          plugin::StorageEngine::dropTable(*session, identifier);
1620
 
        }
 
1611
          table= create_table->table;
 
1612
        pthread_mutex_unlock(&LOCK_open);
1621
1613
      }
1622
1614
      else
1623
1615
      {
1624
 
        if (not (table= session->openTable(create_table, (bool*) 0,
1625
 
                                           DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1626
 
            not create_info->table_existed)
 
1616
        if (!(table= open_table(session, create_table, (bool*) 0,
 
1617
                                DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
 
1618
            !create_info->table_existed)
1627
1619
        {
1628
1620
          /*
1629
1621
            This shouldn't happen as creation of temporary table should make
1630
1622
            it preparable for open. But let us do close_temporary_table() here
1631
1623
            just in case.
1632
1624
          */
1633
 
          session->drop_temporary_table(identifier);
 
1625
          drop_temporary_table(session, create_table);
1634
1626
        }
1635
1627
      }
1636
1628
    }
1637
 
    if (not table)                                   // open failed
1638
 
      return NULL;
 
1629
    reenable_binlog(session);
 
1630
    if (!table)                                   // open failed
 
1631
      return(0);
1639
1632
  }
1640
1633
 
1641
1634
  table->reginfo.lock_type=TL_WRITE;
1642
 
  if (! ((*lock)= session->lockTables(&table, 1, DRIZZLE_LOCK_IGNORE_FLUSH, &not_used)))
 
1635
  hooks->prelock(&table, 1);                    // Call prelock hooks
 
1636
  if (! ((*lock)= mysql_lock_tables(session, &table, 1,
 
1637
                                    DRIZZLE_LOCK_IGNORE_FLUSH, &not_used)) ||
 
1638
        hooks->postlock(&table, 1))
1643
1639
  {
1644
1640
    if (*lock)
1645
1641
    {
1646
 
      session->unlockTables(*lock);
 
1642
      mysql_unlock_tables(session, *lock);
1647
1643
      *lock= 0;
1648
1644
    }
1649
1645
 
1650
 
    if (not create_info->table_existed)
1651
 
      session->drop_open_table(table, identifier);
1652
 
    return NULL;
 
1646
    if (!create_info->table_existed)
 
1647
      drop_open_table(session, table, create_table->db, create_table->table_name);
 
1648
    return(0);
1653
1649
  }
1654
 
 
1655
 
  return table;
 
1650
  return(table);
1656
1651
}
1657
1652
 
1658
1653
 
1659
1654
int
1660
 
select_create::prepare(List<Item> &values, Select_Lex_Unit *u)
 
1655
select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
1661
1656
{
1662
 
  DrizzleLock *extra_lock= NULL;
 
1657
  DRIZZLE_LOCK *extra_lock= NULL;
 
1658
 
 
1659
 
 
1660
  Tableop_hooks *hook_ptr= NULL;
1663
1661
  /*
1664
 
    For replication, the CREATE-SELECT statement is written
1665
 
    in two pieces: the first transaction message contains
1666
 
    the CREATE TABLE statement as a CreateTableStatement message
1667
 
    necessary to create the table.
1668
 
    
1669
 
    The second transaction message contains all the InsertStatement
1670
 
    and associated InsertRecords that should go into the table.
 
1662
    For row-based replication, the CREATE-SELECT statement is written
 
1663
    in two pieces: the first one contains the CREATE TABLE statement
 
1664
    necessary to create the table and the second part contains the rows
 
1665
    that should go into the table.
 
1666
 
 
1667
    For non-temporary tables, the start of the CREATE-SELECT
 
1668
    implicitly commits the previous transaction, and all events
 
1669
    forming the statement will be stored in the transaction cache. At end
 
1670
    of the statement, the entire statement is committed as a
 
1671
    transaction, and all events are written to the binary log.
 
1672
 
 
1673
    On the master, the table is locked for the duration of the
 
1674
    statement, but since the CREATE part is replicated as a simple
 
1675
    statement, there is no way to lock the table for accesses on the
 
1676
    slave.  Hence, we have to hold on to the CREATE part of the
 
1677
    statement until the statement has finished.
1671
1678
   */
 
1679
  class MY_HOOKS : public Tableop_hooks {
 
1680
  public:
 
1681
    MY_HOOKS(select_create *x, TableList *create_table,
 
1682
             TableList *select_tables)
 
1683
      : ptr(x), all_tables(*create_table)
 
1684
      {
 
1685
        all_tables.next_global= select_tables;
 
1686
      }
 
1687
 
 
1688
  private:
 
1689
    virtual int do_postlock(Table **tables, uint32_t count)
 
1690
    {
 
1691
      Table const *const table = *tables;
 
1692
      if (drizzle_bin_log.is_open()
 
1693
          && !table->s->tmp_table
 
1694
          && !ptr->get_create_info()->table_existed)
 
1695
      {
 
1696
        ptr->binlog_show_create_table(tables, count);
 
1697
      }
 
1698
      return 0;
 
1699
    }
 
1700
 
 
1701
    select_create *ptr;
 
1702
    TableList all_tables;
 
1703
  };
 
1704
 
 
1705
  MY_HOOKS hooks(this, create_table, select_tables);
 
1706
  hook_ptr= &hooks;
1672
1707
 
1673
1708
  unit= u;
1674
1709
 
1675
 
  if (not (table= create_table_from_items(session, create_info, create_table,
1676
 
                                          table_proto,
1677
 
                                          alter_info, &values,
1678
 
                                          is_if_not_exists,
1679
 
                                          &extra_lock, identifier)))
 
1710
  /*
 
1711
    Start a statement transaction before the create if we are using
 
1712
    row-based replication for the statement.  If we are creating a
 
1713
    temporary table, we need to start a statement transaction.
 
1714
  */
 
1715
  if ((session->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0
 
1716
      && drizzle_bin_log.is_open())
1680
1717
  {
 
1718
  }
 
1719
 
 
1720
  if (!(table= create_table_from_items(session, create_info, create_table,
 
1721
                                       alter_info, &values,
 
1722
                                       &extra_lock, hook_ptr)))
1681
1723
    return(-1);                         // abort() deletes table
1682
 
  }
1683
1724
 
1684
1725
  if (extra_lock)
1685
1726
  {
1686
1727
    assert(m_plock == NULL);
1687
1728
 
1688
 
    if (identifier.isTmp())
 
1729
    if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
1689
1730
      m_plock= &m_lock;
1690
1731
    else
1691
1732
      m_plock= &session->extra_lock;
1693
1734
    *m_plock= extra_lock;
1694
1735
  }
1695
1736
 
1696
 
  if (table->getShare()->sizeFields() < values.elements)
 
1737
  if (table->s->fields < values.elements)
1697
1738
  {
1698
1739
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
1699
1740
    return(-1);
1700
1741
  }
1701
1742
 
1702
1743
 /* First field to copy */
1703
 
  field= table->getFields() + table->getShare()->sizeFields() - values.elements;
 
1744
  field= table->field+table->s->fields - values.elements;
1704
1745
 
1705
1746
  /* Mark all fields that are given values */
1706
1747
  for (Field **f= field ; *f ; f++)
1707
 
  {
1708
 
    table->setWriteSet((*f)->position());
1709
 
  }
 
1748
    bitmap_set_bit(table->write_set, (*f)->field_index);
1710
1749
 
1711
1750
  /* Don't set timestamp if used */
1712
1751
  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
1713
1752
  table->next_number_field=table->found_next_number_field;
1714
1753
 
1715
 
  table->restoreRecordAsDefault();      // Get empty record
 
1754
  restore_record(table,s->default_values);      // Get empty record
1716
1755
  session->cuted_fields=0;
1717
1756
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1718
 
    table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1757
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
1719
1758
  if (info.handle_duplicates == DUP_REPLACE)
1720
 
    table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
 
1759
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1721
1760
  if (info.handle_duplicates == DUP_UPDATE)
1722
 
    table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1723
 
  table->cursor->ha_start_bulk_insert((ha_rows) 0);
 
1761
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
 
1762
  table->file->ha_start_bulk_insert((ha_rows) 0);
1724
1763
  session->abort_on_warning= !info.ignore;
1725
1764
  if (check_that_all_fields_are_given_values(session, table, table_list))
1726
1765
    return(1);
1727
1766
  table->mark_columns_needed_for_insert();
1728
 
  table->cursor->extra(HA_EXTRA_WRITE_CACHE);
 
1767
  table->file->extra(HA_EXTRA_WRITE_CACHE);
1729
1768
  return(0);
1730
1769
}
1731
1770
 
 
1771
void
 
1772
select_create::binlog_show_create_table(Table **tables, uint32_t count)
 
1773
{
 
1774
  /*
 
1775
    Note 1: In RBR mode, we generate a CREATE TABLE statement for the
 
1776
    created table by calling store_create_info() (behaves as SHOW
 
1777
    CREATE TABLE).  In the event of an error, nothing should be
 
1778
    written to the binary log, even if the table is non-transactional;
 
1779
    therefore we pretend that the generated CREATE TABLE statement is
 
1780
    for a transactional table.  The event will then be put in the
 
1781
    transaction cache, and any subsequent events (e.g., table-map
 
1782
    events and binrow events) will also be put there.  We can then use
 
1783
    ha_autocommit_or_rollback() to either throw away the entire
 
1784
    kaboodle of events, or write them to the binary log.
 
1785
 
 
1786
    We write the CREATE TABLE statement here and not in prepare()
 
1787
    since there potentially are sub-selects or accesses to information
 
1788
    schema that will do a close_thread_tables(), destroying the
 
1789
    statement transaction cache.
 
1790
  */
 
1791
  assert(tables && *tables && count > 0);
 
1792
 
 
1793
  char buf[2048];
 
1794
  String query(buf, sizeof(buf), system_charset_info);
 
1795
  int result;
 
1796
  TableList tmp_table_list;
 
1797
 
 
1798
  memset(&tmp_table_list, 0, sizeof(tmp_table_list));
 
1799
  tmp_table_list.table = *tables;
 
1800
  query.length(0);      // Have to zero it since constructor doesn't
 
1801
 
 
1802
  result= store_create_info(session, &tmp_table_list, &query, create_info);
 
1803
  assert(result == 0); /* store_create_info() always returns 0 */
 
1804
}
 
1805
 
1732
1806
void select_create::store_values(List<Item> &values)
1733
1807
{
1734
 
  fill_record(session, field, values, true);
 
1808
  fill_record(session, field, values, 1);
1735
1809
}
1736
1810
 
1737
1811
 
1738
1812
void select_create::send_error(uint32_t errcode,const char *err)
1739
1813
{
 
1814
 
 
1815
 
1740
1816
  /*
1741
1817
    This will execute any rollbacks that are necessary before writing
1742
1818
    the transaction cache.
1748
1824
    written to the binary log.
1749
1825
 
1750
1826
  */
 
1827
  tmp_disable_binlog(session);
1751
1828
  select_insert::send_error(errcode, err);
 
1829
  reenable_binlog(session);
1752
1830
 
1753
1831
  return;
1754
1832
}
1766
1844
      tables.  This can fail, but we should unlock the table
1767
1845
      nevertheless.
1768
1846
    */
1769
 
    if (!table->getShare()->getType())
 
1847
    if (!table->s->tmp_table)
1770
1848
    {
1771
 
      TransactionServices &transaction_services= TransactionServices::singleton();
1772
 
      transaction_services.autocommitOrRollback(session, 0);
1773
 
      (void) session->endActiveTransaction();
 
1849
      ha_autocommit_or_rollback(session, 0);
 
1850
      end_active_trans(session);
1774
1851
    }
1775
1852
 
1776
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1777
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1853
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1854
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1778
1855
    if (m_plock)
1779
1856
    {
1780
 
      session->unlockTables(*m_plock);
 
1857
      mysql_unlock_tables(session, *m_plock);
1781
1858
      *m_plock= NULL;
1782
1859
      m_plock= NULL;
1783
1860
    }
1788
1865
 
1789
1866
void select_create::abort()
1790
1867
{
 
1868
 
 
1869
 
1791
1870
  /*
1792
1871
    In select_insert::abort() we roll back the statement, including
1793
1872
    truncating the transaction cache of the binary log. To do this, we
1803
1882
    of the table succeeded or not, since we need to reset the binary
1804
1883
    log state.
1805
1884
  */
 
1885
  tmp_disable_binlog(session);
1806
1886
  select_insert::abort();
 
1887
  session->transaction.stmt.modified_non_trans_table= false;
 
1888
  reenable_binlog(session);
 
1889
 
1807
1890
 
1808
1891
  if (m_plock)
1809
1892
  {
1810
 
    session->unlockTables(*m_plock);
 
1893
    mysql_unlock_tables(session, *m_plock);
1811
1894
    *m_plock= NULL;
1812
1895
    m_plock= NULL;
1813
1896
  }
1814
1897
 
1815
1898
  if (table)
1816
1899
  {
1817
 
    table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1818
 
    table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1819
 
    if (not create_info->table_existed)
1820
 
      session->drop_open_table(table, identifier);
1821
 
    table= NULL;                                    // Safety
 
1900
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1901
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
 
1902
    if (!create_info->table_existed)
 
1903
      drop_open_table(session, table, create_table->db, create_table->table_name);
 
1904
    table=0;                                    // Safety
1822
1905
  }
 
1906
  return;
1823
1907
}
1824
1908
 
1825
 
} /* namespace drizzled */
 
1909
 
 
1910
/*****************************************************************************
 
1911
  Instantiate templates
 
1912
*****************************************************************************/
 
1913
 
 
1914
#ifdef HAVE_EXPLICIT_TEMPLATE_INSTANTIATION
 
1915
template class List_iterator_fast<List_item>;
 
1916
#endif /* HAVE_EXPLICIT_TEMPLATE_INSTANTIATION */