~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/command_log/command_log.cc

  • Committer: Jay Pipes
  • Date: 2009-09-21 14:33:44 UTC
  • mfrom: (1126.10.26 dtrace-probes)
  • mto: This revision was merged to the branch mainline in revision 1133.
  • Revision ID: jpipes@serialcoder-20090921143344-jnarp7gcn6zmg19c
Merge fixes from Trond and Padraig on dtrace probes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
4
 *  Copyright (C) 2008-2009 Sun Microsystems
5
 
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
6
 
 *
7
 
 *  Authors:
8
 
 *
9
 
 *  Jay Pipes <jaypipes@gmail.com.com>
10
5
 *
11
6
 *  This program is free software; you can redistribute it and/or modify
12
7
 *  it under the terms of the GNU General Public License as published by
26
21
/**
27
22
 * @file
28
23
 *
29
 
 * Defines the implementation of the transaction log file descriptor.
 
24
 * Defines the implementation of the default command log.
 
25
 *
 
26
 * @see drizzled/plugin/replicator.h
 
27
 * @see drizzled/plugin/applier.h
30
28
 *
31
29
 * @details
32
30
 *
33
31
 * Currently, the log file uses this implementation:
34
32
 *
35
33
 * We have an atomic off_t called log_offset which keeps track of the 
36
 
 * offset into the log file for writing the next Transaction.
37
 
 *
38
 
 * We write Transaction message encapsulated in an 8-byte length/type header and a
39
 
 * 4-byte checksum trailer.
40
 
 *
41
 
 * When writing a Transaction to the log, we calculate the length of the 
42
 
 * Transaction to be written.  We then increment log_offset by the length
43
 
 * of the Transaction plus 2 * sizeof(uint32_t) plus sizeof(uint32_t) and store 
44
 
 * this new offset in a local off_t called cur_offset (see TransactionLog::apply().  
45
 
 * This compare and set is done in an atomic instruction.
 
34
 * offset into the log file for writing the next Command.
 
35
 *
 
36
 * When writing a Command to the log, we calculate the length of the 
 
37
 * Command to be written.  We then increment log_offset by the length
 
38
 * of the Command plus sizeof(uint64_t) and store this new offset in a 
 
39
 * local off_t called cur_offset (see CommandLog::apply().  This 
 
40
 * compare and set is done in an atomic instruction.
46
41
 *
47
42
 * We then adjust the local off_t (cur_offset) back to the original
48
 
 * offset by subtracting the length and sizeof(uint32_t) and sizeof(uint32_t).
49
 
 *
50
 
 * We then first write a 64-bit length and then the serialized transaction/transaction
51
 
 * and optional checksum to our log file at our local cur_offset.
52
 
 *
53
 
 * --------------------------------------------------------------------------------
54
 
 * |<- 4 bytes ->|<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
55
 
 * --------------------------------------------------------------------------------
56
 
 * |  Msg Type   |   Length    |   Serialized Transaction Message   |   Checksum  |
57
 
 * --------------------------------------------------------------------------------
 
43
 * offset by subtracting the length and sizeof(uint64_t).
 
44
 *
 
45
 * We then first write a 64-bit length and then the serialized transaction/command
 
46
 * to our log file at our local cur_offset.
58
47
 *
59
48
 * @todo
60
49
 *
63
52
 * as a skeleton and a springboard.
64
53
 */
65
54
 
66
 
#include "config.h"
67
 
#include "transaction_log.h"
 
55
#include "command_log.h"
68
56
 
69
 
#include <sys/stat.h>
70
 
#include <fcntl.h>
71
 
#include <unistd.h>
72
 
#include <errno.h>
 
57
#include <drizzled/session.h>
 
58
#include <drizzled/set_var.h>
 
59
#include <drizzled/gettext.h>
 
60
#include <drizzled/message/replication.pb.h>
73
61
 
74
62
#include <vector>
75
63
#include <string>
76
 
 
77
 
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
78
 
#include <drizzled/errmsg_print.h>
79
 
#include <drizzled/gettext.h>
 
64
#include <unistd.h>
80
65
 
81
66
using namespace std;
82
67
using namespace drizzled;
83
68
 
84
 
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
 
69
/** 
 
70
 * Command Log plugin system variable - Is the log enabled? Only used on init().  
 
71
 * The enable() and disable() methods of the CommandLog class control online
 
72
 * disabling.
 
73
 */
 
74
static bool sysvar_command_log_enabled= false;
 
75
/** Command Log plugin system variable - The path to the log file used */
 
76
static char* sysvar_command_log_file= NULL;
 
77
/** Command Log plugin system variable - A debugging variable to assist in truncating the log file. */
 
