1
/* Copyright (c) 2010 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23
* PBMS daemon global parameters.
30
#include <drizzled/common.h>
31
#include <drizzled/plugin.h>
32
#include <drizzled/session.h>
35
#define my_strdup(a,b) strdup(a)
37
#include "cslib/CSConfig.h"
39
#include "cslib/CSConfig.h"
40
#include "mysql_priv.h"
41
#include <mysql/plugin.h>
48
#include "cslib/CSDefs.h"
49
#include "cslib/CSObject.h"
50
#include "cslib/CSGlobal.h"
51
#include "cslib/CSThread.h"
52
#include "cslib/CSStrUtil.h"
53
#include "cslib/CSPath.h"
54
#include "cslib/CSLog.h"
57
#include "database_ms.h"
58
#include "parameters_ms.h"
61
using namespace drizzled;
62
using namespace drizzled::plugin;
64
#include <drizzled/module/option_map.h>
65
#include <boost/program_options.hpp>
66
namespace po= boost::program_options;
68
#define PBMS_PORT 8080
71
/* Note: 'new' used here is NOT CSObject::new which is a DEBUG define*/
78
static port_constraint pbms_port_number;
80
static std::string my_repository_threshold;
81
static std::string my_temp_log_threshold;
82
static std::string my_http_metadata_headers;
84
typedef drizzled::constrained_check<uint32_t, 100, 0> percent_constraint;
85
static percent_constraint my_garbage_threshold;
86
static uint32_nonzero_constraint my_temp_blob_timeout;
87
static uint32_nonzero_constraint my_max_keep_alive;
88
static uint32_nonzero_constraint my_backup_db_id;
90
static uint32_t my_server_id = 1;
92
uint32_t pbms_port_number;
94
static char *my_repository_threshold = NULL;
95
static char *my_temp_log_threshold = NULL;
96
static char *my_http_metadata_headers = NULL;
98
static u_long my_temp_blob_timeout = MS_DEFAULT_TEMP_LOG_WAIT;
99
static u_long my_garbage_threshold = MS_DEFAULT_GARBAGE_LEVEL;
100
static u_long my_max_keep_alive = MS_DEFAULT_KEEP_ALIVE;
102
static u_long my_backup_db_id = 1;
103
static uint32_t my_server_id = 1;
107
static set<string> my_black_list;
108
static bool my_events_enabled = true;
109
static CSMutex my_table_list_lock;
111
typedef enum {MATCH_ALL, MATCH_DBS, MATCH_SOME, MATCH_NONE, MATCH_UNKNOWN, MATCH_ERROR} TableMatchState;
112
static std::string my_table_list;
114
static TableMatchState my_table_match = MATCH_UNKNOWN;
116
typedef constrained_check<int32_t, INT32_MAX-1, 1> before_position_constraint;
117
static before_position_constraint my_before_insert_position; // Call this event observer first.
118
static before_position_constraint my_before_update_position;
120
using namespace drizzled;
121
using namespace drizzled::plugin;
123
#define st_mysql_sys_var drizzled::drizzle_sys_var
126
struct st_mysql_sys_var
128
MYSQL_PLUGIN_VAR_HEADER;
133
#if MYSQL_VERSION_ID < 60000
135
#if MYSQL_VERSION_ID >= 50124
136
#define CONST_SAVE const
141
#if MYSQL_VERSION_ID >= 60005
142
#define CONST_SAVE const
152
/* Returns the configured PBMS port number. */
uint32_t PBMSParameters::getPortNumber()
{
	return pbms_port_number;
}
155
/* Returns the server ID held in my_server_id. */
uint32_t PBMSParameters::getServerID()
{
	return my_server_id;
}
158
uint64_t PBMSParameters::getRepoThreshold()
161
return (uint64_t) cs_byte_size_to_int8(my_repository_threshold.c_str());
163
if (my_repository_threshold)
164
return((uint64_t) cs_byte_size_to_int8(my_repository_threshold));
166
return((uint64_t) cs_byte_size_to_int8(MS_REPO_THRESHOLD_DEF));
171
uint64_t PBMSParameters::getTempLogThreshold()
174
return (uint64_t) cs_byte_size_to_int8(my_temp_log_threshold.c_str());
176
if (my_temp_log_threshold)
177
return((uint64_t) cs_byte_size_to_int8(my_temp_log_threshold));
179
return((uint64_t) cs_byte_size_to_int8(MS_TEMP_LOG_THRESHOLD_DEF));
184
/*
 * Returns the temporary BLOB timeout as a uint32_t.
 * The option help text in this file describes the value as seconds.
 */
