12274.1.4
by Jeroen Vermeulen
CommandSpawner. |
1 |
# Copyright 2011 Canonical Ltd. This software is licensed under the
|
2 |
# GNU Affero General Public License version 3 (see the file LICENSE).
|
|
3 |
||
4 |
"""Execute commands in parallel sub-processes."""
|
|
5 |
||
6 |
__metaclass__ = type |
|
7 |
__all__ = [ |
|
8 |
'CommandSpawner', |
|
9 |
'OutputLineHandler', |
|
10 |
'ReturnCodeReceiver', |
|
11 |
]
|
|
12 |
||
12274.1.9
by Jeroen Vermeulen
Better spelling for errno 11: errno.EAGAIN. |
13 |
import errno |
12274.1.4
by Jeroen Vermeulen
CommandSpawner. |
14 |
from fcntl import ( |
15 |
fcntl, |
|
16 |
F_GETFL, |
|
17 |
F_SETFL, |
|
18 |
)
|
|
19 |
from os import O_NONBLOCK |
|
20 |
import select |
|
21 |
import subprocess |
|
22 |
||
23 |
||
24 |
def get_process_output_files(process):
    """Return the files we watch for output coming from `process`.

    :param process: An object exposing `stdout` and `stderr` attributes.
    :return: A two-item list: the process's `stdout` followed by its
        `stderr`.
    """
    return [process.stdout, process.stderr]
|
|
30 |
||
31 |
||
32 |
def make_files_nonblocking(files):
    """Put each of `files` in non-blocking mode.

    This allows the `CommandSpawner` to read all available output from a
    process without blocking until the process completes.

    :param files: Iterable of files (anything `fcntl` accepts as a file
        descriptor argument).
    """
    for this_file in files:
        # Preserve the existing status flags; only add O_NONBLOCK.
        current_flags = fcntl(this_file, F_GETFL)
        fcntl(this_file, F_SETFL, current_flags | O_NONBLOCK)
|
40 |
||
41 |
||
42 |
def has_pending_output(poll_event):
    """Does the given event mask from `poll` indicate there's data to read?

    :param poll_event: Integer event bitmask as reported by `poll`.
    :return: True if either `POLLIN` or `POLLPRI` is set, else False.
    """
    input_mask = select.POLLIN | select.POLLPRI
    return bool(poll_event & input_mask)
|
46 |
||
47 |
||
48 |
def has_terminated(poll_event):
    """Does the given event mask from `poll` indicate process death?

    :param poll_event: Integer event bitmask as reported by `poll`.
    :return: True if any of `POLLERR`, `POLLHUP` or `POLLNVAL` is set,
        else False.
    """
    death_mask = select.POLLERR | select.POLLHUP | select.POLLNVAL
    return bool(poll_event & death_mask)
