"""
Market Regime Detection System

Identifies trending vs. ranging markets and provides regime-adaptive trading parameters.
"""

from collections import Counter
from enum import Enum
from typing import Dict, List, Tuple, Optional
import warnings

import numpy as np
import pandas as pd

warnings.filterwarnings('ignore')


class MarketRegime(Enum):
    """Market regime classifications.

    Each member's value is the lowercase string label that is used when
    regimes are printed or combined into transition keys.
    """

    # Directional regimes, from most bullish to most bearish.
    STRONG_BULL = "strong_bull"
    BULL_TREND = "bull_trend"
    BEAR_TREND = "bear_trend"
    STRONG_BEAR = "strong_bear"

    # Non-directional regimes.
    RANGING = "ranging"
    HIGH_VOLATILITY = "high_volatility"
    LOW_VOLATILITY = "low_volatility"


class RegimeParameters:
    """Trading parameters optimized for each market regime.

    ``regime_params`` maps every ``MarketRegime`` member to a dict with the
    keys ``profit_targets``, ``trailing_stop_pct``,
    ``position_size_multiplier``, ``max_holding_time``, ``breakeven_trigger``
    and ``min_confidence_threshold``.
    """

    def __init__(self):
        # Each regime gets its own dict object, even where the numbers match
        # (bull/bear pairs are symmetric), so entries can be tuned separately.
        self.regime_params = {
            MarketRegime.STRONG_BULL: {
                'profit_targets': [0.015, 0.03, 0.06, 0.12],
                'trailing_stop_pct': 0.03,
                'position_size_multiplier': 1.5,
                'max_holding_time': 48,
                'breakeven_trigger': 0.02,
                'min_confidence_threshold': 0.25,
            },
            MarketRegime.BULL_TREND: {
                'profit_targets': [0.012, 0.025, 0.05, 0.10],
                'trailing_stop_pct': 0.025,
                'position_size_multiplier': 1.2,
                'max_holding_time': 36,
                'breakeven_trigger': 0.018,
                'min_confidence_threshold': 0.3,
            },
            MarketRegime.BEAR_TREND: {
                'profit_targets': [0.012, 0.025, 0.05, 0.10],
                'trailing_stop_pct': 0.025,
                'position_size_multiplier': 1.2,
                'max_holding_time': 36,
                'breakeven_trigger': 0.018,
                'min_confidence_threshold': 0.3,
            },
            MarketRegime.STRONG_BEAR: {
                'profit_targets': [0.015, 0.03, 0.06, 0.12],
                'trailing_stop_pct': 0.03,
                'position_size_multiplier': 1.5,
                'max_holding_time': 48,
                'breakeven_trigger': 0.02,
                'min_confidence_threshold': 0.25,
            },
            MarketRegime.RANGING: {
                'profit_targets': [0.008, 0.015, 0.03, 0.06],
                'trailing_stop_pct': 0.02,
                'position_size_multiplier': 0.7,
                'max_holding_time': 12,
                'breakeven_trigger': 0.012,
                'min_confidence_threshold': 0.4,
            },
            MarketRegime.HIGH_VOLATILITY: {
                'profit_targets': [0.02, 0.04, 0.08, 0.15],
                'trailing_stop_pct': 0.04,
                'position_size_multiplier': 0.6,
                'max_holding_time': 8,
                'breakeven_trigger': 0.025,
                'min_confidence_threshold': 0.5,
            },
            MarketRegime.LOW_VOLATILITY: {
                'profit_targets': [0.005, 0.01, 0.02, 0.04],
                'trailing_stop_pct': 0.015,
                'position_size_multiplier': 1.0,
                'max_holding_time': 24,
                'breakeven_trigger': 0.008,
                'min_confidence_threshold': 0.35,
            },
        }

    def get_parameters(self, regime: MarketRegime) -> Dict:
        """Get trading parameters for a specific regime.

        Args:
            regime: Regime to look up.

        Returns:
            The parameter dict stored for ``regime``; the RANGING entry is
            returned when ``regime`` is not a key of ``regime_params``.
            The dict is the internal one (not a copy), so mutating it changes
            what later lookups return.
        """
        return self.regime_params.get(regime, self.regime_params[MarketRegime.RANGING])


