~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/statement/alter_table.cc

Merge Monty.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2009 Sun Microsystems
 
5
 *
 
6
 *  This program is free software; you can redistribute it and/or modify
 
7
 *  it under the terms of the GNU General Public License as published by
 
8
 *  the Free Software Foundation; either version 2 of the License, or
 
9
 *  (at your option) any later version.
 
10
 *
 
11
 *  This program is distributed in the hope that it will be useful,
 
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 *  GNU General Public License for more details.
 
15
 *
 
16
 *  You should have received a copy of the GNU General Public License
 
17
 *  along with this program; if not, write to the Free Software
 
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
19
 */
 
20
 
 
21
#include "config.h"
 
22
 
 
23
#include <fcntl.h>
 
24
 
 
25
#include <sstream>
 
26
 
 
27
#include "drizzled/show.h"
 
28
#include "drizzled/lock.h"
 
29
#include "drizzled/session.h"
 
30
#include "drizzled/statement/alter_table.h"
 
31
#include "drizzled/global_charset_info.h"
 
32
 
 
33
 
 
34
#include "drizzled/gettext.h"
 
35
#include "drizzled/data_home.h"
 
36
#include "drizzled/sql_table.h"
 
37
#include "drizzled/table_proto.h"
 
38
#include "drizzled/plugin/info_schema_table.h"
 
39
#include "drizzled/optimizer/range.h"
 
40
#include "drizzled/time_functions.h"
 
41
#include "drizzled/records.h"
 
42
#include "drizzled/pthread_globals.h"
 
43
#include "drizzled/internal/my_sys.h"
 
44
#include "drizzled/internal/iocache.h"
 
45
 
 
46
extern pid_t current_pid;
 
47
 
 
48
using namespace std;
 
49
 
 
50
namespace drizzled
 
