10
10
from amqplib import client_0_8 as amqp
11
11
from fixtures import EnvironmentVariableFixture
13
from lp.services.rabbit.testing.server import RabbitServer
13
14
from lp.testing import TestCase
14
from lp.services.rabbit.testing.server import RabbitServer
17
17
class TestRabbitFixture(TestCase):
21
21
# .erlange.cookie has to be ignored, and ditto bogus HOME if other
22
22
# tests fail to cleanup.
23
23
self.useFixture(EnvironmentVariableFixture('HOME', '/nonsense/value'))
24
25
fixture = RabbitServer()
26
# Workaround failures-in-setup-not-attaching-details (if they did
27
# we could use self.useFixture).
28
self.addCleanup(self._gather_details, fixture.getDetails)
27
# Work around failures-in-setup-not-attaching-details (if they did we
28
# could use self.useFixture).
29
self.addCleanup(self._gather_details, fixture.getDetails)
31
host = 'localhost:%s' % fixture.config.port
32
conn = amqp.Connection(host=host, userid="guest",
33
password="guest", virtual_host="/", insist=False)
36
log = fixture.getDetails()['rabbit log file']
34
"host": 'localhost:%s' % fixture.config.port,
35
"userid": "guest", "password": "guest",
36
"virtual_host": "/", "insist": False,
38
amqp.Connection(**connect_arguments).close()
40
log = fixture.runner.getDetails()["rabbit.log"]
37
41
# Which shouldn't blow up on iteration.
38
42
list(log.iter_text())
41
44
# The daemon should be closed now.
42
self.assertRaises(socket.error, amqp.Connection, host=host,
43
userid="guest", password="guest", virtual_host="/", insist=False)
45
self.assertRaises(socket.error, amqp.Connection, **connect_arguments)