37
37
const in_port_t rabbitMQPort,
38
38
const std::string &rabbitMQUsername,
39
39
const std::string &rabbitMQPassword,
40
const std::string &rabbitMQVirtualhost,
41
const std::string &rabbitMQExchange,
42
const std::string &rabbitMQRoutingKey)
40
const std::string &rabbitMQVirtualhost)
43
41
throw(rabbitmq_handler_exception) :
44
42
rabbitmqConnection(amqp_new_connection()),
43
sockfd(amqp_open_socket(rabbitMQHost.c_str(), rabbitMQPort)),
45
44
hostname(rabbitMQHost),
46
45
port(rabbitMQPort),
47
46
username(rabbitMQUsername),
48
47
password(rabbitMQPassword),
49
virtualhost(rabbitMQVirtualhost),
50
exchange(rabbitMQExchange),
51
routingKey(rabbitMQRoutingKey)
56
RabbitMQHandler::~RabbitMQHandler()
61
void RabbitMQHandler::publish(void *message,
63
throw(rabbitmq_handler_exception)
69
if (amqp_basic_publish(rabbitmqConnection,
71
amqp_cstring_bytes(exchange.c_str()),
72
amqp_cstring_bytes(routingKey.c_str()),
78
throw rabbitmq_handler_exception("Could not publish message");
83
void RabbitMQHandler::reconnect() throw(rabbitmq_handler_exception)
89
void RabbitMQHandler::disconnect() throw(rabbitmq_handler_exception)
93
handleAMQPError(amqp_channel_close(rabbitmqConnection,
97
handleAMQPError(amqp_connection_close(rabbitmqConnection,
100
amqp_destroy_connection(rabbitmqConnection);
102
catch(exception& e) {} // do not throw in destructorn
106
void RabbitMQHandler::connect() throw(rabbitmq_handler_exception) {
107
sockfd = amqp_open_socket(hostname.c_str(), port);
48
virtualhost(rabbitMQVirtualhost)
50
/* open the socket to the rabbitmq server */
110
53
throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
122
65
"rabbitmq login");
123
66
/* open the channel */
124
67
amqp_channel_open(rabbitmqConnection, 1);
125
handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
126
amqp_exchange_declare(rabbitmqConnection, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table);
127
handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
70
RabbitMQHandler::~RabbitMQHandler()
74
handleAMQPError(amqp_channel_close(rabbitmqConnection,
78
handleAMQPError(amqp_connection_close(rabbitmqConnection,
81
amqp_destroy_connection(rabbitmqConnection);
83
catch(exception& e) {} // do not throw in destructorn
88
void RabbitMQHandler::publish(void *message,
90
const std::string &exchangeName,
91
const std::string &routingKey)
92
throw(rabbitmq_handler_exception)
98
if (amqp_basic_publish(rabbitmqConnection,
100
amqp_cstring_bytes(exchangeName.c_str()),
101
amqp_cstring_bytes(routingKey.c_str()),
107
throw rabbitmq_handler_exception("Could not publish message");
130
112
void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)