~launchpad-pqm/launchpad/devel

12274.1.4 by Jeroen Vermeulen
CommandSpawner.
1
# Copyright 2011 Canonical Ltd.  This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
3
4
"""Run several commands at once, each in its own parallel sub-process."""

# Make every class defined in this module a new-style class (Python 2).
__metaclass__ = type

# Public API of this module.
__all__ = ['CommandSpawner', 'OutputLineHandler', 'ReturnCodeReceiver']
12
12274.1.9 by Jeroen Vermeulen
Better spelling for errno 11: errno.EAGAIN.
13
import errno
12274.1.4 by Jeroen Vermeulen
CommandSpawner.
14
from fcntl import (
15
    fcntl,
16
    F_GETFL,
17
    F_SETFL,
18
    )
19
from os import O_NONBLOCK
20
import select
21
import subprocess
22
23
24
def get_process_output_files(process):
    """List the pipes of `process` that are watched for output.

    :return: A new list holding `process.stdout` followed by
        `process.stderr`.
    """
    watched = (process.stdout, process.stderr)
    return list(watched)
30
31
32
def make_files_nonblocking(files):
    """Switch every file in `files` to non-blocking mode.

    Once a pipe is non-blocking, the `CommandSpawner` can read whatever
    output a process has produced so far without waiting for the process
    to finish.

    :param files: Iterable of file objects or descriptors accepted by
        `fcntl`.
    """
    for handle in files:
        # Keep all existing status flags; only add O_NONBLOCK.
        current_flags = fcntl(handle, F_GETFL)
        fcntl(handle, F_SETFL, current_flags | O_NONBLOCK)
40
41
42
def has_pending_output(poll_event):
    """Tell whether a `poll` event mask says there is data to read.

    :param poll_event: Event bitmask as returned by `select.poll().poll()`.
    :return: True if POLLIN or POLLPRI is set, False otherwise.
    """
    readable = select.POLLIN | select.POLLPRI
    return bool(poll_event & readable)
46
47
48
def has_terminated(poll_event):
    """Tell whether a `poll` event mask reports a hang-up or failure.

    The spawner takes this on a process's stdout as a sign that the
    process has gone away.

    :param poll_event: Event bitmask as returned by `select.poll().poll()`.
    :return: True if POLLERR, POLLHUP or POLLNVAL is set, False otherwise.
    """
    hangup_or_error = select.POLLERR | select.POLLHUP | select.POLLNVAL
    return bool(poll_event & hangup_or_error)
