~drizzle-trunk/drizzle/development

1548.2.1 by Barry.Leslie at PrimeBase
Added the PBMS daemon plugin.
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
2
 *
3
 * PrimeBase Media Stream for MySQL
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of the GNU General Public License as published by
7
 * the Free Software Foundation; either version 2 of the License, or
8
 * (at your option) any later version.
9
 *
10
 * This program is distributed in the hope that it will be useful,
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 * GNU General Public License for more details.
14
 *
15
 * You should have received a copy of the GNU General Public License
16
 * along with this program; if not, write to the Free Software
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
 *
19
 * Original author: Paul McCullagh
20
 * Continued development: Barry Leslie
21
 *
22
 * 2007-05-20
23
 *
24
 * H&G2JCtL
25
 *
26
 * Table handler.
27
 *
28
 */
29
30
#ifdef USE_PRAGMA_IMPLEMENTATION
31
#pragma implementation				// gcc: Class implementation
32
#endif
33
34
#ifdef DRIZZLED
35
#include <drizzled/server_includes.h>
36
#include <drizzled/handler.h>
37
#include <drizzled/table.h>
38
#include <drizzled/current_session.h>
39
#include <drizzled/plugin/storage_engine.h>
40
#include <drizzled/plugin.h>
41
42
#define my_strdup(a,b) strdup(a)
43
44
/*
45
#include <sys/stat.h>
46
#include <drizzled/common_server.h>
47
#include <drizzled/data_home.h>
48
#include <drizzled/error.h>
49
#include <drizzled/handler.h>
50
#include <drizzled/plugin/storage_engine.h>
51
*/
52
#include "CSConfig.h"
53
#else
54
#include "CSConfig.h"
55
#include "mysql_priv.h"
56
#include <mysql/plugin.h>
57
#include <my_dir.h>
58
#endif 
59
60
#include <stdlib.h>
61
#include <time.h>
62
63
64
#include "Defs_ms.h"
65
66
#include "CSDefs.h"
67
#include "CSObject.h"
68
#include "CSGlobal.h"
69
#include "CSThread.h"
70
#include "CSStrUtil.h"
71
#include "CSTest.h"
72
#include "CSLog.h"
73
74
#include "Engine_ms.h"	
75
#include "ha_pbms.h"
76
#include "Network_ms.h"
77
#include "ConnectionHandler_ms.h"
78
#include "OpenTable_ms.h"
79
#include "Database_ms.h"
80
#include "TempLog_ms.h"
81
#include "Util_ms.h"
82
#include "SystemTable_ms.h"
83
#include "ms_mysql.h"
84
#include "Discover_ms.h"
85
#include "metadata_ms.h"
86
#include "Transaction_ms.h"
87
#include "SysTab_httpheader.h"
88
89
/* Note: 'new' used here is NOT CSObject::new which is a DEBUG define*/
90
#ifdef new
91
#undef new
92
#endif
93
94
#ifndef PBMS_PORT
95
#define PBMS_PORT 8080
96
#endif
97
98
static int		pbms_port = PBMS_PORT; 
99
static char		*pbms_repository_threshold;
100
static char		*pbms_temp_log_threshold;
101
static char		*pbms_http_metadata_headers;
102
103
#ifdef DRIZZLED
104
class PBMSTableNameIterator: public TableNameIteratorImplementation
105
{
106
private:
107
  uint32_t current_name;
108
109
public:
110
  PBMSTableNameIterator(const std::string &database)
111
    : TableNameIteratorImplementation(database), current_name(0)
112
    {};
113
114
   int next(std::string *name);
115
116
};
117
118
int PBMSTableNameIterator::next(string *name)
119
{
120
	const char *tab_name = pbms_getSysTableName(current_name++);
121
	
122
	if (!tab_name) 
123
		return -1;
124
		
125
    if (name)
126
      name->assign(tab_name);
127
	  
128
	return 0;
129
	
130
}
131
132
/* Drizzle variant of system table discovery; defined outside this file and
 * called from PBMSStorageEngine::getTableProtoImplementation().
 */
int pbms_discover_system_tables(const char *name, drizzled::message::Table *table);
133
134
class PBMSStorageEngine : public StorageEngine {
135
public:
136
	PBMSStorageEngine(std::string name_arg)
137
	: StorageEngine(name_arg, HTON_NO_FLAGS | HTON_HIDDEN, 4 /*savepoint_offset*/, false /*support_2pc*/) {}
138
139
	int close_connection(Session *);
140
	int commit(Session *, bool);
141
	int rollback(Session *, bool);
142
	handler *create(TABLE_SHARE *, MEM_ROOT *);
143
	void drop_database(char *);
144
	int savepoint_set(Session *thd, void *sv);
145
	int savepoint_rollback(Session *thd, void *sv);
146
	int savepoint_release(Session *thd, void *sv);
147
	const char **bas_ext() const;
148
149
	int createTableImplementation(Session*, const char *path, Table *,HA_CREATE_INFO *, drizzled::message::Table*)
150
	{
151
		if (pbms_is_Systable(cs_last_name_of_path(path)))
152
			return(0);
153
			
154
		/* Create only works for system tables. */
155
		return( HA_ERR_WRONG_COMMAND );
156
	}
157
	
158
	int deleteTableImplementation(Session*, const string table_name) { return 0;}
159
	
160
	int getTableProtoImplementation(const char* path, drizzled::message::Table *table_proto)
161
	{
162
		int err = pbms_discover_system_tables(path, table_proto);
163
		if (err)
164
			return err;
165
			
166
		return EEXIST;
167
	}
168
169
	TableNameIteratorImplementation* tableNameIterator(const std::string &database)
170
	{
171
		return new PBMSTableNameIterator(database);
172
	}
173
174
	int renameTableImplementation(Session*,
175
											  const char *from, const char *to)
176
	{
177
	  return -1;
178
	}
179
};
180
181
/* Drizzle build: assigned in pbms_init_func() with a newly created PBMSStorageEngine. */
PBMSStorageEngine	*pbms_hton;
182
#else
183
/* MySQL build: assigned in pbms_init_func() from the plugin init argument. */
handlerton		*pbms_hton;
184
#endif
185
186
static const char *ha_pbms_exts[] = {
187
	NullS
188
};
189
190
/*
191
 * ---------------------------------------------------------------
192
 * UTILITIES
193
 */
