1
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
32
#include <drizzled/common.h>
33
#include <drizzled/current_session.h>
34
#include <drizzled/session.h>
38
#include "cslib/CSConfig.h"
39
#include "cslib/CSGlobal.h"
40
#include "cslib/CSStrUtil.h"
41
#include "cslib/CSThread.h"
44
#define PBMS_API pbms_internal
48
#include "engine_ms.h"
49
#include "connection_handler_ms.h"
50
#include "open_table_ms.h"
51
#include "network_ms.h"
52
#include "transaction_ms.h"
61
// Declared here, defined elsewhere: fetches the CSThread stored for a THD
// (used by enterConnection() and closeConnection() below).
extern CSThread *pbms_getMySelf(THD *thd);
62
// Declared here, defined elsewhere: stores the CSThread for a THD;
// closeConnection() passes NULL to clear it.
extern void pbms_setMySelf(THD *thd, CSThread *self);
67
* ---------------------------------------------------------------
68
* ENGINE CALL-IN INTERFACE
71
// Engine API object: allocated in MSEngine::startUp() and deleted in MSEngine::shutDown().
static PBMS_API *StreamingEngines;
72
// If PBMS support is built directly into the mysql/drizzle handler code
73
// then calls from all other handlers are ignored.
74
// Set to true by ms_register_engine() when a registering engine has ms_internal set.
static bool have_handler_support = false;
77
* ---------------------------------------------------------------
78
* ENGINE CALLBACK INTERFACE
81
// Engine registration callback (also called directly from MSEngine::startUp()).
// Records that handler-level PBMS support exists when the engine has ms_internal set.
static void ms_register_engine(PBMSEnginePtr engine)
83
if (engine->ms_internal)
84
have_handler_support = true;
87
// Engine deregistration callback; it is listed in engine_callbacks.
// NOTE(review): no statement of its body is visible here - confirm it is an intentional no-op.
static void ms_deregister_engine(PBMSEnginePtr engine)
92
// Callback: creates a BLOB from the in-memory buffer blob/blob_len by forwarding to
// MSEngine::createBlob(), which fills in blob_url and result.
// A non-internal call made while have_handler_support is set is reported through
// MSEngine::errorResult() as MS_ERR_INVALID_OPERATION.
static int ms_create_blob(bool internal, const char *db_name, const char *tab_name, char *blob, size_t blob_len, PBMSBlobURLPtr blob_url, PBMSResultPtr result)
94
if (have_handler_support && !internal) {
95
MSEngine::errorResult(CS_CONTEXT, MS_ERR_INVALID_OPERATION, "Invalid ms_create_blob() call", result);
99
return MSEngine::createBlob(db_name, tab_name, blob, blob_len, blob_url, result);
103
* ms_use_blob() may or may not alter the blob url depending on the type of URL and if the BLOB is in a
104
* different database or not. It may also add a BLOB reference to the BLOB table log if the BLOB was from
105
* a different table or no table was specified when the BLOB was uploaded.
107
* There is no need to undo this function because it will be undone automatically if the BLOB is not retained.
109
// Callback: retains (references) the BLOB named by blob_url for column col_index of
// db_name.tab_name by forwarding to MSEngine::referenceBlob().
// For a non-internal call made while have_handler_support is set, the incoming URL is
// copied into ret_blob_url->bu_data, bounded by PBMS_BLOB_URL_SIZE.
static int ms_retain_blob(bool internal, const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result)
111
if (have_handler_support && !internal) {
112
cs_strcpy(PBMS_BLOB_URL_SIZE, ret_blob_url->bu_data, blob_url); // This should have already been converted.
116
return MSEngine::referenceBlob(db_name, tab_name, ret_blob_url, blob_url, col_index, result);
119
// Callback: releases a BLOB reference by forwarding to MSEngine::dereferenceBlob().
// The have_handler_support && !internal test singles out calls from other handlers
// when PBMS support is built into the handler code (see have_handler_support).
static int ms_release_blob(bool internal, const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result)
122
if (have_handler_support && !internal)
125
return MSEngine::dereferenceBlob(db_name, tab_name, blob_url, result);
128
// Callback: drops the BLOB data of db_name.tab_name by forwarding to MSEngine::dropTable().
// The have_handler_support && !internal test singles out calls from other handlers
// when PBMS support is built into the handler code (see have_handler_support).
static int ms_drop_table(bool internal, const char *db_name, const char *tab_name, PBMSResultPtr result)
130
if (have_handler_support && !internal)
133
return MSEngine::dropTable(db_name, tab_name, result);
136
// Callback: renames db_name.from_table to to_db.to_table by forwarding to the
// five-argument MSEngine::renameTable().
// The have_handler_support && !internal test singles out calls from other handlers
// when PBMS support is built into the handler code (see have_handler_support).
static int ms_rename_table(bool internal, const char * db_name, const char *from_table, const char *to_db, const char *to_table, PBMSResultPtr result)
138
if (have_handler_support && !internal)
141
return MSEngine::renameTable(db_name, from_table, to_db, to_table, result);
144
// Callback: reports the outcome (ok) of the preceding operation by forwarding to
// MSEngine::callCompleted().
// The have_handler_support && !internal test singles out calls from other handlers
// when PBMS support is built into the handler code (see have_handler_support).
static void ms_completed(bool internal, bool ok)
146
if (have_handler_support && !internal)
149
MSEngine::callCompleted(ok);
152
// Callback table whose address is passed to PBMSStartup() in MSEngine::startUp().
// NOTE(review): only the ms_deregister_engine entry is visible in this excerpt.
PBMSCallbacksRec engine_callbacks = {
155
ms_deregister_engine,
164
// =============================
165
// Brings up the streaming-engine API: allocates StreamingEngines, then calls
// PBMSStartup() with the engine_callbacks table and the caller's result record.
// In the else branch every non-null entry of the shared-memory engine list is
// passed to ms_register_engine().
int MSEngine::startUp(PBMSResultPtr result)
169
StreamingEngines = new PBMS_API();
170
err = StreamingEngines->PBMSStartup(&engine_callbacks, result);
172
delete StreamingEngines;
173
else { // Register the PBMS-enabled engines that started up before PBMS
174
PBMSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
175
PBMSEnginePtr engine;
177
// Empty (null) slots in sm_engine_list are skipped.
for (int i=0; i<sh_mem->sm_list_len; i++) {
178
if ((engine = sh_mem->sm_engine_list[i]))
179
ms_register_engine(engine);
185
// Shuts the streaming-engine API down via PBMSShutdown() and deletes the
// StreamingEngines object allocated in startUp().
void MSEngine::shutDown()
187
StreamingEngines->PBMSShutdown();
189
delete StreamingEngines;
192
// Looks up an engine in the shared-memory engine list by scanning its non-null entries.
// The final statement returns NULL.
// NOTE(review): how indx selects an entry inside the loop is not visible here.
const PBMSEnginePtr MSEngine::getEngineInfoAt(int indx)
194
PBMSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
195
PBMSEnginePtr engine = NULL;
198
for (int i=0; i<sh_mem->sm_list_len; i++) {
199
if ((engine = sh_mem->sm_engine_list[i])) {
207
return (const PBMSEnginePtr)NULL;
212
// Worker for createBlob(): opens db_name.tab_name (openTable with create = true) and,
// when the database is not recovering, wraps the caller's buffer in a memory input
// stream and hands it to otab->createBlob() together with blob_url and blob_len.
// A MS_ERR_RECOVERY_IN_PROGRESS exception is thrown on the other path.
// The caller converts self->myException into a result when this returns true.
bool MSEngine::try_createBlob(CSThread *self, const char *db_name, const char *tab_name, char *blob, size_t blob_len, PBMSBlobURLPtr blob_url)
214
volatile bool rtc = true;
218
CSInputStream *i_stream = NULL;
220
otab = openTable(db_name, tab_name, true);
223
if (!otab->getDB()->isRecovering()) {
224
i_stream = CSMemoryInputStream::newStream((unsigned char *)blob, blob_len);
225
otab->createBlob(blob_url, blob_len, NULL, 0, i_stream);
227
CSException::throwException(CS_CONTEXT, MS_ERR_RECOVERY_IN_PROGRESS, "Cannot create BLOBs during repository recovery.");
238
// Entry point for BLOB creation: enters the connection with enterConnectionNoThd(),
// runs try_createBlob(), and when that returns true converts the thread's recorded
// exception into result via exceptionToResult().
int32_t MSEngine::createBlob(const char *db_name, const char *tab_name, char *blob, size_t blob_len, PBMSBlobURLPtr blob_url, PBMSResultPtr result)
244
if ((err = enterConnectionNoThd(&self, result)))
248
if (try_createBlob(self, db_name, tab_name, blob, blob_len, blob_url))
249
err = exceptionToResult(&self->myException, result);
255
// Worker for referenceBlob(): parses blob_url with PBMSBlobURLTools::couldBeURL(),
// opens db_name.tab_name (create = true) and calls otab->useBlob() with the parsed
// URL fields, col_index and ret_blob_url.
// The caller converts self->myException into a result when this returns true.
bool MSEngine::try_referenceBlob(CSThread *self, const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, uint16_t col_index)
257
volatile bool rtc = true;
262
// Unparsable URL: throw MS_ERR_INCORRECT_URL with the offending text appended.
if (! PBMSBlobURLTools::couldBeURL(blob_url, &blob)){
263
char buffer[CS_EXC_MESSAGE_SIZE];
265
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
266
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
267
CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
270
otab = openTable(db_name, tab_name, true);
273
otab->useBlob(blob.bu_type, blob.bu_db_id, blob.bu_tab_id, blob.bu_blob_id, blob.bu_auth_code, col_index, blob.bu_blob_size, blob.bu_blob_ref_id, ret_blob_url);
284
// Entry point for retaining a BLOB: enters the connection with enterConnectionNoThd(),
// runs try_referenceBlob(), and when that returns true converts the thread's recorded
// exception into result via exceptionToResult().
int32_t MSEngine::referenceBlob(const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, uint16_t col_index, PBMSResultPtr result)
290
if ((err = enterConnectionNoThd(&self, result)))
294
if (try_referenceBlob(self, db_name, tab_name, ret_blob_url, blob_url, col_index))
295
err = exceptionToResult(&self->myException, result);
302
// Worker for dereferenceBlob(): parses blob_url, opens db_name.tab_name (create = true)
// and, when the database is not recovering and the URL's table ID matches the opened
// table, releases the reference identified by the URL's blob ID and reference ID.
// Three MS_ERR_INCORRECT_URL throw sites are visible, each appending blob_url to a prefix.
// The caller converts self->myException into a result when this returns true.
bool MSEngine::try_dereferenceBlob(CSThread *self, const char *db_name, const char *tab_name, char *blob_url)
304
volatile bool rtc = true;
309
// Unparsable URL: throw MS_ERR_INCORRECT_URL with the offending text appended.
if (! PBMSBlobURLTools::couldBeURL(blob_url, &blob)){
310
char buffer[CS_EXC_MESSAGE_SIZE];
312
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
313
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
314
CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
317
otab = openTable(db_name, tab_name, true);
319
if (!otab->getDB()->isRecovering()) {
320
// Only release when the URL refers to the table that was opened.
if (otab->getTableID() == blob.bu_tab_id)
321
otab->releaseReference(blob.bu_blob_id, blob.bu_blob_ref_id);
323
char buffer[CS_EXC_MESSAGE_SIZE];
325
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect table ID: ");
326
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
327
CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
331
// NOTE(review): this last throw reuses the "Incorrect URL: " prefix; confirm which
// condition it reports and whether the message matches it.
char buffer[CS_EXC_MESSAGE_SIZE];
333
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
334
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
335
CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
346
// Entry point for releasing a BLOB reference: enters the connection with
// enterConnectionNoThd(), runs try_dereferenceBlob(), and when that returns true
// converts the thread's recorded exception into result via exceptionToResult().
int32_t MSEngine::dereferenceBlob(const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result)
351
if ((err = enterConnectionNoThd(&self, result)))
355
if (try_dereferenceBlob(self, db_name, tab_name, blob_url))
356
err = exceptionToResult(&self->myException, result);
361
// Worker for dropDatabase(): delegates to MSDatabase::dropDatabase(db_name).
// The caller converts self->myException into a result when this returns true.
bool MSEngine::try_dropDatabase(CSThread *self, const char *db_name)
363
volatile bool rtc = true;
365
MSDatabase::dropDatabase(db_name);
374
// Entry point for dropping a database: enters the connection with
// enterConnectionNoThd(), runs try_dropDatabase(), and when that returns true
// converts the thread's recorded exception into result via exceptionToResult().
int32_t MSEngine::dropDatabase(const char *db_name, PBMSResultPtr result)
379
if ((err = enterConnectionNoThd(&self, result)))
384
if (try_dropDatabase(self, db_name))
385
err = exceptionToResult(&self->myException, result);
391
// Undo record stored in CSThread::myInfo by try_dropTable() and try_renameTable()
// and read back in MSEngine::callCompleted().
// NOTE(review): code in this file also assigns and reads a udo_WasRename member whose
// declaration is not visible in this excerpt.
typedef struct UnDoInfo {
393
// Destination database name of a rename (set from to_db_name in try_renameTable()).
CSString *udo_toDatabaseName;
394
// Source database name of a rename (set from from_db_name in try_renameTable()).
CSString *udo_fromDatabaseName;
395
// Table name before the rename (set from from_table).
CSString *udo_OldName;
396
// Table name after the rename (set from to_table).
CSString *udo_NewName;
397
} UnDoInfoRec, *UnDoInfoPtr;
400
// Worker for dropTable(): opens db_name.tab_name without creating it, moves the table
// ref file to a new path, drops the table from its database and prepares it for
// deletion, then stores an UnDoInfo record (udo_WasRename = false) in self->myInfo.
// The caller converts self->myException into a result when this returns true.
bool MSEngine::try_dropTable(CSThread *self, const char *db_name, const char *tab_name)
402
volatile bool rtc = true;
408
MSOpenTablePool *tab_pool;
410
UnDoInfoPtr undo_info = NULL;
412
otab = openTable(db_name, tab_name, false);
417
// If we are recovering do not delete the table.
418
// It is normal for MySQL recovery scripts to delete any table they are about to
419
// recover and then recreate it. If this is done after the repository has been recovered
420
// then this would delete all the recovered BLOBs in the table.
421
if (otab->getDB()->isRecovering()) {
422
otab->returnToPool();
428
// Before dropping the table the table ref file is renamed so that
429
// it is out of the way in case a new table is created before the
430
// old one is cleaned up.
432
old_path = otab->getDBTable()->getTableFile();
435
new_path = otab->getDBTable()->getTableFile(tab_name, true);
437
// Rearrange the object stack to pop the otab object
445
tab = otab->getDBTable();
449
tab_pool = MSTableList::lockTablePoolForDeletion(otab);
452
// The ref file is only moved if it exists.
if (old_path->exists())
453
old_path->move(RETAIN(new_path));
454
tab->myDatabase->dropTable(RETAIN(tab));
456
/* Add the table to the temp delete list if we are not recovering... */
457
tab->prepareToDelete();
459
backtopool_(tab_pool); // This will unlock and close the table pool, freeing all tables in it.
460
pop_(tab); // Returning the pool will have released this. (YUK!)
465
// Record that the pending operation was a drop, for MSEngine::callCompleted().
undo_info = (UnDoInfoPtr) cs_malloc(sizeof(UnDoInfoRec));
467
undo_info->udo_WasRename = false;
468
self->myInfo = undo_info;
479
// Entry point for dropping a table: enters the connection with enterConnectionNoThd(),
// runs try_dropTable(), and when that returns true converts the thread's recorded
// exception into result via exceptionToResult().
int32_t MSEngine::dropTable(const char *db_name, const char *tab_name, PBMSResultPtr result)
484
if ((err = enterConnectionNoThd(&self, result)))
488
if (try_dropTable(self, db_name, tab_name))
489
err = exceptionToResult(&self->myException, result);
497
// Completion step for a table drop; called from MSEngine::callCompleted() when the
// undo record is not a rename. Undoing a drop is not implemented: the visible body
// throws MS_ERR_NOT_IMPLEMENTED.
// NOTE(review): the condition guarding the throw (presumably !ok) is not visible here.
static void completeDeleteTable(UnDoInfoPtr info, bool ok)
499
// TO DO: figure out a way to undo the delete.
502
CSException::throwException(CS_CONTEXT, MS_ERR_NOT_IMPLEMENTED, "Cannot undo delete table.");
506
// Performs the rename of from_db_name.from_table to to_table within one database.
// Throws MS_ERR_NOT_IMPLEMENTED when the two database names differ and
// MS_ERR_RECOVERY_IN_PROGRESS when the database is recovering. Otherwise it moves the
// table file to its new path and renames the table in its database.
// Called by try_renameTable() and, with swapped arguments, by completeRenameTable().
bool MSEngine::renameTable(const char *from_db_name, const char *from_table, const char *to_db_name, const char *to_table)
511
MSOpenTablePool *tab_pool;
516
if (strcmp(to_db_name, from_db_name) != 0) {
517
CSException::throwException(CS_CONTEXT, MS_ERR_NOT_IMPLEMENTED, "Cannot rename tables containing BLOBs across databases (yet). Sorry!");
520
// The source table is opened without being created if it does not exist.
otab = openTable(from_db_name, from_table, false);
526
if (otab->getDB()->isRecovering())
527
CSException::throwException(CS_CONTEXT, MS_ERR_RECOVERY_IN_PROGRESS, "Cannot rename tables during repository recovery.");
529
from_path = otab->getDBTable()->getTableFile();
532
to_path = otab->getDBTable()->getTableFile(to_table, false);
534
// Rearrange the object stack to pop the otab object
542
otab->openForReading();
543
tab = otab->getDBTable();
548
tab_pool = MSTableList::lockTablePoolForDeletion(otab);
551
from_path->move(RETAIN(to_path));
552
tab->myDatabase->renameTable(tab, to_table);
554
backtopool_(tab_pool); // This will unlock and close the table pool, freeing all tables in it.
555
pop_(tab); // Returning the pool will have released this. (YUK!)
563
// Worker for the five-argument renameTable(): allocates an UnDoInfo record marked as a
// rename, performs the rename, and when the four-argument renameTable() returns true
// stores copies of the database and table names in the record. The record is then
// stored in self->myInfo for MSEngine::callCompleted().
// The caller converts self->myException into a result when this returns true.
bool MSEngine::try_renameTable(CSThread *self, const char *from_db_name, const char *from_table, const char *to_db_name, const char *to_table)
565
volatile bool rtc = true;
567
UnDoInfoPtr undo_info = (UnDoInfoPtr) cs_malloc(sizeof(UnDoInfoRec));
568
push_ptr_(undo_info);
570
undo_info->udo_WasRename = true;
571
if (renameTable(from_db_name, from_table, to_db_name, to_table)) {
572
// Each new string is pushed before the next allocation and popped again afterwards.
undo_info->udo_fromDatabaseName = CSString::newString(from_db_name);
573
push_(undo_info->udo_fromDatabaseName);
575
undo_info->udo_toDatabaseName = CSString::newString(to_db_name);
576
push_(undo_info->udo_toDatabaseName);
578
undo_info->udo_OldName = CSString::newString(from_table);
579
push_(undo_info->udo_OldName);
581
undo_info->udo_NewName = CSString::newString(to_table);
583
pop_(undo_info->udo_OldName);
584
pop_(undo_info->udo_toDatabaseName);
585
pop_(undo_info->udo_fromDatabaseName);
587
// This statement clears all four name pointers in the undo record.
undo_info->udo_fromDatabaseName = undo_info->udo_toDatabaseName = undo_info->udo_OldName = undo_info->udo_NewName = NULL;
589
self->myInfo = undo_info;
599
// Entry point for renaming a table: enters the connection with enterConnectionNoThd(),
// runs try_renameTable(), and when that returns true converts the thread's recorded
// exception into result via exceptionToResult().
int32_t MSEngine::renameTable(const char *from_db_name, const char *from_table, const char *to_db_name, const char *to_table, PBMSResultPtr result)
604
if ((err = enterConnectionNoThd(&self, result)))
608
if (try_renameTable(self, from_db_name, from_table, to_db_name, to_table))
609
err = exceptionToResult(&self->myException, result);
617
// Completion step for a table rename; called from MSEngine::callCompleted() when the
// undo record is a rename. The names saved in info are read in swapped order, so the
// renameTable() call below renames the table back to its old name.
// NOTE(review): the conditions around that call (presumably !ok) are not visible here.
void MSEngine::completeRenameTable(UnDoInfoPtr info, bool ok)
619
// Swap the paths around here to reverse the rename.
620
CSString *from_db_name= info->udo_toDatabaseName;
621
CSString *to_db_name= info->udo_fromDatabaseName;
622
CSString *from_table= info->udo_NewName;
623
CSString *to_table= info->udo_OldName;
634
renameTable(from_db_name->getCString(), from_table->getCString(), to_db_name->getCString(), to_table->getCString());
637
release_(to_db_name);
638
release_(from_table);
639
release_(from_db_name);
645
// Finishes the current transaction work for callCompleted(). Three outcomes are
// visible: commit, a full rollback when the thread is in auto-commit mode, and a
// rollback to self->myStartStmt.
// NOTE(review): the leading condition selecting commit (presumably ok) is not visible here.
// callCompleted() logs the thread's exception when this returns true.
static bool try_CompleteTransaction(CSThread *self, bool ok)
647
volatile bool rtc = true;
650
MSTransactionManager::commit();
651
else if (self->myIsAutoCommit)
652
MSTransactionManager::rollback();
654
MSTransactionManager::rollbackToPosition(self->myStartStmt); // Rollback the last logical statement.
664
// Called when an engine operation has finished, with ok giving its outcome.
// If an undo record is pending in self->myInfo it is dispatched to
// completeRenameTable() or completeDeleteTable(). Otherwise, when a transaction is
// active (myTID) and either auto-commit is on or the operation failed, the
// transaction is completed via try_CompleteTransaction().
// The last visible statement sets myStartStmt to the current statement count.
void MSEngine::callCompleted(bool ok)
667
PBMSResultRec result;
669
if (enterConnectionNoThd(&self, &result))
673
UnDoInfoPtr info = (UnDoInfoPtr) self->myInfo;
674
if (info->udo_WasRename)
675
completeRenameTable(info, ok);
677
completeDeleteTable(info, ok);
681
} else if (self->myTID && (self->myIsAutoCommit || !ok)) {
683
// A true return means an exception was recorded on the thread; it is logged here.
if (try_CompleteTransaction(self, ok)) {
684
self->logException();
689
self->myStartStmt = self->myStmtCount;
693
// Resolves db_name/tab_name to numeric IDs (the create flag is passed through to the
// conversion) and, when that succeeds, fetches the open table from MSTableList.
// otab is initialised to NULL and is only assigned when the conversion succeeds.
MSOpenTable *MSEngine::openTable(const char *db_name, const char *tab_name, bool create)
695
MSOpenTable *otab = NULL;
696
uint32_t db_id, tab_id;
699
if ( MSDatabase::convertTableAndDatabaseToIDs(db_name, tab_name, &db_id, &tab_id, create))
700
otab = MSTableList::getOpenTableByID(db_id, tab_id);
706
// Reports whether the first length bytes of blob_url parse as a PBMS BLOB URL, by
// delegating to PBMSBlobURLTools::couldBeURL(). The parsed fields go into a local
// blob record that is not returned to the caller.
bool MSEngine::couldBeURL(const char *blob_url, size_t length)
709
return PBMSBlobURLTools::couldBeURL(blob_url, length, &blob);
713
// Copies an exception into a PBMS result record: error code into mr_code, message into
// mr_message (bounded by MS_RESULT_MESSAGE_SIZE), and context plus stack trace into
// mr_stack (bounded by MS_RESULT_STACK_SIZE). Returns MS_ERR_ENGINE.
int MSEngine::exceptionToResult(CSException *e, PBMSResultPtr result)
715
const char *context, *trace;
717
result->mr_code = e->getErrorCode();
718
cs_strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, e->getMessage());
719
context = e->getContext();
720
trace = e->getStackTrace();
721
// With a non-empty context, mr_stack starts with the context and a newline.
if (context && *context) {
722
cs_strcpy(MS_RESULT_STACK_SIZE, result->mr_stack, context);
724
cs_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, "\n");
727
// Make mr_stack an empty string.
*result->mr_stack = 0;
729
cs_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, trace);
730
return MS_ERR_ENGINE;
734
// Builds an exception from an error code and message at the given source location
// (func/file/line, normally supplied through CS_CONTEXT) and writes it into result.
// Returns the value of exceptionToResult().
int MSEngine::errorResult(const char *func, const char *file, int line, int err, const char *message, PBMSResultPtr result)
738
e.initException(func, file, line, err, message);
739
return exceptionToResult(&e, result);
743
// Builds an exception from an OS error code (callers in this file pass ENOMEM) at the
// given source location and writes it into result.
// Returns the value of exceptionToResult().
int MSEngine::osErrorResult(const char *func, const char *file, int line, int err, PBMSResultPtr result)
747
e.initOSError(func, file, line, err);
748
return MSEngine::exceptionToResult(&e, result);
752
// Resolves the CSThread for the calling connection. Two lookup paths are visible:
// CSThread::getSelf() and pbms_getMySelf(thd); each is followed by code that can
// allocate and attach a new CSThread when none is found.
// NOTE(review): presumably preprocessor conditionals (not visible here) select one path
// and doCreate gates thread creation - confirm.
// Visible error returns: MS_ERR_NOT_FOUND, an ENOMEM result when newCSThread() fails,
// and the thread's own exception when attach() or setSelf() fails.
int MSEngine::enterConnection(THD *thd, CSThread **r_self, PBMSResultPtr result, bool doCreate)
754
CSThread *self = NULL;
757
// In drizzle there is no 1:1 relationship between pthreads and sessions
758
// so we must always get it from the session handle NOT the current pthread.
759
self = CSThread::getSelf();
763
if (!(self = pbms_getMySelf(thd))) {
765
return MS_ERR_NOT_FOUND;
767
if (!(self = CSThread::newCSThread()))
768
return osErrorResult(CS_CONTEXT, ENOMEM, result);
769
if (!CSThread::attach(self))
770
return MSEngine::exceptionToResult(&self->myException, result);
771
// Store the new thread for this THD so later calls find it via pbms_getMySelf().
pbms_setMySelf(thd, self);
773
if (!CSThread::setSelf(self))
774
return MSEngine::exceptionToResult(&self->myException, result);
778
return MS_ERR_NOT_FOUND;
780
if (!(self = CSThread::newCSThread()))
781
return osErrorResult(CS_CONTEXT, ENOMEM, result);
782
if (!CSThread::attach(self))
783
return MSEngine::exceptionToResult(&self->myException, result);
792
// Convenience wrapper: enters the connection for the current session (current_thd),
// passing doCreate = true to enterConnection().
int MSEngine::enterConnectionNoThd(CSThread **r_self, PBMSResultPtr result)
794
return enterConnection(current_thd, r_self, result, true);
798
// Leaves the connection for the current session. The visible statements test whether
// the current CSThread exists and has pbms_api_owner set, clear the current-thread
// pointer with setSelf(NULL), and detach the thread returned by getSelf().
// NOTE(review): the branching between these statements (presumably preprocessor
// conditionals) is not visible here.
void MSEngine::exitConnection()
800
THD *thd = (THD *) current_thd;
803
self = CSThread::getSelf();
804
if (self && self->pbms_api_owner)
809
CSThread::setSelf(NULL);
811
self = CSThread::getSelf();
812
CSThread::detach(self);
817
// Closes the connection belonging to thd. When a CSThread is stored for the THD it is
// cleared from the THD, made the current thread and detached. A second visible path
// detaches the thread returned by CSThread::getSelf().
// NOTE(review): the branching between these statements (presumably preprocessor
// conditionals) is not visible here.
void MSEngine::closeConnection(THD* thd)
821
self = CSThread::getSelf();
822
if (self && self->pbms_api_owner)
826
if ((self = pbms_getMySelf(thd))) {
827
pbms_setMySelf(thd, NULL);
828
CSThread::setSelf(self);
829
CSThread::detach(self);
833
self = CSThread::getSelf();
834
CSThread::detach(self);