~launchpad-pqm/launchpad/devel

« back to all changes in this revision

Viewing changes to lib/lp/registry/tests/test_personset.py

  • Committer: Curtis Hovey
  • Date: 2012-01-06 15:14:48 UTC
  • mto: This revision was merged to the branch mainline in revision 14651.
  • Revision ID: curtis.hovey@canonical.com-20120106151448-ie2e51xxf5ap2s2l
Moved personset tests to test_personset.

Show diffs side-by-side

added added

removed removed

Lines of Context:
5
5
 
6
6
__metaclass__ = type
7
7
 
 
8
from datetime import datetime
 
9
 
8
10
import transaction
 
11
 
 
12
import pytz
 
13
 
 
14
from testtools.matchers import (
 
15
    LessThan,
 
16
    )
 
17
 
9
18
from zope.component import getUtility
10
19
from zope.security.proxy import removeSecurityProxy
11
20
 
12
21
from lp.code.tests.helpers import remove_all_sample_data_branches
 
22
from lp.registry.errors import (
 
23
    InvalidName,
 
24
    NameAlreadyTaken,
 
25
    )
 
26
from lp.registry.interfaces.karma import IKarmaCacheManager
 
27
from lp.registry.interfaces.mailinglist import MailingListStatus
13
28
from lp.registry.interfaces.mailinglistsubscription import (
14
29
    MailingListAutoSubscribePolicy,
15
30
    )
 
31
from lp.registry.interfaces.nameblacklist import INameBlacklistSet
16
32
from lp.registry.interfaces.person import (
17
33
    IPersonSet,
18
34
    PersonCreationRationale,
19
 
    )
20
 
from lp.registry.model.person import PersonSet
 
35
    PersonVisibility,
 
36
    )
 
37
from lp.registry.interfaces.personnotification import IPersonNotificationSet
 
38
from lp.registry.model.accesspolicy import AccessPolicyGrant
 
39
from lp.registry.model.person import (
 
40
    Person,
 
41
    PersonSet,
 
42
    )
 
43
from lp.registry.tests.test_person import KarmaTestMixin
 
44
from lp.services.config import config
 
45
from lp.services.database.lpstorm import (
 
46
    IMasterStore,
 
47
    IStore,
 
48
    )
21
49
from lp.services.database.sqlbase import cursor
22
50
from lp.services.identity.interfaces.account import (
 
51
    AccountCreationRationale,
23
52
    AccountStatus,
24
53
    AccountSuspendedError,
25
54
    )
 
55
from lp.services.identity.interfaces.emailaddress import (
 
56
    EmailAddressAlreadyTaken,
 
57
    EmailAddressStatus,
 
58
    IEmailAddressSet,
 
59
    InvalidEmailAddress,
 
60
    )
 
61
from lp.services.identity.model.account import Account
 
62
from lp.services.identity.model.emailaddress import EmailAddress
 
63
from lp.services.openid.model.openididentifier import OpenIdIdentifier
 
64
from lp.soyuz.enums import (
 
65
    ArchiveStatus,
 
66
    )
26
67
from lp.testing import (
 
68
    ANONYMOUS,
 
69
    celebrity_logged_in,
 
70
    login,
 
71
    login_person,
 
72
    logout,
27
73
    person_logged_in,
 
74
    StormStatementRecorder,
 
75
    TestCase,
28
76
    TestCaseWithFactory,
29
77
    )
 
78
 
 
79
from lp.testing.dbuser import dbuser
30
80
from lp.testing.layers import DatabaseFunctionalLayer
 
81
from lp.testing.matchers import HasQueryCount
 
