pyrom/app/routes/users.py
2025-07-02 02:23:11 +03:00

479 lines
16 KiB
Python

from flask import (
Blueprint, render_template, request, redirect, url_for, flash, session, current_app
)
from functools import wraps
from ..db import db
from ..lib.babycode import babycode_to_html
from ..models import Users, Sessions, Subscriptions, Avatars
from ..constants import InfoboxKind, PermissionLevel
from ..auth import digest, verify
from wand.image import Image
from wand.exceptions import WandException
import secrets
import time
import re
import os
# Blueprint named "users"; the routes declared on it below are registered
# under the "/users/" URL prefix.
bp = Blueprint("users", __name__, url_prefix = "/users/")
def validate_and_create_avatar(input_image, filename):
    """Convert uploaded image bytes into a 256x256 WebP file.

    The image is decoded with wand, passed through ``Image.strip()``,
    shrunk so its shorter edge is at most 256 px, cropped to a centred
    square, resized to 256x256 and written to ``filename`` as WebP with
    compression quality 85.

    Args:
        input_image: Raw bytes of the uploaded image.
        filename: Destination path for the generated WebP file.

    Returns:
        True when the file was written, False when wand raised a
        ``WandException`` at any step.
    """
    try:
        with Image(blob=input_image) as picture:
            picture.strip()
            picture.gravity = 'center'

            # Downscale only; images already small enough are left as-is here.
            shortest_edge = min(picture.width, picture.height)
            if shortest_edge > 256:
                scale = 256.0 / shortest_edge
                picture.resize(
                    int(picture.width * scale),
                    int(picture.height * scale),
                )

            # Take the largest centred square out of the (possibly resized) image.
            cur_w, cur_h = picture.width, picture.height
            side = min(cur_w, cur_h)
            picture.crop(
                left=(cur_w - side) // 2,
                top=(cur_h - side) // 2,
                width=side,
                height=side,
            )

            picture.resize(256, 256)
            picture.format = 'webp'
            picture.compression_quality = 85
            picture.save(filename=filename)
    except WandException:
        return False
    return True
def is_logged_in():
    """Tell whether the Flask session holds a ``pyrom_session_key`` entry.

    Only the presence of the key is checked; the stored session row is
    not looked up here.
    """
    has_session_key = "pyrom_session_key" in session
    return has_session_key
def get_active_user():
    """Return the user that owns the current session, or None.

    None is returned when the visitor is not logged in or when no
    ``Sessions`` row matches the key stored in the Flask session.
    Otherwise the result of ``Users.find`` for the session's ``user_id``
    is returned.
    """
    if is_logged_in():
        stored_session = Sessions.find({"key": session["pyrom_session_key"]})
        if stored_session:
            return Users.find({"id": stored_session.user_id})
    return None
def create_session(user_id):
    """Create and return a ``Sessions`` row for ``user_id``.

    The row gets a random 16-byte hex key and an ``expires_at`` Unix
    timestamp 30 days after the current time.
    """
    thirty_days = 30 * 24 * 60 * 60
    fields = {
        "key": secrets.token_hex(16),
        "user_id": user_id,
        "expires_at": int(time.time()) + thirty_days,
    }
    return Sessions.create(fields)
def validate_password(password):
    """Check a password against the sign-up policy.

    A password is accepted when it is 10 to 255 characters long, has no
    whitespace, and contains at least one ASCII lowercase letter, one
    ASCII uppercase letter, one digit and one non-word character or
    underscore.

    Returns:
        True if the whole string satisfies the policy, False otherwise.
    """
    policy = r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[\W_])(?!.*\s).{10,255}$'
    return re.fullmatch(policy, password) is not None
def validate_username(username):
    """Check that a username is 3-20 characters of ``a-z A-Z 0-9 _ -``.

    Returns:
        True if the whole string matches, False otherwise.
    """
    allowed = r'^[a-zA-Z0-9_-]{3,20}$'
    return re.fullmatch(allowed, username) is not None
