736
716
self.assertEqual('(\\u0170-tester)>', displayname)
739
class TestPersonSet(TestCaseWithFactory):
    """Test `IPersonSet`."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        # Restored method header: the statements below use `self` and call
        # the parent setUp(), so they can only live inside setUp().
        super(TestPersonSet, self).setUp()
        # NOTE(review): logout is registered for cleanup but no login call
        # is visible in this view -- confirm whether one belongs here.
        self.addCleanup(logout)
        # The utility under test, shared by every test method.
        self.person_set = getUtility(IPersonSet)
749
def test_isNameBlacklisted(self):
# Checks isNameBlacklisted() against a NameBlacklist row whose regexp is
# 'foo': the name 'foo' must be reported, the name 'bar' must not.
# NOTE(review): the line that opens the call receiving the SQL string
# below is not visible in this view; the statement starts mid-call and
# must be restored before this test can run.
751
"INSERT INTO NameBlacklist(id, regexp) VALUES (-100, 'foo')")
# 'foo' matches the inserted regexp.
752
self.failUnless(self.person_set.isNameBlacklisted('foo'))
# 'bar' does not match it.
753
self.failIf(self.person_set.isNameBlacklisted('bar'))
755
def test_isNameBlacklisted_user_is_admin(self):
    # A blacklist expression created with an admin team is not reported
    # as blacklisted when the check is made for that team's owner.
    admin_team = self.factory.makeTeam()
    blacklist_set = getUtility(INameBlacklistSet)
    self.admin_exp = blacklist_set.create(u'fnord', admin=admin_team)
    self.store = IStore(self.admin_exp)
    team_owner = admin_team.teamowner
    is_blacklisted = self.person_set.isNameBlacklisted('fnord', team_owner)
    self.assertFalse(is_blacklisted)
764
def test_getByEmail_ignores_case_and_whitespace(self):
# Looks the same person up twice through getByEmail(): once with the
# plain address and once with mixed case plus surrounding whitespace,
# then checks both lookups return the same person.
765
person1_email = 'foo.bar@canonical.com'
766
person1 = self.person_set.getByEmail(person1_email)
# NOTE(review): the opening of the assertion that owns the message below
# is not visible in this view; only its trailing message argument remains.
769
"PersonSet.getByEmail() could not find %r" % person1_email)
# Same address, different case and padded with spaces.
771
person2 = self.person_set.getByEmail(' foo.BAR@canonICAL.com ')
# NOTE(review): as above, the assertion opener for this message is not
# visible in this view.
774
"PersonSet.getByEmail() should ignore case and whitespace.")
775
self.assertEqual(person1, person2)
777
def test_getPrecachedPersonsFromIDs(self):
778
# The getPrecachedPersonsFromIDs() method should only make one
779
# query to load all the extraneous data. Accessing the
780
# attributes should then cause zero queries.
# NOTE(review): `person_ids` is read further down but its assignment is
# not visible in this view; the expression on the next code line looks
# like the element of a construct whose opening and closing lines are
# missing -- restore them before running.
782
self.factory.makePerson().id
# First recorder: counts the statements issued while loading the persons
# with every need_* option switched on.
785
with StormStatementRecorder() as recorder:
786
persons = list(self.person_set.getPrecachedPersonsFromIDs(
787
person_ids, need_karma=True, need_ubuntu_coc=True,
788
need_location=True, need_archive=True,
789
need_preferred_email=True, need_validity=True))
# LessThan(2) allows at most one statement.
790
self.assertThat(recorder, HasQueryCount(LessThan(2)))
# Second recorder: touching the attributes below must not issue statements.
# NOTE(review): gaps in the embedded numbering suggest further attribute
# accesses inside this loop are not visible in this view.
792
with StormStatementRecorder() as recorder:
793
for person in persons:
794
person.is_valid_person
796
person.is_ubuntu_coc_signer
799
person.preferredemail
# LessThan(1) allows no statements at all.
800
self.assertThat(recorder, HasQueryCount(LessThan(1)))
803
class KarmaTestMixin:
    """Mixin with helper methods that create karma records for tests."""
806
def _makeKarmaCache(self, person, product, category_name_values):
    """Create KarmaCache entries with the given arguments.

    One entry is created per (category name, value) pair, followed by
    one entry without a category that holds the sum of those values.

    In order to create the KarmaCache record we must switch to the DB
    user 'karma'. This invalidates the objects under test so they
    must be retrieved again.

    The caller must have set `self.cache_manager` beforehand.

    :param person: the karma owner; only `person.id` is used.
    :param product: the karma context; only `product.id` is used.
    :param category_name_values: iterable of (category_name, value)
        pairs.
    """
    with dbuser('karma'):
        # Accumulated here because the final entry needs the grand total.
        total = 0
        # Insert category total for person and project.
        for category_name, value in category_name_values:
            category = KarmaCategory.byName(category_name)
            self.cache_manager.new(
                value, person.id, category.id, product_id=product.id)
            total += value
        # Insert total cache for person and project.
        self.cache_manager.new(
            total, person.id, None, product_id=product.id)
825
def _makeKarmaTotalCache(self, person, total):
    """Create a KarmaTotalCache entry.

    In order to create the KarmaTotalCache record we must switch to the DB
    user 'karma'. This invalidates the objects under test so they
    must be retrieved again.

    :param person: the karma owner; only `person.id` is used.
    :param total: the value stored as `karma_total`.
    """
    with dbuser('karma'):
        KarmaTotalCache(person=person.id, karma_total=total)
836
class TestPersonSetMerge(TestCaseWithFactory, KarmaTestMixin):
    """Test cases for PersonSet merge."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        # Restored method header: the statements below use `self` and call
        # the parent setUp(), so they can only live inside setUp().
        super(TestPersonSetMerge, self).setUp()
        # The utility whose merge()/delete() the tests exercise.
        self.person_set = getUtility(IPersonSet)
