~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to drizzled/message/transaction_reader.cc

  • Committer: Andrew Hutchings
  • Date: 2010-11-01 22:14:18 UTC
  • mto: This revision was merged to the branch mainline in revision 1907.
  • Revision ID: andrew@linuxjedi.co.uk-20101101221418-9n9gmm4ms7fl8vo5
Fix copyright

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
 */
23
23
 
24
24
#include "config.h"
25
 
#include <drizzled/gettext.h>
26
 
#include <drizzled/replication_services.h>
27
 
#include <drizzled/algorithm/crc32.h>
28
25
#include <sys/types.h>
29
26
#include <sys/stat.h>
30
27
#include <fcntl.h>
35
32
#include <algorithm>
36
33
#include <vector>
37
34
#include <unistd.h>
38
 
#include <drizzled/message/transaction.pb.h>
39
 
#include <drizzled/message/statement_transform.h>
40
 
#include <drizzled/util/convert.h>
 
35
#include "drizzled/definitions.h"
 
36
#include "drizzled/gettext.h"
 
37
#include "drizzled/replication_services.h"
 
38
#include "drizzled/algorithm/crc32.h"
 
39
#include "drizzled/message/transaction.pb.h"
 
40
#include "drizzled/message/statement_transform.h"
 
41
#include "drizzled/message/transaction_manager.h"
 
42
#include "drizzled/util/convert.h"
41
43
 
42
44
#include <google/protobuf/io/coded_stream.h>
43
45
#include <google/protobuf/io/zero_copy_stream_impl.h>
44
46
 
 
47
#include <boost/program_options.hpp>
 
48
 
45
49
using namespace std;
46
50
using namespace google;
47
51
using namespace drizzled;
48
52
 
 
53
namespace po= boost::program_options;
 
54
 
49
55
static const char *replace_with_spaces= "\n\r";
50
56
 
51
57
static void printStatement(const message::Statement &statement)
63
69
  {
64
70
    string &sql= *sql_string_iter;
65
71
 
66
 
    /* 
67
 
     * Replace \n and \r with spaces so that SQL statements 
68
 
     * are always on a single line 
 
72
    /*
 
73
     * Replace \n and \r with spaces so that SQL statements
 
74
     * are always on a single line
69
75
     */
70
76
    {
71
77
      string::size_type found= sql.find_first_of(replace_with_spaces);
93
99
  }
94
100
}
95
101
 
96
 
static void printTransaction(const message::Transaction &transaction)
97
 
