1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233 | from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import render_to_response
from models import *
OPENID_FILESTORE = '/tmp/openid-filestore'
from openid.store.filestore import FileOpenIDStore
from openid.server.server import Server
from idproxynet.idproxy.views import yhash_from_cookie
from idproxynet import nonceprotect
import datetime, pickle
def openid_server(req):
    """
    This view is the actual OpenID server - running at the URL pointed to by
    the <link rel="openid.server"> tag.

    The request parameters are decoded into an OpenID request, then:

    * a query that is not an OpenID request at all gets a plain text banner;
    * a malformed OpenID query gets a 400 response;
    * checkid_immediate / checkid_setup requests are answered positively when
      openid_is_authorized() approves them, negatively when the request is
      in immediate mode, and otherwise hand over to openid_decide_page();
    * every other mode is delegated to the library's handleRequest().
    """
    # Imported here so this view stands on its own; the module is the same
    # one the file already imports Server from.
    from openid.server.server import ProtocolError

    server = Server(FileOpenIDStore(OPENID_FILESTORE))

    # Clear AuthorizationInfo session var, if it is set
    if req.session.get('AuthorizationInfo', None):
        del req.session['AuthorizationInfo']

    querydict = dict(req.REQUEST.items())
    try:
        orequest = server.decodeRequest(querydict)
    except ProtocolError:
        # decodeRequest raises ProtocolError for queries that look like
        # OpenID but are invalid; report a client error rather than letting
        # the exception surface as a server error.
        bad_request = HttpResponse("Invalid OpenID request")
        bad_request.status_code = 400
        return bad_request

    if not orequest:
        return HttpResponse("This is an OpenID server")

    if orequest.mode in ("checkid_immediate", "checkid_setup"):
        if openid_is_authorized(req, orequest.identity, orequest.trust_root):
            oresponse = orequest.answer(True)
        elif orequest.immediate:
            # Immediate mode cannot show a page, so decline and pass along
            # the server URL.
            oresponse = orequest.answer(
                False, 'http://idproxy.net/openid/server/'
            )
        else:
            return openid_decide_page(req, orequest)
    else:
        oresponse = server.handleRequest(orequest)

    webresponse = server.encodeResponse(oresponse)
    return django_response(webresponse)
def django_response(webresponse):
    """
    Build a Django HttpResponse mirroring an OpenID library web response.

    The body, the status code and every header of ``webresponse`` are copied
    onto the returned HttpResponse.
    """
    http_response = HttpResponse(webresponse.body)
    http_response.status_code = webresponse.code
    headers = webresponse.headers
    for header_name in headers:
        http_response[header_name] = headers[header_name]
    return http_response
def pending_request(req, id):
    """
    Resume an OpenID request that was stored in the PendingRequest table.

    The signed-in user (identified through the yhash cookie) must own the
    OpenID that the pending request with primary key ``id`` belongs to;
    otherwise an error page is returned. If openid_is_authorized() approves
    the request, the pending row is deleted and a positive OpenID response is
    sent; if not, the decide page is shown with the pending request's id.
    """
    cookie_hash = yhash_from_cookie(req)
    if not cookie_hash:
        return error_page(req, "You are not signed in")

    try:
        account = YahooUser.objects.get(yahoo_hash=cookie_hash)
    except YahooUser.DoesNotExist:
        return error_page(req, "You are signed in as an invalid user")

    # Looking up by id *and* owner keeps users away from other people's
    # pending requests.
    try:
        stashed = PendingRequest.objects.get(id=id, openid__user=account)
    except PendingRequest.DoesNotExist:
        return error_page(req, "Pending request does not exist")

    orequest = stashed.unpickled()

    if not openid_is_authorized(req, orequest.identity, orequest.trust_root):
        # Not trusted yet: let the user decide.
        return openid_decide_page(req, orequest, stashed.id)

    # Already trusted: drop the pending row and sign them straight in.
    stashed.delete()
    positive_answer = orequest.answer(True)
    openid_server_obj = Server(FileOpenIDStore(OPENID_FILESTORE))
    return django_response(openid_server_obj.encodeResponse(positive_answer))
