d4f0eaa7d9
Co-authored-by: Cursor <cursoragent@cursor.com>
297 lines
9.1 KiB
Python
297 lines
9.1 KiB
Python
import ipaddress
|
|
import os
|
|
import re
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from io import BytesIO
|
|
from urllib.parse import urlparse
|
|
|
|
import requests
|
|
from werkzeug.datastructures import FileStorage
|
|
from werkzeug.utils import secure_filename
|
|
|
|
from app import db
|
|
from app.models import Photo
|
|
from app.quota_utils import check_photo_count_limit, check_upload_quota
|
|
from app.settings_service import get_settings
|
|
from app.storage_service import save_photo_file
|
|
|
|
# Separator pattern for user-entered URL lists: one or more carriage returns,
# line feeds, commas, semicolons, or any other whitespace characters.
URL_SPLIT_RE = re.compile(r"[\r\n,;\s]+")
|
|
|
|
# Lookup from a bare, lowercase MIME type (no parameters) to the file
# extension used for the stored image. Non-standard aliases that some servers
# send ("image/jpg", "image/x-png") map to the same extension as the
# canonical type.
MIME_TO_EXT = {
    "image/jpeg": "jpg",
    "image/jpg": "jpg",
    "image/png": "png",
    "image/gif": "gif",
    "image/webp": "webp",
    "image/bmp": "bmp",
    "image/x-png": "png",
}
|
|
|
|
|
|
def allowed_file(filename, allowed_extensions):
    """Tell whether *filename* carries one of the permitted extensions.

    The extension is whatever follows the last dot, compared in lowercase.
    A name without any dot is never allowed.
    """
    _stem, dot, suffix = filename.rpartition(".")
    if not dot:
        return False
    return suffix.lower() in allowed_extensions
|
|
|
|
|
|
def collect_upload_files(request_files):
    """Gather the uploaded file objects from a multipart form.

    The multi-file field ``photos`` is read first. When it yields nothing
    with a filename, the single-file field ``photo`` is used instead.
    Entries that are falsy or have an empty filename are dropped from the
    result.
    """
    candidates = request_files.getlist("photos")

    nothing_named = not candidates or not any(
        item.filename != "" for item in candidates
    )
    if nothing_named:
        fallback = request_files.get("photo")
        if fallback and fallback.filename:
            candidates = [fallback]

    return [item for item in candidates if item and item.filename]
|
|
|
|
|
|
def parse_image_urls(raw_text):
    """Split free-form text into a list of unique URL strings.

    The text is split on ``URL_SPLIT_RE`` (newlines, commas, semicolons and
    whitespace). Empty fragments are dropped and duplicates are removed,
    keeping the first occurrence of each URL in its original position.

    Args:
        raw_text: Text entered by the user; may be empty or ``None``.

    Returns:
        A list of URL strings, possibly empty. No URL validation is done here.
    """
    if not raw_text:
        return []

    fragments = (part.strip() for part in URL_SPLIT_RE.split(raw_text.strip()))
    # dict.fromkeys de-duplicates in linear time while preserving first-seen
    # order. A list membership test per item is quadratic, and this function
    # runs on raw user input before any bulk-size limit is applied.
    return list(dict.fromkeys(url for url in fragments if url))
|
|
|
|
|
|
def is_safe_image_url(url):
    """Return True when *url* is an http(s) URL whose host is not obviously internal.

    Rejected:
      * malformed URLs (instead of letting ``urlparse`` raise ``ValueError``);
      * schemes other than ``http`` / ``https`` and URLs without a host;
      * ``localhost``, ``0.0.0.0``, ``*.local`` and ``*.localhost`` names,
        with or without a trailing root dot;
      * IP literals that are private, loopback, link-local, reserved,
        multicast or unspecified, including legacy IPv4 spellings such as
        ``2130706433`` or ``127.1`` that the system resolver still accepts.

    Limitation: ordinary DNS names are not resolved here, so a public name
    that points at an internal address is not detected by this check.

    Args:
        url: URL string supplied by the user or taken from a redirect.

    Returns:
        ``True`` if the URL passes every check, ``False`` otherwise.
    """
    import socket  # local import: only this function needs it

    try:
        parsed = urlparse(url)
        hostname = parsed.hostname
    except ValueError:
        # e.g. "http://[::1" -> "Invalid IPv6 URL"
        return False

    if parsed.scheme not in ("http", "https") or not hostname:
        return False

    # "localhost." names the same host as "localhost" (explicit DNS root).
    host = hostname.lower().rstrip(".")
    if not host:
        return False
    if host in ("localhost", "0.0.0.0") or host.endswith((".local", ".localhost")):
        return False

    # Drop an IPv6 zone identifier ("fe80::1%eth0") before parsing, since not
    # every Python version accepts it in ipaddress.ip_address().
    literal = host.partition("%")[0]
    try:
        ip = ipaddress.ip_address(literal)
    except ValueError:
        # Not a canonical literal. inet_aton() understands the legacy IPv4
        # forms (single integer, hex/octal parts, fewer than three dots) that
        # would otherwise slip through as "host names".
        try:
            ip = ipaddress.IPv4Address(socket.inet_aton(literal))
        except (OSError, ValueError):
            # A regular DNS name.
            return True

    return not (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_reserved
        or ip.is_multicast
        or ip.is_unspecified
    )
