~launchpad-pqm/launchpad/devel

12622.8.30 by Jonathan Lange
Clarify why we are doing things differently.
1
# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
8687.15.22 by Karl Fogel
Add the copyright header block to the remaining .py files.
2
# GNU Affero General Public License version 3 (see the file LICENSE).
6805.19.53 by Aaron Bentley
Use codehosting.JobRunner to run jobs for scripts.
3
4
"""Facilities for running Jobs."""
5
8963.10.6 by Aaron Bentley
Refactor job running in terms of IRunnableJob.
6
# In Python 2 this makes every class statement without explicit bases in
# this module produce a new-style class.
__metaclass__ = type

__all__ = [
    'BaseRunnableJob', 'BaseRunnableJobSource', 'JobCronScript',
    'JobRunner', 'JobRunnerProcess', 'TwistedJobRunner',
    ]
6805.19.53 by Aaron Bentley
Use codehosting.JobRunner to run jobs for scripts.
16
17
10137.6.10 by Aaron Bentley
Switch from timeouts to deadlines.
18
from calendar import timegm
7675.624.28 by Tim Penhey
Fix collections.
19
from collections import defaultdict
9826.11.35 by Aaron Bentley
Add base implementation of contextManager.
20
import contextlib
10137.6.2 by Aaron Bentley
Fix test to be less timing-dependant.
21
import logging
9826.13.9 by Aaron Bentley
Update for review.
22
import os
13242.1.5 by Aaron Bentley
Add memory_limit support to jobs.
23
from resource import (
24
    getrlimit,
25
    RLIMIT_AS,
26
    setrlimit,
27
    )
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
28
from signal import (
29
    SIGHUP,
30
    signal,
31
    )
6805.19.53 by Aaron Bentley
Use codehosting.JobRunner to run jobs for scripts.
32
import sys
7675.1120.25 by Jeroen Vermeulen
Use generic job runner, thanks to StevenK's suggestion; and document the generic job runner a bit.
33
from textwrap import dedent
6805.19.53 by Aaron Bentley
Use codehosting.JobRunner to run jobs for scripts.
34
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
35
from ampoule import (
36
    child,
37
    main,
38
    pool,
39
    )
40
from lazr.delegates import delegates
10312.4.3 by Aaron Bentley
Reorganize imports
41
import transaction
14027.3.2 by Jeroen Vermeulen
Merge devel, resolve conflicts.
42
from twisted.internet import reactor
13785.5.6 by Aaron Bentley
Fake merge of devel r13844 into simplify-twisted-runner-2.
43
from twisted.internet.defer import (
13785.5.7 by Aaron Bentley
Switch to inlineCallbacks approach.
44
    inlineCallbacks,
13785.5.6 by Aaron Bentley
Fake merge of devel r13844 into simplify-twisted-runner-2.
45
    succeed,
46
    )
9826.11.15 by Aaron Bentley
Start trying to use the ampoule process pool.
47
from twisted.protocols import amp
13785.5.7 by Aaron Bentley
Switch to inlineCallbacks approach.
48
from twisted.python import (
49
    failure,
50
    log,
51
    )
9222.1.32 by Aaron Bentley
Split JobCronScript out into job/runner
52
from zope.component import getUtility
9826.11.15 by Aaron Bentley
Start trying to use the ampoule process pool.
53
from zope.security.proxy import removeSecurityProxy
9222.1.32 by Aaron Bentley
Split JobCronScript out into job/runner
54
14022.3.1 by William Grant
Replace initZopeless mostly.
55
from canonical.config import (
56
    config,
57
    dbconfig,
58
    )
14565.2.15 by Curtis Hovey
Moved canonical.launchpad.scripts __init__ to lp.services.scripts.
59
from lp.services import scripts
10312.4.3 by Aaron Bentley
Reorganize imports
60
from canonical.launchpad.webapp import errorlog
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
61
from lp.services.job.interfaces.job import (
62
    IJob,
63
    IRunnableJob,
64
    LeaseHeld,
13125.2.8 by Julian Edwards
rename the special exception
65
    SuspendJobException,
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
66
    )
