improve handling of request cookies

Pando's `request.headers.cookie` silently ignores parsing errors. This commit removes that attribute entirely by monkey-patching Pando.
This commit is contained in:
Changaco 2024-07-13 11:03:12 +02:00
parent 510aa18c98
commit af8c5eb66d
10 changed files with 41 additions and 29 deletions

View File

@ -36,13 +36,9 @@ def get_short_traceback(exception):
return ' none'
def render_cookies(request):
cookies = request.headers.cookie
if cookies:
return '; '.join(
f"{name}=[{len(value.value)} bytes]" for name, value in cookies.items()
)
else:
return 'none'
return '; '.join(
f"{name}=[{len(value)} bytes]" for name, value in request.cookies.items()
) or 'none'
[----------------------------------------]

View File

@ -681,9 +681,9 @@ def add_currency_to_state(request, user, locale):
qs_currency = request.qs.get('currency')
if qs_currency in CURRENCIES:
return {'currency': qs_currency}
cookie = request.headers.cookie.get('currency')
if cookie and cookie.value in CURRENCIES:
return {'currency': cookie.value}
cookie = request.cookies.get('currency')
if cookie in CURRENCIES:
return {'currency': cookie}
if user:
return {'currency': user.main_currency}
else:

View File

@ -10,6 +10,7 @@ else:
_init_modules = set(sys.modules.keys())
import builtins
import http.cookies
from ipaddress import ip_address
import os
import signal
@ -342,6 +343,28 @@ def _Querystring_serialize(self, **kw):
return ('?' + urlencode(self, doseq=True)) if self else ''
aspen.http.request.Querystring.serialize = _Querystring_serialize
pando.http.request.Headers.__init__ = pando.http.mapping.CaseInsensitiveMapping.__init__
if hasattr(pando.http.request.Request, 'cookies'):
raise Warning('pando.http.request.Request.cookies already exists')
def _cookies(self):
cookies = self.__dict__.get('cookies')
if cookies is None:
header = self.headers.get(b'Cookie', b'').decode('utf8', 'backslashreplace')
cookies = {}
for item in header.split(';'):
try:
k, v = item.split('=', 1)
except ValueError:
continue
k = k.strip()
if len(v) > 1 and v.startswith('"') and v.endswith('"'):
v = http.cookies._unquote(v)
cookies[k] = v
self.__dict__['cookies'] = cookies
return cookies
pando.http.request.Request.cookies = property(_cookies)
if hasattr(pando.http.request.Request, 'queued_success_messages'):
raise Warning('pando.http.request.Request.queued_success_messages already exists')
def _queued_success_messages(self):

View File

@ -262,8 +262,8 @@ def authenticate_user_if_possible(csrf_token, request, response, state, user, _)
# We want to try cookie auth first, but we want password and email auth to
# supersede it.
session_p = None
if SESSION in request.headers.cookie:
creds = request.headers.cookie[SESSION].value.split(':', 2)
if SESSION in request.cookies:
creds = request.cookies[SESSION].split(':', 2)
if len(creds) == 2:
creds = [creds[0], 1, creds[1]]
if len(creds) == 3:

View File

@ -50,10 +50,7 @@ class CSRF_Token:
def token(self):
if not self._token:
state = website.state.get()
try:
cookie_token = state['request'].headers.cookie[CSRF_TOKEN].value
except KeyError:
cookie_token = ''
cookie_token = state['request'].cookies.get(CSRF_TOKEN, '')
if len(cookie_token) == TOKEN_LENGTH:
self._token = cookie_token
else:
@ -77,10 +74,7 @@ def reject_forgeries(state, request, response, website, _):
return
# Get token from cookies.
try:
cookie_token = request.headers.cookie[CSRF_TOKEN].value
except KeyError:
cookie_token = ""
cookie_token = request.cookies.get(CSRF_TOKEN)
if not cookie_token:
raise response.error(403, _(
"A security check has failed. Please make sure your browser is "
@ -124,7 +118,7 @@ def require_cookie(state):
check that the client supports cookies.
"""
request = state['request']
if request.headers.cookie.get(CSRF_TOKEN):
if request.cookies.get(CSRF_TOKEN):
request.qs.pop('cookie_sent', None)
return
_, response, website = state['_'], state['response'], state['website']

View File

@ -105,8 +105,7 @@ def detect_obsolete_browsers(request, response, state):
if b'MSIE' in request.headers.get(b'User-Agent', b''):
if state.get('etag'):
return
cookie = request.headers.cookie.get('obsolete_browser_warning')
if cookie and cookie.value == 'ignore':
if request.cookies.get('obsolete_browser_warning') == 'ignore':
return
if request.method == 'POST':
try:

View File

@ -28,7 +28,7 @@ query_id = platform.get_query_id(request.qs)
# Check that we have a cookie that matches the query id (CSRF prevention)
cookie_name = platform.name+'_'+domain+('_' if domain else '')+query_id
try:
cookie_value = request.headers.cookie[cookie_name].value
cookie_value = request.cookies[cookie_name]
except KeyError:
raise response.error(400, f"Cookie {cookie_name!r} not found")
if not cookie_value:

View File

@ -10,8 +10,8 @@ account = AccountElsewhere.from_id(account_id, _raise=False)
if not account:
raise response.invalid_input(account_id, 'id', 'querystring')
cookie_name = 'connect_%s' % account_id
token = request.headers.cookie.get(cookie_name)
if not token or not account.check_connect_token(token.value):
token = request.cookies.get(cookie_name)
if not token or not account.check_connect_token(token):
raise response.invalid_input(token, cookie_name, 'cookies')
p = Participant.from_id(p_id, _raise=False) if p_id else user

View File

@ -28,8 +28,8 @@ if not account:
raise response.invalid_input(account_id, 'account_id', 'body')
cookie_name = 'connect_%s' % account_id
token = request.headers.cookie.get(cookie_name)
if not token or not account.check_connect_token(token.value):
token = request.cookies.get(cookie_name)
if not token or not account.check_connect_token(token):
raise response.invalid_input(token, cookie_name, 'cookies')
account.save_connect_token(None, None)

View File

@ -72,7 +72,7 @@ elif 'state' in request.qs:
# Get the cookie that matches the query id
cookie_name = str(provider_name + '_' + query_id)
try:
cookie_value = request.headers.cookie[cookie_name].value
cookie_value = request.cookies[cookie_name]
except KeyError:
raise response.error(400, 'Missing cookie')
if not cookie_value: