~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_table.cc

  • Committer: Brian Aker
  • Date: 2010-10-08 16:11:47 UTC
  • mto: This revision was merged to the branch mainline in revision 1834.
  • Revision ID: brian@tangent.org-20101008161147-o3us2a8itszsaycx
Typdef our lock type.

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
14
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
15
15
 
16
16
/* drop and alter of tables */
17
17
 
36
36
#include "drizzled/transaction_services.h"
37
37
#include <drizzled/table_proto.h>
38
38
#include <drizzled/plugin/client.h>
39
 
#include <drizzled/table_identifier.h>
 
39
#include <drizzled/identifier.h>
40
40
#include "drizzled/internal/m_string.h"
41
41
#include "drizzled/global_charset_info.h"
42
42
#include "drizzled/charset.h"
49
49
#include <algorithm>
50
50
#include <sstream>
51
51
 
 
52
#include <boost/unordered_set.hpp>
 
53
 
52
54
using namespace std;
53
55
 
54
56
namespace drizzled
55
57
{
56
58
 
57
 
extern plugin::StorageEngine *myisam_engine;
58
59
extern pid_t current_pid;
59
60
 
60
 
bool is_primary_key(KEY *key_info)
 
61
bool is_primary_key(KeyInfo *key_info)
61
62
{
62
63
  static const char * primary_key_name="PRIMARY";
63
64
  return (strcmp(key_info->name, primary_key_name)==0);
72
73
    return NULL;
73
74
}
74
75
 
75
 
static bool check_if_keyname_exists(const char *name,KEY *start, KEY *end);
76
 
static char *make_unique_key_name(const char *field_name,KEY *start,KEY *end);
 
76
static bool check_if_keyname_exists(const char *name,KeyInfo *start, KeyInfo *end);
 
77
static char *make_unique_key_name(const char *field_name,KeyInfo *start,KeyInfo *end);
77
78
 
78
79
static bool prepare_blob_field(Session *session, CreateField *sql_field);
79
80
 
111
112
  transaction_services.rawStatement(session, query);
112
113
}
113
114
 
114
 
 
115
 
/* Should should be refactored to go away */
116
 
void write_bin_log_drop_table(Session *session, bool if_exists, const char *db_name, const char *table_name)
117
 
{
118
 
  TransactionServices &transaction_services= TransactionServices::singleton();
119
 
  string built_query;
120
 
 
121
 
  if (if_exists)
122
 
    built_query.append("DROP TABLE IF EXISTS ");
123
 
  else
124
 
    built_query.append("DROP TABLE ");
125
 
 
126
 
  built_query.append("`");
127
 
  if (session->db.empty() || strcmp(db_name, session->db.c_str()) != 0)
128
 
  {
129
 
    built_query.append(db_name);
130
 
    built_query.append("`.`");
131
 
  }
132
 
 
133
 
  built_query.append(table_name);
134
 
  built_query.append("`");
135
 
  transaction_services.rawStatement(session, built_query);
136
 
}
137
 
 
138
115
/*
139
116
  Execute the drop of a normal or temporary table
140
117
 
146
123
                        In this case we give an warning of level 'NOTE'
147
124
    drop_temporary      Only drop temporary tables
148
125
 
149
 
  TODO:
 
126
  @todo
150
127
    When logging to the binary log, we should log
151
128
    tmp_tables and transactional tables as separate statements if we
152
129
    are in a transaction;  This is needed to get these tables into the
170
147
  int error= 0;
171
148
  bool foreign_key_error= false;
172
149
 
173
 
  pthread_mutex_lock(&LOCK_open); /* Part 2 of rm a table */
174
 
 
175
 
  /*
176
 
    If we have the table in the definition cache, we don't have to check the
177
 
    .frm cursor to find if the table is a normal table (not view) and what
178
 
    engine to use.
179
 
  */
180
 
 
181
 
  for (table= tables; table; table= table->next_local)
182
 
  {
183
 
    TableIdentifier identifier(table->db, table->table_name);
184
 
    TableShare *share;
185
 
    table->db_type= NULL;
186
 
 
187
 
    if ((share= TableShare::getShare(identifier)))
188
 
    {
189
 
      table->db_type= share->db_type();
190
 
    }
191
 
  }
 
150
  LOCK_open.lock(); /* Part 2 of rm a table */
192
151
 
193
152
  if (not drop_temporary && lock_table_names_exclusively(session, tables))
194
153
  {
195
 
    pthread_mutex_unlock(&LOCK_open);
 
154
    LOCK_open.unlock();
196
155
    return 1;
197
156
  }
198
157
 
220
179
    if (drop_temporary == false)
221
180
    {
222
181
      Table *locked_table;
223
 
      abort_locked_tables(session, db, table->table_name);
224
 
      remove_table_from_cache(session, db, table->table_name,
 
182
      TableIdentifier identifier(db, table->table_name);
 
183
      abort_locked_tables(session, identifier);
 
184
      remove_table_from_cache(session, identifier,
225
185
                              RTFC_WAIT_OTHER_THREAD_FLAG |
226
186
                              RTFC_CHECK_KILLED_FLAG);
227
187
      /*
228
188
        If the table was used in lock tables, remember it so that
229
189
        unlock_table_names can free it
230
190
      */
231
 
      if ((locked_table= drop_locked_tables(session, db, table->table_name)))
 
191
      if ((locked_table= drop_locked_tables(session, identifier)))
232
192
        table->table= locked_table;
233
193
 
234
194
      if (session->killed)
237
197
        goto err_with_placeholders;
238
198
      }
239
199
    }
