~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/db.cc

  • Committer: Stewart Smith
  • Date: 2010-02-26 05:33:19 UTC
  • mfrom: (1283.13.5)
  • mto: (1283.11.6)
  • mto: This revision was merged to the branch mainline in revision 1449.
  • Revision ID: stewart@flamingspork.com-20100226053319-6dnlmghm0vq320ch
Merged embedded-innodb-init into embedded-innodb-dump-datadict-func.

Show diffs side-by-side

added added

removed removed

Lines of Context:
38
38
#include <drizzled/replication_services.h>
39
39
#include <drizzled/message/schema.pb.h>
40
40
#include "drizzled/sql_table.h"
41
 
#include "drizzled/plugin/info_schema_table.h"
 
41
#include "drizzled/plugin/storage_engine.h"
42
42
#include "drizzled/global_charset_info.h"
43
43
#include "drizzled/pthread_globals.h"
44
44
#include "drizzled/charset.h"
45
45
 
46
46
#include "drizzled/internal/my_sys.h"
47
47
 
48
 
#define MY_DB_OPT_FILE "db.opt"
49
48
#define MAX_DROP_TABLE_Q_LEN      1024
50
49
 
51
50
using namespace std;
53
52
namespace drizzled
54
53
{
55
54
 
56
 
const string del_exts[]= {".dfe", ".blk", ".arz", ".BAK", ".TMD", ".opt"};
 
55
const string del_exts[]= {".dfe", ".blk", ".arz", ".BAK", ".TMD"};
57
56
static set<string> deletable_extentions(del_exts, &del_exts[sizeof(del_exts)/sizeof(del_exts[0])]);
58
57
 
59
58
 
62
61
                                 TableList **dropped_tables);
63
62
static void mysql_change_db_impl(Session *session, LEX_STRING *new_db_name);
64
63
 
65
 
/**
66
 
  Return default database collation.
67
 
 
68
 
  @param session     Thread context.
69
 
  @param db_name Database name.
70
 
 
71
 
  @return CHARSET_INFO object. The operation always return valid character
72
 
    set, even if the database does not exist.
73
 
*/
74
 
 
75
 
const CHARSET_INFO *get_default_db_collation(const char *db_name)
76
 
{
77
 
  message::Schema db;
78
 
 
79
 
  get_database_metadata(db_name, db);
80
 
 
81
 
  /* If for some reason the db.opt file lacks a collation,
82
 
     we just return the default */
83
 
 
84
 
  if (db.has_collation())
85
 
  {
86
 
    const string buffer= db.collation();
87
 
    const CHARSET_INFO* cs= get_charset_by_name(buffer.c_str());
88
 
 
89
 
    if (!cs)
90
 
    {
91
 
      errmsg_printf(ERRMSG_LVL_ERROR,
92
 
                    _("Error while loading database options: '%s':"),db_name);
93
 
      errmsg_printf(ERRMSG_LVL_ERROR, ER(ER_UNKNOWN_COLLATION), buffer.c_str());
94
 
 
95
 
      return default_charset_info;
96
 
    }
97
 
 
98
 
    return cs;
99
 
  }
100
 
 
101
 
  return default_charset_info;
102
 
}
103
 
 
104
 
/* path is path to database, not schema file */
105
 
static int write_schema_file(const char *path, const message::Schema &db)
106
 
{
107
 
  char schema_file_tmp[FN_REFLEN];
108
 
  string schema_file(path);
109
 
 
110
 
  snprintf(schema_file_tmp, FN_REFLEN, "%s%c%s.tmpXXXXXX", path, FN_LIBCHAR, MY_DB_OPT_FILE);
111
 
 
112
 
  schema_file.append(1, FN_LIBCHAR);
113
 
  schema_file.append(MY_DB_OPT_FILE);
114
 
 
115
 
  int fd= mkstemp(schema_file_tmp);
116
 
 
117
 
  if (fd==-1)
118
 
    return errno;
119
 
 
120
 
 
121
 
  if (!db.SerializeToFileDescriptor(fd))
122
 
  {
123
 
    close(fd);
124
 
    unlink(schema_file_tmp);
125
 
    return -1;
126
 
  }
127
 
 
128
 
  if (rename(schema_file_tmp, schema_file.c_str()) == -1)
129
 
  {
130
 
    close(fd);
131
 
    return errno;
132
 
  }
133
 
  close(fd);
134
 
 
135
 
  return 0;
136
 
}
137
 
 
138
 
