~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/transaction_log/transaction_log.cc

  • Committer: Stewart Smith
  • Date: 2010-11-03 03:27:09 UTC
  • mto: (1902.1.1 build) (1910.1.2 build)
  • mto: This revision was merged to the branch mainline in revision 1903.
  • Revision ID: stewart@flamingspork.com-20101103032709-oyvfrc6eb8fzj0mr
fix docs warning: docs/unlock.rst:2: (WARNING/2) Title underline too short.

Show diffs side-by-side

added added

removed removed

Lines of Context:
6
6
 *
7
7
 *  Authors:
8
8
 *
9
 
 *  Jay Pipes <jaypipes@gmail.com.com>
 
9
 *    Jay Pipes <jaypipes@gmail.com.com>
10
10
 *
11
11
 *  This program is free software; you can redistribute it and/or modify
12
12
 *  it under the terms of the GNU General Public License as published by
30
30
 *
31
31
 * @details
32
32
 *
33
 
 * Currently, the log file uses this implementation:
 
33
 * Currently, the transaction log file uses a simple, single-file, append-only
 
34
 * format.
34
35
 *
35
36
 * We have an atomic off_t called log_offset which keeps track of the 
36
 
 * offset into the log file for writing the next Transaction.
37
 
 *
38
 
 * We write Transaction message encapsulated in an 8-byte length/type header and a
39
 
 * 4-byte checksum trailer.
40
 
 *
41
 
 * When writing a Transaction to the log, we calculate the length of the 
42
 
 * Transaction to be written.  We then increment log_offset by the length
43
 
 * of the Transaction plus 2 * sizeof(uint32_t) plus sizeof(uint32_t) and store 
44
 
 * this new offset in a local off_t called cur_offset (see TransactionLog::apply().  
45
 
 * This compare and set is done in an atomic instruction.
46
 
 *
47
 
 * We then adjust the local off_t (cur_offset) back to the original
48
 
 * offset by subtracting the length and sizeof(uint32_t) and sizeof(uint32_t).
49
 
 *
50
 
 * We then first write a 64-bit length and then the serialized transaction/transaction
51
 
 * and optional checksum to our log file at our local cur_offset.
52
 
 *
53
 
 * --------------------------------------------------------------------------------
54
 
 * |<- 4 bytes ->|<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
55
 
 * --------------------------------------------------------------------------------
56
 
 * |  Msg Type   |   Length    |   Serialized Transaction Message   |   Checksum  |
57
 
 * --------------------------------------------------------------------------------
 
37
 * offset into the log file for writing the next log entry.  The log
 
38
 * entries are written, one after the other, in the following way:
 
39
 *
 
40
 * <pre>
 
41
 * --------------------------------------
 
42
 * |<- 4 bytes ->|<- # Bytes of Entry ->|
 
43
 * --------------------------------------
 
44
 * |  Entry Type |  Serialized Entry    |
 
45
 * --------------------------------------
 
46
 * </pre>
 
47
 *
 
48
 * The Entry Type is an integer defined as an enumeration in the 
 
49
 * /drizzled/message/transaction.proto file called TransactionLogEntry::Type.
 
50
 *
 
51
 * Each transaction log entry type is written to the log differently.  Here,
 
52
 * we cover the format of each log entry type.
 
53
 *
 
54
 * Committed and Prepared Transaction Log Entries
 
55
 * -----------------------------------------------
 
56
 * 
 
57
 * <pre>
 
58
 * ------------------------------------------------------------------
 
59
 * |<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
 
60
 * ------------------------------------------------------------------
 
61
 * |   Length    |   Serialized Transaction Message   |   Checksum  |
 
62
 * ------------------------------------------------------------------
 
63
 * </pre>
58
64
 *
59
65
 * @todo
60
66
 *
77
83
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
78
84
#include <drizzled/errmsg_print.h>
79
85
#include <drizzled/gettext.h>
 
86
#include <drizzled/message/transaction.pb.h>
 
87
#include <drizzled/transaction_services.h>
 
88
#include <drizzled/algorithm/crc32.h>
 
89
 
 
90
#include <google/protobuf/io/coded_stream.h>
80
91
 
81
92
using namespace std;
82
93
using namespace drizzled;
 
94
using namespace google;
83
95
 
84
96
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
85
97
 
86
98
TransactionLog::TransactionLog(const string in_log_file_path,
87
 
                               uint32_t in_sync_method) : 
 
99
                               uint32_t in_flush_frequency,
 
100
                               bool in_do_checksum) : 
88
101
    state(OFFLINE),
89
102
    log_file_path(in_log_file_path),
90
103
    has_error(false),
91
104
    error_message(),
92
 
    sync_method(in_sync_method)
 
105
    flush_frequency(in_flush_frequency),
 
106
    do_checksum(in_do_checksum)
