96
static void printTransaction(const message::Transaction &transaction)
98
const message::TransactionContext trx= transaction.transaction_context();
102
static bool isEndStatement(const message::Statement &statement)
104
switch (statement.type())
106
case (message::Statement::INSERT):
108
const message::InsertData &data= statement.insert_data();
109
if (not data.end_segment())
113
case (message::Statement::UPDATE):
115
const message::UpdateData &data= statement.update_data();
116
if (not data.end_segment())
120
case (message::Statement::DELETE):
122
const message::DeleteData &data= statement.delete_data();
123
if (not data.end_segment())
133
static bool isEndTransaction(const message::Transaction &transaction)
135
const message::TransactionContext trx= transaction.transaction_context();
137
size_t num_statements= transaction.statement_size();
140
* If any Statement is partial, then we can expect another Transaction
143
for (size_t x= 0; x < num_statements; ++x)
145
const message::Statement &statement= transaction.statement(x);
147
if (not isEndStatement(statement))
154
static void printEvent(const message::Event &event)
156
switch (event.type())
158
case message::Event::STARTUP:
160
cout << "-- EVENT: Server startup\n";
163
case message::Event::SHUTDOWN:
165
cout << "-- EVENT: Server shutdown\n";
170
cout << "-- EVENT: Unknown event\n";
176
static void printTransaction(const message::Transaction &transaction,
180
static uint64_t last_trx_id= 0;
181
bool should_commit= true;
182
const message::TransactionContext trx= transaction.transaction_context();
185
* First check to see if this is an event message.
187
if (transaction.has_event())
189
last_trx_id= trx.transaction_id();
190
if (not ignore_events)
193
transaction.PrintDebugString();
195
printEvent(transaction.event());
202
transaction.PrintDebugString();
100
206
size_t num_statements= transaction.statement_size();
103
cout << "START TRANSACTION;" << endl;
210
* One way to determine when a new transaction begins is when the
211
* transaction id changes (if all transactions have their GPB messages
212
* grouped together, which this program will). We check that here.
214
if (trx.transaction_id() != last_trx_id)
215
cout << "START TRANSACTION;" << endl;
217
last_trx_id= trx.transaction_id();
104
219
for (x= 0; x < num_statements; ++x)
106
221
const message::Statement &statement= transaction.statement(x);
224
should_commit= isEndStatement(statement);
226
/* A ROLLBACK would be the only Statement within the Transaction
227
* since all other Statements will have been deleted from the
228
* Transaction message, so we should fall out of this loop immediately.
229
* We don't want to issue an unnecessary COMMIT, so we change
230
* should_commit to false here.
232
if (statement.type() == message::Statement::ROLLBACK)
233
should_commit= false;
107
235
printStatement(statement);
109
cout << "COMMIT;" << endl;
239
* If ALL Statements are end segments, we can commit this Transaction.
240
* We can also check to see if the transaction_id changed, but this
241
* wouldn't work for the last Transaction in the transaction log since
242
* we don't have another Transaction to compare to. Checking for all
243
* end segments (like we do above) covers this case.
246
cout << "COMMIT;" << endl;
112
249
int main(int argc, char* argv[])
114
251
GOOGLE_PROTOBUF_VERIFY_VERSION;
117
if (argc < 2 || argc > 3)
119
fprintf(stderr, _("Usage: %s TRANSACTION_LOG [--checksum] \n"), argv[0]);
252
int opt_start_pos= 0;
253
uint64_t opt_transaction_id= 0;
256
* Setup program options
258
po::options_description desc("Program options");
260
("help", N_("Display help and exit"))
261
("checksum", N_("Perform checksum"))
262
("ignore-events", N_("Ignore event messages"))
263
("input-file", po::value< vector<string> >(), N_("Transaction log file"))
264
("raw", N_("Print raw Protobuf messages instead of SQL"))
266
po::value<int>(&opt_start_pos),
267
N_("Start reading from the given file position"))
269
po::value<uint64_t>(&opt_transaction_id),
270
N_("Only output for the given transaction ID"));
273
* We allow one positional argument that will be transaction file name
275
po::positional_options_description pos;
276
pos.add("input-file", 1);
279
* Parse the program options
281
po::variables_map vm;
282
po::store(po::command_line_parser(argc, argv).
283
options(desc).positional(pos).run(), vm);
287
* If the help option was given, or not input file was supplied,
288
* print out usage information.
290
if (vm.count("help") || not vm.count("input-file"))
292
cerr << desc << endl;
297
* Specifying both a transaction ID and a start position
300
if (vm.count("start-pos") && vm.count("transaction-id"))
302
cerr << _("Cannot use --start-pos and --transaction-id together\n");
306
bool do_checksum= vm.count("checksum") ? true : false;
307
bool ignore_events= vm.count("ignore-events") ? true : false;
308
bool print_as_raw= vm.count("raw") ? true : false;
310
string filename= vm["input-file"].as< vector<string> >()[0];
311
int file= open(filename.c_str(), O_RDONLY);
314
cerr << _("Cannot open file: ") << filename << endl;
123
318
message::Transaction transaction;
125
file= open(argv[1], O_RDONLY);
128
fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
132
bool do_checksum= false;
136
string checksum_arg(argv[2]);
137
transform(checksum_arg.begin(), checksum_arg.end(), checksum_arg.begin(), ::tolower);
139
if ("--checksum" == checksum_arg)
319
message::TransactionManager trx_mgr;
143
321
protobuf::io::ZeroCopyInputStream *raw_input= new protobuf::io::FileInputStream(file);
144
protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(raw_input);
323
/* Skip ahead to user supplied position */
326
if (not raw_input->Skip(opt_start_pos))
328
cerr << _("Could not skip to position ") << opt_start_pos
329
<< _(" in file ") << filename << endl;
146
334
char *buffer= NULL;
147
335
char *temp_buffer= NULL;
151
339
bool result= true;
152
340
uint32_t message_type= 0;
154
/* Read in the length of the command */
155
while (result == true &&
156
coded_input->ReadLittleEndian32(&message_type) == true &&
157
coded_input->ReadLittleEndian32(&length) == true)
342
while (result == true)
345
* Odd thing to note about using CodedInputStream: This class wasn't
346
* intended to read large amounts of GPB messages. It has an upper
347
* limit on the number of bytes it will read (see Protobuf docs for
348
* this class for more info). A warning will be produced as you
349
* get close to this limit. Since this is a pretty lightweight class,
350
* we should be able to simply create a new one for each message we
353
protobuf::io::CodedInputStream coded_input(raw_input);
355
/* Read in the type and length of the command */
356
if (not coded_input.ReadLittleEndian32(&message_type) ||
357
not coded_input.ReadLittleEndian32(&length))
159
362
if (message_type != ReplicationServices::TRANSACTION)
161
fprintf(stderr, _("Found a non-transaction message in log. Currently, not supported.\n"));
364
cerr << _("Found a non-transaction message in log. Currently, not supported.\n");
165
368
if (length > INT_MAX)
167
fprintf(stderr, _("Attempted to read record bigger than INT_MAX\n"));
370
cerr << _("Attempted to read record bigger than INT_MAX\n");
171
374
if (buffer == NULL)
174
377
* First time around...just malloc the length. This block gets rid
175
378
* of a GCC warning about uninitialized temp_buffer.
185
388
if (temp_buffer == NULL)
187
fprintf(stderr, _("Memory allocation failure trying to allocate %" PRIu64 " bytes.\n"),
188
static_cast<uint64_t>(length));
390
cerr << _("Memory allocation failure trying to allocate ") << length << _(" bytes\n");
192
394
buffer= temp_buffer;
194
396
/* Read the Command */
195
result= coded_input->ReadRaw(buffer, (int) length);
397
result= coded_input.ReadRaw(buffer, (int) length);
196
398
if (result == false)
198
fprintf(stderr, _("Could not read transaction message.\n"));
199
fprintf(stderr, _("GPB ERROR: %s.\n"), strerror(errno));
400
char errmsg[STRERROR_MAX];
401
strerror_r(errno, errmsg, sizeof(errmsg));
402
cerr << _("Could not read transaction message.\n");
403
cerr << _("GPB ERROR: ") << errmsg << endl;;
201
405
hexdump.reserve(length * 4);
202
406
bytesToHexdumpFormat(hexdump, reinterpret_cast<const unsigned char *>(buffer), length);
203
fprintf(stderr, _("HEXDUMP:\n\n%s\n"), hexdump.c_str());
407
cerr << _("HEXDUMP:\n\n") << hexdump << endl;
207
411
result= transaction.ParseFromArray(buffer, static_cast<int32_t>(length));
208
412
if (result == false)
210
fprintf(stderr, _("Unable to parse command. Got error: %s.\n"), transaction.InitializationErrorString().c_str());
414
cerr << _("Unable to parse command. Got error: ")
415
<< transaction.InitializationErrorString() << endl;
211
416
if (buffer != NULL)
214
419
hexdump.reserve(length * 4);
215
420
bytesToHexdumpFormat(hexdump, reinterpret_cast<const unsigned char *>(buffer), length);
216
fprintf(stderr, _("HEXDUMP:\n\n%s\n"), hexdump.c_str());
421
cerr << _("HEXDUMP:\n\n") << hexdump << endl;
221
/* Print the transaction */
222
printTransaction(transaction);
426
const message::TransactionContext trx= transaction.transaction_context();
427
uint64_t transaction_id= trx.transaction_id();
430
* If we are given a transaction ID, we only look for that one and
433
if (vm.count("transaction-id"))
435
if (opt_transaction_id == transaction_id)
437
printTransaction(transaction, ignore_events, print_as_raw);
441
/* Need to get the checksum bytes out of stream */
442
coded_input.ReadLittleEndian32(&checksum);
443
previous_length = length;
449
* No transaction ID given, so process all messages.
453
if (not isEndTransaction(transaction))
455
trx_mgr.store(transaction);
460
* If there are any previous Transaction messages for this transaction,
461
* store this one, then output all of them together.
463
if (trx_mgr.contains(transaction_id))
465
trx_mgr.store(transaction);
467
uint32_t size= trx_mgr.getTransactionBufferSize(transaction_id);
472
message::Transaction new_trx;
473
trx_mgr.getTransactionMessage(new_trx, transaction_id, idx);
474
printTransaction(new_trx, ignore_events, print_as_raw);
478
/* No longer need this transaction */
479
trx_mgr.remove(transaction_id);
483
printTransaction(transaction, ignore_events, print_as_raw);
486
} /* end ! vm.count("transaction-id") */
224
488
/* Skip 4 byte checksum */
225
coded_input->ReadLittleEndian32(&checksum);
489
coded_input.ReadLittleEndian32(&checksum);
229
493
if (checksum != drizzled::algorithm::crc32(buffer, static_cast<size_t>(length)))
231
fprintf(stderr, _("Checksum failed. Wanted %" PRIu32 " got %" PRIu32 "\n"), checksum, drizzled::algorithm::crc32(buffer, static_cast<size_t>(length)));
495
cerr << _("Checksum failed. Wanted ")
498
<< drizzled::algorithm::crc32(buffer, static_cast<size_t>(length))
235
503
previous_length= length;
241
509
delete raw_input;
243
511
return (result == true ? 0 : 1);