| """ |
| volume_analysis.py — Volume & order flow with absorption detection, |
| multi-bar breakout confirmation, and fake breakout identification. |
| |
| Key fixes vs prior version: |
| - Absorption detection: high-volume small-body bars at resistance = institutional selling |
| - Multi-bar breakout confirmation (BREAKOUT_CONFIRMATION_BARS) before firing signal |
| - ATR buffer on breakout level (price must exceed level by N*ATR, not just 1 tick) |
| - OBV slope computed over configurable window, normalized vs rolling stddev |
| - Climax threshold lowered (3.0x) and now triggers a hard absorption check |
| - Failed retest detection: breakout that closes back below the level = fake |
| """ |
|
|
from typing import Any, Dict, Optional

import numpy as np
import pandas as pd

from config import (
    VOLUME_MA_PERIOD,
    VOLUME_SPIKE_MULT,
    VOLUME_CLIMAX_MULT,
    VOLUME_WEAK_THRESHOLD,
    BREAKOUT_LOOKBACK,
    BREAKOUT_ATR_BUFFER,
    BREAKOUT_CONFIRMATION_BARS,
    BREAKOUT_RETEST_BARS,
    ABSORPTION_WICK_RATIO,
    ABSORPTION_VOL_MULT,
    ABSORPTION_BODY_RATIO,
    OBV_SLOPE_BARS,
    ATR_PERIOD,
)
|
|
|
|
def compute_volume_ma(df: pd.DataFrame, period: int = VOLUME_MA_PERIOD) -> pd.Series:
    """Return the simple moving average of ``df["volume"]`` over ``period`` bars.

    Entries for which the rolling window is not yet full are NaN.
    """
    volume_window = df["volume"].rolling(period)
    return volume_window.mean()
|
|
|
|
def detect_spikes(df: pd.DataFrame, vol_ma: pd.Series) -> pd.Series:
    """Flag bars whose volume is strictly above ``VOLUME_SPIKE_MULT`` x ``vol_ma``.

    Bars where ``vol_ma`` is NaN compare as False.
    """
    spike_threshold = vol_ma * VOLUME_SPIKE_MULT
    return df["volume"] > spike_threshold
|
|
|
|
def detect_climax(df: pd.DataFrame, vol_ma: pd.Series) -> pd.Series:
    """Flag bars whose volume is strictly above ``VOLUME_CLIMAX_MULT`` x ``vol_ma``.

    Bars where ``vol_ma`` is NaN compare as False.
    """
    climax_threshold = vol_ma * VOLUME_CLIMAX_MULT
    return df["volume"] > climax_threshold
|
|
|
|
def detect_absorption(df: pd.DataFrame, vol_ma: pd.Series) -> pd.Series:
    """
    Flag bars that look like supply absorbing demand: heavy volume, a small
    real body, a long upper wick, and a close in the lower half of the bar.

    A bar is flagged only when ALL of the following hold:
      - volume            > ABSORPTION_VOL_MULT * vol_ma
      - body / range      < ABSORPTION_BODY_RATIO
      - upper wick / range > ABSORPTION_WICK_RATIO
      - close             < midpoint of the bar's high-low range

    Only the shape and volume of the individual bar are tested; this function
    does not check where the bar sits relative to recent highs.
    Zero-range bars are never flagged.
    """
    high, low = df["high"], df["low"]
    open_, close = df["open"], df["close"]

    # A zero range would divide by zero; NaN makes every ratio test below False.
    span = (high - low).replace(0, np.nan)
    candle_top = df[["close", "open"]].max(axis=1)

    body_share = (close - open_).abs() / span
    upper_wick_share = (high - candle_top) / span
    midpoint = low + span * 0.5

    is_heavy = df["volume"] > vol_ma * ABSORPTION_VOL_MULT
    is_narrow = body_share < ABSORPTION_BODY_RATIO
    is_rejected = upper_wick_share > ABSORPTION_WICK_RATIO
    closed_weak = close < midpoint

    return is_heavy & is_narrow & is_rejected & closed_weak
|
|
|
|
def compute_obv(df: pd.DataFrame) -> pd.Series:
    """Return On-Balance Volume: the running sum of volume signed by close change.

    A bar adds its volume when the close rose, subtracts it when the close
    fell, and contributes nothing when the close is unchanged. The first bar
    has no previous close and therefore contributes 0.
    """
    close_change = df["close"].diff()
    step_sign = np.sign(close_change).fillna(0)
    signed_volume = df["volume"] * step_sign
    return signed_volume.cumsum()
