~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_table.cc

  • Committer: Jay Pipes
  • Date: 2010-04-23 17:43:11 UTC
  • mto: This revision was merged to the branch mainline in revision 1524.
  • Revision ID: jpipes@serialcoder-20100423174311-ze5sq5nrbb7km2po
ha_rnd_init -> startTableScan, rnd_init -> doStartTableScan, ha_rnd_end -> endTableScan, rnd_end -> doEndTableScan

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
/* drop and alter of tables */
17
17
 
36
36
#include "drizzled/transaction_services.h"
37
37
#include <drizzled/table_proto.h>
38
38
#include <drizzled/plugin/client.h>
39
 
#include <drizzled/identifier.h>
 
39
#include <drizzled/table_identifier.h>
40
40
#include "drizzled/internal/m_string.h"
41
41
#include "drizzled/global_charset_info.h"
42
42
#include "drizzled/charset.h"
43
43
 
44
 
#include "drizzled/definition/cache.h"
45
 
 
46
44
 
47
45
#include "drizzled/statement/alter_table.h"
48
46
#include "drizzled/sql_table.h"
51
49
#include <algorithm>
52
50
#include <sstream>
53
51
 
54
 
#include <boost/unordered_set.hpp>
55
 
 
56
52
using namespace std;
57
53
 
58
54
namespace drizzled
60
56
 
61
57
extern pid_t current_pid;
62
58
 
63
 
bool is_primary_key(KeyInfo *key_info)
 
59
bool is_primary_key(KEY *key_info)
64
60
{
65
61
  static const char * primary_key_name="PRIMARY";
66
62
  return (strcmp(key_info->name, primary_key_name)==0);
75
71
    return NULL;
76
72
}
77
73
 
78
 
static bool check_if_keyname_exists(const char *name,KeyInfo *start, KeyInfo *end);
79
 
static char *make_unique_key_name(const char *field_name,KeyInfo *start,KeyInfo *end);
 
74
static bool check_if_keyname_exists(const char *name,KEY *start, KEY *end);
 
75
static char *make_unique_key_name(const char *field_name,KEY *start,KEY *end);
80
76
 
81
77
static bool prepare_blob_field(Session *session, CreateField *sql_field);
82
78
 
107
103
    cursor
108
104
*/
109
105
 
110
 
void write_bin_log(Session *session, const std::string &query)
 
106
void write_bin_log(Session *session,
 
107
                   char const *query)
111
108
{
112
109
  TransactionServices &transaction_services= TransactionServices::singleton();
113
110
  transaction_services.rawStatement(session, query);
114
111
}
115
112
 
 
113
 
 
114
/* Should should be refactored to go away */
 
115
void write_bin_log_drop_table(Session *session, bool if_exists, const char *db_name, const char *table_name)
 
116
{
 
117
  TransactionServices &transaction_services= TransactionServices::singleton();
 
118
  string built_query;
 
119
 
 
120
  if (if_exists)
 
121
    built_query.append("DROP TABLE IF EXISTS ");
 
122
  else
 
123
    built_query.append("DROP TABLE ");
 
124
 
 
125
  built_query.append("`");
 
126
  if (session->db.empty() || strcmp(db_name, session->db.c_str()) != 0)
 
127
  {
 
128
    built_query.append(db_name);
 
129
    built_query.append("`.`");
 
130
  }
 
131
 
 
132
  built_query.append(table_name);
 
133
  built_query.append("`");
 
134
  transaction_services.rawStatement(session, built_query);
 
135
}
 
136
 
116
137
/*
117
138
  Execute the drop of a normal or temporary table
118
139
 
124
145
                        In this case we give an warning of level 'NOTE'
125
146
    drop_temporary      Only drop temporary tables
126
147
 
127
 
  @todo
 
148
  TODO:
128
149
    When logging to the binary log, we should log
129
150
    tmp_tables and transactional tables as separate statements if we
130
151
    are in a transaction;  This is needed to get these tables into the
148
169
  int error= 0;
149
170
  bool foreign_key_error= false;
150
171
 
151
 
  {
152
 
    table::Cache::singleton().mutex().lock(); /* Part 2 of rm a table */
153
 
 
154
 
    if (not drop_temporary && session->lock_table_names_exclusively(tables))
155
 
    {
156
 
      table::Cache::singleton().mutex().unlock();
157
 
      return 1;
158
 
    }
159
 
 
160
 
    /* Don't give warnings for not found errors, as we already generate notes */
161
 
    session->no_warnings_for_error= 1;
162
 
 
163
 
    for (table= tables; table; table= table->next_local)