int get_database_metadata(const std::string &dbname, message::Schema &schema_message)
139
 
{
140
 
  int rc= 0;
141
 
  char db_opt_path[FN_REFLEN];
142
 
  size_t length;
143
 
 
144
 
  /*
145
 
    Pass an empty file name, and the database options file name as extension
146
 
    to avoid table name to file name encoding.
147
 
  */
148
 
  length= build_table_filename(db_opt_path, sizeof(db_opt_path),
149
 
                               dbname.c_str(), "", false);
150
 
  strcpy(db_opt_path + length, MY_DB_OPT_FILE);
151
 
 
152
 
  int fd= open(db_opt_path, O_RDONLY);
153
 
 
154
 
  if (fd == -1 && errno != ENOENT)
155
 
    rc= errno;
156
 
 
157
 
  /**
158
 
    @note If parsing fails, either someone has done a "mkdir" or has deleted their opt file.
159
 
    So what do we do? We muddle through the adventure by generating 
160
 
    one with a name in it, and the charset set to the default.
161
 
  */
162
 
  if (fd == -1 || not schema_message.ParseFromFileDescriptor(fd))
163
 
  {
164
 
    struct stat directory_stat_buffer;
165
 
 
166
 
    /* Remove the opt file name and see if we can just open up the directory. */
167
 
    db_opt_path[length]= 0;
168
 
    if (lstat(db_opt_path, &directory_stat_buffer))
169
 
    {
170
 
      rc= errno;
171
 
    }
172
 
    else if (not S_ISDIR(directory_stat_buffer.st_mode))
173
 
    {
174
 
      rc= -1;
175
 
    }
176
 
    else
177
 
    {
178
 
      schema_message.set_name(dbname);
179
 
      rc= 0;
180
 
    }
181
 
 
182
 
#if 0 //@todo fill this in with something totally acceptable
183
 
    schema_message.set_collation("utf8_general_ci"); 
184
 
#endif
185
 
  }
186
 
 
187
 
  if (fd != -1)
188
 
    close(fd);
189
 
 
190
 
  return rc;
191
 
}
192
 
 
193
64
/*
194
65
  Create a database
195
66
 
211
82
 
212
83
*/
213
84
 
214
 
bool mysql_create_db(Session *session, const char *db, message::Schema *schema_message, bool is_if_not_exists)
 
85
bool mysql_create_db(Session *session, const message::Schema &schema_message, bool is_if_not_exists)
215
86
{
216
87
  ReplicationServices &replication_services= ReplicationServices::singleton();
217
 
  long result= 1;
218
 
  int error_erno;
219
88
  bool error= false;
220
89
 
221
 
  /* do not create 'information_schema' db */
222
 
  if (!my_strcasecmp(system_charset_info, db, INFORMATION_SCHEMA_NAME.c_str()))
223
 
  {
224
 
    my_error(ER_DB_CREATE_EXISTS, MYF(0), db);
225
 
    return(-1);
226
 
  }
227
 
 
228
 
  schema_message->set_name(db);
229
 
 
230
90
  /*
231
91
    Do not create database if another thread is holding read lock.
232
92
    Wait for global read lock before acquiring LOCK_create_db.
241
101
  */
242
102
  if (wait_if_global_read_lock(session, 0, 1))
243
103
  {
244
 
    error= true;
245
 
    goto exit2;
 
104
    return false;
246
105
  }
247
106
 
 
107
  assert(schema_message.has_name());
 
108
  assert(schema_message.has_collation());
 
109
 
 
110
  // @todo push this lock down into the engine
248
111
  pthread_mutex_lock(&LOCK_create_db);
249
112
 
250
 
  /* check directory */
251
 
  char   path[FN_REFLEN+16];
252
 
  uint32_t path_len;
253
 
  path_len= build_table_filename(path, sizeof(path), db, "", false);
254
 
  path[path_len-1]= 0;                    // remove last '/' from path
255
 
 
256
 
  if (mkdir(path, 0777) == -1)
 
113
  // Check to see if it exists already.
 
114
  if (plugin::StorageEngine::doesSchemaExist(schema_message.name()))
257
115
  {
258
 
    if (errno == EEXIST)
259
 
    {
260
 
      if (! is_if_not_exists)
261
 
      {
262
 
        my_error(ER_DB_CREATE_EXISTS, MYF(0), path);
263
 
        error= true;
264
 
        goto exit;
265
 
      }
 
116
    if (not is_if_not_exists)
 
117
    {
 
118
      my_error(ER_DB_CREATE_EXISTS, MYF(0), schema_message.name().c_str());
 
119
      error= true;
 
120
    }
 
121
    else
 
122
    {
266
123
      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
267
 
                          ER_DB_CREATE_EXISTS, ER(ER_DB_CREATE_EXISTS),
268
 
                          path);
 
124
                          ER_DB_CREATE_EXISTS, ER(ER_DB_CREATE_EXISTS),
 
125
                          schema_message.name().c_str());
269
126
      session->my_ok();
270
 
      error= false;
271
 
      goto exit;
272
 
    }
273
 
 
274
 
    my_error(ER_CANT_CREATE_DB, MYF(0), path, errno);
275
 
    error= true;
276
 
    goto exit;
277
 
  }
