~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/transaction_log/transaction_log.cc

Merge trunk and resolve all conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
4
 *  Copyright (C) 2008-2009 Sun Microsystems
5
 
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
6
5
 *
7
6
 *  Authors:
8
7
 *
9
 
 *    Jay Pipes <jaypipes@gmail.com.com>
 
8
 *  Jay Pipes <joinfu@sun.com>
10
9
 *
11
10
 *  This program is free software; you can redistribute it and/or modify
12
11
 *  it under the terms of the GNU General Public License as published by
26
25
/**
27
26
 * @file
28
27
 *
29
 
 * Defines the implementation of the transaction log file descriptor.
 
28
 * Defines the implementation of the default transaction log.
 
29
 *
 
30
 * @see drizzled/plugin/transaction_replicator.h
 
31
 * @see drizzled/plugin/transaction_applier.h
30
32
 *
31
33
 * @details
32
34
 *
33
 
 * Currently, the transaction log file uses a simple, single-file, append-only
34
 
 * format.
 
35
 * Currently, the log file uses this implementation:
35
36
 *
36
37
 * We have an atomic off_t called log_offset which keeps track of the 
37
 
 * offset into the log file for writing the next log entry.  The log
38
 
 * entries are written, one after the other, in the following way:
39
 
 *
40
 
 * <pre>
41
 
 * --------------------------------------
42
 
 * |<- 4 bytes ->|<- # Bytes of Entry ->|
43
 
 * --------------------------------------
44
 
 * |  Entry Type |  Serialized Entry    |
45
 
 * --------------------------------------
46
 
 * </pre>
47
 
 *
48
 
 * The Entry Type is an integer defined as an enumeration in the 
49
 
 * /drizzled/message/transaction.proto file called TransactionLogEntry::Type.
50
 
 *
51
 
 * Each transaction log entry type is written to the log differently.  Here,
52
 
 * we cover the format of each log entry type.
53
 
 *
54
 
 * Committed and Prepared Transaction Log Entries
55
 
 * -----------------------------------------------
56
 
 * 
57
 
 * <pre>
58
 
 * ------------------------------------------------------------------
59
 
 * |<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
60
 
 * ------------------------------------------------------------------
61
 
 * |   Length    |   Serialized Transaction Message   |   Checksum  |
62
 
 * ------------------------------------------------------------------
63
 
 * </pre>
 
38
 * offset into the log file for writing the next Transaction.
 
39
 *
 
40
 * We write Transaction message encapsulated in an 8-byte length header and a
 
41
 * 4-byte checksum trailer.
 
42
 *
 
43
 * When writing a Transaction to the log, we calculate the length of the 
 
44
 * Transaction to be written.  We then increment log_offset by the length
 
45
 * of the Transaction plus 2 * sizeof(uint32_t) plus sizeof(uint32_t) and store 
 
46
 * this new offset in a local off_t called cur_offset (see TransactionLog::apply().  
 
47
 * This compare and set is done in an atomic instruction.
 
48
 *
 
49
 * We then adjust the local off_t (cur_offset) back to the original
 
50
 * offset by subtracting the length and sizeof(uint32_t) and sizeof(uint32_t).
 
51
 *
 
52
 * We then first write a 64-bit length and then the serialized transaction/transaction
 
53
 * and optional checksum to our log file at our local cur_offset.
 
54
 *
 
55
 * --------------------------------------------------------------------------------
 
56
 * |<- 4 bytes ->|<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
 
57
 * --------------------------------------------------------------------------------
 
58
 * |  Msg Type   |   Length    |   Serialized Transaction Message   |   Checksum  |
 
59
 * --------------------------------------------------------------------------------
64
60
 *
65
61
 * @todo
66
62
 *
69
65
 * as a skeleton and a springboard.
70
66
 */
71
67
 
72
 
#include "config.h"
 
68
#include <drizzled/server_includes.h>
73
69
#include "transaction_log.h"
74
70
 
75
 
#include <sys/stat.h>
76
 
#include <fcntl.h>
77
71
#include <unistd.h>
78
 
#include <errno.h>
79
72
 
80
73
#include <vector>
81
74
#include <string>
82
75
 
83
 
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
84
 
#include <drizzled/errmsg_print.h>
 
76
#include <drizzled/session.h>
 
