~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/rabbitmq/rabbitmq_log.cc

  • Committer: Monty Taylor
  • Date: 2011-02-13 17:26:39 UTC
  • mfrom: (2157.2.2 give-in-to-pkg-config)
  • mto: This revision was merged to the branch mainline in revision 2166.
  • Revision ID: mordred@inaugust.com-20110213172639-nhy7i72sfhoq13ms
Merged in pkg-config fixes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
40
40
using namespace drizzled;
41
41
using namespace google;
42
42
 
43
 
/**
44
 
 * The hostname to connect to
45
 
 */
46
 
static char* sysvar_rabbitmq_host= NULL;
 
43
namespace drizzle_plugin
 
44
{
47
45
 
48
46
/**
49
47
 * rabbitmq port
50
48
 */
51
 
static int sysvar_rabbitmq_port= 0;
52
 
 
53
 
/**
54
 
 * rabbitmq username
55
 
 */
56
 
static char* sysvar_rabbitmq_username= NULL;
57
 
 
58
 
/**
59
 
 * rabbitmq password
60
 
 */
61
 
static char* sysvar_rabbitmq_password= NULL;
62
 
 
63
 
/**
64
 
 * rabbitmq virtualhost
65
 
 */
66
 
static char* sysvar_rabbitmq_virtualhost= NULL;
67
 
 
68
 
/**
69
 
 * rabbitmq exchangename
70
 
 */
71
 
static char* sysvar_rabbitmq_exchange= NULL;
72
 
 
73
 
/**
74
 
 * rabbitmq routing key
75
 
 */
76
 
static char* sysvar_rabbitmq_routingkey= NULL;
77
 
 
78
 
/**
79
 
 * Is the rabbitmq log enabled?
80
 
 */
81
 
static bool sysvar_rabbitmq_log_enabled= false;
82
 
 
83
 
/**
84
 
 * The name of the replicator plugin
85
 
 * to pair the rabbitmq log's applier with.
86
 
 * Defaults to "default"
87
 
 */
88
 
static char *sysvar_rabbitmq_use_replicator= NULL;
89
 
static const char DEFAULT_USE_REPLICATOR[]= "default";
90
 
 
91
 
 
92
 
RabbitMQLog::RabbitMQLog(const string name_arg, 
93
 
                         RabbitMQHandler* mqHandler)
94
 
  :plugin::TransactionApplier(name_arg)
95
 
{
96
 
  rabbitMQHandler= mqHandler;
97
 
}
 
49
static port_constraint sysvar_rabbitmq_port;
 
50
 
 
51
 
 
52
RabbitMQLog::RabbitMQLog(const string &name, 
 
53
                         const std::string &exchange,
 
54
                         const std::string &routingkey,
 
55
                         RabbitMQHandler* mqHandler) :
 
56
  plugin::TransactionApplier(name),
 
57
  _rabbitMQHandler(mqHandler),
 
58
  _exchange(exchange),
 
59
  _routingkey(routingkey)
 
60
{ }
98
61
 
99
62
RabbitMQLog::~RabbitMQLog() 
100
 
{
101
 
}
 
63
{ }
102
64
 
103
65
plugin::ReplicationReturnCode
104
66
RabbitMQLog::apply(Session &, const message::Transaction &to_apply)
105
67
{
106
68
  size_t message_byte_length= to_apply.ByteSize();
107
 
  uint8_t* buffer= static_cast<uint8_t *>(malloc(message_byte_length));
 
69
  uint8_t* buffer= new uint8_t[message_byte_length];
108
70
  if(buffer == NULL)
109
71
  {
110
 
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate enough memory to transaction message\n"));
 
72
    errmsg_printf(error::ERROR, _("Failed to allocate enough memory to transaction message\n"));
111
73
    deactivate();
112
74
    return plugin::UNKNOWN_ERROR;
113
75
  }
115
77
  to_apply.SerializeWithCachedSizesToArray(buffer);
116
78
  try
117
79
  {
118
 
    rabbitMQHandler->publish(buffer, 
119
 
                             int(message_byte_length), 
120
 
                             sysvar_rabbitmq_exchange, 
121
 
                             sysvar_rabbitmq_routingkey);
 
80
    _rabbitMQHandler->publish(buffer, 
 
81
                             int(message_byte_length), 
 
82
                             _exchange,
 
83
                             _routingkey);
122
84
  }
123
85
  catch(exception& e)
124
86
  {
125
 
    errmsg_printf(ERRMSG_LVL_ERROR, _(e.what()));
 
87
    errmsg_printf(error::ERROR, _(e.what()));
126
88
    deactivate();
127
89
    return plugin::UNKNOWN_ERROR;
128
90
  }
129
 
  free(buffer);
 
91
  delete[] buffer;
130
92
  return plugin::SUCCESS;
131
93
}
132
94
 
142
104
{
143
105
  const module::option_map &vm= context.getOptions();
144
106
  
145
 
  if (vm.count("username"))
146
 
  {
147
 
    sysvar_rabbitmq_username= const_cast<char *>(vm["username"].as<string>().c_str());
148
 
  }
149
 
 
150
 
  if (vm.count("password"))
151
 
  {
152
 
    sysvar_rabbitmq_password= const_cast<char *>(vm["password"].as<string>().c_str());
153
 
  }
154
 
 
155
 
  if (vm.count("host"))
156
 
  {
157
 
    sysvar_rabbitmq_host= const_cast<char *>(vm["host"].as<string>().c_str());
158
 
  }
159
 
 
160
 
  if(sysvar_rabbitmq_log_enabled)
161
 
  {
162
 
    try 
163
 
    {
164
 
      rabbitmqHandler= new RabbitMQHandler(sysvar_rabbitmq_host, 
165
 
                                           sysvar_rabbitmq_port, 
166
 
                                           sysvar_rabbitmq_username, 
167
 
                                           sysvar_rabbitmq_password, 
168
 
                                           sysvar_rabbitmq_virtualhost);
169
 
    } 
170
 
    catch (exception& e) 
171
 
    {
172
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQHandler.  Got error: %s\n"),
173
 
                    e.what());
174
 
      return 1;
175
 
    }
176
 
    try 
177
 
    {
178
 
      rabbitmqLogger= new RabbitMQLog("rabbit_log_applier", rabbitmqHandler);
179
 
    } 
180
 
    catch (exception& e) 
181
 
    {
182
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQLog instance.  Got error: %s\n"), 
183
 
                    e.what());
184
 
      return 1;
185
 
    }
186
 
 
187
 
    context.add(rabbitmqLogger);
188
 
    ReplicationServices &replication_services= ReplicationServices::singleton();
189
 
    string replicator_name(sysvar_rabbitmq_use_replicator);
190
 
    replication_services.attachApplier(rabbitmqLogger, replicator_name);
191
 
    return 0;
192
 
  }
 
