~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbxt/src/discover_xt.cc

lp:drizzle + pbxt 1.1 + test results

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 * Derived from code Copyright (C) 2000-2004 MySQL AB
 
3
 *
 
4
 * PrimeBase XT
 
5
 *
 
6
 * This program is free software; you can redistribute it and/or modify
 
7
 * it under the terms of the GNU General Public License as published by
 
8
 * the Free Software Foundation; either version 2 of the License, or
 
9
 * (at your option) any later version.
 
10
 *
 
11
 * This program is distributed in the hope that it will be useful,
 
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 * GNU General Public License for more details.
 
15
 *
 
16
 * You should have received a copy of the GNU General Public License
 
17
 * along with this program; if not, write to the Free Software
 
18
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
19
 *
 
20
 *  Created by Leslie on 8/27/08.
 
21
 *
 
22
 */
 
23
 
 
24
#include "xt_config.h"
 
25
 
 
26
#ifndef DRIZZLED
 
27
#include "mysql_priv.h"
 
28
#include "item_create.h"
 
29
#include <m_ctype.h>
 
30
#else
 
31
#include <drizzled/session.h>
 
32
#include <drizzled/sql_base.h>
 
33
#include <drizzled/statement/alter_table.h>
 
34
#include <drizzled/sql_table.h>
 
35
#include <algorithm>
 
36
#include <sstream>
 
37
extern pid_t drizzled::current_pid;
 
38
namespace drizzled { 
 
39
namespace internal {
 
40
size_t unpack_filename(char * to, const char *from);
 
41
}
 
42
}
 
43
using namespace drizzled;
 
44
using namespace drizzled::internal;
 
45
#endif
 
46
 
 
47
#include "strutil_xt.h"
 
48
#include "ha_pbxt.h"
 
49
#include "discover_xt.h"
 
50
#include "ha_xtsys.h"
 
51
 
 
52
#ifndef DRIZZLED
 
53
#if MYSQL_VERSION_ID >= 50404
 
54
#define DOT_STR(x)                      x.str
 
55
#else
 
56
#define DOT_STR(x)                      x
 
57
#endif
 
58
#endif
 
59
 
 
60
//#ifndef DRIZZLED
 
61
#define LOCK_OPEN_HACK_REQUIRED
 
62
//#endif // DRIZZLED
 
63
 
 
64
#ifdef LOCK_OPEN_HACK_REQUIRED
 
65
#ifdef DRIZZLED
 
66
 
 
67
using namespace drizzled;
 
68
using namespace std;
 
69
 
 
70
#define mysql_create_table_no_lock hacked_mysql_create_table_no_lock
 
71
 
 
72
namespace drizzled {
 
73
 
 
74
int rea_create_table(Session *session,
 
75
                     TableIdentifier &identifier,
 
76
                     message::Table &table_proto,
 
77
                     HA_CREATE_INFO *create_info,
 
78
                     List<CreateField> &create_field,
 
79
                     uint32_t key_count,KEY *key_info);
 
80
 
 
81
int mysql_prepare_create_table(Session *session,
 
82
                                      HA_CREATE_INFO *create_info,
 
83
                                      message::Table &create_proto,
 
84
                                      AlterInfo *alter_info,
 
85
                                      bool tmp_table,
 
86
                                      uint32_t *db_options,
 
87
                                      KEY **key_info_buffer,
 
88
                                      uint32_t *key_count,
 
89
                                      int select_field_count);
 
90
 
 
91
}
 
92
 
 
93
static uint32_t build_tmptable_filename(Session* session,
 
94
                                        char *buff, size_t bufflen)
 
95
{
 
96
  uint32_t length;
 
97
  ostringstream path_str, post_tmpdir_str;
 
98
  string tmp;
 
99
 
 
100
  path_str << drizzle_tmpdir;
 
101
  post_tmpdir_str << "/" << TMP_FILE_PREFIX << current_pid;
 
102
  post_tmpdir_str << session->thread_id << session->tmp_table++;
 
103
  tmp= post_tmpdir_str.str();
 
104
 
 
105
  transform(tmp.begin(), tmp.end(), tmp.begin(), ::tolower);
 
106
 
 
107
  path_str << tmp;
 
108
 
 
109
  if (bufflen < path_str.str().length())
 
110
    length= 0;
 
111
  else
 
112
    length= unpack_filename(buff, path_str.str().c_str());
 
113
 
 
114
  return length;
 
115
}
 
116
 
 
117
static bool mysql_create_table_no_lock(Session *session,
 
118
                                const char *db, const char *table_name,
 
119
                                HA_CREATE_INFO *create_info,
 
120
                                message::Table *table_proto,
 
121
                                AlterInfo *alter_info,
 
122
                                bool internal_tmp_table,
 
123
                                uint32_t select_field_count,
 
124
                                bool skip_existing)
 
125
{
 
126
  char          path[FN_REFLEN];
 
127
  uint32_t          path_length;
 
128
  uint          db_options, key_count;
 
129
  KEY           *key_info_buffer;
 
130
  Cursor        *file;
 
131
  bool          error= true;
 
132
  /* Check for duplicate fields and check type of table to create */
 
133
  if (!alter_info->create_list.elements)
 
134
  {
 
135
    my_message(ER_TABLE_MUST_HAVE_COLUMNS, ER(ER_TABLE_MUST_HAVE_COLUMNS),
 
136
               MYF(0));
 
137
    return true;
 
138
  }
 
139
  assert(strcmp(table_name,table_proto->name().c_str())==0);
 
140
  //if (check_engine(session, table_name, table_proto, create_info))
 
141
  //  return true;
 
142
  db_options= create_info->table_options;
 
143
  if (create_info->row_type == ROW_TYPE_DYNAMIC)
 
144
    db_options|=HA_OPTION_PACK_RECORD;
 
145
  
 
146
  /*if (!(file= create_info->db_type->getCursor((TableShare*) 0, session->mem_root)))
 
147
  {
 
148
    my_error(ER_OUTOFMEMORY, MYF(0), sizeof(Cursor));
 
149
    return true;
 
150
  }*/
 
151
 
 
152
  TableIdentifier ident(db, table_name, table_proto->type());
 
153
 
 
154
  /* PMC - Done to avoid getting the partition handler by mistake! */
 
155
  if (!(file= new (session->mem_root) ha_xtsys(pbxt_hton, *TableShare::getShare(ident))))
 
156
  {
 
157
    my_error(ER_OUTOFMEMORY, MYF(0), sizeof(Cursor));
 
158
    return true;
 
159
  }
 
160
 
 
161
  set_table_default_charset(create_info, (char*) db);
 
162
 
 
163
  if (mysql_prepare_create_table(session, 
 
164
                                 create_info,
 
165
                                 *table_proto,
 
166
                                 alter_info,
 
167
                                 internal_tmp_table,
 
168
                                 &db_options,
 
169
                                 &key_info_buffer, &key_count,
 
170
                                 select_field_count))
 
171
    goto err;
 
172
 
 
173
      /* Check if table exists */
 
174
  if (table_proto->type() == message::Table::TEMPORARY)
 
175
  {
 
176
    path_length= build_tmptable_filename(session, path, sizeof(path));
 
177
  }
 
178
  else
 
179
  {
 
180
 #ifdef FN_DEVCHAR
 
181
    /* check if the table name contains FN_DEVCHAR when defined */
 
182
    if (strchr(table_name, FN_DEVCHAR))
 
183
    {
 
184
      my_error(ER_WRONG_TABLE_NAME, MYF(0), table_name);
 
185
      return true;
 
186
    }
 
187
#endif
 
188
    std::string path_str;
 
189
    path_length= build_table_filename(path_str, db, table_name, internal_tmp_table);
 
190
  }
 
191
 
 
192
  /* Check if table already exists */
 
193
  if ((table_proto->type() == message::Table::TEMPORARY) &&
 
194
      session->find_temporary_table(db, table_name))
 
195
  {
 
196
    if (skip_existing)
 
197
    {
 
198
      create_info->table_existed= 1;            // Mark that table existed
 
199
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
200
                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
 
201
                          table_name);
 
202
      error= 0;
 
203
      goto err;
 
204
    }
 
205
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table_name);
 
206
    goto err;
 
207
  }
 
208
 
 
209
  //pthread_mutex_lock(&LOCK_open); /* CREATE TABLE (some confussion on naming, double check) */
 
210
  if (!internal_tmp_table && table_proto->type() != message::Table::TEMPORARY)
 
211
  {
 
212
    if (plugin::StorageEngine::getTableDefinition(*session,
 
213
                                                  ident, 
 
214
                                                  *table_proto,
 
215
                                                  false) == EEXIST)
 
216
    {
 
217
      if (skip_existing)
 
218
      {
 
219
        error= false;
 
220
        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
221
                            ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
 
222
                            table_name);
 
223
        create_info->table_existed= 1;          // Mark that table existed
 
224
      }
 
225
      else
 
226
        my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
 
227
 
 
228
      goto unlock_and_end;
 
229
    }
 
230
    /*
 
231
 *       We don't assert here, but check the result, because the table could be
 
232
 *             in the table definition cache and in the same time the .frm could be
 
233
 *                   missing from the disk, in case of manual intervention which deletes
 
234
 *                         the .frm file. The user has to use FLUSH TABLES; to clear the cache.
 
235
 *                               Then she could create the table. This case is pretty obscure and
 
236
 *                                     therefore we don't introduce a new error message only for it.
 
237
 *                                         */
 
238
    if (TableShare::getShare(ident))
 
239
    {
 
240
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table_name);
 
241
      goto unlock_and_end;
 
242
    }
 
243
  }
 
