~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/sql_db.cc

  • Committer: Stewart Smith
  • Date: 2008-10-15 04:21:24 UTC
  • mto: This revision was merged to the branch mainline in revision 516.
  • Revision ID: stewart@flamingspork.com-20081015042124-kdmb74bcbky1k1nz
remove my_pthread_[gs]etspecific

Show diffs side-by-side

added added

removed removed

Lines of Context:
11
11
 
12
12
   You should have received a copy of the GNU General Public License
13
13
   along with this program; if not, write to the Free Software
14
 
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
15
15
 
16
16
 
17
17
/* create and drop of databases */
18
 
#include <config.h>
19
 
 
20
 
#include <fcntl.h>
21
 
#include <sys/stat.h>
22
 
#include <sys/types.h>
23
 
 
24
 
#include <set>
 
18
#include "config.h"
 
19
#include CSTDINT_H
 
20
#include CINTTYPES_H
25
21
#include <string>
26
22
#include <fstream>
27
 
 
28
 
#include <drizzled/error.h>
29
 
#include <drizzled/gettext.h>
30
 
#include <drizzled/my_hash.h>
31
 
#include <drizzled/internal/m_string.h>
32
 
#include <drizzled/session.h>
33
 
#include <drizzled/schema.h>
34
 
#include <drizzled/sql_base.h>
35
 
#include <drizzled/lock.h>
36
 
#include <drizzled/errmsg_print.h>
37
 
#include <drizzled/transaction_services.h>
38
 
#include <drizzled/message/schema.pb.h>
39
 
#include <drizzled/sql_table.h>
40
 
#include <drizzled/plugin/storage_engine.h>
41
 
#include <drizzled/plugin/authorization.h>
42
 
#include <drizzled/global_charset_info.h>
43
 
#include <drizzled/pthread_globals.h>
44
 
#include <drizzled/charset.h>
45
 
#include <drizzled/internal/my_sys.h>
46
 
 
47
 
#include <boost/thread/mutex.hpp>
 
23
#include <drizzled/serialize/serialize.h>
 
24
using namespace std;
 
25
#include <drizzled/server_includes.h>
 
26
#include <mysys/mysys_err.h>
 
27
#include <mysys/my_dir.h>
 
28
#include "log.h"
 
29
#include <drizzled/drizzled_error_messages.h>
 
30
#include <libdrizzle/gettext.h>
 
31
 
48
32
 
49
33
#define MAX_DROP_TABLE_Q_LEN      1024
50
34
 
51
 
using namespace std;
52
 
 
53
 
namespace drizzled
54
 
{
55
 
 
56
 
namespace schema
57
 
{
58
 
 
59
 
static void change_db_impl(Session &session);
60
 
static void change_db_impl(Session &session, identifier::Schema &schema_identifier);
 
35
const char *del_exts[]= {".frm", ".BAK", ".TMD",".opt", NULL};
 
36
static TYPELIB deletable_extentions=
 
37
{array_elements(del_exts)-1,"del_exts", del_exts, NULL};
 
38
 
 
39
static long mysql_rm_known_files(THD *thd, MY_DIR *dirp,
 
40
                                 const char *db, const char *path, uint32_t level, 
 
41
                                 TableList **dropped_tables);
 
42
         
 
43
static bool rm_dir_w_symlink(const char *org_path, bool send_error);
 
44
static void mysql_change_db_impl(THD *thd,
 
45
                                 LEX_STRING *new_db_name,
 
46
                                 const CHARSET_INFO * const new_db_charset);
 
47
 
 
48
 
 
49
/* Database lock hash */
 
50
HASH lock_db_cache;
 
51
pthread_mutex_t LOCK_lock_db;
 
52
int creating_database= 0;  // how many database locks are made
 
53
 
 
54
 
 
55
/* Structure for database lock */
 
56
typedef struct my_dblock_st
 
57
{
 
58
  char *name;        /* Database name        */
 
59
  uint32_t name_length;  /* Database length name */
 
60
} my_dblock_t;
 
61
 
 
62
 
 
63
/*
 
64
  lock_db key.
 
65
*/
 
66
 
 
67
extern "C" unsigned char* lock_db_get_key(my_dblock_t *, size_t *, bool not_used);
 
68
 
 
69
unsigned char* lock_db_get_key(my_dblock_t *ptr, size_t *length,
 
70
                       bool not_used __attribute__((unused)))
 
71
{
 
72
  *length= ptr->name_length;
 
73
  return (unsigned char*) ptr->name;
 
74
}
 
75
 
 
76
 
 
77
/*
 
78
  Free lock_db hash element.
 
79
*/
 
80
 
 
81
extern "C" void lock_db_free_element(void *ptr);
 
82
 
 
83
void lock_db_free_element(void *ptr)
 
84
{
 
85
  free(ptr);
 
86
}
 
87
 
 
88
 
 
89
/*
 
90
  Delete a database lock entry from hash.
 
91
*/
 
92
 
 
93
void lock_db_delete(const char *name, uint32_t length)
 
94
{
 
95
  my_dblock_t *opt;
 
96
  safe_mutex_assert_owner(&LOCK_lock_db);
 
97
  if ((opt= (my_dblock_t *)hash_search(&lock_db_cache,
 
98
                                       (const unsigned char*) name, length)))
 
99
    hash_delete(&lock_db_cache, (unsigned char*) opt);
 
100
}
 
101
 
 
102
 
 
103
/* Database options hash */
 
104
static HASH dboptions;
 
105
static bool dboptions_init= 0;
 
106
static rw_lock_t LOCK_dboptions;
 
107
 
 
108
/* Structure for database options */
 
109
typedef struct my_dbopt_st
 
110
{
 
111
  char *name;                   /* Database name                  */
 
112
  uint32_t name_length;         /* Database length name           */
 
113
  const CHARSET_INFO *charset;  /* Database default character set */
 
114
} my_dbopt_t;
 
115
 
 
116
 
 
117
/*
 
118
  Function we use in the creation of our hash to get key.
 
119
*/
 
120
 
 
121
extern "C" unsigned char* dboptions_get_key(my_dbopt_t *opt, size_t *length,
 
122
                                    bool not_used);
 
123
 
 
124
unsigned char* dboptions_get_key(my_dbopt_t *opt, size_t *length,
 
125
                         bool not_used __attribute__((unused)))
 
126
{
 
127
  *length= opt->name_length;
 
128
  return (unsigned char*) opt->name;
 
129
}
 
130
 
 
131
 
 
132
/*
 
133
  Helper function to write a query to binlog used by mysql_rm_db()
 
134
*/
 
135
 
 
136
static inline void write_to_binlog(THD *thd, char *query, uint32_t q_len,
 
137
                                   char *db, uint32_t db_len)
 
138
{
 
139
  Query_log_event qinfo(thd, query, q_len, 0, 0);
 
140
  qinfo.error_code= 0;
 
141
  qinfo.db= db;
 
142
  qinfo.db_len= db_len;
 
143
  mysql_bin_log.write(&qinfo);
 
144
}  
 
145
 
 
146
 
 
147
/*
 
148
  Function to free dboptions hash element
 
149
*/
 
150
 
 
151
extern "C" void free_dbopt(void *dbopt);
 
152
 
 
153
void free_dbopt(void *dbopt)
 
154
{
 
155
  free((unsigned char*) dbopt);
 
156
}
 
157
 
 
158
 
 
159
/* 
 
160
  Initialize database option hash and locked database hash.
 
161
 
 
162
  SYNOPSIS
 
163
    my_database_names()
 
164
 
 
165
  NOTES
 
166
    Must be called before any other database function is called.
 
167
 
 
168
  RETURN
 
169
    0   ok
 
170
    1   Fatal error
 
171
*/
 
172
 
 
173
bool my_database_names_init(void)
 
174
{
 
175
  bool error= false;
 
176
  (void) my_rwlock_init(&LOCK_dboptions, NULL);
 
177
  if (!dboptions_init)
 
178
  {
 
179
    dboptions_init= 1;
 
180
    error= hash_init(&dboptions, lower_case_table_names ? 
 
181
                     &my_charset_bin : system_charset_info,
 
182
                     32, 0, 0, (hash_get_key) dboptions_get_key,
 
183
                     free_dbopt,0) ||
 
184
           hash_init(&lock_db_cache, lower_case_table_names ? 
 
185
                     &my_charset_bin : system_charset_info,
 
186
                     32, 0, 0, (hash_get_key) lock_db_get_key,
 
187
                     lock_db_free_element,0);
 
188
 
 
189
  }
 
190
  return error;
 
191
}
 
192
 
 
193
 
 
194
 
 
195
/* 
 
196
  Free database option hash and locked databases hash.
 
197
*/
 
198
 
 
199
void my_database_names_free(void)
 
200
{
 
201
  if (dboptions_init)
 
202
  {
 
203
    dboptions_init= 0;
 
204
    hash_free(&dboptions);
 
205
    (void) rwlock_destroy(&LOCK_dboptions);
 
206
    hash_free(&lock_db_cache);
 
207
  }
 
208
}
 
209
 
 
210
 
 
211
/*
 
212
  Cleanup cached options
 
213
*/
 
214
 
 
215
void my_dbopt_cleanup(void)
 
216
{
 
217
  rw_wrlock(&LOCK_dboptions);
 
218
  hash_free(&dboptions);
 
219
  hash_init(&dboptions, lower_case_table_names ? 
 
220
            &my_charset_bin : system_charset_info,
 
221
            32, 0, 0, (hash_get_key) dboptions_get_key,
 
222
            free_dbopt,0);
 
223
  rw_unlock(&LOCK_dboptions);
 
224
}
 
225
 
 
226
 
 
227
/*
 
228
  Find database options in the hash.
 
229
  
 
230
  DESCRIPTION
 
231
    Search a database options in the hash, usings its path.
 
232
    Fills "create" on success.
 
233
  
 
234
  RETURN VALUES
 
235
    0 on success.
 
236
    1 on error.
 
237
*/
 
238
 
 
239
static bool get_dbopt(const char *dbname, HA_CREATE_INFO *create)
 
240
{
 
241
  my_dbopt_t *opt;
 
242
  uint32_t length;
 
243
  bool error= true;
 
244
  
 
245
  length= (uint) strlen(dbname);
 
246
  
 
247
  rw_rdlock(&LOCK_dboptions);
 
248
  if ((opt= (my_dbopt_t*) hash_search(&dboptions, (unsigned char*) dbname, length)))
 
249
  {
 
250
    create->default_table_charset= opt->charset;
 
251
    error= true;
 
252
  }
 
253
  rw_unlock(&LOCK_dboptions);
 
254
  return error;
 
255
}
 
256
 
 
257
 
 
258
/*
 
259
  Writes database options into the hash.
 
260
  
 
261
  DESCRIPTION
 
262
    Inserts database options into the hash, or updates
 
263
    options if they are already in the hash.
 
264
  
 
265
  RETURN VALUES
 
266
    0 on success.
 
267
    1 on error.
 
268
*/
 
269
 
 
270
static bool put_dbopt(const char *dbname, HA_CREATE_INFO *create)
 
271
{
 
272
  my_dbopt_t *opt;
 
273
  uint32_t length;
 
274
  bool error= false;
 
275
 
 
276
  length= (uint) strlen(dbname);
 
277
  
 
278
  rw_wrlock(&LOCK_dboptions);
 
279
  if (!(opt= (my_dbopt_t*) hash_search(&dboptions, (unsigned char*) dbname, length)))
 
280
  { 
 
281
    /* Options are not in the hash, insert them */
 
282
    char *tmp_name;
 
283
    if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
 
284
                         &opt, (uint) sizeof(*opt), &tmp_name, (uint) length+1,
 
285
                         NULL))
 
286
    {
 
287
      error= true;
 
288
      goto end;
 
289
    }
 
290
    
 
291
    opt->name= tmp_name;
 
292
    my_stpcpy(opt->name, dbname);
 
293
    opt->name_length= length;
 
294
    
 
295
    if ((error= my_hash_insert(&dboptions, (unsigned char*) opt)))
 
296
    {
 
297
      free(opt);
 
298
      goto end;
 
299
    }
 
300
  }
 
301
 
 
302
  /* Update / write options in hash */
 
303
  opt->charset= create->default_table_charset;
 
304
 
 
305
end:
 
306
  rw_unlock(&LOCK_dboptions);  
 
307
  return(error);
 
308
}
 
