~launchpad-pqm/launchpad/devel

14617.4.4 by William Grant
Update copyright.
1
# Copyright 2009-2012 Canonical Ltd.  This software is licensed under the
8687.15.17 by Karl Fogel
Add the copyright header block to the rest of the files under lib/lp/.
2
# GNU Affero General Public License version 3 (see the file LICENSE).
6096.5.5 by Tim Penhey
initial work
3
4
"""Tests for PersonSet."""
5
6
__metaclass__ = type
7
14642.2.5 by Curtis Hovey
Moved personset tests to test_personset.
8
from datetime import datetime
9
9866.1.2 by Curtis Hovey
Updated _mergeMailingListSubscriptions to delete the from_id's subscriptions since
10
import transaction
14642.2.5 by Curtis Hovey
Moved personset tests to test_personset.
11
12
import pytz
13
14
from testtools.matchers import (
15
    LessThan,
16
    )
17
9305.2.1 by Celso Providelo
Experimental fix for bug #408528 (IPersonSet.ensurePerson() coping with already existing email/account scenario).
18
from zope.component import getUtility
7675.340.2 by Celso Providelo
applying review comments, r=sinzui,gmb.
19
from zope.security.proxy import removeSecurityProxy
6096.5.5 by Tim Penhey
initial work
20
14562.1.4 by William Grant
Unbreak import syntax.
21
from lp.code.tests.helpers import remove_all_sample_data_branches
14642.2.5 by Curtis Hovey
Moved personset tests to test_personset.
22
from lp.registry.errors import (
23
    InvalidName,
24
    NameAlreadyTaken,
25
    )
26
from lp.registry.interfaces.karma import IKarmaCacheManager
27
from lp.registry.interfaces.mailinglist import MailingListStatus
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
28
from lp.registry.interfaces.mailinglistsubscription import (
29
    MailingListAutoSubscribePolicy,
30
    )
14642.2.5 by Curtis Hovey
Moved personset tests to test_personset.
31
from lp.registry.interfaces.nameblacklist import INameBlacklistSet
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
32
from lp.registry.interfaces.person import (
33
    IPersonSet,
34
    PersonCreationRationale,
14642.2.5 by Curtis Hovey
Moved personset tests to test_personset.
35
    PersonVisibility,
36
    )
37
from lp.registry.interfaces.personnotification import IPersonNotificationSet
38
from lp.registry.model.accesspolicy import AccessPolicyGrant
39
from lp.registry.model.person import (
40
    Person,
41
    PersonSet,
42
    )
43
from lp.registry.tests.test_person import KarmaTestMixin
44
from lp.services.config import config
45
from lp.services.database.lpstorm import (
46
    IMasterStore,
47
    IStore,
48
    )
14606.3.1 by William Grant
Merge canonical.database into lp.services.database.
49
from lp.services.database.sqlbase import cursor
14550.1.1 by Steve Kowalik
Run format-imports over lib/lp and lib/canonical/launchpad
50
from lp.services.identity.interfaces.account import (
14642.2.5 by Curtis Hovey
Moved personset tests to test_personset.
51
    AccountCreationRationale,
14550.1.1 by Steve Kowalik
Run format-imports over lib/lp and lib/canonical/launchpad
52
    AccountStatus,
53
    AccountSuspendedError,
54
    )
14642.2.5 by Curtis Hovey
Moved personset tests to test_personset.
55
from lp.services.identity.interfaces.emailaddress import (
56
    EmailAddressAlreadyTaken,
57
    EmailAddressStatus,
58
    IEmailAddressSet,
59
    InvalidEmailAddress,
60
    )
61
from lp.services.identity.model.account import Account
62
from lp.services.identity.model.emailaddress import EmailAddress
63
from lp.services.openid.model.openididentifier import OpenIdIdentifier
64
from lp.soyuz.enums import (
65
    ArchiveStatus,
66
    )
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
67
from lp.testing import (
14642.2.5 by Curtis Hovey
Moved personset tests to test_personset.
68
    ANONYMOUS,
69
    celebrity_logged_in,
70
    login,
71
    login_person,
72
    logout,
14449.5.14 by Curtis Hovey
Do not logout while still working with secured objects.
73
    person_logged_in,
14642.2.5 by Curtis Hovey
Moved personset tests to test_personset.
74
    StormStatementRecorder,
75
    TestCase,
11403.1.4 by Henning Eggers
Reformatted imports using format-imports script r32.
76
    TestCaseWithFactory,
77
    )