244
  /*
 
245
 *     Check that table with given name does not already
 
246
 *         exist in any storage engine. In such a case it should
 
247
 *             be discovered and the error ER_TABLE_EXISTS_ERROR be returned
 
248
 *                 unless user specified CREATE TABLE IF EXISTS
 
249
 *                     The LOCK_open mutex has been locked to make sure no
 
250
 *                         one else is attempting to discover the table. Since
 
251
 *                             it's not on disk as a frm file, no one could be using it!
 
252
 *                               */
 
253
  if (table_proto->type() != message::Table::TEMPORARY)
 
254
  {
 
255
    bool create_if_not_exists = skip_existing;
 
256
 
 
257
    std::string path_str;
 
258
    uint32_t          table_path_length;
 
259
 
 
260
    table_path_length= build_table_filename(path_str, db, table_name, false);
 
261
 
 
262
    int retcode= plugin::StorageEngine::getTableDefinition(*session,
 
263
                                                           ident, 
 
264
                                                           *table_proto,
 
265
                                                           false);
 
266
    switch (retcode)
 
267
    {
 
268
      case ENOENT:
 
269
        /* Normal case, no table exists. we can go and create it */
 
270
        break;
 
271
      case EEXIST:
 
272
        if (create_if_not_exists)
 
273
        {
 
274
          error= false;
 
275
          push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
276
                              ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
 
277
                              table_name);
 
278
          create_info->table_existed= 1;                // Mark that table existed
 
279
          goto unlock_and_end;
 
280
        }
 
281
        my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
 
282
        goto unlock_and_end;
 
283
      default:
 
284
        my_error(retcode, MYF(0),table_name);
 
285
        goto unlock_and_end;
 
286
    }
 
287
  }
 
288
 
 
289
  session->set_proc_info("creating table");
 
290
  create_info->table_existed= 0;                // Mark that table is created
 
291
 
 
292
  create_info->table_options=db_options;
 
293
 
 
294
  if (rea_create_table(session, ident,
 
295
                       *table_proto,
 
296
                       create_info, alter_info->create_list,
 
297
                       key_count, key_info_buffer))
 
298
    goto unlock_and_end;
 
299
 
 
300
  if (table_proto->type() == message::Table::TEMPORARY)
 
301
  {
 
302
    /* Open table and put in temporary table list */
 
303
    if (!(session->open_temporary_table(ident)))
 
304
    {
 
305
      (void) session->rm_temporary_table(ident);
 
306
      goto unlock_and_end;
 
307
    }
 
308
  }
 
309
 
 
310
  /*
 
311
 *     Don't write statement if:
 
312
 *         - It is an internal temporary table,
 
313
 *             - Row-based logging is used and it we are creating a temporary table, or
 
314
 *                 - The binary log is not open.
 
315
 *                     Otherwise, the statement shall be binlogged.
 
316
 *                        */
 
317
  if (!internal_tmp_table &&
 
318
      ((table_proto->type() != message::Table::TEMPORARY)))
 
319
    write_bin_log(session, session->getQueryString().c_str());
 
320
  error= false;
 
321
unlock_and_end:
 
322
  //pthread_mutex_unlock(&LOCK_open);
 
323
 
 
324
err:
 
325
  session->set_proc_info("After create");
 
326
  delete file;
 
327
  return(error);
 
328
}
 
329
 
 
330
#else // MySQL case
 
331
///////////////////////////////
 
332
/*
 
333
 * Unfortunately I cannot use the standard mysql_create_table_no_lock() because it will lock "LOCK_open"
 
334
 * which has already been locked while the server is performing table discovery. So I have added this hack 
 
335
 * in here to create my own version. The following macros will make the changes I need to get it to work.
 
336
 * The actual function code has been copied here without changes.
 
337
 *
 
338
 * Its almost enough to make you want to cry. :(
 
339
*/
 
340
//-----------------------------
 
341
 
 
342
#ifdef pthread_mutex_lock
 
343
#undef pthread_mutex_lock
 
344
#endif
 
345
 
 
346
#ifdef pthread_mutex_unlock
 
347
#undef pthread_mutex_unlock
 
348
#endif
 
349
 
 
350
#define mysql_create_table_no_lock hacked_mysql_create_table_no_lock
 
351
#define pthread_mutex_lock(l)
 
352
#define pthread_mutex_unlock(l)
 
353
 
 
354
#define check_engine(t, n, c) (0)
 
355
#define set_table_default_charset(t, c, d)
 
356
 
 
357
void calculate_interval_lengths(CHARSET_INFO *cs, TYPELIB *interval,
 
358
                                uint32 *max_length, uint32 *tot_length);
 
359
 
 
360
uint build_tmptable_filename(THD* thd, char *buff, size_t bufflen);
 
361
uint build_table_filename(char *buff, size_t bufflen, const char *db,
 
362
                          const char *table_name, const char *ext, uint flags);
 
363
 
 
364
//////////////////////////////////////////////////////////
 
365
////// START OF CUT AND PASTES FROM  sql_table.cc ////////
 
366
//////////////////////////////////////////////////////////
 
367
 
 
368
// sort_keys() cut and pasted directly from sql_table.cc. 
 
369
static int sort_keys(KEY *a, KEY *b)
 
370
{
 
371
  ulong a_flags= a->flags, b_flags= b->flags;
 
372
  
 
373
  if (a_flags & HA_NOSAME)
 
374
  {
 
375
    if (!(b_flags & HA_NOSAME))
 
376
      return -1;
 
377
    if ((a_flags ^ b_flags) & HA_NULL_PART_KEY)
 
378
    {
 
379
      /* Sort NOT NULL keys before other keys */
 
380
      return (a_flags & HA_NULL_PART_KEY) ? 1 : -1;
 
381
    }
 
382
    if (a->name == primary_key_name)
 
383
      return -1;
 
384
    if (b->name == primary_key_name)
 
385
      return 1;
 
386
    /* Sort keys don't containing partial segments before others */
 
387
    if ((a_flags ^ b_flags) & HA_KEY_HAS_PART_KEY_SEG)
 
388
      return (a_flags & HA_KEY_HAS_PART_KEY_SEG) ? 1 : -1;
 
389
  }
 
390
  else if (b_flags & HA_NOSAME)
 
391
    return 1;                                   // Prefer b
 
392
 
 
393
  if ((a_flags ^ b_flags) & HA_FULLTEXT)
 
394
  {
 
395
    return (a_flags & HA_FULLTEXT) ? 1 : -1;
 
396
  }
 
397
  /*
 
398
    Prefer original key order.  usable_key_parts contains here
 
399
    the original key position.
 
400
  */
 
401
  return ((a->usable_key_parts < b->usable_key_parts) ? -1 :
 
402
          (a->usable_key_parts > b->usable_key_parts) ? 1 :
 
403
          0);
 
404
}
 
405
 
 
406
// check_if_keyname_exists() cut and pasted directly from sql_table.cc. 
 
407
static bool
 
408
check_if_keyname_exists(const char *name, KEY *start, KEY *end)
 
409
{
 
410
  for (KEY *key=start ; key != end ; key++)
 
411
    if (!my_strcasecmp(system_charset_info,name,key->name))
 
412
      return 1;
 
413
  return 0;
 
414
}
 
415
 
 
416
// make_unique_key_name() cut and pasted directly from sql_table.cc. 
 
417
static char *
 
418
make_unique_key_name(const char *field_name,KEY *start,KEY *end)
 
419
{
 
420
  char buff[MAX_FIELD_NAME],*buff_end;
 
421
 
 
422
  if (!check_if_keyname_exists(field_name,start,end) &&
 
423
      my_strcasecmp(system_charset_info,field_name,primary_key_name))
 
424
    return (char*) field_name;                  // Use fieldname
 
425
  buff_end=strmake(buff,field_name, sizeof(buff)-4);
 
426
 
 
427
  /*
 
428
    Only 3 chars + '\0' left, so need to limit to 2 digit
 
429
    This is ok as we can't have more than 100 keys anyway
 
430
  */
 
431
  for (uint i=2 ; i< 100; i++)
 
432
  {
 
433
    *buff_end= '_';
 
434
    int10_to_str(i, buff_end+1, 10);
 
435
    if (!check_if_keyname_exists(buff,start,end))
 
436
      return sql_strdup(buff);
 
437
  }
 
438
  return (char*) "not_specified";               // Should never happen
 
439
}
 
440
 
 
441
 
 
442
// prepare_blob_field() cut and pasted directly from sql_table.cc. 
 
443
static bool prepare_blob_field(THD *thd, Create_field *sql_field)
 
444
{
 
445
  DBUG_ENTER("prepare_blob_field");
 
446
 
 
447
  if (sql_field->length > MAX_FIELD_VARCHARLENGTH &&
 
448
      !(sql_field->flags & BLOB_FLAG))
 
449
  {
 
450
    /* Convert long VARCHAR columns to TEXT or BLOB */
 
451
    char warn_buff[MYSQL_ERRMSG_SIZE];
 
452
 
 
453
    if (sql_field->def || (thd->variables.sql_mode & (MODE_STRICT_TRANS_TABLES |
 
454
                                                      MODE_STRICT_ALL_TABLES)))
 
455
    {
 
456
      my_error(ER_TOO_BIG_FIELDLENGTH, MYF(0), sql_field->field_name,
 
457
               MAX_FIELD_VARCHARLENGTH / sql_field->charset->mbmaxlen);
 
458
      DBUG_RETURN(1);
 
459
    }
 
460
    sql_field->sql_type= MYSQL_TYPE_BLOB;
 
461
    sql_field->flags|= BLOB_FLAG;
 
462
    sprintf(warn_buff, ER(ER_AUTO_CONVERT), sql_field->field_name,
 
463
            (sql_field->charset == &my_charset_bin) ? "VARBINARY" : "VARCHAR",
 
464
            (sql_field->charset == &my_charset_bin) ? "BLOB" : "TEXT");
 
465
    push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_AUTO_CONVERT,
 
466
                 warn_buff);
 
467
  }
 
