29
* Defines the implementation of the transaction log file descriptor.
24
* Defines the implementation of the default command log.
26
* @see drizzled/plugin/command_replicator.h
27
* @see drizzled/plugin/command_applier.h
33
31
* Currently, the log file uses this implementation:
35
33
* We have an atomic off_t called log_offset which keeps track of the
36
* offset into the log file for writing the next Transaction.
34
* offset into the log file for writing the next Command.
38
* We write each Transaction message encapsulated in an 8-byte length/type header and a
36
* We write each Command message encapsulated in an 8-byte length header and a
39
37
* 4-byte checksum trailer.
41
* When writing a Transaction to the log, we calculate the length of the
42
* Transaction to be written. We then increment log_offset by the length
43
* of the Transaction plus 2 * sizeof(uint32_t) plus sizeof(uint32_t) and store
44
* this new offset in a local off_t called cur_offset (see TransactionLog::apply()).
39
* When writing a Command to the log, we calculate the length of the
40
* Command to be written. We then increment log_offset by the length
41
* of the Command plus sizeof(uint64_t) plus sizeof(uint32_t) and store
42
* this new offset in a local off_t called cur_offset (see CommandLog::apply()).
45
43
* This compare and set is done in an atomic instruction.
47
45
* We then adjust the local off_t (cur_offset) back to the original
48
* offset by subtracting the length and 2 * sizeof(uint32_t) and sizeof(uint32_t).
46
* offset by subtracting the length and sizeof(uint64_t) and sizeof(uint32_t).
50
* We then first write a 64-bit length and then the serialized transaction
48
* We then first write a 64-bit length and then the serialized transaction/command
51
49
* and optional checksum to our log file at our local cur_offset.
53
* --------------------------------------------------------------------------------
54
* |<- 4 bytes ->|<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
55
* --------------------------------------------------------------------------------
56
* | Msg Type | Length | Serialized Transaction Message | Checksum |
57
* --------------------------------------------------------------------------------
51
* --------------------------------------------------------------
52
* |<- 8 bytes ->|<- # Bytes of Command Message ->|<- 4 bytes ->|
53
* --------------------------------------------------------------
54
* | Length | Serialized Command Message | Checksum |
55
* --------------------------------------------------------------
61
59
* Possibly look at a scoreboard approach with multiple file segments. For
62
60
* right now, though, this is just a quick simple implementation to serve
63
61
* as a skeleton and a springboard.
63
* Also, we can move to a ZeroCopyStream implementation instead of using the
64
* string as a buffer in apply()
67
#include "transaction_log.h"
67
#include <drizzled/server_includes.h>
68
#include "command_log.h"
71
70
#include <unistd.h>
77
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
78
#include <drizzled/errmsg_print.h>
76
#include <drizzled/session.h>
77
#include <drizzled/set_var.h>
79
78
#include <drizzled/gettext.h>
79
#include <drizzled/message/replication.pb.h>
80
#include <drizzled/crc32.h>
81
82
using namespace std;
82
83
using namespace drizzled;
84
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
86
* Command Log plugin system variable - Is the log enabled? Only used on init().
87
* The enable() and disable() methods of the CommandLog class control online
90
static bool sysvar_command_log_enabled= false;
91
/** Command Log plugin system variable - The path to the log file used */
92
static char* sysvar_command_log_file= NULL;
94
* Command Log plugin system variable - A debugging variable to assist
95
* in truncating the log file.
97
static bool sysvar_command_log_truncate_debug= false;
98
static const char DEFAULT_LOG_FILE_PATH[]= "command.log"; /* In datadir... */
100
* Command Log plugin system variable - Should we write a CRC32 checksum for
101
* each written Command message?
103
static bool sysvar_command_log_checksum_enabled= false;
86
TransactionLog::TransactionLog(const string in_log_file_path,
87
uint32_t in_sync_method) :
105
CommandLog::CommandLog(string name_arg,
106
const char *in_log_file_path, bool in_do_checksum)
108
plugin::CommandApplier(name_arg),
89
log_file_path(in_log_file_path),
92
sync_method(in_sync_method)
110
log_file_path(in_log_file_path)
112
do_checksum= in_do_checksum; /* Have to do here, not in initialization list b/c atomic<> */
94
114
/* Setup our log file and determine the next write offset... */
95
log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
115
log_file= open(log_file_path, O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
96
116
if (log_file == -1)
98
error_message.assign(_("Failed to open transaction log file "));
99
error_message.append(log_file_path);
100
error_message.append(" Got error: ");
101
error_message.append(strerror(errno));
102
error_message.push_back('\n');
118
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open command log file %s. Got error: %s\n"),
107
/* For convenience, grab the log file name from the path */
108
if (log_file_path.find_first_of('/') != string::npos)
110
/* Strip to last / */
112
tmp= log_file_path.substr(log_file_path.find_last_of('/') + 1);
113
log_file_name.assign(tmp);
116
log_file_name.assign(log_file_path);
119
126
* The offset of the next write is the current position of the log
120
127
* file, since it's opened in append mode...
127
TransactionLog::~TransactionLog()
134
CommandLog::~CommandLog()
129
136
/* Clear up any resources we've consumed */
137
if (isEnabled() && log_file != -1)
132
139
(void) close(log_file);
136
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
143
void CommandLog::apply(const message::Command &to_apply)
146
* There is an issue on Solaris/SunStudio where if the std::string buffer is
147
* NOT initialized with the below, the code produces an EFAULT when accessing
148
* c_str() later on. Stoopid, but true.
150
string buffer(""); /* Buffer we will write serialized command to */
152
static const uint32_t HEADER_TRAILER_BYTES= sizeof(uint64_t) + /* 8-byte length header */
153
sizeof(uint32_t); /* 4 byte checksum trailer */
159
to_apply.SerializeToString(&buffer);
161
length= buffer.length();
141
164
* Do an atomic increment on the offset of the log file position
143
off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
166
cur_offset= log_offset.fetch_and_add(static_cast<off_t>((HEADER_TRAILER_BYTES + length)));
146
169
* We adjust cur_offset back to the original log_offset before
147
170
* the increment above...
149
cur_offset-= static_cast<off_t>(data_length);
152
* Quick safety...if an error occurs above in another writer, the log
153
* file will be in a crashed state.
155
if (unlikely(state == CRASHED))
158
* Reset the log's offset in case we want to produce a decent error message including
159
* the original offset where an error occurred.
161
log_offset= cur_offset;
165
/* Write the full buffer in one swoop */
168
written= pwrite(log_file, data, data_length, cur_offset);
170
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
172
if (unlikely(written != static_cast<ssize_t>(data_length)))
174
errmsg_printf(ERRMSG_LVL_ERROR,
175
_("Failed to write full size of log entry. Tried to write %" PRId64
176
" bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes. Error: %s\n"),
177
static_cast<int64_t>(data_length),
178
static_cast<int64_t>(cur_offset),
179
static_cast<int64_t>(written),
183
* Reset the log's offset in case we want to produce a decent error message including
184
* the original offset where an error occurred.
186
log_offset= cur_offset;
189
int error_code= syncLogFile();
191
if (unlikely(error_code != 0))
193
errmsg_printf(ERRMSG_LVL_ERROR,
194
_("Failed to sync log file. Got error: %s\n"),
200
int TransactionLog::syncLogFile()
204
case SYNC_METHOD_EVERY_WRITE:
205
return internal::my_sync(log_file, 0);
206
case SYNC_METHOD_EVERY_SECOND:
208
time_t now_time= time(NULL);
209
if (last_sync_time <= (now_time - 1))
211
last_sync_time= now_time;
212
return internal::my_sync(log_file, 0);
222
const string &TransactionLog::getLogFilename()
224
return log_file_name;
227
const string &TransactionLog::getLogFilepath()
229
return log_file_path;
232
void TransactionLog::truncate()
172
cur_offset-= static_cast<off_t>((HEADER_TRAILER_BYTES + length));
175
* Quick safety...if an error occurs below, the log file will
176
* not be active, therefore a caller could have been ready
177
* to write...but the log is crashed.
179
if (unlikely(state == CRASHED))
182
/* We always write in network byte order */
183
unsigned char nbo_length[8];
184
int8store(nbo_length, (uint64_t) length);
186
/* Write the length header */
189
written= pwrite(log_file, nbo_length, sizeof(uint64_t), cur_offset);
191
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
193
if (unlikely(written != sizeof(uint64_t)))
195
errmsg_printf(ERRMSG_LVL_ERROR,
196
_("Failed to write full size of command. Tried to write %" PRId64
197
" bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes. Error: %s\n"),
198
static_cast<int64_t>(sizeof(uint64_t)),
199
static_cast<int64_t>(cur_offset),
200
static_cast<int64_t>(written),
204
* Reset the log's offset in case we want to produce a decent error message including
205
* the original offset where an error occurred.
207
log_offset= cur_offset;
212
cur_offset+= static_cast<off_t>(written);
215
* Quick safety...if an error occurs above in another writer, the log
216
* file will be in a crashed state.
218
if (unlikely(state == CRASHED))
221
* Reset the log's offset in case we want to produce a decent error message including
222
* the original offset where an error occurred.
224
log_offset= cur_offset;
228
/* Write the command message itself */
231
written= pwrite(log_file, buffer.c_str(), length, cur_offset);
233
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
235
if (unlikely(written != static_cast<ssize_t>(length)))
237
errmsg_printf(ERRMSG_LVL_ERROR,
238
_("Failed to write full serialized command. Tried to write %" PRId64
239
" bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes. Error: %s\n"),
240
static_cast<int64_t>(length),
241
static_cast<int64_t>(cur_offset),
242
static_cast<int64_t>(written),
246
* Reset the log's offset in case we want to produce a decent error message including
247
* the original offset where an error occurred.
249
log_offset= cur_offset;
253
cur_offset+= static_cast<off_t>(written);
256
* Quick safety...if an error occurs above in another writer, the log
257
* file will be in a crashed state.
259
if (unlikely(state == CRASHED))
262
* Reset the log's offset in case we want to produce a decent error message including
263
* the original offset where an error occurred.
265
log_offset= cur_offset;
269
uint32_t checksum= 0;
273
checksum= hash_crc32(buffer.c_str(), length);
276
/* We always write in network byte order */
277
unsigned char nbo_checksum[4];
278
int4store(nbo_checksum, checksum);
280
/* Write the checksum trailer */
283
written= pwrite(log_file, nbo_checksum, sizeof(uint32_t), cur_offset);
285
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
287
if (unlikely(written != static_cast<ssize_t>(sizeof(uint32_t))))
289
errmsg_printf(ERRMSG_LVL_ERROR,
290
_("Failed to write full checksum of command. Tried to write %" PRId64
291
" bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes. Error: %s\n"),
292
static_cast<int64_t>(sizeof(uint32_t)),
293
static_cast<int64_t>(cur_offset),
294
static_cast<int64_t>(written),
298
* Reset the log's offset in case we want to produce a decent error message including
299
* the original offset where an error occurred.
301
log_offset= cur_offset;
307
void CommandLog::truncate()
309
bool orig_is_enabled= isEnabled();
313
* Wait a short amount of time before truncating. This just prevents error messages
314
* from being produced during a call to apply(). Calling disable() above
315
* means that once the current caller to apply() is done, no other calls are made to
316
* apply() before enable is reset to its original state
237
* This is NOT THREAD SAFE! DEBUG/TEST code only!
320
* This is DEBUG code only!
322
usleep(500); /* Sleep for 500 microseconds (usleep() takes microseconds, so this is half a millisecond, not half a second) */
239
323
log_offset= (off_t) 0;
260
bool TransactionLog::hasError() const
265
void TransactionLog::clearError()
268
error_message.clear();
271
const string &TransactionLog::getErrorMessage() const
273
return error_message;
347
static CommandLog *command_log= NULL; /* The singleton command log */
349
static int init(drizzled::plugin::Registry &registry)
351
if (sysvar_command_log_enabled)
353
command_log= new CommandLog("command_log",
354
sysvar_command_log_file,
355
sysvar_command_log_checksum_enabled);
356
registry.add(command_log);
361
static int deinit(drizzled::plugin::Registry &registry)
365
registry.remove(command_log);
371
static void set_truncate_debug(Session *,
372
struct st_mysql_sys_var *,
377
* The const void * save comes directly from the check function,
378
* which should simply return the result from the set statement.
381
if (*(bool *)save != false)
382
command_log->truncate();
385
static DRIZZLE_SYSVAR_BOOL(enable,
386
sysvar_command_log_enabled,
388
N_("Enable command log"),
389
NULL, /* check func */
390
NULL, /* update func */
391
false /* default */);
393
static DRIZZLE_SYSVAR_BOOL(truncate_debug,
394
sysvar_command_log_truncate_debug,
396
N_("DEBUGGING - Truncate command log"),
397
NULL, /* check func */
398
set_truncate_debug, /* update func */
399
false /* default */);
401
static DRIZZLE_SYSVAR_STR(log_file,
402
sysvar_command_log_file,
404
N_("Path to the file to use for command log."),
405
NULL, /* check func */
406
NULL, /* update func*/
407
DEFAULT_LOG_FILE_PATH /* default */);
409
static DRIZZLE_SYSVAR_BOOL(enable_checksum,
410
sysvar_command_log_checksum_enabled,
412
N_("Enable CRC32 Checksumming"),
413
NULL, /* check func */
414
NULL, /* update func */
415
false /* default */);
417
static struct st_mysql_sys_var* system_variables[]= {
418
DRIZZLE_SYSVAR(enable),
419
DRIZZLE_SYSVAR(truncate_debug),
420
DRIZZLE_SYSVAR(log_file),
421
DRIZZLE_SYSVAR(enable_checksum),
425
drizzle_declare_plugin(command_log)
430
N_("Command Message Log"),
432
init, /* Plugin Init */
433
deinit, /* Plugin Deinit */
434
NULL, /* status variables */
435
system_variables, /* system variables */
436
NULL /* config options */
438
drizzle_declare_plugin_end;