from flask import Flask, session, request, render_template
from dotenv import load_dotenv
from .models import Avatars, Users, PostHistory, Posts, MOTD, BadgeUploads
from .auth import digest
from .routes.users import is_logged_in, get_active_user, get_prefers_theme
from .routes.threads import get_post_url
from .constants import (
    PermissionLevel, permission_level_string,
    InfoboxKind, InfoboxHTMLClass,
    REACTION_EMOJI, MOTD_BANNED_TAGS,
    SIG_BANNED_TAGS, STRICT_BANNED_TAGS,
)
from .lib.babycode import babycode_to_html, EMOJI, BABYCODE_VERSION
from datetime import datetime, timezone
import os
import re
import time
import secrets
import tomllib
import json

# Matches an <h2> element carrying an id attribute; group 1 is the id and
# group 2 is the inner content of the heading.
# NOTE(review): the opening part of this pattern was reconstructed so that it
# yields the two groups extract_h2 unpacks -- confirm it matches the markup
# the renderer actually emits for headings.
_H2_WITH_ID_RE = re.compile(
    r'<h2\s+id="([^"]*)"[^>]*>(.*?)<\/h2>',
    re.IGNORECASE | re.DOTALL,
)


def create_default_avatar():
    """Create the default avatar record if no avatar records exist yet."""
    if Avatars.count() == 0:
        print("Creating default avatar reference")
        Avatars.create({
            "file_path": "/static/avatars/default.webp",
            "uploaded_at": int(time.time()),
        })


def create_admin():
    """Create the 'admin' account with a random password if it is missing.

    The generated password is printed to stdout once and only its digest is
    stored.
    """
    username = "admin"
    if Users.count({"username": username}) != 0:
        return

    print("!!!!!Creating admin account!!!!!")
    # token_urlsafe() takes a number of random *bytes*; the resulting text is
    # longer than this number.
    password_length = 16
    password = secrets.token_urlsafe(password_length)
    hashed = digest(password)
    Users.create({
        "username": username,
        "password_hash": hashed,
        "permission": PermissionLevel.ADMIN.value,
    })
    print(
        f"!!!!!Administrator account created, use '{username}' as the login "
        f"and '{password}' as the password. This will only be shown once!!!!!"
    )


def create_deleted_user():
    """Create the 'DeletedUser' system account if it is missing.

    The account is stored under the lowercased username with an empty
    password hash and SYSTEM permission.
    """
    username = "DeletedUser"
    if Users.count({"username": username.lower()}) == 0:
        print("Creating DeletedUser")
        Users.create({
            "username": username.lower(),
            "display_name": username,
            "password_hash": "",
            "permission": PermissionLevel.SYSTEM.value,
        })


def reparse_babycode():
    """Re-render stored babycode whose format version is not the current one.

    Covers post histories, user signatures and MOTDs. Each group is updated
    inside its own database transaction and stamped with BABYCODE_VERSION.
    """
    print('Re-parsing babycode, this may take a while...')
    from .db import db

    post_histories = PostHistory.findall([
        ('markup_language', '=', 'babycode'),
        ('format_version', 'IS NOT', BABYCODE_VERSION)
    ])
    if post_histories:
        print('Re-parsing user posts...')
        with db.transaction():
            for ph in post_histories:
                ph.update({
                    'content': babycode_to_html(ph['original_markup']).result,
                    'format_version': BABYCODE_VERSION,
                })
        print('Re-parsing posts done.')

    users_with_sigs = Users.findall([
        ('signature_markup_language', '=', 'babycode'),
        ('signature_format_version', 'IS NOT', BABYCODE_VERSION),
        ('signature_original_markup', 'IS NOT', '')
    ])
    if users_with_sigs:
        print('Re-parsing user sigs...')
        with db.transaction():
            for user in users_with_sigs:
                user.update({
                    'signature_rendered': babycode_to_html(user['signature_original_markup']).result,
                    'signature_format_version': BABYCODE_VERSION,
                })
        print(f'Re-parsed {len(users_with_sigs)} user sigs.')

    stale_motds = MOTD.findall([
        ['markup_language', '=', 'babycode'],
        ['format_version', 'IS NOT', BABYCODE_VERSION]
    ])
    if stale_motds:
        print('Re-parsing MOTDs...')
        with db.transaction():
            for motd in stale_motds:
                motd.update({
                    'body_rendered': babycode_to_html(
                        motd['body_original_markup'],
                        banned_tags=MOTD_BANNED_TAGS,
                    ).result,
                    'format_version': BABYCODE_VERSION,
                })
        print('Re-parsing MOTDs done.')

    print('Re-parsing done.')


def bind_default_badges(path):
    """Synchronise default badge records with the .webp files in *path*.

    Default badge records whose file name is no longer present in *path* are
    deleted, and a record is created for every regular ``.webp`` file in
    *path* that does not have one yet. Everything runs in one transaction.

    Args:
        path: Directory holding the default badge images.
    """
    from .db import db

    with db.transaction():
        potential_stales = BadgeUploads.get_default()
        entries = os.listdir(path)
        present = set(entries)  # O(1) membership checks below

        for bu in potential_stales:
            badge_file = os.path.basename(bu.file_path)
            if badge_file not in present:
                print(f'Deleted stale default badge {badge_file}')
                bu.delete()

        for entry in entries:
            real_path = os.path.join(path, entry)
            if not os.path.isfile(real_path):
                continue
            if not entry.endswith('.webp'):
                continue

            # Records store the URL path the file is served under, not the
            # filesystem path.
            proxied_path = f'/static/badges/{entry}'
            if not BadgeUploads.find({'file_path': proxied_path}):
                BadgeUploads.create({
                    'file_path': proxied_path,
                    'uploaded_at': int(os.path.getmtime(real_path)),
                })


def _discover_themes(css_dir):
    """Return theme names (file names without extension) found in *css_dir*.

    Only regular files are considered. 'style' sorts first, the rest follow
    alphabetically.
    """
    themes = [
        os.path.splitext(entry)[0]
        for entry in os.listdir(css_dir)
        if os.path.isfile(os.path.join(css_dir, entry))
    ]
    themes.sort(key=lambda name: (name != 'style', name))
    return themes


def _read_commit_hash(ref_path='.git/refs/heads/main'):
    """Return the stripped contents of *ref_path*, or '' if it is unreadable."""
    try:
        with open(ref_path) as ref_file:
            return ref_file.read().strip()
    except OSError:
        # No checkout metadata available (or the ref is not stored as a loose
        # file); the commit marker is informational only.
        return ""