468
    
 
469
  if ((sql_field->flags & BLOB_FLAG) && sql_field->length)
 
470
  {
 
471
    if (sql_field->sql_type == MYSQL_TYPE_BLOB)
 
472
    {
 
473
      /* The user has given a length to the blob column */
 
474
      sql_field->sql_type= get_blob_type_from_length(sql_field->length);
 
475
      sql_field->pack_length= calc_pack_length(sql_field->sql_type, 0);
 
476
    }
 
477
    sql_field->length= 0;
 
478
  }
 
479
  DBUG_RETURN(0);
 
480
}
 
481
 
 
482
//////////////////////////////
 
483
// mysql_prepare_create_table() cut and pasted directly from sql_table.cc.
 
484
static int
 
485
mysql_prepare_create_table(THD *thd, HA_CREATE_INFO *create_info,
 
486
                           Alter_info *alter_info,
 
487
                           bool tmp_table,
 
488
                           uint *db_options,
 
489
                           handler *file, KEY **key_info_buffer,
 
490
                           uint *key_count, int select_field_count)
 
491
{
 
492
  const char    *key_name;
 
493
  Create_field  *sql_field,*dup_field;
 
494
  uint          field,null_fields,blob_columns,max_key_length;
 
495
  ulong         record_offset= 0;
 
496
  KEY           *key_info;
 
497
  KEY_PART_INFO *key_part_info;
 
498
  int           timestamps= 0, timestamps_with_niladic= 0;
 
499
  int           field_no,dup_no;
 
500
  int           select_field_pos,auto_increment=0;
 
501
  List_iterator<Create_field> it(alter_info->create_list);
 
502
  List_iterator<Create_field> it2(alter_info->create_list);
 
503
  uint total_uneven_bit_length= 0;
 
504
  DBUG_ENTER("mysql_prepare_create_table");
 
505
 
 
506
  select_field_pos= alter_info->create_list.elements - select_field_count;
 
507
  null_fields=blob_columns=0;
 
508
  create_info->varchar= 0;
 
509
  max_key_length= file->max_key_length();
 
510
 
 
511
  for (field_no=0; (sql_field=it++) ; field_no++)
 
512
  {
 
513
    CHARSET_INFO *save_cs;
 
514
 
 
515
    /*
 
516
      Initialize length from its original value (number of characters),
 
517
      which was set in the parser. This is necessary if we're
 
518
      executing a prepared statement for the second time.
 
519
    */
 
520
    sql_field->length= sql_field->char_length;
 
521
    if (!sql_field->charset)
 
522
      sql_field->charset= create_info->default_table_charset;
 
523
    /*
 
524
      table_charset is set in ALTER TABLE if we want change character set
 
525
      for all varchar/char columns.
 
526
      But the table charset must not affect the BLOB fields, so don't
 
527
      allow to change my_charset_bin to somethig else.
 
528
    */
 
529
    if (create_info->table_charset && sql_field->charset != &my_charset_bin)
 
530
      sql_field->charset= create_info->table_charset;
 
531
 
 
532
    save_cs= sql_field->charset;
 
533
    if ((sql_field->flags & BINCMP_FLAG) &&
 
534
        !(sql_field->charset= get_charset_by_csname(sql_field->charset->csname,
 
535
                                                    MY_CS_BINSORT,MYF(0))))
 
536
    {
 
537
      char tmp[64];
 
538
      strmake(strmake(tmp, save_cs->csname, sizeof(tmp)-4),
 
539
              STRING_WITH_LEN("_bin"));
 
540
      my_error(ER_UNKNOWN_COLLATION, MYF(0), tmp);
 
541
      DBUG_RETURN(TRUE);
 
542
    }
 
543
 
 
544
    /*
 
545
      Convert the default value from client character
 
546
      set into the column character set if necessary.
 
547
    */
 
548
    if (sql_field->def && 
 
549
        save_cs != sql_field->def->collation.collation &&
 
550
        (sql_field->sql_type == MYSQL_TYPE_VAR_STRING ||
 
551
         sql_field->sql_type == MYSQL_TYPE_STRING ||
 
552
         sql_field->sql_type == MYSQL_TYPE_SET ||
 
553
         sql_field->sql_type == MYSQL_TYPE_ENUM))
 
554
    {
 
555
      /*
 
556
        Starting from 5.1 we work here with a copy of Create_field
 
557
        created by the caller, not with the instance that was
 
558
        originally created during parsing. It's OK to create
 
559
        a temporary item and initialize with it a member of the
 
560
        copy -- this item will be thrown away along with the copy
 
561
        at the end of execution, and thus not introduce a dangling
 
562
        pointer in the parsed tree of a prepared statement or a
 
563
        stored procedure statement.
 
564
      */
 
565
      sql_field->def= sql_field->def->safe_charset_converter(save_cs);
 
566
 
 
567
      if (sql_field->def == NULL)
 
568
      {
 
569
        /* Could not convert */
 
570
        my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
 
571
        DBUG_RETURN(TRUE);
 
572
      }
 
573
    }
 
574
 
 
575
    if (sql_field->sql_type == MYSQL_TYPE_SET ||
 
576
        sql_field->sql_type == MYSQL_TYPE_ENUM)
 
577
    {
 
578
      uint32 dummy;
 
579
      CHARSET_INFO *cs= sql_field->charset;
 
580
      TYPELIB *interval= sql_field->interval;
 
581
 
 
582
      /*
 
583
        Create typelib from interval_list, and if necessary
 
584
        convert strings from client character set to the
 
585
        column character set.
 
586
      */
 
587
      if (!interval)
 
588
      {
 
589
        /*
 
590
          Create the typelib in runtime memory - we will free the
 
591
          occupied memory at the same time when we free this
 
592
          sql_field -- at the end of execution.
 
593
        */
 
594
        interval= sql_field->interval= typelib(thd->mem_root,
 
595
                                               sql_field->interval_list);
 
596
        List_iterator<String> int_it(sql_field->interval_list);
 
597
        String conv, *tmp;
 
598
        char comma_buf[2];
 
599
        int comma_length= cs->cset->wc_mb(cs, ',', (uchar*) comma_buf,
 
600
                                          (uchar*) comma_buf + 
 
601
                                          sizeof(comma_buf));
 
602
        DBUG_ASSERT(comma_length > 0);
 
603
        for (uint i= 0; (tmp= int_it++); i++)
 
604
        {
 
605
          uint lengthsp;
 
606
          if (String::needs_conversion(tmp->length(), tmp->charset(),
 
607
                                       cs, &dummy))
 
608
          {
 
609
            uint cnv_errs;
 
610
            conv.copy(tmp->ptr(), tmp->length(), tmp->charset(), cs, &cnv_errs);
 
611
            interval->type_names[i]= strmake_root(thd->mem_root, conv.ptr(),
 
612
                                                  conv.length());
 
613
            interval->type_lengths[i]= conv.length();
 
614
          }
 
615
 
 
616
          // Strip trailing spaces.
 
617
          lengthsp= cs->cset->lengthsp(cs, interval->type_names[i],
 
618
                                       interval->type_lengths[i]);
 
619
          interval->type_lengths[i]= lengthsp;
 
620
          ((uchar *)interval->type_names[i])[lengthsp]= '\0';
 
621
          if (sql_field->sql_type == MYSQL_TYPE_SET)
 
622
          {
 
623
            if (cs->coll->instr(cs, interval->type_names[i], 
 
624
                                interval->type_lengths[i], 
 
625
                                comma_buf, comma_length, NULL, 0))
 
626
            {
 
627
              my_error(ER_ILLEGAL_VALUE_FOR_TYPE, MYF(0), "set", tmp->ptr());
 
628
              DBUG_RETURN(TRUE);
 
629
            }
 
630
          }
 
631
        }
 
632
        sql_field->interval_list.empty(); // Don't need interval_list anymore
 
633
      }
 
634
 
 
635
      if (sql_field->sql_type == MYSQL_TYPE_SET)
 
636
      {
 
637
        uint32 field_length;
 
638
        if (sql_field->def != NULL)
 
639
        {
 
640
          char *not_used;
 
641
          uint not_used2;
 
642
          bool not_found= 0;
 
643
          String str, *def= sql_field->def->val_str(&str);
 
644
          if (def == NULL) /* SQL "NULL" maps to NULL */
 
645
          {
 
646
            if ((sql_field->flags & NOT_NULL_FLAG) != 0)
 
647
            {
 
648
              my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
 
649
              DBUG_RETURN(TRUE);
 
650
            }
 
651
 
 
652
            /* else, NULL is an allowed value */
 
653
            (void) find_set(interval, NULL, 0,
 
654
                            cs, &not_used, &not_used2, &not_found);
 
655
          }
 
656
          else /* not NULL */
 
657
          {
 
658
            (void) find_set(interval, def->ptr(), def->length(),
 
659
                            cs, &not_used, &not_used2, &not_found);
 
660
          }
 
661
 
 
662
          if (not_found)
 
663
          {
 
664
            my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
 
665
            DBUG_RETURN(TRUE);
 
666
          }
 
667
        }
 
668
        calculate_interval_lengths(cs, interval, &dummy, &field_length);
 
669
        sql_field->length= field_length + (interval->count - 1);
 
670
      }
 
671
      else  /* MYSQL_TYPE_ENUM */
 
672
      {
 
673
        uint32 field_length;
 
674
        DBUG_ASSERT(sql_field->sql_type == MYSQL_TYPE_ENUM);
 
675
        if (sql_field->def != NULL)
 
676
        {
 
677
          String str, *def= sql_field->def->val_str(&str);
 
678
          if (def == NULL) /* SQL "NULL" maps to NULL */
 
679
          {
 
680
            if ((sql_field->flags & NOT_NULL_FLAG) != 0)
 
681
            {
 
682
              my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
 
683
              DBUG_RETURN(TRUE);
 
684
            }
 
685
 
 
686
            /* else, the defaults yield the correct length for NULLs. */
 
687
          } 
 
688
          else /* not NULL */
 
689
          {
 
690
            def->length(cs->cset->lengthsp(cs, def->ptr(), def->length()));
 
691
            if (find_type2(interval, def->ptr(), def->length(), cs) == 0) /* not found */
 
692
            {
 
693
              my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
 
694
              DBUG_RETURN(TRUE);
 
695
            }
 
696
          }
 
697
        }
 
698
        calculate_interval_lengths(cs, interval, &field_length, &dummy);
 
699
        sql_field->length= field_length;
 
700
      }
 
701
      set_if_smaller(sql_field->length, MAX_FIELD_WIDTH-1);
 
702
    }
 
703
 
 
704
    if (sql_field->sql_type == MYSQL_TYPE_BIT)
 
705
    { 
 
706
      sql_field->pack_flag= FIELDFLAG_NUMBER;
 
707
      if (file->ha_table_flags() & HA_CAN_BIT_FIELD)
 
708
        total_uneven_bit_length+= sql_field->length & 7;
 
709
      else
 
710
        sql_field->pack_flag|= FIELDFLAG_TREAT_BIT_AS_CHAR;
 
711
    }
 
712
 
 
713
    sql_field->create_length_to_internal_length();
 
714
    if (prepare_blob_field(thd, sql_field))
 
715
      DBUG_RETURN(TRUE);
 
716
 
 
717
    if (!(sql_field->flags & NOT_NULL_FLAG))
 
718
      null_fields++;
 
719
 
 
720
    if (check_column_name(sql_field->field_name))
 
721
    {
 
722
      my_error(ER_WRONG_COLUMN_NAME, MYF(0), sql_field->field_name);
 
723
      DBUG_RETURN(TRUE);
 
724
    }
 
725
 
 
726
    /* Check if we have used the same field name before */
 
727
    for (dup_no=0; (dup_field=it2++) != sql_field; dup_no++)
 
728
    {
 
729
      if (my_strcasecmp(system_charset_info,
 
730
                        sql_field->field_name,
 
731
                        dup_field->field_name) == 0)
 
732
      {
 
733
        /*
 
734
          If this was a CREATE ... SELECT statement, accept a field
 
735
          redefinition if we are changing a field in the SELECT part
 
736
        */
 
737
        if (field_no < select_field_pos || dup_no >= select_field_pos)
 
738
        {
 
739
          my_error(ER_DUP_FIELDNAME, MYF(0), sql_field->field_name);
 
740
          DBUG_RETURN(TRUE);
 
741
        }
 
742
        else
 
743
        {
 
744
          /* Field redefined */
 
745
          sql_field->def=               dup_field->def;
 
746
          sql_field->sql_type=          dup_field->sql_type;
 
747
          sql_field->charset=           (dup_field->charset ?
 
748
                                         dup_field->charset :
 
749
                                         create_info->default_table_charset);
 
750
          sql_field->length=            dup_field->char_length;
 
751
          sql_field->pack_length=       dup_field->pack_length;
 
752
          sql_field->key_length=        dup_field->key_length;
 
753
          sql_field->decimals=          dup_field->decimals;
 
754
          sql_field->create_length_to_internal_length();
 
755
          sql_field->unireg_check=      dup_field->unireg_check;
 
756
          /* 
 
757
            We're making one field from two, the result field will have
 
758
            dup_field->flags as flags. If we've incremented null_fields
 
759
            because of sql_field->flags, decrement it back.
 
760
          */
 
761
          if (!(sql_field->flags & NOT_NULL_FLAG))
 
762
            null_fields--;
 
763
          sql_field->flags=             dup_field->flags;
 
764
          sql_field->interval=          dup_field->interval;
 
765
          it2.remove();                 // Remove first (create) definition
 
766
          select_field_pos--;
 
767
          break;
 
768
        }
 
769
      }
 
770
    }
 
771
    /* Don't pack rows in old tables if the user has requested this */
 
772
    if ((sql_field->flags & BLOB_FLAG) ||
 
773
        (sql_field->sql_type == MYSQL_TYPE_VARCHAR &&
 
774
         create_info->row_type != ROW_TYPE_FIXED))
 
775
      (*db_options)|= HA_OPTION_PACK_RECORD;
 
776
    it2.rewind();
 
777
  }
 
778
 
 
779
  /* record_offset will be increased with 'length-of-null-bits' later */
 
780
  record_offset= 0;
 
781
  null_fields+= total_uneven_bit_length;
 
782
 
 
783
  it.rewind();
 
784
  while ((sql_field=it++))
 
785
  {
 
786
    DBUG_ASSERT(sql_field->charset != 0);
 
787
 
 
788
    if (prepare_create_field(sql_field, &blob_columns, 
 
789
                             &timestamps, &timestamps_with_niladic,
 
790
                             file->ha_table_flags()))
 
791
      DBUG_RETURN(TRUE);
 
792
    if (sql_field->sql_type == MYSQL_TYPE_VARCHAR)
 
793
      create_info->varchar= TRUE;
 
794
    sql_field->offset= record_offset;
 
795
    if (MTYP_TYPENR(sql_field->unireg_check) == Field::NEXT_NUMBER)
 
796
      auto_increment++;
 
797
    record_offset+= sql_field->pack_length;
 
798
  }
 
799
  if (timestamps_with_niladic > 1)
 
800
  {
 
801
    my_message(ER_TOO_MUCH_AUTO_TIMESTAMP_COLS,
 
802
               ER(ER_TOO_MUCH_AUTO_TIMESTAMP_COLS), MYF(0));
 
803
    DBUG_RETURN(TRUE);
 
804
  }
 
805
  if (auto_increment > 1)
 
806
  {
 
807
    my_message(ER_WRONG_AUTO_KEY, ER(ER_WRONG_AUTO_KEY), MYF(0));
 
808
    DBUG_RETURN(TRUE);
 
809
  }
 
810
  if (auto_increment &&
 
811
      (file->ha_table_flags() & HA_NO_AUTO_INCREMENT))
 
812
  {
 
813
    my_message(ER_TABLE_CANT_HANDLE_AUTO_INCREMENT,
 
814
               ER(ER_TABLE_CANT_HANDLE_AUTO_INCREMENT), MYF(0));
 
815
    DBUG_RETURN(TRUE);
 
816
  }
 
817
 
 
818
  if (blob_columns && (file->ha_table_flags() & HA_NO_BLOBS))
 
819
  {
 
820
    my_message(ER_TABLE_CANT_HANDLE_BLOB, ER(ER_TABLE_CANT_HANDLE_BLOB),
 
821
               MYF(0));
 
822
    DBUG_RETURN(TRUE);
 
823
  }
 
824
 
 
825
  /* Create keys */
 
826
 
 
827
  List_iterator<Key> key_iterator(alter_info->key_list);
 
828
  List_iterator<Key> key_iterator2(alter_info->key_list);
 
829
  uint key_parts=0, fk_key_count=0;
 
830
  bool primary_key=0,unique_key=0;
 
831
  Key *key, *key2;
 
832
  uint tmp, key_number;
 
833
  /* special marker for keys to be ignored */
 
834
  static char ignore_key[1];
 
835
 
 
836
  /* Calculate number of key segements */
 
837
  *key_count= 0;
 
838
  
 
839
  while ((key=key_iterator++))
 
840
  {
 
841
    DBUG_PRINT("info", ("key name: '%s'  type: %d", key->DOT_STR(name) ? key->DOT_STR(name) :
 
842
                        "(none)" , key->type));
 
843
    LEX_STRING key_name_str;
 
844
    if (key->type == Key::FOREIGN_KEY)
 
845
    {
 
846
      fk_key_count++;
 
847
      Foreign_key *fk_key= (Foreign_key*) key;
 
848
      if (fk_key->ref_columns.elements &&
 
849
          fk_key->ref_columns.elements != fk_key->columns.elements)
 
850
      {
 
851
        my_error(ER_WRONG_FK_DEF, MYF(0),
 
852
                 (fk_key->DOT_STR(name) ?  fk_key->DOT_STR(name) : "foreign key without name"),
 
853
                 ER(ER_KEY_REF_DO_NOT_MATCH_TABLE_REF));
 
854
        DBUG_RETURN(TRUE);
 
855
      }
 
856
      continue;
 
857
    }
 
858
    (*key_count)++;
 
859
    tmp=file->max_key_parts();
 
860
    if (key->columns.elements > tmp)
 
861
    {
 
862
      my_error(ER_TOO_MANY_KEY_PARTS,MYF(0),tmp);
 
863
      DBUG_RETURN(TRUE);
 
864
    }
 
865
    key_name_str.str= (char*) key->DOT_STR(name);
 
866
    key_name_str.length= key->DOT_STR(name) ? strlen(key->DOT_STR(name)) : 0;
 
867
    if (check_string_char_length(&key_name_str, "", NAME_CHAR_LEN,
 
868
                                 system_charset_info, 1))
 
869
    {
 
870
      my_error(ER_TOO_LONG_IDENT, MYF(0), key->DOT_STR(name));
 
871
      DBUG_RETURN(TRUE);
 
872
    }
 
873
    key_iterator2.rewind ();
 
874
    if (key->type != Key::FOREIGN_KEY)
 
875
    {
 
876
      while ((key2 = key_iterator2++) != key)
 
877
      {
 
878
        /*
 
879
          foreign_key_prefix(key, key2) returns 0 if key or key2, or both, is
 
880
          'generated', and a generated key is a prefix of the other key.
 
881
          Then we do not need the generated shorter key.
 
882
        */
 
883
        if ((key2->type != Key::FOREIGN_KEY &&
 
884
             key2->DOT_STR(name) != ignore_key &&
 
885
             !foreign_key_prefix(key, key2)))
 
886
        {
 
887
          /* TODO: issue warning message */
 
888
          /* mark that the generated key should be ignored */
 
889
          if (!key2->generated ||
 
890
              (key->generated && key->columns.elements <
 
891
               key2->columns.elements))
 
892
            key->DOT_STR(name)= ignore_key;
 
893
          else
 
894
          {
 
895
            key2->DOT_STR(name)= ignore_key;
 
896
            key_parts-= key2->columns.elements;
 
897
            (*key_count)--;
 
898
          }
 
899
          break;
 
900
        }
 
901
      }
 
902
    }
 
903
    if (key->DOT_STR(name) != ignore_key)
 
904
      key_parts+=key->columns.elements;
 
905
    else
 
906
      (*key_count)--;
 
907
    if (key->DOT_STR(name) && !tmp_table && (key->type != Key::PRIMARY) &&
 
908
        !my_strcasecmp(system_charset_info,key->DOT_STR(name),primary_key_name))
 
909
    {
 
910
      my_error(ER_WRONG_NAME_FOR_INDEX, MYF(0), key->DOT_STR(name));
 
911
      DBUG_RETURN(TRUE);
 
912
    }
 
913
  }
 
914
  tmp=file->max_keys();
 
915
  if (*key_count > tmp)
 
916
  {
 
917
    my_error(ER_TOO_MANY_KEYS,MYF(0),tmp);
 
918
    DBUG_RETURN(TRUE);
 
919
  }
 
920
 
 
921
  (*key_info_buffer)= key_info= (KEY*) sql_calloc(sizeof(KEY) * (*key_count));
 
922
  key_part_info=(KEY_PART_INFO*) sql_calloc(sizeof(KEY_PART_INFO)*key_parts);
 
923
  if (!*key_info_buffer || ! key_part_info)
 
924
    DBUG_RETURN(TRUE);                          // Out of memory
 
925
 
 
926
  key_iterator.rewind();
 
927
  key_number=0;
 
928
  for (; (key=key_iterator++) ; key_number++)
 
929
  {
 
930
    uint key_length=0;
 
931
    Key_part_spec *column;
 
932
 
 
933
    if (key->DOT_STR(name) == ignore_key)
 
934
    {
 
935
      /* ignore redundant keys */
 
936
      do
 
937
        key=key_iterator++;
 
938
      while (key && key->DOT_STR(name) == ignore_key);
 
939
      if (!key)
 
940
        break;
 
941
    }
 
942
 
 
943
    switch (key->type) {
 
944
    case Key::MULTIPLE:
 
945
        key_info->flags= 0;
 
946
        break;
 
947
    case Key::FULLTEXT:
 
948
        key_info->flags= HA_FULLTEXT;
 
949
        if ((key_info->parser_name= &key->key_create_info.parser_name)->str)
 
950
          key_info->flags|= HA_USES_PARSER;
 
951
        else
 
952
          key_info->parser_name= 0;
 
953
        break;
 
954
    case Key::SPATIAL:
 
955
#ifdef HAVE_SPATIAL
 
956
        key_info->flags= HA_SPATIAL;
 
957
        break;
 
958
#else
 
959
        my_error(ER_FEATURE_DISABLED, MYF(0),
 
960
                 sym_group_geom.name, sym_group_geom.needed_define);
 
961
        DBUG_RETURN(TRUE);
 
962
#endif
 
963
    case Key::FOREIGN_KEY:
 
964
      key_number--;                             // Skip this key
 
965
      continue;
 
966
    default:
 
967
      key_info->flags = HA_NOSAME;
 
968
      break;
 
969
    }
 
970
    if (key->generated)
 
971
      key_info->flags|= HA_GENERATED_KEY;
 
972
 
 
973
    key_info->key_parts=(uint8) key->columns.elements;
 
974
    key_info->key_part=key_part_info;
 
975
    key_info->usable_key_parts= key_number;
 
976
    key_info->algorithm= key->key_create_info.algorithm;
 
977
 
 
978
    if (key->type == Key::FULLTEXT)
 
979
    {
 
980
      if (!(file->ha_table_flags() & HA_CAN_FULLTEXT))
 
981
      {
 
982
        my_message(ER_TABLE_CANT_HANDLE_FT, ER(ER_TABLE_CANT_HANDLE_FT),
 
983
                   MYF(0));
 
984
        DBUG_RETURN(TRUE);
 
985
      }
 
986
    }
 
987
    /*
 
988
       Make SPATIAL to be RTREE by default
 
989
       SPATIAL only on BLOB or at least BINARY, this
 
990
       actually should be replaced by special GEOM type
 
991
       in near future when new frm file is ready
 
992
       checking for proper key parts number:
 
993
    */
 
994
 
 
995
    /* TODO: Add proper checks if handler supports key_type and algorithm */
 
996
    if (key_info->flags & HA_SPATIAL)
 
997
    {
 
998
      if (!(file->ha_table_flags() & HA_CAN_RTREEKEYS))
 
999
      {
 
1000
        my_message(ER_TABLE_CANT_HANDLE_SPKEYS, ER(ER_TABLE_CANT_HANDLE_SPKEYS),
 
1001
                   MYF(0));
 
1002
        DBUG_RETURN(TRUE);
 
1003
      }
 
1004
      if (key_info->key_parts != 1)
 
1005
      {
 
1006
        my_error(ER_WRONG_ARGUMENTS, MYF(0), "SPATIAL INDEX");
 
1007
        DBUG_RETURN(TRUE);
 
1008
      }
 
1009
    }
 
1010
    else if (key_info->algorithm == HA_KEY_ALG_RTREE)
 
1011
    {
 
1012
#ifdef HAVE_RTREE_KEYS
 
1013
      if ((key_info->key_parts & 1) == 1)
 
1014
      {
 
1015
        my_error(ER_WRONG_ARGUMENTS, MYF(0), "RTREE INDEX");
 
1016
        DBUG_RETURN(TRUE);
 
1017
      }
 
1018
      /* TODO: To be deleted */
 
1019
      my_error(ER_NOT_SUPPORTED_YET, MYF(0), "RTREE INDEX");
 
1020
      DBUG_RETURN(TRUE);
 
1021
#else
 
1022
      my_error(ER_FEATURE_DISABLED, MYF(0),
 
1023
               sym_group_rtree.name, sym_group_rtree.needed_define);
 
1024
      DBUG_RETURN(TRUE);
 
1025
#endif
 
1026
    }
 
1027
 
 
1028
    /* Take block size from key part or table part */
 
1029
    /*
 
1030
      TODO: Add warning if block size changes. We can't do it here, as
 
1031
      this may depend on the size of the key
 
1032
    */
 
1033
    key_info->block_size= (key->key_create_info.block_size ?
 
1034
                           key->key_create_info.block_size :
 
1035
                           create_info->key_block_size);
 
1036
 
 
1037
    if (key_info->block_size)
 
1038
      key_info->flags|= HA_USES_BLOCK_SIZE;
 
1039
 
 
1040
    List_iterator<Key_part_spec> cols(key->columns), cols2(key->columns);
 
1041
    CHARSET_INFO *ft_key_charset=0;  // for FULLTEXT
 
1042
    for (uint column_nr=0 ; (column=cols++) ; column_nr++)
 
1043
    {
 
1044
      uint length;
 
1045
      Key_part_spec *dup_column;
 
1046
 
 
1047
      it.rewind();
 
1048
      field=0;
 
1049
      while ((sql_field=it++) &&
 
1050
             my_strcasecmp(system_charset_info,
 
1051
                           column->DOT_STR(field_name),
 
1052
                           sql_field->field_name))
 
1053
        field++;
 
1054
      if (!sql_field)
 
1055
      {
 
1056
        my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name);
 
1057
        DBUG_RETURN(TRUE);
 
1058
      }
 
1059
      while ((dup_column= cols2++) != column)
 
1060
      {
 
1061
        if (!my_strcasecmp(system_charset_info,
 
1062
                           column->DOT_STR(field_name), dup_column->DOT_STR(field_name)))
 
1063
        {
 
1064
          my_printf_error(ER_DUP_FIELDNAME,
 
1065
                          ER(ER_DUP_FIELDNAME),MYF(0),
 
1066
                          column->field_name);
 
1067
          DBUG_RETURN(TRUE);
 
1068
        }
 
1069
      }
 
1070
      cols2.rewind();
 
1071
      if (key->type == Key::FULLTEXT)
 
1072
      {
 
1073
        if ((sql_field->sql_type != MYSQL_TYPE_STRING &&
 
1074
             sql_field->sql_type != MYSQL_TYPE_VARCHAR &&
 
1075
             !f_is_blob(sql_field->pack_flag)) ||
 
1076
            sql_field->charset == &my_charset_bin ||
 
1077
            sql_field->charset->mbminlen > 1 || // ucs2 doesn't work yet
 
1078
            (ft_key_charset && sql_field->charset != ft_key_charset))
 
1079
        {
 
1080
            my_error(ER_BAD_FT_COLUMN, MYF(0), column->field_name);
 
1081
            DBUG_RETURN(-1);
 
1082
        }
 
1083
        ft_key_charset=sql_field->charset;
 
1084
        /*
 
1085
          for fulltext keys keyseg length is 1 for blobs (it's ignored in ft
 
1086
          code anyway, and 0 (set to column width later) for char's. it has
 
1087
          to be correct col width for char's, as char data are not prefixed
 
1088
          with length (unlike blobs, where ft code takes data length from a
 
1089
          data prefix, ignoring column->length).
 
1090
        */
 
1091
        column->length=test(f_is_blob(sql_field->pack_flag));
 
1092
      }
 
1093
      else
 
1094
      {
 
1095
        column->length*= sql_field->charset->mbmaxlen;
 
1096
 
 
1097
        if (key->type == Key::SPATIAL && column->length)
 
1098
        {
 
1099
          my_error(ER_WRONG_SUB_KEY, MYF(0));
 
1100
          DBUG_RETURN(TRUE);
 
1101
        }
 
1102
 
 
1103
        if (f_is_blob(sql_field->pack_flag) ||
 
1104
            (f_is_geom(sql_field->pack_flag) && key->type != Key::SPATIAL))
 
1105
        {
 
1106
          if (!(file->ha_table_flags() & HA_CAN_INDEX_BLOBS))
 
1107
          {
 
1108
            my_error(ER_BLOB_USED_AS_KEY, MYF(0), column->field_name);
 
1109
            DBUG_RETURN(TRUE);
 
1110
          }
 
1111
          if (f_is_geom(sql_field->pack_flag) && sql_field->geom_type ==
 
1112
              Field::GEOM_POINT)
 
1113
            column->length= 25;
 
1114
          if (!column->length)
 
1115
          {
 
1116
            my_error(ER_BLOB_KEY_WITHOUT_LENGTH, MYF(0), column->field_name);
 
1117
            DBUG_RETURN(TRUE);
 
1118
          }
 
1119
        }
 
1120
#ifdef HAVE_SPATIAL
 
1121
        if (key->type == Key::SPATIAL)
 
1122
        {
 
1123
          if (!column->length)
 
1124
          {
 
1125
            /*
 
1126
              4 is: (Xmin,Xmax,Ymin,Ymax), this is for 2D case
 
1127
              Lately we'll extend this code to support more dimensions
 
1128
            */
 
1129
            column->length= 4*sizeof(double);
 
1130
          }
 
1131
        }
 
1132
#endif
 
1133
        if (!(sql_field->flags & NOT_NULL_FLAG))
 
1134
        {
 
1135
          if (key->type == Key::PRIMARY)
 
1136
          {
 
1137
            /* Implicitly set primary key fields to NOT NULL for ISO conf. */
 
1138
            sql_field->flags|= NOT_NULL_FLAG;
 
1139
            sql_field->pack_flag&= ~FIELDFLAG_MAYBE_NULL;
 
1140
            null_fields--;
 
1141
          }
 
1142
          else
 
1143
          {
 
1144
            key_info->flags|= HA_NULL_PART_KEY;
 
1145
            if (!(file->ha_table_flags() & HA_NULL_IN_KEY))
 
1146
            {
 
1147
              my_error(ER_NULL_COLUMN_IN_INDEX, MYF(0), column->field_name);
 
1148
              DBUG_RETURN(TRUE);
 
1149
            }
 
1150
            if (key->type == Key::SPATIAL)
 
1151
            {
 
1152
              my_message(ER_SPATIAL_CANT_HAVE_NULL,
 
1153
                         ER(ER_SPATIAL_CANT_HAVE_NULL), MYF(0));
 
1154
              DBUG_RETURN(TRUE);
 
1155
            }
 
1156
          }
 
1157
        }
 
1158
        if (MTYP_TYPENR(sql_field->unireg_check) == Field::NEXT_NUMBER)
 
1159
        {
 
1160
          if (column_nr == 0 || (file->ha_table_flags() & HA_AUTO_PART_KEY))
 
1161
            auto_increment--;                   // Field is used
 
1162
        }
 
1163
      }
 
1164
 
 
1165
      key_part_info->fieldnr= field;
 
1166
      key_part_info->offset=  (uint16) sql_field->offset;
 
1167
      key_part_info->key_type=sql_field->pack_flag;
 
1168
      length= sql_field->key_length;
 
1169
 
 
1170
      if (column->length)
 
1171
      {
 
1172
        if (f_is_blob(sql_field->pack_flag))
 
1173
        {
 
1174
          if ((length=column->length) > max_key_length ||
 
1175
              length > file->max_key_part_length())
 
1176
          {
 
1177
            length=min(max_key_length, file->max_key_part_length());
 
1178
            if (key->type == Key::MULTIPLE)
 
1179
            {
 
1180
              /* not a critical problem */
 
1181
              char warn_buff[MYSQL_ERRMSG_SIZE];
 
1182
              my_snprintf(warn_buff, sizeof(warn_buff), ER(ER_TOO_LONG_KEY),
 
1183
                          length);
 
1184
              push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
 
1185
                           ER_TOO_LONG_KEY, warn_buff);
 
1186
              /* Align key length to multibyte char boundary */
 
1187
              length-= length % sql_field->charset->mbmaxlen;
 
1188
            }
 
1189
            else
 
1190
            {
 
1191
              my_error(ER_TOO_LONG_KEY,MYF(0),length);
 
1192
              DBUG_RETURN(TRUE);
 
1193
            }
 
1194
          }
 
1195
        }
 
1196
        else if (!f_is_geom(sql_field->pack_flag) &&
 
1197
                  (column->length > length ||
 
1198
                   !Field::type_can_have_key_part (sql_field->sql_type) ||
 
1199
                   ((f_is_packed(sql_field->pack_flag) ||
 
1200
                     ((file->ha_table_flags() & HA_NO_PREFIX_CHAR_KEYS) &&
 
1201
                      (key_info->flags & HA_NOSAME))) &&
 
1202
                    column->length != length)))
 
1203
        {
 
1204
          my_message(ER_WRONG_SUB_KEY, ER(ER_WRONG_SUB_KEY), MYF(0));
 
1205
          DBUG_RETURN(TRUE);
 
1206
        }
 
1207
        else if (!(file->ha_table_flags() & HA_NO_PREFIX_CHAR_KEYS))
 
1208
          length=column->length;
 
1209
      }
 
1210
      else if (length == 0)
 
1211
      {
 
1212
        my_error(ER_WRONG_KEY_COLUMN, MYF(0), column->field_name);
 
1213
          DBUG_RETURN(TRUE);
 
1214
      }
 
1215
      if (length > file->max_key_part_length() && key->type != Key::FULLTEXT)
 
1216
      {
 
1217
        length= file->max_key_part_length();
 
1218
        if (key->type == Key::MULTIPLE)
 
1219
        {
 
1220
          /* not a critical problem */
 
1221
          char warn_buff[MYSQL_ERRMSG_SIZE];
 
1222
          my_snprintf(warn_buff, sizeof(warn_buff), ER(ER_TOO_LONG_KEY),
 
1223
                      length);
 
1224
          push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
 
1225
                       ER_TOO_LONG_KEY, warn_buff);
 
1226
          /* Align key length to multibyte char boundary */
 
1227
          length-= length % sql_field->charset->mbmaxlen;
 
1228
        }
 
1229
        else
 
1230
        {
 
1231
          my_error(ER_TOO_LONG_KEY,MYF(0),length);
 
1232
          DBUG_RETURN(TRUE);
 
1233
        }
 
1234
      }
 
