~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/rabbitmq/rabbitmq_log.cc

  • Committer: Stewart Smith
  • Date: 2010-11-07 04:22:31 UTC
  • mto: (1911.1.2 build)
  • mto: This revision was merged to the branch mainline in revision 1912.
  • Revision ID: stewart@flamingspork.com-20101107042231-ola4sl7j0qvg58tz
Fix the ARCHIVE storage engine calling exit (lintian warning). This happened because libinternal was linked into libazio, which is in turn linked into the archive plugin. Instead, link libinternal only into the command line utilities.

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
23
23
 */
24
24
 
25
 
#include <config.h>
 
25
#include "config.h"
26
26
#include "rabbitmq_log.h"
27
27
#include <drizzled/message/transaction.pb.h>
28
28
#include <google/protobuf/io/coded_stream.h>
40
40
using namespace drizzled;
41
41
using namespace google;
42
42
 
43
 
namespace drizzle_plugin
44
 
{
 
43
/**
 
44
 * The hostname to connect to
 
45
 */
 
46
static char* sysvar_rabbitmq_host= NULL;
45
47
 
46
48
/**
47
49
 * rabbitmq port
48
50
 */
49
 
static port_constraint sysvar_rabbitmq_port;
50
 
 
51
 
 
52
 
RabbitMQLog::RabbitMQLog(const string &name, 
53
 
                         const std::string &exchange,
54
 
                         const std::string &routingkey,
55
 
                         RabbitMQHandler* mqHandler) :
56
 
  plugin::TransactionApplier(name),
57
 
  _rabbitMQHandler(mqHandler),
58
 
  _exchange(exchange),
59
 
  _routingkey(routingkey)
60
 
{ }
 
51
static int sysvar_rabbitmq_port= 0;
 
52
 
 
53
/**
 
54
 * rabbitmq username
 
55
 */
 
56
static char* sysvar_rabbitmq_username= NULL;
 
57
 
 
58
/**
 
59
 * rabbitmq password
 
60
 */
 
61
static char* sysvar_rabbitmq_password= NULL;
 
62
 
 
63
/**
 
64
 * rabbitmq virtualhost
 
65
 */
 
66
static char* sysvar_rabbitmq_virtualhost= NULL;
 
67
 
 
68
/**
 
69
 * rabbitmq exchangename
 
70
 */
 
71
static char* sysvar_rabbitmq_exchange= NULL;
 
72
 
 
73
/**
 
74
 * rabbitmq routing key
 
75
 */
 
76
static char* sysvar_rabbitmq_routingkey= NULL;
 
77
 
 
78
/**
 
79
 * Is the rabbitmq log enabled?
 
80
 */
 
81
static bool sysvar_rabbitmq_log_enabled= false;
 
82
 
 
83
/**
 
84
 * The name of the replicator plugin
 
85
 * to pair the rabbitmq log's applier with.
 
86
 * Defaults to "default"
 
87
 */
 
88
static char *sysvar_rabbitmq_use_replicator= NULL;
 
89
static const char DEFAULT_USE_REPLICATOR[]= "default";
 
90
 
 
91
 
 
92
RabbitMQLog::RabbitMQLog(const string name_arg, 
 
93
                         RabbitMQHandler* mqHandler)
 
94
  :plugin::TransactionApplier(name_arg)
 
95
{
 
96
  rabbitMQHandler= mqHandler;
 
97
}
61
98
 
62
99
RabbitMQLog::~RabbitMQLog() 
63
 
{ }
 
100
{
 
101
}
64
102
 
65
103
plugin::ReplicationReturnCode
66
104
RabbitMQLog::apply(Session &, const message::Transaction &to_apply)
67
105
{
68
106
  size_t message_byte_length= to_apply.ByteSize();
69
 
  uint8_t* buffer= new uint8_t[message_byte_length];
 
107
  uint8_t* buffer= static_cast<uint8_t *>(malloc(message_byte_length));
70
108
  if(buffer == NULL)
71
109
  {
72
 
    errmsg_printf(error::ERROR, _("Failed to allocate enough memory to transaction message\n"));
 
110
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate enough memory to transaction message\n"));
73
111
    deactivate();
74
112
    return plugin::UNKNOWN_ERROR;
75
113
  }
77
115
  to_apply.SerializeWithCachedSizesToArray(buffer);
78
116
  try
79
117
  {
80
 
    _rabbitMQHandler->publish(buffer, 
81
 
                             int(message_byte_length), 
82
 
                             _exchange,
83
 
                             _routingkey);
 
118
    rabbitMQHandler->publish(buffer, 
 
119
                             int(message_byte_length), 
 
120
                             sysvar_rabbitmq_exchange, 
 
121
                             sysvar_rabbitmq_routingkey);
84
122
  }
85
123
  catch(exception& e)
86
124
  {
87
 
    errmsg_printf(error::ERROR, _(e.what()));
 
125
    errmsg_printf(ERRMSG_LVL_ERROR, _(e.what()));
88
126
    deactivate();
89
127
    return plugin::UNKNOWN_ERROR;
90
128
  }
91
 
  delete[] buffer;
 
129
  free(buffer);
92
130
  return plugin::SUCCESS;
93
131
}
94
132
 
104
142
{
105
143
  const module::option_map &vm= context.getOptions();
106
144
  
107
 
  try 
108
 
  {
109
 
    rabbitmqHandler= new RabbitMQHandler(vm["host"].as<string>(),
110
 
                                         sysvar_rabbitmq_port, 
111
 
                                         vm["username"].as<string>(), 
112
 
                                         vm["password"].as<string>(), 
113
 
                                         vm["virtualhost"].as<string>());
114
 
  } 
115
 
  catch (exception& e) 
116
 
  {
117
 
    errmsg_printf(error::ERROR, _("Failed to allocate the RabbitMQHandler.  Got error: %s\n"),
118
 
                  e.what());
119
 
    return 1;
120
 
  }
121
 
  try 
122
 
  {
123
 
    rabbitmqLogger= new RabbitMQLog("rabbit_log_applier",
124
 
                                    vm["exchange"].as<string>(),
125
 
                                    vm["routingkey"].as<string>(),
126
 
                                    rabbitmqHandler);
127
 
  } 
128
 
  catch (exception& e) 
129
 
  {
130
 
    errmsg_printf(error::ERROR, _("Failed to allocate the RabbitMQLog instance.  Got error: %s\n"), 
131
 
                  e.what());
132
 
    return 1;
133
 
  }
134
 
 
135
 
  context.add(rabbitmqLogger);
136
 
  ReplicationServices &replication_services= ReplicationServices::singleton();
137
 
  replication_services.attachApplier(rabbitmqLogger, vm["use-replicator"].as<string>());
138
 
 
139
 
  context.registerVariable(new sys_var_const_string_val("host", vm["host"].as<string>()));
140
 
  context.registerVariable(new sys_var_constrained_value_readonly<in_port_t>("port", sysvar_rabbitmq_port));
141
 
  context.registerVariable(new sys_var_const_string_val("username", vm["username"].as<string>()));
142
 
  context.registerVariable(new sys_var_const_string_val("password", vm["password"].as<string>()));
143
 
  context.registerVariable(new sys_var_const_string_val("virtualhost", vm["virtualhost"].as<string>()));
144
 
  context.registerVariable(new sys_var_const_string_val("exchange", vm["exchange"].as<string>()));
145
 
  context.registerVariable(new sys_var_const_string_val("routingkey", vm["routingkey"].as<string>()));
146
 
 
 
145
  if (vm.count("username"))
 
146
  {
 
147
    sysvar_rabbitmq_username= const_cast<char *>(vm["username"].as<string>().c_str());
 
148
  }
 
149
 
 
150
  if (vm.count("password"))
 
151
  {
 
152
    sysvar_rabbitmq_password= const_cast<char *>(vm["password"].as<string>().c_str());
 
153
  }
 
154
 
 
155
  if (vm.count("host"))
 
156
  {
 
157
    sysvar_rabbitmq_host= const_cast<char *>(vm["host"].as<string>().c_str());
 
158
  }
 
159
 
 
160
  if(sysvar_rabbitmq_log_enabled)
 
161
  {
 
162
    try 
 
163
    {
 
164
      rabbitmqHandler= new RabbitMQHandler(sysvar_rabbitmq_host, 
 
165
                                           sysvar_rabbitmq_port, 
 
166
                                           sysvar_rabbitmq_username, 
 
167
                                           sysvar_rabbitmq_password, 
 
168
                                           sysvar_rabbitmq_virtualhost);
 
169
    } 
 
170
    catch (exception& e) 
 
171
    {
 
172
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQHandler.  Got error: %s\n"),
 
173
                    e.what());
 
174
      return 1;
 
175
    }
 
176
    try 
 
177
    {
 
178
      rabbitmqLogger= new RabbitMQLog("rabbit_log_applier", rabbitmqHandler);
 
179
    } 
 
180
    catch (exception& e) 
 
181
    {
 
182
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQLog instance.  Got error: %s\n"), 
 
183
                    e.what());
 
184
      return 1;
 
185
    }
 
186
 
 
187
    context.add(rabbitmqLogger);
 
188
    ReplicationServices &replication_services= ReplicationServices::singleton();
 
189
    string replicator_name(sysvar_rabbitmq_use_replicator);
 
190
    replication_services.attachApplier(rabbitmqLogger, replicator_name);
 
191
    return 0;
 
192
  }
