~chipaca/unity-lens-video/custom-user-agent

« back to all changes in this revision

Viewing changes to django_openid_auth/tests/test_views.py

  • Committer: Janos Gyerik
  • Date: 2011-09-26 19:02:47 UTC
  • Revision ID: janos@axiom-20110926190247-sqcw1x0wufosa6fb
bundle django_openid_auth module for openid authentication

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# django-openid-auth -  OpenID integration for django.contrib.auth
 
2
#
 
3
# Copyright (C) 2009-2010 Canonical Ltd.
 
4
#
 
5
# Redistribution and use in source and binary forms, with or without
 
6
# modification, are permitted provided that the following conditions
 
7
# are met:
 
8
#
 
9
# * Redistributions of source code must retain the above copyright
 
10
# notice, this list of conditions and the following disclaimer.
 
11
#
 
12
# * Redistributions in binary form must reproduce the above copyright
 
13
# notice, this list of conditions and the following disclaimer in the
 
14
# documentation and/or other materials provided with the distribution.
 
15
#
 
16
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
17
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
18
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 
19
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 
20
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 
21
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 
22
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
23
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 
24
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
27
# POSSIBILITY OF SUCH DAMAGE.
 
28
 
 
29
import cgi
 
30
import unittest
 
31
from urllib import quote_plus
 
32
 
 
33
from django.conf import settings
 
34
from django.contrib.auth.models import User, Group
 
35
from django.http import HttpRequest, HttpResponse
 
36
from django.test import TestCase
 
37
from openid.consumer.consumer import Consumer, SuccessResponse
 
38
from openid.consumer.discover import OpenIDServiceEndpoint
 
39
from openid.extensions import ax, sreg, pape
 
40
from openid.fetchers import (
 
41
    HTTPFetcher, HTTPFetchingError, HTTPResponse, setDefaultFetcher)
 
42
from openid.oidutil import importElementTree
 
43
from openid.server.server import BROWSER_REQUEST_MODES, ENCODE_URL, Server
 
44
from openid.store.memstore import MemoryStore
 
45
from openid.message import OPENID1_URL_LIMIT, IDENTIFIER_SELECT
 
46
 
 
47
from django_openid_auth import teams
 
48
from django_openid_auth.models import UserOpenID
 
49
from django_openid_auth.views import (
 
50
    sanitise_redirect_url, 
 
51
    make_consumer,
 
52
)
 
53
from django_openid_auth.auth import OpenIDBackend
 
54
from django_openid_auth.signals import openid_login_complete
 
55
from django_openid_auth.store import DjangoOpenIDStore
 
56
from django_openid_auth.exceptions import (
 
57
    MissingUsernameViolation,
 
58
    DuplicateUsernameViolation,
 
59
    MissingPhysicalMultiFactor,
 
60
    RequiredAttributeNotReturned,
 
61
)
 
62
 
 
63
ET = importElementTree()
 
64
 
 
65
class StubOpenIDProvider(HTTPFetcher):
 
66
 
 
67
    def __init__(self, base_url):
 
68
        self.store = MemoryStore()
 
69
        self.identity_url = base_url + 'identity'
 
70
        self.localid_url = base_url + 'localid'
 
71
        self.endpoint_url = base_url + 'endpoint'
 
72
        self.server = Server(self.store, self.endpoint_url)
 
73
        self.last_request = None
 
74
        self.type_uris = ['http://specs.openid.net/auth/2.0/signon']
 
75
 
 
76
    def fetch(self, url, body=None, headers=None):
 
77
        if url == self.identity_url:
 
78
            # Serve an XRDS document directly, pointing at our endpoint.
 
79
            type_uris = ['<Type>%s</Type>' % uri for uri in self.type_uris]
 
80
            return HTTPResponse(
 
81
                url, 200, {'content-type': 'application/xrds+xml'}, """\
 
82
<?xml version="1.0"?>
 
83
<xrds:XRDS
 
84
    xmlns="xri://$xrd*($v*2.0)"
 
85
    xmlns:xrds="xri://$xrds">
 
86
  <XRD>
 
87
    <Service priority="0">
 
88
      %s
 
89
      <URI>%s</URI>
 
90
      <LocalID>%s</LocalID>
 
91
    </Service>
 
92
  </XRD>
 
93
</xrds:XRDS>
 
94
""" % ('\n'.join(type_uris), self.endpoint_url, self.localid_url))
 
95
        elif url.startswith(self.endpoint_url):
 
96
            # Gather query parameters
 
97
            query = {}
 
98
            if '?' in url:
 
99
                query.update(cgi.parse_qsl(url.split('?', 1)[1]))
 
100
            if body is not None:
 
101
                query.update(cgi.parse_qsl(body))
 
102
            self.last_request = self.server.decodeRequest(query)
 
103
 
 
104
            # The browser based requests should not be handled through
 
105
            # the fetcher interface.
 
106
            assert self.last_request.mode not in BROWSER_REQUEST_MODES
 
107
 
 
108
            response = self.server.handleRequest(self.last_request)
 
109
            webresponse = self.server.encodeResponse(response)
 
110
            return HTTPResponse(url,  webresponse.code, webresponse.headers,
 
111
                                webresponse.body)
 
112
        else:
 
113
            raise HTTPFetchingError('unknown URL %s' % url)
 
114
 
 
115
    def parseFormPost(self, content):
 
116
        """Parse an HTML form post to create an OpenID request."""
 
117
        # Hack to make the javascript XML compliant ...
 
118
        content = content.replace('i < elements.length',
 
119
                                  'i &lt; elements.length')
 
120
        tree = ET.XML(content)
 
121
        form = tree.find('.//form')
 
122
        assert form is not None, 'No form in document'
 
123
        assert form.get('action') == self.endpoint_url, (
 
124
            'Form posts to %s instead of %s' % (form.get('action'),
 
125
                                                self.endpoint_url))
 
126
        query = {}
 
127
        for input in form.findall('input'):
 
128
            if input.get('type') != 'hidden':
 
129
                continue
 
130
            query[input.get('name').encode('UTF-8')] = \
 
131
                input.get('value').encode('UTF-8')
 
132
        self.last_request = self.server.decodeRequest(query)
 
133
        return self.last_request
 
134
 
 
135
 
 
136
class DummyDjangoRequest(object):
 
137
    def __init__(self, request_path):
 
138
        self.request_path = request_path
 
139
        self.META = {
 
140
            'HTTP_HOST': "localhost",
 
141
            'SCRIPT_NAME': "http://localhost",
 
142
            'SERVER_PROTOCOL': "http",
 
143
        }
 
144
        self.POST = {
 
145
            'openid_identifier': "http://example.com/identity",
 
146
        }
 
147
        self.GET = {}
 
148
        self.session = {}
 
149
 
 
150
    def get_full_path(self):
 
151
        return self.META['SCRIPT_NAME'] + self.request_path
 
152
 
 
153
    def build_absolute_uri(self):
 
154
        return self.META['SCRIPT_NAME'] + self.request_path
 
155
        
 
156
    def _combined_request(self):
 
157
        request = {}
 
158
        request.update(self.POST)
 
159
        request.update(self.GET)
 
160
        return request
 
161
    REQUEST = property(_combined_request)
 
162
 
 
163
class RelyingPartyTests(TestCase):
 
164
    urls = 'django_openid_auth.tests.urls'
 
165
 
 
166
    def setUp(self):
 
167
        super(RelyingPartyTests, self).setUp()
 
168
        self.provider = StubOpenIDProvider('http://example.com/')
 
169
        self.req = DummyDjangoRequest('http://localhost/')
 
170
        self.endpoint = OpenIDServiceEndpoint()
 
171
        self.endpoint.claimed_id = 'http://example.com/identity'
 
172
        self.endpoint.server_url = 'http://example.com/'
 
173
        self.consumer = make_consumer(self.req)
 
174
        self.server = Server(DjangoOpenIDStore())
 
175
        setDefaultFetcher(self.provider, wrap_exceptions=False)
 
