~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/registry/tests/test_person.py

  • Committer: Launchpad Patch Queue Manager
  • Date: 2012-01-06 19:56:39 UTC
  • mfrom: (14642.2.5 403-private-team-in-page)
  • Revision ID: launchpad@pqm.canonical.com-20120106195639-ix60p6xlxfwwom9c
[r=aduering][bug=911794] Do not assume the team is public when
        listing teams.

Show diffs side-by-side

added added

removed removed

Lines of Context:
5
5
 
6
6
from datetime import datetime
7
7
 
8
 
from lazr.lifecycle.snapshot import Snapshot
9
8
import pytz
 
9
 
10
10
from storm.store import Store
 
11
 
11
12
from testtools.matchers import (
12
13
    Equals,
13
14
    LessThan,
14
15
    )
15
 
import transaction
 
16
 
16
17
from zope.component import getUtility
17
18
from zope.interface import providedBy
18
19
from zope.security.interfaces import Unauthorized
19
20
from zope.security.proxy import removeSecurityProxy
20
21
 
 
22
from lazr.lifecycle.snapshot import Snapshot
 
23
 
21
24
from lp.answers.model.answercontact import AnswerContact
22
25
from lp.app.interfaces.launchpad import ILaunchpadCelebrities
23
26
from lp.blueprints.model.specification import Specification
25
28
from lp.bugs.model.bug import Bug
26
29
from lp.bugs.model.bugtask import get_related_bugtasks_search_params
27
30
from lp.registry.errors import (
28
 
    InvalidName,
29
 
    NameAlreadyTaken,
30
31
    PrivatePersonLinkageError,
31
32
    )
32
33
from lp.registry.interfaces.karma import IKarmaCacheManager
33
 
from lp.registry.interfaces.mailinglist import MailingListStatus
34
 
from lp.registry.interfaces.nameblacklist import INameBlacklistSet
35
34
from lp.registry.interfaces.person import (
36
35
    ImmutableVisibilityError,
37
36
    IPersonSet,
38
 
    PersonCreationRationale,
39
37
    PersonVisibility,
40
38
    )
41
 
from lp.registry.interfaces.personnotification import IPersonNotificationSet
42
39
from lp.registry.interfaces.pocket import PackagePublishingPocket
43
40
from lp.registry.interfaces.product import IProductSet
44
 
from lp.registry.model.accesspolicy import AccessPolicyGrant
45
41
from lp.registry.model.karma import (
46
42
    KarmaCategory,
47
43
    KarmaTotalCache,
50
46
    get_recipients,
51
47
    Person,
52
48
    )
53
 
from lp.services.config import config
54
 
from lp.services.database.lpstorm import (
55
 
    IMasterStore,
56
 
    IStore,
57
 
    )
58
 
from lp.services.database.sqlbase import cursor
59
49
from lp.services.identity.interfaces.account import (
60
 
    AccountCreationRationale,
61
50
    AccountStatus,
62
51
    )
63
52
from lp.services.identity.interfaces.emailaddress import (
64
 
    EmailAddressAlreadyTaken,
65
53
    EmailAddressStatus,
66
 
    IEmailAddressSet,
67
 
    InvalidEmailAddress,
68
54
    )
69
 
from lp.services.identity.model.account import Account
70
 
from lp.services.identity.model.emailaddress import EmailAddress
71
 
from lp.services.openid.model.openididentifier import OpenIdIdentifier
72
55
from lp.services.propertycache import clear_property_cache
73
56
from lp.soyuz.enums import (
74
57
    ArchivePurpose,
75
 
    ArchiveStatus,
76
58
    )
77
59
from lp.testing import (
78
 
    ANONYMOUS,
79
60
    celebrity_logged_in,
80
61
    login,
81
62
    login_person,
82
63
    logout,
83
64
    person_logged_in,
84
65
    StormStatementRecorder,
85
 
    TestCase,
86
66
    TestCaseWithFactory,
87
67
    )
88
68
from lp.testing._webservice import QueryCollector
736
716
        self.assertEqual('(\\u0170-tester)>', displayname)
737
717
 
738
718
 
739
 
class TestPersonSet(TestCaseWithFactory):
740
 
    """Test `IPersonSet`."""
741
 
    layer = DatabaseFunctionalLayer
742
 
 
743
 
    def setUp(self):
744
 
        super(TestPersonSet, self).setUp()
745
 
        login(ANONYMOUS)
746
 
        self.addCleanup(logout)
747
 
        self.person_set = getUtility(IPersonSet)
748
 
 
749
 
    def test_isNameBlacklisted(self):
750
 
        cursor().execute(
751
 
            "INSERT INTO NameBlacklist(id, regexp) VALUES (-100, 'foo')")
752
 
        self.failUnless(self.person_set.isNameBlacklisted('foo'))
753
 
        self.failIf(self.person_set.isNameBlacklisted('bar'))
754
 
 
755
 
    def test_isNameBlacklisted_user_is_admin(self):
756
 
        team = self.factory.makeTeam()
757
 
        name_blacklist_set = getUtility(INameBlacklistSet)
758
 
        self.admin_exp = name_blacklist_set.create(u'fnord', admin=team)
759
 
        self.store = IStore(self.admin_exp)
760
 
        self.store.flush()
761
 
        user = team.teamowner
762
 
        self.assertFalse(self.person_set.isNameBlacklisted('fnord', user))
763
 
 
764
 
    def test_getByEmail_ignores_case_and_whitespace(self):
765
 
        person1_email = 'foo.bar@canonical.com'