77
#include <drizzled/set_var.h>
85
78
#include <drizzled/gettext.h>
 
79
#include <drizzled/hash/crc32.h>
86
80
#include <drizzled/message/transaction.pb.h>
87
 
#include <drizzled/transaction_services.h>
88
 
#include <drizzled/algorithm/crc32.h>
89
 
 
90
81
#include <google/protobuf/io/coded_stream.h>
91
82
 
92
83
using namespace std;
93
84
using namespace drizzled;
94
85
using namespace google;
95
86
 
96
 
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
 
87
/** 
 
88
 * Transaction Log plugin system variable - Is the log enabled? Only used on init().  
 
89
 * The enable() and disable() methods of the TransactionLog class control online
 
90
 * disabling.
 
91
 */
 
92
static bool sysvar_transaction_log_enabled= false;
 
93
/** Transaction Log plugin system variable - The path to the log file used */
 
94
static char* sysvar_transaction_log_file= NULL;
 
95
/** 
 
96
 * Transaction Log plugin system variable - A debugging variable to assist 
 
97
 * in truncating the log file. 
 
98
 */
 
99
static bool sysvar_transaction_log_truncate_debug= false;
 
100
static const char DEFAULT_LOG_FILE_PATH[]= "transaction.log"; /* In datadir... */
 
101
/** 
 
102
 * Transaction Log plugin system variable - Should we write a CRC32 checksum for 
 
103
 * each written Transaction message?
 
104
 */
 
105
static bool sysvar_transaction_log_checksum_enabled= false;
97
106
 
98
 
TransactionLog::TransactionLog(const string in_log_file_path,
99
 
                               uint32_t in_flush_frequency,
100
 
                               bool in_do_checksum) : 
 
107
TransactionLog::TransactionLog(string name_arg,
 
108
                               const char *in_log_file_path,
 
109
                               bool in_do_checksum)
 
110
  : plugin::TransactionApplier(name_arg),
101
111
    state(OFFLINE),
102
 
    log_file_path(in_log_file_path),
103
 
    has_error(false),
104
 
    error_message(),
105
 
    flush_frequency(in_flush_frequency),
106
 
    do_checksum(in_do_checksum)
 
112
    log_file_path(in_log_file_path)
107
113
{
 
114
  do_checksum= in_do_checksum; /* Have to do here, not in initialization list b/c atomic<> */
 
115
 
108
116
  /* Setup our log file and determine the next write offset... */
109
 
  log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
 
117
  log_file= open(log_file_path, O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
110
118
  if (log_file == -1)
111
119
  {
112
 
    char errmsg[STRERROR_MAX];
113
 
    strerror_r(errno, errmsg, sizeof(errmsg));
114
 
    error_message.assign(_("Failed to open transaction log file "));
115
 
    error_message.append(log_file_path);
116
 
    error_message.append("  Got error: ");
117
 
    error_message.append(errmsg);
118
 
    error_message.push_back('\n');
119
 
    has_error= true;
 
120
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open transaction log file %s.  Got error: %s\n"), 
 
121
                  log_file_path, 
 
122
                  strerror(errno));
 
123
    deactivate();
120
124
    return;
121
125
  }
122
126
 
123
 
  /* For convenience, grab the log file name from the path */
124
 
  if (log_file_path.find_first_of('/') != string::npos)
125
 
  {
126
 
    /* Strip to last / */
127
 
    string tmp;
128
 
    tmp= log_file_path.substr(log_file_path.find_last_of('/') + 1);
129
 
    log_file_name.assign(tmp);
130
 
  }
131
 
  else
132
 
    log_file_name.assign(log_file_path);
133
 
 
134
127
  /* 
135
128
   * The offset of the next write is the current position of the log
136
129
   * file, since it's opened in append mode...
140
133
  state= ONLINE;
141
134
}
142
135
 
143
 
uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
144
 
                                                     uint8_t *buffer,
145
 
                                                     uint32_t *checksum_out)
146
 
{
147
 
  uint8_t *orig_buffer= buffer;
148
 
  size_t message_byte_length= trx.ByteSize();
 
136
TransactionLog::~TransactionLog()
 
137
{
 
138
  /* Clear up any resources we've consumed */
 
139
  if (isEnabled() && log_file != -1)
 
140
  {
 
141
    (void) close(log_file);
 
142
  }
 
143
}
 
