~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/registry/tests/test_person.py

  • Committer: Curtis Hovey
  • Date: 2012-01-06 15:14:48 UTC
  • mto: This revision was merged to the branch mainline in revision 14651.
  • Revision ID: curtis.hovey@canonical.com-20120106151448-ie2e51xxf5ap2s2l
Moved personset tests to test_personset.

Show diffs side-by-side

added added

removed removed

Lines of Context:
5
5
 
6
6
from datetime import datetime
7
7
 
8
 
from lazr.lifecycle.snapshot import Snapshot
9
8
import pytz
 
9
 
10
10
from storm.store import Store
 
11
 
11
12
from testtools.matchers import (
12
13
    Equals,
13
14
    LessThan,
14
15
    )
15
 
import transaction
 
16
 
16
17
from zope.component import getUtility
17
18
from zope.interface import providedBy
18
19
from zope.security.interfaces import Unauthorized
19
20
from zope.security.proxy import removeSecurityProxy
20
21
 
 
22
from lazr.lifecycle.snapshot import Snapshot
 
23
 
21
24
from lp.answers.model.answercontact import AnswerContact
22
25
from lp.app.interfaces.launchpad import ILaunchpadCelebrities
23
26
from lp.blueprints.model.specification import Specification
25
28
from lp.bugs.model.bug import Bug
26
29
from lp.bugs.model.bugtask import get_related_bugtasks_search_params
27
30
from lp.registry.errors import (
28
 
    InvalidName,
29
 
    NameAlreadyTaken,
30
31
    PrivatePersonLinkageError,
31
32
    )
32
33
from lp.registry.interfaces.karma import IKarmaCacheManager
33
 
from lp.registry.interfaces.mailinglist import MailingListStatus
34
 
from lp.registry.interfaces.nameblacklist import INameBlacklistSet
35
34
from lp.registry.interfaces.person import (
36
35
    ImmutableVisibilityError,
37
36
    IPersonSet,
38
 
    PersonCreationRationale,
39
37
    PersonVisibility,
40
38
    )
41
 
from lp.registry.interfaces.personnotification import IPersonNotificationSet
42
39
from lp.registry.interfaces.pocket import PackagePublishingPocket
43
40
from lp.registry.interfaces.product import IProductSet
44
 
from lp.registry.model.accesspolicy import AccessPolicyGrant
45
41
from lp.registry.model.karma import (
46
42
    KarmaCategory,
47
43
    KarmaTotalCache,
50
46
    get_recipients,
51
47
    Person,
52
48
    )
53
 
from lp.services.config import config
54
 
from lp.services.database.lpstorm import (
55
 
    IMasterStore,
56
 
    IStore,
57
 
    )
58
 
from lp.services.database.sqlbase import cursor
59
49
from lp.services.identity.interfaces.account import (
60
 
    AccountCreationRationale,
61
50
    AccountStatus,
62
51
    )
63
52
from lp.services.identity.interfaces.emailaddress import (
64
 
    EmailAddressAlreadyTaken,
65
53
    EmailAddressStatus,
66
 
    IEmailAddressSet,
67
 
    InvalidEmailAddress,
68
54
    )
69
 
from lp.services.identity.model.account import Account
70
 
from lp.services.identity.model.emailaddress import EmailAddress
71
 
from lp.services.openid.model.openididentifier import OpenIdIdentifier
72
55
from lp.services.propertycache import clear_property_cache
73
56
from lp.soyuz.enums import (
74
57
    ArchivePurpose,
75
 
    ArchiveStatus,
76
58
    )
77
59
from lp.testing import (
78
 
    ANONYMOUS,
79
60
    celebrity_logged_in,
80
61
    login,
81
62
    login_person,
82
63
    logout,
83
64
    person_logged_in,
84
65
    StormStatementRecorder,
85
 
    TestCase,
86
66
    TestCaseWithFactory,
87
67
    )
88
68
from lp.testing._webservice import QueryCollector
736
716
        self.assertEqual('(\\u0170-tester)>', displayname)
737
717
 
738
718
 
739
 
class TestPersonSet(TestCaseWithFactory):
740
 
    """Test `IPersonSet`."""
741
 
    layer = DatabaseFunctionalLayer
742
 
 
743
 
    def setUp(self):
744
 
        super(TestPersonSet, self).setUp()
745
 
        login(ANONYMOUS)
746
 
        self.addCleanup(logout)
747
 
        self.person_set = getUtility(IPersonSet)
748
 
 
749
 
    def test_isNameBlacklisted(self):
750
 
        cursor().execute(
751
 
            "INSERT INTO NameBlacklist(id, regexp) VALUES (-100, 'foo')")
752
 
        self.failUnless(self.person_set.isNameBlacklisted('foo'))
753
 
        self.failIf(self.person_set.isNameBlacklisted('bar'))
754
 
 
755
 
    def test_isNameBlacklisted_user_is_admin(self):
756
 
        team = self.factory.makeTeam()
757
 
        name_blacklist_set = getUtility(INameBlacklistSet)
758
 
        self.admin_exp = name_blacklist_set.create(u'fnord', admin=team)
759
 
        self.store = IStore(self.admin_exp)
760
 
        self.store.flush()
761
 
        user = team.teamowner
762
 
        self.assertFalse(self.person_set.isNameBlacklisted('fnord', user))
763
 
 
764
 
    def test_getByEmail_ignores_case_and_whitespace(self):
