~drizzle-trunk/drizzle/development

1273.22.1 by Jay Pipes
Completes the blueprint for refactoring applier out of log descriptor.
1
/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2008-2009 Sun Microsystems
5
 *  Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
6
 *
7
 *  Authors:
8
 *
9
 *  Jay Pipes <jaypipes@gmail.com>
10
 *
11
 *  This program is free software; you can redistribute it and/or modify
12
 *  it under the terms of the GNU General Public License as published by
13
 *  the Free Software Foundation; either version 2 of the License, or
14
 *  (at your option) any later version.
15
 *
16
 *  This program is distributed in the hope that it will be useful,
17
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
18
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19
 *  GNU General Public License for more details.
20
 *
21
 *  You should have received a copy of the GNU General Public License
22
 *  along with this program; if not, write to the Free Software
23
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
24
 */
25
26
/**
27
 * @file
28
 *
29
 * Defines the implementation of the transaction applier plugin
30
 * for the transaction log.
31
 *
32
 * @see drizzled/plugin/transaction_replicator.h
33
 * @see drizzled/plugin/transaction_applier.h
34
 *
35
 * @details
36
 *
37
 * The TransactionLogApplier::apply() method constructs the entry
38
 * in the transaction log from the supplied Transaction message and
39
 * asks its associated TransactionLog object to write this entry.
40
 *
41
 * Upon a successful write, the applier adds some information about
42
 * the written transaction to the transaction log index.
43
 */
44
45
#include "config.h"
46
#include "transaction_log.h"
47
#include "transaction_log_applier.h"
48
#include "transaction_log_index.h"
49
50
#include <sys/stat.h>
51
#include <fcntl.h>
52
#include <unistd.h>
1273.21.3 by Jay Pipes
Add errno.h to a few files. Apparently, this is necessary on OSX.
53
#include <errno.h>
1273.22.1 by Jay Pipes
Completes the blueprint for refactoring applier out of log descriptor.
54
55
#include <drizzled/errmsg_print.h>
56
#include <drizzled/gettext.h>
57
#include <drizzled/algorithm/crc32.h>
58
#include <drizzled/message/transaction.pb.h>
59
#include <google/protobuf/io/coded_stream.h>
60
61
using namespace std;
62
using namespace drizzled;
63
using namespace google;
64
65
/*
 * The singleton transaction log applier. It is initialized to NULL here and
 * is not assigned by any code visible in this file; presumably it is set
 * during plugin initialization elsewhere -- TODO confirm.
 */
TransactionLogApplier *transaction_log_applier= NULL;
66
67
/*
 * The transaction log index, defined in another translation unit (hence
 * the extern declaration). TransactionLogApplier::apply() dereferences it
 * to add an index entry after asking the transaction log to write.
 */
extern TransactionLogIndex *transaction_log_index;
68
69
/**
 * Builds the transaction log applier plugin.
 *
 * @param name_arg            Name passed through to the
 *                            plugin::TransactionApplier base class.
 * @param in_transaction_log  Transaction log used to initialize the
 *                            transaction_log member; apply() asks it to
 *                            write each entry.
 * @param in_do_checksum      Initial value of do_checksum; apply() computes
 *                            a CRC32 of the serialized message only when
 *                            this is true.
 */
TransactionLogApplier::TransactionLogApplier(const string name_arg,
                                             TransactionLog &in_transaction_log,
                                             bool in_do_checksum)
  : plugin::TransactionApplier(name_arg),
    transaction_log(in_transaction_log),
    do_checksum(in_do_checksum)
{}
77
78
/**
 * Destructor. The body is intentionally empty: this class performs no
 * explicit cleanup of its own here.
 */
TransactionLogApplier::~TransactionLogApplier() {}
81
82
/**
 * Serializes a Transaction message into a transaction log entry, asks the
 * transaction log to write that entry, and then adds a matching entry to
 * the transaction log index.
 *
 * The entry is assembled in one heap buffer, in this order:
 *
 *   - the message type (ReplicationServices::TRANSACTION), 32-bit little-endian
 *   - the serialized message length in bytes, 32-bit little-endian
 *   - the serialized Transaction message itself
 *   - a 32-bit little-endian checksum: the CRC32 of the serialized message
 *     when do_checksum is set, otherwise 0
 *
 * If the buffer cannot be allocated, an error is logged and the method
 * returns without writing to the log or touching the index.
 *
 * @param to_apply  The Transaction message to write to the log.
 */
void TransactionLogApplier::apply(const message::Transaction &to_apply)
{
  /*
   * ByteSize() also caches the message size, which the call to
   * SerializeWithCachedSizesToArray() further down relies on.
   */
  const size_t payload_length= to_apply.ByteSize();
  const size_t entry_length= TransactionLog::HEADER_TRAILER_BYTES + payload_length;

  /* One raw buffer holds the header, the message and the trailing checksum */
  uint8_t * const entry= static_cast<uint8_t *>(malloc(entry_length));
  if (entry == NULL)
  {
    errmsg_printf(ERRMSG_LVL_ERROR,
                  _("Failed to allocate enough memory to buffer header, "
                    "transaction message, and trailing checksum bytes. Tried to allocate %" PRId64
                    " bytes.  Error: %s\n"),
                  static_cast<int64_t>(entry_length),
                  strerror(errno));
    return;
  }

  /*
   * The write position advances through the buffer with each write, while
   * "entry" keeps pointing at the start so it can be written out and freed.
   */
  uint8_t *cursor= entry;

  /* Header: the message type followed by the length of the message */
  cursor= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
      static_cast<uint32_t>(ReplicationServices::TRANSACTION), cursor);
  cursor= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
      static_cast<uint32_t>(payload_length), cursor);

  /* Body: the serialized transaction message */
  cursor= to_apply.SerializeWithCachedSizesToArray(cursor);

  /* Trailer: checksum of the serialized message, or 0 if checksums are off */
  uint32_t checksum= 0;
  if (do_checksum)
  {
    /* Step back over the bytes just written to find the start of the message */
    char *serialized_message= reinterpret_cast<char *>(cursor) - payload_length;
    checksum= drizzled::algorithm::crc32(serialized_message, payload_length);
  }

  /* The trailer is written little-endian, the same as the header fields */
  protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, cursor);

  /* Ask the transaction log to write the entry and tell us where it wrote it */
  const off_t entry_offset= transaction_log.writeEntry(entry, entry_length);

  free(entry);

  /* Describe what was just applied in the transaction log index */
  transaction_log_index->addEntry(
      TransactionLogEntry(ReplicationServices::TRANSACTION, entry_offset, entry_length),
      to_apply,
      checksum);
}