~launchpad-pqm/launchpad/devel

7675.906.1 by Ian Booth
Initial prototype
1
# Copyright 2010 Canonical Ltd.  This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
3
4
"""Implementations of `IBranchMergeQueueCollection`."""
5
6
# Make every class in this module new-style under Python 2.
__metaclass__ = type
# Public API of this module.  The exported name must match a class that is
# actually defined here, otherwise `from <module> import *` raises
# AttributeError.
__all__ = [
    'GenericBranchMergeQueueCollection',
    ]
10
11
from zope.interface import implements
12
7675.906.5 by Ian Booth
Finish view and add unit tests
13
from lp.code.interfaces.branchmergequeue import (
14
    user_has_special_merge_queue_access,
15
    )
7675.906.1 by Ian Booth
Initial prototype
16
from lp.code.interfaces.branchmergequeuecollection import (
17
    IBranchMergeQueueCollection,
18
    InvalidFilter,
19
    )
7675.906.3 by Ian Booth
Clean up - remoce some prototyping code
20
from lp.code.interfaces.codehosting import LAUNCHPAD_SERVICES
7675.906.1 by Ian Booth
Initial prototype
21
from lp.code.model.branchmergequeue import BranchMergeQueue
14612.2.1 by William Grant
format-imports on lib/. So many imports.
22
from lp.services.database.lpstorm import IMasterStore
7675.906.1 by Ian Booth
Initial prototype
23
24
25
class GenericBranchMergeQueueCollection:
    """See `IBranchMergeQueueCollection`."""

    implements(IBranchMergeQueueCollection)

    def __init__(self, store=None, merge_queue_filter_expressions=None,
                 tables=None, exclude_from_search=None):
        """Construct a `GenericBranchMergeQueueCollection`.

        :param store: The store to look in for merge queues. If not specified,
            use the default store.
        :param merge_queue_filter_expressions: A list of Storm expressions to
            restrict the queues in the collection. If unspecified, then
            there will be no restrictions on the result set. That is, all
            queues in the store will be in the collection.
        :param tables: A dict of Storm tables to the Join expression.  If an
            expression in merge_queue_filter_expressions refers to a table,
            then that table *must* be in this dict.
        :param exclude_from_search: A list that is stored on the collection
            and carried over to collections derived from it.  This class
            does not consult it when building queries.
        """
        self._store = store
        if merge_queue_filter_expressions is None:
            merge_queue_filter_expressions = []
        self._merge_queue_filter_expressions = merge_queue_filter_expressions
        if tables is None:
            tables = {}
        self._tables = tables
        if exclude_from_search is None:
            exclude_from_search = []
        self._exclude_from_search = exclude_from_search

    def count(self):
        """See `IBranchMergeQueueCollection`."""
        return self._getCount()

    def _getCount(self):
        """Return the number of merge queues in this collection.

        This is the hook behind `count`; it asks the result set returned
        by `_getMergeQueues` for its count.
        """
        return self._getMergeQueues().count()

    @property
    def store(self):
        """The store to query: the one given at construction time, or the
        master store for `BranchMergeQueue` when none was given.
        """
        if self._store is None:
            return IMasterStore(BranchMergeQueue)
        else:
            return self._store

    def _filterBy(self, expressions, table=None, join=None,
                  exclude_from_search=None):
        """Return a subset of this collection, filtered by 'expressions'.

        :param expressions: A list of Storm expressions to add to the
            existing filters.  None is treated as an empty list.
        :param table: An optional extra table the expressions refer to.
        :param join: The Join expression for `table`.  Required whenever
            `table` is given.
        :param exclude_from_search: An optional list appended to the
            collection's existing exclusions.
        :raises InvalidFilter: if `table` is given without `join`.
        :return: A new collection of the same class; this collection is
            left unmodified.
        """
        # Work on a copy so that the new collection does not share (and
        # cannot mutate) this collection's table mapping.
        tables = self._tables.copy()
        if table is not None:
            if join is None:
                raise InvalidFilter("Cannot specify a table without a join.")
            tables[table] = join
        if exclude_from_search is None:
            exclude_from_search = []
        if expressions is None:
            expressions = []
        return self.__class__(
            self.store,
            self._merge_queue_filter_expressions + expressions,
            tables,
            self._exclude_from_search + exclude_from_search)

    def _getMergeQueueExpressions(self):
        """Return the where expressions for this collection."""
        return self._merge_queue_filter_expressions

    def getMergeQueues(self):
        """See `IBranchMergeQueueCollection`."""
        return list(self._getMergeQueues())

    def _getMergeQueues(self):
        """Return the Storm result set of merge queues in this collection.

        This is the hook behind `getMergeQueues` and `_getCount`.
        """
        # list() keeps the concatenation valid when values() returns a
        # view rather than a list.
        tables = [BranchMergeQueue] + list(self._tables.values())
        expressions = self._getMergeQueueExpressions()
        return self.store.using(*tables).find(BranchMergeQueue, *expressions)

    def ownedBy(self, person):
        """See `IBranchMergeQueueCollection`."""
        return self._filterBy([BranchMergeQueue.owner == person])

    def visibleByUser(self, person):
        """See `IBranchMergeQueueCollection`."""
        # The Launchpad services user and users with special merge queue
        # access get the collection back unrestricted.
        if (person == LAUNCHPAD_SERVICES or
            user_has_special_merge_queue_access(person)):
            return self
        # Carry the existing filter expressions over so that filters
        # applied before this call (e.g. ownedBy) are not silently lost.
        return VisibleBranchMergeQueueCollection(
            person,
            self._store, self._merge_queue_filter_expressions,
            self._tables, self._exclude_from_search)