765
 
        person1_email = 'foo.bar@canonical.com'
766
 
        person1 = self.person_set.getByEmail(person1_email)
767
 
        self.failIf(
768
 
            person1 is None,
769
 
            "PersonSet.getByEmail() could not find %r" % person1_email)
770
 
 
771
 
        person2 = self.person_set.getByEmail('  foo.BAR@canonICAL.com  ')
772
 
        self.failIf(
773
 
            person2 is None,
774
 
            "PersonSet.getByEmail() should ignore case and whitespace.")
775
 
        self.assertEqual(person1, person2)
776
 
 
777
 
    def test_getPrecachedPersonsFromIDs(self):
778
 
        # The getPrecachedPersonsFromIDs() method should only make one
779
 
        # query to load all the extraneous data. Accessing the
780
 
        # attributes should then cause zero queries.
781
 
        person_ids = [
782
 
            self.factory.makePerson().id
783
 
            for i in range(3)]
784
 
 
785
 
        with StormStatementRecorder() as recorder:
786
 
            persons = list(self.person_set.getPrecachedPersonsFromIDs(
787
 
                person_ids, need_karma=True, need_ubuntu_coc=True,
788
 
                need_location=True, need_archive=True,
789
 
                need_preferred_email=True, need_validity=True))
790
 
        self.assertThat(recorder, HasQueryCount(LessThan(2)))
791
 
 
792
 
        with StormStatementRecorder() as recorder:
793
 
            for person in persons:
794
 
                person.is_valid_person
795
 
                person.karma
796
 
                person.is_ubuntu_coc_signer
797
 
                person.location
798
 
                person.archive
799
 
                person.preferredemail
800
 
        self.assertThat(recorder, HasQueryCount(LessThan(1)))
801
 
 
802
 
    def test_latest_teams_public(self):
803
 
        # Anyone can see the latest 5 teams if they are public.
804
 
        teams = []
805
 
        for num in xrange(1, 7):
806
 
            teams.append(self.factory.makeTeam(name='team-%s' % num))
807
 
        teams.reverse()
808
 
        result = self.person_set.latest_teams()
809
 
        self.assertEqual(teams[0:5], list(result))
810
 
 
811
 
    def test_latest_teams_private(self):
812
 
        # Private teams are only included in the latest teams if the
813
 
        # user can view the team.
814
 
        teams = []
815
 
        for num in xrange(1, 7):
816
 
            teams.append(self.factory.makeTeam(name='team-%s' % num))
817
 
        owner = self.factory.makePerson()
818
 
        teams.append(
819
 
            self.factory.makeTeam(
820
 
                name='private-team', owner=owner,
821
 
                visibility=PersonVisibility.PRIVATE))
822
 
        teams.reverse()
823
 
        login_person(owner)
824
 
        result = self.person_set.latest_teams()
825
 
        self.assertEqual(teams[0:5], list(result))
826
 
        login_person(self.factory.makePerson())
827
 
        result = self.person_set.latest_teams()
828
 
        self.assertEqual(teams[1:6], list(result))
829
 
 
830
 
    def test_latest_teams_limit(self):
831
 
        # The limit controls the number of latest teams returned.
832
 
        teams = []
833
 
        for num in xrange(1, 7):
834
 
            teams.append(self.factory.makeTeam(name='team-%s' % num))
835
 
        teams.reverse()
836
 
        result = self.person_set.latest_teams(limit=3)
837
 
        self.assertEqual(teams[0:3], list(result))
838
 
 
839
 
 
840
 
class KarmaTestMixin:
841
 
    """Helper methods for setting karma."""
842
 
 
843
 
    def _makeKarmaCache(self, person, product, category_name_values):
844
 
        """Create a KarmaCache entry with the given arguments.
845
 
 
846
 
        In order to create the KarmaCache record we must switch to the DB
847
 
        user 'karma'. This invalidates the objects under test so they
848
 
        must be retrieved again.
849
 
        """
850
 
        with dbuser('karma'):
851
 
            total = 0
852
 
            # Insert category total for person and project.
853
 
            for category_name, value in category_name_values:
854
 
                category = KarmaCategory.byName(category_name)
855
 
                self.cache_manager.new(
856
 
                    value, person.id, category.id, product_id=product.id)
857
 
                total += value
858
 
            # Insert total cache for person and project.
859
 
            self.cache_manager.new(
860
 
                total, person.id, None, product_id=product.id)
861
 
 
862
 
    def _makeKarmaTotalCache(self, person, total):
863
 
        """Create a KarmaTotalCache entry.
864
 
 
865
 
        In order to create the KarmaTotalCache record we must switch to the DB
866
 
        user 'karma'. This invalidates the objects under test so they
867
 
        must be retrieved again.
868
 
        """
869
 
        with dbuser('karma'):
870
 
            KarmaTotalCache(person=person.id, karma_total=total)
871
 
 
872
 
 
873
 
class TestPersonSetMerge(TestCaseWithFactory, KarmaTestMixin):
874
 
    """Test cases for PersonSet merge."""
875
 
 
876
 
    layer = DatabaseFunctionalLayer
877
 
 
878
 
    def setUp(self):
879
 
        super(TestPersonSetMerge, self).setUp()
880
 
        self.person_set = getUtility(IPersonSet)
881
 
 
882
 
    def _do_premerge(self, from_person, to_person):
883
 
        # Do the pre merge work performed by the LoginToken.
884
 
        with celebrity_logged_in('admin'):
