~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/Engine_ms.cc

Added the PBMS daemon plugin.

(Augen zu und durch!)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Original author: Paul McCullagh
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-07-20
 
23
 *
 
24
 * H&G2JCtL
 
25
 *
 
26
 * Engine interface.
 
27
 *
 
28
 */
 
29
 
 
30
#include "CSConfig.h"
 
31
#include "CSGlobal.h"
 
32
#include "CSStrUtil.h"
 
33
#include "CSThread.h"
 
34
 
 
35
#define PBMS_API        pbms_api_PBMS
 
36
 
 
37
#include "Engine_ms.h"
 
38
#include "ConnectionHandler_ms.h"
 
39
#include "OpenTable_ms.h"
 
40
#include "Util_ms.h"
 
41
#include "Network_ms.h"
 
42
#include "Transaction_ms.h"
 
43
#include "ms_mysql.h"
 
44
 
 
45
// Becomes true as soon as an engine registers with ms_internal set, i.e. when
// PBMS support is built directly into the mysql/drizzle handler code.
// From then on the callbacks below refuse or skip calls from all other
// (non-internal) handlers.
static bool have_handler_support = false;
 
48
 
 
49
/*
 
50
 * ---------------------------------------------------------------
 
51
 * ENGINE CALL-IN INTERFACE
 
52
 */
 
53
 
 
54
static PBMS_API *StreamingEngines;
 
55
 
 
56
#ifdef new
 
57
#undef new
 
58
#endif
 
59
 
 
60
 
 
61
static MSOpenTable *local_open_table(const char *db_name, const char *tab_name, bool create)
 
62
{
 
63
        MSOpenTable             *otab = NULL;
 
64
        uint32_t db_id, tab_id;
 
65
        enter_();
 
66
        
 
67
        if ( MSDatabase::convertTableAndDatabaseToIDs(db_name, tab_name, &db_id, &tab_id, create))  
 
68
                otab = MSTableList::getOpenTableByID(db_id, tab_id);
 
69
                
 
70
        return_(otab);
 
71
}
 
72
 
 
73
 
 
74
/*
 
75
 * ---------------------------------------------------------------
 
76
 * ENGINE CALLBACK INTERFACE
 
77
 */
 
78
 
 
79
static void ms_register_engine(PBMSEnginePtr engine)
 
80
{
 
81
        if (engine->ms_internal)
 
82
                have_handler_support = true;
 
83
}
 
84
 
 
85
static void ms_deregister_engine(PBMSEnginePtr engine)
 
86
{
 
87
}
 
88
 
 
89
static int ms_create_blob(bool internal, const char *db_name, const char *tab_name, char *blob, size_t blob_len, char *blob_url, unsigned short col_index, PBMSResultPtr result)
 
90
{
 
91
        CSThread                *self;
 
92
        int                             err = MS_OK;
 
93
        MSOpenTable             *otab;
 
94
        CSInputStream   *i_stream = NULL;
 
95
        
 
96
        if (have_handler_support && !internal) {
 
97
                pbms_error_result(CS_CONTEXT, MS_ERR_INVALID_OPERATION, "Invalid ms_create_blob() call", result);
 
98
                return MS_ERR_ENGINE;
 
99
        }
 
100
 
 
101
        if ((err = pbms_enter_conn_no_thd(&self, result)))
 
102
                return err;
 
103
 
 
104
        inner_();
 
105
        try_(a) {
 
106
                otab = local_open_table(db_name, tab_name, true);
 
107
                frompool_(otab);
 
108
                
 
109
                if (!otab->getDB()->isRecovering()) {
 
110
                        i_stream = CSMemoryInputStream::newStream((unsigned char *)blob, blob_len);
 
111
                        otab->createBlob((PBMSBlobURLPtr)blob_url, blob_len, NULL, 0, i_stream);
 
112
                } else
 
113
                        CSException::throwException(CS_CONTEXT, MS_ERR_RECOVERY_IN_PROGRESS, "Cannot create BLOBs during repository recovery.");
 
114
 
 
115
                backtopool_(otab);
 
116
        }
 
117
        catch_(a) {
 
118
                err = pbms_exception_to_result(&self->myException, result);
 
119
        }
 
120
        cont_(a);
 
121
        return_(err);
 
122
}
 
