1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
|
"""Malone / Debbugs Interface
Based on debzilla/bugzilla.py by Matt Zimmerman
(c) Canonical Ltd 2004
"""
import sys
import psycopg
from canonical.launchpad.database import *
from canonical.launchpad.validators.name import valid_name
from canonical.foaf.nickname import generate_nick
from canonical.database.sqlbase import quote
from canonical.lp.encoding import guess as ensure_unicode
from canonical.lp.dbschema import EmailAddressStatus
from sets import Set
class Launchpad:
    """Malone side of the Malone / Debbugs interface.

    Wraps the lookups and inserts this module performs against the
    Launchpad database classes (BugTracker, BugWatch, Message, BugTask,
    SourcePackage, ...).  Lookup methods named ``get_*`` return None when
    nothing matches unless their docstring says otherwise.
    """

    def __init__(self):
        """Look up and remember the id of the bug tracker named 'debbugs'.

        Raises IndexError if no such BugTracker row exists.
        """
        # get the debbugs remote bug tracker id
        self.debtrackerid = list(BugTracker.select("name='debbugs'"))[0].id

    def _first_or_none(self, results):
        """Return the first item of a result set, or None if it is empty."""
        try:
            return results[0]
        except IndexError:
            return None

    def _checked_name(self, name):
        """Return name stripped and lower-cased.

        Raises ValueError if the normalised name fails valid_name().
        """
        name = name.strip().lower()
        if not valid_name(name):
            raise ValueError("'%s' is not a valid name" % name)
        return name

    def ensure_person(self, displayname, email):
        """Return the person registered for email, creating one if needed."""
        person = PersonSet().getByEmail(email)
        if not person:
            person = createPerson(email, displayname=displayname)
        return person

    def ensure_sourcepackagename(self, name):
        """Return the SourcePackageName for name, creating it if missing.

        The name is stripped and lower-cased first; ValueError is raised
        if the result is not a valid name.
        """
        name = self._checked_name(name)
        try:
            return SourcePackageName.selectBy(name=name)[0]
        except IndexError:
            return SourcePackageName(name=name)

    def ensure_binarypackagename(self, name):
        """Return the BinaryPackageName for name, creating it if missing.

        The name is stripped and lower-cased first; ValueError is raised
        if the result is not a valid name.
        """
        name = self._checked_name(name)
        try:
            return BinaryPackageName.selectBy(name=name)[0]
        except IndexError:
            return BinaryPackageName(name=name)

    def get_distribution_by_name(self, name):
        """Return the Distribution called name.

        Unlike get_distro_by_name() this raises IndexError when there is
        no match instead of returning None.
        """
        return Distribution.selectBy(name=name)[0]

    def all_deb_watches(self):
        """Return a list of all debian bug numbers being watched by
        Malone"""
        watchlist = BugWatch.select("bugtracker = %d" % self.debtrackerid)
        return [int(watch.remotebug) for watch in watchlist]

    def get_watch_on_debbug(self, debian_bug):
        """Return the debbugs BugWatch for debian_bug.id, or None."""
        query = "remotebug = %s AND bugtracker = %d" % (
            quote(str(debian_bug.id)), self.debtrackerid)
        return self._first_or_none(BugWatch.select(query))

    def add_debbug_watch(self, malone_bug, debian_bug, owner):
        """Create and return a debbugs BugWatch linking the two bugs."""
        return BugWatch(bug=malone_bug.id,
                        remotebug=str(debian_bug.id),
                        owner=owner,
                        bugtracker=self.debtrackerid)

    def get_bugtracker_by_baseurl(self, baseurl):
        """Return the BugTracker whose baseurl equals baseurl, or None."""
        query = "baseurl = %s" % quote(baseurl)
        return self._first_or_none(BugTracker.select(query))

    def get_msg_by_msgid(self, msgid):
        """Return the Message whose rfc822msgid equals msgid, or None.

        Same lookup as get_message_by_id(); both names are kept for
        existing callers.
        """
        return self.get_message_by_id(msgid)

    def get_malonebug_for_debbug_id(self, debian_bug_id):
        """Return a malone bug for a debian bug number,
        based on the bug watches.  Returns None if no watch exists."""
        query = "bugtracker = %d AND remotebug = %s" % (
            self.debtrackerid, quote(str(debian_bug_id)))
        watch = self._first_or_none(BugWatch.select(query))
        if watch is None:
            return None
        return watch.bug

    def link_bug_and_message(self, bug, msg):
        """Create and return the BugMessage joining bug and msg."""
        return BugMessage(bug=bug.id, message=msg.id)

    def get_bug_task(self, bug, distribution, sourcepackagename,
                     binarypackagename):
        """Return the first task of bug matching all three targets, or None.

        A task matches when its distribution, sourcepackagename and
        binarypackagename all compare equal to the arguments.
        """
        for bugtask in bug.bugtasks:
            if (bugtask.sourcepackagename == sourcepackagename and
                    bugtask.distribution == distribution and
                    bugtask.binarypackagename == binarypackagename):
                return bugtask
        return None

    def add_bug_task(self, bug, distro, srcpackagename,
                     binarypkgname, status, owner, datecreated):
        """Create and return a BugTask for bug on a distribution package.

        binarypkgname may be false/None, in which case the task is stored
        without a binary package name.
        """
        if binarypkgname:
            binarypackagename = binarypkgname.id
        else:
            binarypackagename = None
        return BugTask(bug=bug.id,
                       distribution=distro.id,
                       sourcepackagename=srcpackagename.id,
                       binarypackagename=binarypackagename,
                       status=status,
                       owner=owner.id,
                       datecreated=datecreated)

    def bug_message_ids(self, bug):
        """Return a list of the rfc822msgid of every message in
        bug.messages"""
        return [msg.rfc822msgid for msg in bug.messages]

    def sourcepackages(self, distroname):
        """return a dictionary mapping sourcepackagename to a sourcepackage in
        the given distribution"""
        # XXX this should rather look at the publishing table
        clauseTables = ['SourcePackage', 'Distribution']
        query = ("SourcePackage.distro = Distribution.id AND "
                 "Distribution.name = %s" % quote(distroname))
        spmap = {}
        for sp in SourcePackage.select(query, clauseTables=clauseTables):
            spmap[sp.sourcepackagename.name] = sp
        return spmap

    def get_sourcepackage(self, srcpkgname, distroname):
        """Return the SourcePackage named srcpkgname in distroname, or None."""
        clauseTables = ['Distribution', 'SourcePackage', 'SourcePackageName']
        query = ("Distribution.name = %s AND "
                 "SourcePackage.distro = Distribution.id AND "
                 "SourcePackage.sourcepackagename = SourcePackageName.id AND "
                 "SourcePackageName.name = %s"
                 % (quote(distroname), quote(srcpkgname)))
        return self._first_or_none(
            SourcePackage.select(query, clauseTables=clauseTables))

    def get_sourcepackagename(self, srcpkgname):
        """Return the SourcePackageName called srcpkgname, or None."""
        query = "name = %s" % quote(srcpkgname)
        return self._first_or_none(SourcePackageName.select(query))

    def get_binarypackagename(self, srcpkgname):
        """Return the BinaryPackageName called srcpkgname, or None.

        The parameter holds a binary package name; its historical name is
        kept so keyword callers keep working.
        """
        query = "name = %s" % quote(srcpkgname)
        return self._first_or_none(BinaryPackageName.select(query))

    def get_distro_by_name(self, distroname):
        """Return the Distribution called distroname, or None."""
        query = "name = %s" % quote(distroname)
        return self._first_or_none(Distribution.select(query))

    def get_message_by_id(self, message_id):
        """Return the Message whose rfc822msgid equals message_id, or None."""
        query = "rfc822msgid = %s" % quote(message_id)
        return self._first_or_none(Message.select(query))

    def add_message(self, message, owner, datecreated):
        """Store message as a Message row and return it.

        message must support ['message-id'], .get('subject', ...) and,
        when it has a message-id, .as_string().  Returns None, after
        printing an error, if the message has no message-id or the insert
        raises psycopg.ProgrammingError.  A missing or empty subject is
        replaced by 'message without subject'.
        """
        msgid = message['message-id']
        if msgid is None:
            print('ERROR: Message has no message-id')
            return None
        title = message.get('subject', None)
        if not title:
            print('ERROR getting message title for %s ' % msgid)
            title = 'message without subject'
        title = ensure_unicode(title)
        contents = ensure_unicode(message.as_string())
        try:
            newmsg = Message(title=title,
                             contents=contents,
                             rfc822msgid=msgid,
                             owner=owner,
                             datecreated=datecreated)
        except psycopg.ProgrammingError:
            # sys.exc_info() replaces the deprecated, non-thread-safe
            # sys.exc_value global; the printed text is unchanged.
            print('ERROR STORING %s IN DATABASE:' % msgid)
            print('  %s' % (sys.exc_info()[1],))
            return None
        return newmsg

    def add_sourcepackage(self, srcpkgname, distroname, maintainer):
        """Return the SourcePackage srcpkgname in distroname, creating it.

        An existing package is returned unchanged.  Otherwise the
        SourcePackageName is created if missing and a new SourcePackage
        is inserted with a generated summary and description.

        Raises ValueError if distroname is not a known distribution; this
        is checked first so nothing is written for an unknown distro.
        """
        distribution = self.get_distro_by_name(distroname)
        if distribution is None:
            raise ValueError('Unknown distribution "%s"' % distroname)
        existing = self.get_sourcepackage(srcpkgname, distroname)
        if existing is not None:
            return existing
        name_record = self.get_sourcepackagename(srcpkgname)
        if name_record is None:
            name_record = SourcePackageName(name=srcpkgname)
        blurb = distroname + ' package ' + srcpkgname
        return SourcePackage(maintainer=maintainer,
                             summary=blurb,
                             description=blurb,
                             distro=distribution.id,
                             sourcepackagename=name_record.id,
                             srcpackageformat=1)
|