885
 
            email = from_person.preferredemail
886
 
            email.status = EmailAddressStatus.NEW
887
 
            email.person = to_person
888
 
            email.account = to_person.account
889
 
        transaction.commit()
890
 
 
891
 
    def _do_merge(self, from_person, to_person, reviewer=None):
892
 
        # Perform the merge as the db user that will be used by the jobs.
893
 
        with dbuser(config.IPersonMergeJobSource.dbuser):
894
 
            self.person_set.merge(from_person, to_person, reviewer=reviewer)
895
 
        return from_person, to_person
896
 
 
897
 
    def _get_testable_account(self, person, date_created, openid_identifier):
898
 
        # Return a naked account with predictable attributes.
899
 
        account = removeSecurityProxy(person.account)
900
 
        account.date_created = date_created
901
 
        account.openid_identifier = openid_identifier
902
 
        return account
903
 
 
904
 
    def test_delete_no_notifications(self):
905
 
        team = self.factory.makeTeam()
906
 
        owner = team.teamowner
907
 
        transaction.commit()
908
 
        with dbuser(config.IPersonMergeJobSource.dbuser):
909
 
            self.person_set.delete(team, owner)
910
 
        notification_set = getUtility(IPersonNotificationSet)
911
 
        notifications = notification_set.getNotificationsToSend()
912
 
        self.assertEqual(0, notifications.count())
913
 
 
914
 
    def test_openid_identifiers(self):
915
 
        # Verify that OpenId Identifiers are merged.
916
 
        duplicate = self.factory.makePerson()
917
 
        duplicate_identifier = removeSecurityProxy(
918
 
            duplicate.account).openid_identifiers.any().identifier
919
 
        person = self.factory.makePerson()
920
 
        person_identifier = removeSecurityProxy(
921
 
            person.account).openid_identifiers.any().identifier
922
 
        self._do_premerge(duplicate, person)
923
 
        login_person(person)
924
 
        duplicate, person = self._do_merge(duplicate, person)
925
 
        self.assertEqual(
926
 
            0,
927
 
            removeSecurityProxy(duplicate.account).openid_identifiers.count())
928
 
 
929
 
        merged_identifiers = [
930
 
            identifier.identifier for identifier in
931
 
                removeSecurityProxy(person.account).openid_identifiers]
932
 
 
933
 
        self.assertIn(duplicate_identifier, merged_identifiers)
934
 
        self.assertIn(person_identifier, merged_identifiers)
935
 
 
936
 
    def test_karmacache_transferred_to_user_has_no_karma(self):
937
 
        # Verify that the merged user has no KarmaCache entries,
938
 
        # and the karma total was transferred.
939
 
        self.cache_manager = getUtility(IKarmaCacheManager)
940
 
        product = self.factory.makeProduct()
941
 
        duplicate = self.factory.makePerson()
942
 
        self._makeKarmaCache(
943
 
            duplicate, product, [('bugs', 10)])
944
 
        self._makeKarmaTotalCache(duplicate, 15)
945
 
        # The karma changes invalidated duplicate instance.
946
 
        duplicate = self.person_set.get(duplicate.id)
947
 
        person = self.factory.makePerson()
948
 
        self._do_premerge(duplicate, person)
949
 
        login_person(person)
950
 
        duplicate, person = self._do_merge(duplicate, person)
951
 
        self.assertEqual([], duplicate.karma_category_caches)
952
 
        self.assertEqual(0, duplicate.karma)
953
 
        self.assertEqual(15, person.karma)
954
 
 
955
 
    def test_karmacache_transferred_to_user_has_karma(self):
956
 
        # Verify that the merged user has no KarmaCache entries,
957
 
        # and the karma total was summed.
958
 
        self.cache_manager = getUtility(IKarmaCacheManager)
959
 
        product = self.factory.makeProduct()
960
 
        duplicate = self.factory.makePerson()
961
 
        self._makeKarmaCache(
962
 
            duplicate, product, [('bugs', 10)])
963
 
        self._makeKarmaTotalCache(duplicate, 15)
964
 
        person = self.factory.makePerson()
965
 
        self._makeKarmaCache(
966
 
            person, product, [('bugs', 9)])
967
 
        self._makeKarmaTotalCache(person, 13)
968
 
        # The karma changes invalidated duplicate and person instances.
969
 
        duplicate = self.person_set.get(duplicate.id)
970
 
        person = self.person_set.get(person.id)
971
 
        self._do_premerge(duplicate, person)
972
 
        login_person(person)
973
 
        duplicate, person = self._do_merge(duplicate, person)
974
 
        self.assertEqual([], duplicate.karma_category_caches)
975
 
        self.assertEqual(0, duplicate.karma)
976
 
        self.assertEqual(28, person.karma)
977
 
 
978
 
    def test_person_date_created_preserved(self):
979
 
        # Verify that the oldest datecreated is merged.
980
 
        person = self.factory.makePerson()
981
 
        duplicate = self.factory.makePerson()
982
 
        oldest_date = datetime(
983
 
            2005, 11, 25, 0, 0, 0, 0, pytz.timezone('UTC'))
984
 
        removeSecurityProxy(duplicate).datecreated = oldest_date
985
 
        self._do_premerge(duplicate, person)
986
 
        login_person(person)
987
 
        duplicate, person = self._do_merge(duplicate, person)
988
 
        self.assertEqual(oldest_date, person.datecreated)
