~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# Copyright 2009 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Person notifications."""

__metaclass__ = type
__all__ = [
    'PersonNotification',
    'PersonNotificationSet',
    ]

from datetime import datetime

import pytz
from sqlobject import (
    ForeignKey,
    StringCol,
    )
from zope.interface import implements

from canonical.config import config
from canonical.database.constants import UTC_NOW
from canonical.database.datetimecol import UtcDateTimeCol
from canonical.database.sqlbase import (
    SQLBase,
    sqlvalues,
    )
from lp.registry.interfaces.personnotification import (
    IPersonNotification,
    IPersonNotificationSet,
    )
from lp.services.propertycache import cachedproperty
from lp.services.mail.sendmail import (
    format_address,
    simple_sendmail,
    )


class PersonNotification(SQLBase):
    """See `IPersonNotification`."""
    implements(IPersonNotification)

    person = ForeignKey(dbName='person', notNull=True, foreignKey='Person')
    date_created = UtcDateTimeCol(notNull=True, default=UTC_NOW)
    date_emailed = UtcDateTimeCol(notNull=False)
    body = StringCol(notNull=True)
    subject = StringCol(notNull=True)

    @cachedproperty
    def to_addresses(self):
        """See `IPersonNotification`."""
        if self.person.is_team:
            return self.person.getTeamAdminsEmailAddresses()
        elif self.person.preferredemail is None:
            return []
        else:
            return [format_address(
                self.person.displayname, self.person.preferredemail.email)]

    @property
    def can_send(self):
        """See `IPersonNotification`."""
        return len(self.to_addresses) > 0

    def send(self, logger=None):
        """See `IPersonNotification`."""
        if not self.can_send:
            raise AssertionError(
                "Can't send a notification to a person without an email.")
        to_addresses = self.to_addresses
        if logger:
            logger.info("Sending notification to %r." % to_addresses)
        from_addr = config.canonical.bounce_address
        simple_sendmail(from_addr, to_addresses, self.subject, self.body)
        self.date_emailed = datetime.now(pytz.timezone('UTC'))


class PersonNotificationSet:
    """See `IPersonNotificationSet`."""
    implements(IPersonNotificationSet)

    def getNotificationsToSend(self):
        """See `IPersonNotificationSet`."""
        return PersonNotification.selectBy(
            date_emailed=None, orderBy=['date_created,id'])

    def addNotification(self, person, subject, body):
        """See `IPersonNotificationSet`."""
        return PersonNotification(person=person, subject=subject, body=body)

    def getNotificationsOlderThan(self, time_limit):
        """See `IPersonNotificationSet`."""
        return PersonNotification.select(
            'date_created < %s' % sqlvalues(time_limit))