845
def _do_premerge(self, from_person, to_person):
    # Reproduce the preparation the LoginToken does ahead of a merge:
    # as an admin, mark from_person's preferred email NEW and hand it
    # (person and account) to to_person.
    with celebrity_logged_in('admin'):
        moved_email = from_person.preferredemail
        moved_email.status = EmailAddressStatus.NEW
        moved_email.person = to_person
        moved_email.account = to_person.account
854
def _do_merge(self, from_person, to_person, reviewer=None):
    # Merge from_person into to_person while connected as the db user
    # the merge jobs run as, then hand both objects back to the caller.
    job_dbuser = config.IPersonMergeJobSource.dbuser
    with dbuser(job_dbuser):
        self.person_set.merge(from_person, to_person, reviewer=reviewer)
    return from_person, to_person
860
def _get_testable_account(self, person, date_created, openid_identifier):
    """Return a naked account with predictable attributes.

    :param person: the person whose account is unwrapped and modified.
    :param date_created: value assigned to the account's `date_created`.
    :param openid_identifier: value assigned to the account's
        `openid_identifier`.
    :return: the account with its security proxy removed.
    """
    account = removeSecurityProxy(person.account)
    account.date_created = date_created
    account.openid_identifier = openid_identifier
    # The helper is documented as returning the account; without this
    # line callers would always receive None.
    return account
867
def test_delete_no_notifications(self):
    # Deleting a team must leave no person notifications waiting to be
    # sent.
    doomed_team = self.factory.makeTeam()
    team_owner = doomed_team.teamowner
    # NOTE(review): indentation was lost in this view; only the delete
    # call is assumed to run under the job db user -- confirm.
    job_dbuser = config.IPersonMergeJobSource.dbuser
    with dbuser(job_dbuser):
        self.person_set.delete(doomed_team, team_owner)
    pending = getUtility(IPersonNotificationSet).getNotificationsToSend()
    self.assertEqual(0, pending.count())
877
def test_openid_identifiers(self):
878
# Verify that OpenId Identifiers are merged.
# Record one identifier from each account before the merge so they can
# be looked for on the surviving account afterwards.
879
duplicate = self.factory.makePerson()
880
duplicate_identifier = removeSecurityProxy(
881
duplicate.account).openid_identifiers.any().identifier
882
person = self.factory.makePerson()
883
person_identifier = removeSecurityProxy(
884
person.account).openid_identifiers.any().identifier
885
self._do_premerge(duplicate, person)
887
duplicate, person = self._do_merge(duplicate, person)
# NOTE(review): the opening of the assertion that owns the expression
# below (and its expected value) is not visible in this view; only the
# identifier count on the duplicate's account remains.
890
removeSecurityProxy(duplicate.account).openid_identifiers.count())
# Collect the identifier strings now attached to the surviving account.
892
merged_identifiers = [
893
identifier.identifier for identifier in
894
removeSecurityProxy(person.account).openid_identifiers]
# Both pre-merge identifiers must be among them.
896
self.assertIn(duplicate_identifier, merged_identifiers)
897
self.assertIn(person_identifier, merged_identifiers)
899
def test_karmacache_transferred_to_user_has_no_karma(self):
    # Merging into a person without karma: the duplicate ends up with no
    # KarmaCache entries and zero karma, and the target receives the
    # duplicate's karma total.
    self.cache_manager = getUtility(IKarmaCacheManager)
    project = self.factory.makeProduct()
    merged_from = self.factory.makePerson()
    bug_karma = [('bugs', 10)]
    self._makeKarmaCache(merged_from, project, bug_karma)
    self._makeKarmaTotalCache(merged_from, 15)
    # Creating the karma rows invalidated the object; fetch it afresh.
    merged_from = self.person_set.get(merged_from.id)
    merged_into = self.factory.makePerson()
    self._do_premerge(merged_from, merged_into)
    merged_from, merged_into = self._do_merge(merged_from, merged_into)
    self.assertEqual([], merged_from.karma_category_caches)
    self.assertEqual(0, merged_from.karma)
    self.assertEqual(15, merged_into.karma)
