""" Sakata Signal Generator 買賣信號產生器 Generates buy/sell signals with entry prices, stop-loss levels, and risk-reward ratios. """ import pandas as pd import numpy as np from datetime import datetime from typing import Dict, List, Optional class SignalGenerator: """Generate trading signals based on detected Sakata patterns.""" def __init__(self, atr_period: int = 14, risk_reward_ratio: float = 2.0): """ Initialize the signal generator. Args: atr_period: Period for ATR calculation (for stop-loss) risk_reward_ratio: Target risk-reward ratio """ self.atr_period = atr_period self.risk_reward_ratio = risk_reward_ratio def calculate_atr(self, df: pd.DataFrame) -> pd.Series: """Calculate Average True Range.""" high = df['High'] low = df['Low'] close = df['Close'] tr1 = high - low tr2 = abs(high - close.shift(1)) tr3 = abs(low - close.shift(1)) tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) atr = tr.rolling(window=self.atr_period).mean() return atr def generate_signals( self, df: pd.DataFrame, detected_patterns: List[Dict], trend_ctx: Dict = None ) -> List[Dict]: """ Generate trading signals from detected patterns. Args: df: OHLC DataFrame detected_patterns: List of detected pattern dicts trend_ctx: 趨勢背景 (v2.1 新增) Returns: List of signal dicts with entry, stop-loss, and target prices """ signals = [] atr = self.calculate_atr(df) for pattern in detected_patterns: date = pattern['date'] if date not in df.index: continue idx = df.index.get_loc(date) current_price = df.iloc[idx]['Close'] current_atr = atr.iloc[idx] if not pd.isna(atr.iloc[idx]) else current_price * 0.02 # Determine signal direction is_bullish = pattern['direction'] == 'bullish' # Calculate entry, stop-loss, and target if is_bullish: # Bullish signal - buy entry_price = current_price * 1.005 # Slightly above close stop_loss = current_price - (current_atr * 1.5) # 1.5x ATR below risk = entry_price - stop_loss target_price = entry_price + (risk * self.risk_reward_ratio) else: # Bearish signal - sell/short entry_price = current_price * 0.995 # Slightly below close stop_loss = current_price + (current_atr * 1.5) # 1.5x ATR above risk = stop_loss - entry_price target_price = entry_price - (risk * self.risk_reward_ratio) # Calculate signal strength (v2.1: 傳入 trend_ctx) strength = self._calculate_signal_strength(pattern, df, idx, trend_ctx) signal = { 'date': date, 'pattern': pattern['name'], 'pattern_en': pattern['english'], 'direction': 'BUY' if is_bullish else 'SELL', 'current_price': round(current_price, 2), 'entry_price': round(entry_price, 2), 'stop_loss': round(stop_loss, 2), 'target_price': round(target_price, 2), 'risk': round(abs(risk), 2), 'reward': round(abs(risk) * self.risk_reward_ratio, 2), 'risk_reward': f"1:{self.risk_reward_ratio}", 'strength': strength, 'strength_stars': '⭐' * strength, 'pattern_type': pattern['type'], 'atr': round(current_atr, 2) } signals.append(signal) # Sort by date (newest first) and strength signals.sort(key=lambda x: (x['date'], -x['strength']), reverse=True) return signals def _calculate_signal_strength( self, pattern: Dict, df: pd.DataFrame, idx: int, trend_ctx: Dict = None ) -> int: """ Calculate signal strength from 1-5. v2.1 改良: 考慮順勢/逆勢權重 - 順勢 (uptrend + BUY 或 downtrend + SELL): +2 - 逆勢 (downtrend + BUY 或 uptrend + SELL): -1 Factors: - Pattern reliability (reversal vs continuation) - Volume confirmation - Trend alignment (v2.1 改良) - Pattern clarity (signal value) """ strength = 3 # Base strength # Pattern type bonus pattern_type = pattern.get('type', '') if pattern_type == 'reversal': strength += 1 # Reversal patterns are stronger signals # Signal clarity raw_strength = pattern.get('strength', 50) if raw_strength >= 100: strength += 1 elif raw_strength <= 30: strength -= 1 # Volume confirmation (if available) if 'Volume' in df.columns and idx > 0: current_vol = df.iloc[idx]['Volume'] avg_vol = df['Volume'].iloc[max(0, idx-20):idx].mean() if current_vol > avg_vol * 1.5: strength += 1 # High volume confirmation # v2.1: 趨勢順逆權重調整 is_bullish = pattern['direction'] == 'bullish' # 使用傳入的 trend_ctx 或計算 MA20 斜率 if trend_ctx and 'trend' in trend_ctx: trend = trend_ctx['trend'] elif idx >= 20: ma20_current = df['Close'].iloc[idx-20:idx].mean() ma20_prev = df['Close'].iloc[idx-25:idx-5].mean() if idx >= 25 else ma20_current if ma20_current > ma20_prev * 1.01: trend = 'uptrend' elif ma20_current < ma20_prev * 0.99: trend = 'downtrend' else: trend = 'sideways' else: trend = 'sideways' # 順勢/逆勢權重 if trend == 'uptrend': if is_bullish: strength += 2 # 順勢多頭: +2 else: strength -= 1 # 逆勢空頭: -1 elif trend == 'downtrend': if not is_bullish: strength += 2 # 順勢空頭: +2 else: strength -= 1 # 逆勢多頭: -1 (這是最危險的!) # sideways: 不調整 return max(1, min(5, strength)) # Clamp to 1-5 def check_consecutive_windows( self, signals: List[Dict], lookback_days: int = 5 ) -> Dict: """ v2.2: 檢測連續同向窗口 (二空/三空)。 二空: 連續 2 個同向缺口 -> 強勢警示 三空: 連續 3 個同向缺口 -> 力竭反轉信號 """ from datetime import timedelta # 篩選窗口型態 window_signals = [ s for s in signals if 'Window' in s.get('pattern_en', '') or '窗口' in s.get('pattern', '') ] if len(window_signals) < 2: return {'count': 0, 'direction': None, 'warning': None} # 依日期排序 (新到舊) window_signals.sort(key=lambda x: x['date'], reverse=True) # 檢測連續同向缺口 (從最新開始) consecutive_up = 0 consecutive_down = 0 current_direction = None max_consecutive_up = 0 max_consecutive_down = 0 for i, sig in enumerate(window_signals[:5]): sig_direction = sig['direction'] if i == 0: # 第一個信號確定方向 current_direction = sig_direction if sig_direction == 'BUY': consecutive_up = 1 else: consecutive_down = 1 else: # 檢查是否同向 if sig_direction == current_direction: if sig_direction == 'BUY': consecutive_up += 1 else: consecutive_down += 1 else: # 方向改變,停止計數 break max_consecutive_up = max(max_consecutive_up, consecutive_up) max_consecutive_down = max(max_consecutive_down, consecutive_down) max_consecutive = max(max_consecutive_up, max_consecutive_down) direction = 'UP' if max_consecutive_up > max_consecutive_down else 'DOWN' warning = None if max_consecutive == 2: if direction == 'UP': warning = '⚠️ 二空上漲中 (強勢軋空,勿追空)' else: warning = '⚠️ 二空下跌中 (強勢殺多,勿追多)' elif max_consecutive >= 3: if direction == 'UP': warning = '🔴 三空力竭 (準備賣出)' else: warning = '🟢 三空力竭 (準備買入)' return { 'count': max_consecutive, 'direction': direction, 'warning': warning } def get_latest_recommendation( self, signals: List[Dict], current_price: float = None, current_date = None, lookback_days: int = 3 ) -> Optional[Dict]: """ v2.2 實戰邏輯: 取得最新交易建議。 改良重點: 1. 時效性: 超過 3 天的信號標記為「已過期」 2. 停損檢查: 已觸發停損的標記為「觀望」 3. 二空/三空: 連續窗口序列特殊處理 Args: signals: List of all signals current_price: 當前股價 current_date: 當前日期 lookback_days: 信號有效天數 (預設 3 天) Returns: Recommendation dict with status """ from datetime import datetime, timedelta if not signals: return None # 設定當前日期 if current_date is None: current_date = datetime.now() elif hasattr(current_date, 'to_pydatetime'): current_date = current_date.to_pydatetime() # 依日期排序 (新到舊) sorted_signals = sorted(signals, key=lambda x: x['date'], reverse=True) if not sorted_signals: return None # 取最新信號 latest_signal = sorted_signals[0] signal_date = latest_signal['date'] if hasattr(signal_date, 'to_pydatetime'): signal_date = signal_date.to_pydatetime() elif isinstance(signal_date, str): signal_date = datetime.strptime(signal_date[:10], '%Y-%m-%d') # 計算天數差 days_diff = (current_date - signal_date).days if hasattr(current_date, 'days') == False else 0 try: days_diff = abs((current_date - signal_date).days) except: days_diff = 0 # ======================================== # 1. 時效性檢查 # ======================================== is_expired = days_diff > lookback_days # ======================================== # 2. 停損觸發檢查 # ======================================== stop_loss_triggered = False if current_price is not None: if latest_signal['direction'] == 'SELL': # 空單: 股價 > 停損價 = 已停損 stop_loss_triggered = current_price > latest_signal['stop_loss'] else: # 多單: 股價 < 停損價 = 已停損 stop_loss_triggered = current_price < latest_signal['stop_loss'] # ======================================== # 3. 連續窗口檢查 (二空/三空) # ======================================== window_status = self.check_consecutive_windows(signals) # ======================================== # 4. 決定狀態和建議 # ======================================== status = 'ACTIVE' status_reason = None recommendation = latest_signal['direction'] if stop_loss_triggered: status = 'STOPPED_OUT' status_reason = f"股價 ${current_price:.2f} 已觸發停損 ${latest_signal['stop_loss']:.2f}" recommendation = 'HOLD' elif is_expired: status = 'EXPIRED' status_reason = f"信號已過期 ({days_diff} 天前)" recommendation = 'HOLD' elif window_status['warning']: status = 'SPECIAL' status_reason = window_status['warning'] # 三空力竭時反向操作 if window_status['count'] >= 3: recommendation = 'SELL' if window_status['direction'] == 'UP' else 'BUY' return { 'recommendation': recommendation, 'pattern': latest_signal['pattern'], 'entry': latest_signal['entry_price'], 'stop_loss': latest_signal['stop_loss'], 'target': latest_signal['target_price'], 'strength': latest_signal['strength_stars'], 'date': latest_signal['date'], 'status': status, 'status_reason': status_reason, 'days_since_signal': days_diff, 'window_status': window_status } def summarize_signals(self, signals: List[Dict]) -> Dict: """ Create a summary of all signals. """ if not signals: return { 'total': 0, 'bullish': 0, 'bearish': 0, 'bias': 'NEUTRAL', 'avg_strength': 0 } bullish = [s for s in signals if s['direction'] == 'BUY'] bearish = [s for s in signals if s['direction'] == 'SELL'] avg_strength = sum(s['strength'] for s in signals) / len(signals) if len(bullish) > len(bearish) * 1.5: bias = 'BULLISH' elif len(bearish) > len(bullish) * 1.5: bias = 'BEARISH' else: bias = 'NEUTRAL' return { 'total': len(signals), 'bullish': len(bullish), 'bearish': len(bearish), 'bias': bias, 'avg_strength': round(avg_strength, 1) }