198 lines
5.4 KiB
Go
198 lines
5.4 KiB
Go
|
package usecase
|
|||
|
|
|||
|
import (
|
|||
|
"blockchain/internal/domain/blockchain"
|
|||
|
"blockchain/internal/domain/repository"
|
|||
|
"blockchain/internal/domain/usecase"
|
|||
|
"context"
|
|||
|
"github.com/lxzan/gws"
|
|||
|
"github.com/zeromicro/go-zero/core/logx"
|
|||
|
"sync"
|
|||
|
"time"
|
|||
|
)
|
|||
|
|
|||
|
type BinanceExchangeParam struct {
|
|||
|
Adapter repository.ExchangeAdapter
|
|||
|
Handler repository.StreamHandler
|
|||
|
Backoff time.Duration
|
|||
|
MaxBackoff time.Duration
|
|||
|
Parallel bool
|
|||
|
ParallelN int
|
|||
|
}
|
|||
|
|
|||
|
type Connection struct {
|
|||
|
// Adapter:交易所的實作(Binance/Coinbase/OKX ...)
|
|||
|
// 用來決定如何建立連線、訂閱訊息、解析 K 線、補歷史資料等
|
|||
|
Adapter repository.ExchangeAdapter
|
|||
|
|
|||
|
// Handler:資料事件回呼,讓上層接收到統一格式的 K 線或錯誤通知
|
|||
|
Handler repository.StreamHandler
|
|||
|
|
|||
|
// === 內部連線狀態 ===
|
|||
|
conn *gws.Conn // 當前 WebSocket 連線物件
|
|||
|
ctx context.Context // 控制心跳、讀取等 goroutine 的生命週期
|
|||
|
cancel context.CancelFunc // 用來結束 ctx
|
|||
|
|
|||
|
// === 訂閱管理 ===
|
|||
|
subsMu sync.Mutex // 保護 subs map 的互斥鎖
|
|||
|
subs map[string]map[blockchain.Interval]struct{} // 已訂閱的主題表,key=symbol,value=set(interval)
|
|||
|
|
|||
|
// === 重連控制 ===
|
|||
|
backoff time.Duration // 當前重連延遲時間(指數退避用)
|
|||
|
maxBackoff time.Duration // 最大重連延遲時間
|
|||
|
|
|||
|
// === gws 併發讀取設定 ===
|
|||
|
parallel bool // 是否開啟 gws 的 ParallelEnabled
|
|||
|
parallelN int // 平行讀取時允許的最大 goroutine 數
|
|||
|
}
|
|||
|
|
|||
|
func (c *Connection) SubscribeKLine(symbols []string, interval blockchain.Interval) error {
|
|||
|
c.subsMu.Lock()
|
|||
|
defer c.subsMu.Unlock()
|
|||
|
for _, s := range symbols {
|
|||
|
if _, ok := c.subs[s]; !ok {
|
|||
|
c.subs[s] = map[blockchain.Interval]struct{}{}
|
|||
|
}
|
|||
|
c.subs[s][interval] = struct{}{}
|
|||
|
}
|
|||
|
// 若已連線就立即送訂閱訊息
|
|||
|
if c.conn != nil {
|
|||
|
payloads, err := c.Adapter.BuildSubscribe(symbols, interval)
|
|||
|
if err != nil {
|
|||
|
return err
|
|||
|
}
|
|||
|
for _, p := range payloads {
|
|||
|
_ = c.conn.WriteMessage(gws.OpcodeText, p)
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
return nil
|
|||
|
}
|
|||
|
|
|||
|
func (c *Connection) RunForever() {
|
|||
|
for {
|
|||
|
h := WebSocket{c}
|
|||
|
conn, _, err := gws.NewClient(&h, &gws.ClientOption{
|
|||
|
Addr: c.Adapter.URL(),
|
|||
|
ParallelEnabled: c.parallel,
|
|||
|
ParallelGolimit: c.parallelN,
|
|||
|
PermessageDeflate: gws.PermessageDeflate{
|
|||
|
Enabled: true,
|
|||
|
ServerContextTakeover: true,
|
|||
|
ClientContextTakeover: true,
|
|||
|
},
|
|||
|
RequestHeader: map[string][]string{"User-Agent": {"universal-ws/1.0"}},
|
|||
|
})
|
|||
|
if err != nil {
|
|||
|
c.sleepBackoff()
|
|||
|
continue
|
|||
|
}
|
|||
|
c.conn = conn
|
|||
|
_ = conn.SetReadDeadline(time.Now().Add(c.Adapter.ReadDeadline()))
|
|||
|
conn.ReadLoop() // 阻塞直到關閉
|
|||
|
c.conn = nil // 清除
|
|||
|
c.sleepBackoff() // 退避再重連
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
func (c *Connection) sleepBackoff() {
|
|||
|
time.Sleep(c.backoff)
|
|||
|
if c.backoff < c.maxBackoff {
|
|||
|
c.backoff *= 2
|
|||
|
if c.backoff > c.maxBackoff {
|
|||
|
c.backoff = c.maxBackoff
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
func NewConnection(param BinanceExchangeParam) usecase.ExchangeConnect {
|
|||
|
ctx, cancel := context.WithCancel(context.Background())
|
|||
|
return &Connection{
|
|||
|
Adapter: param.Adapter,
|
|||
|
Handler: param.Handler,
|
|||
|
ctx: ctx,
|
|||
|
cancel: cancel,
|
|||
|
subs: make(map[string]map[blockchain.Interval]struct{}),
|
|||
|
backoff: param.Backoff,
|
|||
|
maxBackoff: param.MaxBackoff,
|
|||
|
parallel: param.Parallel,
|
|||
|
parallelN: param.ParallelN,
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// ========================================
|
|||
|
|
|||
|
type WebSocket struct {
|
|||
|
conn *Connection
|
|||
|
}
|
|||
|
|
|||
|
func (ws *WebSocket) OnClose(socket *gws.Conn, err error) {
|
|||
|
ws.conn.Handler.OnError(err)
|
|||
|
}
|
|||
|
|
|||
|
func (ws *WebSocket) OnPong(socket *gws.Conn, payload []byte) {
|
|||
|
_ = socket.SetReadDeadline(time.Now().Add(ws.conn.Adapter.ReadDeadline()))
|
|||
|
}
|
|||
|
|
|||
|
func (ws *WebSocket) OnOpen(c *gws.Conn) {
|
|||
|
ws.conn.backoff = time.Second // 重置退避
|
|||
|
// 重訂閱所有
|
|||
|
ws.conn.subsMu.Lock()
|
|||
|
for s, set := range ws.conn.subs {
|
|||
|
for itv := range set {
|
|||
|
payloads, err := ws.conn.Adapter.BuildSubscribe([]string{s}, itv)
|
|||
|
if err != nil {
|
|||
|
ws.conn.Handler.OnError(err)
|
|||
|
continue
|
|||
|
}
|
|||
|
for _, p := range payloads {
|
|||
|
_ = c.WriteMessage(gws.OpcodeText, p)
|
|||
|
}
|
|||
|
//// 回補(可平行:這裡示範最近 2 根的缺口;實務上你可記錄 last close 再補)
|
|||
|
//go func(symbol string, iv blockchain.Interval) {
|
|||
|
// end := time.Now().UnixMilli()
|
|||
|
// start := end - 2*int64(time.Minute/time.Millisecond) // demo: 補近 2m
|
|||
|
// kl, err := ws.conn.Adapter.Backfill(symbol, iv, start, end, 500)
|
|||
|
// if err != nil {
|
|||
|
// ws.conn.Handler.OnError(err)
|
|||
|
// return
|
|||
|
// }
|
|||
|
// for _, k := range kl {
|
|||
|
// ws.conn.Handler.OnKline(k)
|
|||
|
// }
|
|||
|
//}(s, itv)
|
|||
|
}
|
|||
|
}
|
|||
|
ws.conn.subsMu.Unlock()
|
|||
|
}
|
|||
|
|
|||
|
func (ws *WebSocket) OnPing(socket *gws.Conn, payload []byte) {
|
|||
|
_ = socket.WritePong(payload)
|
|||
|
_ = socket.SetReadDeadline(time.Now().Add(ws.conn.Adapter.ReadDeadline()))
|
|||
|
}
|
|||
|
|
|||
|
func (ws *WebSocket) OnMessage(socket *gws.Conn, message *gws.Message) {
|
|||
|
defer message.Close()
|
|||
|
_ = socket.SetReadDeadline(time.Now().Add(ws.conn.Adapter.ReadDeadline()))
|
|||
|
kLines, err := ws.conn.Adapter.ParseKLines(message.Data.Bytes())
|
|||
|
if err != nil {
|
|||
|
ws.conn.Handler.OnError(err)
|
|||
|
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
for _, k := range kLines {
|
|||
|
ws.conn.Handler.OnKline(k)
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// ========================================
|
|||
|
|
|||
|
type PubHandler struct{}
|
|||
|
|
|||
|
func (h *PubHandler) OnKline(k blockchain.Kline) {
|
|||
|
// 收到之後可以送到 pub/sub 讓其他
|
|||
|
logx.Infof("[PubHandler] Kline:%v", k)
|
|||
|
}
|
|||
|
func (h *PubHandler) OnError(err error) { logx.Errorf("failed to pub k line %v", err.Error()) }
|