~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/registry/tests/test_accesspolicy.py

  • Committer: Launchpad Patch Queue Manager
  • Date: 2011-11-21 23:18:40 UTC
  • mfrom: (14186.6.43 observer-model)
  • Revision ID: launchpad@pqm.canonical.com-20111121231840-jek6hufhmc2q0f2g
[r=sinzui][no-qa] Initial model work for access policies.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2011 Canonical Ltd.  This software is licensed under the
 
2
# GNU Affero General Public License version 3 (see the file LICENSE).
 
3
 
 
4
__metaclass__ = type
 
5
 
 
6
from storm.exceptions import LostObjectError
 
7
from storm.store import Store
 
8
from testtools.matchers import MatchesStructure
 
9
from zope.component import getUtility
 
10
 
 
11
from canonical.launchpad.interfaces.lpstorm import IStore
 
12
from canonical.testing.layers import DatabaseFunctionalLayer
 
13
from lp.registry.interfaces.accesspolicy import (
 
14
    AccessPolicyType,
 
15
    IAccessPolicy,
 
16
    IAccessPolicyArtifact,
 
17
    IAccessPolicyArtifactSource,
 
18
    IAccessPolicyGrant,
 
19
    IAccessPolicyGrantSource,
 
20
    IAccessPolicySource,
 
21
    )
 
22
from lp.testing import TestCaseWithFactory
 
23
from lp.testing.matchers import Provides
 
24
 
 
25
 
 
26
class TestAccessPolicy(TestCaseWithFactory):
    """Tests for access policy objects built by the test factory."""

    layer = DatabaseFunctionalLayer

    def test_provides_interface(self):
        # A factory-made policy provides IAccessPolicy.
        made_policy = self.factory.makeAccessPolicy()
        self.assertThat(made_policy, Provides(IAccessPolicy))

    def test_pillar(self):
        # The pillar passed to the factory is exposed as policy.pillar.
        target = self.factory.makeProduct()
        made_policy = self.factory.makeAccessPolicy(pillar=target)
        self.assertEqual(target, made_policy.pillar)
 
37
 
 
38
 
 
39
class TestAccessPolicySource(TestCaseWithFactory):
    """Tests for the IAccessPolicySource utility."""

    layer = DatabaseFunctionalLayer

    def test_create_for_product(self):
        # create() returns a policy carrying the given pillar and type.
        product = self.factory.makeProduct()
        # Named policy_type so the builtin `type` is not shadowed.
        policy_type = AccessPolicyType.SECURITY
        policy = getUtility(IAccessPolicySource).create(
            product, policy_type)
        self.assertThat(
            policy,
            MatchesStructure.byEquality(pillar=product, type=policy_type))

    def test_getByID(self):
        # getByID finds the right policy.
        policy = self.factory.makeAccessPolicy()
        # Flush so we get an ID.
        Store.of(policy).flush()
        self.assertEqual(
            policy, getUtility(IAccessPolicySource).getByID(policy.id))

    def test_getByID_nonexistent(self):
        # getByID returns None if the policy doesn't exist.
        self.assertIs(
            None,
            getUtility(IAccessPolicySource).getByID(
                self.factory.getUniqueInteger()))

    def test_getByPillarAndType(self):
        # getByPillarAndType finds the right policy.
        product = self.factory.makeProduct()

        # Two policies on the same pillar, differing only in type.
        private_policy = self.factory.makeAccessPolicy(
            pillar=product, type=AccessPolicyType.PRIVATE)
        security_policy = self.factory.makeAccessPolicy(
            pillar=product, type=AccessPolicyType.SECURITY)
        self.assertEqual(
            private_policy,
            getUtility(IAccessPolicySource).getByPillarAndType(
                product, AccessPolicyType.PRIVATE))
        self.assertEqual(
            security_policy,
            getUtility(IAccessPolicySource).getByPillarAndType(
                product, AccessPolicyType.SECURITY))

    def test_getByPillarAndType_nonexistent(self):
        # getByPillarAndType returns None if the policy doesn't exist.
        # Create policy identifiers, and an unrelated policy.
        self.factory.makeAccessPolicy(type=AccessPolicyType.PRIVATE)
        product = self.factory.makeProduct()
        self.assertIs(
            None,
            getUtility(IAccessPolicySource).getByPillarAndType(
                product, AccessPolicyType.PRIVATE))

    def test_findByPillar(self):
        # findByPillar finds only the relevant policies.
        product = self.factory.makeProduct()
        # One policy per available type on this product.  The loop
        # variable avoids the name `type`, which would shadow the
        # builtin.
        policies = [
            self.factory.makeAccessPolicy(pillar=product, type=policy_type)
            for policy_type in AccessPolicyType.items]
        # A policy made without this pillar must not be returned.
        self.factory.makeAccessPolicy()
        self.assertContentEqual(
            policies,
            getUtility(IAccessPolicySource).findByPillar(product))
 
