40
40
from lazr.delegates import delegates
42
42
from twisted.internet import (
45
from twisted.internet.defer import (
46
48
from twisted.protocols import amp
47
49
from twisted.python import log
48
50
from zope.component import getUtility
61
63
from lp.services.mail.sendmail import MailController
62
64
from lp.services.scripts.base import LaunchpadCronScript
63
65
from lp.services.twistedsupport import run_reactor
64
from lp.services.twistedsupport.task import (
65
ParallelLimitedTaskConsumer,
70
68
class BaseRunnableJobSource:
178
176
if self.error_utility is None:
179
177
self.error_utility = errorlog.globalErrorUtility
179
def acquireLease(self, job):
181
'Trying to acquire lease for job in state %s' % (
187
'Could not acquire lease for %s' % self.job_str(job))
188
self.incomplete_jobs.append(job)
194
class_name = job.__class__.__name__
195
ijob_id = removeSecurityProxy(job).job.id
196
return '%s (ID %d)' % (class_name, ijob_id)
181
198
def runJob(self, job):
182
199
"""Attempt to run a job, updating its status as appropriate."""
183
200
job = IRunnableJob(job)
185
class_name = job.__class__.__name__
186
job_id = removeSecurityProxy(job).job.id
187
202
self.logger.info(
188
'Running %s (ID %d) in status %s' % (
189
class_name, job_id, job.status.title,))
203
'Running %s in status %s' % (
204
self.job_str(job), job.status.title))
191
206
transaction.commit()
296
311
"""Run all the Jobs for this JobRunner."""
297
312
for job in self.jobs:
298
313
job = IRunnableJob(job)
300
'Trying to acquire lease for job in state %s' % (
305
self.logger.debug('Could not acquire lease for job')
306
self.incomplete_jobs.append(job)
314
if not self.acquireLease(job):
308
316
# Commit transaction to clear the row lock.
309
317
transaction.commit()
417
425
:return: a Deferred that fires when the job has completed.
419
427
job = IRunnableJob(job)
423
self.incomplete_jobs.append(job)
428
if not self.acquireLease(job):
425
430
# Commit transaction to clear the row lock.
426
431
transaction.commit()
428
433
deadline = timegm(job.lease_expires.timetuple())
430
435
# Log the job class and database ID for debugging purposes.
431
class_name = job.__class__.__name__
432
ijob_id = removeSecurityProxy(job).job.id
433
436
self.logger.info(
434
'Running %s (ID %d).' % (class_name, ijob_id))
437
'Running %s.' % self.job_str(job))
435
438
self.logger.debug(
436
439
'Running %r, lease expires %s', job, job.lease_expires)
437
440
deferred = self.pool.doWork(
472
475
oops = self._doOops(job, sys.exc_info())
473
476
self._logOopsId(oops['id'])
475
def getTaskSource(self):
476
"""Return a task source for all jobs in job_source."""
480
# XXX: JonathanLange bug=741204: If we're getting all of the
481
# jobs at the start anyway, we can use a DeferredSemaphore,
482
# instead of the more complex PollingTaskSource, which is
483
# better suited to cases where we don't know how much work
485
jobs = list(self.job_source.iterReady())
489
yield lambda: self.runJobInSubprocess(job)
490
return PollingTaskSource(5, producer().next)
492
def doConsumer(self):
493
"""Create a ParallelLimitedTaskConsumer for this job type."""
494
# 1 is hard-coded for now until we're sure we'd get gains by running
495
# more than one at a time. Note that several tests, including
496
# test_timeout, rely on this being 1.
497
consumer = ParallelLimitedTaskConsumer(1, logger=None)
498
return consumer.consume(self.getTaskSource())
500
478
def runAll(self):
501
"""Run all ready jobs, and any that become ready while running."""
479
"""Run all ready jobs."""
502
480
self.pool.start()
503
d = defer.maybeDeferred(self.doConsumer)
504
d.addCallbacks(self.terminated, self.failed)
482
jobs = list(self.job_source.iterReady())
486
d = self.runJobInSubprocess(jobs[0])
488
d.addCallback(lambda ignored: self.runJobInSubprocess(job))
489
d.addCallbacks(self.terminated, self.failed)
506
494
def terminated(self, ignored=None):
507
495
"""Callback to stop the processpool and reactor."""