~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""People Merge related view classes."""

# Python 2 idiom: classes defined in this module without an explicit base
# become new-style classes.
__metaclass__ = type

# The view classes this module exports via `from <module> import *`.
__all__ = [
    'AdminPeopleMergeView',
    'AdminTeamMergeView',
    'DeleteTeamView',
    'FinishedPeopleMergeRequestView',
    'RequestPeopleMergeMultipleEmailsView',
    'RequestPeopleMergeView',
    ]


from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy

from canonical.launchpad import _
from canonical.launchpad.interfaces.authtoken import LoginTokenType
from canonical.launchpad.interfaces.emailaddress import (
    EmailAddressStatus,
    IEmailAddressSet,
    )
from canonical.launchpad.interfaces.logintoken import ILoginTokenSet
from canonical.launchpad.interfaces.lpstorm import IMasterObject
from canonical.launchpad.webapp import (
    canonical_url,
    LaunchpadView,
    )
from canonical.launchpad.webapp.interfaces import ILaunchBag
from lp.app.browser.launchpadform import (
    action,
    LaunchpadFormView,
    )
from lp.app.interfaces.launchpad import ILaunchpadCelebrities
from lp.registry.interfaces.mailinglist import (
    MailingListStatus,
    PURGE_STATES,
    )
from lp.registry.interfaces.person import (
    IAdminPeopleMergeSchema,
    IAdminTeamMergeSchema,
    IPersonSet,
    IRequestPeopleMerge,
    )
from lp.services.propertycache import cachedproperty
from lp.soyuz.enums import ArchiveStatus
from lp.soyuz.interfaces.archive import IArchiveSet


class ValidatingMergeView(LaunchpadFormView):
    """Form view base class that validates the parties of a merge."""

    def validate(self, data):
        """Record an error for each reason the merge must be refused.

        The duplicate is read from the 'dupe_person' form value.  The
        target is the 'target_person' form value, or `self.user` when the
        form supplies none.
        """
        duplicate = data.get('dupe_person')
        target = data.get('target_person') or self.user
        if duplicate is not None:
            if duplicate == target:
                self.addError(
                    _("You can't merge ${name} into itself.",
                      mapping=dict(name=duplicate.name)))
            # Only PPAs that are active or still being deleted block the
            # merge.
            blocking_statuses = [
                ArchiveStatus.ACTIVE, ArchiveStatus.DELETING]
            archive_set = getUtility(IArchiveSet)
            blocking_ppa = archive_set.getPPAOwnedByPerson(
                duplicate, statuses=blocking_statuses)
            if blocking_ppa is not None:
                self.addError(_(
                    "${name} has a PPA that must be deleted before it "
                    "can be merged. It may take ten minutes to remove the "
                    "deleted PPA's files.",
                    mapping=dict(name=duplicate.name)))
            if duplicate.is_merge_pending:
                self.addError(
                    _("${name} is already queued for merging.",
                      mapping=dict(name=duplicate.name)))
        else:
            self.setFieldError(
                'dupe_person', 'The duplicate is not a valid person or team.')
        # The target is checked whether or not the duplicate was valid.
        if target is not None and target.is_merge_pending:
            self.addError(
                _("${name} is already queued for merging.",
                  mapping=dict(name=target.name)))


class AdminMergeBaseView(ValidatingMergeView):
    """Shared behaviour for the admin pages that merge people or teams."""

    page_title = 'Merge Launchpad accounts'
    # The subclasses render through one shared template, so the flags that
    # template reads have to exist here rather than only on a subclass.
    should_confirm_email_reassignment = False
    should_confirm_member_deactivation = False
    merge_message = _(
        'A merge is queued and is expected to complete in a few minutes.')

    # Defaults used until setUpPeople() has stored the submitted values.
    dupe_person_emails = ()
    dupe_person = None
    target_person = None
    # Passed through to mergeAsync() as its `delete` argument.
    delete = False

    @property
    def cancel_url(self):
        """URL of the person set."""
        person_set = getUtility(IPersonSet)
        return canonical_url(person_set)

    @property
    def success_url(self):
        """URL of the merge target, used as next_url after doMerge()."""
        return canonical_url(self.target_person)

    def render(self):
        """Render the form with only the merge action listed."""
        # Subclasses may define further actions that they render by hand
        # only in certain circumstances, so those are kept out of the list
        # of actions to be rendered.
        self.actions = [self.merge_action]
        parent_view = super(AdminMergeBaseView, self)
        return parent_view.render()

    def setUpPeople(self, data):
        """Remember the people named in the form on this view.

        Stores the duplicate, the target (None when the form has none) and
        every email address that belongs to the duplicate.
        """
        self.dupe_person = data['dupe_person']
        self.target_person = data.get('target_person')
        self.dupe_person_emails = getUtility(IEmailAddressSet).getByPerson(
            self.dupe_person)

    def doMerge(self, data):
        """Queue the merge of the duplicate into the target.

        When the duplicate is not a team, each of its email addresses is
        first reassigned to the target person and reset to `NEW`.
        """
        if not self.dupe_person.is_team:
            # Only user addresses are moved across; team addresses will be
            # deleted.
            for address in self.dupe_person_emails:
                # EmailAddress.person and EmailAddress.account are readonly
                # fields, so the security proxy has to come off the master
                # object before they can be written.
                unproxied = removeSecurityProxy(IMasterObject(address))
                unproxied.personID = self.target_person.id
                unproxied.accountID = self.target_person.accountID
                unproxied.status = EmailAddressStatus.NEW
        person_set = getUtility(IPersonSet)
        person_set.mergeAsync(
            self.dupe_person, self.target_person, reviewer=self.user,
            delete=self.delete)
        response = self.request.response
        response.addInfoNotification(self.merge_message)
        self.next_url = self.success_url


class AdminPeopleMergeView(AdminMergeBaseView):
    """Admin view that merges one Person into another.

    A duplicate that still owns email addresses may be a sign that the
    admin picked the wrong person, so in that case the merge is held back
    until the admin confirms that the addresses may be reassigned to the
    other account.
    """

    label = "Merge Launchpad people"
    schema = IAdminPeopleMergeSchema

    @action('Merge', name='merge')
    def merge_action(self, action, data):
        """Merge the two people named in the form.

        If the duplicate has any email addresses, nothing is merged yet;
        the view only flags that a confirmation is required.
        """
        self.setUpPeople(data)
        needs_confirmation = self.dupe_person_emails.count() > 0
        if not needs_confirmation:
            self.doMerge(data)
            return
        # The duplicate owns one or more email addresses: warn the admin
        # and let them review the addresses that would be reassigned before
        # anything is merged.
        self.should_confirm_email_reassignment = True

    @action('Reassign E-mails and Merge', name='reassign_emails_and_merge')
    def reassign_emails_and_merge_action(self, action, data):
        """Merge the two people, reassigning the duplicate's emails."""
        self.setUpPeople(data)
        self.doMerge(data)


