~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/ha_pbms.cc

Added the PBMS daemon plugin.

(Augen zu und durch!)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Original author: Paul McCullagh
 
20
 * Continued development: Barry Leslie
 
21
 *
 
22
 * 2007-05-20
 
23
 *
 
24
 * H&G2JCtL
 
25
 *
 
26
 * Table handler.
 
27
 *
 
28
 */
 
29
 
 
30
#ifdef USE_PRAGMA_IMPLEMENTATION
 
31
#pragma implementation                          // gcc: Class implementation
 
32
#endif
 
33
 
 
34
#ifdef DRIZZLED
 
35
#include <drizzled/server_includes.h>
 
36
#include <drizzled/handler.h>
 
37
#include <drizzled/table.h>
 
38
#include <drizzled/current_session.h>
 
39
#include <drizzled/plugin/storage_engine.h>
 
40
#include <drizzled/plugin.h>
 
41
 
 
42
#define my_strdup(a,b) strdup(a)
 
43
 
 
44
/*
 
45
#include <sys/stat.h>
 
46
#include <drizzled/common_server.h>
 
47
#include <drizzled/data_home.h>
 
48
#include <drizzled/error.h>
 
49
#include <drizzled/handler.h>
 
50
#include <drizzled/plugin/storage_engine.h>
 
51
*/
 
52
#include "CSConfig.h"
 
53
#else
 
54
#include "CSConfig.h"
 
55
#include "mysql_priv.h"
 
56
#include <mysql/plugin.h>
 
57
#include <my_dir.h>
 
58
#endif 
 
59
 
 
60
#include <stdlib.h>
 
61
#include <time.h>
 
62
 
 
63
 
 
64
#include "Defs_ms.h"
 
65
 
 
66
#include "CSDefs.h"
 
67
#include "CSObject.h"
 
68
#include "CSGlobal.h"
 
69
#include "CSThread.h"
 
70
#include "CSStrUtil.h"
 
71
#include "CSTest.h"
 
72
#include "CSLog.h"
 
73
 
 
74
#include "Engine_ms.h"  
 
75
#include "ha_pbms.h"
 
76
#include "Network_ms.h"
 
77
#include "ConnectionHandler_ms.h"
 
78
#include "OpenTable_ms.h"
 
79
#include "Database_ms.h"
 
80
#include "TempLog_ms.h"
 
81
#include "Util_ms.h"
 
82
#include "SystemTable_ms.h"
 
83
#include "ms_mysql.h"
 
84
#include "Discover_ms.h"
 
85
#include "metadata_ms.h"
 
86
#include "Transaction_ms.h"
 
87
#include "SysTab_httpheader.h"
 
88
 
 
89
/* Note: 'new' used here is NOT CSObject::new which is a DEBUG define*/
 
90
#ifdef new
 
91
#undef new
 
92
#endif
 
93
 
 
94
#ifndef PBMS_PORT
 
95
#define PBMS_PORT 8080
 
96
#endif
 
97
 
 
98
static int              pbms_port = PBMS_PORT; 
 
99
static char             *pbms_repository_threshold;
 
100
static char             *pbms_temp_log_threshold;
 
101
static char             *pbms_http_metadata_headers;
 
102
 
 
103
#ifdef DRIZZLED
 
104
class PBMSTableNameIterator: public TableNameIteratorImplementation
 
105
{
 
106
private:
 
107
  uint32_t current_name;
 
108
 
 
109
public:
 
110
  PBMSTableNameIterator(const std::string &database)
 
111
    : TableNameIteratorImplementation(database), current_name(0)
 
112
    {};
 
113
 
 
114
   int next(std::string *name);
 
115
 
 
116
};
 
117
 
 
118
int PBMSTableNameIterator::next(string *name)
 
119
{
 
120
        const char *tab_name = pbms_getSysTableName(current_name++);
 
121
        
 
122
        if (!tab_name) 
 
123
                return -1;
 
124
                
 
125
    if (name)
 
126
      name->assign(tab_name);
 
127
          
 
128
        return 0;
 
129
        
 
130
}
 
131
 
 
132
int pbms_discover_system_tables(const char *name, drizzled::message::Table *table);
 
133
 
 
134
class PBMSStorageEngine : public StorageEngine {
 
135
public:
 
136
        PBMSStorageEngine(std::string name_arg)
 
137
        : StorageEngine(name_arg, HTON_NO_FLAGS | HTON_HIDDEN, 4 /*savepoint_offset*/, false /*support_2pc*/) {}
 
138
 
 
139
        int close_connection(Session *);
 
140
        int commit(Session *, bool);
 
141
        int rollback(Session *, bool);
 
142
        handler *create(TABLE_SHARE *, MEM_ROOT *);
 
143
        void drop_database(char *);
 
144
        int savepoint_set(Session *thd, void *sv);
 
145
        int savepoint_rollback(Session *thd, void *sv);
 
146
        int savepoint_release(Session *thd, void *sv);
 
147
        const char **bas_ext() const;
 
148
 
 
149
        int createTableImplementation(Session*, const char *path, Table *,HA_CREATE_INFO *, drizzled::message::Table*)
 
150
        {
 
151
                if (pbms_is_Systable(cs_last_name_of_path(path)))
 
152
                        return(0);
 
153
                        
 
154
                /* Create only works for system tables. */
 
155
                return( HA_ERR_WRONG_COMMAND );
 
156
        }
 
157
        
 
158
        int deleteTableImplementation(Session*, const string table_name) { return 0;}
 
159
        
 
160
        int getTableProtoImplementation(const char* path, drizzled::message::Table *table_proto)
 
161
        {
 
162
                int err = pbms_discover_system_tables(path, table_proto);
 
163
                if (err)
 
164
                        return err;
 
165
                        
 
166
                return EEXIST;
 
167
        }
 
168
 
 
169
        TableNameIteratorImplementation* tableNameIterator(const std::string &database)
 
170
        {
 
171
                return new PBMSTableNameIterator(database);
 
172
        }
 
173
 
 
174
        int renameTableImplementation(Session*,
 
175
                                                                                          const char *from, const char *to)
 
176
        {
 
177
          return -1;
 
178
        }
 
179
};
 
