"""Blueprint for user accounts: log in/out, sign up, settings, avatars,
permission changes and the subscription inbox."""

from flask import (
    Blueprint, render_template, request, redirect, url_for, flash, session, current_app
    )
from functools import wraps
from ..db import db
from ..lib.babycode import babycode_to_html
from ..models import Users, Sessions, Subscriptions, Avatars
from ..constants import InfoboxKind, PermissionLevel
from ..auth import digest, verify
from wand.image import Image
from wand.exceptions import WandException
import secrets
import time
import re
import os

bp = Blueprint("users", __name__, url_prefix = "/users/")

# Avatars row that every user without a custom avatar points at; its file
# must never be deleted.
_DEFAULT_AVATAR_ID = 1


def validate_and_create_avatar(input_image, filename):
    """Convert uploaded image bytes into a 256x256 WebP avatar on disk.

    The image is stripped of profiles/comments, scaled down so that its
    shorter side is at most 256px, center-cropped to a square, resized to
    256x256 and saved to ``filename`` as WebP at quality 85.

    Args:
        input_image: raw bytes of the uploaded image.
        filename: destination path for the generated file.

    Returns:
        True on success, False if Wand raised a WandException while
        decoding, processing or saving the image.
    """
    try:
        with Image(blob=input_image) as img:
            img.strip()
            img.gravity = 'center'

            width, height = img.width, img.height
            min_dim = min(width, height)
            if min_dim > 256:
                # shrink first so the crop below works on a smaller image
                ratio = 256.0 / min_dim
                new_width = int(width * ratio)
                new_height = int(height * ratio)
                img.resize(new_width, new_height)

            width, height = img.width, img.height
            crop_size = min(width, height)
            x_offset = (width - crop_size) // 2
            y_offset = (height - crop_size) // 2
            img.crop(left=x_offset, top=y_offset,
                     width=crop_size, height=crop_size)

            img.resize(256, 256)
            img.format = 'webp'
            img.compression_quality = 85
            img.save(filename=filename)
            return True
    except WandException:
        return False


def is_logged_in():
    """Return True if the browser session carries a session key.

    This only inspects the cookie-backed ``session``; it does not verify
    that the key still maps to a row in Sessions. Use ``get_active_user``
    for that.
    """
    return "pyrom_session_key" in session


def get_active_user():
    """Return the user owning the current session key, or None.

    None is returned when there is no session key or when no Sessions
    row matches it.
    """
    # NOTE(review): ``expires_at`` is not checked here - confirm that
    # expired sessions are purged or rejected elsewhere.
    if not is_logged_in():
        return None
    sess = Sessions.find({"key": session["pyrom_session_key"]})
    if not sess:
        return None
    return Users.find({"id": sess.user_id})


def create_session(user_id):
    """Create and return a Sessions row for ``user_id``.

    The key is a random 32-character hex token and ``expires_at`` is set
    32 days from now (unix seconds).
    """
    return Sessions.create({
        "key": secrets.token_hex(16),
        "user_id": user_id,
        "expires_at": int(time.time()) + 32 * 24 * 60 * 60,
    })


def validate_password(password):
    """Return True if ``password`` is 10-255 characters with no whitespace
    and at least one lowercase letter, uppercase letter, digit and
    non-alphanumeric character (underscore counts)."""
    pattern = r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[\W_])(?!.*\s).{10,255}$'
    return bool(re.fullmatch(pattern, password))


def validate_username(username):
    """Return True if ``username`` is 3-20 characters drawn from ASCII
    letters, digits, underscore and hyphen."""
    pattern = r'^[a-zA-Z0-9_-]{3,20}$'
    return bool(re.fullmatch(pattern, username))


def _redirect_to_target(url_args, url_kwargs, view_kwargs):
    """Build the redirect shared by the access-control decorators.

    ``url_args``/``url_kwargs`` are the decorator's own arguments and are
    forwarded to ``url_for``. Any callable value in ``url_kwargs`` is first
    called with the view's keyword arguments so that URL values can be
    computed per request.

    Endpoints starting with "." are resolved by ``url_for`` relative to
    the blueprint handling the current request, so no manual blueprint
    lookup is needed.
    """
    resolved = {
        key: value(**view_kwargs) if callable(value) else value
        for key, value in url_kwargs.items()
    }
    return redirect(url_for(*url_args, **resolved))