14642.2.5 by Curtis Hovey
Moved personset tests to test_personset.
78
79
from lp.testing.dbuser import dbuser
14606.3.1 by William Grant
Merge canonical.database into lp.services.database.
80
from lp.testing.layers import DatabaseFunctionalLayer
14642.2.5 by Curtis Hovey
Moved personset tests to test_personset.
81
from lp.testing.matchers import HasQueryCount
82
83
84
class TestPersonSet(TestCaseWithFactory):
    """Test `IPersonSet`."""
    layer = DatabaseFunctionalLayer

    def setUp(self):
        # Every test starts logged in as the anonymous user and works
        # against the IPersonSet utility; the login is undone on cleanup.
        super(TestPersonSet, self).setUp()
        login(ANONYMOUS)
        self.addCleanup(logout)
        self.person_set = getUtility(IPersonSet)

    def _makeNumberedTeams(self, count=6):
        # Create teams named 'team-1' .. 'team-<count>' and return them as
        # a list in creation order.
        return [
            self.factory.makeTeam(name='team-%s' % num)
            for num in range(1, count + 1)]

    def test_isNameBlacklisted(self):
        # A name matching a NameBlacklist regexp is reported as
        # blacklisted; a name that does not match is not.
        cursor().execute(
            "INSERT INTO NameBlacklist(id, regexp) VALUES (-100, 'foo')")
        self.assertTrue(self.person_set.isNameBlacklisted('foo'))
        self.assertFalse(self.person_set.isNameBlacklisted('bar'))

    def test_isNameBlacklisted_user_is_admin(self):
        # A name matching a blacklist expression is not reported as
        # blacklisted for the owner of the team passed as the
        # expression's admin.
        team = self.factory.makeTeam()
        name_blacklist_set = getUtility(INameBlacklistSet)
        self.admin_exp = name_blacklist_set.create(u'fnord', admin=team)
        self.store = IStore(self.admin_exp)
        # Flush so the new expression is in the database before the lookup.
        self.store.flush()
        user = team.teamowner
        self.assertFalse(self.person_set.isNameBlacklisted('fnord', user))

    def test_getByEmail_ignores_case_and_whitespace(self):
        # getByEmail() finds the same person whether or not the address
        # has different case or surrounding whitespace.
        person1_email = 'foo.bar@canonical.com'
        person1 = self.person_set.getByEmail(person1_email)
        self.assertIsNot(
            None, person1,
            "PersonSet.getByEmail() could not find %r" % person1_email)

        person2 = self.person_set.getByEmail('  foo.BAR@canonICAL.com  ')
        self.assertIsNot(
            None, person2,
            "PersonSet.getByEmail() should ignore case and whitespace.")
        self.assertEqual(person1, person2)

    def test_getPrecachedPersonsFromIDs(self):
        # The getPrecachedPersonsFromIDs() method should only make one
        # query to load all the extraneous data. Accessing the
        # attributes should then cause zero queries.
        person_ids = [self.factory.makePerson().id for _ in range(3)]

        with StormStatementRecorder() as recorder:
            persons = list(self.person_set.getPrecachedPersonsFromIDs(
                person_ids, need_karma=True, need_ubuntu_coc=True,
                need_location=True, need_archive=True,
                need_preferred_email=True, need_validity=True))
        self.assertThat(recorder, HasQueryCount(LessThan(2)))

        with StormStatementRecorder() as recorder:
            for person in persons:
                # Bare attribute reads: only the queries they would issue
                # matter here, not the values.
                person.is_valid_person
                person.karma
                person.is_ubuntu_coc_signer
                person.location
                person.archive
                person.preferredemail
        self.assertThat(recorder, HasQueryCount(LessThan(1)))

    def test_latest_teams_public(self):
        # Anyone can see the latest 5 teams if they are public.
        teams = self._makeNumberedTeams()
        # Newest first, matching the order the result is compared against.
        teams.reverse()
        result = self.person_set.latest_teams()
        self.assertEqual(teams[0:5], list(result))

    def test_latest_teams_private(self):
        # Private teams are only included in the latest teams if the
        # user can view the team.
        teams = self._makeNumberedTeams()
        owner = self.factory.makePerson()
        teams.append(
            self.factory.makeTeam(
                name='private-team', owner=owner,
                visibility=PersonVisibility.PRIVATE))
        # Newest first: the private team is now teams[0].
        teams.reverse()
        # The owner's results start with the private team.
        login_person(owner)
        result = self.person_set.latest_teams()
        self.assertEqual(teams[0:5], list(result))
        # For an unrelated person the private team is skipped.
        login_person(self.factory.makePerson())
        result = self.person_set.latest_teams()
        self.assertEqual(teams[1:6], list(result))

    def test_latest_teams_limit(self):
        # The limit controls the number of latest teams returned.
        teams = self._makeNumberedTeams()
        teams.reverse()
        result = self.person_set.latest_teams(limit=3)
        self.assertEqual(teams[0:3], list(result))
