~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/rabbitmq/rabbitmq_handler.cc

  • Committer: Marcus Eriksson
  • Date: 2010-02-06 20:18:39 UTC
  • mto: This revision was merged to the branch mainline in revision 1438.
  • Revision ID: marcuse@localhost.localdomain-20100206201839-7fh52959ox940hud
rabbitmq replication applier

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#include "rabbitmq_handler.h"
 
2
 
 
3
using namespace std;
 
4
 
 
5
RabbitMQHandler::RabbitMQHandler(const char* rabbitMQHost, 
 
6
                                 const int rabbitMQPort, 
 
7
                                 const char* rabbitMQUsername, 
 
8
                                 const char* rabbitMQPassword, 
 
9
                                 const char* rabbitMQVirtualhost) 
 
10
  throw(rabbitmq_handler_exception):
 
11
  hostname(rabbitMQHost),
 
12
  port(rabbitMQPort),
 
13
  username(rabbitMQUsername),
 
14
  password(rabbitMQPassword),
 
15
  virtualhost(rabbitMQVirtualhost)
 
16
{
 
17
  rabbitmqConnection = amqp_new_connection();
 
18
  /* open the socket to the rabbitmq server */
 
19
  sockfd = amqp_open_socket(hostname, port);
 
20
  if(sockfd < 0) 
 
21
  {
 
22
    throw rabbitmq_handler_exception("Could not open socket, is rabbitmq running?");
 
23
  }
 
24
  amqp_set_sockfd(rabbitmqConnection, sockfd);
 
25
  /* login to rabbitmq, handleAMQPError throws exception if there is a problem */
 
26
  handleAMQPError(amqp_login(rabbitmqConnection, 
 
27
                             virtualhost, 
 
28
                             0, 
 
29
                             131072, 
 
30
                             0, 
 
31
                             AMQP_SASL_METHOD_PLAIN, 
 
32
                             username, 
 
33
                             password), 
 
34
                  "rabbitmq login");
 
35
  /* open the channel */
 
36
  amqp_channel_open(rabbitmqConnection, 1);
 
37
}
 
38
 
 
39
RabbitMQHandler::~RabbitMQHandler()
 
40
{
 
41
  try
 
42
  {
 
43
    handleAMQPError(amqp_channel_close(rabbitmqConnection, 
 
44
                                       1, 
 
45
                                       AMQP_REPLY_SUCCESS),
 
46
                    "close channel");
 
47
    handleAMQPError(amqp_connection_close(rabbitmqConnection, 
 
48
                                          AMQP_REPLY_SUCCESS),
 
49
                    "close connection");
 
50
    amqp_destroy_connection(rabbitmqConnection);
 
51
  }
 
52
  catch(exception& e) {} // do not throw in destructorn 
 
53
  
 
54
  close(sockfd);
 
55
}
 
56
 
 
57
void RabbitMQHandler::publish(const uint8_t *message, 
 
58
                              const int length, 
 
59
                              const char* exchangeName, 
 
60
                              const char* routingKey) throw(rabbitmq_handler_exception)
 
61
{
 
62
  amqp_bytes_t b;
 
63
  b.bytes= (void*)message;
 
64
  b.len= length;
 
65
  
 
66
  if(amqp_basic_publish(rabbitmqConnection,
 
67
                        1,
 
68
                        amqp_cstring_bytes(exchangeName),
 
69
                        amqp_cstring_bytes(routingKey),
 
70
                        0,
 
71
                        0,
 
72
                        NULL,
 
73
                        b) < 0)
 
74
  {
 
75
    throw rabbitmq_handler_exception("Could not publish message");
 
76
  }
 
77
 
 
78
}
 
79
 
 
80
void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)
 
81
{
 
82
  string errorMessage = "";
 
83
  switch (x.reply_type) {
 
84
  case AMQP_RESPONSE_NORMAL:
 
85
    break;
 
86
  case AMQP_RESPONSE_NONE:
 
87
    errorMessage.assign("No response in ");
 
88
    errorMessage.append(context);
 
89
    throw rabbitmq_handler_exception(errorMessage);
 
90
  case AMQP_RESPONSE_LIBRARY_EXCEPTION:
 
91
  case AMQP_RESPONSE_SERVER_EXCEPTION:
 
92
    switch (x.reply.id) {      
 
93
    case AMQP_CONNECTION_CLOSE_METHOD:
 
94
      errorMessage.assign("Connection closed in ");
 
95
      errorMessage.append(context);
 
96
      throw rabbitmq_handler_exception(errorMessage);
 
97
    case AMQP_CHANNEL_CLOSE_METHOD:
 
98
      errorMessage.assign("Channel closed in ");
 
99
      errorMessage.append(context);
 
100
      throw rabbitmq_handler_exception(errorMessage);
 
101
    default:
 
102
      errorMessage.assign("Unknown error in ");
 
103
      errorMessage.append(context);
 
104
      throw rabbitmq_handler_exception(errorMessage);
 
105
    }
 
106
  }
 
107
}