def redirect_if_logged_in(*args, **kwargs):
    """Decorator factory: redirect users with a valid session away.

    ``args``/``kwargs`` describe the redirect target as for ``url_for``;
    callable keyword values are resolved at request time. A session key
    with no matching Sessions row is treated as not logged in, so the
    wrapped view (e.g. the login form) is still served.
    """
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(*view_args, **view_kwargs):
            if get_active_user():
                return _redirect_to_target(args, kwargs, view_kwargs)
            return view_func(*view_args, **view_kwargs)
        return wrapper
    return decorator


def login_required(view_func):
    """Decorator: redirect to the login page unless a user is logged in.

    The session key must resolve to an existing user; a stale key is
    handled like a missing one so that wrapped views can rely on
    ``get_active_user()`` returning a user.
    """
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        if not get_active_user():
            return redirect(url_for("users.log_in"))
        return view_func(*args, **kwargs)
    return wrapper


def mod_only(*args, **kwargs):
    """Decorator factory: only let users for whom ``is_mod()`` is true
    reach the view; everyone else (including anonymous visitors) is
    redirected to the ``url_for`` target given by ``args``/``kwargs``."""
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(*view_args, **view_kwargs):
            user = get_active_user()
            if not user or not user.is_mod():
                return _redirect_to_target(args, kwargs, view_kwargs)
            return view_func(*view_args, **view_kwargs)
        return wrapper
    return decorator


def admin_only(*args, **kwargs):
    """Decorator factory: only let users for whom ``is_admin()`` is true
    reach the view; everyone else (including anonymous visitors) is
    redirected to the ``url_for`` target given by ``args``/``kwargs``."""
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(*view_args, **view_kwargs):
            user = get_active_user()
            if not user or not user.is_admin():
                return _redirect_to_target(args, kwargs, view_kwargs)
            return view_func(*view_args, **view_kwargs)
        return wrapper
    return decorator


def _delete_avatar(avatar):
    """Remove an avatar's file from the upload directory and delete its row.

    A file that is already missing is ignored so the database row is
    still cleaned up instead of the request failing half-way.
    """
    path = os.path.join(
        current_app.config['AVATAR_UPLOAD_PATH'],
        os.path.basename(avatar.file_path),
    )
    try:
        os.remove(path)
    except FileNotFoundError:
        # nothing left on disk to remove; still drop the row below
        pass
    avatar.delete()


@bp.get("/log_in")
@redirect_if_logged_in(".page", username = lambda: get_active_user().username)
def log_in():
    """Render the login form."""
    return render_template("users/log_in.html")


@bp.post("/log_in")
@redirect_if_logged_in(".page", username = lambda: get_active_user().username)
def log_in_post():
    """Check the submitted credentials and start a session on success.

    Unknown usernames and wrong passwords produce the same error message.
    """
    target_user = Users.find({
        "username": request.form['username']
    })

    if not target_user or not verify(target_user.password_hash, request.form['password']):
        flash("Incorrect username or password.", InfoboxKind.ERROR)
        return redirect(url_for("users.log_in"))

    session_obj = create_session(target_user.id)
    session['pyrom_session_key'] = session_obj.key
    flash("Logged in!", InfoboxKind.INFO)
    # the login page redirects logged-in users on to their own page
    return redirect(url_for("users.log_in"))


@bp.get("/sign_up")
@redirect_if_logged_in(".page", username = lambda: get_active_user().username)
def sign_up():
    """Render the sign-up form."""
    return render_template("users/sign_up.html")


@bp.post("/sign_up")
@redirect_if_logged_in(".page", username = lambda: get_active_user().username)
def sign_up_post():
    """Validate the sign-up form, create a guest-level user and log them in."""
    username = request.form['username']
    password = request.form['password']
    password_confirm = request.form['password-confirm']

    if not validate_username(username):
        flash("Invalid username.", InfoboxKind.ERROR)
        return redirect(url_for("users.sign_up"))

    user_exists = Users.count({"username": username}) > 0
    if user_exists:
        flash(f"Username '{username}' is already taken.", InfoboxKind.ERROR)
        return redirect(url_for("users.sign_up"))

    if not validate_password(password):
        flash("Invalid password.", InfoboxKind.ERROR)
        return redirect(url_for("users.sign_up"))

    if password != password_confirm:
        flash("Passwords do not match.", InfoboxKind.ERROR)
        return redirect(url_for("users.sign_up"))

    hashed = digest(password)

    new_user = Users.create({
        "username": username,
        "password_hash": hashed,
        "permission": PermissionLevel.GUEST.value,
    })

    session_obj = create_session(new_user.id)
    session['pyrom_session_key'] = session_obj.key
    flash("Signed up successfully!", InfoboxKind.INFO)
    return redirect(url_for("users.sign_up"))


