~launchpad-pqm/launchpad/devel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
# Copyright 2009 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

__metaclass__ = type

from datetime import (
    datetime,
    timedelta,
    )
from doctest import (
    DocTestSuite,
    ELLIPSIS,
    NORMALIZE_WHITESPACE,
    )
import unittest
from urllib2 import (
    HTTPError,
    Request,
    )

from lazr.lifecycle.snapshot import Snapshot
from pytz import utc
import transaction
from zope.component import getUtility
from zope.security.interfaces import Unauthorized
from zope.security.proxy import removeSecurityProxy

from canonical.testing.layers import (
    DatabaseFunctionalLayer,
    LaunchpadFunctionalLayer,
    )
from lp.app.interfaces.launchpad import ILaunchpadCelebrities
from lp.bugs.externalbugtracker import (
    BugTrackerConnectError,
    Mantis,
    MantisLoginHandler,
    )
from lp.bugs.interfaces.bugtracker import (
    BugTrackerType,
    IBugTracker,
    )
from lp.bugs.model.bugtracker import (
    BugTrackerSet,
    make_bugtracker_name,
    make_bugtracker_title,
    )
from lp.bugs.tests.externalbugtracker import UrlLib2TransportTestHandler
from lp.registry.interfaces.person import IPersonSet
from lp.testing import (
    login,
    login_person,
    TestCase,
    TestCaseWithFactory,
    )
from lp.testing.sampledata import ADMIN_EMAIL


class TestBugTrackerSet(TestCaseWithFactory):
    """Tests for the `active` filter of `BugTrackerSet.trackers()`."""

    layer = DatabaseFunctionalLayer

    def test_trackers(self):
        new_tracker = self.factory.makeBugTracker()
        tracker_set = BugTrackerSet()
        # While the tracker is active it is listed when no filter is
        # given...
        listed_unfiltered = new_tracker in tracker_set.trackers()
        self.assertTrue(listed_unfiltered)
        # ...and when only active trackers are requested...
        listed_as_active = new_tracker in tracker_set.trackers(active=True)
        self.assertTrue(listed_as_active)
        # ...but not when only inactive trackers are requested.
        listed_as_inactive = (
            new_tracker in tracker_set.trackers(active=False))
        self.assertFalse(listed_as_inactive)
        # The test logs in as an admin before deactivating the tracker.
        login(ADMIN_EMAIL)
        new_tracker.active = False
        # Once deactivated, the tracker is still in the unfiltered
        # listing...
        listed_unfiltered = new_tracker in tracker_set.trackers()
        self.assertTrue(listed_unfiltered)
        # ...now appears in the inactive listing...
        listed_as_inactive = (
            new_tracker in tracker_set.trackers(active=False))
        self.assertTrue(listed_as_inactive)
        # ...and has dropped out of the active listing.
        listed_as_active = new_tracker in tracker_set.trackers(active=True)
        self.assertFalse(listed_as_active)