278
 
 
279
 
  error_erno= write_schema_file(path, *schema_message);
280
 
  if (error_erno && error_erno != EEXIST)
281
 
  {
282
 
    if (rmdir(path) >= 0)
283
 
    {
284
 
      error= true;
285
 
      goto exit;
286
 
    }
287
 
  }
288
 
  else if (error_erno)
289
 
    error= true;
290
 
 
291
 
  replication_services.rawStatement(session, session->query, session->query_length);
292
 
  session->my_ok(result);
293
 
 
294
 
exit:
 
127
    }
 
128
  }
 
129
  else if (not plugin::StorageEngine::createSchema(schema_message)) // Try to create it 
 
130
  {
 
131
    my_error(ER_CANT_CREATE_DB, MYF(0), schema_message.name().c_str(), errno);
 
132
    error= true;
 
133
  }
 
134
  else // Created !
 
135
  {
 
136
    replication_services.rawStatement(session, session->query);
 
137
    session->my_ok(1);
 
138
  }
 
139
 
295
140
  pthread_mutex_unlock(&LOCK_create_db);
296
141
  start_waiting_global_read_lock(session);
297
 
exit2:
 
142
 
298
143
  return error;
299
144
}
300
145
 
301
146
 
302
147
/* db-name is already validated when we come here */
303
148
 
304
 
bool mysql_alter_db(Session *session, const char *db, message::Schema *schema_message)
 
149
bool mysql_alter_db(Session *session, const message::Schema &schema_message)
305
150
{
306
151
  ReplicationServices &replication_services= ReplicationServices::singleton();
307
 
  long result=1;
308
 
  int error= 0;
309
 
  char   path[FN_REFLEN+16];
310
 
  uint32_t path_len;
311
152
 
312
153
  /*
313
154
    Do not alter database if another thread is holding read lock.
321
162
    has the global read lock and refuses the operation with
322
163
    ER_CANT_UPDATE_WITH_READLOCK if applicable.
323
164
  */
324
 
  if ((error=wait_if_global_read_lock(session,0,1)))
325
 
    goto exit;
326
 
 
327
 
  assert(schema_message);
328
 
 
329
 
  schema_message->set_name(db);
 
165
  if ((wait_if_global_read_lock(session, 0, 1)))
 
166
    return false;
330
167
 
331
168
  pthread_mutex_lock(&LOCK_create_db);
332
169
 
 
170
  if (not plugin::StorageEngine::doesSchemaExist(schema_message.name()))
 
171
  {
 
172
    my_error(ER_SCHEMA_DOES_NOT_EXIST, MYF(0), schema_message.name().c_str());
 
173
    return false;
 
174
  }
 
175
 
333
176
  /* Change options if current database is being altered. */
334
 
  path_len= build_table_filename(path, sizeof(path), db, "", false);
335
 
  path[path_len-1]= 0;                    // Remove last '/' from path
336
 
 
337
 
  error= write_schema_file(path, *schema_message);
338
 
  if (error && error != EEXIST)
339
 
  {
340
 
    /* TODO: find some witty way of getting back an error message */
341
 
    pthread_mutex_unlock(&LOCK_create_db);
342
 
    goto exit;
343
 
  }
344
 
 
345
 
  replication_services.rawStatement(session, session->getQueryString(), session->getQueryLength());
346
 
  session->my_ok(result);
 
177
  bool success= plugin::StorageEngine::alterSchema(schema_message);
 
178
 
 
179
  if (success)
 
180
  {
 
181
    replication_services.rawStatement(session, session->getQueryString());
 
182
    session->my_ok(1);
 
183
  }
 
184
  else
 
185
  {
 
186
    my_error(ER_ALTER_SCHEMA, MYF(0), schema_message.name().c_str());
 
187
  }
347
188
 
348
189
  pthread_mutex_unlock(&LOCK_create_db);
349
190
  start_waiting_global_read_lock(session);
350
 
exit:
351
 
  return error ? true : false;
 
191
 
 
192
  return success;
352
193
}
353
194
 
