~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/transaction_log/transaction_log.cc

Completes the blueprint for refactoring applier out of log descriptor.

1) Makes the TransactionLog class a simple descriptor for the actual transaction log file
2) Splits out the TransactionLogApplier into separate class with constructor taking a TransactionLog instance
3) Splits the module initialization stuff out into a file, /plugin/transaction_log/module.cc

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
4
 *  Copyright (C) 2008-2009 Sun Microsystems
 
5
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
5
6
 *
6
7
 *  Authors:
7
8
 *
8
 
 *  Jay Pipes <joinfu@sun.com>
 
9
 *  Jay Pipes <jaypipes@gmail.com.com>
9
10
 *
10
11
 *  This program is free software; you can redistribute it and/or modify
11
12
 *  it under the terms of the GNU General Public License as published by
25
26
/**
26
27
 * @file
27
28
 *
28
 
 * Defines the implementation of the default transaction log.
29
 
 *
30
 
 * @see drizzled/plugin/transaction_replicator.h
31
 
 * @see drizzled/plugin/transaction_applier.h
 
29
 * Defines the implementation of the transaction log file descriptor.
32
30
 *
33
31
 * @details
34
32
 *
37
35
 * We have an atomic off_t called log_offset which keeps track of the 
38
36
 * offset into the log file for writing the next Transaction.
39
37
 *
40
 
 * We write Transaction message encapsulated in an 8-byte length header and a
 
38
 * We write Transaction message encapsulated in an 8-byte length/type header and a
41
39
 * 4-byte checksum trailer.
42
40
 *
43
41
 * When writing a Transaction to the log, we calculate the length of the 
63
61
 * Possibly look at a scoreboard approach with multiple file segments.  For
64
62
 * right now, though, this is just a quick simple implementation to serve
65
63
 * as a skeleton and a springboard.
66
 
 *
67
 
 * @todo
68
 
 *
69
 
 * Move the Applier piece of this code out into its own source file and leave
70
 
 * this for all the glue code of the module.
71
64
 */
72
65
 
73
66
#include "config.h"
74
67
#include "transaction_log.h"
75
 
#include "transaction_log_index.h"
76
 
#include "data_dictionary_schema.h"
77
 
#include "print_transaction_message.h"
78
 
#include "hexdump_transaction_message.h"
79
 
#include "background_worker.h"
80
68
 
81
69
#include <sys/stat.h>
82
70
#include <fcntl.h>
85
73
#include <vector>
86
74
#include <string>
87
75
 
88
 
#include "drizzled/internal/my_sys.h" /* for internal::my_sync */
89
 
 
90
 
#include <drizzled/session.h>
91
 
#include <drizzled/set_var.h>
 
76
#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
 
77
#include <drizzled/errmsg_print.h>
92
78
#include <drizzled/gettext.h>
93
 
#include <drizzled/algorithm/crc32.h>
94
 
#include <drizzled/message/transaction.pb.h>
95
 
#include <google/protobuf/io/coded_stream.h>
96
79
 
97
80
using namespace std;
98
81
using namespace drizzled;
99
 
using namespace google;
100
 
 
101
 
/** 
102
 
 * Transaction Log plugin system variable - Is the log enabled? Only used on init().  
103
 
 * The enable() and disable() methods of the TransactionLog class control online
104
 
 * disabling.
105
 
 */
106
 
static bool sysvar_transaction_log_enabled= false;
107
 
/** Transaction Log plugin system variable - The path to the log file used */
108
 
static char* sysvar_transaction_log_file= NULL;
109
 
/** 
110
 
 * Transaction Log plugin system variable - A debugging variable to assist 
111
 
 * in truncating the log file. 
112
 
 */
113
 
static bool sysvar_transaction_log_truncate_debug= false;
114
 
static const char DEFAULT_LOG_FILE_PATH[]= "transaction.log"; /* In datadir... */
115
 
/** 
116
 
 * Transaction Log plugin system variable - Should we write a CRC32 checksum for 
117
 
 * each written Transaction message?
118
 
 */
119
 
static bool sysvar_transaction_log_checksum_enabled= false;
120
 
