~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/transaction_log/transaction_log_reader.cc

Added code necessary for building plugins dynamically.
Merged in changes from lifeless to allow autoreconf to work.
Touching plugin.ini files now triggers a rebuid - so config/autorun.sh is no
longer required to be run after touching those.
Removed the duplicate plugin names - also removed the issue that getting them
different would silently fail weirdly later.

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
/**
26
26
 * @file
27
27
 *
28
 
 * Implementation of a command reader for the command log.
 
28
 * Implementation of a transaction reader for the transaction log.
29
29
 *
30
30
 * @details
31
31
 *
32
32
 * This is currently an extremely simple implementation which
33
 
 * reads through the command log file one message at a time, using the
 
33
 * reads through the transaction log file one message at a time, using the
34
34
 * length-coded bytes to skip through the log.  Once it finds the message
35
35
 * which corresponds to the transaction id the caller to read() is looking
36
36
 * for, it copies the message into the supplied pointer and returns true.
42
42
 */
43
43
 
44
44
#include <drizzled/server_includes.h>
45
 
#include "command_log_reader.h"
 
45
#include "transaction_log_reader.h"
46
46
 
47
47
#include <drizzled/gettext.h>
48
 
#include <drizzled/message/replication.pb.h>
 
48
#include <drizzled/message/transaction.pb.h>
49
49
 
50
50
#include <google/protobuf/io/zero_copy_stream_impl.h>
51
 
#include <drizzled/crc32.h>
 
51
#include <google/protobuf/io/coded_stream.h>
 
52
#include <drizzled/hash/crc32.h>
52
53
 
53
54
using namespace std;
54
55
using namespace drizzled;
55
56
using namespace google;
56
57
 
57
 
bool CommandLogReader::read(const ReplicationServices::GlobalTransactionId &to_read_trx_id, 
58
 
                            message::Command *to_fill)
 
58
bool TransactionLogReader::read(const ReplicationServices::GlobalTransactionId &to_read_trx_id, 
 
59
                            message::Transaction *to_fill)