|
|
|
|
def compute_obv_slope(obv: pd.Series, bars: int = OBV_SLOPE_BARS) -> pd.Series:
    """
    Rolling least-squares slope of OBV divided by the standard deviation of
    the same window, which removes the dependence on the OBV's absolute scale.

    For each full window of ``bars`` values a degree-1 polynomial is fitted
    against bar positions 0..bars-1; the slope is divided by ``np.std`` of the
    window. A window with zero spread yields 0.0; incomplete windows yield NaN.

    NOTE: slope / std equals (correlation with time) / std(0..bars-1), so the
    magnitude cannot exceed sqrt(12 / (bars**2 - 1)), e.g. about 0.35 for
    bars=10. Positive values mean rising OBV, negative values falling OBV;
    thresholds applied downstream should be chosen with that bound in mind.
    """
    bar_positions = np.arange(bars)

    def _scaled_slope(values):
        # Defensive: rolling(bars) should only hand over full windows.
        if len(values) < bars:
            return np.nan
        gradient = np.polyfit(bar_positions, values, 1)[0]
        spread = np.std(values)
        if spread > 0:
            return gradient / spread
        return 0.0

    obv_windows = obv.rolling(bars)
    return obv_windows.apply(_scaled_slope, raw=True)
|
|
|
|
def compute_delta_approx(df: pd.DataFrame) -> pd.Series:
    """Approximate per-bar volume delta (buy volume minus sell volume) from OHLCV.

    The buy share of a bar is estimated as ``0.5 + 0.5 * (close - open) / (high - low)``,
    clipped to [0, 1]: a bar closing at its high after opening at its low is
    all buying, the reverse is all selling. Zero-range bars are treated as
    evenly split (share 0.5), giving a delta of 0.
    """
    volume = df["volume"]
    candle_body = df["close"] - df["open"]
    # Zero range -> NaN so the division below produces NaN instead of inf.
    candle_range = (df["high"] - df["low"]).replace(0, np.nan)

    buy_share = (candle_body / candle_range) * 0.5 + 0.5
    buy_share = buy_share.clip(0.0, 1.0).fillna(0.5)
    sell_share = 1 - buy_share

    return volume * buy_share - volume * sell_share
|
|
|
|
def compute_vwap_deviation(df: pd.DataFrame, period: int = VOLUME_MA_PERIOD) -> pd.Series:
    """Return the distance of close from a rolling VWAP, in units of average bar range.

    The VWAP is computed over the last ``period`` bars from the typical price
    ``(high + low + close) / 3``. The distance is divided by the mean
    high-low range over ``ATR_PERIOD`` bars (a gap-free stand-in for ATR).
    Windows with zero total volume or zero average range produce NaN.
    """
    high, low, close = df["high"], df["low"], df["close"]
    volume = df["volume"]

    typical_price = (high + low + close) / 3
    traded_value = (typical_price * volume).rolling(period).sum()
    traded_volume = volume.rolling(period).sum().replace(0, np.nan)
    rolling_vwap = traded_value / traded_volume

    mean_range = (high - low).rolling(ATR_PERIOD).mean().replace(0, np.nan)
    distance = close - rolling_vwap
    return distance / mean_range
|
|
|
|
def compute_confirmed_breakout(
    df: pd.DataFrame,
    atr_series: pd.Series,
    vol_ma: pd.Series,
    lookback: int = BREAKOUT_LOOKBACK,
    confirm_bars: int = BREAKOUT_CONFIRMATION_BARS,
    atr_buffer: float = BREAKOUT_ATR_BUFFER,
) -> pd.Series:
    """
    Return a per-bar breakout signal: +1 confirmed bull, -1 confirmed bear, 0 none.

    A bar is marked as a confirmed breakout when, over the ``confirm_bars``
    bars ending at that bar, ALL of the following hold:
      1. every close is beyond the reference level by more than
         ``atr_buffer * ATR`` (above the high level for bull, below the low
         level for bear);
      2. at least one bar is a volume spike (see ``detect_spikes``);
      3. no bar is flagged by ``detect_absorption``.

    The reference level is the highest high / lowest low of the
    ``lookback``-bar window that ended ``lookback`` bars earlier, so the most
    recent bars (including the breakout itself) do not move the level.

    NOTE(review): ``detect_absorption`` only recognises upper-wick (supply)
    bars, yet the same gate also blocks bear signals - confirm this is intended.
    """

    def _all_in_window(flags: pd.Series, when_short) -> pd.Series:
        # True only if every one of the last `confirm_bars` flags is True.
        return flags.rolling(confirm_bars).min().fillna(when_short).astype(bool)

    def _any_in_window(flags: pd.Series) -> pd.Series:
        # True if at least one of the last `confirm_bars` flags is True.
        return flags.rolling(confirm_bars).max().fillna(0).astype(bool)

    level_high = df["high"].rolling(lookback).max().shift(lookback)
    level_low = df["low"].rolling(lookback).min().shift(lookback)

    spike_flags = detect_spikes(df, vol_ma)
    absorption_flags = detect_absorption(df, vol_ma)

    # Price must clear the level by an ATR-scaled margin, not merely touch it.
    above_level = df["close"] > level_high + atr_series * atr_buffer
    below_level = df["close"] < level_low - atr_series * atr_buffer

    bull_held = _all_in_window(above_level, 0)
    bear_held = _all_in_window(below_level, 0)
    volume_confirmed = _any_in_window(spike_flags)
    # Incomplete windows count as "clean"; they cannot fire anyway because
    # bull_held / bear_held are False there.
    clean_window = _all_in_window(~absorption_flags, 1)

    signal = pd.Series(0, index=df.index)
    signal[bull_held & volume_confirmed & clean_window] = 1
    signal[bear_held & volume_confirmed & clean_window] = -1
    return signal