14022.2.4 by William Grant
Move set_immediate_mail_delivery into initZopeless' callsites, out of initZopeless itself.
67
from lp.services.mail.sendmail import (
68
    MailController,
69
    set_immediate_mail_delivery,
70
    )
10548.1.1 by Jonathan Lange
Move twistedsupport to lp.services
71
from lp.services.scripts.base import LaunchpadCronScript
13242.1.3 by Aaron Bentley
Extract run_reactor from TwistedJobRunner.runFromSource
72
from lp.services.twistedsupport import run_reactor
6805.19.53 by Aaron Bentley
Use codehosting.JobRunner to run jobs for scripts.
73
74
13242.1.12 by Aaron Bentley
Fix failing test and lint.
75
class BaseRunnableJobSource:
    """Base class for job sources for the job runner.

    Supplies do-nothing defaults for the hooks that the runners consult.
    """

    # Address-space limit (RLIMIT_AS, in bytes) that JobRunnerProcess
    # installs before running a job from this source.  None means the
    # process limit is left untouched.
    memory_limit = None

    @staticmethod
    @contextlib.contextmanager
    def contextManager():
        """Return the context in which this source's jobs are run.

        The default context performs no setup and no teardown.
        """
        yield None
84
85
86
class BaseRunnableJob(BaseRunnableJobSource):
    """Base class for jobs to be run via JobRunner.

    Subclasses should implement `IRunnableJob` (notably its ``run``
    method) and keep the underlying `IJob` in a ``job`` attribute; every
    `IJob` attribute is delegated to that object.

    Both OOPS and user-error mail go to ``getErrorRecipients`` by default,
    which names nobody.  Override it (or ``getOopsRecipients``) together
    with ``getOperationDescription`` to have notifications sent.
    """
    delegates(IJob, 'job')

    # Exception types that the runner treats as user errors: they are
    # reported to the job's error recipients via notifyUserError rather
    # than being recorded as OOPSes.
    user_error_types = ()

    # Exception types after which the runner re-queues the job instead of
    # failing it, as long as attempt_count does not exceed max_retries.
    retry_error_types = ()

    # We redefine __eq__ and __ne__ here to prevent the security proxy
    # from mucking up our comparisons in tests and elsewhere.
    def __eq__(self, job):
        naked = removeSecurityProxy(job)
        if self.__class__ is not naked.__class__:
            return False
        return self.__dict__ == naked.__dict__

    def __ne__(self, job):
        return not self == job

    def __lt__(self, job):
        naked = removeSecurityProxy(job)
        if self.__class__ is not naked.__class__:
            return NotImplemented
        return self.__dict__ < naked.__dict__

    def getOopsRecipients(self):
        """Return a list of email-ids to notify about oopses.

        Defaults to the user-error recipients.
        """
        return self.getErrorRecipients()

    def getOperationDescription(self):
        """Describe what this job does, for use in notification mail."""
        return 'unspecified operation'

    def getErrorRecipients(self):
        """Return a list of email-ids to notify about user errors."""
        return []

    def getOopsMailController(self, oops_id):
        """Build the mail that tells people an OOPS happened.

        :param oops_id: Id of the OOPS, quoted in the message body.
        :return: A `MailController`, or None if there is no-one to notify.
        """
        recipients = self.getOopsRecipients()
        if not len(recipients):
            return None
        body = (
            'Launchpad encountered an internal error during the following'
            ' operation: %s.  It was logged with id %s.  Sorry for the'
            ' inconvenience.' % (self.getOperationDescription(), oops_id))
        return MailController(
            config.canonical.noreply_from_address, recipients,
            'Launchpad internal error', body)

    def getUserErrorMailController(self, e):
        """Build the mail that tells people about a user error.

        :param e: The exception raised; its text is included in the body.
        :return: A `MailController`, or None if there is no-one to notify.
        """
        recipients = self.getErrorRecipients()
        if not len(recipients):
            return None
        subject = 'Launchpad error while %s' % self.getOperationDescription()
        body = (
            'Launchpad encountered an error during the following'
            ' operation: %s.  %s' % (self.getOperationDescription(), str(e)))
        return MailController(
            config.canonical.noreply_from_address, recipients, subject, body)

    def notifyOops(self, oops):
        """Mail the OOPS recipients (if any) about `oops`."""
        controller = self.getOopsMailController(oops['id'])
        if controller is not None:
            controller.send()

    def getOopsVars(self):
        """See `IRunnableJob`."""
        return [('job_id', self.job.id)]

    def notifyUserError(self, e):
        """See `IRunnableJob`."""
        controller = self.getUserErrorMailController(e)
        if controller is not None:
            controller.send()
