1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
30
#ifdef USE_PRAGMA_IMPLEMENTATION
31
#pragma implementation // gcc: Class implementation
36
#include <drizzled/common.h>
37
#include <drizzled/plugin.h>
38
#include <drizzled/field.h>
39
#include <drizzled/session.h>
40
#include <drizzled/data_home.h>
41
#include <drizzled/error.h>
42
#include <drizzled/table.h>
43
#include <drizzled/field/timestamp.h>
44
#include <drizzled/plugin/transactional_storage_engine.h>
46
#define my_strdup(a,b) strdup(a)
47
using namespace drizzled;
48
using namespace drizzled::plugin;
52
#include "cslib/CSConfig.h"
54
#include "cslib/CSConfig.h"
55
#include "mysql_priv.h"
56
#include <mysql/plugin.h>
67
#include "cslib/CSDefs.h"
68
#include "cslib/CSObject.h"
69
#include "cslib/CSGlobal.h"
70
#include "cslib/CSThread.h"
71
#include "cslib/CSStrUtil.h"
72
#include "cslib/CSLog.h"
74
#include "engine_ms.h"
76
#include "network_ms.h"
77
#include "connection_handler_ms.h"
78
#include "open_table_ms.h"
79
#include "database_ms.h"
80
#include "temp_log_ms.h"
81
#include "system_table_ms.h"
83
#include "discover_ms.h"
84
#include "metadata_ms.h"
85
#include "transaction_ms.h"
86
#include "systab_httpheader_ms.h"
87
#include "system_table_ms.h"
88
#include "parameters_ms.h"
89
#include "pbmsdaemon_ms.h"
90
#include "version_ms.h"
92
/* Note: 'new' used here is NOT CSObject::new which is a DEBUG define*/
100
static int pbms_done_func(void *);
102
class PBMSStorageEngine : public drizzled::plugin::TransactionalStorageEngine {
105
: TransactionalStorageEngine(std::string("PBMS"), HTON_NO_FLAGS | HTON_HIDDEN) {}
109
pbms_done_func(NULL);
112
int close_connection(Session *);
114
int doStartTransaction(Session *session, start_transaction_option_t options);
115
int doCommit(Session *, bool);
116
int doRollback(Session *, bool);
117
Cursor *create(Table& table);
118
bool doDropSchema(const drizzled::SchemaIdentifier&);
121
* Indicates to a storage engine the start of a
124
void doStartStatement(Session *session)
130
* Indicates to a storage engine the end of
131
* the current SQL statement in the supplied
134
void doEndStatement(Session *session)
139
int doCreateTable(Session&, Table&, const TableIdentifier& ident, drizzled::message::Table& );
140
int doDropTable(Session &, const TableIdentifier& );
142
int doRenameTable(Session&, const TableIdentifier &from, const TableIdentifier &to);
144
void doGetTableIdentifiers(drizzled::CachedDirectory &dir,
145
const drizzled::SchemaIdentifier &schema,
146
drizzled::TableIdentifiers &set_of_identifiers)
148
std::set<std::string> set_of_names;
150
doGetTableNames(dir, schema, set_of_names);
151
for (std::set<std::string>::iterator set_iter = set_of_names.begin(); set_iter != set_of_names.end(); ++set_iter)
153
set_of_identifiers.push_back(TableIdentifier(schema, *set_iter));
157
void doGetTableNames(CachedDirectory&,
158
const SchemaIdentifier &schema,
159
std::set<std::string> &set_of_names)
161
bool isPBMS = schema.compare("PBMS");
163
if (isPBMS || PBMSParameters::isBLOBDatabase(schema.getSchemaName().c_str()))
164
PBMSSystemTables::getSystemTableNames(isPBMS, set_of_names);
167
int doSetSavepoint(Session *thd, NamedSavepoint &savepoint);
168
int doRollbackToSavepoint(Session *session, NamedSavepoint &savepoint);
169
int doReleaseSavepoint(Session *session, NamedSavepoint &savepoint);
170
const char **bas_ext() const;
172
int doGetTableDefinition(Session&, const TableIdentifier &identifier,
173
drizzled::message::Table &table_proto)
176
const char *tab_name = identifier.getTableName().c_str();
178
// Set some required table proto info:
179
table_proto.set_schema(identifier.getSchemaName().c_str());
180
table_proto.set_creation_timestamp(0);
181
table_proto.set_update_timestamp(0);
183
err = PBMSSystemTables::getSystemTableInfo(tab_name, table_proto);
190
bool doDoesTableExist(Session&, const TableIdentifier &identifier)
192
const char *tab_name = identifier.getTableName().c_str();
193
const char *db_name = identifier.getSchemaName().c_str();
194
bool isPBMS = identifier.getSchemaName().compare("PBMS");
196
if (isPBMS || PBMSParameters::isBLOBDatabase(db_name)) {
197
return PBMSSystemTables::isSystemTable(isPBMS, tab_name);
206
PBMSStorageEngine *pbms_hton;
208
handlerton *pbms_hton;
211
static const char *ha_pbms_exts[] = {
216
* ---------------------------------------------------------------
221
void pbms_take_part_in_transaction(void *thread)
224
if ((thd = (THD *) thread)) {
225
trans_register_ha(thd, true, pbms_hton);
231
const char **PBMSStorageEngine::bas_ext() const
233
const char **ha_pbms::bas_ext() const
240
int PBMSStorageEngine::close_connection(Session *thd)
243
static int pbms_close_connection(handlerton *hton, THD* thd)
247
MSEngine::closeConnection(thd);
254
* ---------------------------------------------------------------
260
Cursor *PBMSStorageEngine::create(Table& table)
262
PBMSStorageEngine * const hton = this;
263
return new ha_pbms(hton, table);
266
static handler *pbms_create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root)
268
return new (mem_root) ha_pbms(hton, table);
273
int PBMSStorageEngine::doStartTransaction(Session *thd, start_transaction_option_t options)
280
int PBMSStorageEngine::doCommit(Session *thd, bool all)
283
static int pbms_commit(handlerton *, THD *thd, bool all)
288
PBMSResultRec result;
290
// I am not interesed in single statement transactions.
294
if (MSEngine::enterConnection(thd, &self, &result, false))
298
MSTransactionManager::commit();
301
err = MSEngine::exceptionToResult(&self->myException, &result);
304
self->myIsAutoCommit = true;
309
int PBMSStorageEngine::doRollback(THD *thd, bool all)
312
static int pbms_rollback(handlerton *, THD *thd, bool all)
317
PBMSResultRec result;
321
if (MSEngine::enterConnection(thd, &self, &result, false))
325
MSTransactionManager::rollback();
328
err = MSEngine::exceptionToResult(&self->myException, &result);
331
self->myIsAutoCommit = true;
336
int PBMSStorageEngine::doSetSavepoint(Session *thd, NamedSavepoint &savepoint)
340
PBMSResultRec result;
342
if (MSEngine::enterConnection(thd, &self, &result, false))
347
MSTransactionManager::setSavepoint(savepoint.getName().c_str());
350
err = MSEngine::exceptionToResult(&self->myException, &result);
357
int PBMSStorageEngine::doRollbackToSavepoint(Session *session, NamedSavepoint &savepoint)
361
PBMSResultRec result;
363
if (MSEngine::enterConnection(session, &self, &result, false))
367
MSTransactionManager::rollbackTo(savepoint.getName().c_str());
370
err = MSEngine::exceptionToResult(&self->myException, &result);
377
int PBMSStorageEngine::doReleaseSavepoint(Session *session, NamedSavepoint &savepoint)
381
PBMSResultRec result;
383
if (MSEngine::enterConnection(session, &self, &result, false))
388
MSTransactionManager::releaseSavepoint(savepoint.getName().c_str());
391
err = MSEngine::exceptionToResult(&self->myException, &result);
398
static int pbms_savepoint_set(handlerton *hton, THD *thd, void *sv)
402
PBMSResultRec result;
404
if (MSEngine::enterConnection(thd, &self, &result, false))
407
*((uint32_t*)sv) = self->myStmtCount;
411
static int pbms_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
415
PBMSResultRec result;
417
if (MSEngine::enterConnection(thd, &self, &result, false))
421
MSTransactionManager::rollbackToPosition(*((uint32_t*)sv));
424
err = MSEngine::exceptionToResult(&self->myException, &result);
430
static int pbms_savepoint_release(handlerton *hton, THD *thd, void *sv)
438
bool PBMSStorageEngine::doDropSchema(const drizzled::SchemaIdentifier &schema)
441
PBMSResultRec result;
443
if (MSEngine::enterConnectionNoThd(&self, &result))
448
MSDatabase::dropDatabase(schema.getSchemaName().c_str());
451
self->logException();
456
static void pbms_drop_database(handlerton *, char *path)
459
char db_name[PATH_MAX];
460
PBMSResultRec result;
462
if (MSEngine::enterConnectionNoThd(&self, &result))
466
cs_strcpy(PATH_MAX, db_name, cs_last_directory_of_path(path));
467
cs_remove_dir_char(db_name);
469
MSDatabase::dropDatabase(db_name);
472
self->logException();
478
static bool pbms_started = false;
482
int pbms_init_func(module::Context ®istry);
483
int pbms_init_func(module::Context ®istry)
485
int pbms_init_func(void *p);
486
int pbms_discover_system_tables(handlerton *hton, THD* thd, const char *db, const char *name, uchar **frmblob, size_t *frmlen);
487
int pbms_init_func(void *p)
490
PBMSResultRec result;
495
ASSERT(!pbms_started);
496
pbms_started = false;
497
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonStartUp);
501
snprintf(info, 120, "PrimeBase Media Stream (PBMS) Daemon %s loaded...", PBMSVersion::getCString());
502
CSL.logLine(NULL, CSLog::Protocol, info);
504
CSL.logLine(NULL, CSLog::Protocol, "Barry Leslie, PrimeBase Technologies GmbH, http://www.primebase.org");
506
if ((err = MSEngine::startUp(&result))) {
507
CSL.logLine(NULL, CSLog::Error, result.mr_message);
508
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
513
pbms_hton= new PBMSStorageEngine();
514
registry.add(pbms_hton);
516
pbms_hton = (handlerton *) p;
517
pbms_hton->state = SHOW_OPTION_YES;
518
pbms_hton->close_connection = pbms_close_connection; /* close_connection, cleanup thread related data. */
519
pbms_hton->create = pbms_create_handler;
520
pbms_hton->flags = HTON_CAN_RECREATE | HTON_HIDDEN;
521
pbms_hton->drop_database = pbms_drop_database; /* Drop a database */
522
pbms_hton->discover = pbms_discover_system_tables;
524
pbms_hton->commit = pbms_commit; /* commit */
525
pbms_hton->rollback = pbms_rollback; /* rollback */
527
pbms_hton->savepoint_offset = 4;
528
pbms_hton->savepoint_set = pbms_savepoint_set;
529
pbms_hton->savepoint_rollback = pbms_savepoint_rollback;
530
pbms_hton->savepoint_release = pbms_savepoint_release;
533
/* Startup the Media Stream network: */
536
if (!(thread = CSThread::newCSThread())) {
537
CSException::logOSError(CS_CONTEXT, ENOMEM);
538
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
541
if (!CSThread::attach(thread)) {
542
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
543
thread->myException.log(NULL);
544
CSThread::shutDown();
546
MSEngine::shutDown();
551
thread->threadName = CSString::newString("startup");
552
MSDatabase::startUp(PBMSParameters::getDefaultMetaDataHeaders());
553
MSTableList::startUp();
554
MSSystemTableShare::startUp();
555
MSNetwork::startUp(PBMSParameters::getPortNumber());
556
MSTransactionManager::startUp();
557
MSNetwork::startNetwork();
560
self->logException();
566
MSNetwork::shutDown();
567
MSTransactionManager::shutDown();
568
MSSystemTableShare::shutDown();
569
MSDatabase::stopThreads();
570
MSTableList::shutDown();
571
MSDatabase::shutDown();
572
CSThread::shutDown();
575
self->logException();
580
CSThread::detach(thread);
584
MSEngine::shutDown();
593
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonRunning);
595
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
601
static int pbms_done_func(void *)
603
int pbms_done_func(void *)
611
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonShuttingDown);
612
CSL.logLine(NULL, CSLog::Protocol, "PrimeBase Media Stream (PBMS) Daemon shutdown...");
614
/* Shutdown the Media Stream network. */
615
if (!(thread = CSThread::newCSThread()))
616
CSException::logOSError(CS_CONTEXT, ENOMEM);
617
else if (!CSThread::attach(thread))
618
thread->myException.log(NULL);
622
thread->threadName = CSString::newString("shutdown");
623
MSNetwork::shutDown();
624
MSSystemTableShare::shutDown();
625
/* Ensure that the database threads are stopped before
626
* freeing the tables.
628
MSDatabase::stopThreads();
629
MSTableList::shutDown();
630
/* Databases must be shutdown after table because tables
631
* have references to repositories.
633
MSDatabase::shutDown();
635
/* Shutdown the transaction manager after the databases
636
* incase they want to commit or rollback a transaction.
638
MSTransactionManager::shutDown();
641
self->logException();
645
CSThread::shutDown();
646
CSThread::detach(thread);
649
MSEngine::shutDown();
652
CSL.logLine(NULL, CSLog::Protocol, "PrimeBase Media Stream (PBMS) Daemon shutdown completed");
653
pbms_started = false;
658
ha_pbms::ha_pbms(handlerton *hton, Table& table_arg) : handler(*hton, table_arg),
660
ha_pbms::ha_pbms(handlerton *hton, TABLE_SHARE *table_arg) : handler(hton, table_arg),
665
memset(&ha_result, 0, sizeof(PBMSResultRec));
669
MX_TABLE_TYPES_T ha_pbms::table_flags() const
672
/* We need this flag because records are not packed
673
* into a table which means #ROWID != offset
677
#if MYSQL_VERSION_ID > 50119
678
/* We can do row logging, but not statement, because
679
* MVCC is not serializable!
681
HA_BINLOG_ROW_CAPABLE |
684
* Auto-increment is allowed on a partial key.
690
int ha_pbms::open(const char *table_path, int , uint )
694
if ((ha_error = MSEngine::enterConnection(current_thd, &self, &ha_result, true)))
699
ha_open_tab = MSSystemTableShare::openSystemTable(table_path, getTable());
701
ha_lock.init(&ha_open_tab->myShare->myThrLock);
703
thr_lock_data_init(&ha_open_tab->myShare->myThrLock, &ha_lock, NULL);
705
ref_length = ha_open_tab->getRefLen();
708
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
711
return_(ha_error != MS_OK);
714
int ha_pbms::close(void)
718
if ((ha_error = MSEngine::enterConnection(current_thd, &self, &ha_result, true)))
723
ha_open_tab->release();
727
MSEngine::exitConnection();
732
/* Index access functions: */
733
int ha_pbms::index_init(uint idx, bool sorted)
740
ha_open_tab->index_init(idx);
743
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
751
int ha_pbms::index_end()
756
ha_open_tab->index_end();
759
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
767
int ha_pbms::index_read(byte * buf, const byte * key,
768
uint key_len, enum ha_rkey_function find_flag)
773
if (!ha_open_tab->index_read(buf, key, key_len, find_flag))
774
err = HA_ERR_KEY_NOT_FOUND;
778
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
786
int ha_pbms::index_read_idx(byte * buf, uint idx, const byte * key,
787
uint key_len, enum ha_rkey_function find_flag)
792
if (!ha_open_tab->index_read_idx(buf, idx, key, key_len, find_flag))
793
err = HA_ERR_KEY_NOT_FOUND;
796
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
804
int ha_pbms::index_next(byte * buf)
809
if (!ha_open_tab->index_next(buf))
810
err = HA_ERR_END_OF_FILE;
813
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
821
int ha_pbms::index_prev(byte * buf)
826
if (!ha_open_tab->index_prev(buf))
827
err = HA_ERR_END_OF_FILE;
830
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
838
int ha_pbms::index_first(byte * buf)
843
if (!ha_open_tab->index_first(buf))
844
err = HA_ERR_END_OF_FILE;
847
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
855
int ha_pbms::index_last(byte * buf)
860
if (!ha_open_tab->index_last(buf))
861
err = HA_ERR_END_OF_FILE;
864
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
872
int ha_pbms::index_read_last(byte * buf, const byte * key, uint key_len)
877
if (!ha_open_tab->index_read_last(buf, key, key_len))
878
err = HA_ERR_KEY_NOT_FOUND;
881
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
890
#endif // PBMS_HAS_KEYS
892
/* Sequential scan functions: */
894
int ha_pbms::doStartTableScan(bool )
896
int ha_pbms::rnd_init(bool )
902
ha_open_tab->seqScanInit();
905
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
913
int ha_pbms::rnd_next(unsigned char *buf)
918
if (!ha_open_tab->seqScanNext((char *) buf))
919
err = HA_ERR_END_OF_FILE;
922
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
930
void ha_pbms::position(const unsigned char *)
932
ha_open_tab->seqScanPos((uint8_t *) ref);
936
int ha_pbms::rnd_pos(unsigned char * buf, unsigned char *pos)
941
ha_open_tab->seqScanRead((uint8_t *) pos, (char *) buf);
944
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
951
//////////////////////////////
953
int ha_pbms::doInsertRecord(byte * buf)
955
int ha_pbms::write_row(unsigned char * buf)
961
ha_open_tab->insertRow((char *) buf);
964
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
972
int ha_pbms::doDeleteRecord(const byte * buf)
974
int ha_pbms::delete_row(const unsigned char * buf)
980
ha_open_tab->deleteRow((char *) buf);
983
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
991
int ha_pbms::doUpdateRecord(const byte * old_data, byte * new_data)
993
int ha_pbms::update_row(const unsigned char * old_data, unsigned char * new_data)
999
ha_open_tab->updateRow((char *) old_data, (char *) new_data);
1002
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
1009
int ha_pbms::info(uint )
1014
int ha_pbms::external_lock(THD *thd, int lock_type)
1019
if ((ha_error = MSEngine::enterConnection(thd, &self, &ha_result, true)))
1024
if (lock_type == F_UNLCK)
1025
ha_open_tab->unuse();
1030
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
1037
THR_LOCK_DATA **ha_pbms::store_lock(THD *, THR_LOCK_DATA **to, enum thr_lock_type lock_type)
1039
if (lock_type != TL_IGNORE && ha_lock.type == TL_UNLOCK)
1040
ha_lock.type = lock_type;
1047
int PBMSStorageEngine::doCreateTable(Session&, Table&, const TableIdentifier& , drizzled::message::Table& )
1049
/* You cannot create PBMS tables. */
1050
return( HA_ERR_WRONG_COMMAND );
1053
int PBMSStorageEngine::doDropTable(Session &, const TableIdentifier& )
1055
/* You cannot delete PBMS tables. */
1059
int PBMSStorageEngine::doRenameTable(Session&, const TableIdentifier &, const TableIdentifier &)
1061
/* You cannot rename PBMS tables. */
1062
return( HA_ERR_WRONG_COMMAND );
1067
int ha_pbms::create(const char *table_name, TABLE *table, HA_CREATE_INFO *)
1069
bool isPBMS = (strcasecmp(table->s->db.str, "PBMS") == 0);
1071
if (PBMSSystemTables::isSystemTable(isPBMS, cs_last_name_of_path(table_name)))
1074
/* Create only works for system tables. */
1075
return( HA_ERR_WRONG_COMMAND );
1079
bool ha_pbms::get_error_message(int , String *buf)
1081
if (!ha_result.mr_code)
1084
buf->copy(ha_result.mr_message, strlen(ha_result.mr_message), system_charset_info);
1089
CSThread *pbms_getMySelf(THD *thd);
1090
void pbms_setMySelf(THD *thd, CSThread *self);
1092
CSThread *pbms_getMySelf(THD *thd) { return ((CSThread *) *thd->getEngineData(pbms_hton));}
1093
void pbms_setMySelf(THD *thd, CSThread *self) { *thd->getEngineData(pbms_hton) = (void *)self;}
1095
CSThread *pbms_getMySelf(THD *thd) { return ((CSThread *) *thd_ha_data(thd, pbms_hton));}
1096
void pbms_setMySelf(THD *thd, CSThread *self) { *thd_ha_data(thd, pbms_hton) = (void *)self;}