pyrom/app/routes/users.py

285 lines
8.6 KiB
Python

from flask import (
Blueprint, render_template, request, redirect, url_for, flash, session, current_app
)
from functools import wraps
from ..models import Users, Sessions
from ..constants import InfoboxKind, PermissionLevel
from ..auth import digest, verify
import secrets
import time
import re
# Blueprint that owns every route defined in this module; each rule below is
# registered under the "/users/" URL prefix and the "users." endpoint namespace.
bp = Blueprint("users", __name__, url_prefix = "/users/")
def is_logged_in():
    """Return True when the request carries a session key that is still valid.

    The key stored in the Flask session cookie is checked against the
    ``Sessions`` table: a key with no matching row, or whose ``expires_at``
    timestamp has passed, does not count as logged in. Trusting the cookie
    alone would treat deleted or expired sessions as authenticated and make
    later ``get_active_user()`` calls return ``None`` unexpectedly.
    """
    key = session.get("pyrom_session_key")
    if key is None:
        return False
    stored = Sessions.find({"key": key})
    if not stored:
        return False
    # NOTE(review): rows without an expiry are treated as non-expiring here;
    # create_session() always sets one, so this only matters for legacy rows.
    expires_at = getattr(stored, "expires_at", None)
    return expires_at is None or int(expires_at) > time.time()
def get_active_user():
    """Return the Users record behind the current session, or None.

    None is returned when ``is_logged_in()`` is false or when no ``Sessions``
    row matches the key stored in the Flask session. Otherwise the result of
    looking up the session's ``user_id`` in ``Users`` is returned as-is.
    """
    if is_logged_in():
        record = Sessions.find({"key": session["pyrom_session_key"]})
        if record:
            return Users.find({"id": record.user_id})
    return None
def create_session(user_id):
    """Create and return a new Sessions row for ``user_id``.

    The row gets a random 32-hex-character key and an ``expires_at`` Unix
    timestamp thirty days after the moment of creation.
    """
    thirty_days = 30 * 24 * 60 * 60  # lifetime in seconds
    fields = {
        "key": secrets.token_hex(16),
        "user_id": user_id,
        "expires_at": int(time.time()) + thirty_days,
    }
    return Sessions.create(fields)
# Password policy: at least one lowercase letter, one uppercase letter, one
# digit and one non-alphanumeric character (underscore counts), no whitespace,
# and a total length of 10 to 255 characters.
_PASSWORD_RE = re.compile(
    r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[\W_])(?!.*\s).{10,255}$'
)


def validate_password(password):
    """Return True if ``password`` satisfies the password policy, else False."""
    return _PASSWORD_RE.fullmatch(password) is not None
# Single-segment route names registered on this blueprint. A user with one of
# these names would have a profile URL (/users/<username>) that collides with
# the route itself; for "log_in" the post-login redirect to the profile page
# would bounce back to the log-in route forever. Keep in sync with the routes.
_RESERVED_USERNAMES = frozenset({"log_in", "sign_up", "log_out"})

# 3 to 20 characters drawn from ASCII letters, digits, underscore and hyphen.
_USERNAME_RE = re.compile(r'^[a-zA-Z0-9_-]{3,20}$')


def validate_username(username):
    """Return True if ``username`` is acceptable for a new account.

    A username must be 3-20 characters of ASCII letters, digits, ``_`` or
    ``-`` and must not be one of the reserved route names.
    """
    if username in _RESERVED_USERNAMES:
        return False
    return _USERNAME_RE.fullmatch(username) is not None
def _redirect_to_fallback(args, kwargs, view_kwargs):
    """Build the redirect issued when a guard decorator rejects a request.

    ``args`` and ``kwargs`` are the arguments the guard was created with: the
    target endpoint (first positional argument, or an ``endpoint=`` keyword)
    plus the values handed to ``url_for``. Callable values are invoked with the
    current view's keyword arguments so they can be computed per request.

    Raises:
        TypeError: if the guard was created without an endpoint.
    """
    values = {
        key: value(**view_kwargs) if callable(value) else value
        for key, value in kwargs.items()
    }
    # Pop the endpoint out of the values so it can never reach url_for twice.
    endpoint = args[0] if args else values.pop("endpoint", None)
    if endpoint is None:
        raise TypeError("guard decorator requires a redirect endpoint")
    # A leading "." is resolved by url_for itself against the blueprint of the
    # current request, so no manual blueprint lookup is needed.
    return redirect(url_for(endpoint, **values))


