"""Build moo_asym v1.0.1 dashboard DATA JSON. Reads walker_trades.parquet, applies user-specified sizing rules, computes backtest at 4 equity tiers, daily activity, liquidity, and Monte Carlo forecasts (6m + 12m). Writes JSON to be consumed by HTML dashboard. """ from __future__ import annotations import json from pathlib import Path from collections import Counter import numpy as np import pandas as pd ROOT = Path("C:/datum-api-examples-main") OUT_DIR = Path("C:/Users/wsu/Downloads/viz") WALKER = ROOT / "research_results/asymmetric_moo1100_v2/walker_trades.parquet" CAPACITY = ROOT / "research_results/asymmetric_moo1100_v3/2_capacity_execution/capacity_results.csv" TIERS = [ {"name": "$10K (Phase 1)", "equity": 10000, "risk_per_pos": 150, "daily_flat": 500}, {"name": "$25K (Phase 2A)", "equity": 25000, "risk_per_pos": 375, "daily_flat": 1250}, {"name": "$75K (Phase 2B)", "equity": 75000, "risk_per_pos": 1125, "daily_flat": 3750}, {"name": "$250K (Phase 3)", "equity": 250000, "risk_per_pos": 3750, "daily_flat": 12500}, ] STOP_PCT = 2.0 SLIP_BPS = 14.0 FEE = 2.0 LOC_BPS = 0.5 MAX_CON = 2 LIQ_F = 0.05 T1E = "gap_ge10_x_px_lt5" RNG = np.random.default_rng(42) NP = 1000 H6 = 126 H12 = 252 def costs(g, side, pos): n = g - SLIP_BPS / 100.0 if side == "SHORT": n -= LOC_BPS / 100.0 return n - FEE / max(pos, 1) * 100.0 def psize(eq, stop, risk, dvol): raw = risk / (stop / 100.0) cap = LIQ_F * dvol if dvol and dvol > 0 else raw return min(raw, cap, eq * 0.5) def sim(tr, eq0, risk, flat, stop=STOP_PCT, calendar_dates=None): """Simulate over full calendar grid (carry equity on no-trade days).""" eq = eq0 out = [] peak = eq by_date = dict(list(tr.groupby("date", sort=True))) dates = calendar_dates if calendar_dates is not None else sorted(by_date.keys()) for date in dates: pnl = 0.0 n = 0 killed = False if date in by_date: for _, t in by_date[date].iterrows(): if killed or n >= MAX_CON: break pos = psize(eq, stop, risk, float(t.get("pm_dvol", 0) or 0)) net = costs(float(t["realized_gross"]), t["edge_side"], pos) pnl += pos * net / 100.0 n += 1 if pnl <= -flat: killed = True eq += pnl peak = max(peak, eq) out.append({ "date": str(pd.Timestamp(date).date()), "pnl": pnl, "equity": eq, "peak": peak, "dd": eq - peak, "n_trades": n, "killed": killed, }) return out print("Loading walker trades...") ta = pd.read_parquet(WALKER).copy() ta["date"] = pd.to_datetime(ta["date"]) ta = ta.sort_values(["date", "day_rank"]).reset_index(drop=True) print(f" total: {len(ta)} | days: {ta['date'].nunique()}") t1 = ta[ta["edge_name"] == T1E].copy() print(f" Tier 1: {len(t1)}") cap = pd.read_csv(CAPACITY) def make_bt(tr, calendar_dates=None): out = {} for tier in TIERS: d = sim(tr, tier["equity"], tier["risk_per_pos"], tier["daily_flat"], calendar_dates=calendar_dates) if not d: continue df = pd.DataFrame(d) ndays = df["date"].nunique() ddmin = df["dd"].min() std_pnl = df["pnl"].std(ddof=1) out[tier["name"]] = { "equity_curve": [ {"d": r["date"], "e": round(r["equity"], 2), "p": round(r["pnl"], 2), "dd": round(r["dd"], 2), "n": r["n_trades"]} for r in d ], "summary": { "start": tier["equity"], "end": round(df["equity"].iloc[-1], 2), "total_pnl": round(df["pnl"].sum(), 2), "ret_pct": round((df["equity"].iloc[-1] / tier["equity"] - 1) * 100, 2), "max_dd_usd": round(ddmin, 2), "max_dd_pct": round(ddmin / tier["equity"] * 100, 2), "sharpe_daily": round(df["pnl"].mean() / std_pnl * np.sqrt(252), 2) if std_pnl > 0 else 0.0, "n_days_active": int((df["n_trades"] > 0).sum()), "n_days_total": int(ndays), "kill_days": int(df["killed"].sum()), "win_days": int((df["pnl"] > 0).sum()), "loss_days": int((df["pnl"] < 0).sum()), "flat_days": int((df["pnl"] == 0).sum()), "best_day": round(df["pnl"].max(), 2), "worst_day": round(df["pnl"].min(), 2), }, } return out print("Backtesting...") calendar = sorted(ta["date"].unique()) # full trading-day grid bt1 = make_bt(t1, calendar_dates=calendar) bta = make_bt(ta, calendar_dates=calendar) def aggs(tr): if not len(tr): return {} g = tr["realized_gross"] df = tr.copy() df["year"] = df["date"].dt.year df["month"] = df["date"].dt.to_period("M").astype(str) df["weekday"] = df["date"].dt.day_name() by_y = df.groupby("year").agg( n=("realized_gross", "size"), wr=("realized_gross", lambda x: (x > 0).mean()), mean=("realized_gross", "mean"), sum=("realized_gross", "sum"), ).round(3).reset_index().to_dict(orient="records") by_m = df.groupby("month").agg( n=("realized_gross", "size"), sum=("realized_gross", "sum"), mean=("realized_gross", "mean"), ).round(3).reset_index().to_dict(orient="records") by_w = df.groupby("weekday").agg( n=("realized_gross", "size"), wr=("realized_gross", lambda x: (x > 0).mean()), mean=("realized_gross", "mean"), ).round(3).reset_index().to_dict(orient="records") by_t = df.groupby("ticker").agg( n=("realized_gross", "size"), wr=("realized_gross", lambda x: (x > 0).mean()), sum=("realized_gross", "sum"), ).round(3).reset_index().sort_values("n", ascending=False).head(30).to_dict(orient="records") return { "n": int(len(tr)), "win_rate": round(float((g > 0).mean()), 4), "mean_pct": round(float(g.mean()), 3), "median_pct": round(float(g.median()), 3), "std_pct": round(float(g.std(ddof=1)), 3), "pf": round(float(g[g > 0].sum() / max(abs(g[g < 0].sum()), 1e-9)), 3), "by_year": by_y, "by_month": by_m, "by_weekday": by_w, "top_tickers": by_t, } a1 = aggs(t1) aa = aggs(ta) def activity(tr, all_dates): if not len(tr): return {} by = tr.groupby("date").size() full = pd.Series(0, index=all_dates) full.loc[by.index] = by.values cnts = Counter(full.tolist()) total = len(full) flat = cnts.get(0, 0) return { "total_trading_days": int(total), "active_days": int((full > 0).sum()), "flat_days": int(flat), "flat_pct": round(flat / total * 100, 1), "active_pct": round((full > 0).mean() * 100, 1), "histogram": [{"n_trades": int(k), "n_days": int(v)} for k, v in sorted(cnts.items())], "mean_trades_per_active_day": round(float(full[full > 0].mean()), 2) if (full > 0).any() else 0.0, "mean_trades_per_calendar_day": round(float(full.mean()), 2), "max_trades_in_day": int(full.max()), } all_d = pd.DatetimeIndex(sorted(ta["date"].unique())) ac1 = activity(t1, all_d) aca = activity(ta, all_d) def liq(tr, eq, risk): df = tr.copy() raw = risk / 0.02 df["pos"] = pd.Series(raw, index=df.index).clip(upper=df["pm_dvol"] * LIQ_F).clip(upper=eq * 0.5) df["pct"] = df["pos"] / df["pm_dvol"] * 100 return { "pm_dvol_p25": round(float(df["pm_dvol"].quantile(0.25)) / 1e6, 2), "pm_dvol_median": round(float(df["pm_dvol"].median()) / 1e6, 2), "pm_dvol_p75": round(float(df["pm_dvol"].quantile(0.75)) / 1e6, 2), "pct_dvol_median": round(float(df["pct"].median()), 3), "pct_dvol_p95": round(float(df["pct"].quantile(0.95)), 3), "pos_usd_median": round(float(df["pos"].median()), 0), "histogram_dvol_musd": [ {"bucket": "0-1M", "n": int(((df["pm_dvol"] >= 0) & (df["pm_dvol"] < 1e6)).sum())}, {"bucket": "1-5M", "n": int(((df["pm_dvol"] >= 1e6) & (df["pm_dvol"] < 5e6)).sum())}, {"bucket": "5-10M", "n": int(((df["pm_dvol"] >= 5e6) & (df["pm_dvol"] < 10e6)).sum())}, {"bucket": "10-50M", "n": int(((df["pm_dvol"] >= 10e6) & (df["pm_dvol"] < 50e6)).sum())}, {"bucket": "50M+", "n": int((df["pm_dvol"] >= 50e6).sum())}, ], } lq1 = {tier["name"]: liq(t1, tier["equity"], tier["risk_per_pos"]) for tier in TIERS} def mc(returns, lam, ndays, npaths, eq0, risk, flat, stop=STOP_PCT, slip=0.16): paths = np.zeros((npaths, ndays + 1)) paths[:, 0] = eq0 n = len(returns) base = risk / (stop / 100.0) for p in range(npaths): eq = eq0 for d in range(1, ndays + 1): nt = min(int(RNG.poisson(lam)), MAX_CON) pnl = 0.0 killed = False for _ in range(nt): if killed: break gross = returns[RNG.integers(0, n)] net = gross - slip pos = min(base, eq * 0.5) pnl += pos * net / 100.0 if pnl <= -flat: killed = True eq += pnl paths[p, d] = eq if eq <= 0: paths[p, d:] = 0 break return paths def mc_sum(paths, start, ndays): final = paths[:, -1] step = max(1, ndays // 50) kd = list(range(0, ndays + 1, step)) if kd[-1] != ndays: kd.append(ndays) q = { "p05": np.quantile(paths[:, kd], 0.05, axis=0).round(0).tolist(), "p25": np.quantile(paths[:, kd], 0.25, axis=0).round(0).tolist(), "p50": np.quantile(paths[:, kd], 0.50, axis=0).round(0).tolist(), "p75": np.quantile(paths[:, kd], 0.75, axis=0).round(0).tolist(), "p95": np.quantile(paths[:, kd], 0.95, axis=0).round(0).tolist(), "days": kd, } sample = paths[RNG.choice(paths.shape[0], 30, replace=False), :][:, kd].round(0).tolist() peaks = np.maximum.accumulate(paths, axis=1) dds = paths - peaks md = dds.min(axis=1) return { "final_p05": round(float(np.quantile(final, 0.05)), 0), "final_p25": round(float(np.quantile(final, 0.25)), 0), "final_p50": round(float(np.quantile(final, 0.50)), 0), "final_p75": round(float(np.quantile(final, 0.75)), 0), "final_p95": round(float(np.quantile(final, 0.95)), 0), "final_mean": round(float(np.mean(final)), 0), "p_double": round(float((final >= start * 2).mean()), 4), "p_25k": round(float((final >= 25000).mean()), 4), "p_75k": round(float((final >= 75000).mean()), 4), "p_loss": round(float((final < start).mean()), 4), "p_dd_500": round(float((md <= -500).mean()), 4), "p_dd_1000": round(float((md <= -1000).mean()), 4), "p_dd_2000": round(float((md <= -2000).mean()), 4), "median_max_dd": round(float(np.median(md)), 0), "quantiles": q, "sample_paths": sample, } print("Monte Carlo...") r1 = t1["realized_gross"].values.astype(float) l1 = float(ac1["mean_trades_per_calendar_day"]) ra = ta["realized_gross"].values.astype(float) la = float(aca["mean_trades_per_calendar_day"]) mc_data = { "tier1_6m": mc_sum(mc(r1, l1, H6, NP, 10000, 150, 500), 10000, H6), "tier1_12m": mc_sum(mc(r1, l1, H12, NP, 10000, 150, 500), 10000, H12), "all_6m": mc_sum(mc(ra, la, H6, NP, 10000, 150, 500), 10000, H6), "all_12m": mc_sum(mc(ra, la, H12, NP, 10000, 150, 500), 10000, H12), } data = { "meta": { "strategy": "moo_1100_asym v1.0.1 (sub5_pump SHORT)", "tag": "moo_1100_asym-v1.0.1", "frozen_date": "2026-04-26", "source_dataset": "research_results/asymmetric_moo1100_v2/walker_trades.parquet", "trades_total": int(len(ta)), "trades_tier1": int(len(t1)), "date_min": str(ta["date"].min().date()), "date_max": str(ta["date"].max().date()), "trading_days": int(ta["date"].nunique()), "tiers": TIERS, "stop_pct": STOP_PCT, "target_pct": 7.0, "slip_bps_rt": SLIP_BPS, "max_concurrent": MAX_CON, }, "headline_capacity_sim": cap.to_dict(orient="records"), "agg_tier1": a1, "agg_all": aa, "activity_tier1": ac1, "activity_all": aca, "liquidity_tier1": lq1, "backtest_tier1": bt1, "backtest_all": bta, "monte_carlo": mc_data, "v101_config": { "max_concurrent_positions": 2, "max_short_concurrent": 2, "daily_loss_kill_switch_pct": -3.0, "weekly_drawdown_pause_pct": -8.0, "monthly_drawdown_review_pct": -15.0, "boost_mult_regime": 1.25, "boost_mult_indicator": 1.25, "boost_mult_positioning": 1.15, "hard_skips": [ "FOMC release day", "SPY 5d above plus 3 percent", "Earnings plus minus 1 day", "Halt within 30d lookback", "ADV less than 2M shares", "Leveraged ETF blacklist", ], }, } OUT_DIR.mkdir(parents=True, exist_ok=True) json_path = OUT_DIR / "moo_asym_v101_data.json" with open(json_path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, separators=(",", ":"), default=str) print(f" wrote {json_path} ({json_path.stat().st_size/1024:.1f} KB)") print("DONE")