144
 
 
145
void TransactionLog::apply(const message::Transaction &to_apply)
 
146
{
 
147
  uint8_t *buffer; /* Buffer we will write serialized header, 
 
148
                      message and trailing checksum to */
 
149
  uint8_t *orig_buffer;
 
150
 
 
151
  size_t message_byte_length= to_apply.ByteSize();
 
152
  ssize_t written;
 
153
  off_t cur_offset;
 
154
  size_t total_envelope_length= HEADER_TRAILER_BYTES + message_byte_length;
 
155
 
 
156
  /* 
 
157
   * Attempt allocation of raw memory buffer for the header, 
 
158
   * message and trailing checksum bytes.
 
159
   */
 
160
  buffer= static_cast<uint8_t *>(malloc(total_envelope_length));
 
161
  if (buffer == NULL)
 
162
  {
 
163
    errmsg_printf(ERRMSG_LVL_ERROR, 
 
164
                  _("Failed to allocate enough memory to buffer header, transaction message, and trailing checksum bytes. Tried to allocate %" PRId64
 
165
                    " bytes.  Error: %s\n"), 
 
166
                  static_cast<int64_t>(total_envelope_length),
 
167
                  strerror(errno));
 
168
    state= CRASHED;
 
169
    deactivate();
 
170
    return;
 
171
  }
 
172
  else
 
173
    orig_buffer= buffer; /* We will free() orig_buffer, as buffer is moved during write */
 
174
 
 
175
  /*
 
176
   * Do an atomic increment on the offset of the log file position
 
177
   */
 
178
  cur_offset= log_offset.fetch_and_add(static_cast<off_t>(total_envelope_length));
 
179
 
 
180
  /*
 
181
   * We adjust cur_offset back to the original log_offset before
 
182
   * the increment above...
 
183
   */
 
184
  cur_offset-= static_cast<off_t>((total_envelope_length));
149
185
 
150
186
  /*
151
187
   * Write the header information, which is the message type and
152
188
   * the length of the transaction message into the buffer
153
189
   */
154
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
155
 
      static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
156
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
157
 
      static_cast<uint32_t>(message_byte_length), buffer);
 
190
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
 
191
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(static_cast<uint32_t>(message_byte_length), buffer);
158
192
  
159
193
  /*
160
194
   * Now write the serialized transaction message, followed
161
195
   * by the optional checksum into the buffer.
162
196
   */
163
 
  buffer= trx.SerializeWithCachedSizesToArray(buffer);
 
197
  buffer= to_apply.SerializeWithCachedSizesToArray(buffer);
164
198
 
 
199
  uint32_t checksum= 0;
165
200
  if (do_checksum)
166
201
  {
167
 
    *checksum_out= drizzled::algorithm::crc32(
168
 
        reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
 
202
    checksum= drizzled::hash::crc32(reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
169
203
  }
170
 
  else
171
 
    *checksum_out= 0;
172
204
 
173
205
  /* We always write in network byte order */
174
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
175
 
  /* Reset the pointer back to its original location... */
176
 
  buffer= orig_buffer;
177
 
  return orig_buffer;
178
 
}
179
 
 
180
 
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
181
 
{
182
 
  ssize_t written= 0;
183
 
 
184
 
  /*
185
 
   * Do an atomic increment on the offset of the log file position
186
 
   */
187
 
  off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
 
206
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, buffer);
188
207
 
189
208
  /* 
190
209
   * Quick safety...if an error occurs above in another writer, the log 
197
216
     * the original offset where an error occurred.
198
217
     */
199
218
    log_offset= cur_offset;
200
 
    return log_offset;
 
219
    free(orig_buffer);
 
220
    return;
201
221
  }
202
222
 
203
223
  /* Write the full buffer in one swoop */
204
224
  do
205
225
  {
206
 
    written= pwrite(log_file, data, data_length, cur_offset);
 
226
    written= pwrite(log_file, orig_buffer, total_envelope_length, cur_offset);
207
227
  }
208
228
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
209
229
 
210
 
  if (unlikely(written != static_cast<ssize_t>(data_length)))
 
230
  if (unlikely(written != static_cast<ssize_t>(total_envelope_length)))
