334 lines
11 KiB
Python
334 lines
11 KiB
Python
from flask import Flask, session, request, render_template
|
|
from dotenv import load_dotenv
|
|
from .models import Avatars, Users, PostHistory, Posts, MOTD, BadgeUploads, Sessions
|
|
from .auth import digest, is_logged_in, get_active_user
|
|
from .constants import (
|
|
PermissionLevel, permission_level_string,
|
|
InfoboxKind, InfoboxHTMLClass,
|
|
REACTION_EMOJI, MOTD_BANNED_TAGS,
|
|
SIG_BANNED_TAGS, STRICT_BANNED_TAGS,
|
|
)
|
|
from .lib.babycode import babycode_to_html, babycode_to_rssxml, EMOJI, BABYCODE_VERSION
|
|
from .lib.exceptions import SiteNameMissingException
|
|
from .util import get_post_url, dict_to_query_string
|
|
from datetime import datetime, timezone
|
|
from flask_caching import Cache
|
|
import os
|
|
import time
|
|
import secrets
|
|
import tomllib
|
|
import json
|
|
|
|
def create_default_avatar():
    """Seed the avatars table with the default avatar if the table is empty."""
    if Avatars.count() != 0:
        # At least one avatar row already exists; nothing to seed.
        return

    print('Creating default avatar reference')
    default_avatar = {
        'file_path': '/static/avatars/default.webp',
        'uploaded_at': int(time.time()),
    }
    Avatars.create(default_avatar)
|
|
|
|
def create_admin():
    """Create the 'admin' account with a random password if it does not exist.

    The plain-text password is printed to stdout exactly once, right after the
    account row is created; only its digest is stored.
    """
    username = 'admin'
    if Users.count({'username': username}) != 0:
        return

    print('!!!!!Creating admin account!!!!!')
    # token_urlsafe takes a number of random *bytes*; the resulting text is
    # longer than 16 characters.
    password = secrets.token_urlsafe(16)
    admin_row = {
        'username': username,
        'password_hash': digest(password),
        'permission': PermissionLevel.ADMIN.value,
    }
    Users.create(admin_row)
    print(f"!!!!!Administrator account created, use '{username}' as the login and '{password}' as the password. This will only be shown once!!!!!")
|
|
|
|
def create_deleted_user():
    """Create the 'DeletedUser' system account if it does not exist.

    The row is stored under the lowercased username, keeps the mixed-case
    spelling as its display name, has an empty password hash and SYSTEM
    permission.
    """
    display_name = 'DeletedUser'
    handle = display_name.lower()
    if Users.count({'username': handle}) != 0:
        return

    print('Creating DeletedUser')
    Users.create({
        'username': handle,
        'display_name': display_name,
        'password_hash': '',
        'permission': PermissionLevel.SYSTEM.value,
    })
