~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/services/messaging/tests/test_rabbit.py

  • Committer: Launchpad Patch Queue Manager
  • Date: 2011-09-24 09:11:33 UTC
  • mfrom: (14002.1.27 longpoll-dead-rabbit)
  • Revision ID: launchpad@pqm.canonical.com-20110924091133-9ckf4ojh50fs57si
[r=sinzui][no-qa] Make it possible for code that uses the message queue to
 continue to work even if the queue is unavailable.

Show diffs side-by-side

added added

removed removed

Lines of Context:
5
5
 
6
6
__metaclass__ = type
7
7
 
 
8
from functools import partial
 
9
from itertools import count
 
10
import thread
 
11
 
 
12
from amqplib import client_0_8 as amqp
8
13
import transaction
 
14
from transaction._transaction import Status as TransactionStatus
 
15
from zope.component import getUtility
9
16
 
10
 
from canonical.testing.layers import RabbitMQLayer
 
17
from canonical.testing.layers import (
 
18
    LaunchpadFunctionalLayer,
 
19
    RabbitMQLayer,
 
20
    )
11
21
from lp.services.messaging.interfaces import (
12
22
    EmptyQueueException,
13
23
    IMessageConsumer,
14
24
    IMessageProducer,
 
25
    IMessageSession,
15
26
    )
16
 
