~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/services/rabbit/testing/server.py

  • Committer: Launchpad Patch Queue Manager
  • Date: 2011-05-23 19:55:29 UTC
  • mfrom: (13047.3.18 job-start-cleanup)
  • Revision ID: launchpad@pqm.canonical.com-20110523195529-zq0tcaiqayia2c03
[r=allenap][no-qa] Fix a tight loop during RabbitMQ fixture set-up,
        and lots of other clean-ups and incremental improvements.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# Copyright 2011 Canonical Ltd.  This software is licensed under the
2
2
# GNU Affero General Public License version 3 (see the file LICENSE).
3
3
 
4
 
"""Test server fixture for RabbitMQ."""
 
4
"""Test server fixtures for RabbitMQ."""
 
5
 
 
6
__metaclass__ = type
 
7
 
 
8
__all__ = [
 
9
    "RabbitServer",
 
10
    ]
5
11
 
6
12
import errno
 
13
import itertools
7
14
import os
8
15
import re
9
 
import sys
10
16
import socket
11
 
import optparse
12
17
import subprocess
 
18
import sys
13
19
import time
14
20
 
15
 
import bzrlib.branch
 
21
from amqplib import client_0_8 as amqp
16
22
from fixtures import (
17
23
    EnvironmentVariableFixture,
18
24
    Fixture,
19
25
    TempDir,
20
26
    )
21
 
from testtools.content import Content
 
27
from testtools.content import (
 
28
    Content,
 
29
    content_from_file,
 
30
    )
22
31
from testtools.content_type import UTF8_TEXT
23
32
 
24
 
from amqplib import client_0_8 as amqp
25
 
 
26
33
# The default binaries have a check that the running use is uid 0 or uname
27
34
# 'rabbitmq', neither of which are needed to operate correctly. So we run the
28
35
# actual erlang binaries.
29
36
RABBITBIN = "/usr/lib/rabbitmq/bin"
30
37
 
31
38
 
32
 
def setup_exchange(conf, port):
33
 
    """ create an exchange """
34
 
    # Not ported yet.
35
 
    conn = _get_connection(conf, port)
36
 
    # see if we already have the exchange
37
 
    must_create = False
38
 
    chan = conn.channel()
39
 
    try:
40
 
        chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
41
 
                              type=conf.exchange_type, passive=True)
42
 
    except (amqp.AMQPConnectionException, amqp.AMQPChannelException), e:
43
 
        if e.amqp_reply_code == 404:
44
 
            must_create = True
45
 
            # amqplib kills the channel on error.... we dispose of it too
46
 
            chan.close()
47
 
            chan = conn.channel()
48
 
        else:
49
 
            raise
50
 
    # now create the exchange if needed
51
 
    if must_create:
52
 
        chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
53
 
                              type=conf.exchange_type,
54
 
                              durable=True, auto_delete=False,)
55
 
        print "Created new exchange %s (%s)" % (
56
 
            conf.exchange_name + BRANCH_NICK, conf.exchange_type)
57
 
    else:
58
 
        print "Exchange %s (%s) is already declared" % (
59
 
            conf.exchange_name + BRANCH_NICK, conf.exchange_type)
60
 
    chan.close()
61
 
    conn.close()
62
 
    return True
 
39
# def setup_exchange(conf, port):
 
40
#     """ create an exchange """
 
41
#     # Not ported yet.
 
42
#     conn = _get_connection(conf, port)
 
43
#     # see if we already have the exchange
 
44
#     must_create = False
 
45
#     chan = conn.channel()
 
46
#     try:
 
47
#         chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
 
48
#                               type=conf.exchange_type, passive=True)
 
49
#     except (amqp.AMQPConnectionException, amqp.AMQPChannelException), e:
 
50
#         if e.amqp_reply_code == 404:
 
51
#             must_create = True
 
52
#             # amqplib kills the channel on error.... we dispose of it too
 
53
#             chan.close()
 
54
#             chan = conn.channel()
 
55
#         else:
 
