1
/* Copyright (C) 2010 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23
* PBMS daemon global parameters.
30
#include <drizzled/common.h>
31
#include <drizzled/plugin.h>
32
#include <drizzled/session.h>
33
#include <drizzled/sql_lex.h>
36
#define my_strdup(a,b) strdup(a)
38
#include "cslib/CSConfig.h"
40
#include "cslib/CSConfig.h"
41
#include "mysql_priv.h"
42
#include <mysql/plugin.h>
49
#include "cslib/CSDefs.h"
50
#include "cslib/CSObject.h"
51
#include "cslib/CSGlobal.h"
52
#include "cslib/CSThread.h"
53
#include "cslib/CSStrUtil.h"
54
#include "cslib/CSPath.h"
55
#include "cslib/CSLog.h"
58
#include "database_ms.h"
59
#include "parameters_ms.h"
62
using namespace drizzled;
63
using namespace drizzled::plugin;
65
#include <drizzled/module/option_map.h>
66
#include <boost/program_options.hpp>
67
namespace po= boost::program_options;
69
#define PBMS_PORT 8080
72
/* Note: 'new' used here is NOT CSObject::new which is a DEBUG define*/
79
static port_constraint pbms_port_number;
81
static std::string my_repository_threshold;
82
static std::string my_temp_log_threshold;
83
static std::string my_http_metadata_headers;
85
typedef drizzled::constrained_check<uint32_t, 100, 0> percent_constraint;
86
static percent_constraint my_garbage_threshold;
87
static uint32_nonzero_constraint my_temp_blob_timeout;
88
static uint32_nonzero_constraint my_max_keep_alive;
89
static uint32_nonzero_constraint my_backup_db_id;
91
static uint32_t my_server_id = 1;
93
uint32_t pbms_port_number;
95
static char *my_repository_threshold = NULL;
96
static char *my_temp_log_threshold = NULL;
97
static char *my_http_metadata_headers = NULL;
99
static u_long my_temp_blob_timeout = MS_DEFAULT_TEMP_LOG_WAIT;
100
static u_long my_garbage_threshold = MS_DEFAULT_GARBAGE_LEVEL;
101
static u_long my_max_keep_alive = MS_DEFAULT_KEEP_ALIVE;
103
static u_long my_backup_db_id = 1;
104
static uint32_t my_server_id = 1;
108
static set<string> my_black_list;
109
static bool my_events_enabled = true;
110
static CSMutex my_table_list_lock;
112
/* Classification produced by parsing the watch-table list (see set_match_type()).
 * The enumerator order is significant: values are compared with relational
 * operators elsewhere in this file (e.g. "> MATCH_DBS", "> MATCH_SOME",
 * "<= MATCH_DBS", "<= MATCH_ALL"), so the enumerators must not be reordered. */
typedef enum {MATCH_ALL, MATCH_DBS, MATCH_SOME, MATCH_NONE, MATCH_UNKNOWN, MATCH_ERROR} TableMatchState;
113
static std::string my_table_list;
115
static TableMatchState my_table_match = MATCH_UNKNOWN;
117
typedef constrained_check<int32_t, INT32_MAX-1, 1> before_position_constraint;
118
static before_position_constraint my_before_insert_position; // Call this event observer first.
119
static before_position_constraint my_before_update_position;
121
using namespace drizzled;
122
using namespace drizzled::plugin;
124
#define st_mysql_sys_var drizzled::drizzle_sys_var
127
struct st_mysql_sys_var
129
MYSQL_PLUGIN_VAR_HEADER;
134
#if MYSQL_VERSION_ID < 60000
136
#if MYSQL_VERSION_ID >= 50124
137
#define CONST_SAVE const
142
#if MYSQL_VERSION_ID >= 60005
143
#define CONST_SAVE const
153
/* Returns the value currently held in pbms_port_number. */
uint32_t PBMSParameters::getPortNumber()
{
	return pbms_port_number;
}
156
/* Returns the value currently held in my_server_id. */
uint32_t PBMSParameters::getServerID()
{
	return my_server_id;
}
159
/* Converts the configured repository threshold string to a number with
 * cs_byte_size_to_int8() and returns it as uint64_t.
 * NOTE(review): two variants are visible below (a std::string one using
 * c_str(), and a char* one that falls back to MS_REPO_THRESHOLD_DEF when the
 * pointer is NULL); presumably they are selected by a build-time conditional
 * that is not visible here -- confirm. */
