125 lines
3.5 KiB
Python
125 lines
3.5 KiB
Python
from flask import session, flash, redirect, url_for, abort, request, current_app
|
|
from .models import Sessions, Users
|
|
from argon2 import PasswordHasher
|
|
from functools import wraps
|
|
import secrets
|
|
import hmac
|
|
import time
|
|
import re
|
|
|
|
# Shared argon2 PasswordHasher, built with the library's default parameters;
# digest() and verify() in this module both go through it.
ph = PasswordHasher()
|
|
|
|
# Names nobody may register; parse_username() rejects any of them.
FORBIDDEN_USERNAMES = (
    # staff / system identities
    'administrator',
    'administration',
    'administrators',
    'system',
    'mod',
    'moderator',
    'moderators',
    'moderation',
    # placeholder for removed accounts
    'deleted-user',
    'deleted_user',
    'support',
    # routes
    'log-in',
    'log_in',
    'login',
    'sign-up',
    'sign_up',
    'signup',
)
|
|
|
|
def digest(password):
    """Hash ``password`` with the module-level argon2 ``PasswordHasher``.

    Returns whatever ``ph.hash`` produces for the given password.
    """
    hashed = ph.hash(password)
    return hashed
|
|
|
|
def verify(expected, given):
    """Check a candidate password against a stored argon2 hash.

    Args:
        expected: the stored hash; handed to ``ph.verify`` as its first argument.
        given: the candidate password to check against that hash.

    Returns:
        The result of ``ph.verify`` when it succeeds, ``False`` when it
        raises for any reason (mismatch, malformed hash, wrong types).
    """
    try:
        return ph.verify(expected, given)
    except Exception:
        # Every verification failure just means "not verified". Catching
        # ``Exception`` instead of using a bare ``except`` lets
        # KeyboardInterrupt and SystemExit propagate.
        return False
|
|
|
|
def is_logged_in() -> bool:
    """Tell whether the current request carries a live login session.

    Looks up the ``pyrom_session_key`` stored in the Flask session and checks
    the matching ``Sessions`` record. An expired record is deleted and the
    Flask session is cleared as a side effect.

    Returns:
        ``True`` only when the key is present, a record exists for it, and
        that record has not expired.
    """
    try:
        key = session['pyrom_session_key']
    except KeyError:
        return False

    record = Sessions.find({'key': key})
    if not record:
        return False

    expired = record.expires_at < int(time.time())
    if not expired:
        return True

    # Expired: forget the cookie contents and drop the stored record.
    # No flash message is shown; the caller simply sees "not logged in".
    session.clear()
    record.delete()
    return False
|
|
|
|
def get_active_user() -> Users | None:
    """Return the user who owns the current login session.

    Returns:
        The result of ``Users.find`` for the session's ``user_id``, or
        ``None`` when nobody is logged in or the session record cannot be
        found.
    """
    if not is_logged_in():
        return None

    sess = Sessions.find({'key': session['pyrom_session_key']})
    if not sess:
        # is_logged_in() did its own lookup; the record can be gone by the
        # time we look again (e.g. a logout elsewhere). Report "no user"
        # rather than failing on ``None.user_id``.
        return None
    return Users.find({'id': sess.user_id})
|
|
|
|
def create_session(user_id, temporary=False):
    """Create and return a new ``Sessions`` record for ``user_id``.

    The record gets a random 16-byte hex key and an ``expires_at`` Unix
    timestamp 2 days from now when ``temporary`` is truthy, 31 days otherwise.
    """
    seconds_per_day = 24 * 60 * 60
    if temporary:
        lifetime_days = 2
    else:
        lifetime_days = 31
    expiry = int(time.time()) + (lifetime_days * seconds_per_day)

    new_session = Sessions.create({
        'key': secrets.token_hex(16),
        'user_id': user_id,
        'expires_at': expiry,
    })
    return new_session
