"""Blueprint with the edit and delete views for individual posts."""

from functools import wraps

from flask import Blueprint, abort, render_template, redirect, url_for, request, flash

from ..auth import login_required, get_active_user
from ..models import Posts, Threads, Topics
from ..util import get_post_url
from ..db import db
from ..constants import InfoboxKind

bp = Blueprint('posts', __name__, url_prefix='/posts/')


def ownership_required(view_func):
    """Restrict a view to the author of the post named by ``post_id``.

    The wrapped view must receive ``post_id`` as a keyword argument.
    Aborts with 404 when no such post exists and with 403 when the active
    user is not the post's author.
    """
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        post = Posts.find({'id': kwargs.get('post_id')})
        user = get_active_user()
        if not post:
            abort(404)
        if post.user_id != user.id:
            abort(403)
        return view_func(*args, **kwargs)
    return wrapper


def ownership_or_mod_required(view_func):
    """Restrict a view to the post's author or a moderator.

    The wrapped view must receive ``post_id`` as a keyword argument.
    Aborts with 404 when no such post exists and with 403 when the active
    user is neither the post's author nor a moderator.
    """
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        post = Posts.find({'id': kwargs.get('post_id')})
        if not post:
            abort(404)
        # Resolve the active user once instead of once per condition.
        user = get_active_user()
        if post.user_id != user.id and not user.is_mod():
            abort(403)
        return view_func(*args, **kwargs)
    return wrapper


def _load_modifiable_post(post_id):
    """Return ``(post, thread)`` for ``post_id`` or abort the request.

    Aborts with 404 when the post or its thread cannot be found, and with
    403 when the thread is locked and the active user is not a moderator.
    """
    post = Posts.find({'id': post_id})
    if not post:
        abort(404)

    thread = Threads.find({'id': post.thread_id})
    if not thread:
        # A post that points at a missing thread is unexpected; treat it
        # the same as a missing post.
        abort(404)

    user = get_active_user()
    if thread.locked() and not user.is_mod():
        abort(403)

    return post, thread


# Each rule carries a ``post_id`` URL variable: the views take it as an
# argument and the ownership decorators read it from the view kwargs.
@bp.get('/<int:post_id>/edit/')
@login_required
@ownership_required
def edit(post_id):
    """Render the edit form for a post together with its neighbouring posts.

    Up to two posts created before and up to two created after the edited
    post (within the same thread) are passed to the template as context.
    """
    post, thread = _load_modifiable_post(post_id)

    thread_predicate = f'{Posts.FULL_POSTS_QUERY} WHERE posts.thread_id = ?'
    # Previous posts are fetched newest-first, next posts oldest-first, so
    # both lists start with the post closest to the one being edited.
    context_prev_q = f'{thread_predicate} AND posts.created_at < ? ORDER BY posts.created_at DESC LIMIT 2'
    context_next_q = f'{thread_predicate} AND posts.created_at > ? ORDER BY posts.created_at ASC LIMIT 2'
    context_next = db.query(context_next_q, thread.id, post.created_at)
    context_prev = db.query(context_prev_q, thread.id, post.created_at)

    return render_template(
        'posts/edit.html',
        post=post.get_full_post_view(),
        context_next=context_next,
        context_prev=context_prev,
    )


@bp.post('/<int:post_id>/edit/')
@login_required
@ownership_required
def edit_post(post_id):
    """Apply the submitted ``babycode_content`` to the post and redirect to it."""
    post, _thread = _load_modifiable_post(post_id)
    post.edit(request.form.get('babycode_content', ''))
    return redirect(get_post_url(post.id, _anchor=True))


@bp.get('/<int:post_id>/delete/')
@login_required
@ownership_or_mod_required
def delete(post_id):
    """Render the delete confirmation page for a post."""
    post, _thread = _load_modifiable_post(post_id)
    return render_template('posts/delete.html', post=post.get_full_post_view())


@bp.post('/<int:post_id>/delete/')
@login_required
@ownership_or_mod_required
def delete_post(post_id):
    """Delete a post, and its thread too when no posts remain in it.

    Redirects to the topic when the thread was removed as well, otherwise
    back to the thread.
    """
    post, thread = _load_modifiable_post(post_id)
    post.delete()

    if Posts.count({'thread_id': thread.id}) == 0:
        # Read the topic id before the thread goes away, and use it
        # directly: looking the topic up only to read its id back would
        # crash on a missing topic after the thread is already deleted.
        topic_id = thread.topic_id
        thread.delete()
        flash('Thread deleted.', InfoboxKind.INFO)
        return redirect(url_for('topics.topic_by_id', topic_id=topic_id))

    flash('Post deleted.', InfoboxKind.INFO)
    return redirect(url_for('threads.thread_by_id', thread_id=thread.id))