# pyrom/app/routes/users.py
# 2025-07-01 23:20:36 +03:00
#
# 363 lines
# 12 KiB
# Python

from flask import (
Blueprint, render_template, request, redirect, url_for, flash, session, current_app
)
from functools import wraps
from ..db import db
from ..models import Users, Sessions, Subscriptions
from ..constants import InfoboxKind, PermissionLevel
from ..auth import digest, verify
import secrets
import time
import re
# Blueprint that every route in this module is registered on; its URLs are
# prefixed with /users/.
bp = Blueprint(
    "users",
    __name__,
    url_prefix="/users/",
)
def is_logged_in():
    """Report whether the Flask session carries a ``pyrom_session_key``.

    Only the presence of the key is checked here; the key is not looked up
    in the Sessions table.
    """
    has_session_key = "pyrom_session_key" in session
    return has_session_key
def get_active_user():
    """Return the Users record for the current session key.

    Returns None when the Flask session has no key or when no Sessions
    record matches the key; otherwise returns whatever ``Users.find`` yields
    for the session's ``user_id``.
    """
    if is_logged_in():
        record = Sessions.find({"key": session["pyrom_session_key"]})
        if record:
            return Users.find({"id": record.user_id})
    return None
def create_session(user_id):
    """Create a Sessions record for ``user_id`` and return it.

    The record gets a random key (16 random bytes, hex-encoded) and an
    ``expires_at`` Unix timestamp 30 days after the time of the call.
    """
    lifetime_seconds = 30 * 24 * 60 * 60
    fields = {
        "key": secrets.token_hex(16),
        "user_id": user_id,
        "expires_at": int(time.time()) + lifetime_seconds,
    }
    return Sessions.create(fields)
# Password policy: at least one lowercase letter, one uppercase letter, one
# digit and one non-word character or underscore; no whitespace anywhere;
# 10 to 255 characters in total.
_PASSWORD_RULE = re.compile(
    r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[\W_])(?!.*\s).{10,255}$'
)


def validate_password(password):
    """Return True when ``password`` satisfies the password policy."""
    return _PASSWORD_RULE.fullmatch(password) is not None
# Usernames: 3 to 20 characters drawn from ASCII letters, digits, underscore
# and hyphen.
_USERNAME_RULE = re.compile(r'^[a-zA-Z0-9_-]{3,20}$')


def validate_username(username):
    """Return True when ``username`` consists of 3-20 allowed characters."""
    return _USERNAME_RULE.fullmatch(username) is not None
def redirect_if_logged_in(*args, **kwargs):
    """Build a decorator that sends signed-in visitors somewhere else.

    ``args`` and ``kwargs`` are forwarded to ``flask.url_for``. Any keyword
    value that is callable is first called with the view's keyword arguments
    and replaced by its result, so URL values can be computed per request
    (for example the active user's username).

    Endpoints with a leading dot (such as ``".page"``) are passed to
    ``url_for`` unchanged, which resolves them against the blueprint of the
    current request.

    The redirect only happens when an active user can actually be resolved.
    A session key with no matching Sessions/Users record is treated as "not
    logged in", so the wrapped view runs and the per-request callables never
    see a missing user.
    """
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(*view_args, **view_kwargs):
            if get_active_user():
                # Resolve callables into concrete URL values.
                url_values = {
                    key: value(**view_kwargs) if callable(value) else value
                    for key, value in kwargs.items()
                }
                return redirect(url_for(*args, **url_values))
            return view_func(*view_args, **view_kwargs)
        return wrapper
    return decorator
def login_required(view_func):
    """Decorate a view so that visitors without a session key are redirected.

    When ``is_logged_in()`` is false the visitor is redirected to the
    ``users.log_in`` endpoint; otherwise the view runs with its original
    arguments.
    """
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        if is_logged_in():
            return view_func(*args, **kwargs)
        return redirect(url_for("users.log_in"))
    return wrapper
