~drizzle-trunk/drizzle/development

1283.2.2 by Marcus Eriksson
add copyright headers, change load_by_default to no, include config.h, change string initialization to () style
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2010 Marcus Eriksson
5
 *
6
 *  Authors:
7
 *
8
 *  Marcus Eriksson <krummas@gmail.com>
9
 *
10
 *  This program is free software; you can redistribute it and/or modify
11
 *  it under the terms of the GNU General Public License as published by
12
 *  the Free Software Foundation; either version 2 of the License, or
13
 *  (at your option) any later version.
14
 *
15
 *  This program is distributed in the hope that it will be useful,
16
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
 *  GNU General Public License for more details.
19
 *
20
 *  You should have received a copy of the GNU General Public License
21
 *  along with this program; if not, write to the Free Software
22
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
23
 */
24
2173.2.1 by Monty Taylor
Fixes incorrect usage of include
25
#include <config.h>
1964.2.17 by Monty Taylor
Did logging_query. Fixed rabbitmq.
26
27
#include <drizzled/gettext.h>
28
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
29
#include "rabbitmq_handler.h"
30
31
using namespace std;
32
1964.2.14 by Monty Taylor
RabbitMQ.
33
namespace drizzle_plugin
34
{
35
36
RabbitMQHandler::RabbitMQHandler(const std::string &rabbitMQHost, 
37
                                 const in_port_t rabbitMQPort, 
38
                                 const std::string &rabbitMQUsername, 
39
                                 const std::string &rabbitMQPassword, 
2258.2.1 by Marcus Eriksson
Add RabbitMQ documentation and update the rabbitmq plugin to reconnect etc if the server goes away
40
                                 const std::string &rabbitMQVirtualhost, 
41
				 const std::string &rabbitMQExchange, 
42
				 const std::string &rabbitMQRoutingKey) 
1964.2.14 by Monty Taylor
RabbitMQ.
43
  throw(rabbitmq_handler_exception) :
44
    rabbitmqConnection(amqp_new_connection()),
45
    hostname(rabbitMQHost),
46
    port(rabbitMQPort),
47
    username(rabbitMQUsername),
48
    password(rabbitMQPassword),
2258.2.1 by Marcus Eriksson
Add RabbitMQ documentation and update the rabbitmq plugin to reconnect etc if the server goes away
49
    virtualhost(rabbitMQVirtualhost),
50
    exchange(rabbitMQExchange), 
51
    routingKey(rabbitMQRoutingKey)
52
{
2258.2.2 by Marcus Eriksson
add mutex - TransactionAppliers are not single threaded
53
  pthread_mutex_init(&publishLock, NULL);
2258.2.1 by Marcus Eriksson
Add RabbitMQ documentation and update the rabbitmq plugin to reconnect etc if the server goes away
54
  connect();
55
}
56
57
RabbitMQHandler::~RabbitMQHandler()
58
{
2258.2.2 by Marcus Eriksson
add mutex - TransactionAppliers are not single threaded
59
  pthread_mutex_destroy(&publishLock);
2258.2.1 by Marcus Eriksson
Add RabbitMQ documentation and update the rabbitmq plugin to reconnect etc if the server goes away
60
  disconnect();
61
}
62
63
void RabbitMQHandler::publish(void *message, 
64
                              const int length)
65
throw(rabbitmq_handler_exception)
66
{
2258.2.2 by Marcus Eriksson
add mutex - TransactionAppliers are not single threaded
67
  pthread_mutex_lock(&publishLock);
2258.2.1 by Marcus Eriksson
Add RabbitMQ documentation and update the rabbitmq plugin to reconnect etc if the server goes away
68
  amqp_bytes_t b;
69
  b.bytes= message;
70
  b.len= length;
71
  
72
  if (amqp_basic_publish(rabbitmqConnection,
73
                         1,
74
                         amqp_cstring_bytes(exchange.c_str()),
75
                         amqp_cstring_bytes(routingKey.c_str()),
76
                         0,
77
                         0,
78
                         NULL,
79
                         b) < 0)
80
  {
2258.2.2 by Marcus Eriksson
add mutex - TransactionAppliers are not single threaded
81
    pthread_mutex_unlock(&publishLock);
2258.2.1 by Marcus Eriksson
Add RabbitMQ documentation and update the rabbitmq plugin to reconnect etc if the server goes away
82
    throw rabbitmq_handler_exception("Could not publish message");
83
  }
2258.2.2 by Marcus Eriksson
add mutex - TransactionAppliers are not single threaded
84
  pthread_mutex_unlock(&publishLock);
2258.2.1 by Marcus Eriksson
Add RabbitMQ documentation and update the rabbitmq plugin to reconnect etc if the server goes away
85
86
}
87
88
void RabbitMQHandler::reconnect() throw(rabbitmq_handler_exception)
89
{
90
  disconnect();
91
  connect();
92
}
93
94
void RabbitMQHandler::disconnect() throw(rabbitmq_handler_exception) 
95
{
96
  try
97
  {
98
    handleAMQPError(amqp_channel_close(rabbitmqConnection, 
99
				       1, 
100
				       AMQP_REPLY_SUCCESS),
101
		    "close channel");
102
    handleAMQPError(amqp_connection_close(rabbitmqConnection, 
103
					  AMQP_REPLY_SUCCESS),
104
		    "close connection");
105
    amqp_destroy_connection(rabbitmqConnection);
106
  }
107
  catch(exception& e) {} // do not throw in destructorn 
108
  close(sockfd);
109
}
110
111
void RabbitMQHandler::connect() throw(rabbitmq_handler_exception) {
112
  sockfd = amqp_open_socket(hostname.c_str(), port);
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
113
  if(sockfd < 0) 
114
  {
1964.2.17 by Monty Taylor
Did logging_query. Fixed rabbitmq.
115
    throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
116
  }
117
  amqp_set_sockfd(rabbitmqConnection, sockfd);
118
  /* login to rabbitmq, handleAMQPError throws exception if there is a problem */
119
  handleAMQPError(amqp_login(rabbitmqConnection, 
1964.2.17 by Monty Taylor
Did logging_query. Fixed rabbitmq.
120
                             virtualhost.c_str(), 
121
                             0, 
122
                             131072, 
123
                             0, 
124
                             AMQP_SASL_METHOD_PLAIN, 
125
                             username.c_str(), 
126
                             password.c_str()), 
127
                  "rabbitmq login");
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
128
  /* open the channel */
129
  amqp_channel_open(rabbitmqConnection, 1);
2258.2.1 by Marcus Eriksson
Add RabbitMQ documentation and update the rabbitmq plugin to reconnect etc if the server goes away
130
  handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
2258.2.3 by Marcus Eriksson
fix issues with rabbitmq docs and plugin
131
  amqp_table_t empty_table = { 0, NULL }; // for users of old librabbitmq users - amqp_empty_table did not exist
132
  amqp_exchange_declare(rabbitmqConnection, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("fanout"), 0, 0, empty_table);
2258.2.1 by Marcus Eriksson
Add RabbitMQ documentation and update the rabbitmq plugin to reconnect etc if the server goes away
133
  handleAMQPError(amqp_get_rpc_reply(rabbitmqConnection), "RPC Reply");
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
134
}
135
136
void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)
137
{
1283.2.2 by Marcus Eriksson
add copyright headers, change load_by_default to no, include config.h, change string initialization to () style
138
  string errorMessage("");
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
139
  switch (x.reply_type) {
140
  case AMQP_RESPONSE_NORMAL:
141
    break;
142
  case AMQP_RESPONSE_NONE:
143
    errorMessage.assign("No response in ");
144
    errorMessage.append(context);
145
    throw rabbitmq_handler_exception(errorMessage);
146
  case AMQP_RESPONSE_LIBRARY_EXCEPTION:
147
  case AMQP_RESPONSE_SERVER_EXCEPTION:
148
    switch (x.reply.id) {      
149
    case AMQP_CONNECTION_CLOSE_METHOD:
150
      errorMessage.assign("Connection closed in ");
151
      errorMessage.append(context);
152
      throw rabbitmq_handler_exception(errorMessage);
153
    case AMQP_CHANNEL_CLOSE_METHOD:
154
      errorMessage.assign("Channel closed in ");
155
      errorMessage.append(context);
156
      throw rabbitmq_handler_exception(errorMessage);
157
    default:
158
      errorMessage.assign("Unknown error in ");
159
      errorMessage.append(context);
160
      throw rabbitmq_handler_exception(errorMessage);
161
    }
162
  }
163
}
1964.2.14 by Monty Taylor
RabbitMQ.
164
165
} /* namespace drizzle_plugin */