1
1
/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008-2009 Sun Microsystems, Inc.
5
* Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
4
* Copyright (C) 2008-2009 Sun Microsystems
5
* Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
9
* Jay Pipes <jaypipes@gmail.com>
9
* Jay Pipes <jaypipes@gmail.com>
11
11
* This program is free software; you can redistribute it and/or modify
12
12
* it under the terms of the GNU General Public License as published by
33
* Currently, the transaction log file uses a simple, single-file, append-only
33
* Currently, the log file uses this implementation:
36
35
* We have an atomic off_t called log_offset which keeps track of the
37
* offset into the log file for writing the next log entry. The log
38
* entries are written, one after the other, in the following way:
41
* --------------------------------------
42
* |<- 4 bytes ->|<- # Bytes of Entry ->|
43
* --------------------------------------
44
* | Entry Type | Serialized Entry |
45
* --------------------------------------
48
* The Entry Type is an integer defined as an enumeration in the
49
* /drizzled/message/transaction.proto file called TransactionLogEntry::Type.
51
* Each transaction log entry type is written to the log differently. Here,
52
* we cover the format of each log entry type.
54
* Committed and Prepared Transaction Log Entries
55
* -----------------------------------------------
58
* ------------------------------------------------------------------
59
* |<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
60
* ------------------------------------------------------------------
61
* | Length | Serialized Transaction Message | Checksum |
62
* ------------------------------------------------------------------
36
* offset into the log file for writing the next Transaction.
38
* We write Transaction message encapsulated in an 8-byte length/type header and a
39
* 4-byte checksum trailer.
41
* When writing a Transaction to the log, we calculate the length of the
42
* Transaction to be written. We then increment log_offset by the length
43
* of the Transaction plus 2 * sizeof(uint32_t) plus sizeof(uint32_t) and store
44
* this new offset in a local off_t called cur_offset (see TransactionLog::apply()).
45
* This compare and set is done in an atomic instruction.
47
* We then adjust the local off_t (cur_offset) back to the original
48
* offset by subtracting the length plus 3 * sizeof(uint32_t) (header and checksum).
50
* We then first write the 8-byte type/length header and then the serialized transaction
51
* and optional checksum to our log file at our local cur_offset.
53
* --------------------------------------------------------------------------------
54
* |<- 4 bytes ->|<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
55
* --------------------------------------------------------------------------------
56
* | Msg Type | Length | Serialized Transaction Message | Checksum |
57
* --------------------------------------------------------------------------------
83
77
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
84
78
#include <drizzled/errmsg_print.h>
85
79
#include <drizzled/gettext.h>
86
#include <drizzled/message/transaction.pb.h>
87
#include <drizzled/transaction_services.h>
88
#include <drizzled/algorithm/crc32.h>
90
#include <google/protobuf/io/coded_stream.h>
92
81
using namespace std;
93
82
using namespace drizzled;
94
using namespace google;
96
84
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
98
86
TransactionLog::TransactionLog(const string in_log_file_path,
99
uint32_t in_flush_frequency,
100
bool in_do_checksum) :
87
uint32_t in_sync_method) :
102
89
log_file_path(in_log_file_path),
105
flush_frequency(in_flush_frequency),
106
do_checksum(in_do_checksum)
92
sync_method(in_sync_method)
108
94
/* Setup our log file and determine the next write offset... */
109
95
log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
110
96
if (log_file == -1)
112
char errmsg[STRERROR_MAX];
113
strerror_r(errno, errmsg, sizeof(errmsg));
114
98
error_message.assign(_("Failed to open transaction log file "));
115
99
error_message.append(log_file_path);
116
100
error_message.append(" Got error: ");
117
error_message.append(errmsg);
101
error_message.append(strerror(errno));
118
102
error_message.push_back('\n');
143
uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
145
uint32_t *checksum_out)
127
TransactionLog::~TransactionLog()
147
uint8_t *orig_buffer= buffer;
148
size_t message_byte_length= trx.ByteSize();
151
* Write the header information, which is the message type and
152
* the length of the transaction message into the buffer
154
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
155
static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
156
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
157
static_cast<uint32_t>(message_byte_length), buffer);
160
* Now write the serialized transaction message, followed
161
* by the optional checksum into the buffer.
163
buffer= trx.SerializeWithCachedSizesToArray(buffer);
129
/* Clear up any resources we've consumed */
167
*checksum_out= drizzled::algorithm::crc32(
168
reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
132
(void) close(log_file);
173
/* We always write the checksum in little-endian byte order */
174
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
175
/* Reset the pointer back to its original location... */
180
136
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
187
143
off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
146
* We adjust cur_offset back to the original log_offset before
147
* the increment above...
149
cur_offset-= static_cast<off_t>(data_length);
190
152
* Quick safety...if an error occurs above in another writer, the log
191
153
* file will be in a crashed state.
210
172
if (unlikely(written != static_cast<ssize_t>(data_length)))
212
char errmsg[STRERROR_MAX];
213
strerror_r(errno, errmsg, sizeof(errmsg));
214
errmsg_printf(error::ERROR,
174
errmsg_printf(ERRMSG_LVL_ERROR,
215
175
_("Failed to write full size of log entry. Tried to write %" PRId64
216
176
" bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes. Error: %s\n"),
217
177
static_cast<int64_t>(data_length),
218
178
static_cast<int64_t>(cur_offset),
219
static_cast<int32_t>(written),
179
static_cast<int64_t>(written),
223
183
* Reset the log's offset in case we want to produce a decent error message including
231
191
if (unlikely(error_code != 0))
233
sql_perror(_("Failed to sync log file."));
193
errmsg_printf(ERRMSG_LVL_ERROR,
194
_("Failed to sync log file. Got error: %s\n"),
236
197
return cur_offset;
239
200
int TransactionLog::syncLogFile()
241
switch (flush_frequency)
243
case FLUSH_FREQUENCY_EVERY_WRITE:
204
case SYNC_METHOD_EVERY_WRITE:
244
205
return internal::my_sync(log_file, 0);
245
case FLUSH_FREQUENCY_EVERY_SECOND:
206
case SYNC_METHOD_EVERY_SECOND:
247
208
time_t now_time= time(NULL);
248
209
if (last_sync_time <= (now_time - 1))