918
def test_karmacache_transferred_to_user_has_karma(self):
    # Merging into a person who already has karma: the duplicate ends up
    # with no KarmaCache entries and zero karma, and the target's total
    # is the sum of both totals (15 + 13).
    self.cache_manager = getUtility(IKarmaCacheManager)
    project = self.factory.makeProduct()
    merged_from = self.factory.makePerson()
    self._makeKarmaCache(merged_from, project, [('bugs', 10)])
    self._makeKarmaTotalCache(merged_from, 15)
    merged_into = self.factory.makePerson()
    self._makeKarmaCache(merged_into, project, [('bugs', 9)])
    self._makeKarmaTotalCache(merged_into, 13)
    # Creating the karma rows invalidated both objects; fetch them afresh.
    merged_from = self.person_set.get(merged_from.id)
    merged_into = self.person_set.get(merged_into.id)
    self._do_premerge(merged_from, merged_into)
    merged_from, merged_into = self._do_merge(merged_from, merged_into)
    self.assertEqual([], merged_from.karma_category_caches)
    self.assertEqual(0, merged_from.karma)
    self.assertEqual(28, merged_into.karma)
941
def test_person_date_created_preserved(self):
    # After the merge the surviving person carries the older of the two
    # datecreated values.
    survivor = self.factory.makePerson()
    dupe = self.factory.makePerson()
    utc = pytz.timezone('UTC')
    earliest = datetime(2005, 11, 25, 0, 0, 0, 0, utc)
    # Backdate the duplicate so that it is the older record.
    removeSecurityProxy(dupe).datecreated = earliest
    self._do_premerge(dupe, survivor)
    dupe, survivor = self._do_merge(dupe, survivor)
    self.assertEqual(earliest, survivor.datecreated)
953
def test_team_with_active_mailing_list_raises_error(self):
    # A team with an active mailing list cannot be merged.
    target_team = self.factory.makeTeam()
    test_team = self.factory.makeTeam()
    self.factory.makeMailingList(
        test_team, test_team.teamowner)
    # Restored assertion opener: the visible arguments (exception class,
    # callable, call arguments) are the assertRaises() calling form.
    self.assertRaises(
        AssertionError, self.person_set.merge, test_team, target_team)
962
def test_team_with_inactive_mailing_list(self):
    # A team with an inactive mailing list can be merged.
    target_team = self.factory.makeTeam()
    test_team = self.factory.makeTeam()
    mailing_list = self.factory.makeMailingList(
        test_team, test_team.teamowner)
    mailing_list.deactivate()
    mailing_list.transitionToStatus(MailingListStatus.INACTIVE)
    test_team, target_team = self._do_merge(
        test_team, target_team, test_team.teamowner)
    self.assertEqual(target_team, test_team.merged)
    # Restored assertion opener: the visible expected/actual pair checks
    # that the merged team's list ends up PURGED.
    self.assertEqual(
        MailingListStatus.PURGED, test_team.mailing_list.status)
    # The target team must not have gained any email addresses.
    emails = getUtility(IEmailAddressSet).getByPerson(target_team).count()
    self.assertEqual(0, emails)
978
def test_team_with_purged_mailing_list(self):
    # A team with a purged mailing list can be merged.
    # NOTE(review): in this view the list is only deactivated and moved
    # to INACTIVE; no explicit purge step is visible -- confirm against
    # the complete file.
    merge_target = self.factory.makeTeam()
    merging_team = self.factory.makeTeam()
    team_list = self.factory.makeMailingList(
        merging_team, merging_team.teamowner)
    team_list.deactivate()
    team_list.transitionToStatus(MailingListStatus.INACTIVE)
    merging_team, merge_target = self._do_merge(
        merging_team, merge_target, merging_team.teamowner)
    self.assertEqual(merge_target, merging_team.merged)
991
def test_team_with_members(self):
    # Merging a team that has a member succeeds, and afterwards that
    # member belongs to no teams.
    merge_target = self.factory.makeTeam()
    merging_team = self.factory.makeTeam()
    ex_member = self.factory.makePerson()
    with person_logged_in(merging_team.teamowner):
        merging_team.addMember(ex_member, merging_team.teamowner)
    merging_team, merge_target = self._do_merge(
        merging_team, merge_target, merging_team.teamowner)
    self.assertEqual(merge_target, merging_team.merged)
    remaining_teams = list(ex_member.super_teams)
    self.assertEqual([], remaining_teams)
