8687.15.17
by Karl Fogel
Add the copyright header block to the rest of the files under lib/lp/. |
1 |
# Copyright 2009 Canonical Ltd. This software is licensed under the
|
2 |
# GNU Affero General Public License version 3 (see the file LICENSE).
|
|
3 |
||
5977.5.6
by Jonathan Lange
Apply Barry's review comments. |
4 |
# pylint: disable-msg=W0702
|
5138.7.8
by jml at canonical
Conform to coding conventions (copyright statement and __all__) |
5 |
|
6 |
__metaclass__ = type |
|
6876.6.2
by Jonathan Lange
A couple of formatting fixes. |
7 |
__all__ = [ |
8 |
'BadMessage', |
|
9 |
'JobScheduler', |
|
10 |
'LockError', |
|
11 |
'PullerMaster', |
|
12 |
'PullerMonitorProtocol', |
|
13 |
]
|
|
5138.7.12
by Michael Hudson
mini review and make lint fixes |
14 |
|
2770.1.55
by Guilherme Salgado
Copy jblack's supermirror pull script files into lib/canonical/launchpad/scripts/supermirror |
15 |
import os |
11403.1.4
by Henning Eggers
Reformatted imports using format-imports script r32. |
16 |
import socket |
4898.2.32
by Jonathan Lange
Add tests and some implementation for what happens when things go wrong in the protocol. |
17 |
from StringIO import StringIO |
11403.1.4
by Henning Eggers
Reformatted imports using format-imports script r32. |
18 |
|
19 |
from contrib.glock import ( |
|
20 |
GlobalLock, |
|
21 |
LockAlreadyAcquired, |
|
22 |
)
|
|
23 |
from twisted.internet import ( |
|
24 |
defer, |
|
25 |
error, |
|
26 |
reactor, |
|
27 |
)
|
|
28 |
from twisted.protocols.basic import ( |
|
29 |
NetstringParseError, |
|
30 |
NetstringReceiver, |
|
31 |
)
|
|
32 |
from twisted.python import ( |
|
33 |
failure, |
|
34 |
log, |
|
35 |
)
|
|
4792.2.3
by David Allouche
Define BranchToMirror.traverse_references. |
36 |
|
11403.1.4
by Henning Eggers
Reformatted imports using format-imports script r32. |
37 |
from canonical.config import config |
38 |
from canonical.launchpad.webapp import errorlog |
|
9590.1.73
by Michael Hudson
tweaks to get the puller acceptance tests running |
39 |
from lp.code.interfaces.codehosting import LAUNCHPAD_SERVICES |
8426.6.1
by Michael Hudson
bzr ls --versioned --recursive --kind=file | xargs sed -i -e 's,from canonical.codehosting,from lp.codehosting,' |
40 |
from lp.codehosting.puller import get_lock_id_for_branch_id |
11403.1.4
by Henning Eggers
Reformatted imports using format-imports script r32. |
41 |
from lp.codehosting.puller.worker import get_canonical_url_for_branch_name |
10548.1.1
by Jonathan Lange
Move twistedsupport to lp.services |
42 |
from lp.services.twistedsupport.processmonitor import ( |
11403.1.4
by Henning Eggers
Reformatted imports using format-imports script r32. |
43 |
ProcessMonitorProtocolWithTimeout, |
44 |
)
|
|
10548.1.1
by Jonathan Lange
Move twistedsupport to lp.services |
45 |
from lp.services.twistedsupport.task import ( |
11403.1.4
by Henning Eggers
Reformatted imports using format-imports script r32. |
46 |
ParallelLimitedTaskConsumer, |
47 |
PollingTaskSource, |
|
48 |
)
|
|
2770.1.55
by Guilherme Salgado
Copy jblack's supermirror pull script files into lib/canonical/launchpad/scripts/supermirror |
49 |
|
5977.5.6
by Jonathan Lange
Apply Barry's review comments. |
50 |
|
4898.2.51
by Jonathan Lange
Add tests for handling unexpected failures. |
51 |
class BadMessage(Exception):
    """Raised when the protocol receives a message that we don't recognize."""

    def __init__(self, bad_netstring):
        """Build the exception for an unrecognized message.

        :param bad_netstring: The received payload that did not name a known
            command.  Its repr() is embedded in the exception message.
        """
        # Wrap the payload in a one-element tuple: a bare tuple on the
        # right-hand side of '%' would be unpacked as multiple format
        # arguments and either raise TypeError or format only part of it.
        Exception.__init__(
            self, 'Received unrecognized message: %r' % (bad_netstring,))
