~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/soyuz/tests/test_archive.py

  • Committer: Launchpad Patch Queue Manager
  • Date: 2011-07-18 17:33:25 UTC
  • mfrom: (13410.1.34 async-copying-part-3-flag)
  • Revision ID: launchpad@pqm.canonical.com-20110718173325-uy5lxud3o9xk5hai
[r=julian-edwards][bug=809805] Amalgamation of branches that adds API
        support for asynchronously copying packages around using the
        webservice API and PackageCopyJob.

Show diffs side-by-side

added

removed

Lines of Context:
39
39
    )
40
40
from lp.registry.interfaces.pocket import PackagePublishingPocket
41
41
from lp.registry.interfaces.series import SeriesStatus
 
42
from lp.services.features.testing import FeatureFixture
42
43
from lp.services.job.interfaces.job import JobStatus
43
44
from lp.services.propertycache import clear_property_cache
44
45
from lp.services.worlddata.interfaces.country import ICountrySet
49
50
    ArchivePermissionType,
50
51
    ArchivePurpose,
51
52
    ArchiveStatus,
 
53
    PackageCopyPolicy,
52
54
    PackagePublishingStatus,
53
55
    )
54
56
from lp.soyuz.interfaces.archive import (
55
57
    ArchiveDependencyError,
56
58
    ArchiveDisabled,
 
59
    CannotCopy,
57
60
    CannotRestrictArchitectures,
58
61
    CannotUploadToPocket,
59
62
    CannotUploadToPPA,
 
63
    ForbiddenByFeatureFlag,
60
64
    IArchiveSet,
61
65
    InsufficientUploadRights,
62
66
    InvalidPocketForPartnerArchive,
69
73
from lp.soyuz.interfaces.archivepermission import IArchivePermissionSet
70
74
from lp.soyuz.interfaces.binarypackagename import IBinaryPackageNameSet
71
75
from lp.soyuz.interfaces.component import IComponentSet
 
76
from lp.soyuz.interfaces.packagecopyjob import IPlainPackageCopyJobSource
72
77
from lp.soyuz.interfaces.processor import IProcessorFamilySet
73
78
from lp.soyuz.model.archive import Archive
74
79
from lp.soyuz.model.archivepermission import ArchivePermission
1933
1938
            filtered_sources])
1934
1939
 
1935
1940
 
 
1941
class TestSyncSourceFeatureFlag(TestCaseWithFactory):
 
1942
 
 
1943
    layer = DatabaseFunctionalLayer
 
1944
 
 
1945
    def test_copyPackage_requires_feature_flag(self):
 
1946
        # Ensure feature is off.
 
1947
        self.useFixture(FeatureFixture({u"soyuz.copypackage.enabled": ''}))
 
1948
        archive = self.factory.makeArchive()
 
1949
        self.assertRaises(
 
1950
            ForbiddenByFeatureFlag,
 
1951
            archive.copyPackage,
 
1952
            None, None, None, None, None)
 
1953
 
 
1954
    def test_copyPackages_requires_feature_flag(self):
 
1955
        # Ensure feature is off.
 
1956
        self.useFixture(FeatureFixture({u"soyuz.copypackage.enabled": ''}))
 
1957
        archive = self.factory.makeArchive()
 
1958
        self.assertRaises(
 
1959
            ForbiddenByFeatureFlag,
 
1960
            archive.copyPackages,
 
1961
            None, None, None, None, None)
 
1962
 
 
1963
 
1936
1964
class TestSyncSource(TestCaseWithFactory):
1937
1965
 
1938
1966
    layer = DatabaseFunctionalLayer
1939
1967
 
 
1968
    def setUp(self):
 
1969
        super(TestSyncSource, self).setUp()
 
1970
        self.useFixture(FeatureFixture({u"soyuz.copypackage.enabled": 'on'}))
 
1971
 
1940
1972
    def test_security_team_can_copy_to_primary(self):
1941
1973
        # A member of ubuntu-security can use syncSource on any package
1942
1974
        # in the Ubuntu primary archive, regardless of their normal
1965
1997
            1,
1966
1998
            ubuntu.main_archive.getPublishedSources(
1967
1999
                name=source.source_package_name).count())
 
2000
 
 
2001
    def _setup_copy_data(self, target_purpose=None):
 
2002
        if target_purpose is None:
 
2003
            target_purpose = ArchivePurpose.PPA
 
2004
        source_archive = self.factory.makeArchive()
 
2005
        target_archive = self.factory.makeArchive(purpose=target_purpose)
 
2006
        source = self.factory.makeSourcePackagePublishingHistory(
 
2007
            archive=source_archive, status=PackagePublishingStatus.PUBLISHED)
 
2008
        source_name = source.source_package_name
 
2009
        version = source.source_package_version
 
2010
        to_pocket = PackagePublishingPocket.RELEASE
 
