1
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
30
#ifdef USE_PRAGMA_IMPLEMENTATION
31
#pragma implementation // gcc: Class implementation
37
#include <drizzled/current_session.h>
38
#include <drizzled/common.h>
39
#include <drizzled/plugin.h>
40
#include <drizzled/field.h>
41
#include <drizzled/session.h>
42
#include <drizzled/data_home.h>
43
#include <drizzled/error.h>
44
#include <drizzled/table.h>
45
#include <drizzled/plugin/transactional_storage_engine.h>
47
/* Route my_strdup(str, flags) to plain strdup(str); the second argument is discarded. */
#define my_strdup(a,b) strdup(a)
48
using namespace drizzled;
49
using namespace drizzled::plugin;
53
#include "cslib/CSConfig.h"
55
#include "cslib/CSConfig.h"
56
#include "mysql_priv.h"
57
#include <mysql/plugin.h>
68
#include "cslib/CSDefs.h"
69
#include "cslib/CSObject.h"
70
#include "cslib/CSGlobal.h"
71
#include "cslib/CSThread.h"
72
#include "cslib/CSStrUtil.h"
73
#include "cslib/CSLog.h"
75
#include "engine_ms.h"
77
#include "network_ms.h"
78
#include "connection_handler_ms.h"
79
#include "open_table_ms.h"
80
#include "database_ms.h"
81
#include "temp_log_ms.h"
82
#include "system_table_ms.h"
84
#include "discover_ms.h"
85
#include "metadata_ms.h"
86
#include "transaction_ms.h"
87
#include "systab_httpheader_ms.h"
88
#include "system_table_ms.h"
89
#include "parameters_ms.h"
90
#include "pbmsdaemon_ms.h"
91
#include "version_ms.h"
93
/* Note: the 'new' used here is NOT CSObject::new, which is a DEBUG define. */
101
static int pbms_done_func(void *);
103
class PBMSStorageEngine : public drizzled::plugin::TransactionalStorageEngine {
106
: TransactionalStorageEngine(std::string("PBMS"), HTON_NO_FLAGS | HTON_HIDDEN) {}
110
pbms_done_func(NULL);
113
int close_connection(Session *);
115
int doStartTransaction(Session *session, start_transaction_option_t options);
116
int doCommit(Session *, bool);
117
int doRollback(Session *, bool);
118
Cursor *create(Table& table);
119
bool doDropSchema(const drizzled::identifier::Schema&);
122
* Indicates to a storage engine the start of a
125
void doStartStatement(Session *session)
131
* Indicates to a storage engine the end of
132
* the current SQL statement in the supplied
135
void doEndStatement(Session *session)
140
int doCreateTable(Session&, Table&, const identifier::Table& ident, drizzled::message::Table& );
141
int doDropTable(Session &, const identifier::Table& );
143
int doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to);
145
void doGetTableIdentifiers(drizzled::CachedDirectory &dir,
146
const drizzled::identifier::Schema &schema,
147
drizzled::identifier::Table::vector &set_of_identifiers)
149
std::set<std::string> set_of_names;
151
doGetTableNames(dir, schema, set_of_names);
152
for (std::set<std::string>::iterator set_iter = set_of_names.begin(); set_iter != set_of_names.end(); ++set_iter)
154
set_of_identifiers.push_back(identifier::Table(schema, *set_iter));
158
void doGetTableNames(CachedDirectory&,
159
const identifier::Schema &schema,
160
std::set<std::string> &set_of_names)
162
bool isPBMS = schema.compare("PBMS");
164
if (isPBMS || PBMSParameters::isBLOBDatabase(schema.getSchemaName().c_str()))
165
PBMSSystemTables::getSystemTableNames(isPBMS, set_of_names);
168
int doSetSavepoint(Session *thd, NamedSavepoint &savepoint);
169
int doRollbackToSavepoint(Session *session, NamedSavepoint &savepoint);
170
int doReleaseSavepoint(Session *session, NamedSavepoint &savepoint);
171
const char **bas_ext() const;
173
int doGetTableDefinition(Session&, const identifier::Table &identifier,
174
drizzled::message::Table &table_proto)
177
const char *tab_name = identifier.getTableName().c_str();
179
// Set some required table proto info:
180
table_proto.set_schema(identifier.getSchemaName().c_str());
181
table_proto.set_creation_timestamp(0);
182
table_proto.set_update_timestamp(0);
184
err = PBMSSystemTables::getSystemTableInfo(tab_name, table_proto);
191
bool doDoesTableExist(Session&, const identifier::Table &identifier)
193
const char *tab_name = identifier.getTableName().c_str();
194
const char *db_name = identifier.getSchemaName().c_str();
195
/* std::string::compare() returns 0 when the strings are equal, so the result
 * must be tested against 0; converting it straight to bool would make isPBMS
 * true for every schema EXCEPT "PBMS".
 * NOTE(review): this match is case-sensitive — confirm whether the schema
 * name should be compared case-insensitively instead. */