def openid_decide_page(req, orequest, pending_id = None):
    """
    The page that asks the user if they really want to sign in to the site, and
    lets them add the consumer to their trusted whitelist.

    ``orequest`` is the decoded OpenID request and ``pending_id`` is the id of
    an associated PendingRequest row, if any; it is passed through to the
    template. Users without a yhash cookie get the landing page instead, and
    users who do not own the requested identity get an error page.
    """
    # If user is logged in, ask if they want to trust this trust_root
    # If they are NOT logged in, show the landing page
    yhash = yhash_from_cookie(req)
    if not yhash:
        return landing_page(req, orequest)
    try:
        yahoouser = YahooUser.objects.get(yahoo_hash = yhash)
    except YahooUser.DoesNotExist:
        return error_page(req, "You are signed in as an invalid user")

    # Check that the user owns the requested identity
    subdomain = orequest.identity.replace('http://', '').replace(
        '.idproxy.net/', '')
    try:
        yahoouser.openid_set.get(subdomain = subdomain)
    except OpenId.DoesNotExist:
        return error_page(
            req,
            "You are signed in to idproxy.net but do "
            "not own the requested OpenID"
        )

    # They are logged in - ask if they want to trust this root.
    # The request is kept in the session so openid_decide_post can pick it
    # up when the form comes back.
    req.session['orequest'] = orequest
    return render_to_response('decide.html', {
        'title': 'Trust this site?',
        'djnonce': nonceprotect.new_nonce_for_path_and_yhash('/decide/', yhash),
        'trust_root': orequest.trust_root,
        'identity': orequest.identity,
        'yahoouser': yahoouser,
        'pending_id': pending_id,
    })
def error_page(req, msg):
    """
    Render the 'error.html' template with the given message.

    ``req`` is accepted for symmetry with the other views but is not used.
    """
    context = {
        'title': 'Error',
        'msg': msg,
    }
    return render_to_response('error.html', context)
def openid_decide_post(req):
    """
    The user has submitted the openid_decide_page

    Validates the submission (POST data present, OpenID request stashed in
    the session, signed-in user, form nonce, ownership of the requested
    OpenID) and returns an error page when any check fails. After that:

    * an associated pending request named by ``pending_id`` is deleted;
    * ``cancel`` removes the stashed request and redirects to the request's
      cancel URL;
    * ``always`` adds the trust root to the OpenID's trusted roots;
    * the request is answered positively and recorded in OpenIdHistory.
    """
    # Explicit checks instead of assert: assertions disappear under -O and
    # would otherwise surface as server errors for ordinary user mistakes
    # such as an expired session.
    if not req.POST:
        return error_page(req, "POST required for this page")
    orequest = req.session.get('orequest', None)
    if not orequest:
        return error_page(
            req, "Your session has expired; please try signing in again"
        )

    yhash = yhash_from_cookie(req)
    if not yhash:
        return error_page(req, "You are not signed in")
    try:
        yahoouser = YahooUser.objects.get(yahoo_hash = yhash)
    except YahooUser.DoesNotExist:
        return error_page(req, "You are signed in as an invalid user")

    try:
        nonceprotect.validate_nonce_against_path_and_yhash(
            req.POST.get('djnonce', ''), req.path, yhash
        )
    except nonceprotect.BadNonce:
        return error_page(req, "Your form has expired; hit back and try again")

    # Ensure that auth_user owns the requested domain
    try:
        subdomain = orequest.identity.replace(
            'http://', '').replace('.idproxy.net/', '')
        openid = yahoouser.openid_set.get(subdomain=subdomain)
    except OpenId.DoesNotExist:
        return error_page(req, "You do not own that OpenID")

    # If there was an associated pending request, delete it. filter() is
    # used rather than get() so a stale or repeated pending_id (for example
    # a double submit) is a no-op instead of raising DoesNotExist.
    pending_id = req.POST.get('pending_id', None)
    if pending_id:
        PendingRequest.objects.filter(
            id = pending_id,
            openid = openid
        ).delete()

    if req.POST.get('cancel', None):
        # Cancel their request
        del req.session['orequest']
        return HttpResponseRedirect(orequest.getCancelURL())

    if req.POST.get('always', None):
        # Save this to their trust roots
        openid.trustedroot_set.create(
            trust_root = orequest.trust_root,
            created = datetime.datetime.now()
        )

    # Redirect and say yes
    oresponse = orequest.answer(True)
    server = Server(FileOpenIDStore(OPENID_FILESTORE))
    webresponse = server.encodeResponse(oresponse)

    # Add this to their history
    OpenIdHistory.objects.create(
        openid = openid,
        trust_root = orequest.trust_root
    )
    return django_response(webresponse)