|
|
|
|
|
|
def guess_extension(url, content_type):
    """Work out the image file extension for a downloaded resource.

    The Content-Type header wins: its parameters are stripped and the bare
    MIME type is looked up in ``MIME_TO_EXT``. When that gives nothing, the
    text after the last dot of the URL path is used if it is a known image
    extension (``jpeg`` is normalised to ``jpg``).

    Returns the extension without a dot, or ``None`` when neither source
    identifies an image.
    """
    mime = (content_type or "").split(";")[0].strip().lower()
    from_header = MIME_TO_EXT.get(mime)
    if from_header:
        return from_header

    _head, dot, suffix = urlparse(url).path.rpartition(".")
    if not dot:
        return None

    suffix = suffix.lower()
    if suffix == "jpeg":
        return "jpg"
    if suffix in ("png", "jpg", "gif", "webp", "bmp"):
        return suffix
    return None
|
|
|
|
|
|
def download_image(url, max_bytes, timeout=30, max_redirects=30):
    """Download an image over HTTP(S) with a size cap and validated redirects.

    Redirects are followed manually. Every URL, including each redirect
    target, is checked with ``is_safe_image_url`` *before* a request is sent
    to it, so a public URL cannot bounce the server onto an internal address.
    Each response is used as a context manager so the streamed connection is
    released on every path, including the error paths.

    Args:
        url: URL to fetch.
        max_bytes: Maximum number of body bytes accepted.
        timeout: Value passed to ``requests.get`` as ``timeout``.
        max_redirects: Maximum number of redirects to follow.

    Returns:
        A tuple ``(data, ext, content_type, filename)``: the body bytes, the
        extension from ``guess_extension``, the raw Content-Type header value
        and a filename derived from the final URL path (``image.<ext>`` when
        the path has no basename).

    Raises:
        ValueError: The URL or a redirect target is not allowed, the response
            is not recognised as an image, the body is empty, or the body
            exceeds ``max_bytes``.
        requests.RequestException: Network or HTTP error, or too many
            redirects.
    """
    from urllib.parse import urljoin  # local import: only needed for redirects

    current_url = url
    for _hop in range(max_redirects + 1):
        if not is_safe_image_url(current_url):
            raise ValueError("Недопустимый URL после редиректа")

        with requests.get(
            current_url,
            stream=True,
            timeout=timeout,
            headers={"User-Agent": "PhotoHost/2.0"},
            allow_redirects=False,
        ) as response:
            if response.is_redirect:
                # Location may be relative; resolve it against the URL that
                # produced the redirect and validate it on the next pass.
                current_url = urljoin(response.url, response.headers["Location"])
                continue

            response.raise_for_status()

            final_url = response.url
            content_type = response.headers.get("Content-Type", "")
            ext = guess_extension(final_url, content_type)
            if not ext:
                raise ValueError("URL не содержит изображение")

            chunks = []
            total = 0
            for chunk in response.iter_content(chunk_size=65536):
                if not chunk:
                    continue
                total += len(chunk)
                # Stop as soon as the cap is crossed instead of buffering the
                # whole body first.
                if total > max_bytes:
                    raise ValueError("Файл превышает лимит размера")
                chunks.append(chunk)

        if total == 0:
            raise ValueError("Пустой файл")

        data = b"".join(chunks)
        filename = os.path.basename(urlparse(final_url).path) or f"image.{ext}"
        return data, ext, content_type, filename

    raise requests.exceptions.TooManyRedirects(
        f"Exceeded {max_redirects} redirects"
    )
