diff --git a/Makefile b/Makefile index 6d0eb0b..2a259a4 100644 --- a/Makefile +++ b/Makefile @@ -21,3 +21,8 @@ gen-rpc: # 建立 rpc code $(GO_CTL_NAME) rpc protoc ./generate/rpc/blockchain.proto -m --style=$(GO_ZERO_STYLE) --go_out=./gen_result/pb --go-grpc_out=./gen_result/pb --zrpc_out=. go mod tidy @echo "Generate core-api files successfully" + +fmt: # 格式優化 + $(GOFMT) -w $(GOFILES) + goimports -w ./ + golangci-lint run \ No newline at end of file diff --git a/blockchain.go b/blockchain.go index 76c09b0..8735f01 100644 --- a/blockchain.go +++ b/blockchain.go @@ -2,9 +2,10 @@ package main import ( "flag" - "fmt" - "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" + "github.com/zeromicro/go-zero/core/logx" + + app_cloudep_blockchain "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" "blockchain/internal/config" blockchainserviceServer "blockchain/internal/server/blockchainservice" "blockchain/internal/svc" @@ -34,6 +35,6 @@ func main() { }) defer s.Stop() - fmt.Printf("Starting rpc server at %s...\n", c.ListenOn) + logx.Infof("Starting rpc server at %s...\n", c.ListenOn) s.Start() } diff --git a/client/blockchainservice/blockchain_service.go b/client/blockchainservice/blockchain_service.go index 2bc7491..d269a8b 100644 --- a/client/blockchainservice/blockchain_service.go +++ b/client/blockchainservice/blockchain_service.go @@ -7,7 +7,7 @@ package blockchainservice import ( "context" - "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" + app_cloudep_blockchain "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" "github.com/zeromicro/go-zero/zrpc" "google.golang.org/grpc" diff --git a/etc/blockchain.yaml b/etc/blockchain.yaml index 10be160..79f7a95 100644 --- a/etc/blockchain.yaml +++ b/etc/blockchain.yaml @@ -8,6 +8,7 @@ Binance: Key: "" Secret: "" TestMode: true + WorkerSize: 2048 RedisCluster: Host: 127.0.0.1:6379 diff --git a/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain.pb.go b/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain.pb.go index c0c0c23..2c392ca 100644 --- a/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain.pb.go +++ b/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain.pb.go @@ -7,11 +7,12 @@ package app_cloudep_blockchain import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" unsafe "unsafe" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( diff --git a/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain_grpc.pb.go b/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain_grpc.pb.go index 6944943..d0af9f0 100644 --- a/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain_grpc.pb.go +++ b/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain_grpc.pb.go @@ -8,6 +8,7 @@ package app_cloudep_blockchain import ( context "context" + grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" diff --git a/go.mod b/go.mod index acdd162..2f27521 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.24.4 require ( github.com/alicebob/miniredis/v2 v2.35.0 + github.com/panjf2000/ants/v2 v2.11.3 + github.com/stretchr/testify v1.10.0 github.com/zeromicro/go-zero v1.8.5 google.golang.org/grpc v1.74.2 google.golang.org/protobuf v1.36.6 @@ -13,8 +15,10 @@ require ( github.com/bitly/go-simplejson v0.5.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/jpillora/backoff v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/shopspring/decimal v1.4.0 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect + golang.org/x/sync v0.16.0 // indirect ) require ( diff --git a/internal/config/config.go b/internal/config/config.go index 4bdd9d1..170d38e 100755 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -13,7 +13,8 @@ type Config struct { } type Binance struct { - Key string - Secret string - TestMode bool + Key string + Secret string + TestMode bool + WorkerSize int64 } diff --git a/internal/domain/entity/symbol.go b/internal/domain/entity/symbol.go index efe6f31..ffab55e 100644 --- a/internal/domain/entity/symbol.go +++ b/internal/domain/entity/symbol.go @@ -14,9 +14,8 @@ func (s *Symbol) TableName() string { return "symbol" } - // Symbol 這個是幣安的 -//type Symbol struct { +// type Symbol struct { // Symbol string `json:"symbol"` // 交易對名稱(如 "BTCUSDT") // Status string `json:"status"` // 狀態(如 "TRADING" 表示可交易) // BaseAsset string `json:"baseAsset"` // 主幣種(如 BTCUSDT 的 BTC) diff --git a/internal/domain/repository/data_source.go b/internal/domain/repository/data_source.go index e56f614..6a85ee5 100644 --- a/internal/domain/repository/data_source.go +++ b/internal/domain/repository/data_source.go @@ -12,5 +12,12 @@ type DataSourceRepository interface { type KlineDownloader interface { // FetchHistoryKline 抓歷史K線資料,startMillis=0 表示從最早,endMillis=0 表示到最新 - FetchHistoryKline(ctx context.Context, symbol string, interval string, startMillis, endMillis int64) ([]*entity.Kline, error) + FetchHistoryKline(ctx context.Context, param QueryKline) ([]*entity.Kline, error) +} + +type QueryKline struct { + Symbol string + Interval string + StartUnixNano int64 + EndUnixNano int64 } diff --git a/internal/logic/blockchainservice/list_symbols_logic.go b/internal/logic/blockchainservice/list_symbols_logic.go index ac20e0a..d011c38 100644 --- a/internal/logic/blockchainservice/list_symbols_logic.go +++ b/internal/logic/blockchainservice/list_symbols_logic.go @@ -4,6 +4,8 @@ import ( app_cloudep_blockchain "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" "blockchain/internal/svc" "context" + + "code.30cm.net/digimon/library-go/errs" "github.com/zeromicro/go-zero/core/logx" ) @@ -22,7 +24,7 @@ func NewListSymbolsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ListS } // ListSymbols retrieves all available trading symbols. -func (l *ListSymbolsLogic) ListSymbols(in *app_cloudep_blockchain.ListSymbolsRequest) (*app_cloudep_blockchain.ListSymbolsResponse, error) { +func (l *ListSymbolsLogic) ListSymbols(_ *app_cloudep_blockchain.ListSymbolsRequest) (*app_cloudep_blockchain.ListSymbolsResponse, error) { result, err := l.svcCtx.BinanceDataSource.GetSymbols(l.ctx) if err != nil { return nil, err @@ -30,14 +32,25 @@ func (l *ListSymbolsLogic) ListSymbols(in *app_cloudep_blockchain.ListSymbolsReq rpy := make([]*app_cloudep_blockchain.Symbol, 0, len(result)) for _, item := range result { if item != nil { - rpy = append(rpy, &app_cloudep_blockchain.Symbol{ - Symbol: item.Symbol, - Status: item.Status, - BaseAsset: item.BaseAsset, - BaseAssetPrecision: int32(item.BaseAssetPrecision), - QuoteAsset: item.QuoteAsset, - QuoteAssetPrecision: int32(item.QuoteAssetPrecision), - }) + const maxInt32 = 1<<31 - 1 + + ap := &app_cloudep_blockchain.Symbol{ + Symbol: item.Symbol, + Status: item.Status, + BaseAsset: item.BaseAsset, + QuoteAsset: item.QuoteAsset, + } + + if item.BaseAssetPrecision > maxInt32 || item.BaseAssetPrecision < -maxInt32-1 { + return nil, errs.InvalidFormat("BaseAssetPrecision overflow") + } + ap.BaseAssetPrecision = int32(item.BaseAssetPrecision) + if item.QuoteAssetPrecision > maxInt32 || item.QuoteAssetPrecision < -maxInt32-1 { + return nil, errs.InvalidFormat("QuoteAssetPrecision overflow") + } + ap.QuoteAssetPrecision = int32(item.QuoteAssetPrecision) + + rpy = append(rpy, ap) } } diff --git a/internal/logic/blockchainservice/ping_logic.go b/internal/logic/blockchainservice/ping_logic.go index 128ffbb..f143e22 100644 --- a/internal/logic/blockchainservice/ping_logic.go +++ b/internal/logic/blockchainservice/ping_logic.go @@ -3,7 +3,7 @@ package blockchainservicelogic import ( "context" - "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" + app_cloudep_blockchain "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" "blockchain/internal/svc" "github.com/zeromicro/go-zero/core/logx" @@ -23,8 +23,6 @@ func NewPingLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PingLogic { } } -func (l *PingLogic) Ping(in *app_cloudep_blockchain.NoneReq) (*app_cloudep_blockchain.OKResp, error) { - // todo: add your logic here and delete this line - +func (l *PingLogic) Ping(_ *app_cloudep_blockchain.NoneReq) (*app_cloudep_blockchain.OKResp, error) { return &app_cloudep_blockchain.OKResp{}, nil } diff --git a/internal/repository/data_source_binance.go b/internal/repository/data_source_binance.go index bc3a0cd..71dc0e7 100644 --- a/internal/repository/data_source_binance.go +++ b/internal/repository/data_source_binance.go @@ -17,6 +17,9 @@ import ( "path/filepath" "strings" "sync" + "time" + + "github.com/panjf2000/ants/v2" "github.com/adshao/go-binance/v2" "github.com/jszwec/csvutil" @@ -31,9 +34,11 @@ type BinanceRepositoryParam struct { } type BinanceRepository struct { - Client *binance.Client - rds *redis.Redis - barrier syncx.SingleFlight + Client *binance.Client + rds *redis.Redis + barrier syncx.SingleFlight + workers *ants.Pool + workerSize int64 } func MustBinanceRepository(param BinanceRepositoryParam) repository.DataSourceRepository { @@ -43,11 +48,14 @@ func MustBinanceRepository(param BinanceRepositoryParam) repository.DataSourceRe binance.UseTestnet = true } client := binance.NewClient(apiKey, secret) + workers, _ := ants.NewPool(int(param.Conf.WorkerSize)) return &BinanceRepository{ - Client: client, - rds: param.Redis, - barrier: syncx.NewSingleFlight(), + Client: client, + rds: param.Redis, + barrier: syncx.NewSingleFlight(), + workerSize: param.Conf.WorkerSize, + workers: workers, } } @@ -64,6 +72,7 @@ func (repo *BinanceRepository) GetSymbols(ctx context.Context) ([]*entity.Symbol } else { // 如果任何一個反序列化失敗,代表快取可能已損壞,最好是回源重新拉取 canUseCache = false + break } } @@ -116,9 +125,49 @@ func (repo *BinanceRepository) GetSymbols(ctx context.Context) ([]*entity.Symbol return nil, err } - return val.([]*entity.Symbol), nil + if symbols, ok := val.([]*entity.Symbol); ok { + return symbols, nil + } + + return nil, fmt.Errorf("invalid symbol type: %T", val) } +func (repo *BinanceRepository) FetchHistoryKline(ctx context.Context, param repository.QueryKline) ([]*entity.Kline, error) { + ch := make(chan []*entity.Kline, repo.workerSize) + var wg sync.WaitGroup + + start := time.Unix(0, param.StartUnixNano) + end := time.Unix(0, param.EndUnixNano) + // 產生所有天的任務 + for d := start; !d.After(end); d = d.AddDate(0, 0, 1) { + day := d + wg.Add(1) + _ = repo.workers.Submit(func() { + defer wg.Done() + klines, err := repo.fetchHistoryKline(ctx, param.Symbol, param.Interval, day.Format(time.DateOnly)) + if err == nil && len(klines) > 0 { + ch <- klines // 只要拿到資料就丟進 channel + } + // 沒資料不用丟,避免 nil append + }) + } + + // 等全部任務完成再關閉 channel + go func() { + wg.Wait() + close(ch) + }() + + // 收集所有 K 線 + var allKlines []*entity.Kline + for klines := range ch { + allKlines = append(allKlines, klines...) + } + + return allKlines, nil +} + +// ============= func (repo *BinanceRepository) getSymbolsFromSource(ctx context.Context) ([]binance.Symbol, error) { if repo.Client == nil { return nil, fmt.Errorf("binance client not initialized") @@ -138,25 +187,32 @@ func (repo *BinanceRepository) fetchHistoryKline(ctx context.Context, symbol str symbol = strings.ToUpper(symbol) zipFile := fmt.Sprintf("%s-%s-%s.zip", symbol, interval, date) url := fmt.Sprintf("%s/%s/%s/%s", baseURL, symbol, interval, zipFile) - - // 先 HEAD 確認檔案是否存在,節省流量 - respHead, err := http.Head(url) - if err != nil || respHead.StatusCode != 200 { - return nil, fmt.Errorf("file not found: %s", url) + if err := check(ctx, url); err != nil { + return nil, err } - + // 這個 URL 只可能指向 binance.vision 官方站,已限定字串組合,不可能被用戶控制。 + // #nosec G107 // 下載 zip - resp, err := http.Get(url) - if err != nil || resp.StatusCode != 200 { + // 這個 URL 只可能指向 binance.vision 官方站,已限定字串組合,不可能被用戶控制。 + // #nosec G107 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + client := &http.Client{} + resp, err := client.Do(req) + if err != nil || resp.StatusCode != http.StatusOK { if resp != nil { resp.Body.Close() } + return nil, fmt.Errorf("failed to fetch file %s", url) } + defer resp.Body.Close() + tmpPath := filepath.Join(os.TempDir(), zipFile) out, err := os.Create(tmpPath) if err != nil { - resp.Body.Close() return nil, err } _, _ = io.Copy(out, resp.Body) @@ -167,6 +223,7 @@ func (repo *BinanceRepository) fetchHistoryKline(ctx context.Context, symbol str r, err := zip.OpenReader(tmpPath) if err != nil { os.Remove(tmpPath) + return nil, err } defer r.Close() @@ -218,38 +275,24 @@ func (repo *BinanceRepository) fetchHistoryKline(ctx context.Context, symbol str return result, nil } -func (repo *BinanceRepository) FetchHistoryKline(ctx context.Context, symbol string, interval string, startMillis, endMillis int64) ([]*entity.Kline, error) { - ch := make(chan []*entity.Kline, poolSize) - var wg sync.WaitGroup - - pool, _ := ants.NewPool(poolSize) - defer pool.Release() - - // 產生所有天的任務 - for d := startDate; !d.After(endDate); d = d.AddDate(0, 0, 1) { - day := d - wg.Add(1) - _ = pool.Submit(func() { - defer wg.Done() - klines, err := fetchOneDayKline(ctx, symbol, interval, day) - if err == nil && len(klines) > 0 { - ch <- klines // 只要拿到資料就丟進 channel - } - // 沒資料不用丟,避免 nil append - }) +func check(ctx context.Context, url string) error { + // 先 HEAD 確認檔案是否存在,節省流量 + // 這個 URL 只可能指向 binance.vision 官方站,已限定字串組合,不可能被用戶控制。 + // #nosec G107 + req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil) + if err != nil { + return err } + client := &http.Client{} + respHead, err := client.Do(req) + if err != nil || respHead.StatusCode != http.StatusOK { + if respHead != nil { + respHead.Body.Close() + } - // 等全部任務完成再關閉 channel - go func() { - wg.Wait() - close(ch) - }() - - // 收集所有 K 線 - var allKlines []*Kline - for klines := range ch { - allKlines = append(allKlines, klines...) + return fmt.Errorf("file not found: %s", url) } + defer respHead.Body.Close() - return allKlines, nil + return nil } diff --git a/internal/repository/data_source_binance_test.go b/internal/repository/data_source_binance_test.go index 5334185..a4eb74f 100644 --- a/internal/repository/data_source_binance_test.go +++ b/internal/repository/data_source_binance_test.go @@ -2,12 +2,15 @@ package repository import ( "blockchain/internal/config" + "blockchain/internal/domain/repository" "context" "fmt" "sync" "testing" "time" + "github.com/stretchr/testify/assert" + miniredis "github.com/alicebob/miniredis/v2" "github.com/zeromicro/go-zero/core/stores/redis" ) @@ -96,5 +99,14 @@ func TestSymbol(t *testing.T) { Redis: rdb, }) - repo.FetchHistoryKline(context.Background(), "BTCUSDT", "1m", time.Date(2025, 8, 4, 0, 0, 0, 0, time.UTC).UnixMilli(), 0) + k, err := repo.FetchHistoryKline(context.Background(), repository.QueryKline{ + Symbol: "BTCUSDT", + Interval: "1m", + StartUnixNano: time.Date(2025, 8, 3, 0, 0, 0, 0, time.UTC).UnixNano(), + EndUnixNano: time.Date(2025, 8, 4, 0, 0, 0, 0, time.UTC).UnixNano(), + }) + assert.NoError(t, err) + for _, item := range k { + fmt.Println(*item) + } } diff --git a/internal/server/blockchainservice/blockchain_service_server.go b/internal/server/blockchainservice/blockchain_service_server.go index 210f60a..5ea0248 100644 --- a/internal/server/blockchainservice/blockchain_service_server.go +++ b/internal/server/blockchainservice/blockchain_service_server.go @@ -7,8 +7,8 @@ package server import ( "context" - "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" - "blockchain/internal/logic/blockchainservice" + app_cloudep_blockchain "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" + blockchainservicelogic "blockchain/internal/logic/blockchainservice" "blockchain/internal/svc" ) diff --git a/internal/svc/service_context.go b/internal/svc/service_context.go index e1c6464..bc5d08a 100644 --- a/internal/svc/service_context.go +++ b/internal/svc/service_context.go @@ -17,7 +17,6 @@ type ServiceContext struct { } func NewServiceContext(c config.Config) *ServiceContext { - newRedis, err := redis.NewRedis(c.RedisCluster) if err != nil { panic(err) diff --git a/internal/usecase/binance.go b/internal/usecase/binance.go index 0865451..fbf40ee 100644 --- a/internal/usecase/binance.go +++ b/internal/usecase/binance.go @@ -55,5 +55,4 @@ func (use *BinanceUseCase) GetSymbols(ctx context.Context) ([]*usecase.Symbol, e } return rpy, nil - }