51
{
 
52
 
 
53
static int copy_data_between_tables(Table *from,Table *to,
 
54
                                    List<CreateField> &create,
 
55
                                    bool ignore,
 
56
                                    uint32_t order_num,
 
57
                                    order_st *order,
 
58
                                    ha_rows *copied,
 
59
                                    ha_rows *deleted,
 
60
                                    enum enum_enable_or_disable keys_onoff,
 
61
                                    bool error_if_not_empty);
 
62
 
 
63
static bool mysql_prepare_alter_table(Session *session,
 
64
                                      Table *table,
 
65
                                      HA_CREATE_INFO *create_info,
 
66
                                      message::Table *table_proto,
 
67
                                      AlterInfo *alter_info);
 
68
 
 
69
static int create_temporary_table(Session *session,
 
70
                                  Table *table,
 
71
                                  TableIdentifier &identifier,
 
72
                                  HA_CREATE_INFO *create_info,
 
73
                                  message::Table *create_proto,
 
74
                                  AlterInfo *alter_info);
 
75
 
 
76
static Table *open_alter_table(Session *session, Table *table, char *db, char *table_name);
 
77
 
 
78
bool statement::AlterTable::execute()
 
79
{
 
80
  TableList *first_table= (TableList *) session->lex->select_lex.table_list.first;
 
81
  TableList *all_tables= session->lex->query_tables;
 
82
  assert(first_table == all_tables && first_table != 0);
 
83
  Select_Lex *select_lex= &session->lex->select_lex;
 
84
  bool need_start_waiting= false;
 
85
 
 
86
  if (is_engine_set)
 
87
  {
 
88
    create_info.db_type= 
 
89
      plugin::StorageEngine::findByName(*session, create_table_proto.engine().name());
 
90
 
 
91
    if (create_info.db_type == NULL)
 
92
    {
 
93
      my_error(ER_UNKNOWN_STORAGE_ENGINE, MYF(0), 
 
94
               create_table_proto.name().c_str());
 
95
 
 
96
      return true;
 
97
    }
 
98
  }
 
99
 
 
100
  /* Must be set in the parser */
 
101
  assert(select_lex->db);
 
102
 
 
103
  /* ALTER TABLE ends previous transaction */
 
104
  if (! session->endActiveTransaction())
 
105
  {
 
106
    return true;
 
107
  }
 
108
 
 
109
  if (! (need_start_waiting= ! wait_if_global_read_lock(session, 0, 1)))
 
110
  {
 
111
    return true;
 
112
  }
 
113
 
 
114
  bool res= alter_table(session, 
 
115
                        select_lex->db, 
 
116
                        session->lex->name.str,
 
117
                        &create_info,
 
118
                        &create_table_proto,
 
119
                        first_table,
 
120
                        &alter_info,
 
121
                        select_lex->order_list.elements,
 
122
                        (order_st *) select_lex->order_list.first,
 
123
                        session->lex->ignore);
 
124
  /*
 
125
     Release the protection against the global read lock and wake
 
126
     everyone, who might want to set a global read lock.
 
127
   */
 
128
  start_waiting_global_read_lock(session);
 
129
  return res;
 
130
}
 
131
 
 
132
 
 
133
/**
 
134
  Prepare column and key definitions for CREATE TABLE in ALTER Table.
 
135
 
 
136
  This function transforms parse output of ALTER Table - lists of
 
137
  columns and keys to add, drop or modify into, essentially,
 
138
  CREATE TABLE definition - a list of columns and keys of the new
 
139
  table. While doing so, it also performs some (bug not all)
 
140
  semantic checks.
 
141
 
 
142
  This function is invoked when we know that we're going to
 
143
  perform ALTER Table via a temporary table -- i.e. fast ALTER Table
 
144
  is not possible, perhaps because the ALTER statement contains
 
145
  instructions that require change in table data, not only in
 
146
  table definition or indexes.
 
147
 
 
148
  @param[in,out]  session         thread handle. Used as a memory pool
 
149
                              and source of environment information.
 
150
  @param[in]      table       the source table, open and locked
 
151
                              Used as an interface to the storage engine
 
152
                              to acquire additional information about
 
153
                              the original table.
 
154
  @param[in,out]  create_info A blob with CREATE/ALTER Table
 
155
                              parameters
 
156
  @param[in,out]  alter_info  Another blob with ALTER/CREATE parameters.
 
157
                              Originally create_info was used only in
 
158
                              CREATE TABLE and alter_info only in ALTER Table.
 
159
                              But since ALTER might end-up doing CREATE,
 
160
                              this distinction is gone and we just carry
 
161
                              around two structures.
 
162
 
 
163
  @return
 
164
    Fills various create_info members based on information retrieved
 
165
    from the storage engine.
 
166
    Sets create_info->varchar if the table has a VARCHAR column.
 
167
    Prepares alter_info->create_list and alter_info->key_list with
 
168
    columns and keys of the new table.
 
169
  @retval true   error, out of memory or a semantical error in ALTER
 
170
                 Table instructions
 
171
  @retval false  success
 
172
*/
 
173
static bool mysql_prepare_alter_table(Session *session,
 
174
                                      Table *table,
 
175
                                      HA_CREATE_INFO *create_info,
 
176
                                      message::Table *table_proto,
 
177
                                      AlterInfo *alter_info)
 
178
{
 
179
  /* New column definitions are added here */
 
180
  List<CreateField> new_create_list;
 
181
  /* New key definitions are added here */
 
182
  List<Key> new_key_list;
 
183
  List_iterator<AlterDrop> drop_it(alter_info->drop_list);
 
184
  List_iterator<CreateField> def_it(alter_info->create_list);
 
185
  List_iterator<AlterColumn> alter_it(alter_info->alter_list);
 
186
  List_iterator<Key> key_it(alter_info->key_list);
 
187
  List_iterator<CreateField> find_it(new_create_list);
 
188
  List_iterator<CreateField> field_it(new_create_list);
 
189
  List<Key_part_spec> key_parts;
 
190
  uint32_t used_fields= create_info->used_fields;
 
191
  KEY *key_info= table->key_info;
 
192
  bool rc= true;
 
193
 
 
194
  /* Let new create options override the old ones */
 
195
  message::Table::TableOptions *table_options;
 
196
  table_options= table_proto->mutable_options();
 
197
 
 
198
  if (! (used_fields & HA_CREATE_USED_BLOCK_SIZE))
 
199
    table_options->set_block_size(table->s->block_size);
 
200
  if (! (used_fields & HA_CREATE_USED_DEFAULT_CHARSET))
 
201
    create_info->default_table_charset= table->s->table_charset;
 
202
  if (! (used_fields & HA_CREATE_USED_AUTO) &&
 
203
      table->found_next_number_field)
 
204
  {
 
205
    /* Table has an autoincrement, copy value to new table */
 
206
    table->cursor->info(HA_STATUS_AUTO);
 
207
    create_info->auto_increment_value= table->cursor->stats.auto_increment_value;
 
208
  }
 
209
  if (! (used_fields & HA_CREATE_USED_KEY_BLOCK_SIZE)
 
210
      && table->s->hasKeyBlockSize())
 
211
    table_options->set_key_block_size(table->s->getKeyBlockSize());
 
212
 
 
213
  if ((used_fields & HA_CREATE_USED_KEY_BLOCK_SIZE)
 
214
      && table_options->key_block_size() == 0)
 
215
    table_options->clear_key_block_size();
 
216
 
 
217
  table->restoreRecordAsDefault(); /* Empty record for DEFAULT */
 
218
  CreateField *def;
 
219
 
 
220
  /* First collect all fields from table which isn't in drop_list */
 
221
  Field **f_ptr;
 
222
  Field *field;
 
223
  for (f_ptr= table->field; (field= *f_ptr); f_ptr++)
 
224
  {
 
225
    /* Check if field should be dropped */
 
226
    AlterDrop *drop;
 
227
    drop_it.rewind();
 
228
    while ((drop= drop_it++))
 
229
    {
 
230
      if (drop->type == AlterDrop::COLUMN &&
 
231
          ! my_strcasecmp(system_charset_info, field->field_name, drop->name))
 
232
      {
 
233
        /* Reset auto_increment value if it was dropped */
 
234
        if (MTYP_TYPENR(field->unireg_check) == Field::NEXT_NUMBER &&
 
235
            ! (used_fields & HA_CREATE_USED_AUTO))
 
236
        {
 
237
          create_info->auto_increment_value= 0;
 
238
          create_info->used_fields|= HA_CREATE_USED_AUTO;
 
239
        }
 
240
        break;
 
241
      }
 
242
    }
 
243
    if (drop)
 
244
    {
 
245
      drop_it.remove();
 
246
      continue;
 
247
    }
 
248
    
 
249
    /* Mark that we will read the field */
 
250
    field->setReadSet();
 
251
 
 
252
    /* Check if field is changed */
 
253
    def_it.rewind();
 
254
    while ((def= def_it++))
 
255
    {
 
256
      if (def->change &&
 
257
          ! my_strcasecmp(system_charset_info, field->field_name, def->change))
 
258
              break;
 
259
    }
 
260
    if (def)
 
261
    {
 
262
      /* Field is changed */
 
263
      def->field= field;
 
264
      if (! def->after)
 
265
      {
 
266
        new_create_list.push_back(def);
 
267
        def_it.remove();
 
268
      }
 
269
    }
 
270
    else
 
271
    {
 
272
      /*
 
273
        This field was not dropped and not changed, add it to the list
 
274
        for the new table.
 
275
      */
 
276
      def= new CreateField(field, field);
 
277
      new_create_list.push_back(def);
 
278
      alter_it.rewind(); /* Change default if ALTER */
 
279
      AlterColumn *alter;
 
280
      while ((alter= alter_it++))
 
281
      {
 
282
        if (! my_strcasecmp(system_charset_info,field->field_name, alter->name))
 
283
          break;
 
284
      }
 
285
      if (alter)
 
286
      {
 
287
        if (def->sql_type == DRIZZLE_TYPE_BLOB)
 
288
        {
 
289
          my_error(ER_BLOB_CANT_HAVE_DEFAULT, MYF(0), def->change);
 
290
                goto err;
 
291
        }
 
292
        if ((def->def= alter->def))
 
293
        {
 
294
          /* Use new default */
 
295
          def->flags&= ~NO_DEFAULT_VALUE_FLAG;
 
296
        }
 
297
        else
 
298
          def->flags|= NO_DEFAULT_VALUE_FLAG;
 
299
        alter_it.remove();
 
300
      }
 
301
    }
 
302
  }
 
303
  def_it.rewind();
 
304
  while ((def= def_it++)) /* Add new columns */
 
305
  {
 
306
    if (def->change && ! def->field)
 
307
    {
 
308
      my_error(ER_BAD_FIELD_ERROR, MYF(0), def->change, table->s->table_name.str);
 
309
      goto err;
 
310
    }
 
311
    /*
 
312
      Check that the DATE/DATETIME not null field we are going to add is
 
313
      either has a default value or the '0000-00-00' is allowed by the
 
314
      set sql mode.
 
315
      If the '0000-00-00' value isn't allowed then raise the error_if_not_empty
 
316
      flag to allow ALTER Table only if the table to be altered is empty.
 
317
    */
 
318
    if ((def->sql_type == DRIZZLE_TYPE_DATE ||
 
319
         def->sql_type == DRIZZLE_TYPE_DATETIME) &&
 
320
        ! alter_info->datetime_field &&
 
321
        ! (~def->flags & (NO_DEFAULT_VALUE_FLAG | NOT_NULL_FLAG)) &&
 
322
        session->variables.sql_mode & MODE_NO_ZERO_DATE)
 
323
    {
 
324
      alter_info->datetime_field= def;
 
325
      alter_info->error_if_not_empty= true;
 
326
    }
 
327
    if (! def->after)
 
328
      new_create_list.push_back(def);
 
329
    else if (def->after == first_keyword)
 
330
      new_create_list.push_front(def);
 
331
    else
 
332
    {
 
333
      CreateField *find;
 
334
      find_it.rewind();
 
335
      while ((find= find_it++)) /* Add new columns */
 
336
      {
 
337
        if (! my_strcasecmp(system_charset_info,def->after, find->field_name))
 
338
          break;
 
339
      }
 
340
      if (! find)
 
341
      {
 
342
        my_error(ER_BAD_FIELD_ERROR, MYF(0), def->after, table->s->table_name.str);
 
343
        goto err;
 
344
      }
 
345
      find_it.after(def); /* Put element after this */
 
346
      /*
 
347
        XXX: hack for Bug#28427.
 
348
        If column order has changed, force OFFLINE ALTER Table
 
349
        without querying engine capabilities.  If we ever have an
 
350
        engine that supports online ALTER Table CHANGE COLUMN
 
351
        <name> AFTER <name1> (Falcon?), this fix will effectively
 
352
        disable the capability.
 
353
        TODO: detect the situation in compare_tables, behave based
 
354
        on engine capabilities.
 
355
      */
 
356
      if (alter_info->build_method == HA_BUILD_ONLINE)
 
357
      {
 
358
        my_error(ER_NOT_SUPPORTED_YET, MYF(0), session->query);
 
359
        goto err;
 
360
      }
 
361
      alter_info->build_method= HA_BUILD_OFFLINE;
 
362
    }
 
363
  }
 
364
  if (alter_info->alter_list.elements)
 
365
  {
 
366
    my_error(ER_BAD_FIELD_ERROR,
 
367
             MYF(0),
 
368
             alter_info->alter_list.head()->name,
 
369
             table->s->table_name.str);
 
370
    goto err;
 
371
  }
 
372
  if (! new_create_list.elements)
 
373
  {
 
374
    my_message(ER_CANT_REMOVE_ALL_FIELDS,
 
375
               ER(ER_CANT_REMOVE_ALL_FIELDS),
 
376
               MYF(0));
 
377
    goto err;
 
378
  }
 
379
 
 
380
  /*
 
381
    Collect all keys which isn't in drop list. Add only those
 
382
    for which some fields exists.
 
383
  */
 
384
  for (uint32_t i= 0; i < table->s->keys; i++, key_info++)
 
385
  {
 
386
    char *key_name= key_info->name;
 
387
    AlterDrop *drop;
 
388
    drop_it.rewind();
 
389
    while ((drop= drop_it++))
 
390
    {
 
391
      if (drop->type == AlterDrop::KEY &&
 
392
          ! my_strcasecmp(system_charset_info, key_name, drop->name))
 
393
        break;
 
394
    }
 
395
    if (drop)
 
396
    {
 
397
      drop_it.remove();
 
398
      continue;
 
399
    }
 
400
 
 
401
    KEY_PART_INFO *key_part= key_info->key_part;
 
402
    key_parts.empty();
 
403
    for (uint32_t j= 0; j < key_info->key_parts; j++, key_part++)
 
404
    {
 
405
      if (! key_part->field)
 
406
              continue; /* Wrong field (from UNIREG) */
 
407
 
 
408
      const char *key_part_name= key_part->field->field_name;
 
409
      CreateField *cfield;
 
410
      field_it.rewind();
 
411
      while ((cfield= field_it++))
 
412
      {
 
413
        if (cfield->change)
 
414
        {
 
415
          if (! my_strcasecmp(system_charset_info, key_part_name, cfield->change))
 
416
            break;
 
417
        }
 
418
        else if (! my_strcasecmp(system_charset_info, key_part_name, cfield->field_name))
 
419
          break;
 
420
      }
 
421
      if (! cfield)
 
422
              continue; /* Field is removed */
 
423
      
 
424
      uint32_t key_part_length= key_part->length;
 
425
      if (cfield->field) /* Not new field */
 
426
      {
 
427
        /*
 
428
          If the field can't have only a part used in a key according to its
 
429
          new type, or should not be used partially according to its
 
430
          previous type, or the field length is less than the key part
 
431
          length, unset the key part length.
 
432
 
 
433
          We also unset the key part length if it is the same as the
 
434
          old field's length, so the whole new field will be used.
 
435
 
 
436
          BLOBs may have cfield->length == 0, which is why we test it before
 
437
          checking whether cfield->length < key_part_length (in chars).
 
438
         */
 
439
        if (! Field::type_can_have_key_part(cfield->field->type()) ||
 
440
            ! Field::type_can_have_key_part(cfield->sql_type) ||
 
441
            (cfield->field->field_length == key_part_length) ||
 
442
            (cfield->length &&
 
443
             (cfield->length < key_part_length / key_part->field->charset()->mbmaxlen)))
 
444
          key_part_length= 0; /* Use whole field */
 
445
      }
 
446
      key_part_length/= key_part->field->charset()->mbmaxlen;
 
447
      key_parts.push_back(new Key_part_spec(cfield->field_name,
 
448
                                            strlen(cfield->field_name),
 
449
                                            key_part_length));
 
450
    }
 
451
    if (key_parts.elements)
 
452
    {
 
453
      KEY_CREATE_INFO key_create_info;
 
454
      Key *key;
 
455
      enum Key::Keytype key_type;
 
456
      memset(&key_create_info, 0, sizeof(key_create_info));
 
457
 
 
458
      key_create_info.algorithm= key_info->algorithm;
 
459
      if (key_info->flags & HA_USES_BLOCK_SIZE)
 
460
        key_create_info.block_size= key_info->block_size;
 
461
      if (key_info->flags & HA_USES_COMMENT)
 
462
        key_create_info.comment= key_info->comment;
 
463
 
 
464
      if (key_info->flags & HA_NOSAME)
 
465
      {
 
466
        if (is_primary_key_name(key_name))
 
467
          key_type= Key::PRIMARY;
 
468
        else
 
469
          key_type= Key::UNIQUE;
 
470
      }
 
471
      else
 
472
        key_type= Key::MULTIPLE;
 
473
 
 
474
      key= new Key(key_type,
 
475
                   key_name,
 
476
                   strlen(key_name),
 
477
                   &key_create_info,
 
478
                   test(key_info->flags & HA_GENERATED_KEY),
 
479
                   key_parts);
 
480
      new_key_list.push_back(key);
 
481
    }
 
482
  }
 
483
  {
 
484
    Key *key;
 
485
    while ((key= key_it++)) /* Add new keys */
 
486
    {
 
487
      if (key->type == Key::FOREIGN_KEY &&
 
488
          ((Foreign_key *)key)->validate(new_create_list))
 
489
        goto err;
 
490
      if (key->type != Key::FOREIGN_KEY)
 
491
        new_key_list.push_back(key);
 
492
      if (key->name.str && is_primary_key_name(key->name.str))
 
493
      {
 
494
        my_error(ER_WRONG_NAME_FOR_INDEX,
 
495
                 MYF(0),
 
496
                 key->name.str);
 
497
        goto err;
 
498
      }
 
499
    }
 
500
  }
 
501
 
 
502
  if (alter_info->drop_list.elements)
 
503
  {
 
504
    my_error(ER_CANT_DROP_FIELD_OR_KEY,
 
505
             MYF(0),
 
506
             alter_info->drop_list.head()->name);
 
507
    goto err;
 
508
  }
 
509
  if (alter_info->alter_list.elements)
 
510
  {
 
511
    my_error(ER_CANT_DROP_FIELD_OR_KEY,
 
512
             MYF(0),
 
513
             alter_info->alter_list.head()->name);
 
514
    goto err;
 
515
  }
 
516
 
 
517
  if (! table_proto->options().has_comment()
 
518
      && table->s->hasComment())
 
519
    table_options->set_comment(table->s->getComment());
 
520
 
 
521
  if (table->s->tmp_table)
 
522
  {
 
523
    table_proto->set_type(message::Table::TEMPORARY);
 
524
  }
 
525
 
 
526
  rc= false;
 
527
  alter_info->create_list.swap(new_create_list);
 
528
  alter_info->key_list.swap(new_key_list);
 
529
err:
 
530
  return rc;
 
531
}
 
532
 
 
533
/* table_list should contain just one table */
 
534
static int mysql_discard_or_import_tablespace(Session *session,
 
535
                                              TableList *table_list,
 
536
                                              enum tablespace_op_type tablespace_op)
 
537
{
 
538
  Table *table;
 
539
  bool discard;
 
540
  int error;
 
541
 
 
542
  /*
 
543
    Note that DISCARD/IMPORT TABLESPACE always is the only operation in an
 
544
    ALTER Table
 
545
  */
 
546
 
 
547
  session->set_proc_info("discard_or_import_tablespace");
 
548
 
 
549
  discard= test(tablespace_op == DISCARD_TABLESPACE);
 
550
 
 
551
 /*
 
552
   We set this flag so that ha_innobase::open and ::external_lock() do
 
553
   not complain when we lock the table
 
554
 */
 
555
  session->tablespace_op= true;
 
556
  if (!(table= session->openTableLock(table_list, TL_WRITE)))
 
557
  {
 
558
    session->tablespace_op= false;
 
559
    return -1;
 
560
  }
 
561
 
 
562
  error= table->cursor->ha_discard_or_import_tablespace(discard);
 
563
 
 
564
  session->set_proc_info("end");
 
565
 
 
566
  if (error)
 
567
    goto err;
 
568
 
 
569
  /* The ALTER Table is always in its own transaction */
 
570
  error = ha_autocommit_or_rollback(session, 0);
 
571
  if (! session->endActiveTransaction())
 
572
    error=1;
 
573
  if (error)
 
574
    goto err;
 
575
  write_bin_log(session, session->query, session->query_length);
 
576
 
 
577
err:
 
578
  ha_autocommit_or_rollback(session, error);
 
579
  session->tablespace_op=false;
 
580
 
 
581
  if (error == 0)
 
582
  {
 
583
    session->my_ok();
 
584
    return 0;
 
585
  }
 
586
 
 
587
  table->print_error(error, MYF(0));
 
588
 
 
589
  return -1;
 
590
}
 
591
 
 
592
/**
 
593
  Manages enabling/disabling of indexes for ALTER Table
 
594
 
 
595
  SYNOPSIS
 
596
    alter_table_manage_keys()
 
597
      table                  Target table
 
598
      indexes_were_disabled  Whether the indexes of the from table
 
599
                             were disabled
 
600
      keys_onoff             ENABLE | DISABLE | LEAVE_AS_IS
 
601
 
 
602
  RETURN VALUES
 
603
    false  OK
 
604
    true   Error
 
605
*/
 
606
static bool alter_table_manage_keys(Table *table, int indexes_were_disabled,
 
607
                             enum enum_enable_or_disable keys_onoff)
 
608
{
 
609
  int error= 0;
 
610
  switch (keys_onoff) {
 
611
  case ENABLE:
 
612
    error= table->cursor->ha_enable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
 
613
    break;
 
614
  case LEAVE_AS_IS:
 
615
    if (!indexes_were_disabled)
 
616
      break;
 
617
    /* fall-through: disabled indexes */
 
618
  case DISABLE:
 
619
    error= table->cursor->ha_disable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
 
620
  }
 
621
 
 
622
  if (error == HA_ERR_WRONG_COMMAND)
 
623
  {
 
624
    push_warning_printf(current_session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
625
                        ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
 
626
                        table->s->table_name.str);
 
627
    error= 0;
 
628
  } else if (error)
 
629
    table->print_error(error, MYF(0));
 
630
 
 
631
  return(error);
 
632
}
 
633
 
 
634
/**
 
635
  Alter table
 
636
 
 
637
  SYNOPSIS
 
638
    alter_table()
 
639
      session              Thread handle
 
640
      new_db           If there is a RENAME clause
 
641
      new_name         If there is a RENAME clause
 
642
      create_info      Information from the parsing phase about new
 
643
                       table properties.
 
644
      table_list       The table to change.
 
645
      alter_info       Lists of fields, keys to be changed, added
 
646
                       or dropped.
 
647
      order_num        How many order_st BY fields has been specified.
 
648
      order            List of fields to order_st BY.
 
649
      ignore           Whether we have ALTER IGNORE Table
 
650
 
 
651
  DESCRIPTION
 
652
    This is a veery long function and is everything but the kitchen sink :)
 
653
    It is used to alter a table and not only by ALTER Table but also
 
654
    CREATE|DROP INDEX are mapped on this function.
 
655
 
 
656
    When the ALTER Table statement just does a RENAME or ENABLE|DISABLE KEYS,
 
657
    or both, then this function short cuts its operation by renaming
 
658
    the table and/or enabling/disabling the keys. In this case, the FRM is
 
659
    not changed, directly by alter_table. However, if there is a
 
660
    RENAME + change of a field, or an index, the short cut is not used.
 
661
    See how `create_list` is used to generate the new FRM regarding the
 
662
    structure of the fields. The same is done for the indices of the table.
 
663
 
 
664
    Important is the fact, that this function tries to do as little work as
 
665
    possible, by finding out whether a intermediate table is needed to copy
 
666
    data into and when finishing the altering to use it as the original table.
 
667
    For this reason the function compare_tables() is called, which decides
 
668
    based on all kind of data how similar are the new and the original
 
669
    tables.
 
670
 
 
671
  RETURN VALUES
 
672
    false  OK
 
673
    true   Error
 
674
*/
 
675
bool alter_table(Session *session,
 
676
                 char *new_db,
 
677
                 char *new_name,
 
678
                 HA_CREATE_INFO *create_info,
 
679
                 message::Table *create_proto,
 
680
                 TableList *table_list,
 
681
                 AlterInfo *alter_info,
 
682
                 uint32_t order_num,
 
683
                 order_st *order,
 
684
                 bool ignore)
 
685
{
 
686
  Table *table;
 
687
  Table *new_table= NULL;
 
688
  Table *name_lock= NULL;
 
689
  string new_name_str;
 
690
  int error= 0;
 
691
  char tmp_name[80];
 
692
  char old_name[32];
 
693
  char new_name_buff[FN_REFLEN];
 
694
  char new_alias_buff[FN_REFLEN];
 
695
  char *table_name;
 
696
  char *db;
 
697
  const char *new_alias;
 
698
  ha_rows copied= 0;
 
699
  ha_rows deleted= 0;
 
700
  plugin::StorageEngine *old_db_type;
 
701
  plugin::StorageEngine *new_db_type;
 
702
  plugin::StorageEngine *save_old_db_type;
 
703
  bitset<32> tmp;
 
704
 
 
705
  new_name_buff[0]= '\0';
 
706
 
 
707
  /**
 
708
   * @todo this is a result of retaining the behavior that was here before. This should be removed
 
709
   * and the correct error handling should be done in doDropTable for the I_S engine.
 
710
   */
 
711
  plugin::InfoSchemaTable *sch_table= plugin::InfoSchemaTable::getTable(table_list->table_name);
 
712
  if (sch_table)
 
713
  {
 
714
    my_error(ER_DBACCESS_DENIED_ERROR, MYF(0), "", "", INFORMATION_SCHEMA_NAME.c_str());
 
715
    return true;
 
716
  }
 
717
 
 
718
  session->set_proc_info("init");
 
719
 
 
720
  /*
 
721
    Assign variables table_name, new_name, db, new_db, path
 
722
    to simplify further comparisons: we want to see if it's a RENAME
 
723
    later just by comparing the pointers, avoiding the need for strcmp.
 
724
  */
 
725
  table_name= table_list->table_name;
 
726
  db= table_list->db;
 
727
  if (! new_db || ! my_strcasecmp(table_alias_charset, new_db, db))
 
728
    new_db= db;
 
729
 
 
730
  if (alter_info->tablespace_op != NO_TABLESPACE_OP)
 
731
  {
 
732
    /* DISCARD/IMPORT TABLESPACE is always alone in an ALTER Table */
 
733
    return mysql_discard_or_import_tablespace(session, table_list, alter_info->tablespace_op);
 
734
  }
 
735
 
 
736
  ostringstream oss;
 
737
  oss << drizzle_data_home << "/" << db << "/" << table_name;
 
738
 
 
739
  (void) unpack_filename(new_name_buff, oss.str().c_str());
 
740
 
 
741
  /*
 
742
    If this is just a rename of a view, short cut to the
 
743
    following scenario: 1) lock LOCK_open 2) do a RENAME
 
744
    2) unlock LOCK_open.
 
745
    This is a copy-paste added to make sure
 
746
    ALTER (sic:) Table .. RENAME works for views. ALTER VIEW is handled
 
747
    as an independent branch in mysql_execute_command. The need
 
748
    for a copy-paste arose because the main code flow of ALTER Table
 
749
    ... RENAME tries to use openTableLock, which does not work for views
 
750
    (openTableLock was never modified to merge table lists of child tables
 
751
    into the main table list, like open_tables does).
 
752
    This code is wrong and will be removed, please do not copy.
 
753
  */
 
754
 
 
755
  if (!(table= session->openTableLock(table_list, TL_WRITE_ALLOW_READ)))
 
756
    return true;
 
757
  
 
758
  table->use_all_columns();
 
759
 
 
760
  /* Check that we are not trying to rename to an existing table */
 
761
  if (new_name)
 
762
  {
 
763
    strcpy(new_name_buff, new_name);
 
764
    strcpy(new_alias_buff, new_name);
 
765
    new_alias= new_alias_buff;
 
766
 
 
767
    my_casedn_str(files_charset_info, new_name_buff);
 
768
    new_alias= new_name; // Create lower case table name
 
769
    my_casedn_str(files_charset_info, new_name);
 
770
 
 
771
    if (new_db == db &&
 
772
        ! my_strcasecmp(table_alias_charset, new_name_buff, table_name))
 
773
    {
 
774
      /*
 
775
        Source and destination table names are equal: make later check
 
776
        easier.
 
777
      */
 
778
      new_alias= new_name= table_name;
 
779
    }
 
780
    else
 
781
    {
 
782
      if (table->s->tmp_table != NO_TMP_TABLE)
 
783
      {
 
784
        if (session->find_temporary_table(new_db, new_name_buff))
 
785
        {
 
786
          my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_name_buff);
 
787
          return true;
 
788
        }
 
789
      }
 
790
      else
 
791
      {
 
792
        if (session->lock_table_name_if_not_cached(new_db, new_name, &name_lock))
 
793
          return true;
 
794
 
 
795
        if (! name_lock)
 
796
        {
 
797
          my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_alias);
 
798
          return true;
 
799
        }
 
800
 
 
801
        build_table_filename(new_name_buff, sizeof(new_name_buff), new_db, new_name_buff, false);
 
802
 
 
803
        if (plugin::StorageEngine::getTableDefinition(*session, new_name_buff, new_db, new_name_buff, false) == EEXIST)
 
804
        {
 
805
          /* Table will be closed by Session::executeCommand() */
 
806
          my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_alias);
 
807
          goto err;
 
808
        }
 
809
      }
 
810
    }
 
