736
716
self.assertEqual('(\\u0170-tester)>', displayname)
739
class TestPersonSet(TestCaseWithFactory):
    """Test `IPersonSet`."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        super(TestPersonSet, self).setUp()
        # NOTE(review): the anonymous login is inferred from the logout
        # cleanup registered right after it -- confirm against the module.
        login(ANONYMOUS)
        self.addCleanup(logout)
        self.person_set = getUtility(IPersonSet)

    def test_isNameBlacklisted(self):
        # A name matching a NameBlacklist regexp is reported as blacklisted;
        # a name matching no row is not.
        # NOTE(review): `cursor()` is assumed to be the raw SQL helper
        # already imported by this module -- confirm.
        cursor().execute(
            "INSERT INTO NameBlacklist(id, regexp) VALUES (-100, 'foo')")
        self.assertTrue(self.person_set.isNameBlacklisted('foo'))
        self.assertFalse(self.person_set.isNameBlacklisted('bar'))

    def test_isNameBlacklisted_user_is_admin(self):
        # The owner of the team given as the blacklist entry's admin is
        # not blocked by that entry.
        team = self.factory.makeTeam()
        name_blacklist_set = getUtility(INameBlacklistSet)
        self.admin_exp = name_blacklist_set.create(u'fnord', admin=team)
        self.store = IStore(self.admin_exp)
        # NOTE(review): presumably the new entry is flushed here so the
        # lookup below can see it -- confirm.
        self.store.flush()
        user = team.teamowner
        self.assertFalse(self.person_set.isNameBlacklisted('fnord', user))

    def test_getByEmail_ignores_case_and_whitespace(self):
        # Looking a person up by email must give the same person whatever
        # the case of, or the whitespace around, the address.
        # NOTE(review): the address presumably comes from the sample data.
        person1_email = 'foo.bar@canonical.com'
        person1 = self.person_set.getByEmail(person1_email)
        self.assertIsNot(
            None, person1,
            "PersonSet.getByEmail() could not find %r" % person1_email)

        person2 = self.person_set.getByEmail('  foo.BAR@canonICAL.com  ')
        self.assertIsNot(
            None, person2,
            "PersonSet.getByEmail() should ignore case and whitespace.")
        self.assertEqual(person1, person2)

    def test_getPrecachedPersonsFromIDs(self):
        # The getPrecachedPersonsFromIDs() method should only make one
        # query to load all the extraneous data. Accessing the
        # attributes should then cause zero queries.
        # NOTE(review): the number of people created is assumed; the
        # query-count assertions do not depend on it.
        person_ids = [
            self.factory.makePerson().id
            for i in range(3)]

        with StormStatementRecorder() as recorder:
            persons = list(self.person_set.getPrecachedPersonsFromIDs(
                person_ids, need_karma=True, need_ubuntu_coc=True,
                need_location=True, need_archive=True,
                need_preferred_email=True, need_validity=True))
        self.assertThat(recorder, HasQueryCount(LessThan(2)))

        with StormStatementRecorder() as recorder:
            for person in persons:
                # Touch one attribute per `need_*` flag requested above;
                # the values themselves are irrelevant.
                person.is_valid_person
                person.karma
                person.is_ubuntu_coc_signer
                person.location
                person.archive
                person.preferredemail
        self.assertThat(recorder, HasQueryCount(LessThan(1)))

    def test_latest_teams_public(self):
        # Anyone can see the latest 5 teams if they are public.
        teams = []
        for num in xrange(1, 7):
            teams.append(self.factory.makeTeam(name='team-%s' % num))
        # Newest first, which is the order the assertion below expects.
        teams.reverse()
        result = self.person_set.latest_teams()
        self.assertEqual(teams[0:5], list(result))

    def test_latest_teams_private(self):
        # Private teams are only included in the latest teams if the
        # user can view the team.
        teams = []
        for num in xrange(1, 7):
            teams.append(self.factory.makeTeam(name='team-%s' % num))
        owner = self.factory.makePerson()
        teams.append(
            self.factory.makeTeam(
                name='private-team', owner=owner,
                visibility=PersonVisibility.PRIVATE))
        teams.reverse()
        # The owner sees the private team at the head of the list...
        login_person(owner)
        result = self.person_set.latest_teams()
        self.assertEqual(teams[0:5], list(result))
        # ...while an unrelated person only sees the public ones.
        login_person(self.factory.makePerson())
        result = self.person_set.latest_teams()
        self.assertEqual(teams[1:6], list(result))

    def test_latest_teams_limit(self):
        # The limit controls the number of latest teams returned.
        teams = []
        for num in xrange(1, 7):
            teams.append(self.factory.makeTeam(name='team-%s' % num))
        teams.reverse()
        result = self.person_set.latest_teams(limit=3)
        self.assertEqual(teams[0:3], list(result))
840
class KarmaTestMixin:
    """Helper methods for setting karma."""

    def _makeKarmaCache(self, person, product, category_name_values):
        """Create a KarmaCache entry with the given arguments.

        In order to create the KarmaCache record we must switch to the DB
        user 'karma'. This invalidates the objects under test so they
        must be retrieved again.

        The test case using this mixin must set `self.cache_manager`
        before calling this method.

        :param person: The person the karma belongs to.
        :param product: The product the karma was earned on.
        :param category_name_values: An iterable of
            (category name, karma value) pairs.
        """
        with dbuser('karma'):
            total = 0
            # Insert category total for person and project.
            for category_name, value in category_name_values:
                category = KarmaCategory.byName(category_name)
                self.cache_manager.new(
                    value, person.id, category.id, product_id=product.id)
                total += value
            # Insert total cache for person and project.
            self.cache_manager.new(
                total, person.id, None, product_id=product.id)

    def _makeKarmaTotalCache(self, person, total):
        """Create a KarmaTotalCache entry.

        In order to create the KarmaTotalCache record we must switch to the DB
        user 'karma'. This invalidates the objects under test so they
        must be retrieved again.

        :param person: The person whose total karma is recorded.
        :param total: The karma total to store.
        """
        with dbuser('karma'):
            KarmaTotalCache(person=person.id, karma_total=total)
873
class TestPersonSetMerge(TestCaseWithFactory, KarmaTestMixin):
    """Test cases for PersonSet merge."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        super(TestPersonSetMerge, self).setUp()
        self.person_set = getUtility(IPersonSet)

    def _do_premerge(self, from_person, to_person):
        # Do the pre merge work performed by the LoginToken.
        with celebrity_logged_in('admin'):
            email = from_person.preferredemail
            email.status = EmailAddressStatus.NEW
            email.person = to_person
            email.account = to_person.account
        # NOTE(review): presumably the changes are committed here so the
        # merge, which runs as another DB user, can see them -- confirm.
        transaction.commit()

    def _do_merge(self, from_person, to_person, reviewer=None):
        # Perform the merge as the db user that will be used by the jobs.
        with dbuser(config.IPersonMergeJobSource.dbuser):
            self.person_set.merge(from_person, to_person, reviewer=reviewer)
        return from_person, to_person

    def _get_testable_account(self, person, date_created, openid_identifier):
        # Return a naked account with predictable attributes.
        account = removeSecurityProxy(person.account)
        account.date_created = date_created
        account.openid_identifier = openid_identifier
        return account

    def test_delete_no_notifications(self):
        # Deleting a team leaves no person notifications waiting to be sent.
        team = self.factory.makeTeam()
        owner = team.teamowner
        with dbuser(config.IPersonMergeJobSource.dbuser):
            self.person_set.delete(team, owner)
        notification_set = getUtility(IPersonNotificationSet)
        notifications = notification_set.getNotificationsToSend()
        self.assertEqual(0, notifications.count())

    def test_openid_identifiers(self):
        # Verify that OpenId Identifiers are merged.
        duplicate = self.factory.makePerson()
        duplicate_identifier = removeSecurityProxy(
            duplicate.account).openid_identifiers.any().identifier
        person = self.factory.makePerson()
        person_identifier = removeSecurityProxy(
            person.account).openid_identifiers.any().identifier
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        self.assertEqual(
            0,
            removeSecurityProxy(duplicate.account).openid_identifiers.count())

        merged_identifiers = [
            identifier.identifier for identifier in
                removeSecurityProxy(person.account).openid_identifiers]

        self.assertIn(duplicate_identifier, merged_identifiers)
        self.assertIn(person_identifier, merged_identifiers)

    def test_karmacache_transferred_to_user_has_no_karma(self):
        # Verify that the merged user has no KarmaCache entries,
        # and the karma total was transferred.
        self.cache_manager = getUtility(IKarmaCacheManager)
        product = self.factory.makeProduct()
        duplicate = self.factory.makePerson()
        self._makeKarmaCache(
            duplicate, product, [('bugs', 10)])
        self._makeKarmaTotalCache(duplicate, 15)
        # The karma changes invalidated duplicate instance.
        duplicate = self.person_set.get(duplicate.id)
        person = self.factory.makePerson()
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        self.assertEqual([], duplicate.karma_category_caches)
        self.assertEqual(0, duplicate.karma)
        self.assertEqual(15, person.karma)

    def test_karmacache_transferred_to_user_has_karma(self):
        # Verify that the merged user has no KarmaCache entries,
        # and the karma total was summed.
        self.cache_manager = getUtility(IKarmaCacheManager)
        product = self.factory.makeProduct()
        duplicate = self.factory.makePerson()
        self._makeKarmaCache(
            duplicate, product, [('bugs', 10)])
        self._makeKarmaTotalCache(duplicate, 15)
        person = self.factory.makePerson()
        self._makeKarmaCache(
            person, product, [('bugs', 9)])
        self._makeKarmaTotalCache(person, 13)
        # The karma changes invalidated duplicate and person instances.
        duplicate = self.person_set.get(duplicate.id)
        person = self.person_set.get(person.id)
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        self.assertEqual([], duplicate.karma_category_caches)
        self.assertEqual(0, duplicate.karma)
        self.assertEqual(28, person.karma)

    def test_person_date_created_preserved(self):
        # Verify that the oldest datecreated is merged.
        person = self.factory.makePerson()
        duplicate = self.factory.makePerson()
        oldest_date = datetime(
            2005, 11, 25, 0, 0, 0, 0, pytz.timezone('UTC'))
        removeSecurityProxy(duplicate).datecreated = oldest_date
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        self.assertEqual(oldest_date, person.datecreated)

    def test_team_with_active_mailing_list_raises_error(self):
        # A team with an active mailing list cannot be merged.
        target_team = self.factory.makeTeam()
        test_team = self.factory.makeTeam()
        self.factory.makeMailingList(
            test_team, test_team.teamowner)
        self.assertRaises(
            AssertionError, self.person_set.merge, test_team, target_team)

    def test_team_with_inactive_mailing_list(self):
        # A team with an inactive mailing list can be merged.
        target_team = self.factory.makeTeam()
        test_team = self.factory.makeTeam()
        mailing_list = self.factory.makeMailingList(
            test_team, test_team.teamowner)
        mailing_list.deactivate()
        mailing_list.transitionToStatus(MailingListStatus.INACTIVE)
        test_team, target_team = self._do_merge(
            test_team, target_team, test_team.teamowner)
        self.assertEqual(target_team, test_team.merged)
        self.assertEqual(
            MailingListStatus.PURGED, test_team.mailing_list.status)
        emails = getUtility(IEmailAddressSet).getByPerson(target_team).count()
        self.assertEqual(0, emails)

    def test_team_with_purged_mailing_list(self):
        # A team with a purged mailing list can be merged.
        target_team = self.factory.makeTeam()
        test_team = self.factory.makeTeam()
        mailing_list = self.factory.makeMailingList(
            test_team, test_team.teamowner)
        mailing_list.deactivate()
        mailing_list.transitionToStatus(MailingListStatus.INACTIVE)
        mailing_list.purge()
        test_team, target_team = self._do_merge(
            test_team, target_team, test_team.teamowner)
        self.assertEqual(target_team, test_team.merged)

    def test_team_with_members(self):
        # Team members are removed before merging.
        target_team = self.factory.makeTeam()
        test_team = self.factory.makeTeam()
        former_member = self.factory.makePerson()
        with person_logged_in(test_team.teamowner):
            test_team.addMember(former_member, test_team.teamowner)
        test_team, target_team = self._do_merge(
            test_team, target_team, test_team.teamowner)
        self.assertEqual(target_team, test_team.merged)
        self.assertEqual([], list(former_member.super_teams))

    def test_team_without_super_teams_is_fine(self):
        # A team with no members and no super teams
        # merges without errors.
        test_team = self.factory.makeTeam()
        target_team = self.factory.makeTeam()
        login_person(test_team.teamowner)
        self._do_merge(test_team, target_team, test_team.teamowner)

    def test_team_with_super_teams(self):
        # A team with superteams can be merged, but the memberships
        # are not transferred.
        test_team = self.factory.makeTeam()
        super_team = self.factory.makeTeam()
        target_team = self.factory.makeTeam()
        login_person(test_team.teamowner)
        test_team.join(super_team, test_team.teamowner)
        test_team, target_team = self._do_merge(
            test_team, target_team, test_team.teamowner)
        self.assertEqual(target_team, test_team.merged)
        self.assertEqual([], list(target_team.super_teams))

    def test_merge_moves_branches(self):
        # When person/teams are merged, branches owned by the from person
        # are moved.
        person = self.factory.makePerson()
        branch = self.factory.makeBranch()
        duplicate = branch.owner
        self._do_premerge(branch.owner, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        branches = person.getBranches()
        self.assertEqual(1, branches.count())

    def test_merge_with_duplicated_branches(self):
        # If both the from and to people have branches with the same name,
        # merging renames the duplicate from the from person's side.
        product = self.factory.makeProduct()
        from_branch = self.factory.makeBranch(name='foo', product=product)
        to_branch = self.factory.makeBranch(name='foo', product=product)
        mergee = to_branch.owner
        duplicate = from_branch.owner
        self._do_premerge(duplicate, mergee)
        login_person(mergee)
        duplicate, mergee = self._do_merge(duplicate, mergee)
        branches = [b.name for b in mergee.getBranches()]
        self.assertEqual(2, len(branches))
        self.assertContentEqual([u'foo', u'foo-1'], branches)

    def test_merge_moves_recipes(self):
        # When person/teams are merged, recipes owned by the from person are
        # moved.
        person = self.factory.makePerson()
        recipe = self.factory.makeSourcePackageRecipe()
        duplicate = recipe.owner
        # Delete the PPA, which is required for the merge to work.
        with person_logged_in(duplicate):
            recipe.owner.archive.status = ArchiveStatus.DELETED
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        self.assertEqual(1, person.recipes.count())

    def test_merge_with_duplicated_recipes(self):
        # If both the from and to people have recipes with the same name,
        # merging renames the duplicate from the from person's side.
        merge_from = self.factory.makeSourcePackageRecipe(
            name=u'foo', description=u'FROM')
        merge_to = self.factory.makeSourcePackageRecipe(
            name=u'foo', description=u'TO')
        duplicate = merge_from.owner
        mergee = merge_to.owner
        # Delete merge_from's PPA, which is required for the merge to work.
        with person_logged_in(merge_from.owner):
            merge_from.owner.archive.status = ArchiveStatus.DELETED
        self._do_premerge(merge_from.owner, mergee)
        login_person(mergee)
        duplicate, mergee = self._do_merge(duplicate, mergee)
        recipes = mergee.recipes
        self.assertEqual(2, recipes.count())
        descriptions = [r.description for r in recipes]
        self.assertEqual([u'TO', u'FROM'], descriptions)
        self.assertEqual(u'foo-1', recipes[1].name)

    def assertSubscriptionMerges(self, target):
        # Given a subscription target, we want to make sure that subscriptions
        # that the duplicate person made are carried over to the merged
        # person.
        duplicate = self.factory.makePerson()
        with person_logged_in(duplicate):
            target.addSubscription(duplicate, duplicate)
        person = self.factory.makePerson()
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        # The merged person has the subscription, and the duplicate person
        # does not.
        self.assertIsNot(None, target.getSubscription(person))
        self.assertIs(None, target.getSubscription(duplicate))

    def assertConflictingSubscriptionDeletes(self, target):
        # Given a subscription target, we want to make sure that subscriptions
        # that the duplicate person made that conflict with existing
        # subscriptions in the merged account are deleted.
        # NOTE(review): the exact marker text is assumed; it is only ever
        # compared with itself, so any value proves the same thing.
        marker = u'a marker'
        duplicate = self.factory.makePerson()
        person = self.factory.makePerson()
        with person_logged_in(duplicate):
            target.addSubscription(duplicate, duplicate)
        with person_logged_in(person):
            # The description lets us show that we still have the right
            # subscription later.
            target.addBugSubscriptionFilter(person, person).description = (
                marker)
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        # The merged person still has the original subscription, as shown
        # by the marker name.
        self.assertEqual(
            target.getSubscription(person).bug_filters[0].description,
            marker)
        # The conflicting subscription on the duplicate has been deleted.
        self.assertIs(None, target.getSubscription(duplicate))

    def test_merge_with_product_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeProduct())

    def test_merge_with_conflicting_product_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(self.factory.makeProduct())

    def test_merge_with_project_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeProject())

    def test_merge_with_conflicting_project_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(self.factory.makeProject())

    def test_merge_with_distroseries_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeDistroSeries())

    def test_merge_with_conflicting_distroseries_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(
            self.factory.makeDistroSeries())

    def test_merge_with_milestone_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeMilestone())

    def test_merge_with_conflicting_milestone_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(
            self.factory.makeMilestone())

    def test_merge_with_productseries_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeProductSeries())

    def test_merge_with_conflicting_productseries_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(
            self.factory.makeProductSeries())

    def test_merge_with_distribution_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeDistribution())

    def test_merge_with_conflicting_distribution_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(
            self.factory.makeDistribution())

    def test_merge_with_sourcepackage_subscription(self):
        # See comments in assertSubscriptionMerges.
        dsp = self.factory.makeDistributionSourcePackage()
        self.assertSubscriptionMerges(dsp)

    def test_merge_with_conflicting_sourcepackage_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        dsp = self.factory.makeDistributionSourcePackage()
        self.assertConflictingSubscriptionDeletes(dsp)

    def test_merge_accesspolicygrants(self):
        # AccessPolicyGrants are transferred from the duplicate.
        person = self.factory.makePerson()
        grant = self.factory.makeAccessPolicyGrant()
        self._do_premerge(grant.grantee, person)
        with person_logged_in(person):
            self._do_merge(grant.grantee, person)
        self.assertEqual(person, grant.grantee)

    def test_merge_accesspolicygrants_conflicts(self):
        # Conflicting AccessPolicyGrants are deleted.
        policy = self.factory.makeAccessPolicy()

        person = self.factory.makePerson()
        person_grantor = self.factory.makePerson()
        person_grant = self.factory.makeAccessPolicyGrant(
            grantee=person, grantor=person_grantor, object=policy)

        duplicate = self.factory.makePerson()
        duplicate_grantor = self.factory.makePerson()
        duplicate_grant = self.factory.makeAccessPolicyGrant(
            grantee=duplicate, grantor=duplicate_grantor, object=policy)

        self._do_premerge(duplicate, person)
        with person_logged_in(person):
            self._do_merge(duplicate, person)
        transaction.commit()

        # The surviving person's own grant is untouched...
        self.assertEqual(person, person_grant.grantee)
        self.assertEqual(person_grantor, person_grant.grantor)
        # ...and the duplicate's grant on the same policy is gone.
        self.assertIs(
            None,
            IStore(AccessPolicyGrant).get(
                AccessPolicyGrant, duplicate_grant.id))

    def test_mergeAsync(self):
        # mergeAsync() creates a new `PersonMergeJob`.
        from_person = self.factory.makePerson()
        to_person = self.factory.makePerson()
        login_person(from_person)
        job = self.person_set.mergeAsync(from_person, to_person)
        self.assertEqual(from_person, job.from_person)
        self.assertEqual(to_person, job.to_person)
