~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/schema_engine/schema.cc

  • Committer: Monty Taylor
  • Date: 2010-04-15 19:14:53 UTC
  • mto: This revision was merged to the branch mainline in revision 1476.
  • Revision ID: mordred@inaugust.com-20100415191453-ril2x8qdo78fny9w
Replaced test_authz with a plugin implementing a hard-coded simple
multi-tennancy policy. The policy describes:
- A root user exists which can do anything
- A user may only see a schema that is named the same has his user name
- A user may see data_dictionary and information_schema (data_dictionary
  required for show databases to work)

This way, we can more clearly test the results of the authorization
interface while providing an optional plugin that is actually useful to some
human.

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
#include "drizzled/charset.h"
28
28
#include "drizzled/charset_info.h"
29
29
#include "drizzled/cursor.h"
30
 
#include "drizzled/data_home.h"
31
30
 
32
31
#include "drizzled/internal/my_sys.h"
33
32
 
45
44
using namespace std;
46
45
using namespace drizzled;
47
46
 
 
47
static SchemaIdentifier TEMPORARY_IDENTIFIER("TEMPORARY");
48
48
 
49
49
#define MY_DB_OPT_FILE "db.opt"
50
50
#define DEFAULT_FILE_EXTENSION ".dfe" // Deep Fried Elephant
58
58
  schema_cache_filled(false)
59
59
{
60
60
  table_definition_ext= DEFAULT_FILE_EXTENSION;
 
61
  pthread_rwlock_init(&schema_lock, NULL);
 
62
  prime();
61
63
}
62
64
 
63
65
Schema::~Schema()
64
66
{
 
67
  pthread_rwlock_destroy(&schema_lock);
65
68
}
66
69
 
67
70
void Schema::prime()
68
71
{
69
 
  CachedDirectory directory(getDataHomeCatalog().file_string(), CachedDirectory::DIRECTORY);
 
72
  CachedDirectory directory(drizzle_data_home, CachedDirectory::DIRECTORY);
70
73
  CachedDirectory::Entries files= directory.getEntries();
71
74
 
72
 
  mutex.lock();
 
75
  pthread_rwlock_wrlock(&schema_lock);
73
76
 
74
77
  for (CachedDirectory::Entries::iterator fileIter= files.begin();
75
78
       fileIter != files.end(); fileIter++)
77
80
    CachedDirectory::Entry *entry= *fileIter;
78
81
    message::Schema schema_message;
79
82
 
80
 
    if (not entry->filename.compare(GLOBAL_TEMPORARY_EXT))
81
 
      continue;
82
 
 
83
 
    SchemaIdentifier filename(entry->filename);
84
 
    if (readSchemaFile(filename, schema_message))
 
83
    if (readSchemaFile(entry->filename, schema_message))
85
84
    {
86
85
      SchemaIdentifier schema_identifier(schema_message.name());
87
86
 
88
87
      pair<SchemaCache::iterator, bool> ret=
89
 
        schema_cache.insert(make_pair(schema_identifier.getPath(), new message::Schema(schema_message)));
 
88
        schema_cache.insert(make_pair(schema_identifier.getPath(), schema_message));
90
89
 
91
90
      if (ret.second == false)
92
91
     {
94
93
      }
95
94
    }
96
95
  }
97
 
  mutex.unlock();
 
96
  pthread_rwlock_unlock(&schema_lock);
98
97
}
99
98
 
100
 
void Schema::doGetSchemaIdentifiers(SchemaIdentifier::vector &set_of_names)
 
99
void Schema::doGetSchemaIdentifiers(SchemaIdentifierList &set_of_names)
101
100
{
102
 
  mutex.lock_shared();
 
101
  if (not pthread_rwlock_rdlock(&schema_lock))
103
102
  {
104
103
    for (SchemaCache::iterator iter= schema_cache.begin();
105
104
         iter != schema_cache.end();
106
105
         iter++)
107
106
    {
108
 
      set_of_names.push_back(SchemaIdentifier((*iter).second->name()));
 
107
      set_of_names.push_back(SchemaIdentifier((*iter).second.name()));
109
108
    }
110
 
  }
111
 
  mutex.unlock_shared();
 
109
    pthread_rwlock_unlock(&schema_lock);
 
110
 
 
111
    return;
 
112
  }
 