82
 
 
83
 
 
84
class TestPersonSet(TestCaseWithFactory):
    """Test `IPersonSet`."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        # Every test runs as the anonymous user against the IPersonSet
        # utility; the login is undone on cleanup.
        super(TestPersonSet, self).setUp()
        login(ANONYMOUS)
        self.addCleanup(logout)
        self.person_set = getUtility(IPersonSet)

    def test_isNameBlacklisted(self):
        # A name matching a NameBlacklist regexp is blacklisted; a name
        # that does not match is not.
        cursor().execute(
            "INSERT INTO NameBlacklist(id, regexp) VALUES (-100, 'foo')")
        self.assertTrue(self.person_set.isNameBlacklisted('foo'))
        self.assertFalse(self.person_set.isNameBlacklisted('bar'))

    def test_isNameBlacklisted_user_is_admin(self):
        # A blacklist expression created with an admin team does not
        # blacklist the name when the owner of that team is passed in as
        # the user.
        team = self.factory.makeTeam()
        name_blacklist_set = getUtility(INameBlacklistSet)
        admin_exp = name_blacklist_set.create(u'fnord', admin=team)
        # Flush so the new expression is in the database before the check.
        IStore(admin_exp).flush()
        user = team.teamowner
        self.assertFalse(self.person_set.isNameBlacklisted('fnord', user))

    def test_getByEmail_ignores_case_and_whitespace(self):
        # getByEmail() returns the same person for an address that only
        # differs in case and surrounding whitespace.
        person1_email = 'foo.bar@canonical.com'
        person1 = self.person_set.getByEmail(person1_email)
        self.assertIsNot(
            None, person1,
            "PersonSet.getByEmail() could not find %r" % person1_email)

        person2 = self.person_set.getByEmail('  foo.BAR@canonICAL.com  ')
        self.assertIsNot(
            None, person2,
            "PersonSet.getByEmail() should ignore case and whitespace.")
        self.assertEqual(person1, person2)

    def test_getPrecachedPersonsFromIDs(self):
        # The getPrecachedPersonsFromIDs() method should only make one
        # query to load all the extraneous data. Accessing the
        # attributes should then cause zero queries.
        person_ids = [self.factory.makePerson().id for _ in range(3)]

        with StormStatementRecorder() as recorder:
            persons = list(self.person_set.getPrecachedPersonsFromIDs(
                person_ids, need_karma=True, need_ubuntu_coc=True,
                need_location=True, need_archive=True,
                need_preferred_email=True, need_validity=True))
        self.assertThat(recorder, HasQueryCount(LessThan(2)))

        with StormStatementRecorder() as recorder:
            # Touch every precached attribute; none may hit the database.
            for person in persons:
                person.is_valid_person
                person.karma
                person.is_ubuntu_coc_signer
                person.location
                person.archive
                person.preferredemail
        self.assertThat(recorder, HasQueryCount(LessThan(1)))

    def test_latest_teams_public(self):
        # Anyone can see the latest 5 teams if they are public.
        teams = [
            self.factory.makeTeam(name='team-%s' % num)
            for num in xrange(1, 7)]
        # The newest team is expected first.
        teams.reverse()
        result = self.person_set.latest_teams()
        self.assertEqual(teams[0:5], list(result))

    def test_latest_teams_private(self):
        # Private teams are only included in the latest teams if the
        # user can view the team.
        teams = [
            self.factory.makeTeam(name='team-%s' % num)
            for num in xrange(1, 7)]
        owner = self.factory.makePerson()
        teams.append(
            self.factory.makeTeam(
                name='private-team', owner=owner,
                visibility=PersonVisibility.PRIVATE))
        # The newest team (the private one) is now at index 0.
        teams.reverse()
        login_person(owner)
        result = self.person_set.latest_teams()
        self.assertEqual(teams[0:5], list(result))
        # An unrelated person gets the five newest teams without the
        # private one.
        login_person(self.factory.makePerson())
        result = self.person_set.latest_teams()
        self.assertEqual(teams[1:6], list(result))

    def test_latest_teams_limit(self):
        # The limit controls the number of latest teams returned.
        teams = [
            self.factory.makeTeam(name='team-%s' % num)
            for num in xrange(1, 7)]
        teams.reverse()
        result = self.person_set.latest_teams(limit=3)
        self.assertEqual(teams[0:3], list(result))
 
183
 
 
184
 
 
185
class TestPersonSetMergeMailingListSubscriptions(TestCaseWithFactory):
    """Test `PersonSet._mergeMailingListSubscriptions`."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        super(TestPersonSetMergeMailingListSubscriptions, self).setUp()
        # Use the unsecured PersonSet so that private methods can be tested.
        self.person_set = PersonSet()
        self.from_person = self.factory.makePerson()
        self.to_person = self.factory.makePerson()
        self.cur = cursor()

    def test__mergeMailingListSubscriptions_no_subscriptions(self):
        # With no subscriptions for from_person, the cursor reports that
        # no rows were affected.
        self.person_set._mergeMailingListSubscriptions(
            self.cur, self.from_person.id, self.to_person.id)
        self.assertEqual(0, self.cur.rowcount)

    def test__mergeMailingListSubscriptions_with_subscriptions(self):
        # When from_person auto-subscribes to mailing lists and is added
        # to a team with a mailing list, the cursor reports one affected
        # row.
        naked_person = removeSecurityProxy(self.from_person)
        naked_person.mailing_list_auto_subscribe_policy = (
            MailingListAutoSubscribePolicy.ALWAYS)
        team, mailing_list = self.factory.makeTeamAndMailingList(
            'test-mailinglist', 'team-owner')
        with person_logged_in(team.teamowner):
            team.addMember(self.from_person, reviewer=team.teamowner)
        transaction.commit()
        self.person_set._mergeMailingListSubscriptions(
            self.cur, self.from_person.id, self.to_person.id)
        self.assertEqual(1, self.cur.rowcount)
 
