~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/command_log/command_log.cc

Merge Stewart's dead code removal

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
4
 *  Copyright (C) 2008-2009 Sun Microsystems
5
 
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
6
 
 *
7
 
 *  Authors:
8
 
 *
9
 
 *  Jay Pipes <jaypipes@gmail.com.com>
10
5
 *
11
6
 *  This program is free software; you can redistribute it and/or modify
12
7
 *  it under the terms of the GNU General Public License as published by
26
21
/**
27
22
 * @file
28
23
 *
29
 
 * Defines the implementation of the transaction log file descriptor.
 
24
 * Defines the implementation of the default command log.
 
25
 *
 
26
 * @see drizzled/plugin/command_replicator.h
 
27
 * @see drizzled/plugin/command_applier.h
30
28
 *
31
29
 * @details
32
30
 *
33
31
 * Currently, the log file uses this implementation:
34
32
 *
35
33
 * We have an atomic off_t called log_offset which keeps track of the 
36
 
 * offset into the log file for writing the next Transaction.
 
34
 * offset into the log file for writing the next Command.
37
35
 *
38
 
 * We write Transaction message encapsulated in an 8-byte length/type header and a
 
36
 * We write Command message encapsulated in a 8-byte length header and a
39
37
 * 4-byte checksum trailer.
40
38
 *
41
 
 * When writing a Transaction to the log, we calculate the length of the 
42
 
 * Transaction to be written.  We then increment log_offset by the length
43
 
 * of the Transaction plus 2 * sizeof(uint32_t) plus sizeof(uint32_t) and store 
44
 
 * this new offset in a local off_t called cur_offset (see TransactionLog::apply().  
 
39
 * When writing a Command to the log, we calculate the length of the 
 
40
 * Command to be written.  We then increment log_offset by the length
 
41
 * of the Command plus sizeof(uint64_t) plus sizeof(uint32_t) and store 
 
42
 * this new offset in a local off_t called cur_offset (see CommandLog::apply().  
45
43
 * This compare and set is done in an atomic instruction.
46
44
 *
47
45
 * We then adjust the local off_t (cur_offset) back to the original
48
 
 * offset by subtracting the length and sizeof(uint32_t) and sizeof(uint32_t).
 
46
 * offset by subtracting the length and sizeof(uint64_t) and sizeof(uint32_t).
49
47
 *
50
 
 * We then first write a 64-bit length and then the serialized transaction/transaction
 
48
 * We then first write a 64-bit length and then the serialized transaction/command
51
49
 * and optional checksum to our log file at our local cur_offset.
52
50
 *
53
 
 * --------------------------------------------------------------------------------
54
 
 * |<- 4 bytes ->|<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
55
 
 * --------------------------------------------------------------------------------
56
 
 * |  Msg Type   |   Length    |   Serialized Transaction Message   |   Checksum  |
57
 
 * --------------------------------------------------------------------------------
 
51
 * --------------------------------------------------------------
 
52
 * |<- 8 bytes ->|<- # Bytes of Command Message ->|<- 4 bytes ->|
 
53
 * --------------------------------------------------------------
 
54
 * |   Length    |   Serialized Command Message   |   Checksum  |
 
55
 * --------------------------------------------------------------
58
56
 *
59
57
 * @todo
60
58
 *
61
59
 * Possibly look at a scoreboard approach with multiple file segments.  For
62
60
 * right now, though, this is just a quick simple implementation to serve
63
61
 * as a skeleton and a springboard.
 
62
 *
 
63
 * Also, we can move to a ZeroCopyStream implementation instead of using the
 
64
 * string as a buffer in apply()
64
65
 */
65
66
 
66
 
#include "config.h"
67
 
#include "transaction_log.h"
 
67
#include <drizzled/server_includes.h>
 
68
#include "command_log.h"
68
69
 
69
 
#include <sys/stat.h>
70
 
#include <fcntl.h>
71
70
#include <unistd.h>
72
 
#include <errno.h>
 
71
#include <zlib.h>
73
72
 
74
73
#include <vector>
75
74
#include <string>
76
75
 
77
 
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
78
 
#include <drizzled/errmsg_print.h>
 
76
#include <drizzled/session.h>
 
77
#include <drizzled/set_var.h>
79
78
#include <drizzled/gettext.h>
 
79
#include <drizzled/message/replication.pb.h>
 
80
#include <drizzled/crc32.h>
80
81
 
81
82
using namespace std;
82
83
using namespace drizzled;
83
84
 
84
 
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
 
