~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Sourceforge ExternalBugTracker utility."""

# Python 2 idiom: classes defined in this module without explicit bases
# become new-style classes.
__metaclass__ = type
# Names exported by `from <this module> import *`.
__all__ = ['SourceForge']

import re
import urllib

from BeautifulSoup import BeautifulSoup

from canonical.launchpad.webapp import urlsplit
from lp.bugs.externalbugtracker import (
    BugNotFound,
    ExternalBugTracker,
    InvalidBugId,
    LookupTree,
    PrivateRemoteBug,
    UnknownRemoteStatusError,
    UnparsableBugData,
    )
from lp.bugs.interfaces.bugtask import (
    BugTaskImportance,
    BugTaskStatus,
    )
from lp.bugs.interfaces.externalbugtracker import UNKNOWN_REMOTE_IMPORTANCE


class SourceForge(ExternalBugTracker):
    """An ExternalBugTracker for SourceForge bugs.

    SourceForge bug data is screen-scraped from the HTML page of each
    remote bug and stored in `self.bugs`, keyed by integer bug id.
    """

    # Page requested through `_getPage()` for a single remote bug; the
    # placeholder is filled in with the remote bug id.
    export_url = 'support/tracker.php?aid=%s'

    # We only allow ourselves to update one SourceForge bug at a time to
    # avoid getting clobbered by SourceForge's rate limiting code.
    batch_size = 1

    def initializeRemoteBugDB(self, bug_ids):
        """See `ExternalBugTracker`.

        We override this method because SourceForge does not provide a
        nice way for us to export bug statuses en masse. Instead, we
        resort to screen-scraping on a per-bug basis. Therefore the
        usual choice of batch vs. single export does not apply here and
        we only perform single exports.

        For every bug id a dict with the keys 'id', 'private', 'status',
        'resolution', 'group_id' and 'atid' is stored in `self.bugs`.

        :param bug_ids: An iterable of remote bug ids, each of which
            must be convertible to an integer.
        :raises UnparsableBugData: If a bug page has no status line and
            no error message saying that the bug is private.
        """
        self.bugs = {}

        # These patterns are the same for every bug, so build them once.
        status_pattern = re.compile('Status:')
        resolution_pattern = re.compile('Resolution:')

        for bug_id in bug_ids:
            query_url = self.export_url % bug_id
            page_data = self._getPage(query_url)

            soup = BeautifulSoup(page_data)
            status_tag = soup.find(text=status_pattern)

            status = None
            private = False
            if status_tag:
                # We can extract the status by finding the grandparent tag.
                # Happily, BeautifulSoup will turn the contents of this tag
                # into a newline-delimited list from which we can then
                # extract the requisite data.
                status_row = status_tag.findParent().findParent()
                status = status_row.contents[-1]
                status = status.strip()
            else:
                error_message = self._extractErrorMessage(page_data)

                # If the error message suggests that the bug is private,
                # set the bug's private field to True.
                # XXX 2008-05-01 gmb bug=225354:
                #     We should know more about possible errors and deal
                #     with them accordingly.
                if error_message and 'private' in error_message.lower():
                    private = True
                else:
                    # If we can't find a status line in the output from
                    # SourceForge there's little point in continuing.
                    raise UnparsableBugData(
                        'Remote bug %s does not define a status.' % bug_id)

            # We need to do the same for Resolution, though if we can't
            # find it it's not critical.
            resolution_tag = soup.find(text=resolution_pattern)
            if resolution_tag:
                resolution_row = resolution_tag.findParent().findParent()
                resolution = resolution_row.contents[-1]
                resolution = resolution.strip()
            else:
                resolution = None

            # We save the group_id and atid parameters found in the href
            # of the page's 'Bugs' link. They'll be returned by
            # getRemoteProduct(). Both stay None unless both can be
            # parsed.
            group_id = None
            atid = None
            bugtracker_link = soup.find('a', text='Bugs')
            if bugtracker_link:
                href = bugtracker_link.findParent()['href']

                # We need to replace encoded ampersands in the URL since
                # SourceForge occasionally encodes them.
                href = href.replace('&amp;', '&')
                schema, host, path, query, fragment = urlsplit(href)

                query_dict = {}
                for bit in query.split('&'):
                    key, value = urllib.splitvalue(bit)
                    query_dict[key] = value

                try:
                    atid = int(query_dict.get('atid'))
                    group_id = int(query_dict.get('group_id'))
                except (TypeError, ValueError):
                    # A parameter that is missing (or has no value) is
                    # None, for which int() raises TypeError; a
                    # non-numeric value raises ValueError. Either way we
                    # don't know the remote product.
                    atid = None
                    group_id = None

            self.bugs[int(bug_id)] = {
                'id': int(bug_id),
                'private': private,
                'status': status,
                'resolution': resolution,
                'group_id': group_id,
                'atid': atid,
                }

    def _extractErrorMessage(self, page_data):
        """Extract an error message from a SourceForge page and return it.

        :param page_data: The HTML of a SourceForge page.
        :return: None if the page has no element with the class 'error';
            the whitespace-stripped text of the first paragraph inside
            that element if there is one; 'Unspecified error.'
            otherwise.
        """
        soup = BeautifulSoup(page_data)
        error_frame = soup.find(attrs={'class': 'error'})

        if not error_frame:
            return None

        # We find the error message by going via the somewhat shaky
        # method of looking for the only paragraph inside the
        # error_frame div.
        error_message = error_frame.find(name='p')
        if not error_message:
            # We know there was an error, but we can't tell what it was.
            return 'Unspecified error.'

        message_text = error_message.string
        if message_text is None:
            # `.string` is None when the paragraph is not a single text
            # node (e.g. it contains nested markup), so fall back to
            # joining all the text nodes it contains.
            message_text = ''.join(error_message.findAll(text=True))

        # Strip out any leading or trailing whitespace and return the
        # error message.
        return message_text.strip()

    def getRemoteImportance(self, bug_id):
        """See `ExternalBugTracker`.

        This method is implemented here as a stub to ensure that
        existing functionality is preserved. As a result,
        UNKNOWN_REMOTE_IMPORTANCE will always be returned.
        """
        return UNKNOWN_REMOTE_IMPORTANCE

    def getRemoteStatus(self, bug_id):
        """See `ExternalBugTracker`.

        :param bug_id: The remote bug id; must be convertible to an
            integer.
        :return: A string of the form 'status:resolution' built from the
            data stored by `initializeRemoteBugDB`.
        :raises InvalidBugId: If `bug_id` can't be converted to an
            integer.
        :raises BugNotFound: If the bug is not in `self.bugs`.
        :raises PrivateRemoteBug: If the remote bug is marked private.
        :raises UnparsableBugData: If the stored bug data lacks a status
            or resolution entry.
        """
        try:
            bug_id = int(bug_id)
        except (TypeError, ValueError):
            # TypeError covers ids such as None, which int() rejects
            # with a different exception than a non-numeric string.
            raise InvalidBugId(
                "bug_id must be convertible to an integer: %s" % str(bug_id))

        try:
            remote_bug = self.bugs[bug_id]
        except KeyError:
            raise BugNotFound(bug_id)

        # If the remote bug is private, raise a PrivateRemoteBug error.
        if remote_bug['private']:
            raise PrivateRemoteBug(
                "Bug %i on %s is private." % (bug_id, self.baseurl))

        try:
            return '%(status)s:%(resolution)s' % remote_bug
        except KeyError:
            raise UnparsableBugData(
                "Remote bug %i does not define a status." % bug_id)

    def convertRemoteImportance(self, remote_importance):
        """See `ExternalBugTracker`.

        This method is implemented here as a stub to ensure that
        existing functionality is preserved. As a result,
        BugTaskImportance.UNKNOWN will always be returned.
        """
        return BugTaskImportance.UNKNOWN

    # SourceForge statuses come in two parts: status and
    # resolution. Both of these are strings.  We use the open status
    # as a fallback when we can't find an exact mapping for the other
    # statuses.
    _status_lookup_open = LookupTree(
        (None, BugTaskStatus.NEW),
        ('Accepted', BugTaskStatus.CONFIRMED),
        ('Duplicate', BugTaskStatus.CONFIRMED),
        ('Fixed', BugTaskStatus.FIXCOMMITTED),
        ('Invalid', BugTaskStatus.INVALID),
        ('Later', BugTaskStatus.CONFIRMED),
        ('Out of Date', BugTaskStatus.INVALID),
        ('Postponed', BugTaskStatus.CONFIRMED),
        ('Rejected', BugTaskStatus.WONTFIX),
        ('Remind', BugTaskStatus.CONFIRMED),
        # Some custom SourceForge trackers misspell this, so we
        # deal with the syntactically incorrect version, too.
        ("Won't Fix", BugTaskStatus.WONTFIX),
        ('Wont Fix', BugTaskStatus.WONTFIX),
        ('Works For Me', BugTaskStatus.INVALID),
        )
    _status_lookup_titles = 'SourceForge status', 'SourceForge resolution'
    _status_lookup = LookupTree(
        ('Open', _status_lookup_open),
        ('Closed', LookupTree(
            (None, BugTaskStatus.FIXRELEASED),
            ('Accepted', BugTaskStatus.FIXCOMMITTED),
            ('Fixed', BugTaskStatus.FIXRELEASED),
            ('Postponed', BugTaskStatus.WONTFIX),
            _status_lookup_open)),
        ('Pending', LookupTree(
            (None, BugTaskStatus.INCOMPLETE),
            ('Postponed', BugTaskStatus.WONTFIX),
            _status_lookup_open)),
        )

    def convertRemoteStatus(self, remote_status):
        """See `IExternalBugTracker`.

        :param remote_status: Either 'status:resolution', as returned by
            `getRemoteStatus`, or a bare status. A resolution of 'None'
            is treated as no resolution.
        :raises UnknownRemoteStatusError: If the status and resolution
            can't be found in `_status_lookup`.
        """
        # We have to deal with situations where we can't get a
        # resolution to go with the status, so we define both even if
        # we can't get both from SourceForge.
        if ':' in remote_status:
            # Split on the first colon only, so that a value with more
            # than one colon ends up as an unknown status below rather
            # than failing to unpack here.
            status, resolution = remote_status.split(':', 1)
            if resolution == 'None':
                resolution = None
        else:
            status = remote_status
            resolution = None

        try:
            return self._status_lookup.find(status, resolution)
        except KeyError:
            raise UnknownRemoteStatusError(remote_status)

    def getRemoteProduct(self, remote_bug):
        """Return the remote product for a given bug.

        :param remote_bug: The remote bug id; must be convertible to an
            integer.
        :return: The string 'group_id&atid' for the remote bug, or None
            if either value is unknown.
        :raises InvalidBugId: If `remote_bug` can't be converted to an
            integer.
        :raises BugNotFound: If the bug is not in `self.bugs`.
        """
        try:
            bug_id = int(remote_bug)
        except (TypeError, ValueError):
            # TypeError covers ids such as None, which int() rejects
            # with a different exception than a non-numeric string.
            raise InvalidBugId(
                "remote_bug must be convertible to an integer: %s" %
                str(remote_bug))

        try:
            bug_data = self.bugs[bug_id]
        except KeyError:
            raise BugNotFound(bug_id)

        group_id = bug_data['group_id']
        atid = bug_data['atid']

        if group_id is None or atid is None:
            return None
        else:
            return "%s&%s" % (group_id, atid)