~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Copyright 2009 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

# pylint: disable-msg=E0611,W0212

"""Database classes for linking bugtasks and branches."""

__metaclass__ = type

__all__ = ["BugBranch",
           "BugBranchSet"]

from sqlobject import (
    ForeignKey,
    IN,
    IntCol,
    StringCol,
    )
from storm.expr import (
    And,
    Exists,
    Or,
    Select,
    )
from zope.component import getUtility
from zope.interface import implements

from lp.app.interfaces.launchpad import ILaunchpadCelebrities
from lp.bugs.interfaces.bugbranch import (
    IBugBranch,
    IBugBranchSet,
    )
from lp.code.interfaces.branchtarget import IHasBranchTarget
from lp.registry.interfaces.person import validate_public_person
from lp.registry.model.teammembership import TeamParticipation
from lp.services.database.constants import UTC_NOW
from lp.services.database.datetimecol import UtcDateTimeCol
from lp.services.database.lpstorm import IStore
from lp.services.database.sqlbase import SQLBase


class BugBranch(SQLBase):
    """See `IBugBranch`."""
    implements(IBugBranch, IHasBranchTarget)

    datecreated = UtcDateTimeCol(notNull=True, default=UTC_NOW)
    bug = ForeignKey(dbName="bug", foreignKey="Bug", notNull=True)
    branch_id = IntCol(dbName="branch", notNull=True)
    branch = ForeignKey(dbName="branch", foreignKey="Branch", notNull=True)
    revision_hint = StringCol(default=None)

    registrant = ForeignKey(
        dbName='registrant', foreignKey='Person',
        storm_validator=validate_public_person, notNull=True)

    @property
    def target(self):
        """See `IHasBranchTarget`."""
        return self.branch.target

    @property
    def bug_task(self):
        """See `IBugBranch`."""
        task = self.bug.getBugTask(self.branch.product)
        if task is None:
            # Just choose the first task for the bug.
            task = self.bug.bugtasks[0]
        return task


class BugBranchSet:

    implements(IBugBranchSet)

    def getBugBranch(self, bug, branch):
        "See `IBugBranchSet`."
        return BugBranch.selectOneBy(bugID=bug.id, branchID=branch.id)

    def getBranchesWithVisibleBugs(self, branches, user):
        """See `IBugBranchSet`."""
        # Avoid circular imports.
        from lp.bugs.model.bug import Bug
        from lp.bugs.model.bugsubscription import BugSubscription

        branch_ids = [branch.id for branch in branches]
        if branch_ids == []:
            return []

        admins = getUtility(ILaunchpadCelebrities).admin
        if user is None:
            # Anonymous visitors only get to know about public bugs.
            visible = And(
                Bug.id == BugBranch.bugID,
                Bug.private == False)
        elif user.inTeam(admins):
            # Administrators know about all bugs.
            visible = True
        else:
            # Anyone else can know about public bugs plus any private
            # ones they may be directly or indirectly subscribed to.
            subscribed = And(
                TeamParticipation.teamID == BugSubscription.person_id,
                TeamParticipation.personID == user.id,
                Bug.id == BugSubscription.bug_id)

            visible = And(
                Bug.id == BugBranch.bugID,
                Or(
                    Bug.private == False,
                    Exists(Select(
                        columns=[True],
                        tables=[BugSubscription, TeamParticipation],
                        where=subscribed))))

        return IStore(BugBranch).find(
            BugBranch.branchID,
            BugBranch.branch_id.is_in(branch_ids),
            visible).config(distinct=True)

    def getBugBranchesForBugTasks(self, tasks):
        "See IBugBranchSet."
        bug_ids = [task.bugID for task in tasks]
        if not bug_ids:
            return []
        bugbranches = BugBranch.select(IN(BugBranch.q.bugID, bug_ids),
                                       orderBy=['branch'])
        return bugbranches.prejoin(
            ['branch', 'branch.owner', 'branch.product'])