def landing_page(req, orequest):
    """
    Shown when someone tries to sign in with an OpenID while not being
    authenticated with this site.

    The OpenID request is pickled and saved as a pending request on the
    matching OpenId row so it can be completed after the user signs in, then
    'landing.html' is rendered. If no OpenId matches the subdomain taken from
    the requested identity, an error page is returned instead.
    """
    identity = orequest.identity
    subdomain = identity.replace('http://', '').replace('.idproxy.net/', '')

    try:
        target_openid = OpenId.objects.get(subdomain=subdomain)
    except OpenId.DoesNotExist:
        return error_page(req, 'That OpenID does not exist')

    # Stash the request details for completion after sign-in.
    requested_root = orequest.trust_root
    serialized_request = pickle.dumps(orequest)
    target_openid.pendingrequest_set.create(
        trust_root=requested_root,
        orequest=serialized_request,
    )

    context = {'title': 'You need to sign in'}
    return render_to_response('landing.html', context)
def openid_cancel(request):
    """
    Handle the user cancelling a pending OpenID auth request.

    Drops 'AuthorizationInfo' from the session when it holds a truthy value,
    then redirects to the URL given in the ``n`` query parameter, or to '/'
    when that parameter is absent.
    """
    session = request.session
    if session.get('AuthorizationInfo'):
        del session['AuthorizationInfo']

    # NOTE(review): 'n' comes straight from the query string and is not
    # validated here, so this can redirect anywhere - confirm that is intended.
    destination = request.GET.get('n', '/')
    return HttpResponseRedirect(destination)
import urlparse
def openid_is_authorized(req, identity_url, trust_root):
    """
    Check that they own the given identity URL, and that the trust_root is
    in their whitelist of trusted sites.

    Returns True only when all of the following hold, and False otherwise:

    * the identity URL's host has the form ``<subdomain>.idproxy.net``;
    * the request carries a yhash cookie matching a YahooUser;
    * that user owns an OpenId with the given subdomain;
    * ``trust_root`` is among that OpenId's trusted roots.

    On success an OpenIdHistory row is created as a side effect.
    """
    domain = urlparse.urlparse(identity_url)[1]
    labels = domain.split('.')
    subdomain = labels[0]

    # Only the first label used to be inspected, so a URL on any other domain
    # whose host merely started with the user's subdomain counted as owned.
    # Require the rest of the host to be this service's domain, in line with
    # the '.idproxy.net/' handling in the other views.
    if '.'.join(labels[1:]).lower() != 'idproxy.net':
        return False

    yhash = yhash_from_cookie(req)
    if not yhash:
        return False
    try:
        yahoouser = YahooUser.objects.get(yahoo_hash = yhash)
    except YahooUser.DoesNotExist:
        return False

    # Ensure that auth_user owns the requested domain
    try:
        openid = yahoouser.openid_set.get(subdomain=subdomain)
    except OpenId.DoesNotExist:
        return False

    if openid.trustedroot_set.filter(
        trust_root = trust_root
    ).count() < 1:
        # They don't trust this root yet
        return False

    # Add this to their history
    OpenIdHistory.objects.create(openid=openid, trust_root=str(trust_root))
    return True
|
Comments