737 lines
24 KiB
Python
737 lines
24 KiB
Python
from flask import (
|
|
Blueprint, redirect, url_for,
|
|
render_template, request, session,
|
|
abort, flash, current_app
|
|
)
|
|
from functools import wraps
|
|
from secrets import compare_digest as compare_timesafe, token_urlsafe
|
|
from wand.image import Image
|
|
from wand.color import Color
|
|
from wand.exceptions import WandException
|
|
from ..auth import (
|
|
digest, verify, create_session,
|
|
is_logged_in, parse_username, is_password_valid,
|
|
login_required, revoke_session, get_active_user,
|
|
parse_display_name, revoke_all_sessions, csrf_verified
|
|
)
|
|
from ..models import Users, Posts, Reactions, Threads, Avatars, PostHistory, Mentions, BookmarkCollections, InviteKeys, Badges, BadgeUploads
|
|
from ..constants import PermissionLevel, InfoboxKind
|
|
from ..util import get_form_checkbox, time_now
|
|
from ..lib.babycode import babycode_to_html
|
|
from ..db import db
|
|
import math
|
|
import os
|
|
import time
|
|
|
|
AVATAR_MAX_SIZE = 1000 * 1000 # 1MB
|
|
BADGE_MAX_SIZE = 1000 * 500 # 500K
|
|
|
|
bp = Blueprint('users', __name__, url_prefix='/users/')
|
|
|
|
def validate_and_create_avatar(input_image, filename):
|
|
try:
|
|
with Image(blob=input_image) as img:
|
|
if hasattr(img, 'sequence') and len(img.sequence) > 1:
|
|
img = Image(image=img.sequence[0])
|
|
img.strip()
|
|
img.gravity = 'center'
|
|
|
|
width, height = img.width, img.height
|
|
min_dim = min(width, height)
|
|
if min_dim > 256:
|
|
ratio = 256.0 / min_dim
|
|
new_width = int(width * ratio)
|
|
new_height = int(height * ratio)
|
|
img.resize(new_width, new_height)
|
|
|
|
width, height = img.width, img.height
|
|
crop_size = min(width, height)
|
|
x_offset = (width - crop_size) // 2
|
|
y_offset = (height - crop_size) // 2
|
|
img.crop(left=x_offset, top=y_offset,
|
|
width=crop_size, height=crop_size)
|
|
|
|
img.resize(256, 256)
|
|
img.background_color = Color('white')
|
|
img.alpha_channel = 'remove'
|
|
img.format = 'webp'
|
|
img.compression_quality = 85
|
|
img.save(filename=filename)
|
|
return True
|
|
except WandException:
|
|
return False
|
|
|
|
def validate_and_create_badge(input_image, filename):
|
|
try:
|
|
with Image(blob=input_image) as img:
|
|
if img.width != 88 or img.height != 31:
|
|
return False
|
|
if hasattr(img, 'sequence') and len(img.sequence) > 1:
|
|
img = Image(image=img.sequence[0])
|
|
img.strip()
|
|
|
|
img.format = 'webp'
|
|
img.compression_quality = 90
|
|
img.save(filename=filename)
|
|
return True
|
|
except WandException:
|
|
return False
|
|
|
|
def anonymize_user(user_id):
|
|
deleted_user = Users.find({'username': 'deleteduser'})
|
|
|
|
from ..lib.babycode import sanitize, babycode_to_html
|
|
from ..db import db
|
|
threads = Threads.findall({'user_id': user_id})
|
|
posts = Posts.findall({'user_id': user_id})
|
|
|
|
revs_q = """SELECT DISTINCT m.revision_id FROM mentions m
|
|
WHERE m.mentioned_user_id = ?"""
|
|
|
|
mentioned_revs = db.query(revs_q, int(user_id))
|
|
with db.transaction():
|
|
for thread in threads:
|
|
thread.update({'user_id': int(deleted_user.id)})
|
|
for post in posts:
|
|
post.update({'user_id': int(deleted_user.id)})
|
|
|
|
revs = {}
|
|
for rev in mentioned_revs:
|
|
ph = PostHistory.find({'id': int(rev['revision_id'])})
|
|
ms = Mentions.findall({
|
|
'mentioned_user_id': int(user_id),
|
|
'revision_id': int(rev['revision_id']),
|
|
})
|
|
data = {
|
|
'text': sanitize(ph.original_markup),
|
|
'mentions': ms,
|
|
}
|
|
data['mentions'] = sorted(data['mentions'], key=lambda x: int(x.end_index), reverse=True)
|
|
revs[rev['revision_id']] = data
|
|
|
|
for rev_id, data in revs.items():
|
|
text = data['text']
|
|
for mention in data['mentions']:
|
|
text = text[:mention.start_index] + '@deleteduser' + text[mention.end_index:]
|
|
mention.delete()
|
|
|
|
res = babycode_to_html(text)
|
|
ph = PostHistory.find({'id': int(rev_id)})
|
|
ph.update({
|
|
'original_markup': text.unescape(),
|
|
'content': res.result,
|
|
})
|
|
|
|
def redirect_if_logged_in(destination='topics.all_topics'):
|
|
def decorator(view_func):
|
|
@wraps(view_func)
|
|
def wrapper(*args, **kwargs):
|
|
if is_logged_in():
|
|
return redirect(url_for(destination))
|
|
return view_func(*args, **kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
|
def redirect_to_own(view_func):
|
|
@wraps(view_func)
|
|
def wrapper(username, *args, **kwargs):
|
|
user = get_active_user()
|
|
if username.lower() != user.username:
|
|
view_args = dict(request.view_args)
|
|
view_args.pop('username', None)
|
|
new_args = {**view_args, 'username': user.username}
|
|
return redirect(url_for(request.endpoint, **new_args))
|
|
return view_func(username, *args, **kwargs)
|
|
return wrapper
|
|
|
|
def user_required(view_func):
|
|
@wraps(view_func)
|
|
def wrapper(username, *args, **kwargs):
|
|
user = get_active_user()
|
|
if user.is_guest():
|
|
abort(403)
|
|
|
|
return view_func(username, *args, **kwargs)
|
|
return wrapper
|
|
|
|
@bp.get('/log-in/')
|
|
@redirect_if_logged_in()
|
|
def log_in():
|
|
return render_template('users/log_in.html')
|
|
|
|
@bp.post('/log-in/')
|
|
@redirect_if_logged_in()
|
|
def log_in_post():
|
|
username = request.form.get('username', default='').lower()
|
|
user = Users.find({'username': username})
|
|
if not user:
|
|
return redirect(url_for('.log_in', error='The username or password you entered is incorrect.'))
|
|
password = request.form.get('password', default='')
|
|
if not verify(user.password_hash, password):
|
|
return redirect(url_for('.log_in', error='The username or password you entered is incorrect.'))
|
|
|
|
session['remember'] = request.form.get('remember') == 'on'
|
|
sess = create_session(user.id, not session['remember'])
|
|
session['pyrom_session_key'] = sess.key
|
|
if session['remember']:
|
|
session.permanent = True
|
|
return redirect(request.form.get('return_to', default=url_for('topics.all_topics')))
|
|
|
|
@bp.post('/log-out/')
|
|
@login_required
|
|
def log_out():
|
|
revoke_session(get_active_user().id)
|
|
return redirect(url_for('topics.all_topics'))
|
|
|
|
@bp.get('/sign-up/')
|
|
@redirect_if_logged_in()
|
|
def sign_up():
|
|
key = request.args.get('key', '')
|
|
invite = None
|
|
inviter = None
|
|
if not key and current_app.config['DISABLE_SIGNUP']:
|
|
return redirect(url_for('topics.all_topics'))
|
|
elif key and current_app.config['DISABLE_SIGNUP']:
|
|
invite = InviteKeys.find({'key': key})
|
|
if not invite:
|
|
return redirect(url_for('topics.all_topics'))
|
|
inviter = Users.find({'id': invite.created_by})
|
|
return render_template('users/sign_up.html', invite=invite, inviter=inviter)
|
|
|
|
@bp.post('/sign-up/')
|
|
@redirect_if_logged_in()
|
|
def sign_up_post():
|
|
args_sans_error = dict(request.args)
|
|
args_sans_error.pop('error', '')
|
|
generic_error_page = redirect(url_for('.sign_up', error='The username or password you entered is invalid.', **args_sans_error))
|
|
invalid_username_error_page = redirect(url_for('.sign_up', error='This username cannot be used. Please pick another.', **args_sans_error))
|
|
passwords_error_page = redirect(url_for('.sign_up', error='The passwords do not match.', **args_sans_error))
|
|
username = request.form.get('username', default='')
|
|
invite = None
|
|
if current_app.config['DISABLE_SIGNUP']:
|
|
key = request.form.get('key', '')
|
|
if not key:
|
|
return generic_error_page
|
|
invite = InviteKeys.find({'key': key})
|
|
if not invite:
|
|
return generic_error_page
|
|
if invite.expires_at < time_now():
|
|
return generic_error_page
|
|
if not username:
|
|
return generic_error_page
|
|
if request.form.get('password') is None:
|
|
return generic_error_page
|
|
if len(request.form.getlist('password')) != 2:
|
|
return passwords_error_page
|
|
try:
|
|
username_pair = parse_username(username)
|
|
except ValueError:
|
|
return invalid_username_error_page
|
|
potential_user = Users.find({'username': username})
|
|
if potential_user:
|
|
return invalid_username_error_page
|
|
|
|
if not compare_timesafe(request.form.getlist('password')[0], request.form.getlist('password')[1]):
|
|
return passwords_error_page
|
|
|
|
password_hash = digest(request.form.get('password'))
|
|
|
|
user_data = {
|
|
'username': username_pair[0],
|
|
'password_hash': password_hash,
|
|
'permission': PermissionLevel.GUEST.value,
|
|
'created_at': time_now(),
|
|
}
|
|
if invite:
|
|
user_data['invited_by'] = invite.created_by
|
|
user_data['permission'] = PermissionLevel.USER.value
|
|
user_data['confirmed_on'] = time_now()
|
|
invite.delete()
|
|
|
|
user = Users.create(user_data)
|
|
|
|
BookmarkCollections.create_default(user.id)
|
|
|
|
if username_pair[0] != username_pair[1]:
|
|
user.update({
|
|
'display_name': parse_display_name(username_pair[1])
|
|
})
|
|
|
|
session['remember'] = request.form.get('remember') == 'on'
|
|
sess = create_session(user.id, not session['remember'])
|
|
session['pyrom_session_key'] = sess.key
|
|
if session['remember']:
|
|
session.permanent = True
|
|
|
|
flash(f'Welcome to {current_app.config['SITE_NAME']}!', InfoboxKind.INFO)
|
|
return redirect(url_for('topics.all_topics'))
|
|
|
|
@bp.get('/<username>/')
|
|
def user_page(username):
|
|
username = username.lower()
|
|
target_user = Users.find({'username': username})
|
|
if not target_user:
|
|
abort(404)
|
|
if current_app.config['DISABLE_SIGNUP'] and target_user.invited_by:
|
|
invited_by = Users.find({'id': target_user.invited_by})
|
|
else:
|
|
invited_by = None
|
|
return render_template('users/user_page.html', target_user=target_user, invited_by=invited_by)
|
|
|
|
@bp.get('/<username>/posts/')
|
|
def posts(username):
|
|
username = username.lower()
|
|
if username == 'deleteduser':
|
|
abort(404)
|
|
target_user = Users.find({'username': username})
|
|
if not target_user:
|
|
abort(404)
|
|
PER_PAGE = 10
|
|
posts_count = Posts.count({'user_id': target_user.id})
|
|
page_count = max(1, math.ceil(posts_count / PER_PAGE))
|
|
page = 1
|
|
try:
|
|
page = max(1, min(int(request.args.get('page', default=1)), page_count))
|
|
except ValueError:
|
|
abort(404)
|
|
posts = target_user.get_posts(PER_PAGE, page)
|
|
return render_template(
|
|
'users/posts.html', posts=posts,
|
|
page=page, page_count=page_count,
|
|
target_user=target_user,
|
|
Reactions=Reactions,
|
|
)
|
|
|
|
@bp.get('/<username>/threads/')
|
|
def threads(username):
|
|
username = username.lower()
|
|
if username == 'deleteduser':
|
|
abort(404)
|
|
target_user = Users.find({'username': username})
|
|
if not target_user:
|
|
abort(404)
|
|
PER_PAGE = 10
|
|
threads_count = Threads.count({'user_id': target_user.id})
|
|
page_count = max(1, math.ceil(threads_count / PER_PAGE))
|
|
page = 1
|
|
try:
|
|
page = max(1, min(int(request.args.get('page', default=1)), page_count))
|
|
except ValueError:
|
|
abort(404)
|
|
threads = target_user.get_started_threads(PER_PAGE, page)
|
|
return render_template(
|
|
'users/threads.html', threads=threads,
|
|
page=page, page_count=page_count,
|
|
target_user=target_user,
|
|
Reactions=Reactions,
|
|
)
|
|
|
|
@bp.get('/<username>/comments/')
|
|
def comments(username):
|
|
username = username.lower()
|
|
if username == 'deleteduser':
|
|
abort(404)
|
|
return 'stub'
|
|
|
|
@bp.get('/<username>/settings/')
|
|
@login_required
|
|
@redirect_to_own
|
|
def settings(username):
|
|
user = get_active_user()
|
|
sort_by = session.get('sort_by', 'activity')
|
|
invites = InviteKeys.findall({'created_by': user.id})
|
|
return render_template(
|
|
'users/settings.html', user=user,
|
|
sort_by=sort_by,
|
|
invites=invites,
|
|
)
|
|
|
|
@bp.post('/<username>/settings/set-avatar')
|
|
@login_required
|
|
@user_required
|
|
@redirect_to_own
|
|
def set_avatar(username):
|
|
user = get_active_user()
|
|
return_to = redirect(url_for('.settings', username=user.username))
|
|
if 'avatar' not in request.files:
|
|
abort(400)
|
|
|
|
avi_file = request.files['avatar']
|
|
|
|
if avi_file.filename == '':
|
|
abort(400)
|
|
|
|
avi_file.seek(0, os.SEEK_END)
|
|
file_size = avi_file.tell()
|
|
avi_file.seek(0, os.SEEK_SET)
|
|
|
|
if file_size > AVATAR_MAX_SIZE:
|
|
flash('Your avatar must be 1MB or less.', InfoboxKind.ERROR)
|
|
return return_to
|
|
|
|
avi_bytes = avi_file.read()
|
|
now = int(time.time())
|
|
filename = f'u{user.id}d{now}.webp'
|
|
output_path = os.path.join(current_app.config['AVATAR_UPLOAD_PATH'], filename)
|
|
proxied_filename = f'/static/avatars/{filename}'
|
|
|
|
res = validate_and_create_avatar(avi_bytes, output_path)
|
|
if res:
|
|
flash('Avatar updated.', InfoboxKind.INFO)
|
|
avatar = Avatars.create({
|
|
'file_path': proxied_filename,
|
|
'uploaded_at': now,
|
|
})
|
|
old_avatar = Avatars.find({'id': user.avatar_id})
|
|
user.update({'avatar_id': avatar.id})
|
|
if int(old_avatar.id) != 1:
|
|
filename = os.path.join(current_app.config['AVATAR_UPLOAD_PATH'], os.path.basename(old_avatar.file_path))
|
|
os.remove(filename)
|
|
old_avatar.delete()
|
|
return return_to
|
|
else:
|
|
flash('Something went wrong.;Please try again.', InfoboxKind.ERROR)
|
|
|
|
return return_to
|
|
|
|
@bp.post('/<username>/settings/clear-avatar')
|
|
@login_required
|
|
@user_required
|
|
@redirect_to_own
|
|
def clear_avatar(username):
|
|
user = get_active_user()
|
|
if user.is_default_avatar():
|
|
return redirect(url_for('.settings', username=username))
|
|
|
|
old_avatar = Avatars.find({'id': user.avatar_id})
|
|
user.update({'avatar_id': 1})
|
|
filename = os.path.join(current_app.config['AVATAR_UPLOAD_PATH'], os.path.basename(old_avatar.file_path))
|
|
os.remove(filename)
|
|
old_avatar.delete()
|
|
return redirect(url_for('.settings', username=username))
|
|
|
|
@bp.post('/<username>/settings/change-password')
|
|
@login_required
|
|
@redirect_to_own
|
|
def change_password(username):
|
|
user = get_active_user()
|
|
current_password = request.form.get('current_password', '')
|
|
new_password = request.form.get('new_password', '')
|
|
new_password2 = request.form.get('new_password2', '')
|
|
|
|
new_hash = digest(new_password)
|
|
|
|
old_correct = verify(user.password_hash, current_password)
|
|
new_match = compare_timesafe(new_password, new_password2)
|
|
|
|
if (old_correct and new_match) == False:
|
|
flash('The current password is incorrect or the new passwords do not match.;Please try again.', InfoboxKind.ERROR)
|
|
return redirect(url_for('.settings', username=username))
|
|
|
|
user.update({
|
|
'password_hash': new_hash
|
|
})
|
|
|
|
revoke_all_sessions(user.id)
|
|
|
|
return redirect(url_for('.log_in'))
|
|
|
|
@bp.post('/<username>/settings/set-personalization')
|
|
@login_required
|
|
@user_required
|
|
@redirect_to_own
|
|
def set_personalization(username):
|
|
user = get_active_user()
|
|
session['sort_by'] = request.form.get('sort_by', 'activity')
|
|
session['dont_subscribe_by_default'] = not get_form_checkbox('subscribe_by_default')
|
|
|
|
old_display_name = user.display_name
|
|
new_display_name = parse_display_name(request.form.get('display_name', ''))
|
|
|
|
user.update({
|
|
'status': request.form.get('status', '')[:100],
|
|
'display_name': new_display_name
|
|
})
|
|
|
|
if old_display_name != new_display_name:
|
|
# re-parse posts with mentions
|
|
q = """SELECT DISTINCT m.revision_id FROM mentions m
|
|
JOIN post_history ph ON m.revision_id = ph.id
|
|
JOIN posts p ON p.current_revision_id = ph.id
|
|
WHERE m.mentioned_user_id = ?"""
|
|
mentions = db.query(q, int(user.id))
|
|
with db.transaction():
|
|
for mention in mentions:
|
|
rev = PostHistory.find({'id': int(mention['revision_id'])})
|
|
parsed_content = babycode_to_html(rev.original_markup).result
|
|
rev.update({'content': parsed_content})
|
|
|
|
flash('Personalization settings updated.', InfoboxKind.INFO)
|
|
return redirect(url_for('.settings', username=username))
|
|
|
|
@bp.post('/<username>/settings/set-sig')
|
|
@login_required
|
|
@user_required
|
|
@redirect_to_own
|
|
def set_sig(username):
|
|
user = get_active_user()
|
|
user.set_signature(request.form.get('babycode_content', ''))
|
|
flash('Signature updated.', InfoboxKind.INFO)
|
|
return redirect(url_for('.settings', username=username))
|
|
|
|
@bp.post('/<username>/settings/set-bio')
|
|
@login_required
|
|
@user_required
|
|
@redirect_to_own
|
|
def set_bio(username):
|
|
return 'stub'
|
|
|
|
@bp.get('/<username>/inbox/')
|
|
@login_required
|
|
@redirect_to_own
|
|
def inbox(username):
|
|
user = get_active_user()
|
|
unread_count = user.get_unread_count()
|
|
subscriptions = user.get_all_subscriptions()
|
|
return render_template('users/inbox.html', unread_count=unread_count, subscriptions=subscriptions)
|
|
|
|
@bp.get('/<username>/bookmarks/')
|
|
@login_required
|
|
@redirect_to_own
|
|
def bookmarks(username):
|
|
user = get_active_user()
|
|
collections = BookmarkCollections.get_for_user(user.id)
|
|
return render_template('users/bookmarks.html', collections=collections)
|
|
|
|
@bp.get('/<username>/bookmarks/collections/')
|
|
@login_required
|
|
@user_required
|
|
@redirect_to_own
|
|
def bookmark_collections(username):
|
|
user = get_active_user()
|
|
collections = BookmarkCollections.get_for_user(user.id)
|
|
return render_template('users/manage_collections.html', collections=collections)
|
|
|
|
@bp.post('/<username>/bookmarks/collections/')
|
|
@login_required
|
|
@user_required
|
|
@redirect_to_own
|
|
def edit_bookmark_collections(username):
|
|
user = get_active_user()
|
|
ids = request.form.getlist('id[]')
|
|
names = request.form.getlist('name[]')
|
|
if len(ids) == 0 or len(ids) != len(names):
|
|
abort(400)
|
|
deleted_ids = filter(lambda x: x.strip(), request.form.get('deleted_ids', '').split(';'))
|
|
try:
|
|
deleted_ids = map(lambda x: int(x), deleted_ids)
|
|
except ValueError:
|
|
abort(400)
|
|
|
|
with db.transaction():
|
|
for new_order, id in enumerate(ids):
|
|
new_name = names[new_order]
|
|
if id == 'new':
|
|
bc = BookmarkCollections.create({
|
|
'user_id': user.id,
|
|
'is_default': False,
|
|
'name': new_name,
|
|
'sort_order': new_order,
|
|
})
|
|
continue
|
|
id = int(id)
|
|
bc = BookmarkCollections.find({'id': id})
|
|
if not bc:
|
|
continue
|
|
if bc.user_id != user.id:
|
|
continue
|
|
if bc.is_default:
|
|
new_order = 0
|
|
elif new_order == 0:
|
|
new_order = 1
|
|
bc.update({
|
|
'name': new_name,
|
|
'sort_order': new_order,
|
|
})
|
|
|
|
for deleted_id in deleted_ids:
|
|
bc = BookmarkCollections.find({'id': deleted_id})
|
|
if not bc:
|
|
continue
|
|
if bc.user_id != user.id:
|
|
continue
|
|
if bc.is_default:
|
|
continue
|
|
bc.delete()
|
|
|
|
return redirect(url_for('.bookmark_collections', username=username))
|
|
|
|
@bp.get('/<username>/delete-confirm/')
|
|
@login_required
|
|
@redirect_to_own
|
|
def delete_confirm(username):
|
|
return render_template('users/delete_confirm.html')
|
|
|
|
@bp.post('/<username>/delete-confirm/')
|
|
@login_required
|
|
@redirect_to_own
|
|
@csrf_verified
|
|
def delete_confirm_post(username):
|
|
user = get_active_user()
|
|
|
|
password = request.form.get('password', '')
|
|
if not verify(user.password_hash, password):
|
|
flash('Incorrect password.', InfoboxKind.ERROR)
|
|
return redirect(url_for('.delete_confirm', username=username))
|
|
|
|
if user.is_admin():
|
|
flash('You can not delete the admin account.', InfoboxKind.ERROR)
|
|
return redirect(url_for('.delete_confirm', username=username))
|
|
|
|
anonymize_user(user.id)
|
|
revoke_all_sessions(user.id)
|
|
user.delete()
|
|
|
|
return redirect(url_for('topics.all_topics'))
|
|
|
|
@bp.post('/<username>/invite-keys/create/')
|
|
@login_required
|
|
@redirect_to_own
|
|
@csrf_verified
|
|
def create_invite_key(username):
|
|
user = get_active_user()
|
|
if not user.can_invite():
|
|
abort(404)
|
|
|
|
key = token_urlsafe(16)
|
|
expires_at = time_now() + 48 * 60 * 60
|
|
|
|
invite = InviteKeys.create({
|
|
'created_by': user.id,
|
|
'expires_at': expires_at,
|
|
'key': key,
|
|
})
|
|
|
|
return redirect(url_for('.settings', username=username, _anchor='invite'))
|
|
|
|
@bp.post('/<username>/invite-keys/revoke/')
|
|
@login_required
|
|
@redirect_to_own
|
|
@csrf_verified
|
|
def revoke_invite_key(username):
|
|
user = get_active_user()
|
|
if not user.can_invite():
|
|
abort(404)
|
|
|
|
key = request.form.get('key', '')
|
|
invite = InviteKeys.find({'created_by': user.id, 'key': key})
|
|
if not invite:
|
|
abort(404)
|
|
|
|
invite.delete()
|
|
return redirect(url_for('.settings', username=username, _anchor='invite'))
|
|
|
|
@bp.post('/<username>/settings/badges/')
|
|
@login_required
|
|
@redirect_to_own
|
|
def save_badges(username):
|
|
user = get_active_user()
|
|
if user.is_guest():
|
|
abort(403)
|
|
|
|
ids = request.form.getlist('id[]', type=int)
|
|
badge_choices = request.form.getlist('badge_choice[]')
|
|
files = request.files.getlist('badge_file[]')
|
|
labels = request.form.getlist('label[]')
|
|
links = request.form.getlist('link[]')
|
|
|
|
existing_badges = {badge.id: badge for badge in Badges.findall({'user_id': user.id})}
|
|
|
|
if not (len(ids) == len(badge_choices) == len(files) == len(labels) == len(links)):
|
|
abort(400)
|
|
|
|
rejected_badges = []
|
|
# print(ids)
|
|
|
|
# print(db.query(f'SELECT id FROM badges WHERE id NOT IN {db.binding_list(len(ids))}', *ids))
|
|
deleted_badges = Badges.findall([
|
|
('id', 'NOT IN', ids),
|
|
('user_id', '=', user.id),
|
|
])
|
|
print(list(map(lambda x: x.id, deleted_badges)))
|
|
|
|
with db.transaction():
|
|
for b in deleted_badges:
|
|
b.delete()
|
|
|
|
for i, id in enumerate(ids):
|
|
badge_upload_id = badge_choices[i]
|
|
label = labels[i]
|
|
link = links[i]
|
|
pending_badge = {
|
|
'label': label,
|
|
'link': link,
|
|
'sort_order': i,
|
|
}
|
|
if badge_upload_id == 'custom':
|
|
file = files[i]
|
|
if not file:
|
|
rejected_badges.append(file.filename)
|
|
continue
|
|
file.seek(0, os.SEEK_END)
|
|
file_size = file.tell()
|
|
file.seek(0, os.SEEK_SET)
|
|
|
|
if file_size > BADGE_MAX_SIZE:
|
|
rejected_badges.append(file.filename)
|
|
continue
|
|
|
|
file_bytes = file.read()
|
|
now = time_now()
|
|
filename = f'u{user.id}d{now}s{i}.webp'
|
|
output_path = os.path.join(current_app.config['BADGES_UPLOAD_PATH'], filename)
|
|
proxied_filename = f'/static/badges/user/{filename}'
|
|
res = validate_and_create_badge(file_bytes, output_path)
|
|
if not res:
|
|
rejected_badges.append(file.filename)
|
|
continue
|
|
|
|
bu = BadgeUploads.create({
|
|
'user_id': user.id,
|
|
'uploaded_at': now,
|
|
'file_path': proxied_filename,
|
|
'original_filename': file.filename
|
|
})
|
|
else:
|
|
bu = BadgeUploads.find({'id': badge_upload_id})
|
|
if not bu:
|
|
continue
|
|
|
|
pending_badge['upload'] = bu.id
|
|
|
|
if id == -1:
|
|
pending_badge['user_id'] = user.id
|
|
badge = Badges.create(pending_badge)
|
|
else:
|
|
badge = Badges.find({'id': id})
|
|
if badge.user_id != user.id:
|
|
continue
|
|
if not badge:
|
|
continue
|
|
badge.update(pending_badge)
|
|
|
|
for stale_upload in BadgeUploads.get_unused_for_user(user.id):
|
|
filename = os.path.join(current_app.config['BADGES_UPLOAD_PATH'], os.path.basename(stale_upload.file_path))
|
|
os.remove(filename)
|
|
stale_upload.delete()
|
|
|
|
message = 'Badges updated.'
|
|
icon = InfoboxKind.INFO
|
|
if rejected_badges:
|
|
message += f';Some of your badges were incorrect and were not uploaded: {", ".join(rejected_badges)}.'
|
|
icon = InfoboxKind.WARN
|
|
|
|
flash(message, icon)
|
|
|
|
return redirect(url_for('.settings', username=username))
|