stock-flow/.agent/skills/sakata/scripts/signal_generator.py

416 lines
14 KiB
Python

"""
Sakata Signal Generator
買賣信號產生器
Generates buy/sell signals with entry prices, stop-loss levels, and risk-reward ratios.
"""
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Dict, List, Optional
class SignalGenerator:
    """Generate trading signals based on detected Sakata patterns.

    Each detected candlestick pattern is turned into a signal dict holding an
    entry price, an ATR-based stop-loss, a target derived from the configured
    risk-reward ratio, and a 1-5 strength rating.
    """

    def __init__(self, atr_period: int = 14, risk_reward_ratio: float = 2.0):
        """
        Initialize the signal generator.

        Args:
            atr_period: Period for ATR calculation (for stop-loss)
            risk_reward_ratio: Target risk-reward ratio
        """
        self.atr_period = atr_period
        self.risk_reward_ratio = risk_reward_ratio

    def calculate_atr(self, df: pd.DataFrame) -> pd.Series:
        """Calculate Average True Range.

        Args:
            df: DataFrame with 'High', 'Low' and 'Close' columns.

        Returns:
            Rolling mean of the true range over ``self.atr_period`` rows.
            Rows without a full window are NaN.
        """
        high = df['High']
        low = df['Low']
        prev_close = df['Close'].shift(1)

        # True range = widest of the three candidate ranges. The first row has
        # no previous close; max(axis=1) skips those NaNs and uses high - low.
        true_range = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        ).max(axis=1)
        return true_range.rolling(window=self.atr_period).mean()

    def generate_signals(
        self,
        df: pd.DataFrame,
        detected_patterns: List[Dict],
        trend_ctx: Optional[Dict] = None
    ) -> List[Dict]:
        """
        Generate trading signals from detected patterns.

        Args:
            df: OHLC DataFrame indexed by date
            detected_patterns: List of detected pattern dicts. Each must carry
                'date', 'direction', 'name', 'english' and 'type'; 'strength'
                is optional.
            trend_ctx: Optional trend context (added in v2.1). When it has a
                'trend' key, that value is used instead of the MA-based
                estimate.

        Returns:
            List of signal dicts with entry, stop-loss, and target prices,
            ordered newest first and, within a date, strongest first.
            Patterns whose date is not in ``df.index`` are skipped.
        """
        signals = []
        atr = self.calculate_atr(df)

        for pattern in detected_patterns:
            date = pattern['date']
            if date not in df.index:
                continue

            idx = df.index.get_loc(date)
            current_price = df.iloc[idx]['Close']

            # Until a full ATR window exists, fall back to 2% of the price.
            current_atr = atr.iloc[idx]
            if pd.isna(current_atr):
                current_atr = current_price * 0.02

            is_bullish = pattern['direction'] == 'bullish'

            if is_bullish:
                # Buy: enter slightly above the close, stop 1.5x ATR below it.
                entry_price = current_price * 1.005
                stop_loss = current_price - (current_atr * 1.5)
                risk = entry_price - stop_loss
                target_price = entry_price + (risk * self.risk_reward_ratio)
            else:
                # Sell/short: enter slightly below the close, stop 1.5x ATR above.
                entry_price = current_price * 0.995
                stop_loss = current_price + (current_atr * 1.5)
                risk = stop_loss - entry_price
                target_price = entry_price - (risk * self.risk_reward_ratio)

            strength = self._calculate_signal_strength(pattern, df, idx, trend_ctx)

            signals.append({
                'date': date,
                'pattern': pattern['name'],
                'pattern_en': pattern['english'],
                'direction': 'BUY' if is_bullish else 'SELL',
                'current_price': round(current_price, 2),
                'entry_price': round(entry_price, 2),
                'stop_loss': round(stop_loss, 2),
                'target_price': round(target_price, 2),
                'risk': round(abs(risk), 2),
                'reward': round(abs(risk) * self.risk_reward_ratio, 2),
                'risk_reward': f"1:{self.risk_reward_ratio}",
                'strength': strength,
                # One star per strength point (an empty literal here would
                # always render as '').
                'strength_stars': '⭐' * strength,
                'pattern_type': pattern['type'],
                'atr': round(current_atr, 2)
            })

        # Newest first, then strongest first. reverse=True already flips both
        # key components, so the strength must NOT be negated in the key.
        signals.sort(key=lambda s: (s['date'], s['strength']), reverse=True)
        return signals

    def _resolve_trend(
        self,
        df: pd.DataFrame,
        idx: int,
        trend_ctx: Optional[Dict] = None
    ) -> str:
        """Return the trend label used for with-trend / counter-trend weighting.

        Uses ``trend_ctx['trend']`` when provided. Otherwise compares the mean
        of the 20 closes before ``idx`` with the same window shifted back five
        rows; a change of more than 1% either way is 'uptrend' / 'downtrend'.
        With fewer than 20 prior rows the result is 'sideways'.
        """
        if trend_ctx and 'trend' in trend_ctx:
            return trend_ctx['trend']
        if idx < 20:
            return 'sideways'

        closes = df['Close']
        ma20_current = closes.iloc[idx - 20:idx].mean()
        # Without 25 rows of history there is nothing to compare against.
        ma20_prev = closes.iloc[idx - 25:idx - 5].mean() if idx >= 25 else ma20_current

        if ma20_current > ma20_prev * 1.01:
            return 'uptrend'
        if ma20_current < ma20_prev * 0.99:
            return 'downtrend'
        return 'sideways'

    def _calculate_signal_strength(
        self,
        pattern: Dict,
        df: pd.DataFrame,
        idx: int,
        trend_ctx: Optional[Dict] = None
    ) -> int:
        """
        Calculate signal strength from 1-5.

        Starts from a base of 3 and applies:
        - Pattern type: +1 for 'reversal' patterns
        - Pattern clarity: +1 when pattern 'strength' >= 100, -1 when <= 30
          (a missing value is treated as 50)
        - Volume confirmation: +1 when the bar's volume exceeds 1.5x the mean
          of up to 20 preceding bars (only if a 'Volume' column exists)
        - Trend alignment (v2.1): +2 when trading with the trend
          (uptrend + bullish, downtrend + bearish), -1 when against it;
          no adjustment when sideways

        Args:
            pattern: Detected pattern dict
            df: OHLC DataFrame
            idx: Positional index of the pattern's bar in ``df``
            trend_ctx: Optional trend context with a 'trend' key

        Returns:
            Strength clamped to the range 1-5.
        """
        strength = 3  # Base strength

        if pattern.get('type', '') == 'reversal':
            strength += 1

        raw_strength = pattern.get('strength', 50)
        if raw_strength >= 100:
            strength += 1
        elif raw_strength <= 30:
            strength -= 1

        if 'Volume' in df.columns and idx > 0:
            current_vol = df.iloc[idx]['Volume']
            avg_vol = df['Volume'].iloc[max(0, idx - 20):idx].mean()
            if current_vol > avg_vol * 1.5:
                strength += 1

        is_bullish = pattern['direction'] == 'bullish'
        trend = self._resolve_trend(df, idx, trend_ctx)
        if trend in ('uptrend', 'downtrend'):
            # Counter-trend trades are penalised.
            with_trend = is_bullish == (trend == 'uptrend')
            strength += 2 if with_trend else -1

        return max(1, min(5, strength))

    def check_consecutive_windows(
        self,
        signals: List[Dict],
        lookback_days: int = 5
    ) -> Dict:
        """
        v2.2: Detect a run of same-direction window (gap) signals.

        Two consecutive same-direction windows produce a strong-momentum
        warning; three or more produce an exhaustion / reversal warning.
        Only the five most recent window signals are examined, and the run is
        counted from the newest one until the direction changes.

        Args:
            signals: Signal dicts; window signals are those whose
                'pattern_en' contains 'Window' or whose 'pattern' contains
                the Chinese word for window.
            lookback_days: Kept for interface compatibility; not used by the
                current implementation.

        Returns:
            Dict with 'count' (length of the run), 'direction' ('UP', 'DOWN'
            or None) and 'warning' (message or None). With fewer than two
            window signals the count is 0 and the other fields are None.
        """
        window_signals = [
            s for s in signals
            if 'Window' in s.get('pattern_en', '') or '窗口' in s.get('pattern', '')
        ]
        if len(window_signals) < 2:
            return {'count': 0, 'direction': None, 'warning': None}

        # Newest first
        window_signals.sort(key=lambda s: s['date'], reverse=True)

        # The newest window fixes the direction; count how far the run extends.
        latest_direction = window_signals[0]['direction']
        run_length = 0
        for sig in window_signals[:5]:
            if sig['direction'] != latest_direction:
                break
            run_length += 1

        direction = 'UP' if latest_direction == 'BUY' else 'DOWN'

        warning = None
        if run_length == 2:
            if direction == 'UP':
                warning = '⚠️ 二空上漲中 (強勢軋空,勿追空)'
            else:
                warning = '⚠️ 二空下跌中 (強勢殺多,勿追多)'
        elif run_length >= 3:
            if direction == 'UP':
                warning = '🔴 三空力竭 (準備賣出)'
            else:
                warning = '🟢 三空力竭 (準備買入)'

        return {
            'count': run_length,
            'direction': direction,
            'warning': warning
        }

    @staticmethod
    def _as_datetime(value):
        """Coerce pandas timestamps and 'YYYY-MM-DD...' strings to datetime.

        Any other value is returned unchanged.
        """
        if hasattr(value, 'to_pydatetime'):
            return value.to_pydatetime()
        if isinstance(value, str):
            return datetime.strptime(value[:10], '%Y-%m-%d')
        return value

    def get_latest_recommendation(
        self,
        signals: List[Dict],
        current_price: Optional[float] = None,
        current_date=None,
        lookback_days: int = 3
    ) -> Optional[Dict]:
        """
        v2.2: Build a trading recommendation from the most recent signal.

        Checks, in priority order:
        1. Stop-loss: if ``current_price`` has crossed the signal's stop-loss,
           the status is 'STOPPED_OUT' and the recommendation is 'HOLD'.
        2. Freshness: a signal older than ``lookback_days`` is 'EXPIRED' and
           the recommendation is 'HOLD'.
        3. Consecutive windows: a two/three-window run marks the status
           'SPECIAL'; a run of three or more reverses the recommendation.

        Args:
            signals: List of all signals
            current_price: Current price; the stop-loss check is skipped when
                None
            current_date: Current date (datetime, pandas Timestamp or
                'YYYY-MM-DD' string); defaults to ``datetime.now()``
            lookback_days: Number of days a signal stays valid (default 3)

        Returns:
            Recommendation dict with status, or None when ``signals`` is empty.
        """
        if not signals:
            return None

        if current_date is None:
            current_date = datetime.now()
        else:
            current_date = self._as_datetime(current_date)

        # Newest signal first
        latest_signal = sorted(signals, key=lambda s: s['date'], reverse=True)[0]
        signal_date = self._as_datetime(latest_signal['date'])

        # Best effort: dates that cannot be subtracted (e.g. tz-aware vs
        # naive, or date vs datetime) are treated as zero days apart rather
        # than aborting the recommendation.
        try:
            days_diff = abs((current_date - signal_date).days)
        except (TypeError, AttributeError):
            days_diff = 0

        is_expired = days_diff > lookback_days

        stop_loss_triggered = False
        if current_price is not None:
            if latest_signal['direction'] == 'SELL':
                # Short position: price above the stop means stopped out.
                stop_loss_triggered = current_price > latest_signal['stop_loss']
            else:
                # Long position: price below the stop means stopped out.
                stop_loss_triggered = current_price < latest_signal['stop_loss']

        window_status = self.check_consecutive_windows(signals)

        status = 'ACTIVE'
        status_reason = None
        recommendation = latest_signal['direction']

        if stop_loss_triggered:
            status = 'STOPPED_OUT'
            status_reason = f"股價 ${current_price:.2f} 已觸發停損 ${latest_signal['stop_loss']:.2f}"
            recommendation = 'HOLD'
        elif is_expired:
            status = 'EXPIRED'
            status_reason = f"信號已過期 ({days_diff} 天前)"
            recommendation = 'HOLD'
        elif window_status['warning']:
            status = 'SPECIAL'
            status_reason = window_status['warning']
            # A run of three windows signals exhaustion: trade against it.
            if window_status['count'] >= 3:
                recommendation = 'SELL' if window_status['direction'] == 'UP' else 'BUY'

        return {
            'recommendation': recommendation,
            'pattern': latest_signal['pattern'],
            'entry': latest_signal['entry_price'],
            'stop_loss': latest_signal['stop_loss'],
            'target': latest_signal['target_price'],
            'strength': latest_signal['strength_stars'],
            'date': latest_signal['date'],
            'status': status,
            'status_reason': status_reason,
            'days_since_signal': days_diff,
            'window_status': window_status
        }

    def summarize_signals(self, signals: List[Dict]) -> Dict:
        """
        Create a summary of all signals.

        Args:
            signals: Signal dicts with 'direction' and 'strength' keys

        Returns:
            Dict with the total count, BUY and SELL counts, the average
            strength rounded to one decimal, and a 'bias' that is 'BULLISH'
            or 'BEARISH' when one side outnumbers the other by more than
            1.5x, otherwise 'NEUTRAL'.
        """
        if not signals:
            return {
                'total': 0,
                'bullish': 0,
                'bearish': 0,
                'bias': 'NEUTRAL',
                'avg_strength': 0
            }

        bullish_count = sum(1 for s in signals if s['direction'] == 'BUY')
        bearish_count = sum(1 for s in signals if s['direction'] == 'SELL')
        avg_strength = sum(s['strength'] for s in signals) / len(signals)

        if bullish_count > bearish_count * 1.5:
            bias = 'BULLISH'
        elif bearish_count > bullish_count * 1.5:
            bias = 'BEARISH'
        else:
            bias = 'NEUTRAL'

        return {
            'total': len(signals),
            'bullish': bullish_count,
            'bearish': bearish_count,
            'bias': bias,
            'avg_strength': round(avg_strength, 1)
        }