354
195
 
377
218
  uint32_t length;
378
219
  TableList *dropped_tables= NULL;
379
220
 
380
 
  if (db && (strcmp(db, "information_schema") == 0))
381
 
  {
382
 
    my_error(ER_DBACCESS_DENIED_ERROR, MYF(0), "", "", INFORMATION_SCHEMA_NAME.c_str());
383
 
    return true;
384
 
  }
385
 
 
386
221
  /*
387
222
    Do not drop database if another thread is holding read lock.
388
223
    Wait for global read lock before acquiring LOCK_create_db.
404
239
 
405
240
  length= build_table_filename(path, sizeof(path),
406
241
                               db, "", false);
407
 
  strcpy(path+length, MY_DB_OPT_FILE);         // Append db option file name
408
 
  unlink(path);
409
242
  path[length]= '\0';                           // Remove file name
410
243
 
411
244
  /* See if the directory exists */
441
274
  }
442
275
  if (deleted >= 0)
443
276
  {
444
 
    const char *query;
445
 
    uint32_t query_length;
446
 
 
447
 
    assert(session->query);
448
 
 
449
 
    query= session->query;
450
 
    query_length= session->query_length;
 
277
    assert(! session->query.empty());
451
278
 
452
279
    ReplicationServices &replication_services= ReplicationServices::singleton();
453
 
    replication_services.rawStatement(session, session->getQueryString(), session->getQueryLength());
 
280
    replication_services.rawStatement(session, session->getQueryString());
454
281
    session->clear_error();
455
282
    session->server_status|= SERVER_STATUS_DB_DROPPED;
456
283
    session->my_ok((uint32_t) deleted);
478
305
      if (query_pos + tbl_name_len + 1 >= query_end)
479
306
      {
480
307
        /* These DDL methods and logging protected with LOCK_create_db */
481
 
        replication_services.rawStatement(session, query, (size_t) (query_pos -1 - query));
 
308
        replication_services.rawStatement(session, query);
482
309
        query_pos= query_data_start;
483
310
      }
484
311
 
491
318
    if (query_pos != query_data_start)
492
319
    {
493
320
      /* These DDL methods and logging protected with LOCK_create_db */
494
 
      replication_services.rawStatement(session, query, (size_t) (query_pos -1 - query));
 
321
      replication_services.rawStatement(session, query);
495
322
    }
496
323
  }
497
324
 
502
329
    SELECT DATABASE() in the future). For this we free() session->db and set
503
330
    it to 0.
504
331
  */
505
 
  if (! session->db.empty() && session->db.compare(db) == 0)
 
332
  if (not session->db.empty() && session->db.compare(db) == 0)
506
333
    mysql_change_db_impl(session, NULL);
507
334
  pthread_mutex_unlock(&LOCK_create_db);
508
335
  start_waiting_global_read_lock(session);
749
576
  if (dropped_tables)
750
577
    *dropped_tables= tot_list;
751
578
 
752
 
  if (rmdir(org_path))
 
579
  if (not plugin::StorageEngine::dropSchema(db))
753
580
  {
754
 
    my_error(ER_DB_DROP_RMDIR, MYF(0), org_path, errno);
 
581
    my_error(ER_DROP_SCHEMA, MYF(0), db.c_str());
755
582
    return -1;
756
583
  }
