Files

216 lines
6.5 KiB
Python

import os
import re
from sqlalchemy import inspect, text
from app import db
from app.models import User, UserGroup
def ensure_schema():
    """Add the ``photos.user_id`` and ``users.group_id`` columns if missing.

    Each statement is issued only when the tables listed next to it already
    exist, and is committed on its own. The statements rely on the database
    accepting ``ADD COLUMN IF NOT EXISTS`` (e.g. PostgreSQL).
    """
    existing_tables = set(inspect(db.engine).get_table_names())

    # (tables that must exist, DDL to run)
    # NOTE(review): photos.user_id references users(id) but only "photos" is
    # checked here — confirm "users" is always present at this point.
    migrations = (
        (
            {"photos"},
            "ALTER TABLE photos ADD COLUMN IF NOT EXISTS "
            "user_id INTEGER REFERENCES users(id)",
        ),
        (
            {"users", "user_groups"},
            "ALTER TABLE users ADD COLUMN IF NOT EXISTS "
            "group_id INTEGER REFERENCES user_groups(id)",
        ),
    )
    for required_tables, ddl in migrations:
        if required_tables <= existing_tables:
            db.session.execute(text(ddl))
            db.session.commit()
def ensure_default_group(app):
    """Make sure a default user group exists and every user belongs to a group.

    Lookup order:

    1. a group already flagged ``is_default``;
    2. otherwise the group with slug ``"users"``, which is then flagged as
       the default;
    3. otherwise a new ``"users"`` group is created, with limits read from
       ``DEFAULT_GROUP_QUOTA_MB``, ``DEFAULT_GROUP_MAX_FOLDERS`` and
       ``DEFAULT_GROUP_MAX_PHOTOS`` (defaults 100, 10 and 500).

    Afterwards every user whose ``group_id`` is NULL is assigned to the
    default group.

    Args:
        app: Application object; only ``app.logger`` is used.

    Raises:
        ValueError: If one of the ``DEFAULT_GROUP_*`` variables is set to a
            value that cannot be parsed as an integer.
    """
    # Parsed up front (even when the group already exists) so a malformed
    # value is reported on every start-up, as before.
    default_quota = int(os.getenv("DEFAULT_GROUP_QUOTA_MB", "100"))
    default_max_folders = int(os.getenv("DEFAULT_GROUP_MAX_FOLDERS", "10"))
    default_max_photos = int(os.getenv("DEFAULT_GROUP_MAX_PHOTOS", "500"))

    default_group = UserGroup.query.filter_by(is_default=True).first()
    if default_group is None:
        default_group = UserGroup.query.filter_by(slug="users").first()
        if default_group is not None:
            # An existing group is only re-flagged; do not report it as created.
            default_group.is_default = True
            db.session.commit()
            app.logger.info("Existing user group 'users' marked as default")
        else:
            default_group = UserGroup(
                name="Пользователи",
                slug="users",
                disk_quota_mb=default_quota,
                max_folders=default_max_folders,
                max_photos=default_max_photos,
                is_default=True,
            )
            db.session.add(default_group)
            # Commit before default_group.id is read below; presumably the
            # key is generated by the database when the row is written.
            db.session.commit()
            app.logger.info(
                "Default user group 'users' created (quota=%s MB, folders=%s, photos=%s)",
                default_quota,
                default_max_folders,
                default_max_photos,
            )

    # Attach users that have no group yet to the default group.
    User.query.filter(User.group_id.is_(None)).update({"group_id": default_group.id})
    db.session.commit()
def ensure_group_limit_columns():
    """Add ``max_folders`` and ``max_photos`` to ``user_groups`` if missing.

    Does nothing when the ``user_groups`` table does not exist. Both
    statements are committed together.
    """
    table_names = inspect(db.engine).get_table_names()
    if "user_groups" not in table_names:
        return

    limit_columns = (
        "max_folders INTEGER NOT NULL DEFAULT 10",
        "max_photos INTEGER NOT NULL DEFAULT 500",
    )
    for definition in limit_columns:
        db.session.execute(
            text(f"ALTER TABLE user_groups ADD COLUMN IF NOT EXISTS {definition}")
        )
    db.session.commit()
def ensure_site_settings(app):
    """Create the ``SiteSettings`` row with ``id=1`` when it is absent.

    Args:
        app: Application object; only ``app.logger`` is used.
    """
    # Imported locally, as in the original — presumably to avoid an import
    # cycle; confirm before moving it to module level.
    from app.models import SiteSettings

    # NOTE(review): Query.get() is a legacy API in SQLAlchemy 1.4+/2.x;
    # db.session.get(SiteSettings, 1) is the documented replacement.
    existing = SiteSettings.query.get(1)
    if existing is not None:
        return

    db.session.add(SiteSettings(id=1))
    db.session.commit()
    app.logger.info("Site settings initialized")
def ensure_photo_storage_column():
    """Add ``photos.storage_backend`` (default ``'local'``) if it is missing.

    Does nothing when the ``photos`` table does not exist.
    """
    if "photos" in inspect(db.engine).get_table_names():
        ddl = (
            "ALTER TABLE photos ADD COLUMN IF NOT EXISTS "
            "storage_backend VARCHAR(20) DEFAULT 'local'"
        )
        db.session.execute(text(ddl))
        db.session.commit()
