~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/command_log/command_log.cc

  • Committer: Brian Aker
  • Date: 2009-10-01 22:56:26 UTC
  • mto: (1154.1.1 staging)
  • mto: This revision was merged to the branch mainline in revision 1155.
  • Revision ID: brian@gaz-20091001225626-sb1pdykpxlnkheaj
Remove Factory/make scheduler work like everything else.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2008-2009 Sun Microsystems
 
5
 *
 
6
 *  This program is free software; you can redistribute it and/or modify
 
7
 *  it under the terms of the GNU General Public License as published by
 
8
 *  the Free Software Foundation; either version 2 of the License, or
 
9
 *  (at your option) any later version.
 
10
 *
 
11
 *  This program is distributed in the hope that it will be useful,
 
12
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
 *  GNU General Public License for more details.
 
15
 *
 
16
 *  You should have received a copy of the GNU General Public License
 
17
 *  along with this program; if not, write to the Free Software
 
18
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
19
 */
 
20
 
 
21
/**
 
22
 * @file
 
23
 *
 
24
 * Defines the implementation of the default command log.
 
25
 *
 
26
 * @see drizzled/plugin/command_replicator.h
 
27
 * @see drizzled/plugin/command_applier.h
 
28
 *
 
29
 * @details
 
30
 *
 
31
 * Currently, the log file uses this implementation:
 
32
 *
 
33
 * We have an atomic off_t called log_offset which keeps track of the 
 
34
 * offset into the log file for writing the next Command.
 
35
 *
 
36
 * We write Command message encapsulated in a 8-byte length header and a
 
37
 * 4-byte checksum trailer.
 
38
 *
 
39
 * When writing a Command to the log, we calculate the length of the 
 
40
 * Command to be written.  We then increment log_offset by the length
 
41
 * of the Command plus sizeof(uint64_t) plus sizeof(uint32_t) and store 
 
42
 * this new offset in a local off_t called cur_offset (see CommandLog::apply().  
 
43
 * This compare and set is done in an atomic instruction.
 
44
 *
 
45
 * We then adjust the local off_t (cur_offset) back to the original
 
46
 * offset by subtracting the length and sizeof(uint64_t) and sizeof(uint32_t).
 
47
 *
 
48
 * We then first write a 64-bit length and then the serialized transaction/command
 
49
 * and optional checksum to our log file at our local cur_offset.
 
50
 *
 
51
 * --------------------------------------------------------------
 
52
 * |<- 8 bytes ->|<- # Bytes of Command Message ->|<- 4 bytes ->|
 
53
 * --------------------------------------------------------------
 
54
 * |   Length    |   Serialized Command Message   |   Checksum  |
 
55
 * --------------------------------------------------------------
 
56
 *
 
57
 * @todo
 
58
 *
 
59
 * Possibly look at a scoreboard approach with multiple file segments.  For
 
60
 * right now, though, this is just a quick simple implementation to serve
 
61
 * as a skeleton and a springboard.
 
62
 *
 
63
 * Also, we can move to a ZeroCopyStream implementation instead of using the
 
64
 * string as a buffer in apply()
 
65
 */
 
66
 
 
67
#include "command_log.h"
 
68
 
 
69
#include <unistd.h>
 
70
#include <zlib.h>
 
71
 
 
72
#include <vector>
 
73
#include <string>
 
74
 
 
75
#include <drizzled/session.h>
 
76
#include <drizzled/set_var.h>
 
77
#include <drizzled/gettext.h>
 
78
#include <drizzled/message/replication.pb.h>
 
79
#include <drizzled/crc32.h>
 
80
 
 
81
using namespace std;
 
82
using namespace drizzled;
 
83
 
 
84
/** 
 
85
 * Command Log plugin system variable - Is the log enabled? Only used on init().  
 
86
 * The enable() and disable() methods of the CommandLog class control online
 
87
 * disabling.
 
88
 */
 
89
static bool sysvar_command_log_enabled= false;
 
90
/** Command Log plugin system variable - The path to the log file used */
 
91
static char* sysvar_command_log_file= NULL;
 
92
/** 
 
93
 * Command Log plugin system variable - A debugging variable to assist 
 
94
 * in truncating the log file. 
 
95
 */
 
96
static bool sysvar_command_log_truncate_debug= false;
 
97
static const char DEFAULT_LOG_FILE_PATH[]= "command.log"; /* In datadir... */
 
