~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/transaction_log/transaction_log.cc

  • Committer: Monty Taylor
  • Date: 2010-03-25 22:03:21 UTC
  • mto: This revision was merged to the branch mainline in revision 1408.
  • Revision ID: mordred@inaugust.com-20100325220321-bf5vs8tryb4a8ssh
Updated the search for libmemcached.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2008-2009 Sun Microsystems, Inc.
5
 
 *  Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
 
4
 *  Copyright (C) 2008-2009 Sun Microsystems
 
5
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
6
6
 *
7
7
 *  Authors:
8
8
 *
9
 
 *    Jay Pipes <jaypipes@gmail.com.com>
 
9
 *  Jay Pipes <jaypipes@gmail.com.com>
10
10
 *
11
11
 *  This program is free software; you can redistribute it and/or modify
12
12
 *  it under the terms of the GNU General Public License as published by
30
30
 *
31
31
 * @details
32
32
 *
33
 
 * Currently, the transaction log file uses a simple, single-file, append-only
34
 
 * format.
 
33
 * Currently, the log file uses this implementation:
35
34
 *
36
35
 * We have an atomic off_t called log_offset which keeps track of the 
37
 
 * offset into the log file for writing the next log entry.  The log
38
 
 * entries are written, one after the other, in the following way:
39
 
 *
40
 
 * <pre>
41
 
 * --------------------------------------
42
 
 * |<- 4 bytes ->|<- # Bytes of Entry ->|
43
 
 * --------------------------------------
44
 
 * |  Entry Type |  Serialized Entry    |
45
 
 * --------------------------------------
46
 
 * </pre>
47
 
 *
48
 
 * The Entry Type is an integer defined as an enumeration in the 
49
 
 * /drizzled/message/transaction.proto file called TransactionLogEntry::Type.
50
 
 *
51
 
 * Each transaction log entry type is written to the log differently.  Here,
52
 
 * we cover the format of each log entry type.
53
 
 *
54
 
 * Committed and Prepared Transaction Log Entries
55
 
 * -----------------------------------------------
56
 
 * 
57
 
 * <pre>
58
 
 * ------------------------------------------------------------------
59
 
 * |<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
60
 
 * ------------------------------------------------------------------
61
 
 * |   Length    |   Serialized Transaction Message   |   Checksum  |
62
 
 * ------------------------------------------------------------------
63
 
 * </pre>
 
36
 * offset into the log file for writing the next Transaction.
 
37
 *
 
38
 * We write Transaction message encapsulated in an 8-byte length/type header and a
 
39
 * 4-byte checksum trailer.
 
40
 *
 
41
 * When writing a Transaction to the log, we calculate the length of the 
 
42
 * Transaction to be written.  We then increment log_offset by the length
 
43
 * of the Transaction plus 2 * sizeof(uint32_t) plus sizeof(uint32_t) and store 
 
44
 * this new offset in a local off_t called cur_offset (see TransactionLog::apply().  
 
45
 * This compare and set is done in an atomic instruction.
 
46
 *
 
47
 * We then adjust the local off_t (cur_offset) back to the original
 
48
 * offset by subtracting the length and sizeof(uint32_t) and sizeof(uint32_t).
 
49
 *
 
50
 * We then first write a 64-bit length and then the serialized transaction/transaction
 
51
 * and optional checksum to our log file at our local cur_offset.
 
52
 *
 
53
 * --------------------------------------------------------------------------------
 
54
 * |<- 4 bytes ->|<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
 
55
 * --------------------------------------------------------------------------------
 
56
 * |  Msg Type   |   Length    |   Serialized Transaction Message   |   Checksum  |
 
57
 * --------------------------------------------------------------------------------
64
58
 *
65
59
 * @todo
66
60
 *
83
77
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
84
78
#include <drizzled/errmsg_print.h>
85
79
#include <drizzled/gettext.h>
86
 
#include <drizzled/message/transaction.pb.h>
87
 
#include <drizzled/transaction_services.h>
88
 
#include <drizzled/algorithm/crc32.h>
89
 
 
90
 
#include <google/protobuf/io/coded_stream.h>
91
80
 
92
81
using namespace std;
93
82
using namespace drizzled;
94
 
using namespace google;
95
83
 
96
84
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
97
85
 
98
86
TransactionLog::TransactionLog(const string in_log_file_path,
99
 
                               uint32_t in_flush_frequency,
100
 
                               bool in_do_checksum) : 
 
87
                               uint32_t in_sync_method) : 
101
88
    state(OFFLINE),
102
89
    log_file_path(in_log_file_path),
103
90
    has_error(false),
104
91
    error_message(),
105
 
    flush_frequency(in_flush_frequency),
106
 
    do_checksum(in_do_checksum)
 
92
    sync_method(in_sync_method)
