Files
pyrom/app/models.py

532 lines
17 KiB
Python

from .db import Model, db
from .constants import PermissionLevel
from flask import current_app
import time
class Users(Model):
    """Row of the ``users`` table with permission, profile and statistics helpers."""

    table = 'users'

    def get_avatar_url(self):
        """Return ``file_path`` of the ``Avatars`` row whose id is ``self.avatar_id``."""
        avatar = Avatars.find({'id': self.avatar_id})
        return avatar.file_path

    def is_default_avatar(self):
        """Return True when ``avatar_id`` is 1.

        This class used to define ``is_default_avatar`` twice; the earlier
        variant (which issued an ``Avatars`` lookup) was shadowed by this one
        and never ran, so only this definition is kept.
        """
        return self.avatar_id == 1

    def is_guest(self):
        """Return True when the permission level is exactly GUEST."""
        return self.permission == PermissionLevel.GUEST.value

    def is_mod(self):
        """Return True when the permission level is MODERATOR or numerically higher."""
        return self.permission >= PermissionLevel.MODERATOR.value

    def is_mod_only(self):
        """Return True when the permission level is exactly MODERATOR."""
        return self.permission == PermissionLevel.MODERATOR.value

    def is_admin(self):
        """Return True when the permission level is exactly ADMIN."""
        return self.permission == PermissionLevel.ADMIN.value

    def is_system(self):
        """Return True when the permission level is exactly SYSTEM."""
        return self.permission == PermissionLevel.SYSTEM.value

    def get_post_stats(self):
        """Fetch aggregate posting statistics for this user.

        The selected columns are ``post_count``, ``thread_count``,
        ``latest_thread_title``, ``latest_thread_slug``, ``inviter_username``
        and ``inviter_display_name``.

        Returns:
            The result of ``db.fetch_one`` for the statistics query.
        """
        # posts and threads are both LEFT JOINed onto the user, which
        # multiplies rows; the COUNT(DISTINCT ...) calls compensate for that.
        q = """SELECT
COUNT(DISTINCT posts.id) AS post_count,
COUNT(DISTINCT threads.id) AS thread_count,
MAX(threads.title) FILTER (WHERE threads.created_at = latest.created_at) AS latest_thread_title,
MAX(threads.slug) FILTER (WHERE threads.created_at = latest.created_at) AS latest_thread_slug,
inviter.username AS inviter_username,
inviter.display_name AS inviter_display_name
FROM users
LEFT JOIN posts ON posts.user_id = users.id
LEFT JOIN threads ON threads.user_id = users.id
LEFT JOIN (
SELECT user_id, MAX(created_at) AS created_at
FROM threads
GROUP BY user_id
) latest ON latest.user_id = users.id
LEFT JOIN users AS inviter ON inviter.id = users.invited_by
WHERE users.id = ?"""
        return db.fetch_one(q, self.id)

    def get_all_subscriptions(self):
        """Return ``thread_title``/``thread_slug`` rows for every thread this user subscribes to."""
        q = """
SELECT threads.title AS thread_title, threads.slug AS thread_slug
FROM
threads
JOIN
subscriptions ON subscriptions.thread_id = threads.id
WHERE
subscriptions.user_id = ?"""
        return db.query(q, self.id)

    def can_post_to_topic(self, topic):
        """Decide whether this user may post in ``topic``.

        Args:
            topic: Mapping that exposes an ``'is_locked'`` entry.

        Returns:
            False for guests, True for moderators and above, otherwise True
            only when the topic is not locked.
        """
        if self.is_guest():
            return False
        if self.is_mod():
            # Moderators bypass the lock check entirely.
            return True
        if topic['is_locked']:
            return False
        return True

    def can_invite(self):
        """Decide whether this user may create invites.

        Invites are only available while ``DISABLE_SIGNUP`` is set; after that
        ``MODS_CAN_INVITE`` grants the right to moderators and
        ``USERS_CAN_INVITE`` grants it to every non-guest.
        """
        config = current_app.config
        if not config['DISABLE_SIGNUP']:
            return False
        if config['MODS_CAN_INVITE'] and self.is_mod():
            return True
        if config['USERS_CAN_INVITE'] and not self.is_guest():
            return True
        return False

    def get_bookmark_collections(self):
        """Return this user's ``BookmarkCollections`` ordered by ``sort_order`` ascending."""
        q = 'SELECT id FROM bookmark_collections WHERE user_id = ? ORDER BY sort_order ASC'
        res = db.query(q, self.id)
        # One lookup per id keeps the objects as model instances.
        return [BookmarkCollections.find({'id': bc['id']}) for bc in res]

    def get_readable_name(self):
        """Return ``display_name`` when it is set (truthy), otherwise ``username``."""
        return self.display_name or self.username

    def has_display_name(self):
        """Return True when a non-empty display name is set.

        Uses truthiness so that a ``None`` display name counts as absent,
        matching the rule ``get_readable_name`` applies.
        """
        return bool(self.display_name)

    def get_badges(self):
        """Return the ``Badges`` rows whose ``user_id`` matches this user."""
        return Badges.findall({'user_id': int(self.id)})