240
 
    TableIdentifier identifier(db, table->table_name, table->internal_tmp_table ? message::Table::INTERNAL : message::Table::STANDARD);
 
200
    TableIdentifier identifier(db, table->table_name, table->getInternalTmpTable() ? message::Table::INTERNAL : message::Table::STANDARD);
241
201
 
242
202
    if (drop_temporary || not plugin::StorageEngine::doesTableExist(*session, identifier))
243
203
    {
283
243
    It's safe to unlock LOCK_open: we have an exclusive lock
284
244
    on the table name.
285
245
  */
286
 
  pthread_mutex_unlock(&LOCK_open);
 
246
  LOCK_open.unlock();
287
247
  error= 0;
288
248
 
289
249
  if (wrong_tables.length())
300
260
    error= 1;
301
261
  }
302
262
 
303
 
  pthread_mutex_lock(&LOCK_open); /* final bit in rm table lock */
 
263
  LOCK_open.lock(); /* final bit in rm table lock */
304
264
 
305
265
err_with_placeholders:
306
266
  unlock_table_names(tables, NULL);
307
 
  pthread_mutex_unlock(&LOCK_open);
 
267
  LOCK_open.unlock();
308
268
  session->no_warnings_for_error= 0;
309
269
 
310
270
  return error;
344
304
  PRIMARY keys are prioritized.
345
305
*/
346
306
 
347
 
static int sort_keys(KEY *a, KEY *b)
 
307
static int sort_keys(KeyInfo *a, KeyInfo *b)
348
308
{
349
309
  ulong a_flags= a->flags, b_flags= b->flags;
350
310
 
396
356
    1             Error
397
357
*/
398
358
 
 
359
class typelib_set_member
 
360
{
 
361
public:
 
362
  string s;
 
363
  const CHARSET_INFO * const cs;
 
364
 
 
365
  typelib_set_member(const char* value, unsigned int length,
 
366
                     const CHARSET_INFO * const charset)
 
367
    : s(value, length),
 
368
      cs(charset)
 
369
  {}
 
370
};
 
371
 
 
372
static bool operator==(typelib_set_member const& a, typelib_set_member const& b)
 
373
{
 
374
  return (my_strnncoll(a.cs,
 
375
                       (const unsigned char*)a.s.c_str(), a.s.length(),
 
376
                       (const unsigned char*)b.s.c_str(), b.s.length())==0);
 
377
}
 
378
 
 
379
 
 
380
namespace
 
381
{
 
382
class typelib_set_member_hasher
 
383
{
 
384
  boost::hash<string> hasher;
 
385
public:
 
386
  std::size_t operator()(const typelib_set_member& t) const
 
387
  {
 
388
    return hasher(t.s);
 
389
  }
 
390
};
 
391
}
 
392
 
399
393
static bool check_duplicates_in_interval(const char *set_or_name,
400
394
                                         const char *name, TYPELIB *typelib,
401
395
                                         const CHARSET_INFO * const cs,
406
400
  unsigned int *cur_length= typelib->type_lengths;
407
401
  *dup_val_count= 0;
408
402
 
409
 
  for ( ; tmp.count > 1; cur_value++, cur_length++)
 
403
  boost::unordered_set<typelib_set_member, typelib_set_member_hasher> interval_set;
 
404
 
 
405
  for ( ; tmp.count > 0; cur_value++, cur_length++)
410
406
  {
411
407
    tmp.type_names++;
412
408
    tmp.type_lengths++;
413
409
    tmp.count--;
414
 
    if (find_type2(&tmp, (const char*)*cur_value, *cur_length, cs))
 
410
    if (interval_set.find(typelib_set_member(*cur_value, *cur_length, cs)) != interval_set.end())
415
411
    {
416
412
      my_error(ER_DUPLICATED_VALUE_IN_TYPE, MYF(0),
417
413
               name,*cur_value,set_or_name);
418
414
      return 1;
419
415
    }
 
416
    else
 
417
      interval_set.insert(typelib_set_member(*cur_value, *cur_length, cs));
420
418
  }
421
419
  return 0;
422
420
}
489
487
 
490
488
  switch (sql_field->sql_type) {
491
489
  case DRIZZLE_TYPE_BLOB:
492
 
    sql_field->pack_flag= pack_length_to_packflag(sql_field->pack_length - portable_sizeof_char_ptr);
493
490
    sql_field->length= 8; // Unireg field length
494
491
    (*blob_columns)++;
495
492
    break;
496
493
  case DRIZZLE_TYPE_VARCHAR:
497
 
    sql_field->pack_flag=0;
498
494
    break;
499
495
  case DRIZZLE_TYPE_ENUM:
500
 
    sql_field->pack_flag=pack_length_to_packflag(sql_field->pack_length);
501
496
    if (check_duplicates_in_interval("ENUM",
502
497
                                     sql_field->field_name,
503
498
                                     sql_field->interval,
508
503
  case DRIZZLE_TYPE_DATE:  // Rest of string types
509
504
  case DRIZZLE_TYPE_DATETIME:
510
505
  case DRIZZLE_TYPE_NULL:
511
 
    sql_field->pack_flag=f_settype((uint32_t) sql_field->sql_type);
512
506
    break;
513
507
  case DRIZZLE_TYPE_DECIMAL:
514
 
    sql_field->pack_flag= 0;
515
508
    break;
516
509
  case DRIZZLE_TYPE_TIMESTAMP:
517
510
    /* We should replace old TIMESTAMP fields with their newer analogs */
523
516
        (*timestamps_with_niladic)++;
524
517
      }
525
518
      else
 
519
      {
526
520
        sql_field->unireg_check= Field::NONE;
 
521
      }
527
522
    }
528
523
    else if (sql_field->unireg_check != Field::NONE)
529
524
      (*timestamps_with_niladic)++;
531
526
    (*timestamps)++;
532
527
    /* fall-through */
533
528
  default:
534
 
    sql_field->pack_flag=(0 |
535
 
                          f_settype((uint32_t) sql_field->sql_type));
536
529
    break;
537
530
  }
 
531
 
538
532
  return 0;
539
533
}
540
534
 
544
538
                                      AlterInfo *alter_info,
545
539
                                      bool tmp_table,
546
540
                                      uint32_t *db_options,
547
 
                                      KEY **key_info_buffer,
 
541
                                      KeyInfo **key_info_buffer,
548
542
                                      uint32_t *key_count,
549
543
                                      int select_field_count)
550
544
{
552
546
  CreateField   *sql_field,*dup_field;
553
547
  uint          field,null_fields,blob_columns,max_key_length;
554
548
  ulong         record_offset= 0;
555
 
  KEY           *key_info;
556
 
  KEY_PART_INFO *key_part_info;
 
549
  KeyInfo               *key_info;
 
550
  KeyPartInfo *key_part_info;
557
551
  int           timestamps= 0, timestamps_with_niladic= 0;
558
552
  int           field_no,dup_no;
559
553
  int           select_field_pos,auto_increment=0;
666
660
          {
667
661
            uint32_t cnv_errs;
668
662
            conv.copy(tmp->ptr(), tmp->length(), tmp->charset(), cs, &cnv_errs);
669
 
            interval->type_names[i]= strmake_root(session->mem_root, conv.ptr(),
670
 
                                                  conv.length());
 
663
            interval->type_names[i]= session->mem_root->strmake_root(conv.ptr(), conv.length());
671
664
            interval->type_lengths[i]= conv.length();
672
665
          }
673
666
 
775
768
    /** @todo Get rid of this MyISAM-specific crap. */
776
769
    if (not create_proto.engine().name().compare("MyISAM") &&
777
770
        ((sql_field->flags & BLOB_FLAG) ||
778
 
         (sql_field->sql_type == DRIZZLE_TYPE_VARCHAR && create_info->row_type != ROW_TYPE_FIXED)))
 
771
         (sql_field->sql_type == DRIZZLE_TYPE_VARCHAR)))
779
772
      (*db_options)|= HA_OPTION_PACK_RECORD;
780
773
    it2.rewind();
781
774
  }
843
836
      fk_key_count++;
844
837
      if (((Foreign_key *)key)->validate(alter_info->create_list))
845
838
        return true;
 
839
 
846
840
      Foreign_key *fk_key= (Foreign_key*) key;
 
841
 
 
842
      add_foreign_key_to_table_message(&create_proto,
 
843
                                       fk_key->name.str,
 
844
                                       fk_key->columns,
 
845
                                       fk_key->ref_table,
 
846
                                       fk_key->ref_columns,
 
847
                                       fk_key->delete_opt,
 
848
                                       fk_key->update_opt,
 
849
                                       fk_key->match_opt);
 
850
 
847
851
      if (fk_key->ref_columns.elements &&
848
852
          fk_key->ref_columns.elements != fk_key->columns.elements)
849
853
      {
878
882
             key2->name.str != ignore_key &&
879
883
             !foreign_key_prefix(key, key2)))
880
884
        {
881
 
          /* TODO: issue warning message */
 
885
          /* @todo issue warning message */
882
886
          /* mark that the generated key should be ignored */
883
887
          if (!key2->generated ||
884
888
              (key->generated && key->columns.elements <
912
916
    return(true);
913
917
  }
914
918
 
915
 
  (*key_info_buffer)= key_info= (KEY*) memory::sql_calloc(sizeof(KEY) * (*key_count));
916
 
  key_part_info=(KEY_PART_INFO*) memory::sql_calloc(sizeof(KEY_PART_INFO)*key_parts);
 
919
  (*key_info_buffer)= key_info= (KeyInfo*) memory::sql_calloc(sizeof(KeyInfo) * (*key_count));
 
920
  key_part_info=(KeyPartInfo*) memory::sql_calloc(sizeof(KeyPartInfo)*key_parts);
917
921
  if (!*key_info_buffer || ! key_part_info)
918
922
    return(true);                               // Out of memory
919
923
 
953
957
    key_info->usable_key_parts= key_number;
954
958
    key_info->algorithm= key->key_create_info.algorithm;
955
959
 
956
 
    /* Take block size from key part or table part */
957
 
    /*
958
 
      TODO: Add warning if block size changes. We can't do it here, as
959
 
      this may depend on the size of the key
960
 
    */
961
 
    key_info->block_size= (key->key_create_info.block_size ?
962
 
                           key->key_create_info.block_size :
963
 
                           create_proto.options().key_block_size());
964
 
 
965
 
    if (key_info->block_size)
966
 
      key_info->flags|= HA_USES_BLOCK_SIZE;
967
 
 
968
960
    uint32_t tmp_len= system_charset_info->cset->charpos(system_charset_info,
969
961
                                           key->key_create_info.comment.str,
970
962
                                           key->key_create_info.comment.str +
1074
1066
 
1075
1067
      key_part_info->fieldnr= field;
1076
1068
      key_part_info->offset=  (uint16_t) sql_field->offset;
1077
 
      key_part_info->key_type=sql_field->pack_flag;
 
1069
      key_part_info->key_type= 0;
1078
1070
      length= sql_field->key_length;
1079
1071
 
1080
1072
      if (column->length)
1142
1134
      key_part_info->length=(uint16_t) length;
1143
1135
      /* Use packed keys for long strings on the first column */
1144
1136
      if (!((*db_options) & HA_OPTION_NO_PACK_KEYS) &&
1145
 
          (length >= KEY_DEFAULT_PACK_LENGTH &&
1146
 
           (sql_field->sql_type == DRIZZLE_TYPE_VARCHAR ||
1147
 
      sql_field->sql_type == DRIZZLE_TYPE_BLOB)))
 
1137
          (length >= KEY_DEFAULT_PACK_LENGTH &&
 
1138
           (sql_field->sql_type == DRIZZLE_TYPE_VARCHAR ||
 
1139
            sql_field->sql_type == DRIZZLE_TYPE_BLOB)))
1148
1140
      {
1149
1141
        if ((column_nr == 0 && sql_field->sql_type == DRIZZLE_TYPE_BLOB) ||
1150
1142
            sql_field->sql_type == DRIZZLE_TYPE_VARCHAR)
 
1143
        {
1151
1144
          key_info->flags|= HA_BINARY_PACK_KEY | HA_VAR_LENGTH_KEY;
 
1145
        }
1152
1146
        else
 
1147
        {
1153
1148
          key_info->flags|= HA_PACK_KEY;
 
1149
        }
1154
1150
      }
1155
1151
      /* Check if the key segment is partial, set the key flag accordingly */
1156
1152
      if (length != sql_field->key_length)
1212
1208
    return(true);
1213
1209
  }
1214
1210
  /* Sort keys in optimized order */
1215
 
  internal::my_qsort((unsigned char*) *key_info_buffer, *key_count, sizeof(KEY),
 
1211
  internal::my_qsort((unsigned char*) *key_info_buffer, *key_count, sizeof(KeyInfo),
1216
1212
                     (qsort_cmp) sort_keys);
1217
1213
 
1218
1214
  /* Check fields. */
1295
1291
                                bool internal_tmp_table,
1296
1292
                                uint db_options,
1297
1293
                                uint key_count,
1298
 
                                KEY *key_info_buffer)
 
1294
                                KeyInfo *key_info_buffer)
1299
1295
{
1300
1296
  bool error= true;
1301
1297
 
1426
1422
                                bool is_if_not_exists)
1427
1423
{
1428
1424
  uint          db_options, key_count;
1429
 
  KEY           *key_info_buffer;
 
1425
  KeyInfo               *key_info_buffer;
1430
1426
  bool          error= true;
1431
 
  TableShare share;
1432
1427
 
1433
1428
  /* Check for duplicate fields and check type of table to create */
1434
1429
  if (not alter_info->create_list.elements)
1440
1435
  assert(identifier.getTableName() == table_proto.name());
1441
1436
  db_options= create_info->table_options;
1442
1437
 
1443
 
  if (create_info->row_type == ROW_TYPE_DYNAMIC)
1444
 
    db_options|=HA_OPTION_PACK_RECORD;
1445
 
 
1446
1438
  set_table_default_charset(create_info, identifier.getSchemaName().c_str());
1447
1439
 
1448
1440
  /* Build a Table object to pass down to the engine, and the do the actual create. */
1452
1444
                                 &key_info_buffer, &key_count,
1453
1445
                                 select_field_count))
1454
1446
  {
1455
 
    pthread_mutex_lock(&LOCK_open); /* CREATE TABLE (some confussion on naming, double check) */
 
1447
    boost_unique_lock_t lock(LOCK_open); /* CREATE TABLE (some confussion on naming, double check) */
1456
1448
    error= locked_create_event(session,
1457
1449
                               identifier,
1458
1450
                               create_info,
1462
1454
                               internal_tmp_table,
1463
1455
                               db_options, key_count,
1464
1456
                               key_info_buffer);
1465
 
    pthread_mutex_unlock(&LOCK_open);
1466
1457
  }
1467
1458
 
1468
1459
  session->set_proc_info("After create");
1519
1510
 
1520
1511
  if (name_lock)
1521
1512
  {
1522
 
    pthread_mutex_lock(&LOCK_open); /* Lock for removing name_lock during table create */
 
1513
    boost_unique_lock_t lock(LOCK_open); /* Lock for removing name_lock during table create */
1523
1514
    session->unlink_open_table(name_lock);
1524
 
    pthread_mutex_unlock(&LOCK_open);
1525
1515
  }
1526
1516
 
1527
1517
  return(result);
1568
1558
**/
1569
1559
 
1570
1560
static bool
1571
 
check_if_keyname_exists(const char *name, KEY *start, KEY *end)
 
1561
check_if_keyname_exists(const char *name, KeyInfo *start, KeyInfo *end)
1572
1562
{
1573
 
  for (KEY *key=start ; key != end ; key++)
 
1563
  for (KeyInfo *key=start ; key != end ; key++)
1574
1564
    if (!my_strcasecmp(system_charset_info,name,key->name))
1575
1565
      return 1;
1576
1566
  return 0;
1578
1568
 
1579
1569
 
1580
1570
static char *
1581
 
make_unique_key_name(const char *field_name,KEY *start,KEY *end)
 
1571
make_unique_key_name(const char *field_name,KeyInfo *start,KeyInfo *end)
1582
1572
{
1583
1573
  char buff[MAX_FIELD_NAME],*buff_end;
1584
1574
 
1613
1603
 
1614
1604
  SYNOPSIS
1615
1605
    mysql_rename_table()
 
1606
      session
1616
1607
      base                      The plugin::StorageEngine handle.
1617
1608
      old_db                    The old database name.
1618
1609
      old_name                  The old table name.
1619
1610
      new_db                    The new database name.
1620
1611
      new_name                  The new table name.
1621
 
      flags                     flags for build_table_filename().
1622
 
                                FN_FROM_IS_TMP old_name is temporary.
1623
 
                                FN_TO_IS_TMP   new_name is temporary.
1624
1612
 
1625
1613
  RETURN
1626
1614
    false   OK
1628
1616
*/
1629
1617
 
1630
1618
bool
1631
 
mysql_rename_table(plugin::StorageEngine *base,
 
1619
mysql_rename_table(Session &session,
 
1620
                   plugin::StorageEngine *base,
1632
1621
                   TableIdentifier &from,
1633
 
                   TableIdentifier &to,
1634
 
                   uint32_t )
 
1622
                   TableIdentifier &to)
1635
1623
{
1636
 
  Session *session= current_session;
1637
1624
  int error= 0;
1638
1625
 
1639
1626
  assert(base);
1644
1631
    return true;
1645
1632
  }
1646
1633
 
1647
 
  error= base->renameTable(*session, from, to);
 
1634
  error= base->renameTable(session, from, to);
1648
1635
 
1649
1636
  if (error == HA_ERR_WRONG_COMMAND)
1650
1637
  {
1685
1672
                              enum ha_extra_function function)
1686
1673
{
1687
1674
 
1688
 
  safe_mutex_assert_owner(&LOCK_open);
 
1675
  safe_mutex_assert_owner(LOCK_open.native_handle());
1689
1676
 
1690
1677
  table->cursor->extra(function);
1691
1678
  /* Mark all tables that are in use as 'old' */
1692
1679
  mysql_lock_abort(session, table);     /* end threads waiting on lock */
1693
1680
 
1694
1681
  /* Wait until all there are no other threads that has this table open */
1695
 
  remove_table_from_cache(session, table->s->getSchemaName(),
1696
 
                          table->s->table_name.str,
1697
 
                          RTFC_WAIT_OTHER_THREAD_FLAG);
 
1682
  TableIdentifier identifier(table->getMutableShare()->getSchemaName(), table->getMutableShare()->getTableName());
 
1683
  remove_table_from_cache(session, identifier, RTFC_WAIT_OTHER_THREAD_FLAG);
1698
1684
}
1699
1685
 
1700
1686
/*
1788
1774
      /*
1789
1775
        Time zone tables and SP tables can be add to lex->query_tables list,
1790
1776
        so it have to be prepared.
1791
 
        TODO: Investigate if we can put extra tables into argument instead of
1792
 
        using lex->query_tables
 
1777
        @todo Investigate if we can put extra tables into argument instead of using lex->query_tables
1793
1778
      */
1794
1779
      lex->query_tables= table;
1795
1780
      lex->query_tables_last= &table->next_global;
1829
1814
      length= snprintf(buff, sizeof(buff), ER(ER_OPEN_AS_READONLY),
1830
1815
                       table_name);
1831
1816
      session->client->store(buff, length);
1832
 
      transaction_services.ha_autocommit_or_rollback(session, false);
 
1817
      transaction_services.autocommitOrRollback(session, false);
1833
1818
      session->endTransaction(COMMIT);
1834
1819
      session->close_thread_tables();
1835
1820
      lex->reset_query_tables_list(false);
1840
1825
    }
1841
1826
 
1842
1827
    /* Close all instances of the table to allow repair to rename files */
1843
 
    if (lock_type == TL_WRITE && table->table->s->version)
 
1828
    if (lock_type == TL_WRITE && table->table->getShare()->getVersion())
1844
1829
    {
1845
 
      pthread_mutex_lock(&LOCK_open); /* Lock type is TL_WRITE and we lock to repair the table */
1846
 
      const char *old_message=session->enter_cond(&COND_refresh, &LOCK_open,
1847
 
                                              "Waiting to get writelock");
 
1830
      LOCK_open.lock(); /* Lock type is TL_WRITE and we lock to repair the table */
 
1831
      const char *old_message=session->enter_cond(COND_refresh, LOCK_open,
 
1832
                                                  "Waiting to get writelock");
1848
1833
      mysql_lock_abort(session,table->table);
1849
 
      remove_table_from_cache(session, table->table->s->getSchemaName(),
1850
 
                              table->table->s->table_name.str,
 
1834
      TableIdentifier identifier(table->table->getMutableShare()->getSchemaName(), table->table->getMutableShare()->getTableName());
 
1835
      remove_table_from_cache(session, identifier,
1851
1836
                              RTFC_WAIT_OTHER_THREAD_FLAG |
1852
1837
                              RTFC_CHECK_KILLED_FLAG);
1853
1838
      session->exit_cond(old_message);
1938
1923
    if (table->table)
1939
1924
    {
1940
1925
      if (fatal_error)
1941
 
        table->table->s->version=0;               // Force close of table
 
1926
      {
 
1927
        table->table->getMutableShare()->resetVersion();               // Force close of table
 
1928
      }
1942
1929
      else if (open_for_modify)
1943
1930
      {
1944
 
        if (table->table->s->tmp_table)
 
1931
        if (table->table->getShare()->getType())
 
1932
        {
1945
1933
          table->table->cursor->info(HA_STATUS_CONST);
 
1934
        }
1946
1935
        else
1947
1936
        {
1948
 
          pthread_mutex_lock(&LOCK_open);
1949
 
          remove_table_from_cache(session, table->table->s->getSchemaName(),
1950
 
                                  table->table->s->table_name.str, RTFC_NO_FLAG);
1951
 
          pthread_mutex_unlock(&LOCK_open);
 
1937
          boost::unique_lock<boost::mutex> lock(LOCK_open);
 
1938
          TableIdentifier identifier(table->table->getMutableShare()->getSchemaName(), table->table->getMutableShare()->getTableName());
 
1939
          remove_table_from_cache(session, identifier, RTFC_NO_FLAG);
1952
1940
        }
1953
1941
      }
1954
1942
    }
1955
 
    transaction_services.ha_autocommit_or_rollback(session, false);
 
1943
    transaction_services.autocommitOrRollback(session, false);
1956
1944
    session->endTransaction(COMMIT);
1957
1945
    session->close_thread_tables();
1958
1946
    table->table=0;                             // For query cache
1964
1952
  return(false);
1965
1953
 
1966
1954
err:
1967
 
  transaction_services.ha_autocommit_or_rollback(session, true);
 
1955
  transaction_services.autocommitOrRollback(session, true);
1968
1956
  session->endTransaction(ROLLBACK);
1969
1957
  session->close_thread_tables();                       // Shouldn't be needed
1970
1958
  if (table)
1972
1960
  return(true);
1973
1961
}
1974
1962
 
1975
 
/*
1976
 
  We have to write the query before we unlock the named table.
1977
 
 
1978
 
  Since temporary tables are not replicated under row-based
1979
 
  replication, CREATE TABLE ... LIKE ... needs special
1980
 
  treatement.  We have four cases to consider, according to the
1981
 
  following decision table:
1982
 
 
1983
 
  ==== ========= ========= ==============================
1984
 
  Case    Target    Source Write to binary log
1985
 
  ==== ========= ========= ==============================
1986
 
  1       normal    normal Original statement
1987
 
  2       normal temporary Generated statement
1988
 
  3    temporary    normal Nothing
1989
 
  4    temporary temporary Nothing
1990
 
  ==== ========= ========= ==============================
1991
 
*/
1992
 
static bool replicateCreateTableLike(Session *session, TableList *table, Table *name_lock,
1993
 
                                     bool is_src_table_tmp, bool is_if_not_exists)
1994
 
{
1995
 
  if (is_src_table_tmp)
1996
 
  {
1997
 
    char buf[2048];
1998
 
    String query(buf, sizeof(buf), system_charset_info);
1999
 
    query.length(0);  // Have to zero it since constructor doesn't
2000
 
 
2001
 
 
2002
 
    /*
2003
 
      Here we open the destination table, on which we already have
2004
 
      name-lock. This is needed for store_create_info() to work.
2005
 
      The table will be closed by unlink_open_table() at the end
2006
 
      of this function.
2007
 
    */
2008
 
    table->table= name_lock;
2009
 
    pthread_mutex_lock(&LOCK_open); /* Open new table we have just acquired */
2010
 
    if (session->reopen_name_locked_table(table, false))
2011
 
    {
2012
 
      pthread_mutex_unlock(&LOCK_open);
2013
 
      return false;
2014
 
    }
2015
 
    pthread_mutex_unlock(&LOCK_open);
2016
 
 
2017
 
    int result= store_create_info(table, &query, is_if_not_exists);
2018
 
 
2019
 
    assert(result == 0); // store_create_info() always return 0
2020
 
    write_bin_log(session, query.ptr());
2021
 
  }
2022
 
  else                                      // Case 1
2023
 
  {
2024
 
    write_bin_log(session, session->query.c_str());
2025
 
  }
2026
 
 
2027
 
  return true;
2028
 
}
2029
 
 
2030
1963
  /*
2031
1964
    Create a new table by copying from source table
2032
1965
 
2067
2000
 
2068
2001
  if (is_engine_set)
2069
2002
  {
2070
 
    message::Table::StorageEngine *protoengine;
2071
 
 
2072
 
    protoengine= new_proto.mutable_engine();
2073
 
    protoengine->set_name(create_table_proto.engine().name());
 
2003
    new_proto.mutable_engine()->set_name(create_table_proto.engine().name());
2074
2004
  }
2075
2005
 
2076
2006
  { // We now do a selective copy of elements on to the new table.
2096
2026
  */
2097
2027
  int err= plugin::StorageEngine::createTable(session,
2098
2028
                                              destination_identifier,
2099
 
                                              true, new_proto);
 
2029
                                              new_proto);
 
2030
 
 
2031
  if (err == false && not destination_identifier.isTmp())
 
2032
  {
 
2033
    TransactionServices &transaction_services= TransactionServices::singleton();
 
2034
    transaction_services.createTable(&session, new_proto);
 
2035
  }
2100
2036
 
2101
2037
  return err ? false : true;
2102
2038
}
2123
2059
                             bool is_if_not_exists,