class AdminTeamMergeView(AdminMergeBaseView):
    """Admin view that merges one Team into another.

    A duplicate team that is still associated with a mailing list cannot
    be merged.  If it has active members, the admin is asked to confirm
    first, because all members must be deactivated before the merge.
    """

    label = "Merge Launchpad teams"
    schema = IAdminTeamMergeSchema

    def hasMailingList(self, team):
        """Whether `team` has a mailing list that still counts as in use.

        A list whose status is PURGED or one of PURGE_STATES does not
        count.
        """
        mailing_list = team.mailing_list
        if mailing_list is None:
            return False
        unused_states = list(PURGE_STATES) + [MailingListStatus.PURGED]
        return mailing_list.status not in unused_states

    @cachedproperty
    def registry_experts(self):
        """The registry experts celebrity."""
        celebrities = getUtility(ILaunchpadCelebrities)
        return celebrities.registry_experts

    def validate(self, data):
        """Refuse the merge if the duplicate team has a mailing list."""
        # Once errors have been found there is nothing to gain by going on,
        # and some of the data expected below may be missing when the user
        # entered something invalid.
        already_failed = len(self.errors) > 0
        if already_failed:
            return

        super(AdminTeamMergeView, self).validate(data)
        duplicate_team = data['dupe_person']
        # Teams cannot be merged while the duplicate has a mailing list,
        # unless that mailing list is purged.
        if not self.hasMailingList(duplicate_team):
            return
        self.addError(_(
            "${name} is associated with a Launchpad mailing list; we "
            "can't merge it.", mapping=dict(name=duplicate_team.name)))

    @action('Merge', name='merge')
    def merge_action(self, action, data):
        """Merge the two teams named in the form.

        If the duplicate team still has active members, nothing is merged
        yet; the view only flags that a confirmation is required.
        """
        self.setUpPeople(data)
        active_member_count = self.dupe_person.activemembers.count()
        if not active_member_count > 0:
            super(AdminTeamMergeView, self).doMerge(data)
            return
        # A team with active members cannot be merged as it stands, so the
        # admin is asked whether to deactivate all members and then merge.
        self.should_confirm_member_deactivation = True

    @action('Deactivate Members and Merge',
            name='deactivate_members_and_merge')
    def deactivate_members_and_merge_action(self, action, data):
        """Merge the teams without asking about active members."""
        self.setUpPeople(data)
        super(AdminTeamMergeView, self).doMerge(data)


