1
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
30
#ifdef USE_PRAGMA_IMPLEMENTATION
31
#pragma implementation // gcc: Class implementation
36
#include <drizzled/common.h>
37
#include <drizzled/plugin.h>
38
#include <drizzled/field.h>
39
#include <drizzled/session.h>
40
#include <drizzled/data_home.h>
41
#include <drizzled/error.h>
42
#include <drizzled/table.h>
43
#include <drizzled/plugin/transactional_storage_engine.h>
45
#define my_strdup(a,b) strdup(a)
46
using namespace drizzled;
47
using namespace drizzled::plugin;
51
#include "cslib/CSConfig.h"
53
#include "cslib/CSConfig.h"
54
#include "mysql_priv.h"
55
#include <mysql/plugin.h>
66
#include "cslib/CSDefs.h"
67
#include "cslib/CSObject.h"
68
#include "cslib/CSGlobal.h"
69
#include "cslib/CSThread.h"
70
#include "cslib/CSStrUtil.h"
71
#include "cslib/CSLog.h"
73
#include "engine_ms.h"
75
#include "network_ms.h"
76
#include "connection_handler_ms.h"
77
#include "open_table_ms.h"
78
#include "database_ms.h"
79
#include "temp_log_ms.h"
80
#include "system_table_ms.h"
82
#include "discover_ms.h"
83
#include "metadata_ms.h"
84
#include "transaction_ms.h"
85
#include "systab_httpheader_ms.h"
86
#include "system_table_ms.h"
87
#include "parameters_ms.h"
88
#include "pbmsdaemon_ms.h"
89
#include "version_ms.h"
91
/* Note: 'new' used here is NOT CSObject::new which is a DEBUG define*/
99
static int pbms_done_func(void *);
101
class PBMSStorageEngine : public drizzled::plugin::TransactionalStorageEngine {
104
: TransactionalStorageEngine(std::string("PBMS"), HTON_NO_FLAGS | HTON_HIDDEN) {}
108
pbms_done_func(NULL);
111
int close_connection(Session *);
113
int doStartTransaction(Session *session, start_transaction_option_t options);
114
int doCommit(Session *, bool);
115
int doRollback(Session *, bool);
116
Cursor *create(Table& table);
117
bool doDropSchema(const drizzled::identifier::Schema&);
120
* Indicates to a storage engine the start of a
123
void doStartStatement(Session *session)
129
* Indicates to a storage engine the end of
130
* the current SQL statement in the supplied
133
void doEndStatement(Session *session)
138
int doCreateTable(Session&, Table&, const identifier::Table& ident, drizzled::message::Table& );
139
int doDropTable(Session &, const identifier::Table& );
141
int doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to);
143
// Appends one identifier::Table to set_of_identifiers for every table name
// that doGetTableNames() reports for the given schema.
void doGetTableIdentifiers(drizzled::CachedDirectory &dir,
144
const drizzled::identifier::Schema &schema,
145
drizzled::identifier::Table::vector &set_of_identifiers)
147
// Scratch set that receives the plain table names.
std::set<std::string> set_of_names;
149
doGetTableNames(dir, schema, set_of_names);
150
// Wrap each collected name in an identifier bound to the same schema.
for (std::set<std::string>::iterator set_iter = set_of_names.begin(); set_iter != set_of_names.end(); ++set_iter)
152
set_of_identifiers.push_back(identifier::Table(schema, *set_iter));
156
// Adds the PBMS system table names to set_of_names when the schema is
// either matched by schema.compare("PBMS") or reported as a BLOB database
// by PBMSParameters::isBLOBDatabase(). The directory argument is unused.
void doGetTableNames(CachedDirectory&,
157
const identifier::Schema &schema,
158
std::set<std::string> &set_of_names)
160
// NOTE(review): assumes identifier::Schema::compare() returns true on a
// match (unlike std::string::compare()) -- confirm against its declaration.
bool isPBMS = schema.compare("PBMS");
162
if (isPBMS || PBMSParameters::isBLOBDatabase(schema.getSchemaName().c_str()))
163
// isPBMS is forwarded so the callee can tell the two kinds of schema apart.
PBMSSystemTables::getSystemTableNames(isPBMS, set_of_names);
166
int doSetSavepoint(Session *thd, NamedSavepoint &savepoint);
167
int doRollbackToSavepoint(Session *session, NamedSavepoint &savepoint);
168
int doReleaseSavepoint(Session *session, NamedSavepoint &savepoint);
169
const char **bas_ext() const;
171
// Fills table_proto for the table named by identifier: the schema name and
// zeroed timestamps are set here, the rest is delegated to
// PBMSSystemTables::getSystemTableInfo(). The Session argument is unused.
int doGetTableDefinition(Session&, const identifier::Table &identifier,
172
drizzled::message::Table &table_proto)
175
// Borrowed pointer into the identifier's table name; not copied.
const char *tab_name = identifier.getTableName().c_str();
177
// Set some required table proto info:
178
table_proto.set_schema(identifier.getSchemaName().c_str());
179
table_proto.set_creation_timestamp(0);
180
table_proto.set_update_timestamp(0);
182
// NOTE(review): the declaration of err and the handling of its value are
// not visible in this excerpt.
err = PBMSSystemTables::getSystemTableInfo(tab_name, table_proto);
189
// Reports whether identifier names a PBMS system table. Only schemas that
// are "PBMS" itself or are reported as BLOB databases by
// PBMSParameters::isBLOBDatabase() are checked. The Session argument is
// unused.
bool doDoesTableExist(Session&, const identifier::Table &identifier)
191
const char *tab_name = identifier.getTableName().c_str();
192
const char *db_name = identifier.getSchemaName().c_str();
193
// std::string::compare() returns 0 when the strings are equal, so the
// result must be tested against 0. Converting it straight to bool made
// isPBMS true for every schema except "PBMS". (Assumes getSchemaName()
// yields a std::string, as the c_str() calls above suggest.)
bool isPBMS = (identifier.getSchemaName().compare("PBMS") == 0);
195
if (isPBMS || PBMSParameters::isBLOBDatabase(db_name)) {
196
return PBMSSystemTables::isSystemTable(isPBMS, tab_name);
205
PBMSStorageEngine *pbms_hton;
207
handlerton *pbms_hton;
210
static const char *ha_pbms_exts[] = {
215
* ---------------------------------------------------------------
220
void pbms_take_part_in_transaction(void *thread)
223
if ((thd = (THD *) thread)) {
224
trans_register_ha(thd, true, pbms_hton);
230
const char **PBMSStorageEngine::bas_ext() const
232
const char **ha_pbms::bas_ext() const
239
int PBMSStorageEngine::close_connection(Session *thd)
242
static int pbms_close_connection(handlerton *hton, THD* thd)
246
MSEngine::closeConnection(thd);
253
* ---------------------------------------------------------------
259
Cursor *PBMSStorageEngine::create(Table& table)
261
PBMSStorageEngine * const hton = this;
262
return new ha_pbms(hton, table);
265
static handler *pbms_create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root)
267
return new (mem_root) ha_pbms(hton, table);
272
int PBMSStorageEngine::doStartTransaction(Session *thd, start_transaction_option_t options)
279
int PBMSStorageEngine::doCommit(Session *thd, bool all)
282
static int pbms_commit(handlerton *, THD *thd, bool all)
287
PBMSResultRec result;
289
// I am not interested in single-statement transactions.
293
if (MSEngine::enterConnection(thd, &self, &result, false))
297
MSTransactionManager::commit();
300
err = MSEngine::exceptionToResult(&self->myException, &result);
303
self->myIsAutoCommit = true;
308
int PBMSStorageEngine::doRollback(THD *thd, bool all)
311
static int pbms_rollback(handlerton *, THD *thd, bool all)
316
PBMSResultRec result;
320
if (MSEngine::enterConnection(thd, &self, &result, false))
324
MSTransactionManager::rollback();
327
err = MSEngine::exceptionToResult(&self->myException, &result);
330
self->myIsAutoCommit = true;
335
int PBMSStorageEngine::doSetSavepoint(Session *thd, NamedSavepoint &savepoint)
339
PBMSResultRec result;
341
if (MSEngine::enterConnection(thd, &self, &result, false))
346
MSTransactionManager::setSavepoint(savepoint.getName().c_str());
349
err = MSEngine::exceptionToResult(&self->myException, &result);
356
int PBMSStorageEngine::doRollbackToSavepoint(Session *session, NamedSavepoint &savepoint)
360
PBMSResultRec result;
362
if (MSEngine::enterConnection(session, &self, &result, false))
366
MSTransactionManager::rollbackTo(savepoint.getName().c_str());
369
err = MSEngine::exceptionToResult(&self->myException, &result);
376
int PBMSStorageEngine::doReleaseSavepoint(Session *session, NamedSavepoint &savepoint)
380
PBMSResultRec result;
382
if (MSEngine::enterConnection(session, &self, &result, false))
387
MSTransactionManager::releaseSavepoint(savepoint.getName().c_str());
390
err = MSEngine::exceptionToResult(&self->myException, &result);
397
static int pbms_savepoint_set(handlerton *hton, THD *thd, void *sv)
401
PBMSResultRec result;
403
if (MSEngine::enterConnection(thd, &self, &result, false))
406
*((uint32_t*)sv) = self->myStmtCount;
410
static int pbms_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
414
PBMSResultRec result;
416
if (MSEngine::enterConnection(thd, &self, &result, false))
420
MSTransactionManager::rollbackToPosition(*((uint32_t*)sv));
423
err = MSEngine::exceptionToResult(&self->myException, &result);
429
static int pbms_savepoint_release(handlerton *hton, THD *thd, void *sv)
437
bool PBMSStorageEngine::doDropSchema(const drizzled::identifier::Schema &schema)
440
PBMSResultRec result;
442
if (MSEngine::enterConnectionNoThd(&self, &result))
447
MSDatabase::dropDatabase(schema.getSchemaName().c_str());
450
self->logException();
455
// Drop-database callback: derives the database name from the last
// directory component of path and drops that database via MSDatabase.
// The handlerton argument is unused.
static void pbms_drop_database(handlerton *, char *path)
458
// Receives the database name extracted from path.
char db_name[PATH_MAX];
459
PBMSResultRec result;
461
// No THD is available here, so the connection is entered without one.
// NOTE(review): what happens when this call reports failure is not visible
// in this excerpt.
if (MSEngine::enterConnectionNoThd(&self, &result))
465
// Copy the last directory component of path, bounded by PATH_MAX.
cs_strcpy(PATH_MAX, db_name, cs_last_directory_of_path(path));
466
cs_remove_dir_char(db_name);
468
MSDatabase::dropDatabase(db_name);
471
// NOTE(review): presumably runs in an exception handler whose scaffolding
// is not visible here -- confirm.
self->logException();
477
static bool pbms_started = false;
481
int pbms_init_func(module::Context ®istry);
482
int pbms_init_func(module::Context ®istry)
484
int pbms_init_func(void *p);
485
int pbms_discover_system_tables(handlerton *hton, THD* thd, const char *db, const char *name, uchar **frmblob, size_t *frmlen);
486
int pbms_init_func(void *p)
489
PBMSResultRec result;
494
ASSERT(!pbms_started);
495
pbms_started = false;
496
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonStartUp);
500
snprintf(info, 120, "PrimeBase Media Stream (PBMS) Daemon %s loaded...", PBMSVersion::getCString());
501
CSL.logLine(NULL, CSLog::Protocol, info);
503
CSL.logLine(NULL, CSLog::Protocol, "Barry Leslie, PrimeBase Technologies GmbH, http://www.primebase.org");
505
if ((err = MSEngine::startUp(&result))) {
506
CSL.logLine(NULL, CSLog::Error, result.mr_message);
507
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
512
pbms_hton= new PBMSStorageEngine();
513
registry.add(pbms_hton);
515
pbms_hton = (handlerton *) p;
516
pbms_hton->state = SHOW_OPTION_YES;
517
pbms_hton->close_connection = pbms_close_connection; /* close_connection, cleanup thread related data. */
518
pbms_hton->create = pbms_create_handler;
519
pbms_hton->flags = HTON_CAN_RECREATE | HTON_HIDDEN;
520
pbms_hton->drop_database = pbms_drop_database; /* Drop a database */
521
pbms_hton->discover = pbms_discover_system_tables;
523
pbms_hton->commit = pbms_commit; /* commit */
524
pbms_hton->rollback = pbms_rollback; /* rollback */
526
pbms_hton->savepoint_offset = 4;
527
pbms_hton->savepoint_set = pbms_savepoint_set;
528
pbms_hton->savepoint_rollback = pbms_savepoint_rollback;
529
pbms_hton->savepoint_release = pbms_savepoint_release;
532
/* Startup the Media Stream network: */
535
if (!(thread = CSThread::newCSThread())) {
536
CSException::logOSError(CS_CONTEXT, ENOMEM);
537
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
540
if (!CSThread::attach(thread)) {
541
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
542
thread->myException.log(NULL);
543
CSThread::shutDown();
545
MSEngine::shutDown();
550
thread->threadName = CSString::newString("startup");
551
MSDatabase::startUp(PBMSParameters::getDefaultMetaDataHeaders());
552
MSTableList::startUp();
553
MSSystemTableShare::startUp();
554
MSNetwork::startUp(PBMSParameters::getPortNumber());
555
MSTransactionManager::startUp();
556
MSNetwork::startNetwork();
559
self->logException();
565
MSNetwork::shutDown();
566
MSTransactionManager::shutDown();
567
MSSystemTableShare::shutDown();
568
MSDatabase::stopThreads();
569
MSTableList::shutDown();
570
MSDatabase::shutDown();
571
CSThread::shutDown();
574
self->logException();
579
CSThread::detach(thread);
583
MSEngine::shutDown();
592
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonRunning);
594
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
600
static int pbms_done_func(void *)
602
int pbms_done_func(void *)
610
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonShuttingDown);
611
CSL.logLine(NULL, CSLog::Protocol, "PrimeBase Media Stream (PBMS) Daemon shutdown...");
613
/* Shutdown the Media Stream network. */
614
if (!(thread = CSThread::newCSThread()))
615
CSException::logOSError(CS_CONTEXT, ENOMEM);
616
else if (!CSThread::attach(thread))
617
thread->myException.log(NULL);
621
thread->threadName = CSString::newString("shutdown");
622
MSNetwork::shutDown();
623
MSSystemTableShare::shutDown();
624
/* Ensure that the database threads are stopped before
625
* freeing the tables.
627
MSDatabase::stopThreads();
628
MSTableList::shutDown();
629
/* Databases must be shutdown after table because tables
630
* have references to repositories.
632
MSDatabase::shutDown();
634
/* Shutdown the transaction manager after the databases
635
* in case they want to commit or roll back a transaction.
637
MSTransactionManager::shutDown();
640
self->logException();
644
CSThread::shutDown();
645
CSThread::detach(thread);
648
MSEngine::shutDown();
651
CSL.logLine(NULL, CSLog::Protocol, "PrimeBase Media Stream (PBMS) Daemon shutdown completed");
652
pbms_started = false;
657
ha_pbms::ha_pbms(handlerton *hton, Table& table_arg) : handler(*hton, table_arg),
659
ha_pbms::ha_pbms(handlerton *hton, TABLE_SHARE *table_arg) : handler(hton, table_arg),
664
memset(&ha_result, 0, sizeof(PBMSResultRec));
668
MX_TABLE_TYPES_T ha_pbms::table_flags() const
671
/* We need this flag because records are not packed
672
* into a table which means #ROWID != offset
676
#if MYSQL_VERSION_ID > 50119
677
/* We can do row logging, but not statement, because
678
* MVCC is not serializable!
680
HA_BINLOG_ROW_CAPABLE |
683
* Auto-increment is allowed on a partial key.
689
int ha_pbms::open(const char *table_path, int , uint )
693
if ((ha_error = MSEngine::enterConnection(current_thd, &self, &ha_result, true)))
698
ha_open_tab = MSSystemTableShare::openSystemTable(table_path, getTable());
700
ha_lock.init(&ha_open_tab->myShare->myThrLock);
702
thr_lock_data_init(&ha_open_tab->myShare->myThrLock, &ha_lock, NULL);
704
ref_length = ha_open_tab->getRefLen();
707
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
710
return_(ha_error != MS_OK);
713
int ha_pbms::close(void)
717
if ((ha_error = MSEngine::enterConnection(current_thd, &self, &ha_result, true)))
722
ha_open_tab->release();
726
MSEngine::exitConnection();
731
/* Index access functions: */
732
int ha_pbms::index_init(uint idx, bool sorted)
739
ha_open_tab->index_init(idx);
742
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
750
int ha_pbms::index_end()
755
ha_open_tab->index_end();
758
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
766
int ha_pbms::index_read(byte * buf, const byte * key,
767
uint key_len, enum ha_rkey_function find_flag)
772
if (!ha_open_tab->index_read(buf, key, key_len, find_flag))
773
err = HA_ERR_KEY_NOT_FOUND;
777
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
785
int ha_pbms::index_read_idx(byte * buf, uint idx, const byte * key,
786
uint key_len, enum ha_rkey_function find_flag)
791
if (!ha_open_tab->index_read_idx(buf, idx, key, key_len, find_flag))
792
err = HA_ERR_KEY_NOT_FOUND;
795
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
803
int ha_pbms::index_next(byte * buf)
808
if (!ha_open_tab->index_next(buf))
809
err = HA_ERR_END_OF_FILE;
812
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
820
int ha_pbms::index_prev(byte * buf)
825
if (!ha_open_tab->index_prev(buf))
826
err = HA_ERR_END_OF_FILE;
829
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
837
int ha_pbms::index_first(byte * buf)
842
if (!ha_open_tab->index_first(buf))
843
err = HA_ERR_END_OF_FILE;
846
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
854
int ha_pbms::index_last(byte * buf)
859
if (!ha_open_tab->index_last(buf))
860
err = HA_ERR_END_OF_FILE;
863
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
871
int ha_pbms::index_read_last(byte * buf, const byte * key, uint key_len)
876
if (!ha_open_tab->index_read_last(buf, key, key_len))
877
err = HA_ERR_KEY_NOT_FOUND;
880
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
889
#endif // PBMS_HAS_KEYS
891
/* Sequential scan functions: */
893
int ha_pbms::doStartTableScan(bool )
895
int ha_pbms::rnd_init(bool )
901
ha_open_tab->seqScanInit();
904
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
912
int ha_pbms::rnd_next(unsigned char *buf)
917
if (!ha_open_tab->seqScanNext((char *) buf))
918
err = HA_ERR_END_OF_FILE;
921
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
929
void ha_pbms::position(const unsigned char *)
931
ha_open_tab->seqScanPos((uint8_t *) ref);
935
int ha_pbms::rnd_pos(unsigned char * buf, unsigned char *pos)
940
ha_open_tab->seqScanRead((uint8_t *) pos, (char *) buf);
943
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
950
//////////////////////////////
952
int ha_pbms::doInsertRecord(byte * buf)
954
int ha_pbms::write_row(unsigned char * buf)
960
ha_open_tab->insertRow((char *) buf);
963
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
971
int ha_pbms::doDeleteRecord(const byte * buf)
973
int ha_pbms::delete_row(const unsigned char * buf)
979
ha_open_tab->deleteRow((char *) buf);
982
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
990
int ha_pbms::doUpdateRecord(const byte * old_data, byte * new_data)
992
int ha_pbms::update_row(const unsigned char * old_data, unsigned char * new_data)
998
ha_open_tab->updateRow((char *) old_data, (char *) new_data);
1001
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
1008
int ha_pbms::info(uint )
1013
int ha_pbms::external_lock(THD *thd, int lock_type)
1018
if ((ha_error = MSEngine::enterConnection(thd, &self, &ha_result, true)))
1023
if (lock_type == F_UNLCK)
1024
ha_open_tab->unuse();
1029
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
1036
// Records the requested lock type in ha_lock. The THD argument is unused.
// NOTE(review): the use of `to` and the return statement are not visible in
// this excerpt.
THR_LOCK_DATA **ha_pbms::store_lock(THD *, THR_LOCK_DATA **to, enum thr_lock_type lock_type)
1038
// Only adopt the new type when the request is not TL_IGNORE and ha_lock is
// still at TL_UNLOCK.
if (lock_type != TL_IGNORE && ha_lock.type == TL_UNLOCK)
1039
ha_lock.type = lock_type;
1046
int PBMSStorageEngine::doCreateTable(Session&, Table&, const identifier::Table& , drizzled::message::Table& )
1048
/* You cannot create PBMS tables. */
1049
return( HA_ERR_WRONG_COMMAND );
1052
int PBMSStorageEngine::doDropTable(Session &, const identifier::Table& )
1054
/* You cannot delete PBMS tables. */
1058
int PBMSStorageEngine::doRenameTable(Session&, const identifier::Table &, const identifier::Table &)
1060
/* You cannot rename PBMS tables. */
1061
return( HA_ERR_WRONG_COMMAND );
1066
// Handler create entry point. Checks whether the last path component of
// table_name is a PBMS system table; the visible fall-through result is
// HA_ERR_WRONG_COMMAND. The HA_CREATE_INFO argument is unused.
int ha_pbms::create(const char *table_name, TABLE *table, HA_CREATE_INFO *)
1068
// Case-insensitive test of the table's database name against "PBMS".
bool isPBMS = (strcasecmp(table->s->db.str, "PBMS") == 0);
1070
// NOTE(review): the statement executed when this check succeeds is not
// visible in this excerpt.
if (PBMSSystemTables::isSystemTable(isPBMS, cs_last_name_of_path(table_name)))
1073
/* Create only works for system tables. */
1074
return( HA_ERR_WRONG_COMMAND );
1078
// Copies the message stored in ha_result into buf. The error-number
// argument is unused; the text always comes from ha_result.
bool ha_pbms::get_error_message(int , String *buf)
1080
// NOTE(review): the statement guarded by this check (taken when mr_code is
// zero) is not visible in this excerpt.
if (!ha_result.mr_code)
1083
// The message is copied with its strlen() length and tagged with
// system_charset_info.
buf->copy(ha_result.mr_message, strlen(ha_result.mr_message), system_charset_info);
1088
// Accessors for the CSThread pointer kept in the engine data slot that thd
// holds for pbms_hton.
CSThread *pbms_getMySelf(THD *thd);
1089
void pbms_setMySelf(THD *thd, CSThread *self);
1091
// Variant that reaches the slot through thd->getEngineData().
// NOTE(review): the two variants presumably belong to alternative build
// configurations whose preprocessor guards are not visible here -- confirm.
CSThread *pbms_getMySelf(THD *thd) { return ((CSThread *) *thd->getEngineData(pbms_hton));}
1092
void pbms_setMySelf(THD *thd, CSThread *self) { *thd->getEngineData(pbms_hton) = (void *)self;}
1094
// Variant that reaches the slot through thd_ha_data().
CSThread *pbms_getMySelf(THD *thd) { return ((CSThread *) *thd_ha_data(thd, pbms_hton));}
1095
void pbms_setMySelf(THD *thd, CSThread *self) { *thd_ha_data(thd, pbms_hton) = (void *)self;}