from lp.services.messaging.queue import (
 
27
from lp.services.messaging.rabbit import (
 
28
    RabbitMessageBase,
17
29
    RabbitQueue,
18
30
    RabbitRoutingKey,
 
31
    RabbitSession,
 
32
    RabbitSessionTransactionSync,
 
33
    RabbitUnreliableSession,
 
34
    session as global_session,
 
35
    unreliable_session as global_unreliable_session,
19
36
    )
20
37
from lp.testing import TestCase
21
 
 
22
 
 
23
 
class TestRabbitQueue(TestCase):
 
38
from lp.testing.faketransaction import FakeTransaction
 
39
from lp.testing.matchers import Provides
 
40
 
 
41
# RabbitMQ is not (yet) torn down or reset between tests, so here are sources
 
42
# of distinct names.
 
43
queue_names = ("queue.%d" % num for num in count(1))
 
44
key_names = ("key.%d" % num for num in count(1))
 
45
 
 
46
 
 
47
class FakeRabbitSession:
 
48
 
 
49
    def __init__(self):
 
50
        self.log = []
 
51
 
 
52
    def finish(self):
 
53
        self.log.append("finish")
 
54
 
 
55
    def reset(self):
 
56
        self.log.append("reset")
 
57
 
 
58
 
 
59
class TestRabbitSessionTransactionSync(TestCase):
 
60
 
 
61
    def test_interface(self):
 
62
        self.assertThat(
 
63
            RabbitSessionTransactionSync(None),
 
64
            Provides(transaction.interfaces.ISynchronizer))
 
65
 
 
66
    def test_afterCompletion_COMMITTED(self):
 
67
        txn = FakeTransaction()
 
68
        txn.status = TransactionStatus.COMMITTED
 
69
        fake_session = FakeRabbitSession()
 
70
        sync = RabbitSessionTransactionSync(fake_session)
 
71
        sync.afterCompletion(txn)
 
72
        self.assertEqual(["finish"], fake_session.log)
 
73
 
 
74
    def test_afterCompletion_ACTIVE(self):
 
75
        txn = FakeTransaction()
 
76
        txn.status = TransactionStatus.ACTIVE
 
77
        fake_session = FakeRabbitSession()
 
78
        sync = RabbitSessionTransactionSync(fake_session)
 
79
        sync.afterCompletion(txn)
 
80
        self.assertEqual(["reset"], fake_session.log)
 
81
 
 
82
 
 
83
class RabbitTestCase(TestCase):
 
84
 
24
85
    layer = RabbitMQLayer
25
86
 
26
 
    def setUp(self):
27
 
        super(TestCase, self).setUp()
28
 
        self.queue_name = 'whatever'
29
 
        self.queue = RabbitQueue(self.queue_name)
30
 
        self.key_name = "arbitrary.routing.key"
31
 
        self.key = RabbitRoutingKey(self.key_name)
32
 
        self.key.associateConsumer(self.queue)
33
 
 
34
87
    def tearDown(self):
35
 
        self.queue._disconnect()
36
 
        super(TestCase, self).tearDown()
37
 
 
38
 
    def test_implements(self):
39
 
        self.assertTrue(IMessageConsumer.providedBy(self.queue))
40
 
        self.assertTrue(IMessageProducer.providedBy(self.key))
41
 
 
42
 
    def test_send_now(self):
 
88
        super(RabbitTestCase, self).tearDown()
 
89
        global_session.reset()
 
90
        global_unreliable_session.reset()
 
91
 
 
92
 
 
93
class TestRabbitSession(RabbitTestCase):
 
94
 
 
95
    def test_interface(self):
 
96
        session = RabbitSession()
 
97
        self.assertThat(session, Provides(IMessageSession))
 
98
 
 
99
    def test_connect(self):
 
100
        session = RabbitSession()
 
101
        self.assertIs(None, session.connection)
 
102
        connection = session.connect()
 
103
        self.assertIsNot(None, session.connection)
 
104
        self.assertIs(connection, session.connection)
 
105
 
 
106
    def test_disconnect(self):
 
107
        session = RabbitSession()
 
108
        session.connect()
 
109
        session.disconnect()
 
110
        self.assertIs(None, session.connection)
 
111
 
 
112
    def test_connection(self):
 
113
        # The connection property is None once a connection has been closed.
 
114
        session = RabbitSession()
 
115
        session.connect()
 
116
        # Close the connection without using disconnect().
 
117
        session.connection.close()
 
118
        self.assertIs(None, session.connection)
 
119
 
 
120
    def test_defer(self):
 
121
        task = lambda foo, bar: None
 
122
        session = RabbitSession()
 
123
        session.defer(task, "foo", bar="baz")
 
124
        self.assertEqual(1, len(session._deferred))
 
125
        [deferred_task] = session._deferred
 
126
        self.assertIsInstance(deferred_task, partial)
 
127
        self.assertIs(task, deferred_task.func)
 
128
        self.assertEqual(("foo",), deferred_task.args)
 
129
        self.assertEqual({"bar": "baz"}, deferred_task.keywords)
 
130
 
 
131
    def test_flush(self):
 
132
        # RabbitSession.flush() runs deferred tasks.
 
133
        log = []
 
134
        task = lambda: log.append("task")
 
135
        session = RabbitSession()
 
136
        session.defer(task)
 
137
        session.connect()
 
138
        session.flush()
 
139
        self.assertEqual(["task"], log)
 
140
        self.assertEqual([], list(session._deferred))
 
141
        self.assertIsNot(None, session.connection)
 
142
 
 
143
    def test_reset(self):
 
144
        # RabbitSession.reset() resets session variables and does not run
 
145
        # deferred tasks.
 
146
        log = []
 
147
        task = lambda: log.append("task")
 
148
        session = RabbitSession()
 
149
        session.defer(task)
 
150
        session.connect()
 
151
        session.reset()
 
152
        self.assertEqual([], log)
 
153
        self.assertEqual([], list(session._deferred))
 
154
        self.assertIs(None, session.connection)
 
155
 
 
156
    def test_finish(self):
 
157
        # RabbitSession.finish() resets session variables after running
 
158
        # deferred tasks.
 
159
        log = []
 
160
        task = lambda: log.append("task")
 
161
        session = RabbitSession()
 
162
        session.defer(task)
 
163
        session.connect()
 
164
        session.finish()
 
165
        self.assertEqual(["task"], log)
 
166
        self.assertEqual([], list(session._deferred))
 
167
        self.assertIs(None, session.connection)
 
168
 
 
169
    def test_getProducer(self):
 
170
        session = RabbitSession()
 
171
        producer = session.getProducer("foo")
 
172
        self.assertIsInstance(producer, RabbitRoutingKey)
 
173
        self.assertIs(session, producer.session)
 
174
        self.assertEqual("foo", producer.key)
 
175
 
 
176
    def test_getConsumer(self):
 
177
        session = RabbitSession()
 
178
        consumer = session.getConsumer("foo")
 
179
        self.assertIsInstance(consumer, RabbitQueue)
 
180
        self.assertIs(session, consumer.session)
 
181
        self.assertEqual("foo", consumer.name)
 
182
 
 
183
 
 
184
class TestRabbitUnreliableSession(RabbitTestCase):
 
185
 
 
186
    def raise_AMQPException(self):
 
187
        raise amqp.AMQPException(123, "Suffin broke.", "Whut?")
 
188
 
 
189
    def test_finish_suppresses_some_errors(self):
 
190
        session = RabbitUnreliableSession()
 
191
        session.defer(self.raise_AMQPException)
 
192
        session.finish()
 
193
        # Look, no exceptions!
 
194
 
 
195
    def raise_Exception(self):
 
196
        raise Exception("That hent worked.")
 
197
 
 
198
    def test_finish_does_not_suppress_other_errors(self):
 
199
        session = RabbitUnreliableSession()
 
200
        session.defer(self.raise_Exception)
 
201
        self.assertRaises(Exception, session.finish)
 
202
 
 
203
 
 
204
class TestRabbitMessageBase(RabbitTestCase):
 
205
 
 
206
    def test_session(self):
 
207
        base = RabbitMessageBase(global_session)
 
208
        self.assertIs(global_session, base.session)
 
209
 
 
210
    def test_channel(self):
 
211
        # Referencing the channel property causes the session to connect.
 
212
        base = RabbitMessageBase(global_session)
 
213
        self.assertIs(None, base.session.connection)
 
214
        channel = base.channel
 
215
        self.assertIsNot(None, base.session.connection)
 
216
        self.assertIsNot(None, channel)
 
217
        # The same channel is returned every time.
 
218
        self.assertIs(channel, base.channel)
 
219
 
 
220
    def test_channel_session_closed(self):
 
221
        # When the session is disconnected the channel is thrown away too.
 
222
        base = RabbitMessageBase(global_session)
 
223
        channel1 = base.channel
 
224
        base.session.disconnect()
 
225
        channel2 = base.channel
 
226
        self.assertNotEqual(channel1, channel2)
 
227
 
 
228
 
 
229
class TestRabbitRoutingKey(RabbitTestCase):
 
230
 
 
231
    def test_interface(self):
 
232
        routing_key = RabbitRoutingKey(global_session, next(key_names))
 
233
        self.assertThat(routing_key, Provides(IMessageProducer))
 
234
 
 
235
    def test_associateConsumer(self):
 
236
        # associateConsumer() only associates the consumer at transaction
 
237
        # commit time. However, order is preserved.
 
238
        consumer = RabbitQueue(global_session, next(queue_names))
 
239
        routing_key = RabbitRoutingKey(global_session, next(key_names))
 
240
        routing_key.associateConsumer(consumer)
 
241
        routing_key.sendNow('now')
 
242
        routing_key.send('later')
 
243
        # There is nothing in the queue because the consumer has not yet been
 
244
        # associated with the routing key.
 
245
        self.assertRaises(EmptyQueueException, consumer.receive, timeout=2)
 
246
        transaction.commit()
 
247
        # Now that the transaction has been committed, the consumer is
 
248
        # associated, and receives the deferred message.
 
249
        self.assertEqual('later', consumer.receive(timeout=2))
 
250
 
 
251
    def test_associateConsumerNow(self):
 
252
        # associateConsumerNow() associates the consumer right away.
 
253
        consumer = RabbitQueue(global_session, next(queue_names))
 
254
        routing_key = RabbitRoutingKey(global_session, next(key_names))
 
255
        routing_key.associateConsumerNow(consumer)
 
256
        routing_key.sendNow('now')
 
257
        routing_key.send('later')
 
258
        # There is already something in the queue.
 
259
        self.assertEqual('now', consumer.receive(timeout=2))
 
260
        transaction.commit()
 
261
        # Now that the transaction has been committed there is another item in
 
262
        # the queue.
 
263
        self.assertEqual('later', consumer.receive(timeout=2))
 
264
 
 
265
    def test_send(self):
 
266
        consumer = RabbitQueue(global_session, next(queue_names))
 
267
        routing_key = RabbitRoutingKey(global_session, next(key_names))
 
268
        routing_key.associateConsumerNow(consumer)
 
269
 
 
270
        for data in range(90, 100):
 
271
            routing_key.send(data)
 
272
 
 
273
        routing_key.sendNow('sync')
 
274
        # There is nothing in the queue except the sync we just sent.
 
275
        self.assertEqual('sync', consumer.receive(timeout=2))
 
276
 
 
277
        # Messages get sent on commit
 
278
        transaction.commit()
 
279
        for data in range(90, 100):
 
280
            self.assertEqual(data, consumer.receive())
 
281
 
 
282
        # There are no more messages. They have all been consumed.
 
283
        routing_key.sendNow('sync')
 
284
        self.assertEqual('sync', consumer.receive(timeout=2))
 
285
 
 
286
    def test_sendNow(self):
 
287
        consumer = RabbitQueue(global_session, next(queue_names))
 
288
        routing_key = RabbitRoutingKey(global_session, next(key_names))
 
289
        routing_key.associateConsumerNow(consumer)
 
290
 
43
291
        for data in range(50, 60):
44
 
            self.key.send_now(data)
45
 
            received_data = self.queue.receive(timeout=5)
46
 
            self.assertEqual(received_data, data)
47
 
 
48
 
    def test_receive_consumes(self):
 
292
            routing_key.sendNow(data)
 
293
            received_data = consumer.receive(timeout=2)
 
294
            self.assertEqual(data, received_data)
 
295
 
 
296
 
 
297
class TestRabbitQueue(RabbitTestCase):
 
298
 
 
299
    def test_interface(self):
 
300
        consumer = RabbitQueue(global_session, next(queue_names))
 
301
        self.assertThat(consumer, Provides(IMessageConsumer))
 
302
 
 
303
    def test_receive(self):
 
304
        consumer = RabbitQueue(global_session, next(queue_names))
 
305
        routing_key = RabbitRoutingKey(global_session, next(key_names))
 
306
        routing_key.associateConsumerNow(consumer)
 
307
 
49
308
        for data in range(55, 65):
50
 
            self.key.send_now(data)
51
 
            self.assertEqual(self.queue.receive(timeout=5), data)
 
309
            routing_key.sendNow(data)
 
310
            self.assertEqual(data, consumer.receive(timeout=2))
52
311
 
53
 
        # None of the messages we received were put back. They were all
54
 
        # consumed.
 
312
        # All the messages received were consumed.
55
313
        self.assertRaises(
56
314
            EmptyQueueException,
57
 
            self.queue.receive, timeout=5)
 
315
            consumer.receive, timeout=2)