989
 
 
990
 
    def test_team_with_active_mailing_list_raises_error(self):
991
 
        # A team with an active mailing list cannot be merged.
992
 
        target_team = self.factory.makeTeam()
993
 
        test_team = self.factory.makeTeam()
994
 
        self.factory.makeMailingList(
995
 
            test_team, test_team.teamowner)
996
 
        self.assertRaises(
997
 
            AssertionError, self.person_set.merge, test_team, target_team)
998
 
 
999
 
    def test_team_with_inactive_mailing_list(self):
1000
 
        # A team with an inactive mailing list can be merged.
1001
 
        target_team = self.factory.makeTeam()
1002
 
        test_team = self.factory.makeTeam()
1003
 
        mailing_list = self.factory.makeMailingList(
1004
 
            test_team, test_team.teamowner)
1005
 
        mailing_list.deactivate()
1006
 
        mailing_list.transitionToStatus(MailingListStatus.INACTIVE)
1007
 
        test_team, target_team = self._do_merge(
1008
 
            test_team, target_team, test_team.teamowner)
1009
 
        self.assertEqual(target_team, test_team.merged)
1010
 
        self.assertEqual(
1011
 
            MailingListStatus.PURGED, test_team.mailing_list.status)
1012
 
        emails = getUtility(IEmailAddressSet).getByPerson(target_team).count()
1013
 
        self.assertEqual(0, emails)
1014
 
 
1015
 
    def test_team_with_purged_mailing_list(self):
1016
 
        # A team with a purged mailing list can be merged.
1017
 
        target_team = self.factory.makeTeam()
1018
 
        test_team = self.factory.makeTeam()
1019
 
        mailing_list = self.factory.makeMailingList(
1020
 
            test_team, test_team.teamowner)
1021
 
        mailing_list.deactivate()
1022
 
        mailing_list.transitionToStatus(MailingListStatus.INACTIVE)
1023
 
        mailing_list.purge()
1024
 
        test_team, target_team = self._do_merge(
1025
 
            test_team, target_team, test_team.teamowner)
1026
 
        self.assertEqual(target_team, test_team.merged)
1027
 
 
1028
 
    def test_team_with_members(self):
1029
 
        # Team members are removed before merging.
1030
 
        target_team = self.factory.makeTeam()
1031
 
        test_team = self.factory.makeTeam()
1032
 
        former_member = self.factory.makePerson()
1033
 
        with person_logged_in(test_team.teamowner):
1034
 
            test_team.addMember(former_member, test_team.teamowner)
1035
 
        test_team, target_team = self._do_merge(
1036
 
            test_team, target_team, test_team.teamowner)
1037
 
        self.assertEqual(target_team, test_team.merged)
1038
 
        self.assertEqual([], list(former_member.super_teams))
1039
 
 
1040
 
    def test_team_without_super_teams_is_fine(self):
1041
 
        # A team with no members and no super teams
1042
 
        # merges without errors.
1043
 
        test_team = self.factory.makeTeam()
1044
 
        target_team = self.factory.makeTeam()
1045
 
        login_person(test_team.teamowner)
1046
 
        self._do_merge(test_team, target_team, test_team.teamowner)
1047
 
 
1048
 
    def test_team_with_super_teams(self):
1049
 
        # A team with superteams can be merged, but the memberships
1050
 
        # are not transferred.
1051
 
        test_team = self.factory.makeTeam()
1052
 
        super_team = self.factory.makeTeam()
1053
 
        target_team = self.factory.makeTeam()
1054
 
        login_person(test_team.teamowner)
1055
 
        test_team.join(super_team, test_team.teamowner)
1056
 
        test_team, target_team = self._do_merge(
1057
 
            test_team, target_team, test_team.teamowner)
1058
 
        self.assertEqual(target_team, test_team.merged)
1059
 
        self.assertEqual([], list(target_team.super_teams))
1060
 
 
1061
 
    def test_merge_moves_branches(self):
1062
 
        # When person/teams are merged, branches owned by the from person
1063
 
        # are moved.
1064
 
        person = self.factory.makePerson()
1065
 
        branch = self.factory.makeBranch()
1066
 
        duplicate = branch.owner
1067
 
        self._do_premerge(branch.owner, person)
1068
 
        login_person(person)
1069
 
        duplicate, person = self._do_merge(duplicate, person)
1070
 
        branches = person.getBranches()
1071
 
        self.assertEqual(1, branches.count())
1072
 
 
1073
 
    def test_merge_with_duplicated_branches(self):
1074
 
        # If both the from and to people have branches with the same name,
1075
 
        # merging renames the duplicate from the from person's side.
1076
 
        product = self.factory.makeProduct()
1077
 
        from_branch = self.factory.makeBranch(name='foo', product=product)
1078
 
        to_branch = self.factory.makeBranch(name='foo', product=product)
1079
 
        mergee = to_branch.owner
1080
 
        duplicate = from_branch.owner
1081
 
        self._do_premerge(duplicate, mergee)
1082
 
        login_person(mergee)
1083
 
        duplicate, mergee = self._do_merge(duplicate, mergee)
1084
 
        branches = [b.name for b in mergee.getBranches()]
1085
 
        self.assertEqual(2, len(branches))
1086
 
        self.assertContentEqual([u'foo', u'foo-1'], branches)
1087
 
 
1088
 
    def test_merge_moves_recipes(self):
1089
 
        # When person/teams are merged, recipes owned by the from person are