class Topics(Model):
    """Row of the ``topics`` table plus listing and thread-query helpers."""

    table = 'topics'

    @classmethod
    def get_list(cls):
        """Return every topic with its thread count, post count and latest post time.

        Rows are ordered by ``topics.sort_order`` ascending and expose
        ``threads_count``, ``posts_count`` and ``latest_post_timestamp`` next
        to the topic's own columns.
        """
        q = """
SELECT
topics.id, topics.name, topics.slug, topics.description, topics.is_locked,
COUNT(DISTINCT threads.id) as threads_count,
COUNT(posts.id) AS posts_count,
MAX(posts.created_at) as latest_post_timestamp
FROM
topics
LEFT JOIN
threads ON threads.topic_id = topics.id
LEFT JOIN
posts ON posts.thread_id = threads.id
GROUP BY topics.id ORDER BY topics.sort_order ASC"""
        return db.query(q)

    # The return annotation is quoted: the name ``Topics`` is not bound yet
    # while the class body executes, so an unquoted annotation raises
    # NameError on interpreters that evaluate annotations eagerly.
    @classmethod
    def new(cls, name: str, description: str) -> 'Topics':
        """Create a topic and place it after all existing ones.

        Args:
            name: Topic name; surrounding whitespace is stripped.
            description: Topic description; surrounding whitespace is stripped.

        Returns:
            The result of ``Topics.create`` for the new row.
        """
        from slugify import slugify

        name = name.strip()
        description = description.strip()
        # The creation timestamp is appended to the slugified name.
        now = int(time.time())
        slug = f'{slugify(name)}-{now}'
        topic_count = Topics.count()
        return Topics.create({
            'name': name,
            'description': description,
            'slug': slug,
            'sort_order': topic_count + 1,
        })

    def get_threads(self, per_page, page, sort_by='activity'):
        """Return one page of this topic's threads with latest-post details.

        Args:
            per_page: Number of rows per page (bound to ``LIMIT``).
            page: 1-based page number; the offset is ``(page - 1) * per_page``.
            sort_by: ``'thread'`` orders by thread creation time; any other
                value orders by the creation time of the latest post.
                Stickied threads come first in both cases.
        """
        if sort_by == 'thread':
            order_clause = 'ORDER BY threads.is_stickied DESC, threads.created_at DESC'
        else:
            order_clause = 'ORDER BY threads.is_stickied DESC, latest_post_created_at DESC'
        # order_clause is always one of the two literals above, never caller
        # text, so concatenating it into the statement is safe.
        q = """
WITH latest_posts AS (
SELECT
thread_id,
id AS latest_post_id,
user_id AS latest_post_user_id,
created_at AS latest_post_created_at,
ROW_NUMBER() OVER (PARTITION BY thread_id ORDER BY created_at DESC) AS rn
FROM posts
),
post_counts AS (
SELECT
thread_id,
COUNT(*) AS posts_count
FROM posts
GROUP BY thread_id
)
SELECT
threads.id,
threads.title,
threads.slug,
threads.created_at,
threads.is_locked,
threads.is_stickied,
starter.username AS started_by,
starter.display_name AS started_by_display_name,
latest_poster.username AS latest_post_username,
latest_poster.display_name AS latest_post_display_name,
latest_posts.latest_post_created_at,
latest_posts.latest_post_id,
COALESCE(post_counts.posts_count, 0) AS posts_count
FROM threads
JOIN users AS starter ON starter.id = threads.user_id
LEFT JOIN latest_posts ON latest_posts.thread_id = threads.id AND latest_posts.rn = 1
LEFT JOIN users AS latest_poster ON latest_poster.id = latest_posts.latest_post_user_id
LEFT JOIN post_counts ON post_counts.thread_id = threads.id
WHERE threads.topic_id = ?
""" + order_clause + ' LIMIT ? OFFSET ?'
        return db.query(q, self.id, per_page, (page - 1) * per_page)

    def get_threads_with_op_rss(self):
        """Return this topic's threads with the RSS content of their first post.

        The first post is the one with the earliest ``created_at`` in each
        thread; rows are ordered by thread creation time, newest first.
        """
        # NOTE(review): the `users u` join selects no columns; as an inner
        # join it only drops threads whose first post has no matching user
        # row - confirm that this filtering is intended.
        q = """
SELECT
threads.id, threads.title, threads.slug, threads.created_at, threads.is_locked, threads.is_stickied,
users.username AS started_by,
users.display_name AS started_by_display_name,
ph.content_rss AS original_post_content,
posts.id AS original_post_id
FROM
threads
JOIN users ON users.id = threads.user_id
JOIN (
SELECT
posts.thread_id,
posts.id,
posts.user_id,
posts.created_at,
posts.current_revision_id,
ROW_NUMBER() OVER (PARTITION BY posts.thread_id ORDER BY posts.created_at ASC) AS rn
FROM
posts
) posts ON posts.thread_id = threads.id AND posts.rn = 1
JOIN
post_history ph ON ph.id = posts.current_revision_id
JOIN
users u ON u.id = posts.user_id
WHERE
threads.topic_id = ?
ORDER BY threads.created_at DESC"""
        return db.query(q, self.id)

    def locked(self):
        """Return ``is_locked`` coerced to ``bool``."""
        return bool(self.is_locked)