def mod_only(*args, **kwargs):
    """Build a decorator that only lets moderators reach the view.

    Anyone for whom ``get_active_user().is_mod()`` is false is redirected to
    ``url_for(*args, **kwargs)``. The same redirect is used when no active
    user can be resolved at all (for example a session key whose record no
    longer exists), rather than raising on the missing user.

    Keyword values that are callable are called with the view's keyword
    arguments and replaced by their result before being handed to
    ``url_for``. Endpoints with a leading dot are passed through unchanged;
    ``url_for`` resolves them against the current request's blueprint.
    """
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(*view_args, **view_kwargs):
            user = get_active_user()
            if not user or not user.is_mod():
                # Resolve callables into concrete URL values.
                url_values = {
                    key: value(**view_kwargs) if callable(value) else value
                    for key, value in kwargs.items()
                }
                return redirect(url_for(*args, **url_values))
            return view_func(*view_args, **view_kwargs)
        return wrapper
    return decorator
def admin_only(*args, **kwargs):
    """Build a decorator that only lets administrators reach the view.

    Anyone for whom ``get_active_user().is_admin()`` is false is redirected
    to ``url_for(*args, **kwargs)``. The same redirect is used when no active
    user can be resolved at all (for example a session key whose record no
    longer exists), rather than raising on the missing user.

    Keyword values that are callable are called with the view's keyword
    arguments and replaced by their result before being handed to
    ``url_for``. Endpoints with a leading dot are passed through unchanged;
    ``url_for`` resolves them against the current request's blueprint.
    """
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(*view_args, **view_kwargs):
            user = get_active_user()
            if not user or not user.is_admin():
                # Resolve callables into concrete URL values.
                url_values = {
                    key: value(**view_kwargs) if callable(value) else value
                    for key, value in kwargs.items()
                }
                return redirect(url_for(*args, **url_values))
            return view_func(*view_args, **view_kwargs)
        return wrapper
    return decorator
@bp.get("/log_in")
@redirect_if_logged_in(".page", username=lambda: get_active_user().username)
def log_in():
    """Render the log-in form.

    Visitors who already hold a session are redirected to their own user
    page by the decorator instead.
    """
    template_name = "users/log_in.html"
    return render_template(template_name)
@bp.post("/log_in")
@redirect_if_logged_in(".page", username=lambda: get_active_user().username)
def log_in_post():
    """Handle a submitted log-in form.

    Looks the user up by the posted ``username`` and checks the posted
    ``password`` against the stored hash. An unknown username and a wrong
    password produce the same error message. On success a new Sessions
    record is created and its key is stored in the Flask session. Every
    outcome redirects back to ``users.log_in``.
    """
    log_in_page = url_for("users.log_in")
    account = Users.find({"username": request.form['username']})

    # The password field is only read once the account is known to exist.
    if not account or not verify(account.password_hash, request.form['password']):
        flash("Incorrect username or password.", InfoboxKind.ERROR)
        return redirect(log_in_page)

    session['pyrom_session_key'] = create_session(account.id).key
    flash("Logged in!", InfoboxKind.INFO)
    return redirect(log_in_page)
@bp.get("/sign_up")
@redirect_if_logged_in(".page", username=lambda: get_active_user().username)
def sign_up():
    """Render the sign-up form.

    Visitors who already hold a session are redirected to their own user
    page by the decorator instead.
    """
    template_name = "users/sign_up.html"
    return render_template(template_name)
@bp.post("/sign_up")
@redirect_if_logged_in(".page", username=lambda: get_active_user().username)
def sign_up_post():
    """Handle a submitted sign-up form.

    Checks run in this order, and the first failure flashes an error and
    redirects back to the sign-up page: username format, username
    availability, password policy, password confirmation. When all pass, a
    user with GUEST permission is created, a session is opened for them and
    its key is stored in the Flask session.
    """
    form = request.form
    username = form['username']
    password = form['password']
    password_confirm = form['password-confirm']

    def reject(message):
        # Shared failure path: flash the error and return to the form.
        flash(message, InfoboxKind.ERROR)
        return redirect(url_for("users.sign_up"))

    if not validate_username(username):
        return reject("Invalid username.")

    if Users.count({"username": username}) > 0:
        return reject(f"Username '{username}' is already taken.")

    if not validate_password(password):
        return reject("Invalid password.")

    if password != password_confirm:
        return reject("Passwords do not match.")

    account_fields = {
        "username": username,
        "password_hash": digest(password),
        "permission": PermissionLevel.GUEST.value,
    }
    new_user = Users.create(account_fields)

    session['pyrom_session_key'] = create_session(new_user.id).key
    flash("Signed up successfully!", InfoboxKind.INFO)
    return redirect(url_for("users.sign_up"))