309
 
 
310
 
 
311
/*
 
312
  Deletes database options from the hash.
 
313
*/
 
314
 
 
315
void del_dbopt(const char *path)
 
316
{
 
317
  my_dbopt_t *opt;
 
318
  rw_wrlock(&LOCK_dboptions);
 
319
  if ((opt= (my_dbopt_t *)hash_search(&dboptions, (const unsigned char*) path,
 
320
                                      strlen(path))))
 
321
    hash_delete(&dboptions, (unsigned char*) opt);
 
322
  rw_unlock(&LOCK_dboptions);
 
323
}
 
324
 
 
325
 
 
326
/*
 
327
  Create database options file:
 
328
 
 
329
  DESCRIPTION
 
330
    Currently database default charset is only stored there.
 
331
 
 
332
  RETURN VALUES
 
333
  0     ok
 
334
  1     Could not create file or write to it.  Error sent through my_error()
 
335
*/
 
336
 
 
337
static bool write_db_opt(THD *thd, const char *path, const char *name, HA_CREATE_INFO *create)
 
338
{
 
339
  bool error= true;
 
340
  drizzle::Schema db;
 
341
 
 
342
  assert(name);
 
343
 
 
344
  if (!create->default_table_charset)
 
345
    create->default_table_charset= thd->variables.collation_server;
 
346
 
 
347
  if (put_dbopt(path, create))
 
348
    return 1;
 
349
 
 
350
  db.set_name(name);
 
351
  db.set_characterset(create->default_table_charset->csname);
 
352
  db.set_collation(create->default_table_charset->name);
 
353
 
 
354
  fstream output(path, ios::out | ios::trunc | ios::binary);
 
355
  if (!db.SerializeToOstream(&output)) 
 
356
    error= false;
 
357
 
 
358
  return error;
 
359
}
 
360
 
 
361
 
 
362
/*
 
363
  Load database options file
 
364
 
 
365
  load_db_opt()
 
366
  path          Path for option file
 
367
  create        Where to store the read options
 
368
 
 
369
  DESCRIPTION
 
370
 
 
371
  RETURN VALUES
 
372
  0     File found
 
373
  1     No database file or could not open it
 
374
 
 
375
*/
 
376
 
 
377
bool load_db_opt(THD *thd, const char *path, HA_CREATE_INFO *create)
 
378
{
 
379
  bool error=1;
 
380
  drizzle::Schema db;
 
381
  string buffer;
 
382
 
 
383
  memset(create, 0, sizeof(*create));
 
384
  create->default_table_charset= thd->variables.collation_server;
 
385
 
 
386
  /* Check if options for this database are already in the hash */
 
387
  if (!get_dbopt(path, create))
 
388
    return(0);
 
389
 
 
390
  fstream input(path, ios::in | ios::binary);
 
391
  if (!input)
 
392
    goto err1;
 
393
  else if (!db.ParseFromIstream(&input))
 
394
    goto err1;
 
395
 
 
396
  buffer= db.characterset();
 
397
  if (!(create->default_table_charset= get_charset_by_csname(buffer.c_str(), MY_CS_PRIMARY, MYF(0))))
 
398
  {
 
399
    sql_print_error(_("Error while loading database options: '%s':"),path);
 
400
    sql_print_error(ER(ER_UNKNOWN_COLLATION), buffer.c_str());
 
401
    create->default_table_charset= default_charset_info;
 
402
  }
 
403
 
 
404
  buffer= db.collation();
 
405
  if (!(create->default_table_charset= get_charset_by_name(buffer.c_str(), MYF(0))))
 
406
  {
 
407
    sql_print_error(_("Error while loading database options: '%s':"),path);
 
408
    sql_print_error(ER(ER_UNKNOWN_COLLATION), buffer.c_str());
 
409
    create->default_table_charset= default_charset_info;
 
410
  }
 
411
 
 
412
  /*
 
413
    Put the loaded value into the hash.
 
414
    Note that another thread could've added the same
 
415
    entry to the hash after we called get_dbopt(),
 
416
    but it's not an error, as put_dbopt() takes this
 
417
    possibility into account.
 
418
  */
 
419
  error= put_dbopt(path, create);
 
420
 
 
421
err1:
 
422
  return(error);
 
423
}
 
