1
/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2009 Sun Microsystems
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; version 2 of the License.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20
#include "drizzled/global.h"
26
#include <google/protobuf/io/zero_copy_stream.h>
27
#include <google/protobuf/io/zero_copy_stream_impl.h>
29
#include "mysys/my_dir.h"
30
#include "mysys/hash.h"
32
#include "drizzled/service/storage_engine.h"
33
#include "drizzled/plugin/storage_engine.h"
34
#include "drizzled/gettext.h"
35
#include "drizzled/xid.h"
36
#include "drizzled/errmsg_print.h"
37
#include "drizzled/plugin/registry.h"
38
#include "drizzled/session.h"
45
StorageEngine::StorageEngine() : all_engines() {}
46
StorageEngine::~StorageEngine() {}
48
void StorageEngine::add(plugin::StorageEngine *engine)
50
all_engines.add(engine);
53
void StorageEngine::remove(plugin::StorageEngine *engine)
55
all_engines.remove(engine);
58
plugin::StorageEngine *StorageEngine::findByName(Session *session,
62
transform(find_str.begin(), find_str.end(),
63
find_str.begin(), ::tolower);
64
string default_str("default");
65
if (find_str == default_str)
66
return ha_default_storage_engine(session);
68
plugin::StorageEngine *engine= all_engines.find(find_str);
70
if (engine && engine->is_user_selectable())
76
class StorageEngineCloseConnection
77
: public unary_function<plugin::StorageEngine *, void>
81
StorageEngineCloseConnection(Session *session_arg) : session(session_arg) {}
83
there's no need to rollback here as all transactions must
84
be rolled back already
86
inline result_type operator() (argument_type engine)
88
if (engine->is_enabled() &&
89
session_get_ha_data(session, engine))
90
engine->close_connection(session);
96
don't bother to rollback here, it's done already
98
void StorageEngine::closeConnection(Session* session)
100
for_each(all_engines.begin(), all_engines.end(),
101
StorageEngineCloseConnection(session));
104
void StorageEngine::dropDatabase(char* path)
106
for_each(all_engines.begin(), all_engines.end(),
107
bind2nd(mem_fun(&plugin::StorageEngine::drop_database),path));
110
int StorageEngine::commitOrRollbackByXID(XID *xid, bool commit)
115
transform(all_engines.begin(), all_engines.end(), results.begin(),
116
bind2nd(mem_fun(&plugin::StorageEngine::commit_by_xid),xid));
118
transform(all_engines.begin(), all_engines.end(), results.begin(),
119
bind2nd(mem_fun(&plugin::StorageEngine::rollback_by_xid),xid));
121
if (find_if(results.begin(), results.end(), bind2nd(equal_to<int>(),0))
129
This function should be called when MySQL sends rows of a SELECT result set
130
or the EOF mark to the client. It releases a possible adaptive hash index
131
S-latch held by session in InnoDB and also releases a possible InnoDB query
132
FIFO ticket to enter InnoDB. To save CPU time, InnoDB allows a session to
133
keep them over several calls of the InnoDB handler interface when a join
134
is executed. But when we let the control to pass to the client they have
135
to be released because if the application program uses mysql_use_result(),
136
it may deadlock on the S-latch if the application on another connection
137
performs another SQL query. In MySQL-4.1 this is even more important because
138
there a connection can have several SELECT queries open at the same time.
140
@param session the thread handle of the current connection
145
int StorageEngine::releaseTemporaryLatches(Session *session)
147
for_each(all_engines.begin(), all_engines.end(),
148
bind2nd(mem_fun(&plugin::StorageEngine::release_temporary_latches),session));
152
bool StorageEngine::flushLogs(plugin::StorageEngine *engine)
156
if (find_if(all_engines.begin(), all_engines.end(),
157
mem_fun(&plugin::StorageEngine::flush_logs))
158
!= all_engines.begin())
163
if ((!engine->is_enabled()) ||
164
(engine->flush_logs()))
171
recover() step of xa.
174
there are three modes of operation:
175
- automatic recover after a crash
176
in this case commit_list != 0, tc_heuristic_recover==0
177
all xids from commit_list are committed, others are rolled back
178
- manual (heuristic) recover
179
in this case commit_list==0, tc_heuristic_recover != 0
180
DBA has explicitly specified that all prepared transactions should
181
be committed (or rolled back).
182
- no recovery (MySQL did not detect a crash)
183
in this case commit_list==0, tc_heuristic_recover == 0
184
there should be no prepared transactions in this case.
186
class XARecover : unary_function<plugin::StorageEngine *, void>
188
int trans_len, found_foreign_xids, found_my_xids;
194
XARecover(XID *trans_list_arg, int trans_len_arg,
195
HASH *commit_list_arg, bool dry_run_arg)
196
: trans_len(trans_len_arg), found_foreign_xids(0), found_my_xids(0),
198
trans_list(trans_list_arg), commit_list(commit_list_arg),
204
return found_foreign_xids;
209
return found_my_xids;
212
result_type operator() (argument_type engine)
217
if (engine->is_enabled())
219
while ((got= engine->recover(trans_list, trans_len)) > 0 )
221
errmsg_printf(ERRMSG_LVL_INFO,
222
_("Found %d prepared transaction(s) in %s"),
223
got, engine->getName().c_str());
224
for (int i=0; i < got; i ++)
226
my_xid x=trans_list[i].get_my_xid();
227
if (!x) // not "mine" - that is generated by external TM
229
xid_cache_insert(trans_list+i, XA_PREPARED);
230
found_foreign_xids++;
240
hash_search(commit_list, (unsigned char *)&x, sizeof(x)) != 0 :
241
tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT)
243
engine->commit_by_xid(trans_list+i);
247
engine->rollback_by_xid(trans_list+i);
257
int StorageEngine::recover(HASH *commit_list)
259
XID *trans_list= NULL;
262
bool dry_run= (commit_list==0 && tc_heuristic_recover==0);
264
/* commit_list and tc_heuristic_recover cannot be set both */
265
assert(commit_list==0 || tc_heuristic_recover==0);
267
/* if either is set, total_ha_2pc must be set too */
268
if (total_ha_2pc <= 1)
272
#ifndef WILL_BE_DELETED_LATER
275
for now, only InnoDB supports 2pc. It means we can always safely
276
rollback all pending transactions, without risking inconsistent data
279
assert(total_ha_2pc == 2); // only InnoDB and binlog
280
tc_heuristic_recover= TC_HEURISTIC_RECOVER_ROLLBACK; // forcing ROLLBACK
283
for (trans_len= MAX_XID_LIST_SIZE ;
284
trans_list==0 && trans_len > MIN_XID_LIST_SIZE; trans_len/=2)
286
trans_list=(XID *)malloc(trans_len*sizeof(XID));
290
errmsg_printf(ERRMSG_LVL_ERROR, ER(ER_OUTOFMEMORY), trans_len*sizeof(XID));
295
errmsg_printf(ERRMSG_LVL_INFO, _("Starting crash recovery..."));
298
XARecover recover_func(trans_list, trans_len, commit_list, dry_run);
299
for_each(all_engines.begin(), all_engines.end(), recover_func);
302
if (recover_func.getForeignXIDs())
303
errmsg_printf(ERRMSG_LVL_WARN,
304
_("Found %d prepared XA transactions"),
305
recover_func.getForeignXIDs());
306
if (dry_run && recover_func.getMyXIDs())
308
errmsg_printf(ERRMSG_LVL_ERROR,
309
_("Found %d prepared transactions! It means that drizzled "
310
"was not shut down properly last time and critical "
311
"recovery information (last binlog or %s file) was "
312
"manually deleted after a crash. You have to start "
313
"drizzled with the --tc-heuristic-recover switch to "
314
"commit or rollback pending transactions."),
315
recover_func.getMyXIDs(), opt_tc_log_file);
319
errmsg_printf(ERRMSG_LVL_INFO, _("Crash recovery finished."));
323
int StorageEngine::startConsistentSnapshot(Session *session)
325
for_each(all_engines.begin(), all_engines.end(),
326
bind2nd(mem_fun(&plugin::StorageEngine::start_consistent_snapshot),
331
class StorageEngineGetTableProto: public unary_function<plugin::StorageEngine *,bool>
334
message::Table *table_proto;
337
StorageEngineGetTableProto(const char* path_arg,
338
message::Table *table_proto_arg,
340
:path(path_arg), table_proto(table_proto_arg), err(err_arg) {}
342
result_type operator() (argument_type engine)
344
int ret= engine->getTableProtoImplementation(path, table_proto);
349
return *err == EEXIST;
353
static int drizzle_read_table_proto(const char* path, message::Table* table)
355
int fd= open(path, O_RDONLY);
360
google::protobuf::io::ZeroCopyInputStream* input=
361
new google::protobuf::io::FileInputStream(fd);
363
if (table->ParseFromZeroCopyStream(input) == false)
376
Call this function in order to give the handler the possiblity
377
to ask engine if there are any new tables that should be written to disk
378
or any dropped tables that need to be removed from disk
380
int StorageEngine::getTableProto(const char* path,
381
message::Table *table_proto)
385
Registry<plugin::StorageEngine *>::iterator iter=
386
find_if(all_engines.begin(), all_engines.end(),
387
StorageEngineGetTableProto(path, table_proto, &err));
388
if (iter == all_engines.end())
390
string proto_path(path);
391
string file_ext(".dfe");
392
proto_path.append(file_ext);
394
int error= access(proto_path.c_str(), F_OK);
403
int read_proto_err= drizzle_read_table_proto(proto_path.c_str(),
415
An interceptor to hijack the text of the error message without
416
setting an error in the thread. We need the text to present it
417
in the form of a warning to the user.
420
class Ha_delete_table_error_handler: public Internal_error_handler
423
Ha_delete_table_error_handler() : Internal_error_handler() {}
424
virtual bool handle_error(uint32_t sql_errno,
426
DRIZZLE_ERROR::enum_warning_level level,
428
char buff[DRIZZLE_ERRMSG_SIZE];
433
Ha_delete_table_error_handler::
434
handle_error(uint32_t ,
436
DRIZZLE_ERROR::enum_warning_level ,
439
/* Grab the error message */
440
strncpy(buff, message, sizeof(buff)-1);
445
class DeleteTableStorageEngine
446
: public unary_function<plugin::StorageEngine *, void>
453
DeleteTableStorageEngine(Session *session_arg, const char *path_arg,
454
handler **file_arg, int *error_arg)
455
: session(session_arg), path(path_arg), file(file_arg), dt_error(error_arg) {}
457
result_type operator() (argument_type engine)
459
char tmp_path[FN_REFLEN];
462
if(*dt_error!=ENOENT) /* already deleted table */
468
if (!engine->is_enabled())
471
if ((tmp_file= engine->create(NULL, session->mem_root)))
476
path= engine->checkLowercaseNames(path, tmp_path);
477
const std::string table_path(path);
478
int tmp_error= engine->deleteTable(session, table_path);
480
if (tmp_error != ENOENT)
484
if (engine->check_flag(HTON_BIT_HAS_DATA_DICTIONARY))
485
delete_table_proto_file(path);
487
tmp_error= delete_table_proto_file(path);
490
*dt_error= tmp_error;
505
This should return ENOENT if the file doesn't exists.
506
The .frm file will be deleted only if we return 0 or ENOENT
508
int StorageEngine::deleteTable(Session *session, const char *path,
509
const char *db, const char *alias,
510
bool generate_warning)
512
TableShare dummy_share;
514
memset(&dummy_table, 0, sizeof(dummy_table));
515
memset(&dummy_share, 0, sizeof(dummy_share));
517
dummy_table.s= &dummy_share;
522
for_each(all_engines.begin(), all_engines.end(),
523
DeleteTableStorageEngine(session, path, &file, &error));
525
if (error == ENOENT) /* proto may be left behind */
526
error= delete_table_proto_file(path);
528
if (error && generate_warning)
531
Because file->print_error() use my_error() to generate the error message
532
we use an internal error handler to intercept it and store the text
533
in a temporary buffer. Later the message will be presented to user
536
Ha_delete_table_error_handler ha_delete_table_error_handler;
538
/* Fill up strucutures that print_error may need */
539
dummy_share.path.str= (char*) path;
540
dummy_share.path.length= strlen(path);
541
dummy_share.db.str= (char*) db;
542
dummy_share.db.length= strlen(db);
543
dummy_share.table_name.str= (char*) alias;
544
dummy_share.table_name.length= strlen(alias);
545
dummy_table.alias= alias;
549
file->change_table_ptr(&dummy_table, &dummy_share);
551
session->push_internal_handler(&ha_delete_table_error_handler);
552
file->print_error(error, 0);
554
session->pop_internal_handler();
557
error= -1; /* General form of fail. maybe bad FRM */
560
XXX: should we convert *all* errors to warnings here?
561
What if the error is fatal?
563
push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_ERROR, error,
564
ha_delete_table_error_handler.buff);
573
class DFETableNameIterator: public plugin::TableNameIteratorImplementation
577
uint32_t current_entry;
580
DFETableNameIterator(const std::string &database)
581
: plugin::TableNameIteratorImplementation(database),
586
~DFETableNameIterator();
588
int next(std::string *name);
592
DFETableNameIterator::~DFETableNameIterator()
598
int DFETableNameIterator::next(string *name)
600
char uname[NAME_LEN + 1];
603
uint32_t file_name_len;
604
const char *wild= NULL;
609
char path[FN_REFLEN];
611
build_table_filename(path, sizeof(path), db.c_str(), "", false);
613
dirp = my_dir(path,MYF(dir ? MY_WANT_STAT : 0));
617
if (my_errno == ENOENT)
618
my_error(ER_BAD_DB_ERROR, MYF(ME_BELL+ME_WAITTANG), db.c_str());
620
my_error(ER_CANT_READ_DIR, MYF(ME_BELL+ME_WAITTANG), path, my_errno);
630
if (current_entry == dirp->number_off_files)
637
file= dirp->dir_entry + current_entry;
639
if (my_strcasecmp(system_charset_info, ext=fn_rext(file->name),".dfe") ||
640
is_prefix(file->name, TMP_FILE_PREFIX))
644
file_name_len= filename_to_tablename(file->name, uname, sizeof(uname));
646
uname[file_name_len]= '\0';
648
if (wild && wild_compare(uname, wild, 0))
659
TableNameIterator::TableNameIterator(const std::string &db)
660
: current_implementation(NULL), database(db)
662
plugin::Registry &plugins= plugin::Registry::singleton();
663
engine_iter= plugins.storage_engine.begin();
664
default_implementation= new DFETableNameIterator(database);
667
TableNameIterator::~TableNameIterator()
669
delete current_implementation;
672
int TableNameIterator::next(std::string *name)
674
plugin::Registry &plugins= plugin::Registry::singleton();
678
if (current_implementation == NULL)
680
while(current_implementation == NULL &&
681
(engine_iter != plugins.storage_engine.end()))
683
plugin::StorageEngine *engine= *engine_iter;
684
current_implementation= engine->tableNameIterator(database);
688
if (current_implementation == NULL &&
689
(engine_iter == plugins.storage_engine.end()))
691
current_implementation= default_implementation;
695
err= current_implementation->next(name);
699
if (current_implementation != default_implementation)
701
delete current_implementation;
702
current_implementation= NULL;
711
} /* namespace service */
712
} /* namespace drizzled */
716
drizzled::plugin::StorageEngine *ha_resolve_by_name(Session *session,
717
const std::string &find_str)
719
drizzled::plugin::Registry &plugins= drizzled::plugin::Registry::singleton();
720
return plugins.storage_engine.findByName(session, find_str);
723
void ha_close_connection(Session *session)
725
drizzled::plugin::Registry &plugins= drizzled::plugin::Registry::singleton();
726
plugins.storage_engine.closeConnection(session);
729
void ha_drop_database(char* path)
731
drizzled::plugin::Registry &plugins= drizzled::plugin::Registry::singleton();
732
plugins.storage_engine.dropDatabase(path);
735
int ha_commit_or_rollback_by_xid(XID *xid, bool commit)
737
drizzled::plugin::Registry &plugins= drizzled::plugin::Registry::singleton();
738
return plugins.storage_engine.commitOrRollbackByXID(xid, commit);
741
int ha_release_temporary_latches(Session *session)
743
drizzled::plugin::Registry &plugins= drizzled::plugin::Registry::singleton();
744
return plugins.storage_engine.releaseTemporaryLatches(session);
747
bool ha_flush_logs(drizzled::plugin::StorageEngine *engine)
749
drizzled::plugin::Registry &plugins= drizzled::plugin::Registry::singleton();
750
return plugins.storage_engine.flushLogs(engine);
753
int ha_recover(HASH *commit_list)
755
drizzled::plugin::Registry &plugins= drizzled::plugin::Registry::singleton();
756
return plugins.storage_engine.recover(commit_list);
759
int ha_start_consistent_snapshot(Session *session)
761
drizzled::plugin::Registry &plugins= drizzled::plugin::Registry::singleton();
762
return plugins.storage_engine.startConsistentSnapshot(session);
765
int ha_delete_table(Session *session, const char *path,
766
const char *db, const char *alias, bool generate_warning)
768
drizzled::plugin::Registry &plugins= drizzled::plugin::Registry::singleton();
769
return plugins.storage_engine.deleteTable(session, path, db,
770
alias, generate_warning);