1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2011 Marcus Eriksson
8
* Marcus Eriksson <krummas@gmail.com>
10
* This program is free software; you can redistribute it and/or modify
11
* it under the terms of the GNU General Public License as published by
12
* the Free Software Foundation; either version 2 of the License, or
13
* (at your option) any later version.
15
* This program is distributed in the hope that it will be useful,
16
* but WITHOUT ANY WARRANTY; without even the implied warranty of
17
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18
* GNU General Public License for more details.
20
* You should have received a copy of the GNU General Public License
21
* along with this program; if not, write to the Free Software
22
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
26
#include "zeromq_log.h"
27
#include <drizzled/message/transaction.pb.h>
28
#include <google/protobuf/io/coded_stream.h>
30
#include <drizzled/module/registry.h>
31
#include <drizzled/plugin.h>
33
#include <boost/program_options.hpp>
34
#include <drizzled/module/option_map.h>
37
namespace po= boost::program_options;
40
using namespace drizzled;
41
using namespace google;
43
namespace drizzle_plugin
46
ZeroMQLog::ZeroMQLog(const string &name, const string &endpoint) :
47
plugin::TransactionApplier(name)
49
void *context= zmq_init(1);
50
_socket= zmq_socket (context, ZMQ_PUB);
52
int rc= zmq_bind (_socket, endpoint.c_str());
54
pthread_mutex_init(&publishLock, NULL);
57
ZeroMQLog::~ZeroMQLog()
60
pthread_mutex_destroy(&publishLock);
63
plugin::ReplicationReturnCode
64
ZeroMQLog::apply(Session &, const message::Transaction &to_apply)
66
size_t message_byte_length= to_apply.ByteSize();
67
uint8_t* buffer= new uint8_t[message_byte_length];
70
errmsg_printf(error::ERROR, _("Failed to allocate enough memory to transaction message\n"));
72
return plugin::UNKNOWN_ERROR;
75
string schema= getSchemaName(to_apply);
77
int rc= zmq_msg_init_size(&schemamsg, schema.length());
78
memcpy(zmq_msg_data(&schemamsg), schema.c_str(), schema.length());
80
to_apply.SerializeWithCachedSizesToArray(buffer);
82
rc= zmq_msg_init_size(&msg, message_byte_length);
84
memcpy(zmq_msg_data(&msg), buffer, message_byte_length);
86
// need a mutex around this since several threads can call this method at the same time
87
pthread_mutex_lock(&publishLock);
88
rc= zmq_send(_socket, &schemamsg, ZMQ_SNDMORE);
89
rc= zmq_send(_socket, &msg, 0);
90
pthread_mutex_unlock(&publishLock);
93
zmq_msg_close(&schemamsg);
95
return plugin::SUCCESS;
98
string ZeroMQLog::getSchemaName(const message::Transaction &txn) {
99
if(txn.statement_size() == 0) return "";
101
const message::Statement &statement= txn.statement(0);
103
switch(statement.type())
105
case message::Statement::INSERT:
106
return statement.insert_header().table_metadata().schema_name();
107
case message::Statement::UPDATE:
108
return statement.update_header().table_metadata().schema_name();
109
case message::Statement::DELETE:
110
return statement.delete_header().table_metadata().schema_name();
111
case message::Statement::CREATE_TABLE:
112
return statement.create_table_statement().table().schema();
113
case message::Statement::TRUNCATE_TABLE:
114
return statement.truncate_table_statement().table_metadata().schema_name();
115
case message::Statement::DROP_TABLE:
116
return statement.drop_table_statement().table_metadata().schema_name();
117
case message::Statement::CREATE_SCHEMA:
118
return statement.create_schema_statement().schema().name();
119
case message::Statement::DROP_SCHEMA:
120
return statement.drop_schema_statement().schema_name();
126
static ZeroMQLog *zeromqLogger; ///< the actual plugin
129
* Initialize the zeromq logger
131
static int init(drizzled::module::Context &context)
133
const module::option_map &vm= context.getOptions();
134
zeromqLogger= new ZeroMQLog("zeromq_log_applier", vm["endpoint"].as<string>());
135
context.add(zeromqLogger);
136
ReplicationServices::attachApplier(zeromqLogger, vm["use-replicator"].as<string>());
137
context.registerVariable(new sys_var_const_string_val("endpoint", vm["endpoint"].as<string>()));
142
static void init_options(drizzled::module::option_context &context)
145
po::value<string>()->default_value("tcp://*:9999"),
146
_("End point to bind to"));
147
context("use-replicator",
148
po::value<string>()->default_value("default_replicator"),
149
_("Name of the replicator plugin to use (default='default_replicator')"));
153
} /* namespace drizzle_plugin */
155
DRIZZLE_PLUGIN(drizzle_plugin::init, NULL, drizzle_plugin::init_options);