56
#             raise
 
57
#     # now create the exchange if needed
 
58
#     if must_create:
 
59
#         chan.exchange_declare(exchange=conf.exchange_name + BRANCH_NICK,
 
60
#                               type=conf.exchange_type,
 
61
#                               durable=True, auto_delete=False,)
 
62
#         print "Created new exchange %s (%s)" % (
 
63
#             conf.exchange_name + BRANCH_NICK, conf.exchange_type)
 
64
#     else:
 
65
#         print "Exchange %s (%s) is already declared" % (
 
66
#             conf.exchange_name + BRANCH_NICK, conf.exchange_type)
 
67
#     chan.close()
 
68
#     conn.close()
 
69
#     return True
63
70
 
64
71
 
65
72
def os_exec(*args):
66
 
    """ warpper for os.execve() that catches execution errors """
 
73
    """Wrapper for `os.execve()` that catches execution errors."""
67
74
    try:
68
75
        os.execv(args[0], args)
69
76
        os._exit(1)
74
81
 
75
82
 
76
83
def daemon(name, logfilename, pidfilename, *args, **kwargs):
77
 
    """Execute a double fork to start up a daemon """
 
84
    """Execute a double fork to start up a daemon."""
78
85
 
79
86
    # fork 1 - close fds and start new process group
80
87
    pid = os.fork()
97
104
    if fnullr:
98
105
        os.close(fnullr)
99
106
    # open up the logfile and start up the process
100
 
    f = os.open(logfilename,
101
 
                os.O_WRONLY|os.O_CREAT|os.O_TRUNC)
 
