~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/soyuz/tests/test_packagecopyjob.py

[r=allenap][no-qa] Add
 PlainPackageCopyJob.getIncompleteJobsForArchive() method which will return
 all the WAITING, RUNNING or FAILED copy jobs for the passed archive.

Show diffs side-by-side

added added

removed removed

Lines of Context:
96
96
            target_distroseries, target_pocket, requester=requester,
97
97
            package_version=dsd.parent_source_version, **kwargs)
98
98
 
 
99
    def makePPAJob(self, source_archive=None, target_archive=None, **kwargs):
 
100
        if source_archive is None:
 
101
            source_archive = self.factory.makeArchive(
 
102
                purpose=ArchivePurpose.PPA)
 
103
        if target_archive is None:
 
104
            target_archive = self.factory.makeArchive(
 
105
                purpose=ArchivePurpose.PPA)
 
106
        source_name = self.factory.getUniqueString('src-name')
 
107
        target_series = self.factory.makeDistroSeries()
 
108
        target_pocket = self.factory.getAnyPocket()
 
109
        requester = self.factory.makePerson()
 
110
        return getUtility(IPlainPackageCopyJobSource).create(
 
111
            source_name, source_archive, target_archive,
 
112
            target_series, target_pocket, requester=requester,
 
113
            package_version="1.0", **kwargs)
 
114
 
99
115
    def runJob(self, job):
100
116
        """Helper to switch to the right DB user and run the job."""
101
117
        # We are basically mimicking the job runner here.
530
546
        self.assertEqual(
531
547
            {}, job_source.getPendingJobsPerPackage(dsd.derived_series))
532
548
 
 
549
    def test_getIncompleteJobsForArchive_finds_jobs_in_right_archive(self):
 
550
        # getIncompleteJobsForArchive should return all the jobs in an
 
551
        # specified archive.
 
552
        target1 = self.factory.makeArchive(purpose=ArchivePurpose.PPA)
 
553
        target2 = self.factory.makeArchive(purpose=ArchivePurpose.PPA)
 
554
        job_source = getUtility(IPlainPackageCopyJobSource)
 
555
        target1_jobs = [
 
556
            self.makePPAJob(target_archive=target1)
 
557
            for counter in xrange(2)]
 
558
        self.makePPAJob(target2)
 
559
 
 
560
        pending_jobs = list(job_source.getIncompleteJobsForArchive(target1))
 
561
        self.assertContentEqual(pending_jobs, target1_jobs)
 
562
 
 
563
    def test_getIncompleteJobsForArchive_finds_failed_and_running_jobs(self):
 
564
        # getIncompleteJobsForArchive should return only waiting, failed
 
565
        # and running jobs.
 
566
        ppa = self.factory.makeArchive(purpose=ArchivePurpose.PPA)
 
567
        for status in JobStatus.items:
 
568
            job = self.makePPAJob(target_archive=ppa)
 
569
            removeSecurityProxy(job).job._status = status
 
570
 
 
571
        job_source = getUtility(IPlainPackageCopyJobSource)
 
572
        found_jobs = job_source.getIncompleteJobsForArchive(ppa)
 
573
        found_statuses = [job.status for job in found_jobs]
 
574
        self.assertContentEqual(
 
575
            [JobStatus.WAITING, JobStatus.RUNNING, JobStatus.FAILED],
 
576
            found_statuses)
 
577
 
533
578
    def test_copying_to_main_archive_ancestry_overrides(self):
534
579
        # The job will complete right away for auto-approved copies to a
535
580
        # main archive and apply any ancestry overrides.