1
/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008-2009 Sun Microsystems
5
* Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
9
* Jay Pipes <jaypipes@gmail.com>
11
* This program is free software; you can redistribute it and/or modify
12
* it under the terms of the GNU General Public License as published by
13
* the Free Software Foundation; either version 2 of the License, or
14
* (at your option) any later version.
16
* This program is distributed in the hope that it will be useful,
17
* but WITHOUT ANY WARRANTY; without even the implied warranty of
18
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19
* GNU General Public License for more details.
21
* You should have received a copy of the GNU General Public License
22
* along with this program; if not, write to the Free Software
23
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
29
* Defines the implementation of the transaction log file descriptor.
33
* Currently, the transaction log file uses a simple, single-file, append-only format.
36
* We have an atomic off_t called log_offset which keeps track of the
37
* offset into the log file for writing the next log entry. The log
38
* entries are written, one after the other, in the following way:
41
* --------------------------------------
42
* |<- 4 bytes ->|<- # Bytes of Entry ->|
43
* --------------------------------------
44
* | Entry Type | Serialized Entry |
45
* --------------------------------------
48
* The Entry Type is an integer defined as an enumeration in the
49
* /drizzled/message/transaction.proto file called TransactionLogEntry::Type.
51
* Each transaction log entry type is written to the log differently. Here,
52
* we cover the format of each log entry type.
54
* Committed and Prepared Transaction Log Entries
55
* -----------------------------------------------
58
* ------------------------------------------------------------------
59
* |<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
60
* ------------------------------------------------------------------
61
* | Length | Serialized Transaction Message | Checksum |
62
* ------------------------------------------------------------------
67
* Possibly look at a scoreboard approach with multiple file segments. For
68
* right now, though, this is just a quick simple implementation to serve
69
* as a skeleton and a springboard.
73
#include "transaction_log.h"
83
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
84
#include <drizzled/errmsg_print.h>
85
#include <drizzled/gettext.h>
86
#include <drizzled/message/transaction.pb.h>
87
#include <drizzled/transaction_services.h>
88
#include <drizzled/algorithm/crc32.h>
90
#include <google/protobuf/io/coded_stream.h>
93
using namespace drizzled;
94
using namespace google;
96
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
98
TransactionLog::TransactionLog(const string in_log_file_path,
99
uint32_t in_flush_frequency,
100
bool in_do_checksum) :
102
log_file_path(in_log_file_path),
105
flush_frequency(in_flush_frequency),
106
do_checksum(in_do_checksum)
108
/* Setup our log file and determine the next write offset... */
109
log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
112
char errmsg[STRERROR_MAX];
113
strerror_r(errno, errmsg, sizeof(errmsg));
114
error_message.assign(_("Failed to open transaction log file "));
115
error_message.append(log_file_path);
116
error_message.append(" Got error: ");
117
error_message.append(errmsg);
118
error_message.push_back('\n');
123
/* For convenience, grab the log file name from the path */
124
if (log_file_path.find_first_of('/') != string::npos)
126
/* Strip to last / */
128
tmp= log_file_path.substr(log_file_path.find_last_of('/') + 1);
129
log_file_name.assign(tmp);
132
log_file_name.assign(log_file_path);
135
* The offset of the next write is the current position of the log
136
* file, since it's opened in append mode...
138
log_offset= lseek(log_file, 0, SEEK_END);
143
uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
145
uint32_t *checksum_out)
147
uint8_t *orig_buffer= buffer;
148
size_t message_byte_length= trx.ByteSize();
151
* Write the header information, which is the message type and
152
* the length of the transaction message into the buffer
154
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
155
static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
156
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
157
static_cast<uint32_t>(message_byte_length), buffer);
160
* Now write the serialized transaction message, followed
161
* by the optional checksum into the buffer.
163
buffer= trx.SerializeWithCachedSizesToArray(buffer);
167
*checksum_out= drizzled::algorithm::crc32(
168
reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
173
/* We always write in network byte order */
174
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
175
/* Reset the pointer back to its original location... */
180
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
185
* Do an atomic increment on the offset of the log file position
187
off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
190
* Quick safety...if an error occurs above in another writer, the log
191
* file will be in a crashed state.
193
if (unlikely(state == CRASHED))
196
* Reset the log's offset in case we want to produce a decent error message including
197
* the original offset where an error occurred.
199
log_offset= cur_offset;
203
/* Write the full buffer in one swoop */
206
written= pwrite(log_file, data, data_length, cur_offset);
208
while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
210
if (unlikely(written != static_cast<ssize_t>(data_length)))
212
char errmsg[STRERROR_MAX];
213
strerror_r(errno, errmsg, sizeof(errmsg));
214
errmsg_printf(ERRMSG_LVL_ERROR,
215
_("Failed to write full size of log entry. Tried to write %" PRId64
216
" bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes. Error: %s\n"),
217
static_cast<int64_t>(data_length),
218
static_cast<int64_t>(cur_offset),
219
static_cast<int32_t>(written),
223
* Reset the log's offset in case we want to produce a decent error message including
224
* the original offset where an error occurred.
226
log_offset= cur_offset;
229
int error_code= syncLogFile();
231
if (unlikely(error_code != 0))
233
char errmsg[STRERROR_MAX];
234
strerror_r(errno, errmsg, sizeof(errmsg));
235
errmsg_printf(ERRMSG_LVL_ERROR,
236
_("Failed to sync log file. Got error: %s\n"),
242
int TransactionLog::syncLogFile()
244
switch (flush_frequency)
246
case FLUSH_FREQUENCY_EVERY_WRITE:
247
return internal::my_sync(log_file, 0);
248
case FLUSH_FREQUENCY_EVERY_SECOND:
250
time_t now_time= time(NULL);
251
if (last_sync_time <= (now_time - 1))
253
last_sync_time= now_time;
254
return internal::my_sync(log_file, 0);
258
case FLUSH_FREQUENCY_OS:
264
const string &TransactionLog::getLogFilename()
266
return log_file_name;
269
const string &TransactionLog::getLogFilepath()
271
return log_file_path;
274
void TransactionLog::truncate()
279
* This is NOT THREAD SAFE! DEBUG/TEST code only!
281
log_offset= (off_t) 0;
285
result= ftruncate(log_file, log_offset);
287
while (result == -1 && errno == EINTR);
290
bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
291
string &out_filename) const
294
* Currently, we simply return the single logfile name
295
* Eventually, we'll have an index/hash with upper and
296
* lower bounds to look up a log file with a transaction id
298
out_filename.assign(log_file_path);
302
bool TransactionLog::hasError() const
307
void TransactionLog::clearError()
310
error_message.clear();
313
const string &TransactionLog::getErrorMessage() const
315
return error_message;
318
size_t TransactionLog::getLogEntrySize(const message::Transaction &trx)
320
return trx.ByteSize() + HEADER_TRAILER_BYTES;