quant-web/auth.py
epistemophiliac 9756809b68 Move Authentik OIDC into Streamlit with client secret.
Remove proxy forward-auth header trust; app runs authorization code flow using OIDC_CLIENT_SECRET and registers redirect URI from SERVICE_URL or OIDC_REDIRECT_URI.
2026-06-19 00:58:26 -04:00

217 lines
6.1 KiB
Python

"""In-app OIDC authentication against Authentik."""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import os
import secrets
import time
from typing import Any
import requests
import streamlit as st
from authlib.integrations.requests_client import OAuth2Session
# st.session_state keys that hold the signed-in user's display name and the
# OIDC "sub" claim once get_current_user() has completed a login.
SESSION_USER_KEY = "oidc_username"
SESSION_SUB_KEY = "oidc_sub"
# Maximum age, in seconds, of a signed OAuth state token before
# _verify_state() rejects it.
STATE_TTL_SECONDS = 600
def _truthy(value: str | None, default: bool = False) -> bool:
if value is None:
return default
return value.strip().lower() in {"1", "true", "yes", "on"}
def auth_required() -> bool:
    """Report whether OIDC login is enforced.

    Reads the ``AUTH_REQUIRED`` environment variable; when it is not set,
    authentication is required.
    """
    flag = os.environ.get("AUTH_REQUIRED")
    return _truthy(flag, default=True)
def oidc_issuer() -> str:
    """Return the configured OIDC issuer URL with exactly one trailing slash.

    ``OIDC_ISSUER`` takes precedence and ``AUTHENTIK_ISSUER`` is the fallback.
    Surrounding whitespace is removed from each variable before it is
    considered, matching the other ``oidc_*`` getters, so a blank
    ``OIDC_ISSUER`` does not shadow a usable ``AUTHENTIK_ISSUER``.

    Returns:
        The issuer URL ending in a single ``/``. When neither variable holds
        a value the result is just ``"/"``.
    """
    primary = os.environ.get("OIDC_ISSUER", "").strip()
    fallback = os.environ.get("AUTHENTIK_ISSUER", "").strip()
    issuer = primary or fallback
    # Collapse any number of trailing slashes into exactly one.
    return issuer.rstrip("/") + "/"
def oidc_client_id() -> str:
    """Return the ``OIDC_CLIENT_ID`` environment value, trimmed.

    Returns:
        The client identifier without surrounding whitespace, or an empty
        string when the variable is not set.
    """
    configured = os.environ.get("OIDC_CLIENT_ID", "")
    return configured.strip()
def oidc_client_secret() -> str:
    """Return the ``OIDC_CLIENT_SECRET`` environment value, trimmed.

    Returns:
        The client secret without surrounding whitespace, or an empty string
        when the variable is not set.
    """
    configured = os.environ.get("OIDC_CLIENT_SECRET", "")
    return configured.strip()
def oidc_scopes() -> str:
    """Return the space-separated scopes to request from the provider.

    Returns:
        The ``OIDC_SCOPES`` environment value exactly as set, or
        ``"openid profile email"`` when the variable is absent.
    """
    requested = os.environ.get("OIDC_SCOPES")
    if requested is None:
        return "openid profile email"
    return requested
def redirect_uri() -> str:
    """Return the OAuth redirect URI this app presents to the provider.

    The first non-blank value among ``OIDC_REDIRECT_URI`` and
    ``SERVICE_URL_STREAMLIT_8501`` wins, with surrounding whitespace and any
    trailing slashes removed.

    Returns:
        The chosen URI, or ``"http://localhost:8501"`` when neither variable
        holds a value.
    """
    for variable in ("OIDC_REDIRECT_URI", "SERVICE_URL_STREAMLIT_8501"):
        candidate = os.environ.get(variable, "").strip()
        if candidate:
            return candidate.rstrip("/")
    return "http://localhost:8501"
@st.cache_data(ttl=3600, show_spinner=False)
def discovery_document(issuer: str) -> dict[str, Any]:
    """Download and parse the provider's OpenID Connect discovery document.

    The decorator caches the result per ``issuer`` with a 3600-second TTL and
    no spinner.

    Args:
        issuer: Issuer base URL; trailing slashes are ignored.

    Returns:
        The decoded JSON body of ``<issuer>/.well-known/openid-configuration``.

    Raises:
        requests.HTTPError: If the provider answers with an error status.
    """
    base = issuer.rstrip("/")
    reply = requests.get(f"{base}/.well-known/openid-configuration", timeout=15)
    reply.raise_for_status()
    return reply.json()