98
/** 
 
99
 * Command Log plugin system variable - Should we write a CRC32 checksum for 
 
100
 * each written Command message?
 
101
 */
 
102
static bool sysvar_command_log_checksum_enabled= false;
 
103
 
 
104
CommandLog::CommandLog(const char *in_log_file_path, bool in_do_checksum)
 
105
  : 
 
106
    plugin::CommandApplier(),
 
107
    state(OFFLINE),
 
108
    log_file_path(in_log_file_path)
 
109
{
 
110
  is_enabled= true; /* If constructed, the plugin is enabled until taken offline with disable() */
 
111
  is_active= false;
 
112
  do_checksum= in_do_checksum; /* Have to do here, not in initialization list b/c atomic<> */
 
113
 
 
114
  /* Setup our log file and determine the next write offset... */
 
115
  log_file= open(log_file_path, O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
 
116
  if (log_file == -1)
 
117
  {
 
118
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open command log file %s.  Got error: %s\n"), 
 
119
                  log_file_path, 
 
120
                  strerror(errno));
 
121
    is_active= false;
 
122
    return;
 
123
  }
 
124
 
 
125
  /* 
 
126
   * The offset of the next write is the current position of the log
 
127
   * file, since it's opened in append mode...
 
128
   */
 
129
  log_offset= lseek(log_file, 0, SEEK_END);
 
130
 
 
131
  state= ONLINE;
 
132
  is_active= true;
 
133
}
 
134
 
 
135
CommandLog::~CommandLog()
 
136
{
 
137
  /* Clear up any resources we've consumed */
 
138
  if (isActive() && log_file != -1)
 
139
  {
 
140
    (void) close(log_file);
 
141
  }
 
142
}
 
143
 
 
144
bool CommandLog::isActive()
 
145
{
 
146
  return is_enabled && is_active;
 
147
}
 
148
 
 
149
void CommandLog::apply(const message::Command &to_apply)
 