1003
def test_team_without_super_teams_is_fine(self):
    # Merging a team that has neither members nor super teams must
    # simply complete; there is nothing to assert beyond "no error".
    merging_team = self.factory.makeTeam()
    merge_target = self.factory.makeTeam()
    login_person(merging_team.teamowner)
    self._do_merge(merging_team, merge_target, merging_team.teamowner)
1011
def test_team_with_super_teams(self):
    # A team that belongs to another team can be merged; the target
    # team does not inherit that membership.
    merging_team = self.factory.makeTeam()
    parent_team = self.factory.makeTeam()
    merge_target = self.factory.makeTeam()
    login_person(merging_team.teamowner)
    merging_team.join(parent_team, merging_team.teamowner)
    merging_team, merge_target = self._do_merge(
        merging_team, merge_target, merging_team.teamowner)
    self.assertEqual(merge_target, merging_team.merged)
    target_memberships = list(merge_target.super_teams)
    self.assertEqual([], target_memberships)
1024
def test_merge_moves_branches(self):
    # After merging a branch owner into another person, that person has
    # exactly one branch.
    survivor = self.factory.makePerson()
    owned_branch = self.factory.makeBranch()
    branch_owner = owned_branch.owner
    self._do_premerge(owned_branch.owner, survivor)
    login_person(survivor)
    branch_owner, survivor = self._do_merge(branch_owner, survivor)
    survivor_branches = survivor.getBranches()
    self.assertEqual(1, survivor_branches.count())
1036
def test_merge_with_duplicated_branches(self):
    # Both people own a branch called 'foo' on the same product. After
    # the merge the survivor has two branches, named 'foo' and 'foo-1'.
    shared_product = self.factory.makeProduct()
    source_branch = self.factory.makeBranch(
        name='foo', product=shared_product)
    kept_branch = self.factory.makeBranch(
        name='foo', product=shared_product)
    survivor = kept_branch.owner
    dupe = source_branch.owner
    self._do_premerge(dupe, survivor)
    login_person(survivor)
    dupe, survivor = self._do_merge(dupe, survivor)
    branch_names = [branch.name for branch in survivor.getBranches()]
    self.assertEqual(2, len(branch_names))
    self.assertContentEqual([u'foo', u'foo-1'], branch_names)
1051
def test_merge_moves_recipes(self):
    # After merging a recipe owner into another person, that person has
    # exactly one recipe.
    survivor = self.factory.makePerson()
    owned_recipe = self.factory.makeSourcePackageRecipe()
    recipe_owner = owned_recipe.owner
    # The merge only works once the owner's PPA is marked deleted.
    with person_logged_in(recipe_owner):
        owned_recipe.owner.archive.status = ArchiveStatus.DELETED
    self._do_premerge(recipe_owner, survivor)
    login_person(survivor)
    recipe_owner, survivor = self._do_merge(recipe_owner, survivor)
    self.assertEqual(1, survivor.recipes.count())
1065
def test_merge_with_duplicated_recipes(self):
    # Both people own a recipe called 'foo'. After the merge the
    # survivor has both recipes, and the one that came from the
    # duplicate has been renamed to 'foo-1'.
    source_recipe = self.factory.makeSourcePackageRecipe(
        name=u'foo', description=u'FROM')
    kept_recipe = self.factory.makeSourcePackageRecipe(
        name=u'foo', description=u'TO')
    dupe = source_recipe.owner
    survivor = kept_recipe.owner
    # The merge only works once the duplicate's PPA is marked deleted.
    with person_logged_in(source_recipe.owner):
        source_recipe.owner.archive.status = ArchiveStatus.DELETED
    self._do_premerge(source_recipe.owner, survivor)
    login_person(survivor)
    dupe, survivor = self._do_merge(dupe, survivor)
    merged_recipes = survivor.recipes
    self.assertEqual(2, merged_recipes.count())
    found_descriptions = [
        recipe.description for recipe in merged_recipes]
    self.assertEqual([u'TO', u'FROM'], found_descriptions)
    self.assertEqual(u'foo-1', merged_recipes[1].name)
1086
def assertSubscriptionMerges(self, target):
    # Shared check for any subscription target: a subscription made by
    # the duplicate must belong to the surviving person after the merge,
    # and the duplicate must no longer have one.
    subscriber = self.factory.makePerson()
    with person_logged_in(subscriber):
        target.addSubscription(subscriber, subscriber)
    survivor = self.factory.makePerson()
    self._do_premerge(subscriber, survivor)
    login_person(survivor)
    subscriber, survivor = self._do_merge(subscriber, survivor)
    # The subscription moved across to the surviving person...
    self.assertTrue(target.getSubscription(survivor) is not None)
    # ...and is gone from the duplicate.
    self.assertTrue(target.getSubscription(subscriber) is None)
