From c4abce7fff9c36ad7c816d7701717845dc2771bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=A7=E9=A9=8A?= Date: Sun, 10 Aug 2025 00:36:24 +0800 Subject: [PATCH] add cassandra --- go.mod | 3 +- go.sum | 2 + internal/domain/blockchain/const.go | 15 ++ internal/domain/blockchain/kline.go | 16 ++ .../domain/repository/exchange_adapter.go | 25 +++ internal/domain/usecase/exchange_connect.go | 8 + internal/lib/websocket/config.go | 27 --- internal/lib/websocket/connection.go | 25 --- internal/lib/websocket/option.go | 1 - internal/repository/binance_adapter.go | 149 +++++++++++++ internal/repository/data_source_binance.go | 2 +- internal/svc/service_context.go | 3 +- internal/svc/websocket.go | 58 ++++++ internal/usecase/websocket_binance.go | 197 ++++++++++++++++++ 14 files changed, 474 insertions(+), 57 deletions(-) create mode 100644 internal/domain/blockchain/kline.go create mode 100644 internal/domain/repository/exchange_adapter.go create mode 100644 internal/domain/usecase/exchange_connect.go delete mode 100644 internal/lib/websocket/config.go delete mode 100644 internal/lib/websocket/connection.go delete mode 100644 internal/lib/websocket/option.go create mode 100644 internal/repository/binance_adapter.go create mode 100644 internal/svc/websocket.go create mode 100644 internal/usecase/websocket_binance.go diff --git a/go.mod b/go.mod index d6c01a9..4a948d8 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,11 @@ go 1.24.5 require ( github.com/alicebob/miniredis/v2 v2.35.0 + github.com/goccy/go-json v0.10.5 github.com/gocql/gocql v1.5.0 github.com/lxzan/gws v1.8.9 github.com/panjf2000/ants/v2 v2.11.3 github.com/scylladb/gocqlx/v3 v3.0.1 - github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go v0.38.0 github.com/zeromicro/go-zero v1.8.5 @@ -56,6 +56,7 @@ require ( github.com/scylladb/go-reflectx v1.0.1 // indirect github.com/shirou/gopsutil/v4 v4.25.5 // indirect github.com/shopspring/decimal v1.4.0 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect diff --git a/go.sum b/go.sum index 2bc1cd5..096efff 100644 --- a/go.sum +++ b/go.sum @@ -87,6 +87,8 @@ github.com/go-openapi/swag v0.22.4 h1:QLMzNJnMGPRNDCbySlcj1x01tzU8/9LTTL9hZZZogB github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/gocql/gocql v1.5.0 h1:Lth5ZH2Wzf6lQ8UC/ddOQS0BZ/YtMuUtIsbyt9oCBTM= github.com/gocql/gocql v1.5.0/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= diff --git a/internal/domain/blockchain/const.go b/internal/domain/blockchain/const.go index f72119c..8f3b055 100644 --- a/internal/domain/blockchain/const.go +++ b/internal/domain/blockchain/const.go @@ -2,3 +2,18 @@ package blockchain const RedisKeySymbolList = "symbol:all" const SymbolExpire = 3600 // 秒(目前為一小時) + +type Interval string + +const ( + Interval1m Interval = "1m" + Interval3m Interval = "3m" + Interval5m Interval = "5m" + Interval15m Interval = "15m" + Interval30m Interval = "30m" + Interval1h Interval = "1h" + Interval4h Interval = "4h" + Interval1d Interval = "1d" + Interval1w Interval = "1w" + Interval1M Interval = "1M" +) diff --git a/internal/domain/blockchain/kline.go b/internal/domain/blockchain/kline.go new file mode 100644 index 0000000..f8afad2 --- /dev/null +++ b/internal/domain/blockchain/kline.go @@ -0,0 +1,16 @@ +package blockchain + +type Kline struct { + Exchange string // "binance" | "coinbase" | "okx" ... + Symbol string // 內部統一格式:BTCUSDT(Adapter 內部自行轉換) + Interval Interval + OpenTime int64 // ms + CloseTime int64 // ms + Open string + High string + Low string + Close string + Volume string // 以交易所原生定義為準(現貨通常 base 量) + Final bool // 這根是否已收 + Raw []byte // 原始 payload(除錯用途) +} diff --git a/internal/domain/repository/exchange_adapter.go b/internal/domain/repository/exchange_adapter.go new file mode 100644 index 0000000..1690f99 --- /dev/null +++ b/internal/domain/repository/exchange_adapter.go @@ -0,0 +1,25 @@ +package repository + +import ( + "blockchain/internal/domain/blockchain" + "time" +) + +// ====== Adapter 介面(每家交易所各實作一份) ====== + +type ExchangeAdapter interface { + Name() string // e.g. "binance" + URL() string // 該交易所的 ws 連線位址 + NormalizeSymbol(internal string) string // 內部 -> 交易所格式(ex: BTCUSDT -> BTC-USD / BTC-USDT) + DenormalizeSymbol(external string) string // 交易所 -> 內部格式 + BuildSubscribe(symbols []string, interval blockchain.Interval) ([][]byte, error) // BuildSubscribe 訂閱K線(可一次多個) + BuildUnsubscribe(symbols []string, interval blockchain.Interval) ([][]byte, error) // BuildUnsubscribe 取消訂閱K線(可一次多個) + ParseKLines(msg []byte) ([]blockchain.Kline, error) // ParseKLines 解析 WS 訊息:不是 K 線就回 (nil, nil) + ClientPingInterval() time.Duration // ClientPingInterval 心跳:是否需要主動 ping(有些交易所 server 會送 ping) ,0 表示不用主動 ping + ReadDeadline() time.Duration // 建議 read deadline +} + +type StreamHandler interface { + OnKline(k blockchain.Kline) + OnError(err error) +} diff --git a/internal/domain/usecase/exchange_connect.go b/internal/domain/usecase/exchange_connect.go new file mode 100644 index 0000000..196629e --- /dev/null +++ b/internal/domain/usecase/exchange_connect.go @@ -0,0 +1,8 @@ +package usecase + +import "blockchain/internal/domain/blockchain" + +type ExchangeConnect interface { + SubscribeKLine(symbols []string, interval blockchain.Interval) error + RunForever() +} diff --git a/internal/lib/websocket/config.go b/internal/lib/websocket/config.go deleted file mode 100644 index e8fafea..0000000 --- a/internal/lib/websocket/config.go +++ /dev/null @@ -1,27 +0,0 @@ -package websocket - -import ( - "crypto/tls" - "github.com/lxzan/gws" - "net/http" - "time" -) - -type ClientOption struct { - WriteBufferSize int - PermessageDeflate gws.PermessageDeflate - ParallelEnabled bool - ParallelGoLimit int - ReadMaxPayloadSize int - ReadBufferSize int - WriteMaxPayloadSize int - CheckUtf8Enabled bool - Logger gws.Logger - Recovery func(logger gws.Logger) - Addr string - RequestHeader http.Header - HandshakeTimeout time.Duration - TlsConfig *tls.Config - NewDialer func() (gws.Dialer, error) - NewSession func() gws.SessionStorage -} diff --git a/internal/lib/websocket/connection.go b/internal/lib/websocket/connection.go deleted file mode 100644 index 3ed8ef8..0000000 --- a/internal/lib/websocket/connection.go +++ /dev/null @@ -1,25 +0,0 @@ -package websocket - -import ( - "github.com/lxzan/gws" - "net/url" -) - -type Connection struct { - *gws.Conn -} - -func NewWebSocketConnect(url url.URL, handler gws.Event) (*Connection, error) { - socket, _, err := gws.NewClient(handler, &gws.ClientOption{ - Addr: url.String(), - }) - if err != nil { - return nil, err - } - // 取得消息 - go socket.ReadLoop() - - return &Connection{ - socket, - }, nil -} diff --git a/internal/lib/websocket/option.go b/internal/lib/websocket/option.go deleted file mode 100644 index 708bc8c..0000000 --- a/internal/lib/websocket/option.go +++ /dev/null @@ -1 +0,0 @@ -package websocket diff --git a/internal/repository/binance_adapter.go b/internal/repository/binance_adapter.go new file mode 100644 index 0000000..62fc8b6 --- /dev/null +++ b/internal/repository/binance_adapter.go @@ -0,0 +1,149 @@ +package repository + +import ( + "blockchain/internal/domain/blockchain" + "blockchain/internal/domain/repository" + "fmt" + "github.com/goccy/go-json" + "strings" + "time" +) + +type BinanceAdapterParam struct { + Name string + WsURL string + ClientPingInterval time.Duration + ReadDeadline time.Duration +} + +type BinanceAdapter struct { + name string + wsUrl string + clientPingInterval time.Duration + readDeadline time.Duration +} + +func NewBinanceAdapter(param BinanceAdapterParam) repository.ExchangeAdapter { + return &BinanceAdapter{ + name: param.Name, + wsUrl: param.WsURL, + clientPingInterval: param.ClientPingInterval, + readDeadline: param.ReadDeadline, + } +} + +func (repo *BinanceAdapter) Name() string { + return repo.name +} + +func (repo *BinanceAdapter) URL() string { + return repo.wsUrl +} + +// NormalizeSymbol 我系統內部也是統一用 大寫 BTCUSDT +func (repo *BinanceAdapter) NormalizeSymbol(internal string) string { + // 驗證:必須全部是大寫英文,且長度至少 6(如 BTCUSDT) + if !isAllUpperAlpha(internal) { + panic(fmt.Sprintf("invalid symbol format: %s (must be all uppercase letters, e.g., BTCUSDT)", internal)) + } + // Binance 訂閱格式是小寫 + return strings.ToLower(internal) +} + +func (repo *BinanceAdapter) DenormalizeSymbol(external string) string { + // Binance 回傳小寫,轉回內部格式(大寫) + return strings.ToUpper(external) +} + +func (repo *BinanceAdapter) BuildSubscribe(symbols []string, interval blockchain.Interval) ([][]byte, error) { + params := make([]string, 0, len(symbols)) + for _, s := range symbols { + params = append(params, fmt.Sprintf("%s@kline_%s", repo.NormalizeSymbol(s), interval)) + } + req := map[string]any{"method": "SUBSCRIBE", "params": params, "id": time.Now().UnixNano()} + b, _ := json.Marshal(req) + + return [][]byte{b}, nil +} + +func (repo *BinanceAdapter) BuildUnsubscribe(symbols []string, interval blockchain.Interval) ([][]byte, error) { + params := make([]string, 0, len(symbols)) + for _, s := range symbols { + params = append(params, fmt.Sprintf("%s@kline_%s", repo.NormalizeSymbol(s), interval)) + } + req := map[string]any{"method": "UNSUBSCRIBE", "params": params, "id": time.Now().UnixNano()} + b, _ := json.Marshal(req) + return [][]byte{b}, nil +} + +func (repo *BinanceAdapter) ParseKLines(msg []byte) ([]blockchain.Kline, error) { + res := BinanceKlineEvent{} + if err := json.Unmarshal(msg, &res); err != nil { + return nil, nil + } // 不是 kline 就略過 + if res.EventType != "kline" { + return nil, nil + } + + return []blockchain.Kline{{ + Exchange: repo.Name(), + Symbol: repo.DenormalizeSymbol(res.Symbol), + Interval: blockchain.Interval(res.K.Interval), + OpenTime: res.K.StartTime, + CloseTime: res.K.CloseTime, + Open: res.K.Open, High: res.K.High, Low: res.K.Low, Close: res.K.Close, Volume: res.K.Volume, + Final: res.K.Final, + Raw: msg, + }}, nil +} + +func (repo *BinanceAdapter) ClientPingInterval() time.Duration { + return repo.clientPingInterval +} + +func (repo *BinanceAdapter) ReadDeadline() time.Duration { + return repo.readDeadline +} + +// 工具函式:檢查是否全為大寫英文字母 +func isAllUpperAlpha(s string) bool { + if len(s) == 0 { + return false + } + for _, r := range s { + if r < 'A' || r > 'Z' { + return false + } + } + return true +} + +// BinanceKlineEvent 代表幣安 WebSocket 推送的 kline 事件 +// 範例:{"e":"kline","E":..., "s":"BTCUSDT", "k":{...}} +type BinanceKlineEvent struct { + EventType string `json:"e"` // 事件類型,固定為 "kline" + EventTime int64 `json:"E"` // 事件時間 (毫秒 UNIX 時戳) + Symbol string `json:"s"` // 交易對,例如 "BTCUSDT" + K BinanceKlineBody `json:"k"` // K 線細節 +} + +// BinanceKlineBody 對應 "k" 物件(單一根 K 線的詳細資訊) +type BinanceKlineBody struct { + StartTime int64 `json:"t"` // 本根 K 線開盤時間 (ms) + CloseTime int64 `json:"T"` // 本根 K 線關閉時間/結束時間 (ms) + Symbol string `json:"s"` // 交易對(與外層 s 相同) + Interval string `json:"i"` // 週期,例如 "1m","5m","1h","1d","1M"(月線) + FirstTradeID int64 `json:"f"` // 本根K線包含的第一筆成交ID + LastTradeID int64 `json:"L"` // 本根K線包含的最後一筆成交ID + Open string `json:"o"` // 開盤價(字串,避免浮點誤差) + Close string `json:"c"` // 收盤價(字串) + High string `json:"h"` // 最高價(字串) + Low string `json:"l"` // 最低價(字串) + Volume string `json:"v"` // 交易量(Base 資產數量,字串) + TradeCount int64 `json:"n"` // 成交筆數 + Final bool `json:"x"` // 是否已收盤(true=此根K線已完成;false=仍在形成中) + QuoteAssetVolume string `json:"q"` // 交易額(Quote 資產成交額,字串) + TakerBuyBaseVolume string `json:"V"` // 主動買單成交量(Base),字串 + TakerBuyQuoteVolume string `json:"Q"` // 主動買單成交額(Quote),字串 + Ignore string `json:"B"` // 保留欄位(可忽略) +} diff --git a/internal/repository/data_source_binance.go b/internal/repository/data_source_binance.go index 2f31bfe..bca47bf 100644 --- a/internal/repository/data_source_binance.go +++ b/internal/repository/data_source_binance.go @@ -10,8 +10,8 @@ import ( "bytes" "context" "encoding/csv" - "encoding/json" "fmt" + "github.com/goccy/go-json" "io" "net/http" "os" diff --git a/internal/svc/service_context.go b/internal/svc/service_context.go index 824981d..19d9233 100644 --- a/internal/svc/service_context.go +++ b/internal/svc/service_context.go @@ -6,7 +6,6 @@ import ( "blockchain/internal/domain/usecase" repo "blockchain/internal/repository" uc "blockchain/internal/usecase" - "github.com/zeromicro/go-zero/core/stores/redis" ) @@ -33,7 +32,7 @@ func NewServiceContext(c config.Config) *ServiceContext { DB: cassandra, KeySpace: c.Cassandra.Keyspace, }) - + InitBinanceKLineWebsocket() return &ServiceContext{ Config: c, BinanceRepo: binanceRepo, diff --git a/internal/svc/websocket.go b/internal/svc/websocket.go new file mode 100644 index 0000000..abc9276 --- /dev/null +++ b/internal/svc/websocket.go @@ -0,0 +1,58 @@ +package svc + +import ( + "blockchain/internal/domain/blockchain" + uc "blockchain/internal/domain/usecase" + repo "blockchain/internal/repository" + "blockchain/internal/usecase" + "time" +) + +func InitBinanceKLineWebsocket() { + // 建立 adapter & handler + adapter := repo.NewBinanceAdapter(repo.BinanceAdapterParam{ + Name: "Binance", + WsURL: "wss://fstream.binance.com/ws", + ClientPingInterval: 15 * time.Second, + ReadDeadline: 70 * time.Second, + }) + handler := &usecase.PubHandler{} + + cli := usecase.NewConnection(usecase.BinanceExchangeParam{ + Adapter: adapter, + Handler: handler, + Backoff: time.Second, + MaxBackoff: 30 * time.Second, + Parallel: true, + ParallelN: 2048, + }) + + go Sub(cli, "BTCUSDT", "ETHUSDT") + // 這裡之後可以看要怎麼寫,改一下就可以訂閱更多,並且分連線,1024 個東西分一個連線 + + go cli.RunForever() +} + +func Sub(cli uc.ExchangeConnect, pair ...string) { + _ = cli.SubscribeKLine(pair, blockchain.Interval1m) + time.Sleep(5 * time.Second) + _ = cli.SubscribeKLine(pair, blockchain.Interval3m) + time.Sleep(5 * time.Second) + _ = cli.SubscribeKLine(pair, blockchain.Interval5m) + time.Sleep(5 * time.Second) + _ = cli.SubscribeKLine(pair, blockchain.Interval15m) + time.Sleep(5 * time.Second) + _ = cli.SubscribeKLine(pair, blockchain.Interval30m) + time.Sleep(5 * time.Second) + _ = cli.SubscribeKLine(pair, blockchain.Interval1h) + time.Sleep(5 * time.Second) + _ = cli.SubscribeKLine(pair, blockchain.Interval4h) + time.Sleep(5 * time.Second) + _ = cli.SubscribeKLine(pair, blockchain.Interval1d) + time.Sleep(5 * time.Second) + _ = cli.SubscribeKLine(pair, blockchain.Interval1w) + time.Sleep(5 * time.Second) + _ = cli.SubscribeKLine(pair, blockchain.Interval1M) + + return +} diff --git a/internal/usecase/websocket_binance.go b/internal/usecase/websocket_binance.go new file mode 100644 index 0000000..6d9e23d --- /dev/null +++ b/internal/usecase/websocket_binance.go @@ -0,0 +1,197 @@ +package usecase + +import ( + "blockchain/internal/domain/blockchain" + "blockchain/internal/domain/repository" + "blockchain/internal/domain/usecase" + "context" + "github.com/lxzan/gws" + "github.com/zeromicro/go-zero/core/logx" + "sync" + "time" +) + +type BinanceExchangeParam struct { + Adapter repository.ExchangeAdapter + Handler repository.StreamHandler + Backoff time.Duration + MaxBackoff time.Duration + Parallel bool + ParallelN int +} + +type Connection struct { + // Adapter:交易所的實作(Binance/Coinbase/OKX ...) + // 用來決定如何建立連線、訂閱訊息、解析 K 線、補歷史資料等 + Adapter repository.ExchangeAdapter + + // Handler:資料事件回呼,讓上層接收到統一格式的 K 線或錯誤通知 + Handler repository.StreamHandler + + // === 內部連線狀態 === + conn *gws.Conn // 當前 WebSocket 連線物件 + ctx context.Context // 控制心跳、讀取等 goroutine 的生命週期 + cancel context.CancelFunc // 用來結束 ctx + + // === 訂閱管理 === + subsMu sync.Mutex // 保護 subs map 的互斥鎖 + subs map[string]map[blockchain.Interval]struct{} // 已訂閱的主題表,key=symbol,value=set(interval) + + // === 重連控制 === + backoff time.Duration // 當前重連延遲時間(指數退避用) + maxBackoff time.Duration // 最大重連延遲時間 + + // === gws 併發讀取設定 === + parallel bool // 是否開啟 gws 的 ParallelEnabled + parallelN int // 平行讀取時允許的最大 goroutine 數 +} + +func (c *Connection) SubscribeKLine(symbols []string, interval blockchain.Interval) error { + c.subsMu.Lock() + defer c.subsMu.Unlock() + for _, s := range symbols { + if _, ok := c.subs[s]; !ok { + c.subs[s] = map[blockchain.Interval]struct{}{} + } + c.subs[s][interval] = struct{}{} + } + // 若已連線就立即送訂閱訊息 + if c.conn != nil { + payloads, err := c.Adapter.BuildSubscribe(symbols, interval) + if err != nil { + return err + } + for _, p := range payloads { + _ = c.conn.WriteMessage(gws.OpcodeText, p) + } + } + + return nil +} + +func (c *Connection) RunForever() { + for { + h := WebSocket{c} + conn, _, err := gws.NewClient(&h, &gws.ClientOption{ + Addr: c.Adapter.URL(), + ParallelEnabled: c.parallel, + ParallelGolimit: c.parallelN, + PermessageDeflate: gws.PermessageDeflate{ + Enabled: true, + ServerContextTakeover: true, + ClientContextTakeover: true, + }, + RequestHeader: map[string][]string{"User-Agent": {"universal-ws/1.0"}}, + }) + if err != nil { + c.sleepBackoff() + continue + } + c.conn = conn + _ = conn.SetReadDeadline(time.Now().Add(c.Adapter.ReadDeadline())) + conn.ReadLoop() // 阻塞直到關閉 + c.conn = nil // 清除 + c.sleepBackoff() // 退避再重連 + } +} + +func (c *Connection) sleepBackoff() { + time.Sleep(c.backoff) + if c.backoff < c.maxBackoff { + c.backoff *= 2 + if c.backoff > c.maxBackoff { + c.backoff = c.maxBackoff + } + } +} + +func NewConnection(param BinanceExchangeParam) usecase.ExchangeConnect { + ctx, cancel := context.WithCancel(context.Background()) + return &Connection{ + Adapter: param.Adapter, + Handler: param.Handler, + ctx: ctx, + cancel: cancel, + subs: make(map[string]map[blockchain.Interval]struct{}), + backoff: param.Backoff, + maxBackoff: param.MaxBackoff, + parallel: param.Parallel, + parallelN: param.ParallelN, + } +} + +// ======================================== + +type WebSocket struct { + conn *Connection +} + +func (ws *WebSocket) OnClose(socket *gws.Conn, err error) { + ws.conn.Handler.OnError(err) +} + +func (ws *WebSocket) OnPong(socket *gws.Conn, payload []byte) { + _ = socket.SetReadDeadline(time.Now().Add(ws.conn.Adapter.ReadDeadline())) +} + +func (ws *WebSocket) OnOpen(c *gws.Conn) { + ws.conn.backoff = time.Second // 重置退避 + // 重訂閱所有 + ws.conn.subsMu.Lock() + for s, set := range ws.conn.subs { + for itv := range set { + payloads, err := ws.conn.Adapter.BuildSubscribe([]string{s}, itv) + if err != nil { + ws.conn.Handler.OnError(err) + continue + } + for _, p := range payloads { + _ = c.WriteMessage(gws.OpcodeText, p) + } + //// 回補(可平行:這裡示範最近 2 根的缺口;實務上你可記錄 last close 再補) + //go func(symbol string, iv blockchain.Interval) { + // end := time.Now().UnixMilli() + // start := end - 2*int64(time.Minute/time.Millisecond) // demo: 補近 2m + // kl, err := ws.conn.Adapter.Backfill(symbol, iv, start, end, 500) + // if err != nil { + // ws.conn.Handler.OnError(err) + // return + // } + // for _, k := range kl { + // ws.conn.Handler.OnKline(k) + // } + //}(s, itv) + } + } + ws.conn.subsMu.Unlock() +} + +func (ws *WebSocket) OnPing(socket *gws.Conn, payload []byte) { + _ = socket.WritePong(payload) + _ = socket.SetReadDeadline(time.Now().Add(ws.conn.Adapter.ReadDeadline())) +} + +func (ws *WebSocket) OnMessage(socket *gws.Conn, message *gws.Message) { + defer message.Close() + _ = socket.SetReadDeadline(time.Now().Add(ws.conn.Adapter.ReadDeadline())) + kLines, err := ws.conn.Adapter.ParseKLines(message.Data.Bytes()) + if err != nil { + ws.conn.Handler.OnError(err) + + return + } + + for _, k := range kLines { + ws.conn.Handler.OnKline(k) + } +} + +// ======================================== + +type PubHandler struct{} + +func (h *PubHandler) OnKline(k blockchain.Kline) { + // 收到之後可以送到 pub/sub 讓其他 + logx.Infof("[PubHandler] Kline:%v", k) +} +func (h *PubHandler) OnError(err error) { logx.Errorf("failed to pub k line %v", err.Error()) }