88
#include "drizzled/internal/my_sys.h" /* for internal::my_sync */
90
#include <drizzled/session.h>
91
#include <drizzled/set_var.h>
76
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
77
#include <drizzled/errmsg_print.h>
92
78
#include <drizzled/gettext.h>
93
#include <drizzled/algorithm/crc32.h>
94
#include <drizzled/message/transaction.pb.h>
95
#include <google/protobuf/io/coded_stream.h>
97
80
using namespace std;
98
81
using namespace drizzled;
99
using namespace google;
102
* Transaction Log plugin system variable - Is the log enabled? Only used on init().
103
* The enable() and disable() methods of the TransactionLog class control online
106
static bool sysvar_transaction_log_enabled= false;
107
/** Transaction Log plugin system variable - The path to the log file used */
108
static char* sysvar_transaction_log_file= NULL;
110
* Transaction Log plugin system variable - A debugging variable to assist
111
* in truncating the log file.
113
static bool sysvar_transaction_log_truncate_debug= false;
114
static const char DEFAULT_LOG_FILE_PATH[]= "transaction.log"; /* In datadir... */
116
* Transaction Log plugin system variable - Should we write a CRC32 checksum for
117
* each written Transaction message?
119
static bool sysvar_transaction_log_checksum_enabled= false;
121
* Numeric option controlling the sync/flush behaviour of the transaction
124
* TransactionLog::SYNC_METHOD_OS == 0 ... let OS do sync'ing
125
* TransactionLog::SYNC_METHOD_EVERY_WRITE == 1 ... sync on every write
126
* TransactionLog::SYNC_METHOD_EVERY_SECOND == 2 ... sync at most once a second
128
static uint32_t sysvar_transaction_log_sync_method= 0;
130
/** DATA_DICTIONARY views */
131
static TransactionLogTool *transaction_log_tool;
132
static TransactionLogEntriesTool *transaction_log_entries_tool;
133
static TransactionLogTransactionsTool *transaction_log_transactions_tool;
135
/** Index defined in transaction_log_index.cc */
136
extern TransactionLogIndex *transaction_log_index;
138
/** Defined in print_transaction_message.cc */
139
extern plugin::Create_function<PrintTransactionMessageFunction> *print_transaction_message_func_factory;
140
extern plugin::Create_function<HexdumpTransactionMessageFunction> *hexdump_transaction_message_func_factory;
142
TransactionLog::TransactionLog(string name_arg,
143
const string &in_log_file_path,
145
: plugin::TransactionApplier(name_arg),
83
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
85
TransactionLog::TransactionLog(const string in_log_file_path,
86
uint32_t in_sync_method) :
147
88
log_file_path(in_log_file_path),
91
sync_method(in_sync_method)
151
do_checksum= in_do_checksum; /* Have to do here, not in initialization list b/c atomic<> */
153
93
/*
 * Setup our log file and determine the next write offset...
 *
 * NOTE(review): the descriptor is opened with O_APPEND, yet log entries are
 * written elsewhere in this file with pwrite() at an explicit offset. Linux
 * documents that pwrite() on an O_APPEND descriptor appends to the end of the
 * file regardless of the offset argument -- confirm the positional writes
 * behave as intended on every supported platform.
 *
 * NOTE(review): S_IRWXU also grants the owner execute permission on the log
 * file; presumably only read/write is needed -- confirm.
 */
154
94
log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
155
95
if (log_file == -1)
187
126
TransactionLog::~TransactionLog()
189
128
/* Clear up any resources we've consumed */
190
if (isEnabled() && log_file != -1)
192
131
(void) close(log_file);
196
void TransactionLog::apply(const message::Transaction &to_apply)
135
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
198
uint8_t *buffer; /* Buffer we will write serialized header,
199
message and trailing checksum to */
200
uint8_t *orig_buffer;
202
size_t message_byte_length= to_apply.ByteSize();
205
size_t total_envelope_length= HEADER_TRAILER_BYTES + message_byte_length;
208
* Attempt allocation of raw memory buffer for the header,
209
* message and trailing checksum bytes.
211
buffer= static_cast<uint8_t *>(malloc(total_envelope_length));
214
errmsg_printf(ERRMSG_LVL_ERROR,
215
_("Failed to allocate enough memory to buffer header, transaction message, and trailing checksum bytes. Tried to allocate %" PRId64
216
" bytes. Error: %s\n"),
217
static_cast<int64_t>(total_envelope_length),
224
orig_buffer= buffer; /* We will free() orig_buffer, as buffer is moved during write */
227
140
* Do an atomic increment on the offset of the log file position
229
cur_offset= log_offset.fetch_and_add(static_cast<off_t>(total_envelope_length));
142
off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
232
145
* We adjust cur_offset back to the original log_offset before
233
146
* the increment above...
235
cur_offset-= static_cast<off_t>((total_envelope_length));
238
* Write the header information, which is the message type and
239
* the length of the transaction message into the buffer
241
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
242
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(static_cast<uint32_t>(message_byte_length), buffer);
245
* Now write the serialized transaction message, followed
246
* by the optional checksum into the buffer.
248
buffer= to_apply.SerializeWithCachedSizesToArray(buffer);
250
uint32_t checksum= 0;
253
checksum= drizzled::algorithm::crc32(reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
256
/* The checksum is appended as a little-endian 32-bit value (see the call below), not in network byte order */
257
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, buffer);
148
cur_offset-= static_cast<off_t>(data_length);
260
151
* Quick safety...if an error occurs above in another writer, the log
267
158
* the original offset where an error occurred.
269
160
log_offset= cur_offset;
274
164
/* Write the full buffer in one swoop */
277
written= pwrite(log_file, orig_buffer, total_envelope_length, cur_offset);
167
written= pwrite(log_file, data, data_length, cur_offset);
279
169
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
281
if (unlikely(written != static_cast<ssize_t>(total_envelope_length)))
171
if (unlikely(written != static_cast<ssize_t>(data_length)))
283
173
errmsg_printf(ERRMSG_LVL_ERROR,
284
_("Failed to write full size of transaction. Tried to write %" PRId64
174
_("Failed to write full size of log entry. Tried to write %" PRId64
285
175
/* All three numeric arguments are passed as int64_t, so every conversion must be PRId64 */
" bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes. Error: %s\n"),
286
static_cast<int64_t>(total_envelope_length),
176
static_cast<int64_t>(data_length),
287
177
static_cast<int64_t>(cur_offset),
288
178
static_cast<int64_t>(written),
289
179
strerror(errno));
396
267
error_message.clear();
399
const std::string &TransactionLog::getErrorMessage() const
270
const string &TransactionLog::getErrorMessage() const
401
272
return error_message;
404
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
406
static int init(drizzled::plugin::Registry ®istry)
408
/* Create and initialize the transaction log itself */
409
if (sysvar_transaction_log_enabled)
411
transaction_log= new (nothrow) TransactionLog("transaction_log_applier",
412
string(sysvar_transaction_log_file),
413
sysvar_transaction_log_checksum_enabled);
415
if (transaction_log == NULL)
417
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the TransactionLog instance. Got error: %s\n"),
423
/* Check to see if the log was not created properly */
424
if (transaction_log->hasError())
426
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to initialize the Transaction Log. Got error: %s\n"),
427
transaction_log->getErrorMessage().c_str());
431
registry.add(transaction_log);
433
/* Setup DATA_DICTIONARY views */
435
transaction_log_tool= new(std::nothrow)TransactionLogTool;
436
registry.add(transaction_log_tool);
437
transaction_log_entries_tool= new(std::nothrow)TransactionLogEntriesTool;
438
registry.add(transaction_log_entries_tool);
439
transaction_log_transactions_tool= new(std::nothrow)TransactionLogTransactionsTool;
440
registry.add(transaction_log_transactions_tool);
442
/* Setup the module's UDFs */
443
print_transaction_message_func_factory=
444
new plugin::Create_function<PrintTransactionMessageFunction>("print_transaction_message");
445
registry.add(print_transaction_message_func_factory);
447
hexdump_transaction_message_func_factory=
448
new plugin::Create_function<HexdumpTransactionMessageFunction>("hexdump_transaction_message");
449
registry.add(hexdump_transaction_message_func_factory);
451
/* Create and initialize the transaction log index */
452
transaction_log_index= new (nothrow) TransactionLogIndex(*transaction_log);
453
if (transaction_log_index == NULL)
455
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the TransactionLogIndex instance. Got error: %s\n"),
461
/* Check to see if the index was not created properly */
462
if (transaction_log_index->hasError())
464
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to initialize the Transaction Log Index. Got error: %s\n"),
465
transaction_log_index->getErrorMessage().c_str());
471
* Setup the background worker thread which maintains
472
* summary information about the transaction log.
474
if (initTransactionLogBackgroundWorker())
475
return 1; /* Error message output handled in function above */
480
static int deinit(drizzled::plugin::Registry ®istry)
482
/* Cleanup the transaction log itself */
485
registry.remove(transaction_log);
486
delete transaction_log;
487
delete transaction_log_index;
489
/* Cleanup the DATA_DICTIONARY views */
490
registry.remove(transaction_log_tool);
491
delete transaction_log_tool;
492
registry.remove(transaction_log_entries_tool);
493
delete transaction_log_entries_tool;
494
registry.remove(transaction_log_transactions_tool);
495
delete transaction_log_transactions_tool;
497
/* Cleanup module UDFs */
498
registry.remove(print_transaction_message_func_factory);
499
delete print_transaction_message_func_factory;
500
registry.remove(hexdump_transaction_message_func_factory);
501
delete hexdump_transaction_message_func_factory;
507
static void set_truncate_debug(Session *,
513
* The const void * save comes directly from the check function,
514
* which should simply return the result from the set statement.
517
if (*(bool *)save != false)
518
transaction_log->truncate();
521
static DRIZZLE_SYSVAR_BOOL(enable,
522
sysvar_transaction_log_enabled,
524
N_("Enable transaction log"),
525
NULL, /* check func */
526
NULL, /* update func */
527
false /* default */);
529
static DRIZZLE_SYSVAR_BOOL(truncate_debug,
530
sysvar_transaction_log_truncate_debug,
532
N_("DEBUGGING - Truncate transaction log"),
533
NULL, /* check func */
534
set_truncate_debug, /* update func */
535
false /* default */);
537
static DRIZZLE_SYSVAR_STR(log_file,
538
sysvar_transaction_log_file,
540
N_("Path to the file to use for transaction log"),
541
NULL, /* check func */
542
NULL, /* update func*/
543
DEFAULT_LOG_FILE_PATH /* default */);
545
static DRIZZLE_SYSVAR_BOOL(enable_checksum,
546
sysvar_transaction_log_checksum_enabled,
548
N_("Enable CRC32 Checksumming of each written transaction log entry"),
549
NULL, /* check func */
550
NULL, /* update func */
551
false /* default */);
553
static DRIZZLE_SYSVAR_UINT(sync_method,
554
sysvar_transaction_log_sync_method,
556
N_("0 == rely on operating system to sync log file (default), "
557
"1 == sync file at each transaction write, "
558
"2 == sync log file once per second"),
559
NULL, /* check func */
560
NULL, /* update func */
566
static drizzle_sys_var* sys_variables[]= {
567
DRIZZLE_SYSVAR(enable),
568
DRIZZLE_SYSVAR(truncate_debug),
569
DRIZZLE_SYSVAR(log_file),
570
DRIZZLE_SYSVAR(enable_checksum),
571
DRIZZLE_SYSVAR(sync_method),
575
DRIZZLE_PLUGIN(init, deinit, sys_variables);