8
from datetime import datetime
14
from testtools.matchers import (
9
18
from zope.component import getUtility
10
19
from zope.security.proxy import removeSecurityProxy
12
21
from lp.code.tests.helpers import remove_all_sample_data_branches
22
from lp.registry.errors import (
26
from lp.registry.interfaces.karma import IKarmaCacheManager
27
from lp.registry.interfaces.mailinglist import MailingListStatus
13
28
from lp.registry.interfaces.mailinglistsubscription import (
14
29
MailingListAutoSubscribePolicy,
31
from lp.registry.interfaces.nameblacklist import INameBlacklistSet
16
32
from lp.registry.interfaces.person import (
18
34
PersonCreationRationale,
20
from lp.registry.model.person import PersonSet
37
from lp.registry.interfaces.personnotification import IPersonNotificationSet
38
from lp.registry.model.accesspolicy import AccessPolicyGrant
39
from lp.registry.model.person import (
43
from lp.registry.tests.test_person import KarmaTestMixin
44
from lp.services.config import config
45
from lp.services.database.lpstorm import (
21
49
from lp.services.database.sqlbase import cursor
22
50
from lp.services.identity.interfaces.account import (
51
AccountCreationRationale,
24
53
AccountSuspendedError,
55
from lp.services.identity.interfaces.emailaddress import (
56
EmailAddressAlreadyTaken,
61
from lp.services.identity.model.account import Account
62
from lp.services.identity.model.emailaddress import EmailAddress
63
from lp.services.openid.model.openididentifier import OpenIdIdentifier
64
from lp.soyuz.enums import (
26
67
from lp.testing import (
74
StormStatementRecorder,
28
76
TestCaseWithFactory,
79
from lp.testing.dbuser import dbuser
30
80
from lp.testing.layers import DatabaseFunctionalLayer
81
from lp.testing.matchers import HasQueryCount
84
class TestPersonSet(TestCaseWithFactory):
85
"""Test `IPersonSet`."""
86
layer = DatabaseFunctionalLayer
89
super(TestPersonSet, self).setUp()
91
self.addCleanup(logout)
92
self.person_set = getUtility(IPersonSet)
94
def test_isNameBlacklisted(self):
96
"INSERT INTO NameBlacklist(id, regexp) VALUES (-100, 'foo')")
97
self.failUnless(self.person_set.isNameBlacklisted('foo'))
98
self.failIf(self.person_set.isNameBlacklisted('bar'))
100
def test_isNameBlacklisted_user_is_admin(self):
    # A blacklist expression created with an admin team is not reported
    # as blacklisted when the lookup is made for that team's owner.
    admin_team = self.factory.makeTeam()
    blacklist = getUtility(INameBlacklistSet)
    self.admin_exp = blacklist.create(u'fnord', admin=admin_team)
    self.store = IStore(self.admin_exp)
    is_blacklisted = self.person_set.isNameBlacklisted(
        'fnord', admin_team.teamowner)
    self.assertFalse(is_blacklisted)
109
def test_getByEmail_ignores_case_and_whitespace(self):
110
person1_email = 'foo.bar@canonical.com'
111
person1 = self.person_set.getByEmail(person1_email)
114
"PersonSet.getByEmail() could not find %r" % person1_email)
116
person2 = self.person_set.getByEmail(' foo.BAR@canonICAL.com ')
119
"PersonSet.getByEmail() should ignore case and whitespace.")
120
self.assertEqual(person1, person2)
122
def test_getPrecachedPersonsFromIDs(self):
123
# The getPrecachedPersonsFromIDs() method should only make one
124
# query to load all the extraneous data. Accessing the
125
# attributes should then cause zero queries.
127
self.factory.makePerson().id
130
with StormStatementRecorder() as recorder:
131
persons = list(self.person_set.getPrecachedPersonsFromIDs(
132
person_ids, need_karma=True, need_ubuntu_coc=True,
133
need_location=True, need_archive=True,
134
need_preferred_email=True, need_validity=True))
135
self.assertThat(recorder, HasQueryCount(LessThan(2)))
137
with StormStatementRecorder() as recorder:
138
for person in persons:
139
person.is_valid_person
141
person.is_ubuntu_coc_signer
144
person.preferredemail
145
self.assertThat(recorder, HasQueryCount(LessThan(1)))
147
def test_latest_teams_public(self):
148
# Anyone can see the latest 5 teams if they are public.
150
for num in xrange(1, 7):
151
teams.append(self.factory.makeTeam(name='team-%s' % num))
153
result = self.person_set.latest_teams()
154
self.assertEqual(teams[0:5], list(result))
156
def test_latest_teams_private(self):
157
# Private teams are only included in the latest teams if the
158
# user can view the team.
160
for num in xrange(1, 7):
161
teams.append(self.factory.makeTeam(name='team-%s' % num))
162
owner = self.factory.makePerson()
164
self.factory.makeTeam(
165
name='private-team', owner=owner,
166
visibility=PersonVisibility.PRIVATE))
169
result = self.person_set.latest_teams()
170
self.assertEqual(teams[0:5], list(result))
171
login_person(self.factory.makePerson())
172
result = self.person_set.latest_teams()
173
self.assertEqual(teams[1:6], list(result))
175
def test_latest_teams_limit(self):
176
# The limit controls the number of latest teams returned.
178
for num in xrange(1, 7):
179
teams.append(self.factory.makeTeam(name='team-%s' % num))
181
result = self.person_set.latest_teams(limit=3)
182
self.assertEqual(teams[0:3], list(result))
185
class TestPersonSetMergeMailingListSubscriptions(TestCaseWithFactory):
187
layer = DatabaseFunctionalLayer
190
TestCaseWithFactory.setUp(self)
191
# Use the unsecured PersonSet so that private methods can be tested.
192
self.person_set = PersonSet()
193
self.from_person = self.factory.makePerson()
194
self.to_person = self.factory.makePerson()
197
def test__mergeMailingListSubscriptions_no_subscriptions(self):
    # Running the private merge helper for the two persons prepared on
    # this test case leaves the cursor reporting zero affected rows.
    from_id = self.from_person.id
    to_id = self.to_person.id
    self.person_set._mergeMailingListSubscriptions(self.cur, from_id, to_id)
    self.assertEqual(0, self.cur.rowcount)
202
def test__mergeMailingListSubscriptions_with_subscriptions(self):
203
naked_person = removeSecurityProxy(self.from_person)
204
naked_person.mailing_list_auto_subscribe_policy = (
205
MailingListAutoSubscribePolicy.ALWAYS)
206
self.team, self.mailing_list = self.factory.makeTeamAndMailingList(
207
'test-mailinglist', 'team-owner')
208
with person_logged_in(self.team.teamowner):
210
self.from_person, reviewer=self.team.teamowner)
212
self.person_set._mergeMailingListSubscriptions(
213
self.cur, self.from_person.id, self.to_person.id)
214
self.assertEqual(1, self.cur.rowcount)
217
class TestPersonSetMerge(TestCaseWithFactory, KarmaTestMixin):
218
"""Test cases for PersonSet merge."""
220
layer = DatabaseFunctionalLayer
223
super(TestPersonSetMerge, self).setUp()
224
self.person_set = getUtility(IPersonSet)
226
def _do_premerge(self, from_person, to_person):
    """Hand from_person's preferred email over to to_person.

    This stands in for the pre-merge work performed by the LoginToken:
    while logged in as the 'admin' celebrity, the preferred email of
    `from_person` is set to NEW and re-pointed at `to_person` and at
    `to_person`'s account.
    """
    with celebrity_logged_in('admin'):
        preferred = from_person.preferredemail
        preferred.status = EmailAddressStatus.NEW
        preferred.person = to_person
        preferred.account = to_person.account
235
def _do_merge(self, from_person, to_person, reviewer=None):
    """Merge from_person into to_person and return both as a pair.

    The merge is performed as the database user configured for
    IPersonMergeJobSource, which is the user the merge jobs will use.
    """
    job_dbuser = config.IPersonMergeJobSource.dbuser
    with dbuser(job_dbuser):
        self.person_set.merge(from_person, to_person, reviewer=reviewer)
    return from_person, to_person
241
def _get_testable_account(self, person, date_created, openid_identifier):
    """Return person's naked account with predictable attributes.

    The security proxy is stripped from `person.account` so that
    `date_created` and `openid_identifier` can be overwritten with the
    supplied values; the unproxied account is then handed back.
    """
    account = removeSecurityProxy(person.account)
    account.date_created = date_created
    account.openid_identifier = openid_identifier
    # Hand the account back, as the helper's name and summary promise.
    return account
248
def test_delete_no_notifications(self):
    # Deleting a team as the person-merge job database user leaves no
    # person notifications waiting to be sent.
    team = self.factory.makeTeam()
    owner = team.teamowner
    with dbuser(config.IPersonMergeJobSource.dbuser):
        self.person_set.delete(team, owner)
    pending = getUtility(IPersonNotificationSet).getNotificationsToSend()
    self.assertEqual(0, pending.count())
258
def test_openid_identifiers(self):
259
# Verify that OpenId Identifiers are merged.
260
duplicate = self.factory.makePerson()
261
duplicate_identifier = removeSecurityProxy(
262
duplicate.account).openid_identifiers.any().identifier
263
person = self.factory.makePerson()
264
person_identifier = removeSecurityProxy(
265
person.account).openid_identifiers.any().identifier
266
self._do_premerge(duplicate, person)
268
duplicate, person = self._do_merge(duplicate, person)
271
removeSecurityProxy(duplicate.account).openid_identifiers.count())
273
merged_identifiers = [
274
identifier.identifier for identifier in
275
removeSecurityProxy(person.account).openid_identifiers]
277
self.assertIn(duplicate_identifier, merged_identifiers)
278
self.assertIn(person_identifier, merged_identifiers)
280
def test_karmacache_transferred_to_user_has_no_karma(self):
    # After the merge the duplicate has no KarmaCache entries left and
    # its karma total has been transferred to the target person.
    self.cache_manager = getUtility(IKarmaCacheManager)
    product = self.factory.makeProduct()
    duplicate = self.factory.makePerson()
    self._makeKarmaCache(duplicate, product, [('bugs', 10)])
    self._makeKarmaTotalCache(duplicate, 15)
    # Re-fetch the duplicate: the karma changes invalidated the instance.
    duplicate = self.person_set.get(duplicate.id)
    person = self.factory.makePerson()
    self._do_premerge(duplicate, person)
    merged_from, merged_to = self._do_merge(duplicate, person)
    self.assertEqual([], merged_from.karma_category_caches)
    self.assertEqual(0, merged_from.karma)
    self.assertEqual(15, merged_to.karma)
299
def test_karmacache_transferred_to_user_has_karma(self):
    # When both persons already hold karma, the duplicate ends up with no
    # KarmaCache entries and the target's total is the sum of the two.
    self.cache_manager = getUtility(IKarmaCacheManager)
    product = self.factory.makeProduct()
    duplicate = self.factory.makePerson()
    self._makeKarmaCache(duplicate, product, [('bugs', 10)])
    self._makeKarmaTotalCache(duplicate, 15)
    person = self.factory.makePerson()
    self._makeKarmaCache(person, product, [('bugs', 9)])
    self._makeKarmaTotalCache(person, 13)
    # Re-fetch both persons: the karma changes invalidated the instances.
    duplicate = self.person_set.get(duplicate.id)
    person = self.person_set.get(person.id)
    self._do_premerge(duplicate, person)
    merged_from, merged_to = self._do_merge(duplicate, person)
    self.assertEqual([], merged_from.karma_category_caches)
    self.assertEqual(0, merged_from.karma)
    # 15 from the duplicate plus the 13 the target already had.
    self.assertEqual(28, merged_to.karma)
322
def test_person_date_created_preserved(self):
    # The oldest datecreated of the two persons is the one kept on the
    # target after the merge.
    person = self.factory.makePerson()
    duplicate = self.factory.makePerson()
    utc = pytz.timezone('UTC')
    oldest_date = datetime(2005, 11, 25, 0, 0, 0, 0, utc)
    # datecreated is written on the unproxied duplicate.
    naked_duplicate = removeSecurityProxy(duplicate)
    naked_duplicate.datecreated = oldest_date
    self._do_premerge(duplicate, person)
    duplicate, person = self._do_merge(duplicate, person)
    self.assertEqual(oldest_date, person.datecreated)
334
def test_team_with_active_mailing_list_raises_error(self):
335
# A team with an active mailing list cannot be merged.
336
target_team = self.factory.makeTeam()
337
test_team = self.factory.makeTeam()
338
self.factory.makeMailingList(
339
test_team, test_team.teamowner)
341
AssertionError, self.person_set.merge, test_team, target_team)
343
def test_team_with_inactive_mailing_list(self):
344
# A team with an inactive mailing list can be merged.
345
target_team = self.factory.makeTeam()
346
test_team = self.factory.makeTeam()
347
mailing_list = self.factory.makeMailingList(
348
test_team, test_team.teamowner)
349
mailing_list.deactivate()
350
mailing_list.transitionToStatus(MailingListStatus.INACTIVE)
351
test_team, target_team = self._do_merge(
352
test_team, target_team, test_team.teamowner)
353
self.assertEqual(target_team, test_team.merged)
355
MailingListStatus.PURGED, test_team.mailing_list.status)
356
emails = getUtility(IEmailAddressSet).getByPerson(target_team).count()
357
self.assertEqual(0, emails)
359
def test_team_with_purged_mailing_list(self):
    # A team with a purged mailing list can be merged.
    # NOTE(review): the steps visible here only deactivate the list and
    # move it to INACTIVE; confirm whether an explicit purge is expected.
    target_team = self.factory.makeTeam()
    test_team = self.factory.makeTeam()
    owner = test_team.teamowner
    mailing_list = self.factory.makeMailingList(test_team, owner)
    mailing_list.deactivate()
    mailing_list.transitionToStatus(MailingListStatus.INACTIVE)
    merged = self._do_merge(test_team, target_team, test_team.teamowner)
    test_team, target_team = merged
    self.assertEqual(target_team, test_team.merged)
372
def test_team_with_members(self):
    # Members are removed from the team before it is merged, so a former
    # member ends up belonging to no teams at all.
    target_team = self.factory.makeTeam()
    test_team = self.factory.makeTeam()
    former_member = self.factory.makePerson()
    with person_logged_in(test_team.teamowner):
        test_team.addMember(former_member, test_team.teamowner)
    merged = self._do_merge(test_team, target_team, test_team.teamowner)
    test_team, target_team = merged
    self.assertEqual(target_team, test_team.merged)
    remaining_teams = list(former_member.super_teams)
    self.assertEqual([], remaining_teams)
384
def test_team_without_super_teams_is_fine(self):
    # Merging a team that has neither members nor super teams completes
    # without raising.
    test_team = self.factory.makeTeam()
    target_team = self.factory.makeTeam()
    owner = test_team.teamowner
    login_person(owner)
    self._do_merge(test_team, target_team, test_team.teamowner)
392
def test_team_with_super_teams(self):
    # A team that belongs to a super team can still be merged, but the
    # target team does not inherit that membership.
    test_team = self.factory.makeTeam()
    super_team = self.factory.makeTeam()
    target_team = self.factory.makeTeam()
    login_person(test_team.teamowner)
    test_team.join(super_team, test_team.teamowner)
    merged = self._do_merge(test_team, target_team, test_team.teamowner)
    test_team, target_team = merged
    self.assertEqual(target_team, test_team.merged)
    inherited = list(target_team.super_teams)
    self.assertEqual([], inherited)
405
def test_merge_moves_branches(self):
    # After a merge, the single branch owned by the duplicate is found
    # among the target person's branches.
    person = self.factory.makePerson()
    branch = self.factory.makeBranch()
    duplicate = branch.owner
    self._do_premerge(branch.owner, person)
    duplicate, person = self._do_merge(duplicate, person)
    branch_count = person.getBranches().count()
    self.assertEqual(1, branch_count)
417
def test_merge_with_duplicated_branches(self):
    # When both persons own a branch with the same name on the same
    # product, the merge keeps both and renames the duplicate's branch.
    product = self.factory.makeProduct()
    from_branch = self.factory.makeBranch(name='foo', product=product)
    to_branch = self.factory.makeBranch(name='foo', product=product)
    mergee = to_branch.owner
    duplicate = from_branch.owner
    self._do_premerge(duplicate, mergee)
    duplicate, mergee = self._do_merge(duplicate, mergee)
    branch_names = []
    for merged_branch in mergee.getBranches():
        branch_names.append(merged_branch.name)
    self.assertEqual(2, len(branch_names))
    self.assertContentEqual([u'foo', u'foo-1'], branch_names)
432
def test_merge_moves_recipes(self):
    # After a merge, the recipe owned by the duplicate is counted among
    # the target person's recipes.
    person = self.factory.makePerson()
    recipe = self.factory.makeSourcePackageRecipe()
    duplicate = recipe.owner
    # The merge requires the duplicate's PPA to be deleted first.
    with person_logged_in(duplicate):
        ppa = recipe.owner.archive
        ppa.status = ArchiveStatus.DELETED
    self._do_premerge(duplicate, person)
    duplicate, person = self._do_merge(duplicate, person)
    recipe_count = person.recipes.count()
    self.assertEqual(1, recipe_count)
446
def test_merge_with_duplicated_recipes(self):
    # When both persons own a recipe with the same name, the merge keeps
    # both and renames the one coming from the duplicate.
    merge_from = self.factory.makeSourcePackageRecipe(
        name=u'foo', description=u'FROM')
    merge_to = self.factory.makeSourcePackageRecipe(
        name=u'foo', description=u'TO')
    duplicate = merge_from.owner
    mergee = merge_to.owner
    # The merge requires merge_from's PPA to be deleted first.
    with person_logged_in(merge_from.owner):
        ppa = merge_from.owner.archive
        ppa.status = ArchiveStatus.DELETED
    self._do_premerge(merge_from.owner, mergee)
    duplicate, mergee = self._do_merge(duplicate, mergee)
    recipes = mergee.recipes
    self.assertEqual(2, recipes.count())
    self.assertEqual(
        [u'TO', u'FROM'], [recipe.description for recipe in recipes])
    # The second recipe is the renamed one.
    self.assertEqual(u'foo-1', recipes[1].name)
467
def assertSubscriptionMerges(self, target):
    # Given a subscription target, subscribe a duplicate person to it,
    # merge the duplicate into a fresh person, and check that the
    # subscription moved: the merged person has it and the duplicate no
    # longer does.
    duplicate = self.factory.makePerson()
    with person_logged_in(duplicate):
        target.addSubscription(duplicate, duplicate)
    person = self.factory.makePerson()
    self._do_premerge(duplicate, person)
    duplicate, person = self._do_merge(duplicate, person)
    # Identity assertions show the offending object when they fail,
    # whereas assertTrue over an `is` comparison only reports a boolean.
    self.assertIsNot(None, target.getSubscription(person))
    self.assertIs(None, target.getSubscription(duplicate))
483
def assertConflictingSubscriptionDeletes(self, target):
484
# Given a subscription target, we want to make sure that subscriptions
485
# that the duplicate person made that conflict with existing
486
# subscriptions in the merged account are deleted.
487
duplicate = self.factory.makePerson()
488
person = self.factory.makePerson()
489
with person_logged_in(duplicate):
490
target.addSubscription(duplicate, duplicate)
491
with person_logged_in(person):
492
# The description lets us show that we still have the right
493
# subscription later.
494
target.addBugSubscriptionFilter(person, person).description = (
496
self._do_premerge(duplicate, person)
498
duplicate, person = self._do_merge(duplicate, person)
499
# The merged person still has the original subscription, as shown
500
# by the marker name.
502
target.getSubscription(person).bug_filters[0].description,
504
# The conflicting subscription on the duplicate has been deleted.
505
self.assertTrue(target.getSubscription(duplicate) is None)
507
def test_merge_with_product_subscription(self):
    # A product is used as the subscription target; the checks are made
    # by assertSubscriptionMerges.
    product = self.factory.makeProduct()
    self.assertSubscriptionMerges(product)
511
def test_merge_with_conflicting_product_subscription(self):
    # A product is used as the subscription target; the checks are made
    # by assertConflictingSubscriptionDeletes.
    product = self.factory.makeProduct()
    self.assertConflictingSubscriptionDeletes(product)
515
def test_merge_with_project_subscription(self):
    # A project is used as the subscription target; the checks are made
    # by assertSubscriptionMerges.
    project = self.factory.makeProject()
    self.assertSubscriptionMerges(project)
519
def test_merge_with_conflicting_project_subscription(self):
    # A project is used as the subscription target; the checks are made
    # by assertConflictingSubscriptionDeletes.
    project = self.factory.makeProject()
    self.assertConflictingSubscriptionDeletes(project)
523
def test_merge_with_distroseries_subscription(self):
    # A distro series is used as the subscription target; the checks are
    # made by assertSubscriptionMerges.
    distroseries = self.factory.makeDistroSeries()
    self.assertSubscriptionMerges(distroseries)
527
def test_merge_with_conflicting_distroseries_subscription(self):
    # A distro series is used as the subscription target; the checks are
    # made by assertConflictingSubscriptionDeletes.
    distroseries = self.factory.makeDistroSeries()
    self.assertConflictingSubscriptionDeletes(distroseries)
532
def test_merge_with_milestone_subscription(self):
    # A milestone is used as the subscription target; the checks are
    # made by assertSubscriptionMerges.
    milestone = self.factory.makeMilestone()
    self.assertSubscriptionMerges(milestone)
536
def test_merge_with_conflicting_milestone_subscription(self):
    # A milestone is used as the subscription target; the checks are
    # made by assertConflictingSubscriptionDeletes.
    milestone = self.factory.makeMilestone()
    self.assertConflictingSubscriptionDeletes(milestone)
541
def test_merge_with_productseries_subscription(self):
    # A product series is used as the subscription target; the checks
    # are made by assertSubscriptionMerges.
    productseries = self.factory.makeProductSeries()
    self.assertSubscriptionMerges(productseries)
545
def test_merge_with_conflicting_productseries_subscription(self):
    # A product series is used as the subscription target; the checks
    # are made by assertConflictingSubscriptionDeletes.
    productseries = self.factory.makeProductSeries()
    self.assertConflictingSubscriptionDeletes(productseries)
550
def test_merge_with_distribution_subscription(self):
    # A distribution is used as the subscription target; the checks are
    # made by assertSubscriptionMerges.
    distribution = self.factory.makeDistribution()
    self.assertSubscriptionMerges(distribution)
554
def test_merge_with_conflicting_distribution_subscription(self):
    # A distribution is used as the subscription target; the checks are
    # made by assertConflictingSubscriptionDeletes.
    distribution = self.factory.makeDistribution()
    self.assertConflictingSubscriptionDeletes(distribution)
559
def test_merge_with_sourcepackage_subscription(self):
    # A distribution source package is used as the subscription target;
    # the checks are made by assertSubscriptionMerges.
    self.assertSubscriptionMerges(
        self.factory.makeDistributionSourcePackage())
564
def test_merge_with_conflicting_sourcepackage_subscription(self):
    # A distribution source package is used as the subscription target;
    # the checks are made by assertConflictingSubscriptionDeletes.
    self.assertConflictingSubscriptionDeletes(
        self.factory.makeDistributionSourcePackage())
569
def test_merge_accesspolicygrants(self):
    # An AccessPolicyGrant whose grantee is the duplicate names the
    # target person as grantee once the merge is done.
    person = self.factory.makePerson()
    grant = self.factory.makeAccessPolicyGrant()
    self._do_premerge(grant.grantee, person)
    # The merge itself is run while logged in as the target person.
    with person_logged_in(person):
        self._do_merge(grant.grantee, person)
    new_grantee = grant.grantee
    self.assertEqual(person, new_grantee)
578
def test_merge_accesspolicygrants_conflicts(self):
579
# Conflicting AccessPolicyGrants are deleted.
580
policy = self.factory.makeAccessPolicy()
582
person = self.factory.makePerson()
583
person_grantor = self.factory.makePerson()
584
person_grant = self.factory.makeAccessPolicyGrant(
585
grantee=person, grantor=person_grantor, object=policy)
587
duplicate = self.factory.makePerson()
588
duplicate_grantor = self.factory.makePerson()
589
duplicate_grant = self.factory.makeAccessPolicyGrant(
590
grantee=duplicate, grantor=duplicate_grantor, object=policy)
592
self._do_premerge(duplicate, person)
593
with person_logged_in(person):
594
self._do_merge(duplicate, person)
597
self.assertEqual(person, person_grant.grantee)
598
self.assertEqual(person_grantor, person_grant.grantor)
601
IStore(AccessPolicyGrant).get(
602
AccessPolicyGrant, duplicate_grant.id))
604
def test_mergeAsync(self):
    # mergeAsync() creates a new `PersonMergeJob` that records the two
    # persons it was given.
    source = self.factory.makePerson()
    destination = self.factory.makePerson()
    login_person(source)
    job = self.person_set.mergeAsync(source, destination)
    self.assertEqual(source, job.from_person)
    self.assertEqual(destination, job.to_person)
614
class TestPersonSetCreateByOpenId(TestCaseWithFactory):
615
layer = DatabaseFunctionalLayer
618
super(TestPersonSetCreateByOpenId, self).setUp()
619
self.person_set = getUtility(IPersonSet)
620
self.store = IMasterStore(Account)
622
# Generate some valid test data.
623
self.account = self.makeAccount()
624
self.identifier = self.makeOpenIdIdentifier(self.account, u'whatever')
625
self.person = self.makePerson(self.account)
626
self.email = self.makeEmailAddress(
627
email='whatever@example.com', person=self.person)
629
def makeAccount(self):
    """Add a new ACTIVE Account to self.store and return the add() result.

    The account is created with the display name 'Displayname' and an
    UNKNOWN creation rationale.
    """
    account = Account(
        displayname='Displayname',
        creation_rationale=AccountCreationRationale.UNKNOWN,
        status=AccountStatus.ACTIVE)
    return self.store.add(account)
635
def makeOpenIdIdentifier(self, account, identifier):
    """Add an OpenIdIdentifier for `account` to self.store.

    The new record carries `identifier` as its identifier string; the
    result of the store's add() call is returned.
    """
    record = OpenIdIdentifier()
    record.identifier = identifier
    record.account = account
    stored = self.store.add(record)
    return stored
641
def makePerson(self, account):
    """Add a Person linked to `account` to self.store.

    The person is named 'acc<account id>', displayed as 'Displayname'
    and given an UNKNOWN creation rationale; the result of the store's
    add() call is returned.
    """
    person_name = 'acc%d' % account.id
    person = Person(
        name=person_name, account=account,
        displayname='Displayname',
        creation_rationale=PersonCreationRationale.UNKNOWN)
    return self.store.add(person)
647
def makeEmailAddress(self, email, person):
    """Add a PREFERRED EmailAddress for `person` to self.store.

    :param email: the address text to record on the new EmailAddress.
    :param person: the Person the address belongs to; its account is
        linked as well.
    :return: the result of the store's add() call.
    """
    # Pass the address text and the owning person through explicitly:
    # the tests read both `self.email.email` and `self.email.person`.
    return self.store.add(EmailAddress(
        email=email,
        account=person.account,
        person=person,
        status=EmailAddressStatus.PREFERRED))
654
def testAllValid(self):
655
found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
656
self.identifier.identifier, self.email.email, 'Ignored Name',
657
PersonCreationRationale.UNKNOWN, 'No Comment')
658
found = removeSecurityProxy(found)
660
self.assertIs(False, updated)
661
self.assertIs(self.person, found)
662
self.assertIs(self.account, found.account)
663
self.assertIs(self.email, found.preferredemail)
664
self.assertIs(self.email.account, self.account)
665
self.assertIs(self.email.person, self.person)
667
[self.identifier], list(self.account.openid_identifiers))
669
def testEmailAddressCaseInsensitive(self):
670
# As per testAllValid, but the email address used for the lookup
672
found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
673
self.identifier.identifier, self.email.email.upper(),
674
'Ignored Name', PersonCreationRationale.UNKNOWN, 'No Comment')
675
found = removeSecurityProxy(found)
677
self.assertIs(False, updated)
678
self.assertIs(self.person, found)
679
self.assertIs(self.account, found.account)
680
self.assertIs(self.email, found.preferredemail)
681
self.assertIs(self.email.account, self.account)
682
self.assertIs(self.email.person, self.person)
684
[self.identifier], list(self.account.openid_identifiers))
686
def testNewOpenId(self):
687
# Account looked up by email and the new OpenId identifier
688
# attached. We can do this because we trust our OpenId Provider.
689
new_identifier = u'newident'
690
found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
691
new_identifier, self.email.email, 'Ignored Name',
692
PersonCreationRationale.UNKNOWN, 'No Comment')
693
found = removeSecurityProxy(found)
695
self.assertIs(True, updated)
696
self.assertIs(self.person, found)
697
self.assertIs(self.account, found.account)
698
self.assertIs(self.email, found.preferredemail)
699
self.assertIs(self.email.account, self.account)
700
self.assertIs(self.email.person, self.person)
702
# Old OpenId Identifier still attached.
703
self.assertIn(self.identifier, list(self.account.openid_identifiers))
707
identifier.identifier for identifier
708
in self.account.openid_identifiers]
709
self.assertIn(new_identifier, identifiers)
711
def testNewEmailAddress(self):
712
# Account looked up by OpenId identifier and new EmailAddress
713
# attached. We can do this because we trust our OpenId Provider.
714
new_email = u'new_email@example.com'
715
found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
716
self.identifier.identifier, new_email, 'Ignored Name',
717
PersonCreationRationale.UNKNOWN, 'No Comment')
718
found = removeSecurityProxy(found)
720
self.assertIs(True, updated)
721
self.assertIs(self.person, found)
722
self.assertIs(self.account, found.account)
724
[self.identifier], list(self.account.openid_identifiers))
726
# The old email address is still there and correctly linked.
727
self.assertIs(self.email, found.preferredemail)
728
self.assertIs(self.email.account, self.account)
729
self.assertIs(self.email.person, self.person)
731
# The new email address is there too and correctly linked.
732
new_email = self.store.find(EmailAddress, email=new_email).one()
733
self.assertIs(new_email.account, self.account)
734
self.assertIs(new_email.person, self.person)
735
self.assertEqual(EmailAddressStatus.NEW, new_email.status)
737
def testNewAccountAndIdentifier(self):
738
# If neither the OpenId Identifier nor the email address are
739
# found, we create everything.
740
new_email = u'new_email@example.com'
741
new_identifier = u'new_identifier'
742
found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
743
new_identifier, new_email, 'New Name',
744
PersonCreationRationale.UNKNOWN, 'No Comment')
745
found = removeSecurityProxy(found)
747
# We have a new Person
748
self.assertIs(True, updated)
749
self.assertIsNot(None, found)
751
# It is correctly linked to an account, emailaddress and
753
self.assertIs(found, found.preferredemail.person)
754
self.assertIs(found.account, found.preferredemail.account)
756
new_identifier, found.account.openid_identifiers.any().identifier)
758
def testNoPerson(self):
    # If the account is not linked to a Person, one is created. ShipIt
    # users fall into this category the first time they log in.
    self.email.person = None
    self.person.account = None

    result = self.person_set.getOrCreateByOpenIDIdentifier(
        self.identifier.identifier, self.email.email, 'New Name',
        PersonCreationRationale.UNKNOWN, 'No Comment')
    found, updated = result
    found = removeSecurityProxy(found)

    # A new Person was created rather than the existing one reused.
    self.assertIs(True, updated)
    self.assertIsNot(self.person, found)

    # The new Person is linked to an account, email address and
    # identifier.
    preferred = found.preferredemail
    self.assertIs(found, preferred.person)
    self.assertIs(found.account, found.preferredemail.account)
    self.assertIn(self.identifier, list(found.account.openid_identifiers))