424
 
 
425
 
 
426
/*
 
427
  Retrieve database options by name. Load database options file or fetch from
 
428
  cache.
 
429
 
 
430
  SYNOPSIS
 
431
    load_db_opt_by_name()
 
432
    db_name         Database name
 
433
    db_create_info  Where to store the database options
 
434
 
 
435
  DESCRIPTION
 
436
    load_db_opt_by_name() is a shortcut for load_db_opt().
 
437
 
 
438
  NOTE
 
439
    Although load_db_opt_by_name() (and load_db_opt()) returns status of
 
440
    the operation, it is useless usually and should be ignored. The problem
 
441
    is that there are 1) system databases ("mysql") and 2) virtual
 
442
    databases ("information_schema"), which do not contain options file.
 
443
    So, load_db_opt[_by_name]() returns false for these databases, but this
 
444
    is not an error.
 
445
 
 
446
    load_db_opt[_by_name]() clears db_create_info structure in any case, so
 
447
    even on failure it contains valid data. So, common use case is just
 
448
    call load_db_opt[_by_name]() without checking return value and use
 
449
    db_create_info right after that.
 
450
 
 
451
  RETURN VALUES (read NOTE!)
 
452
    false   Success
 
453
    true    Failed to retrieve options
 
454
*/
 
455
 
 
456
bool load_db_opt_by_name(THD *thd, const char *db_name,
 
457
                         HA_CREATE_INFO *db_create_info)
 
458
{
 
459
  char db_opt_path[FN_REFLEN];
 
460
 
 
461
  /*
 
462
    Pass an empty file name, and the database options file name as extension
 
463
    to avoid table name to file name encoding.
 
464
  */
 
465
  (void) build_table_filename(db_opt_path, sizeof(db_opt_path),
 
466
                              db_name, "", MY_DB_OPT_FILE, 0);
 
467
 
 
468
  return load_db_opt(thd, db_opt_path, db_create_info);
 
469
}
 
470
 
 
471
 
 
472
/**
 
473
  Return default database collation.
 
474
 
 
475
  @param thd     Thread context.
 
476
  @param db_name Database name.
 
477
 
 
478
  @return CHARSET_INFO object. The operation always return valid character
 
479
    set, even if the database does not exist.
 
480
*/
 
481
 
 
482
const CHARSET_INFO *get_default_db_collation(THD *thd, const char *db_name)
 
483
{
 
484
  HA_CREATE_INFO db_info;
 
485
 
 
486
  if (thd->db != NULL && strcmp(db_name, thd->db) == 0)
 
487
    return thd->db_charset;
 
488
 
 
489
  load_db_opt_by_name(thd, db_name, &db_info);
 
490
 
 
491
  /*
 
492
    NOTE: even if load_db_opt_by_name() fails,
 
493
    db_info.default_table_charset contains valid character set
 
494
    (collation_server). We should not fail if load_db_opt_by_name() fails,
 
495
    because it is valid case. If a database has been created just by
 
496
    "mkdir", it does not contain db.opt file, but it is valid database.
 
497
  */
 
498
 
 
499
  return db_info.default_table_charset;
 
500
}
 
501
 
61
502
 
62
503
/*
63
504
  Create a database
64
505
 
65
506
  SYNOPSIS
66
 
  create_db()
67
 
  session               Thread handler
 
507
  mysql_create_db()
 
508
  thd           Thread handler
68
509
  db            Name of database to create
69
510
                Function assumes that this is already validated.
70
511
  create_info   Database create options (like character set)
 
512
  silent        Used by replication when internally creating a database.
 
513
                In this case the entry should not be logged.
71
514
 
72
515
  SIDE-EFFECTS
73
516
   1. Report back to client that command succeeded (my_ok)
74
517
   2. Report errors to client
75
518
   3. Log event to binary log
 
519
   (The 'silent' flags turns off 1 and 3.)
76
520
 
77
521
  RETURN VALUES
78
522
  false ok
80
524
 
81
525
*/
82
526
 
83
 
bool create(Session &session, const message::Schema &schema_message, const bool is_if_not_exists)
 
527
int mysql_create_db(THD *thd, char *db, HA_CREATE_INFO *create_info, bool silent)
84
528
{
85
 
  TransactionServices &transaction_services= TransactionServices::singleton();
86
 
  bool error= false;
 
529
  char   path[FN_REFLEN+16];
 
530
  char   tmp_query[FN_REFLEN+16];
 
531
  long result= 1;
 
532
  int error= 0;
 
533
  struct stat stat_info;
 
534
  uint32_t create_options= create_info ? create_info->options : 0;
 
535
  uint32_t path_len;
 
536
 
 
537
  /* do not create 'information_schema' db */
 
538
  if (!my_strcasecmp(system_charset_info, db, INFORMATION_SCHEMA_NAME.str))
 
539
  {
 
540
    my_error(ER_DB_CREATE_EXISTS, MYF(0), db);
 
541
    return(-1);
 
542
  }
87
543
 
88
544
  /*
89
545
    Do not create database if another thread is holding read lock.
90
 
    Wait for global read lock before acquiring session->catalog()->schemaLock().
 
546
    Wait for global read lock before acquiring LOCK_mysql_create_db.
91
547
    After wait_if_global_read_lock() we have protection against another
92
 
    global read lock. If we would acquire session->catalog()->schemaLock() first,
 
548
    global read lock. If we would acquire LOCK_mysql_create_db first,
93
549
    another thread could step in and get the global read lock before we
94
550
    reach wait_if_global_read_lock(). If this thread tries the same as we
95
 
    (admin a db), it would then go and wait on session->catalog()->schemaLock()...
 
551
    (admin a db), it would then go and wait on LOCK_mysql_create_db...
96
552
    Furthermore wait_if_global_read_lock() checks if the current thread
97
553
    has the global read lock and refuses the operation with
98
554
    ER_CANT_UPDATE_WITH_READLOCK if applicable.
99
555
  */
100
 
  if (session.wait_if_global_read_lock(false, true))
101
 
  {
102
 
    return false;
103
 
  }
104
 
 
105
 
  assert(schema_message.has_name());
106
 
  assert(schema_message.has_collation());
107
 
 
108
 
  // @todo push this lock down into the engine
109
 
  {
110
 
    boost::mutex::scoped_lock scopedLock(session.catalog().schemaLock());
111
 
 
112
 
    // Check to see if it exists already.  
113
 
    identifier::Schema schema_identifier(schema_message.name());
114
 
    if (plugin::StorageEngine::doesSchemaExist(schema_identifier))
115
 
    {
116
 
      if (not is_if_not_exists)
117
 
      {
118
 
        my_error(ER_DB_CREATE_EXISTS, schema_identifier);
119
 
        error= true;
120
 
      }
121
 
      else
122
 
      {
123
 
        push_warning_printf(&session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
124
 
                            ER_DB_CREATE_EXISTS, ER(ER_DB_CREATE_EXISTS),
125
 
                            schema_message.name().c_str());
126
 
        session.my_ok();
127
 
      }
128
 
    }
129
 
    else if (not plugin::StorageEngine::createSchema(schema_message)) // Try to create it 
130
 
    {
131
 
      my_error(ER_CANT_CREATE_DB, MYF(0), schema_message.name().c_str(), errno);
132
 
      error= true;
133
 
    }
134
 
    else // Created !
135
 
    {
136
 
      transaction_services.createSchema(session, schema_message);
137
 
      session.my_ok(1);
138
 
    }
139
 
  }
140
 
  session.startWaitingGlobalReadLock();
141
 
 
142
 
  return error;
 
556
  if (wait_if_global_read_lock(thd, 0, 1))
 
557
  {
 
558
    error= -1;
 
559
    goto exit2;
 
560
  }
 
561
 
 
562
  pthread_mutex_lock(&LOCK_mysql_create_db);
 
563
 
 
564
  /* Check directory */
 
565
  path_len= build_table_filename(path, sizeof(path), db, "", "", 0);
 
566
  path[path_len-1]= 0;                    // Remove last '/' from path
 
567
 
 
568
  if (!stat(path,&stat_info))
 
569
  {
 
570
    if (!(create_options & HA_LEX_CREATE_IF_NOT_EXISTS))
 
571
    {
 
572
      my_error(ER_DB_CREATE_EXISTS, MYF(0), db);
 
573
      error= -1;
 
574
      goto exit;
 
575
    }
 
576
    push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
577
                        ER_DB_CREATE_EXISTS, ER(ER_DB_CREATE_EXISTS), db);
 
578
    if (!silent)
 
579
      my_ok(thd);
 
580
    error= 0;
 
581
    goto exit;
 
582
  }
 
