~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/message/transaction_reader.cc

  • Committer: Lee Bieber
  • Date: 2010-10-27 02:00:05 UTC
  • mfrom: (1882.1.2 build)
  • Revision ID: kalebral@gmail.com-20101027020005-jqiq89je9lhpidux
Merge Shrews - add options to the transaction_reader utility program
Merge Shrews - fix bug 660779: Transaction log retaining partial information from an aborted transaction

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
 */
23
23
 
24
24
#include "config.h"
25
 
#include <drizzled/definitions.h>
26
 
#include <drizzled/gettext.h>
27
 
#include <drizzled/replication_services.h>
28
 
#include <drizzled/algorithm/crc32.h>
29
25
#include <sys/types.h>
30
26
#include <sys/stat.h>
31
27
#include <fcntl.h>
32
28
#include <limits.h>
33
 
#include <cstdio>
34
29
#include <cerrno>
35
30
#include <iostream>
36
31
#include <string>
37
32
#include <algorithm>
38
33
#include <vector>
39
34
#include <unistd.h>
40
 
#include <drizzled/message/transaction.pb.h>
41
 
#include <drizzled/message/statement_transform.h>
42
 
#include <drizzled/message/transaction_manager.h>
43
 
#include <drizzled/util/convert.h>
 
35
#include "drizzled/definitions.h"
 
36
#include "drizzled/gettext.h"
 
37
#include "drizzled/replication_services.h"
 
38
#include "drizzled/algorithm/crc32.h"
 
39
#include "drizzled/message/transaction.pb.h"
 
40
#include "drizzled/message/statement_transform.h"
 
41
#include "drizzled/message/transaction_manager.h"
 
42
#include "drizzled/util/convert.h"
44
43
 
45
44
#include <google/protobuf/io/coded_stream.h>
46
45
#include <google/protobuf/io/zero_copy_stream_impl.h>
175
174
}
176
175
 
177
176
static void printTransaction(const message::Transaction &transaction,
178
 
                             bool ignore_events)
 
177
                             bool ignore_events,
 
178
                             bool print_as_raw)