1090
 
        # moved.
1091
 
        person = self.factory.makePerson()
1092
 
        recipe = self.factory.makeSourcePackageRecipe()
1093
 
        duplicate = recipe.owner
1094
 
        # Delete the PPA, which is required for the merge to work.
1095
 
        with person_logged_in(duplicate):
1096
 
            recipe.owner.archive.status = ArchiveStatus.DELETED
1097
 
        self._do_premerge(duplicate, person)
1098
 
        login_person(person)
1099
 
        duplicate, person = self._do_merge(duplicate, person)
1100
 
        self.assertEqual(1, person.recipes.count())
1101
 
 
1102
 
    def test_merge_with_duplicated_recipes(self):
1103
 
        # If both the from and to people have recipes with the same name,
1104
 
        # merging renames the duplicate from the from person's side.
1105
 
        merge_from = self.factory.makeSourcePackageRecipe(
1106
 
            name=u'foo', description=u'FROM')
1107
 
        merge_to = self.factory.makeSourcePackageRecipe(
1108
 
            name=u'foo', description=u'TO')
1109
 
        duplicate = merge_from.owner
1110
 
        mergee = merge_to.owner
1111
 
        # Delete merge_from's PPA, which is required for the merge to work.
1112
 
        with person_logged_in(merge_from.owner):
1113
 
            merge_from.owner.archive.status = ArchiveStatus.DELETED
1114
 
        self._do_premerge(merge_from.owner, mergee)
1115
 
        login_person(mergee)
1116
 
        duplicate, mergee = self._do_merge(duplicate, mergee)
1117
 
        recipes = mergee.recipes
1118
 
        self.assertEqual(2, recipes.count())
1119
 
        descriptions = [r.description for r in recipes]
1120
 
        self.assertEqual([u'TO', u'FROM'], descriptions)
1121
 
        self.assertEqual(u'foo-1', recipes[1].name)
1122
 
 
1123
 
    def assertSubscriptionMerges(self, target):
1124
 
        # Given a subscription target, we want to make sure that subscriptions
1125
 
        # that the duplicate person made are carried over to the merged
1126
 
        # account.
1127
 
        duplicate = self.factory.makePerson()
1128
 
        with person_logged_in(duplicate):
1129
 
            target.addSubscription(duplicate, duplicate)
1130
 
        person = self.factory.makePerson()
1131
 
        self._do_premerge(duplicate, person)
1132
 
        login_person(person)
1133
 
        duplicate, person = self._do_merge(duplicate, person)
1134
 
        # The merged person has the subscription, and the duplicate person
1135
 
        # does not.
1136
 
        self.assertTrue(target.getSubscription(person) is not None)
1137
 
        self.assertTrue(target.getSubscription(duplicate) is None)
1138
 
 
1139
 
    def assertConflictingSubscriptionDeletes(self, target):
1140
 
        # Given a subscription target, we want to make sure that subscriptions
1141
 
        # that the duplicate person made that conflict with existing
1142
 
        # subscriptions in the merged account are deleted.
1143
 
        duplicate = self.factory.makePerson()
1144
 
        person = self.factory.makePerson()
1145
 
        with person_logged_in(duplicate):
1146
 
            target.addSubscription(duplicate, duplicate)
1147
 
        with person_logged_in(person):
1148
 
            # The description lets us show that we still have the right
1149
 
            # subscription later.
1150
 
            target.addBugSubscriptionFilter(person, person).description = (
1151
 
                u'a marker')
1152
 
        self._do_premerge(duplicate, person)
1153
 
        login_person(person)
1154
 
        duplicate, person = self._do_merge(duplicate, person)
1155
 
        # The merged person still has the original subscription, as shown
1156
 
        # by the marker description.
1157
 
        self.assertEqual(
1158
 
            target.getSubscription(person).bug_filters[0].description,
1159
 
            u'a marker')
1160
 
        # The conflicting subscription on the duplicate has been deleted.
1161
 
        self.assertTrue(target.getSubscription(duplicate) is None)
1162
 
 
1163
 
    def test_merge_with_product_subscription(self):
1164
 
        # See comments in assertSubscriptionMerges.
1165
 
        self.assertSubscriptionMerges(self.factory.makeProduct())
1166
 
 
1167
 
    def test_merge_with_conflicting_product_subscription(self):
1168
 
        # See comments in assertConflictingSubscriptionDeletes.
1169
 
        self.assertConflictingSubscriptionDeletes(self.factory.makeProduct())
1170
 
 
1171
 
    def test_merge_with_project_subscription(self):
1172
 
        # See comments in assertSubscriptionMerges.
1173
 
        self.assertSubscriptionMerges(self.factory.makeProject())
1174
 
 
1175
 
    def test_merge_with_conflicting_project_subscription(self):
1176
 
        # See comments in assertConflictingSubscriptionDeletes.
1177
 
        self.assertConflictingSubscriptionDeletes(self.factory.makeProject())
1178
 
 
1179
 
    def test_merge_with_distroseries_subscription(self):
1180
 
        # See comments in assertSubscriptionMerges.
1181
 
        self.assertSubscriptionMerges(self.factory.makeDistroSeries())
1182
 
 
1183
 
    def test_merge_with_conflicting_distroseries_subscription(self):
1184
 
        # See comments in assertConflictingSubscriptionDeletes.
1185
 
        self.assertConflictingSubscriptionDeletes(
1186
 
            self.factory.makeDistroSeries())
