# Revision 0584ebdc74
# Co-authored-by: Cursor <cursoragent@cursor.com>
# 175 lines, 5.4 KiB, Python
import base64
|
|
import os
|
|
from datetime import datetime, timezone
|
|
|
|
import json
|
|
|
|
from flask import current_app, session
|
|
from webauthn import (
|
|
generate_authentication_options,
|
|
generate_registration_options,
|
|
verify_authentication_response,
|
|
verify_registration_response,
|
|
)
|
|
from webauthn.helpers.parse_authentication_credential_json import (
|
|
parse_authentication_credential_json,
|
|
)
|
|
from webauthn.helpers.parse_registration_credential_json import (
|
|
parse_registration_credential_json,
|
|
)
|
|
from webauthn.helpers.structs import (
|
|
AuthenticatorSelectionCriteria,
|
|
PublicKeyCredentialDescriptor,
|
|
ResidentKeyRequirement,
|
|
UserVerificationRequirement,
|
|
)
|
|
|
|
from app import db
|
|
from app.models import UserPasskey
|
|
from app.settings_service import get_settings
|
|
|
|
|
|
def is_passkey_enabled():
    """Report whether passkey support is switched on.

    Returns the ``passkey_enabled`` attribute of the object produced by
    ``get_settings()`` without any conversion.
    """
    return get_settings().passkey_enabled
|
|
|
|
|
|
def _rp_id():
    """Return the WebAuthn relying-party ID to use for ceremonies.

    Resolution order:
      1. ``webauthn_rp_id`` from the application settings (whitespace-trimmed);
      2. the ``WEBAUTHN_RP_ID`` environment variable;
      3. the literal ``"localhost"``.
    """
    configured = (get_settings().webauthn_rp_id or "").strip()
    if configured:
        return configured
    # A blank or whitespace-only setting is treated as "not configured" so we
    # never hand an empty RP ID to the WebAuthn library.
    return os.getenv("WEBAUTHN_RP_ID", "localhost")
|
|
|
|
|
|
def _rp_name():
    """Return the human-readable relying-party name.

    Resolution order:
      1. ``webauthn_rp_name`` from the application settings (whitespace-trimmed);
      2. the ``WEBAUTHN_RP_NAME`` environment variable;
      3. the literal ``"PhotoHost"``.
    """
    configured = (get_settings().webauthn_rp_name or "").strip()
    if configured:
        return configured
    # A blank or whitespace-only setting is treated as "not configured" so the
    # relying-party name is never an empty string.
    return os.getenv("WEBAUTHN_RP_NAME", "PhotoHost")
|
|
|
|
|
|
def _origin():
    """Return the origin expected in WebAuthn client data.

    Resolution order:
      1. ``webauthn_origin`` from the application settings (whitespace-trimmed);
      2. the ``WEBAUTHN_ORIGIN`` environment variable;
      3. the literal ``"http://localhost:8080"``.
    """
    configured = (get_settings().webauthn_origin or "").strip()
    if configured:
        return configured
    # A blank or whitespace-only setting is treated as "not configured": an
    # empty expected origin could never match a real client origin.
    return os.getenv("WEBAUTHN_ORIGIN", "http://localhost:8080")
|
|
|
|
|
|
def _b64url_encode(data):
|
|
return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8")
|
|
|
|
|
|
def _b64url_decode(data):
|
|
padding = "=" * (-len(data) % 4)
|
|
return base64.urlsafe_b64decode(data + padding)
|
|
|
|
|
|
def registration_options(user):
    """Build passkey registration options for ``user``.

    Every passkey already stored for the user is passed as an excluded
    credential. The generated challenge is saved (base64url-encoded) in the
    Flask session under ``"passkey_reg_challenge"`` so that
    ``verify_registration`` can read it back.

    Returns the options object produced by ``generate_registration_options``.
    """
    exclude_descriptors = []
    for stored in UserPasskey.query.filter_by(user_id=user.id).all():
        raw_id = _b64url_decode(stored.credential_id)
        exclude_descriptors.append(PublicKeyCredentialDescriptor(id=raw_id))

    selection = AuthenticatorSelectionCriteria(
        resident_key=ResidentKeyRequirement.PREFERRED,
        user_verification=UserVerificationRequirement.PREFERRED,
    )
    # The username doubles as the display name.
    options = generate_registration_options(
        rp_id=_rp_id(),
        rp_name=_rp_name(),
        user_id=str(user.id).encode("utf-8"),
        user_name=user.username,
        user_display_name=user.username,
        exclude_credentials=exclude_descriptors,
        authenticator_selection=selection,
    )

    session["passkey_reg_challenge"] = _b64url_encode(options.challenge)
    return options