150
{
 
151
  /* 
 
152
   * There is an issue on Solaris/SunStudio where if the std::string buffer is
 
153
   * NOT initialized with the below, the code produces an EFAULT when accessing
 
154
   * c_str() later on.  Stoopid, but true.
 
155
   */
 
156
  string buffer(""); /* Buffer we will write serialized command to */
 
157
 
 
158
  static const uint32_t HEADER_TRAILER_BYTES= sizeof(uint64_t) + /* 8-byte length header */
 
159
                                              sizeof(uint32_t); /* 4 byte checksum trailer */
 
160
 
 
161
  size_t length;
 
162
  ssize_t written;
 
163
  off_t cur_offset;
 
164
 
 
165
  to_apply.SerializeToString(&buffer);
 
166
 
 
167
  length= buffer.length(); 
 
168
 
 
169
  /*
 
170
   * Do an atomic increment on the offset of the log file position
 
171
   */
 
172
  cur_offset= log_offset.fetch_and_add(static_cast<off_t>((HEADER_TRAILER_BYTES + length)));
 
173
 
 
174
  /*
 
175
   * We adjust cur_offset back to the original log_offset before
 
176
   * the increment above...
 
177
   */
 
178
  cur_offset-= static_cast<off_t>((HEADER_TRAILER_BYTES + length));
 
179
 
 
180
  /* 
 
181
   * Quick safety...if an error occurs below, the log file will
 
182
   * not be active, therefore a caller could have been ready
 
183
   * to write...but the log is crashed.
 
184
   */
 
185
  if (unlikely(state == CRASHED))
 
186
    return;
 
187
 
 
188
  /* We always write in network byte order */
 
189
  unsigned char nbo_length[8];
 
190
  int8store(nbo_length, length);
 
191
 
 
192
  /* Write the length header */
 
193
  do
 
194
  {
 
195
    written= pwrite(log_file, nbo_length, sizeof(uint64_t), cur_offset);
 
196
  }
 
197
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
 
198
 
 
199
  if (unlikely(written != sizeof(uint64_t)))
 
200
  {
 
201
    errmsg_printf(ERRMSG_LVL_ERROR, 
 
202
                  _("Failed to write full size of command.  Tried to write %" PRId64 
 
203
                    " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
 
204
                  static_cast<int64_t>(sizeof(uint64_t)), 
 
205
                  static_cast<int64_t>(cur_offset),
 
206
                  static_cast<int64_t>(written), 
 
207
                  strerror(errno));
 
208
    state= CRASHED;
 
209
    /* 
 
210
     * Reset the log's offset in case we want to produce a decent error message including
 
211
     * the original offset where an error occurred.
 
212
     */
 
213
    log_offset= cur_offset;
 
214
    is_active= false;
 
215
    return;
 
216
  }
 
217
 
 
218
  cur_offset+= static_cast<off_t>(written);
 
219
 
 
220
  /* 
 
221
   * Quick safety...if an error occurs above in another writer, the log 
 
222
   * file will be in a crashed state.
 
223
   */
 
224
  if (unlikely(state == CRASHED))
 
225
  {
 
226
    /* 
 
227
     * Reset the log's offset in case we want to produce a decent error message including
 
228
     * the original offset where an error occurred.
 
229
     */
 
230
    log_offset= cur_offset;
 
231
    return;
 
232
  }
 
233
 
 
234
  /* Write the command message itself */
 
235
  do
 
236
  {
 
237
    written= pwrite(log_file, buffer.c_str(), length, cur_offset);
 
238
  }
 
239
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
 
240
 
 
241
  if (unlikely(written != static_cast<ssize_t>(length)))
 
242
  {
 
243
    errmsg_printf(ERRMSG_LVL_ERROR, 
 
244
                  _("Failed to write full serialized command.  Tried to write %" PRId64 
 
245
                    " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
 
246
                  static_cast<int64_t>(length), 
 
247
                  static_cast<int64_t>(cur_offset),
 
248
                  static_cast<int64_t>(written), 
 
249
                  strerror(errno));
 
250
    state= CRASHED;
 
251
    /* 
 
252
     * Reset the log's offset in case we want to produce a decent error message including
 
253
     * the original offset where an error occurred.
 
254
     */
 
255
    log_offset= cur_offset;
 
256
    is_active= false;
 
257
  }
 
258
 
 
259
  cur_offset+= static_cast<off_t>(written);
 
260
 
 
261
  /* 
 
262
   * Quick safety...if an error occurs above in another writer, the log 
 
263
   * file will be in a crashed state.
 
264
   */
 
265
  if (unlikely(state == CRASHED))
 
266
  {
 
267
    /* 
 
268
     * Reset the log's offset in case we want to produce a decent error message including
 
269
     * the original offset where an error occurred.
 
270
     */
 
271
    log_offset= cur_offset;
 
272
    return;
 
273
  }
 
274
 
 
275
  uint32_t checksum= 0;
 
276
 
 
277
  if (do_checksum)
 
278
  {
 
279
    checksum= hash_crc32(buffer.c_str(), length);
 
280
  }
 
281
 
 
282
  /* We always write in network byte order */
 
283
  unsigned char nbo_checksum[4];
 
284
  int4store(nbo_checksum, checksum);
 
285
 
 
286
  /* Write the checksum trailer */
 
287
  do
 
288
  {
 
289
    written= pwrite(log_file, nbo_checksum, sizeof(uint32_t), cur_offset);
 
290
  }
 
291
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
 
292
 
 
293
  if (unlikely(written != static_cast<ssize_t>(sizeof(uint32_t))))
 
294
  {
 
295
    errmsg_printf(ERRMSG_LVL_ERROR, 
 
296
                  _("Failed to write full checksum of command.  Tried to write %" PRId64 
 
297
                    " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
 
298
                  static_cast<int64_t>(sizeof(uint32_t)), 
 
299
                  static_cast<int64_t>(cur_offset),
 
300
                  static_cast<int64_t>(written), 
 
301
                  strerror(errno));
 
302
    state= CRASHED;
 
303
    /* 
 
304
     * Reset the log's offset in case we want to produce a decent error message including
 
305
     * the original offset where an error occurred.
 
306
     */
 
307
    log_offset= cur_offset;
 
308
    is_active= false;
 
309
    return;
 
310
  }
 
311
}
 
312
 
 
313
void CommandLog::truncate()
 
314
{
 
315
  bool orig_is_enabled= is_enabled;
 
316
  is_enabled= false;
 
317
  
 
318
  /* 
 
319
   * Wait a short amount of time before truncating.  This just prevents error messages
 
320
   * from being produced during a call to apply().  Setting is_enabled to false above
 
321
   * means that once the current caller to apply() is done, no other calls are made to
 
322
   * apply() before is_enabled is reset to its original state
 
323
   *
 
324
   * @note
 
325
   *
 
326
   * This is DEBUG code only!
 
327
   */
 
328
  usleep(500); /* Sleep for half a second */
 
329
  log_offset= (off_t) 0;
 
330
  int result;
 
331
  do
 
332
  {
 
333
    result= ftruncate(log_file, log_offset);
 
334
  }
 
335
  while (result == -1 && errno == EINTR);
 
336
 
 
337
  is_enabled= orig_is_enabled;
 
338
}
 
339
 
 
340
bool CommandLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
 
341
                                                        string &out_filename) const
 
342
{
 
343
  /* 
 
344
   * Currently, we simply return the single logfile name
 
345
   * Eventually, we'll have an index/hash with upper and
 
346
   * lower bounds to look up a log file with a transaction id
 
347
   */
 
348
  out_filename.assign(log_file_path);
 
349
  return true;
 
350
}
 
351
 
 
352
static CommandLog *command_log= NULL; /* The singleton command log */
 
353
 
 
354
static int init(drizzled::plugin::Registry &registry)
 
355
{
 
356
  if (sysvar_command_log_enabled)
 
357
  {
 
358
    command_log= new CommandLog(sysvar_command_log_file, 
 
359
                                sysvar_command_log_checksum_enabled);
 
360
    registry.add(command_log);
 
361
  }
 
362
  return 0;
 
363
}
 
364
 
 
365
static int deinit(drizzled::plugin::Registry &registry)
 
366
{
 
367
  if (command_log)
 
368
  {
 
369
    registry.remove(command_log);
 
370
    delete command_log;
 
371
  }
 
372
  return 0;
 
373
}
 
374
 
 
375
static void set_truncate_debug(Session *,
 
376
                               struct st_mysql_sys_var *, 
 
377
                               void *, 
 
378
                               const void *save)
 
379
{
 
380
  /* 
 
381
   * The const void * save comes directly from the check function, 
 
382
   * which should simply return the result from the set statement. 
 
383
   */
 
384
  if (command_log)
 
385
    if (*(bool *)save != false)
 
386
      command_log->truncate();
 
387
}
 
388
 
 
389
static DRIZZLE_SYSVAR_BOOL(enable,
 
390
                          sysvar_command_log_enabled,
 
391
                          PLUGIN_VAR_NOCMDARG,
 
392
                          N_("Enable command log"),
 
393
                          NULL, /* check func */
 
394
                          NULL, /* update func */
 
395
                          false /* default */);
 
396
 
 
397
static DRIZZLE_SYSVAR_BOOL(truncate_debug,
 
398
                          sysvar_command_log_truncate_debug,
 
399
                          PLUGIN_VAR_NOCMDARG,
 
400
                          N_("DEBUGGING - Truncate command log"),
 
401
                          NULL, /* check func */
 
402
                          set_truncate_debug, /* update func */
 
403
                          false /* default */);
 
404
 
 
405
static DRIZZLE_SYSVAR_STR(log_file,
 
406
                          sysvar_command_log_file,
 
407
                          PLUGIN_VAR_READONLY,
 
408
                          N_("Path to the file to use for command log."),
 
409
                          NULL, /* check func */
 
410
                          NULL, /* update func*/
 
411
                          DEFAULT_LOG_FILE_PATH /* default */);
 
412
 
 
413
static DRIZZLE_SYSVAR_BOOL(enable_checksum,
 
414
                          sysvar_command_log_checksum_enabled,
 
415
                          PLUGIN_VAR_NOCMDARG,
 
416
                          N_("Enable CRC32 Checksumming"),
 
417
                          NULL, /* check func */
 
418
                          NULL, /* update func */
 
419
                          false /* default */);
 
420
 
 
421
static struct st_mysql_sys_var* system_variables[]= {
 
422
  DRIZZLE_SYSVAR(enable),
 
423
  DRIZZLE_SYSVAR(truncate_debug),
 
424
  DRIZZLE_SYSVAR(log_file),
 
425
  DRIZZLE_SYSVAR(enable_checksum),
 
426
  NULL
 
427
};
 
428
 
 
429
drizzle_declare_plugin(command_log)
 
430
{
 
431
  "command_log",
 
432
  "0.1",
 
433
  "Jay Pipes",
 
434
  N_("Command Message Log"),
 
435
  PLUGIN_LICENSE_GPL,
 
436
  init, /* Plugin Init */
 
437
  deinit, /* Plugin Deinit */
 
438
  NULL, /* status variables */
 
439
  system_variables, /* system variables */
 
440
  NULL    /* config options */
 
441
}
 
442
drizzle_declare_plugin_end;