73
"""Wrapper for `os.execve()` that catches execution errors."""
75
os.execv(args[0], args)
78
sys.stderr.write("\nERROR:\nCould not exec: %s\n" % (args,))
79
# if we reach here, it's an error anyway
83
def daemon(name, logfilename, pidfilename, *args, **kwargs):
84
"""Execute a double fork to start up a daemon."""
86
# fork 1 - close fds and start new process group
89
# parent process - we collect the first child to avoid ghosts.
92
# start a new process group and detach ttys
93
# print '## Starting', name, '##'
96
71
# Revert Python's handling of SIGPIPE. See
97
72
# http://bugs.python.org/issue1652 for more info.
98
73
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
100
# fork 2 - now detach once more free and clear
103
# this is the first fork - its job is done
105
# make attempts to read from stdin fail.
106
fnullr = os.open(os.devnull, os.O_RDONLY)
110
# open up the logfile and start up the process
111
f = os.open(logfilename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
116
# With output setup to log we can start running code again.
117
if 'command' in kwargs:
118
args = (kwargs['command'],) + args
120
args = ('/usr/bin/env', 'python', '-u',) + args
121
if 'homedir' in kwargs:
122
os.environ['HOME'] = kwargs['homedir']
123
print os.environ['HOME']
124
print os.stat(os.environ['HOME'])
125
# this should get logged
126
print '## Starting %s as %s' % (name, args)
127
# write the pidfile file
128
with open(pidfilename, "w") as pidfile:
129
pidfile.write("%d" % os.getpid())
135
# """ provides status information about the RabbitMQ server """
137
# nodename = _get_nodename()
138
# if not _check_running():
139
# print "ERROR: RabbitMQ node %s is not running" % nodename
141
# for act in ["list_exchanges", "list_queues"]:
142
# outstr, errstr = _rabbitctl(act, strip=True)
144
# print >> sys.stderr, errstr
150
76
def allocate_ports(n=1):
151
77
"""Allocate `n` unused ports.
170
96
:ivar hostname: The host the RabbitMQ is on (always localhost for
171
97
`RabbitServerResources`).
172
98
:ivar port: A port that was free at the time setUp() was called.
173
:ivar rabbitdir: A directory to put the RabbitMQ logs in.
99
:ivar homedir: A directory to put the RabbitMQ logs in.
174
100
:ivar mnesiadir: A directory for the RabbitMQ db.
175
101
:ivar logfile: The logfile allocated for the server.
176
:ivar pidfile: The file the pid should be written to.
177
102
:ivar nodename: The name of the node.
180
105
super(RabbitServerResources, self).setUp()
181
106
self.hostname = 'localhost'
182
107
self.port = allocate_ports()[0]
183
self.rabbitdir = self.useFixture(TempDir()).path
108
self.homedir = self.useFixture(TempDir()).path
184
109
self.mnesiadir = self.useFixture(TempDir()).path
185
self.logfile = os.path.join(self.rabbitdir, 'rabbit.log')
186
self.pidfile = os.path.join(self.rabbitdir, 'rabbit.pid')
110
self.logfile = os.path.join(self.homedir, 'server.log')
187
111
self.nodename = os.path.basename(self.useFixture(TempDir()).path)
243
167
"""Executes a ``rabbitctl`` command and returns status."""
244
168
ctlbin = os.path.join(RABBITBIN, "rabbitmqctl")
245
169
nodename = self.config.fq_nodename
246
env = dict(os.environ)
247
env['HOME'] = self.config.rabbitdir
170
env = dict(os.environ, HOME=self.config.homedir)
248
171
ctl = subprocess.Popen(
249
172
(ctlbin, "-n", nodename, command), env=env,
250
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
173
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
174
preexec_fn=preexec_fn)
251
175
outstr, errstr = ctl.communicate()
253
177
return outstr.strip(), errstr.strip()
254
178
return outstr, errstr
256
def check_running(self):
257
"""Checks that RabbitMQ is up and running."""
180
def is_node_running(self):
181
"""Checks that our RabbitMQ node is up and running."""
258
182
nodename = self.config.fq_nodename
259
183
outdata, errdata = self.rabbitctl("status")
315
237
RabbitServerEnvironment(self.config))
240
def is_running(self):
241
"""Is the RabbitMQ server process still running?"""
242
if self.process is None:
245
return self.process.poll() is None
247
def check_running(self):
248
"""Checks that the RabbitMQ server process is still running.
250
:raises Exception: If it is not running.
251
:return: True if it is running.
253
if self.is_running():
256
raise Exception("RabbitMQ server is not running.")
259
"""Spawn the RabbitMQ server process."""
319
260
cmd = os.path.join(RABBITBIN, 'rabbitmq-server')
320
name = "RabbitMQ server node:%s on port:%d" % (
321
self.config.nodename, self.config.port)
322
daemon(name, self.config.logfile, self.config.pidfile, command=cmd,
323
homedir=self.config.rabbitdir)
261
env = dict(os.environ, HOME=self.config.homedir)
262
with open(self.config.logfile, "wb") as logfile:
263
with open(os.devnull, "rb") as devnull:
264
self.process = subprocess.Popen(
265
[cmd], stdin=devnull, stdout=logfile, stderr=logfile,
266
close_fds=True, cwd=self.config.homedir, env=env,
267
preexec_fn=preexec_fn)
325
269
os.path.basename(self.config.logfile),
326
270
content_from_file(self.config.logfile))
273
"""Start the RabbitMQ server."""
274
# Check if Rabbit is already running. In truth this is really to avoid
275
# a race condition around creating $HOME/.erlang.cookie: let rabbitctl
276
# create it now, before spawning the daemon.
277
if self.environment.is_node_running():
278
raise AssertionError(
279
"RabbitMQ OTP already running even though it "
280
"hasn't been started it yet!")
327
282
# Wait for the server to come up...
328
283
timeout = time.time() + 15
329
while time.time() < timeout:
330
if self.environment.check_running():
284
while time.time() < timeout and self.check_running():
285
if self.environment.is_node_running():
335
"Timeout waiting for RabbitMQ OTP server to start.")
336
# The erlang OTP is up, but RabbitMQ may not be usable. We need to
290
"Timeout waiting for RabbitMQ server to start.")
291
# The Erlang OTP is up, but RabbitMQ may not be usable. We need to
337
292
# clean up the process from here on in even if the full service
338
293
# fails to get together.
339
294
self.addCleanup(self._stop)
340
# Wait until the server is ready...
341
while time.time() < timeout:
342
# rabbitctl can say a node is up before it is ready to
343
# accept connections ... :-(
295
# `rabbitctl status` can say a node is up before it is ready to accept
296
# connections. Wait at least 5 more seconds for the node to come up...
297
timeout = max(timeout, time.time() + 5)
298
while time.time() < timeout and self.check_running():
345
conn = self.environment.get_connection()
300
self.environment.get_connection().close()
346
301
except socket.error:
353
"Timeout waiting for RabbitMQ to start listening.")
354
# All should be well here.
355
with open(self.config.pidfile, "r") as f:
356
self.pid = int(f.read().strip())
307
"Timeout waiting for RabbitMQ to node to come up.")
359
"""Stop the running server. Normally called by cleanups."""
360
if not self.environment.check_running():
361
# If someone has shut it down already, we're done.
309
def _request_stop(self):
363
310
outstr, errstr = self.environment.rabbitctl("stop", strip=True)
365
312
self.addDetail('stop-out', Content(UTF8_TEXT, lambda: [outstr]))
367
314
self.addDetail('stop-err', Content(UTF8_TEXT, lambda: [errstr]))
368
# Wait for the server to go down...
317
"""Stop the running server. Normally called by cleanups."""
319
# Wait for the node to go down...
369
320
timeout = time.time() + 15
370
321
while time.time() < timeout:
371
if not self.environment.check_running():
322
if not self.environment.is_node_running():
376
"Timeout waiting for RabbitMQ shutdown.")
377
# Wait for the process to end...
327
"Timeout waiting for RabbitMQ node to go down.")
328
# Wait at least 5 more seconds for the process to end...
329
timeout = max(timeout, time.time() + 5)
378
330
while time.time() < timeout:
382
if e.errno == errno.ESRCH:
331
if not self.is_running():
333
self.process.terminate()
389
"RabbitMQ (pid=%d) did not quit." % (self.pid,))
337
if self.is_running():
340
if self.is_running():
341
raise Exception("RabbitMQ server just won't die.")
392
344
class RabbitServer(Fixture):