780
def testNoAccount(self):
781
# EmailAddress is linked to a Person, but there is no Account.
782
# Convert this stub into something valid.
783
self.email.account = None
784
self.email.status = EmailAddressStatus.NEW
785
self.person.account = None
786
new_identifier = u'new_identifier'
787
found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
788
new_identifier, self.email.email, 'Ignored',
789
PersonCreationRationale.UNKNOWN, 'No Comment')
790
found = removeSecurityProxy(found)
792
self.assertIs(True, updated)
794
self.assertIsNot(None, found.account)
796
new_identifier, found.account.openid_identifiers.any().identifier)
797
self.assertIs(self.email.person, found)
798
self.assertIs(self.email.account, found.account)
799
self.assertEqual(EmailAddressStatus.PREFERRED, self.email.status)
801
def testMovedEmailAddress(self):
    # The EmailAddress and the OpenId Identifier both exist in the
    # database but are linked to different accounts. The identifier has
    # to be relinked to the account owning the email address: a user who
    # can log into the trusted SSO with that address should be able to
    # log into Launchpad with it too. This copes with the SSO migrating
    # email addresses between SSO accounts.
    other_account = self.store.find(Account, displayname='Foo Bar').one()
    self.identifier.account = other_account

    result = self.person_set.getOrCreateByOpenIDIdentifier(
        self.identifier.identifier, self.email.email, 'New Name',
        PersonCreationRationale.UNKNOWN, 'No Comment')
    found, updated = result
    found = removeSecurityProxy(found)

    self.assertIs(True, updated)
    self.assertIs(self.person, found)

    # The identifier now points at the found person's account.
    self.assertIs(found.account, self.identifier.account)
    linked_identifiers = list(found.account.openid_identifiers)
    self.assertIn(self.identifier, linked_identifiers)