123
 
 
124
/*
 
125
 * ms_use_blob() may or may not alter the blob url depending on the type of URL and if the BLOB is in a
 
126
 * different database or not. It may also add a BLOB reference to the BLOB table log if the BLOB was from
 
127
 * a different table or no table was specified when the BLOB was uploaded.
 
128
 *
 
129
 * There is no need to undo this function because it will be undone automaticly if the BLOB is not retained.
 
130
 */
 
131
static int ms_retain_blob(bool internal, const char *db_name, const char *tab_name, char *ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result)
 
132
{
 
133
        CSThread                *self;
 
134
        int                             err = MS_OK;
 
135
        MSBlobURLRec    blob;
 
136
        MSOpenTable             *otab;
 
137
        
 
138
        if (have_handler_support && !internal) {
 
139
                cs_strcpy(PBMS_BLOB_URL_SIZE, ret_blob_url, blob_url); // This should have already been converted.
 
140
                return MS_OK;
 
141
        }
 
142
        
 
143
        if ((err = pbms_enter_conn_no_thd(&self, result)))
 
144
                return err;
 
145
 
 
146
        inner_();
 
147
        try_(a) {
 
148
                
 
149
                if (! ms_parse_blob_url(&blob, blob_url)){
 
150
                        char buffer[CS_EXC_MESSAGE_SIZE];
 
151
                        
 
152
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
 
153
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
 
154
                        CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
 
155
                }
 
156
                
 
157
                otab = local_open_table(db_name, tab_name, true);
 
158
                frompool_(otab);
 
159
 
 
160
                otab->useBlob(blob.bu_type, blob.bu_db_id, blob.bu_tab_id, blob.bu_blob_id, blob.bu_auth_code, col_index, blob.bu_blob_size, blob.bu_blob_ref_id, ret_blob_url);
 
161
 
 
162
                backtopool_(otab);
 
163
        }
 
164
        catch_(a) {
 
165
                err = pbms_exception_to_result(&self->myException, result);
 
166
        }
 
167
        cont_(a);
 
168
        return_(err);
 
169
}
 
170
 
 
171
static int ms_release_blob(bool internal, const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result)
 
172
{
 
173
        CSThread                *self;
 
174
        int                             err = MS_OK;
 
175
        MSBlobURLRec    blob;
 
176
        MSOpenTable             *otab;
 
177
 
 
178
        if (have_handler_support && !internal) 
 
179
                return MS_OK;
 
180
        
 
181
        if ((err = pbms_enter_conn_no_thd(&self, result)))
 
182
                return err;
 
183
 
 
184
        inner_();
 
185
        try_(a) {
 
186
                if (! ms_parse_blob_url(&blob, blob_url)){
 
187
                        char buffer[CS_EXC_MESSAGE_SIZE];
 
188
 
 
189
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
 
190
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
 
191
                        CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
 
192
                }
 
193
                
 
194
                otab = local_open_table(db_name, tab_name, true);
 
195
                frompool_(otab);
 
196
                if (!otab->getDB()->isRecovering()) {
 
197
                        if (otab->getTableID() == blob.bu_tab_id)
 
198
                                otab->releaseReference(blob.bu_blob_id, blob.bu_blob_ref_id);
 
199
                        else {
 
200
                                char buffer[CS_EXC_MESSAGE_SIZE];
 
201
 
 
202
                                cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect table ID: ");
 
203
                                cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
 
204
                                CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
 
205
                        }
 
206
                }
 
207
                else {
 
208
                        char buffer[CS_EXC_MESSAGE_SIZE];
 
209
 
 
210
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
 
211
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, blob_url);
 
212
                        CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
 
213
                }
 
214
                
 
215
                backtopool_(otab);
 
216
                
 
217
                
 
218
        }
 
219
        catch_(a) {
 
220
                err = pbms_exception_to_result(&self->myException, result);
 
221
        }
 
222
        cont_(a);
 
223
        return_(err);
 
224
}
 
