88
#include "drizzled/internal/my_sys.h" /* for internal::my_sync */
90
#include <drizzled/session.h>
91
#include <drizzled/set_var.h>
76
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
77
#include <drizzled/errmsg_print.h>
92
78
#include <drizzled/gettext.h>
93
#include <drizzled/algorithm/crc32.h>
94
#include <drizzled/message/transaction.pb.h>
95
#include <google/protobuf/io/coded_stream.h>
97
80
using namespace std;
98
81
using namespace drizzled;
99
using namespace google;
102
* Transaction Log plugin system variable - Is the log enabled? Only used on init().
103
* The enable() and disable() methods of the TransactionLog class control online
106
static bool sysvar_transaction_log_enabled= false;
107
/** Transaction Log plugin system variable - The path to the log file used */
108
static char* sysvar_transaction_log_file= NULL;
110
* Transaction Log plugin system variable - A debugging variable to assist
111
* in truncating the log file.
113
static bool sysvar_transaction_log_truncate_debug= false;
114
static const char DEFAULT_LOG_FILE_PATH[]= "transaction.log"; /* In datadir... */
116
* Transaction Log plugin system variable - Should we write a CRC32 checksum for
117
* each written Transaction message?
119
static bool sysvar_transaction_log_checksum_enabled= false;
121
* Numeric option controlling the sync/flush behaviour of the transaction
124
* TransactionLog::SYNC_METHOD_OS == 0 ... let OS do sync'ing
125
* TransactionLog::SYNC_METHOD_EVERY_WRITE == 1 ... sync on every write
126
* TransactionLog::SYNC_METHOD_EVERY_SECOND == 2 ... sync at most once a second
128
static uint32_t sysvar_transaction_log_sync_method= 0;
130
/** DATA_DICTIONARY views */
131
static TransactionLogTool *transaction_log_tool;
132
static TransactionLogEntriesTool *transaction_log_entries_tool;
133
static TransactionLogTransactionsTool *transaction_log_transactions_tool;
135
/** Index defined in transaction_log_index.cc */
136
extern TransactionLogIndex *transaction_log_index;
138
/** Defined in print_transaction_message.cc */
139
extern plugin::Create_function<PrintTransactionMessageFunction> *print_transaction_message_func_factory;
140
extern plugin::Create_function<HexdumpTransactionMessageFunction> *hexdump_transaction_message_func_factory;
142
TransactionLog::TransactionLog(string name_arg,
143
const string &in_log_file_path,
145
: plugin::TransactionApplier(name_arg),
83
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
85
TransactionLog::TransactionLog(const string in_log_file_path,
86
uint32_t in_sync_method) :
147
88
log_file_path(in_log_file_path),
91
sync_method(in_sync_method)
151
do_checksum= in_do_checksum; /* Have to do here, not in initialization list b/c atomic<> */
153
93
/* Setup our log file and determine the next write offset... */
154
94
log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
155
95
if (log_file == -1)
187
126
TransactionLog::~TransactionLog()
189
128
/* Clear up any resources we've consumed */
190
if (isEnabled() && log_file != -1)
192
131
(void) close(log_file);
196
void TransactionLog::apply(const message::Transaction &to_apply)
135
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
198
uint8_t *buffer; /* Buffer we will write serialized header,
199
message and trailing checksum to */
200
uint8_t *orig_buffer;
202
size_t message_byte_length= to_apply.ByteSize();
205
size_t total_envelope_length= HEADER_TRAILER_BYTES + message_byte_length;
208
* Attempt allocation of raw memory buffer for the header,
209
* message and trailing checksum bytes.
211
buffer= static_cast<uint8_t *>(malloc(total_envelope_length));
214
errmsg_printf(ERRMSG_LVL_ERROR,
215
_("Failed to allocate enough memory to buffer header, transaction message, and trailing checksum bytes. Tried to allocate %" PRId64
216
" bytes. Error: %s\n"),
217
static_cast<int64_t>(total_envelope_length),
224
orig_buffer= buffer; /* We will free() orig_buffer, as buffer is moved during write */
227
140
* Do an atomic increment on the offset of the log file position
229
cur_offset= log_offset.fetch_and_add(static_cast<off_t>(total_envelope_length));
142
off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
232
145
* We adjust cur_offset back to the original log_offset before
233
146
* the increment above...
235
cur_offset-= static_cast<off_t>((total_envelope_length));
238
* Write the header information, which is the message type and
239
* the length of the transaction message into the buffer
241
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
242
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(static_cast<uint32_t>(message_byte_length), buffer);
245
* Now write the serialized transaction message, followed
246
* by the optional checksum into the buffer.
248
buffer= to_apply.SerializeWithCachedSizesToArray(buffer);
250
uint32_t checksum= 0;
253
checksum= drizzled::algorithm::crc32(reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
256
/* The checksum is always written as a little-endian 32-bit value */
257
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, buffer);
148
cur_offset-= static_cast<off_t>(data_length);
260
151
* Quick safety...if an error occurs above in another writer, the log
267
158
* the original offset where an error occurred.
269
160
log_offset= cur_offset;
274
164
/* Write the full buffer in one swoop */
277
written= pwrite(log_file, orig_buffer, total_envelope_length, cur_offset);
167
written= pwrite(log_file, data, data_length, cur_offset);
279
169
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
281
if (unlikely(written != static_cast<ssize_t>(total_envelope_length)))
171
if (unlikely(written != static_cast<ssize_t>(data_length)))
283
173
errmsg_printf(ERRMSG_LVL_ERROR,
284
_("Failed to write full size of transaction. Tried to write %" PRId64
174
_("Failed to write full size of log entry. Tried to write %" PRId64
285
175
" bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes. Error: %s\n"),
286
static_cast<int64_t>(total_envelope_length),
176
static_cast<int64_t>(data_length),
287
177
static_cast<int64_t>(cur_offset),
288
178
static_cast<int64_t>(written),
289
179
strerror(errno));
399
267
error_message.clear();
402
const std::string &TransactionLog::getErrorMessage() const
270
const string &TransactionLog::getErrorMessage() const
404
272
return error_message;
407
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
409
static int init(drizzled::plugin::Registry &registry)
411
/* Create and initialize the transaction log itself */
412
if (sysvar_transaction_log_enabled)
414
transaction_log= new (nothrow) TransactionLog("transaction_log_applier",
415
string(sysvar_transaction_log_file),
416
sysvar_transaction_log_checksum_enabled);
418
if (transaction_log == NULL)
420
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the TransactionLog instance. Got error: %s\n"),
426
/* Check to see if the log was not created properly */
427
if (transaction_log->hasError())
429
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to initialize the Transaction Log. Got error: %s\n"),
430
transaction_log->getErrorMessage().c_str());
434
registry.add(transaction_log);
436
/* Setup DATA_DICTIONARY views */
438
transaction_log_tool= new(std::nothrow)TransactionLogTool;
439
registry.add(transaction_log_tool);
440
transaction_log_entries_tool= new(std::nothrow)TransactionLogEntriesTool;
441
registry.add(transaction_log_entries_tool);
442
transaction_log_transactions_tool= new(std::nothrow)TransactionLogTransactionsTool;
443
registry.add(transaction_log_transactions_tool);
445
/* Setup the module's UDFs */
446
print_transaction_message_func_factory=
447
new plugin::Create_function<PrintTransactionMessageFunction>("print_transaction_message");
448
registry.add(print_transaction_message_func_factory);
450
hexdump_transaction_message_func_factory=
451
new plugin::Create_function<HexdumpTransactionMessageFunction>("hexdump_transaction_message");
452
registry.add(hexdump_transaction_message_func_factory);
454
/* Create and initialize the transaction log index */
455
transaction_log_index= new (nothrow) TransactionLogIndex(*transaction_log);
456
if (transaction_log_index == NULL)
458
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the TransactionLogIndex instance. Got error: %s\n"),
464
/* Check to see if the index was not created properly */
465
if (transaction_log_index->hasError())
467
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to initialize the Transaction Log Index. Got error: %s\n"),
468
transaction_log_index->getErrorMessage().c_str());
474
* Setup the background worker thread which maintains
475
* summary information about the transaction log.
477
if (initTransactionLogBackgroundWorker())
478
return 1; /* Error message output handled in function above */
483
static int deinit(drizzled::plugin::Registry &registry)
485
/* Cleanup the transaction log itself */
488
registry.remove(transaction_log);
489
delete transaction_log;
490
delete transaction_log_index;
492
/* Cleanup the DATA_DICTIONARY views */
493
registry.remove(transaction_log_tool);
494
delete transaction_log_tool;
495
registry.remove(transaction_log_entries_tool);
496
delete transaction_log_entries_tool;
497
registry.remove(transaction_log_transactions_tool);
498
delete transaction_log_transactions_tool;
500
/* Cleanup module UDFs */
501
registry.remove(print_transaction_message_func_factory);
502
delete print_transaction_message_func_factory;
503
registry.remove(hexdump_transaction_message_func_factory);
504
delete hexdump_transaction_message_func_factory;
510
static void set_truncate_debug(Session *,
516
* The const void * save comes directly from the check function,
517
* which should simply return the result from the set statement.
520
if (*(bool *)save != false)
521
transaction_log->truncate();
524
static DRIZZLE_SYSVAR_BOOL(enable,
525
sysvar_transaction_log_enabled,
527
N_("Enable transaction log"),
528
NULL, /* check func */
529
NULL, /* update func */
530
false /* default */);
532
static DRIZZLE_SYSVAR_BOOL(truncate_debug,
533
sysvar_transaction_log_truncate_debug,
535
N_("DEBUGGING - Truncate transaction log"),
536
NULL, /* check func */
537
set_truncate_debug, /* update func */
538
false /* default */);
540
static DRIZZLE_SYSVAR_STR(log_file,
541
sysvar_transaction_log_file,
543
N_("Path to the file to use for transaction log"),
544
NULL, /* check func */
545
NULL, /* update func*/
546
DEFAULT_LOG_FILE_PATH /* default */);
548
static DRIZZLE_SYSVAR_BOOL(enable_checksum,
549
sysvar_transaction_log_checksum_enabled,
551
N_("Enable CRC32 Checksumming of each written transaction log entry"),
552
NULL, /* check func */
553
NULL, /* update func */
554
false /* default */);
556
static DRIZZLE_SYSVAR_UINT(sync_method,
557
sysvar_transaction_log_sync_method,
559
N_("0 == rely on operating system to sync log file (default), "
560
"1 == sync file at each transaction write, "
561
"2 == sync log file once per second"),
562
NULL, /* check func */
563
NULL, /* update func */
569
static drizzle_sys_var* sys_variables[]= {
570
DRIZZLE_SYSVAR(enable),
571
DRIZZLE_SYSVAR(truncate_debug),
572
DRIZZLE_SYSVAR(log_file),
573
DRIZZLE_SYSVAR(enable_checksum),
574
DRIZZLE_SYSVAR(sync_method),
578
DRIZZLE_PLUGIN(init, deinit, sys_variables);