def ensure_site_settings_auth_columns():
    """Add the authentication and captcha columns to ``site_settings``.

    Does nothing when the ``site_settings`` table does not exist. One
    ``ALTER TABLE ... ADD COLUMN IF NOT EXISTS`` statement is executed per
    column definition; all of them are committed together at the end.
    """
    if "site_settings" not in inspect(db.engine).get_table_names():
        return

    # Each entry is "<column name> <type and constraints>".
    column_definitions = (
        "registration_enabled BOOLEAN NOT NULL DEFAULT TRUE",
        "password_login_enabled BOOLEAN NOT NULL DEFAULT TRUE",
        "passkey_enabled BOOLEAN NOT NULL DEFAULT TRUE",
        "webauthn_rp_id VARCHAR(255)",
        "webauthn_rp_name VARCHAR(120) DEFAULT 'PhotoHost'",
        "webauthn_origin VARCHAR(255)",
        "captcha_provider VARCHAR(20) NOT NULL DEFAULT 'none'",
        "turnstile_site_key VARCHAR(255)",
        "turnstile_secret_key VARCHAR(255)",
        "recaptcha_v2_site_key VARCHAR(255)",
        "recaptcha_v2_secret_key VARCHAR(255)",
        "recaptcha_v3_site_key VARCHAR(255)",
        "recaptcha_v3_secret_key VARCHAR(255)",
        "recaptcha_v3_min_score DOUBLE PRECISION NOT NULL DEFAULT 0.5",
        "captcha_on_login BOOLEAN NOT NULL DEFAULT FALSE",
        "captcha_on_register BOOLEAN NOT NULL DEFAULT TRUE",
        "captcha_on_forgot_password BOOLEAN NOT NULL DEFAULT FALSE",
    )
    for definition in column_definitions:
        statement = f"ALTER TABLE site_settings ADD COLUMN IF NOT EXISTS {definition}"
        db.session.execute(text(statement))
    db.session.commit()
def ensure_user_privacy_columns():
    """Add ``gdpr_accepted_at`` and ``cookie_analytics`` to ``users``.

    Does nothing when the ``users`` table does not exist. Both statements
    are committed together.
    """
    table_names = inspect(db.engine).get_table_names()
    if "users" not in table_names:
        return

    privacy_columns = (
        "gdpr_accepted_at TIMESTAMP WITH TIME ZONE",
        "cookie_analytics BOOLEAN NOT NULL DEFAULT FALSE",
    )
    for definition in privacy_columns:
        db.session.execute(
            text(f"ALTER TABLE users ADD COLUMN IF NOT EXISTS {definition}")
        )
    db.session.commit()
def run_schema_migrations():
    """Run the schema fix-up steps in a fixed order.

    Order: base foreign-key columns, group limit columns, site-settings
    auth columns, user privacy columns, folder schema, photo storage column.
    """
    initial_steps = (
        ensure_schema,
        ensure_group_limit_columns,
        ensure_site_settings_auth_columns,
        ensure_user_privacy_columns,
    )
    for step in initial_steps:
        step()

    # Imported at this point rather than at module level — presumably to
    # avoid a circular import with app.folders; confirm before moving it.
    from app.folders import ensure_folder_schema

    ensure_folder_schema()
    ensure_photo_storage_column()
def run_database_setup(app):
    """Run schema migrations, then seed the default group, settings and admin.

    Args:
        app: Application object passed through to each seeding step.
    """
    run_schema_migrations()

    # The seeding steps all take the app and must run in this order:
    # the admin is assigned to the default group created by the first step.
    seeding_steps = (ensure_default_group, ensure_site_settings, create_first_admin)
    for step in seeding_steps:
        step(app)
def slugify(name):
    """Turn *name* into a lowercase ASCII slug of at most 80 characters.

    Every run of characters other than ``a-z`` and ``0-9`` (after
    lowercasing) becomes a single hyphen; leading and trailing hyphens are
    removed. Non-ASCII letters are therefore dropped, and a name with no
    ASCII letters or digits yields the fallback ``"group"``.

    Args:
        name: Human-readable name.

    Returns:
        The slug, never empty and never ending in a hyphen.
    """
    slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")
    # Cutting at 80 characters can land right after a separator, so strip
    # again to avoid a slug that ends with "-".
    slug = slug[:80].rstrip("-")
    return slug or "group"
def create_first_admin(app):
    """Create or promote the first administrator from ``ADMIN_*`` env vars.

    Reads ``ADMIN_USERNAME``, ``ADMIN_EMAIL`` and ``ADMIN_PASSWORD``
    (surrounding whitespace is stripped from all three, including the
    password). Nothing happens when any of them is empty or when an admin
    user already exists. If a user with the given username exists, that user
    is promoted to admin and the password is reset; the stored email is left
    unchanged. Otherwise a new admin user is created. In both cases the
    user is placed in the default group when one exists (an existing user
    keeps a group that is already set).

    Args:
        app: Application object; only ``app.logger`` is used.

    Returns:
        The created or promoted ``User``, or ``None`` when nothing was done.
    """
    username = os.getenv("ADMIN_USERNAME", "").strip()
    email = os.getenv("ADMIN_EMAIL", "").strip()
    password = os.getenv("ADMIN_PASSWORD", "").strip()
    if not (username and email and password):
        app.logger.info("ADMIN_* env vars not set — first admin was not created automatically")
        return None

    # Only bootstrap when there is no administrator at all.
    if User.query.filter_by(is_admin=True).first():
        return None

    default_group = UserGroup.query.filter_by(is_default=True).first()

    # Look the user up once and reuse the result for both the check and
    # the promotion.
    user = User.query.filter_by(username=username).first()
    if user:
        user.is_admin = True
        user.set_password(password)
        if default_group and not user.group_id:
            user.group_id = default_group.id
        db.session.commit()
        app.logger.info("Existing user '%s' promoted to admin", username)
        return user

    user = User(
        username=username,
        email=email,
        is_admin=True,
        group_id=default_group.id if default_group else None,
    )
    user.set_password(password)
    db.session.add(user)
    db.session.commit()
    app.logger.info("First admin '%s' created", username)
    return user