|
52 |
||
53 |
||
54 |
# Event identifiers.  `CommandSpawner` uses these as the keys of each
# process's handler table.
STDOUT = 1
STDERR = 2
COMPLETION = 3
|
57 |
||
58 |
||
59 |
class CommandSpawner:
    """Simple manager to execute commands in parallel.

    Lets you run commands in sub-processes that will run simultaneously.
    The CommandSpawner looks for output from the running processes, and
    manages their cleanup.

    The typical usage pattern is:

    >>> spawner = CommandSpawner()
    >>> spawner.start(["echo", "One parallel process"])
    >>> spawner.start(["echo", "Another parallel process"])
    >>> spawner.complete()

    There are facilities for processing output and error output from the
    sub-processes, as well as dealing with success and failure.  You can
    pass callbacks to the `start` method, to be called when these events
    occur.

    As yet there is no facility for feeding input to the processes.

    :ivar running_processes: Maps each process that has been started and
        not yet cleaned up to its dict of event handlers.
    :ivar poll: The `select.poll` object on which the processes' output
        pipes are registered.
    """

    def __init__(self):
        self.running_processes = {}
        self.poll = select.poll()

    def start(self, command, stdout_handler=None, stderr_handler=None,
              completion_handler=None):
        """Run `command` in a sub-process.

        This starts the command, but does not wait for it to complete.
        Instead of waiting for completion, you can pass handlers that
        will be called when certain events occur.

        :param command: Command line to execute in a sub-process.  May be
            either a string (for a single executable name) or a list of
            strings (for an executable name plus arguments).
        :param stdout_handler: Callback to handle output received from the
            sub-process.  Must take the output as its sole argument.  May be
            called any number of times as output comes in.
        :param stderr_handler: Callback to handle error output received from
            the sub-process.  Must take the output as its sole argument.  May
            be called any number of times as output comes in.
        :param completion_handler: Callback to be invoked, exactly once, when
            the sub-process exits.  Must take `command`'s numeric return code
            as its sole argument.
        """
        process = self._spawn(command)
        self.running_processes[process] = {
            STDOUT: stdout_handler,
            STDERR: stderr_handler,
            COMPLETION: completion_handler,
        }
        pipes = get_process_output_files(process)
        for pipe in pipes:
            self.poll.register(pipe, select.POLLIN | select.POLLPRI)
        make_files_nonblocking(pipes)

    def communicate(self):
        """Execute one iteration of the main event loop.  Blocks."""
        # Poll with a short timeout, so that this call returns even when
        # none of the registered pipes shows any activity.
        milliseconds = 1
        poll_result = self.poll.poll(milliseconds)

        # Map each file descriptor to its poll events.
        events_by_fd = dict(poll_result)

        # Iterate over a snapshot of the processes: entries are removed
        # from the original dict as processes complete.  An explicit
        # `list` is needed because a keys view cannot be iterated while
        # the dict changes size.
        for process in list(self.running_processes):
            self._service(process, events_by_fd)
            if process.returncode is not None:
                # Process has completed.  Remove it, even if the
                # completion handler raises.
                try:
                    self._handle(process, COMPLETION, process.returncode)
                finally:
                    for pipe in get_process_output_files(process):
                        self.poll.unregister(pipe)
                    del self.running_processes[process]

    def complete(self):
        """Run `self.communicate` until all sub-processes have completed."""
        while self.running_processes:
            self.communicate()

    def kill(self):
        """Kill any remaining child processes.

        You'll still need to call `complete` to make sure that the child
        processes are cleaned up.  Until then, they will stay around as
        zombies.
        """
        for process in self.running_processes:
            process.terminate()

    def _spawn(self, command):
        """Spawn a sub-process for `command`.  Overridable in tests.

        :return: A `subprocess.Popen` whose stdout and stderr are pipes.
        """
        return subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            close_fds=True)

    def _handle(self, process, event, *args):
        """If we have a handler for `event` on `process`, call it.

        :param process: A key of `self.running_processes`.
        :param event: One of `STDOUT`, `STDERR`, `COMPLETION`.
        :param args: Positional arguments passed on to the handler.
        """
        handler = self.running_processes[process].get(event)
        if handler is not None:
            handler(*args)

    def _read(self, process, pipe_file, event):
        """Read output from `pipe_file` and pass it to the `event` handler.

        :param process: The process that `pipe_file` belongs to.
        :param pipe_file: File to read from; expected to be non-blocking.
        :param event: Event identifier to dispatch the output under.
        :raise IOError: If reading fails for any reason other than EAGAIN.
        """
        try:
            output = pipe_file.read()
        except IOError as e:
            # "Resource temporarily unavailable"--not an error really,
            # just means there's nothing to read.
            if e.errno != errno.EAGAIN:
                raise
        else:
            # A read may also come back empty (or as None) rather than
            # raising when nothing is available; there is nothing to
            # hand to the handler in that case.
            if output:
                self._handle(process, event, output)

    def _service(self, process, events_by_fd):
        """Service `process`.

        Reads whatever output `events_by_fd` reports as pending on the
        process's stdout and stderr, then reaps the process if its
        stdout shows a termination event.

        :param process: The process to service.
        :param events_by_fd: Dict mapping file descriptors to the event
            masks returned by the latest poll.
        """
        stdout_events = events_by_fd.get(process.stdout.fileno(), 0)
        stderr_events = events_by_fd.get(process.stderr.fileno(), 0)
        if has_pending_output(stdout_events):
            self._read(process, process.stdout, STDOUT)
        if has_pending_output(stderr_events):
            self._read(process, process.stderr, STDERR)
        # Only stdout's events are consulted to detect termination.
        if has_terminated(stdout_events):
            process.wait()
|
193 |
||
194 |
||
195 |
class OutputLineHandler:
    """Collect and handle lines of output from a sub-process.

    Output received from a sub-process may not be neatly broken down by
    line.  This class collects them into lines and processes them one by
    one.  If desired, it can also add a prefix to each.

    :ivar incomplete_buffer: Text received so far that has not yet been
        terminated by a newline.
    """

    def __init__(self, line_processor, prefix=""):
        """Set up an output line handler.

        :param line_processor: A callback to be invoked for each line of
            output received.  Will receive exactly one argument: a single
            nonempty line of text, without the trailing newline.
        :param prefix: An optional string to be prefixed to each line of
            output before it is sent into the `line_processor`.
        """
        self.line_processor = line_processor
        self.prefix = prefix
        self.incomplete_buffer = ""

    def process_line(self, line):
        """Process a single line of output.  Empty lines are skipped."""
        if line != "":
            self.line_processor("%s%s" % (self.prefix, line))

    def __call__(self, output):
        """Process multi-line output.

        Any trailing text not (yet) terminated with a newline is buffered.

        :param output: A chunk of text; may contain any number of
            newlines, including none.
        """
        lines = (self.incomplete_buffer + output).split("\n")
        # Splitting on a separator always yields at least one item.  The
        # last one is whatever followed the final newline (possibly the
        # empty string): keep it for the next call.
        self.incomplete_buffer = lines.pop()
        for line in lines:
            self.process_line(line)

    def finalize(self):
        """Process the remaining incomplete line, if any."""
        if self.incomplete_buffer != "":
            self.process_line(self.incomplete_buffer)
            self.incomplete_buffer = ""
|
237 |
||
238 |
||
239 |
class ReturnCodeReceiver:
    """A minimal completion handler for `CommandSpawner` processes.

    Does nothing but collect the command's return code.

    :ivar returncode: The numerical return code retrieved from the
        process.  Stays None until the process completes.
    """

    # Class-level default; shadowed on the instance once it is called.
    returncode = None

    def __call__(self, returncode):
        """Record `returncode` on this receiver."""
        self.returncode = returncode