1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
# Copyright 2009 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Person notifications."""
# Python 2 idiom: a module-level __metaclass__ makes every class defined
# below without explicit bases a new-style class.
__metaclass__ = type
# Names exported as this module's public API.
__all__ = [
'PersonNotification',
'PersonNotificationSet',
]
from datetime import datetime
import pytz
from sqlobject import (
ForeignKey,
StringCol,
)
from zope.interface import implements
from canonical.config import config
from canonical.database.constants import UTC_NOW
from canonical.database.datetimecol import UtcDateTimeCol
from canonical.database.sqlbase import (
SQLBase,
sqlvalues,
)
from lp.registry.interfaces.personnotification import (
IPersonNotification,
IPersonNotificationSet,
)
from lp.services.propertycache import cachedproperty
from lp.services.mail.sendmail import (
format_address,
simple_sendmail,
)
class PersonNotification(SQLBase):
    """See `IPersonNotification`.

    A stored notification (subject and body) addressed to a person or
    team, together with the time it was created and the time it was
    emailed.
    """

    implements(IPersonNotification)

    # The person (or team) the notification is addressed to.
    person = ForeignKey(dbName='person', notNull=True, foreignKey='Person')
    # Falls back to UTC_NOW when no value is supplied.
    date_created = UtcDateTimeCol(notNull=True, default=UTC_NOW)
    # Nullable; assigned by send() once the mail has been handed off.
    date_emailed = UtcDateTimeCol(notNull=False)
    body = StringCol(notNull=True)
    subject = StringCol(notNull=True)

    @cachedproperty
    def to_addresses(self):
        """See `IPersonNotification`.

        For a team this is whatever `getTeamAdminsEmailAddresses`
        returns.  For a person without a preferred email it is an empty
        list; otherwise it is a one-element list holding the address
        formatted from the display name and the preferred email.
        """
        if self.person.is_team:
            return self.person.getTeamAdminsEmailAddresses()
        elif self.person.preferredemail is None:
            return []
        else:
            return [format_address(
                self.person.displayname, self.person.preferredemail.email)]

    @property
    def can_send(self):
        """See `IPersonNotification`.

        True when `to_addresses` contains at least one address.
        """
        return len(self.to_addresses) > 0

    def send(self, logger=None):
        """See `IPersonNotification`.

        Mail the subject and body to `to_addresses`, using the configured
        bounce address as the sender, then record the current UTC time in
        `date_emailed`.

        :param logger: Optional logger; when given, the recipients are
            logged at info level before the mail is sent.
        :raises AssertionError: if `can_send` is false, i.e. there is
            nobody to send the notification to.
        """
        if not self.can_send:
            raise AssertionError(
                "Can't send a notification to a person without an email.")
        to_addresses = self.to_addresses
        if logger is not None:
            # Wrap the recipients in a 1-tuple so the % operator always
            # treats them as a single value, even if they are a tuple.
            logger.info("Sending notification to %r." % (to_addresses,))
        from_addr = config.canonical.bounce_address
        simple_sendmail(from_addr, to_addresses, self.subject, self.body)
        self.date_emailed = datetime.now(pytz.UTC)
class PersonNotificationSet:
    """See `IPersonNotificationSet`.

    Creation and lookup helpers for `PersonNotification` records.
    """

    implements(IPersonNotificationSet)

    def getNotificationsToSend(self):
        """See `IPersonNotificationSet`.

        Select the notifications whose `date_emailed` is still unset,
        ordered by creation date and then by id.
        """
        ordering = ['date_created,id']
        pending = PersonNotification.selectBy(
            date_emailed=None, orderBy=ordering)
        return pending

    def addNotification(self, person, subject, body):
        """See `IPersonNotificationSet`.

        Create and return a `PersonNotification` for `person` with the
        given subject and body.
        """
        fields = dict(person=person, subject=subject, body=body)
        return PersonNotification(**fields)

    def getNotificationsOlderThan(self, time_limit):
        """See `IPersonNotificationSet`.

        Select the notifications created strictly before `time_limit`.
        """
        # sqlvalues() quotes the limit for inclusion in the raw clause.
        quoted_limit = sqlvalues(time_limit)
        where_clause = 'date_created < %s' % quoted_limit
        return PersonNotification.select(where_clause)
|