~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/transaction_log/transaction_log.cc

  • Committer: Jay Pipes
  • Date: 2009-10-07 23:59:47 UTC
  • mto: (1234.1.1 push) (1237.2.10 push)
  • mto: This revision was merged to the branch mainline in revision 1193.
  • Revision ID: jpipes@serialcoder-20091007235947-18simrecnzwv8t1q
Phase 2 new replication work:

* Removes old replication.proto file, old command_transform library
* Removes use of korr.h macro calls in favor of GPB's CodedOutputStream
  API.
* Updates transaction_log, default_replicator, and filtered_replicator module
  to use new Transaction message.
* Updates ReplicationServices to construct the new Transaction messages and
  associated Statement sub-messages
* Corrects transaction boundaries.  AUTOCOMMIT now works properly, and I have
  added a new test case to verify AUTOCOMMIT variable modification of the way
  in which Transaction messages are bundled up and sent across to replicators.

Show diffs side-by-side

added added

removed removed

Lines of Context:
3
3
 *
4
4
 *  Copyright (C) 2008-2009 Sun Microsystems
5
5
 *
 
6
 *  Authors:
 
7
 *
 
8
 *  Jay Pipes <joinfu@sun.com>
 
9
 *
6
10
 *  This program is free software; you can redistribute it and/or modify
7
11
 *  it under the terms of the GNU General Public License as published by
8
12
 *  the Free Software Foundation; either version 2 of the License, or
21
25
/**
22
26
 * @file
23
27
 *
24
 
 * Defines the implementation of the default command log.
 
28
 * Defines the implementation of the default transaction log.
25
29
 *
26
 
 * @see drizzled/plugin/command_replicator.h
27
 
 * @see drizzled/plugin/command_applier.h
 
30
 * @see drizzled/plugin/transaction_replicator.h
 
31
 * @see drizzled/plugin/transaction_applier.h
28
32
 *
29
33
 * @details
30
34
 *
31
35
 * Currently, the log file uses this implementation:
32
36
 *
33
37
 * We have an atomic off_t called log_offset which keeps track of the 
34
 
 * offset into the log file for writing the next Command.
 
38
 * offset into the log file for writing the next Transaction.
35
39
 *
36
 
 * We write Command message encapsulated in a 8-byte length header and a
 
40
 * We write Transaction message encapsulated in a 8-byte length header and a
37
41
 * 4-byte checksum trailer.
38
42
 *
39
 
 * When writing a Command to the log, we calculate the length of the 
40
 
 * Command to be written.  We then increment log_offset by the length
41
 
 * of the Command plus sizeof(uint64_t) plus sizeof(uint32_t) and store 
42
 
 * this new offset in a local off_t called cur_offset (see CommandLog::apply().  
 
43
 * When writing a Transaction to the log, we calculate the length of the 
 
44
 * Transaction to be written.  We then increment log_offset by the length
 
45
 * of the Transaction plus sizeof(uint64_t) plus sizeof(uint32_t) and store 
 
46
 * this new offset in a local off_t called cur_offset (see TransactionLog::apply().  
43
47
 * This compare and set is done in an atomic instruction.
44
48
 *
45
49
 * We then adjust the local off_t (cur_offset) back to the original
46
50
 * offset by subtracting the length and sizeof(uint64_t) and sizeof(uint32_t).
47
51
 *
48
 
 * We then first write a 64-bit length and then the serialized transaction/command
 
52
 * We then first write a 64-bit length and then the serialized transaction/transaction
49
53
 * and optional checksum to our log file at our local cur_offset.
50
54
 *
51
 
 * --------------------------------------------------------------
52
 
 * |<- 8 bytes ->|<- # Bytes of Command Message ->|<- 4 bytes ->|
53
 
 * --------------------------------------------------------------
54
 
 * |   Length    |   Serialized Command Message   |   Checksum  |
55
 
 * --------------------------------------------------------------
 
55
 * ------------------------------------------------------------------
 
56
 * |<- 8 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
 
57
 * ------------------------------------------------------------------
 
58
 * |   Length    |   Serialized Transaction Message   |   Checksum  |
 
59
 * ------------------------------------------------------------------
56
60
 *
57
61
 * @todo
58
62
 *
64
68
 * string as a buffer in apply()
65
69
 */