/**
121
 
 * Numeric option controlling the sync/flush behaviour of the transaction
122
 
 * log.  Options are:
123
 
 *
124
 
 * TransactionLog::SYNC_METHOD_OS == 0            ... let OS do sync'ing
125
 
 * TransactionLog::SYNC_METHOD_EVERY_WRITE == 1   ... sync on every write
126
 
 * TransactionLog::SYNC_METHOD_EVERY_SECOND == 2  ... sync at most once a second
127
 
 */
128
 
static uint32_t sysvar_transaction_log_sync_method= 0;
129
 
 
130
 
/** DATA_DICTIONARY views */
131
 
static TransactionLogTool *transaction_log_tool;
132
 
static TransactionLogEntriesTool *transaction_log_entries_tool;
133
 
static TransactionLogTransactionsTool *transaction_log_transactions_tool;
134
 
 
135
 
/** Index defined in transaction_log_index.cc */
136
 
extern TransactionLogIndex *transaction_log_index;
137
 
 
138
 
/** Defined in print_transaction_message.cc */
139
 
extern plugin::Create_function<PrintTransactionMessageFunction> *print_transaction_message_func_factory;
140
 
extern plugin::Create_function<HexdumpTransactionMessageFunction> *hexdump_transaction_message_func_factory;
141
 
 
142
 
TransactionLog::TransactionLog(string name_arg,
143
 
                               const string &in_log_file_path,
144
 
                               bool in_do_checksum)
145
 
  : plugin::TransactionApplier(name_arg),
 
82
 
 
83
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
 
84
 
 
85
TransactionLog::TransactionLog(const string in_log_file_path,
 
86
                               uint32_t in_sync_method) : 
146
87
    state(OFFLINE),
147
88
    log_file_path(in_log_file_path),
148
89
    has_error(false),
149
 
    error_message()
 
90
    error_message(),
 
91
    sync_method(in_sync_method)
150
92
{
151
 
  do_checksum= in_do_checksum; /* Have to do here, not in initialization list b/c atomic<> */
152
 
 
153
93
  /* Setup our log file and determine the next write offset... */
154
94
  log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
155
95
  if (log_file == -1)
160
100
    error_message.append(strerror(errno));
161
101
    error_message.push_back('\n');
162
102
    has_error= true;
163
 
    deactivate();
164
103
    return;
165
104
  }
166
105
 
187
126
TransactionLog::~TransactionLog()
188
127
{
189
128
  /* Clear up any resources we've consumed */
190
 
  if (isEnabled() && log_file != -1)
 
129
  if (log_file != -1)
191
130
  {
192
131
    (void) close(log_file);
193
132
  }
194
133
}
195
134
 
196
 
void TransactionLog::apply(const message::Transaction &to_apply)
 
135
off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
197
136
{
198
 
  uint8_t *buffer; /* Buffer we will write serialized header, 
199
 
                      message and trailing checksum to */
200
 
  uint8_t *orig_buffer;
201
 
 
202
 
  size_t message_byte_length= to_apply.ByteSize();
203
 
  ssize_t written;
204
 
  off_t cur_offset;
205
 
  size_t total_envelope_length= HEADER_TRAILER_BYTES + message_byte_length;
206
 
 
207
 
  /* 
208
 
   * Attempt allocation of raw memory buffer for the header, 
209
 
   * message and trailing checksum bytes.
210
 
   */
211
 
  buffer= static_cast<uint8_t *>(malloc(total_envelope_length));
212
 
  if (buffer == NULL)
213
 
  {
214
 
    errmsg_printf(ERRMSG_LVL_ERROR, 
215
 
                  _("Failed to allocate enough memory to buffer header, transaction message, and trailing checksum bytes. Tried to allocate %" PRId64
216
 
                    " bytes.  Error: %s\n"), 
217
 
                  static_cast<int64_t>(total_envelope_length),
218
 
                  strerror(errno));
219
 
    state= CRASHED;
220
 
    deactivate();
221
 
    return;
222
 
  }
223
 
  else
224
 
    orig_buffer= buffer; /* We will free() orig_buffer, as buffer is moved during write */
 
137
  ssize_t written= 0;
225
138
 
226
139
  /*
227
140
   * Do an atomic increment on the offset of the log file position
228
141
   */
229
 
  cur_offset= log_offset.fetch_and_add(static_cast<off_t>(total_envelope_length));
 
142
  off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
230
143
 
231
144
  /*
232
145
   * We adjust cur_offset back to the original log_offset before
233
146
   * the increment above...
234
147
   */
235
 
  cur_offset-= static_cast<off_t>((total_envelope_length));
236
 
 
237
 
  /*
238
 
   * Write the header information, which is the message type and
239
 
   * the length of the transaction message into the buffer
240
 
   */
241
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
242
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(static_cast<uint32_t>(message_byte_length), buffer);
243
 
  
244
 
  /*
245
 
   * Now write the serialized transaction message, followed
246
 
   * by the optional checksum into the buffer.
247
 
   */
248
 
  buffer= to_apply.SerializeWithCachedSizesToArray(buffer);
249
 
 
250
 
  uint32_t checksum= 0;
251
 
  if (do_checksum)
252
 
  {
253
 
    checksum= drizzled::algorithm::crc32(reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
254
 
  }
255
 
 
256
 
  /* We always write in network byte order */
257
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, buffer);
 
148
  cur_offset-= static_cast<off_t>(data_length);
258
149
 
259
150
  /* 
260
151
   * Quick safety...if an error occurs above in another writer, the log 
267
158
     * the original offset where an error occurred.
268
159
     */
269
160
    log_offset= cur_offset;
270
 
    free(orig_buffer);
271
 
    return;
 
161
    return log_offset;
272
162
  }