class BugTrackerTestCase(TestCaseWithFactory):
    """Unit tests for the `BugTracker` class."""

    layer = LaunchpadFunctionalLayer

    def setUp(self):
        super(BugTrackerTestCase, self).setUp()
        # A shared tracker with five watches; the resetWatches() tests
        # inspect these watches after the reset.
        self.bug_tracker = self.factory.makeBugTracker()
        for i in range(5):
            self.factory.makeBugWatch(bugtracker=self.bug_tracker)

        # Reference time that _assertBugWatchesAreCheckedInTheFuture()
        # compares each watch's next_check against.
        self.now = datetime.now(utc)

    def test_multi_product_constraints_observed(self):
        """BugTrackers for which multi_product=True should return None
        URLs when no remote product is passed to
        getBugFilingAndSearchLinks().

        BugTrackers for which multi_product=False should still return
        URLs even when getBugFilingAndSearchLinks() is passed no remote
        product.
        """
        # Every known tracker type is exercised.  The loop variable is
        # deliberately not called `type`, to avoid shadowing the builtin.
        for tracker_type in BugTrackerType.items:
            bugtracker = self.factory.makeBugTracker(
                bugtrackertype=tracker_type)

            bugtracker_urls = bugtracker.getBugFilingAndSearchLinks(None)
            bug_filing_url = bugtracker_urls['bug_filing_url']
            bug_search_url = bugtracker_urls['bug_search_url']

            if bugtracker.multi_product:
                self.assertTrue(
                    bug_filing_url is None,
                    "getBugFilingAndSearchLinks() should return a "
                    "bug_filing_url of None for BugTrackers of type %s when "
                    "no remote product is passed." %
                    tracker_type.title)
                self.assertTrue(
                    bug_search_url is None,
                    "getBugFilingAndSearchLinks() should return a "
                    "bug_search_url of None for BugTrackers of type %s when "
                    "no remote product is passed." %
                    tracker_type.title)
            else:
                self.assertTrue(
                    bug_filing_url is not None,
                    "getBugFilingAndSearchLinks() should not return a "
                    "bug_filing_url of None for BugTrackers of type %s when "
                    "no remote product is passed." %
                    tracker_type.title)
                self.assertTrue(
                    bug_search_url is not None,
                    "getBugFilingAndSearchLinks() should not return a "
                    "bug_search_url of None for BugTrackers of type %s when "
                    "no remote product is passed." %
                    tracker_type.title)

    def test_attributes_not_in_snapshot(self):
        # A snapshot of an IBugTracker will not contain a copy of
        # several attributes.
        # `marker` is a unique sentinel, so "attribute absent" can be
        # told apart from any real attribute value, including None.
        marker = object()
        original = self.factory.makeBugTracker()
        attributes = [
            'watches',
            'watches_needing_update',
            'watches_ready_to_check',
            'watches_with_unpushed_comments',
            ]
        # First make sure the attributes exist on the real object, so
        # that their absence from the snapshot is meaningful.
        for attribute in attributes:
            self.assertTrue(
                getattr(original, attribute, marker) is not marker,
                "Attribute %s missing from bug tracker." % attribute)
        snapshot = Snapshot(original, providing=IBugTracker)
        for attribute in attributes:
            self.assertTrue(
                getattr(snapshot, attribute, marker) is marker,
                "Attribute %s not missing from snapshot." % attribute)

    def test_watches_ready_to_check(self):
        bug_tracker = self.factory.makeBugTracker()
        # Initially there are no watches, so none need to be checked.
        self.assertTrue(bug_tracker.watches_ready_to_check.is_empty())
        # A bug watch without a next_check set is not ready either.
        bug_watch = self.factory.makeBugWatch(bugtracker=bug_tracker)
        removeSecurityProxy(bug_watch).next_check = None
        self.assertTrue(bug_tracker.watches_ready_to_check.is_empty())
        # If we set its next_check date, it will be ready.
        removeSecurityProxy(bug_watch).next_check = (
            datetime.now(utc) - timedelta(hours=1))
        # Compare the count with 1 explicitly; a two-argument truth
        # assertion would only test the literal 1 and could never fail.
        self.assertEqual(1, bug_tracker.watches_ready_to_check.count())
        self.assertEqual(
            bug_watch, bug_tracker.watches_ready_to_check.one())

    def test_watches_with_unpushed_comments(self):
        bug_tracker = self.factory.makeBugTracker()
        # Initially there are no watches, so there are no unpushed
        # comments.
        self.assertTrue(
            bug_tracker.watches_with_unpushed_comments.is_empty())
        # A new bug watch has no comments, so the same again.
        bug_watch = self.factory.makeBugWatch(bugtracker=bug_tracker)
        self.assertTrue(
            bug_tracker.watches_with_unpushed_comments.is_empty())
        # A comment linked to the bug watch will be found.
        login_person(bug_watch.bug.owner)
        message = self.factory.makeMessage(owner=bug_watch.owner)
        bug_message = bug_watch.bug.linkMessage(message, bug_watch)
        # Compare the count with 1 explicitly; a two-argument truth
        # assertion would only test the literal 1 and could never fail.
        self.assertEqual(
            1, bug_tracker.watches_with_unpushed_comments.count())
        self.assertEqual(
            bug_watch, bug_tracker.watches_with_unpushed_comments.one())
        # Once the comment has been pushed, it will no longer be found.
        removeSecurityProxy(bug_message).remote_comment_id = 'brains'
        self.assertTrue(
            bug_tracker.watches_with_unpushed_comments.is_empty())

    def _assertBugWatchesAreCheckedInTheFuture(self):
        """Check the dates of all self.bug_tracker.watches.

        Raise an error if:
         * The next_check dates are None or earlier than self.now.
         * The next_check dates aren't <= 1 day after self.now.
         * The lastchecked dates are not None.
         * The last_error_types are not None.
        """
        for watch in self.bug_tracker.watches:
            self.assertTrue(
                watch.next_check is not None,
                "BugWatch next_check time should not be None.")
            self.assertTrue(
                watch.next_check >= self.now,
                "BugWatch next_check time should be in the future.")
            self.assertTrue(
                watch.next_check <= self.now + timedelta(days=1),
                "BugWatch next_check time should be one day or less in "
                "the future.")
            self.assertTrue(
                watch.lastchecked is None,
                "BugWatch lastchecked should be None.")
            self.assertTrue(
                watch.last_error_type is None,
                "BugWatch last_error_type should be None.")

    def test_unprivileged_user_cant_reset_watches(self):
        # It isn't possible for a user who isn't an admin or a member of
        # the Launchpad Developers team to reset the watches for a bug
        # tracker.
        unprivileged_user = self.factory.makePerson()
        login_person(unprivileged_user)
        # Merely looking up resetWatches must raise Unauthorized.  No
        # extra argument is handed to assertRaises(): it would be passed
        # on to getattr() as its default value, not used as a failure
        # message.
        self.assertRaises(
            Unauthorized, getattr, self.bug_tracker, 'resetWatches')

    def test_admin_can_reset_watches(self):
        # Launchpad admins can reset the watches on a bugtracker.
        admin_user = getUtility(IPersonSet).getByEmail(ADMIN_EMAIL)
        login_person(admin_user)
        self.bug_tracker.resetWatches()
        self._assertBugWatchesAreCheckedInTheFuture()

    def test_lp_dev_can_reset_watches(self):
        # Launchpad developers can reset the watches on a bugtracker.
        # The admin login is used only to add the new person to the
        # Launchpad Developers team.
        login(ADMIN_EMAIL)
        admin = getUtility(IPersonSet).getByEmail(ADMIN_EMAIL)
        launchpad_developers = getUtility(
            ILaunchpadCelebrities).launchpad_developers
        lp_dev = self.factory.makePerson()
        launchpad_developers.addMember(lp_dev, admin)
        login_person(lp_dev)
        self.bug_tracker.resetWatches()
        self._assertBugWatchesAreCheckedInTheFuture()

    def test_janitor_can_reset_watches(self):
        # The Janitor can reset the watches on a bug tracker.
        janitor = getUtility(ILaunchpadCelebrities).janitor
        login_person(janitor)
        self.bug_tracker.resetWatches()
        self._assertBugWatchesAreCheckedInTheFuture()


