759 lines
30 KiB
Python
759 lines
30 KiB
Python
"""
|
|
Sakata 80 Candlestick Pattern Detector v2.0
|
|
酒田戰法 80 種 K 線型態偵測器 (改良版)
|
|
|
|
改良重點:
|
|
1. 趨勢濾網: 高檔偵測三山, 低檔偵測三川
|
|
2. 時間跨度: 三山/三川至少間隔 10 根 K 線
|
|
3. 結構分析: 計算頸線、量價背離
|
|
"""
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import talib
|
|
|
|
# Pattern categories and their TA-Lib function names
|
|
TALIB_PATTERNS = {
|
|
# Single Candle Patterns (單根型態)
|
|
'CDL_DOJI': {'name': '十字星', 'en': 'Doji', 'type': 'reversal', 'signal': 'neutral'},
|
|
'CDL_DOJISTAR': {'name': '十字星', 'en': 'Doji Star', 'type': 'reversal', 'signal': 'neutral'},
|
|
'CDL_DRAGONFLYDOJI': {'name': '蜻蜓十字', 'en': 'Dragonfly Doji', 'type': 'reversal', 'signal': 'bullish'},
|
|
'CDL_GRAVESTONEDOJI': {'name': '墓碑十字', 'en': 'Gravestone Doji', 'type': 'reversal', 'signal': 'bearish'},
|
|
'CDL_LONGLEGGEDDOJI': {'name': '長腳十字', 'en': 'Long Legged Doji', 'type': 'reversal', 'signal': 'neutral'},
|
|
'CDL_HAMMER': {'name': '錘子', 'en': 'Hammer', 'type': 'reversal', 'signal': 'bullish'},
|
|
'CDL_HANGINGMAN': {'name': '吊人', 'en': 'Hanging Man', 'type': 'reversal', 'signal': 'bearish'},
|
|
'CDL_SHOOTINGSTAR': {'name': '流星', 'en': 'Shooting Star', 'type': 'reversal', 'signal': 'bearish'},
|
|
'CDL_INVERTEDHAMMER': {'name': '倒錘子', 'en': 'Inverted Hammer', 'type': 'reversal', 'signal': 'bullish'},
|
|
'CDL_MARUBOZU': {'name': '光頭光腳', 'en': 'Marubozu', 'type': 'continuation', 'signal': 'trend'},
|
|
'CDL_SPINNINGTOP': {'name': '陀螺', 'en': 'Spinning Top', 'type': 'reversal', 'signal': 'neutral'},
|
|
'CDL_HIGHWAVE': {'name': '高浪線', 'en': 'High Wave', 'type': 'reversal', 'signal': 'neutral'},
|
|
'CDL_BELTHOLD': {'name': '捉腰帶線', 'en': 'Belt Hold', 'type': 'reversal', 'signal': 'trend'},
|
|
'CDL_TAKURI': {'name': '探底', 'en': 'Takuri', 'type': 'reversal', 'signal': 'bullish'},
|
|
|
|
# Double Candle Patterns (雙根型態)
|
|
'CDL_ENGULFING': {'name': '吞噬', 'en': 'Engulfing', 'type': 'reversal', 'signal': 'trend'},
|
|
'CDL_DARKCLOUDCOVER': {'name': '烏雲蓋頂', 'en': 'Dark Cloud Cover', 'type': 'reversal', 'signal': 'bearish'},
|
|
'CDL_PIERCING': {'name': '刺透', 'en': 'Piercing', 'type': 'reversal', 'signal': 'bullish'},
|
|
'CDL_HARAMI': {'name': '孕線', 'en': 'Harami', 'type': 'reversal', 'signal': 'trend'},
|
|
'CDL_HARAMICROSS': {'name': '十字孕線', 'en': 'Harami Cross', 'type': 'reversal', 'signal': 'trend'},
|
|
'CDL_MATCHINGLOW': {'name': '相同低價', 'en': 'Matching Low', 'type': 'reversal', 'signal': 'bullish'},
|
|
'CDL_KICKING': {'name': '反沖', 'en': 'Kicking', 'type': 'reversal', 'signal': 'trend'},
|
|
'CDL_KICKINGBYLENGTH': {'name': '反沖(長度)', 'en': 'Kicking by Length', 'type': 'reversal', 'signal': 'trend'},
|
|
'CDL_COUNTERATTACK': {'name': '反擊線', 'en': 'Counterattack', 'type': 'reversal', 'signal': 'trend'},
|
|
'CDL_CLOSINGMARUBOZU': {'name': '收盤光頭', 'en': 'Closing Marubozu', 'type': 'continuation', 'signal': 'trend'},
|
|
'CDL_HIKKAKE': {'name': '陷阱', 'en': 'Hikkake', 'type': 'reversal', 'signal': 'trend'},
|
|
'CDL_HIKKAKEMOD': {'name': '改良陷阱', 'en': 'Modified Hikkake', 'type': 'reversal', 'signal': 'trend'},
|
|
'CDL_HOMINGPIGEON': {'name': '家鴿', 'en': 'Homing Pigeon', 'type': 'reversal', 'signal': 'bullish'},
|
|
'CDL_INNECK': {'name': '頸內線', 'en': 'In Neck', 'type': 'continuation', 'signal': 'bearish'},
|
|
'CDL_ONNECK': {'name': '頸上線', 'en': 'On Neck', 'type': 'continuation', 'signal': 'bearish'},
|
|
'CDL_THRUSTING': {'name': '切入線', 'en': 'Thrusting', 'type': 'continuation', 'signal': 'bearish'},
|
|
'CDL_SEPARATINGLINES': {'name': '分離線', 'en': 'Separating Lines', 'type': 'continuation', 'signal': 'trend'},
|
|
'CDL_TWOCROWS': {'name': '二隻烏鴉', 'en': 'Two Crows', 'type': 'reversal', 'signal': 'bearish'},
|
|
|
|
# Triple Candle Patterns (三根型態)
|
|
'CDL_MORNINGSTAR': {'name': '晨星', 'en': 'Morning Star', 'type': 'reversal', 'signal': 'bullish'},
|
|
'CDL_MORNINGDOJISTAR': {'name': '十字晨星', 'en': 'Morning Doji Star', 'type': 'reversal', 'signal': 'bullish'},
|
|
'CDL_EVENINGSTAR': {'name': '夜星', 'en': 'Evening Star', 'type': 'reversal', 'signal': 'bearish'},
|
|
'CDL_EVENINGDOJISTAR': {'name': '十字夜星', 'en': 'Evening Doji Star', 'type': 'reversal', 'signal': 'bearish'},
|
|
'CDL_ABANDONEDBABY': {'name': '棄嬰', 'en': 'Abandoned Baby', 'type': 'reversal', 'signal': 'trend'},
|
|
'CDL_3WHITESOLDIERS': {'name': '三白兵', 'en': 'Three White Soldiers', 'type': 'reversal', 'signal': 'bullish'},
|
|
'CDL_3BLACKCROWS': {'name': '三黑鴉', 'en': 'Three Black Crows', 'type': 'reversal', 'signal': 'bearish'},
|
|
'CDL_3INSIDE': {'name': '三內部', 'en': 'Three Inside', 'type': 'reversal', 'signal': 'trend'},
|
|
'CDL_3OUTSIDE': {'name': '三外部', 'en': 'Three Outside', 'type': 'reversal', 'signal': 'trend'},
|
|
'CDL_3STARSINSOUTH': {'name': '南方三星', 'en': 'Three Stars In South', 'type': 'reversal', 'signal': 'bullish'},
|
|
'CDL_TRISTAR': {'name': '三星', 'en': 'Tristar', 'type': 'reversal', 'signal': 'trend'},
|
|
'CDL_UPSIDEGAP2CROWS': {'name': '向上跳空二鴉', 'en': 'Upside Gap Two Crows', 'type': 'reversal', 'signal': 'bearish'},
|
|
'CDL_UNIQUE3RIVER': {'name': '奇特三川底', 'en': 'Unique 3 River', 'type': 'reversal', 'signal': 'bullish'},
|
|
'CDL_XSIDEGAP3METHODS': {'name': '跳空並列三法', 'en': 'Side Gap Three Methods', 'type': 'continuation', 'signal': 'trend'},
|
|
'CDL_TASUKIGAP': {'name': '跳空缺口', 'en': 'Tasuki Gap', 'type': 'continuation', 'signal': 'trend'},
|
|
'CDL_GAPSIDESIDEWHITE': {'name': '向上跳空並列陽線', 'en': 'Gap Side Side White', 'type': 'continuation', 'signal': 'bullish'},
|
|
|
|
# Complex Patterns (複雜型態)
|
|
'CDL_ADVANCEBLOCK': {'name': '前進受阻', 'en': 'Advance Block', 'type': 'reversal', 'signal': 'bearish'},
|
|
'CDL_STALLEDPATTERN': {'name': '停滯型態', 'en': 'Stalled Pattern', 'type': 'reversal', 'signal': 'bearish'},
|
|
'CDL_CONCEALBABYSWALL': {'name': '藏嬰吞噬', 'en': 'Concealing Baby Swallow', 'type': 'reversal', 'signal': 'bullish'},
|
|
'CDL_LADDERBOTTOM': {'name': '梯底', 'en': 'Ladder Bottom', 'type': 'reversal', 'signal': 'bullish'},
|
|
'CDL_RICKSHAWMAN': {'name': '黃包車夫', 'en': 'Rickshaw Man', 'type': 'reversal', 'signal': 'neutral'},
|
|
'CDL_RISEFALL3METHODS': {'name': '上升/下降三法', 'en': 'Rise/Fall Three Methods', 'type': 'continuation', 'signal': 'trend'},
|
|
'CDL_STICKSANDWICH': {'name': '棍子三明治', 'en': 'Stick Sandwich', 'type': 'reversal', 'signal': 'bullish'},
|
|
'CDL_BREAKAWAY': {'name': '脫離', 'en': 'Breakaway', 'type': 'reversal', 'signal': 'trend'},
|
|
'CDL_MATHOLD': {'name': '鋪墊', 'en': 'Mat Hold', 'type': 'continuation', 'signal': 'bullish'},
|
|
'CDL_IDENTICAL3CROWS': {'name': '相同三鴉', 'en': 'Identical Three Crows', 'type': 'reversal', 'signal': 'bearish'},
|
|
'CDL_SHORTLINE': {'name': '短線', 'en': 'Short Line', 'type': 'neutral', 'signal': 'neutral'},
|
|
'CDL_LONGLINE': {'name': '長線', 'en': 'Long Line', 'type': 'continuation', 'signal': 'trend'},
|
|
}
|
|
|
|
# Custom patterns not in TA-Lib (酒田特有型態)
|
|
CUSTOM_PATTERNS = {
|
|
'SANZAN': {'name': '三山', 'en': 'Three Mountains (Triple Top)', 'type': 'reversal', 'signal': 'bearish'},
|
|
'SANSEN': {'name': '三川', 'en': 'Three Rivers (Triple Bottom)', 'type': 'reversal', 'signal': 'bullish'},
|
|
'SANKU_UP': {'name': '三空(上漲)', 'en': 'Three Gaps Up', 'type': 'reversal', 'signal': 'bearish'},
|
|
'SANKU_DOWN': {'name': '三空(下跌)', 'en': 'Three Gaps Down', 'type': 'reversal', 'signal': 'bullish'},
|
|
'SANPOH_UP': {'name': '上升三法', 'en': 'Rising Three Methods', 'type': 'continuation', 'signal': 'bullish'},
|
|
'SANPOH_DOWN': {'name': '下降三法', 'en': 'Falling Three Methods', 'type': 'continuation', 'signal': 'bearish'},
|
|
'ISLAND_TOP': {'name': '島型頂', 'en': 'Island Top', 'type': 'reversal', 'signal': 'bearish'},
|
|
'ISLAND_BOTTOM': {'name': '島型底', 'en': 'Island Bottom', 'type': 'reversal', 'signal': 'bullish'},
|
|
'ROUND_TOP': {'name': '圓頂', 'en': 'Rounding Top', 'type': 'reversal', 'signal': 'bearish'},
|
|
'ROUND_BOTTOM': {'name': '圓底', 'en': 'Rounding Bottom', 'type': 'reversal', 'signal': 'bullish'},
|
|
'TWEEZERS_TOP': {'name': '鑷子頂', 'en': 'Tweezers Top', 'type': 'reversal', 'signal': 'bearish'},
|
|
'TWEEZERS_BOTTOM': {'name': '鑷子底', 'en': 'Tweezers Bottom', 'type': 'reversal', 'signal': 'bullish'},
|
|
'WINDOW_UP': {'name': '向上窗口', 'en': 'Rising Window', 'type': 'continuation', 'signal': 'bullish'},
|
|
'WINDOW_DOWN': {'name': '向下窗口', 'en': 'Falling Window', 'type': 'continuation', 'signal': 'bearish'},
|
|
'EXHAUSTION_UP': {'name': '竭盡缺口(上)', 'en': 'Exhaustion Gap Up', 'type': 'reversal', 'signal': 'bearish'},
|
|
'EXHAUSTION_DOWN': {'name': '竭盡缺口(下)', 'en': 'Exhaustion Gap Down', 'type': 'reversal', 'signal': 'bullish'},
|
|
'STRONG_BULL': {'name': '強勢陽線', 'en': 'Strong Bullish Candle', 'type': 'continuation', 'signal': 'bullish'},
|
|
'STRONG_BEAR': {'name': '強勢陰線', 'en': 'Strong Bearish Candle', 'type': 'continuation', 'signal': 'bearish'},
|
|
}
|
|
|
|
|
|
# ============================================================
|
|
# 趨勢濾網 (Trend Filter)
|
|
# ============================================================
|
|
|
|
def calculate_trend_context(df: pd.DataFrame) -> dict:
|
|
"""
|
|
計算趨勢背景資訊,用於過濾不合理的信號。
|
|
|
|
Returns:
|
|
dict: {
|
|
'ma20': MA20 值,
|
|
'ma60': MA60 值,
|
|
'position': 'high' | 'mid' | 'low',
|
|
'trend': 'uptrend' | 'downtrend' | 'sideways'
|
|
}
|
|
"""
|
|
close = df['Close'].values
|
|
|
|
# 計算均線
|
|
ma20 = talib.SMA(close, timeperiod=20)
|
|
ma60 = talib.SMA(close, timeperiod=60)
|
|
|
|
# 計算 ATR 用於判斷波動性
|
|
atr = talib.ATR(df['High'].values, df['Low'].values, close, timeperiod=14)
|
|
|
|
# 計算過去 60 天的高低點
|
|
lookback = min(60, len(df))
|
|
recent_high = df['High'].iloc[-lookback:].max()
|
|
recent_low = df['Low'].iloc[-lookback:].min()
|
|
price_range = recent_high - recent_low
|
|
|
|
# 當前價格在區間中的位置
|
|
current_price = close[-1]
|
|
position_pct = (current_price - recent_low) / price_range if price_range > 0 else 0.5
|
|
|
|
if position_pct > 0.7:
|
|
position = 'high'
|
|
elif position_pct < 0.3:
|
|
position = 'low'
|
|
else:
|
|
position = 'mid'
|
|
|
|
# 趨勢判斷 (基於 MA20 斜率)
|
|
if len(ma20) >= 10:
|
|
ma20_slope = (ma20[-1] - ma20[-10]) / ma20[-10] if ma20[-10] > 0 else 0
|
|
if ma20_slope > 0.02:
|
|
trend = 'uptrend'
|
|
elif ma20_slope < -0.02:
|
|
trend = 'downtrend'
|
|
else:
|
|
trend = 'sideways'
|
|
else:
|
|
trend = 'sideways'
|
|
|
|
return {
|
|
'ma20': ma20,
|
|
'ma60': ma60,
|
|
'atr': atr,
|
|
'position': position,
|
|
'position_pct': position_pct,
|
|
'trend': trend,
|
|
'recent_high': recent_high,
|
|
'recent_low': recent_low
|
|
}
|
|
|
|
|
|
# ============================================================
|
|
# 結構分析器 (Structure Analyzer)
|
|
# ============================================================
|
|
|
|
def find_swing_points(prices: np.ndarray, min_distance: int = 5) -> tuple:
|
|
"""
|
|
找出擺盪高低點,確保相鄰點之間有最小間距。
|
|
|
|
Args:
|
|
prices: 價格陣列
|
|
min_distance: 相鄰擺盪點的最小間距
|
|
|
|
Returns:
|
|
(peaks, troughs): 高點和低點的索引列表
|
|
"""
|
|
peaks = []
|
|
troughs = []
|
|
|
|
for i in range(min_distance, len(prices) - min_distance):
|
|
# 找高點
|
|
is_peak = True
|
|
for j in range(1, min_distance + 1):
|
|
if prices[i] <= prices[i - j] or prices[i] <= prices[i + j]:
|
|
is_peak = False
|
|
break
|
|
if is_peak:
|
|
# 確保和前一個高點有足夠間距
|
|
if not peaks or (i - peaks[-1]) >= min_distance:
|
|
peaks.append(i)
|
|
|
|
# 找低點
|
|
is_trough = True
|
|
for j in range(1, min_distance + 1):
|
|
if prices[i] >= prices[i - j] or prices[i] >= prices[i + j]:
|
|
is_trough = False
|
|
break
|
|
if is_trough:
|
|
if not troughs or (i - troughs[-1]) >= min_distance:
|
|
troughs.append(i)
|
|
|
|
return peaks, troughs
|
|
|
|
|
|
def calculate_neckline(df: pd.DataFrame, peaks: list, troughs: list, pattern_type: str) -> dict:
|
|
"""
|
|
計算頸線位置。
|
|
|
|
Args:
|
|
df: DataFrame
|
|
peaks: 高點索引列表
|
|
troughs: 低點索引列表
|
|
pattern_type: 'sanzan' (三山) 或 'sansen' (三川)
|
|
|
|
Returns:
|
|
dict: {
|
|
'neckline': 頸線價格,
|
|
'confirmed': 是否已跌破/突破頸線,
|
|
'distance_pct': 當前價格距離頸線的百分比
|
|
}
|
|
"""
|
|
if pattern_type == 'sanzan' and len(troughs) >= 2:
|
|
# 三山的頸線 = 兩個谷底的連線 (取較高者)
|
|
trough_prices = [df['Low'].iloc[t] for t in troughs[-2:]]
|
|
neckline = max(trough_prices)
|
|
current_price = df['Close'].iloc[-1]
|
|
confirmed = current_price < neckline
|
|
distance_pct = (current_price - neckline) / neckline * 100
|
|
|
|
elif pattern_type == 'sansen' and len(peaks) >= 2:
|
|
# 三川的頸線 = 兩個高點的連線 (取較低者)
|
|
peak_prices = [df['High'].iloc[p] for p in peaks[-2:]]
|
|
neckline = min(peak_prices)
|
|
current_price = df['Close'].iloc[-1]
|
|
confirmed = current_price > neckline
|
|
distance_pct = (current_price - neckline) / neckline * 100
|
|
|
|
else:
|
|
return {'neckline': None, 'confirmed': False, 'distance_pct': 0}
|
|
|
|
return {
|
|
'neckline': neckline,
|
|
'confirmed': confirmed,
|
|
'distance_pct': distance_pct
|
|
}
|
|
|
|
|
|
def check_volume_divergence(df: pd.DataFrame, peaks: list) -> dict:
|
|
"""
|
|
檢查量價背離。
|
|
|
|
三山時,第三座山的成交量應該萎縮 (看空確認)
|
|
三川時,第三個底的成交量應該萎縮然後放大 (看多確認)
|
|
|
|
Returns:
|
|
dict: {
|
|
'divergence': bool,
|
|
'type': 'bearish' | 'bullish' | None,
|
|
'description': 說明
|
|
}
|
|
"""
|
|
if 'Volume' not in df.columns or len(peaks) < 3:
|
|
return {'divergence': False, 'type': None, 'description': '無成交量資料'}
|
|
|
|
volumes = df['Volume'].values
|
|
|
|
# 取最後三個高點/低點的成交量
|
|
peak_volumes = [volumes[p] for p in peaks[-3:]]
|
|
|
|
# 量價背離: 價格創新高,但成交量遞減
|
|
vol_trend = (peak_volumes[-1] < peak_volumes[-2] < peak_volumes[-3])
|
|
|
|
if vol_trend:
|
|
return {
|
|
'divergence': True,
|
|
'type': 'bearish',
|
|
'description': f'量價背離: 成交量由 {peak_volumes[-3]:,.0f} → {peak_volumes[-2]:,.0f} → {peak_volumes[-1]:,.0f} 遞減'
|
|
}
|
|
|
|
return {'divergence': False, 'type': None, 'description': '成交量正常'}
|
|
|
|
|
|
# ============================================================
|
|
# 改良版三山/三川偵測
|
|
# ============================================================
|
|
|
|
def detect_sanzan_v2(df: pd.DataFrame, trend_ctx: dict,
|
|
lookback: int = 60, min_peak_distance: int = 10) -> dict:
|
|
"""
|
|
改良版三山 (Triple Top) 偵測。
|
|
|
|
改良點:
|
|
1. 只在高檔 (position = 'high') 才偵測
|
|
2. 三個高點至少間隔 min_peak_distance 根 K 線
|
|
3. 計算頸線和量價背離
|
|
|
|
Returns:
|
|
dict: 完整的型態分析結果
|
|
"""
|
|
signals = np.zeros(len(df))
|
|
analysis = {
|
|
'detected': False,
|
|
'stage': 0, # 1-5 階段
|
|
'peaks': [],
|
|
'neckline': None,
|
|
'volume_divergence': None,
|
|
'description': ''
|
|
}
|
|
|
|
# 濾網1: 只在高檔偵測三山
|
|
if trend_ctx['position'] not in ['high', 'mid']:
|
|
analysis['description'] = '股價位於低檔,不適合偵測三山'
|
|
return {'signals': signals, 'analysis': analysis}
|
|
|
|
highs = df['High'].values
|
|
|
|
# 找擺盪高點 (至少間隔 min_peak_distance)
|
|
peaks, troughs = find_swing_points(highs, min_distance=min_peak_distance)
|
|
|
|
if len(peaks) < 3:
|
|
analysis['description'] = f'高點數量不足 ({len(peaks)}/3)'
|
|
return {'signals': signals, 'analysis': analysis}
|
|
|
|
# 取最後三個高點
|
|
last_3_peaks = peaks[-3:]
|
|
peak_values = [highs[p] for p in last_3_peaks]
|
|
avg_peak = np.mean(peak_values)
|
|
|
|
# 檢查三個高點是否在相近水平 (容許 5% 誤差)
|
|
tolerance = 0.05
|
|
peaks_aligned = all(abs(p - avg_peak) / avg_peak < tolerance for p in peak_values)
|
|
|
|
if not peaks_aligned:
|
|
analysis['description'] = '三個高點水平差異過大'
|
|
return {'signals': signals, 'analysis': analysis}
|
|
|
|
# 檢查時間跨度 (三個高點的總跨度應該夠大)
|
|
total_span = last_3_peaks[-1] - last_3_peaks[0]
|
|
if total_span < 20: # 至少跨越 20 根 K 線
|
|
analysis['description'] = f'時間跨度不足 ({total_span}/20 根K線)'
|
|
return {'signals': signals, 'analysis': analysis}
|
|
|
|
# 計算頸線
|
|
neckline_info = calculate_neckline(df, last_3_peaks, troughs, 'sanzan')
|
|
|
|
# 檢查量價背離
|
|
volume_info = check_volume_divergence(df, last_3_peaks)
|
|
|
|
# 判斷型態階段
|
|
current_price = df['Close'].iloc[-1]
|
|
if neckline_info['neckline']:
|
|
if neckline_info['confirmed']:
|
|
stage = 5 # 已跌破頸線,型態確立
|
|
elif current_price < avg_peak * 0.97:
|
|
stage = 4 # 正在形成右肩
|
|
else:
|
|
stage = 3 # 第三山頂形成中
|
|
else:
|
|
stage = 2 # 僅形成兩個山頂
|
|
|
|
# 只在型態較成熟時才發出信號 (stage >= 3)
|
|
if stage >= 3:
|
|
signals[-1] = -100 if stage >= 4 else -50
|
|
analysis['detected'] = True
|
|
|
|
analysis.update({
|
|
'stage': stage,
|
|
'peaks': [{'idx': p, 'price': highs[p], 'date': df.index[p]} for p in last_3_peaks],
|
|
'neckline': neckline_info,
|
|
'volume_divergence': volume_info,
|
|
'avg_peak': avg_peak,
|
|
'description': f'三山型態 ({stage}/5 階段)' +
|
|
(' ⚠️ 量價背離' if volume_info['divergence'] else '')
|
|
})
|
|
|
|
return {'signals': signals, 'analysis': analysis}
|
|
|
|
|
|
def detect_sansen_v2(df: pd.DataFrame, trend_ctx: dict,
|
|
lookback: int = 60, min_trough_distance: int = 10) -> dict:
|
|
"""
|
|
改良版三川 (Triple Bottom) 偵測。
|
|
|
|
改良點:
|
|
1. 只在低檔 (position = 'low') 才偵測
|
|
2. 三個低點至少間隔 min_trough_distance 根 K 線
|
|
3. 計算頸線和量能確認
|
|
"""
|
|
signals = np.zeros(len(df))
|
|
analysis = {
|
|
'detected': False,
|
|
'stage': 0,
|
|
'troughs': [],
|
|
'neckline': None,
|
|
'volume_divergence': None,
|
|
'description': ''
|
|
}
|
|
|
|
# 濾網1: 只在低檔偵測三川
|
|
if trend_ctx['position'] not in ['low', 'mid']:
|
|
analysis['description'] = '股價位於高檔,不適合偵測三川'
|
|
return {'signals': signals, 'analysis': analysis}
|
|
|
|
lows = df['Low'].values
|
|
|
|
# 找擺盪低點
|
|
peaks, troughs = find_swing_points(lows, min_distance=min_trough_distance)
|
|
|
|
if len(troughs) < 3:
|
|
analysis['description'] = f'低點數量不足 ({len(troughs)}/3)'
|
|
return {'signals': signals, 'analysis': analysis}
|
|
|
|
# 取最後三個低點
|
|
last_3_troughs = troughs[-3:]
|
|
trough_values = [lows[t] for t in last_3_troughs]
|
|
avg_trough = np.mean(trough_values)
|
|
|
|
# 檢查三個低點是否在相近水平
|
|
tolerance = 0.05
|
|
troughs_aligned = all(abs(t - avg_trough) / avg_trough < tolerance for t in trough_values)
|
|
|
|
if not troughs_aligned:
|
|
analysis['description'] = '三個低點水平差異過大'
|
|
return {'signals': signals, 'analysis': analysis}
|
|
|
|
# 時間跨度檢查
|
|
total_span = last_3_troughs[-1] - last_3_troughs[0]
|
|
if total_span < 20:
|
|
analysis['description'] = f'時間跨度不足 ({total_span}/20 根K線)'
|
|
return {'signals': signals, 'analysis': analysis}
|
|
|
|
# 計算頸線
|
|
neckline_info = calculate_neckline(df, peaks, last_3_troughs, 'sansen')
|
|
|
|
# 判斷型態階段
|
|
current_price = df['Close'].iloc[-1]
|
|
if neckline_info['neckline']:
|
|
if neckline_info['confirmed']:
|
|
stage = 5
|
|
elif current_price > avg_trough * 1.03:
|
|
stage = 4
|
|
else:
|
|
stage = 3
|
|
else:
|
|
stage = 2
|
|
|
|
if stage >= 3:
|
|
signals[-1] = 100 if stage >= 4 else 50
|
|
analysis['detected'] = True
|
|
|
|
analysis.update({
|
|
'stage': stage,
|
|
'troughs': [{'idx': t, 'price': lows[t], 'date': df.index[t]} for t in last_3_troughs],
|
|
'neckline': neckline_info,
|
|
'avg_trough': avg_trough,
|
|
'description': f'三川型態 ({stage}/5 階段)'
|
|
})
|
|
|
|
return {'signals': signals, 'analysis': analysis}
|
|
|
|
|
|
# ============================================================
|
|
# TA-Lib 型態偵測 (保持原有功能)
|
|
# ============================================================
|
|
|
|
def detect_talib_patterns(df: pd.DataFrame) -> dict:
|
|
"""Detect all TA-Lib candlestick patterns."""
|
|
results = {}
|
|
|
|
open_prices = df['Open'].values
|
|
high_prices = df['High'].values
|
|
low_prices = df['Low'].values
|
|
close_prices = df['Close'].values
|
|
|
|
for pattern_key, pattern_info in TALIB_PATTERNS.items():
|
|
func_name = pattern_key
|
|
|
|
try:
|
|
func = getattr(talib, func_name)
|
|
pattern_result = func(open_prices, high_prices, low_prices, close_prices)
|
|
results[pattern_key] = {
|
|
'values': pattern_result,
|
|
'info': pattern_info
|
|
}
|
|
except Exception as e:
|
|
print(f"Warning: Could not compute {pattern_key}: {e}")
|
|
|
|
return results
|
|
|
|
|
|
# ============================================================
|
|
# 其他自訂型態 (簡化版)
|
|
# ============================================================
|
|
|
|
def detect_sanku_v2(df: pd.DataFrame, trend_ctx: dict) -> dict:
|
|
"""
|
|
改良版三空偵測 (v2.1) - 使用狀態機記錄連續缺口。
|
|
|
|
三空上漲: 連續 3 個向上缺口 -> 空頭信號 (漲勢竭盡)
|
|
三空下跌: 連續 3 個向下缺口 -> 多頭信號 (跌勢竭盡)
|
|
|
|
Returns:
|
|
dict: {
|
|
'up_signals': 三空上漲信號,
|
|
'down_signals': 三空下跌信號,
|
|
'sequences': 偵測到的序列詳情
|
|
}
|
|
"""
|
|
up_signals = np.zeros(len(df))
|
|
down_signals = np.zeros(len(df))
|
|
sequences = []
|
|
|
|
highs = df['High'].values
|
|
lows = df['Low'].values
|
|
|
|
# 狀態機: 追蹤連續缺口
|
|
gap_up_streak = 0
|
|
gap_up_start = -1
|
|
gap_down_streak = 0
|
|
gap_down_start = -1
|
|
|
|
for i in range(1, len(df)):
|
|
# 檢測向上缺口
|
|
is_gap_up = lows[i] > highs[i-1]
|
|
# 檢測向下缺口
|
|
is_gap_down = highs[i] < lows[i-1]
|
|
|
|
# 向上缺口序列追蹤
|
|
if is_gap_up:
|
|
if gap_up_streak == 0:
|
|
gap_up_start = i
|
|
gap_up_streak += 1
|
|
|
|
# 三空上漲確認
|
|
if gap_up_streak >= 3:
|
|
up_signals[i] = -100 # 空頭信號
|
|
sequences.append({
|
|
'type': 'SANKU_UP',
|
|
'start_idx': gap_up_start,
|
|
'end_idx': i,
|
|
'gaps': gap_up_streak,
|
|
'signal': 'bearish',
|
|
'description': f'連續 {gap_up_streak} 個向上缺口 (漲勢竭盡)'
|
|
})
|
|
else:
|
|
gap_up_streak = 0
|
|
gap_up_start = -1
|
|
|
|
# 向下缺口序列追蹤
|
|
if is_gap_down:
|
|
if gap_down_streak == 0:
|
|
gap_down_start = i
|
|
gap_down_streak += 1
|
|
|
|
# 三空下跌確認
|
|
if gap_down_streak >= 3:
|
|
down_signals[i] = 100 # 多頭信號
|
|
sequences.append({
|
|
'type': 'SANKU_DOWN',
|
|
'start_idx': gap_down_start,
|
|
'end_idx': i,
|
|
'gaps': gap_down_streak,
|
|
'signal': 'bullish',
|
|
'description': f'連續 {gap_down_streak} 個向下缺口 (跌勢竭盡)'
|
|
})
|
|
else:
|
|
gap_down_streak = 0
|
|
gap_down_start = -1
|
|
|
|
return {
|
|
'up_signals': up_signals,
|
|
'down_signals': down_signals,
|
|
'sequences': sequences
|
|
}
|
|
|
|
|
|
def detect_tweezers_v2(df: pd.DataFrame, trend_ctx: dict, lookback: int = 20) -> tuple:
|
|
"""
|
|
改良版鑷子偵測 (v2.1)。
|
|
|
|
關鍵修正: 只有在創下 20 日新高/新低時出現的雙針,才算有效鑷子。
|
|
半山腰或盤整區的雙針視為雜訊,直接丟棄。
|
|
|
|
Args:
|
|
df: OHLC DataFrame
|
|
trend_ctx: 趨勢背景
|
|
lookback: 回看天數 (預設 20 日)
|
|
"""
|
|
top_signals = np.zeros(len(df))
|
|
bottom_signals = np.zeros(len(df))
|
|
|
|
highs = df['High'].values
|
|
lows = df['Low'].values
|
|
|
|
tolerance = 0.002 # 0.2% tolerance
|
|
|
|
for i in range(lookback, len(df)):
|
|
# 計算 lookback 期間的最高/最低
|
|
recent_high = max(highs[i-lookback:i])
|
|
recent_low = min(lows[i-lookback:i])
|
|
|
|
# ========================================
|
|
# 鑷子頂: 需同時滿足以下條件
|
|
# 1. 今日高點 ≈ 昨日高點 (容差 0.2%)
|
|
# 2. 今日高點 >= 近 20 日最高
|
|
# 3. 趨勢位置在高檔或中間
|
|
# ========================================
|
|
if trend_ctx['position'] in ['high', 'mid']:
|
|
two_highs_equal = abs(highs[i] - highs[i-1]) / highs[i] < tolerance
|
|
at_20d_high = highs[i] >= recent_high * 0.998 # 允許微小誤差
|
|
|
|
if two_highs_equal and at_20d_high:
|
|
top_signals[i] = -100
|
|
|
|
# ========================================
|
|
# 鑷子底: 需同時滿足以下條件
|
|
# 1. 今日低點 ≈ 昨日低點 (容差 0.2%)
|
|
# 2. 今日低點 <= 近 20 日最低
|
|
# 3. 趨勢位置在低檔或中間
|
|
# ========================================
|
|
if trend_ctx['position'] in ['low', 'mid']:
|
|
two_lows_equal = abs(lows[i] - lows[i-1]) / lows[i] < tolerance
|
|
at_20d_low = lows[i] <= recent_low * 1.002 # 允許微小誤差
|
|
|
|
if two_lows_equal and at_20d_low:
|
|
bottom_signals[i] = 100
|
|
|
|
return top_signals, bottom_signals
|
|
|
|
|
|
def detect_windows(df: pd.DataFrame) -> tuple:
|
|
"""Detect Rising/Falling Window (gap) patterns."""
|
|
up_signals = np.zeros(len(df))
|
|
down_signals = np.zeros(len(df))
|
|
|
|
highs = df['High'].values
|
|
lows = df['Low'].values
|
|
|
|
for i in range(1, len(df)):
|
|
# Rising window (gap up)
|
|
if lows[i] > highs[i-1]:
|
|
up_signals[i] = 100
|
|
|
|
# Falling window (gap down)
|
|
if highs[i] < lows[i-1]:
|
|
down_signals[i] = -100
|
|
|
|
return up_signals, down_signals
|
|
|
|
|
|
# ============================================================
|
|
# 主要入口函數
|
|
# ============================================================
|
|
|
|
def detect_all_patterns(df: pd.DataFrame) -> dict:
|
|
"""
|
|
偵測所有 80 種酒田型態 (改良版)。
|
|
|
|
Returns:
|
|
dict: {
|
|
'patterns': DataFrame (每日每型態的信號),
|
|
'trend_context': 趨勢背景資訊,
|
|
'sanzan_analysis': 三山詳細分析,
|
|
'sansen_analysis': 三川詳細分析
|
|
}
|
|
"""
|
|
# 計算趨勢背景
|
|
trend_ctx = calculate_trend_context(df)
|
|
|
|
# 建立結果 DataFrame
|
|
results = pd.DataFrame(index=df.index)
|
|
|
|
# TA-Lib 型態
|
|
talib_results = detect_talib_patterns(df)
|
|
for pattern_key, pattern_data in talib_results.items():
|
|
results[pattern_key] = pattern_data['values']
|
|
|
|
# 改良版三山/三川
|
|
sanzan_result = detect_sanzan_v2(df, trend_ctx)
|
|
sansen_result = detect_sansen_v2(df, trend_ctx)
|
|
results['SANZAN'] = sanzan_result['signals']
|
|
results['SANSEN'] = sansen_result['signals']
|
|
|
|
# 其他自訂型態 (使用 v2.1 改良版)
|
|
sanku_result = detect_sanku_v2(df, trend_ctx)
|
|
results['SANKU_UP'] = sanku_result['up_signals']
|
|
results['SANKU_DOWN'] = sanku_result['down_signals']
|
|
|
|
tweezers_top, tweezers_bottom = detect_tweezers_v2(df, trend_ctx)
|
|
results['TWEEZERS_TOP'] = tweezers_top
|
|
results['TWEEZERS_BOTTOM'] = tweezers_bottom
|
|
|
|
window_up, window_down = detect_windows(df)
|
|
results['WINDOW_UP'] = window_up
|
|
results['WINDOW_DOWN'] = window_down
|
|
|
|
return {
|
|
'patterns': results,
|
|
'trend_context': trend_ctx,
|
|
'sanzan_analysis': sanzan_result['analysis'],
|
|
'sansen_analysis': sansen_result['analysis']
|
|
}
|
|
|
|
|
|
def get_pattern_info(pattern_key: str) -> dict:
|
|
"""Get pattern information by key."""
|
|
if pattern_key in TALIB_PATTERNS:
|
|
return TALIB_PATTERNS[pattern_key]
|
|
elif pattern_key in CUSTOM_PATTERNS:
|
|
return CUSTOM_PATTERNS[pattern_key]
|
|
return None
|
|
|
|
|
|
def summarize_detected_patterns(df: pd.DataFrame, pattern_results: pd.DataFrame) -> list:
|
|
"""
|
|
Summarize all detected patterns with dates and signals.
|
|
|
|
Returns:
|
|
List of dicts: [{date, pattern, signal, strength, info}, ...]
|
|
"""
|
|
detected = []
|
|
|
|
for col in pattern_results.columns:
|
|
non_zero = pattern_results[col] != 0
|
|
if non_zero.any():
|
|
for idx in pattern_results[non_zero].index:
|
|
signal_value = pattern_results.loc[idx, col]
|
|
info = get_pattern_info(col)
|
|
|
|
if info:
|
|
detected.append({
|
|
'date': idx,
|
|
'pattern': col,
|
|
'name': info['name'],
|
|
'english': info['en'],
|
|
'type': info['type'],
|
|
'direction': 'bullish' if signal_value > 0 else 'bearish',
|
|
'strength': abs(signal_value),
|
|
'info': info
|
|
})
|
|
|
|
# Sort by date
|
|
detected.sort(key=lambda x: x['date'], reverse=True)
|
|
|
|
return detected
|