Initial QuantTrade stack for Coolify deployment.

Streamlit + VectorBT dashboard, Parquet harvester with nightly cron, Authentik header auth, SQLite strategy persistence, and Bugsink telemetry.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
epistemophiliac 2026-06-19 00:46:51 -04:00
commit b5db15d6ab
17 changed files with 946 additions and 0 deletions

11
.env.example Normal file
View file

@ -0,0 +1,11 @@
# Bugsink (Sentry-compatible DSN from bugsink.aexoradao.com project settings)
BUGSINK_DSN=
# Local dev only — simulates Authentik proxy header when not behind auth
DEV_USER=dev@local
# Core tickers (comma-separated)
CORE_TICKERS=SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT
# Historical seed window (years)
SEED_YEARS=5

10
.gitignore vendored Normal file
View file

@ -0,0 +1,10 @@
.env
__pycache__/
*.py[cod]
*.egg-info/
.venv/
venv/
data/
*.db
.DS_Store
.streamlit/secrets.toml

7
.streamlit/config.toml Normal file
View file

@ -0,0 +1,7 @@
[server]
headless = true
enableCORS = false
enableXsrfProtection = false
[browser]
gatherUsageStats = false

31
Dockerfile Normal file
View file

@ -0,0 +1,31 @@
FROM python:3.11-slim-bookworm
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
PARQUET_DIR=/data/parquet \
STRATEGY_DB_PATH=/data/strategies/strategies.db
WORKDIR /app
RUN apt-get update \
&& apt-get install -y --no-install-recommends bash curl \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY telemetry.py auth.py strategy_db.py backtest.py app.py sync.py ./
COPY .streamlit /app/.streamlit
EXPOSE 8501
HEALTHCHECK --interval=30s --timeout=5s --start-period=20s --retries=3 \
CMD curl -sf http://127.0.0.1:8501/_stcore/health || exit 1
CMD ["streamlit", "run", "app.py", \
"--server.port=8501", \
"--server.address=0.0.0.0", \
"--browser.gatherUsageStats=false", \
"--server.enableCORS=false", \
"--server.enableXsrfProtection=false"]

27
Dockerfile.harvester Normal file
View file

@ -0,0 +1,27 @@
FROM python:3.11-slim-bookworm
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
PARQUET_DIR=/data/parquet \
TZ=America/New_York
WORKDIR /app
RUN apt-get update \
&& apt-get install -y --no-install-recommends bash cron curl \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY telemetry.py sync.py ./
COPY scripts/harvester-entrypoint.sh /usr/local/bin/harvester-entrypoint.sh
COPY scripts/run-daily-sync.sh /usr/local/bin/run-daily-sync.sh
COPY scripts/cron-sync /etc/cron.d/quant-sync
RUN chmod +x /usr/local/bin/harvester-entrypoint.sh /usr/local/bin/run-daily-sync.sh \
&& chmod 0644 /etc/cron.d/quant-sync \
&& crontab /etc/cron.d/quant-sync
ENTRYPOINT ["/usr/local/bin/harvester-entrypoint.sh"]

90
README.md Normal file
View file