811
  }
 
812
  else
 
813
  {
 
814
    new_alias= table_name;
 
815
    new_name= table_name;
 
816
  }
 
817
 
 
818
  old_db_type= table->s->db_type();
 
819
  if (! create_info->db_type)
 
820
  {
 
821
    create_info->db_type= old_db_type;
 
822
  }
 
823
 
 
824
  if (table->s->tmp_table != NO_TMP_TABLE)
 
825
  {
 
826
    create_proto->set_type(message::Table::TEMPORARY);
 
827
  }
 
828
  else
 
829
  {
 
830
    create_proto->set_type(message::Table::STANDARD);
 
831
  }
 
832
 
 
833
  new_db_type= create_info->db_type;
 
834
 
 
835
  if (new_db_type != old_db_type &&
 
836
      !table->cursor->can_switch_engines())
 
837
  {
 
838
    assert(0);
 
839
    my_error(ER_ROW_IS_REFERENCED, MYF(0));
 
840
    goto err;
 
841
  }
 
842
 
 
843
  if (create_info->row_type == ROW_TYPE_NOT_USED)
 
844
  {
 
845
    message::Table::TableOptions *table_options;
 
846
    table_options= create_proto->mutable_options();
 
847
 
 
848
    create_info->row_type= table->s->row_type;
 
849
    table_options->set_row_type((drizzled::message::Table_TableOptions_RowType)table->s->row_type);
 
850
  }
 