66
70
 
67
 
#include "command_log.h"
 
71
#include "transaction_log.h"
68
72
 
69
73
#include <unistd.h>
70
 
#include <zlib.h>
71
74
 
72
75
#include <vector>
73
76
#include <string>
75
78
#include <drizzled/session.h>
76
79
#include <drizzled/set_var.h>
77
80
#include <drizzled/gettext.h>
78
 
#include <drizzled/message/replication.pb.h>
79
81
#include <drizzled/hash/crc32.h>
 
82
#include <drizzled/message/transaction.pb.h>
 
83
#include <google/protobuf/io/coded_stream.h>
80
84
 
81
85
using namespace std;
82
86
using namespace drizzled;
 
87
using namespace google;
83
88
 
84
89
/** 
85
 
 * Command Log plugin system variable - Is the log enabled? Only used on init().  
86
 
 * The enable() and disable() methods of the CommandLog class control online
 
90
 * Transaction Log plugin system variable - Is the log enabled? Only used on init().  
 
91
 * The enable() and disable() methods of the TransactionLog class control online
87
92
 * disabling.
88
93
 */
89
 
static bool sysvar_command_log_enabled= false;
90
 
/** Command Log plugin system variable - The path to the log file used */
91
 
static char* sysvar_command_log_file= NULL;
 
94
static bool sysvar_transaction_log_enabled= false;
 
95
/** Transaction Log plugin system variable - The path to the log file used */
 
96
static char* sysvar_transaction_log_file= NULL;
92
97
/** 
93
 
 * Command Log plugin system variable - A debugging variable to assist 
 
98
 * Transaction Log plugin system variable - A debugging variable to assist 
94
99
 * in truncating the log file. 
95
100
 */
96
 
static bool sysvar_command_log_truncate_debug= false;
97
 
static const char DEFAULT_LOG_FILE_PATH[]= "command.log"; /* In datadir... */
 
101
static bool sysvar_transaction_log_truncate_debug= false;
 
102
static const char DEFAULT_LOG_FILE_PATH[]= "transaction.log"; /* In datadir... */
98
103
/** 
99
 
 * Command Log plugin system variable - Should we write a CRC32 checksum for 
100
 
 * each written Command message?
 
104
 * Transaction Log plugin system variable - Should we write a CRC32 checksum for 
 
105
 * each written Transaction message?
101
106
 */
102
 
static bool sysvar_command_log_checksum_enabled= false;
 
107
static bool sysvar_transaction_log_checksum_enabled= false;
103
108
 
104
 
CommandLog::CommandLog(const char *in_log_file_path, bool in_do_checksum)
 
109
TransactionLog::TransactionLog(const char *in_log_file_path, bool in_do_checksum)
105
110
  : 
106
 
    plugin::CommandApplier(),
 
111
    plugin::TransactionApplier(),
107
112
    state(OFFLINE),
108
113
    log_file_path(in_log_file_path)
109
114
{
115
120
  log_file= open(log_file_path, O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
116
121
  if (log_file == -1)
117
122
  {
118
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open command log file %s.  Got error: %s\n"), 
 
123
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to open transaction log file %s.  Got error: %s\n"), 
119
124
                  log_file_path, 
120
125
                  strerror(errno));
121
126
    is_active= false;
132
137
  is_active= true;
133
138
}
134
139
 
135
 
CommandLog::~CommandLog()
 
140
TransactionLog::~TransactionLog()
136
141
{
137
142
  /* Clear up any resources we've consumed */
138
143
  if (isActive() && log_file != -1)
141
146
  }
142
147
}
143
148
 
144
 
bool CommandLog::isActive()
 
149
bool TransactionLog::isActive()
145
150
{
146
151
  return is_enabled && is_active;
147
152
}
148
153
 
149
 
void CommandLog::apply(const message::Command &to_apply)
 