class DeleteTeamView(AdminTeamMergeView):
    """A view that deletes a team by merging it with Registry experts."""

    page_title = 'Delete'
    field_names = ['dupe_person']
    merge_message = _('The team is queued to be deleted.')

    @property
    def label(self):
        """Page label naming the team that is about to be deleted."""
        return 'Delete %s' % self.context.displayname

    def __init__(self, context, request):
        """Set up the view and inject the fixed form values when deleting."""
        super(DeleteTeamView, self).__init__(context, request)
        submitted = self.request.form
        if 'field.dupe_person' in submitted:
            # This field has a fixed value that is managed by this method.
            # A request that supplies it was crafted by the user to gain
            # ownership of the dupe team's assets.
            self.addError('Unable to process submitted data.')
            return
        if 'field.actions.delete' in submitted:
            # When deleting a team the form values are always the context
            # team and the registry experts team.  They are injected here
            # because the base classes assume the values were submitted,
            # and the validation those classes perform is still needed to
            # be sure the team can be deleted.
            submitted.update(self.default_values)
        # Any other request just shows the page explaining the action.

    @property
    def default_values(self):
        """The fixed form values describing the deletion of the context."""
        values = {}
        values['field.dupe_person'] = self.context.name
        values['field.delete'] = True
        return values

    @property
    def cancel_url(self):
        """URL of the team itself."""
        return canonical_url(self.context)

    @property
    def success_url(self):
        """URL of the person set."""
        person_set = getUtility(IPersonSet)
        return canonical_url(person_set)

    @property
    def has_mailing_list(self):
        """Whether the context team has a mailing list that is in use."""
        return self.hasMailingList(self.context)

    def canDelete(self, data):
        """Condition of the delete action: no mailing list in use."""
        return not self.has_mailing_list

    @action('Delete', name='delete', condition=canDelete)
    def merge_action(self, action, data):
        """Queue the deletion through the inherited merge action."""
        self.delete = True
        inherited = super(DeleteTeamView, self)
        inherited.deactivate_members_and_merge_action.success(data)


class FinishedPeopleMergeRequestView(LaunchpadView):
    """A simple view for a page where we only tell the user that we sent the
    email with further instructions to complete the merge.

    This view is used only when the dupe account has a single email address.
    """

    page_title = 'Merge request sent'

    # Default for the address shown on the page.  initialize() only assigns
    # it when it finds the address; on every redirect path it returns
    # early, and render() must still be able to test the attribute without
    # raising AttributeError.
    dupe_email = None

    def initialize(self):
        """Find the dupe account's email address, or redirect away.

        The dupe account is identified by the 'dupe' request variable.  The
        user is redirected to their own page when that variable is missing
        or not an integer, when no such account is found, or when the
        account has no email addresses.
        """
        user = getUtility(ILaunchBag).user
        try:
            dupe_id = int(self.request.get('dupe'))
        except (ValueError, TypeError):
            self.request.response.redirect(canonical_url(user))
            return

        dupe_account = getUtility(IPersonSet).get(dupe_id)
        if dupe_account is None:
            # NOTE(review): assumes IPersonSet.get() returns None for an
            # unknown id -- confirm.  There is nothing to merge in that
            # case, so treat it like any other useless visit.
            self.request.response.redirect(canonical_url(user))
            return
        results = getUtility(IEmailAddressSet).getByPerson(dupe_account)

        result_count = results.count()
        if not result_count:
            # The user came back to visit this page with nothing to
            # merge, so we redirect them away to somewhere useful.
            self.request.response.redirect(canonical_url(user))
            return
        assert result_count == 1
        # Need to remove the security proxy because the dupe account may have
        # hidden email addresses.
        self.dupe_email = removeSecurityProxy(results[0]).email

    def render(self):
        """Render the page, or an empty string when no address was found."""
        if self.dupe_email:
            return LaunchpadView.render(self)
        else:
            return ''


