~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/services/job/runner.py

[rs=buildbot-poller] automatic merge from stable. Revisions: 13831,
        13832, 13833, 13834, 13835 included.

Show diffs side-by-side

added added

removed removed

Lines of Context:
40
40
from lazr.delegates import delegates
41
41
import transaction
42
42
from twisted.internet import (
43
 
    defer,
44
43
    reactor,
45
44
    )
 
45
from twisted.internet.defer import (
 
46
    succeed,
 
47
    )
46
48
from twisted.protocols import amp
47
49
from twisted.python import log
48
50
from zope.component import getUtility
61
63
from lp.services.mail.sendmail import MailController
62
64
from lp.services.scripts.base import LaunchpadCronScript
63
65
from lp.services.twistedsupport import run_reactor
64
 
from lp.services.twistedsupport.task import (
65
 
    ParallelLimitedTaskConsumer,
66
 
    PollingTaskSource,
67
 
    )
68
66
 
69
67
 
70
68
class BaseRunnableJobSource:
178
176
        if self.error_utility is None:
179
177
            self.error_utility = errorlog.globalErrorUtility
180
178
 
 
179
    def acquireLease(self, job):
 
180
        self.logger.debug(
 
181
            'Trying to acquire lease for job in state %s' % (
 
182
                job.status.title,))
 
183
        try:
 
184
            job.acquireLease()
 
185
        except LeaseHeld:
 
186
            self.logger.debug(
 
187
                'Could not acquire lease for %s' % self.job_str(job))
 
188
            self.incomplete_jobs.append(job)
 
189
            return False
 
190
        return True
 
191
 
 
192
    @staticmethod
 
193
    def job_str(job):
 
194
        class_name = job.__class__.__name__
 
195
        ijob_id = removeSecurityProxy(job).job.id
 
196
        return '%s (ID %d)' % (class_name, ijob_id)
 
197
 
181
198
    def runJob(self, job):
182
199
        """Attempt to run a job, updating its status as appropriate."""
183
200
        job = IRunnableJob(job)
184
201
 
185
 
        class_name = job.__class__.__name__
186
 
        job_id = removeSecurityProxy(job).job.id
187
202
        self.logger.info(
188
 
            'Running %s (ID %d) in status %s' % (
189
 
                class_name, job_id, job.status.title,))
 
203
            'Running %s in status %s' % (
 
204
                self.job_str(job), job.status.title))
190
205
        job.start()
191
206
        transaction.commit()
192
207
        do_retry = False
296
311
        """Run all the Jobs for this JobRunner."""
297
312
        for job in self.jobs:
298
313
            job = IRunnableJob(job)
299
 
            self.logger.debug(
300
 
                'Trying to acquire lease for job in state %s' % (
301
 
                    job.status.title,))
302
 
            try:
303
 
                job.acquireLease()
304
 
            except LeaseHeld:
305
 
                self.logger.debug('Could not acquire lease for job')
306
 
                self.incomplete_jobs.append(job)
 
314
            if not self.acquireLease(job):
307
315
                continue
308
316
            # Commit transaction to clear the row lock.
309
317
            transaction.commit()
417
425
        :return: a Deferred that fires when the job has completed.
418
426
        """
419
427
        job = IRunnableJob(job)
420
 
        try:
421
 
            job.acquireLease()
422
 
        except LeaseHeld:
423
 
            self.incomplete_jobs.append(job)
424
 
            return
 
428
        if not self.acquireLease(job):
 
429
            return succeed(None)
425
430
        # Commit transaction to clear the row lock.
426
431
        transaction.commit()
427
432
        job_id = job.id
428
433
        deadline = timegm(job.lease_expires.timetuple())
429
434
 
430
435
        # Log the job class and database ID for debugging purposes.
431
 
        class_name = job.__class__.__name__
432
 
        ijob_id = removeSecurityProxy(job).job.id
433
436
        self.logger.info(
434
 
            'Running %s (ID %d).' % (class_name, ijob_id))
 
437
            'Running %s.' % self.job_str(job))
435
438
        self.logger.debug(
436
439
            'Running %r, lease expires %s', job, job.lease_expires)
437
440
        deferred = self.pool.doWork(
472
475
            oops = self._doOops(job, sys.exc_info())
473
476
            self._logOopsId(oops['id'])
474
477
 
475
 
    def getTaskSource(self):
476
 
        """Return a task source for all jobs in job_source."""
477
 
 
478
 
        def producer():
479
 
            while True:
480
 
                # XXX: JonathanLange bug=741204: If we're getting all of the
481
 
                # jobs at the start anyway, we can use a DeferredSemaphore,
482
 
                # instead of the more complex PollingTaskSource, which is
483
 
                # better suited to cases where we don't know how much work
484
 
                # there will be.
485
 
                jobs = list(self.job_source.iterReady())
486
 
                if len(jobs) == 0:
487
 
                    yield None
488
 
                for job in jobs:
489
 
                    yield lambda: self.runJobInSubprocess(job)
490
 
        return PollingTaskSource(5, producer().next)
491
 
 
492
 
    def doConsumer(self):
493
 
        """Create a ParallelLimitedTaskConsumer for this job type."""
494
 
        # 1 is hard-coded for now until we're sure we'd get gains by running
495
 
        # more than one at a time.  Note that several tests, including
496
 
        # test_timeout, rely on this being 1.
497
 
        consumer = ParallelLimitedTaskConsumer(1, logger=None)
498
 
        return consumer.consume(self.getTaskSource())
499
 
 
500
478
    def runAll(self):
501
 
        """Run all ready jobs, and any that become ready while running."""
 
479
        """Run all ready jobs."""
502
480
        self.pool.start()
503
 
        d = defer.maybeDeferred(self.doConsumer)
504
 
        d.addCallbacks(self.terminated, self.failed)
 
481
        try:
 
482
            jobs = list(self.job_source.iterReady())
 
483
            if len(jobs) == 0:
 
484
                self.terminated()
 
485
                return
 
486
            d = self.runJobInSubprocess(jobs[0])
 
487
            for job in jobs[1:]:
 
488
                d.addCallback(lambda ignored: self.runJobInSubprocess(job))
 
489
            d.addCallbacks(self.terminated, self.failed)
 
490
        except:
 
491
            self.terminated()
 
492
            raise
505
493
 
506
494
    def terminated(self, ignored=None):
507
495
        """Callback to stop the processpool and reactor."""