|
|
|
|
def reparse_babycode():
    """Re-render stored babycode whose rendered form is missing or outdated.

    Runs four passes, each in its own database transaction:

    1. post histories that have no RSS rendering yet get one;
    2. post histories rendered with a different BABYCODE_VERSION get their
       HTML and RSS renderings rebuilt;
    3. user signatures rendered with a different BABYCODE_VERSION (and with
       non-empty markup) are re-rendered;
    4. MOTDs rendered with a different BABYCODE_VERSION are re-rendered with
       MOTD_BANNED_TAGS applied.
    """
    print('Re-parsing babycode, this may take a while...')
    from .db import db
    from .constants import MOTD_BANNED_TAGS

    # Pass 1: backfill the RSS rendering where it is missing.
    missing_rss = PostHistory.findall([
        ('markup_language', '=', 'babycode'),
        ('content_rss', 'IS', None),
    ])
    with db.transaction():
        for history in missing_rss:
            rss = babycode_to_rssxml(history['original_markup'])
            history.update({'content_rss': rss})

    # Pass 2: posts rendered by another babycode version.
    outdated_posts = PostHistory.findall([
        ('markup_language', '=', 'babycode'),
        ('format_version', 'IS NOT', BABYCODE_VERSION),
    ])
    if len(outdated_posts) > 0:
        print('Re-parsing user posts...')
        with db.transaction():
            for history in outdated_posts:
                markup = history['original_markup']
                html = babycode_to_html(markup).result
                rss = babycode_to_rssxml(markup)
                history.update({
                    'content': html,
                    'content_rss': rss,
                    'format_version': BABYCODE_VERSION,
                })
        print('Re-parsing posts done.')

    # Pass 3: user signatures rendered by another babycode version.
    outdated_sig_users = Users.findall([
        ('signature_markup_language', '=', 'babycode'),
        ('signature_format_version', 'IS NOT', BABYCODE_VERSION),
        ('signature_original_markup', 'IS NOT', ''),
    ])
    if len(outdated_sig_users) > 0:
        print('Re-parsing user sigs...')
        with db.transaction():
            for sig_owner in outdated_sig_users:
                rendered_sig = babycode_to_html(sig_owner['signature_original_markup']).result
                sig_owner.update({
                    'signature_rendered': rendered_sig,
                    'signature_format_version': BABYCODE_VERSION,
                })
        print(f'Re-parsed {len(outdated_sig_users)} user sigs.')

    # Pass 4: MOTDs rendered by another babycode version.
    outdated_motds = MOTD.findall([
        ['markup_language', '=', 'babycode'],
        ['format_version', 'IS NOT', BABYCODE_VERSION],
    ])
    if outdated_motds:
        print('Re-parsing MOTDs...')
        with db.transaction():
            for message in outdated_motds:
                rendered_body = babycode_to_html(
                    message['body_original_markup'],
                    banned_tags=MOTD_BANNED_TAGS,
                ).result
                message.update({
                    'body_rendered': rendered_body,
                    'format_version': BABYCODE_VERSION,
                })
        print('Re-parsing MOTDs done.')

    print('Re-parsing done.')
|
|
|
|
def bind_default_badges(path):
    """Synchronise the default badge records with the files found in *path*.

    Inside a single database transaction:

    * every record returned by ``BadgeUploads.get_default()`` whose file name
      is no longer present in *path* is deleted;
    * every regular ``.webp`` file directly inside *path* that has no record
      yet gets one, keyed by its ``/static/badges/<name>`` URL path and stamped
      with the file's modification time.

    Args:
        path: Directory that holds the default badge images.

    Raises:
        OSError: If *path* cannot be listed (propagated from ``os.listdir``).
    """
    from .db import db

    with db.transaction():
        potential_stales = BadgeUploads.get_default()
        entries = os.listdir(path)
        # Build the lookup once; the list would be scanned for every record.
        present = set(entries)

        for bu in potential_stales:
            badge_name = os.path.basename(bu.file_path)
            if badge_name not in present:
                bu.delete()
                # Report only after the row is actually gone.
                print(f'Deleted stale default badge {badge_name}')

        for f in entries:
            real_path = os.path.join(path, f)
            # Skip sub-directories and anything that is not a .webp image.
            if not os.path.isfile(real_path):
                continue
            if not f.endswith('.webp'):
                continue

            proxied_path = f'/static/badges/{f}'
            if not BadgeUploads.find({'file_path': proxied_path}):
                BadgeUploads.create({
                    'file_path': proxied_path,
                    'uploaded_at': int(os.path.getmtime(real_path)),
                })
|
|
|
|
def clear_stale_sessions():
    """Delete every session whose ``expires_at`` lies before the current time."""
    from .db import db

    with db.transaction():
        cutoff = int(time.time())
        expired = Sessions.findall([('expires_at', '<', cutoff)])
        for expired_session in expired:
            expired_session.delete()
