29
* Defines the implementation of the transaction log file descriptor.
24
* Defines the implementation of the default command log.
26
* @see drizzled/plugin/replicator.h
27
* @see drizzled/plugin/applier.h
33
31
* Currently, the log file uses this implementation:
35
33
* We have an atomic off_t called log_offset which keeps track of the
36
* offset into the log file for writing the next Transaction.
38
* We write each Transaction message encapsulated in an 8-byte length/type header and a
39
* 4-byte checksum trailer.
41
* When writing a Transaction to the log, we calculate the length of the
42
* Transaction to be written. We then increment log_offset by the length
43
* of the Transaction plus 2 * sizeof(uint32_t) plus sizeof(uint32_t) and store
44
* this new offset in a local off_t called cur_offset (see TransactionLog::apply()).
45
* This fetch-and-add is done in a single atomic instruction.
34
* offset into the log file for writing the next Command.
36
* When writing a Command to the log, we calculate the length of the
37
* Command to be written. We then increment log_offset by the length
38
* of the Command plus sizeof(uint64_t) and store this new offset in a
39
* local off_t called cur_offset (see CommandLog::apply()). This
40
* compare and set is done in an atomic instruction.
47
42
* We then adjust the local off_t (cur_offset) back to the original
48
* offset by subtracting the length, 2 * sizeof(uint32_t) and sizeof(uint32_t).
50
* We then first write the 8-byte type/length header and then the serialized transaction
51
* and optional checksum to our log file at our local cur_offset.
53
* --------------------------------------------------------------------------------
54
* |<- 4 bytes ->|<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
55
* --------------------------------------------------------------------------------
56
* | Msg Type | Length | Serialized Transaction Message | Checksum |
57
* --------------------------------------------------------------------------------
43
* offset by subtracting the length and sizeof(uint64_t).
45
* We then first write a 64-bit length and then the serialized transaction/command
46
* to our log file at our local cur_offset.
63
52
* as a skeleton and a springboard.
67
#include "transaction_log.h"
55
#include "command_log.h"
57
#include <drizzled/session.h>
58
#include <drizzled/set_var.h>
59
#include <drizzled/gettext.h>
60
#include <drizzled/message/replication.pb.h>
77
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
78
#include <drizzled/errmsg_print.h>
79
#include <drizzled/gettext.h>
81
66
using namespace std;
82
67
using namespace drizzled;
84
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
70
* Command Log plugin system variable - Is the log enabled? Only used on init().
71
* The enable() and disable() methods of the CommandLog class control online
74
static bool sysvar_command_log_enabled= false;
75
/** Command Log plugin system variable - The path to the log file used */
76
static char* sysvar_command_log_file= NULL;
77
/** Command Log plugin system variable - A debugging variable to assist in truncating the log file. */
78
static bool sysvar_command_log_truncate_debug= false;
79
static const char DEFAULT_LOG_FILE_PATH[]= "command.log"; /* In datadir... */
86
TransactionLog::TransactionLog(const string in_log_file_path,
87
uint32_t in_sync_method) :
89
log_file_path(in_log_file_path),
92
sync_method(in_sync_method)
81
CommandLog::CommandLog(const char *in_log_file_path)
84
state(CommandLog::OFFLINE),
85
log_file_path(in_log_file_path)
87
is_enabled= true; /* If constructed, the plugin is enabled until taken offline with disable() */
94
90
/* Setup our log file and determine the next write offset... */
95
log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
91
log_file= open(log_file_path, O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
96
92
if (log_file == -1)
98
error_message.assign(_("Failed to open transaction log file "));
99
error_message.append(log_file_path);
100
error_message.append(" Got error: ");
101
error_message.append(strerror(errno));
102
error_message.push_back('\n');
94
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open command log file. Got error: %s"), strerror(errno));
107
/* For convenience, grab the log file name from the path */
108
if (log_file_path.find_first_of('/') != string::npos)
110
/* Strip to last / */
112
tmp= log_file_path.substr(log_file_path.find_last_of('/') + 1);
113
log_file_name.assign(tmp);
116
log_file_name.assign(log_file_path);
119
100
* The offset of the next write is the current position of the log
120
101
* file, since it's opened in append mode...
122
103
log_offset= lseek(log_file, 0, SEEK_END);
105
state= CommandLog::ONLINE;
127
TransactionLog::~TransactionLog()
109
CommandLog::~CommandLog()
129
111
/* Clear up any resources we've consumed */
112
if (isActive() && log_file != -1)
132
114
(void) close(log_file);
136
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
118
bool CommandLog::isActive()
120
return is_enabled && is_active;
123
void CommandLog::apply(message::Command *to_apply)
126
* There is an issue on Solaris/SunStudio where if the std::string buffer is
127
* NOT initialized with the below, the code produces an EFAULT when accessing
128
* c_str() later on. Stoopid, but true.
130
string buffer= string(""); /* Buffer we will write serialized command to */
136
to_apply->SerializeToString(&buffer);
138
length= buffer.length();
141
141
* Do an atomic increment on the offset of the log file position
143
off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
143
cur_offset= log_offset.fetch_and_add((off_t) (sizeof(uint64_t) + length));
146
146
* We adjust cur_offset back to the original log_offset before
147
147
* the increment above...
149
cur_offset-= static_cast<off_t>(data_length);
149
cur_offset-= (off_t) (sizeof(uint64_t) + length);
152
* Quick safety...if an error occurs below, the log file will
153
* not be active, therefore a caller could have been ready
154
* to write...but the log is crashed.
156
if (unlikely(state == CommandLog::CRASHED))
159
/* We always write in network byte order */
160
unsigned char nbo_length[8];
161
int8store(nbo_length, length);
165
written= pwrite(log_file, nbo_length, sizeof(uint64_t), cur_offset);
167
while (written == EINTR); /* Just retry the write when interrupted by a signal... */
169
if (unlikely(written != sizeof(uint64_t)))
171
errmsg_printf(ERRMSG_LVL_ERROR,
172
_("Failed to write full size of command. Tried to write %" PRId64 " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes. Error: %s\n"),
174
(int64_t) cur_offset,
179
* Reset the log's offset in case we want to produce a decent error message including
180
* the original offset where an error occurred.
182
log_offset= cur_offset;
187
cur_offset+= (off_t) written;
152
190
* Quick safety...if an error occurs above in another writer, the log
153
191
* file will be in a crashed state.
155
if (unlikely(state == CRASHED))
193
if (unlikely(state == CommandLog::CRASHED))
158
196
* Reset the log's offset in case we want to produce a decent error message including
159
197
* the original offset where an error occurred.
161
199
log_offset= cur_offset;
165
/* Write the full buffer in one swoop */
168
written= pwrite(log_file, data, data_length, cur_offset);
205
written= pwrite(log_file, buffer.c_str(), length, cur_offset);
170
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
207
while (written == EINTR); /* Just retry the write when interrupted by a signal... */
172
if (unlikely(written != static_cast<ssize_t>(data_length)))
209
if (unlikely(written != (ssize_t) length))
174
211
errmsg_printf(ERRMSG_LVL_ERROR,
175
_("Failed to write full size of log entry. Tried to write %" PRId64
176
" bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes. Error: %s\n"),
177
static_cast<int64_t>(data_length),
178
static_cast<int64_t>(cur_offset),
179
static_cast<int64_t>(written),
212
_("Failed to write full serialized command. Tried to write %" PRId64 " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes. Error: %s\n"),
214
(int64_t) cur_offset,
180
216
strerror(errno));
184
220
* the original offset where an error occurred.
186
222
log_offset= cur_offset;
189
int error_code= syncLogFile();
191
if (unlikely(error_code != 0))
193
errmsg_printf(ERRMSG_LVL_ERROR,
194
_("Failed to sync log file. Got error: %s\n"),
200
int TransactionLog::syncLogFile()
204
case SYNC_METHOD_EVERY_WRITE:
205
return internal::my_sync(log_file, 0);
206
case SYNC_METHOD_EVERY_SECOND:
208
time_t now_time= time(NULL);
209
if (last_sync_time <= (now_time - 1))
211
last_sync_time= now_time;
212
return internal::my_sync(log_file, 0);
222
const string &TransactionLog::getLogFilename()
224
return log_file_name;
227
const string &TransactionLog::getLogFilepath()
229
return log_file_path;
232
void TransactionLog::truncate()
227
void CommandLog::truncate()
229
bool orig_is_enabled= is_enabled;
233
* Wait a short amount of time before truncating. This just prevents error messages
234
* from being produced during a call to apply(). Setting is_enabled to false above
235
* means that once the current caller to apply() is done, no other calls are made to
236
* apply() before is_enabled is reset to its original state.
237
* This is NOT THREAD SAFE! DEBUG/TEST code only!
240
* This is DEBUG code only!
242
usleep(500); /* Sleep for 500 microseconds (half a millisecond) */
239
243
log_offset= (off_t) 0;
243
247
result= ftruncate(log_file, log_offset);
245
249
while (result == -1 && errno == EINTR);
248
bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
249
string &out_filename) const
251
is_enabled= orig_is_enabled;
254
static CommandLog *command_log= NULL; /* The singleton command log */
256
static int init(drizzled::plugin::Registry ®istry)
258
if (sysvar_command_log_enabled)
260
command_log= new CommandLog(sysvar_command_log_file);
261
registry.add(command_log);
266
static int deinit(drizzled::plugin::Registry ®istry)
270
registry.remove(command_log);
276
static void set_truncate_debug(Session *, struct st_mysql_sys_var *, void *, const void *save)
252
* Currently, we simply return the single logfile name
253
* Eventually, we'll have an index/hash with upper and
254
* lower bounds to look up a log file with a transaction id
279
* The const void * save comes directly from the check function,
280
* which should simply return the result from the set statement.
256
out_filename.assign(log_file_path);
260
bool TransactionLog::hasError() const
265
void TransactionLog::clearError()
268
error_message.clear();
271
const string &TransactionLog::getErrorMessage() const
273
return error_message;
283
if (*(bool *)save != false)
284
command_log->truncate();
287
static DRIZZLE_SYSVAR_BOOL(enable,
288
sysvar_command_log_enabled,
290
N_("Enable command log"),
291
NULL, /* check func */
292
NULL, /* update func */
293
false /* default */);
295
static DRIZZLE_SYSVAR_BOOL(truncate_debug,
296
sysvar_command_log_truncate_debug,
298
N_("DEBUGGING - Truncate command log"),
299
NULL, /* check func */
300
set_truncate_debug, /* update func */
301
false /* default */);
303
static DRIZZLE_SYSVAR_STR(log_file,
304
sysvar_command_log_file,
306
N_("Path to the file to use for command log."),
307
NULL, /* check func */
308
NULL, /* update func*/
309
DEFAULT_LOG_FILE_PATH /* default */);
311
static struct st_mysql_sys_var* system_variables[]= {
312
DRIZZLE_SYSVAR(enable),
313
DRIZZLE_SYSVAR(truncate_debug),
314
DRIZZLE_SYSVAR(log_file),
318
drizzle_declare_plugin(command_log)
323
N_("Simple Command Message Log"),
325
init, /* Plugin Init */
326
deinit, /* Plugin Deinit */
327
NULL, /* status variables */
328
system_variables, /* system variables */
329
NULL /* config options */
331
drizzle_declare_plugin_end;