183
184
185
class TestPersonSetMergeMailingListSubscriptions(TestCaseWithFactory):
    """Test `PersonSet._mergeMailingListSubscriptions`."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        super(TestPersonSetMergeMailingListSubscriptions, self).setUp()
        # Use the unsecured PersonSet so that private methods can be tested.
        self.person_set = PersonSet()
        self.from_person = self.factory.makePerson()
        self.to_person = self.factory.makePerson()
        # The cursor is handed to the method under test; the tests then
        # inspect its rowcount.
        self.cur = cursor()

    def test__mergeMailingListSubscriptions_no_subscriptions(self):
        # When from_person has no mailing list subscriptions, the cursor
        # reports zero affected rows after the merge step.
        self.person_set._mergeMailingListSubscriptions(
            self.cur, self.from_person.id, self.to_person.id)
        self.assertEqual(0, self.cur.rowcount)

    def test__mergeMailingListSubscriptions_with_subscriptions(self):
        # When from_person auto-subscribes to a team mailing list on
        # joining the team, the cursor reports one affected row after the
        # merge step.
        naked_person = removeSecurityProxy(self.from_person)
        naked_person.mailing_list_auto_subscribe_policy = (
            MailingListAutoSubscribePolicy.ALWAYS)
        self.team, self.mailing_list = self.factory.makeTeamAndMailingList(
            'test-mailinglist', 'team-owner')
        with person_logged_in(self.team.teamowner):
            self.team.addMember(
                self.from_person, reviewer=self.team.teamowner)
        transaction.commit()
        self.person_set._mergeMailingListSubscriptions(
            self.cur, self.from_person.id, self.to_person.id)
        self.assertEqual(1, self.cur.rowcount)
215
216
217
class TestPersonSetMerge(TestCaseWithFactory, KarmaTestMixin):
    """Test cases for PersonSet merge."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        super(TestPersonSetMerge, self).setUp()
        self.person_set = getUtility(IPersonSet)

    def _do_premerge(self, from_person, to_person):
        # Do the pre merge work performed by the LoginToken: reassign
        # from_person's preferred email to to_person (and its account)
        # with status NEW, then commit.
        with celebrity_logged_in('admin'):
            email = from_person.preferredemail
            email.status = EmailAddressStatus.NEW
            email.person = to_person
            email.account = to_person.account
        transaction.commit()

    def _do_merge(self, from_person, to_person, reviewer=None):
        # Perform the merge as the db user that will be used by the jobs.
        # Returns the (from_person, to_person) pair that was passed in.
        with dbuser(config.IPersonMergeJobSource.dbuser):
            self.person_set.merge(from_person, to_person, reviewer=reviewer)
        return from_person, to_person

    def _get_testable_account(self, person, date_created, openid_identifier):
        # Return a naked account with predictable attributes.
        account = removeSecurityProxy(person.account)
        account.date_created = date_created
        account.openid_identifier = openid_identifier
        return account

    def test_delete_no_notifications(self):
        # Deleting a team leaves no person notifications to send.
        team = self.factory.makeTeam()
        owner = team.teamowner
        transaction.commit()
        with dbuser(config.IPersonMergeJobSource.dbuser):
            self.person_set.delete(team, owner)
        notification_set = getUtility(IPersonNotificationSet)
        notifications = notification_set.getNotificationsToSend()
        self.assertEqual(0, notifications.count())

    def test_openid_identifiers(self):
        # Verify that OpenId Identifiers are merged.
        duplicate = self.factory.makePerson()
        duplicate_identifier = removeSecurityProxy(
            duplicate.account).openid_identifiers.any().identifier
        person = self.factory.makePerson()
        person_identifier = removeSecurityProxy(
            person.account).openid_identifiers.any().identifier
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        # The duplicate's account keeps no identifiers...
        self.assertEqual(
            0,
            removeSecurityProxy(duplicate.account).openid_identifiers.count())

        # ...and the surviving account holds both.
        merged_identifiers = [
            identifier.identifier
            for identifier in
            removeSecurityProxy(person.account).openid_identifiers]

        self.assertIn(duplicate_identifier, merged_identifiers)
        self.assertIn(person_identifier, merged_identifiers)

    def test_karmacache_transferred_to_user_has_no_karma(self):
        # Verify that the merged user has no KarmaCache entries,
        # and the karma total was transferred.
        # NOTE(review): cache_manager is not read in this class; presumably
        # the KarmaTestMixin helpers use it -- confirm.
        self.cache_manager = getUtility(IKarmaCacheManager)
        product = self.factory.makeProduct()
        duplicate = self.factory.makePerson()
        self._makeKarmaCache(
            duplicate, product, [('bugs', 10)])
        self._makeKarmaTotalCache(duplicate, 15)
        # The karma changes invalidated duplicate instance.
        duplicate = self.person_set.get(duplicate.id)
        person = self.factory.makePerson()
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        self.assertEqual([], duplicate.karma_category_caches)
        self.assertEqual(0, duplicate.karma)
        self.assertEqual(15, person.karma)

    def test_karmacache_transferred_to_user_has_karma(self):
        # Verify that the merged user has no KarmaCache entries,
        # and the karma total was summed.
        # NOTE(review): cache_manager is not read in this class; presumably
        # the KarmaTestMixin helpers use it -- confirm.
        self.cache_manager = getUtility(IKarmaCacheManager)
        product = self.factory.makeProduct()
        duplicate = self.factory.makePerson()
        self._makeKarmaCache(
            duplicate, product, [('bugs', 10)])
        self._makeKarmaTotalCache(duplicate, 15)
        person = self.factory.makePerson()
        self._makeKarmaCache(
            person, product, [('bugs', 9)])
        self._makeKarmaTotalCache(person, 13)
        # The karma changes invalidated duplicate and person instances.
        duplicate = self.person_set.get(duplicate.id)
        person = self.person_set.get(person.id)
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        self.assertEqual([], duplicate.karma_category_caches)
        self.assertEqual(0, duplicate.karma)
        # 15 (duplicate) + 13 (person).
        self.assertEqual(28, person.karma)

    def test_person_date_created_preserved(self):
        # Verify that the oldest datecreated is merged.
        person = self.factory.makePerson()
        duplicate = self.factory.makePerson()
        oldest_date = datetime(2005, 11, 25, tzinfo=pytz.UTC)
        removeSecurityProxy(duplicate).datecreated = oldest_date
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        self.assertEqual(oldest_date, person.datecreated)

    def test_team_with_active_mailing_list_raises_error(self):
        # A team with an active mailing list cannot be merged.
        target_team = self.factory.makeTeam()
        test_team = self.factory.makeTeam()
        self.factory.makeMailingList(
            test_team, test_team.teamowner)
        self.assertRaises(
            AssertionError, self.person_set.merge, test_team, target_team)

    def test_team_with_inactive_mailing_list(self):
        # A team with an inactive mailing list can be merged.
        target_team = self.factory.makeTeam()
        test_team = self.factory.makeTeam()
        mailing_list = self.factory.makeMailingList(
            test_team, test_team.teamowner)
        mailing_list.deactivate()
        mailing_list.transitionToStatus(MailingListStatus.INACTIVE)
        test_team, target_team = self._do_merge(
            test_team, target_team, test_team.teamowner)
        self.assertEqual(target_team, test_team.merged)
        # After the merge the list is PURGED and the target team has no
        # email addresses.
        self.assertEqual(
            MailingListStatus.PURGED, test_team.mailing_list.status)
        emails = getUtility(IEmailAddressSet).getByPerson(target_team).count()
        self.assertEqual(0, emails)

    def test_team_with_purged_mailing_list(self):
        # A team with a purged mailing list can be merged.
        target_team = self.factory.makeTeam()
        test_team = self.factory.makeTeam()
        mailing_list = self.factory.makeMailingList(
            test_team, test_team.teamowner)
        mailing_list.deactivate()
        mailing_list.transitionToStatus(MailingListStatus.INACTIVE)
        mailing_list.purge()
        test_team, target_team = self._do_merge(
            test_team, target_team, test_team.teamowner)
        self.assertEqual(target_team, test_team.merged)

    def test_team_with_members(self):
        # Team members are removed before merging.
        target_team = self.factory.makeTeam()
        test_team = self.factory.makeTeam()
        former_member = self.factory.makePerson()
        with person_logged_in(test_team.teamowner):
            test_team.addMember(former_member, test_team.teamowner)
        test_team, target_team = self._do_merge(
            test_team, target_team, test_team.teamowner)
        self.assertEqual(target_team, test_team.merged)
        self.assertEqual([], list(former_member.super_teams))

    def test_team_without_super_teams_is_fine(self):
        # A team with no members and no super teams
        # merges without errors.
        test_team = self.factory.makeTeam()
        target_team = self.factory.makeTeam()
        login_person(test_team.teamowner)
        self._do_merge(test_team, target_team, test_team.teamowner)

    def test_team_with_super_teams(self):
        # A team with superteams can be merged, but the memberships
        # are not transferred.
        test_team = self.factory.makeTeam()
        super_team = self.factory.makeTeam()
        target_team = self.factory.makeTeam()
        login_person(test_team.teamowner)
        test_team.join(super_team, test_team.teamowner)
        test_team, target_team = self._do_merge(
            test_team, target_team, test_team.teamowner)
        self.assertEqual(target_team, test_team.merged)
        self.assertEqual([], list(target_team.super_teams))

    def test_merge_moves_branches(self):
        # When person/teams are merged, branches owned by the from person
        # are moved.
        person = self.factory.makePerson()
        branch = self.factory.makeBranch()
        duplicate = branch.owner
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        branches = person.getBranches()
        self.assertEqual(1, branches.count())

    def test_merge_with_duplicated_branches(self):
        # If both the from and to people have branches with the same name,
        # merging renames the duplicate from the from person's side.
        product = self.factory.makeProduct()
        from_branch = self.factory.makeBranch(name='foo', product=product)
        to_branch = self.factory.makeBranch(name='foo', product=product)
        mergee = to_branch.owner
        duplicate = from_branch.owner
        self._do_premerge(duplicate, mergee)
        login_person(mergee)
        duplicate, mergee = self._do_merge(duplicate, mergee)
        branches = [b.name for b in mergee.getBranches()]
        self.assertEqual(2, len(branches))
        self.assertContentEqual([u'foo', u'foo-1'], branches)

    def test_merge_moves_recipes(self):
        # When person/teams are merged, recipes owned by the from person are
        # moved.
        person = self.factory.makePerson()
        recipe = self.factory.makeSourcePackageRecipe()
        duplicate = recipe.owner
        # Delete the PPA, which is required for the merge to work.
        with person_logged_in(duplicate):
            duplicate.archive.status = ArchiveStatus.DELETED
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        self.assertEqual(1, person.recipes.count())

    def test_merge_with_duplicated_recipes(self):
        # If both the from and to people have recipes with the same name,
        # merging renames the duplicate from the from person's side.
        merge_from = self.factory.makeSourcePackageRecipe(
            name=u'foo', description=u'FROM')
        merge_to = self.factory.makeSourcePackageRecipe(
            name=u'foo', description=u'TO')
        duplicate = merge_from.owner
        mergee = merge_to.owner
        # Delete merge_from's PPA, which is required for the merge to work.
        with person_logged_in(duplicate):
            duplicate.archive.status = ArchiveStatus.DELETED
        self._do_premerge(duplicate, mergee)
        login_person(mergee)
        duplicate, mergee = self._do_merge(duplicate, mergee)
        recipes = mergee.recipes
        self.assertEqual(2, recipes.count())
        # The descriptions tell the two same-named recipes apart.
        descriptions = [r.description for r in recipes]
        self.assertEqual([u'TO', u'FROM'], descriptions)
        self.assertEqual(u'foo-1', recipes[1].name)

    def assertSubscriptionMerges(self, target):
        # Given a subscription target, we want to make sure that subscriptions
        # that the duplicate person made are carried over to the merged
        # account.
        duplicate = self.factory.makePerson()
        with person_logged_in(duplicate):
            target.addSubscription(duplicate, duplicate)
        person = self.factory.makePerson()
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        # The merged person has the subscription, and the duplicate person
        # does not.
        self.assertIsNot(None, target.getSubscription(person))
        self.assertIs(None, target.getSubscription(duplicate))

    def assertConflictingSubscriptionDeletes(self, target):
        # Given a subscription target, we want to make sure that subscriptions
        # that the duplicate person made that conflict with existing
        # subscriptions in the merged account are deleted.
        duplicate = self.factory.makePerson()
        person = self.factory.makePerson()
        with person_logged_in(duplicate):
            target.addSubscription(duplicate, duplicate)
        with person_logged_in(person):
            # The description lets us show that we still have the right
            # subscription later.
            target.addBugSubscriptionFilter(person, person).description = (
                u'a marker')
        self._do_premerge(duplicate, person)
        login_person(person)
        duplicate, person = self._do_merge(duplicate, person)
        # The merged person still has the original subscription, as shown
        # by the marker description.
        self.assertEqual(
            u'a marker',
            target.getSubscription(person).bug_filters[0].description)
        # The conflicting subscription on the duplicate has been deleted.
        self.assertIs(None, target.getSubscription(duplicate))

    def test_merge_with_product_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeProduct())

    def test_merge_with_conflicting_product_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(self.factory.makeProduct())

    def test_merge_with_project_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeProject())

    def test_merge_with_conflicting_project_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(self.factory.makeProject())

    def test_merge_with_distroseries_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeDistroSeries())

    def test_merge_with_conflicting_distroseries_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(
            self.factory.makeDistroSeries())

    def test_merge_with_milestone_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeMilestone())

    def test_merge_with_conflicting_milestone_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(
            self.factory.makeMilestone())

    def test_merge_with_productseries_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeProductSeries())

    def test_merge_with_conflicting_productseries_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(
            self.factory.makeProductSeries())

    def test_merge_with_distribution_subscription(self):
        # See comments in assertSubscriptionMerges.
        self.assertSubscriptionMerges(self.factory.makeDistribution())

    def test_merge_with_conflicting_distribution_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        self.assertConflictingSubscriptionDeletes(
            self.factory.makeDistribution())

    def test_merge_with_sourcepackage_subscription(self):
        # See comments in assertSubscriptionMerges.
        dsp = self.factory.makeDistributionSourcePackage()
        self.assertSubscriptionMerges(dsp)

    def test_merge_with_conflicting_sourcepackage_subscription(self):
        # See comments in assertConflictingSubscriptionDeletes.
        dsp = self.factory.makeDistributionSourcePackage()
        self.assertConflictingSubscriptionDeletes(dsp)

    def test_merge_accesspolicygrants(self):
        # AccessPolicyGrants are transferred from the duplicate.
        person = self.factory.makePerson()
        grant = self.factory.makeAccessPolicyGrant()
        self._do_premerge(grant.grantee, person)
        with person_logged_in(person):
            self._do_merge(grant.grantee, person)
        self.assertEqual(person, grant.grantee)

    def test_merge_accesspolicygrants_conflicts(self):
        # Conflicting AccessPolicyGrants are deleted.
        policy = self.factory.makeAccessPolicy()

        person = self.factory.makePerson()
        person_grantor = self.factory.makePerson()
        person_grant = self.factory.makeAccessPolicyGrant(
            grantee=person, grantor=person_grantor, object=policy)

        # A second grant on the same policy, held by the duplicate.
        duplicate = self.factory.makePerson()
        duplicate_grantor = self.factory.makePerson()
        duplicate_grant = self.factory.makeAccessPolicyGrant(
            grantee=duplicate, grantor=duplicate_grantor, object=policy)

        self._do_premerge(duplicate, person)
        with person_logged_in(person):
            self._do_merge(duplicate, person)
        transaction.commit()

        # The surviving person's grant is untouched; the duplicate's grant
        # can no longer be loaded from the store.
        self.assertEqual(person, person_grant.grantee)
        self.assertEqual(person_grantor, person_grant.grantor)
        self.assertIs(
            None,
            IStore(AccessPolicyGrant).get(
                AccessPolicyGrant, duplicate_grant.id))

    def test_mergeAsync(self):
        # mergeAsync() creates a new `PersonMergeJob`.
        from_person = self.factory.makePerson()
        to_person = self.factory.makePerson()
        login_person(from_person)
        job = self.person_set.mergeAsync(from_person, to_person)
        self.assertEqual(from_person, job.from_person)
        self.assertEqual(to_person, job.to_person)
