29
* Defines the implementation of the transaction log file descriptor.
24
* Defines the implementation of the default command log.
26
* @see drizzled/plugin/command_replicator.h
27
* @see drizzled/plugin/command_applier.h
33
* Currently, the transaction log file uses a simple, single-file, append-only
31
* Currently, the log file uses this implementation:
36
33
* We have an atomic off_t called log_offset which keeps track of the
37
* offset into the log file for writing the next log entry. The log
38
* entries are written, one after the other, in the following way:
41
* --------------------------------------
42
* |<- 4 bytes ->|<- # Bytes of Entry ->|
43
* --------------------------------------
44
* | Entry Type | Serialized Entry |
45
* --------------------------------------
48
* The Entry Type is an integer defined as an enumeration in the
49
* /drizzled/message/transaction.proto file called TransactionLogEntry::Type.
51
* Each transaction log entry type is written to the log differently. Here,
52
* we cover the format of each log entry type.
54
* Committed and Prepared Transaction Log Entries
55
* -----------------------------------------------
58
* ------------------------------------------------------------------
59
* |<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
60
* ------------------------------------------------------------------
61
* | Length | Serialized Transaction Message | Checksum |
62
* ------------------------------------------------------------------
34
* offset into the log file for writing the next Command.
36
* We write each Command message encapsulated in an 8-byte length header and a
37
* 4-byte checksum trailer.
39
* When writing a Command to the log, we calculate the length of the
40
* Command to be written. We then increment log_offset by the length
41
* of the Command plus sizeof(uint64_t) plus sizeof(uint32_t) and store
42
* this new offset in a local off_t called cur_offset (see CommandLog::apply()).
43
* This fetch-and-add is done in a single atomic instruction.
45
* We then adjust the local off_t (cur_offset) back to the original
46
* offset by subtracting the length and sizeof(uint64_t) and sizeof(uint32_t).
48
* We then first write a 64-bit length and then the serialized transaction/command
49
* and optional checksum to our log file at our local cur_offset.
51
* --------------------------------------------------------------
52
* |<- 8 bytes ->|<- # Bytes of Command Message ->|<- 4 bytes ->|
53
* --------------------------------------------------------------
54
* | Length | Serialized Command Message | Checksum |
55
* --------------------------------------------------------------
67
59
* Possibly look at a scoreboard approach with multiple file segments. For
68
60
* right now, though, this is just a quick simple implementation to serve
69
61
* as a skeleton and a springboard.
63
* Also, we can move to a ZeroCopyStream implementation instead of using the
64
* string as a buffer in apply()
73
#include "transaction_log.h"
67
#include "command_log.h"
77
69
#include <unistd.h>
83
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
84
#include <drizzled/errmsg_print.h>
75
#include <drizzled/session.h>
76
#include <drizzled/set_var.h>
85
77
#include <drizzled/gettext.h>
86
#include <drizzled/message/transaction.pb.h>
87
#include <drizzled/transaction_services.h>
88
#include <drizzled/algorithm/crc32.h>
90
#include <google/protobuf/io/coded_stream.h>
78
#include <drizzled/message/replication.pb.h>
79
#include <drizzled/crc32.h>
92
81
using namespace std;
93
82
using namespace drizzled;
94
using namespace google;
96
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
98
TransactionLog::TransactionLog(const string in_log_file_path,
99
uint32_t in_flush_frequency,
100
bool in_do_checksum) :
85
* Command Log plugin system variable - Is the log enabled? Only used on init().
86
* The enable() and disable() methods of the CommandLog class control online
89
static bool sysvar_command_log_enabled= false;
90
/** Command Log plugin system variable - The path to the log file used */
91
static char* sysvar_command_log_file= NULL;
93
* Command Log plugin system variable - A debugging variable to assist
94
* in truncating the log file.
96
static bool sysvar_command_log_truncate_debug= false;
97
static const char DEFAULT_LOG_FILE_PATH[]= "command.log"; /* In datadir... */
99
* Command Log plugin system variable - Should we write a CRC32 checksum for
100
* each written Command message?
102
static bool sysvar_command_log_checksum_enabled= false;
104
CommandLog::CommandLog(string name_arg,
105
const char *in_log_file_path, bool in_do_checksum)
107
plugin::CommandApplier(name_arg),
102
log_file_path(in_log_file_path),
105
flush_frequency(in_flush_frequency),
106
do_checksum(in_do_checksum)
109
log_file_path(in_log_file_path)
111
is_enabled= true; /* If constructed, the plugin is enabled until taken offline with disable() */
113
do_checksum= in_do_checksum; /* Have to do here, not in initialization list b/c atomic<> */
108
115
/* Setup our log file and determine the next write offset... */
109
log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
116
log_file= open(log_file_path, O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
110
117
if (log_file == -1)
112
char errmsg[STRERROR_MAX];
113
strerror_r(errno, errmsg, sizeof(errmsg));
114
error_message.assign(_("Failed to open transaction log file "));
115
error_message.append(log_file_path);
116
error_message.append(" Got error: ");
117
error_message.append(errmsg);
118
error_message.push_back('\n');
119
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open command log file %s. Got error: %s\n"),
123
/* For convenience, grab the log file name from the path */
124
if (log_file_path.find_first_of('/') != string::npos)
126
/* Strip to last / */
128
tmp= log_file_path.substr(log_file_path.find_last_of('/') + 1);
129
log_file_name.assign(tmp);
132
log_file_name.assign(log_file_path);
135
127
* The offset of the next write is the current position of the log
136
128
* file, since it's opened in append mode...
138
130
log_offset= lseek(log_file, 0, SEEK_END);
143
uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
145
uint32_t *checksum_out)
147
uint8_t *orig_buffer= buffer;
148
size_t message_byte_length= trx.ByteSize();
151
* Write the header information, which is the message type and
152
* the length of the transaction message into the buffer
154
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
155
static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
156
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
157
static_cast<uint32_t>(message_byte_length), buffer);
136
CommandLog::~CommandLog()
138
/* Clear up any resources we've consumed */
139
if (isActive() && log_file != -1)
141
(void) close(log_file);
145
bool CommandLog::isActive()
147
return is_enabled && is_active;
150
void CommandLog::apply(const message::Command &to_apply)
153
* There is an issue on Solaris/SunStudio where if the std::string buffer is
154
* NOT initialized with the below, the code produces an EFAULT when accessing
155
* c_str() later on. Stoopid, but true.
157
string buffer(""); /* Buffer we will write serialized command to */
159
static const uint32_t HEADER_TRAILER_BYTES= sizeof(uint64_t) + /* 8-byte length header */
160
sizeof(uint32_t); /* 4 byte checksum trailer */
166
to_apply.SerializeToString(&buffer);
168
length= buffer.length();
171
* Do an atomic increment on the offset of the log file position
173
cur_offset= log_offset.fetch_and_add(static_cast<off_t>((HEADER_TRAILER_BYTES + length)));
176
* We adjust cur_offset back to the original log_offset before
177
* the increment above...
179
cur_offset-= static_cast<off_t>((HEADER_TRAILER_BYTES + length));
182
* Quick safety...if an error occurs below, the log file will
183
* not be active, therefore a caller could have been ready
184
* to write...but the log is crashed.
186
if (unlikely(state == CRASHED))
189
/* We always write in network byte order */
190
unsigned char nbo_length[8];
191
int8store(nbo_length, length);
193
/* Write the length header */
196
written= pwrite(log_file, nbo_length, sizeof(uint64_t), cur_offset);
198
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
200
if (unlikely(written != sizeof(uint64_t)))
202
errmsg_printf(ERRMSG_LVL_ERROR,
203
_("Failed to write full size of command. Tried to write %" PRId64
204
" bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes. Error: %s\n"),
205
static_cast<int64_t>(sizeof(uint64_t)),
206
static_cast<int64_t>(cur_offset),
207
static_cast<int64_t>(written),
211
* Reset the log's offset in case we want to produce a decent error message including
212
* the original offset where an error occurred.
214
log_offset= cur_offset;
219
cur_offset+= static_cast<off_t>(written);
222
* Quick safety...if an error occurs above in another writer, the log
223
* file will be in a crashed state.
225
if (unlikely(state == CRASHED))
228
* Reset the log's offset in case we want to produce a decent error message including
229
* the original offset where an error occurred.
231
log_offset= cur_offset;
235
/* Write the command message itself */
238
written= pwrite(log_file, buffer.c_str(), length, cur_offset);
240
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
242
if (unlikely(written != static_cast<ssize_t>(length)))
244
errmsg_printf(ERRMSG_LVL_ERROR,
245
_("Failed to write full serialized command. Tried to write %" PRId64
246
" bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes. Error: %s\n"),
247
static_cast<int64_t>(length),
248
static_cast<int64_t>(cur_offset),
249
static_cast<int64_t>(written),
253
* Reset the log's offset in case we want to produce a decent error message including
254
* the original offset where an error occurred.
256
log_offset= cur_offset;
260
cur_offset+= static_cast<off_t>(written);
263
* Quick safety...if an error occurs above in another writer, the log
264
* file will be in a crashed state.
266
if (unlikely(state == CRASHED))
269
* Reset the log's offset in case we want to produce a decent error message including
270
* the original offset where an error occurred.
272
log_offset= cur_offset;
276
uint32_t checksum= 0;
280
checksum= hash_crc32(buffer.c_str(), length);
283
/* We always write in network byte order */
284
unsigned char nbo_checksum[4];
285
int4store(nbo_checksum, checksum);
287
/* Write the checksum trailer */
290
written= pwrite(log_file, nbo_checksum, sizeof(uint32_t), cur_offset);
292
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
294
if (unlikely(written != static_cast<ssize_t>(sizeof(uint32_t))))
296
errmsg_printf(ERRMSG_LVL_ERROR,
297
_("Failed to write full checksum of command. Tried to write %" PRId64
298
" bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes. Error: %s\n"),
299
static_cast<int64_t>(sizeof(uint32_t)),
300
static_cast<int64_t>(cur_offset),
301
static_cast<int64_t>(written),
305
* Reset the log's offset in case we want to produce a decent error message including
306
* the original offset where an error occurred.
308
log_offset= cur_offset;
314
void CommandLog::truncate()
316
bool orig_is_enabled= is_enabled;
160
* Now write the serialized transaction message, followed
161
* by the optional checksum into the buffer.
163
buffer= trx.SerializeWithCachedSizesToArray(buffer);
167
*checksum_out= drizzled::algorithm::crc32(
168
reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
173
/* We always write the checksum in little-endian byte order */
174
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
175
/* Reset the pointer back to its original location... */
180
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
185
* Do an atomic increment on the offset of the log file position
187
off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
190
* Quick safety...if an error occurs above in another writer, the log
191
* file will be in a crashed state.
193
if (unlikely(state == CRASHED))
196
* Reset the log's offset in case we want to produce a decent error message including
197
* the original offset where an error occurred.
199
log_offset= cur_offset;
203
/* Write the full buffer in one swoop */
206
written= pwrite(log_file, data, data_length, cur_offset);
208
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
210
if (unlikely(written != static_cast<ssize_t>(data_length)))
212
char errmsg[STRERROR_MAX];
213
strerror_r(errno, errmsg, sizeof(errmsg));
214
errmsg_printf(error::ERROR,
215
_("Failed to write full size of log entry. Tried to write %" PRId64
216
" bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes. Error: %s\n"),
217
static_cast<int64_t>(data_length),
218
static_cast<int64_t>(cur_offset),
219
static_cast<int32_t>(written),
223
* Reset the log's offset in case we want to produce a decent error message including
224
* the original offset where an error occurred.
226
log_offset= cur_offset;
229
int error_code= syncLogFile();
231
if (unlikely(error_code != 0))
233
sql_perror(_("Failed to sync log file."));
239
int TransactionLog::syncLogFile()
241
switch (flush_frequency)
243
case FLUSH_FREQUENCY_EVERY_WRITE:
244
return internal::my_sync(log_file, 0);
245
case FLUSH_FREQUENCY_EVERY_SECOND:
247
time_t now_time= time(NULL);
248
if (last_sync_time <= (now_time - 1))
250
last_sync_time= now_time;
251
return internal::my_sync(log_file, 0);
255
case FLUSH_FREQUENCY_OS:
261
const string &TransactionLog::getLogFilename()
263
return log_file_name;
266
const string &TransactionLog::getLogFilepath()
268
return log_file_path;
271
void TransactionLog::truncate()
320
* Wait a short amount of time before truncating. This just prevents error messages
321
* from being produced during a call to apply(). Setting is_enabled to false above
322
* means that once the current caller to apply() is done, no other calls are made to
323
* apply() before is_enabled is reset to its original state
276
* This is NOT THREAD SAFE! DEBUG/TEST code only!
327
* This is DEBUG code only!
329
usleep(500); /* Sleep for 500 microseconds -- usleep() takes microseconds, so this is NOT half a second */
278
330
log_offset= (off_t) 0;
299
bool TransactionLog::hasError() const
304
void TransactionLog::clearError()
307
error_message.clear();
310
const string &TransactionLog::getErrorMessage() const
312
return error_message;
315
size_t TransactionLog::getLogEntrySize(const message::Transaction &trx)
317
return trx.ByteSize() + HEADER_TRAILER_BYTES;
353
static CommandLog *command_log= NULL; /* The singleton command log */
355
static int init(drizzled::plugin::Registry ®istry)
357
if (sysvar_command_log_enabled)
359
command_log= new CommandLog("command_log",
360
sysvar_command_log_file,
361
sysvar_command_log_checksum_enabled);
362
registry.add(command_log);
367
static int deinit(drizzled::plugin::Registry ®istry)
371
registry.remove(command_log);
377
static void set_truncate_debug(Session *,
378
struct st_mysql_sys_var *,
383
* The const void * save comes directly from the check function,
384
* which should simply return the result from the set statement.
387
if (*(bool *)save != false)
388
command_log->truncate();
391
static DRIZZLE_SYSVAR_BOOL(enable,
392
sysvar_command_log_enabled,
394
N_("Enable command log"),
395
NULL, /* check func */
396
NULL, /* update func */
397
false /* default */);
399
static DRIZZLE_SYSVAR_BOOL(truncate_debug,
400
sysvar_command_log_truncate_debug,
402
N_("DEBUGGING - Truncate command log"),
403
NULL, /* check func */
404
set_truncate_debug, /* update func */
405
false /* default */);
407
static DRIZZLE_SYSVAR_STR(log_file,
408
sysvar_command_log_file,
410
N_("Path to the file to use for command log."),
411
NULL, /* check func */
412
NULL, /* update func*/
413
DEFAULT_LOG_FILE_PATH /* default */);
415
static DRIZZLE_SYSVAR_BOOL(enable_checksum,
416
sysvar_command_log_checksum_enabled,
418
N_("Enable CRC32 Checksumming"),
419
NULL, /* check func */
420
NULL, /* update func */
421
false /* default */);
423
static struct st_mysql_sys_var* system_variables[]= {
424
DRIZZLE_SYSVAR(enable),
425
DRIZZLE_SYSVAR(truncate_debug),
426
DRIZZLE_SYSVAR(log_file),
427
DRIZZLE_SYSVAR(enable_checksum),
431
drizzle_declare_plugin(command_log)
436
N_("Command Message Log"),
438
init, /* Plugin Init */
439
deinit, /* Plugin Deinit */
440
NULL, /* status variables */
441
system_variables, /* system variables */
442
NULL /* config options */
444
drizzle_declare_plugin_end;