~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/services/rabbit/testing/server.py

[r=allenap][bug=788557] In the RabbitMQ fixture don't double fork
 (and use subprocess),
 ensure SIGPIPE behaviour is returned to the default in spawned processes,
 and avoid the cookie race between the RabbitMQ daemon and rabbitmqctl.

Show diffs side-by-side

added added

removed removed

Lines of Context:
9
9
    "RabbitServer",
10
10
    ]
11
11
 
12
 
import errno
13
12
import os
14
13
import re
15
14
import signal
16
15
import socket
17
16
import subprocess
18
 
import sys
19
17
import time
20
18
 
21
19
from amqplib import client_0_8 as amqp
69
67
#     return True
70
68
 
71
69
 
72
 
def os_exec(*args):
73
 
    """Wrapper for `os.execve()` that catches execution errors."""
74
 
    try:
75
 
        os.execv(args[0], args)
76
 
        os._exit(1)
77
 
    except OSError:
78
 
        sys.stderr.write("\nERROR:\nCould not exec: %s\n" % (args,))
79
 
    # if we reach here, it's an error anyway
80
 
    os._exit(-1)
81
 
 
82
 
 
83
 
def daemon(name, logfilename, pidfilename, *args, **kwargs):
84
 
    """Execute a double fork to start up a daemon."""
85
 
 
86
 
    # fork 1 - close fds and start new process group
87
 
    pid = os.fork()
88
 
    if pid:
89
 
        # parent process - we collect the first child to avoid ghosts.
90
 
        os.waitpid(pid, 0)
91
 
        return
92
 
    # start a new process group and detach ttys
93
 
    # print '## Starting', name, '##'
94
 
    os.setsid()
95
 
 
 
70
def preexec_fn():
96
71
    # Revert Python's handling of SIGPIPE. See
97
72
    # http://bugs.python.org/issue1652 for more info.
98
73
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
99
74
 
100
 
    # fork 2 - now detach once more free and clear
101
 
    pid = os.fork()
102
 
    if pid:
103
 
        # this is the first fork - its job is done
104
 
        os._exit(0)
105
 
    # make attempts to read from stdin fail.
106
 
    fnullr = os.open(os.devnull, os.O_RDONLY)
107
 
    os.dup2(fnullr, 0)
108
 
    if fnullr:
109
 
        os.close(fnullr)
110
 
    # open up the logfile and start up the process
111
 
    f = os.open(logfilename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
112
 
    os.dup2(f, 1)
113
 
    os.dup2(f, 2)
114
 
    if f > 2:
115
 
        os.close(f)
116
 
    # With output setup to log we can start running code again.
117
 
    if 'command' in kwargs:
118
 
        args = (kwargs['command'],) + args
119
 
    else:
120
 
        args = ('/usr/bin/env', 'python', '-u',) + args
121
 
    if 'homedir' in kwargs:
122
 
        os.environ['HOME'] = kwargs['homedir']
123
 
    print os.environ['HOME']
124
 
    print os.stat(os.environ['HOME'])
125
 
    # this should get logged
126
 
    print '## Starting %s as %s' % (name, args)
127
 
    # write the pidfile file
128
 
    with open(pidfilename, "w") as pidfile:
129
 
        pidfile.write("%d" % os.getpid())
130
 
        pidfile.flush()
131
 
    os_exec(*args)
132
 
 
133
 
 
134
 
# def status():
135
 
#     """ provides status information about the RabbitMQ server """
136
 
#     # Not ported yet.
137
 
#     nodename = _get_nodename()
138
 
#     if not _check_running():
139
 
#         print "ERROR: RabbitMQ node %s is not running" % nodename
140
 
#         return
141
 
#     for act in ["list_exchanges", "list_queues"]:
142
 
#         outstr, errstr = _rabbitctl(act, strip=True)
143
 
#         if errstr:
144
 
#             print >> sys.stderr, errstr
145
 
#         if outstr:
146
 
#             print outstr
147
 
#     return
148
 
 
149
75
 
150
76
def allocate_ports(n=1):
151
77
    """Allocate `n` unused ports.
170
96
    :ivar hostname: The host the RabbitMQ is on (always localhost for
171
97
        `RabbitServerResources`).
172
98
    :ivar port: A port that was free at the time setUp() was called.
173
 
    :ivar rabbitdir: A directory to put the RabbitMQ logs in.
 
99
    :ivar homedir: A directory to put the RabbitMQ logs in.
174
100
    :ivar mnesiadir: A directory for the RabbitMQ db.
175
101
    :ivar logfile: The logfile allocated for the server.
176
 
    :ivar pidfile: The file the pid should be written to.
177
102
    :ivar nodename: The name of the node.
178
103
    """
179
104
    def setUp(self):
180
105
        super(RabbitServerResources, self).setUp()
181
106
        self.hostname = 'localhost'
182
107
        self.port = allocate_ports()[0]
183
 
        self.rabbitdir = self.useFixture(TempDir()).path
 
108
        self.homedir = self.useFixture(TempDir()).path
184
109
        self.mnesiadir = self.useFixture(TempDir()).path
185
 
        self.logfile = os.path.join(self.rabbitdir, 'rabbit.log')
186
 
        self.pidfile = os.path.join(self.rabbitdir, 'rabbit.pid')
 
110
        self.logfile = os.path.join(self.homedir, 'server.log')
187
111
        self.nodename = os.path.basename(self.useFixture(TempDir()).path)
188
112
 
189
113
    @property
221
145
        self.useFixture(EnvironmentVariableFixture(
222
146
            "RABBITMQ_MNESIA_BASE", self.config.mnesiadir))
223
147
        self.useFixture(EnvironmentVariableFixture(
224
 
            "RABBITMQ_LOG_BASE", self.config.rabbitdir))
 
148
            "RABBITMQ_LOG_BASE", self.config.homedir))