824
class TestCreatePersonAndEmail(TestCase):
825
"""Test `IPersonSet`.createPersonAndEmail()."""
826
layer = DatabaseFunctionalLayer
831
self.addCleanup(logout)
832
self.person_set = getUtility(IPersonSet)
834
def test_duplicated_name_not_accepted(self):
835
self.person_set.createPersonAndEmail(
836
'testing@example.com', PersonCreationRationale.UNKNOWN,
839
NameAlreadyTaken, self.person_set.createPersonAndEmail,
840
'testing2@example.com', PersonCreationRationale.UNKNOWN,
843
def test_duplicated_email_not_accepted(self):
844
self.person_set.createPersonAndEmail(
845
'testing@example.com', PersonCreationRationale.UNKNOWN)
847
EmailAddressAlreadyTaken, self.person_set.createPersonAndEmail,
848
'testing@example.com', PersonCreationRationale.UNKNOWN)
850
def test_invalid_email_not_accepted(self):
852
InvalidEmailAddress, self.person_set.createPersonAndEmail,
853
'testing@.com', PersonCreationRationale.UNKNOWN)
855
def test_invalid_name_not_accepted(self):
857
InvalidName, self.person_set.createPersonAndEmail,
858
'testing@example.com', PersonCreationRationale.UNKNOWN,
33
862
class TestPersonSetBranchCounts(TestCaseWithFactory):