154
void TransactionLog::apply(const message::Transaction &to_apply)
150
155
{
151
156
  /* 
152
157
   * There is an issue on Solaris/SunStudio where if the std::string buffer is
153
158
   * NOT initialized with the below, the code produces an EFAULT when accessing
154
159
   * c_str() later on.  Stoopid, but true.
155
160
   */
156
 
  string buffer(""); /* Buffer we will write serialized command to */
 
161
  string buffer(""); /* Buffer we will write serialized transaction to */
157
162
 
158
163
  static const uint32_t HEADER_TRAILER_BYTES= sizeof(uint64_t) + /* 8-byte length header */
159
164
                                              sizeof(uint32_t); /* 4 byte checksum trailer */
186
191
    return;
187
192
 
188
193
  /* We always write in network byte order */
189
 
  unsigned char nbo_length[8];
190
 
  int8store(nbo_length, length);
 
194
  uint8_t length_bytes[8];
 
195
  protobuf::io::CodedOutputStream::WriteLittleEndian64ToArray(length, length_bytes);
191
196
 
192
197
  /* Write the length header */
193
198
  do
194
199
  {
195
 
    written= pwrite(log_file, nbo_length, sizeof(uint64_t), cur_offset);
 
200
    written= pwrite(log_file, length_bytes, sizeof(uint64_t), cur_offset);
196
201
  }
197
202
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
198
203
 
199
204
  if (unlikely(written != sizeof(uint64_t)))
200
205
  {
201
206
    errmsg_printf(ERRMSG_LVL_ERROR, 
202
 
                  _("Failed to write full size of command.  Tried to write %" PRId64 
 
207
                  _("Failed to write full size of transaction.  Tried to write %" PRId64 
203
208
                    " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
204
209
                  static_cast<int64_t>(sizeof(uint64_t)), 
205
210
                  static_cast<int64_t>(cur_offset),
231
236
    return;
232
237
  }
233
238
 
234
 
  /* Write the command message itself */
 
239
  /* Write the transaction message itself */
235
240
  do
236
241
  {
237
242
    written= pwrite(log_file, buffer.c_str(), length, cur_offset);
241
246
  if (unlikely(written != static_cast<ssize_t>(length)))
242
247
  {
243
248
    errmsg_printf(ERRMSG_LVL_ERROR, 
244
 
                  _("Failed to write full serialized command.  Tried to write %" PRId64 
 
249
                  _("Failed to write full serialized transaction.  Tried to write %" PRId64 
245
250
                    " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
246
251
                  static_cast<int64_t>(length), 
247
252
                  static_cast<int64_t>(cur_offset),
280
285
  }
281
286
 
282
287
  /* We always write in network byte order */
283
 
  unsigned char nbo_checksum[4];
284
 
  int4store(nbo_checksum, checksum);
 
288
  uint8_t checksum_bytes[4];
 
289
  protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, checksum_bytes);
285
290
 
286
291
  /* Write the checksum trailer */
287
292
  do
288
293
  {
289
 
    written= pwrite(log_file, nbo_checksum, sizeof(uint32_t), cur_offset);
 
294
    written= pwrite(log_file, checksum_bytes, sizeof(uint32_t), cur_offset);
290
295
  }
291
296
  while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
292
297
 
293
298
  if (unlikely(written != static_cast<ssize_t>(sizeof(uint32_t))))
294
299
  {
295
300
    errmsg_printf(ERRMSG_LVL_ERROR, 
296
 
                  _("Failed to write full checksum of command.  Tried to write %" PRId64 
 
301
                  _("Failed to write full checksum of transaction.  Tried to write %" PRId64 
297
302
                    " bytes at offset %" PRId64 ", but only wrote %" PRId64 " bytes.  Error: %s\n"), 
298
303
                  static_cast<int64_t>(sizeof(uint32_t)), 
299
304
                  static_cast<int64_t>(cur_offset),
310
315
  }
311
316
}
312
317
 
313
 
void CommandLog::truncate()
 
318
void TransactionLog::truncate()
314
319
{
315
320
  bool orig_is_enabled= is_enabled;
316
321
  is_enabled= false;
337
342
  is_enabled= orig_is_enabled;
338
343
}
339
344
 
340
 
bool CommandLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
 
345
bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
341
346
                                                        string &out_filename) const
342
347
{
343
348
  /* 
349
354
  return true;
350
355
}
351
356
 
352
 
static CommandLog *command_log= NULL; /* The singleton command log */
 
357
static TransactionLog *transaction_log= NULL; /* The singleton transaction log */
353
358
 
354
359
static int init(drizzled::plugin::Registry &registry)
355
360
{
356
 
  if (sysvar_command_log_enabled)
 
361
  if (sysvar_transaction_log_enabled)
357
362
  {
358
 
    command_log= new CommandLog(sysvar_command_log_file, 
359
 
                                sysvar_command_log_checksum_enabled);
360
 
    registry.add(command_log);
 
363
    transaction_log= new TransactionLog(sysvar_transaction_log_file, 
 
364
                                sysvar_transaction_log_checksum_enabled);
 
365
    registry.add(transaction_log);
361
366
  }
362
367
  return 0;
363
368
}
364
369
 
365
370
static int deinit(drizzled::plugin::Registry &registry)
366
371
{
367
 
  if (command_log)
 
372
  if (transaction_log)
368
373
  {
369
 
    registry.remove(command_log);
370
 
    delete command_log;
 
374
    registry.remove(transaction_log);
 
375
    delete transaction_log;
371
376
  }
372
377
  return 0;
373
378
}
381
386
   * The const void * save comes directly from the check function, 
382
387
   * which should simply return the result from the set statement. 
383
388
   */
384
 
  if (command_log)
 
389
  if (transaction_log)
385
390
    if (*(bool *)save != false)
386
 
      command_log->truncate();
 
391
      transaction_log->truncate();
387
392
}
388
393
 
389
394
static DRIZZLE_SYSVAR_BOOL(enable,
390
 
                          sysvar_command_log_enabled,
 
395
                          sysvar_transaction_log_enabled,
391
396
                          PLUGIN_VAR_NOCMDARG,
392
 
                          N_("Enable command log"),
 
397
                          N_("Enable transaction log"),
393
398
                          NULL, /* check func */
394
399
                          NULL, /* update func */
395
400
                          false /* default */);
396
401
 
397
402
static DRIZZLE_SYSVAR_BOOL(truncate_debug,
398
 
                          sysvar_command_log_truncate_debug,
 
403
                          sysvar_transaction_log_truncate_debug,
399
404
                          PLUGIN_VAR_NOCMDARG,
400
 
                          N_("DEBUGGING - Truncate command log"),
 
405
                          N_("DEBUGGING - Truncate transaction log"),
401
406
                          NULL, /* check func */
402
407
                          set_truncate_debug, /* update func */
403
408
                          false /* default */);
404
409
 
405
410
static DRIZZLE_SYSVAR_STR(log_file,
406
 
                          sysvar_command_log_file,
 
411
                          sysvar_transaction_log_file,
407
412
                          PLUGIN_VAR_READONLY,
408
 
                          N_("Path to the file to use for command log."),
 
413
                          N_("Path to the file to use for transaction log."),
409
414
                          NULL, /* check func */
410
415
                          NULL, /* update func*/
411
416
                          DEFAULT_LOG_FILE_PATH /* default */);
412
417
 
413
418
static DRIZZLE_SYSVAR_BOOL(enable_checksum,
414
 
                          sysvar_command_log_checksum_enabled,
 
419
                          sysvar_transaction_log_checksum_enabled,
415
420
                          PLUGIN_VAR_NOCMDARG,
416
421
                          N_("Enable CRC32 Checksumming"),
417
422
                          NULL, /* check func */
426
431
  NULL
427
432
};
428
433
 
429
 
drizzle_declare_plugin(command_log)
 
434
drizzle_declare_plugin(transaction_log)
430
435
{
431
 
  "command_log",
 
436
  "transaction_log",
432
437
  "0.1",
433
438
  "Jay Pipes",
434
 
  N_("Command Message Log"),
 
439
  N_("Transaction Message Log"),
435
440
  PLUGIN_LICENSE_GPL,
436
441
  init, /* Plugin Init */
437
442
  deinit, /* Plugin Deinit */