612
613
614
class TestPersonSetCreateByOpenId(TestCaseWithFactory):
    """Test `IPersonSet`.getOrCreateByOpenIDIdentifier() on hand-built rows.

    setUp() adds an account, an OpenId identifier, a person and a
    preferred email address directly to the master store; each test then
    varies what is passed in or unlinks part of that data first.
    """
    layer = DatabaseFunctionalLayer

    def setUp(self):
        super(TestPersonSetCreateByOpenId, self).setUp()
        self.person_set = getUtility(IPersonSet)
        self.store = IMasterStore(Account)

        # Build one consistent set of linked rows to start from.
        self.account = self.makeAccount()
        self.identifier = self.makeOpenIdIdentifier(self.account, u'whatever')
        self.person = self.makePerson(self.account)
        self.email = self.makeEmailAddress(
            email='whatever@example.com', person=self.person)

    def makeAccount(self):
        """Add an active `Account` to the store and return it."""
        new_account = Account(
            displayname='Displayname',
            creation_rationale=AccountCreationRationale.UNKNOWN,
            status=AccountStatus.ACTIVE)
        return self.store.add(new_account)

    def makeOpenIdIdentifier(self, account, identifier):
        """Add an `OpenIdIdentifier` tying `identifier` to `account`."""
        row = OpenIdIdentifier()
        row.identifier = identifier
        row.account = account
        return self.store.add(row)

    def makePerson(self, account):
        """Add a `Person` for `account`, named after the account's id."""
        new_person = Person(
            name='acc%d' % account.id, account=account,
            displayname='Displayname',
            creation_rationale=PersonCreationRationale.UNKNOWN)
        return self.store.add(new_person)

    def makeEmailAddress(self, email, person):
        """Add a preferred `EmailAddress` for `person` and its account."""
        new_address = EmailAddress(
            email=email,
            account=person.account,
            person=person,
            status=EmailAddressStatus.PREFERRED)
        return self.store.add(new_address)

    def _getOrCreate(self, identifier, email, name):
        """Look up or create a person and strip the security proxy.

        Calls getOrCreateByOpenIDIdentifier() with the rationale and
        comment used throughout this class, and returns the unproxied
        person together with the `updated` flag.
        """
        found, updated = self.person_set.getOrCreateByOpenIDIdentifier(
            identifier, email, name,
            PersonCreationRationale.UNKNOWN, 'No Comment')
        return removeSecurityProxy(found), updated

    def _assertOriginalEmailLinked(self, found):
        """Assert the setUp email is `found`'s preferred one and is still
        linked to the setUp account and person.
        """
        self.assertIs(self.email, found.preferredemail)
        self.assertIs(self.email.account, self.account)
        self.assertIs(self.email.person, self.person)

    def testAllValid(self):
        found, updated = self._getOrCreate(
            self.identifier.identifier, self.email.email, 'Ignored Name')

        self.assertIs(False, updated)
        self.assertIs(self.person, found)
        self.assertIs(self.account, found.account)
        self._assertOriginalEmailLinked(found)
        self.assertEqual(
            [self.identifier], list(self.account.openid_identifiers))

    def testEmailAddressCaseInsensitive(self):
        # Same as testAllValid, except that the email address passed in
        # has been upper-cased.
        shouted_email = self.email.email.upper()
        found, updated = self._getOrCreate(
            self.identifier.identifier, shouted_email, 'Ignored Name')

        self.assertIs(False, updated)
        self.assertIs(self.person, found)
        self.assertIs(self.account, found.account)
        self._assertOriginalEmailLinked(found)
        self.assertEqual(
            [self.identifier], list(self.account.openid_identifiers))

    def testNewOpenId(self):
        # The account is found through its email address and the unknown
        # OpenId identifier is attached to it. We can do this because we
        # trust our OpenId Provider.
        fresh_identifier = u'newident'
        found, updated = self._getOrCreate(
            fresh_identifier, self.email.email, 'Ignored Name')

        self.assertIs(True, updated)
        self.assertIs(self.person, found)
        self.assertIs(self.account, found.account)
        self._assertOriginalEmailLinked(found)

        # The original identifier has not been detached.
        self.assertIn(self.identifier, list(self.account.openid_identifiers))

        # The new identifier is attached as well.
        attached = [
            row.identifier for row in self.account.openid_identifiers]
        self.assertIn(fresh_identifier, attached)

    def testNewEmailAddress(self):
        # The account is found through its OpenId identifier and the
        # unknown email address is attached to it. We can do this
        # because we trust our OpenId Provider.
        new_address = u'new_email@example.com'
        found, updated = self._getOrCreate(
            self.identifier.identifier, new_address, 'Ignored Name')

        self.assertIs(True, updated)
        self.assertIs(self.person, found)
        self.assertIs(self.account, found.account)
        self.assertEqual(
            [self.identifier], list(self.account.openid_identifiers))

        # The old email address is still there and correctly linked.
        self._assertOriginalEmailLinked(found)

        # The new email address is there too and correctly linked.
        new_row = self.store.find(EmailAddress, email=new_address).one()
        self.assertIs(new_row.account, self.account)
        self.assertIs(new_row.person, self.person)
        self.assertEqual(EmailAddressStatus.NEW, new_row.status)

    def testNewAccountAndIdentifier(self):
        # With neither the OpenId identifier nor the email address in
        # the database, everything is created.
        new_address = u'new_email@example.com'
        new_identifier = u'new_identifier'
        found, updated = self._getOrCreate(
            new_identifier, new_address, 'New Name')

        # We have a new Person.
        self.assertIs(True, updated)
        self.assertIsNot(None, found)

        # It is correctly linked to an account, email address and
        # identifier.
        self.assertIs(found, found.preferredemail.person)
        self.assertIs(found.account, found.preferredemail.account)
        attached = found.account.openid_identifiers.any()
        self.assertEqual(new_identifier, attached.identifier)

    def testNoPerson(self):
        # If the account is not linked to a Person, create one. ShipIt
        # users fall into this category the first time they log into
        # Launchpad.
        self.email.person = None
        self.person.account = None

        found, updated = self._getOrCreate(
            self.identifier.identifier, self.email.email, 'New Name')

        # We have a new Person.
        self.assertIs(True, updated)
        self.assertIsNot(self.person, found)

        # It is correctly linked to an account, email address and
        # identifier.
        self.assertIs(found, found.preferredemail.person)
        self.assertIs(found.account, found.preferredemail.account)
        self.assertIn(self.identifier, list(found.account.openid_identifiers))

    def testNoAccount(self):
        # The EmailAddress is linked to a Person but there is no
        # Account. The stub is converted into something valid.
        self.email.account = None
        self.email.status = EmailAddressStatus.NEW
        self.person.account = None
        new_identifier = u'new_identifier'
        found, updated = self._getOrCreate(
            new_identifier, self.email.email, 'Ignored')

        self.assertIs(True, updated)

        self.assertIsNot(None, found.account)
        attached = found.account.openid_identifiers.any()
        self.assertEqual(new_identifier, attached.identifier)
        self.assertIs(self.email.person, found)
        self.assertIs(self.email.account, found.account)
        self.assertEqual(EmailAddressStatus.PREFERRED, self.email.status)

    def testMovedEmailAddress(self):
        # The EmailAddress and OpenId Identifier are both in the
        # database, but they are not linked to the same account. The
        # identifier needs to be relinked to the correct account - the
        # user able to log into the trusted SSO with that email address
        # should be able to log into Launchpad with that email address.
        # This lets us cope with the SSO migrating email addresses
        # between SSO accounts.
        other_account = self.store.find(
            Account, displayname='Foo Bar').one()
        self.identifier.account = other_account

        found, updated = self._getOrCreate(
            self.identifier.identifier, self.email.email, 'New Name')

        self.assertIs(True, updated)
        self.assertIs(self.person, found)

        self.assertIs(found.account, self.identifier.account)
        self.assertIn(self.identifier, list(found.account.openid_identifiers))
