~drizzle-trunk/drizzle/development

1283.2.2 by Marcus Eriksson
add copyright headers, change load_by_default to no, include config.h, change string initialization to () style
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 *
4
 *  Copyright (C) 2010 Marcus Eriksson
5
 *
6
 *  Authors:
7
 *
8
 *  Marcus Eriksson <krummas@gmail.com>
9
 *
10
 *  This program is free software; you can redistribute it and/or modify
11
 *  it under the terms of the GNU General Public License as published by
12
 *  the Free Software Foundation; either version 2 of the License, or
13
 *  (at your option) any later version.
14
 *
15
 *  This program is distributed in the hope that it will be useful,
16
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
 *  GNU General Public License for more details.
19
 *
20
 *  You should have received a copy of the GNU General Public License
21
 *  along with this program; if not, write to the Free Software
22
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
23
 */
24
2173.2.1 by Monty Taylor
Fixes incorrect usage of include
25
#include <config.h>
1964.2.17 by Monty Taylor
Did logging_query. Fixed rabbitmq.
26
27
#include <drizzled/gettext.h>
28
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
29
#include "rabbitmq_handler.h"
30
31
using namespace std;
32
1964.2.14 by Monty Taylor
RabbitMQ.
33
namespace drizzle_plugin
34
{
35
36
RabbitMQHandler::RabbitMQHandler(const std::string &rabbitMQHost, 
37
                                 const in_port_t rabbitMQPort, 
38
                                 const std::string &rabbitMQUsername, 
39
                                 const std::string &rabbitMQPassword, 
40
                                 const std::string &rabbitMQVirtualhost) 
41
  throw(rabbitmq_handler_exception) :
42
    rabbitmqConnection(amqp_new_connection()),
43
    sockfd(amqp_open_socket(rabbitMQHost.c_str(), rabbitMQPort)),
44
    hostname(rabbitMQHost),
45
    port(rabbitMQPort),
46
    username(rabbitMQUsername),
47
    password(rabbitMQPassword),
48
    virtualhost(rabbitMQVirtualhost)
49
{
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
50
  /* open the socket to the rabbitmq server */
51
  if(sockfd < 0) 
52
  {
1964.2.17 by Monty Taylor
Did logging_query. Fixed rabbitmq.
53
    throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
54
  }
55
  amqp_set_sockfd(rabbitmqConnection, sockfd);
56
  /* login to rabbitmq, handleAMQPError throws exception if there is a problem */
57
  handleAMQPError(amqp_login(rabbitmqConnection, 
1964.2.17 by Monty Taylor
Did logging_query. Fixed rabbitmq.
58
                             virtualhost.c_str(), 
59
                             0, 
60
                             131072, 
61
                             0, 
62
                             AMQP_SASL_METHOD_PLAIN, 
63
                             username.c_str(), 
64
                             password.c_str()), 
65
                  "rabbitmq login");
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
66
  /* open the channel */
67
  amqp_channel_open(rabbitmqConnection, 1);
68
}
69
70
RabbitMQHandler::~RabbitMQHandler()
71
{
72
  try
73
  {
74
    handleAMQPError(amqp_channel_close(rabbitmqConnection, 
75
				       1, 
76
				       AMQP_REPLY_SUCCESS),
77
		    "close channel");
78
    handleAMQPError(amqp_connection_close(rabbitmqConnection, 
79
					  AMQP_REPLY_SUCCESS),
80
		    "close connection");
81
    amqp_destroy_connection(rabbitmqConnection);
82
  }
83
  catch(exception& e) {} // do not throw in destructorn 
84
  
85
  close(sockfd);
86
}
87
1964.2.14 by Monty Taylor
RabbitMQ.
88
void RabbitMQHandler::publish(void *message, 
89
                              const int length, 
90
                              const std::string &exchangeName, 
91
                              const std::string &routingKey)
92
throw(rabbitmq_handler_exception)
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
93
{
94
  amqp_bytes_t b;
1964.2.14 by Monty Taylor
RabbitMQ.
95
  b.bytes= message;
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
96
  b.len= length;
97
  
1964.2.14 by Monty Taylor
RabbitMQ.
98
  if (amqp_basic_publish(rabbitmqConnection,
99
                         1,
100
                         amqp_cstring_bytes(exchangeName.c_str()),
101
                         amqp_cstring_bytes(routingKey.c_str()),
102
                         0,
103
                         0,
104
                         NULL,
105
                         b) < 0)
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
106
  {
107
    throw rabbitmq_handler_exception("Could not publish message");
108
  }
109
110
}
111
112
void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)
113
{
1283.2.2 by Marcus Eriksson
add copyright headers, change load_by_default to no, include config.h, change string initialization to () style
114
  string errorMessage("");
1283.2.1 by Marcus Eriksson
rabbitmq replication applier
115
  switch (x.reply_type) {
116
  case AMQP_RESPONSE_NORMAL:
117
    break;
118
  case AMQP_RESPONSE_NONE:
119
    errorMessage.assign("No response in ");
120
    errorMessage.append(context);
121
    throw rabbitmq_handler_exception(errorMessage);
122
  case AMQP_RESPONSE_LIBRARY_EXCEPTION:
123
  case AMQP_RESPONSE_SERVER_EXCEPTION:
124
    switch (x.reply.id) {      
125
    case AMQP_CONNECTION_CLOSE_METHOD:
126
      errorMessage.assign("Connection closed in ");
127
      errorMessage.append(context);
128
      throw rabbitmq_handler_exception(errorMessage);
129
    case AMQP_CHANNEL_CLOSE_METHOD:
130
      errorMessage.assign("Channel closed in ");
131
      errorMessage.append(context);
132
      throw rabbitmq_handler_exception(errorMessage);
133
    default:
134
      errorMessage.assign("Unknown error in ");
135
      errorMessage.append(context);
136
      throw rabbitmq_handler_exception(errorMessage);
137
    }
138
  }
139
}
1964.2.14 by Monty Taylor
RabbitMQ.
140
141
} /* namespace drizzle_plugin */