~drizzle-trunk/drizzle/development

2386.1.1 by Marcus Eriksson
initial zeromq replicator support
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2011 Marcus Eriksson
5
 *
6
 *  Authors:
7
 *
8
 *  Marcus Eriksson <krummas@gmail.com>
9
 *
10
 *  This program is free software; you can redistribute it and/or modify
11
 *  it under the terms of the GNU General Public License as published by
12
 *  the Free Software Foundation; either version 2 of the License, or
13
 *  (at your option) any later version.
14
 *
15
 *  This program is distributed in the hope that it will be useful,
16
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
 *  GNU General Public License for more details.
19
 *
20
 *  You should have received a copy of the GNU General Public License
21
 *  along with this program; if not, write to the Free Software
22
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
23
 */
24
25
#include <config.h>
26
#include "zeromq_log.h"
27
#include <drizzled/message/transaction.pb.h>
28
#include <google/protobuf/io/coded_stream.h>
29
#include <stdio.h>
30
#include <drizzled/module/registry.h>
31
#include <drizzled/plugin.h>
32
#include <stdint.h>
33
#include <boost/program_options.hpp>
34
#include <drizzled/module/option_map.h>
35
#include <zmq.h>
36
37
namespace po= boost::program_options;
38
39
using namespace std;
40
using namespace drizzled;
41
using namespace google;
42
43
namespace drizzle_plugin
44
{
45
46
ZeroMQLog::ZeroMQLog(const string &name, const string &endpoint) :
47
  plugin::TransactionApplier(name)
48
{
49
  void *context= zmq_init(1);
50
  _socket= zmq_socket (context, ZMQ_PUB);
51
  assert (_socket);
52
  int rc= zmq_bind (_socket, endpoint.c_str());
53
  assert (rc == 0);
54
  pthread_mutex_init(&publishLock, NULL);
55
}
56
57
ZeroMQLog::~ZeroMQLog()
58
{
59
  zmq_close(_socket);
60
  pthread_mutex_destroy(&publishLock);
61
}
62
63
plugin::ReplicationReturnCode
64
ZeroMQLog::apply(Session &, const message::Transaction &to_apply)
65
{
66
  size_t message_byte_length= to_apply.ByteSize();
67
  uint8_t* buffer= new uint8_t[message_byte_length];
68
  if(buffer == NULL)
69
  {
70
    errmsg_printf(error::ERROR, _("Failed to allocate enough memory to transaction message\n"));
71
    deactivate();
72
    return plugin::UNKNOWN_ERROR;
73
  }
74
2386.1.2 by Marcus Eriksson
add schema as first frame and add some initial docs
75
  string schema= getSchemaName(to_apply);
76
  zmq_msg_t schemamsg;
77
  int rc= zmq_msg_init_size(&schemamsg, schema.length());
78
  memcpy(zmq_msg_data(&schemamsg), schema.c_str(), schema.length());
79
2386.1.1 by Marcus Eriksson
initial zeromq replicator support
80
  to_apply.SerializeWithCachedSizesToArray(buffer);
81
  zmq_msg_t msg;
2386.1.2 by Marcus Eriksson
add schema as first frame and add some initial docs
82
  rc= zmq_msg_init_size(&msg, message_byte_length);
2386.1.1 by Marcus Eriksson
initial zeromq replicator support
83
  assert (rc == 0);
84
  memcpy(zmq_msg_data(&msg), buffer, message_byte_length);
85
2386.1.2 by Marcus Eriksson
add schema as first frame and add some initial docs
86
  // need a mutex around this since several threads can call this method at the same time
2386.1.1 by Marcus Eriksson
initial zeromq replicator support
87
  pthread_mutex_lock(&publishLock);
2386.1.2 by Marcus Eriksson
add schema as first frame and add some initial docs
88
  rc= zmq_send(_socket, &schemamsg, ZMQ_SNDMORE);
89
  rc= zmq_send(_socket, &msg, 0);
2386.1.1 by Marcus Eriksson
initial zeromq replicator support
90
  pthread_mutex_unlock(&publishLock);
91
2386.1.2 by Marcus Eriksson
add schema as first frame and add some initial docs
92
  zmq_msg_close(&msg);
93
  zmq_msg_close(&schemamsg);
2386.1.1 by Marcus Eriksson
initial zeromq replicator support
94
  delete[] buffer;
95
  return plugin::SUCCESS;
96
}
97
2386.1.2 by Marcus Eriksson
add schema as first frame and add some initial docs
98
string ZeroMQLog::getSchemaName(const message::Transaction &txn) {
99
  if(txn.statement_size() == 0) return "";
100
101
  const message::Statement &statement= txn.statement(0);
102
103
  switch(statement.type())
104
  {
105
	case message::Statement::INSERT:
106
	  return statement.insert_header().table_metadata().schema_name();
107
	case message::Statement::UPDATE:
108
	  return statement.update_header().table_metadata().schema_name();
109
	case message::Statement::DELETE:
110
	  return statement.delete_header().table_metadata().schema_name();
111
	case message::Statement::CREATE_TABLE:
112
	  return statement.create_table_statement().table().schema();
113
	case message::Statement::TRUNCATE_TABLE:
114
	  return statement.truncate_table_statement().table_metadata().schema_name();
115
	case message::Statement::DROP_TABLE:
116
	  return statement.drop_table_statement().table_metadata().schema_name();
117
	case message::Statement::CREATE_SCHEMA:
118
	  return statement.create_schema_statement().schema().name();
119
	case message::Statement::DROP_SCHEMA:
120
	  return statement.drop_schema_statement().schema_name();
121
    default:
122
	  return "";
123
  }
124
}
125
2386.1.1 by Marcus Eriksson
initial zeromq replicator support
126
static ZeroMQLog *zeromqLogger; ///< the actual plugin
127
128
/**
129
 * Initialize the zeromq logger
130
 */
131
static int init(drizzled::module::Context &context)
132
{
133
  const module::option_map &vm= context.getOptions();
134
  zeromqLogger= new ZeroMQLog("zeromq_log_applier", vm["endpoint"].as<string>());
135
  context.add(zeromqLogger);
136
  ReplicationServices::attachApplier(zeromqLogger, vm["use-replicator"].as<string>());
137
  context.registerVariable(new sys_var_const_string_val("endpoint", vm["endpoint"].as<string>()));
138
  return 0;
139
}
140
141
142
static void init_options(drizzled::module::option_context &context)
143
{
144
  context("endpoint", 
145
          po::value<string>()->default_value("tcp://*:9999"),
146
          _("End point to bind to"));
147
  context("use-replicator",
148
          po::value<string>()->default_value("default_replicator"),
149
          _("Name of the replicator plugin to use (default='default_replicator')"));
150
151
}
152
153
} /* namespace drizzle_plugin */
154
155
DRIZZLE_PLUGIN(drizzle_plugin::init, NULL, drizzle_plugin::init_options);