"""Views for browsing, replying to, editing and (un)subscribing to threads."""
import math
from functools import wraps

from flask import Blueprint, redirect, url_for, render_template, request, abort

from ..auth import login_required, get_active_user
from ..models import Threads, Posts, Topics, Users, Reactions, Subscriptions
from ..util import get_form_checkbox, time_now

bp = Blueprint('threads', __name__, url_prefix='/threads/')

# Number of posts rendered on one page of a thread.
POSTS_PER_PAGE = 10

# Query-string keys that must never be forwarded into url_for(): they would
# collide with the keyword arguments the views already pass explicitly.
_RESERVED_QUERY_KEYS = frozenset({'thread_id', 'slug'})

# NOTE(review): the URL rules in the reviewed copy of this file had lost their
# placeholders (they read '//', '///', '//edit/', ...), even though every view
# takes ``thread_id``/``slug`` and every url_for() call passes them. They are
# restored below as ``<int:thread_id>`` and ``<slug>``; the ``int`` converter
# is an assumption -- confirm against the original routes.


def _get_thread_or_404(thread_id):
    """Return the thread with the given id, aborting with 404 if none exists."""
    thread = Threads.find({'id': thread_id})
    if not thread:
        abort(404)
    return thread


def _passthrough_query_args():
    """Return the current query arguments that are safe to hand to url_for().

    Keys that duplicate explicit keyword arguments (``thread_id``, ``slug``)
    would raise ``TypeError`` when unpacked, and underscore-prefixed keys
    (``_external``, ``_anchor``, ...) are interpreted by url_for() itself, so
    both groups are dropped instead of being controlled by the client.
    """
    return {
        key: value
        for key, value in request.args.items()
        if key not in _RESERVED_QUERY_KEYS and not key.startswith('_')
    }


def _resolve_page(thread, page_count):
    """Work out which page of ``thread`` the current request asks for.

    ``?after=<post id>`` selects the page containing that post; otherwise
    ``?page=<n>`` is used (default 1). A non-integer value aborts with 404.
    The result is always clamped to ``1..page_count``.
    """
    after = request.args.get('after')
    if after is not None:
        try:
            after_id = int(after)
        except ValueError:
            abort(404)
        # Position of the referenced post within the thread, counted by id.
        post_position = Posts.count([
            ('thread_id', '=', thread.id),
            ('id', '<=', after_id),
        ])
        page = math.ceil(post_position / POSTS_PER_PAGE)
    else:
        try:
            page = int(request.args.get('page', default=1))
        except ValueError:
            abort(404)
    # An ``after`` id at or below zero matches no post and would give page 0.
    return max(1, min(page, page_count))


def ownership_or_mod_required(view_func):
    """Restrict a view to the thread's author or a moderator.

    The wrapped view must receive ``thread_id`` as a keyword argument.
    Aborts with 404 if the thread does not exist and with 403 if the active
    user neither started the thread nor is a moderator.
    """
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        thread = _get_thread_or_404(kwargs.get('thread_id'))
        user = get_active_user()
        if thread.user_id != user.id and not user.is_mod():
            abort(403)
        return view_func(*args, **kwargs)
    return wrapper


@bp.get('/<int:thread_id>/')
def thread_by_id(thread_id):
    """Redirect a slug-less thread URL to its canonical URL with the slug."""
    thread = _get_thread_or_404(thread_id)
    return redirect(url_for(
        '.thread',
        thread_id=thread_id,
        slug=thread.slug,
        **_passthrough_query_args(),
    ))


@bp.get('/<int:thread_id>/<slug>/')
def thread(thread_id, slug):
    """Render one page of a thread.

    Redirects to the canonical URL when ``slug`` does not match the thread's
    slug. Aborts with 404 for an unknown thread or a malformed
    ``page``/``after`` query argument.
    """
    thread = _get_thread_or_404(thread_id)
    if thread.slug != slug:
        return redirect(url_for(
            '.thread',
            thread_id=thread_id,
            slug=thread.slug,
            **_passthrough_query_args(),
        ))

    topic = Topics.find({'id': thread.topic_id})
    started_by = Users.find({'id': thread.user_id})

    posts_count = Posts.count({'thread_id': thread.id})
    page_count = max(1, math.ceil(posts_count / POSTS_PER_PAGE))
    page = _resolve_page(thread, page_count)

    posts = thread.get_posts(POSTS_PER_PAGE, page)
    # A page without posts has no last post; avoid IndexError on posts[-1].
    last_post = posts[-1] if posts else None

    return render_template(
        'threads/thread.html',
        thread=thread,
        posts=posts,
        page=page,
        page_count=page_count,
        topic=topic,
        started_by=started_by,
        topics=Topics.get_list(),
        Reactions=Reactions,
        last_post=last_post,
    )


