blockchain/internal/repository/data_source_binance.go

125 lines
3.2 KiB
Go

package repository
import (
"blockchain/internal/config"
"blockchain/internal/domain/blockchain"
"blockchain/internal/domain/entity"
"blockchain/internal/domain/repository"
"context"
"encoding/json"
"fmt"
"github.com/adshao/go-binance/v2"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/syncx"
)
// BinanceRepositoryParam bundles the dependencies handed to
// MustBinanceRepository.
type BinanceRepositoryParam struct {
	// Conf is the Binance configuration; MustBinanceRepository reads its
	// TestMode flag.
	Conf *config.Binance

	// Redis is the Redis client the repository keeps for its symbol cache.
	Redis *redis.Redis
}
// BinanceRepository is the Binance-backed data source. MustBinanceRepository
// returns it as a repository.DataSourceRepository.
type BinanceRepository struct {
	// Client is the Binance API client used to query exchange information.
	Client *binance.Client

	// rds is the Redis client that holds the cached symbol hash.
	rds *redis.Redis

	// barrier is the SingleFlight used by GetSymbols so that concurrent
	// cache misses share one call to Binance.
	barrier syncx.SingleFlight
}
// MustBinanceRepository builds a Binance-backed DataSourceRepository from
// param.
//
// The Binance client is created with empty API credentials. When
// param.Conf.TestMode is set, the package-level binance.UseTestnet switch is
// turned on before the client is constructed; this function never turns it
// back off. param.Conf is dereferenced unconditionally, so a nil Conf panics.
func MustBinanceRepository(param BinanceRepositoryParam) repository.DataSourceRepository {
	// Both the API key and the secret are intentionally left blank.
	const noCredential = ""

	if param.Conf.TestMode {
		binance.UseTestnet = true
	}

	repo := new(BinanceRepository)
	repo.Client = binance.NewClient(noCredential, noCredential)
	repo.rds = param.Redis
	repo.barrier = syncx.NewSingleFlight()
	return repo
}
// GetSymbols returns the exchange's symbols, preferring the Redis hash cache
// and falling back to Binance.
//
// Flow:
//  1. Read the whole hash stored at blockchain.RedisKeySymbolList. If it is
//     non-empty and every field decodes, that list is returned.
//  2. Otherwise the symbols are fetched from Binance inside a SingleFlight
//     keyed by the same name, so concurrent cache misses share one upstream
//     call, and the cache is refreshed from the result.
//
// Cache problems (read, decode, write, TTL) are logged and never returned;
// the only errors surfaced to the caller come from the upstream fetch, which
// is returned unwrapped, or from an unexpected SingleFlight result type.
func (repo *BinanceRepository) GetSymbols(ctx context.Context) ([]entity.Symbol, error) {
	// Prefer the Redis hash.
	if symbols, ok := repo.loadCachedSymbols(ctx); ok {
		return symbols, nil
	}

	// SingleFlight guarantees only one request actually reaches Binance.
	// Note that the ctx of whichever caller runs the function is the one used
	// for the upstream call shared by all waiters.
	val, err := repo.barrier.Do(blockchain.RedisKeySymbolList, func() (any, error) {
		srcSymbols, err := repo.getSymbolsFromSource(ctx)
		if err != nil {
			return nil, err
		}

		result := make([]entity.Symbol, 0, len(srcSymbols))
		for _, s := range srcSymbols {
			// Keep only the fields that are currently needed.
			result = append(result, entity.Symbol{
				Symbol:              s.Symbol,
				Status:              s.Status,
				BaseAsset:           s.BaseAsset,
				BaseAssetPrecision:  s.BaseAssetPrecision,
				QuoteAsset:          s.QuoteAsset,
				QuoteAssetPrecision: s.QuoteAssetPrecision,
			})
		}

		repo.storeCachedSymbols(ctx, result)
		return result, nil
	})
	if err != nil {
		return nil, err
	}

	symbols, ok := val.([]entity.Symbol)
	if !ok {
		return nil, fmt.Errorf("unexpected symbol list type %T", val)
	}
	return symbols, nil
}

// loadCachedSymbols reads the symbol hash from Redis and decodes every field.
// It reports ok=false when the read fails, the hash is empty, or any field
// fails to decode (the cache is then treated as corrupt and the caller should
// go back to the source).
func (repo *BinanceRepository) loadCachedSymbols(ctx context.Context) ([]entity.Symbol, bool) {
	cached, err := repo.rds.Hgetall(blockchain.RedisKeySymbolList)
	if err != nil {
		logx.WithContext(ctx).Errorf("reading symbol cache %q: %v", blockchain.RedisKeySymbolList, err)
		return nil, false
	}
	if len(cached) == 0 {
		return nil, false
	}

	symbols := make([]entity.Symbol, 0, len(cached))
	for field, raw := range cached {
		var symbol entity.Symbol
		if err := json.Unmarshal([]byte(raw), &symbol); err != nil {
			logx.WithContext(ctx).Errorf("decoding cached symbol %q: %v", field, err)
			return nil, false
		}
		symbols = append(symbols, symbol)
	}
	return symbols, true
}

// storeCachedSymbols writes symbols into the Redis hash, one field per symbol,
// and then sets the key's expiry. It is best-effort: failures are logged and
// never propagated.
func (repo *BinanceRepository) storeCachedSymbols(ctx context.Context, symbols []entity.Symbol) {
	if len(symbols) == 0 {
		return
	}

	hashData := make(map[string]string, len(symbols))
	for _, symbol := range symbols {
		raw, err := json.Marshal(symbol)
		if err != nil {
			// A partial hash would later be served as if it were the full
			// list, so skip caching altogether rather than drop one symbol.
			logx.WithContext(ctx).Errorf("encoding symbol %q for cache: %v", symbol.Symbol, err)
			return
		}
		hashData[symbol.Symbol] = string(raw)
	}

	// HMSET writes all fields of the hash in one call.
	if err := repo.rds.Hmset(blockchain.RedisKeySymbolList, hashData); err != nil {
		logx.WithContext(ctx).Errorf("writing symbol cache %q: %v", blockchain.RedisKeySymbolList, err)
		return
	}

	// The TTL applies to the whole key. If it cannot be set the hash would
	// never expire, so try to remove it instead of leaving it behind.
	if err := repo.rds.Expire(blockchain.RedisKeySymbolList, blockchain.SymbolExpire); err != nil {
		logx.WithContext(ctx).Errorf("setting expiry on symbol cache %q: %v", blockchain.RedisKeySymbolList, err)
		if _, delErr := repo.rds.Del(blockchain.RedisKeySymbolList); delErr != nil {
			logx.WithContext(ctx).Errorf("removing symbol cache %q without expiry: %v", blockchain.RedisKeySymbolList, delErr)
		}
	}
}
// getSymbolsFromSource calls Binance's exchange-info service and returns the
// Symbols field of the response unchanged.
//
// It returns an error when the repository has no client, and otherwise
// passes through whatever error the exchange-info call reports.
func (repo *BinanceRepository) getSymbolsFromSource(ctx context.Context) ([]binance.Symbol, error) {
	client := repo.Client
	if client == nil {
		return nil, fmt.Errorf("binance client not initialized")
	}

	// Fetch the Binance exchange information.
	svc := client.NewExchangeInfoService()
	exchangeInfo, err := svc.Do(ctx)
	if err != nil {
		return nil, err
	}
	return exchangeInfo.Symbols, nil
}