180
 
 
181
PBMSStorageEngine       *pbms_hton;
 
182
#else
 
183
handlerton              *pbms_hton;
 
184
#endif
 
185
 
 
186
static const char *ha_pbms_exts[] = {
 
187
        NullS
 
188
};
 
189
 
 
190
/*
 
191
 * ---------------------------------------------------------------
 
192
 * UTILITIES
 
193
 */
 
194
 
 
195
void pbms_take_part_in_transaction(void *thread)
 
196
{
 
197
        THD                     *thd;
 
198
        if ((thd = (THD *) thread)) {
 
199
                trans_register_ha(thd, true, pbms_hton); 
 
200
        }
 
201
}
 
202
 
 
203
#ifdef DRIZZLED
 
204
const char **PBMSStorageEngine::bas_ext() const
 
205
#else
 
206
const char **ha_pbms::bas_ext() const
 
207
#endif
 
208
{
 
209
        return ha_pbms_exts;
 
210
}
 
211
 
 
212
#ifdef DRIZZLED
 
213
int PBMSStorageEngine::close_connection(Session *thd)
 
214
{
 
215
        PBMSStorageEngine * const hton = this;
 
216
#else
 
217
static int pbms_close_connection(handlerton *hton, THD* thd)
 
218
{
 
219
#endif
 
220
        CSThread        *self;
 
221
 
 
222
        self = CSThread::getSelf();
 
223
        if (self && self->pbms_api_owner)
 
224
                return 0;
 
225
 
 
226
        if (thd) {
 
227
                if ((self = (CSThread *) *thd_ha_data(thd, pbms_hton))) {
 
228
                        *thd_ha_data(thd, pbms_hton) = NULL;
 
229
                        CSThread::setSelf(self);
 
230
                        CSThread::detach(self);
 
231
                }
 
232
        }
 
233
        else {
 
234
                self = CSThread::getSelf();
 
235
                CSThread::detach(self);
 
236
        }
 
237
        return 0;
 
238
}
 
239
 
 
240
static int pbms_enter_conn(void *thread, CSThread **r_self, PBMSResultPtr result)
 
241
{
 
242
        THD                     *thd;
 
243
        CSThread        *self;
 
244
 
 
245
        self = CSThread::getSelf();
 
246
        if (!self) {    
 
247
                if ((thd = (THD *) thread)) {
 
248
                        if (!(self = (CSThread *) *thd_ha_data(((THD *) thd), pbms_hton))) {
 
249
                                if (!(self = CSThread::newCSThread()))
 
250
                                        return pbms_os_error_result(CS_CONTEXT, ENOMEM, result);
 
251
                                if (!CSThread::attach(self))
 
252
                                        return pbms_exception_to_result(&self->myException, result);
 
253
                                *thd_ha_data(thd, pbms_hton) = self;
 
254
                        }
 
255
                        else {
 
256
                                if (!CSThread::setSelf(self))
 
257
                                        return pbms_exception_to_result(&self->myException, result);
 
258
                        }
 
259
                }
 
260
                else {
 
261
                        if (!(self = CSThread::newCSThread()))
 
262
                                return pbms_os_error_result(CS_CONTEXT, ENOMEM, result);
 
263
                        if (!CSThread::attach(self))
 
264
                                return pbms_exception_to_result(&self->myException, result);
 
265
                }
 
266
        }
 
267
        *r_self = self;
 
268
        return MS_OK;
 
269
}
 
270
 
 
271
int pbms_enter_conn_no_thd(CSThread **r_self, PBMSResultPtr result)
 
272
{
 
273
        return pbms_enter_conn(current_thd, r_self, result);
 
274
}
 
275
 
 
276
void pbms_exit_conn()
 
277
{
 
278
        THD                     *thd = (THD *) current_thd;
 
279
        CSThread        *self;
 
280
 
 
281
        self = CSThread::getSelf();
 
282
        if (self && self->pbms_api_owner)
 
283
                return;
 
284
 
 
285
 
 
286
        if (thd)
 
287
                CSThread::setSelf(NULL);
 
288
        else {
 
289
                self = CSThread::getSelf();
 
290
                CSThread::detach(self);
 
291
        }
 
292
}
 
293
 
 
294
int pbms_exception_to_result(CSException *e, PBMSResultPtr result)
 
295
{
 
296
        const char *context, *trace;
 
297
 
 
298
        result->mr_code = e->getErrorCode();
 
299
        cs_strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, e->getMessage());
 
300
        context = e->getContext();
 
301
        trace = e->getStackTrace();
 
302
        if (context && *context) {
 
303
                cs_strcpy(MS_RESULT_STACK_SIZE, result->mr_stack, context);
 
304
                if (trace && *trace)
 
305
                        cs_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, "\n");
 
306
        }
 
307
        else
 
308
                *result->mr_stack = 0;
 
309
        if (trace && *trace)
 
310
                cs_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, trace);
 
311
        return MS_ERR_ENGINE;
 
312
}
 
313
 
 
314
int pbms_os_error_result(const char *func, const char *file, int line, int err, PBMSResultPtr result)
 
315
{
 
316
        CSException e;
 
317
                
 
318
        e.initOSError(func, file, line, err);
 
319
        return pbms_exception_to_result(&e, result);
 
320
}
 
321
 
 
322
int pbms_error_result(const char *func, const char *file, int line, int err, const char *message, PBMSResultPtr result)
 
323
{
 
324
        CSException e;
 
325
                
 
326
        e.initException(func, file, line, err, message);
 
327
        return pbms_exception_to_result(&e, result);
 
328
}
 
329
 
 
330
/*
 
331
 * ---------------------------------------------------------------
 
332
 * HANDLER INTERFACE
 
333
 */
 
334
 
 
335
 
 
336
#ifdef DRIZZLED
 
337
handler *PBMSStorageEngine::create(TABLE_SHARE *table, MEM_ROOT *mem_root)
 
338
{
 
339
        PBMSStorageEngine * const hton = this;
 
340
#else
 
341
static handler *pbms_create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root)
 
342
{
 
343
#endif
 
344
        return new (mem_root) ha_pbms(hton, table);
 
345
}
 
346
 
 
347
#ifdef DRIZZLED
 
348
int PBMSStorageEngine::commit(Session *thd, bool all)
 
349
{
 
350
        PBMSStorageEngine * const hton = this;
 
351
#else
 
352
static int pbms_commit(handlerton *hton, THD *thd, bool all)
 
353
{
 
354
#endif
 
355
        int                     err = 0;
 
356
        CSThread        *self;
 
357
        PBMSResultRec result;
 
358
 
 
359
        if (pbms_enter_conn_no_thd(&self, &result))
 
360
                return 0;
 
361
        inner_();
 
362
        try_(a) {
 
363
                MSTransactionManager::commit();
 
364
        }
 
365
        catch_(a) {
 
366
                err = pbms_exception_to_result(&self->myException, &result);
 
367
        }
 
368
        cont_(a);
 
369
        return_(err);
 
370
}
 
371
 
 
372
#ifdef DRIZZLED
 
373
int PBMSStorageEngine::rollback(Session *thd, bool all)
 
374
{
 
375
        PBMSStorageEngine * const hton = this;
 
376
#else
 
377
static int pbms_rollback(handlerton *hton, THD *thd, bool all)
 
378
{
 
379
#endif
 
380
        int                     err = 0;
 
381
        CSThread        *self;
 
382
        PBMSResultRec result;
 
383
 
 
384
        if (pbms_enter_conn_no_thd(&self, &result))
 
385
                return 0;
 
386
        inner_();
 
387
        try_(a) {
 
388
                MSTransactionManager::rollback();
 
389
        }
 
390
        catch_(a) {
 
391
                err = pbms_exception_to_result(&self->myException, &result);
 
392
        }
 
393
        cont_(a);
 
394
        return_(err);
 
395
}
 
396
 
 
397
#ifdef DRIZZLED
 
398
int PBMSStorageEngine::savepoint_set(Session *thd, void *sv)
 
399
{
 
400
        PBMSStorageEngine * const hton = this;
 
401
#else
 
402
static int pbms_savepoint_set(handlerton *hton, THD *thd, void *sv)
 
403
{
 
404
#endif
 
405
        int                     err = 0;
 
406
        CSThread        *self;
 
407
        PBMSResultRec result;
 
408
 
 
409
        if (pbms_enter_conn_no_thd(&self, &result))
 
410
                return 0;
 
411
                
 
412
        *((uint32_t*)sv) = self->myStmtCount;
 
413
        return 0;       
 
414
}
 
415
 
 
416
#ifdef DRIZZLED
 
417
int PBMSStorageEngine::savepoint_rollback(Session *thd, void *sv)
 
418
{
 
419
        PBMSStorageEngine * const hton = this;
 
420
#else
 
421
static int pbms_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
 
422
{
 
423
#endif
 
424
        int                     err = 0;
 
425
        CSThread        *self;
 
426
        PBMSResultRec result;
 
427
 
 
428
        if (pbms_enter_conn_no_thd(&self, &result))
 
429
                return 0;
 
430
        inner_();
 
431
        try_(a) {
 
432
                MSTransactionManager::rollbackTo(*((uint32_t*)sv));
 
433
        }
 
434
        catch_(a) {
 
435
                err = pbms_exception_to_result(&self->myException, &result);
 
436
        }
 
437
        cont_(a);
 
438
        return_(err);
 
439
}
 
440
 
 
441
#ifdef DRIZZLED
 
442
int PBMSStorageEngine::savepoint_release(Session *thd, void *sv)
 
443
{
 
444
        PBMSStorageEngine * const hton = this;
 
445
#else
 
446
static int pbms_savepoint_release(handlerton *hton, THD *thd, void *sv)
 
447
{
 
448
#endif
 
449
        return 0;
 
450
}
 
451
 
 
452
 
 
453
#ifdef DRIZZLED
 
454
void PBMSStorageEngine::drop_database(char *path)
 
455
{
 
456
        PBMSStorageEngine * const hton = this;
 
457
#else
 
458
static void pbms_drop_database(handlerton *hton, char *path)
 
459
{
 
460
#endif
 
461
        CSThread *self;
 
462
        char db_name[PATH_MAX];
 
463
        PBMSResultRec result;
 
464
        
 
465
        if (pbms_enter_conn_no_thd(&self, &result))
 
466
                return;
 
467
        inner_();
 
468
        
 
469
        cs_strcpy(PATH_MAX, db_name, cs_last_directory_of_path(path));
 
470
        cs_remove_dir_char(db_name);
 
471
        try_(a) {
 
472
                MSDatabase::dropDatabase(db_name);
 
473
        }
 
474
        catch_(a);
 
475
        self->logException();
 
476
        cont_(a);
 
477
        exit_();
 
478
}
 
479
 
 
480
static bool pbms_started = false;
 
481
 
 
482
 
 
483
#ifdef DRIZZLED
 
484
static int pbms_init_func(drizzled::plugin::Registry &registry)
 
485
#else
 
486
int pbms_discover_system_tables(handlerton *hton, THD* thd, const char *db, const char *name, uchar **frmblob, size_t *frmlen);
 
487
static int pbms_init_func(void *p)
 
488
#endif
 
489
{
 
490
        PBMSResultRec           result;
 
491
        int                                     err;
 
492
        int                                     my_res = 0;
 
493
        CSThread                        *thread;
 
494
 
 
495
        ASSERT(!pbms_started);
 
496
        pbms_started = false;
 
497
        
 
498
        MSDatabase::gRepoThreshold = (uint64_t) cs_byte_size_to_int8(pbms_repository_threshold);
 
499
        MSDatabase::gTempLogThreshold = (uint64_t) cs_byte_size_to_int8(pbms_temp_log_threshold);
 
500
 
 
501
        {
 
502
                char info[120];
 
503
                snprintf(info, 120, "PrimeBase Media Stream (PBMS) Daemon %s loaded...", ms_version());
 
504
                CSL.logLine(NULL, CSLog::Protocol, info);
 
505
        }
 
506
        CSL.logLine(NULL, CSLog::Protocol, "Barry Leslie, PrimeBase Technologies GmbH, http://www.primebase.org");
 
507
        
 
508
        if ((err = MSEngine::startUp(&result))) {
 
509
                CSL.logLine(NULL, CSLog::Error, result.mr_message);
 
510
                return(1);
 
511
        }
 
512
 
 
513
#ifdef DRIZZLED
 
514
                pbms_hton= new PBMSStorageEngine(std::string("PBMS"));
 
515
                registry.add(pbms_hton);
 
516
#else
 
517
        pbms_hton = (handlerton *) p;
 
518
        pbms_hton->state = SHOW_OPTION_YES;
 
519
        pbms_hton->close_connection = pbms_close_connection; /* close_connection, cleanup thread related data. */
 
520
        pbms_hton->create = pbms_create_handler;
 
521
        pbms_hton->flags = HTON_CAN_RECREATE | HTON_HIDDEN;
 
522
        pbms_hton->drop_database = pbms_drop_database; /* Drop a database */
 
523
        pbms_hton->discover = pbms_discover_system_tables;
 
524
 
 
525
        pbms_hton->commit = pbms_commit; /* commit */
 
526
        pbms_hton->rollback = pbms_rollback; /* rollback */
 
527
 
 
528
        pbms_hton->savepoint_offset = 4;
 
529
        pbms_hton->savepoint_set = pbms_savepoint_set;
 
530
        pbms_hton->savepoint_rollback = pbms_savepoint_rollback; 
 
531
        pbms_hton->savepoint_release = pbms_savepoint_release; 
 
532
#endif
 
533
        
 
534
        /* Startup the Media Stream network: */
 
535
        cs_init_memory();
 
536
        CSThread::startUp();
 
537
        if (!(thread = CSThread::newCSThread())) {
 
538
                CSException::logOSError(CS_CONTEXT, ENOMEM);
 
539
                return(1);
 
540
        }
 
541
        if (!CSThread::attach(thread)) {
 
542
                thread->myException.log(NULL);
 
543
                CSThread::shutDown();
 
544
                cs_exit_memory();
 
545
                MSEngine::shutDown();
 
546
                return(1);
 
547
        }
 
548
        enter_();
 
549
        try_(a) {
 
550
                thread->threadName = CSString::newString("startup");
 
551
                //CSTest::runAll();
 
552
                MSDatabase::startUp(pbms_http_metadata_headers);
 
553
                MSTableList::startUp();
 
554
                MSSystemTableShare::startUp();
 
555
                MSNetwork::startUp(pbms_port);
 
556
                MSTransactionManager::startUp();
 
557
                MSNetwork::startNetwork();
 
558
        }
 
559
        catch_(a) {
 
560
                self->logException();
 
561
                my_res = 1;
 
562
        }
 
563
        cont_(a);
 
564
        if (my_res) {
 
565
                try_(b) {
 
566
                        MSNetwork::shutDown();
 
567
                        MSTransactionManager::shutDown();
 
568
                        MSSystemTableShare::shutDown();
 
569
                        MSDatabase::stopThreads();
 
570
                        MSTableList::shutDown();
 
571
                        MSDatabase::shutDown();
 
572
                        CSThread::shutDown();
 
573
                }
 
574
                catch_(b) {
 
575
                        self->logException();
 
576
                }
 
577
                cont_(b);
 
578
        }
 
579
        outer_();
 
580
        CSThread::detach(thread);
 
581
 
 
582
        if (my_res) {
 
583
                cs_exit_memory();
 
584
                MSEngine::shutDown();
 
585
        }
 
586
        else {
 
587
                srandom(time(NULL));
 
588
                pbms_started = true;
 
589
                
 
590
        }
 
591
 
 
592
        return(my_res);
 
593
}
 
594
 
 
595
#ifdef DRIZZLED
 
596
static int pbms_done_func(drizzled::plugin::Registry &registry)
 
597
#else
 
598
static int pbms_done_func(void *)
 
599
#endif
 
600
{
 
601
        CSThread        *thread;
 
602
 
 
603
        if (!pbms_started)
 
604
                return 0;
 
605
 
 
606
        CSL.logLine(NULL, CSLog::Protocol, "PrimeBase Media Stream (PBMS) Daemon shutdown...");
 
607
        
 
608
        /* Shutdown the Media Stream network. */
 
609
        if (!(thread = CSThread::newCSThread()))
 
610
                CSException::logOSError(CS_CONTEXT, ENOMEM);
 
611
        else if (!CSThread::attach(thread))
 
612
                thread->myException.log(NULL);
 
613
        else {
 
614
                enter_();
 
615
                try_(a) {
 
616
                        thread->threadName = CSString::newString("shutdown");
 
617
                        MSNetwork::shutDown();
 
618
                        MSSystemTableShare::shutDown();
 
619
                        /* Ensure that the database threads are stopped before
 
620
                         * freeing the tables.
 
621
                         */
 
622
                        MSDatabase::stopThreads();
 
623
                        MSTableList::shutDown();
 
624
                        /* Databases must be shutdown after table because tables
 
625
                         * have references to repositories.
 
626
                         */
 
627
                        MSDatabase::shutDown();
 
628
                        
 
629
                        /* Shutdown the transaction manager after the databases
 
630
                         * incase they want to commit or rollback a transaction.
 
631
                         */
 
632
                        MSTransactionManager::shutDown();
 
633
                }
 
634
                catch_(a) {
 
635
                        self->logException();
 
636
                }
 
637
                cont_(a);
 
638
                outer_();
 
639
                CSThread::shutDown();
 
640
                CSThread::detach(thread);
 
641
        }
 
642
 
 
643
        MSEngine::shutDown();
 
644
        cs_exit_memory();
 
645
 
 
646
        CSL.logLine(NULL, CSLog::Protocol, "PrimeBase Media Stream (PBMS) Daemon shutdown completed");
 
647
        pbms_started = false;
 
648
#ifdef DRIZZLED
 
649
        registry.remove(pbms_hton);
 
650
#endif
 
651
        return(0);
 
652
}
 
653
 
 
654
ha_pbms::ha_pbms(handlerton *hton, TABLE_SHARE *table_arg):
 
655
handler(hton, table_arg),
 
656
ha_open_tab(NULL),
 
657
ha_error(0)
 
658
{
 
659
        memset(&ha_result, 0, sizeof(PBMSResultRec));
 
660
}
 
661
 
 
662
MX_TABLE_TYPES_T ha_pbms::table_flags() const
 
663
{
 
664
        return (
 
665
                /* We need this flag because records are not packed
 
666
                 * into a table which means #ROWID != offset
 
667
                 */
 
668
                HA_REC_NOT_IN_SEQ |
 
669
                HA_CAN_SQL_HANDLER |
 
670
#if MYSQL_VERSION_ID > 50119
 
671
                /* We can do row logging, but not statement, because
 
672
                 * MVCC is not serializable!
 
673
                 */
 
674
                HA_BINLOG_ROW_CAPABLE |
 
675
#endif
 
676
                /*
 
677
                 * Auto-increment is allowed on a partial key.
 
678
                 */
 
679
                0);
 
680
}
 
681
 
 
682
int ha_pbms::open(const char *table_path, int mode, uint test_if_locked)
 
683
{
 
684
        CSThread *self;
 
685
 
 
686
        if ((ha_error = pbms_enter_conn(current_thd, &self, &ha_result)))
 
687
                return 1;
 
688
 
 
689
        inner_();
 
690
        try_(a) {
 
691
                ha_open_tab = MSSystemTableShare::openSystemTable(table_path, table);
 
692
                thr_lock_data_init(&ha_open_tab->myShare->myThrLock, &ha_lock, NULL);
 
693
                ref_length = ha_open_tab->getRefLen();
 
694
        }
 
695
        catch_(a) {
 
696
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
697
        }
 
698
        cont_(a);
 
699
        return_(ha_error != MS_OK);
 
700
}
 
701
 
 
702
int ha_pbms::close(void)
 
703
{
 
704
        CSThread *self;
 
705
 
 
706
        if ((ha_error = pbms_enter_conn(current_thd, &self, &ha_result)))
 
707
                return 1;
 
708
 
 
709
        inner_();
 
710
        if (ha_open_tab) {
 
711
                ha_open_tab->release();
 
712
                ha_open_tab = NULL;
 
713
        }
 
714
        outer_();
 
715
        pbms_exit_conn();
 
716
        return 0;
 
717
}
 
718
 
 
719
#ifdef PBMS_HAS_KEYS
 
720
/* Index access functions: */
 
721
int ha_pbms::index_init(uint idx, bool sorted __attribute__((unused)))
 
722
{
 
723
        int err = 0;
 
724
        enter_();
 
725
        try_(a) {
 
726
                ha_open_tab->index_init(idx);
 
727
        }
 
728
        catch_(a) {
 
729
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
730
                err = 1;
 
731
        }
 
732
        cont_(a);
 
733
        return_(err);
 
734
}
 
735
 
 
736
//-------
 
737
int ha_pbms::index_end()
 
738
{
 
739
        int err = 0;
 
740
        enter_();
 
741
        try_(a) {
 
742
                ha_open_tab->index_end();
 
743
        }
 
744
        catch_(a) {
 
745
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
746
                err = 1;
 
747
        }
 
748
        cont_(a);
 
749
        return_(err);
 
750
}
 
751
 
 
752
//-------
 
753
int ha_pbms::index_read(byte * buf, const byte * key,
 
754
                                                         uint key_len, enum ha_rkey_function find_flag)
 
755
{
 
756
        int err = 0;
 
757
        enter_();
 
758
        try_(a) {
 
759
                if (!ha_open_tab->index_read(buf, key, key_len, find_flag))
 
760
                        err = HA_ERR_KEY_NOT_FOUND;
 
761
 
 
762
        }
 
763
        catch_(a) {
 
764
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
765
                err = 1;
 
766
        }
 
767
        cont_(a);
 
768
        return_(err);
 
769
}
 
770
 
 
771
//-------
 
772
int ha_pbms::index_read_idx(byte * buf, uint idx, const byte * key,
 
773
                                                                         uint key_len, enum ha_rkey_function find_flag)
 
774
{
 
775
        int err = 0;
 
776
        enter_();
 
777
        try_(a) {
 
778
                if (!ha_open_tab->index_read_idx(buf, idx, key, key_len, find_flag))
 
779
                        err = HA_ERR_KEY_NOT_FOUND;
 
780
        }
 
781
        catch_(a) {
 
782
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
783
                err = 1;
 
784
        }
 
785
        cont_(a);
 
786
        return_(err);
 
787
}
 
788
 
 
789
//-------
 
790
int ha_pbms::index_next(byte * buf)
 
791
{
 
792
        int err = 0;
 
793
        enter_();
 
794
        try_(a) {
 
795
                if (!ha_open_tab->index_next(buf))
 
796
                        err = HA_ERR_END_OF_FILE;
 
797
        }
 
798
        catch_(a) {
 
799
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
800
                err = 1;
 
801
        }
 
802
        cont_(a);
 
803
        return_(err);
 
804
}
 
805
 
 
806
//-------
 
807
int ha_pbms::index_prev(byte * buf)
 
808
{
 
809
        int err = 0;
 
810
        enter_();
 
811
        try_(a) {
 
812
                if (!ha_open_tab->index_prev(buf))
 
813
                        err = HA_ERR_END_OF_FILE;
 
814
        }
 
815
        catch_(a) {
 
816
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
817
                err = 1;
 
818
        }
 
819
        cont_(a);
 
820
        return_(err);
 
821
}
 
822
 
 
823
//-------
 
824
int ha_pbms::index_first(byte * buf)
 
825
{
 
826
        int err = 0;
 
827
        enter_();
 
828
        try_(a) {
 
829
                if (!ha_open_tab->index_first(buf))
 
830
                        err = HA_ERR_END_OF_FILE;
 
831
        }
 
832
        catch_(a) {
 
833
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
834
                err = 1;
 
835
        }
 
836
        cont_(a);
 
837
        return_(err);
 
838
}
 
839
 
 
840
//-------
 
841
int ha_pbms::index_last(byte * buf)
 
842
{
 
843
        int err = 0;
 
844
        enter_();
 
845
        try_(a) {
 
846
                if (!ha_open_tab->index_last(buf))
 
847
                        err = HA_ERR_END_OF_FILE;
 
848
        }
 
849
        catch_(a) {
 
850
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
851
                err = 1;
 
852
        }
 
853
        cont_(a);
 
854
        return_(err);
 
855
}
 
856
 
 
857
//-------
 
858
int ha_pbms::index_read_last(byte * buf, const byte * key, uint key_len)
 
859
{
 
860
        int err = 0;
 
861
        enter_();
 
862
        try_(a) {
 
863
                if (!ha_open_tab->index_read_last(buf, key, key_len))
 
864
                        err = HA_ERR_KEY_NOT_FOUND;
 
865
        }
 
866
        catch_(a) {
 
867
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
868
                err = 1;
 
869
        }
 
870
        cont_(a);
 
871
        return_(err);
 
872
}
 
873
 
 
874
//-------
 
875
 
 
876
#endif // PBMS_HAS_KEYS
 
877
 
 
878
/* Sequential scan functions: */
 
879
int ha_pbms::rnd_init(bool scan)
 
880
{
 
881
        int err = 0;
 
882
        enter_();
 
883
        try_(a) {
 
884
                ha_open_tab->seqScanInit();
 
885
        }
 
886
        catch_(a) {
 
887
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
888
                err = 1;
 
889
        }
 
890
        cont_(a);
 
891
        return_(err);
 
892
}
 
893
 
 
894
//-------
 
895
int ha_pbms::rnd_next(unsigned char *buf)
 
896
{
 
897
        int err = 0;
 
898
        enter_();
 
899
        try_(a) {
 
900
                if (!ha_open_tab->seqScanNext((char *) buf))
 
901
                        err = HA_ERR_END_OF_FILE;
 
902
        }
 
903
        catch_(a) {
 
904
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
905
                err = 1;
 
906
        }
 
907
        cont_(a);
 
908
        return_(err);
 
909
}
 
910
 
 
911
//-------
 
912
void ha_pbms::position(const unsigned char *record)
 
913
{
 
914
        ha_open_tab->seqScanPos((uint8_t *) ref);
 
915
}
 
916
 
 
917
//-------
 
918
int ha_pbms::rnd_pos(unsigned char * buf, unsigned char *pos)
 
919
{
 
920
        int err = 0;
 
921
        enter_();
 
922
        try_(a) {
 
923
                ha_open_tab->seqScanRead((uint8_t *) pos, (char *) buf);
 
924
        }
 
925
        catch_(a) {
 
926
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
927
                err = 1;
 
928
        }
 
929
        cont_(a);
 
930
        return_(err);
 
931
}
 
932
 
 
933
//////////////////////////////
 
934
int ha_pbms::write_row(unsigned char * buf)
 
935
{
 
936
        int err = 0;
 
937
        enter_();
 
938
        try_(a) {
 
939
                ha_open_tab->insertRow((char *) buf);
 
940
        }
 
941
        catch_(a) {
 
942
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
943
                err = 1;
 
944
        }
 
945
        cont_(a);
 
946
        return_(err);
 
947
}
 
948
 
 
949
int ha_pbms::delete_row(const unsigned char * buf)
 
950
{
 
951
        int err = 0;
 
952
        enter_();
 
953
        try_(a) {
 
954
                ha_open_tab->deleteRow((char *) buf);
 
955
        }
 
956
        catch_(a) {
 
957
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
958
                err = 1;
 
959
        }
 
960
        cont_(a);
 
961
        return_(err);
 
962
}
 
963
 
 
964
int ha_pbms::update_row(const unsigned char * old_data, unsigned char * new_data)
 
965
{
 
966
        int err = 0;
 
967
        enter_();
 
968
        try_(a) {
 
969
                ha_open_tab->updateRow((char *) old_data, (char *) new_data);
 
970
        }
 
971
        catch_(a) {
 
972
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
973
                err = 1;
 
974
        }
 
975
        cont_(a);
 
976
        return_(err);
 
977
}
 
978
 
 
979
int ha_pbms::info(uint flag)
 
980
{
 
981
        return 0;
 
982
}
 
983
 
 
984
int ha_pbms::external_lock(THD *thd, int lock_type)
 
985
{
 
986
        CSThread        *self;
 
987
        int                     err = 0;
 
988
 
 
989
        if ((ha_error = pbms_enter_conn(thd, &self, &ha_result)))
 
990
                return 1;
 
991
 
 
992
        inner_();
 
993
        try_(a) {
 
994
                if (lock_type == F_UNLCK)
 
995
                        ha_open_tab->unuse();
 
996
                else
 
997
                        ha_open_tab->use();
 
998
        }
 
999
        catch_(a) {
 
1000
                ha_error = pbms_exception_to_result(&self->myException, &ha_result);
 
1001
                err = 1;
 
1002
        }
 
1003
        cont_(a);
 
1004
        return_(err);
 
1005
}
 
1006
 
 
1007
THR_LOCK_DATA **ha_pbms::store_lock(THD *thd, THR_LOCK_DATA **to, enum thr_lock_type lock_type)
 
1008
{
 
1009
        if (lock_type != TL_IGNORE && ha_lock.type == TL_UNLOCK)
 
1010
                ha_lock.type = lock_type;
 
1011
        *to++ = &ha_lock;
 
1012
        return to;
 
1013
}
 
1014
 
 
1015
#ifndef DRIZZLED
 
1016
int ha_pbms::create(const char *name, TABLE *table_arg, HA_CREATE_INFO *create_info)
 
1017
{
 
1018
        if (pbms_is_Systable(cs_last_name_of_path(name)))
 
1019
                return(0);
 
1020
                
 
1021
        /* Create only works for system tables. */
 
1022
        return( HA_ERR_WRONG_COMMAND );
 
1023
}
 
1024
#endif
 
1025
 
 
1026
bool ha_pbms::get_error_message(int error, String *buf)
 
1027
{
 
1028
        if (!ha_result.mr_code)
 
1029
                return false;
 
1030
 
 
1031
        buf->copy(ha_result.mr_message, strlen(ha_result.mr_message), system_charset_info);
 
1032
        return true;
 
1033
}
 
1034
 
 
1035
 
 
1036
#ifndef DRIZZLED
 
1037
struct st_mysql_storage_engine pbms_engine_handler = {
 
1038
        MYSQL_HANDLERTON_INTERFACE_VERSION
 
1039
};
 
1040
#endif
 
1041
 
 
1042
struct st_mysql_sys_var
 
1043
{
 
1044
  MYSQL_PLUGIN_VAR_HEADER;
 
1045
};
 
1046
 
 
1047
#if MYSQL_VERSION_ID < 60000
 
1048
#if MYSQL_VERSION_ID >= 50124
 
1049
#define USE_CONST_SAVE
 
1050
#endif
 
1051
#else
 
1052
#if MYSQL_VERSION_ID >= 60005
 
1053
#define USE_CONST_SAVE
 
1054
#endif
 
1055
#endif
 
1056
 
 
1057
#ifdef USE_CONST_SAVE
 
1058
static void pbms_repository_threshold_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, const void *save)
 
1059
#else
 
1060
static void pbms_repository_threshold_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, void *save)
 
1061
#endif
 
1062
{
 
1063
        char *old= *(char **) tgt;
 
1064
        *(char **)tgt= *(char **) save;
 
1065
        if (var->flags & PLUGIN_VAR_MEMALLOC)
 
1066
        {
 
1067
                *(char **)tgt= my_strdup(*(char **) save, MYF(0));
 
1068
                my_free(old, MYF(0));
 
1069
        }
 
1070
        MSDatabase::gRepoThreshold = (uint64_t) cs_byte_size_to_int8(pbms_repository_threshold);
 
1071
#ifdef DEBUG
 
1072
        char buffer[200];
 
1073
 
 
1074
        snprintf(buffer, 200, "pbms_repository_threshold=%"PRIu64"\n", MSDatabase::gRepoThreshold);
 
1075
        CSL.log(NULL, CSLog::Protocol, buffer);
 
1076
#endif
 
1077
}
 
1078
 
 
1079
#ifdef USE_CONST_SAVE
 
1080
static void pbms_temp_log_threshold_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, const void *save)
 
1081
#else
 
1082
static void pbms_temp_log_threshold_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, void *save)
 
1083
#endif
 
1084
{
 
1085
        char *old= *(char **) tgt;
 
1086
        *(char **)tgt= *(char **) save;
 
1087
        if (var->flags & PLUGIN_VAR_MEMALLOC)
 
1088
        {
 
1089
                *(char **)tgt= my_strdup(*(char **) save, MYF(0));
 
1090
                my_free(old, MYF(0));
 
1091
        }
 
1092
        MSDatabase::gTempLogThreshold = (uint64_t) cs_byte_size_to_int8(pbms_temp_log_threshold);
 
1093
#ifdef DEBUG
 
1094
        char buffer[200];
 
1095
 
 
1096
        snprintf(buffer, 200, "pbms_temp_log_threshold=%"PRIu64"\n",MSDatabase::gTempLogThreshold);
 
1097
        CSL.log(NULL, CSLog::Protocol, buffer);
 
1098
#endif
 
1099
}
 
1100
 
 
1101
#ifdef USE_CONST_SAVE
 
1102
static void pbms_http_metadata_headers_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, const void *save)
 
1103
#else
 
1104
static void pbms_http_metadata_headers_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, void *save)
 
1105
#endif
 
1106
{
 
1107
        char *old= *(char **) tgt;
 
1108
        *(char **)tgt= *(char **) save;
 
1109
        if (var->flags & PLUGIN_VAR_MEMALLOC)
 
1110
        {
 
1111
                *(char **)tgt= my_strdup(*(char **) save, MYF(0));
 
1112
                my_free(old, MYF(0));
 
1113
        }
 
1114
        
 
1115
        MSHTTPHeaderTable::setDefaultMetaDataHeaders(pbms_http_metadata_headers);
 
1116
        
 
1117
#ifdef DEBUG
 
1118
        char buffer[200];
 
1119
 
 
1120
        snprintf(buffer, 200, "pbms_http_metadata_headers=%"PRIu64"\n", pbms_http_metadata_headers);
 
1121
        CSL.log(NULL, CSLog::Protocol, buffer);
 
1122
#endif
 
1123
}
 
1124
 
 
1125
#ifdef USE_CONST_SAVE
 
1126
static void pbms_temp_blob_timeout_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, const void *save)
 
1127
#else
 
1128
static void pbms_temp_blob_timeout_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, void *save)
 
1129
#endif
 
1130
{
 
1131
        CSThread                *self;
 
1132
        PBMSResultRec   result;
 
1133
 
 
1134
        *(long *)tgt= *(long *) save;
 
1135
 
 
1136
        if (pbms_enter_conn(thd, &self, &result))
 
1137
                return;
 
1138
        MSDatabase::wakeTempLogThreads();
 
1139
}
 
1140
 
 
1141
#if MYSQL_VERSION_ID >= 50118
 
1142
static MYSQL_SYSVAR_INT(port, pbms_port,
 
1143
        PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY,
 
1144
        "The port for the server stream-based communications.",
 
1145
        NULL, NULL, pbms_port, 0, 64*1024, 1);
 
1146
 
 
1147
static MYSQL_SYSVAR_STR(repository_threshold, pbms_repository_threshold,
 
1148
        PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
 
1149
        "The maximum size of a BLOB repository file.",
 
1150
        NULL, /*NULL*/ /**/pbms_repository_threshold_func/**/, MS_REPO_THRESHOLD_DEF);
 
1151
 
 
1152
static MYSQL_SYSVAR_STR(temp_log_threshold, pbms_temp_log_threshold,
 
1153
        PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
 
1154
        "The maximum size of a temorary BLOB log file.",
 
1155
        NULL, /*NULL*/ /**/pbms_temp_log_threshold_func/**/, MS_TEMP_LOG_THRESHOLD_DEF);
 
1156
 
 
1157
static MYSQL_SYSVAR_STR(http_metadata_headers, pbms_http_metadata_headers,
 
1158
        PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
 
1159
        "A ':' delimited list of metadata header names to be used to initialize the pbms_metadata_header table when a database is created.",
 
1160
        NULL, /*NULL*/ /**/pbms_http_metadata_headers_func/**/, MS_HTTP_METADATA_HEADERS_DEF);
 
1161
 
 
1162
static MYSQL_SYSVAR_ULONG(temp_blob_timeout, MSTempLog::gTempBlobTimeout,
 
1163
        PLUGIN_VAR_OPCMDARG,
 
1164
        "The timeout, in seconds, for temporary BLOBs. Uploaded blob data is removed after this time, unless committed to the database.",
 
1165
        NULL, pbms_temp_blob_timeout_func, MS_DEFAULT_TEMP_LOG_WAIT, 1, ~0L, 1);
 
1166
 
 
1167
static MYSQL_SYSVAR_INT(garbage_threshold, MSRepository::gGarbageThreshold,
 
1168
        PLUGIN_VAR_OPCMDARG,
 
1169
        "The percentage of garbage in a repository file before it is compacted.",
 
1170
        NULL, NULL, MS_DEFAULT_GARBAGE_LEVEL, 0, 100, 1);
 
1171
 
 
1172
 
 
1173
static MYSQL_SYSVAR_INT(max_keep_alive, MSConnectionHandler::gMaxKeepAlive,
 
1174
        PLUGIN_VAR_OPCMDARG,
 
1175
        "The timeout, in milli-seconds, before the HTTP server will close an inactive HTTP connection.",
 
1176
        NULL, NULL, MS_DEFAULT_KEEP_ALIVE, 1, INT32_MAX, 1);
 