225
 
 
226
typedef struct {
 
227
        bool udo_WasRename;
 
228
        CSString *udo_DatabaseName;
 
229
        CSString *udo_OldName;
 
230
        CSString *udo_NewName;
 
231
} UnDoInfoRec, *UnDoInfoPtr;
 
232
 
 
233
static int ms_drop_table(bool internal, const char *db_name, const char *tab_name, PBMSResultPtr result)
 
234
{
 
235
        CSThread        *self;
 
236
        int                     err;
 
237
 
 
238
        if (have_handler_support && !internal) 
 
239
                return MS_OK;
 
240
 
 
241
        if ((err = pbms_enter_conn_no_thd(&self, result)))
 
242
                return err;
 
243
 
 
244
        inner_();
 
245
        try_(a) {
 
246
 
 
247
                CSPath                  *new_path;
 
248
                CSPath                  *old_path;
 
249
                MSOpenTable             *otab;
 
250
                MSOpenTablePool *tab_pool;
 
251
                MSTable                 *tab;
 
252
                UnDoInfoPtr             undo_info = NULL;
 
253
 
 
254
                otab = local_open_table(db_name, tab_name, false);
 
255
                if (!otab)
 
256
                        goto exit;
 
257
                
 
258
                // If we are recovering do not delete the table.
 
259
                // It is normal for MySQL recovery scripts to delete any table they aare about to
 
260
                // recover and then recreate it. If this is done after the repository has been recovered
 
261
                // then this would delete all the recovered BLOBs in the table.
 
262
                if (otab->getDB()->isRecovering()) {
 
263
                        otab->returnToPool();
 
264
                        goto exit;
 
265
                }
 
266
 
 
267
                frompool_(otab);
 
268
 
 
269
                // Before dropping the table the table ref file is renamed so that
 
270
                // it is out of the way incase a new table is created before the
 
271
                // old one is cleaned up.
 
272
                
 
273
                old_path = otab->getDBTable()->getTableFile();
 
274
                push_(old_path);
 
275
 
 
276
                new_path = otab->getDBTable()->getTableFile(tab_name, true);
 
277
 
 
278
                // Rearrage the object stack to pop the otab object
 
279
                pop_(old_path);
 
280
                pop_(otab);
 
281
 
 
282
                push_(new_path);
 
283
                push_(old_path);
 
284
                frompool_(otab);
 
285
                
 
286
                tab = otab->getDBTable();
 
287
                pop_(otab);
 
288
                push_(tab);
 
289
 
 
290
                tab_pool = MSTableList::lockTablePoolForDeletion(otab);
 
291
                frompool_(tab_pool);
 
292
 
 
293
                if (old_path->exists())
 
294
                        old_path->move(new_path);
 
295
                tab->myDatabase->dropTable(RETAIN(tab));
 
296
                
 
297
                /* Add the table to the temp delete list if we are not recovering... */
 
298
                tab->prepareToDelete();
 
299
 
 
300
                backtopool_(tab_pool);  // The will unlock and close the table pool freeing all tables in it.
 
301
                pop_(tab);                              // Returning the pool will have released this. (YUK!)
 
302
                release_(old_path);
 
303
                release_(new_path);
 
304
 
 
305
 
 
306
                undo_info = (UnDoInfoPtr) cs_malloc(sizeof(UnDoInfoRec));
 
307
                
 
308
                undo_info->udo_WasRename = false;
 
309
                self->myInfo = undo_info;
 
310
                
 
311
                
 
312
exit: ;
 
313
        }
 
314
        
 
315
        catch_(a) {
 
316
                err = pbms_exception_to_result(&self->myException, result);
 
317
        }
 
318
        cont_(a);
 
319
        outer_();
 
320
        pbms_exit_conn();
 
321
        return err;
 
322
}
 
323
 
 
324
static void complete_delete(UnDoInfoPtr info, bool ok)
 
325
{
 
326
        // TO DO: figure out a way to undo the delete.
 
327
        cs_free(info);
 
328
        if (!ok) 
 
329
                CSException::throwException(CS_CONTEXT, MS_ERR_NOT_IMPLEMENTED, "Cannot undo delete table.");
 
330
}
 
