~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/zeromq/zeromq_log.cc

  • Committer: Monty Taylor
  • Date: 2010-12-27 18:39:11 UTC
  • mto: This revision was merged to the branch mainline in revision 2038.
  • Revision ID: mordred@inaugust.com-20101227183911-atgh0kcubflay0b9
Added back INNOBASE_SKIP_WARNINGS for solaris. Also dealt with unused params.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2011 Marcus Eriksson
5
 
 *
6
 
 *  Authors:
7
 
 *
8
 
 *  Marcus Eriksson <krummas@gmail.com>
9
 
 *
10
 
 *  This program is free software; you can redistribute it and/or modify
11
 
 *  it under the terms of the GNU General Public License as published by
12
 
 *  the Free Software Foundation; either version 2 of the License, or
13
 
 *  (at your option) any later version.
14
 
 *
15
 
 *  This program is distributed in the hope that it will be useful,
16
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
 
 *  GNU General Public License for more details.
19
 
 *
20
 
 *  You should have received a copy of the GNU General Public License
21
 
 *  along with this program; if not, write to the Free Software
22
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
23
 
 */
24
 
 
25
 
#include <config.h>
26
 
#include "zeromq_log.h"
27
 
#include <drizzled/message/transaction.pb.h>
28
 
#include <google/protobuf/io/coded_stream.h>
29
 
#include <stdio.h>
30
 
#include <drizzled/module/registry.h>
31
 
#include <drizzled/plugin.h>
32
 
#include <stdint.h>
33
 
#include <boost/program_options.hpp>
34
 
#include <drizzled/module/option_map.h>
35
 
#include <zmq.h>
36
 
 
37
 
namespace po= boost::program_options;
38
 
 
39
 
using namespace std;
40
 
using namespace drizzled;
41
 
using namespace google;
42
 
 
43
 
namespace drizzle_plugin
44
 
{
45
 
 
46
 
ZeroMQLog::ZeroMQLog(const string &name, const string &endpoint) :
47
 
  plugin::TransactionApplier(name)
48
 
{
49
 
  void *context= zmq_init(1);
50
 
  _socket= zmq_socket (context, ZMQ_PUB);
51
 
  assert (_socket);
52
 
  int rc= zmq_bind (_socket, endpoint.c_str());
53
 
  assert (rc == 0);
54
 
  pthread_mutex_init(&publishLock, NULL);
55
 
}
56
 
 
57
 
ZeroMQLog::~ZeroMQLog()
58
 
{
59
 
  zmq_close(_socket);
60
 
  pthread_mutex_destroy(&publishLock);
61
 
}
62
 
 
63
 
plugin::ReplicationReturnCode
64
 
ZeroMQLog::apply(Session &, const message::Transaction &to_apply)
65
 
{
66
 
  size_t message_byte_length= to_apply.ByteSize();
67
 
  uint8_t* buffer= new uint8_t[message_byte_length];
68
 
  if(buffer == NULL)
69
 
  {
70
 
    errmsg_printf(error::ERROR, _("Failed to allocate enough memory to transaction message\n"));
71
 
    deactivate();
72
 
    return plugin::UNKNOWN_ERROR;
73
 
  }
74
 
 
75
 
  string schema= getSchemaName(to_apply);
76
 
  zmq_msg_t schemamsg;
77
 
  int rc= zmq_msg_init_size(&schemamsg, schema.length());
78
 
  memcpy(zmq_msg_data(&schemamsg), schema.c_str(), schema.length());
79
 
 
80
 
  to_apply.SerializeWithCachedSizesToArray(buffer);
81
 
  zmq_msg_t msg;
82
 
  rc= zmq_msg_init_size(&msg, message_byte_length);
83
 
  assert (rc == 0);
84
 
  memcpy(zmq_msg_data(&msg), buffer, message_byte_length);
85
 
 
86
 
  // need a mutex around this since several threads can call this method at the same time
87
 
  pthread_mutex_lock(&publishLock);
88
 
  rc= zmq_send(_socket, &schemamsg, ZMQ_SNDMORE);
89
 
  rc= zmq_send(_socket, &msg, 0);
90
 
  pthread_mutex_unlock(&publishLock);
91
 
 
92
 
  zmq_msg_close(&msg);
93
 
  zmq_msg_close(&schemamsg);
94
 
  delete[] buffer;
95
 
  return plugin::SUCCESS;
96
 
}
97
 
 
98
 
string ZeroMQLog::getSchemaName(const message::Transaction &txn) {
99
 
  if(txn.statement_size() == 0) return "";
100
 
 
101
 
  const message::Statement &statement= txn.statement(0);
102
 
 
103
 
  switch(statement.type())
104
 
  {
105
 
        case message::Statement::INSERT:
106
 
          return statement.insert_header().table_metadata().schema_name();
107
 
        case message::Statement::UPDATE:
108
 
          return statement.update_header().table_metadata().schema_name();
109
 
        case message::Statement::DELETE:
110
 
          return statement.delete_header().table_metadata().schema_name();
111
 
        case message::Statement::CREATE_TABLE:
112
 
          return statement.create_table_statement().table().schema();
113
 
        case message::Statement::TRUNCATE_TABLE:
114
 
          return statement.truncate_table_statement().table_metadata().schema_name();
115
 
        case message::Statement::DROP_TABLE:
116
 
          return statement.drop_table_statement().table_metadata().schema_name();
117
 
        case message::Statement::CREATE_SCHEMA:
118
 
          return statement.create_schema_statement().schema().name();
119
 
        case message::Statement::DROP_SCHEMA:
120
 
          return statement.drop_schema_statement().schema_name();
121
 
    default:
122
 
          return "";
123
 
  }
124
 
}
125
 
 
126
 
static ZeroMQLog *zeromqLogger; ///< the actual plugin
127
 
 
128
 
/**
129
 
 * Initialize the zeromq logger
130
 
 */
131
 
static int init(drizzled::module::Context &context)
132
 
{
133
 
  const module::option_map &vm= context.getOptions();
134
 
  zeromqLogger= new ZeroMQLog("zeromq_log_applier", vm["endpoint"].as<string>());
135
 
  context.add(zeromqLogger);
136
 
  ReplicationServices::attachApplier(zeromqLogger, vm["use-replicator"].as<string>());
137
 
  context.registerVariable(new sys_var_const_string_val("endpoint", vm["endpoint"].as<string>()));
138
 
  return 0;
139
 
}
140
 
 
141
 
 
142
 
static void init_options(drizzled::module::option_context &context)
143
 
{
144
 
  context("endpoint", 
145
 
          po::value<string>()->default_value("tcp://*:9999"),
146
 
          _("End point to bind to"));
147
 
  context("use-replicator",
148
 
          po::value<string>()->default_value("default_replicator"),
149
 
          _("Name of the replicator plugin to use (default='default_replicator')"));
150
 
 
151
 
}
152
 
 
153
 
} /* namespace drizzle_plugin */
154
 
 
155
 
DRIZZLE_PLUGIN(drizzle_plugin::init, NULL, drizzle_plugin::init_options);