def redirect_if_logged_in(*args, **kwargs):
    """Build a decorator that redirects logged-in visitors away from a view.

    Args:
        *args: Positional arguments forwarded to ``url_for`` (normally just
            the endpoint name; a leading "." is resolved by ``url_for``
            against the blueprint of the current request).
        **kwargs: Keyword arguments forwarded to ``url_for``. A callable
            value is called at request time with the view's keyword
            arguments and replaced by its result.

    Returns:
        A decorator. The wrapped view runs unchanged for visitors that are
        not logged in; logged-in visitors get a redirect response instead.
    """
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(*view_args, **view_kwargs):
            if not is_logged_in():
                return view_func(*view_args, **view_kwargs)

            # Resolve lazily-evaluated values (e.g. the active user's name)
            # now that a request context is available.
            resolved = {
                key: value(**view_kwargs) if callable(value) else value
                for key, value in kwargs.items()
            }
            # url_for already expands a leading "." using the current
            # blueprint, so no manual lookup is needed. The previous lookup
            # used view_func.__name__, which is a function name rather than
            # a blueprint name.
            return redirect(url_for(*args, **resolved))
        return wrapper
    return decorator
def login_required(view_func):
    """Decorator that sends visitors who are not logged in to ``users.log_in``.

    Logged-in visitors reach ``view_func`` with its arguments unchanged.
    """
    @wraps(view_func)
    def guarded(*view_args, **view_kwargs):
        if is_logged_in():
            return view_func(*view_args, **view_kwargs)
        return redirect(url_for("users.log_in"))
    return guarded
def mod_only(*args, **kwargs):
    """Build a decorator that only lets users passing ``is_mod()`` through.

    Args:
        *args: Positional arguments forwarded to ``url_for`` for the
            redirect target (normally just the endpoint name).
        **kwargs: Keyword arguments forwarded to ``url_for``. A callable
            value is called at request time with the view's keyword
            arguments and replaced by its result.

    Returns:
        A decorator. The wrapped view runs when the active user exists and
        ``is_mod()`` is truthy; otherwise a redirect response is returned.
    """
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(*view_args, **view_kwargs):
            # get_active_user() returns None when the session key has no
            # matching session row; treat that as "not a moderator" instead
            # of crashing on None.is_mod().
            user = get_active_user()
            if user and user.is_mod():
                return view_func(*view_args, **view_kwargs)

            resolved = {
                key: value(**view_kwargs) if callable(value) else value
                for key, value in kwargs.items()
            }
            # url_for expands a leading "." using the current blueprint, so
            # no manual blueprint lookup is required.
            return redirect(url_for(*args, **resolved))
        return wrapper
    return decorator
def admin_only(*args, **kwargs):
    """Build a decorator that only lets users passing ``is_admin()`` through.

    Args:
        *args: Positional arguments forwarded to ``url_for`` for the
            redirect target (normally just the endpoint name).
        **kwargs: Keyword arguments forwarded to ``url_for``. A callable
            value is called at request time with the view's keyword
            arguments and replaced by its result.

    Returns:
        A decorator. The wrapped view runs when the active user exists and
        ``is_admin()`` is truthy; otherwise a redirect response is returned.
    """
    def decorator(view_func):
        @wraps(view_func)
        def wrapper(*view_args, **view_kwargs):
            # get_active_user() returns None when the session key has no
            # matching session row; treat that as "not an admin" instead of
            # crashing on None.is_admin().
            user = get_active_user()
            if user and user.is_admin():
                return view_func(*view_args, **view_kwargs)

            resolved = {
                key: value(**view_kwargs) if callable(value) else value
                for key, value in kwargs.items()
            }
            # url_for expands a leading "." using the current blueprint, so
            # no manual blueprint lookup is required.
            return redirect(url_for(*args, **resolved))
        return wrapper
    return decorator
@bp.get("/log_in")
@redirect_if_logged_in(".page", username = lambda: get_active_user().username)
def log_in():
    """Render the log-in form.

    Logged-in visitors are redirected to the ``.page`` endpoint for their
    own username by the decorator above.
    """
    template_name = "users/log_in.html"
    return render_template(template_name)
@bp.post("/log_in")
@redirect_if_logged_in(".page", username = lambda: get_active_user().username)
def log_in_post():
    """Handle the submitted log-in form.

    Reads ``username`` and ``password`` from the form. An unknown username
    and a failed password check produce the same error flash and a
    redirect back to the log-in page. On success a new session row is
    created, its key is stored in the Flask session, and the visitor is
    redirected to ``users.log_in``.
    """
    account = Users.find({"username": request.form['username']})

    # The password is only read and verified once the account is known to
    # exist; both failures deliberately share one message.
    if not account or not verify(account.password_hash, request.form['password']):
        flash("Incorrect username or password.", InfoboxKind.ERROR)
        return redirect(url_for("users.log_in"))

    fresh_session = create_session(account.id)
    session['pyrom_session_key'] = fresh_session.key
    flash("Logged in!", InfoboxKind.INFO)
    return redirect(url_for("users.log_in"))
