blockchain/internal/usecase/websocket_binance.go

198 lines
5.4 KiB
Go
Raw Permalink Normal View History

2025-08-09 16:36:24 +00:00
package usecase
import (
"blockchain/internal/domain/blockchain"
"blockchain/internal/domain/repository"
"blockchain/internal/domain/usecase"
"context"
"github.com/lxzan/gws"
"github.com/zeromicro/go-zero/core/logx"
"sync"
"time"
)
type BinanceExchangeParam struct {
Adapter repository.ExchangeAdapter
Handler repository.StreamHandler
Backoff time.Duration
MaxBackoff time.Duration
Parallel bool
ParallelN int
}
type Connection struct {
// Adapter交易所的實作Binance/Coinbase/OKX ...
// 用來決定如何建立連線、訂閱訊息、解析 K 線、補歷史資料等
Adapter repository.ExchangeAdapter
// Handler資料事件回呼讓上層接收到統一格式的 K 線或錯誤通知
Handler repository.StreamHandler
// === 內部連線狀態 ===
conn *gws.Conn // 當前 WebSocket 連線物件
ctx context.Context // 控制心跳、讀取等 goroutine 的生命週期
cancel context.CancelFunc // 用來結束 ctx
// === 訂閱管理 ===
subsMu sync.Mutex // 保護 subs map 的互斥鎖
subs map[string]map[blockchain.Interval]struct{} // 已訂閱的主題表key=symbolvalue=set(interval)
// === 重連控制 ===
backoff time.Duration // 當前重連延遲時間(指數退避用)
maxBackoff time.Duration // 最大重連延遲時間
// === gws 併發讀取設定 ===
parallel bool // 是否開啟 gws 的 ParallelEnabled
parallelN int // 平行讀取時允許的最大 goroutine 數
}
func (c *Connection) SubscribeKLine(symbols []string, interval blockchain.Interval) error {
c.subsMu.Lock()
defer c.subsMu.Unlock()
for _, s := range symbols {
if _, ok := c.subs[s]; !ok {
c.subs[s] = map[blockchain.Interval]struct{}{}
}
c.subs[s][interval] = struct{}{}
}
// 若已連線就立即送訂閱訊息
if c.conn != nil {
payloads, err := c.Adapter.BuildSubscribe(symbols, interval)
if err != nil {
return err
}
for _, p := range payloads {
_ = c.conn.WriteMessage(gws.OpcodeText, p)
}
}
return nil
}
func (c *Connection) RunForever() {
for {
h := WebSocket{c}
conn, _, err := gws.NewClient(&h, &gws.ClientOption{
Addr: c.Adapter.URL(),
ParallelEnabled: c.parallel,
ParallelGolimit: c.parallelN,
PermessageDeflate: gws.PermessageDeflate{
Enabled: true,
ServerContextTakeover: true,
ClientContextTakeover: true,
},
RequestHeader: map[string][]string{"User-Agent": {"universal-ws/1.0"}},
})
if err != nil {
c.sleepBackoff()
continue
}
c.conn = conn
_ = conn.SetReadDeadline(time.Now().Add(c.Adapter.ReadDeadline()))
conn.ReadLoop() // 阻塞直到關閉
c.conn = nil // 清除
c.sleepBackoff() // 退避再重連
}
}
func (c *Connection) sleepBackoff() {
time.Sleep(c.backoff)
if c.backoff < c.maxBackoff {
c.backoff *= 2
if c.backoff > c.maxBackoff {
c.backoff = c.maxBackoff
}
}
}
func NewConnection(param BinanceExchangeParam) usecase.ExchangeConnect {
ctx, cancel := context.WithCancel(context.Background())
return &Connection{
Adapter: param.Adapter,
Handler: param.Handler,
ctx: ctx,
cancel: cancel,
subs: make(map[string]map[blockchain.Interval]struct{}),
backoff: param.Backoff,
maxBackoff: param.MaxBackoff,
parallel: param.Parallel,
parallelN: param.ParallelN,
}
}
// ========================================
type WebSocket struct {
conn *Connection
}
func (ws *WebSocket) OnClose(socket *gws.Conn, err error) {
ws.conn.Handler.OnError(err)
}
func (ws *WebSocket) OnPong(socket *gws.Conn, payload []byte) {
_ = socket.SetReadDeadline(time.Now().Add(ws.conn.Adapter.ReadDeadline()))
}
func (ws *WebSocket) OnOpen(c *gws.Conn) {
ws.conn.backoff = time.Second // 重置退避
// 重訂閱所有
ws.conn.subsMu.Lock()
for s, set := range ws.conn.subs {
for itv := range set {
payloads, err := ws.conn.Adapter.BuildSubscribe([]string{s}, itv)
if err != nil {
ws.conn.Handler.OnError(err)
continue
}
for _, p := range payloads {
_ = c.WriteMessage(gws.OpcodeText, p)
}
//// 回補(可平行:這裡示範最近 2 根的缺口;實務上你可記錄 last close 再補)
//go func(symbol string, iv blockchain.Interval) {
// end := time.Now().UnixMilli()
// start := end - 2*int64(time.Minute/time.Millisecond) // demo: 補近 2m
// kl, err := ws.conn.Adapter.Backfill(symbol, iv, start, end, 500)
// if err != nil {
// ws.conn.Handler.OnError(err)
// return
// }
// for _, k := range kl {
// ws.conn.Handler.OnKline(k)
// }
//}(s, itv)
}
}
ws.conn.subsMu.Unlock()
}
func (ws *WebSocket) OnPing(socket *gws.Conn, payload []byte) {
_ = socket.WritePong(payload)
_ = socket.SetReadDeadline(time.Now().Add(ws.conn.Adapter.ReadDeadline()))
}
func (ws *WebSocket) OnMessage(socket *gws.Conn, message *gws.Message) {
defer message.Close()
_ = socket.SetReadDeadline(time.Now().Add(ws.conn.Adapter.ReadDeadline()))
kLines, err := ws.conn.Adapter.ParseKLines(message.Data.Bytes())
if err != nil {
ws.conn.Handler.OnError(err)
return
}
for _, k := range kLines {
ws.conn.Handler.OnKline(k)
}
}
// ========================================
type PubHandler struct{}
func (h *PubHandler) OnKline(k blockchain.Kline) {
// 收到之後可以送到 pub/sub 讓其他
logx.Infof("[PubHandler] Kline:%v", k)
}
func (h *PubHandler) OnError(err error) { logx.Errorf("failed to pub k line %v", err.Error()) }