def _guard(is_allowed, args, kwargs):
    """Return a decorator that runs the view only while ``is_allowed()`` is truthy.

    When the check fails, the request is redirected to the endpoint described
    by ``args``/``kwargs`` instead of calling the view.
    """
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(*view_args, **view_kwargs):
            if is_allowed():
                return view_func(*view_args, **view_kwargs)
            return _redirect_to_fallback(args, kwargs, view_kwargs)
        return wrapper
    return decorator


def redirect_if_logged_in(*args, **kwargs):
    """Decorator factory: send already-logged-in visitors somewhere else.

    Pass the target endpoint first, followed by keyword values for
    ``url_for``; callable values are evaluated per request.
    """
    return _guard(lambda: not is_logged_in(), args, kwargs)


def login_required(view_func):
    """Decorator: redirect to the log-in page unless the visitor is logged in."""
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        if is_logged_in():
            return view_func(*args, **kwargs)
        return redirect(url_for("users.log_in"))
    return wrapper


def mod_only(*args, **kwargs):
    """Decorator factory: allow the view only for users whose ``is_mod()`` is true.

    Everyone else, including requests with no resolvable active user, is
    redirected to the endpoint given in ``args``/``kwargs``.
    """
    def is_moderator():
        user = get_active_user()
        # A missing user is simply unauthorized, not a server error.
        if not user:
            return False
        return user.is_mod()
    return _guard(is_moderator, args, kwargs)


def admin_only(*args, **kwargs):
    """Decorator factory: allow the view only for users whose ``is_admin()`` is true.

    Everyone else, including requests with no resolvable active user, is
    redirected to the endpoint given in ``args``/``kwargs``.
    """
    def is_administrator():
        user = get_active_user()
        # A missing user is simply unauthorized, not a server error.
        if not user:
            return False
        return user.is_admin()
    return _guard(is_administrator, args, kwargs)
@bp.get("/log_in")
@redirect_if_logged_in(".page", username=lambda: get_active_user().username)
def log_in():
    """Render the log-in form.

    Visitors who are already logged in are redirected to their own user page
    by the decorator instead.
    """
    template_name = "users/log_in.html"
    return render_template(template_name)
@bp.post("/log_in")
@redirect_if_logged_in(".page", username=lambda: get_active_user().username)
def log_in_post():
    """Handle a submitted log-in form.

    Reads ``username`` and ``password`` from the form. On success a new
    session row is created and its key is stored in the Flask session. Every
    outcome flashes a message and redirects to the log-in page.
    """
    account = Users.find({
        "username": request.form['username']
    })
    # Unknown user and wrong password share one message; the password field is
    # only read (and verified) when the user exists.
    credentials_ok = bool(account) and verify(
        account.password_hash, request.form['password']
    )
    if credentials_ok:
        new_session = create_session(account.id)
        session['pyrom_session_key'] = new_session.key
        flash("Logged in!", InfoboxKind.INFO)
    else:
        flash("Incorrect username or password.", InfoboxKind.ERROR)
    return redirect(url_for("users.log_in"))
@bp.get("/sign_up")
@redirect_if_logged_in(".page", username=lambda: get_active_user().username)
def sign_up():
    """Render the sign-up form.

    Visitors who are already logged in are redirected to their own user page
    by the decorator instead.
    """
    template_name = "users/sign_up.html"
    return render_template(template_name)
@bp.post("/sign_up")
@redirect_if_logged_in(".page", username=lambda: get_active_user().username)
def sign_up_post():
    """Handle a submitted sign-up form.

    Reads ``username``, ``password`` and ``password-confirm`` from the form.
    The checks run in order: username format, username availability, password
    policy, password confirmation. The first failure flashes an error and
    redirects back to the sign-up page. On success the account is created at
    guest permission level, a session is started, and the response again
    redirects to the sign-up page.
    """
    form = request.form
    username = form['username']
    password = form['password']
    password_confirm = form['password-confirm']

    def reject(message):
        # Shared failure path: flash the error, return to the form.
        flash(message, InfoboxKind.ERROR)
        return redirect(url_for("users.sign_up"))

    if not validate_username(username):
        return reject("Invalid username.")
    if Users.count({"username": username}) > 0:
        return reject(f"Username '{username}' is already taken.")
    if not validate_password(password):
        return reject("Invalid password.")
    if password != password_confirm:
        return reject("Passwords do not match.")

    account = Users.create({
        "username": username,
        "password_hash": digest(password),
        "permission": PermissionLevel.GUEST.value,
    })
    # Log the new account in straight away.
    new_session = create_session(account.id)
    session['pyrom_session_key'] = new_session.key
    flash("Signed up successfully!", InfoboxKind.INFO)
    return redirect(url_for("users.sign_up"))