851
 
 
852
  if (old_db_type->check_flag(HTON_BIT_ALTER_NOT_SUPPORTED) ||
 
853
      new_db_type->check_flag(HTON_BIT_ALTER_NOT_SUPPORTED))
 
854
  {
 
855
    my_error(ER_ILLEGAL_HA, MYF(0), table_name);
 
856
    goto err;
 
857
  }
 
858
 
 
859
  session->set_proc_info("setup");
 
860
  
 
861
  /*
 
862
   * test if no other bits except ALTER_RENAME and ALTER_KEYS_ONOFF are set
 
863
   */
 
864
  tmp.set();
 
865
  tmp.reset(ALTER_RENAME);
 
866
  tmp.reset(ALTER_KEYS_ONOFF);
 
867
  tmp&= alter_info->flags;
 
868
  if (! (tmp.any()) &&
 
869
      ! table->s->tmp_table) // no need to touch frm
 
870
  {
 
871
    switch (alter_info->keys_onoff)
 
872
    {
 
873
    case LEAVE_AS_IS:
 
874
      break;
 
875
    case ENABLE:
 
876
      /*
 
877
        wait_while_table_is_used() ensures that table being altered is
 
878
        opened only by this thread and that Table::TableShare::version
 
879
        of Table object corresponding to this table is 0.
 
880
        The latter guarantees that no DML statement will open this table
 
881
        until ALTER Table finishes (i.e. until close_thread_tables())
 
882
        while the fact that the table is still open gives us protection
 
883
        from concurrent DDL statements.
 
884
      */
 
885
      pthread_mutex_lock(&LOCK_open); /* DDL wait for/blocker */
 
886
      wait_while_table_is_used(session, table, HA_EXTRA_FORCE_REOPEN);
 
887
      pthread_mutex_unlock(&LOCK_open);
 
888
      error= table->cursor->ha_enable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
 
889
      /* COND_refresh will be signaled in close_thread_tables() */
 
890
      break;
 
891
    case DISABLE:
 
892
      pthread_mutex_lock(&LOCK_open); /* DDL wait for/blocker */
 
893
      wait_while_table_is_used(session, table, HA_EXTRA_FORCE_REOPEN);
 
894
      pthread_mutex_unlock(&LOCK_open);
 
895
      error=table->cursor->ha_disable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
 
896
      /* COND_refresh will be signaled in close_thread_tables() */
 
897
      break;
 
898
    default:
 
899
      assert(false);
 
900
      error= 0;
 
901
      break;
 
902
    }
 
903
 
 
904
    if (error == HA_ERR_WRONG_COMMAND)
 
905
    {
 
906
      error= 0;
 
907
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
908
                          ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
 
909
                          table->alias);
 
910
    }
 
