1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* Original author: Paul McCullagh
20
* Continued development: Barry Leslie
32
#include "CSStrUtil.h"
35
#define PBMS_API pbms_api_PBMS
37
#include "Engine_ms.h"
38
#include "ConnectionHandler_ms.h"
39
#include "OpenTable_ms.h"
41
#include "Network_ms.h"
42
#include "Transaction_ms.h"
45
// If PBMS support is built directly into the mysql/drizzle handler code
46
// then calls from all other handlers are ignored.
47
static bool have_handler_support = false;
50
* ---------------------------------------------------------------
51
* ENGINE CALL-IN INTERFACE
54
static PBMS_API *StreamingEngines;
61
static MSOpenTable *local_open_table(const char *db_name, const char *tab_name, bool create)
63
MSOpenTable *otab = NULL;
64
uint32_t db_id, tab_id;
67
if ( MSDatabase::convertTableAndDatabaseToIDs(db_name, tab_name, &db_id, &tab_id, create))
68
otab = MSTableList::getOpenTableByID(db_id, tab_id);
75
* ---------------------------------------------------------------
76
* ENGINE CALLBACK INTERFACE
79
static void ms_register_engine(PBMSEnginePtr engine)
81
if (engine->ms_internal)
82
have_handler_support = true;
85
static void ms_deregister_engine(PBMSEnginePtr engine)
89
static int ms_create_blob(bool internal, const char *db_name, const char *tab_name, char *blob, size_t blob_len, char *blob_url, unsigned short col_index, PBMSResultPtr result)
94
CSInputStream *i_stream = NULL;
96
if (have_handler_support && !internal) {
97
pbms_error_result(CS_CONTEXT, MS_ERR_INVALID_OPERATION, "Invalid ms_create_blob() call", result);
101
if ((err = pbms_enter_conn_no_thd(&self, result)))
106
otab = local_open_table(db_name, tab_name, true);
109
if (!otab->getDB()->isRecovering()) {
110
i_stream = CSMemoryInputStream::newStream((unsigned char *)blob, blob_len);
111
otab->createBlob((PBMSBlobURLPtr)blob_url, blob_len, NULL, 0, i_stream);
113
CSException::throwException(CS_CONTEXT, MS_ERR_RECOVERY_IN_PROGRESS, "Cannot create BLOBs during repository recovery.");
118
err = pbms_exception_to_result(&self->myException, result);
125
* ms_use_blob() may or may not alter the blob url depending on the type of URL and if the BLOB is in a
126
* different database or not. It may also add a BLOB reference to the BLOB table log if the BLOB was from
127
* a different table or no table was specified when the BLOB was uploaded.
129
* There is no need to undo this function because it will be undone automatically if the BLOB is not retained.
131
static int ms_retain_blob(bool internal, const char *db_name, const char *tab_name, char *ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result)
138
if (have_handler_support && !internal) {
139
cs_strcpy(PBMS_BLOB_URL_SIZE, ret_blob_url, blob_url); // This should have already been converted.
143
if ((err = pbms_enter_conn_no_thd(&self, result)))
149
if (! ms_parse_blob_url(&blob, blob_url)){
150
char buffer[CS_EXC_MESSAGE_SIZE];
152
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
153
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
154
CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
157
otab = local_open_table(db_name, tab_name, true);
160
otab->useBlob(blob.bu_type, blob.bu_db_id, blob.bu_tab_id, blob.bu_blob_id, blob.bu_auth_code, col_index, blob.bu_blob_size, blob.bu_blob_ref_id, ret_blob_url);
165
err = pbms_exception_to_result(&self->myException, result);
171
static int ms_release_blob(bool internal, const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result)
178
if (have_handler_support && !internal)
181
if ((err = pbms_enter_conn_no_thd(&self, result)))
186
if (! ms_parse_blob_url(&blob, blob_url)){
187
char buffer[CS_EXC_MESSAGE_SIZE];
189
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
190
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
191
CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
194
otab = local_open_table(db_name, tab_name, true);
196
if (!otab->getDB()->isRecovering()) {
197
if (otab->getTableID() == blob.bu_tab_id)
198
otab->releaseReference(blob.bu_blob_id, blob.bu_blob_ref_id);
200
char buffer[CS_EXC_MESSAGE_SIZE];
202
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect table ID: ");
203
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
204
CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
208
char buffer[CS_EXC_MESSAGE_SIZE];
210
cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
211
cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
212
CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
220
err = pbms_exception_to_result(&self->myException, result);
228
CSString *udo_DatabaseName;
229
CSString *udo_OldName;
230
CSString *udo_NewName;
231
} UnDoInfoRec, *UnDoInfoPtr;
233
static int ms_drop_table(bool internal, const char *db_name, const char *tab_name, PBMSResultPtr result)
238
if (have_handler_support && !internal)
241
if ((err = pbms_enter_conn_no_thd(&self, result)))
250
MSOpenTablePool *tab_pool;
252
UnDoInfoPtr undo_info = NULL;
254
otab = local_open_table(db_name, tab_name, false);
258
// If we are recovering do not delete the table.
259
// It is normal for MySQL recovery scripts to delete any table they are about to
260
// recover and then recreate it. If this is done after the repository has been recovered
261
// then this would delete all the recovered BLOBs in the table.
262
if (otab->getDB()->isRecovering()) {
263
otab->returnToPool();
269
// Before dropping the table the table ref file is renamed so that
270
// it is out of the way in case a new table is created before the
271
// old one is cleaned up.
273
old_path = otab->getDBTable()->getTableFile();
276
new_path = otab->getDBTable()->getTableFile(tab_name, true);
278
// Rearrange the object stack to pop the otab object
286
tab = otab->getDBTable();
290
tab_pool = MSTableList::lockTablePoolForDeletion(otab);
293
if (old_path->exists())
294
old_path->move(new_path);
295
tab->myDatabase->dropTable(RETAIN(tab));
297
/* Add the table to the temp delete list if we are not recovering... */
298
tab->prepareToDelete();
300
backtopool_(tab_pool); // This will unlock and close the table pool freeing all tables in it.
301
pop_(tab); // Returning the pool will have released this. (YUK!)
306
undo_info = (UnDoInfoPtr) cs_malloc(sizeof(UnDoInfoRec));
308
undo_info->udo_WasRename = false;
309
self->myInfo = undo_info;
316
err = pbms_exception_to_result(&self->myException, result);
324
static void complete_delete(UnDoInfoPtr info, bool ok)
326
// TO DO: figure out a way to undo the delete.
329
CSException::throwException(CS_CONTEXT, MS_ERR_NOT_IMPLEMENTED, "Cannot undo delete table.");
332
static bool my_rename_table(const char * db_name, const char *from_table, const char *to_table)
337
MSOpenTablePool *tab_pool;
342
otab = local_open_table(db_name, from_table, false);
348
if (otab->getDB()->isRecovering())
349
CSException::throwException(CS_CONTEXT, MS_ERR_RECOVERY_IN_PROGRESS, "Cannot rename tables during repository recovery.");
351
from_path = otab->getDBTable()->getTableFile();
354
to_path = otab->getDBTable()->getTableFile(to_table, false);
356
// Rearrange the object stack to pop the otab object
364
otab->openForReading();
365
tab = otab->getDBTable();
370
tab_pool = MSTableList::lockTablePoolForDeletion(otab);
373
from_path->move(to_path);
374
tab->myDatabase->renameTable(tab, to_table);
376
backtopool_(tab_pool); // This will unlock and close the table pool freeing all tables in it.
377
pop_(tab); // Returning the pool will have released this. (YUK!)
384
static int ms_rename_table(bool internal, const char * db_name, const char *from_table, const char *to_table, PBMSResultPtr result)
388
UnDoInfoPtr undo_info = NULL;
390
if (have_handler_support && !internal)
393
if ((err = pbms_enter_conn_no_thd(&self, result)))
398
undo_info = (UnDoInfoPtr) cs_malloc(sizeof(UnDoInfoRec));
400
undo_info->udo_WasRename = true;
401
if (my_rename_table(db_name, from_table, to_table)) {
402
undo_info->udo_DatabaseName = CSString::newString(db_name);
403
undo_info->udo_OldName = CSString::newString(from_table);
404
undo_info->udo_NewName = CSString::newString(to_table);
406
undo_info->udo_DatabaseName = undo_info->udo_OldName = undo_info->udo_NewName = NULL;
408
self->myInfo = undo_info;
411
err = pbms_exception_to_result(&self->myException, result);
421
static void complete_rename(UnDoInfoPtr info, bool ok)
423
// Swap the paths around here to reverse the rename.
424
CSString *db_name= info->udo_DatabaseName;
425
CSString *from_table= info->udo_NewName;
426
CSString *to_table= info->udo_OldName;
436
my_rename_table(db_name->getCString(), from_table->getCString(), to_table->getCString());
439
release_(from_table);
445
static void ms_completed(bool internal, bool ok)
448
PBMSResultRec result;
450
if (have_handler_support && !internal)
453
if (pbms_enter_conn_no_thd(&self, &result))
457
UnDoInfoPtr info = (UnDoInfoPtr) self->myInfo;
458
if (info->udo_WasRename)
459
complete_rename(info, ok);
461
complete_delete(info, ok);
464
} else if (self->myTID && (self->myIsAutoCommit || !ok)) {
468
MSTransactionManager::commit();
469
else if (self->myIsAutoCommit)
470
MSTransactionManager::rollback();
472
MSTransactionManager::rollbackTo(self->myStartStmt); // Rollback the last logical statement.
475
self->logException();
481
self->myStartStmt = self->myStmtCount;
484
PBMSCallbacksRec engine_callbacks = {
487
ms_deregister_engine,
496
// =============================
497
int MSEngine::startUp(PBMSResultPtr result)
501
StreamingEngines = new PBMS_API();
502
err = StreamingEngines->PBMSStartup(&engine_callbacks, result);
504
delete StreamingEngines;
505
else { // Register the PBMS enabled engines that started up before PBMS
506
PBMSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
507
PBMSEnginePtr engine;
509
for (int i=0; i<sh_mem->sm_list_len; i++) {
510
if ((engine = sh_mem->sm_engine_list[i]))
511
ms_register_engine(engine);
518
void MSEngine::shutDown()
520
StreamingEngines->PBMSShutdown();
522
delete StreamingEngines;
525
const PBMSEnginePtr MSEngine::getEngineInfoAt(int indx)
527
PBMSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
528
PBMSEnginePtr engine = NULL;
531
for (int i=0; i<sh_mem->sm_list_len; i++) {
532
if ((engine = sh_mem->sm_engine_list[i])) {