164
 
    {
165
 
      TableIdentifier tmp_identifier(table->getSchemaName(), table->getTableName());
166
 
 
167
 
      error= session->drop_temporary_table(tmp_identifier);
168
 
 
169
 
      switch (error) {
170
 
      case  0:
171
 
        // removed temporary table
172
 
        continue;
173
 
      case -1:
 
172
  pthread_mutex_lock(&LOCK_open); /* Part 2 of rm a table */
 
173
 
 
174
  /*
 
175
    If we have the table in the definition cache, we don't have to check the
 
176
    .frm cursor to find if the table is a normal table (not view) and what
 
177
    engine to use.
 
178
  */
 
179
 
 
180
  for (table= tables; table; table= table->next_local)
 
181
  {
 
182
    TableIdentifier identifier(table->db, table->table_name);
 
183
    TableShare *share;
 
184
    table->db_type= NULL;
 
185
 
 
186
    if ((share= TableShare::getShare(identifier)))
 
187
    {
 
188
      table->db_type= share->db_type();
 
189
    }
 
190
  }
 
191
 
 
192
  if (not drop_temporary && lock_table_names_exclusively(session, tables))
 
193
  {
 
194
    pthread_mutex_unlock(&LOCK_open);
 
195
    return 1;
 
196
  }
 
197
 
 
198
  /* Don't give warnings for not found errors, as we already generate notes */
 
199
  session->no_warnings_for_error= 1;
 
200
 
 
201
  for (table= tables; table; table= table->next_local)
 
202
  {
 
203
    char *db=table->db;
 
204
 
 
205
    error= session->drop_temporary_table(table);
 
206
 
 
207
    switch (error) {
 
208
    case  0:
 
209
      // removed temporary table
 
210
      continue;
 
211
    case -1:
 
212
      error= 1;
 
213
      goto err_with_placeholders;
 
214
    default:
 
215
      // temporary table not found
 
216
      error= 0;
 
217
    }
 
218
 
 
219
    if (drop_temporary == false)
 
220
    {
 
221
      Table *locked_table;
 
222
      abort_locked_tables(session, db, table->table_name);
 
223
      remove_table_from_cache(session, db, table->table_name,
 
224
                              RTFC_WAIT_OTHER_THREAD_FLAG |
 
225
                              RTFC_CHECK_KILLED_FLAG);
 
226
      /*
 
227
        If the table was used in lock tables, remember it so that
 
228
        unlock_table_names can free it
 
229
      */
 
230
      if ((locked_table= drop_locked_tables(session, db, table->table_name)))
 
231
        table->table= locked_table;
 
232
 
 
233
      if (session->killed)
 
234
      {
 
235
        error= -1;
 
236
        goto err_with_placeholders;
 
237
      }
 
238
    }
 
239
    TableIdentifier identifier(db, table->table_name, table->internal_tmp_table ? message::Table::INTERNAL : message::Table::STANDARD);
 
240
 
 
241
    if (drop_temporary || not plugin::StorageEngine::doesTableExist(*session, identifier))
 
242
    {
 
243
      // Table was not found on disk and table can't be created from engine
 
244
      if (if_exists)
 
245
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
246
                            ER_BAD_TABLE_ERROR, ER(ER_BAD_TABLE_ERROR),
 
247
                            table->table_name);
 
248
      else
174
249
        error= 1;
175
 
        goto err_with_placeholders;
176
 
      default:
177
 
        // temporary table not found
 
250
    }
 
251
    else
 
252
    {
 
253
      error= plugin::StorageEngine::dropTable(*session, identifier);
 
254
 
 
255
      if ((error == ENOENT || error == HA_ERR_NO_SUCH_TABLE) && if_exists)
 
256
      {
178
257
        error= 0;
179
 
      }
180
 
 
181
 
      if (drop_temporary == false)
182
 
      {
183
 
        Table *locked_table;
184
 
        abort_locked_tables(session, tmp_identifier);
185
 
        table::Cache::singleton().removeTable(session, tmp_identifier,
186
 
                                              RTFC_WAIT_OTHER_THREAD_FLAG |
187
 
                                              RTFC_CHECK_KILLED_FLAG);
188
 
        /*
189
 
          If the table was used in lock tables, remember it so that
190
 
          unlock_table_names can free it
191
 
        */
192
 
        if ((locked_table= drop_locked_tables(session, tmp_identifier)))
193
 
          table->table= locked_table;
194
 
 
195
 
        if (session->getKilled())
196
 
        {
197
 
          error= -1;
198
 
          goto err_with_placeholders;
199
 
        }
200
 
      }
201
 
      TableIdentifier identifier(table->getSchemaName(), table->getTableName(), table->getInternalTmpTable() ? message::Table::INTERNAL : message::Table::STANDARD);
202
 
 
203
 
      if (drop_temporary || not plugin::StorageEngine::doesTableExist(*session, identifier))
204
 
      {
205
 
        // Table was not found on disk and table can't be created from engine
206
 
        if (if_exists)
207
 
          push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
208
 
                              ER_BAD_TABLE_ERROR, ER(ER_BAD_TABLE_ERROR),
209
 
                              table->getTableName());
210
 
        else
211
 
          error= 1;
212
 
      }
213
 
      else
214
 
      {
215
 
        error= plugin::StorageEngine::dropTable(*session, identifier);
216
 
 
217
 
        if ((error == ENOENT || error == HA_ERR_NO_SUCH_TABLE) && if_exists)
218
 
        {
219
 
          error= 0;
220
 
          session->clear_error();
221
 
        }
222
 
 
223
 
        if (error == HA_ERR_ROW_IS_REFERENCED)
224
 
        {
225
 
          /* the table is referenced by a foreign key constraint */
226
 
          foreign_key_error= true;
227
 
        }
228
 
      }
229
 
 
230
 
      if (error == 0 || (if_exists && foreign_key_error == false))
231
 
      {
232
 
        TransactionServices &transaction_services= TransactionServices::singleton();
233
 
        transaction_services.dropTable(session, string(table->getSchemaName()), string(table->getTableName()), if_exists);
234
 
      }
235
 
 
236
 
      if (error)
237
 
      {
238
 
        if (wrong_tables.length())
239
 
          wrong_tables.append(',');
240
 
        wrong_tables.append(String(table->getTableName(), system_charset_info));
241
 
      }
242
 
    }
243
 
    /*
244
 
      It's safe to unlock table::Cache::singleton().mutex(): we have an exclusive lock
245
 
      on the table name.
246
 
    */
247
 
    table::Cache::singleton().mutex().unlock();
 
258
        session->clear_error();
 
259
      }
 
260
 
 
261
      if (error == HA_ERR_ROW_IS_REFERENCED)
 
262
      {
 
263
        /* the table is referenced by a foreign key constraint */
 
264
        foreign_key_error= true;
 
265
      }
 
266
    }
 
267
 
 
268
    if (error == 0 || (if_exists && foreign_key_error == false))
 
269
    {
 
270
      TransactionServices &transaction_services= TransactionServices::singleton();
 
271
      transaction_services.dropTable(session, string(db), string(table->table_name), if_exists);
 
272
    }
 
273
 
 
274
    if (error)
 
275
    {
 
276
      if (wrong_tables.length())
 
277
        wrong_tables.append(',');
 
278
      wrong_tables.append(String(table->table_name,system_charset_info));
 
279
    }
248
280
  }
 
281
  /*
 
282
    It's safe to unlock LOCK_open: we have an exclusive lock
 
283
    on the table name.
 
284
  */
 
285
  pthread_mutex_unlock(&LOCK_open);
249
286
  error= 0;
250
287
 
251
288
  if (wrong_tables.length())
262
299
    error= 1;
263
300
  }
264
301
 
265
 
  table::Cache::singleton().mutex().lock(); /* final bit in rm table lock */
 
302
  pthread_mutex_lock(&LOCK_open); /* final bit in rm table lock */
266
303
 
267
304
err_with_placeholders:
268
 
  tables->unlock_table_names();
269
 
  table::Cache::singleton().mutex().unlock();
 
305
  unlock_table_names(tables, NULL);
 
306
  pthread_mutex_unlock(&LOCK_open);
270
307
  session->no_warnings_for_error= 0;
271
308
 
272
309
  return error;
273
310
}
274
311
 
 
312
 
 
313
/*
 
314
  Quickly remove a table.
 
315
 
 
316
  SYNOPSIS
 
317
    quick_rm_table()
 
318
      base                      The plugin::StorageEngine handle.
 
319
      db                        The database name.
 
320
      table_name                The table name.
 
321
      is_tmp                    If the table is temp.
 
322
 
 
323
  RETURN
 
324
    0           OK
 
325
    != 0        Error
 
326
*/
 
327
bool quick_rm_table(Session& session,
 
328
                    TableIdentifier &identifier)
 
329
{
 
330
  return (plugin::StorageEngine::dropTable(session, identifier));
 
331
}
 
332
 