1187
 
 
1188
 
    def test_merge_with_milestone_subscription(self):
1189
 
        # See comments in assertSubscriptionMerges.
1190
 
        self.assertSubscriptionMerges(self.factory.makeMilestone())
1191
 
 
1192
 
    def test_merge_with_conflicting_milestone_subscription(self):
1193
 
        # See comments in assertConflictingSubscriptionDeletes.
1194
 
        self.assertConflictingSubscriptionDeletes(
1195
 
            self.factory.makeMilestone())
1196
 
 
1197
 
    def test_merge_with_productseries_subscription(self):
1198
 
        # See comments in assertSubscriptionMerges.
1199
 
        self.assertSubscriptionMerges(self.factory.makeProductSeries())
1200
 
 
1201
 
    def test_merge_with_conflicting_productseries_subscription(self):
1202
 
        # See comments in assertConflictingSubscriptionDeletes.
1203
 
        self.assertConflictingSubscriptionDeletes(
1204
 
            self.factory.makeProductSeries())
1205
 
 
1206
 
    def test_merge_with_distribution_subscription(self):
1207
 
        # See comments in assertSubscriptionMerges.
1208
 
        self.assertSubscriptionMerges(self.factory.makeDistribution())
1209
 
 
1210
 
    def test_merge_with_conflicting_distribution_subscription(self):
1211
 
        # See comments in assertConflictingSubscriptionDeletes.
1212
 
        self.assertConflictingSubscriptionDeletes(
1213
 
            self.factory.makeDistribution())
1214
 
 
1215
 
    def test_merge_with_sourcepackage_subscription(self):
1216
 
        # See comments in assertSubscriptionMerges.
1217
 
        dsp = self.factory.makeDistributionSourcePackage()
1218
 
        self.assertSubscriptionMerges(dsp)
1219
 
 
1220
 
    def test_merge_with_conflicting_sourcepackage_subscription(self):
1221
 
        # See comments in assertConflictingSubscriptionDeletes.
1222
 
        dsp = self.factory.makeDistributionSourcePackage()
1223
 
        self.assertConflictingSubscriptionDeletes(dsp)
1224
 
 
1225
 
    def test_merge_accesspolicygrants(self):
1226
 
        # AccessPolicyGrants are transferred from the duplicate.
1227
 
        person = self.factory.makePerson()
1228
 
        grant = self.factory.makeAccessPolicyGrant()
1229
 
        self._do_premerge(grant.grantee, person)
1230
 
        with person_logged_in(person):
1231
 
            self._do_merge(grant.grantee, person)
1232
 
        self.assertEqual(person, grant.grantee)
1233
 
 
1234
 
    def test_merge_accesspolicygrants_conflicts(self):
1235
 
        # Conflicting AccessPolicyGrants are deleted.
1236
 
        policy = self.factory.makeAccessPolicy()
1237
 
 
1238
 
        person = self.factory.makePerson()
1239
 
        person_grantor = self.factory.makePerson()
1240
 
        person_grant = self.factory.makeAccessPolicyGrant(
1241
 
            grantee=person, grantor=person_grantor, object=policy)
1242
 
 
1243
 
        duplicate = self.factory.makePerson()
1244
 
        duplicate_grantor = self.factory.makePerson()
1245
 
        duplicate_grant = self.factory.makeAccessPolicyGrant(
1246
 
            grantee=duplicate, grantor=duplicate_grantor, object=policy)
1247
 
 
1248
 
        self._do_premerge(duplicate, person)
1249
 
        with person_logged_in(person):
1250
 
            self._do_merge(duplicate, person)
1251
 
        transaction.commit()
1252
 
 
1253
 
        self.assertEqual(person, person_grant.grantee)
1254
 
        self.assertEqual(person_grantor, person_grant.grantor)
1255
 
        self.assertIs(
1256
 
            None,
1257
 
            IStore(AccessPolicyGrant).get(
1258
 
                AccessPolicyGrant, duplicate_grant.id))
1259
 
 
1260
 
    def test_mergeAsync(self):
1261
 
        # mergeAsync() creates a new `PersonMergeJob`.
1262
 
        from_person = self.factory.makePerson()
1263
 
        to_person = self.factory.makePerson()
1264
 
        login_person(from_person)
1265
 
        job = self.person_set.mergeAsync(from_person, to_person)
1266
 
        self.assertEqual(from_person, job.from_person)
1267
 
        self.assertEqual(to_person, job.to_person)
1268
 
 
1269
 
 
1270
 