59
60
{
60
61
  /* 
61
62
   * We ask the log to give us the log file containing the
62
 
   * command message with the needed transaction id, then
 
63
   * transaction message with the needed transaction id, then
63
64
   * we read into the log file to obtain the message, and 
64
 
   * fill the supplied pointer to Command message from the
 
65
   * fill the supplied pointer to Transaction message from the
65
66
   * raw data in the log file.
66
67
   */
67
68
  string log_filename_to_read;
68
69
  bool log_file_found= log.findLogFilenameContainingTransactionId(to_read_trx_id, log_filename_to_read);
69
70
  bool result= true;
 
71
  bool do_checksum= false;
70
72
 
71
73
  if (unlikely(! log_file_found))
72
74
  {
74
76
  }
75
77
  else
76
78
  {
77
 
    protobuf::io::FileInputStream *log_file_stream;
78
 
    message::Command tmp_command; /* Used to check trx id... */
79
 
    string checksum_buffer; /* Buffer we use for buffering serialized messages for checksumming */
80
 
 
81
 
    unsigned char coded_length[8]; /* Length header bytes in network byte order */
82
 
    unsigned char coded_checksum[4]; /* Checksum trailer bytes in network byte order */
83
 
    uint64_t length= 0; /* The length of the command to follow in stream */
84
 
    uint32_t checksum= 0; /* The checksum sent in the wire */
85
 
    ssize_t read_bytes; /* Number bytes read during pread() calls */
86
 
 
87
 
    off_t current_offset= 0;
88
 
 
89
79
    /* Open the log file and read through the log until the transaction ID is found */
90
80
    int log_file= open(log_filename_to_read.c_str(), O_RDONLY | O_NONBLOCK);
91
81
 
92
82
    if (log_file == -1)
93
83
    {
94
84
      errmsg_printf(ERRMSG_LVL_ERROR, 
95
 
                    _("Failed to open command log file %s.  Got error: %s\n"),
 
85
                    _("Failed to open transaction log file %s.  Got error: %s\n"),
96
86
                    log_filename_to_read.c_str(),
97
87
                    strerror(errno));
98
88
      return false;
99
89
    }
100
90
 
101
 
    log_file_stream= new protobuf::io::FileInputStream(log_file); /* Zero-copy stream implementation */
102
 
 
103
 
    while (true)
 
91
    protobuf::io::ZeroCopyInputStream *raw_input= new protobuf::io::FileInputStream(log_file);
 
92
    protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(raw_input);
 
93
 
 
94
    char *buffer= NULL;
 
95
    char *temp_buffer= NULL;
 
96
    uint32_t length= 0;
 
97
    uint32_t previous_length= 0;
 
98
    uint32_t checksum= 0;
 
99
 
 
100
    message::Transaction transaction;
 
101
 
 
102
    /* Read in the length of the command */
 
103
    while (result == true && coded_input->ReadLittleEndian32(&length) == true)
104
104
    {
105
 
      /* Read in the length of the command */
106
 
      do
107
 
      {
108
 
        read_bytes= pread(log_file, coded_length, sizeof(uint64_t), current_offset);
109
 
      }
110
 
      while (read_bytes == -1 && errno == EINTR); /* Just retry the call when interrupted by a signal... */
111
 
 
112
 
      if (unlikely(read_bytes < 0))
113
 
      {
114
 
        errmsg_printf(ERRMSG_LVL_ERROR,
115
 
                      _("Failed to read length header at offset %" PRId64 ".  Got error: %s\n"), 
116
 
                      (int64_t) current_offset, 
117
 
                      strerror(errno));
118
 
        result= false;
119
 
        break;
120
 
      }
121
 
      if (read_bytes == 0)
122
 
      {
123
 
        /* End of file and did not find the command, so return false */
124
 
        result= false;
125
 
        break;
126
 
      }
127
 
      
128
 
      /* We use korr.h macros when writing and must do the same when reading... */
129
 
      length= uint8korr(coded_length);
130
 
 
131
 
      /* Skip to the start of the next Command */
132
 
      log_file_stream->Skip(8);
133
 
 
134
 
      if (unlikely(tmp_command.ParseFromZeroCopyStream(log_file_stream) == false))
135
 
      {
136
 
        tmp_command.Clear();
137
 
        errmsg_printf(ERRMSG_LVL_ERROR,
138
 
                      _("Failed to parse command message at offset %" PRId64 ".  Got error: %s\n"), 
139
 
                      (int64_t) current_offset, 
140
 
                      tmp_command.InitializationErrorString().c_str());
141
 
        result= false;
 
105
      if (length > INT_MAX)
 
106
      {
 
107
        fprintf(stderr, _("Attempted to read record bigger than INT_MAX\n"));
 
108
        exit(1);
 
109
      }
 
110
 
 
111
      if (buffer == NULL)
 
112
      {
 
113
        /* 
 
114
        * First time around...just malloc the length.  This block gets rid
 
115
        * of a GCC warning about uninitialized temp_buffer.
 
116
        */
 
117
        temp_buffer= (char *) malloc(static_cast<size_t>(length));
 
118
      }
 
119
      /* No need to allocate if we have a buffer big enough... */
 
120
      else if (length > previous_length)
 
121
      {
 
122
        temp_buffer= (char *) realloc(buffer, static_cast<size_t>(length));
 
123
      }
 
124
 
 
125
      if (temp_buffer == NULL)
 
126
      {
 
127
        fprintf(stderr, _("Memory allocation failure trying to allocate %" PRIu64 " bytes.\n"),
 
128
                static_cast<uint64_t>(length));
142
129
        break;
143
130
      }
144
131
      else
145
 
      {
146
 
        /* Cool, message was read.  Check the trx id */
147
 
        if (tmp_command.transaction_context().transaction_id() == to_read_trx_id)
148
 
        {
149
 
          /* Found what we were looking for...copy to the pointer we should fill */
150
 
          to_fill->CopyFrom(tmp_command);
151
 
          break;
152
 
        }
153
 
      }
154
 
 
155
 
      /* Keep the stream and the pread() calls in sync... */
156
 
      current_offset+= length;
157
 
 
158
 
      /* 
159
 
       * We now read 4 bytes containing the (possible) checksum of the
160
 
       * just-read command message.  If the result is not zero, then a
161
 
       * checksum was written...
162
 
       */
163
 
      do
164
 
      {
165
 
        read_bytes= pread(log_file, coded_checksum, sizeof(uint32_t), current_offset);
166
 
      }
167
 
      while (read_bytes == -1 && errno == EINTR); /* Just retry the call when interrupted by a signal... */
168
 
 
169
 
      if (unlikely(read_bytes < 0))
170
 
      {
171
 
        errmsg_printf(ERRMSG_LVL_ERROR, 
172
 
                      _("Failed to read checksum trailer at offset %" PRId64 ".  Got error: %s\n"), 
173
 
                      (int64_t) current_offset, 
174
 
                      strerror(errno));
175
 
        result= false;
176
 
        break;
177
 
      }
178
 
 
179
 
      checksum= uint4korr(coded_checksum);
180
 
 
181
 
      if (checksum != 0)
182
 
      {
183
 
        tmp_command.SerializeToString(&checksum_buffer);
184
 
        uint32_t recalc_checksum= hash_crc32(checksum_buffer.c_str(), static_cast<size_t>(length));
185
 
        if (unlikely(recalc_checksum != checksum))
186
 
        {
187
 
          errmsg_printf(ERRMSG_LVL_ERROR, _("Checksum FAILED!\n"), 
188
 
                        (int64_t) current_offset, 
189
 
                        strerror(errno));
190
 
          result= false;
191
 
          break;
192
 
        }
193
 
        checksum_buffer.clear();
194
 
      }
195
 
 
196
 
      /* Keep the stream and the pread() calls in sync... */
197
 
      current_offset+= sizeof(uint32_t);
 
132
        buffer= temp_buffer;
 
133
 
 
134
      /* Read the Command */
 
135
      result= coded_input->ReadRaw(buffer, length);
 
136
      if (result == false)
 
137
      {
 
138
        fprintf(stderr, _("Could not read transaction message.\n"));
 
139
        fprintf(stderr, _("GPB ERROR: %s.\n"), strerror(errno));
 
140
        fprintf(stderr, _("Raw buffer read: %s.\n"), buffer);
 
141
        break;
 
142
      }
 
143
 
 
144
      result= transaction.ParseFromArray(buffer, static_cast<int32_t>(length));
 
145
      if (result == false)
 
146
      {
 
147
        fprintf(stderr, _("Unable to parse transaction. Got error: %s.\n"), transaction.InitializationErrorString().c_str());
 
148
        if (buffer != NULL)
 
149
          fprintf(stderr, _("BUFFER: %s\n"), buffer);
 
150
        break;
 
151
      }
 
152
 
 
153
      /* Skip 4 byte checksum */
 
154
      coded_input->ReadLittleEndian32(&checksum);
 
155
 
 
156
      if (do_checksum)
 
157
      {
 
158
        if (checksum != drizzled::hash::crc32(buffer, static_cast<size_t>(length)))
 
159
        {
 
160
          fprintf(stderr, _("Checksum failed. Wanted %" PRIu32 " got %" PRIu32 "\n"), checksum, drizzled::hash::crc32(buffer, static_cast<size_t>(length)));
 
161
        }
 
162
      }
 
163
 
 
164
      /* Cool, message was read.  Check the trx id */
 
165
      if (transaction.transaction_context().transaction_id() == to_read_trx_id)
 
166
      {
 
167
        /* Found what we were looking for...copy to the pointer we should fill */
 
168
        to_fill->CopyFrom(transaction);
 
169
        break;
 
170
      }
 
171
 
 
172
      previous_length= length;
198
173
    }
 
174
    if (buffer)
 
175
      free(buffer);
 
176
    
 
177
    delete coded_input;
 
178
    delete raw_input;
199
179
 
200
 
    delete log_file_stream;
201
 
    close(log_file);
202
180
    return result;
203
181
  }
204
182
}