c1aac7ecac
Co-authored-by: Cursor <cursoragent@cursor.com>
114 lines
3.6 KiB
Python
114 lines
3.6 KiB
Python
import os
|
|
import re
|
|
|
|
from sqlalchemy import inspect, text
|
|
|
|
from app import db
|
|
from app.models import User, UserGroup
|
|
|
|
|
|
def ensure_schema():
    """Add foreign-key columns that are missing from already-existing tables.

    Two additive migrations are attempted, in this order:

    * ``photos.user_id`` -- when the ``photos`` table exists and has no
      ``user_id`` column.
    * ``users.group_id`` -- when both the ``users`` and ``user_groups``
      tables exist and ``users`` has no ``group_id`` column.

    Each ``ALTER TABLE`` is committed immediately after it is executed.
    """
    inspector = inspect(db.engine)
    existing_tables = set(inspector.get_table_names())

    # (table to alter, tables that must exist, column to add, DDL statement)
    migrations = (
        (
            "photos",
            ("photos",),
            "user_id",
            "ALTER TABLE photos ADD COLUMN user_id INTEGER REFERENCES users(id)",
        ),
        (
            "users",
            ("users", "user_groups"),
            "group_id",
            "ALTER TABLE users ADD COLUMN group_id INTEGER REFERENCES user_groups(id)",
        ),
    )

    for table, required_tables, column, ddl in migrations:
        if not all(name in existing_tables for name in required_tables):
            continue

        present = {info["name"] for info in inspector.get_columns(table)}
        if column in present:
            continue

        db.session.execute(text(ddl))
        db.session.commit()
|
|
|
|
|
|
def ensure_default_group(app):
    """Make sure a default user group exists and every user belongs to a group.

    Lookup order for the default group:

    1. a group already flagged ``is_default``;
    2. otherwise the group with slug ``"users"``, which is then flagged as
       default;
    3. otherwise a new group is created, with its disk quota taken from the
       ``DEFAULT_GROUP_QUOTA_MB`` environment variable (``"100"`` if unset).

    Finally, every user whose ``group_id`` is NULL is assigned to that group.

    Args:
        app: object whose ``logger`` is used to report group creation.

    Raises:
        ValueError: if ``DEFAULT_GROUP_QUOTA_MB`` is set to a value that
            ``int()`` cannot parse.
    """
    quota_mb = int(os.getenv("DEFAULT_GROUP_QUOTA_MB", "100"))

    group = UserGroup.query.filter_by(is_default=True).first()
    if not group:
        group = UserGroup.query.filter_by(slug="users").first()
        if not group:
            group = UserGroup(
                name="Пользователи",
                slug="users",
                disk_quota_mb=quota_mb,
                is_default=True,
            )
            db.session.add(group)
            db.session.commit()
            app.logger.info("Default user group 'users' created with %s MB quota", quota_mb)
        else:
            # Flag change is persisted by the commit at the end of the function.
            group.is_default = True

    ungrouped_users = User.query.filter(User.group_id.is_(None))
    ungrouped_users.update({"group_id": group.id})
    db.session.commit()
|
|
|
|
|
|
def ensure_site_settings(app):
    """Create the ``SiteSettings`` row with ``id=1`` if it does not exist yet.

    Args:
        app: object whose ``logger`` is used to report the initialization.
    """
    from app.models import SiteSettings

    # Nothing to do when the row with primary key 1 is already present.
    if SiteSettings.query.get(1) is not None:
        return

    settings_row = SiteSettings(id=1)
    db.session.add(settings_row)
    db.session.commit()
    app.logger.info("Site settings initialized")
|
|
|
|
|
|
def ensure_photo_storage_column():
    """Add ``photos.storage_backend`` when the table exists without it.

    The column is created as ``VARCHAR(20)`` with a default of ``'local'``.
    Nothing happens if the ``photos`` table is absent or the column is
    already there.
    """
    inspector = inspect(db.engine)

    if "photos" in inspector.get_table_names():
        already_present = any(
            column["name"] == "storage_backend"
            for column in inspector.get_columns("photos")
        )
        if not already_present:
            ddl = text("ALTER TABLE photos ADD COLUMN storage_backend VARCHAR(20) DEFAULT 'local'")
            db.session.execute(ddl)
            db.session.commit()
|
|
|
|
|
|
def slugify(name):
    """Turn *name* into a lowercase, hyphen-separated ASCII slug.

    Every run of characters outside ``a-z`` / ``0-9`` (after lowercasing)
    collapses into a single ``-``; leading and trailing hyphens are removed.
    Non-ASCII letters are not transliterated -- they count as separators --
    so a name with no ASCII letters or digits yields the fallback ``"group"``.

    The result is capped at 80 characters. Hyphens are stripped again after
    truncation so a cut that lands right after a separator cannot leave a
    dangling ``-`` at the end of the slug.

    Args:
        name: display name to convert (a ``str``).

    Returns:
        A non-empty slug of at most 80 characters that neither starts nor
        ends with ``-``.
    """
    slug = re.sub(r"[^a-z0-9]+", "-", name.lower().strip()).strip("-")
    if not slug:
        return "group"
    # slug starts with an alphanumeric, so this can never become empty.
    return slug[:80].rstrip("-")
|
|
|
|
|
|
def create_first_admin(app):
    """Create (or promote) the first administrator from ``ADMIN_*`` env vars.

    Reads ``ADMIN_USERNAME``, ``ADMIN_EMAIL`` and ``ADMIN_PASSWORD``; each
    value is whitespace-stripped. Behaviour:

    * If any of the three is empty, an info message is logged and nothing
      is created.
    * If some user already has ``is_admin`` set, nothing is done.
    * If a user with the configured username exists, that user is promoted
      to admin, the password is reset to the configured one, and the user is
      put into the default group when they have no group yet.
    * Otherwise a new admin user is created in the default group (or with
      no group when no default group exists).

    Args:
        app: object whose ``logger`` is used for informational messages.

    Returns:
        The created or promoted ``User``, or ``None`` when no action was
        taken.
    """
    username = os.getenv("ADMIN_USERNAME", "").strip()
    email = os.getenv("ADMIN_EMAIL", "").strip()
    password = os.getenv("ADMIN_PASSWORD", "").strip()

    if not (username and email and password):
        app.logger.info("ADMIN_* env vars not set — first admin was not created automatically")
        return None

    # An admin already exists: never create or promote a second one here.
    if User.query.filter_by(is_admin=True).first():
        return None

    default_group = UserGroup.query.filter_by(is_default=True).first()

    # Look the user up once and reuse the result; querying twice costs an
    # extra round-trip and could yield None the second time.
    user = User.query.filter_by(username=username).first()
    if user:
        user.is_admin = True
        user.set_password(password)
        if default_group and not user.group_id:
            user.group_id = default_group.id
        db.session.commit()
        app.logger.info("Existing user '%s' promoted to admin", username)
        return user

    user = User(
        username=username,
        email=email,
        is_admin=True,
        group_id=default_group.id if default_group else None,
    )
    user.set_password(password)
    db.session.add(user)
    db.session.commit()
    app.logger.info("First admin '%s' created", username)
    return user
|