275
333
/*
276
334
  Sort keys in the following order:
277
335
  - PRIMARY KEY
285
343
  PRIMARY keys are prioritized.
286
344
*/
287
345
 
288
 
static int sort_keys(KeyInfo *a, KeyInfo *b)
 
346
static int sort_keys(KEY *a, KEY *b)
289
347
{
290
348
  ulong a_flags= a->flags, b_flags= b->flags;
291
349
 
337
395
    1             Error
338
396
*/
339
397
 
340
 
class typelib_set_member
341
 
{
342
 
public:
343
 
  string s;
344
 
  const CHARSET_INFO * const cs;
345
 
 
346
 
  typelib_set_member(const char* value, unsigned int length,
347
 
                     const CHARSET_INFO * const charset)
348
 
    : s(value, length),
349
 
      cs(charset)
350
 
  {}
351
 
};
352
 
 
353
 
static bool operator==(typelib_set_member const& a, typelib_set_member const& b)
354
 
{
355
 
  return (my_strnncoll(a.cs,
356
 
                       (const unsigned char*)a.s.c_str(), a.s.length(),
357
 
                       (const unsigned char*)b.s.c_str(), b.s.length())==0);
358
 
}
359
 
 
360
 
 
361
 
namespace
362
 
{
363
 
class typelib_set_member_hasher
364
 
{
365
 
  boost::hash<string> hasher;
366
 
public:
367
 
  std::size_t operator()(const typelib_set_member& t) const
368
 
  {
369
 
    return hasher(t.s);
370
 
  }
371
 
};
372
 
}
373
 
 
374
398
static bool check_duplicates_in_interval(const char *set_or_name,
375
399
                                         const char *name, TYPELIB *typelib,
376
400
                                         const CHARSET_INFO * const cs,
381
405
  unsigned int *cur_length= typelib->type_lengths;
382
406
  *dup_val_count= 0;
383
407
 
384
 
  boost::unordered_set<typelib_set_member, typelib_set_member_hasher> interval_set;
385
 
 
386
 
  for ( ; tmp.count > 0; cur_value++, cur_length++)
 
408
  for ( ; tmp.count > 1; cur_value++, cur_length++)
387
409
  {
388
410
    tmp.type_names++;
389
411
    tmp.type_lengths++;
390
412
    tmp.count--;
391
 
    if (interval_set.find(typelib_set_member(*cur_value, *cur_length, cs)) != interval_set.end())
 
413
    if (find_type2(&tmp, (const char*)*cur_value, *cur_length, cs))
392
414
    {
393
415
      my_error(ER_DUPLICATED_VALUE_IN_TYPE, MYF(0),
394
416
               name,*cur_value,set_or_name);
395
417
      return 1;
396
418
    }
397
 
    else
398
 
      interval_set.insert(typelib_set_member(*cur_value, *cur_length, cs));
399
419
  }
400
420
  return 0;
401
421
}
468
488
 
469
489
  switch (sql_field->sql_type) {
470
490
  case DRIZZLE_TYPE_BLOB:
 
491
    sql_field->pack_flag= pack_length_to_packflag(sql_field->pack_length - portable_sizeof_char_ptr);
471
492
    sql_field->length= 8; // Unireg field length
472
493
    (*blob_columns)++;
473
494
    break;
474
495
  case DRIZZLE_TYPE_VARCHAR:
 
496
    sql_field->pack_flag=0;
475
497
    break;
476
498
  case DRIZZLE_TYPE_ENUM:
 
499
    sql_field->pack_flag=pack_length_to_packflag(sql_field->pack_length);
477
500
    if (check_duplicates_in_interval("ENUM",
478
501
                                     sql_field->field_name,
479
502
                                     sql_field->interval,
484
507
  case DRIZZLE_TYPE_DATE:  // Rest of string types
485
508
  case DRIZZLE_TYPE_DATETIME:
486
509
  case DRIZZLE_TYPE_NULL:
 
510
    sql_field->pack_flag=f_settype((uint32_t) sql_field->sql_type);
487
511
    break;
488
512
  case DRIZZLE_TYPE_DECIMAL:
 
513
    sql_field->pack_flag= 0;
489
514
    break;
490
515
  case DRIZZLE_TYPE_TIMESTAMP:
491
516
    /* We should replace old TIMESTAMP fields with their newer analogs */
497
522
        (*timestamps_with_niladic)++;
498
523
      }
499
524
      else
500
 
      {
501
525
        sql_field->unireg_check= Field::NONE;
502
 
      }
503
526
    }
504
527
    else if (sql_field->unireg_check != Field::NONE)
505
528
      (*timestamps_with_niladic)++;
507
530
    (*timestamps)++;
508
531
    /* fall-through */
509
532
  default:
 
533
    sql_field->pack_flag=(0 |
 
534
                          f_settype((uint32_t) sql_field->sql_type));
510
535
    break;
511
536
  }
512
 
 
513
537
  return 0;
514
538
}
515
539
 
519
543
                                      AlterInfo *alter_info,
520
544
                                      bool tmp_table,
521
545
                                      uint32_t *db_options,
522
 
                                      KeyInfo **key_info_buffer,
 
546
                                      KEY **key_info_buffer,
523
547
                                      uint32_t *key_count,
524
548
                                      int select_field_count)
525
549
{
527
551
  CreateField   *sql_field,*dup_field;
528
552
  uint          field,null_fields,blob_columns,max_key_length;
529
553
  ulong         record_offset= 0;
530
 
  KeyInfo               *key_info;
531
 
  KeyPartInfo *key_part_info;
 
554
  KEY           *key_info;
 
555
  KEY_PART_INFO *key_part_info;
532
556
  int           timestamps= 0, timestamps_with_niladic= 0;
533
557
  int           field_no,dup_no;
534
558
  int           select_field_pos,auto_increment=0;
606
630
 
607
631
    if (sql_field->sql_type == DRIZZLE_TYPE_ENUM)
608
632
    {
609
 
      size_t dummy;
 
633
      uint32_t dummy;
610
634
      const CHARSET_INFO * const cs= sql_field->charset;
611
635
      TYPELIB *interval= sql_field->interval;
612
636
 
639
663
          if (String::needs_conversion(tmp->length(), tmp->charset(),
640
664
                                       cs, &dummy))
641
665
          {
642
 
            size_t cnv_errs;
 
666
            uint32_t cnv_errs;
643
667
            conv.copy(tmp->ptr(), tmp->length(), tmp->charset(), cs, &cnv_errs);
644
668
            interval->type_names[i]= session->mem_root->strmake_root(conv.ptr(), conv.length());
645
669
            interval->type_lengths[i]= conv.length();
681
705
            }
682
706
          }
683
707
        }
684
 
        uint32_t new_dummy;
685
 
        calculate_interval_lengths(cs, interval, &field_length, &new_dummy);
 
