Files
pyrom/app/routes/threads.py
2026-04-29 21:42:22 +03:00

196 lines
6.0 KiB
Python

from flask import Blueprint, redirect, url_for, render_template, request, abort
from functools import wraps
from ..auth import login_required, get_active_user
from ..models import Threads, Posts, Topics, Users, Reactions, Subscriptions
from ..util import get_form_checkbox, time_now
import math
bp = Blueprint('threads', __name__, url_prefix='/threads/')
def ownership_or_mod_required(view_func):
@wraps(view_func)
def wrapper(*args, **kwargs):
thread = Threads.find({'id': kwargs.get('thread_id', None)})
if not thread:
abort(404)
if thread.user_id != get_active_user().id and not get_active_user().is_mod():
abort(403)
return view_func(*args, **kwargs)
return wrapper
@bp.get('/<int:thread_id>/')
def thread_by_id(thread_id):
thread = Threads.find({'id': thread_id})
if not thread:
abort(404)
return redirect(url_for('.thread', thread_id=thread_id, slug=thread.slug, **request.args))
@bp.get('/<int:thread_id>/<slug>/')
def thread(thread_id, slug):
thread = Threads.find({'id': thread_id})
if not thread:
abort(404)
if thread.slug != slug:
return redirect(url_for('.thread', thread_id=thread_id, slug=thread.slug, **request.kwargs))
topic = Topics.find({'id': thread.topic_id})
started_by = Users.find({'id': thread.user_id})
PER_PAGE = 10
posts_count = Posts.count({'thread_id': thread.id})
page_count = max(1, math.ceil(posts_count / PER_PAGE))
page = 1
after = request.args.get('after')
if after is not None:
try:
after_id = int(after)
post_position = Posts.count([
('thread_id', '=', thread.id),
('id', '<=', after_id),
])
page = math.ceil((post_position) / PER_PAGE)
except ValueError:
abort(404)
else:
try:
page = max(1, min(int(request.args.get('page', default=1)), page_count))
except ValueError:
abort(404)
posts = thread.get_posts(PER_PAGE, page)
last_post = posts[-1]
return render_template(
'threads/thread.html', thread=thread,
posts=posts, page=page,
page_count=page_count, topic=topic,
started_by=started_by, topics=Topics.get_list(),
Reactions=Reactions, last_post=last_post
)
@bp.post('/<int:thread_id>/')
@login_required
def reply(thread_id):
user = get_active_user()
thread = Threads.find({'id': thread_id})
if not thread:
abort(404)
if not user.can_post_to_thread_or_topic(thread):
return redirect(url_for('.thread_by_id', thread_id=thread_id))
post = Posts.new(user.id, thread.id, request.form.get('babycode_content'))
if get_form_checkbox('subscribe'):
if not Subscriptions.find({'user_id': user.id, 'thread_id': thread.id}):
Subscriptions.create({
'user_id': user.id,
'thread_id': thread.id,
'last_seen': time_now(),
})
return redirect(url_for('.thread_by_id', thread_id=thread_id, after=post.id, _anchor=f'post-{post.id}'))
@bp.get('/<int:thread_id>/edit/')
@login_required
@ownership_or_mod_required
def edit(thread_id):
thread = Threads.find({'id': thread_id})
if not thread:
abort(404)
return render_template('threads/edit.html', thread=thread)
@bp.post('/<int:thread_id>/edit/')
@login_required
@ownership_or_mod_required
def edit_post(thread_id):
thread = Threads.find({'id': thread_id})
if not thread:
abort(404)
new_title = request.form.get('title', '').strip()
if not new_title:
abort(400)
if new_title != thread.title:
thread.update({'title': new_title})
return redirect(url_for('.thread_by_id', thread_id=thread_id))
@bp.post('/<int:thread_id>/subscribe/')
@login_required
def subscribe(thread_id):
thread = Threads.find({'id': thread_id})
if not thread:
abort(404)
user = get_active_user()
last_post_id = request.form.get('last_post_id', None)
if last_post_id is None:
abort(400)
if user.is_subscribed(thread_id):
return redirect(url_for('.thread_by_id', thread_id=thread_id, after=last_post_id))
Subscriptions.create({
'user_id': user.id,
'thread_id': thread_id,
'last_seen': request.form.get('last_post_timestamp', time_now())
})
return redirect(url_for('.thread_by_id', thread_id=thread_id, after=last_post_id))
@bp.post('/<int:thread_id>/unsubscribe/')
@login_required
def unsubscribe(thread_id):
thread = Threads.find({'id': thread_id})
if not thread:
abort(404)
user = get_active_user()
last_post_id = request.form.get('last_post_id', None)
if last_post_id is None:
abort(400)
subscription = Subscriptions.find({'user_id': user.id, 'thread_id': thread_id})
if not subscription:
return redirect(url_for('.thread_by_id', thread_id=thread_id, after=last_post_id))
subscription.delete()
return redirect(url_for('.thread_by_id', thread_id=thread_id, after=last_post_id))
@bp.get('/<int:thread_id>/feed.atom/')
def feed(thread_id):
return 'stub'
@bp.get('/new/')
@login_required
def new():
topics = Topics.select()
try:
selected_topic = int(request.args.get('topic_id'))
except ValueError, TypeError:
selected_topic = None
return render_template('threads/new_thread.html', topics=topics, selected_topic=selected_topic)
@bp.post('/new/')
@login_required
def new_post():
try:
topic_id = int(request.form.get('topic_id'))
except ValueError, TypeError:
abort(404)
topic_id = int(topic_id)
topic = Topics.find({'id': topic_id})
if not topic:
abort(404)
user = get_active_user()
if not user.can_post_to_thread_or_topic(topic):
abort(403)
title = request.form.get('title', '').strip()
if not title:
abort(400)
title = title.strip()
content = request.form.get('babycode_content')
thread = Threads.new(user.id, topic.id, title, content)
return redirect(url_for('.thread', slug=thread.slug))