176
 
 
177
        self.old_login_redirect_url = getattr(settings, 'LOGIN_REDIRECT_URL', '/accounts/profile/')
 
178
        self.old_create_users = getattr(settings, 'OPENID_CREATE_USERS', False)
 
179
        self.old_strict_usernames = getattr(settings, 'OPENID_STRICT_USERNAMES', False)
 
180
        self.old_update_details = getattr(settings, 'OPENID_UPDATE_DETAILS_FROM_SREG', False)
 
181
        self.old_sso_server_url = getattr(settings, 'OPENID_SSO_SERVER_URL', None)
 
182
        self.old_teams_map = getattr(settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING', {})
 
183
        self.old_use_as_admin_login = getattr(settings, 'OPENID_USE_AS_ADMIN_LOGIN', False)
 
184
        self.old_follow_renames = getattr(settings, 'OPENID_FOLLOW_RENAMES', False)
 
185
        self.old_physical_multifactor = getattr(settings, 'OPENID_PHYSICAL_MULTIFACTOR_REQUIRED', False)
 
186
        self.old_login_render_failure = getattr(settings, 'OPENID_RENDER_FAILURE', None)
 
187
        self.old_consumer_complete = Consumer.complete
 
188
 
 
189
        self.old_required_fields = getattr(
 
190
            settings, 'OPENID_SREG_REQUIRED_FIELDS', [])
 
191
 
 
192
        settings.OPENID_CREATE_USERS = False
 
193
        settings.OPENID_STRICT_USERNAMES = False
 
194
        settings.OPENID_UPDATE_DETAILS_FROM_SREG = False
 
195
        settings.OPENID_SSO_SERVER_URL = None
 
196
        settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = {}
 
197
        settings.OPENID_USE_AS_ADMIN_LOGIN = False
 
198
        settings.OPENID_FOLLOW_RENAMES = False
 
199
        settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = False
 
200
        settings.OPENID_SREG_REQUIRED_FIELDS = []
 
201
 
 
202
    def tearDown(self):
 
203
        settings.LOGIN_REDIRECT_URL = self.old_login_redirect_url
 
204
        settings.OPENID_CREATE_USERS = self.old_create_users
 
205
        settings.OPENID_STRICT_USERNAMES = self.old_strict_usernames
 
206
        settings.OPENID_UPDATE_DETAILS_FROM_SREG = self.old_update_details
 
207
        settings.OPENID_SSO_SERVER_URL = self.old_sso_server_url
 
208
        settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = self.old_teams_map
 
209
        settings.OPENID_USE_AS_ADMIN_LOGIN = self.old_use_as_admin_login
 
210
        settings.OPENID_FOLLOW_RENAMES = self.old_follow_renames
 
211
        settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = self.old_physical_multifactor
 
212
        settings.OPENID_RENDER_FAILURE = self.old_login_render_failure
 
213
        Consumer.complete = self.old_consumer_complete
 
214
        settings.OPENID_SREG_REQUIRED_FIELDS = self.old_required_fields
 
215
 
 
216
        setDefaultFetcher(None)
 
217
        super(RelyingPartyTests, self).tearDown()
 
218
 
 
219
    def complete(self, openid_response):
 
220
        """Complete an OpenID authentication request."""
 
221
        # The server can generate either a redirect or a form post
 
222
        # here.  For simplicity, force generation of a redirect.
 
223
        openid_response.whichEncoding = lambda: ENCODE_URL
 
224
        webresponse = self.provider.server.encodeResponse(openid_response)
 
225
        self.assertEquals(webresponse.code, 302)
 
226
        redirect_to = webresponse.headers['location']
 
227
        self.assertTrue(redirect_to.startswith(
 
228
                'http://testserver/openid/complete/'))
 
229
        return self.client.get('/openid/complete/',
 
230
            dict(cgi.parse_qsl(redirect_to.split('?', 1)[1])))
 
231
 
 
232
    def test_login(self):
 
233
        user = User.objects.create_user('someuser', 'someone@example.com')
 
234
        useropenid = UserOpenID(
 
235
            user=user,
 
236
            claimed_id='http://example.com/identity',
 
237
            display_id='http://example.com/identity')
 
238
        useropenid.save()
 
239
 
 
240
        # The login form is displayed:
 
241
        response = self.client.get('/openid/login/')
 
242
        self.assertTemplateUsed(response, 'openid/login.html')
 
243
 
 
244
        # Posting in an identity URL begins the authentication request:
 
245
        response = self.client.post('/openid/login/',
 
246
            {'openid_identifier': 'http://example.com/identity',
 
247
             'next': '/getuser/'})
 
248
        self.assertContains(response, 'OpenID transaction in progress')
 
249
 
 
250
        openid_request = self.provider.parseFormPost(response.content)
 
251
        self.assertEquals(openid_request.mode, 'checkid_setup')
 
252
        self.assertTrue(openid_request.return_to.startswith(
 
253
                'http://testserver/openid/complete/'))
 
254
 
 
255
        # Complete the request.  The user is redirected to the next URL.
 
256
        openid_response = openid_request.answer(True)
 
257
        response = self.complete(openid_response)
 
258
        self.assertRedirects(response, 'http://testserver/getuser/')
 
259
 
 
260
        # And they are now logged in:
 
261
        response = self.client.get('/getuser/')
 
262
        self.assertEquals(response.content, 'someuser')
 
263
 
 
264
    def test_login_no_next(self):
 
265
        """Logins with no next parameter redirect to LOGIN_REDIRECT_URL."""
 
266
        user = User.objects.create_user('someuser', 'someone@example.com')
 
267
        useropenid = UserOpenID(
 
268
            user=user,
 
269
            claimed_id='http://example.com/identity',
 
270
            display_id='http://example.com/identity')
 
271
        useropenid.save()
 
272
 
 
273
        settings.LOGIN_REDIRECT_URL = '/getuser/'
 
274
        response = self.client.post('/openid/login/',
 
275
            {'openid_identifier': 'http://example.com/identity'})
 
276
        self.assertContains(response, 'OpenID transaction in progress')
 
277
 
 
278
        openid_request = self.provider.parseFormPost(response.content)
 
279
        self.assertEquals(openid_request.mode, 'checkid_setup')
 
280
        self.assertTrue(openid_request.return_to.startswith(
 
281
                'http://testserver/openid/complete/'))
 
282
 
 
283
        # Complete the request.  The user is redirected to the next URL.
 
284
        openid_response = openid_request.answer(True)
 
285
        response = self.complete(openid_response)
 
286
        self.assertRedirects(
 
287
            response, 'http://testserver' + settings.LOGIN_REDIRECT_URL)
 
288
 
 
289
    def test_login_sso(self):
 
290
        settings.OPENID_SSO_SERVER_URL = 'http://example.com/identity'
 
291
        user = User.objects.create_user('someuser', 'someone@example.com')
 
292
        useropenid = UserOpenID(
 
293
            user=user,
 
294
            claimed_id='http://example.com/identity',
 
295
            display_id='http://example.com/identity')
 
296
        useropenid.save()
 
297
 
 
298
        # Requesting the login form immediately begins an
 
299
        # authentication request.
 
300
        response = self.client.get('/openid/login/', {'next': '/getuser/'})
 
301
        self.assertEquals(response.status_code, 200)
 
302
        self.assertContains(response, 'OpenID transaction in progress')
 
303
 
 
304
        openid_request = self.provider.parseFormPost(response.content)
 
305
        self.assertEquals(openid_request.mode, 'checkid_setup')
 
306
        self.assertTrue(openid_request.return_to.startswith(
 
307
                'http://testserver/openid/complete/'))
 
308
 
 
309
        # Complete the request.  The user is redirected to the next URL.
 
310
        openid_response = openid_request.answer(True)
 
311
        response = self.complete(openid_response)
 
312
        self.assertRedirects(response, 'http://testserver/getuser/')
 
313
 
 
314
        # And they are now logged in:
 
315
        response = self.client.get('/getuser/')
 
316
        self.assertEquals(response.content, 'someuser')
 
317
 
 
318
    def test_login_create_users(self):
 
319
        settings.OPENID_CREATE_USERS = True
 
320
        # Create a user with the same name as we'll pass back via sreg.
 
321
        User.objects.create_user('someuser', 'someone@example.com')
 
322
 
 
323
        # Posting in an identity URL begins the authentication request:
 
324
        response = self.client.post('/openid/login/',
 
325
            {'openid_identifier': 'http://example.com/identity',
 
326
             'next': '/getuser/'})
 
327
        self.assertContains(response, 'OpenID transaction in progress')
 
328
 
 
329
        # Complete the request, passing back some simple registration
 
330
        # data.  The user is redirected to the next URL.
 
331
        openid_request = self.provider.parseFormPost(response.content)
 
332
        sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
 
333
        openid_response = openid_request.answer(True)
 
334
        sreg_response = sreg.SRegResponse.extractResponse(
 
335
            sreg_request, {'nickname': 'someuser', 'fullname': 'Some User',
 
336
                           'email': 'foo@example.com'})
 
337
        openid_response.addExtension(sreg_response)
 
338
        response = self.complete(openid_response)
 
339
        self.assertRedirects(response, 'http://testserver/getuser/')
 
340
 
 
341
        # And they are now logged in as a new user (they haven't taken
 
342
        # over the existing "someuser" user).
 
343
        response = self.client.get('/getuser/')
 
344
        self.assertEquals(response.content, 'someuser2')
 
345
 
 
346
        # Check the details of the new user.
 
347
        user = User.objects.get(username='someuser2')
 
348
        self.assertEquals(user.first_name, 'Some')
 
349
        self.assertEquals(user.last_name, 'User')
 
350
        self.assertEquals(user.email, 'foo@example.com')
 
351
 
 
352
    def _do_user_login(self, req_data, resp_data, use_sreg=True, use_pape=None):
 
353
        openid_request = self._get_login_request(req_data)
 
354
        openid_response = self._get_login_response(openid_request, resp_data, use_sreg, use_pape)
 
355
        response = self.complete(openid_response)
 
356
        self.assertRedirects(response, 'http://testserver/getuser/')
 
357
        return response
 
358
 
 
359
    def _get_login_request(self, req_data):
 
360
        # Posting in an identity URL begins the authentication request:
 
361
        response = self.client.post('/openid/login/', req_data)
 
362
        self.assertContains(response, 'OpenID transaction in progress')
 
363
 
 
364
        # Complete the request, passing back some simple registration
 
365
        # data.  The user is redirected to the next URL.
 
366
        openid_request = self.provider.parseFormPost(response.content)
 
367
        return openid_request
 
368
 
 
369
    def _get_login_response(self, openid_request, resp_data, use_sreg, use_pape):
 
370
        openid_response = openid_request.answer(True)
 
371
 
 
372
        if use_sreg:
 
373
            sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
 
374
            sreg_response = sreg.SRegResponse.extractResponse(
 
375
                sreg_request, resp_data)
 
376
            openid_response.addExtension(sreg_response)
 
377
        if use_pape is not None:
 
378
            policies = [
 
379
                use_pape
 
380
            ]
 
381
            pape_response = pape.Response(auth_policies=policies)
 
382
            openid_response.addExtension(pape_response)
 
383
        return openid_response
 
384
 
 
385
    def parse_query_string(self, query_str):
 
386
        query_items = map(tuple,
 
387
            [item.split('=') for item in query_str.split('&')])
 
388
        query = dict(query_items)
 
389
        return query
 
390
 
 
391
    def test_login_physical_multifactor_request(self):
 
392
        settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
 
393
        preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
 
394
        self.provider.type_uris.append(pape.ns_uri)
 
395
        
 
396
        openid_req = {'openid_identifier': 'http://example.com/identity',
 
397
               'next': '/getuser/'}
 
398
        response = self.client.post('/openid/login/', openid_req)
 
399
        openid_request = self.provider.parseFormPost(response.content)
 
400
 
 
401
        request_auth = openid_request.message.getArg(
 
402
            'http://specs.openid.net/extensions/pape/1.0',
 
403
            'preferred_auth_policies',
 
404
        )
 
405
        self.assertEqual(request_auth, preferred_auth)
 
406
 
 
407
    def test_login_physical_multifactor_response(self):
 
408
        settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
 
409
        preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
 
410
        self.provider.type_uris.append(pape.ns_uri)
 
411
 
 
412
        def mock_complete(this, request_args, return_to):
 
413
            request = {'openid.mode': 'checkid_setup',
 
414
                       'openid.trust_root': 'http://localhost/',
 
415
                       'openid.return_to': 'http://localhost/',
 
416
                       'openid.identity': IDENTIFIER_SELECT,
 
417
                       'openid.ns.pape' : pape.ns_uri,
 
418
                       'openid.pape.auth_policies': request_args.get('openid.pape.auth_policies', pape.AUTH_NONE),
 
419
            }
 
420
            openid_server = self.provider.server
 
421
            orequest = openid_server.decodeRequest(request)
 
422
            response = SuccessResponse(
 
423
                self.endpoint, orequest.message,
 
424
                signed_fields=['openid.pape.auth_policies',])
 
425
            return response
 
426
        Consumer.complete = mock_complete
 
427
 
 
428
        user = User.objects.create_user('testuser', 'test@example.com')
 
429
        useropenid = UserOpenID(
 
430
            user=user,
 
431
            claimed_id='http://example.com/identity',
 
432
            display_id='http://example.com/identity')
 
433
        useropenid.save()
 
434
 
 
435
        openid_req = {'openid_identifier': 'http://example.com/identity',
 
436
               'next': '/getuser/'}
 
437
        openid_resp =  {'nickname': 'testuser', 'fullname': 'Openid User',
 
438
                 'email': 'test@example.com'}
 
439
 
 
440
        response = self._do_user_login(openid_req, openid_resp, use_pape=pape.AUTH_MULTI_FACTOR_PHYSICAL)
 
441
 
 
442
        query = self.parse_query_string(response.request['QUERY_STRING'])
 
443
        self.assertTrue('openid.pape.auth_policies' in query)
 
444
        self.assertEqual(query['openid.pape.auth_policies'], 
 
445
                quote_plus(preferred_auth))
 
446
 
 
447
        response = self.client.get('/getuser/')
 
448
        self.assertEqual(response.content, 'testuser')
 
449
 
 
450
 
 
451
    def test_login_physical_multifactor_not_provided(self):
 
452
        settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
 
453
        preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
 
454
        self.provider.type_uris.append(pape.ns_uri)
 
455
 
 
456
        def mock_complete(this, request_args, return_to):
 
457
            request = {'openid.mode': 'checkid_setup',
 
458
                       'openid.trust_root': 'http://localhost/',
 
459
                       'openid.return_to': 'http://localhost/',
 
460
                       'openid.identity': IDENTIFIER_SELECT,
 
461
                       'openid.ns.pape' : pape.ns_uri,
 
462
                       'openid.pape.auth_policies': request_args.get('openid.pape.auth_policies', pape.AUTH_NONE),
 
463
            }
 
464
            openid_server = self.provider.server
 
465
            orequest = openid_server.decodeRequest(request)
 
466
            response = SuccessResponse(
 
467
                self.endpoint, orequest.message,
 
468
                signed_fields=['openid.pape.auth_policies',])
 
469
            return response
 
470
        Consumer.complete = mock_complete
 
471
 
 
472
        user = User.objects.create_user('testuser', 'test@example.com')
 
473
        useropenid = UserOpenID(    
 
474
            user=user,
 
475
            claimed_id='http://example.com/identity',
 
476
            display_id='http://example.com/identity')
 
477
        useropenid.save()
 
478
 
 
479
        openid_req = {'openid_identifier': 'http://example.com/identity',
 
480
               'next': '/getuser/'}
 
481
        openid_resp =  {'nickname': 'testuser', 'fullname': 'Openid User',
 
482
                 'email': 'test@example.com'}
 
483
 
 
484
        openid_request = self._get_login_request(openid_req)
 
485
        openid_response = self._get_login_response(openid_request, openid_req, openid_resp, use_pape=pape.AUTH_NONE)
 
486
 
 
487
        response_auth = openid_request.message.getArg(
 
488
            'http://specs.openid.net/extensions/pape/1.0',
 
489
            'auth_policies',
 
490
        )
 
491
        self.assertNotEqual(response_auth, preferred_auth)
 
492
 
 
493
        response = self.complete(openid_response)
 
494
        self.assertEquals(403, response.status_code)
 
495
        self.assertContains(response, '<h1>OpenID failed</h1>', status_code=403)
 
496
        self.assertContains(response, '<p>Login requires physical multi-factor authentication.</p>', status_code=403)
 
497
 
 
498
    def test_login_physical_multifactor_not_provided_override(self):
 
499
        settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
 
500
        preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
 
501
        self.provider.type_uris.append(pape.ns_uri)
 
502
 
 
503
        # Override the login_failure handler
 
504
        def mock_login_failure_handler(request, message, status=403,
 
505
                                       template_name=None,
 
506
                                       exception=None):
 
507
           self.assertTrue(isinstance(exception, MissingPhysicalMultiFactor))
 
508
           return HttpResponse('Test Failure Override', status=200)
 
509
        settings.OPENID_RENDER_FAILURE = mock_login_failure_handler
 
510
 
 
511
        def mock_complete(this, request_args, return_to):
 
512
            request = {'openid.mode': 'checkid_setup',
 
513
                       'openid.trust_root': 'http://localhost/',
 
514
                       'openid.return_to': 'http://localhost/',
 
515
                       'openid.identity': IDENTIFIER_SELECT,
 
516
                       'openid.ns.pape' : pape.ns_uri,
 
517
                       'openid.pape.auth_policies': request_args.get('openid.pape.auth_policies', pape.AUTH_NONE),
 
518
            }
 
519
            openid_server = self.provider.server
 
520
            orequest = openid_server.decodeRequest(request)
 
521
            response = SuccessResponse(
 
522
                self.endpoint, orequest.message,
 
523
                signed_fields=['openid.pape.auth_policies',])
 
524
            return response
 
525
        Consumer.complete = mock_complete
 
526
 
 
527
        user = User.objects.create_user('testuser', 'test@example.com')
 
528
        useropenid = UserOpenID(    
 
529
            user=user,
 
530
            claimed_id='http://example.com/identity',
 
531
            display_id='http://example.com/identity')
 
532
        useropenid.save()
 
533
 
 
534
        openid_req = {'openid_identifier': 'http://example.com/identity',
 
535
               'next': '/getuser/'}
 
536
        openid_resp =  {'nickname': 'testuser', 'fullname': 'Openid User',
 
537
                 'email': 'test@example.com'}
 
538
 
 
539
        openid_request = self._get_login_request(openid_req)
 
540
        openid_response = self._get_login_response(openid_request, openid_req, openid_resp, use_pape=pape.AUTH_NONE)
 
541
 
 
542
        response_auth = openid_request.message.getArg(
 
543
            'http://specs.openid.net/extensions/pape/1.0',
 
544
            'auth_policies',
 
545
        )
 
546
        self.assertNotEqual(response_auth, preferred_auth)
 
547
 
 
548
        # Status code should be 200, since we over-rode the login_failure handler
 
549
        response = self.complete(openid_response)
 
550
        self.assertEquals(200, response.status_code)
 
551
        self.assertContains(response, 'Test Failure Override')
 
552
 
 
553
    def test_login_without_nickname(self):
 
554
        settings.OPENID_CREATE_USERS = True
 
555
 
 
556
        openid_req = {'openid_identifier': 'http://example.com/identity',
 
557
               'next': '/getuser/'}
 
558
        openid_resp =  {'nickname': '', 'fullname': 'Openid User',
 
559
                 'email': 'foo@example.com'}
 
560
        self._do_user_login(openid_req, openid_resp)
 
561
        response = self.client.get('/getuser/')
 
562
 
 
563
        # username defaults to 'openiduser'
 
564
        self.assertEquals(response.content, 'openiduser')
 
565
 
 
566
        # The user's full name and email have been updated.
 
567
        user = User.objects.get(username=response.content)
 
568
        self.assertEquals(user.first_name, 'Openid')
 
569
        self.assertEquals(user.last_name, 'User')
 
570
        self.assertEquals(user.email, 'foo@example.com')
 
571
 
 
572
    def test_login_follow_rename(self):
 
573
        settings.OPENID_FOLLOW_RENAMES = True
 
574
        settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
 
575
        user = User.objects.create_user('testuser', 'someone@example.com')
 
576
        useropenid = UserOpenID(
 
577
            user=user,
 
578
            claimed_id='http://example.com/identity',
 
579
            display_id='http://example.com/identity')
 
580
        useropenid.save()
 
581
 
 
582
        openid_req = {'openid_identifier': 'http://example.com/identity',
 
583
               'next': '/getuser/'}
 
584
        openid_resp =  {'nickname': 'someuser', 'fullname': 'Some User',
 
585
                 'email': 'foo@example.com'}
 
586
        self._do_user_login(openid_req, openid_resp)
 
587
        response = self.client.get('/getuser/')
 
588
 
 
589
        # If OPENID_FOLLOW_RENAMES, they are logged in as
 
590
        # someuser (the passed in nickname has changed the username)
 
591
        self.assertEquals(response.content, 'someuser')
 
592
 
 
593
        # The user's full name and email have been updated.
 
594
        user = User.objects.get(username=response.content)
 
595
        self.assertEquals(user.first_name, 'Some')
 
596
        self.assertEquals(user.last_name, 'User')
 
597
        self.assertEquals(user.email, 'foo@example.com')
 
598
 
 
599
    def test_login_follow_rename_without_nickname_change(self):
 
600
        settings.OPENID_FOLLOW_RENAMES = True
 
601
        settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
 
602
        settings.OPENID_STRICT_USERNAMES = True
 
603
        user = User.objects.create_user('testuser', 'someone@example.com')
 
604
        useropenid = UserOpenID(
 
605
            user=user,
 
606
            claimed_id='http://example.com/identity',
 
607
            display_id='http://example.com/identity')
 
608
        useropenid.save()
 
609
 
 
610
        openid_req = {'openid_identifier': 'http://example.com/identity',
 
611
               'next': '/getuser/'}
 
612
        openid_resp =  {'nickname': 'testuser', 'fullname': 'Some User',
 
613
                 'email': 'foo@example.com'}
 
614
        self._do_user_login(openid_req, openid_resp)
 
615
        response = self.client.get('/getuser/')
 
616
 
 
617
        # Username should not have changed
 
618
        self.assertEquals(response.content, 'testuser')
 
619
 
 
620
        # The user's full name and email have been updated.
 
621
        user = User.objects.get(username=response.content)
 
622
        self.assertEquals(user.first_name, 'Some')
 
623
        self.assertEquals(user.last_name, 'User')
 
624
        self.assertEquals(user.email, 'foo@example.com')
 
625
 
 
626
    def test_login_follow_rename_conflict(self):
 
627
        settings.OPENID_FOLLOW_RENAMES = True
 
628
        settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
 
629
        # Setup existing user who's name we're going to switch to
 
630
        user = User.objects.create_user('testuser', 'someone@example.com')
 
631
        UserOpenID.objects.get_or_create(
 
632
            user=user,
 
633
            claimed_id='http://example.com/existing_identity',
 
634
            display_id='http://example.com/existing_identity')
 
635
 
 
636
        # Setup user who is going to try to change username to 'testuser'
 
637
        renamed_user = User.objects.create_user('renameuser', 'someone@example.com')
 
638
        UserOpenID.objects.get_or_create(
 
639
            user=renamed_user,
 
640
            claimed_id='http://example.com/identity',
 
641
            display_id='http://example.com/identity')
 
642
 
 
643
        # identity url is for 'renameuser'
 
644
        openid_req = {'openid_identifier': 'http://example.com/identity',
 
645
               'next': '/getuser/'}
 
646
        # but returned username is for 'testuser', which already exists for another identity
 
647
        openid_resp =  {'nickname': 'testuser', 'fullname': 'Rename User',
 
648
                 'email': 'rename@example.com'}
 
649
        self._do_user_login(openid_req, openid_resp)
 
650
        response = self.client.get('/getuser/')
 
651
 
 
652
        # If OPENID_FOLLOW_RENAMES, attempt to change username to 'testuser'
 
653
        # but since that username is already taken by someone else, we go through
 
654
        # the process of adding +i to it, and get testuser2.
 
655
        self.assertEquals(response.content, 'testuser2')
 
656
 
 
657
        # The user's full name and email have been updated.
 
658
        user = User.objects.get(username=response.content)
 
659
        self.assertEquals(user.first_name, 'Rename')
 
660
        self.assertEquals(user.last_name, 'User')
 
661
        self.assertEquals(user.email, 'rename@example.com')
 
662
 
 
663
    def test_login_follow_rename_false_onlyonce(self):
 
664
        settings.OPENID_FOLLOW_RENAMES = True
 
665
        settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
 
666
        # Setup existing user who's name we're going to switch to
 
667
        user = User.objects.create_user('testuser', 'someone@example.com')
 
668
        UserOpenID.objects.get_or_create(
 
669
            user=user,
 
670
            claimed_id='http://example.com/existing_identity',
 
671
            display_id='http://example.com/existing_identity')
 
672
 
 
673
        # Setup user who is going to try to change username to 'testuser'
 
674
        renamed_user = User.objects.create_user('testuser2000eight', 'someone@example.com')
 
675
        UserOpenID.objects.get_or_create(
 
676
            user=renamed_user,
 
677
            claimed_id='http://example.com/identity',
 
678
            display_id='http://example.com/identity')
 
679
 
 
680
        # identity url is for 'testuser2000eight'
 
681
        openid_req = {'openid_identifier': 'http://example.com/identity',
 
682
               'next': '/getuser/'}
 
683
        # but returned username is for 'testuser', which already exists for another identity
 
684
        openid_resp =  {'nickname': 'testuser2', 'fullname': 'Rename User',
 
685
                 'email': 'rename@example.com'}
 
686
        self._do_user_login(openid_req, openid_resp)
 
687
        response = self.client.get('/getuser/')
 
688
 
 
689
        # If OPENID_FOLLOW_RENAMES, attempt to change username to 'testuser'
 
690
        # but since that username is already taken by someone else, we go through
 
691
        # the process of adding +i to it.  Even though it looks like the username
 
692
        # follows the nickname+i scheme, it has non-numbers in the suffix, so
 
693
        # it's not an auto-generated one.  The regular process of renaming to
 
694
        # 'testuser' has a conflict, so we get +2 at the end.
 
695
        self.assertEquals(response.content, 'testuser2')
 
696
 
 
697
        # The user's full name and email have been updated.
 
698
        user = User.objects.get(username=response.content)
 
699
        self.assertEquals(user.first_name, 'Rename')
 
700
        self.assertEquals(user.last_name, 'User')
 
701
        self.assertEquals(user.email, 'rename@example.com')
 
702
 
 
703
    def test_login_follow_rename_conflict_onlyonce(self):
 
704
        settings.OPENID_FOLLOW_RENAMES = True
 
705
        settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
 
706
        # Setup existing user who's name we're going to switch to
 
707
        user = User.objects.create_user('testuser', 'someone@example.com')
 
708
        UserOpenID.objects.get_or_create(
 
709
            user=user,
 
710
            claimed_id='http://example.com/existing_identity',
 
711
            display_id='http://example.com/existing_identity')
 
712
 
 
713
        # Setup user who is going to try to change username to 'testuser'
 
714
        renamed_user = User.objects.create_user('testuser2000', 'someone@example.com')
 
715
        UserOpenID.objects.get_or_create(
 
716
            user=renamed_user,
 
717
            claimed_id='http://example.com/identity',
 
718
            display_id='http://example.com/identity')
 
719
 
 
720
        # identity url is for 'testuser2000'
 
721
        openid_req = {'openid_identifier': 'http://example.com/identity',
 
722
               'next': '/getuser/'}
 
723
        # but returned username is for 'testuser', which already exists for another identity
 
724
        openid_resp =  {'nickname': 'testuser', 'fullname': 'Rename User',
 
725
                 'email': 'rename@example.com'}
 
726
        self._do_user_login(openid_req, openid_resp)
 
727
        response = self.client.get('/getuser/')
 
728
 
 
729
        # If OPENID_FOLLOW_RENAMES, attempt to change username to 'testuser'
 
730
        # but since that username is already taken by someone else, we go through
 
731
        # the process of adding +i to it.  Since the user for this identity url
 
732
        # already has a name matching that pattern, check if first.
 
733
        self.assertEquals(response.content, 'testuser2000')
 
734
 
 
735
        # The user's full name and email have been updated.
 
736
        user = User.objects.get(username=response.content)
 
737
        self.assertEquals(user.first_name, 'Rename')
 
738
        self.assertEquals(user.last_name, 'User')
 
739
        self.assertEquals(user.email, 'rename@example.com')
 
740
 
 
741
    def test_login_follow_rename_false_conflict(self):
 
742
        settings.OPENID_FOLLOW_RENAMES = True
 
743
        settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
 
744
        # Setup existing user who's username matches the name+i pattern
 
745
        user = User.objects.create_user('testuser2', 'someone@example.com')
 
746
        UserOpenID.objects.get_or_create(
 
747
            user=user,
 
748
            claimed_id='http://example.com/identity',
 
749
            display_id='http://example.com/identity')
 
750
 
 
751
        # identity url is for 'testuser2'
 
752
        openid_req = {'openid_identifier': 'http://example.com/identity',
 
753
               'next': '/getuser/'}
 
754
        # but returned username is for 'testuser', which looks like we've done
 
755
        # a username+1 for them already, but 'testuser' isn't actually taken
 
756
        openid_resp =  {'nickname': 'testuser', 'fullname': 'Same User',
 
757
                 'email': 'same@example.com'}
 
758
        self._do_user_login(openid_req, openid_resp)
 
759
        response = self.client.get('/getuser/')
 
760
 
 
761
        # If OPENID_FOLLOW_RENAMES, username should be changed to 'testuser'
 
762
        # because it wasn't currently taken
 
763
        self.assertEquals(response.content, 'testuser')
 
764
 
 
765
        # The user's full name and email have been updated.
 
766
        user = User.objects.get(username=response.content)
 
767
        self.assertEquals(user.first_name, 'Same')
 
768
        self.assertEquals(user.last_name, 'User')
 
769
        self.assertEquals(user.email, 'same@example.com')
 
770
 
 
771
    def test_strict_username_no_nickname(self):
 
772
        settings.OPENID_CREATE_USERS = True
 
773
        settings.OPENID_STRICT_USERNAMES = True
 
774
        settings.OPENID_SREG_REQUIRED_FIELDS = []
 
775
 
 
776
        # Posting in an identity URL begins the authentication request:
 
777
        response = self.client.post('/openid/login/',
 
778
            {'openid_identifier': 'http://example.com/identity',
 
779
             'next': '/getuser/'})
 
780
        self.assertContains(response, 'OpenID transaction in progress')
 
781
 
 
782
        # Complete the request, passing back some simple registration
 
783
        # data.  The user is redirected to the next URL.
 
784
        openid_request = self.provider.parseFormPost(response.content)
 
785
        sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
 
786
        openid_response = openid_request.answer(True)
 
787
        sreg_response = sreg.SRegResponse.extractResponse(
 
788
            sreg_request, {'nickname': '', # No nickname
 
789
                           'fullname': 'Some User',
 
790
                           'email': 'foo@example.com'})
 
791
        openid_response.addExtension(sreg_response)
 
792
        response = self.complete(openid_response)
 
793
 
 
794
        # Status code should be 403: Forbidden
 
795
        self.assertEquals(403, response.status_code)
 
796
        self.assertContains(response, '<h1>OpenID failed</h1>', status_code=403)
 
797
        self.assertContains(response, "An attribute required for logging in was not returned "
 
798
            "(nickname)", status_code=403)
 
799
 
 
800
    def test_strict_username_no_nickname_override(self):
 
801
        settings.OPENID_CREATE_USERS = True
 
802
        settings.OPENID_STRICT_USERNAMES = True
 
803
        settings.OPENID_SREG_REQUIRED_FIELDS = []
 
804
 
 
805
        # Override the login_failure handler
 
806
        def mock_login_failure_handler(request, message, status=403,
 
807
                                       template_name=None,
 
808
                                       exception=None):
 
809
           self.assertTrue(isinstance(exception, (RequiredAttributeNotReturned, MissingUsernameViolation)))
 
810
           return HttpResponse('Test Failure Override', status=200)
 
811
        settings.OPENID_RENDER_FAILURE = mock_login_failure_handler
 
812
        
 
813
        # Posting in an identity URL begins the authentication request:
 
814
        response = self.client.post('/openid/login/',
 
815
            {'openid_identifier': 'http://example.com/identity',
 
816
             'next': '/getuser/'})
 
817
        self.assertContains(response, 'OpenID transaction in progress')
 
818
 
 
819
        # Complete the request, passing back some simple registration
 
820
        # data.  The user is redirected to the next URL.
 
821
        openid_request = self.provider.parseFormPost(response.content)
 
822
        sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
 
823
        openid_response = openid_request.answer(True)
 
824
        sreg_response = sreg.SRegResponse.extractResponse(
 
825
            sreg_request, {'nickname': '', # No nickname
 
826
                           'fullname': 'Some User',
 
827
                           'email': 'foo@example.com'})
 
828
        openid_response.addExtension(sreg_response)
 
829
        response = self.complete(openid_response)
 
830
            
 
831
        # Status code should be 200, since we over-rode the login_failure handler
 
832
        self.assertEquals(200, response.status_code)
 
833
        self.assertContains(response, 'Test Failure Override')
 
834
 
 
835
    def test_strict_username_duplicate_user(self):
 
836
        settings.OPENID_CREATE_USERS = True
 
837
        settings.OPENID_STRICT_USERNAMES = True
 
838
        # Create a user with the same name as we'll pass back via sreg.
 
839
        user = User.objects.create_user('someuser', 'someone@example.com')
 
840
        useropenid = UserOpenID(
 
841
            user=user,
 
842
            claimed_id='http://example.com/different_identity',
 
843
            display_id='http://example.com/different_identity')
 
844
        useropenid.save()
 
845
 
 
846
        # Posting in an identity URL begins the authentication request:
 
847
        response = self.client.post('/openid/login/',
 
848
            {'openid_identifier': 'http://example.com/identity',
 
849
             'next': '/getuser/'})
 
850
        self.assertContains(response, 'OpenID transaction in progress')
 
851
 
 
852
        # Complete the request, passing back some simple registration
 
853
        # data.  The user is redirected to the next URL.
 
854
        openid_request = self.provider.parseFormPost(response.content)
 
855
        sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
 
856
        openid_response = openid_request.answer(True)
 
857
        sreg_response = sreg.SRegResponse.extractResponse(
 
858
            sreg_request, {'nickname': 'someuser', 'fullname': 'Some User',
 
859
                           'email': 'foo@example.com'})
 
860
        openid_response.addExtension(sreg_response)
 
861
        response = self.complete(openid_response)
 
862
 
 
863
        # Status code should be 403: Forbidden
 
864
        self.assertEquals(403, response.status_code)
 
865
        self.assertContains(response, '<h1>OpenID failed</h1>', status_code=403)
 
866
        self.assertContains(response,
 
867
            "The username (someuser) with which you tried to log in is "
 
868
            "already in use for a different account.",
 
869
            status_code=403)
 
870
 
 
871
    def test_strict_username_duplicate_user_override(self):
 
872
        settings.OPENID_CREATE_USERS = True
 
873
        settings.OPENID_STRICT_USERNAMES = True
 
874
 
 
875
        # Override the login_failure handler
 
876
        def mock_login_failure_handler(request, message, status=403,
 
877
                                       template_name=None,
 
878
                                       exception=None):
 
879
           self.assertTrue(isinstance(exception, DuplicateUsernameViolation))
 
880
           return HttpResponse('Test Failure Override', status=200)
 
881
        settings.OPENID_RENDER_FAILURE = mock_login_failure_handler
 
882
 
 
883
        # Create a user with the same name as we'll pass back via sreg.
 
884
        user = User.objects.create_user('someuser', 'someone@example.com')
 
885
        useropenid = UserOpenID(
 
886
            user=user,
 
887
            claimed_id='http://example.com/different_identity',
 
888
            display_id='http://example.com/different_identity')
 
889
        useropenid.save()
 
890
 
 
891
        # Posting in an identity URL begins the authentication request:
 
892
        response = self.client.post('/openid/login/',
 
893
            {'openid_identifier': 'http://example.com/identity',
 
894
             'next': '/getuser/'})
 
895
        self.assertContains(response, 'OpenID transaction in progress')
 
896
 
 
897
        # Complete the request, passing back some simple registration
 
898
        # data.  The user is redirected to the next URL.
 
899
        openid_request = self.provider.parseFormPost(response.content)
 
900
        sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
 
901
        openid_response = openid_request.answer(True)
 
902
        sreg_response = sreg.SRegResponse.extractResponse(
 
903
            sreg_request, {'nickname': 'someuser', 'fullname': 'Some User',
 
904
                           'email': 'foo@example.com'})
 
905
        openid_response.addExtension(sreg_response)
 
906
        response = self.complete(openid_response)
 
907
        
 
908
        # Status code should be 200, since we over-rode the login_failure handler
 
909
        self.assertEquals(200, response.status_code)
 
910
        self.assertContains(response, 'Test Failure Override')
 
911
 
 
912
    def test_login_requires_sreg_required_fields(self):
 
913
        # If any required attributes are not included in the response,
 
914
        # we fail with a forbidden.
 
915
        settings.OPENID_CREATE_USERS = True
 
916
        settings.OPENID_SREG_REQUIRED_FIELDS = ('email', 'language')
 
917
        # Posting in an identity URL begins the authentication request:
 
918
        response = self.client.post('/openid/login/',
 
919
            {'openid_identifier': 'http://example.com/identity',
 
920
             'next': '/getuser/'})
 
921
        self.assertContains(response, 'OpenID transaction in progress')
 
922
 
 
923
        # Complete the request, passing back some simple registration
 
924
        # data.  The user is redirected to the next URL.
 
925
        openid_request = self.provider.parseFormPost(response.content)
 
926
        sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
 
927
        openid_response = openid_request.answer(True)
 
928
        sreg_response = sreg.SRegResponse.extractResponse(
 
929
            sreg_request, {'nickname': 'foo',
 
930
                           'fullname': 'Some User',
 
931
                           'email': 'foo@example.com'})
 
932
        openid_response.addExtension(sreg_response)
 
933
        response = self.complete(openid_response)
 
934
 
 
935
        # Status code should be 403: Forbidden as we didn't include
 
936
        # a required field - language.
 
937
        self.assertContains(response,
 
938
            "An attribute required for logging in was not returned "
 
939
            "(language)", status_code=403)
 
940
 
 
941
    def test_login_update_details(self):
 
942
        settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
 
943
        user = User.objects.create_user('testuser', 'someone@example.com')
 
944
        useropenid = UserOpenID(
 
945
            user=user,
 
946
            claimed_id='http://example.com/identity',
 
947
            display_id='http://example.com/identity')
 
948
        useropenid.save()
 
949
 
 
950
        openid_req = {'openid_identifier': 'http://example.com/identity',
 
951
               'next': '/getuser/'}
 
952
        openid_resp =  {'nickname': 'testuser', 'fullname': 'Some User',
 
953
                 'email': 'foo@example.com'}
 
954
        self._do_user_login(openid_req, openid_resp)
 
955
        response = self.client.get('/getuser/')
 
956
 
 
957
        self.assertEquals(response.content, 'testuser')
 
958
 
 
959
        # The user's full name and email have been updated.
 
960
        user = User.objects.get(username=response.content)
 
961
        self.assertEquals(user.first_name, 'Some')
 
962
        self.assertEquals(user.last_name, 'User')
 
963
        self.assertEquals(user.email, 'foo@example.com')
 
964
 
 
965
    def test_login_uses_sreg_extra_fields(self):
 
966
        # The configurable sreg attributes are used in the request.
 
967
        settings.OPENID_SREG_EXTRA_FIELDS = ('language',)
 
968
        user = User.objects.create_user('testuser', 'someone@example.com')
 
969
        useropenid = UserOpenID(
 
970
            user=user,
 
971
            claimed_id='http://example.com/identity',
 
972
            display_id='http://example.com/identity')
 
973
        useropenid.save()
 
974
 
 
975
        # Posting in an identity URL begins the authentication request:
 
976
        response = self.client.post('/openid/login/',
 
977
            {'openid_identifier': 'http://example.com/identity',
 
978
             'next': '/getuser/'})
 
979
 
 
980
        openid_request = self.provider.parseFormPost(response.content)
 
981
        sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
 
982
        for field in ('email', 'fullname', 'nickname', 'language'):
 
983
            self.assertTrue(field in sreg_request)
 
984
 
 
985
    def test_login_uses_sreg_required_fields(self):
 
986
        # The configurable sreg attributes are used in the request.
 
987
        settings.OPENID_SREG_REQUIRED_FIELDS = ('email', 'language')
 
988
        user = User.objects.create_user('testuser', 'someone@example.com')
 
989
        useropenid = UserOpenID(
 
990
            user=user,
 
991
            claimed_id='http://example.com/identity',
 
992
            display_id='http://example.com/identity')
 
993
        useropenid.save()
 
994
 
 
995
        # Posting in an identity URL begins the authentication request:
 
996
        response = self.client.post('/openid/login/',
 
997
            {'openid_identifier': 'http://example.com/identity',
 
998
             'next': '/getuser/'})
 
999
 
 
1000
        openid_request = self.provider.parseFormPost(response.content)
 
1001
        sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
 
1002
 
 
1003
        self.assertEqual(['email', 'language'], sreg_request.required)
 
1004
        self.assertEqual(['fullname', 'nickname'], sreg_request.optional)
 
1005
 
 
1006
    def test_login_attribute_exchange(self):
 
1007
        settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
 
1008
        user = User.objects.create_user('testuser', 'someone@example.com')
 
1009
        useropenid = UserOpenID(
 
1010
            user=user,
 
1011
            claimed_id='http://example.com/identity',
 
1012
            display_id='http://example.com/identity')
 
1013
        useropenid.save()
 
1014
 
 
1015
        # Configure the provider to advertise attribute exchange
 
1016
        # protocol and start the authentication process:
 
1017
        self.provider.type_uris.append('http://openid.net/srv/ax/1.0')
 
1018
        response = self.client.post('/openid/login/',
 
1019
            {'openid_identifier': 'http://example.com/identity',
 
1020
             'next': '/getuser/'})
 
1021
        self.assertContains(response, 'OpenID transaction in progress')
 
1022
 
 
1023
        # The resulting OpenID request uses the Attribute Exchange
 
1024
        # extension rather than the Simple Registration extension.
 
1025
        openid_request = self.provider.parseFormPost(response.content)
 
1026
        sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
 
1027
        self.assertEqual(sreg_request.required, [])
 
1028
        self.assertEqual(sreg_request.optional, [])
 
1029
 
 
1030
        fetch_request = ax.FetchRequest.fromOpenIDRequest(openid_request)
 
1031
        self.assertTrue(fetch_request.has_key(
 
1032
                'http://axschema.org/contact/email'))
 
1033
        self.assertTrue(fetch_request.has_key(
 
1034
                'http://axschema.org/namePerson'))
 
1035
        self.assertTrue(fetch_request.has_key(
 
1036
                'http://axschema.org/namePerson/first'))
 
1037
        self.assertTrue(fetch_request.has_key(
 
1038
                'http://axschema.org/namePerson/last'))
 
1039
        self.assertTrue(fetch_request.has_key(
 
1040
                'http://axschema.org/namePerson/friendly'))
 
1041
        # myOpenID compatibilty attributes:
 
1042
        self.assertTrue(fetch_request.has_key(
 
1043
                'http://schema.openid.net/contact/email'))
 
1044
        self.assertTrue(fetch_request.has_key(
 
1045
                'http://schema.openid.net/namePerson'))
 
1046
        self.assertTrue(fetch_request.has_key(
 
1047
                'http://schema.openid.net/namePerson/friendly'))
 
1048
 
 
1049
        # Build up a response including AX data.
 
1050
        openid_response = openid_request.answer(True)
 
1051
        fetch_response = ax.FetchResponse(fetch_request)
 
1052
        fetch_response.addValue(
 
1053
            'http://axschema.org/contact/email', 'foo@example.com')
 
1054
        fetch_response.addValue(
 
1055
            'http://axschema.org/namePerson/first', 'Firstname')
 
1056
        fetch_response.addValue(
 
1057
            'http://axschema.org/namePerson/last', 'Lastname')
 
1058
        fetch_response.addValue(
 
1059
            'http://axschema.org/namePerson/friendly', 'someuser')
 
1060
        openid_response.addExtension(fetch_response)
 
1061
        response = self.complete(openid_response)
 
1062
        self.assertRedirects(response, 'http://testserver/getuser/')
 
1063
 
 
1064
        # And they are now logged in as testuser (the passed in
 
1065
        # nickname has not caused the username to change).
 
1066
        response = self.client.get('/getuser/')
 
1067
        self.assertEquals(response.content, 'testuser')
 
1068
 
 
1069
        # The user's full name and email have been updated.
 
1070
        user = User.objects.get(username='testuser')
 
1071
        self.assertEquals(user.first_name, 'Firstname')
 
1072
        self.assertEquals(user.last_name, 'Lastname')
 
1073
        self.assertEquals(user.email, 'foo@example.com')
 
1074
 
 
1075
    def test_login_teams(self):
 
1076
        settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = False
 
1077
        settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = {'teamname': 'groupname',
 
1078
                                                   'otherteam': 'othergroup'}
 
1079
        user = User.objects.create_user('testuser', 'someone@example.com')
 
1080
        group = Group(name='groupname')
 
1081
        group.save()
 
1082
        ogroup = Group(name='othergroup')
 
1083
        ogroup.save()
 
1084
        user.groups.add(ogroup)
 
1085
        user.save()
 
1086
        useropenid = UserOpenID(
 
1087
            user=user,
 
1088
            claimed_id='http://example.com/identity',
 
1089
            display_id='http://example.com/identity')
 
1090
        useropenid.save()
 
1091
 
 
1092
        # Posting in an identity URL begins the authentication request:
 
1093
        response = self.client.post('/openid/login/',
 
1094
            {'openid_identifier': 'http://example.com/identity',
 
1095
             'next': '/getuser/'})
 
1096
        self.assertContains(response, 'OpenID transaction in progress')
 
1097
 
 
1098
        # Complete the request
 
1099
        openid_request = self.provider.parseFormPost(response.content)
 
1100
        openid_response = openid_request.answer(True)
 
1101
        teams_request = teams.TeamsRequest.fromOpenIDRequest(openid_request)
 
1102
        teams_response = teams.TeamsResponse.extractResponse(
 
1103
            teams_request, 'teamname,some-other-team')
 
1104
        openid_response.addExtension(teams_response)
 
1105
        response = self.complete(openid_response)
 
1106
        self.assertRedirects(response, 'http://testserver/getuser/')
 
1107
 
 
1108
        # And they are now logged in as testuser
 
1109
        response = self.client.get('/getuser/')
 
1110
        self.assertEquals(response.content, 'testuser')
 
1111
 
 
1112
        # The user's groups have been updated.
 
1113
        user = User.objects.get(username='testuser')
 
1114
        self.assertTrue(group in user.groups.all())
 
1115
        self.assertTrue(ogroup not in user.groups.all())
 
1116
 
 
1117
    def test_login_teams_automapping(self):
 
1118
        settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = {'teamname': 'groupname',
 
1119
                                                   'otherteam': 'othergroup'}
 
1120
        settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = True
 
1121
        settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO_BLACKLIST = ['django-group1', 'django-group2']
 
1122
        user = User.objects.create_user('testuser', 'someone@example.com')
 
1123
        group1 = Group(name='django-group1')
 
1124
        group1.save()
 
1125
        group2 = Group(name='django-group2')
 
1126
        group2.save()
 
1127
        group3 = Group(name='django-group3')
 
1128
        group3.save()
 
1129
        user.save()
 
1130
        useropenid = UserOpenID(
 
1131
            user=user,
 
1132
            claimed_id='http://example.com/identity',
 
1133
            display_id='http://example.com/identity')
 
1134
        useropenid.save()
 
1135
 
 
1136
        # Posting in an identity URL begins the authentication request:
 
1137
        response = self.client.post('/openid/login/',
 
1138
            {'openid_identifier': 'http://example.com/identity',
 
1139
             'next': '/getuser/'})
 
1140
        self.assertContains(response, 'OpenID transaction in progress')
 
1141
 
 
1142
        # Complete the request
 
1143
        openid_request = self.provider.parseFormPost(response.content)
 
1144
        openid_response = openid_request.answer(True)
 
1145
        teams_request = teams.TeamsRequest.fromOpenIDRequest(openid_request)
 
1146
 
 
1147
        self.assertEqual(group1 in user.groups.all(), False)
 
1148
        self.assertEqual(group2 in user.groups.all(), False)
 
1149
        self.assertTrue(group3 not in user.groups.all())
 
1150
 
 
1151
    def test_login_teams_staff_not_defined(self):
 
1152
        delattr(settings, 'OPENID_LAUNCHPAD_STAFF_TEAMS')
 
1153
        user = User.objects.create_user('testuser', 'someone@example.com')
 
1154
        user.is_staff = True
 
1155
        user.save()
 
1156
        self.assertTrue(user.is_staff)
 
1157
 
 
1158
        user = self.get_openid_authed_user_with_teams(user, 'teamname,some-other-team')
 
1159
        self.assertTrue(user.is_staff)
 
1160
 
 
1161
    def test_login_teams_staff_assignment(self):
 
1162
        settings.OPENID_LAUNCHPAD_STAFF_TEAMS = ('teamname',)
 
1163
        user = User.objects.create_user('testuser', 'someone@example.com')
 
1164
        user.is_staff = False
 
1165
        user.save()
 
1166
        self.assertFalse(user.is_staff)
 
1167
 
 
1168
        user = self.get_openid_authed_user_with_teams(user, 'teamname,some-other-team')
 
1169
        self.assertTrue(user.is_staff)
 
1170
 
 
1171
    def test_login_teams_staff_unassignment(self):
 
1172
        settings.OPENID_LAUNCHPAD_STAFF_TEAMS = ('different-teamname',)
 
1173
        user = User.objects.create_user('testuser', 'someone@example.com')
 
1174
        user.is_staff = True
 
1175
        user.save()
 
1176
        self.assertTrue(user.is_staff)
 
1177
 
 
1178
        user = self.get_openid_authed_user_with_teams(user, 'teamname,some-other-team')
 
1179
        self.assertFalse(user.is_staff)
 
1180
 
 
1181
    def get_openid_authed_user_with_teams(self, user, teams_str):
 
1182
        useropenid = UserOpenID(
 
1183
            user=user,
 
1184
            claimed_id='http://example.com/identity',
 
1185
            display_id='http://example.com/identity')
 
1186
        useropenid.save()
 
1187
 
 
1188
        # Posting in an identity URL begins the authentication request:
 
1189
        response = self.client.post('/openid/login/',
 
1190
            {'openid_identifier': 'http://example.com/identity'})
 
1191
 
 
1192
        # Complete the request
 
1193
        openid_request = self.provider.parseFormPost(response.content)
 
1194
        openid_response = openid_request.answer(True)
 
1195
        teams_request = teams.TeamsRequest.fromOpenIDRequest(openid_request)
 
1196
        teams_response = teams.TeamsResponse.extractResponse(
 
1197
            teams_request, teams_str)
 
1198
        openid_response.addExtension(teams_response)
 
1199
        response = self.complete(openid_response)
 
1200
        return User.objects.get(username=user.username)
 
1201
 
 
1202
    def test_login_complete_signals_login(self):
 
1203
        # An oauth_login_complete signal is emitted including the
 
1204
        # request and sreg_response.
 
1205
        user = User.objects.create_user('someuser', 'someone@example.com')
 
1206
        useropenid = UserOpenID(
 
1207
            user=user,
 
1208
            claimed_id='http://example.com/identity',
 
1209
            display_id='http://example.com/identity')
 
1210
        useropenid.save()
 
1211
        response = self.client.post('/openid/login/',
 
1212
            {'openid_identifier': 'http://example.com/identity'})
 
1213
        openid_request = self.provider.parseFormPost(response.content)
 
1214
        openid_response = openid_request.answer(True)
 
1215
        # Use a closure to test whether the signal handler was called.
 
1216
        self.signal_handler_called = False
 
1217
        def login_callback(sender, **kwargs):
 
1218
            self.assertTrue(isinstance(
 
1219
                kwargs.get('request', None), HttpRequest))
 
1220
            self.assertTrue(isinstance(
 
1221
                kwargs.get('openid_response', None), SuccessResponse))
 
1222
            self.signal_handler_called = True
 
1223
        openid_login_complete.connect(login_callback)
 
1224
 
 
1225
        response = self.complete(openid_response)
 
1226
 
 
1227
        self.assertTrue(self.signal_handler_called)
 
1228
        openid_login_complete.disconnect(login_callback)
 
1229
 
 
1230
    
 
1231
class HelperFunctionsTest(TestCase):
 
1232
    def test_sanitise_redirect_url(self):
 
1233
        settings.ALLOWED_EXTERNAL_OPENID_REDIRECT_DOMAINS = [
 
1234
            "example.com", "example.org"]
 
1235
        # list of URLs and whether they should be passed or not
 
1236
        urls = [
 
1237
            ("http://example.com", True),
 
1238
            ("http://example.org/", True),
 
1239
            ("http://example.org/foo/bar", True),
 
1240
            ("http://example.org/foo/bar?baz=quux", True),
 
1241
            ("http://example.org:9999/foo/bar?baz=quux", True),
 
1242
            ("http://www.example.org/", False),
 
1243
            ("http://example.net/foo/bar?baz=quux", False),
 
1244
            ("/somewhere/local", True),
 
1245
            ("/somewhere/local?url=http://fail.com/bar", True),
 
1246
            # An empty path, as seen when no "next" parameter is passed.
 
1247
            ("", False),
 
1248
            ("/path with spaces", False),
 
1249
        ]
 
1250
        for url, returns_self in urls:
 
1251
            sanitised = sanitise_redirect_url(url)
 
1252
            if returns_self:
 
1253
                self.assertEqual(url, sanitised)
 
1254
            else:
 
1255
                self.assertEqual(settings.LOGIN_REDIRECT_URL, sanitised)
 
1256
 
 
1257
def suite():
 
1258
    return unittest.TestLoader().loadTestsFromName(__name__)