0a51001791
Co-authored-by: Cursor <cursoragent@cursor.com>
111 lines
2.7 KiB
Python
111 lines
2.7 KiB
Python
import secrets
|
|
from datetime import datetime, timezone
|
|
|
|
from flask import request, session
|
|
|
|
from app import db
|
|
from app.models import UserSession
|
|
|
|
|
|
def get_current_session_key():
    """Return the value stored under ``"sid"`` in the Flask session.

    Returns:
        The stored session key, or ``None`` when no ``"sid"`` entry exists.
    """
    sid = session.get("sid")
    return sid
|
|
|
|
|
|
def create_user_session(user, remember=False):
    """Persist a new ``UserSession`` row for *user* and bind it to the Flask session.

    A random key from ``secrets.token_hex(32)`` is stored on the record along
    with the client IP and user agent of the current request. The key is
    written to ``session["sid"]`` only after the database commit.

    Args:
        user: Object whose ``id`` attribute is stored as ``user_id``.
        remember: Truthy to mark the Flask session as permanent.

    Returns:
        The newly created ``UserSession`` instance.
    """
    new_key = secrets.token_hex(32)

    # Collect constructor arguments first; dict order keeps the same
    # evaluation order as passing them inline.
    fields = {
        "user_id": user.id,
        "session_key": new_key,
        "ip_address": _client_ip(),
        "user_agent": _client_user_agent(),
    }
    entry = UserSession(**fields)

    db.session.add(entry)
    db.session.commit()

    session["sid"] = new_key
    session.permanent = bool(remember)
    return entry
|
|
|
|
|
|
def ensure_user_session(user):
    """Return a non-revoked session record for *user*, creating one if needed.

    The key held in the Flask session is looked up for this user among
    non-revoked ``UserSession`` rows. When there is no key, or no matching
    row, a new session is created via ``create_user_session(user)``.

    Args:
        user: Object whose ``id`` attribute identifies the session owner.

    Returns:
        The existing or newly created ``UserSession`` instance.
    """
    current_key = get_current_session_key()

    existing = None
    if current_key:
        existing = UserSession.query.filter_by(
            session_key=current_key, user_id=user.id, revoked=False
        ).first()

    if existing:
        return existing
    # Either no key in the cookie session or no matching active row.
    return create_user_session(user)
|
|
|
|
|
|
def validate_user_session(user_id):
    """Report whether the current session key maps to an active row for *user_id*.

    Args:
        user_id: Identifier matched against ``UserSession.user_id``.

    Returns:
        ``True`` when a non-revoked ``UserSession`` with the current key and
        this ``user_id`` exists; ``False`` otherwise, including when the
        Flask session holds no key.
    """
    current_key = get_current_session_key()
    if current_key:
        match = UserSession.query.filter_by(
            session_key=current_key, user_id=user_id, revoked=False
        ).first()
        return match is not None
    return False
|
|
|
|
|
|
def touch_user_session():
    """Set ``last_seen_at`` on the current non-revoked session to the current UTC time.

    Does nothing when the Flask session holds no key or no non-revoked
    ``UserSession`` row matches it. Commits only when a row was updated.
    """
    current_key = get_current_session_key()
    if current_key:
        active = UserSession.query.filter_by(
            session_key=current_key, revoked=False
        ).first()
        if active:
            active.last_seen_at = datetime.now(timezone.utc)
            db.session.commit()
|
|
|
|
|
|
def list_user_sessions(user_id):
    """Return all non-revoked sessions of *user_id*, ordered by ``last_seen_at`` descending.

    Args:
        user_id: Identifier matched against ``UserSession.user_id``.

    Returns:
        A list of ``UserSession`` instances.
    """
    active = UserSession.query.filter_by(user_id=user_id, revoked=False)
    by_last_seen = active.order_by(UserSession.last_seen_at.desc())
    return by_last_seen.all()
|
|
|
|
|
|
def revoke_session(session_id, user_id):
    """Mark one session of *user_id* as revoked.

    The lookup is scoped to both ``id`` and ``user_id``, so a row owned by
    a different user is never matched.

    Args:
        session_id: Primary-key value of the ``UserSession`` row.
        user_id: Identifier the row must belong to.

    Returns:
        ``True`` when a matching row was found and flagged (and committed),
        ``False`` when no such row exists.
    """
    target = UserSession.query.filter_by(id=session_id, user_id=user_id).first()
    if target:
        target.revoked = True
        db.session.commit()
        return True
    return False
|
|
|
|
|
|
def revoke_all_sessions(user_id, except_current=True):
    """Revoke every non-revoked session of *user_id* in one bulk update.

    Args:
        user_id: Identifier matched against ``UserSession.user_id``.
        except_current: When truthy, the session whose key is in the Flask
            session is left untouched (if such a key is present).

    Returns:
        The value returned by the query's ``update`` call (the count of
        affected rows).
    """
    keep_key = None
    if except_current:
        keep_key = get_current_session_key()

    pending = UserSession.query.filter_by(user_id=user_id, revoked=False)
    if keep_key:
        # Exclude the caller's own session from the bulk revoke.
        pending = pending.filter(UserSession.session_key != keep_key)

    affected = pending.update({"revoked": True})
    db.session.commit()
    return affected
|
|
|
|
|
|
def revoke_current_session():
    """Revoke the session identified by the Flask session key and forget the key.

    Does nothing when the Flask session holds no key. Otherwise every
    ``UserSession`` row with that key is flagged revoked, the change is
    committed, and ``"sid"`` is removed from the Flask session.
    """
    current_key = get_current_session_key()
    if current_key:
        matching = UserSession.query.filter_by(session_key=current_key)
        matching.update({"revoked": True})
        db.session.commit()
        session.pop("sid", None)
|
|
|
|
|
|
def _client_ip():
    """Return the client address for the current request.

    The first entry of the ``X-Forwarded-For`` header is used when it is
    non-empty after stripping whitespace; otherwise ``request.remote_addr``
    is returned. A header that is present but whose first entry is blank
    (for example ``" "`` or ``", 10.0.0.1"``) therefore falls back to
    ``remote_addr`` instead of yielding an empty string.

    Returns:
        The address as a string, or whatever ``request.remote_addr`` holds.
    """
    # NOTE(review): X-Forwarded-For is supplied by the client unless a trusted
    # reverse proxy overwrites it, so this value can be spoofed. Confirm the
    # deployment sits behind such a proxy (or use Werkzeug's ProxyFix) before
    # relying on it for anything security-relevant.
    forwarded = request.headers.get("X-Forwarded-For", "")
    first_hop = forwarded.split(",", 1)[0].strip()
    if first_hop:
        return first_hop
    return request.remote_addr
|
|
|
|
|
|
def _client_user_agent():
    """Return the request's user-agent string, truncated to 512 characters.

    Returns:
        The first 512 characters of ``request.user_agent.string``, or
        ``None`` when the user agent or its string is falsy.
    """
    agent = request.user_agent
    if not agent or not agent.string:
        return None
    return agent.string[:512]
|