1102
def assertConflictingSubscriptionDeletes(self, target):
1103
# Given a subscription target, we want to make sure that subscriptions
1104
# that the duplicate person made that conflict with existing
1105
# subscriptions in the merged account are deleted.
1106
duplicate = self.factory.makePerson()
1107
person = self.factory.makePerson()
# The duplicate subscribes to the target first.
1108
with person_logged_in(duplicate):
1109
target.addSubscription(duplicate, duplicate)
1110
with person_logged_in(person):
1111
# The description lets us show that we still have the right
1112
# subscription later.
# NOTE(review): the parenthesis opened on the next code line is never
# closed in this view; the marker string assigned to the description
# is not visible and must be restored.
1113
target.addBugSubscriptionFilter(person, person).description = (
1115
self._do_premerge(duplicate, person)
1116
login_person(person)
1117
duplicate, person = self._do_merge(duplicate, person)
1118
# The merged person still has the original subscription, as shown
1119
# by the marker name.
# NOTE(review): the assertion that wraps the expression below (opener
# and expected marker value) is not visible in this view.
1121
target.getSubscription(person).bug_filters[0].description,
1123
# The conflicting subscription on the duplicate has been deleted.
1124
self.assertTrue(target.getSubscription(duplicate) is None)
1126
def test_merge_with_product_subscription(self):
    # Runs the shared assertSubscriptionMerges check with a product as
    # the subscription target.
    product = self.factory.makeProduct()
    self.assertSubscriptionMerges(product)
1130
def test_merge_with_conflicting_product_subscription(self):
    # Runs the shared assertConflictingSubscriptionDeletes check with a
    # product as the subscription target.
    product = self.factory.makeProduct()
    self.assertConflictingSubscriptionDeletes(product)
1134
def test_merge_with_project_subscription(self):
    # Runs the shared assertSubscriptionMerges check with a project as
    # the subscription target.
    project = self.factory.makeProject()
    self.assertSubscriptionMerges(project)
1138
def test_merge_with_conflicting_project_subscription(self):
    # Runs the shared assertConflictingSubscriptionDeletes check with a
    # project as the subscription target.
    project = self.factory.makeProject()
    self.assertConflictingSubscriptionDeletes(project)
1142
def test_merge_with_distroseries_subscription(self):
    # Runs the shared assertSubscriptionMerges check with a distro
    # series as the subscription target.
    distroseries = self.factory.makeDistroSeries()
    self.assertSubscriptionMerges(distroseries)
1146
def test_merge_with_conflicting_distroseries_subscription(self):
    # Runs the shared assertConflictingSubscriptionDeletes check with a
    # distro series as the subscription target.
    distroseries = self.factory.makeDistroSeries()
    self.assertConflictingSubscriptionDeletes(distroseries)
1151
def test_merge_with_milestone_subscription(self):
    # Runs the shared assertSubscriptionMerges check with a milestone as
    # the subscription target.
    milestone = self.factory.makeMilestone()
    self.assertSubscriptionMerges(milestone)
1155
def test_merge_with_conflicting_milestone_subscription(self):
    # Runs the shared assertConflictingSubscriptionDeletes check with a
    # milestone as the subscription target.
    milestone = self.factory.makeMilestone()
    self.assertConflictingSubscriptionDeletes(milestone)
1160
def test_merge_with_productseries_subscription(self):
    # Runs the shared assertSubscriptionMerges check with a product
    # series as the subscription target.
    productseries = self.factory.makeProductSeries()
    self.assertSubscriptionMerges(productseries)
1164
def test_merge_with_conflicting_productseries_subscription(self):
    # Runs the shared assertConflictingSubscriptionDeletes check with a
    # product series as the subscription target.
    productseries = self.factory.makeProductSeries()
    self.assertConflictingSubscriptionDeletes(productseries)
1169
def test_merge_with_distribution_subscription(self):
    # Runs the shared assertSubscriptionMerges check with a distribution
    # as the subscription target.
    distribution = self.factory.makeDistribution()
    self.assertSubscriptionMerges(distribution)
1173
def test_merge_with_conflicting_distribution_subscription(self):
    # Runs the shared assertConflictingSubscriptionDeletes check with a
    # distribution as the subscription target.
    distribution = self.factory.makeDistribution()
    self.assertConflictingSubscriptionDeletes(distribution)
1178
def test_merge_with_sourcepackage_subscription(self):
    # Runs the shared assertSubscriptionMerges check with a distribution
    # source package as the subscription target.
    self.assertSubscriptionMerges(
        self.factory.makeDistributionSourcePackage())
1183
def test_merge_with_conflicting_sourcepackage_subscription(self):
    # Runs the shared assertConflictingSubscriptionDeletes check with a
    # distribution source package as the subscription target.
    self.assertConflictingSubscriptionDeletes(
        self.factory.makeDistributionSourcePackage())
1188
def test_merge_accesspolicygrants(self):
    # A grant held by the duplicate names the surviving person as its
    # grantee once the merge is done.
    survivor = self.factory.makePerson()
    policy_grant = self.factory.makeAccessPolicyGrant()
    self._do_premerge(policy_grant.grantee, survivor)
    # NOTE(review): indentation was lost in this view; only the merge
    # call is assumed to run inside the logged-in block -- confirm.
    with person_logged_in(survivor):
        self._do_merge(policy_grant.grantee, survivor)
    self.assertEqual(survivor, policy_grant.grantee)
1197
def test_merge_accesspolicygrants_conflicts(self):
1198
# Conflicting AccessPolicyGrants are deleted.
# Both people get a grant on the same policy, so the two grants collide
# once the duplicate is merged into the person.
1199
policy = self.factory.makeAccessPolicy()
1201
person = self.factory.makePerson()
1202
person_grantor = self.factory.makePerson()
1203
person_grant = self.factory.makeAccessPolicyGrant(
1204
grantee=person, grantor=person_grantor, object=policy)
1206
duplicate = self.factory.makePerson()
1207
duplicate_grantor = self.factory.makePerson()
1208
duplicate_grant = self.factory.makeAccessPolicyGrant(
1209
grantee=duplicate, grantor=duplicate_grantor, object=policy)
1211
self._do_premerge(duplicate, person)
1212
with person_logged_in(person):
1213
self._do_merge(duplicate, person)
1214
transaction.commit()
# The surviving person's own grant is unchanged.
1216
self.assertEqual(person, person_grant.grantee)
1217
self.assertEqual(person_grantor, person_grant.grantor)
# NOTE(review): the assertion that wraps the store lookup below (opener
# and expected value) is not visible in this view; only the lookup of
# the duplicate's grant by id remains.
1220
IStore(AccessPolicyGrant).get(
1221
AccessPolicyGrant, duplicate_grant.id))
1223
def test_mergeAsync(self):
    # mergeAsync() hands back a job that records both people involved
    # (a `PersonMergeJob`, per the original comment).
    source = self.factory.makePerson()
    destination = self.factory.makePerson()
    login_person(source)
    merge_job = self.person_set.mergeAsync(source, destination)
    self.assertEqual(source, merge_job.from_person)
    self.assertEqual(destination, merge_job.to_person)