uint64_t PBMSParameters::getRepoThreshold()
162
return (uint64_t) cs_byte_size_to_int8(my_repository_threshold.c_str());
164
if (my_repository_threshold)
165
return((uint64_t) cs_byte_size_to_int8(my_repository_threshold));
167
// No value configured: use the compiled-in default.
return((uint64_t) cs_byte_size_to_int8(MS_REPO_THRESHOLD_DEF));
172
/* Converts the configured temporary log threshold string to a number with
 * cs_byte_size_to_int8() and returns it as uint64_t.
 * NOTE(review): two variants are visible below (a std::string one using
 * c_str(), and a char* one that falls back to MS_TEMP_LOG_THRESHOLD_DEF when
 * the pointer is NULL); presumably they are selected by a build-time
 * conditional that is not visible here -- confirm. */
uint64_t PBMSParameters::getTempLogThreshold()
175
return (uint64_t) cs_byte_size_to_int8(my_temp_log_threshold.c_str());
177
if (my_temp_log_threshold)
178
return((uint64_t) cs_byte_size_to_int8(my_temp_log_threshold));
180
// No value configured: use the compiled-in default.
return((uint64_t) cs_byte_size_to_int8(MS_TEMP_LOG_THRESHOLD_DEF));
185
/* Returns my_temp_blob_timeout converted to uint32_t. */
uint32_t PBMSParameters::getTempBlobTimeout()
{
	const uint32_t timeout = static_cast<uint32_t>(my_temp_blob_timeout);
	return timeout;
}
188
/* Returns my_garbage_threshold converted to uint32_t. */
uint32_t PBMSParameters::getGarbageThreshold()
{
	const uint32_t threshold = static_cast<uint32_t>(my_garbage_threshold);
	return threshold;
}
191
/* Returns my_max_keep_alive converted to uint32_t. */
uint32_t PBMSParameters::getMaxKeepAlive()
{
	const uint32_t keep_alive = static_cast<uint32_t>(my_max_keep_alive);
	return keep_alive;
}
194
/* Returns the configured http_metadata_headers value as a C string.
 * NOTE(review): two variants are visible below (a std::string one returning
 * c_str(), and a char* one that falls back to MS_HTTP_METADATA_HEADERS_DEF
 * when the pointer is NULL); presumably they are selected by a build-time
 * conditional that is not visible here -- confirm. */