2124
2060
                             bool is_engine_set)
2125
2061
{
2126
 
  Table *name_lock= 0;
2127
2062
  char *db= table->db;
2128
2063
  char *table_name= table->table_name;
2129
2064
  bool res= true;
2130
2065
  uint32_t not_used;
2131
 
  bool was_created;
2132
2066
 
2133
2067
  /*
2134
2068
    By opening source table we guarantee that it exists and no concurrent
2142
2076
  if (session->open_tables_from_list(&src_table, &not_used))
2143
2077
    return true;
2144
2078
 
2145
 
  TableIdentifier src_identifier(src_table->table->s->getSchemaName(),
2146
 
                                 src_table->table->s->table_name.str, src_table->table->s->tmp_table);
 
2079
  TableIdentifier src_identifier(src_table->table->getMutableShare()->getSchemaName(),
 
2080
                                 src_table->table->getMutableShare()->getTableName(), src_table->table->getMutableShare()->getType());
2147
2081
 
2148
2082
 
2149
2083
 
2150
2084
  /*
2151
2085
    Check that destination tables does not exist. Note that its name
2152
2086
    was already checked when it was added to the table list.
 
2087
 
 
2088
    For temporary tables we don't aim to grab locks.
2153
2089
  */
2154
2090
  bool table_exists= false;
2155
2091
  if (destination_identifier.isTmp())
2158
2094
    {
2159
2095
      table_exists= true;
2160
2096
    }
 
2097
    else
 
2098
    {
 
2099
      bool was_created= create_table_wrapper(*session, create_table_proto, destination_identifier,
 
2100
                                             src_identifier, is_engine_set);
 
2101
      if (not was_created) // This is pretty paranoid, but we assume something might not clean up after itself
 
2102
      {
 
2103
        (void) session->rm_temporary_table(destination_identifier, true);
 
2104
      }
 
2105
      else if (not session->open_temporary_table(destination_identifier))
 
2106
      {
 
2107
        // We created, but we can't open... also, a hack.
 
2108
        (void) session->rm_temporary_table(destination_identifier, true);
 
2109
      }
 
2110
      else
 
2111
      {
 
2112
        res= false;
 
2113
      }
 
2114
    }
2161
2115
  }