@bp.get("/sign_up")
@redirect_if_logged_in(".page", username = lambda: get_active_user().username)
def sign_up():
    """Render the sign-up form.

    Logged-in visitors are redirected to the ``.page`` endpoint for their
    own username by the decorator above.
    """
    template_name = "users/sign_up.html"
    return render_template(template_name)
@bp.post("/sign_up")
@redirect_if_logged_in(".page", username = lambda: get_active_user().username)
def sign_up_post():
    """Handle the submitted sign-up form.

    Validation runs in this order: username format, username availability,
    password policy, password confirmation. The first failure flashes an
    error and redirects back to the sign-up page. On success a user with
    GUEST permission is created, a session is opened for it, and the
    visitor is redirected to ``users.sign_up``.
    """
    def reject(message):
        # Shared failure path: flash the reason and return to the form.
        flash(message, InfoboxKind.ERROR)
        return redirect(url_for("users.sign_up"))

    form = request.form
    username = form['username']
    password = form['password']
    password_confirm = form['password-confirm']

    if not validate_username(username):
        return reject("Invalid username.")
    if Users.count({"username": username}) > 0:
        return reject(f"Username '{username}' is already taken.")
    if not validate_password(password):
        return reject("Invalid password.")
    if password != password_confirm:
        return reject("Passwords do not match.")

    password_hash = digest(password)
    account = Users.create({
        "username": username,
        "password_hash": password_hash,
        "permission": PermissionLevel.GUEST.value,
    })

    fresh_session = create_session(account.id)
    session['pyrom_session_key'] = fresh_session.key
    flash("Signed up successfully!", InfoboxKind.INFO)
    return redirect(url_for("users.sign_up"))
@bp.get("/<username>")
def page(username):
    """Render the user page for ``username``.

    The result of ``Users.find`` is passed to the template as
    ``target_user`` without further checks.
    """
    lookup = {"username": username}
    return render_template("users/user.html", target_user = Users.find(lookup))
@bp.get("/<username>/settings")
@login_required
def settings(username):
    """Render the settings page for the logged-in user.

    If ``username`` does not exist or belongs to a different user, the
    visitor is redirected to their own settings URL.
    """
    active_user = get_active_user()
    target_user = Users.find({'username': username})
    # An unknown username yields no user; send the visitor to their own
    # settings rather than failing on the missing record.
    if not target_user or target_user.id != active_user.id:
        # redirect() takes a URL, so the endpoint must go through url_for().
        return redirect(url_for('.settings', username=active_user.username))
    return render_template('users/settings.html')
@bp.post('/<username>/settings')
@login_required
def settings_form(username):
    """Apply the submitted settings form to the logged-in user.

    The ``username`` in the URL is ignored; the user is taken from the
    session. The topic sort order and subscribe-by-default preference are
    stored in the Flask session, while the status (cut to 100 characters)
    and the signature (raw markup plus rendered HTML) are saved on the
    user record.
    """
    # The URL's username is deliberately not trusted.
    user = get_active_user()
    form = request.form

    topic_sort_by = form.get('topic_sort_by', default='activity')
    if topic_sort_by in ('activity', 'thread'):
        session['sort_by'] = topic_sort_by

    status = form.get('status', default="")[:100]
    original_sig = form.get('signature', default='')
    rendered_sig = babycode_to_html(original_sig)

    subscribe_choice = form.get('subscribe_by_default', default='on')
    session['subscribe_by_default'] = subscribe_choice == 'on'

    changes = {
        'status': status,
        'signature_original_markup': original_sig,
        'signature_rendered': rendered_sig,
    }
    user.update(changes)

    flash('Settings updated.', InfoboxKind.INFO)
    return redirect(url_for('.settings', username=user.username))