|
57 |
||
58 |
||
6614.2.10
by Jonathan Lange
Merge in the revert, but leave text unchanged. |
59 |
class UnexpectedStderr(Exception):
    """Raised when the worker prints to stderr."""

    def __init__(self, stderr):
        """Build the exception from the captured standard error.

        :param stderr: Everything the subprocess wrote to stderr.  The
            exception message quotes only its last line; the full text is
            kept on the `error` attribute.
        """
        # A falsy capture has no lines to pick from, so it is quoted as-is.
        last_line = stderr.splitlines()[-1] if stderr else stderr
        message = "Unexpected standard error from subprocess: %s" % last_line
        Exception.__init__(self, message)
        self.error = stderr
|
70 |
||
71 |
||
5861.1.16
by Michael Hudson
split out netstring part of pullermasterprotocol |
72 |
class PullerWireProtocol(NetstringReceiver):
    """Netstring decoder for the events sent by the puller worker.

    On the wire the worker sends a series of netstrings.  One level up,
    those netstrings are grouped into messages that each look like this::

        [method-name] [number-of-arguments] [arguments]+

    The decoder is therefore always in one of three states::

        [0] Waiting for command name.
        [1] Waiting for argument count.
        [2] Waiting for an argument.

    State [0]: the received string is a command name.  If
    self.puller_protocol has a do_<command name> attribute the name is
    stored in self._current_command and we move to state [1]; otherwise
    self.puller_protocol.unexpectedError() is called with a `BadMessage`.

    State [1]: the received string must convert to an integer, which is
    stored in self._expected_args.  If the conversion fails,
    self.puller_protocol.unexpectedError() is called.  A count of zero
    executes the command immediately; otherwise we move to state [2].

    State [2]: the received string is appended to self._current_args.  Once
    len(self._current_args) == self._expected_args the command is executed.

    "Executing the command" means calling do_<command name> on
    self.puller_protocol with *self._current_args.  If that raises,
    self.puller_protocol.unexpectedError() is called.  Either way
    _resetState() then puts us back into state [0].
    """

    def __init__(self, puller_protocol):
        """Create a decoder that dispatches to `puller_protocol`.

        :param puller_protocol: The object providing the do_* command
            methods and unexpectedError().
        """
        self.puller_protocol = puller_protocol
        self._resetState()

    def dataReceived(self, data):
        """See `NetstringReceiver.dataReceived`."""
        NetstringReceiver.dataReceived(self, data)
        # XXX: JonathanLange 2007-10-16
        # bug=http://twistedmatrix.com/trac/ticket/2851: There are no hooks in
        # NetstringReceiver to catch a NetstringParseError. The best we can do
        # is check the value of brokenPeer.
        if not self.brokenPeer:
            return
        parse_failure = failure.Failure(NetstringParseError(data))
        self.puller_protocol.unexpectedError(parse_failure)

    def stringReceived(self, line):
        """See `NetstringReceiver.stringReceived`."""
        if self._current_command is None:
            # state [0]
            handler = getattr(self.puller_protocol, 'do_%s' % line, None)
            if handler is not None:
                self._current_command = line
            else:
                self.puller_protocol.unexpectedError(
                    failure.Failure(BadMessage(line)))
        elif self._expected_args is None:
            # state [1]
            try:
                self._expected_args = int(line)
            except ValueError:
                # Failure() with no arguments wraps the exception currently
                # being handled.
                self.puller_protocol.unexpectedError(failure.Failure())
        else:
            # state [2]
            self._current_args.append(line)

        # While the argument count is still unknown (None) this comparison
        # is false, so nothing is executed.
        if len(self._current_args) != self._expected_args:
            return

        # Execute the command.
        method = getattr(
            self.puller_protocol, 'do_%s' % self._current_command)
        try:
            method(*self._current_args)
        except:
            # Deliberately catch everything: any error raised by the
            # handler is reported rather than propagated.
            self.puller_protocol.unexpectedError(failure.Failure())
        finally:
            self._resetState()

    def _resetState(self):
        """Force into the 'waiting for command' state."""
        self._current_args = []
        self._expected_args = None
        self._current_command = None
