209 lines
6.7 KiB
Python
209 lines
6.7 KiB
Python
from flask import Blueprint, redirect, url_for, render_template, request, abort
|
|
from functools import wraps
|
|
from ..auth import login_required, get_active_user, is_logged_in
|
|
from ..models import Threads, Posts, Topics, Users, Reactions, Subscriptions
|
|
from ..util import get_form_checkbox, time_now
|
|
import math
|
|
|
|
bp = Blueprint('threads', __name__, url_prefix='/threads/')
|
|
|
|
def ownership_or_mod_required(view_func):
|
|
@wraps(view_func)
|
|
def wrapper(*args, **kwargs):
|
|
thread = Threads.find({'id': kwargs.get('thread_id', None)})
|
|
if not thread:
|
|
abort(404)
|
|
|
|
if thread.user_id != get_active_user().id and not get_active_user().is_mod():
|
|
abort(403)
|
|
|
|
return view_func(*args, **kwargs)
|
|
return wrapper
|
|
|
|
@bp.get('/<int:thread_id>/')
|
|
def thread_by_id(thread_id):
|
|
thread = Threads.find({'id': thread_id})
|
|
if not thread:
|
|
abort(404)
|
|
return redirect(url_for('.thread', thread_id=thread_id, slug=thread.slug, **request.args))
|
|
|
|
@bp.get('/<int:thread_id>/<slug>/')
|
|
def thread(thread_id, slug):
|
|
thread = Threads.find({'id': thread_id})
|
|
if not thread:
|
|
abort(404)
|
|
if thread.slug != slug:
|
|
return redirect(url_for('.thread', thread_id=thread_id, slug=thread.slug, **request.args))
|
|
|
|
topic = Topics.find({'id': thread.topic_id})
|
|
started_by = Users.find({'id': thread.user_id})
|
|
PER_PAGE = 10
|
|
posts_count = Posts.count({'thread_id': thread.id})
|
|
page_count = max(1, math.ceil(posts_count / PER_PAGE))
|
|
page = 1
|
|
after = request.args.get('after')
|
|
if after is not None:
|
|
try:
|
|
after_id = int(after)
|
|
post_position = Posts.count([
|
|
('thread_id', '=', thread.id),
|
|
('id', '<=', after_id),
|
|
])
|
|
page = math.ceil((post_position) / PER_PAGE)
|
|
except ValueError:
|
|
abort(404)
|
|
else:
|
|
try:
|
|
page = max(1, min(int(request.args.get('page', default=1)), page_count))
|
|
except ValueError:
|
|
abort(404)
|
|
posts = thread.get_posts(PER_PAGE, page)
|
|
last_post = posts[-1]
|
|
user = get_active_user()
|
|
if user:
|
|
subscription = Subscriptions.find({'user_id': user.id, 'thread_id': thread.id})
|
|
if subscription and last_post['created_at'] > int(subscription.last_seen):
|
|
subscription.update({'last_seen': last_post['created_at']})
|
|
return render_template(
|
|
'threads/thread.html', thread=thread,
|
|
posts=posts, page=page,
|
|
page_count=page_count, topic=topic,
|
|
started_by=started_by, topics=Topics.get_list(),
|
|
Reactions=Reactions, last_post=last_post
|
|
)
|
|
|
|
@bp.post('/<int:thread_id>/')
|
|
@login_required
|
|
def reply(thread_id):
|
|
user = get_active_user()
|
|
thread = Threads.find({'id': thread_id})
|
|
if not thread:
|
|
abort(404)
|
|
if not user.can_post_to_thread_or_topic(thread):
|
|
return redirect(url_for('.thread_by_id', thread_id=thread_id))
|
|
post = Posts.new(user.id, thread.id, request.form.get('babycode_content'))
|
|
subscription = Subscriptions.find({'user_id': user.id, 'thread_id': thread.id})
|
|
if get_form_checkbox('subscribe'):
|
|
if not subscription:
|
|
subscription = Subscriptions.create({
|
|
'user_id': user.id,
|
|
'thread_id': thread.id,
|
|
'last_seen': time_now(),
|
|
})
|
|
if subscription and subscription.last_seen < time_now():
|
|
subscription.update({'last_seen': time_now()})
|
|
return redirect(url_for('.thread_by_id', thread_id=thread_id, after=post.id, _anchor=f'post-{post.id}'))
|
|
|
|
@bp.get('/<int:thread_id>/edit/')
|
|
@login_required
|
|
@ownership_or_mod_required
|
|
def edit(thread_id):
|
|
thread = Threads.find({'id': thread_id})
|
|
if not thread:
|
|
abort(404)
|
|
return render_template('threads/edit.html', thread=thread)
|
|
|
|
@bp.post('/<int:thread_id>/edit/')
|
|
@login_required
|
|
@ownership_or_mod_required
|
|
def edit_post(thread_id):
|
|
thread = Threads.find({'id': thread_id})
|
|
if not thread:
|
|
abort(404)
|
|
new_title = request.form.get('title', '').strip()
|
|
if not new_title:
|
|
abort(400)
|
|
|
|
if new_title != thread.title:
|
|
from slugify import slugify
|
|
thread.update({'title': new_title, 'slug': slugify(new_title, max_length=50)})
|
|
|
|
return redirect(url_for('.thread_by_id', thread_id=thread_id))
|
|
|
|
@bp.post('/<int:thread_id>/subscribe/')
|
|
@login_required
|
|
def subscribe(thread_id):
|
|
thread = Threads.find({'id': thread_id})
|
|
if not thread:
|
|
abort(404)
|
|
|
|
user = get_active_user()
|
|
last_post_id = request.form.get('last_post_id', None)
|
|
if last_post_id is None:
|
|
abort(400)
|
|
|
|
if user.is_subscribed(thread_id):
|
|
return redirect(url_for('.thread_by_id', thread_id=thread_id, after=last_post_id))
|
|
|
|
Subscriptions.create({
|
|
'user_id': user.id,
|
|
'thread_id': thread_id,
|
|
'last_seen': request.form.get('last_post_timestamp', time_now())
|
|
})
|
|
|
|
return redirect(url_for('.thread_by_id', thread_id=thread_id, after=last_post_id))
|
|
|
|
@bp.post('/<int:thread_id>/unsubscribe/')
|
|
@login_required
|
|
def unsubscribe(thread_id):
|
|
thread = Threads.find({'id': thread_id})
|
|
if not thread:
|
|
abort(404)
|
|
|
|
user = get_active_user()
|
|
last_post_id = request.form.get('last_post_id', None)
|
|
|
|
return_to = request.form.get('return_to', None)
|
|
if return_to is None and last_post_id is None:
|
|
abort(400)
|
|
elif return_to is None and last_post_id is not None:
|
|
return_to = url_for('.thread_by_id', thread_id=thread_id, after=last_post_id)
|
|
|
|
subscription = Subscriptions.find({'user_id': user.id, 'thread_id': thread_id})
|
|
if not subscription:
|
|
return redirect(return_to)
|
|
|
|
subscription.delete()
|
|
return redirect(return_to)
|
|
|
|
@bp.get('/<int:thread_id>/feed.atom/')
|
|
def feed(thread_id):
|
|
return 'stub'
|
|
|
|
@bp.get('/new/')
|
|
@login_required
|
|
def new():
|
|
topics = Topics.select()
|
|
try:
|
|
selected_topic = int(request.args.get('topic_id'))
|
|
except ValueError, TypeError:
|
|
selected_topic = None
|
|
return render_template('threads/new_thread.html', topics=topics, selected_topic=selected_topic)
|
|
|
|
@bp.post('/new/')
|
|
@login_required
|
|
def new_post():
|
|
try:
|
|
topic_id = int(request.form.get('topic_id'))
|
|
except ValueError, TypeError:
|
|
abort(404)
|
|
topic_id = int(topic_id)
|
|
topic = Topics.find({'id': topic_id})
|
|
if not topic:
|
|
abort(404)
|
|
|
|
user = get_active_user()
|
|
if not user.can_post_to_thread_or_topic(topic):
|
|
abort(403)
|
|
|
|
title = request.form.get('title', '').strip()
|
|
if not title:
|
|
abort(400)
|
|
|
|
title = title.strip()
|
|
|
|
content = request.form.get('babycode_content')
|
|
|
|
thread = Threads.new(user.id, topic.id, title, content)
|
|
return redirect(url_for('.thread_by_id', thread_id=thread.id))
|