0584ebdc74
Co-authored-by: Cursor <cursoragent@cursor.com>
395 lines
15 KiB
Python
395 lines
15 KiB
Python
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from flask_login import UserMixin
|
|
from werkzeug.security import check_password_hash, generate_password_hash
|
|
|
|
from app import db
|
|
|
|
|
|
class SiteSettings(db.Model):
|
|
__tablename__ = "site_settings"
|
|
|
|
id = db.Column(db.Integer, primary_key=True, default=1)
|
|
max_bulk_upload = db.Column(db.Integer, nullable=False, default=100)
|
|
|
|
s3_enabled = db.Column(db.Boolean, nullable=False, default=False)
|
|
s3_endpoint = db.Column(db.String(255), nullable=True)
|
|
s3_bucket = db.Column(db.String(120), nullable=True)
|
|
s3_access_key = db.Column(db.String(120), nullable=True)
|
|
s3_secret_key = db.Column(db.String(255), nullable=True)
|
|
s3_region = db.Column(db.String(80), nullable=True, default="us-east-1")
|
|
s3_public_url = db.Column(db.String(255), nullable=True)
|
|
|
|
sftp_enabled = db.Column(db.Boolean, nullable=False, default=False)
|
|
sftp_host = db.Column(db.String(255), nullable=True)
|
|
sftp_port = db.Column(db.Integer, nullable=False, default=22)
|
|
sftp_username = db.Column(db.String(120), nullable=True)
|
|
sftp_password = db.Column(db.String(255), nullable=True)
|
|
sftp_remote_path = db.Column(db.String(255), nullable=True, default="/uploads")
|
|
|
|
ftp_enabled = db.Column(db.Boolean, nullable=False, default=False)
|
|
ftp_host = db.Column(db.String(255), nullable=True)
|
|
ftp_port = db.Column(db.Integer, nullable=False, default=21)
|
|
ftp_username = db.Column(db.String(120), nullable=True)
|
|
ftp_password = db.Column(db.String(255), nullable=True)
|
|
ftp_remote_path = db.Column(db.String(255), nullable=True, default="/uploads")
|
|
ftp_use_tls = db.Column(db.Boolean, nullable=False, default=False)
|
|
|
|
smtp_enabled = db.Column(db.Boolean, nullable=False, default=False)
|
|
smtp_host = db.Column(db.String(255), nullable=True)
|
|
smtp_port = db.Column(db.Integer, nullable=False, default=587)
|
|
smtp_username = db.Column(db.String(120), nullable=True)
|
|
smtp_password = db.Column(db.String(255), nullable=True)
|
|
smtp_from_email = db.Column(db.String(120), nullable=True)
|
|
smtp_from_name = db.Column(db.String(120), nullable=True, default="PhotoHost")
|
|
smtp_use_tls = db.Column(db.Boolean, nullable=False, default=True)
|
|
|
|
registration_enabled = db.Column(db.Boolean, nullable=False, default=True)
|
|
password_login_enabled = db.Column(db.Boolean, nullable=False, default=True)
|
|
passkey_enabled = db.Column(db.Boolean, nullable=False, default=True)
|
|
webauthn_rp_id = db.Column(db.String(255), nullable=True)
|
|
webauthn_rp_name = db.Column(db.String(120), nullable=True, default="PhotoHost")
|
|
webauthn_origin = db.Column(db.String(255), nullable=True)
|
|
|
|
captcha_provider = db.Column(db.String(20), nullable=False, default="none")
|
|
turnstile_site_key = db.Column(db.String(255), nullable=True)
|
|
turnstile_secret_key = db.Column(db.String(255), nullable=True)
|
|
recaptcha_v2_site_key = db.Column(db.String(255), nullable=True)
|
|
recaptcha_v2_secret_key = db.Column(db.String(255), nullable=True)
|
|
recaptcha_v3_site_key = db.Column(db.String(255), nullable=True)
|
|
recaptcha_v3_secret_key = db.Column(db.String(255), nullable=True)
|
|
recaptcha_v3_min_score = db.Column(db.Float, nullable=False, default=0.5)
|
|
captcha_on_login = db.Column(db.Boolean, nullable=False, default=False)
|
|
captcha_on_register = db.Column(db.Boolean, nullable=False, default=True)
|
|
captcha_on_forgot_password = db.Column(db.Boolean, nullable=False, default=False)
|
|
|
|
updated_at = db.Column(
|
|
db.DateTime,
|
|
nullable=False,
|
|
default=lambda: datetime.now(timezone.utc),
|
|
onupdate=lambda: datetime.now(timezone.utc),
|
|
)
|
|
|
|
|
|
class PasswordResetToken(db.Model):
|
|
__tablename__ = "password_reset_tokens"
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False, index=True)
|
|
token = db.Column(db.String(64), unique=True, nullable=False, index=True)
|
|
expires_at = db.Column(db.DateTime, nullable=False)
|
|
used = db.Column(db.Boolean, nullable=False, default=False)
|
|
created_at = db.Column(
|
|
db.DateTime,
|
|
nullable=False,
|
|
default=lambda: datetime.now(timezone.utc),
|
|
)
|
|
|
|
user = db.relationship("User", backref="reset_tokens")
|
|
|
|
@staticmethod
|
|
def create_for_user(user, hours=24):
|
|
token = PasswordResetToken(
|
|
user_id=user.id,
|
|
token=uuid.uuid4().hex,
|
|
expires_at=datetime.now(timezone.utc) + timedelta(hours=hours),
|
|
)
|
|
return token
|
|
|
|
def is_valid(self):
|
|
now = datetime.now(timezone.utc)
|
|
expires = self.expires_at
|
|
if expires.tzinfo is None:
|
|
expires = expires.replace(tzinfo=timezone.utc)
|
|
return not self.used and expires > now
|
|
|
|
|
|
class User(UserMixin, db.Model):
|
|
__tablename__ = "users"
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String(80), unique=True, nullable=False, index=True)
|
|
email = db.Column(db.String(120), unique=True, nullable=False, index=True)
|
|
password_hash = db.Column(db.String(256), nullable=False)
|
|
is_admin = db.Column(db.Boolean, nullable=False, default=False)
|
|
is_active = db.Column(db.Boolean, nullable=False, default=True)
|
|
group_id = db.Column(db.Integer, db.ForeignKey("user_groups.id"), nullable=True, index=True)
|
|
gdpr_accepted_at = db.Column(db.DateTime, nullable=True)
|
|
cookie_analytics = db.Column(db.Boolean, nullable=False, default=False)
|
|
created_at = db.Column(
|
|
db.DateTime,
|
|
nullable=False,
|
|
default=lambda: datetime.now(timezone.utc),
|
|
)
|
|
|
|
photos = db.relationship("Photo", backref="owner", lazy="dynamic")
|
|
folders = db.relationship("Folder", backref="owner", lazy="dynamic")
|
|
group = db.relationship("UserGroup", backref="users")
|
|
|
|
def set_password(self, password):
|
|
self.password_hash = generate_password_hash(password)
|
|
|
|
def check_password(self, password):
|
|
return check_password_hash(self.password_hash, password)
|
|
|
|
@property
|
|
def photo_count(self):
|
|
return self.photos.count()
|
|
|
|
@property
|
|
def total_size(self):
|
|
from sqlalchemy import func
|
|
|
|
result = db.session.query(func.coalesce(func.sum(Photo.file_size), 0)).filter(
|
|
Photo.user_id == self.id
|
|
).scalar()
|
|
return int(result or 0)
|
|
|
|
|
|
class UserSession(db.Model):
|
|
__tablename__ = "user_sessions"
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False, index=True)
|
|
session_key = db.Column(db.String(64), unique=True, nullable=False, index=True)
|
|
ip_address = db.Column(db.String(45), nullable=True)
|
|
user_agent = db.Column(db.String(512), nullable=True)
|
|
created_at = db.Column(
|
|
db.DateTime,
|
|
nullable=False,
|
|
default=lambda: datetime.now(timezone.utc),
|
|
)
|
|
last_seen_at = db.Column(
|
|
db.DateTime,
|
|
nullable=False,
|
|
default=lambda: datetime.now(timezone.utc),
|
|
)
|
|
revoked = db.Column(db.Boolean, nullable=False, default=False)
|
|
|
|
user = db.relationship("User", backref=db.backref("sessions", lazy="dynamic"))
|
|
|
|
@property
|
|
def device_label(self):
|
|
if not self.user_agent:
|
|
return "Неизвестное устройство"
|
|
ua = self.user_agent.lower()
|
|
if "mobile" in ua or "android" in ua or "iphone" in ua:
|
|
return "Мобильное устройство"
|
|
if "windows" in ua:
|
|
return "Windows"
|
|
if "mac" in ua:
|
|
return "macOS"
|
|
if "linux" in ua:
|
|
return "Linux"
|
|
return "Браузер"
|
|
|
|
|
|
class UserPasskey(db.Model):
|
|
__tablename__ = "user_passkeys"
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False, index=True)
|
|
credential_id = db.Column(db.String(512), unique=True, nullable=False, index=True)
|
|
public_key = db.Column(db.Text, nullable=False)
|
|
sign_count = db.Column(db.Integer, nullable=False, default=0)
|
|
name = db.Column(db.String(120), nullable=False, default="Passkey")
|
|
created_at = db.Column(
|
|
db.DateTime,
|
|
nullable=False,
|
|
default=lambda: datetime.now(timezone.utc),
|
|
)
|
|
last_used_at = db.Column(db.DateTime, nullable=True)
|
|
|
|
user = db.relationship("User", backref=db.backref("passkeys", lazy="dynamic"))
|
|
|
|
|
|
class UserGroup(db.Model):
|
|
__tablename__ = "user_groups"
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
name = db.Column(db.String(80), unique=True, nullable=False)
|
|
slug = db.Column(db.String(80), unique=True, nullable=False, index=True)
|
|
disk_quota_mb = db.Column(db.Integer, nullable=False, default=100)
|
|
max_folders = db.Column(db.Integer, nullable=False, default=10)
|
|
max_photos = db.Column(db.Integer, nullable=False, default=500)
|
|
is_default = db.Column(db.Boolean, nullable=False, default=False)
|
|
created_at = db.Column(
|
|
db.DateTime,
|
|
nullable=False,
|
|
default=lambda: datetime.now(timezone.utc),
|
|
)
|
|
|
|
@property
|
|
def user_count(self):
|
|
return len(self.users)
|
|
|
|
@property
|
|
def quota_label(self):
|
|
if self.disk_quota_mb == 0:
|
|
return "Без лимита"
|
|
return f"{self.disk_quota_mb} МБ"
|
|
|
|
@property
|
|
def folders_limit_label(self):
|
|
if self.max_folders == 0:
|
|
return "Без лимита"
|
|
return str(self.max_folders)
|
|
|
|
@property
|
|
def photos_limit_label(self):
|
|
if self.max_photos == 0:
|
|
return "Без лимита"
|
|
return str(self.max_photos)
|
|
|
|
|
|
class AdBanner(db.Model):
|
|
__tablename__ = "ad_banners"
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
title = db.Column(db.String(120), nullable=False)
|
|
image_url = db.Column(db.String(500), nullable=False)
|
|
link_url = db.Column(db.String(500), nullable=True)
|
|
alt_text = db.Column(db.String(200), nullable=True)
|
|
position = db.Column(db.String(30), nullable=False, default="main", index=True)
|
|
is_active = db.Column(db.Boolean, nullable=False, default=True)
|
|
sort_order = db.Column(db.Integer, nullable=False, default=0)
|
|
created_at = db.Column(
|
|
db.DateTime,
|
|
nullable=False,
|
|
default=lambda: datetime.now(timezone.utc),
|
|
)
|
|
|
|
POSITIONS = {
|
|
"main": "Главная (под hero)",
|
|
"cabinet": "Личный кабинет",
|
|
"sidebar": "Боковая колонка",
|
|
"footer": "Подвал",
|
|
}
|
|
|
|
@property
|
|
def position_label(self):
|
|
return self.POSITIONS.get(self.position, self.position)
|
|
|
|
|
|
class Folder(db.Model):
|
|
__tablename__ = "folders"
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
name = db.Column(db.String(120), nullable=False)
|
|
owner_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False, index=True)
|
|
share_token = db.Column(db.String(64), unique=True, nullable=False, index=True)
|
|
is_private = db.Column(db.Boolean, nullable=False, default=True)
|
|
password_hash = db.Column(db.String(256), nullable=True)
|
|
created_at = db.Column(
|
|
db.DateTime,
|
|
nullable=False,
|
|
default=lambda: datetime.now(timezone.utc),
|
|
)
|
|
|
|
photos = db.relationship("Photo", backref="folder", lazy="dynamic")
|
|
members = db.relationship("FolderMember", backref="folder", lazy="dynamic", cascade="all, delete-orphan")
|
|
invites = db.relationship("FolderInvite", backref="folder", lazy="dynamic", cascade="all, delete-orphan")
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__(**kwargs)
|
|
if not self.share_token:
|
|
self.share_token = uuid.uuid4().hex
|
|
|
|
def set_access_password(self, password):
|
|
if password:
|
|
self.password_hash = generate_password_hash(password)
|
|
else:
|
|
self.password_hash = None
|
|
|
|
def check_access_password(self, password):
|
|
if not self.password_hash:
|
|
return True
|
|
return check_password_hash(self.password_hash, password)
|
|
|
|
@property
|
|
def has_password(self):
|
|
return bool(self.password_hash)
|
|
|
|
@property
|
|
def photo_count(self):
|
|
return self.photos.count()
|
|
|
|
def regenerate_share_token(self):
|
|
self.share_token = uuid.uuid4().hex
|
|
|
|
|
|
class FolderMember(db.Model):
|
|
__tablename__ = "folder_members"
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
folder_id = db.Column(db.Integer, db.ForeignKey("folders.id"), nullable=False, index=True)
|
|
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False, index=True)
|
|
role = db.Column(db.String(20), nullable=False, default="viewer")
|
|
added_at = db.Column(
|
|
db.DateTime,
|
|
nullable=False,
|
|
default=lambda: datetime.now(timezone.utc),
|
|
)
|
|
added_by_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True)
|
|
|
|
user = db.relationship("User", foreign_keys=[user_id])
|
|
added_by = db.relationship("User", foreign_keys=[added_by_id])
|
|
|
|
__table_args__ = (db.UniqueConstraint("folder_id", "user_id", name="uq_folder_member"),)
|
|
|
|
|
|
class FolderInvite(db.Model):
|
|
__tablename__ = "folder_invites"
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
folder_id = db.Column(db.Integer, db.ForeignKey("folders.id"), nullable=False, index=True)
|
|
email = db.Column(db.String(120), nullable=False, index=True)
|
|
role = db.Column(db.String(20), nullable=False, default="viewer")
|
|
invited_by_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
|
|
created_at = db.Column(
|
|
db.DateTime,
|
|
nullable=False,
|
|
default=lambda: datetime.now(timezone.utc),
|
|
)
|
|
|
|
invited_by = db.relationship("User", foreign_keys=[invited_by_id])
|
|
|
|
__table_args__ = (db.UniqueConstraint("folder_id", "email", name="uq_folder_invite"),)
|
|
|
|
|
|
class Photo(db.Model):
|
|
__tablename__ = "photos"
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
filename = db.Column(db.String(255), nullable=False)
|
|
original_name = db.Column(db.String(255), nullable=False)
|
|
file_size = db.Column(db.Integer, nullable=False, default=0)
|
|
mime_type = db.Column(db.String(100), nullable=False, default="image/jpeg")
|
|
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True, index=True)
|
|
folder_id = db.Column(db.Integer, db.ForeignKey("folders.id"), nullable=True, index=True)
|
|
storage_backend = db.Column(db.String(20), nullable=False, default="local")
|
|
created_at = db.Column(
|
|
db.DateTime,
|
|
nullable=False,
|
|
default=lambda: datetime.now(timezone.utc),
|
|
)
|
|
|
|
@property
|
|
def url(self):
|
|
from app.settings_service import get_settings
|
|
|
|
settings = get_settings()
|
|
if self.storage_backend == "s3" and settings.s3_public_url:
|
|
return f"{settings.s3_public_url.rstrip('/')}/{self.filename}"
|
|
return f"/uploads/{self.filename}"
|
|
|
|
@property
|
|
def size_human(self):
|
|
size = self.file_size
|
|
for unit in ("Б", "КБ", "МБ", "ГБ"):
|
|
if size < 1024:
|
|
return f"{size:.0f} {unit}" if unit == "Б" else f"{size:.1f} {unit}"
|
|
size /= 1024
|
|
return f"{size:.1f} ТБ"
|