~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/rabbitmq/rabbitmq_log.cc

  • Committer: Monty Taylor
  • Date: 2008-10-23 00:05:28 UTC
  • Revision ID: monty@inaugust.com-20081023000528-grdvrd8c4058nutm
Moved my_handler to myisam, which is where it actually belongs.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2010 Marcus Eriksson
5
 
 *
6
 
 *  Authors:
7
 
 *
8
 
 *  Marcus Eriksson <krummas@gmail.com>
9
 
 *
10
 
 *  This program is free software; you can redistribute it and/or modify
11
 
 *  it under the terms of the GNU General Public License as published by
12
 
 *  the Free Software Foundation; either version 2 of the License, or
13
 
 *  (at your option) any later version.
14
 
 *
15
 
 *  This program is distributed in the hope that it will be useful,
16
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
 
 *  GNU General Public License for more details.
19
 
 *
20
 
 *  You should have received a copy of the GNU General Public License
21
 
 *  along with this program; if not, write to the Free Software
22
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
23
 
 */
24
 
 
25
 
#include "config.h"
26
 
#include "rabbitmq_log.h"
27
 
#include <drizzled/message/transaction.pb.h>
28
 
#include <google/protobuf/io/coded_stream.h>
29
 
#include <stdio.h>
30
 
#include <drizzled/module/registry.h>
31
 
#include <drizzled/plugin.h>
32
 
#include <stdint.h>
33
 
#include "rabbitmq_handler.h"
34
 
#include <boost/program_options.hpp>
35
 
#include <drizzled/module/option_map.h>
36
 
 
37
 
namespace po= boost::program_options;
38
 
 
39
 
using namespace std;
40
 
using namespace drizzled;
41
 
using namespace google;
42
 
 
43
 
/**
44
 
 * The hostname to connect to
45
 
 */
46
 
static char* sysvar_rabbitmq_host= NULL;
47
 
 
48
 
/**
49
 
 * rabbitmq port
50
 
 */
51
 
static int sysvar_rabbitmq_port= 0;
52
 
 
53
 
/**
54
 
 * rabbitmq username
55
 
 */
56
 
static char* sysvar_rabbitmq_username= NULL;
57
 
 
58
 
/**
59
 
 * rabbitmq password
60
 
 */
61
 
static char* sysvar_rabbitmq_password= NULL;
62
 
 
63
 
/**
64
 
 * rabbitmq virtualhost
65
 
 */
66
 
static char* sysvar_rabbitmq_virtualhost= NULL;
67
 
 
68
 
/**
69
 
 * rabbitmq exchangename
70
 
 */
71
 
static char* sysvar_rabbitmq_exchange= NULL;
72
 
 
73
 
/**
74
 
 * rabbitmq routing key
75
 
 */
76
 
static char* sysvar_rabbitmq_routingkey= NULL;
77
 
 
78
 
/**
79
 
 * Is the rabbitmq log enabled?
80
 
 */
81
 
static bool sysvar_rabbitmq_log_enabled= false;
82
 
 
83
 
/**
84
 
 * The name of the replicator plugin
85
 
 * to pair the rabbitmq log's applier with.
86
 
 * Defaults to "default"
87
 
 */
88
 
static char *sysvar_rabbitmq_use_replicator= NULL;
89
 
static const char DEFAULT_USE_REPLICATOR[]= "default";
90
 
 
91
 
 
92
 
RabbitMQLog::RabbitMQLog(const string name_arg, 
93
 
                         RabbitMQHandler* mqHandler)
94
 
  :plugin::TransactionApplier(name_arg)
95
 
{
96
 
  rabbitMQHandler= mqHandler;
97
 
}
98
 
 
99
 
RabbitMQLog::~RabbitMQLog() 
100
 
{
101
 
}
102
 
 
103
 
plugin::ReplicationReturnCode
104
 
RabbitMQLog::apply(Session &, const message::Transaction &to_apply)
105
 