822
823
824
class TestCreatePersonAndEmail(TestCase):
    """Test `IPersonSet`.createPersonAndEmail()."""
    layer = DatabaseFunctionalLayer

    def setUp(self):
        super(TestCreatePersonAndEmail, self).setUp()
        # Run anonymously, and log out again when the test finishes.
        login(ANONYMOUS)
        self.addCleanup(logout)
        self.person_set = getUtility(IPersonSet)

    def test_duplicated_name_not_accepted(self):
        # A second person cannot be created under a name already in use.
        create = self.person_set.createPersonAndEmail
        create(
            'testing@example.com', PersonCreationRationale.UNKNOWN,
            name='zzzz')
        self.assertRaises(
            NameAlreadyTaken, create,
            'testing2@example.com', PersonCreationRationale.UNKNOWN,
            name='zzzz')

    def test_duplicated_email_not_accepted(self):
        # A second person cannot be created with an email already in use.
        create = self.person_set.createPersonAndEmail
        create('testing@example.com', PersonCreationRationale.UNKNOWN)
        self.assertRaises(
            EmailAddressAlreadyTaken, create,
            'testing@example.com', PersonCreationRationale.UNKNOWN)

    def test_invalid_email_not_accepted(self):
        # The address 'testing@.com' is rejected as invalid.
        create = self.person_set.createPersonAndEmail
        self.assertRaises(
            InvalidEmailAddress, create,
            'testing@.com', PersonCreationRationale.UNKNOWN)

    def test_invalid_name_not_accepted(self):
        # The name '/john' is rejected as invalid.
        create = self.person_set.createPersonAndEmail
        self.assertRaises(
            InvalidName, create,
            'testing@example.com', PersonCreationRationale.UNKNOWN,
            name='/john')