225
149
        self.useFixture(EnvironmentVariableFixture(
226
150
            "RABBITMQ_NODE_PORT", str(self.config.port)))
227
151
        self.useFixture(EnvironmentVariableFixture(
243
167
        """Executes a ``rabbitctl`` command and returns status."""
244
168
        ctlbin = os.path.join(RABBITBIN, "rabbitmqctl")
245
169
        nodename = self.config.fq_nodename
246
 
        env = dict(os.environ)
247
 
        env['HOME'] = self.config.rabbitdir
 
170
        env = dict(os.environ, HOME=self.config.homedir)
248
171
        ctl = subprocess.Popen(
249
172
            (ctlbin, "-n", nodename, command), env=env,
250
 
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
173
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
 
174
            preexec_fn=preexec_fn)
251
175
        outstr, errstr = ctl.communicate()
252
176
        if strip:
253
177
            return outstr.strip(), errstr.strip()
254
178
        return outstr, errstr
255
179
 
256
 
    def check_running(self):
257
 
        """Checks that RabbitMQ is up and running."""
 
180
    def is_node_running(self):
 
181
        """Checks that our RabbitMQ node is up and running."""
258
182
        nodename = self.config.fq_nodename
259
183
        outdata, errdata = self.rabbitctl("status")
260
184
        if errdata:
295
219
 
296
220
 
297
221
class RabbitServerRunner(Fixture):
298
 
    """Run a RabbitMQ server.
299
 
 
300
 
    :ivar pid: The pid of the server.
301
 
    """
 
222
    """Run a RabbitMQ server."""
302
223
 
303
224
    def __init__(self, config):
304
225
        """Create a `RabbitServerRunner` instance.
308
229
        """
309
230
        super(RabbitServerRunner, self).__init__()
310
231
        self.config = config
 
232
        self.process = None
311
233
 
312
234
    def setUp(self):
313
235
        super(RabbitServerRunner, self).setUp()
315
237
            RabbitServerEnvironment(self.config))
316
238
        self._start()
317
239
 
318
 
    def _start(self):
 
240
    def is_running(self):
 
241
        """Is the RabbitMQ server process still running?"""
 
242
        if self.process is None:
 
243
            return False
 
244
        else:
 
245
            return self.process.poll() is None
 
246
 
 
247
    def check_running(self):
 
248
        """Checks that the RabbitMQ server process is still running.
 
249
 
 
250
        :raises Exception: If it not running.
 
251
        :return: True if it is running.
 
252
        """
 
253
        if self.is_running():
 
254
            return True
 
255
        else:
 
256
            raise Exception("RabbitMQ server is not running.")
 
257
 
 
258
    def _spawn(self):
 
259
        """Spawn the RabbitMQ server process."""
319
260
        cmd = os.path.join(RABBITBIN, 'rabbitmq-server')
320
 
        name = "RabbitMQ server node:%s on port:%d" % (
321
 
            self.config.nodename, self.config.port)
322
 
        daemon(name, self.config.logfile, self.config.pidfile, command=cmd,
323
 
            homedir=self.config.rabbitdir)
 
261
        env = dict(os.environ, HOME=self.config.homedir)
 
262
        with open(self.config.logfile, "wb") as logfile:
 
263
            with open(os.devnull, "rb") as devnull:
 
264
                self.process = subprocess.Popen(
 
265
                    [cmd], stdin=devnull, stdout=logfile, stderr=logfile,
 
266
                    close_fds=True, cwd=self.config.homedir, env=env,
 
267
                    preexec_fn=preexec_fn)
324
268
        self.addDetail(
325
269
            os.path.basename(self.config.logfile),
326
270
            content_from_file(self.config.logfile))
 
271
 
 
272
    def _start(self):
 
273
        """Start the RabbitMQ server."""
 
274
        # Check if Rabbit is already running. In truth this is really to avoid
 
275
        # a race condition around creating $HOME/.erlang.cookie: let rabbitctl
 
276
        # create it now, before spawning the daemon.
 
277
        if self.environment.is_node_running():
 
278
            raise AssertionError(
 
279
                "RabbitMQ OTP already running even though it "
 
280
                "hasn't been started it yet!")
 
281
        self._spawn()
327
282
        # Wait for the server to come up...
328
283
        timeout = time.time() + 15
329
 
        while time.time() < timeout:
330
 
            if self.environment.check_running():
 
284
        while time.time() < timeout and self.check_running():
 
285
            if self.environment.is_node_running():
331
286
                break
332
287
            time.sleep(0.3)
333
288
        else:
334
289
            raise Exception(
335
 
                "Timeout waiting for RabbitMQ OTP server to start.")
336
 
        # The erlang OTP is up, but RabbitMQ may not be usable. We need to
 
290
                "Timeout waiting for RabbitMQ server to start.")
 