{
106
 
  size_t message_byte_length= to_apply.ByteSize();
107
 
  uint8_t* buffer= static_cast<uint8_t *>(malloc(message_byte_length));
108
 
  if(buffer == NULL)
109
 
  {
110
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate enough memory to transaction message\n"));
111
 
    deactivate();
112
 
    return plugin::UNKNOWN_ERROR;
113
 
  }
114
 
 
115
 
  to_apply.SerializeWithCachedSizesToArray(buffer);
116
 
  try
117
 
  {
118
 
    rabbitMQHandler->publish(buffer, 
119
 
                             int(message_byte_length), 
120
 
                             sysvar_rabbitmq_exchange, 
121
 
                             sysvar_rabbitmq_routingkey);
122
 
  }
123
 
  catch(exception& e)
124
 
  {
125
 
    errmsg_printf(ERRMSG_LVL_ERROR, _(e.what()));
126
 
    deactivate();
127
 
    return plugin::UNKNOWN_ERROR;
128
 
  }
129
 
  free(buffer);
130
 
  return plugin::SUCCESS;
131
 
}
132
 
 
133
 
static RabbitMQLog *rabbitmqLogger; ///< the actual plugin
134
 
static RabbitMQHandler* rabbitmqHandler; ///< the rabbitmq handler
135
 
 
136
 
/**
137
 
 * Initialize the rabbitmq logger - instanciates the dependencies (the handler)
138
 
 * and creates the log handler with the dependency - makes it easier to swap out
139
 
 * handler implementation
140
 
 */
141
 
static int init(drizzled::module::Context &context)
142
 
{
143
 
  const module::option_map &vm= context.getOptions();
144
 
  
145
 
  if (vm.count("username"))
146
 
  {
147
 
    sysvar_rabbitmq_username= const_cast<char *>(vm["username"].as<string>().c_str());
148
 
  }
149
 
 
150
 
  if (vm.count("password"))
151
 
  {
152
 
    sysvar_rabbitmq_password= const_cast<char *>(vm["password"].as<string>().c_str());
153
 
  }
154
 
 
155
 
  if (vm.count("host"))
156
 
  {
157
 
    sysvar_rabbitmq_host= const_cast<char *>(vm["host"].as<string>().c_str());
158
 
  }
159
 
 
160
 
  if(sysvar_rabbitmq_log_enabled)
161
 
  {
162
 
    try 
163
 
    {
164
 
      rabbitmqHandler= new RabbitMQHandler(sysvar_rabbitmq_host, 
165
 
                                           sysvar_rabbitmq_port, 
166
 
                                           sysvar_rabbitmq_username, 
167
 
                                           sysvar_rabbitmq_password, 
168
 
                                           sysvar_rabbitmq_virtualhost);
169
 
    } 
170
 
    catch (exception& e) 
171
 
    {
172
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQHandler.  Got error: %s\n"),
173
 
                    e.what());
174
 
      return 1;
175
 
    }
176
 
    try 
177
 
    {
178
 
      rabbitmqLogger= new RabbitMQLog("rabbit_log_applier", rabbitmqHandler);
179
 
    } 
180
 
    catch (exception& e) 
181
 
    {
182
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQLog instance.  Got error: %s\n"), 
183
 
                    e.what());
184
 
      return 1;
185
 
    }
186
 
 
187
 
    context.add(rabbitmqLogger);
188
 
    ReplicationServices &replication_services= ReplicationServices::singleton();
189
 
    string replicator_name(sysvar_rabbitmq_use_replicator);
190
 
    replication_services.attachApplier(rabbitmqLogger, replicator_name);
191
 
    return 0;
192
 
  }
193
 
  return 0;
194
 
}
195
 
 
196
 
 
197
 
static DRIZZLE_SYSVAR_BOOL(enable,
198
 
                           sysvar_rabbitmq_log_enabled,
199
 
                           PLUGIN_VAR_NOCMDARG,
200
 
                           N_("Enable rabbitmq log"),
201
 
                           NULL, /* check func */
202
 
                           NULL, /* update func */
203
 
                           false /* default */);
204
 
 
205
 
 
206
 
static DRIZZLE_SYSVAR_STR(hostname,
207
 
                          sysvar_rabbitmq_host,
208
 
                          PLUGIN_VAR_READONLY,
209
 
                          N_("Host name to connect to"),
210
 
                          NULL, /* check func */
211
 
                          NULL, /* update func*/
212
 
                          "localhost" /* default */);
213
 
 
214
 
 
215
 