6096.5.5 by Tim Penhey
initial work
860
861
9305.2.1 by Celso Providelo
Experimental fix for bug #408528 (IPersonSet.ensurePerson() coping with already existing email/account scenario).
862
class TestPersonSetBranchCounts(TestCaseWithFactory):
    """Test the counts reported by `IPersonSet`.getPeopleWithBranches()."""

    layer = DatabaseFunctionalLayer

    def setUp(self):
        super(TestPersonSetBranchCounts, self).setUp()
        # Clear out the sample data branches before counting anything.
        remove_all_sample_data_branches()
        self.person_set = getUtility(IPersonSet)

    def test_no_branches(self):
        """Initially there should be no branches."""
        people = self.person_set.getPeopleWithBranches()
        self.assertEqual(0, people.count())

    def test_five_branches(self):
        made = [self.factory.makeAnyBranch() for _ in range(5)]
        # Each branch has a different product, so any individual product
        # will return one branch.
        everyone = self.person_set.getPeopleWithBranches()
        self.assertEqual(5, everyone.count())
        for_one_product = self.person_set.getPeopleWithBranches(
            made[0].product)
        self.assertEqual(1, for_one_product.count())
882
883
9305.2.1 by Celso Providelo
Experimental fix for bug #408528 (IPersonSet.ensurePerson() coping with already existing email/account scenario).
884
class TestPersonSetEnsurePerson(TestCaseWithFactory):
    """Test `IPersonSet`.ensurePerson().

    The class attributes below are the email address, display name and
    creation rationale that every test passes to ensurePerson().
    """

    layer = DatabaseFunctionalLayer
    email_address = 'testing.ensure.person@example.com'
    displayname = 'Testing ensurePerson'
    rationale = PersonCreationRationale.SOURCEPACKAGEUPLOAD

    def setUp(self):
        super(TestPersonSetEnsurePerson, self).setUp()
        self.person_set = getUtility(IPersonSet)

    def test_ensurePerson_returns_existing_person(self):
        # IPersonSet.ensurePerson returns the existing person and does
        # not override its details.
        testing_displayname = 'will not be modified'
        testing_person = self.factory.makePerson(
            email=self.email_address, displayname=testing_displayname)

        ensured_person = self.person_set.ensurePerson(
            self.email_address, self.displayname, self.rationale)
        self.assertEqual(testing_person.id, ensured_person.id)
        # Compare by value rather than identity: an identity check would
        # still pass if an equal but distinct object had overwritten the
        # attribute, so it could not detect an override.
        self.assertNotEqual(
            self.displayname, ensured_person.displayname,
            'Person.displayname should not be overridden.')
        self.assertNotEqual(
            self.rationale, ensured_person.creation_rationale,
            'Person.creation_rationale should not be overridden.')

    def test_ensurePerson_hides_new_person_email(self):
        # IPersonSet.ensurePerson creates a new person with
        # 'hide_email_addresses' set.
        ensured_person = self.person_set.ensurePerson(
            self.email_address, self.displayname, self.rationale)
        self.assertTrue(ensured_person.hide_email_addresses)

    def test_ensurePerson_for_existing_account(self):
        # IPersonSet.ensurePerson creates the missing Person for an
        # existing Account.
        test_account = self.factory.makeAccount(
            self.displayname, email=self.email_address)
        self.assertIs(None, test_account.preferredemail.person)

        ensured_person = self.person_set.ensurePerson(
            self.email_address, self.displayname, self.rationale)
        self.assertEqual(test_account.id, ensured_person.account.id)
        self.assertEqual(
            test_account.preferredemail, ensured_person.preferredemail)
        self.assertEqual(ensured_person, test_account.preferredemail.person)
        self.assertTrue(ensured_person.hide_email_addresses)

    def test_ensurePerson_for_existing_account_with_person(self):
        # IPersonSet.ensurePerson returns the existing Person for an
        # existing Account and additionally binds the account email to
        # the Person in question.

        # Create a testing `Account` and a testing `Person` directly,
        # linked.
        testing_account = self.factory.makeAccount(
            self.displayname, email=self.email_address)
        testing_person = removeSecurityProxy(
            testing_account).createPerson(self.rationale)
        self.assertEqual(
            testing_person, testing_account.preferredemail.person)

        # Since there's an existing Person for the given email address,
        # IPersonSet.ensurePerson() will just return it.
        ensured_person = self.person_set.ensurePerson(
            self.email_address, self.displayname, self.rationale)
        self.assertEqual(testing_person, ensured_person)
