1
1
# Copyright 2011 Canonical Ltd. This software is licensed under the
2
2
# GNU Affero General Public License version 3 (see the file LICENSE).
4
"""Test server fixture for RabbitMQ."""
4
"""Test server fixtures for RabbitMQ."""
21
from amqplib import client_0_8 as amqp
16
22
from fixtures import (
17
23
EnvironmentVariableFixture,
21
from testtools.content import Content
27
from testtools.content import (
22
31
from testtools.content_type import UTF8_TEXT
24
from amqplib import client_0_8 as amqp
26
33
# The default binaries have a check that the running user is uid 0 or uname
27
34
# 'rabbitmq', neither of which are needed to operate correctly. So we run the
28
35
# actual erlang binaries.
29
36
RABBITBIN = "/usr/lib/rabbitmq/bin"
32
def setup_exchange(conf, port):
33
""" create an exchange """
35
conn = _get_connection(conf, port)
36
# see if we already have the exchange
40
chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
41
type=conf.exchange_type, passive=True)
42
except (amqp.AMQPConnectionException, amqp.AMQPChannelException), e:
43
if e.amqp_reply_code == 404:
45
# amqplib kills the channel on error.... we dispose of it too
50
# now create the exchange if needed
52
chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
53
type=conf.exchange_type,
54
durable=True, auto_delete=False,)
55
print "Created new exchange %s (%s)" % (
56
conf.exchange_name + BRANCH_NICK, conf.exchange_type)
58
print "Exchange %s (%s) is already declared" % (
59
conf.exchange_name + BRANCH_NICK, conf.exchange_type)
39
# def setup_exchange(conf, port):
40
# """ create an exchange """
42
# conn = _get_connection(conf, port)
43
# # see if we already have the exchange
45
# chan = conn.channel()
47
# chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
48
# type=conf.exchange_type, passive=True)
49
# except (amqp.AMQPConnectionException, amqp.AMQPChannelException), e:
50
# if e.amqp_reply_code == 404:
52
# # amqplib kills the channel on error.... we dispose of it too
54
# chan = conn.channel()
57
# # now create the exchange if needed
59
# chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
60
# type=conf.exchange_type,
61
# durable=True, auto_delete=False,)
62
# print "Created new exchange %s (%s)" % (
63
# conf.exchange_name + BRANCH_NICK, conf.exchange_type)
65
# print "Exchange %s (%s) is already declared" % (
66
# conf.exchange_name + BRANCH_NICK, conf.exchange_type)
65
72
def os_exec(*args):
66
""" wrapper for os.execv() that catches execution errors """
73
"""Wrapper for `os.execv()` that catches execution errors."""
68
75
os.execv(args[0], args)
125
""" provides status information about the RabbitMQ server """
127
nodename = _get_nodename()
128
if not _check_running():
129
print "ERROR: RabbitMQ node %s is not running" % nodename
131
for act in ["list_exchanges", "list_queues"]:
132
outstr, errstr = _rabbitctl(act, strip=True)
134
print >> sys.stderr, errstr
131
# """ provides status information about the RabbitMQ server """
133
# nodename = _get_nodename()
134
# if not _check_running():
135
# print "ERROR: RabbitMQ node %s is not running" % nodename
137
# for act in ["list_exchanges", "list_queues"]:
138
# outstr, errstr = _rabbitctl(act, strip=True)
140
# print >> sys.stderr, errstr
140
146
def allocate_ports(n=1):
    """Allocate `n` unused ports.

    Each port is found by binding a socket to port 0 on localhost and
    reading back the port number reported by `getsockname()`. All `n`
    sockets are bound before any of them is closed, so the ports returned
    by a single call are distinct from one another.

    There is a small race condition here (between the time we allocate the
    port, and the time it actually gets used), but for the purposes for which
    this function gets used it isn't a problem in practice.

    :param n: The number of ports to allocate.
    :return: A list of `n` port numbers.
    """
    sockets = []
    try:
        for _ in range(n):
            sock = socket.socket()
            # Track the socket before binding so it is closed even if
            # bind() fails part-way through the batch.
            sockets.append(sock)
            sock.bind(('localhost', 0))
        return [sock.getsockname()[1] for sock in sockets]
    finally:
        # Always release the sockets; only the port numbers are needed.
        for sock in sockets:
            sock.close()
