1
# django-openid-auth - OpenID integration for django.contrib.auth
3
# Copyright (C) 2009-2010 Canonical Ltd.
5
# Redistribution and use in source and binary forms, with or without
6
# modification, are permitted provided that the following conditions
9
# * Redistributions of source code must retain the above copyright
10
# notice, this list of conditions and the following disclaimer.
12
# * Redistributions in binary form must reproduce the above copyright
13
# notice, this list of conditions and the following disclaimer in the
14
# documentation and/or other materials provided with the distribution.
16
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
19
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
20
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
21
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
22
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
23
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
31
from urllib import quote_plus
33
from django.conf import settings
34
from django.contrib.auth.models import User, Group
35
from django.http import HttpRequest, HttpResponse
36
from django.test import TestCase
37
from openid.consumer.consumer import Consumer, SuccessResponse
38
from openid.consumer.discover import OpenIDServiceEndpoint
39
from openid.extensions import ax, sreg, pape
40
from openid.fetchers import (
41
HTTPFetcher, HTTPFetchingError, HTTPResponse, setDefaultFetcher)
42
from openid.oidutil import importElementTree
43
from openid.server.server import BROWSER_REQUEST_MODES, ENCODE_URL, Server
44
from openid.store.memstore import MemoryStore
45
from openid.message import OPENID1_URL_LIMIT, IDENTIFIER_SELECT
47
from django_openid_auth import teams
48
from django_openid_auth.models import UserOpenID
49
from django_openid_auth.views import (
50
sanitise_redirect_url,
53
from django_openid_auth.auth import OpenIDBackend
54
from django_openid_auth.signals import openid_login_complete
55
from django_openid_auth.store import DjangoOpenIDStore
56
from django_openid_auth.exceptions import (
57
MissingUsernameViolation,
58
DuplicateUsernameViolation,
59
MissingPhysicalMultiFactor,
60
RequiredAttributeNotReturned,
63
ET = importElementTree()
65
class StubOpenIDProvider(HTTPFetcher):
67
def __init__(self, base_url):
68
self.store = MemoryStore()
69
self.identity_url = base_url + 'identity'
70
self.localid_url = base_url + 'localid'
71
self.endpoint_url = base_url + 'endpoint'
72
self.server = Server(self.store, self.endpoint_url)
73
self.last_request = None
74
self.type_uris = ['http://specs.openid.net/auth/2.0/signon']
76
def fetch(self, url, body=None, headers=None):
77
if url == self.identity_url:
78
# Serve an XRDS document directly, pointing at our endpoint.
79
type_uris = ['<Type>%s</Type>' % uri for uri in self.type_uris]
81
url, 200, {'content-type': 'application/xrds+xml'}, """\
84
xmlns="xri://$xrd*($v*2.0)"
85
xmlns:xrds="xri://$xrds">
87
<Service priority="0">
94
""" % ('\n'.join(type_uris), self.endpoint_url, self.localid_url))
95
elif url.startswith(self.endpoint_url):
96
# Gather query parameters
99
query.update(cgi.parse_qsl(url.split('?', 1)[1]))
101
query.update(cgi.parse_qsl(body))
102
self.last_request = self.server.decodeRequest(query)
104
# The browser based requests should not be handled through
105
# the fetcher interface.
106
assert self.last_request.mode not in BROWSER_REQUEST_MODES
108
response = self.server.handleRequest(self.last_request)
109
webresponse = self.server.encodeResponse(response)
110
return HTTPResponse(url, webresponse.code, webresponse.headers,
113
raise HTTPFetchingError('unknown URL %s' % url)
115
def parseFormPost(self, content):
116
"""Parse an HTML form post to create an OpenID request."""
117
# Hack to make the javascript XML compliant ...
118
content = content.replace('i < elements.length',
119
'i < elements.length')
120
tree = ET.XML(content)
121
form = tree.find('.//form')
122
assert form is not None, 'No form in document'
123
assert form.get('action') == self.endpoint_url, (
124
'Form posts to %s instead of %s' % (form.get('action'),
127
for input in form.findall('input'):
128
if input.get('type') != 'hidden':
130
query[input.get('name').encode('UTF-8')] = \
131
input.get('value').encode('UTF-8')
132
self.last_request = self.server.decodeRequest(query)
133
return self.last_request
136
class DummyDjangoRequest(object):
137
def __init__(self, request_path):
138
self.request_path = request_path
140
'HTTP_HOST': "localhost",
141
'SCRIPT_NAME': "http://localhost",
142
'SERVER_PROTOCOL': "http",
145
'openid_identifier': "http://example.com/identity",
150
def get_full_path(self):
151
return self.META['SCRIPT_NAME'] + self.request_path
153
def build_absolute_uri(self):
154
return self.META['SCRIPT_NAME'] + self.request_path
156
def _combined_request(self):
158
request.update(self.POST)
159
request.update(self.GET)
161
REQUEST = property(_combined_request)
163
class RelyingPartyTests(TestCase):
164
urls = 'django_openid_auth.tests.urls'
167
super(RelyingPartyTests, self).setUp()
168
self.provider = StubOpenIDProvider('http://example.com/')
169
self.req = DummyDjangoRequest('http://localhost/')
170
self.endpoint = OpenIDServiceEndpoint()
171
self.endpoint.claimed_id = 'http://example.com/identity'
172
self.endpoint.server_url = 'http://example.com/'
173
self.consumer = make_consumer(self.req)
174
self.server = Server(DjangoOpenIDStore())
175
setDefaultFetcher(self.provider, wrap_exceptions=False)
177
self.old_login_redirect_url = getattr(settings, 'LOGIN_REDIRECT_URL', '/accounts/profile/')
178
self.old_create_users = getattr(settings, 'OPENID_CREATE_USERS', False)
179
self.old_strict_usernames = getattr(settings, 'OPENID_STRICT_USERNAMES', False)
180
self.old_update_details = getattr(settings, 'OPENID_UPDATE_DETAILS_FROM_SREG', False)
181
self.old_sso_server_url = getattr(settings, 'OPENID_SSO_SERVER_URL', None)
182
self.old_teams_map = getattr(settings, 'OPENID_LAUNCHPAD_TEAMS_MAPPING', {})
183
self.old_use_as_admin_login = getattr(settings, 'OPENID_USE_AS_ADMIN_LOGIN', False)
184
self.old_follow_renames = getattr(settings, 'OPENID_FOLLOW_RENAMES', False)
185
self.old_physical_multifactor = getattr(settings, 'OPENID_PHYSICAL_MULTIFACTOR_REQUIRED', False)
186
self.old_login_render_failure = getattr(settings, 'OPENID_RENDER_FAILURE', None)
187
self.old_consumer_complete = Consumer.complete
189
self.old_required_fields = getattr(
190
settings, 'OPENID_SREG_REQUIRED_FIELDS', [])
192
settings.OPENID_CREATE_USERS = False
193
settings.OPENID_STRICT_USERNAMES = False
194
settings.OPENID_UPDATE_DETAILS_FROM_SREG = False
195
settings.OPENID_SSO_SERVER_URL = None
196
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = {}
197
settings.OPENID_USE_AS_ADMIN_LOGIN = False
198
settings.OPENID_FOLLOW_RENAMES = False
199
settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = False
200
settings.OPENID_SREG_REQUIRED_FIELDS = []
203
settings.LOGIN_REDIRECT_URL = self.old_login_redirect_url
204
settings.OPENID_CREATE_USERS = self.old_create_users
205
settings.OPENID_STRICT_USERNAMES = self.old_strict_usernames
206
settings.OPENID_UPDATE_DETAILS_FROM_SREG = self.old_update_details
207
settings.OPENID_SSO_SERVER_URL = self.old_sso_server_url
208
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = self.old_teams_map
209
settings.OPENID_USE_AS_ADMIN_LOGIN = self.old_use_as_admin_login
210
settings.OPENID_FOLLOW_RENAMES = self.old_follow_renames
211
settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = self.old_physical_multifactor
212
settings.OPENID_RENDER_FAILURE = self.old_login_render_failure
213
Consumer.complete = self.old_consumer_complete
214
settings.OPENID_SREG_REQUIRED_FIELDS = self.old_required_fields
216
setDefaultFetcher(None)
217
super(RelyingPartyTests, self).tearDown()
219
def complete(self, openid_response):
220
"""Complete an OpenID authentication request."""
221
# The server can generate either a redirect or a form post
222
# here. For simplicity, force generation of a redirect.
223
openid_response.whichEncoding = lambda: ENCODE_URL
224
webresponse = self.provider.server.encodeResponse(openid_response)
225
self.assertEquals(webresponse.code, 302)
226
redirect_to = webresponse.headers['location']
227
self.assertTrue(redirect_to.startswith(
228
'http://testserver/openid/complete/'))
229
return self.client.get('/openid/complete/',
230
dict(cgi.parse_qsl(redirect_to.split('?', 1)[1])))
232
def test_login(self):
233
user = User.objects.create_user('someuser', 'someone@example.com')
234
useropenid = UserOpenID(
236
claimed_id='http://example.com/identity',
237
display_id='http://example.com/identity')
240
# The login form is displayed:
241
response = self.client.get('/openid/login/')
242
self.assertTemplateUsed(response, 'openid/login.html')
244
# Posting in an identity URL begins the authentication request:
245
response = self.client.post('/openid/login/',
246
{'openid_identifier': 'http://example.com/identity',
247
'next': '/getuser/'})
248
self.assertContains(response, 'OpenID transaction in progress')
250
openid_request = self.provider.parseFormPost(response.content)
251
self.assertEquals(openid_request.mode, 'checkid_setup')
252
self.assertTrue(openid_request.return_to.startswith(
253
'http://testserver/openid/complete/'))
255
# Complete the request. The user is redirected to the next URL.
256
openid_response = openid_request.answer(True)
257
response = self.complete(openid_response)
258
self.assertRedirects(response, 'http://testserver/getuser/')
260
# And they are now logged in:
261
response = self.client.get('/getuser/')
262
self.assertEquals(response.content, 'someuser')
264
def test_login_no_next(self):
265
"""Logins with no next parameter redirect to LOGIN_REDIRECT_URL."""
266
user = User.objects.create_user('someuser', 'someone@example.com')
267
useropenid = UserOpenID(
269
claimed_id='http://example.com/identity',
270
display_id='http://example.com/identity')
273
settings.LOGIN_REDIRECT_URL = '/getuser/'
274
response = self.client.post('/openid/login/',
275
{'openid_identifier': 'http://example.com/identity'})
276
self.assertContains(response, 'OpenID transaction in progress')
278
openid_request = self.provider.parseFormPost(response.content)
279
self.assertEquals(openid_request.mode, 'checkid_setup')
280
self.assertTrue(openid_request.return_to.startswith(
281
'http://testserver/openid/complete/'))
283
# Complete the request. The user is redirected to the next URL.
284
openid_response = openid_request.answer(True)
285
response = self.complete(openid_response)
286
self.assertRedirects(
287
response, 'http://testserver' + settings.LOGIN_REDIRECT_URL)
289
def test_login_sso(self):
290
settings.OPENID_SSO_SERVER_URL = 'http://example.com/identity'
291
user = User.objects.create_user('someuser', 'someone@example.com')
292
useropenid = UserOpenID(
294
claimed_id='http://example.com/identity',
295
display_id='http://example.com/identity')
298
# Requesting the login form immediately begins an
299
# authentication request.
300
response = self.client.get('/openid/login/', {'next': '/getuser/'})
301
self.assertEquals(response.status_code, 200)
302
self.assertContains(response, 'OpenID transaction in progress')
304
openid_request = self.provider.parseFormPost(response.content)
305
self.assertEquals(openid_request.mode, 'checkid_setup')
306
self.assertTrue(openid_request.return_to.startswith(
307
'http://testserver/openid/complete/'))
309
# Complete the request. The user is redirected to the next URL.
310
openid_response = openid_request.answer(True)
311
response = self.complete(openid_response)
312
self.assertRedirects(response, 'http://testserver/getuser/')
314
# And they are now logged in:
315
response = self.client.get('/getuser/')
316
self.assertEquals(response.content, 'someuser')
318
def test_login_create_users(self):
319
settings.OPENID_CREATE_USERS = True
320
# Create a user with the same name as we'll pass back via sreg.
321
User.objects.create_user('someuser', 'someone@example.com')
323
# Posting in an identity URL begins the authentication request:
324
response = self.client.post('/openid/login/',
325
{'openid_identifier': 'http://example.com/identity',
326
'next': '/getuser/'})
327
self.assertContains(response, 'OpenID transaction in progress')
329
# Complete the request, passing back some simple registration
330
# data. The user is redirected to the next URL.
331
openid_request = self.provider.parseFormPost(response.content)
332
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
333
openid_response = openid_request.answer(True)
334
sreg_response = sreg.SRegResponse.extractResponse(
335
sreg_request, {'nickname': 'someuser', 'fullname': 'Some User',
336
'email': 'foo@example.com'})
337
openid_response.addExtension(sreg_response)
338
response = self.complete(openid_response)
339
self.assertRedirects(response, 'http://testserver/getuser/')
341
# And they are now logged in as a new user (they haven't taken
342
# over the existing "someuser" user).
343
response = self.client.get('/getuser/')
344
self.assertEquals(response.content, 'someuser2')
346
# Check the details of the new user.
347
user = User.objects.get(username='someuser2')
348
self.assertEquals(user.first_name, 'Some')
349
self.assertEquals(user.last_name, 'User')
350
self.assertEquals(user.email, 'foo@example.com')
352
def _do_user_login(self, req_data, resp_data, use_sreg=True, use_pape=None):
353
openid_request = self._get_login_request(req_data)
354
openid_response = self._get_login_response(openid_request, resp_data, use_sreg, use_pape)
355
response = self.complete(openid_response)
356
self.assertRedirects(response, 'http://testserver/getuser/')
359
def _get_login_request(self, req_data):
360
# Posting in an identity URL begins the authentication request:
361
response = self.client.post('/openid/login/', req_data)
362
self.assertContains(response, 'OpenID transaction in progress')
364
# Complete the request, passing back some simple registration
365
# data. The user is redirected to the next URL.
366
openid_request = self.provider.parseFormPost(response.content)
367
return openid_request
369
def _get_login_response(self, openid_request, resp_data, use_sreg, use_pape):
370
openid_response = openid_request.answer(True)
373
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
374
sreg_response = sreg.SRegResponse.extractResponse(
375
sreg_request, resp_data)
376
openid_response.addExtension(sreg_response)
377
if use_pape is not None:
381
pape_response = pape.Response(auth_policies=policies)
382
openid_response.addExtension(pape_response)
383
return openid_response
385
def parse_query_string(self, query_str):
386
query_items = map(tuple,
387
[item.split('=') for item in query_str.split('&')])
388
query = dict(query_items)
391
def test_login_physical_multifactor_request(self):
392
settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
393
preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
394
self.provider.type_uris.append(pape.ns_uri)
396
openid_req = {'openid_identifier': 'http://example.com/identity',
398
response = self.client.post('/openid/login/', openid_req)
399
openid_request = self.provider.parseFormPost(response.content)
401
request_auth = openid_request.message.getArg(
402
'http://specs.openid.net/extensions/pape/1.0',
403
'preferred_auth_policies',
405
self.assertEqual(request_auth, preferred_auth)
407
def test_login_physical_multifactor_response(self):
408
settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
409
preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
410
self.provider.type_uris.append(pape.ns_uri)
412
def mock_complete(this, request_args, return_to):
413
request = {'openid.mode': 'checkid_setup',
414
'openid.trust_root': 'http://localhost/',
415
'openid.return_to': 'http://localhost/',
416
'openid.identity': IDENTIFIER_SELECT,
417
'openid.ns.pape' : pape.ns_uri,
418
'openid.pape.auth_policies': request_args.get('openid.pape.auth_policies', pape.AUTH_NONE),
420
openid_server = self.provider.server
421
orequest = openid_server.decodeRequest(request)
422
response = SuccessResponse(
423
self.endpoint, orequest.message,
424
signed_fields=['openid.pape.auth_policies',])
426
Consumer.complete = mock_complete
428
user = User.objects.create_user('testuser', 'test@example.com')
429
useropenid = UserOpenID(
431
claimed_id='http://example.com/identity',
432
display_id='http://example.com/identity')
435
openid_req = {'openid_identifier': 'http://example.com/identity',
437
openid_resp = {'nickname': 'testuser', 'fullname': 'Openid User',
438
'email': 'test@example.com'}
440
response = self._do_user_login(openid_req, openid_resp, use_pape=pape.AUTH_MULTI_FACTOR_PHYSICAL)
442
query = self.parse_query_string(response.request['QUERY_STRING'])
443
self.assertTrue('openid.pape.auth_policies' in query)
444
self.assertEqual(query['openid.pape.auth_policies'],
445
quote_plus(preferred_auth))
447
response = self.client.get('/getuser/')
448
self.assertEqual(response.content, 'testuser')
451
def test_login_physical_multifactor_not_provided(self):
452
settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
453
preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
454
self.provider.type_uris.append(pape.ns_uri)
456
def mock_complete(this, request_args, return_to):
457
request = {'openid.mode': 'checkid_setup',
458
'openid.trust_root': 'http://localhost/',
459
'openid.return_to': 'http://localhost/',
460
'openid.identity': IDENTIFIER_SELECT,
461
'openid.ns.pape' : pape.ns_uri,
462
'openid.pape.auth_policies': request_args.get('openid.pape.auth_policies', pape.AUTH_NONE),
464
openid_server = self.provider.server
465
orequest = openid_server.decodeRequest(request)
466
response = SuccessResponse(
467
self.endpoint, orequest.message,
468
signed_fields=['openid.pape.auth_policies',])
470
Consumer.complete = mock_complete
472
user = User.objects.create_user('testuser', 'test@example.com')
473
useropenid = UserOpenID(
475
claimed_id='http://example.com/identity',
476
display_id='http://example.com/identity')
479
openid_req = {'openid_identifier': 'http://example.com/identity',
481
openid_resp = {'nickname': 'testuser', 'fullname': 'Openid User',
482
'email': 'test@example.com'}
484
openid_request = self._get_login_request(openid_req)
485
openid_response = self._get_login_response(openid_request, openid_req, openid_resp, use_pape=pape.AUTH_NONE)
487
response_auth = openid_request.message.getArg(
488
'http://specs.openid.net/extensions/pape/1.0',
491
self.assertNotEqual(response_auth, preferred_auth)
493
response = self.complete(openid_response)
494
self.assertEquals(403, response.status_code)
495
self.assertContains(response, '<h1>OpenID failed</h1>', status_code=403)
496
self.assertContains(response, '<p>Login requires physical multi-factor authentication.</p>', status_code=403)
498
def test_login_physical_multifactor_not_provided_override(self):
499
settings.OPENID_PHYSICAL_MULTIFACTOR_REQUIRED = True
500
preferred_auth = pape.AUTH_MULTI_FACTOR_PHYSICAL
501
self.provider.type_uris.append(pape.ns_uri)
503
# Override the login_failure handler
504
def mock_login_failure_handler(request, message, status=403,
507
self.assertTrue(isinstance(exception, MissingPhysicalMultiFactor))
508
return HttpResponse('Test Failure Override', status=200)
509
settings.OPENID_RENDER_FAILURE = mock_login_failure_handler
511
def mock_complete(this, request_args, return_to):
512
request = {'openid.mode': 'checkid_setup',
513
'openid.trust_root': 'http://localhost/',
514
'openid.return_to': 'http://localhost/',
515
'openid.identity': IDENTIFIER_SELECT,
516
'openid.ns.pape' : pape.ns_uri,
517
'openid.pape.auth_policies': request_args.get('openid.pape.auth_policies', pape.AUTH_NONE),
519
openid_server = self.provider.server
520
orequest = openid_server.decodeRequest(request)
521
response = SuccessResponse(
522
self.endpoint, orequest.message,
523
signed_fields=['openid.pape.auth_policies',])
525
Consumer.complete = mock_complete
527
user = User.objects.create_user('testuser', 'test@example.com')
528
useropenid = UserOpenID(
530
claimed_id='http://example.com/identity',
531
display_id='http://example.com/identity')
534
openid_req = {'openid_identifier': 'http://example.com/identity',
536
openid_resp = {'nickname': 'testuser', 'fullname': 'Openid User',
537
'email': 'test@example.com'}
539
openid_request = self._get_login_request(openid_req)
540
openid_response = self._get_login_response(openid_request, openid_req, openid_resp, use_pape=pape.AUTH_NONE)
542
response_auth = openid_request.message.getArg(
543
'http://specs.openid.net/extensions/pape/1.0',
546
self.assertNotEqual(response_auth, preferred_auth)
548
# Status code should be 200, since we over-rode the login_failure handler
549
response = self.complete(openid_response)
550
self.assertEquals(200, response.status_code)
551
self.assertContains(response, 'Test Failure Override')
553
def test_login_without_nickname(self):
554
settings.OPENID_CREATE_USERS = True
556
openid_req = {'openid_identifier': 'http://example.com/identity',
558
openid_resp = {'nickname': '', 'fullname': 'Openid User',
559
'email': 'foo@example.com'}
560
self._do_user_login(openid_req, openid_resp)
561
response = self.client.get('/getuser/')
563
# username defaults to 'openiduser'
564
self.assertEquals(response.content, 'openiduser')
566
# The user's full name and email have been updated.
567
user = User.objects.get(username=response.content)
568
self.assertEquals(user.first_name, 'Openid')
569
self.assertEquals(user.last_name, 'User')
570
self.assertEquals(user.email, 'foo@example.com')
572
def test_login_follow_rename(self):
573
settings.OPENID_FOLLOW_RENAMES = True
574
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
575
user = User.objects.create_user('testuser', 'someone@example.com')
576
useropenid = UserOpenID(
578
claimed_id='http://example.com/identity',
579
display_id='http://example.com/identity')
582
openid_req = {'openid_identifier': 'http://example.com/identity',
584
openid_resp = {'nickname': 'someuser', 'fullname': 'Some User',
585
'email': 'foo@example.com'}
586
self._do_user_login(openid_req, openid_resp)
587
response = self.client.get('/getuser/')
589
# If OPENID_FOLLOW_RENAMES, they are logged in as
590
# someuser (the passed in nickname has changed the username)
591
self.assertEquals(response.content, 'someuser')
593
# The user's full name and email have been updated.
594
user = User.objects.get(username=response.content)
595
self.assertEquals(user.first_name, 'Some')
596
self.assertEquals(user.last_name, 'User')
597
self.assertEquals(user.email, 'foo@example.com')
599
def test_login_follow_rename_without_nickname_change(self):
600
settings.OPENID_FOLLOW_RENAMES = True
601
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
602
settings.OPENID_STRICT_USERNAMES = True
603
user = User.objects.create_user('testuser', 'someone@example.com')
604
useropenid = UserOpenID(
606
claimed_id='http://example.com/identity',
607
display_id='http://example.com/identity')
610
openid_req = {'openid_identifier': 'http://example.com/identity',
612
openid_resp = {'nickname': 'testuser', 'fullname': 'Some User',
613
'email': 'foo@example.com'}
614
self._do_user_login(openid_req, openid_resp)
615
response = self.client.get('/getuser/')
617
# Username should not have changed
618
self.assertEquals(response.content, 'testuser')
620
# The user's full name and email have been updated.
621
user = User.objects.get(username=response.content)
622
self.assertEquals(user.first_name, 'Some')
623
self.assertEquals(user.last_name, 'User')
624
self.assertEquals(user.email, 'foo@example.com')
626
def test_login_follow_rename_conflict(self):
627
settings.OPENID_FOLLOW_RENAMES = True
628
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
629
# Setup existing user who's name we're going to switch to
630
user = User.objects.create_user('testuser', 'someone@example.com')
631
UserOpenID.objects.get_or_create(
633
claimed_id='http://example.com/existing_identity',
634
display_id='http://example.com/existing_identity')
636
# Setup user who is going to try to change username to 'testuser'
637
renamed_user = User.objects.create_user('renameuser', 'someone@example.com')
638
UserOpenID.objects.get_or_create(
640
claimed_id='http://example.com/identity',
641
display_id='http://example.com/identity')
643
# identity url is for 'renameuser'
644
openid_req = {'openid_identifier': 'http://example.com/identity',
646
# but returned username is for 'testuser', which already exists for another identity
647
openid_resp = {'nickname': 'testuser', 'fullname': 'Rename User',
648
'email': 'rename@example.com'}
649
self._do_user_login(openid_req, openid_resp)
650
response = self.client.get('/getuser/')
652
# If OPENID_FOLLOW_RENAMES, attempt to change username to 'testuser'
653
# but since that username is already taken by someone else, we go through
654
# the process of adding +i to it, and get testuser2.
655
self.assertEquals(response.content, 'testuser2')
657
# The user's full name and email have been updated.
658
user = User.objects.get(username=response.content)
659
self.assertEquals(user.first_name, 'Rename')
660
self.assertEquals(user.last_name, 'User')
661
self.assertEquals(user.email, 'rename@example.com')
663
def test_login_follow_rename_false_onlyonce(self):
664
settings.OPENID_FOLLOW_RENAMES = True
665
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
666
# Setup existing user who's name we're going to switch to
667
user = User.objects.create_user('testuser', 'someone@example.com')
668
UserOpenID.objects.get_or_create(
670
claimed_id='http://example.com/existing_identity',
671
display_id='http://example.com/existing_identity')
673
# Setup user who is going to try to change username to 'testuser'
674
renamed_user = User.objects.create_user('testuser2000eight', 'someone@example.com')
675
UserOpenID.objects.get_or_create(
677
claimed_id='http://example.com/identity',
678
display_id='http://example.com/identity')
680
# identity url is for 'testuser2000eight'
681
openid_req = {'openid_identifier': 'http://example.com/identity',
683
# but returned username is for 'testuser', which already exists for another identity
684
openid_resp = {'nickname': 'testuser2', 'fullname': 'Rename User',
685
'email': 'rename@example.com'}
686
self._do_user_login(openid_req, openid_resp)
687
response = self.client.get('/getuser/')
689
# If OPENID_FOLLOW_RENAMES, attempt to change username to 'testuser'
690
# but since that username is already taken by someone else, we go through
691
# the process of adding +i to it. Even though it looks like the username
692
# follows the nickname+i scheme, it has non-numbers in the suffix, so
693
# it's not an auto-generated one. The regular process of renaming to
694
# 'testuser' has a conflict, so we get +2 at the end.
695
self.assertEquals(response.content, 'testuser2')
697
# The user's full name and email have been updated.
698
user = User.objects.get(username=response.content)
699
self.assertEquals(user.first_name, 'Rename')
700
self.assertEquals(user.last_name, 'User')
701
self.assertEquals(user.email, 'rename@example.com')
703
def test_login_follow_rename_conflict_onlyonce(self):
704
settings.OPENID_FOLLOW_RENAMES = True
705
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
706
# Setup existing user who's name we're going to switch to
707
user = User.objects.create_user('testuser', 'someone@example.com')
708
UserOpenID.objects.get_or_create(
710
claimed_id='http://example.com/existing_identity',
711
display_id='http://example.com/existing_identity')
713
# Setup user who is going to try to change username to 'testuser'
714
renamed_user = User.objects.create_user('testuser2000', 'someone@example.com')
715
UserOpenID.objects.get_or_create(
717
claimed_id='http://example.com/identity',
718
display_id='http://example.com/identity')
720
# identity url is for 'testuser2000'
721
openid_req = {'openid_identifier': 'http://example.com/identity',
723
# but returned username is for 'testuser', which already exists for another identity
724
openid_resp = {'nickname': 'testuser', 'fullname': 'Rename User',
725
'email': 'rename@example.com'}
726
self._do_user_login(openid_req, openid_resp)
727
response = self.client.get('/getuser/')
729
# If OPENID_FOLLOW_RENAMES, attempt to change username to 'testuser'
730
# but since that username is already taken by someone else, we go through
731
# the process of adding +i to it. Since the user for this identity url
732
# already has a name matching that pattern, check if first.
733
self.assertEquals(response.content, 'testuser2000')
735
# The user's full name and email have been updated.
736
user = User.objects.get(username=response.content)
737
self.assertEquals(user.first_name, 'Rename')
738
self.assertEquals(user.last_name, 'User')
739
self.assertEquals(user.email, 'rename@example.com')
741
def test_login_follow_rename_false_conflict(self):
742
settings.OPENID_FOLLOW_RENAMES = True
743
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
744
# Setup existing user who's username matches the name+i pattern
745
user = User.objects.create_user('testuser2', 'someone@example.com')
746
UserOpenID.objects.get_or_create(
748
claimed_id='http://example.com/identity',
749
display_id='http://example.com/identity')
751
# identity url is for 'testuser2'
752
openid_req = {'openid_identifier': 'http://example.com/identity',
754
# but returned username is for 'testuser', which looks like we've done
755
# a username+1 for them already, but 'testuser' isn't actually taken
756
openid_resp = {'nickname': 'testuser', 'fullname': 'Same User',
757
'email': 'same@example.com'}
758
self._do_user_login(openid_req, openid_resp)
759
response = self.client.get('/getuser/')
761
# If OPENID_FOLLOW_RENAMES, username should be changed to 'testuser'
762
# because it wasn't currently taken
763
self.assertEquals(response.content, 'testuser')
765
# The user's full name and email have been updated.
766
user = User.objects.get(username=response.content)
767
self.assertEquals(user.first_name, 'Same')
768
self.assertEquals(user.last_name, 'User')
769
self.assertEquals(user.email, 'same@example.com')
771
def test_strict_username_no_nickname(self):
772
settings.OPENID_CREATE_USERS = True
773
settings.OPENID_STRICT_USERNAMES = True
774
settings.OPENID_SREG_REQUIRED_FIELDS = []
776
# Posting in an identity URL begins the authentication request:
777
response = self.client.post('/openid/login/',
778
{'openid_identifier': 'http://example.com/identity',
779
'next': '/getuser/'})
780
self.assertContains(response, 'OpenID transaction in progress')
782
# Complete the request, passing back some simple registration
783
# data. The user is redirected to the next URL.
784
openid_request = self.provider.parseFormPost(response.content)
785
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
786
openid_response = openid_request.answer(True)
787
sreg_response = sreg.SRegResponse.extractResponse(
788
sreg_request, {'nickname': '', # No nickname
789
'fullname': 'Some User',
790
'email': 'foo@example.com'})
791
openid_response.addExtension(sreg_response)
792
response = self.complete(openid_response)
794
# Status code should be 403: Forbidden
795
self.assertEquals(403, response.status_code)
796
self.assertContains(response, '<h1>OpenID failed</h1>', status_code=403)
797
self.assertContains(response, "An attribute required for logging in was not returned "
798
"(nickname)", status_code=403)
800
def test_strict_username_no_nickname_override(self):
801
settings.OPENID_CREATE_USERS = True
802
settings.OPENID_STRICT_USERNAMES = True
803
settings.OPENID_SREG_REQUIRED_FIELDS = []
805
# Override the login_failure handler
806
def mock_login_failure_handler(request, message, status=403,
809
self.assertTrue(isinstance(exception, (RequiredAttributeNotReturned, MissingUsernameViolation)))
810
return HttpResponse('Test Failure Override', status=200)
811
settings.OPENID_RENDER_FAILURE = mock_login_failure_handler
813
# Posting in an identity URL begins the authentication request:
814
response = self.client.post('/openid/login/',
815
{'openid_identifier': 'http://example.com/identity',
816
'next': '/getuser/'})
817
self.assertContains(response, 'OpenID transaction in progress')
819
# Complete the request, passing back some simple registration
820
# data. The user is redirected to the next URL.
821
openid_request = self.provider.parseFormPost(response.content)
822
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
823
openid_response = openid_request.answer(True)
824
sreg_response = sreg.SRegResponse.extractResponse(
825
sreg_request, {'nickname': '', # No nickname
826
'fullname': 'Some User',
827
'email': 'foo@example.com'})
828
openid_response.addExtension(sreg_response)
829
response = self.complete(openid_response)
831
# Status code should be 200, since we over-rode the login_failure handler
832
self.assertEquals(200, response.status_code)
833
self.assertContains(response, 'Test Failure Override')
835
def test_strict_username_duplicate_user(self):
836
settings.OPENID_CREATE_USERS = True
837
settings.OPENID_STRICT_USERNAMES = True
838
# Create a user with the same name as we'll pass back via sreg.
839
user = User.objects.create_user('someuser', 'someone@example.com')
840
useropenid = UserOpenID(
842
claimed_id='http://example.com/different_identity',
843
display_id='http://example.com/different_identity')
846
# Posting in an identity URL begins the authentication request:
847
response = self.client.post('/openid/login/',
848
{'openid_identifier': 'http://example.com/identity',
849
'next': '/getuser/'})
850
self.assertContains(response, 'OpenID transaction in progress')
852
# Complete the request, passing back some simple registration
853
# data. The user is redirected to the next URL.
854
openid_request = self.provider.parseFormPost(response.content)
855
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
856
openid_response = openid_request.answer(True)
857
sreg_response = sreg.SRegResponse.extractResponse(
858
sreg_request, {'nickname': 'someuser', 'fullname': 'Some User',
859
'email': 'foo@example.com'})
860
openid_response.addExtension(sreg_response)
861
response = self.complete(openid_response)
863
# Status code should be 403: Forbidden
864
self.assertEquals(403, response.status_code)
865
self.assertContains(response, '<h1>OpenID failed</h1>', status_code=403)
866
self.assertContains(response,
867
"The username (someuser) with which you tried to log in is "
868
"already in use for a different account.",
871
def test_strict_username_duplicate_user_override(self):
872
settings.OPENID_CREATE_USERS = True
873
settings.OPENID_STRICT_USERNAMES = True
875
# Override the login_failure handler
876
def mock_login_failure_handler(request, message, status=403,
879
self.assertTrue(isinstance(exception, DuplicateUsernameViolation))
880
return HttpResponse('Test Failure Override', status=200)
881
settings.OPENID_RENDER_FAILURE = mock_login_failure_handler
883
# Create a user with the same name as we'll pass back via sreg.
884
user = User.objects.create_user('someuser', 'someone@example.com')
885
useropenid = UserOpenID(
887
claimed_id='http://example.com/different_identity',
888
display_id='http://example.com/different_identity')
891
# Posting in an identity URL begins the authentication request:
892
response = self.client.post('/openid/login/',
893
{'openid_identifier': 'http://example.com/identity',
894
'next': '/getuser/'})
895
self.assertContains(response, 'OpenID transaction in progress')
897
# Complete the request, passing back some simple registration
898
# data. The user is redirected to the next URL.
899
openid_request = self.provider.parseFormPost(response.content)
900
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
901
openid_response = openid_request.answer(True)
902
sreg_response = sreg.SRegResponse.extractResponse(
903
sreg_request, {'nickname': 'someuser', 'fullname': 'Some User',
904
'email': 'foo@example.com'})
905
openid_response.addExtension(sreg_response)
906
response = self.complete(openid_response)
908
# Status code should be 200, since we over-rode the login_failure handler
909
self.assertEquals(200, response.status_code)
910
self.assertContains(response, 'Test Failure Override')
912
def test_login_requires_sreg_required_fields(self):
913
# If any required attributes are not included in the response,
914
# we fail with a forbidden.
915
settings.OPENID_CREATE_USERS = True
916
settings.OPENID_SREG_REQUIRED_FIELDS = ('email', 'language')
917
# Posting in an identity URL begins the authentication request:
918
response = self.client.post('/openid/login/',
919
{'openid_identifier': 'http://example.com/identity',
920
'next': '/getuser/'})
921
self.assertContains(response, 'OpenID transaction in progress')
923
# Complete the request, passing back some simple registration
924
# data. The user is redirected to the next URL.
925
openid_request = self.provider.parseFormPost(response.content)
926
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
927
openid_response = openid_request.answer(True)
928
sreg_response = sreg.SRegResponse.extractResponse(
929
sreg_request, {'nickname': 'foo',
930
'fullname': 'Some User',
931
'email': 'foo@example.com'})
932
openid_response.addExtension(sreg_response)
933
response = self.complete(openid_response)
935
# Status code should be 403: Forbidden as we didn't include
936
# a required field - language.
937
self.assertContains(response,
938
"An attribute required for logging in was not returned "
939
"(language)", status_code=403)
941
def test_login_update_details(self):
942
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
943
user = User.objects.create_user('testuser', 'someone@example.com')
944
useropenid = UserOpenID(
946
claimed_id='http://example.com/identity',
947
display_id='http://example.com/identity')
950
openid_req = {'openid_identifier': 'http://example.com/identity',
952
openid_resp = {'nickname': 'testuser', 'fullname': 'Some User',
953
'email': 'foo@example.com'}
954
self._do_user_login(openid_req, openid_resp)
955
response = self.client.get('/getuser/')
957
self.assertEquals(response.content, 'testuser')
959
# The user's full name and email have been updated.
960
user = User.objects.get(username=response.content)
961
self.assertEquals(user.first_name, 'Some')
962
self.assertEquals(user.last_name, 'User')
963
self.assertEquals(user.email, 'foo@example.com')
965
def test_login_uses_sreg_extra_fields(self):
966
# The configurable sreg attributes are used in the request.
967
settings.OPENID_SREG_EXTRA_FIELDS = ('language',)
968
user = User.objects.create_user('testuser', 'someone@example.com')
969
useropenid = UserOpenID(
971
claimed_id='http://example.com/identity',
972
display_id='http://example.com/identity')
975
# Posting in an identity URL begins the authentication request:
976
response = self.client.post('/openid/login/',
977
{'openid_identifier': 'http://example.com/identity',
978
'next': '/getuser/'})
980
openid_request = self.provider.parseFormPost(response.content)
981
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
982
for field in ('email', 'fullname', 'nickname', 'language'):
983
self.assertTrue(field in sreg_request)
985
def test_login_uses_sreg_required_fields(self):
986
# The configurable sreg attributes are used in the request.
987
settings.OPENID_SREG_REQUIRED_FIELDS = ('email', 'language')
988
user = User.objects.create_user('testuser', 'someone@example.com')
989
useropenid = UserOpenID(
991
claimed_id='http://example.com/identity',
992
display_id='http://example.com/identity')
995
# Posting in an identity URL begins the authentication request:
996
response = self.client.post('/openid/login/',
997
{'openid_identifier': 'http://example.com/identity',
998
'next': '/getuser/'})
1000
openid_request = self.provider.parseFormPost(response.content)
1001
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
1003
self.assertEqual(['email', 'language'], sreg_request.required)
1004
self.assertEqual(['fullname', 'nickname'], sreg_request.optional)
1006
def test_login_attribute_exchange(self):
1007
settings.OPENID_UPDATE_DETAILS_FROM_SREG = True
1008
user = User.objects.create_user('testuser', 'someone@example.com')
1009
useropenid = UserOpenID(
1011
claimed_id='http://example.com/identity',
1012
display_id='http://example.com/identity')
1015
# Configure the provider to advertise attribute exchange
1016
# protocol and start the authentication process:
1017
self.provider.type_uris.append('http://openid.net/srv/ax/1.0')
1018
response = self.client.post('/openid/login/',
1019
{'openid_identifier': 'http://example.com/identity',
1020
'next': '/getuser/'})
1021
self.assertContains(response, 'OpenID transaction in progress')
1023
# The resulting OpenID request uses the Attribute Exchange
1024
# extension rather than the Simple Registration extension.
1025
openid_request = self.provider.parseFormPost(response.content)
1026
sreg_request = sreg.SRegRequest.fromOpenIDRequest(openid_request)
1027
self.assertEqual(sreg_request.required, [])
1028
self.assertEqual(sreg_request.optional, [])
1030
fetch_request = ax.FetchRequest.fromOpenIDRequest(openid_request)
1031
self.assertTrue(fetch_request.has_key(
1032
'http://axschema.org/contact/email'))
1033
self.assertTrue(fetch_request.has_key(
1034
'http://axschema.org/namePerson'))
1035
self.assertTrue(fetch_request.has_key(
1036
'http://axschema.org/namePerson/first'))
1037
self.assertTrue(fetch_request.has_key(
1038
'http://axschema.org/namePerson/last'))
1039
self.assertTrue(fetch_request.has_key(
1040
'http://axschema.org/namePerson/friendly'))
1041
# myOpenID compatibilty attributes:
1042
self.assertTrue(fetch_request.has_key(
1043
'http://schema.openid.net/contact/email'))
1044
self.assertTrue(fetch_request.has_key(
1045
'http://schema.openid.net/namePerson'))
1046
self.assertTrue(fetch_request.has_key(
1047
'http://schema.openid.net/namePerson/friendly'))
1049
# Build up a response including AX data.
1050
openid_response = openid_request.answer(True)
1051
fetch_response = ax.FetchResponse(fetch_request)
1052
fetch_response.addValue(
1053
'http://axschema.org/contact/email', 'foo@example.com')
1054
fetch_response.addValue(
1055
'http://axschema.org/namePerson/first', 'Firstname')
1056
fetch_response.addValue(
1057
'http://axschema.org/namePerson/last', 'Lastname')
1058
fetch_response.addValue(
1059
'http://axschema.org/namePerson/friendly', 'someuser')
1060
openid_response.addExtension(fetch_response)
1061
response = self.complete(openid_response)
1062
self.assertRedirects(response, 'http://testserver/getuser/')
1064
# And they are now logged in as testuser (the passed in
1065
# nickname has not caused the username to change).
1066
response = self.client.get('/getuser/')
1067
self.assertEquals(response.content, 'testuser')
1069
# The user's full name and email have been updated.
1070
user = User.objects.get(username='testuser')
1071
self.assertEquals(user.first_name, 'Firstname')
1072
self.assertEquals(user.last_name, 'Lastname')
1073
self.assertEquals(user.email, 'foo@example.com')
1075
def test_login_teams(self):
1076
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = False
1077
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = {'teamname': 'groupname',
1078
'otherteam': 'othergroup'}
1079
user = User.objects.create_user('testuser', 'someone@example.com')
1080
group = Group(name='groupname')
1082
ogroup = Group(name='othergroup')
1084
user.groups.add(ogroup)
1086
useropenid = UserOpenID(
1088
claimed_id='http://example.com/identity',
1089
display_id='http://example.com/identity')
1092
# Posting in an identity URL begins the authentication request:
1093
response = self.client.post('/openid/login/',
1094
{'openid_identifier': 'http://example.com/identity',
1095
'next': '/getuser/'})
1096
self.assertContains(response, 'OpenID transaction in progress')
1098
# Complete the request
1099
openid_request = self.provider.parseFormPost(response.content)
1100
openid_response = openid_request.answer(True)
1101
teams_request = teams.TeamsRequest.fromOpenIDRequest(openid_request)
1102
teams_response = teams.TeamsResponse.extractResponse(
1103
teams_request, 'teamname,some-other-team')
1104
openid_response.addExtension(teams_response)
1105
response = self.complete(openid_response)
1106
self.assertRedirects(response, 'http://testserver/getuser/')
1108
# And they are now logged in as testuser
1109
response = self.client.get('/getuser/')
1110
self.assertEquals(response.content, 'testuser')
1112
# The user's groups have been updated.
1113
user = User.objects.get(username='testuser')
1114
self.assertTrue(group in user.groups.all())
1115
self.assertTrue(ogroup not in user.groups.all())
1117
def test_login_teams_automapping(self):
1118
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING = {'teamname': 'groupname',
1119
'otherteam': 'othergroup'}
1120
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO = True
1121
settings.OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO_BLACKLIST = ['django-group1', 'django-group2']
1122
user = User.objects.create_user('testuser', 'someone@example.com')
1123
group1 = Group(name='django-group1')
1125
group2 = Group(name='django-group2')
1127
group3 = Group(name='django-group3')
1130
useropenid = UserOpenID(
1132
claimed_id='http://example.com/identity',
1133
display_id='http://example.com/identity')
1136
# Posting in an identity URL begins the authentication request:
1137
response = self.client.post('/openid/login/',
1138
{'openid_identifier': 'http://example.com/identity',
1139
'next': '/getuser/'})
1140
self.assertContains(response, 'OpenID transaction in progress')
1142
# Complete the request
1143
openid_request = self.provider.parseFormPost(response.content)
1144
openid_response = openid_request.answer(True)
1145
teams_request = teams.TeamsRequest.fromOpenIDRequest(openid_request)
1147
self.assertEqual(group1 in user.groups.all(), False)
1148
self.assertEqual(group2 in user.groups.all(), False)
1149
self.assertTrue(group3 not in user.groups.all())
1151
def test_login_teams_staff_not_defined(self):
1152
delattr(settings, 'OPENID_LAUNCHPAD_STAFF_TEAMS')
1153
user = User.objects.create_user('testuser', 'someone@example.com')
1154
user.is_staff = True
1156
self.assertTrue(user.is_staff)
1158
user = self.get_openid_authed_user_with_teams(user, 'teamname,some-other-team')
1159
self.assertTrue(user.is_staff)
1161
def test_login_teams_staff_assignment(self):
1162
settings.OPENID_LAUNCHPAD_STAFF_TEAMS = ('teamname',)
1163
user = User.objects.create_user('testuser', 'someone@example.com')
1164
user.is_staff = False
1166
self.assertFalse(user.is_staff)
1168
user = self.get_openid_authed_user_with_teams(user, 'teamname,some-other-team')
1169
self.assertTrue(user.is_staff)
1171
def test_login_teams_staff_unassignment(self):
1172
settings.OPENID_LAUNCHPAD_STAFF_TEAMS = ('different-teamname',)
1173
user = User.objects.create_user('testuser', 'someone@example.com')
1174
user.is_staff = True
1176
self.assertTrue(user.is_staff)
1178
user = self.get_openid_authed_user_with_teams(user, 'teamname,some-other-team')
1179
self.assertFalse(user.is_staff)
1181
def get_openid_authed_user_with_teams(self, user, teams_str):
1182
useropenid = UserOpenID(
1184
claimed_id='http://example.com/identity',
1185
display_id='http://example.com/identity')
1188
# Posting in an identity URL begins the authentication request:
1189
response = self.client.post('/openid/login/',
1190
{'openid_identifier': 'http://example.com/identity'})
1192
# Complete the request
1193
openid_request = self.provider.parseFormPost(response.content)
1194
openid_response = openid_request.answer(True)
1195
teams_request = teams.TeamsRequest.fromOpenIDRequest(openid_request)
1196
teams_response = teams.TeamsResponse.extractResponse(
1197
teams_request, teams_str)
1198
openid_response.addExtension(teams_response)
1199
response = self.complete(openid_response)
1200
return User.objects.get(username=user.username)
1202
def test_login_complete_signals_login(self):
1203
# An oauth_login_complete signal is emitted including the
1204
# request and sreg_response.
1205
user = User.objects.create_user('someuser', 'someone@example.com')
1206
useropenid = UserOpenID(
1208
claimed_id='http://example.com/identity',
1209
display_id='http://example.com/identity')
1211
response = self.client.post('/openid/login/',
1212
{'openid_identifier': 'http://example.com/identity'})
1213
openid_request = self.provider.parseFormPost(response.content)
1214
openid_response = openid_request.answer(True)
1215
# Use a closure to test whether the signal handler was called.
1216
self.signal_handler_called = False
1217
def login_callback(sender, **kwargs):
1218
self.assertTrue(isinstance(
1219
kwargs.get('request', None), HttpRequest))
1220
self.assertTrue(isinstance(
1221
kwargs.get('openid_response', None), SuccessResponse))
1222
self.signal_handler_called = True
1223
openid_login_complete.connect(login_callback)
1225
response = self.complete(openid_response)
1227
self.assertTrue(self.signal_handler_called)
1228
openid_login_complete.disconnect(login_callback)
1231
class HelperFunctionsTest(TestCase):
1232
def test_sanitise_redirect_url(self):
1233
settings.ALLOWED_EXTERNAL_OPENID_REDIRECT_DOMAINS = [
1234
"example.com", "example.org"]
1235
# list of URLs and whether they should be passed or not
1237
("http://example.com", True),
1238
("http://example.org/", True),
1239
("http://example.org/foo/bar", True),
1240
("http://example.org/foo/bar?baz=quux", True),
1241
("http://example.org:9999/foo/bar?baz=quux", True),
1242
("http://www.example.org/", False),
1243
("http://example.net/foo/bar?baz=quux", False),
1244
("/somewhere/local", True),
1245
("/somewhere/local?url=http://fail.com/bar", True),
1246
# An empty path, as seen when no "next" parameter is passed.
1248
("/path with spaces", False),
1250
for url, returns_self in urls:
1251
sanitised = sanitise_redirect_url(url)
1253
self.assertEqual(url, sanitised)
1255
self.assertEqual(settings.LOGIN_REDIRECT_URL, sanitised)
1258
return unittest.TestLoader().loadTestsFromName(__name__)