|
|
|
|
|
|
def save_downloaded_image(data, ext, content_type, original_name, user, folder):
    """Store downloaded image bytes and add a ``Photo`` row to the DB session.

    The stored filename is a random UUID hex plus ``ext``. The display name
    is ``original_name`` passed through ``secure_filename`` and forced to end
    with ``.<ext>``. The row is added to ``db.session`` but not committed;
    the caller commits.

    Args:
        data: Raw image bytes.
        ext: File extension without a dot.
        content_type: Content-Type header value from the download; may be
            empty, carry parameters, or name a non-image type when the
            extension was taken from the URL path.
        original_name: Filename derived from the source URL.
        user: Owner; only ``user.id`` is read.
        folder: Target folder or ``None``; only ``folder.id`` is read.

    Returns:
        A tuple ``(photo, sync_errors)`` where ``sync_errors`` is the value
        returned by ``save_photo_file``.
    """
    stored_name = f"{uuid.uuid4().hex}.{ext}"

    safe_original = secure_filename(original_name) or f"photo.{ext}"
    if not safe_original.lower().endswith(f".{ext}"):
        safe_original = f"{safe_original.rsplit('.', 1)[0]}.{ext}"

    # Record a clean image MIME type: drop header parameters such as
    # "; charset=..." and do not keep a non-image type (e.g.
    # "application/octet-stream" or "text/html") for a file that is stored
    # and labelled as an image.
    mime_type = (content_type or "").split(";")[0].strip().lower()
    if not mime_type.startswith("image/"):
        mime_type = f"image/{ext}"

    file_storage = FileStorage(
        stream=BytesIO(data),
        filename=safe_original,
        content_type=mime_type,
    )

    _path, file_size, storage_backend, sync_errors = save_photo_file(
        file_storage, stored_name
    )

    photo = Photo(
        filename=stored_name,
        original_name=safe_original,
        file_size=file_size,
        mime_type=mime_type,
        user_id=user.id,
        folder_id=folder.id if folder else None,
        storage_backend=storage_backend,
        created_at=datetime.now(timezone.utc),
    )
    db.session.add(photo)
    return photo, sync_errors
|
|
|
|
|
|
def process_url_uploads(raw_urls, user, folder, allowed_extensions, max_upload_mb):
    """Download images from user-supplied URLs and store them as photos.

    Steps: parse the URL list, enforce the bulk limit and the user's photo
    count limit, download every URL (collecting per-URL errors), check the
    combined size against the upload quota, then save each image and commit
    once at the end.

    Args:
        raw_urls: Free-form text containing the URLs.
        user: Owner of the new photos.
        folder: Target folder or ``None``.
        allowed_extensions: Container of permitted extensions (no dot).
        max_upload_mb: Per-file size limit in megabytes.

    Returns:
        A dict with keys ``"uploaded"`` (count), ``"errors"`` (list of
        messages) and ``"photos"`` (list of created ``Photo`` objects).
    """
    settings = get_settings()
    max_bulk = settings.max_bulk_upload or 100
    urls = parse_image_urls(raw_urls)

    if not urls:
        return {"uploaded": 0, "errors": ["Ссылки не указаны"], "photos": []}

    if len(urls) > max_bulk:
        return {
            "uploaded": 0,
            "errors": [f"Максимум {max_bulk} ссылок за раз"],
            "photos": [],
        }

    ok, photo_limit_msg = check_photo_count_limit(user, len(urls))
    if not ok:
        return {"uploaded": 0, "errors": [photo_limit_msg], "photos": []}

    max_bytes = max_upload_mb * 1024 * 1024
    errors = []
    downloads = []

    for url in urls:
        try:
            url_is_safe = is_safe_image_url(url)
        except ValueError:
            # URL parsing raises on malformed input such as "http://[::1".
            # Report it for this URL instead of aborting the whole batch.
            url_is_safe = False
        if not url_is_safe:
            errors.append(f"{url}: недопустимый URL")
            continue

        try:
            data, ext, content_type, filename = download_image(url, max_bytes)
        except requests.RequestException:
            errors.append(f"{url}: не удалось скачать")
            continue
        except ValueError as exc:
            errors.append(f"{url}: {exc}")
            continue

        if ext not in allowed_extensions:
            errors.append(f"{url}: недопустимый формат")
            continue
        downloads.append((url, data, ext, content_type, filename))

    if not downloads:
        return {"uploaded": 0, "errors": errors, "photos": []}

    # The quota is checked once against everything that was downloaded.
    total_size = sum(len(item[1]) for item in downloads)
    ok, quota_msg = check_upload_quota(user, total_size)
    if not ok:
        return {"uploaded": 0, "errors": [quota_msg], "photos": []}

    uploaded_photos = []
    for url, data, ext, content_type, filename in downloads:
        try:
            photo, sync_errors = save_downloaded_image(
                data, ext, content_type, filename, user, folder
            )
            for sync_err in sync_errors:
                errors.append(f"{filename}: {sync_err}")
            uploaded_photos.append(photo)
        except Exception as exc:
            # Best effort: one failed save must not discard the other images.
            errors.append(f"{url}: {exc}")

    if uploaded_photos:
        db.session.commit()

    return {
        "uploaded": len(uploaded_photos),
        "errors": errors,
        "photos": uploaded_photos,
    }