178
8963.10.6 by Aaron Bentley
Refactor job running in terms of IRunnableJob.
179
9826.11.9 by Aaron Bentley
Switch to PollingTaskConsumer and refactor.
180
class BaseJobRunner(object):
    """Runner of Jobs.

    Records the jobs it completed (``completed_jobs``), the jobs it could
    not complete (``incomplete_jobs``) and the OOPS ids passed to
    ``_logOopsId`` (``oops_ids``).
    """

    def __init__(self, logger=None, error_utility=None):
        """Create a runner.

        :param logger: Logger for progress messages; the root logger is used
            when None.
        :param error_utility: Object used to report OOPSes (its
            ``oopsMessage`` and ``raising`` methods are used); defaults to
            `errorlog.globalErrorUtility`.
        """
        self.completed_jobs = []
        self.incomplete_jobs = []
        if logger is None:
            logger = logging.getLogger()
        self.logger = logger
        self.error_utility = error_utility
        self.oops_ids = []
        if self.error_utility is None:
            self.error_utility = errorlog.globalErrorUtility

    def acquireLease(self, job):
        """Try to acquire the lease on `job`.

        :return: True if the lease was acquired.  If it is already held,
            `job` is added to ``incomplete_jobs`` and False is returned.
        """
        self.logger.debug(
            'Trying to acquire lease for job in state %s', job.status.title)
        try:
            job.acquireLease()
        except LeaseHeld:
            self.logger.info(
                'Could not acquire lease for %s', self.job_str(job))
            self.incomplete_jobs.append(job)
            return False
        return True

    @staticmethod
    def job_str(job):
        """Describe `job` for log messages, as 'ClassName (ID n)'."""
        class_name = job.__class__.__name__
        ijob_id = removeSecurityProxy(job).job.id
        return '%s (ID %d)' % (class_name, ijob_id)

    def runJob(self, job):
        """Attempt to run a job, updating its status as appropriate.

        The job is started, and that is committed before it runs.  Then:

        * on success it is completed and added to ``completed_jobs``;
        * on an exception in its ``retry_error_types`` it is re-queued and
          added to ``incomplete_jobs``, unless ``attempt_count`` exceeds
          ``max_retries``, in which case it is handled as a failure;
        * on `SuspendJobException` it is suspended and added to
          ``incomplete_jobs``;
        * on any other exception the transaction is aborted, the job is
          marked as failed (and that is committed), it is added to
          ``incomplete_jobs`` and the exception is re-raised.
        """
        job = IRunnableJob(job)

        self.logger.info(
            'Running %s in status %s', self.job_str(job), job.status.title)
        job.start()
        transaction.commit()
        do_retry = False
        try:
            try:
                job.run()
            # "except X as e" works on Python 2.6+ and 3; the old
            # "except X, e" form is a SyntaxError on Python 3.
            except job.retry_error_types as e:
                if job.attempt_count > job.max_retries:
                    raise
                self.logger.exception(
                    "Scheduling retry due to %s.", e.__class__.__name__)
                do_retry = True
        except SuspendJobException:
            self.logger.debug("Job suspended itself")
            job.suspend()
            self.incomplete_jobs.append(job)
        except Exception:
            transaction.abort()
            job.fail()
            # Record the failure.
            transaction.commit()
            self.incomplete_jobs.append(job)
            raise
        else:
            # Commit transaction to update the DB time.
            transaction.commit()
            if do_retry:
                job.queue()
                self.incomplete_jobs.append(job)
            else:
                job.complete()
                self.completed_jobs.append(job)
        # Commit transaction to update job status.
        transaction.commit()

    def runJobHandleError(self, job):
        """Run the specified job, handling errors.

        Exceptions in the job's ``user_error_types`` are logged and reported
        through ``notifyUserError`` rather than as OOPSes; any other
        exception is reported as an OOPS.  If notifying users itself fails,
        that failure is reported as an OOPS instead.  The list of complete
        or incomplete jobs will be updated.

        :return: The OOPS that was reported, or None.
        """
        job = IRunnableJob(job)
        with self.error_utility.oopsMessage(
            dict(job.getOopsVars())):
            try:
                try:
                    self.logger.debug('Running %r', job)
                    self.runJob(job)
                except job.user_error_types as e:
                    self.logger.info(
                        'Job %r failed with user error %r.', job, e)
                    job.notifyUserError(e)
                except Exception:
                    info = sys.exc_info()
                    return self._doOops(job, info)
            except Exception:
                # This only happens if attempting to notify users about
                # errors fails for some reason (like a misconfigured email
                # server).
                self.logger.exception(
                    "Failed to notify users about a failure.")
                info = sys.exc_info()
                # Returning the oops says something went wrong.
                return self.error_utility.raising(info)

    def _doOops(self, job, info):
        """Report an OOPS for the provided job and info.

        :param job: The IRunnableJob whose run failed.
        :param info: The standard sys.exc_info() value.
        :return: the Oops that was reported.
        """
        oops = self.error_utility.raising(info)
        job.notifyOops(oops)
        return oops

    def _logOopsId(self, oops_id):
        """Report an oops id to the log and record it in ``oops_ids``."""
        if self.logger is not None:
            self.logger.info('Job resulted in OOPS: %s', oops_id)
        self.oops_ids.append(oops_id)