766
 
        person1 = self.person_set.getByEmail(person1_email)
767
 
        self.failIf(
768
 
            person1 is None,
769
 
            "PersonSet.getByEmail() could not find %r" % person1_email)
770
 
 
771
 
        person2 = self.person_set.getByEmail('  foo.BAR@canonICAL.com  ')
772
 
        self.failIf(
773
 
            person2 is None,
774
 
            "PersonSet.getByEmail() should ignore case and whitespace.")
775
 
        self.assertEqual(person1, person2)
776
 
 
777
 
    def test_getPrecachedPersonsFromIDs(self):
778
 
        # The getPrecachedPersonsFromIDs() method should only make one
779
 
        # query to load all the extraneous data. Accessing the
780
 
        # attributes should then cause zero queries.
781
 
        person_ids = [
782
 
            self.factory.makePerson().id
783
 
            for i in range(3)]
784
 
 
785
 
        with StormStatementRecorder() as recorder:
786
 
            persons = list(self.person_set.getPrecachedPersonsFromIDs(
787
 
                person_ids, need_karma=True, need_ubuntu_coc=True,
788
 
                need_location=True, need_archive=True,
789
 
                need_preferred_email=True, need_validity=True))
790
 
        self.assertThat(recorder, HasQueryCount(LessThan(2)))
791
 
 
792
 
        with StormStatementRecorder() as recorder:
793
 
            for person in persons:
794
 
                person.is_valid_person
795
 
                person.karma
796
 
                person.is_ubuntu_coc_signer
797
 
                person.location
798
 
                person.archive
799
 
                person.preferredemail
800
 
        self.assertThat(recorder, HasQueryCount(LessThan(1)))
801
 
 
802
 
 
803
 
class KarmaTestMixin:
804
 
    """Helper methods for setting karma."""
805
 
 
806
 
    def _makeKarmaCache(self, person, product, category_name_values):
807
 
        """Create a KarmaCache entry with the given arguments.
808
 
 
809
 
        In order to create the KarmaCache record we must switch to the DB
810
 
        user 'karma'. This invalidates the objects under test so they
811
 
        must be retrieved again.
812
 
        """
813
 
        with dbuser('karma'):
814
 
            total = 0
815
 
            # Insert category total for person and project.
816
 
            for category_name, value in category_name_values:
817
 
                category = KarmaCategory.byName(category_name)
818
 
                self.cache_manager.new(
819
 
                    value, person.id, category.id, product_id=product.id)
820
 
                total += value
821
 
            # Insert total cache for person and project.
822
 
            self.cache_manager.new(
823
 
                total, person.id, None, product_id=product.id)
824
 
 
825
 
    def _makeKarmaTotalCache(self, person, total):
826
 
        """Create a KarmaTotalCache entry.
827
 
 
828
 
        In order to create the KarmaTotalCache record we must switch to the DB
829
 
        user 'karma'. This invalidates the objects under test so they
830
 
        must be retrieved again.
831
 
        """
832
 
        with dbuser('karma'):
833
 
            KarmaTotalCache(person=person.id, karma_total=total)
834
 
 
835
 
 
836
 
class TestPersonSetMerge(TestCaseWithFactory, KarmaTestMixin):
837
 
    """Test cases for PersonSet merge."""
838
 
 
839
 
    layer = DatabaseFunctionalLayer
840
 
 
841
 
    def setUp(self):
842
 
        super(TestPersonSetMerge, self).setUp()
843
 
        self.person_set = getUtility(IPersonSet)
844
 
 
845
 
    def _do_premerge(self, from_person, to_person):
846
 
        # Do the pre merge work performed by the LoginToken.
847
 
        with celebrity_logged_in('admin'):
848
 
            email = from_person.preferredemail
849
 
            email.status = EmailAddressStatus.NEW
850
 
            email.person = to_person
851
 
            email.account = to_person.account
852
 
        transaction.commit()
853
 
 
854
 
    def _do_merge(self, from_person, to_person, reviewer=None):
855
 
        # Perform the merge as the db user that will be used by the jobs.
856
 
        with dbuser(config.IPersonMergeJobSource.dbuser):
857
 
            self.person_set.merge(from_person, to_person, reviewer=reviewer)
858
 
        return from_person, to_person
859
 
 
860
 
    def _get_testable_account(self, person, date_created, openid_identifier):
861
 
        # Return a naked account with predictable attributes.
862
 
        account = removeSecurityProxy(person.account)
863
 
        account.date_created = date_created
864
 
        account.openid_identifier = openid_identifier
865
 
        return account
866
 
 
867
 
    def test_delete_no_notifications(self):
868
 
        team = self.factory.makeTeam()
869
 
        owner = team.teamowner
870
 
        transaction.commit()
871
 
        with dbuser(config.IPersonMergeJobSource.dbuser):
872
 
            self.person_set.delete(team, owner)
873
 
        notification_set = getUtility(IPersonNotificationSet)
874
 
        notifications = notification_set.getNotificationsToSend()
875
 
        self.assertEqual(0, notifications.count())
876
 
 
877
 
    def test_openid_identifiers(self):
878
 
        # Verify that OpenId Identifiers are merged.
879
 
        duplicate = self.factory.makePerson()
880
 
        duplicate_identifier = removeSecurityProxy(
881
 
            duplicate.account).openid_identifiers.any().identifier
882
 
        person = self.factory.makePerson()
883
 
        person_identifier = removeSecurityProxy(
884
 
            person.account).openid_identifiers.any().identifier