107
  try 
 
108
  {
 
109
    rabbitmqHandler= new RabbitMQHandler(vm["host"].as<string>(),
 
110
                                         sysvar_rabbitmq_port, 
 
111
                                         vm["username"].as<string>(), 
 
112
                                         vm["password"].as<string>(), 
 
113
                                         vm["virtualhost"].as<string>());
 
114
  } 
 
115
  catch (exception& e) 
 
116
  {
 
117
    errmsg_printf(error::ERROR, _("Failed to allocate the RabbitMQHandler.  Got error: %s\n"),
 
118
                  e.what());
 
119
    return 1;
 
120
  }
 
121
  try 
 
122
  {
 
123
    rabbitmqLogger= new RabbitMQLog("rabbit_log_applier",
 
124
                                    vm["exchange"].as<string>(),
 
125
                                    vm["routingkey"].as<string>(),
 
126
                                    rabbitmqHandler);
 
127
  } 
 
128
  catch (exception& e) 
 
129
  {
 
130
    errmsg_printf(error::ERROR, _("Failed to allocate the RabbitMQLog instance.  Got error: %s\n"), 
 
131
                  e.what());
 
132
    return 1;
 
133
  }
 
134
 
 
135
  context.add(rabbitmqLogger);
 
136
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
137
  replication_services.attachApplier(rabbitmqLogger, vm["use-replicator"].as<string>());
 
138
 
 
139
  context.registerVariable(new sys_var_const_string_val("host", vm["host"].as<string>()));
 
140
  context.registerVariable(new sys_var_constrained_value_readonly<in_port_t>("port", sysvar_rabbitmq_port));
 
141
  context.registerVariable(new sys_var_const_string_val("username", vm["username"].as<string>()));
 
142
  context.registerVariable(new sys_var_const_string_val("password", vm["password"].as<string>()));
 
143
  context.registerVariable(new sys_var_const_string_val("virtualhost", vm["virtualhost"].as<string>()));
 
144
  context.registerVariable(new sys_var_const_string_val("exchange", vm["exchange"].as<string>()));
 
145
  context.registerVariable(new sys_var_const_string_val("routingkey", vm["routingkey"].as<string>()));
 
146
 
193
147
  return 0;
194
148
}
195
149
 
196
150
 
197
 
static DRIZZLE_SYSVAR_BOOL(enable,
198
 
                           sysvar_rabbitmq_log_enabled,
199
 
                           PLUGIN_VAR_NOCMDARG,
200
 
                           N_("Enable rabbitmq log"),
201
 
                           NULL, /* check func */
202
 
                           NULL, /* update func */
203
 
                           false /* default */);
204
 
 
205
 
 
206
 
static DRIZZLE_SYSVAR_STR(hostname,
207
 
                          sysvar_rabbitmq_host,
208
 
                          PLUGIN_VAR_READONLY,
209
 
                          N_("Host name to connect to"),
210
 
                          NULL, /* check func */
211
 
                          NULL, /* update func*/
212
 
                          "localhost" /* default */);
213
 
 
214
 
 
215
 
static DRIZZLE_SYSVAR_INT(port,
216
 
                          sysvar_rabbitmq_port,
217
 
                          PLUGIN_VAR_READONLY,
218
 
                          N_("RabbitMQ Port"),
219
 
                          NULL, /* check func */
220
 
                          NULL, /* update func */
221
 
                          5672, /* default */
222
 
                          0,
223
 
                          65535,
224
 
                          0);