9570.11.1 by Celso Providelo
Re-fixing IPersonSet.ensurePerson() to cope with accounts with existing person but with unlinked email.
953
9305.2.1 by Celso Providelo
Experimental fix for bug #408528 (IPersonSet.ensurePerson() coping with already existing email/account scenario).
954
7675.738.1 by Michael Nelson
Further work for account/person creation.
955
class TestPersonSetGetOrCreateByOpenIDIdentifier(TestCaseWithFactory):
956
957
    layer = DatabaseFunctionalLayer
958
959
    def setUp(self):
        """Run the base class setUp and look up the `IPersonSet` utility."""
        super(TestPersonSetGetOrCreateByOpenIDIdentifier, self).setUp()
        utility = getUtility(IPersonSet)
        self.person_set = utility
962
963
    def callGetOrCreate(self, identifier, email='a@b.com'):
        """Call getOrCreateByOpenIDIdentifier() with fixed purchase details.

        The display name, creation rationale and comment are always the
        same; only the OpenID identifier and, optionally, the email
        address vary. Returns whatever the person set method returns.
        """
        rationale = PersonCreationRationale.SOFTWARE_CENTER_PURCHASE
        comment = "when purchasing an application via Software Center."
        return self.person_set.getOrCreateByOpenIDIdentifier(
            identifier, email, "Joe Bloggs", rationale, comment)
