from flask import ( Blueprint, render_template, request, redirect, url_for, flash ) from .users import login_required, mod_only, get_active_user, is_logged_in from ..db import db from ..models import Threads, Topics, Posts, Subscriptions from ..constants import InfoboxKind from .posts import create_post from slugify import slugify import math import time bp = Blueprint("threads", __name__, url_prefix = "/threads/") @bp.get("/") def thread(slug): POSTS_PER_PAGE = 10 thread = Threads.find({"slug": slug}) if not thread: return "no" post_count = Posts.count({"thread_id": thread.id}) page_count = max(math.ceil(post_count / POSTS_PER_PAGE), 1) page = 1 after = request.args.get("after", default=None) if after is not None: after_id = int(after) post_position = Posts.count([ ("thread_id", "=", thread.id), ("id", "<=", after_id), ]) page = math.ceil((post_position) / POSTS_PER_PAGE) else: page = max(1, min(page_count, int(request.args.get("page", default = 1)))) posts = thread.get_posts(POSTS_PER_PAGE, (page - 1) * POSTS_PER_PAGE) topic = Topics.find({"id": thread.topic_id}) other_topics = Topics.select() is_subscribed = False if is_logged_in(): subscription = Subscriptions.find({ 'thread_id': thread.id, 'user_id': get_active_user().id, }) if subscription: if int(posts[-1]['created_at']) > int(subscription.last_seen): subscription.update({ 'last_seen': int(posts[-1]['created_at']) }) is_subscribed = True return render_template( "threads/thread.html", thread = thread, current_page = page, page_count = page_count, posts = posts, topic = topic, topics = other_topics, is_subscribed = is_subscribed, ) @bp.post("/") @login_required def reply(slug): thread = Threads.find({"slug": slug}) if not thread: return "no" user = get_active_user() if user.is_guest(): return "no" if thread.locked() and not user.is_mod(): return "no" post_content = request.form['post_content'] post = create_post(thread.id, user.id, post_content) subscription = Subscriptions.find({'user_id': user.id, 'thread_id': thread.id}) if subscription: subscription.update({'last_seen': int(time.time())}) elif request.form.get('subscribe', default=None) == 'on': Subscriptions.create({'user_id': user.id, 'thread_id': thread.id, 'last_seen': int(time.time())}) return redirect(url_for(".thread", slug=slug, after=post.id, _anchor="latest-post")) @bp.get("/create") @login_required def create(): all_topics = Topics.select() return render_template("threads/create.html", all_topics = all_topics) @bp.post("/create") @login_required def create_form(): topic = Topics.find({"id": request.form['topic_id']}) user = get_active_user() if not topic: return "no" if topic.is_locked and not get_active_user().is_mod(): return "no" title = request.form['title'].strip() now = int(time.time()) slug = f"{slugify(title)}-{now}" post_content = request.form['initial_post'] thread = Threads.create({ "topic_id": topic.id, "user_id": user.id, "title": title, "slug": slug, "created_at": now, }) post = create_post(thread.id, user.id, post_content) return redirect(url_for(".thread", slug = thread.slug)) @bp.post("//lock") @login_required def lock(slug): user = get_active_user() thread = Threads.find({'slug': slug}) if not ((thread.user_id == user.id) or user.is_mod()): return 'no' target_op = request.form.get('target_op') thread.update({ 'is_locked': target_op }) return redirect(url_for('.thread', slug=slug)) @bp.post("//sticky") @login_required @mod_only(".thread", slug = lambda slug: slug) def sticky(slug): user = get_active_user() thread = Threads.find({'slug': slug}) if not ((thread.user_id == user.id) or user.is_mod()): return 'no' target_op = request.form.get('target_op') thread.update({ 'is_stickied': target_op }) return redirect(url_for('.thread', slug=slug)) @bp.post("//move") @login_required @mod_only(".thread", slug = lambda slug: slug) def move(slug): user = get_active_user() new_topic_id = request.form.get('new_topic_id', default=None) if new_topic_id is None: flash('Thread is already in this topic.', InfoboxKind.ERROR) return redirect(url_for('.thread', slug=slug)) new_topic = Topics.find({ 'id': new_topic_id }) if not new_topic: return 'no' thread = Threads.find({ 'slug': slug }) if not thread: return 'no' if new_topic.id == thread.topic_id: flash('Thread is already in this topic.', InfoboxKind.ERROR) return redirect(url_for('.thread', slug=slug)) old_topic = Topics.find({'id': thread.topic_id}) thread.update({'topic_id': new_topic_id}) flash(f'Topic moved from "{old_topic.name}" to "{new_topic.name}".', InfoboxKind.INFO) return redirect(url_for('.thread', slug=slug)) @bp.post("//subscribe") @login_required def subscribe(slug): user = get_active_user() thread = Threads.find({'slug': slug}) if not thread: return 'no' subscription = Subscriptions.find({ 'user_id': user.id, 'thread_id': thread.id, }) if request.form['subscribe'] == 'subscribe': if subscription: subscription.delete() Subscriptions.create({ 'user_id': user.id, 'thread_id': thread.id, 'last_seen': int(time.time()), }) elif request.form['subscribe'] == 'unsubscribe': if not subscription: return 'no' subscription.delete() elif request.form['subscribe'] == 'read': if not subscription: return 'no' subscription.update({ 'last_seen': int(time.time()) }) last_visible_post = request.form.get('last_visible_post', default=None) if last_visible_post is not None: return redirect(url_for('.thread', slug=thread.slug, after=last_visible_post)) else: return redirect(url_for('users.inbox', username=user.username))