@ -0,0 +1,90 @@
# QuantTrade
Local-first quantitative backtesting on Coolify: Streamlit UI, VectorBT engine, Parquet market data, nightly Yahoo Finance sync, Authentik OIDC via reverse proxy, and SQLite strategy persistence.
## Architecture
| Layer | Technology |
|-------|------------|
| Auth | Authentik (`auth.aexoradao.com`) via reverse proxy headers |
| UI | Streamlit (`streamlit` service, port 8501) |
| Engine | VectorBT + NumPy |
| Market data | Parquet volume (`parquet-data`) |
| Ingestion | `harvester` cron @ 17:00 America/New_York (weekdays) |
| Strategies | SQLite on `strategy-data` volume, keyed by proxy username |
| Telemetry | Bugsink via `sentry-sdk` (`bugsink.aexoradao.com`) |
## Coolify deployment
1. Create a **Docker Compose** resource pointing at this repo.
2. Assign your public domain to the **`streamlit`** service on port **8501**.
3. Set environment variables in Coolify (first deploy extracts defaults from compose):
- `BUGSINK_DSN` — DSN from your Bugsink project
- `CORE_TICKERS` — optional comma-separated tickers
- `DEV_USER` — only for unauthenticated local testing
4. Protect the domain in your reverse proxy with Authentik forward auth.
### Authentik / proxy headers
After login, the proxy must forward one of these headers to Streamlit:
- `X-Forwarded-User` (recommended)
- `X-Authentik-Username`
- `Remote-User`
The app reads them via Streamlit websocket headers (`auth.get_current_user()`). Saved strategies are scoped to that username.
Example Traefik middleware (adjust provider labels to your stack):
```yaml
# Forward auth endpoint on Authentik
http:
middlewares:
authentik:
forwardAuth:
address: https://auth.aexoradao.com/outpost.goauthentik.io/auth/traefik
trustForwardHeader: true
authResponseHeaders:
- X-authentik-username
- X-authentik-uid
- X-Forwarded-User
```
Map `X-authentik-username``X-Forwarded-User` in your proxy if Streamlit only sees the latter.
## Services
- **`data-seed`** — one-shot 5-year historical download into Parquet (idempotent).
- **`harvester`** — cron container; appends daily bars after US cash close.
- **`streamlit`** — dashboard, backtests, save/load strategies.
## Local development
```bash
cp .env.example .env
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
python sync.py --seed
DEV_USER=you@example.com streamlit run app.py
```
## Manual sync
```bash
python sync.py --seed # full history
python sync.py --daily # append latest bars
```
## Strategy storage
SQLite path: `/data/strategies/strategies.db` (Docker volume `strategy-data`). Each row stores username, name, ticker, and JSON parameters (MA windows, cash, fees).
## Bugsink
Set `BUGSINK_DSN` to your project DSN, e.g.:
```text
http://<public-key>@bugsink.aexoradao.com/<project-id>
```
Both `app.py` and `sync.py` initialize the Sentry-compatible SDK with tracing disabled for Bugsink compatibility.

179
app.py Normal file
View file