273
163
 
274
164
  /* Write the full buffer in one swoop */
275
165
  do
276
166
  {
277
 
    written= pwrite(log_file, orig_buffer, total_envelope_length, cur_offset);
 
167
    written= pwrite(log_file, data, data_length, cur_offset);
278
168
  }
279
169
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
280
170
 
281
 
  if (unlikely(written != static_cast<ssize_t>(total_envelope_length)))
 
171
  if (unlikely(written != static_cast<ssize_t>(data_length)))
282
172
  {
283
173
    errmsg_printf(ERRMSG_LVL_ERROR, 
284
 
                  _("Failed to write full size of transaction.  Tried to write %" PRId64
 
174
                  _("Failed to write full size of log entry.  Tried to write %" PRId64
285
175
                    " bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes.  Error: %s\n"), 
286
 
                  static_cast<int64_t>(total_envelope_length),
 
176
                  static_cast<int64_t>(data_length),
287
177
                  static_cast<int64_t>(cur_offset),
288
178
                  static_cast<int64_t>(written), 
289
179
                  strerror(errno));
293
183
     * the original offset where an error occurred.
294
184
     */
295
185
    log_offset= cur_offset;
296
 
    deactivate();
297
186
  }
298
 
  free(orig_buffer);
299
187
 
300
188
  int error_code= syncLogFile();
301
189
 
302
 
  transaction_log_index->addEntry(TransactionLogEntry(ReplicationServices::TRANSACTION,
303
 
                                                     cur_offset,
304
 
                                                     total_envelope_length),
305
 
                                  to_apply,
306
 
                                  checksum);
307
 
 
308
190
  if (unlikely(error_code != 0))
309
191
  {
310
192
    errmsg_printf(ERRMSG_LVL_ERROR, 
311
193
                  _("Failed to sync log file. Got error: %s\n"), 
312
194
                  strerror(errno));
313
195
  }
 
196
  return cur_offset;
314
197
}
315
198
 
316
199
int TransactionLog::syncLogFile()
317
200
{
318
 
  switch (sysvar_transaction_log_sync_method)
 
201
  switch (sync_method)
319
202
  {
320
203
  case SYNC_METHOD_EVERY_WRITE:
321
204
    return internal::my_sync(log_file, 0);
347
230
 
348
231
void TransactionLog::truncate()
349
232
{
350
 
  bool orig_is_enabled= isEnabled();
351
 
  disable();
352
 
  
353
233
  /* 
354
 
   * Wait a short amount of time before truncating.  This just prevents error messages
355
 
   * from being produced during a call to apply().  Calling disable() above
356
 
   * means that once the current caller to apply() is done, no other calls are made to
357
 
   * apply() before enable is reset to its original state
358
 
   *
359
234
   * @note
360
235
   *
361
 
   * This is DEBUG code only!
 
236
   * This is NOT THREAD SAFE! DEBUG/TEST code only!
362
237
   */
363
 
  usleep(500); /* Sleep for half a second */
364
238
  log_offset= (off_t) 0;
365
239
  int result;
366
240
  do
368
242
    result= ftruncate(log_file, log_offset);
369
243
  }
370
244
  while (result == -1 && errno == EINTR);
371
 
 
372
 
  if (orig_is_enabled)
373
 
    enable();
374
245
}
375
246
 
376
247
bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
396
267
  error_message.clear();
397
268
}
398
269
 
