Files

323 lines
10 KiB
Python

import os
from flask import (
Blueprint,
Response,
abort,
current_app,
flash,
jsonify,
make_response,
redirect,
render_template,
request,
send_file,
url_for,
)
from flask_login import current_user, login_required
from app import db
from app.auth_utils import photo_owner_or_admin
from app.folder_utils import can_edit_folder
from app.models import Folder, Photo
from app.settings_service import get_settings
from app.storage_service import delete_photo_file, get_photo_stream
from app.upload_service import process_uploads, process_url_uploads
from sqlalchemy import text
bp = Blueprint("main", __name__)
@bp.route("/health")
def health():
try:
db.session.execute(text("SELECT 1"))
db.session.remove()
return Response("ok\n", mimetype="text/plain")
except Exception as exc:
db.session.remove()
return Response(f"error: {exc}\n", status=503, mimetype="text/plain")
@bp.route("/")
def index():
photos = Photo.query.order_by(Photo.created_at.desc()).limit(24).all()
total_photos = Photo.query.count()
total_size = db.session.query(db.func.coalesce(db.func.sum(Photo.file_size), 0)).scalar() or 0
settings = get_settings()
return render_template(
"index.html",
photos=photos,
total_photos=total_photos,
total_size=int(total_size),
max_upload_mb=current_app.config["MAX_CONTENT_LENGTH"] // (1024 * 1024),
max_bulk_upload=settings.max_bulk_upload,
)
@bp.route("/upload", methods=["POST"])
@login_required
def upload():
folder_id = request.form.get("folder_id", type=int)
folder = None
if folder_id:
folder = Folder.query.get_or_404(folder_id)
if not can_edit_folder(folder):
abort(403)
image_urls = request.form.get("image_urls", "").strip()
max_upload_mb = current_app.config["MAX_CONTENT_LENGTH"] // (1024 * 1024)
if image_urls:
result = process_url_uploads(
image_urls,
current_user,
folder,
current_app.config["ALLOWED_EXTENSIONS"],
max_upload_mb,
)
else:
result = process_uploads(
request.files,
current_user,
folder,
current_app.config["ALLOWED_EXTENSIONS"],
)
if result["uploaded"] == 0 and result["errors"]:
flash(result["errors"][0], "error")
elif result["uploaded"] == 1:
flash("Фото успешно загружено", "success")
elif result["uploaded"] > 1:
flash(f"Загружено {result['uploaded']} фото", "success")
for err in result["errors"]:
if result["uploaded"] > 0:
flash(err, "error")
if result["uploaded"] > 0:
from app.email_service import send_upload_notification
send_upload_notification(
current_user,
result["uploaded"],
folder.name if folder else None,
)
if folder:
return redirect(url_for("folders.view_folder", folder_id=folder.id))
return redirect(request.referrer or url_for("cabinet.index"))
@bp.route("/api/photos")
def api_photos():
photos = Photo.query.order_by(Photo.created_at.desc()).all()
return jsonify(
[
{
"id": p.id,
"url": p.url,
"original_name": p.original_name,
"file_size": p.file_size,
"size_human": p.size_human,
"user_id": p.user_id,
"created_at": p.created_at.isoformat(),
}
for p in photos
]
)
@bp.route("/photo/<int:photo_id>/qr")
def photo_qr(photo_id):
import io
import qrcode
from app.share_utils import photo_absolute_url
photo = Photo.query.get_or_404(photo_id)
target = photo_absolute_url(photo, request.url_root)
qr = qrcode.QRCode(box_size=8, border=2)
qr.add_data(target)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buf = io.BytesIO()
img.save(buf, format="PNG")
buf.seek(0)
return send_file(buf, mimetype="image/png", download_name=f"photo-{photo.id}-qr.png")
@bp.route("/uploads/<path:filename>")
def uploaded_file(filename):
photo = Photo.query.filter_by(filename=filename).first()
storage_backend = photo.storage_backend if photo else "local"
stream = get_photo_stream(filename, storage_backend)
if stream is None:
abort(404)
mimetype = photo.mime_type if photo else "application/octet-stream"
return send_file(stream, mimetype=mimetype)
@bp.route("/delete/<int:photo_id>", methods=["POST"])
@login_required
def delete_photo(photo_id):
photo = Photo.query.get_or_404(photo_id)
photo_owner_or_admin(photo)
delete_photo_file(photo.filename, photo.storage_backend)
db.session.delete(photo)
db.session.commit()
flash("Фото удалено", "success")
return redirect(request.referrer or url_for("main.index"))
cabinet_bp = Blueprint("cabinet", __name__, url_prefix="/cabinet")
@cabinet_bp.route("/")
@login_required
def index():
from app.folder_utils import process_pending_invites
from app.quota_utils import quota_status
process_pending_invites(current_user)
photos = (
Photo.query.filter_by(user_id=current_user.id, folder_id=None)
.order_by(Photo.created_at.desc())
.all()
)
folders = Folder.query.filter_by(owner_id=current_user.id).order_by(Folder.created_at.desc()).limit(6).all()
total_size = sum(p.file_size for p in photos)
quota = quota_status(current_user)
settings = get_settings()
return render_template(
"cabinet/index.html",
photos=photos,
folders=folders,
total_photos=len(photos),
total_size=total_size,
quota=quota,
max_upload_mb=current_app.config["MAX_CONTENT_LENGTH"] // (1024 * 1024),
max_bulk_upload=settings.max_bulk_upload,
)
@cabinet_bp.route("/profile", methods=["GET", "POST"])
@login_required
def profile():
from app.models import User, UserPasskey
from app.passkey_service import delete_passkey
from app.session_service import (
get_current_session_key,
list_user_sessions,
revoke_all_sessions,
revoke_session,
)
if request.method == "POST":
action = request.form.get("action", "save")
if action == "revoke_session":
session_id = request.form.get("session_id", type=int)
if session_id and revoke_session(session_id, current_user.id):
flash("Сессия завершена", "success")
return redirect(url_for("cabinet.profile"))
if action == "revoke_all_sessions":
count = revoke_all_sessions(current_user.id, except_current=True)
flash(f"Завершено сессий: {count}", "success")
return redirect(url_for("cabinet.profile"))
if action == "delete_passkey":
passkey_id = request.form.get("passkey_id", type=int)
if passkey_id and delete_passkey(current_user, passkey_id):
flash("Passkey удалён", "success")
return redirect(url_for("cabinet.profile"))
if action == "delete_account":
password = request.form.get("delete_password", "")
if not current_user.check_password(password):
flash("Неверный пароль", "error")
else:
_delete_user_account(current_user)
flash("Аккаунт удалён", "success")
return redirect(url_for("main.index"))
email = request.form.get("email", "").strip().lower()
current_password = request.form.get("current_password", "")
new_password = request.form.get("new_password", "")
new_password2 = request.form.get("new_password2", "")
other = User.query.filter(User.email == email, User.id != current_user.id).first()
if other:
flash("Этот email уже используется", "error")
elif not current_user.check_password(current_password):
flash("Неверный текущий пароль", "error")
elif new_password and len(new_password) < 6:
flash("Новый пароль — минимум 6 символов", "error")
elif new_password and new_password != new_password2:
flash("Новые пароли не совпадают", "error")
else:
current_user.email = email
if new_password:
current_user.set_password(new_password)
db.session.commit()
flash("Профиль обновлён", "success")
return redirect(url_for("cabinet.profile"))
sessions = list_user_sessions(current_user.id)
passkeys = current_user.passkeys.order_by(UserPasskey.created_at.desc()).all()
current_sid = get_current_session_key()
return render_template(
"cabinet/profile.html",
sessions=sessions,
passkeys=passkeys,
current_sid=current_sid,
)
@cabinet_bp.route("/profile/export")
@login_required
def export_profile():
from app.legal import export_user_data
import json
data = export_user_data(current_user)
response = make_response(json.dumps(data, ensure_ascii=False, indent=2))
response.headers["Content-Type"] = "application/json; charset=utf-8"
response.headers["Content-Disposition"] = (
f'attachment; filename="photohost-{current_user.username}.json"'
)
return response
def _delete_user_account(user):
from app.models import UserPasskey, UserSession
from app.session_service import revoke_all_sessions, revoke_current_session
from app.storage_service import delete_photo_file
from flask_login import logout_user
revoke_all_sessions(user.id, except_current=False)
UserSession.query.filter_by(user_id=user.id).delete()
UserPasskey.query.filter_by(user_id=user.id).delete()
for photo in user.photos.all():
delete_photo_file(photo.filename, photo.storage_backend)
db.session.delete(photo)
for folder in user.folders.all():
for photo in folder.photos.all():
delete_photo_file(photo.filename, photo.storage_backend)
db.session.delete(photo)
db.session.delete(folder)
db.session.delete(user)
db.session.commit()
revoke_current_session()
logout_user()