885
 
        self._do_premerge(duplicate, person)
886
 
        login_person(person)
887
 
        duplicate, person = self._do_merge(duplicate, person)
888
 
        self.assertEqual(
889
 
            0,
890
 
            removeSecurityProxy(duplicate.account).openid_identifiers.count())
891
 
 
892
 
        merged_identifiers = [
893
 
            identifier.identifier for identifier in
894
 
                removeSecurityProxy(person.account).openid_identifiers]
895
 
 
896
 
        self.assertIn(duplicate_identifier, merged_identifiers)
897
 
        self.assertIn(person_identifier, merged_identifiers)
898
 
 
899
 
    def test_karmacache_transferred_to_user_has_no_karma(self):
900
 
        # Verify that the merged user has no KarmaCache entries,
901
 
        # and the karma total was transfered.
902
 
        self.cache_manager = getUtility(IKarmaCacheManager)
903
 
        product = self.factory.makeProduct()
904
 
        duplicate = self.factory.makePerson()
905
 
        self._makeKarmaCache(
906
 
            duplicate, product, [('bugs', 10)])
907
 
        self._makeKarmaTotalCache(duplicate, 15)
908
 
        # The karma changes invalidated duplicate instance.
909
 
        duplicate = self.person_set.get(duplicate.id)
910
 
        person = self.factory.makePerson()
911
 
        self._do_premerge(duplicate, person)
912
 
        login_person(person)
913
 
        duplicate, person = self._do_merge(duplicate, person)
914
 
        self.assertEqual([], duplicate.karma_category_caches)
915
 
        self.assertEqual(0, duplicate.karma)
916
 
        self.assertEqual(15, person.karma)
917
 
 
918
 
    def test_karmacache_transferred_to_user_has_karma(self):
919
 
        # Verify that the merged user has no KarmaCache entries,
920
 
        # and the karma total was summed.
921
 
        self.cache_manager = getUtility(IKarmaCacheManager)
922
 
        product = self.factory.makeProduct()
923
 
        duplicate = self.factory.makePerson()
924
 
        self._makeKarmaCache(
925
 
            duplicate, product, [('bugs', 10)])
926
 
        self._makeKarmaTotalCache(duplicate, 15)
927
 
        person = self.factory.makePerson()
928
 
        self._makeKarmaCache(
929
 
            person, product, [('bugs', 9)])
930
 
        self._makeKarmaTotalCache(person, 13)
931
 
        # The karma changes invalidated duplicate and person instances.
932
 
        duplicate = self.person_set.get(duplicate.id)
933
 
        person = self.person_set.get(person.id)
934
 
        self._do_premerge(duplicate, person)
935
 
        login_person(person)
936
 
        duplicate, person = self._do_merge(duplicate, person)
937
 
        self.assertEqual([], duplicate.karma_category_caches)
938
 
        self.assertEqual(0, duplicate.karma)
939
 
        self.assertEqual(28, person.karma)
940
 
 
941
 
    def test_person_date_created_preserved(self):
942
 
        # Verify that the oldest datecreated is merged.
943
 
        person = self.factory.makePerson()
944
 
        duplicate = self.factory.makePerson()
945
 
        oldest_date = datetime(
946
 
            2005, 11, 25, 0, 0, 0, 0, pytz.timezone('UTC'))
947
 
        removeSecurityProxy(duplicate).datecreated = oldest_date
948
 
        self._do_premerge(duplicate, person)
949
 
        login_person(person)
950
 
        duplicate, person = self._do_merge(duplicate, person)
951
 
        self.assertEqual(oldest_date, person.datecreated)
952
 
 
953
 
    def test_team_with_active_mailing_list_raises_error(self):
954
 
        # A team with an active mailing list cannot be merged.
955
 
        target_team = self.factory.makeTeam()
956
 
        test_team = self.factory.makeTeam()
957
 
        self.factory.makeMailingList(
958
 
            test_team, test_team.teamowner)
959
 
        self.assertRaises(
960
 
            AssertionError, self.person_set.merge, test_team, target_team)
961
 
 
962
 
    def test_team_with_inactive_mailing_list(self):
963
 
        # A team with an inactive mailing list can be merged.
964
 
        target_team = self.factory.makeTeam()
965
 
        test_team = self.factory.makeTeam()
966
 
        mailing_list = self.factory.makeMailingList(
967
 
            test_team, test_team.teamowner)
968
 
        mailing_list.deactivate()
969
 
        mailing_list.transitionToStatus(MailingListStatus.INACTIVE)
970
 
        test_team, target_team = self._do_merge(
971
 
            test_team, target_team, test_team.teamowner)
972
 
        self.assertEqual(target_team, test_team.merged)
973
 
        self.assertEqual(
974
 
            MailingListStatus.PURGED, test_team.mailing_list.status)
975
 
        emails = getUtility(IEmailAddressSet).getByPerson(target_team).count()
976
 
        self.assertEqual(0, emails)
977
 
 
978
 
    def test_team_with_purged_mailing_list(self):
979
 
        # A team with a purges mailing list can be merged.
980
 
        target_team = self.factory.makeTeam()
981
 
        test_team = self.factory.makeTeam()
982
 
        mailing_list = self.factory.makeMailingList(
983
 
            test_team, test_team.teamowner)
984
 
        mailing_list.deactivate()
985
 
        mailing_list.transitionToStatus(MailingListStatus.INACTIVE)
986
 
        mailing_list.purge()
