Files
fotohost/app/upload_service.py
2026-06-07 02:36:59 +03:00

297 lines
9.1 KiB
Python

import ipaddress
import os
import re
import socket
import uuid
from datetime import datetime, timezone
from io import BytesIO
from urllib.parse import urljoin, urlparse

import requests
from werkzeug.datastructures import FileStorage
from werkzeug.utils import secure_filename

from app import db
from app.models import Photo
from app.quota_utils import check_photo_count_limit, check_upload_quota
from app.settings_service import get_settings
from app.storage_service import save_photo_file
# Separator between URLs in a pasted list: any run of whitespace, commas or
# semicolons (``\s`` already covers the explicit ``\r`` and ``\n``).
URL_SPLIT_RE = re.compile(r"[\r\n,;\s]+")

# File extension to use for each image MIME type; consulted by
# ``guess_extension`` before it falls back to the URL path.
MIME_TO_EXT = {
    "image/jpeg": "jpg",
    "image/jpg": "jpg",
    "image/png": "png",
    "image/gif": "gif",
    "image/webp": "webp",
    "image/bmp": "bmp",
    "image/x-png": "png",
}
def allowed_file(filename, allowed_extensions):
    """Tell whether *filename* ends in one of *allowed_extensions*.

    The extension is whatever follows the last dot, lower-cased before the
    membership test. A name without any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition(".")
    if not dot:
        return False
    return extension.lower() in allowed_extensions
def collect_upload_files(request_files):
    """Gather the uploaded file objects from a request's file mapping.

    The multi-value ``"photos"`` field is read first. When it is missing, or
    every entry in it has an empty filename, the single ``"photo"`` field is
    used instead (if it has a filename). Entries that are falsy or have no
    filename are dropped from the result.
    """
    candidates = request_files.getlist("photos")
    nothing_selected = not candidates or all(
        item.filename == "" for item in candidates
    )
    if nothing_selected:
        fallback = request_files.get("photo")
        if fallback and fallback.filename:
            candidates = [fallback]
    return list(filter(lambda item: item and item.filename, candidates))
def parse_image_urls(raw_text):
    """Split user-supplied text into a de-duplicated list of URL strings.

    The text is split on ``URL_SPLIT_RE`` (runs of whitespace, commas and
    semicolons). Empty fragments are discarded and duplicates are removed
    while keeping first-seen order.

    Args:
        raw_text: Text containing zero or more URLs; may be ``None`` or empty.

    Returns:
        A list of unique, non-empty strings in order of first appearance.
        No validation of the strings as URLs is performed here.
    """
    if not raw_text:
        return []
    fragments = (part.strip() for part in URL_SPLIT_RE.split(raw_text.strip()))
    # dict.fromkeys de-duplicates in O(n) and preserves insertion order; the
    # input is untrusted and not yet size-limited, so a list-membership scan
    # per fragment would be quadratic.
    return list(dict.fromkeys(url for url in fragments if url))
def _literal_ip(host):
    """Return the IP address *host* spells out, or ``None`` if it is a name.

    Besides the standard dotted-quad / IPv6 notation this also recognises the
    legacy IPv4 spellings accepted by ``inet_aton`` (``2130706433``, ``127.1``,
    ``0x7f.0.0.1``), which resolvers commonly treat as addresses.
    """
    try:
        return ipaddress.ip_address(host)
    except ValueError:
        pass
    try:
        return ipaddress.IPv4Address(socket.inet_aton(host))
    except (OSError, ValueError):
        # OSError: not a numeric form; ValueError: non-ASCII / embedded NUL.
        return None


def is_safe_image_url(url):
    """Decide whether *url* may be fetched by the server.

    A URL is accepted only when it uses ``http``/``https``, has a hostname,
    the hostname is not a well-known local name, and, if the hostname is an
    IP literal, the address is not private, loopback, link-local, reserved,
    multicast or unspecified.

    NOTE: ordinary hostnames are not resolved here, so a public name whose
    DNS record points at an internal address is not detected by this check.

    Args:
        url: The URL string to examine.

    Returns:
        ``True`` if the URL passes every check, ``False`` otherwise
        (including when the URL is too malformed to parse).
    """
    try:
        parsed = urlparse(url)
        hostname = parsed.hostname
    except ValueError:
        # urlparse rejects e.g. unbalanced IPv6 brackets ("http://[::1").
        return False
    if parsed.scheme not in ("http", "https") or not hostname:
        return False

    # A trailing dot is the same host in fully-qualified form.
    host = hostname.lower().rstrip(".")
    if not host:
        return False
    if host in ("localhost", "0.0.0.0") or host.endswith((".local", ".localhost")):
        return False

    ip = _literal_ip(host)
    if ip is None:
        return True
    return not (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_reserved
        or ip.is_multicast
        or ip.is_unspecified
    )
def guess_extension(url, content_type):
    """Pick a file extension for a downloaded image.

    The MIME type (the part of *content_type* before any ``;`` parameter,
    trimmed and lower-cased) is looked up in ``MIME_TO_EXT`` first. If that
    yields nothing, the text after the last dot of the URL path is used when
    it is a known image extension, with ``jpeg`` normalised to ``jpg``.

    Returns the extension without a leading dot, or ``None`` when neither
    source identifies an image.
    """
    mime = (content_type or "").partition(";")[0].strip().lower()
    from_mime = MIME_TO_EXT.get(mime)
    if from_mime:
        return from_mime

    _head, dot, suffix = urlparse(url).path.rpartition(".")
    suffix = suffix.lower()
    if dot and suffix in {"png", "jpg", "jpeg", "gif", "webp", "bmp"}:
        if suffix == "jpeg":
            return "jpg"
        return suffix
    return None
def download_image(url, max_bytes, timeout=30, max_redirects=10):
    """Download an image over HTTP(S), refusing bodies larger than *max_bytes*.

    Redirects are followed one hop at a time and every URL is checked with
    ``is_safe_image_url`` *before* a request is sent to it, so a redirect
    cannot be used to make the server contact a disallowed host. The
    streamed response is always closed, including on the error paths.

    Args:
        url: Address to fetch.
        max_bytes: Maximum accepted body size in bytes.
        timeout: Timeout passed to ``requests.get`` for each request.
        max_redirects: Maximum number of redirects to follow.

    Returns:
        A tuple ``(data, ext, content_type, filename)``: the body bytes, the
        extension chosen by ``guess_extension``, the raw ``Content-Type``
        header value (``""`` if absent) and the basename of the final URL
        path (``image.<ext>`` when the path has no basename).

    Raises:
        ValueError: A URL in the chain is not allowed, the response is not
            recognised as an image, the body is empty, or it exceeds
            *max_bytes*.
        requests.RequestException: Network failure, HTTP error status, or
            more than *max_redirects* redirects.
    """
    current_url = url
    response = None
    try:
        for _hop in range(max_redirects + 1):
            if not is_safe_image_url(current_url):
                raise ValueError("Недопустимый URL после редиректа")
            response = requests.get(
                current_url,
                stream=True,
                timeout=timeout,
                headers={"User-Agent": "PhotoHost/2.0"},
                # Redirects are handled below so each target can be vetted
                # before any request reaches it.
                allow_redirects=False,
            )
            if not response.is_redirect:
                break
            # Location may be relative; resolve it against the current URL.
            next_url = urljoin(current_url, response.headers["Location"])
            response.close()
            current_url = next_url
        else:
            raise requests.TooManyRedirects(
                f"Exceeded {max_redirects} redirects", response=response
            )

        response.raise_for_status()
        final_url = response.url
        content_type = response.headers.get("Content-Type", "")
        ext = guess_extension(final_url, content_type)
        if not ext:
            raise ValueError("URL не содержит изображение")

        chunks = []
        total = 0
        for chunk in response.iter_content(chunk_size=65536):
            if not chunk:
                continue
            total += len(chunk)
            # Enforce the cap while streaming rather than after buffering.
            if total > max_bytes:
                raise ValueError("Файл превышает лимит размера")
            chunks.append(chunk)
        if total == 0:
            raise ValueError("Пустой файл")

        data = b"".join(chunks)
        filename = os.path.basename(urlparse(final_url).path) or f"image.{ext}"
        return data, ext, content_type, filename
    finally:
        # With stream=True the connection stays checked out until the body
        # is consumed or the response is closed; close() is safe to repeat.
        if response is not None:
            response.close()
def save_downloaded_image(data, ext, content_type, original_name, user, folder):
    """Store downloaded image bytes and stage a ``Photo`` row for them.

    The bytes are wrapped in a ``FileStorage`` and handed to
    ``save_photo_file`` under a freshly generated ``<uuid hex>.<ext>`` name.
    The display name is ``original_name`` passed through ``secure_filename``
    (``photo.<ext>`` if that leaves nothing), with its extension replaced by
    *ext* when it does not already end in it.

    Returns ``(photo, sync_errors)``. The photo is added to ``db.session``
    but not committed; ``sync_errors`` is passed through from
    ``save_photo_file``.
    """
    stored_name = f"{uuid.uuid4().hex}.{ext}"

    display_name = secure_filename(original_name) or f"photo.{ext}"
    expected_suffix = f".{ext}"
    if not display_name.lower().endswith(expected_suffix):
        stem = display_name.rsplit(".", 1)[0]
        display_name = f"{stem}.{ext}"

    upload = FileStorage(
        stream=BytesIO(data),
        filename=display_name,
        content_type=content_type or f"image/{ext}",
    )
    _path, file_size, storage_backend, sync_errors = save_photo_file(
        upload, stored_name
    )

    photo = Photo(
        filename=stored_name,
        original_name=display_name,
        file_size=file_size,
        mime_type=content_type or f"image/{ext}",
        user_id=user.id,
        folder_id=folder.id if folder else None,
        storage_backend=storage_backend,
        created_at=datetime.now(timezone.utc),
    )
    db.session.add(photo)
    return photo, sync_errors
def process_url_uploads(raw_urls, user, folder, allowed_extensions, max_upload_mb):
    """Download images from user-supplied URLs and store them as photos.

    Steps: parse *raw_urls*, enforce the bulk and photo-count limits,
    download every allowed URL (each capped at ``max_upload_mb`` MiB),
    check the combined size against the user's upload quota, then save each
    download and commit once if anything was saved.

    Args:
        raw_urls: Text containing the URLs (see ``parse_image_urls``).
        user: Owner of the new photos; ``user.id`` is recorded on each row.
        folder: Target folder or a falsy value for none.
        allowed_extensions: Container of accepted extensions.
        max_upload_mb: Per-file size cap in mebibytes.

    Returns:
        A dict with keys ``"uploaded"`` (count), ``"errors"`` (list of
        message strings) and ``"photos"`` (list of saved ``Photo`` objects).
        When the quota or photo-count check fails, only that check's message
        is returned in ``"errors"``.
    """
    settings = get_settings()
    max_bulk = settings.max_bulk_upload or 100
    urls = parse_image_urls(raw_urls)
    if not urls:
        return {"uploaded": 0, "errors": ["Ссылки не указаны"], "photos": []}
    if len(urls) > max_bulk:
        return {
            "uploaded": 0,
            "errors": [f"Максимум {max_bulk} ссылок за раз"],
            "photos": [],
        }
    ok, photo_limit_msg = check_photo_count_limit(user, len(urls))
    if not ok:
        return {"uploaded": 0, "errors": [photo_limit_msg], "photos": []}

    max_bytes = max_upload_mb * 1024 * 1024
    errors = []
    downloads = []
    for url in urls:
        try:
            url_is_safe = is_safe_image_url(url)
        except ValueError:
            # URL parsing can raise on malformed input (e.g. unbalanced
            # IPv6 brackets); one bad entry must not abort the whole batch.
            url_is_safe = False
        if not url_is_safe:
            errors.append(f"{url}: недопустимый URL")
            continue
        try:
            data, ext, content_type, filename = download_image(url, max_bytes)
        except requests.RequestException:
            errors.append(f"{url}: не удалось скачать")
            continue
        except ValueError as exc:
            errors.append(f"{url}: {exc}")
            continue
        if ext not in allowed_extensions:
            errors.append(f"{url}: недопустимый формат")
            continue
        downloads.append((url, data, ext, content_type, filename))

    if not downloads:
        return {"uploaded": 0, "errors": errors, "photos": []}

    total_size = sum(len(data) for _url, data, _ext, _ctype, _name in downloads)
    ok, quota_msg = check_upload_quota(user, total_size)
    if not ok:
        return {"uploaded": 0, "errors": [quota_msg], "photos": []}

    uploaded_photos = []
    for url, data, ext, content_type, filename in downloads:
        try:
            photo, sync_errors = save_downloaded_image(
                data, ext, content_type, filename, user, folder
            )
            for sync_err in sync_errors:
                errors.append(f"{filename}: {sync_err}")
            uploaded_photos.append(photo)
        except Exception as exc:
            # Best effort: a failure on one image is reported and the rest
            # of the batch continues.
            errors.append(f"{url}: {exc}")

    if uploaded_photos:
        db.session.commit()
    return {
        "uploaded": len(uploaded_photos),
        "errors": errors,
        "photos": uploaded_photos,
    }
def process_uploads(request_files, user, folder, allowed_extensions):
    """Store files from a multipart upload and stage their ``Photo`` rows.

    Files are collected with ``collect_upload_files``, limited to the
    configured bulk maximum (100 when the setting is falsy), filtered by
    extension and measured. The photo-count limit and upload quota are then
    checked for the accepted files as a whole before anything is saved.
    A single commit is issued if at least one file was saved.

    Returns a dict with ``"uploaded"`` (count), ``"errors"`` (messages) and
    ``"photos"`` (saved ``Photo`` objects). When a limit or quota check
    fails, ``"errors"`` holds only that check's message.
    """

    def _rejected(messages):
        return {"uploaded": 0, "errors": messages, "photos": []}

    bulk_limit = get_settings().max_bulk_upload or 100
    incoming = collect_upload_files(request_files)
    if not incoming:
        return _rejected(["Файлы не выбраны"])
    if len(incoming) > bulk_limit:
        return _rejected([f"Максимум {bulk_limit} файлов за раз"])

    problems = []
    accepted = []
    for upload in incoming:
        if not allowed_file(upload.filename, allowed_extensions):
            problems.append(f"{upload.filename}: недопустимый формат")
            continue
        # Measure by seeking to the end, then rewind so the later save
        # reads the stream from its start.
        upload.seek(0, os.SEEK_END)
        byte_count = upload.tell()
        upload.seek(0)
        accepted.append((upload, byte_count))
    if not accepted:
        return _rejected(problems)
    combined_size = sum(byte_count for _upload, byte_count in accepted)

    within_limit, limit_message = check_photo_count_limit(user, len(accepted))
    if not within_limit:
        return _rejected([limit_message])
    within_quota, quota_message = check_upload_quota(user, combined_size)
    if not within_quota:
        return _rejected([quota_message])

    saved = []
    for upload, _byte_count in accepted:
        # allowed_file() has already guaranteed the name contains a dot.
        ext = upload.filename.rpartition(".")[2].lower()
        stored_name = f"{uuid.uuid4().hex}.{ext}"
        display_name = secure_filename(upload.filename) or f"photo.{ext}"
        try:
            _path, file_size, storage_backend, sync_errors = save_photo_file(
                upload, stored_name
            )
            problems.extend(f"{display_name}: {sync_err}" for sync_err in sync_errors)
            photo = Photo(
                filename=stored_name,
                original_name=display_name,
                file_size=file_size,
                mime_type=upload.content_type or f"image/{ext}",
                user_id=user.id,
                folder_id=folder.id if folder else None,
                storage_backend=storage_backend,
                created_at=datetime.now(timezone.utc),
            )
            db.session.add(photo)
            saved.append(photo)
        except Exception as exc:
            problems.append(f"{display_name}: {exc}")

    if saved:
        db.session.commit()
    return {"uploaded": len(saved), "errors": problems, "photos": saved}