~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/rabbitmq/rabbitmq_handler.cc

  • Committer: Lee Bieber
  • Date: 2011-04-02 15:51:08 UTC
  • mfrom: (2258.2.4 drizzle)
  • Revision ID: kalebral@gmail.com-20110402155108-ux162y1xly0z0x72
Merge Marcuse - Adds a mutex that makes the rabbitmq plugin thread safe

Show diffs side-by-side

added added

removed removed

Lines of Context:
37
37
                                 const in_port_t rabbitMQPort, 
38
38
                                 const std::string &rabbitMQUsername, 
39
39
                                 const std::string &rabbitMQPassword, 
40
 
                                 const std::string &rabbitMQVirtualhost) 
 
40
                                 const std::string &rabbitMQVirtualhost, 
 
41
                                 const std::string &rabbitMQExchange, 
 
42
                                 const std::string &rabbitMQRoutingKey) 
41
43
  throw(rabbitmq_handler_exception) :
42
44
    rabbitmqConnection(amqp_new_connection()),
43
 
    sockfd(amqp_open_socket(rabbitMQHost.c_str(), rabbitMQPort)),
44
45
    hostname(rabbitMQHost),
45
46
    port(rabbitMQPort),
46
47
    username(rabbitMQUsername),
47
48
    password(rabbitMQPassword),
48
 
    virtualhost(rabbitMQVirtualhost)
49
 
{
50
 
  /* open the socket to the rabbitmq server */
 
49
    virtualhost(rabbitMQVirtualhost),
 
50
    exchange(rabbitMQExchange), 
 
51
    routingKey(rabbitMQRoutingKey)
 
52
{
 
53
  pthread_mutex_init(&publishLock, NULL);
 
54
  connect();
 
55
}
 
56
 
 
57
RabbitMQHandler::~RabbitMQHandler()
 
58
{
 
59
  pthread_mutex_destroy(&publishLock);
 
60
  disconnect();
 
61
}
 
62
 
 
63
void RabbitMQHandler::publish(void *message, 
 
64
                              const int length)
 
65
throw(rabbitmq_handler_exception)
 
66
{
 
67
  pthread_mutex_lock(&publishLock);
 
68
  amqp_bytes_t b;
 
69
  b.bytes= message;
 
70
  b.len= length;
 
71
  
 
72
  if (amqp_basic_publish(rabbitmqConnection,
 
73
                         1,
 
74
                         amqp_cstring_bytes(exchange.c_str()),
 
75
                         amqp_cstring_bytes(routingKey.c_str()),
 
76
                         0,
 
77
                         0,
 
78
                         NULL,
 
79
                         b) < 0)
 
80
  {
 
81
    pthread_mutex_unlock(&publishLock);
 
82
    throw rabbitmq_handler_exception("Could not publish message");
 
83
  }
 
84
  pthread_mutex_unlock(&publishLock);
 
85
 
 
86
}
 
87
 
 
88
void RabbitMQHandler::reconnect() throw(rabbitmq_handler_exception)
 
89
{
 
90
  disconnect();
 
91
  connect();
 
92
}
 
93
 
 
94
void RabbitMQHandler::disconnect() throw(rabbitmq_handler_exception) 
 
95
{
 
96
  try
 
97
  {
 
98
    handleAMQPError(amqp_channel_close(rabbitmqConnection, 
 
99
                                       1, 
 
100
                                       AMQP_REPLY_SUCCESS),
 
101
                    "close channel");
 
102
    handleAMQPError(amqp_connection_close(rabbitmqConnection, 
 
103
                                          AMQP_REPLY_SUCCESS),
 
104
                    "close connection");
 
105
    amqp_destroy_connection(rabbitmqConnection);
 
106
  }
 
107
  catch(exception& e) {} // do not throw in destructorn 
 
108
  close(sockfd);
 
109
}
 
110
 
 
111
void RabbitMQHandler::connect() throw(rabbitmq_handler_exception) {
 
112
  sockfd = amqp_open_socket(hostname.c_str(), port);
51
113
  if(sockfd < 0) 
52
114
  {
53
115
    throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
65
127
                  "rabbitmq login");
66
128
  /* open the channel */
67
129
  amqp_channel_open(rabbitmqConnection, 1);
68
 
}
69
 
 
70
 
RabbitMQHandler::~RabbitMQHandler()
71
 
{
72
 
  try
73
 
  {
74
 
    handleAMQPError(amqp_channel_close(rabbitmqConnection, 
75
 
                                       1, 
76
 
                                       AMQP_REPLY_SUCCESS),
77
 
                    "close channel");
78
 
    handleAMQPError(amqp_connection_close(rabbitmqConnection, 
79
 
                                          AMQP_REPLY_SUCCESS),
80
 
                    "close connection");
81
 
    amqp_destroy_connection(rabbitmqConnection);
82
 
  }
83
 
  catch(exception& e) {} // do not throw in destructorn 
84
 
  
85
 
  close(sockfd);
86
 
}
87
 
 
88
 
void RabbitMQHandler::publish(void *message, 
89
 
                              const int length, 
90
 
                              const std::string &exchangeName, 
91
 
                              const std::string &routingKey)
92
 
throw(rabbitmq_handler_exception)
93
 
{
94
 
  amqp_bytes_t b;
95
 
  b.bytes= message;
96
 
  b.len= length;
97
 
  
98
 
  if (amqp_basic_publish(rabbitmqConnection,
99
 
                         1,
100
 
                         amqp_cstring_bytes(exchangeName.c_str()),
101
 
                         amqp_cstring_bytes(routingKey.c_str()),
102
 
                         0,
103
 
                         0,
104
 
                         NULL,
105
 
                         b) < 0)
106
 
  {
107
 
    throw rabbitmq_handler_exception("Could not publish message");
108
 
  }
109
 
 
 
130
  handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
 
131
  amqp_table_t empty_table = { 0, NULL }; // for users of old librabbitmq users - amqp_empty_table did not exist
 
132
  amqp_exchange_declare(rabbitmqConnection, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("fanout"), 0, 0, empty_table);
 
133
  handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
110
134
}
111
135
 
112
136
void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)