/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 */
#include "rabbitmq_log.h"

#include <cstdint>
#include <cstdlib>
#include <exception>
#include <string>

#include <drizzled/message/transaction.pb.h>
#include <google/protobuf/io/coded_stream.h>
#include <drizzled/plugin/registry.h>
#include <drizzled/plugin.h>
#include "rabbitmq_handler.h"

using namespace drizzled;
using namespace google;

/**
 * The hostname to connect to
 */
static char* sysvar_rabbitmq_host= NULL;

/**
 * The port handed to the RabbitMQHandler together with the hostname
 */
static int sysvar_rabbitmq_port= 0;

/**
 * rabbitmq username
 */
static char* sysvar_rabbitmq_username= NULL;

/**
 * rabbitmq password
 */
static char* sysvar_rabbitmq_password= NULL;

/**
 * rabbitmq virtualhost
 */
static char* sysvar_rabbitmq_virtualhost= NULL;

/**
 * rabbitmq exchangename
 */
static char* sysvar_rabbitmq_exchange= NULL;

/**
 * rabbitmq routing key
 */
static char* sysvar_rabbitmq_routingkey= NULL;

/**
 * Is the rabbitmq log enabled?
 */
static bool sysvar_rabbitmq_log_enabled= false;

/**
 * Creates the transaction applier.
 *
 * @param name_arg   Name forwarded to the plugin::TransactionApplier base.
 * @param mqHandler  Handler stored in rabbitMQHandler; apply() publishes
 *                   through it. The pointer is only stored here - in this
 *                   file it is deleted by deinit(), not by this class.
 */
RabbitMQLog::RabbitMQLog(const std::string name_arg,
                         RabbitMQHandler* mqHandler)
  : plugin::TransactionApplier(name_arg)
{
  rabbitMQHandler= mqHandler;
}

/**
 * Destructor.
 *
 * NOTE(review): the body was not visible in this view of the file and is
 * reconstructed as empty - confirm. In this file the handler is deleted by
 * deinit(), so nothing is released here.
 */
RabbitMQLog::~RabbitMQLog()
{
}

/**
 * Serializes a transaction message into a temporary buffer and publishes it
 * through the RabbitMQ handler, using the configured exchange and routing key.
 *
 * Failures (allocation or publish) are reported through errmsg_printf().
 *
 * NOTE(review): parts of the two error branches were not visible in this view
 * of the file. Confirm whether anything beyond logging (for example disabling
 * the applier) has to happen there.
 *
 * @param to_apply  The transaction message to publish.
 */
void RabbitMQLog::apply(const message::Transaction &to_apply)
{
  /* ByteSize() must run before SerializeWithCachedSizesToArray(): the latter
     relies on the sizes cached by the former. */
  const size_t message_byte_length= to_apply.ByteSize();
  uint8_t* buffer= static_cast<uint8_t *>(std::malloc(message_byte_length));
  if (buffer == NULL)
  {
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate enough memory to transaction message\n"));
    return;
  }

  to_apply.SerializeWithCachedSizesToArray(buffer);

  try
  {
    rabbitMQHandler->publish(buffer,
                             static_cast<int>(message_byte_length),
                             sysvar_rabbitmq_exchange,
                             sysvar_rabbitmq_routingkey);
  }
  catch (const std::exception& e)
  {
    /* The exception text is data, not a format string: a '%' in it must not
       be interpreted by errmsg_printf(). */
    errmsg_printf(ERRMSG_LVL_ERROR, "%s", e.what());
  }

  /* Release the buffer on both the success and the publish-failure path. */
  std::free(buffer);
}

RabbitMQLog *rabbitmqLogger; ///< the actual plugin
98
RabbitMQHandler* rabbitmqHandler; ///< the rabbitmq handler
101
* Initialize the rabbitmq logger - instanciates the dependencies (the handler)
102
* and creates the log handler with the dependency - makes it easier to swap out
103
* handler implementation
105
static int init(plugin::Registry ®istry)
107
if(sysvar_rabbitmq_log_enabled)
111
rabbitmqHandler= new RabbitMQHandler(sysvar_rabbitmq_host,
112
sysvar_rabbitmq_port,
113
sysvar_rabbitmq_username,
114
sysvar_rabbitmq_password,
115
sysvar_rabbitmq_virtualhost);
119
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQHandler. Got error: %s\n"),
125
rabbitmqLogger= new RabbitMQLog("rabbit-log", rabbitmqHandler);
129
errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQLog instance. Got error: %s\n"),
134
registry.add(rabbitmqLogger);
140
static int deinit(plugin::Registry ®istry)
142
/* Cleanup the logger itself - delete the logger first, then handler, to avoid NPEs */
145
registry.remove(rabbitmqLogger);
146
delete rabbitmqLogger;
147
delete rabbitmqHandler;
155
/**
 * Declares the boolean system variable `enable`, backed by
 * sysvar_rabbitmq_log_enabled, with no check/update callbacks and a default
 * of false.
 *
 * NOTE(review): the bare integers between the arguments look like line-number
 * residue from an extraction step and are not valid C++. An argument also
 * appears to be missing between the backing variable and the description
 * (presumably the option flags) - confirm against the upstream file.
 */
