Files
pyrom/app/migrations.py
2025-12-02 06:13:50 +03:00

77 lines
2.9 KiB
Python

from .db import db
def migrate_old_avatars():
for avatar in db.query('SELECT id, file_path FROM avatars WHERE file_path LIKE "/avatars/%"'):
new_path = f"/static{avatar['file_path']}"
db.execute('UPDATE avatars SET file_path = ? WHERE id = ?', new_path, avatar['id'])
def add_signature_format():
db.execute('ALTER TABLE "users" ADD COLUMN "signature_markup_language" TEXT NOT NULL DEFAULT "babycode"')
db.execute('ALTER TABLE "users" ADD COLUMN "signature_format_version" INTEGER DEFAULT NULL')
def create_default_bookmark_collections():
from .constants import PermissionLevel
q = """SELECT users.id FROM users
LEFT JOIN bookmark_collections bc ON (users.id = bc.user_id AND bc.is_default = TRUE)
WHERE bc.id IS NULL and users.permission IS NOT ?"""
user_ids_without_default_collection = db.query(q, PermissionLevel.SYSTEM.value)
if len(user_ids_without_default_collection) == 0:
return
from .models import BookmarkCollections
for user in user_ids_without_default_collection:
BookmarkCollections.create_default(user['id'])
def add_display_name():
dq = 'ALTER TABLE "users" ADD COLUMN "display_name" TEXT NOT NULL DEFAULT ""'
db.execute(dq)
from .models import Users
for user in Users.select():
data = {
'username': user.username.lower(),
}
if user.username.lower() != user.username:
data['display_name'] = user.username
user.update(data)
# format: [str|tuple(str, any...)|callable]
MIGRATIONS = [
migrate_old_avatars,
'DELETE FROM sessions', # delete old lua porom sessions
'ALTER TABLE "users" ADD COLUMN "invited_by" INTEGER REFERENCES users(id)', # invitation system
'ALTER TABLE "post_history" ADD COLUMN "format_version" INTEGER DEFAULT NULL',
add_signature_format,
create_default_bookmark_collections,
add_display_name,
]
def run_migrations():
db.execute("""
CREATE TABLE IF NOT EXISTS _migrations(
id INTEGER PRIMARY KEY
)
""")
if len(MIGRATIONS) == 0:
print("No migrations defined.")
return
print("Running migrations...")
ran = 0
completed = {int(row["id"]) for row in db.query("SELECT id FROM _migrations")}
to_run = {idx: migration_obj for idx, migration_obj in enumerate(MIGRATIONS) if idx not in completed}
if not to_run:
print('No migrations need to run.')
return
with db.transaction():
for migration_id, migration_obj in to_run.items():
if isinstance(migration_obj, str):
db.execute(migration_obj)
elif isinstance(migration_obj, tuple):
db.execute(migration_obj[0], *migration_obj[1:])
elif callable(migration_obj):
migration_obj()
db.execute('INSERT INTO _migrations (id) VALUES (?)', migration_id)
ran += 1
print(f"Ran {ran} migrations.")