147
193
  return 0;
148
194
}
149
195
 
150
196
 
 
197
static DRIZZLE_SYSVAR_BOOL(enable,
 
198
                           sysvar_rabbitmq_log_enabled,
 
199
                           PLUGIN_VAR_NOCMDARG,
 
200
                           N_("Enable rabbitmq log"),
 
201
                           NULL, /* check func */
 
202
                           NULL, /* update func */
 
203
                           false /* default */);
 
204
 
 
205
 
 
206
static DRIZZLE_SYSVAR_STR(hostname,
 
207
                          sysvar_rabbitmq_host,
 
208
                          PLUGIN_VAR_READONLY,
 
209
                          N_("Host name to connect to"),
 
210
                          NULL, /* check func */
 
211
                          NULL, /* update func*/
 
212
                          "localhost" /* default */);
 
213
 
 
214
 
 
215
static DRIZZLE_SYSVAR_INT(port,
 
216
                          sysvar_rabbitmq_port,
 
217
                          PLUGIN_VAR_READONLY,
 
218
                          N_("RabbitMQ Port"),
 
219
                          NULL, /* check func */
 
220
                          NULL, /* update func */
 
221
                          5672, /* default */
 
222
                          0,
 
223
                          65535,
 
224
                          0);
 
225
 
 
226
static DRIZZLE_SYSVAR_STR(username,
 
227
                          sysvar_rabbitmq_username,
 
228
                          PLUGIN_VAR_READONLY,
 
229
                          N_("RabbitMQ username"),
 
230
                          NULL, /* check func */
 
231
                          NULL, /* update func*/
 
232
                          "guest" /* default */);
 