911
 
 
912
    pthread_mutex_lock(&LOCK_open); /* Lock to remove all instances of table from table cache before ALTER */
 
913
    /*
 
914
      Unlike to the above case close_cached_table() below will remove ALL
 
915
      instances of Table from table cache (it will also remove table lock
 
916
      held by this thread). So to make actual table renaming and writing
 
917
      to binlog atomic we have to put them into the same critical section
 
918
      protected by LOCK_open mutex. This also removes gap for races between
 
919
      access() and mysql_rename_table() calls.
 
920
    */
 
921
 
 
922
    if (error == 0 && 
 
923
        (new_name != table_name || new_db != db))
 
924
    {
 
925
      session->set_proc_info("rename");
 
926
      /*
 
927
        Then do a 'simple' rename of the table. First we need to close all
 
928
        instances of 'source' table.
 
929
      */
 
930
      session->close_cached_table(table);
 
931
      /*
 
932
        Then, we want check once again that target table does not exist.
 
933
        Actually the order of these two steps does not matter since
 
934
        earlier we took name-lock on the target table, so we do them
 
935
        in this particular order only to be consistent with 5.0, in which
 
936
        we don't take this name-lock and where this order really matters.
 
937
        TODO: Investigate if we need this access() check at all.
 
938
      */
 
939
      if (plugin::StorageEngine::getTableDefinition(*session, new_name, db, table_name, false) == EEXIST)
 
940
      {
 
941
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), new_name);
 
942
        error= -1;
 
943
      }
 
944
      else
 
945
      {
 
946
        *fn_ext(new_name)= 0;
 
947
        if (mysql_rename_table(old_db_type, db, table_name, new_db, new_alias, 0))
 
948
          error= -1;
 
949
      }
 
