Files
pyrom/app/routes/users.py
2026-05-22 00:39:27 +03:00

381 lines
12 KiB
Python

from flask import (
Blueprint, redirect, url_for,
render_template, request, session,
abort, flash, current_app
)
from functools import wraps
from secrets import compare_digest as compare_timesafe
from wand.image import Image
from wand.exceptions import WandException
from ..auth import (
digest, verify, create_session,
is_logged_in, parse_username, is_password_valid,
login_required, revoke_session, get_active_user,
parse_display_name, revoke_all_sessions
)
from ..models import Users, Posts, Reactions, Threads, Avatars
from ..constants import PermissionLevel, InfoboxKind
from ..util import get_form_checkbox
import math
import os
import time
# Upper bound, in bytes, for an uploaded avatar file; set_avatar rejects
# anything larger before the image is decoded.
AVATAR_MAX_SIZE = 1000 * 1000 # 1MB
# Blueprint that every route in this module is registered on; all of its
# URLs live under the /users/ prefix.
bp = Blueprint('users', __name__, url_prefix='/users/')
def _render_avatar(img, filename):
    """Strip, centre-crop and scale *img* to a 256x256 WebP saved at *filename*."""
    img.strip()
    img.gravity = 'center'

    # Shrink first so that the shorter side is at most 256 pixels.
    width, height = img.width, img.height
    min_dim = min(width, height)
    if min_dim > 256:
        ratio = 256.0 / min_dim
        img.resize(int(width * ratio), int(height * ratio))

    # Cut the largest centred square out of whatever size we have now.
    width, height = img.width, img.height
    crop_size = min(width, height)
    img.crop(
        left=(width - crop_size) // 2,
        top=(height - crop_size) // 2,
        width=crop_size,
        height=crop_size,
    )

    # Final resize guarantees exactly 256x256, including for inputs that
    # were smaller than 256 pixels or lost a pixel to int() truncation.
    img.resize(256, 256)
    img.format = 'webp'
    img.compression_quality = 85
    img.save(filename=filename)


def validate_and_create_avatar(input_image, filename):
    """Turn uploaded image bytes into a square 256x256 WebP avatar on disk.

    Args:
        input_image: Raw bytes of the uploaded image.
        filename: Destination path the WebP file is written to.

    Returns:
        True when the avatar was written, False when Wand raised a
        WandException while decoding or processing the image.
    """
    try:
        with Image(blob=input_image) as source:
            animated = hasattr(source, 'sequence') and len(source.sequence) > 1
            if animated:
                # Only the first frame of a multi-frame image is used. The
                # copy is a separate Wand resource, so it gets its own
                # `with` to be released deterministically instead of being
                # left for the garbage collector.
                with Image(image=source.sequence[0]) as first_frame:
                    _render_avatar(first_frame, filename)
            else:
                _render_avatar(source, filename)
        return True
    except WandException:
        return False
def redirect_if_logged_in(destination='topics.all_topics'):
    """Build a view decorator that sends logged-in visitors elsewhere.

    Args:
        destination: Endpoint name passed to url_for for the redirect.

    Returns:
        A decorator; the decorated view runs only when is_logged_in() is
        false, otherwise a redirect to *destination* is returned.
    """
    def apply(view_func):
        @wraps(view_func)
        def guarded(*args, **kwargs):
            if not is_logged_in():
                return view_func(*args, **kwargs)
            return redirect(url_for(destination))
        return guarded
    return apply
def redirect_to_own(view_func):
    """Decorate a ``<username>`` view so it is only served under the active user's name.

    When the lower-cased username from the URL differs from the active
    user's username, the request is redirected to the same endpoint with
    the username replaced by the active user's; all other view arguments
    are carried over unchanged.
    """
    @wraps(view_func)
    def guarded(username, *args, **kwargs):
        active = get_active_user()
        if username.lower() == active.username:
            return view_func(username, *args, **kwargs)
        target_args = dict(request.view_args)
        target_args.pop('username', None)
        target_args['username'] = active.username
        return redirect(url_for(request.endpoint, **target_args))
    return guarded
def user_required(view_func):
    """Decorate a ``<username>`` view so that guest accounts get a 403.

    The wrapped view is called unchanged when the active user's
    is_guest() returns a falsy value.
    """
    @wraps(view_func)
    def guarded(username, *args, **kwargs):
        if get_active_user().is_guest():
            abort(403)
        return view_func(username, *args, **kwargs)
    return guarded
@bp.get('/log-in/')
@redirect_if_logged_in()
def log_in():
    """Render the log-in form."""
    template_name = 'users/log_in.html'
    return render_template(template_name)
def _safe_return_target(target, fallback):
    """Return *target* if it is a same-site absolute path, else *fallback*.

    The value comes straight from the submitted form, so anything that
    could make the browser leave this site (a scheme, a host, a
    protocol-relative ``//host`` prefix) is refused.
    """
    from urllib.parse import urlsplit

    if not target or not target.startswith('/') or target.startswith('//'):
        return fallback
    # Browsers treat backslashes as slashes and silently drop tabs and line
    # breaks, so any of these could smuggle a host past the checks above.
    if any(ch in target for ch in '\\\r\n\t'):
        return fallback
    parts = urlsplit(target)
    if parts.scheme or parts.netloc:
        return fallback
    return target


@bp.post('/log-in/')
@redirect_if_logged_in()
def log_in_post():
    """Check the submitted credentials and start a session.

    Reads ``username``, ``password``, ``remember`` and ``return_to`` from
    the form. On failure the visitor is redirected back to the log-in page
    with the same error text whether the username or the password was
    wrong. On success the session key is stored in the Flask session and
    the visitor is redirected to ``return_to`` when that is a same-site
    path, otherwise to the topic list.
    """
    username = request.form.get('username', default='').lower()
    user = Users.find({'username': username})
    if not user:
        return redirect(url_for('.log_in', error='The username or password you entered is incorrect.'))

    password = request.form.get('password', default='')
    if not verify(user.password_hash, password):
        return redirect(url_for('.log_in', error='The username or password you entered is incorrect.'))

    session['remember'] = request.form.get('remember') == 'on'
    sess = create_session(user.id, not session['remember'])
    session['pyrom_session_key'] = sess.key
    if session['remember']:
        session.permanent = True

    # return_to is attacker-controllable form data: only follow it when it
    # stays on this site, and fall back when it is missing or empty.
    fallback = url_for('topics.all_topics')
    return redirect(_safe_return_target(request.form.get('return_to'), fallback))
@bp.post('/log-out/')
@login_required
def log_out():
    """Revoke the active user's session and return to the topic list."""
    active_user = get_active_user()
    revoke_session(active_user.id)
    landing_url = url_for('topics.all_topics')
    return redirect(landing_url)
@bp.get('/sign-up/')
@redirect_if_logged_in()
def sign_up():
    """Render the sign-up form."""
    template_name = 'users/sign_up.html'
    return render_template(template_name)
@bp.post('/sign-up/')
@redirect_if_logged_in()
def sign_up_post():
    """Create a new account from the sign-up form and log it in.

    The form must carry ``username`` and exactly two ``password`` fields
    (password and confirmation). Any validation failure redirects back to
    the sign-up page with an error message; on success the user is created
    with guest permission, a session is started and the visitor is sent to
    the topic list.
    """
    generic_error_page = redirect(url_for('.sign_up', error='The username or password you entered is invalid.'))
    invalid_username_error_page = redirect(url_for('.sign_up', error='This username cannot be used. Please pick another.'))
    passwords_error_page = redirect(url_for('.sign_up', error='The passwords do not match.'))

    username = request.form.get('username', default='')
    passwords = request.form.getlist('password')
    if not username:
        return generic_error_page
    # A missing field and an empty field are both rejected: browsers submit
    # an empty string for a blank input, which would otherwise create an
    # account with an empty password.
    # NOTE(review): is_password_valid is imported by this module but never
    # called here -- confirm whether it should be applied as well.
    if not passwords or not passwords[0]:
        return generic_error_page
    if len(passwords) != 2:
        return passwords_error_page

    try:
        username_pair = parse_username(username)
    except ValueError:
        return invalid_username_error_page

    # The uniqueness check has to use the value that is actually stored
    # (username_pair[0]), not the raw form input, otherwise a differently
    # written variant of an existing name slips through.
    if Users.find({'username': username_pair[0]}):
        return invalid_username_error_page

    # compare_digest only accepts ASCII-only str; comparing the UTF-8 bytes
    # keeps the constant-time comparison without raising TypeError for
    # passwords that contain non-ASCII characters.
    if not compare_timesafe(passwords[0].encode('utf-8'), passwords[1].encode('utf-8')):
        return passwords_error_page

    password_hash = digest(passwords[0])
    user = Users.create({
        'username': username_pair[0],
        'password_hash': password_hash,
        'permission': PermissionLevel.GUEST.value,
        'created_at': int(time.time()),
    })
    # Keep the form of the name that parse_username returned second as the
    # display name when it differs from the stored username.
    if username_pair[0] != username_pair[1]:
        user.update({
            'display_name': parse_display_name(username_pair[1])
        })

    session['remember'] = request.form.get('remember') == 'on'
    sess = create_session(user.id, not session['remember'])
    session['pyrom_session_key'] = sess.key
    if session['remember']:
        session.permanent = True
    return redirect(url_for('topics.all_topics'))
@bp.get('/<username>/')
def user_page(username):
    """Render the profile page of the user named in the URL, or 404."""
    target_user = Users.find({'username': username.lower()})
    if target_user:
        return render_template('users/user_page.html', target_user=target_user)
    abort(404)
@bp.get('/<username>/posts/')
def posts(username):
    """Render one page of the posts written by the user named in the URL.

    Responds with 404 for the ``deleteduser`` name, for unknown users and
    for a ``page`` query argument that is not an integer. A page number
    outside the valid range is clamped to the nearest valid page.
    """
    per_page = 10
    lookup_name = username.lower()
    if lookup_name == 'deleteduser':
        abort(404)

    target_user = Users.find({'username': lookup_name})
    if not target_user:
        abort(404)

    total_posts = Posts.count({'user_id': target_user.id})
    page_count = max(1, math.ceil(total_posts / per_page))

    raw_page = request.args.get('page', default=1)
    try:
        requested_page = int(raw_page)
    except ValueError:
        abort(404)
    page = max(1, min(requested_page, page_count))

    user_posts = target_user.get_posts(per_page, page)
    return render_template(
        'users/posts.html',
        posts=user_posts,
        page=page,
        page_count=page_count,
        target_user=target_user,
        Reactions=Reactions,
    )
@bp.get('/<username>/threads/')
def threads(username):
    """Render one page of the threads started by the user named in the URL.

    Responds with 404 for the ``deleteduser`` name, for unknown users and
    for a ``page`` query argument that is not an integer. A page number
    outside the valid range is clamped to the nearest valid page.
    """
    per_page = 10
    lookup_name = username.lower()
    if lookup_name == 'deleteduser':
        abort(404)

    target_user = Users.find({'username': lookup_name})
    if not target_user:
        abort(404)

    total_threads = Threads.count({'user_id': target_user.id})
    page_count = max(1, math.ceil(total_threads / per_page))

    raw_page = request.args.get('page', default=1)
    try:
        requested_page = int(raw_page)
    except ValueError:
        abort(404)
    page = max(1, min(requested_page, page_count))

    started_threads = target_user.get_started_threads(per_page, page)
    return render_template(
        'users/threads.html',
        threads=started_threads,
        page=page,
        page_count=page_count,
        target_user=target_user,
        Reactions=Reactions,
    )
@bp.get('/<username>/comments/')
def comments(username):
    """Placeholder for the per-user comments page (404 for ``deleteduser``)."""
    if username.lower() == 'deleteduser':
        abort(404)
    return 'stub'
@bp.get('/<username>/settings/')
@login_required
@redirect_to_own
def settings(username):
    """Render the settings page for the active user.

    The current ``sort_by`` preference is read from the session and
    defaults to ``'activity'``.
    """
    return render_template(
        'users/settings.html',
        user=get_active_user(),
        sort_by=session.get('sort_by', 'activity'),
    )
@bp.post('/<username>/settings/set-avatar')
@login_required
@user_required
@redirect_to_own
def set_avatar(username):
    """Replace the active user's avatar with the uploaded image.

    Aborts with 400 when no ``avatar`` file was submitted. Files larger
    than AVATAR_MAX_SIZE and images that cannot be processed produce an
    error flash message. On success a new avatar row is created, the user
    is pointed at it, and the previous avatar (unless it is avatar id 1)
    is removed from disk and from the database. Every non-aborting path
    redirects back to the settings page.
    """
    user = get_active_user()
    return_to = redirect(url_for('.settings', username=user.username))

    if 'avatar' not in request.files:
        abort(400)
    avi_file = request.files['avatar']
    if avi_file.filename == '':
        abort(400)

    # Measure the upload by seeking to its end, then rewind for reading.
    avi_file.seek(0, os.SEEK_END)
    file_size = avi_file.tell()
    avi_file.seek(0, os.SEEK_SET)
    if file_size > AVATAR_MAX_SIZE:
        flash('Your avatar must be 1MB or less.', InfoboxKind.ERROR)
        return return_to

    avi_bytes = avi_file.read()
    now = int(time.time())
    filename = f'u{user.id}d{now}.webp'
    upload_dir = current_app.config['AVATAR_UPLOAD_PATH']
    output_path = os.path.join(upload_dir, filename)
    proxied_filename = f'/static/avatars/{filename}'

    if not validate_and_create_avatar(avi_bytes, output_path):
        flash('Something went wrong.;Please try again.', InfoboxKind.ERROR)
        return return_to

    avatar = Avatars.create({
        'file_path': proxied_filename,
        'uploaded_at': now,
    })
    old_avatar = Avatars.find({'id': user.avatar_id})
    user.update({'avatar_id': avatar.id})

    # Avatar id 1 is never deleted here; a missing row is simply skipped.
    if old_avatar and int(old_avatar.id) != 1:
        old_path = os.path.join(upload_dir, os.path.basename(old_avatar.file_path))
        # Two uploads within the same second share a file name; in that case
        # the "old" path is the file that was just written and must stay.
        if old_path != output_path:
            try:
                os.remove(old_path)
            except FileNotFoundError:
                # The file is already gone; still drop the stale row below.
                pass
        old_avatar.delete()

    flash('Avatar updated.', InfoboxKind.INFO)
    return return_to
@bp.post('/<username>/settings/clear-avatar')
@login_required
@user_required
@redirect_to_own
def clear_avatar(username):
    """Reset the active user's avatar to avatar id 1 and delete the old one.

    Does nothing when the user already has the default avatar. Always
    redirects back to the settings page.
    """
    user = get_active_user()
    settings_page = redirect(url_for('.settings', username=username))
    if user.is_default_avatar():
        return settings_page

    old_avatar = Avatars.find({'id': user.avatar_id})
    user.update({'avatar_id': 1})

    if old_avatar:
        old_path = os.path.join(
            current_app.config['AVATAR_UPLOAD_PATH'],
            os.path.basename(old_avatar.file_path),
        )
        try:
            os.remove(old_path)
        except FileNotFoundError:
            # The file is already gone; the row must still be removed so
            # the database does not keep pointing at a missing file.
            pass
        old_avatar.delete()
    return settings_page