1233
class TestPersonSetCreateByOpenId(TestCaseWithFactory):
    """Tests that call `IPersonSet.getOrCreateByOpenIDIdentifier()`."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        # Restored method header: the statements below use `self` and call
        # the parent setUp(), so they can only live inside setUp().
        super(TestPersonSetCreateByOpenId, self).setUp()
        self.person_set = getUtility(IPersonSet)
        self.store = IMasterStore(Account)

        # Generate some valid test data: an account with one OpenId
        # identifier, a person for that account, and an email address
        # for that person.
        self.account = self.makeAccount()
        self.identifier = self.makeOpenIdIdentifier(self.account, u'whatever')
        self.person = self.makePerson(self.account)
        self.email = self.makeEmailAddress(
            email='whatever@example.com', person=self.person)
1248
def makeAccount(self):
    # Build an ACTIVE account with fixed attributes and add it to the
    # store; whatever store.add() returns is handed back.
    new_account = Account(
        displayname='Displayname',
        creation_rationale=AccountCreationRationale.UNKNOWN,
        status=AccountStatus.ACTIVE)
    return self.store.add(new_account)
1254
def makeOpenIdIdentifier(self, account, identifier):
    # Build an OpenIdIdentifier carrying `identifier` for `account` and
    # add it to the store; whatever store.add() returns is handed back.
    record = OpenIdIdentifier()
    record.identifier = identifier
    record.account = account
    stored = self.store.add(record)
    return stored
1260
def makePerson(self, account):
    # Build a Person linked to `account`, named after the account id,
    # and add it to the store; whatever store.add() returns is handed
    # back.
    person_name = 'acc%d' % account.id
    new_person = Person(
        name=person_name, account=account,
        displayname='Displayname',
        creation_rationale=PersonCreationRationale.UNKNOWN)
    return self.store.add(new_person)
1266
def makeEmailAddress(self, email, person):
    """Add a PREFERRED EmailAddress for `person` to the store.

    :param email: the address text to record.
    :param person: the owning person; its account is linked as well.
    :return: whatever `self.store.add()` returns for the new row.
    """
    # `email` and `person` must be stored: the tests in this class read
    # `self.email.email` and assert that `self.email.person` is the
    # person passed in here.
    return self.store.add(EmailAddress(
        email=email,
        account=person.account,
        person=person,
        status=EmailAddressStatus.PREFERRED))
1273
def testAllValid(self):
    # Identifier and email both already belong to the same account, so
    # the existing person is returned and nothing is reported as updated.
    found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
        self.identifier.identifier, self.email.email, 'Ignored Name',
        PersonCreationRationale.UNKNOWN, 'No Comment')
    found = removeSecurityProxy(found)

    self.assertIs(False, updated)
    self.assertIs(self.person, found)
    self.assertIs(self.account, found.account)
    self.assertIs(self.email, found.preferredemail)
    self.assertIs(self.email.account, self.account)
    self.assertIs(self.email.person, self.person)
    # Restored assertion opener: the visible expected/actual pair checks
    # that the account still has exactly the original identifier.
    self.assertEqual(
        [self.identifier], list(self.account.openid_identifiers))
1288
def testEmailAddressCaseInsensitive(self):
    # As per testAllValid, but the email address used for the lookup
    # is all upper case.
    found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
        self.identifier.identifier, self.email.email.upper(),
        'Ignored Name', PersonCreationRationale.UNKNOWN, 'No Comment')
    found = removeSecurityProxy(found)

    self.assertIs(False, updated)
    self.assertIs(self.person, found)
    self.assertIs(self.account, found.account)
    self.assertIs(self.email, found.preferredemail)
    self.assertIs(self.email.account, self.account)
    self.assertIs(self.email.person, self.person)
    # Restored assertion opener: the visible expected/actual pair checks
    # that the account still has exactly the original identifier.
    self.assertEqual(
        [self.identifier], list(self.account.openid_identifiers))
1305
def testNewOpenId(self):
    # Account looked up by email and the new OpenId identifier
    # attached. We can do this because we trust our OpenId Provider.
    new_identifier = u'newident'
    found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
        new_identifier, self.email.email, 'Ignored Name',
        PersonCreationRationale.UNKNOWN, 'No Comment')
    found = removeSecurityProxy(found)

    self.assertIs(True, updated)
    self.assertIs(self.person, found)
    self.assertIs(self.account, found.account)
    self.assertIs(self.email, found.preferredemail)
    self.assertIs(self.email.account, self.account)
    self.assertIs(self.email.person, self.person)

    # Old OpenId Identifier still attached.
    self.assertIn(self.identifier, list(self.account.openid_identifiers))

    # So is our new one. (Restored the `identifiers = [` opener: the
    # name is read by the assertion below and the comprehension body
    # was left without its assignment.)
    identifiers = [
        identifier.identifier for identifier
        in self.account.openid_identifiers]
    self.assertIn(new_identifier, identifiers)
1330
def testNewEmailAddress(self):
    # Account looked up by OpenId identifier and new EmailAddress
    # attached. We can do this because we trust our OpenId Provider.
    new_email = u'new_email@example.com'
    found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
        self.identifier.identifier, new_email, 'Ignored Name',
        PersonCreationRationale.UNKNOWN, 'No Comment')
    found = removeSecurityProxy(found)

    self.assertIs(True, updated)
    self.assertIs(self.person, found)
    self.assertIs(self.account, found.account)
    # Restored assertion opener: the visible expected/actual pair checks
    # that the account still has exactly the original identifier.
    self.assertEqual(
        [self.identifier], list(self.account.openid_identifiers))

    # The old email address is still there and correctly linked.
    self.assertIs(self.email, found.preferredemail)
    self.assertIs(self.email.account, self.account)
    self.assertIs(self.email.person, self.person)

    # The new email address is there too and correctly linked.
    # From here on `new_email` is the stored row, not the address text.
    new_email = self.store.find(EmailAddress, email=new_email).one()
    self.assertIs(new_email.account, self.account)
    self.assertIs(new_email.person, self.person)
    self.assertEqual(EmailAddressStatus.NEW, new_email.status)
1356
def testNewAccountAndIdentifier(self):
    # If neither the OpenId Identifier nor the email address are
    # found, we create everything.
    new_email = u'new_email@example.com'
    new_identifier = u'new_identifier'
    found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
        new_identifier, new_email, 'New Name',
        PersonCreationRationale.UNKNOWN, 'No Comment')
    found = removeSecurityProxy(found)

    # We have a new Person
    self.assertIs(True, updated)
    self.assertIsNot(None, found)

    # It is correctly linked to an account, email address and OpenId
    # identifier.
    self.assertIs(found, found.preferredemail.person)
    self.assertIs(found.account, found.preferredemail.account)
    # Restored assertion opener: the visible expected/actual pair checks
    # the identifier attached to the new account.
    self.assertEqual(
        new_identifier, found.account.openid_identifiers.any().identifier)
1377
def testNoPerson(self):
    # When the account has no Person linked to it, one is created. (The
    # original comment gives ShipIt users on their first login as the
    # example.)
    # Detach the existing person from both the email and the account.
    self.email.person = None
    self.person.account = None

    result = self.person_set.getOrCreateByOpenIDIdentifier(
        self.identifier.identifier, self.email.email, 'New Name',
        PersonCreationRationale.UNKNOWN, 'No Comment')
    found, updated = result
    found = removeSecurityProxy(found)

    # A different, newly made Person came back.
    self.assertIs(True, updated)
    self.assertIsNot(self.person, found)

    # Its preferred email points back at it and shares its account, and
    # the original identifier is attached to that account.
    preferred = found.preferredemail
    self.assertIs(found, preferred.person)
    self.assertIs(found.account, found.preferredemail.account)
    self.assertIn(self.identifier, list(found.account.openid_identifiers))
1399
def testNoAccount(self):
    # EmailAddress is linked to a Person, but there is no Account.
    # Convert this stub into something valid.
    self.email.account = None
    self.email.status = EmailAddressStatus.NEW
    self.person.account = None
    new_identifier = u'new_identifier'
    found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
        new_identifier, self.email.email, 'Ignored',
        PersonCreationRationale.UNKNOWN, 'No Comment')
    found = removeSecurityProxy(found)

    self.assertIs(True, updated)

    # The person now has an account carrying the new identifier.
    self.assertIsNot(None, found.account)
    # Restored assertion opener: the visible expected/actual pair checks
    # the identifier attached to that account.
    self.assertEqual(
        new_identifier, found.account.openid_identifiers.any().identifier)
    # The email is linked to the person and account, and is PREFERRED.
    self.assertIs(self.email.person, found)
    self.assertIs(self.email.account, found.account)
    self.assertEqual(EmailAddressStatus.PREFERRED, self.email.status)
1420
def testMovedEmailAddress(self):
    # Email address and OpenId identifier both exist but hang off
    # different accounts. The identifier must be moved to the account
    # that owns the email address: someone who can log into the trusted
    # SSO with that address should be able to log into Launchpad with
    # it too. This covers the SSO moving addresses between its accounts.
    other_account = self.store.find(
        Account, displayname='Foo Bar').one()
    self.identifier.account = other_account

    result = self.person_set.getOrCreateByOpenIDIdentifier(
        self.identifier.identifier, self.email.email, 'New Name',
        PersonCreationRationale.UNKNOWN, 'No Comment')
    found, updated = result
    found = removeSecurityProxy(found)

    self.assertIs(True, updated)
    self.assertIs(self.person, found)

    # The identifier now belongs to the found person's account.
    self.assertIs(found.account, self.identifier.account)
    attached = list(found.account.openid_identifiers)
    self.assertIn(self.identifier, attached)
1443
class TestCreatePersonAndEmail(TestCase):
    """Test `IPersonSet`.createPersonAndEmail()."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        # Restored method header: the statements below use `self` and call
        # the parent setUp(), so they can only live inside setUp().
        TestCase.setUp(self)
        # NOTE(review): logout is registered for cleanup but no login call
        # is visible in this view -- confirm whether one belongs here.
        self.addCleanup(logout)
        # The utility under test, shared by every test method.
        self.person_set = getUtility(IPersonSet)