583
  else
 
584
  {
 
585
    if (errno != ENOENT)
 
586
    {
 
587
      my_error(EE_STAT, MYF(0), path, errno);
 
588
      goto exit;
 
589
    }
 
590
    if (my_mkdir(path,0777,MYF(0)) < 0)
 
591
    {
 
592
      my_error(ER_CANT_CREATE_DB, MYF(0), db, my_errno);
 
593
      error= -1;
 
594
      goto exit;
 
595
    }
 
596
  }
 
597
 
 
598
  path[path_len-1]= FN_LIBCHAR;
 
599
  strmake(path+path_len, MY_DB_OPT_FILE, sizeof(path)-path_len-1);
 
600
  if (write_db_opt(thd, path, db, create_info))
 
601
  {
 
602
    /*
 
603
      Could not create options file.
 
604
      Restore things to beginning.
 
605
    */
 
606
    path[path_len]= 0;
 
607
    if (rmdir(path) >= 0)
 
608
    {
 
609
      error= -1;
 
610
      goto exit;
 
611
    }
 
612
    /*
 
613
      We come here when we managed to create the database, but not the option
 
614
      file.  In this case it's best to just continue as if nothing has
 
615
      happened.  (This is a very unlikely senario)
 
616
    */
 
617
  }
 
618
  
 
619
  if (!silent)
 
620
  {
 
621
    char *query;
 
622
    uint32_t query_length;
 
623
 
 
624
    if (!thd->query)                            // Only in replication
 
625
    {
 
626
      query=         tmp_query;
 
627
      query_length= (uint) (strxmov(tmp_query,"create database `",
 
628
                                    db, "`", NULL) - tmp_query);
 
629
    }
 
630
    else
 
631
    {
 
632
      query=        thd->query;
 
633
      query_length= thd->query_length;
 
634
    }
 
635
 
 
636
    if (mysql_bin_log.is_open())
 
637
    {
 
638
      Query_log_event qinfo(thd, query, query_length, 0, 
 
639
                            /* suppress_use */ true);
 
640
 
 
641
      /*
 
642
        Write should use the database being created as the "current
 
643
        database" and not the threads current database, which is the
 
644
        default. If we do not change the "current database" to the
 
645
        database being created, the CREATE statement will not be
 
646
        replicated when using --binlog-do-db to select databases to be
 
647
        replicated. 
 
648
 
 
649
        An example (--binlog-do-db=sisyfos):
 
650
       
 
651
          CREATE DATABASE bob;        # Not replicated
 
652
          USE bob;                    # 'bob' is the current database
 
653
          CREATE DATABASE sisyfos;    # Not replicated since 'bob' is
 
654
                                      # current database.
 
655
          USE sisyfos;                # Will give error on slave since
 
656
                                      # database does not exist.
 
657
      */
 
658
      qinfo.db     = db;
 
659
      qinfo.db_len = strlen(db);
 
660
 
 
661
      /* These DDL methods and logging protected with LOCK_mysql_create_db */
 
662
      mysql_bin_log.write(&qinfo);
 
663
    }
 
664
    my_ok(thd, result);
 
665
  }
 
666
 
 
667
exit:
 
668
  pthread_mutex_unlock(&LOCK_mysql_create_db);
 
669
  start_waiting_global_read_lock(thd);
 
670
exit2:
 
671
  return(error);
143
672
}
144
673
 
145
674
 
146
675
/* db-name is already validated when we come here */
147
676
 
148
 
bool alter(Session &session,
149
 
           const message::Schema &schema_message,
150
 
           const message::Schema &original_schema)
 
677
bool mysql_alter_db(THD *thd, const char *db, HA_CREATE_INFO *create_info)
151
678
{
152
 
  TransactionServices &transaction_services= TransactionServices::singleton();
 
679
  char path[FN_REFLEN+16];
 
680
  long result=1;
 
681
  int error= false;
153
682
 
154
683
  /*
155
684
    Do not alter database if another thread is holding read lock.
156
 
    Wait for global read lock before acquiring session->catalog()->schemaLock().
 
685
    Wait for global read lock before acquiring LOCK_mysql_create_db.
157
686
    After wait_if_global_read_lock() we have protection against another
158
 
    global read lock. If we would acquire session->catalog()->schemaLock() first,
 
687
    global read lock. If we would acquire LOCK_mysql_create_db first,
159
688
    another thread could step in and get the global read lock before we
160
689
    reach wait_if_global_read_lock(). If this thread tries the same as we
161
 
    (admin a db), it would then go and wait on session->catalog()->schemaLock()...
 
690
    (admin a db), it would then go and wait on LOCK_mysql_create_db...
162
691
    Furthermore wait_if_global_read_lock() checks if the current thread
163
692
    has the global read lock and refuses the operation with
164
693
    ER_CANT_UPDATE_WITH_READLOCK if applicable.
165
694
  */
166
 
  if ((session.wait_if_global_read_lock(false, true)))
167
 
    return false;
168
 
 
169
 
  bool success;
170
 
  {
171
 
    boost::mutex::scoped_lock scopedLock(session.catalog().schemaLock());
172
 
 
173
 
    identifier::Schema schema_idenifier(schema_message.name());
174
 
    if (not plugin::StorageEngine::doesSchemaExist(schema_idenifier))
175
 
    {
176
 
      my_error(ER_SCHEMA_DOES_NOT_EXIST, schema_idenifier);
177
 
      return false;
178
 
    }
179
 
 
180
 
    /* Change options if current database is being altered. */
181
 
    success= plugin::StorageEngine::alterSchema(schema_message);
182
 
 
183
 
    if (success)
184
 
    {
185
 
      transaction_services.alterSchema(session, original_schema, schema_message);
186
 
      session.my_ok(1);
187
 
    }
188
 
    else
189
 
    {
190
 
      my_error(ER_ALTER_SCHEMA, schema_idenifier);
191
 
    }
192
 
  }
193
 
  session.startWaitingGlobalReadLock();
194
 
 
195
 
  return success;
 
695
  if ((error=wait_if_global_read_lock(thd,0,1)))
 
696
    goto exit2;
 
697
 
 
698
  pthread_mutex_lock(&LOCK_mysql_create_db);
 
699
 
 
700
  /* 
 
701
     Recreate db options file: /dbpath/.db.opt
 
702
     We pass MY_DB_OPT_FILE as "extension" to avoid
 
703
     "table name to file name" encoding.
 
704
  */
 
705
  build_table_filename(path, sizeof(path), db, "", MY_DB_OPT_FILE, 0);
 
706
  if ((error=write_db_opt(thd, path, db, create_info)))
 
707
    goto exit;
 
708
 
 
709
  /* Change options if current database is being altered. */
 
710
 
 
711
  if (thd->db && !strcmp(thd->db,db))
 
712
  {
 
713
    thd->db_charset= create_info->default_table_charset ?
 
714
                     create_info->default_table_charset :
 
715
                     thd->variables.collation_server;
 
716
    thd->variables.collation_database= thd->db_charset;
 
717
  }
 
718
 
 
719
  if (mysql_bin_log.is_open())
 
720
  {
 
721
    Query_log_event qinfo(thd, thd->query, thd->query_length, 0,
 
722
                          /* suppress_use */ true);
 
723
 
 
724
    /*
 
725
      Write should use the database being created as the "current
 
726
      database" and not the threads current database, which is the
 
727
      default.
 
728
    */
 
729
    qinfo.db     = db;
 
730
    qinfo.db_len = strlen(db);
 
731
 
 
732
    thd->clear_error();
 
733
    /* These DDL methods and logging protected with LOCK_mysql_create_db */
 
734
    mysql_bin_log.write(&qinfo);
 
735
  }
 
736
  my_ok(thd, result);
 
737
 
 
738
exit:
 
739
  pthread_mutex_unlock(&LOCK_mysql_create_db);
 
740
  start_waiting_global_read_lock(thd);
 
741
exit2:
 
742
  return(error);
196
743
}
197
744
 