9826.11.9 by Aaron Bentley
Switch to PollingTaskConsumer and refactor.
301
302
303
class JobRunner(BaseJobRunner):
    """Run a given collection of jobs synchronously, in this process."""

    def __init__(self, jobs, logger=None):
        """Create a runner for `jobs`.

        :param jobs: Iterable of jobs; each is adapted to `IRunnableJob`
            when it is run.
        :param logger: Passed on to `BaseJobRunner`.
        """
        BaseJobRunner.__init__(self, logger=logger)
        self.jobs = jobs

    @classmethod
    def fromReady(cls, job_class, logger=None):
        """Return a job runner for all ready jobs of a given class."""
        ready_jobs = job_class.iterReady()
        return cls(ready_jobs, logger)

    @classmethod
    def runFromSource(cls, job_source, dbuser, logger):
        """Run all ready jobs provided by the specified source.

        The jobs run inside the source's ``contextManager()``.  The dbuser
        parameter is ignored.

        :return: The runner that was used.
        """
        source_context = removeSecurityProxy(job_source.contextManager())
        with source_context:
            logger.info("Running synchronously.")
            runner = cls.fromReady(job_source, logger)
            runner.runAll()
        return runner

    def runAll(self):
        """Run all the Jobs for this JobRunner.

        Jobs whose lease is held elsewhere are skipped; the id of each OOPS
        produced is logged and recorded.
        """
        for candidate in self.jobs:
            runnable = IRunnableJob(candidate)
            if self.acquireLease(runnable):
                # Commit transaction to clear the row lock.
                transaction.commit()
                oops = self.runJobHandleError(runnable)
                if oops is not None:
                    self._logOopsId(oops['id'])