|
|
|
|
|
|
# Module-level flask_caching extension instance. It is created unbound here
# and attached to the application inside create_app() via cache.init_app(app).
cache = Cache()
|
|
|
|
def create_app():
    """Build, configure and bootstrap the Flask application.

    Steps, in order:

    1. set configuration defaults, then overlay ``pyrom_config.toml`` if found;
    2. pick development or production settings depending on whether the
       ``PYROM_PROD`` environment variable is set;
    3. create the database, badge-upload and cache directories;
    4. initialise the cache extension and register the route blueprints;
    5. inside an application context: create tables, run migrations, seed
       default rows, purge expired sessions, re-render outdated babycode and
       sync the default badges;
    6. register request hooks, template context processors, template filters
       and the 404 handler.

    Returns:
        The configured ``Flask`` application.

    Raises:
        SiteNameMissingException: In production mode when ``SERVER_NAME`` is
            not configured.
    """
    app = Flask(__name__)

    # Defaults; each of these may be overridden by the TOML file loaded below.
    app.config['SITE_NAME'] = 'Pyrom'
    app.config['DISABLE_SIGNUP'] = False
    app.config['MODS_CAN_INVITE'] = True
    app.config['USERS_CAN_INVITE'] = False
    app.config['ADMIN_CONTACT_INFO'] = ''
    app.config['GUIDE_DESCRIPTION'] = ''

    app.config['CACHE_TYPE'] = 'FileSystemCache'
    app.config['CACHE_DEFAULT_TIMEOUT'] = 300

    try:
        # tomllib.load needs a binary file handle, hence text=False.
        app.config.from_file('../config/pyrom_config.toml', load=tomllib.load, text=False)
    except FileNotFoundError:
        print('No configuration file found, leaving defaults.')

    if os.getenv('PYROM_PROD') is None:
        # Development mode.
        app.static_folder = os.path.join(os.path.dirname(__file__), '../data/static')
        app.debug = True
        app.config['DB_PATH'] = 'data/db/db.dev.sqlite'
        app.config['SERVER_NAME'] = 'localhost:8080'
        load_dotenv()
    else:
        # Production mode: SERVER_NAME has to come from the config file.
        app.config['DB_PATH'] = 'data/db/db.prod.sqlite'
        if not app.config['SERVER_NAME']:
            raise SiteNameMissingException()

    # Read after load_dotenv() so a development .env file can provide it.
    app.config['SECRET_KEY'] = os.getenv('FLASK_SECRET_KEY')

    app.config['AVATAR_UPLOAD_PATH'] = 'data/static/avatars/'
    app.config['BADGES_PATH'] = 'data/static/badges/'
    app.config['BADGES_UPLOAD_PATH'] = 'data/static/badges/user/'
    app.config['MAX_CONTENT_LENGTH'] = 3 * 1000 * 1000  # 3M total, subject to further limits per route

    os.makedirs(os.path.dirname(app.config['DB_PATH']), exist_ok=True)
    os.makedirs(os.path.dirname(app.config['BADGES_UPLOAD_PATH']), exist_ok=True)

    if app.config['CACHE_TYPE'] == 'FileSystemCache':
        cache_dir = app.config.get('CACHE_DIR', 'data/_cached')
        os.makedirs(cache_dir, exist_ok=True)
        app.config['CACHE_DIR'] = cache_dir

    cache.init_app(app)

    from app.routes.app import bp as app_bp
    from app.routes.topics import bp as topics_bp
    from app.routes.threads import bp as threads_bp
    from app.routes.users import bp as users_bp
    from app.routes.guides import bp as guides_bp
    from app.routes.mod import bp as mod_bp
    app.register_blueprint(app_bp)
    app.register_blueprint(topics_bp)
    app.register_blueprint(threads_bp)
    app.register_blueprint(users_bp)
    app.register_blueprint(guides_bp)
    app.register_blueprint(mod_bp)

    with app.app_context():
        from .schema import create as create_tables
        from .migrations import run_migrations
        create_tables()
        run_migrations()

        create_default_avatar()
        create_admin()
        create_deleted_user()

        clear_stale_sessions()

        reparse_babycode()

        bind_default_badges(app.config['BADGES_PATH'])

    app.config['SESSION_COOKIE_SECURE'] = True

    @app.before_request
    def make_session_permanent():
        """Mark the session as permanent on every request."""
        session.permanent = True

    # Commit hash exposed to templates as `__commit`. The ref file is absent
    # when running outside a git checkout, on a different branch, or when the
    # ref has been packed; fall back to an empty string instead of failing
    # application start-up.
    commit = ''
    try:
        with open('.git/refs/heads/main') as f:
            commit = f.read().strip()
    except OSError:
        print('Could not read the current git commit, leaving it blank.')

    @app.context_processor
    def inject_constants():
        """Expose constants to every template."""
        return {
            'InfoboxHTMLClass': InfoboxHTMLClass,
            'InfoboxKind': InfoboxKind,
            'PermissionLevel': PermissionLevel,
            '__commit': commit,
            '__emoji': EMOJI,
            'REACTION_EMOJI': REACTION_EMOJI,
            'MOTD_BANNED_TAGS': MOTD_BANNED_TAGS,
            'SIG_BANNED_TAGS': SIG_BANNED_TAGS,
        }

    @app.context_processor
    def inject_funcs():
        """Expose helper callables to every template."""
        return {
            'get_motds': MOTD.get_all,
            'get_time_now': lambda: int(time.time()),
            'is_logged_in': is_logged_in,
            'is_mod': lambda: is_logged_in() and get_active_user().is_mod(),
            'get_active_user': get_active_user,
            'get_post_url': get_post_url,
        }

    @app.template_filter('ts_datetime')
    def ts_datetime(ts, format):
        """Format a Unix timestamp as UTC; a falsy *ts* means "now"."""
        # datetime.utcfromtimestamp() is deprecated since Python 3.12. Convert
        # through an aware UTC datetime and drop tzinfo so strftime output
        # (including %z / %Z) stays exactly what the naive value produced.
        moment = datetime.fromtimestamp(ts or int(time.time()), timezone.utc)
        return moment.replace(tzinfo=None).strftime(format)

    @app.template_filter('dict_to_query_string')
    def d2q(d):
        """Template-filter wrapper around dict_to_query_string."""
        return dict_to_query_string(d)

    @app.template_filter('pluralize')
    def pluralize(subject, num=1, singular = '', plural = 's'):
        """Append *singular* to *subject* when int(num) == 1, else *plural*."""
        if int(num) == 1:
            return subject + singular

        return subject + plural

    @app.template_filter('permission_string')
    def permission_string(term):
        """Template-filter wrapper around permission_level_string."""
        return permission_level_string(term)

    @app.template_filter('babycode')
    def babycode_filter(markup, nofrag=False):
        """Render babycode markup to HTML."""
        return babycode_to_html(markup, fragment=not nofrag).result

    @app.template_filter('babycode_strict')
    def babycode_strict_filter(markup, nofrag=False):
        """Render babycode markup to HTML with STRICT_BANNED_TAGS banned."""
        return babycode_to_html(markup, banned_tags=STRICT_BANNED_TAGS, fragment=not nofrag).result

    @app.template_filter('basename_noext')
    def basename_noext(subj):
        """Return the final path component of *subj* without its extension."""
        return os.path.splitext(os.path.basename(subj))[0]

    @app.errorhandler(404)
    def _handle_404(e):
        """Answer 404s with bare HTML, JSON or the full page, by path prefix."""
        if request.path.startswith('/hyperapi/'):
            return '<h1>not found</h1>', e.code
        elif request.path.startswith('/api/'):
            return {'error': 'not found'}, e.code
        else:
            return render_template('common/404.html'), e.code
    #
    # @app.errorhandler(413)
    # def _handle_413(e):
    #     if request.path.startswith('/hyperapi/'):
    #         return '<h1>request body too large</h1>', e.code
    #     elif request.path.startswith('/api/'):
    #         return {'error': 'body too large'}, e.code
    #     else:
    #         return render_template('common/413.html'), e.code

    # this only happens at build time but
    # build time is when updates are done anyway
    # sooo... /shrug
    @app.template_filter('cachebust')
    def cachebust(subject):
        """Append a ``?v=<current unix time>`` query string to *subject*."""
        return f'{subject}?v={str(int(time.time()))}'

    @app.template_filter('theme_name')
    def get_theme_name(subject: str):
        """Turn a stylesheet name into a display name for the theme."""
        if subject == 'style':
            return 'Default'

        # Built outside the f-string: reusing the outer quote character inside
        # a replacement field only parses on Python 3.12 and newer.
        pretty_name = subject.removeprefix('theme-').replace('-', ' ').capitalize()
        return f'{pretty_name} (beta)'

    @app.template_filter('fromjson')
    def fromjson(subject: str):
        """Parse a JSON string."""
        return json.loads(subject)

    @app.template_filter('iso8601')
    def unix_to_iso8601(subject: str):
        """Convert a Unix timestamp to an ISO 8601 string in UTC."""
        return datetime.fromtimestamp(int(subject), timezone.utc).isoformat()

    return app
|