Remove proxy forward-auth header trust; app runs authorization code flow using OIDC_CLIENT_SECRET and registers redirect URI from SERVICE_URL or OIDC_REDIRECT_URI.
217 lines
6.1 KiB
Python
217 lines
6.1 KiB
Python
"""In-app OIDC authentication against Authentik."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import os
|
|
import secrets
|
|
import time
|
|
from typing import Any
|
|
|
|
import requests
|
|
import streamlit as st
|
|
from authlib.integrations.requests_client import OAuth2Session
|
|
|
|
SESSION_USER_KEY = "oidc_username"
|
|
SESSION_SUB_KEY = "oidc_sub"
|
|
STATE_TTL_SECONDS = 600
|
|
|
|
|
|
def _truthy(value: str | None, default: bool = False) -> bool:
|
|
if value is None:
|
|
return default
|
|
return value.strip().lower() in {"1", "true", "yes", "on"}
|
|
|
|
|
|
def auth_required() -> bool:
|
|
return _truthy(os.environ.get("AUTH_REQUIRED"), default=True)
|
|
|
|
|
|
def oidc_issuer() -> str:
|
|
raw = os.environ.get("OIDC_ISSUER") or os.environ.get("AUTHENTIK_ISSUER", "")
|
|
return raw.rstrip("/") + "/"
|
|
|
|
|
|
def oidc_client_id() -> str:
|
|
return os.environ.get("OIDC_CLIENT_ID", "").strip()
|
|
|
|
|
|
def oidc_client_secret() -> str:
|
|
return os.environ.get("OIDC_CLIENT_SECRET", "").strip()
|
|
|
|
|
|
def oidc_scopes() -> str:
|
|
return os.environ.get("OIDC_SCOPES", "openid profile email")
|
|
|
|
|
|
def redirect_uri() -> str:
|
|
explicit = os.environ.get("OIDC_REDIRECT_URI", "").strip()
|
|
if explicit:
|
|
return explicit.rstrip("/")
|
|
service_url = os.environ.get("SERVICE_URL_STREAMLIT_8501", "").strip()
|
|
if service_url:
|
|
return service_url.rstrip("/")
|
|
return "http://localhost:8501"
|
|
|
|
|
|
@st.cache_data(ttl=3600, show_spinner=False)
|
|
def discovery_document(issuer: str) -> dict[str, Any]:
|
|
url = f"{issuer.rstrip('/')}/.well-known/openid-configuration"
|
|
response = requests.get(url, timeout=15)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
|
|
def _oauth_client() -> OAuth2Session:
|
|
return OAuth2Session(
|
|
client_id=oidc_client_id(),
|
|
client_secret=oidc_client_secret(),
|
|
redirect_uri=redirect_uri(),
|
|
scope=oidc_scopes(),
|
|
)
|
|
|
|
|
|
def _sign_state(payload: dict[str, Any]) -> str:
|
|
secret = oidc_client_secret().encode()
|
|
body = base64.urlsafe_b64encode(json.dumps(payload, sort_keys=True).encode()).decode()
|
|
signature = hmac.new(secret, body.encode(), hashlib.sha256).hexdigest()
|
|
return f"{body}.{signature}"
|
|
|
|
|
|
def _verify_state(state: str) -> bool:
|
|
try:
|
|
body, signature = state.rsplit(".", 1)
|
|
except ValueError:
|
|
return False
|
|
|
|
expected = hmac.new(oidc_client_secret().encode(), body.encode(), hashlib.sha256).hexdigest()
|
|
if not hmac.compare_digest(signature, expected):
|
|
return False
|
|
|
|
payload = json.loads(base64.urlsafe_b64decode(body.encode()).decode())
|
|
issued_at = int(payload.get("ts", 0))
|
|
if time.time() - issued_at > STATE_TTL_SECONDS:
|
|
return False
|
|
return True
|
|
|
|
|
|
def _create_state() -> str:
|
|
payload = {
|
|
"nonce": secrets.token_urlsafe(16),
|
|
"ts": int(time.time()),
|
|
}
|
|
return _sign_state(payload)
|
|
|
|
|
|
def _username_from_userinfo(userinfo: dict[str, Any]) -> str:
|
|
for key in ("preferred_username", "email", "name", "sub"):
|
|
value = userinfo.get(key)
|
|
if value:
|
|
return str(value)
|
|
return "unknown"
|
|
|
|
|
|
def _dev_bypass() -> str | None:
|
|
if auth_required():
|
|
return None
|
|
dev_user = os.environ.get("DEV_USER", "").strip()
|
|
return dev_user or "anonymous"
|
|
|
|
|
|
def _config_error() -> str | None:
|
|
if not auth_required():
|
|
return None
|
|
if not oidc_client_id():
|
|
return "OIDC_CLIENT_ID is not set."
|
|
if not oidc_client_secret():
|
|
return "OIDC_CLIENT_SECRET is not set."
|
|
if not (os.environ.get("OIDC_ISSUER") or os.environ.get("AUTHENTIK_ISSUER")):
|
|
return "OIDC_ISSUER (or AUTHENTIK_ISSUER) is not set."
|
|
return None
|
|
|
|
|
|
def logout() -> None:
|
|
st.session_state.pop(SESSION_USER_KEY, None)
|
|
st.session_state.pop(SESSION_SUB_KEY, None)
|
|
st.query_params.clear()
|
|
|
|
|
|
def get_current_user() -> str | None:
|
|
"""Return username after in-app OIDC login, or stop and render login UI."""
|
|
if st.session_state.get(SESSION_USER_KEY):
|
|
return str(st.session_state[SESSION_USER_KEY])
|
|
|
|
bypass = _dev_bypass()
|
|
if bypass:
|
|
return bypass
|
|
|
|
config_error = _config_error()
|
|
if config_error:
|
|
st.error(config_error)
|
|
st.caption("Set OIDC_CLIENT_ID, OIDC_CLIENT_SECRET, and OIDC_ISSUER in Coolify.")
|
|
st.stop()
|
|
return None
|
|
|
|
query = st.query_params
|
|
code = query.get("code")
|
|
state = query.get("state")
|
|
oauth_error = query.get("error")
|
|
|
|
if oauth_error:
|
|
st.error(f"Authentik login failed: {oauth_error}")
|
|
st.stop()
|
|
return None
|
|
|
|
try:
|
|
metadata = discovery_document(oidc_issuer())
|
|
except Exception as exc:
|
|
st.error(f"Could not load OIDC discovery document: {exc}")
|
|
st.stop()
|
|
return None
|
|
|
|
client = _oauth_client()
|
|
|
|
if code:
|
|
if not state or not _verify_state(state):
|
|
st.error("Invalid or expired OAuth state. Sign in again.")
|
|
st.query_params.clear()
|
|
st.stop()
|
|
return None
|
|
|
|
try:
|
|
auth_response = f"{redirect_uri()}?code={code}&state={state}"
|
|
client.fetch_token(
|
|
metadata["token_endpoint"],
|
|
authorization_response=auth_response,
|
|
)
|
|
userinfo = client.get(metadata["userinfo_endpoint"]).json()
|
|
except Exception as exc:
|
|
st.error(f"Token exchange failed: {exc}")
|
|
st.caption(f"Redirect URI in use: `{redirect_uri()}`")
|
|
st.stop()
|
|
return None
|
|
|
|
st.session_state[SESSION_USER_KEY] = _username_from_userinfo(userinfo)
|
|
st.session_state[SESSION_SUB_KEY] = str(userinfo.get("sub", ""))
|
|
st.query_params.clear()
|
|
st.rerun()
|
|
return None
|
|
|
|
state_token = _create_state()
|
|
auth_url, _ = client.create_authorization_url(
|
|
metadata["authorization_endpoint"],
|
|
state=state_token,
|
|
)
|
|
|
|
st.title("QuantTrade")
|
|
st.info("Sign in with Authentik to access backtests and saved strategies.")
|
|
st.link_button("Login with Authentik", auth_url, type="primary")
|
|
st.caption(
|
|
f"Register redirect URI `{redirect_uri()}` in your Authentik OIDC provider "
|
|
f"({oidc_client_id()})."
|
|
)
|
|
st.stop()
|
|
return None
|