"""
Stream C: NO-GAP edge candidates for 09:30 -> 11:00 window.
Sample-based validation across cache_intraday_3y.
"""
import pandas as pd
import numpy as np
import os
import json
from pathlib import Path

# Input locations: the per-ticker intraday parquet cache and two daily tables.
CACHE = Path('C:/datum-api-examples-main/cache_intraday_3y')
GAPS = Path('C:/datum-api-examples-main/enrichment_data/daily_4yr_gaps.parquet')
OHLCV = Path('C:/datum-api-examples-main/enrichment_data/daily_4yr_ohlcv.parquet')

# Deterministic sample of the cache: sort the parquet file names, then take
# every 6th one and stop after 80 files (the original note targets a <2 min run).
all_files = [entry.name for entry in CACHE.iterdir() if entry.name.endswith('.parquet')]
all_files.sort()
SAMPLE_TICKERS = all_files[:6 * 80:6]  # same selection as [::6][:80]
print(f"Sampling {len(SAMPLE_TICKERS)} tickers")

# Load the daily gaps table. It is only parsed here; the visible script derives
# its own gap_pct later and does not filter on this table.
gaps = pd.read_parquet(GAPS)
gaps['date'] = pd.to_datetime(gaps['date'])

# Load daily OHLCV and derive per-ticker previous-day context.
daily = pd.read_parquet(OHLCV)
daily['date'] = pd.to_datetime(daily['date'])
daily = daily.sort_values(['ticker', 'date'])
daily['prev_close'] = daily.groupby('ticker')['close'].shift(1)
# prev_high used to be a copy of the shifted close (marked "placeholder").
# Use the real daily high when the table has one; otherwise keep the old
# close-based stand-in so the column always exists.
_prev_high_source = 'high' if 'high' in daily.columns else 'close'
daily['prev_high'] = daily.groupby('ticker')[_prev_high_source].shift(1)
# NOTE: both returns below end at the row's own close, so on a given date they
# contain same-day information. Lag them by one row before using them as a
# signal for an intraday entry on that date.
daily['ret_5d'] = daily.groupby('ticker')['close'].pct_change(5) * 100
daily['ret_1d'] = daily.groupby('ticker')['close'].pct_change(1) * 100

# Build the per-(ticker, day) events table. A session becomes one record only if
# it has premarket bars, a 09:30 bar, an 11:00 bar, valid 09:30/11:00 prices and
# a known previous daily close.
records = []

for f in SAMPLE_TICKERS:
    tk = f.replace('.parquet', '')

    # Daily context for this ticker. Checked before reading the intraday file so
    # tickers without daily coverage cost no parquet read.
    dly = daily.loc[daily['ticker'] == tk, ['date', 'prev_close', 'ret_5d']]
    if dly.empty:
        continue
    # One row per date (first wins, as before), in date order.
    dly = dly.sort_values('date', kind='mergesort').drop_duplicates('date')
    # The daily table's ret_5d ends at the same day's close, which is not known
    # at 09:30. Lag it one daily row so the signal only uses prior closes.
    dly = dly.assign(ret_5d=dly['ret_5d'].shift(1)).set_index('date')

    try:
        df = pd.read_parquet(CACHE / f)
    except Exception as exc:
        # Best-effort sampling: report the unreadable file and move on.
        print(f"Skipping {f}: could not read parquet ({exc})")
        continue
    df['date'] = pd.to_datetime(df['date'])
    df['hm'] = df['hm'].astype(str).str.zfill(4)  # zero-pad so string comparison orders times

    for date, day_df in df.groupby('date'):
        # Indexed lookup instead of rescanning the daily frame for every date.
        if date not in dly.index:
            continue
        d = dly.loc[date]
        prev_close = d['prev_close']
        if pd.isna(prev_close):
            continue

        day_df = day_df.sort_values('hm').reset_index(drop=True)
        hm = day_df['hm']
        pm = day_df[(hm >= '0400') & (hm < '0930')]  # premarket bars 04:00-09:29
        rth = day_df[hm >= '0930']  # bars from 09:30 onward
        b0930 = rth[rth['hm'] == '0930']
        b1100 = rth[rth['hm'] == '1100']
        if pm.empty or b0930.empty or b1100.empty:
            continue

        open_930 = float(b0930['open'].iloc[0])
        close_1100 = float(b1100['close'].iloc[0])
        # A zero open would raise ZeroDivisionError below and a NaN price would
        # produce a NaN return, so sessions without usable prices are skipped.
        if not (np.isfinite(open_930) and np.isfinite(close_1100) and open_930 > 0):
            continue
        ret_1100 = (close_1100 / open_930 - 1) * 100  # 09:30->11:00 in %

        # Premarket features
        pm_vol = float(pm['vol'].sum())
        pm_first30 = pm[pm['hm'] < '0430']['vol'].sum()
        pm_last15 = pm[pm['hm'] >= '0915']['vol'].sum()
        pm_high = float(pm['high'].max())
        pm_low = float(pm['low'].min())
        pm_close = float(pm['close'].iloc[-1])
        pm_open = float(pm['open'].iloc[0])

        # First 5 minutes from 09:30 (bars before 09:35)
        rth_5m = rth[rth['hm'] < '0935']
        first5_vol = float(rth_5m['vol'].sum())
        first5_high = float(rth_5m['high'].max())
        first5_low = float(rth_5m['low'].min())
        first5_range = (first5_high - first5_low) / open_930 * 100

        # First 15 minutes from 09:30 (bars before 09:45)
        rth_15m = rth[rth['hm'] < '0945']
        first15_high = float(rth_15m['high'].max())
        first15_low = float(rth_15m['low'].min())

        records.append({
            'ticker': tk,
            'date': date,
            'open_930': open_930,
            'close_1100': close_1100,
            'ret_1100': ret_1100,
            'pm_vol': pm_vol,
            'pm_first30': pm_first30,
            'pm_last15': pm_last15,
            'pm_high': pm_high,
            'pm_low': pm_low,
            'pm_close': pm_close,
            'pm_open': pm_open,
            'first5_vol': first5_vol,
            'first5_range': first5_range,
            'first5_high': first5_high,
            'first5_low': first5_low,
            'first15_high': first15_high,
            'first15_low': first15_low,
            'prev_close': prev_close,
            'ret_5d': d['ret_5d'],  # 5-day return through the previous close
            'dow': pd.Timestamp(date).dayofweek,  # 0=Mon
        })

