package usecase import ( "blockchain/internal/domain/blockchain" "blockchain/internal/domain/repository" "blockchain/internal/domain/usecase" "context" "github.com/lxzan/gws" "github.com/zeromicro/go-zero/core/logx" "sync" "time" ) type BinanceExchangeParam struct { Adapter repository.ExchangeAdapter Handler repository.StreamHandler Backoff time.Duration MaxBackoff time.Duration Parallel bool ParallelN int } type Connection struct { // Adapter:交易所的實作(Binance/Coinbase/OKX ...) // 用來決定如何建立連線、訂閱訊息、解析 K 線、補歷史資料等 Adapter repository.ExchangeAdapter // Handler:資料事件回呼,讓上層接收到統一格式的 K 線或錯誤通知 Handler repository.StreamHandler // === 內部連線狀態 === conn *gws.Conn // 當前 WebSocket 連線物件 ctx context.Context // 控制心跳、讀取等 goroutine 的生命週期 cancel context.CancelFunc // 用來結束 ctx // === 訂閱管理 === subsMu sync.Mutex // 保護 subs map 的互斥鎖 subs map[string]map[blockchain.Interval]struct{} // 已訂閱的主題表,key=symbol,value=set(interval) // === 重連控制 === backoff time.Duration // 當前重連延遲時間(指數退避用) maxBackoff time.Duration // 最大重連延遲時間 // === gws 併發讀取設定 === parallel bool // 是否開啟 gws 的 ParallelEnabled parallelN int // 平行讀取時允許的最大 goroutine 數 } func (c *Connection) SubscribeKLine(symbols []string, interval blockchain.Interval) error { c.subsMu.Lock() defer c.subsMu.Unlock() for _, s := range symbols { if _, ok := c.subs[s]; !ok { c.subs[s] = map[blockchain.Interval]struct{}{} } c.subs[s][interval] = struct{}{} } // 若已連線就立即送訂閱訊息 if c.conn != nil { payloads, err := c.Adapter.BuildSubscribe(symbols, interval) if err != nil { return err } for _, p := range payloads { _ = c.conn.WriteMessage(gws.OpcodeText, p) } } return nil } func (c *Connection) RunForever() { for { h := WebSocket{c} conn, _, err := gws.NewClient(&h, &gws.ClientOption{ Addr: c.Adapter.URL(), ParallelEnabled: c.parallel, ParallelGolimit: c.parallelN, PermessageDeflate: gws.PermessageDeflate{ Enabled: true, ServerContextTakeover: true, ClientContextTakeover: true, }, RequestHeader: map[string][]string{"User-Agent": {"universal-ws/1.0"}}, }) if err != nil { c.sleepBackoff() continue } c.conn = conn _ = conn.SetReadDeadline(time.Now().Add(c.Adapter.ReadDeadline())) conn.ReadLoop() // 阻塞直到關閉 c.conn = nil // 清除 c.sleepBackoff() // 退避再重連 } } func (c *Connection) sleepBackoff() { time.Sleep(c.backoff) if c.backoff < c.maxBackoff { c.backoff *= 2 if c.backoff > c.maxBackoff { c.backoff = c.maxBackoff } } } func NewConnection(param BinanceExchangeParam) usecase.ExchangeConnect { ctx, cancel := context.WithCancel(context.Background()) return &Connection{ Adapter: param.Adapter, Handler: param.Handler, ctx: ctx, cancel: cancel, subs: make(map[string]map[blockchain.Interval]struct{}), backoff: param.Backoff, maxBackoff: param.MaxBackoff, parallel: param.Parallel, parallelN: param.ParallelN, } } // ======================================== type WebSocket struct { conn *Connection } func (ws *WebSocket) OnClose(socket *gws.Conn, err error) { ws.conn.Handler.OnError(err) } func (ws *WebSocket) OnPong(socket *gws.Conn, payload []byte) { _ = socket.SetReadDeadline(time.Now().Add(ws.conn.Adapter.ReadDeadline())) } func (ws *WebSocket) OnOpen(c *gws.Conn) { ws.conn.backoff = time.Second // 重置退避 // 重訂閱所有 ws.conn.subsMu.Lock() for s, set := range ws.conn.subs { for itv := range set { payloads, err := ws.conn.Adapter.BuildSubscribe([]string{s}, itv) if err != nil { ws.conn.Handler.OnError(err) continue } for _, p := range payloads { _ = c.WriteMessage(gws.OpcodeText, p) } //// 回補(可平行:這裡示範最近 2 根的缺口;實務上你可記錄 last close 再補) //go func(symbol string, iv blockchain.Interval) { // end := time.Now().UnixMilli() // start := end - 2*int64(time.Minute/time.Millisecond) // demo: 補近 2m // kl, err := ws.conn.Adapter.Backfill(symbol, iv, start, end, 500) // if err != nil { // ws.conn.Handler.OnError(err) // return // } // for _, k := range kl { // ws.conn.Handler.OnKline(k) // } //}(s, itv) } } ws.conn.subsMu.Unlock() } func (ws *WebSocket) OnPing(socket *gws.Conn, payload []byte) { _ = socket.WritePong(payload) _ = socket.SetReadDeadline(time.Now().Add(ws.conn.Adapter.ReadDeadline())) } func (ws *WebSocket) OnMessage(socket *gws.Conn, message *gws.Message) { defer message.Close() _ = socket.SetReadDeadline(time.Now().Add(ws.conn.Adapter.ReadDeadline())) kLines, err := ws.conn.Adapter.ParseKLines(message.Data.Bytes()) if err != nil { ws.conn.Handler.OnError(err) return } for _, k := range kLines { ws.conn.Handler.OnKline(k) } } // ======================================== type PubHandler struct{} func (h *PubHandler) OnKline(k blockchain.Kline) { // 收到之後可以送到 pub/sub 讓其他 logx.Infof("[PubHandler] Kline:%v", k) } func (h *PubHandler) OnError(err error) { logx.Errorf("failed to pub k line %v", err.Error()) }