1
/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008-2009 Sun Microsystems
6
* This program is free software; you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation; either version 2 of the License, or
9
* (at your option) any later version.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with this program; if not, write to the Free Software
18
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
* Defines the implementation of the default command log.
26
* @see drizzled/plugin/command_replicator.h
27
* @see drizzled/plugin/command_applier.h
31
* Currently, the log file uses this implementation:
33
* We have an atomic off_t called log_offset which keeps track of the
34
* offset into the log file for writing the next Command.
36
* We write each Command message encapsulated in an 8-byte length header and a
37
* 4-byte checksum trailer.
39
* When writing a Command to the log, we calculate the length of the
40
* Command to be written. We then increment log_offset by the length
41
* of the Command plus sizeof(uint64_t) plus sizeof(uint32_t) and store
42
* this new offset in a local off_t called cur_offset (see CommandLog::apply()).
43
* This fetch-and-add is done as a single atomic operation.
45
* We then adjust the local off_t (cur_offset) back to the original
46
* offset by subtracting the length and sizeof(uint64_t) and sizeof(uint32_t).
48
* We then first write a 64-bit length and then the serialized transaction/command
49
* and optional checksum to our log file at our local cur_offset.
51
* --------------------------------------------------------------
52
* |<- 8 bytes ->|<- # Bytes of Command Message ->|<- 4 bytes ->|
53
* --------------------------------------------------------------
54
* | Length | Serialized Command Message | Checksum |
55
* --------------------------------------------------------------
59
* Possibly look at a scoreboard approach with multiple file segments. For
60
* right now, though, this is just a quick simple implementation to serve
61
* as a skeleton and a springboard.
63
* Also, we can move to a ZeroCopyStream implementation instead of using the
64
* string as a buffer in apply()
67
#include "command_log.h"
75
#include <drizzled/session.h>
76
#include <drizzled/set_var.h>
77
#include <drizzled/gettext.h>
78
#include <drizzled/message/replication.pb.h>
79
#include <drizzled/crc32.h>
82
using namespace drizzled;
85
* Command Log plugin system variable - Is the log enabled? Only used on init().
86
* The enable() and disable() methods of the CommandLog class control online
* enabling and disabling of the log.
89
/* Read by init(): the CommandLog singleton is only created when this is true. */
static bool sysvar_command_log_enabled= false;
90
/** Command Log plugin system variable - The path to the log file used */
91
/* Passed by init() to the CommandLog constructor as the log file path. */
static char* sysvar_command_log_file= NULL;
93
* Command Log plugin system variable - A debugging variable to assist
94
* in truncating the log file.
96
/* Backs the truncate_debug system variable, whose update function is set_truncate_debug(). */
static bool sysvar_command_log_truncate_debug= false;
97
/* Default value of the log_file system variable. */
static const char DEFAULT_LOG_FILE_PATH[]= "command.log"; /* In datadir... */
99
* Command Log plugin system variable - Should we write a CRC32 checksum for
100
* each written Command message?
102
/* Passed by init() to the CommandLog constructor as in_do_checksum. */
static bool sysvar_command_log_checksum_enabled= false;
104
/*
 * Constructs the command log: stores the log file path and the checksum
 * setting, marks the log enabled, opens the log file and initialises
 * log_offset to the current end of that file.
 *
 * in_log_file_path  Path handed to open().
 * in_do_checksum    Copied into do_checksum.
 *
 * NOTE(review): this listing looks incomplete. The initializer-list colon,
 * the braces, the condition guarding the "Failed to open" message and the
 * remaining errmsg_printf() arguments are not visible here. Confirm against
 * the original file before changing the logic.
 */
