1
#include <drizzled/server_includes.h>
2
#include <drizzled/gettext.h>
11
#include <drizzled/message/replication.pb.h>
13
#include "drizzled/korr.h"
16
using namespace drizzled;
19
* @file Example application for reading change records (Command messages)
23
* This program is used in the serial_event_log test suite to verify
24
* the log written by that plugin.
27
/**
 * Writes an INSERT statement for one InsertRecord to cout.
 *
 * The output starts with INSERT INTO `schema`.`table` ( using the schema and
 * table names carried by the container, then names every insert field in
 * backticks, then prints the insert values, each wrapped in double quotes.
 * insert_value holds the values of all rows back to back, so the value for
 * row r, column c is insert_value((r * num_fields) + c).
 *
 * Asserts that there is at least one field, at least one value, and that the
 * number of values is a whole multiple of the number of fields.
 *
 * @param container  Command supplying schema() and table()
 * @param record     InsertRecord supplying the insert fields and values
 *
 * NOTE(review): this listing does not show every line of the function (for
 * example the declarations of x and y, and whatever is printed between
 * items), so separators and closing punctuation are not described here.
 */
static void printInsert(const message::Command &container,
28
const message::InsertRecord &record)
31
cout << "INSERT INTO `" << container.schema() << "`.`" << container.table() << "` (";
33
assert(record.insert_field_size() > 0);
34
assert(record.insert_value_size() > 0);
35
assert(record.insert_value_size() % record.insert_field_size() == 0);
37
int32_t num_fields= record.insert_field_size();
40
for (x= 0; x < num_fields; x++)
45
/* NOTE(review): f is a by-value copy of the Field made on every iteration; only name() is read from it, so a const reference looks sufficient -- confirm that insert_field() returns a reference. */
const message::Table::Field f= record.insert_field(x);
47
cout << "`" << f.name() << "`";
53
* There may be an INSERT VALUES (),() type statement. We know the
54
* number of records is equal to the field_values array size divided
55
* by the number of fields.
57
* So, we do an inner and an outer loop. Outer loop is on the number
58
* of records and the inner loop on the number of fields. In this way,
59
* we know that record.field_values(outer_loop * num_fields) + inner_loop))
60
* always gives us our correct field value.
62
int32_t num_records= (record.insert_value_size() / num_fields);
64
for (x= 0; x < num_records; x++)
70
for (y= 0; y < num_fields; y++)
75
/* The value is streamed as-is between two double quotes; nothing on this line escapes a quote character embedded in the value. */
cout << "\"" << record.insert_value((x * num_fields) + y) << "\"";
83
/**
 * Writes a DELETE statement for one DeleteRecord to cout.
 *
 * The output starts with DELETE FROM `schema`.`table` using the schema and
 * table names carried by the container, followed by one equality condition
 * per where field in the form `name` = "value", with the literal " AND "
 * printed inside the same loop.
 *
 * Asserts that there is at least one where field and that the number of
 * where values equals the number of where fields.
 *
 * @param container  Command supplying schema() and table()
 * @param record     DeleteRecord supplying the where fields and values
 *
 * NOTE(review): this listing does not show every line of the function; the
 * declaration of x, the guard that decides when " AND " is printed, and any
 * WHERE keyword or statement terminator are not visible here.
 */
static void printDeleteWithPK(const message::Command &container,
84
const message::DeleteRecord &record)
86
cout << "DELETE FROM `" << container.schema() << "`.`" << container.table() << "`";
88
assert(record.where_field_size() > 0);
/* NOTE(review): the same size equality is asserted a second time further down, after num_where_fields has been read; one of the two checks is redundant. */
89
assert(record.where_value_size() == record.where_field_size());
91
int32_t num_where_fields= record.where_field_size();
93
* Make sure we catch anywhere we're not aligning the fields with
94
* the field_values arrays...
96
assert(num_where_fields == record.where_value_size());
100
for (x= 0; x < num_where_fields; x++)
103
cout << " AND "; /* Always AND condition with a multi-column PK */
105
/* NOTE(review): f is a by-value copy of the Field made on every iteration; only name() is read from it -- confirm that where_field() returns a reference so a const reference could be used. */
const message::Table::Field f= record.where_field(x);
107
/* Always equality conditions */
108
cout << "`" << f.name() << "` = \"" << record.where_value(x) << "\"";
114
/**
 * Writes an UPDATE statement for one UpdateRecord to cout.
 *
 * The output starts with UPDATE `schema`.`table` SET using the schema and
 * table names carried by the container, followed by one assignment per
 * update field in the form `name` = "after value", and then one equality
 * condition per where field in the form `name` = "value", with the literal
 * " AND " printed inside the where loop.
 *
 * Asserts that there is at least one update field, at least one where field,
 * and that the number of where values equals the number of where fields.
 *
 * @param container  Command supplying schema() and table()
 * @param record     UpdateRecord supplying update fields, after values and
 *                   the where fields and values
 *
 * NOTE(review): this listing does not show every line of the function; the
 * declaration of x, the separators between assignments, the guard around
 * " AND ", and any WHERE keyword or statement terminator are not visible.
 */