211
231
  {
212
 
    char errmsg[STRERROR_MAX];
213
 
    strerror_r(errno, errmsg, sizeof(errmsg));
214
232
    errmsg_printf(ERRMSG_LVL_ERROR, 
215
 
                  _("Failed to write full size of log entry.  Tried to write %" PRId64
 
233
                  _("Failed to write full size of transaction.  Tried to write %" PRId64
216
234
                    " bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes.  Error: %s\n"), 
217
 
                  static_cast<int64_t>(data_length),
 
235
                  static_cast<int64_t>(total_envelope_length),
218
236
                  static_cast<int64_t>(cur_offset),
219
 
                  static_cast<int32_t>(written), 
220
 
                  errmsg);
 
237
                  static_cast<int64_t>(written), 
 
238
                  strerror(errno));
221
239
    state= CRASHED;
222
240
    /* 
223
241
     * Reset the log's offset in case we want to produce a decent error message including
224
242
     * the original offset where an error occurred.
225
243
     */
226
244
    log_offset= cur_offset;
227
 
  }
228
 
 
229
 
  int error_code= syncLogFile();
230
 
 
231
 
  if (unlikely(error_code != 0))
232
 
  {
233
 
    char errmsg[STRERROR_MAX];
234
 
    strerror_r(errno, errmsg, sizeof(errmsg));
235
 
    errmsg_printf(ERRMSG_LVL_ERROR, 
236
 
                  _("Failed to sync log file. Got error: %s\n"), 
237
 
                  errmsg);
238
 
  }
239
 
  return cur_offset;
240
 
}
241
 
 
242
 
int TransactionLog::syncLogFile()
243
 
{
244
 
  switch (flush_frequency)
245
 
  {
246
 
  case FLUSH_FREQUENCY_EVERY_WRITE:
247
 
    return internal::my_sync(log_file, 0);
248
 
  case FLUSH_FREQUENCY_EVERY_SECOND:
249
 
    {
250
 
      time_t now_time= time(NULL);
251
 
      if (last_sync_time <= (now_time - 1))
252
 
      {
253
 
        last_sync_time= now_time;
254
 
        return internal::my_sync(log_file, 0);
255
 
      }
256
 
      return 0;
257
 
    }
258
 
  case FLUSH_FREQUENCY_OS:
259
 
  default:
260
 
    return 0;
261
 
  }
262
 
}
263
 
 
264
 
const string &TransactionLog::getLogFilename()
265
 
{
266
 
  return log_file_name;
267
 
}
268
 
 
269
 
const string &TransactionLog::getLogFilepath()
270
 
{
271
 
  return log_file_path;
 
245
    deactivate();
 
246
  }
 
247
  free(orig_buffer);
272
248
}
273
249
 
274
250
void TransactionLog::truncate()
275
251
{
 
252
  bool orig_is_enabled= isEnabled();
 
253
  disable();
 
254
  
276
255
  /* 
 
256
   * Wait a short amount of time before truncating.  This just prevents error messages
 
257
   * from being produced during a call to apply().  Calling disable() above
 
258
   * means that once the current caller to apply() is done, no other calls are made to
 
259
   * apply() before enable is reset to its original state
 
260
   *
277
261
   * @note
278
262
   *
279
 
   * This is NOT THREAD SAFE! DEBUG/TEST code only!
 
263
   * This is DEBUG code only!
280
264
   */
 
265
  usleep(500); /* Sleep for half a second */
281
266
  log_offset= (off_t) 0;
282
267
  int result;
283
268
  do
285
270
    result= ftruncate(log_file, log_offset);
286
271
  }
287
272
  while (result == -1 && errno == EINTR);
 
273
 
 
274
  if (orig_is_enabled)
 
275
    enable();
288
276
}
289
277
 
290
278
bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
291
 
                                                            string &out_filename) const
 
279
                                                        string &out_filename) const