757
584
 
820
647
    @retval true  Error
821
648
*/
822
649
 
823
 
bool mysql_change_db(Session *session, const LEX_STRING *new_db_name, bool force_switch)
 
650
bool mysql_change_db(Session *session, const std::string &new_db_name, bool force_switch)
824
651
{
825
652
  LEX_STRING new_db_file_name;
826
653
  const CHARSET_INFO *db_default_cl;
827
654
 
828
 
  assert(new_db_name);
829
 
  assert(new_db_name->length);
830
 
 
831
 
  if (my_strcasecmp(system_charset_info, new_db_name->str,
832
 
                    INFORMATION_SCHEMA_NAME.c_str()) == 0)
833
 
  {
834
 
    /* Switch the current database to INFORMATION_SCHEMA. */
835
 
    /* const_cast<> is safe here: mysql_change_db_impl does a copy */
836
 
    LEX_STRING is_name= { const_cast<char *>(INFORMATION_SCHEMA_NAME.c_str()),
837
 
                          INFORMATION_SCHEMA_NAME.length() };
838
 
    mysql_change_db_impl(session, &is_name);
839
 
 
840
 
    return false;
841
 
  }
 
655
  assert(not new_db_name.empty());
 
656
 
842
657
  /*
843
658
    Now we need to make a copy because check_db_name requires a
844
659
    non-constant argument. Actually, it takes database file name.
846
661
    TODO: fix check_db_name().
847
662
  */
848
663
 
849
 
  new_db_file_name.length= new_db_name->length;
850
 
  new_db_file_name.str= (char *)malloc(new_db_name->length + 1);
 
664
  new_db_file_name.length= new_db_name.length();
 
665
  new_db_file_name.str= (char *)malloc(new_db_name.length() + 1);
851
666
  if (new_db_file_name.str == NULL)
852
667
    return true;                             /* the error is set */
853
 
  memcpy(new_db_file_name.str, new_db_name->str, new_db_name->length);
854
 
  new_db_file_name.str[new_db_name->length]= 0;
 
668
  memcpy(new_db_file_name.str, new_db_name.c_str(), new_db_name.length());
 
669
  new_db_file_name.str[new_db_name.length()]= 0;
855
670
 
856
671
 
857
672
  /*
874
689
    return true;
875
690
  }
876
691
 
877
 
  if (check_db_dir_existence(new_db_file_name.str))
 
692
  if (not plugin::StorageEngine::doesSchemaExist(new_db_file_name.str))
878
693
  {
879
694
    if (force_switch)
880
695
    {
907
722
    }
908
723
  }
909
724
 
910
 
  db_default_cl= get_default_db_collation(new_db_file_name.str);
 
725
  db_default_cl= plugin::StorageEngine::getSchemaCollation(new_db_file_name.str);
911
726
 
912
727
  mysql_change_db_impl(session, &new_db_file_name);
913
728
  free(new_db_file_name.str);
915
730
  return false;
916
731
}
917
732
 
918
 
/*
919
 
  Check if there is directory for the database name.
920
 
 
921
 
  SYNOPSIS
922
 
    check_db_dir_existence()
923
 
    db_name   database name
924
 
 
925
 
  RETURN VALUES
926
 
    false   There is directory for the specified database name.
927
 
    true    The directory does not exist.
928
 
*/
929
 
 
930
 
bool check_db_dir_existence(const char *db_name)
931
 
{
932
 
  char db_dir_path[FN_REFLEN];
933
 
  uint32_t db_dir_path_len;
934
 
 
935
 
  db_dir_path_len= build_table_filename(db_dir_path, sizeof(db_dir_path),
936
 
                                        db_name, "", false);
937
 
 
938
 
  if (db_dir_path_len && db_dir_path[db_dir_path_len - 1] == FN_LIBCHAR)
939
 
    db_dir_path[db_dir_path_len - 1]= 0;
940
 
 
941
 
  /* Check access. */
942
 
 
943
 
  return access(db_dir_path, F_OK);
944
 
}
945
 
 
946
733
/**
947
734
  @brief Internal implementation: switch current database to a valid one.
948
735