blockchain/internal/repository/data_source_binance.go

218 lines
5.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package repository
import (
"archive/zip"
"blockchain/internal/config"
"blockchain/internal/domain/blockchain"
"blockchain/internal/domain/entity"
"blockchain/internal/domain/repository"
"context"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/adshao/go-binance/v2"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/syncx"
)
type BinanceRepositoryParam struct {
Conf *config.Binance
Redis *redis.Redis
}
type BinanceRepository struct {
Client *binance.Client
rds *redis.Redis
barrier syncx.SingleFlight
}
func MustBinanceRepository(param BinanceRepositoryParam) repository.DataSourceRepository {
apiKey := ""
secret := ""
if param.Conf.TestMode {
binance.UseTestnet = true
}
client := binance.NewClient(apiKey, secret)
return &BinanceRepository{
Client: client,
rds: param.Redis,
barrier: syncx.NewSingleFlight(),
}
}
func (repo *BinanceRepository) GetSymbols(ctx context.Context) ([]*entity.Symbol, error) {
// 優先從 redis hash 拿
cached, err := repo.rds.Hgetall(blockchain.RedisKeySymbolList)
if err == nil && len(cached) > 0 {
symbols := make([]*entity.Symbol, 0, len(cached))
canUseCache := true
for _, v := range cached {
var symbol entity.Symbol
if err := json.Unmarshal([]byte(v), &symbol); err == nil {
symbols = append(symbols, &symbol)
} else {
// 如果任何一個反序列化失敗,代表快取可能已損壞,最好是回源重新拉取
canUseCache = false
break
}
}
if canUseCache {
return symbols, nil
}
}
// 用 SingleFlight 保證只有一個請求真的去 Binance
val, err := repo.barrier.Do(blockchain.RedisKeySymbolList, func() (any, error) {
// 拉 source
srcSymbols, err := repo.getSymbolsFromSource(ctx)
if err != nil {
return nil, err
}
result := make([]*entity.Symbol, 0, len(srcSymbols))
hashData := make(map[string]string, len(srcSymbols))
for _, s := range srcSymbols {
// 只挑目前需要的欄位
symbolEntity := &entity.Symbol{
Symbol: s.Symbol,
Status: s.Status,
BaseAsset: s.BaseAsset,
BaseAssetPrecision: s.BaseAssetPrecision,
QuoteAsset: s.QuoteAsset,
QuoteAssetPrecision: s.QuoteAssetPrecision,
}
result = append(result, symbolEntity)
// 將單一 symbol 序列化,準備寫入 hash
raw, err := json.Marshal(symbolEntity)
if err != nil {
logx.Error("failed to marshal symbol entity")
continue
}
hashData[symbolEntity.Symbol] = string(raw)
}
if len(hashData) > 0 {
// 使用 HMSET 一次寫入多個欄位到 hash
if err := repo.rds.Hmset(blockchain.RedisKeySymbolList, hashData); err == nil {
// 再對整個 key 設置過期時間
_ = repo.rds.Expire(blockchain.RedisKeySymbolList, blockchain.SymbolExpire)
}
}
return result, nil
})
if err != nil {
return nil, err
}
return val.([]*entity.Symbol), nil
}
func (repo *BinanceRepository) getSymbolsFromSource(ctx context.Context) ([]binance.Symbol, error) {
if repo.Client == nil {
return nil, fmt.Errorf("binance client not initialized")
}
// 取得幣安交易所資訊
info, err := repo.Client.NewExchangeInfoService().Do(ctx)
if err != nil {
return nil, err
}
return info.Symbols, nil
}
func (repo *BinanceRepository) FetchHistoryKline(ctx context.Context, symbol string, interval string, startMillis, endMillis int64) ([]*entity.Kline, error) {
const baseURL = "https://data.binance.vision/data/spot/daily/klines"
// 計算時間範圍
var startDate, endDate time.Time
if startMillis == 0 {
// 若沒指定,直接假設 2009-01-01比特幣最早有記錄的位置其他幣不太可能早過這個
startDate = time.Date(2009, 1, 1, 0, 0, 0, 0, time.UTC)
} else {
startDate = time.UnixMilli(startMillis)
}
if endMillis == 0 {
endDate = time.Now()
} else {
endDate = time.UnixMilli(endMillis)
}
symbol = strings.ToUpper(symbol)
var result []*entity.Kline
// 逐天下載
for d := startDate; !d.After(endDate); d = d.AddDate(0, 0, 1) {
select {
case <-ctx.Done():
return result, ctx.Err()
default:
}
dateStr := d.Format("2006-01-02")
zipFile := fmt.Sprintf("%s-%s-%s.zip", symbol, interval, dateStr)
url := fmt.Sprintf("%s/%s/%s/%s", baseURL, symbol, interval, zipFile)
resp, err := http.Get(url)
if err != nil || resp.StatusCode != 200 {
continue // 檔案不存在就跳過
}
tmpPath := filepath.Join(os.TempDir(), zipFile)
out, _ := os.Create(tmpPath)
io.Copy(out, resp.Body)
out.Close()
resp.Body.Close()
// 解壓縮
r, err := zip.OpenReader(tmpPath)
if err != nil {
os.Remove(tmpPath)
continue
}
for _, f := range r.File {
rc, _ := f.Open()
reader := csv.NewReader(rc)
for {
record, err := reader.Read()
if err == io.EOF {
break
}
if err != nil || len(record) < 12 {
continue
}
ot, _ := strconv.ParseInt(record[0], 10, 64)
ct, _ := strconv.ParseInt(record[6], 10, 64)
num, _ := strconv.Atoi(record[8])
result = append(result, &entity.Kline{
OpenTime: ot,
Open: record[1],
High: record[2],
Low: record[3],
Close: record[4],
Volume: record[5],
CloseTime: ct,
QuoteAssetVolume: record[7],
NumberOfTrades: num,
TakerBuyBaseAssetVolume: record[9],
TakerBuyQuoteAssetVolume: record[10],
Ignore: record[11],
})
}
rc.Close()
}
r.Close()
os.Remove(tmpPath)
}
return result, nil
}