85
/** 
 
86
 * Command Log plugin system variable - Is the log enabled? Only used on init().  
 
87
 * The enable() and disable() methods of the CommandLog class control online
 
88
 * disabling.
 
89
 */
 
90
static bool sysvar_command_log_enabled= false;
 
91
/** Command Log plugin system variable - The path to the log file used */
 
92
static char* sysvar_command_log_file= NULL;
 
93
/** 
 
94
 * Command Log plugin system variable - A debugging variable to assist 
 
95
 * in truncating the log file. 
 
96
 */
 
97
static bool sysvar_command_log_truncate_debug= false;
 
98
static const char DEFAULT_LOG_FILE_PATH[]= "command.log"; /* In datadir... */
 
99
/** 
 
100
 * Command Log plugin system variable - Should we write a CRC32 checksum for 
 
101
 * each written Command message?
 
102
 */
 
103
static bool sysvar_command_log_checksum_enabled= false;
85
104
 
86
 
TransactionLog::TransactionLog(const string in_log_file_path,
87
 
                               uint32_t in_sync_method) : 
 
105
CommandLog::CommandLog(string name_arg,
 
106
                       const char *in_log_file_path, bool in_do_checksum)
 
107
  : 
 
108
    plugin::CommandApplier(name_arg),
88
109
    state(OFFLINE),
89
 
    log_file_path(in_log_file_path),
90
 
    has_error(false),
91
 
    error_message(),
92
 
    sync_method(in_sync_method)
 
110
    log_file_path(in_log_file_path)
93
111
{
 
112
  do_checksum= in_do_checksum; /* Have to do here, not in initialization list b/c atomic<> */
 
113
 
94
114
  /* Setup our log file and determine the next write offset... */
95
 
  log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
 
115
  log_file= open(log_file_path, O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
96
116
  if (log_file == -1)
97
117
  {
98
 
    error_message.assign(_("Failed to open transaction log file "));
99
 
    error_message.append(log_file_path);
100
 
    error_message.append("  Got error: ");
101
 
    error_message.append(strerror(errno));
102
 
    error_message.push_back('\n');
103
 
    has_error= true;
 
118
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open command log file %s.  Got error: %s\n"), 
 
119
                  log_file_path, 
 
120
                  strerror(errno));
 
121
    deactivate();
104
122
    return;
105
123
  }
106
124
 
107
 
  /* For convenience, grab the log file name from the path */
108
 
  if (log_file_path.find_first_of('/') != string::npos)
109
 
  {
110
 
    /* Strip to last / */
111
 
    string tmp;
112
 
    tmp= log_file_path.substr(log_file_path.find_last_of('/') + 1);
113
 
    log_file_name.assign(tmp);
114
 
  }
115
 
  else
116
 
    log_file_name.assign(log_file_path);
117
 
 
118
125
  /* 
119
126
   * The offset of the next write is the current position of the log
120
127
   * file, since it's opened in append mode...
124
131
  state= ONLINE;
125
132
}
126
133
 
127
 
TransactionLog::~TransactionLog()
 
134
CommandLog::~CommandLog()
128
135
{
129
136
  /* Clear up any resources we've consumed */
130
 
  if (log_file != -1)
 
137
  if (isEnabled() && log_file != -1)
131
138
  {
132
139
    (void) close(log_file);
133
140
  }
134
141
}
135
142
 
136
 
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
 