1270
class TestPersonSetCreateByOpenId(TestCaseWithFactory):
    """Test `IPersonSet`.getOrCreateByOpenIDIdentifier()."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        super(TestPersonSetCreateByOpenId, self).setUp()
        self.person_set = getUtility(IPersonSet)
        self.store = IMasterStore(Account)

        # Generate some valid test data.
        self.account = self.makeAccount()
        self.identifier = self.makeOpenIdIdentifier(self.account, u'whatever')
        self.person = self.makePerson(self.account)
        self.email = self.makeEmailAddress(
            email='whatever@example.com', person=self.person)

    def makeAccount(self):
        # Add an active account to the store and return it.
        return self.store.add(Account(
            displayname='Displayname',
            creation_rationale=AccountCreationRationale.UNKNOWN,
            status=AccountStatus.ACTIVE))

    def makeOpenIdIdentifier(self, account, identifier):
        # Add an OpenId identifier linked to `account` and return it.
        openid_identifier = OpenIdIdentifier()
        openid_identifier.identifier = identifier
        openid_identifier.account = account
        return self.store.add(openid_identifier)

    def makePerson(self, account):
        # Add a person linked to `account`, named after the account id.
        return self.store.add(Person(
            name='acc%d' % account.id, account=account,
            displayname='Displayname',
            creation_rationale=PersonCreationRationale.UNKNOWN))

    def makeEmailAddress(self, email, person):
        # Add a preferred email address linked to `person` and its account.
        return self.store.add(EmailAddress(
                email=email,
                account=person.account,
                person=person,
                status=EmailAddressStatus.PREFERRED))

    def testAllValid(self):
        # With a known identifier and a known email address, the existing
        # person is returned and nothing is updated.
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
            self.identifier.identifier, self.email.email, 'Ignored Name',
            PersonCreationRationale.UNKNOWN, 'No Comment')
        found = removeSecurityProxy(found)

        self.assertIs(False, updated)
        self.assertIs(self.person, found)
        self.assertIs(self.account, found.account)
        self.assertIs(self.email, found.preferredemail)
        self.assertIs(self.email.account, self.account)
        self.assertIs(self.email.person, self.person)
        self.assertEqual(
            [self.identifier], list(self.account.openid_identifiers))

    def testEmailAddressCaseInsensitive(self):
        # As per testAllValid, but the email address used for the lookup
        # is all upper case.
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
            self.identifier.identifier, self.email.email.upper(),
            'Ignored Name', PersonCreationRationale.UNKNOWN, 'No Comment')
        found = removeSecurityProxy(found)

        self.assertIs(False, updated)
        self.assertIs(self.person, found)
        self.assertIs(self.account, found.account)
        self.assertIs(self.email, found.preferredemail)
        self.assertIs(self.email.account, self.account)
        self.assertIs(self.email.person, self.person)
        self.assertEqual(
            [self.identifier], list(self.account.openid_identifiers))

    def testNewOpenId(self):
        # Account looked up by email and the new OpenId identifier
        # attached. We can do this because we trust our OpenId Provider.
        new_identifier = u'newident'
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
            new_identifier, self.email.email, 'Ignored Name',
            PersonCreationRationale.UNKNOWN, 'No Comment')
        found = removeSecurityProxy(found)

        self.assertIs(True, updated)
        self.assertIs(self.person, found)
        self.assertIs(self.account, found.account)
        self.assertIs(self.email, found.preferredemail)
        self.assertIs(self.email.account, self.account)
        self.assertIs(self.email.person, self.person)

        # Old OpenId Identifier still attached.
        self.assertIn(self.identifier, list(self.account.openid_identifiers))

        # So is our new one.
        identifiers = [
            identifier.identifier for identifier
                in self.account.openid_identifiers]
        self.assertIn(new_identifier, identifiers)

    def testNewEmailAddress(self):
        # Account looked up by OpenId identifier and new EmailAddress
        # attached. We can do this because we trust our OpenId Provider.
        new_email = u'new_email@example.com'
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
            self.identifier.identifier, new_email, 'Ignored Name',
            PersonCreationRationale.UNKNOWN, 'No Comment')
        found = removeSecurityProxy(found)

        self.assertIs(True, updated)
        self.assertIs(self.person, found)
        self.assertIs(self.account, found.account)
        self.assertEqual(
            [self.identifier], list(self.account.openid_identifiers))

        # The old email address is still there and correctly linked.
        self.assertIs(self.email, found.preferredemail)
        self.assertIs(self.email.account, self.account)
        self.assertIs(self.email.person, self.person)

        # The new email address is there too and correctly linked.
        new_email = self.store.find(EmailAddress, email=new_email).one()
        self.assertIs(new_email.account, self.account)
        self.assertIs(new_email.person, self.person)
        self.assertEqual(EmailAddressStatus.NEW, new_email.status)

    def testNewAccountAndIdentifier(self):
        # If neither the OpenId Identifier nor the email address are
        # found, we create everything.
        new_email = u'new_email@example.com'
        new_identifier = u'new_identifier'
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
            new_identifier, new_email, 'New Name',
            PersonCreationRationale.UNKNOWN, 'No Comment')
        found = removeSecurityProxy(found)

        # We have a new Person
        self.assertIs(True, updated)
        self.assertIsNot(None, found)

        # It is correctly linked to an account, emailaddress and
        # identifier.
        self.assertIs(found, found.preferredemail.person)
        self.assertIs(found.account, found.preferredemail.account)
        self.assertEqual(
            new_identifier, found.account.openid_identifiers.any().identifier)

    def testNoPerson(self):
        # If the account is not linked to a Person, create one. ShipIt
        # users fall into this category the first time they log into
        # Launchpad.
        self.email.person = None
        self.person.account = None

        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
            self.identifier.identifier, self.email.email, 'New Name',
            PersonCreationRationale.UNKNOWN, 'No Comment')
        found = removeSecurityProxy(found)

        # We have a new Person
        self.assertIs(True, updated)
        self.assertIsNot(self.person, found)

        # It is correctly linked to an account, emailaddress and
        # identifier.
        self.assertIs(found, found.preferredemail.person)
        self.assertIs(found.account, found.preferredemail.account)
        self.assertIn(self.identifier, list(found.account.openid_identifiers))

    def testNoAccount(self):
        # EmailAddress is linked to a Person, but there is no Account.
        # Convert this stub into something valid.
        self.email.account = None
        self.email.status = EmailAddressStatus.NEW
        self.person.account = None
        new_identifier = u'new_identifier'
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
            new_identifier, self.email.email, 'Ignored',
            PersonCreationRationale.UNKNOWN, 'No Comment')
        found = removeSecurityProxy(found)

        self.assertIs(True, updated)

        self.assertIsNot(None, found.account)
        self.assertEqual(
            new_identifier, found.account.openid_identifiers.any().identifier)
        self.assertIs(self.email.person, found)
        self.assertIs(self.email.account, found.account)
        self.assertEqual(EmailAddressStatus.PREFERRED, self.email.status)

    def testMovedEmailAddress(self):
        # The EmailAddress and OpenId Identifier are both in the
        # database, but they are not linked to the same account. The
        # identifier needs to be relinked to the correct account - the
        # user able to log into the trusted SSO with that email address
        # should be able to log into Launchpad with that email address.
        # This lets us cope with the SSO migrating email addresses
        # between SSO accounts.
        # NOTE(review): the 'Foo Bar' account is presumably sample data.
        self.identifier.account = self.store.find(
            Account, displayname='Foo Bar').one()

        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
            self.identifier.identifier, self.email.email, 'New Name',
            PersonCreationRationale.UNKNOWN, 'No Comment')
        found = removeSecurityProxy(found)

        self.assertIs(True, updated)
        self.assertIs(self.person, found)

        self.assertIs(found.account, self.identifier.account)
        self.assertIn(self.identifier, list(found.account.openid_identifiers))
1480
class TestCreatePersonAndEmail(TestCase):
    """Test `IPersonSet`.createPersonAndEmail()."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        TestCase.setUp(self)
        # NOTE(review): the anonymous login is inferred from the logout
        # cleanup registered right after it -- confirm against the module.
        login(ANONYMOUS)
        self.addCleanup(logout)
        self.person_set = getUtility(IPersonSet)

    def test_duplicated_name_not_accepted(self):
        # Creating a second person with a name that is already in use
        # raises NameAlreadyTaken.
        # NOTE(review): both calls end in one further argument that is not
        # visible; a shared `name=` value is assumed -- confirm.
        self.person_set.createPersonAndEmail(
            'testing@example.com', PersonCreationRationale.UNKNOWN,
            name='zzzz')
        self.assertRaises(
            NameAlreadyTaken, self.person_set.createPersonAndEmail,
            'testing2@example.com', PersonCreationRationale.UNKNOWN,
            name='zzzz')

    def test_duplicated_email_not_accepted(self):
        # Creating a second person with an email address that is already
        # in use raises EmailAddressAlreadyTaken.
        self.person_set.createPersonAndEmail(
            'testing@example.com', PersonCreationRationale.UNKNOWN)
        self.assertRaises(
            EmailAddressAlreadyTaken, self.person_set.createPersonAndEmail,
            'testing@example.com', PersonCreationRationale.UNKNOWN)

    def test_invalid_email_not_accepted(self):
        # A malformed email address raises InvalidEmailAddress.
        self.assertRaises(
            InvalidEmailAddress, self.person_set.createPersonAndEmail,
            'testing@.com', PersonCreationRationale.UNKNOWN)

    def test_invalid_name_not_accepted(self):
        # An invalid person name raises InvalidName.
        # NOTE(review): the final argument is not visible; an invalid
        # `name=` value is assumed -- confirm.
        self.assertRaises(
            InvalidName, self.person_set.createPersonAndEmail,
            'testing@example.com', PersonCreationRationale.UNKNOWN,
            name='/john')
1518
719
class TestPersonRelatedBugTaskSearch(TestCaseWithFactory):
1520
721
layer = DatabaseFunctionalLayer