987
 
        test_team, target_team = self._do_merge(
988
 
            test_team, target_team, test_team.teamowner)
989
 
        self.assertEqual(target_team, test_team.merged)
990
 
 
991
 
    def test_team_with_members(self):
992
 
        # Team members are removed before merging.
993
 
        target_team = self.factory.makeTeam()
994
 
        test_team = self.factory.makeTeam()
995
 
        former_member = self.factory.makePerson()
996
 
        with person_logged_in(test_team.teamowner):
997
 
            test_team.addMember(former_member, test_team.teamowner)
998
 
        test_team, target_team = self._do_merge(
999
 
            test_team, target_team, test_team.teamowner)
1000
 
        self.assertEqual(target_team, test_team.merged)
1001
 
        self.assertEqual([], list(former_member.super_teams))
1002
 
 
1003
 
    def test_team_without_super_teams_is_fine(self):
1004
 
        # A team with no members and no super teams
1005
 
        # merges without errors.
1006
 
        test_team = self.factory.makeTeam()
1007
 
        target_team = self.factory.makeTeam()
1008
 
        login_person(test_team.teamowner)
1009
 
        self._do_merge(test_team, target_team, test_team.teamowner)
1010
 
 
1011
 
    def test_team_with_super_teams(self):
1012
 
        # A team with superteams can be merged, but the memberships
1013
 
        # are not transferred.
1014
 
        test_team = self.factory.makeTeam()
1015
 
        super_team = self.factory.makeTeam()
1016
 
        target_team = self.factory.makeTeam()
1017
 
        login_person(test_team.teamowner)
1018
 
        test_team.join(super_team, test_team.teamowner)
1019
 
        test_team, target_team = self._do_merge(
1020
 
            test_team, target_team, test_team.teamowner)
1021
 
        self.assertEqual(target_team, test_team.merged)
1022
 
        self.assertEqual([], list(target_team.super_teams))
1023
 
 
1024
 
    def test_merge_moves_branches(self):
1025
 
        # When person/teams are merged, branches owned by the from person
1026
 
        # are moved.
1027
 
        person = self.factory.makePerson()
1028
 
        branch = self.factory.makeBranch()
1029
 
        duplicate = branch.owner
1030
 
        self._do_premerge(branch.owner, person)
1031
 
        login_person(person)
1032
 
        duplicate, person = self._do_merge(duplicate, person)
1033
 
        branches = person.getBranches()
1034
 
        self.assertEqual(1, branches.count())
1035
 
 
1036
 
    def test_merge_with_duplicated_branches(self):
1037
 
        # If both the from and to people have branches with the same name,
1038
 
        # merging renames the duplicate from the from person's side.
1039
 
        product = self.factory.makeProduct()
1040
 
        from_branch = self.factory.makeBranch(name='foo', product=product)
1041
 
        to_branch = self.factory.makeBranch(name='foo', product=product)
1042
 
        mergee = to_branch.owner
1043
 
        duplicate = from_branch.owner
1044
 
        self._do_premerge(duplicate, mergee)
1045
 
        login_person(mergee)
1046
 
        duplicate, mergee = self._do_merge(duplicate, mergee)
1047
 
        branches = [b.name for b in mergee.getBranches()]
1048
 
        self.assertEqual(2, len(branches))
1049
 
        self.assertContentEqual([u'foo', u'foo-1'], branches)
1050
 
 
1051
 
    def test_merge_moves_recipes(self):
1052
 
        # When person/teams are merged, recipes owned by the from person are
1053
 
        # moved.
1054
 
        person = self.factory.makePerson()
1055
 
        recipe = self.factory.makeSourcePackageRecipe()
1056
 
        duplicate = recipe.owner
1057
 
        # Delete the PPA, which is required for the merge to work.
1058
 
        with person_logged_in(duplicate):
1059
 
            recipe.owner.archive.status = ArchiveStatus.DELETED
1060
 
        self._do_premerge(duplicate, person)
1061
 
        login_person(person)
1062
 
        duplicate, person = self._do_merge(duplicate, person)
1063
 
        self.assertEqual(1, person.recipes.count())
1064
 
 
1065
 
    def test_merge_with_duplicated_recipes(self):
1066
 
        # If both the from and to people have recipes with the same name,
1067
 
        # merging renames the duplicate from the from person's side.
1068
 
        merge_from = self.factory.makeSourcePackageRecipe(
1069
 
            name=u'foo', description=u'FROM')
1070
 
        merge_to = self.factory.makeSourcePackageRecipe(
1071
 
            name=u'foo', description=u'TO')
1072
 
        duplicate = merge_from.owner
1073
 
        mergee = merge_to.owner
1074
 
        # Delete merge_from's PPA, which is required for the merge to work.
1075
 
        with person_logged_in(merge_from.owner):
1076
 
            merge_from.owner.archive.status = ArchiveStatus.DELETED
1077
 
        self._do_premerge(merge_from.owner, mergee)
1078
 
        login_person(mergee)
1079
 
        duplicate, mergee = self._do_merge(duplicate, mergee)
1080
 
        recipes = mergee.recipes
1081
 
        self.assertEqual(2, recipes.count())
1082
 
        descriptions = [r.description for r in recipes]
1083
 
        self.assertEqual([u'TO', u'FROM'], descriptions)
1084
 
        self.assertEqual(u'foo-1', recipes[1].name)
1085
 
 
1086
 
    def assertSubscriptionMerges(self, target):
1087
 
        # Given a subscription target, we want to make sure that subscriptions
1088
 
        # that the duplicate person made are carried over to the merged