950
    }
 
951
 
 
952
    if (error == HA_ERR_WRONG_COMMAND)
 
953
    {
 
954
      error= 0;
 
955
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
956
                          ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
 
957
                          table->alias);
 
958
    }
 
959
 
 
960
    if (error == 0)
 
961
    {
 
962
      write_bin_log(session, session->query, session->query_length);
 
963
      session->my_ok();
 
964
    }
 
965
    else if (error > 0)
 
966
    {
 
967
      table->print_error(error, MYF(0));
 
968
      error= -1;
 
969
    }
 
970
 
 
971
    if (name_lock)
 
972
      session->unlink_open_table(name_lock);
 
973
 
 
974
    pthread_mutex_unlock(&LOCK_open);
 
975
    table_list->table= NULL;
 
976
    return error;
 
977
  }
 
978
 
 
979
  /* We have to do full alter table. */
 
980
  new_db_type= create_info->db_type;
 
981
 
 
982
  if (mysql_prepare_alter_table(session, table, create_info, create_proto,
 
983
                                alter_info))
 
984
      goto err;
 
985
 
 
986
  set_table_default_charset(create_info, db);
 
987
 
 
988
  alter_info->build_method= HA_BUILD_OFFLINE;
 
989
 
 
990
  snprintf(tmp_name, sizeof(tmp_name), "%s-%lx_%"PRIx64, TMP_FILE_PREFIX, (unsigned long) current_pid, session->thread_id);
 
991
 
 
992
  /* Safety fix for innodb */
 
993
  my_casedn_str(files_charset_info, tmp_name);
 
994
 
 
995
  /* Create a temporary table with the new format */
 
996
  {
 
997
    /**
 
998
      @note we make an internal temporary table unless the table is a temporary table. In last
 
999
      case we just use it as is. Neither of these tables require locks in order to  be
 
1000
      filled.
 
1001
    */
 
1002
    TableIdentifier new_table_temp(new_db,
 
1003
                                   tmp_name,
 
1004
                                   create_proto->type() != message::Table::TEMPORARY ? INTERNAL_TMP_TABLE :
 
1005
                                   TEMP_TABLE);
 
1006
 
 
1007
    error= create_temporary_table(session, table, new_table_temp, create_info, create_proto, alter_info);
 
1008
 
 
1009
    if (error != 0)
 
1010
      goto err;
 
1011
  }
 
1012
 
 
1013
  /* Open the table so we need to copy the data to it. */
 
1014
  new_table= open_alter_table(session, table, new_db, tmp_name);
 
1015
 
 
1016
  if (new_table == NULL)
 
1017
    goto err1;
 
1018
 
 
1019
  /* Copy the data if necessary. */
 
1020
  session->count_cuted_fields= CHECK_FIELD_WARN;        // calc cuted fields
 
1021
  session->cuted_fields= 0L;
 
1022
  session->set_proc_info("copy to tmp table");
 
1023
  copied= deleted= 0;
 
1024
 
 
1025
  assert(new_table);
 
1026
 
 
1027
  /* We don't want update TIMESTAMP fields during ALTER Table. */
 
1028
  new_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
 
1029
  new_table->next_number_field= new_table->found_next_number_field;
 
1030
  error= copy_data_between_tables(table,
 
1031
                                  new_table,
 
1032
                                  alter_info->create_list,
 
1033
                                  ignore,
 
1034
                                  order_num,
 
1035
                                  order,
 
1036
                                  &copied,
 
1037
                                  &deleted,
 
1038
                                  alter_info->keys_onoff,
 
1039
                                  alter_info->error_if_not_empty);
 
1040
 
 
1041
  /* We must not ignore bad input! */
 
1042
  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
 
1043
 
 
1044
  if (table->s->tmp_table != NO_TMP_TABLE)
 
1045
  {
 
1046
    /* We changed a temporary table */
 
1047
    if (error)
 
1048
      goto err1;
 
1049
 
 
1050
    /* Close lock if this is a transactional table */
 
1051
    if (session->lock)
 
1052
    {
 
1053
      mysql_unlock_tables(session, session->lock);
 
1054
      session->lock= 0;
 
1055
    }
 
1056
 
 
1057
    /* Remove link to old table and rename the new one */
 
1058
    session->close_temporary_table(table);
 
1059
 
 
1060
    /* Should pass the 'new_name' as we store table name in the cache */
 
1061
    if (new_table->rename_temporary_table(new_db, new_name))
 
1062
      goto err1;
 
1063
 
 
1064
    goto end_temporary;
 
1065
  }
 
1066
 
 
1067
  if (new_table)
 
1068
  {
 
1069
    /*
 
1070
      Close the intermediate table that will be the new table.
 
1071
      Note that MERGE tables do not have their children attached here.
 
1072
    */
 
1073
    new_table->intern_close_table();
 
1074
    free(new_table);
 
1075
  }
 
1076
 
 
1077
  pthread_mutex_lock(&LOCK_open); /* ALTER TABLE */
 
1078
 
 
1079
  if (error)
 
1080
  {
 
1081
    TableIdentifier identifier(new_db, tmp_name, INTERNAL_TMP_TABLE);
 
1082
    quick_rm_table(*session, identifier);
 
1083
    pthread_mutex_unlock(&LOCK_open);
 
1084
    goto err;
 
1085
  }
 
1086
 
 
1087
  /*
 
1088
    Data is copied. Now we:
 
1089
    1) Wait until all other threads close old version of table.
 
1090
    2) Close instances of table open by this thread and replace them
 
1091
       with exclusive name-locks.
 
1092
    3) Rename the old table to a temp name, rename the new one to the
 
1093
       old name.
 
1094
    4) If we are under LOCK TABLES and don't do ALTER Table ... RENAME
 
1095
       we reopen new version of table.
 
1096
    5) Write statement to the binary log.
 
1097
    6) If we are under LOCK TABLES and do ALTER Table ... RENAME we
 
1098
       remove name-locks from list of open tables and table cache.
 
1099
    7) If we are not not under LOCK TABLES we rely on close_thread_tables()
 
1100
       call to remove name-locks from table cache and list of open table.
 
1101
  */
 
1102
 
 
1103
  session->set_proc_info("rename result table");
 
1104
 
 
1105
  snprintf(old_name, sizeof(old_name), "%s2-%lx-%"PRIx64, TMP_FILE_PREFIX, (unsigned long) current_pid, session->thread_id);
 
1106
 
 
1107
  my_casedn_str(files_charset_info, old_name);
 
1108
 
 
1109
  wait_while_table_is_used(session, table, HA_EXTRA_PREPARE_FOR_RENAME);
 
1110
  session->close_data_files_and_morph_locks(db, table_name);
 
1111
 
 
1112
  error= 0;
 
1113
  save_old_db_type= old_db_type;
 
1114
 
 
1115
  /*
 
1116
    This leads to the storage engine (SE) not being notified for renames in
 
1117
    mysql_rename_table(), because we just juggle with the FRM and nothing
 
1118
    more. If we have an intermediate table, then we notify the SE that
 
1119
    it should become the actual table. Later, we will recycle the old table.
 
1120
    However, in case of ALTER Table RENAME there might be no intermediate
 
1121
    table. This is when the old and new tables are compatible, according to
 
1122
    compare_table(). Then, we need one additional call to
 
1123
  */
 
1124
  if (mysql_rename_table(old_db_type, db, table_name, db, old_name, FN_TO_IS_TMP))
 
1125
  {
 
1126
    error= 1;
 
1127
    TableIdentifier identifier(new_db, tmp_name, INTERNAL_TMP_TABLE);
 
1128
    quick_rm_table(*session, identifier);
 
1129
  }
 
1130
  else
 