1235
      key_part_info->length=(uint16) length;
 
1236
      /* Use packed keys for long strings on the first column */
 
1237
      if (!((*db_options) & HA_OPTION_NO_PACK_KEYS) &&
 
1238
          (length >= KEY_DEFAULT_PACK_LENGTH &&
 
1239
           (sql_field->sql_type == MYSQL_TYPE_STRING ||
 
1240
            sql_field->sql_type == MYSQL_TYPE_VARCHAR ||
 
1241
            sql_field->pack_flag & FIELDFLAG_BLOB)))
 
1242
      {
 
1243
        if ((column_nr == 0 && (sql_field->pack_flag & FIELDFLAG_BLOB)) ||
 
1244
            sql_field->sql_type == MYSQL_TYPE_VARCHAR)
 
1245
          key_info->flags|= HA_BINARY_PACK_KEY | HA_VAR_LENGTH_KEY;
 
1246
        else
 
1247
          key_info->flags|= HA_PACK_KEY;
 
1248
      }
 
1249
      /* Check if the key segment is partial, set the key flag accordingly */
 
1250
      if (length != sql_field->key_length)
 
1251
        key_info->flags|= HA_KEY_HAS_PART_KEY_SEG;
 
1252
 
 
1253
      key_length+=length;
 
1254
      key_part_info++;
 
