61
60
from lp.services.mail.sendmail import MailController
62
61
from lp.services.scripts.base import LaunchpadCronScript
63
62
from lp.services.twistedsupport import run_reactor
64
from lp.services.twistedsupport.task import (
65
ParallelLimitedTaskConsumer,
70
65
class BaseRunnableJobSource:
472
467
oops = self._doOops(job, sys.exc_info())
473
468
self._logOopsId(oops['id'])
475
def getTaskSource(self):
476
"""Return a task source for all jobs in job_source."""
480
# XXX: JonathanLange bug=741204: If we're getting all of the
481
# jobs at the start anyway, we can use a DeferredSemaphore,
482
# instead of the more complex PollingTaskSource, which is
483
# better suited to cases where we don't know how much work
485
jobs = list(self.job_source.iterReady())
489
yield lambda: self.runJobInSubprocess(job)
490
return PollingTaskSource(5, producer().next)
492
def doConsumer(self):
493
"""Create a ParallelLimitedTaskConsumer for this job type."""
494
# 1 is hard-coded for now until we're sure we'd get gains by running
495
# more than one at a time. Note that several tests, including
496
# test_timeout, rely on this being 1.
497
consumer = ParallelLimitedTaskConsumer(1, logger=None)
498
return consumer.consume(self.getTaskSource())
500
470
def runAll(self):
501
"""Run all ready jobs, and any that become ready while running."""
471
"""Run all ready jobs."""
502
472
self.pool.start()
503
d = defer.maybeDeferred(self.doConsumer)
504
d.addCallbacks(self.terminated, self.failed)
474
jobs = list(self.job_source.iterReady())
478
d = self.runJobInSubprocess(jobs[0])
480
d.addCallback(lambda ignored: self.runJobInSubprocess(job))
481
d.addCallbacks(self.terminated, self.failed)
506
486
def terminated(self, ignored=None):
507
487
"""Callback to stop the processpool and reactor."""