~drizzle-trunk/drizzle/development

1283.2.2 by Marcus Eriksson
add copyright headers, change load_by_default to no, include config.h, change string initialization to () style
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2010 Marcus Eriksson
5
 *
6
 *  Authors:
7
 *
8
 *  Marcus Eriksson <krummas@gmail.com>
9
 *
10
 *  This program is free software; you can redistribute it and/or modify
11
 *  it under the terms of the GNU General Public License as published by
12
 *  the Free Software Foundation; either version 2 of the License, or
13
 *  (at your option) any later version.
14
 *
15
 *  This program is distributed in the hope that it will be useful,
16
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
 *  GNU General Public License for more details.
19
 *
20
 *  You should have received a copy of the GNU General Public License
21
 *  along with this program; if not, write to the Free Software
22
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
23
 */
24
25
#include "config.h"
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
26
#include "rabbitmq_handler.h"
27
28
using namespace std;
29
30
RabbitMQHandler::RabbitMQHandler(const char* rabbitMQHost, 
31
				 const int rabbitMQPort, 
32
				 const char* rabbitMQUsername, 
33
				 const char* rabbitMQPassword, 
34
				 const char* rabbitMQVirtualhost) 
35
  throw(rabbitmq_handler_exception):
36
  hostname(rabbitMQHost),
37
  port(rabbitMQPort),
38
  username(rabbitMQUsername),
39
  password(rabbitMQPassword),
40
  virtualhost(rabbitMQVirtualhost)
41
{
42
  rabbitmqConnection = amqp_new_connection();
43
  /* open the socket to the rabbitmq server */
44
  sockfd = amqp_open_socket(hostname, port);
45
  if(sockfd < 0) 
46
  {
47
    throw rabbitmq_handler_exception("Could not open socket, is rabbitmq running?");
48
  }
49
  amqp_set_sockfd(rabbitmqConnection, sockfd);
50
  /* login to rabbitmq, handleAMQPError throws exception if there is a problem */
51
  handleAMQPError(amqp_login(rabbitmqConnection, 
52
			     virtualhost, 
53
			     0, 
54
			     131072, 
55
			     0, 
56
			     AMQP_SASL_METHOD_PLAIN, 
57
			     username, 
58
			     password), 
59
		  "rabbitmq login");
60
  /* open the channel */
61
  amqp_channel_open(rabbitmqConnection, 1);
62
}
63
64
RabbitMQHandler::~RabbitMQHandler()
65
{
66
  try
67
  {
68
    handleAMQPError(amqp_channel_close(rabbitmqConnection, 
69
				       1, 
70
				       AMQP_REPLY_SUCCESS),
71
		    "close channel");
72
    handleAMQPError(amqp_connection_close(rabbitmqConnection, 
73
					  AMQP_REPLY_SUCCESS),
74
		    "close connection");
75
    amqp_destroy_connection(rabbitmqConnection);
76
  }
77
  catch(exception& e) {} // do not throw in destructorn 
78
  
79
  close(sockfd);
80
}
81
82
void RabbitMQHandler::publish(const uint8_t *message, 
83
			      const int length, 
84
			      const char* exchangeName, 
85
			      const char* routingKey) throw(rabbitmq_handler_exception)
86
{
87
  amqp_bytes_t b;
88
  b.bytes= (void*)message;
89
  b.len= length;
90
  
91
  if(amqp_basic_publish(rabbitmqConnection,
92
			1,
93
			amqp_cstring_bytes(exchangeName),
94
			amqp_cstring_bytes(routingKey),
95
			0,
96
			0,
97
			NULL,
98
			b) < 0)
99
  {
100
    throw rabbitmq_handler_exception("Could not publish message");
101
  }
102
103
}
104
105
void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)
106
{
1283.2.2 by Marcus Eriksson
add copyright headers, change load_by_default to no, include config.h, change string initialization to () style
107
  string errorMessage("");
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
108
  switch (x.reply_type) {
109
  case AMQP_RESPONSE_NORMAL:
110
    break;
111
  case AMQP_RESPONSE_NONE:
112
    errorMessage.assign("No response in ");
113
    errorMessage.append(context);
114
    throw rabbitmq_handler_exception(errorMessage);
115
  case AMQP_RESPONSE_LIBRARY_EXCEPTION:
116
  case AMQP_RESPONSE_SERVER_EXCEPTION:
117
    switch (x.reply.id) {      
118
    case AMQP_CONNECTION_CLOSE_METHOD:
119
      errorMessage.assign("Connection closed in ");
120
      errorMessage.append(context);
121
      throw rabbitmq_handler_exception(errorMessage);
122
    case AMQP_CHANNEL_CLOSE_METHOD:
123
      errorMessage.assign("Channel closed in ");
124
      errorMessage.append(context);
125
      throw rabbitmq_handler_exception(errorMessage);
126
    default:
127
      errorMessage.assign("Unknown error in ");
128
      errorMessage.append(context);
129
      throw rabbitmq_handler_exception(errorMessage);
130
    }
131
  }
132
}