9826.11.9 by Aaron Bentley
Switch to PollingTaskConsumer and refactor.
337
338
9826.13.9 by Aaron Bentley
Update for review.
339
class RunJobCommand(amp.Command):
    """AMP command asking a `JobRunnerProcess` to run a single job.

    The response carries the number of jobs completed (``success``) and
    the id of any OOPS reported (``oops_id``, empty when there was none).
    """

    arguments = [
        ('job_id', amp.Integer()),
        ]
    response = [
        ('success', amp.Integer()),
        ('oops_id', amp.String()),
        ]
9826.11.15 by Aaron Bentley
Start trying to use the ampoule process pool.
343
344
12269.4.10 by Aaron Bentley
Cleanup.
345
def import_source(job_source_name):
    """Return the IJobSource specified by its full name.

    :param job_source_name: A dotted path such as 'package.module.Name'.
    :return: The attribute ``Name`` of module ``package.module``.
    :raises ValueError: if the name contains no dot.
    """
    dotted_module, attr_name = job_source_name.rsplit('.', 1)
    containing_module = __import__(dotted_module, fromlist=[attr_name])
    return getattr(containing_module, attr_name)
350
351
9826.13.9 by Aaron Bentley
Update for review.
352
class JobRunnerProcess(child.AMPChild):
    """Base class for processes that run jobs.

    Instances are the AMP children of `TwistedJobRunner`'s process pool.
    The class itself also acts as a context manager (the class-level
    ``__enter__``/``__exit__``), presumably entered by the child-process
    bootstrap before serving commands -- NOTE(review): confirm in ampoule.
    """

    def __init__(self, job_source_name, dbuser):
        """Import the job source and create its job-running context.

        :param job_source_name: Dotted name resolved with `import_source`.
        :param dbuser: Database user; stored on the class so that the
            class-level ``__enter__`` can read it.
        """
        child.AMPChild.__init__(self)
        source = import_source(job_source_name)
        self.job_source = source
        self.context_manager = source.contextManager()
        # icky, but it's really a global value anyhow.
        self.__class__.dbuser = dbuser

    @classmethod
    def __enter__(cls):
        """Prepare this process for running jobs.

        Installs a SIGHUP handler that aborts the running job, executes the
        scripts ZCML, overrides the database user with ``cls.dbuser`` and
        enables immediate mail delivery.
        """
        def on_timeout_signal(signum, frame):
            # The TimeoutError alone is not enough: if it is raised while
            # the reactor is looping rather than inside the job, it is
            # ignored.  So a hard process exit is scheduled as well.
            #
            # If the exception is raised inside the job, the resulting
            # traceback shows what timed out and is logged as an OOPS.
            #
            # Either way the process exits hard with TIMEOUT_CODE and a
            # TimeoutError OOPS (with a poor traceback) is logged; see the
            # job_raised callback in TwistedJobRunner.runJobInSubprocess for
            # that half.
            #
            # NOTE(review): TimeoutError is not imported above; presumably
            # it is defined later in this module -- confirm.
            reactor.callFromThread(
                reactor.callLater, 0, os._exit, TwistedJobRunner.TIMEOUT_CODE)
            raise TimeoutError
        scripts.execute_zcml_for_scripts(use_web_security=False)
        signal(SIGHUP, on_timeout_signal)
        dbconfig.override(dbuser=cls.dbuser, isolation_level='read_committed')
        # XXX wgrant 2011-09-24 bug=29744: initZopeless used to do this.
        # Should be removed from callsites verified to not need it.
        set_immediate_mail_delivery(True)

    @staticmethod
    def __exit__(exc_type, exc_val, exc_tb):
        """Nothing to undo; exceptions are not suppressed."""
        pass

    def makeConnection(self, transport):
        """Connect, then enter the job source's context."""
        child.AMPChild.makeConnection(self, transport)
        self.context_manager.__enter__()

    def connectionLost(self, reason):
        """Leave the job source's context, then disconnect."""
        self.context_manager.__exit__(None, None, None)
        child.AMPChild.connectionLost(self, reason)

    @RunJobCommand.responder
    def runJobCommand(self, job_id):
        """Run the job with id `job_id` from this process's job source.

        If the source sets ``memory_limit``, it becomes the soft RLIMIT_AS
        before the job runs.

        :return: The `RunJobCommand` response: ``success`` is the number of
            jobs completed and ``oops_id`` the id of any OOPS reported, or
            the empty string.
        """
        runner = BaseJobRunner()
        job = self.job_source.get(job_id)
        limit = self.job_source.memory_limit
        if limit is not None:
            current_soft, current_hard = getrlimit(RLIMIT_AS)
            if current_soft != limit:
                setrlimit(RLIMIT_AS, (limit, current_hard))
        oops = runner.runJobHandleError(job)
        oops_id = '' if oops is None else oops['id']
        return {'success': len(runner.completed_jobs), 'oops_id': oops_id}