58
316
 
59
317
        # New connections to the queue see an empty queue too.
60
 
        self.queue._disconnect()
61
 
        key = RabbitRoutingKey(self.key_name)
62
 
        queue = RabbitQueue(self.queue_name)
63
 
        key.associateConsumer(queue)
64
 
        key.send_now('new conn sync')
65
 
        self.assertEqual(queue.receive(timeout=5), 'new conn sync')
66
 
 
67
 
    def test_send(self):
68
 
        for data in range(90, 100):
69
 
            self.key.send(data)
70
 
 
71
 
        self.key.send_now('sync')
72
 
        # There is nothing in the queue except the sync we just sent.
73
 
        self.assertEqual(self.queue.receive(timeout=5), 'sync')
74
 
 
75
 
        # Messages get sent on commit
76
 
        transaction.commit()
77
 
        for data in range(90, 100):
78
 
            self.assertEqual(self.queue.receive(), data)
79
 
 
80
 
        # There are no more messages. They have all been consumed.
81
 
        self.key.send_now('sync')
82
 
        self.assertEqual(self.queue.receive(timeout=5), 'sync')
 
318
        consumer.session.disconnect()
 
319
        consumer = RabbitQueue(global_session, next(queue_names))
 
320
        routing_key = RabbitRoutingKey(global_session, next(key_names))
 