198
745
 
200
747
  Drop all tables in a database and the database itself
201
748
 
202
749
  SYNOPSIS
203
 
    rm_db()
204
 
    session                     Thread handle
 
750
    mysql_rm_db()
 
751
    thd                 Thread handle
205
752
    db                  Database name in the case given by user
206
753
                        It's already validated and set to lower case
207
754
                        (if needed) when we come here
213
760
    ERROR Error
214
761
*/
215
762
 
216
 
bool drop(Session &session, identifier::Schema &schema_identifier, const bool if_exists)
 
763
bool mysql_rm_db(THD *thd,char *db,bool if_exists, bool silent)
217
764
{
218
 
  bool error= false;
 
765
  long deleted=0;
 
766
  int error= false;
 
767
  char  path[FN_REFLEN+16];
 
768
  MY_DIR *dirp;
 
769
  uint32_t length;
 
770
  TableList* dropped_tables= 0;
 
771
 
 
772
  if (db && (strcmp(db, "information_schema") == 0))
 
773
  {
 
774
    my_error(ER_DBACCESS_DENIED_ERROR, MYF(0), "", "", INFORMATION_SCHEMA_NAME.str);
 
775
    return(true);
 
776
  }
219
777
 
220
778
  /*
221
779
    Do not drop database if another thread is holding read lock.
222
 
    Wait for global read lock before acquiring session->catalog()->schemaLock().
 
780
    Wait for global read lock before acquiring LOCK_mysql_create_db.
223
781
    After wait_if_global_read_lock() we have protection against another
224
 
    global read lock. If we would acquire session->catalog()->schemaLock() first,
 
782
    global read lock. If we would acquire LOCK_mysql_create_db first,
225
783
    another thread could step in and get the global read lock before we
226
784
    reach wait_if_global_read_lock(). If this thread tries the same as we
227
 
    (admin a db), it would then go and wait on session->catalog()->schemaLock()...
 
785
    (admin a db), it would then go and wait on LOCK_mysql_create_db...
228
786
    Furthermore wait_if_global_read_lock() checks if the current thread
229
787
    has the global read lock and refuses the operation with
230
788
    ER_CANT_UPDATE_WITH_READLOCK if applicable.
231
789
  */
232
 
  if (session.wait_if_global_read_lock(false, true))
233
 
  {
234
 
    return true;
235
 
  }
236
 
 
237
 
  do
238
 
  {
239
 
    boost::mutex::scoped_lock scopedLock(session.catalog().schemaLock());
240
 
    message::schema::shared_ptr message= plugin::StorageEngine::getSchemaDefinition(schema_identifier);
241
 
 
242
 
    /* See if the schema exists */
243
 
    if (not message)
244
 
    {
245
 
      if (if_exists)
246
 
      {
247
 
        std::string path;
248
 
        schema_identifier.getSQLPath(path);
249
 
 
250
 
        push_warning_printf(&session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
251
 
                            ER_DB_DROP_EXISTS, ER(ER_DB_DROP_EXISTS),
252
 
                            path.c_str());
253
 
      }
254
 
      else
255
 
      {
256
 
        error= true;
257
 
        my_error(ER_DB_DROP_EXISTS, schema_identifier);
258
 
        break;
259
 
      }
260
 
    }
261
 
    else
262
 
    {
263
 
      error= plugin::StorageEngine::dropSchema(session, schema_identifier, *message);
264
 
    }
265
 
 
266
 
  } while (0);
267
 
 
 
790
  if (wait_if_global_read_lock(thd, 0, 1))
 
791
  {
 
792
    error= -1;
 
793
    goto exit2;
 
794
  }
 
795
 
 
796
  pthread_mutex_lock(&LOCK_mysql_create_db);
 
797
 
 
798
  /*
 
799
    This statement will be replicated as a statement, even when using
 
800
    row-based replication. The flag will be reset at the end of the
 
801
    statement.
 
802
  */
 
803
  thd->clear_current_stmt_binlog_row_based();
 
804
 
 
805
  length= build_table_filename(path, sizeof(path), db, "", "", 0);
 
806
  my_stpcpy(path+length, MY_DB_OPT_FILE);               // Append db option file name
 
807
  del_dbopt(path);                              // Remove dboption hash entry
 
808
  path[length]= '\0';                           // Remove file name
 
809
 
 
810
  /* See if the directory exists */
 
811
  if (!(dirp= my_dir(path,MYF(MY_DONT_SORT))))
 
812
  {
 
813
    if (!if_exists)
 
814
    {
 
815
      error= -1;
 
816
      my_error(ER_DB_DROP_EXISTS, MYF(0), db);
 
817
      goto exit;
 
818
    }
 
819
    else
 
820
      push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
821
                          ER_DB_DROP_EXISTS, ER(ER_DB_DROP_EXISTS), db);
 
822
  }
 
823
  else
 
824
  {
 
825
    pthread_mutex_lock(&LOCK_open);
 
826
    remove_db_from_cache(db);
 
827
    pthread_mutex_unlock(&LOCK_open);
 
828
 
 
829
    
 
830
    error= -1;
 
831
    if ((deleted= mysql_rm_known_files(thd, dirp, db, path, 0,
 
832
                                       &dropped_tables)) >= 0)
 
833
    {
 
834
      ha_drop_database(path);
 
835
      error = 0;
 
836
    }
 
837
  }
 
838
  if (!silent && deleted>=0)
 
839
  {
 
840
    const char *query;
 
841
    uint32_t query_length;
 
842
    if (!thd->query)
 
843
    {
 
844
      /* The client used the old obsolete mysql_drop_db() call */
 
845
      query= path;
 
846
      query_length= (uint) (strxmov(path, "drop database `", db, "`",
 
847
                                     NULL) - path);
 
848
    }
 
849
    else
 
850
    {
 
851
      query =thd->query;
 
852
      query_length= thd->query_length;
 
853
    }
 
854
    if (mysql_bin_log.is_open())
 
855
    {
 
856
      Query_log_event qinfo(thd, query, query_length, 0, 
 
857
                            /* suppress_use */ true);
 
858
      /*
 
859
        Write should use the database being created as the "current
 
860
        database" and not the threads current database, which is the
 
861
        default.
 
862
      */
 
863
      qinfo.db     = db;
 
864
      qinfo.db_len = strlen(db);
 
865
 
 
866
      thd->clear_error();
 
867
      /* These DDL methods and logging protected with LOCK_mysql_create_db */
 
868
      mysql_bin_log.write(&qinfo);
 
869
    }
 
870
    thd->clear_error();
 
871
    thd->server_status|= SERVER_STATUS_DB_DROPPED;
 
872
    my_ok(thd, (uint32_t) deleted);
 
873
    thd->server_status&= ~SERVER_STATUS_DB_DROPPED;
 
874
  }
 
875
  else if (mysql_bin_log.is_open())
 
