c1aac7ecac
Co-authored-by: Cursor <cursoragent@cursor.com>
273 lines
9.8 KiB
Python
273 lines
9.8 KiB
Python
import os
|
|
|
|
from flask import Blueprint, current_app, flash, redirect, render_template, request, url_for
|
|
from flask_login import current_user
|
|
from sqlalchemy import func
|
|
|
|
from app import db
|
|
from app.auth_utils import admin_required
|
|
from app.bootstrap import slugify
|
|
from app.deploy_utils import (
|
|
checkout_version,
|
|
deploy_rebuild,
|
|
fetch_remote,
|
|
get_current_version,
|
|
get_deploy_status,
|
|
is_deploy_enabled,
|
|
)
|
|
from app.models import Photo, User, UserGroup
|
|
from app.quota_utils import get_user_storage_used
|
|
from app.settings_service import get_settings, update_settings_from_form
|
|
from app.storage_service import delete_photo_file
|
|
|
|
bp = Blueprint("admin", __name__, url_prefix="/admin")
|
|
|
|
|
|
@bp.route("/")
|
|
@admin_required
|
|
def dashboard():
|
|
stats = {
|
|
"users": User.query.count(),
|
|
"photos": Photo.query.count(),
|
|
"admins": User.query.filter_by(is_admin=True).count(),
|
|
"groups": UserGroup.query.count(),
|
|
"storage": int(
|
|
db.session.query(func.coalesce(func.sum(Photo.file_size), 0)).scalar() or 0
|
|
),
|
|
}
|
|
recent_users = User.query.order_by(User.created_at.desc()).limit(5).all()
|
|
recent_photos = Photo.query.order_by(Photo.created_at.desc()).limit(8).all()
|
|
current_version, _ = get_current_version()
|
|
return render_template(
|
|
"admin/dashboard.html",
|
|
stats=stats,
|
|
recent_users=recent_users,
|
|
recent_photos=recent_photos,
|
|
current_version=current_version,
|
|
deploy_enabled=is_deploy_enabled(),
|
|
)
|
|
|
|
|
|
@bp.route("/users")
|
|
@admin_required
|
|
def users():
|
|
all_users = User.query.order_by(User.created_at.desc()).all()
|
|
groups = UserGroup.query.order_by(UserGroup.name).all()
|
|
return render_template("admin/users.html", users=all_users, groups=groups)
|
|
|
|
|
|
@bp.route("/users/<int:user_id>/set-group", methods=["POST"])
|
|
@admin_required
|
|
def set_user_group(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
group_id = request.form.get("group_id", type=int)
|
|
group = UserGroup.query.get_or_404(group_id)
|
|
user.group_id = group.id
|
|
db.session.commit()
|
|
flash(f"Пользователь {user.username} перемещён в группу «{group.name}»", "success")
|
|
return redirect(url_for("admin.users"))
|
|
|
|
|
|
@bp.route("/users/<int:user_id>/toggle-admin", methods=["POST"])
|
|
@admin_required
|
|
def toggle_admin(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
if user.id == current_user.id:
|
|
flash("Нельзя снять права администратора с самого себя", "error")
|
|
return redirect(url_for("admin.users"))
|
|
|
|
admin_count = User.query.filter_by(is_admin=True).count()
|
|
if user.is_admin and admin_count <= 1:
|
|
flash("Нельзя удалить последнего администратора", "error")
|
|
return redirect(url_for("admin.users"))
|
|
|
|
user.is_admin = not user.is_admin
|
|
db.session.commit()
|
|
action = "назначен администратором" if user.is_admin else "лишён прав администратора"
|
|
flash(f"Пользователь {user.username} {action}", "success")
|
|
return redirect(url_for("admin.users"))
|
|
|
|
|
|
@bp.route("/users/<int:user_id>/toggle-active", methods=["POST"])
|
|
@admin_required
|
|
def toggle_active(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
if user.id == current_user.id:
|
|
flash("Нельзя заблокировать самого себя", "error")
|
|
return redirect(url_for("admin.users"))
|
|
|
|
user.is_active = not user.is_active
|
|
db.session.commit()
|
|
action = "разблокирован" if user.is_active else "заблокирован"
|
|
flash(f"Пользователь {user.username} {action}", "success")
|
|
return redirect(url_for("admin.users"))
|
|
|
|
|
|
@bp.route("/users/<int:user_id>/delete", methods=["POST"])
|
|
@admin_required
|
|
def delete_user(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
if user.id == current_user.id:
|
|
flash("Нельзя удалить самого себя", "error")
|
|
return redirect(url_for("admin.users"))
|
|
|
|
if user.is_admin and User.query.filter_by(is_admin=True).count() <= 1:
|
|
flash("Нельзя удалить последнего администратора", "error")
|
|
return redirect(url_for("admin.users"))
|
|
|
|
for photo in user.photos.all():
|
|
delete_photo_file(photo.filename, photo.storage_backend)
|
|
db.session.delete(photo)
|
|
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
flash(f"Пользователь {user.username} удалён", "success")
|
|
return redirect(url_for("admin.users"))
|
|
|
|
|
|
@bp.route("/groups", methods=["GET", "POST"])
|
|
@admin_required
|
|
def groups():
|
|
if request.method == "POST":
|
|
name = request.form.get("name", "").strip()
|
|
quota_mb = request.form.get("disk_quota_mb", type=int) or 100
|
|
|
|
if len(name) < 2:
|
|
flash("Название группы — минимум 2 символа", "error")
|
|
elif UserGroup.query.filter_by(name=name).first():
|
|
flash("Группа с таким названием уже существует", "error")
|
|
else:
|
|
slug = slugify(name)
|
|
base_slug = slug
|
|
counter = 1
|
|
while UserGroup.query.filter_by(slug=slug).first():
|
|
slug = f"{base_slug}-{counter}"
|
|
counter += 1
|
|
|
|
group = UserGroup(name=name, slug=slug, disk_quota_mb=max(0, quota_mb))
|
|
db.session.add(group)
|
|
db.session.commit()
|
|
flash(f"Группа «{name}» создана", "success")
|
|
return redirect(url_for("admin.groups"))
|
|
|
|
all_groups = UserGroup.query.order_by(UserGroup.is_default.desc(), UserGroup.name).all()
|
|
group_stats = []
|
|
for group in all_groups:
|
|
used = sum(get_user_storage_used(u.id) for u in group.users)
|
|
group_stats.append({"group": group, "storage_used": used})
|
|
return render_template("admin/groups.html", group_stats=group_stats)
|
|
|
|
|
|
@bp.route("/groups/<int:group_id>/edit", methods=["POST"])
|
|
@admin_required
|
|
def edit_group(group_id):
|
|
group = UserGroup.query.get_or_404(group_id)
|
|
name = request.form.get("name", "").strip()
|
|
quota_mb = request.form.get("disk_quota_mb", type=int)
|
|
|
|
if len(name) < 2:
|
|
flash("Название группы — минимум 2 символа", "error")
|
|
return redirect(url_for("admin.groups"))
|
|
|
|
other = UserGroup.query.filter(UserGroup.name == name, UserGroup.id != group.id).first()
|
|
if other:
|
|
flash("Группа с таким названием уже существует", "error")
|
|
return redirect(url_for("admin.groups"))
|
|
|
|
group.name = name
|
|
if quota_mb is not None:
|
|
group.disk_quota_mb = max(0, quota_mb)
|
|
db.session.commit()
|
|
flash(f"Группа «{group.name}» обновлена", "success")
|
|
return redirect(url_for("admin.groups"))
|
|
|
|
|
|
@bp.route("/groups/<int:group_id>/delete", methods=["POST"])
|
|
@admin_required
|
|
def delete_group(group_id):
|
|
group = UserGroup.query.get_or_404(group_id)
|
|
if group.is_default:
|
|
flash("Нельзя удалить группу по умолчанию", "error")
|
|
return redirect(url_for("admin.groups"))
|
|
|
|
default_group = UserGroup.query.filter_by(is_default=True).first()
|
|
if not default_group:
|
|
flash("Не найдена группа по умолчанию", "error")
|
|
return redirect(url_for("admin.groups"))
|
|
|
|
User.query.filter_by(group_id=group.id).update({"group_id": default_group.id})
|
|
db.session.delete(group)
|
|
db.session.commit()
|
|
flash(f"Группа удалена, пользователи перенесены в «{default_group.name}»", "success")
|
|
return redirect(url_for("admin.groups"))
|
|
|
|
|
|
@bp.route("/photos")
|
|
@admin_required
|
|
def photos():
|
|
all_photos = Photo.query.order_by(Photo.created_at.desc()).all()
|
|
return render_template("admin/photos.html", photos=all_photos)
|
|
|
|
|
|
@bp.route("/photos/<int:photo_id>/delete", methods=["POST"])
|
|
@admin_required
|
|
def delete_photo(photo_id):
|
|
photo = Photo.query.get_or_404(photo_id)
|
|
delete_photo_file(photo.filename, photo.storage_backend)
|
|
db.session.delete(photo)
|
|
db.session.commit()
|
|
flash("Фото удалено", "success")
|
|
return redirect(url_for("admin.photos"))
|
|
|
|
|
|
@bp.route("/deploy", methods=["GET", "POST"])
|
|
@admin_required
|
|
def deploy():
|
|
status = get_deploy_status()
|
|
|
|
if request.method == "POST":
|
|
action = request.form.get("action")
|
|
|
|
if action == "fetch":
|
|
ok, msg = fetch_remote()
|
|
flash(msg if ok else msg, "success" if ok else "error")
|
|
elif action == "checkout":
|
|
ref = request.form.get("ref", "").strip()
|
|
ok, msg = checkout_version(ref)
|
|
flash(msg, "success" if ok else "error")
|
|
elif action == "rebuild":
|
|
ok, msg = deploy_rebuild()
|
|
flash(msg, "success" if ok else "error")
|
|
else:
|
|
flash("Неизвестное действие", "error")
|
|
|
|
return redirect(url_for("admin.deploy"))
|
|
|
|
return render_template("admin/deploy.html", status=status)
|
|
|
|
|
|
@bp.route("/settings", methods=["GET", "POST"])
|
|
@admin_required
|
|
def settings():
|
|
site_settings = get_settings()
|
|
|
|
if request.method == "POST":
|
|
action = request.form.get("action", "save")
|
|
|
|
if action == "test_smtp":
|
|
from app.email_service import send_email
|
|
|
|
ok, msg = send_email(
|
|
current_user.email,
|
|
"PhotoHost — тест SMTP",
|
|
"SMTP настроен корректно.",
|
|
)
|
|
flash(msg, "success" if ok else "error")
|
|
return redirect(url_for("admin.settings"))
|
|
|
|
update_settings_from_form(request.form)
|
|
flash("Настройки сохранены", "success")
|
|
return redirect(url_for("admin.settings"))
|
|
|
|
return render_template("admin/settings.html", settings=site_settings)
|