107
    f = os.open(logfilename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
102
108
    os.dup2(f, 1)
103
109
    os.dup2(f, 2)
104
110
    if f > 2:
121
127
    os_exec(*args)
122
128
 
123
129
 
124
 
def status():
125
 
    """ provides status information about the RabbitMQ server """
126
 
    # Not ported yet.
127
 
    nodename = _get_nodename()
128
 
    if not _check_running():
129
 
        print "ERROR: RabbitMQ node %s is not running" % nodename
130
 
        return
131
 
    for act in ["list_exchanges", "list_queues"]:
132
 
        outstr, errstr = _rabbitctl(act, strip=True)
133
 
        if errstr:
134
 
            print >> sys.stderr, errstr
135
 
        if outstr:
136
 
            print outstr
137
 
    return
 
130
# def status():
 
131
#     """ provides status information about the RabbitMQ server """
 
132
#     # Not ported yet.
 
133
#     nodename = _get_nodename()
 
134
#     if not _check_running():
 
135
#         print "ERROR: RabbitMQ node %s is not running" % nodename
 
136
#         return
 
137
#     for act in ["list_exchanges", "list_queues"]:
 
138
#         outstr, errstr = _rabbitctl(act, strip=True)
 
139
#         if errstr:
 
140
#             print >> sys.stderr, errstr
 
141
#         if outstr:
 
142
#             print outstr
 
143
#     return
138
144
 
139
145
 
140
146
def allocate_ports(n=1):
141
 
    """
142
 
    Allocate n unused ports
 
147
    """Allocate `n` unused ports.
143
148
 
144
 
    There is a small race condition here (between the time we allocate
145
 
    the port, and the time it actually gets used), but for the purposes
146
 
    for which this function gets used it isn't a problem in practice.
 
149
    There is a small race condition here (between the time we allocate the
 
150
    port, and the time it actually gets used), but for the purposes for which
 
151
    this function gets used it isn't a problem in practice.
147
152
    """
148
153
    sockets = map(lambda _: socket.socket(), xrange(n))
149
154
    try:
150
155
        for s in sockets:
151
156
            s.bind(('localhost', 0))
152
 
        ports = map(lambda s: s.getsockname()[1], sockets)
 
157
        return map(lambda s: s.getsockname()[1], sockets)
153
158
    finally:
154
 
        for s in sockets: 
 
159
        for s in sockets:
155
160
            s.close()
156
 
    return ports
157
 
 
158
 
 
159
 
class AllocateRabbitServer(Fixture):
160
 
    """Allocate the resources a rabbit server needs.
161
 
 
162
 
    :ivar hostname: The host the rabbit is on (always localhost for
163
 
        AllocateRabbitServer).
 
161
 
 
162
 
 
163
class RabbitFixture(Fixture):
 
164
    """Common fixture stuff for dealing with RabbitMQ servers.
 
165
 
 
166
    In particular this adopts detail handling code from `testtools` so that
 
167
    details from sub-fixtures are propagated up to the test case.
 
168
    """
 
169
 
 
170
    def useFixture(self, fixture):
 
171
        super(RabbitFixture, self).useFixture(fixture)
 
172
        self.addCleanup(self._gather_details, fixture.getDetails)
 
173
        return fixture
 
174
 
 
175
    def _gather_details(self, getDetails):
 
176
        """Merge the details from getDetails() into self.getDetails().
 
177
 
 
178
        Shamelessly adapted from `testtools.TestCase._gather_details`.
 
179
        """
 
180
        details = getDetails()
 
181
        my_details = self.getDetails()
 
182
        for name, content_object in details.items():
 
183
            new_name = name
 
184
            disambiguator = itertools.count(1)
 
185
            while new_name in my_details:
 
186
                new_name = '%s-%d' % (name, next(disambiguator))
 
187
            name = new_name
 
188
            content_bytes = list(content_object.iter_bytes())
 
189
            content_callback = lambda: content_bytes
 
190
            self.addDetail(name,
 
191
                Content(content_object.content_type, content_callback))
 
192
 
 
193
 
 
194
class RabbitServerResources(RabbitFixture):
 
195
    """Allocate the resources a RabbitMQ server needs.
 
196
 
 
197
    :ivar hostname: The host the RabbitMQ is on (always localhost for
 
198
        `RabbitServerResources`).
164
199
    :ivar port: A port that was free at the time setUp() was called.
165
 
    :ivar rabbitdir: A directory to put the rabbit logs in.
166
 
    :ivar mnesiadir: A directory for the rabbit db.
 
200
    :ivar rabbitdir: A directory to put the RabbitMQ logs in.
 
201
    :ivar mnesiadir: A directory for the RabbitMQ db.
167
202
    :ivar logfile: The logfile allocated for the server.
168
203
    :ivar pidfile: The file the pid should be written to.
169
204
    :ivar nodename: The name of the node.
170
205
    """
171
206
    def setUp(self):
172
 
        super(AllocateRabbitServer, self).setUp()
 
207
        super(RabbitServerResources, self).setUp()
173
208
        self.hostname = 'localhost'
174
209
        self.port = allocate_ports()[0]
175
210
        self.rabbitdir = self.useFixture(TempDir()).path
178
213
        self.pidfile = os.path.join(self.rabbitdir, 'rabbit.pid')
179
214
        self.nodename = os.path.basename(self.useFixture(TempDir()).path)
180
215
 
 
216
    @property
181
217
    def fq_nodename(self):
182
 
        """Get the node of the rabbit that is being exported."""
 
218
        """The node of the RabbitMQ that is being exported."""
183
219
        # Note that socket.gethostname is recommended by the rabbitctl manpage
184
220
        # even though we're always on localhost, its what the erlang cluster
185
221
        # code wants.
186
222
        return "%s@%s" % (self.nodename, socket.gethostname())
187
223
 
188
224
 
189
 
class ExportRabbitServer(Fixture):
190
 
    """Export the environmen variables needed to talk to a rabbit instance.
191
 
    
192
 
    When setup this exports the key rabbit variables::
193
 
     * RABBITMQ_MNESIA_BASE
194
 
     * RABBITMQ_LOG_BASE
195
 
     * RABBITMQ_NODE_PORT
196
 
     * RABBITMQ_NODENAME
 
225
class RabbitServerEnvironment(RabbitFixture):
 
226
    """Export the environment variables needed to talk to a RabbitMQ instance.
 
227
 
 
228
    When setup this exports the key RabbitMQ variables:
 
229
 
 
230
    - ``RABBITMQ_MNESIA_BASE``
 
231
    - ``RABBITMQ_LOG_BASE``
 
232
    - ``RABBITMQ_NODE_PORT``
 
233
    - ``RABBITMQ_NODENAME``
 
234
 
197
235
    """
198
236
 
199
237
    def __init__(self, config):
200
 
        """Create a ExportRabbitServer instance.
 
238
        """Create a `RabbitServerEnvironment` instance.
201
239
 
202
 
        :param config: An object exporting the variables `AllocateRabbitServer`
203
 
            exports.
 
240
        :param config: An object exporting the variables
 
241
            `RabbitServerResources` exports.
204
242
        """
205
 
        super(ExportRabbitServer, self).__init__()
 
243
        super(RabbitServerEnvironment, self).__init__()
206
244
        self.config = config
207
245
 
208
246
    def setUp(self):
209
 
        super(ExportRabbitServer, self).setUp()
 
247
        super(RabbitServerEnvironment, self).setUp()
210
248
        self.useFixture(EnvironmentVariableFixture(
211
249
            "RABBITMQ_MNESIA_BASE", self.config.mnesiadir))
212
250
        self.useFixture(EnvironmentVariableFixture(
220
258
            Content(UTF8_TEXT, self._get_errors))
221
259
 
222
260
    def _get_errors(self):
 
261
        """Yield all errors as UTF-8 encoded text."""
223
262
        for error in self._errors:
224
263
            if type(error) is unicode:
225
264
                yield error.encode('utf8')
228
267
            yield '\n'
229
268
 
230
269
    def rabbitctl(self, command, strip=False):
231
 
        """ executes a rabbitctl command and returns status """
232
 
        ctlbin = RABBITBIN + "/rabbitmqctl"
233
 
        nodename = self.config.fq_nodename()
 
270
        """Executes a ``rabbitctl`` command and returns status."""
 
271
        ctlbin = os.path.join(RABBITBIN, "rabbitmqctl")
 
272
        nodename = self.config.fq_nodename
234
273
        env = dict(os.environ)
235
274
        env['HOME'] = self.config.rabbitdir
236
 
        ctl = subprocess.Popen((ctlbin, "-n", nodename, command),
237
 
                               stdout = subprocess.PIPE,
238
 
                               stderr = subprocess.PIPE, env=env)
 
275
        ctl = subprocess.Popen(
 
276
            (ctlbin, "-n", nodename, command), env=env,
 
277
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
239
278
        outstr, errstr = ctl.communicate()
240
279
        if strip:
241
280
            return outstr.strip(), errstr.strip()
242
281
        return outstr, errstr
243
282
 
244
283
    def check_running(self):
245
 
        """ checks that the rabbitmq process is up and running """
246
 
        nodename = self.config.fq_nodename()
 
284
        """Checks that RabbitMQ is up and running."""
 
285
        nodename = self.config.fq_nodename
247
286
        outdata, errdata = self.rabbitctl("status")
248
287
        if errdata:
249
288
            self._errors.append(errdata)
260
299
                  [^@']+                # hostname
261
300
                )                       # end capturing group
262
301
                '?                      # individual node may be quoted
263
 
                ,?                      # may be multiple nodes, comma-separated
 
302
                ,?                      # may be multiple nodes, comma-sep
264
303
              \]                        # end list
265
304
            \}                          # end section
266
305
        """, re.VERBOSE)
268
307
        if not match:
269
308
            self._errors.append(outdata)
270
309
            return False
271
 
        found_node = match.groupdict()['nodename']
 
310
        found_node = match.group('nodename')
272
311
        return found_node == nodename
273
312
 
274
313
    def get_connection(self):
275
314
        """Get an AMQP connection to the RabbitMQ server.
276
 
        
 
315
 
277
316
        :raises socket.error: If the connection cannot be made.
278
317
        """
279
318
        host_port = "%s:%s" % (self.config.hostname, self.config.port)
280
 
        conn = amqp.Connection(host=host_port, userid="guest",
281
 
                               password="guest", virtual_host="/", insist=False)
282
 
        return conn
283
 
 
284
 
 
285
 
class RunRabbitServer(Fixture):
286
 
    """Run a rabbit server.
287
 
    
 
319
        return amqp.Connection(
 
320
            host=host_port, userid="guest",
 
321
            password="guest", virtual_host="/", insist=False)
 
322
 
 
323
 
 
324
class RabbitServerRunner(RabbitFixture):
 
325
    """Run a RabbitMQ server.
 
326
 
288
327
    :ivar pid: The pid of the server.
289
328
    """
290
329
 
291
330
    def __init__(self, config):
292
 
        """Create a RunRabbitServer instance.
 
331
        """Create a `RabbitServerRunner` instance.
293
332
 
294
 
        :param config: An object exporting the variables `AllocateRabbitServer`
295
 
            exports.
 
333
        :param config: An object exporting the variables
 
334
            `RabbitServerResources` exports.
296
335
        """
297
 
        super(RunRabbitServer, self).__init__()
 
336
        super(RabbitServerRunner, self).__init__()
298
337
        self.config = config
299
338
 
300
339
    def setUp(self):
301
 
        super(RunRabbitServer, self).setUp()
302
 
        self.rabbit = self.useFixture(ExportRabbitServer(self.config))
303
 
        # Workaround fixtures not adding details from used fixtures.
304
 
        self.addDetail('rabbitctl errors',
305
 
            Content(UTF8_TEXT, self.rabbit._get_errors))
306
 
        self.addDetail('rabbit log file',
307
 
            Content(UTF8_TEXT, lambda:[file(self.config.logfile, 'rb').read()]))
308
 
        cmd = RABBITBIN + '/rabbitmq-server'
 
340
        super(RabbitServerRunner, self).setUp()
 
341
        self.environment = self.useFixture(
 
342
            RabbitServerEnvironment(self.config))
 
343
        self._start()
 
344
 
 
345
    def _start(self):
 
346
        cmd = os.path.join(RABBITBIN, 'rabbitmq-server')
309
347
        name = "RabbitMQ server node:%s on port:%d" % (
310
348
            self.config.nodename, self.config.port)
311
349
        daemon(name, self.config.logfile, self.config.pidfile, command=cmd,
312
350
            homedir=self.config.rabbitdir)
313
 
        # now wait about 5 secs for it to start
 
351
        self.addDetail(
 
352
            os.path.basename(self.config.logfile),
 
353
            content_from_file(self.config.logfile))
 
354
        # Wait for the server to come up...
314
355
        timeout = time.time() + 5
315
 
        while True:
316
 
            if self.rabbit.check_running():
 
356
        while time.time() < timeout:
 
357
            if self.environment.check_running():
317
358
                break
318
 
            elif time.time() > timeout:
319
 
                raise Exception('Timeout waiting for rabbit OTP server to start.')
320
 
        # The erlang OTP is up, but rabbit may not be usable. We need to
321
 
        # cleanup up the process from here on in even if the full service fails
322
 
        # to get together.
323
 
        self.addCleanup(self.stop)
324
 
        while True:
 
359
            time.sleep(0.3)
 
360
        else:
 
361
            raise Exception(
 
362
                "Timeout waiting for RabbitMQ OTP server to start.")
 
363
        # The erlang OTP is up, but RabbitMQ may not be usable. We need to
 
364
        # cleanup up the process from here on in even if the full service
 
365
        # fails to get together.
 
366
        self.addCleanup(self._stop)
 
367
        # Wait until the server is ready...
 
368
        while time.time() < timeout:
325
369
            # rabbitctl can say a node is up before it is ready to
326
370
            # accept connections ... :-(
327
371
            try:
328
 
                conn = self.rabbit.get_connection()
 
372
                conn = self.environment.get_connection()
329
373
            except socket.error:
330
 
                pass
 
374
                time.sleep(0.1)
331
375
            else:
332
376
                conn.close()
333
377
                break
334
 
            time.sleep(0.1)
335
 
            if time.time() > timeout:
336
 
                raise Exception('Timeout waiting for rabbit to start listening.')
337
 
        # all should be well here
 
378
        else:
 
379
            raise Exception(
 
380
                "Timeout waiting for RabbitMQ to start listening.")
 
381
        # All should be well here.
338
382
        with open(self.config.pidfile, "r") as f:
339
383
            self.pid = int(f.read().strip())
340
384
 
341
 
    def stop(self):
 
385
    def _stop(self):
342
386
        """Stop the running server. Normally called by cleanups."""
343
 
        if not self.rabbit.check_running():
 
387
        if not self.environment.check_running():
344
388
            # If someone has shut it down already, we're done.
345
389
            return
346
 
        outstr, errstr = self.rabbit.rabbitctl("stop", strip=True)
 
390
        outstr, errstr = self.environment.rabbitctl("stop", strip=True)
347
391
        if outstr:
348
 
            self.addDetail('stop-out', Content(UTF8_TEXT, lambda:[outstr]))
 
392
            self.addDetail('stop-out', Content(UTF8_TEXT, lambda: [outstr]))
349
393
        if errstr:
350
 
            self.addDetail('stop-err', Content(UTF8_TEXT, lambda:[errstr]))
351
 
        # wait for the process to finish...
 
394
            self.addDetail('stop-err', Content(UTF8_TEXT, lambda: [errstr]))
 
395
        # Wait for the server to go down...
352
396
        timeout = time.time() + 15
353
 
        while self.rabbit.check_running():
 
397
        while time.time() < timeout:
 
398
            if not self.environment.check_running():
 
399
                break
354
400
            time.sleep(0.3)
355
 
            if time.time() > timeout:
356
 
                raise Exception(
357
 
                    "Error - reached timeout waiting for RabbitMQ shutdown")
 
401
        else:
 
402
            raise Exception(
 
403
                "Timeout waiting for RabbitMQ shutdown.")
 
404
        # Wait for the process to end...
358
405
        while time.time() < timeout:
359
406
            try:
360
407
                os.kill(self.pid, 0)
361
408
            except OSError, e:
362
409
                if e.errno == errno.ESRCH:
363
410
                    break
364
 
            time.sleep(0.1)
365
 
            if time.time() > timeout:
366
 
                raise Exception(
367
 
                    "Error - rabbit pid %d did not quit." % (pid,))
368
 
 
369
 
 
370
 
class RabbitServer(Fixture):
 
411
                raise
 
412
            else:
 
413
                time.sleep(0.1)
 
414
        else:
 
415
            raise Exception(
 
416
                "RabbitMQ (pid=%d) did not quit." % (self.pid,))
 
417
 
 
418
 
 
419
class RabbitServer(RabbitFixture):
371
420
    """A RabbitMQ server fixture.
372
421
 
373
 
    When setup a rabbit instance will be running and the environment variables
374
 
    needed to talk to it will be already configured.
 
422
    When setup a RabbitMQ instance will be running and the environment
 
423
    variables needed to talk to it will be already configured.
375
424
 
376
 
    :ivar config: The `AllocateRabbitServer` used to start the rabbit.
 
425
    :ivar config: The `RabbitServerResources` used to start the server.
 
426
    :ivar runner: The `RabbitServerRunner` that bootstraps the server.
377
427
    """
378
428
 
379
429
    def setUp(self):
380
430
        super(RabbitServer, self).setUp()
381
 
        self.config = self.useFixture(AllocateRabbitServer())
382
 
        self.server = RunRabbitServer(self.config)
383
 
        self.useFixture(self.server)
384
 
 
385
 
    def getDetails(self):
386
 
        return self.server.getDetails()
387
 
 
 
431
        self.config = self.useFixture(RabbitServerResources())
 
432
        self.runner = self.useFixture(RabbitServerRunner(self.config))