@ -0,0 +1,179 @@
"""QuantTrade Streamlit dashboard."""
from __future__ import annotations
import os
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
from plotly.subplots import make_subplots
from auth import get_current_user
from backtest import load_ohlcv, run_ma_crossover
from strategy_db import delete_strategy, init_db, list_strategies, load_strategy, save_strategy
from telemetry import capture_exception, init_telemetry
init_telemetry("quant-streamlit")
init_db()
st.set_page_config(
page_title="QuantTrade",
page_icon="📈",
layout="wide",
)
DEFAULT_TICKERS = os.environ.get(
"CORE_TICKERS",
"SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT",
).split(",")
def render_equity_chart(result) -> None:
fig = make_subplots(
rows=2,
cols=1,
shared_xaxes=True,
vertical_spacing=0.08,
row_heights=[0.65, 0.35],
subplot_titles=(f"{result.ticker} Price", "Strategy Equity"),
)
fig.add_trace(
go.Scatter(x=result.price.index, y=result.price.values, name="Close", line=dict(color="#60a5fa")),
row=1,
col=1,
)
fig.add_trace(
go.Scatter(
x=result.equity_curve.index,
y=result.equity_curve.values,
name="Equity",
line=dict(color="#34d399"),
),
row=2,
col=1,
)
fig.update_layout(height=640, template="plotly_dark", margin=dict(l=20, r=20, t=40, b=20))
st.plotly_chart(fig, use_container_width=True)
def main() -> None:
user = get_current_user()
st.title("QuantTrade")
st.caption("VectorBT backtests on local Parquet market data")
with st.sidebar:
st.subheader("Account")
st.write(f"Signed in as **{user}**")
st.divider()
st.subheader("Strategy")
ticker = st.selectbox(
"Ticker",
options=[t.strip().upper() for t in DEFAULT_TICKERS if t.strip()],
index=0,
)
fast_window = st.slider("Fast MA", min_value=5, max_value=100, value=20, step=1)
slow_window = st.slider("Slow MA", min_value=20, max_value=250, value=50, step=1)
init_cash = st.number_input("Initial cash", min_value=1000.0, value=10_000.0, step=1000.0)
fees = st.number_input("Fees (fraction)", min_value=0.0, max_value=0.05, value=0.001, step=0.0005)
run_clicked = st.button("Run Backtest", type="primary", use_container_width=True)
st.divider()
st.subheader("Saved Strategies")
saved = list_strategies(user)
saved_names = [s.name for s in saved]
selected_name = st.selectbox("Load strategy", options=[""] + saved_names)
strategy_name = st.text_input("Strategy name", placeholder="My SPY crossover")
col_save, col_delete = st.columns(2)
with col_save:
save_clicked = st.button("Save Strategy", use_container_width=True)
with col_delete:
delete_clicked = st.button("Delete", use_container_width=True)
params = {
"fast_window": fast_window,
"slow_window": slow_window,
"init_cash": init_cash,
"fees": fees,
}
if save_clicked:
if not strategy_name.strip():
st.sidebar.error("Enter a strategy name before saving.")
else:
save_strategy(user, strategy_name.strip(), ticker, params)
st.sidebar.success(f"Saved '{strategy_name.strip()}'.")
st.rerun()
if delete_clicked and selected_name != "":
delete_strategy(user, selected_name)
st.sidebar.success(f"Deleted '{selected_name}'.")
st.rerun()
active_ticker = ticker
active_params = dict(params)
if selected_name != "":
loaded = load_strategy(user, selected_name)
if loaded:
active_ticker = loaded.ticker
active_params.update(loaded.params)
st.info(f"Loaded strategy **{loaded.name}** ({loaded.ticker}). Adjust sliders or run.")
if run_clicked or selected_name != "":
try:
load_ohlcv(active_ticker)
result = run_ma_crossover(
ticker=active_ticker,
fast_window=int(active_params["fast_window"]),
slow_window=int(active_params["slow_window"]),
init_cash=float(active_params.get("init_cash", init_cash)),
fees=float(active_params.get("fees", fees)),
)
c1, c2, c3, c4 = st.columns(4)
c1.metric("Sharpe Ratio", f"{result.sharpe_ratio:.2f}")
c2.metric("Max Drawdown", f"{result.max_drawdown:.1%}")
c3.metric("Total Return", f"{result.total_return:.1%}")
c4.metric("Bars", f"{len(result.price):,}")
render_equity_chart(result)
with st.expander("Raw stats"):
st.write(
pd.DataFrame(
{
"Metric": ["Ticker", "Fast MA", "Slow MA", "Sharpe", "Max DD", "Return"],
"Value": [
result.ticker,
result.fast_window,
result.slow_window,
result.sharpe_ratio,
result.max_drawdown,
result.total_return,
],
}
)
)
except FileNotFoundError:
st.warning(
f"No Parquet data for **{active_ticker}** yet. "
"Wait for the harvester seed job or check container logs."
)
except ValueError as exc:
st.error(str(exc))
except Exception as exc:
capture_exception(exc)
st.error("Backtest failed. The error was reported to Bugsink.")
st.exception(exc)
else:
st.info("Configure parameters in the sidebar and click **Run Backtest**.")
if __name__ == "__main__":
main()

45
auth.py Normal file
View file