143
void CommandLog::apply(const message::Command &to_apply)
137
144
{
138
 
  ssize_t written= 0;
 
145
  /* 
 
146
   * There is an issue on Solaris/SunStudio where if the std::string buffer is
 
147
   * NOT initialized with the below, the code produces an EFAULT when accessing
 
148
   * c_str() later on.  Stoopid, but true.
 
149
   */
 
150
  string buffer(""); /* Buffer we will write serialized command to */
 
151
 
 
152
  static const uint32_t HEADER_TRAILER_BYTES= sizeof(uint64_t) + /* 8-byte length header */
 
153
                                              sizeof(uint32_t); /* 4 byte checksum trailer */
 
154
 
 
155
  size_t length;
 
156
  ssize_t written;
 
157
  off_t cur_offset;
 
158
 
 
159
  to_apply.SerializeToString(&buffer);
 
160
 
 
161
  length= buffer.length(); 
139
162
 
140
163
  /*
141
164
   * Do an atomic increment on the offset of the log file position
142
165
   */
143
 
  off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
 
166
  cur_offset= log_offset.fetch_and_add(static_cast<off_t>((HEADER_TRAILER_BYTES + length)));
144
167
 
145
168
  /*
146
169
   * We adjust cur_offset back to the original log_offset before
147
170
   * the increment above...
148
171
   */
149
 
  cur_offset-= static_cast<off_t>(data_length);
150
 
 
151
 
  /* 
152
 
   * Quick safety...if an error occurs above in another writer, the log 
153
 
   * file will be in a crashed state.
154
 
   */
155
 
  if (unlikely(state == CRASHED))
156
 
  {
157
 
    /* 
158
 
     * Reset the log's offset in case we want to produce a decent error message including
159
 
     * the original offset where an error occurred.
160
 
     */
161
 
    log_offset= cur_offset;
162
 
    return log_offset;
163
 
  }
164
 
 
165
 
  /* Write the full buffer in one swoop */
166
 
  do
167
 
  {
168
 
    written= pwrite(log_file, data, data_length, cur_offset);
169
 
  }
170
 
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
171
 
 
172
 
  if (unlikely(written != static_cast<ssize_t>(data_length)))
173
 
  {
174
 
    errmsg_printf(ERRMSG_LVL_ERROR, 
175
 
                  _("Failed to write full size of log entry.  Tried to write %" PRId64
176
 
                    " bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes.  Error: %s\n"), 
177
 
                  static_cast<int64_t>(data_length),
178
 
                  static_cast<int64_t>(cur_offset),
179
 
                  static_cast<int64_t>(written), 
180
 
                  strerror(errno));
181
 
    state= CRASHED;
182
 
    /* 
183
 
     * Reset the log's offset in case we want to produce a decent error message including
184
 
     * the original offset where an error occurred.
185
 
     */
186
 
    log_offset= cur_offset;
187
 
  }
188
 
 
189
 
  int error_code= syncLogFile();
190
 
 
191
 
  if (unlikely(error_code != 0))
192
 
  {
193
 
    errmsg_printf(ERRMSG_LVL_ERROR, 
194
 
                  _("Failed to sync log file. Got error: %s\n"), 
195
 
                  strerror(errno));
196
 
  }
197
 
  return cur_offset;
198
 
}
199
 
 
200
 
int TransactionLog::syncLogFile()
201
 
{
202
 
  switch (sync_method)
203
 
  {
204
 
  case SYNC_METHOD_EVERY_WRITE:
205
 
    return internal::my_sync(log_file, 0);
206
 
  case SYNC_METHOD_EVERY_SECOND:
207
 
    {
208
 
      time_t now_time= time(NULL);
209
 
      if (last_sync_time <= (now_time - 1))
210
 
      {
211
 
        last_sync_time= now_time;
212
 
        return internal::my_sync(log_file, 0);
213
 
      }
214
 
      return 0;
215
 
    }
216
 
  case SYNC_METHOD_OS:
217
 
  default:
218
 
    return 0;
219
 
  }
220
 
}
221
 
 
222
 
const string &TransactionLog::getLogFilename()
223
 
{
224
 
  return log_file_name;
225
 
}
226
 
 
227
 
const string &TransactionLog::getLogFilepath()
228
 
{
229
 
  return log_file_path;
230
 
}
231
 
 
232
 
void TransactionLog::truncate()
233
 
