from flask import Blueprint, abort, redirect, url_for, request, render_template, flash from ..constants import InfoboxKind, PermissionLevel from ..auth import is_logged_in, get_active_user, csrf_verified from ..models import Topics, Threads, Users from slugify import slugify from functools import wraps import time bp = Blueprint('mod', __name__, url_prefix='/mod/') @bp.before_request def mod_only(): if not is_logged_in(): abort(403) if not get_active_user().is_mod(): abort(403) def admin_only(view_func): @wraps(view_func) def wrapper(*args, **kwargs): if not get_active_user().is_admin(): abort(403) return view_func(*args, **kwargs) return wrapper @bp.get('/') def index(): return 'stub' @bp.get('/topics/new/') def new_topic(): return render_template('mod/new_topic.html') @bp.post('/topics/new/') def new_topic_post(): topic = Topics.new(request.form.get('name'), request.form.get('description')) return redirect(url_for('topics.topic_by_id', topic_id=topic.id)) @bp.get('/topics/sort/') def sort_topics(): return 'stub' @bp.get('/topics//edit/') def edit_topic(topic_id): topic = Topics.find({'id': topic_id}) if not topic: abort(404) return render_template('mod/edit_topic.html', topic=topic) @bp.post('/topics//edit/') def edit_topic_post(topic_id): topic = Topics.find({'id': topic_id}) if not topic: abort(404) target_name = request.form.get('name').strip() name_exists = Topics.count([ ('lower(name)', '=', target_name.lower()), ('id', '!=', topic.id) ]) > 0 if name_exists: flash(f'A topic named "{target_name}" already exists.', InfoboxKind.ERROR) return redirect(url_for('.edit_topic', topic_id=topic_id)) topic.update({ 'name': target_name, 'description': request.form.get('description').strip(), 'slug': slugify(target_name[:50]), }) return redirect(url_for('topics.topic_by_id', topic_id=topic.id)) @bp.post('/topics//lock/') def lock_topic(topic_id): topic = Topics.find({'id': topic_id}) if not topic: abort(404) topic.update({'is_locked': request.form.get('lock', default=0)}) return redirect(url_for('topics.topic_by_id', topic_id=topic.id)) @bp.post('/threads//move/') def move_thread(thread_id): thread = Threads.find({'id': thread_id}) if not thread: abort(404) target_topic = Topics.find({'id': request.form.get('new_topic_id', default=None)}) if not target_topic: abort(404) thread.update({'topic_id': target_topic.id}) return redirect(url_for('threads.thread_by_id', thread_id=thread.id)) @bp.post('/threads//lock/') def lock_thread(thread_id): thread = Threads.find({'id': thread_id}) if not thread: abort(404) thread.update({'is_locked': request.form.get('lock')}) return redirect(url_for('threads.thread_by_id', thread_id=thread.id)) @bp.post('/threads//sticky/') def sticky_thread(thread_id): thread = Threads.find({'id': thread_id}) if not thread: abort(404) thread.update({'is_stickied': request.form.get('sticky')}) return redirect(url_for('threads.thread_by_id', thread_id=thread.id)) @bp.post('/users//make-guest/') @csrf_verified def make_user_guest(user_id): mod = get_active_user() target_user = Users.find({'id': user_id}) if not target_user: abort(404) if target_user.is_admin() or target_user.is_system(): abort(403) if int(target_user.permission) >= int(mod.permission): abort(403) target_user.update({ 'permission': PermissionLevel.GUEST.value, 'confirmed_on': None, }) return redirect(url_for('users.user_page', username=target_user.username)) @bp.post('/users//make-user/') @csrf_verified def make_user_regular(user_id): mod = get_active_user() target_user = Users.find({'id': user_id}) if not target_user: abort(404) if target_user.is_admin() or target_user.is_system(): abort(403) # mod -> regular user, abort if not admin if int(target_user.permission) >= int(mod.permission): abort(403) update_dict = {'permission': PermissionLevel.USER.value} # set approved date if the user was guest if target_user.is_guest(): update_dict['confirmed_on'] = int(time.time()) target_user.update(update_dict) return redirect(url_for('users.user_page', username=target_user.username)) @bp.post('/users//make-mod/') @admin_only @csrf_verified def make_user_mod(user_id): mod = get_active_user() target_user = Users.find({'id': user_id}) if not target_user: abort(404) if target_user.is_admin() or target_user.is_system(): abort(403) if int(target_user.permission) >= int(mod.permission): abort(403) target_user.update({'permission': PermissionLevel.MODERATOR.value}) return redirect(url_for('users.user_page', username=target_user.username))