291
        # The Erlang OTP is up, but RabbitMQ may not be usable. We need to
337
292
        # cleanup up the process from here on in even if the full service
338
293
        # fails to get together.
339
294
        self.addCleanup(self._stop)
340
 
        # Wait until the server is ready...
341
 
        while time.time() < timeout:
342
 
            # rabbitctl can say a node is up before it is ready to
343
 
            # accept connections ... :-(
 
295
        # `rabbitctl status` can say a node is up before it is ready to accept
 
296
        # connections. Wait at least 5 more seconds for the node to come up...
 
297
        timeout = max(timeout, time.time() + 5)
 
298
        while time.time() < timeout and self.check_running():
344
299
            try:
345
 
                conn = self.environment.get_connection()
 
300
                self.environment.get_connection().close()
346
301
            except socket.error:
347
302
                time.sleep(0.1)
348
303
            else:
349
 
                conn.close()
350
304
                break
351
305
        else:
352
306
            raise Exception(
353
 
                "Timeout waiting for RabbitMQ to start listening.")
354
 
        # All should be well here.
355
 
        with open(self.config.pidfile, "r") as f:
356
 
            self.pid = int(f.read().strip())
 
307
                "Timeout waiting for RabbitMQ to node to come up.")
357
308
 
358
 
    def _stop(self):
359
 
        """Stop the running server. Normally called by cleanups."""
360
 
        if not self.environment.check_running():
361
 
            # If someone has shut it down already, we're done.
362
 
            return
 
309
    def _request_stop(self):
363
310
        outstr, errstr = self.environment.rabbitctl("stop", strip=True)
364
311
        if outstr:
365
312
            self.addDetail('stop-out', Content(UTF8_TEXT, lambda: [outstr]))
366
313
        if errstr:
367
314
            self.addDetail('stop-err', Content(UTF8_TEXT, lambda: [errstr]))
368
 
        # Wait for the server to go down...
 
315
 
 
316
    def _stop(self):
 
317
        """Stop the running server. Normally called by cleanups."""
 
318
        self._request_stop()
 
319
        # Wait for the node to go down...
369
320
        timeout = time.time() + 15
370
321
        while time.time() < timeout:
371
 
            if not self.environment.check_running():
 
322
            if not self.environment.is_node_running():
372
323
                break
373
324
            time.sleep(0.3)
374
325
        else:
375
326
            raise Exception(
376
 
                "Timeout waiting for RabbitMQ shutdown.")
377
 
        # Wait for the process to end...
 
327
                "Timeout waiting for RabbitMQ node to go down.")
 
328
        # Wait at least 5 more seconds for the process to end...
 
329
        timeout = max(timeout, time.time() + 5)
378
330
        while time.time() < timeout:
379
 
            try:
380
 
                os.kill(self.pid, 0)
381
 
            except OSError, e:
382
 
                if e.errno == errno.ESRCH:
383
 
                    break
384
 
                raise
385
 
            else:
386
 
                time.sleep(0.1)
 
331
            if not self.is_running():
 
332
                break
 
333
            self.process.terminate()
 
334
            time.sleep(0.1)
387
335
        else:
388
 
            raise Exception(
389
 
                "RabbitMQ (pid=%d) did not quit." % (self.pid,))
 
336
            # Die!!!
 
337
            if self.is_running():
 
338
                self.process.kill()
 
339
                time.sleep(0.5)
 
340
            if self.is_running():
 
341
                raise Exception("RabbitMQ server just won't die.")
390
342
 
391
343
 
392
344
class RabbitServer(Fixture):