~launchpad-pqm/launchpad/devel

8687.15.17 by Karl Fogel
Add the copyright header block to the rest of the files under lib/lp/.
1
# Copyright 2009 Canonical Ltd.  This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
3
5977.5.6 by Jonathan Lange
Apply Barry's review comments.
4
# pylint: disable-msg=W0702
5138.7.8 by jml at canonical
Conform to coding conventions (copyright statement and __all__)
5
6
# Make classes in this module that are defined without explicit base classes
# new-style (Python 2).
__metaclass__ = type

# Names exported by `from <this module> import *`.
__all__ = ['BadMessage', 'JobScheduler', 'LockError', 'PullerMaster',
           'PullerMonitorProtocol']
5138.7.12 by Michael Hudson
mini review and make lint fixes
14
2770.1.55 by Guilherme Salgado
Copy jblack's supermirror pull script files into lib/canonical/launchpad/scripts/supermirror
15
import os
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
16
import socket
4898.2.32 by Jonathan Lange
Add tests and some implementation for what happens when things go wrong in the protocol.
17
from StringIO import StringIO
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
18
19
from contrib.glock import (
20
    GlobalLock,
21
    LockAlreadyAcquired,
22
    )
23
from twisted.internet import (
24
    defer,
25
    error,
26
    reactor,
27
    )
28
from twisted.protocols.basic import (
29
    NetstringParseError,
30
    NetstringReceiver,
31
    )
32
from twisted.python import (
33
    failure,
34
    log,
35
    )
4792.2.3 by David Allouche
Define BranchToMirror.traverse_references.
36
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
37
from canonical.config import config
38
from canonical.launchpad.webapp import errorlog
9590.1.73 by Michael Hudson
tweaks to get the puller acceptance tests running
39
from lp.code.interfaces.codehosting import LAUNCHPAD_SERVICES
8426.6.1 by Michael Hudson
bzr ls --versioned --recursive --kind=file | xargs sed -i -e 's,from canonical.codehosting,from lp.codehosting,'
40
from lp.codehosting.puller import get_lock_id_for_branch_id
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
41
from lp.codehosting.puller.worker import get_canonical_url_for_branch_name
10548.1.1 by Jonathan Lange
Move twistedsupport to lp.services
42
from lp.services.twistedsupport.processmonitor import (
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
43
    ProcessMonitorProtocolWithTimeout,
44
    )
10548.1.1 by Jonathan Lange
Move twistedsupport to lp.services
45
from lp.services.twistedsupport.task import (
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
46
    ParallelLimitedTaskConsumer,
47
    PollingTaskSource,
48
    )
2770.1.55 by Guilherme Salgado
Copy jblack's supermirror pull script files into lib/canonical/launchpad/scripts/supermirror
49
5977.5.6 by Jonathan Lange
Apply Barry's review comments.
50
4898.2.51 by Jonathan Lange
Add tests for handling unexpected failures.
51
class BadMessage(Exception):
    """Signal that the wire protocol saw a message it cannot interpret.

    The offending netstring is embedded, repr()-quoted, in the exception
    message.
    """

    def __init__(self, bad_netstring):
        message = 'Received unrecognized message: %r' % bad_netstring
        super(BadMessage, self).__init__(message)
57
58
6614.2.10 by Jonathan Lange
Merge in the revert, but leave text unchanged.
59
class UnexpectedStderr(Exception):
    """Signal that the worker process wrote to standard error.

    Only the final line of the captured output goes into the exception
    message.  The complete text is kept on the ``error`` attribute.
    """

    def __init__(self, stderr):
        # Falsy stderr (e.g. '') is used verbatim; otherwise keep only the
        # last line, which for a traceback is the exception summary.
        last_line = stderr
        if stderr:
            last_line = stderr.splitlines()[-1]
        super(UnexpectedStderr, self).__init__(
            "Unexpected standard error from subprocess: %s" % last_line)
        self.error = stderr