233
 
 
234
static DRIZZLE_SYSVAR_STR(password,
 
235
                          sysvar_rabbitmq_password,
 
236
                          PLUGIN_VAR_READONLY,
 
237
                          N_("RabbitMQ password"),
 
238
                          NULL, /* check func */
 
239
                          NULL, /* update func*/
 
240
                          "guest" /* default */);
 
241
 
 
242
static DRIZZLE_SYSVAR_STR(virtualhost,
 
243
                          sysvar_rabbitmq_virtualhost,
 
244
                          PLUGIN_VAR_READONLY,
 
245
                          N_("RabbitMQ virtualhost"),
 
246
                          NULL, /* check func */
 
247
                          NULL, /* update func*/
 
248
                          "/" /* default */);
 
249
 
 
250
static DRIZZLE_SYSVAR_STR(exchange,
 
251
                          sysvar_rabbitmq_exchange,
 
252
                          PLUGIN_VAR_READONLY,
 
253
                          N_("Name of RabbitMQ exchange to publish to"),
 
254
                          NULL, /* check func */
 
255
                          NULL, /* update func*/
 
256
                          "ReplicationExchange" /* default */);
 
257
 
 
258
static DRIZZLE_SYSVAR_STR(routingkey,
 
259
                          sysvar_rabbitmq_routingkey,
 
260
                          PLUGIN_VAR_READONLY,
 
261
                          N_("Name of RabbitMQ routing key to use"),
 
262
                          NULL, /* check func */
 
263
                          NULL, /* update func*/
 
264
                          "ReplicationRoutingKey" /* default */);
 
265
 
 
266
static DRIZZLE_SYSVAR_STR(use_replicator,
 
267
                          sysvar_rabbitmq_use_replicator,
 
268
                          PLUGIN_VAR_READONLY,
 
269
                          N_("Name of the replicator plugin to use (default='default_replicator')"),
 
270
                          NULL, /* check func */
 
271
                          NULL, /* update func*/
 
272
                          DEFAULT_USE_REPLICATOR /* default */);
 
273
 
151
274
static void init_options(drizzled::module::option_context &context)
152
275
{
 
276
  context ("enable",
 
277
           po::value<bool>(&sysvar_rabbitmq_log_enabled)->default_value(false)->zero_tokens(),
 
278
           N_("Enable rabbitmq log"));
153
279
  context("host", 
154
280
          po::value<string>()->default_value("localhost"),
155
 
          _("Host name to connect to"));
156
 
  context("port",
157
 
          po::value<port_constraint>(&sysvar_rabbitmq_port)->default_value(5672),
158
 
          _("Port to connect to"));
159
 
  context("virtualhost",
160
 
          po::value<string>()->default_value("/"),
161
 
          _("RabbitMQ virtualhost"));
 
281
          N_("Host name to connect to"));
162
282
  context("username",
163
283
          po::value<string>()->default_value("guest"),
164
 
          _("RabbitMQ username"));
 
284
          N_("RabbitMQ username"));
165
285
  context("password",
166
286
          po::value<string>()->default_value("guest"),
167
 
          _("RabbitMQ password"));
168
 
  context("use-replicator",
169
 
          po::value<string>()->default_value("default_replicator"),
170
 
          _("Name of the replicator plugin to use (default='default_replicator')"));
171
 
  context("exchange",
172
 
          po::value<string>()->default_value("ReplicationExchange"),
173
 
          _("Name of RabbitMQ exchange to publish to"));
174
 
  context("routingkey",
175
 
          po::value<string>()->default_value("ReplicationRoutingKey"),
176
 
          _("Name of RabbitMQ routing key to use"));
 
287
          N_("RabbitMQ password"));
177
288
}
178
289
 
179
 
} /* namespace drizzle_plugin */
 
290
static drizzle_sys_var* system_variables[]= {
 
291
  DRIZZLE_SYSVAR(enable),
 
292
  DRIZZLE_SYSVAR(hostname),
 
293
  DRIZZLE_SYSVAR(port),
 
294
  DRIZZLE_SYSVAR(username),
 
295
  DRIZZLE_SYSVAR(password),
 
296
  DRIZZLE_SYSVAR(virtualhost),
 
297
  DRIZZLE_SYSVAR(exchange),
 
298
  DRIZZLE_SYSVAR(routingkey),
 
299
  DRIZZLE_SYSVAR(use_replicator),
 
300
  NULL
 
301
};
180
302
 
181
 
DRIZZLE_PLUGIN(drizzle_plugin::init, NULL, drizzle_plugin::init_options);
 
303
DRIZZLE_PLUGIN(init, system_variables, init_options);
182
304