12
12
You should have received a copy of the GNU General Public License
13
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
17
17
/* create and drop of databases */
22
#include <sys/types.h>
18
#include <drizzled/global.h>
28
#include "drizzled/error.h"
23
#include <drizzled/message/schema.pb.h>
25
#include <drizzled/server_includes.h>
26
#include <mysys/mysys_err.h>
27
#include <mysys/my_dir.h>
28
#include <drizzled/error.h>
29
29
#include <drizzled/gettext.h>
30
#include <drizzled/my_hash.h>
31
#include "drizzled/internal/m_string.h"
30
#include <mysys/hash.h>
32
31
#include <drizzled/session.h>
33
32
#include <drizzled/db.h>
34
33
#include <drizzled/sql_base.h>
35
34
#include <drizzled/lock.h>
36
35
#include <drizzled/errmsg_print.h>
37
#include <drizzled/transaction_services.h>
36
#include <drizzled/replication_services.h>
38
37
#include <drizzled/message/schema.pb.h>
39
#include "drizzled/sql_table.h"
40
#include "drizzled/plugin/storage_engine.h"
41
#include "drizzled/plugin/authorization.h"
42
#include "drizzled/global_charset_info.h"
43
#include "drizzled/pthread_globals.h"
44
#include "drizzled/charset.h"
46
#include <boost/thread/mutex.hpp>
48
#include "drizzled/internal/my_sys.h"
39
using namespace drizzled;
41
#define MY_DB_OPT_FILE "db.opt"
50
42
#define MAX_DROP_TABLE_Q_LEN 1024
57
static void change_db_impl(Session *session);
58
static void change_db_impl(Session *session, identifier::Schema &schema_identifier);
44
const char *del_exts[]= {".dfe", ".BAK", ".TMD",".opt", NULL};
45
static TYPELIB deletable_extentions=
46
{array_elements(del_exts)-1,"del_exts", del_exts, NULL};
48
static long mysql_rm_known_files(Session *session, MY_DIR *dirp,
49
const char *db, const char *path,
50
TableList **dropped_tables);
52
static void mysql_change_db_impl(Session *session, LEX_STRING *new_db_name);
56
Return default database collation.
58
@param session Thread context.
59
@param db_name Database name.
61
@return CHARSET_INFO object. The operation always return valid character
62
set, even if the database does not exist.
65
const CHARSET_INFO *get_default_db_collation(const char *db_name)
69
get_database_metadata(db_name, &db);
71
/* If for some reason the db.opt file lacks a collation,
72
we just return the default */
74
if (db.has_collation())
76
const string buffer= db.collation();
77
const CHARSET_INFO* cs= get_charset_by_name(buffer.c_str());
81
errmsg_printf(ERRMSG_LVL_ERROR,
82
_("Error while loading database options: '%s':"),db_name);
83
errmsg_printf(ERRMSG_LVL_ERROR, ER(ER_UNKNOWN_COLLATION), buffer.c_str());
85
return default_charset_info;
91
return default_charset_info;
94
/* path is path to database, not schema file */
95
static int write_schema_file(Session *session,
96
const char *path, const char *name,
97
HA_CREATE_INFO *create)
100
char schema_file_tmp[FN_REFLEN];
101
string schema_file(path);
107
snprintf(schema_file_tmp, FN_REFLEN, "%s%c%s.tmpXXXXXX", path, FN_LIBCHAR, MY_DB_OPT_FILE);
109
schema_file.append(1, FN_LIBCHAR);
110
schema_file.append(MY_DB_OPT_FILE);
112
int fd= mkstemp(schema_file_tmp);
117
if (!create->default_table_charset)
118
create->default_table_charset= session->variables.collation_server;
121
db.set_collation(create->default_table_charset->name);
123
if (!db.SerializeToFileDescriptor(fd))
126
unlink(schema_file_tmp);
130
if (rename(schema_file_tmp, schema_file.c_str()) == -1)
140
int get_database_metadata(const char *dbname, message::Schema *db)
142
char db_opt_path[FN_REFLEN];
146
Pass an empty file name, and the database options file name as extension
147
to avoid table name to file name encoding.
149
length= build_table_filename(db_opt_path, sizeof(db_opt_path),
151
strcpy(db_opt_path + length, MY_DB_OPT_FILE);
153
int fd= open(db_opt_path, O_RDONLY);
158
if (!db->ParseFromFileDescriptor(fd))
65
173
session Thread handler
66
174
db Name of database to create
67
175
Function assumes that this is already validated.
81
bool create_db(Session *session, const message::Schema &schema_message, const bool is_if_not_exists)
189
bool mysql_create_db(Session *session, const char *db, HA_CREATE_INFO *create_info, bool is_if_not_exists)
83
TransactionServices &transaction_services= TransactionServices::singleton();
191
ReplicationServices &replication_services= ReplicationServices::singleton();
192
char path[FN_REFLEN+16];
84
195
bool error= false;
198
/* do not create 'information_schema' db */
199
if (!my_strcasecmp(system_charset_info, db, INFORMATION_SCHEMA_NAME.c_str()))
201
my_error(ER_DB_CREATE_EXISTS, MYF(0), db);
87
206
Do not create database if another thread is holding read lock.
88
Wait for global read lock before acquiring session->catalog()->schemaLock().
207
Wait for global read lock before acquiring LOCK_create_db.
89
208
After wait_if_global_read_lock() we have protection against another
90
global read lock. If we would acquire session->catalog()->schemaLock() first,
209
global read lock. If we would acquire LOCK_create_db first,
91
210
another thread could step in and get the global read lock before we
92
211
reach wait_if_global_read_lock(). If this thread tries the same as we
93
(admin a db), it would then go and wait on session->catalog()->schemaLock()...
212
(admin a db), it would then go and wait on LOCK_create_db...
94
213
Furthermore wait_if_global_read_lock() checks if the current thread
95
214
has the global read lock and refuses the operation with
96
215
ER_CANT_UPDATE_WITH_READLOCK if applicable.
98
if (session->wait_if_global_read_lock(false, true))
217
if (wait_if_global_read_lock(session, 0, 1))
103
assert(schema_message.has_name());
104
assert(schema_message.has_collation());
106
// @todo push this lock down into the engine
223
pthread_mutex_lock(&LOCK_create_db);
225
/* Check directory */
226
path_len= build_table_filename(path, sizeof(path), db, "", false);
227
path[path_len-1]= 0; // Remove last '/' from path
229
if (mkdir(path,0777) == -1)
108
boost::mutex::scoped_lock scopedLock(session->catalog().schemaLock());
110
// Check to see if it exists already.
111
identifier::Schema schema_identifier(schema_message.name());
112
if (plugin::StorageEngine::doesSchemaExist(schema_identifier))
114
if (not is_if_not_exists)
116
my_error(ER_DB_CREATE_EXISTS, schema_identifier);
121
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
122
ER_DB_CREATE_EXISTS, ER(ER_DB_CREATE_EXISTS),
123
schema_message.name().c_str());
233
if (! is_if_not_exists)
235
my_error(ER_DB_CREATE_EXISTS, MYF(0), db);
239
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
240
ER_DB_CREATE_EXISTS, ER(ER_DB_CREATE_EXISTS), db);
127
else if (not plugin::StorageEngine::createSchema(schema_message)) // Try to create it
246
my_error(ER_CANT_CREATE_DB, MYF(0), db, my_errno);
251
error_erno= write_schema_file(session, path, db, create_info);
252
if (error_erno && error_erno != EEXIST)
254
if (rmdir(path) >= 0)
129
my_error(ER_CANT_CREATE_DB, MYF(0), schema_message.name().c_str(), errno);
134
transaction_services.createSchema(*session, schema_message);
138
session->startWaitingGlobalReadLock();
263
replication_services.rawStatement(session, session->query, session->query_length);
264
session->my_ok(result);
267
pthread_mutex_unlock(&LOCK_create_db);
268
start_waiting_global_read_lock(session);
144
274
/* db-name is already validated when we come here */
146
bool alter_db(Session *session,
147
const message::Schema &schema_message,
148
const message::schema::shared_ptr &original_schema)
276
bool mysql_alter_db(Session *session, const char *db, HA_CREATE_INFO *create_info)
150
TransactionServices &transaction_services= TransactionServices::singleton();
278
ReplicationServices &replication_services= ReplicationServices::singleton();
281
char path[FN_REFLEN+16];
153
285
Do not alter database if another thread is holding read lock.
154
Wait for global read lock before acquiring session->catalog()->schemaLock().
286
Wait for global read lock before acquiring LOCK_create_db.
155
287
After wait_if_global_read_lock() we have protection against another
156
global read lock. If we would acquire session->catalog()->schemaLock() first,
288
global read lock. If we would acquire LOCK_create_db first,
157
289
another thread could step in and get the global read lock before we
158
290
reach wait_if_global_read_lock(). If this thread tries the same as we
159
(admin a db), it would then go and wait on session->catalog()->schemaLock()...
291
(admin a db), it would then go and wait on LOCK_create_db...
160
292
Furthermore wait_if_global_read_lock() checks if the current thread
161
293
has the global read lock and refuses the operation with
162
294
ER_CANT_UPDATE_WITH_READLOCK if applicable.
164
if ((session->wait_if_global_read_lock(false, true)))
296
if ((error=wait_if_global_read_lock(session,0,1)))
299
pthread_mutex_lock(&LOCK_create_db);
301
/* Change options if current database is being altered. */
302
path_len= build_table_filename(path, sizeof(path), db, "", false);
303
path[path_len-1]= 0; // Remove last '/' from path
305
error= write_schema_file(session, path, db, create_info);
306
if (error && error != EEXIST)
169
boost::mutex::scoped_lock scopedLock(session->catalog().schemaLock());
171
identifier::Schema schema_idenifier(schema_message.name());
172
if (not plugin::StorageEngine::doesSchemaExist(schema_idenifier))
174
my_error(ER_SCHEMA_DOES_NOT_EXIST, schema_idenifier);
178
/* Change options if current database is being altered. */
179
success= plugin::StorageEngine::alterSchema(schema_message);
183
transaction_services.alterSchema(*session, original_schema, schema_message);
188
my_error(ER_ALTER_SCHEMA, schema_idenifier);
308
/* TODO: find some witty way of getting back an error message */
309
pthread_mutex_unlock(&LOCK_create_db);
191
session->startWaitingGlobalReadLock();
313
replication_services.rawStatement(session, session->getQueryString(), session->getQueryLength());
314
session->my_ok(result);
316
pthread_mutex_unlock(&LOCK_create_db);
317
start_waiting_global_read_lock(session);
319
return error ? true : false;
214
bool rm_db(Session *session, identifier::Schema &schema_identifier, const bool if_exists)
340
bool mysql_rm_db(Session *session,char *db,bool if_exists)
344
char path[FN_REFLEN+16];
347
TableList *dropped_tables= NULL;
349
if (db && (strcmp(db, "information_schema") == 0))
351
my_error(ER_DBACCESS_DENIED_ERROR, MYF(0), "", "", INFORMATION_SCHEMA_NAME.c_str());
219
356
Do not drop database if another thread is holding read lock.
220
Wait for global read lock before acquiring session->catalog()->schemaLock().
357
Wait for global read lock before acquiring LOCK_create_db.
221
358
After wait_if_global_read_lock() we have protection against another
222
global read lock. If we would acquire session->catalog()->schemaLock() first,
359
global read lock. If we would acquire LOCK_create_db first,
223
360
another thread could step in and get the global read lock before we
224
361
reach wait_if_global_read_lock(). If this thread tries the same as we
225
(admin a db), it would then go and wait on session->catalog()->schemaLock()...
362
(admin a db), it would then go and wait on LOCK_create_db...
226
363
Furthermore wait_if_global_read_lock() checks if the current thread
227
364
has the global read lock and refuses the operation with
228
365
ER_CANT_UPDATE_WITH_READLOCK if applicable.
230
if (session->wait_if_global_read_lock(false, true))
237
boost::mutex::scoped_lock scopedLock(session->catalog().schemaLock());
239
/* See if the schema exists */
240
if (not plugin::StorageEngine::doesSchemaExist(schema_identifier))
245
schema_identifier.getSQLPath(path);
247
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
248
ER_DB_DROP_EXISTS, ER(ER_DB_DROP_EXISTS),
254
my_error(ER_DB_DROP_EXISTS, schema_identifier);
260
error= plugin::StorageEngine::dropSchema(*session, schema_identifier);
367
if (wait_if_global_read_lock(session, 0, 1))
373
pthread_mutex_lock(&LOCK_create_db);
375
length= build_table_filename(path, sizeof(path), db, "", false);
376
strcpy(path+length, MY_DB_OPT_FILE); // Append db option file name
378
path[length]= '\0'; // Remove file name
380
/* See if the directory exists */
381
if (!(dirp= my_dir(path,MYF(MY_DONT_SORT))))
386
my_error(ER_DB_DROP_EXISTS, MYF(0), db);
390
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
391
ER_DB_DROP_EXISTS, ER(ER_DB_DROP_EXISTS), db);
395
pthread_mutex_lock(&LOCK_open); /* After deleting database, remove all cache entries related to schema */
396
remove_db_from_cache(db);
397
pthread_mutex_unlock(&LOCK_open);
401
if ((deleted= mysql_rm_known_files(session, dirp, db, path, &dropped_tables)) >= 0)
403
plugin::StorageEngine::dropDatabase(path);
410
uint32_t query_length;
413
/* The client used the old obsolete mysql_drop_db() call */
415
query_length= sprintf(path, "drop database `%s`", db);
419
query= session->query;
420
query_length= session->query_length;
422
ReplicationServices &replication_services= ReplicationServices::singleton();
423
replication_services.rawStatement(session, session->getQueryString(), session->getQueryLength());
424
session->clear_error();
425
session->server_status|= SERVER_STATUS_DB_DROPPED;
426
session->my_ok((uint32_t) deleted);
427
session->server_status&= ~SERVER_STATUS_DB_DROPPED;
431
char *query, *query_pos, *query_end, *query_data_start;
435
if (!(query= (char*) session->alloc(MAX_DROP_TABLE_Q_LEN)))
436
goto exit; /* not much else we can do */
437
query_pos= query_data_start= strcpy(query,"drop table ")+11;
438
query_end= query + MAX_DROP_TABLE_Q_LEN;
441
ReplicationServices &replication_services= ReplicationServices::singleton();
442
for (tbl= dropped_tables; tbl; tbl= tbl->next_local)
444
uint32_t tbl_name_len;
446
/* 3 for the quotes and the comma*/
447
tbl_name_len= strlen(tbl->table_name) + 3;
448
if (query_pos + tbl_name_len + 1 >= query_end)
450
/* These DDL methods and logging protected with LOCK_create_db */
451
replication_services.rawStatement(session, query, (size_t) (query_pos -1 - query));
452
query_pos= query_data_start;
456
query_pos= strcpy(query_pos,tbl->table_name) + (tbl_name_len-3);
461
if (query_pos != query_data_start)
463
/* These DDL methods and logging protected with LOCK_create_db */
464
replication_services.rawStatement(session, query, (size_t) (query_pos -1 - query));
266
470
If this database was the client's selected database, we silently
267
471
change the client's selected database to nothing (to have an empty
268
472
SELECT DATABASE() in the future). For this we free() session->db and set
271
if (not error and schema_identifier.compare(*session->schema()))
272
change_db_impl(session);
274
session->startWaitingGlobalReadLock();
475
if (! session->db.empty() && ! strcmp(session->db.c_str(), db))
476
mysql_change_db_impl(session, NULL);
477
pthread_mutex_unlock(&LOCK_create_db);
478
start_waiting_global_read_lock(session);
484
Removes files with known extensions plus.
485
session MUST be set when calling this function!
488
static long mysql_rm_known_files(Session *session, MY_DIR *dirp, const char *db,
489
const char *org_path,
490
TableList **dropped_tables)
493
char filePath[FN_REFLEN];
494
TableList *tot_list= NULL, **tot_list_next;
496
tot_list_next= &tot_list;
498
for (uint32_t idx= 0;
499
idx < (uint32_t) dirp->number_off_files && !session->killed ;
502
FILEINFO *file=dirp->dir_entry+idx;
505
/* skiping . and .. */
506
if (file->name[0] == '.' && (!file->name[1] ||
507
(file->name[1] == '.' && !file->name[2])))
510
if (!(extension= strrchr(file->name, '.')))
511
extension= strchr(file->name, '\0');
512
if (find_type(extension, &deletable_extentions,1+2) <= 0)
517
strange checking for magic extensions that are then deleted if
518
not reg_ext (i.e. .frm).
520
and (previously) we'd err out on drop database if files not matching
521
engine ha_known_exts() or deletable_extensions were present.
523
presumably this was to avoid deleting other user data... except if that
524
data happened to be in files ending in .BAK, .opt or .TMD. *fun*
528
/* just for safety we use files_charset_info */
529
if (db && !my_strcasecmp(files_charset_info,
532
uint32_t db_len= strlen(db);
534
/* Drop the table nicely */
535
*extension= 0; // Remove extension
536
TableList *table_list=(TableList*)
537
session->calloc(sizeof(*table_list) +
539
strlen(file->name) + 1);
543
table_list->db= (char*) (table_list+1);
544
table_list->table_name= strcpy(table_list->db, db) + db_len + 1;
545
filename_to_tablename(file->name, table_list->table_name,
546
strlen(file->name) + 1);
547
table_list->alias= table_list->table_name; // If lower_case_table_names=2
548
table_list->internal_tmp_table= is_prefix(file->name, TMP_FILE_PREFIX);
550
(*tot_list_next)= table_list;
551
tot_list_next= &table_list->next_local;
556
sprintf(filePath, "%s/%s", org_path, file->name);
557
if (my_delete_with_symlink(filePath,MYF(MY_WME)))
563
if (session->killed ||
564
(tot_list && mysql_rm_table_part2(session, tot_list, 1, 0, 1)))
570
*dropped_tables= tot_list;
574
my_error(ER_DB_DROP_RMDIR, MYF(0), org_path, errno);
586
@brief Internal implementation: switch current database to a valid one.
588
@param session Thread context.
589
@param new_db_name Name of the database to switch to. The function will
590
take ownership of the name (the caller must not free
591
the allocated memory). If the name is NULL, we're
592
going to switch to NULL db.
593
@param new_db_charset Character set of the new database.
596
static void mysql_change_db_impl(Session *session, LEX_STRING *new_db_name)
598
/* 1. Change current database in Session. */
600
if (new_db_name == NULL)
603
Session::set_db() does all the job -- it frees previous database name and
607
session->set_db(NULL, 0);
612
Here we already have a copy of database name to be used in Session. So,
613
we just call Session::reset_db(). Since Session::reset_db() does not releases
614
the previous database name, we should do it explicitly.
617
session->set_db(new_db_name->str, new_db_name->length);
622
Return true if db1_name is equal to db2_name, false otherwise.
624
The function allows to compare database names according to the MySQL
625
rules. The database names db1 and db2 are equal if:
626
- db1 is NULL and db2 is NULL;
628
- db1 is not-NULL, db2 is not-NULL, db1 is equal (ignoring case) to
629
db2 in system character set (UTF8).
633
cmp_db_names(const char *db1_name,
634
const char *db2_name)
637
/* db1 is NULL and db2 is NULL */
638
(!db1_name && !db2_name) ||
640
/* db1 is not-NULL, db2 is not-NULL, db1 == db2. */
641
(db1_name && db2_name && my_strcasecmp(system_charset_info, db1_name, db2_name) == 0);
280
646
@brief Change the current database and its attributes unconditionally.
338
704
@retval true Error
341
bool change_db(Session *session, identifier::Schema &schema_identifier)
707
bool mysql_change_db(Session *session, const LEX_STRING *new_db_name, bool force_switch)
344
if (not plugin::Authorization::isAuthorized(session->user(), schema_identifier))
346
/* Error message is set in isAuthorized */
350
if (not check_db_name(session, schema_identifier))
352
my_error(ER_WRONG_DB_NAME, schema_identifier);
357
if (not plugin::StorageEngine::doesSchemaExist(schema_identifier))
359
my_error(ER_BAD_DB_ERROR, schema_identifier);
361
/* The operation failed. */
366
change_db_impl(session, schema_identifier);
709
LEX_STRING new_db_file_name;
710
const CHARSET_INFO *db_default_cl;
713
assert(new_db_name->length);
715
if (my_strcasecmp(system_charset_info, new_db_name->str,
716
INFORMATION_SCHEMA_NAME.c_str()) == 0)
718
/* Switch the current database to INFORMATION_SCHEMA. */
719
/* const_cast<> is safe here: mysql_change_db_impl does a copy */
720
LEX_STRING is_name= { const_cast<char *>(INFORMATION_SCHEMA_NAME.c_str()),
721
INFORMATION_SCHEMA_NAME.length() };
722
mysql_change_db_impl(session, &is_name);
728
Now we need to make a copy because check_db_name requires a
729
non-constant argument. Actually, it takes database file name.
731
TODO: fix check_db_name().
734
new_db_file_name.length= new_db_name->length;
735
new_db_file_name.str= (char *)malloc(new_db_name->length + 1);
736
if (new_db_file_name.str == NULL)
737
return true; /* the error is set */
738
memcpy(new_db_file_name.str, new_db_name->str, new_db_name->length);
739
new_db_file_name.str[new_db_name->length]= 0;
743
NOTE: if check_db_name() fails, we should throw an error in any case,
744
even if we are called from sp_head::execute().
746
It's next to impossible however to get this error when we are called
747
from sp_head::execute(). But let's switch the current database to NULL
748
in this case to be sure.
751
if (check_db_name(&new_db_file_name))
753
my_error(ER_WRONG_DB_NAME, MYF(0), new_db_file_name.str);
754
free(new_db_file_name.str);
757
mysql_change_db_impl(session, NULL);
762
if (check_db_dir_existence(new_db_file_name.str))
766
/* Throw a warning and free new_db_file_name. */
768
push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
769
ER_BAD_DB_ERROR, ER(ER_BAD_DB_ERROR),
770
new_db_file_name.str);
772
free(new_db_file_name.str);
774
/* Change db to NULL. */
776
mysql_change_db_impl(session, NULL);
778
/* The operation succeed. */
784
/* Report an error and free new_db_file_name. */
786
my_error(ER_BAD_DB_ERROR, MYF(0), new_db_file_name.str);
787
free(new_db_file_name.str);
789
/* The operation failed. */
795
db_default_cl= get_default_db_collation(new_db_file_name.str);
797
mysql_change_db_impl(session, &new_db_file_name);
798
free(new_db_file_name.str);
372
@brief Internal implementation: switch current database to a valid one.
374
@param session Thread context.
375
@param new_db_name Name of the database to switch to. The function will
376
take ownership of the name (the caller must not free
377
the allocated memory). If the name is NULL, we're
378
going to switch to NULL db.
379
@param new_db_charset Character set of the new database.
804
Check if there is directory for the database name.
807
check_db_dir_existence()
808
db_name database name
811
false There is directory for the specified database name.
812
true The directory does not exist.
382
static void change_db_impl(Session *session, identifier::Schema &schema_identifier)
384
/* 1. Change current database in Session. */
387
if (new_db_name == NULL)
390
Session::set_db() does all the job -- it frees previous database name and
394
session->set_db(NULL, 0);
400
Here we already have a copy of database name to be used in Session. So,
401
we just call Session::reset_db(). Since Session::reset_db() does not releases
402
the previous database name, we should do it explicitly.
405
session->set_db(schema_identifier.getSchemaName());
409
static void change_db_impl(Session *session)
411
session->set_db(string());
414
} /* namespace drizzled */
815
bool check_db_dir_existence(const char *db_name)
817
char db_dir_path[FN_REFLEN];
818
uint32_t db_dir_path_len;
820
db_dir_path_len= build_table_filename(db_dir_path, sizeof(db_dir_path),
823
if (db_dir_path_len && db_dir_path[db_dir_path_len - 1] == FN_LIBCHAR)
824
db_dir_path[db_dir_path_len - 1]= 0;
828
return my_access(db_dir_path, F_OK);