const char * PBMSParameters::getDefaultMetaDataHeaders()
197
return my_http_metadata_headers.c_str();
199
if (my_http_metadata_headers)
200
return my_http_metadata_headers;
202
// No value configured: use the compiled-in default.
return MS_HTTP_METADATA_HEADERS_DEF;
207
/* Returns my_backup_db_id converted to uint32_t. */
uint32_t PBMSParameters::getBackupDatabaseID()
{
	const uint32_t backup_id = static_cast<uint32_t>(my_backup_db_id);
	return backup_id;
}
210
/* Stores id in my_backup_db_id. */
void PBMSParameters::setBackupDatabaseID(uint32_t id)
{
	my_backup_db_id = id;
}
214
/* Returns the current value of the my_events_enabled flag. */
bool PBMSParameters::isPBMSEventsEnabled()
{
	return my_events_enabled;
}
217
#define NEXT_IN_TABLE_LIST(list) {\
218
while ((*list) && (*list != ',')) list++;\
222
static TableMatchState set_match_type(const char *list)
224
const char *ptr = list;
227
TableMatchState match_state;
236
while ((*ptr) && isspace(*ptr)) ptr++;
241
match_state = MATCH_UNKNOWN;
245
// Check database name
248
while ((*ptr) && (!isspace(*ptr)) && (*ptr != ',') && (*ptr != '.')) {ptr++;name_len++;}
249
while ((*ptr) && isspace(*ptr)) ptr++;
252
if ((name_len == 1) && (*name == '*'))
253
match_state = MATCH_ALL;
255
goto bad_list; // Missing table
258
if ((match_state > MATCH_DBS) && (name_len == 1) && (*name == '*'))
259
match_state = MATCH_DBS;
261
ptr++; // Skip the '.'
263
// Find the start of the table name.
264
while ((*ptr) && isspace(*ptr)) ptr++;
265
if ((!*ptr) || (*ptr == ',') || (*ptr == '.'))
266
goto bad_list; // Missing table
268
// Find the end of the table name.
269
while ((*ptr) && (!isspace(*ptr)) && (*ptr != ',') && (*ptr != '.')) ptr++;
272
// Locate the end of the element.
273
while ((*ptr) && isspace(*ptr)) ptr++;
275
if ((*ptr) && (*ptr != ','))
276
goto bad_list; // Bad table name
278
if (match_state > MATCH_SOME)
279
match_state = MATCH_SOME;
282
while ((*ptr) && isspace(*ptr)) ptr++;
289
snprintf(info, 120, "pbms_watch_tables format error near character position %d", (int) (ptr - list));
290
CSL.logLine(NULL, CSLog::Error, info);
291
CSL.logLine(NULL, CSLog::Error, list);
297
static const char* locate_db(const char *list, const char *db, int len)
302
while ((*list) && isspace(*list)) list++;
303
if ((*list == 0) || (*(list+1) == 0) || (*(list+2) == 0)) // We need at least 3 characters
309
else if (strncmp(list, db, len) == 0)
316
while ((*list) && isspace(*list)) list++;
317
if ((*list == 0) || (*(list+1) == 0) ) // We need at least 2 characters
322
while ((*list) && isspace(*list)) list++;
326
return list; // We have found a table that could belong to this database.
330
NEXT_IN_TABLE_LIST(list);
337
static void temp_blob_timeout_update(Session*, sql_var_t)
340
PBMSResultRec result;
342
if (MSEngine::enterConnectionNoThd(&self, &result))
345
MSDatabase::wakeTempLogThreads();
352
static int table_list_validate(Session*, set_var *var)
354
const char *list= var->value->str_value.ptr();
358
TableMatchState state = set_match_type(list);
359
if (state == MATCH_ERROR)
362
std::string new_list(list);
364
my_table_list_lock.lock();
365
my_table_list.swap(new_list);
366
my_table_match = state;
367
my_table_list_lock.unlock();
376
// Parameter update functions are not called for parameters that are set on
377
// the command line. PBMSParameters::startUp() will perform any initialization required.
379
void PBMSParameters::startUp(drizzled::module::Context &context)
381
void PBMSParameters::startup()
386
my_table_match = set_match_type(my_table_list.c_str());
387
const module::option_map &vm= context.getOptions();
388
my_events_enabled= not vm.count("watch-disable");
390
context.registerVariable(new sys_var_constrained_value_readonly<in_port_t>("port",
392
context.registerVariable(new sys_var_std_string("repository_threshold",
393
my_repository_threshold));
394
context.registerVariable(new sys_var_std_string("temp_log_threshold",
395
my_temp_log_threshold));
396
context.registerVariable(new sys_var_const_string("http_metadata_headers",
397
my_http_metadata_headers));
398
context.registerVariable(new sys_var_constrained_value_readonly<uint32_t>("garbage_threshold", my_garbage_threshold));
399
context.registerVariable(new sys_var_constrained_value<uint32_t>("temp_blob_timeout",
400
my_temp_blob_timeout,
401
temp_blob_timeout_update));
402
context.registerVariable(new sys_var_constrained_value<uint32_t>("max_keep_alive",
404
context.registerVariable(new sys_var_constrained_value<uint32_t>("next_backup_db_id",
406
context.registerVariable(new sys_var_std_string("watch_tables",
408
table_list_validate));
409
context.registerVariable(new sys_var_bool_ptr("watch_enable",
410
&my_events_enabled));
411
context.registerVariable(new sys_var_constrained_value<int32_t>("before_insert_position",
412
my_before_insert_position));
413
context.registerVariable(new sys_var_constrained_value<int32_t>("before_update_position",
414
my_before_update_position));
417
my_table_match = set_match_type(my_table_list);
423
void PBMSParameters::initOptions(drizzled::module::option_context &context)
426
po::value<port_constraint>(&pbms_port_number)->default_value(DEFAULT_PBMS_PORT),
427
_("Port number to use for connection or 0 for default PBMS port "));
428
context("repository-threshold",
429
po::value<std::string>(&my_repository_threshold)->default_value(MS_REPO_THRESHOLD_DEF),
430
_("The maximum size of a BLOB repository file."));
431
// Registers the "temp-log-threshold" option, stored in my_temp_log_threshold.
// Help text spelling corrected ("temporary").
context("temp-log-threshold",
	po::value<std::string>(&my_temp_log_threshold)->default_value(MS_TEMP_LOG_THRESHOLD_DEF),
	_("The maximum size of a temporary BLOB log file."));
434
context("http-metadata-headers",
435
po::value<std::string>(&my_http_metadata_headers)->default_value(MS_HTTP_METADATA_HEADERS_DEF),
436
_("A ':' delimited list of metadata header names to be used to initialize "
437
"the pbms_metadata_header table when a database is created."));
438
context("garbage-threshold",
439
po::value<percent_constraint>(&my_garbage_threshold)->default_value(MS_DEFAULT_GARBAGE_LEVEL),
440
_("The percentage of garbage in a repository file before it is compacted."));
441
context("temp-blob-timeout",
442
po::value<uint32_nonzero_constraint>(&my_temp_blob_timeout)->default_value(MS_DEFAULT_TEMP_LOG_WAIT),
443
_("The timeout, in seconds, for temporary BLOBs. Uploaded blob data is removed after this time, unless committed to the database."));
444
// Registers the "max-keep-alive" option. It must be stored in
// my_max_keep_alive (the value returned by getMaxKeepAlive()); binding it to
// my_temp_blob_timeout would overwrite the "temp-blob-timeout" setting and
// leave the keep-alive value unset.
context("max-keep-alive",
	po::value<uint32_nonzero_constraint>(&my_max_keep_alive)->default_value(MS_DEFAULT_KEEP_ALIVE),
	_("The timeout, in milli-seconds, before the HTTP server will close an inactive HTTP connection."));
447
context("next-backup-db-id",
448
po::value<uint32_nonzero_constraint>(&my_backup_db_id)->default_value(1),
449
_("The next backup ID to use when backing up a PBMS database."));
450
context("watch-tables",
451
po::value<std::string>(&my_table_list)->default_value("*"),
452
_("A comma delimited list of tables to watch of the format: <database>.<table>, ..."));
453
// Registers the "watch-disable" flag. startUp() sets my_events_enabled to
// "not vm.count("watch-disable")", so passing this option turns event
// scanning OFF; the help text must say "Disable", not "Enable".
context("watch-disable",
	_("Disable PBMS daemon Insert/Update/Delete event scanning"));
456
context("before-insert-position",
457
po::value<before_position_constraint>(&my_before_insert_position)->default_value(1),
458
_("Before insert row event observer call position"));
460
context("before-update-position",
461
po::value<before_position_constraint>(&my_before_update_position)->default_value(1),
462
_("Before update row event observer call position"));
468
bool PBMSParameters::isBlackListedDB(const char *db)
470
if (my_black_list.find(string(db)) == my_black_list.end())
477
/* Adds the database name db to the my_black_list set (a std::string copy of
 * db is inserted). */
void PBMSParameters::blackListedDB(const char *db)
479
my_black_list.insert(string(db));
483
bool PBMSParameters::try_LocateDB(CSThread *self, const char *db, bool *found)
485
volatile bool rtc = true;
487
lock_(&my_table_list_lock);
490
*found = (locate_db(my_table_list.c_str(), db, strlen(db)) != NULL);
492
unlock_(&my_table_list_lock);
502
bool PBMSParameters::isBLOBDatabase(const char *db)
504
CSThread *self= NULL;
506
PBMSResultRec result;
509
if (isBlackListedDB(db))
512
if (my_table_match == MATCH_UNKNOWN)
515
lock_(&my_table_list_lock);
516
my_table_match = set_match_type(my_table_list.c_str());
517
unlock_(&my_table_list_lock);
524
if (my_table_match == MATCH_NONE)
527
if (my_table_match <= MATCH_DBS)
530
if ((err = MSEngine::enterConnectionNoThd(&self, &result)) == 0) {
533
if (try_LocateDB(self, db, &found)) {
534
err = MSEngine::exceptionToResult(&self->myException, &result);
541
fprintf(stderr, "PBMSParameters::isBLOBDatabase(\"%s\") error (%d):'%s'\n",
542
db, result.mr_code, result.mr_message);
549
bool PBMSParameters::try_LocateTable(CSThread *self, const char *db, const char *table, bool *found)
551
volatile bool rtc = true;
553
int db_len, table_len, match_len;
555
lock_(&my_table_list_lock);
558
table_len = strlen(table);
560
const char *ptr = my_table_list.c_str();
562
ptr = locate_db(ptr, db, db_len);
567
else if (strncmp(ptr, table, table_len) == 0)
568
match_len = table_len;
572
if ((!*ptr) || (*ptr == ',') || isspace(*ptr)) {
578
NEXT_IN_TABLE_LIST(ptr);
582
unlock_(&my_table_list_lock);
592
bool PBMSParameters::isBLOBTable(const char *db, const char *table)
594
CSThread *self= NULL;
596
PBMSResultRec result;
599
if (isBlackListedDB(db))
602
if (my_table_match == MATCH_UNKNOWN)
605
lock_(&my_table_list_lock);
606
my_table_match = set_match_type(my_table_list.c_str());
607
unlock_(&my_table_list_lock);
614
if (my_table_match == MATCH_NONE)
617
if (my_table_match <= MATCH_ALL)
620
if ((err = MSEngine::enterConnectionNoThd(&self, &result)) == 0) {
623
if (try_LocateTable(self, db, table, &found)) {
624
err = MSEngine::exceptionToResult(&self->myException, &result);
631
fprintf(stderr, "PBMSParameters::isBLOBTable(\"%s\", \"%s\") error (%d):'%s'\n",
632
db, table, result.mr_code, result.mr_message);
640
/* Returns my_before_update_position converted to int32_t.
 * The spelling "Uptate" is part of the existing interface and is kept as is. */
int32_t PBMSParameters::getBeforeUptateEventPosition()
{
	const int32_t position = static_cast<int32_t>(my_before_update_position);
	return position;
}
643
/* Returns my_before_insert_position converted to int32_t. */
int32_t PBMSParameters::getBeforeInsertEventPosition()
{
	const int32_t position = static_cast<int32_t>(my_before_insert_position);
	return position;
}
648
static void pbms_temp_blob_timeout_func(THD *thd, struct st_mysql_sys_var *var, void *trg, CONST_SAVE void *save)
651
PBMSResultRec result;
656
*(u_long *)trg= *(u_long *) save;
658
if (MSEngine::enterConnectionNoThd(&self, &result))
661
MSDatabase::wakeTempLogThreads();
673
static MYSQL_SYSVAR_UINT(port, pbms_port_number,
675
"The port for the server stream-based communications.",
676
NULL, NULL, PBMS_PORT, 0, 64*1024, 1);
678
static MYSQL_SYSVAR_STR(repository_threshold, my_repository_threshold,
679
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
680
"The maximum size of a BLOB repository file.",
681
NULL, NULL, MS_REPO_THRESHOLD_DEF);
683
static MYSQL_SYSVAR_STR(temp_log_threshold, my_temp_log_threshold,
684
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
685
"The maximum size of a temorary BLOB log file.",
686
NULL, NULL, MS_TEMP_LOG_THRESHOLD_DEF);
688
static MYSQL_SYSVAR_STR(http_metadata_headers, my_http_metadata_headers,
689
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY,
690
"A ':' delimited list of metadata header names to be used to initialize the pbms_metadata_header table when a database is created.",
691
NULL, NULL , MS_HTTP_METADATA_HEADERS_DEF);
693
static MYSQL_SYSVAR_ULONG(temp_blob_timeout, my_temp_blob_timeout,
695
"The timeout, in seconds, for temporary BLOBs. Uploaded blob data is removed after this time, unless committed to the database.",
696
NULL, pbms_temp_blob_timeout_func, MS_DEFAULT_TEMP_LOG_WAIT, 1, ~0L, 1);
698
static MYSQL_SYSVAR_ULONG(garbage_threshold, my_garbage_threshold,
700
"The percentage of garbage in a repository file before it is compacted.",
701
NULL, NULL, MS_DEFAULT_GARBAGE_LEVEL, 0, 100, 1);
704
static MYSQL_SYSVAR_ULONG(max_keep_alive, my_max_keep_alive,
706
"The timeout, in milli-seconds, before the HTTP server will close an inactive HTTP connection.",
707
NULL, NULL, MS_DEFAULT_KEEP_ALIVE, 1, UINT32_MAX, 1);
709
static MYSQL_SYSVAR_ULONG(next_backup_db_id, my_backup_db_id,
711
"The next backup ID to use when backing up a PBMS database.",
712
NULL, NULL, 1, 1, UINT32_MAX, 1);
714
struct st_mysql_sys_var* pbms_system_variables[] = {
716
MYSQL_SYSVAR(repository_threshold),
717
MYSQL_SYSVAR(temp_log_threshold),
718
MYSQL_SYSVAR(temp_blob_timeout),
719
MYSQL_SYSVAR(garbage_threshold),
720
MYSQL_SYSVAR(http_metadata_headers),
721
MYSQL_SYSVAR(max_keep_alive),
722
MYSQL_SYSVAR(next_backup_db_id),
729
// vim:noexpandtab:sts=8:sw=8:tabstop=8:smarttab: