Files
pyrom/app/routes/users.py
2025-12-04 10:27:49 +03:00

846 lines
28 KiB
Python

from flask import (
Blueprint, render_template, request, redirect, url_for, flash, session, current_app, abort
)
from functools import wraps
from ..db import db
from ..lib.babycode import babycode_to_html, BABYCODE_VERSION
from ..models import (
Users, Sessions, Subscriptions,
Avatars, PasswordResetLinks, InviteKeys,
BookmarkCollections, BookmarkedThreads,
Mentions, PostHistory,
)
from ..constants import InfoboxKind, PermissionLevel, SIG_BANNED_TAGS
from ..auth import digest, verify
from wand.image import Image
from wand.exceptions import WandException
from datetime import datetime, timedelta
import secrets
import time
import re
import os
bp = Blueprint("users", __name__, url_prefix = "/users/")
def validate_and_create_avatar(input_image, filename):
try:
with Image(blob=input_image) as img:
img.strip()
img.gravity = 'center'
width, height = img.width, img.height
min_dim = min(width, height)
if min_dim > 256:
ratio = 256.0 / min_dim
new_width = int(width * ratio)
new_height = int(height * ratio)
img.resize(new_width, new_height)
width, height = img.width, img.height
crop_size = min(width, height)
x_offset = (width - crop_size) // 2
y_offset = (height - crop_size) // 2
img.crop(left=x_offset, top=y_offset,
width=crop_size, height=crop_size)
img.resize(256, 256)
img.format = 'webp'
img.compression_quality = 85
img.save(filename=filename)
return True
except WandException:
return False
def is_logged_in():
return "pyrom_session_key" in session
def get_active_user():
if not is_logged_in():
return None
sess = Sessions.find({"key": session["pyrom_session_key"]})
if not sess:
return None
return Users.find({"id": sess.user_id})
def create_session(user_id):
key = secrets.token_hex(16)
expires_at = int(time.time()) + 31 * 24 * 60 * 60
s = Sessions.create({
"key": key,
"user_id": user_id,
"expires_at": expires_at,
})
return s
def extend_session(user_id):
session_obj = Sessions.find({'key': session['pyrom_session_key']})
if not session_obj:
return
new_duration = timedelta(31)
current_app.permanent_session_lifetime = new_duration
session.modified = True
session_obj.update({
'expires_at': int(time.time()) + 31 * 24 * 60 * 60
})
def validate_password(password):
pattern = r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[\W_])(?!.*\s).{10,255}$'
return bool(re.fullmatch(pattern, password))
def validate_username(username):
pattern = r'^[a-zA-Z0-9_-]{3,20}$'
return bool(re.fullmatch(pattern, username))
def validate_display_name(display_name):
if not display_name:
return True
pattern = r'^[\w!#$%^*\(\)\-_=+\[\]\{\}\|;:,.?\s]{3,50}$'
display_name = display_name.replace('@', '_')
return bool(re.fullmatch(pattern, display_name))
def redirect_if_logged_in(*args, **kwargs):
def decorator(view_func):
@wraps(view_func)
def wrapper(*view_args, **view_kwargs):
if is_logged_in():
# resolve callables
processed_kwargs = {
k: v(**view_kwargs) if callable(v) else v
for k, v in kwargs.items()
}
endpoint = args[0] if args else processed_kwargs.get("endpoint")
if endpoint.startswith("."):
blueprint = current_app.blueprints.get(view_func.__name__.split(".")[0])
if blueprint:
endpoint = endpoint.lstrip(".")
return redirect(url_for(f"{blueprint.name}.{endpoint}", **processed_kwargs))
return redirect(url_for(*args, **processed_kwargs))
return view_func(*view_args, **view_kwargs)
return wrapper
return decorator
def redirect_to_own(view_func):
@wraps(view_func)
def wrapper(username, *args, **kwargs):
user = get_active_user()
if username.lower() != user.username:
view_args = dict(request.view_args)
view_args.pop('username', None)
new_args = {**view_args, 'username': user.username}
return redirect(url_for(request.endpoint, **new_args))
return view_func(username, *args, **kwargs)
return wrapper
def login_required(view_func):
@wraps(view_func)
def wrapper(*args, **kwargs):
if not is_logged_in():
return redirect(url_for("users.log_in"))
return view_func(*args, **kwargs)
return wrapper
def mod_only(*args, **kwargs):
def decorator(view_func):
@wraps(view_func)
def wrapper(*view_args, **view_kwargs):
if not get_active_user().is_mod():
# resolve callables
processed_kwargs = {
k: v(**view_kwargs) if callable(v) else v
for k, v in kwargs.items()
}
endpoint = args[0] if args else processed_kwargs.get("endpoint")
if endpoint.startswith("."):
blueprint = current_app.blueprints.get(view_func.__name__.split(".")[0])
if blueprint:
endpoint = endpoint.lstrip(".")
return redirect(url_for(f"{blueprint.name}.{endpoint}", **processed_kwargs))
return redirect(url_for(*args, **processed_kwargs))
return view_func(*view_args, **view_kwargs)
return wrapper
return decorator
def admin_only(*args, **kwargs):
def decorator(view_func):
@wraps(view_func)
def wrapper(*view_args, **view_kwargs):
if not get_active_user().is_admin():
# resolve callables
processed_kwargs = {
k: v(**view_kwargs) if callable(v) else v
for k, v in kwargs.items()
}
endpoint = args[0] if args else processed_kwargs.get("endpoint")
if endpoint.startswith("."):
blueprint = current_app.blueprints.get(view_func.__name__.split(".")[0])
if blueprint:
endpoint = endpoint.lstrip(".")
return redirect(url_for(f"{blueprint.name}.{endpoint}", **processed_kwargs))
return redirect(url_for(*args, **processed_kwargs))
return view_func(*view_args, **view_kwargs)
return wrapper
return decorator
def get_prefers_theme():
if not 'theme' in session:
return 'style'
if session['theme'] not in current_app.config['allowed_themes']:
return 'style'
return session['theme']
def anonymize_user(user_id):
deleted_user = Users.find({'username': 'deleteduser'})
from ..models import Threads, Posts
from ..lib.babycode import sanitize
threads = Threads.findall({'user_id': user_id})
posts = Posts.findall({'user_id': user_id})
revs_q = """SELECT DISTINCT m.revision_id FROM mentions m
WHERE m.mentioned_user_id = ?"""
mentioned_revs = db.query(revs_q, int(user_id))
with db.transaction():
for thread in threads:
thread.update({'user_id': int(deleted_user.id)})
for post in posts:
post.update({'user_id': int(deleted_user.id)})
revs = {}
for rev in mentioned_revs:
ph = PostHistory.find({'id': int(rev['revision_id'])})
ms = Mentions.findall({
'mentioned_user_id': int(user_id),
'revision_id': int(rev['revision_id'])
})
data = {
'text': sanitize(ph.original_markup),
'mentions': ms,
}
data['mentions'] = sorted(data['mentions'], key=lambda x: int(x.end_index), reverse=True)
revs[rev['revision_id']] = data
for rev_id, data in revs.items():
text = data['text']
for mention in data['mentions']:
text = text[:mention.start_index] + '@deleteduser' + text[mention.end_index:]
mention.delete()
res = babycode_to_html(text)
ph = PostHistory.find({'id': int(rev_id)})
ph.update({
'original_markup': text.unescape(),
'content': res.result,
})
@bp.get("/log_in")
@redirect_if_logged_in(".page", username = lambda: get_active_user().username)
def log_in():
return render_template("users/log_in.html")
@bp.post("/log_in")
@redirect_if_logged_in(".page", username = lambda: get_active_user().username)
def log_in_post():
target_user = Users.find({
"username": request.form['username'].lower()
})
if not target_user:
flash("Incorrect username or password.", InfoboxKind.ERROR)
return redirect(url_for("users.log_in"))
if not verify(target_user.password_hash, request.form['password']):
flash("Incorrect username or password.", InfoboxKind.ERROR)
return redirect(url_for("users.log_in"))
session_obj = create_session(target_user.id)
session['pyrom_session_key'] = session_obj.key
flash("Logged in!", InfoboxKind.INFO)
return redirect(url_for("users.log_in"))
@bp.get("/sign_up")
@redirect_if_logged_in(".page", username = lambda: get_active_user().username)
def sign_up():
if current_app.config['DISABLE_SIGNUP']:
key = request.args.get('key', default=None)
if key is None:
return redirect(url_for('topics.all_topics'))
invite = InviteKeys.find({'key': key})
if not invite:
return redirect(url_for('topics.all_topics'))
inviter = Users.find({'id': invite.created_by})
return render_template("users/sign_up.html", inviter=inviter, key=key)
return render_template("users/sign_up.html")
@bp.post("/sign_up")
@redirect_if_logged_in(".page", username = lambda: get_active_user().username)
def sign_up_post():
key = request.form.get('key', default=None)
if current_app.config['DISABLE_SIGNUP']:
if not key:
return redirect(url_for("topics.all_topics"))
invite_key = InviteKeys.find({'key': key})
if not invite_key:
return redirect(url_for("topics.all_topics"))
username = request.form['username']
password = request.form['password']
password_confirm = request.form['password-confirm']
if not validate_username(username):
flash("Invalid username.", InfoboxKind.ERROR)
return redirect(url_for("users.sign_up", key=key))
user_exists = Users.count({"username": username.lower()}) > 0
if user_exists:
flash(f"Username '{username}' is already taken.", InfoboxKind.ERROR)
return redirect(url_for("users.sign_up", key=key))
if not validate_password(password):
flash("Invalid password.", InfoboxKind.ERROR)
return redirect(url_for("users.sign_up", key=key))
if password != password_confirm:
flash("Passwords do not match.", InfoboxKind.ERROR)
return redirect(url_for("users.sign_up", key=key))
hashed = digest(password)
if username.lower() != username:
display_name = username
else:
display_name = ''
with db.transaction():
new_user = Users.create({
"username": username.lower(),
'display_name': display_name,
"password_hash": hashed,
"permission": PermissionLevel.GUEST.value,
})
BookmarkCollections.create_default(new_user.id)
if current_app.config['DISABLE_SIGNUP']:
invite_key = InviteKeys.find({'key': key})
new_user.update({
'invited_by': invite_key.created_by,
'permission': PermissionLevel.USER.value,
})
invite_key.delete()
session_obj = create_session(new_user.id)
session['pyrom_session_key'] = session_obj.key
flash("Signed up successfully!", InfoboxKind.INFO)
return redirect(url_for("topics.all_topics"))
@bp.get("/<username>")
def page(username):
target_user = Users.find({"username": username.lower()})
if not target_user:
abort(404)
return render_template("users/user.html", target_user = target_user)
@bp.get("/<username>/settings")
@login_required
@redirect_to_own
def settings(username):
return render_template('users/settings.html')
@bp.post('/<username>/settings')
@login_required
@redirect_to_own
def settings_form(username):
# we silently ignore the passed username
# and grab the correct user from the session
user = get_active_user()
theme = request.form.get('theme', default='style')
if theme == 'style':
if 'theme' in session:
session.pop('theme')
else:
session['theme'] = theme
topic_sort_by = request.form.get('topic_sort_by', default='activity')
if topic_sort_by == 'activity' or topic_sort_by == 'thread':
sort_by = session['sort_by'] = topic_sort_by
status = request.form.get('status', default="")[:100]
original_sig = request.form.get('signature', default='').strip()
if original_sig:
rendered_sig = babycode_to_html(original_sig, SIG_BANNED_TAGS).result
else:
rendered_sig = ''
session['subscribe_by_default'] = request.form.get('subscribe_by_default', default='off') == 'on'
display_name = request.form.get('display_name', default='')
if not validate_display_name(display_name):
flash('Invalid display name.', InfoboxKind.ERROR)
return redirect('.settings', username=user.username)
old_dn = user.display_name
user.update({
'status': status,
'signature_original_markup': original_sig,
'signature_rendered': rendered_sig,
'signature_format_version': BABYCODE_VERSION,
'signature_markup_language': 'babycode',
'display_name': display_name,
})
if old_dn != display_name:
# re-parse mentions
q = """SELECT DISTINCT m.revision_id FROM mentions m
JOIN post_history ph ON m.revision_id = ph.id
JOIN posts p ON p.current_revision_id = ph.id
WHERE m.mentioned_user_id = ?"""
mentions = db.query(q, int(user.id))
with db.transaction():
for mention in mentions:
rev = PostHistory.find({'id': int(mention['revision_id'])})
parsed_content = babycode_to_html(rev.original_markup).result
rev.update({'content': parsed_content})
flash('Settings updated.', InfoboxKind.INFO)
return redirect(url_for('.settings', username=user.username))
@bp.post('/<username>/set_avatar')
@login_required
@redirect_to_own
def set_avatar(username):
user = get_active_user()
if user.is_guest():
flash('You are a guest. Your account must be confirmed by a moderator to perform this action.', InfoboxKind.ERROR)
return redirect(url_for('.settings', username=user.username))
if 'avatar' not in request.files:
flash('Avatar missing.', InfoboxKind.ERROR)
return redirect(url_for('.settings', username=user.username))
file = request.files['avatar']
if file.filename == '':
flash('Avatar missing.', InfoboxKind.ERROR)
return redirect(url_for('.settings', username=user.username))
file_bytes = file.read()
now = int(time.time())
filename = f"u{user.id}d{now}.webp"
output_path = os.path.join(current_app.config['AVATAR_UPLOAD_PATH'], filename)
proxied_filename = f"/static/avatars/{filename}"
res = validate_and_create_avatar(file_bytes, output_path)
if res:
flash('Avatar updated.', InfoboxKind.INFO)
avatar = Avatars.create({
'file_path': proxied_filename,
'uploaded_at': now,
})
old_avatar = Avatars.find({'id': user.avatar_id})
user.update({'avatar_id': avatar.id})
if int(old_avatar.id) != 1:
# delete old avi, but not default
filename = os.path.join(current_app.config['AVATAR_UPLOAD_PATH'], os.path.basename(old_avatar.file_path))
os.remove(filename)
old_avatar.delete()
return redirect(url_for('.settings', username=user.username))
else:
flash('Something went wrong. Please try again later.', InfoboxKind.WARN)
return redirect(url_for('.settings', username=user.username))
@bp.post('/<username>/change_password')
@login_required
@redirect_to_own
def change_password(username):
user = get_active_user()
password = request.form.get('new_password')
password2 = request.form.get('new_password2')
if not validate_password(password):
flash("Invalid password.", InfoboxKind.ERROR)
return redirect(url_for('.settings', username=user.username))
if password != password2:
flash("Passwords do not match.", InfoboxKind.ERROR)
return redirect(url_for('.settings', username=user.username))
hashed = digest(password)
user.update({'password_hash': hashed})
extend_session(user.id)
flash('Password updated.', InfoboxKind.INFO)
return redirect(url_for('.settings', username=user.username))
@bp.post('/<username>/clear_avatar')
@login_required
@redirect_to_own
def clear_avatar(username):
user = get_active_user()
if user.is_default_avatar():
return redirect(url_for('.settings', user.username))
old_avatar = Avatars.find({'id': user.avatar_id})
user.update({'avatar_id': 1})
# delete old avi
filename = os.path.join(current_app.config['AVATAR_UPLOAD_PATH'], os.path.basename(old_avatar.file_path))
os.remove(filename)
old_avatar.delete()
return redirect(url_for('.settings', username=user.username))
@bp.post("/log_out")
@login_required
def log_out():
user = get_active_user()
session_obj = Sessions.find({"key": session['pyrom_session_key']})
session_obj.delete()
session.clear()
return redirect(url_for(".log_in"))
@bp.post("/confirm_user/<user_id>")
@login_required
@mod_only("topics.all_topics")
def confirm_user(user_id):
target_user = Users.find({"id": user_id})
if not target_user:
return redirect(url_for('.all_topics'))
if int(target_user.permission) > PermissionLevel.GUEST.value:
return redirect(url_for('.page', username=target_user.username))
target_user.update({
"permission": PermissionLevel.USER.value,
"confirmed_on": int(time.time()),
})
return redirect(url_for(".page", username=target_user.username))
@bp.post("/mod_user/<user_id>")
@login_required
@admin_only("topics.all_topics")
def mod_user(user_id):
target_user = Users.find({"id": user_id})
if not target_user:
return redirect(url_for('.all_topics'))
if target_user.is_mod():
return redirect(url_for('.page', username=target_user.username))
target_user.update({
"permission": PermissionLevel.MODERATOR.value,
})
return redirect(url_for(".page", username=target_user.username))
@bp.post("/demod_user/<user_id>")
@login_required
@admin_only("topics.all_topics")
def demod_user(user_id):
target_user = Users.find({"id": user_id})
if not target_user:
return redirect(url_for('.all_topics'))
if not target_user.is_mod():
return redirect(url_for('.page', username=target_user.username))
target_user.update({
"permission": PermissionLevel.USER.value,
})
return redirect(url_for(".page", username=target_user.username))
@bp.post("/guest_user/<user_id>")
@login_required
@mod_only("topics.all_topics")
def guest_user(user_id):
target_user = Users.find({"id": user_id})
if not target_user:
return redirect(url_for('.all_topics'))
if get_active_user().is_mod_only() and target_user.is_mod():
return redirect(url_for('.page', username=target_user.username))
target_user.update({
"permission": PermissionLevel.GUEST.value,
})
return redirect(url_for(".page", username=target_user.username))
@bp.get("/<username>/inbox")
@login_required
@redirect_to_own
def inbox(username):
user = get_active_user()
new_posts = []
subscription = Subscriptions.find({"user_id": user.id})
all_subscriptions = None
total_unreads_count = None
if subscription:
all_subscriptions = user.get_all_subscriptions()
q = """
WITH thread_metadata AS (
SELECT
posts.thread_id, threads.slug AS thread_slug, threads.title AS thread_title, COUNT(*) AS unread_count, MAX(posts.created_at) AS newest_post_time
FROM
posts
LEFT JOIN
threads ON threads.id = posts.thread_id
LEFT JOIN
subscriptions ON subscriptions.thread_id = posts.thread_id
WHERE subscriptions.user_id = ? AND posts.created_at > subscriptions.last_seen
GROUP BY posts.thread_id
)
SELECT
tm.thread_id, tm.thread_slug, tm.thread_title, tm.unread_count, tm.newest_post_time,
posts.id, posts.created_at, post_history.content, post_history.edited_at, users.username, users.status, avatars.file_path AS avatar_path, posts.thread_id, users.id AS user_id, post_history.original_markup, users.signature_rendered
FROM
thread_metadata tm
JOIN
posts ON posts.thread_id = tm.thread_id
JOIN
post_history ON posts.current_revision_id = post_history.id
JOIN
users ON posts.user_id = users.id
LEFT JOIN
threads ON threads.id = posts.thread_id
LEFT JOIN
avatars ON users.avatar_id = avatars.id
LEFT JOIN
subscriptions ON subscriptions.thread_id = posts.thread_id
WHERE
subscriptions.user_id = ? AND posts.created_at > subscriptions.last_seen
ORDER BY
tm.newest_post_time DESC, posts.created_at ASC"""
new_posts_raw = db.query(q, user.id, user.id)
current_thread_id = None
current_thread_group = None
total_unreads_count = 0
for row in new_posts_raw:
if row['thread_id'] != current_thread_id:
current_thread_group = {
'thread_id': row['thread_id'],
'thread_title': row['thread_title'],
'unread_count': row['unread_count'],
'thread_slug': row['thread_slug'],
'newest_post_time': row['newest_post_time'],
'posts': [],
}
total_unreads_count += int(row['unread_count'])
new_posts.append(current_thread_group)
current_thread_id = row['thread_id']
current_thread_group['posts'].append({
'id': row['id'],
'created_at': row['created_at'],
'content': row['content'],
'edited_at': row['edited_at'],
'username': row['username'],
'status': row['status'],
'avatar_path': row['avatar_path'],
'thread_id': row['thread_id'],
'user_id': row['user_id'],
'original_markup': row['original_markup'],
'signature_rendered': row['signature_rendered'],
'thread_slug': row['thread_slug'],
})
return render_template("users/inbox.html", new_posts = new_posts, total_unreads_count = total_unreads_count, all_subscriptions = all_subscriptions)
@bp.get('/reset-link/<key>')
def reset_link_login(key):
reset_link = PasswordResetLinks.find({
'key': key
})
if not reset_link:
return redirect(url_for('topics.all_topics'))
if int(time.time()) > int(reset_link.expires_at):
reset_link.delete()
return redirect(url_for('topics.all_topics'))
target_user = Users.find({
'id': reset_link.user_id
})
return render_template('users/reset_link_login.html', username = target_user.username)
@bp.post('/reset-link/<key>')
def reset_link_login_form(key):
reset_link = PasswordResetLinks.find({
'key': key
})
if not reset_link:
return redirect('topics.all_topics')
if int(time.time()) > int(reset_link.expires_at):
reset_link.delete()
return redirect('topics.all_topics')
password = request.form.get('password')
password2 = request.form.get('password2')
if not validate_password(password):
flash("Invalid password.", InfoboxKind.ERROR)
return redirect(url_for('.reset_link_login', key=key))
if password != password2:
flash("Passwords do not match.", InfoboxKind.ERROR)
return redirect(url_for('.reset_link_login', key=key))
target_user = Users.find({
'id': reset_link.user_id
})
reset_link.delete()
hashed = digest(password)
target_user.update({'password_hash': hashed})
session_obj = create_session(target_user.id)
session['pyrom_session_key'] = session_obj.key
flash("Logged in!", InfoboxKind.INFO)
return redirect(url_for('.page', username=target_user.username))
@bp.get('/<username>/invite-links/')
@login_required
@redirect_to_own
def invite_links(username):
target_user = Users.find({
'username': username.lower()
})
if not target_user or not target_user.can_invite():
return redirect(url_for('.page', username=username))
invites = InviteKeys.findall({
'created_by': target_user.id
})
return render_template('users/invite_links.html', invites=invites)
@bp.post('/<username>/invite-links/create')
@login_required
@redirect_to_own
def create_invite_link(username):
target_user = Users.find({
'username': username.lower()
})
if not target_user or not target_user.can_invite():
return redirect(url_for('.page', username=username.lower()))
invite = InviteKeys.create({
'created_by': target_user.id,
'key': secrets.token_urlsafe(20),
})
return redirect(url_for('.invite_links', username=target_user.username))
@bp.post('/<username>/invite-links/revoke')
@login_required
@redirect_to_own
def revoke_invite_link(username):
target_user = Users.find({
'username': username.lower()
})
if not target_user or not target_user.can_invite():
return redirect(url_for('.page', username=username.lower()))
invite = InviteKeys.find({
'key': request.form.get('key'),
})
if not invite:
return redirect(url_for('.invite_links', username=target_user.username))
if invite.created_by != target_user.id:
return redirect(url_for('.invite_links', username=target_user.username))
invite.delete()
return redirect(url_for('.invite_links', username=target_user.username))
@bp.get('/<username>/bookmarks')
@login_required
@redirect_to_own
def bookmarks(username):
target_user = get_active_user()
collections = target_user.get_bookmark_collections()
return render_template('users/bookmarks.html', collections=collections)
@bp.get('/<username>/bookmarks/collections')
@login_required
@redirect_to_own
def bookmark_collections(username):
target_user = get_active_user()
collections = target_user.get_bookmark_collections()
return render_template('users/bookmark_collections.html', collections=collections)
@bp.get('/<username>/delete-account')
@login_required
@redirect_to_own
def delete_page(username):
target_user = get_active_user()
return render_template('users/delete_page.html')
@bp.post('/<username>/delete-account')
@login_required
@redirect_to_own
def delete_page_confirm(username):
target_user = get_active_user()
password = request.form.get('password', default='')
if not verify(target_user.password_hash, password):
flash('Incorrect password.', InfoboxKind.ERROR)
return redirect(url_for('.delete_page', username=username))
anonymize_user(target_user.id)
sessions = Sessions.findall({'user_id': int(target_user.id)})
for session_obj in sessions:
session_obj.delete()
session.clear()
target_user.delete()
return redirect(url_for('topics.all_topics'))