Streamlit + VectorBT dashboard, Parquet harvester with nightly cron, Authentik header auth, SQLite strategy persistence, and Bugsink telemetry. Co-authored-by: Cursor <cursoragent@cursor.com>
167 lines
4.7 KiB
Python
167 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Download Yahoo Finance OHLCV data and persist as Parquet files."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from zoneinfo import ZoneInfo
|
|
|
|
import pandas as pd
|
|
import yfinance as yf
|
|
|
|
from telemetry import capture_exception, init_telemetry
|
|
|
|
# Root logging setup for this CLI: INFO and above, one timestamped line each.
_LOG_FORMAT = "%(asctime)s %(levelname)s %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Shared logger for every message emitted by this script.
logger = logging.getLogger("sync")

# US Eastern time zone; "now" timestamps that anchor the download windows
# below are taken in this zone.
ET = ZoneInfo("America/New_York")
|
|
|
|
|
|
def parquet_dir() -> Path:
    """Return the directory that holds the per-ticker Parquet files.

    The location is read from the ``PARQUET_DIR`` environment variable and
    falls back to ``/data/parquet``. The directory, including any missing
    parents, is created on every call so callers can write into it directly.
    """
    configured = os.getenv("PARQUET_DIR", "/data/parquet")
    target = Path(configured)
    target.mkdir(exist_ok=True, parents=True)
    return target
|
|
|
|
|
|
def core_tickers() -> list[str]:
    """Return the ticker universe to harvest, upper-cased, in configured order.

    Symbols come from the comma-separated ``CORE_TICKERS`` environment
    variable, or from a built-in default list when it is unset. Surrounding
    whitespace is stripped and empty entries are dropped.
    """
    default_universe = "SPY,QQQ,AAPL,MSFT,GOOGL,AMZN,NVDA,META,IWM,TLT"
    configured = os.getenv("CORE_TICKERS", default_universe)
    symbols: list[str] = []
    for piece in configured.split(","):
        symbol = piece.strip()
        if not symbol:
            continue
        symbols.append(symbol.upper())
    return symbols
|
|
|
|
|
|
def seed_years() -> int:
    """Return how many years of history ``--seed`` downloads.

    Read from the ``SEED_YEARS`` environment variable; defaults to 5.

    Raises:
        ValueError: if ``SEED_YEARS`` is set to text ``int()`` cannot parse.
    """
    years_text = os.getenv("SEED_YEARS", "5")
    return int(years_text)
|
|
|
|
|
|
def parquet_path(ticker: str) -> Path:
    """Return the Parquet file for *ticker*: ``<parquet_dir>/<TICKER>.parquet``.

    The symbol is upper-cased so lookups are case-insensitive. Because this
    goes through ``parquet_dir()``, the directory is created if missing.
    """
    filename = ticker.upper() + ".parquet"
    return parquet_dir().joinpath(filename)
|
|
|
|
|
|
def normalize_frame(df: pd.DataFrame) -> pd.DataFrame:
    """Reshape a raw ``yf.download`` result into a flat OHLCV table.

    Steps:
    * flatten MultiIndex columns by keeping the first element of each column
      tuple;
    * move the index into a ``Date`` column (renaming ``Datetime``, or an
      unnamed ``index`` column, to ``Date``);
    * keep only the ``Date``/``Open``/``High``/``Low``/``Close``/``Volume``
      columns that are present, in that order;
    * make ``Date`` timezone-naive (keeping wall-clock time), drop duplicate
      dates keeping the last occurrence, and sort ascending by date.

    The input frame is not modified.

    Args:
        df: Frame as returned by ``yf.download``.

    Returns:
        A new DataFrame with the columns described above.

    Raises:
        KeyError: if no ``Date`` column can be derived from the index.
    """
    if isinstance(df.columns, pd.MultiIndex):
        # set_axis returns a new frame; assigning to df.columns would rename
        # the caller's object in place.
        df = df.set_axis([col[0] if isinstance(col, tuple) else col for col in df.columns], axis=1)

    df = df.reset_index()
    if "Datetime" in df.columns and "Date" not in df.columns:
        df = df.rename(columns={"Datetime": "Date"})
    if "Date" not in df.columns and "index" in df.columns:
        df = df.rename(columns={"index": "Date"})

    keep = [c for c in ["Date", "Open", "High", "Low", "Close", "Volume"] if c in df.columns]
    df = df[keep].copy()
    # Drop any timezone but keep local wall-clock time, so stored dates compare
    # equal to the naive dates read back from Parquet.
    df["Date"] = pd.to_datetime(df["Date"]).dt.tz_localize(None)
    df = df.drop_duplicates(subset=["Date"], keep="last").sort_values("Date")
    return df
