44
44
#include <drizzled/server_includes.h>
45
#include "command_log_reader.h"
45
#include "transaction_log_reader.h"
47
47
#include <drizzled/gettext.h>
48
#include <drizzled/message/replication.pb.h>
48
#include <drizzled/message/transaction.pb.h>
50
50
#include <google/protobuf/io/zero_copy_stream_impl.h>
51
#include <drizzled/crc32.h>
51
#include <google/protobuf/io/coded_stream.h>
52
#include <drizzled/hash/crc32.h>
53
54
using namespace std;
54
55
using namespace drizzled;
55
56
using namespace google;
57
bool CommandLogReader::read(const ReplicationServices::GlobalTransactionId &to_read_trx_id,
58
message::Command *to_fill)
58
bool TransactionLogReader::read(const ReplicationServices::GlobalTransactionId &to_read_trx_id,
59
message::Transaction *to_fill)
61
62
* We ask the log to give us the log file containing the
62
* command message with the needed transaction id, then
63
* transaction message with the needed transaction id, then
63
64
* we read into the log file to obtain the message, and
64
* fill the supplied pointer to Command message from the
65
* fill the supplied pointer to Transaction message from the
65
66
* raw data in the log file.
67
68
string log_filename_to_read;
68
69
bool log_file_found= log.findLogFilenameContainingTransactionId(to_read_trx_id, log_filename_to_read);
71
bool do_checksum= false;
71
73
if (unlikely(! log_file_found))
77
protobuf::io::FileInputStream *log_file_stream;
78
message::Command tmp_command; /* Used to check trx id... */
79
string checksum_buffer; /* Buffer we use for buffering serialized messages for checksumming */
81
unsigned char coded_length[8]; /* Length header bytes in network byte order */
82
unsigned char coded_checksum[4]; /* Checksum trailer bytes in network byte order */
83
uint64_t length= 0; /* The length of the command to follow in stream */
84
uint32_t checksum= 0; /* The checksum sent in the wire */
85
ssize_t read_bytes; /* Number bytes read during pread() calls */
87
off_t current_offset= 0;
89
79
/* Open the log file and read through the log until the transaction ID is found */
90
80
int log_file= open(log_filename_to_read.c_str(), O_RDONLY | O_NONBLOCK);
92
82
if (log_file == -1)
94
84
errmsg_printf(ERRMSG_LVL_ERROR,
95
_("Failed to open command log file %s. Got error: %s\n"),
85
_("Failed to open transaction log file %s. Got error: %s\n"),
96
86
log_filename_to_read.c_str(),
101
log_file_stream= new protobuf::io::FileInputStream(log_file); /* Zero-copy stream implementation */
91
protobuf::io::ZeroCopyInputStream *raw_input= new protobuf::io::FileInputStream(log_file);
92
protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(raw_input);
95
char *temp_buffer= NULL;
97
uint32_t previous_length= 0;
100
message::Transaction transaction;
102
/* Read in the length of the command */
103
while (result == true && coded_input->ReadLittleEndian32(&length) == true)
105
/* Read in the length of the command */
108
read_bytes= pread(log_file, coded_length, sizeof(uint64_t), current_offset);
110
while (read_bytes == -1 && errno == EINTR); /* Just retry the call when interrupted by a signal... */
112
if (unlikely(read_bytes < 0))
114
errmsg_printf(ERRMSG_LVL_ERROR,
115
_("Failed to read length header at offset %" PRId64 ". Got error: %s\n"),
116
(int64_t) current_offset,
123
/* End of file and did not find the command, so return false */
128
/* We use korr.h macros when writing and must do the same when reading... */
129
length= uint8korr(coded_length);
131
/* Skip to the start of the next Command */
132
log_file_stream->Skip(8);
134
if (unlikely(tmp_command.ParseFromZeroCopyStream(log_file_stream) == false))
137
errmsg_printf(ERRMSG_LVL_ERROR,
138
_("Failed to parse command message at offset %" PRId64 ". Got error: %s\n"),
139
(int64_t) current_offset,
140
tmp_command.InitializationErrorString().c_str());
105
if (length > INT_MAX)
107
fprintf(stderr, _("Attempted to read record bigger than INT_MAX\n"));
114
* First time around...just malloc the length. This block gets rid
115
* of a GCC warning about uninitialized temp_buffer.
117
temp_buffer= (char *) malloc(static_cast<size_t>(length));
119
/* No need to allocate if we have a buffer big enough... */
120
else if (length > previous_length)
122
temp_buffer= (char *) realloc(buffer, static_cast<size_t>(length));
125
if (temp_buffer == NULL)
127
fprintf(stderr, _("Memory allocation failure trying to allocate %" PRIu64 " bytes.\n"),
128
static_cast<uint64_t>(length));
146
/* Cool, message was read. Check the trx id */
147
if (tmp_command.transaction_context().transaction_id() == to_read_trx_id)
149
/* Found what we were looking for...copy to the pointer we should fill */
150
to_fill->CopyFrom(tmp_command);
155
/* Keep the stream and the pread() calls in sync... */
156
current_offset+= length;
159
* We now read 4 bytes containing the (possible) checksum of the
160
* just-read command message. If the result is not zero, then a
161
* checksum was written...
165
read_bytes= pread(log_file, coded_checksum, sizeof(uint32_t), current_offset);
167
while (read_bytes == -1 && errno == EINTR); /* Just retry the call when interrupted by a signal... */
169
if (unlikely(read_bytes < 0))
171
errmsg_printf(ERRMSG_LVL_ERROR,
172
_("Failed to read checksum trailer at offset %" PRId64 ". Got error: %s\n"),
173
(int64_t) current_offset,
179
checksum= uint4korr(coded_checksum);
183
tmp_command.SerializeToString(&checksum_buffer);
184
uint32_t recalc_checksum= hash_crc32(checksum_buffer.c_str(), static_cast<size_t>(length));
185
if (unlikely(recalc_checksum != checksum))
187
errmsg_printf(ERRMSG_LVL_ERROR, _("Checksum FAILED!\n"),
188
(int64_t) current_offset,
193
checksum_buffer.clear();
196
/* Keep the stream and the pread() calls in sync... */
197
current_offset+= sizeof(uint32_t);
134
/* Read the Command */
135
result= coded_input->ReadRaw(buffer, length);
138
fprintf(stderr, _("Could not read transaction message.\n"));
139
fprintf(stderr, _("GPB ERROR: %s.\n"), strerror(errno));
140
fprintf(stderr, _("Raw buffer read: %s.\n"), buffer);
144
result= transaction.ParseFromArray(buffer, static_cast<int32_t>(length));
147
fprintf(stderr, _("Unable to parse transaction. Got error: %s.\n"), transaction.InitializationErrorString().c_str());
149
fprintf(stderr, _("BUFFER: %s\n"), buffer);
153
/* Skip 4 byte checksum */
154
coded_input->ReadLittleEndian32(&checksum);
158
if (checksum != drizzled::hash::crc32(buffer, static_cast<size_t>(length)))
160
fprintf(stderr, _("Checksum failed. Wanted %" PRIu32 " got %" PRIu32 "\n"), checksum, drizzled::hash::crc32(buffer, static_cast<size_t>(length)));
164
/* Cool, message was read. Check the trx id */
165
if (transaction.transaction_context().transaction_id() == to_read_trx_id)
167
/* Found what we were looking for...copy to the pointer we should fill */
168
to_fill->CopyFrom(transaction);
172
previous_length= length;
200
delete log_file_stream;