"""Helpers for tracking login sessions with ``UserSession`` rows.

Each tracked session is identified by a random key that is stored both in
the ``UserSession`` table and in the Flask ``session`` under ``"sid"``.
"""

import secrets
from datetime import datetime, timezone

from flask import request, session

from app import db
from app.models import UserSession


def get_current_session_key():
    """Return the key stored under ``"sid"`` in the Flask session, or None."""
    return session.get("sid")


def create_user_session(user, remember=False):
    """Create, persist and activate a new ``UserSession`` for *user*.

    A fresh 64-hex-character key is generated, a row recording the user id,
    key, client IP and user agent is committed, and the key is stored in the
    Flask session under ``"sid"``.

    Args:
        user: Object whose ``id`` attribute is stored as ``user_id``.
        remember: Truthy to mark the Flask session permanent; the value is
            coerced with ``bool`` and assigned to ``session.permanent``.

    Returns:
        The newly committed ``UserSession`` record.
    """
    session_key = secrets.token_hex(32)
    record = UserSession(
        user_id=user.id,
        session_key=session_key,
        ip_address=_client_ip(),
        user_agent=_client_user_agent(),
    )
    db.session.add(record)
    db.session.commit()

    # Only expose the key to the client once the row is safely committed.
    session["sid"] = session_key
    session.permanent = bool(remember)
    return record


def ensure_user_session(user):
    """Return the active ``UserSession`` for *user*, creating one if needed.

    An existing record is reused only when it matches the current ``"sid"``
    key, belongs to *user* and is not revoked. In every other case a new
    session is created via ``create_user_session``.

    Args:
        user: Object whose ``id`` attribute identifies the session owner.

    Returns:
        The matching or newly created ``UserSession`` record.
    """
    key = get_current_session_key()
    if not key:
        return create_user_session(user)

    record = UserSession.query.filter_by(
        session_key=key, user_id=user.id, revoked=False
    ).first()
    if record:
        return record

    # NOTE(review): this uses the default remember=False, so a replacement
    # session sets session.permanent to False even if it was True before.
    # Confirm that is intended.
    return create_user_session(user)


def validate_user_session(user_id):
    """Report whether the current session key is valid for *user_id*.

    Returns:
        True when a ``"sid"`` key is present and a non-revoked
        ``UserSession`` with that key and *user_id* exists; False otherwise.
    """
    key = get_current_session_key()
    if not key:
        return False
    return (
        UserSession.query.filter_by(
            session_key=key, user_id=user_id, revoked=False
        ).first()
        is not None
    )


def touch_user_session():
    """Set ``last_seen_at`` of the current session to the current UTC time.

    Does nothing when there is no ``"sid"`` key or no non-revoked record
    with that key. Commits the database session when a record is updated.
    """
    key = get_current_session_key()
    if not key:
        return
    record = UserSession.query.filter_by(session_key=key, revoked=False).first()
    if record:
        record.last_seen_at = datetime.now(timezone.utc)
        db.session.commit()


def list_user_sessions(user_id):
    """Return the non-revoked sessions of *user_id*, most recently seen first."""
    return (
        UserSession.query.filter_by(user_id=user_id, revoked=False)
        .order_by(UserSession.last_seen_at.desc())
        .all()
    )


def revoke_session(session_id, user_id):
    """Revoke the session with primary key *session_id* owned by *user_id*.

    The lookup does not filter on ``revoked``, so an already revoked session
    that matches is marked again and still reported as success.

    Returns:
        False when no session matches both ids; True after the matching
        record has been marked revoked and committed.
    """
    record = UserSession.query.filter_by(id=session_id, user_id=user_id).first()
    if not record:
        return False
    record.revoked = True
    db.session.commit()
    return True


def revoke_all_sessions(user_id, except_current=True):
    """Revoke every non-revoked session of *user_id* in one bulk update.

    Args:
        user_id: Owner of the sessions to revoke.
        except_current: When true and a ``"sid"`` key is present, the
            session with that key is left untouched. If no key is present
            nothing is excluded.

    Returns:
        The row count reported by the bulk ``update`` call.
    """
    current_key = get_current_session_key() if except_current else None
    query = UserSession.query.filter_by(user_id=user_id, revoked=False)
    if current_key:
        query = query.filter(UserSession.session_key != current_key)
    count = query.update({"revoked": True})
    db.session.commit()
    return count


def revoke_current_session():
    """Revoke the session identified by the current ``"sid"`` key.

    Marks every record with that key as revoked, commits, and removes
    ``"sid"`` from the Flask session. Does nothing when no key is present.
    """
    key = get_current_session_key()
    if not key:
        return
    UserSession.query.filter_by(session_key=key).update({"revoked": True})
    db.session.commit()
    session.pop("sid", None)


def _client_ip():
    """Return the client address for the current request.

    Uses the first comma-separated entry of the ``X-Forwarded-For`` header
    when that entry is non-empty, and ``request.remote_addr`` otherwise.
    """
    # NOTE(review): X-Forwarded-For can be set by the client unless a trusted
    # proxy overwrites it - confirm the deployment before relying on this
    # value for anything security-relevant.
    forwarded = request.headers.get("X-Forwarded-For", "")
    first_hop = forwarded.split(",")[0].strip()
    if first_hop:
        return first_hop
    # A blank first entry (e.g. " " or ", 10.0.0.1") falls back to the
    # socket address instead of returning an empty string.
    return request.remote_addr


def _client_user_agent():
    """Return the request's user-agent string cut to 512 characters, or None."""
    if request.user_agent and request.user_agent.string:
        return request.user_agent.string[:512]
    return None