class RequestPeopleMergeMultipleEmailsView(LaunchpadView):
    """Merge request page for a dupe account with several email addresses."""

    label = 'Merge Launchpad accounts'
    page_title = label

    def __init__(self, context, request):
        """Start with no dupe, nothing notified and the form unprocessed."""
        super(RequestPeopleMergeMultipleEmailsView, self).__init__(
            context, request)
        self.notified_addresses = []
        self.dupe = None
        self.form_processed = False

    def processForm(self):
        """Look up the dupe account and, on POST, send the merge requests.

        On a POST a merge request email goes to every address of the dupe
        account when it hides its addresses, otherwise only to the
        addresses the user selected in the form.
        """
        form = self.request.form
        dupe_id = form.get('dupe')
        if dupe_id is None:
            # We were just redirected to this page, so the dupe hidden
            # field is not in request.form.
            dupe_id = self.request.get('dupe')
        if dupe_id is None:
            return

        self.dupe = getUtility(IPersonSet).get(int(dupe_id))
        email_set = getUtility(IEmailAddressSet)
        self.dupeemails = email_set.getByPerson(self.dupe)

        if self.request.method != "POST":
            return

        login = getUtility(ILaunchBag).login
        token_set = getUtility(ILoginTokenSet)

        recipients = []
        if self.email_hidden:
            # Hidden addresses all get a merge request, and the security
            # proxy has to be removed before they can be read.
            recipients = [
                removeSecurityProxy(address).email
                for address in self.dupeemails]
        else:
            # Otherwise only the addresses the user selected get one.  The
            # form gives a list when several were selected and a plain
            # string for a single one; normalise to a list for the loop.
            selected = form.get("selected")
            if selected is None:
                selected = []
            elif not isinstance(selected, list):
                selected = [selected]

            for candidate in selected:
                known = email_set.getByEmail(candidate)
                if known is None or known not in self.dupeemails:
                    # The dupe person has changed their email addresses.
                    # See bug 239838.
                    self.request.response.addNotification(
                        "An address was removed from the duplicate "
                        "account while you were making this merge "
                        "request. Select again.")
                    return
                recipients.append(candidate)

        for recipient in recipients:
            merge_token = token_set.new(
                self.user, login, recipient, LoginTokenType.ACCOUNTMERGE)
            merge_token.sendMergeRequestEmail()
            self.notified_addresses.append(recipient)
        self.form_processed = True

    @property
    def cancel_url(self):
        """URL of the requesting user."""
        return canonical_url(self.user)

    @property
    def email_hidden(self):
        """The dupe account's hide_email_addresses setting."""
        return self.dupe.hide_email_addresses


class RequestPeopleMergeView(ValidatingMergeView):
    """The page where a user asks for two accounts to be merged.

    When the dupe account has a single email address, a message is sent to
    that address and the user is redirected to a page saying that the
    request was sent.  When it has more than one, the user is redirected to
    a page that lists all addresses owned by the dupe account, where they
    choose which of those to claim.
    """

    label = 'Merge Launchpad accounts'
    page_title = label
    schema = IRequestPeopleMerge

    @property
    def cancel_url(self):
        """URL of the person set."""
        person_set = getUtility(IPersonSet)
        return canonical_url(person_set)

    @action('Continue', name='continue')
    def continue_action(self, action, data):
        """Send the merge request email, or hand over to the chooser page."""
        duplicate = data['dupe_person']
        if duplicate == self.user:
            # Merging an account into itself is not something to act on.
            return

        addresses = getUtility(IEmailAddressSet).getByPerson(duplicate)
        address_count = addresses.count()
        if address_count > 1:
            # The dupe account has more than one email address, so the user
            # must go to another page and say which of them to claim.
            self.next_url = '+requestmerge-multiple?dupe=%d' % duplicate.id
            return

        assert address_count == 1
        only_address = addresses[0]
        login = getUtility(ILaunchBag).login
        token_set = getUtility(ILoginTokenSet)
        # The security proxy has to be removed because the dupe account may
        # have hidden email addresses.
        plain_email = removeSecurityProxy(only_address).email
        merge_token = token_set.new(
            self.user, login, plain_email, LoginTokenType.ACCOUNTMERGE)
        merge_token.sendMergeRequestEmail()
        self.next_url = './+mergerequest-sent?dupe=%d' % duplicate.id