from flask import Flask, session, request, render_template, redirect, url_for from dotenv import load_dotenv from .models import Avatars, Users, PostHistory, Posts, MOTD, BadgeUploads, Sessions from .auth import digest, is_logged_in, get_active_user from .constants import ( PermissionLevel, permission_level_string, InfoboxKind, InfoboxHTMLClass, REACTION_EMOJI, MOTD_BANNED_TAGS, SIG_BANNED_TAGS, STRICT_BANNED_TAGS, ) from .lib.babycode import babycode_to_html, babycode_to_rssxml, EMOJI, BABYCODE_VERSION from .lib.exceptions import SiteNameMissingException from .util import get_post_url, dict_to_query_string, csrf_input, get_csrf_token from datetime import datetime, timezone from flask_caching import Cache import os import time import secrets import hmac import tomllib import json def create_default_avatar(): if Avatars.count() == 0: print('Creating default avatar reference') Avatars.create({ 'file_path': '/static/avatars/default.webp', 'uploaded_at': int(time.time()) }) def create_admin(): username = 'admin' if Users.count({'username': username}) == 0: print('!!!!!Creating admin account!!!!!') password_length = 16 password = secrets.token_urlsafe(password_length) hashed = digest(password) Users.create({ 'username': username, 'password_hash': hashed, 'permission': PermissionLevel.ADMIN.value, }) print(f"!!!!!Administrator account created, use '{username}' as the login and '{password}' as the password. This will only be shown once!!!!!") def create_deleted_user(): username = 'DeletedUser' if Users.count({'username': username.lower()}) == 0: print('Creating DeletedUser') Users.create({ 'username': username.lower(), 'display_name': username, 'password_hash': '', 'permission': PermissionLevel.SYSTEM.value, }) def reparse_babycode(): print('Re-parsing babycode, this may take a while...') from .db import db from .constants import MOTD_BANNED_TAGS post_histories_without_rss = PostHistory.findall([ ('markup_language', '=', 'babycode'), ('content_rss', 'IS', None), ]) with db.transaction(): for ph in post_histories_without_rss: ph.update({ 'content_rss': babycode_to_rssxml(ph['original_markup']), }) post_histories = PostHistory.findall([ ('markup_language', '=', 'babycode'), ('format_version', 'IS NOT', BABYCODE_VERSION) ]) if len(post_histories) > 0: print('Re-parsing user posts...') with db.transaction(): for ph in post_histories: ph.update({ 'content': babycode_to_html(ph['original_markup']).result, 'content_rss': babycode_to_rssxml(ph['original_markup']), 'format_version': BABYCODE_VERSION, }) print('Re-parsing posts done.') users_with_sigs = Users.findall([ ('signature_markup_language', '=', 'babycode'), ('signature_format_version', 'IS NOT', BABYCODE_VERSION), ('signature_original_markup', 'IS NOT', '') ]) if len(users_with_sigs) > 0: print('Re-parsing user sigs...') with db.transaction(): for user in users_with_sigs: user.update({ 'signature_rendered': babycode_to_html(user['signature_original_markup']).result, 'signature_format_version': BABYCODE_VERSION, }) print(f'Re-parsed {len(users_with_sigs)} user sigs.') stale_motds = MOTD.findall([ ['markup_language', '=', 'babycode'], ['format_version', 'IS NOT', BABYCODE_VERSION] ]) if stale_motds: print('Re-parsing MOTDs...') with db.transaction(): for motd in stale_motds: motd.update({ 'body_rendered': babycode_to_html(motd['body_original_markup'], banned_tags=MOTD_BANNED_TAGS).result, 'format_version': BABYCODE_VERSION, }) print('Re-parsing MOTDs done.') print('Re-parsing done.') def bind_default_badges(path): from .db import db with db.transaction(): potential_stales = BadgeUploads.get_default() d = os.listdir(path) for bu in potential_stales: if os.path.basename(bu.file_path) not in d: print(f'Deleted stale default badge{os.path.basename(bu.file_path)}') bu.delete() for f in d: real_path = os.path.join(path, f) if not os.path.isfile(real_path): continue if not f.endswith('.webp'): continue proxied_path = f'/static/badges/{f}' bu = BadgeUploads.find({'file_path': proxied_path}) if not bu: BadgeUploads.create({ 'file_path': proxied_path, 'uploaded_at': int(os.path.getmtime(real_path)), }) def clear_stale_sessions(): from .db import db with db.transaction(): now = int(time.time()) stale_sessions = Sessions.findall([ ('expires_at', '<', now) ]) for sess in stale_sessions: sess.delete() cache = Cache() def create_app(): app = Flask(__name__) app.config['SITE_NAME'] = 'Pyrom' app.config['DISABLE_SIGNUP'] = False app.config['MODS_CAN_INVITE'] = True app.config['USERS_CAN_INVITE'] = False app.config['ADMIN_CONTACT_INFO'] = '' app.config['GUIDE_DESCRIPTION'] = '' app.config['CACHE_TYPE'] = 'FileSystemCache' app.config['CACHE_DEFAULT_TIMEOUT'] = 300 try: app.config.from_file('../config/pyrom_config.toml', load=tomllib.load, text=False) except FileNotFoundError: print('No configuration file found, leaving defaults.') if os.getenv('PYROM_PROD') is None: app.static_folder = os.path.join(os.path.dirname(__file__), '../data/static') app.debug = True app.config['DB_PATH'] = 'data/db/db.dev.sqlite' app.config['SERVER_NAME'] = 'localhost:8080' load_dotenv() else: app.config['DB_PATH'] = 'data/db/db.prod.sqlite' if not app.config['SERVER_NAME']: raise SiteNameMissingException() app.config['SECRET_KEY'] = os.getenv('FLASK_SECRET_KEY') app.config['AVATAR_UPLOAD_PATH'] = 'data/static/avatars/' app.config['BADGES_PATH'] = 'data/static/badges/' app.config['BADGES_UPLOAD_PATH'] = 'data/static/badges/user/' app.config['MAX_CONTENT_LENGTH'] = 3 * 1000 * 1000 # 3M total, subject to further limits per route os.makedirs(os.path.dirname(app.config['DB_PATH']), exist_ok = True) os.makedirs(os.path.dirname(app.config['BADGES_UPLOAD_PATH']), exist_ok = True) if app.config['CACHE_TYPE'] == 'FileSystemCache': cache_dir = app.config.get('CACHE_DIR', 'data/_cached') os.makedirs(cache_dir, exist_ok = True) app.config['CACHE_DIR'] = cache_dir cache.init_app(app) from app.routes.app import bp as app_bp from app.routes.topics import bp as topics_bp from app.routes.threads import bp as threads_bp from app.routes.users import bp as users_bp from app.routes.guides import bp as guides_bp from app.routes.mod import bp as mod_bp from app.routes.posts import bp as posts_bp app.register_blueprint(app_bp) app.register_blueprint(topics_bp) app.register_blueprint(threads_bp) app.register_blueprint(users_bp) app.register_blueprint(guides_bp) app.register_blueprint(mod_bp) app.register_blueprint(posts_bp) with app.app_context(): from .schema import create as create_tables from .migrations import run_migrations create_tables() run_migrations() create_default_avatar() create_admin() create_deleted_user() clear_stale_sessions() reparse_babycode() bind_default_badges(app.config['BADGES_PATH']) app.config['SESSION_COOKIE_SECURE'] = True @app.before_request def revoke_session(): if is_logged_in(): sess = Sessions.find({'key': session['pyrom_session_key']}) if int(time.time()) > int(sess.expires_at): sess.delete() session.clear() return redirect(url_for('topics.all_topics')) @app.before_request def generate_csrf_token(): if is_logged_in() and not session.get('csrf'): rng = secrets.token_bytes(32) session_key = session['pyrom_session_key'] message = f'd${len(session_key)}${session_key}@{len(rng)}@{rng.hex()}' hashed = hmac.digest(app.config['SECRET_KEY'].encode('utf-8'), message.encode('utf-8'), 'SHA256') csrf_token = f'{hashed.hex()}.{rng.hex()}' session['csrf'] = csrf_token commit = '' with open('.git/refs/heads/main') as f: commit = f.read().strip() @app.context_processor def inject_constants(): return { 'InfoboxHTMLClass': InfoboxHTMLClass, 'InfoboxKind': InfoboxKind, 'PermissionLevel': PermissionLevel, '__commit': commit, '__emoji': EMOJI, 'REACTION_EMOJI': REACTION_EMOJI, 'MOTD_BANNED_TAGS': MOTD_BANNED_TAGS, 'SIG_BANNED_TAGS': SIG_BANNED_TAGS, } @app.context_processor def inject_funcs(): return { 'get_motds': MOTD.get_all, 'get_time_now': lambda: int(time.time()), 'is_logged_in': is_logged_in, 'is_mod': lambda: is_logged_in() and get_active_user().is_mod(), 'get_active_user': get_active_user, 'get_post_url': get_post_url, 'csrf_input': csrf_input, 'get_csrf_token': get_csrf_token, } @app.template_filter('ts_datetime') def ts_datetime(ts, format): return datetime.utcfromtimestamp(ts or int(time.time())).strftime(format) @app.template_filter('dict_to_query_string') def d2q(d): return dict_to_query_string(d) @app.template_filter('pluralize') def pluralize(subject, num=1, singular = '', plural = 's'): if int(num) == 1: return subject + singular return subject + plural @app.template_filter('permission_string') def permission_string(term): return permission_level_string(term) @app.template_filter('babycode') def babycode_filter(markup, nofrag=False): return babycode_to_html(markup, fragment=not nofrag).result @app.template_filter('babycode_strict') def babycode_strict_filter(markup, nofrag=False): return babycode_to_html(markup, banned_tags=STRICT_BANNED_TAGS, fragment=not nofrag).result @app.template_filter('basename_noext') def basename_noext(subj): return os.path.splitext(os.path.basename(subj))[0] @app.errorhandler(404) def _handle_404(e): if request.path.startswith('/hyperapi/'): return '