2162
 
  else
 
2116
  else // Standard table which will require locks.
2163
2117
  {
2164
 
    if (session->lock_table_name_if_not_cached(db, table_name, &name_lock))
 
2118
    Table *name_lock= 0;
 
2119
 
 
2120
    if (session->lock_table_name_if_not_cached(destination_identifier, &name_lock))
2165
2121
    {
2166
2122
      if (name_lock)
2167
2123
      {
2168
 
        pthread_mutex_lock(&LOCK_open); /* unlink open tables for create table like*/
 
2124
        boost_unique_lock_t lock(LOCK_open); /* unlink open tables for create table like*/
2169
2125
        session->unlink_open_table(name_lock);
2170
 
        pthread_mutex_unlock(&LOCK_open);
2171
2126
      }
2172
2127
 
2173
2128
      return res;
2181
2136
    {
2182
2137
      table_exists= true;
2183
2138
    }
 
2139
    else // Otherwise we create the table
 
2140
    {
 
2141
      bool was_created;
 
2142
      {
 
2143
        boost_unique_lock_t lock(LOCK_open); /* We lock for CREATE TABLE LIKE to copy table definition */
 
2144
        was_created= create_table_wrapper(*session, create_table_proto, destination_identifier,
 
2145
                                               src_identifier, is_engine_set);
 
2146
      }
 
2147
 
 
2148
      // So we blew the creation of the table, and we scramble to clean up
 
2149
      // anything that might have been created (read... it is a hack)
 
2150
      if (not was_created)
 
2151
      {
 
2152
        quick_rm_table(*session, destination_identifier);
 
2153
      } 
 
2154
      else
 
2155
      {
 
2156
        res= false;
 
2157
      }
 
2158
    }
 
2159
 
 
2160
    if (name_lock)
 
2161
    {
 
2162
      boost_unique_lock_t lock(LOCK_open); /* unlink open tables for create table like*/
 
2163
      session->unlink_open_table(name_lock);
 
2164
    }
2184
2165
  }
2185
2166
 
2186
2167
  if (table_exists)
2199
2180
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table_name);
2200
2181
    }
2201
2182
  }
2202
 
  else // Otherwise we create the table
2203
 
  {
2204
 
    pthread_mutex_lock(&LOCK_open); /* We lock for CREATE TABLE LIKE to copy table definition */
2205
 
    was_created= create_table_wrapper(*session, create_table_proto, destination_identifier,
2206
 
                                      src_identifier, is_engine_set);
2207
 
    pthread_mutex_unlock(&LOCK_open);
2208
 
 
2209
 
    // So we blew the creation of the table, and we scramble to clean up
2210
 
    // anything that might have been created (read... it is a hack)
2211
 
    if (not was_created)
2212
 
    {
2213
 
      if (destination_identifier.isTmp())
2214
 
      {
2215
 
        (void) session->rm_temporary_table(destination_identifier);
2216
 
      }
2217
 
      else
2218
 
      {
2219
 
        quick_rm_table(*session, destination_identifier);
2220
 
      }
2221
 
    } 
2222
 
    else if (destination_identifier.isTmp() && not session->open_temporary_table(destination_identifier))
2223
 
    {
2224
 
      // We created, but we can't open... also, a hack.
2225
 
      (void) session->rm_temporary_table(destination_identifier);
2226
 
    }
2227
 
    else
2228
 
    {
2229
 
      if (not destination_identifier.isTmp())
2230
 
      {
2231
 
        bool rc= replicateCreateTableLike(session, table, name_lock, (src_table->table->s->tmp_table), is_if_not_exists);
2232
 
        (void)rc;
2233
 
      }
2234
 
 
2235
 
      res= false;
2236
 
    }
2237
 
  }
