"""Moderator panel blueprint: MOTD, topic, thread and user-permission views."""

import time
from functools import wraps

from flask import Blueprint, abort, redirect, url_for, request, render_template, flash
from slugify import slugify

from ..constants import InfoboxKind, PermissionLevel, MOTD_BANNED_TAGS
from ..auth import is_logged_in, get_active_user, csrf_verified
from ..models import Topics, Threads, Users, MOTD
from ..db import db
from ..lib.babycode import babycode_to_html, BABYCODE_VERSION

bp = Blueprint('mod', __name__, url_prefix='/mod/')

# NOTE(review): URL variables use the `int` converter on the assumption that
# topic/thread/user ids are integer primary keys - confirm against the models.
# NOTE(review): only the user-permission views are wrapped in `csrf_verified`;
# the MOTD/topic/thread POST views are not - confirm whether that is intended.


@bp.before_request
def mod_only():
    """Abort with 403 unless the request comes from a logged-in moderator.

    Registered as a ``before_request`` hook, so it guards every view of this
    blueprint.
    """
    if not is_logged_in():
        abort(403)
    if not get_active_user().is_mod():
        abort(403)


def admin_only(view_func):
    """Decorate a view so that it aborts with 403 unless the active user is an admin."""
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        if not get_active_user().is_admin():
            abort(403)
        return view_func(*args, **kwargs)
    return wrapper


def _find_or_404(model, record_id):
    """Return the ``model`` record whose id is ``record_id``, or abort with 404."""
    record = model.find({'id': record_id})
    if not record:
        abort(404)
    return record


def _get_manageable_user(user_id):
    """Return the user whose permission level the active user may change.

    Aborts with 404 when the user does not exist, and with 403 when the
    target is an admin or system user, or when the target's permission level
    is not strictly below the acting user's (so a moderator cannot change
    another moderator, or themselves).
    """
    acting_user = get_active_user()
    target_user = _find_or_404(Users, user_id)
    if target_user.is_admin() or target_user.is_system():
        abort(403)
    if int(target_user.permission) >= int(acting_user.permission):
        abort(403)
    return target_user


@bp.get('/')
def index():
    """Render the moderation panel with the current MOTD (if any) and the topic list."""
    motd = MOTD.get_all()[0] if MOTD.has_motd() else None
    topics = Topics.get_list()
    return render_template(
        'mod/panel.html',
        MOTD_BANNED_TAGS=MOTD_BANNED_TAGS,
        motd=motd,
        topics=topics,
    )


@bp.post('/motd/set/')
def set_motd():
    """Create the MOTD, or update the existing one, from the submitted form.

    Reads ``motd_title`` and ``babycode_content`` from the form. A title or
    body that is missing or whitespace-only is rejected with an error flash.
    Always redirects back to the panel index.
    """
    # Strip before validating so a whitespace-only title is not stored as ''.
    title = request.form.get('motd_title', '').strip()
    if not title:
        flash('MOTD must have a title.', InfoboxKind.ERROR)
        return redirect(url_for('.index'))

    orig_body = request.form.get('babycode_content', '')
    if not orig_body.strip():
        flash('MOTD must have a body.', InfoboxKind.ERROR)
        return redirect(url_for('.index'))

    user = get_active_user()
    # Sample the clock once so created_at and edited_at agree on creation.
    now = int(time.time())
    data = {
        'title': title,
        'body_original_markup': orig_body,
        'body_rendered': babycode_to_html(orig_body, banned_tags=MOTD_BANNED_TAGS).result,
        'format_version': BABYCODE_VERSION,
        'edited_at': now,
        'user_id': user.id,
    }

    if MOTD.has_motd():
        MOTD.get_all()[0].update(data)
        message = 'MOTD updated.'
    else:
        data['created_at'] = now
        MOTD.create(data)
        message = 'MOTD created.'

    flash(message, InfoboxKind.INFO)
    return redirect(url_for('.index'))


@bp.post('/motd/clear/')
def clear_motd():
    """Delete the current MOTD, if there is one, and redirect to the panel index."""
    if not MOTD.has_motd():
        return redirect(url_for('.index'))

    current = MOTD.get_all()[0]
    current.delete()

    flash('MOTD cleared.', InfoboxKind.INFO)
    return redirect(url_for('.index'))


@bp.get('/topics/new/')
def new_topic():
    """Render the topic creation form."""
    return render_template('mod/new_topic.html')


@bp.post('/topics/new/')
def new_topic_post():
    """Create a topic from the form's ``name`` and ``description`` and redirect to it."""
    topic = Topics.new(request.form.get('name'), request.form.get('description'))
    return redirect(url_for('topics.topic_by_id', topic_id=topic.id))


@bp.get('/topics/<int:topic_id>/edit/')
def edit_topic(topic_id):
    """Render the edit form for the given topic (404 if it does not exist)."""
    topic = _find_or_404(Topics, topic_id)
    return render_template('mod/edit_topic.html', topic=topic)


@bp.post('/topics/<int:topic_id>/edit/')
def edit_topic_post(topic_id):
    """Rename a topic and update its description and slug.

    Aborts with 404 if the topic does not exist. An empty name, or a name
    that case-insensitively matches another topic, is rejected with an error
    flash and a redirect back to the edit form.
    """
    topic = _find_or_404(Topics, topic_id)

    # Default to '' so a missing form field cannot raise AttributeError.
    target_name = request.form.get('name', '').strip()
    if not target_name:
        flash('Topic must have a name.', InfoboxKind.ERROR)
        return redirect(url_for('.edit_topic', topic_id=topic_id))

    name_exists = Topics.count([
        ('lower(name)', '=', target_name.lower()),
        ('id', '!=', topic.id),
    ]) > 0
    if name_exists:
        flash(f'A topic named "{target_name}" already exists.', InfoboxKind.ERROR)
        return redirect(url_for('.edit_topic', topic_id=topic_id))

    topic.update({
        'name': target_name,
        'description': request.form.get('description', '').strip(),
        # The slug is derived from at most the first 50 characters of the name.
        'slug': slugify(target_name[:50]),
    })

    return redirect(url_for('topics.topic_by_id', topic_id=topic.id))


@bp.post('/topics/<int:topic_id>/lock/')
def lock_topic(topic_id):
    """Set the topic's ``is_locked`` flag from the form's ``lock`` field (default 0)."""
    topic = _find_or_404(Topics, topic_id)
    topic.update({'is_locked': request.form.get('lock', default=0)})
    return redirect(url_for('topics.topic_by_id', topic_id=topic.id))


@bp.post('/threads/<int:thread_id>/move/')
def move_thread(thread_id):
    """Move a thread to the topic named by the form's ``new_topic_id`` field.

    Aborts with 404 if either the thread or the target topic is not found.
    """
    thread = _find_or_404(Threads, thread_id)
    target_topic = _find_or_404(Topics, request.form.get('new_topic_id', default=None))

    thread.update({'topic_id': target_topic.id})
    return redirect(url_for('threads.thread_by_id', thread_id=thread.id))


@bp.post('/threads/<int:thread_id>/lock/')
def lock_thread(thread_id):
    """Set the thread's ``is_locked`` flag from the form's ``lock`` field (default 0)."""
    thread = _find_or_404(Threads, thread_id)
    # Default to 0, matching lock_topic, so a missing field never writes None.
    thread.update({'is_locked': request.form.get('lock', default=0)})
    return redirect(url_for('threads.thread_by_id', thread_id=thread.id))


@bp.post('/threads/<int:thread_id>/sticky/')
def sticky_thread(thread_id):
    """Set the thread's ``is_stickied`` flag from the form's ``sticky`` field (default 0)."""
    thread = _find_or_404(Threads, thread_id)
    # Default to 0, matching lock_topic, so a missing field never writes None.
    thread.update({'is_stickied': request.form.get('sticky', default=0)})
    return redirect(url_for('threads.thread_by_id', thread_id=thread.id))


@bp.post('/threads/sort/')
def sort_topics():
    """Store the submitted topic order.

    Each id in the form's ``topics[]`` list gets its list position written
    to ``sort_order``; all updates run inside one ``db.transaction()``.
    """
    topics_list = request.form.getlist('topics[]')
    with db.transaction():
        for new_order, topic_id in enumerate(topics_list):
            db.execute('UPDATE topics SET sort_order = ? WHERE id = ?', new_order, topic_id)

    return redirect(url_for('.index', _anchor='sort-topics'))


@bp.post('/users/<int:user_id>/make-guest/')
@csrf_verified
def make_user_guest(user_id):
    """Set the target user's permission to GUEST and clear ``confirmed_on``."""
    target_user = _get_manageable_user(user_id)

    target_user.update({
        'permission': PermissionLevel.GUEST.value,
        'confirmed_on': None,
    })

    return redirect(url_for('users.user_page', username=target_user.username))


@bp.post('/users/<int:user_id>/make-user/')
@csrf_verified
def make_user_regular(user_id):
    """Set the target user's permission to USER.

    If the target is currently a guest, ``confirmed_on`` is also set to the
    current time.
    """
    target_user = _get_manageable_user(user_id)

    update_dict = {'permission': PermissionLevel.USER.value}
    # Set the approval date only when promoting from guest.
    if target_user.is_guest():
        update_dict['confirmed_on'] = int(time.time())

    target_user.update(update_dict)

    return redirect(url_for('users.user_page', username=target_user.username))


@bp.post('/users/<int:user_id>/make-mod/')
@admin_only
@csrf_verified
def make_user_mod(user_id):
    """Set the target user's permission to MODERATOR (admin-only view)."""
    target_user = _get_manageable_user(user_id)

    target_user.update({'permission': PermissionLevel.MODERATOR.value})

    return redirect(url_for('users.user_page', username=target_user.username))