|
161 |
||
162 |
||
5861.1.15
by Michael Hudson
respond to some review comments |
163 |
class PullerMonitorProtocol(ProcessMonitorProtocolWithTimeout,
                            NetstringReceiver):
    """The protocol for receiving events from the puller worker."""

    def __init__(self, deferred, listener, clock=None):
        """Construct an instance of the protocol, for listening to a worker.

        :param deferred: A Deferred that will be fired when the worker has
            finished (either successfully or unsuccessfully).
        :param listener: A PullerMaster object that is notified when the
            protocol receives events from the worker.
        :param clock: A provider of Twisted's IReactorTime.  This parameter
            exists to allow testing that does not depend on an external
            clock.  If a clock is not passed in explicitly the reactor is
            used.
        """
        ProcessMonitorProtocolWithTimeout.__init__(
            self, deferred, config.supermirror.worker_timeout, clock)
        self.listener = listener
        self.reported_mirror_finished = False
        self._stderr = StringIO()
        self.wire_protocol = PullerWireProtocol(self)
        # Clean exits are checked for proper behaviour; unclean exits make
        # sure that a failure gets reported.
        self._deferred.addCallbacks(
            self.checkReportingFinishedAndNoStderr,
            self.ensureReportingFinished)

    def reportMirrorFinished(self, ignored):
        """Record that the outcome of the mirror has been reported.

        :param ignored: Unused; accepted so this can be used as a Deferred
            callback.
        """
        self.reported_mirror_finished = True

    def checkReportingFinishedAndNoStderr(self, result):
        """Check that the worker process behaved properly on clean exit.

        On a clean exit the worker is expected to have written nothing to
        stderr and to have reported success or failure.  If either
        expectation is not met, fail noisily.

        :return: `result` unchanged when all is well, or a Failure wrapping
            `UnexpectedStderr` when there was output on stderr.
        :raise AssertionError: if no outcome was ever reported.
        """
        captured = self._stderr.getvalue()
        if captured:
            unexpected = failure.Failure(UnexpectedStderr(captured))
            unexpected.error = captured
            return unexpected
        if self.reported_mirror_finished:
            return result
        raise AssertionError('Process exited successfully without '
                             'reporting success or failure?')

    def ensureReportingFinished(self, reason):
        """Clean up after the worker process exits uncleanly.

        A worker that exited uncleanly probably did not report success or
        failure, so failure is reported on its behalf.  If there was output
        on stderr it is probably a traceback, so its last line is used as
        the failure reason; otherwise str(reason.value) is used.

        :param reason: The Failure describing the unclean exit.
        :return: `reason`, possibly via a Deferred that fires with it.
        """
        if self.reported_mirror_finished:
            return reason
        captured = self._stderr.getvalue()
        reason.error = captured
        if captured:
            errorline = captured.splitlines()[-1]
        else:
            errorline = str(reason.value)
        # The general policy when multiple errors occur is to report the
        # one that happens first and as an error has already happened here
        # (the process exiting uncleanly) we can only log.err() any
        # failure that comes from mirrorFailed failing.  In any case, we
        # just pass along the failure.
        reporting = defer.maybeDeferred(
            self.listener.mirrorFailed, errorline, None)
        reporting.addErrback(log.err)
        return reporting.addCallback(lambda result: reason)

    def makeConnection(self, process):
        """Called when the process has been created."""
        ProcessMonitorProtocolWithTimeout.makeConnection(self, process)
        NetstringReceiver.makeConnection(self, process)
        self.wire_protocol.makeConnection(process)

    def outReceived(self, data):
        """Feed data from the worker's stdout to the wire protocol."""
        self.wire_protocol.dataReceived(data)

    def errReceived(self, data):
        """Accumulate data from the worker's stderr for later inspection."""
        self._stderr.write(data)

    def do_startMirroring(self):
        """Handle 'startMirroring': reset the timeout, notify the listener."""
        self.resetTimeout()
        self.runNotification(self.listener.startMirroring)

    def do_branchChanged(self, stacked_on_url, revid_before, revid_after,
                         control_string, branch_string, repository_string):
        """Handle 'branchChanged' by forwarding it to the listener.

        Once the listener's branchChanged has completed successfully the
        mirror is marked as having reported its outcome.
        """
        def branchChanged():
            return defer.maybeDeferred(
                self.listener.branchChanged, stacked_on_url, revid_before,
                revid_after, control_string, branch_string,
                repository_string).addCallback(self.reportMirrorFinished)
        self.runNotification(branchChanged)

    def do_mirrorFailed(self, reason, oops):
        """Handle 'mirrorFailed' by forwarding it to the listener.

        Once the listener's mirrorFailed has completed successfully the
        mirror is marked as having reported its outcome.
        """
        def mirrorFailed():
            return defer.maybeDeferred(
                self.listener.mirrorFailed, reason, oops).addCallback(
                    self.reportMirrorFinished)
        self.runNotification(mirrorFailed)

    def do_progressMade(self):
        """Any progress resets the timeout counter."""
        self.resetTimeout()

    def do_log(self, message):
        """Handle 'log' by passing the message to the listener."""
        self.listener.log(message)