1255
 
 
1256
      /* Create the key name based on the first column (if not given) */
 
1257
      if (column_nr == 0)
 
1258
      {
 
1259
        if (key->type == Key::PRIMARY)
 
1260
        {
 
1261
          if (primary_key)
 
1262
          {
 
1263
            my_message(ER_MULTIPLE_PRI_KEY, ER(ER_MULTIPLE_PRI_KEY),
 
1264
                       MYF(0));
 
1265
            DBUG_RETURN(TRUE);
 
1266
          }
 
1267
          key_name=primary_key_name;
 
1268
          primary_key=1;
 
1269
        }
 
1270
        else if (!(key_name = key->DOT_STR(name)))
 
1271
          key_name=make_unique_key_name(sql_field->field_name,
 
1272
                                        *key_info_buffer, key_info);
 
1273
        if (check_if_keyname_exists(key_name, *key_info_buffer, key_info))
 
1274
        {
 
1275
          my_error(ER_DUP_KEYNAME, MYF(0), key_name);
 
1276
          DBUG_RETURN(TRUE);
 
1277
        }
 
1278
        key_info->name=(char*) key_name;
 
1279
      }
 
1280
    }
 
1281
    if (!key_info->name || check_column_name(key_info->name))
 
1282
    {
 
1283
      my_error(ER_WRONG_NAME_FOR_INDEX, MYF(0), key_info->name);
 
1284
      DBUG_RETURN(TRUE);
 
1285
    }
 
