~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Email code for the branch scanner."""

__metaclass__ = type
__all__ = [
    'send_removed_revision_emails',
    'queue_tip_changed_email_jobs',
    ]

from zope.component import getUtility

from canonical.config import config
from lp.code.enums import BranchSubscriptionNotificationLevel
from lp.code.interfaces.branchjob import (
    IRevisionMailJobSource,
    IRevisionsAddedJobSource,
    )


def subscribers_want_notification(db_branch):
    diff_levels = (
        BranchSubscriptionNotificationLevel.DIFFSONLY,
        BranchSubscriptionNotificationLevel.FULL)
    subscriptions = db_branch.getSubscriptionsByLevel(diff_levels)
    return subscriptions.count() > 0


def send_removed_revision_emails(revisions_removed):
    """Notify subscribers of removed revisions.

    When the history is shortened, we send an email that says this.
    """
    if not subscribers_want_notification(revisions_removed.db_branch):
        return
    number_removed = len(revisions_removed.removed_history)
    if number_removed == 0:
        return
    if number_removed == 1:
        count = '1 revision'
        contents = '%s was removed from the branch.' % count
    else:
        count = '%d revisions' % number_removed
        contents = '%s were removed from the branch.' % count
    # No diff is associated with the removed email.
    subject = "[Branch %s] %s removed" % (
        revisions_removed.db_branch.unique_name, count)
    getUtility(IRevisionMailJobSource).create(
        revisions_removed.db_branch, revno='removed',
        from_address=config.canonical.noreply_from_address,
        body=contents, subject=subject)


def queue_tip_changed_email_jobs(tip_changed):
    if not subscribers_want_notification(tip_changed.db_branch):
        return
    if tip_changed.initial_scan:
        revision_count = tip_changed.bzr_branch.revno()
        if revision_count == 1:
            revisions = '1 revision'
        else:
            revisions = '%d revisions' % revision_count
        message = ('First scan of the branch detected %s'
                   ' in the revision history of the branch.' %
                   revisions)
        subject = "[Branch %s] %s" % (
            tip_changed.db_branch.unique_name, revisions)
        getUtility(IRevisionMailJobSource).create(
            tip_changed.db_branch, 'initial',
            config.canonical.noreply_from_address, message, subject)
    else:
        getUtility(IRevisionsAddedJobSource).create(
            tip_changed.db_branch, tip_changed.db_branch.last_scanned_id,
            tip_changed.bzr_branch.last_revision(),
            config.canonical.noreply_from_address)