70
71
5861.1.16 by Michael Hudson
split out netstring part of pullermasterprotocol
72
class PullerWireProtocol(NetstringReceiver):
    """The wire protocol for receiving events from the puller worker.

    The wire-level protocol is a series of netstrings.

    At the next level up, the protocol consists of messages which each look
    like this::

            [method-name] [number-of-arguments] [arguments]+

    Thus the instance is always in one of three states::

        [0] Waiting for command name.
        [1] Waiting for argument count.
        [2] Waiting for an argument.

    In state [0], we are waiting for a command name.  When we get one, we
    check that self.puller_protocol has a matching do_<command name>
    method.  If it does not, we call unexpectedError() with a `BadMessage`
    and stay in state [0].  Otherwise we store the name in
    self._current_command and move into state [1].

    In state [1], we are waiting for an argument count.  When we receive a
    message, we try to convert it to a non-negative integer.  If we fail in
    this, we call unexpectedError() and stay in state [1].  Otherwise, if
    it's greater than zero, we store the number in self._expected_args and
    go into state [2] or if it's zero execute the command (see below).

    In state [2], we are waiting for an argument.  When we receive one, we
    append it to self._current_args.  If len(self._current_args) ==
    self._expected_args, execute the command.

    "Executing the command" means looking for a method called
    do_<command name> on self.puller_protocol and calling it with
    *self._current_args.  If this raises, call
    self.puller_protocol.unexpectedError().

    The method _resetState() forces us back into state [0].
    """

    def __init__(self, puller_protocol):
        """Construct a wire protocol that dispatches to `puller_protocol`.

        :param puller_protocol: The object whose do_<command> methods are
            called for each complete message, and whose unexpectedError()
            is called with a Failure on any protocol error.
        """
        self.puller_protocol = puller_protocol
        self._resetState()

    def dataReceived(self, data):
        """See `NetstringReceiver.dataReceived`."""
        NetstringReceiver.dataReceived(self, data)
        # XXX: JonathanLange 2007-10-16
        # bug=http://twistedmatrix.com/trac/ticket/2851: There are no hooks in
        # NetstringReceiver to catch a NetstringParseError. The best we can do
        # is check the value of brokenPeer.
        if self.brokenPeer:
            self.puller_protocol.unexpectedError(
                failure.Failure(NetstringParseError(data)))

    def stringReceived(self, line):
        """See `NetstringReceiver.stringReceived`."""
        if (self._current_command is not None
            and self._expected_args is not None):
            # State [2]: collect another argument.
            self._current_args.append(line)
        elif self._current_command is not None:
            # State [1]: parse the argument count.
            try:
                expected_args = int(line)
                if expected_args < 0:
                    # A negative count could never equal
                    # len(self._current_args).  The protocol would then
                    # swallow every later message as an argument.  Treat
                    # it like any other malformed count.
                    raise ValueError(
                        'Negative argument count: %r' % (line,))
            except ValueError:
                self.puller_protocol.unexpectedError(failure.Failure())
            else:
                self._expected_args = expected_args
        else:
            # State [0]: the message names a command.  Only accept it if
            # the puller protocol has a matching do_<command> handler.
            if getattr(self.puller_protocol, 'do_%s' % line, None) is None:
                self.puller_protocol.unexpectedError(
                    failure.Failure(BadMessage(line)))
            else:
                self._current_command = line

        if len(self._current_args) == self._expected_args:
            # Execute the command.  The state is reset even if the handler,
            # or the error reporting, raises.
            method = getattr(
                self.puller_protocol, 'do_%s' % self._current_command)
            try:
                try:
                    method(*self._current_args)
                except:
                    # Deliberately catch everything (the module disables
                    # pylint W0702) and report it as a Failure.
                    self.puller_protocol.unexpectedError(failure.Failure())
            finally:
                self._resetState()

    def _resetState(self):
        """Force into the 'waiting for command' state."""
        self._current_command = None
        self._expected_args = None
        self._current_args = []
161
162
5861.1.15 by Michael Hudson
respond to some review comments
163
class PullerMonitorProtocol(ProcessMonitorProtocolWithTimeout,
                            NetstringReceiver):
    """Process protocol that listens to a single puller worker.

    The worker's stdout is decoded by a `PullerWireProtocol`, which calls
    back into the do_* methods below.  Those methods forward the events to
    the listener.  Anything the worker writes to stderr is buffered so it
    can be reported if the worker misbehaves.
    """

    def __init__(self, deferred, listener, clock=None):
        """Construct an instance of the protocol, for listening to a worker.

        :param deferred: A Deferred that will be fired when the worker has
            finished (either successfully or unsuccessfully).
        :param listener: A PullerMaster object that is notified when the
            protocol receives events from the worker.
        :param clock: A provider of Twisted's IReactorTime.  This parameter
            exists to allow testing that does not depend on an external clock.
            If a clock is not passed in explicitly the reactor is used.
        """
        timeout = config.supermirror.worker_timeout
        ProcessMonitorProtocolWithTimeout.__init__(
            self, deferred, timeout, clock)
        self.reported_mirror_finished = False
        self.listener = listener
        self.wire_protocol = PullerWireProtocol(self)
        self._stderr = StringIO()
        # On a clean exit, check that the worker behaved.  On an unclean
        # exit, make sure a failure gets reported to the listener.
        self._deferred.addCallbacks(
            self.checkReportingFinishedAndNoStderr,
            errback=self.ensureReportingFinished)

    def reportMirrorFinished(self, ignored):
        """Note that the listener has been told the mirror finished."""
        self.reported_mirror_finished = True

    def checkReportingFinishedAndNoStderr(self, result):
        """Check that the worker process behaved properly on clean exit.

        A well-behaved worker prints nothing to stderr and reports either
        success or failure before exiting.

        :return: `result` if the worker behaved, or a Failure wrapping
            `UnexpectedStderr` (with the stderr text on its ``error``
            attribute) if the worker wrote to stderr.
        :raises AssertionError: If the worker exited without reporting
            success or failure.
        """
        stderr = self._stderr.getvalue()
        if stderr:
            unexpected = failure.Failure(UnexpectedStderr(stderr))
            unexpected.error = stderr
            return unexpected
        if self.reported_mirror_finished:
            return result
        raise AssertionError('Process exited successfully without '
                             'reporting success or failure?')

    def ensureReportingFinished(self, reason):
        """Clean up after the worker process exits uncleanly.

        If the worker has already reported success or failure, `reason` is
        passed straight through.  Otherwise the listener's mirrorFailed is
        called.  The error text is the last line of any stderr output,
        which is probably a traceback, or else `reason`'s own message.

        :return: `reason`, possibly wrapped in a Deferred that fires once
            the listener has been notified.
        """
        if self.reported_mirror_finished:
            return reason
        stderr = self._stderr.getvalue()
        reason.error = stderr
        if stderr:
            errorline = stderr.splitlines()[-1]
        else:
            errorline = str(reason.value)
        # The general policy when multiple errors occur is to report the one
        # that happens first.  An error (the unclean exit) has already
        # happened here, so a failure of mirrorFailed itself can only be
        # logged.  Either way, the original reason is passed along.
        notified = defer.maybeDeferred(
            self.listener.mirrorFailed, errorline, None)
        notified.addErrback(log.err)
        notified.addCallback(lambda ignored: reason)
        return notified

    def makeConnection(self, process):
        """Called when the process has been created.

        Connects both base classes, then the wire protocol, to `process`.
        """
        for base in (ProcessMonitorProtocolWithTimeout, NetstringReceiver):
            base.makeConnection(self, process)
        self.wire_protocol.makeConnection(process)

    def outReceived(self, data):
        """Pass the worker's stdout to the wire protocol for decoding."""
        self.wire_protocol.dataReceived(data)

    def errReceived(self, data):
        """Buffer the worker's stderr for later error reporting."""
        self._stderr.write(data)

    def do_startMirroring(self):
        """The worker has begun; reset the timeout and tell the listener."""
        self.resetTimeout()
        self.runNotification(self.listener.startMirroring)

    def do_branchChanged(self, stacked_on_url, revid_before, revid_after,
                         control_string, branch_string, repository_string):
        """Forward a successful mirror to the listener's branchChanged."""
        def notify_listener():
            notified = defer.maybeDeferred(
                self.listener.branchChanged, stacked_on_url, revid_before,
                revid_after, control_string, branch_string,
                repository_string)
            return notified.addCallback(self.reportMirrorFinished)
        self.runNotification(notify_listener)

    def do_mirrorFailed(self, reason, oops):
        """Forward a failed mirror to the listener's mirrorFailed."""
        def notify_listener():
            notified = defer.maybeDeferred(
                self.listener.mirrorFailed, reason, oops)
            return notified.addCallback(self.reportMirrorFinished)
        self.runNotification(notify_listener)

    def do_progressMade(self):
        """Any progress resets the timeout counter."""
        self.resetTimeout()

    def do_log(self, message):
        """Pass a log message from the worker on to the listener."""
        self.listener.log(message)