194
195
void pbms_take_part_in_transaction(void *thread)
196
{
197
	THD			*thd;
198
	if ((thd = (THD *) thread)) {
199
		trans_register_ha(thd, true, pbms_hton); 
200
	}
201
}
202
203
#ifdef DRIZZLED
204
const char **PBMSStorageEngine::bas_ext() const
205
#else
206
const char **ha_pbms::bas_ext() const
207
#endif
208
{
209
	return ha_pbms_exts;
210
}
211
212
#ifdef DRIZZLED
213
int PBMSStorageEngine::close_connection(Session *thd)
214
{
215
	PBMSStorageEngine * const hton = this;
216
#else
217
static int pbms_close_connection(handlerton *hton, THD* thd)
218
{
219
#endif
220
	CSThread	*self;
221
222
	self = CSThread::getSelf();
223
	if (self && self->pbms_api_owner)
224
		return 0;
225
226
	if (thd) {
227
		if ((self = (CSThread *) *thd_ha_data(thd, pbms_hton))) {
228
			*thd_ha_data(thd, pbms_hton) = NULL;
229
			CSThread::setSelf(self);
230
			CSThread::detach(self);
231
		}
232
	}
233
	else {
234
		self = CSThread::getSelf();
235
		CSThread::detach(self);
236
	}
237
	return 0;
238
}
239
240
static int pbms_enter_conn(void *thread, CSThread **r_self, PBMSResultPtr result)
241
{
242
	THD			*thd;
243
	CSThread	*self;
244
245
	self = CSThread::getSelf();
246
	if (!self) {	
247
		if ((thd = (THD *) thread)) {
248
			if (!(self = (CSThread *) *thd_ha_data(((THD *) thd), pbms_hton))) {
249
				if (!(self = CSThread::newCSThread()))
250
					return pbms_os_error_result(CS_CONTEXT, ENOMEM, result);
251
				if (!CSThread::attach(self))
252
					return pbms_exception_to_result(&self->myException, result);
253
				*thd_ha_data(thd, pbms_hton) = self;
254
			}
255
			else {
256
				if (!CSThread::setSelf(self))
257
					return pbms_exception_to_result(&self->myException, result);
258
			}
259
		}
260
		else {
261
			if (!(self = CSThread::newCSThread()))
262
				return pbms_os_error_result(CS_CONTEXT, ENOMEM, result);
263
			if (!CSThread::attach(self))
264
				return pbms_exception_to_result(&self->myException, result);
265
		}
266
	}
267
	*r_self = self;
268
	return MS_OK;
269
}
270
271
/*
 * Same as pbms_enter_conn(), using current_thd as the session.
 */
int pbms_enter_conn_no_thd(CSThread **r_self, PBMSResultPtr result)
{
	int rc = pbms_enter_conn(current_thd, r_self, result);

	return rc;
}
275
276
void pbms_exit_conn()
277
{
278
	THD			*thd = (THD *) current_thd;
279
	CSThread	*self;
280
281
	self = CSThread::getSelf();
282
	if (self && self->pbms_api_owner)
283
		return;
284
285
286
	if (thd)
287
		CSThread::setSelf(NULL);
288
	else {
289
		self = CSThread::getSelf();
290
		CSThread::detach(self);
291
	}
292
}
293
294
/*
 * Copy the contents of an exception into a PBMS result record.
 *
 * mr_code and mr_message are taken from the exception. mr_stack is
 * built from the exception context and stack trace: each part is used
 * only when non-empty, and a newline separates them when both exist.
 *
 * Always returns MS_ERR_ENGINE.
 */
int pbms_exception_to_result(CSException *e, PBMSResultPtr result)
{
	result->mr_code = e->getErrorCode();
	cs_strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, e->getMessage());

	const char	*context = e->getContext();
	const char	*trace = e->getStackTrace();
	bool		have_trace = (trace && *trace);

	if (context && *context) {
		cs_strcpy(MS_RESULT_STACK_SIZE, result->mr_stack, context);
		if (have_trace)
			cs_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, "\n");
	}
	else {
		*result->mr_stack = 0;
	}

	if (have_trace)
		cs_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, trace);

	return MS_ERR_ENGINE;
}
313
314
int pbms_os_error_result(const char *func, const char *file, int line, int err, PBMSResultPtr result)
315
{
316
	CSException e;
317
		
318
	e.initOSError(func, file, line, err);
319
	return pbms_exception_to_result(&e, result);
320
}
321
322
int pbms_error_result(const char *func, const char *file, int line, int err, const char *message, PBMSResultPtr result)
323
{
324
	CSException e;
325
		
326
	e.initException(func, file, line, err, message);
327
	return pbms_exception_to_result(&e, result);
328
}
329
330
/*
331
 * ---------------------------------------------------------------
332
 * HANDLER INTERFACE
333
 */