78
static bool sysvar_command_log_truncate_debug= false;
 
79
static const char DEFAULT_LOG_FILE_PATH[]= "command.log"; /* In datadir... */
85
80
 
86
 
TransactionLog::TransactionLog(const string in_log_file_path,
87
 
                               uint32_t in_sync_method) : 
88
 
    state(OFFLINE),
89
 
    log_file_path(in_log_file_path),
90
 
    has_error(false),
91
 
    error_message(),
92
 
    sync_method(in_sync_method)
 
81
CommandLog::CommandLog(const char *in_log_file_path)
 
82
  : 
 
83
    plugin::Applier(),
 
84
    state(CommandLog::OFFLINE),
 
85
    log_file_path(in_log_file_path)
93
86
{
 
87
  is_enabled= true; /* If constructed, the plugin is enabled until taken offline with disable() */
 
88
  is_active= false;
 
89
 
94
90
  /* Setup our log file and determine the next write offset... */
95
 
  log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
 
91
  log_file= open(log_file_path, O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
96
92
  if (log_file == -1)
97
93
  {
98
 
    error_message.assign(_("Failed to open transaction log file "));
99
 
    error_message.append(log_file_path);
100
 
    error_message.append("  Got error: ");
101
 
    error_message.append(strerror(errno));
102
 
    error_message.push_back('\n');
103
 
    has_error= true;
 
94
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open command log file.  Got error: %s"), strerror(errno));
 
95
    is_active= false;
104
96
    return;
105
97
  }
106
98
 
107
 
  /* For convenience, grab the log file name from the path */
108
 
  if (log_file_path.find_first_of('/') != string::npos)
109
 
  {
110
 
    /* Strip to last / */
111
 
    string tmp;
112
 
    tmp= log_file_path.substr(log_file_path.find_last_of('/') + 1);
113
 
    log_file_name.assign(tmp);
114
 
  }
115
 
  else
116
 
    log_file_name.assign(log_file_path);
117
 
 
118
99
  /* 
119
100
   * The offset of the next write is the current position of the log
120
101
   * file, since it's opened in append mode...
121
102
   */
122
103
  log_offset= lseek(log_file, 0, SEEK_END);
123
104
 
124
 
  state= ONLINE;
 
105
  state= CommandLog::ONLINE;
 
106
  is_active= true;
125
107
}
126
108
 
127
 
TransactionLog::~TransactionLog()
 
109
CommandLog::~CommandLog()
128
110
{
129
111
  /* Clear up any resources we've consumed */
130
 
  if (log_file != -1)
 
112
  if (isActive() && log_file != -1)
131
113
  {
132
114
    (void) close(log_file);
133
115
  }
134
116
}
135
117
 
136
 
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
137
 
{
138
 
  ssize_t written= 0;
 
118
bool CommandLog::isActive()
 
119
{
 
120
  return is_enabled && is_active;
 
121
}
 
122
 
 
123
void CommandLog::apply(message::Command *to_apply)
 
124
{
 
125
  /* 
 
126
   * There is an issue on Solaris/SunStudio where if the std::string buffer is
 
127
   * NOT initialized with the below, the code produces an EFAULT when accessing
 
128
   * c_str() later on.  Stoopid, but true.
 
129
   */
 
130
  string buffer= string(""); /* Buffer we will write serialized command to */
 
131
 
 
132
  size_t length;
 
133
  ssize_t written;
 
134
  off_t cur_offset;
 
135
 
 
136
  to_apply->SerializeToString(&buffer);
 
137
 
 
138
  length= buffer.length(); 
139
139
 
140
140
  /*
141
141
   * Do an atomic increment on the offset of the log file position
142
142
   */
143
 
  off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
 
143
  cur_offset= log_offset.fetch_and_add((off_t) (sizeof(uint64_t) + length));
144
144
 
145
145
  /*
146
146
   * We adjust cur_offset back to the original log_offset before
147
147
   * the increment above...
148
148
   */
149
 
  cur_offset-= static_cast<off_t>(data_length);
 
149
  cur_offset-= (off_t) (sizeof(uint64_t) + length);
 
150
 
 
151
  /* 
 
152
   * Quick safety...if an error occurs below, the log file will
 
153
   * not be active, therefore a caller could have been ready
 
154
   * to write...but the log is crashed.
 
155
   */
 
156
  if (unlikely(state == CommandLog::CRASHED))
 
157
    return;
 
158
 
 
159
  /* We always write in network byte order */
 
160
  unsigned char nbo_length[8];
 
161
  int8store(nbo_length, length);
 
162
 
 
163
  do
 
164
  {
 
165
    written= pwrite(log_file, nbo_length, sizeof(uint64_t), cur_offset);
 
166
  }
 
167
  while (written == EINTR); /* Just retry the write when interrupted by a signal... */
 
168
 
 
169
  if (unlikely(written != sizeof(uint64_t)))
 
170
  {
 
171
    errmsg_printf(ERRMSG_LVL_ERROR, 
 
172
                  _("Failed to write full size of command.  Tried to write %" PRId64 " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
 
173
                  (int64_t) length, 
 
174
                  (int64_t) cur_offset,
 
175
                  (int64_t) written, 
 
176
                  strerror(errno));
 
177
    state= CRASHED;
 
178
    /* 
 
179
     * Reset the log's offset in case we want to produce a decent error message including
 
180
     * the original offset where an error occurred.
 
181
     */
 
182
    log_offset= cur_offset;
 
183
    is_active= false;
 
184
    return;
 
185
  }
 
186
 
 
187
  cur_offset+= (off_t) written;
150
188
 
151
189
  /* 
152
190
   * Quick safety...if an error occurs above in another writer, the log 
153
191
   * file will be in a crashed state.
154
192
   */
155
 
  if (unlikely(state == CRASHED))
 
193
  if (unlikely(state == CommandLog::CRASHED))
156
194
  {
157
195
    /* 
158
196
     * Reset the log's offset in case we want to produce a decent error message including
159
197
     * the original offset where an error occurred.
160
198
     */
161
199
    log_offset= cur_offset;
162
 
    return log_offset;
 
200
    return;
163
201
  }
164
202
 
165
 
  /* Write the full buffer in one swoop */
166
203
  do
167
204
  {
168
 
    written= pwrite(log_file, data, data_length, cur_offset);
 
205
    written= pwrite(log_file, buffer.c_str(), length, cur_offset);
169
206
  }
170
 
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
 
207
  while (written == EINTR); /* Just retry the write when interrupted by a signal... */
171
208
 
172
 
  if (unlikely(written != static_cast<ssize_t>(data_length)))
 
209
  if (unlikely(written != (ssize_t) length))
173
210
  {
174
211
    errmsg_printf(ERRMSG_LVL_ERROR, 
175
 
                  _("Failed to write full size of log entry.  Tried to write %" PRId64
176
 
                    " bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes.  Error: %s\n"), 
177
 
                  static_cast<int64_t>(data_length),
178
 
                  static_cast<int64_t>(cur_offset),
179
 
                  static_cast<int64_t>(written), 
 
212
                  _("Failed to write full serialized command.  Tried to write %" PRId64 " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
 
213
                  (int64_t) length, 
 
214
                  (int64_t) cur_offset,
 
215
                  (int64_t) written, 
180
216
                  strerror(errno));
181
217
    state= CRASHED;
182
218
    /* 
184
220
     * the original offset where an error occurred.
185
221
     */
186
222
    log_offset= cur_offset;
187
 
  }
188
 
 
189
 
  int error_code= syncLogFile();
190
 
 
191
 
  if (unlikely(error_code != 0))
192
 
  {
193
 
    errmsg_printf(ERRMSG_LVL_ERROR, 
194
 
                  _("Failed to sync log file. Got error: %s\n"), 
195
 
                  strerror(errno));
196
 
  }
197
 
  return cur_offset;
198
 
}
199
 
 
200
 
int TransactionLog::syncLogFile()
201
 
{
202
 
  switch (sync_method)
203
 
  {
204
 
  case SYNC_METHOD_EVERY_WRITE:
205
 
    return internal::my_sync(log_file, 0);
206
 
  case SYNC_METHOD_EVERY_SECOND:
207
 
    {
208
 
      time_t now_time= time(NULL);
209
 
      if (last_sync_time <= (now_time - 1))
210
 
      {
211
 
        last_sync_time= now_time;
212
 
        return internal::my_sync(log_file, 0);
213
 
      }
214
 
      return 0;
215
 
    }
216
 
  case SYNC_METHOD_OS:
217
 
  default:
218
 
    return 0;
219
 
  }
220
 
}
221
 
 
222
 
const string &TransactionLog::getLogFilename()
223
 
{
224
 
  return log_file_name;
225
 
}
226
 
 
227
 
const string &TransactionLog::getLogFilepath()
228
 
{
229
 
  return log_file_path;
230
 
}
231
 
 
232
 
void TransactionLog::truncate()
233
 
{
 
223
    is_active= false;
 
224
  }
 
225
}
 
226
 
 
227
void CommandLog::truncate()
 
228
{
 
229
  bool orig_is_enabled= is_enabled;
 
230
  is_enabled= false;
 
231
  
234
232
  /* 
 
233
   * Wait a short amount of time before truncating.  This just prevents error messages
 
234
   * from being produced during a call to apply().  Setting is_enabled to false above
 
235
   * means that once the current caller to apply() is done, no other calls are made to
 
236
   * apply() before is_enabled is reset to its original state
 
237
   *
235
238
   * @note
236
239
   *
237
 
   * This is NOT THREAD SAFE! DEBUG/TEST code only!
 
240
   * This is DEBUG code only!
238
241
   */
 
242
  usleep(500); /* Sleep for half a second */
239
243
  log_offset= (off_t) 0;
240
244
  int result;
241
245
  do
243
247
    result= ftruncate(log_file, log_offset);
244
248
  }
245
249
  while (result == -1 && errno == EINTR);
246
 
}
247
 
 
248
 
bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
249
 
                                                            string &out_filename) const
 
250
 
 
251
  is_enabled= orig_is_enabled;
 
252
}
 