1453
def test_duplicated_name_not_accepted(self):
# Creates one person, then attempts a second creation with a different
# email address; NameAlreadyTaken is the exception named below.
# NOTE(review): both calls are cut short in this view -- the trailing
# arguments of each call and the opener of the assertion that takes
# NameAlreadyTaken are not visible and must be restored.
1454
self.person_set.createPersonAndEmail(
1455
'testing@example.com', PersonCreationRationale.UNKNOWN,
1458
NameAlreadyTaken, self.person_set.createPersonAndEmail,
1459
'testing2@example.com', PersonCreationRationale.UNKNOWN,
1462
def test_duplicated_email_not_accepted(self):
    # Creating a second person with an email address that is already in
    # use must raise EmailAddressAlreadyTaken.
    self.person_set.createPersonAndEmail(
        'testing@example.com', PersonCreationRationale.UNKNOWN)
    # Restored assertion opener: the visible arguments (exception class,
    # callable, call arguments) are the assertRaises() calling form.
    self.assertRaises(
        EmailAddressAlreadyTaken, self.person_set.createPersonAndEmail,
        'testing@example.com', PersonCreationRationale.UNKNOWN)
1469
def test_invalid_email_not_accepted(self):
    # An address with an empty host label ('testing@.com') must be
    # rejected with InvalidEmailAddress.
    # Restored assertion opener: the visible arguments (exception class,
    # callable, call arguments) are the assertRaises() calling form.
    self.assertRaises(
        InvalidEmailAddress, self.person_set.createPersonAndEmail,
        'testing@.com', PersonCreationRationale.UNKNOWN)
1474
def test_invalid_name_not_accepted(self):
# Attempts a creation for which InvalidName is the exception named below.
# NOTE(review): the statement is cut at both ends in this view -- the
# opener of the assertion that takes InvalidName and the trailing
# argument(s) after the rationale are not visible and must be restored.
1476
InvalidName, self.person_set.createPersonAndEmail,
1477
'testing@example.com', PersonCreationRationale.UNKNOWN,
1481
719
class TestPersonRelatedBugTaskSearch(TestCaseWithFactory):
1483
721
layer = DatabaseFunctionalLayer