stock-flow/.agent/skills/sakata/scripts/chart_plotter.py

327 lines
11 KiB
Python
Raw Permalink Normal View History

2026-01-30 09:02:35 +00:00
"""
Sakata Chart Plotter v2.0
K 線圖表繪製 + 型態標註
改良: 使用英文標籤避免中文字體問題
"""
import pandas as pd
import numpy as np
import mplfinance as mpf
import matplotlib.pyplot as plt
from matplotlib.patches import FancyBboxPatch
import matplotlib.font_manager as fm
import os
from datetime import datetime
def calculate_support_resistance(df: pd.DataFrame, window: int = 20) -> dict:
"""Calculate support and resistance levels."""
highs = df['High'].values
lows = df['Low'].values
closes = df['Close'].values
resistance_levels = []
support_levels = []
for i in range(window, len(df) - window):
if highs[i] == max(highs[i-window:i+window+1]):
resistance_levels.append(highs[i])
if lows[i] == min(lows[i-window:i+window+1]):
support_levels.append(lows[i])
def cluster_levels(levels, threshold=0.02):
if not levels:
return []
levels = sorted(levels)
clusters = [[levels[0]]]
for level in levels[1:]:
if (level - clusters[-1][-1]) / clusters[-1][-1] < threshold:
clusters[-1].append(level)
else:
clusters.append([level])
return [np.mean(c) for c in clusters]
resistance = cluster_levels(resistance_levels)[-3:] if resistance_levels else []
support = cluster_levels(support_levels)[:3] if support_levels else []
return {
'resistance': resistance,
'support': support,
'current_price': closes[-1]
}
def calculate_rsi(df: pd.DataFrame, period: int = 14) -> pd.Series:
"""Calculate RSI indicator."""
delta = df['Close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def create_sakata_chart(
df: pd.DataFrame,
detected_patterns: list,
signals: list,
ticker: str,
output_dir: str = './output'
) -> str:
"""
Create a candlestick chart with Sakata pattern annotations.
Uses English labels for better font compatibility.
"""
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
# Calculate technical indicators
df = df.copy()
df['MA20'] = df['Close'].rolling(window=20).mean()
df['MA50'] = df['Close'].rolling(window=50).mean()
df['RSI'] = calculate_rsi(df)
# Calculate support/resistance
sr_levels = calculate_support_resistance(df)
# Prepare annotation markers
buy_signals = []
sell_signals = []
# 去重: 每天每型態只標記一次
seen_dates_buy = set()
seen_dates_sell = set()
for pattern in detected_patterns[-30:]:
date = pattern['date']
if date in df.index:
date_key = str(date)[:10]
if pattern['direction'] == 'bullish':
if date_key not in seen_dates_buy:
buy_signals.append({
'date': date,
'price': df.loc[date, 'Low'] * 0.98,
'name': pattern['english'] # 使用英文名
})
seen_dates_buy.add(date_key)
else:
if date_key not in seen_dates_sell:
sell_signals.append({
'date': date,
'price': df.loc[date, 'High'] * 1.02,
'name': pattern['english'] # 使用英文名
})
seen_dates_sell.add(date_key)
# Create marker arrays
buy_markers = np.nan * np.ones(len(df))
sell_markers = np.nan * np.ones(len(df))
for sig in buy_signals:
if sig['date'] in df.index:
idx = df.index.get_loc(sig['date'])
buy_markers[idx] = df.iloc[idx]['Low'] * 0.97
for sig in sell_signals:
if sig['date'] in df.index:
idx = df.index.get_loc(sig['date'])
sell_markers[idx] = df.iloc[idx]['High'] * 1.03
# Define custom style
mc = mpf.make_marketcolors(
up='#26a69a',
down='#ef5350',
edge='inherit',
wick='inherit',
volume='in',
ohlc='i'
)
s = mpf.make_mpf_style(
marketcolors=mc,
gridstyle='-',
gridcolor='#e0e0e0',
y_on_right=True,
rc={
'font.size': 10,
'axes.labelsize': 12,
'axes.titlesize': 14
}
)
# Create additional plots
apds = []
# Buy/Sell markers
if not np.all(np.isnan(buy_markers)):
apds.append(mpf.make_addplot(
buy_markers, type='scatter', markersize=100,
marker='^', color='#26a69a', panel=0
))
if not np.all(np.isnan(sell_markers)):
apds.append(mpf.make_addplot(
sell_markers, type='scatter', markersize=100,
marker='v', color='#ef5350', panel=0
))
# Moving averages
apds.append(mpf.make_addplot(df['MA20'], color='#2196F3', width=1.5, panel=0))
apds.append(mpf.make_addplot(df['MA50'], color='#FF9800', width=1.5, panel=0))
# RSI in separate panel
apds.append(mpf.make_addplot(df['RSI'], color='#9C27B0', width=1, panel=2, ylabel='RSI'))
# RSI overbought/oversold lines
rsi_70 = np.full(len(df), 70)
rsi_30 = np.full(len(df), 30)
apds.append(mpf.make_addplot(rsi_70, color='#ef5350', width=0.5, linestyle='--', panel=2))
apds.append(mpf.make_addplot(rsi_30, color='#26a69a', width=0.5, linestyle='--', panel=2))
# Generate main chart (使用英文標題避免字體問題)
fig, axes = mpf.plot(
df,
type='candle',
style=s,
title=f'\n{ticker} - Sakata Pattern Analysis',
ylabel='Price',
ylabel_lower='Volume',
volume=True,
addplot=apds if apds else None,
figsize=(18, 12),
returnfig=True,
panel_ratios=(5, 1.5, 1.5)
)
ax = axes[0]
# Draw support lines
for level in sr_levels['support']:
ax.axhline(y=level, color='#26a69a', linestyle='--', linewidth=1.5, alpha=0.7)
ax.text(df.index[2], level, f' Support ${level:.2f}',
fontsize=9, color='#26a69a', va='bottom')
# Draw resistance lines
for level in sr_levels['resistance']:
ax.axhline(y=level, color='#ef5350', linestyle='--', linewidth=1.5, alpha=0.7)
ax.text(df.index[2], level, f' Resistance ${level:.2f}',
fontsize=9, color='#ef5350', va='top')
# Add signal summary box (使用英文)
bullish_count = len(set(p['date'] for p in detected_patterns if p['direction'] == 'bullish'))
bearish_count = len(set(p['date'] for p in detected_patterns if p['direction'] == 'bearish'))
summary_text = (
f"Signal Summary\n"
f"{'='*14}\n"
f"Bullish: {bullish_count}\n"
f"Bearish: {bearish_count}\n"
f"{'='*14}\n"
f"Close: ${sr_levels['current_price']:.2f}"
)
ax.text(
0.02, 0.98, summary_text,
transform=ax.transAxes, fontsize=10,
verticalalignment='top',
bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9, edgecolor='#ccc'),
family='monospace'
)
# Add recent patterns annotation (使用英文)
if detected_patterns:
# 去重
seen = set()
unique_recent = []
for p in detected_patterns[:10]:
key = (str(p['date'])[:10], p['english'])
if key not in seen:
seen.add(key)
unique_recent.append(p)
pattern_text = "Recent Patterns\n" + "="*16 + "\n"
for p in unique_recent[:6]:
date_str = p['date'].strftime('%m/%d') if hasattr(p['date'], 'strftime') else str(p['date'])[:10]
signal_emoji = '+' if p['direction'] == 'bullish' else '-'
# 截斷過長的型態名稱
name = p['english'][:18] if len(p['english']) > 18 else p['english']
pattern_text += f"{date_str}: {name} [{signal_emoji}]\n"
ax.text(
0.98, 0.98, pattern_text,
transform=ax.transAxes, fontsize=9,
verticalalignment='top', horizontalalignment='right',
bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9, edgecolor='#ccc'),
family='monospace'
)
# Add legend
from matplotlib.lines import Line2D
legend_elements = [
Line2D([0], [0], color='#2196F3', linewidth=2, label='MA20'),
Line2D([0], [0], color='#FF9800', linewidth=2, label='MA50'),
Line2D([0], [0], color='#26a69a', linestyle='--', linewidth=1.5, label='Support'),
Line2D([0], [0], color='#ef5350', linestyle='--', linewidth=1.5, label='Resistance'),
Line2D([0], [0], marker='^', color='#26a69a', linestyle='None', markersize=10, label='Bullish Signal'),
Line2D([0], [0], marker='v', color='#ef5350', linestyle='None', markersize=10, label='Bearish Signal'),
]
ax.legend(handles=legend_elements, loc='upper center', ncol=6, fontsize=8,
bbox_to_anchor=(0.5, 1.02), framealpha=0.9)
# Save chart
output_path = os.path.join(output_dir, f'{ticker}_sakata.png')
fig.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
plt.close(fig)
return output_path
def create_pattern_detail_chart(
df: pd.DataFrame,
pattern: dict,
ticker: str,
output_dir: str = './output'
) -> str:
"""
Create a detailed chart for a specific pattern occurrence.
"""
# Focus on 20 days around the pattern
pattern_date = pattern['date']
if pattern_date in df.index:
idx = df.index.get_loc(pattern_date)
start_idx = max(0, idx - 10)
end_idx = min(len(df), idx + 10)
focused_df = df.iloc[start_idx:end_idx].copy()
else:
focused_df = df.tail(20).copy()
# Create marker for the pattern
marker = np.nan * np.ones(len(focused_df))
if pattern_date in focused_df.index:
local_idx = focused_df.index.get_loc(pattern_date)
marker[local_idx] = focused_df.iloc[local_idx]['Low'] * 0.97
mc = mpf.make_marketcolors(up='#26a69a', down='#ef5350', edge='inherit', wick='inherit')
s = mpf.make_mpf_style(marketcolors=mc, gridstyle='-', gridcolor='#e0e0e0')
apds = []
if not np.all(np.isnan(marker)):
apds.append(mpf.make_addplot(marker, type='scatter', markersize=200, marker='*', color='gold'))
fig, axes = mpf.plot(
focused_df,
type='candle',
style=s,
title=f"\n{ticker} - {pattern['english']}",
figsize=(10, 6),
returnfig=True,
addplot=apds if apds else None
)
output_path = os.path.join(output_dir, f"{ticker}_{pattern['pattern']}_detail.png")
fig.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
plt.close(fig)
return output_path