static void printUpdateWithPK(const message::Command &container,
115
const message::UpdateRecord &record)
117
int32_t num_update_fields= record.update_field_size();
120
assert(record.update_field_size() > 0);
121
assert(record.where_field_size() > 0);
122
assert(record.where_value_size() == record.where_field_size());
124
cout << "UPDATE `" << container.schema() << "`.`" << container.table() << "` SET ";
126
for (x= 0;x < num_update_fields; x++)
128
/* NOTE(review): f is a by-value copy of the Field made on every iteration; only name() is read from it -- confirm that update_field() returns a reference so a const reference could be used. */
message::Table::Field f= record.update_field(x);
133
/* NOTE(review): after_value(x) is indexed by the update-field index, but none of the visible asserts compares the number of after values with the number of update fields -- confirm the writer guarantees they match. */
cout << "`" << f.name() << "` = \"" << record.after_value(x) << "\"";
136
int32_t num_where_fields= record.where_field_size();
138
* Make sure we catch anywhere we're not aligning the fields with
139
* the field_values arrays...
141
assert(num_where_fields == record.where_value_size());
144
for (x= 0;x < num_where_fields; x++)
147
cout << " AND "; /* Always AND condition with a multi-column PK */
149
/* Same by-value Field copy as in the SET loop; only name() is read from it. */
const message::Table::Field f= record.where_field(x);
151
/* Always equality conditions */
152
cout << "`" << f.name() << "` = \"" << record.where_value(x) << "\"";
157
/**
 * Prints one Command to cout as annotated SQL-like text.
 *
 * First writes a SQL-style comment holding the command timestamp, then
 * starts a second one with the server id and transaction id taken from the
 * command's transaction context, adding the session id when the command has
 * one. It then switches on command.type():
 *   - START_TRANSACTION prints the literal START TRANSACTION;
 *   - INSERT, DELETE and UPDATE hand the matching record to printInsert,
 *     printDeleteWithPK and printUpdateWithPK
 *   - RAW_SQL takes a copy of command.sql() and replaces every newline in it
 *     with a single space
 *   - an "unknown Command type" message with the numeric type is also
 *     printed by a branch whose label is not visible in this listing
 *
 * @param command  the Command message to print
 *
 * NOTE(review): this listing does not show every line of the function; the
 * bodies of the COMMIT and ROLLBACK cases, the break statements, the output
 * of the rewritten RAW_SQL text and the end of the second comment are not
 * visible here.
 */
static void printCommand(const message::Command &command)
159
cout << "/* Timestamp: " << command.timestamp() << " */"<< endl;
161
/* NOTE(review): trx is a by-value copy of the TransactionContext; only server_id() and transaction_id() are read from it -- confirm that transaction_context() returns a reference so a const reference could be used. */
message::TransactionContext trx= command.transaction_context();
163
cout << "/* SERVER ID: " << trx.server_id() << " TRX ID: " << trx.transaction_id();
165
if (command.has_session_id())
166
cout << " SESSION ID: " << command.session_id();
170
switch (command.type())
172
case message::Command::START_TRANSACTION:
173
cout << "START TRANSACTION;";
175
case message::Command::COMMIT:
178
case message::Command::ROLLBACK:
181
case message::Command::INSERT:
183
printInsert(command, command.insert_record());
186
case message::Command::DELETE:
188
printDeleteWithPK(command, command.delete_record());
191
case message::Command::UPDATE:
193
printUpdateWithPK(command, command.update_record());
196
case message::Command::RAW_SQL:
198
std::string sql= command.sql();
199
/* Replace \n with spaces */
200
const std::string newline= "\n";
/* NOTE(review): every pass searches the string from the start twice, once in the loop condition and once to get the position to replace, so the work grows with the square of the number of newlines; a single left-to-right pass would give the same text. */
201
while (sql.find(newline) != std::string::npos)
202
sql.replace(sql.find(newline), 1, " ");
208
cout << "Received an unknown Command type: " << (int32_t) command.type();
213
/**
 * Reads a command log file and prints every Command found in it.
 *
 * argv[1] is opened read-only. The file is then consumed as a sequence of
 * records, each made of:
 *   1. an 8-byte length header, decoded with uint8korr
 *   2. that many bytes of payload, parsed with Command::ParseFromArray
 *   3. a 4-byte checksum, decoded with uint4korr but not yet verified
 * After each record is parsed it is handed to printCommand. The payload
 * buffer is kept between records and only grown when a record is longer than
 * the previous one. Diagnostics go to stderr through gettext's _() wrapper.
 *
 * @param argc  argument count
 * @param argv  argv[0] is used in the usage message, argv[1] is the log path
 *
 * NOTE(review): this listing does not show every line of the function; the
 * argument-count check, the declarations of file, buffer and length, the
 * return statements and any cleanup of the descriptor or buffer are not
 * visible here.
 */