334
335
336
#ifdef DRIZZLED
337
handler *PBMSStorageEngine::create(TABLE_SHARE *table, MEM_ROOT *mem_root)
338
{
339
	PBMSStorageEngine * const hton = this;
340
#else
341
static handler *pbms_create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root)
342
{
343
#endif
344
	return new (mem_root) ha_pbms(hton, table);
345
}
346
347
#ifdef DRIZZLED
348
int PBMSStorageEngine::commit(Session *thd, bool all)
349
{
350
	PBMSStorageEngine * const hton = this;
351
#else
352
static int pbms_commit(handlerton *hton, THD *thd, bool all)
353
{
354
#endif
355
	int			err = 0;
356
	CSThread	*self;
357
	PBMSResultRec result;
358
359
	if (pbms_enter_conn_no_thd(&self, &result))
360
		return 0;
361
	inner_();
362
	try_(a) {
363
		MSTransactionManager::commit();
364
	}
365
	catch_(a) {
366
		err = pbms_exception_to_result(&self->myException, &result);
367
	}
368
	cont_(a);
369
	return_(err);
370
}
371
372
#ifdef DRIZZLED
373
int PBMSStorageEngine::rollback(Session *thd, bool all)
374
{
375
	PBMSStorageEngine * const hton = this;
376
#else
377
static int pbms_rollback(handlerton *hton, THD *thd, bool all)
378
{
379
#endif
380
	int			err = 0;
381
	CSThread	*self;
382
	PBMSResultRec result;
383
384
	if (pbms_enter_conn_no_thd(&self, &result))
385
		return 0;
386
	inner_();
387
	try_(a) {
388
		MSTransactionManager::rollback();
389
	}
390
	catch_(a) {
391
		err = pbms_exception_to_result(&self->myException, &result);
392
	}
393
	cont_(a);
394
	return_(err);
395
}
396
397
#ifdef DRIZZLED
398
int PBMSStorageEngine::savepoint_set(Session *thd, void *sv)
399
{
400
	PBMSStorageEngine * const hton = this;
401
#else
402
static int pbms_savepoint_set(handlerton *hton, THD *thd, void *sv)
403
{
404
#endif
405
	int			err = 0;
406
	CSThread	*self;
407
	PBMSResultRec result;
408
409
	if (pbms_enter_conn_no_thd(&self, &result))
410
		return 0;
411
		
412
	*((uint32_t*)sv) = self->myStmtCount;
413
	return 0;	
414
}
415
416
#ifdef DRIZZLED
417
int PBMSStorageEngine::savepoint_rollback(Session *thd, void *sv)
418
{
419
	PBMSStorageEngine * const hton = this;
420
#else
421
static int pbms_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
422
{
423
#endif
424
	int			err = 0;
425
	CSThread	*self;
426
	PBMSResultRec result;
427
428
	if (pbms_enter_conn_no_thd(&self, &result))
429
		return 0;
430
	inner_();
431
	try_(a) {
432
		MSTransactionManager::rollbackTo(*((uint32_t*)sv));
433
	}
434
	catch_(a) {
435
		err = pbms_exception_to_result(&self->myException, &result);
436
	}
437
	cont_(a);
438
	return_(err);
439
}
440
441
#ifdef DRIZZLED
442
int PBMSStorageEngine::savepoint_release(Session *thd, void *sv)
443
{
444
	PBMSStorageEngine * const hton = this;
445
#else
446
static int pbms_savepoint_release(handlerton *hton, THD *thd, void *sv)
447
{
448
#endif
449
	return 0;
450
}
451
452
453
#ifdef DRIZZLED
454
void PBMSStorageEngine::drop_database(char *path)
455
{
456
	PBMSStorageEngine * const hton = this;
457
#else
458
static void pbms_drop_database(handlerton *hton, char *path)
459
{
460
#endif
461
	CSThread *self;
462
	char db_name[PATH_MAX];
463
	PBMSResultRec result;
464
	
465
	if (pbms_enter_conn_no_thd(&self, &result))
466
		return;
467
	inner_();
468
	
469
	cs_strcpy(PATH_MAX, db_name, cs_last_directory_of_path(path));
470
	cs_remove_dir_char(db_name);
471
	try_(a) {
472
		MSDatabase::dropDatabase(db_name);
473
	}
474
	catch_(a);
475
	self->logException();
476
	cont_(a);
477
	exit_();
478
}
479
480
/* Set to true at the end of a successful pbms_init_func(); pbms_done_func()
 * returns immediately while it is false and resets it after shutdown.
 */
static bool pbms_started = false;
481
482
483
#ifdef DRIZZLED
484
static int pbms_init_func(drizzled::plugin::Registry &registry)
485
#else
486
int pbms_discover_system_tables(handlerton *hton, THD* thd, const char *db, const char *name, uchar **frmblob, size_t *frmlen);
487
static int pbms_init_func(void *p)
488
#endif
489
{
490
	PBMSResultRec		result;
491
	int					err;
492
	int					my_res = 0;
493
	CSThread			*thread;
494
495
	ASSERT(!pbms_started);
496
	pbms_started = false;
497
	
498
	MSDatabase::gRepoThreshold = (uint64_t) cs_byte_size_to_int8(pbms_repository_threshold);
499
	MSDatabase::gTempLogThreshold = (uint64_t) cs_byte_size_to_int8(pbms_temp_log_threshold);
500
501
	{
502
		char info[120];
503
		snprintf(info, 120, "PrimeBase Media Stream (PBMS) Daemon %s loaded...", ms_version());
504
		CSL.logLine(NULL, CSLog::Protocol, info);
505
	}
506
	CSL.logLine(NULL, CSLog::Protocol, "Barry Leslie, PrimeBase Technologies GmbH, http://www.primebase.org");
507
	
508
	if ((err = MSEngine::startUp(&result))) {
509
		CSL.logLine(NULL, CSLog::Error, result.mr_message);
510
		return(1);
511
	}
512
513
#ifdef DRIZZLED
514
		pbms_hton= new PBMSStorageEngine(std::string("PBMS"));
515
		registry.add(pbms_hton);
516
#else
517
	pbms_hton = (handlerton *) p;
518
	pbms_hton->state = SHOW_OPTION_YES;
519
	pbms_hton->close_connection = pbms_close_connection; /* close_connection, cleanup thread related data. */
520
	pbms_hton->create = pbms_create_handler;
521
	pbms_hton->flags = HTON_CAN_RECREATE | HTON_HIDDEN;
522
	pbms_hton->drop_database = pbms_drop_database; /* Drop a database */
523
	pbms_hton->discover = pbms_discover_system_tables;
524
525
	pbms_hton->commit = pbms_commit; /* commit */
526
	pbms_hton->rollback = pbms_rollback; /* rollback */
527
528
	pbms_hton->savepoint_offset = 4;
529
	pbms_hton->savepoint_set = pbms_savepoint_set;
530
	pbms_hton->savepoint_rollback = pbms_savepoint_rollback; 
531
	pbms_hton->savepoint_release = pbms_savepoint_release; 
532
#endif
533
	
534
	/* Startup the Media Stream network: */
535
	cs_init_memory();
536
	CSThread::startUp();
537
	if (!(thread = CSThread::newCSThread())) {
538
		CSException::logOSError(CS_CONTEXT, ENOMEM);
539
		return(1);
540
	}
541
	if (!CSThread::attach(thread)) {
542
		thread->myException.log(NULL);
543
		CSThread::shutDown();
544
		cs_exit_memory();
545
		MSEngine::shutDown();
546
		return(1);
547
	}
548
	enter_();
549
	try_(a) {
550
		thread->threadName = CSString::newString("startup");
551
		//CSTest::runAll();
552
		MSDatabase::startUp(pbms_http_metadata_headers);
553
		MSTableList::startUp();
554
		MSSystemTableShare::startUp();
555
		MSNetwork::startUp(pbms_port);
556
		MSTransactionManager::startUp();
557
		MSNetwork::startNetwork();
558
	}
559
	catch_(a) {
560
		self->logException();
561
		my_res = 1;
562
	}
563
	cont_(a);
564
	if (my_res) {
565
		try_(b) {
566
			MSNetwork::shutDown();
567
			MSTransactionManager::shutDown();
568
			MSSystemTableShare::shutDown();
569
			MSDatabase::stopThreads();
570
			MSTableList::shutDown();
571
			MSDatabase::shutDown();
572
			CSThread::shutDown();
573
		}
574
		catch_(b) {
575
			self->logException();
576
		}
577
		cont_(b);
578
	}
579
	outer_();
580
	CSThread::detach(thread);
581
582
	if (my_res) {
583
		cs_exit_memory();
584
		MSEngine::shutDown();
585
	}
586
	else {
587
		srandom(time(NULL));
588
		pbms_started = true;
589
		
590
	}
591
592
	return(my_res);
593
}
594
595
#ifdef DRIZZLED
596
static int pbms_done_func(drizzled::plugin::Registry &registry)
597
#else
598
static int pbms_done_func(void *)
599
#endif
600
{
601
	CSThread	*thread;
602
603
	if (!pbms_started)
604
		return 0;
605
606
	CSL.logLine(NULL, CSLog::Protocol, "PrimeBase Media Stream (PBMS) Daemon shutdown...");
607
	
608
	/* Shutdown the Media Stream network. */
609
	if (!(thread = CSThread::newCSThread()))
610
		CSException::logOSError(CS_CONTEXT, ENOMEM);
611
	else if (!CSThread::attach(thread))
612
		thread->myException.log(NULL);
613
	else {
614
		enter_();
615
		try_(a) {
616
			thread->threadName = CSString::newString("shutdown");
617
			MSNetwork::shutDown();
618
			MSSystemTableShare::shutDown();
619
			/* Ensure that the database threads are stopped before
620
			 * freeing the tables.
621
			 */
622
			MSDatabase::stopThreads();
623
			MSTableList::shutDown();
624
			/* Databases must be shutdown after table because tables
625
			 * have references to repositories.
626
			 */
627
			MSDatabase::shutDown();
628
			
629
			/* Shutdown the transaction manager after the databases
630
			 * incase they want to commit or rollback a transaction.
631
			 */
632
			MSTransactionManager::shutDown();
633
		}
634
		catch_(a) {
635
			self->logException();
636
		}
637
		cont_(a);
638
		outer_();
639
		CSThread::shutDown();
640
		CSThread::detach(thread);
641
	}
642
643
	MSEngine::shutDown();
644
	cs_exit_memory();
645
646
	CSL.logLine(NULL, CSLog::Protocol, "PrimeBase Media Stream (PBMS) Daemon shutdown completed");
647
	pbms_started = false;
648
#ifdef DRIZZLED
649
	registry.remove(pbms_hton);
650
#endif
651
	return(0);
652
}
653
654
/*
 * Construct a handler with no open table, no pending error and a
 * zero-filled result record.
 */
ha_pbms::ha_pbms(handlerton *hton, TABLE_SHARE *table_arg)
	: handler(hton, table_arg),
	  ha_open_tab(NULL),
	  ha_error(0)
{
	memset(&ha_result, 0, sizeof(PBMSResultRec));
}
661
662
/* Returns the capability flags of the PBMS handler. */
MX_TABLE_TYPES_T ha_pbms::table_flags() const
{
	/* HA_REC_NOT_IN_SEQ is needed because records are not packed
	 * into a table which means #ROWID != offset.
	 */
	MX_TABLE_TYPES_T flags = HA_REC_NOT_IN_SEQ | HA_CAN_SQL_HANDLER;

#if MYSQL_VERSION_ID > 50119
	/* We can do row logging, but not statement, because
	 * MVCC is not serializable!
	 */
	flags |= HA_BINLOG_ROW_CAPABLE;
#endif

	return flags;
}
681
682
int ha_pbms::open(const char *table_path, int mode, uint test_if_locked)
683
{
684
	CSThread *self;
685
686
	if ((ha_error = pbms_enter_conn(current_thd, &self, &ha_result)))
687
		return 1;
688
689
	inner_();
690
	try_(a) {
691
		ha_open_tab = MSSystemTableShare::openSystemTable(table_path, table);
692
		thr_lock_data_init(&ha_open_tab->myShare->myThrLock, &ha_lock, NULL);
693
		ref_length = ha_open_tab->getRefLen();
694
	}
695
	catch_(a) {
696
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
697
	}
698
	cont_(a);
699
	return_(ha_error != MS_OK);
700
}
701
702
int ha_pbms::close(void)
703
{
704
	CSThread *self;
705
706
	if ((ha_error = pbms_enter_conn(current_thd, &self, &ha_result)))
707
		return 1;
708
709
	inner_();
710
	if (ha_open_tab) {
711
		ha_open_tab->release();
712
		ha_open_tab = NULL;
713
	}
714
	outer_();
715
	pbms_exit_conn();
716
	return 0;
717
}
718
719
#ifdef PBMS_HAS_KEYS
720
/* Index access functions: */
721
/*
 * Start an index scan on index 'idx' of the open table.
 * Returns 0, or 1 after recording the error in ha_error / ha_result
 * when the catch_ block runs.
 */
int ha_pbms::index_init(uint idx, bool sorted __attribute__((unused)))
{
	int err = 0;

	enter_();
	try_(a) {
		ha_open_tab->index_init(idx);
	}
	catch_(a) {
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
		err = 1;
	}
	cont_(a);
	return_(err);
}
735
736
//-------
737
int ha_pbms::index_end()
738
{
739
	int err = 0;
740
	enter_();
741
	try_(a) {
742
		ha_open_tab->index_end();
743
	}
744
	catch_(a) {
745
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
746
		err = 1;
747
	}
748
	cont_(a);
749
	return_(err);
750
}
751
752
//-------
753
int ha_pbms::index_read(byte * buf, const byte * key,
754
							 uint key_len, enum ha_rkey_function find_flag)
755
{
756
	int err = 0;
757
	enter_();
758
	try_(a) {
759
		if (!ha_open_tab->index_read(buf, key, key_len, find_flag))
760
			err = HA_ERR_KEY_NOT_FOUND;
761
762
	}
763
	catch_(a) {
764
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
765
		err = 1;
766
	}
767
	cont_(a);
768
	return_(err);
769
}
770
771
//-------
772
/*
 * Read a row by key from index 'idx' into 'buf'.
 * Returns 0, HA_ERR_KEY_NOT_FOUND when the table's index_read_idx()
 * returns false, or 1 after recording the error in ha_error /
 * ha_result when the catch_ block runs.
 */
int ha_pbms::index_read_idx(byte * buf, uint idx, const byte * key,
							uint key_len, enum ha_rkey_function find_flag)
{
	int err = 0;

	enter_();
	try_(a) {
		if (!ha_open_tab->index_read_idx(buf, idx, key, key_len, find_flag))
			err = HA_ERR_KEY_NOT_FOUND;
	}
	catch_(a) {
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
		err = 1;
	}
	cont_(a);
	return_(err);
}
788
789
//-------
790
/*
 * Read the next row of the index scan into 'buf'.
 * Returns 0, HA_ERR_END_OF_FILE when the table's index_next() returns
 * false, or 1 after recording the error in ha_error / ha_result when
 * the catch_ block runs.
 */
int ha_pbms::index_next(byte * buf)
{
	int err = 0;

	enter_();
	try_(a) {
		if (!ha_open_tab->index_next(buf))
			err = HA_ERR_END_OF_FILE;
	}
	catch_(a) {
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
		err = 1;
	}
	cont_(a);
	return_(err);
}
805
806
//-------
807
/*
 * Read the previous row of the index scan into 'buf'.
 * Returns 0, HA_ERR_END_OF_FILE when the table's index_prev() returns
 * false, or 1 after recording the error in ha_error / ha_result when
 * the catch_ block runs.
 */
int ha_pbms::index_prev(byte * buf)
{
	int err = 0;

	enter_();
	try_(a) {
		if (!ha_open_tab->index_prev(buf))
			err = HA_ERR_END_OF_FILE;
	}
	catch_(a) {
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
		err = 1;
	}
	cont_(a);
	return_(err);
}
822
823
//-------
824
/*
 * Read the first row of the index into 'buf'.
 * Returns 0, HA_ERR_END_OF_FILE when the table's index_first() returns
 * false, or 1 after recording the error in ha_error / ha_result when
 * the catch_ block runs.
 */
int ha_pbms::index_first(byte * buf)
{
	int err = 0;

	enter_();
	try_(a) {
		if (!ha_open_tab->index_first(buf))
			err = HA_ERR_END_OF_FILE;
	}
	catch_(a) {
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
		err = 1;
	}
	cont_(a);
	return_(err);
}
839
840
//-------
841
/*
 * Read the last row of the index into 'buf'.
 * Returns 0, HA_ERR_END_OF_FILE when the table's index_last() returns
 * false, or 1 after recording the error in ha_error / ha_result when
 * the catch_ block runs.
 */
int ha_pbms::index_last(byte * buf)
{
	int err = 0;

	enter_();
	try_(a) {
		if (!ha_open_tab->index_last(buf))
			err = HA_ERR_END_OF_FILE;
	}
	catch_(a) {
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
		err = 1;
	}
	cont_(a);
	return_(err);
}
856
857
//-------
858
/*
 * Read the last row matching 'key' into 'buf'.
 * Returns 0, HA_ERR_KEY_NOT_FOUND when the table's index_read_last()
 * returns false, or 1 after recording the error in ha_error /
 * ha_result when the catch_ block runs.
 */
int ha_pbms::index_read_last(byte * buf, const byte * key, uint key_len)
{
	int err = 0;

	enter_();
	try_(a) {
		if (!ha_open_tab->index_read_last(buf, key, key_len))
			err = HA_ERR_KEY_NOT_FOUND;
	}
	catch_(a) {
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
		err = 1;
	}
	cont_(a);
	return_(err);
}
873
874
//-------
875
876
#endif // PBMS_HAS_KEYS
877
878
/* Sequential scan functions: */
879
int ha_pbms::rnd_init(bool scan)
880
{
881
	int err = 0;
882
	enter_();
883
	try_(a) {
884
		ha_open_tab->seqScanInit();
885
	}
886
	catch_(a) {
887
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
888
		err = 1;
889
	}
890
	cont_(a);
891
	return_(err);
892
}
893
894
//-------
895
int ha_pbms::rnd_next(unsigned char *buf)
896
{
897
	int err = 0;
898
	enter_();
899
	try_(a) {
900
		if (!ha_open_tab->seqScanNext((char *) buf))
901
			err = HA_ERR_END_OF_FILE;
902
	}
903
	catch_(a) {
904
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
905
		err = 1;
906
	}
907
	cont_(a);
908
	return_(err);
909
}
910
911
//-------
912
void ha_pbms::position(const unsigned char *record)
913
{
914
	ha_open_tab->seqScanPos((uint8_t *) ref);
915
}
916
917
//-------
918
int ha_pbms::rnd_pos(unsigned char * buf, unsigned char *pos)
919
{
920
	int err = 0;
921
	enter_();
922
	try_(a) {
923
		ha_open_tab->seqScanRead((uint8_t *) pos, (char *) buf);
924
	}
925
	catch_(a) {
926
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
927
		err = 1;
928
	}
929
	cont_(a);
930
	return_(err);
931
}
932
933
//////////////////////////////
934
int ha_pbms::write_row(unsigned char * buf)
935
{
936
	int err = 0;
937
	enter_();
938
	try_(a) {
939
		ha_open_tab->insertRow((char *) buf);
940
	}
941
	catch_(a) {
942
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
943
		err = 1;
944
	}
945
	cont_(a);
946
	return_(err);
947
}
948
949
int ha_pbms::delete_row(const unsigned char * buf)
950
{
951
	int err = 0;
952
	enter_();
953
	try_(a) {
954
		ha_open_tab->deleteRow((char *) buf);
955
	}
956
	catch_(a) {
957
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
958
		err = 1;
959
	}
960
	cont_(a);
961
	return_(err);
962
}
963
964
int ha_pbms::update_row(const unsigned char * old_data, unsigned char * new_data)
965
{
966
	int err = 0;
967
	enter_();
968
	try_(a) {
969
		ha_open_tab->updateRow((char *) old_data, (char *) new_data);
970
	}
971
	catch_(a) {
972
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
973
		err = 1;
974
	}
975
	cont_(a);
976
	return_(err);
977
}
978
979
/* No statistics are provided: 'flag' is ignored and 0 is returned. */
int ha_pbms::info(uint flag)
{
	int rc = 0;

	return rc;
}
983
984
/*
 * Called with the lock type for the table: F_UNLCK marks the open
 * table as no longer in use (unuse()), any other lock type marks it as
 * in use (use()).
 *
 * Returns 0, or 1 when the connection cannot be entered or the catch_
 * block recorded an error in ha_error / ha_result.
 */
int ha_pbms::external_lock(THD *thd, int lock_type)
{
	int			err = 0;
	CSThread	*self;

	ha_error = pbms_enter_conn(thd, &self, &ha_result);
	if (ha_error)
		return 1;

	inner_();
	try_(a) {
		if (lock_type == F_UNLCK)
			ha_open_tab->unuse();
		else
			ha_open_tab->use();
	}
	catch_(a) {
		ha_error = pbms_exception_to_result(&self->myException, &ha_result);
		err = 1;
	}
	cont_(a);
	return_(err);
}
1006
1007
/*
 * Add the handler's lock to the lock array.
 *
 * The requested lock type is recorded only when it is not TL_IGNORE
 * and the handler's lock is currently TL_UNLOCK. Returns the position
 * following the stored entry.
 */
THR_LOCK_DATA **ha_pbms::store_lock(THD *thd, THR_LOCK_DATA **to, enum thr_lock_type lock_type)
{
	if (ha_lock.type == TL_UNLOCK && lock_type != TL_IGNORE)
		ha_lock.type = lock_type;

	*to = &ha_lock;
	return to + 1;
}
1014
1015
#ifndef DRIZZLED
1016
int ha_pbms::create(const char *name, TABLE *table_arg, HA_CREATE_INFO *create_info)
1017
{
1018
	if (pbms_is_Systable(cs_last_name_of_path(name)))
1019
		return(0);
1020
		
1021
	/* Create only works for system tables. */
1022
	return( HA_ERR_WRONG_COMMAND );
1023
}
1024
#endif
1025
1026
bool ha_pbms::get_error_message(int error, String *buf)
1027
{
1028
	if (!ha_result.mr_code)
1029
		return false;
1030
1031
	buf->copy(ha_result.mr_message, strlen(ha_result.mr_message), system_charset_info);
1032
	return true;
1033
}
1034
1035
1036
#ifndef DRIZZLED
1037
struct st_mysql_storage_engine pbms_engine_handler = {
1038
	MYSQL_HANDLERTON_INTERFACE_VERSION
1039
};
1040
#endif
1041
1042
struct st_mysql_sys_var
1043
{
1044
  MYSQL_PLUGIN_VAR_HEADER;
1045
};
1046
1047
/* USE_CONST_SAVE selects the update function signature that takes a
 * 'const void *save': 5.1.24 and later in the 5.x series, 6.0.5 and later
 * in the 6.x series.
 */
#if (MYSQL_VERSION_ID < 60000 && MYSQL_VERSION_ID >= 50124) || MYSQL_VERSION_ID >= 60005
#define USE_CONST_SAVE
#endif
1056
1057
#ifdef USE_CONST_SAVE
1058
static void pbms_repository_threshold_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, const void *save)
1059
#else
1060
static void pbms_repository_threshold_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, void *save)
1061
#endif
1062
{
1063
	char *old= *(char **) tgt;
1064
	*(char **)tgt= *(char **) save;
1065
	if (var->flags & PLUGIN_VAR_MEMALLOC)
1066
	{
1067
		*(char **)tgt= my_strdup(*(char **) save, MYF(0));
1068
		my_free(old, MYF(0));
1069
	}
1070
	MSDatabase::gRepoThreshold = (uint64_t) cs_byte_size_to_int8(pbms_repository_threshold);
1071
#ifdef DEBUG
1072
	char buffer[200];
1073
1074
	snprintf(buffer, 200, "pbms_repository_threshold=%"PRIu64"\n", MSDatabase::gRepoThreshold);
1075
	CSL.log(NULL, CSLog::Protocol, buffer);
1076
#endif
1077
}
1078
1079
#ifdef USE_CONST_SAVE
1080
static void pbms_temp_log_threshold_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, const void *save)
1081
#else
1082
static void pbms_temp_log_threshold_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, void *save)
1083
#endif
1084
{
1085
	char *old= *(char **) tgt;
1086
	*(char **)tgt= *(char **) save;
1087
	if (var->flags & PLUGIN_VAR_MEMALLOC)
1088
	{
1089
		*(char **)tgt= my_strdup(*(char **) save, MYF(0));
1090
		my_free(old, MYF(0));
1091
	}
1092
	MSDatabase::gTempLogThreshold = (uint64_t) cs_byte_size_to_int8(pbms_temp_log_threshold);
1093
#ifdef DEBUG
1094
	char buffer[200];
1095
1096
	snprintf(buffer, 200, "pbms_temp_log_threshold=%"PRIu64"\n",MSDatabase::gTempLogThreshold);
1097
	CSL.log(NULL, CSLog::Protocol, buffer);
1098
#endif
1099
}
1100
1101
#ifdef USE_CONST_SAVE
1102
static void pbms_http_metadata_headers_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, const void *save)
1103
#else
1104
static void pbms_http_metadata_headers_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, void *save)
1105
#endif
1106
{
1107
	char *old= *(char **) tgt;
1108
	*(char **)tgt= *(char **) save;
1109
	if (var->flags & PLUGIN_VAR_MEMALLOC)
1110
	{
1111
		*(char **)tgt= my_strdup(*(char **) save, MYF(0));
1112
		my_free(old, MYF(0));
1113
	}
1114
	
1115
	MSHTTPHeaderTable::setDefaultMetaDataHeaders(pbms_http_metadata_headers);
1116
	
1117
#ifdef DEBUG
1118
	char buffer[200];
1119
1120
	snprintf(buffer, 200, "pbms_http_metadata_headers=%"PRIu64"\n", pbms_http_metadata_headers);
1121
	CSL.log(NULL, CSLog::Protocol, buffer);
1122
#endif
1123
}
1124
1125
#ifdef USE_CONST_SAVE
1126
static void pbms_temp_blob_timeout_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, const void *save)
1127
#else
1128
static void pbms_temp_blob_timeout_func(THD *thd, struct st_mysql_sys_var *var, void *tgt, void *save)
1129
#endif
1130
{
1131
	CSThread		*self;
1132
	PBMSResultRec	result;
1133
1134
	*(long *)tgt= *(long *) save;
1135
1136
	if (pbms_enter_conn(thd, &self, &result))
1137
		return;
1138
	MSDatabase::wakeTempLogThreads();
1139
}
1140
1141
#if MYSQL_VERSION_ID >= 50118
/*
 * Plugin system variables. The three string variables are
 * PLUGIN_VAR_MEMALLOC and have update functions that refresh the
 * derived engine settings.
 */

