64
typedef hash_map<std::string, plugin::StorageEngine *> EngineMap;
65
typedef std::vector<plugin::StorageEngine *> EngineVector;
67
67
static EngineVector vector_of_engines;
68
static EngineVector vector_of_transactional_engines;
68
static EngineVector vector_of_schema_engines;
70
const std::string plugin::UNKNOWN_STRING("UNKNOWN");
71
const std::string plugin::DEFAULT_DEFINITION_FILE_EXT(".dfe");
70
const std::string UNKNOWN_STRING("UNKNOWN");
71
const std::string DEFAULT_DEFINITION_FILE_EXT(".dfe");
73
73
static std::set<std::string> set_of_table_definition_ext;
75
plugin::StorageEngine::StorageEngine(const string name_arg,
76
const bitset<HTON_BIT_SIZE> &flags_arg,
75
StorageEngine::StorageEngine(const string name_arg,
76
const bitset<HTON_BIT_SIZE> &flags_arg)
78
77
: Plugin(name_arg, "StorageEngine"),
79
two_phase_commit(support_2pc),
78
MonitoredInTransaction(), /* This gives the storage engine a "slot" or ID */
90
81
pthread_mutex_init(&proto_cache_mutex, NULL);
94
plugin::StorageEngine::~StorageEngine()
84
StorageEngine::~StorageEngine()
96
86
pthread_mutex_destroy(&proto_cache_mutex);
99
void plugin::StorageEngine::setTransactionReadWrite(Session& session)
89
void StorageEngine::setTransactionReadWrite(Session& session)
101
Ha_trx_info *ha_info= session.getEngineInfo(this);
104
When a storage engine method is called, the transaction must
105
have been started, unless it's a DDL call, for which the
106
storage engine starts the transaction internally, and commits
107
it internally, without registering in the ha_list.
108
Unfortunately here we can't know for sure if the engine
109
has registered the transaction or not, so we must check.
111
if (ha_info->is_started())
114
* table_share can be NULL in plugin::StorageEngine::dropTable().
116
ha_info->set_trx_read_write();
91
TransactionContext &statement_ctx= session.transaction.stmt;
92
statement_ctx.markModifiedNonTransData();
122
int plugin::StorageEngine::doRenameTable(Session *,
95
int StorageEngine::doRenameTable(Session *,
127
100
for (const char **ext= bas_ext(); *ext ; ext++)
308
281
don't bother to rollback here, it's done already
310
void plugin::StorageEngine::closeConnection(Session* session)
283
void StorageEngine::closeConnection(Session* session)
312
285
for_each(vector_of_engines.begin(), vector_of_engines.end(),
313
286
StorageEngineCloseConnection(session));
316
void plugin::StorageEngine::dropDatabase(char* path)
318
for_each(vector_of_engines.begin(), vector_of_engines.end(),
319
bind2nd(mem_fun(&plugin::StorageEngine::drop_database),path));
322
int plugin::StorageEngine::commitOrRollbackByXID(XID *xid, bool commit)
327
transform(vector_of_engines.begin(), vector_of_engines.end(), results.begin(),
328
bind2nd(mem_fun(&plugin::StorageEngine::commit_by_xid),xid));
330
transform(vector_of_engines.begin(), vector_of_engines.end(), results.begin(),
331
bind2nd(mem_fun(&plugin::StorageEngine::rollback_by_xid),xid));
333
if (find_if(results.begin(), results.end(), bind2nd(equal_to<int>(),0))
341
This function should be called when MySQL sends rows of a SELECT result set
342
or the EOF mark to the client. It releases a possible adaptive hash index
343
S-latch held by session in InnoDB and also releases a possible InnoDB query
344
FIFO ticket to enter InnoDB. To save CPU time, InnoDB allows a session to
345
keep them over several calls of the InnoDB Cursor interface when a join
346
is executed. But when we let the control to pass to the client they have
347
to be released because if the application program uses mysql_use_result(),
348
it may deadlock on the S-latch if the application on another connection
349
performs another SQL query. In MySQL-4.1 this is even more important because
350
there a connection can have several SELECT queries open at the same time.
352
@param session the thread handle of the current connection
357
int plugin::StorageEngine::releaseTemporaryLatches(Session *session)
359
for_each(vector_of_transactional_engines.begin(), vector_of_transactional_engines.end(),
360
bind2nd(mem_fun(&plugin::StorageEngine::release_temporary_latches),session));
364
bool plugin::StorageEngine::flushLogs(plugin::StorageEngine *engine)
289
bool StorageEngine::flushLogs(StorageEngine *engine)
366
291
if (engine == NULL)
368
293
if (find_if(vector_of_engines.begin(), vector_of_engines.end(),
369
mem_fun(&plugin::StorageEngine::flush_logs))
370
!= vector_of_engines.begin())
294
mem_fun(&StorageEngine::flush_logs))
295
!= vector_of_engines.begin())
375
if ((!engine->is_enabled()) ||
376
(engine->flush_logs()))
300
if (engine->flush_logs())
383
recover() step of xa.
386
there are three modes of operation:
387
- automatic recover after a crash
388
in this case commit_list != 0, tc_heuristic_recover==0
389
all xids from commit_list are committed, others are rolled back
390
- manual (heuristic) recover
391
in this case commit_list==0, tc_heuristic_recover != 0
392
DBA has explicitly specified that all prepared transactions should
393
be committed (or rolled back).
394
- no recovery (MySQL did not detect a crash)
395
in this case commit_list==0, tc_heuristic_recover == 0
396
there should be no prepared transactions in this case.
398
class XARecover : unary_function<plugin::StorageEngine *, void>
400
int trans_len, found_foreign_xids, found_my_xids;
406
XARecover(XID *trans_list_arg, int trans_len_arg,
407
HASH *commit_list_arg, bool dry_run_arg)
408
: trans_len(trans_len_arg), found_foreign_xids(0), found_my_xids(0),
410
trans_list(trans_list_arg), commit_list(commit_list_arg),
416
return found_foreign_xids;
421
return found_my_xids;
424
result_type operator() (argument_type engine)
429
if (engine->is_enabled())
431
while ((got= engine->recover(trans_list, trans_len)) > 0 )
433
errmsg_printf(ERRMSG_LVL_INFO,
434
_("Found %d prepared transaction(s) in %s"),
435
got, engine->getName().c_str());
436
for (int i=0; i < got; i ++)
438
my_xid x=trans_list[i].get_my_xid();
439
if (!x) // not "mine" - that is generated by external TM
441
xid_cache_insert(trans_list+i, XA_PREPARED);
442
found_foreign_xids++;
452
hash_search(commit_list, (unsigned char *)&x, sizeof(x)) != 0 :
453
tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT)
455
engine->commit_by_xid(trans_list+i);
459
engine->rollback_by_xid(trans_list+i);
469
int plugin::StorageEngine::recover(HASH *commit_list)
471
XID *trans_list= NULL;
474
bool dry_run= (commit_list==0 && tc_heuristic_recover==0);
476
/* commit_list and tc_heuristic_recover cannot be set both */
477
assert(commit_list==0 || tc_heuristic_recover==0);
479
/* if either is set, total_ha_2pc must be set too */
480
if (total_ha_2pc <= 1)
484
#ifndef WILL_BE_DELETED_LATER
487
for now, only InnoDB supports 2pc. It means we can always safely
488
rollback all pending transactions, without risking inconsistent data
491
assert(total_ha_2pc == 2); // only InnoDB and binlog
492
tc_heuristic_recover= TC_HEURISTIC_RECOVER_ROLLBACK; // forcing ROLLBACK
495
for (trans_len= MAX_XID_LIST_SIZE ;
496
trans_list==0 && trans_len > MIN_XID_LIST_SIZE; trans_len/=2)
498
trans_list=(XID *)malloc(trans_len*sizeof(XID));
502
errmsg_printf(ERRMSG_LVL_ERROR, ER(ER_OUTOFMEMORY), trans_len*sizeof(XID));
507
errmsg_printf(ERRMSG_LVL_INFO, _("Starting crash recovery..."));
510
XARecover recover_func(trans_list, trans_len, commit_list, dry_run);
511
for_each(vector_of_transactional_engines.begin(), vector_of_transactional_engines.end(),
515
if (recover_func.getForeignXIDs())
516
errmsg_printf(ERRMSG_LVL_WARN,
517
_("Found %d prepared XA transactions"),
518
recover_func.getForeignXIDs());
519
if (dry_run && recover_func.getMyXIDs())
521
errmsg_printf(ERRMSG_LVL_ERROR,
522
_("Found %d prepared transactions! It means that drizzled "
523
"was not shut down properly last time and critical "
524
"recovery information (last binlog or %s file) was "
525
"manually deleted after a crash. You have to start "
526
"drizzled with the --tc-heuristic-recover switch to "
527
"commit or rollback pending transactions."),
528
recover_func.getMyXIDs(), opt_tc_log_file);
532
errmsg_printf(ERRMSG_LVL_INFO, _("Crash recovery finished."));
536
int plugin::StorageEngine::startConsistentSnapshot(Session *session)
538
for_each(vector_of_engines.begin(), vector_of_engines.end(),
539
bind2nd(mem_fun(&plugin::StorageEngine::start_consistent_snapshot),
544
class StorageEngineGetTableDefinition: public unary_function<plugin::StorageEngine *,bool>
306
class StorageEngineGetTableDefinition: public unary_function<StorageEngine *,bool>
546
308
Session& session;
547
309
const char* path;
549
311
const char *table_name;
550
312
const bool is_tmp;
551
message::Table *table_proto;
313
message::Table *table_message;
610
360
to ask engine if there are any new tables that should be written to disk
611
361
or any dropped tables that need to be removed from disk
613
int plugin::StorageEngine::getTableDefinition(Session& session,
614
TableIdentifier &identifier,
615
message::Table *table_proto)
363
int StorageEngine::getTableDefinition(Session& session,
364
TableIdentifier &identifier,
365
message::Table *table_message,
366
bool include_temporary_tables)
617
368
return getTableDefinition(session,
618
369
identifier.getPath(), identifier.getDBName(), identifier.getTableName(), identifier.isTmp(),
370
table_message, include_temporary_tables);
622
int plugin::StorageEngine::getTableDefinition(Session& session,
373
int StorageEngine::getTableDefinition(Session& session,
623
374
const char* path,
375
const char *schema_name,
376
const char *table_name,
627
message::Table *table_proto)
378
message::Table *table_message,
379
bool include_temporary_tables)
631
vector<plugin::StorageEngine *>::iterator iter=
383
if (include_temporary_tables)
385
if (session.doGetTableDefinition(path, schema_name, table_name, false, table_message) == EEXIST)
389
EngineVector::iterator iter=
632
390
find_if(vector_of_engines.begin(), vector_of_engines.end(),
633
StorageEngineGetTableDefinition(session, path, NULL, NULL, true, table_proto, &err));
391
StorageEngineGetTableDefinition(session, path, NULL, NULL, true, table_message, &err));
635
393
if (iter == vector_of_engines.end())
637
string proto_path(path);
638
string file_ext(".dfe");
639
proto_path.append(file_ext);
641
int error= access(proto_path.c_str(), F_OK);
650
int read_proto_err= drizzle_read_table_proto(proto_path.c_str(),
784
515
@todo refactor to remove goto
786
int plugin::StorageEngine::createTable(Session& session,
787
TableIdentifier &identifier,
788
bool update_create_info,
789
message::Table& table_proto, bool proto_used)
517
int StorageEngine::createTable(Session& session,
518
TableIdentifier &identifier,
519
bool update_create_info,
520
message::Table& table_message)
793
524
TableShare share(identifier.getDBName(), 0, identifier.getTableName(), identifier.getPath());
794
525
message::Table tmp_proto;
798
if (parse_table_proto(session, table_proto, &share))
803
if (open_table_def(session, &share))
527
if (parse_table_proto(session, table_message, &share))
807
530
if (open_table_from_share(&session, &share, "", 0, 0,
811
534
if (update_create_info)
812
table.updateCreateInfo(&table_proto);
535
table.updateCreateInfo(&table_message);
814
537
/* Check for legal operations against the Engine using the proto (if used) */
817
if (table_proto.type() == message::Table::TEMPORARY &&
818
share.storage_engine->check_flag(HTON_BIT_TEMPORARY_NOT_SUPPORTED) == true)
820
error= HA_ERR_UNSUPPORTED;
823
else if (table_proto.type() != message::Table::TEMPORARY &&
824
share.storage_engine->check_flag(HTON_BIT_TEMPORARY_ONLY) == true)
826
error= HA_ERR_UNSUPPORTED;
831
if (! share.storage_engine->is_enabled())
833
error= HA_ERR_UNSUPPORTED;
538
if (table_message.type() == message::Table::TEMPORARY &&
539
share.storage_engine->check_flag(HTON_BIT_TEMPORARY_NOT_SUPPORTED) == true)
541
error= HA_ERR_UNSUPPORTED;
544
else if (table_message.type() != message::Table::TEMPORARY &&
545
share.storage_engine->check_flag(HTON_BIT_TEMPORARY_ONLY) == true)
547
error= HA_ERR_UNSUPPORTED;
839
552
char name_buff[FN_REFLEN];
842
555
table_name_arg= share.storage_engine->checkLowercaseNames(identifier.getPath(), name_buff);
557
if (not share.storage_engine->check_flag(HTON_BIT_HAS_DATA_DICTIONARY))
559
int protoerr= StorageEngine::writeDefinitionFromPath(identifier, table_message);
844
568
share.storage_engine->setTransactionReadWrite(session);
846
570
error= share.storage_engine->doCreateTable(&session,
579
if (not share.storage_engine->check_flag(HTON_BIT_HAS_DATA_DICTIONARY))
580
plugin::StorageEngine::deleteDefinitionFromPath(identifier);
582
my_error(ER_CANT_CREATE_TABLE, MYF(ME_BELL+ME_WAITTANG), identifier.getSQLPath().c_str(), error);
853
585
table.closefrm(false);
857
char name_buff[FN_REFLEN];
858
sprintf(name_buff,"%s.%s", identifier.getDBName(), identifier.getTableName());
859
my_error(ER_CANT_CREATE_TABLE, MYF(ME_BELL+ME_WAITTANG), name_buff, error);
862
588
share.free_table_share();
863
589
return(error != 0);
866
Cursor *plugin::StorageEngine::getCursor(TableShare &share, memory::Root *alloc)
592
Cursor *StorageEngine::getCursor(TableShare &share, memory::Root *alloc)
869
594
return create(share, alloc);
873
598
TODO -> Remove this to force all engines to implement their own file. Solves the "we only looked at dfe" problem.
875
void plugin::StorageEngine::doGetTableNames(CachedDirectory &directory, string&, set<string>& set_of_names)
877
CachedDirectory::Entries entries= directory.getEntries();
879
for (CachedDirectory::Entries::iterator entry_iter= entries.begin();
880
entry_iter != entries.end(); ++entry_iter)
882
CachedDirectory::Entry *entry= *entry_iter;
883
const string *filename= &entry->filename;
885
assert(filename->size());
887
const char *ext= strchr(filename->c_str(), '.');
889
if (ext == NULL || my_strcasecmp(system_charset_info, ext, DEFAULT_DEFINITION_FILE_EXT.c_str()) ||
890
(filename->compare(0, strlen(TMP_FILE_PREFIX), TMP_FILE_PREFIX) == 0))
894
char uname[NAME_LEN + 1];
895
uint32_t file_name_len;
897
file_name_len= filename_to_tablename(filename->c_str(), uname, sizeof(uname));
898
// TODO: Remove need for memory copy here
899
uname[file_name_len - sizeof(".dfe") + 1]= '\0'; // Subtract ending, place NULL
900
set_of_names.insert(uname);
600
void StorageEngine::doGetTableNames(CachedDirectory&, string&, set<string>&)
905
603
class AddTableName :
906
public unary_function<plugin::StorageEngine *, void>
604
public unary_function<StorageEngine *, void>
909
607
CachedDirectory& directory;
910
set<string>& set_of_names;
608
TableNameList &set_of_names;
914
AddTableName(CachedDirectory& directory_arg, string& database_name, set<string>& of_names) :
612
AddTableName(CachedDirectory& directory_arg, const string& database_name, set<string>& of_names) :
915
613
directory(directory_arg),
916
614
set_of_names(of_names)
927
void plugin::StorageEngine::getTableNames(string& db, set<string>& set_of_names)
625
class AddSchemaNames :
626
public unary_function<StorageEngine *, void>
628
SchemaNameList &set_of_names;
632
AddSchemaNames(set<string>& of_names) :
633
set_of_names(of_names)
637
result_type operator() (argument_type engine)
639
engine->doGetSchemaNames(set_of_names);
643
void StorageEngine::getSchemaNames(SchemaNameList &set_of_names)
645
// Add hook here for engines to register schema.
646
for_each(vector_of_schema_engines.begin(), vector_of_schema_engines.end(),
647
AddSchemaNames(set_of_names));
649
plugin::Authorization::pruneSchemaNames(current_session->getSecurityContext(),
653
class StorageEngineGetSchemaDefinition: public unary_function<StorageEngine *, bool>
655
const std::string &schema_name;
656
message::Schema &schema_proto;
659
StorageEngineGetSchemaDefinition(const std::string &schema_name_arg,
660
message::Schema &schema_proto_arg) :
661
schema_name(schema_name_arg),
662
schema_proto(schema_proto_arg)
665
result_type operator() (argument_type engine)
667
return engine->doGetSchemaDefinition(schema_name, schema_proto);
672
Return value is "if parsed"
674
bool StorageEngine::getSchemaDefinition(const std::string &schema_name, message::Schema &proto)
678
EngineVector::iterator iter=
679
find_if(vector_of_schema_engines.begin(), vector_of_schema_engines.end(),
680
StorageEngineGetSchemaDefinition(schema_name, proto));
682
if (iter != vector_of_schema_engines.end())
690
bool StorageEngine::doesSchemaExist(const std::string &schema_name)
692
message::Schema proto;
694
return StorageEngine::getSchemaDefinition(schema_name, proto);
698
const CHARSET_INFO *StorageEngine::getSchemaCollation(const std::string &schema_name)
700
message::Schema schmema_proto;
703
found= StorageEngine::getSchemaDefinition(schema_name, schmema_proto);
705
if (found && schmema_proto.has_collation())
707
const string buffer= schmema_proto.collation();
708
const CHARSET_INFO* cs= get_charset_by_name(buffer.c_str());
712
errmsg_printf(ERRMSG_LVL_ERROR,
713
_("Error while loading database options: '%s':"), schema_name.c_str());
714
errmsg_printf(ERRMSG_LVL_ERROR, ER(ER_UNKNOWN_COLLATION), buffer.c_str());
716
return default_charset_info;
722
return default_charset_info;
726
public unary_function<StorageEngine *, void>
728
const drizzled::message::Schema &schema_message;
732
CreateSchema(const drizzled::message::Schema &arg) :
737
result_type operator() (argument_type engine)
739
// @todo someday check that at least one engine said "true"
740
(void)engine->doCreateSchema(schema_message);
744
bool StorageEngine::createSchema(const drizzled::message::Schema &schema_message)
746
// Add hook here for engines to register schema.
747
for_each(vector_of_schema_engines.begin(), vector_of_schema_engines.end(),
748
CreateSchema(schema_message));
754
public unary_function<StorageEngine *, void>
756
uint64_t &success_count;
757
const string &schema_name;
761
DropSchema(const string &arg, uint64_t &count_arg) :
762
success_count(count_arg),
767
result_type operator() (argument_type engine)
769
// @todo someday check that at least one engine said "true"
770
bool success= engine->doDropSchema(schema_name);
777
bool StorageEngine::dropSchema(const string &schema_name)
780
// Add hook here for engines to register schema.
781
for_each(vector_of_schema_engines.begin(), vector_of_schema_engines.end(),
782
DropSchema(schema_name, counter));
784
return counter ? true : false;
788
public unary_function<StorageEngine *, void>
790
uint64_t &success_count;
791
const drizzled::message::Schema &schema_message;
795
AlterSchema(const drizzled::message::Schema &arg, uint64_t &count_arg) :
796
success_count(count_arg),
801
result_type operator() (argument_type engine)
803
// @todo someday check that at least one engine said "true"
804
bool success= engine->doAlterSchema(schema_message);
811
bool StorageEngine::alterSchema(const drizzled::message::Schema &schema_message)
813
uint64_t success_count= 0;
815
for_each(vector_of_schema_engines.begin(), vector_of_schema_engines.end(),
816
AlterSchema(schema_message, success_count));
818
return success_count ? true : false;
822
void StorageEngine::getTableNames(const string &schema_name, TableNameList &set_of_names)
929
824
char tmp_path[FN_REFLEN];
931
build_table_filename(tmp_path, sizeof(tmp_path), db.c_str(), "", false);
826
build_table_filename(tmp_path, sizeof(tmp_path), schema_name.c_str(), "", false);
933
828
CachedDirectory directory(tmp_path, set_of_table_definition_ext);
935
if (db.compare("information_schema"))
830
if (not schema_name.compare("information_schema"))
832
else if (not schema_name.compare("data_dictionary"))
937
836
if (directory.fail())
939
838
errno= directory.getError();
940
839
if (errno == ENOENT)
941
my_error(ER_BAD_DB_ERROR, MYF(ME_BELL+ME_WAITTANG), db.c_str());
840
my_error(ER_BAD_DB_ERROR, MYF(ME_BELL+ME_WAITTANG), schema_name.c_str());
943
842
my_error(ER_CANT_READ_DIR, MYF(ME_BELL+ME_WAITTANG), directory.getPath(), errno);