37
37
const in_port_t rabbitMQPort,
38
38
const std::string &rabbitMQUsername,
39
39
const std::string &rabbitMQPassword,
40
const std::string &rabbitMQVirtualhost)
40
const std::string &rabbitMQVirtualhost,
41
const std::string &rabbitMQExchange,
42
const std::string &rabbitMQRoutingKey)
41
43
throw(rabbitmq_handler_exception) :
42
44
rabbitmqConnection(amqp_new_connection()),
43
sockfd(amqp_open_socket(rabbitMQHost.c_str(), rabbitMQPort)),
44
45
hostname(rabbitMQHost),
45
46
port(rabbitMQPort),
46
47
username(rabbitMQUsername),
47
48
password(rabbitMQPassword),
48
virtualhost(rabbitMQVirtualhost)
50
/* open the socket to the rabbitmq server */
49
virtualhost(rabbitMQVirtualhost),
50
exchange(rabbitMQExchange),
51
routingKey(rabbitMQRoutingKey)
53
pthread_mutex_init(&publishLock, NULL);
57
RabbitMQHandler::~RabbitMQHandler()
59
pthread_mutex_destroy(&publishLock);
63
void RabbitMQHandler::publish(void *message,
65
throw(rabbitmq_handler_exception)
67
pthread_mutex_lock(&publishLock);
72
if (amqp_basic_publish(rabbitmqConnection,
74
amqp_cstring_bytes(exchange.c_str()),
75
amqp_cstring_bytes(routingKey.c_str()),
81
pthread_mutex_unlock(&publishLock);
82
throw rabbitmq_handler_exception("Could not publish message");
84
pthread_mutex_unlock(&publishLock);
88
void RabbitMQHandler::reconnect() throw(rabbitmq_handler_exception)
94
void RabbitMQHandler::disconnect() throw(rabbitmq_handler_exception)
98
handleAMQPError(amqp_channel_close(rabbitmqConnection,
102
handleAMQPError(amqp_connection_close(rabbitmqConnection,
105
amqp_destroy_connection(rabbitmqConnection);
107
catch(exception& e) {} // do not throw in destructorn
111
void RabbitMQHandler::connect() throw(rabbitmq_handler_exception) {
112
sockfd = amqp_open_socket(hostname.c_str(), port);
53
115
throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
65
127
"rabbitmq login");
66
128
/* open the channel */
67
129
amqp_channel_open(rabbitmqConnection, 1);
70
RabbitMQHandler::~RabbitMQHandler()
74
handleAMQPError(amqp_channel_close(rabbitmqConnection,
78
handleAMQPError(amqp_connection_close(rabbitmqConnection,
81
amqp_destroy_connection(rabbitmqConnection);
83
catch(exception& e) {} // do not throw in destructorn
88
void RabbitMQHandler::publish(void *message,
90
const std::string &exchangeName,
91
const std::string &routingKey)
92
throw(rabbitmq_handler_exception)
98
if (amqp_basic_publish(rabbitmqConnection,
100
amqp_cstring_bytes(exchangeName.c_str()),
101
amqp_cstring_bytes(routingKey.c_str()),
107
throw rabbitmq_handler_exception("Could not publish message");
130
handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
131
amqp_table_t empty_table = { 0, NULL }; // for users of old librabbitmq users - amqp_empty_table did not exist
132
amqp_exchange_declare(rabbitmqConnection, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("fanout"), 0, 0, empty_table);
133
handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
112
136
void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)