|
|
|
|
|
|
def process_uploads(request_files, user, folder, allowed_extensions):
    """Validate and store files uploaded through a multipart form.

    Steps: collect the files, enforce the bulk limit, filter by extension
    while measuring sizes, check the user's photo count limit and upload
    quota, then save each file and commit once at the end.

    Args:
        request_files: The request's file multi-dict (fields ``photos`` /
            ``photo``).
        user: Owner of the new photos.
        folder: Target folder or ``None``.
        allowed_extensions: Container of permitted extensions (no dot).

    Returns:
        A dict with keys ``"uploaded"`` (count), ``"errors"`` (list of
        messages) and ``"photos"`` (list of created ``Photo`` objects).
    """
    settings = get_settings()
    max_bulk = settings.max_bulk_upload or 100
    files = collect_upload_files(request_files)

    if not files:
        return {"uploaded": 0, "errors": ["Файлы не выбраны"], "photos": []}

    if len(files) > max_bulk:
        return {
            "uploaded": 0,
            "errors": [f"Максимум {max_bulk} файлов за раз"],
            "photos": [],
        }

    total_size = 0
    valid_files = []
    errors = []

    for file in files:
        if not allowed_file(file.filename, allowed_extensions):
            errors.append(f"{file.filename}: недопустимый формат")
            continue
        # Measure the upload by seeking to the end, then rewind so the
        # storage layer reads from the start.
        file.seek(0, os.SEEK_END)
        size = file.tell()
        file.seek(0)
        total_size += size
        valid_files.append((file, size))

    if not valid_files:
        return {"uploaded": 0, "errors": errors, "photos": []}

    ok, photo_limit_msg = check_photo_count_limit(user, len(valid_files))
    if not ok:
        return {"uploaded": 0, "errors": [photo_limit_msg], "photos": []}

    ok, quota_msg = check_upload_quota(user, total_size)
    if not ok:
        return {"uploaded": 0, "errors": [quota_msg], "photos": []}

    uploaded_photos = []
    for file, _size in valid_files:
        ext = file.filename.rsplit(".", 1)[1].lower()
        stored_name = f"{uuid.uuid4().hex}.{ext}"

        safe_original = secure_filename(file.filename) or f"photo.{ext}"
        # secure_filename drops non-ASCII characters, so a name whose stem is
        # entirely non-ASCII comes back as just the bare extension text.
        # Keep a usable display name with its extension, as the URL upload
        # path already does.
        if not safe_original.lower().endswith(f".{ext}"):
            safe_original = f"photo.{ext}"

        try:
            _path, file_size, storage_backend, sync_errors = save_photo_file(
                file, stored_name
            )
            for sync_err in sync_errors:
                errors.append(f"{safe_original}: {sync_err}")

            photo = Photo(
                filename=stored_name,
                original_name=safe_original,
                file_size=file_size,
                mime_type=file.content_type or f"image/{ext}",
                user_id=user.id,
                folder_id=folder.id if folder else None,
                storage_backend=storage_backend,
                created_at=datetime.now(timezone.utc),
            )
            db.session.add(photo)
            uploaded_photos.append(photo)
        except Exception as exc:
            # Best effort: one failed save must not discard the other files.
            errors.append(f"{safe_original}: {exc}")

    if uploaded_photos:
        db.session.commit()

    return {
        "uploaded": len(uploaded_photos),
        "errors": errors,
        "photos": uploaded_photos,
    }
|