29
* Defines the implementation of the transaction log file descriptor.
28
* Defines the implementation of the default transaction log.
30
* @see drizzled/plugin/transaction_replicator.h
31
* @see drizzled/plugin/transaction_applier.h
33
* Currently, the transaction log file uses a simple, single-file, append-only
35
* Currently, the log file uses this implementation:
36
37
* We have an atomic off_t called log_offset which keeps track of the
37
* offset into the log file for writing the next log entry. The log
38
* entries are written, one after the other, in the following way:
41
* --------------------------------------
42
* |<- 4 bytes ->|<- # Bytes of Entry ->|
43
* --------------------------------------
44
* | Entry Type | Serialized Entry |
45
* --------------------------------------
48
* The Entry Type is an integer defined as an enumeration in the
49
* /drizzled/message/transaction.proto file called TransactionLogEntry::Type.
51
* Each transaction log entry type is written to the log differently. Here,
52
* we cover the format of each log entry type.
54
* Committed and Prepared Transaction Log Entries
55
* -----------------------------------------------
58
* ------------------------------------------------------------------
59
* |<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
60
* ------------------------------------------------------------------
61
* | Length | Serialized Transaction Message | Checksum |
62
* ------------------------------------------------------------------
38
* offset into the log file for writing the next Transaction.
40
* We write Transaction message encapsulated in an 8-byte length header and a
41
* 4-byte checksum trailer.
43
* When writing a Transaction to the log, we calculate the length of the
44
* Transaction to be written. We then increment log_offset by the length
45
* of the Transaction plus 2 * sizeof(uint32_t) plus sizeof(uint32_t) and store
46
* this new offset in a local off_t called cur_offset (see TransactionLog::apply().
47
* This compare and set is done in an atomic instruction.
49
* We then adjust the local off_t (cur_offset) back to the original
50
* offset by subtracting the length and sizeof(uint32_t) and sizeof(uint32_t).
52
* We then first write a 64-bit length and then the serialized transaction/transaction
53
* and optional checksum to our log file at our local cur_offset.
55
* --------------------------------------------------------------------------------
56
* |<- 4 bytes ->|<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
57
* --------------------------------------------------------------------------------
58
* | Msg Type | Length | Serialized Transaction Message | Checksum |
59
* --------------------------------------------------------------------------------
67
63
* Possibly look at a scoreboard approach with multiple file segments. For
68
64
* right now, though, this is just a quick simple implementation to serve
69
65
* as a skeleton and a springboard.
69
* Move the Applier piece of this code out into its own source file and leave
70
* this for all the glue code of the module.
72
73
#include "config.h"
73
74
#include "transaction_log.h"
75
#include "transaction_log_index.h"
76
#include "data_dictionary_schema.h"
77
#include "print_transaction_message.h"
78
#include "hexdump_transaction_message.h"
79
#include "background_worker.h"
75
81
#include <sys/stat.h>
77
83
#include <unistd.h>
83
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
84
#include <drizzled/errmsg_print.h>
88
#include "drizzled/internal/my_sys.h" /* for internal::my_sync */
90
#include <drizzled/session.h>
91
#include <drizzled/set_var.h>
85
92
#include <drizzled/gettext.h>
93
#include <drizzled/algorithm/crc32.h>
86
94
#include <drizzled/message/transaction.pb.h>
87
#include <drizzled/transaction_services.h>
88
#include <drizzled/algorithm/crc32.h>
90
95
#include <google/protobuf/io/coded_stream.h>
92
97
using namespace std;
93
98
using namespace drizzled;
94
99
using namespace google;
96
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
98
TransactionLog::TransactionLog(const string in_log_file_path,
99
uint32_t in_flush_frequency,
100
bool in_do_checksum) :
102
* Transaction Log plugin system variable - Is the log enabled? Only used on init().
103
* The enable() and disable() methods of the TransactionLog class control online
106
static bool sysvar_transaction_log_enabled= false;
107
/** Transaction Log plugin system variable - The path to the log file used */
108
static char* sysvar_transaction_log_file= NULL;
110
* Transaction Log plugin system variable - A debugging variable to assist
111
* in truncating the log file.
113
static bool sysvar_transaction_log_truncate_debug= false;
114
static const char DEFAULT_LOG_FILE_PATH[]= "transaction.log"; /* In datadir... */
116
* Transaction Log plugin system variable - Should we write a CRC32 checksum for
117
* each written Transaction message?
119
static bool sysvar_transaction_log_checksum_enabled= false;
121
* Numeric option controlling the sync/flush behaviour of the transaction
124
* TransactionLog::SYNC_METHOD_OS == 0 ... let OS do sync'ing
125
* TransactionLog::SYNC_METHOD_EVERY_WRITE == 1 ... sync on every write
126
* TransactionLog::SYNC_METHOD_EVERY_SECOND == 2 ... sync at most once a second
128
static uint32_t sysvar_transaction_log_sync_method= 0;
130
/** DATA_DICTIONARY views */
131
static TransactionLogTool *transaction_log_tool;
132
static TransactionLogEntriesTool *transaction_log_entries_tool;
133
static TransactionLogTransactionsTool *transaction_log_transactions_tool;
135
/** Index defined in transaction_log_index.cc */
136
extern TransactionLogIndex *transaction_log_index;
138
/** Defined in print_transaction_message.cc */
139
extern plugin::Create_function<PrintTransactionMessageFunction> *print_transaction_message_func_factory;
140
extern plugin::Create_function<HexdumpTransactionMessageFunction> *hexdump_transaction_message_func_factory;
142
TransactionLog::TransactionLog(string name_arg,
143
const string &in_log_file_path,
145
: plugin::TransactionApplier(name_arg),
102
147
log_file_path(in_log_file_path),
103
148
has_error(false),
105
flush_frequency(in_flush_frequency),
106
do_checksum(in_do_checksum)
151
do_checksum= in_do_checksum; /* Have to do here, not in initialization list b/c atomic<> */
108
153
/* Setup our log file and determine the next write offset... */
109
154
log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
110
155
if (log_file == -1)
112
char errmsg[STRERROR_MAX];
113
strerror_r(errno, errmsg, sizeof(errmsg));
114
157
error_message.assign(_("Failed to open transaction log file "));
115
158
error_message.append(log_file_path);
116
159
error_message.append(" Got error: ");
117
error_message.append(errmsg);
160
error_message.append(strerror(errno));
118
161
error_message.push_back('\n');
143
uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
145
uint32_t *checksum_out)
147
uint8_t *orig_buffer= buffer;
148
size_t message_byte_length= trx.ByteSize();
187
TransactionLog::~TransactionLog()
189
/* Clear up any resources we've consumed */
190
if (isEnabled() && log_file != -1)
192
(void) close(log_file);
196
void TransactionLog::apply(const message::Transaction &to_apply)
198
uint8_t *buffer; /* Buffer we will write serialized header,
199
message and trailing checksum to */
200
uint8_t *orig_buffer;
202
size_t message_byte_length= to_apply.ByteSize();
205
size_t total_envelope_length= HEADER_TRAILER_BYTES + message_byte_length;
208
* Attempt allocation of raw memory buffer for the header,
209
* message and trailing checksum bytes.
211
buffer= static_cast<uint8_t *>(malloc(total_envelope_length));
214
errmsg_printf(ERRMSG_LVL_ERROR,
215
_("Failed to allocate enough memory to buffer header, transaction message, and trailing checksum bytes. Tried to allocate %" PRId64
216
" bytes. Error: %s\n"),
217
static_cast<int64_t>(total_envelope_length),
224
orig_buffer= buffer; /* We will free() orig_buffer, as buffer is moved during write */
227
* Do an atomic increment on the offset of the log file position
229
cur_offset= log_offset.fetch_and_add(static_cast<off_t>(total_envelope_length));
232
* We adjust cur_offset back to the original log_offset before
233
* the increment above...
235
cur_offset-= static_cast<off_t>((total_envelope_length));
151
238
* Write the header information, which is the message type and
152
239
* the length of the transaction message into the buffer
154
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
155
static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
156
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
157
static_cast<uint32_t>(message_byte_length), buffer);
241
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
242
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(static_cast<uint32_t>(message_byte_length), buffer);
160
245
* Now write the serialized transaction message, followed
161
246
* by the optional checksum into the buffer.
163
buffer= trx.SerializeWithCachedSizesToArray(buffer);
248
buffer= to_apply.SerializeWithCachedSizesToArray(buffer);
250
uint32_t checksum= 0;
167
*checksum_out= drizzled::algorithm::crc32(
168
reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
253
checksum= drizzled::algorithm::crc32(reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
173
256
/* We always write in network byte order */
174
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
175
/* Reset the pointer back to its original location... */
180
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
185
* Do an atomic increment on the offset of the log file position
187
off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
257
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, buffer);
190
260
* Quick safety...if an error occurs above in another writer, the log
197
267
* the original offset where an error occurred.
199
269
log_offset= cur_offset;
203
274
/* Write the full buffer in one swoop */
206
written= pwrite(log_file, data, data_length, cur_offset);
277
written= pwrite(log_file, orig_buffer, total_envelope_length, cur_offset);
208
279
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
210
if (unlikely(written != static_cast<ssize_t>(data_length)))
281
if (unlikely(written != static_cast<ssize_t>(total_envelope_length)))
212
char errmsg[STRERROR_MAX];
213
strerror_r(errno, errmsg, sizeof(errmsg));
214
errmsg_printf(error::ERROR,
215
_("Failed to write full size of log entry. Tried to write %" PRId64
283
errmsg_printf(ERRMSG_LVL_ERROR,
284
_("Failed to write full size of transaction. Tried to write %" PRId64
216
285
" bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes. Error: %s\n"),
217
static_cast<int64_t>(data_length),
286
static_cast<int64_t>(total_envelope_length),
218
287
static_cast<int64_t>(cur_offset),
219
static_cast<int32_t>(written),
288
static_cast<int64_t>(written),
223
292
* Reset the log's offset in case we want to produce a decent error message including
224
293
* the original offset where an error occurred.
226
295
log_offset= cur_offset;
229
300
int error_code= syncLogFile();
302
transaction_log_index->addEntry(TransactionLogEntry(ReplicationServices::TRANSACTION,
304
total_envelope_length),
231
308
if (unlikely(error_code != 0))
233
sql_perror(_("Failed to sync log file."));
310
errmsg_printf(ERRMSG_LVL_ERROR,
311
_("Failed to sync log file. Got error: %s\n"),
239
316
int TransactionLog::syncLogFile()
241
switch (flush_frequency)
318
switch (sysvar_transaction_log_sync_method)
243
case FLUSH_FREQUENCY_EVERY_WRITE:
320
case SYNC_METHOD_EVERY_WRITE:
244
321
return internal::my_sync(log_file, 0);
245
case FLUSH_FREQUENCY_EVERY_SECOND:
322
case SYNC_METHOD_EVERY_SECOND:
247
324
time_t now_time= time(NULL);
248
325
if (last_sync_time <= (now_time - 1))
307
396
error_message.clear();
310
const string &TransactionLog::getErrorMessage() const
399
const std::string &TransactionLog::getErrorMessage() const
312
401
return error_message;
315
size_t TransactionLog::getLogEntrySize(const message::Transaction &trx)
317
return trx.ByteSize() + HEADER_TRAILER_BYTES;
404
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
406
static int init(drizzled::plugin::Registry ®istry)
408
/* Create and initialize the transaction log itself */
409
if (sysvar_transaction_log_enabled)
411
transaction_log= new (nothrow) TransactionLog("transaction_log_applier",
412
string(sysvar_transaction_log_file),
413
sysvar_transaction_log_checksum_enabled);
415
if (transaction_log == NULL)
417
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the TransactionLog instance. Got error: %s\n"),
423
/* Check to see if the log was not created properly */
424
if (transaction_log->hasError())
426
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to initialize the Transaction Log. Got error: %s\n"),
427
transaction_log->getErrorMessage().c_str());
431
registry.add(transaction_log);
433
/* Setup DATA_DICTIONARY views */
435
transaction_log_tool= new(std::nothrow)TransactionLogTool;
436
registry.add(transaction_log_tool);
437
transaction_log_entries_tool= new(std::nothrow)TransactionLogEntriesTool;
438
registry.add(transaction_log_entries_tool);
439
transaction_log_transactions_tool= new(std::nothrow)TransactionLogTransactionsTool;
440
registry.add(transaction_log_transactions_tool);
442
/* Setup the module's UDFs */
443
print_transaction_message_func_factory=
444
new plugin::Create_function<PrintTransactionMessageFunction>("print_transaction_message");
445
registry.add(print_transaction_message_func_factory);
447
hexdump_transaction_message_func_factory=
448
new plugin::Create_function<HexdumpTransactionMessageFunction>("hexdump_transaction_message");
449
registry.add(hexdump_transaction_message_func_factory);
451
/* Create and initialize the transaction log index */
452
transaction_log_index= new (nothrow) TransactionLogIndex(*transaction_log);
453
if (transaction_log_index == NULL)
455
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the TransactionLogIndex instance. Got error: %s\n"),
461
/* Check to see if the index was not created properly */
462
if (transaction_log_index->hasError())
464
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to initialize the Transaction Log Index. Got error: %s\n"),
465
transaction_log_index->getErrorMessage().c_str());
471
* Setup the background worker thread which maintains
472
* summary information about the transaction log.
474
if (initTransactionLogBackgroundWorker())
475
return 1; /* Error message output handled in function above */
480
static int deinit(drizzled::plugin::Registry ®istry)
482
/* Cleanup the transaction log itself */
485
registry.remove(transaction_log);
486
delete transaction_log;
487
delete transaction_log_index;
489
/* Cleanup the DATA_DICTIONARY views */
490
registry.remove(transaction_log_tool);
491
delete transaction_log_tool;
492
registry.remove(transaction_log_entries_tool);
493
delete transaction_log_entries_tool;
494
registry.remove(transaction_log_transactions_tool);
495
delete transaction_log_transactions_tool;
497
/* Cleanup module UDFs */
498
registry.remove(print_transaction_message_func_factory);
499
delete print_transaction_message_func_factory;
500
registry.remove(hexdump_transaction_message_func_factory);
501
delete hexdump_transaction_message_func_factory;
507
static void set_truncate_debug(Session *,
513
* The const void * save comes directly from the check function,
514
* which should simply return the result from the set statement.
517
if (*(bool *)save != false)
518
transaction_log->truncate();
521
static DRIZZLE_SYSVAR_BOOL(enable,
522
sysvar_transaction_log_enabled,
524
N_("Enable transaction log"),
525
NULL, /* check func */
526
NULL, /* update func */
527
false /* default */);
529
static DRIZZLE_SYSVAR_BOOL(truncate_debug,
530
sysvar_transaction_log_truncate_debug,
532
N_("DEBUGGING - Truncate transaction log"),
533
NULL, /* check func */
534
set_truncate_debug, /* update func */
535
false /* default */);
537
static DRIZZLE_SYSVAR_STR(log_file,
538
sysvar_transaction_log_file,
540
N_("Path to the file to use for transaction log"),
541
NULL, /* check func */
542
NULL, /* update func*/
543
DEFAULT_LOG_FILE_PATH /* default */);
545
static DRIZZLE_SYSVAR_BOOL(enable_checksum,
546
sysvar_transaction_log_checksum_enabled,
548
N_("Enable CRC32 Checksumming of each written transaction log entry"),
549
NULL, /* check func */
550
NULL, /* update func */
551
false /* default */);
553
static DRIZZLE_SYSVAR_UINT(sync_method,
554
sysvar_transaction_log_sync_method,
556
N_("0 == rely on operating system to sync log file (default), "
557
"1 == sync file at each transaction write, "
558
"2 == sync log file once per second"),
559
NULL, /* check func */
560
NULL, /* update func */
566
static drizzle_sys_var* sys_variables[]= {
567
DRIZZLE_SYSVAR(enable),
568
DRIZZLE_SYSVAR(truncate_debug),
569
DRIZZLE_SYSVAR(log_file),
570
DRIZZLE_SYSVAR(enable_checksum),
571
DRIZZLE_SYSVAR(sync_method),
575
DRIZZLE_PLUGIN(init, deinit, sys_variables);