~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Copyright 2009 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Helper functions dealing with emails in tests.
"""
__metaclass__ = type

import email
import operator

import transaction
from zope.component import getUtility

from lp.registry.interfaces.persontransferjob import (
    IMembershipNotificationJobSource,
    )
from lp.services.job.runner import JobRunner
from lp.services.log.logger import DevNullLogger
from lp.services.mail import stub


def pop_notifications(sort_key=None, commit=True):
    """Return generated emails as email messages.

    A helper function which optionally commits the transaction, so
    that the notifications are queued in stub.test_emails and pops these
    notifications from the queue.

    :param sort_key: define sorting function.  sort_key specifies a
    function of one argument that is used to extract a comparison key from
    each list element.  (See the sorted() Python built-in.)
    :param commit: whether to commit before reading email (defaults to True).
    """
    if commit:
        transaction.commit()
    if sort_key is None:
        sort_key = operator.itemgetter('To')

    notifications = []
    for fromaddr, toaddrs, raw_message in stub.test_emails:
        notification = email.message_from_string(raw_message)
        notification['X-Envelope-To'] = ', '.join(toaddrs)
        notification['X-Envelope-From'] = fromaddr
        notifications.append(notification)
    stub.test_emails = []

    return sorted(notifications, key=sort_key)


def sort_addresses(header):
    """Sort an address-list in an e-mail header field body."""
    addresses = set(address.strip() for address in header.split(','))
    return ", ".join(sorted(addresses))


def print_emails(include_reply_to=False, group_similar=False,
                 include_rationale=False, notifications=None):
    """Pop all messages from stub.test_emails and print them with
     their recipients.

    Since the same message may be sent more than once (for different
    recipients), setting 'group_similar' will print each distinct
    message only once and group all recipients of that message
    together in the 'To:' field.  It will also strip the first line of
    the email body.  (The line with "Hello Foo," which is likely
    distinct for each recipient.)

    :param include_reply_to: Include the reply-to header if True.
    :param group_similar: Group messages sent to multiple recipients if True.
    :param include_rationale: Include the X-Launchpad-Message-Rationale
        header.
    :param notifications: Use the provided list of notifications instead of
        the stack.
    """
    distinct_bodies = {}
    if notifications is None:
        notifications = pop_notifications()
    for message in notifications:
        recipients = set(
            recipient.strip()
            for recipient in message['To'].split(','))
        body = message.get_payload()
        if group_similar:
            # Strip the first line as it's different for each recipient.
            body = body[body.find('\n') + 1:]
        if body in distinct_bodies and group_similar:
            message, existing_recipients = distinct_bodies[body]
            distinct_bodies[body] = (
                message, existing_recipients.union(recipients))
        else:
            distinct_bodies[body] = (message, recipients)
    for body in sorted(distinct_bodies):
        message, recipients = distinct_bodies[body]
        print 'From:', message['From']
        print 'To:', ", ".join(sorted(recipients))
        if include_reply_to:
            print 'Reply-To:', message['Reply-To']
        rationale_header = 'X-Launchpad-Message-Rationale'
        if include_rationale and rationale_header in message:
            print (
                '%s: %s' % (rationale_header, message[rationale_header]))
        print 'Subject:', message['Subject']
        print body
        print "-" * 40


def print_distinct_emails(include_reply_to=False, include_rationale=True):
    """A convenient shortcut for `print_emails`(group_similar=True)."""
    return print_emails(group_similar=True,
                        include_reply_to=include_reply_to,
                        include_rationale=include_rationale)


def run_mail_jobs():
    """Process job queues that send out emails.

    If a new job type is added that sends emails, this function can be
    extended to run those jobs, so that testing emails doesn't require a
    bunch of different function calls to process different queues.
    """
    # Commit the transaction to make sure that the JobRunner can find
    # the queued jobs.
    transaction.commit()
    job_source = getUtility(IMembershipNotificationJobSource)
    logger = DevNullLogger()
    runner = JobRunner.fromReady(job_source, logger)
    runner.runAll()