add query binance history data

This commit is contained in:
王性驊 2025-08-05 22:11:01 +08:00
parent 5451ca1992
commit 56a62912b0
17 changed files with 160 additions and 76 deletions

View File

@ -21,3 +21,8 @@ gen-rpc: # 建立 rpc code
$(GO_CTL_NAME) rpc protoc ./generate/rpc/blockchain.proto -m --style=$(GO_ZERO_STYLE) --go_out=./gen_result/pb --go-grpc_out=./gen_result/pb --zrpc_out=. $(GO_CTL_NAME) rpc protoc ./generate/rpc/blockchain.proto -m --style=$(GO_ZERO_STYLE) --go_out=./gen_result/pb --go-grpc_out=./gen_result/pb --zrpc_out=.
go mod tidy go mod tidy
@echo "Generate core-api files successfully" @echo "Generate core-api files successfully"
fmt: # 格式優化
$(GOFMT) -w $(GOFILES)
goimports -w ./
golangci-lint run

View File

@ -2,9 +2,10 @@ package main
import ( import (
"flag" "flag"
"fmt"
"blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" "github.com/zeromicro/go-zero/core/logx"
app_cloudep_blockchain "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain"
"blockchain/internal/config" "blockchain/internal/config"
blockchainserviceServer "blockchain/internal/server/blockchainservice" blockchainserviceServer "blockchain/internal/server/blockchainservice"
"blockchain/internal/svc" "blockchain/internal/svc"
@ -34,6 +35,6 @@ func main() {
}) })
defer s.Stop() defer s.Stop()
fmt.Printf("Starting rpc server at %s...\n", c.ListenOn) logx.Infof("Starting rpc server at %s...\n", c.ListenOn)
s.Start() s.Start()
} }

View File

