~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/rabbitmq/rabbitmq_handler.cc

  • Committer: Lee Bieber
  • Date: 2011-03-29 22:31:41 UTC
  • mfrom: (2257.1.3 build)
  • Revision ID: kalebral@gmail.com-20110329223141-yxc22h3l2he58sk0
Merge Andrew - 743842: Build failure using GCC 4.6
Merge Stewart - 738022: CachedDirectory silently fails to add entries if stat() fails
Merge Olaf - Common fwd: add copyright, add more declaration

Show diffs side-by-side

added added

removed removed

Lines of Context:
37
37
                                 const in_port_t rabbitMQPort, 
38
38
                                 const std::string &rabbitMQUsername, 
39
39
                                 const std::string &rabbitMQPassword, 
40
 
                                 const std::string &rabbitMQVirtualhost, 
41
 
                                 const std::string &rabbitMQExchange, 
42
 
                                 const std::string &rabbitMQRoutingKey) 
 
40
                                 const std::string &rabbitMQVirtualhost) 
43
41
  throw(rabbitmq_handler_exception) :
44
42
    rabbitmqConnection(amqp_new_connection()),
 
43
    sockfd(amqp_open_socket(rabbitMQHost.c_str(), rabbitMQPort)),
45
44
    hostname(rabbitMQHost),
46
45
    port(rabbitMQPort),
47
46
    username(rabbitMQUsername),
48
47
    password(rabbitMQPassword),
49
 
    virtualhost(rabbitMQVirtualhost),
50
 
    exchange(rabbitMQExchange), 
51
 
    routingKey(rabbitMQRoutingKey)
52
 
{
53
 
  connect();
54
 
}
55
 
 
56
 
RabbitMQHandler::~RabbitMQHandler()
57
 
{
58
 
  disconnect();
59
 
}
60
 
 
61
 
void RabbitMQHandler::publish(void *message, 
62
 
                              const int length)
63
 
throw(rabbitmq_handler_exception)
64
 
{
65
 
  amqp_bytes_t b;
66
 
  b.bytes= message;
67
 
  b.len= length;
68
 
  
69
 
  if (amqp_basic_publish(rabbitmqConnection,
70
 
                         1,
71
 
                         amqp_cstring_bytes(exchange.c_str()),
72
 
                         amqp_cstring_bytes(routingKey.c_str()),
73
 
                         0,
74
 
                         0,
75
 
                         NULL,
76
 
                         b) < 0)
77
 
  {
78
 
    throw rabbitmq_handler_exception("Could not publish message");
79
 
  }
80
 
 
81
 
}
82
 
 
83
 
void RabbitMQHandler::reconnect() throw(rabbitmq_handler_exception)
84
 
{
85
 
  disconnect();
86
 
  connect();
87
 
}
88
 
 
89
 
void RabbitMQHandler::disconnect() throw(rabbitmq_handler_exception) 
90
 
{
91
 
  try
92
 
  {
93
 
    handleAMQPError(amqp_channel_close(rabbitmqConnection, 
94
 
                                       1, 
95
 
                                       AMQP_REPLY_SUCCESS),
96
 
                    "close channel");
97
 
    handleAMQPError(amqp_connection_close(rabbitmqConnection, 
98
 
                                          AMQP_REPLY_SUCCESS),
99
 
                    "close connection");
100
 
    amqp_destroy_connection(rabbitmqConnection);
101
 
  }
102
 
  catch(exception& e) {} // do not throw in destructorn 
103
 
  close(sockfd);
104
 
}
105
 
 
106
 
void RabbitMQHandler::connect() throw(rabbitmq_handler_exception) {
107
 
  sockfd = amqp_open_socket(hostname.c_str(), port);
 
48
    virtualhost(rabbitMQVirtualhost)
 
49
{
 
50
  /* open the socket to the rabbitmq server */
108
51
  if(sockfd < 0) 
109
52
  {
110
53
    throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
122
65
                  "rabbitmq login");
123
66
  /* open the channel */
124
67
  amqp_channel_open(rabbitmqConnection, 1);
125
 
  handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
126
 
  amqp_exchange_declare(rabbitmqConnection, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("fanout"), 0, 0, amqp_empty_table);
127
 
  handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
 
68
}
 
69
 
 
70
RabbitMQHandler::~RabbitMQHandler()
 
71
{
 
72
  try
 
73
  {
 
74
    handleAMQPError(amqp_channel_close(rabbitmqConnection, 
 
75
                                       1, 
 
76
                                       AMQP_REPLY_SUCCESS),
 
77
                    "close channel");
 
78
    handleAMQPError(amqp_connection_close(rabbitmqConnection, 
 
79
                                          AMQP_REPLY_SUCCESS),
 
80
                    "close connection");
 
81
    amqp_destroy_connection(rabbitmqConnection);
 
82
  }
 
83
  catch(exception& e) {} // do not throw in destructorn 
 
84
  
 
85
  close(sockfd);
 
86
}
 
87
 
 
88
void RabbitMQHandler::publish(void *message, 
 
89
                              const int length, 
 
90
                              const std::string &exchangeName, 
 
91
                              const std::string &routingKey)
 
92
throw(rabbitmq_handler_exception)
 
93
{
 
94
  amqp_bytes_t b;
 
95
  b.bytes= message;
 
96
  b.len= length;
 
97
  
 
98
  if (amqp_basic_publish(rabbitmqConnection,
 
99
                         1,
 
100
                         amqp_cstring_bytes(exchangeName.c_str()),
 
101
                         amqp_cstring_bytes(routingKey.c_str()),
 
102
                         0,
 
103
                         0,
 
104
                         NULL,
 
105
                         b) < 0)
 
106
  {
 
107
    throw rabbitmq_handler_exception("Could not publish message");
 
108
  }
 
109
 
128
110
}
129
111
 
130
112
void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)