876
  {
 
877
    char *query, *query_pos, *query_end, *query_data_start;
 
878
    TableList *tbl;
 
879
    uint32_t db_len;
 
880
 
 
881
    if (!(query= (char*) thd->alloc(MAX_DROP_TABLE_Q_LEN)))
 
882
      goto exit; /* not much else we can do */
 
883
    query_pos= query_data_start= my_stpcpy(query,"drop table ");
 
884
    query_end= query + MAX_DROP_TABLE_Q_LEN;
 
885
    db_len= strlen(db);
 
886
 
 
887
    for (tbl= dropped_tables; tbl; tbl= tbl->next_local)
 
888
    {
 
889
      uint32_t tbl_name_len;
 
890
 
 
891
      /* 3 for the quotes and the comma*/
 
892
      tbl_name_len= strlen(tbl->table_name) + 3;
 
893
      if (query_pos + tbl_name_len + 1 >= query_end)
 
894
      {
 
895
        /* These DDL methods and logging protected with LOCK_mysql_create_db */
 
896
        write_to_binlog(thd, query, query_pos -1 - query, db, db_len);
 
897
        query_pos= query_data_start;
 
898
      }
 
899
 
 
900
      *query_pos++ = '`';
 
901
      query_pos= my_stpcpy(query_pos,tbl->table_name);
 
902
      *query_pos++ = '`';
 
903
      *query_pos++ = ',';
 
904
    }
 
905
 
 
906
    if (query_pos != query_data_start)
 
907
    {
 
908
      /* These DDL methods and logging protected with LOCK_mysql_create_db */
 
909
      write_to_binlog(thd, query, query_pos -1 - query, db, db_len);
 
910
    }
 
911
  }
 
912
 
 
913
exit:
268
914
  /*
269
915
    If this database was the client's selected database, we silently
270
916
    change the client's selected database to nothing (to have an empty
271
 
    SELECT DATABASE() in the future). For this we free() session->db and set
 
917
    SELECT DATABASE() in the future). For this we free() thd->db and set
272
918
    it to 0.
273
919
  */
274
 
  if (not error and schema_identifier.compare(*session.schema()))
275
 
    change_db_impl(session);
276
 
 
277
 
  session.startWaitingGlobalReadLock();
278
 
 
279
 
  return error;
280
 
}
 
920
  if (thd->db && !strcmp(thd->db, db))
 
921
    mysql_change_db_impl(thd, NULL, thd->variables.collation_server);
 
922
  pthread_mutex_unlock(&LOCK_mysql_create_db);
 
923
  start_waiting_global_read_lock(thd);
 
924
exit2:
 
925
  return(error);
 
926
}
 
927
 
 
928
/*
 
929
  Removes files with known extensions plus.
 
930
  thd MUST be set when calling this function!
 
931
*/
 
932
 
 
933
static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *db,
 
934
                                 const char *org_path, uint32_t level,
 
935
                                 TableList **dropped_tables)
 
936
{
 
937
  long deleted=0;
 
938
  uint32_t found_other_files=0;
 
939
  char filePath[FN_REFLEN];
 
940
  TableList *tot_list=0, **tot_list_next;
 
941
 
 
942
  tot_list_next= &tot_list;
 
943
 
 
944
  for (uint32_t idx=0 ;
 
945
       idx < (uint) dirp->number_off_files && !thd->killed ;
 
946
       idx++)
 
947
  {
 
948
    FILEINFO *file=dirp->dir_entry+idx;
 
949
    char *extension;
 
950
 
 
951
    /* skiping . and .. */
 
952
    if (file->name[0] == '.' && (!file->name[1] ||
 
953
       (file->name[1] == '.' &&  !file->name[2])))
 
954
      continue;
 
955
 
 
956
    if (!(extension= strrchr(file->name, '.')))
 
957
      extension= strchr(file->name, '\0');
 
958
    if (find_type(extension, &deletable_extentions,1+2) <= 0)
 
959
    {
 
960
      if (find_type(extension, ha_known_exts(),1+2) <= 0)
 
961
        found_other_files++;
 
962
      continue;
 
963
    }
 
964
    /* just for safety we use files_charset_info */
 
965
    if (db && !my_strcasecmp(files_charset_info,
 
966
                             extension, reg_ext))
 
967
    {
 
968
      /* Drop the table nicely */
 
969
      *extension= 0;                    // Remove extension
 
970
      TableList *table_list=(TableList*)
 
971
                              thd->calloc(sizeof(*table_list) + 
 
972
                                          strlen(db) + 1 +
 
973
                                          MYSQL50_TABLE_NAME_PREFIX_LENGTH + 
 
974
                                          strlen(file->name) + 1);
 
975
 
 
976
      if (!table_list)
 
977
        goto err;
 
978
      table_list->db= (char*) (table_list+1);
 
979
      table_list->table_name= my_stpcpy(table_list->db, db) + 1;
 
980
      filename_to_tablename(file->name, table_list->table_name,
 
981
                            MYSQL50_TABLE_NAME_PREFIX_LENGTH +
 
982
                            strlen(file->name) + 1);
 
983
      table_list->alias= table_list->table_name;        // If lower_case_table_names=2
 
984
      table_list->internal_tmp_table= is_prefix(file->name, tmp_file_prefix);
 
985
      /* Link into list */
 
986
      (*tot_list_next)= table_list;
 
987
      tot_list_next= &table_list->next_local;
 
988
      deleted++;
 
989
    }
 
990
    else
 
991
    {
 
992
      strxmov(filePath, org_path, "/", file->name, NULL);
 
993
      if (my_delete_with_symlink(filePath,MYF(MY_WME)))
 
994
      {
 
995
        goto err;
 
996
      }
 
997
    }
 
998
  }
 
999
  if (thd->killed ||
 
1000
      (tot_list && mysql_rm_table_part2(thd, tot_list, 1, 0, 1, 1)))
 
1001
    goto err;
 
1002
 
 
1003
  my_dirend(dirp);  
 
1004
  
 
1005
  if (dropped_tables)
 
1006
    *dropped_tables= tot_list;
 
1007
  
 
1008
  /*
 
1009
    If the directory is a symbolic link, remove the link first, then
 
1010
    remove the directory the symbolic link pointed at
 
1011
  */
 
1012
  if (found_other_files)
 
1013
  {
 
1014
    my_error(ER_DB_DROP_RMDIR, MYF(0), org_path, EEXIST);
 
1015
    return(-1);
 
1016
  }
 
1017
  else
 
1018
  {
 
1019
    /* Don't give errors if we can't delete 'RAID' directory */
 
1020
    if (rm_dir_w_symlink(org_path, level == 0))
 
1021
      return(-1);
 
1022
  }
 
1023
 
 
1024
  return(deleted);
 
1025
 
 
1026
err:
 
1027
  my_dirend(dirp);
 
1028
  return(-1);
 
1029
}
 
1030
 
 
1031
 
 
1032
/*
 
1033
  Remove directory with symlink
 
1034
 
 
1035
  SYNOPSIS
 
1036
    rm_dir_w_symlink()
 
1037
    org_path    path of derictory
 
1038
    send_error  send errors
 
1039
  RETURN
 
1040
    0 OK
 
1041
    1 ERROR
 
1042
*/
 
1043
 
 
1044
static bool rm_dir_w_symlink(const char *org_path, bool send_error)
 
1045
{
 
1046
  char tmp_path[FN_REFLEN], *pos;
 
1047
  char *path= tmp_path;
 
1048
  unpack_filename(tmp_path, org_path);
 
1049
#ifdef HAVE_READLINK
 
1050
  int error;
 
1051
  char tmp2_path[FN_REFLEN];
 
1052
 
 
1053
  /* Remove end FN_LIBCHAR as this causes problem on Linux in readlink */
 
1054
  pos= strchr(path, '\0');
 
1055
  if (pos > path && pos[-1] == FN_LIBCHAR)
 
1056
    *--pos=0;
 
1057
 
 
1058
  if ((error= my_readlink(tmp2_path, path, MYF(MY_WME))) < 0)
 
1059
    return(1);
 
1060
  if (!error)
 
1061
  {
 
1062
    if (my_delete(path, MYF(send_error ? MY_WME : 0)))
 
1063
    {
 
1064
      return(send_error);
 
1065
    }
 
1066
    /* Delete directory symbolic link pointed at */
 
1067
    path= tmp2_path;
 
1068
  }
 
1069
#endif
 
1070
  /* Remove last FN_LIBCHAR to not cause a problem on OS/2 */
 
1071
  pos= strchr(path, '\0');
 
1072
 
 
1073
  if (pos > path && pos[-1] == FN_LIBCHAR)
 
1074
    *--pos=0;
 
1075
  if (rmdir(path) < 0 && send_error)
 
1076
  {
 
1077
    my_error(ER_DB_DROP_RMDIR, MYF(0), path, errno);
 
1078
    return(1);
 
1079
  }
 
1080
  return(0);
 
1081
}
 
