~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/rabbitmq/rabbitmq_log.cc

  • Committer: Monty Taylor
  • Date: 2009-04-14 19:16:51 UTC
  • mto: (997.2.5 mordred)
  • mto: This revision was merged to the branch mainline in revision 994.
  • Revision ID: mordred@inaugust.com-20090414191651-ltbww6hpqks8k7qk
Clarified instructions in README.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2010 Marcus Eriksson
5
 
 *
6
 
 *  Authors:
7
 
 *
8
 
 *  Marcus Eriksson <krummas@gmail.com>
9
 
 *
10
 
 *  This program is free software; you can redistribute it and/or modify
11
 
 *  it under the terms of the GNU General Public License as published by
12
 
 *  the Free Software Foundation; either version 2 of the License, or
13
 
 *  (at your option) any later version.
14
 
 *
15
 
 *  This program is distributed in the hope that it will be useful,
16
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
 
 *  GNU General Public License for more details.
19
 
 *
20
 
 *  You should have received a copy of the GNU General Public License
21
 
 *  along with this program; if not, write to the Free Software
22
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
23
 
 */
24
 
 
25
 
#include "config.h"
26
 
#include "rabbitmq_log.h"
27
 
#include <drizzled/message/transaction.pb.h>
28
 
#include <google/protobuf/io/coded_stream.h>
29
 
#include <stdio.h>
30
 
#include <drizzled/plugin/registry.h>
31
 
#include <drizzled/plugin.h>
32
 
#include <stdint.h>
33
 
#include "rabbitmq_handler.h"
34
 
 
35
 
using namespace std;
36
 
using namespace drizzled;
37
 
using namespace google;
38
 
 
39
 
/**
40
 
 * The hostname to connect to
41
 
 */
42
 
static char* sysvar_rabbitmq_host= NULL;
43
 
 
44
 
/**
45
 
 * rabbitmq port
46
 
 */
47
 
static int sysvar_rabbitmq_port= 0;
48
 
 
49
 
/**
50
 
 * rabbitmq username
51
 
 */
52
 
static char* sysvar_rabbitmq_username= NULL;
53
 
 
54
 
/**
55
 
 * rabbitmq password
56
 
 */
57
 
static char* sysvar_rabbitmq_password= NULL;
58
 
 
59
 
/**
60
 
 * rabbitmq virtualhost
61
 
 */
62
 
static char* sysvar_rabbitmq_virtualhost= NULL;
63
 
 
64
 
/**
65
 
 * rabbitmq exchangename
66
 
 */
67
 
static char* sysvar_rabbitmq_exchange= NULL;
68
 
 
69
 
/**
70
 
 * rabbitmq routing key
71
 
 */
72
 
static char* sysvar_rabbitmq_routingkey= NULL;
73
 
 
74
 
/**
75
 
 * Is the rabbitmq log enabled?
76
 
 */
77
 
static bool sysvar_rabbitmq_log_enabled= false;
78
 
 
79
 
 
80
 
RabbitMQLog::RabbitMQLog(const string name_arg, 
81
 
                         RabbitMQHandler* mqHandler)
82
 
  :plugin::TransactionApplier(name_arg)
83
 
{
84
 
  rabbitMQHandler= mqHandler;
85
 
}
86
 
 
87
 
RabbitMQLog::~RabbitMQLog() 
88
 
{
89
 
}
90
 
 
91
 
void RabbitMQLog::apply(const message::Transaction &to_apply)
92
 
{
93
 
  size_t message_byte_length= to_apply.ByteSize();
94
 
  uint8_t* buffer= static_cast<uint8_t *>(malloc(message_byte_length));
95
 
  if(buffer == NULL)
96
 
  {
97
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate enough memory to transaction message\n"));
98
 
    deactivate();
99
 
    return;
100
 
  }
101
 
 
102
 
  to_apply.SerializeWithCachedSizesToArray(buffer);
103
 
  try
104
 
  {
105
 
    rabbitMQHandler->publish(buffer, 
106
 
                             int(message_byte_length), 
107
 
                             sysvar_rabbitmq_exchange, 
108
 
                             sysvar_rabbitmq_routingkey);
109
 
  }
110
 
  catch(exception& e)
111
 
  {
112
 
    errmsg_printf(ERRMSG_LVL_ERROR, _(e.what()));
113
 
    deactivate();
114
 
  }
115
 
  free(buffer);
116
 
}
117
 
 
118
 
RabbitMQLog *rabbitmqLogger; ///< the actual plugin
119
 
RabbitMQHandler* rabbitmqHandler; ///< the rabbitmq handler
120
 
 
121
 
/**
122
 
 * Initialize the rabbitmq logger - instanciates the dependencies (the handler)
123
 
 * and creates the log handler with the dependency - makes it easier to swap out
124
 
 * handler implementation
125
 
 */
126
 
static int init(plugin::Registry &registry)
127
 