1131
  {
 
1132
    if (mysql_rename_table(new_db_type, new_db, tmp_name, new_db, new_alias, FN_FROM_IS_TMP) != 0)
 
1133
    {
 
1134
      /* Try to get everything back. */
 
1135
      error= 1;
 
1136
 
 
1137
      TableIdentifier alias_identifier(new_db, new_alias, NO_TMP_TABLE);
 
1138
      quick_rm_table(*session, alias_identifier);
 
1139
 
 
1140
      TableIdentifier tmp_identifier(new_db, tmp_name, INTERNAL_TMP_TABLE);
 
1141
      quick_rm_table(*session, tmp_identifier);
 
1142
 
 
1143
      mysql_rename_table(old_db_type, db, old_name, db, table_name, FN_FROM_IS_TMP);
 
1144
    }
 
1145
  }
 
1146
 
 
1147
  if (error)
 
1148
  {
 
1149
    /* This shouldn't happen. But let us play it safe. */
 
1150
    goto err_with_placeholders;
 
1151
  }
 
1152
 
 
1153
  {
 
1154
    TableIdentifier old_identifier(db, old_name, INTERNAL_TMP_TABLE);
 
1155
    quick_rm_table(*session, old_identifier);
 
1156
  }
 
1157
  
 
1158
 
 
1159
  pthread_mutex_unlock(&LOCK_open);
 
1160
 
 
1161
  session->set_proc_info("end");
 
1162
 
 
1163
  write_bin_log(session, session->query, session->query_length);
 
1164
  table_list->table= NULL;
 
1165
 
 
1166
end_temporary:
 
1167
  /*
 
1168
   * Field::store() may have called my_error().  If this is 
 
1169
   * the case, we must not send an ok packet, since 
 
1170
   * Diagnostics_area::is_set() will fail an assert.
 
1171
   */
 
1172
  if (! session->is_error())
 
1173
  {
 
1174
    snprintf(tmp_name, sizeof(tmp_name), ER(ER_INSERT_INFO),
 
1175
            (ulong) (copied + deleted), (ulong) deleted,
 
1176
            (ulong) session->cuted_fields);
 
1177
    session->my_ok(copied + deleted, 0, 0L, tmp_name);
 
1178
    session->some_tables_deleted=0;
 
1179
    return false;
 
1180
  }
 
1181
  else
 
1182
  {
 
1183
    /* my_error() was called.  Return true (which means error...) */
 
1184
    return true;
 
1185
  }
 
1186
 
 
1187
err1:
 
1188
  if (new_table)
 
1189
  {
 
1190
    /* close_temporary_table() frees the new_table pointer. */
 
1191
    session->close_temporary_table(new_table);
 
1192
  }
 
1193
  else
 
1194
  {
 
1195
    TableIdentifier tmp_identifier(new_db, tmp_name, INTERNAL_TMP_TABLE);
 
1196
    quick_rm_table(*session, tmp_identifier);
 
1197
  }
 
1198
 
 
1199
err:
 
1200
  /*
 
1201
    No default value was provided for a DATE/DATETIME field, the
 
1202
    current sql_mode doesn't allow the '0000-00-00' value and
 
1203
    the table to be altered isn't empty.
 
1204
    Report error here.
 
1205
  */
 
1206
  if (alter_info->error_if_not_empty && session->row_count)
 
1207
  {
 
1208
    const char *f_val= 0;
 
1209
    enum enum_drizzle_timestamp_type t_type= DRIZZLE_TIMESTAMP_DATE;
 
1210
    switch (alter_info->datetime_field->sql_type)
 
1211
    {
 
1212
      case DRIZZLE_TYPE_DATE:
 
1213
        f_val= "0000-00-00";
 
1214
        t_type= DRIZZLE_TIMESTAMP_DATE;
 
1215
        break;
 
1216
      case DRIZZLE_TYPE_DATETIME:
 
1217
        f_val= "0000-00-00 00:00:00";
 
1218
        t_type= DRIZZLE_TIMESTAMP_DATETIME;
 
1219
        break;
 
1220
      default:
 
1221
        /* Shouldn't get here. */
 
1222
        assert(0);
 
1223
    }
 
1224
    bool save_abort_on_warning= session->abort_on_warning;
 
1225
    session->abort_on_warning= true;
 
1226
    make_truncated_value_warning(session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
 
1227
                                 f_val, strlength(f_val), t_type,
 
1228
                                 alter_info->datetime_field->field_name);
 
1229
    session->abort_on_warning= save_abort_on_warning;
 
1230
  }
 
1231
  if (name_lock)
 
1232
  {
 
1233
    pthread_mutex_lock(&LOCK_open); /* ALTER TABLe */
 
1234
    session->unlink_open_table(name_lock);
 
1235
    pthread_mutex_unlock(&LOCK_open);
 
1236
  }
 
1237
  return true;
 
1238
 
 
1239
err_with_placeholders:
 
1240
  /*
 
1241
    An error happened while we were holding exclusive name-lock on table
 
1242
    being altered. To be safe under LOCK TABLES we should remove placeholders
 
1243
    from list of open tables list and table cache.
 
1244
  */
 
1245
  session->unlink_open_table(table);
 
1246
  if (name_lock)
 
1247
    session->unlink_open_table(name_lock);
 
1248
  pthread_mutex_unlock(&LOCK_open);
 
1249
  return true;
 
1250
}
 
1251
/* alter_table */
 
1252
 
 
1253
static int
 
1254
copy_data_between_tables(Table *from, Table *to,
 
1255
                         List<CreateField> &create,
 
1256
                         bool ignore,
 
1257
                         uint32_t order_num, order_st *order,
 
1258
                         ha_rows *copied,
 
1259
                         ha_rows *deleted,
 
1260
                         enum enum_enable_or_disable keys_onoff,
 
1261
                         bool error_if_not_empty)
 