class TestPersonSetCreateByOpenId(TestCaseWithFactory):
    """Tests for `IPersonSet.getOrCreateByOpenIDIdentifier()`.

    setUp() stores one Account, one OpenIdIdentifier, one Person and one
    preferred EmailAddress that are all linked together; each test then
    looks a person up with a different mix of known and unknown data.
    """

    layer = DatabaseFunctionalLayer

    def setUp(self):
        super(TestPersonSetCreateByOpenId, self).setUp()
        self.person_set = getUtility(IPersonSet)
        self.store = IMasterStore(Account)

        # A consistent set of linked records to start every test from.
        self.account = self.makeAccount()
        self.identifier = self.makeOpenIdIdentifier(self.account, u'whatever')
        self.person = self.makePerson(self.account)
        self.email = self.makeEmailAddress(
            email='whatever@example.com', person=self.person)

    def makeAccount(self):
        """Add an active Account to the store and return it."""
        account = Account(
            displayname='Displayname',
            creation_rationale=AccountCreationRationale.UNKNOWN,
            status=AccountStatus.ACTIVE)
        return self.store.add(account)

    def makeOpenIdIdentifier(self, account, identifier):
        """Add an OpenIdIdentifier for `account` to the store; return it."""
        record = OpenIdIdentifier()
        record.identifier = identifier
        record.account = account
        return self.store.add(record)

    def makePerson(self, account):
        """Add a Person linked to `account` to the store and return it."""
        person_name = 'acc%d' % account.id
        person = Person(
            name=person_name, account=account,
            displayname='Displayname',
            creation_rationale=PersonCreationRationale.UNKNOWN)
        return self.store.add(person)

    def makeEmailAddress(self, email, person):
        """Add a preferred EmailAddress for `person`; return it."""
        address = EmailAddress(
            email=email,
            account=person.account,
            person=person,
            status=EmailAddressStatus.PREFERRED)
        return self.store.add(address)

    def _getOrCreateUnproxied(self, openid, email_address, full_name):
        """Call getOrCreateByOpenIDIdentifier() with the stock arguments.

        Every test uses the same creation rationale and comment, so only
        the identifier, email address and name vary.

        :return: A (person, updated) tuple, with the security proxy
            removed from the person.
        """
        person, was_updated = self.person_set.getOrCreateByOpenIDIdentifier(
            openid, email_address, full_name,
            PersonCreationRationale.UNKNOWN, 'No Comment')
        return removeSecurityProxy(person), was_updated

    def _assertFixtureReturnedUnchanged(self, found, updated):
        """Assert the lookup returned the setUp() records, unmodified."""
        self.assertIs(False, updated)
        self.assertIs(self.person, found)
        self.assertIs(self.account, found.account)
        self.assertIs(self.email, found.preferredemail)
        self.assertIs(self.email.account, self.account)
        self.assertIs(self.email.person, self.person)
        self.assertEqual(
            [self.identifier], list(self.account.openid_identifiers))

    def testAllValid(self):
        found, updated = self._getOrCreateUnproxied(
            self.identifier.identifier, self.email.email, 'Ignored Name')

        self._assertFixtureReturnedUnchanged(found, updated)

    def testEmailAddressCaseInsensitive(self):
        # Same as testAllValid, except that the lookup is done with an
        # all upper case email address.
        found, updated = self._getOrCreateUnproxied(
            self.identifier.identifier, self.email.email.upper(),
            'Ignored Name')

        self._assertFixtureReturnedUnchanged(found, updated)

    def testNewOpenId(self):
        # The account is found through the email address and the new
        # OpenId identifier is attached to it. We can do this because we
        # trust our OpenId Provider.
        fresh_identifier = u'newident'
        found, updated = self._getOrCreateUnproxied(
            fresh_identifier, self.email.email, 'Ignored Name')

        self.assertIs(True, updated)
        self.assertIs(self.person, found)
        self.assertIs(self.account, found.account)
        self.assertIs(self.email, found.preferredemail)
        self.assertIs(self.email.account, self.account)
        self.assertIs(self.email.person, self.person)

        # The original OpenId identifier has not been detached.
        self.assertIn(self.identifier, list(self.account.openid_identifiers))

        # The new identifier is now attached as well.
        attached = [
            row.identifier for row in self.account.openid_identifiers]
        self.assertIn(fresh_identifier, attached)

    def testNewEmailAddress(self):
        # The account is found through the OpenId identifier and the new
        # EmailAddress is attached to it. We can do this because we trust
        # our OpenId Provider.
        fresh_address = u'new_email@example.com'
        found, updated = self._getOrCreateUnproxied(
            self.identifier.identifier, fresh_address, 'Ignored Name')

        self.assertIs(True, updated)
        self.assertIs(self.person, found)
        self.assertIs(self.account, found.account)
        self.assertEqual(
            [self.identifier], list(self.account.openid_identifiers))

        # The original email address is untouched and still linked.
        self.assertIs(self.email, found.preferredemail)
        self.assertIs(self.email.account, self.account)
        self.assertIs(self.email.person, self.person)

        # The new email address exists too, linked to the same records.
        email_row = self.store.find(EmailAddress, email=fresh_address).one()
        self.assertIs(email_row.account, self.account)
        self.assertIs(email_row.person, self.person)
        self.assertEqual(EmailAddressStatus.NEW, email_row.status)

    def testNewAccountAndIdentifier(self):
        # When neither the OpenId identifier nor the email address is
        # known, everything is created.
        fresh_address = u'new_email@example.com'
        fresh_identifier = u'new_identifier'
        found, updated = self._getOrCreateUnproxied(
            fresh_identifier, fresh_address, 'New Name')

        # A new Person was created.
        self.assertIs(True, updated)
        self.assertIsNot(None, found)

        # The person is linked to an account, an email address and the
        # identifier.
        preferred = found.preferredemail
        self.assertIs(found, preferred.person)
        self.assertIs(found.account, found.preferredemail.account)
        self.assertEqual(
            fresh_identifier,
            found.account.openid_identifiers.any().identifier)

    def testNoPerson(self):
        # An account that is not linked to a Person gets one created.
        # ShipIt users fall into this category the first time they log
        # into Launchpad.
        self.email.person = None
        self.person.account = None

        found, updated = self._getOrCreateUnproxied(
            self.identifier.identifier, self.email.email, 'New Name')

        # A new Person was created rather than the old one reused.
        self.assertIs(True, updated)
        self.assertIsNot(self.person, found)

        # The person is linked to an account, an email address and the
        # identifier.
        self.assertIs(found, found.preferredemail.person)
        self.assertIs(found.account, found.preferredemail.account)
        self.assertIn(self.identifier, list(found.account.openid_identifiers))

    def testNoAccount(self):
        # The EmailAddress is linked to a Person but there is no Account;
        # the stub is converted into something valid.
        self.email.account = None
        self.email.status = EmailAddressStatus.NEW
        self.person.account = None
        fresh_identifier = u'new_identifier'
        found, updated = self._getOrCreateUnproxied(
            fresh_identifier, self.email.email, 'Ignored')

        self.assertIs(True, updated)

        self.assertIsNot(None, found.account)
        self.assertEqual(
            fresh_identifier,
            found.account.openid_identifiers.any().identifier)
        self.assertIs(self.email.person, found)
        self.assertIs(self.email.account, found.account)
        self.assertEqual(EmailAddressStatus.PREFERRED, self.email.status)

    def testMovedEmailAddress(self):
        # Both the EmailAddress and the OpenId identifier are in the
        # database, but linked to different accounts. The identifier has
        # to be relinked to the email address's account: a user who can
        # log into the trusted SSO with that email address should be able
        # to log into Launchpad with it. This copes with the SSO moving
        # email addresses between SSO accounts.
        self.identifier.account = self.store.find(
            Account, displayname='Foo Bar').one()

        found, updated = self._getOrCreateUnproxied(
            self.identifier.identifier, self.email.email, 'New Name')

        self.assertIs(True, updated)
        self.assertIs(self.person, found)

        self.assertIs(found.account, self.identifier.account)
        self.assertIn(self.identifier, list(found.account.openid_identifiers))