321
        routing_key.associateConsumerNow(consumer)
 
322
        self.assertRaises(
 
323
            EmptyQueueException,
 
324
            consumer.receive, timeout=2)
 
325
 
 
326
 
 
327
class TestRabbit(RabbitTestCase):
 
328
    """Integration-like tests for the RabbitMQ messaging abstractions."""
 
329
 
 
330
    def get_synced_sessions(self):
 
331
        thread_id = thread.get_ident()
 
332
        try:
 
333
            syncs_set = transaction.manager._synchs[thread_id]
 
334
        except KeyError:
 
335
            return set()
 
336
        else:
 
337
            return set(
 
338
                sync.session for sync in syncs_set.data.itervalues()
 
339
                if isinstance(sync, RabbitSessionTransactionSync))
 
340
 
 
341
    def test_global_session(self):
 
342
        self.assertIsInstance(global_session, RabbitSession)
 
343
        self.assertIn(global_session, self.get_synced_sessions())
 
344
 
 
345
    def test_global_unreliable_session(self):
 
346
        self.assertIsInstance(
 
347
            global_unreliable_session, RabbitUnreliableSession)
 
348
        self.assertIn(global_unreliable_session, self.get_synced_sessions())
83
349
 
84
350
    def test_abort(self):
 
351
        consumer = RabbitQueue(global_session, next(queue_names))
 
352
        routing_key = RabbitRoutingKey(global_session, next(key_names))
 
353
        routing_key.associateConsumerNow(consumer)
 
354
 
85
355
        for data in range(90, 100):
86
 
            self.key.send(data)
87
 
 
88
 
        self.key.send_now('sync')
89
 
        # There is nothing in the queue except the sync we just sent.
90
 
        self.assertEqual(self.queue.receive(timeout=5), 'sync')
91
 
 
92
 
        # Messages get forgotten on abort.
 
356
            routing_key.send(data)
 
357
 
 
358
        # Messages sent using send() are forgotten on abort.
93
359
        transaction.abort()
94
 
 
95
 
        # There are no more messages. They have all been consumed.
96
 
        self.key.send_now('sync2')
97
 
        self.assertEqual(self.queue.receive(timeout=5), 'sync2')
 
360
        self.assertRaises(
 
361
            EmptyQueueException,
 
362
            consumer.receive, timeout=2)
 
363
 
 
364
 
 
365
class TestRabbitWithLaunchpad(RabbitTestCase):
 
366
    """Integration-like tests for the RabbitMQ messaging abstractions."""
 
367
 
 
368
    layer = LaunchpadFunctionalLayer
 
369
 
 
370
    def test_utility(self):
 
371
        # The unreliable session is registered as the default IMessageSession
 
372
        # utility.
 
373
        self.assertIs(
 
374
            global_unreliable_session,
 
375
            getUtility(IMessageSession))