1
#include "rabbitmq_handler.h"
5
RabbitMQHandler::RabbitMQHandler(const char* rabbitMQHost,
6
const int rabbitMQPort,
7
const char* rabbitMQUsername,
8
const char* rabbitMQPassword,
9
const char* rabbitMQVirtualhost)
10
throw(rabbitmq_handler_exception):
11
hostname(rabbitMQHost),
13
username(rabbitMQUsername),
14
password(rabbitMQPassword),
15
virtualhost(rabbitMQVirtualhost)
17
rabbitmqConnection = amqp_new_connection();
18
/* open the socket to the rabbitmq server */
19
sockfd = amqp_open_socket(hostname, port);
22
throw rabbitmq_handler_exception("Could not open socket, is rabbitmq running?");
24
amqp_set_sockfd(rabbitmqConnection, sockfd);
25
/* login to rabbitmq, handleAMQPError throws exception if there is a problem */
26
handleAMQPError(amqp_login(rabbitmqConnection,
31
AMQP_SASL_METHOD_PLAIN,
35
/* open the channel */
36
amqp_channel_open(rabbitmqConnection, 1);
39
RabbitMQHandler::~RabbitMQHandler()
43
handleAMQPError(amqp_channel_close(rabbitmqConnection,
47
handleAMQPError(amqp_connection_close(rabbitmqConnection,
50
amqp_destroy_connection(rabbitmqConnection);
52
catch(exception& e) {} // do not throw in destructorn
57
void RabbitMQHandler::publish(const uint8_t *message,
59
const char* exchangeName,
60
const char* routingKey) throw(rabbitmq_handler_exception)
63
b.bytes= (void*)message;
66
if(amqp_basic_publish(rabbitmqConnection,
68
amqp_cstring_bytes(exchangeName),
69
amqp_cstring_bytes(routingKey),
75
throw rabbitmq_handler_exception("Could not publish message");
80
void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)
82
string errorMessage = "";
83
switch (x.reply_type) {
84
case AMQP_RESPONSE_NORMAL:
86
case AMQP_RESPONSE_NONE:
87
errorMessage.assign("No response in ");
88
errorMessage.append(context);
89
throw rabbitmq_handler_exception(errorMessage);
90
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
91
case AMQP_RESPONSE_SERVER_EXCEPTION:
93
case AMQP_CONNECTION_CLOSE_METHOD:
94
errorMessage.assign("Connection closed in ");
95
errorMessage.append(context);
96
throw rabbitmq_handler_exception(errorMessage);
97
case AMQP_CHANNEL_CLOSE_METHOD:
98
errorMessage.assign("Channel closed in ");
99
errorMessage.append(context);
100
throw rabbitmq_handler_exception(errorMessage);
102
errorMessage.assign("Unknown error in ");
103
errorMessage.append(context);
104
throw rabbitmq_handler_exception(errorMessage);