from flask import session, flash, redirect, url_for, abort, request, current_app
from .models import Sessions, Users
from argon2 import PasswordHasher
from functools import wraps
import secrets
import hmac
import time
import re

ph = PasswordHasher()

# Names that may never be registered as usernames.
FORBIDDEN_USERNAMES = (
    'administrator',
    'administration',
    'administrators',
    'system',
    'mod',
    'moderator',
    'moderators',
    'moderation',
    'deleted-user',
    'deleted_user',
    'support',
    # routes
    'log-in',
    'log_in',
    'login',
    'sign-up',
    'sign_up',
    'signup',
)

# Any character outside this set is replaced with '_' when normalizing a username.
_INVALID_USERNAME_CHARS = re.compile(r'[^a-zA-Z0-9_-]')

# At least one lowercase letter, one uppercase letter, one digit and one
# non-alphanumeric character; no whitespace; 10 to 255 characters long.
_PASSWORD_PATTERN = re.compile(
    r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[\W_])(?!.*\s).{10,255}$'
)


def digest(password):
    """Return the hash of *password* produced by the module's PasswordHasher."""
    return ph.hash(password)


def verify(expected, given):
    """Check *given* (plain password) against *expected* (stored hash).

    Returns the result of ``ph.verify`` when it succeeds and ``False`` when
    it raises for any reason (mismatch, malformed hash, ...).
    """
    try:
        return ph.verify(expected, given)
    except Exception:
        # Deliberately broad: any verification failure means "not verified".
        # ``Exception`` (instead of a bare ``except``) lets KeyboardInterrupt
        # and SystemExit propagate.
        return False


def _current_session():
    """Return the stored session for the current request, or None.

    An expired session is deleted and the Flask session is cleared before
    None is returned.
    """
    if 'pyrom_session_key' not in session:
        return None

    sess = Sessions.find({'key': session['pyrom_session_key']})
    if not sess:
        return None

    if sess.expires_at < int(time.time()):
        session.clear()
        sess.delete()
        # flash('Your session expired.;Please log in again.', InfoboxKind.INFO)
        return None

    return sess


def is_logged_in() -> bool:
    """Return True when the request carries a known, unexpired session key."""
    return _current_session() is not None


def get_active_user() -> Users | None:
    """Return the user owning the current session, or None when not logged in."""
    sess = _current_session()
    if sess is None:
        return None
    return Users.find({'id': sess.user_id})


def create_session(user_id, temporary=False):
    """Create and return a stored session for *user_id*.

    The session expires after 2 days when *temporary* is true, otherwise
    after 31 days. The key is 16 random bytes, hex-encoded.
    """
    expires_days = 2 if temporary else 31
    return Sessions.create({
        'key': secrets.token_hex(16),
        'user_id': user_id,
        'expires_at': int(time.time()) + (expires_days * 24 * 60 * 60),
    })


def parse_username(username: str) -> tuple[str, str]:
    """Normalize *username* and return ``(normalized, original)``.

    The first element is the lowercased name with every character outside
    ``[a-zA-Z0-9_-]`` replaced by ``_`` and truncated to 24 characters; the
    second element is the input exactly as given.

    Raises:
        ValueError: if the input is shorter than 3 characters, or if either
            the lowercased input or its normalized form is a forbidden name.
    """
    if len(username) < 3:
        raise ValueError('username must be at least 3 characters long')

    lowered = username.lower()
    normalized = _INVALID_USERNAME_CHARS.sub('_', lowered)[:24]

    # Check the normalized form as well: input such as "deleted user" would
    # otherwise normalize to the reserved "deleted_user" and slip through.
    if lowered in FORBIDDEN_USERNAMES or normalized in FORBIDDEN_USERNAMES:
        raise ValueError('username is reserved')

    return normalized, username


def is_password_valid(password: str) -> bool:
    """Return True when *password* satisfies the password policy pattern."""
    return _PASSWORD_PATTERN.match(password) is not None


# annotations

def login_required(view_func):
    """Redirect to the ``users.log_in`` endpoint unless the visitor is logged in."""
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        if not is_logged_in():
            return redirect(url_for('users.log_in'))
        return view_func(*args, **kwargs)
    return wrapper


def mod_only(view_func):
    """Abort with 403 unless the active user exists and ``is_mod()`` is true."""
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        user = get_active_user()
        # A session may point at a user that can no longer be found; treat
        # that as forbidden rather than failing with AttributeError.
        if user is None or not user.is_mod():
            abort(403)
        return view_func(*args, **kwargs)
    return wrapper


def csrf_verified(view_func):
    """
    protects a request with a form against csrf
    and invalidates the csrf token stored in the session.
    requires @login_required.

    The form field ``csrf`` must have the shape ``<hex hmac>.<hex random bytes>``.
    Any malformed or mismatching token results in a 403.
    """
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        # NOTE(review): the session's 'csrf' value is only checked for
        # presence, never compared with the submitted token - confirm this
        # is intended.
        if not session.get('csrf'):
            abort(403)

        token = request.form.get('csrf')
        if not token:
            abort(403)

        parts = token.split('.')
        if len(parts) != 2:
            abort(403)
        given_message, rng_hex = parts

        try:
            rng = bytes.fromhex(rng_hex)
        except ValueError:
            # Client-supplied data that is not valid hex is a bad token,
            # not a server error.
            abort(403)

        session_key = session.get('pyrom_session_key')
        if not session_key:
            abort(403)

        message = f'd${len(session_key)}${session_key}@{len(rng)}@{rng.hex()}'
        expected = hmac.digest(
            current_app.config['SECRET_KEY'].encode('utf-8'),
            message.encode('utf-8'),
            'SHA256',
        ).hex()

        # Compare as bytes: compare_digest raises TypeError for str arguments
        # containing non-ASCII characters, which a client could submit.
        if not hmac.compare_digest(given_message.encode('utf-8'), expected.encode('utf-8')):
            abort(403)

        session.pop('csrf')
        return view_func(*args, **kwargs)
    return wrapper