107
93
{
108
94
  /* Setup our log file and determine the next write offset... */
109
95
  log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
110
96
  if (log_file == -1)
111
97
  {
112
 
    char errmsg[STRERROR_MAX];
113
 
    strerror_r(errno, errmsg, sizeof(errmsg));
114
98
    error_message.assign(_("Failed to open transaction log file "));
115
99
    error_message.append(log_file_path);
116
100
    error_message.append("  Got error: ");
117
 
    error_message.append(errmsg);
 
101
    error_message.append(strerror(errno));
118
102
    error_message.push_back('\n');
119
103
    has_error= true;
120
104
    return;
140
124
  state= ONLINE;
141
125
}
142
126
 
143
 
uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
144
 
                                                     uint8_t *buffer,
145
 
                                                     uint32_t *checksum_out)
 
127
TransactionLog::~TransactionLog()
146
128
{
147
 
  uint8_t *orig_buffer= buffer;
148
 
  size_t message_byte_length= trx.ByteSize();
149
 
 
150
 
  /*
151
 
   * Write the header information, which is the message type and
152
 
   * the length of the transaction message into the buffer
153
 
   */
154
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
155
 
      static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
156
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
157
 
      static_cast<uint32_t>(message_byte_length), buffer);
158
 
  
159
 
  /*
160
 
   * Now write the serialized transaction message, followed
161
 
   * by the optional checksum into the buffer.
162
 
   */
163
 
  buffer= trx.SerializeWithCachedSizesToArray(buffer);
164
 
 
165
 
  if (do_checksum)
 
129
  /* Clear up any resources we've consumed */
 
130
  if (log_file != -1)
166
131
  {
167
 
    *checksum_out= drizzled::algorithm::crc32(
168
 
        reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
 
132
    (void) close(log_file);
169
133
  }
170
 
  else
171
 
    *checksum_out= 0;
172
 
 
173
 
  /* We always write in network byte order */
174
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
175
 
  /* Reset the pointer back to its original location... */
176
 
  buffer= orig_buffer;
177
 
  return orig_buffer;
178
134
}
179
135
 
180
136
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
186
142
   */
187
143
  off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
188
144
 
 
145
  /*
 
146
   * We adjust cur_offset back to the original log_offset before
 
147
   * the increment above...
 
148
   */
 
149
  cur_offset-= static_cast<off_t>(data_length);
 
150
 
189
151
  /* 
190
152
   * Quick safety...if an error occurs above in another writer, the log 
191
153
   * file will be in a crashed state.
209
171
 
210
172
  if (unlikely(written != static_cast<ssize_t>(data_length)))
211
173
  {
212
 
    char errmsg[STRERROR_MAX];
213
 
    strerror_r(errno, errmsg, sizeof(errmsg));
214
 
    errmsg_printf(error::ERROR, 
 
174
    errmsg_printf(ERRMSG_LVL_ERROR, 
215
175
                  _("Failed to write full size of log entry.  Tried to write %" PRId64
216
176
                    " bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes.  Error: %s\n"), 
217
177
                  static_cast<int64_t>(data_length),
218
178
                  static_cast<int64_t>(cur_offset),
219
 
                  static_cast<int32_t>(written), 
220
 
                  errmsg);
 
179
                  static_cast<int64_t>(written), 
 
180
                  strerror(errno));
221
181
    state= CRASHED;
222
182
    /* 
223
183
     * Reset the log's offset in case we want to produce a decent error message including
230
190
 
231
191
  if (unlikely(error_code != 0))
232
192
  {
233
 
    sql_perror(_("Failed to sync log file."));
 
193
    errmsg_printf(ERRMSG_LVL_ERROR, 
 
194
                  _("Failed to sync log file. Got error: %s\n"), 
 
195
                  strerror(errno));
234
196
  }
235
 
 
236
197
  return cur_offset;
237
198
}
238
199
 
239
200
int TransactionLog::syncLogFile()
240
201
{
241
 
  switch (flush_frequency)
 
202
  switch (sync_method)
242
203
  {
243
 
  case FLUSH_FREQUENCY_EVERY_WRITE:
 
204
  case SYNC_METHOD_EVERY_WRITE:
244
205
    return internal::my_sync(log_file, 0);
245
 
  case FLUSH_FREQUENCY_EVERY_SECOND:
 
206
  case SYNC_METHOD_EVERY_SECOND:
246
207
    {
247
208
      time_t now_time= time(NULL);
248
209
      if (last_sync_time <= (now_time - 1))
252
213
      }
253
214
      return 0;
254
215
    }
255
 
  case FLUSH_FREQUENCY_OS:
 
216
  case SYNC_METHOD_OS:
256
217
  default:
257
218
    return 0;
258
219
  }
311
272
{
312
273
  return error_message;
313
274
}
314
 
 
315
 
size_t TransactionLog::getLogEntrySize(const message::Transaction &trx)
316
 
{
317
 
  return trx.ByteSize() + HEADER_TRAILER_BYTES;
318
 
}