113
114
7675.906.5 by Ian Booth
Finish view and add unit tests
115
class VisibleBranchMergeQueueCollection(GenericBranchMergeQueueCollection):
    """A mergequeue collection which provides queues visible by a user."""

    def __init__(self, person, store=None,
                 merge_queue_filter_expressions=None, tables=None,
                 exclude_from_search=None):
        """Construct a `VisibleBranchMergeQueueCollection`.

        :param person: The user whose view of the merge queues this
            collection represents.
        :param store: See `GenericBranchMergeQueueCollection.__init__`.
        :param merge_queue_filter_expressions: See
            `GenericBranchMergeQueueCollection.__init__`.
        :param tables: See `GenericBranchMergeQueueCollection.__init__`.
        :param exclude_from_search: See
            `GenericBranchMergeQueueCollection.__init__`.
        """
        super(VisibleBranchMergeQueueCollection, self).__init__(
            store=store,
            merge_queue_filter_expressions=merge_queue_filter_expressions,
            tables=tables,
            exclude_from_search=exclude_from_search,
        )
        self._user = person

    def _filterBy(self, expressions, table=None, join=None,
                  exclude_from_search=None):
        """Return a subset of this collection, filtered by 'expressions'.

        Same contract as the base class method, but the new collection is
        constructed with this collection's user as well, because this
        class takes the user as its first constructor argument.

        :raises InvalidFilter: if `table` is given without `join`.
        """
        # Copy so the derived collection does not share our table mapping.
        tables = self._tables.copy()
        if table is not None:
            if join is None:
                raise InvalidFilter("Cannot specify a table without a join.")
            tables[table] = join
        if exclude_from_search is None:
            exclude_from_search = []
        if expressions is None:
            expressions = []
        return self.__class__(
            self._user,
            self.store,
            self._merge_queue_filter_expressions + expressions,
            tables,
            self._exclude_from_search + exclude_from_search)

    def visibleByUser(self, person):
        """See `IBranchMergeQueueCollection`.

        :raises InvalidFilter: if `person` is not the user this collection
            is already filtered for.
        """
        if person == self._user:
            return self
        raise InvalidFilter(
            "Cannot filter for merge queues visible by user %r, already "
            "filtering for %r" % (person, self._user))

    def _getCount(self):
        """Return the number of merge queues visible by self._user.

        `_getMergeQueues` returns a plain list here rather than a result
        set, so the count is the length of that list.
        """
        return len(self._getMergeQueues())

    def _getMergeQueues(self):
        """Return the queues visible by self._user.

        A queue is visible to a user if that user can see all the branches
        associated with the queue.

        :return: A list of merge queues (not a Storm result set), because
            the visibility check is applied in Python to each queue
            returned by the base class query.
        """

        def allBranchesVisible(user, branches):
            # all() stops at the first branch the user cannot see and is
            # True for a queue with no branches, matching a comparison of
            # the visible-branch count against the total.
            return all(branch.visibleByUser(user) for branch in branches)

        queues = super(
            VisibleBranchMergeQueueCollection, self)._getMergeQueues()
        return [queue for queue in queues
                if allBranchesVisible(self._user, queue.branches)]