~drizzle-trunk/drizzle/development

1283.2.2 by Marcus Eriksson
add copyright headers, change load_by_default to no, include config.h, change string initialization to () style
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
1283.2.2 by Marcus Eriksson
add copyright headers, change load_by_default to no, include config.h, change string initialization to () style
3
 *
4
 *  Copyright (C) 2010 Marcus Eriksson
5
 *
6
 *  Authors:
7
 *
8
 *  Marcus Eriksson <krummas@gmail.com>
9
 *
10
 *  This program is free software; you can redistribute it and/or modify
11
 *  it under the terms of the GNU General Public License as published by
12
 *  the Free Software Foundation; either version 2 of the License, or
13
 *  (at your option) any later version.
14
 *
15
 *  This program is distributed in the hope that it will be useful,
16
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
 *  GNU General Public License for more details.
19
 *
20
 *  You should have received a copy of the GNU General Public License
21
 *  along with this program; if not, write to the Free Software
22
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
23
 */
1283.2.2 by Marcus Eriksson
add copyright headers, change load_by_default to no, include config.h, change string initialization to () style
24
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
25
#include "config.h"
26
#include "rabbitmq_log.h"
27
#include <drizzled/message/transaction.pb.h>
28
#include <google/protobuf/io/coded_stream.h>
29
#include <stdio.h>
1530.4.3 by Monty Taylor
Two more missed renames.
30
#include <drizzled/module/registry.h>
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
31
#include <drizzled/plugin.h>
32
#include <stdint.h>
33
#include "rabbitmq_handler.h"
1666.3.1 by Vijay Samuel
Merge refactored command line for rabbitmq
34
#include <boost/program_options.hpp>
35
#include <drizzled/module/option_map.h>
36
37
namespace po= boost::program_options;
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
38
39
using namespace std;
40
using namespace drizzled;
41
using namespace google;
42
43
/**
44
 * The hostname to connect to
45
 */
46
static char* sysvar_rabbitmq_host= NULL;
47
48
/**
49
 * rabbitmq port
50
 */
51
static int sysvar_rabbitmq_port= 0;
52
53
/**
54
 * rabbitmq username
55
 */
56
static char* sysvar_rabbitmq_username= NULL;
57
58
/**
59
 * rabbitmq password
60
 */
61
static char* sysvar_rabbitmq_password= NULL;
62
63
/**
64
 * rabbitmq virtualhost
65
 */
66
static char* sysvar_rabbitmq_virtualhost= NULL;
67
68
/**
69
 * rabbitmq exchangename
70
 */
71
static char* sysvar_rabbitmq_exchange= NULL;
72
73
/**
74
 * rabbitmq routing key
75
 */
76
static char* sysvar_rabbitmq_routingkey= NULL;
77
78
/**
79
 * Is the rabbitmq log enabled?
80
 */
81
static bool sysvar_rabbitmq_log_enabled= false;
82
1441.1.4 by Jay Pipes
Add --rabbitmq-use-replicator=[default|filtered|...] functionality and update variables test case
83
/**
84
 * The name of the replicator plugin
85
 * to pair the rabbitmq log's applier with.
86
 * Defaults to "default"
87
 */
88
static char *sysvar_rabbitmq_use_replicator= NULL;
89
static const char DEFAULT_USE_REPLICATOR[]= "default";
90
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
91
92
RabbitMQLog::RabbitMQLog(const string name_arg, 
93
			 RabbitMQHandler* mqHandler)
94
  :plugin::TransactionApplier(name_arg)
