Files
fotohost/app/passkey_service.py
T

175 lines
5.4 KiB
Python

import base64
import os
from datetime import datetime, timezone
import json
from flask import current_app, session
from webauthn import (
generate_authentication_options,
generate_registration_options,
verify_authentication_response,
verify_registration_response,
)
from webauthn.helpers.parse_authentication_credential_json import (
parse_authentication_credential_json,
)
from webauthn.helpers.parse_registration_credential_json import (
parse_registration_credential_json,
)
from webauthn.helpers.structs import (
AuthenticatorSelectionCriteria,
PublicKeyCredentialDescriptor,
ResidentKeyRequirement,
UserVerificationRequirement,
)
from app import db
from app.models import UserPasskey
from app.settings_service import get_settings
def is_passkey_enabled():
settings = get_settings()
return settings.passkey_enabled
def _rp_id():
settings = get_settings()
if settings.webauthn_rp_id:
return settings.webauthn_rp_id.strip()
return os.getenv("WEBAUTHN_RP_ID", "localhost")
def _rp_name():
settings = get_settings()
if settings.webauthn_rp_name:
return settings.webauthn_rp_name.strip()
return os.getenv("WEBAUTHN_RP_NAME", "PhotoHost")
def _origin():
settings = get_settings()
if settings.webauthn_origin:
return settings.webauthn_origin.strip()
return os.getenv("WEBAUTHN_ORIGIN", "http://localhost:8080")
def _b64url_encode(data):
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8")
def _b64url_decode(data):
padding = "=" * (-len(data) % 4)
return base64.urlsafe_b64decode(data + padding)
def registration_options(user):
existing = UserPasskey.query.filter_by(user_id=user.id).all()
exclude = [
PublicKeyCredentialDescriptor(id=_b64url_decode(item.credential_id))
for item in existing
]
options = generate_registration_options(
rp_id=_rp_id(),
rp_name=_rp_name(),
user_id=str(user.id).encode("utf-8"),
user_name=user.username,
user_display_name=user.username,
exclude_credentials=exclude,
authenticator_selection=AuthenticatorSelectionCriteria(
resident_key=ResidentKeyRequirement.PREFERRED,
user_verification=UserVerificationRequirement.PREFERRED,
),
)
session["passkey_reg_challenge"] = _b64url_encode(options.challenge)
return options
def verify_registration(user, credential_json, name):
challenge = session.pop("passkey_reg_challenge", None)
if not challenge:
raise ValueError("Сессия регистрации passkey истекла")
verification = verify_registration_response(
credential=parse_registration_credential_json(json.dumps(credential_json)),
expected_challenge=_b64url_decode(challenge),
expected_rp_id=_rp_id(),
expected_origin=_origin(),
)
passkey = UserPasskey(
user_id=user.id,
credential_id=credential_json.get("id") or _b64url_encode(verification.credential_id),
public_key=_b64url_encode(verification.credential_public_key),
sign_count=verification.sign_count,
name=(name or "Passkey").strip()[:120] or "Passkey",
)
db.session.add(passkey)
db.session.commit()
return passkey
def authentication_options(user):
passkeys = UserPasskey.query.filter_by(user_id=user.id).all()
if not passkeys:
raise ValueError("Passkey не настроен")
allow = [
PublicKeyCredentialDescriptor(id=_b64url_decode(item.credential_id))
for item in passkeys
]
options = generate_authentication_options(
rp_id=_rp_id(),
allow_credentials=allow,
user_verification=UserVerificationRequirement.PREFERRED,
)
session["passkey_auth_challenge"] = _b64url_encode(options.challenge)
session["passkey_auth_user_id"] = user.id
return options
def verify_authentication(credential_json):
challenge = session.pop("passkey_auth_challenge", None)
user_id = session.pop("passkey_auth_user_id", None)
if not challenge or not user_id:
raise ValueError("Сессия входа passkey истекла")
credential_id = credential_json.get("id") or credential_json.get("rawId")
if isinstance(credential_id, dict):
credential_id = credential_id.get("id")
if not credential_id:
raise ValueError("Некорректные данные passkey")
passkey = UserPasskey.query.filter_by(
user_id=user_id, credential_id=credential_id
).first()
if not passkey:
passkey = UserPasskey.query.filter_by(credential_id=credential_id).first()
if not passkey:
raise ValueError("Passkey не найден")
verification = verify_authentication_response(
credential=parse_authentication_credential_json(json.dumps(credential_json)),
expected_challenge=_b64url_decode(challenge),
expected_rp_id=_rp_id(),
expected_origin=_origin(),
credential_public_key=_b64url_decode(passkey.public_key),
credential_current_sign_count=passkey.sign_count,
)
passkey.sign_count = verification.new_sign_count
passkey.last_used_at = datetime.now(timezone.utc)
db.session.commit()
return passkey.user
def delete_passkey(user, passkey_id):
passkey = UserPasskey.query.filter_by(id=passkey_id, user_id=user.id).first()
if not passkey:
return False
db.session.delete(passkey)
db.session.commit()
return True