@bp.post('/<username>/settings/change-password')
@login_required
@redirect_to_own
def change_password(username):
    """Change the active user's password and force a fresh log-in.

    Reads ``current_password``, ``new_password`` and ``new_password2`` from
    the form. When the current password does not verify or the two new
    passwords differ, an error is flashed and the visitor returns to the
    settings page. Otherwise the new hash is stored, all of the user's
    sessions are revoked and the visitor is sent to the log-in page.
    """
    user = get_active_user()
    current_password = request.form.get('current_password', '')
    new_password = request.form.get('new_password', '')
    new_password2 = request.form.get('new_password2', '')

    # NOTE(review): the new hash is computed before any check, presumably so
    # that success and failure take similar time -- order kept as it was.
    new_hash = digest(new_password)
    old_correct = verify(user.password_hash, current_password)
    # compare_digest only accepts ASCII-only str; comparing UTF-8 bytes keeps
    # the constant-time comparison without raising TypeError for passwords
    # that contain non-ASCII characters.
    new_match = compare_timesafe(
        new_password.encode('utf-8'), new_password2.encode('utf-8')
    )

    # `not (...)` rejects every falsy outcome; the previous `== False` test
    # would have let a result such as None through as a success.
    if not (old_correct and new_match):
        flash('The current password is incorrect or the new passwords do not match.;Please try again.', InfoboxKind.ERROR)
        return redirect(url_for('.settings', username=username))

    user.update({
        'password_hash': new_hash
    })
    revoke_all_sessions(user.id)
    return redirect(url_for('.log_in'))
@bp.post('/<username>/settings/set-personalization')
@login_required
@user_required
@redirect_to_own
def set_personalization(username):
    """Store the personalization form for the active user.

    ``sort_by`` and the inverted ``subscribe_by_default`` checkbox go into
    the session; ``status`` (cut to 100 characters) and ``display_name``
    are saved on the user record.
    """
    user = get_active_user()
    form = request.form

    session['sort_by'] = form.get('sort_by', 'activity')
    session['dont_subscribe_by_default'] = not get_form_checkbox('subscribe_by_default')

    status = form.get('status', '')[:100]
    display_name = parse_display_name(form.get('display_name', ''))
    user.update({'status': status, 'display_name': display_name})

    flash('Personalization settings updated.', InfoboxKind.INFO)
    return redirect(url_for('.settings', username=username))
@bp.post('/<username>/settings/set-sig')
@login_required
@user_required
@redirect_to_own
def set_sig(username):
    """Save the submitted ``babycode_content`` as the active user's signature."""
    signature_source = request.form.get('babycode_content', '')
    get_active_user().set_signature(signature_source)
    flash('Signature updated.', InfoboxKind.INFO)
    settings_url = url_for('.settings', username=username)
    return redirect(settings_url)
@bp.post('/<username>/settings/set-bio')
@login_required
@user_required
@redirect_to_own
def set_bio(username):
    """Placeholder endpoint for saving a user bio; not implemented yet."""
    placeholder_response = 'stub'
    return placeholder_response
@bp.get('/<username>/inbox/')
@login_required
@redirect_to_own
def inbox(username):
    """Render the active user's inbox with unread count and subscriptions."""
    user = get_active_user()
    context = {
        'unread_count': user.get_unread_count(),
        'subscriptions': user.get_all_subscriptions(),
    }
    return render_template('users/inbox.html', **context)
@bp.get('/<username>/bookmarks/')
@login_required
@redirect_to_own
def bookmarks(username):
    """Placeholder for the bookmarks page; not implemented yet."""
    _lookup_name = username.lower()  # computed but not used by the stub
    return 'stub'