33
* Currently, the log file uses this implementation:
33
* Currently, the transaction log file uses a simple, single-file, append-only
35
36
* We have an atomic off_t called log_offset which keeps track of the
36
* offset into the log file for writing the next Transaction.
38
* We write each Transaction message encapsulated in an 8-byte type/length header and a
39
* 4-byte checksum trailer.
41
* When writing a Transaction to the log, we calculate the length of the
42
* Transaction to be written. We then increment log_offset by the length
43
* of the Transaction plus 2 * sizeof(uint32_t) plus sizeof(uint32_t) and store
44
* this new offset in a local off_t called cur_offset (see TransactionLog::apply()).
45
* This compare and set is done in an atomic instruction.
47
* We then adjust the local off_t (cur_offset) back to the original
48
* offset by subtracting the length, 2 * sizeof(uint32_t), and sizeof(uint32_t).
50
* We then first write the 8-byte message type/length header and then the serialized transaction
51
* and optional checksum to our log file at our local cur_offset.
53
* --------------------------------------------------------------------------------
54
* |<- 4 bytes ->|<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
55
* --------------------------------------------------------------------------------
56
* | Msg Type | Length | Serialized Transaction Message | Checksum |
57
* --------------------------------------------------------------------------------
37
* offset into the log file for writing the next log entry. The log
38
* entries are written, one after the other, in the following way:
41
* --------------------------------------
42
* |<- 4 bytes ->|<- # Bytes of Entry ->|
43
* --------------------------------------
44
* | Entry Type | Serialized Entry |
45
* --------------------------------------
48
* The Entry Type is an integer defined as an enumeration in the
49
* /drizzled/message/transaction.proto file called TransactionLogEntry::Type.
51
* Each transaction log entry type is written to the log differently. Here,
52
* we cover the format of each log entry type.
54
* Committed and Prepared Transaction Log Entries
55
* -----------------------------------------------
58
* ------------------------------------------------------------------
59
* |<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
60
* ------------------------------------------------------------------
61
* | Length | Serialized Transaction Message | Checksum |
62
* ------------------------------------------------------------------
77
83
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
78
84
#include <drizzled/errmsg_print.h>
79
85
#include <drizzled/gettext.h>
86
#include <drizzled/message/transaction.pb.h>
87
#include <drizzled/transaction_services.h>
88
#include <drizzled/algorithm/crc32.h>
90
#include <google/protobuf/io/coded_stream.h>
81
92
using namespace std;
82
93
using namespace drizzled;
94
using namespace google;
84
96
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
86
98
TransactionLog::TransactionLog(const string in_log_file_path,
87
uint32_t in_sync_method) :
99
uint32_t in_sync_method,
100
bool in_do_checksum) :
89
102
log_file_path(in_log_file_path),
92
sync_method(in_sync_method)
105
sync_method(in_sync_method),
106
do_checksum(in_do_checksum)
94
108
/* Setup our log file and determine the next write offset... */
95
109
log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
150
uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
152
uint32_t *checksum_out)
154
uint8_t *orig_buffer= buffer;
155
size_t message_byte_length= trx.ByteSize();
158
* Write the header information, which is the message type and
159
* the length of the transaction message into the buffer
161
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
162
static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
163
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
164
static_cast<uint32_t>(message_byte_length), buffer);
167
* Now write the serialized transaction message, followed
168
* by the optional checksum into the buffer.
170
buffer= trx.SerializeWithCachedSizesToArray(buffer);
174
*checksum_out= drizzled::algorithm::crc32(
175
reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
180
/* The checksum is written in little-endian byte order, like the header fields */
181
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
182
/* Reset the pointer back to its original location... */
136
187
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
138
189
ssize_t written= 0;
143
194
off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
146
* We adjust cur_offset back to the original log_offset before
147
* the increment above...
149
cur_offset-= static_cast<off_t>(data_length);
152
197
* Quick safety...if an error occurs above in another writer, the log
153
198
* file will be in a crashed state.