@bp.get("/<username>")
def page(username):
    """Render the public profile page of ``username``."""
    target_user = Users.find({"username": username})
    return render_template("users/user.html", target_user = target_user)


@bp.get("/<username>/settings")
@login_required
def settings(username):
    """Render the settings page; users are sent to their own settings if
    ``username`` is missing or belongs to somebody else."""
    active_user = get_active_user()
    target_user = Users.find({'username': username})
    if not target_user or target_user.id != active_user.id:
        return redirect(url_for('.settings', username = active_user.username))
    return render_template('users/settings.html')


@bp.post('/<username>/settings')
@login_required
def settings_form(username):
    """Store submitted preferences: topic sort order and subscribe-by-default
    go into the browser session, status and signature onto the user row."""
    # we silently ignore the passed username
    # and grab the correct user from the session
    user = get_active_user()
    topic_sort_by = request.form.get('topic_sort_by', default='activity')
    if topic_sort_by in ('activity', 'thread'):
        session['sort_by'] = topic_sort_by
    status = request.form.get('status', default="")[:100]
    original_sig = request.form.get('signature', default='')
    rendered_sig = babycode_to_html(original_sig)
    session['subscribe_by_default'] = request.form.get('subscribe_by_default', default='on') == 'on'

    user.update({
        'status': status,
        'signature_original_markup': original_sig,
        'signature_rendered': rendered_sig,
    })
    flash('Settings updated.', InfoboxKind.INFO)
    return redirect(url_for('.settings', username=user.username))


@bp.post('/<username>/set_avatar')
@login_required
def set_avatar(username):
    """Replace the logged-in user's avatar with the uploaded image.

    The passed username is ignored; the user comes from the session.
    Guests may not upload avatars.
    """
    user = get_active_user()
    if user.is_guest():
        return 'no'
    if 'avatar' not in request.files:
        return 'no!...'

    file = request.files['avatar']

    if file.filename == '':
        return 'no..?'

    file_bytes = file.read()

    now = int(time.time())
    filename = f"u{user.id}d{now}.webp"
    output_path = os.path.join(current_app.config['AVATAR_UPLOAD_PATH'], filename)
    proxied_filename = f"/static/avatars/{filename}"

    if not validate_and_create_avatar(file_bytes, output_path):
        return 'uhhhh no'

    flash('Avatar updated.', InfoboxKind.INFO)
    avatar = Avatars.create({
        'file_path': proxied_filename,
        'uploaded_at': now,
    })
    old_avatar = Avatars.find({'id': user.avatar_id})
    user.update({'avatar_id': avatar.id})

    if old_avatar and int(old_avatar.id) != _DEFAULT_AVATAR_ID:
        # delete old avi, but not default
        _delete_avatar(old_avatar)

    return redirect(url_for('.settings', username=user.username))


@bp.post('/<username>/clear_avatar')
@login_required
def clear_avatar(username):
    """Reset the logged-in user's avatar to the default and delete the old one.

    The passed username is ignored; the user comes from the session.
    """
    user = get_active_user()
    if user.is_default_avatar():
        return 'no'

    old_avatar = Avatars.find({'id': user.avatar_id})
    user.update({'avatar_id': _DEFAULT_AVATAR_ID})
    # delete old avi
    if old_avatar:
        _delete_avatar(old_avatar)
    return redirect(url_for('.settings', username=user.username))


@bp.post("/log_out")
@login_required
def log_out():
    """Delete the server-side session, clear the cookie session and go to
    the login page."""
    session_obj = Sessions.find({"key": session['pyrom_session_key']})
    if session_obj:
        session_obj.delete()
    session.clear()
    return redirect(url_for(".log_in"))


@bp.post("/confirm_user/<user_id>")
@login_required
@mod_only("topics.all_topics")
def confirm_user(user_id):
    """Promote a guest to a regular user and record the confirmation time."""
    target_user = Users.find({"id": user_id})
    if not target_user:
        return "no"
    if int(target_user.permission) > PermissionLevel.GUEST.value:
        return "no"

    target_user.update({
        "permission": PermissionLevel.USER.value,
        "confirmed_on": int(time.time()),
    })
    return redirect(url_for(".page", username=target_user.username))


@bp.post("/mod_user/<user_id>")
@login_required
@admin_only("topics.all_topics")
def mod_user(user_id):
    """Promote a user who is not yet a moderator to moderator."""
    target_user = Users.find({"id": user_id})
    if not target_user:
        return "no"
    if target_user.is_mod():
        return "no"

    target_user.update({
        "permission": PermissionLevel.MODERATOR.value,
    })
    return redirect(url_for(".page", username=target_user.username))


