1
#include <drizzled/global.h>
9
#include <drizzled/message/replication.pb.h>
11
#include "drizzled/korr.h"
14
using namespace drizzled;
17
* @file Example application for reading change records (Command messages)
21
* This program is used in the serial_event_log test suite to verify
22
* the log written by that plugin.
25
static void printInsert(const message::Command &container,
26
const message::InsertRecord &record)
29
cout << "INSERT INTO `" << container.schema() << "`.`" << container.table() << "` (";
31
assert(record.insert_field_size() > 0);
32
assert(record.insert_value_size() > 0);
33
assert(record.insert_value_size() % record.insert_field_size() == 0);
35
int32_t num_fields= record.insert_field_size();
38
for (x= 0; x < num_fields; x++)
43
const message::Table::Field f= record.insert_field(x);
45
cout << "`" << f.name() << "`";
51
* There may be an INSERT VALUES (),() type statement. We know the
52
* number of records is equal to the field_values array size divided
53
* by the number of fields.
55
* So, we do an inner and an outer loop. Outer loop is on the number
56
* of records and the inner loop on the number of fields. In this way,
57
* we know that record.field_values(outer_loop * num_fields) + inner_loop))
58
* always gives us our correct field value.
60
int32_t num_records= (record.insert_value_size() / num_fields);
62
for (x= 0; x < num_records; x++)
68
for (y= 0; y < num_fields; y++)
73
cout << "\"" << record.insert_value((x * num_fields) + y) << "\"";
81
static void printDeleteWithPK(const message::Command &container,
82
const message::DeleteRecord &record)
84
cout << "DELETE FROM `" << container.schema() << "`.`" << container.table() << "`";
86
assert(record.where_field_size() > 0);
87
assert(record.where_value_size() == record.where_field_size());
89
int32_t num_where_fields= record.where_field_size();
91
* Make sure we catch anywhere we're not aligning the fields with
92
* the field_values arrays...
94
assert(num_where_fields == record.where_value_size());
98
for (x= 0; x < num_where_fields; x++)
101
cout << " AND "; /* Always AND condition with a multi-column PK */
103
const message::Table::Field f= record.where_field(x);
105
/* Always equality conditions */
106
cout << "`" << f.name() << "` = \"" << record.where_value(x) << "\"";
112
static void printUpdateWithPK(const message::Command &container,
113
const message::UpdateRecord &record)
115
int32_t num_update_fields= record.update_field_size();
118
assert(record.update_field_size() > 0);
119
assert(record.where_field_size() > 0);
120
assert(record.where_value_size() == record.where_field_size());
122
cout << "UPDATE `" << container.schema() << "`.`" << container.table() << "` SET ";
124
for (x= 0;x < num_update_fields; x++)
126
message::Table::Field f= record.update_field(x);
131
cout << "`" << f.name() << "` = \"" << record.after_value(x) << "\"";
134
int32_t num_where_fields= record.where_field_size();
136
* Make sure we catch anywhere we're not aligning the fields with
137
* the field_values arrays...
139
assert(num_where_fields == record.where_value_size());
142
for (x= 0;x < num_where_fields; x++)
145
cout << " AND "; /* Always AND condition with a multi-column PK */
147
const message::Table::Field f= record.where_field(x);
149
/* Always equality conditions */
150
cout << "`" << f.name() << "` = \"" << record.where_value(x) << "\"";
155
static void printCommand(const message::Command &command)
157
cout << "/* Timestamp: " << command.timestamp() << " */"<< endl;
159
message::TransactionContext trx= command.transaction_context();
161
cout << "/* SID: " << trx.server_id() << " XID: " << trx.transaction_id() << " */ ";
163
switch (command.type())
165
case message::Command::START_TRANSACTION:
166
cout << "START TRANSACTION;";
168
case message::Command::COMMIT:
171
case message::Command::ROLLBACK:
174
case message::Command::INSERT:
176
printInsert(command, command.insert_record());
179
case message::Command::DELETE:
181
printDeleteWithPK(command, command.delete_record());
184
case message::Command::UPDATE:
186
printUpdateWithPK(command, command.update_record());
189
case message::Command::RAW_SQL:
191
std::string sql= command.sql();
192
/* Replace \n with spaces */
193
const std::string newline= "\n";
194
while (sql.find(newline) != std::string::npos)
195
sql.replace(sql.find(newline), 1, " ");
201
cout << "Received an unknown Command type: " << (int32_t) command.type();
206
int main(int argc, char* argv[])
208
GOOGLE_PROTOBUF_VERIFY_VERSION;
213
cerr << "Usage: " << argv[0] << " COMMAND_LOG" << endl;
217
message::Command command;
219
if ((file= open(argv[1], O_RDONLY)) == -1)
221
cerr << "Cannot open file: " << argv[1] << endl;
225
char *temp_buffer= NULL;
226
uint64_t previous_length= 0;
227
ssize_t read_bytes= 0;
230
/* We use korr.h macros when writing and must do the same when reading... */
231
unsigned char coded_length[8];
233
/* Read in the length of the command */
234
while ((read_bytes= read(file, coded_length, sizeof(uint64_t))) != 0)
236
if (read_bytes == -1)
238
cerr << "Failed to read initial length." << endl;
241
length= uint8korr(coded_length);
243
if (length > SIZE_MAX)
245
cerr << "Attempted to read record bigger than SIZE_MAX" << endl;
252
* First time around...just malloc the length. This block gets rid
253
* of a GCC warning about uninitialized temp_buffer.
255
temp_buffer= (char *) malloc((size_t) length);
257
/* No need to allocate if we have a buffer big enough... */
258
else if (length > previous_length)
260
temp_buffer= (char *) realloc(buffer, (size_t) length);
263
if (temp_buffer == NULL)
265
cerr << "Memory allocation failure trying to allocate " << length << " bytes." << endl;
271
/* Read the Command */
272
read_bytes= read(file, buffer, (size_t) length);
273
if ((read_bytes != (ssize_t) length))
275
cerr << "Could not read entire transaction. Read " << read_bytes << " bytes instead of " << length << " bytes." << endl;
279
if (! command.ParseFromArray(buffer, (int) length))
281
cerr << "Unable to parse command. Got error: " << command.InitializationErrorString() << endl;
283
cerr << "BUFFER: " << buffer << endl;
287
/* Print the command */
288
printCommand(command);
290
/* Reset our length check */
291
previous_length= length;
292
memset(coded_length, 0, sizeof(coded_length));