253
 
 
254
static CommandLog *command_log= NULL; /* The singleton command log */
 
255
 
 
256
static int init(drizzled::plugin::Registry &registry)
 
257
{
 
258
  if (sysvar_command_log_enabled)
 
259
  {
 
260
    command_log= new CommandLog(sysvar_command_log_file);
 
261
    registry.add(command_log);
 
262
  }
 
263
  return 0;
 
264
}
 
265
 
 
266
static int deinit(drizzled::plugin::Registry &registry)
 
267
{
 
268
  if (command_log)
 
269
  {
 
270
    registry.remove(command_log);
 
271
    delete command_log;
 
272
  }
 
273
  return 0;
 
274
}
 
275
 
 
276
static void set_truncate_debug(Session *, struct st_mysql_sys_var *, void *, const void *save)
250
277
{
251
278
  /* 
252
 
   * Currently, we simply return the single logfile name
253
 
   * Eventually, we'll have an index/hash with upper and
254
 
   * lower bounds to look up a log file with a transaction id
 
279
   * The const void * save comes directly from the check function, 
 
280
   * which should simply return the result from the set statement. 
255
281
   */
256
 
  out_filename.assign(log_file_path);
257
 
  return true;
258
 
}
259
 
 
260
 
bool TransactionLog::hasError() const
261
 