113
 
 
114
  // If for some reason getting a lock should fail, we resort to disk
 
115
 
 
116
  CachedDirectory directory(drizzle_data_home, CachedDirectory::DIRECTORY);
 
117
 
 
118
  CachedDirectory::Entries files= directory.getEntries();
 
119
 
 
120
  for (CachedDirectory::Entries::iterator fileIter= files.begin();
 
121
       fileIter != files.end(); fileIter++)
 
122
  {
 
123
    CachedDirectory::Entry *entry= *fileIter;
 
124
    set_of_names.push_back(entry->filename);
 
125
  }
112
126
}
113
127
 
114
 
bool Schema::doGetSchemaDefinition(const SchemaIdentifier &schema_identifier, message::schema::shared_ptr &schema_message)
 
128
bool Schema::doGetSchemaDefinition(SchemaIdentifier &schema_identifier, message::Schema &schema_message)
115
129
{
116
 
  mutex.lock_shared();
117
 
  SchemaCache::iterator iter= schema_cache.find(schema_identifier.getPath());
118
 
 
119
 
  if (iter != schema_cache.end())
 
130
  if (not pthread_rwlock_rdlock(&schema_lock))
120
131
  {
121
 
    schema_message= (*iter).second;
122
 
    mutex.unlock_shared();
123
 
    return true;
 
132
    SchemaCache::iterator iter= schema_cache.find(schema_identifier.getPath());
 
133
 
 
134
    if (iter != schema_cache.end())
 
135
    {
 
136
      schema_message.CopyFrom(((*iter).second));
 
137
      pthread_rwlock_unlock(&schema_lock);
 
138
      return true;
 
139
    }
 
140
    pthread_rwlock_unlock(&schema_lock);
 
141
 
 
142
    return false;
124
143
  }
125
 
  mutex.unlock_shared();
126
144
 
127
 
  return false;
 
145
  // Fail to disk based means
 
146
  return readSchemaFile(schema_identifier.getPath(), schema_message);
128
147
}
129
148
 
130
 
 
131
149
bool Schema::doCreateSchema(const drizzled::message::Schema &schema_message)
132
150
{
133
151
  SchemaIdentifier schema_identifier(schema_message.name());
142
160
    return false;
143
161
  }
144
162
 
145
 
  mutex.lock();
 
163
  if (not pthread_rwlock_wrlock(&schema_lock))
146
164
  {
147
 
    pair<SchemaCache::iterator, bool> ret=
148
 
      schema_cache.insert(make_pair(schema_identifier.getPath(), new message::Schema(schema_message)));
149
 
 
150
 
 
151
 
    if (ret.second == false)
152
 
    {
153
 
      abort(); // If this has happened, something really bad is going down.
154
 
    }
 
165
      pair<SchemaCache::iterator, bool> ret=
 
166
        schema_cache.insert(make_pair(schema_identifier.getPath(), schema_message));
 
167
 
 
168
 
 
169
      if (ret.second == false)
 
170
      {
 
171
        abort(); // If this has happened, something really bad is going down.
 
172
      }
 
173
    pthread_rwlock_unlock(&schema_lock);
155
174
  }
156
 
  mutex.unlock();
157
175
 
158
176
  return true;
159
177
}
160
178
 
161
 
bool Schema::doDropSchema(const SchemaIdentifier &schema_identifier)
 
179
bool Schema::doDropSchema(SchemaIdentifier &schema_identifier)
162
180
{
163
 
  message::schema::shared_ptr schema_message;
 
181
  message::Schema schema_message;
164
182
 
165
183
  string schema_file(schema_identifier.getPath());
166
184
  schema_file.append(1, FN_LIBCHAR);
191
209
    cerr << dir;
192
210
  }
193
211
 
194
 
  mutex.lock();
195
 
  schema_cache.erase(schema_identifier.getPath());
196
 
  mutex.unlock();
 
212
  if (not pthread_rwlock_wrlock(&schema_lock))
 
213
  {
 
214
    schema_cache.erase(schema_identifier.getPath());
 
215
    pthread_rwlock_unlock(&schema_lock);
 
216
  }
197
217
 
198
218
  return true;
199
219
}
207
227
 
208
228
  if (writeSchemaFile(schema_identifier, schema_message))