@bp.post('/<username>/set_avatar')
@login_required
def set_avatar(username):
    """Replace the logged-in user's avatar with an uploaded image.

    The ``username`` in the URL is not used; the user comes from the
    session. Users for which ``is_guest()`` is truthy, requests without an
    ``avatar`` file, and uploads that cannot be converted each get a short
    plain-text refusal. On success a new ``Avatars`` row is created, the
    user is pointed at it, the previous avatar (unless it is the default
    with id 1) is removed, and the visitor is redirected to the settings
    page.
    """
    user = get_active_user()
    if user.is_guest():
        return 'no'
    if 'avatar' not in request.files:
        return 'no!...'
    file = request.files['avatar']
    if file.filename == '':
        return 'no..?'

    file_bytes = file.read()
    now = int(time.time())
    filename = f"u{user.id}d{now}.webp"
    upload_dir = current_app.config['AVATAR_UPLOAD_PATH']
    output_path = os.path.join(upload_dir, filename)
    proxied_filename = f"/static/avatars/{filename}"

    if not validate_and_create_avatar(file_bytes, output_path):
        return 'uhhhh no'

    avatar = Avatars.create({
        'file_path': proxied_filename,
        'uploaded_at': now,
    })
    old_avatar = Avatars.find({'id': user.avatar_id})
    user.update({'avatar_id': avatar.id})

    # Delete the old avatar, but never the default one (id 1).
    if old_avatar and int(old_avatar.id) != 1:
        old_path = os.path.join(upload_dir, os.path.basename(old_avatar.file_path))
        # File names have one-second resolution, so a second upload within
        # the same second reuses the name; removing it would delete the
        # avatar that was just written.
        if old_path != output_path:
            try:
                os.remove(old_path)
            except FileNotFoundError:
                # The file is already gone; still drop the stale row below
                # so the database does not keep an orphaned avatar.
                pass
        old_avatar.delete()

    flash('Avatar updated.', InfoboxKind.INFO)
    return redirect(url_for('.settings', username=user.username))
@bp.post('/<username>/clear_avatar')
@login_required
def clear_avatar(username):
    """Reset the logged-in user's avatar to the default (id 1).

    The ``username`` in the URL is not used; the user comes from the
    session. If ``is_default_avatar()`` is already truthy the request is
    refused with a plain-text reply. Otherwise the user is pointed at
    avatar 1, the previous avatar's file and row are removed, and the
    visitor is redirected to the settings page.
    """
    user = get_active_user()
    if user.is_default_avatar():
        return 'no'

    old_avatar = Avatars.find({'id': user.avatar_id})
    user.update({'avatar_id': 1})

    # Delete the old avatar; tolerate a missing row or a missing file so
    # the reset itself still completes.
    if old_avatar:
        old_path = os.path.join(
            current_app.config['AVATAR_UPLOAD_PATH'],
            os.path.basename(old_avatar.file_path),
        )
        try:
            os.remove(old_path)
        except FileNotFoundError:
            # Already gone from disk; the row is still removed below.
            pass
        old_avatar.delete()

    return redirect(url_for('.settings', username=user.username))
@bp.post("/log_out")
@login_required
def log_out():
    """Log the visitor out.

    Deletes the ``Sessions`` row matching the key in the Flask session (if
    one exists), clears the Flask session, and redirects to the log-in
    page.
    """
    session_obj = Sessions.find({"key": session['pyrom_session_key']})
    # The row may already be gone; the cookie must still be cleared,
    # otherwise the visitor could never log out.
    if session_obj:
        session_obj.delete()
    session.clear()
    return redirect(url_for(".log_in"))
@bp.post("/confirm_user/<user_id>")
@login_required
@mod_only("topics.all_topics")
def confirm_user(user_id):
    """Promote a GUEST-level user to USER and record the confirmation time.

    Replies with plain-text "no" when the user does not exist or already
    has a permission above GUEST. Otherwise redirects to the user's page.
    """
    target_user = Users.find({"id": user_id})
    if not target_user or int(target_user.permission) > PermissionLevel.GUEST.value:
        return "no"

    confirmation = {
        "permission": PermissionLevel.USER.value,
        "confirmed_on": int(time.time()),
    }
    target_user.update(confirmation)
    return redirect(url_for(".page", username=target_user.username))
@bp.post("/mod_user/<user_id>")
@login_required
@admin_only("topics.all_topics")
def mod_user(user_id):
    """Set a user's permission to MODERATOR.

    Replies with plain-text "no" when the user does not exist or
    ``is_mod()`` is already truthy. Otherwise redirects to the user's page.
    """
    target_user = Users.find({"id": user_id})
    if not target_user or target_user.is_mod():
        return "no"

    promotion = {"permission": PermissionLevel.MODERATOR.value}
    target_user.update(promotion)
    return redirect(url_for(".page", username=target_user.username))