1478
 
 
1479
 
 
1480
 
class TestCreatePersonAndEmail(TestCase):
    """Test `IPersonSet`.createPersonAndEmail()."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        super(TestCreatePersonAndEmail, self).setUp()
        # Run each test as the anonymous user; log out again afterwards.
        login(ANONYMOUS)
        self.addCleanup(logout)
        self.person_set = getUtility(IPersonSet)

    def test_duplicated_name_not_accepted(self):
        # A second person cannot be created with a name already in use.
        rationale = PersonCreationRationale.UNKNOWN
        self.person_set.createPersonAndEmail(
            'testing@example.com', rationale, name='zzzz')
        self.assertRaises(
            NameAlreadyTaken, self.person_set.createPersonAndEmail,
            'testing2@example.com', rationale, name='zzzz')

    def test_duplicated_email_not_accepted(self):
        # A second person cannot be created with an email already in use.
        address = 'testing@example.com'
        rationale = PersonCreationRationale.UNKNOWN
        self.person_set.createPersonAndEmail(address, rationale)
        self.assertRaises(
            EmailAddressAlreadyTaken, self.person_set.createPersonAndEmail,
            address, rationale)

    def test_invalid_email_not_accepted(self):
        # A malformed email address is rejected.
        bad_address = 'testing@.com'
        self.assertRaises(
            InvalidEmailAddress, self.person_set.createPersonAndEmail,
            bad_address, PersonCreationRationale.UNKNOWN)

    def test_invalid_name_not_accepted(self):
        # A name that is not a valid person name is rejected.
        bad_name = '/john'
        self.assertRaises(
            InvalidName, self.person_set.createPersonAndEmail,
            'testing@example.com', PersonCreationRationale.UNKNOWN,
            name=bad_name)
1516
 
 
1517
 
 
1518
719
class TestPersonRelatedBugTaskSearch(TestCaseWithFactory):
1519
720
 
1520
721
    layer = DatabaseFunctionalLayer
1630
831
            assignee=self.user)
1631
832
 
1632
833
 
 
834
class KarmaTestMixin:
    """Helper methods for setting karma.

    `_makeKarmaCache` reads `self.cache_manager`, which this mixin does
    not define; the class it is mixed into has to provide it.
    """

    def _makeKarmaCache(self, person, product, category_name_values):
        """Create KarmaCache entries for `person` on `product`.

        One entry is created per (category name, value) pair in
        `category_name_values`, followed by one entry without a category
        that holds the sum of all the values.

        The records have to be created as the DB user 'karma'. Switching
        user invalidates the objects under test, so they must be
        retrieved again afterwards.
        """
        with dbuser('karma'):
            running_total = 0
            # One cache entry per category for this person and project.
            for name, points in category_name_values:
                karma_category = KarmaCategory.byName(name)
                self.cache_manager.new(
                    points, person.id, karma_category.id,
                    product_id=product.id)
                running_total += points
            # The overall (category-less) entry for person and project.
            self.cache_manager.new(
                running_total, person.id, None, product_id=product.id)

    def _makeKarmaTotalCache(self, person, total):
        """Create a KarmaTotalCache entry of `total` for `person`.

        The record has to be created as the DB user 'karma'. Switching
        user invalidates the objects under test, so they must be
        retrieved again afterwards.
        """
        with dbuser('karma'):
            KarmaTotalCache(karma_total=total, person=person.id)
 
865
 
 
866
 
1633
867
class TestPersonKarma(TestCaseWithFactory, KarmaTestMixin):
1634
868
 
1635
869
    layer = DatabaseFunctionalLayer