1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
|
# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Tests related to bug nominations."""
__metaclass__ = type
from itertools import izip
import re
from testtools.content import Content, UTF8_TEXT
from testtools.matchers import (
Equals,
LessThan,
Not,
)
from canonical.database.sqlbase import flush_database_updates
from canonical.launchpad.ftests import (
login,
logout,
)
from canonical.testing.layers import DatabaseFunctionalLayer
from lp.soyuz.interfaces.publishing import PackagePublishingStatus
from lp.testing import (
celebrity_logged_in,
person_logged_in,
StormStatementRecorder,
TestCaseWithFactory,
)
from lp.testing.matchers import HasQueryCount
class CanBeNominatedForTestMixin:
"""Test case mixin for IBug.canBeNominatedFor."""
layer = DatabaseFunctionalLayer
def setUp(self):
super(CanBeNominatedForTestMixin, self).setUp()
login('foo.bar@canonical.com')
self.eric = self.factory.makePerson(name='eric')
self.setUpTarget()
def tearDown(self):
logout()
super(CanBeNominatedForTestMixin, self).tearDown()
def test_canBeNominatedFor_series(self):
# A bug may be nominated for a series of a product with an existing
# task.
self.assertTrue(self.bug.canBeNominatedFor(self.series))
def test_not_canBeNominatedFor_already_nominated_series(self):
# A bug may not be nominated for a series with an existing nomination.
self.assertTrue(self.bug.canBeNominatedFor(self.series))
self.bug.addNomination(self.eric, self.series)
self.assertFalse(self.bug.canBeNominatedFor(self.series))
def test_not_canBeNominatedFor_non_series(self):
# A bug may not be nominated for something other than a series.
self.assertFalse(self.bug.canBeNominatedFor(self.milestone))
def test_not_canBeNominatedFor_already_targeted_series(self):
# A bug may not be nominated for a series if a task already exists.
# This case should be caught by the check for an existing nomination,
# but there are some historical cases where a series task exists
# without a nomination.
self.assertTrue(self.bug.canBeNominatedFor(self.series))
self.bug.addTask(self.eric, self.series)
self.assertFalse(self.bug.canBeNominatedFor(self.series))
def test_not_canBeNominatedFor_random_series(self):
# A bug may only be nominated for a series if that series' pillar
# already has a task.
self.assertFalse(self.bug.canBeNominatedFor(self.random_series))
class TestBugCanBeNominatedForProductSeries(
CanBeNominatedForTestMixin, TestCaseWithFactory):
"""Test IBug.canBeNominated for IProductSeries nominations."""
def setUpTarget(self):
self.series = self.factory.makeProductSeries()
self.bug = self.factory.makeBug(product=self.series.product)
self.milestone = self.factory.makeMilestone(productseries=self.series)
self.random_series = self.factory.makeProductSeries()
class TestBugCanBeNominatedForDistroSeries(
CanBeNominatedForTestMixin, TestCaseWithFactory):
"""Test IBug.canBeNominated for IDistroSeries nominations."""
def setUpTarget(self):
self.series = self.factory.makeDistroSeries()
# The factory can't create a distro bug directly.
self.bug = self.factory.makeBug()
self.bug.addTask(self.eric, self.series.distribution)
self.milestone = self.factory.makeMilestone(
distribution=self.series.distribution)
self.random_series = self.factory.makeDistroSeries()
def test_not_canBeNominatedFor_source_package(self):
# A bug may not be nominated directly for a source package. The
# distroseries must be nominated instead.
spn = self.factory.makeSourcePackageName()
source_package = self.series.getSourcePackage(spn)
self.assertFalse(self.bug.canBeNominatedFor(source_package))
def test_canBeNominatedFor_with_only_distributionsourcepackage(self):
# A distribution source package task is sufficient to allow nomination
# to a series of that distribution.
sp_bug = self.factory.makeBug()
spn = self.factory.makeSourcePackageName()
self.factory.makeSourcePackagePublishingHistory(
distroseries=self.series, sourcepackagename=spn)
self.assertFalse(sp_bug.canBeNominatedFor(self.series))
sp_bug.addTask(
self.eric, self.series.distribution.getSourcePackage(spn))
self.assertTrue(sp_bug.canBeNominatedFor(self.series))
class TestCanApprove(TestCaseWithFactory):
layer = DatabaseFunctionalLayer
def test_normal_user_cannot_approve(self):
nomination = self.factory.makeBugNomination(
target=self.factory.makeProductSeries())
self.assertFalse(nomination.canApprove(self.factory.makePerson()))
def test_driver_can_approve(self):
product = self.factory.makeProduct(driver=self.factory.makePerson())
nomination = self.factory.makeBugNomination(
target=self.factory.makeProductSeries(product=product))
self.assertTrue(nomination.canApprove(product.driver))
def publishSource(self, series, sourcepackagename, component):
return self.factory.makeSourcePackagePublishingHistory(
archive=series.main_archive,
distroseries=series,
sourcepackagename=sourcepackagename,
component=component,
status=PackagePublishingStatus.PUBLISHED)
def test_component_uploader_can_approve(self):
# A component uploader can approve a nomination for a package in
# that component, but not those in other components
series = self.factory.makeDistroSeries()
package_name = self.factory.makeSourcePackageName()
with celebrity_logged_in('admin'):
perm = series.main_archive.newComponentUploader(
self.factory.makePerson(), self.factory.makeComponent())
other_perm = series.main_archive.newComponentUploader(
self.factory.makePerson(), self.factory.makeComponent())
nomination = self.factory.makeBugNomination(
target=series.getSourcePackage(package_name))
# Publish the package in one of the uploaders' components. The
# uploader for the other component cannot approve the nomination.
self.publishSource(series, package_name, perm.component)
self.assertFalse(nomination.canApprove(other_perm.person))
self.assertTrue(nomination.canApprove(perm.person))
def test_any_component_uploader_can_approve_for_no_package(self):
# An uploader for any component can approve a nomination without
# a package.
series = self.factory.makeDistroSeries()
with celebrity_logged_in('admin'):
perm = series.main_archive.newComponentUploader(
self.factory.makePerson(), self.factory.makeComponent())
nomination = self.factory.makeBugNomination(target=series)
self.assertFalse(nomination.canApprove(self.factory.makePerson()))
self.assertTrue(nomination.canApprove(perm.person))
def test_package_uploader_can_approve(self):
# A package uploader can approve a nomination for that package,
# but not others.
series = self.factory.makeDistroSeries()
package_name = self.factory.makeSourcePackageName()
with celebrity_logged_in('admin'):
perm = series.main_archive.newPackageUploader(
self.factory.makePerson(), package_name)
other_perm = series.main_archive.newPackageUploader(
self.factory.makePerson(),
self.factory.makeSourcePackageName())
nomination = self.factory.makeBugNomination(
target=series.getSourcePackage(package_name))
self.assertFalse(nomination.canApprove(other_perm.person))
self.assertTrue(nomination.canApprove(perm.person))
def test_packageset_uploader_can_approve(self):
# A packageset uploader can approve a nomination for anything in
# that packageset.
series = self.factory.makeDistroSeries()
package_name = self.factory.makeSourcePackageName()
ps = self.factory.makePackageset(
distroseries=series, packages=[package_name])
with celebrity_logged_in('admin'):
perm = series.main_archive.newPackagesetUploader(
self.factory.makePerson(), ps)
nomination = self.factory.makeBugNomination(
target=series.getSourcePackage(package_name))
self.assertFalse(nomination.canApprove(self.factory.makePerson()))
self.assertTrue(nomination.canApprove(perm.person))
def test_any_uploader_can_approve(self):
# If there are multiple tasks for a distribution, an uploader to
# any of the involved packages or components can approve the
# nomination.
series = self.factory.makeDistroSeries()
package_name = self.factory.makeSourcePackageName()
comp_package_name = self.factory.makeSourcePackageName()
with celebrity_logged_in('admin'):
package_perm = series.main_archive.newPackageUploader(
self.factory.makePerson(), package_name)
comp_perm = series.main_archive.newComponentUploader(
self.factory.makePerson(), self.factory.makeComponent())
nomination = self.factory.makeBugNomination(
target=series.getSourcePackage(package_name))
self.factory.makeBugTask(
bug=nomination.bug,
target=series.distribution.getSourcePackage(comp_package_name))
self.publishSource(series, package_name, comp_perm.component)
self.assertFalse(nomination.canApprove(self.factory.makePerson()))
self.assertTrue(nomination.canApprove(package_perm.person))
self.assertTrue(nomination.canApprove(comp_perm.person))
class TestApprovePerformance(TestCaseWithFactory):
"""Test the performance of `BugNomination.approve`."""
layer = DatabaseFunctionalLayer
def check_heat_queries(self, nomination):
self.assertFalse(nomination.isApproved())
# Statement patterns we're looking for:
pattern = "^(SELECT Bug.heat|UPDATE .* max_bug_heat)"
matcher = re.compile(pattern , re.DOTALL | re.I).match
queries_heat = lambda statement: matcher(statement) is not None
with person_logged_in(nomination.target.owner):
flush_database_updates()
with StormStatementRecorder(queries_heat) as recorder:
nomination.approve(nomination.target.owner)
# Post-process the recorder to only have heat-related statements.
recorder.query_data = [
data for statement, data in izip(
recorder.statements, recorder.query_data)
if queries_heat(statement)]
self.addDetail(
"query_data", Content(UTF8_TEXT, lambda: [str(recorder)]))
# If there isn't at least one update to max_bug_heat it may mean that
# this test is no longer relevant.
self.assertThat(recorder, HasQueryCount(Not(Equals(0))))
# At present there are two updates to max_bug_heat because
# recalculateBugHeatCache is called twice, and, even though it is
# lazily evaluated, there are both explicit and implicit flushes in
# bugtask subscriber code.
self.assertThat(recorder, HasQueryCount(LessThan(3)))
def test_heat_queries_for_productseries(self):
# The number of heat-related queries when approving a product series
# nomination is as low as reasonably possible.
series = self.factory.makeProductSeries()
bug = self.factory.makeBug(product=series.product)
with person_logged_in(series.owner):
nomination = bug.addNomination(
target=series, owner=series.owner)
self.check_heat_queries(nomination)
def test_heat_queries_for_distroseries(self):
# The number of heat-related queries when approving a distro series
# nomination is as low as reasonably possible.
series = self.factory.makeDistroSeries()
bug = self.factory.makeBug(distribution=series.distribution)
with person_logged_in(series.owner):
nomination = bug.addNomination(
target=series, owner=series.owner)
self.check_heat_queries(nomination)
|