{
128
 
  if(sysvar_rabbitmq_log_enabled)
129
 
  {
130
 
    try 
131
 
    {
132
 
      rabbitmqHandler= new RabbitMQHandler(sysvar_rabbitmq_host, 
133
 
                                           sysvar_rabbitmq_port, 
134
 
                                           sysvar_rabbitmq_username, 
135
 
                                           sysvar_rabbitmq_password, 
136
 
                                           sysvar_rabbitmq_virtualhost);
137
 
    } 
138
 
    catch (exception& e) 
139
 
    {
140
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQHandler.  Got error: %s\n"),
141
 
                    e.what());
142
 
      return 1;
143
 
    }
144
 
    try 
145
 
    {
146
 
      rabbitmqLogger= new RabbitMQLog("rabbit-log", rabbitmqHandler);
147
 
    } 
148
 
    catch (exception& e) 
149
 
    {
150
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQLog instance.  Got error: %s\n"), 
151
 
                    e.what());
152
 
      return 1;
153
 
    }
154
 
 
155
 
    registry.add(rabbitmqLogger);
156
 
    return 0;
157
 
  }
158
 
  return 0;
159
 
}
160
 
 
161
 
static int deinit(plugin::Registry &registry)
162
 
{
163
 
  /* Cleanup the logger itself - delete the logger first, then handler, to avoid NPEs */
164
 
  if (rabbitmqLogger)
165
 
  {
166
 
    registry.remove(rabbitmqLogger);
167
 
    delete rabbitmqLogger;
168
 
    delete rabbitmqHandler;
169
 
  }
170
 
 
171
 
  return 0;
172
 
}
173
 
 
174
 
 
175
 
 
176
 
static DRIZZLE_SYSVAR_BOOL(enable,
177
 
                           sysvar_rabbitmq_log_enabled,
178
 
                           PLUGIN_VAR_NOCMDARG,
179
 
                           N_("Enable rabbitmq log"),
180
 
                           NULL, /* check func */
181
 
                           NULL, /* update func */
182
 
                           false /* default */);
183
 
 
184
 
 
185
 
static DRIZZLE_SYSVAR_STR(hostname,
186
 
                          sysvar_rabbitmq_host,
187
 
                          PLUGIN_VAR_READONLY,
188
 
                          N_("Host name to connect to"),
189
 
                          NULL, /* check func */
190
 
                          NULL, /* update func*/
191
 
                          "localhost" /* default */);
192
 
 
193
 
 
194
 
static DRIZZLE_SYSVAR_INT(port,
195
 
                          sysvar_rabbitmq_port,
196
 
                          PLUGIN_VAR_READONLY,
197
 
                          N_("RabbitMQ Port"),
198
 
                          NULL, /* check func */
199
 
                          NULL, /* update func */
200
 
                          5672, /* default */
201
 
                          0,
202
 
                          65535,
203
 
                          0);
204
 
 
205
 
static DRIZZLE_SYSVAR_STR(username,
206
 
                          sysvar_rabbitmq_username,
207
 
                          PLUGIN_VAR_READONLY,
208
 
                          N_("RabbitMQ username"),
209
 
                          NULL, /* check func */
210
 
                          NULL, /* update func*/
211
 
                          "guest" /* default */);
212
 
 
213
 
static DRIZZLE_SYSVAR_STR(password,
214
 
                          sysvar_rabbitmq_password,
215
 
                          PLUGIN_VAR_READONLY,
216
 
                          N_("RabbitMQ password"),
217
 
                          NULL, /* check func */
218
 
                          NULL, /* update func*/
219
 
                          "guest" /* default */);
220
 
 
221
 
static DRIZZLE_SYSVAR_STR(virtualhost,
222
 
                          sysvar_rabbitmq_virtualhost,
223
 
                          PLUGIN_VAR_READONLY,
224
 
                          N_("RabbitMQ virtualhost"),
225
 
                          NULL, /* check func */
226
 
                          NULL, /* update func*/
227
 
                          "/" /* default */);
228
 
 
229
 
static DRIZZLE_SYSVAR_STR(exchange,
230
 
                          sysvar_rabbitmq_exchange,
231
 
                          PLUGIN_VAR_READONLY,
232
 
                          N_("Name of RabbitMQ exchange to publish to"),
233
 
                          NULL, /* check func */
234
 
                          NULL, /* update func*/
235
 
                          "ReplicationExchange" /* default */);
236
 
 
237
 
static DRIZZLE_SYSVAR_STR(routingkey,
238
 
                          sysvar_rabbitmq_routingkey,
239
 
                          PLUGIN_VAR_READONLY,
240
 
                          N_("Name of RabbitMQ routing key to use"),
241
 
                          NULL, /* check func */
242
 
                          NULL, /* update func*/
243
 
                          "ReplicationRoutingKey" /* default */);
244
 
 
245
 
 
246
 
static drizzle_sys_var* system_variables[]= {
247
 
  DRIZZLE_SYSVAR(enable),
248
 
  DRIZZLE_SYSVAR(hostname),
249
 
  DRIZZLE_SYSVAR(port),
250
 
  DRIZZLE_SYSVAR(username),
251
 
  DRIZZLE_SYSVAR(password),
252
 
  DRIZZLE_SYSVAR(virtualhost),
253
 
  DRIZZLE_SYSVAR(exchange),
254
 
  DRIZZLE_SYSVAR(routingkey),
255
 
  NULL
256
 
};
257
 
 
258
 
DRIZZLE_PLUGIN(init, deinit, NULL, system_variables);
259