1262
{
 
1263
  int error;
 
1264
  CopyField *copy,*copy_end;
 
1265
  ulong found_count,delete_count;
 
1266
  Session *session= current_session;
 
1267
  uint32_t length= 0;
 
1268
  SORT_FIELD *sortorder;
 
1269
  READ_RECORD info;
 
1270
  TableList   tables;
 
1271
  List<Item>   fields;
 
1272
  List<Item>   all_fields;
 
1273
  ha_rows examined_rows;
 
1274
  bool auto_increment_field_copied= 0;
 
1275
  uint64_t prev_insert_id;
 
1276
 
 
1277
  /*
 
1278
    Turn off recovery logging since rollback of an alter table is to
 
1279
    delete the new table so there is no need to log the changes to it.
 
1280
 
 
1281
    This needs to be done before external_lock
 
1282
  */
 
1283
  error= ha_enable_transaction(session, false);
 
1284
  if (error)
 
1285
    return -1;
 
1286
 
 
1287
  if (!(copy= new CopyField[to->s->fields]))
 
1288
    return -1;
 
1289
 
 
1290
  if (to->cursor->ha_external_lock(session, F_WRLCK))
 
1291
    return -1;
 
1292
 
 
1293
  /* We need external lock before we can disable/enable keys */
 
1294
  alter_table_manage_keys(to, from->cursor->indexes_are_disabled(), keys_onoff);
 
1295
 
 
1296
  /* We can abort alter table for any table type */
 
1297
  session->abort_on_warning= !ignore;
 
1298
 
 
1299
  from->cursor->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
 
1300
  to->cursor->ha_start_bulk_insert(from->cursor->stats.records);
 
1301
 
 
1302
  List_iterator<CreateField> it(create);
 
1303
  CreateField *def;
 
1304
  copy_end=copy;
 
1305
  for (Field **ptr=to->field ; *ptr ; ptr++)
 
1306
  {
 
1307
    def=it++;
 
1308
    if (def->field)
 
1309
    {
 
1310
      if (*ptr == to->next_number_field)
 
1311
        auto_increment_field_copied= true;
 
1312
 
 
1313
      (copy_end++)->set(*ptr,def->field,0);
 
1314
    }
 
1315
 
 
1316
  }
 
1317
 
 
1318
  found_count=delete_count=0;
 
1319
 
 
1320
  if (order)
 
1321
  {
 
1322
    if (to->s->primary_key != MAX_KEY && to->cursor->primary_key_is_clustered())
 
1323
    {
 
1324
      char warn_buff[DRIZZLE_ERRMSG_SIZE];
 
1325
      snprintf(warn_buff, sizeof(warn_buff),
 
1326
               _("order_st BY ignored because there is a user-defined clustered "
 
1327
                 "index in the table '%-.192s'"),
 
1328
               from->s->table_name.str);
 
1329
      push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
 
1330
                   warn_buff);
 
1331
    }
 
1332
    else
 
1333
    {
 
1334
      from->sort.io_cache= new IO_CACHE;
 
1335
      memset(from->sort.io_cache, 0, sizeof(IO_CACHE));
 
1336
 
 
1337
      memset(&tables, 0, sizeof(tables));
 
1338
      tables.table= from;
 
1339
      tables.alias= tables.table_name= from->s->table_name.str;
 
1340
      tables.db= from->s->db.str;
 
1341
      error= 1;
 
1342
 
 
1343
      if (session->lex->select_lex.setup_ref_array(session, order_num) ||
 
1344
          setup_order(session, session->lex->select_lex.ref_pointer_array,
 
1345
                      &tables, fields, all_fields, order) ||
 
1346
          !(sortorder= make_unireg_sortorder(order, &length, NULL)) ||
 
1347
          (from->sort.found_records= filesort(session, from, sortorder, length,
 
1348
                                              (optimizer::SqlSelect *) 0, HA_POS_ERROR,
 
1349
                                              1, &examined_rows)) ==
 
1350
          HA_POS_ERROR)
 
1351
        goto err;
 
1352
    }
 
1353
  };
 
1354
 
 
1355
  /* Tell handler that we have values for all columns in the to table */
 
1356
  to->use_all_columns();
 
1357
  init_read_record(&info, session, from, (optimizer::SqlSelect *) 0, 1,1);
 
1358
  if (ignore)
 
1359
    to->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
 
1360
  session->row_count= 0;
 
1361
  to->restoreRecordAsDefault();        // Create empty record
 
1362
  while (!(error=info.read_record(&info)))
 
1363
  {
 
1364
    if (session->killed)
 
1365
    {
 
1366
      session->send_kill_message();
 
1367
      error= 1;
 
1368
      break;
 
1369
    }
 
1370
    session->row_count++;
 
1371
    /* Return error if source table isn't empty. */
 
1372
    if (error_if_not_empty)
 
1373
    {
 
1374
      error= 1;
 
1375
      break;
 
1376
    }
 
1377
    if (to->next_number_field)
 
1378
    {
 
1379
      if (auto_increment_field_copied)
 
1380
        to->auto_increment_field_not_null= true;
 
1381
      else
 
1382
        to->next_number_field->reset();
 
1383
    }
 
1384
 
 
1385
    for (CopyField *copy_ptr=copy ; copy_ptr != copy_end ; copy_ptr++)
 
1386
    {
 
1387
      copy_ptr->do_copy(copy_ptr);
 
1388
    }
 
1389
    prev_insert_id= to->cursor->next_insert_id;
 
1390
    error= to->cursor->ha_write_row(to->record[0]);
 
1391
    to->auto_increment_field_not_null= false;
 
1392
    if (error)
 
1393
    { 
 
1394
      if (!ignore ||
 
1395
          to->cursor->is_fatal_error(error, HA_CHECK_DUP))
 
1396
      { 
 
1397
        to->print_error(error, MYF(0));
 
1398
        break;
 
1399
      }
 
1400
      to->cursor->restore_auto_increment(prev_insert_id);
 
1401
      delete_count++;
 
1402
    }
 
1403
    else
 
1404
      found_count++;
 
1405
  }
 
1406
  end_read_record(&info);
 
1407
  from->free_io_cache();
 
1408
  delete [] copy;                               // This is never 0
 
1409
 
 
1410
  if (to->cursor->ha_end_bulk_insert() && error <= 0)
 
1411
  {
 
1412
    to->print_error(errno, MYF(0));
 
1413
    error=1;
 
1414
  }
 
1415
  to->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
 
1416
 
 
1417
  if (ha_enable_transaction(session, true))
 
1418
  {
 
1419
    error= 1;
 
1420
    goto err;
 
1421
  }
 
1422
 
 
1423
  /*
 
1424
    Ensure that the new table is saved properly to disk so that we
 
1425
    can do a rename
 
1426
  */
 
1427
  if (ha_autocommit_or_rollback(session, 0))
 
1428
    error=1;
 
1429
  if (! session->endActiveTransaction())
 
1430
    error=1;
 
1431
 
 
1432
 err:
 
1433
  session->abort_on_warning= 0;
 
1434
  from->free_io_cache();
 
1435
  *copied= found_count;
 
1436
  *deleted=delete_count;
 
1437
  to->cursor->ha_release_auto_increment();
 
1438
  if (to->cursor->ha_external_lock(session,F_UNLCK))
 
1439
    error=1;
 
1440
  return(error > 0 ? -1 : 0);
 
1441
}
 
1442
 
 
1443
static int
 
1444
create_temporary_table(Session *session,
 
1445
                       Table *table,
 
1446
                       TableIdentifier &identifier,
 
1447
                       HA_CREATE_INFO *create_info,
 
1448
                       message::Table *create_proto,
 
1449
                       AlterInfo *alter_info)
 
1450
{
 
1451
  int error;
 
1452
  plugin::StorageEngine *old_db_type, *new_db_type;
 
1453
 
 
1454
  old_db_type= table->s->db_type();
 
1455
  new_db_type= create_info->db_type;
 
1456
 
 
1457
  /*
 
1458
    Create a table with a temporary name.
 
1459
    We don't log the statement, it will be logged later.
 
1460
  */
 
1461
  create_proto->set_name(identifier.getTableName());
 
1462
 
 
1463
  message::Table::StorageEngine *protoengine;
 
1464
  protoengine= create_proto->mutable_engine();
 
1465
  protoengine->set_name(new_db_type->getName());
 
1466
 
 
1467
  error= mysql_create_table(session,
 
1468
                            identifier,
 
1469
                            create_info, create_proto, alter_info, true, 0, false);
 
1470
 
 
1471
  return error;
 
1472
}
 
1473
 
 
1474
static Table *open_alter_table(Session *session, Table *table, char *db, char *table_name)
 
1475
{
 
1476
  Table *new_table;
 
1477
 
 
1478
  /* Open the table so we need to copy the data to it. */
 
1479
  if (table->s->tmp_table)
 
1480
  {
 
1481
    TableList tbl;
 
1482
    tbl.db= db;
 
1483
    tbl.alias= table_name;
 
1484
    tbl.table_name= table_name;
 
1485
 
 
1486
    /* Table is in session->temporary_tables */
 
1487
    new_table= session->openTable(&tbl, (bool*) 0, DRIZZLE_LOCK_IGNORE_FLUSH);
 
1488
  }
 
1489
  else
 
1490
  {
 
1491
    TableIdentifier new_identifier(db, table_name, INTERNAL_TMP_TABLE);
 
1492
 
 
1493
    /* Open our intermediate table */
 
1494
    new_table= session->open_temporary_table(new_identifier, false);
 
1495
  }
 
1496
 
 
1497
  return new_table;
 
1498
}
 
1499
 
 
1500
} /* namespace drizzled */