~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/rabbitmq/rabbitmq_log.cc

  • Committer: Brian Aker
  • Date: 2010-10-20 20:25:52 UTC
  • mto: (1864.2.1 merge)
  • mto: This revision was merged to the branch mainline in revision 1865.
  • Revision ID: brian@tangent.org-20101020202552-51y5sz5ledoxbp7t
Add support for --with-valgrind

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
#include <drizzled/message/transaction.pb.h>
28
28
#include <google/protobuf/io/coded_stream.h>
29
29
#include <stdio.h>
30
 
#include <drizzled/plugin/registry.h>
 
30
#include <drizzled/module/registry.h>
31
31
#include <drizzled/plugin.h>
32
32
#include <stdint.h>
33
33
#include "rabbitmq_handler.h"
 
34
#include <boost/program_options.hpp>
 
35
#include <drizzled/module/option_map.h>
 
36
 
 
37
namespace po= boost::program_options;
34
38
 
35
39
using namespace std;
36
40
using namespace drizzled;
76
80
 */
77
81
static bool sysvar_rabbitmq_log_enabled= false;
78
82
 
 
83
/**
 
84
 * The name of the replicator plugin
 
85
 * to pair the rabbitmq log's applier with.
 
86
 * Defaults to "default"
 
87
 */
 
88
static char *sysvar_rabbitmq_use_replicator= NULL;
 
89
static const char DEFAULT_USE_REPLICATOR[]= "default";
 
90
 
79
91
 
80
92
RabbitMQLog::RabbitMQLog(const string name_arg, 
81
93
                         RabbitMQHandler* mqHandler)
88
100
{
89
101
}
90
102
 
91
 
void RabbitMQLog::apply(const message::Transaction &to_apply)
 
103
plugin::ReplicationReturnCode
 
104
RabbitMQLog::apply(Session &, const message::Transaction &to_apply)
92
105
{
93
106
  size_t message_byte_length= to_apply.ByteSize();
94
107
  uint8_t* buffer= static_cast<uint8_t *>(malloc(message_byte_length));
96
109
  {
97
110
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate enough memory to transaction message\n"));
98
111
    deactivate();
99
 
    return;
 
112
    return plugin::UNKNOWN_ERROR;
100
113
  }
101
114
 
102
115
  to_apply.SerializeWithCachedSizesToArray(buffer);
111
124
  {
112
125
    errmsg_printf(ERRMSG_LVL_ERROR, _(e.what()));
113
126
    deactivate();
 
127
    return plugin::UNKNOWN_ERROR;
114
128
  }
115
129
  free(buffer);
 
130
  return plugin::SUCCESS;
116
131
}
117
132
 
118
 
RabbitMQLog *rabbitmqLogger; ///< the actual plugin
119
 
RabbitMQHandler* rabbitmqHandler; ///< the rabbitmq handler
 
133
static RabbitMQLog *rabbitmqLogger; ///< the actual plugin
 
134
static RabbitMQHandler* rabbitmqHandler; ///< the rabbitmq handler
120
135
 
121
136
/**
122
137
 * Initialize the rabbitmq logger - instanciates the dependencies (the handler)
123
138
 * and creates the log handler with the dependency - makes it easier to swap out
124
139
 * handler implementation
125
140
 */
126
 
static int init(plugin::Registry &registry)
 
141
static int init(drizzled::module::Context &context)
127
142
{
 
143
  const module::option_map &vm= context.getOptions();
 
144
  
 
145
  if (vm.count("username"))
 
146
  {
 
147
    sysvar_rabbitmq_username= const_cast<char *>(vm["username"].as<string>().c_str());
 
148
  }
 
149
 
 
150
  if (vm.count("password"))
 
151
  {
 
152
    sysvar_rabbitmq_password= const_cast<char *>(vm["password"].as<string>().c_str());
 
153
  }
 
154
 
 
155
  if (vm.count("host"))
 
156
  {
 
157
    sysvar_rabbitmq_host= const_cast<char *>(vm["host"].as<string>().c_str());
 
158
  }
 
159
 
128
160
  if(sysvar_rabbitmq_log_enabled)
129
161
  {
130
162
    try 
143
175
    }
144
176
    try 
145
177
    {
146
 
      rabbitmqLogger= new RabbitMQLog("rabbit-log", rabbitmqHandler);
 
178
      rabbitmqLogger= new RabbitMQLog("rabbit_log_applier", rabbitmqHandler);
147
179
    } 
148
180
    catch (exception& e) 
149
181
    {
152
184
      return 1;
153
185
    }
154
186
 
155
 
    registry.add(rabbitmqLogger);
 
187
    context.add(rabbitmqLogger);
 
188
    ReplicationServices &replication_services= ReplicationServices::singleton();
 
189
    string replicator_name(sysvar_rabbitmq_use_replicator);
 
190
    replication_services.attachApplier(rabbitmqLogger, replicator_name);
156
191
    return 0;
157
192
  }
158
193
  return 0;
159
194
}
160
195
 
161
 
static int deinit(plugin::Registry &registry)
162
 
{
163
 
  /* Cleanup the logger itself - delete the logger first, then handler, to avoid NPEs */
164
 
  if (rabbitmqLogger)
165
 
  {
166
 
    registry.remove(rabbitmqLogger);
167
 
    delete rabbitmqLogger;
168
 
    delete rabbitmqHandler;
169
 
  }
170
 
 
171
 
  return 0;
172
 
}
173
 
 
174
 
 
175
196
 
176
197
static DRIZZLE_SYSVAR_BOOL(enable,
177
198
                           sysvar_rabbitmq_log_enabled,
242
263
                          NULL, /* update func*/
243
264
                          "ReplicationRoutingKey" /* default */);
244
265
 
 
266
static DRIZZLE_SYSVAR_STR(use_replicator,
 
267
                          sysvar_rabbitmq_use_replicator,
 
268
                          PLUGIN_VAR_READONLY,
 
269
                          N_("Name of the replicator plugin to use (default='default_replicator')"),
 
270
                          NULL, /* check func */
 
271
                          NULL, /* update func*/
 
272
                          DEFAULT_USE_REPLICATOR /* default */);
 
273
 
 
274
static void init_options(drizzled::module::option_context &context)
 
275
{
 
276
  context ("enable",
 
277
           po::value<bool>(&sysvar_rabbitmq_log_enabled)->default_value(false)->zero_tokens(),
 
278
           N_("Enable rabbitmq log"));
 
279
  context("host", 
 
280
          po::value<string>()->default_value("localhost"),
 
281
          N_("Host name to connect to"));
 
282
  context("username",
 
283
          po::value<string>()->default_value("guest"),
 
284
          N_("RabbitMQ username"));
 
285
  context("password",
 
286
          po::value<string>()->default_value("guest"),
 
287
          N_("RabbitMQ password"));
 
288
}
245
289
 
246
290
static drizzle_sys_var* system_variables[]= {
247
291
  DRIZZLE_SYSVAR(enable),
252
296
  DRIZZLE_SYSVAR(virtualhost),
253
297
  DRIZZLE_SYSVAR(exchange),
254
298
  DRIZZLE_SYSVAR(routingkey),
 
299
  DRIZZLE_SYSVAR(use_replicator),
255
300
  NULL
256
301
};
257
302
 
258
 
DRIZZLE_PLUGIN(init, deinit, NULL, system_variables);
 
303
DRIZZLE_PLUGIN(init, system_variables, init_options);
259
304