25
25
#include "config.h"
27
#include <drizzled/gettext.h>
29
26
#include "rabbitmq_handler.h"
31
28
using namespace std;
33
namespace drizzle_plugin
36
RabbitMQHandler::RabbitMQHandler(const std::string &rabbitMQHost,
37
const in_port_t rabbitMQPort,
38
const std::string &rabbitMQUsername,
39
const std::string &rabbitMQPassword,
40
const std::string &rabbitMQVirtualhost)
41
throw(rabbitmq_handler_exception) :
42
rabbitmqConnection(amqp_new_connection()),
43
sockfd(amqp_open_socket(rabbitMQHost.c_str(), rabbitMQPort)),
44
hostname(rabbitMQHost),
46
username(rabbitMQUsername),
47
password(rabbitMQPassword),
48
virtualhost(rabbitMQVirtualhost)
30
RabbitMQHandler::RabbitMQHandler(const char* rabbitMQHost,
31
const int rabbitMQPort,
32
const char* rabbitMQUsername,
33
const char* rabbitMQPassword,
34
const char* rabbitMQVirtualhost)
35
throw(rabbitmq_handler_exception):
36
hostname(rabbitMQHost),
38
username(rabbitMQUsername),
39
password(rabbitMQPassword),
40
virtualhost(rabbitMQVirtualhost)
42
rabbitmqConnection = amqp_new_connection();
50
43
/* open the socket to the rabbitmq server */
44
sockfd = amqp_open_socket(hostname, port);
53
throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
47
throw rabbitmq_handler_exception("Could not open socket, is rabbitmq running?");
55
49
amqp_set_sockfd(rabbitmqConnection, sockfd);
56
50
/* login to rabbitmq, handleAMQPError throws exception if there is a problem */
57
51
handleAMQPError(amqp_login(rabbitmqConnection,
62
AMQP_SASL_METHOD_PLAIN,
56
AMQP_SASL_METHOD_PLAIN,
66
60
/* open the channel */
67
61
amqp_channel_open(rabbitmqConnection, 1);
88
void RabbitMQHandler::publish(void *message,
90
const std::string &exchangeName,
91
const std::string &routingKey)
92
throw(rabbitmq_handler_exception)
82
void RabbitMQHandler::publish(const uint8_t *message,
84
const char* exchangeName,
85
const char* routingKey) throw(rabbitmq_handler_exception)
88
b.bytes= (void*)message;
98
if (amqp_basic_publish(rabbitmqConnection,
100
amqp_cstring_bytes(exchangeName.c_str()),
101
amqp_cstring_bytes(routingKey.c_str()),
91
if(amqp_basic_publish(rabbitmqConnection,
93
amqp_cstring_bytes(exchangeName),
94
amqp_cstring_bytes(routingKey),
107
100
throw rabbitmq_handler_exception("Could not publish message");