52
53
54
# Event identifiers.  `CommandSpawner` uses them as the keys of each
# process's handler dict: output on stdout, output on stderr, and exit.
STDOUT, STDERR, COMPLETION = 1, 2, 3
57
58
59
class CommandSpawner:
    """Simple manager to execute commands in parallel.

    Lets you run commands in sub-processes that will run simultaneously.
    The CommandSpawner looks for output from the running processes, and
    manages their cleanup.

    The typical usage pattern is:

    >>> spawner = CommandSpawner()
    >>> spawner.start(["echo", "One parallel process"])
    >>> spawner.start(["echo", "Another parallel process"])
    >>> spawner.complete()

    There are facilities for processing output and error output from the
    sub-processes, as well as dealing with success and failure.  You can
    pass callbacks to the `start` method, to be called when these events
    occur.

    As yet there is no facility for feeding input to the processes.
    """

    def __init__(self):
        # Maps each running process (as returned by `_spawn`) to a dict of
        # its handlers, keyed by STDOUT, STDERR and COMPLETION.
        self.running_processes = {}
        # One poll object watches the output pipes of all processes.
        self.poll = select.poll()

    def start(self, command, stdout_handler=None, stderr_handler=None,
              completion_handler=None):
        """Run `command` in a sub-process.

        This starts the command, but does not wait for it to complete.
        Instead of waiting for completion, you can pass handlers that
        will be called when certain events occur.

        :param command: Command line to execute in a sub-process.  May be
            either a string (for a single executable name) or a list of
            strings (for an executable name plus arguments).
        :param stdout_handler: Callback to handle output received from the
            sub-process.  Must take the output as its sole argument.  May be
            called any number of times as output comes in.
        :param stderr_handler: Callback to handle error output received from
            the sub-process.  Must take the output as its sole argument.  May
            be called any number of times as output comes in.
        :param completion_handler: Callback to be invoked, exactly once, when
            the sub-process exits.  Must take `command`'s numeric return code
            as its sole argument.
        """
        process = self._spawn(command)
        handlers = {
            STDOUT: stdout_handler,
            STDERR: stderr_handler,
            COMPLETION: completion_handler,
            }
        self.running_processes[process] = handlers
        pipes = get_process_output_files(process)
        for pipe in pipes:
            self.poll.register(pipe, select.POLLIN | select.POLLPRI)
        # Non-blocking pipes let `_read` take whatever is available without
        # stalling the event loop.
        make_files_nonblocking(pipes)

    def communicate(self):
        """Execute one iteration of the main event loop.  Blocks."""
        # Poll for output, but also wake up periodically to check for
        # completed processes.
        milliseconds = 1
        poll_result = self.poll.poll(milliseconds)

        # Map each file descriptor to its poll events.
        events_by_fd = dict(poll_result)

        # Iterate over a snapshot of the processes: completed ones are
        # deleted from `self.running_processes` inside this loop.  (Plain
        # `keys()` is only a copy on Python 2; on Python 3 it is a live
        # view, and deleting from the dict would break the iteration.)
        for process in list(self.running_processes):
            self._service(process, events_by_fd)
            if process.returncode is not None:
                # Process has completed.  Remove it.
                try:
                    self._handle(process, COMPLETION, process.returncode)
                finally:
                    for pipe in get_process_output_files(process):
                        # Unregister before closing: `unregister` needs a
                        # valid file descriptor.
                        self.poll.unregister(pipe)
                        pipe.close()
                    del self.running_processes[process]

    def complete(self):
        """Run `self.communicate` until all sub-processes have completed."""
        while self.running_processes:
            self.communicate()

    def kill(self):
        """Kill any remaining child processes.

        Calls `terminate()` on every process that is still being tracked.

        You'll still need to call `complete` to make sure that the child
        processes are cleaned up.  Until then, they will stay around as
        zombies.
        """
        # Iterating the dict directly works on Python 2 and 3 alike.
        for process in self.running_processes:
            process.terminate()

    def _spawn(self, command):
        """Spawn a sub-process for `command`.  Overridable in tests.

        The child's stdout and stderr are captured through pipes.  All
        other file descriptors are closed in the child (`close_fds`).
        """
        return subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            close_fds=True)

    def _handle(self, process, event, *args):
        """If we have a handler for `event` on `process`, call it."""
        process_handlers = self.running_processes[process]
        handler = process_handlers.get(event)
        if handler is not None:
            handler(*args)

    def _read(self, process, pipe_file, event):
        """Read available output from `pipe_file` and pass it on.

        Any non-empty output goes to the `event` handler for `process`.
        "Nothing to read right now" is not treated as an error.
        """
        try:
            output = pipe_file.read()
        except IOError as e:
            # "Resource temporarily unavailable"--not an error really,
            # just means there's nothing to read.
            if e.errno != errno.EAGAIN:
                raise
        else:
            # A non-blocking read may also report "nothing available" by
            # returning None (Python 3 buffered files); skip that as well as
            # empty output.
            if output:
                self._handle(process, event, output)

    def _service(self, process, events_by_fd):
        """Service `process`: pass on pending output, reap it if it died."""
        stdout_events = events_by_fd.get(process.stdout.fileno(), 0)
        stderr_events = events_by_fd.get(process.stderr.fileno(), 0)
        if has_pending_output(stdout_events):
            self._read(process, process.stdout, STDOUT)
        if has_pending_output(stderr_events):
            self._read(process, process.stderr, STDERR)
        if has_terminated(stdout_events):
            # NOTE(review): `wait` blocks the whole event loop if the child
            # closed its stdout but is still running.
            process.wait()
            # Drain whatever arrived between the poll and the process's
            # exit (e.g. stderr written after stdout was closed).  Without
            # this, that output would be lost when `communicate` unregisters
            # the pipes.  The pipes are non-blocking, so this cannot hang.
            self._read(process, process.stdout, STDOUT)
            self._read(process, process.stderr, STDERR)
193
194
195
class OutputLineHandler:
    """Collect and handle lines of output from a sub-process.

    Output received from a sub-process may not be neatly broken down by
    line.  This class collects it into lines and processes them one by
    one.  If desired, it can also add a prefix to each.
    """

    def __init__(self, line_processor, prefix=""):
        """Set up an output line handler.

        :param line_processor: A callback to be invoked for each line of
            output received.  Will receive exactly one argument: a single
            nonempty line of text, without the trailing newline.
        :param prefix: An optional string to be prefixed to each line of
            output before it is sent into the `line_processor`.
        """
        self.line_processor = line_processor
        self.prefix = prefix
        # Text received after the most recent newline.  It is held back
        # until the rest of its line arrives or `finalize` is called.
        self.incomplete_buffer = ""

    def process_line(self, line):
        """Process a single line of output.  Empty lines are skipped."""
        if line != "":
            self.line_processor("%s%s" % (self.prefix, line))

    def __call__(self, output):
        """Process multi-line output.

        Complete lines are passed to `process_line` in order.  Any trailing
        text not (yet) terminated with a newline is buffered, and is put
        in front of the next chunk of output.

        :param output: A chunk of text as received from the sub-process.
        """
        # `str.split` always returns at least one element, so no emptiness
        # check is needed.  The last element is whatever follows the final
        # newline (possibly "").
        lines = (self.incomplete_buffer + output).split("\n")
        self.incomplete_buffer = lines.pop()
        for line in lines:
            self.process_line(line)

    def finalize(self):
        """Process the remaining incomplete line, if any.

        Use this after the last chunk of output, so that a final line
        without a trailing newline still gets processed.
        """
        if self.incomplete_buffer != "":
            self.process_line(self.incomplete_buffer)
            self.incomplete_buffer = ""
237
238
239
class ReturnCodeReceiver:
    """Bare-bones completion handler for `CommandSpawner` processes.

    Its only job is to remember the return code of the command.

    :ivar returncode: Numeric return code reported for the process, or
        None as long as the process has not completed.
    """

    # Class-level default.  Calling an instance stores an instance
    # attribute that shadows it.
    returncode = None

    def __call__(self, returncode):
        """Remember `returncode`, as passed by the completion callback."""
        self.returncode = returncode