1082
 
 
1083
 
 
1084
/**
 
1085
  @brief Internal implementation: switch current database to a valid one.
 
1086
 
 
1087
  @param thd            Thread context.
 
1088
  @param new_db_name    Name of the database to switch to. The function will
 
1089
                        take ownership of the name (the caller must not free
 
1090
                        the allocated memory). If the name is NULL, we're
 
1091
                        going to switch to NULL db.
 
1092
  @param new_db_charset Character set of the new database.
 
1093
*/
 
1094
 
 
1095
static void mysql_change_db_impl(THD *thd,
 
1096
                                 LEX_STRING *new_db_name,
 
1097
                                 const CHARSET_INFO * const new_db_charset)
 
1098
{
 
1099
  /* 1. Change current database in THD. */
 
1100
 
 
1101
  if (new_db_name == NULL)
 
1102
  {
 
1103
    /*
 
1104
      THD::set_db() does all the job -- it frees previous database name and
 
1105
      sets the new one.
 
1106
    */
 
1107
 
 
1108
    thd->set_db(NULL, 0);
 
1109
  }
 
1110
  else if (new_db_name == &INFORMATION_SCHEMA_NAME)
 
1111
  {
 
1112
    /*
 
1113
      Here we must use THD::set_db(), because we want to copy
 
1114
      INFORMATION_SCHEMA_NAME constant.
 
1115
    */
 
1116
 
 
1117
    thd->set_db(INFORMATION_SCHEMA_NAME.str, INFORMATION_SCHEMA_NAME.length);
 
1118
  }
 
1119
  else
 
1120
  {
 
1121
    /*
 
1122
      Here we already have a copy of database name to be used in THD. So,
 
1123
      we just call THD::reset_db(). Since THD::reset_db() does not releases
 
1124
      the previous database name, we should do it explicitly.
 
1125
    */
 
1126
 
 
1127
    if (thd->db)
 
1128
      free(thd->db);
 
1129
 
 
1130
    thd->reset_db(new_db_name->str, new_db_name->length);
 
1131
  }
 
1132
 
 
1133
  /* 3. Update db-charset environment variables. */
 
1134
 
 
1135
  thd->db_charset= new_db_charset;
 
1136
  thd->variables.collation_database= new_db_charset;
 
1137
}
 
1138
 
 
1139
 
 
1140
 
 
1141
/**
 
1142
  Backup the current database name before switch.
 
1143
 
 
1144
  @param[in]      thd             thread handle
 
1145
  @param[in, out] saved_db_name   IN: "str" points to a buffer where to store
 
1146
                                  the old database name, "length" contains the
 
1147
                                  buffer size
 
1148
                                  OUT: if the current (default) database is
 
1149
                                  not NULL, its name is copied to the
 
1150
                                  buffer pointed at by "str"
 
1151
                                  and "length" is updated accordingly.
 
1152
                                  Otherwise "str" is set to NULL and
 
1153
                                  "length" is set to 0.
 
1154
*/
 
1155
 
 
1156
static void backup_current_db_name(THD *thd,
 
1157
                                   LEX_STRING *saved_db_name)
 
1158
{
 
1159
  if (!thd->db)
 
1160
  {
 
1161
    /* No current (default) database selected. */
 
1162
 
 
1163
    saved_db_name->str= NULL;
 
1164
    saved_db_name->length= 0;
 
1165
  }
 
1166
  else
 
1167
  {
 
1168
    strmake(saved_db_name->str, thd->db, saved_db_name->length - 1);
 
1169
    saved_db_name->length= thd->db_length;
 
1170
  }
 
1171
}
 
1172
 
 
1173
 
 
1174
/**
 
1175
  Return true if db1_name is equal to db2_name, false otherwise.
 
1176
 
 
1177
  The function allows to compare database names according to the MySQL
 
1178
  rules. The database names db1 and db2 are equal if:
 
1179
     - db1 is NULL and db2 is NULL;
 
1180
     or
 
1181
     - db1 is not-NULL, db2 is not-NULL, db1 is equal (ignoring case) to
 
1182
       db2 in system character set (UTF8).
 
1183
*/
 
1184
 
 
1185
static inline bool
 
1186
cmp_db_names(const char *db1_name,
 
1187
             const char *db2_name)
 
1188
{
 
1189
  return
 
1190
         /* db1 is NULL and db2 is NULL */
 
1191
         (!db1_name && !db2_name) ||
 
1192
 
 
1193
         /* db1 is not-NULL, db2 is not-NULL, db1 == db2. */
 
1194
         (db1_name && db2_name && my_strcasecmp(system_charset_info, db1_name, db2_name) == 0);
 
1195
}
 
1196
 
281
1197
 
282
1198
/**
283
1199
  @brief Change the current database and its attributes unconditionally.
284
1200
 
285
 
  @param session          thread handle
 
1201
  @param thd          thread handle
286
1202
  @param new_db_name  database name
287
1203
  @param force_switch if force_switch is false, then the operation will fail if
288
1204
 
308
1224
                          @@collation_server, but the operation will fail;
309
1225
 
310
1226
                        - user privileges will not be checked
311
 
                          (Session::db_access however is updated);
 
1227
                          (THD::db_access however is updated);
312
1228
 
313
1229
                          TODO: is this really the intention?
314
1230
                                (see sp-security.test).
321
1237
  @details The function checks that the database name corresponds to a
322
1238
  valid and existent database, checks access rights and changes the current
323
1239
  database with database attributes (@@collation_database session variable,
324
 
  Session::db_access).
 
1240
  THD::db_access).
325
1241
 
326
1242
  This function is not the only way to switch the database that is
327
1243
  currently employed. When the replication slave thread switches the
328
 
  database before executing a query, it calls session->set_db directly.
 
1244
  database before executing a query, it calls thd->set_db directly.
329
1245
  However, if the query, in turn, uses a stored routine, the stored routine
330
1246
  will use this function, even if it's run on the slave.
331
1247
 
341
1257
    @retval true  Error
342
1258
*/
343
1259
 
344
 
bool change(Session &session, identifier::Schema &schema_identifier)
 