int main(int argc, char* argv[])
215
GOOGLE_PROTOBUF_VERIFY_VERSION;
220
fprintf(stderr, _("Usage: %s COMMAND_LOG\n"), argv[0]);
224
message::Command command;
226
file= open(argv[1], O_RDONLY);
229
fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
233
char *temp_buffer= NULL;
234
uint64_t previous_length= 0;
235
ssize_t read_bytes= 0;
237
uint32_t checksum= 0;
239
/* We use korr.h macros when writing and must do the same when reading... */
240
unsigned char coded_length[8];
241
unsigned char coded_checksum[4];
243
/* Read in the length of the command */
/* NOTE(review): a return of 0 (end of file) ends the loop and -1 is checked just below, but a short read of 1 to 7 bytes does not appear to be detected before the header is decoded -- confirm against the lines not shown here. */
244
while ((read_bytes= read(file, coded_length, sizeof(uint64_t))) != 0)
246
if (read_bytes == -1)
248
fprintf(stderr, _("Failed to read initial length header\n"));
251
length= uint8korr(coded_length);
253
if (length > SIZE_MAX)
255
fprintf(stderr, _("Attempted to read record bigger than SIZE_MAX\n"));
262
* First time around...just malloc the length. This block gets rid
263
* of a GCC warning about uninitialized temp_buffer.
265
temp_buffer= (char *) malloc((size_t) length);
267
/* No need to allocate if we have a buffer big enough... */
268
else if (length > previous_length)
270
temp_buffer= (char *) realloc(buffer, (size_t) length);
273
if (temp_buffer == NULL)
275
fprintf(stderr, _("Memory allocation failure trying to allocate %" PRIu64 " bytes.\n"), length);
281
/* Read the Command */
282
read_bytes= read(file, buffer, (size_t) length);
283
if ((read_bytes != (ssize_t) length))
285
/* NOTE(review): read_bytes can be -1 on this path, and the cast to uint64_t then reports a very large byte count instead of signalling a read error. */
fprintf(stderr, _("Could not read entire transaction. Read %" PRIu64 " bytes instead of %" PRIu64 " bytes.\n"), (uint64_t) read_bytes, (uint64_t) length);
289
/* NOTE(review): length is printed with PRIu64 above, so it is presumably 64 bits wide; it was only checked against SIZE_MAX, and the (int) cast here would truncate a value that does not fit in an int -- confirm. */
if (! command.ParseFromArray(buffer, (int) length))
291
fprintf(stderr, _("Unable to parse command. Got error: %s.\n"), command.InitializationErrorString().c_str());
293
/* NOTE(review): buffer was filled by read() with length raw bytes and no visible line appends a terminating NUL, so %s may stop early at an embedded zero byte or read past the end of the allocation. */
fprintf(stderr, _("BUFFER: %s\n"), buffer);
297
/* Read the checksum */
298
read_bytes= read(file, coded_checksum, sizeof(uint32_t));
299
if ((read_bytes != (ssize_t) sizeof(uint32_t)))
301
fprintf(stderr, _("Could not read entire checksum. Read %" PRIu64 " bytes instead of 4 bytes.\n"), (uint64_t) read_bytes);
304
/* The decoded checksum is stored but not compared with anything in the visible lines; see the TODO below. */
checksum= uint4korr(coded_checksum);
308
/* @TODO checksumming.. */
311
/* Print the command */
312
printCommand(command);
314
/* Reset our length check */
315
previous_length= length;
316
memset(coded_length, 0, sizeof(coded_length));