static DRIZZLE_SYSVAR_INT(port,
216
 
                          sysvar_rabbitmq_port,
217
 
                          PLUGIN_VAR_READONLY,
218
 
                          N_("RabbitMQ Port"),
219
 
                          NULL, /* check func */
220
 
                          NULL, /* update func */
221
 
                          5672, /* default */
222
 
                          0,
223
 
                          65535,
224
 
                          0);
225
 
 
226
 
static DRIZZLE_SYSVAR_STR(username,
227
 
                          sysvar_rabbitmq_username,
228
 
                          PLUGIN_VAR_READONLY,
229
 
                          N_("RabbitMQ username"),
230
 
                          NULL, /* check func */
231
 
                          NULL, /* update func*/
232
 
                          "guest" /* default */);
233
 
 
234
 
static DRIZZLE_SYSVAR_STR(password,
235
 
                          sysvar_rabbitmq_password,
236
 
                          PLUGIN_VAR_READONLY,
237
 
                          N_("RabbitMQ password"),
238
 
                          NULL, /* check func */
239
 
                          NULL, /* update func*/
240
 
                          "guest" /* default */);
241
 
 
242
 
static DRIZZLE_SYSVAR_STR(virtualhost,
243
 
                          sysvar_rabbitmq_virtualhost,
244
 
                          PLUGIN_VAR_READONLY,
245
 
                          N_("RabbitMQ virtualhost"),
246
 
                          NULL, /* check func */
247
 
                          NULL, /* update func*/
248
 
                          "/" /* default */);
249
 
 
250
 
static DRIZZLE_SYSVAR_STR(exchange,
251
 
                          sysvar_rabbitmq_exchange,
252
 
                          PLUGIN_VAR_READONLY,
253
 
                          N_("Name of RabbitMQ exchange to publish to"),
254
 
                          NULL, /* check func */
255
 
                          NULL, /* update func*/
256
 
                          "ReplicationExchange" /* default */);
257
 
 
258
 
static DRIZZLE_SYSVAR_STR(routingkey,
259
 
                          sysvar_rabbitmq_routingkey,
260
 
                          PLUGIN_VAR_READONLY,
261
 
                          N_("Name of RabbitMQ routing key to use"),
262
 
                          NULL, /* check func */
263
 
                          NULL, /* update func*/
264
 
                          "ReplicationRoutingKey" /* default */);
265
 
 
266
 
static DRIZZLE_SYSVAR_STR(use_replicator,
267
 
                          sysvar_rabbitmq_use_replicator,
268
 
                          PLUGIN_VAR_READONLY,
269
 
                          N_("Name of the replicator plugin to use (default='default_replicator')"),
270
 
                          NULL, /* check func */
271
 
                          NULL, /* update func*/
272
 
                          DEFAULT_USE_REPLICATOR /* default */);
273
 
 
274
 
static void init_options(drizzled::module::option_context &context)
275
 
{
276
 
  context ("enable",
277
 
           po::value<bool>(&sysvar_rabbitmq_log_enabled)->default_value(false)->zero_tokens(),
278
 
           N_("Enable rabbitmq log"));
279
 
  context("host", 
280
 
          po::value<string>()->default_value("localhost"),
281
 
          N_("Host name to connect to"));
282
 
  context("username",
283
 
          po::value<string>()->default_value("guest"),
284
 
          N_("RabbitMQ username"));
285
 
  context("password",
286
 
          po::value<string>()->default_value("guest"),
287
 
          N_("RabbitMQ password"));
288
 
}
289
 
 
290
 
static drizzle_sys_var* system_variables[]= {
291
 
  DRIZZLE_SYSVAR(enable),
292
 
  DRIZZLE_SYSVAR(hostname),
293
 
  DRIZZLE_SYSVAR(port),
294
 
  DRIZZLE_SYSVAR(username),
295
 
  DRIZZLE_SYSVAR(password),
296
 
  DRIZZLE_SYSVAR(virtualhost),
297
 
  DRIZZLE_SYSVAR(exchange),
298
 
  DRIZZLE_SYSVAR(routingkey),
299
 
  DRIZZLE_SYSVAR(use_replicator),
300
 
  NULL
301
 
};
302
 
 
303
 
DRIZZLE_PLUGIN(init, system_variables, init_options);
304