class TestMantis(TestCaseWithFactory):
    """Tests for the Mantis-specific bug tracker code."""

    layer = LaunchpadFunctionalLayer

    def setUp(self):
        super(TestMantis, self).setUp()
        # Commit up front; otherwise the checkwatches isolation
        # protection code produces errors.
        transaction.commit()

    def _makeTrackerWithTestHandler(self):
        """Return a `(tracker, handler)` pair for mantis.example.com.

        The handler is a fresh `UrlLib2TransportTestHandler` that has
        already been added to the tracker's opener.
        """
        mantis = Mantis('http://mantis.example.com')
        transport_handler = UrlLib2TransportTestHandler()
        mantis._opener.add_handler(transport_handler)
        return mantis, transport_handler

    def test_mantis_login_redirects(self):
        # Logging in to Mantis requires a special HTTP redirect
        # handler.  A redirect to the page showing the login form must
        # be rewritten into a redirect to the form's submit URL.
        login_handler = MantisLoginHandler()
        original_request = Request('http://mantis.example.com/some/path')
        # Simulate Mantis answering with a redirect to the login page.
        login_page_url = (
            'http://mantis.example.com/login_page.php'
            '?return=%2Fview.php%3Fid%3D3301')
        redirected_request = login_handler.redirect_request(
            original_request, None, 302, None, None, login_page_url)
        expected_url = (
            'http://mantis.example.com/login.php?'
            'username=guest&password=guest&return=%2Fview.php%3Fid%3D3301')
        self.assertEqual(expected_url, redirected_request.get_full_url())

    def test_mantis_login_redirect_handler_is_used(self):
        # The Mantis tracker must route redirects through the special
        # Mantis login handler.
        mantis, transport_handler = self._makeTrackerWithTestHandler()
        transport_handler.setRedirect(
            'http://mantis.example.com/login_page.php'
            '?return=%2Fsome%2Fpage')
        mantis._opener.open('http://mantis.example.com/some/page')
        # Two URLs should have been visited: the one originally
        # requested, followed by the one MantisLoginHandler substituted
        # for the login page.
        expected_urls = [
            'http://mantis.example.com/some/page',
            'http://mantis.example.com/login.php?'
            'username=guest&password=guest&return=%2Fsome%2Fpage',
            ]
        self.assertEqual(expected_urls, transport_handler.accessed_urls)

    def test_mantis_opener_can_handle_cookies(self):
        # The OpenerDirector used by the Mantis bug tracker must keep
        # hold of cookies.
        mantis, transport_handler = self._makeTrackerWithTestHandler()
        mantis._opener.open('http://mantis.example.com', '')
        stored_cookies = list(mantis._cookie_handler.cookiejar)
        self.assertEqual(1, len(stored_cookies))
        only_cookie = stored_cookies[0]
        self.assertEqual('foo', only_cookie.name)
        self.assertEqual('bar', only_cookie.value)

    def test_mantis_csv_file_http_500_error(self):
        # An HTTP 500 response for the CSV export URL is taken as a
        # sign that the bug data has to be screen scraped instead, so
        # csv_data ends up as None.
        mantis, transport_handler = self._makeTrackerWithTestHandler()
        csv_url = 'http://mantis.example.com/csv_export.php'
        server_error = HTTPError(csv_url, 500, 'Internal Error', {}, None)
        transport_handler.setError(server_error, csv_url)
        self.assertIs(None, mantis.csv_data)

    def test_mantis_csv_file_other_http_errors(self):
        # HTTP errors other than 500 from the Mantis server surface as
        # BugTrackerConnectErrors.
        mantis, transport_handler = self._makeTrackerWithTestHandler()
        csv_url = 'http://mantis.example.com/csv_export.php'
        other_failures = (
            (503, 'Service Unavailable'),
            (404, 'Not Found'),
            )
        for status_code, reason in other_failures:
            transport_handler.setError(
                HTTPError(csv_url, status_code, reason, {}, None),
                csv_url)
            self.assertRaises(BugTrackerConnectError, mantis._csv_data)