@ -0,0 +1,45 @@
"""Read Authentik / reverse-proxy identity headers in Streamlit."""
from __future__ import annotations
import os
from typing import Mapping
HEADER_CANDIDATES = (
"X-Forwarded-User",
"X-Authentik-Username",
"X-Authentik-Uid",
"Remote-User",
"X-Forwarded-Email",
)
def _normalize(value: str | None) -> str | None:
if not value:
return None
cleaned = value.strip()
return cleaned or None
def username_from_headers(headers: Mapping[str, str]) -> str | None:
lowered = {k.lower(): v for k, v in headers.items()}
for name in HEADER_CANDIDATES:
value = _normalize(lowered.get(name.lower()))
if value:
return value
return None
def get_current_user() -> str:
"""Return the authenticated username from proxy-injected headers."""
try:
from streamlit.web.server.websocket_headers import _get_websocket_headers
headers = _get_websocket_headers() or {}
user = username_from_headers(headers)
if user:
return user
except Exception:
pass
return os.environ.get("DEV_USER", "anonymous")

88
backtest.py Normal file
View file

@ -0,0 +1,88 @@
"""VectorBT backtest engine reading local Parquet OHLCV data."""
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
import pandas as pd
import vectorbt as vbt
@dataclass(frozen=True)
class BacktestResult:
ticker: str
fast_window: int
slow_window: int
sharpe_ratio: float
max_drawdown: float
total_return: float
equity_curve: pd.Series
price: pd.Series
def parquet_dir() -> Path:
return Path(os.environ.get("PARQUET_DIR", "/data/parquet"))
def load_ohlcv(ticker: str) -> pd.DataFrame:
path = parquet_dir() / f"{ticker.upper()}.parquet"
if not path.exists():
raise FileNotFoundError(f"No Parquet file for {ticker.upper()} at {path}")
df = pd.read_parquet(path)
if "Date" in df.columns:
df = df.set_index("Date")
df.index = pd.to_datetime(df.index)
df = df.sort_index()
return df
def run_ma_crossover(
ticker: str,
fast_window: int,
slow_window: int,
init_cash: float = 10_000.0,
fees: float = 0.001,
) -> BacktestResult:
if fast_window >= slow_window:
raise ValueError("Fast MA window must be smaller than slow MA window")
ohlcv = load_ohlcv(ticker)
close = ohlcv["Close"].astype(float)
fast_ma = vbt.MA.run(close, fast_window, short_name="fast")
slow_ma = vbt.MA.run(close, slow_window, short_name="slow")
entries = fast_ma.ma_crossed_above(slow_ma)
exits = fast_ma.ma_crossed_below(slow_ma)
portfolio = vbt.Portfolio.from_signals(
close,
entries=entries,
exits=exits,
init_cash=init_cash,
fees=fees,
freq="1D",
)
stats = portfolio.stats()
sharpe = float(stats.get("Sharpe Ratio", 0.0) or 0.0)
max_dd = float(stats.get("Max Drawdown [%]", 0.0) or 0.0) / 100.0
total_return = float(stats.get("Total Return [%]", 0.0) or 0.0) / 100.0
equity = portfolio.value()
if isinstance(equity, pd.DataFrame):
equity = equity.iloc[:, 0]
return BacktestResult(
ticker=ticker.upper(),
fast_window=fast_window,
slow_window=slow_window,
sharpe_ratio=sharpe,
max_drawdown=max_dd,
total_return=total_return,
equity_curve=equity,
price=close,
)

74
docker-compose.yml Normal file
View file

