~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/transaction_log/transaction_log.cc

Merge Jay

Show diffs side-by-side

added added

removed removed

Lines of Context:
6
6
 *
7
7
 *  Authors:
8
8
 *
9
 
 *  Jay Pipes <jaypipes@gmail.com.com>
 
9
 *    Jay Pipes <jaypipes@gmail.com.com>
10
10
 *
11
11
 *  This program is free software; you can redistribute it and/or modify
12
12
 *  it under the terms of the GNU General Public License as published by
30
30
 *
31
31
 * @details
32
32
 *
33
 
 * Currently, the log file uses this implementation:
 
33
 * Currently, the transaction log file uses a simple, single-file, append-only
 
34
 * format.
34
35
 *
35
36
 * We have an atomic off_t called log_offset which keeps track of the 
36
 
 * offset into the log file for writing the next Transaction.
37
 
 *
38
 
 * We write Transaction message encapsulated in an 8-byte length/type header and a
39
 
 * 4-byte checksum trailer.
40
 
 *
41
 
 * When writing a Transaction to the log, we calculate the length of the 
42
 
 * Transaction to be written.  We then increment log_offset by the length
43
 
 * of the Transaction plus 2 * sizeof(uint32_t) plus sizeof(uint32_t) and store 
44
 
 * this new offset in a local off_t called cur_offset (see TransactionLog::apply().  
45
 
 * This compare and set is done in an atomic instruction.
46
 
 *
47
 
 * We then adjust the local off_t (cur_offset) back to the original
48
 
 * offset by subtracting the length and sizeof(uint32_t) and sizeof(uint32_t).
49
 
 *
50
 
 * We then first write a 64-bit length and then the serialized transaction/transaction
51
 
 * and optional checksum to our log file at our local cur_offset.
52
 
 *
53
 
 * --------------------------------------------------------------------------------
54
 
 * |<- 4 bytes ->|<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
55
 
 * --------------------------------------------------------------------------------
56
 
 * |  Msg Type   |   Length    |   Serialized Transaction Message   |   Checksum  |
57
 
 * --------------------------------------------------------------------------------
 
37
 * offset into the log file for writing the next log entry.  The log
 
38
 * entries are written, one after the other, in the following way:
 
39
 *
 
40
 * <pre>
 
41
 * --------------------------------------
 
42
 * |<- 4 bytes ->|<- # Bytes of Entry ->|
 
43
 * --------------------------------------
 
44
 * |  Entry Type |  Serialized Entry    |
 
45
 * --------------------------------------
 
46
 * </pre>
 
47
 *
 
48
 * The Entry Type is an integer defined as an enumeration in the 
 
49
 * /drizzled/message/transaction.proto file called TransactionLogEntry::Type.
 
50
 *
 
51
 * Each transaction log entry type is written to the log differently.  Here,
 
52
 * we cover the format of each log entry type.
 
53
 *
 
54
 * Committed and Prepared Transaction Log Entries
 
55
 * -----------------------------------------------
 
56
 * 
 
57
 * <pre>
 
58
 * ------------------------------------------------------------------
 
59
 * |<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
 
60
 * ------------------------------------------------------------------
 
61
 * |   Length    |   Serialized Transaction Message   |   Checksum  |
 
62
 * ------------------------------------------------------------------
 
63
 * </pre>
58
64
 *
59
65
 * @todo
60
66
 *
77
83
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
78
84
#include <drizzled/errmsg_print.h>
79
85
#include <drizzled/gettext.h>
 
86
#include <drizzled/message/transaction.pb.h>
 
87
#include <drizzled/transaction_services.h>
 
88
#include <drizzled/algorithm/crc32.h>
 
89
 
 
90
#include <google/protobuf/io/coded_stream.h>
80
91
 
81
92
using namespace std;
82
93
using namespace drizzled;
 
94
using namespace google;
83
95
 
84
96
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
85
97
 
86
98
TransactionLog::TransactionLog(const string in_log_file_path,
87
 
                               uint32_t in_sync_method) : 
 
99
                               uint32_t in_sync_method,
 
100
                               bool in_do_checksum) : 
88
101
    state(OFFLINE),
89
102
    log_file_path(in_log_file_path),
90
103
    has_error(false),
91
104
    error_message(),
92
 
    sync_method(in_sync_method)
 
105
    sync_method(in_sync_method),
 
106
    do_checksum(in_do_checksum)
93
107
{
94
108
  /* Setup our log file and determine the next write offset... */
95
109
  log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
133
147
  }
134
148
}
135
149
 
 
150
uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
 
151
                                                     uint8_t *buffer,
 
152
                                                     uint32_t *checksum_out)
 
153
{
 
154
  uint8_t *orig_buffer= buffer;
 
155
  size_t message_byte_length= trx.ByteSize();
 
156
 
 
157
  /*
 
158
   * Write the header information, which is the message type and
 
159
   * the length of the transaction message into the buffer
 
160
   */
 
161
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
 
162
      static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
 
163
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
 
164
      static_cast<uint32_t>(message_byte_length), buffer);
 
165
  
 
166
  /*
 
167
   * Now write the serialized transaction message, followed
 
168
   * by the optional checksum into the buffer.
 
169
   */
 
170
  buffer= trx.SerializeWithCachedSizesToArray(buffer);
 
171
 
 
172
  if (do_checksum)
 
173
  {
 
174
    *checksum_out= drizzled::algorithm::crc32(
 
175
        reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
 
176
  }
 
177
  else
 
178
    *checksum_out= 0;
 
179
 
 
180
  /* We always write in network byte order */
 
181
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
 
182
  /* Reset the pointer back to its original location... */
 
183
  buffer= orig_buffer;
 
184
  return orig_buffer;
 
185
}
 
186
 
136
187
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
137
188
{
138
189
  ssize_t written= 0;
142
193
   */
143
194
  off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
144
195
 
145
 
  /*
146
 
   * We adjust cur_offset back to the original log_offset before
147
 
   * the increment above...
148
 
   */
149
 
  cur_offset-= static_cast<off_t>(data_length);
150
 
 
151
196
  /* 
152
197
   * Quick safety...if an error occurs above in another writer, the log 
153
198
   * file will be in a crashed state.
243
288
    result= ftruncate(log_file, log_offset);
244
289
  }
245
290
  while (result == -1 && errno == EINTR);
 
291
  drizzled::TransactionServices::singleton().resetTransactionId();
246
292
}
247
293
 
248
294
bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
272
318
{
273
319
  return error_message;
274
320
}
 
321
 
 
322
size_t TransactionLog::getLogEntrySize(const message::Transaction &trx)
 
323
{
 
324
  return trx.ByteSize() + HEADER_TRAILER_BYTES;
 
325
}