17
17
/* create and drop of databases */
18
#include <drizzled/global.h>
20
20
#include CINTTYPES_H
23
#include <drizzled/serialize/serialize.h>
23
#include <drizzled/message/schema.pb.h>
24
24
using namespace std;
25
25
#include <drizzled/server_includes.h>
26
26
#include <mysys/mysys_err.h>
27
27
#include <mysys/my_dir.h>
29
28
#include <drizzled/error.h>
30
29
#include <drizzled/gettext.h>
30
#include <mysys/hash.h>
31
#include <drizzled/session.h>
32
#include <drizzled/db.h>
33
#include <drizzled/sql_base.h>
34
#include <drizzled/lock.h>
35
#include <drizzled/errmsg_print.h>
36
#include <drizzled/replication_services.h>
37
#include <drizzled/message/schema.pb.h>
39
using namespace drizzled;
41
#define MY_DB_OPT_FILE "db.opt"
33
42
#define MAX_DROP_TABLE_Q_LEN 1024
35
const char *del_exts[]= {".frm", ".BAK", ".TMD",".opt", NULL};
44
const char *del_exts[]= {".dfe", ".BAK", ".TMD",".opt", NULL};
36
45
static TYPELIB deletable_extentions=
37
46
{array_elements(del_exts)-1,"del_exts", del_exts, NULL};
39
48
static long mysql_rm_known_files(Session *session, MY_DIR *dirp,
40
const char *db, const char *path, uint32_t level,
49
const char *db, const char *path,
41
50
TableList **dropped_tables);
43
static bool rm_dir_w_symlink(const char *org_path, bool send_error);
44
static void mysql_change_db_impl(Session *session,
45
LEX_STRING *new_db_name,
46
const CHARSET_INFO * const new_db_charset);
49
/* Database lock hash */
51
pthread_mutex_t LOCK_lock_db;
52
int creating_database= 0; // how many database locks are made
55
/* Structure for database lock */
56
typedef struct my_dblock_st
58
char *name; /* Database name */
59
uint32_t name_length; /* Database length name */
67
extern "C" unsigned char* lock_db_get_key(my_dblock_t *, size_t *, bool not_used);
69
unsigned char* lock_db_get_key(my_dblock_t *ptr, size_t *length,
70
bool not_used __attribute__((unused)))
72
*length= ptr->name_length;
73
return (unsigned char*) ptr->name;
78
Free lock_db hash element.
81
extern "C" void lock_db_free_element(void *ptr);
83
void lock_db_free_element(void *ptr)
90
Delete a database lock entry from hash.
93
void lock_db_delete(const char *name, uint32_t length)
96
safe_mutex_assert_owner(&LOCK_lock_db);
97
if ((opt= (my_dblock_t *)hash_search(&lock_db_cache,
98
(const unsigned char*) name, length)))
99
hash_delete(&lock_db_cache, (unsigned char*) opt);
103
/* Database options hash */
104
static HASH dboptions;
105
static bool dboptions_init= 0;
106
static rw_lock_t LOCK_dboptions;
108
/* Structure for database options */
109
typedef struct my_dbopt_st
111
char *name; /* Database name */
112
uint32_t name_length; /* Database length name */
113
const CHARSET_INFO *charset; /* Database default character set */
118
Function we use in the creation of our hash to get key.
121
extern "C" unsigned char* dboptions_get_key(my_dbopt_t *opt, size_t *length,
124
unsigned char* dboptions_get_key(my_dbopt_t *opt, size_t *length,
125
bool not_used __attribute__((unused)))
127
*length= opt->name_length;
128
return (unsigned char*) opt->name;
133
Helper function to write a query to binlog used by mysql_rm_db()
136
static inline void write_to_binlog(Session *session, char *query, uint32_t q_len,
137
char *db, uint32_t db_len)
139
Query_log_event qinfo(session, query, q_len, 0, 0);
142
qinfo.db_len= db_len;
143
mysql_bin_log.write(&qinfo);
148
Function to free dboptions hash element
151
extern "C" void free_dbopt(void *dbopt);
153
void free_dbopt(void *dbopt)
155
free((unsigned char*) dbopt);
160
Initialize database option hash and locked database hash.
166
Must be called before any other database function is called.
173
bool my_database_names_init(void)
176
(void) my_rwlock_init(&LOCK_dboptions, NULL);
180
error= hash_init(&dboptions, lower_case_table_names ?
181
&my_charset_bin : system_charset_info,
182
32, 0, 0, (hash_get_key) dboptions_get_key,
184
hash_init(&lock_db_cache, lower_case_table_names ?
185
&my_charset_bin : system_charset_info,
186
32, 0, 0, (hash_get_key) lock_db_get_key,
187
lock_db_free_element,0);
196
Free database option hash and locked databases hash.
199
void my_database_names_free(void)
204
hash_free(&dboptions);
205
(void) rwlock_destroy(&LOCK_dboptions);
206
hash_free(&lock_db_cache);
212
Cleanup cached options
215
void my_dbopt_cleanup(void)
217
rw_wrlock(&LOCK_dboptions);
218
hash_free(&dboptions);
219
hash_init(&dboptions, lower_case_table_names ?
220
&my_charset_bin : system_charset_info,
221
32, 0, 0, (hash_get_key) dboptions_get_key,
223
rw_unlock(&LOCK_dboptions);
228
Find database options in the hash.
231
Search a database options in the hash, usings its path.
232
Fills "create" on success.
239
static bool get_dbopt(const char *dbname, HA_CREATE_INFO *create)
245
length= (uint) strlen(dbname);
247
rw_rdlock(&LOCK_dboptions);
248
if ((opt= (my_dbopt_t*) hash_search(&dboptions, (unsigned char*) dbname, length)))
250
create->default_table_charset= opt->charset;
253
rw_unlock(&LOCK_dboptions);
259
Writes database options into the hash.
262
Inserts database options into the hash, or updates
263
options if they are already in the hash.
270
static bool put_dbopt(const char *dbname, HA_CREATE_INFO *create)
276
length= (uint) strlen(dbname);
278
rw_wrlock(&LOCK_dboptions);
279
if (!(opt= (my_dbopt_t*) hash_search(&dboptions, (unsigned char*) dbname, length)))
281
/* Options are not in the hash, insert them */
283
if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
284
&opt, (uint) sizeof(*opt), &tmp_name, (uint) length+1,
292
my_stpcpy(opt->name, dbname);
293
opt->name_length= length;
295
if ((error= my_hash_insert(&dboptions, (unsigned char*) opt)))
302
/* Update / write options in hash */
303
opt->charset= create->default_table_charset;
306
rw_unlock(&LOCK_dboptions);
312
Deletes database options from the hash.
315
void del_dbopt(const char *path)
318
rw_wrlock(&LOCK_dboptions);
319
if ((opt= (my_dbopt_t *)hash_search(&dboptions, (const unsigned char*) path,
321
hash_delete(&dboptions, (unsigned char*) opt);
322
rw_unlock(&LOCK_dboptions);
327
Create database options file:
330
Currently database default charset is only stored there.
334
1 Could not create file or write to it. Error sent through my_error()
337
static bool write_db_opt(Session *session, const char *path, const char *name, HA_CREATE_INFO *create)
52
static void mysql_change_db_impl(Session *session, LEX_STRING *new_db_name);
56
Return default database collation.
58
@param session Thread context.
59
@param db_name Database name.
61
@return CHARSET_INFO object. The operation always return valid character
62
set, even if the database does not exist.
65
const CHARSET_INFO *get_default_db_collation(const char *db_name)
69
get_database_metadata(db_name, &db);
71
/* If for some reason the db.opt file lacks a collation,
72
we just return the default */
74
if (db.has_collation())
76
const string buffer= db.collation();
77
const CHARSET_INFO* cs= get_charset_by_name(buffer.c_str());
81
errmsg_printf(ERRMSG_LVL_ERROR,
82
_("Error while loading database options: '%s':"),db_name);
83
errmsg_printf(ERRMSG_LVL_ERROR, ER(ER_UNKNOWN_COLLATION), buffer.c_str());
85
return default_charset_info;
91
return default_charset_info;
94
/* path is path to database, not schema file */
95
static int write_schema_file(Session *session,
96
const char *path, const char *name,
97
HA_CREATE_INFO *create)
100
char schema_file_tmp[FN_REFLEN];
101
string schema_file(path);
107
snprintf(schema_file_tmp, FN_REFLEN, "%s%c%s.tmpXXXXXX", path, FN_LIBCHAR, MY_DB_OPT_FILE);
109
schema_file.append(1, FN_LIBCHAR);
110
schema_file.append(MY_DB_OPT_FILE);
112
int fd= mkstemp(schema_file_tmp);
344
117
if (!create->default_table_charset)
345
118
create->default_table_charset= session->variables.collation_server;
347
if (put_dbopt(path, create))
350
120
db.set_name(name);
351
db.set_characterset(create->default_table_charset->csname);
352
121
db.set_collation(create->default_table_charset->name);
354
fstream output(path, ios::out | ios::trunc | ios::binary);
355
if (!db.SerializeToOstream(&output))
363
Load database options file
366
path Path for option file
367
create Where to store the read options
373
1 No database file or could not open it
377
bool load_db_opt(Session *session, const char *path, HA_CREATE_INFO *create)
383
memset(create, 0, sizeof(*create));
384
create->default_table_charset= session->variables.collation_server;
386
/* Check if options for this database are already in the hash */
387
if (!get_dbopt(path, create))
390
fstream input(path, ios::in | ios::binary);
393
else if (!db.ParseFromIstream(&input))
396
buffer= db.characterset();
397
if (!(create->default_table_charset= get_charset_by_csname(buffer.c_str(), MY_CS_PRIMARY, MYF(0))))
399
sql_print_error(_("Error while loading database options: '%s':"),path);
400
sql_print_error(ER(ER_UNKNOWN_COLLATION), buffer.c_str());
401
create->default_table_charset= default_charset_info;
404
buffer= db.collation();
405
if (!(create->default_table_charset= get_charset_by_name(buffer.c_str(), MYF(0))))
407
sql_print_error(_("Error while loading database options: '%s':"),path);
408
sql_print_error(ER(ER_UNKNOWN_COLLATION), buffer.c_str());
409
create->default_table_charset= default_charset_info;
413
Put the loaded value into the hash.
414
Note that another thread could've added the same
415
entry to the hash after we called get_dbopt(),
416
but it's not an error, as put_dbopt() takes this
417
possibility into account.
419
error= put_dbopt(path, create);
427
Retrieve database options by name. Load database options file or fetch from
431
load_db_opt_by_name()
432
db_name Database name
433
db_create_info Where to store the database options
436
load_db_opt_by_name() is a shortcut for load_db_opt().
439
Although load_db_opt_by_name() (and load_db_opt()) returns status of
440
the operation, it is useless usually and should be ignored. The problem
441
is that there are 1) system databases ("mysql") and 2) virtual
442
databases ("information_schema"), which do not contain options file.
443
So, load_db_opt[_by_name]() returns false for these databases, but this
446
load_db_opt[_by_name]() clears db_create_info structure in any case, so
447
even on failure it contains valid data. So, common use case is just
448
call load_db_opt[_by_name]() without checking return value and use
449
db_create_info right after that.
451
RETURN VALUES (read NOTE!)
453
true Failed to retrieve options
456
bool load_db_opt_by_name(Session *session, const char *db_name,
457
HA_CREATE_INFO *db_create_info)
123
if (!db.SerializeToFileDescriptor(fd))
126
unlink(schema_file_tmp);
130
if (rename(schema_file_tmp, schema_file.c_str()) == -1)
140
int get_database_metadata(const char *dbname, message::Schema *db)
459
142
char db_opt_path[FN_REFLEN];
462
146
Pass an empty file name, and the database options file name as extension
463
147
to avoid table name to file name encoding.
465
(void) build_table_filename(db_opt_path, sizeof(db_opt_path),
466
db_name, "", MY_DB_OPT_FILE, 0);
468
return load_db_opt(session, db_opt_path, db_create_info);
473
Return default database collation.
475
@param session Thread context.
476
@param db_name Database name.
478
@return CHARSET_INFO object. The operation always return valid character
479
set, even if the database does not exist.
482
const CHARSET_INFO *get_default_db_collation(Session *session, const char *db_name)
484
HA_CREATE_INFO db_info;
486
if (session->db != NULL && strcmp(db_name, session->db) == 0)
487
return session->db_charset;
489
load_db_opt_by_name(session, db_name, &db_info);
492
NOTE: even if load_db_opt_by_name() fails,
493
db_info.default_table_charset contains valid character set
494
(collation_server). We should not fail if load_db_opt_by_name() fails,
495
because it is valid case. If a database has been created just by
496
"mkdir", it does not contain db.opt file, but it is valid database.
499
return db_info.default_table_charset;
149
length= build_table_filename(db_opt_path, sizeof(db_opt_path),
151
strcpy(db_opt_path + length, MY_DB_OPT_FILE);
153
int fd= open(db_opt_path, O_RDONLY);
158
if (!db->ParseFromFileDescriptor(fd))
504
169
Create a database
545
207
Do not create database if another thread is holding read lock.
546
Wait for global read lock before acquiring LOCK_mysql_create_db.
208
Wait for global read lock before acquiring LOCK_create_db.
547
209
After wait_if_global_read_lock() we have protection against another
548
global read lock. If we would acquire LOCK_mysql_create_db first,
210
global read lock. If we would acquire LOCK_create_db first,
549
211
another thread could step in and get the global read lock before we
550
212
reach wait_if_global_read_lock(). If this thread tries the same as we
551
(admin a db), it would then go and wait on LOCK_mysql_create_db...
213
(admin a db), it would then go and wait on LOCK_create_db...
552
214
Furthermore wait_if_global_read_lock() checks if the current thread
553
215
has the global read lock and refuses the operation with
554
216
ER_CANT_UPDATE_WITH_READLOCK if applicable.
556
218
if (wait_if_global_read_lock(session, 0, 1))
562
pthread_mutex_lock(&LOCK_mysql_create_db);
224
pthread_mutex_lock(&LOCK_create_db);
564
226
/* Check directory */
565
path_len= build_table_filename(path, sizeof(path), db, "", "", 0);
227
path_len= build_table_filename(path, sizeof(path), db, "", false);
566
228
path[path_len-1]= 0; // Remove last '/' from path
568
if (!stat(path,&stat_info))
230
if (mkdir(path,0777) == -1)
570
if (!(create_options & HA_LEX_CREATE_IF_NOT_EXISTS))
572
my_error(ER_DB_CREATE_EXISTS, MYF(0), db);
234
if (!(create_options & HA_LEX_CREATE_IF_NOT_EXISTS))
236
my_error(ER_DB_CREATE_EXISTS, MYF(0), db);
240
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
241
ER_DB_CREATE_EXISTS, ER(ER_DB_CREATE_EXISTS), db);
576
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
577
ER_DB_CREATE_EXISTS, ER(ER_DB_CREATE_EXISTS), db);
247
my_error(ER_CANT_CREATE_DB, MYF(0), db, my_errno);
587
my_error(EE_STAT, MYF(0), path, errno);
590
if (my_mkdir(path,0777,MYF(0)) < 0)
592
my_error(ER_CANT_CREATE_DB, MYF(0), db, my_errno);
598
path[path_len-1]= FN_LIBCHAR;
599
strmake(path+path_len, MY_DB_OPT_FILE, sizeof(path)-path_len-1);
600
if (write_db_opt(session, path, db, create_info))
252
error_erno= write_schema_file(session, path, db, create_info);
253
if (error_erno && error_erno != EEXIST)
603
Could not create options file.
604
Restore things to beginning.
607
255
if (rmdir(path) >= 0)
613
We come here when we managed to create the database, but not the option
614
file. In this case it's best to just continue as if nothing has
615
happened. (This is a very unlikely senario)
622
uint32_t query_length;
624
if (!session->query) // Only in replication
627
query_length= (uint) (strxmov(tmp_query,"create database `",
628
db, "`", NULL) - tmp_query);
632
query= session->query;
633
query_length= session->query_length;
636
if (mysql_bin_log.is_open())
638
Query_log_event qinfo(session, query, query_length, 0,
639
/* suppress_use */ true);
642
Write should use the database being created as the "current
643
database" and not the threads current database, which is the
644
default. If we do not change the "current database" to the
645
database being created, the CREATE statement will not be
646
replicated when using --binlog-do-db to select databases to be
649
An example (--binlog-do-db=sisyfos):
651
CREATE DATABASE bob; # Not replicated
652
USE bob; # 'bob' is the current database
653
CREATE DATABASE sisyfos; # Not replicated since 'bob' is
655
USE sisyfos; # Will give error on slave since
656
# database does not exist.
659
qinfo.db_len = strlen(db);
661
/* These DDL methods and logging protected with LOCK_mysql_create_db */
662
mysql_bin_log.write(&qinfo);
664
my_ok(session, result);
264
replication_services.rawStatement(session, session->query, session->query_length);
265
session->my_ok(result);
668
pthread_mutex_unlock(&LOCK_mysql_create_db);
268
pthread_mutex_unlock(&LOCK_create_db);
669
269
start_waiting_global_read_lock(session);
677
277
bool mysql_alter_db(Session *session, const char *db, HA_CREATE_INFO *create_info)
679
char path[FN_REFLEN+16];
279
ReplicationServices &replication_services= ReplicationServices::singleton();
282
char path[FN_REFLEN+16];
684
286
Do not alter database if another thread is holding read lock.
685
Wait for global read lock before acquiring LOCK_mysql_create_db.
287
Wait for global read lock before acquiring LOCK_create_db.
686
288
After wait_if_global_read_lock() we have protection against another
687
global read lock. If we would acquire LOCK_mysql_create_db first,
289
global read lock. If we would acquire LOCK_create_db first,
688
290
another thread could step in and get the global read lock before we
689
291
reach wait_if_global_read_lock(). If this thread tries the same as we
690
(admin a db), it would then go and wait on LOCK_mysql_create_db...
292
(admin a db), it would then go and wait on LOCK_create_db...
691
293
Furthermore wait_if_global_read_lock() checks if the current thread
692
294
has the global read lock and refuses the operation with
693
295
ER_CANT_UPDATE_WITH_READLOCK if applicable.
695
297
if ((error=wait_if_global_read_lock(session,0,1)))
698
pthread_mutex_lock(&LOCK_mysql_create_db);
701
Recreate db options file: /dbpath/.db.opt
702
We pass MY_DB_OPT_FILE as "extension" to avoid
703
"table name to file name" encoding.
705
build_table_filename(path, sizeof(path), db, "", MY_DB_OPT_FILE, 0);
706
if ((error=write_db_opt(session, path, db, create_info)))
300
pthread_mutex_lock(&LOCK_create_db);
709
302
/* Change options if current database is being altered. */
711
if (session->db && !strcmp(session->db,db))
713
session->db_charset= create_info->default_table_charset ?
714
create_info->default_table_charset :
715
session->variables.collation_server;
716
session->variables.collation_database= session->db_charset;
719
if (mysql_bin_log.is_open())
721
Query_log_event qinfo(session, session->query, session->query_length, 0,
722
/* suppress_use */ true);
725
Write should use the database being created as the "current
726
database" and not the threads current database, which is the
730
qinfo.db_len = strlen(db);
732
session->clear_error();
733
/* These DDL methods and logging protected with LOCK_mysql_create_db */
734
mysql_bin_log.write(&qinfo);
736
my_ok(session, result);
303
path_len= build_table_filename(path, sizeof(path), db, "", false);
304
path[path_len-1]= 0; // Remove last '/' from path
306
error= write_schema_file(session, path, db, create_info);
307
if (error && error != EEXIST)
309
/* TODO: find some witty way of getting back an error message */
310
pthread_mutex_unlock(&LOCK_create_db);
314
replication_services.rawStatement(session, session->getQueryString(), session->getQueryLength());
315
session->my_ok(result);
317
pthread_mutex_unlock(&LOCK_create_db);
318
start_waiting_global_read_lock(session);
739
pthread_mutex_unlock(&LOCK_mysql_create_db);
740
start_waiting_global_read_lock(session);
320
return error ? true : false;