CommandLog::CommandLog(const char *in_log_file_path, bool in_do_checksum)
106
plugin::CommandApplier(),
108
log_file_path(in_log_file_path)
110
is_enabled= true; /* If constructed, the plugin is enabled until taken offline with disable() */
112
do_checksum= in_do_checksum; /* Have to do here, not in initialization list b/c atomic<> */
114
/* Setup our log file and determine the next write offset... */
115
/*
 * NOTE(review): S_IRWXU also grants the owner execute permission, which a
 * log file does not need; S_IRUSR|S_IWUSR would be the narrower mode.
 *
 * NOTE(review): apply() positions its writes with pwrite(). Linux documents
 * that pwrite() on a descriptor opened with O_APPEND ignores the offset and
 * appends instead, so confirm the offset reservation scheme behaves as
 * intended on the target platforms.
 */
log_file= open(log_file_path, O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
118
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open command log file %s. Got error: %s\n"),
126
* The offset of the next write is the current position of the log
127
* file, since it's opened in append mode...
129
/* NOTE(review): no check of the lseek() result for -1 is visible in this listing. */
log_offset= lseek(log_file, 0, SEEK_END);
135
/**
 * Releases the log file descriptor.
 *
 * The descriptor is closed whenever it is valid, not only while the log
 * is active. Gating the close on isActive() leaked the descriptor whenever
 * the log had been disabled or had gone inactive before destruction.
 */
CommandLog::~CommandLog()
{
  /* Clear up any resources we've consumed */
  if (log_file != -1)
  {
    (void) close(log_file);
  }
}
144
bool CommandLog::isActive()
146
return is_enabled && is_active;
149
/*
 * Serializes a Command message and writes it to the log file as three
 * consecutive pwrite() calls: an 8-byte length header, the serialized
 * message, and a 4-byte checksum trailer. The file region is reserved up
 * front by advancing log_offset by the total record size.
 *
 * to_apply  The Command message to serialize and log.
 *
 * NOTE(review): this listing looks incomplete. The declarations of length,
 * cur_offset and written, the "do" openers matching the three "while" retry
 * lines, the braces, and the statements that follow each error message are
 * not visible here. Confirm against the original file before editing.
 */
void CommandLog::apply(const message::Command &to_apply)
152
* There is an issue on Solaris/SunStudio where if the std::string buffer is
153
* NOT initialized with the below, the code produces an EFAULT when accessing
154
* c_str() later on. Stoopid, but true.
156
string buffer(""); /* Buffer we will write serialized command to */
158
/* Fixed per-record overhead added to the message length when reserving space. */
static const uint32_t HEADER_TRAILER_BYTES= sizeof(uint64_t) + /* 8-byte length header */
159
sizeof(uint32_t); /* 4 byte checksum trailer */
165
to_apply.SerializeToString(&buffer);
167
length= buffer.length();
170
* Do an atomic increment on the offset of the log file position
172
/*
 * NOTE(review): the subtraction a few lines below is only correct if
 * fetch_and_add() returns the value AFTER the addition. If it returns the
 * previous value, cur_offset ends up one record too low. Confirm against
 * the atomic<> implementation in use.
 */
cur_offset= log_offset.fetch_and_add(static_cast<off_t>((HEADER_TRAILER_BYTES + length)));
175
* We adjust cur_offset back to the original log_offset before
176
* the increment above...
178
cur_offset-= static_cast<off_t>((HEADER_TRAILER_BYTES + length));
181
* Quick safety...if an error occurs below, the log file will
182
* not be active, therefore a caller could have been ready
183
* to write...but the log is crashed.
185
if (unlikely(state == CRASHED))
188
/*
 * NOTE(review): int8store()/int4store() are defined outside this file. In
 * MySQL-derived code they conventionally store the low byte first, which
 * would not be network byte order. Verify the claim below.
 */
/* We always write in network byte order */
189
unsigned char nbo_length[8];
190
int8store(nbo_length, length);
192
/* Write the length header */
195
written= pwrite(log_file, nbo_length, sizeof(uint64_t), cur_offset);
197
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
199
/*
 * NOTE(review): this compares written with an unsigned sizeof() value, while
 * the two later checks cast the expected size to ssize_t first. The result
 * is the same for -1, but the checks are inconsistent.
 */
if (unlikely(written != sizeof(uint64_t)))
201
errmsg_printf(ERRMSG_LVL_ERROR,
202
_("Failed to write full size of command. Tried to write %" PRId64
203
" bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes. Error: %s\n"),
204
static_cast<int64_t>(sizeof(uint64_t)),
205
static_cast<int64_t>(cur_offset),
206
static_cast<int64_t>(written),
210
* Reset the log's offset in case we want to produce a decent error message including
211
* the original offset where an error occurred.
213
/*
 * NOTE(review): this is a plain store to log_offset. If apply() can run
 * concurrently, it would overwrite reservations other writers made through
 * fetch_and_add(). Confirm this is acceptable on the failure path.
 */
log_offset= cur_offset;
218
/* Advance past the length header that was just written. */
cur_offset+= static_cast<off_t>(written);
221
* Quick safety...if an error occurs above in another writer, the log
222
* file will be in a crashed state.
224
if (unlikely(state == CRASHED))
227
* Reset the log's offset in case we want to produce a decent error message including
228
* the original offset where an error occurred.
230
log_offset= cur_offset;
234
/* Write the command message itself */
237
written= pwrite(log_file, buffer.c_str(), length, cur_offset);
239
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
241
if (unlikely(written != static_cast<ssize_t>(length)))
243
errmsg_printf(ERRMSG_LVL_ERROR,
244
_("Failed to write full serialized command. Tried to write %" PRId64
245
" bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes. Error: %s\n"),
246
static_cast<int64_t>(length),
247
static_cast<int64_t>(cur_offset),
248
static_cast<int64_t>(written),
252
* Reset the log's offset in case we want to produce a decent error message including
253
* the original offset where an error occurred.
255
log_offset= cur_offset;
259
/* Advance past the serialized message that was just written. */
cur_offset+= static_cast<off_t>(written);
262
* Quick safety...if an error occurs above in another writer, the log
263
* file will be in a crashed state.
265
if (unlikely(state == CRASHED))
268
* Reset the log's offset in case we want to produce a decent error message including
269
* the original offset where an error occurred.
271
log_offset= cur_offset;
275
/*
 * The trailer is written in every case; the value starts at zero.
 * NOTE(review): the condition guarding the hash_crc32() call below is not
 * visible in this listing - presumably it tests do_checksum; confirm.
 */
uint32_t checksum= 0;
279
checksum= hash_crc32(buffer.c_str(), length);
282
/* We always write in network byte order */
283
unsigned char nbo_checksum[4];
284
int4store(nbo_checksum, checksum);
286
/* Write the checksum trailer */
289
written= pwrite(log_file, nbo_checksum, sizeof(uint32_t), cur_offset);
291
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
293
if (unlikely(written != static_cast<ssize_t>(sizeof(uint32_t))))
295
errmsg_printf(ERRMSG_LVL_ERROR,
296
_("Failed to write full checksum of command. Tried to write %" PRId64
297
" bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes. Error: %s\n"),
298
static_cast<int64_t>(sizeof(uint32_t)),
299
static_cast<int64_t>(cur_offset),
300
static_cast<int64_t>(written),
304
* Reset the log's offset in case we want to produce a decent error message including
305
* the original offset where an error occurred.
307
log_offset= cur_offset;
313
void CommandLog::truncate()
315
bool orig_is_enabled= is_enabled;
319
* Wait a short amount of time before truncating. This just prevents error messages
320
* from being produced during a call to apply(). Setting is_enabled to false above
321
* means that once the current caller to apply() is done, no other calls are made to
322
* apply() before is_enabled is reset to its original state
326
* This is DEBUG code only!
328
usleep(500); /* Sleep for half a second */
329
log_offset= (off_t) 0;
333
result= ftruncate(log_file, log_offset);
335
while (result == -1 && errno == EINTR);
337
is_enabled= orig_is_enabled;
340
/*
 * Looks up the name of the log file holding a given global transaction id.
 *
 * The transaction id parameter is unnamed and unused: out_filename is always
 * assigned log_file_path, the single log file this object writes to.
 *
 * out_filename  Receives the log file path.
 *
 * NOTE(review): the return statement and the braces are not visible in this
 * listing. Presumably the function reports success; confirm against the
 * original file.
 */
bool CommandLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
341
string &out_filename) const
344
* Currently, we simply return the single logfile name
345
* Eventually, we'll have an index/hash with upper and
346
* lower bounds to look up a log file with a transaction id
348
out_filename.assign(log_file_path);
352
/*
 * The singleton command log. It stays NULL unless init() finds the plugin
 * enabled and allocates it, so users of this pointer must allow for NULL.
 */
static CommandLog *command_log= NULL; /* The singleton command log */
354
static int init(drizzled::plugin::Registry ®istry)
356
if (sysvar_command_log_enabled)
358
command_log= new CommandLog(sysvar_command_log_file,
359
sysvar_command_log_checksum_enabled);
360
registry.add(command_log);
365
static int deinit(drizzled::plugin::Registry ®istry)
369
registry.remove(command_log);
375
static void set_truncate_debug(Session *,
376
struct st_mysql_sys_var *,
381
* The const void * save comes directly from the check function,
382
* which should simply return the result from the set statement.
385
if (*(bool *)save != false)
386
command_log->truncate();
389
/*
 * System variable declarations for the plugin.
 *
 * NOTE(review): in each declaration the original line numbering skips one
 * line between the backing variable and the description, so an argument
 * (presumably the variable flags) appears to be missing from this listing.
 * Confirm against the original file.
 */

/* "enable": boolean backed by sysvar_command_log_enabled; defaults to false. */
static DRIZZLE_SYSVAR_BOOL(enable,
390
sysvar_command_log_enabled,
392
N_("Enable command log"),
393
NULL, /* check func */
394
NULL, /* update func */
395
false /* default */);
397
/* "truncate_debug": boolean whose update function, set_truncate_debug(), truncates the log. */
static DRIZZLE_SYSVAR_BOOL(truncate_debug,
398
sysvar_command_log_truncate_debug,
400
N_("DEBUGGING - Truncate command log"),
401
NULL, /* check func */
402
set_truncate_debug, /* update func */
403
false /* default */);
405
/* "log_file": string backed by sysvar_command_log_file; defaults to DEFAULT_LOG_FILE_PATH. */
static DRIZZLE_SYSVAR_STR(log_file,
406
sysvar_command_log_file,
408
N_("Path to the file to use for command log."),
409
NULL, /* check func */
410
NULL, /* update func*/
411
DEFAULT_LOG_FILE_PATH /* default */);
413
/* "enable_checksum": boolean backed by sysvar_command_log_checksum_enabled; defaults to false. */
static DRIZZLE_SYSVAR_BOOL(enable_checksum,
414
sysvar_command_log_checksum_enabled,
416
N_("Enable CRC32 Checksumming"),
417
NULL, /* check func */
418
NULL, /* update func */
419
false /* default */);
421
/*
 * Table of the plugin's system variables, referenced from the plugin
 * declaration.
 *
 * NOTE(review): the terminating entry and the closing brace are not visible
 * in this listing.
 */
static struct st_mysql_sys_var* system_variables[]= {
422
DRIZZLE_SYSVAR(enable),
423
DRIZZLE_SYSVAR(truncate_debug),
424
DRIZZLE_SYSVAR(log_file),
425
DRIZZLE_SYSVAR(enable_checksum),
429
/*
 * Plugin declaration: wires init() and deinit() in as the plugin's
 * init/deinit hooks and publishes the system_variables table.
 *
 * NOTE(review): the opening brace and the leading descriptor fields that
 * precede the description string are not visible in this listing.
 */
drizzle_declare_plugin(command_log)
434
N_("Command Message Log"),
436
init, /* Plugin Init */
437
deinit, /* Plugin Deinit */
438
NULL, /* status variables */
439
system_variables, /* system variables */
440
NULL /* config options */
442
drizzle_declare_plugin_end;