|
|
|
|
def detect_failed_breakout(
    df: pd.DataFrame,
    breakout_series: pd.Series,
    atr_series: pd.Series,
    retest_bars: int = BREAKOUT_RETEST_BARS,
    lookback: int = BREAKOUT_LOOKBACK,
) -> pd.Series:
    """
    Flag bars where a recent confirmed breakout has closed back through its level.

    Args:
        df: OHLCV frame with "high", "low" and "close" columns.
        breakout_series: output of ``compute_confirmed_breakout``
            (+1 bull, -1 bear, 0 none), indexed like ``df``.
        atr_series: currently unused; kept so existing callers keep working.
        retest_bars: how many bars after a breakout a failure is looked for.
        lookback: window used to build the breakout level. Pass the same value
            that was given to ``compute_confirmed_breakout`` so both functions
            reference the same level (defaults to ``BREAKOUT_LOOKBACK``).

    Returns:
        Boolean Series, True where a bull breakout seen in the previous
        ``retest_bars`` bars is followed by a close below the high level, or a
        bear breakout by a close above the low level. The level used is the one
        that was in force ``retest_bars`` bars before the bar being tested.
    """
    level_high = df["high"].rolling(lookback).max().shift(lookback)
    level_low = df["low"].rolling(lookback).min().shift(lookback)

    # shift(1) excludes the current bar: only *earlier* breakouts can fail now.
    earlier_signals = breakout_series.shift(1)
    had_bull_bo = earlier_signals.rolling(retest_bars).max().fillna(0) > 0
    had_bear_bo = earlier_signals.rolling(retest_bars).min().fillna(0) < 0

    # Compare against the level as it stood at the start of the retest window,
    # so the breakout's own highs/lows do not shift the reference.
    bull_failed = had_bull_bo & (df["close"] < level_high.shift(retest_bars))
    bear_failed = had_bear_bo & (df["close"] > level_low.shift(retest_bars))

    return bull_failed | bear_failed
|
|
|
|
def _fallback_atr(df: pd.DataFrame) -> pd.Series:
    """Exponentially smoothed true range (alpha = 1 / ATR_PERIOD) built from OHLC."""
    high, low, prev_close = df["high"], df["low"], df["close"].shift(1)
    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)
    return true_range.ewm(alpha=1.0 / ATR_PERIOD, adjust=False).mean()


def _last_value(series: pd.Series, default: float) -> float:
    """Return the final element of *series* as a float, or *default* if it is NaN."""
    last = series.iloc[-1]
    return default if np.isnan(last) else float(last)


def _base_volume_score(
    absorption: bool,
    climax: bool,
    breakout: int,
    failed_breakout: bool,
    spike: bool,
    vol_ratio: float,
) -> float:
    """Map the state of the latest bar to a base score; the first matching tier wins."""
    if absorption:
        return 0.15
    if climax:
        return 0.25
    if breakout != 0:
        return 0.20 if failed_breakout else 1.0
    if spike:
        return 0.60
    if vol_ratio >= 1.2:
        return 0.45
    if vol_ratio >= 0.8:
        return 0.30
    return 0.10


