~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/rabbitmq/rabbitmq_log.cc

  • Committer: Lee Bieber
  • Date: 2011-03-29 22:31:41 UTC
  • mfrom: (2257.1.3 build)
  • Revision ID: kalebral@gmail.com-20110329223141-yxc22h3l2he58sk0
Merge Andrew - 743842: Build failure using GCC 4.6
Merge Stewart - 738022: CachedDirectory silently fails to add entries if stat() fails
Merge Olaf - Common fwd: add copyright, add more declaration

Show diffs side-by-side

added added

removed removed

Lines of Context:
50
50
 
51
51
 
52
52
RabbitMQLog::RabbitMQLog(const string &name, 
 
53
                         const std::string &exchange,
 
54
                         const std::string &routingkey,
53
55
                         RabbitMQHandler* mqHandler) :
54
56
  plugin::TransactionApplier(name),
55
 
  _rabbitMQHandler(mqHandler)
 
57
  _rabbitMQHandler(mqHandler),
 
58
  _exchange(exchange),
 
59
  _routingkey(routingkey)
56
60
{ }
57
61
 
58
62
RabbitMQLog::~RabbitMQLog() 
59
 
60
 
  _rabbitMQHandler->disconnect();
61
 
  delete _rabbitMQHandler;
62
 
}
 
63
{ }
63
64
 
64
65
plugin::ReplicationReturnCode
65
66
RabbitMQLog::apply(Session &, const message::Transaction &to_apply)
74
75
  }
75
76
 
76
77
  to_apply.SerializeWithCachedSizesToArray(buffer);
77
 
  short tries = 3;
78
 
  bool sent = false;
79
 
  while (!sent && tries > 0) {
80
 
    tries--;
81
 
    try
82
 
    {
83
 
      _rabbitMQHandler->publish(buffer, int(message_byte_length));
84
 
      sent = true;
85
 
    } 
86
 
    catch(exception& e)
87
 
    {
88
 
      errmsg_printf(error::ERROR, _(e.what()));
89
 
      try {
90
 
        _rabbitMQHandler->reconnect();
91
 
      } catch(exception &e) {
92
 
        errmsg_printf(error::ERROR, _("Could not reconnect, trying again.. - waiting 10 seconds for server to come back"));
93
 
        sleep(10);
94
 
      } // 
95
 
    }
96
 
  }
97
 
 
 
78
  try
 
79
  {
 
80
    _rabbitMQHandler->publish(buffer, 
 
81
                             int(message_byte_length), 
 
82
                             _exchange,
 
83
                             _routingkey);
 
84
  }
 
85
  catch(exception& e)
 
86
  {
 
87
    errmsg_printf(error::ERROR, _(e.what()));
 
88
    deactivate();
 
89
    return plugin::UNKNOWN_ERROR;
 
90
  }
98
91
  delete[] buffer;
99
 
  if(sent) return plugin::SUCCESS;
100
 
  errmsg_printf(error::ERROR, _("RabbitMQ server has disappeared, failing transaction."));
101
 
  deactivate();
102
 
  return plugin::UNKNOWN_ERROR;
 
92
  return plugin::SUCCESS;
103
93
}
104
94
 
105
95
static RabbitMQLog *rabbitmqLogger; ///< the actual plugin
106
96
static RabbitMQHandler* rabbitmqHandler; ///< the rabbitmq handler
107
97
 
108
 
 
109
98
/**
110
99
 * Initialize the rabbitmq logger - instantiates the dependencies (the handler)
111
100
 * and creates the log handler with the dependency - makes it easier to swap out
121
110
                                         sysvar_rabbitmq_port, 
122
111
                                         vm["username"].as<string>(), 
123
112
                                         vm["password"].as<string>(), 
124
 
                                         vm["virtualhost"].as<string>(),
125
 
                                         vm["exchange"].as<string>(),
126
 
                                         vm["routingkey"].as<string>());
 
113
                                         vm["virtualhost"].as<string>());
127
114
  } 
128
115
  catch (exception& e) 
129
116
  {
134
121
  try 
135
122
  {
136
123
    rabbitmqLogger= new RabbitMQLog("rabbit_log_applier",
 
124
                                    vm["exchange"].as<string>(),
 
125
                                    vm["routingkey"].as<string>(),
137
126
                                    rabbitmqHandler);
138
127
  } 
139
128
  catch (exception& e)