154
157
message.append('end\n')
155
158
message = ''.join(message)
156
159
response, sock = self._sendMessageToService(message)
157
if response.startswith('FAILURE'):
158
# TODO: Is there a better error to raise?
159
raise RuntimeError("Failed while sending message to forking "
160
"service. message: %r, failure: %r"
161
% (message, response))
162
ok, pid, path, tail = response.split('\n')
166
log.msg('Forking returned pid: %d, path: %s' % (pid, path))
161
ok, pid, path, tail = response.split('\n')
165
log.msg('Forking returned pid: %d, path: %s' % (pid, path))
167
169
return pid, path, sock
171
def _openHandleFailures(self, call_on_failure, path, flags, proc_class,
173
"""Open the given path, adding a cleanup as appropriate.
175
:param call_on_failure: A list holding (callback, args) tuples. We will
176
append new entries for things that we open
177
:param path: The path to open
178
:param flags: Flags to pass to os.open
179
:param proc_class: The ProcessWriter/ProcessReader class to wrap this
181
:param reactor: The Twisted reactor we are connecting to.
182
:param child_fd: The child file descriptor number passed to proc_class
184
fd = os.open(path, flags)
185
call_on_failure.append((os.close, fd))
186
p = proc_class(reactor, self, child_fd, fd)
187
# Now that p has been created, it will close fd for us. So switch the
188
# cleanup to calling p.connectionLost()
189
call_on_failure[-1] = (p.connectionLost, (None,))
190
self.pipes[child_fd] = p
169
192
def _connectSpawnToReactor(self, reactor):
193
self._exiter = _WaitForExit(reactor, self, self.process_sock)
194
call_on_failure = [(self._exiter.connectionLost, (None,))]
170
195
stdin_path = os.path.join(self._fifo_path, 'stdin')
171
196
stdout_path = os.path.join(self._fifo_path, 'stdout')
172
197
stderr_path = os.path.join(self._fifo_path, 'stderr')
173
child_stdin_fd = os.open(stdin_path, os.O_WRONLY)
174
self.pipes[0] = process.ProcessWriter(reactor, self, 0,
176
child_stdout_fd = os.open(stdout_path, os.O_RDONLY)
177
# forceReadHack=True ? Used in process.py doesn't seem to be needed
179
self.pipes[1] = process.ProcessReader(reactor, self, 1,
181
child_stderr_fd = os.open(stderr_path, os.O_RDONLY)
182
self.pipes[2] = process.ProcessReader(reactor, self, 2,
184
# Note: _exiter forms a GC cycle, since it points to us, and we hold a
186
self._exiter = _WaitForExit(reactor, self, self.process_sock)
199
self._openHandleFailures(call_on_failure, stdin_path, os.O_WRONLY,
200
process.ProcessWriter, reactor, 0)
201
self._openHandleFailures(call_on_failure, stdout_path, os.O_RDONLY,
202
process.ProcessReader, reactor, 1)
203
self._openHandleFailures(call_on_failure, stderr_path, os.O_RDONLY,
204
process.ProcessReader, reactor, 2)
206
exc_class, exc_value, exc_tb = sys.exc_info()
207
for func, args in call_on_failure:
211
# Just log any exceptions at this point. This makes sure
212
# all cleanups get called so we don't get leaks. We know
213
# there is an active exception, or we wouldn't be here.
215
raise exc_class, exc_value, exc_tb
187
216
self.pipes['exit'] = self._exiter
189
218
def _getReason(self, status):