~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Import mailing list information."""

__metaclass__ = type
__all__ = [
    'Importer',
    ]


from email.Utils import parseaddr

from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy

from lp.registry.interfaces.mailinglist import (
    CannotSubscribe,
    IMailingListSet,
    MailingListStatus,
    )
from lp.registry.interfaces.person import IPersonSet
from lp.registry.interfaces.teammembership import TeamMembershipStatus
from lp.services.identity.interfaces.emailaddress import (
    EmailAddressStatus,
    IEmailAddressSet,
    )
from lp.services.log.logger import BufferLogger


class Importer:
    """Perform mailing list imports for command line scripts."""

    def __init__(self, team_name, log=None):
        self.team_name = team_name
        self.team = getUtility(IPersonSet).getByName(team_name)
        assert self.team is not None, (
            'No team with name: %s' % team_name)
        self.mailing_list = getUtility(IMailingListSet).get(team_name)
        assert self.mailing_list is not None, (
            'Team has no mailing list: %s' % team_name)
        assert self.mailing_list.status == MailingListStatus.ACTIVE, (
            'Team mailing list is not active: %s' % team_name)
        if log is None:
            self.log = BufferLogger()
        else:
            self.log = log

    def importAddresses(self, addresses):
        """Import all addresses.

        Every address that is preferred or validated and connected to a person
        is made a member of the team, and is subscribed to the mailing list
        (with the address given).  If the address is not valid, or if it is
        associated with a team, the address is ignored.

        :param addresses: The email addresses to join and subscribe.
        :type addresses: sequence of strings
        """
        email_set = getUtility(IEmailAddressSet)
        person_set = getUtility(IPersonSet)
        for entry in addresses:
            real_name, address = parseaddr(entry)
            # address could be empty or None.
            if not address:
                continue
            person = person_set.getByEmail(address)
            if person is None or person.is_team:
                self.log.error('No person for address: %s', address)
                continue
            email = email_set.getByEmail(address)
            assert email is not None, (
                'Address has no IEmailAddress? %s' % address)
            if email.status not in (EmailAddressStatus.PREFERRED,
                                    EmailAddressStatus.VALIDATED):
                self.log.error('No valid email for address: %s', address)
                continue
            # Turn off may_subscribe_to_list because we want to explicitly
            # force subscription without relying on the person's
            # auto-subscribe policy.
            naked_team = removeSecurityProxy(self.team)
            naked_team.addMember(person, reviewer=person,
                                 status=TeamMembershipStatus.APPROVED,
                                 force_team_add=True,
                                 may_subscribe_to_list=False)
            try:
                self.mailing_list.subscribe(person, email)
            except CannotSubscribe, error:
                self.log.error('%s', error)
            # It's okay to str()-ify these because addresses and person names
            # are guaranteed to be in the ASCII range.
            self.log.info('%s (%s) joined and subscribed',
                          str(address), str(person.name))

    def importFromFile(self, filename):
        """Import all addresses given in the named file.

        The named file has email address to import, one per line.  The lines
        may be formatted using any format recognized by
        `email.Utils.parseaddr()`.

        :param filename: The name of the file containing email address.
        :type filename: string
        """
        in_file = open(filename)
        try:
            addresses = list(in_file)
        finally:
            in_file.close()
        self.importAddresses(addresses)