def _oauth_client() -> OAuth2Session:
    """Build an Authlib session configured from the current environment.

    Returns:
        A new ``OAuth2Session`` carrying the client id, client secret,
        redirect URI and scopes reported by the module's getters.
    """
    settings = {
        "client_id": oidc_client_id(),
        "client_secret": oidc_client_secret(),
        "redirect_uri": redirect_uri(),
        "scope": oidc_scopes(),
    }
    return OAuth2Session(**settings)
def _sign_state(payload: dict[str, Any]) -> str:
    """Serialize ``payload`` and append an HMAC-SHA256 signature.

    Args:
        payload: JSON-serializable mapping to embed in the state token.

    Returns:
        ``"<body>.<signature>"`` where ``body`` is the URL-safe base64 of the
        key-sorted JSON and ``signature`` is the hex HMAC of ``body`` keyed
        with the OIDC client secret.
    """
    key = oidc_client_secret().encode()
    serialized = json.dumps(payload, sort_keys=True).encode()
    body = base64.urlsafe_b64encode(serialized).decode()
    mac = hmac.new(key, body.encode(), hashlib.sha256)
    return body + "." + mac.hexdigest()
def _verify_state(state: str) -> bool:
    """Check that ``state`` was signed by this app and has not expired.

    The token is expected in the ``"<body>.<signature>"`` form produced by
    ``_sign_state``. ``state`` arrives from the callback query string, so any
    malformed input yields ``False`` instead of raising.

    Args:
        state: The ``state`` query parameter returned by the provider.

    Returns:
        ``True`` when the signature matches and the embedded ``ts`` is at
        most ``STATE_TTL_SECONDS`` old; ``False`` otherwise.
    """
    # NOTE(review): validity rests on the signature and age only; nothing
    # here ties the token to the browser session that started the login.
    body, separator, signature = state.rpartition(".")
    if not separator:
        return False

    try:
        body_bytes = body.encode()
        signature_bytes = signature.encode()
    except UnicodeEncodeError:
        return False

    expected = hmac.new(
        oidc_client_secret().encode(), body_bytes, hashlib.sha256
    ).hexdigest()
    # Compare as bytes: compare_digest raises TypeError when given a str
    # containing non-ASCII characters, which an attacker-supplied state can.
    if not hmac.compare_digest(signature_bytes, expected.encode()):
        return False

    try:
        payload = json.loads(base64.urlsafe_b64decode(body_bytes).decode())
        issued_at = int(payload.get("ts", 0))
    except (ValueError, TypeError, AttributeError):
        # Bad base64/UTF-8/JSON, a non-object payload, or a non-numeric ts.
        return False

    return time.time() - issued_at <= STATE_TTL_SECONDS
def _create_state() -> str:
    """Mint a fresh signed state token for an authorization request.

    Returns:
        The ``_sign_state`` encoding of a payload holding a random ``nonce``
        and the current Unix time in ``ts``.
    """
    nonce = secrets.token_urlsafe(16)
    issued_at = int(time.time())
    return _sign_state({"nonce": nonce, "ts": issued_at})
def _username_from_userinfo(userinfo: dict[str, Any]) -> str:
for key in ("preferred_username", "email", "name", "sub"):
value = userinfo.get(key)
if value:
return str(value)
return "unknown"
def _dev_bypass() -> str | None:
    """Return a stand-in username when authentication is switched off.

    Returns:
        ``None`` while ``auth_required()`` is true. Otherwise the trimmed
        ``DEV_USER`` environment value, or ``"anonymous"`` when that is blank.
    """
    if not auth_required():
        configured = os.environ.get("DEV_USER", "").strip()
        return configured if configured else "anonymous"
    return None
def _config_error() -> str | None:
    """Describe the first missing OIDC setting, if any.

    Returns:
        ``None`` when authentication is not required or every setting is
        present; otherwise a message naming the first missing one, checked in
        the order client id, client secret, issuer.
    """
    if not auth_required():
        return None

    # Each probe is evaluated lazily so the checks stop at the first failure.
    checks = (
        (oidc_client_id, "OIDC_CLIENT_ID is not set."),
        (oidc_client_secret, "OIDC_CLIENT_SECRET is not set."),
        (
            lambda: os.environ.get("OIDC_ISSUER")
            or os.environ.get("AUTHENTIK_ISSUER"),
            "OIDC_ISSUER (or AUTHENTIK_ISSUER) is not set.",
        ),
    )
    for probe, message in checks:
        if not probe():
            return message
    return None