|
275 |
||
4898.2.7
by Jonathan Lange
Mirror multiple branches in subprocesses, limited by a semaphore |
276 |
|
4898.2.42
by Jonathan Lange
scheduler.BranchToMirror -> scheduler.PullerMaster |
277 |
class PullerMaster:
    """Controller for a single puller worker.

    The `PullerMaster` kicks off a child worker process and handles the events
    generated by that process.
    """

    # Script run in the child process, and the protocol used to talk to it.
    path_to_script = os.path.join(config.root, 'scripts/mirror-branch.py')
    protocol_class = PullerMonitorProtocol

    def __init__(self, branch_id, source_url, unique_name, branch_type_name,
                 default_stacked_on_url, logger, client):
        """Construct a PullerMaster object.

        :param branch_id: The database ID of the branch to be mirrored.
        :param source_url: The location from which the branch is to be
            mirrored.  Surrounding whitespace is stripped.
        :param unique_name: The unique name of the branch to be mirrored.
        :param branch_type_name: The name of the BranchType of the branch to
            be mirrored (e.g. 'MIRRORED').
        :param default_stacked_on_url: The default stacked-on URL for the
            product that the branch is in.  '' implies that there is no such
            default.
        :param logger: A Python logging object.
        :param client: An asynchronous client for the branch status XML-RPC
            service.
        """
        self.logger = logger
        self.codehosting_endpoint = client
        self.branch_id = branch_id
        self.branch_type_name = branch_type_name
        self.unique_name = unique_name
        self.source_url = source_url.strip()
        self.destination_url = 'lp-internal:///%s' % (unique_name,)
        self.default_stacked_on_url = default_stacked_on_url

    def mirror(self):
        """Spawn a worker process to mirror a branch.

        :return: The Deferred handed to the monitoring protocol.
        """
        done = defer.Deferred()
        protocol = self.protocol_class(done, self)
        interpreter = '%s/bin/py' % config.root
        # The branch id and name are passed through str() so that every
        # command-line argument is a plain string.
        command = [
            interpreter,
            self.path_to_script,
            self.source_url,
            self.destination_url,
            str(self.branch_id),
            str(self.unique_name),
            self.branch_type_name,
            self.default_stacked_on_url,
            ]
        self.logger.debug("executing %s", command)
        env = os.environ.copy()
        env['BZR_EMAIL'] = get_lock_id_for_branch_id(self.branch_id)
        reactor.spawnProcess(protocol, interpreter, command, env=env)
        return done

    def run(self):
        """Launch a child worker and mirror a branch, handling errors.

        This is the main method to call to mirror a branch.

        :return: A Deferred that fires when the mirroring job is completed,
            one way or the other.  Failures are routed to `unexpectedError`,
            so the value of the Deferred itself is uninteresting (probably
            None).
        """
        job = self.mirror()
        job.addErrback(self.unexpectedError)
        return job

    def startMirroring(self):
        """Log that the worker has started on this branch."""
        self.logger.info(
            'Worker started on branch %d: %s to %s', self.branch_id,
            self.source_url, self.destination_url)

    def mirrorFailed(self, reason, oops):
        """Log a mirroring failure and report it to the codehosting endpoint.

        :param reason: The failure reason; sent to the endpoint as-is.
        :param oops: The OOPS identifier to log.
        :return: Whatever the endpoint's callRemote returns.
        """
        log_info = self.logger.info
        log_info('Recorded %s', oops)
        log_info('Recorded failure: %s', str(reason))
        return self.codehosting_endpoint.callRemote(
            'mirrorFailed', self.branch_id, reason)

    def branchChanged(self, stacked_on_url, revid_before, revid_after,
                      control_string, branch_string, repository_string):
        """Log a successful mirror and report it to the codehosting endpoint.

        The mirror is logged as a 'noop' when the revision id did not
        change.  `revid_before` is only used for logging; it is not sent to
        the endpoint.

        :return: Whatever the endpoint's callRemote returns.
        """
        was_noop = 'noop' if revid_before == revid_after else 'non-trivial'
        self.logger.info(
            'Successfully mirrored %s branch %d %s to %s to from rev %s to %s'
            ' (%s)', self.branch_type_name, self.branch_id, self.source_url,
            self.destination_url, revid_before, revid_after, was_noop)
        return self.codehosting_endpoint.callRemote(
            'branchChanged', LAUNCHPAD_SERVICES, self.branch_id,
            stacked_on_url, revid_after, control_string, branch_string,
            repository_string)

    def log(self, message):
        """Log a message that came from the worker."""
        self.logger.info('From worker: %s', message)

    def unexpectedError(self, failure):
        """Record an OOPS for `failure` and log its id.

        :param failure: The Failure to record.  Note that this parameter
            shadows the `twisted.python.failure` module inside this method.
        """
        details = [
            ('branch_id', self.branch_id),
            ('source', self.source_url),
            ('dest', self.destination_url),
            ('error-explanation', failure.getErrorMessage()),
            ]
        request = errorlog.ScriptRequest(details)
        request.URL = get_canonical_url_for_branch_name(self.unique_name)
        # If the sub-process exited abnormally, the stderr it produced is
        # probably a much more interesting traceback than the one attached to
        # the Failure we've been passed.
        if failure.check(error.ProcessTerminated, UnexpectedStderr):
            tb = getattr(failure, 'error', None)
        else:
            tb = None
        if tb is None:
            tb = failure.getTraceback()
        exc_info = (failure.type, failure.value, tb)
        errorlog.globalErrorUtility.raising(exc_info, request)
        self.logger.info('Recorded %s', request.oopsid)