{
98
 
  const message::TransactionContext trx= transaction.transaction_context();
 
102
static bool isEndStatement(const message::Statement &statement)
 
103
{
 
104
  switch (statement.type())
 
105
  {
 
106
    case (message::Statement::INSERT):
 
107
    {
 
108
      const message::InsertData &data= statement.insert_data();
 
109
      if (not data.end_segment())
 
110
        return false;
 
111
      break;
 
112
    }
 
113
    case (message::Statement::UPDATE):
 
114
    {
 
115
      const message::UpdateData &data= statement.update_data();
 
116
      if (not data.end_segment())
 
117
        return false;
 
118
      break;
 
119
    }
 
120
    case (message::Statement::DELETE):
 
121
    {
 
122
      const message::DeleteData &data= statement.delete_data();
 
123
      if (not data.end_segment())
 
124
        return false;
 
125
      break;
 
126
    }
 
127
    default:
 
128
      return true;
 
129
  }
 
130
  return true;
 
131
}
 
132
 
 
133
static bool isEndTransaction(const message::Transaction &transaction)
 
134
{
 
135
  const message::TransactionContext trx= transaction.transaction_context();
 
136
 
 
137
  size_t num_statements= transaction.statement_size();
 
138
 
 
139
  /*
 
140
   * If any Statement is partial, then we can expect another Transaction
 
141
   * message.
 
142
   */
 
143
  for (size_t x= 0; x < num_statements; ++x)
 
144
  {
 
145
    const message::Statement &statement= transaction.statement(x);
 
146
 
 
147
    if (not isEndStatement(statement))
 
148
      return false;
 
149
  }
 
150
 
 
151
  return true;
 
152
}
 
153
 
 
154
static void printEvent(const message::Event &event)
 
155
{
 
156
  switch (event.type())
 
157
  {
 
158
    case message::Event::STARTUP:
 
159
    {
 
160
      cout << "-- EVENT: Server startup\n";
 
161
      break;
 
162
    }
 
163
    case message::Event::SHUTDOWN:
 
164
    {
 
165
      cout << "-- EVENT: Server shutdown\n";
 
166
      break;
 
167
    }
 
168
    default:
 
169
    {
 
170
      cout << "-- EVENT: Unknown event\n";
 
171
      break;
 
172
    }
 
173
  }
 
174
}
 
175
 
 
176
static void printTransaction(const message::Transaction &transaction,
 
177
                             bool ignore_events,
 
178
                             bool print_as_raw)
 
179
{
 
180
  static uint64_t last_trx_id= 0;
 
181
  bool should_commit= true;
 
182
  const message::TransactionContext trx= transaction.transaction_context();
 
183
 
 
184
  /*
 
185
   * First check to see if this is an event message.
 
186
   */
 
187
  if (transaction.has_event())
 
188
  {
 
189
    last_trx_id= trx.transaction_id();
 
190
    if (not ignore_events)
 
191
    {
 
192
      if (print_as_raw)
 
193
        transaction.PrintDebugString();
 
194
      else
 
195
        printEvent(transaction.event());
 
196
    }
 
197
    return;
 
198
  }
 
199
 
 
200
  if (print_as_raw)
 
201
  {
 
202
    transaction.PrintDebugString();
 
203
    return;
 
204
  }
99
205
 
100
206
  size_t num_statements= transaction.statement_size();
101
207
  size_t x;
102
208
 
103
 
  cout << "START TRANSACTION;" << endl;
 
209
  /*
 
210
   * One way to determine when a new transaction begins is when the
 
211
   * transaction id changes (if all transactions have their GPB messages
 
212
   * grouped together, which this program will). We check that here.
 
213
   */
 
214
  if (trx.transaction_id() != last_trx_id)
 
215
    cout << "START TRANSACTION;" << endl;
 
216
 
 
217
  last_trx_id= trx.transaction_id();
 
218
 
104
219
  for (x= 0; x < num_statements; ++x)
105
220
  {
106
221
    const message::Statement &statement= transaction.statement(x);
 
222
 
 
223
    if (should_commit)
 
224
      should_commit= isEndStatement(statement);
 
225
 
 
226
    /* A ROLLBACK would be the only Statement within the Transaction
 
227
     * since all other Statements will have been deleted from the
 
228
     * Transaction message, so we should fall out of this loop immediately.
 
229
     * We don't want to issue an unnecessary COMMIT, so we change
 
230
     * should_commit to false here.
 
231
     */
 
232
    if (statement.type() == message::Statement::ROLLBACK)
 
233
      should_commit= false;
 
234
 
107
235
    printStatement(statement);
108
236
  }
109
 
  cout << "COMMIT;" << endl;
 
237
 
 
238
  /*
 
239
   * If ALL Statements are end segments, we can commit this Transaction.
 
240
   * We can also check to see if the transaction_id changed, but this
 
241
   * wouldn't work for the last Transaction in the transaction log since
 
242
   * we don't have another Transaction to compare to. Checking for all
 
243
   * end segments (like we do above) covers this case.
 
244
   */
 
245
  if (should_commit)
 
246
    cout << "COMMIT;" << endl;
110
247
}
111
248
 
112
249
int main(int argc, char* argv[])
113
250
{
114
251
  GOOGLE_PROTOBUF_VERIFY_VERSION;
115
 
  int file;
116
 
 
117
 
  if (argc < 2 || argc > 3)
118
 
  {
119
 
    fprintf(stderr, _("Usage: %s TRANSACTION_LOG [--checksum] \n"), argv[0]);
 
252
  int opt_start_pos= 0;
 
253
  uint64_t opt_transaction_id= 0;
 
254
 
 
255
  /*
 
256
   * Setup program options
 
257
   */
 
258
  po::options_description desc("Program options");
 
259
  desc.add_options()
 
260
    ("help", N_("Display help and exit"))
 
261
    ("checksum", N_("Perform checksum"))
 
262
    ("ignore-events", N_("Ignore event messages"))
 
263
    ("input-file", po::value< vector<string> >(), N_("Transaction log file"))
 
264
    ("raw", N_("Print raw Protobuf messages instead of SQL"))
 
265
    ("start-pos",
 
266
      po::value<int>(&opt_start_pos),
 
267
      N_("Start reading from the given file position"))
 
268
    ("transaction-id",
 
269
      po::value<uint64_t>(&opt_transaction_id),
 
270
      N_("Only output for the given transaction ID"));
 
271
 
 
272
  /*
 
273
   * We allow one positional argument that will be transaction file name
 
274
   */
 
275
  po::positional_options_description pos;
 
276
  pos.add("input-file", 1);
 
277
 
 
278
  /*
 
279
   * Parse the program options
 
280
   */
 
281
  po::variables_map vm;
 
282
  po::store(po::command_line_parser(argc, argv).
 
283
            options(desc).positional(pos).run(), vm);
 
284
  po::notify(vm);
 
285
 
 
286
  /*
 
287
   * If the help option was given, or not input file was supplied,
 
288
   * print out usage information.
 
289
   */
 
290
  if (vm.count("help") || not vm.count("input-file"))
 
291
  {
 
292
    cerr << desc << endl;
 
293
    return -1;
 
294
  }
 
295
 
 
296
  /*
 
297
   * Specifying both a transaction ID and a start position
 
298
   * is not logical.
 
299
   */
 
300
  if (vm.count("start-pos") && vm.count("transaction-id"))
 
301
  {
 
302
    cerr << _("Cannot use --start-pos and --transaction-id together\n");
 
303
    return -1;
 
304
  }
 
305
 
 
306
  bool do_checksum= vm.count("checksum") ? true : false;
 
307
  bool ignore_events= vm.count("ignore-events") ? true : false;
 
308
  bool print_as_raw= vm.count("raw") ? true : false;
 
309
 
 
310
  string filename= vm["input-file"].as< vector<string> >()[0];
 
311
  int file= open(filename.c_str(), O_RDONLY);
 
312
  if (file == -1)
 
313
  {
 
314
    cerr << _("Cannot open file: ") << filename << endl;
120
315
    return -1;
121
316
  }
122
317
 
123
318
  message::Transaction transaction;
124
 
 
125
 
  file= open(argv[1], O_RDONLY);
126
 
  if (file == -1)
127
 
  {
128
 
    fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
129
 
    return -1;
130
 
  }
131
 
 
132
 
  bool do_checksum= false;
133
 
 
134
 
  if (argc == 3)
135
 
  {
136
 
    string checksum_arg(argv[2]);
137
 
    transform(checksum_arg.begin(), checksum_arg.end(), checksum_arg.begin(), ::tolower);
138
 
 
139
 
    if ("--checksum" == checksum_arg)
140
 
      do_checksum= true;
141
 
  }
 
319
  message::TransactionManager trx_mgr;
142
320
 
143
321
  protobuf::io::ZeroCopyInputStream *raw_input= new protobuf::io::FileInputStream(file);
144
 
  protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(raw_input);
 
322
 
 
323
  /* Skip ahead to user supplied position */
 
324
  if (opt_start_pos)
 
325
  {
 
326
    if (not raw_input->Skip(opt_start_pos))
 
327
    {
 
328
      cerr << _("Could not skip to position ") << opt_start_pos
 
329
           << _(" in file ") << filename << endl;
 
330
      exit(-1);
 
331
    }
 
332
  }
145
333
 
146
334
  char *buffer= NULL;
147
335
  char *temp_buffer= NULL;
151
339
  bool result= true;
152
340
  uint32_t message_type= 0;
153
341
 
154
 
  /* Read in the length of the command */
155
 
  while (result == true && 
156
 
         coded_input->ReadLittleEndian32(&message_type) == true &&
157
 
         coded_input->ReadLittleEndian32(&length) == true)
 
342
  while (result == true)
158
343
  {
 
344
   /*
 
345
     * Odd thing to note about using CodedInputStream: This class wasn't
 
346
     * intended to read large amounts of GPB messages. It has an upper
 
347
     * limit on the number of bytes it will read (see Protobuf docs for
 
348
     * this class for more info). A warning will be produced as you
 
349
     * get close to this limit. Since this is a pretty lightweight class,
 
350
     * we should be able to simply create a new one for each message we
 
351
     * want to read.
 
352
     */
 
353
    protobuf::io::CodedInputStream coded_input(raw_input);
 
354
 
 
355
    /* Read in the type and length of the command */
 
356
    if (not coded_input.ReadLittleEndian32(&message_type) ||
 
357
        not coded_input.ReadLittleEndian32(&length))
 
358
    {
 
359
      break;  /* EOF */
 
360
    }
 
361
 
159
362
    if (message_type != ReplicationServices::TRANSACTION)
160
363
    {
161
 
      fprintf(stderr, _("Found a non-transaction message in log.  Currently, not supported.\n"));
162
 
      exit(1);
 
364
      cerr << _("Found a non-transaction message in log.  Currently, not supported.\n");
 
365
      exit(-1);
163
366
    }
164
367
 
165
368
    if (length > INT_MAX)
166
369
    {
167
 
      fprintf(stderr, _("Attempted to read record bigger than INT_MAX\n"));
168
 
      exit(1);
 
370
      cerr << _("Attempted to read record bigger than INT_MAX\n");
 
371
      exit(-1);
169
372
    }
170
373
 
171
374
    if (buffer == NULL)
172
375
    {
173
 
      /* 
 
376
      /*
174
377
       * First time around...just malloc the length.  This block gets rid
175
378
       * of a GCC warning about uninitialized temp_buffer.
176
379
       */
184
387
 
185
388
    if (temp_buffer == NULL)
186
389
    {
187
 
      fprintf(stderr, _("Memory allocation failure trying to allocate %" PRIu64 " bytes.\n"),
188
 
              static_cast<uint64_t>(length));
 
390
      cerr << _("Memory allocation failure trying to allocate ") << length << _(" bytes\n");
189
391
      break;
190
392
    }
191
393
    else
192
394
      buffer= temp_buffer;
193
395
 
194
396
    /* Read the Command */
195
 
    result= coded_input->ReadRaw(buffer, (int) length);
 
397
    result= coded_input.ReadRaw(buffer, (int) length);
196
398
    if (result == false)
197
399
    {
198
 
      fprintf(stderr, _("Could not read transaction message.\n"));
199
 
      fprintf(stderr, _("GPB ERROR: %s.\n"), strerror(errno));
 
400
      char errmsg[STRERROR_MAX];
 
401
      strerror_r(errno, errmsg, sizeof(errmsg));
 
402
      cerr << _("Could not read transaction message.\n");
 
403
      cerr << _("GPB ERROR: ") << errmsg << endl;;
200
404
      string hexdump;
201
405
      hexdump.reserve(length * 4);
202
406
      bytesToHexdumpFormat(hexdump, reinterpret_cast<const unsigned char *>(buffer), length);
203
 
      fprintf(stderr, _("HEXDUMP:\n\n%s\n"), hexdump.c_str());
 
407
      cerr << _("HEXDUMP:\n\n") << hexdump << endl;
204
408
      break;
205
409
    }
206
410
 
207
411
    result= transaction.ParseFromArray(buffer, static_cast<int32_t>(length));
208
412
    if (result == false)
209
413
    {
210
 
      fprintf(stderr, _("Unable to parse command. Got error: %s.\n"), transaction.InitializationErrorString().c_str());
 
414
      cerr << _("Unable to parse command. Got error: ")
 
415
           << transaction.InitializationErrorString() << endl;
211
416
      if (buffer != NULL)
212
417
      {
213
418
        string hexdump;
214
419
        hexdump.reserve(length * 4);
215
420
        bytesToHexdumpFormat(hexdump, reinterpret_cast<const unsigned char *>(buffer), length);
216
 
        fprintf(stderr, _("HEXDUMP:\n\n%s\n"), hexdump.c_str());
 
421
        cerr <<  _("HEXDUMP:\n\n") << hexdump << endl;
217
422
      }
218
423
      break;
219
424
    }
220
425
 
221
 
    /* Print the transaction */
222
 
    printTransaction(transaction);
 
426
    const message::TransactionContext trx= transaction.transaction_context();
 
427
    uint64_t transaction_id= trx.transaction_id();
 
428
 
 
429
    /*
 
430
     * If we are given a transaction ID, we only look for that one and
 
431
     * print it out.
 
432
     */
 
433
    if (vm.count("transaction-id"))
 
434
    {
 
435
      if (opt_transaction_id == transaction_id)
 
436
      {
 
437
        printTransaction(transaction, ignore_events, print_as_raw);
 
438
      }
 
439
      else
 
440
      {
 
441
        /* Need to get the checksum bytes out of stream */
 
442
        coded_input.ReadLittleEndian32(&checksum);
 
443
        previous_length = length;
 
444
        continue;
 
445
      }
 
446
    }
 
447
 
 
448
    /*
 
449
     * No transaction ID given, so process all messages.
 
450
     */
 
451
    else
 
452
    {
 
453
      if (not isEndTransaction(transaction))
 
454
      {
 
455
        trx_mgr.store(transaction);
 
456
      }
 
457
      else
 
458
      {
 
459
        /*
 
460
         * If there are any previous Transaction messages for this transaction,
 
461
         * store this one, then output all of them together.
 
462
         */
 
463
        if (trx_mgr.contains(transaction_id))
 
464
        {
 
465
          trx_mgr.store(transaction);
 
466
 
 
467
          uint32_t size= trx_mgr.getTransactionBufferSize(transaction_id);
 
468
          uint32_t idx= 0;
 
469
 
 
470
          while (idx != size)
 
471
          {
 
472
            message::Transaction new_trx;
 
473
            trx_mgr.getTransactionMessage(new_trx, transaction_id, idx);
 
474
            printTransaction(new_trx, ignore_events, print_as_raw);
 
475
            idx++;
 
476
          }
 
477
 
 
478
          /* No longer need this transaction */
 
479
          trx_mgr.remove(transaction_id);
 
480
        }
 
481
        else
 
482
        {
 
483
          printTransaction(transaction, ignore_events, print_as_raw);
 
484
        }
 
485
      }
 
486
    } /* end ! vm.count("transaction-id") */
223
487
 
224
488
    /* Skip 4 byte checksum */
225
 
    coded_input->ReadLittleEndian32(&checksum);
 
489
    coded_input.ReadLittleEndian32(&checksum);
226
490
 
227
491
    if (do_checksum)
228
492
    {
229
493
      if (checksum != drizzled::algorithm::crc32(buffer, static_cast<size_t>(length)))
230
494
      {
231
 
        fprintf(stderr, _("Checksum failed. Wanted %" PRIu32 " got %" PRIu32 "\n"), checksum, drizzled::algorithm::crc32(buffer, static_cast<size_t>(length)));
 
495
        cerr << _("Checksum failed. Wanted ")
 
496
             << checksum
 
497
             << _(" got ")
 
498
             << drizzled::algorithm::crc32(buffer, static_cast<size_t>(length))
 
499
             << endl;
232
500
      }
233
501
    }
234
502
 
235
503
    previous_length= length;
236
 
  }
 
504
  } /* end while */
 
505
 
237
506
  if (buffer)
238
507
    free(buffer);
239
 
  
240
 
  delete coded_input;
 
508
 
241
509
  delete raw_input;
242
510
 
243
511
  return (result == true ? 0 : 1);