From 5451ca1992d796d9d298c8b17d5f0d2fdba89efc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=A7=E9=A9=8A?= Date: Tue, 5 Aug 2025 17:35:07 +0800 Subject: [PATCH] add history url --- go.mod | 1 + internal/domain/blockchain/binance.go | 4 + internal/domain/blockchain/const.go | 1 - internal/repository/data_source_binance.go | 182 +++++++++++------- .../repository/data_source_binance_test.go | 17 ++ 5 files changed, 132 insertions(+), 73 deletions(-) create mode 100644 internal/domain/blockchain/binance.go diff --git a/go.mod b/go.mod index 4c5ea26..acdd162 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/jszwec/csvutil v1.10.0 github.com/klauspost/compress v1.17.11 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect diff --git a/internal/domain/blockchain/binance.go b/internal/domain/blockchain/binance.go new file mode 100644 index 0000000..815111b --- /dev/null +++ b/internal/domain/blockchain/binance.go @@ -0,0 +1,4 @@ +package blockchain + +const BinanceHistoryDataBase = "https://data.binance.vision" +const BinanceHistoryDataKlines = "/data/spot/daily/klines" diff --git a/internal/domain/blockchain/const.go b/internal/domain/blockchain/const.go index 362b037..f72119c 100644 --- a/internal/domain/blockchain/const.go +++ b/internal/domain/blockchain/const.go @@ -1,5 +1,4 @@ package blockchain const RedisKeySymbolList = "symbol:all" - const SymbolExpire = 3600 // 秒(目前為一小時) diff --git a/internal/repository/data_source_binance.go b/internal/repository/data_source_binance.go index 25f8664..bc3a0cd 100644 --- a/internal/repository/data_source_binance.go +++ b/internal/repository/data_source_binance.go @@ -6,6 +6,7 @@ import ( "blockchain/internal/domain/blockchain" "blockchain/internal/domain/entity" "blockchain/internal/domain/repository" + "bytes" "context" "encoding/csv" "encoding/json" @@ -14,11 +15,11 @@ import ( "net/http" "os" "path/filepath" - "strconv" "strings" - "time" + "sync" "github.com/adshao/go-binance/v2" + "github.com/jszwec/csvutil" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/stores/redis" "github.com/zeromicro/go-zero/core/syncx" @@ -132,86 +133,123 @@ func (repo *BinanceRepository) getSymbolsFromSource(ctx context.Context) ([]bina return info.Symbols, nil } -func (repo *BinanceRepository) FetchHistoryKline(ctx context.Context, symbol string, interval string, startMillis, endMillis int64) ([]*entity.Kline, error) { - const baseURL = "https://data.binance.vision/data/spot/daily/klines" - // 計算時間範圍 - var startDate, endDate time.Time - if startMillis == 0 { - // 若沒指定,直接假設 2009-01-01(比特幣最早有記錄的位置,其他幣不太可能早過這個) - startDate = time.Date(2009, 1, 1, 0, 0, 0, 0, time.UTC) - } else { - startDate = time.UnixMilli(startMillis) - } - if endMillis == 0 { - endDate = time.Now() - } else { - endDate = time.UnixMilli(endMillis) - } +func (repo *BinanceRepository) fetchHistoryKline(ctx context.Context, symbol string, interval string, date string) ([]*entity.Kline, error) { + baseURL := fmt.Sprintf("%s%s", blockchain.BinanceHistoryDataBase, blockchain.BinanceHistoryDataKlines) symbol = strings.ToUpper(symbol) + zipFile := fmt.Sprintf("%s-%s-%s.zip", symbol, interval, date) + url := fmt.Sprintf("%s/%s/%s/%s", baseURL, symbol, interval, zipFile) + + // 先 HEAD 確認檔案是否存在,節省流量 + respHead, err := http.Head(url) + if err != nil || respHead.StatusCode != 200 { + return nil, fmt.Errorf("file not found: %s", url) + } + + // 下載 zip + resp, err := http.Get(url) + if err != nil || resp.StatusCode != 200 { + if resp != nil { + resp.Body.Close() + } + return nil, fmt.Errorf("failed to fetch file %s", url) + } + tmpPath := filepath.Join(os.TempDir(), zipFile) + out, err := os.Create(tmpPath) + if err != nil { + resp.Body.Close() + return nil, err + } + _, _ = io.Copy(out, resp.Body) + out.Close() + resp.Body.Close() + + // 解壓縮 + r, err := zip.OpenReader(tmpPath) + if err != nil { + os.Remove(tmpPath) + return nil, err + } + defer r.Close() + defer os.Remove(tmpPath) var result []*entity.Kline + header := []string{ + "open_time", "open", "high", "low", "close", "volume", "close_time", + "quote_asset_volume", "number_of_trades", "taker_buy_base_asset_volume", + "taker_buy_quote_asset_volume", "ignore", + } - // 逐天下載 - for d := startDate; !d.After(endDate); d = d.AddDate(0, 0, 1) { - select { - case <-ctx.Done(): - return result, ctx.Err() - default: - } - dateStr := d.Format("2006-01-02") - zipFile := fmt.Sprintf("%s-%s-%s.zip", symbol, interval, dateStr) - url := fmt.Sprintf("%s/%s/%s/%s", baseURL, symbol, interval, zipFile) - - resp, err := http.Get(url) - if err != nil || resp.StatusCode != 200 { - continue // 檔案不存在就跳過 - } - tmpPath := filepath.Join(os.TempDir(), zipFile) - out, _ := os.Create(tmpPath) - io.Copy(out, resp.Body) - out.Close() - resp.Body.Close() - - // 解壓縮 - r, err := zip.OpenReader(tmpPath) + for _, f := range r.File { + rc, err := f.Open() if err != nil { - os.Remove(tmpPath) continue } - for _, f := range r.File { - rc, _ := f.Open() - reader := csv.NewReader(rc) - for { - record, err := reader.Read() - if err == io.EOF { - break - } - if err != nil || len(record) < 12 { - continue - } - ot, _ := strconv.ParseInt(record[0], 10, 64) - ct, _ := strconv.ParseInt(record[6], 10, 64) - num, _ := strconv.Atoi(record[8]) - result = append(result, &entity.Kline{ - OpenTime: ot, - Open: record[1], - High: record[2], - Low: record[3], - Close: record[4], - Volume: record[5], - CloseTime: ct, - QuoteAssetVolume: record[7], - NumberOfTrades: num, - TakerBuyBaseAssetVolume: record[9], - TakerBuyQuoteAssetVolume: record[10], - Ignore: record[11], - }) + + var buf bytes.Buffer + writer := csv.NewWriter(&buf) + _ = writer.Write(header) + reader := csv.NewReader(rc) + for { + record, err := reader.Read() + if err == io.EOF { + break } - rc.Close() + if err != nil || len(record) < 12 { + continue + } + _ = writer.Write(record) } - r.Close() - os.Remove(tmpPath) + writer.Flush() + rc.Close() + + // csvutil parse + var klines []*entity.Kline + if err := csvutil.Unmarshal(buf.Bytes(), &klines); err != nil { + continue + } + // 可根據需要加上 symbol/interval + for _, k := range klines { + k.Symbol = symbol + k.Interval = interval + } + result = append(result, klines...) } return result, nil } + +func (repo *BinanceRepository) FetchHistoryKline(ctx context.Context, symbol string, interval string, startMillis, endMillis int64) ([]*entity.Kline, error) { + ch := make(chan []*entity.Kline, poolSize) + var wg sync.WaitGroup + + pool, _ := ants.NewPool(poolSize) + defer pool.Release() + + // 產生所有天的任務 + for d := startDate; !d.After(endDate); d = d.AddDate(0, 0, 1) { + day := d + wg.Add(1) + _ = pool.Submit(func() { + defer wg.Done() + klines, err := fetchOneDayKline(ctx, symbol, interval, day) + if err == nil && len(klines) > 0 { + ch <- klines // 只要拿到資料就丟進 channel + } + // 沒資料不用丟,避免 nil append + }) + } + + // 等全部任務完成再關閉 channel + go func() { + wg.Wait() + close(ch) + }() + + // 收集所有 K 線 + var allKlines []*Kline + for klines := range ch { + allKlines = append(allKlines, klines...) + } + + return allKlines, nil +} diff --git a/internal/repository/data_source_binance_test.go b/internal/repository/data_source_binance_test.go index cd5c7f4..5334185 100644 --- a/internal/repository/data_source_binance_test.go +++ b/internal/repository/data_source_binance_test.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" "testing" + "time" miniredis "github.com/alicebob/miniredis/v2" "github.com/zeromicro/go-zero/core/stores/redis" @@ -81,3 +82,19 @@ func TestGetSymbolsFromSource_TableDriven(t *testing.T) { }) } } + +func TestSymbol(t *testing.T) { + mr, rdb := setupMiniRedis() + defer mr.Close() + + repo := MustBinanceRepository(BinanceRepositoryParam{ + Conf: &config.Binance{ + Key: "", + Secret: "", + TestMode: true, + }, + Redis: rdb, + }) + + repo.FetchHistoryKline(context.Background(), "BTCUSDT", "1m", time.Date(2025, 8, 4, 0, 0, 0, 0, time.UTC).UnixMilli(), 0) +}