@bp.post("/demod_user/<user_id>")
@login_required
@admin_only("topics.all_topics")
def demod_user(user_id):
    """Set a moderator's permission back to USER.

    Replies with plain-text "no" when the user does not exist or
    ``is_mod()`` is falsy. Otherwise redirects to the user's page.
    """
    target_user = Users.find({"id": user_id})
    if not (target_user and target_user.is_mod()):
        return "no"

    demotion = {"permission": PermissionLevel.USER.value}
    target_user.update(demotion)
    return redirect(url_for(".page", username=target_user.username))
@bp.post("/guest_user/<user_id>")
@login_required
@admin_only("topics.all_topics")
def guest_user(user_id):
    """Set a user's permission to GUEST.

    Replies with plain-text "no" when the user does not exist or
    ``is_mod()`` is truthy. Otherwise redirects to the user's page.
    """
    target_user = Users.find({"id": user_id})
    if not target_user or target_user.is_mod():
        return "no"

    demotion = {"permission": PermissionLevel.GUEST.value}
    target_user.update(demotion)
    return redirect(url_for(".page", username=target_user.username))
@bp.get("/<username>/inbox")
@login_required
def inbox(username):
    """Render the logged-in user's inbox of unread posts in subscribed threads.

    A request for another user's inbox is redirected to the visitor's own.
    When the user has at least one subscription, posts created after each
    subscription's ``last_seen`` are fetched and grouped per thread
    (threads ordered by newest post first, posts oldest first within a
    thread). The template receives ``new_posts`` (list of thread groups),
    ``total_unreads_count`` (sum of per-thread unread counts, or None with
    no subscriptions) and ``all_subscriptions`` (or None).
    """
    user = get_active_user()
    if username != user.username:
        return redirect(url_for(".inbox", username = user.username))

    new_posts = []
    all_subscriptions = None
    total_unreads_count = None

    if Subscriptions.find({"user_id": user.id}):
        all_subscriptions = user.get_all_subscriptions()
        q = """
WITH thread_metadata AS (
SELECT
posts.thread_id, threads.slug AS thread_slug, threads.title AS thread_title, COUNT(*) AS unread_count, MAX(posts.created_at) AS newest_post_time
FROM
posts
LEFT JOIN
threads ON threads.id = posts.thread_id
LEFT JOIN
subscriptions ON subscriptions.thread_id = posts.thread_id
WHERE subscriptions.user_id = ? AND posts.created_at > subscriptions.last_seen
GROUP BY posts.thread_id
)
SELECT
tm.thread_id, tm.thread_slug, tm.thread_title, tm.unread_count, tm.newest_post_time,
posts.id, posts.created_at, post_history.content, post_history.edited_at, users.username, users.status, avatars.file_path AS avatar_path, posts.thread_id, users.id AS user_id, post_history.original_markup, users.signature_rendered
FROM
thread_metadata tm
JOIN
posts ON posts.thread_id = tm.thread_id
JOIN
post_history ON posts.current_revision_id = post_history.id
JOIN
users ON posts.user_id = users.id
LEFT JOIN
threads ON threads.id = posts.thread_id
LEFT JOIN
avatars ON users.avatar_id = avatars.id
LEFT JOIN
subscriptions ON subscriptions.thread_id = posts.thread_id
WHERE
subscriptions.user_id = ? AND posts.created_at > subscriptions.last_seen
ORDER BY
tm.newest_post_time DESC, posts.created_at ASC"""
        # Columns copied into each thread group / each post entry.
        thread_columns = (
            'thread_id', 'thread_title', 'unread_count',
            'thread_slug', 'newest_post_time',
        )
        post_columns = (
            'id', 'created_at', 'content', 'edited_at', 'username',
            'status', 'avatar_path', 'thread_id', 'user_id',
            'original_markup', 'signature_rendered',
        )

        rows = db.query(q, user.id, user.id)
        total_unreads_count = 0
        last_thread_id = None
        group = None
        for row in rows:
            # Rows arrive ordered by thread, so a change of thread_id marks
            # the start of the next group.
            if row['thread_id'] != last_thread_id:
                group = {column: row[column] for column in thread_columns}
                group['posts'] = []
                total_unreads_count += int(row['unread_count'])
                new_posts.append(group)
                last_thread_id = row['thread_id']
            group['posts'].append({column: row[column] for column in post_columns})

    return render_template("users/inbox.html", new_posts = new_posts, total_unreads_count = total_unreads_count, all_subscriptions = all_subscriptions)