2238
 
 
2239
 
  if (name_lock)
2240
 
  {
2241
 
    pthread_mutex_lock(&LOCK_open); /* unlink open tables for create table like*/
2242
 
    session->unlink_open_table(name_lock);
2243
 
    pthread_mutex_unlock(&LOCK_open);
2244
 
  }
2245
2183
 
2246
2184
  return(res);
2247
2185
}
2267
2205
                                &Cursor::ha_check));
2268
2206
}
2269
2207
 
2270
 
 
2271
 
bool mysql_checksum_table(Session *session, TableList *tables,
2272
 
                          HA_CHECK_OPT *)
2273
 
{
2274
 
  TableList *table;
2275
 
  List<Item> field_list;
2276
 
  Item *item;
2277
 
 
2278
 
  field_list.push_back(item = new Item_empty_string("Table", NAME_LEN*2));
2279
 
  item->maybe_null= 1;
2280
 
  field_list.push_back(item= new Item_int("Checksum", (int64_t) 1,
2281
 
                                          MY_INT64_NUM_DECIMAL_DIGITS));
2282
 
  item->maybe_null= 1;
2283
 
  if (session->client->sendFields(&field_list))
2284
 
    return true;
2285
 
 
2286
 
  /* Open one table after the other to keep lock time as short as possible. */
2287
 
  for (table= tables; table; table= table->next_local)
2288
 
  {
2289
 
    char table_name[NAME_LEN*2+2];
2290
 
    Table *t;
2291
 
 
2292
 
    snprintf(table_name, sizeof(table_name), "%s.%s",table->db,table->table_name);
2293
 
 
2294
 
    t= table->table= session->openTableLock(table, TL_READ);
2295
 
    session->clear_error();                     // these errors shouldn't get client
2296
 
 
2297
 
    session->client->store(table_name);
2298
 
 
2299
 
    if (!t)
2300
 
    {
2301
 
      /* Table didn't exist */
2302
 
      session->client->store();
2303
 
      session->clear_error();
2304
 
    }
2305
 
    else
2306
 
    {
2307
 
      /**
2308
 
        @note if the engine keeps a checksum then we return the checksum, otherwise we calculate
2309
 
      */
2310
 
      if (t->cursor->getEngine()->check_flag(HTON_BIT_HAS_CHECKSUM))
2311
 
      {
2312
 
        session->client->store((uint64_t)t->cursor->checksum());
2313
 
      }
2314
 
      else
2315
 
      {
2316
 
        /* calculating table's checksum */
2317
 
        internal::ha_checksum crc= 0;
2318
 
        unsigned char null_mask=256 -  (1 << t->s->last_null_bit_pos);
2319
 
 
2320
 
        t->use_all_columns();
2321
 
 
2322
 
        if (t->cursor->ha_rnd_init(1))
2323
 
          session->client->store();
2324
 
        else
2325
 
        {
2326
 
          for (;;)
2327
 
          {
2328
 
            internal::ha_checksum row_crc= 0;
2329
 
            int error= t->cursor->rnd_next(t->record[0]);
2330
 
            if (unlikely(error))
2331
 
            {
2332
 
              if (error == HA_ERR_RECORD_DELETED)
2333
 
                continue;
2334
 
              break;
2335
 
            }
2336
 
            if (t->s->null_bytes)
2337
 
            {
2338
 
              /* fix undefined null bits */
2339
 
              t->record[0][t->s->null_bytes-1] |= null_mask;
2340
 
              if (!(t->s->db_create_options & HA_OPTION_PACK_RECORD))
2341
 
                t->record[0][0] |= 1;
2342
 
 
2343
 
              row_crc= internal::my_checksum(row_crc, t->record[0], t->s->null_bytes);
2344
 
            }
2345
 
 
2346
 
            for (uint32_t i= 0; i < t->s->fields; i++ )
2347
 
            {
2348
 
              Field *f= t->field[i];
2349
 
              if ((f->type() == DRIZZLE_TYPE_BLOB) ||
2350
 
                  (f->type() == DRIZZLE_TYPE_VARCHAR))
2351
 
              {
2352
 
                String tmp;
2353
 
                f->val_str(&tmp);
2354
 
                row_crc= internal::my_checksum(row_crc, (unsigned char*) tmp.ptr(), tmp.length());
2355
 
              }
2356
 
              else
2357
 
                row_crc= internal::my_checksum(row_crc, f->ptr,
2358
 
                                     f->pack_length());
2359
 
            }
2360
 
 
2361
 
            crc+= row_crc;
2362
 
          }
2363
 
          session->client->store((uint64_t)crc);
2364
 
          t->cursor->ha_rnd_end();
2365
 
        }
2366
 
      }
2367
 
      session->clear_error();
2368
 
      session->close_thread_tables();
2369
 
      table->table=0;                           // For query cache
2370
 
    }
2371
 
    if (session->client->flush())
2372
 
      goto err;
2373
 
  }
2374
 
 
2375
 
  session->my_eof();
2376
 
  return(false);
2377
 
 
2378
 
 err:
2379
 
  session->close_thread_tables();                       // Shouldn't be needed
2380
 
  if (table)
2381
 
    table->table=0;
2382
 
  return(true);
2383
 
}
2384
 
 
2385
2208
} /* namespace drizzled */