102
 
 
103
 
 
104
class TestAccessPolicyArtifact(TestCaseWithFactory):
    """Tests for access policy artifacts built by the test factory."""

    layer = DatabaseFunctionalLayer

    def test_provides_interface(self):
        # A factory-made artifact provides IAccessPolicyArtifact.
        made_artifact = self.factory.makeAccessPolicyArtifact()
        self.assertThat(made_artifact, Provides(IAccessPolicyArtifact))

    def test_policy(self):
        # The policy passed to the factory is exposed as artifact.policy.
        expected_policy = self.factory.makeAccessPolicy()
        made_artifact = self.factory.makeAccessPolicyArtifact(
            policy=expected_policy)
        self.assertEqual(expected_policy, made_artifact.policy)
 
117
 
 
118
 
 
119
class TestAccessPolicyArtifactSourceOnce(TestCaseWithFactory):
    """IAccessPolicyArtifactSource tests not tied to an artifact type."""

    layer = DatabaseFunctionalLayer

    def test_ensure_other_fails(self):
        # ensure() rejects unsupported objects: handing it a product
        # raises AssertionError.
        ensure = getUtility(IAccessPolicyArtifactSource).ensure
        unsupported = self.factory.makeProduct()
        self.assertRaises(AssertionError, ensure, unsupported)
 
128
 
 
129
 
 
130
class BaseAccessPolicyArtifactTests:
    """Shared IAccessPolicyArtifactSource tests.

    Subclasses supply the concrete artifact by overriding
    getConcreteArtifact().
    """

    layer = DatabaseFunctionalLayer

    def getConcreteArtifact(self):
        # Must be overridden to return the object under test.
        raise NotImplementedError()

    def test_ensure(self):
        # ensure() creates an abstract artifact which maps to the
        # concrete one.
        target = self.getConcreteArtifact()
        source = getUtility(IAccessPolicyArtifactSource)
        ensured = source.ensure(target)
        Store.of(ensured).flush()
        self.assertEqual(target, ensured.concrete_artifact)

    def test_get(self):
        # get() finds an abstract artifact which maps to the concrete
        # one.
        target = self.getConcreteArtifact()
        ensured = getUtility(IAccessPolicyArtifactSource).ensure(target)
        found = getUtility(IAccessPolicyArtifactSource).get(target)
        self.assertEqual(ensured, found)

    def test_ensure_twice(self):
        # ensure() will reuse an existing matching abstract artifact if
        # it exists.
        target = self.getConcreteArtifact()
        first = getUtility(IAccessPolicyArtifactSource).ensure(target)
        Store.of(first).flush()
        first_id = first.id
        second_id = getUtility(IAccessPolicyArtifactSource).ensure(target).id
        self.assertEqual(first_id, second_id)

    def test_delete(self):
        # delete() removes the abstract artifact and any associated
        # grants.
        target = self.getConcreteArtifact()
        ensured = getUtility(IAccessPolicyArtifactSource).ensure(target)
        doomed_grant = self.factory.makeAccessPolicyGrant(object=ensured)

        # Make some other grants to ensure they're unaffected.
        artifact_grant = self.factory.makeAccessPolicyGrant(
            object=self.factory.makeAccessPolicyArtifact())
        policy_grant = self.factory.makeAccessPolicyGrant(
            object=self.factory.makeAccessPolicy())
        survivors = [artifact_grant, policy_grant]

        getUtility(IAccessPolicyArtifactSource).delete(target)
        IStore(doomed_grant).invalidate()
        # After invalidation, reading either deleted object raises
        # LostObjectError.
        lost_attributes = (
            (doomed_grant, 'policy'),
            (ensured, 'concrete_artifact'),
            )
        for lost_object, attribute in lost_attributes:
            self.assertRaises(
                LostObjectError, getattr, lost_object, attribute)

        for survivor in survivors:
            refetched = getUtility(IAccessPolicyGrantSource).getByID(
                survivor.id)
            self.assertEqual(survivor, refetched)

    def test_delete_noop(self):
        # delete() works even if there's no abstract artifact.
        target = self.getConcreteArtifact()
        source = getUtility(IAccessPolicyArtifactSource)
        source.delete(target)
 
