import base64 import os from datetime import datetime, timezone import json from flask import current_app, session from webauthn import ( generate_authentication_options, generate_registration_options, verify_authentication_response, verify_registration_response, ) from webauthn.helpers.parse_authentication_credential_json import ( parse_authentication_credential_json, ) from webauthn.helpers.parse_registration_credential_json import ( parse_registration_credential_json, ) from webauthn.helpers.structs import ( AuthenticatorSelectionCriteria, PublicKeyCredentialDescriptor, ResidentKeyRequirement, UserVerificationRequirement, ) from app import db from app.models import UserPasskey def _rp_id(): return os.getenv("WEBAUTHN_RP_ID", "localhost") def _rp_name(): return os.getenv("WEBAUTHN_RP_NAME", "PhotoHost") def _origin(): return os.getenv("WEBAUTHN_ORIGIN", "http://localhost:8080") def _b64url_encode(data): return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8") def _b64url_decode(data): padding = "=" * (-len(data) % 4) return base64.urlsafe_b64decode(data + padding) def registration_options(user): existing = UserPasskey.query.filter_by(user_id=user.id).all() exclude = [ PublicKeyCredentialDescriptor(id=_b64url_decode(item.credential_id)) for item in existing ] options = generate_registration_options( rp_id=_rp_id(), rp_name=_rp_name(), user_id=str(user.id).encode("utf-8"), user_name=user.username, user_display_name=user.username, exclude_credentials=exclude, authenticator_selection=AuthenticatorSelectionCriteria( resident_key=ResidentKeyRequirement.PREFERRED, user_verification=UserVerificationRequirement.PREFERRED, ), ) session["passkey_reg_challenge"] = _b64url_encode(options.challenge) return options def verify_registration(user, credential_json, name): challenge = session.pop("passkey_reg_challenge", None) if not challenge: raise ValueError("Сессия регистрации passkey истекла") verification = verify_registration_response( credential=parse_registration_credential_json(json.dumps(credential_json)), expected_challenge=_b64url_decode(challenge), expected_rp_id=_rp_id(), expected_origin=_origin(), ) passkey = UserPasskey( user_id=user.id, credential_id=credential_json.get("id") or _b64url_encode(verification.credential_id), public_key=_b64url_encode(verification.credential_public_key), sign_count=verification.sign_count, name=(name or "Passkey").strip()[:120] or "Passkey", ) db.session.add(passkey) db.session.commit() return passkey def authentication_options(user): passkeys = UserPasskey.query.filter_by(user_id=user.id).all() if not passkeys: raise ValueError("Passkey не настроен") allow = [ PublicKeyCredentialDescriptor(id=_b64url_decode(item.credential_id)) for item in passkeys ] options = generate_authentication_options( rp_id=_rp_id(), allow_credentials=allow, user_verification=UserVerificationRequirement.PREFERRED, ) session["passkey_auth_challenge"] = _b64url_encode(options.challenge) session["passkey_auth_user_id"] = user.id return options def verify_authentication(credential_json): challenge = session.pop("passkey_auth_challenge", None) user_id = session.pop("passkey_auth_user_id", None) if not challenge or not user_id: raise ValueError("Сессия входа passkey истекла") credential_id = credential_json.get("id") or credential_json.get("rawId") if isinstance(credential_id, dict): credential_id = credential_id.get("id") if not credential_id: raise ValueError("Некорректные данные passkey") passkey = UserPasskey.query.filter_by( user_id=user_id, credential_id=credential_id ).first() if not passkey: passkey = UserPasskey.query.filter_by(credential_id=credential_id).first() if not passkey: raise ValueError("Passkey не найден") verification = verify_authentication_response( credential=parse_authentication_credential_json(json.dumps(credential_json)), expected_challenge=_b64url_decode(challenge), expected_rp_id=_rp_id(), expected_origin=_origin(), credential_public_key=_b64url_decode(passkey.public_key), credential_current_sign_count=passkey.sign_count, ) passkey.sign_count = verification.new_sign_count passkey.last_used_at = datetime.now(timezone.utc) db.session.commit() return passkey.user def delete_passkey(user, passkey_id): passkey = UserPasskey.query.filter_by(id=passkey_id, user_id=user.id).first() if not passkey: return False db.session.delete(passkey) db.session.commit() return True