1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2010 Marcus Eriksson
8
* Marcus Eriksson <krummas@gmail.com>
10
* This program is free software; you can redistribute it and/or modify
11
* it under the terms of the GNU General Public License as published by
12
* the Free Software Foundation; either version 2 of the License, or
13
* (at your option) any later version.
15
* This program is distributed in the hope that it will be useful,
16
* but WITHOUT ANY WARRANTY; without even the implied warranty of
17
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18
* GNU General Public License for more details.
20
* You should have received a copy of the GNU General Public License
21
* along with this program; if not, write to the Free Software
22
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
27
#include <drizzled/gettext.h>
29
#include "rabbitmq_handler.h"
33
namespace drizzle_plugin
36
RabbitMQHandler::RabbitMQHandler(const std::string &rabbitMQHost,
37
const in_port_t rabbitMQPort,
38
const std::string &rabbitMQUsername,
39
const std::string &rabbitMQPassword,
40
const std::string &rabbitMQVirtualhost)
41
throw(rabbitmq_handler_exception) :
42
rabbitmqConnection(amqp_new_connection()),
43
sockfd(amqp_open_socket(rabbitMQHost.c_str(), rabbitMQPort)),
44
hostname(rabbitMQHost),
46
username(rabbitMQUsername),
47
password(rabbitMQPassword),
48
virtualhost(rabbitMQVirtualhost)
50
/* open the socket to the rabbitmq server */
53
throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
55
amqp_set_sockfd(rabbitmqConnection, sockfd);
56
/* login to rabbitmq, handleAMQPError throws exception if there is a problem */
57
handleAMQPError(amqp_login(rabbitmqConnection,
62
AMQP_SASL_METHOD_PLAIN,
66
/* open the channel */
67
amqp_channel_open(rabbitmqConnection, 1);
70
RabbitMQHandler::~RabbitMQHandler()
74
handleAMQPError(amqp_channel_close(rabbitmqConnection,
78
handleAMQPError(amqp_connection_close(rabbitmqConnection,
81
amqp_destroy_connection(rabbitmqConnection);
83
catch(exception& e) {} // do not throw in destructorn
88
void RabbitMQHandler::publish(void *message,
90
const std::string &exchangeName,
91
const std::string &routingKey)
92
throw(rabbitmq_handler_exception)
98
if (amqp_basic_publish(rabbitmqConnection,
100
amqp_cstring_bytes(exchangeName.c_str()),
101
amqp_cstring_bytes(routingKey.c_str()),
107
throw rabbitmq_handler_exception("Could not publish message");
112
void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)
114
string errorMessage("");
115
switch (x.reply_type) {
116
case AMQP_RESPONSE_NORMAL:
118
case AMQP_RESPONSE_NONE:
119
errorMessage.assign("No response in ");
120
errorMessage.append(context);
121
throw rabbitmq_handler_exception(errorMessage);
122
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
123
case AMQP_RESPONSE_SERVER_EXCEPTION:
124
switch (x.reply.id) {
125
case AMQP_CONNECTION_CLOSE_METHOD:
126
errorMessage.assign("Connection closed in ");
127
errorMessage.append(context);
128
throw rabbitmq_handler_exception(errorMessage);
129
case AMQP_CHANNEL_CLOSE_METHOD:
130
errorMessage.assign("Channel closed in ");
131
errorMessage.append(context);
132
throw rabbitmq_handler_exception(errorMessage);
134
errorMessage.assign("Unknown error in ");
135
errorMessage.append(context);
136
throw rabbitmq_handler_exception(errorMessage);
141
} /* namespace drizzle_plugin */