{
234
 
  /* 
 
172
  cur_offset-= static_cast<off_t>((HEADER_TRAILER_BYTES + length));
 
173
 
 
174
  /* 
 
175
   * Quick safety...if an error occurs below, the log file will
 
176
   * not be active, therefore a caller could have been ready
 
177
   * to write...but the log is crashed.
 
178
   */
 
179
  if (unlikely(state == CRASHED))
 
180
    return;
 
181
 
 
182
  /* We always write in network byte order */
 
183
  unsigned char nbo_length[8];
 
184
  int8store(nbo_length, (uint64_t) length);
 
185
 
 
186
  /* Write the length header */
 
187
  do
 
188
  {
 
189
    written= pwrite(log_file, nbo_length, sizeof(uint64_t), cur_offset);
 
190
  }
 
191
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
 
192
 
 
193
  if (unlikely(written != sizeof(uint64_t)))
 
194
  {
 
195
    errmsg_printf(ERRMSG_LVL_ERROR, 
 
196
                  _("Failed to write full size of command.  Tried to write %" PRId64 
 
197
                    " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
 
198
                  static_cast<int64_t>(sizeof(uint64_t)), 
 
199
                  static_cast<int64_t>(cur_offset),
 
200
                  static_cast<int64_t>(written), 
 
201
                  strerror(errno));
 
202
    state= CRASHED;
 
203
    /* 
 
204
     * Reset the log's offset in case we want to produce a decent error message including
 
205
     * the original offset where an error occurred.
 
206
     */
 
207
    log_offset= cur_offset;
 
208
    deactivate();
 
209
    return;
 
210
  }
 
211
 
 
212
  cur_offset+= static_cast<off_t>(written);
 
213
 
 
214
  /* 
 
215
   * Quick safety...if an error occurs above in another writer, the log 
 
216
   * file will be in a crashed state.
 
217
   */
 
218
  if (unlikely(state == CRASHED))
 
219
  {
 
220
    /* 
 
221
     * Reset the log's offset in case we want to produce a decent error message including
 
222
     * the original offset where an error occurred.
 
223
     */
 
224
    log_offset= cur_offset;
 
225
    return;
 
226
  }
 
227
 
 
228
  /* Write the command message itself */
 
229
  do
 
230
  {
 
231
    written= pwrite(log_file, buffer.c_str(), length, cur_offset);
 
232
  }
 
233
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
 
234
 
 
235
  if (unlikely(written != static_cast<ssize_t>(length)))
 
236
  {
 
237
    errmsg_printf(ERRMSG_LVL_ERROR, 
 
238
                  _("Failed to write full serialized command.  Tried to write %" PRId64 
 
239
                    " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
 
240
                  static_cast<int64_t>(length), 
 
241
                  static_cast<int64_t>(cur_offset),
 
242
                  static_cast<int64_t>(written), 
 
243
                  strerror(errno));
 
244
    state= CRASHED;
 
245
    /* 
 
246
     * Reset the log's offset in case we want to produce a decent error message including
 
247
     * the original offset where an error occurred.
 
248
     */
 
249
    log_offset= cur_offset;
 
250
    deactivate();
 
251
  }
 
252
 
 
253
  cur_offset+= static_cast<off_t>(written);
 
254
 
 
255
  /* 
 
256
   * Quick safety...if an error occurs above in another writer, the log 
 
257
   * file will be in a crashed state.
 
258
   */
 
259
  if (unlikely(state == CRASHED))
 
260
  {
 
261
    /* 
 
262
     * Reset the log's offset in case we want to produce a decent error message including
 
263
     * the original offset where an error occurred.
 
264
     */
 
265
    log_offset= cur_offset;
 
266
    return;
 
267
  }
 
268
 
 
269
  uint32_t checksum= 0;
 
270
 
 
271
  if (do_checksum)
 
272
  {
 
273
    checksum= hash_crc32(buffer.c_str(), length);
 
274
  }
 
275
 
 
276
  /* We always write in network byte order */
 
277
  unsigned char nbo_checksum[4];
 
278
  int4store(nbo_checksum, checksum);
 
279
 
 
280
  /* Write the checksum trailer */
 
281
  do
 
282
  {
 
283
    written= pwrite(log_file, nbo_checksum, sizeof(uint32_t), cur_offset);
 
284
  }
 
285
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
 
286
 
 
287
  if (unlikely(written != static_cast<ssize_t>(sizeof(uint32_t))))
 
288
  {
 
289
    errmsg_printf(ERRMSG_LVL_ERROR, 
 
290
                  _("Failed to write full checksum of command.  Tried to write %" PRId64 
 
291
                    " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
 
292
                  static_cast<int64_t>(sizeof(uint32_t)), 
 
293
                  static_cast<int64_t>(cur_offset),
 
294
                  static_cast<int64_t>(written), 
 
295
                  strerror(errno));
 
296
    state= CRASHED;
 
297
    /* 
 
298
     * Reset the log's offset in case we want to produce a decent error message including
 
299
     * the original offset where an error occurred.
 
300
     */
 
301
    log_offset= cur_offset;
 
302
    deactivate();
 
303
    return;
 
304
  }
 
305
}
 
306
 
 
307
void CommandLog::truncate()
 
308
{
 
309
  bool orig_is_enabled= isEnabled();
 
310
  disable();
 
311
  
 
312
  /* 
 
313
   * Wait a short amount of time before truncating.  This just prevents error messages
 
314
   * from being produced during a call to apply().  Calling disable() above
 
315
   * means that once the current caller to apply() is done, no other calls are made to
 
316
   * apply() before enable is reset to its original state
 
317
   *
235
318
   * @note
236
319
   *
237
 
   * This is NOT THREAD SAFE! DEBUG/TEST code only!
 
320
   * This is DEBUG code only!
238
321
   */
 
322
  usleep(500); /* Sleep for half a second */
239
323
  log_offset= (off_t) 0;
240
324
  int result;
241
325
  do
243
327
    result= ftruncate(log_file, log_offset);
244
328
  }
245
329
  while (result == -1 && errno == EINTR);
 
330
 
 
331
  if (orig_is_enabled)
 
332
    enable();
246
333
}
247
334
 
248
 
bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
249
 
                                                            string &out_filename) const
 
335
bool CommandLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
 
336
                                                        string &out_filename) const