class Threads(Model):
    """Row of the ``threads`` table plus post-listing helpers."""

    table = 'threads'

    def get_posts(self, per_page, page):
        """Return one page of this thread's posts, oldest first.

        Args:
            per_page: Number of rows per page (bound to ``LIMIT``).
            page: 1-based page number; the offset is ``(page - 1) * per_page``.
        """
        q = Posts.FULL_POSTS_QUERY + ' WHERE posts.thread_id = ? ORDER BY posts.created_at ASC LIMIT ? OFFSET ?'
        return db.query(q, self.id, per_page, (page - 1) * per_page)

    def get_posts_rss(self):
        """Return every post of this thread; the query sets no explicit order."""
        q = Posts.FULL_POSTS_QUERY + ' WHERE posts.thread_id = ?'
        return db.query(q, self.id)

    def locked(self):
        """Return ``is_locked`` coerced to ``bool``."""
        return bool(self.is_locked)

    def stickied(self):
        """Return ``is_stickied`` coerced to ``bool``."""
        return bool(self.is_stickied)

    # The return annotation is quoted: the name ``Threads`` is not bound yet
    # while the class body executes, so an unquoted annotation raises
    # NameError on interpreters that evaluate annotations eagerly.
    @classmethod
    def new(cls, user_id: int, topic_id: int, title: str, content: str, language: str = 'babycode') -> 'Threads':
        """Create a thread together with its opening post.

        Args:
            user_id: Id of the author of the thread and of its first post.
            topic_id: Id of the topic the thread belongs to.
            title: Thread title; stored stripped of surrounding whitespace.
            content: Markup of the opening post, forwarded to ``Posts.new``.
            language: Markup language name forwarded to ``Posts.new``.

        Returns:
            The result of ``Threads.create`` for the new row.
        """
        from slugify import slugify

        # A single timestamp feeds both the slug suffix and ``created_at`` so
        # the two can never disagree across a second boundary.
        now = int(time.time())
        slug = f'{slugify(title)}-{now}'
        thread = Threads.create({
            'topic_id': topic_id,
            'user_id': user_id,
            'title': title.strip(),
            'slug': slug,
            'created_at': now,
        })
        # The opening post is created for its side effect only.
        Posts.new(user_id, thread.id, content, language)
        return thread