static DRIZZLE_SYSVAR_BOOL(enable,
156
sysvar_rabbitmq_log_enabled,
158
N_("Enable rabbitmq log"),
159
NULL, /* check func */
160
NULL, /* update func */
161
false /* default */);
164
/**
 * Declares the string system variable `hostname`, backed by
 * sysvar_rabbitmq_host, with no check/update callbacks and a default of
 * "localhost".
 *
 * NOTE(review): the bare integers between the arguments look like line-number
 * residue and are not valid C++. An argument also appears to be missing
 * between the backing variable and the description (presumably the option
 * flags) - confirm against the upstream file.
 */
static DRIZZLE_SYSVAR_STR(hostname,
165
sysvar_rabbitmq_host,
167
N_("Host name to connect to"),
168
NULL, /* check func */
169
NULL, /* update func*/
170
"localhost" /* default */);
173
/**
 * Declares the integer system variable `port`, backed by
 * sysvar_rabbitmq_port, with no check/update callbacks.
 *
 * NOTE(review): this invocation is incomplete in this view of the file. No
 * description, default value, range arguments or closing parenthesis are
 * visible, and the bare integers between the arguments look like line-number
 * residue. Restore the missing arguments from the upstream file before
 * building.
 */
static DRIZZLE_SYSVAR_INT(port,
174
sysvar_rabbitmq_port,
177
NULL, /* check func */
178
NULL, /* update func */
184
/**
 * Declares the string system variable `username`, backed by
 * sysvar_rabbitmq_username, with no check/update callbacks and a default of
 * "guest".
 *
 * NOTE(review): the bare integers between the arguments look like line-number
 * residue and are not valid C++. An argument also appears to be missing
 * between the backing variable and the description (presumably the option
 * flags) - confirm against the upstream file.
 */
static DRIZZLE_SYSVAR_STR(username,
185
sysvar_rabbitmq_username,
187
N_("RabbitMQ username"),
188
NULL, /* check func */
189
NULL, /* update func*/
190
"guest" /* default */);
192
/**
 * Declares the string system variable `password`, backed by
 * sysvar_rabbitmq_password, with no check/update callbacks and a default of
 * "guest".
 *
 * NOTE(review): the bare integers between the arguments look like line-number
 * residue and are not valid C++. An argument also appears to be missing
 * between the backing variable and the description (presumably the option
 * flags) - confirm against the upstream file.
 */
static DRIZZLE_SYSVAR_STR(password,
193
sysvar_rabbitmq_password,
195
N_("RabbitMQ password"),
196
NULL, /* check func */
197
NULL, /* update func*/
198
"guest" /* default */);
200
/**
 * Declares the string system variable `virtualhost`, backed by
 * sysvar_rabbitmq_virtualhost, with no check/update callbacks.
 *
 * NOTE(review): this invocation is incomplete in this view of the file. No
 * default value or closing parenthesis is visible, an argument appears to be
 * missing between the backing variable and the description, and the bare
 * integers between the arguments look like line-number residue. Restore the
 * missing parts from the upstream file before building.
 */
static DRIZZLE_SYSVAR_STR(virtualhost,
201
sysvar_rabbitmq_virtualhost,
203
N_("RabbitMQ virtualhost"),
204
NULL, /* check func */
205
NULL, /* update func*/
208
/**
 * Declares the string system variable `exchange`, backed by
 * sysvar_rabbitmq_exchange (the exchange RabbitMQLog::apply() publishes to),
 * with no check/update callbacks and a default of "ReplicationExchange".
 *
 * NOTE(review): the bare integers between the arguments look like line-number
 * residue and are not valid C++. An argument also appears to be missing
 * between the backing variable and the description (presumably the option
 * flags) - confirm against the upstream file.
 */
static DRIZZLE_SYSVAR_STR(exchange,
209
sysvar_rabbitmq_exchange,
211
N_("Name of RabbitMQ exchange to publish to"),
212
NULL, /* check func */
213
NULL, /* update func*/
214
"ReplicationExchange" /* default */);
216
/**
 * Declares the string system variable `routingkey`, backed by
 * sysvar_rabbitmq_routingkey (the routing key RabbitMQLog::apply() publishes
 * with), with no check/update callbacks and a default of
 * "ReplicationRoutingKey".
 *
 * NOTE(review): the bare integers between the arguments look like line-number
 * residue and are not valid C++. An argument also appears to be missing
 * between the backing variable and the description (presumably the option
 * flags) - confirm against the upstream file.
 */
static DRIZZLE_SYSVAR_STR(routingkey,
217
sysvar_rabbitmq_routingkey,
219
N_("Name of RabbitMQ routing key to use"),
220
NULL, /* check func */
221
NULL, /* update func*/
222
"ReplicationRoutingKey" /* default */);
225
/**
 * The system variables exposed by this plugin, in the order they are
 * declared above. The array is handed to DRIZZLE_PLUGIN without a length.
 *
 * NOTE(review): the end of the initializer was not visible in this view of
 * the file. The NULL sentinel is reconstructed on the assumption that the
 * loader walks the array until it meets NULL - confirm.
 */
static drizzle_sys_var* system_variables[]= {
  DRIZZLE_SYSVAR(enable),
  DRIZZLE_SYSVAR(hostname),
  DRIZZLE_SYSVAR(port),
  DRIZZLE_SYSVAR(username),
  DRIZZLE_SYSVAR(password),
  DRIZZLE_SYSVAR(virtualhost),
  DRIZZLE_SYSVAR(exchange),
  DRIZZLE_SYSVAR(routingkey),
  NULL
};

/* Plugin declaration: passes the init and deinit functions defined above,
   NULL as the third argument, and the system_variables array. */
DRIZZLE_PLUGIN(init, deinit, NULL, system_variables);