225
 
 
226
 
static DRIZZLE_SYSVAR_STR(username,
227
 
                          sysvar_rabbitmq_username,
228
 
                          PLUGIN_VAR_READONLY,
229
 
                          N_("RabbitMQ username"),
230
 
                          NULL, /* check func */
231
 
                          NULL, /* update func*/
232
 
                          "guest" /* default */);
233
 
 
234
 
static DRIZZLE_SYSVAR_STR(password,
235
 
                          sysvar_rabbitmq_password,
236
 
                          PLUGIN_VAR_READONLY,
237
 
                          N_("RabbitMQ password"),
238
 
                          NULL, /* check func */
239
 
                          NULL, /* update func*/
240
 
                          "guest" /* default */);
241
 
 
242
 
static DRIZZLE_SYSVAR_STR(virtualhost,
243
 
                          sysvar_rabbitmq_virtualhost,
244
 
                          PLUGIN_VAR_READONLY,
245
 
                          N_("RabbitMQ virtualhost"),
246
 
                          NULL, /* check func */
247
 
                          NULL, /* update func*/
248
 
                          "/" /* default */);
249
 
 
250
 
static DRIZZLE_SYSVAR_STR(exchange,
251
 
                          sysvar_rabbitmq_exchange,
252
 
                          PLUGIN_VAR_READONLY,
253
 
                          N_("Name of RabbitMQ exchange to publish to"),
254
 
                          NULL, /* check func */
255
 
                          NULL, /* update func*/
256
 
                          "ReplicationExchange" /* default */);
257
 
 
258
 
static DRIZZLE_SYSVAR_STR(routingkey,
259
 
                          sysvar_rabbitmq_routingkey,
260
 
                          PLUGIN_VAR_READONLY,
261
 
                          N_("Name of RabbitMQ routing key to use"),
262
 
                          NULL, /* check func */
263
 
                          NULL, /* update func*/
264
 
                          "ReplicationRoutingKey" /* default */);
265
 
 
266
 
static DRIZZLE_SYSVAR_STR(use_replicator,
267
 
                          sysvar_rabbitmq_use_replicator,
268
 
                          PLUGIN_VAR_READONLY,
269
 
                          N_("Name of the replicator plugin to use (default='default_replicator')"),
270
 
                          NULL, /* check func */
271
 
                          NULL, /* update func*/
272
 
                          DEFAULT_USE_REPLICATOR /* default */);
273
 
 
274
151
static void init_options(drizzled::module::option_context &context)
275
152
{
276
 
  context ("enable",
277
 
           po::value<bool>(&sysvar_rabbitmq_log_enabled)->default_value(false)->zero_tokens(),
278
 
           N_("Enable rabbitmq log"));
279
153
  context("host", 
280
154
          po::value<string>()->default_value("localhost"),
281
 
          N_("Host name to connect to"));
 
155
          _("Host name to connect to"));
 
156
  context("port",
 
157
          po::value<port_constraint>(&sysvar_rabbitmq_port)->default_value(5672),
 
158
          _("Port to connect to"));
 
159
  context("virtualhost",
 
160
          po::value<string>()->default_value("/"),
 
161
          _("RabbitMQ virtualhost"));
282
162
  context("username",
283
163
          po::value<string>()->default_value("guest"),
284
 
          N_("RabbitMQ username"));
 
164
          _("RabbitMQ username"));
285
165
  context("password",
286
166
          po::value<string>()->default_value("guest"),
287
 
          N_("RabbitMQ password"));
 
167
          _("RabbitMQ password"));
 
168
  context("use-replicator",
 
169
          po::value<string>()->default_value("default_replicator"),
 
170
          _("Name of the replicator plugin to use (default='default_replicator')"));
 
171
  context("exchange",
 
172
          po::value<string>()->default_value("ReplicationExchange"),
 
173
          _("Name of RabbitMQ exchange to publish to"));
 
174
  context("routingkey",
 
175
          po::value<string>()->default_value("ReplicationRoutingKey"),
 
176
          _("Name of RabbitMQ routing key to use"));
288
177
}
289
178
 
290
 
static drizzle_sys_var* system_variables[]= {
291
 
  DRIZZLE_SYSVAR(enable),
292
 
  DRIZZLE_SYSVAR(hostname),
293
 
  DRIZZLE_SYSVAR(port),
294
 
  DRIZZLE_SYSVAR(username),
295
 
  DRIZZLE_SYSVAR(password),
296
 
  DRIZZLE_SYSVAR(virtualhost),
297
 
  DRIZZLE_SYSVAR(exchange),
298
 
  DRIZZLE_SYSVAR(routingkey),
299
 
  DRIZZLE_SYSVAR(use_replicator),
300
 
  NULL
301
 
};
 
179
} /* namespace drizzle_plugin */
302
180
 
303
 
DRIZZLE_PLUGIN(init, system_variables, init_options);
 
181
DRIZZLE_PLUGIN(drizzle_plugin::init, NULL, drizzle_plugin::init_options);
304
182