~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/rabbitmq/rabbitmq_log.cc

  • Committer: Lee Bieber
  • Date: 2010-12-05 19:04:34 UTC
  • mfrom: (1975.1.4 build)
  • Revision ID: kalebral@gmail.com-20101205190434-9mtyc2cfzbd1hbye
Merge Padraig - Fix for query rewrite plugin.
Merge Monty: more plugin_sysvar removal work.

Show diffs side-by-side

added

removed

Lines of Context:
40
40
using namespace drizzled;
41
41
using namespace google;
42
42
 
43
 
/**
44
 
 * The hostname to connect to
45
 
 */
46
 
static char* sysvar_rabbitmq_host= NULL;
 
43
namespace drizzle_plugin
 
44
{
47
45
 
48
46
/**
49
47
 * rabbitmq port
50
48
 */
51
 
static int sysvar_rabbitmq_port= 0;
52
 
 
53
 
/**
54
 
 * rabbitmq username
55
 
 */
56
 
static char* sysvar_rabbitmq_username= NULL;
57
 
 
58
 
/**
59
 
 * rabbitmq password
60
 
 */
61
 
static char* sysvar_rabbitmq_password= NULL;
62
 
 
63
 
/**
64
 
 * rabbitmq virtualhost
65
 
 */
66
 
static char* sysvar_rabbitmq_virtualhost= NULL;
67
 
 
68
 
/**
69
 
 * rabbitmq exchangename
70
 
 */
71
 
static char* sysvar_rabbitmq_exchange= NULL;
72
 
 
73
 
/**
74
 
 * rabbitmq routing key
75
 
 */
76
 
static char* sysvar_rabbitmq_routingkey= NULL;
77
 
 
78
 
/**
79
 
 * Is the rabbitmq log enabled?
80
 
 */
81
 
static bool sysvar_rabbitmq_log_enabled= false;
82
 
 
83
 
/**
84
 
 * The name of the replicator plugin
85
 
 * to pair the rabbitmq log's applier with.
86
 
 * Defaults to "default"
87
 
 */
88
 
static char *sysvar_rabbitmq_use_replicator= NULL;
89
 
static const char DEFAULT_USE_REPLICATOR[]= "default";
90
 
 
91
 
 
92
 
RabbitMQLog::RabbitMQLog(const string name_arg, 
93
 
                         RabbitMQHandler* mqHandler)
94
 
  :plugin::TransactionApplier(name_arg)
95
 
{
96
 
  rabbitMQHandler= mqHandler;
97
 
}
 
49
typedef drizzled::constrained_check<in_port_t, 65535, 0> port_constraint;
 
50
static port_constraint sysvar_rabbitmq_port;
 
51
 
 
52
 
 
53
RabbitMQLog::RabbitMQLog(const string &name, 
 
54
                         const std::string &exchange,
 
55
                         const std::string &routingkey,
 
56
                         RabbitMQHandler* mqHandler) :
 
57
  plugin::TransactionApplier(name),
 
58
  _rabbitMQHandler(mqHandler),
 
59
  _exchange(exchange),
 
60
  _routingkey(routingkey)
 
61
{ }
98
62
 
99
63
RabbitMQLog::~RabbitMQLog() 
100
 
{
101
 
}
 
64
{ }
102
65
 
103
66
plugin::ReplicationReturnCode
104
67
RabbitMQLog::apply(Session &, const message::Transaction &to_apply)
105
68
{
106
69
  size_t message_byte_length= to_apply.ByteSize();
107
 
  uint8_t* buffer= static_cast<uint8_t *>(malloc(message_byte_length));
 
70
  uint8_t* buffer= new uint8_t[message_byte_length];
108
71
  if(buffer == NULL)
109
72
  {
110
73
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate enough memory to transaction message\n"));
115
78
  to_apply.SerializeWithCachedSizesToArray(buffer);
116
79
  try
117
80
  {
118
 
    rabbitMQHandler->publish(buffer, 
119
 
                             int(message_byte_length), 
120
 
                             sysvar_rabbitmq_exchange, 
121
 
                             sysvar_rabbitmq_routingkey);
 
81
    _rabbitMQHandler->publish(buffer, 
 
82
                             int(message_byte_length), 
 
83
                             _exchange,
 
84
                             _routingkey);
122
85
  }
123
86
  catch(exception& e)
124
87
  {
126
89
    deactivate();
127
90
    return plugin::UNKNOWN_ERROR;
128
91
  }
129
 
  free(buffer);
 
92
  delete[] buffer;
130
93
  return plugin::SUCCESS;
131
94
}
132
95
 
142
105
{
143
106
  const module::option_map &vm= context.getOptions();
144
107
  
145
 
  if (vm.count("username"))
146
 
  {
147
 
    sysvar_rabbitmq_username= const_cast<char *>(vm["username"].as<string>().c_str());
148
 
  }
149
 
 
150
 
  if (vm.count("password"))
151
 
  {
152
 
    sysvar_rabbitmq_password= const_cast<char *>(vm["password"].as<string>().c_str());
153
 
  }
154
 
 
155
 
  if (vm.count("host"))
156
 
  {
157
 
    sysvar_rabbitmq_host= const_cast<char *>(vm["host"].as<string>().c_str());
158
 
  }
159
 
 
160
 
  if(sysvar_rabbitmq_log_enabled)
161
 
  {
162
 
    try 
163
 
    {
164
 
      rabbitmqHandler= new RabbitMQHandler(sysvar_rabbitmq_host, 
165
 
                                           sysvar_rabbitmq_port, 
166
 
                                           sysvar_rabbitmq_username, 
167
 
                                           sysvar_rabbitmq_password, 
168
 
                                           sysvar_rabbitmq_virtualhost);
169
 
    } 
170
 
    catch (exception& e) 
171
 
    {
172
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQHandler.  Got error: %s\n"),
173
 
                    e.what());
174
 
      return 1;
175
 
    }
176
 
    try 
177
 
    {
178
 
      rabbitmqLogger= new RabbitMQLog("rabbit_log_applier", rabbitmqHandler);
179
 
    } 
180
 
    catch (exception& e) 
181
 
    {
182
 
      errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQLog instance.  Got error: %s\n"), 
183
 
                    e.what());
184
 
      return 1;
185
 
    }
186
 
 
187
 
    context.add(rabbitmqLogger);
188
 
    ReplicationServices &replication_services= ReplicationServices::singleton();
189
 
    string replicator_name(sysvar_rabbitmq_use_replicator);
190
 
    replication_services.attachApplier(rabbitmqLogger, replicator_name);
191
 
    return 0;
192
 
  }
 
108
  try 
 
109
  {
 
110
    rabbitmqHandler= new RabbitMQHandler(vm["host"].as<string>(),
 
111
                                         sysvar_rabbitmq_port, 
 
112
                                         vm["username"].as<string>(), 
 
113
                                         vm["password"].as<string>(), 
 
114
                                         vm["virtualhost"].as<string>());
 
115
  } 
 
116
  catch (exception& e) 
 
117
  {
 
118
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQHandler.  Got error: %s\n"),
 
119
                  e.what());
 
120
    return 1;
 
121
  }
 