399
 
const std::string &TransactionLog::getErrorMessage() const
 
270
const string &TransactionLog::getErrorMessage() const
400
271
{
401
272
  return error_message;
402
273
}
403
 
 
404
 
TransactionLog *transaction_log= NULL; /* The singleton transaction log */
405
 
 
406
 
static int init(drizzled::plugin::Registry &registry)
407
 
{
408
 
  /* Create and initialize the transaction log itself */
409
 
  if (sysvar_transaction_log_enabled)
410
 
  {
411
 
    transaction_log= new (nothrow) TransactionLog("transaction_log_applier",
412
 
                                                  string(sysvar_transaction_log_file), 
413
 
                                                  sysvar_transaction_log_checksum_enabled);
414
 
 
415
 
    if (transaction_log == NULL)
416
 
    {
417
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the TransactionLog instance.  Got error: %s\n"), 
418
 
                    strerror(errno));
419
 
      return 1;
420
 
    }
421
 
    else
422
 
    {
423
 
      /* Check to see if the log was not created properly */
424
 
      if (transaction_log->hasError())
425
 
      {
426
 
        errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to initialize the Transaction Log.  Got error: %s\n"), 
427
 
                      transaction_log->getErrorMessage().c_str());
428
 
        return 1;
429
 
      }
430
 
    }
431
 
    registry.add(transaction_log);
432
 
 
433
 
    /* Setup DATA_DICTIONARY views */
434
 
 
435
 
    transaction_log_tool= new(std::nothrow)TransactionLogTool;
436
 
    registry.add(transaction_log_tool);
437
 
    transaction_log_entries_tool= new(std::nothrow)TransactionLogEntriesTool;
438
 
    registry.add(transaction_log_entries_tool);
439
 
    transaction_log_transactions_tool= new(std::nothrow)TransactionLogTransactionsTool;
440
 
    registry.add(transaction_log_transactions_tool);
441
 
 
442
 
    /* Setup the module's UDFs */
443
 
    print_transaction_message_func_factory=
444
 
      new plugin::Create_function<PrintTransactionMessageFunction>("print_transaction_message");
445
 
    registry.add(print_transaction_message_func_factory);
446
 
 
447
 
    hexdump_transaction_message_func_factory=
448
 
      new plugin::Create_function<HexdumpTransactionMessageFunction>("hexdump_transaction_message");
449
 
    registry.add(hexdump_transaction_message_func_factory);
450
 
 
451
 
    /* Create and initialize the transaction log index */
452
 
    transaction_log_index= new (nothrow) TransactionLogIndex(*transaction_log);
453
 
    if (transaction_log_index == NULL)
454
 
    {
455
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the TransactionLogIndex instance.  Got error: %s\n"), 
456
 
                    strerror(errno));
457
 
      return 1;
458
 
    }
459
 
    else
460
 
    {
461
 
      /* Check to see if the index was not created properly */
462
 
      if (transaction_log_index->hasError())
463
 
      {
464
 
        errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to initialize the Transaction Log Index.  Got error: %s\n"), 
465
 
                      transaction_log_index->getErrorMessage().c_str());
466
 
        return 1;
467
 
      }
468
 
    }
469
 
 
470
 
    /* 
471
 
     * Setup the background worker thread which maintains
472
 
     * summary information about the transaction log.
473
 
     */
474
 
    if (initTransactionLogBackgroundWorker())
475
 
      return 1; /* Error message output handled in function above */
476
 
  }
477
 
  return 0;
478
 
}
479
 
 
480
 
static int deinit(drizzled::plugin::Registry &registry)
481
 
