from flask import ( Blueprint, redirect, url_for, render_template, request, session, abort, flash, current_app ) from functools import wraps from secrets import compare_digest as compare_timesafe from wand.image import Image from wand.color import Color from wand.exceptions import WandException from ..auth import ( digest, verify, create_session, is_logged_in, parse_username, is_password_valid, login_required, revoke_session, get_active_user, parse_display_name, revoke_all_sessions, csrf_verified ) from ..models import Users, Posts, Reactions, Threads, Avatars, PostHistory, Mentions, BookmarkCollections from ..constants import PermissionLevel, InfoboxKind from ..util import get_form_checkbox import math import os import time AVATAR_MAX_SIZE = 1000 * 1000 # 1MB bp = Blueprint('users', __name__, url_prefix='/users/') def validate_and_create_avatar(input_image, filename): try: with Image(blob=input_image) as img: if hasattr(img, 'sequence') and len(img.sequence) > 1: img = Image(image=img.sequence[0]) img.strip() img.gravity = 'center' width, height = img.width, img.height min_dim = min(width, height) if min_dim > 256: ratio = 256.0 / min_dim new_width = int(width * ratio) new_height = int(height * ratio) img.resize(new_width, new_height) width, height = img.width, img.height crop_size = min(width, height) x_offset = (width - crop_size) // 2 y_offset = (height - crop_size) // 2 img.crop(left=x_offset, top=y_offset, width=crop_size, height=crop_size) img.resize(256, 256) img.background_color = Color('white') img.alpha_channel = 'remove' img.format = 'webp' img.compression_quality = 85 img.save(filename=filename) return True except WandException: return False def anonymize_user(user_id): deleted_user = Users.find({'username': 'deleteduser'}) from ..lib.babycode import sanitize, babycode_to_html from ..db import db threads = Threads.findall({'user_id': user_id}) posts = Posts.findall({'user_id': user_id}) revs_q = """SELECT DISTINCT m.revision_id FROM mentions m WHERE m.mentioned_user_id = ?""" mentioned_revs = db.query(revs_q, int(user_id)) with db.transaction(): for thread in threads: thread.update({'user_id': int(deleted_user.id)}) for post in posts: post.update({'user_id': int(deleted_user.id)}) revs = {} for rev in mentioned_revs: ph = PostHistory.find({'id': int(rev['revision_id'])}) ms = Mentions.findall({ 'mentioned_user_id': int(user_id), 'revision_id': int(rev['revision_id']), }) data = { 'text': sanitize(ph.original_markup), 'mentions': ms, } data['mentions'] = sorted(data['mentions'], key=lambda x: int(x.end_index), reverse=True) revs[rev['revision_id']] = data for rev_id, data in revs.items(): text = data['text'] for mention in data['mentions']: text = text[:mention.start_index] + '@deleteduser' + text[mention.end_index:] mention.delete() res = babycode_to_html(text) ph = PostHistory.find({'id': int(rev_id)}) ph.update({ 'original_markup': text.unescape(), 'content': res.result, }) def redirect_if_logged_in(destination='topics.all_topics'): def decorator(view_func): @wraps(view_func) def wrapper(*args, **kwargs): if is_logged_in(): return redirect(url_for(destination)) return view_func(*args, **kwargs) return wrapper return decorator def redirect_to_own(view_func): @wraps(view_func) def wrapper(username, *args, **kwargs): user = get_active_user() if username.lower() != user.username: view_args = dict(request.view_args) view_args.pop('username', None) new_args = {**view_args, 'username': user.username} return redirect(url_for(request.endpoint, **new_args)) return view_func(username, *args, **kwargs) return wrapper def user_required(view_func): @wraps(view_func) def wrapper(username, *args, **kwargs): user = get_active_user() if user.is_guest(): abort(403) return view_func(username, *args, **kwargs) return wrapper @bp.get('/log-in/') @redirect_if_logged_in() def log_in(): return render_template('users/log_in.html') @bp.post('/log-in/') @redirect_if_logged_in() def log_in_post(): username = request.form.get('username', default='').lower() user = Users.find({'username': username}) if not user: return redirect(url_for('.log_in', error='The username or password you entered is incorrect.')) password = request.form.get('password', default='') if not verify(user.password_hash, password): return redirect(url_for('.log_in', error='The username or password you entered is incorrect.')) session['remember'] = request.form.get('remember') == 'on' sess = create_session(user.id, not session['remember']) session['pyrom_session_key'] = sess.key if session['remember']: session.permanent = True return redirect(request.form.get('return_to', default=url_for('topics.all_topics'))) @bp.post('/log-out/') @login_required def log_out(): revoke_session(get_active_user().id) return redirect(url_for('topics.all_topics')) @bp.get('/sign-up/') @redirect_if_logged_in() def sign_up(): return render_template('users/sign_up.html') @bp.post('/sign-up/') @redirect_if_logged_in() def sign_up_post(): generic_error_page = redirect(url_for('.sign_up', error='The username or password you entered is invalid.')) invalid_username_error_page = redirect(url_for('.sign_up', error='This username cannot be used. Please pick another.')) passwords_error_page = redirect(url_for('.sign_up', error='The passwords do not match.')) username = request.form.get('username', default='') if not username: return generic_error_page if request.form.get('password') is None: return generic_error_page if len(request.form.getlist('password')) != 2: return passwords_error_page try: username_pair = parse_username(username) except ValueError: return invalid_username_error_page potential_user = Users.find({'username': username}) if potential_user: return invalid_username_error_page if not compare_timesafe(request.form.getlist('password')[0], request.form.getlist('password')[1]): return passwords_error_page password_hash = digest(request.form.get('password')) user = Users.create({ 'username': username_pair[0], 'password_hash': password_hash, 'permission': PermissionLevel.GUEST.value, 'created_at': int(time.time()), }) BookmarkCollections.create_default(user.id) if username_pair[0] != username_pair[1]: user.update({ 'display_name': parse_display_name(username_pair[1]) }) session['remember'] = request.form.get('remember') == 'on' sess = create_session(user.id, not session['remember']) session['pyrom_session_key'] = sess.key if session['remember']: session.permanent = True return redirect(url_for('topics.all_topics')) @bp.get('//') def user_page(username): username = username.lower() target_user = Users.find({'username': username}) if not target_user: abort(404) return render_template('users/user_page.html', target_user=target_user) @bp.get('//posts/') def posts(username): username = username.lower() if username == 'deleteduser': abort(404) target_user = Users.find({'username': username}) if not target_user: abort(404) PER_PAGE = 10 posts_count = Posts.count({'user_id': target_user.id}) page_count = max(1, math.ceil(posts_count / PER_PAGE)) page = 1 try: page = max(1, min(int(request.args.get('page', default=1)), page_count)) except ValueError: abort(404) posts = target_user.get_posts(PER_PAGE, page) return render_template( 'users/posts.html', posts=posts, page=page, page_count=page_count, target_user=target_user, Reactions=Reactions ) @bp.get('//threads/') def threads(username): username = username.lower() if username == 'deleteduser': abort(404) target_user = Users.find({'username': username}) if not target_user: abort(404) PER_PAGE = 10 threads_count = Threads.count({'user_id': target_user.id}) page_count = max(1, math.ceil(threads_count / PER_PAGE)) page = 1 try: page = max(1, min(int(request.args.get('page', default=1)), page_count)) except ValueError: abort(404) threads = target_user.get_started_threads(PER_PAGE, page) return render_template( 'users/threads.html', threads=threads, page=page, page_count=page_count, target_user=target_user, Reactions=Reactions ) @bp.get('//comments/') def comments(username): username = username.lower() if username == 'deleteduser': abort(404) return 'stub' @bp.get('//settings/') @login_required @redirect_to_own def settings(username): user = get_active_user() sort_by = session.get('sort_by', 'activity') return render_template( 'users/settings.html', user=user, sort_by=sort_by ) @bp.post('//settings/set-avatar') @login_required @user_required @redirect_to_own def set_avatar(username): user = get_active_user() return_to = redirect(url_for('.settings', username=user.username)) if 'avatar' not in request.files: abort(400) avi_file = request.files['avatar'] if avi_file.filename == '': abort(400) avi_file.seek(0, os.SEEK_END) file_size = avi_file.tell() avi_file.seek(0, os.SEEK_SET) if file_size > AVATAR_MAX_SIZE: flash('Your avatar must be 1MB or less.', InfoboxKind.ERROR) return return_to avi_bytes = avi_file.read() now = int(time.time()) filename = f'u{user.id}d{now}.webp' output_path = os.path.join(current_app.config['AVATAR_UPLOAD_PATH'], filename) proxied_filename = f'/static/avatars/{filename}' res = validate_and_create_avatar(avi_bytes, output_path) if res: flash('Avatar updated.', InfoboxKind.INFO) avatar = Avatars.create({ 'file_path': proxied_filename, 'uploaded_at': now, }) old_avatar = Avatars.find({'id': user.avatar_id}) user.update({'avatar_id': avatar.id}) if int(old_avatar.id) != 1: filename = os.path.join(current_app.config['AVATAR_UPLOAD_PATH'], os.path.basename(old_avatar.file_path)) os.remove(filename) old_avatar.delete() return return_to else: flash('Something went wrong.;Please try again.', InfoboxKind.ERROR) return return_to @bp.post('//settings/clear-avatar') @login_required @user_required @redirect_to_own def clear_avatar(username): user = get_active_user() if user.is_default_avatar(): return redirect(url_for('.settings', username=username)) old_avatar = Avatars.find({'id': user.avatar_id}) user.update({'avatar_id': 1}) filename = os.path.join(current_app.config['AVATAR_UPLOAD_PATH'], os.path.basename(old_avatar.file_path)) os.remove(filename) old_avatar.delete() return redirect(url_for('.settings', username=username)) @bp.post('//settings/change-password') @login_required @redirect_to_own def change_password(username): user = get_active_user() current_password = request.form.get('current_password', '') new_password = request.form.get('new_password', '') new_password2 = request.form.get('new_password2', '') new_hash = digest(new_password) old_correct = verify(user.password_hash, current_password) new_match = compare_timesafe(new_password, new_password2) if (old_correct and new_match) == False: flash('The current password is incorrect or the new passwords do not match.;Please try again.', InfoboxKind.ERROR) return redirect(url_for('.settings', username=username)) user.update({ 'password_hash': new_hash }) revoke_all_sessions(user.id) return redirect(url_for('.log_in')) @bp.post('//settings/set-personalization') @login_required @user_required @redirect_to_own def set_personalization(username): user = get_active_user() session['sort_by'] = request.form.get('sort_by', 'activity') session['dont_subscribe_by_default'] = not get_form_checkbox('subscribe_by_default') user.update({ 'status': request.form.get('status', '')[:100], 'display_name': parse_display_name(request.form.get('display_name', '')) }) flash('Personalization settings updated.', InfoboxKind.INFO) return redirect(url_for('.settings', username=username)) @bp.post('//settings/set-sig') @login_required @user_required @redirect_to_own def set_sig(username): user = get_active_user() user.set_signature(request.form.get('babycode_content', '')) flash('Signature updated.', InfoboxKind.INFO) return redirect(url_for('.settings', username=username)) @bp.post('//settings/set-bio') @login_required @user_required @redirect_to_own def set_bio(username): return 'stub' @bp.get('//inbox/') @login_required @redirect_to_own def inbox(username): user = get_active_user() unread_count = user.get_unread_count() subscriptions = user.get_all_subscriptions() return render_template('users/inbox.html', unread_count=unread_count, subscriptions=subscriptions) @bp.get('//bookmarks/') @login_required @redirect_to_own def bookmarks(username): username = username.lower() return 'stub' @bp.get('//delete-confirm/') @login_required @redirect_to_own def delete_confirm(username): return render_template('users/delete_confirm.html') @bp.post('//delete-confirm/') @login_required @redirect_to_own @csrf_verified def delete_confirm_post(username): user = get_active_user() password = request.form.get('password', '') if not verify(user.password_hash, password): flash('Incorrect password.', InfoboxKind.ERROR) return redirect(url_for('.delete_confirm', username=username)) if user.is_admin(): flash('You can not delete the admin account.', InfoboxKind.ERROR) return redirect(url_for('.delete_confirm', username=username)) anonymize_user(user.id) revoke_all_sessions(user.id) user.delete() return redirect(url_for('topics.all_topics'))