29
* Defines the implementation of the transaction log file descriptor.
28
* Defines the implementation of the default transaction log.
30
* @see drizzled/plugin/transaction_replicator.h
31
* @see drizzled/plugin/transaction_applier.h
33
* Currently, the transaction log file uses a simple, single-file, append-only
35
* Currently, the log file uses this implementation:
36
37
* We have an atomic off_t called log_offset which keeps track of the
37
* offset into the log file for writing the next log entry. The log
38
* entries are written, one after the other, in the following way:
41
* --------------------------------------
42
* |<- 4 bytes ->|<- # Bytes of Entry ->|
43
* --------------------------------------
44
* | Entry Type | Serialized Entry |
45
* --------------------------------------
48
* The Entry Type is an integer defined as an enumeration in the
49
* /drizzled/message/transaction.proto file called TransactionLogEntry::Type.
51
* Each transaction log entry type is written to the log differently. Here,
52
* we cover the format of each log entry type.
54
* Committed and Prepared Transaction Log Entries
55
* -----------------------------------------------
58
* ------------------------------------------------------------------
59
* |<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
60
* ------------------------------------------------------------------
61
* | Length | Serialized Transaction Message | Checksum |
62
* ------------------------------------------------------------------
38
* offset into the log file for writing the next Transaction.
40
* We write each Transaction message encapsulated in an 8-byte header (message type and length) and a
41
* 4-byte checksum trailer.
43
* When writing a Transaction to the log, we calculate the length of the
44
* Transaction to be written. We then increment log_offset by the length
45
* of the Transaction plus 2 * sizeof(uint32_t) plus sizeof(uint32_t) and store
46
* this new offset in a local off_t called cur_offset (see TransactionLog::apply()).
47
* This fetch-and-add is done in an atomic instruction.
49
* We then adjust the local off_t (cur_offset) back to the original
50
* offset by subtracting the length and 2 * sizeof(uint32_t) and sizeof(uint32_t).
52
* We then first write the 32-bit message type and the 32-bit length and then the serialized transaction
53
* and optional checksum to our log file at our local cur_offset.
55
* --------------------------------------------------------------------------------
56
* |<- 4 bytes ->|<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
57
* --------------------------------------------------------------------------------
58
* | Msg Type | Length | Serialized Transaction Message | Checksum |
59
* --------------------------------------------------------------------------------
69
65
* as a skeleton and a springboard.
68
#include <drizzled/server_includes.h>
73
69
#include "transaction_log.h"
77
71
#include <unistd.h>
83
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
84
#include <drizzled/errmsg_print.h>
76
#include <drizzled/session.h>
77
#include <drizzled/set_var.h>
85
78
#include <drizzled/gettext.h>
79
#include <drizzled/hash/crc32.h>
86
80
#include <drizzled/message/transaction.pb.h>
87
#include <drizzled/transaction_services.h>
88
#include <drizzled/algorithm/crc32.h>
90
81
#include <google/protobuf/io/coded_stream.h>
92
83
using namespace std;
93
84
using namespace drizzled;
94
85
using namespace google;
96
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
88
* Transaction Log plugin system variable - Is the log enabled? Only used on init().
89
* The enable() and disable() methods of the TransactionLog class control online
92
static bool sysvar_transaction_log_enabled= false;
93
/** Transaction Log plugin system variable - The path to the log file used */
94
static char* sysvar_transaction_log_file= NULL;
96
* Transaction Log plugin system variable - A debugging variable to assist
97
* in truncating the log file.
99
static bool sysvar_transaction_log_truncate_debug= false;
100
static const char DEFAULT_LOG_FILE_PATH[]= "transaction.log"; /* In datadir... */
102
* Transaction Log plugin system variable - Should we write a CRC32 checksum for
103
* each written Transaction message?
105
static bool sysvar_transaction_log_checksum_enabled= false;
98
TransactionLog::TransactionLog(const string in_log_file_path,
99
uint32_t in_flush_frequency,
100
bool in_do_checksum) :
107
TransactionLog::TransactionLog(string name_arg,
108
const char *in_log_file_path,
110
: plugin::TransactionApplier(name_arg),
102
log_file_path(in_log_file_path),
105
flush_frequency(in_flush_frequency),
106
do_checksum(in_do_checksum)
112
log_file_path(in_log_file_path)
114
do_checksum= in_do_checksum; /* Have to do here, not in initialization list b/c atomic<> */
108
116
/* Setup our log file and determine the next write offset... */
109
log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
117
log_file= open(log_file_path, O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
110
118
if (log_file == -1)
112
char errmsg[STRERROR_MAX];
113
strerror_r(errno, errmsg, sizeof(errmsg));
114
error_message.assign(_("Failed to open transaction log file "));
115
error_message.append(log_file_path);
116
error_message.append(" Got error: ");
117
error_message.append(errmsg);
118
error_message.push_back('\n');
120
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open transaction log file %s. Got error: %s\n"),
123
/* For convenience, grab the log file name from the path */
124
if (log_file_path.find_first_of('/') != string::npos)
126
/* Strip to last / */
128
tmp= log_file_path.substr(log_file_path.find_last_of('/') + 1);
129
log_file_name.assign(tmp);
132
log_file_name.assign(log_file_path);
135
128
* The offset of the next write is the current position of the log
136
129
* file, since it's opened in append mode...
143
uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
145
uint32_t *checksum_out)
147
uint8_t *orig_buffer= buffer;
148
size_t message_byte_length= trx.ByteSize();
136
TransactionLog::~TransactionLog()
138
/* Clear up any resources we've consumed */
139
if (isEnabled() && log_file != -1)
141
(void) close(log_file);
145
void TransactionLog::apply(const message::Transaction &to_apply)
147
uint8_t *buffer; /* Buffer we will write serialized header,
148
message and trailing checksum to */
149
uint8_t *orig_buffer;
151
size_t message_byte_length= to_apply.ByteSize();
154
size_t total_envelope_length= HEADER_TRAILER_BYTES + message_byte_length;
157
* Attempt allocation of raw memory buffer for the header,
158
* message and trailing checksum bytes.
160
buffer= static_cast<uint8_t *>(malloc(total_envelope_length));
163
errmsg_printf(ERRMSG_LVL_ERROR,
164
_("Failed to allocate enough memory to buffer header, transaction message, and trailing checksum bytes. Tried to allocate %" PRId64
165
" bytes. Error: %s\n"),
166
static_cast<int64_t>(total_envelope_length),
173
orig_buffer= buffer; /* We will free() orig_buffer, as buffer is moved during write */
176
* Do an atomic increment on the offset of the log file position
178
cur_offset= log_offset.fetch_and_add(static_cast<off_t>(total_envelope_length));
181
* We adjust cur_offset back to the original log_offset before
182
* the increment above...
184
cur_offset-= static_cast<off_t>((total_envelope_length));
151
187
* Write the header information, which is the message type and
152
188
* the length of the transaction message into the buffer
154
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
155
static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
156
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
157
static_cast<uint32_t>(message_byte_length), buffer);
190
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
191
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(static_cast<uint32_t>(message_byte_length), buffer);
160
194
* Now write the serialized transaction message, followed
161
195
* by the optional checksum into the buffer.
163
buffer= trx.SerializeWithCachedSizesToArray(buffer);
197
buffer= to_apply.SerializeWithCachedSizesToArray(buffer);
199
uint32_t checksum= 0;
167
*checksum_out= drizzled::algorithm::crc32(
168
reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
202
checksum= drizzled::hash::crc32(reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
173
205
/* We always write the checksum in little-endian byte order */
174
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
175
/* Reset the pointer back to its original location... */
180
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
185
* Do an atomic increment on the offset of the log file position
187
off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
206
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, buffer);
190
209
* Quick safety...if an error occurs above in another writer, the log
197
216
* the original offset where an error occurred.
199
218
log_offset= cur_offset;
203
223
/* Write the full buffer in one swoop */
206
written= pwrite(log_file, data, data_length, cur_offset);
226
written= pwrite(log_file, orig_buffer, total_envelope_length, cur_offset);
208
228
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
210
if (unlikely(written != static_cast<ssize_t>(data_length)))
230
if (unlikely(written != static_cast<ssize_t>(total_envelope_length)))
212
char errmsg[STRERROR_MAX];
213
strerror_r(errno, errmsg, sizeof(errmsg));
214
232
errmsg_printf(ERRMSG_LVL_ERROR,
215
_("Failed to write full size of log entry. Tried to write %" PRId64
233
_("Failed to write full size of transaction. Tried to write %" PRId64
216
234
" bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes. Error: %s\n"),
217
static_cast<int64_t>(data_length),
235
static_cast<int64_t>(total_envelope_length),
218
236
static_cast<int64_t>(cur_offset),
219
static_cast<int32_t>(written),
237
static_cast<int64_t>(written),
223
241
* Reset the log's offset in case we want to produce a decent error message including
224
242
* the original offset where an error occurred.
226
244
log_offset= cur_offset;
229
int error_code= syncLogFile();
231
if (unlikely(error_code != 0))
233
char errmsg[STRERROR_MAX];
234
strerror_r(errno, errmsg, sizeof(errmsg));
235
errmsg_printf(ERRMSG_LVL_ERROR,
236
_("Failed to sync log file. Got error: %s\n"),
242
int TransactionLog::syncLogFile()
244
switch (flush_frequency)
246
case FLUSH_FREQUENCY_EVERY_WRITE:
247
return internal::my_sync(log_file, 0);
248
case FLUSH_FREQUENCY_EVERY_SECOND:
250
time_t now_time= time(NULL);
251
if (last_sync_time <= (now_time - 1))
253
last_sync_time= now_time;
254
return internal::my_sync(log_file, 0);
258
case FLUSH_FREQUENCY_OS:
264
const string &TransactionLog::getLogFilename()
266
return log_file_name;
269
const string &TransactionLog::getLogFilepath()
271
return log_file_path;
274
250
void TransactionLog::truncate()
252
bool orig_is_enabled= isEnabled();
256
* Wait a short amount of time before truncating. This just prevents error messages
257
* from being produced during a call to apply(). Calling disable() above
258
* means that once the current caller to apply() is done, no other calls are made to
259
* apply() before enable is reset to its original state
279
* This is NOT THREAD SAFE! DEBUG/TEST code only!
263
* This is DEBUG code only!
265
usleep(500); /* Sleep for 500 microseconds (usleep() takes microseconds, so this is half a millisecond) */
281
266
log_offset= (off_t) 0;
302
bool TransactionLog::hasError() const
307
void TransactionLog::clearError()
310
error_message.clear();
313
const string &TransactionLog::getErrorMessage() const
315
return error_message;
318
size_t TransactionLog::getLogEntrySize(const message::Transaction &trx)
320
return trx.ByteSize() + HEADER_TRAILER_BYTES;
290
static TransactionLog *transaction_log= NULL; /* The singleton transaction log */
292
static int init(drizzled::plugin::Registry ®istry)
294
if (sysvar_transaction_log_enabled)
296
transaction_log= new TransactionLog("transaction_log",
297
sysvar_transaction_log_file,
298
sysvar_transaction_log_checksum_enabled);
299
registry.add(transaction_log);
304
static int deinit(drizzled::plugin::Registry ®istry)
308
registry.remove(transaction_log);
309
delete transaction_log;
314
static void set_truncate_debug(Session *,
315
struct st_mysql_sys_var *,
320
* The const void * save comes directly from the check function,
321
* which should simply return the result from the set statement.
324
if (*(bool *)save != false)
325
transaction_log->truncate();
328
static DRIZZLE_SYSVAR_BOOL(enable,
329
sysvar_transaction_log_enabled,
331
N_("Enable transaction log"),
332
NULL, /* check func */
333
NULL, /* update func */
334
false /* default */);
336
static DRIZZLE_SYSVAR_BOOL(truncate_debug,
337
sysvar_transaction_log_truncate_debug,
339
N_("DEBUGGING - Truncate transaction log"),
340
NULL, /* check func */
341
set_truncate_debug, /* update func */
342
false /* default */);
344
static DRIZZLE_SYSVAR_STR(log_file,
345
sysvar_transaction_log_file,
347
N_("Path to the file to use for transaction log."),
348
NULL, /* check func */
349
NULL, /* update func*/
350
DEFAULT_LOG_FILE_PATH /* default */);
352
static DRIZZLE_SYSVAR_BOOL(enable_checksum,
353
sysvar_transaction_log_checksum_enabled,
355
N_("Enable CRC32 Checksumming"),
356
NULL, /* check func */
357
NULL, /* update func */
358
false /* default */);
360
static struct st_mysql_sys_var* system_variables[]= {
361
DRIZZLE_SYSVAR(enable),
362
DRIZZLE_SYSVAR(truncate_debug),
363
DRIZZLE_SYSVAR(log_file),
364
DRIZZLE_SYSVAR(enable_checksum),
368
drizzle_declare_plugin(transaction_log)
373
N_("Transaction Message Log"),
375
init, /* Plugin Init */
376
deinit, /* Plugin Deinit */
377
NULL, /* status variables */
378
system_variables, /* system variables */
379
NULL /* config options */
381
drizzle_declare_plugin_end;