/* Read-only port number; the upper bound is the highest valid TCP port. */
static MYSQL_SYSVAR_INT(port, pbms_port,
	PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY,
	"The port for the server stream-based communications.",
	NULL, NULL, pbms_port, 0, 65535, 1);

static MYSQL_SYSVAR_STR(repository_threshold, pbms_repository_threshold,
	PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
	"The maximum size of a BLOB repository file.",
	NULL, /*NULL*/ /**/pbms_repository_threshold_func/**/, MS_REPO_THRESHOLD_DEF);

static MYSQL_SYSVAR_STR(temp_log_threshold, pbms_temp_log_threshold,
	PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
	"The maximum size of a temporary BLOB log file.",
	NULL, /*NULL*/ /**/pbms_temp_log_threshold_func/**/, MS_TEMP_LOG_THRESHOLD_DEF);

static MYSQL_SYSVAR_STR(http_metadata_headers, pbms_http_metadata_headers,
	PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
	"A ':' delimited list of metadata header names to be used to initialize the pbms_metadata_header table when a database is created.",
	NULL, /*NULL*/ /**/pbms_http_metadata_headers_func/**/, MS_HTTP_METADATA_HEADERS_DEF);

static MYSQL_SYSVAR_ULONG(temp_blob_timeout, MSTempLog::gTempBlobTimeout,
	PLUGIN_VAR_OPCMDARG,
	"The timeout, in seconds, for temporary BLOBs. Uploaded blob data is removed after this time, unless committed to the database.",
	NULL, pbms_temp_blob_timeout_func, MS_DEFAULT_TEMP_LOG_WAIT, 1, ~0L, 1);

