8
from functools import partial
9
from itertools import count
12
from amqplib import client_0_8 as amqp
14
from transaction._transaction import Status as TransactionStatus
15
from zope.component import getUtility
10
from canonical.testing.layers import RabbitMQLayer
17
from canonical.testing.layers import (
18
LaunchpadFunctionalLayer,
11
21
from lp.services.messaging.interfaces import (
12
22
EmptyQueueException,
16
from lp.services.messaging.queue import (
27
from lp.services.messaging.rabbit import (
32
RabbitSessionTransactionSync,
33
RabbitUnreliableSession,
34
session as global_session,
35
unreliable_session as global_unreliable_session,
20
37
from lp.testing import TestCase
23
class TestRabbitQueue(TestCase):
38
from lp.testing.faketransaction import FakeTransaction
39
from lp.testing.matchers import Provides
41
# RabbitMQ is not (yet) torn down or reset between tests, so here are sources
43
queue_names = ("queue.%d" % num for num in count(1))
44
key_names = ("key.%d" % num for num in count(1))
47
class FakeRabbitSession:
53
self.log.append("finish")
56
self.log.append("reset")
59
class TestRabbitSessionTransactionSync(TestCase):
61
def test_interface(self):
63
RabbitSessionTransactionSync(None),
64
Provides(transaction.interfaces.ISynchronizer))
66
def test_afterCompletion_COMMITTED(self):
67
txn = FakeTransaction()
68
txn.status = TransactionStatus.COMMITTED
69
fake_session = FakeRabbitSession()
70
sync = RabbitSessionTransactionSync(fake_session)
71
sync.afterCompletion(txn)
72
self.assertEqual(["finish"], fake_session.log)
74
def test_afterCompletion_ACTIVE(self):
75
txn = FakeTransaction()
76
txn.status = TransactionStatus.ACTIVE
77
fake_session = FakeRabbitSession()
78
sync = RabbitSessionTransactionSync(fake_session)
79
sync.afterCompletion(txn)
80
self.assertEqual(["reset"], fake_session.log)
83
class RabbitTestCase(TestCase):
24
85
layer = RabbitMQLayer
27
super(TestCase, self).setUp()
28
self.queue_name = 'whatever'
29
self.queue = RabbitQueue(self.queue_name)
30
self.key_name = "arbitrary.routing.key"
31
self.key = RabbitRoutingKey(self.key_name)
32
self.key.associateConsumer(self.queue)
34
87
def tearDown(self):
35
self.queue._disconnect()
36
super(TestCase, self).tearDown()
38
def test_implements(self):
39
self.assertTrue(IMessageConsumer.providedBy(self.queue))
40
self.assertTrue(IMessageProducer.providedBy(self.key))
42
def test_send_now(self):
88
super(RabbitTestCase, self).tearDown()
89
global_session.reset()
90
global_unreliable_session.reset()
93
class TestRabbitSession(RabbitTestCase):
95
def test_interface(self):
96
session = RabbitSession()
97
self.assertThat(session, Provides(IMessageSession))
99
def test_connect(self):
100
session = RabbitSession()
101
self.assertIs(None, session.connection)
102
connection = session.connect()
103
self.assertIsNot(None, session.connection)
104
self.assertIs(connection, session.connection)
106
def test_disconnect(self):
107
session = RabbitSession()
110
self.assertIs(None, session.connection)
112
def test_connection(self):
113
# The connection property is None once a connection has been closed.
114
session = RabbitSession()
116
# Close the connection without using disconnect().
117
session.connection.close()
118
self.assertIs(None, session.connection)
120
def test_defer(self):
121
task = lambda foo, bar: None
122
session = RabbitSession()
123
session.defer(task, "foo", bar="baz")
124
self.assertEqual(1, len(session._deferred))
125
[deferred_task] = session._deferred
126
self.assertIsInstance(deferred_task, partial)
127
self.assertIs(task, deferred_task.func)
128
self.assertEqual(("foo",), deferred_task.args)
129
self.assertEqual({"bar": "baz"}, deferred_task.keywords)
131
def test_flush(self):
132
# RabbitSession.flush() runs deferred tasks.
134
task = lambda: log.append("task")
135
session = RabbitSession()
139
self.assertEqual(["task"], log)
140
self.assertEqual([], list(session._deferred))
141
self.assertIsNot(None, session.connection)
143
def test_reset(self):
144
# RabbitSession.reset() resets session variables and does not run
147
task = lambda: log.append("task")
148
session = RabbitSession()
152
self.assertEqual([], log)
153
self.assertEqual([], list(session._deferred))
154
self.assertIs(None, session.connection)
156
def test_finish(self):
157
# RabbitSession.finish() resets session variables after running
160
task = lambda: log.append("task")
161
session = RabbitSession()
165
self.assertEqual(["task"], log)
166
self.assertEqual([], list(session._deferred))
167
self.assertIs(None, session.connection)
169
def test_getProducer(self):
170
session = RabbitSession()
171
producer = session.getProducer("foo")
172
self.assertIsInstance(producer, RabbitRoutingKey)
173
self.assertIs(session, producer.session)
174
self.assertEqual("foo", producer.key)
176
def test_getConsumer(self):
177
session = RabbitSession()
178
consumer = session.getConsumer("foo")
179
self.assertIsInstance(consumer, RabbitQueue)
180
self.assertIs(session, consumer.session)
181
self.assertEqual("foo", consumer.name)
184
class TestRabbitUnreliableSession(RabbitTestCase):
186
def raise_AMQPException(self):
187
raise amqp.AMQPException(123, "Suffin broke.", "Whut?")
189
def test_finish_suppresses_some_errors(self):
190
session = RabbitUnreliableSession()
191
session.defer(self.raise_AMQPException)
193
# Look, no exceptions!
195
def raise_Exception(self):
196
raise Exception("That hent worked.")
198
def test_finish_does_not_suppress_other_errors(self):
199
session = RabbitUnreliableSession()
200
session.defer(self.raise_Exception)
201
self.assertRaises(Exception, session.finish)
204
class TestRabbitMessageBase(RabbitTestCase):
206
def test_session(self):
207
base = RabbitMessageBase(global_session)
208
self.assertIs(global_session, base.session)
210
def test_channel(self):
211
# Referencing the channel property causes the session to connect.
212
base = RabbitMessageBase(global_session)
213
self.assertIs(None, base.session.connection)
214
channel = base.channel
215
self.assertIsNot(None, base.session.connection)
216
self.assertIsNot(None, channel)
217
# The same channel is returned every time.
218
self.assertIs(channel, base.channel)
220
def test_channel_session_closed(self):
221
# When the session is disconnected the channel is thrown away too.
222
base = RabbitMessageBase(global_session)
223
channel1 = base.channel
224
base.session.disconnect()
225
channel2 = base.channel
226
self.assertNotEqual(channel1, channel2)
229
class TestRabbitRoutingKey(RabbitTestCase):
231
def test_interface(self):
232
routing_key = RabbitRoutingKey(global_session, next(key_names))
233
self.assertThat(routing_key, Provides(IMessageProducer))
235
def test_associateConsumer(self):
236
# associateConsumer() only associates the consumer at transaction
237
# commit time. However, order is preserved.
238
consumer = RabbitQueue(global_session, next(queue_names))
239
routing_key = RabbitRoutingKey(global_session, next(key_names))
240
routing_key.associateConsumer(consumer)
241
routing_key.sendNow('now')
242
routing_key.send('later')
243
# There is nothing in the queue because the consumer has not yet been
244
# associated with the routing key.
245
self.assertRaises(EmptyQueueException, consumer.receive, timeout=2)
247
# Now that the transaction has been committed, the consumer is
248
# associated, and receives the deferred message.
249
self.assertEqual('later', consumer.receive(timeout=2))
251
def test_associateConsumerNow(self):
252
# associateConsumerNow() associates the consumer right away.
253
consumer = RabbitQueue(global_session, next(queue_names))
254
routing_key = RabbitRoutingKey(global_session, next(key_names))
255
routing_key.associateConsumerNow(consumer)
256
routing_key.sendNow('now')
257
routing_key.send('later')
258
# There is already something in the queue.
259
self.assertEqual('now', consumer.receive(timeout=2))
261
# Now that the transaction has been committed there is another item in
263
self.assertEqual('later', consumer.receive(timeout=2))
266
consumer = RabbitQueue(global_session, next(queue_names))
267
routing_key = RabbitRoutingKey(global_session, next(key_names))
268
routing_key.associateConsumerNow(consumer)
270
for data in range(90, 100):
271
routing_key.send(data)
273
routing_key.sendNow('sync')
274
# There is nothing in the queue except the sync we just sent.
275
self.assertEqual('sync', consumer.receive(timeout=2))
277
# Messages get sent on commit
279
for data in range(90, 100):
280
self.assertEqual(data, consumer.receive())
282
# There are no more messages. They have all been consumed.
283
routing_key.sendNow('sync')
284
self.assertEqual('sync', consumer.receive(timeout=2))
286
def test_sendNow(self):
287
consumer = RabbitQueue(global_session, next(queue_names))
288
routing_key = RabbitRoutingKey(global_session, next(key_names))
289
routing_key.associateConsumerNow(consumer)
43
291
for data in range(50, 60):
44
self.key.send_now(data)
45
received_data = self.queue.receive(timeout=5)
46
self.assertEqual(received_data, data)
48
def test_receive_consumes(self):
292
routing_key.sendNow(data)
293
received_data = consumer.receive(timeout=2)
294
self.assertEqual(data, received_data)
297
class TestRabbitQueue(RabbitTestCase):
299
def test_interface(self):
300
consumer = RabbitQueue(global_session, next(queue_names))
301
self.assertThat(consumer, Provides(IMessageConsumer))
303
def test_receive(self):
304
consumer = RabbitQueue(global_session, next(queue_names))
305
routing_key = RabbitRoutingKey(global_session, next(key_names))
306
routing_key.associateConsumerNow(consumer)
49
308
for data in range(55, 65):
50
self.key.send_now(data)
51
self.assertEqual(self.queue.receive(timeout=5), data)
309
routing_key.sendNow(data)
310
self.assertEqual(data, consumer.receive(timeout=2))
53
# None of the messages we received were put back. They were all
312
# All the messages received were consumed.
55
313
self.assertRaises(
56
314
EmptyQueueException,
57
self.queue.receive, timeout=5)
315
consumer.receive, timeout=2)
59
317
# New connections to the queue see an empty queue too.
60
self.queue._disconnect()
61
key = RabbitRoutingKey(self.key_name)
62
queue = RabbitQueue(self.queue_name)
63
key.associateConsumer(queue)
64
key.send_now('new conn sync')
65
self.assertEqual(queue.receive(timeout=5), 'new conn sync')
68
for data in range(90, 100):
71
self.key.send_now('sync')
72
# There is nothing in the queue except the sync we just sent.
73
self.assertEqual(self.queue.receive(timeout=5), 'sync')
75
# Messages get sent on commit
77
for data in range(90, 100):
78
self.assertEqual(self.queue.receive(), data)
80
# There are no more messages. They have all been consumed.
81
self.key.send_now('sync')
82
self.assertEqual(self.queue.receive(timeout=5), 'sync')
318
consumer.session.disconnect()
319
consumer = RabbitQueue(global_session, next(queue_names))
320
routing_key = RabbitRoutingKey(global_session, next(key_names))
321
routing_key.associateConsumerNow(consumer)
324
consumer.receive, timeout=2)
327
class TestRabbit(RabbitTestCase):
328
"""Integration-like tests for the RabbitMQ messaging abstractions."""
330
def get_synced_sessions(self):
331
thread_id = thread.get_ident()
333
syncs_set = transaction.manager._synchs[thread_id]
338
sync.session for sync in syncs_set.data.itervalues()
339
if isinstance(sync, RabbitSessionTransactionSync))
341
def test_global_session(self):
342
self.assertIsInstance(global_session, RabbitSession)
343
self.assertIn(global_session, self.get_synced_sessions())
345
def test_global_unreliable_session(self):
346
self.assertIsInstance(
347
global_unreliable_session, RabbitUnreliableSession)
348
self.assertIn(global_unreliable_session, self.get_synced_sessions())
84
350
def test_abort(self):
351
consumer = RabbitQueue(global_session, next(queue_names))
352
routing_key = RabbitRoutingKey(global_session, next(key_names))
353
routing_key.associateConsumerNow(consumer)
85
355
for data in range(90, 100):
88
self.key.send_now('sync')
89
# There is nothing in the queue except the sync we just sent.
90
self.assertEqual(self.queue.receive(timeout=5), 'sync')
92
# Messages get forgotten on abort.
356
routing_key.send(data)
358
# Messages sent using send() are forgotten on abort.
93
359
transaction.abort()
95
# There are no more messages. They have all been consumed.
96
self.key.send_now('sync2')
97
self.assertEqual(self.queue.receive(timeout=5), 'sync2')
362
consumer.receive, timeout=2)
365
class TestRabbitWithLaunchpad(RabbitTestCase):
366
"""Integration-like tests for the RabbitMQ messaging abstractions."""
368
layer = LaunchpadFunctionalLayer
370
def test_utility(self):
371
# The unreliable session is registered as the default IMessageSession
374
global_unreliable_session,
375
getUtility(IMessageSession))