|
|
|
|
|
|
def download_history(ticker: str, start: datetime, end: datetime | None = None) -> pd.DataFrame:
    """Download bars for *ticker* from Yahoo Finance and normalize them.

    Data is requested with ``auto_adjust=True``, without a progress bar and
    without yfinance's thread pool.

    Args:
        ticker: Symbol to download.
        start: First date to request; only ``start.date()`` is sent.
        end: Last date of interest; defaults to "now" in US Eastern time.
            One day is added before the request, presumably so the *end* date
            itself is included (yfinance treats ``end`` as exclusive).

    Returns:
        The frame produced by ``normalize_frame``.

    Raises:
        RuntimeError: if yfinance returns an empty frame.
    """
    stop = end if end is not None else datetime.now(tz=ET)
    exclusive_stop = (stop + timedelta(days=1)).date()
    raw = yf.download(
        ticker,
        start=start.date(),
        end=exclusive_stop,
        auto_adjust=True,
        progress=False,
        threads=False,
    )
    if not raw.empty:
        return normalize_frame(raw)
    raise RuntimeError(f"No data returned for {ticker}")
|
|
|
|
|
|
def save_ticker(ticker: str, frame: pd.DataFrame) -> None:
    """Persist *frame* as the Parquet file for *ticker*, replacing any prior file.

    The frame is written first to a sibling ``<name>.parquet.tmp`` file. It is
    then moved into place with ``os.replace``, which is atomic on the same
    filesystem. A concurrent reader therefore never sees a half-written file,
    and a crash mid-write leaves the previous history intact.

    Args:
        ticker: Symbol whose file is written (see ``parquet_path``).
        frame: Data to store; its index is not written.

    Raises:
        Whatever ``DataFrame.to_parquet`` or ``os.replace`` raise. The
        temporary file is removed before the exception propagates.
    """
    path = parquet_path(ticker)
    # Same directory => same filesystem, so os.replace is an atomic rename.
    # The ".tmp" suffix keeps the partial file out of "*.parquet" globs.
    tmp_path = path.with_name(f"{path.name}.tmp")
    try:
        frame.to_parquet(tmp_path, index=False)
        os.replace(tmp_path, path)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise
    logger.info("Wrote %s rows to %s", len(frame), path)
|
|
|
|
|
|
def seed_all() -> int:
    """Download and store the full seed window for every core ticker.

    The window starts ``365 * seed_years()`` days before now (US Eastern).
    A failure for one ticker is counted, logged with its traceback and
    reported via ``capture_exception``; the remaining tickers still run.

    Returns:
        The number of tickers that failed.
    """
    symbols = core_tickers()
    window_start = datetime.now(tz=ET) - timedelta(days=365 * seed_years())
    failed = 0
    for symbol in symbols:
        try:
            save_ticker(symbol, download_history(symbol, start=window_start))
        except Exception as err:
            failed += 1
            logger.exception("Seed failed for %s", symbol)
            capture_exception(err)
    return failed
|
|
|
|
|
|
def _adjusted_history_changed(
    existing: pd.DataFrame,
    fresh: pd.DataFrame,
    rtol: float = 1e-5,
) -> bool:
    """Return True if stored and freshly downloaded ``Close`` values disagree.

    Only dates present in both frames are compared. The newest stored date is
    skipped because that bar may have been captured before it was final.
    Returns False when there is nothing to compare.
    """
    if "Close" not in existing.columns or "Close" not in fresh.columns:
        return False
    old = existing[["Date", "Close"]].copy()
    new = fresh[["Date", "Close"]].copy()
    # Align datetime resolution so the merge keys compare equal.
    old["Date"] = pd.to_datetime(old["Date"]).astype("datetime64[ns]")
    new["Date"] = pd.to_datetime(new["Date"]).astype("datetime64[ns]")
    old = old[old["Date"] < old["Date"].max()]
    shared = old.merge(new, on="Date", suffixes=("_old", "_new"))
    if shared.empty:
        return False
    reference = shared["Close_old"].abs().clip(lower=1e-12)
    rel_diff = (shared["Close_new"] - shared["Close_old"]).abs() / reference
    return bool((rel_diff > rtol).any())


def append_daily(ticker: str) -> None:
    """Bring the stored Parquet history for *ticker* up to date.

    Bars are re-downloaded from five days before the last stored date and
    merged in, with downloaded values winning on duplicate dates. If no usable
    history exists (no file, or a file with no dated rows), only about the
    last 35 days are fetched.

    ``download_history`` requests ``auto_adjust=True``, and per the yfinance
    docs, adjusted prices on past dates can be restated after splits and
    dividends. Appending after such a restatement would splice two adjustment
    bases together. So when the re-downloaded overlap disagrees with what is
    stored, the whole history is re-downloaded from the first stored date
    instead.

    Raises:
        RuntimeError: propagated from ``download_history`` when no rows come back.
    """
    path = parquet_path(ticker)
    existing = pd.read_parquet(path) if path.exists() else pd.DataFrame()
    if not existing.empty:
        existing["Date"] = pd.to_datetime(existing["Date"])
    last_date = existing["Date"].max() if not existing.empty else pd.NaT

    if pd.isna(last_date):
        # Nothing usable on disk (an empty file would yield NaT): start a
        # fresh, short history instead of crashing on NaT arithmetic.
        existing = pd.DataFrame()
        start = datetime.now(tz=ET) - timedelta(days=30)
    else:
        start = last_date.to_pydatetime()

    # Re-fetch a few already-stored days so late revisions overwrite them.
    new_frame = download_history(ticker, start=start - timedelta(days=5))

    if existing.empty:
        save_ticker(ticker, new_frame)
        return

    if _adjusted_history_changed(existing, new_frame):
        first_date = existing["Date"].min().to_pydatetime()
        logger.warning(
            "Adjusted prices for %s changed on stored dates; re-downloading from %s",
            ticker,
            first_date.date(),
        )
        save_ticker(ticker, download_history(ticker, start=first_date))
        return

    merged = (
        pd.concat([existing, new_frame], ignore_index=True)
        .drop_duplicates(subset=["Date"], keep="last")
        .sort_values("Date")
    )
    save_ticker(ticker, merged)
|
|
|
|
|
|
def sync_daily() -> int:
    """Run ``append_daily`` for each core ticker, isolating per-ticker failures.

    Each failure is counted, logged with its traceback and sent to
    ``capture_exception``; later tickers are still processed.

    Returns:
        How many tickers raised during their update.
    """
    error_count = 0
    for symbol in core_tickers():
        try:
            append_daily(symbol)
        except Exception as err:
            error_count += 1
            logger.exception("Daily sync failed for %s", symbol)
            capture_exception(err)
    return error_count
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: run the historical seed and/or the daily append.

    Telemetry is initialised before arguments are parsed. At least one of
    ``--seed`` / ``--daily`` is required; otherwise ``parser.error`` aborts
    with a usage message. When both flags are given, the seed runs first and
    both phases always run.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        0 if every ticker succeeded, 1 if any ticker failed in either phase.
    """
    init_telemetry("quant-harvester")

    cli = argparse.ArgumentParser(description="Yahoo Finance -> Parquet sync")
    cli.add_argument("--seed", action="store_true", help="Download full history")
    cli.add_argument("--daily", action="store_true", help="Append latest daily bars")
    options = cli.parse_args(argv)

    if not (options.seed or options.daily):
        cli.error("Specify --seed and/or --daily")

    any_failed = False
    if options.seed:
        logger.info("Starting historical seed for %s", ", ".join(core_tickers()))
        any_failed = seed_all() > 0 or any_failed
    if options.daily:
        logger.info("Starting daily append for %s", ", ".join(core_tickers()))
        any_failed = sync_daily() > 0 or any_failed
    return 1 if any_failed else 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|