389 |
||
4898.2.23
by Jonathan Lange
Move event listener into separate class. |
390 |
|
4898.2.42
by Jonathan Lange
scheduler.BranchToMirror -> scheduler.PullerMaster |
391 |
class JobScheduler: |
2770.1.55
by Guilherme Salgado
Copy jblack's supermirror pull script files into lib/canonical/launchpad/scripts/supermirror |
392 |
"""Schedule and manage the mirroring of branches.
|
3691.395.4
by David Allouche
supermirror-pull.py takes an argument: "upload", "mirror" or "import", and only pulls the branches of this type. Prevents slow pulls in one queue from stalling the whole supermirror, and helps achieving low latency. |
393 |
|
2770.1.55
by Guilherme Salgado
Copy jblack's supermirror pull script files into lib/canonical/launchpad/scripts/supermirror |
394 |
The jobmanager is responsible for organizing the mirroring of all
|
395 |
branches.
|
|
396 |
"""
|
|
397 |
||
9590.1.49
by Michael Hudson
more combining the puller and filesystem endpoints |
398 |
def __init__(self, codehosting_endpoint, logger, branch_type_names): |
399 |
self.codehosting_endpoint = codehosting_endpoint |
|
4898.2.23
by Jonathan Lange
Move event listener into separate class. |
400 |
self.logger = logger |
10379.2.4
by Michael Hudson
puller-side changes |
401 |
self.branch_type_names = branch_type_names |
2770.1.55
by Guilherme Salgado
Copy jblack's supermirror pull script files into lib/canonical/launchpad/scripts/supermirror |
402 |
self.actualLock = None |
8721.2.7
by Michael Hudson
some simplifications and a pretty obscure hack to exit if no jobs are present on startup |
403 |
self.name = 'branch-puller' |
4756.1.19
by Jonathan Lange
Completely gut JobManager subclasses, moving towards just having a |
404 |
self.lockfilename = '/var/lock/launchpad-%s.lock' % self.name |
3691.396.6
by David Allouche
record oopses for all errors |
405 |
|
8721.2.3
by Michael Hudson
bit less broken |
406 |
def _turnJobTupleIntoTask(self, job_tuple): |
8721.2.15
by Michael Hudson
review comments |
407 |
"""Turn the return value of `acquireBranchToPull` into a job.
|
408 |
||
409 |
`IBranchPuller.acquireBranchToPull` returns either an empty tuple
|
|
410 |
(indicating there are no branches to pull currently) or a tuple of 6
|
|
411 |
arguments, which are more or less those needed to construct a
|
|
412 |
`PullerMaster` object.
|
|
413 |
"""
|
|
8721.2.3
by Michael Hudson
bit less broken |
414 |
if len(job_tuple) == 0: |
415 |
return None |
|
416 |
(branch_id, pull_url, unique_name, |
|
417 |
default_stacked_on_url, branch_type_name) = job_tuple |
|
418 |
master = PullerMaster( |
|
8721.2.7
by Michael Hudson
some simplifications and a pretty obscure hack to exit if no jobs are present on startup |
419 |
branch_id, pull_url, unique_name, branch_type_name, |
8721.2.3
by Michael Hudson
bit less broken |
420 |
default_stacked_on_url, self.logger, |
14104.6.23
by Robert Collins
Nuke setOopsToken unneeded in a concurrency safe world. |
421 |
self.codehosting_endpoint) |
8721.2.3
by Michael Hudson
bit less broken |
422 |
return master.run |
423 |
||
8721.2.2
by Michael Hudson
well this nearly works.... |
424 |
def _poll(self): |
9590.1.49
by Michael Hudson
more combining the puller and filesystem endpoints |
425 |
deferred = self.codehosting_endpoint.callRemote( |
10379.2.5
by Michael Hudson
integration test and implied fixes |
426 |
'acquireBranchToPull', self.branch_type_names) |
8721.2.3
by Michael Hudson
bit less broken |
427 |
deferred.addCallback(self._turnJobTupleIntoTask) |
8721.2.2
by Michael Hudson
well this nearly works.... |
428 |
return deferred |
429 |
||
430 |
def run(self):
    """Start consuming pull jobs.

    Creates a `ParallelLimitedTaskConsumer` (given
    `config.supermirror.maximum_workers`), stores it as `self.consumer`,
    and has it consume a `PollingTaskSource` that is given
    `config.supermirror.polling_interval` and `self._poll`.

    :return: The result of `consumer.consume` (presumably a Deferred),
        with `_finishedRunning` added as a callback.
    """
    task_consumer = ParallelLimitedTaskConsumer(
        config.supermirror.maximum_workers, logger=self.logger)
    self.consumer = task_consumer
    task_source = PollingTaskSource(
        config.supermirror.polling_interval, self._poll,
        logger=self.logger)
    finished = task_consumer.consume(task_source)
    finished.addCallback(self._finishedRunning)
    return finished