708
        calculate_interval_lengths(cs, interval, &field_length, &dummy);
686
709
        sql_field->length= field_length;
687
710
      }
688
711
      set_if_smaller(sql_field->length, (uint32_t)MAX_FIELD_WIDTH-1);
750
773
    /** @todo Get rid of this MyISAM-specific crap. */
751
774
    if (not create_proto.engine().name().compare("MyISAM") &&
752
775
        ((sql_field->flags & BLOB_FLAG) ||
753
 
         (sql_field->sql_type == DRIZZLE_TYPE_VARCHAR)))
 
776
         (sql_field->sql_type == DRIZZLE_TYPE_VARCHAR && create_info->row_type != ROW_TYPE_FIXED)))
754
777
      (*db_options)|= HA_OPTION_PACK_RECORD;
755
778
    it2.rewind();
756
779
  }
818
841
      fk_key_count++;
819
842
      if (((Foreign_key *)key)->validate(alter_info->create_list))
820
843
        return true;
821
 
 
822
844
      Foreign_key *fk_key= (Foreign_key*) key;
823
 
 
824
 
      add_foreign_key_to_table_message(&create_proto,
825
 
                                       fk_key->name.str,
826
 
                                       fk_key->columns,
827
 
                                       fk_key->ref_table,
828
 
                                       fk_key->ref_columns,
829
 
                                       fk_key->delete_opt,
830
 
                                       fk_key->update_opt,
831
 
                                       fk_key->match_opt);
832
 
 
833
845
      if (fk_key->ref_columns.elements &&
834
846
          fk_key->ref_columns.elements != fk_key->columns.elements)
835
847
      {
864
876
             key2->name.str != ignore_key &&
865
877
             !foreign_key_prefix(key, key2)))
866
878
        {
867
 
          /* @todo issue warning message */
 
879
          /* TODO: issue warning message */
868
880
          /* mark that the generated key should be ignored */
869
881
          if (!key2->generated ||
870
882
              (key->generated && key->columns.elements <
898
910
    return(true);
899
911
  }
900
912
 
901
 
  (*key_info_buffer)= key_info= (KeyInfo*) memory::sql_calloc(sizeof(KeyInfo) * (*key_count));
902
 
  key_part_info=(KeyPartInfo*) memory::sql_calloc(sizeof(KeyPartInfo)*key_parts);
 
913
  (*key_info_buffer)= key_info= (KEY*) memory::sql_calloc(sizeof(KEY) * (*key_count));
 
914
  key_part_info=(KEY_PART_INFO*) memory::sql_calloc(sizeof(KEY_PART_INFO)*key_parts);
903
915
  if (!*key_info_buffer || ! key_part_info)
904
916
    return(true);                               // Out of memory
905
917
 
939
951
    key_info->usable_key_parts= key_number;
940
952
    key_info->algorithm= key->key_create_info.algorithm;
941
953
 
 
954
    /* Take block size from key part or table part */
 
955
    /*
 
956
      TODO: Add warning if block size changes. We can't do it here, as
 
957
      this may depend on the size of the key
 
958
    */
 
959
    key_info->block_size= (key->key_create_info.block_size ?
 
960
                           key->key_create_info.block_size :
 
961
                           create_proto.options().key_block_size());
 
962
 
 
963
    if (key_info->block_size)
 
964
      key_info->flags|= HA_USES_BLOCK_SIZE;
 
965
 
942
966
    uint32_t tmp_len= system_charset_info->cset->charpos(system_charset_info,
943
967
                                           key->key_create_info.comment.str,
944
968
                                           key->key_create_info.comment.str +
1048
1072
 
1049
1073
      key_part_info->fieldnr= field;
1050
1074
      key_part_info->offset=  (uint16_t) sql_field->offset;
1051
 
      key_part_info->key_type= 0;
 
1075
      key_part_info->key_type=sql_field->pack_flag;
1052
1076
      length= sql_field->key_length;
1053
1077
 
1054
1078
      if (column->length)
1116
1140
      key_part_info->length=(uint16_t) length;
1117
1141
      /* Use packed keys for long strings on the first column */
1118
1142
      if (!((*db_options) & HA_OPTION_NO_PACK_KEYS) &&
1119
 
          (length >= KEY_DEFAULT_PACK_LENGTH &&
1120
 
           (sql_field->sql_type == DRIZZLE_TYPE_VARCHAR ||
1121
 
            sql_field->sql_type == DRIZZLE_TYPE_BLOB)))
 
1143
          (length >= KEY_DEFAULT_PACK_LENGTH &&
 
1144
           (sql_field->sql_type == DRIZZLE_TYPE_VARCHAR ||
 
1145
      sql_field->sql_type == DRIZZLE_TYPE_BLOB)))
1122
1146
      {
1123
1147
        if ((column_nr == 0 && sql_field->sql_type == DRIZZLE_TYPE_BLOB) ||
1124
1148
            sql_field->sql_type == DRIZZLE_TYPE_VARCHAR)
1125
 
        {
1126
1149
          key_info->flags|= HA_BINARY_PACK_KEY | HA_VAR_LENGTH_KEY;
1127
 
        }
1128
1150
        else
1129
 
        {
1130
1151
          key_info->flags|= HA_PACK_KEY;
1131
 
        }
1132
1152
      }
1133
1153
      /* Check if the key segment is partial, set the key flag accordingly */
1134
1154
      if (length != sql_field->key_length)
1190
1210
    return(true);
1191
1211
  }
1192
1212
  /* Sort keys in optimized order */
1193
 
  internal::my_qsort((unsigned char*) *key_info_buffer, *key_count, sizeof(KeyInfo),
 
1213
  internal::my_qsort((unsigned char*) *key_info_buffer, *key_count, sizeof(KEY),
1194
1214
                     (qsort_cmp) sort_keys);
1195
1215
 
1196
1216
  /* Check fields. */
1265
1285
}
1266
1286
 
1267
1287
static bool locked_create_event(Session *session,
1268
 
                                const TableIdentifier &identifier,
 
1288
                                TableIdentifier &identifier,
1269
1289
                                HA_CREATE_INFO *create_info,
1270
1290
                                message::Table &table_proto,
1271
1291
                                AlterInfo *alter_info,
1273
1293
                                bool internal_tmp_table,
1274
1294
                                uint db_options,
1275
1295
                                uint key_count,
1276
 
                                KeyInfo *key_info_buffer)
 
1296
                                KEY *key_info_buffer)
1277
1297
{
1278
1298
  bool error= true;
1279
1299
 
1300
1320
        return error;
1301
1321
      }
1302
1322
 
1303
 
      std::string path;
1304
 
      identifier.getSQLPath(path);
1305
 
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), path.c_str());
1306
 
 
 
1323
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), identifier.getSQLPath().c_str());
1307
1324
      return error;
