~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/rabbitmq/rabbitmq_handler.cc

  • Committer: Brian Aker
  • Date: 2010-10-20 20:26:18 UTC
  • mfrom: (1859.2.13 refactor)
  • Revision ID: brian@tangent.org-20101020202618-9222n39lm329urv5
Merge for Brian 

Show diffs side-by-side

added

removed

Lines of Context:
23
23
 */
24
24
 
25
25
#include "config.h"
26
 
 
27
 
#include <drizzled/gettext.h>
28
 
 
29
26
#include "rabbitmq_handler.h"
30
27
 
31
28
using namespace std;
32
29
 
33
 
namespace drizzle_plugin
34
 
{
35
 
 
36
 
RabbitMQHandler::RabbitMQHandler(const std::string &rabbitMQHost, 
37
 
                                 const in_port_t rabbitMQPort, 
38
 
                                 const std::string &rabbitMQUsername, 
39
 
                                 const std::string &rabbitMQPassword, 
40
 
                                 const std::string &rabbitMQVirtualhost) 
41
 
  throw(rabbitmq_handler_exception) :
42
 
    rabbitmqConnection(amqp_new_connection()),
43
 
    sockfd(amqp_open_socket(rabbitMQHost.c_str(), rabbitMQPort)),
44
 
    hostname(rabbitMQHost),
45
 
    port(rabbitMQPort),
46
 
    username(rabbitMQUsername),
47
 
    password(rabbitMQPassword),
48
 
    virtualhost(rabbitMQVirtualhost)
49
 
{
 
30
RabbitMQHandler::RabbitMQHandler(const char* rabbitMQHost, 
 
31
                                 const int rabbitMQPort, 
 
32
                                 const char* rabbitMQUsername, 
 
33
                                 const char* rabbitMQPassword, 
 
34
                                 const char* rabbitMQVirtualhost) 
 
35
  throw(rabbitmq_handler_exception):
 
36
  hostname(rabbitMQHost),
 
37
  port(rabbitMQPort),
 
38
  username(rabbitMQUsername),
 
39
  password(rabbitMQPassword),
 
40
  virtualhost(rabbitMQVirtualhost)
 
41
{
 
42
  rabbitmqConnection = amqp_new_connection();
50
43
  /* open the socket to the rabbitmq server */
 
44
  sockfd = amqp_open_socket(hostname, port);
51
45
  if(sockfd < 0) 
52
46
  {
53
 
    throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
 
47
    throw rabbitmq_handler_exception("Could not open socket, is rabbitmq running?");
54
48
  }
55
49
  amqp_set_sockfd(rabbitmqConnection, sockfd);
56
50
  /* login to rabbitmq, handleAMQPError throws exception if there is a problem */
57
51
  handleAMQPError(amqp_login(rabbitmqConnection, 
58
 
                             virtualhost.c_str(), 
59
 
                             0, 
60
 
                             131072, 
61
 
                             0, 
62
 
                             AMQP_SASL_METHOD_PLAIN, 
63
 
                             username.c_str(), 
64
 
                             password.c_str()), 
65
 
                  "rabbitmq login");
 
52
                             virtualhost, 
 
53
                             0, 
 
54
                             131072, 
 
55
                             0, 
 
56
                             AMQP_SASL_METHOD_PLAIN, 
 
57
                             username, 
 
58
                             password), 
 
59
                  "rabbitmq login");
66
60
  /* open the channel */
67
61
  amqp_channel_open(rabbitmqConnection, 1);
68
62
}
85
79
  close(sockfd);
86
80
}
87
81
 
88
 
void RabbitMQHandler::publish(void *message, 
89
 
                              const int length, 
90
 
                              const std::string &exchangeName, 
91
 
                              const std::string &routingKey)
92
 
throw(rabbitmq_handler_exception)
 
82
void RabbitMQHandler::publish(const uint8_t *message, 
 
83
                              const int length, 
 
84
                              const char* exchangeName, 
 
85
                              const char* routingKey) throw(rabbitmq_handler_exception)
93
86
{
94
87
  amqp_bytes_t b;
95
 
  b.bytes= message;
 
88
  b.bytes= (void*)message;
96
89
  b.len= length;
97
90
  
98
 
  if (amqp_basic_publish(rabbitmqConnection,
99
 
                         1,
100
 
                         amqp_cstring_bytes(exchangeName.c_str()),
101
 
                         amqp_cstring_bytes(routingKey.c_str()),
102
 
                         0,
103
 
                         0,
104
 
                         NULL,
105
 
                         b) < 0)
 
91
  if(amqp_basic_publish(rabbitmqConnection,
 
92
                        1,
 
93
                        amqp_cstring_bytes(exchangeName),
 
94
                        amqp_cstring_bytes(routingKey),
 
95
                        0,
 
96
                        0,
 
97
                        NULL,
 
98
                        b) < 0)
106
99
  {
107
100
    throw rabbitmq_handler_exception("Could not publish message");
108
101
  }
137
130
    }
138
131
  }
139
132
}
140
 
 
141
 
} /* namespace drizzle_plugin */