@ -0,0 +1,74 @@
services:
data-seed:
build:
context: .
dockerfile: Dockerfile.harvester
exclude_from_hc: true
restart: "no"
environment:
- 'BUGSINK_DSN=${BUGSINK_DSN:-}'
- 'APP_ENV=${APP_ENV:-production}'
- 'APP_RELEASE=${APP_RELEASE:-quant-web@1.0.0}'
- 'CORE_TICKERS=${CORE_TICKERS:-SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT}'
- 'SEED_YEARS=${SEED_YEARS:-5}'
- 'PARQUET_DIR=/data/parquet'
- 'TZ=America/New_York'
volumes:
- 'parquet-data:/data/parquet'
entrypoint: ["bash", "-c"]
command:
- >
if [ -f /data/parquet/.seed_complete ]; then echo "Parquet seed already present"; exit 0; fi;
python /app/sync.py --seed;
touch /data/parquet/.seed_complete;
echo "Seed finished";
harvester:
build:
context: .
dockerfile: Dockerfile.harvester
restart: unless-stopped
depends_on:
data-seed:
condition: service_completed_successfully
environment:
- 'BUGSINK_DSN=${BUGSINK_DSN:-}'
- 'APP_ENV=${APP_ENV:-production}'
- 'APP_RELEASE=${APP_RELEASE:-quant-web@1.0.0}'
- 'CORE_TICKERS=${CORE_TICKERS:-SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT}'
- 'SEED_YEARS=${SEED_YEARS:-5}'
- 'PARQUET_DIR=/data/parquet'
- 'TZ=America/New_York'
volumes:
- 'parquet-data:/data/parquet'
streamlit:
build:
context: .
dockerfile: Dockerfile
restart: unless-stopped
depends_on:
data-seed:
condition: service_completed_successfully
environment:
- SERVICE_URL_STREAMLIT_8501
- 'BUGSINK_DSN=${BUGSINK_DSN:-}'
- 'APP_ENV=${APP_ENV:-production}'
- 'APP_RELEASE=${APP_RELEASE:-quant-web@1.0.0}'
- 'CORE_TICKERS=${CORE_TICKERS:-SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT}'
- 'PARQUET_DIR=/data/parquet'
- 'STRATEGY_DB_PATH=/data/strategies/strategies.db'
- 'DEV_USER=${DEV_USER:-anonymous}'
volumes:
- 'parquet-data:/data/parquet'
- 'strategy-data:/data/strategies'
healthcheck:
test: ['CMD-SHELL', 'curl -sf http://127.0.0.1:8501/_stcore/health || exit 1']
interval: 30s
timeout: 5s
start_period: 30s
retries: 3
volumes:
parquet-data:
strategy-data:

8
requirements.txt Normal file
View file

@ -0,0 +1,8 @@
streamlit>=1.32.0
vectorbt>=0.26.0
yfinance>=0.2.36
pandas>=2.1.0
numpy>=1.26.0
pyarrow>=15.0.0
plotly>=5.18.0
sentry-sdk>=2.0.0

2
scripts/cron-sync Normal file
View file

@ -0,0 +1,2 @@
# m h dom mon dow command
0 17 * * 1-5 root /usr/local/bin/run-daily-sync.sh >> /var/log/quant-sync.log 2>&1

View file

@ -0,0 +1,16 @@
#!/bin/bash
set -euo pipefail
echo "Quant harvester starting (TZ=${TZ:-UTC})"
if [ ! -f "/data/parquet/.seed_complete" ]; then
echo "Running initial historical seed..."
python /app/sync.py --seed
touch /data/parquet/.seed_complete
echo "Seed complete."
else
echo "Seed marker found; skipping full history download."
fi
echo "Launching cron (weekdays 17:00 America/New_York)..."
exec cron -f

13
scripts/run-daily-sync.sh Normal file
View file

@ -0,0 +1,13 @@
#!/bin/bash
set -euo pipefail
export PARQUET_DIR="${PARQUET_DIR:-/data/parquet}"
export CORE_TICKERS="${CORE_TICKERS:-SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT}"
export SEED_YEARS="${SEED_YEARS:-5}"
export BUGSINK_DSN="${BUGSINK_DSN:-}"
export APP_ENV="${APP_ENV:-production}"
export APP_RELEASE="${APP_RELEASE:-quant-web@1.0.0}"
export TZ="${TZ:-America/New_York}"
cd /app
exec /usr/local/bin/python sync.py --daily

135
strategy_db.py Normal file
View file