1286
    if (!(key_info->flags & HA_NULL_PART_KEY))
 
1287
      unique_key=1;
 
1288
    key_info->key_length=(uint16) key_length;
 
1289
    if (key_length > max_key_length && key->type != Key::FULLTEXT)
 
1290
    {
 
1291
      my_error(ER_TOO_LONG_KEY,MYF(0),max_key_length);
 
1292
      DBUG_RETURN(TRUE);
 
1293
    }
 
1294
    key_info++;
 
1295
  }
 
1296
  if (!unique_key && !primary_key &&
 
1297
      (file->ha_table_flags() & HA_REQUIRE_PRIMARY_KEY))
 
1298
  {
 
1299
    my_message(ER_REQUIRES_PRIMARY_KEY, ER(ER_REQUIRES_PRIMARY_KEY), MYF(0));
 
1300
    DBUG_RETURN(TRUE);
 
1301
  }
 
1302
  if (auto_increment > 0)
 
1303
  {
 
1304
    my_message(ER_WRONG_AUTO_KEY, ER(ER_WRONG_AUTO_KEY), MYF(0));
 
1305
    DBUG_RETURN(TRUE);
 
1306
  }
 
1307
  /* Sort keys in optimized order */
 
1308
  my_qsort((uchar*) *key_info_buffer, *key_count, sizeof(KEY),
 
1309
           (qsort_cmp) sort_keys);
 