95
{
96
  rabbitMQHandler= mqHandler;
97
}
98
99
RabbitMQLog::~RabbitMQLog() 
100
{
101
}
102
1441.1.4 by Jay Pipes
Add --rabbitmq-use-replicator=[default|filtered|...] functionality and update variables test case
103
plugin::ReplicationReturnCode
104
RabbitMQLog::apply(Session &, const message::Transaction &to_apply)
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
105
{
106
  size_t message_byte_length= to_apply.ByteSize();
107
  uint8_t* buffer= static_cast<uint8_t *>(malloc(message_byte_length));
108
  if(buffer == NULL)
109
  {
110
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate enough memory to transaction message\n"));
111
    deactivate();
1441.1.4 by Jay Pipes
Add --rabbitmq-use-replicator=[default|filtered|...] functionality and update variables test case
112
    return plugin::UNKNOWN_ERROR;
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
113
  }
114
115
  to_apply.SerializeWithCachedSizesToArray(buffer);
116
  try
117
  {
118
    rabbitMQHandler->publish(buffer, 
119
			     int(message_byte_length), 
120
			     sysvar_rabbitmq_exchange, 
121
			     sysvar_rabbitmq_routingkey);
122
  }
123
  catch(exception& e)
124
  {
125
    errmsg_printf(ERRMSG_LVL_ERROR, _(e.what()));
126
    deactivate();
1441.1.4 by Jay Pipes
Add --rabbitmq-use-replicator=[default|filtered|...] functionality and update variables test case
127
    return plugin::UNKNOWN_ERROR;
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
128
  }
129
  free(buffer);
1441.1.4 by Jay Pipes
Add --rabbitmq-use-replicator=[default|filtered|...] functionality and update variables test case
130
  return plugin::SUCCESS;
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
131
}
132
1441.1.2 by Jay Pipes
Two little problems were preventing the plugin from being loaded:
133
static RabbitMQLog *rabbitmqLogger; ///< the actual plugin
134
static RabbitMQHandler* rabbitmqHandler; ///< the rabbitmq handler
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
135
136
/**
137
 * Initialize the rabbitmq logger - instanciates the dependencies (the handler)
138
 * and creates the log handler with the dependency - makes it easier to swap out
139
 * handler implementation
140
 */
1530.2.6 by Monty Taylor
Moved plugin::Context to module::Context.
141
static int init(drizzled::module::Context &context)
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
142
{
1700.2.1 by Vijay Samuel
Merge precautionary fixes for rabbitmq to prevent plugin from breaking when my_getopt is removed.
143
  const module::option_map &vm= context.getOptions();
144
  
145
  if (vm.count("username"))
146
  {
147
    sysvar_rabbitmq_username= const_cast<char *>(vm["username"].as<string>().c_str());
148
  }
149
150
  if (vm.count("password"))
151
  {
152
    sysvar_rabbitmq_password= const_cast<char *>(vm["password"].as<string>().c_str());
153
  }
154
155
  if (vm.count("host"))
156
  {
157
    sysvar_rabbitmq_host= const_cast<char *>(vm["host"].as<string>().c_str());
158
  }
159
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
160
  if(sysvar_rabbitmq_log_enabled)
161
  {
162
    try 
163
    {
164
      rabbitmqHandler= new RabbitMQHandler(sysvar_rabbitmq_host, 
165
					   sysvar_rabbitmq_port, 
166
					   sysvar_rabbitmq_username, 
167
					   sysvar_rabbitmq_password, 
168
					   sysvar_rabbitmq_virtualhost);
169
    } 
170
    catch (exception& e) 
171
    {
172
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQHandler.  Got error: %s\n"),
173
		    e.what());
174
      return 1;
175
    }
176
    try 
177
    {
1441.1.4 by Jay Pipes
Add --rabbitmq-use-replicator=[default|filtered|...] functionality and update variables test case
178
      rabbitmqLogger= new RabbitMQLog("rabbit_log_applier", rabbitmqHandler);
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
179
    } 
180
    catch (exception& e) 
181
    {
182
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQLog instance.  Got error: %s\n"), 
183
		    e.what());
184
      return 1;
185
    }
186
1441.1.1 by Marcus Eriksson
drizzle_plugin changed
187
    context.add(rabbitmqLogger);
1441.1.4 by Jay Pipes
Add --rabbitmq-use-replicator=[default|filtered|...] functionality and update variables test case
188
    ReplicationServices &replication_services= ReplicationServices::singleton();
189
    string replicator_name(sysvar_rabbitmq_use_replicator);
190
    replication_services.attachApplier(rabbitmqLogger, replicator_name);
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
191
    return 0;
192
  }
193
  return 0;
194
}
195
196
197
static DRIZZLE_SYSVAR_BOOL(enable,
198
                           sysvar_rabbitmq_log_enabled,
199
                           PLUGIN_VAR_NOCMDARG,
200
                           N_("Enable rabbitmq log"),
201
                           NULL, /* check func */
202
                           NULL, /* update func */
203
                           false /* default */);
204
205
206
static DRIZZLE_SYSVAR_STR(hostname,
207
                          sysvar_rabbitmq_host,
208
                          PLUGIN_VAR_READONLY,
209
                          N_("Host name to connect to"),
210
                          NULL, /* check func */
211
                          NULL, /* update func*/
212
                          "localhost" /* default */);