class MarketRegimeDetector:
    """
    Advanced market regime detection using multiple indicators.

    Combines an ADX-like trend strength, a blended volatility measure and an
    RSI/MACD momentum score, classifies the result into a ``MarketRegime`` and
    records every detection in ``regime_history``.
    """

    # Used when the caller does not supply lookback periods (tuple so the
    # default cannot be mutated through an instance).
    DEFAULT_LOOKBACK_PERIODS = (20, 50, 100)

    # Classification thresholds; volatility is checked first, then trend.
    HIGH_VOL_THRESHOLD = 0.05
    LOW_VOL_THRESHOLD = 0.015
    STRONG_TREND_THRESHOLD = 25
    MODERATE_TREND_THRESHOLD = 20
    BULL_MOMENTUM_THRESHOLD = 0.3
    BEAR_MOMENTUM_THRESHOLD = -0.3
    MODERATE_BULL_MOMENTUM = 0.1
    MODERATE_BEAR_MOMENTUM = -0.1

    def __init__(self, lookback_periods: Optional[List[int]] = None):
        """Create a detector.

        Args:
            lookback_periods: Window lengths; only the largest one is used, as
                the minimum number of rows required before detection runs.
                Defaults to ``[20, 50, 100]``.
        """
        if lookback_periods is None:
            lookback_periods = self.DEFAULT_LOOKBACK_PERIODS
        # Copy so later changes to the caller's list do not affect us.
        self.lookback_periods = list(lookback_periods)
        self.regime_history = []
        self.parameters = RegimeParameters()

    def detect_regime(self, price_data: pd.DataFrame,
                      current_idx: int = -1) -> "Tuple[MarketRegime, Dict]":
        """
        Detect current market regime using comprehensive analysis.

        Args:
            price_data: DataFrame; the 'Close', 'High', 'Low' and 'Volume'
                columns are read when present.
            current_idx: Positional row index to evaluate. Negative values
                count from the end (default: latest row).

        Returns:
            Tuple of (detected_regime, regime_parameters). When the frame has
            fewer rows than the largest lookback period, RANGING is returned
            and nothing is appended to ``regime_history``.

        Raises:
            IndexError: If ``current_idx`` is outside the frame.
        """
        n_rows = len(price_data)
        if n_rows == 0 or n_rows < max(self.lookback_periods, default=0):
            return MarketRegime.RANGING, self.parameters.get_parameters(MarketRegime.RANGING)

        # Normalise every negative index, not just -1, so the window slices
        # below always use a real row position.
        if current_idx < 0:
            current_idx += n_rows
        if not 0 <= current_idx < n_rows:
            raise IndexError(
                f"current_idx {current_idx} is out of bounds for {n_rows} rows"
            )

        trend_strength = self._calculate_trend_strength(price_data, current_idx)
        volatility = self._calculate_volatility(price_data, current_idx)
        momentum = self._calculate_momentum(price_data, current_idx)
        volume_trend = self._calculate_volume_trend(price_data, current_idx)

        regime = self._classify_regime(trend_strength, volatility, momentum, volume_trend)
        regime_params = self.parameters.get_parameters(regime)

        self.regime_history.append({
            'timestamp': price_data.index[current_idx] if hasattr(price_data.index, '__getitem__') else current_idx,
            'regime': regime,
            'trend_strength': trend_strength,
            'volatility': volatility,
            'momentum': momentum
        })

        return regime, regime_params

    @staticmethod
    def _nan_to_zero(value) -> float:
        """Return ``value`` as a float, or 0.0 when it is NaN/missing."""
        return 0.0 if pd.isna(value) else float(value)

    @staticmethod
    def _true_range(high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series:
        """Per-row true range: max of high-low and the gaps to the prior close.

        The first row is NaN because it has no previous close.
        """
        prev_close = close.shift(1)
        return np.maximum(high - low,
                          np.maximum(abs(high - prev_close),
                                     abs(low - prev_close)))

    def _calculate_trend_strength(self, data: pd.DataFrame, idx: int) -> float:
        """Calculate trend strength using ADX-like indicator.

        Uses the 51 rows ending at ``idx``. Without High/Low columns it falls
        back to |regression slope| / std of Close (a different scale from the
        ADX-style value). Returns 0.0 whenever the value is undefined
        (too little data, flat prices, missing columns).
        """
        window = slice(max(0, idx - 50), idx + 1)

        if 'High' not in data.columns or 'Low' not in data.columns or 'Close' not in data.columns:
            if 'Close' in data.columns:
                prices = data['Close'].iloc[window]
                if len(prices) > 10:
                    x = np.arange(len(prices))
                    slope = np.polyfit(x, prices.values, 1)[0]
                    spread = prices.std()
                    # A flat (or NaN) spread would make the ratio NaN/inf.
                    if spread > 0:
                        return float(abs(slope) / spread)
            return 0.0

        high = data['High'].iloc[window]
        low = data['Low'].iloc[window]
        close = data['Close'].iloc[window]

        tr_series = self._true_range(high, low, close)

        # Directional movement: only the larger of the up/down moves counts,
        # and only when it is positive.
        up_move = high - high.shift(1)
        down_move = low.shift(1) - low
        dm_plus_series = pd.Series(
            np.where(up_move > down_move, np.maximum(up_move, 0), 0),
            index=high.index)
        dm_minus_series = pd.Series(
            np.where(down_move > up_move, np.maximum(down_move, 0), 0),
            index=high.index)

        atr = tr_series.rolling(14).mean()
        di_plus = 100 * (dm_plus_series.rolling(14).mean() / atr)
        di_minus = 100 * (dm_minus_series.rolling(14).mean() / atr)

        dx = 100 * abs(di_plus - di_minus) / (di_plus + di_minus)
        adx = dx.rolling(14).mean()

        if adx.empty:
            return 0.0
        latest = adx.iloc[-1]
        # NaN appears with short windows or when a 0/0 occurs on flat data.
        return float(latest) if np.isfinite(latest) else 0.0

    def _calculate_volatility(self, data: pd.DataFrame, idx: int) -> float:
        """Calculate normalized volatility.

        Blend of Bollinger-band width (40%), ATR relative to the last close
        (40%) and annualised return std (20%). Components that cannot be
        computed yet (NaN) count as 0. Returns 0.0 without a Close column or
        with fewer than 10 rows.
        """
        if 'Close' not in data.columns:
            return 0.0

        prices = data['Close'].iloc[max(0, idx - 50):idx + 1]
        if len(prices) < 10:
            return 0.0

        returns = prices.pct_change().dropna()

        # Bollinger width needs 20 rows; with 10-19 rows it is NaN -> 0.
        sma = prices.rolling(20).mean()
        std = prices.rolling(20).std()
        bb_width = 2 * std / sma
        bb_volatility = self._nan_to_zero(bb_width.iloc[-1])

        if 'High' in data.columns and 'Low' in data.columns:
            recent = slice(max(0, idx - 20), idx + 1)
            tr = self._true_range(data['High'].iloc[recent],
                                  data['Low'].iloc[recent],
                                  data['Close'].iloc[recent])
            atr = tr.rolling(14).mean()
            if atr.empty:
                atr_volatility = 0.0
            else:
                atr_volatility = self._nan_to_zero(atr.iloc[-1] / prices.iloc[-1])
        else:
            atr_volatility = 0.0

        ret_volatility = self._nan_to_zero(returns.std() * np.sqrt(252))

        combined_volatility = (bb_volatility * 0.4 + atr_volatility * 0.4 + ret_volatility * 0.2)
        return float(combined_volatility)

    def _calculate_momentum(self, data: pd.DataFrame, idx: int) -> float:
        """Calculate momentum indicators.

        Returns 0.6 * (RSI rescaled around 50) + 0.4 * sign(MACD histogram).
        Returns 0.0 without a Close column or with fewer than 20 rows.
        """
        if 'Close' not in data.columns:
            return 0.0

        prices = data['Close'].iloc[max(0, idx - 50):idx + 1]
        if len(prices) < 20:
            return 0.0

        # RSI over 14 periods using simple rolling means.
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        rsi_value = rsi.iloc[-1]
        # No gains and no losses gives 0/0; treat that as neutral.
        if pd.isna(rsi_value):
            rsi_value = 50.0

        # MACD histogram (12/26 EMA difference minus its 9-period EMA).
        ema12 = prices.ewm(span=12).mean()
        ema26 = prices.ewm(span=26).mean()
        macd = ema12 - ema26
        macd_signal = macd.ewm(span=9).mean()
        macd_momentum = (macd - macd_signal).iloc[-1]

        rsi_momentum = (rsi_value - 50) / 50
        combined_momentum = (rsi_momentum * 0.6 + np.sign(macd_momentum) * 0.4)
        return float(combined_momentum)

    def _calculate_volume_trend(self, data: pd.DataFrame, idx: int) -> float:
        """Calculate volume trend if volume data is available.

        Returns the latest volume's relative deviation from its 20-row mean,
        or 0.0 when there is no Volume column, fewer than 20 rows, or the
        mean is zero/NaN.
        """
        if 'Volume' not in data.columns:
            return 0.0

        volume = data['Volume'].iloc[max(0, idx - 50):idx + 1]
        if len(volume) < 20:
            return 0.0

        baseline = volume.rolling(20).mean().iloc[-1]
        # Avoid dividing by a zero or undefined average volume.
        if pd.isna(baseline) or baseline == 0:
            return 0.0
        return float((volume.iloc[-1] - baseline) / baseline)

    def _classify_regime(self, trend_strength: float, volatility: float,
                         momentum: float, volume_trend: float) -> "MarketRegime":
        """
        Classify market regime based on calculated indicators.

        Volatility extremes take priority, then strong trends, then moderate
        trends; anything else is RANGING. ``volume_trend`` is accepted but
        does not currently influence the result.
        """
        if volatility > self.HIGH_VOL_THRESHOLD:
            return MarketRegime.HIGH_VOLATILITY
        if volatility < self.LOW_VOL_THRESHOLD:
            return MarketRegime.LOW_VOLATILITY

        if trend_strength > self.STRONG_TREND_THRESHOLD:
            if momentum > self.BULL_MOMENTUM_THRESHOLD:
                return MarketRegime.STRONG_BULL
            if momentum < self.BEAR_MOMENTUM_THRESHOLD:
                return MarketRegime.STRONG_BEAR

        # A strong trend with only mild momentum falls through to here.
        if trend_strength > self.MODERATE_TREND_THRESHOLD:
            if momentum > self.MODERATE_BULL_MOMENTUM:
                return MarketRegime.BULL_TREND
            if momentum < self.MODERATE_BEAR_MOMENTUM:
                return MarketRegime.BEAR_TREND

        return MarketRegime.RANGING

    def get_regime_statistics(self) -> Dict:
        """Get statistics about regime detection performance.

        Returns:
            An empty dict when nothing has been recorded; otherwise the number
            of observations, counts per regime, the mean trend strength,
            volatility and momentum, and the regime transition counts.
        """
        if not self.regime_history:
            return {}

        df = pd.DataFrame(self.regime_history)

        stats = {
            'total_observations': len(df),
            'regime_distribution': df['regime'].value_counts().to_dict(),
            'avg_trend_strength': df['trend_strength'].mean(),
            'avg_volatility': df['volatility'].mean(),
            'avg_momentum': df['momentum'].mean(),
            'regime_transitions': self._calculate_regime_transitions(df)
        }

        return stats

    def _calculate_regime_transitions(self, df: pd.DataFrame) -> Dict:
        """Calculate regime transition frequencies.

        Counts each consecutive pair in ``df['regime']`` (including a regime
        followed by itself) under the key ``"<from>_to_<to>"``.
        """
        regimes = df['regime'].tolist()
        return dict(Counter(
            f"{from_regime.value}_to_{to_regime.value}"
            for from_regime, to_regime in zip(regimes, regimes[1:])
        ))

    def get_optimal_parameters_for_regime(self, regime: "MarketRegime") -> Dict:
        """Get optimal trading parameters for a specific regime."""
        return self.parameters.get_parameters(regime)


def create_sample_regime_analysis():
    """Create a sample analysis showing regime detection in action.

    Builds a seeded 500-point synthetic price series made of four phases with
    different drift and noise, runs the detector on every row from index 50
    onwards, and prints the regime distribution and the parameters of each
    regime that was detected.
    """
    print("🧠 MARKET REGIME DETECTION SYSTEM")
    print("=" * 50)

    # Fixed seed so the demo output is reproducible.
    np.random.seed(42)
    n_points = 500

    base_price = 2000
    prices = [base_price]

    for i in range(1, n_points):
        # (noise level, drift) for the phase this step belongs to.
        if i < 100:
            volatility = 0.005
            trend = 0.0001
        elif i < 200:
            volatility = 0.02
            trend = 0.0005
        elif i < 350:
            volatility = 0.015
            trend = 0.002
        else:
            volatility = 0.012
            trend = -0.0015

        # Carry over a fraction of the previous step's return.
        momentum = 0.1 * (prices[-1] - prices[-2]) / prices[-2] if len(prices) > 1 else 0

        price_change = trend + np.random.normal(0, volatility) + momentum
        new_price = prices[-1] * (1 + price_change)
        # Keep the synthetic series inside the 1800-2200 band.
        prices.append(max(1800, min(2200, new_price)))

    data = pd.DataFrame({
        'Close': prices,
        'High': [p * (1 + np.random.uniform(0, 0.005)) for p in prices],
        'Low': [p * (1 - np.random.uniform(0, 0.005)) for p in prices],
        'Volume': np.random.randint(1000, 10000, n_points)
    })

    detector = MarketRegimeDetector()

    regimes = []

    print("\n🔍 Analyzing market regimes...")
    for i in range(50, len(data)):
        regime, _ = detector.detect_regime(data, i)
        regimes.append(regime)

        if i % 100 == 0:
            print(f"Processed {i}/{len(data)} data points...")

    regime_counts = pd.Series([r.value for r in regimes]).value_counts()
    print("\n📊 Regime Distribution:")
    for regime, count in regime_counts.items():
        pct = count / len(regimes) * 100
        print(f" {regime}: {count} ({pct:.1f}%)")

    print("\n🎯 Regime-Specific Parameters:")
    unique_regimes = set(regimes)
    for regime in unique_regimes:
        params = detector.get_optimal_parameters_for_regime(regime)
        targets = [f'{t*100:.1f}%' for t in params['profit_targets']]
        print(f"\n{regime.value.upper()}:")
        print(f" Profit Targets: {targets}")
        print(f" Trailing Stop: {params['trailing_stop_pct']*100:.1f}%")
        print(f" Position Size Multiplier: {params['position_size_multiplier']:.1f}x")
        print(f" Max Holding Time: {params['max_holding_time']} hours")
        print(f" Breakeven Trigger: {params['breakeven_trigger']*100:.1f}%")

    print("\n✅ Market regime detection system ready!")
    print("This will significantly improve trading performance by adapting to market conditions.")


# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    create_sample_regime_analysis()