122
  try 
 
123
  {
 
124
    rabbitmqLogger= new RabbitMQLog("rabbit_log_applier",
 
125
                                    vm["exchange"].as<string>(),
 
126
                                    vm["routingkey"].as<string>(),
 
127
                                    rabbitmqHandler);
 
128
  } 
 
129
  catch (exception& e) 
 
130
  {
 
131
    errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the RabbitMQLog instance.  Got error: %s\n"), 
 
132
                  e.what());
 
133
    return 1;
 
134
  }
 
135
 
 
136
  context.add(rabbitmqLogger);
 
137
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
138
  replication_services.attachApplier(rabbitmqLogger, vm["use-replicator"].as<string>());
 
139
 
 
140
  context.registerVariable(new sys_var_const_string_val("host", vm["host"].as<string>()));
 
141
  context.registerVariable(new sys_var_constrained_value_readonly<in_port_t>("port", sysvar_rabbitmq_port));
 
142
  context.registerVariable(new sys_var_const_string_val("username", vm["username"].as<string>()));
 
143
  context.registerVariable(new sys_var_const_string_val("password", vm["password"].as<string>()));
 
144
  context.registerVariable(new sys_var_const_string_val("virtualhost", vm["virtualhost"].as<string>()));
 
145
  context.registerVariable(new sys_var_const_string_val("exchange", vm["exchange"].as<string>()));
 
146
  context.registerVariable(new sys_var_const_string_val("routingkey", vm["routingkey"].as<string>()));
 
147
 
193
148
  return 0;
194
149
}
195
150
 
196
151
 
197
 
static DRIZZLE_SYSVAR_BOOL(enable,
198
 
                           sysvar_rabbitmq_log_enabled,
199
 
                           PLUGIN_VAR_NOCMDARG,
200
 
                           N_("Enable rabbitmq log"),
201
 
                           NULL, /* check func */
202
 
                           NULL, /* update func */
203
 
                           false /* default */);
204
 
 
205
 
 
206
 
static DRIZZLE_SYSVAR_STR(hostname,
207
 
                          sysvar_rabbitmq_host,
208
 
                          PLUGIN_VAR_READONLY,
209
 
                          N_("Host name to connect to"),
210
 
                          NULL, /* check func */
211
 
                          NULL, /* update func*/
212
 
                          "localhost" /* default */);
213
 
 
214
 
 
215
 
static DRIZZLE_SYSVAR_INT(port,
216
 
                          sysvar_rabbitmq_port,
217
 
                          PLUGIN_VAR_READONLY,
218
 
                          N_("RabbitMQ Port"),
219
 
                          NULL, /* check func */
220
 
                          NULL, /* update func */
221
 
                          5672, /* default */
222
 
                          0,
223
 
                          65535,
224
 
                          0);