class Posts(Model):
    """Row of the ``posts`` table plus the shared "full post" query."""

    # Base SELECT joining a post with its current revision, author, avatar,
    # thread and the author's badges (as a JSON array). Callers append their
    # own WHERE / ORDER BY / LIMIT clauses.
    # NOTE(review): the ORDER BY inside the CTE runs after GROUP BY, so it
    # may not control element order inside json_group_array - confirm if
    # badge order matters to consumers.
    FULL_POSTS_QUERY = """
WITH user_badges AS (
SELECT
b.user_id,
json_group_array(
json_object(
'label', b.label,
'link', b.link,
'sort_order', b.sort_order,
'file_path', bu.file_path
)
) AS badges_json
FROM badges b
LEFT JOIN badge_uploads bu ON b.upload = bu.id
GROUP BY b.user_id
ORDER BY b.sort_order
)
SELECT
posts.id, posts.created_at,
post_history.content, post_history.edited_at, post_history.content_rss,
users.username, users.display_name, users.status,
avatars.file_path AS avatar_path, posts.thread_id,
users.id AS user_id, post_history.original_markup,
users.signature_rendered, threads.slug AS thread_slug,
threads.is_locked AS thread_is_locked, threads.title AS thread_title,
COALESCE(user_badges.badges_json, '[]') AS badges_json
FROM
posts
JOIN
post_history ON posts.current_revision_id = post_history.id
JOIN
users ON posts.user_id = users.id
JOIN
threads ON posts.thread_id = threads.id
LEFT JOIN
avatars ON users.avatar_id = avatars.id
LEFT JOIN
user_badges ON users.id = user_badges.user_id"""

    table = 'posts'

    def get_full_post_view(self):
        """Return the ``FULL_POSTS_QUERY`` row for this post."""
        q = f'{self.FULL_POSTS_QUERY} WHERE posts.id = ?'
        return db.fetch_one(q, self.id)

    # The return annotation is quoted: the name ``Posts`` is not bound yet
    # while the class body executes, so an unquoted annotation raises
    # NameError on interpreters that evaluate annotations eagerly.
    @classmethod
    def new(cls, user_id: int, thread_id: int, content: str, language: str = 'babycode') -> 'Posts':
        """Create a post, its initial revision and its mention rows.

        Args:
            user_id: Id of the post's author.
            thread_id: Id of the thread the post belongs to.
            content: Raw markup; stored as ``original_markup`` and rendered
                with the babycode HTML and RSS renderers.
            language: Stored as ``markup_language`` on the revision. Rendering
                always goes through the babycode functions regardless.

        Returns:
            The created post, with ``current_revision_id`` pointing at the
            initial revision.
        """
        from .lib.babycode import babycode_to_html, babycode_to_rssxml, BABYCODE_VERSION

        # Render before opening the transaction so it only wraps DB writes.
        html_content = babycode_to_html(content)
        rssxml_content = babycode_to_rssxml(content)
        with db.transaction():
            # The post is inserted first without a revision because the
            # revision row needs the post id.
            post = Posts.create({
                'thread_id': thread_id,
                'user_id': user_id,
                'current_revision_id': None,
            })
            revision = PostHistory.create({
                'post_id': post.id,
                'content': html_content.result,
                'content_rss': rssxml_content,
                'is_initial_revision': True,
                'original_markup': content,
                'markup_language': language,
                'format_version': BABYCODE_VERSION,
            })
            # NOTE(review): 'mentioned_iser_id' looks like a typo for
            # 'mentioned_user_id'. It is left untouched because the column
            # name and the renderer's mention keys are not visible here -
            # confirm against the mentions schema before renaming.
            for mention in html_content.mentions:
                Mentions.create({
                    'revision_id': revision.id,
                    'mentioned_iser_id': mention['mentioned_iser_id'],
                    'start_index': mention['start'],
                    'end_index': mention['end'],
                })
            post.update({'current_revision_id': revision.id})
        return post
class PostHistory(Model):
    """Model bound to the ``post_history`` table."""

    table = 'post_history'
class Sessions(Model):
    """Model bound to the ``sessions`` table."""

    table = 'sessions'
class Avatars(Model):
    """Model bound to the ``avatars`` table."""

    table = 'avatars'
class Subscriptions(Model):
    """Model bound to the ``subscriptions`` table."""

    table = 'subscriptions'

    def get_unread_count(self):
        """Count posts in the subscribed thread created after ``last_seen``.

        Returns:
            The ``unread_count`` value of the fetched row, or ``None`` when
            ``db.fetch_one`` yields nothing truthy.
        """
        sql = """SELECT COUNT(*) AS unread_count
FROM posts
LEFT JOIN subscriptions ON subscriptions.thread_id = posts.thread_id
WHERE subscriptions.user_id = ? AND posts.created_at > subscriptions.last_seen AND posts.thread_id = ?"""
        row = db.fetch_one(sql, self.user_id, self.thread_id)
        return row['unread_count'] if row else None