192
 
 
193
 
 
194
class TestAccessPolicyArtifactBranch(
        BaseAccessPolicyArtifactTests, TestCaseWithFactory):
    """Run the shared artifact tests against a branch."""

    def getConcreteArtifact(self):
        # The concrete artifact is a fresh factory-made branch.
        branch = self.factory.makeBranch()
        return branch
 
199
 
 
200
 
 
201
class TestAccessPolicyArtifactBug(
        BaseAccessPolicyArtifactTests, TestCaseWithFactory):
    """Run the shared artifact tests against a bug."""

    def getConcreteArtifact(self):
        # The concrete artifact is a fresh factory-made bug.
        bug = self.factory.makeBug()
        return bug
 
206
 
 
207
 
 
208
class TestAccessPolicyGrant(TestCaseWithFactory):
    """Tests for access policy grants built by the test factory."""

    layer = DatabaseFunctionalLayer

    def test_provides_interface(self):
        # A factory-made grant provides IAccessPolicyGrant.
        made_grant = self.factory.makeAccessPolicyGrant()
        self.assertThat(made_grant, Provides(IAccessPolicyGrant))

    def test_concrete_artifact(self):
        # A grant on an abstract artifact exposes the bug that the
        # artifact was made for.
        target_bug = self.factory.makeBug()
        wrapper = self.factory.makeAccessPolicyArtifact(target_bug)
        made_grant = self.factory.makeAccessPolicyGrant(object=wrapper)
        self.assertEqual(target_bug, made_grant.concrete_artifact)

    def test_no_concrete_artifact(self):
        # A grant made on a policy has concrete_artifact set to None.
        policy = self.factory.makeAccessPolicy()
        made_grant = self.factory.makeAccessPolicyGrant(object=policy)
        self.assertIs(None, made_grant.concrete_artifact)
 
227
 
 
228
 
 
229
class TestAccessPolicyGrantSource(TestCaseWithFactory):
    """Tests for the IAccessPolicyGrantSource utility."""

    layer = DatabaseFunctionalLayer

    def test_grant_for_policy(self):
        # grant() on a policy records the people and the policy, and
        # leaves both artifact attributes as None.
        target_policy = self.factory.makeAccessPolicy()
        receiver = self.factory.makePerson()
        giver = self.factory.makePerson()
        source = getUtility(IAccessPolicyGrantSource)
        made_grant = source.grant(receiver, giver, target_policy)
        expected = dict(
            grantee=receiver,
            grantor=giver,
            policy=target_policy,
            abstract_artifact=None,
            concrete_artifact=None,
            )
        self.assertThat(
            made_grant, MatchesStructure.byEquality(**expected))

    def test_grant_with_artifact(self):
        # grant() on an artifact records the people and the artifact,
        # and leaves the policy as None.
        target_artifact = self.factory.makeAccessPolicyArtifact()
        receiver = self.factory.makePerson()
        giver = self.factory.makePerson()
        source = getUtility(IAccessPolicyGrantSource)
        made_grant = source.grant(receiver, giver, target_artifact)
        expected = dict(
            grantee=receiver,
            grantor=giver,
            policy=None,
            abstract_artifact=target_artifact,
            concrete_artifact=target_artifact.concrete_artifact,
            )
        self.assertThat(
            made_grant, MatchesStructure.byEquality(**expected))

    def test_getByID(self):
        # getByID finds the right grant.
        made_grant = self.factory.makeAccessPolicyGrant()
        # Flush so we get an ID.
        Store.of(made_grant).flush()
        found = getUtility(IAccessPolicyGrantSource).getByID(made_grant.id)
        self.assertEqual(made_grant, found)

    def test_getByID_nonexistent(self):
        # getByID returns None if the grant doesn't exist.
        source = getUtility(IAccessPolicyGrantSource)
        unused_id = self.factory.getUniqueInteger()
        self.assertIs(None, source.getByID(unused_id))

    def test_findByPolicy(self):
        # findByPolicy finds only the relevant grants.
        target_policy = self.factory.makeAccessPolicy()
        relevant = [
            self.factory.makeAccessPolicyGrant(object=target_policy)
            for _ in range(3)]
        # A grant made without this policy must not be returned.
        self.factory.makeAccessPolicyGrant()
        found = getUtility(IAccessPolicyGrantSource).findByPolicy(
            target_policy)
        self.assertContentEqual(relevant, found)