14
from collections import deque
15
from functools import partial
12
from amqplib import client_0_8 as amqp
14
from threading import local as thread_local
21
from amqplib import client_0_8 as amqp
23
from transaction._transaction import Status as TransactionStatus
24
17
from zope.interface import implements
26
19
from canonical.config import config
27
20
from lp.services.messaging.interfaces import (
37
26
LAUNCHPAD_EXCHANGE = "launchpad-exchange"
40
class RabbitSessionTransactionSync:
42
implements(transaction.interfaces.ISynchronizer)
44
def __init__(self, session):
45
self.session = session
47
def newTransaction(self, txn):
50
def beforeCompletion(self, txn):
53
def afterCompletion(self, txn):
54
if txn.status == TransactionStatus.COMMITTED:
61
"""Return True if rabbit looks to be configured."""
63
config.rabbitmq.host is None or
64
config.rabbitmq.userid is None or
65
config.rabbitmq.password is None or
66
config.rabbitmq.virtual_host is None)
70
"""Connect to AMQP if possible.
72
:raises MessagingUnavailable: If the configuration is incomplete.
74
if not is_configured():
75
raise MessagingUnavailable("Incomplete configuration")
76
return amqp.Connection(
77
host=config.rabbitmq.host, userid=config.rabbitmq.userid,
78
password=config.rabbitmq.password,
79
virtual_host=config.rabbitmq.virtual_host, insist=False)
82
class RabbitSession(threading.local):
84
implements(IMessageSession)
86
exchange = LAUNCHPAD_EXCHANGE
89
super(RabbitSession, self).__init__()
90
self._connection = None
91
self._deferred = deque()
92
# Maintain sessions according to transaction boundaries. Keep a strong
93
# reference to the sync because the transaction manager does not. We
94
# need one per thread (defining it here is enough to ensure that).
95
self._sync = RabbitSessionTransactionSync(self)
96
transaction.manager.registerSynch(self._sync)
99
def is_connected(self):
100
"""See `IMessageSession`."""
102
self._connection is not None and
103
self._connection.transport is not None)
106
"""See `IMessageSession`.
108
Open a connection for this thread if necessary. Connections cannot be
109
shared between threads.
111
if self._connection is None or self._connection.transport is None:
112
self._connection = connect()
113
return self._connection
115
def disconnect(self):
116
"""See `IMessageSession`."""
117
if self._connection is not None:
119
self._connection.close()
121
self._connection = None
124
"""See `IMessageSession`."""
125
tasks = self._deferred
126
while len(tasks) != 0:
130
"""See `IMessageSession`."""
137
"""See `IMessageSession`."""
138
self._deferred.clear()
141
def defer(self, func, *args, **kwargs):
142
"""See `IMessageSession`."""
143
self._deferred.append(partial(func, *args, **kwargs))
145
def getProducer(self, name):
146
"""See `IMessageSession`."""
147
return RabbitRoutingKey(self, name)
149
def getConsumer(self, name):
150
"""See `IMessageSession`."""
151
return RabbitQueue(self, name)
154
# Per-thread sessions.
155
session = RabbitSession()
156
session_finish_handler = (
157
lambda event: session.finish())
160
class RabbitUnreliableSession(RabbitSession):
161
"""An "unreliable" `RabbitSession`.
163
Unreliable in this case means that certain errors in deferred tasks are
164
silently suppressed. This means that services can continue to function
165
even in the absence of a running and fully functional message queue.
167
Other types of errors are also caught because we don't want this
168
subsystem to destabilise other parts of Launchpad but we nonetheless
169
record OOPses for these.
171
XXX: We only suppress MessagingUnavailable for now because we want to
172
monitor this closely before we add more exceptions to the
173
suppressed_errors list. Potential candidates are `MessagingException`,
174
`IOError` or `amqp.AMQPException`.
177
suppressed_errors = (
178
MessagingUnavailable,
182
"""See `IMessageSession`.
184
Suppresses errors listed in `suppressed_errors`. Also suppresses
185
other errors but files an oops report for these.
188
super(RabbitUnreliableSession, self).finish()
189
except self.suppressed_errors:
192
from canonical.launchpad.webapp import errorlog
193
errorlog.globalErrorUtility.raising(sys.exc_info())
196
# Per-thread "unreliable" sessions.
197
unreliable_session = RabbitUnreliableSession()
198
unreliable_session_finish_handler = (
199
lambda event: unreliable_session.finish())
29
class MessagingDataManager:
30
"""A Zope transaction data manager for Launchpad messaging.
32
This class implements the necessary code to send messages only when
33
the Zope transactions are committed. It will iterate over the messages
34
and send them using queue.send(oncommit=False).
36
def __init__(self, messages):
37
self.messages = messages
40
"""Completely remove the list of stored messages"""
46
def tpc_begin(self, txn):
49
def tpc_vote(self, txn):
52
def tpc_finish(self, txn):
55
def tpc_abort(self, txn):
59
"""Ensure that messages are sent after PostgreSQL connections
61
return "zz_messaging_%s" % id(self)
63
def commit(self, txn):
64
for send_func, data in self.messages:
202
69
class RabbitMessageBase:
203
70
"""Base class for all RabbitMQ messaging."""
205
def __init__(self, session):
206
self.session = IMessageSession(session)
211
if self._channel is None or not self._channel.is_open:
212
connection = self.session.connect()
213
self._channel = connection.channel()
214
self._channel.exchange_declare(
215
self.session.exchange, "direct", durable=False,
216
auto_delete=False, nowait=False)
72
class_locals = thread_local()
76
def _initialize(self):
77
# Open a connection and channel for this thread if necessary.
78
# Connections cannot be shared between threads.
79
if not hasattr(self.class_locals, "rabbit_connection"):
80
conn = amqp.Connection(
81
host=config.rabbitmq.host, userid=config.rabbitmq.userid,
82
password=config.rabbitmq.password,
83
virtual_host=config.rabbitmq.virtual_host, insist=False)
84
self.class_locals.rabbit_connection = conn
86
# Initialize storage for oncommit messages.
87
self.class_locals.messages = []
89
conn = self.class_locals.rabbit_connection
90
self.channel = conn.channel()
91
#self.channel.access_request(
92
# '/data', active=True, write=True, read=True)
93
self.channel.exchange_declare(
94
LAUNCHPAD_EXCHANGE, "direct", durable=False,
95
auto_delete=False, nowait=False)
98
# Note the connection is not closed - it is shared with other
99
# queues. Just close our channel.
103
def _disconnect(self):
104
"""Disconnect from rabbit. The connection is shared, so this will
105
break other RabbitQueue instances."""
107
if hasattr(self.class_locals, 'rabbit_connection'):
108
self.class_locals.rabbit_connection.close()
109
del self.class_locals.rabbit_connection
220
112
class RabbitRoutingKey(RabbitMessageBase):
223
115
implements(IMessageProducer)
225
def __init__(self, session, routing_key):
226
super(RabbitRoutingKey, self).__init__(session)
117
def __init__(self, routing_key):
227
118
self.key = routing_key
229
120
def associateConsumer(self, consumer):
230
121
"""Only receive messages for requested routing key."""
231
self.session.defer(self.associateConsumerNow, consumer)
233
def associateConsumerNow(self, consumer):
234
"""Only receive messages for requested routing key."""
235
# The queue will be auto-deleted 5 minutes after its last use.
236
# http://www.rabbitmq.com/extensions.html#queue-leases
237
self.channel.queue_declare(
238
consumer.name, nowait=False, auto_delete=False,
239
arguments={"x-expires": 300000}) # 5 minutes.
240
123
self.channel.queue_bind(
241
queue=consumer.name, exchange=self.session.exchange,
124
queue=consumer.name, exchange=LAUNCHPAD_EXCHANGE,
125
routing_key=self.key, nowait=False)
127
def disassociateConsumer(self, consumer):
128
"""Stop receiving messages for the requested routing key."""
130
self.channel.queue_unbind(
131
queue=consumer.name, exchange=LAUNCHPAD_EXCHANGE,
242
132
routing_key=self.key, nowait=False)
244
134
def send(self, data):
245
135
"""See `IMessageProducer`."""
246
self.session.defer(self.sendNow, data)
137
messages = self.class_locals.messages
138
# XXX: The data manager should close channels and flush too
140
transaction.get().join(MessagingDataManager(messages))
141
messages.append((self.send_now, data))
248
def sendNow(self, data):
143
def send_now(self, data):
249
144
"""Immediately send a message to the broker."""
250
146
json_data = json.dumps(data)
251
147
msg = amqp.Message(json_data)
252
148
self.channel.basic_publish(
253
exchange=self.session.exchange,
254
routing_key=self.key, msg=msg)
149
exchange=LAUNCHPAD_EXCHANGE, routing_key=self.key, msg=msg)
257
152
class RabbitQueue(RabbitMessageBase):
260
155
implements(IMessageConsumer)
262
def __init__(self, session, name):
263
super(RabbitQueue, self).__init__(session)
157
def __init__(self, name):
160
self.channel.queue_declare(self.name, nowait=False)
266
162
def receive(self, timeout=0.0):
267
163
"""Pull a message from the queue.
269
165
:param timeout: Wait a maximum of `timeout` seconds before giving up,
270
trying at least once.
271
:raises QueueEmpty: if the timeout passes.
166
trying at least once. If timeout is None, block forever.
167
:raises: EmptyQueueException if the timeout passes.
273
endtime = time.time() + timeout
169
starttime = time.time()
276
message = self.channel.basic_get(self.name)
278
if time.time() > endtime:
282
self.channel.basic_ack(message.delivery_tag)
283
return json.loads(message.body)
284
except amqp.AMQPChannelException, error:
285
if error.amqp_reply_code == 404:
286
raise QueueNotFound()
171
message = self.channel.basic_get(self.name)
173
if time.time() > (starttime + timeout):
174
raise EmptyQueueException
177
data = json.loads(message.body)
178
self.channel.basic_ack(message.delivery_tag)
181
# XXX The code below will be useful when we can implement this
185
result.append(json.loads(msg.body))
187
self.channel.basic_consume(self.name, callback=callback)