1308
1325
    }
1309
1326
 
1320
1337
      /*
1321
1338
        @todo improve this error condition.
1322
1339
      */
1323
 
      if (definition::Cache::singleton().find(identifier.getKey()))
 
1340
      if (TableShare::getShare(identifier))
1324
1341
      {
1325
 
        std::string path;
1326
 
        identifier.getSQLPath(path);
1327
 
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), path.c_str());
1328
 
 
 
1342
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), identifier.getSQLPath().c_str());
1329
1343
        return error;
1330
1344
      }
1331
1345
    }
1401
1415
*/
1402
1416
 
1403
1417
bool mysql_create_table_no_lock(Session *session,
1404
 
                                const TableIdentifier &identifier,
 
1418
                                TableIdentifier &identifier,
1405
1419
                                HA_CREATE_INFO *create_info,
1406
1420
                                message::Table &table_proto,
1407
1421
                                AlterInfo *alter_info,
1410
1424
                                bool is_if_not_exists)
1411
1425
{
1412
1426
  uint          db_options, key_count;
1413
 
  KeyInfo               *key_info_buffer;
 
1427
  KEY           *key_info_buffer;
1414
1428
  bool          error= true;
 
1429
  TableShare share;
1415
1430
 
1416
1431
  /* Check for duplicate fields and check type of table to create */
1417
1432
  if (not alter_info->create_list.elements)
1423
1438
  assert(identifier.getTableName() == table_proto.name());
1424
1439
  db_options= create_info->table_options;
1425
1440
 
 
1441
  if (create_info->row_type == ROW_TYPE_DYNAMIC)
 
1442
    db_options|=HA_OPTION_PACK_RECORD;
 
1443
 
1426
1444
  set_table_default_charset(create_info, identifier.getSchemaName().c_str());
1427
1445
 
1428
1446
  /* Build a Table object to pass down to the engine, and the do the actual create. */
1429
1447
  if (not mysql_prepare_create_table(session, create_info, table_proto, alter_info,
1430
 
                                     internal_tmp_table,
1431
 
                                     &db_options,
1432
 
                                     &key_info_buffer, &key_count,
1433
 
                                     select_field_count))
 
1448
                                 internal_tmp_table,
 
1449
                                 &db_options,
 
1450
                                 &key_info_buffer, &key_count,
 
1451
                                 select_field_count))
1434
1452
  {
1435
 
    boost_unique_lock_t lock(table::Cache::singleton().mutex()); /* CREATE TABLE (some confussion on naming, double check) */
 
1453
    pthread_mutex_lock(&LOCK_open); /* CREATE TABLE (some confussion on naming, double check) */
1436
1454
    error= locked_create_event(session,
1437
1455
                               identifier,
1438
1456
                               create_info,
1442
1460
                               internal_tmp_table,
1443
1461
                               db_options, key_count,
1444
1462
                               key_info_buffer);
 
1463
    pthread_mutex_unlock(&LOCK_open);
1445
1464
  }
1446
1465
 
1447
1466
  session->set_proc_info("After create");
1453
1472
  @note the following two methods implement create [temporary] table.
1454
1473
*/
1455
1474
static bool drizzle_create_table(Session *session,
1456
 
                                 const TableIdentifier &identifier,
 
1475
                                 TableIdentifier &identifier,
1457
1476
                                 HA_CREATE_INFO *create_info,
1458
1477
                                 message::Table &table_proto,
1459
1478
                                 AlterInfo *alter_info,
1480
1499
    }
1481
1500
    else
1482
1501
    {
1483
 
      std::string path;
1484
 
      identifier.getSQLPath(path);
1485
 
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), path.c_str());
 
1502
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), identifier.getSQLPath().c_str());
1486
1503
      result= true;
1487
1504
    }
1488
1505
  }
1500
1517
 
1501
1518
  if (name_lock)
1502
1519
  {
1503
 
    boost_unique_lock_t lock(table::Cache::singleton().mutex()); /* Lock for removing name_lock during table create */
 
1520
    pthread_mutex_lock(&LOCK_open); /* Lock for removing name_lock during table create */
1504
1521
    session->unlink_open_table(name_lock);
 
1522
    pthread_mutex_unlock(&LOCK_open);
1505
1523
  }
1506
1524
 
1507
1525
  return(result);
1512
1530
  Database locking aware wrapper for mysql_create_table_no_lock(),
