1
# Copyright 2011 Canonical Ltd. This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
6
from storm.exceptions import LostObjectError
7
from storm.store import Store
8
from testtools.matchers import MatchesStructure
9
from zope.component import getUtility
11
from canonical.launchpad.interfaces.lpstorm import IStore
12
from canonical.testing.layers import DatabaseFunctionalLayer
13
from lp.registry.interfaces.accesspolicy import (
16
IAccessPolicyArtifact,
17
IAccessPolicyArtifactSource,
19
IAccessPolicyGrantSource,
22
from lp.testing import TestCaseWithFactory
23
from lp.testing.matchers import Provides
26
class TestAccessPolicy(TestCaseWithFactory):
27
layer = DatabaseFunctionalLayer
29
def test_provides_interface(self):
31
self.factory.makeAccessPolicy(), Provides(IAccessPolicy))
33
def test_pillar(self):
34
product = self.factory.makeProduct()
35
policy = self.factory.makeAccessPolicy(pillar=product)
36
self.assertEqual(product, policy.pillar)
39
class TestAccessPolicySource(TestCaseWithFactory):
40
layer = DatabaseFunctionalLayer
42
def test_create_for_product(self):
43
product = self.factory.makeProduct()
44
type = AccessPolicyType.SECURITY
45
policy = getUtility(IAccessPolicySource).create(product, type)
48
MatchesStructure.byEquality(pillar=product, type=type))
50
def test_getByID(self):
51
# getByID finds the right policy.
52
policy = self.factory.makeAccessPolicy()
53
# Flush so we get an ID.
54
Store.of(policy).flush()
56
policy, getUtility(IAccessPolicySource).getByID(policy.id))
58
def test_getByID_nonexistent(self):
59
# getByID returns None if the policy doesn't exist.
62
getUtility(IAccessPolicySource).getByID(
63
self.factory.getUniqueInteger()))
65
def test_getByPillarAndType(self):
66
# getByPillarAndType finds the right policy.
67
product = self.factory.makeProduct()
69
private_policy = self.factory.makeAccessPolicy(
70
pillar=product, type=AccessPolicyType.PRIVATE)
71
security_policy = self.factory.makeAccessPolicy(
72
pillar=product, type=AccessPolicyType.SECURITY)
75
getUtility(IAccessPolicySource).getByPillarAndType(
76
product, AccessPolicyType.PRIVATE))
79
getUtility(IAccessPolicySource).getByPillarAndType(
80
product, AccessPolicyType.SECURITY))
82
def test_getByPillarAndType_nonexistent(self):
83
# getByPillarAndType returns None if the policy doesn't exist.
84
# Create policy identifiers, and an unrelated policy.
85
self.factory.makeAccessPolicy(type=AccessPolicyType.PRIVATE)
86
product = self.factory.makeProduct()
89
getUtility(IAccessPolicySource).getByPillarAndType(
90
product, AccessPolicyType.PRIVATE))
92
def test_findByPillar(self):
93
# findByPillar finds only the relevant policies.
94
product = self.factory.makeProduct()
96
self.factory.makeAccessPolicy(pillar=product, type=type)
97
for type in AccessPolicyType.items]
98
self.factory.makeAccessPolicy()
99
self.assertContentEqual(
101
getUtility(IAccessPolicySource).findByPillar(product))
104
class TestAccessPolicyArtifact(TestCaseWithFactory):
105
layer = DatabaseFunctionalLayer
107
def test_provides_interface(self):
109
self.factory.makeAccessPolicyArtifact(),
110
Provides(IAccessPolicyArtifact))
112
def test_policy(self):
113
policy = self.factory.makeAccessPolicy()
116
self.factory.makeAccessPolicyArtifact(policy=policy).policy)
119
class TestAccessPolicyArtifactSourceOnce(TestCaseWithFactory):
120
layer = DatabaseFunctionalLayer
122
def test_ensure_other_fails(self):
123
# ensure() rejects unsupported objects.
126
getUtility(IAccessPolicyArtifactSource).ensure,
127
self.factory.makeProduct())
130
class BaseAccessPolicyArtifactTests:
131
layer = DatabaseFunctionalLayer
133
def getConcreteArtifact(self):
134
raise NotImplementedError()
136
def test_ensure(self):
137
# ensure() creates an abstract artifact which maps to the
139
concrete = self.getConcreteArtifact()
140
abstract = getUtility(IAccessPolicyArtifactSource).ensure(concrete)
141
Store.of(abstract).flush()
142
self.assertEqual(concrete, abstract.concrete_artifact)
145
# get() finds an abstract artifact which maps to the concrete
147
concrete = self.getConcreteArtifact()
148
abstract = getUtility(IAccessPolicyArtifactSource).ensure(concrete)
150
abstract, getUtility(IAccessPolicyArtifactSource).get(concrete))
152
def test_ensure_twice(self):
153
# ensure() will reuse an existing matching abstract artifact if
155
concrete = self.getConcreteArtifact()
156
abstract = getUtility(IAccessPolicyArtifactSource).ensure(concrete)
157
Store.of(abstract).flush()
160
getUtility(IAccessPolicyArtifactSource).ensure(concrete).id)
162
def test_delete(self):
163
# delete() removes the abstract artifact and any associated
165
concrete = self.getConcreteArtifact()
166
abstract = getUtility(IAccessPolicyArtifactSource).ensure(concrete)
167
grant = self.factory.makeAccessPolicyGrant(object=abstract)
169
# Make some other grants to ensure they're unaffected.
171
self.factory.makeAccessPolicyGrant(
172
object=self.factory.makeAccessPolicyArtifact()),
173
self.factory.makeAccessPolicyGrant(
174
object=self.factory.makeAccessPolicy()),
177
getUtility(IAccessPolicyArtifactSource).delete(concrete)
178
IStore(grant).invalidate()
179
self.assertRaises(LostObjectError, getattr, grant, 'policy')
181
LostObjectError, getattr, abstract, 'concrete_artifact')
183
for other_grant in other_grants:
186
getUtility(IAccessPolicyGrantSource).getByID(other_grant.id))
188
def test_delete_noop(self):
189
# delete() works even if there's no abstract artifact.
190
concrete = self.getConcreteArtifact()
191
getUtility(IAccessPolicyArtifactSource).delete(concrete)
194
class TestAccessPolicyArtifactBranch(BaseAccessPolicyArtifactTests,
195
TestCaseWithFactory):
197
def getConcreteArtifact(self):
198
return self.factory.makeBranch()
201
class TestAccessPolicyArtifactBug(BaseAccessPolicyArtifactTests,
202
TestCaseWithFactory):
204
def getConcreteArtifact(self):
205
return self.factory.makeBug()
208
class TestAccessPolicyGrant(TestCaseWithFactory):
209
layer = DatabaseFunctionalLayer
211
def test_provides_interface(self):
213
self.factory.makeAccessPolicyGrant(),
214
Provides(IAccessPolicyGrant))
216
def test_concrete_artifact(self):
217
bug = self.factory.makeBug()
218
abstract = self.factory.makeAccessPolicyArtifact(bug)
219
grant = self.factory.makeAccessPolicyGrant(
221
self.assertEqual(bug, grant.concrete_artifact)
223
def test_no_concrete_artifact(self):
224
grant = self.factory.makeAccessPolicyGrant(
225
object=self.factory.makeAccessPolicy())
226
self.assertIs(None, grant.concrete_artifact)
229
class TestAccessPolicyGrantSource(TestCaseWithFactory):
230
layer = DatabaseFunctionalLayer
232
def test_grant_for_policy(self):
233
policy = self.factory.makeAccessPolicy()
234
grantee = self.factory.makePerson()
235
grantor = self.factory.makePerson()
236
grant = getUtility(IAccessPolicyGrantSource).grant(
237
grantee, grantor, policy)
240
MatchesStructure.byEquality(
244
abstract_artifact=None,
245
concrete_artifact=None,))
247
def test_grant_with_artifact(self):
248
artifact = self.factory.makeAccessPolicyArtifact()
249
grantee = self.factory.makePerson()
250
grantor = self.factory.makePerson()
251
grant = getUtility(IAccessPolicyGrantSource).grant(
252
grantee, grantor, artifact)
255
MatchesStructure.byEquality(
259
abstract_artifact=artifact,
260
concrete_artifact=artifact.concrete_artifact))
262
def test_getByID(self):
263
# getByID finds the right grant.
264
grant = self.factory.makeAccessPolicyGrant()
265
# Flush so we get an ID.
266
Store.of(grant).flush()
269
getUtility(IAccessPolicyGrantSource).getByID(grant.id))
271
def test_getByID_nonexistent(self):
272
# getByID returns None if the grant doesn't exist.
275
getUtility(IAccessPolicyGrantSource).getByID(
276
self.factory.getUniqueInteger()))
278
def test_findByPolicy(self):
279
# findByPolicy finds only the relevant grants.
280
policy = self.factory.makeAccessPolicy()
282
self.factory.makeAccessPolicyGrant(object=policy)
284
self.factory.makeAccessPolicyGrant()
285
self.assertContentEqual(
287
getUtility(IAccessPolicyGrantSource).findByPolicy(policy))