~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/rabbitmq/rabbitmq_handler.cc

  • Committer: Brian Aker
  • Date: 2009-01-24 09:43:35 UTC
  • Revision ID: brian@gir-3.local-20090124094335-6qdtvc35gl5fvivz
Adding in an example single thread scheduler

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 *  Copyright (C) 2010 Marcus Eriksson
5
 
 *
6
 
 *  Authors:
7
 
 *
8
 
 *  Marcus Eriksson <krummas@gmail.com>
9
 
 *
10
 
 *  This program is free software; you can redistribute it and/or modify
11
 
 *  it under the terms of the GNU General Public License as published by
12
 
 *  the Free Software Foundation; either version 2 of the License, or
13
 
 *  (at your option) any later version.
14
 
 *
15
 
 *  This program is distributed in the hope that it will be useful,
16
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18
 
 *  GNU General Public License for more details.
19
 
 *
20
 
 *  You should have received a copy of the GNU General Public License
21
 
 *  along with this program; if not, write to the Free Software
22
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
23
 
 */
24
 
 
25
 
#include "config.h"
26
 
#include "rabbitmq_handler.h"
27
 
 
28
 
using namespace std;
29
 
 
30
 
RabbitMQHandler::RabbitMQHandler(const char* rabbitMQHost, 
31
 
                                 const int rabbitMQPort, 
32
 
                                 const char* rabbitMQUsername, 
33
 
                                 const char* rabbitMQPassword, 
34
 
                                 const char* rabbitMQVirtualhost) 
35
 
  throw(rabbitmq_handler_exception):
36
 
  hostname(rabbitMQHost),
37
 
  port(rabbitMQPort),
38
 
  username(rabbitMQUsername),
39
 
  password(rabbitMQPassword),
40
 
  virtualhost(rabbitMQVirtualhost)
41
 
{
42
 
  rabbitmqConnection = amqp_new_connection();
43
 
  /* open the socket to the rabbitmq server */
44
 
  sockfd = amqp_open_socket(hostname, port);
45
 
  if(sockfd < 0) 
46
 
  {
47
 
    throw rabbitmq_handler_exception("Could not open socket, is rabbitmq running?");
48
 
  }
49
 
  amqp_set_sockfd(rabbitmqConnection, sockfd);
50
 
  /* login to rabbitmq, handleAMQPError throws exception if there is a problem */
51
 
  handleAMQPError(amqp_login(rabbitmqConnection, 
52
 
                             virtualhost, 
53
 
                             0, 
54
 
                             131072, 
55
 
                             0, 
56
 
                             AMQP_SASL_METHOD_PLAIN, 
57
 
                             username, 
58
 
                             password), 
59
 
                  "rabbitmq login");
60
 
  /* open the channel */
61
 
  amqp_channel_open(rabbitmqConnection, 1);
62
 
}
63
 
 
64
 
RabbitMQHandler::~RabbitMQHandler()
65
 
{
66
 
  try
67
 
  {
68
 
    handleAMQPError(amqp_channel_close(rabbitmqConnection, 
69
 
                                       1, 
70
 
                                       AMQP_REPLY_SUCCESS),
71
 
                    "close channel");
72
 
    handleAMQPError(amqp_connection_close(rabbitmqConnection, 
73
 
                                          AMQP_REPLY_SUCCESS),
74
 
                    "close connection");
75
 
    amqp_destroy_connection(rabbitmqConnection);
76
 
  }
77
 
  catch(exception& e) {} // do not throw in destructorn 
78
 
  
79
 
  close(sockfd);
80
 
}
81
 
 
82
 
void RabbitMQHandler::publish(const uint8_t *message, 
83
 
                              const int length, 
84
 
                              const char* exchangeName, 
85
 
                              const char* routingKey) throw(rabbitmq_handler_exception)
86
 
{
87
 
  amqp_bytes_t b;
88
 
  b.bytes= (void*)message;
89
 
  b.len= length;
90
 
  
91
 
  if(amqp_basic_publish(rabbitmqConnection,
92
 
                        1,
93
 
                        amqp_cstring_bytes(exchangeName),
94
 
                        amqp_cstring_bytes(routingKey),
95
 
                        0,
96
 
                        0,
97
 
                        NULL,
98
 
                        b) < 0)
99
 
  {
100
 
    throw rabbitmq_handler_exception("Could not publish message");
101
 
  }
102
 
 
103
 
}
104
 
 
105
 
void RabbitMQHandler::handleAMQPError(amqp_rpc_reply_t x, string context) throw(rabbitmq_handler_exception)
106
 
{
107
 
  string errorMessage("");
108
 
  switch (x.reply_type) {
109
 
  case AMQP_RESPONSE_NORMAL:
110
 
    break;
111
 
  case AMQP_RESPONSE_NONE:
112
 
    errorMessage.assign("No response in ");
113
 
    errorMessage.append(context);
114
 
    throw rabbitmq_handler_exception(errorMessage);
115
 
  case AMQP_RESPONSE_LIBRARY_EXCEPTION:
116
 
  case AMQP_RESPONSE_SERVER_EXCEPTION:
117
 
    switch (x.reply.id) {      
118
 
    case AMQP_CONNECTION_CLOSE_METHOD:
119
 
      errorMessage.assign("Connection closed in ");
120
 
      errorMessage.append(context);
121
 
      throw rabbitmq_handler_exception(errorMessage);
122
 
    case AMQP_CHANNEL_CLOSE_METHOD:
123
 
      errorMessage.assign("Channel closed in ");
124
 
      errorMessage.append(context);
125
 
      throw rabbitmq_handler_exception(errorMessage);
126
 
    default:
127
 
      errorMessage.assign("Unknown error in ");
128
 
      errorMessage.append(context);
129
 
      throw rabbitmq_handler_exception(errorMessage);
130
 
    }
131
 
  }
132
 
}