213
214
215
static DRIZZLE_SYSVAR_INT(port,
216
			  sysvar_rabbitmq_port,
217
			  PLUGIN_VAR_READONLY,
218
			  N_("RabbitMQ Port"),
219
			  NULL, /* check func */
220
			  NULL, /* update func */
221
			  5672, /* default */
222
			  0,
223
			  65535,
224
			  0);
225
226
static DRIZZLE_SYSVAR_STR(username,
227
                          sysvar_rabbitmq_username,
228
                          PLUGIN_VAR_READONLY,
229
                          N_("RabbitMQ username"),
230
                          NULL, /* check func */
231
                          NULL, /* update func*/
232
                          "guest" /* default */);
233
234
static DRIZZLE_SYSVAR_STR(password,
235
                          sysvar_rabbitmq_password,
236
                          PLUGIN_VAR_READONLY,
237
                          N_("RabbitMQ password"),
238
                          NULL, /* check func */
239
                          NULL, /* update func*/
240
                          "guest" /* default */);
241
242
static DRIZZLE_SYSVAR_STR(virtualhost,
243
                          sysvar_rabbitmq_virtualhost,
244
                          PLUGIN_VAR_READONLY,
245
                          N_("RabbitMQ virtualhost"),
246
                          NULL, /* check func */
247
                          NULL, /* update func*/
248
                          "/" /* default */);
249
250
static DRIZZLE_SYSVAR_STR(exchange,
251
                          sysvar_rabbitmq_exchange,
252
                          PLUGIN_VAR_READONLY,
253
                          N_("Name of RabbitMQ exchange to publish to"),
254
                          NULL, /* check func */
255
                          NULL, /* update func*/
256
                          "ReplicationExchange" /* default */);
257
258
static DRIZZLE_SYSVAR_STR(routingkey,
259
                          sysvar_rabbitmq_routingkey,
260
                          PLUGIN_VAR_READONLY,
261
                          N_("Name of RabbitMQ routing key to use"),
262
                          NULL, /* check func */
263
                          NULL, /* update func*/
264
                          "ReplicationRoutingKey" /* default */);
265
1441.1.4 by Jay Pipes
Add --rabbitmq-use-replicator=[default|filtered|...] functionality and update variables test case
266
static DRIZZLE_SYSVAR_STR(use_replicator,
267
                          sysvar_rabbitmq_use_replicator,
268
                          PLUGIN_VAR_READONLY,
269
                          N_("Name of the replicator plugin to use (default='default_replicator')"),
270
                          NULL, /* check func */
271
                          NULL, /* update func*/
272
                          DEFAULT_USE_REPLICATOR /* default */);
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
273
1666.3.1 by Vijay Samuel
Merge refactored command line for rabbitmq
274
static void init_options(drizzled::module::option_context &context)
275
{
276
  context ("enable",
277
           po::value<bool>(&sysvar_rabbitmq_log_enabled)->default_value(false)->zero_tokens(),
278
           N_("Enable rabbitmq log"));
1700.2.1 by Vijay Samuel
Merge precautionary fixes for rabbitmq to prevent plugin from breaking when my_getopt is removed.
279
  context("host", 
280
          po::value<string>()->default_value("localhost"),
281
          N_("Host name to connect to"));
282
  context("username",
283
          po::value<string>()->default_value("guest"),
284
          N_("RabbitMQ username"));
285
  context("password",
286
          po::value<string>()->default_value("guest"),
287
          N_("RabbitMQ password"));
1666.3.1 by Vijay Samuel
Merge refactored command line for rabbitmq
288
}
289
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
290
static drizzle_sys_var* system_variables[]= {
291
  DRIZZLE_SYSVAR(enable),
292
  DRIZZLE_SYSVAR(hostname),
293
  DRIZZLE_SYSVAR(port),
294
  DRIZZLE_SYSVAR(username),
295
  DRIZZLE_SYSVAR(password),
296
  DRIZZLE_SYSVAR(virtualhost),
297
  DRIZZLE_SYSVAR(exchange),
298
  DRIZZLE_SYSVAR(routingkey),
1441.1.4 by Jay Pipes
Add --rabbitmq-use-replicator=[default|filtered|...] functionality and update variables test case
299
  DRIZZLE_SYSVAR(use_replicator),
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
300
  NULL
301
};
302
1666.3.1 by Vijay Samuel
Merge refactored command line for rabbitmq
303
DRIZZLE_PLUGIN(init, system_variables, init_options);
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
304