~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# Copyright 2009 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

# pylint: disable-msg=E0611,W0212

__metaclass__ = type
__all__ = [
    'BinaryPackageName',
    'BinaryPackageNameSet',
    'BinaryPackageNameVocabulary',
    'getBinaryPackageDescriptions',
]

# SQLObject/SQLBase
from sqlobject import (
    SQLObjectNotFound,
    StringCol,
    )
from storm.store import EmptyResultSet
# Zope imports
from zope.interface import implements
from zope.schema.vocabulary import SimpleTerm

from canonical.database.sqlbase import (
    SQLBase,
    sqlvalues,
    )
from lp.services.helpers import ensure_unicode
from lp.services.database.lpstorm import IStore
from canonical.launchpad.webapp.vocabulary import (
    BatchedCountableIterator,
    NamedSQLObjectHugeVocabulary,
    )
from lp.app.errors import NotFoundError
from lp.soyuz.enums import PackagePublishingStatus
from lp.soyuz.interfaces.binarypackagename import (
    IBinaryPackageName,
    IBinaryPackageNameSet,
    )
from lp.soyuz.model.binarypackagerelease import BinaryPackageRelease


class BinaryPackageName(SQLBase):

    implements(IBinaryPackageName)
    _table = 'BinaryPackageName'
    name = StringCol(dbName='name', notNull=True, unique=True,
                     alternateID=True)

    def __unicode__(self):
        return self.name

    def __repr__(self):
        return "<BinaryPackageName at %X name=%r>" % (id(self), self.name)


class BinaryPackageNameSet:
    implements(IBinaryPackageNameSet)

    def __getitem__(self, name):
        """See `IBinaryPackageNameSet`."""
        try:
            return BinaryPackageName.byName(name)
        except SQLObjectNotFound:
            raise NotFoundError(name)

    def getAll(self):
        """See `IBinaryPackageNameSet`."""
        return BinaryPackageName.select()

    def findByName(self, name):
        """Find binarypackagenames by its name or part of it."""
        return IStore(BinaryPackageName).find(
            BinaryPackageName,
            BinaryPackageName.name.contains_string(ensure_unicode(name)))

    def queryByName(self, name):
        return IStore(BinaryPackageName).find(
            BinaryPackageName, name=ensure_unicode(name)).one()

    def new(self, name):
        return BinaryPackageName(name=ensure_unicode(name))

    def ensure(self, name):
        """Ensure that the given BinaryPackageName exists, creating it
        if necessary.

        Returns the BinaryPackageName
        """
        name = ensure_unicode(name)
        try:
            return self[name]
        except NotFoundError:
            return self.new(name)

    getOrCreateByName = ensure

    def getNotNewByNames(self, name_ids, distroseries, archive_ids):
        """See `IBinaryPackageNameSet`."""
        # Here we're returning `BinaryPackageName`s where the records
        # for the supplied `BinaryPackageName` IDs are published in the
        # supplied distroseries.  If they're already published then they
        # must not be new.
        if len(name_ids) == 0:
            return EmptyResultSet()

        statuses = (
            PackagePublishingStatus.PUBLISHED,
            PackagePublishingStatus.PENDING,
            )

        return BinaryPackageName.select("""
            BinaryPackagePublishingHistory.binarypackagerelease =
                BinaryPackageRelease.id AND
            BinaryPackagePublishingHistory.distroarchseries =
                DistroArchSeries.id AND
            DistroArchSeries.distroseries = %s AND
            BinaryPackagePublishingHistory.status IN %s AND
            BinaryPackagePublishingHistory.archive IN %s AND
            BinaryPackageRelease.binarypackagename = BinaryPackageName.id AND
            BinaryPackageName.id IN %s
            """ % sqlvalues(distroseries, statuses, archive_ids, name_ids),
            distinct=True,
            clauseTables=["BinaryPackagePublishingHistory",
                          "BinaryPackageRelease",
                          "DistroArchSeries"])


class BinaryPackageNameIterator(BatchedCountableIterator):
    """An iterator for BinaryPackageNameVocabulary.

    Builds descriptions based on releases of that binary package name.
    """

    def getTermsWithDescriptions(self, results):
        # Prefill the descriptions dictionary with the latest
        # description uploaded for that package name.
        descriptions = getBinaryPackageDescriptions(results)
        return [SimpleTerm(obj, obj.name,
                    descriptions.get(obj.name, "Not uploaded"))
                for obj in results]


class BinaryPackageNameVocabulary(NamedSQLObjectHugeVocabulary):
    """A vocabulary for searching for binary package names."""
    _table = BinaryPackageName
    _orderBy = 'name'
    displayname = 'Select a Binary Package'
    iterator = BinaryPackageNameIterator


def getBinaryPackageDescriptions(results, use_names=False,
                                 max_title_length=50):
    """Return a dict of descriptions keyed by package name.

    See sourcepackage.py:getSourcePackageDescriptions, which is analogous.
    """
    if len(list(results)) < 1:
        return {}
    if use_names:
        clause = ("BinaryPackageName.name in %s" %
                 sqlvalues([pn.name for pn in results]))
    else:
        clause = ("BinaryPackageName.id in %s" %
                 sqlvalues([bpn.id for bpn in results]))

    descriptions = {}
    releases = BinaryPackageRelease.select(
        """BinaryPackageRelease.binarypackagename =
            BinaryPackageName.id AND
           %s""" % clause,
        clauseTables=["BinaryPackageRelease", "BinaryPackageName"],
        orderBy=["-BinaryPackageRelease.datecreated"])

    for release in releases:
        binarypackagename = release.binarypackagename.name
        if binarypackagename not in descriptions:
            description = release.description.strip().replace("\n", " ")
            if len(description) > max_title_length:
                description = (release.description[:max_title_length]
                              + "...")
            descriptions[binarypackagename] = description
    return descriptions