stock-flow/.agent/skills/sakata/scripts/pattern_detector.py

759 lines
30 KiB
Python
Raw Permalink Normal View History

2026-01-30 09:02:35 +00:00
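"""
Sakata 80 Candlestick Pattern Detector v2.0
(Sakata candlestick-pattern detector, improved version)

Improvements in this version:
1. Trend filter: Three Mountains (triple top) is not evaluated when price sits
   at the low end of its recent range, and Three Rivers (triple bottom) is not
   evaluated when price sits at the high end.
2. Time span: the swing points of a Three Mountains / Three Rivers formation
   are searched with a minimum spacing of 10 bars by default.
3. Structure analysis: a neckline is computed, and Three Mountains is also
   checked for price/volume divergence.
"""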
import numpy as np
import pandas as pd
import talib
# Registry of TA-Lib candlestick patterns, grouped by the number of candles.
# Keys are 'CDL_'-prefixed pattern identifiers; the matching TA-Lib function is
# spelled without the underscore (key 'CDL_DOJI' <-> talib.CDLDOJI).
# Each value carries display metadata:
#   'name'   - Chinese display name
#   'en'     - English display name
#   'type'   - 'reversal' | 'continuation' | 'neutral'
#   'signal' - 'bullish' | 'bearish' | 'neutral' | 'trend'
#              NOTE(review): 'trend' presumably means the direction is given by
#              the sign of the detector output - confirm with consumers.
TALIB_PATTERNS = {
    # Single-candle patterns
    'CDL_DOJI': {'name': '十字星', 'en': 'Doji', 'type': 'reversal', 'signal': 'neutral'},
    'CDL_DOJISTAR': {'name': '十字星', 'en': 'Doji Star', 'type': 'reversal', 'signal': 'neutral'},
    'CDL_DRAGONFLYDOJI': {'name': '蜻蜓十字', 'en': 'Dragonfly Doji', 'type': 'reversal', 'signal': 'bullish'},
    'CDL_GRAVESTONEDOJI': {'name': '墓碑十字', 'en': 'Gravestone Doji', 'type': 'reversal', 'signal': 'bearish'},
    'CDL_LONGLEGGEDDOJI': {'name': '長腳十字', 'en': 'Long Legged Doji', 'type': 'reversal', 'signal': 'neutral'},
    'CDL_HAMMER': {'name': '錘子', 'en': 'Hammer', 'type': 'reversal', 'signal': 'bullish'},
    'CDL_HANGINGMAN': {'name': '吊人', 'en': 'Hanging Man', 'type': 'reversal', 'signal': 'bearish'},
    'CDL_SHOOTINGSTAR': {'name': '流星', 'en': 'Shooting Star', 'type': 'reversal', 'signal': 'bearish'},
    'CDL_INVERTEDHAMMER': {'name': '倒錘子', 'en': 'Inverted Hammer', 'type': 'reversal', 'signal': 'bullish'},
    'CDL_MARUBOZU': {'name': '光頭光腳', 'en': 'Marubozu', 'type': 'continuation', 'signal': 'trend'},
    'CDL_SPINNINGTOP': {'name': '陀螺', 'en': 'Spinning Top', 'type': 'reversal', 'signal': 'neutral'},
    'CDL_HIGHWAVE': {'name': '高浪線', 'en': 'High Wave', 'type': 'reversal', 'signal': 'neutral'},
    'CDL_BELTHOLD': {'name': '捉腰帶線', 'en': 'Belt Hold', 'type': 'reversal', 'signal': 'trend'},
    'CDL_TAKURI': {'name': '探底', 'en': 'Takuri', 'type': 'reversal', 'signal': 'bullish'},
    # Two-candle patterns
    'CDL_ENGULFING': {'name': '吞噬', 'en': 'Engulfing', 'type': 'reversal', 'signal': 'trend'},
    'CDL_DARKCLOUDCOVER': {'name': '烏雲蓋頂', 'en': 'Dark Cloud Cover', 'type': 'reversal', 'signal': 'bearish'},
    'CDL_PIERCING': {'name': '刺透', 'en': 'Piercing', 'type': 'reversal', 'signal': 'bullish'},
    'CDL_HARAMI': {'name': '孕線', 'en': 'Harami', 'type': 'reversal', 'signal': 'trend'},
    'CDL_HARAMICROSS': {'name': '十字孕線', 'en': 'Harami Cross', 'type': 'reversal', 'signal': 'trend'},
    'CDL_MATCHINGLOW': {'name': '相同低價', 'en': 'Matching Low', 'type': 'reversal', 'signal': 'bullish'},
    'CDL_KICKING': {'name': '反沖', 'en': 'Kicking', 'type': 'reversal', 'signal': 'trend'},
    'CDL_KICKINGBYLENGTH': {'name': '反沖(長度)', 'en': 'Kicking by Length', 'type': 'reversal', 'signal': 'trend'},
    'CDL_COUNTERATTACK': {'name': '反擊線', 'en': 'Counterattack', 'type': 'reversal', 'signal': 'trend'},
    'CDL_CLOSINGMARUBOZU': {'name': '收盤光頭', 'en': 'Closing Marubozu', 'type': 'continuation', 'signal': 'trend'},
    'CDL_HIKKAKE': {'name': '陷阱', 'en': 'Hikkake', 'type': 'reversal', 'signal': 'trend'},
    'CDL_HIKKAKEMOD': {'name': '改良陷阱', 'en': 'Modified Hikkake', 'type': 'reversal', 'signal': 'trend'},
    'CDL_HOMINGPIGEON': {'name': '家鴿', 'en': 'Homing Pigeon', 'type': 'reversal', 'signal': 'bullish'},
    'CDL_INNECK': {'name': '頸內線', 'en': 'In Neck', 'type': 'continuation', 'signal': 'bearish'},
    'CDL_ONNECK': {'name': '頸上線', 'en': 'On Neck', 'type': 'continuation', 'signal': 'bearish'},
    'CDL_THRUSTING': {'name': '切入線', 'en': 'Thrusting', 'type': 'continuation', 'signal': 'bearish'},
    'CDL_SEPARATINGLINES': {'name': '分離線', 'en': 'Separating Lines', 'type': 'continuation', 'signal': 'trend'},
    'CDL_TWOCROWS': {'name': '二隻烏鴉', 'en': 'Two Crows', 'type': 'reversal', 'signal': 'bearish'},
    # Three-candle patterns
    'CDL_MORNINGSTAR': {'name': '晨星', 'en': 'Morning Star', 'type': 'reversal', 'signal': 'bullish'},
    'CDL_MORNINGDOJISTAR': {'name': '十字晨星', 'en': 'Morning Doji Star', 'type': 'reversal', 'signal': 'bullish'},
    'CDL_EVENINGSTAR': {'name': '夜星', 'en': 'Evening Star', 'type': 'reversal', 'signal': 'bearish'},
    'CDL_EVENINGDOJISTAR': {'name': '十字夜星', 'en': 'Evening Doji Star', 'type': 'reversal', 'signal': 'bearish'},
    'CDL_ABANDONEDBABY': {'name': '棄嬰', 'en': 'Abandoned Baby', 'type': 'reversal', 'signal': 'trend'},
    'CDL_3WHITESOLDIERS': {'name': '三白兵', 'en': 'Three White Soldiers', 'type': 'reversal', 'signal': 'bullish'},
    'CDL_3BLACKCROWS': {'name': '三黑鴉', 'en': 'Three Black Crows', 'type': 'reversal', 'signal': 'bearish'},
    'CDL_3INSIDE': {'name': '三內部', 'en': 'Three Inside', 'type': 'reversal', 'signal': 'trend'},
    'CDL_3OUTSIDE': {'name': '三外部', 'en': 'Three Outside', 'type': 'reversal', 'signal': 'trend'},
    'CDL_3STARSINSOUTH': {'name': '南方三星', 'en': 'Three Stars In South', 'type': 'reversal', 'signal': 'bullish'},
    'CDL_TRISTAR': {'name': '三星', 'en': 'Tristar', 'type': 'reversal', 'signal': 'trend'},
    'CDL_UPSIDEGAP2CROWS': {'name': '向上跳空二鴉', 'en': 'Upside Gap Two Crows', 'type': 'reversal', 'signal': 'bearish'},
    'CDL_UNIQUE3RIVER': {'name': '奇特三川底', 'en': 'Unique 3 River', 'type': 'reversal', 'signal': 'bullish'},
    'CDL_XSIDEGAP3METHODS': {'name': '跳空並列三法', 'en': 'Side Gap Three Methods', 'type': 'continuation', 'signal': 'trend'},
    'CDL_TASUKIGAP': {'name': '跳空缺口', 'en': 'Tasuki Gap', 'type': 'continuation', 'signal': 'trend'},
    'CDL_GAPSIDESIDEWHITE': {'name': '向上跳空並列陽線', 'en': 'Gap Side Side White', 'type': 'continuation', 'signal': 'bullish'},
    # Complex (multi-candle) patterns
    'CDL_ADVANCEBLOCK': {'name': '前進受阻', 'en': 'Advance Block', 'type': 'reversal', 'signal': 'bearish'},
    'CDL_STALLEDPATTERN': {'name': '停滯型態', 'en': 'Stalled Pattern', 'type': 'reversal', 'signal': 'bearish'},
    'CDL_CONCEALBABYSWALL': {'name': '藏嬰吞噬', 'en': 'Concealing Baby Swallow', 'type': 'reversal', 'signal': 'bullish'},
    'CDL_LADDERBOTTOM': {'name': '梯底', 'en': 'Ladder Bottom', 'type': 'reversal', 'signal': 'bullish'},
    'CDL_RICKSHAWMAN': {'name': '黃包車夫', 'en': 'Rickshaw Man', 'type': 'reversal', 'signal': 'neutral'},
    'CDL_RISEFALL3METHODS': {'name': '上升/下降三法', 'en': 'Rise/Fall Three Methods', 'type': 'continuation', 'signal': 'trend'},
    'CDL_STICKSANDWICH': {'name': '棍子三明治', 'en': 'Stick Sandwich', 'type': 'reversal', 'signal': 'bullish'},
    'CDL_BREAKAWAY': {'name': '脫離', 'en': 'Breakaway', 'type': 'reversal', 'signal': 'trend'},
    'CDL_MATHOLD': {'name': '鋪墊', 'en': 'Mat Hold', 'type': 'continuation', 'signal': 'bullish'},
    'CDL_IDENTICAL3CROWS': {'name': '相同三鴉', 'en': 'Identical Three Crows', 'type': 'reversal', 'signal': 'bearish'},
    'CDL_SHORTLINE': {'name': '短線', 'en': 'Short Line', 'type': 'neutral', 'signal': 'neutral'},
    'CDL_LONGLINE': {'name': '長線', 'en': 'Long Line', 'type': 'continuation', 'signal': 'trend'},
}
# Sakata-specific patterns that TA-Lib does not provide.
# Values use the same metadata fields as TALIB_PATTERNS
# ('name', 'en', 'type', 'signal').
# NOTE(review): in the visible part of this file only SANZAN, SANSEN,
# SANKU_UP/DOWN, TWEEZERS_TOP/BOTTOM and WINDOW_UP/DOWN are produced by
# detect_all_patterns; the remaining keys are metadata only - confirm whether
# detectors for them exist elsewhere.
CUSTOM_PATTERNS = {
    'SANZAN': {'name': '三山', 'en': 'Three Mountains (Triple Top)', 'type': 'reversal', 'signal': 'bearish'},
    'SANSEN': {'name': '三川', 'en': 'Three Rivers (Triple Bottom)', 'type': 'reversal', 'signal': 'bullish'},
    'SANKU_UP': {'name': '三空(上漲)', 'en': 'Three Gaps Up', 'type': 'reversal', 'signal': 'bearish'},
    'SANKU_DOWN': {'name': '三空(下跌)', 'en': 'Three Gaps Down', 'type': 'reversal', 'signal': 'bullish'},
    'SANPOH_UP': {'name': '上升三法', 'en': 'Rising Three Methods', 'type': 'continuation', 'signal': 'bullish'},
    'SANPOH_DOWN': {'name': '下降三法', 'en': 'Falling Three Methods', 'type': 'continuation', 'signal': 'bearish'},
    'ISLAND_TOP': {'name': '島型頂', 'en': 'Island Top', 'type': 'reversal', 'signal': 'bearish'},
    'ISLAND_BOTTOM': {'name': '島型底', 'en': 'Island Bottom', 'type': 'reversal', 'signal': 'bullish'},
    'ROUND_TOP': {'name': '圓頂', 'en': 'Rounding Top', 'type': 'reversal', 'signal': 'bearish'},
    'ROUND_BOTTOM': {'name': '圓底', 'en': 'Rounding Bottom', 'type': 'reversal', 'signal': 'bullish'},
    'TWEEZERS_TOP': {'name': '鑷子頂', 'en': 'Tweezers Top', 'type': 'reversal', 'signal': 'bearish'},
    'TWEEZERS_BOTTOM': {'name': '鑷子底', 'en': 'Tweezers Bottom', 'type': 'reversal', 'signal': 'bullish'},
    'WINDOW_UP': {'name': '向上窗口', 'en': 'Rising Window', 'type': 'continuation', 'signal': 'bullish'},
    'WINDOW_DOWN': {'name': '向下窗口', 'en': 'Falling Window', 'type': 'continuation', 'signal': 'bearish'},
    'EXHAUSTION_UP': {'name': '竭盡缺口(上)', 'en': 'Exhaustion Gap Up', 'type': 'reversal', 'signal': 'bearish'},
    'EXHAUSTION_DOWN': {'name': '竭盡缺口(下)', 'en': 'Exhaustion Gap Down', 'type': 'reversal', 'signal': 'bullish'},
    'STRONG_BULL': {'name': '強勢陽線', 'en': 'Strong Bullish Candle', 'type': 'continuation', 'signal': 'bullish'},
    'STRONG_BEAR': {'name': '強勢陰線', 'en': 'Strong Bearish Candle', 'type': 'continuation', 'signal': 'bearish'},
}
# ============================================================
# 趨勢濾網 (Trend Filter)
# ============================================================
def calculate_trend_context(df: pd.DataFrame) -> dict:
    """
    Summarise the trend backdrop used to filter out implausible signals.

    Args:
        df: DataFrame with 'High', 'Low' and 'Close' columns.

    Returns:
        dict with keys:
            'ma20', 'ma60': 20- and 60-period SMA arrays of Close,
            'atr': 14-period ATR array,
            'position': 'high' | 'mid' | 'low' - where the last close sits
                inside the high/low range of the last (up to) 60 bars,
            'position_pct': the same position as a 0..1 fraction
                (0.5 when the range is zero),
            'trend': 'uptrend' | 'downtrend' | 'sideways',
            'recent_high', 'recent_low': bounds of that range.
    """
    closes = df['Close'].values

    # One value per bar; leading entries are NaN until the window is filled.
    sma_20 = talib.SMA(closes, timeperiod=20)
    sma_60 = talib.SMA(closes, timeperiod=60)
    avg_true_range = talib.ATR(df['High'].values, df['Low'].values, closes, timeperiod=14)

    # High/low range over the most recent (at most) 60 bars.
    window = min(60, len(df))
    range_top = df['High'].iloc[-window:].max()
    range_bottom = df['Low'].iloc[-window:].min()
    range_width = range_top - range_bottom

    # Where the latest close sits inside that range.
    last_close = closes[-1]
    if range_width > 0:
        position_pct = (last_close - range_bottom) / range_width
    else:
        position_pct = 0.5

    position = 'mid'
    if position_pct > 0.7:
        position = 'high'
    elif position_pct < 0.3:
        position = 'low'

    # Trend from the relative change of MA20 between 10 bars back and now;
    # more than +/-2% counts as a trend, anything else is sideways.
    trend = 'sideways'
    if len(sma_20) >= 10:
        anchor = sma_20[-10]
        slope = (sma_20[-1] - anchor) / anchor if anchor > 0 else 0
        if slope > 0.02:
            trend = 'uptrend'
        elif slope < -0.02:
            trend = 'downtrend'

    return {
        'ma20': sma_20,
        'ma60': sma_60,
        'atr': avg_true_range,
        'position': position,
        'position_pct': position_pct,
        'trend': trend,
        'recent_high': range_top,
        'recent_low': range_bottom
    }
# ============================================================
# 結構分析器 (Structure Analyzer)
# ============================================================
def find_swing_points(prices: np.ndarray, min_distance: int = 5) -> tuple:
    """
    Locate swing highs and swing lows in a price series.

    A bar is a swing high when no bar within ``min_distance`` bars on either
    side is greater than or equal to it; a swing low is the mirror image.
    A new point is only recorded when it is at least ``min_distance`` bars
    after the previously recorded point of the same kind.

    Args:
        prices: price array.
        min_distance: half-width of the comparison window, also the minimum
            spacing between recorded points.

    Returns:
        (peaks, troughs): lists of indices of swing highs and swing lows.
    """
    swing_highs = []
    swing_lows = []
    offsets = range(1, min_distance + 1)

    for idx in range(min_distance, len(prices) - min_distance):
        # Swing high: no neighbour in the window matches or exceeds this bar.
        topped = any(
            prices[idx] <= prices[idx - k] or prices[idx] <= prices[idx + k]
            for k in offsets
        )
        if not topped:
            if not swing_highs or (idx - swing_highs[-1]) >= min_distance:
                swing_highs.append(idx)

        # Swing low: no neighbour in the window matches or undercuts this bar.
        undercut = any(
            prices[idx] >= prices[idx - k] or prices[idx] >= prices[idx + k]
            for k in offsets
        )
        if not undercut:
            if not swing_lows or (idx - swing_lows[-1]) >= min_distance:
                swing_lows.append(idx)

    return swing_highs, swing_lows
def calculate_neckline(df: pd.DataFrame, peaks: list, troughs: list, pattern_type: str) -> dict:
    """
    Derive the neckline level of a triple top / triple bottom.

    Args:
        df: DataFrame with 'High', 'Low' and 'Close' columns.
        peaks: positional indices of swing highs.
        troughs: positional indices of swing lows.
        pattern_type: 'sanzan' (Three Mountains) or 'sansen' (Three Rivers).

    Returns:
        dict with keys:
            'neckline': neckline price, or None when it cannot be computed,
            'confirmed': whether the last close has broken the neckline
                (below it for 'sanzan', above it for 'sansen'),
            'distance_pct': last close relative to the neckline, in percent.
    """
    is_top = pattern_type == 'sanzan' and len(troughs) >= 2
    is_bottom = (not is_top) and pattern_type == 'sansen' and len(peaks) >= 2

    if not (is_top or is_bottom):
        return {'neckline': None, 'confirmed': False, 'distance_pct': 0}

    if is_top:
        # Triple top: the higher Low of the last two troughs.
        neckline = max(df['Low'].iloc[k] for k in troughs[-2:])
    else:
        # Triple bottom: the lower High of the last two peaks.
        neckline = min(df['High'].iloc[k] for k in peaks[-2:])

    last_close = df['Close'].iloc[-1]
    broken = last_close < neckline if is_top else last_close > neckline

    return {
        'neckline': neckline,
        'confirmed': broken,
        'distance_pct': (last_close - neckline) / neckline * 100
    }
def check_volume_divergence(df: pd.DataFrame, peaks: list) -> dict:
    """
    Check for shrinking volume across the last three pivots.

    Divergence is reported when the volume on the bars of the last three
    entries of ``peaks`` is strictly decreasing (first > second > third).
    Only volume is inspected here; price alignment of the pivots is the
    caller's responsibility.

    Args:
        df: DataFrame; must contain a 'Volume' column for a result.
        peaks: positional indices of the pivots to compare.

    Returns:
        dict with keys:
            'divergence': bool,
            'type': 'bearish' when divergence is found, otherwise None,
            'description': human-readable explanation.
    """
    if 'Volume' not in df.columns:
        return {'divergence': False, 'type': None, 'description': '無成交量資料'}
    if len(peaks) < 3:
        # Report the real reason instead of claiming volume data is missing.
        return {
            'divergence': False,
            'type': None,
            'description': f'高點數量不足 ({len(peaks)}/3)'
        }

    volumes = df['Volume'].values
    first_vol, second_vol, third_vol = (volumes[p] for p in peaks[-3:])

    if third_vol < second_vol < first_vol:
        return {
            'divergence': True,
            'type': 'bearish',
            # Separate the three figures so the message is readable.
            'description': (
                f'量價背離: 成交量由 {first_vol:,.0f} → {second_vol:,.0f}'
                f' → {third_vol:,.0f} 遞減'
            )
        }
    return {'divergence': False, 'type': None, 'description': '成交量正常'}
# ============================================================
# 改良版三山/三川偵測
# ============================================================
def detect_sanzan_v2(df: pd.DataFrame, trend_ctx: dict,
                     lookback: int = 60, min_peak_distance: int = 10) -> dict:
    """
    Detect a Three Mountains (triple top) formation as of the latest bar.

    Checks applied in order; the first failing one ends the analysis with an
    explanatory description and an all-zero signal array:
    1. Position filter: skipped when ``trend_ctx['position']`` is 'low'
       (both 'high' and 'mid' are accepted).
    2. At least three swing highs of 'High' exist, searched with
       ``min_distance=min_peak_distance``.
    3. Each of the last three highs lies within 5% of their mean.
    4. The first and last of those highs are at least 20 bars apart.

    Stages: 2 = no neckline available, 3 = neckline exists, 4 = last close is
    below 97% of the average peak, 5 = last close is below the neckline.
    A signal is written to the last bar only: -100 for stage >= 4, -50 for
    stage 3.

    Args:
        df: OHLC(V) DataFrame.
        trend_ctx: dict providing at least 'position'.
        lookback: accepted but not used by the current implementation.
        min_peak_distance: minimum spacing used when searching swing highs.

    Returns:
        dict: {'signals': ndarray, 'analysis': dict with the details}.
    """
    signals = np.zeros(len(df))
    analysis = {
        'detected': False,
        'stage': 0,  # stage 1-5 once a formation is evaluated
        'peaks': [],
        'neckline': None,
        'volume_divergence': None,
        'description': ''
    }
    outcome = {'signals': signals, 'analysis': analysis}

    # Filter 1: a triple top makes no sense at the low end of the range.
    if trend_ctx['position'] not in ['high', 'mid']:
        analysis['description'] = '股價位於低檔,不適合偵測三山'
        return outcome

    high_prices = df['High'].values

    # Swing points of the High series, at least min_peak_distance apart.
    swing_highs, swing_lows = find_swing_points(high_prices, min_distance=min_peak_distance)
    if len(swing_highs) < 3:
        analysis['description'] = f'高點數量不足 ({len(swing_highs)}/3)'
        return outcome

    # The three most recent swing highs form the candidate mountains.
    candidate_idx = swing_highs[-3:]
    candidate_prices = [high_prices[k] for k in candidate_idx]
    mean_peak = np.mean(candidate_prices)

    # All three tops must sit at a similar level (5% tolerance).
    max_deviation = 0.05
    if not all(abs(v - mean_peak) / mean_peak < max_deviation for v in candidate_prices):
        analysis['description'] = '三個高點水平差異過大'
        return outcome

    # The formation has to span at least 20 bars.
    bars_spanned = candidate_idx[-1] - candidate_idx[0]
    if bars_spanned < 20:
        analysis['description'] = f'時間跨度不足 ({bars_spanned}/20 根K線)'
        return outcome

    neckline_info = calculate_neckline(df, candidate_idx, swing_lows, 'sanzan')
    volume_info = check_volume_divergence(df, candidate_idx)

    # Grade how far the formation has progressed.
    last_close = df['Close'].iloc[-1]
    if not neckline_info['neckline']:
        stage = 2  # no neckline could be derived
    elif neckline_info['confirmed']:
        stage = 5  # neckline broken, pattern confirmed
    elif last_close < mean_peak * 0.97:
        stage = 4  # price already rolling over from the third top
    else:
        stage = 3  # third top still forming

    # Only reasonably mature formations emit a signal.
    if stage >= 3:
        signals[-1] = -100 if stage >= 4 else -50
        analysis['detected'] = True

    divergence_tag = ' ⚠️ 量價背離' if volume_info['divergence'] else ''
    analysis.update({
        'stage': stage,
        'peaks': [
            {'idx': k, 'price': high_prices[k], 'date': df.index[k]}
            for k in candidate_idx
        ],
        'neckline': neckline_info,
        'volume_divergence': volume_info,
        'avg_peak': mean_peak,
        'description': f'三山型態 ({stage}/5 階段)' + divergence_tag
    })
    return outcome
def detect_sansen_v2(df: pd.DataFrame, trend_ctx: dict,
                     lookback: int = 60, min_trough_distance: int = 10) -> dict:
    """
    Detect a Three Rivers (triple bottom) formation as of the latest bar.

    Checks applied in order; the first failing one ends the analysis with an
    explanatory description and an all-zero signal array:
    1. Position filter: skipped when ``trend_ctx['position']`` is 'high'
       (both 'low' and 'mid' are accepted).
    2. At least three swing lows of 'Low' exist, searched with
       ``min_distance=min_trough_distance``.
    3. Each of the last three lows lies within 5% of their mean.
    4. The first and last of those lows are at least 20 bars apart.

    Stages: 2 = no neckline available, 3 = neckline exists, 4 = last close is
    above 103% of the average trough, 5 = last close is above the neckline.
    A signal is written to the last bar only: 100 for stage >= 4, 50 for
    stage 3. No volume check is performed here; 'volume_divergence' in the
    analysis stays None.

    Args:
        df: OHLC DataFrame.
        trend_ctx: dict providing at least 'position'.
        lookback: accepted but not used by the current implementation.
        min_trough_distance: minimum spacing used when searching swing lows.

    Returns:
        dict: {'signals': ndarray, 'analysis': dict with the details}.
    """
    signals = np.zeros(len(df))
    analysis = {
        'detected': False,
        'stage': 0,
        'troughs': [],
        'neckline': None,
        'volume_divergence': None,
        'description': ''
    }
    outcome = {'signals': signals, 'analysis': analysis}

    # Filter 1: a triple bottom makes no sense at the high end of the range.
    if trend_ctx['position'] not in ['low', 'mid']:
        analysis['description'] = '股價位於高檔,不適合偵測三川'
        return outcome

    low_prices = df['Low'].values

    # Swing points of the Low series.
    swing_highs, swing_lows = find_swing_points(low_prices, min_distance=min_trough_distance)
    if len(swing_lows) < 3:
        analysis['description'] = f'低點數量不足 ({len(swing_lows)}/3)'
        return outcome

    # The three most recent swing lows form the candidate rivers.
    candidate_idx = swing_lows[-3:]
    candidate_prices = [low_prices[k] for k in candidate_idx]
    mean_trough = np.mean(candidate_prices)

    # All three bottoms must sit at a similar level (5% tolerance).
    max_deviation = 0.05
    if not all(abs(v - mean_trough) / mean_trough < max_deviation for v in candidate_prices):
        analysis['description'] = '三個低點水平差異過大'
        return outcome

    # The formation has to span at least 20 bars.
    bars_spanned = candidate_idx[-1] - candidate_idx[0]
    if bars_spanned < 20:
        analysis['description'] = f'時間跨度不足 ({bars_spanned}/20 根K線)'
        return outcome

    neckline_info = calculate_neckline(df, swing_highs, candidate_idx, 'sansen')

    # Grade how far the formation has progressed.
    last_close = df['Close'].iloc[-1]
    if not neckline_info['neckline']:
        stage = 2
    elif neckline_info['confirmed']:
        stage = 5
    elif last_close > mean_trough * 1.03:
        stage = 4
    else:
        stage = 3

    if stage >= 3:
        signals[-1] = 100 if stage >= 4 else 50
        analysis['detected'] = True

    analysis.update({
        'stage': stage,
        'troughs': [
            {'idx': k, 'price': low_prices[k], 'date': df.index[k]}
            for k in candidate_idx
        ],
        'neckline': neckline_info,
        'avg_trough': mean_trough,
        'description': f'三川型態 ({stage}/5 階段)'
    })
    return outcome
# ============================================================
# TA-Lib 型態偵測 (保持原有功能)
# ============================================================
def detect_talib_patterns(df: pd.DataFrame) -> dict:
    """
    Run every TA-Lib candlestick detector listed in TALIB_PATTERNS.

    Registry keys are written as 'CDL_<NAME>' while the TA-Lib functions are
    named 'CDL<NAME>' (e.g. key 'CDL_DOJI' -> talib.CDLDOJI), so the
    underscore is removed before the function is looked up. Looking up the
    key verbatim never resolves and leaves the result empty.

    Args:
        df: DataFrame with 'Open', 'High', 'Low' and 'Close' columns.

    Returns:
        dict keyed by the registry key (underscore form); each value is
        {'values': per-bar output array of the TA-Lib function,
         'info': the registry metadata for that pattern}.
        Patterns that cannot be computed are reported on stdout and omitted.
    """
    results = {}
    open_prices = df['Open'].values
    high_prices = df['High'].values
    low_prices = df['Low'].values
    close_prices = df['Close'].values

    for pattern_key, pattern_info in TALIB_PATTERNS.items():
        # 'CDL_3WHITESOLDIERS' -> 'CDL3WHITESOLDIERS'
        func_name = pattern_key.replace('_', '')
        # Deliberately best-effort: one failing pattern must not abort the rest.
        try:
            func = getattr(talib, func_name)
            pattern_result = func(open_prices, high_prices, low_prices, close_prices)
        except Exception as e:
            print(f"Warning: Could not compute {pattern_key}: {e}")
            continue
        results[pattern_key] = {
            'values': pattern_result,
            'info': pattern_info
        }
    return results
# ============================================================
# 其他自訂型態 (簡化版)
# ============================================================
def detect_sanku_v2(df: pd.DataFrame, trend_ctx: dict) -> dict:
    """
    Detect Three Gaps (sanku) runs with a per-direction streak counter.

    A bar gaps up when its Low is above the previous bar's High, and gaps down
    when its High is below the previous bar's Low. A streak grows while
    successive bars keep gapping in the same direction and resets on the first
    bar that does not. From the third gap of a streak onwards, every gapping
    bar is flagged and a sequence record is appended:
    - gaps up   -> -100 in 'up_signals'  (bearish, advance exhausted)
    - gaps down -> +100 in 'down_signals' (bullish, decline exhausted)

    Args:
        df: DataFrame with 'High' and 'Low' columns.
        trend_ctx: accepted for signature parity with the other detectors;
            not used here.

    Returns:
        dict: {
            'up_signals': ndarray,
            'down_signals': ndarray,
            'sequences': list of dicts describing each flagged bar
        }
    """
    up_signals = np.zeros(len(df))
    down_signals = np.zeros(len(df))
    sequences = []
    bar_highs = df['High'].values
    bar_lows = df['Low'].values

    # One tracker per direction; 'run' is the current streak length and
    # 'first' the bar index where the streak began (-1 when idle).
    rising = {
        'type': 'SANKU_UP', 'out': up_signals, 'value': -100,
        'signal': 'bearish', 'text': '連續 {} 個向上缺口 (漲勢竭盡)',
        'run': 0, 'first': -1,
    }
    falling = {
        'type': 'SANKU_DOWN', 'out': down_signals, 'value': 100,
        'signal': 'bullish', 'text': '連續 {} 個向下缺口 (跌勢竭盡)',
        'run': 0, 'first': -1,
    }

    for bar in range(1, len(df)):
        checks = (
            (rising, bar_lows[bar] > bar_highs[bar - 1]),
            (falling, bar_highs[bar] < bar_lows[bar - 1]),
        )
        # Up is handled before down so records stay in the same order.
        for tracker, has_gap in checks:
            if not has_gap:
                tracker['run'] = 0
                tracker['first'] = -1
                continue
            if tracker['run'] == 0:
                tracker['first'] = bar
            tracker['run'] += 1
            if tracker['run'] >= 3:
                tracker['out'][bar] = tracker['value']
                sequences.append({
                    'type': tracker['type'],
                    'start_idx': tracker['first'],
                    'end_idx': bar,
                    'gaps': tracker['run'],
                    'signal': tracker['signal'],
                    'description': tracker['text'].format(tracker['run'])
                })

    return {
        'up_signals': up_signals,
        'down_signals': down_signals,
        'sequences': sequences
    }
def detect_tweezers_v2(df: pd.DataFrame, trend_ctx: dict, lookback: int = 20) -> tuple:
    """
    Detect Tweezers Top / Tweezers Bottom at range extremes.

    A pair of matching highs (lows) only counts when it occurs at the highest
    (lowest) level of the preceding ``lookback`` bars; matching pairs in the
    middle of a range are ignored.

    Tweezers top on bar i (written as -100) requires:
    1. ``trend_ctx['position']`` is 'high' or 'mid',
    2. High[i] within 0.2% of High[i-1],
    3. High[i] >= 99.8% of the highest High of the previous ``lookback`` bars.

    Tweezers bottom on bar i (written as +100) requires:
    1. ``trend_ctx['position']`` is 'low' or 'mid',
    2. Low[i] within 0.2% of Low[i-1],
    3. Low[i] <= 100.2% of the lowest Low of the previous ``lookback`` bars.

    Args:
        df: OHLC DataFrame.
        trend_ctx: dict providing 'position'; the same value is applied to
            every bar.
        lookback: number of preceding bars that define the extreme
            (default 20).

    Returns:
        (top_signals, bottom_signals): two ndarrays, one value per bar.
    """
    top_signals = np.zeros(len(df))
    bottom_signals = np.zeros(len(df))
    bar_highs = df['High'].values
    bar_lows = df['Low'].values
    match_tol = 0.002  # 0.2% tolerance for "equal" highs / lows

    for bar in range(lookback, len(df)):
        # Extremes of the preceding window (current bar excluded).
        window_top = max(bar_highs[bar - lookback:bar])
        window_bottom = min(bar_lows[bar - lookback:bar])
        zone = trend_ctx['position']

        if zone in ['high', 'mid']:
            highs_match = abs(bar_highs[bar] - bar_highs[bar - 1]) / bar_highs[bar] < match_tol
            at_window_top = bar_highs[bar] >= window_top * 0.998  # small slack
            if highs_match and at_window_top:
                top_signals[bar] = -100

        if zone in ['low', 'mid']:
            lows_match = abs(bar_lows[bar] - bar_lows[bar - 1]) / bar_lows[bar] < match_tol
            at_window_bottom = bar_lows[bar] <= window_bottom * 1.002  # small slack
            if lows_match and at_window_bottom:
                bottom_signals[bar] = 100

    return top_signals, bottom_signals
def detect_windows(df: pd.DataFrame) -> tuple:
    """
    Detect Rising / Falling Window (price gap) patterns.

    A rising window is a bar whose Low is above the previous bar's High
    (marked 100); a falling window is a bar whose High is below the previous
    bar's Low (marked -100). The first bar has no predecessor and is never
    marked.

    Returns:
        (up_signals, down_signals): two ndarrays, one value per bar.
    """
    up_signals = np.zeros(len(df))
    down_signals = np.zeros(len(df))
    bar_highs = df['High'].values
    bar_lows = df['Low'].values

    # Compare every bar (from the second on) with its predecessor in one go.
    gapped_up = bar_lows[1:] > bar_highs[:-1]
    gapped_down = bar_highs[1:] < bar_lows[:-1]
    up_signals[1:] = np.where(gapped_up, 100, 0)
    down_signals[1:] = np.where(gapped_down, -100, 0)

    return up_signals, down_signals
# ============================================================
# 主要入口函數
# ============================================================
def detect_all_patterns(df: pd.DataFrame) -> dict:
    """
    Run every detector in this module and collect the per-bar signals.

    The result table has one column per TA-Lib pattern that could be computed,
    followed by SANZAN, SANSEN, SANKU_UP, SANKU_DOWN, TWEEZERS_TOP,
    TWEEZERS_BOTTOM, WINDOW_UP and WINDOW_DOWN.

    Returns:
        dict: {
            'patterns': DataFrame indexed like ``df`` (signal per bar/pattern),
            'trend_context': output of calculate_trend_context,
            'sanzan_analysis': detailed Three Mountains analysis,
            'sansen_analysis': detailed Three Rivers analysis
        }
    """
    # The trend backdrop is shared by all trend-filtered detectors.
    trend_ctx = calculate_trend_context(df)

    results = pd.DataFrame(index=df.index)

    # TA-Lib patterns.
    for key, payload in detect_talib_patterns(df).items():
        results[key] = payload['values']

    # Custom detectors.
    sanzan_result = detect_sanzan_v2(df, trend_ctx)
    sansen_result = detect_sansen_v2(df, trend_ctx)
    sanku_result = detect_sanku_v2(df, trend_ctx)
    tweezers_top, tweezers_bottom = detect_tweezers_v2(df, trend_ctx)
    window_up, window_down = detect_windows(df)

    custom_columns = {
        'SANZAN': sanzan_result['signals'],
        'SANSEN': sansen_result['signals'],
        'SANKU_UP': sanku_result['up_signals'],
        'SANKU_DOWN': sanku_result['down_signals'],
        'TWEEZERS_TOP': tweezers_top,
        'TWEEZERS_BOTTOM': tweezers_bottom,
        'WINDOW_UP': window_up,
        'WINDOW_DOWN': window_down,
    }
    for column_name, column_values in custom_columns.items():
        results[column_name] = column_values

    return {
        'patterns': results,
        'trend_context': trend_ctx,
        'sanzan_analysis': sanzan_result['analysis'],
        'sansen_analysis': sansen_result['analysis']
    }
def get_pattern_info(pattern_key: str) -> dict:
    """
    Look up the metadata of a pattern by its key.

    TALIB_PATTERNS is consulted first, then CUSTOM_PATTERNS.
    Returns None when the key is in neither registry.
    """
    for registry in (TALIB_PATTERNS, CUSTOM_PATTERNS):
        if pattern_key in registry:
            return registry[pattern_key]
    return None
def summarize_detected_patterns(df: pd.DataFrame, pattern_results: pd.DataFrame) -> list:
    """
    Flatten the signal table into a list of detected patterns, newest first.

    Every non-zero cell of ``pattern_results`` whose column is a known pattern
    key becomes one entry. Columns without registry metadata are skipped.

    Args:
        df: source price DataFrame; accepted for interface compatibility and
            not used here.
        pattern_results: per-bar signals, one column per pattern key.

    Returns:
        List of dicts sorted by date descending, each with keys:
        'date', 'pattern', 'name', 'english', 'type',
        'direction' ('bullish' for a positive signal, otherwise 'bearish'),
        'strength' (absolute signal value) and 'info' (registry metadata).
    """
    detected = []
    for col in pattern_results.columns:
        # Metadata depends on the column only, so resolve it once per column.
        info = get_pattern_info(col)
        if not info:
            continue

        column = pattern_results[col]
        # Iterate (label, value) pairs directly: a label-based .loc lookup
        # returns a Series when index labels repeat and breaks the comparison.
        for date, signal_value in column[column != 0].items():
            detected.append({
                'date': date,
                'pattern': col,
                'name': info['name'],
                'english': info['en'],
                'type': info['type'],
                'direction': 'bullish' if signal_value > 0 else 'bearish',
                'strength': abs(signal_value),
                'info': info
            })

    # Newest first; the sort is stable, so same-date entries keep column order.
    detected.sort(key=lambda entry: entry['date'], reverse=True)
    return detected