9826.11.15 by Aaron Bentley
Start trying to use the ampoule process pool.
419
420
9826.11.9 by Aaron Bentley
Switch to PollingTaskConsumer and refactor.
421
class TwistedJobRunner(BaseJobRunner):
    """Run Jobs via twisted.

    Each job is sent with `RunJobCommand` to a `JobRunnerProcess` child
    taken from an ampoule process pool.
    """

    # Exit code that a child process ends with to signal that its job
    # timed out (see `runJobInSubprocess`).
    TIMEOUT_CODE = 42

    def __init__(self, job_source, dbuser, logger=None, error_utility=None):
        """Create a runner backed by an ampoule process pool.

        :param job_source: The job source class; its dotted import name is
            passed to every child process.
        :param dbuser: The database user name, passed (as a string) to
            every child process.
        :param logger: Passed through to `BaseJobRunner`.
        :param error_utility: Passed through to `BaseJobRunner`.
        """
        # Child processes get a minimal environment: PATH, LPCONFIG (when
        # set) and a PYTHONPATH matching this process's sys.path.
        env = {'PATH': os.environ['PATH']}
        if 'LPCONFIG' in os.environ:
            env['LPCONFIG'] = os.environ['LPCONFIG']
        env['PYTHONPATH'] = os.pathsep.join(sys.path)
        starter = main.ProcessStarter(env=env)
        super(TwistedJobRunner, self).__init__(logger, error_utility)
        self.job_source = job_source
        self.import_name = '%s.%s' % (
            removeSecurityProxy(job_source).__module__, job_source.__name__)
        # No minimum pool size; SIGHUP is the signal used on timeout.
        self.pool = pool.ProcessPool(
            JobRunnerProcess, ampChildArgs=[self.import_name, str(dbuser)],
            starter=starter, min=0, timeout_signal=SIGHUP)

    def runJobInSubprocess(self, job):
        """Run `job` in a child process from the process pool.

        The job's lease is acquired first; if that fails the job is skipped.
        The lease expiry time is used as the child's deadline.

        :return: a Deferred that fires when the job has completed.
        """
        job = IRunnableJob(job)
        if not self.acquireLease(job):
            return succeed(None)
        # Commit transaction to clear the row lock.
        transaction.commit()
        job_id = job.id
        deadline = timegm(job.lease_expires.timetuple())

        # Log the job class and database ID for debugging purposes.
        self.logger.info('Running %s.', self.job_str(job))
        self.logger.debug(
            'Running %r, lease expires %s', job, job.lease_expires)
        deferred = self.pool.doWork(
            RunJobCommand, job_id=job_id, _deadline=deadline)

        def update(response):
            """Record the outcome reported by the child process."""
            if response is None:
                self.incomplete_jobs.append(job)
                self.logger.debug('No response for %r', job)
                return
            if response['success']:
                self.completed_jobs.append(job)
                self.logger.debug('Finished %r', job)
            else:
                self.incomplete_jobs.append(job)
                self.logger.debug('Incomplete %r', job)
                # Kill the worker that experienced a failure; this only
                # works because there's a single worker.
                self.pool.stopAWorker()
            if response['oops_id'] != '':
                self._logOopsId(response['oops_id'])

        def job_raised(failure):
            """Record a job whose child failed, and log an OOPS for it."""
            self.incomplete_jobs.append(job)
            exit_code = getattr(failure.value, 'exitCode', None)
            if exit_code == self.TIMEOUT_CODE:
                # The process ended with the error code that we have
                # arbitrarily chosen to indicate a timeout. Rather than log
                # that error (ProcessDone), we log a TimeoutError instead.
                self._logTimeout(job)
            else:
                info = (failure.type, failure.value, failure.tb)
                oops = self._doOops(job, info)
                self._logOopsId(oops['id'])
        deferred.addCallbacks(update, job_raised)
        return deferred

    def _logTimeout(self, job):
        """Record an OOPS for `job` reporting a `TimeoutError`."""
        # Raise and catch so that sys.exc_info() supplies a real exception
        # triple to _doOops.
        try:
            raise TimeoutError
        except TimeoutError:
            oops = self._doOops(job, sys.exc_info())
            self._logOopsId(oops['id'])

    @inlineCallbacks
    def runAll(self):
        """Run all ready jobs.

        Jobs from `job_source.iterReady()` are run one at a time, each
        waiting for the previous one's Deferred.  Once done (or on error,
        via `failed`) the pool and reactor are stopped.
        """
        self.pool.start()
        try:
            try:
                job = None
                for job in self.job_source.iterReady():
                    yield self.runJobInSubprocess(job)
                if job is None:
                    self.logger.info('No jobs to run.')
                self.terminated()
            except:
                self.failed(failure.Failure())
        except:
            # Reached only if failed() itself raised.
            self.terminated()
            raise

    def terminated(self, ignored=None):
        """Callback to stop the processpool and reactor.

        The reactor is stopped once the pool's stop Deferred fires, whether
        it succeeds or fails.
        """
        deferred = self.pool.stop()
        deferred.addBoth(lambda ignored: reactor.stop())

    def failed(self, failure):
        """Callback for when the job fails.

        Prints the failure's traceback, then stops the pool and reactor.
        """
        failure.printTraceback()
        self.terminated()

    @classmethod
    def runFromSource(cls, job_source, dbuser, logger, _log_twisted=False):
        """Run all ready jobs provided by the specified source.

        The dbuser parameter is passed to the runner, which hands it on to
        each child process.  This call runs the reactor and returns only
        once `run_reactor` returns.

        :param _log_twisted: For debugging: If True, emit verbose Twisted
            messages to stderr.
        :return: The runner; its completed_jobs and incomplete_jobs lists
            record the outcome.
        """
        logger.info("Running through Twisted.")
        if _log_twisted:
            logging.getLogger().setLevel(0)
            logger_object = logging.getLogger('twistedjobrunner')
            handler = logging.StreamHandler(sys.stderr)
            logger_object.addHandler(handler)
            observer = log.PythonLoggingObserver(
                loggerName='twistedjobrunner')
            log.startLoggingWithObserver(observer.emit)
        runner = cls(job_source, dbuser, logger)
        reactor.callWhenRunning(runner.runAll)
        run_reactor()
        return runner