2011
        to_series = self.factory.makeDistroSeries(
 
2012
            distribution=target_archive.distribution)
 
2013
        return (source, source_archive, source_name, target_archive,
 
2014
                to_pocket, to_series, version)
 
2015
 
 
2016
    def test_copyPackage_creates_packagecopyjob(self):
 
2017
        # The copyPackage method should create a PCJ with the appropriate
 
2018
        # parameters.
 
2019
        (source, source_archive, source_name, target_archive, to_pocket,
 
2020
         to_series, version) = self._setup_copy_data()
 
2021
        with person_logged_in(target_archive.owner):
 
2022
            target_archive.copyPackage(
 
2023
                source_name, version, source_archive, to_pocket.name,
 
2024
                to_series=to_series.name, include_binaries=False,
 
2025
                person=target_archive.owner)
 
2026
 
 
2027
        # The source should not be published yet in the target_archive.
 
2028
        published = target_archive.getPublishedSources(
 
2029
            name=source.source_package_name).any()
 
2030
        self.assertIs(None, published)
 
2031
 
 
2032
        # There should be one copy job.
 
2033
        job_source = getUtility(IPlainPackageCopyJobSource)
 
2034
        copy_job = job_source.getActiveJobs(target_archive).one()
 
2035
 
 
2036
        # Its data should reflect the requested copy.
 
2037
        self.assertEqual(source_name, copy_job.package_name)
 
2038
        self.assertEqual(version, copy_job.package_version)
 
2039
        self.assertEqual(target_archive, copy_job.target_archive)
 
2040
        self.assertEqual(source_archive, copy_job.source_archive)
 
2041
        self.assertEqual(to_series, copy_job.target_distroseries)
 
2042
        self.assertEqual(to_pocket, copy_job.target_pocket)
 
2043
        self.assertFalse(copy_job.include_binaries)
 
2044
        self.assertEquals(PackageCopyPolicy.INSECURE, copy_job.copy_policy)
 
2045
 
 
2046
    def test_copyPackage_disallows_non_primary_archive_uploaders(self):
 
2047
        # If copying to a primary archive and you're not an uploader for
 
2048
        # the package then you can't copy.
 
2049
        (source, source_archive, source_name, target_archive, to_pocket,
 
2050
         to_series, version) = self._setup_copy_data(
 
2051
            target_purpose=ArchivePurpose.PRIMARY)
 
2052
        person = self.factory.makePerson()
 
2053
        self.assertRaises(
 
2054
            CannotCopy,
 
2055
            target_archive.copyPackage, source_name, version, source_archive,
 
2056
            to_pocket.name, to_series=to_series.name, include_binaries=False,
 
2057
            person=person)
 
2058
 
 
2059
    def test_copyPackage_allows_primary_archive_uploaders(self):
 
2060
        # Copying to a primary archive if you're already an uploader is OK.
 
2061
        (source, source_archive, source_name, target_archive, to_pocket,
 
2062
         to_series, version) = self._setup_copy_data(
 
2063
            target_purpose=ArchivePurpose.PRIMARY)
 
2064
        person = self.factory.makePerson()
 
2065
        with person_logged_in(target_archive.owner):
 
2066
            target_archive.newComponentUploader(person, "universe")
 
2067
        target_archive.copyPackage(
 
2068
            source_name, version, source_archive, to_pocket.name,
 
2069
            to_series=to_series.name, include_binaries=False,
 
2070
            person=person)
 
2071
 
 
2072
        # There should be one copy job.
 
2073
        job_source = getUtility(IPlainPackageCopyJobSource)
 
2074
        copy_job = job_source.getActiveJobs(target_archive).one()
 
2075
        self.assertEqual(target_archive, copy_job.target_archive)
 
2076
 
 
2077
    def test_copyPackage_disallows_non_PPA_owners(self):
 
2078
        # Only people with launchpad.Append are allowed to call copyPackage.
 
2079
        (source, source_archive, source_name, target_archive, to_pocket,
 
2080
         to_series, version) = self._setup_copy_data()
 
2081
        person = self.factory.makePerson()
 
2082
        self.assertTrue(target_archive.is_ppa)
 
2083
        self.assertRaises(
 
2084
            CannotCopy,
 
2085
            target_archive.copyPackage, source_name, version, source_archive,
 
2086
            to_pocket.name, to_series=to_series.name, include_binaries=False,
 
2087
            person=person)
 
2088
 
 
2089
    def test_copyPackage_disallows_non_release_target_pocket_for_PPA(self):
 
2090
        (source, source_archive, source_name, target_archive, to_pocket,
 
2091
         to_series, version) = self._setup_copy_data()
 
2092
        to_pocket = PackagePublishingPocket.UPDATES
 
2093
        self.assertTrue(target_archive.is_ppa)
 
2094
        self.assertRaises(
 
2095
            CannotCopy,
 
2096
            target_archive.copyPackage, source_name, version, source_archive,
 
2097
            to_pocket.name, to_series=to_series.name, include_binaries=False,
 
2098
            person=target_archive.owner)
 