1089
 
        # account.
1090
 
        duplicate = self.factory.makePerson()
1091
 
        with person_logged_in(duplicate):
1092
 
            target.addSubscription(duplicate, duplicate)
1093
 
        person = self.factory.makePerson()
1094
 
        self._do_premerge(duplicate, person)
1095
 
        login_person(person)
1096
 
        duplicate, person = self._do_merge(duplicate, person)
1097
 
        # The merged person has the subscription, and the duplicate person
1098
 
        # does not.
1099
 
        self.assertTrue(target.getSubscription(person) is not None)
1100
 
        self.assertTrue(target.getSubscription(duplicate) is None)
1101
 
 
1102
 
    def assertConflictingSubscriptionDeletes(self, target):
1103
 
        # Given a subscription target, we want to make sure that subscriptions
1104
 
        # that the duplicate person made that conflict with existing
1105
 
        # subscriptions in the merged account are deleted.
1106
 
        duplicate = self.factory.makePerson()
1107
 
        person = self.factory.makePerson()
1108
 
        with person_logged_in(duplicate):
1109
 
            target.addSubscription(duplicate, duplicate)
1110
 
        with person_logged_in(person):
1111
 
            # The description lets us show that we still have the right
1112
 
            # subscription later.
1113
 
            target.addBugSubscriptionFilter(person, person).description = (
1114
 
                u'a marker')
1115
 
        self._do_premerge(duplicate, person)
1116
 
        login_person(person)
1117
 
        duplicate, person = self._do_merge(duplicate, person)
1118
 
        # The merged person still has the original subscription, as shown
1119
 
        # by the marker name.
1120
 
        self.assertEqual(
1121
 
            target.getSubscription(person).bug_filters[0].description,
1122
 
            u'a marker')
1123
 
        # The conflicting subscription on the duplicate has been deleted.
1124
 
        self.assertTrue(target.getSubscription(duplicate) is None)
1125
 
 
1126
 
    def test_merge_with_product_subscription(self):
1127
 
        # See comments in assertSubscriptionMerges.
1128
 
        self.assertSubscriptionMerges(self.factory.makeProduct())
1129
 
 
1130
 
    def test_merge_with_conflicting_product_subscription(self):
1131
 
        # See comments in assertConflictingSubscriptionDeletes.
1132
 
        self.assertConflictingSubscriptionDeletes(self.factory.makeProduct())
1133
 
 
1134
 
    def test_merge_with_project_subscription(self):
1135
 
        # See comments in assertSubscriptionMerges.
1136
 
        self.assertSubscriptionMerges(self.factory.makeProject())
1137
 
 
1138
 
    def test_merge_with_conflicting_project_subscription(self):
1139
 
        # See comments in assertConflictingSubscriptionDeletes.
1140
 
        self.assertConflictingSubscriptionDeletes(self.factory.makeProject())
1141
 
 
1142
 
    def test_merge_with_distroseries_subscription(self):
1143
 
        # See comments in assertSubscriptionMerges.
1144
 
        self.assertSubscriptionMerges(self.factory.makeDistroSeries())
1145
 
 
1146
 
    def test_merge_with_conflicting_distroseries_subscription(self):
1147
 
        # See comments in assertConflictingSubscriptionDeletes.
1148
 
        self.assertConflictingSubscriptionDeletes(
1149
 
            self.factory.makeDistroSeries())
1150
 
 
1151
 
    def test_merge_with_milestone_subscription(self):
1152
 
        # See comments in assertSubscriptionMerges.
1153
 
        self.assertSubscriptionMerges(self.factory.makeMilestone())
1154
 
 
1155
 
    def test_merge_with_conflicting_milestone_subscription(self):
1156
 
        # See comments in assertConflictingSubscriptionDeletes.
1157
 
        self.assertConflictingSubscriptionDeletes(
1158
 
            self.factory.makeMilestone())
1159
 
 
1160
 
    def test_merge_with_productseries_subscription(self):
1161
 
        # See comments in assertSubscriptionMerges.
1162
 
        self.assertSubscriptionMerges(self.factory.makeProductSeries())
1163
 
 
1164
 
    def test_merge_with_conflicting_productseries_subscription(self):
1165
 
        # See comments in assertConflictingSubscriptionDeletes.
1166
 
        self.assertConflictingSubscriptionDeletes(
1167
 
            self.factory.makeProductSeries())
1168
 
 
1169
 
    def test_merge_with_distribution_subscription(self):
1170
 
        # See comments in assertSubscriptionMerges.
1171
 
        self.assertSubscriptionMerges(self.factory.makeDistribution())
1172
 
 
1173
 
    def test_merge_with_conflicting_distribution_subscription(self):
1174
 
        # See comments in assertConflictingSubscriptionDeletes.
1175
 
        self.assertConflictingSubscriptionDeletes(
1176
 
            self.factory.makeDistribution())
1177
 
 
1178
 
    def test_merge_with_sourcepackage_subscription(self):
1179
 
        # See comments in assertSubscriptionMerges.
1180
 
        dsp = self.factory.makeDistributionSourcePackage()
1181
 
        self.assertSubscriptionMerges(dsp)
1182
 
 
1183
 
    def test_merge_with_conflicting_sourcepackage_subscription(self):
1184
 
        # See comments in assertConflictingSubscriptionDeletes.
1185
 
        dsp = self.factory.makeDistributionSourcePackage()
1186
 
        self.assertConflictingSubscriptionDeletes(dsp)