1310
  create_info->null_bits= null_fields;
 
1311
 
 
1312
  /* Check fields. */
 
1313
  it.rewind();
 
1314
  while ((sql_field=it++))
 
1315
  {
 
1316
    Field::utype type= (Field::utype) MTYP_TYPENR(sql_field->unireg_check);
 
1317
 
 
1318
    if (thd->variables.sql_mode & MODE_NO_ZERO_DATE &&
 
1319
        !sql_field->def &&
 
1320
        sql_field->sql_type == MYSQL_TYPE_TIMESTAMP &&
 
1321
        (sql_field->flags & NOT_NULL_FLAG) &&
 
1322
        (type == Field::NONE || type == Field::TIMESTAMP_UN_FIELD))
 
1323
    {
 
1324
      /*
 
1325
        An error should be reported if:
 
1326
          - NO_ZERO_DATE SQL mode is active;
 
1327
          - there is no explicit DEFAULT clause (default column value);
 
1328
          - this is a TIMESTAMP column;
 
1329
          - the column is not NULL;
 
1330
          - this is not the DEFAULT CURRENT_TIMESTAMP column.
 
1331
 
 
1332
        In other words, an error should be reported if
 
1333
          - NO_ZERO_DATE SQL mode is active;
 
1334
          - the column definition is equivalent to
 
1335
            'column_name TIMESTAMP DEFAULT 0'.
 
1336
      */
 
1337
 
 
1338
      my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
 
1339
      DBUG_RETURN(TRUE);
 
1340
    }
 
1341
  }
 
1342
 
 
1343
  DBUG_RETURN(FALSE);
 
1344
}
 
1345
 
 
1346
//////////////////////////////
 
1347
// mysql_create_table_no_lock() cut and pasted directly from sql_table.cc. (I did make is static after copying it.)
 
1348
 
 
1349
static bool mysql_create_table_no_lock(THD *thd,
 
1350
                                const char *db, const char *table_name,
 
1351
                                HA_CREATE_INFO *create_info,
 
1352
                                Alter_info *alter_info,
 
1353
                                bool internal_tmp_table,
 
1354
                                uint select_field_count)
 
1355
{
 
1356
  char                  path[FN_REFLEN];
 
1357
  uint          path_length;
 
1358
  const char    *alias;
 
1359
  uint                  db_options, key_count;
 
1360
  KEY                   *key_info_buffer;
 
1361
  handler               *file;
 
1362
  bool                  error= TRUE;
 
1363
  DBUG_ENTER("mysql_create_table_no_lock");
 
1364
  DBUG_PRINT("enter", ("db: '%s'  table: '%s'  tmp: %d",
 
1365
                       db, table_name, internal_tmp_table));
 
1366
 
 
1367
 
 
1368
  /* Check for duplicate fields and check type of table to create */
 
1369
  if (!alter_info->create_list.elements)
 
1370
  {
 
1371
    my_message(ER_TABLE_MUST_HAVE_COLUMNS, ER(ER_TABLE_MUST_HAVE_COLUMNS),
 
1372
               MYF(0));
 
1373
    DBUG_RETURN(TRUE);
 
1374
  }
 
1375
  if (check_engine(thd, table_name, create_info))
 
1376
    DBUG_RETURN(TRUE);
 
1377
  db_options= create_info->table_options;
 
1378
  if (create_info->row_type == ROW_TYPE_DYNAMIC)
 
1379
    db_options|=HA_OPTION_PACK_RECORD;
 
1380
  alias= table_case_name(create_info, table_name);
 
1381
 
 
1382
  /* PMC - Done to avoid getting the partition handler by mistake! */
 
1383
  if (!(file= new (thd->mem_root) ha_xtsys(pbxt_hton, NULL)))
 
1384
  {
 
1385
    mem_alloc_error(sizeof(handler));
 
1386
    DBUG_RETURN(TRUE);
 
1387
  }
 
1388
 
 
1389
  set_table_default_charset(thd, create_info, (char*) db);
 
1390
 
 
1391
  if (mysql_prepare_create_table(thd, create_info, alter_info,
 
1392
                                 internal_tmp_table,
 
1393
                                 &db_options, file,
 
1394
                                 &key_info_buffer, &key_count,
 
1395
                                 select_field_count))
 
1396
    goto err;
 
1397
 
 
1398
      /* Check if table exists */
 
1399
  if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
 
1400
  {
 
1401
    path_length= build_tmptable_filename(thd, path, sizeof(path));
 
1402
    create_info->table_options|=HA_CREATE_DELAY_KEY_WRITE;
 
1403
  }
 
1404
  else  
 
1405
  {
 
1406
 #ifdef FN_DEVCHAR
 
1407
    /* check if the table name contains FN_DEVCHAR when defined */
 
1408
    if (strchr(alias, FN_DEVCHAR))
 
1409
    {
 
1410
      my_error(ER_WRONG_TABLE_NAME, MYF(0), alias);
 
1411
      DBUG_RETURN(TRUE);
 
1412
    }
 
1413
#endif
 
1414
    path_length= build_table_filename(path, sizeof(path), db, alias, reg_ext,
 
1415
                                      internal_tmp_table ? FN_IS_TMP : 0);
 
1416
  }
 
1417
 
 
1418
  /* Check if table already exists */
 
1419
  if ((create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
 
1420
      find_temporary_table(thd, db, table_name))
 
1421
  {
 
1422
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
 
1423
    {
 
1424
      create_info->table_existed= 1;            // Mark that table existed
 
1425
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
 
1426
                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
 
1427
                          alias);
 
1428
      error= 0;
 
1429
      goto err;
 
1430
    }
 
1431
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), alias);
 
1432
    goto err;
 
1433
  }
 
1434
 
 
1435
  pthread_mutex_lock(&LOCK_open);
 
1436
  if (!internal_tmp_table && !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
 
1437
  {
 
1438
    if (!access(path,F_OK))
 
1439
    {
 
1440
      if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
 
1441
        goto warn;
 
1442
      my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
 
1443
      goto unlock_and_end;
 
1444
    }
 
1445
    /*
 
1446
      We don't assert here, but check the result, because the table could be
 
1447
      in the table definition cache and in the same time the .frm could be
 
1448
      missing from the disk, in case of manual intervention which deletes
 
1449
      the .frm file. The user has to use FLUSH TABLES; to clear the cache.
 
1450
      Then she could create the table. This case is pretty obscure and
 
1451
      therefore we don't introduce a new error message only for it.
 
1452
    */
 
1453
    if (get_cached_table_share(db, alias))
 
1454
    {
 
1455
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table_name);
 
1456
      goto unlock_and_end;
 
1457
    }
 
1458
  }
 
1459
 
 
1460
  /*
 
1461
    Check that table with given name does not already
 
1462
    exist in any storage engine. In such a case it should
 
1463
    be discovered and the error ER_TABLE_EXISTS_ERROR be returned
 
1464
    unless user specified CREATE TABLE IF EXISTS
 
1465
    The LOCK_open mutex has been locked to make sure no
 
1466
    one else is attempting to discover the table. Since
 
1467
    it's not on disk as a frm file, no one could be using it!
 
1468
  */
 
1469
  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
 
1470
  {
 
1471
    bool create_if_not_exists =
 
1472
      create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS;
 
1473
    int retcode = ha_table_exists_in_engine(thd, db, table_name);
 
1474
    DBUG_PRINT("info", ("exists_in_engine: %u",retcode));
 
1475
    switch (retcode)
 
1476
    {
 
1477
      case HA_ERR_NO_SUCH_TABLE:
 
1478
        /* Normal case, no table exists. we can go and create it */
 
1479
        break;
 
1480
      case HA_ERR_TABLE_EXIST:
 
1481
        DBUG_PRINT("info", ("Table existed in handler"));
 
1482
 
 
1483
        if (create_if_not_exists)
 
1484
          goto warn;
 
1485
        my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
 
1486
        goto unlock_and_end;
 
1487
        break;
 
1488
      default:
 
1489
        DBUG_PRINT("info", ("error: %u from storage engine", retcode));
 
1490
        my_error(retcode, MYF(0),table_name);
 
1491
        goto unlock_and_end;
 
1492
    }
 
1493
  }
 