549
9222.1.32 by Aaron Bentley
Split JobCronScript out into job/runner
550
551
class JobCronScript(LaunchpadCronScript):
    """Generic job runner.

    :ivar config_name: Optional name of a configuration section that specifies
        the jobs to run.  Alternatively, may be taken from the command line.
    :ivar source_interface: `IJobSource`-derived utility to iterate pending
        jobs of the type that is to be run.
    """

    config_name = None

    usage = dedent("""\
        run_jobs.py [options] [lazr-configuration-section]

        Run Launchpad Jobs of one particular type.

        The lazr configuration section specifies what jobs to run, and how.
        It should provide at least:

         * source_interface, the name of the IJobSource-derived utility
           interface for the job type that you want to run.

         * dbuser, the name of the database role to run the job under.
        """).rstrip()

    description = (
        "Takes pending jobs of the given type off the queue and runs them.")

    def __init__(self, runner_class=JobRunner, test_args=None, name=None,
                 commandline_config=False):
        """Initialize a `JobCronScript`.

        :param runner_class: The runner class to use.  Defaults to
            `JobRunner`, which runs synchronously, but could also be
            `TwistedJobRunner` which is asynchronous.
        :param test_args: For tests: pretend that this list of arguments has
            been passed on the command line.
        :param name: Identifying name for this type of job.  Is also used to
            compose a lock file name.
        :param commandline_config: If True, take configuration from the
            command line (in the form of a config section name).  Otherwise,
            rely on the subclass providing `config_name` and
            `source_interface`.
        """
        # Set before calling the superclass constructor: add_my_options
        # reads runner_class and is presumably invoked while the option
        # parser is being built there.
        self._runner_class = runner_class
        super(JobCronScript, self).__init__(
            name=name, dbuser=None, test_args=test_args)
        self.log_twisted = getattr(self.options, 'log_twisted', False)
        if not commandline_config:
            return
        if not self.args:
            # Report a usage error instead of an opaque IndexError.
            self.parser.error(
                "A lazr configuration section name is required.")
        self.config_name = self.args[0]
        self.source_interface = import_source(
            self.config_section.source_interface)

    def add_my_options(self):
        """Offer --log-twisted only when running via `TwistedJobRunner`."""
        if self.runner_class is TwistedJobRunner:
            self.add_log_twisted_option()

    def add_log_twisted_option(self):
        """Add the --log-twisted command-line option."""
        self.parser.add_option(
            '--log-twisted', action='store_true', default=False,
            help='Enable extra Twisted logging.')

    @property
    def dbuser(self):
        """The database user named by the configuration section."""
        return self.config_section.dbuser

    @property
    def runner_class(self):
        """Enable subclasses to override with command-line arguments."""
        return self._runner_class

    def job_counts(self, jobs):
        """Return (class name, count) tuples for `jobs`, sorted by name."""
        counts = defaultdict(int)
        for job in jobs:
            counts[job.__class__.__name__] += 1
        return sorted(counts.items())

    @property
    def config_section(self):
        """The `config` section called `config_name`."""
        return getattr(config, self.config_name)

    def main(self):
        """Run all ready jobs from the source, then log per-class counts."""
        section = self.config_section
        if (getattr(section, 'error_dir', None) is not None
            and getattr(section, 'oops_prefix', None) is not None):
            # Unless both variables are set, we let the error utility
            # default to using the [error_reports] config.
            errorlog.globalErrorUtility.configure(self.config_name)
        job_source = getUtility(self.source_interface)
        kwargs = {}
        if self.log_twisted:
            kwargs['_log_twisted'] = True
        runner = self.runner_class.runFromSource(
            job_source, self.dbuser, self.logger, **kwargs)
        for name, count in self.job_counts(runner.completed_jobs):
            self.logger.info('Ran %d %s jobs.', count, name)
        for name, count in self.job_counts(runner.incomplete_jobs):
            self.logger.info('%d %s jobs did not complete.', count, name)
9826.11.42 by Aaron Bentley
Terminate jobs with SIGHUP
651
652
653
class TimeoutError(Exception):
    """Exception signalling that a job ran for too long."""

    def __init__(self):
        super(TimeoutError, self).__init__("Job ran too long.")