df = pd.DataFrame(records)
if df.empty:
    # Without this guard the summary line below fails with an opaque KeyError.
    raise SystemExit("No events were built; check the intraday cache and daily OHLCV coverage")
print(f"Total events: {len(df)} across {df['ticker'].nunique()} tickers, {df['date'].nunique()} days")

# Add derived: gap (for VERIFICATION, not filter)
df['gap_pct'] = (df['open_930'] / df['prev_close'] - 1) * 100


def _lagged_median(series, window=20, min_periods=5):
    """Rolling median of the previous `window` events, excluding the current one."""
    return series.shift(1).rolling(window, min_periods=min_periods).median()


# Per-ticker baselines over the prior 20 events. The columns are named "avg20"
# but hold medians; names are kept for compatibility with the saved parquet.
df = df.sort_values(['ticker', 'date'])
df['pm_vol_avg20'] = df.groupby('ticker')['pm_vol'].transform(_lagged_median)
df['first5_vol_avg20'] = df.groupby('ticker')['first5_vol'].transform(_lagged_median)
df['first5_range_avg20'] = df.groupby('ticker')['first5_range'].transform(_lagged_median)

# Ratios versus baseline. A zero baseline is treated as missing (same handling
# as pm_late_skew below) so the ratio becomes NaN rather than inf; an inf ratio
# would satisfy every "ratio > threshold" filter.
df['pm_vol_ratio'] = df['pm_vol'] / df['pm_vol_avg20'].replace(0, np.nan)
df['first5_vol_ratio'] = df['first5_vol'] / df['first5_vol_avg20'].replace(0, np.nan)

# pm_late_skew: pm_last15 / pm_first30
df['pm_late_skew'] = df['pm_last15'] / df['pm_first30'].replace(0, np.nan)

# Previous-close test in the first 15 minutes. No previous-day high is carried
# on the events, so prev_close stands in as the reference level.
df['tested_prev_close_up'] = (df['first15_high'] >= df['prev_close']) & (df['open_930'] < df['prev_close'])  # crossed up
df['tested_prev_close_dn'] = (df['first15_low'] <= df['prev_close']) & (df['open_930'] > df['prev_close'])  # crossed down

# Open vs PM VWAP proxy = pm_close
df['open_vs_pm_close'] = (df['open_930'] / df['pm_close'] - 1) * 100

# Save
df.to_parquet('C:/Users/wsu/Downloads/viz/asym_v2_stream_C_events.parquet', index=False)
print("Saved events parquet")

