add history url

This commit is contained in:
王性驊 2025-08-05 17:35:07 +08:00
parent ba68a8d4c1
commit 5451ca1992
5 changed files with 132 additions and 73 deletions

1
go.mod
View File

@ -46,6 +46,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/jszwec/csvutil v1.10.0
github.com/klauspost/compress v1.17.11 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect

View File

@ -0,0 +1,4 @@
package blockchain
const BinanceHistoryDataBase = "https://data.binance.vision"
const BinanceHistoryDataKlines = "/data/spot/daily/klines"

View File

@ -1,5 +1,4 @@
package blockchain
const RedisKeySymbolList = "symbol:all"
const SymbolExpire = 3600 // 秒(目前為一小時)

View File

@ -6,6 +6,7 @@ import (
"blockchain/internal/domain/blockchain"
"blockchain/internal/domain/entity"
"blockchain/internal/domain/repository"
"bytes"
"context"
"encoding/csv"
"encoding/json"
@ -14,11 +15,11 @@ import (
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"sync"
"github.com/adshao/go-binance/v2"
"github.com/jszwec/csvutil"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/syncx"
@ -132,86 +133,123 @@ func (repo *BinanceRepository) getSymbolsFromSource(ctx context.Context) ([]bina
return info.Symbols, nil
}
func (repo *BinanceRepository) FetchHistoryKline(ctx context.Context, symbol string, interval string, startMillis, endMillis int64) ([]*entity.Kline, error) {
const baseURL = "https://data.binance.vision/data/spot/daily/klines"
// 計算時間範圍
var startDate, endDate time.Time
if startMillis == 0 {
// 若沒指定,直接假設 2009-01-01比特幣最早有記錄的位置其他幣不太可能早過這個
startDate = time.Date(2009, 1, 1, 0, 0, 0, 0, time.UTC)
} else {
startDate = time.UnixMilli(startMillis)
}
if endMillis == 0 {
endDate = time.Now()
} else {
endDate = time.UnixMilli(endMillis)
}
func (repo *BinanceRepository) fetchHistoryKline(ctx context.Context, symbol string, interval string, date string) ([]*entity.Kline, error) {
baseURL := fmt.Sprintf("%s%s", blockchain.BinanceHistoryDataBase, blockchain.BinanceHistoryDataKlines)
symbol = strings.ToUpper(symbol)
zipFile := fmt.Sprintf("%s-%s-%s.zip", symbol, interval, date)
url := fmt.Sprintf("%s/%s/%s/%s", baseURL, symbol, interval, zipFile)
// 先 HEAD 確認檔案是否存在,節省流量
respHead, err := http.Head(url)
if err != nil || respHead.StatusCode != 200 {
return nil, fmt.Errorf("file not found: %s", url)
}
// 下載 zip
resp, err := http.Get(url)
if err != nil || resp.StatusCode != 200 {
if resp != nil {
resp.Body.Close()
}
return nil, fmt.Errorf("failed to fetch file %s", url)
}
tmpPath := filepath.Join(os.TempDir(), zipFile)
out, err := os.Create(tmpPath)
if err != nil {
resp.Body.Close()
return nil, err
}
_, _ = io.Copy(out, resp.Body)
out.Close()
resp.Body.Close()
// 解壓縮
r, err := zip.OpenReader(tmpPath)
if err != nil {
os.Remove(tmpPath)
return nil, err
}
defer r.Close()
defer os.Remove(tmpPath)
var result []*entity.Kline
header := []string{
"open_time", "open", "high", "low", "close", "volume", "close_time",
"quote_asset_volume", "number_of_trades", "taker_buy_base_asset_volume",
"taker_buy_quote_asset_volume", "ignore",
}
// 逐天下載
for d := startDate; !d.After(endDate); d = d.AddDate(0, 0, 1) {
select {
case <-ctx.Done():
return result, ctx.Err()
default:
}
dateStr := d.Format("2006-01-02")
zipFile := fmt.Sprintf("%s-%s-%s.zip", symbol, interval, dateStr)
url := fmt.Sprintf("%s/%s/%s/%s", baseURL, symbol, interval, zipFile)
resp, err := http.Get(url)
if err != nil || resp.StatusCode != 200 {
continue // 檔案不存在就跳過
}
tmpPath := filepath.Join(os.TempDir(), zipFile)
out, _ := os.Create(tmpPath)
io.Copy(out, resp.Body)
out.Close()
resp.Body.Close()
// 解壓縮
r, err := zip.OpenReader(tmpPath)
for _, f := range r.File {
rc, err := f.Open()
if err != nil {
os.Remove(tmpPath)
continue
}
for _, f := range r.File {
rc, _ := f.Open()
reader := csv.NewReader(rc)
for {
record, err := reader.Read()
if err == io.EOF {
break
}
if err != nil || len(record) < 12 {
continue
}
ot, _ := strconv.ParseInt(record[0], 10, 64)
ct, _ := strconv.ParseInt(record[6], 10, 64)
num, _ := strconv.Atoi(record[8])
result = append(result, &entity.Kline{
OpenTime: ot,
Open: record[1],
High: record[2],
Low: record[3],
Close: record[4],
Volume: record[5],
CloseTime: ct,
QuoteAssetVolume: record[7],
NumberOfTrades: num,
TakerBuyBaseAssetVolume: record[9],
TakerBuyQuoteAssetVolume: record[10],
Ignore: record[11],
})
var buf bytes.Buffer
writer := csv.NewWriter(&buf)
_ = writer.Write(header)
reader := csv.NewReader(rc)
for {
record, err := reader.Read()
if err == io.EOF {
break
}
rc.Close()
if err != nil || len(record) < 12 {
continue
}
_ = writer.Write(record)
}
r.Close()
os.Remove(tmpPath)
writer.Flush()
rc.Close()
// csvutil parse
var klines []*entity.Kline
if err := csvutil.Unmarshal(buf.Bytes(), &klines); err != nil {
continue
}
// 可根據需要加上 symbol/interval
for _, k := range klines {
k.Symbol = symbol
k.Interval = interval
}
result = append(result, klines...)
}
return result, nil
}
func (repo *BinanceRepository) FetchHistoryKline(ctx context.Context, symbol string, interval string, startMillis, endMillis int64) ([]*entity.Kline, error) {
ch := make(chan []*entity.Kline, poolSize)
var wg sync.WaitGroup
pool, _ := ants.NewPool(poolSize)
defer pool.Release()
// 產生所有天的任務
for d := startDate; !d.After(endDate); d = d.AddDate(0, 0, 1) {
day := d
wg.Add(1)
_ = pool.Submit(func() {
defer wg.Done()
klines, err := fetchOneDayKline(ctx, symbol, interval, day)
if err == nil && len(klines) > 0 {
ch <- klines // 只要拿到資料就丟進 channel
}
// 沒資料不用丟,避免 nil append
})
}
// 等全部任務完成再關閉 channel
go func() {
wg.Wait()
close(ch)
}()
// 收集所有 K 線
var allKlines []*Kline
for klines := range ch {
allKlines = append(allKlines, klines...)
}
return allKlines, nil
}

View File

@ -6,6 +6,7 @@ import (
"fmt"
"sync"
"testing"
"time"
miniredis "github.com/alicebob/miniredis/v2"
"github.com/zeromicro/go-zero/core/stores/redis"
@ -81,3 +82,19 @@ func TestGetSymbolsFromSource_TableDriven(t *testing.T) {
})
}
}
func TestSymbol(t *testing.T) {
mr, rdb := setupMiniRedis()
defer mr.Close()
repo := MustBinanceRepository(BinanceRepositoryParam{
Conf: &config.Binance{
Key: "",
Secret: "",
TestMode: true,
},
Redis: rdb,
})
repo.FetchHistoryKline(context.Background(), "BTCUSDT", "1m", time.Date(2025, 8, 4, 0, 0, 0, 0, time.UTC).UnixMilli(), 0)
}