@bp.get("/<username>")
def page(username):
    """Render the user page for ``username``.

    The template receives whatever ``Users.find`` returns as ``target_user``;
    no check for a missing user is made here.
    """
    return render_template(
        "users/user.html",
        target_user=Users.find({"username": username}),
    )
# The original path was misspelled "setings". It is kept as an alias so
# existing links keep working. The correctly spelled rule sits closest to the
# function so it is registered first, which should make it the rule url_for
# builds for this endpoint.
@bp.get("/<username>/setings")
@bp.get("/<username>/settings")
@login_required
def settings(username):
    """Placeholder for the user settings page; currently returns "stub".

    ``username`` is accepted from the URL but not used yet.
    """
    return "stub"
@bp.post("/log_out")
@login_required
def log_out():
    """Log the current visitor out and redirect to the log-in page.

    Deletes the Sessions record matching the session key, if one exists, and
    then clears the Flask session. A key whose record is already gone is not
    an error: the Flask session is still cleared so the visitor ends up
    logged out either way.
    """
    session_obj = Sessions.find({"key": session['pyrom_session_key']})
    if session_obj:
        session_obj.delete()
    session.clear()
    return redirect(url_for(".log_in"))
@bp.post("/confirm_user/<user_id>")
@login_required
@mod_only("topics.all_topics")
def confirm_user(user_id):
    """Raise a GUEST account to USER and record when that happened.

    Moderator-only. Responds with the plain text "no" when the user does not
    exist or already has a permission above GUEST; otherwise updates the
    user and redirects to their page.
    """
    target_user = Users.find({"id": user_id})
    if not target_user or int(target_user.permission) > PermissionLevel.GUEST.value:
        return "no"

    changes = {
        "permission": PermissionLevel.USER.value,
        "confirmed_on": int(time.time()),
    }
    target_user.update(changes)
    return redirect(url_for(".page", username=target_user.username))
@bp.post("/mod_user/<user_id>")
@login_required
@admin_only("topics.all_topics")
def mod_user(user_id):
    """Set a user's permission to MODERATOR.

    Admin-only. Responds with the plain text "no" when the user does not
    exist or ``is_mod()`` is already true for them; otherwise updates the
    user and redirects to their page.
    """
    target_user = Users.find({"id": user_id})
    if not target_user or target_user.is_mod():
        return "no"

    changes = {"permission": PermissionLevel.MODERATOR.value}
    target_user.update(changes)
    return redirect(url_for(".page", username=target_user.username))
@bp.post("/demod_user/<user_id>")
@login_required
@admin_only("topics.all_topics")
def demod_user(user_id):
    """Set a moderator's permission back to USER.

    Admin-only. Responds with the plain text "no" when the user does not
    exist or ``is_mod()`` is false for them; otherwise updates the user and
    redirects to their page.
    """
    target_user = Users.find({"id": user_id})
    if not (target_user and target_user.is_mod()):
        return "no"

    changes = {"permission": PermissionLevel.USER.value}
    target_user.update(changes)
    return redirect(url_for(".page", username=target_user.username))
@bp.post("/guest_user/<user_id>")
@login_required
@admin_only("topics.all_topics")
def guest_user(user_id):
    """Set a user's permission to GUEST.

    Admin-only. Responds with the plain text "no" when the user does not
    exist or ``is_mod()`` is true for them; otherwise updates the user and
    redirects to their page.
    """
    target_user = Users.find({"id": user_id})
    if not target_user or target_user.is_mod():
        return "no"

    changes = {"permission": PermissionLevel.GUEST.value}
    target_user.update(changes)
    return redirect(url_for(".page", username=target_user.username))