class APIRateLimits(Model):
    """Model bound to the ``api_rate_limits`` table."""

    table = 'api_rate_limits'

    @classmethod
    def is_allowed(cls, user_id, method, seconds):
        """Check a per-user, per-method cooldown and record the call when it passes.

        Args:
            user_id: Id of the calling user.
            method: Name of the rate-limited method.
            seconds: Minimum number of seconds required between two calls.

        Returns:
            True when no call is on record or the latest one is at least
            ``seconds`` old (the record is then replaced); False otherwise.
        """
        q = """
SELECT logged_at FROM api_rate_limits
WHERE user_id = ? AND method = ?
ORDER BY logged_at DESC LIMIT 1"""
        previous = db.fetch_one(q, user_id, method)
        cooled_down = (
            previous is None
            or int(time.time()) - int(previous['logged_at']) >= seconds
        )
        if not cooled_down:
            return False

        # Swap the old record for a fresh one; the INSERT supplies no
        # logged_at value of its own.
        with db.transaction():
            db.query(
                'DELETE FROM api_rate_limits WHERE user_id = ? AND method = ?',
                user_id, method
            )
            db.query(
                'INSERT INTO api_rate_limits (user_id, method) VALUES (?, ?)',
                user_id, method
            )
        return True
class Reactions(Model):
    """Model bound to the ``reactions`` table."""

    table = 'reactions'

    @classmethod
    def for_post(cls, post_id):
        """Return reaction texts with their counts (``c``) for one post.

        An empty list is returned when the builder yields a falsy result.
        """
        builder = db.QueryBuilder(cls.table)
        builder = builder.select('reaction_text, COUNT(*) as c')
        builder = builder.where({'post_id': post_id})
        builder = builder.group_by('reaction_text')
        builder = builder.order_by('c', False)
        return builder.all() or []

    @classmethod
    def get_users(cls, post_id, reaction_text):
        """Return ``user_id``/``username`` rows of users who left ``reaction_text`` on the post."""
        sql = """
SELECT user_id, username FROM reactions
JOIN
users ON users.id = user_id
WHERE
post_id = ? AND reaction_text = ?
"""
        return db.query(sql, post_id, reaction_text)
class PasswordResetLinks(Model):
    """Model bound to the ``password_reset_links`` table."""

    table = 'password_reset_links'
class InviteKeys(Model):
    """Model bound to the ``invite_keys`` table."""

    table = 'invite_keys'
class BookmarkCollections(Model):
    """Row of ``bookmark_collections`` with helpers for its bookmarked threads and posts."""

    table = 'bookmark_collections'

    @classmethod
    def create_default(cls, user_id):
        """Insert the default "Bookmarks" collection for ``user_id``.

        The row is created with ``is_default = 1`` and ``sort_order = 0``.
        Nothing is returned, as before.
        """
        # The name is a single-quoted SQL string literal. Double quotes
        # denote identifiers in SQL and only work as strings through a
        # deprecated SQLite compatibility fallback.
        q = """INSERT INTO bookmark_collections (user_id, name, is_default, sort_order)
VALUES (?, 'Bookmarks', 1, 0) RETURNING id
"""
        db.fetch_one(q, user_id)

    def has_posts(self):
        """Return True when at least one bookmarked post is in this collection."""
        q = 'SELECT EXISTS(SELECT 1 FROM bookmarked_posts WHERE collection_id = ?) as e'
        flag = db.fetch_one(q, self.id)['e']
        return int(flag) == 1

    def has_threads(self):
        """Return True when at least one bookmarked thread is in this collection."""
        q = 'SELECT EXISTS(SELECT 1 FROM bookmarked_threads WHERE collection_id = ?) as e'
        flag = db.fetch_one(q, self.id)['e']
        return int(flag) == 1

    def is_empty(self):
        """Return True when the collection holds neither posts nor threads."""
        return not (self.has_posts() or self.has_threads())

    def get_threads(self):
        """Return the ``BookmarkedThreads`` rows of this collection."""
        q = 'SELECT id FROM bookmarked_threads WHERE collection_id = ?'
        rows = db.query(q, self.id)
        return [BookmarkedThreads.find({'id': row['id']}) for row in rows]

    def get_posts(self):
        """Return the ``BookmarkedPosts`` rows of this collection."""
        q = 'SELECT id FROM bookmarked_posts WHERE collection_id = ?'
        rows = db.query(q, self.id)
        return [BookmarkedPosts.find({'id': row['id']}) for row in rows]

    def get_threads_count(self):
        """Return the number of bookmarked threads in this collection as an int."""
        q = 'SELECT COUNT(*) as tc FROM bookmarked_threads WHERE collection_id = ?'
        row = db.fetch_one(q, self.id)
        return int(row['tc'])

    def get_posts_count(self):
        """Return the number of bookmarked posts in this collection as an int."""
        q = 'SELECT COUNT(*) as pc FROM bookmarked_posts WHERE collection_id = ?'
        row = db.fetch_one(q, self.id)
        return int(row['pc'])

    def has_thread(self, thread_id):
        """Return True when ``thread_id`` (coerced to int) is bookmarked in this collection."""
        q = 'SELECT EXISTS(SELECT 1 FROM bookmarked_threads WHERE collection_id = ? AND thread_id = ?) as e'
        flag = db.fetch_one(q, self.id, int(thread_id))['e']
        return int(flag) == 1

    def has_post(self, post_id):
        """Return True when ``post_id`` (coerced to int) is bookmarked in this collection."""
        q = 'SELECT EXISTS(SELECT 1 FROM bookmarked_posts WHERE collection_id = ? AND post_id = ?) as e'
        flag = db.fetch_one(q, self.id, int(post_id))['e']
        return int(flag) == 1