1494
 
 
1495
  thd_proc_info(thd, "creating table");
 
1496
  create_info->table_existed= 0;                // Mark that table is created
 
1497
 
 
1498
  create_info->table_options=db_options;
 
1499
 
 
1500
  path[path_length - reg_ext_length]= '\0'; // Remove .frm extension
 
1501
  if (rea_create_table(thd, path, db, table_name,
 
1502
                       create_info, alter_info->create_list,
 
1503
                       key_count, key_info_buffer, file))
 
1504
    goto unlock_and_end;
 
1505
 
 
1506
  if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
 
1507
  {
 
1508
    /* Open table and put in temporary table list */
 
1509
#if MYSQL_VERSION_ID >= 50404
 
1510
    if (!(open_temporary_table(thd, path, db, table_name, 1, OTM_OPEN)))
 
1511
#else
 
1512
    if (!(open_temporary_table(thd, path, db, table_name, 1)))
 
1513
#endif
 
1514
    {
 
1515
#if MYSQL_VERSION_ID >= 50404
 
1516
      (void) rm_temporary_table(create_info->db_type, path, false);
 
1517
#else
 
1518
      (void) rm_temporary_table(create_info->db_type, path);
 
1519
#endif
 
1520
      goto unlock_and_end;
 
1521
    }
 
1522
    thd->thread_specific_used= TRUE;
 
1523
  }
 
1524
 
 
1525
  /*
 
1526
    Don't write statement if:
 
1527
    - It is an internal temporary table,
 
1528
    - Row-based logging is used and it we are creating a temporary table, or
 
1529
    - The binary log is not open.
 
1530
    Otherwise, the statement shall be binlogged.
 
1531
   */
 
1532
  /* PBXT 1.0.09e
 
1533
   * Firstly we had a compile problem with MySQL 5.1.42 and
 
1534
   * the write_bin_log() call below:
 
1535
   * discover_xt.cc:1259: error: argument of type 'char* (Statement::)()' does not match 'const char*'
 
1536
   * 
 
1537
   * And secondly, we should no write the BINLOG anyway because this is
 
1538
   * an internal PBXT system table.
 
1539
   *
 
1540
   * So I am just commenting out the code altogether.
 
1541
  if (!internal_tmp_table &&
 
1542
      (!thd->current_stmt_binlog_row_based ||
 
1543
       (thd->current_stmt_binlog_row_based &&
 
1544
        !(create_info->options & HA_LEX_CREATE_TMP_TABLE))))
 
1545
    write_bin_log(thd, TRUE, thd->query, thd->query_length);
 
1546
   */
 
1547
  error= FALSE;
 
1548
unlock_and_end:
 
1549
  pthread_mutex_unlock(&LOCK_open);
 
1550
 
 
1551
err:
 
1552
  thd_proc_info(thd, "After create");
 
1553
  delete file;
 
1554
  DBUG_RETURN(error);
 
1555
 
 
1556
warn:
 
1557
  error= FALSE;
 
1558
  push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
 
1559
                      ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
 
1560
                      alias);
 
1561
  create_info->table_existed= 1;                // Mark that table existed
 
1562
  goto unlock_and_end;
 
1563
}
 
1564
 
 
1565
////////////////////////////////////////////////////////
 
1566
////// END OF CUT AND PASTES FROM  sql_table.cc ////////
 
1567
////////////////////////////////////////////////////////
 
1568
 
 
1569
#endif // DRIZZLED
 
1570
#endif // LOCK_OPEN_HACK_REQUIRED
 
1571
 
 
1572
//------------------------------
 
1573
int xt_create_table_frm(handlerton *hton, THD* thd, const char *db, const char *name, DT_FIELD_INFO *info, DT_KEY_INFO *XT_UNUSED(keys), xtBool skip_existing)
 
1574
{
 
1575
#ifdef DRIZZLED
 
1576
#define MYLEX_CREATE_INFO create_info
 
1577
#else
 
1578
#define MYLEX_CREATE_INFO mylex.create_info 
 
1579
#endif
 
1580
 
 
1581
#ifdef DRIZZLED
 
1582
        drizzled::statement::AlterTable *stmt = new drizzled::statement::AlterTable(thd);
 
1583
        HA_CREATE_INFO create_info;
 
1584
        //AlterInfo alter_info;
 
1585
        drizzled::message::Table table_proto;
 
1586
 
 
1587
        static const char *ext = ".dfe";
 
1588
        static const int ext_len = 4;
 
1589
 
 
1590
        table_proto.mutable_engine()->mutable_name()->assign("PBXT");
 
1591
#else
 
1592
        static const char *ext = ".frm";
 
1593
        static const int ext_len = 4;
 
1594
#endif
 
1595
        int err = 1;
 
1596
        char field_length_buffer[12], *field_length_ptr;
 
1597
        LEX  *save_lex= thd->lex, mylex;
 
1598
 
 
1599
        memset(&MYLEX_CREATE_INFO, 0, sizeof(HA_CREATE_INFO));
 
1600
 
 
1601
        thd->lex = &mylex;
 
1602
        lex_start(thd);
 
1603
#ifdef DRIZZLED
 
1604
        mylex.statement = stmt;
 
1605
#endif
 
1606
        
 
1607
        /* setup the create info */
 
1608
        MYLEX_CREATE_INFO.db_type = hton;
 
1609
 
 
1610
#ifndef DRIZZLED 
 
1611
        mylex.create_info.frm_only = 1;
 
1612
#endif
 
1613
        MYLEX_CREATE_INFO.default_table_charset = system_charset_info;
 
1614
        
 
1615
        /* setup the column info. */
 
1616
        while (info->field_name) {              
 
1617
                 LEX_STRING field_name, comment;                 
 
1618
                 field_name.str = (char*)(info->field_name);
 
1619
                 field_name.length = strlen(info->field_name);
 
1620
                 
 
1621
                 comment.str = (char*)(info->comment);
 
1622
                 comment.length = strlen(info->comment);
 
1623
                                        
 
1624
                 if (info->field_length) {
 
1625
                        sprintf(field_length_buffer, "%d", info->field_length);
 
1626
                        field_length_ptr = field_length_buffer;
 
1627
                 } else 
 
1628
                        field_length_ptr = NULL;
 
1629
 
 
1630
#ifdef DRIZZLED
 
1631
                if (add_field_to_list(thd, &field_name, info->field_type, field_length_ptr, info->field_decimal_length,
 
1632
                        info->field_flags,
 
1633
            COLUMN_FORMAT_TYPE_FIXED,
 
1634
                    NULL /*default_value*/, NULL /*on_update_value*/, &comment, NULL /*change*/,
 
1635
            NULL /*interval_list*/, info->field_charset))
 
1636
#else
 
1637
                if (add_field_to_list(thd, &field_name, info->field_type, field_length_ptr, info->field_decimal_length,
 
1638
                        info->field_flags,
 
1639
#if MYSQL_VERSION_ID >= 50404
 
1640
                                HA_SM_DISK,
 
1641
                                COLUMN_FORMAT_TYPE_FIXED,
 
1642
#endif
 
1643
                       NULL /*default_value*/, NULL /*on_update_value*/, &comment, NULL /*change*/, 
 
1644
                       NULL /*interval_list*/, info->field_charset, 0 /*uint_geom_type*/)) 
 
1645
#endif
 
1646
                        goto error;
 
1647
 
 
1648
 
 
1649
                info++;
 
1650
        }
 
1651
 
 
1652
        if (skip_existing) {
 
1653
                size_t db_len = strlen(db);
 
1654
                size_t name_len = strlen(name);
 
1655
                size_t len = db_len + 1 + name_len + ext_len + 1;
 
1656
                char *path = (char *)xt_malloc_ns(len);
 
1657
                memcpy(path, db, db_len);
 
1658
                memcpy(path + db_len + 1, name, name_len);
 
1659
                memcpy(path + db_len + 1 + name_len, ext, ext_len);
 
1660
                path[db_len] = XT_DIR_CHAR;
 
1661
                path[len - 1] = '\0';
 
1662
                xtBool exists = xt_fs_exists(path);
 
1663
                xt_free_ns(path);
 
1664
                if (exists)
 
1665
                        goto noerror;
 
1666
        }
 
1667
        
 
1668
        /* Create an internal temp table */
 
1669
#ifdef DRIZZLED
 
1670
    table_proto.set_name(name);
 
1671
    table_proto.set_type(drizzled::message::Table::STANDARD);
 
1672
    table_proto.set_schema(db);
 
1673
    table_proto.set_creation_timestamp(time(NULL));
 
1674
    table_proto.set_update_timestamp(time(NULL));
 
1675
        if (mysql_create_table_no_lock(thd, db, name, &create_info, &table_proto, &stmt->alter_info, 1, 0, skip_existing)) 
 
1676
                goto error;
 
1677
#else
 
1678
        if (mysql_create_table_no_lock(thd, db, name, &mylex.create_info, &mylex.alter_info, 1, 0)) 
 
1679
                goto error;
 
1680
#endif
 
1681
 
 
1682
        noerror:
 
1683
        err = 0;
 
1684
 
 
1685
        error:
 
1686
        lex_end(&mylex);
 
1687
        thd->lex = save_lex;
 
1688
        return err;
 
1689
}
 
1690