def _group_unread_posts(rows):
    """Fold flat unread-post rows into one group per thread.

    Returns ``(groups, total_unreads)``. ``groups`` is a list of dicts, one
    per thread in order of first appearance in ``rows``, each holding the
    thread metadata plus a ``posts`` list in row order. ``total_unreads`` is
    the sum of each thread's ``unread_count``, counted once per thread.

    Groups are keyed by thread id rather than by "same thread as the previous
    row", so rows of different threads may be interleaved (which the query's
    ORDER BY allows when two threads share the same newest post time)
    without a thread being split or its unread count being added twice.
    """
    post_fields = (
        'id', 'created_at', 'content', 'edited_at', 'username', 'status',
        'avatar_path', 'thread_id', 'user_id', 'original_markup',
        'signature_rendered',
    )
    groups = []
    group_by_thread = {}
    total_unreads = 0
    for row in rows:
        thread_id = row['thread_id']
        group = group_by_thread.get(thread_id)
        if group is None:
            group = {
                'thread_id': thread_id,
                'thread_title': row['thread_title'],
                'unread_count': row['unread_count'],
                'thread_slug': row['thread_slug'],
                'newest_post_time': row['newest_post_time'],
                'posts': [],
            }
            group_by_thread[thread_id] = group
            groups.append(group)
            total_unreads += int(row['unread_count'])
        group['posts'].append({field: row[field] for field in post_fields})
    return groups, total_unreads


@bp.get("/<username>/inbox")
@login_required
def inbox(username):
    """Render the active user's inbox of unread posts in subscribed threads.

    A request for someone else's inbox is redirected to the active user's
    own. If the session key no longer resolves to a user, the key is dropped
    from the Flask session and the visitor is sent to the log-in page.

    When the user has at least one subscription, posts created after each
    subscription's ``last_seen`` are fetched and grouped per thread, and
    ``total_unreads_count`` is the sum of the per-thread unread counts.
    Without any subscription the template receives an empty ``new_posts``
    list and None for both ``total_unreads_count`` and ``all_subscriptions``.
    """
    user = get_active_user()
    if not user:
        # Stale session key: forget it so the log-in page treats the visitor
        # as logged out.
        session.pop("pyrom_session_key", None)
        return redirect(url_for(".log_in"))
    if username != user.username:
        return redirect(url_for(".inbox", username=user.username))

    new_posts = []
    all_subscriptions = None
    total_unreads_count = None

    # The lookup is only used as an existence check for "has any subscription".
    if Subscriptions.find({"user_id": user.id}):
        all_subscriptions = user.get_all_subscriptions()
        q = """
            WITH thread_metadata AS (
                SELECT
                    posts.thread_id, threads.slug AS thread_slug, threads.title AS thread_title, COUNT(*) AS unread_count, MAX(posts.created_at) AS newest_post_time
                FROM
                    posts
                LEFT JOIN
                    threads ON threads.id = posts.thread_id
                LEFT JOIN
                    subscriptions ON subscriptions.thread_id = posts.thread_id
                WHERE subscriptions.user_id = ? AND posts.created_at > subscriptions.last_seen
                GROUP BY posts.thread_id
            )
            SELECT
                tm.thread_id, tm.thread_slug, tm.thread_title, tm.unread_count, tm.newest_post_time,
                posts.id, posts.created_at, post_history.content, post_history.edited_at, users.username, users.status, avatars.file_path AS avatar_path, posts.thread_id, users.id AS user_id, post_history.original_markup, users.signature_rendered
            FROM
                thread_metadata tm
            JOIN
                posts ON posts.thread_id = tm.thread_id
            JOIN
                post_history ON posts.current_revision_id = post_history.id
            JOIN
                users ON posts.user_id = users.id
            LEFT JOIN
                threads ON threads.id = posts.thread_id
            LEFT JOIN
                avatars ON users.avatar_id = avatars.id
            LEFT JOIN
                subscriptions ON subscriptions.thread_id = posts.thread_id
            WHERE
                subscriptions.user_id = ? AND posts.created_at > subscriptions.last_seen
            ORDER BY
                tm.newest_post_time DESC, posts.created_at ASC"""
        # The user id is bound twice: once for the CTE, once for the outer query.
        new_posts_raw = db.query(q, user.id, user.id)
        new_posts, total_unreads_count = _group_unread_posts(new_posts_raw)

    return render_template(
        "users/inbox.html",
        new_posts=new_posts,
        total_unreads_count=total_unreads_count,
        all_subscriptions=all_subscriptions,
    )