93
107
{
94
108
  /* Setup our log file and determine the next write offset... */
95
109
  log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
96
110
  if (log_file == -1)
97
111
  {
 
112
    char errmsg[STRERROR_MAX];
 
113
    strerror_r(errno, errmsg, sizeof(errmsg));
98
114
    error_message.assign(_("Failed to open transaction log file "));
99
115
    error_message.append(log_file_path);
100
116
    error_message.append("  Got error: ");
101
 
    error_message.append(strerror(errno));
 
117
    error_message.append(errmsg);
102
118
    error_message.push_back('\n');
103
119
    has_error= true;
104
120
    return;
124
140
  state= ONLINE;
125
141
}
126
142
 
127
 
TransactionLog::~TransactionLog()
 
143
uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
 
144
                                                     uint8_t *buffer,
 
145
                                                     uint32_t *checksum_out)
128
146
{
129
 
  /* Clear up any resources we've consumed */
130
 
  if (log_file != -1)
 
147
  uint8_t *orig_buffer= buffer;
 
148
  size_t message_byte_length= trx.ByteSize();
 
149
 
 
150
  /*
 
151
   * Write the header information, which is the message type and
 
152
   * the length of the transaction message into the buffer
 
153
   */
 
154
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
 
155
      static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
 
156
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
 
157
      static_cast<uint32_t>(message_byte_length), buffer);
 
158
  
 
159
  /*
 
160
   * Now write the serialized transaction message, followed
 
161
   * by the optional checksum into the buffer.
 
162
   */
 
163
  buffer= trx.SerializeWithCachedSizesToArray(buffer);
 
164
 
 
165
  if (do_checksum)
131
166
  {
132
 
    (void) close(log_file);
 
167
    *checksum_out= drizzled::algorithm::crc32(
 
168
        reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
133
169
  }
 
170
  else
 
171
    *checksum_out= 0;
 
172
 
 
173
  /* We always write in network byte order */
 
174
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
 
175
  /* Reset the pointer back to its original location... */
 
176
  buffer= orig_buffer;
 
177
  return orig_buffer;
134
178
}
135
179
 
136
180
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
142
186
   */
143
187
  off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
144
188
 
145
 
  /*
146
 
   * We adjust cur_offset back to the original log_offset before
147
 
   * the increment above...
148
 
   */
149
 
  cur_offset-= static_cast<off_t>(data_length);
150
 
 
151
189
  /* 
152
190
   * Quick safety...if an error occurs above in another writer, the log 
153
191
   * file will be in a crashed state.
171
209
 
172
210
  if (unlikely(written != static_cast<ssize_t>(data_length)))
173
211
  {
 
212
    char errmsg[STRERROR_MAX];
 
213
    strerror_r(errno, errmsg, sizeof(errmsg));
174
214
    errmsg_printf(ERRMSG_LVL_ERROR, 
175
215
                  _("Failed to write full size of log entry.  Tried to write %" PRId64
176
216
                    " bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes.  Error: %s\n"), 
177
217
                  static_cast<int64_t>(data_length),
178
218
                  static_cast<int64_t>(cur_offset),
179
219
                  static_cast<int64_t>(written), 
180
 
                  strerror(errno));
 
220
                  errmsg);
181
221
    state= CRASHED;
182
222
    /* 
183
223
     * Reset the log's offset in case we want to produce a decent error message including
190
230
 
191
231
  if (unlikely(error_code != 0))
192
232
  {
 
233
    char errmsg[STRERROR_MAX];
 
234
    strerror_r(errno, errmsg, sizeof(errmsg));
193
235
    errmsg_printf(ERRMSG_LVL_ERROR, 
194
236
                  _("Failed to sync log file. Got error: %s\n"), 
195
 
                  strerror(errno));
 
237
                  errmsg);
196
238
  }
197
239
  return cur_offset;
198
240
}
199
241
 
200
242
int TransactionLog::syncLogFile()
201
243
{
202
 
  switch (sync_method)
 
244
  switch (flush_frequency)
203
245
  {
204
 
  case SYNC_METHOD_EVERY_WRITE:
 
246
  case FLUSH_FREQUENCY_EVERY_WRITE:
205
247
    return internal::my_sync(log_file, 0);
206
 
  case SYNC_METHOD_EVERY_SECOND:
 
248
  case FLUSH_FREQUENCY_EVERY_SECOND:
207
249
    {
208
250
      time_t now_time= time(NULL);
209
251
      if (last_sync_time <= (now_time - 1))
213
255
      }
214
256
      return 0;
215
257
    }
216
 
  case SYNC_METHOD_OS:
 
258
  case FLUSH_FREQUENCY_OS:
217
259
  default:
218
260
    return 0;
219
261
  }
272
314
{
273
315
  return error_message;
274
316
}
 
317
 
 
318
size_t TransactionLog::getLogEntrySize(const message::Transaction &trx)
 
319
{
 
320
  return trx.ByteSize() + HEADER_TRAILER_BYTES;
 
321
}