# ===== Validate candidates =====
def metric(sub, side='LONG', min_n=30):
    """Summarise the 09:30->11:00 returns of one candidate subset.

    Args:
        sub: DataFrame with a ``ret_1100`` column (per-event return in %).
        side: ``'LONG'`` scores the returns as-is; any other value scores the
            negated returns (short side).
        min_n: Minimum number of usable events required to report anything.

    Returns:
        ``None`` when fewer than ``min_n`` finite returns are available,
        otherwise a dict with ``N`` (event count), ``WR`` (win rate in %,
        1 dp), ``mean`` (mean return, 3 dp) and ``sh`` (mean/std scaled by
        sqrt(252), 2 dp; 0 when the std is not positive).
    """
    if len(sub) < min_n:
        return None
    rets = sub['ret_1100'].astype(float)
    # NaN/inf returns would inflate N, count as losses in WR and poison the
    # mean/std, so only finite returns are scored.
    rets = rets[np.isfinite(rets)]
    n = len(rets)
    if n < min_n:
        return None
    pnl = rets if side == 'LONG' else -rets
    wr = (pnl > 0).mean() * 100
    mean = pnl.mean()
    std = pnl.std()
    # Per-trade ratio scaled as if there were one trade per day; the original
    # author notes the realised daily figure is roughly 0.3-0.5x of this.
    sh = (mean / std * np.sqrt(252)) if std > 0 else 0
    return {'N': int(n), 'WR': round(float(wr), 1), 'mean': round(float(mean), 3), 'sh': round(float(sh), 2)}

# Candidate evaluation. Each candidate is (result key, boolean row mask, sides),
# where sides is a sequence of (output label, side passed to metric()).

# First-5-minute excursions from the open, in % of the open.
df['first5_ret'] = (df['first5_high'] - df['open_930']) / df['open_930'] * 100  # up wick
df['first5_dn'] = (df['open_930'] - df['first5_low']) / df['open_930'] * 100  # dn wick

_gap_abs = df['gap_pct'].abs()
_flat1 = _gap_abs < 1
_flat2 = _gap_abs < 2
_flat3 = _gap_abs < 3
_both = (('long', 'LONG'), ('short', 'SHORT'))

_candidates = [
    # C1: Late-PM skew + flat open (no gap dep)
    ('C1_late_pm_skew_flat_open',
     (df['pm_late_skew'] > 2) & _flat2 & (df['pm_vol_ratio'] > 1.5), _both),
    # C2: First5 range explosion (volatility breakout). Rows without a baseline
    # compare False against NaN, so they are excluded without an extra dropna.
    ('C2_first5_range_explosion_relative',
     df['first5_range'] > 2 * df['first5_range_avg20'], _both),
    # C2b: First5 range very large >2% w/o gap
    ('C2b_first5_range_2pct_flat_open',
     (df['first5_range'] > 2.0) & _flat2, _both),
    # C3: First5 strong up move - fade or follow?
    ('C3_first5_strong_up_no_gap',
     (df['first5_ret'] > 2) & (df['first5_dn'] < 0.5) & _flat2,
     (('long_follow', 'LONG'), ('short_fade', 'SHORT'))),
    # C4: First5 strong down move (no gap)
    ('C4_first5_strong_dn_no_gap',
     (df['first5_dn'] > 2) & (df['first5_ret'] < 0.5) & _flat2,
     (('long_bounce', 'LONG'), ('short_follow', 'SHORT'))),
    # C5: 5d momentum (up / down) on a flat open
    ('C5_ret5d_strong_up_flat_open', (df['ret_5d'] > 10) & _flat2, _both),
    ('C5d_ret5d_strong_dn_flat_open', (df['ret_5d'] < -10) & _flat2, _both),
    # C6: Open prints far away from the last premarket close
    ('C6a_open_diverge_up_fade',
     (df['open_vs_pm_close'] > 1.5) & _flat3, (('short', 'SHORT'),)),  # fade up jolt
    ('C6b_open_diverge_dn_bounce',
     (df['open_vs_pm_close'] < -1.5) & _flat3, (('long', 'LONG'),)),  # bounce dn jolt
    # C7: PM volume DRY (no PM action) -> mean revert RTH?
    ('C7_pm_vol_dry_flat', (df['pm_vol_ratio'] < 0.5) & _flat1, _both),
    # C8: Crossed prev_close in the first 15m (from below / from above)
    ('C8a_tested_prev_close_from_below', df['tested_prev_close_up'], _both),
    ('C8b_tested_prev_close_from_above', df['tested_prev_close_dn'], _both),
    # C9: Day-of-week: Monday flat-open
    ('C9_monday_flat_open', (df['dow'] == 0) & _flat1, _both),
    # C10: PM huge volume (>3x) on flat-open => institutional positioning
    ('C10_pm_vol_3x_flat_open', (df['pm_vol_ratio'] > 3) & _flat2, _both),
]

results = {
    name: {label: metric(df[mask], side) for label, side in sides}
    for name, mask, sides in _candidates
}

print(json.dumps(results, indent=2))
with open('C:/Users/wsu/Downloads/viz/asym_v2_stream_C_raw_metrics.json', 'w') as fh:
    json.dump(results, fh, indent=2)