275
4898.2.7 by Jonathan Lange
Mirror multiple branches in subprocesses, limited by a semaphore
276
4898.2.42 by Jonathan Lange
scheduler.BranchToMirror -> scheduler.PullerMaster
277
class PullerMaster:
    """Controller for a single puller worker.

    The `PullerMaster` kicks off a child worker process and handles the events
    generated by that process.
    """

    # The worker script run in the child process.
    path_to_script = os.path.join(config.root, 'scripts/mirror-branch.py')
    # The process protocol used to talk to the worker.  It is a class
    # attribute so that it can be overridden.
    protocol_class = PullerMonitorProtocol

    def __init__(self, branch_id, source_url, unique_name, branch_type_name,
                 default_stacked_on_url, logger, client):
        """Construct a PullerMaster object.

        :param branch_id: The database ID of the branch to be mirrored.
        :param source_url: The location from which the branch is to be
            mirrored.  Surrounding whitespace is stripped.
        :param unique_name: The unique name of the branch to be mirrored.
        :param branch_type_name: The name of the BranchType of the branch to
            be mirrored (e.g. 'MIRRORED').
        :param default_stacked_on_url: The default stacked-on URL for the
            product that the branch is in. '' implies that there is no such
            default.
        :param logger: A Python logging object.
        :param client: An asynchronous client for the branch status XML-RPC
            service.
        """
        self.branch_id = branch_id
        self.source_url = source_url.strip()
        self.destination_url = 'lp-internal:///%s' % (unique_name,)
        self.unique_name = unique_name
        self.branch_type_name = branch_type_name
        self.default_stacked_on_url = default_stacked_on_url
        self.logger = logger
        self.codehosting_endpoint = client

    def mirror(self):
        """Spawn a worker process to mirror a branch.

        :return: The Deferred handed to the process protocol, which fires
            when the worker finishes.
        """
        deferred = defer.Deferred()
        protocol = self.protocol_class(deferred, self)
        interpreter = '%s/bin/py' % config.root
        command = [
            interpreter, self.path_to_script, self.source_url,
            self.destination_url, str(self.branch_id), str(self.unique_name),
            self.branch_type_name,
            self.default_stacked_on_url]
        self.logger.debug("executing %s", command)
        env = os.environ.copy()
        # Presumably this is so that bzr locks taken by the worker carry a
        # lock id derived from the branch id -- confirm against
        # get_lock_id_for_branch_id.
        env['BZR_EMAIL'] = get_lock_id_for_branch_id(self.branch_id)
        reactor.spawnProcess(protocol, interpreter, command, env=env)
        return deferred

    def run(self):
        """Launch a child worker and mirror a branch, handling errors.

        This is the main method to call to mirror a branch.

        :return: A Deferred that fires when the mirroring job is completed,
            one way or the other.  Failures are passed to `unexpectedError`,
            so it does not fire with a failure unless `unexpectedError`
            itself raises.  The value of the Deferred itself is
            uninteresting (probably None).
        """
        deferred = self.mirror()
        deferred.addErrback(self.unexpectedError)
        return deferred

    def startMirroring(self):
        """Log that the worker has started on this branch."""
        self.logger.info(
            'Worker started on branch %d: %s to %s', self.branch_id,
            self.source_url, self.destination_url)

    def mirrorFailed(self, reason, oops):
        """Log a failed mirror and report it to the codehosting endpoint.

        :param reason: Text describing the failure.
        :param oops: The OOPS id recorded for the failure, or None.
        :return: The result of the 'mirrorFailed' remote call.
        """
        self.logger.info('Recorded %s', oops)
        # Let the logging module format `reason`; an explicit str() would
        # raise on non-ASCII unicode under Python 2.
        self.logger.info('Recorded failure: %s', reason)
        return self.codehosting_endpoint.callRemote(
            'mirrorFailed', self.branch_id, reason)

    def branchChanged(self, stacked_on_url, revid_before, revid_after,
                      control_string, branch_string, repository_string):
        """Log a successful mirror and report it to the codehosting endpoint.

        :return: The result of the 'branchChanged' remote call.
        """
        if revid_before == revid_after:
            was_noop = 'noop'
        else:
            was_noop = 'non-trivial'
        self.logger.info(
            'Successfully mirrored %s branch %d %s to %s from rev %s to %s'
            ' (%s)', self.branch_type_name, self.branch_id, self.source_url,
            self.destination_url, revid_before, revid_after, was_noop)
        return self.codehosting_endpoint.callRemote(
            'branchChanged', LAUNCHPAD_SERVICES, self.branch_id,
            stacked_on_url, revid_after, control_string, branch_string,
            repository_string)

    def log(self, message):
        """Log a message received from the worker."""
        self.logger.info('From worker: %s', message)

    def unexpectedError(self, failure):
        """Record an OOPS for an unexpected error while mirroring.

        :param failure: A Twisted Failure.  NOTE: this parameter shadows the
            module-level `failure` import inside this method.
        """
        request = errorlog.ScriptRequest([
            ('branch_id', self.branch_id),
            ('source', self.source_url),
            ('dest', self.destination_url),
            ('error-explanation', failure.getErrorMessage())])
        request.URL = get_canonical_url_for_branch_name(self.unique_name)
        # If the sub-process exited abnormally, the stderr it produced is
        # probably a much more interesting traceback than the one attached to
        # the Failure we've been passed.
        tb = None
        if failure.check(error.ProcessTerminated, UnexpectedStderr):
            tb = getattr(failure, 'error', None)
        if tb is None:
            tb = failure.getTraceback()
        errorlog.globalErrorUtility.raising(
            (failure.type, failure.value, tb), request)
        self.logger.info('Recorded %s', request.oopsid)
