1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2008-2009 Sun Microsystems
5
* Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
9
* Jay Pipes <jaypipes@gmail.com.com>
11
* This program is free software; you can redistribute it and/or modify
12
* it under the terms of the GNU General Public License as published by
13
* the Free Software Foundation; either version 2 of the License, or
14
* (at your option) any later version.
16
* This program is distributed in the hope that it will be useful,
17
* but WITHOUT ANY WARRANTY; without even the implied warranty of
18
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19
* GNU General Public License for more details.
21
* You should have received a copy of the GNU General Public License
22
* along with this program; if not, write to the Free Software
23
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
29
* Defines the implementation of the transaction applier plugin
30
* for the transaction log.
32
* @see drizzled/plugin/transaction_replicator.h
33
* @see drizzled/plugin/transaction_applier.h
37
* The TransactionLogApplier::apply() method constructs the entry
38
* in the transaction log from the supplied Transaction message and
39
* asks its associated TransactionLog object to write this entry.
41
* Upon a successful write, the applier adds some information about
42
* the written transaction to the transaction log index.
46
#include "transaction_log.h"
47
#include "transaction_log_applier.h"
48
#include "transaction_log_index.h"
54
#include <drizzled/errmsg_print.h>
55
#include <drizzled/gettext.h>
56
#include <drizzled/algorithm/crc32.h>
57
#include <drizzled/message/transaction.pb.h>
58
#include <google/protobuf/io/coded_stream.h>
61
using namespace drizzled;
62
using namespace google;
64
TransactionLogApplier *transaction_log_applier= NULL; /* The singleton transaction log applier */
66
extern TransactionLogIndex *transaction_log_index;
68
TransactionLogApplier::TransactionLogApplier(const string name_arg,
69
TransactionLog &in_transaction_log,
70
bool in_do_checksum) :
71
plugin::TransactionApplier(name_arg),
72
transaction_log(in_transaction_log),
73
do_checksum(in_do_checksum)
77
TransactionLogApplier::~TransactionLogApplier()
81
void TransactionLogApplier::apply(const message::Transaction &to_apply)
83
uint8_t *buffer; /* Buffer we will write serialized header,
84
message and trailing checksum to */
87
size_t message_byte_length= to_apply.ByteSize();
88
size_t total_envelope_length= TransactionLog::HEADER_TRAILER_BYTES + message_byte_length;
91
* Attempt allocation of raw memory buffer for the header,
92
* message and trailing checksum bytes.
94
buffer= static_cast<uint8_t *>(malloc(total_envelope_length));
97
errmsg_printf(ERRMSG_LVL_ERROR,
98
_("Failed to allocate enough memory to buffer header, "
99
"transaction message, and trailing checksum bytes. Tried to allocate %" PRId64
100
" bytes. Error: %s\n"),
101
static_cast<int64_t>(total_envelope_length),
106
orig_buffer= buffer; /* We will free() orig_buffer, as buffer is moved during write */
109
* Write the header information, which is the message type and
110
* the length of the transaction message into the buffer
112
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
113
static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
114
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
115
static_cast<uint32_t>(message_byte_length), buffer);
118
* Now write the serialized transaction message, followed
119
* by the optional checksum into the buffer.
121
buffer= to_apply.SerializeWithCachedSizesToArray(buffer);
123
uint32_t checksum= 0;
126
checksum= drizzled::algorithm::crc32(
127
reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
130
/* We always write in network byte order */
131
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, buffer);
133
/* Ask the transaction log to write the entry and return where it wrote it */
134
off_t written_to= transaction_log.writeEntry(orig_buffer, total_envelope_length);
138
/* Add an entry to the index describing what was just applied */
139
transaction_log_index->addEntry(TransactionLogEntry(ReplicationServices::TRANSACTION,
141
total_envelope_length),