440 |
||
4898.2.23
by Jonathan Lange
Move event listener into separate class. |
441 |
def _finishedRunning(self, ignored): |
442 |
self.logger.info('Mirroring complete') |
|
4898.2.50
by Jonathan Lange
Introduce unexpectd error handling. Consolidate mock BranchStatusClients. |
443 |
return ignored |
4898.2.4
by Jonathan Lange
Do the actual mirroring in a subprocess. Very naive implementation. |
444 |
|
3691.395.4
by David Allouche
supermirror-pull.py takes an argument: "upload", "mirror" or "import", and only pulls the branches of this type. Prevents slow pulls in one queue from stalling the whole supermirror, and helps achieving low latency. |
445 |
def lock(self):
    """Acquire the global lock named by `self.lockfilename`.

    A new `GlobalLock` is stored in `self.actualLock` before the
    acquisition is attempted, so the attribute is set even on failure.

    :raises LockError: if acquiring raises `LockAlreadyAcquired`.
    """
    global_lock = GlobalLock(self.lockfilename)
    self.actualLock = global_lock
    try:
        global_lock.acquire()
    except LockAlreadyAcquired:
        raise LockError(self.lockfilename)
2770.1.55
by Guilherme Salgado
Copy jblack's supermirror pull script files into lib/canonical/launchpad/scripts/supermirror |
451 |
|
452 |
def unlock(self):
    """Release the lock held in `self.actualLock`.

    NOTE(review): `self.actualLock` starts out as None and is only set by
    `lock`, so this presumably must be called after `lock` -- confirm
    against callers.
    """
    held_lock = self.actualLock
    held_lock.release()