1177
 
 
1178
static struct st_mysql_sys_var* pbms_system_variables[] = {
 
1179
        MYSQL_SYSVAR(port),
 
1180
        MYSQL_SYSVAR(repository_threshold),
 
1181
        MYSQL_SYSVAR(temp_log_threshold),
 
1182
        MYSQL_SYSVAR(temp_blob_timeout),
 
1183
        MYSQL_SYSVAR(garbage_threshold),
 
1184
        MYSQL_SYSVAR(http_metadata_headers),
 
1185
        MYSQL_SYSVAR(max_keep_alive),
 
1186
        NULL
 
1187
};
 
1188
#endif
 
1189
 
 
1190
#ifdef DRIZZLED
 
1191
drizzle_declare_plugin(pbms)
 
1192
#else
 
1193
mysql_declare_plugin(pbms)
 
1194
#endif
 
1195
{
 
1196
#ifndef DRIZZLED
 
1197
        MYSQL_STORAGE_ENGINE_PLUGIN,
 
1198
        &pbms_engine_handler,
 
1199
#endif
 
1200
        "PBMS",
 
1201
#ifdef DRIZZLED
 
1202
        "1.0",
 
1203
#endif
 
1204
        "Barry Leslie, PrimeBase Technologies GmbH",
 
1205
        "The Media Stream daemon for MySQL",
 
1206
        PLUGIN_LICENSE_GPL,
 
1207
        pbms_init_func, /* Plugin Init */
 
1208
        pbms_done_func, /* Plugin Deinit */
 
1209
#ifndef DRIZZLED
 
1210
        0x0001 /* 0.1 */,
 
1211
#endif
 
1212
        NULL,                                                                                   /* status variables                                                             */
 
1213
#if MYSQL_VERSION_ID >= 50118
 
1214
        pbms_system_variables,                                                  /* system variables                                                             */
 
1215
#else
 
1216
        NULL,
 
1217
#endif
 
1218
        NULL                                                                                    /* config options                                                               */
 
1219
}
 
1220
#ifdef DRIZZLED
 
1221
drizzle_declare_plugin_end;
 
1222
#else
 
1223
mysql_declare_plugin_end;
 
1224
#endif