1187
 
 
1188
 
    def test_merge_accesspolicygrants(self):
1189
 
        # AccessPolicyGrants are transferred from the duplicate.
1190
 
        person = self.factory.makePerson()
1191
 
        grant = self.factory.makeAccessPolicyGrant()
1192
 
        self._do_premerge(grant.grantee, person)
1193
 
        with person_logged_in(person):
1194
 
            self._do_merge(grant.grantee, person)
1195
 
        self.assertEqual(person, grant.grantee)
1196
 
 
1197
 
    def test_merge_accesspolicygrants_conflicts(self):
1198
 
        # Conflicting AccessPolicyGrants are deleted.
1199
 
        policy = self.factory.makeAccessPolicy()
1200
 
 
1201
 
        person = self.factory.makePerson()
1202
 
        person_grantor = self.factory.makePerson()
1203
 
        person_grant = self.factory.makeAccessPolicyGrant(
1204
 
            grantee=person, grantor=person_grantor, object=policy)
1205
 
 
1206
 
        duplicate = self.factory.makePerson()
1207
 
        duplicate_grantor = self.factory.makePerson()
1208
 
        duplicate_grant = self.factory.makeAccessPolicyGrant(
1209
 
            grantee=duplicate, grantor=duplicate_grantor, object=policy)
1210
 
 
1211
 
        self._do_premerge(duplicate, person)
1212
 
        with person_logged_in(person):
1213
 
            self._do_merge(duplicate, person)
1214
 
        transaction.commit()
1215
 
 
1216
 
        self.assertEqual(person, person_grant.grantee)
1217
 
        self.assertEqual(person_grantor, person_grant.grantor)
1218
 
        self.assertIs(
1219
 
            None,
1220
 
            IStore(AccessPolicyGrant).get(
1221
 
                AccessPolicyGrant, duplicate_grant.id))
1222
 
 
1223
 
    def test_mergeAsync(self):
1224
 
        # mergeAsync() creates a new `PersonMergeJob`.
1225
 
        from_person = self.factory.makePerson()
1226
 
        to_person = self.factory.makePerson()
1227
 
        login_person(from_person)
1228
 
        job = self.person_set.mergeAsync(from_person, to_person)
1229
 
        self.assertEqual(from_person, job.from_person)
1230
 
        self.assertEqual(to_person, job.to_person)
1231
 
 
1232
 
 
1233
 
class TestPersonSetCreateByOpenId(TestCaseWithFactory):
1234
 
    layer = DatabaseFunctionalLayer
1235
 
 
1236
 
    def setUp(self):
1237
 
        super(TestPersonSetCreateByOpenId, self).setUp()
1238
 
        self.person_set = getUtility(IPersonSet)
1239
 
        self.store = IMasterStore(Account)
1240
 
 
1241
 
        # Generate some valid test data.
1242
 
        self.account = self.makeAccount()
1243
 
        self.identifier = self.makeOpenIdIdentifier(self.account, u'whatever')
1244
 
        self.person = self.makePerson(self.account)
1245
 
        self.email = self.makeEmailAddress(
1246
 
            email='whatever@example.com', person=self.person)
1247
 
 
1248
 
    def makeAccount(self):
1249
 
        return self.store.add(Account(
1250
 
            displayname='Displayname',
1251
 
            creation_rationale=AccountCreationRationale.UNKNOWN,
1252
 
            status=AccountStatus.ACTIVE))
1253
 
 
1254
 
    def makeOpenIdIdentifier(self, account, identifier):
1255
 
        openid_identifier = OpenIdIdentifier()
1256
 
        openid_identifier.identifier = identifier
1257
 
        openid_identifier.account = account
1258
 
        return self.store.add(openid_identifier)
1259
 
 
1260
 
    def makePerson(self, account):
1261
 
        return self.store.add(Person(
1262
 
            name='acc%d' % account.id, account=account,
1263
 
            displayname='Displayname',
1264
 
            creation_rationale=PersonCreationRationale.UNKNOWN))
1265
 
 
1266
 
    def makeEmailAddress(self, email, person):
1267
 
            return self.store.add(EmailAddress(
1268
 
                email=email,
1269
 
                account=person.account,
1270
 
                person=person,
1271
 
                status=EmailAddressStatus.PREFERRED))
1272
 
 
1273
 
    def testAllValid(self):
1274
 
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
1275
 
            self.identifier.identifier, self.email.email, 'Ignored Name',
1276
 
            PersonCreationRationale.UNKNOWN, 'No Comment')
1277
 
        found = removeSecurityProxy(found)
1278
 
 
1279
 
        self.assertIs(False, updated)
1280
 
        self.assertIs(self.person, found)
1281
 
        self.assertIs(self.account, found.account)
1282
 
        self.assertIs(self.email, found.preferredemail)
1283
 
        self.assertIs(self.email.account, self.account)
1284
 
        self.assertIs(self.email.person, self.person)
1285
 
        self.assertEqual(
1286
 
            [self.identifier], list(self.account.openid_identifiers))
1287
 
 
1288
 
    def testEmailAddressCaseInsensitive(self):
1289
 
        # As per testAllValid, but the email address used for the lookup
1290
 
        # is all upper case.
1291
 
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
1292
 
            self.identifier.identifier, self.email.email.upper(),
1293
 
            'Ignored Name', PersonCreationRationale.UNKNOWN, 'No Comment')
1294
 
        found = removeSecurityProxy(found)
1295
 
 
1296
 
        self.assertIs(False, updated)
