~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/rabbitmq/rabbitmq_handler.cc

  • Committer: Marcus Eriksson
  • Date: 2011-03-30 19:50:29 UTC
  • mto: This revision was merged to the branch mainline in revision 2263.
  • Revision ID: krummas@gmail.com-20110330195029-3l4r5kc2cr57i40l
Add RabbitMQ documentation and update the rabbitmq plugin to reconnect etc if the server goes away

Show diffs side-by-side

added added

removed removed

Lines of Context:
37
37
                                 const in_port_t rabbitMQPort, 
38
38
                                 const std::string &rabbitMQUsername, 
39
39
                                 const std::string &rabbitMQPassword, 
40
 
                                 const std::string &rabbitMQVirtualhost) 
 
40
                                 const std::string &rabbitMQVirtualhost, 
 
41
                                 const std::string &rabbitMQExchange, 
 
42
                                 const std::string &rabbitMQRoutingKey) 
41
43
  throw(rabbitmq_handler_exception) :
42
44
    rabbitmqConnection(amqp_new_connection()),
43
 
    sockfd(amqp_open_socket(rabbitMQHost.c_str(), rabbitMQPort)),
44
45
    hostname(rabbitMQHost),
45
46
    port(rabbitMQPort),
46
47
    username(rabbitMQUsername),
47
48
    password(rabbitMQPassword),
48
 
    virtualhost(rabbitMQVirtualhost)
49
 
{
50
 
  /* open the socket to the rabbitmq server */
 
49
    virtualhost(rabbitMQVirtualhost),
 
50
    exchange(rabbitMQExchange), 
 
51
    routingKey(rabbitMQRoutingKey)
 
52
{
 
53
  connect();
 
54
}
 
55
 
 
56
RabbitMQHandler::~RabbitMQHandler()
 
57
{
 
58
  disconnect();
 
59
}
 
60
 
 
61
void RabbitMQHandler::publish(void *message, 
 
62
                              const int length)
 
63
throw(rabbitmq_handler_exception)
 
64
{
 
65
  amqp_bytes_t b;
 
66
  b.bytes= message;
 
67
  b.len= length;
 
68
  
 
69
  if (amqp_basic_publish(rabbitmqConnection,
 
70
                         1,
 
71
                         amqp_cstring_bytes(exchange.c_str()),
 
72
                         amqp_cstring_bytes(routingKey.c_str()),
 
73
                         0,
 
74
                         0,
 
75
                         NULL,
 
76
                         b) < 0)
 
77
  {
 
78
    throw rabbitmq_handler_exception("Could not publish message");
 
79
  }
 
80
 
 
81
}
 
82
 
 
83
void RabbitMQHandler::reconnect() throw(rabbitmq_handler_exception)
 
84
{
 
85
  disconnect();
 
86
  connect();
 
87
}
 
88
 
 
89
void RabbitMQHandler::disconnect() throw(rabbitmq_handler_exception) 
 
90
{
 
91
  try
 
92
  {
 
93
    handleAMQPError(amqp_channel_close(rabbitmqConnection, 
 
94
                                       1, 
 
95
                                       AMQP_REPLY_SUCCESS),
 
96
                    "close channel");
 
97
    handleAMQPError(amqp_connection_close(rabbitmqConnection, 
 
98
                                          AMQP_REPLY_SUCCESS),
 
99
                    "close connection");
 
100
    amqp_destroy_connection(rabbitmqConnection);
 
101
  }
 
102
  catch(exception& e) {} // do not throw in destructorn 
 
103
  close(sockfd);
 
104
}
 
105
 
 
106
void RabbitMQHandler::connect() throw(rabbitmq_handler_exception) {
 
107
  sockfd = amqp_open_socket(hostname.c_str(), port);
51
108
  if(sockfd < 0) 
52
109
  {
53
110
    throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
65
122
                  "rabbitmq login");
66
123
  /* open the channel */
67
124
  amqp_channel_open(rabbitmqConnection, 1);
68
 
}
69
 
 
70
 
RabbitMQHandler::~RabbitMQHandler()
71
 
{
72
 
  try
73
 
  {
74
 
    handleAMQPError(amqp_channel_close(rabbitmqConnection, 
75
 
                                       1, 
76
 
                                       AMQP_REPLY_SUCCESS),
77
 
                    "close channel");
78
 
    handleAMQPError(amqp_connection_close(rabbitmqConnection, 
79
 
                                          AMQP_REPLY_SUCCESS),
80
 
                    "close connection");
81
 
    amqp_destroy_connection(rabbitmqConnection);
82
 
  }
83
 
  catch(exception& e) {} // do not throw in destructorn 
84
 
  
85
 
  close(sockfd);
86
 
}
87
 
 
88
 
void RabbitMQHandler::publish(void *message, 
89
 
                              const int length, 
90
 
                              const std::string &exchangeName, 
91
 
                              const std::string &routingKey)
92
 
throw(rabbitmq_handler_exception)
93
 
{
94
 
  amqp_bytes_t b;
95
 
  b.bytes= message;
96
 
  b.len= length;
97
 
  
98
 
  if (amqp_basic_publish(rabbitmqConnection,
99
 
                         1,
100
 
                         amqp_cstring_bytes(exchangeName.c_str()),
101
 
                         amqp_cstring_bytes(routingKey.c_str()),
102
 
                         0,
103
 
                         0,
104
 
                         NULL,
105
 
                         b) < 0)
106
 
  {
107
 
    throw rabbitmq_handler_exception("Could not publish message");
108
 
  }
109
 
 
 
125
  handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
 
126
  amqp_exchange_declare(rabbitmqConnection, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table);
 
127
  handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
110
128
}
111
129
 
112
130
void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)