159
class AllocateRabbitServer(Fixture):
160
"""Allocate the resources a rabbit server needs.
162
:ivar hostname: The host the rabbit is on (always localhost for
163
AllocateRabbitServer).
163
class RabbitFixture(Fixture):
164
"""Common fixture stuff for dealing with RabbitMQ servers.
166
In particular this adopts detail handling code from `testtools` so that
167
details from sub-fixtures are propagated up to the test case.
170
def useFixture(self, fixture):
171
super(RabbitFixture, self).useFixture(fixture)
172
self.addCleanup(self._gather_details, fixture.getDetails)
175
def _gather_details(self, getDetails):
176
"""Merge the details from getDetails() into self.getDetails().
178
Shamelessly adapted from `testtools.TestCase._gather_details`.
180
details = getDetails()
181
my_details = self.getDetails()
182
for name, content_object in details.items():
184
disambiguator = itertools.count(1)
185
while new_name in my_details:
186
new_name = '%s-%d' % (name, next(disambiguator))
188
content_bytes = list(content_object.iter_bytes())
189
content_callback = lambda: content_bytes
191
Content(content_object.content_type, content_callback))
194
class RabbitServerResources(RabbitFixture):
195
"""Allocate the resources a RabbitMQ server needs.
197
:ivar hostname: The host the RabbitMQ is on (always localhost for
198
`RabbitServerResources`).
164
199
:ivar port: A port that was free at the time setUp() was called.
165
:ivar rabbitdir: A directory to put the rabbit logs in.
166
:ivar mnesiadir: A directory for the rabbit db.
200
:ivar rabbitdir: A directory to put the RabbitMQ logs in.
201
:ivar mnesiadir: A directory for the RabbitMQ db.
167
202
:ivar logfile: The logfile allocated for the server.
168
203
:ivar pidfile: The file the pid should be written to.
169
204
:ivar nodename: The name of the node.
172
super(AllocateRabbitServer, self).setUp()
207
super(RabbitServerResources, self).setUp()
173
208
self.hostname = 'localhost'
174
209
self.port = allocate_ports()[0]
175
210
self.rabbitdir = self.useFixture(TempDir()).path
178
213
self.pidfile = os.path.join(self.rabbitdir, 'rabbit.pid')
179
214
self.nodename = os.path.basename(self.useFixture(TempDir()).path)
181
217
def fq_nodename(self):
182
"""Get the node of the rabbit that is being exported."""
218
"""The node of the RabbitMQ that is being exported."""
183
219
# Note that socket.gethostname is recommended by the rabbitctl manpage
184
220
# even though we're always on localhost, it's what the erlang cluster
186
222
return "%s@%s" % (self.nodename, socket.gethostname())
189
class ExportRabbitServer(Fixture):
190
"""Export the environment variables needed to talk to a rabbit instance.
192
When setup this exports the key rabbit variables::
193
* RABBITMQ_MNESIA_BASE
225
class RabbitServerEnvironment(RabbitFixture):
226
"""Export the environment variables needed to talk to a RabbitMQ instance.
228
When setup this exports the key RabbitMQ variables:
230
- ``RABBITMQ_MNESIA_BASE``
231
- ``RABBITMQ_LOG_BASE``
232
- ``RABBITMQ_NODE_PORT``
233
- ``RABBITMQ_NODENAME``
199
237
def __init__(self, config):
200
"""Create a ExportRabbitServer instance.
238
"""Create a `RabbitServerEnvironment` instance.
202
:param config: An object exporting the variables `AllocateRabbitServer`
240
:param config: An object exporting the variables
241
`RabbitServerResources` exports.
205
super(ExportRabbitServer, self).__init__()
243
super(RabbitServerEnvironment, self).__init__()
206
244
self.config = config
209
super(ExportRabbitServer, self).setUp()
247
super(RabbitServerEnvironment, self).setUp()
210
248
self.useFixture(EnvironmentVariableFixture(
211
249
"RABBITMQ_MNESIA_BASE", self.config.mnesiadir))
212
250
self.useFixture(EnvironmentVariableFixture(
230
269
def rabbitctl(self, command, strip=False):
231
""" executes a rabbitctl command and returns status """
232
ctlbin = RABBITBIN + "/rabbitmqctl"
233
nodename = self.config.fq_nodename()
270
"""Executes a ``rabbitctl`` command and returns status."""
271
ctlbin = os.path.join(RABBITBIN, "rabbitmqctl")
272
nodename = self.config.fq_nodename
234
273
env = dict(os.environ)
235
274
env['HOME'] = self.config.rabbitdir
236
ctl = subprocess.Popen((ctlbin, "-n", nodename, command),
237
stdout = subprocess.PIPE,
238
stderr = subprocess.PIPE, env=env)
275
ctl = subprocess.Popen(
276
(ctlbin, "-n", nodename, command), env=env,
277
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
239
278
outstr, errstr = ctl.communicate()
241
280
return outstr.strip(), errstr.strip()
242
281
return outstr, errstr
244
283
def check_running(self):
245
""" checks that the rabbitmq process is up and running """
246
nodename = self.config.fq_nodename()
284
"""Checks that RabbitMQ is up and running."""
285
nodename = self.config.fq_nodename
247
286
outdata, errdata = self.rabbitctl("status")
249
288
self._errors.append(errdata)
269
308
self._errors.append(outdata)
271
found_node = match.groupdict()['nodename']
310
found_node = match.group('nodename')
272
311
return found_node == nodename
274
313
def get_connection(self):
275
314
"""Get an AMQP connection to the RabbitMQ server.
277
316
:raises socket.error: If the connection cannot be made.
279
318
host_port = "%s:%s" % (self.config.hostname, self.config.port)
280
conn = amqp.Connection(host=host_port, userid="guest",
281
password="guest", virtual_host="/", insist=False)
285
class RunRabbitServer(Fixture):
286
"""Run a rabbit server.
319
return amqp.Connection(
320
host=host_port, userid="guest",
321
password="guest", virtual_host="/", insist=False)
324
class RabbitServerRunner(RabbitFixture):
325
"""Run a RabbitMQ server.
288
327
:ivar pid: The pid of the server.
291
330
def __init__(self, config):
292
"""Create a RunRabbitServer instance.
331
"""Create a `RabbitServerRunner` instance.
294
:param config: An object exporting the variables `AllocateRabbitServer`
333
:param config: An object exporting the variables
334
`RabbitServerResources` exports.
297
super(RunRabbitServer, self).__init__()
336
super(RabbitServerRunner, self).__init__()
298
337
self.config = config
301
super(RunRabbitServer, self).setUp()
302
self.rabbit = self.useFixture(ExportRabbitServer(self.config))
303
# Workaround fixtures not adding details from used fixtures.
304
self.addDetail('rabbitctl errors',
305
Content(UTF8_TEXT, self.rabbit._get_errors))
306
self.addDetail('rabbit log file',
307
Content(UTF8_TEXT, lambda:[file(self.config.logfile, 'rb').read()]))
308
cmd = RABBITBIN + '/rabbitmq-server'
340
super(RabbitServerRunner, self).setUp()
341
self.environment = self.useFixture(
342
RabbitServerEnvironment(self.config))
346
cmd = os.path.join(RABBITBIN, 'rabbitmq-server')
309
347
name = "RabbitMQ server node:%s on port:%d" % (
310
348
self.config.nodename, self.config.port)
311
349
daemon(name, self.config.logfile, self.config.pidfile, command=cmd,
312
350
homedir=self.config.rabbitdir)
313
# now wait about 5 secs for it to start
352
os.path.basename(self.config.logfile),
353
content_from_file(self.config.logfile))
354
# Wait for the server to come up...
314
355
timeout = time.time() + 5
316
if self.rabbit.check_running():
356
while time.time() < timeout:
357
if self.environment.check_running():
318
elif time.time() > timeout:
319
raise Exception('Timeout waiting for rabbit OTP server to start.')
320
# The erlang OTP is up, but rabbit may not be usable. We need to
321
# clean up the process from here on in even if the full service fails
323
self.addCleanup(self.stop)
362
"Timeout waiting for RabbitMQ OTP server to start.")
363
# The erlang OTP is up, but RabbitMQ may not be usable. We need to
364
# clean up the process from here on in even if the full service
365
# fails to get together.
366
self.addCleanup(self._stop)
367
# Wait until the server is ready...
368
while time.time() < timeout:
325
369
# rabbitctl can say a node is up before it is ready to
326
370
# accept connections ... :-(
328
conn = self.rabbit.get_connection()
372
conn = self.environment.get_connection()
329
373
except socket.error:
335
if time.time() > timeout:
336
raise Exception('Timeout waiting for rabbit to start listening.')
337
# all should be well here
380
"Timeout waiting for RabbitMQ to start listening.")
381
# All should be well here.
338
382
with open(self.config.pidfile, "r") as f:
339
383
self.pid = int(f.read().strip())
342
386
"""Stop the running server. Normally called by cleanups."""
343
if not self.rabbit.check_running():
387
if not self.environment.check_running():
344
388
# If someone has shut it down already, we're done.
346
outstr, errstr = self.rabbit.rabbitctl("stop", strip=True)
390
outstr, errstr = self.environment.rabbitctl("stop", strip=True)
348
self.addDetail('stop-out', Content(UTF8_TEXT, lambda:[outstr]))
392
self.addDetail('stop-out', Content(UTF8_TEXT, lambda: [outstr]))
350
self.addDetail('stop-err', Content(UTF8_TEXT, lambda:[errstr]))
351
# wait for the process to finish...
394
self.addDetail('stop-err', Content(UTF8_TEXT, lambda: [errstr]))
395
# Wait for the server to go down...
352
396
timeout = time.time() + 15
353
while self.rabbit.check_running():
397
while time.time() < timeout:
398
if not self.environment.check_running():
355
if time.time() > timeout:
357
"Error - reached timeout waiting for RabbitMQ shutdown")
403
"Timeout waiting for RabbitMQ shutdown.")
404
# Wait for the process to end...
358
405
while time.time() < timeout:
360
407
os.kill(self.pid, 0)
361
408
except OSError, e:
362
409
if e.errno == errno.ESRCH:
365
if time.time() > timeout:
367
"Error - rabbit pid %d did not quit." % (pid,))
370
class RabbitServer(Fixture):
416
"RabbitMQ (pid=%d) did not quit." % (self.pid,))
419
class RabbitServer(RabbitFixture):
371
420
"""A RabbitMQ server fixture.
373
When setup a rabbit instance will be running and the environment variables
374
needed to talk to it will be already configured.
422
When setup a RabbitMQ instance will be running and the environment
423
variables needed to talk to it will be already configured.
376
:ivar config: The `AllocateRabbitServer` used to start the rabbit.
425
:ivar config: The `RabbitServerResources` used to start the server.
426
:ivar runner: The `RabbitServerRunner` that bootstraps the server.
380
430
super(RabbitServer, self).setUp()
381
self.config = self.useFixture(AllocateRabbitServer())
382
self.server = RunRabbitServer(self.config)
383
self.useFixture(self.server)
385
def getDetails(self):
386
return self.server.getDetails()
431
self.config = self.useFixture(RabbitServerResources())
432
self.runner = self.useFixture(RabbitServerRunner(self.config))