@ -0,0 +1,135 @@
"""SQLite persistence for user-saved strategies."""
from __future__ import annotations
import json
import os
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
@dataclass(frozen=True)
class SavedStrategy:
id: int
username: str
name: str
ticker: str
params: dict[str, Any]
created_at: str
def _db_path() -> str:
return os.environ.get("STRATEGY_DB_PATH", "/data/strategies/strategies.db")
def init_db() -> None:
path = _db_path()
os.makedirs(os.path.dirname(path), exist_ok=True)
with _connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS strategies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
name TEXT NOT NULL,
ticker TEXT NOT NULL,
params_json TEXT NOT NULL,
created_at TEXT NOT NULL,
UNIQUE(username, name)
)
"""
)
conn.commit()
@contextmanager
def _connect():
conn = sqlite3.connect(_db_path())
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
def save_strategy(
username: str,
name: str,
ticker: str,
params: dict[str, Any],
) -> None:
created_at = datetime.now(timezone.utc).isoformat()
with _connect() as conn:
conn.execute(
"""
INSERT INTO strategies (username, name, ticker, params_json, created_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(username, name) DO UPDATE SET
ticker = excluded.ticker,
params_json = excluded.params_json,
created_at = excluded.created_at
""",
(username, name.strip(), ticker.upper(), json.dumps(params), created_at),
)
conn.commit()
def list_strategies(username: str) -> list[SavedStrategy]:
with _connect() as conn:
rows = conn.execute(
"""
SELECT id, username, name, ticker, params_json, created_at
FROM strategies
WHERE username = ?
ORDER BY created_at DESC
""",
(username,),
).fetchall()
return [
SavedStrategy(
id=row["id"],
username=row["username"],
name=row["name"],
ticker=row["ticker"],
params=json.loads(row["params_json"]),
created_at=row["created_at"],
)
for row in rows
]
def load_strategy(username: str, name: str) -> SavedStrategy | None:
with _connect() as conn:
row = conn.execute(
"""
SELECT id, username, name, ticker, params_json, created_at
FROM strategies
WHERE username = ? AND name = ?
""",
(username, name),
).fetchone()
if row is None:
return None
return SavedStrategy(
id=row["id"],
username=row["username"],
name=row["name"],
ticker=row["ticker"],
params=json.loads(row["params_json"]),
created_at=row["created_at"],
)
def delete_strategy(username: str, name: str) -> None:
with _connect() as conn:
conn.execute(
"DELETE FROM strategies WHERE username = ? AND name = ?",
(username, name),
)
conn.commit()

167
sync.py Normal file
View file

@ -0,0 +1,167 @@
#!/usr/bin/env python3
"""Download Yahoo Finance OHLCV data and persist as Parquet files."""
from __future__ import annotations
import argparse
import logging
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
import pandas as pd
import yfinance as yf
from telemetry import capture_exception, init_telemetry
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger("sync")
ET = ZoneInfo("America/New_York")
def parquet_dir() -> Path:
path = Path(os.environ.get("PARQUET_DIR", "/data/parquet"))
path.mkdir(parents=True, exist_ok=True)
return path
def core_tickers() -> list[str]:
raw = os.environ.get(
"CORE_TICKERS",
"SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT",
)
return [t.strip().upper() for t in raw.split(",") if t.strip()]
def seed_years() -> int:
return int(os.environ.get("SEED_YEARS", "5"))
def parquet_path(ticker: str) -> Path:
return parquet_dir() / f"{ticker.upper()}.parquet"
def normalize_frame(df: pd.DataFrame) -> pd.DataFrame:
if isinstance(df.columns, pd.MultiIndex):
df.columns = [col[0] if isinstance(col, tuple) else col for col in df.columns]
df = df.reset_index()
if "Datetime" in df.columns and "Date" not in df.columns:
df = df.rename(columns={"Datetime": "Date"})
if "Date" not in df.columns and "index" in df.columns:
df = df.rename(columns={"index": "Date"})
keep = [c for c in ["Date", "Open", "High", "Low", "Close", "Volume"] if c in df.columns]
df = df[keep].copy()
df["Date"] = pd.to_datetime(df["Date"]).dt.tz_localize(None)
df = df.drop_duplicates(subset=["Date"], keep="last").sort_values("Date")
return df
def download_history(ticker: str, start: datetime, end: datetime | None = None) -> pd.DataFrame:
end = end or datetime.now(tz=ET)
raw = yf.download(
ticker,
start=start.date(),
end=(end + timedelta(days=1)).date(),
auto_adjust=True,
progress=False,
threads=False,
)
if raw.empty:
raise RuntimeError(f"No data returned for {ticker}")
return normalize_frame(raw)
def save_ticker(ticker: str, frame: pd.DataFrame) -> None:
path = parquet_path(ticker)
frame.to_parquet(path, index=False)
logger.info("Wrote %s rows to %s", len(frame), path)
def seed_all() -> int:
tickers = core_tickers()
start = datetime.now(tz=ET) - timedelta(days=365 * seed_years())
failures = 0
for ticker in tickers:
try:
frame = download_history(ticker, start=start)
save_ticker(ticker, frame)
except Exception as exc:
failures += 1
logger.exception("Seed failed for %s", ticker)
capture_exception(exc)
return failures
def append_daily(ticker: str) -> None:
path = parquet_path(ticker)
if path.exists():
existing = pd.read_parquet(path)
existing["Date"] = pd.to_datetime(existing["Date"])
last_date = existing["Date"].max()
start = last_date.to_pydatetime()
else:
start = datetime.now(tz=ET) - timedelta(days=30)
existing = pd.DataFrame()
new_frame = download_history(ticker, start=start - timedelta(days=5))
if existing.empty:
merged = new_frame
else:
merged = (
pd.concat([existing, new_frame], ignore_index=True)
.drop_duplicates(subset=["Date"], keep="last")
.sort_values("Date")
)
save_ticker(ticker, merged)
def sync_daily() -> int:
failures = 0
for ticker in core_tickers():
try:
append_daily(ticker)
except Exception as exc:
failures += 1
logger.exception("Daily sync failed for %s", ticker)
capture_exception(exc)
return failures
def main(argv: list[str] | None = None) -> int:
init_telemetry("quant-harvester")
parser = argparse.ArgumentParser(description="Yahoo Finance -> Parquet sync")
parser.add_argument("--seed", action="store_true", help="Download full history")
parser.add_argument("--daily", action="store_true", help="Append latest daily bars")
args = parser.parse_args(argv)
if not args.seed and not args.daily:
parser.error("Specify --seed and/or --daily")
exit_code = 0
if args.seed:
logger.info("Starting historical seed for %s", ", ".join(core_tickers()))
if seed_all() > 0:
exit_code = 1
if args.daily:
logger.info("Starting daily append for %s", ", ".join(core_tickers()))
if sync_daily() > 0:
exit_code = 1
return exit_code
if __name__ == "__main__":
sys.exit(main())

43
telemetry.py Normal file
View file

@ -0,0 +1,43 @@
"""Bugsink telemetry via Sentry-compatible SDK."""
from __future__ import annotations
import logging
import os
_initialized = False
def init_telemetry(service_name: str) -> None:
global _initialized
if _initialized:
return
dsn = os.environ.get("BUGSINK_DSN", "").strip()
if not dsn:
logging.getLogger(__name__).info(
"BUGSINK_DSN not set; telemetry disabled for %s", service_name
)
_initialized = True
return
import sentry_sdk
sentry_sdk.init(
dsn=dsn,
environment=os.environ.get("APP_ENV", "production"),
release=os.environ.get("APP_RELEASE", "quant-web@1.0.0"),
traces_sample_rate=0,
send_client_reports=False,
auto_session_tracking=False,
)
sentry_sdk.set_tag("service", service_name)
_initialized = True
def capture_exception(exc: BaseException) -> None:
if not os.environ.get("BUGSINK_DSN", "").strip():
return
import sentry_sdk
sentry_sdk.capture_exception(exc)