389
4898.2.23 by Jonathan Lange
Move event listener into separate class.
390
4898.2.42 by Jonathan Lange
scheduler.BranchToMirror -> scheduler.PullerMaster
391
class JobScheduler:
    """Schedule and manage the mirroring of branches.

    The scheduler repeatedly asks the codehosting endpoint for a branch to
    pull (`acquireBranchToPull`). It turns each answer into a `PullerMaster`
    task, which is consumed by a `ParallelLimitedTaskConsumer` configured
    with `config.supermirror.maximum_workers`.
    """

    def __init__(self, codehosting_endpoint, logger, branch_type_names):
        """Construct a scheduler.

        :param codehosting_endpoint: Remote endpoint used for the
            ``acquireBranchToPull`` and ``recordSuccess`` calls.
        :param logger: Logger used for progress messages.
        :param branch_type_names: Passed unchanged to
            ``acquireBranchToPull``; presumably restricts which kinds of
            branch are handed out -- confirm against the endpoint.
        """
        self.codehosting_endpoint = codehosting_endpoint
        self.logger = logger
        self.branch_type_names = branch_type_names
        # The GlobalLock we hold; stays None until lock() succeeds.
        self.actualLock = None
        self.name = 'branch-puller'
        self.lockfilename = '/var/lock/launchpad-%s.lock' % self.name

    def _turnJobTupleIntoTask(self, job_tuple):
        """Turn the return value of `acquireBranchToPull` into a task.

        `IBranchPuller.acquireBranchToPull` returns either an empty tuple
        (indicating there are no branches to pull currently) or a tuple of 5
        values -- (branch_id, pull_url, unique_name, default_stacked_on_url,
        branch_type_name) -- which are more or less those needed to
        construct a `PullerMaster` object.

        :return: None if there is nothing to pull, otherwise the bound `run`
            method of a new `PullerMaster`.
        """
        if not job_tuple:
            return None
        (branch_id, pull_url, unique_name,
         default_stacked_on_url, branch_type_name) = job_tuple
        master = PullerMaster(
            branch_id, pull_url, unique_name, branch_type_name,
            default_stacked_on_url, self.logger,
            self.codehosting_endpoint)
        return master.run

    def _poll(self):
        """Ask the endpoint for the next branch to pull.

        :return: The Deferred from the remote call, with
            `_turnJobTupleIntoTask` chained on so that it fires with a task
            or None.
        """
        deferred = self.codehosting_endpoint.callRemote(
            'acquireBranchToPull', self.branch_type_names)
        deferred.addCallback(self._turnJobTupleIntoTask)
        return deferred

    def run(self):
        """Start consuming puller tasks from a polling task source.

        :return: The Deferred returned by the consumer, with
            `_finishedRunning` chained on to log completion.
        """
        consumer = ParallelLimitedTaskConsumer(
            config.supermirror.maximum_workers, logger=self.logger)
        self.consumer = consumer
        source = PollingTaskSource(
            config.supermirror.polling_interval, self._poll,
            logger=self.logger)
        deferred = consumer.consume(source)
        deferred.addCallback(self._finishedRunning)
        return deferred

    def _finishedRunning(self, ignored):
        """Log completion and pass the callback result through unchanged."""
        self.logger.info('Mirroring complete')
        return ignored

    def lock(self):
        """Acquire the global lock file for this script.

        :raise LockError: if the lock is already held.
        """
        lock = GlobalLock(self.lockfilename)
        try:
            lock.acquire()
        except LockAlreadyAcquired:
            raise LockError(self.lockfilename)
        # Only remember the lock once we actually hold it, so that a later
        # unlock() never tries to release a lock we failed to acquire.
        self.actualLock = lock

    def unlock(self):
        """Release the lock taken by `lock`; a no-op if it is not held."""
        if self.actualLock is not None:
            self.actualLock.release()

    def recordActivity(self, date_started, date_completed):
        """Record successful completion of the script.

        :param date_started: datetime at which the run started.
        :param date_completed: datetime at which the run finished.
        :return: The result of the ``recordSuccess`` remote call.
        """
        # Datetimes are sent as plain UTC time tuples; presumably the remote
        # protocol cannot carry datetime objects directly.
        started_tuple = tuple(date_started.utctimetuple())
        completed_tuple = tuple(date_completed.utctimetuple())
        return self.codehosting_endpoint.callRemote(
            'recordSuccess', self.name, socket.gethostname(), started_tuple,
            completed_tuple)
4501.2.1 by David Allouche
supermirror-pull script inserts to ScriptActivity through the authserver.
462
2770.1.55 by Guilherme Salgado
Copy jblack's supermirror pull script files into lib/canonical/launchpad/scripts/supermirror
463
2770.1.56 by Guilherme Salgado
Cleanup loads of code, improved some tests and got all of them to work.
464
class LockError(StandardError):
    """Raised by `JobScheduler.lock` when the lock file is already held."""

    def __init__(self, lockfilename):
        # Pass the filename through to the base class so it is stored in
        # ``self.args``.  An exception whose ``args`` do not match its
        # constructor cannot be unpickled, and its repr loses the filename.
        StandardError.__init__(self, lockfilename)
        self.lockfilename = lockfilename

    def __str__(self):
        return 'Jobmanager unable to get master lock: %s' % self.lockfilename