def analyze_volume(df: pd.DataFrame, atr_series: Optional[pd.Series] = None) -> Dict[str, Any]:
    """
    Run the full volume / order-flow analysis and summarise the latest bar.

    Args:
        df: OHLCV frame with "open", "high", "low", "close" and "volume"
            columns. Must contain at least one row (``iloc[-1]`` raises
            IndexError on an empty frame).
        atr_series: optional ATR aligned with ``df``. When omitted, an
            exponentially smoothed true range is computed from ``df``.

    Returns:
        Dict with scalar readings for the last bar ("vol_ratio", "spike",
        "climax", "absorption", "weak", "breakout", "failed_breakout",
        "recent_failed_count", "obv_slope_norm", "delta_sum_5", "delta_sign",
        "vwap_deviation", "volume_score") plus the full per-bar series
        ("spike_series", "climax_series", "absorption_series",
        "breakout_series", "failed_breakout_series").

        ``volume_score`` is a base tier score adjusted by an OBV-slope bonus,
        a VWAP-alignment bonus and a penalty for recent failed breakouts,
        clipped to [0, 1].
    """
    recent_failed_window = 10  # bars scanned when counting recent failed breakouts

    if atr_series is None:
        atr_series = _fallback_atr(df)

    vol_ma = compute_volume_ma(df, VOLUME_MA_PERIOD)
    spike_series = detect_spikes(df, vol_ma)
    climax_series = detect_climax(df, vol_ma)
    absorption_series = detect_absorption(df, vol_ma)
    obv = compute_obv(df)
    obv_slope_series = compute_obv_slope(obv, OBV_SLOPE_BARS)
    delta = compute_delta_approx(df)
    vwap_dev = compute_vwap_deviation(df, VOLUME_MA_PERIOD)

    breakout_series = compute_confirmed_breakout(
        df, atr_series, vol_ma,
        lookback=BREAKOUT_LOOKBACK,
        confirm_bars=BREAKOUT_CONFIRMATION_BARS,
        atr_buffer=BREAKOUT_ATR_BUFFER,
    )
    failed_breakout_series = detect_failed_breakout(df, breakout_series, atr_series)

    last_vol = float(df["volume"].iloc[-1])
    last_vol_ma = float(vol_ma.iloc[-1])
    last_spike = bool(spike_series.iloc[-1])
    last_climax = bool(climax_series.iloc[-1])
    last_absorption = bool(absorption_series.iloc[-1])
    last_breakout = int(breakout_series.iloc[-1])
    last_failed_bo = bool(failed_breakout_series.iloc[-1])
    last_obv_slope = _last_value(obv_slope_series, 0.0)
    last_vwap_dev = _last_value(vwap_dev, 0.0)

    # Fix: with too little history the volume MA is NaN. Substituting 1.0 for
    # the MA (as before) turned vol_ratio into the raw volume and pushed the
    # score into the ">= 1.2" tier. A missing or non-positive MA now yields a
    # neutral ratio of 1.0 (NaN > 0 is False, so NaN takes the fallback too).
    vol_ratio = last_vol / last_vol_ma if last_vol_ma > 0 else 1.0
    weak_vol = vol_ratio < VOLUME_WEAK_THRESHOLD

    delta_5 = float(delta.iloc[-5:].sum())
    # A net delta of exactly zero is reported as -1 (kept for compatibility).
    delta_sign = 1 if delta_5 > 0 else -1

    recent_failed = int(failed_breakout_series.iloc[-recent_failed_window:].sum())

    base_score = _base_volume_score(
        last_absorption, last_climax, last_breakout,
        last_failed_bo, last_spike, vol_ratio,
    )

    # OBV flow nudges the score up or down by at most 0.12.
    obv_bonus = float(np.clip(last_obv_slope * 0.08, -0.12, 0.12))

    # Reward breakouts that agree with the side of VWAP price is on.
    vwap_bonus = 0.05 if (last_vwap_dev > 0 and last_breakout == 1) else 0.0
    vwap_bonus += 0.05 if (last_vwap_dev < 0 and last_breakout == -1) else 0.0

    # Each recent fake breakout costs 0.05, capped at 0.20.
    fake_penalty = min(0.20, recent_failed * 0.05)

    volume_score = float(np.clip(base_score + obv_bonus + vwap_bonus - fake_penalty, 0.0, 1.0))

    return {
        "vol_ratio": round(vol_ratio, 3),
        "spike": last_spike,
        "climax": last_climax,
        "absorption": last_absorption,
        "weak": weak_vol,
        "breakout": last_breakout,
        "failed_breakout": last_failed_bo,
        "recent_failed_count": recent_failed,
        "obv_slope_norm": round(last_obv_slope, 4),
        "delta_sum_5": round(delta_5, 2),
        "delta_sign": delta_sign,
        "vwap_deviation": round(last_vwap_dev, 4),
        "volume_score": round(volume_score, 4),
        "spike_series": spike_series,
        "climax_series": climax_series,
        "absorption_series": absorption_series,
        "breakout_series": breakout_series,
        "failed_breakout_series": failed_breakout_series,
    }
|
|