{
262
 
  return has_error;
263
 
}
264
 
 
265
 
void TransactionLog::clearError()
266
 
{
267
 
  has_error= false;
268
 
  error_message.clear();
269
 
}
270
 
 
271
 
const string &TransactionLog::getErrorMessage() const
272
 
{
273
 
  return error_message;
274
 
}
 
282
  if (command_log)
 
283
    if (*(bool *)save != false)
 
284
      command_log->truncate();
 
285
}
 
286
 
 
287
static DRIZZLE_SYSVAR_BOOL(enable,
 
288
                          sysvar_command_log_enabled,
 
289
                          PLUGIN_VAR_NOCMDARG,
 
290
                          N_("Enable command log"),
 
291
                          NULL, /* check func */
 
292
                          NULL, /* update func */
 
293
                          false /* default */);
 
294
 
 
295
static DRIZZLE_SYSVAR_BOOL(truncate_debug,
 
296
                          sysvar_command_log_truncate_debug,
 
297
                          PLUGIN_VAR_NOCMDARG,
 
298
                          N_("DEBUGGING - Truncate command log"),
 
299
                          NULL, /* check func */
 
300
                          set_truncate_debug, /* update func */
 
301
                          false /* default */);
 
302
 
 
303
static DRIZZLE_SYSVAR_STR(log_file,
 
304
                          sysvar_command_log_file,
 
305
                          PLUGIN_VAR_READONLY,
 
306
                          N_("Path to the file to use for command log."),
 
307
                          NULL, /* check func */
 
308
                          NULL, /* update func*/
 
309
                          DEFAULT_LOG_FILE_PATH /* default */);
 
310
 
 
311
static struct st_mysql_sys_var* system_variables[]= {
 
312
  DRIZZLE_SYSVAR(enable),
 
313
  DRIZZLE_SYSVAR(truncate_debug),
 
314
  DRIZZLE_SYSVAR(log_file),
 
315
  NULL
 
316
};
 
317
 
 
318
drizzle_declare_plugin(command_log)
 
319
{
 
320
  "command_log",
 
321
  "0.1",
 
322
  "Jay Pipes",
 
323
  N_("Simple Command Message Log"),
 
324
  PLUGIN_LICENSE_GPL,
 
325
  init, /* Plugin Init */
 
326
  deinit, /* Plugin Deinit */
 
327
  NULL, /* status variables */
 
328
  system_variables, /* system variables */
 
329
  NULL    /* config options */
 
330
}
 
331
drizzle_declare_plugin_end;