~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/transaction_log/transaction_log_applier.cc

Completes the blueprint for refactoring applier out of log descriptor.

1) Makes the TransactionLog class a simple descriptor for the actual transaction log file
2) Splits out the TransactionLogApplier into separate class with constructor taking a TransactionLog instance
3) Splits the module initialization stuff out into a file, /plugin/transaction_log/module.cc

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2008-2009 Sun Microsystems
 
5
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
 
6
 *
 
7
 *  Authors:
 
8
 *
 
9
 *  Jay Pipes <jaypipes@gmail.com.com>
 
10
 *
 
11
 *  This program is free software; you can redistribute it and/or modify
 
12
 *  it under the terms of the GNU General Public License as published by
 
13
 *  the Free Software Foundation; either version 2 of the License, or
 
14
 *  (at your option) any later version.
 
15
 *
 
16
 *  This program is distributed in the hope that it will be useful,
 
17
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
18
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
19
 *  GNU General Public License for more details.
 
20
 *
 
21
 *  You should have received a copy of the GNU General Public License
 
22
 *  along with this program; if not, write to the Free Software
 
23
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
24
 */
 
25
 
 
26
/**
 
27
 * @file
 
28
 *
 
29
 * Defines the implementation of the transaction applier plugin
 
30
 * for the transaction log.
 
31
 *
 
32
 * @see drizzled/plugin/transaction_replicator.h
 
33
 * @see drizzled/plugin/transaction_applier.h
 
34
 *
 
35
 * @details
 
36
 *
 
37
 * The TransactionLogApplier::apply() method constructs the entry
 
38
 * in the transaction log from the supplied Transaction message and
 
39
 * asks its associated TransactionLog object to write this entry.
 
40
 *
 
41
 * Upon a successful write, the applier adds some information about
 
42
 * the written transaction to the transaction log index.
 
43
 */
 
44
 
 
45
#include "config.h"
 
46
#include "transaction_log.h"
 
47
#include "transaction_log_applier.h"
 
48
#include "transaction_log_index.h"
 
49
 
 
50
#include <sys/stat.h>
 
51
#include <fcntl.h>
 
52
#include <unistd.h>
 
53
 
 
54
#include <drizzled/errmsg_print.h>
 
55
#include <drizzled/gettext.h>
 
56
#include <drizzled/algorithm/crc32.h>
 
57
#include <drizzled/message/transaction.pb.h>
 
58
#include <google/protobuf/io/coded_stream.h>
 
59
 
 
60
using namespace std;
 
61
using namespace drizzled;
 
62
using namespace google;
 
63
 
 
64
TransactionLogApplier *transaction_log_applier= NULL; /* The singleton transaction log applier */
 
65
 
 
66
extern TransactionLogIndex *transaction_log_index;
 
67
 
 
68
TransactionLogApplier::TransactionLogApplier(const string name_arg,
 
69
                                             TransactionLog &in_transaction_log,
 
70
                                             bool in_do_checksum) :
 
71
  plugin::TransactionApplier(name_arg),
 
72
  transaction_log(in_transaction_log),  
 
73
  do_checksum(in_do_checksum)
 
74
{
 
75
}
 
76
 
 
77
TransactionLogApplier::~TransactionLogApplier()
 
78
{
 
79
}
 
80
 
 
81
void TransactionLogApplier::apply(const message::Transaction &to_apply)
 
82
{
 
83
  uint8_t *buffer; /* Buffer we will write serialized header, 
 
84
                      message and trailing checksum to */
 
85
  uint8_t *orig_buffer;
 
86
 
 
87
  size_t message_byte_length= to_apply.ByteSize();
 
88
  size_t total_envelope_length= TransactionLog::HEADER_TRAILER_BYTES + message_byte_length;
 
89
 
 
90
  /* 
 
91
   * Attempt allocation of raw memory buffer for the header, 
 
92
   * message and trailing checksum bytes.
 
93
   */
 
94
  buffer= static_cast<uint8_t *>(malloc(total_envelope_length));
 
95
  if (buffer == NULL)
 
96
  {
 
97
    errmsg_printf(ERRMSG_LVL_ERROR, 
 
98
                  _("Failed to allocate enough memory to buffer header, "
 
99
                    "transaction message, and trailing checksum bytes. Tried to allocate %" PRId64
 
100
                    " bytes.  Error: %s\n"), 
 
101
                  static_cast<int64_t>(total_envelope_length),
 
102
                  strerror(errno));
 
103
    return;
 
104
  }
 
105
  else
 
106
    orig_buffer= buffer; /* We will free() orig_buffer, as buffer is moved during write */
 
107
 
 
108
  /*
 
109
   * Write the header information, which is the message type and
 
110
   * the length of the transaction message into the buffer
 
111
   */
 
112
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
 
113
      static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
 
114
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
 
115
      static_cast<uint32_t>(message_byte_length), buffer);
 
116
  
 
117
  /*
 
118
   * Now write the serialized transaction message, followed
 
119
   * by the optional checksum into the buffer.
 
120
   */
 
121
  buffer= to_apply.SerializeWithCachedSizesToArray(buffer);
 
122
 
 
123
  uint32_t checksum= 0;
 
124
  if (do_checksum)
 
125
  {
 
126
    checksum= drizzled::algorithm::crc32(
 
127
        reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
 
128
  }
 
129
 
 
130
  /* We always write in network byte order */
 
131
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, buffer);
 
132
 
 
133
  /* Ask the transaction log to write the entry and return where it wrote it */
 
134
  off_t written_to= transaction_log.writeEntry(orig_buffer, total_envelope_length);
 
135
 
 
136
  free(orig_buffer);
 
137
 
 
138
  /* Add an entry to the index describing what was just applied */
 
139
  transaction_log_index->addEntry(TransactionLogEntry(ReplicationServices::TRANSACTION,
 
140
                                                      written_to,
 
141
                                                      total_envelope_length),
 
142
                                  to_apply,
 
143
                                  checksum);
 
144
 
 
145
}