|
|
|
|
def parse_username(username: str) -> tuple[str, str]:
    """Validate a requested name and derive its normalised username.

    Args:
        username: the name exactly as the user typed it.

    Returns:
        ``(normalised, original)``: first the lower-cased name with every
        character outside ``[a-zA-Z0-9_-]`` replaced by ``_`` and cut to
        24 characters, second the input unchanged.

        NOTE(review): the earlier docstring described the opposite order
        (display name first). The return order is kept as the code had it so
        callers keep working; confirm which order callers expect.

    Raises:
        ValueError: if the name is shorter than 3 characters, or if its
            lower-cased or normalised form is in ``FORBIDDEN_USERNAMES``.
    """
    if len(username) < 3:
        raise ValueError('username must be at least 3 characters long')

    lowered = username.lower()
    normalised = re.sub(r'[^a-zA-Z0-9_-]', '_', lowered)[:24]

    # Check the normalised form as well as the raw one: 'log in' is not in
    # the list, but it normalises to the reserved 'log_in'.
    if lowered in FORBIDDEN_USERNAMES or normalised in FORBIDDEN_USERNAMES:
        raise ValueError('username is reserved')

    return normalised, username
|
|
|
|
def is_password_valid(password: str) -> bool:
    """Return whether ``password`` satisfies the password policy.

    The policy, as encoded in the pattern: at least one ``a-z`` letter, one
    ``A-Z`` letter, one digit, one non-word character or underscore, no
    whitespace anywhere, and a total length of 10 to 255 characters.
    """
    policy = r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[\W_])(?!.*\s).{10,255}$'
    found = re.match(policy, password)
    if found is None:
        return False
    return True
|
|
|
|
# decorators
|
|
def login_required(view_func):
    """Decorator that only lets logged-in requests reach ``view_func``.

    Requests for which ``is_logged_in()`` is false are redirected to the
    ``users.log_in`` endpoint instead.
    """
    @wraps(view_func)
    def guarded(*args, **kwargs):
        if is_logged_in():
            return view_func(*args, **kwargs)
        login_url = url_for('users.log_in')
        return redirect(login_url)

    return guarded
|
|
|
|
def mod_only(view_func):
    """Decorator that restricts ``view_func`` to moderators.

    Aborts with 403 when nobody is logged in, when the session's user cannot
    be found, or when that user's ``is_mod()`` is false.
    """
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        # get_active_user() already returns None for anonymous requests, so
        # one call covers both "not logged in" and "user record missing".
        # Checking for None first stops a missing user from failing on
        # ``None.is_mod()`` (a 500) instead of being refused with 403.
        user = get_active_user()
        if user is None or not user.is_mod():
            abort(403)
        return view_func(*args, **kwargs)

    return wrapper
|
|
|
|
def csrf_verified(view_func):
    """
    Protect a form-handling view against CSRF and invalidate the CSRF token stored in the session.

    The ``csrf`` form field must look like ``<mac>.<hex nonce>``, where
    ``<mac>`` is the hex HMAC-SHA256, keyed with the app's ``SECRET_KEY``, of
    a message built from the session key and the nonce. A missing, malformed
    or mismatching value aborts with 403. On success the ``csrf`` entry is
    removed from the session before the view runs.

    Requires @login_required.
    """

    @wraps(view_func)
    def wrapper(*args, **kwargs):
        if not session.get('csrf'):
            abort(403)
        if not request.form.get('csrf'):
            abort(403)

        parts = request.form['csrf'].split('.')
        if len(parts) != 2:
            abort(403)

        given_message = parts[0]
        try:
            rng = bytes.fromhex(parts[1])
        except ValueError:
            # The field is attacker-controlled: bad hex is a rejected token,
            # not a server error.
            abort(403)

        session_key = session.get('pyrom_session_key')
        if session_key is None:
            # Reached without a login session; refuse instead of raising KeyError.
            abort(403)

        # Length-prefixed fields keep the message unambiguous.
        message = f'd${len(session_key)}${session_key}@{len(rng)}@{rng.hex()}'
        expected = hmac.digest(current_app.config['SECRET_KEY'].encode('utf-8'), message.encode('utf-8'), 'SHA256').hex()

        # Compare as bytes: compare_digest() raises TypeError for str
        # arguments containing non-ASCII characters, which a client can send.
        if not hmac.compare_digest(given_message.encode('utf-8'), expected.encode('utf-8')):
            abort(403)

        # Invalidate the token stored in the session.
        session.pop('csrf')

        return view_func(*args, **kwargs)

    return wrapper
|
|
|