commit b5db15d6abce97544125505b2868f55357b19b44 Author: epistemophiliac Date: Fri Jun 19 00:46:51 2026 -0400 Initial QuantTrade stack for Coolify deployment. Streamlit + VectorBT dashboard, Parquet harvester with nightly cron, Authentik header auth, SQLite strategy persistence, and Bugsink telemetry. Co-authored-by: Cursor diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..9a2b5e5 --- /dev/null +++ b/.env.example @@ -0,0 +1,11 @@ +# Bugsink (Sentry-compatible DSN from bugsink.aexoradao.com project settings) +BUGSINK_DSN= + +# Local dev only — simulates Authentik proxy header when not behind auth +DEV_USER=dev@local + +# Core tickers (comma-separated) +CORE_TICKERS=SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT + +# Historical seed window (years) +SEED_YEARS=5 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7a3cf72 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +.env +__pycache__/ +*.py[cod] +*.egg-info/ +.venv/ +venv/ +data/ +*.db +.DS_Store +.streamlit/secrets.toml diff --git a/.streamlit/config.toml b/.streamlit/config.toml new file mode 100644 index 0000000..a6a14a6 --- /dev/null +++ b/.streamlit/config.toml @@ -0,0 +1,7 @@ +[server] +headless = true +enableCORS = false +enableXsrfProtection = false + +[browser] +gatherUsageStats = false diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..aeaa7a9 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,31 @@ +FROM python:3.11-slim-bookworm + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + PIP_NO_CACHE_DIR=1 \ + PARQUET_DIR=/data/parquet \ + STRATEGY_DB_PATH=/data/strategies/strategies.db + +WORKDIR /app + +RUN apt-get update \ + && apt-get install -y --no-install-recommends bash curl \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY telemetry.py auth.py strategy_db.py backtest.py app.py sync.py ./ +COPY .streamlit /app/.streamlit + +EXPOSE 8501 + +HEALTHCHECK --interval=30s --timeout=5s --start-period=20s --retries=3 \ + CMD curl -sf http://127.0.0.1:8501/_stcore/health || exit 1 + +CMD ["streamlit", "run", "app.py", \ + "--server.port=8501", \ + "--server.address=0.0.0.0", \ + "--browser.gatherUsageStats=false", \ + "--server.enableCORS=false", \ + "--server.enableXsrfProtection=false"] diff --git a/Dockerfile.harvester b/Dockerfile.harvester new file mode 100644 index 0000000..14a1726 --- /dev/null +++ b/Dockerfile.harvester @@ -0,0 +1,27 @@ +FROM python:3.11-slim-bookworm + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + PIP_NO_CACHE_DIR=1 \ + PARQUET_DIR=/data/parquet \ + TZ=America/New_York + +WORKDIR /app + +RUN apt-get update \ + && apt-get install -y --no-install-recommends bash cron curl \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY telemetry.py sync.py ./ +COPY scripts/harvester-entrypoint.sh /usr/local/bin/harvester-entrypoint.sh +COPY scripts/run-daily-sync.sh /usr/local/bin/run-daily-sync.sh +COPY scripts/cron-sync /etc/cron.d/quant-sync + +RUN chmod +x /usr/local/bin/harvester-entrypoint.sh /usr/local/bin/run-daily-sync.sh \ + && chmod 0644 /etc/cron.d/quant-sync \ + && crontab /etc/cron.d/quant-sync + +ENTRYPOINT ["/usr/local/bin/harvester-entrypoint.sh"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..2c83390 --- /dev/null +++ b/README.md @@ -0,0 +1,90 @@ +# QuantTrade + +Local-first quantitative backtesting on Coolify: Streamlit UI, VectorBT engine, Parquet market data, nightly Yahoo Finance sync, Authentik OIDC via reverse proxy, and SQLite strategy persistence. + +## Architecture + +| Layer | Technology | +|-------|------------| +| Auth | Authentik (`auth.aexoradao.com`) via reverse proxy headers | +| UI | Streamlit (`streamlit` service, port 8501) | +| Engine | VectorBT + NumPy | +| Market data | Parquet volume (`parquet-data`) | +| Ingestion | `harvester` cron @ 17:00 America/New_York (weekdays) | +| Strategies | SQLite on `strategy-data` volume, keyed by proxy username | +| Telemetry | Bugsink via `sentry-sdk` (`bugsink.aexoradao.com`) | + +## Coolify deployment + +1. Create a **Docker Compose** resource pointing at this repo. +2. Assign your public domain to the **`streamlit`** service on port **8501**. +3. Set environment variables in Coolify (first deploy extracts defaults from compose): + - `BUGSINK_DSN` — DSN from your Bugsink project + - `CORE_TICKERS` — optional comma-separated tickers + - `DEV_USER` — only for unauthenticated local testing +4. Protect the domain in your reverse proxy with Authentik forward auth. + +### Authentik / proxy headers + +After login, the proxy must forward one of these headers to Streamlit: + +- `X-Forwarded-User` (recommended) +- `X-Authentik-Username` +- `Remote-User` + +The app reads them via Streamlit websocket headers (`auth.get_current_user()`). Saved strategies are scoped to that username. + +Example Traefik middleware (adjust provider labels to your stack): + +```yaml +# Forward auth endpoint on Authentik +http: + middlewares: + authentik: + forwardAuth: + address: https://auth.aexoradao.com/outpost.goauthentik.io/auth/traefik + trustForwardHeader: true + authResponseHeaders: + - X-authentik-username + - X-authentik-uid + - X-Forwarded-User +``` + +Map `X-authentik-username` → `X-Forwarded-User` in your proxy if Streamlit only sees the latter. + +## Services + +- **`data-seed`** — one-shot 5-year historical download into Parquet (idempotent). +- **`harvester`** — cron container; appends daily bars after US cash close. +- **`streamlit`** — dashboard, backtests, save/load strategies. + +## Local development + +```bash +cp .env.example .env +python -m venv .venv && source .venv/bin/activate +pip install -r requirements.txt +python sync.py --seed +DEV_USER=you@example.com streamlit run app.py +``` + +## Manual sync + +```bash +python sync.py --seed # full history +python sync.py --daily # append latest bars +``` + +## Strategy storage + +SQLite path: `/data/strategies/strategies.db` (Docker volume `strategy-data`). Each row stores username, name, ticker, and JSON parameters (MA windows, cash, fees). + +## Bugsink + +Set `BUGSINK_DSN` to your project DSN, e.g.: + +```text +http://@bugsink.aexoradao.com/ +``` + +Both `app.py` and `sync.py` initialize the Sentry-compatible SDK with tracing disabled for Bugsink compatibility. diff --git a/app.py b/app.py new file mode 100644 index 0000000..3011983 --- /dev/null +++ b/app.py @@ -0,0 +1,179 @@ +"""QuantTrade Streamlit dashboard.""" + +from __future__ import annotations + +import os + +import pandas as pd +import plotly.graph_objects as go +import streamlit as st +from plotly.subplots import make_subplots + +from auth import get_current_user +from backtest import load_ohlcv, run_ma_crossover +from strategy_db import delete_strategy, init_db, list_strategies, load_strategy, save_strategy +from telemetry import capture_exception, init_telemetry + +init_telemetry("quant-streamlit") +init_db() + +st.set_page_config( + page_title="QuantTrade", + page_icon="📈", + layout="wide", +) + +DEFAULT_TICKERS = os.environ.get( + "CORE_TICKERS", + "SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT", +).split(",") + + +def render_equity_chart(result) -> None: + fig = make_subplots( + rows=2, + cols=1, + shared_xaxes=True, + vertical_spacing=0.08, + row_heights=[0.65, 0.35], + subplot_titles=(f"{result.ticker} Price", "Strategy Equity"), + ) + + fig.add_trace( + go.Scatter(x=result.price.index, y=result.price.values, name="Close", line=dict(color="#60a5fa")), + row=1, + col=1, + ) + fig.add_trace( + go.Scatter( + x=result.equity_curve.index, + y=result.equity_curve.values, + name="Equity", + line=dict(color="#34d399"), + ), + row=2, + col=1, + ) + fig.update_layout(height=640, template="plotly_dark", margin=dict(l=20, r=20, t=40, b=20)) + st.plotly_chart(fig, use_container_width=True) + + +def main() -> None: + user = get_current_user() + + st.title("QuantTrade") + st.caption("VectorBT backtests on local Parquet market data") + + with st.sidebar: + st.subheader("Account") + st.write(f"Signed in as **{user}**") + + st.divider() + st.subheader("Strategy") + ticker = st.selectbox( + "Ticker", + options=[t.strip().upper() for t in DEFAULT_TICKERS if t.strip()], + index=0, + ) + fast_window = st.slider("Fast MA", min_value=5, max_value=100, value=20, step=1) + slow_window = st.slider("Slow MA", min_value=20, max_value=250, value=50, step=1) + init_cash = st.number_input("Initial cash", min_value=1000.0, value=10_000.0, step=1000.0) + fees = st.number_input("Fees (fraction)", min_value=0.0, max_value=0.05, value=0.001, step=0.0005) + + run_clicked = st.button("Run Backtest", type="primary", use_container_width=True) + + st.divider() + st.subheader("Saved Strategies") + saved = list_strategies(user) + saved_names = [s.name for s in saved] + selected_name = st.selectbox("Load strategy", options=["—"] + saved_names) + + strategy_name = st.text_input("Strategy name", placeholder="My SPY crossover") + col_save, col_delete = st.columns(2) + with col_save: + save_clicked = st.button("Save Strategy", use_container_width=True) + with col_delete: + delete_clicked = st.button("Delete", use_container_width=True) + + params = { + "fast_window": fast_window, + "slow_window": slow_window, + "init_cash": init_cash, + "fees": fees, + } + + if save_clicked: + if not strategy_name.strip(): + st.sidebar.error("Enter a strategy name before saving.") + else: + save_strategy(user, strategy_name.strip(), ticker, params) + st.sidebar.success(f"Saved '{strategy_name.strip()}'.") + st.rerun() + + if delete_clicked and selected_name != "—": + delete_strategy(user, selected_name) + st.sidebar.success(f"Deleted '{selected_name}'.") + st.rerun() + + active_ticker = ticker + active_params = dict(params) + + if selected_name != "—": + loaded = load_strategy(user, selected_name) + if loaded: + active_ticker = loaded.ticker + active_params.update(loaded.params) + st.info(f"Loaded strategy **{loaded.name}** ({loaded.ticker}). Adjust sliders or run.") + + if run_clicked or selected_name != "—": + try: + load_ohlcv(active_ticker) + result = run_ma_crossover( + ticker=active_ticker, + fast_window=int(active_params["fast_window"]), + slow_window=int(active_params["slow_window"]), + init_cash=float(active_params.get("init_cash", init_cash)), + fees=float(active_params.get("fees", fees)), + ) + + c1, c2, c3, c4 = st.columns(4) + c1.metric("Sharpe Ratio", f"{result.sharpe_ratio:.2f}") + c2.metric("Max Drawdown", f"{result.max_drawdown:.1%}") + c3.metric("Total Return", f"{result.total_return:.1%}") + c4.metric("Bars", f"{len(result.price):,}") + + render_equity_chart(result) + + with st.expander("Raw stats"): + st.write( + pd.DataFrame( + { + "Metric": ["Ticker", "Fast MA", "Slow MA", "Sharpe", "Max DD", "Return"], + "Value": [ + result.ticker, + result.fast_window, + result.slow_window, + result.sharpe_ratio, + result.max_drawdown, + result.total_return, + ], + } + ) + ) + except FileNotFoundError: + st.warning( + f"No Parquet data for **{active_ticker}** yet. " + "Wait for the harvester seed job or check container logs." + ) + except ValueError as exc: + st.error(str(exc)) + except Exception as exc: + capture_exception(exc) + st.error("Backtest failed. The error was reported to Bugsink.") + st.exception(exc) + else: + st.info("Configure parameters in the sidebar and click **Run Backtest**.") + + +if __name__ == "__main__": + main() diff --git a/auth.py b/auth.py new file mode 100644 index 0000000..e7f297f --- /dev/null +++ b/auth.py @@ -0,0 +1,45 @@ +"""Read Authentik / reverse-proxy identity headers in Streamlit.""" + +from __future__ import annotations + +import os +from typing import Mapping + +HEADER_CANDIDATES = ( + "X-Forwarded-User", + "X-Authentik-Username", + "X-Authentik-Uid", + "Remote-User", + "X-Forwarded-Email", +) + + +def _normalize(value: str | None) -> str | None: + if not value: + return None + cleaned = value.strip() + return cleaned or None + + +def username_from_headers(headers: Mapping[str, str]) -> str | None: + lowered = {k.lower(): v for k, v in headers.items()} + for name in HEADER_CANDIDATES: + value = _normalize(lowered.get(name.lower())) + if value: + return value + return None + + +def get_current_user() -> str: + """Return the authenticated username from proxy-injected headers.""" + try: + from streamlit.web.server.websocket_headers import _get_websocket_headers + + headers = _get_websocket_headers() or {} + user = username_from_headers(headers) + if user: + return user + except Exception: + pass + + return os.environ.get("DEV_USER", "anonymous") diff --git a/backtest.py b/backtest.py new file mode 100644 index 0000000..ecd0f60 --- /dev/null +++ b/backtest.py @@ -0,0 +1,88 @@ +"""VectorBT backtest engine reading local Parquet OHLCV data.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + +import pandas as pd +import vectorbt as vbt + + +@dataclass(frozen=True) +class BacktestResult: + ticker: str + fast_window: int + slow_window: int + sharpe_ratio: float + max_drawdown: float + total_return: float + equity_curve: pd.Series + price: pd.Series + + +def parquet_dir() -> Path: + return Path(os.environ.get("PARQUET_DIR", "/data/parquet")) + + +def load_ohlcv(ticker: str) -> pd.DataFrame: + path = parquet_dir() / f"{ticker.upper()}.parquet" + if not path.exists(): + raise FileNotFoundError(f"No Parquet file for {ticker.upper()} at {path}") + + df = pd.read_parquet(path) + if "Date" in df.columns: + df = df.set_index("Date") + df.index = pd.to_datetime(df.index) + df = df.sort_index() + return df + + +def run_ma_crossover( + ticker: str, + fast_window: int, + slow_window: int, + init_cash: float = 10_000.0, + fees: float = 0.001, +) -> BacktestResult: + if fast_window >= slow_window: + raise ValueError("Fast MA window must be smaller than slow MA window") + + ohlcv = load_ohlcv(ticker) + close = ohlcv["Close"].astype(float) + + fast_ma = vbt.MA.run(close, fast_window, short_name="fast") + slow_ma = vbt.MA.run(close, slow_window, short_name="slow") + + entries = fast_ma.ma_crossed_above(slow_ma) + exits = fast_ma.ma_crossed_below(slow_ma) + + portfolio = vbt.Portfolio.from_signals( + close, + entries=entries, + exits=exits, + init_cash=init_cash, + fees=fees, + freq="1D", + ) + + stats = portfolio.stats() + sharpe = float(stats.get("Sharpe Ratio", 0.0) or 0.0) + max_dd = float(stats.get("Max Drawdown [%]", 0.0) or 0.0) / 100.0 + total_return = float(stats.get("Total Return [%]", 0.0) or 0.0) / 100.0 + + equity = portfolio.value() + if isinstance(equity, pd.DataFrame): + equity = equity.iloc[:, 0] + + return BacktestResult( + ticker=ticker.upper(), + fast_window=fast_window, + slow_window=slow_window, + sharpe_ratio=sharpe, + max_drawdown=max_dd, + total_return=total_return, + equity_curve=equity, + price=close, + ) diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..07592e5 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,74 @@ +services: + data-seed: + build: + context: . + dockerfile: Dockerfile.harvester + exclude_from_hc: true + restart: "no" + environment: + - 'BUGSINK_DSN=${BUGSINK_DSN:-}' + - 'APP_ENV=${APP_ENV:-production}' + - 'APP_RELEASE=${APP_RELEASE:-quant-web@1.0.0}' + - 'CORE_TICKERS=${CORE_TICKERS:-SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT}' + - 'SEED_YEARS=${SEED_YEARS:-5}' + - 'PARQUET_DIR=/data/parquet' + - 'TZ=America/New_York' + volumes: + - 'parquet-data:/data/parquet' + entrypoint: ["bash", "-c"] + command: + - > + if [ -f /data/parquet/.seed_complete ]; then echo "Parquet seed already present"; exit 0; fi; + python /app/sync.py --seed; + touch /data/parquet/.seed_complete; + echo "Seed finished"; + + harvester: + build: + context: . + dockerfile: Dockerfile.harvester + restart: unless-stopped + depends_on: + data-seed: + condition: service_completed_successfully + environment: + - 'BUGSINK_DSN=${BUGSINK_DSN:-}' + - 'APP_ENV=${APP_ENV:-production}' + - 'APP_RELEASE=${APP_RELEASE:-quant-web@1.0.0}' + - 'CORE_TICKERS=${CORE_TICKERS:-SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT}' + - 'SEED_YEARS=${SEED_YEARS:-5}' + - 'PARQUET_DIR=/data/parquet' + - 'TZ=America/New_York' + volumes: + - 'parquet-data:/data/parquet' + + streamlit: + build: + context: . + dockerfile: Dockerfile + restart: unless-stopped + depends_on: + data-seed: + condition: service_completed_successfully + environment: + - SERVICE_URL_STREAMLIT_8501 + - 'BUGSINK_DSN=${BUGSINK_DSN:-}' + - 'APP_ENV=${APP_ENV:-production}' + - 'APP_RELEASE=${APP_RELEASE:-quant-web@1.0.0}' + - 'CORE_TICKERS=${CORE_TICKERS:-SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT}' + - 'PARQUET_DIR=/data/parquet' + - 'STRATEGY_DB_PATH=/data/strategies/strategies.db' + - 'DEV_USER=${DEV_USER:-anonymous}' + volumes: + - 'parquet-data:/data/parquet' + - 'strategy-data:/data/strategies' + healthcheck: + test: ['CMD-SHELL', 'curl -sf http://127.0.0.1:8501/_stcore/health || exit 1'] + interval: 30s + timeout: 5s + start_period: 30s + retries: 3 + +volumes: + parquet-data: + strategy-data: diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5fc12bf --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +streamlit>=1.32.0 +vectorbt>=0.26.0 +yfinance>=0.2.36 +pandas>=2.1.0 +numpy>=1.26.0 +pyarrow>=15.0.0 +plotly>=5.18.0 +sentry-sdk>=2.0.0 diff --git a/scripts/cron-sync b/scripts/cron-sync new file mode 100644 index 0000000..18bab25 --- /dev/null +++ b/scripts/cron-sync @@ -0,0 +1,2 @@ +# m h dom mon dow command +0 17 * * 1-5 root /usr/local/bin/run-daily-sync.sh >> /var/log/quant-sync.log 2>&1 diff --git a/scripts/harvester-entrypoint.sh b/scripts/harvester-entrypoint.sh new file mode 100644 index 0000000..1c6038c --- /dev/null +++ b/scripts/harvester-entrypoint.sh @@ -0,0 +1,16 @@ +#!/bin/bash +set -euo pipefail + +echo "Quant harvester starting (TZ=${TZ:-UTC})" + +if [ ! -f "/data/parquet/.seed_complete" ]; then + echo "Running initial historical seed..." + python /app/sync.py --seed + touch /data/parquet/.seed_complete + echo "Seed complete." +else + echo "Seed marker found; skipping full history download." +fi + +echo "Launching cron (weekdays 17:00 America/New_York)..." +exec cron -f diff --git a/scripts/run-daily-sync.sh b/scripts/run-daily-sync.sh new file mode 100644 index 0000000..5c73d5c --- /dev/null +++ b/scripts/run-daily-sync.sh @@ -0,0 +1,13 @@ +#!/bin/bash +set -euo pipefail + +export PARQUET_DIR="${PARQUET_DIR:-/data/parquet}" +export CORE_TICKERS="${CORE_TICKERS:-SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT}" +export SEED_YEARS="${SEED_YEARS:-5}" +export BUGSINK_DSN="${BUGSINK_DSN:-}" +export APP_ENV="${APP_ENV:-production}" +export APP_RELEASE="${APP_RELEASE:-quant-web@1.0.0}" +export TZ="${TZ:-America/New_York}" + +cd /app +exec /usr/local/bin/python sync.py --daily diff --git a/strategy_db.py b/strategy_db.py new file mode 100644 index 0000000..075ab9d --- /dev/null +++ b/strategy_db.py @@ -0,0 +1,135 @@ +"""SQLite persistence for user-saved strategies.""" + +from __future__ import annotations + +import json +import os +import sqlite3 +from contextlib import contextmanager +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any + + +@dataclass(frozen=True) +class SavedStrategy: + id: int + username: str + name: str + ticker: str + params: dict[str, Any] + created_at: str + + +def _db_path() -> str: + return os.environ.get("STRATEGY_DB_PATH", "/data/strategies/strategies.db") + + +def init_db() -> None: + path = _db_path() + os.makedirs(os.path.dirname(path), exist_ok=True) + with _connect() as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS strategies ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + username TEXT NOT NULL, + name TEXT NOT NULL, + ticker TEXT NOT NULL, + params_json TEXT NOT NULL, + created_at TEXT NOT NULL, + UNIQUE(username, name) + ) + """ + ) + conn.commit() + + +@contextmanager +def _connect(): + conn = sqlite3.connect(_db_path()) + conn.row_factory = sqlite3.Row + try: + yield conn + finally: + conn.close() + + +def save_strategy( + username: str, + name: str, + ticker: str, + params: dict[str, Any], +) -> None: + created_at = datetime.now(timezone.utc).isoformat() + with _connect() as conn: + conn.execute( + """ + INSERT INTO strategies (username, name, ticker, params_json, created_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(username, name) DO UPDATE SET + ticker = excluded.ticker, + params_json = excluded.params_json, + created_at = excluded.created_at + """, + (username, name.strip(), ticker.upper(), json.dumps(params), created_at), + ) + conn.commit() + + +def list_strategies(username: str) -> list[SavedStrategy]: + with _connect() as conn: + rows = conn.execute( + """ + SELECT id, username, name, ticker, params_json, created_at + FROM strategies + WHERE username = ? + ORDER BY created_at DESC + """, + (username,), + ).fetchall() + + return [ + SavedStrategy( + id=row["id"], + username=row["username"], + name=row["name"], + ticker=row["ticker"], + params=json.loads(row["params_json"]), + created_at=row["created_at"], + ) + for row in rows + ] + + +def load_strategy(username: str, name: str) -> SavedStrategy | None: + with _connect() as conn: + row = conn.execute( + """ + SELECT id, username, name, ticker, params_json, created_at + FROM strategies + WHERE username = ? AND name = ? + """, + (username, name), + ).fetchone() + + if row is None: + return None + + return SavedStrategy( + id=row["id"], + username=row["username"], + name=row["name"], + ticker=row["ticker"], + params=json.loads(row["params_json"]), + created_at=row["created_at"], + ) + + +def delete_strategy(username: str, name: str) -> None: + with _connect() as conn: + conn.execute( + "DELETE FROM strategies WHERE username = ? AND name = ?", + (username, name), + ) + conn.commit() diff --git a/sync.py b/sync.py new file mode 100644 index 0000000..6ca30f8 --- /dev/null +++ b/sync.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +"""Download Yahoo Finance OHLCV data and persist as Parquet files.""" + +from __future__ import annotations + +import argparse +import logging +import os +import sys +from datetime import datetime, timedelta +from pathlib import Path +from zoneinfo import ZoneInfo + +import pandas as pd +import yfinance as yf + +from telemetry import capture_exception, init_telemetry + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", +) +logger = logging.getLogger("sync") + +ET = ZoneInfo("America/New_York") + + +def parquet_dir() -> Path: + path = Path(os.environ.get("PARQUET_DIR", "/data/parquet")) + path.mkdir(parents=True, exist_ok=True) + return path + + +def core_tickers() -> list[str]: + raw = os.environ.get( + "CORE_TICKERS", + "SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT", + ) + return [t.strip().upper() for t in raw.split(",") if t.strip()] + + +def seed_years() -> int: + return int(os.environ.get("SEED_YEARS", "5")) + + +def parquet_path(ticker: str) -> Path: + return parquet_dir() / f"{ticker.upper()}.parquet" + + +def normalize_frame(df: pd.DataFrame) -> pd.DataFrame: + if isinstance(df.columns, pd.MultiIndex): + df.columns = [col[0] if isinstance(col, tuple) else col for col in df.columns] + + df = df.reset_index() + if "Datetime" in df.columns and "Date" not in df.columns: + df = df.rename(columns={"Datetime": "Date"}) + if "Date" not in df.columns and "index" in df.columns: + df = df.rename(columns={"index": "Date"}) + + keep = [c for c in ["Date", "Open", "High", "Low", "Close", "Volume"] if c in df.columns] + df = df[keep].copy() + df["Date"] = pd.to_datetime(df["Date"]).dt.tz_localize(None) + df = df.drop_duplicates(subset=["Date"], keep="last").sort_values("Date") + return df + + +def download_history(ticker: str, start: datetime, end: datetime | None = None) -> pd.DataFrame: + end = end or datetime.now(tz=ET) + raw = yf.download( + ticker, + start=start.date(), + end=(end + timedelta(days=1)).date(), + auto_adjust=True, + progress=False, + threads=False, + ) + if raw.empty: + raise RuntimeError(f"No data returned for {ticker}") + return normalize_frame(raw) + + +def save_ticker(ticker: str, frame: pd.DataFrame) -> None: + path = parquet_path(ticker) + frame.to_parquet(path, index=False) + logger.info("Wrote %s rows to %s", len(frame), path) + + +def seed_all() -> int: + tickers = core_tickers() + start = datetime.now(tz=ET) - timedelta(days=365 * seed_years()) + failures = 0 + + for ticker in tickers: + try: + frame = download_history(ticker, start=start) + save_ticker(ticker, frame) + except Exception as exc: + failures += 1 + logger.exception("Seed failed for %s", ticker) + capture_exception(exc) + + return failures + + +def append_daily(ticker: str) -> None: + path = parquet_path(ticker) + if path.exists(): + existing = pd.read_parquet(path) + existing["Date"] = pd.to_datetime(existing["Date"]) + last_date = existing["Date"].max() + start = last_date.to_pydatetime() + else: + start = datetime.now(tz=ET) - timedelta(days=30) + existing = pd.DataFrame() + + new_frame = download_history(ticker, start=start - timedelta(days=5)) + if existing.empty: + merged = new_frame + else: + merged = ( + pd.concat([existing, new_frame], ignore_index=True) + .drop_duplicates(subset=["Date"], keep="last") + .sort_values("Date") + ) + + save_ticker(ticker, merged) + + +def sync_daily() -> int: + failures = 0 + for ticker in core_tickers(): + try: + append_daily(ticker) + except Exception as exc: + failures += 1 + logger.exception("Daily sync failed for %s", ticker) + capture_exception(exc) + return failures + + +def main(argv: list[str] | None = None) -> int: + init_telemetry("quant-harvester") + + parser = argparse.ArgumentParser(description="Yahoo Finance -> Parquet sync") + parser.add_argument("--seed", action="store_true", help="Download full history") + parser.add_argument("--daily", action="store_true", help="Append latest daily bars") + args = parser.parse_args(argv) + + if not args.seed and not args.daily: + parser.error("Specify --seed and/or --daily") + + exit_code = 0 + if args.seed: + logger.info("Starting historical seed for %s", ", ".join(core_tickers())) + if seed_all() > 0: + exit_code = 1 + + if args.daily: + logger.info("Starting daily append for %s", ", ".join(core_tickers())) + if sync_daily() > 0: + exit_code = 1 + + return exit_code + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/telemetry.py b/telemetry.py new file mode 100644 index 0000000..a75a6ce --- /dev/null +++ b/telemetry.py @@ -0,0 +1,43 @@ +"""Bugsink telemetry via Sentry-compatible SDK.""" + +from __future__ import annotations + +import logging +import os + +_initialized = False + + +def init_telemetry(service_name: str) -> None: + global _initialized + if _initialized: + return + + dsn = os.environ.get("BUGSINK_DSN", "").strip() + if not dsn: + logging.getLogger(__name__).info( + "BUGSINK_DSN not set; telemetry disabled for %s", service_name + ) + _initialized = True + return + + import sentry_sdk + + sentry_sdk.init( + dsn=dsn, + environment=os.environ.get("APP_ENV", "production"), + release=os.environ.get("APP_RELEASE", "quant-web@1.0.0"), + traces_sample_rate=0, + send_client_reports=False, + auto_session_tracking=False, + ) + sentry_sdk.set_tag("service", service_name) + _initialized = True + + +def capture_exception(exc: BaseException) -> None: + if not os.environ.get("BUGSINK_DSN", "").strip(): + return + import sentry_sdk + + sentry_sdk.capture_exception(exc)