209
229
  {
210
 
    mutex.lock();
 
230
    if (not pthread_rwlock_wrlock(&schema_lock))
211
231
    {
212
232
      schema_cache.erase(schema_identifier.getPath());
213
233
 
214
234
      pair<SchemaCache::iterator, bool> ret=
215
 
        schema_cache.insert(make_pair(schema_identifier.getPath(), new message::Schema(schema_message)));
 
235
        schema_cache.insert(make_pair(schema_identifier.getPath(), schema_message));
216
236
 
217
237
      if (ret.second == false)
218
238
      {
219
239
        abort(); // If this has happened, something really bad is going down.
220
240
      }
221
 
    }
222
 
    mutex.unlock();
 
241
 
 
242
      pthread_rwlock_unlock(&schema_lock);
 
243
    }
 
244
    else
 
245
    {
 
246
      abort(); // This would leave us out of sync, suck.
 
247
    }
223
248
  }
224
249
 
225
250
  return true;
230
255
 
231
256
  @note we do the rename to make it crash safe.
232
257
*/
233
 
bool Schema::writeSchemaFile(const SchemaIdentifier &schema_identifier, const message::Schema &db)
 
258
bool Schema::writeSchemaFile(SchemaIdentifier &schema_identifier, const message::Schema &db)
234
259
{
235
260
  char schema_file_tmp[FN_REFLEN];
236
261
  string schema_file(schema_identifier.getPath());
250
275
    return false;
251
276
  }
252
277
 
253
 
  bool success;
254
 
 
255
 
  try {
256
 
    success= db.SerializeToFileDescriptor(fd);
257
 
  }
258
 
  catch (...)
259
 
  {
260
 
    success= false;
261
 
  }
262
 
 
263
 
  if (not success)
264
 
  {
265
 
    my_error(ER_CORRUPT_SCHEMA_DEFINITION, MYF(0), schema_file.c_str(),
266
 
             db.InitializationErrorString().empty() ? "unknown" :  db.InitializationErrorString().c_str());
 
278
  if (not db.SerializeToFileDescriptor(fd))
 
279
  {
 
280
    my_error(ER_CORRUPT_TABLE_DEFINITION, MYF(0),
 
281
             db.InitializationErrorString().c_str());
267
282
 
268
283
    if (close(fd) == -1)
269
284
      perror(schema_file_tmp);
296
311
}
297
312
 
298
313
 
299
 
bool Schema::readSchemaFile(const drizzled::SchemaIdentifier &schema_identifier, drizzled::message::Schema &schema)
 
314
bool Schema::readSchemaFile(const std::string &schema_file_name, drizzled::message::Schema &schema_message)
300
315
{
301
 
  string db_opt_path(schema_identifier.getPath());
 
316
  string db_opt_path(schema_file_name);
302
317
 
303
318
  /*
304
319
    Pass an empty file name, and the database options file name as extension
316
331
  */
317
332
  if (input.good())
318
333
  {
319
 
    if (schema.ParseFromIstream(&input))
 
334
    if (schema_message.ParseFromIstream(&input))
320
335
    {
321
336
      return true;
322
337
    }
323
338
 
324
 
    my_error(ER_CORRUPT_SCHEMA_DEFINITION, MYF(0), db_opt_path.c_str(),
325
 
             schema.InitializationErrorString().empty() ? "unknown" :  schema.InitializationErrorString().c_str());
 
339
    my_error(ER_CORRUPT_TABLE_DEFINITION, MYF(0),
 
340
             schema_message.InitializationErrorString().c_str());
326
341
  }
327
342
  else
328
343
  {
332
347
  return false;
333
348
}
334
349
 
 
350
bool Schema::doCanCreateTable(drizzled::TableIdentifier &identifier)
 
351
{
 
352
  if (static_cast<SchemaIdentifier&>(identifier) == TEMPORARY_IDENTIFIER)
 
353
  {
 
354
    return false;
 
355
  }
 
356
 
 
357
  return true;
 
358
}
 
359
 
335
360
void Schema::doGetTableIdentifiers(drizzled::CachedDirectory&,
336
 
                                   const drizzled::SchemaIdentifier&,
337
 
                                   drizzled::TableIdentifier::vector&)
 
361
                                   drizzled::SchemaIdentifier&,
 
362
                                   drizzled::TableIdentifiers&)
338
363
{
339
364
}