1513
1531
*/
1514
1532
bool mysql_create_table(Session *session,
1515
 
                        const TableIdentifier &identifier,
 
1533
                        TableIdentifier &identifier,
1516
1534
                        HA_CREATE_INFO *create_info,
1517
1535
                        message::Table &table_proto,
1518
1536
                        AlterInfo *alter_info,
1548
1566
**/
1549
1567
 
1550
1568
static bool
1551
 
check_if_keyname_exists(const char *name, KeyInfo *start, KeyInfo *end)
 
1569
check_if_keyname_exists(const char *name, KEY *start, KEY *end)
1552
1570
{
1553
 
  for (KeyInfo *key=start ; key != end ; key++)
 
1571
  for (KEY *key=start ; key != end ; key++)
1554
1572
    if (!my_strcasecmp(system_charset_info,name,key->name))
1555
1573
      return 1;
1556
1574
  return 0;
1558
1576
 
1559
1577
 
1560
1578
static char *
1561
 
make_unique_key_name(const char *field_name,KeyInfo *start,KeyInfo *end)
 
1579
make_unique_key_name(const char *field_name,KEY *start,KEY *end)
1562
1580
{
1563
1581
  char buff[MAX_FIELD_NAME],*buff_end;
1564
1582
 
1593
1611
 
1594
1612
  SYNOPSIS
1595
1613
    mysql_rename_table()
1596
 
      session
1597
1614
      base                      The plugin::StorageEngine handle.
1598
1615
      old_db                    The old database name.
1599
1616
      old_name                  The old table name.
1606
1623
*/
1607
1624
 
1608
1625
bool
1609
 
mysql_rename_table(Session &session,
1610
 
                   plugin::StorageEngine *base,
1611
 
                   const TableIdentifier &from,
1612
 
                   const TableIdentifier &to)
 
1626
mysql_rename_table(plugin::StorageEngine *base,
 
1627
                   TableIdentifier &from,
 
1628
                   TableIdentifier &to)
1613
1629
{
 
1630
  Session *session= current_session;
1614
1631
  int error= 0;
1615
1632
 
1616
1633
  assert(base);
1621
1638
    return true;
1622
1639
  }
1623
1640
 
1624
 
  error= base->renameTable(session, from, to);
 
1641
  error= base->renameTable(*session, from, to);
1625
1642
 
1626
1643
  if (error == HA_ERR_WRONG_COMMAND)
1627
1644
  {
1629
1646
  }
1630
1647
  else if (error)
1631
1648
  {
1632
 
    std::string from_path;
1633
 
    std::string to_path;
1634
 
 
1635
 
    from.getSQLPath(from_path);
1636
 
    to.getSQLPath(to_path);
1637
 
 
1638
 
    const char *from_identifier= from.isTmp() ? "#sql-temporary" : from_path.c_str();
1639
 
    const char *to_identifier= to.isTmp() ? "#sql-temporary" : to_path.c_str();
 
1649
    const char *from_identifier= from.isTmp() ? "#sql-temporary" : from.getSQLPath().c_str();
 
1650
    const char *to_identifier= to.isTmp() ? "#sql-temporary" : to.getSQLPath().c_str();
1640
1651
 
1641
1652
    my_error(ER_ERROR_ON_RENAME, MYF(0), from_identifier, to_identifier, error);
1642
1653
  }
1660
1671
   the table is closed.
1661
1672
 
1662
1673
  PREREQUISITES
1663
 
    Lock on table::Cache::singleton().mutex()
 
1674
    Lock on LOCK_open
1664
1675
    Win32 clients must also have a WRITE LOCK on the table !
1665
1676
*/
1666
1677
 
1668
1679
                              enum ha_extra_function function)
1669
1680
{
1670
1681
 
1671
 
  safe_mutex_assert_owner(table::Cache::singleton().mutex().native_handle());
 
1682
  safe_mutex_assert_owner(&LOCK_open);
1672
1683
 
1673
1684
  table->cursor->extra(function);
1674
1685
  /* Mark all tables that are in use as 'old' */
1675
 
  session->abortLock(table);    /* end threads waiting on lock */
 
1686
  mysql_lock_abort(session, table);     /* end threads waiting on lock */
1676
1687
 
1677
1688
  /* Wait until all there are no other threads that has this table open */
1678
 
  TableIdentifier identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName());
1679
 
  table::Cache::singleton().removeTable(session, identifier, RTFC_WAIT_OTHER_THREAD_FLAG);
 
1689
  remove_table_from_cache(session, table->s->getSchemaName(),
 
1690
                          table->s->table_name.str,
 
1691
                          RTFC_WAIT_OTHER_THREAD_FLAG);
1680
1692
}
1681
1693
 
1682
1694
/*
1692
1704
    reopen the table.
1693
1705
 
1694
1706
  PREREQUISITES
1695
 
    Lock on table::Cache::singleton().mutex()
 
1707
    Lock on LOCK_open
1696
1708
    Win32 clients must also have a WRITE LOCK on the table !
1697
1709
*/
1698
1710
 
1703
1715
  /* Close lock if this is not got with LOCK TABLES */
1704
1716
  if (lock)
1705
1717
  {
1706
 
    unlockTables(lock);
 
1718
    mysql_unlock_tables(this, lock);
1707
1719
    lock= NULL;                 // Start locked threads
1708
1720
  }
1709
1721
  /* Close all copies of 'table'.  This also frees all LOCK TABLES lock */
1710
1722
  unlink_open_table(table);
1711
1723
 
1712
 
  /* When lock on table::Cache::singleton().mutex() is freed other threads can continue */
1713
 
  locking::broadcast_refresh();
 
1724
  /* When lock on LOCK_open is freed other threads can continue */
 
1725
  broadcast_refresh();
1714
1726
}
1715
1727
 
1716
1728
/*
1754
1766
  for (table= tables; table; table= table->next_local)
1755
1767
  {
1756
1768
    char table_name[NAME_LEN*2+2];
 
1769
    char* db = table->db;
1757
1770
    bool fatal_error=0;
1758
1771
 
1759
 
    snprintf(table_name, sizeof(table_name), "%s.%s", table->getSchemaName(), table->getTableName());
 
1772
    snprintf(table_name, sizeof(table_name), "%s.%s",db,table->table_name);
1760
1773
    table->lock_type= lock_type;
1761
1774
    /* open only one table from local list of command */
1762
1775
    {
1769
1782
      /*
1770
1783
        Time zone tables and SP tables can be add to lex->query_tables list,
1771
1784
        so it have to be prepared.
1772
 
        @todo Investigate if we can put extra tables into argument instead of using lex->query_tables
 
1785
        TODO: Investigate if we can put extra tables into argument instead of
 
1786
        using lex->query_tables
1773
1787
      */
1774
1788
      lex->query_tables= table;
1775
1789
      lex->query_tables_last= &table->next_global;
1820
1834
    }
1821
1835
 
1822
1836
    /* Close all instances of the table to allow repair to rename files */
1823
 
    if (lock_type == TL_WRITE && table->table->getShare()->getVersion())
 
1837
    if (lock_type == TL_WRITE && table->table->s->version)
1824
1838
    {
1825
 
      table::Cache::singleton().mutex().lock(); /* Lock type is TL_WRITE and we lock to repair the table */
1826
 
      const char *old_message=session->enter_cond(COND_refresh, table::Cache::singleton().mutex(),
1827
 
                                                  "Waiting to get writelock");
1828
 
      session->abortLock(table->table);
1829
 
      TableIdentifier identifier(table->table->getShare()->getSchemaName(), table->table->getShare()->getTableName());
1830
 
      table::Cache::singleton().removeTable(session, identifier, RTFC_WAIT_OTHER_THREAD_FLAG | RTFC_CHECK_KILLED_FLAG);
 
1839
      pthread_mutex_lock(&LOCK_open); /* Lock type is TL_WRITE and we lock to repair the table */
 
1840
      const char *old_message=session->enter_cond(&COND_refresh, &LOCK_open,
 
1841
                                              "Waiting to get writelock");
 
1842
      mysql_lock_abort(session,table->table);
 
1843
      remove_table_from_cache(session, table->table->s->getSchemaName(),
 
1844
                              table->table->s->table_name.str,
 
1845
                              RTFC_WAIT_OTHER_THREAD_FLAG |
 
1846
                              RTFC_CHECK_KILLED_FLAG);
1831
1847
      session->exit_cond(old_message);
1832
 
      if (session->getKilled())
 
1848
      if (session->killed)
1833
1849
        goto err;
1834
1850
      open_for_modify= 0;
1835
1851
    }
1916
1932
    if (table->table)