|
454 |
||
4898.2.5
by Jonathan Lange
Clean up further instances where branch_status_client is passed around. |
455 |
def recordActivity(self, date_started, date_completed):
    """Record successful completion of the script.

    Each argument is converted to a plain tuple via its `utctimetuple()`
    method, then a remote `recordSuccess` call is made with `self.name`,
    this machine's hostname and the two tuples.

    :param date_started: Object providing `utctimetuple()` (presumably a
        datetime) for when the run began.
    :param date_completed: Likewise, for when the run ended.
    :return: Whatever the endpoint's `callRemote` returns.
    """
    started, completed = [
        tuple(moment.utctimetuple())
        for moment in (date_started, date_completed)]
    endpoint = self.codehosting_endpoint
    return endpoint.callRemote(
        'recordSuccess', self.name, socket.gethostname(),
        started, completed)
|
4501.2.1
by David Allouche
supermirror-pull script inserts to ScriptActivity through the authserver. |
462 |
|
2770.1.55
by Guilherme Salgado
Copy jblack's supermirror pull script files into lib/canonical/launchpad/scripts/supermirror |
463 |
|
2770.1.56
by Guilherme Salgado
Cleanup loads of code, improved some tests and got all of them to work. |
464 |
class LockError(StandardError):
    """Raised by `lock` when the global lock file is already held.

    :ivar lockfilename: Path of the lock file that could not be acquired.
    """

    def __init__(self, lockfilename):
        # Hand the filename to the base class so that `args` is populated.
        # Exceptions are re-created from `args` when copied or pickled,
        # which would fail for this required argument if `args` were empty.
        StandardError.__init__(self, lockfilename)
        self.lockfilename = lockfilename

    def __str__(self):
        return 'Jobmanager unable to get master lock: %s' % self.lockfilename