215
 
 
216
 
 
217
class TestPersonSetMerge(TestCaseWithFactory, KarmaTestMixin):
    """Test cases for PersonSet merge."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        super(TestPersonSetMerge, self).setUp()
        self.person_set = getUtility(IPersonSet)

    def _do_premerge(self, from_person, to_person):
        # Do the pre merge work performed by the LoginToken: reassign
        # from_person's preferred email to to_person as a NEW address.
        with celebrity_logged_in('admin'):
            email = from_person.preferredemail
            email.status = EmailAddressStatus.NEW
            email.person = to_person
            email.account = to_person.account
        transaction.commit()

    def _do_merge(self, from_person, to_person, reviewer=None):
        # Perform the merge as the db user that will be used by the jobs.
        # The two persons are returned so callers can rebind their names.
        with dbuser(config.IPersonMergeJobSource.dbuser):
            self.person_set.merge(from_person, to_person, reviewer=reviewer)
        return from_person, to_person

    def _get_testable_account(self, person, date_created, openid_identifier):
        # Return a naked account with predictable attributes.
        account = removeSecurityProxy(person.account)
        account.date_created = date_created
        account.openid_identifier = openid_identifier
        return account

    def test_delete_no_notifications(self):
        # Deleting a team leaves no notifications waiting to be sent.
        team = self.factory.makeTeam()
        owner = team.teamowner
        transaction.commit()
        with dbuser(config.IPersonMergeJobSource.dbuser):
            self.person_set.delete(team, owner)
        notification_set = getUtility(IPersonNotificationSet)
        notifications = notification_set.getNotificationsToSend()
        self.assertEqual(0, notifications.count())

    def test_openid_identifiers(self):
        # Verify that OpenId Identifiers are merged.
        duplicate = self.factory.makePerson()
        duplicate_identifier = removeSecurityProxy(
            duplicate.account).openid_identifiers.any().identifier
        person = self.factory.makePerson()
        person_identifier = removeSecurityProxy(
            person.account).openid_identifiers.any().identifier
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        # The duplicate's account keeps no identifiers...
        self.assertEqual(
            0,
            removeSecurityProxy(duplicate.account).openid_identifiers.count())

        # ...and the target account holds both of them.
        merged_identifiers = [
            identifier.identifier for identifier in
                removeSecurityProxy(person.account).openid_identifiers]

        self.assertIn(duplicate_identifier, merged_identifiers)
        self.assertIn(person_identifier, merged_identifiers)

    def test_karmacache_transferred_to_user_has_no_karma(self):
        # Verify that the merged user has no KarmaCache entries,
        # and the karma total was transferred.
        self.cache_manager = getUtility(IKarmaCacheManager)
        product = self.factory.makeProduct()
        duplicate = self.factory.makePerson()
        self._makeKarmaCache(
            duplicate, product, [('bugs', 10)])
        self._makeKarmaTotalCache(duplicate, 15)
        # The karma changes invalidated duplicate instance.
        duplicate = self.person_set.get(duplicate.id)
        person = self.factory.makePerson()
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        self.assertEqual([], duplicate.karma_category_caches)
        self.assertEqual(0, duplicate.karma)
        self.assertEqual(15, person.karma)

    def test_karmacache_transferred_to_user_has_karma(self):
        # Verify that the merged user has no KarmaCache entries,
        # and the karma total was summed.
        self.cache_manager = getUtility(IKarmaCacheManager)
        product = self.factory.makeProduct()
        duplicate = self.factory.makePerson()
        self._makeKarmaCache(
            duplicate, product, [('bugs', 10)])
        self._makeKarmaTotalCache(duplicate, 15)
        person = self.factory.makePerson()
        self._makeKarmaCache(
            person, product, [('bugs', 9)])
        self._makeKarmaTotalCache(person, 13)
        # The karma changes invalidated duplicate and person instances.
        duplicate = self.person_set.get(duplicate.id)
        person = self.person_set.get(person.id)
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        self.assertEqual([], duplicate.karma_category_caches)
        self.assertEqual(0, duplicate.karma)
        # 15 (duplicate) + 13 (person).
        self.assertEqual(28, person.karma)

    def test_person_date_created_preserved(self):
        # Verify that the oldest datecreated is merged.
        person = self.factory.makePerson()
        duplicate = self.factory.makePerson()
        oldest_date = datetime(2005, 11, 25, tzinfo=pytz.UTC)
        removeSecurityProxy(duplicate).datecreated = oldest_date
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        self.assertEqual(oldest_date, person.datecreated)

    def test_team_with_active_mailing_list_raises_error(self):
        # A team with an active mailing list cannot be merged.
        target_team = self.factory.makeTeam()
        test_team = self.factory.makeTeam()
        self.factory.makeMailingList(
            test_team, test_team.teamowner)
        self.assertRaises(
            AssertionError, self.person_set.merge, test_team, target_team)

    def test_team_with_inactive_mailing_list(self):
        # A team with an inactive mailing list can be merged.
        target_team = self.factory.makeTeam()
        test_team = self.factory.makeTeam()
        mailing_list = self.factory.makeMailingList(
            test_team, test_team.teamowner)
        mailing_list.deactivate()
        mailing_list.transitionToStatus(MailingListStatus.INACTIVE)
        test_team, target_team = self._do_merge(
            test_team, target_team, test_team.teamowner)
        self.assertEqual(target_team, test_team.merged)
        # The merged team's list ends up purged and the target team
        # has no email addresses.
        self.assertEqual(
            MailingListStatus.PURGED, test_team.mailing_list.status)
        emails = getUtility(IEmailAddressSet).getByPerson(target_team).count()
        self.assertEqual(0, emails)

    def test_team_with_purged_mailing_list(self):
        # A team with a purged mailing list can be merged.
        target_team = self.factory.makeTeam()
        test_team = self.factory.makeTeam()
        mailing_list = self.factory.makeMailingList(
            test_team, test_team.teamowner)
        mailing_list.deactivate()
        mailing_list.transitionToStatus(MailingListStatus.INACTIVE)
        mailing_list.purge()
        test_team, target_team = self._do_merge(
            test_team, target_team, test_team.teamowner)
        self.assertEqual(target_team, test_team.merged)

    def test_team_with_members(self):
        # Team members are removed before merging.
        target_team = self.factory.makeTeam()
        test_team = self.factory.makeTeam()
        former_member = self.factory.makePerson()
        with person_logged_in(test_team.teamowner):
            test_team.addMember(former_member, test_team.teamowner)
        test_team, target_team = self._do_merge(
            test_team, target_team, test_team.teamowner)
        self.assertEqual(target_team, test_team.merged)
        self.assertEqual([], list(former_member.super_teams))

    def test_team_without_super_teams_is_fine(self):
        # A team with no members and no super teams
        # merges without errors.
        test_team = self.factory.makeTeam()
        target_team = self.factory.makeTeam()
        login_person(test_team.teamowner)
        self._do_merge(test_team, target_team, test_team.teamowner)

    def test_team_with_super_teams(self):
        # A team with superteams can be merged, but the memberships
        # are not transferred.
        test_team = self.factory.makeTeam()
        super_team = self.factory.makeTeam()
        target_team = self.factory.makeTeam()
        login_person(test_team.teamowner)
        test_team.join(super_team, test_team.teamowner)
        test_team, target_team = self._do_merge(
            test_team, target_team, test_team.teamowner)
        self.assertEqual(target_team, test_team.merged)
        self.assertEqual([], list(target_team.super_teams))

    def test_merge_moves_branches(self):
        # When person/teams are merged, branches owned by the from person
        # are moved.
        person = self.factory.makePerson()
        branch = self.factory.makeBranch()
        duplicate = branch.owner
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        branches = person.getBranches()
        self.assertEqual(1, branches.count())

    def test_merge_with_duplicated_branches(self):
        # If both the from and to people have branches with the same name,
        # merging renames the duplicate from the from person's side.
        product = self.factory.makeProduct()
        from_branch = self.factory.makeBranch(name='foo', product=product)
        to_branch = self.factory.makeBranch(name='foo', product=product)
        mergee = to_branch.owner
        duplicate = from_branch.owner
        self._do_premerge(duplicate, mergee)
        login_person(mergee)
        duplicate, mergee = self._do_merge(duplicate, mergee)
        branches = [b.name for b in mergee.getBranches()]
        self.assertEqual(2, len(branches))
        self.assertContentEqual([u'foo', u'foo-1'], branches)

    def test_merge_moves_recipes(self):
        # When person/teams are merged, recipes owned by the from person are
        # moved.
        person = self.factory.makePerson()
        recipe = self.factory.makeSourcePackageRecipe()
        duplicate = recipe.owner
        # Delete the PPA, which is required for the merge to work.
        with person_logged_in(duplicate):
            duplicate.archive.status = ArchiveStatus.DELETED
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        self.assertEqual(1, person.recipes.count())

    def test_merge_with_duplicated_recipes(self):
        # If both the from and to people have recipes with the same name,
        # merging renames the duplicate from the from person's side.
        merge_from = self.factory.makeSourcePackageRecipe(
            name=u'foo', description=u'FROM')
        merge_to = self.factory.makeSourcePackageRecipe(
            name=u'foo', description=u'TO')
        duplicate = merge_from.owner
        mergee = merge_to.owner
        # Delete merge_from's PPA, which is required for the merge to work.
        with person_logged_in(duplicate):
            duplicate.archive.status = ArchiveStatus.DELETED
        self._do_premerge(duplicate, mergee)
        login_person(mergee)
        duplicate, mergee = self._do_merge(duplicate, mergee)
        recipes = mergee.recipes
        self.assertEqual(2, recipes.count())
        descriptions = [r.description for r in recipes]
        self.assertEqual([u'TO', u'FROM'], descriptions)
        self.assertEqual(u'foo-1', recipes[1].name)

    def assertSubscriptionMerges(self, target):
        # Given a subscription target, we want to make sure that subscriptions
        # that the duplicate person made are carried over to the merged
        # account.
        duplicate = self.factory.makePerson()
        with person_logged_in(duplicate):
            target.addSubscription(duplicate, duplicate)
        person = self.factory.makePerson()
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        # The merged person has the subscription, and the duplicate person
        # does not.
        self.assertIsNot(None, target.getSubscription(person))
        self.assertIs(None, target.getSubscription(duplicate))

    def assertConflictingSubscriptionDeletes(self, target):
        # Given a subscription target, we want to make sure that subscriptions
        # that the duplicate person made that conflict with existing
        # subscriptions in the merged account are deleted.
        duplicate = self.factory.makePerson()
        person = self.factory.makePerson()
        with person_logged_in(duplicate):
            target.addSubscription(duplicate, duplicate)
        with person_logged_in(person):
            # The description lets us show that we still have the right
            # subscription later.
            target.addBugSubscriptionFilter(person, person).description = (
                u'a marker')
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        # The merged person still has the original subscription, as shown
        # by the marker description.
        self.assertEqual(
            u'a marker',
            target.getSubscription(person).bug_filters[0].description)
        # The conflicting subscription on the duplicate has been deleted.
        self.assertIs(None, target.getSubscription(duplicate))

    def test_merge_with_product_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeProduct())

    def test_merge_with_conflicting_product_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(self.factory.makeProduct())

    def test_merge_with_project_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeProject())

    def test_merge_with_conflicting_project_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(self.factory.makeProject())

    def test_merge_with_distroseries_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeDistroSeries())

    def test_merge_with_conflicting_distroseries_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(
            self.factory.makeDistroSeries())

    def test_merge_with_milestone_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeMilestone())

    def test_merge_with_conflicting_milestone_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(
            self.factory.makeMilestone())

    def test_merge_with_productseries_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeProductSeries())

    def test_merge_with_conflicting_productseries_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(
            self.factory.makeProductSeries())

    def test_merge_with_distribution_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeDistribution())

    def test_merge_with_conflicting_distribution_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(
            self.factory.makeDistribution())

    def test_merge_with_sourcepackage_subscription(self):
        # See comments in assertSubscriptionMerges.
        dsp = self.factory.makeDistributionSourcePackage()
        self.assertSubscriptionMerges(dsp)

    def test_merge_with_conflicting_sourcepackage_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        dsp = self.factory.makeDistributionSourcePackage()
        self.assertConflictingSubscriptionDeletes(dsp)

    def test_merge_accesspolicygrants(self):
        # AccessPolicyGrants are transferred from the duplicate.
        person = self.factory.makePerson()
        grant = self.factory.makeAccessPolicyGrant()
        self._do_premerge(grant.grantee, person)
        with person_logged_in(person):
            self._do_merge(grant.grantee, person)
        self.assertEqual(person, grant.grantee)

    def test_merge_accesspolicygrants_conflicts(self):
        # Conflicting AccessPolicyGrants are deleted.
        policy = self.factory.makeAccessPolicy()

        person = self.factory.makePerson()
        person_grantor = self.factory.makePerson()
        person_grant = self.factory.makeAccessPolicyGrant(
            grantee=person, grantor=person_grantor, object=policy)

        # The duplicate holds a grant on the same policy.
        duplicate = self.factory.makePerson()
        duplicate_grantor = self.factory.makePerson()
        duplicate_grant = self.factory.makeAccessPolicyGrant(
            grantee=duplicate, grantor=duplicate_grantor, object=policy)

        self._do_premerge(duplicate, person)
        with person_logged_in(person):
            self._do_merge(duplicate, person)
        transaction.commit()

        # The target's own grant is untouched; the duplicate's grant can
        # no longer be loaded from the store.
        self.assertEqual(person, person_grant.grantee)
        self.assertEqual(person_grantor, person_grant.grantor)
        self.assertIs(
            None,
            IStore(AccessPolicyGrant).get(
                AccessPolicyGrant, duplicate_grant.id))

    def test_mergeAsync(self):
        # mergeAsync() creates a new `PersonMergeJob`.
        from_person = self.factory.makePerson()
        to_person = self.factory.makePerson()
        login_person(from_person)
        job = self.person_set.mergeAsync(from_person, to_person)
        self.assertEqual(from_person, job.from_person)
        self.assertEqual(to_person, job.to_person)
 
612
 
 
613
 
 
614
class TestPersonSetCreateByOpenId(TestCaseWithFactory):
 
615
    layer = DatabaseFunctionalLayer
 
616
 
 
617
    def setUp(self):
 
618
        super(TestPersonSetCreateByOpenId, self).setUp()
 
619
        self.person_set = getUtility(IPersonSet)
 
620
        self.store = IMasterStore(Account)
 
621
 
 
622
        # Generate some valid test data.
 
623
        self.account = self.makeAccount()
 
624
        self.identifier = self.makeOpenIdIdentifier(self.account, u'whatever')
 
625
        self.person = self.makePerson(self.account)
 
626
        self.email = self.makeEmailAddress(
 
627
            email='whatever@example.com', person=self.person)
 
628
 
 
629
    def makeAccount(self):
 
630
        return self.store.add(Account(
 
631
            displayname='Displayname',
 
632
            creation_rationale=AccountCreationRationale.UNKNOWN,
 
633
            status=AccountStatus.ACTIVE))
 
634
 
 
635
    def makeOpenIdIdentifier(self, account, identifier):
 
636
        openid_identifier = OpenIdIdentifier()
 
637
        openid_identifier.identifier = identifier
 
638
        openid_identifier.account = account
 
639
        return self.store.add(openid_identifier)
 
640
 
 
641
    def makePerson(self, account):
 
642
        return self.store.add(Person(
 
643
            name='acc%d' % account.id, account=account,
 
644
            displayname='Displayname',
 
645
            creation_rationale=PersonCreationRationale.UNKNOWN))
 
646
 
 
647
    def makeEmailAddress(self, email, person):
 
648
            return self.store.add(EmailAddress(
 
649
                email=email,
 
650
                account=person.account,
 
651
                person=person,
 
652
                status=EmailAddressStatus.PREFERRED))
 
653
 
 
654
    def testAllValid(self):
 
655
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
 
656
            self.identifier.identifier, self.email.email, 'Ignored Name',
 
657
            PersonCreationRationale.UNKNOWN, 'No Comment')
 
658
        found = removeSecurityProxy(found)
 
659
 
 
660
        self.assertIs(False, updated)
 
661
        self.assertIs(self.person, found)
 
662
        self.assertIs(self.account, found.account)
 
663
        self.assertIs(self.email, found.preferredemail)
 
664
        self.assertIs(self.email.account, self.account)
 
665
        self.assertIs(self.email.person, self.person)
 
666
        self.assertEqual(
 
667
            [self.identifier], list(self.account.openid_identifiers))
 
668
 
 
669
    def testEmailAddressCaseInsensitive(self):
 
670
        # As per testAllValid, but the email address used for the lookup
 
671
        # is all upper case.
 
672
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
 
673
            self.identifier.identifier, self.email.email.upper(),
 
674
            'Ignored Name', PersonCreationRationale.UNKNOWN, 'No Comment')
 
675
        found = removeSecurityProxy(found)
 
676
 
 
677
        self.assertIs(False, updated)
 
678
        self.assertIs(self.person, found)
 
679
        self.assertIs(self.account, found.account)
 
680
        self.assertIs(self.email, found.preferredemail)
 
681
        self.assertIs(self.email.account, self.account)
 
682
        self.assertIs(self.email.person, self.person)
 
683
        self.assertEqual(
 
684
            [self.identifier], list(self.account.openid_identifiers))
 
685
 
 
686
    def testNewOpenId(self):
 
687
        # Account looked up by email and the new OpenId identifier
 
688
        # attached. We can do this because we trust our OpenId Provider.
 
689
        new_identifier = u'newident'
 
690
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
 
691
            new_identifier, self.email.email, 'Ignored Name',
 
692
            PersonCreationRationale.UNKNOWN, 'No Comment')
 
693
        found = removeSecurityProxy(found)
 
694
 
 
695
        self.assertIs(True, updated)
 
696
        self.assertIs(self.person, found)
 
697
        self.assertIs(self.account, found.account)
 
698
        self.assertIs(self.email, found.preferredemail)
 
699
        self.assertIs(self.email.account, self.account)
 
700
        self.assertIs(self.email.person, self.person)
 
701
 
 
702
        # Old OpenId Identifier still attached.
 
703
        self.assertIn(self.identifier, list(self.account.openid_identifiers))
 
704
 
 
705
        # So is our new one.
 
706
        identifiers = [
 
707
            identifier.identifier for identifier
 
708
                in self.account.openid_identifiers]
 
709
        self.assertIn(new_identifier, identifiers)
 
710
 
 
711
    def testNewEmailAddress(self):
 
712
        # Account looked up by OpenId identifier and new EmailAddress
 
713
        # attached. We can do this because we trust our OpenId Provider.
 
714
        new_email = u'new_email@example.com'
 
715
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
 
716
            self.identifier.identifier, new_email, 'Ignored Name',
 
717
            PersonCreationRationale.UNKNOWN, 'No Comment')
 
718
        found = removeSecurityProxy(found)
 
719
 
 
720
        self.assertIs(True, updated)
 
721
        self.assertIs(self.person, found)
 
722
        self.assertIs(self.account, found.account)
 
723
        self.assertEqual(
 
724
            [self.identifier], list(self.account.openid_identifiers))
 
725
 
 
726
        # The old email address is still there and correctly linked.
 
727
        self.assertIs(self.email, found.preferredemail)
 
728
        self.assertIs(self.email.account, self.account)
 
729
        self.assertIs(self.email.person, self.person)
 
730
 
 
731
        # The new email address is there too and correctly linked.
 
732
        new_email = self.store.find(EmailAddress, email=new_email).one()
 
733
        self.assertIs(new_email.account, self.account)
 
734
        self.assertIs(new_email.person, self.person)
 
735
        self.assertEqual(EmailAddressStatus.NEW, new_email.status)
 
736
 
 
737
    def testNewAccountAndIdentifier(self):
 
738
        # If neither the OpenId Identifier nor the email address are
 
739
        # found, we create everything.
 
740
        new_email = u'new_email@example.com'
 
741
        new_identifier = u'new_identifier'
 
742
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
 
743
            new_identifier, new_email, 'New Name',
 
744
            PersonCreationRationale.UNKNOWN, 'No Comment')
 
745
        found = removeSecurityProxy(found)
 
746
 
 
747
        # We have a new Person
 
748
        self.assertIs(True, updated)
 
749
        self.assertIsNot(None, found)
 
750
 
 
751
        # It is correctly linked to an account, emailaddress and
 
752
        # identifier.
 
753
        self.assertIs(found, found.preferredemail.person)
 
754
        self.assertIs(found.account, found.preferredemail.account)
 
755
        self.assertEqual(
 
756
            new_identifier, found.account.openid_identifiers.any().identifier)
 
757
 
 
758
    def testNoPerson(self):
 
759
        # If the account is not linked to a Person, create one. ShipIt
 
760
        # users fall into this category the first time they log into
 
761
        # Launchpad.
 
762
        self.email.person = None
 
763
        self.person.account = None
 
764
 
 
765
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
 
766
            self.identifier.identifier, self.email.email, 'New Name',
 
767
            PersonCreationRationale.UNKNOWN, 'No Comment')
 
768
        found = removeSecurityProxy(found)
 
769
 
 
770
        # We have a new Person
 
771
        self.assertIs(True, updated)
 
772
        self.assertIsNot(self.person, found)
 
773
 
 
774
        # It is correctly linked to an account, emailaddress and
 
775
        # identifier.
 
776
        self.assertIs(found, found.preferredemail.person)
 
777
        self.assertIs(found.account, found.preferredemail.account)
 
778
        self.assertIn(self.identifier, list(found.account.openid_identifiers))
 
779
 
 
780
    def testNoAccount(self):
 
781
        # EmailAddress is linked to a Person, but there is no Account.
 
782
        # Convert this stub into something valid.
 
783
        self.email.account = None
 
784
        self.email.status = EmailAddressStatus.NEW
 
785
        self.person.account = None
 
786
        new_identifier = u'new_identifier'
 
787
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
 
788
            new_identifier, self.email.email, 'Ignored',
 
789
            PersonCreationRationale.UNKNOWN, 'No Comment')
 
790
        found = removeSecurityProxy(found)
 
791
 
 
792
        self.assertIs(True, updated)
 
793
 
 
794
        self.assertIsNot(None, found.account)
 
795
        self.assertEqual(
 
796
            new_identifier, found.account.openid_identifiers.any().identifier)
 
797
        self.assertIs(self.email.person, found)
 
798
        self.assertIs(self.email.account, found.account)
 
799
        self.assertEqual(EmailAddressStatus.PREFERRED, self.email.status)
 
800
 
 
801
    def testMovedEmailAddress(self):
 
802
        # The EmailAddress and OpenId Identifier are both in the
 
803
        # database, but they are not linked to the same account. The
 
804
        # identifier needs to be relinked to the correct account - the
 
805
        # user able to log into the trusted SSO with that email address
 
806
        # should be able to log into Launchpad with that email address.
 
807
        # This lets us cope with the SSO migrating email addresses
 
808
        # between SSO accounts.
 
809
        self.identifier.account = self.store.find(
 
810
            Account, displayname='Foo Bar').one()
 
811
 
 
812
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
 
813
            self.identifier.identifier, self.email.email, 'New Name',
 
814
            PersonCreationRationale.UNKNOWN, 'No Comment')
 
815
        found = removeSecurityProxy(found)
 
816
 
 
817
        self.assertIs(True, updated)
 
818
        self.assertIs(self.person, found)
 
819
 
 
820
        self.assertIs(found.account, self.identifier.account)
 
821
        self.assertIn(self.identifier, list(found.account.openid_identifiers))
 
822
 
 
823
 
 
824
class TestCreatePersonAndEmail(TestCase):
    """Check the errors raised by `IPersonSet`.createPersonAndEmail()."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        # Run as the anonymous user and log out again on cleanup.
        TestCase.setUp(self)
        login(ANONYMOUS)
        self.addCleanup(logout)
        self.person_set = getUtility(IPersonSet)

    def test_duplicated_name_not_accepted(self):
        # A second person cannot be created with a name already in use.
        create = self.person_set.createPersonAndEmail
        create(
            'testing@example.com', PersonCreationRationale.UNKNOWN,
            name='zzzz')
        self.assertRaises(
            NameAlreadyTaken, create,
            'testing2@example.com', PersonCreationRationale.UNKNOWN,
            name='zzzz')

    def test_duplicated_email_not_accepted(self):
        # A second person cannot be created with an address already in use.
        create = self.person_set.createPersonAndEmail
        create('testing@example.com', PersonCreationRationale.UNKNOWN)
        self.assertRaises(
            EmailAddressAlreadyTaken, create,
            'testing@example.com', PersonCreationRationale.UNKNOWN)

    def test_invalid_email_not_accepted(self):
        # A malformed email address is rejected.
        create = self.person_set.createPersonAndEmail
        self.assertRaises(
            InvalidEmailAddress, create,
            'testing@.com', PersonCreationRationale.UNKNOWN)

    def test_invalid_name_not_accepted(self):
        # A malformed person name is rejected.
        create = self.person_set.createPersonAndEmail
        self.assertRaises(
            InvalidName, create,
            'testing@example.com', PersonCreationRationale.UNKNOWN,
            name='/john')
