quant-web/strategies/executor.py
epistemophiliac 627b2326df Add Python strategy engine, parameter optimization, and faster Docker builds.
Support builtin and custom generate_signals strategies with SQLite persistence, exhaustive grid scans (VectorBT comb optimization for MA crossover), professional backtest/optimize UI, and split harvester/app requirements with BuildKit pip cache.
2026-06-19 01:29:28 -04:00

178 lines
5.4 KiB
Python

"""Execute builtin and user-authored Python strategies."""
from __future__ import annotations
import itertools
from typing import Any
import numpy as np
import pandas as pd
import vectorbt as vbt
from strategies.registry import BuiltinStrategy, get_builtin
SAFE_GLOBALS: dict[str, Any] = {
"__builtins__": {
"range": range,
"len": len,
"min": min,
"max": max,
"abs": abs,
"float": float,
"int": int,
"bool": bool,
"list": list,
"dict": dict,
"tuple": tuple,
"zip": zip,
"enumerate": enumerate,
"sum": sum,
"round": round,
},
"np": np,
"pd": pd,
"vbt": vbt,
}
CUSTOM_TEMPLATE = '''import pandas as pd
import vectorbt as vbt
# Optional: define PARAM_GRID for optimization scans
PARAM_GRID = {
"fast_window": list(range(10, 41, 5)),
"slow_window": list(range(50, 151, 10)),
}
DEFAULT_PARAMS = {
"fast_window": 20,
"slow_window": 50,
}
def generate_signals(close, high, low, volume, **params):
"""Return (entries, exits) as boolean Series aligned to close."""
fast_w = int(params.get("fast_window", 20))
slow_w = int(params.get("slow_window", 50))
if fast_w >= slow_w:
raise ValueError("fast_window must be < slow_window")
fast_ma = vbt.MA.run(close, fast_w, short_name="fast")
slow_ma = vbt.MA.run(close, slow_w, short_name="slow")
entries = fast_ma.ma_crossed_above(slow_ma).fillna(False)
exits = fast_ma.ma_crossed_below(slow_ma).fillna(False)
return entries, exits
'''
class StrategyError(ValueError):
pass
def load_custom_strategy(source_code: str) -> tuple[Any, dict, dict]:
namespace: dict[str, Any] = {}
try:
exec(source_code, SAFE_GLOBALS, namespace)
except Exception as exc:
raise StrategyError(f"Strategy compile error: {exc}") from exc
generate = namespace.get("generate_signals")
if not callable(generate):
raise StrategyError("Custom strategy must define generate_signals(close, high, low, volume, **params)")
param_grid = namespace.get("PARAM_GRID", {})
default_params = namespace.get("DEFAULT_PARAMS", {})
if not isinstance(param_grid, dict):
raise StrategyError("PARAM_GRID must be a dict of param -> list of values")
if not isinstance(default_params, dict):
raise StrategyError("DEFAULT_PARAMS must be a dict")
return generate, param_grid, default_params
def run_builtin_signals(
builtin: BuiltinStrategy,
ohlcv: pd.DataFrame,
params: dict[str, Any],
) -> tuple[pd.Series, pd.Series]:
close = ohlcv["Close"].astype(float)
high = ohlcv.get("High", close).astype(float)
low = ohlcv.get("Low", close).astype(float)
volume = ohlcv.get("Volume", pd.Series(0, index=close.index)).astype(float)
entries, exits = builtin.generate_signals(close, high, low, volume, **params)
return _coerce_signals(entries, exits, close.index)
def run_custom_signals(
source_code: str,
ohlcv: pd.DataFrame,
params: dict[str, Any],
) -> tuple[pd.Series, pd.Series, dict, dict]:
generate, param_grid, defaults = load_custom_strategy(source_code)
merged = {**defaults, **params}
close = ohlcv["Close"].astype(float)
high = ohlcv.get("High", close).astype(float)
low = ohlcv.get("Low", close).astype(float)
volume = ohlcv.get("Volume", pd.Series(0, index=close.index)).astype(float)
entries, exits = generate(close, high, low, volume, **merged)
return _coerce_signals(entries, exits, close.index), param_grid, merged
def _coerce_signals(entries, exits, index: pd.Index) -> tuple[pd.Series, pd.Series]:
e = pd.Series(entries, index=index).fillna(False).astype(bool)
x = pd.Series(exits, index=index).fillna(False).astype(bool)
return e, x
def optimize_custom(
source_code: str,
close: pd.Series,
ohlcv: pd.DataFrame,
init_cash: float,
fees: float,
metric: str,
param_grid: dict | None = None,
max_combos: int = 2_500,
) -> pd.DataFrame:
from metrics import run_from_signals
generate, grid, defaults = load_custom_strategy(source_code)
grid = param_grid or grid
if not grid:
raise StrategyError("Define PARAM_GRID in your strategy to run optimization.")
keys = list(grid.keys())
combos = list(itertools.product(*(grid[k] for k in keys)))
if len(combos) > max_combos:
raise StrategyError(
f"Grid has {len(combos):,} combinations (max {max_combos:,}). "
"Widen step sizes or narrow ranges in PARAM_GRID."
)
high = ohlcv.get("High", close).astype(float)
low = ohlcv.get("Low", close).astype(float)
volume = ohlcv.get("Volume", pd.Series(0, index=close.index)).astype(float)
rows = []
for values in combos:
params = {**defaults, **dict(zip(keys, values))}
try:
entries, exits = generate(close, high, low, volume, **params)
entries, exits = _coerce_signals(entries, exits, close.index)
rows.append(
run_from_signals(
close=close,
entries=entries,
exits=exits,
init_cash=init_cash,
fees=fees,
params=params,
metric=metric,
)
)
except Exception:
continue
frame = pd.DataFrame(rows)
if frame.empty:
return frame
return frame.sort_values("score", ascending=False, na_position="last")