|
|
|
|
|
|
def verify_registration(user, credential_json, name):
    """Verify a passkey registration response and persist the new passkey.

    Args:
        user: The account the passkey is registered for; its ``id`` is stored
            on the new row.
        credential_json: The registration credential sent by the client. It is
            serialized with ``json.dumps`` before parsing, so it must be
            JSON-serializable.
        name: Optional label for the passkey. It is stripped, truncated to 120
            characters, and replaced by ``"Passkey"`` when empty.

    Returns:
        The committed ``UserPasskey`` instance.

    Raises:
        ValueError: If no registration challenge is present in the session.
        Any exception raised by credential parsing or
        ``verify_registration_response`` propagates unchanged.
    """
    # pop() makes the challenge single-use: a second attempt needs new options.
    challenge = session.pop("passkey_reg_challenge", None)
    if not challenge:
        raise ValueError("Сессия регистрации passkey истекла")

    verification = verify_registration_response(
        credential=parse_registration_credential_json(json.dumps(credential_json)),
        expected_challenge=_b64url_decode(challenge),
        expected_rp_id=_rp_id(),
        expected_origin=_origin(),
    )

    passkey = UserPasskey(
        user_id=user.id,
        # Store the credential ID reported by the verification result, never
        # the client-supplied "id" field: the latter is caller-controlled
        # input, and only the verified value is authoritative.
        credential_id=_b64url_encode(verification.credential_id),
        public_key=_b64url_encode(verification.credential_public_key),
        sign_count=verification.sign_count,
        name=(name or "Passkey").strip()[:120] or "Passkey",
    )
    db.session.add(passkey)
    db.session.commit()
    return passkey
|
|
|
|
|
|
def authentication_options(user):
    """Build passkey authentication options for ``user``.

    The user's stored passkeys become the allowed credentials. The generated
    challenge (base64url-encoded) and the user's id are saved in the Flask
    session under ``"passkey_auth_challenge"`` and ``"passkey_auth_user_id"``.

    Returns the options object produced by ``generate_authentication_options``.

    Raises:
        ValueError: If the user has no stored passkeys.
    """
    stored_passkeys = UserPasskey.query.filter_by(user_id=user.id).all()
    if not stored_passkeys:
        raise ValueError("Passkey не настроен")

    allowed = []
    for stored in stored_passkeys:
        raw_id = _b64url_decode(stored.credential_id)
        allowed.append(PublicKeyCredentialDescriptor(id=raw_id))

    options = generate_authentication_options(
        rp_id=_rp_id(),
        allow_credentials=allowed,
        user_verification=UserVerificationRequirement.PREFERRED,
    )

    session["passkey_auth_challenge"] = _b64url_encode(options.challenge)
    session["passkey_auth_user_id"] = user.id
    return options
|
|
|
|
|
|
def verify_authentication(credential_json):
    """Verify a passkey authentication response and return the matching user.

    Args:
        credential_json: The authentication credential sent by the client,
            as a dict. It is serialized with ``json.dumps`` before parsing.

    Returns:
        ``passkey.user`` for the passkey that produced the assertion.

    Raises:
        ValueError: If the session holds no pending challenge or user id, if
            the payload carries no credential id, or if the credential does
            not belong to the user the challenge was issued for.
        Any exception raised by credential parsing or
        ``verify_authentication_response`` propagates unchanged.
    """
    # Both values are popped so a challenge can be answered at most once.
    challenge = session.pop("passkey_auth_challenge", None)
    user_id = session.pop("passkey_auth_user_id", None)
    if not challenge or not user_id:
        raise ValueError("Сессия входа passkey истекла")

    credential_id = credential_json.get("id") or credential_json.get("rawId")
    if isinstance(credential_id, dict):
        # Tolerate a nested {"id": ...} object in place of a plain string.
        credential_id = credential_id.get("id")
    if not credential_id:
        raise ValueError("Некорректные данные passkey")

    # The lookup is deliberately scoped to the user stored in the session.
    # The challenge was issued with that user's credentials as the allow-list,
    # so a credential owned by any other account must be rejected rather than
    # silently switching the login to that account.
    passkey = UserPasskey.query.filter_by(
        user_id=user_id, credential_id=credential_id
    ).first()
    if not passkey:
        raise ValueError("Passkey не найден")

    verification = verify_authentication_response(
        credential=parse_authentication_credential_json(json.dumps(credential_json)),
        expected_challenge=_b64url_decode(challenge),
        expected_rp_id=_rp_id(),
        expected_origin=_origin(),
        credential_public_key=_b64url_decode(passkey.public_key),
        credential_current_sign_count=passkey.sign_count,
    )

    # Persist the new signature counter and the time of this successful use.
    passkey.sign_count = verification.new_sign_count
    passkey.last_used_at = datetime.now(timezone.utc)
    db.session.commit()
    return passkey.user
|
|
|
|
|
|
def delete_passkey(user, passkey_id):
    """Delete one of ``user``'s passkeys by its primary key.

    The lookup filters on both ``passkey_id`` and ``user.id``, so a passkey
    owned by another user is never matched.

    Returns:
        ``True`` if a passkey was found and deleted, ``False`` otherwise.
    """
    target = UserPasskey.query.filter_by(id=passkey_id, user_id=user.id).first()
    if target:
        db.session.delete(target)
        db.session.commit()
        return True
    return False
|