class BookmarkedPosts(Model):
    """Model bound to the ``bookmarked_posts`` table."""

    table = 'bookmarked_posts'

    def get_post(self):
        """Look up the ``Posts`` row whose id equals ``self.post_id``."""
        lookup = {'id': self.post_id}
        return Posts.find(lookup)
class BookmarkedThreads(Model):
    """Model bound to the ``bookmarked_threads`` table."""

    table = 'bookmarked_threads'

    def get_thread(self):
        """Look up the ``Threads`` row whose id equals ``self.thread_id``."""
        lookup = {'id': self.thread_id}
        return Threads.find(lookup)
class MOTD(Model):
    """Model bound to the ``motd`` table."""

    table = 'motd'

    @classmethod
    def has_motd(cls):
        """Return True when the ``motd`` table contains at least one row."""
        row = db.fetch_one('SELECT EXISTS(SELECT 1 FROM motd) as e')
        return int(row['e']) == 1

    @classmethod
    def get_all(cls):
        """Return a ``MOTD`` lookup result for every id in the table."""
        entries = []
        for record in db.query('SELECT id FROM motd'):
            entries.append(MOTD.find({'id': record['id']}))
        return entries
class Mentions(Model):
    """Model bound to the ``mentions`` table."""

    table = 'mentions'
class BadgeUploads(Model):
    """Model bound to the ``badge_uploads`` table."""

    table = 'badge_uploads'

    @classmethod
    def get_default(cls):
        """Return the uploads whose ``user_id`` IS NULL."""
        unowned = {'user_id': None}
        return BadgeUploads.findall(unowned, 'IS')

    @classmethod
    def get_for_user(cls, user_id):
        """Return uploads owned by ``user_id`` plus the unowned ones, ordered by ``uploaded_at``."""
        sql = 'SELECT * FROM badge_uploads WHERE user_id = ? OR user_id IS NULL ORDER BY uploaded_at'
        uploads = []
        for record in db.query(sql, int(user_id)):
            uploads.append(cls.from_data(record))
        return uploads

    @classmethod
    def get_unused_for_user(cls, user_id):
        """Return uploads owned by ``user_id`` that no ``badges`` row references."""
        sql = 'SELECT bu.* FROM badge_uploads bu LEFT JOIN badges b ON bu.id = b.upload WHERE bu.user_id = ? AND b.upload IS NULL'
        uploads = []
        for record in db.query(sql, int(user_id)):
            uploads.append(cls.from_data(record))
        return uploads
class Badges(Model):
    """Model bound to the ``badges`` table."""

    table = 'badges'

    def get_image_url(self):
        """Return ``file_path`` of the ``BadgeUploads`` row referenced by ``self.upload``."""
        upload_id = int(self.upload)
        return BadgeUploads.find({'id': upload_id}).file_path