~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/transaction_log/transaction_log_reader.cc

  • Committer: Jay Pipes
  • Date: 2009-10-07 23:59:47 UTC
  • mto: (1234.1.1 push) (1237.2.10 push)
  • mto: This revision was merged to the branch mainline in revision 1193.
  • Revision ID: jpipes@serialcoder-20091007235947-18simrecnzwv8t1q
Phase 2 new replication work:

* Removes old replication.proto file, old command_transform library
* Removes use of korr.h macro calls in favor of GPB's CodedOutputStream
  API.
* Updates transaction_log, default_replicator, and filtered_replicator module
  to use new Transaction message.
* Updates ReplicationServices to construct the new Transaction messages and
  associated Statement sub-messages
* Corrects transaction boundaries.  AUTOCOMMIT now works properly, and I have
  added a new test case to verify AUTOCOMMIT variable modification of the way
  in which Transaction messages are bundled up and sent across to replicators.

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
/**
26
26
 * @file
27
27
 *
28
 
 * Implementation of a command reader for the command log.
 
28
 * Implementation of a transaction reader for the transaction log.
29
29
 *
30
30
 * @details
31
31
 *
32
32
 * This is currently an extremely simple implementation which
33
 
 * reads through the command log file one message at a time, using the
 
33
 * reads through the transaction log file one message at a time, using the
34
34
 * length-coded bytes to skip through the log.  Once it finds the message
35
35
 * which corresponds to the transaction id the caller to read() is looking
36
36
 * for, it copies the message into the supplied pointer and returns true.
41
41
 * the log file(s)
42
42
 */
43
43
 
44
 
#include "command_log_reader.h"
 
44
#include "transaction_log_reader.h"
45
45
 
46
46
#include <drizzled/gettext.h>
47
 
#include <drizzled/message/replication.pb.h>
 
47
#include <drizzled/message/transaction.pb.h>
48
48
 
49
49
#include <google/protobuf/io/zero_copy_stream_impl.h>
50
50
#include <drizzled/hash/crc32.h>
53
53
using namespace drizzled;
54
54
using namespace google;
55
55
 
56
 
bool CommandLogReader::read(const ReplicationServices::GlobalTransactionId &to_read_trx_id, 
57
 
                            message::Command *to_fill)
 
56
bool TransactionLogReader::read(const ReplicationServices::GlobalTransactionId &to_read_trx_id, 
 
57
                            message::Transaction *to_fill)
58
58
{
59
59
  /* 
60
60
   * We ask the log to give us the log file containing the
61
 
   * command message with the needed transaction id, then
 
61
   * transaction message with the needed transaction id, then
62
62
   * we read into the log file to obtain the message, and 
63
 
   * fill the supplied pointer to Command message from the
 
63
   * fill the supplied pointer to Transaction message from the
64
64
   * raw data in the log file.
65
65
   */
66
66
  string log_filename_to_read;
74
74
  else
75
75
  {
76
76
    protobuf::io::FileInputStream *log_file_stream;
77
 
    message::Command tmp_command; /* Used to check trx id... */
 
77
    message::Transaction tmp_transaction; /* Used to check trx id... */
78
78
    string checksum_buffer; /* Buffer we use for buffering serialized messages for checksumming */
79
79
 
80
80
    unsigned char coded_length[8]; /* Length header bytes in network byte order */
81
81
    unsigned char coded_checksum[4]; /* Checksum trailer bytes in network byte order */
82
 
    uint64_t length= 0; /* The length of the command to follow in stream */
 
82
    uint64_t length= 0; /* The length of the transaction to follow in stream */
83
83
    uint32_t checksum= 0; /* The checksum sent in the wire */
84
84
    ssize_t read_bytes; /* Number bytes read during pread() calls */
85
85
 
91
91
    if (log_file == -1)
92
92
    {
93
93
      errmsg_printf(ERRMSG_LVL_ERROR, 
94
 
                    _("Failed to open command log file %s.  Got error: %s\n"),
 
94
                    _("Failed to open transaction log file %s.  Got error: %s\n"),
95
95
                    log_filename_to_read.c_str(),
96
96
                    strerror(errno));
97
97
      return false;
101
101
 
102
102
    while (true)
103
103
    {
104
 
      /* Read in the length of the command */
 
104
      /* Read in the length of the transaction */
105
105
      do
106
106
      {
107
107
        read_bytes= pread(log_file, coded_length, sizeof(uint64_t), current_offset);
119
119
      }
120
120
      if (read_bytes == 0)
121
121
      {
122
 
        /* End of file and did not find the command, so return false */
 
122
        /* End of file and did not find the transaction, so return false */
123
123
        result= false;
124
124
        break;
125
125
      }
127
127
      /* We use korr.h macros when writing and must do the same when reading... */
128
128
      length= uint8korr(coded_length);
129
129
 
130
 
      /* Skip to the start of the next Command */
 
130
      /* Skip to the start of the next Transaction */
131
131
      log_file_stream->Skip(8);
132
132
 
133
 
      if (unlikely(tmp_command.ParseFromZeroCopyStream(log_file_stream) == false))
 
133
      if (unlikely(tmp_transaction.ParseFromZeroCopyStream(log_file_stream) == false))
134
134
      {
135
 
        tmp_command.Clear();
 
135
        tmp_transaction.Clear();
136
136
        errmsg_printf(ERRMSG_LVL_ERROR,
137
 
                      _("Failed to parse command message at offset %" PRId64 ".  Got error: %s\n"), 
 
137
                      _("Failed to parse transaction message at offset %" PRId64 ".  Got error: %s\n"), 
138
138
                      (int64_t) current_offset, 
139
 
                      tmp_command.InitializationErrorString().c_str());
 
139
                      tmp_transaction.InitializationErrorString().c_str());
140
140
        result= false;
141
141
        break;
142
142
      }
143
143
      else
144
144
      {
145
145
        /* Cool, message was read.  Check the trx id */
146
 
        if (tmp_command.transaction_context().transaction_id() == to_read_trx_id)
 
146
        if (tmp_transaction.transaction_context().transaction_id() == to_read_trx_id)
147
147
        {
148
148
          /* Found what we were looking for...copy to the pointer we should fill */
149
 
          to_fill->CopyFrom(tmp_command);
 
149
          to_fill->CopyFrom(tmp_transaction);
150
150
          break;
151
151
        }
152
152
      }
156
156
 
157
157
      /* 
158
158
       * We now read 4 bytes containing the (possible) checksum of the
159
 
       * just-read command message.  If the result is not zero, then a
 
159
       * just-read transaction message.  If the result is not zero, then a
160
160
       * checksum was written...
161
161
       */
162
162
      do
179
179
 
180
180
      if (checksum != 0)
181
181
      {
182
 
        tmp_command.SerializeToString(&checksum_buffer);
 
182
        tmp_transaction.SerializeToString(&checksum_buffer);
183
183
        uint32_t recalc_checksum= drizzled::hash::crc32(checksum_buffer.c_str(), static_cast<size_t>(length));
184
184
        if (unlikely(recalc_checksum != checksum))
185
185
        {