~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/services/messaging/tests/test_rabbit.py

[r=benji,
        rvb][no-qa] Ensure that message sessions based on RabbitSession are
        not connected by merely creating consumers or producers.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
    RabbitMQLayer,
21
21
    )
22
22
from lp.services.messaging.interfaces import (
23
 
    EmptyQueue,
24
23
    IMessageConsumer,
25
24
    IMessageProducer,
26
25
    IMessageSession,
27
26
    MessagingException,
28
27
    MessagingUnavailable,
 
28
    QueueEmpty,
 
29
    QueueNotFound,
29
30
    )
30
31
from lp.services.messaging.rabbit import (
31
32
    RabbitMessageBase,
101
102
 
102
103
    def test_connect(self):
103
104
        session = RabbitSession()
104
 
        self.assertIs(None, session.connection)
 
105
        self.assertFalse(session.is_connected)
105
106
        connection = session.connect()
106
 
        self.assertIsNot(None, session.connection)
107
 
        self.assertIs(connection, session.connection)
 
107
        self.assertTrue(session.is_connected)
 
108
        self.assertIs(connection, session._connection)
108
109
 
109
110
    def test_connect_with_incomplete_configuration(self):
110
111
        self.pushConfig("rabbitmq", host="none")
117
118
        session = RabbitSession()
118
119
        session.connect()
119
120
        session.disconnect()
120
 
        self.assertIs(None, session.connection)
 
121
        self.assertFalse(session.is_connected)
121
122
 
122
 
    def test_connection(self):
123
 
        # The connection property is None once a connection has been closed.
 
123
    def test_is_connected(self):
 
124
        # is_connected is False once a connection has been closed.
124
125
        session = RabbitSession()
125
126
        session.connect()
126
127
        # Close the connection without using disconnect().
127
 
        session.connection.close()
128
 
        self.assertIs(None, session.connection)
 
128
        session._connection.close()
 
129
        self.assertFalse(session.is_connected)
129
130
 
130
131
    def test_defer(self):
131
132
        task = lambda foo, bar: None
148
149
        session.flush()
149
150
        self.assertEqual(["task"], log)
150
151
        self.assertEqual([], list(session._deferred))
151
 
        self.assertIsNot(None, session.connection)
 
152
        self.assertTrue(session.is_connected)
152
153
 
153
154
    def test_reset(self):
154
155
        # RabbitSession.reset() resets session variables and does not run
161
162
        session.reset()
162
163
        self.assertEqual([], log)
163
164
        self.assertEqual([], list(session._deferred))
164
 
        self.assertIs(None, session.connection)
 
165
        self.assertFalse(session.is_connected)
165
166
 
166
167
    def test_finish(self):
167
168
        # RabbitSession.finish() resets session variables after running
174
175
        session.finish()
175
176
        self.assertEqual(["task"], log)
176
177
        self.assertEqual([], list(session._deferred))
177
 
        self.assertIs(None, session.connection)
 
178
        self.assertFalse(session.is_connected)
178
179
 
179
180
    def test_getProducer(self):
180
181
        session = RabbitSession()
238
239
    def test_channel(self):
239
240
        # Referencing the channel property causes the session to connect.
240
241
        base = RabbitMessageBase(global_session)
241
 
        self.assertIs(None, base.session.connection)
 
242
        self.assertFalse(base.session.is_connected)
242
243
        channel = base.channel
243
 
        self.assertIsNot(None, base.session.connection)
 
244
        self.assertTrue(base.session.is_connected)
244
245
        self.assertIsNot(None, channel)
245
246
        # The same channel is returned every time.
246
247
        self.assertIs(channel, base.channel)
266
267
        consumer = RabbitQueue(global_session, next(queue_names))
267
268
        routing_key = RabbitRoutingKey(global_session, next(key_names))
268
269
        routing_key.associateConsumer(consumer)
 
270
        # The session is still not connected.
 
271
        self.assertFalse(global_session.is_connected)
269
272
        routing_key.sendNow('now')
270
273
        routing_key.send('later')
271
 
        # There is nothing in the queue because the consumer has not yet been
272
 
        # associated with the routing key.
273
 
        self.assertRaises(EmptyQueue, consumer.receive, timeout=2)
 
274
        # The queue is not found because the consumer has not yet been
 
275
        # associated with the routing key and the queue declared.
 
276
        self.assertRaises(QueueNotFound, consumer.receive, timeout=2)
274
277
        transaction.commit()
275
278
        # Now that the transaction has been committed, the consumer is
276
279
        # associated, and receives the deferred message.
321
324
            received_data = consumer.receive(timeout=2)
322
325
            self.assertEqual(data, received_data)
323
326
 
 
327
    def test_does_not_connect_session_immediately(self):
 
328
        # RabbitRoutingKey does not connect the session until necessary.
 
329
        RabbitRoutingKey(global_session, next(key_names))
 
330
        self.assertFalse(global_session.is_connected)
 
331
 
324
332
 
325
333
class TestRabbitQueue(RabbitTestCase):
326
334
 
338
346
            self.assertEqual(data, consumer.receive(timeout=2))
339
347
 
340
348
        # All the messages received were consumed.
341
 
        self.assertRaises(EmptyQueue, consumer.receive, timeout=2)
 
349
        self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
342
350
 
343
351
        # New connections to the queue see an empty queue too.
344
352
        consumer.session.disconnect()
345
353
        consumer = RabbitQueue(global_session, next(queue_names))
346
354
        routing_key = RabbitRoutingKey(global_session, next(key_names))
347
355
        routing_key.associateConsumerNow(consumer)
348
 
        self.assertRaises(EmptyQueue, consumer.receive, timeout=2)
 
356
        self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
 
357
 
 
358
    def test_does_not_connect_session_immediately(self):
 
359
        # RabbitQueue does not connect the session until necessary.
 
360
        RabbitQueue(global_session, next(queue_names))
 
361
        self.assertFalse(global_session.is_connected)
349
362
 
350
363
 
351
364
class TestRabbit(RabbitTestCase):
381
394
 
382
395
        # Messages sent using send() are forgotten on abort.
383
396
        transaction.abort()
384
 
        self.assertRaises(EmptyQueue, consumer.receive, timeout=2)
 
397
        self.assertRaises(QueueEmpty, consumer.receive, timeout=2)
385
398
 
386
399
 
387
400
class TestRabbitWithLaunchpad(RabbitTestCase):