""" Sakata Chart Plotter v2.0 K 線圖表繪製 + 型態標註 改良: 使用英文標籤避免中文字體問題 """ import pandas as pd import numpy as np import mplfinance as mpf import matplotlib.pyplot as plt from matplotlib.patches import FancyBboxPatch import matplotlib.font_manager as fm import os from datetime import datetime def calculate_support_resistance(df: pd.DataFrame, window: int = 20) -> dict: """Calculate support and resistance levels.""" highs = df['High'].values lows = df['Low'].values closes = df['Close'].values resistance_levels = [] support_levels = [] for i in range(window, len(df) - window): if highs[i] == max(highs[i-window:i+window+1]): resistance_levels.append(highs[i]) if lows[i] == min(lows[i-window:i+window+1]): support_levels.append(lows[i]) def cluster_levels(levels, threshold=0.02): if not levels: return [] levels = sorted(levels) clusters = [[levels[0]]] for level in levels[1:]: if (level - clusters[-1][-1]) / clusters[-1][-1] < threshold: clusters[-1].append(level) else: clusters.append([level]) return [np.mean(c) for c in clusters] resistance = cluster_levels(resistance_levels)[-3:] if resistance_levels else [] support = cluster_levels(support_levels)[:3] if support_levels else [] return { 'resistance': resistance, 'support': support, 'current_price': closes[-1] } def calculate_rsi(df: pd.DataFrame, period: int = 14) -> pd.Series: """Calculate RSI indicator.""" delta = df['Close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() rs = gain / loss rsi = 100 - (100 / (1 + rs)) return rsi def create_sakata_chart( df: pd.DataFrame, detected_patterns: list, signals: list, ticker: str, output_dir: str = './output' ) -> str: """ Create a candlestick chart with Sakata pattern annotations. Uses English labels for better font compatibility. """ # Ensure output directory exists os.makedirs(output_dir, exist_ok=True) # Calculate technical indicators df = df.copy() df['MA20'] = df['Close'].rolling(window=20).mean() df['MA50'] = df['Close'].rolling(window=50).mean() df['RSI'] = calculate_rsi(df) # Calculate support/resistance sr_levels = calculate_support_resistance(df) # Prepare annotation markers buy_signals = [] sell_signals = [] # 去重: 每天每型態只標記一次 seen_dates_buy = set() seen_dates_sell = set() for pattern in detected_patterns[-30:]: date = pattern['date'] if date in df.index: date_key = str(date)[:10] if pattern['direction'] == 'bullish': if date_key not in seen_dates_buy: buy_signals.append({ 'date': date, 'price': df.loc[date, 'Low'] * 0.98, 'name': pattern['english'] # 使用英文名 }) seen_dates_buy.add(date_key) else: if date_key not in seen_dates_sell: sell_signals.append({ 'date': date, 'price': df.loc[date, 'High'] * 1.02, 'name': pattern['english'] # 使用英文名 }) seen_dates_sell.add(date_key) # Create marker arrays buy_markers = np.nan * np.ones(len(df)) sell_markers = np.nan * np.ones(len(df)) for sig in buy_signals: if sig['date'] in df.index: idx = df.index.get_loc(sig['date']) buy_markers[idx] = df.iloc[idx]['Low'] * 0.97 for sig in sell_signals: if sig['date'] in df.index: idx = df.index.get_loc(sig['date']) sell_markers[idx] = df.iloc[idx]['High'] * 1.03 # Define custom style mc = mpf.make_marketcolors( up='#26a69a', down='#ef5350', edge='inherit', wick='inherit', volume='in', ohlc='i' ) s = mpf.make_mpf_style( marketcolors=mc, gridstyle='-', gridcolor='#e0e0e0', y_on_right=True, rc={ 'font.size': 10, 'axes.labelsize': 12, 'axes.titlesize': 14 } ) # Create additional plots apds = [] # Buy/Sell markers if not np.all(np.isnan(buy_markers)): apds.append(mpf.make_addplot( buy_markers, type='scatter', markersize=100, marker='^', color='#26a69a', panel=0 )) if not np.all(np.isnan(sell_markers)): apds.append(mpf.make_addplot( sell_markers, type='scatter', markersize=100, marker='v', color='#ef5350', panel=0 )) # Moving averages apds.append(mpf.make_addplot(df['MA20'], color='#2196F3', width=1.5, panel=0)) apds.append(mpf.make_addplot(df['MA50'], color='#FF9800', width=1.5, panel=0)) # RSI in separate panel apds.append(mpf.make_addplot(df['RSI'], color='#9C27B0', width=1, panel=2, ylabel='RSI')) # RSI overbought/oversold lines rsi_70 = np.full(len(df), 70) rsi_30 = np.full(len(df), 30) apds.append(mpf.make_addplot(rsi_70, color='#ef5350', width=0.5, linestyle='--', panel=2)) apds.append(mpf.make_addplot(rsi_30, color='#26a69a', width=0.5, linestyle='--', panel=2)) # Generate main chart (使用英文標題避免字體問題) fig, axes = mpf.plot( df, type='candle', style=s, title=f'\n{ticker} - Sakata Pattern Analysis', ylabel='Price', ylabel_lower='Volume', volume=True, addplot=apds if apds else None, figsize=(18, 12), returnfig=True, panel_ratios=(5, 1.5, 1.5) ) ax = axes[0] # Draw support lines for level in sr_levels['support']: ax.axhline(y=level, color='#26a69a', linestyle='--', linewidth=1.5, alpha=0.7) ax.text(df.index[2], level, f' Support ${level:.2f}', fontsize=9, color='#26a69a', va='bottom') # Draw resistance lines for level in sr_levels['resistance']: ax.axhline(y=level, color='#ef5350', linestyle='--', linewidth=1.5, alpha=0.7) ax.text(df.index[2], level, f' Resistance ${level:.2f}', fontsize=9, color='#ef5350', va='top') # Add signal summary box (使用英文) bullish_count = len(set(p['date'] for p in detected_patterns if p['direction'] == 'bullish')) bearish_count = len(set(p['date'] for p in detected_patterns if p['direction'] == 'bearish')) summary_text = ( f"Signal Summary\n" f"{'='*14}\n" f"Bullish: {bullish_count}\n" f"Bearish: {bearish_count}\n" f"{'='*14}\n" f"Close: ${sr_levels['current_price']:.2f}" ) ax.text( 0.02, 0.98, summary_text, transform=ax.transAxes, fontsize=10, verticalalignment='top', bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9, edgecolor='#ccc'), family='monospace' ) # Add recent patterns annotation (使用英文) if detected_patterns: # 去重 seen = set() unique_recent = [] for p in detected_patterns[:10]: key = (str(p['date'])[:10], p['english']) if key not in seen: seen.add(key) unique_recent.append(p) pattern_text = "Recent Patterns\n" + "="*16 + "\n" for p in unique_recent[:6]: date_str = p['date'].strftime('%m/%d') if hasattr(p['date'], 'strftime') else str(p['date'])[:10] signal_emoji = '+' if p['direction'] == 'bullish' else '-' # 截斷過長的型態名稱 name = p['english'][:18] if len(p['english']) > 18 else p['english'] pattern_text += f"{date_str}: {name} [{signal_emoji}]\n" ax.text( 0.98, 0.98, pattern_text, transform=ax.transAxes, fontsize=9, verticalalignment='top', horizontalalignment='right', bbox=dict(boxstyle='round,pad=0.5', facecolor='white', alpha=0.9, edgecolor='#ccc'), family='monospace' ) # Add legend from matplotlib.lines import Line2D legend_elements = [ Line2D([0], [0], color='#2196F3', linewidth=2, label='MA20'), Line2D([0], [0], color='#FF9800', linewidth=2, label='MA50'), Line2D([0], [0], color='#26a69a', linestyle='--', linewidth=1.5, label='Support'), Line2D([0], [0], color='#ef5350', linestyle='--', linewidth=1.5, label='Resistance'), Line2D([0], [0], marker='^', color='#26a69a', linestyle='None', markersize=10, label='Bullish Signal'), Line2D([0], [0], marker='v', color='#ef5350', linestyle='None', markersize=10, label='Bearish Signal'), ] ax.legend(handles=legend_elements, loc='upper center', ncol=6, fontsize=8, bbox_to_anchor=(0.5, 1.02), framealpha=0.9) # Save chart output_path = os.path.join(output_dir, f'{ticker}_sakata.png') fig.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white') plt.close(fig) return output_path def create_pattern_detail_chart( df: pd.DataFrame, pattern: dict, ticker: str, output_dir: str = './output' ) -> str: """ Create a detailed chart for a specific pattern occurrence. """ # Focus on 20 days around the pattern pattern_date = pattern['date'] if pattern_date in df.index: idx = df.index.get_loc(pattern_date) start_idx = max(0, idx - 10) end_idx = min(len(df), idx + 10) focused_df = df.iloc[start_idx:end_idx].copy() else: focused_df = df.tail(20).copy() # Create marker for the pattern marker = np.nan * np.ones(len(focused_df)) if pattern_date in focused_df.index: local_idx = focused_df.index.get_loc(pattern_date) marker[local_idx] = focused_df.iloc[local_idx]['Low'] * 0.97 mc = mpf.make_marketcolors(up='#26a69a', down='#ef5350', edge='inherit', wick='inherit') s = mpf.make_mpf_style(marketcolors=mc, gridstyle='-', gridcolor='#e0e0e0') apds = [] if not np.all(np.isnan(marker)): apds.append(mpf.make_addplot(marker, type='scatter', markersize=200, marker='*', color='gold')) fig, axes = mpf.plot( focused_df, type='candle', style=s, title=f"\n{ticker} - {pattern['english']}", figsize=(10, 6), returnfig=True, addplot=apds if apds else None ) output_path = os.path.join(output_dir, f"{ticker}_{pattern['pattern']}_detail.png") fig.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white') plt.close(fig) return output_path