7675.738.1 by Michael Nelson
Further work for account/person creation.
968
969
    def test_existing_person(self):
        # An identifier and email address that already belong to a
        # person give back that person, with no update reported.
        address = 'test-email@example.com'
        existing = self.factory.makePerson(email=address)
        naked_account = removeSecurityProxy(existing.account)
        openid_ident = naked_account.openid_identifiers.any().identifier

        result, db_updated = self.callGetOrCreate(
            openid_ident, email=address)

        self.assertEqual(existing, result)
        self.assertFalse(db_updated)
7675.738.1 by Michael Nelson
Further work for account/person creation.
979
980
    def test_existing_account_no_person(self):
        # A person is created for the account, carrying the rationale
        # and creation comment that were passed in.
        account = self.factory.makeAccount('purchaser')
        naked_account = removeSecurityProxy(account)
        openid_ident = naked_account.openid_identifiers.any().identifier

        person, db_updated = self.callGetOrCreate(openid_ident)

        self.assertEqual(account, person.account)
        expected_comment = (
            "when purchasing an application via Software Center.")
        self.assertEqual(expected_comment, person.creation_comment)
        expected_rationale = PersonCreationRationale.SOFTWARE_CENTER_PURCHASE
        self.assertEqual(expected_rationale, person.creation_rationale)
        self.assertTrue(db_updated)
998
999
    def test_existing_deactivated_account(self):
        # A deactivated account becomes active again, and the comment
        # passed in is stored as its status comment.
        person = self.factory.makePerson(
            account_status=AccountStatus.DEACTIVATED)
        naked_account = removeSecurityProxy(person.account)
        openid_ident = naked_account.openid_identifiers.any().identifier

        found_person, db_updated = self.callGetOrCreate(openid_ident)

        self.assertEqual(person, found_person)
        self.assertEqual(AccountStatus.ACTIVE, person.account.status)
        self.assertTrue(db_updated)
        status_comment = removeSecurityProxy(person.account).status_comment
        self.assertEqual(
            "when purchasing an application via Software Center.",
            status_comment)
1013
1014
    def test_existing_suspended_account(self):
        # Looking up a suspended account raises AccountSuspendedError.
        person = self.factory.makePerson(
            account_status=AccountStatus.SUSPENDED)
        naked_account = removeSecurityProxy(person.account)
        openid_ident = naked_account.openid_identifiers.any().identifier

        self.assertRaises(
            AccountSuspendedError, self.callGetOrCreate, openid_ident)
7675.738.1 by Michael Nelson
Further work for account/person creation.
1023
1024
    def test_no_account_or_email(self):
        # An identifier can be used to create an account (it is assumed
        # to be already authenticated with SSO).
        person, db_updated = self.callGetOrCreate(u'openid-identifier')

        naked_account = removeSecurityProxy(person.account)
        attached = naked_account.openid_identifiers.any()
        self.assertEqual(u"openid-identifier", attached.identifier)
        self.assertTrue(db_updated)
7675.738.1 by Michael Nelson
Further work for account/person creation.
1033
7675.738.3 by Michael Nelson
Refactored existing code in webapp.login.processPositiveAssertion().
1034
    def test_no_matching_account_existing_email(self):
        # When no account has the identifier but the email address is
        # already known, the identifier is attached to the account that
        # owns the email address.
        other_person = self.factory.makePerson('a@b.com')

        person, db_updated = self.callGetOrCreate(
            u'other-openid-identifier', 'a@b.com')

        self.assertEqual(other_person, person)
        # assertIn reports the identifiers actually found when it fails,
        # which the deprecated assert_() alias does not.
        identifiers = [
            identifier.identifier for identifier in removeSecurityProxy(
                person.account).openid_identifiers]
        self.assertIn(u'other-openid-identifier', identifiers)