@bp.post('/<int:thread_id>/')
@login_required
def reply(thread_id):
    """Add a post to a thread and optionally subscribe the author to it.

    Users who may not post to the thread are redirected back to it without
    creating a post. On success, redirects to the page holding the new post.
    """
    user = get_active_user()
    thread = _get_thread_or_404(thread_id)
    if not user.can_post_to_thread_or_topic(thread):
        return redirect(url_for('.thread_by_id', thread_id=thread_id))

    post = Posts.new(user.id, thread.id, request.form.get('babycode_content'))

    subscription_key = {'user_id': user.id, 'thread_id': thread.id}
    if get_form_checkbox('subscribe') and not Subscriptions.find(subscription_key):
        Subscriptions.create({**subscription_key, 'last_seen': time_now()})

    return redirect(url_for(
        '.thread_by_id',
        thread_id=thread_id,
        after=post.id,
        _anchor=f'post-{post.id}',
    ))


@bp.get('/<int:thread_id>/edit/')
@login_required
@ownership_or_mod_required
def edit(thread_id):
    """Show the edit form for a thread."""
    thread = _get_thread_or_404(thread_id)
    return render_template('threads/edit.html', thread=thread)


@bp.post('/<int:thread_id>/edit/')
@login_required
@ownership_or_mod_required
def edit_post(thread_id):
    """Apply a new title to a thread; aborts with 400 if the title is blank."""
    thread = _get_thread_or_404(thread_id)
    new_title = request.form.get('title', '').strip()
    if not new_title:
        abort(400)
    if new_title != thread.title:
        thread.update({'title': new_title})
    return redirect(url_for('.thread_by_id', thread_id=thread_id))


@bp.post('/<int:thread_id>/subscribe/')
@login_required
def subscribe(thread_id):
    """Subscribe the active user to a thread (no-op if already subscribed).

    Requires the ``last_post_id`` form field (400 otherwise); it is used to
    send the user back to the page they came from.
    """
    _get_thread_or_404(thread_id)
    user = get_active_user()
    last_post_id = request.form.get('last_post_id')
    if last_post_id is None:
        abort(400)

    if not user.is_subscribed(thread_id):
        Subscriptions.create({
            'user_id': user.id,
            'thread_id': thread_id,
            # NOTE(review): client-supplied value is stored unvalidated.
            'last_seen': request.form.get('last_post_timestamp', time_now()),
        })
    return redirect(url_for('.thread_by_id', thread_id=thread_id, after=last_post_id))


@bp.post('/<int:thread_id>/unsubscribe/')
@login_required
def unsubscribe(thread_id):
    """Remove the active user's subscription to a thread, if there is one.

    Requires the ``last_post_id`` form field (400 otherwise); it is used to
    send the user back to the page they came from.
    """
    _get_thread_or_404(thread_id)
    user = get_active_user()
    last_post_id = request.form.get('last_post_id')
    if last_post_id is None:
        abort(400)

    subscription = Subscriptions.find({'user_id': user.id, 'thread_id': thread_id})
    if subscription:
        subscription.delete()
    return redirect(url_for('.thread_by_id', thread_id=thread_id, after=last_post_id))


@bp.get('/<int:thread_id>/feed.atom/')
def feed(thread_id):
    """Placeholder for the thread's Atom feed."""
    return 'stub'


@bp.get('/new/')
@login_required
def new():
    """Show the new-thread form, preselecting ``?topic_id=`` when it is an integer."""
    topics = Topics.select()
    try:
        selected_topic = int(request.args.get('topic_id'))
    except (ValueError, TypeError):
        # Missing (None) or non-numeric topic id: nothing preselected.
        selected_topic = None
    return render_template('threads/new_thread.html', topics=topics, selected_topic=selected_topic)


@bp.post('/new/')
@login_required
def new_post():
    """Create a thread in the submitted topic and redirect to it.

    Aborts with 404 for a missing/unknown topic, 403 if the user may not post
    to the topic, and 400 if the title is blank.
    """
    try:
        topic_id = int(request.form.get('topic_id'))
    except (ValueError, TypeError):
        abort(404)

    topic = Topics.find({'id': topic_id})
    if not topic:
        abort(404)

    user = get_active_user()
    if not user.can_post_to_thread_or_topic(topic):
        abort(403)

    title = request.form.get('title', '').strip()
    if not title:
        abort(400)

    content = request.form.get('babycode_content')
    thread = Threads.new(user.id, topic.id, title, content)
    return redirect(url_for('.thread_by_id', thread_id=thread.id))