bool isPBMS = (identifier.getSchemaName().compare("PBMS") == 0);
197
if (isPBMS || PBMSParameters::isBLOBDatabase(db_name)) {
198
return PBMSSystemTables::isSystemTable(isPBMS, tab_name);
207
/* The PBMSStorageEngine instance; pbms_init_func() allocates it and adds it to the registry. */
PBMSStorageEngine *pbms_hton;
209
/* The handlerton passed to pbms_init_func(), which fills in its callbacks and flags. */
handlerton *pbms_hton;
212
static const char *ha_pbms_exts[] = {
217
* ---------------------------------------------------------------
222
void pbms_take_part_in_transaction(void *thread)
225
if ((thd = (THD *) thread)) {
226
trans_register_ha(thd, true, pbms_hton);
232
const char **PBMSStorageEngine::bas_ext() const
234
const char **ha_pbms::bas_ext() const
241
int PBMSStorageEngine::close_connection(Session *thd)
244
static int pbms_close_connection(handlerton *hton, THD* thd)
248
MSEngine::closeConnection(thd);
255
* ---------------------------------------------------------------
261
Cursor *PBMSStorageEngine::create(Table& table)
263
PBMSStorageEngine * const hton = this;
264
return new ha_pbms(hton, table);
267
static handler *pbms_create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root)
269
return new (mem_root) ha_pbms(hton, table);
274
int PBMSStorageEngine::doStartTransaction(Session *thd, start_transaction_option_t options)
281
int PBMSStorageEngine::doCommit(Session *thd, bool all)
284
static int pbms_commit(handlerton *, THD *thd, bool all)
289
PBMSResultRec result;
291
// I am not interested in single-statement transactions.
295
if (MSEngine::enterConnection(thd, &self, &result, false))
299
MSTransactionManager::commit();
302
err = MSEngine::exceptionToResult(&self->myException, &result);
305
self->myIsAutoCommit = true;
310
int PBMSStorageEngine::doRollback(THD *thd, bool all)
313
static int pbms_rollback(handlerton *, THD *thd, bool all)
318
PBMSResultRec result;
322
if (MSEngine::enterConnection(thd, &self, &result, false))
326
MSTransactionManager::rollback();
329
err = MSEngine::exceptionToResult(&self->myException, &result);
332
self->myIsAutoCommit = true;
337
int PBMSStorageEngine::doSetSavepoint(Session *thd, NamedSavepoint &savepoint)
341
PBMSResultRec result;
343
if (MSEngine::enterConnection(thd, &self, &result, false))
348
MSTransactionManager::setSavepoint(savepoint.getName().c_str());
351
err = MSEngine::exceptionToResult(&self->myException, &result);
358
int PBMSStorageEngine::doRollbackToSavepoint(Session *session, NamedSavepoint &savepoint)
362
PBMSResultRec result;
364
if (MSEngine::enterConnection(session, &self, &result, false))
368
MSTransactionManager::rollbackTo(savepoint.getName().c_str());
371
err = MSEngine::exceptionToResult(&self->myException, &result);
378
int PBMSStorageEngine::doReleaseSavepoint(Session *session, NamedSavepoint &savepoint)
382
PBMSResultRec result;
384
if (MSEngine::enterConnection(session, &self, &result, false))
389
MSTransactionManager::releaseSavepoint(savepoint.getName().c_str());
392
err = MSEngine::exceptionToResult(&self->myException, &result);
399
static int pbms_savepoint_set(handlerton *hton, THD *thd, void *sv)
403
PBMSResultRec result;
405
if (MSEngine::enterConnection(thd, &self, &result, false))
408
*((uint32_t*)sv) = self->myStmtCount;
412
static int pbms_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
416
PBMSResultRec result;
418
if (MSEngine::enterConnection(thd, &self, &result, false))
422
MSTransactionManager::rollbackToPosition(*((uint32_t*)sv));
425
err = MSEngine::exceptionToResult(&self->myException, &result);
431
static int pbms_savepoint_release(handlerton *hton, THD *thd, void *sv)
439
bool PBMSStorageEngine::doDropSchema(const drizzled::identifier::Schema &schema)
442
PBMSResultRec result;
444
if (MSEngine::enterConnectionNoThd(&self, &result))
449
MSDatabase::dropDatabase(schema.getSchemaName().c_str());
452
self->logException();
457
static void pbms_drop_database(handlerton *, char *path)
460
char db_name[PATH_MAX];
461
PBMSResultRec result;
463
if (MSEngine::enterConnectionNoThd(&self, &result))
467
cs_strcpy(PATH_MAX, db_name, cs_last_directory_of_path(path));
468
cs_remove_dir_char(db_name);
470
MSDatabase::dropDatabase(db_name);
473
self->logException();
479
/* Start-up flag for the PBMS engine: pbms_init_func() asserts that it is
 * clear on entry, and pbms_done_func() clears it again once shutdown has
 * completed. */
static bool pbms_started = false;
483
int pbms_init_func(module::Context ®istry);
484
int pbms_init_func(module::Context ®istry)
486
int pbms_init_func(void *p);
487
int pbms_discover_system_tables(handlerton *hton, THD* thd, const char *db, const char *name, uchar **frmblob, size_t *frmlen);
488
int pbms_init_func(void *p)
491
PBMSResultRec result;
496
ASSERT(!pbms_started);
497
pbms_started = false;
498
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonStartUp);
502
snprintf(info, 120, "PrimeBase Media Stream (PBMS) Daemon %s loaded...", PBMSVersion::getCString());
503
CSL.logLine(NULL, CSLog::Protocol, info);
505
CSL.logLine(NULL, CSLog::Protocol, "Barry Leslie, PrimeBase Technologies GmbH, http://www.primebase.org");
507
if ((err = MSEngine::startUp(&result))) {
508
CSL.logLine(NULL, CSLog::Error, result.mr_message);
509
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
514
pbms_hton= new PBMSStorageEngine();
515
registry.add(pbms_hton);
517
pbms_hton = (handlerton *) p;
518
pbms_hton->state = SHOW_OPTION_YES;
519
pbms_hton->close_connection = pbms_close_connection; /* close_connection, cleanup thread related data. */
520
pbms_hton->create = pbms_create_handler;
521
pbms_hton->flags = HTON_CAN_RECREATE | HTON_HIDDEN;
522
pbms_hton->drop_database = pbms_drop_database; /* Drop a database */
523
pbms_hton->discover = pbms_discover_system_tables;
525
pbms_hton->commit = pbms_commit; /* commit */
526
pbms_hton->rollback = pbms_rollback; /* rollback */
528
pbms_hton->savepoint_offset = 4;
529
pbms_hton->savepoint_set = pbms_savepoint_set;
530
pbms_hton->savepoint_rollback = pbms_savepoint_rollback;
531
pbms_hton->savepoint_release = pbms_savepoint_release;
534
/* Start up the Media Stream network: */
537
if (!(thread = CSThread::newCSThread())) {
538
CSException::logOSError(CS_CONTEXT, ENOMEM);
539
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
542
if (!CSThread::attach(thread)) {
543
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
544
thread->myException.log(NULL);
545
CSThread::shutDown();
547
MSEngine::shutDown();
552
thread->threadName = CSString::newString("startup");
553
MSDatabase::startUp(PBMSParameters::getDefaultMetaDataHeaders());
554
MSTableList::startUp();
555
MSSystemTableShare::startUp();
556
MSNetwork::startUp(PBMSParameters::getPortNumber());
557
MSTransactionManager::startUp();
558
MSNetwork::startNetwork();
561
self->logException();
567
MSNetwork::shutDown();
568
MSTransactionManager::shutDown();
569
MSSystemTableShare::shutDown();
570
MSDatabase::stopThreads();
571
MSTableList::shutDown();
572
MSDatabase::shutDown();
573
CSThread::shutDown();
576
self->logException();
581
CSThread::detach(thread);
585
MSEngine::shutDown();
594
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonRunning);
596
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
602
static int pbms_done_func(void *)
604
int pbms_done_func(void *)
612
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonShuttingDown);
613
CSL.logLine(NULL, CSLog::Protocol, "PrimeBase Media Stream (PBMS) Daemon shutdown...");
615
/* Shut down the Media Stream network. */
616
if (!(thread = CSThread::newCSThread()))
617
CSException::logOSError(CS_CONTEXT, ENOMEM);
618
else if (!CSThread::attach(thread))
619
thread->myException.log(NULL);
623
thread->threadName = CSString::newString("shutdown");
624
MSNetwork::shutDown();
625
MSSystemTableShare::shutDown();
626
/* Ensure that the database threads are stopped before
627
* freeing the tables.
629
MSDatabase::stopThreads();
630
MSTableList::shutDown();
631
/* Databases must be shut down after the tables because tables
632
* have references to repositories.
634
MSDatabase::shutDown();
636
/* Shut down the transaction manager after the databases
637
* in case they want to commit or roll back a transaction.
639
MSTransactionManager::shutDown();
642
self->logException();
646
CSThread::shutDown();
647
CSThread::detach(thread);
650
MSEngine::shutDown();
653
CSL.logLine(NULL, CSLog::Protocol, "PrimeBase Media Stream (PBMS) Daemon shutdown completed");
654
pbms_started = false;
659
ha_pbms::ha_pbms(handlerton *hton, Table& table_arg) : handler(*hton, table_arg),
661
ha_pbms::ha_pbms(handlerton *hton, TABLE_SHARE *table_arg) : handler(hton, table_arg),
666
memset(&ha_result, 0, sizeof(PBMSResultRec));
670
MX_TABLE_TYPES_T ha_pbms::table_flags() const
673
/* We need this flag because records are not packed
674
* into a table which means #ROWID != offset
678
#if MYSQL_VERSION_ID > 50119
679
/* We can do row logging, but not statement, because
680
* MVCC is not serializable!
682
HA_BINLOG_ROW_CAPABLE |
685
* Auto-increment is allowed on a partial key.
691
int ha_pbms::open(const char *table_path, int , uint )
695
if ((ha_error = MSEngine::enterConnection(current_thd, &self, &ha_result, true)))
700
ha_open_tab = MSSystemTableShare::openSystemTable(table_path, getTable());
702
ha_lock.init(&ha_open_tab->myShare->myThrLock);
704
thr_lock_data_init(&ha_open_tab->myShare->myThrLock, &ha_lock, NULL);
706
ref_length = ha_open_tab->getRefLen();
709
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
712
return_(ha_error != MS_OK);
715
int ha_pbms::close(void)
719
if ((ha_error = MSEngine::enterConnection(current_thd, &self, &ha_result, true)))
724
ha_open_tab->release();
728
MSEngine::exitConnection();
733
/* Index access functions: */
734
int ha_pbms::index_init(uint idx, bool sorted)
741
ha_open_tab->index_init(idx);
744
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
752
int ha_pbms::index_end()
757
ha_open_tab->index_end();
760
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
768
int ha_pbms::index_read(byte * buf, const byte * key,
769
uint key_len, enum ha_rkey_function find_flag)
774
if (!ha_open_tab->index_read(buf, key, key_len, find_flag))
775
err = HA_ERR_KEY_NOT_FOUND;
779
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
787
int ha_pbms::index_read_idx(byte * buf, uint idx, const byte * key,
788
uint key_len, enum ha_rkey_function find_flag)
793
if (!ha_open_tab->index_read_idx(buf, idx, key, key_len, find_flag))
794
err = HA_ERR_KEY_NOT_FOUND;
797
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
805
int ha_pbms::index_next(byte * buf)
810
if (!ha_open_tab->index_next(buf))
811
err = HA_ERR_END_OF_FILE;
814
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
822
int ha_pbms::index_prev(byte * buf)
827
if (!ha_open_tab->index_prev(buf))
828
err = HA_ERR_END_OF_FILE;
831
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
839
int ha_pbms::index_first(byte * buf)
844
if (!ha_open_tab->index_first(buf))
845
err = HA_ERR_END_OF_FILE;
848
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
856
int ha_pbms::index_last(byte * buf)
861
if (!ha_open_tab->index_last(buf))
862
err = HA_ERR_END_OF_FILE;
865
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
873
int ha_pbms::index_read_last(byte * buf, const byte * key, uint key_len)
878
if (!ha_open_tab->index_read_last(buf, key, key_len))
879
err = HA_ERR_KEY_NOT_FOUND;
882
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
891
#endif // PBMS_HAS_KEYS
893
/* Sequential scan functions: */
895
int ha_pbms::doStartTableScan(bool )
897
int ha_pbms::rnd_init(bool )
903
ha_open_tab->seqScanInit();
906
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
914
int ha_pbms::rnd_next(unsigned char *buf)
919
if (!ha_open_tab->seqScanNext((char *) buf))
920
err = HA_ERR_END_OF_FILE;
923
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
931
void ha_pbms::position(const unsigned char *)
933
ha_open_tab->seqScanPos((uint8_t *) ref);
937
int ha_pbms::rnd_pos(unsigned char * buf, unsigned char *pos)
942
ha_open_tab->seqScanRead((uint8_t *) pos, (char *) buf);
945
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
952
//////////////////////////////
954
int ha_pbms::doInsertRecord(byte * buf)
956
int ha_pbms::write_row(unsigned char * buf)
962
ha_open_tab->insertRow((char *) buf);
965
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
973
int ha_pbms::doDeleteRecord(const byte * buf)
975
int ha_pbms::delete_row(const unsigned char * buf)
981
ha_open_tab->deleteRow((char *) buf);
984
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
992
int ha_pbms::doUpdateRecord(const byte * old_data, byte * new_data)
994
int ha_pbms::update_row(const unsigned char * old_data, unsigned char * new_data)
1000
ha_open_tab->updateRow((char *) old_data, (char *) new_data);
1003
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
1010
int ha_pbms::info(uint )
1015
int ha_pbms::external_lock(THD *thd, int lock_type)
1020
if ((ha_error = MSEngine::enterConnection(thd, &self, &ha_result, true)))
1025
if (lock_type == F_UNLCK)
1026
ha_open_tab->unuse();
1031
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
1038
THR_LOCK_DATA **ha_pbms::store_lock(THD *, THR_LOCK_DATA **to, enum thr_lock_type lock_type)
1040
if (lock_type != TL_IGNORE && ha_lock.type == TL_UNLOCK)
1041
ha_lock.type = lock_type;
1048
int PBMSStorageEngine::doCreateTable(Session&, Table&, const identifier::Table& , drizzled::message::Table& )
1050
/* You cannot create PBMS tables. */
1051
return( HA_ERR_WRONG_COMMAND );
1054
int PBMSStorageEngine::doDropTable(Session &, const identifier::Table& )
1056
/* You cannot delete PBMS tables. */
1060
int PBMSStorageEngine::doRenameTable(Session&, const identifier::Table &, const identifier::Table &)
1062
/* You cannot rename PBMS tables. */
1063
return( HA_ERR_WRONG_COMMAND );
1068
int ha_pbms::create(const char *table_name, TABLE *table, HA_CREATE_INFO *)
1070
bool isPBMS = (strcasecmp(table->s->db.str, "PBMS") == 0);
1072
if (PBMSSystemTables::isSystemTable(isPBMS, cs_last_name_of_path(table_name)))
1075
/* Create only works for system tables. */
1076
return( HA_ERR_WRONG_COMMAND );
1080
bool ha_pbms::get_error_message(int , String *buf)
1082
if (!ha_result.mr_code)
1085
buf->copy(ha_result.mr_message, strlen(ha_result.mr_message), system_charset_info);
1090
CSThread *pbms_getMySelf(THD *thd);
1091
void pbms_setMySelf(THD *thd, CSThread *self);
1093
/*
 * Return the CSThread pointer currently held in the slot that
 * thd->getEngineData(pbms_hton) yields for this engine.
 */
CSThread *pbms_getMySelf(THD *thd)
{
	void *stored = *thd->getEngineData(pbms_hton);

	return static_cast<CSThread *>(stored);
}
1094
/*
 * Store 'self' in the slot that thd->getEngineData(pbms_hton) yields for
 * this engine, overwriting whatever pointer was there before.
 */
void pbms_setMySelf(THD *thd, CSThread *self)
{
	void *opaque = self;

	*thd->getEngineData(pbms_hton) = opaque;
}
1096
/*
 * Return the CSThread pointer currently held in the slot that
 * thd_ha_data(thd, pbms_hton) yields for this engine.
 */
CSThread *pbms_getMySelf(THD *thd)
{
	void *stored = *thd_ha_data(thd, pbms_hton);

	return static_cast<CSThread *>(stored);
}
1097
/*
 * Store 'self' in the slot that thd_ha_data(thd, pbms_hton) yields for
 * this engine, overwriting whatever pointer was there before.
 */
void pbms_setMySelf(THD *thd, CSThread *self)
{
	void *opaque = self;

	*thd_ha_data(thd, pbms_hton) = opaque;
}