1917
1933
    {
1918
1934
      if (fatal_error)
1919
 
      {
1920
 
        table->table->getMutableShare()->resetVersion();               // Force close of table
1921
 
      }
 
1935
        table->table->s->version=0;               // Force close of table
1922
1936
      else if (open_for_modify)
1923
1937
      {
1924
 
        if (table->table->getShare()->getType())
1925
 
        {
 
1938
        if (table->table->s->tmp_table)
1926
1939
          table->table->cursor->info(HA_STATUS_CONST);
1927
 
        }
1928
1940
        else
1929
1941
        {
1930
 
          boost::unique_lock<boost::mutex> lock(table::Cache::singleton().mutex());
1931
 
          TableIdentifier identifier(table->table->getShare()->getSchemaName(), table->table->getShare()->getTableName());
1932
 
          table::Cache::singleton().removeTable(session, identifier, RTFC_NO_FLAG);
 
1942
          pthread_mutex_lock(&LOCK_open);
 
1943
          remove_table_from_cache(session, table->table->s->getSchemaName(),
 
1944
                                  table->table->s->table_name.str, RTFC_NO_FLAG);
 
1945
          pthread_mutex_unlock(&LOCK_open);
1933
1946
        }
1934
1947
      }
1935
1948
    }
1953
1966
  return(true);
1954
1967
}
1955
1968
 
 
1969
/*
 
1970
  We have to write the query before we unlock the named table.
 
1971
 
 
1972
  Since temporary tables are not replicated under row-based
 
1973
  replication, CREATE TABLE ... LIKE ... needs special
 
1974
  treatement.  We have four cases to consider, according to the
 
1975
  following decision table:
 
1976
 
 
1977
  ==== ========= ========= ==============================
 
1978
  Case    Target    Source Write to binary log
 
1979
  ==== ========= ========= ==============================
 
1980
  1       normal    normal Original statement
 
1981
  2       normal temporary Generated statement
 
1982
  3    temporary    normal Nothing
 
1983
  4    temporary temporary Nothing
 
1984
  ==== ========= ========= ==============================
 
1985
*/
 
1986
static bool replicateCreateTableLike(Session *session, TableList *table, Table *name_lock,
 
1987
                                     bool is_src_table_tmp, bool is_if_not_exists)
 
1988
{
 
1989
  if (is_src_table_tmp)
 
1990
  {
 
1991
    char buf[2048];
 
1992
    String query(buf, sizeof(buf), system_charset_info);
 
1993
    query.length(0);  // Have to zero it since constructor doesn't
 
1994
 
 
1995
 
 
1996
    /*
 
1997
      Here we open the destination table, on which we already have
 
1998
      name-lock. This is needed for store_create_info() to work.
 
1999
      The table will be closed by unlink_open_table() at the end
 
2000
      of this function.
 
2001
    */
 
2002
    table->table= name_lock;
 
2003
    pthread_mutex_lock(&LOCK_open); /* Open new table we have just acquired */
 
2004
    if (session->reopen_name_locked_table(table, false))
 
2005
    {
 
2006
      pthread_mutex_unlock(&LOCK_open);
 
2007
      return false;
 
2008
    }
 
2009
    pthread_mutex_unlock(&LOCK_open);
 
2010
 
 
2011
    int result= store_create_info(table, &query, is_if_not_exists);
 
2012
 
 
2013
    assert(result == 0); // store_create_info() always return 0
 
2014
    write_bin_log(session, query.ptr());
 
2015
  }
 
2016
  else                                      // Case 1
 
2017
  {
 
2018
    write_bin_log(session, session->query.c_str());
 
2019
  }
 
2020
 
 
2021
  return true;
 
2022
}
 
2023
 
1956
2024
  /*
1957
2025
    Create a new table by copying from source table
1958
2026
 
1959
2027
    Altough exclusive name-lock on target table protects us from concurrent
1960
2028
    DML and DDL operations on it we still want to wrap .FRM creation and call
1961
2029
    to plugin::StorageEngine::createTable() in critical section protected by
1962
 
    table::Cache::singleton().mutex() in order to provide minimal atomicity against operations which
 
2030
    LOCK_open in order to provide minimal atomicity against operations which
1963
2031
    disregard name-locks, like I_S implementation, for example. This is a
1964
2032
    temporary and should not be copied. Instead we should fix our code to
1965
2033
    always honor name-locks.
1966
2034
 
1967
 
    Also some engines (e.g. NDB cluster) require that table::Cache::singleton().mutex() should be held
 
2035
    Also some engines (e.g. NDB cluster) require that LOCK_open should be held
1968
2036
    during the call to plugin::StorageEngine::createTable().
1969
2037
    See bug #28614 for more info.
1970
2038
  */
1971
2039
static bool create_table_wrapper(Session &session, const message::Table& create_table_proto,
1972
 
                                 const TableIdentifier &destination_identifier,
1973
 
                                 const TableIdentifier &src_table,
 
2040
                                 TableIdentifier &destination_identifier,
 
2041
                                 TableIdentifier &src_table,
1974
2042
                                 bool is_engine_set)
