~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/rabbitmq/rabbitmq_handler.cc

  • Committer: Padraig O'Sullivan
  • Date: 2009-07-22 23:26:26 UTC
  • mto: (1039.5.43 replication)
  • mto: This revision was merged to the branch mainline in revision 1130.
  • Revision ID: osullivan.padraig@gmail.com-20090722232626-mu4khq7ho6dqcf7q
Created a simple filtered replicator that can filter by schema name or table
name.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2010 Marcus Eriksson
5
 
 *
6
 
 *  Authors:
7
 
 *
8
 
 *  Marcus Eriksson <krummas@gmail.com>
9
 
 *
10
 
 *  This program is free software; you can redistribute it and/or modify
11
 
 *  it under the terms of the GNU General Public License as published by
12
 
 *  the Free Software Foundation; either version 2 of the License, or
13
 
 *  (at your option) any later version.
14
 
 *
15
 
 *  This program is distributed in the hope that it will be useful,
16
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
 
 *  GNU General Public License for more details.
19
 
 *
20
 
 *  You should have received a copy of the GNU General Public License
21
 
 *  along with this program; if not, write to the Free Software
22
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
23
 
 */
24
 
 
25
 
#include "config.h"
26
 
 
27
 
#include <drizzled/gettext.h>
28
 
 
29
 
#include "rabbitmq_handler.h"
30
 
 
31
 
using namespace std;
32
 
 
33
 
namespace drizzle_plugin
34
 
{
35
 
 
36
 
RabbitMQHandler::RabbitMQHandler(const std::string &rabbitMQHost, 
37
 
                                 const in_port_t rabbitMQPort, 
38
 
                                 const std::string &rabbitMQUsername, 
39
 
                                 const std::string &rabbitMQPassword, 
40
 
                                 const std::string &rabbitMQVirtualhost) 
41
 
  throw(rabbitmq_handler_exception) :
42
 
    rabbitmqConnection(amqp_new_connection()),
43
 
    sockfd(amqp_open_socket(rabbitMQHost.c_str(), rabbitMQPort)),
44
 
    hostname(rabbitMQHost),
45
 
    port(rabbitMQPort),
46
 
    username(rabbitMQUsername),
47
 
    password(rabbitMQPassword),
48
 
    virtualhost(rabbitMQVirtualhost)
49
 
{
50
 
  /* open the socket to the rabbitmq server */
51
 
  if(sockfd < 0) 
52
 
  {
53
 
    throw rabbitmq_handler_exception(_("Could not open socket, is rabbitmq running?"));
54
 
  }
55
 
  amqp_set_sockfd(rabbitmqConnection, sockfd);
56
 
  /* login to rabbitmq, handleAMQPError throws exception if there is a problem */
57
 
  handleAMQPError(amqp_login(rabbitmqConnection, 
58
 
                             virtualhost.c_str(), 
59
 
                             0, 
60
 
                             131072, 
61
 
                             0, 
62
 
                             AMQP_SASL_METHOD_PLAIN, 
63
 
                             username.c_str(), 
64
 
                             password.c_str()), 
65
 
                  "rabbitmq login");
66
 
  /* open the channel */
67
 
  amqp_channel_open(rabbitmqConnection, 1);
68
 
}
69
 
 
70
 
RabbitMQHandler::~RabbitMQHandler()
71
 
{
72
 
  try
73
 
  {
74
 
    handleAMQPError(amqp_channel_close(rabbitmqConnection, 
75
 
                                       1, 
76
 
                                       AMQP_REPLY_SUCCESS),
77
 
                    "close channel");
78
 
    handleAMQPError(amqp_connection_close(rabbitmqConnection, 
79
 
                                          AMQP_REPLY_SUCCESS),
80
 
                    "close connection");
81
 
    amqp_destroy_connection(rabbitmqConnection);
82
 
  }
83
 
  catch(exception& e) {} // do not throw in destructorn 
84
 
  
85
 
  close(sockfd);
86
 
}
87
 
 
88
 
void RabbitMQHandler::publish(void *message, 
89
 
                              const int length, 
90
 
                              const std::string &exchangeName, 
91
 
                              const std::string &routingKey)
92
 
throw(rabbitmq_handler_exception)
93
 
{
94
 
  amqp_bytes_t b;
95
 
  b.bytes= message;
96
 
  b.len= length;
97
 
  
98
 
  if (amqp_basic_publish(rabbitmqConnection,
99
 
                         1,
100
 
                         amqp_cstring_bytes(exchangeName.c_str()),
101
 
                         amqp_cstring_bytes(routingKey.c_str()),
102
 
                         0,
103
 
                         0,
104
 
                         NULL,
105
 
                         b) < 0)
106
 
  {
107
 
    throw rabbitmq_handler_exception("Could not publish message");
108
 
  }
109
 
 
110
 
}
111
 
 
112
 
void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)
113
 
{
114
 
  string errorMessage("");
115
 
  switch (x.reply_type) {
116
 
  case AMQP_RESPONSE_NORMAL:
117
 
    break;
118
 
  case AMQP_RESPONSE_NONE:
119
 
    errorMessage.assign("No response in ");
120
 
    errorMessage.append(context);
121
 
    throw rabbitmq_handler_exception(errorMessage);
122
 
  case AMQP_RESPONSE_LIBRARY_EXCEPTION:
123
 
  case AMQP_RESPONSE_SERVER_EXCEPTION:
124
 
    switch (x.reply.id) {      
125
 
    case AMQP_CONNECTION_CLOSE_METHOD:
126
 
      errorMessage.assign("Connection closed in ");
127
 
      errorMessage.append(context);
128
 
      throw rabbitmq_handler_exception(errorMessage);
129
 
    case AMQP_CHANNEL_CLOSE_METHOD:
130
 
      errorMessage.assign("Channel closed in ");
131
 
      errorMessage.append(context);
132
 
      throw rabbitmq_handler_exception(errorMessage);
133
 
    default:
134
 
      errorMessage.assign("Unknown error in ");
135
 
      errorMessage.append(context);
136
 
      throw rabbitmq_handler_exception(errorMessage);
137
 
    }
138
 
  }
139
 
}
140
 
 
141
 
} /* namespace drizzle_plugin */