Files
fotohost/app/folders.py
T
2026-06-06 22:50:10 +03:00

328 lines
12 KiB
Python

import os
from flask import (
Blueprint,
abort,
flash,
redirect,
render_template,
request,
url_for,
)
from flask_login import current_user, login_required
from sqlalchemy import inspect, text
from app import db
from app.folder_utils import (
can_edit_folder,
can_manage_folder_settings,
can_view_folder,
is_folder_unlocked,
process_pending_invites,
unlock_folder,
)
from app.models import Folder, FolderInvite, FolderMember, Photo, User
from app.quota_utils import check_folder_limit
from app.settings_service import get_settings
from app.storage_service import delete_photo_file
bp = Blueprint("folders", __name__)
@bp.route("/cabinet/folders")
@login_required
def list_folders():
from app.quota_utils import quota_status
process_pending_invites(current_user)
owned = Folder.query.filter_by(owner_id=current_user.id).order_by(Folder.created_at.desc()).all()
shared = (
Folder.query.join(FolderMember)
.filter(
FolderMember.user_id == current_user.id,
Folder.owner_id != current_user.id,
)
.order_by(Folder.created_at.desc())
.all()
)
return render_template(
"cabinet/folders/list.html",
owned_folders=owned,
shared_folders=shared,
quota=quota_status(current_user),
)
@bp.route("/cabinet/folders/create", methods=["POST"])
@login_required
def create_folder():
name = request.form.get("name", "").strip()
is_private = request.form.get("is_private") == "on"
access_password = request.form.get("access_password", "").strip()
if len(name) < 2:
flash("Название папки — минимум 2 символа", "error")
return redirect(url_for("folders.list_folders"))
ok, limit_msg = check_folder_limit(current_user)
if not ok:
flash(limit_msg, "error")
return redirect(url_for("folders.list_folders"))
folder = Folder(name=name, owner_id=current_user.id, is_private=is_private)
if access_password:
if len(access_password) < 4:
flash("Пароль папки — минимум 4 символа", "error")
return redirect(url_for("folders.list_folders"))
folder.set_access_password(access_password)
db.session.add(folder)
db.session.commit()
flash(f"Папка «{folder.name}» создана", "success")
return redirect(url_for("folders.view_folder", folder_id=folder.id))
@bp.route("/cabinet/folders/<int:folder_id>")
@login_required
def view_folder(folder_id):
folder = Folder.query.get_or_404(folder_id)
if not can_view_folder(folder):
if folder.has_password:
return redirect(url_for("folders.folder_password", folder_id=folder.id))
abort(403)
photos = folder.photos.order_by(Photo.created_at.desc()).all()
can_edit = can_edit_folder(folder)
return render_template(
"cabinet/folders/view.html",
folder=folder,
photos=photos,
can_edit=can_edit,
share_url=_share_url(folder),
max_bulk_upload=get_settings().max_bulk_upload,
)
@bp.route("/cabinet/folders/<int:folder_id>/password", methods=["GET", "POST"])
@login_required
def folder_password(folder_id):
folder = Folder.query.get_or_404(folder_id)
if is_folder_owner_or_member(folder):
return redirect(url_for("folders.view_folder", folder_id=folder.id))
if request.method == "POST":
password = request.form.get("password", "")
if folder.check_access_password(password):
unlock_folder(folder.id)
flash("Доступ к папке открыт", "success")
return redirect(url_for("folders.view_folder", folder_id=folder.id))
flash("Неверный пароль", "error")
return render_template("cabinet/folders/password.html", folder=folder)
@bp.route("/cabinet/folders/<int:folder_id>/settings", methods=["GET", "POST"])
@login_required
def folder_settings(folder_id):
folder = Folder.query.get_or_404(folder_id)
if not can_manage_folder_settings(folder):
abort(403)
if request.method == "POST":
action = request.form.get("action", "save")
if action == "regenerate_link":
folder.regenerate_share_token()
db.session.commit()
flash("Ссылка для sharing обновлена", "success")
return redirect(url_for("folders.folder_settings", folder_id=folder.id))
if action == "delete":
_delete_folder(folder)
flash(f"Папка «{folder.name}» удалена", "success")
return redirect(url_for("folders.list_folders"))
name = request.form.get("name", "").strip()
is_private = request.form.get("is_private") == "on"
access_password = request.form.get("access_password", "").strip()
remove_password = request.form.get("remove_password") == "on"
if len(name) < 2:
flash("Название папки — минимум 2 символа", "error")
else:
folder.name = name
folder.is_private = is_private
if remove_password:
folder.set_access_password(None)
elif access_password:
if len(access_password) < 4:
flash("Пароль папки — минимум 4 символа", "error")
return redirect(url_for("folders.folder_settings", folder_id=folder.id))
folder.set_access_password(access_password)
db.session.commit()
flash("Настройки папки сохранены", "success")
return redirect(url_for("folders.folder_settings", folder_id=folder.id))
members = FolderMember.query.filter_by(folder_id=folder.id).all()
invites = FolderInvite.query.filter_by(folder_id=folder.id).all()
return render_template(
"cabinet/folders/settings.html",
folder=folder,
members=members,
invites=invites,
share_url=_share_url(folder),
)
@bp.route("/cabinet/folders/<int:folder_id>/invite", methods=["POST"])
@login_required
def invite_member(folder_id):
folder = Folder.query.get_or_404(folder_id)
if not can_manage_folder_settings(folder):
abort(403)
email = request.form.get("email", "").strip().lower()
role = request.form.get("role", "viewer")
if role not in ("viewer", "editor"):
role = "viewer"
if not email or "@" not in email:
flash("Укажите корректный email", "error")
return redirect(url_for("folders.folder_settings", folder_id=folder.id))
user = User.query.filter_by(email=email).first()
if user:
if user.id == folder.owner_id:
flash("Владелец папки уже имеет доступ", "error")
return redirect(url_for("folders.folder_settings", folder_id=folder.id))
existing = FolderMember.query.filter_by(folder_id=folder.id, user_id=user.id).first()
if existing:
existing.role = role
flash(f"Пользователь {user.username} уже в папке — роль обновлена", "success")
else:
db.session.add(
FolderMember(
folder_id=folder.id,
user_id=user.id,
role=role,
added_by_id=current_user.id,
)
)
FolderInvite.query.filter_by(folder_id=folder.id, email=email).delete()
flash(f"Пользователь {user.username} добавлен в папку", "success")
else:
invite = FolderInvite.query.filter_by(folder_id=folder.id, email=email).first()
if invite:
invite.role = role
flash("Приглашение обновлено", "success")
else:
db.session.add(
FolderInvite(
folder_id=folder.id,
email=email,
role=role,
invited_by_id=current_user.id,
)
)
flash(f"Приглашение отправлено на {email}. Доступ откроется после регистрации.", "success")
db.session.commit()
return redirect(url_for("folders.folder_settings", folder_id=folder.id))
@bp.route("/cabinet/folders/<int:folder_id>/members/<int:user_id>/remove", methods=["POST"])
@login_required
def remove_member(folder_id, user_id):
folder = Folder.query.get_or_404(folder_id)
if not can_manage_folder_settings(folder):
abort(403)
member = FolderMember.query.filter_by(folder_id=folder.id, user_id=user_id).first_or_404()
db.session.delete(member)
db.session.commit()
flash("Пользователь удалён из папки", "success")
return redirect(url_for("folders.folder_settings", folder_id=folder.id))
@bp.route("/cabinet/folders/<int:folder_id>/invites/<int:invite_id>/remove", methods=["POST"])
@login_required
def remove_invite(folder_id, invite_id):
folder = Folder.query.get_or_404(folder_id)
if not can_manage_folder_settings(folder):
abort(403)
invite = FolderInvite.query.filter_by(folder_id=folder.id, id=invite_id).first_or_404()
db.session.delete(invite)
db.session.commit()
flash("Приглашение отменено", "success")
return redirect(url_for("folders.folder_settings", folder_id=folder.id))
@bp.route("/share/f/<share_token>", methods=["GET", "POST"])
def share_folder(share_token):
folder = Folder.query.filter_by(share_token=share_token).first_or_404()
if current_user.is_authenticated:
process_pending_invites(current_user)
if is_folder_owner_or_member(folder) or (can_view_folder(folder) and not folder.has_password):
return _render_share_folder(folder)
if folder.has_password and not is_folder_unlocked(folder):
if request.method == "POST":
password = request.form.get("password", "")
if folder.check_access_password(password):
unlock_folder(folder.id)
flash("Доступ к папке открыт", "success")
return redirect(url_for("folders.share_folder", share_token=share_token))
flash("Неверный пароль", "error")
return render_template("share/password.html", folder=folder, share_token=share_token)
return _render_share_folder(folder)
def _render_share_folder(folder):
photos = folder.photos.order_by(Photo.created_at.desc()).all()
return render_template(
"share/folder.html",
folder=folder,
photos=photos,
can_edit=can_edit_folder(folder),
share_url=_share_url(folder),
max_bulk_upload=get_settings().max_bulk_upload,
)
def _share_url(folder):
return url_for("folders.share_folder", share_token=folder.share_token, _external=True)
def is_folder_owner_or_member(folder):
if not current_user.is_authenticated:
return False
if folder.owner_id == current_user.id:
return True
return FolderMember.query.filter_by(folder_id=folder.id, user_id=current_user.id).first() is not None
def _delete_folder(folder):
for photo in folder.photos.all():
delete_photo_file(photo.filename, photo.storage_backend)
db.session.delete(photo)
db.session.delete(folder)
db.session.commit()
def ensure_folder_schema():
inspector = inspect(db.engine)
tables = inspector.get_table_names()
if "photos" in tables:
columns = {col["name"] for col in inspector.get_columns("photos")}
if "folder_id" not in columns:
db.session.execute(
text("ALTER TABLE photos ADD COLUMN folder_id INTEGER REFERENCES folders(id)")
)
db.session.commit()