179
179
{
180
180
  static uint64_t last_trx_id= 0;
181
181
  bool should_commit= true;
188
188
  {
189
189
    last_trx_id= trx.transaction_id();
190
190
    if (not ignore_events)
191
 
      printEvent(transaction.event());
 
191
    {
 
192
      if (print_as_raw)
 
193
        transaction.PrintDebugString();
 
194
      else
 
195
        printEvent(transaction.event());
 
196
    }
 
197
    return;
 
198
  }
 
199
 
 
200
  if (print_as_raw)
 
201
  {
 
202
    transaction.PrintDebugString();
192
203
    return;
193
204
  }
194
205
 
238
249
int main(int argc, char* argv[])
239
250
{
240
251
  GOOGLE_PROTOBUF_VERIFY_VERSION;
241
 
  int file;
 
252
  int opt_start_pos= 0;
 
253
  uint64_t opt_transaction_id= 0;
242
254
 
243
255
  /*
244
256
   * Setup program options
248
260
    ("help", N_("Display help and exit"))
249
261
    ("checksum", N_("Perform checksum"))
250
262
    ("ignore-events", N_("Ignore event messages"))
251
 
    ("input-file", po::value< vector<string> >(), N_("Transaction log file"));
 
263
    ("input-file", po::value< vector<string> >(), N_("Transaction log file"))
 
264
    ("raw", N_("Print raw Protobuf messages instead of SQL"))
 
265
    ("start-pos",
 
266
      po::value<int>(&opt_start_pos),
 
267
      N_("Start reading from the given file position"))
 
268
    ("transaction-id",
 
269
      po::value<uint64_t>(&opt_transaction_id),
 
270
      N_("Only output for the given transaction ID"));
252
271
 
253
272
  /*
254
273
   * We allow one positional argument that will be transaction file name
270
289
   */
271
290
  if (vm.count("help") || not vm.count("input-file"))
272
291
  {
273
 
    fprintf(stderr, _("Usage: %s [options] TRANSACTION_LOG \n"),
274
 
            argv[0]);
275
 
    fprintf(stderr, _("OPTIONS:\n"));
276
 
    fprintf(stderr, _("--help           :  Display help and exit\n"));
277
 
    fprintf(stderr, _("--checksum       :  Perform checksum\n"));
278
 
    fprintf(stderr, _("--ignore-events  :  Ignore event messages\n"));
 
292
    cerr << desc << endl;
 
293
    return -1;
 
294
  }
 
295
 
 
296
  /*
 
297
   * Specifying both a transaction ID and a start position
 
298
   * is not logical.
 
299
   */
 
300
  if (vm.count("start-pos") && vm.count("transaction-id"))
 
301
  {
 
302
    cerr << _("Cannot use --start-pos and --transaction-id together\n");
279
303
    return -1;
280
304
  }
281
305
 
282
306
  bool do_checksum= vm.count("checksum") ? true : false;
283
307
  bool ignore_events= vm.count("ignore-events") ? true : false;
 
308
  bool print_as_raw= vm.count("raw") ? true : false;
284
309
 
285
310
  string filename= vm["input-file"].as< vector<string> >()[0];
286
 
  file= open(filename.c_str(), O_RDONLY);
 
311
  int file= open(filename.c_str(), O_RDONLY);
287
312
  if (file == -1)
288
313
  {
289
 
    fprintf(stderr, _("Cannot open file: %s\n"), filename.c_str());
 
314
    cerr << _("Cannot open file: ") << filename << endl;
290
315
    return -1;
291
316
  }
292
317
 
296
321
  protobuf::io::ZeroCopyInputStream *raw_input= new protobuf::io::FileInputStream(file);
297
322
  protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(raw_input);
298
323
 
 
324
  /* Skip ahead to user supplied position */
 
325
  if (opt_start_pos)
 
326
  {
 
327
    if (not coded_input->Skip(opt_start_pos))
 
328
    {
 
329
      cerr << _("Could not skip to position ") << opt_start_pos
 
330
           << _(" in file ") << filename << endl;
 
331
      exit(-1);
 
332
    }
 
333
  }
 
334
 
299
335
  char *buffer= NULL;
300
336
  char *temp_buffer= NULL;
301
337
  uint32_t length= 0;
311
347
  {
312
348
    if (message_type != ReplicationServices::TRANSACTION)
313
349
    {
314
 
      fprintf(stderr, _("Found a non-transaction message in log.  Currently, not supported.\n"));
315
 
      exit(1);
 
350
      cerr << _("Found a non-transaction message in log.  Currently, not supported.\n");
 
351
      exit(-1);
316
352
    }
317
353
 
318
354
    if (length > INT_MAX)
319
355
    {
320
 
      fprintf(stderr, _("Attempted to read record bigger than INT_MAX\n"));
321
 
      exit(1);
 
356
      cerr << _("Attempted to read record bigger than INT_MAX\n");
 
357
      exit(-1);
322
358
    }
323
359
 
324
360
    if (buffer == NULL)
337
373
 
338
374
    if (temp_buffer == NULL)
339
375
    {
340
 
      fprintf(stderr, _("Memory allocation failure trying to allocate %" PRIu64 " bytes.\n"),
341
 
              static_cast<uint64_t>(length));
 
376
      cerr << _("Memory allocation failure trying to allocate ") << length << _(" bytes\n");
342
377
      break;
343
378
    }
344
379
    else
350
385
    {
351
386
      char errmsg[STRERROR_MAX];
352
387
      strerror_r(errno, errmsg, sizeof(errmsg));
353
 
      fprintf(stderr, _("Could not read transaction message.\n"));
354
 
      fprintf(stderr, _("GPB ERROR: %s.\n"), errmsg);
 
388
      cerr << _("Could not read transaction message.\n");
 
389
      cerr << _("GPB ERROR: ") << errmsg << endl;;
355
390
      string hexdump;
356
391
      hexdump.reserve(length * 4);
357
392
      bytesToHexdumpFormat(hexdump, reinterpret_cast<const unsigned char *>(buffer), length);
358
 
      fprintf(stderr, _("HEXDUMP:\n\n%s\n"), hexdump.c_str());
 
393
      cerr << _("HEXDUMP:\n\n") << hexdump << endl;
359
394
      break;
360
395
    }
361
396
 
362
397
    result= transaction.ParseFromArray(buffer, static_cast<int32_t>(length));
363
398
    if (result == false)
364
399
    {
365
 
      fprintf(stderr, _("Unable to parse command. Got error: %s.\n"), transaction.InitializationErrorString().c_str());
 
400
      cerr << _("Unable to parse command. Got error: ")
 
401
           << transaction.InitializationErrorString() << endl;
366
402
      if (buffer != NULL)
367
403
      {
368
404
        string hexdump;
369
405
        hexdump.reserve(length * 4);
370
406
        bytesToHexdumpFormat(hexdump, reinterpret_cast<const unsigned char *>(buffer), length);
371
 
        fprintf(stderr, _("HEXDUMP:\n\n%s\n"), hexdump.c_str());
 
407
        cerr <<  _("HEXDUMP:\n\n") << hexdump << endl;
372
408
      }
373
409
      break;
374
410
    }
375
411
 
376
 
    if (not isEndTransaction(transaction))
 
412
    const message::TransactionContext trx= transaction.transaction_context();
 
413
    uint64_t transaction_id= trx.transaction_id();
 
414
 
 
415
    /*
 
416
     * If we are given a transaction ID, we only look for that one and
 
417
     * print it out.
 
418
     */
 
419
    if (vm.count("transaction-id"))
377
420
    {
378
 
      trx_mgr.store(transaction);
 
421
      if (opt_transaction_id == transaction_id)
 
422
      {
 
423
        printTransaction(transaction, ignore_events, print_as_raw);
 
424
      }
 
425
      else
 
426
      {
 
427
        /* Need to get the checksum bytes out of stream */
 
428
        coded_input->ReadLittleEndian32(&checksum);
 
429
        previous_length = length;
 
430
        continue;
 
431
      }
379
432
    }
 
433
 
 
434
    /*
 
435
     * No transaction ID given, so process all messages.
 
436
     */
380
437
    else
381
438
    {
382
 
      const message::TransactionContext trx= transaction.transaction_context();
383
 
      uint64_t transaction_id= trx.transaction_id();
384
 
 
385
 
      /*
386
 
       * If there are any previous Transaction messages for this transaction,
387
 
       * store this one, then output all of them together.
388
 
       */
389
 
      if (trx_mgr.contains(transaction_id))
 
439
      if (not isEndTransaction(transaction))
390
440
      {
391
441
        trx_mgr.store(transaction);
392
 
 
393
 
        uint32_t size= trx_mgr.getTransactionBufferSize(transaction_id);
394
 
        uint32_t idx= 0;
395
 
 
396
 
        while (idx != size)
397
 
        {
398
 
          message::Transaction new_trx;
399
 
          trx_mgr.getTransactionMessage(new_trx, transaction_id, idx);
400
 
          printTransaction(new_trx, ignore_events);
401
 
          idx++;
402
 
        }
403
 
 
404
 
        /* No longer need this transaction */
405
 
        trx_mgr.remove(transaction_id);
406
442
      }
407
443
      else
408
444
      {
409
 
        printTransaction(transaction, ignore_events);
 
445
        /*
 
446
         * If there are any previous Transaction messages for this transaction,
 
447
         * store this one, then output all of them together.
 
448
         */
 
449
        if (trx_mgr.contains(transaction_id))
 
450
        {
 
451
          trx_mgr.store(transaction);
 
452
 
 
453
          uint32_t size= trx_mgr.getTransactionBufferSize(transaction_id);
 
454
          uint32_t idx= 0;
 
455
 
 
456
          while (idx != size)
 
457
          {
 
458
            message::Transaction new_trx;
 
459
            trx_mgr.getTransactionMessage(new_trx, transaction_id, idx);
 
460
            printTransaction(new_trx, ignore_events, print_as_raw);
 
461
            idx++;
 
462
          }
 
463
 
 
464
          /* No longer need this transaction */
 
465
          trx_mgr.remove(transaction_id);
 
466
        }
 
467
        else
 
468
        {
 
469
          printTransaction(transaction, ignore_events, print_as_raw);
 
470
        }
410
471
      }
411
 
    }
 
472
    } /* end ! vm.count("transaction-id") */
412
473
 
413
474
    /* Skip 4 byte checksum */
414
475
    coded_input->ReadLittleEndian32(&checksum);
417
478
    {
418
479
      if (checksum != drizzled::algorithm::crc32(buffer, static_cast<size_t>(length)))
419
480
      {
420
 
        fprintf(stderr, _("Checksum failed. Wanted %" PRIu32 " got %" PRIu32 "\n"), checksum, drizzled::algorithm::crc32(buffer, static_cast<size_t>(length)));
 
481
        cerr << _("Checksum failed. Wanted ")
 
482
             << checksum
 
483
             << _(" got ")
 
484
             << drizzled::algorithm::crc32(buffer, static_cast<size_t>(length))
 
485
             << endl;
421
486
      }
422
487
    }
423
488