"""In-app OIDC authentication against Authentik.""" from __future__ import annotations import base64 import hashlib import hmac import json import os import secrets import time from typing import Any import requests import streamlit as st from authlib.integrations.requests_client import OAuth2Session SESSION_USER_KEY = "oidc_username" SESSION_SUB_KEY = "oidc_sub" STATE_TTL_SECONDS = 600 def _truthy(value: str | None, default: bool = False) -> bool: if value is None: return default return value.strip().lower() in {"1", "true", "yes", "on"} def auth_required() -> bool: return _truthy(os.environ.get("AUTH_REQUIRED"), default=True) def oidc_issuer() -> str: raw = os.environ.get("OIDC_ISSUER") or os.environ.get("AUTHENTIK_ISSUER", "") return raw.rstrip("/") + "/" def oidc_client_id() -> str: return os.environ.get("OIDC_CLIENT_ID", "").strip() def oidc_client_secret() -> str: return os.environ.get("OIDC_CLIENT_SECRET", "").strip() def oidc_scopes() -> str: return os.environ.get("OIDC_SCOPES", "openid profile email") def redirect_uri() -> str: explicit = os.environ.get("OIDC_REDIRECT_URI", "").strip() if explicit: return explicit.rstrip("/") service_url = os.environ.get("SERVICE_URL_STREAMLIT_8501", "").strip() if service_url: return service_url.rstrip("/") return "http://localhost:8501" @st.cache_data(ttl=3600, show_spinner=False) def discovery_document(issuer: str) -> dict[str, Any]: url = f"{issuer.rstrip('/')}/.well-known/openid-configuration" response = requests.get(url, timeout=15) response.raise_for_status() return response.json() def _oauth_client() -> OAuth2Session: return OAuth2Session( client_id=oidc_client_id(), client_secret=oidc_client_secret(), redirect_uri=redirect_uri(), scope=oidc_scopes(), ) def _sign_state(payload: dict[str, Any]) -> str: secret = oidc_client_secret().encode() body = base64.urlsafe_b64encode(json.dumps(payload, sort_keys=True).encode()).decode() signature = hmac.new(secret, body.encode(), hashlib.sha256).hexdigest() return f"{body}.{signature}" def _verify_state(state: str) -> bool: try: body, signature = state.rsplit(".", 1) except ValueError: return False expected = hmac.new(oidc_client_secret().encode(), body.encode(), hashlib.sha256).hexdigest() if not hmac.compare_digest(signature, expected): return False payload = json.loads(base64.urlsafe_b64decode(body.encode()).decode()) issued_at = int(payload.get("ts", 0)) if time.time() - issued_at > STATE_TTL_SECONDS: return False return True def _create_state() -> str: payload = { "nonce": secrets.token_urlsafe(16), "ts": int(time.time()), } return _sign_state(payload) def _username_from_userinfo(userinfo: dict[str, Any]) -> str: for key in ("preferred_username", "email", "name", "sub"): value = userinfo.get(key) if value: return str(value) return "unknown" def _dev_bypass() -> str | None: if auth_required(): return None dev_user = os.environ.get("DEV_USER", "").strip() return dev_user or "anonymous" def _config_error() -> str | None: if not auth_required(): return None if not oidc_client_id(): return "OIDC_CLIENT_ID is not set." if not oidc_client_secret(): return "OIDC_CLIENT_SECRET is not set." if not (os.environ.get("OIDC_ISSUER") or os.environ.get("AUTHENTIK_ISSUER")): return "OIDC_ISSUER (or AUTHENTIK_ISSUER) is not set." return None def logout() -> None: st.session_state.pop(SESSION_USER_KEY, None) st.session_state.pop(SESSION_SUB_KEY, None) st.query_params.clear() def get_current_user() -> str | None: """Return username after in-app OIDC login, or stop and render login UI.""" if st.session_state.get(SESSION_USER_KEY): return str(st.session_state[SESSION_USER_KEY]) bypass = _dev_bypass() if bypass: return bypass config_error = _config_error() if config_error: st.error(config_error) st.caption("Set OIDC_CLIENT_ID, OIDC_CLIENT_SECRET, and OIDC_ISSUER in Coolify.") st.stop() return None query = st.query_params code = query.get("code") state = query.get("state") oauth_error = query.get("error") if oauth_error: st.error(f"Authentik login failed: {oauth_error}") st.stop() return None try: metadata = discovery_document(oidc_issuer()) except Exception as exc: st.error(f"Could not load OIDC discovery document: {exc}") st.stop() return None client = _oauth_client() if code: if not state or not _verify_state(state): st.error("Invalid or expired OAuth state. Sign in again.") st.query_params.clear() st.stop() return None try: auth_response = f"{redirect_uri()}?code={code}&state={state}" client.fetch_token( metadata["token_endpoint"], authorization_response=auth_response, ) userinfo = client.get(metadata["userinfo_endpoint"]).json() except Exception as exc: st.error(f"Token exchange failed: {exc}") st.caption(f"Redirect URI in use: `{redirect_uri()}`") st.stop() return None st.session_state[SESSION_USER_KEY] = _username_from_userinfo(userinfo) st.session_state[SESSION_SUB_KEY] = str(userinfo.get("sub", "")) st.query_params.clear() st.rerun() return None state_token = _create_state() auth_url, _ = client.create_authorization_url( metadata["authorization_endpoint"], state=state_token, ) st.title("QuantTrade") st.info("Sign in with Authentik to access backtests and saved strategies.") st.link_button("Login with Authentik", auth_url, type="primary") st.caption( f"Register redirect URI `{redirect_uri()}` in your Authentik OIDC provider " f"({oidc_client_id()})." ) st.stop() return None