1260
bool mysql_change_db(THD *thd, const LEX_STRING *new_db_name, bool force_switch)
345
1261
{
346
 
 
347
 
  if (not plugin::Authorization::isAuthorized(*session.user(), schema_identifier))
348
 
  {
349
 
    /* Error message is set in isAuthorized */
350
 
    return true;
351
 
  }
352
 
 
353
 
  if (not check(session, schema_identifier))
354
 
  {
355
 
    my_error(ER_WRONG_DB_NAME, schema_identifier);
356
 
 
357
 
    return true;
358
 
  }
359
 
 
360
 
  if (not plugin::StorageEngine::doesSchemaExist(schema_identifier))
361
 
  {
362
 
    my_error(ER_BAD_DB_ERROR, schema_identifier);
363
 
 
364
 
    /* The operation failed. */
365
 
 
366
 
    return true;
367
 
  }
368
 
 
369
 
  change_db_impl(session, schema_identifier);
370
 
 
371
 
  return false;
 
1262
  LEX_STRING new_db_file_name;
 
1263
  const CHARSET_INFO *db_default_cl;
 
1264
 
 
1265
  if (new_db_name == NULL ||
 
1266
      new_db_name->length == 0)
 
1267
  {
 
1268
    if (force_switch)
 
1269
    {
 
1270
      /*
 
1271
        This can happen only if we're switching the current database back
 
1272
        after loading stored program. The thing is that loading of stored
 
1273
        program can happen when there is no current database.
 
1274
 
 
1275
        TODO: actually, new_db_name and new_db_name->str seem to be always
 
1276
        non-NULL. In case of stored program, new_db_name->str == "" and
 
1277
        new_db_name->length == 0.
 
1278
      */
 
1279
 
 
1280
      mysql_change_db_impl(thd, NULL, thd->variables.collation_server);
 
1281
 
 
1282
      return(false);
 
1283
    }
 
1284
    else
 
1285
    {
 
1286
      my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
 
1287
 
 
1288
      return(true);
 
1289
    }
 
1290
  }
 
1291
 
 
1292
  if (my_strcasecmp(system_charset_info, new_db_name->str,
 
1293
                    INFORMATION_SCHEMA_NAME.str) == 0)
 
1294
  {
 
1295
    /* Switch the current database to INFORMATION_SCHEMA. */
 
1296
 
 
1297
    mysql_change_db_impl(thd, &INFORMATION_SCHEMA_NAME, system_charset_info);
 
1298
 
 
1299
    return(false);
 
1300
  }
 
1301
 
 
1302
  /*
 
1303
    Now we need to make a copy because check_db_name requires a
 
1304
    non-constant argument. Actually, it takes database file name.
 
1305
 
 
1306
    TODO: fix check_db_name().
 
1307
  */
 
1308
 
 
1309
  new_db_file_name.str= my_strndup(new_db_name->str, new_db_name->length,
 
1310
                                   MYF(MY_WME));
 
1311
  new_db_file_name.length= new_db_name->length;
 
1312
 
 
1313
  if (new_db_file_name.str == NULL)
 
1314
    return(true);                             /* the error is set */
 
1315
 
 
1316
  /*
 
1317
    NOTE: if check_db_name() fails, we should throw an error in any case,
 
1318
    even if we are called from sp_head::execute().
 
1319
 
 
1320
    It's next to impossible however to get this error when we are called
 
1321
    from sp_head::execute(). But let's switch the current database to NULL
 
1322
    in this case to be sure.
 
1323
  */
 
1324
 
 
1325
  if (check_db_name(&new_db_file_name))
 
1326
  {
 
1327
    my_error(ER_WRONG_DB_NAME, MYF(0), new_db_file_name.str);
 
1328
    free(new_db_file_name.str);
 
1329
 
 
1330
    if (force_switch)
 
1331
      mysql_change_db_impl(thd, NULL, thd->variables.collation_server);
 
1332
 
 
1333
    return(true);
 
1334
  }
 
1335
 
 
1336
  if (check_db_dir_existence(new_db_file_name.str))
 
1337
  {
 
1338
    if (force_switch)
 
1339
    {
 
1340
      /* Throw a warning and free new_db_file_name. */
 
1341
 
 
1342
      push_warning_printf(thd, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
 
1343
                          ER_BAD_DB_ERROR, ER(ER_BAD_DB_ERROR),
 
1344
                          new_db_file_name.str);
 
1345
 
 
1346
      free(new_db_file_name.str);
 
1347
 
 
1348
      /* Change db to NULL. */
 
1349
 
 
1350
      mysql_change_db_impl(thd, NULL, thd->variables.collation_server);
 
1351
 
 
1352
      /* The operation succeed. */
 
1353
 
 
1354
      return(false);
 
1355
    }
 
1356
    else
 
1357
    {
 
1358
      /* Report an error and free new_db_file_name. */
 
1359
 
 
1360
      my_error(ER_BAD_DB_ERROR, MYF(0), new_db_file_name.str);
 
1361
      free(new_db_file_name.str);
 
1362
 
 
1363
      /* The operation failed. */
 
1364
 
 
1365
      return(true);
 
1366
    }
 
1367
  }
 
1368
 
 
1369
  /*
 
1370
    NOTE: in mysql_change_db_impl() new_db_file_name is assigned to THD
 
1371
    attributes and will be freed in THD::~THD().
 
1372
  */
 
1373
 
 
1374
  db_default_cl= get_default_db_collation(thd, new_db_file_name.str);
 
1375
 
 
1376
  mysql_change_db_impl(thd, &new_db_file_name, db_default_cl);
 
1377
 
 
1378
  return(false);
372
1379
}
373
1380
 
 
1381
 
374
1382
/**
375
 
  @brief Internal implementation: switch current database to a valid one.
 
1383
  Change the current database and its attributes if needed.
376
1384
 
377
 
  @param session            Thread context.
378
 
  @param new_db_name    Name of the database to switch to. The function will
379
 
                        take ownership of the name (the caller must not free
380
 
                        the allocated memory). If the name is NULL, we're
381
 
                        going to switch to NULL db.
382
 
  @param new_db_charset Character set of the new database.
 
1385
  @param          thd             thread handle
 
1386
  @param          new_db_name     database name
 
1387
  @param[in, out] saved_db_name   IN: "str" points to a buffer where to store
 
1388
                                  the old database name, "length" contains the
 
1389
                                  buffer size
 
1390
                                  OUT: if the current (default) database is
 
1391
                                  not NULL, its name is copied to the
 
1392
                                  buffer pointed at by "str"
 
1393
                                  and "length" is updated accordingly.
 
1394
                                  Otherwise "str" is set to NULL and
 
1395
                                  "length" is set to 0.
 
1396
  @param          force_switch    @see mysql_change_db()
 
1397
  @param[out]     cur_db_changed  out-flag to indicate whether the current
 
1398
                                  database has been changed (valid only if
 
1399
                                  the function suceeded)
383
1400
*/
384
1401
 
385
 
static void change_db_impl(Session &session, identifier::Schema &schema_identifier)
386
 
{
387
 
  /* 1. Change current database in Session. */
388
 
 
389
 
#if 0
390
 
  if (new_db_name == NULL)
391
 
  {
392
 
    /*
393
 
      Session::set_db() does all the job -- it frees previous database name and
394
 
      sets the new one.
395
 
    */
396
 
 
397
 
    session->set_db(NULL, 0);
398
 
  }
399
 
  else
400
 
#endif
401
 
  {
402
 
    /*
403
 
      Here we already have a copy of database name to be used in Session. So,
404
 
      we just call Session::reset_db(). Since Session::reset_db() does not releases
405
 
      the previous database name, we should do it explicitly.
406
 
    */
407
 
 
408
 
    session.set_db(schema_identifier.getSchemaName());
409
 
  }
410
 
}
411
 
 
412
 
static void change_db_impl(Session &session)
413
 
{
414
 
  session.set_db(string());
415
 
}
 
1402
bool mysql_opt_change_db(THD *thd,
 
1403
                         const LEX_STRING *new_db_name,
 
1404
                         LEX_STRING *saved_db_name,
 
1405
                         bool force_switch,
 
1406
                         bool *cur_db_changed)
 
1407
{
 
1408
  *cur_db_changed= !cmp_db_names(thd->db, new_db_name->str);
 
1409
 
 
1410
  if (!*cur_db_changed)
 
1411
    return false;
 
1412
 
 
1413
  backup_current_db_name(thd, saved_db_name);
 
1414
 
 
1415
  return mysql_change_db(thd, new_db_name, force_switch);
 
1416
}
 
1417
 
416
1418
 
417
1419
/*
418
 
  Check if database name is valid
419
 
 
420
 
  SYNPOSIS
421
 
    check()
422
 
    org_name            Name of database and length
423
 
 
424
 
  RETURN
425
 
    false error
426
 
    true ok
 
1420
  Check if there is directory for the database name.
 
1421
 
 
1422
  SYNOPSIS
 
1423
    check_db_dir_existence()
 
1424
    db_name   database name
 
1425
 
 
1426
  RETURN VALUES
 
1427
    false   There is directory for the specified database name.
 
1428
    true    The directory does not exist.
427
1429
*/
428
1430
 
429
 
bool check(Session &session, identifier::Schema &schema_identifier)
 
1431
bool check_db_dir_existence(const char *db_name)
430
1432
{
431
 
  if (not plugin::Authorization::isAuthorized(*session.user(), schema_identifier))
432
 
  {
433
 
    return false;
434
 
  }
435
 
 
436
 
  return schema_identifier.isValid();
 
1433
  char db_dir_path[FN_REFLEN];
 
1434
  uint32_t db_dir_path_len;
 
1435
 
 
1436
  db_dir_path_len= build_table_filename(db_dir_path, sizeof(db_dir_path),
 
1437
                                        db_name, "", "", 0);
 
1438
 
 
1439
  if (db_dir_path_len && db_dir_path[db_dir_path_len - 1] == FN_LIBCHAR)
 
1440
    db_dir_path[db_dir_path_len - 1]= 0;
 
1441
 
 
1442
  /* Check access. */
 
1443
 
 
1444
  return my_access(db_dir_path, F_OK);
437
1445
}
438
 
 
439
 
} /* namespace schema */
440
 
 
441
 
} /* namespace drizzled */