@ -7,7 +7,7 @@ package blockchainservice
import ( import (
"context" "context"
"blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" app_cloudep_blockchain "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain"
"github.com/zeromicro/go-zero/zrpc" "github.com/zeromicro/go-zero/zrpc"
"google.golang.org/grpc" "google.golang.org/grpc"

View File

@ -8,6 +8,7 @@ Binance:
Key: "" Key: ""
Secret: "" Secret: ""
TestMode: true TestMode: true
WorkerSize: 2048
RedisCluster: RedisCluster:
Host: 127.0.0.1:6379 Host: 127.0.0.1:6379

View File

@ -7,11 +7,12 @@
package app_cloudep_blockchain package app_cloudep_blockchain
import ( import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect" reflect "reflect"
sync "sync" sync "sync"
unsafe "unsafe" unsafe "unsafe"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
) )
const ( const (

View File

@ -8,6 +8,7 @@ package app_cloudep_blockchain
import ( import (
context "context" context "context"
grpc "google.golang.org/grpc" grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes" codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status" status "google.golang.org/grpc/status"

4
go.mod
View File

@ -4,6 +4,8 @@ go 1.24.4
require ( require (
github.com/alicebob/miniredis/v2 v2.35.0 github.com/alicebob/miniredis/v2 v2.35.0
github.com/panjf2000/ants/v2 v2.11.3
github.com/stretchr/testify v1.10.0
github.com/zeromicro/go-zero v1.8.5 github.com/zeromicro/go-zero v1.8.5
google.golang.org/grpc v1.74.2 google.golang.org/grpc v1.74.2
google.golang.org/protobuf v1.36.6 google.golang.org/protobuf v1.36.6
@ -13,8 +15,10 @@ require (
github.com/bitly/go-simplejson v0.5.0 // indirect github.com/bitly/go-simplejson v0.5.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect github.com/gorilla/websocket v1.5.3 // indirect
github.com/jpillora/backoff v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect github.com/shopspring/decimal v1.4.0 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect
golang.org/x/sync v0.16.0 // indirect
) )
require ( require (

View File

@ -13,7 +13,8 @@ type Config struct {
} }
type Binance struct { type Binance struct {
Key string Key string
Secret string Secret string
TestMode bool TestMode bool
WorkerSize int64
} }

View File

@ -14,9 +14,8 @@ func (s *Symbol) TableName() string {
return "symbol" return "symbol"
} }
// Symbol 這個是幣安的 // Symbol 這個是幣安的
//type Symbol struct { // type Symbol struct {
// Symbol string `json:"symbol"` // 交易對名稱(如 "BTCUSDT" // Symbol string `json:"symbol"` // 交易對名稱(如 "BTCUSDT"
// Status string `json:"status"` // 狀態(如 "TRADING" 表示可交易) // Status string `json:"status"` // 狀態(如 "TRADING" 表示可交易)
// BaseAsset string `json:"baseAsset"` // 主幣種(如 BTCUSDT 的 BTC // BaseAsset string `json:"baseAsset"` // 主幣種(如 BTCUSDT 的 BTC

View File

@ -12,5 +12,12 @@ type DataSourceRepository interface {
type KlineDownloader interface { type KlineDownloader interface {
// FetchHistoryKline 抓歷史K線資料startMillis=0 表示從最早endMillis=0 表示到最新 // FetchHistoryKline 抓歷史K線資料startMillis=0 表示從最早endMillis=0 表示到最新
FetchHistoryKline(ctx context.Context, symbol string, interval string, startMillis, endMillis int64) ([]*entity.Kline, error) FetchHistoryKline(ctx context.Context, param QueryKline) ([]*entity.Kline, error)
}
type QueryKline struct {
Symbol string
Interval string
StartUnixNano int64
EndUnixNano int64
} }

View File

@ -4,6 +4,8 @@ import (
app_cloudep_blockchain "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" app_cloudep_blockchain "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain"
"blockchain/internal/svc" "blockchain/internal/svc"
"context" "context"
"code.30cm.net/digimon/library-go/errs"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
) )
@ -22,7 +24,7 @@ func NewListSymbolsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ListS
} }
// ListSymbols retrieves all available trading symbols. // ListSymbols retrieves all available trading symbols.
func (l *ListSymbolsLogic) ListSymbols(in *app_cloudep_blockchain.ListSymbolsRequest) (*app_cloudep_blockchain.ListSymbolsResponse, error) { func (l *ListSymbolsLogic) ListSymbols(_ *app_cloudep_blockchain.ListSymbolsRequest) (*app_cloudep_blockchain.ListSymbolsResponse, error) {
result, err := l.svcCtx.BinanceDataSource.GetSymbols(l.ctx) result, err := l.svcCtx.BinanceDataSource.GetSymbols(l.ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -30,14 +32,25 @@ func (l *ListSymbolsLogic) ListSymbols(in *app_cloudep_blockchain.ListSymbolsReq
rpy := make([]*app_cloudep_blockchain.Symbol, 0, len(result)) rpy := make([]*app_cloudep_blockchain.Symbol, 0, len(result))
for _, item := range result { for _, item := range result {
if item != nil { if item != nil {
rpy = append(rpy, &app_cloudep_blockchain.Symbol{ const maxInt32 = 1<<31 - 1
Symbol: item.Symbol,
Status: item.Status, ap := &app_cloudep_blockchain.Symbol{
BaseAsset: item.BaseAsset, Symbol: item.Symbol,
BaseAssetPrecision: int32(item.BaseAssetPrecision), Status: item.Status,
QuoteAsset: item.QuoteAsset, BaseAsset: item.BaseAsset,
QuoteAssetPrecision: int32(item.QuoteAssetPrecision), QuoteAsset: item.QuoteAsset,
}) }
if item.BaseAssetPrecision > maxInt32 || item.BaseAssetPrecision < -maxInt32-1 {
return nil, errs.InvalidFormat("BaseAssetPrecision overflow")
}
ap.BaseAssetPrecision = int32(item.BaseAssetPrecision)
if item.QuoteAssetPrecision > maxInt32 || item.QuoteAssetPrecision < -maxInt32-1 {
return nil, errs.InvalidFormat("QuoteAssetPrecision overflow")
}
ap.QuoteAssetPrecision = int32(item.QuoteAssetPrecision)
rpy = append(rpy, ap)
} }
} }

View File

@ -3,7 +3,7 @@ package blockchainservicelogic
import ( import (
"context" "context"
"blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" app_cloudep_blockchain "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain"
"blockchain/internal/svc" "blockchain/internal/svc"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
@ -23,8 +23,6 @@ func NewPingLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PingLogic {
} }
} }
func (l *PingLogic) Ping(in *app_cloudep_blockchain.NoneReq) (*app_cloudep_blockchain.OKResp, error) { func (l *PingLogic) Ping(_ *app_cloudep_blockchain.NoneReq) (*app_cloudep_blockchain.OKResp, error) {
// todo: add your logic here and delete this line
return &app_cloudep_blockchain.OKResp{}, nil return &app_cloudep_blockchain.OKResp{}, nil
} }

View File

@ -17,6 +17,9 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"sync" "sync"
"time"
"github.com/panjf2000/ants/v2"
"github.com/adshao/go-binance/v2" "github.com/adshao/go-binance/v2"
"github.com/jszwec/csvutil" "github.com/jszwec/csvutil"
@ -31,9 +34,11 @@ type BinanceRepositoryParam struct {
} }
type BinanceRepository struct { type BinanceRepository struct {
Client *binance.Client Client *binance.Client
rds *redis.Redis rds *redis.Redis
barrier syncx.SingleFlight barrier syncx.SingleFlight
workers *ants.Pool
workerSize int64
} }
func MustBinanceRepository(param BinanceRepositoryParam) repository.DataSourceRepository { func MustBinanceRepository(param BinanceRepositoryParam) repository.DataSourceRepository {
@ -43,11 +48,14 @@ func MustBinanceRepository(param BinanceRepositoryParam) repository.DataSourceRe
binance.UseTestnet = true binance.UseTestnet = true
} }
client := binance.NewClient(apiKey, secret) client := binance.NewClient(apiKey, secret)
workers, _ := ants.NewPool(int(param.Conf.WorkerSize))
return &BinanceRepository{ return &BinanceRepository{
Client: client, Client: client,
rds: param.Redis, rds: param.Redis,
barrier: syncx.NewSingleFlight(), barrier: syncx.NewSingleFlight(),
workerSize: param.Conf.WorkerSize,
workers: workers,
} }
} }
@ -64,6 +72,7 @@ func (repo *BinanceRepository) GetSymbols(ctx context.Context) ([]*entity.Symbol
} else { } else {
// 如果任何一個反序列化失敗,代表快取可能已損壞,最好是回源重新拉取 // 如果任何一個反序列化失敗,代表快取可能已損壞,最好是回源重新拉取
canUseCache = false canUseCache = false
break break
} }
} }
@ -116,9 +125,49 @@ func (repo *BinanceRepository) GetSymbols(ctx context.Context) ([]*entity.Symbol
return nil, err return nil, err
} }
return val.([]*entity.Symbol), nil if symbols, ok := val.([]*entity.Symbol); ok {
return symbols, nil
}
return nil, fmt.Errorf("invalid symbol type: %T", val)
} }
func (repo *BinanceRepository) FetchHistoryKline(ctx context.Context, param repository.QueryKline) ([]*entity.Kline, error) {
ch := make(chan []*entity.Kline, repo.workerSize)
var wg sync.WaitGroup
start := time.Unix(0, param.StartUnixNano)
end := time.Unix(0, param.EndUnixNano)
// 產生所有天的任務
for d := start; !d.After(end); d = d.AddDate(0, 0, 1) {
day := d
wg.Add(1)
_ = repo.workers.Submit(func() {
defer wg.Done()
klines, err := repo.fetchHistoryKline(ctx, param.Symbol, param.Interval, day.Format(time.DateOnly))
if err == nil && len(klines) > 0 {
ch <- klines // 只要拿到資料就丟進 channel
}
// 沒資料不用丟,避免 nil append
})
}
// 等全部任務完成再關閉 channel
go func() {
wg.Wait()
close(ch)
}()
// 收集所有 K 線
var allKlines []*entity.Kline
for klines := range ch {
allKlines = append(allKlines, klines...)
}
return allKlines, nil
}
// =============
func (repo *BinanceRepository) getSymbolsFromSource(ctx context.Context) ([]binance.Symbol, error) { func (repo *BinanceRepository) getSymbolsFromSource(ctx context.Context) ([]binance.Symbol, error) {
if repo.Client == nil { if repo.Client == nil {
return nil, fmt.Errorf("binance client not initialized") return nil, fmt.Errorf("binance client not initialized")
@ -138,25 +187,32 @@ func (repo *BinanceRepository) fetchHistoryKline(ctx context.Context, symbol str
symbol = strings.ToUpper(symbol) symbol = strings.ToUpper(symbol)
zipFile := fmt.Sprintf("%s-%s-%s.zip", symbol, interval, date) zipFile := fmt.Sprintf("%s-%s-%s.zip", symbol, interval, date)
url := fmt.Sprintf("%s/%s/%s/%s", baseURL, symbol, interval, zipFile) url := fmt.Sprintf("%s/%s/%s/%s", baseURL, symbol, interval, zipFile)
if err := check(ctx, url); err != nil {
// 先 HEAD 確認檔案是否存在,節省流量 return nil, err
respHead, err := http.Head(url)
if err != nil || respHead.StatusCode != 200 {
return nil, fmt.Errorf("file not found: %s", url)
} }
// 這個 URL 只可能指向 binance.vision 官方站,已限定字串組合,不可能被用戶控制。
// #nosec G107
// 下載 zip // 下載 zip
resp, err := http.Get(url) // 這個 URL 只可能指向 binance.vision 官方站,已限定字串組合,不可能被用戶控制。
if err != nil || resp.StatusCode != 200 { // #nosec G107
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil || resp.StatusCode != http.StatusOK {
if resp != nil { if resp != nil {
resp.Body.Close() resp.Body.Close()
} }
return nil, fmt.Errorf("failed to fetch file %s", url) return nil, fmt.Errorf("failed to fetch file %s", url)
} }
defer resp.Body.Close()
tmpPath := filepath.Join(os.TempDir(), zipFile) tmpPath := filepath.Join(os.TempDir(), zipFile)
out, err := os.Create(tmpPath) out, err := os.Create(tmpPath)
if err != nil { if err != nil {
resp.Body.Close()
return nil, err return nil, err
} }
_, _ = io.Copy(out, resp.Body) _, _ = io.Copy(out, resp.Body)
@ -167,6 +223,7 @@ func (repo *BinanceRepository) fetchHistoryKline(ctx context.Context, symbol str
r, err := zip.OpenReader(tmpPath) r, err := zip.OpenReader(tmpPath)
if err != nil { if err != nil {
os.Remove(tmpPath) os.Remove(tmpPath)
return nil, err return nil, err
} }
defer r.Close() defer r.Close()
@ -218,38 +275,24 @@ func (repo *BinanceRepository) fetchHistoryKline(ctx context.Context, symbol str
return result, nil return result, nil
} }
func (repo *BinanceRepository) FetchHistoryKline(ctx context.Context, symbol string, interval string, startMillis, endMillis int64) ([]*entity.Kline, error) { func check(ctx context.Context, url string) error {
ch := make(chan []*entity.Kline, poolSize) // 先 HEAD 確認檔案是否存在,節省流量
var wg sync.WaitGroup // 這個 URL 只可能指向 binance.vision 官方站,已限定字串組合,不可能被用戶控制。
// #nosec G107
pool, _ := ants.NewPool(poolSize) req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
defer pool.Release() if err != nil {
return err
// 產生所有天的任務
for d := startDate; !d.After(endDate); d = d.AddDate(0, 0, 1) {
day := d
wg.Add(1)
_ = pool.Submit(func() {
defer wg.Done()
klines, err := fetchOneDayKline(ctx, symbol, interval, day)
if err == nil && len(klines) > 0 {
ch <- klines // 只要拿到資料就丟進 channel
}
// 沒資料不用丟,避免 nil append
})
} }
client := &http.Client{}
respHead, err := client.Do(req)
if err != nil || respHead.StatusCode != http.StatusOK {
if respHead != nil {
respHead.Body.Close()
}
// 等全部任務完成再關閉 channel return fmt.Errorf("file not found: %s", url)
go func() {
wg.Wait()
close(ch)
}()
// 收集所有 K 線
var allKlines []*Kline
for klines := range ch {
allKlines = append(allKlines, klines...)
} }
defer respHead.Body.Close()
return allKlines, nil return nil
} }

View File

@ -2,12 +2,15 @@ package repository
import ( import (
"blockchain/internal/config" "blockchain/internal/config"
"blockchain/internal/domain/repository"
"context" "context"
"fmt" "fmt"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
miniredis "github.com/alicebob/miniredis/v2" miniredis "github.com/alicebob/miniredis/v2"
"github.com/zeromicro/go-zero/core/stores/redis" "github.com/zeromicro/go-zero/core/stores/redis"
) )
@ -96,5 +99,14 @@ func TestSymbol(t *testing.T) {
Redis: rdb, Redis: rdb,
}) })
repo.FetchHistoryKline(context.Background(), "BTCUSDT", "1m", time.Date(2025, 8, 4, 0, 0, 0, 0, time.UTC).UnixMilli(), 0) k, err := repo.FetchHistoryKline(context.Background(), repository.QueryKline{
Symbol: "BTCUSDT",
Interval: "1m",
StartUnixNano: time.Date(2025, 8, 3, 0, 0, 0, 0, time.UTC).UnixNano(),
EndUnixNano: time.Date(2025, 8, 4, 0, 0, 0, 0, time.UTC).UnixNano(),
})
assert.NoError(t, err)
for _, item := range k {
fmt.Println(*item)
}
} }

View File

@ -7,8 +7,8 @@ package server
import ( import (
"context" "context"
"blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" app_cloudep_blockchain "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain"
"blockchain/internal/logic/blockchainservice" blockchainservicelogic "blockchain/internal/logic/blockchainservice"
"blockchain/internal/svc" "blockchain/internal/svc"
) )

View File

@ -17,7 +17,6 @@ type ServiceContext struct {
} }
func NewServiceContext(c config.Config) *ServiceContext { func NewServiceContext(c config.Config) *ServiceContext {
newRedis, err := redis.NewRedis(c.RedisCluster) newRedis, err := redis.NewRedis(c.RedisCluster)
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -55,5 +55,4 @@ func (use *BinanceUseCase) GetSymbols(ctx context.Context) ([]*usecase.Symbol, e
} }
return rpy, nil return rpy, nil
} }