142
141
for subscriber in subscribers)
143
142
return subscribers, subscriptions
144
def test_forTask(self):
145
# `forTask` returns a new `BugSubscriptionInfo` narrowed to the given
147
info = self.getInfo()
148
self.assertIs(None, info.bugtask)
149
# If called with the current bugtask the same `BugSubscriptionInfo`
150
# instance is returned.
151
self.assertIs(info, info.forTask(info.bugtask))
152
# If called with a different bugtask a new `BugSubscriptionInfo` is
154
bugtask = self.bug.default_bugtask
155
info_for_task = info.forTask(bugtask)
156
self.assertIs(bugtask, info_for_task.bugtask)
157
self.assertIsNot(info, info_for_task)
158
# The instances share a cache of `BugSubscriptionInfo` instances.
160
info.cache_key: info,
161
info_for_task.cache_key: info_for_task,
163
self.assertEqual(expected_cache, info.cache)
164
self.assertIs(info.cache, info_for_task.cache)
165
# Calling `forTask` again looks in the cache first.
166
self.assertIs(info, info_for_task.forTask(info.bugtask))
167
self.assertIs(info_for_task, info.forTask(info_for_task.bugtask))
168
# The level is the same.
169
self.assertEqual(info.level, info_for_task.level)
171
def test_forLevel(self):
172
# `forLevel` returns a new `BugSubscriptionInfo` narrowed to the given
173
# subscription level.
174
info = self.getInfo(BugNotificationLevel.LIFECYCLE)
175
# If called with the current level the same `BugSubscriptionInfo`
176
# instance is returned.
177
self.assertIs(info, info.forLevel(info.level))
178
# If called with a different level a new `BugSubscriptionInfo` is
180
level = BugNotificationLevel.METADATA
181
info_for_level = info.forLevel(level)
182
self.assertEqual(level, info_for_level.level)
183
self.assertIsNot(info, info_for_level)
184
# The instances share a cache of `BugSubscriptionInfo` instances.
186
info.cache_key: info,
187
info_for_level.cache_key: info_for_level,
189
self.assertEqual(expected_cache, info.cache)
190
self.assertIs(info.cache, info_for_level.cache)
191
# Calling `forLevel` again looks in the cache first.
192
self.assertIs(info, info_for_level.forLevel(info.level))
193
self.assertIs(info_for_level, info.forLevel(info_for_level.level))
194
# The bugtask is the same.
195
self.assertIs(info.bugtask, info_for_level.bugtask)
197
def test_muted(self):
198
# The set of muted subscribers for the bug.
199
subscribers, subscriptions = self._create_direct_subscriptions()
200
sub1, sub2 = subscribers
201
with person_logged_in(sub1):
202
self.bug.mute(sub1, sub1)
203
self.assertContentEqual([sub1], self.getInfo().muted_subscribers)
145
205
def test_direct(self):
146
206
# The set of direct subscribers.
147
207
subscribers, subscriptions = self._create_direct_subscriptions()
148
208
found_subscriptions = self.getInfo().direct_subscriptions
149
self.assertEqual(set(subscriptions), found_subscriptions)
150
self.assertEqual(subscriptions, found_subscriptions.sorted)
151
self.assertEqual(set(subscribers), found_subscriptions.subscribers)
152
self.assertEqual(subscribers, found_subscriptions.subscribers.sorted)
209
self.assertContentEqual(subscriptions, found_subscriptions)
210
self.assertContentEqual(subscribers, found_subscriptions.subscribers)
154
def test_muted_direct(self):
212
def test_direct_muted(self):
155
213
# If a direct is muted, it is not listed.
156
214
subscribers, subscriptions = self._create_direct_subscriptions()
157
215
with person_logged_in(subscribers[0]):
158
216
self.bug.mute(subscribers[0], subscribers[0])
159
217
found_subscriptions = self.getInfo().direct_subscriptions
160
self.assertEqual(set([subscriptions[1]]), found_subscriptions)
218
self.assertContentEqual([subscriptions[1]], found_subscriptions)
220
def test_all_direct(self):
221
# The set of all direct subscribers, regardless of level.
222
subscribers, subscriptions = self._create_direct_subscriptions()
223
# Change the first subscription to be for comments only.
224
sub1, sub2 = subscriptions
225
with person_logged_in(sub1.person):
226
sub1.bug_notification_level = BugNotificationLevel.LIFECYCLE
227
info = self.getInfo(BugNotificationLevel.COMMENTS)
228
self.assertContentEqual([sub2], info.direct_subscriptions)
229
self.assertContentEqual(
230
[sub1, sub2], info.direct_subscriptions_at_all_levels)
162
232
def _create_duplicate_subscription(self):
163
233
duplicate_bug = self.factory.makeBug(product=self.target)
171
241
def test_duplicate(self):
172
242
# The set of subscribers from duplicate bugs.
173
243
found_subscriptions = self.getInfo().duplicate_subscriptions
174
self.assertEqual(set(), found_subscriptions)
175
self.assertEqual((), found_subscriptions.sorted)
176
self.assertEqual(set(), found_subscriptions.subscribers)
177
self.assertEqual((), found_subscriptions.subscribers.sorted)
244
self.assertContentEqual([], found_subscriptions)
245
self.assertContentEqual([], found_subscriptions.subscribers)
178
246
duplicate_bug, duplicate_bug_subscription = (
179
247
self._create_duplicate_subscription())
180
248
found_subscriptions = self.getInfo().duplicate_subscriptions
182
set([duplicate_bug_subscription]),
249
self.assertContentEqual(
250
[duplicate_bug_subscription],
183
251
found_subscriptions)
185
(duplicate_bug_subscription,),
186
found_subscriptions.sorted)
188
set([duplicate_bug.owner]),
252
self.assertContentEqual(
253
[duplicate_bug.owner],
189
254
found_subscriptions.subscribers)
191
(duplicate_bug.owner,),
192
found_subscriptions.subscribers.sorted)
194
def test_muted_duplicate(self):
256
def test_duplicate_muted(self):
195
257
# If a duplicate is muted, it is not listed.
196
258
duplicate_bug, duplicate_bug_subscription = (
197
259
self._create_duplicate_subscription())
198
260
with person_logged_in(duplicate_bug.owner):
199
261
self.bug.mute(duplicate_bug.owner, duplicate_bug.owner)
200
262
found_subscriptions = self.getInfo().duplicate_subscriptions
201
self.assertEqual(set(), found_subscriptions)
263
self.assertContentEqual([], found_subscriptions)
203
265
def test_duplicate_for_private_bug(self):
204
266
# The set of subscribers from duplicate bugs is always empty when the
234
294
self.bug.default_bugtask.transitionToAssignee(
235
295
duplicate_bug_subscription.person)
236
296
found_subscriptions = self.getInfo().duplicate_only_subscriptions
237
self.assertEqual(set(), found_subscriptions)
239
def test_structural(self):
297
self.assertContentEqual([], found_subscriptions)
299
def test_structural_subscriptions(self):
300
# The set of structural subscriptions.
302
self.factory.makePerson(),
303
self.factory.makePerson())
304
with person_logged_in(self.bug.owner):
305
subscriptions = tuple(
306
self.target.addBugSubscription(subscriber, subscriber)
307
for subscriber in subscribers)
308
found_subscriptions = self.getInfo().structural_subscriptions
309
self.assertContentEqual(subscriptions, found_subscriptions)
311
def test_structural_subscriptions_muted(self):
312
# The set of structural subscriptions DOES NOT exclude muted
314
subscriber = self.factory.makePerson()
315
with person_logged_in(subscriber):
316
self.bug.mute(subscriber, subscriber)
317
with person_logged_in(self.bug.owner):
318
subscription = self.target.addBugSubscription(
319
subscriber, subscriber)
320
found_subscriptions = self.getInfo().structural_subscriptions
321
self.assertContentEqual([subscription], found_subscriptions)
323
def test_structural_subscribers(self):
240
324
# The set of structural subscribers.
242
326
self.factory.makePerson(),
243
327
self.factory.makePerson())
244
328
with person_logged_in(self.bug.owner):
245
subscriptions = tuple(
329
for subscriber in subscribers:
246
330
self.target.addBugSubscription(subscriber, subscriber)
247
for subscriber in subscribers)
248
found_subscriptions = self.getInfo().structural_subscriptions
249
self.assertEqual(set(subscriptions), found_subscriptions)
250
self.assertEqual(subscriptions, found_subscriptions.sorted)
251
self.assertEqual(set(subscribers), found_subscriptions.subscribers)
252
self.assertEqual(subscribers, found_subscriptions.subscribers.sorted)
331
found_subscribers = self.getInfo().structural_subscribers
332
self.assertContentEqual(subscribers, found_subscribers)
334
def test_structural_subscribers_muted(self):
335
# The set of structural subscribers DOES NOT exclude muted
337
subscriber = self.factory.makePerson()
338
with person_logged_in(subscriber):
339
self.bug.mute(subscriber, subscriber)
340
with person_logged_in(self.bug.owner):
341
self.target.addBugSubscription(subscriber, subscriber)
342
found_subscribers = self.getInfo().structural_subscribers
343
self.assertContentEqual([subscriber], found_subscribers)
254
345
def test_all_assignees(self):
255
346
# The set of bugtask assignees for bugtasks that have been assigned.
256
347
found_assignees = self.getInfo().all_assignees
257
self.assertEqual(set(), found_assignees)
258
self.assertEqual((), found_assignees.sorted)
348
self.assertContentEqual([], found_assignees)
259
349
bugtask = self.bug.default_bugtask
260
350
with person_logged_in(bugtask.pillar.bug_supervisor):
261
351
bugtask.transitionToAssignee(self.bug.owner)
262
352
found_assignees = self.getInfo().all_assignees
263
self.assertEqual(set([self.bug.owner]), found_assignees)
264
self.assertEqual((self.bug.owner,), found_assignees.sorted)
353
self.assertContentEqual([self.bug.owner], found_assignees)
265
354
bugtask2 = self.factory.makeBugTask(bug=self.bug)
266
355
with person_logged_in(bugtask2.pillar.owner):
267
356
bugtask2.transitionToAssignee(bugtask2.owner)
268
357
found_assignees = self.getInfo().all_assignees
270
set([self.bug.owner, bugtask2.owner]),
358
self.assertContentEqual(
359
[self.bug.owner, bugtask2.owner],
273
(self.bug.owner, bugtask2.owner),
274
found_assignees.sorted)
361
# Getting info for a specific bugtask will return the assignee for
363
self.assertContentEqual(
365
self.getInfo().forTask(bugtask2).all_assignees)
276
367
def test_all_pillar_owners_without_bug_supervisors(self):
277
368
# The set of owners of pillars for which no bug supervisor is
369
# configured and which use Launchpad for bug tracking.
279
370
[bugtask] = self.bug.bugtasks
281
372
self.getInfo().all_pillar_owners_without_bug_supervisors)
282
self.assertEqual(set(), found_owners)
283
self.assertEqual((), found_owners.sorted)
284
# Clear the supervisor for the first bugtask's target.
373
self.assertContentEqual([], found_owners)
374
# Clear the supervisor for the bugtask's target and ensure that the
375
# project uses Launchpad Bugs.
285
376
with person_logged_in(bugtask.target.owner):
286
377
bugtask.target.setBugSupervisor(None, bugtask.owner)
378
bugtask.pillar.official_malone = True
379
# The collection includes the pillar's owner.
288
381
self.getInfo().all_pillar_owners_without_bug_supervisors)
289
self.assertEqual(set([bugtask.pillar.owner]), found_owners)
290
self.assertEqual((bugtask.pillar.owner,), found_owners.sorted)
291
# Add another bugtask with a bug supervisor.
292
target2 = self.factory.makeProduct(bug_supervisor=None)
382
self.assertContentEqual([bugtask.pillar.owner], found_owners)
383
# Add another bugtask for a pillar that uses Launchpad but does not
384
# have a bug supervisor.
385
target2 = self.factory.makeProduct(
386
bug_supervisor=None, official_malone=True)
293
387
bugtask2 = self.factory.makeBugTask(bug=self.bug, target=target2)
295
389
self.getInfo().all_pillar_owners_without_bug_supervisors)
297
set([bugtask.pillar.owner, bugtask2.pillar.owner]),
390
self.assertContentEqual(
391
[bugtask.pillar.owner, bugtask2.pillar.owner],
300
(bugtask.pillar.owner, bugtask2.pillar.owner),
394
def test_all_pillar_owners_without_bug_supervisors_not_using_malone(self):
395
# The set of owners of pillars for which no bug supervisor is
396
# configured and which do not use Launchpad for bug tracking is empty.
397
[bugtask] = self.bug.bugtasks
398
# Clear the supervisor for the first bugtask's target and ensure the
399
# project does not use Launchpad Bugs.
400
with person_logged_in(bugtask.target.owner):
401
bugtask.target.setBugSupervisor(None, bugtask.owner)
402
bugtask.pillar.official_malone = False
404
self.getInfo().all_pillar_owners_without_bug_supervisors)
405
self.assertContentEqual([], found_owners)
407
def test_all_pillar_owners_without_bug_supervisors_for_bugtask(self):
408
# The set of the owner of the chosen bugtask's pillar when no bug
409
# supervisor is configured and which uses Launchpad for bug tracking.
410
[bugtask] = self.bug.bugtasks
411
# Clear the supervisor for the bugtask's target and ensure that the
412
# project uses Launchpad Bugs.
413
with person_logged_in(bugtask.target.owner):
414
bugtask.target.setBugSupervisor(None, bugtask.owner)
415
bugtask.pillar.official_malone = True
416
# Add another bugtask for a pillar that uses Launchpad but does not
417
# have a bug supervisor.
418
target2 = self.factory.makeProduct(
419
bug_supervisor=None, official_malone=True)
420
bugtask2 = self.factory.makeBugTask(bug=self.bug, target=target2)
421
# Getting subscription info for just a specific bugtask will yield
422
# owners for only the pillar associated with that bugtask.
423
info_for_bugtask2 = self.getInfo().forTask(bugtask2)
424
self.assertContentEqual(
425
[bugtask2.pillar.owner],
426
info_for_bugtask2.all_pillar_owners_without_bug_supervisors)
303
428
def _create_also_notified_subscribers(self):
304
429
# Add an assignee, a bug supervisor and a structural subscriber.
445
561
self.assertThat(recorder, condition)
447
def exercise_subscription_set(self, set_name):
563
def exercise_subscription_set(self, set_name, counts=(1, 1, 0)):
564
"""Test the number of queries it takes to inspect a subscription set.
566
:param set_name: The name of the set, e.g. "direct_subscriptions".
567
:param counts: A triple of the expected query counts for each of three
568
operations: get the set, get the set's subscribers, get the set's
569
subscribers in order.
448
571
# Looking up subscriptions takes a single query.
449
with self.exactly_x_queries(1):
572
with self.exactly_x_queries(counts[0]):
450
573
getattr(self.info, set_name)
451
574
# Getting the subscribers results in one additional query.
452
with self.exactly_x_queries(1):
575
with self.exactly_x_queries(counts[1]):
453
576
getattr(self.info, set_name).subscribers
454
577
# Everything is now cached so no more queries are needed.
455
with self.exactly_x_queries(0):
578
with self.exactly_x_queries(counts[2]):
456
579
getattr(self.info, set_name).subscribers
457
580
getattr(self.info, set_name).subscribers.sorted
459
def exercise_subscription_set_sorted_first(self, set_name):
582
def exercise_subscription_set_sorted_first(
583
self, set_name, counts=(1, 1, 0)):
584
"""Test the number of queries it takes to inspect a subscription set.
586
This differs from `exercise_subscription_set` in its second step, when
587
it looks at the sorted subscription list instead of the subscriber
590
:param set_name: The name of the set, e.g. "direct_subscriptions".
591
:param counts: A triple of the expected query counts for each of three
592
operations: get the set, get the set in order, get the set's
593
subscribers in order.
460
595
# Looking up subscriptions takes a single query.
461
with self.exactly_x_queries(1):
596
with self.exactly_x_queries(counts[0]):
462
597
getattr(self.info, set_name)
463
598
# Getting the sorted subscriptions takes one additional query.
464
with self.exactly_x_queries(1):
599
with self.exactly_x_queries(counts[1]):
465
600
getattr(self.info, set_name).sorted
466
601
# Everything is now cached so no more queries are needed.
467
with self.exactly_x_queries(0):
602
with self.exactly_x_queries(counts[2]):
468
603
getattr(self.info, set_name).subscribers
469
604
getattr(self.info, set_name).subscribers.sorted