331
 
 
332
static bool my_rename_table(const char * db_name, const char *from_table, const char *to_table)
 
333
{
 
334
        MSOpenTable             *otab;
 
335
        CSPath                  *from_path;
 
336
        CSPath                  *to_path;
 
337
        MSOpenTablePool *tab_pool;
 
338
        MSTable                 *tab;
 
339
 
 
340
        enter_();
 
341
        
 
342
        otab = local_open_table(db_name, from_table, false);
 
343
        if (!otab)
 
344
                return_(false);
 
345
                
 
346
        frompool_(otab);
 
347
 
 
348
        if (otab->getDB()->isRecovering()) 
 
349
                CSException::throwException(CS_CONTEXT, MS_ERR_RECOVERY_IN_PROGRESS, "Cannot rename tables during repository recovery.");
 
350
 
 
351
        from_path = otab->getDBTable()->getTableFile();
 
352
        push_(from_path);
 
353
 
 
354
        to_path = otab->getDBTable()->getTableFile(to_table, false);
 
355
 
 
356
        // Rearrage the object stack to pop the otab object
 
357
        pop_(from_path);
 
358
        pop_(otab);
 
359
 
 
360
        push_(to_path);
 
361
        push_(from_path);
 
362
        frompool_(otab);
 
363
 
 
364
        otab->openForReading();
 
365
        tab = otab->getDBTable();
 
366
        tab->retain();
 
367
        pop_(otab);
 
368
        push_(tab);
 
369
        
 
370
        tab_pool = MSTableList::lockTablePoolForDeletion(otab);
 
371
        frompool_(tab_pool);
 
372
 
 
373
        from_path->move(to_path);
 
374
        tab->myDatabase->renameTable(tab, to_table);
 
375
 
 
376
        backtopool_(tab_pool);  // The will unlock and close the table pool freeing all tables in it.
 
377
        pop_(tab);                              // Returning the pool will have released this. (YUK!)
 
378
        release_(from_path);
 
379
        release_(to_path);
 
380
        
 
381
        return_(true);
 
382
}
 
383
 
 
384
static int ms_rename_table(bool internal, const char * db_name, const char *from_table, const char *to_table, PBMSResultPtr result)
 
385
{
 
386
        CSThread        *self;
 
387
        int                     err;
 
388
        UnDoInfoPtr undo_info = NULL;
 
389
 
 
390
        if (have_handler_support && !internal) 
 
391
                return MS_OK;
 
392
 
 
393
        if ((err = pbms_enter_conn_no_thd(&self, result)))
 
394
                return err;
 
395
 
 
396
        inner_();
 
397
        try_(a) {
 
398
                undo_info = (UnDoInfoPtr) cs_malloc(sizeof(UnDoInfoRec));
 
399
 
 
400
                undo_info->udo_WasRename = true;
 
401
                if (my_rename_table(db_name, from_table, to_table)) {           
 
402
                        undo_info->udo_DatabaseName = CSString::newString(db_name);
 
403
                        undo_info->udo_OldName = CSString::newString(from_table);
 
404
                        undo_info->udo_NewName = CSString::newString(to_table);
 
405
                } else {
 
406
                        undo_info->udo_DatabaseName = undo_info->udo_OldName = undo_info->udo_NewName = NULL;
 
407
                }
 
408
                self->myInfo = undo_info;
 
409
        }
 
410
        catch_(a) {
 
411
                err = pbms_exception_to_result(&self->myException, result);
 
412
                if (undo_info)
 
413
                        cs_free(undo_info);
 
414
        }
 
415
        cont_(a);
 
416
        outer_();
 
417
        pbms_exit_conn();
 
418
        return err;
 
419
}
 
420
 
 
421
static void complete_rename(UnDoInfoPtr info, bool ok)
 