def create_app():
    """Build and configure the Flask application.

    Loads configuration defaults and the optional TOML config file, prepares
    data directories, creates/migrates the database and seeds required rows,
    registers all blueprints, and installs template context processors,
    template filters and error handlers.

    Returns:
        The configured Flask application.
    """
    app = Flask(__name__)

    app.config['SITE_NAME'] = 'Pyrom'
    app.config['DISABLE_SIGNUP'] = False
    app.config['MODS_CAN_INVITE'] = True
    app.config['USERS_CAN_INVITE'] = False
    app.config['ADMIN_CONTACT_INFO'] = ''
    app.config['GUIDE_DESCRIPTION'] = ''

    try:
        app.config.from_file('../config/pyrom_config.toml', load=tomllib.load, text=False)
    except FileNotFoundError:
        print('No configuration file found, leaving defaults.')

    if os.getenv("PYROM_PROD") is None:
        app.static_folder = os.path.join(os.path.dirname(__file__), "../data/static")
        app.debug = True
        app.config["DB_PATH"] = "data/db/db.dev.sqlite"
        # Must run before FLASK_SECRET_KEY is read below.
        load_dotenv()
    else:
        app.config["DB_PATH"] = "data/db/db.prod.sqlite"

    app.config["SECRET_KEY"] = os.getenv("FLASK_SECRET_KEY")

    app.config['AVATAR_UPLOAD_PATH'] = 'data/static/avatars/'
    app.config['BADGES_PATH'] = 'data/static/badges/'
    app.config['BADGES_UPLOAD_PATH'] = 'data/static/badges/user/'
    app.config['MAX_CONTENT_LENGTH'] = 3 * 1000 * 1000  # 3M total, subject to further limits per route

    os.makedirs(os.path.dirname(app.config["DB_PATH"]), exist_ok=True)
    os.makedirs(os.path.dirname(app.config["BADGES_UPLOAD_PATH"]), exist_ok=True)

    app.config['allowed_themes'] = _discover_themes('data/static/css/')

    with app.app_context():
        from .schema import create as create_tables
        from .migrations import run_migrations
        create_tables()
        run_migrations()

        create_default_avatar()
        create_admin()
        create_deleted_user()

        reparse_babycode()

        bind_default_badges(app.config['BADGES_PATH'])

        from app.routes.app import bp as app_bp
        from app.routes.topics import bp as topics_bp
        from app.routes.threads import bp as threads_bp
        from app.routes.users import bp as users_bp
        from app.routes.mod import bp as mod_bp
        from app.routes.api import bp as api_bp
        from app.routes.posts import bp as posts_bp
        from app.routes.hyperapi import bp as hyperapi_bp
        from app.routes.guides import bp as guides_bp
        app.register_blueprint(app_bp)
        app.register_blueprint(topics_bp)
        app.register_blueprint(threads_bp)
        app.register_blueprint(users_bp)
        app.register_blueprint(mod_bp)
        app.register_blueprint(api_bp)
        app.register_blueprint(posts_bp)
        app.register_blueprint(hyperapi_bp)
        app.register_blueprint(guides_bp)

    app.config['SESSION_COOKIE_SECURE'] = True

    @app.before_request
    def make_session_permanent():
        session.permanent = True

    commit = _read_commit_hash()

    @app.context_processor
    def inject_constants():
        """Expose shared constants to every template."""
        return {
            "InfoboxHTMLClass": InfoboxHTMLClass,
            "InfoboxKind": InfoboxKind,
            "PermissionLevel": PermissionLevel,
            "__commit": commit,
            "__emoji": EMOJI,
            "REACTION_EMOJI": REACTION_EMOJI,
            "MOTD_BANNED_TAGS": MOTD_BANNED_TAGS,
            "SIG_BANNED_TAGS": SIG_BANNED_TAGS,
        }

    @app.context_processor
    def inject_auth():
        """Expose login helpers and the current user to every template."""
        return {
            "is_logged_in": is_logged_in,
            "get_active_user": get_active_user,
            "active_user": get_active_user(),
        }

    @app.context_processor
    def inject_funcs():
        """Expose helper callables to every template."""
        return {
            'get_post_url': get_post_url,
            'get_prefers_theme': get_prefers_theme,
            'get_motds': MOTD.get_all,
        }

    @app.template_filter("ts_datetime")
    def ts_datetime(ts, format):
        """Format a Unix timestamp as UTC using strftime *format*.

        A falsy *ts* (None, 0) is replaced by the current time.
        """
        moment = datetime.fromtimestamp(ts or int(time.time()), tz=timezone.utc)
        # Drop tzinfo so %z / %Z keep rendering as empty strings, exactly as
        # they did with the naive value from the deprecated utcfromtimestamp().
        return moment.replace(tzinfo=None).strftime(format)

    @app.template_filter("pluralize")
    def pluralize(subject, num=1, singular = "", plural = "s"):
        """Append *singular* to *subject* when num == 1, otherwise *plural*."""
        suffix = singular if int(num) == 1 else plural
        return subject + suffix

    @app.template_filter("permission_string")
    def permission_string(term):
        return permission_level_string(term)

    @app.template_filter('babycode')
    def babycode_filter(markup):
        return babycode_to_html(markup).result

    @app.template_filter('babycode_strict')
    def babycode_strict_filter(markup):
        return babycode_to_html(markup, STRICT_BANNED_TAGS).result

    @app.template_filter('extract_h2')
    def extract_h2(content):
        """Return [{'id': ..., 'text': ...}] for each <h2 id=...> in *content*."""
        return [
            {'id': id_.strip(), 'text': text.strip()}
            for id_, text in _H2_WITH_ID_RE.findall(content)
        ]

    @app.template_filter('basename_noext')
    def basename_noext(subj):
        return os.path.splitext(os.path.basename(subj))[0]

    # NOTE(review): the markup wrapping the two /hyperapi/ messages below was
    # reconstructed as <h1> elements -- confirm against the fragments the
    # front end expects.
    @app.errorhandler(404)
    def _handle_404(e):
        """Answer 404s in the format matching the requested URL prefix."""
        if request.path.startswith('/hyperapi/'):
            return '<h1>not found</h1>', e.code
        elif request.path.startswith('/api/'):
            return {'error': 'not found'}, e.code
        else:
            return render_template('common/404.html'), e.code

    @app.errorhandler(413)
    def _handle_413(e):
        """Answer 413s in the format matching the requested URL prefix."""
        if request.path.startswith('/hyperapi/'):
            return '<h1>request body too large</h1>', e.code
        elif request.path.startswith('/api/'):
            return {'error': 'body too large'}, e.code
        else:
            return render_template('common/413.html'), e.code

    # this only happens at build time but
    # build time is when updates are done anyway
    # sooo... /shrug
    @app.template_filter('cachebust')
    def cachebust(subject):
        return f"{subject}?v={int(time.time())}"

    @app.template_filter('theme_name')
    def get_theme_name(subject: str):
        """Turn a theme file stem into a display name."""
        if subject == 'style':
            return 'Default'
        # Built outside the f-string: reusing the same quote type inside a
        # replacement field only parses on Python 3.12+.
        pretty = subject.removeprefix('theme-').replace('-', ' ').capitalize()
        return f'{pretty} (beta)'

    @app.template_filter('fromjson')
    def fromjson(subject: str):
        return json.loads(subject)

    return app