@bp.post("/demod_user/<user_id>")
@login_required
@admin_only("topics.all_topics")
def demod_user(user_id):
    """Demote a moderator back to a regular user."""
    target_user = Users.find({"id": user_id})
    if not target_user:
        return "no"
    if not target_user.is_mod():
        return "no"

    target_user.update({
        "permission": PermissionLevel.USER.value,
    })
    return redirect(url_for(".page", username=target_user.username))


@bp.post("/guest_user/<user_id>")
@login_required
@admin_only("topics.all_topics")
def guest_user(user_id):
    """Demote a non-moderator user to guest."""
    target_user = Users.find({"id": user_id})
    if not target_user:
        return "no"
    if target_user.is_mod():
        return "no"

    target_user.update({
        "permission": PermissionLevel.GUEST.value,
    })
    return redirect(url_for(".page", username=target_user.username))


@bp.get("/<username>/inbox")
@login_required
def inbox(username):
    """Render the logged-in user's inbox of unread posts in subscribed threads.

    Posts newer than each subscription's ``last_seen`` are grouped per
    thread, threads ordered by their newest unread post (newest first) and
    posts within a thread oldest first. Requests for somebody else's inbox
    are redirected to the user's own.
    """
    user = get_active_user()
    if username != user.username:
        return redirect(url_for(".inbox", username = user.username))

    new_posts = []
    subscription = Subscriptions.find({"user_id": user.id})
    all_subscriptions = None
    total_unreads_count = None
    if subscription:
        all_subscriptions = user.get_all_subscriptions()
        q = """
        WITH thread_metadata AS (
            SELECT
                posts.thread_id, threads.slug AS thread_slug, threads.title AS thread_title, COUNT(*) AS unread_count, MAX(posts.created_at) AS newest_post_time
            FROM
                posts
            LEFT JOIN
                threads ON threads.id = posts.thread_id
            LEFT JOIN
                subscriptions ON subscriptions.thread_id = posts.thread_id
            WHERE subscriptions.user_id = ? AND posts.created_at > subscriptions.last_seen
            GROUP BY posts.thread_id
        )

        SELECT
            tm.thread_id, tm.thread_slug, tm.thread_title, tm.unread_count, tm.newest_post_time,
            posts.id, posts.created_at, post_history.content, post_history.edited_at, users.username, users.status, avatars.file_path AS avatar_path, posts.thread_id, users.id AS user_id, post_history.original_markup, users.signature_rendered
        FROM
            thread_metadata tm
        JOIN
            posts ON posts.thread_id = tm.thread_id
        JOIN
            post_history ON posts.current_revision_id = post_history.id
        JOIN
            users ON posts.user_id = users.id
        LEFT JOIN
            threads ON threads.id = posts.thread_id
        LEFT JOIN
            avatars ON users.avatar_id = avatars.id
        LEFT JOIN
            subscriptions ON subscriptions.thread_id = posts.thread_id
        WHERE
            subscriptions.user_id = ? AND posts.created_at > subscriptions.last_seen
        ORDER BY
            tm.newest_post_time DESC, posts.created_at ASC"""

        new_posts_raw = db.query(q, user.id, user.id)
        current_thread_id = None
        current_thread_group = None
        total_unreads_count = 0
        # rows arrive grouped by thread, so a change of thread_id starts a
        # new group
        for row in new_posts_raw:
            if row['thread_id'] != current_thread_id:
                current_thread_group = {
                    'thread_id': row['thread_id'],
                    'thread_title': row['thread_title'],
                    'unread_count': row['unread_count'],
                    'thread_slug': row['thread_slug'],
                    'newest_post_time': row['newest_post_time'],
                    'posts': [],
                }
                total_unreads_count += int(row['unread_count'])
                new_posts.append(current_thread_group)
                current_thread_id = row['thread_id']
            current_thread_group['posts'].append({
                'id': row['id'],
                'created_at': row['created_at'],
                'content': row['content'],
                'edited_at': row['edited_at'],
                'username': row['username'],
                'status': row['status'],
                'avatar_path': row['avatar_path'],
                'thread_id': row['thread_id'],
                'user_id': row['user_id'],
                'original_markup': row['original_markup'],
                'signature_rendered': row['signature_rendered']
            })

    return render_template("users/inbox.html", new_posts = new_posts, total_unreads_count = total_unreads_count, all_subscriptions = all_subscriptions)