422
{
 
423
        // Swap the paths around here to revers the rename.
 
424
        CSString                *db_name= info->udo_DatabaseName;
 
425
        CSString                *from_table= info->udo_NewName;
 
426
        CSString                *to_table= info->udo_OldName;
 
427
        
 
428
        enter_();
 
429
        
 
430
        cs_free(info);
 
431
        if (db_name) {
 
432
                push_(db_name);
 
433
                push_(from_table);
 
434
                push_(to_table);
 
435
                if (!ok) 
 
436
                        my_rename_table(db_name->getCString(), from_table->getCString(), to_table->getCString());
 
437
                        
 
438
                release_(to_table);
 
439
                release_(from_table);
 
440
                release_(db_name);
 
441
        }
 
442
        exit_();
 
443
}
 
444
 
 
445
static void ms_completed(bool internal, bool ok)
 
446
{
 
447
        CSThread        *self;
 
448
        PBMSResultRec   result;
 
449
        
 
450
        if (have_handler_support && !internal) 
 
451
                return;
 
452
 
 
453
        if (pbms_enter_conn_no_thd(&self, &result))
 
454
                return ;
 
455
 
 
456
        if (self->myInfo) {
 
457
                UnDoInfoPtr info = (UnDoInfoPtr) self->myInfo;
 
458
                        if (info->udo_WasRename) 
 
459
                        complete_rename(info, ok);
 
460
                else
 
461
                        complete_delete(info, ok);
 
462
                
 
463
                self->myInfo = NULL;
 
464
        } else if (self->myTID && (self->myIsAutoCommit || !ok)) {
 
465
                inner_();
 
466
                try_(a) {
 
467
                        if (ok)
 
468
                                MSTransactionManager::commit();
 
469
                        else if (self->myIsAutoCommit)
 
470
                                MSTransactionManager::rollback();
 
471
                        else
 
472
                                MSTransactionManager::rollbackTo(self->myStartStmt); // Rollback the last logical statement.
 
473
                }
 
474
                catch_(a) {
 
475
                        self->logException();
 
476
                }
 
477
                cont_(a);
 
478
                outer_();
 
479
        }
 
480
        
 
481
        self->myStartStmt = self->myStmtCount;
 
482
}
 
483
 
 
484
PBMSCallbacksRec engine_callbacks = {
 
485
        MS_CALLBACK_VERSION,
 
486
        ms_register_engine,
 
487
        ms_deregister_engine,
 
488
        ms_create_blob,
 
489
        ms_retain_blob,
 
490
        ms_release_blob,
 
491
        ms_drop_table,
 
492
        ms_rename_table,
 
493
        ms_completed
 
494
};
 
495
 
 
496
// =============================
 
497
int MSEngine::startUp(PBMSResultPtr result)
 
498
{
 
499
        int err;
 
500
        
 
501
        StreamingEngines = new PBMS_API();
 
502
        err = StreamingEngines->PBMSStartup(&engine_callbacks, result);
 
503
        if (err)
 
504
                delete StreamingEngines;
 
505
        else { // Register the PBMS enabled engines the startup before PBMS
 
506
                PBMSSharedMemoryPtr             sh_mem = StreamingEngines->sharedMemory;
 
507
                PBMSEnginePtr                   engine;
 
508
                
 
509
                for (int i=0; i<sh_mem->sm_list_len; i++) {
 
510
                        if ((engine = sh_mem->sm_engine_list[i])) 
 
511
                                ms_register_engine(engine);
 
512
                }
 
513
        }
 
514
        
 
515
        return err;
 
516
}
 
517
 
 
518
void MSEngine::shutDown()
 
519
{
 
520
        StreamingEngines->PBMSShutdown();
 
521
 
 
522
        delete StreamingEngines;
 
523
}
 
524
 
 
525
const PBMSEnginePtr MSEngine::getEngineInfoAt(int indx)
 
526
{
 
527
        PBMSSharedMemoryPtr             sh_mem = StreamingEngines->sharedMemory;
 
528
        PBMSEnginePtr                   engine = NULL;
 
529
        
 
530
        if (sh_mem) {
 
531
                for (int i=0; i<sh_mem->sm_list_len; i++) {
 
532
                        if ((engine = sh_mem->sm_engine_list[i])) {
 
533
                                if (!indx)
 
534
                                        return engine;
 
535
                                indx--;
 
536
                        }
 
537
                }
 
538
        }
 
539
        
 
540
        return NULL;
 
541
}
 
542
 
 
543