1297
 
        self.assertIs(self.person, found)
1298
 
        self.assertIs(self.account, found.account)
1299
 
        self.assertIs(self.email, found.preferredemail)
1300
 
        self.assertIs(self.email.account, self.account)
1301
 
        self.assertIs(self.email.person, self.person)
1302
 
        self.assertEqual(
1303
 
            [self.identifier], list(self.account.openid_identifiers))
1304
 
 
1305
 
    def testNewOpenId(self):
1306
 
        # Account looked up by email and the new OpenId identifier
1307
 
        # attached. We can do this because we trust our OpenId Provider.
1308
 
        new_identifier = u'newident'
1309
 
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
1310
 
            new_identifier, self.email.email, 'Ignored Name',
1311
 
            PersonCreationRationale.UNKNOWN, 'No Comment')
1312
 
        found = removeSecurityProxy(found)
1313
 
 
1314
 
        self.assertIs(True, updated)
1315
 
        self.assertIs(self.person, found)
1316
 
        self.assertIs(self.account, found.account)
1317
 
        self.assertIs(self.email, found.preferredemail)
1318
 
        self.assertIs(self.email.account, self.account)
1319
 
        self.assertIs(self.email.person, self.person)
1320
 
 
1321
 
        # Old OpenId Identifier still attached.
1322
 
        self.assertIn(self.identifier, list(self.account.openid_identifiers))
1323
 
 
1324
 
        # So is our new one.
1325
 
        identifiers = [
1326
 
            identifier.identifier for identifier
1327
 
                in self.account.openid_identifiers]
1328
 
        self.assertIn(new_identifier, identifiers)
1329
 
 
1330
 
    def testNewEmailAddress(self):
1331
 
        # Account looked up by OpenId identifier and new EmailAddress
1332
 
        # attached. We can do this because we trust our OpenId Provider.
1333
 
        new_email = u'new_email@example.com'
1334
 
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
1335
 
            self.identifier.identifier, new_email, 'Ignored Name',
1336
 
            PersonCreationRationale.UNKNOWN, 'No Comment')
1337
 
        found = removeSecurityProxy(found)
1338
 
 
1339
 
        self.assertIs(True, updated)
1340
 
        self.assertIs(self.person, found)
1341
 
        self.assertIs(self.account, found.account)
1342
 
        self.assertEqual(
1343
 
            [self.identifier], list(self.account.openid_identifiers))
1344
 
 
1345
 
        # The old email address is still there and correctly linked.
1346
 
        self.assertIs(self.email, found.preferredemail)
1347
 
        self.assertIs(self.email.account, self.account)
1348
 
        self.assertIs(self.email.person, self.person)
1349
 
 
1350
 
        # The new email address is there too and correctly linked.
1351
 
        new_email = self.store.find(EmailAddress, email=new_email).one()
1352
 
        self.assertIs(new_email.account, self.account)
1353
 
        self.assertIs(new_email.person, self.person)
1354
 
        self.assertEqual(EmailAddressStatus.NEW, new_email.status)
1355
 
 
1356
 
    def testNewAccountAndIdentifier(self):
1357
 
        # If neither the OpenId Identifier nor the email address are
1358
 
        # found, we create everything.
1359
 
        new_email = u'new_email@example.com'
1360
 
        new_identifier = u'new_identifier'
1361
 
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
1362
 
            new_identifier, new_email, 'New Name',
1363
 
            PersonCreationRationale.UNKNOWN, 'No Comment')
1364
 
        found = removeSecurityProxy(found)
1365
 
 
1366
 
        # We have a new Person
1367
 
        self.assertIs(True, updated)
1368
 
        self.assertIsNot(None, found)
1369
 
 
1370
 
        # It is correctly linked to an account, emailaddress and
1371
 
        # identifier.
1372
 
        self.assertIs(found, found.preferredemail.person)
1373
 
        self.assertIs(found.account, found.preferredemail.account)
1374
 
        self.assertEqual(
1375
 
            new_identifier, found.account.openid_identifiers.any().identifier)
1376
 
 
1377
 
    def testNoPerson(self):
1378
 
        # If the account is not linked to a Person, create one. ShipIt
1379
 
        # users fall into this category the first time they log into
1380
 
        # Launchpad.
1381
 
        self.email.person = None
1382
 
        self.person.account = None
1383
 
 
1384
 
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
1385
 
            self.identifier.identifier, self.email.email, 'New Name',
1386
 
            PersonCreationRationale.UNKNOWN, 'No Comment')
1387
 
        found = removeSecurityProxy(found)
1388
 
 
1389
 
        # We have a new Person
1390
 
        self.assertIs(True, updated)
1391
 
        self.assertIsNot(self.person, found)
1392
 
 
1393
 
        # It is correctly linked to an account, emailaddress and
1394
 
        # identifier.
1395
 
        self.assertIs(found, found.preferredemail.person)
1396
 
        self.assertIs(found.account, found.preferredemail.account)
1397
 
        self.assertIn(self.identifier, list(found.account.openid_identifiers))
1398
 
 
1399
 
    def testNoAccount(self):
1400
 
        # EmailAddress is linked to a Person, but there is no Account.
1401
 
        # Convert this stub into something valid.
1402
 
        self.email.account = None
1403
 
        self.email.status = EmailAddressStatus.NEW
1404
 
        self.person.account = None
1405
 
        new_identifier = u'new_identifier'
1406
 
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
1407
 
            new_identifier, self.email.email, 'Ignored',
1408
 
            PersonCreationRationale.UNKNOWN, 'No Comment')