292
280
{
293
281
  /* 
294
282
   * Currently, we simply return the single logfile name
299
287
  return true;
300
288
}
301
289
 
302
 
bool TransactionLog::hasError() const
303
 
{
304
 
  return has_error;
305
 
}
306
 
 
307
 
void TransactionLog::clearError()
308
 
{
309
 
  has_error= false;
310
 
  error_message.clear();
311
 
}
312
 
 
313
 
const string &TransactionLog::getErrorMessage() const
314
 
{
315
 
  return error_message;
316
 
}
317
 
 
318
 
size_t TransactionLog::getLogEntrySize(const message::Transaction &trx)
319
 
{
320
 
  return trx.ByteSize() + HEADER_TRAILER_BYTES;
321
 
}
 
290
static TransactionLog *transaction_log= NULL; /* The singleton transaction log */
 
291
 
 
292
static int init(drizzled::plugin::Registry &registry)
 
293
{
 
294
  if (sysvar_transaction_log_enabled)
 
295
  {
 
296
    transaction_log= new TransactionLog("transaction_log",
 
297
                                        sysvar_transaction_log_file, 
 
298
                                        sysvar_transaction_log_checksum_enabled);
 
299
    registry.add(transaction_log);
 
300
  }
 
301
  return 0;
 
302
}
 
303
 
 
304
static int deinit(drizzled::plugin::Registry &registry)
 
305
{
 
306
  if (transaction_log)
 
307
  {
 
308
    registry.remove(transaction_log);
 
309
    delete transaction_log;
 
310
  }
 
311
  return 0;
 
312
}
 
313
 
 
314
static void set_truncate_debug(Session *,
 
315
                               struct st_mysql_sys_var *, 
 
316
                               void *, 
 
317
                               const void *save)
 
318
{
 
319
  /* 
 
320
   * The const void * save comes directly from the check function, 
 
321
   * which should simply return the result from the set statement. 
 
322
   */
 
323
  if (transaction_log)
 
324
    if (*(bool *)save != false)
 
325
      transaction_log->truncate();
 
326
}
 
327
 
 
328
static DRIZZLE_SYSVAR_BOOL(enable,
 
329
                          sysvar_transaction_log_enabled,
 
330
                          PLUGIN_VAR_NOCMDARG,
 
331
                          N_("Enable transaction log"),
 
332
                          NULL, /* check func */
 
333
                          NULL, /* update func */
 
334
                          false /* default */);
 
335
 
 
336
static DRIZZLE_SYSVAR_BOOL(truncate_debug,
 
337
                          sysvar_transaction_log_truncate_debug,
 
338
                          PLUGIN_VAR_NOCMDARG,
 
339
                          N_("DEBUGGING - Truncate transaction log"),
 
340
                          NULL, /* check func */
 
341
                          set_truncate_debug, /* update func */
 
342
                          false /* default */);
 
343
 
 
344
static DRIZZLE_SYSVAR_STR(log_file,
 
345
                          sysvar_transaction_log_file,
 
346
                          PLUGIN_VAR_READONLY,
 
347
                          N_("Path to the file to use for transaction log."),
 
348
                          NULL, /* check func */
 
349
                          NULL, /* update func*/
 
350
                          DEFAULT_LOG_FILE_PATH /* default */);
 
351
 
 
352
static DRIZZLE_SYSVAR_BOOL(enable_checksum,
 
353
                          sysvar_transaction_log_checksum_enabled,
 
354
                          PLUGIN_VAR_NOCMDARG,
 
355
                          N_("Enable CRC32 Checksumming"),
 
356
                          NULL, /* check func */
 
357
                          NULL, /* update func */
 
358
                          false /* default */);
 
359
 
 
360
static struct st_mysql_sys_var* system_variables[]= {
 
361
  DRIZZLE_SYSVAR(enable),
 
362
  DRIZZLE_SYSVAR(truncate_debug),
 
363
  DRIZZLE_SYSVAR(log_file),
 
364
  DRIZZLE_SYSVAR(enable_checksum),
 
365
  NULL
 
366
};
 
367
 
 
368
drizzle_declare_plugin(transaction_log)
 
369
{
 
370
  "transaction_log",
 
371
  "0.1",
 
372
  "Jay Pipes",
 
373
  N_("Transaction Message Log"),
 
374
  PLUGIN_LICENSE_GPL,
 
375
  init, /* Plugin Init */
 
376
  deinit, /* Plugin Deinit */
 
377
  NULL, /* status variables */
 
378
  system_variables, /* system variables */
 
379
  NULL    /* config options */
 
380
}
 
381
drizzle_declare_plugin_end;