uint32_t PBMSParameters::getTempBlobTimeout()
{
	return static_cast<uint32_t>(my_temp_blob_timeout);
}
187
/*
 * Returns the garbage threshold as a uint32_t.
 * The option help text in this file describes it as the percentage of
 * garbage in a repository file before it is compacted.
 */
uint32_t PBMSParameters::getGarbageThreshold()
{
	return static_cast<uint32_t>(my_garbage_threshold);
}
190
/*
 * Returns the keep-alive limit as a uint32_t.
 * The option help text in this file describes it as the time, in
 * milli-seconds, before an inactive HTTP connection is closed.
 */
uint32_t PBMSParameters::getMaxKeepAlive()
{
	return static_cast<uint32_t>(my_max_keep_alive);
}
193
const char * PBMSParameters::getDefaultMetaDataHeaders()
196
return my_http_metadata_headers.c_str();
198
if (my_http_metadata_headers)
199
return my_http_metadata_headers;
201
return MS_HTTP_METADATA_HEADERS_DEF;
206
/* Returns the next backup database ID (my_backup_db_id) as a uint32_t. */
uint32_t PBMSParameters::getBackupDatabaseID()
{
	return static_cast<uint32_t>(my_backup_db_id);
}
209
/* Stores id in my_backup_db_id, the value returned by getBackupDatabaseID(). */
void PBMSParameters::setBackupDatabaseID(uint32_t id)
{
	my_backup_db_id = id;
}
213
/* Reports the current value of the my_events_enabled flag. */
bool PBMSParameters::isPBMSEventsEnabled()
{
	return my_events_enabled;
}
216
#define NEXT_IN_TABLE_LIST(list) {\
217
while ((*list) && (*list != ',')) list++;\
221
static TableMatchState set_match_type(const char *list)
223
const char *ptr = list;
226
TableMatchState match_state;
235
while ((*ptr) && isspace(*ptr)) ptr++;
240
match_state = MATCH_UNKNOWN;
244
// Check database name
247
while ((*ptr) && (!isspace(*ptr)) && (*ptr != ',') && (*ptr != '.')) {ptr++;name_len++;}
248
while ((*ptr) && isspace(*ptr)) ptr++;
251
if ((name_len == 1) && (*name == '*'))
252
match_state = MATCH_ALL;
254
goto bad_list; // Missing table
257
if ((match_state > MATCH_DBS) && (name_len == 1) && (*name == '*'))
258
match_state = MATCH_DBS;
260
ptr++; // Skip the '.'
262
// Find the start of the table name.
263
while ((*ptr) && isspace(*ptr)) ptr++;
264
if ((!*ptr) || (*ptr == ',') || (*ptr == '.'))
265
goto bad_list; // Missing table
267
// Find the end of the table name.
268
while ((*ptr) && (!isspace(*ptr)) && (*ptr != ',') && (*ptr != '.')) ptr++;
271
// Locate the end of the element.
272
while ((*ptr) && isspace(*ptr)) ptr++;
274
if ((*ptr) && (*ptr != ','))
275
goto bad_list; // Bad table name
277
if (match_state > MATCH_SOME)
278
match_state = MATCH_SOME;
281
while ((*ptr) && isspace(*ptr)) ptr++;
288
snprintf(info, 120, "pbms_watch_tables format error near character position %d", (int) (ptr - list));
289
CSL.logLine(NULL, CSLog::Error, info);
290
CSL.logLine(NULL, CSLog::Error, list);
296
static const char* locate_db(const char *list, const char *db, int len)
301
while ((*list) && isspace(*list)) list++;
302
if ((*list == 0) || (*(list+1) == 0) || (*(list+2) == 0)) // We need at least 3 characters
308
else if (strncmp(list, db, len) == 0)
315
while ((*list) && isspace(*list)) list++;
316
if ((*list == 0) || (*(list+1) == 0) ) // We need at least 2 characters
321
while ((*list) && isspace(*list)) list++;
325
return list; // We have found a table that could belong to this database.
329
NEXT_IN_TABLE_LIST(list);
336
static void temp_blob_timeout_update(Session*, sql_var_t)
339
PBMSResultRec result;
341
if (MSEngine::enterConnectionNoThd(&self, &result))
344
MSDatabase::wakeTempLogThreads();
351
static int table_list_validate(Session*, set_var *var)
353
const char *list= var->value->str_value.ptr();
357
TableMatchState state = set_match_type(list);
358
if (state == MATCH_ERROR)
361
std::string new_list(list);
363
my_table_list_lock.lock();
364
my_table_list.swap(new_list);
365
my_table_match = state;
366
my_table_list_lock.unlock();
375
// Parameter update functions are not called for parameters that are set on
376
// the command line. PBMSParameters::startUp() will perform any initialization required.
378
void PBMSParameters::startUp(drizzled::module::Context &context)
380
void PBMSParameters::startup()
385
my_table_match = set_match_type(my_table_list.c_str());
386
const module::option_map &vm= context.getOptions();
387
my_events_enabled= (vm.count("watch-disable")) ? false : true;
389
context.registerVariable(new sys_var_constrained_value_readonly<in_port_t>("port",
391
context.registerVariable(new sys_var_std_string("repository_threshold",
392
my_repository_threshold));
393
context.registerVariable(new sys_var_std_string("temp_log_threshold",
394
my_temp_log_threshold));
395
context.registerVariable(new sys_var_const_string("http_metadata_headers",
396
my_http_metadata_headers));
397
context.registerVariable(new sys_var_constrained_value_readonly<uint32_t>("garbage_threshold", my_garbage_threshold));
398
context.registerVariable(new sys_var_constrained_value<uint32_t>("temp_blob_timeout",
399
my_temp_blob_timeout,
400
temp_blob_timeout_update));
401
context.registerVariable(new sys_var_constrained_value<uint32_t>("max_keep_alive",
403
context.registerVariable(new sys_var_constrained_value<uint32_t>("next_backup_db_id",
405
context.registerVariable(new sys_var_std_string("watch_tables",
407
table_list_validate));
408
context.registerVariable(new sys_var_bool_ptr("watch_enable",
409
&my_events_enabled));
410
context.registerVariable(new sys_var_constrained_value<int32_t>("before_insert_position",
411
my_before_insert_position));
412
context.registerVariable(new sys_var_constrained_value<int32_t>("before_update_position",
413
my_before_update_position));
416
my_table_match = set_match_type(my_table_list);
422
void PBMSParameters::initOptions(drizzled::module::option_context &context)
425
po::value<port_constraint>(&pbms_port_number)->default_value(DEFAULT_PBMS_PORT),
426
N_("Port number to use for connection or 0 for default PBMS port "));
427
context("repository-threshold",
428
po::value<std::string>(&my_repository_threshold)->default_value(MS_REPO_THRESHOLD_DEF),
429
N_("The maximum size of a BLOB repository file."));
430
// Size limit for temporary BLOB log files. The string is stored in
// my_temp_log_threshold and converted by getTempLogThreshold() through
// cs_byte_size_to_int8().
context("temp-log-threshold",
	po::value<std::string>(&my_temp_log_threshold)->default_value(MS_TEMP_LOG_THRESHOLD_DEF),
	N_("The maximum size of a temporary BLOB log file."));
433
context("http-metadata-headers",
434
po::value<std::string>(&my_http_metadata_headers)->default_value(MS_HTTP_METADATA_HEADERS_DEF),
435
N_("A ':' delimited list of metadata header names to be used to initialize "
436
"the pbms_metadata_header table when a database is created."));
437
context("garbage-threshold",
438
po::value<percent_constraint>(&my_garbage_threshold)->default_value(MS_DEFAULT_GARBAGE_LEVEL),
439
N_("The percentage of garbage in a repository file before it is compacted."));
440
context("temp-blob-timeout",
441
po::value<uint32_nonzero_constraint>(&my_temp_blob_timeout)->default_value(MS_DEFAULT_TEMP_LOG_WAIT),
442
N_("The timeout, in seconds, for temporary BLOBs. Uploaded blob data is removed after this time, unless committed to the database."));
443
// The parsed value must land in my_max_keep_alive, which is what
// getMaxKeepAlive() returns. my_temp_blob_timeout belongs to the
// "temp-blob-timeout" option and must not be written from here.
context("max-keep-alive",
	po::value<uint32_nonzero_constraint>(&my_max_keep_alive)->default_value(MS_DEFAULT_KEEP_ALIVE),
	N_("The timeout, in milli-seconds, before the HTTP server will close an inactive HTTP connection."));
446
context("next-backup-db-id",
447
po::value<uint32_nonzero_constraint>(&my_backup_db_id)->default_value(1),
448
N_("The next backup ID to use when backing up a PBMS database."));
449
context("watch-tables",
450
po::value<std::string>(&my_table_list)->default_value("*"),
451
N_("A comma delimited list of tables to watch of the format: <database>.<table>, ..."));
452
// Value-less flag: when "watch-disable" is present, startUp() sets
// my_events_enabled to false, so the help text must describe disabling.
context("watch-disable",
	N_("Disable PBMS daemon Insert/Update/Delete event scanning"));
455
context("before-insert-position",
456
po::value<before_position_constraint>(&my_before_insert_position)->default_value(1),
457
N_("Before insert row event observer call position"));
459
context("before-update-position",
460
po::value<before_position_constraint>(&my_before_update_position)->default_value(1),
461
N_("Before update row event observer call position"));
467
bool PBMSParameters::isBlackListedDB(const char *db)
469
if (my_black_list.find(string(db)) == my_black_list.end())
476
void PBMSParameters::blackListedDB(const char *db)
478
my_black_list.insert(string(db));
482
bool PBMSParameters::try_LocateDB(CSThread *self, const char *db, bool *found)
484
volatile bool rtc = true;
486
lock_(&my_table_list_lock);
489
*found = (locate_db(my_table_list.c_str(), db, strlen(db)) != NULL);
491
unlock_(&my_table_list_lock);
501
bool PBMSParameters::isBLOBDatabase(const char *db)
503
CSThread *self= NULL;
505
PBMSResultRec result;
508
if (isBlackListedDB(db))
511
if (my_table_match == MATCH_UNKNOWN)
514
lock_(&my_table_list_lock);
515
my_table_match = set_match_type(my_table_list.c_str());
516
unlock_(&my_table_list_lock);
523
if (my_table_match == MATCH_NONE)
526
if (my_table_match <= MATCH_DBS)
529
if ((err = MSEngine::enterConnectionNoThd(&self, &result)) == 0) {
532
if (try_LocateDB(self, db, &found)) {
533
err = MSEngine::exceptionToResult(&self->myException, &result);
540
fprintf(stderr, "PBMSParameters::isBLOBDatabase(\"%s\") error (%d):'%s'\n",
541
db, result.mr_code, result.mr_message);
548
bool PBMSParameters::try_LocateTable(CSThread *self, const char *db, const char *table, bool *found)
550
volatile bool rtc = true;
552
int db_len, table_len, match_len;
554
lock_(&my_table_list_lock);
557
table_len = strlen(table);
559
const char *ptr = my_table_list.c_str();
561
ptr = locate_db(ptr, db, db_len);
566
else if (strncmp(ptr, table, table_len) == 0)
567
match_len = table_len;
571
if ((!*ptr) || (*ptr == ',') || isspace(*ptr)) {
577
NEXT_IN_TABLE_LIST(ptr);
581
unlock_(&my_table_list_lock);
591
bool PBMSParameters::isBLOBTable(const char *db, const char *table)
593
CSThread *self= NULL;
595
PBMSResultRec result;
598
if (isBlackListedDB(db))
601
if (my_table_match == MATCH_UNKNOWN)
604
lock_(&my_table_list_lock);
605
my_table_match = set_match_type(my_table_list.c_str());
606
unlock_(&my_table_list_lock);
613
if (my_table_match == MATCH_NONE)
616
if (my_table_match <= MATCH_ALL)
619
if ((err = MSEngine::enterConnectionNoThd(&self, &result)) == 0) {
622
if (try_LocateTable(self, db, table, &found)) {
623
err = MSEngine::exceptionToResult(&self->myException, &result);
630
fprintf(stderr, "PBMSParameters::isBLOBTable(\"%s\", \"%s\") error (%d):'%s'\n",
631
db, table, result.mr_code, result.mr_message);
639
/*
 * Returns the "before update" event observer call position
 * (my_before_update_position) as an int32_t.
 * NOTE: "Uptate" in the function name is the spelling callers use; keep it.
 */
int32_t PBMSParameters::getBeforeUptateEventPosition()
{
	return static_cast<int32_t>(my_before_update_position);
}
642
/*
 * Returns the "before insert" event observer call position
 * (my_before_insert_position) as an int32_t.
 */
int32_t PBMSParameters::getBeforeInsertEventPosition()
{
	return static_cast<int32_t>(my_before_insert_position);
}
647
static void pbms_temp_blob_timeout_func(THD *thd, struct st_mysql_sys_var *var, void *trg, CONST_SAVE void *save)
650
PBMSResultRec result;
655
*(u_long *)trg= *(u_long *) save;
657
if (MSEngine::enterConnectionNoThd(&self, &result))
660
MSDatabase::wakeTempLogThreads();
672
static MYSQL_SYSVAR_UINT(port, pbms_port_number,
674
"The port for the server stream-based communications.",
675
NULL, NULL, PBMS_PORT, 0, 64*1024, 1);
677
static MYSQL_SYSVAR_STR(repository_threshold, my_repository_threshold,
678
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
679
"The maximum size of a BLOB repository file.",
680
NULL, NULL, MS_REPO_THRESHOLD_DEF);
682
static MYSQL_SYSVAR_STR(temp_log_threshold, my_temp_log_threshold,
683
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
684
"The maximum size of a temorary BLOB log file.",
685
NULL, NULL, MS_TEMP_LOG_THRESHOLD_DEF);
687
static MYSQL_SYSVAR_STR(http_metadata_headers, my_http_metadata_headers,
688
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY,
689
"A ':' delimited list of metadata header names to be used to initialize the pbms_metadata_header table when a database is created.",
690
NULL, NULL , MS_HTTP_METADATA_HEADERS_DEF);
692
static MYSQL_SYSVAR_ULONG(temp_blob_timeout, my_temp_blob_timeout,
694
"The timeout, in seconds, for temporary BLOBs. Uploaded blob data is removed after this time, unless committed to the database.",
695
NULL, pbms_temp_blob_timeout_func, MS_DEFAULT_TEMP_LOG_WAIT, 1, ~0L, 1);
697
static MYSQL_SYSVAR_ULONG(garbage_threshold, my_garbage_threshold,
699
"The percentage of garbage in a repository file before it is compacted.",
700
NULL, NULL, MS_DEFAULT_GARBAGE_LEVEL, 0, 100, 1);
703
static MYSQL_SYSVAR_ULONG(max_keep_alive, my_max_keep_alive,
705
"The timeout, in milli-seconds, before the HTTP server will close an inactive HTTP connection.",
706
NULL, NULL, MS_DEFAULT_KEEP_ALIVE, 1, UINT32_MAX, 1);
708
static MYSQL_SYSVAR_ULONG(next_backup_db_id, my_backup_db_id,
710
"The next backup ID to use when backing up a PBMS database.",
711
NULL, NULL, 1, 1, UINT32_MAX, 1);
713
struct st_mysql_sys_var* pbms_system_variables[] = {
715
MYSQL_SYSVAR(repository_threshold),
716
MYSQL_SYSVAR(temp_log_threshold),
717
MYSQL_SYSVAR(temp_blob_timeout),
718
MYSQL_SYSVAR(garbage_threshold),
719
MYSQL_SYSVAR(http_metadata_headers),
720
MYSQL_SYSVAR(max_keep_alive),
721
MYSQL_SYSVAR(next_backup_db_id),
728
// vim:noexpandtab:sts=8:sw=8:tabstop=8:smarttab: