1
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
30
#ifdef USE_PRAGMA_IMPLEMENTATION
31
#pragma implementation // gcc: Class implementation
37
#include <drizzled/current_session.h>
38
#include <drizzled/common.h>
39
#include <drizzled/plugin.h>
40
#include <drizzled/field.h>
41
#include <drizzled/session.h>
42
#include <drizzled/data_home.h>
43
#include <drizzled/error.h>
44
#include <drizzled/table.h>
45
#include <drizzled/plugin/transactional_storage_engine.h>
46
#include <drizzled/named_savepoint.h>
48
#define my_strdup(a,b) strdup(a)
49
using namespace drizzled;
50
using namespace drizzled::plugin;
54
#include "cslib/CSConfig.h"
56
#include "cslib/CSConfig.h"
57
#include "mysql_priv.h"
58
#include <mysql/plugin.h>
69
#include "cslib/CSDefs.h"
70
#include "cslib/CSObject.h"
71
#include "cslib/CSGlobal.h"
72
#include "cslib/CSThread.h"
73
#include "cslib/CSStrUtil.h"
74
#include "cslib/CSLog.h"
76
#include "engine_ms.h"
78
#include "network_ms.h"
79
#include "connection_handler_ms.h"
80
#include "open_table_ms.h"
81
#include "database_ms.h"
82
#include "temp_log_ms.h"
83
#include "system_table_ms.h"
85
#include "discover_ms.h"
86
#include "metadata_ms.h"
87
#include "transaction_ms.h"
88
#include "systab_httpheader_ms.h"
89
#include "system_table_ms.h"
90
#include "parameters_ms.h"
91
#include "pbmsdaemon_ms.h"
92
#include "version_ms.h"
94
/* Note: 'new' used here is NOT CSObject::new which is a DEBUG define*/
102
static int pbms_done_func(void *);
104
class PBMSStorageEngine : public drizzled::plugin::TransactionalStorageEngine {
107
: TransactionalStorageEngine(std::string("PBMS"), HTON_NO_FLAGS | HTON_HIDDEN) {}
111
pbms_done_func(NULL);
114
int close_connection(Session *);
116
int doStartTransaction(Session *session, start_transaction_option_t options);
117
int doCommit(Session *, bool);
118
int doRollback(Session *, bool);
119
Cursor *create(Table& table);
120
bool doDropSchema(const drizzled::identifier::Schema&);
123
* Indicates to a storage engine the start of a
126
void doStartStatement(Session *session)
132
* Indicates to a storage engine the end of
133
* the current SQL statement in the supplied
136
void doEndStatement(Session *session)
141
int doCreateTable(Session&, Table&, const identifier::Table& ident, const drizzled::message::Table& );
142
int doDropTable(Session &, const identifier::Table& );
144
int doRenameTable(Session&, const identifier::Table &from, const identifier::Table &to);
146
void doGetTableIdentifiers(drizzled::CachedDirectory &dir,
147
const drizzled::identifier::Schema &schema,
148
drizzled::identifier::table::vector &set_of_identifiers)
150
std::set<std::string> set_of_names;
152
doGetTableNames(dir, schema, set_of_names);
153
for (std::set<std::string>::iterator set_iter = set_of_names.begin(); set_iter != set_of_names.end(); ++set_iter)
155
set_of_identifiers.push_back(identifier::Table(schema, *set_iter));
159
void doGetTableNames(CachedDirectory&,
160
const identifier::Schema &schema,
161
std::set<std::string> &set_of_names)
163
bool isPBMS = schema.compare("PBMS");
165
if (isPBMS || PBMSParameters::isBLOBDatabase(schema.getSchemaName().c_str()))
166
PBMSSystemTables::getSystemTableNames(isPBMS, set_of_names);
169
int doSetSavepoint(Session *thd, NamedSavepoint &savepoint);
170
int doRollbackToSavepoint(Session *session, NamedSavepoint &savepoint);
171
int doReleaseSavepoint(Session *session, NamedSavepoint &savepoint);
172
const char **bas_ext() const;
174
int doGetTableDefinition(Session&, const identifier::Table &identifier,
175
drizzled::message::Table &table_proto)
178
const char *tab_name = identifier.getTableName().c_str();
180
// Set some required table proto info:
181
table_proto.set_schema(identifier.getSchemaName().c_str());
182
table_proto.set_creation_timestamp(0);
183
table_proto.set_update_timestamp(0);
185
err = PBMSSystemTables::getSystemTableInfo(tab_name, table_proto);
192
bool doDoesTableExist(Session&, const identifier::Table &identifier)
194
const char *tab_name = identifier.getTableName().c_str();
195
const char *db_name = identifier.getSchemaName().c_str();
196
bool isPBMS = identifier.getSchemaName().compare("PBMS");
198
if (isPBMS || PBMSParameters::isBLOBDatabase(db_name)) {
199
return PBMSSystemTables::isSystemTable(isPBMS, tab_name);
208
PBMSStorageEngine *pbms_hton;
210
handlerton *pbms_hton;
213
static const char *ha_pbms_exts[] = {
218
* ---------------------------------------------------------------
223
void pbms_take_part_in_transaction(void *thread)
226
if ((thd = (THD *) thread)) {
227
trans_register_ha(thd, true, pbms_hton);
233
const char **PBMSStorageEngine::bas_ext() const
235
const char **ha_pbms::bas_ext() const
242
int PBMSStorageEngine::close_connection(Session *thd)
245
static int pbms_close_connection(handlerton *hton, THD* thd)
249
MSEngine::closeConnection(thd);
256
* ---------------------------------------------------------------
262
Cursor *PBMSStorageEngine::create(Table& table)
264
PBMSStorageEngine * const hton = this;
265
return new ha_pbms(hton, table);
268
static handler *pbms_create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root)
270
return new (mem_root) ha_pbms(hton, table);
275
int PBMSStorageEngine::doStartTransaction(Session *thd, start_transaction_option_t options)
282
int PBMSStorageEngine::doCommit(Session *thd, bool all)
285
static int pbms_commit(handlerton *, THD *thd, bool all)
290
PBMSResultRec result;
292
// I am not interesed in single statement transactions.
296
if (MSEngine::enterConnection(thd, &self, &result, false))
300
MSTransactionManager::commit();
303
err = MSEngine::exceptionToResult(&self->myException, &result);
306
self->myIsAutoCommit = true;
311
int PBMSStorageEngine::doRollback(THD *thd, bool all)
314
static int pbms_rollback(handlerton *, THD *thd, bool all)
319
PBMSResultRec result;
323
if (MSEngine::enterConnection(thd, &self, &result, false))
327
MSTransactionManager::rollback();
330
err = MSEngine::exceptionToResult(&self->myException, &result);
333
self->myIsAutoCommit = true;
338
int PBMSStorageEngine::doSetSavepoint(Session *thd, NamedSavepoint &savepoint)
342
PBMSResultRec result;
344
if (MSEngine::enterConnection(thd, &self, &result, false))
349
MSTransactionManager::setSavepoint(savepoint.getName().c_str());
352
err = MSEngine::exceptionToResult(&self->myException, &result);
359
int PBMSStorageEngine::doRollbackToSavepoint(Session *session, NamedSavepoint &savepoint)
363
PBMSResultRec result;
365
if (MSEngine::enterConnection(session, &self, &result, false))
369
MSTransactionManager::rollbackTo(savepoint.getName().c_str());
372
err = MSEngine::exceptionToResult(&self->myException, &result);
379
int PBMSStorageEngine::doReleaseSavepoint(Session *session, NamedSavepoint &savepoint)
383
PBMSResultRec result;
385
if (MSEngine::enterConnection(session, &self, &result, false))
390
MSTransactionManager::releaseSavepoint(savepoint.getName().c_str());
393
err = MSEngine::exceptionToResult(&self->myException, &result);
400
static int pbms_savepoint_set(handlerton *hton, THD *thd, void *sv)
404
PBMSResultRec result;
406
if (MSEngine::enterConnection(thd, &self, &result, false))
409
*((uint32_t*)sv) = self->myStmtCount;
413
static int pbms_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
417
PBMSResultRec result;
419
if (MSEngine::enterConnection(thd, &self, &result, false))
423
MSTransactionManager::rollbackToPosition(*((uint32_t*)sv));
426
err = MSEngine::exceptionToResult(&self->myException, &result);
432
static int pbms_savepoint_release(handlerton *hton, THD *thd, void *sv)
440
bool PBMSStorageEngine::doDropSchema(const drizzled::identifier::Schema &schema)
443
PBMSResultRec result;
445
if (MSEngine::enterConnectionNoThd(&self, &result))
450
MSDatabase::dropDatabase(schema.getSchemaName().c_str());
453
self->logException();
458
static void pbms_drop_database(handlerton *, char *path)
461
char db_name[PATH_MAX];
462
PBMSResultRec result;
464
if (MSEngine::enterConnectionNoThd(&self, &result))
468
cs_strcpy(PATH_MAX, db_name, cs_last_directory_of_path(path));
469
cs_remove_dir_char(db_name);
471
MSDatabase::dropDatabase(db_name);
474
self->logException();
480
static bool pbms_started = false;
484
int pbms_init_func(module::Context ®istry);
485
int pbms_init_func(module::Context ®istry)
487
int pbms_init_func(void *p);
488
int pbms_discover_system_tables(handlerton *hton, THD* thd, const char *db, const char *name, uchar **frmblob, size_t *frmlen);
489
int pbms_init_func(void *p)
492
PBMSResultRec result;
497
ASSERT(!pbms_started);
498
pbms_started = false;
499
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonStartUp);
503
snprintf(info, 120, "PrimeBase Media Stream (PBMS) Daemon %s loaded...", PBMSVersion::getCString());
504
CSL.logLine(NULL, CSLog::Protocol, info);
506
CSL.logLine(NULL, CSLog::Protocol, "Barry Leslie, PrimeBase Technologies GmbH, http://www.primebase.org");
508
if ((err = MSEngine::startUp(&result))) {
509
CSL.logLine(NULL, CSLog::Error, result.mr_message);
510
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
515
pbms_hton= new PBMSStorageEngine();
516
registry.add(pbms_hton);
518
pbms_hton = (handlerton *) p;
519
pbms_hton->state = SHOW_OPTION_YES;
520
pbms_hton->close_connection = pbms_close_connection; /* close_connection, cleanup thread related data. */
521
pbms_hton->create = pbms_create_handler;
522
pbms_hton->flags = HTON_CAN_RECREATE | HTON_HIDDEN;
523
pbms_hton->drop_database = pbms_drop_database; /* Drop a database */
524
pbms_hton->discover = pbms_discover_system_tables;
526
pbms_hton->commit = pbms_commit; /* commit */
527
pbms_hton->rollback = pbms_rollback; /* rollback */
529
pbms_hton->savepoint_offset = 4;
530
pbms_hton->savepoint_set = pbms_savepoint_set;
531
pbms_hton->savepoint_rollback = pbms_savepoint_rollback;
532
pbms_hton->savepoint_release = pbms_savepoint_release;
535
/* Startup the Media Stream network: */
538
if (!(thread = CSThread::newCSThread())) {
539
CSException::logOSError(CS_CONTEXT, ENOMEM);
540
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
543
if (!CSThread::attach(thread)) {
544
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
545
thread->myException.log(NULL);
546
CSThread::shutDown();
548
MSEngine::shutDown();
553
thread->threadName = CSString::newString("startup");
554
MSDatabase::startUp(PBMSParameters::getDefaultMetaDataHeaders());
555
MSTableList::startUp();
556
MSSystemTableShare::startUp();
557
MSNetwork::startUp(PBMSParameters::getPortNumber());
558
MSTransactionManager::startUp();
559
MSNetwork::startNetwork();
562
self->logException();
568
MSNetwork::shutDown();
569
MSTransactionManager::shutDown();
570
MSSystemTableShare::shutDown();
571
MSDatabase::stopThreads();
572
MSTableList::shutDown();
573
MSDatabase::shutDown();
574
CSThread::shutDown();
577
self->logException();
582
CSThread::detach(thread);
586
MSEngine::shutDown();
595
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonRunning);
597
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonError);
603
static int pbms_done_func(void *)
605
int pbms_done_func(void *)
613
PBMSDaemon::setDaemonState(PBMSDaemon::DaemonShuttingDown);
614
CSL.logLine(NULL, CSLog::Protocol, "PrimeBase Media Stream (PBMS) Daemon shutdown...");
616
/* Shutdown the Media Stream network. */
617
if (!(thread = CSThread::newCSThread()))
618
CSException::logOSError(CS_CONTEXT, ENOMEM);
619
else if (!CSThread::attach(thread))
620
thread->myException.log(NULL);
624
thread->threadName = CSString::newString("shutdown");
625
MSNetwork::shutDown();
626
MSSystemTableShare::shutDown();
627
/* Ensure that the database threads are stopped before
628
* freeing the tables.
630
MSDatabase::stopThreads();
631
MSTableList::shutDown();
632
/* Databases must be shutdown after table because tables
633
* have references to repositories.
635
MSDatabase::shutDown();
637
/* Shutdown the transaction manager after the databases
638
* incase they want to commit or rollback a transaction.
640
MSTransactionManager::shutDown();
643
self->logException();
647
CSThread::shutDown();
648
CSThread::detach(thread);
651
MSEngine::shutDown();
654
CSL.logLine(NULL, CSLog::Protocol, "PrimeBase Media Stream (PBMS) Daemon shutdown completed");
655
pbms_started = false;
660
ha_pbms::ha_pbms(handlerton *hton, Table& table_arg) : handler(*hton, table_arg),
662
ha_pbms::ha_pbms(handlerton *hton, TABLE_SHARE *table_arg) : handler(hton, table_arg),
667
memset(&ha_result, 0, sizeof(PBMSResultRec));
671
MX_TABLE_TYPES_T ha_pbms::table_flags() const
674
/* We need this flag because records are not packed
675
* into a table which means #ROWID != offset
679
#if MYSQL_VERSION_ID > 50119
680
/* We can do row logging, but not statement, because
681
* MVCC is not serializable!
683
HA_BINLOG_ROW_CAPABLE |
686
* Auto-increment is allowed on a partial key.
692
int ha_pbms::open(const char *table_path, int , uint )
696
if ((ha_error = MSEngine::enterConnection(current_thd, &self, &ha_result, true)))
702
ha_open_tab = MSSystemTableShare::openSystemTable(table_path, getTable());
703
ha_lock.init(&ha_open_tab->myShare->myThrLock);
705
ha_open_tab = MSSystemTableShare::openSystemTable(table_path, table);
706
thr_lock_data_init(&ha_open_tab->myShare->myThrLock, &ha_lock, NULL);
708
ref_length = ha_open_tab->getRefLen();
711
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
714
return_(ha_error != MS_OK);
717
int ha_pbms::close(void)
721
if ((ha_error = MSEngine::enterConnection(current_thd, &self, &ha_result, true)))
726
ha_open_tab->release();
730
MSEngine::exitConnection();
735
/* Index access functions: */
736
int ha_pbms::index_init(uint idx, bool sorted)
743
ha_open_tab->index_init(idx);
746
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
754
int ha_pbms::index_end()
759
ha_open_tab->index_end();
762
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
770
int ha_pbms::index_read(byte * buf, const byte * key,
771
uint key_len, enum ha_rkey_function find_flag)
776
if (!ha_open_tab->index_read(buf, key, key_len, find_flag))
777
err = HA_ERR_KEY_NOT_FOUND;
781
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
789
int ha_pbms::index_read_idx(byte * buf, uint idx, const byte * key,
790
uint key_len, enum ha_rkey_function find_flag)
795
if (!ha_open_tab->index_read_idx(buf, idx, key, key_len, find_flag))
796
err = HA_ERR_KEY_NOT_FOUND;
799
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
807
int ha_pbms::index_next(byte * buf)
812
if (!ha_open_tab->index_next(buf))
813
err = HA_ERR_END_OF_FILE;
816
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
824
int ha_pbms::index_prev(byte * buf)
829
if (!ha_open_tab->index_prev(buf))
830
err = HA_ERR_END_OF_FILE;
833
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
841
int ha_pbms::index_first(byte * buf)
846
if (!ha_open_tab->index_first(buf))
847
err = HA_ERR_END_OF_FILE;
850
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
858
int ha_pbms::index_last(byte * buf)
863
if (!ha_open_tab->index_last(buf))
864
err = HA_ERR_END_OF_FILE;
867
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
875
int ha_pbms::index_read_last(byte * buf, const byte * key, uint key_len)
880
if (!ha_open_tab->index_read_last(buf, key, key_len))
881
err = HA_ERR_KEY_NOT_FOUND;
884
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
893
#endif // PBMS_HAS_KEYS
895
/* Sequential scan functions: */
897
int ha_pbms::doStartTableScan(bool )
899
int ha_pbms::rnd_init(bool )
905
ha_open_tab->seqScanInit();
908
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
916
int ha_pbms::rnd_next(unsigned char *buf)
921
if (!ha_open_tab->seqScanNext((char *) buf))
922
err = HA_ERR_END_OF_FILE;
925
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
933
void ha_pbms::position(const unsigned char *)
935
ha_open_tab->seqScanPos((unsigned char *) ref);
939
int ha_pbms::rnd_pos(unsigned char * buf, unsigned char *pos)
944
ha_open_tab->seqScanRead(pos, (char *) buf);
947
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
954
//////////////////////////////
956
int ha_pbms::doInsertRecord(byte * buf)
958
int ha_pbms::write_row(unsigned char * buf)
964
ha_open_tab->insertRow((char *) buf);
967
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
975
int ha_pbms::doDeleteRecord(const byte * buf)
977
int ha_pbms::delete_row(const unsigned char * buf)
983
ha_open_tab->deleteRow((char *) buf);
986
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
994
int ha_pbms::doUpdateRecord(const byte * old_data, byte * new_data)
996
int ha_pbms::update_row(const unsigned char * old_data, unsigned char * new_data)
1002
ha_open_tab->updateRow((char *) old_data, (char *) new_data);
1005
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
1012
int ha_pbms::info(uint )
1017
int ha_pbms::external_lock(THD *thd, int lock_type)
1022
if ((ha_error = MSEngine::enterConnection(thd, &self, &ha_result, true)))
1027
if (lock_type == F_UNLCK)
1028
ha_open_tab->unuse();
1033
ha_error = MSEngine::exceptionToResult(&self->myException, &ha_result);
1040
THR_LOCK_DATA **ha_pbms::store_lock(THD *, THR_LOCK_DATA **to, enum thr_lock_type lock_type)
1042
if (lock_type != TL_IGNORE && ha_lock.type == TL_UNLOCK)
1043
ha_lock.type = lock_type;
1050
int PBMSStorageEngine::doCreateTable(Session&, Table&, const identifier::Table& , const drizzled::message::Table& )
1052
/* You cannot create PBMS tables. */
1053
return( HA_ERR_WRONG_COMMAND );
1056
int PBMSStorageEngine::doDropTable(Session &, const identifier::Table& )
1058
/* You cannot delete PBMS tables. */
1062
int PBMSStorageEngine::doRenameTable(Session&, const identifier::Table &, const identifier::Table &)
1064
/* You cannot rename PBMS tables. */
1065
return( HA_ERR_WRONG_COMMAND );
1070
int ha_pbms::create(const char *table_name, TABLE *table, HA_CREATE_INFO *)
1072
bool isPBMS = (strcasecmp(table->s->db.str, "PBMS") == 0);
1074
if (PBMSSystemTables::isSystemTable(isPBMS, cs_last_name_of_path(table_name)))
1077
/* Create only works for system tables. */
1078
return( HA_ERR_WRONG_COMMAND );
1082
bool ha_pbms::get_error_message(int , String *buf)
1084
if (!ha_result.mr_code)
1087
buf->copy(ha_result.mr_message, strlen(ha_result.mr_message), system_charset_info);
1092
CSThread *pbms_getMySelf(THD *thd);
1093
void pbms_setMySelf(THD *thd, CSThread *self);
1095
CSThread *pbms_getMySelf(THD *thd) { return ((CSThread *) *thd->getEngineData(pbms_hton));}
1096
void pbms_setMySelf(THD *thd, CSThread *self) { *thd->getEngineData(pbms_hton) = (void *)self;}
1098
CSThread *pbms_getMySelf(THD *thd) { return ((CSThread *) *thd_ha_data(thd, pbms_hton));}
1099
void pbms_setMySelf(THD *thd, CSThread *self) { *thd_ha_data(thd, pbms_hton) = (void *)self;}