2099
 
 
2100
    def test_copyPackages_with_single_package(self):
 
2101
        (source, source_archive, source_name, target_archive, to_pocket,
 
2102
         to_series, version) = self._setup_copy_data()
 
2103
 
 
2104
        with person_logged_in(target_archive.owner):
 
2105
            target_archive.copyPackages(
 
2106
                [source_name], source_archive, to_pocket.name,
 
2107
                to_series=to_series.name, include_binaries=False,
 
2108
                person=target_archive.owner)
 
2109
 
 
2110
        # The source should not be published yet in the target_archive.
 
2111
        published = target_archive.getPublishedSources(
 
2112
            name=source.source_package_name).any()
 
2113
        self.assertIs(None, published)
 
2114
 
 
2115
        # There should be one copy job.
 
2116
        job_source = getUtility(IPlainPackageCopyJobSource)
 
2117
        copy_job = job_source.getActiveJobs(target_archive).one()
 
2118
        self.assertEqual(target_archive, copy_job.target_archive)
 
2119
 
 
2120
    def test_copyPackages_with_multiple_packages(self):
 
2121
        (source, source_archive, source_name, target_archive, to_pocket,
 
2122
         to_series, version) = self._setup_copy_data()
 
2123
        sources = [source]
 
2124
        sources.append(self.factory.makeSourcePackagePublishingHistory(
 
2125
            archive=source_archive,
 
2126
            status=PackagePublishingStatus.PUBLISHED))
 
2127
        sources.append(self.factory.makeSourcePackagePublishingHistory(
 
2128
            archive=source_archive,
 
2129
            status=PackagePublishingStatus.PUBLISHED))
 
2130
        names = [source.sourcepackagerelease.sourcepackagename.name
 
2131
                 for source in sources]
 
2132
 
 
2133
        with person_logged_in(target_archive.owner):
 
2134
            target_archive.copyPackages(
 
2135
                names, source_archive, to_pocket.name,
 
2136
                to_series=to_series.name, include_binaries=False,
 
2137
                person=target_archive.owner)
 
2138
 
 
2139
        # Make sure three copy jobs exist.
 
2140
        job_source = getUtility(IPlainPackageCopyJobSource)
 
2141
        copy_jobs = job_source.getActiveJobs(target_archive)
 
2142
        self.assertEqual(3, copy_jobs.count())
 
2143
 
 
2144
    def test_copyPackages_disallows_non_primary_archive_uploaders(self):
 
2145
        # If copying to a primary archive and you're not an uploader for
 
2146
        # the package then you can't copy.
 
2147
        (source, source_archive, source_name, target_archive, to_pocket,
 
2148
         to_series, version) = self._setup_copy_data(
 
2149
            target_purpose=ArchivePurpose.PRIMARY)
 
2150
        person = self.factory.makePerson()
 
2151
        self.assertRaises(
 
2152
            CannotCopy,
 
2153
            target_archive.copyPackages, [source_name], source_archive,
 
2154
            to_pocket.name, to_series=to_series.name, include_binaries=False,
 
2155
            person=person)
 
2156
 
 
2157
    def test_copyPackages_allows_primary_archive_uploaders(self):
 
2158
        # Copying to a primary archive if you're already an uploader is OK.
 
2159
        (source, source_archive, source_name, target_archive, to_pocket,
 
2160
         to_series, version) = self._setup_copy_data(
 
2161
            target_purpose=ArchivePurpose.PRIMARY)
 
2162
        person = self.factory.makePerson()
 
2163
        with person_logged_in(target_archive.owner):
 
2164
            target_archive.newComponentUploader(person, "universe")
 
2165
        target_archive.copyPackages(
 
2166
            [source_name], source_archive, to_pocket.name,
 
2167
            to_series=to_series.name, include_binaries=False,
 
2168
            person=person)
 
2169
 
 
2170
        # There should be one copy job.
 
2171
        job_source = getUtility(IPlainPackageCopyJobSource)
 
2172
        copy_job = job_source.getActiveJobs(target_archive).one()
 
2173
        self.assertEqual(target_archive, copy_job.target_archive)
 
2174
 
 
2175
    def test_copyPackages_disallows_non_PPA_owners(self):
 
2176
        # Only people with launchpad.Append are allowed to call copyPackages.
 
2177
        (source, source_archive, source_name, target_archive, to_pocket,
 
2178
         to_series, version) = self._setup_copy_data()
 
2179
        person = self.factory.makePerson()
 
2180
        self.assertTrue(target_archive.is_ppa)
 
2181
        self.assertRaises(
 
2182
            CannotCopy,
 
2183
            target_archive.copyPackages, [source_name], source_archive,
 
2184
            to_pocket.name, to_series=to_series.name, include_binaries=False,
 
2185
            person=person)
 
2186