def logout() -> None:
    """Forget the signed-in user for this Streamlit session.

    Removes the stored username and subject from ``st.session_state`` (if
    present) and clears the URL query parameters.
    """
    for session_key in (SESSION_USER_KEY, SESSION_SUB_KEY):
        st.session_state.pop(session_key, None)
    st.query_params.clear()
def _exchange_code_for_userinfo(
    client: OAuth2Session,
    metadata: dict[str, Any],
    code: str,
    state: str,
) -> dict[str, Any]:
    """Trade an authorization code for tokens and fetch the user's claims.

    Args:
        client: Session created by ``_oauth_client``.
        metadata: Discovery document providing ``token_endpoint`` and
            ``userinfo_endpoint``.
        code: Authorization code taken from the callback query string.
        state: State value taken from the callback query string.

    Returns:
        The userinfo claims as a dict.

    Raises:
        Exception: Any error raised by the token exchange or userinfo
            request, including an HTTP error status from userinfo.
        ValueError: If the userinfo body is not a JSON object.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlencode

    # st.query_params hands back decoded values. Percent-encode them again so
    # characters such as "+", "&" or "#" survive the URL parsing Authlib does
    # on authorization_response.
    callback_query = urlencode({"code": code, "state": state})
    client.fetch_token(
        metadata["token_endpoint"],
        authorization_response=f"{redirect_uri()}?{callback_query}",
    )
    response = client.get(metadata["userinfo_endpoint"])
    # Without this an error body (for example a 401 JSON document) would be
    # accepted as claims and the user signed in as "unknown".
    response.raise_for_status()
    userinfo = response.json()
    if not isinstance(userinfo, dict):
        raise ValueError("userinfo response is not a JSON object")
    return userinfo


def get_current_user() -> str | None:
    """Return username after in-app OIDC login, or stop and render login UI.

    Resolution order:

    1. A username already stored in ``st.session_state``.
    2. The development bypass user when authentication is not required.
    3. An OAuth callback (``code`` and ``state`` query parameters): the state
       is verified, the code is exchanged, the username and subject are
       stored in the session, and the script is rerun.
    4. Otherwise the login page is rendered and the script is stopped.

    Configuration problems, provider-reported errors, discovery failures and
    token-exchange failures are shown with ``st.error`` followed by
    ``st.stop()``.

    Returns:
        The username when one is available. The ``return None`` statements
        that follow ``st.stop()`` / ``st.rerun()`` only give every path an
        explicit return value.
    """
    cached_user = st.session_state.get(SESSION_USER_KEY)
    if cached_user:
        return str(cached_user)

    bypass = _dev_bypass()
    if bypass:
        return bypass

    config_error = _config_error()
    if config_error:
        st.error(config_error)
        st.caption("Set OIDC_CLIENT_ID, OIDC_CLIENT_SECRET, and OIDC_ISSUER in Coolify.")
        st.stop()
        return None

    query = st.query_params
    code = query.get("code")
    state = query.get("state")
    oauth_error = query.get("error")
    if oauth_error:
        st.error(f"Authentik login failed: {oauth_error}")
        st.stop()
        return None

    try:
        metadata = discovery_document(oidc_issuer())
    except Exception as exc:
        st.error(f"Could not load OIDC discovery document: {exc}")
        st.stop()
        return None

    client = _oauth_client()

    if code:
        if not state or not _verify_state(state):
            st.error("Invalid or expired OAuth state. Sign in again.")
            st.query_params.clear()
            st.stop()
            return None
        # st.stop()/st.rerun() are deliberately kept outside the try body so
        # the broad handler below only sees exchange failures.
        try:
            userinfo = _exchange_code_for_userinfo(client, metadata, code, state)
        except Exception as exc:
            st.error(f"Token exchange failed: {exc}")
            st.caption(f"Redirect URI in use: `{redirect_uri()}`")
            st.stop()
            return None
        st.session_state[SESSION_USER_KEY] = _username_from_userinfo(userinfo)
        st.session_state[SESSION_SUB_KEY] = str(userinfo.get("sub", ""))
        # Drop code/state from the URL so a refresh does not replay them.
        st.query_params.clear()
        st.rerun()
        return None

    # No session and no callback: show the sign-in page.
    state_token = _create_state()
    auth_url, _ = client.create_authorization_url(
        metadata["authorization_endpoint"],
        state=state_token,
    )
    st.title("QuantTrade")
    st.info("Sign in with Authentik to access backtests and saved strategies.")
    st.link_button("Login with Authentik", auth_url, type="primary")
    st.caption(
        f"Register redirect URI `{redirect_uri()}` in your Authentik OIDC provider "
        f"({oidc_client_id()})."
    )
    st.stop()
    return None