static MYSQL_SYSVAR_INT(garbage_threshold, MSRepository::gGarbageThreshold,
	PLUGIN_VAR_OPCMDARG,
	"The percentage of garbage in a repository file before it is compacted.",
	NULL, NULL, MS_DEFAULT_GARBAGE_LEVEL, 0, 100, 1);

static MYSQL_SYSVAR_INT(max_keep_alive, MSConnectionHandler::gMaxKeepAlive,
	PLUGIN_VAR_OPCMDARG,
	"The timeout, in milli-seconds, before the HTTP server will close an inactive HTTP connection.",
	NULL, NULL, MS_DEFAULT_KEEP_ALIVE, 1, INT32_MAX, 1);

/* NULL terminated list handed to the plugin declaration. */
static struct st_mysql_sys_var* pbms_system_variables[] = {
	MYSQL_SYSVAR(port),
	MYSQL_SYSVAR(repository_threshold),
	MYSQL_SYSVAR(temp_log_threshold),
	MYSQL_SYSVAR(temp_blob_timeout),
	MYSQL_SYSVAR(garbage_threshold),
	MYSQL_SYSVAR(http_metadata_headers),
	MYSQL_SYSVAR(max_keep_alive),
	NULL
};
#endif
1189
1190
#ifdef DRIZZLED
1191
drizzle_declare_plugin(pbms)
1192
#else
1193
mysql_declare_plugin(pbms)
1194
#endif
1195
{
1196
#ifndef DRIZZLED
1197
	MYSQL_STORAGE_ENGINE_PLUGIN,
1198
	&pbms_engine_handler,
1199
#endif
1200
	"PBMS",
1201
#ifdef DRIZZLED
1202
	"1.0",
1203
#endif
1204
	"Barry Leslie, PrimeBase Technologies GmbH",
1205
	"The Media Stream daemon for MySQL",
1206
	PLUGIN_LICENSE_GPL,
1207
	pbms_init_func, /* Plugin Init */
1208
	pbms_done_func, /* Plugin Deinit */
1209
#ifndef DRIZZLED
1210
	0x0001 /* 0.1 */,
1211
#endif
1212
	NULL, 											/* status variables								*/
1213
#if MYSQL_VERSION_ID >= 50118
1214
	pbms_system_variables, 							/* system variables								*/
1215
#else
1216
	NULL,
1217
#endif
1218
	NULL											/* config options								*/
1219
}
1220
#ifdef DRIZZLED
1221
drizzle_declare_plugin_end;
1222
#else
1223
mysql_declare_plugin_end;
1224
#endif