250
337
{
251
338
  /* 
252
339
   * Currently, we simply return the single logfile name
257
344
  return true;
258
345
}
259
346
 
260
 
bool TransactionLog::hasError() const
261
 
{
262
 
  return has_error;
263
 
}
264
 
 
265
 
void TransactionLog::clearError()
266
 
{
267
 
  has_error= false;
268
 
  error_message.clear();
269
 
}
270
 
 
271
 
const string &TransactionLog::getErrorMessage() const
272
 
{
273
 
  return error_message;
274
 
}
 
347
static CommandLog *command_log= NULL; /* The singleton command log */
 
348
 
 
349
static int init(drizzled::plugin::Registry &registry)
 
350
{
 
351
  if (sysvar_command_log_enabled)
 
352
  {
 
353
    command_log= new CommandLog("command_log",
 
354
                                sysvar_command_log_file, 
 
355
                                sysvar_command_log_checksum_enabled);
 
356
    registry.add(command_log);
 
357
  }
 
358
  return 0;
 
359
}
 
360
 
 
361
static int deinit(drizzled::plugin::Registry &registry)
 
362
{
 
363
  if (command_log)
 
364
  {
 
365
    registry.remove(command_log);
 
366
    delete command_log;
 
367
  }
 
368
  return 0;
 
369
}
 
370
 
 
371
static void set_truncate_debug(Session *,
 
372
                               struct st_mysql_sys_var *, 
 
373
                               void *, 
 
374
                               const void *save)
 
375
{
 
376
  /* 
 
377
   * The const void * save comes directly from the check function, 
 
378
   * which should simply return the result from the set statement. 
 
379
   */
 
380
  if (command_log)
 
381
    if (*(bool *)save != false)
 
382
      command_log->truncate();
 
383
}
 
384
 
 
385
static DRIZZLE_SYSVAR_BOOL(enable,
 
386
                          sysvar_command_log_enabled,
 
387
                          PLUGIN_VAR_NOCMDARG,
 
388
                          N_("Enable command log"),
 
389
                          NULL, /* check func */
 
390
                          NULL, /* update func */
 
391
                          false /* default */);
 
392
 
 
393
static DRIZZLE_SYSVAR_BOOL(truncate_debug,
 
394
                          sysvar_command_log_truncate_debug,
 
395
                          PLUGIN_VAR_NOCMDARG,
 
396
                          N_("DEBUGGING - Truncate command log"),
 
397
                          NULL, /* check func */
 
398
                          set_truncate_debug, /* update func */
 
399
                          false /* default */);
 
400
 
 
401
static DRIZZLE_SYSVAR_STR(log_file,
 
402
                          sysvar_command_log_file,
 
403
                          PLUGIN_VAR_READONLY,
 
404
                          N_("Path to the file to use for command log."),
 
405
                          NULL, /* check func */
 
406
                          NULL, /* update func*/
 
407
                          DEFAULT_LOG_FILE_PATH /* default */);
 
408
 
 
409
static DRIZZLE_SYSVAR_BOOL(enable_checksum,
 
410
                          sysvar_command_log_checksum_enabled,
 
411
                          PLUGIN_VAR_NOCMDARG,
 
412
                          N_("Enable CRC32 Checksumming"),
 
413
                          NULL, /* check func */
 
414
                          NULL, /* update func */
 
415
                          false /* default */);
 
416
 
 
417
static struct st_mysql_sys_var* system_variables[]= {
 
418
  DRIZZLE_SYSVAR(enable),
 
419
  DRIZZLE_SYSVAR(truncate_debug),
 
420
  DRIZZLE_SYSVAR(log_file),
 
421
  DRIZZLE_SYSVAR(enable_checksum),
 
422
  NULL
 
423
};
 
424
 
 
425
drizzle_declare_plugin(command_log)
 
426
{
 
427
  "command_log",
 
428
  "0.1",
 
429
  "Jay Pipes",
 
430
  N_("Command Message Log"),
 
431
  PLUGIN_LICENSE_GPL,
 
432
  init, /* Plugin Init */
 
433
  deinit, /* Plugin Deinit */
 
434
  NULL, /* status variables */
 
435
  system_variables, /* system variables */
 
436
  NULL    /* config options */
 
437
}
 
438
drizzle_declare_plugin_end;