{
482
 
  /* Cleanup the transaction log itself */
483
 
  if (transaction_log)
484
 
  {
485
 
    registry.remove(transaction_log);
486
 
    delete transaction_log;
487
 
    delete transaction_log_index;
488
 
 
489
 
    /* Cleanup the DATA_DICTIONARY views */
490
 
    registry.remove(transaction_log_tool);
491
 
    delete transaction_log_tool;
492
 
    registry.remove(transaction_log_entries_tool);
493
 
    delete transaction_log_entries_tool;
494
 
    registry.remove(transaction_log_transactions_tool);
495
 
    delete transaction_log_transactions_tool;
496
 
 
497
 
    /* Cleanup module UDFs */
498
 
    registry.remove(print_transaction_message_func_factory);
499
 
    delete print_transaction_message_func_factory;
500
 
    registry.remove(hexdump_transaction_message_func_factory);
501
 
    delete hexdump_transaction_message_func_factory;
502
 
  }
503
 
 
504
 
  return 0;
505
 
}
506
 
 
507
 
static void set_truncate_debug(Session *,
508
 
                               drizzle_sys_var *, 
509
 
                               void *, 
510
 
                               const void *save)
511
 
{
512
 
  /* 
513
 
   * The const void * save comes directly from the check function, 
514
 
   * which should simply return the result from the set statement. 
515
 
   */
516
 
  if (transaction_log)
517
 
    if (*(bool *)save != false)
518
 
      transaction_log->truncate();
519
 
}
520
 
 
521
 
static DRIZZLE_SYSVAR_BOOL(enable,
522
 
                           sysvar_transaction_log_enabled,
523
 
                           PLUGIN_VAR_NOCMDARG,
524
 
                           N_("Enable transaction log"),
525
 
                           NULL, /* check func */
526
 
                           NULL, /* update func */
527
 
                           false /* default */);
528
 
 
529
 
static DRIZZLE_SYSVAR_BOOL(truncate_debug,
530
 
                           sysvar_transaction_log_truncate_debug,
531
 
                           PLUGIN_VAR_NOCMDARG,
532
 
                           N_("DEBUGGING - Truncate transaction log"),
533
 
                           NULL, /* check func */
534
 
                           set_truncate_debug, /* update func */
535
 
                           false /* default */);
536
 
 
537
 
static DRIZZLE_SYSVAR_STR(log_file,
538
 
                          sysvar_transaction_log_file,
539
 
                          PLUGIN_VAR_READONLY,
540
 
                          N_("Path to the file to use for transaction log"),
541
 
                          NULL, /* check func */
542
 
                          NULL, /* update func*/
543
 
                          DEFAULT_LOG_FILE_PATH /* default */);
544
 
 
545
 
static DRIZZLE_SYSVAR_BOOL(enable_checksum,
546
 
                           sysvar_transaction_log_checksum_enabled,
547
 
                           PLUGIN_VAR_NOCMDARG,
548
 
                           N_("Enable CRC32 Checksumming of each written transaction log entry"),
549
 
                           NULL, /* check func */
550
 
                           NULL, /* update func */
551
 
                           false /* default */);
552
 
 
553
 
static DRIZZLE_SYSVAR_UINT(sync_method,
554
 
                           sysvar_transaction_log_sync_method,
555
 
                           PLUGIN_VAR_OPCMDARG,
556
 
                           N_("0 == rely on operating system to sync log file (default), "
557
 
                              "1 == sync file at each transaction write, "
558
 
                              "2 == sync log file once per second"),
559
 
                           NULL, /* check func */
560
 
                           NULL, /* update func */
561
 
                           0, /* default */
562
 
                           0,
563
 
                           2,
564
 
                           0);
565
 
 
566
 
static drizzle_sys_var* sys_variables[]= {
567
 
  DRIZZLE_SYSVAR(enable),
568
 
  DRIZZLE_SYSVAR(truncate_debug),
569
 
  DRIZZLE_SYSVAR(log_file),
570
 
  DRIZZLE_SYSVAR(enable_checksum),
571
 
  DRIZZLE_SYSVAR(sync_method),
572
 
  NULL
573
 
};
574
 
 
575
 
DRIZZLE_PLUGIN(init, deinit, sys_variables);