31
860
 
32
861
 
33
862
class TestPersonSetBranchCounts(TestCaseWithFactory):
123
952
        self.assertEqual(testing_person, ensured_person)
124
953
 
125
954
 
126
 
class TestPersonSetMerge(TestCaseWithFactory):
    """Exercise `PersonSet._mergeMailingListSubscriptions()` directly."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        TestCaseWithFactory.setUp(self)
        # Use the unsecured PersonSet so that private methods can be tested.
        self.person_set = PersonSet()
        self.from_person = self.factory.makePerson()
        self.to_person = self.factory.makePerson()
        self.cur = cursor()

    def test__mergeMailingListSubscriptions_no_subscriptions(self):
        # With nothing subscribed, the cursor reports no affected rows.
        source_id = self.from_person.id
        target_id = self.to_person.id
        self.person_set._mergeMailingListSubscriptions(
            self.cur, source_id, target_id)
        self.assertEqual(0, self.cur.rowcount)

    def test__mergeMailingListSubscriptions_with_subscriptions(self):
        # Make from_person auto-subscribe, add them to a team that has a
        # mailing list, then check that the merge touches exactly one row.
        unproxied = removeSecurityProxy(self.from_person)
        unproxied.mailing_list_auto_subscribe_policy = (
            MailingListAutoSubscribePolicy.ALWAYS)
        self.team, self.mailing_list = self.factory.makeTeamAndMailingList(
            'test-mailinglist', 'team-owner')
        owner = self.team.teamowner
        with person_logged_in(owner):
            self.team.addMember(self.from_person, reviewer=owner)
        transaction.commit()
        source_id = self.from_person.id
        target_id = self.to_person.id
        self.person_set._mergeMailingListSubscriptions(
            self.cur, source_id, target_id)
        self.assertEqual(1, self.cur.rowcount)
156
 
 
157
 
 
158
955
class TestPersonSetGetOrCreateByOpenIDIdentifier(TestCaseWithFactory):
159
956
 
160
957
    layer = DatabaseFunctionalLayer