~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/zeromq/zeromq_log.cc

  • Committer: Mark Atwood
  • Date: 2011-08-09 16:49:38 UTC
  • mfrom: (2386.1.2 drizzle)
  • Revision ID: me@mark.atwood.name-20110809164938-mb76epliqt26qy4n
mergeĀ lp:~krummas/drizzle/zeromq-support

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 *  Copyright (C) 2011 Marcus Eriksson
 
5
 *
 
6
 *  Authors:
 
7
 *
 
8
 *  Marcus Eriksson <krummas@gmail.com>
 
9
 *
 
10
 *  This program is free software; you can redistribute it and/or modify
 
11
 *  it under the terms of the GNU General Public License as published by
 
12
 *  the Free Software Foundation; either version 2 of the License, or
 
13
 *  (at your option) any later version.
 
14
 *
 
15
 *  This program is distributed in the hope that it will be useful,
 
16
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
17
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
18
 *  GNU General Public License for more details.
 
19
 *
 
20
 *  You should have received a copy of the GNU General Public License
 
21
 *  along with this program; if not, write to the Free Software
 
22
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
23
 */
 
24
 
 
25
#include <config.h>
 
26
#include "zeromq_log.h"
 
27
#include <drizzled/message/transaction.pb.h>
 
28
#include <google/protobuf/io/coded_stream.h>
 
29
#include <stdio.h>
 
30
#include <drizzled/module/registry.h>
 
31
#include <drizzled/plugin.h>
 
32
#include <stdint.h>
 
33
#include <boost/program_options.hpp>
 
34
#include <drizzled/module/option_map.h>
 
35
#include <zmq.h>
 
36
 
 
37
namespace po= boost::program_options;
 
38
 
 
39
using namespace std;
 
40
using namespace drizzled;
 
41
using namespace google;
 
42
 
 
43
namespace drizzle_plugin
 
44
{
 
45
 
 
46
ZeroMQLog::ZeroMQLog(const string &name, const string &endpoint) :
 
47
  plugin::TransactionApplier(name)
 
48
{
 
49
  void *context= zmq_init(1);
 
50
  _socket= zmq_socket (context, ZMQ_PUB);
 
51
  assert (_socket);
 
52
  int rc= zmq_bind (_socket, endpoint.c_str());
 
53
  assert (rc == 0);
 
54
  pthread_mutex_init(&publishLock, NULL);
 
55
}
 
56
 
 
57
ZeroMQLog::~ZeroMQLog()
 
58
{
 
59
  zmq_close(_socket);
 
60
  pthread_mutex_destroy(&publishLock);
 
61
}
 
62
 
 
63
plugin::ReplicationReturnCode
 
64
ZeroMQLog::apply(Session &, const message::Transaction &to_apply)
 
65
{
 
66
  size_t message_byte_length= to_apply.ByteSize();
 
67
  uint8_t* buffer= new uint8_t[message_byte_length];
 
68
  if(buffer == NULL)
 
69
  {
 
70
    errmsg_printf(error::ERROR, _("Failed to allocate enough memory to transaction message\n"));
 
71
    deactivate();
 
72
    return plugin::UNKNOWN_ERROR;
 
73
  }
 
74
 
 
75
  string schema= getSchemaName(to_apply);
 
76
  zmq_msg_t schemamsg;
 
77
  int rc= zmq_msg_init_size(&schemamsg, schema.length());
 
78
  memcpy(zmq_msg_data(&schemamsg), schema.c_str(), schema.length());
 
79
 
 
80
  to_apply.SerializeWithCachedSizesToArray(buffer);
 
81
  zmq_msg_t msg;
 
82
  rc= zmq_msg_init_size(&msg, message_byte_length);
 
83
  assert (rc == 0);
 
84
  memcpy(zmq_msg_data(&msg), buffer, message_byte_length);
 
85
 
 
86
  // need a mutex around this since several threads can call this method at the same time
 
87
  pthread_mutex_lock(&publishLock);
 
88
  rc= zmq_send(_socket, &schemamsg, ZMQ_SNDMORE);
 
89
  rc= zmq_send(_socket, &msg, 0);
 
90
  pthread_mutex_unlock(&publishLock);
 
91
 
 
92
  zmq_msg_close(&msg);
 
93
  zmq_msg_close(&schemamsg);
 
94
  delete[] buffer;
 
95
  return plugin::SUCCESS;
 
96
}
 
97
 
 
98
string ZeroMQLog::getSchemaName(const message::Transaction &txn) {
 
99
  if(txn.statement_size() == 0) return "";
 
100
 
 
101
  const message::Statement &statement= txn.statement(0);
 
102
 
 
103
  switch(statement.type())
 
104
  {
 
105
        case message::Statement::INSERT:
 
106
          return statement.insert_header().table_metadata().schema_name();
 
107
        case message::Statement::UPDATE:
 
108
          return statement.update_header().table_metadata().schema_name();
 
109
        case message::Statement::DELETE:
 
110
          return statement.delete_header().table_metadata().schema_name();
 
111
        case message::Statement::CREATE_TABLE:
 
112
          return statement.create_table_statement().table().schema();
 
113
        case message::Statement::TRUNCATE_TABLE:
 
114
          return statement.truncate_table_statement().table_metadata().schema_name();
 
115
        case message::Statement::DROP_TABLE:
 
116
          return statement.drop_table_statement().table_metadata().schema_name();
 
117
        case message::Statement::CREATE_SCHEMA:
 
118
          return statement.create_schema_statement().schema().name();
 
119
        case message::Statement::DROP_SCHEMA:
 
120
          return statement.drop_schema_statement().schema_name();
 
121
    default:
 
122
          return "";
 
123
  }
 
124
}
 
125
 
 
126
static ZeroMQLog *zeromqLogger; ///< the actual plugin
 
127
 
 
128
/**
 
129
 * Initialize the zeromq logger
 
130
 */
 
131
static int init(drizzled::module::Context &context)
 
132
{
 
133
  const module::option_map &vm= context.getOptions();
 
134
  zeromqLogger= new ZeroMQLog("zeromq_log_applier", vm["endpoint"].as<string>());
 
135
  context.add(zeromqLogger);
 
136
  ReplicationServices::attachApplier(zeromqLogger, vm["use-replicator"].as<string>());
 
137
  context.registerVariable(new sys_var_const_string_val("endpoint", vm["endpoint"].as<string>()));
 
138
  return 0;
 
139
}
 
140
 
 
141
 
 
142
static void init_options(drizzled::module::option_context &context)
 
143
{
 
144
  context("endpoint", 
 
145
          po::value<string>()->default_value("tcp://*:9999"),
 
146
          _("End point to bind to"));
 
147
  context("use-replicator",
 
148
          po::value<string>()->default_value("default_replicator"),
 
149
          _("Name of the replicator plugin to use (default='default_replicator')"));
 
150
 
 
151
}
 
152
 
 
153
} /* namespace drizzle_plugin */
 
154
 
 
155
DRIZZLE_PLUGIN(drizzle_plugin::init, NULL, drizzle_plugin::init_options);