~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/services/job/runner.py

  • Committer: Launchpad Patch Queue Manager
  • Date: 2011-08-26 13:16:52 UTC
  • mfrom: (13785.5.1 simplify-twisted-runner)
  • Revision ID: launchpad@pqm.canonical.com-20110826131652-0hsozfin7dbypjmh
[r=jcsackett][bug=833888] Simplify Twisted job runner.

Show diffs side-by-side

added

removed

Lines of Context:
40
40
from lazr.delegates import delegates
41
41
import transaction
42
42
from twisted.internet import (
43
 
    defer,
44
43
    reactor,
45
44
    )
46
45
from twisted.protocols import amp
61
60
from lp.services.mail.sendmail import MailController
62
61
from lp.services.scripts.base import LaunchpadCronScript
63
62
from lp.services.twistedsupport import run_reactor
64
 
from lp.services.twistedsupport.task import (
65
 
    ParallelLimitedTaskConsumer,
66
 
    PollingTaskSource,
67
 
    )
68
63
 
69
64
 
70
65
class BaseRunnableJobSource:
472
467
            oops = self._doOops(job, sys.exc_info())
473
468
            self._logOopsId(oops['id'])
474
469
 
475
 
    def getTaskSource(self):
476
 
        """Return a task source for all jobs in job_source."""
477
 
 
478
 
        def producer():
479
 
            while True:
480
 
                # XXX: JonathanLange bug=741204: If we're getting all of the
481
 
                # jobs at the start anyway, we can use a DeferredSemaphore,
482
 
                # instead of the more complex PollingTaskSource, which is
483
 
                # better suited to cases where we don't know how much work
484
 
                # there will be.
485
 
                jobs = list(self.job_source.iterReady())
486
 
                if len(jobs) == 0:
487
 
                    yield None
488
 
                for job in jobs:
489
 
                    yield lambda: self.runJobInSubprocess(job)
490
 
        return PollingTaskSource(5, producer().next)
491
 
 
492
 
    def doConsumer(self):
493
 
        """Create a ParallelLimitedTaskConsumer for this job type."""
494
 
        # 1 is hard-coded for now until we're sure we'd get gains by running
495
 
        # more than one at a time.  Note that several tests, including
496
 
        # test_timeout, rely on this being 1.
497
 
        consumer = ParallelLimitedTaskConsumer(1, logger=None)
498
 
        return consumer.consume(self.getTaskSource())
499
 
 
500
470
    def runAll(self):
501
 
        """Run all ready jobs, and any that become ready while running."""
 
471
        """Run all ready jobs."""
502
472
        self.pool.start()
503
 
        d = defer.maybeDeferred(self.doConsumer)
504
 
        d.addCallbacks(self.terminated, self.failed)
 
473
        try:
 
474
            jobs = list(self.job_source.iterReady())
 
475
            if len(jobs) == 0:
 
476
                self.terminated()
 
477
                return
 
478
            d = self.runJobInSubprocess(jobs[0])
 
479
            for job in jobs[1:]:
 
480
                d.addCallback(lambda ignored: self.runJobInSubprocess(job))
 
481
            d.addCallbacks(self.terminated, self.failed)
 
482
        except:
 
483
            self.terminated()
 
484
            raise
505
485
 
506
486
    def terminated(self, ignored=None):
507
487
        """Callback to stop the processpool and reactor."""