Files

180 lines
7.4 KiB
Python

from flask import Blueprint, flash, redirect, render_template, request, url_for
from flask_login import current_user, login_user, logout_user
from app import db
from app.captcha_service import verify_captcha
from app.session_service import create_user_session, revoke_current_session
from app.email_service import send_password_reset_email, send_welcome_email
from app.folder_utils import process_pending_invites
from app.models import PasswordResetToken, User, UserGroup
from app.settings_service import get_auth_public_settings, get_settings
bp = Blueprint("auth", __name__, url_prefix="/auth")
def _registration_allowed():
return get_settings().registration_enabled
@bp.route("/register", methods=["GET", "POST"])
def register():
if current_user.is_authenticated:
return redirect(url_for("cabinet.index"))
if not _registration_allowed():
flash("Регистрация новых пользователей отключена", "error")
return redirect(url_for("auth.login"))
if request.method == "POST":
ok, captcha_msg = verify_captcha(request, "register")
if not ok:
flash(captcha_msg, "error")
return render_template("auth/register.html")
username = request.form.get("username", "").strip()
email = request.form.get("email", "").strip().lower()
password = request.form.get("password", "")
password2 = request.form.get("password2", "")
if len(username) < 3:
flash("Имя пользователя — минимум 3 символа", "error")
elif len(password) < 6:
flash("Пароль — минимум 6 символов", "error")
elif password != password2:
flash("Пароли не совпадают", "error")
elif User.query.filter_by(username=username).first():
flash("Это имя пользователя уже занято", "error")
elif User.query.filter_by(email=email).first():
flash("Этот email уже зарегистрирован", "error")
else:
default_group = UserGroup.query.filter_by(is_default=True).first()
user = User(
username=username,
email=email,
group_id=default_group.id if default_group else None,
)
user.set_password(password)
db.session.add(user)
db.session.commit()
login_user(user)
create_user_session(user)
accepted = process_pending_invites(user)
send_welcome_email(user)
flash("Регистрация успешна. Добро пожаловать!", "success")
if accepted:
flash(f"Вам открыт доступ к {accepted} общим папкам", "success")
return redirect(url_for("cabinet.index"))
return render_template("auth/register.html")
@bp.route("/login", methods=["GET", "POST"])
def login():
if current_user.is_authenticated:
return redirect(url_for("cabinet.index"))
auth = get_auth_public_settings()
if not auth["password_login_enabled"] and not auth["passkey_enabled"]:
flash("Вход временно недоступен", "error")
return render_template("auth/login.html")
if request.method == "POST":
if auth["password_login_enabled"]:
ok, captcha_msg = verify_captcha(request, "login")
if not ok:
flash(captcha_msg, "error")
return render_template("auth/login.html")
login = request.form.get("login", "").strip()
password = request.form.get("password", "")
remember = request.form.get("remember") == "on"
user = User.query.filter(
(User.username == login) | (User.email == login.lower())
).first()
if not auth["password_login_enabled"]:
flash("Вход по паролю отключён. Используйте Passkey.", "error")
elif user is None or not user.check_password(password):
flash("Неверный логин или пароль", "error")
elif not user.is_active:
flash("Аккаунт заблокирован", "error")
else:
login_user(user, remember=remember)
create_user_session(user, remember=remember)
accepted = process_pending_invites(user)
flash(f"Добро пожаловать, {user.username}!", "success")
if accepted:
flash(f"Вам открыт доступ к {accepted} общим папкам", "success")
next_page = request.args.get("next")
if next_page:
return redirect(next_page)
if user.is_admin:
return redirect(url_for("admin.dashboard"))
return redirect(url_for("cabinet.index"))
return render_template("auth/login.html")
@bp.route("/forgot-password", methods=["GET", "POST"])
def forgot_password():
if current_user.is_authenticated:
return redirect(url_for("cabinet.index"))
if request.method == "POST":
ok, captcha_msg = verify_captcha(request, "forgot_password")
if not ok:
flash(captcha_msg, "error")
return render_template("auth/forgot_password.html")
email = request.form.get("email", "").strip().lower()
user = User.query.filter_by(email=email).first()
if user:
token = PasswordResetToken.create_for_user(user)
db.session.add(token)
db.session.commit()
ok, msg = send_password_reset_email(user, token.token)
if not ok:
flash(f"Не удалось отправить email: {msg}", "error")
return redirect(url_for("auth.forgot_password"))
flash("Если email зарегистрирован, на него отправлена ссылка для сброса пароля", "success")
return redirect(url_for("auth.login"))
return render_template("auth/forgot_password.html")
@bp.route("/reset-password/<token>", methods=["GET", "POST"])
def reset_password(token):
if current_user.is_authenticated:
return redirect(url_for("cabinet.index"))
reset_token = PasswordResetToken.query.filter_by(token=token).first()
if reset_token is None or not reset_token.is_valid():
flash("Ссылка для сброса пароля недействительна или истекла", "error")
return redirect(url_for("auth.forgot_password"))
if request.method == "POST":
password = request.form.get("password", "")
password2 = request.form.get("password2", "")
if len(password) < 6:
flash("Пароль — минимум 6 символов", "error")
elif password != password2:
flash("Пароли не совпадают", "error")
else:
reset_token.user.set_password(password)
reset_token.used = True
db.session.commit()
flash("Пароль успешно изменён. Войдите в аккаунт.", "success")
return redirect(url_for("auth.login"))
return render_template("auth/reset_password.html", token=token)
@bp.route("/logout")
def logout():
revoke_current_session()
logout_user()
flash("Вы вышли из аккаунта", "success")
return redirect(url_for("main.index"))