~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""Malone / Debbugs Interface

Based on debzilla/bugzilla.py by Matt Zimmerman

(c) Canonical Ltd 2004
"""

import sys
import psycopg

from canonical.launchpad.database import *
from canonical.launchpad.validators.name import valid_name
from canonical.foaf.nickname import generate_nick
from canonical.database.sqlbase import quote
from canonical.lp.encoding import guess as ensure_unicode
from canonical.lp.dbschema import EmailAddressStatus
from sets import Set



class Launchpad:
    
    def __init__(self):
        # get the debbugs remote bug tracker id
        self.debtrackerid = list(BugTracker.select("name='debbugs'"))[0].id

    def ensure_person(self, displayname, email):
        person = PersonSet().getByEmail(email)
        if not person:
            person = createPerson(email, displayname=displayname)
        return person

    def ensure_sourcepackagename(self, name):
        name = name.strip().lower()
        if not valid_name(name):
            raise ValueError, "'%s' is not a valid name" % name
        try:
            return SourcePackageName.selectBy(name=name)[0]
        except IndexError:
            return SourcePackageName(name=name)

    def ensure_binarypackagename(self, name):
        name = name.strip().lower()
        if not valid_name(name):
            raise ValueError, "'%s' is not a valid name" % name
        try:
            return BinaryPackageName.selectBy(name=name)[0]
        except IndexError:
            return BinaryPackageName(name=name)

    def get_distribution_by_name(self, name):
        return Distribution.selectBy(name=name)[0]

    def all_deb_watches(self):
        """Return a list of all debian bug numbers being watched by
        Malone"""
        watchlist = BugWatch.select("bugtracker = %d" % self.debtrackerid)
        return [int(watch.remotebug) for watch in watchlist]

    def get_watch_on_debbug(self, debian_bug):
        query = """remotebug = %s AND 
                   bugtracker = %d
                   """ % (quote(str(debian_bug.id)),
                          self.debtrackerid)
        try:
            return BugWatch.select(query)[0]
        except IndexError:
            return None

    def add_debbug_watch(self, malone_bug, debian_bug, owner):
        newwatch = BugWatch(bug=malone_bug.id,
                            remotebug=str(debian_bug.id),
                            owner=owner,
                            bugtracker=self.debtrackerid
                            )

    def get_bugtracker_by_baseurl(self, baseurl):
        query = "baseurl = %s" % quote(baseurl)
        try:
            return BugTracker.select(query)[0]
        except IndexError:
            return None

    def get_msg_by_msgid(self, msgid):
        query = "rfc822msgid = %s" % quote(msgid)
        try:
            return Message.select(query)[0]
        except IndexError:
            return None

    def get_malonebug_for_debbug_id(self, debian_bug_id):
        """Return a malone bugfor a debian bug number,
        based on the bug watches."""
        try:
            return BugWatch.select("""
               bugtracker = %d AND
               remotebug = %s
               """ % (self.debtrackerid, quote(str(debian_bug_id))))[0].bug
        except IndexError:
            return None
 
    def link_bug_and_message(self, bug, msg):
        bugmsg = BugMessage(bug=bug.id, message=msg.id)
        return bugmsg

    def get_bug_task(self, bug, distribution, sourcepackagename,
                     binarypackagename):
        for bugtask in bug.bugtasks:
            if bugtask.sourcepackagename == sourcepackagename and \
               bugtask.distribution == distribution and \
               bugtask.binarypackagename == binarypackagename:
                return bugtask
        return None

    def add_bug_task(self, bug, distro, srcpackagename,
                     binarypkgname, status, owner, datecreated):
        sourcepackagename = srcpackagename.id
        if binarypkgname:
            binarypackagename = binarypkgname.id
        else:
            binarypackagename = None
        newbugtask = BugTask(bug=bug.id,
                             distribution=distro.id,
                             sourcepackagename=sourcepackagename,
                             binarypackagename=binarypackagename,
                             status=status,
                             owner=owner.id,
                             datecreated=datecreated)

    def bug_message_ids(self, bug):
        """Return a list of message IDs found embedded in comments for
        the specified bug"""
        return [msg.rfc822msgid for msg in bug.messages]

    def sourcepackages(self, distroname):
        """return a dictionary mapping sourcepackagename to a sourcepackage in
        the given distribution"""
        # XXX this should rather look at the publishing table
        clauseTables = ['SourcePackage', 'Distribution']
        query = '''SourcePackage.distro = Distribution.id AND
                   Distribution.name = %s
                ''' % quote(distroname)
        spmap = {}
        for sp in SourcePackage.select(query, clauseTables=clauseTables):
            spmap[sp.sourcepackagename.name] = sp
        return spmap

    def get_sourcepackage(self, srcpkgname, distroname):
        clauseTables = ['Distribution', 'SourcePackage', 'SourcePackageName']
        query = """Distribution.name=%s AND
                   SourcePackage.distro = Distribution.id AND
                   SourcePackage.sourcepackagename = SourcePackageName.id AND
                   SourcePackageName.name = %s
                   """ % ( quote(distroname), quote(srcpkgname) )
        try:
            return SourcePackage.select(query, clauseTables=clauseTables)[0]
        except IndexError:
            return None

    def get_sourcepackagename(self, srcpkgname):
        query = "name = %s" % quote(srcpkgname)
        try:
            return SourcePackageName.select(query)[0]
        except IndexError:
            return None

    def get_binarypackagename(self, srcpkgname):
        query = "name = %s" % quote(srcpkgname)
        try:
            return BinaryPackageName.select(query)[0]
        except IndexError:
            return None

    def get_distro_by_name(self, distroname):
        query = "name = %s" % quote(distroname)
        try:
            return Distribution.select(query)[0]
        except IndexError:
            return None

    def get_message_by_id(self, message_id):
        query = "rfc822msgid = %s" % quote(message_id)
        try:
            return Message.select(query)[0]
        except IndexError:
            return None

    def add_message(self, message, owner, datecreated):
        msgid = message['message-id']
        if msgid is None:
            print 'ERROR: Message has no message-id'
            return None
        title=message.get('subject', None)
        if not title:
            print 'ERROR getting message title for %s ' % msgid
            title = 'message without subject'
        title = ensure_unicode(title)
        contents = ensure_unicode(message.as_string())
        try:
            newmsg = Message(title=title,
                             contents=contents,
                             rfc822msgid=msgid,
                             owner=owner,
                             datecreated=datecreated)
        except psycopg.ProgrammingError:
            print 'ERROR STORING %s IN DATABASE:' % msgid
            print '    ', sys.exc_value
            return None
        return newmsg
        
    def add_sourcepackage(self, srcpkgname, distroname, maintainer):
        if get_sourcepackagename(srcpkgname) is None:
            srcpkgname_id = SourcePackageName(name=srcpkgname).id
        else:
            srcpkgname_id = get_sourcepackagename(srcpkgname).id
        distribution = get_distro(distroname)
        if distribution is None:
            raise Error, 'Unknown distribution "%s"' % distroname
        newsrcpkg = get_sourcepackage(srcpkgname, distroname)
        if newsrcpkg is None:
            newsrcpkg = SourcePackage(maintainer=maintainer,
                                      summary=distroname+' package '+srcpkgname,
                                      description=distroname +' package '+srcpkgname,
                                      distro=distribution.id,
                                      sourcepackagename=srcpkgname_id,
                                      srcpackageformat=1)
        return newsrcpkg