1409
 
        found = removeSecurityProxy(found)
1410
 
 
1411
 
        self.assertIs(True, updated)
1412
 
 
1413
 
        self.assertIsNot(None, found.account)
1414
 
        self.assertEqual(
1415
 
            new_identifier, found.account.openid_identifiers.any().identifier)
1416
 
        self.assertIs(self.email.person, found)
1417
 
        self.assertIs(self.email.account, found.account)
1418
 
        self.assertEqual(EmailAddressStatus.PREFERRED, self.email.status)
1419
 
 
1420
 
    def testMovedEmailAddress(self):
1421
 
        # The EmailAddress and OpenId Identifier are both in the
1422
 
        # database, but they are not linked to the same account. The
1423
 
        # identifier needs to be relinked to the correct account - the
1424
 
        # user able to log into the trusted SSO with that email address
1425
 
        # should be able to log into Launchpad with that email address.
1426
 
        # This lets us cope with the SSO migrating email addresses
1427
 
        # between SSO accounts.
1428
 
        self.identifier.account = self.store.find(
1429
 
            Account, displayname='Foo Bar').one()
1430
 
 
1431
 
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
1432
 
            self.identifier.identifier, self.email.email, 'New Name',
1433
 
            PersonCreationRationale.UNKNOWN, 'No Comment')
1434
 
        found = removeSecurityProxy(found)
1435
 
 
1436
 
        self.assertIs(True, updated)
1437
 
        self.assertIs(self.person, found)
1438
 
 
1439
 
        self.assertIs(found.account, self.identifier.account)
1440
 
        self.assertIn(self.identifier, list(found.account.openid_identifiers))
1441
 
 
1442
 
 
1443
 
class TestCreatePersonAndEmail(TestCase):
1444
 
    """Test `IPersonSet`.createPersonAndEmail()."""
1445
 
    layer = DatabaseFunctionalLayer
1446
 
 
1447
 
    def setUp(self):
1448
 
        TestCase.setUp(self)
1449
 
        login(ANONYMOUS)
1450
 
        self.addCleanup(logout)
1451
 
        self.person_set = getUtility(IPersonSet)
1452
 
 
1453
 
    def test_duplicated_name_not_accepted(self):
1454
 
        self.person_set.createPersonAndEmail(
1455
 
            'testing@example.com', PersonCreationRationale.UNKNOWN,
1456
 
            name='zzzz')
1457
 
        self.assertRaises(
1458
 
            NameAlreadyTaken, self.person_set.createPersonAndEmail,
1459
 
            'testing2@example.com', PersonCreationRationale.UNKNOWN,
1460
 
            name='zzzz')
1461
 
 
1462
 
    def test_duplicated_email_not_accepted(self):
1463
 
        self.person_set.createPersonAndEmail(
1464
 
            'testing@example.com', PersonCreationRationale.UNKNOWN)
1465
 
        self.assertRaises(
1466
 
            EmailAddressAlreadyTaken, self.person_set.createPersonAndEmail,
1467
 
            'testing@example.com', PersonCreationRationale.UNKNOWN)
1468
 
 
1469
 
    def test_invalid_email_not_accepted(self):
1470
 
        self.assertRaises(
1471
 
            InvalidEmailAddress, self.person_set.createPersonAndEmail,
1472
 
            'testing@.com', PersonCreationRationale.UNKNOWN)
1473
 
 
1474
 
    def test_invalid_name_not_accepted(self):
1475
 
        self.assertRaises(
1476
 
            InvalidName, self.person_set.createPersonAndEmail,
1477
 
            'testing@example.com', PersonCreationRationale.UNKNOWN,
1478
 
            name='/john')
1479
 
 
1480
 
 
1481
719
class TestPersonRelatedBugTaskSearch(TestCaseWithFactory):
1482
720
 
1483
721
    layer = DatabaseFunctionalLayer
1593
831
            assignee=self.user)
1594
832
 
1595
833
 
 
834
class KarmaTestMixin:
 
835
    """Helper methods for setting karma."""
 
836
 
 
837
    def _makeKarmaCache(self, person, product, category_name_values):
 
838
        """Create a KarmaCache entry with the given arguments.
 
839
 
 
840
        In order to create the KarmaCache record we must switch to the DB
 
841
        user 'karma'. This invalidates the objects under test so they
 
842
        must be retrieved again.
 
843
        """
 
844
        with dbuser('karma'):
 
845
            total = 0
 
846
            # Insert category total for person and project.
 
847
            for category_name, value in category_name_values:
 
848
                category = KarmaCategory.byName(category_name)
 
849
                self.cache_manager.new(
 
850
                    value, person.id, category.id, product_id=product.id)
 
851
                total += value
 
852
            # Insert total cache for person and project.
 
853
            self.cache_manager.new(
 
854
                total, person.id, None, product_id=product.id)
 
855
 
 
856
    def _makeKarmaTotalCache(self, person, total):
 
857
        """Create a KarmaTotalCache entry.
 
858
 
 
859
        In order to create the KarmaTotalCache record we must switch to the DB
 
860
        user 'karma'. This invalidates the objects under test so they
 
861
        must be retrieved again.
 
862
        """
 
863
        with dbuser('karma'):
 
864
            KarmaTotalCache(person=person.id, karma_total=total)
 
865
 
 
866
 
1596
867
class TestPersonKarma(TestCaseWithFactory, KarmaTestMixin):
1597
868
 
1598
869
    layer = DatabaseFunctionalLayer