@bp.get("/<username>")
def page(username):
    """Render the user page for the account named ``username``."""
    # NOTE(review): the lookup result is handed to the template unchecked, so
    # an unknown username passes through whatever Users.find returns for a
    # miss - confirm the template copes with that.
    found = Users.find({"username": username})
    return render_template("users/user.html", target_user=found)
@bp.get("/<username>/settings")
@login_required
def settings(username):
    """Settings page for ``username`` (not implemented yet; returns a stub).

    The route rule previously read ``/setings``; it is spelled correctly now.
    The endpoint name is unchanged, so ``url_for("users.settings", ...)``
    callers keep working.
    """
    # NOTE(review): nothing here ties ``username`` to the logged-in user yet.
    return "stub"
@bp.get("/<username>/inbox")
@login_required
def inbox(username):
    """Inbox page for ``username`` (not implemented yet; returns a stub)."""
    placeholder = "stub"
    return placeholder
@bp.post("/log_out")
@login_required
def log_out():
    """Log the current visitor out and redirect to the log-in page.

    Deletes the ``Sessions`` row that matches the key in the Flask session,
    if one exists, then clears the Flask session.
    """
    session_obj = Sessions.find({"key": session['pyrom_session_key']})
    # The cookie can outlive its server-side row; in that case there is
    # nothing to delete, but the cookie must still be cleared so the visitor
    # is not stuck in a half-logged-in state.
    if session_obj:
        session_obj.delete()
    session.clear()
    return redirect(url_for(".log_in"))
@bp.post("/confirm_user/<user_id>")
@login_required
@mod_only("topics.all_topics")
def confirm_user(user_id):
    """Raise a guest account to regular-user level and stamp the time.

    Restricted by ``mod_only``; other visitors are redirected to
    ``topics.all_topics``. Responds with the plain text ``"no"`` when the
    account does not exist or its permission is already above guest level.
    Otherwise redirects to the account's user page.
    """
    account = Users.find({"id": user_id})
    if not account or int(account.permission) > PermissionLevel.GUEST.value:
        return "no"
    confirmed_at = int(time.time())
    account.update({
        "permission": PermissionLevel.USER.value,
        "confirmed_on": confirmed_at,
    })
    return redirect(url_for(".page", username=account.username))
@bp.post("/mod_user/<user_id>")
@login_required
@admin_only("topics.all_topics")
def mod_user(user_id):
    """Set an account's permission to moderator level.

    Restricted by ``admin_only``; other visitors are redirected to
    ``topics.all_topics``. Responds with the plain text ``"no"`` when the
    account does not exist or ``is_mod()`` is already true for it. Otherwise
    redirects to the account's user page.
    """
    account = Users.find({"id": user_id})
    if not account or account.is_mod():
        return "no"
    new_level = PermissionLevel.MODERATOR.value
    account.update({"permission": new_level})
    return redirect(url_for(".page", username=account.username))
@bp.post("/demod_user/<user_id>")
@login_required
@admin_only("topics.all_topics")
def demod_user(user_id):
    """Set a moderator account's permission back to regular-user level.

    Restricted by ``admin_only``; other visitors are redirected to
    ``topics.all_topics``. Responds with the plain text ``"no"`` when the
    account does not exist or ``is_mod()`` is false for it. Otherwise
    redirects to the account's user page.
    """
    account = Users.find({"id": user_id})
    if not (account and account.is_mod()):
        return "no"
    new_level = PermissionLevel.USER.value
    account.update({"permission": new_level})
    return redirect(url_for(".page", username=account.username))
@bp.post("/guest_user/<user_id>")
@login_required
@admin_only("topics.all_topics")
def guest_user(user_id):
    """Set an account's permission to guest level.

    Restricted by ``admin_only``; other visitors are redirected to
    ``topics.all_topics``. Responds with the plain text ``"no"`` when the
    account does not exist or ``is_mod()`` is true for it. Otherwise
    redirects to the account's user page.
    """
    account = Users.find({"id": user_id})
    if not account or account.is_mod():
        return "no"
    new_level = PermissionLevel.GUEST.value
    account.update({"permission": new_level})
    return redirect(url_for(".page", username=account.username))