225
 
 
226
 
static DRIZZLE_SYSVAR_STR(username,
227
 
                          sysvar_rabbitmq_username,
228
 
                          PLUGIN_VAR_READONLY,
229
 
                          N_("RabbitMQ username"),
230
 
                          NULL, /* check func */
231
 
                          NULL, /* update func*/
232
 
                          "guest" /* default */);
233
 
 
234
 
static DRIZZLE_SYSVAR_STR(password,
235
 
                          sysvar_rabbitmq_password,
236
 
                          PLUGIN_VAR_READONLY,
237
 
                          N_("RabbitMQ password"),
238
 
                          NULL, /* check func */
239
 
                          NULL, /* update func*/
240
 
                          "guest" /* default */);
241
 
 
242
 
static DRIZZLE_SYSVAR_STR(virtualhost,
243
 
                          sysvar_rabbitmq_virtualhost,
244
 
                          PLUGIN_VAR_READONLY,
245
 
                          N_("RabbitMQ virtualhost"),
246
 
                          NULL, /* check func */
247
 
                          NULL, /* update func*/
248
 
                          "/" /* default */);
249
 
 
250
 
static DRIZZLE_SYSVAR_STR(exchange,
251
 
                          sysvar_rabbitmq_exchange,
252
 
                          PLUGIN_VAR_READONLY,
253
 
                          N_("Name of RabbitMQ exchange to publish to"),
254
 
                          NULL, /* check func */
255
 
                          NULL, /* update func*/
256
 
                          "ReplicationExchange" /* default */);
257
 
 
258
 
static DRIZZLE_SYSVAR_STR(routingkey,
259
 
                          sysvar_rabbitmq_routingkey,
260
 
                          PLUGIN_VAR_READONLY,
261
 
                          N_("Name of RabbitMQ routing key to use"),
262
 
                          NULL, /* check func */
263
 
                          NULL, /* update func*/
264
 
                          "ReplicationRoutingKey" /* default */);
265
 
 
266
 
static DRIZZLE_SYSVAR_STR(use_replicator,
267
 
                          sysvar_rabbitmq_use_replicator,
268
 
                          PLUGIN_VAR_READONLY,
269
 
                          N_("Name of the replicator plugin to use (default='default_replicator')"),
270
 
                          NULL, /* check func */
271
 
                          NULL, /* update func*/
272
 
                          DEFAULT_USE_REPLICATOR /* default */);
273
 
 
274
152
static void init_options(drizzled::module::option_context &context)
275
153
{
276
 
  context ("enable",
277
 
           po::value<bool>(&sysvar_rabbitmq_log_enabled)->default_value(false)->zero_tokens(),
278
 
           N_("Enable rabbitmq log"));
279
154
  context("host", 
280
155
          po::value<string>()->default_value("localhost"),
281
156
          N_("Host name to connect to"));
 
157
  context("port",
 
158
          po::value<port_constraint>(&sysvar_rabbitmq_port)->default_value(5672),
 
159
          N_("Port to connect to"));
 
160
  context("virtualhost",
 
161
          po::value<string>()->default_value("/"),
 
162
          N_("RabbitMQ virtualhost"));
282
163
  context("username",
283
164
          po::value<string>()->default_value("guest"),
284
165
          N_("RabbitMQ username"));
285
166
  context("password",
286
167
          po::value<string>()->default_value("guest"),
287
168
          N_("RabbitMQ password"));
 
169
  context("use-replicator",
 
170
          po::value<string>()->default_value("default_replicator"),
 
171
          N_("Name of the replicator plugin to use (default='default_replicator')"));
 
172
  context("exchange",
 
173
          po::value<string>()->default_value("ReplicationExchange"),
 
174
          N_("Name of RabbitMQ exchange to publish to"));
 
175
  context("routingkey",
 
176
          po::value<string>()->default_value("ReplicationRoutingKey"),
 
177
          N_("Name of RabbitMQ routing key to use"));
288
178
}
289
179
 
290
 
static drizzle_sys_var* system_variables[]= {
291
 
  DRIZZLE_SYSVAR(enable),
292
 
  DRIZZLE_SYSVAR(hostname),
293
 
  DRIZZLE_SYSVAR(port),
294
 
  DRIZZLE_SYSVAR(username),
295
 
  DRIZZLE_SYSVAR(password),
296
 
  DRIZZLE_SYSVAR(virtualhost),
297
 
  DRIZZLE_SYSVAR(exchange),
298
 
  DRIZZLE_SYSVAR(routingkey),
299
 
  DRIZZLE_SYSVAR(use_replicator),
300
 
  NULL
301
 
};
 
180
} /* namespace drizzle_plugin */
302
181
 
303
 
DRIZZLE_PLUGIN(init, system_variables, init_options);
 
182
DRIZZLE_PLUGIN(drizzle_plugin::init, NULL, drizzle_plugin::init_options);
304
183