class TestSourceForge(TestCaseWithFactory):
    """Tests for SourceForge-specific BugTracker code."""

    layer = DatabaseFunctionalLayer

    def test_getBugFilingAndSearchLinks_handles_bad_data_correctly(self):
        # Product.remote_product can hold a value that is not valid for
        # a SourceForge BugTracker.  When handed such bad data,
        # getBugFilingAndSearchLinks() is expected to return None.
        sourceforge_tracker = self.factory.makeBugTracker(
            bugtrackertype=BugTrackerType.SOURCEFORGE)
        invalid_remote_product = "this is not valid"
        links = sourceforge_tracker.getBugFilingAndSearchLinks(
            invalid_remote_product)
        self.assertIs(None, links)


class TestMakeBugtrackerName(TestCase):
    """Tests for make_bugtracker_name."""

    def test_url(self):
        # For an HTTP URL the expected name is 'auto-' followed by the
        # host name; the path does not appear in it.
        self.assertEqual(
            'auto-bugs.example.com',
            make_bugtracker_name('http://bugs.example.com/shrubbery'))

    def test_email_address(self):
        # For a mailto: URL the expected name is 'auto-' followed by
        # the part of the address before the '@'.
        self.assertEqual(
            'auto-foo.bar',
            make_bugtracker_name('mailto:foo.bar@somewhere.com'))

    def test_sanitises_forbidden_characters(self):
        # The underscore in the address is expected to be left out of
        # the generated name.
        self.assertEqual(
            'auto-foobar',
            make_bugtracker_name('mailto:foo_bar@somewhere.com'))


class TestMakeBugtrackerTitle(TestCase):
    """Tests for make_bugtracker_title."""

    def test_url(self):
        # For an HTTP URL the expected title is the URL without its
        # 'http://' scheme prefix.
        self.assertEqual(
            'bugs.example.com/shrubbery',
            make_bugtracker_title('http://bugs.example.com/shrubbery'))

    def test_email_address(self):
        # For a mailto: URL the expected title is 'Email to ' followed
        # by the address with its trailing '.com' left off.
        self.assertEqual(
            'Email to foo.bar@somewhere',
            make_bugtracker_title('mailto:foo.bar@somewhere.com'))


def test_suite():
    """Return this module's tests plus the bugtracker model doctests.

    The unit tests are loaded from this module by name, and the
    doctests of `lp.bugs.model.bugtracker` are added to the same suite.
    """
    loader = unittest.TestLoader()
    combined = loader.loadTestsFromName(__name__)
    # The doctests run with whitespace normalisation and ellipsis
    # matching enabled.
    doctest_flags = NORMALIZE_WHITESPACE | ELLIPSIS
    combined.addTest(
        DocTestSuite('lp.bugs.model.bugtracker', optionflags=doctest_flags))
    return combined