1975
2043
{
1976
2044
  int protoerr= EEXIST;
1977
2045
  message::Table new_proto;
1978
 
  message::table::shared_ptr src_proto;
 
2046
  message::Table src_proto;
1979
2047
 
1980
2048
  protoerr= plugin::StorageEngine::getTableDefinition(session,
1981
2049
                                                      src_table,
1982
2050
                                                      src_proto);
1983
 
  new_proto.CopyFrom(*src_proto);
 
2051
  new_proto.CopyFrom(src_proto);
1984
2052
 
1985
2053
  if (destination_identifier.isTmp())
1986
2054
  {
1993
2061
 
1994
2062
  if (is_engine_set)
1995
2063
  {
1996
 
    new_proto.mutable_engine()->set_name(create_table_proto.engine().name());
 
2064
    message::Table::StorageEngine *protoengine;
 
2065
 
 
2066
    protoengine= new_proto.mutable_engine();
 
2067
    protoengine->set_name(create_table_proto.engine().name());
1997
2068
  }
1998
2069
 
1999
2070
  { // We now do a selective copy of elements on to the new table.
2019
2090
  */
2020
2091
  int err= plugin::StorageEngine::createTable(session,
2021
2092
                                              destination_identifier,
2022
 
                                              new_proto);
2023
 
 
2024
 
  if (err == false && not destination_identifier.isTmp())
2025
 
  {
2026
 
    TransactionServices &transaction_services= TransactionServices::singleton();
2027
 
    transaction_services.createTable(&session, new_proto);
2028
 
  }
 
2093
                                              true, new_proto);
2029
2094
 
2030
2095
  return err ? false : true;
2031
2096
}
2046
2111
*/
2047
2112
 
2048
2113
bool mysql_create_like_table(Session* session,
2049
 
                             const TableIdentifier &destination_identifier,
 
2114
                             TableIdentifier &destination_identifier,
2050
2115
                             TableList* table, TableList* src_table,
2051
2116
                             message::Table &create_table_proto,
2052
2117
                             bool is_if_not_exists,
2053
2118
                             bool is_engine_set)
2054
2119
{
 
2120
  Table *name_lock= 0;
 
2121
  char *db= table->db;
 
2122
  char *table_name= table->table_name;
2055
2123
  bool res= true;
2056
2124
  uint32_t not_used;
 
2125
  bool was_created;
2057
2126
 
2058
2127
  /*
2059
2128
    By opening source table we guarantee that it exists and no concurrent
2067
2136
  if (session->open_tables_from_list(&src_table, &not_used))
2068
2137
    return true;
2069
2138
 
2070
 
  TableIdentifier src_identifier(src_table->table->getShare()->getSchemaName(),
2071
 
                                 src_table->table->getShare()->getTableName(), src_table->table->getShare()->getType());
 
2139
  TableIdentifier src_identifier(src_table->table->s->getSchemaName(),
 
2140
                                 src_table->table->s->table_name.str, src_table->table->s->tmp_table);
2072
2141
 
2073
2142
 
2074
2143
 
2075
2144
  /*
2076
2145
    Check that destination tables does not exist. Note that its name
2077
2146
    was already checked when it was added to the table list.
2078
 
 
2079
 
    For temporary tables we don't aim to grab locks.
2080
2147
  */
2081
2148
  bool table_exists= false;
2082
2149
  if (destination_identifier.isTmp())
2083
2150
  {
2084
 
    if (session->find_temporary_table(destination_identifier))
 
2151
    if (session->find_temporary_table(db, table_name))
2085
2152
    {
2086
2153
      table_exists= true;
2087
2154
    }
2088
 
    else
2089
 
    {
2090
 
      bool was_created= create_table_wrapper(*session, create_table_proto, destination_identifier,
2091
 
                                             src_identifier, is_engine_set);
2092
 
      if (not was_created) // This is pretty paranoid, but we assume something might not clean up after itself
2093
 
      {
2094
 
        (void) session->rm_temporary_table(destination_identifier, true);
2095
 
      }
2096
 
      else if (not session->open_temporary_table(destination_identifier))
2097
 
      {
2098
 
        // We created, but we can't open... also, a hack.
2099
 
        (void) session->rm_temporary_table(destination_identifier, true);
2100
 
      }
2101
 
      else
2102
 
      {
2103
 
        res= false;
2104
 
      }
2105
 
    }
2106
2155
  }
2107
 
  else // Standard table which will require locks.
 
2156
  else
2108
2157
  {
2109
 
    Table *name_lock= 0;
2110
 
 
2111
 
    if (session->lock_table_name_if_not_cached(destination_identifier, &name_lock))
 
2158
    if (session->lock_table_name_if_not_cached(db, table_name, &name_lock))
2112
2159
    {
2113
2160
      if (name_lock)
2114
2161
      {
2115
 
        boost_unique_lock_t lock(table::Cache::singleton().mutex()); /* unlink open tables for create table like*/
 
2162
        pthread_mutex_lock(&LOCK_open); /* unlink open tables for create table like*/
2116
2163
        session->unlink_open_table(name_lock);
 
2164
        pthread_mutex_unlock(&LOCK_open);
2117
2165
      }
2118
2166
 
2119
2167
      return res;
2127
2175
    {
2128
2176
      table_exists= true;
2129
2177
    }
2130
 
    else // Otherwise we create the table
2131
 
    {
2132
 
      bool was_created;
2133
 
      {
2134
 
        boost_unique_lock_t lock(table::Cache::singleton().mutex()); /* We lock for CREATE TABLE LIKE to copy table definition */
2135
 
        was_created= create_table_wrapper(*session, create_table_proto, destination_identifier,
2136
 
                                               src_identifier, is_engine_set);
2137
 
      }
2138
 
 
2139
 
      // So we blew the creation of the table, and we scramble to clean up
2140
 
      // anything that might have been created (read... it is a hack)
2141
 
      if (not was_created)
2142
 
      {
2143
 
        plugin::StorageEngine::dropTable(*session, destination_identifier);
2144
 
      } 
2145
 
      else
2146
 
      {
2147
 
        res= false;
2148
 
      }
2149
 
    }
2150
 
 
2151
 
    if (name_lock)
2152
 
    {
2153
 
      boost_unique_lock_t lock(table::Cache::singleton().mutex()); /* unlink open tables for create table like*/
2154
 
      session->unlink_open_table(name_lock);
2155
 
    }
2156
2178
  }
2157
2179
 
2158
2180
  if (table_exists)
2161
2183
    {
2162
2184
      char warn_buff[DRIZZLE_ERRMSG_SIZE];
2163
2185
      snprintf(warn_buff, sizeof(warn_buff),
2164
 
               ER(ER_TABLE_EXISTS_ERROR), table->getTableName());
 
2186
               ER(ER_TABLE_EXISTS_ERROR), table_name);
2165
2187
      push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
2166
2188
                   ER_TABLE_EXISTS_ERROR,warn_buff);
2167
2189
      res= false;
2168
2190
    }
2169
2191
    else
2170
2192
    {
2171
 
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table->getTableName());
2172
 
    }
 
2193
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table_name);
 
2194
    }
 
2195
  }
 
2196
  else // Otherwise we create the table
 
2197
  {
 
2198
    pthread_mutex_lock(&LOCK_open); /* We lock for CREATE TABLE LIKE to copy table definition */
 
2199
    was_created= create_table_wrapper(*session, create_table_proto, destination_identifier,
 
2200
                                      src_identifier, is_engine_set);
 
2201
    pthread_mutex_unlock(&LOCK_open);
 
2202
 
 
2203
    // So we blew the creation of the table, and we scramble to clean up
 
2204
    // anything that might have been created (read... it is a hack)
 
2205
    if (not was_created)
 
2206
    {
 
2207
      if (destination_identifier.isTmp())
 
2208
      {
 
2209
        (void) session->rm_temporary_table(destination_identifier);
 
2210
      }
 
2211
      else
 
2212
      {
 
2213
        quick_rm_table(*session, destination_identifier);
 
2214
      }
 
2215
    } 
 
2216
    else if (destination_identifier.isTmp() && not session->open_temporary_table(destination_identifier))
 
2217
    {
 
2218
      // We created, but we can't open... also, a hack.
 
2219
      (void) session->rm_temporary_table(destination_identifier);
 
2220
    }
 
2221
    else
 
2222
    {
 
2223
      if (not destination_identifier.isTmp())
 
2224
      {
 
2225
        bool rc= replicateCreateTableLike(session, table, name_lock, (src_table->table->s->tmp_table), is_if_not_exists);
 
2226
        (void)rc;
 
2227
      }
 
2228
 
 
2229
      res= false;
 
2230
    }
 
2231
  }
 
2232
 
 
2233
  if (name_lock)
 
2234
  {
 
2235
    pthread_mutex_lock(&LOCK_open); /* unlink open tables for create table like*/
 
2236
    session->unlink_open_table(name_lock);
 
2237
    pthread_mutex_unlock(&LOCK_open);
2173
2238
  }
2174
2239
 
2175
2240
  return(res);