From 02db7cbd7f9c096e17f96d448055fc8e23a01ae4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=A7=E9=A9=8A?= Date: Thu, 7 Aug 2025 07:41:18 +0800 Subject: [PATCH] add cassandra --- .golangci.yaml | 4 +- deployment/docker-compose.yaml | 2 - etc/blockchain.yaml | 5 ++- ....up.cql => 202508051001001.BTCUSDT.up.cql} | 0 generate/database/create_kline_keyspace.cql | 1 + internal/config/config.go | 3 +- internal/domain/blockchain/errors.go | 5 ++- internal/domain/entity/kline.go | 7 +++- internal/domain/repository/data_source.go | 4 +- internal/domain/usecase/data_source.go | 8 ++++ internal/lib/websocket_manager/websocket.go | 1 + .../logic/blockchainservice/ping_logic.go | 12 ++++++ internal/repository/data_source_binance.go | 37 +++++++++++++---- internal/svc/service_context.go | 7 ++-- internal/usecase/binance.go | 40 ++++++++++++++++++- 15 files changed, 113 insertions(+), 23 deletions(-) rename generate/database/{202508051001001.kline.up.cql => 202508051001001.BTCUSDT.up.cql} (100%) create mode 100644 generate/database/create_kline_keyspace.cql create mode 100644 internal/lib/websocket_manager/websocket.go diff --git a/.golangci.yaml b/.golangci.yaml index 0e8d79e..948d7f1 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -99,8 +99,8 @@ issues: - gocognit - contextcheck -# exclude-dirs: -# - internal/logic + exclude-dirs: + - internal/lib/cassandra exclude-files: - .*_test.go diff --git a/deployment/docker-compose.yaml b/deployment/docker-compose.yaml index 094ff8e..7365d63 100644 --- a/deployment/docker-compose.yaml +++ b/deployment/docker-compose.yaml @@ -1,5 +1,3 @@ -version: '3' - services: docker-etcd: hostname: etcd diff --git a/etc/blockchain.yaml b/etc/blockchain.yaml index abde5db..e965115 100644 --- a/etc/blockchain.yaml +++ b/etc/blockchain.yaml @@ -1,5 +1,6 @@ Name: blockchain.rpc ListenOn: 0.0.0.0:8888 +Timeout: 10000 Etcd: Hosts: - 10.0.0.19:2379 @@ -8,7 +9,7 @@ Binance: Key: "" Secret: "" TestMode: true - WorkerSize: 10 + WorkerSize: 20 RedisCluster: Host: 127.0.0.1:6379 @@ -18,7 +19,7 @@ Cassandra: Hosts: - 127.0.0.1 Port: 9042 - Keyspace: sccflex + Keyspace: digimon UseAuth: true Username: cassandra Password: cassandra diff --git a/generate/database/202508051001001.kline.up.cql b/generate/database/202508051001001.BTCUSDT.up.cql similarity index 100% rename from generate/database/202508051001001.kline.up.cql rename to generate/database/202508051001001.BTCUSDT.up.cql diff --git a/generate/database/create_kline_keyspace.cql b/generate/database/create_kline_keyspace.cql new file mode 100644 index 0000000..a5bc78b --- /dev/null +++ b/generate/database/create_kline_keyspace.cql @@ -0,0 +1 @@ +create keyspace kline with replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; diff --git a/internal/config/config.go b/internal/config/config.go index 8d678fe..09b04bb 100755 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,9 +1,10 @@ package config import ( + "time" + "github.com/zeromicro/go-zero/core/stores/redis" "github.com/zeromicro/go-zero/zrpc" - "time" ) type Config struct { diff --git a/internal/domain/blockchain/errors.go b/internal/domain/blockchain/errors.go index 75f998b..4e2c601 100644 --- a/internal/domain/blockchain/errors.go +++ b/internal/domain/blockchain/errors.go @@ -4,4 +4,7 @@ import "code.30cm.net/digimon/library-go/errs" const CodeBlockchain uint32 = 10 -const FailedToGetSymbolFormBinanceErrorCode errs.ErrorCode = 1 +const ( + FailedToGetSymbolFormBinanceErrorCode errs.ErrorCode = 1 + FailedToUpsertBinanceErrorCode errs.ErrorCode = 2 +) diff --git a/internal/domain/entity/kline.go b/internal/domain/entity/kline.go index 780d6c0..08f3655 100644 --- a/internal/domain/entity/kline.go +++ b/internal/domain/entity/kline.go @@ -13,9 +13,12 @@ type Kline struct { TakerBuyBaseAssetVolume string `csv:"taker_buy_base_asset_volume" db:"taker_buy_base_asset_volume" cql:"taker_buy_base_asset_volume"` // 主動買入成交量 TakerBuyQuoteAssetVolume string `csv:"taker_buy_quote_asset_volume" db:"taker_buy_quote_asset_volume" cql:"taker_buy_quote_asset_volume"` // 主動買入成交額 Symbol string `db:"symbol" partition_key:"true" cql:"symbol"` // 交易對,partition key - Interval string `db:"interval" partition_key:"true" cql:"interval"` // K 線時間區間,partition key // 12h,15m,1d,1h,1m,1s,2h,30m,3m,4h,5m,6h,8h + // K 線時間區間,partition key // 12h,15m,1d,1h,1m,1s,2h,30m,3m,4h,5m,6h,8h + Interval string `db:"interval" partition_key:"true" cql:"interval"` } func (s *Kline) TableName() string { - return "symbol" + return "kline" } + +// todo 未來在分表,每一個交易幣兌一個表 diff --git a/internal/domain/repository/data_source.go b/internal/domain/repository/data_source.go index 55a990a..1a98be4 100644 --- a/internal/domain/repository/data_source.go +++ b/internal/domain/repository/data_source.go @@ -7,10 +7,10 @@ import ( type DataSourceRepository interface { GetSymbols(ctx context.Context) ([]*entity.Symbol, error) - KlineDownloader + Downloader } -type KlineDownloader interface { +type Downloader interface { // FetchHistoryKline 抓歷史 K 線資料 FetchHistoryKline(ctx context.Context, param QueryKline) ([]*entity.Kline, error) SaveHistoryKline(ctx context.Context, data []*entity.Kline) error diff --git a/internal/domain/usecase/data_source.go b/internal/domain/usecase/data_source.go index 4cb027c..50cba57 100644 --- a/internal/domain/usecase/data_source.go +++ b/internal/domain/usecase/data_source.go @@ -6,6 +6,7 @@ import ( type DataSourceUseCase interface { GetSymbols(ctx context.Context) ([]*Symbol, error) + UpsertKline(ctx context.Context, data QueryKline) error } // Symbol 代表交易對資訊 @@ -17,3 +18,10 @@ type Symbol struct { QuoteAsset string `json:"quote_asset"` // 報價幣種(如 BTCUSDT 的 USDT) QuoteAssetPrecision int `json:"quote_asset_precision"` // 報價資產顯示的小數位數 } + +type QueryKline struct { + Symbol string + Interval string + StartUnixNano int64 + EndUnixNano int64 +} diff --git a/internal/lib/websocket_manager/websocket.go b/internal/lib/websocket_manager/websocket.go new file mode 100644 index 0000000..12a3d45 --- /dev/null +++ b/internal/lib/websocket_manager/websocket.go @@ -0,0 +1 @@ +package websocket_manager diff --git a/internal/logic/blockchainservice/ping_logic.go b/internal/logic/blockchainservice/ping_logic.go index f143e22..27dbe49 100644 --- a/internal/logic/blockchainservice/ping_logic.go +++ b/internal/logic/blockchainservice/ping_logic.go @@ -1,7 +1,9 @@ package blockchainservicelogic import ( + "blockchain/internal/domain/usecase" "context" + "time" app_cloudep_blockchain "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" "blockchain/internal/svc" @@ -24,5 +26,15 @@ func NewPingLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PingLogic { } func (l *PingLogic) Ping(_ *app_cloudep_blockchain.NoneReq) (*app_cloudep_blockchain.OKResp, error) { + err := l.svcCtx.BinanceDataSource.UpsertKline(l.ctx, usecase.QueryKline{ + Symbol: "BTCUSDT", + Interval: "1m", + StartUnixNano: time.Date(2024, 8, 1, 0, 0, 0, 0, time.UTC).UnixNano(), + EndUnixNano: time.Date(2025, 8, 2, 0, 0, 0, 0, time.UTC).UnixNano(), + }) + if err != nil { + return nil, err + } + return &app_cloudep_blockchain.OKResp{}, nil } diff --git a/internal/repository/data_source_binance.go b/internal/repository/data_source_binance.go index 2326e27..2f31bfe 100644 --- a/internal/repository/data_source_binance.go +++ b/internal/repository/data_source_binance.go @@ -30,9 +30,10 @@ import ( ) type BinanceRepositoryParam struct { - Conf *config.Binance - Redis *redis.Redis - DB *cassandra.CassandraDB + Conf *config.Binance + Redis *redis.Redis + DB *cassandra.CassandraDB + KeySpace string } type BinanceRepository struct { @@ -61,6 +62,7 @@ func MustBinanceRepository(param BinanceRepositoryParam) repository.DataSourceRe barrier: syncx.NewSingleFlight(), workerSize: param.Conf.WorkerSize, workers: workers, + KeySpace: param.KeySpace, } } @@ -173,11 +175,32 @@ func (repo *BinanceRepository) FetchHistoryKline(ctx context.Context, param repo } func (repo *BinanceRepository) SaveHistoryKline(ctx context.Context, data []*entity.Kline) error { + ch := make(chan struct{}, repo.workerSize) + var wg sync.WaitGroup + var errList []error + var mu sync.Mutex + for _, item := range data { - err := repo.db.Insert(ctx, item, repo.KeySpace) - if err != nil { - logx.Errorf("failed to insert data: %v", item) - } + wg.Add(1) + ch <- struct{}{} // block if max concurrency reached + + go func(k *entity.Kline) { + defer wg.Done() + defer func() { <-ch }() + + if err := repo.db.Insert(ctx, k, repo.KeySpace); err != nil { + mu.Lock() + errList = append(errList, err) + mu.Unlock() + logx.Errorf("failed to insert data: %v", err) + } + }(item) + } + + wg.Wait() + + if len(errList) > 0 { + return fmt.Errorf("insert errors: %v", errList) } return nil diff --git a/internal/svc/service_context.go b/internal/svc/service_context.go index 0d8b39a..824981d 100644 --- a/internal/svc/service_context.go +++ b/internal/svc/service_context.go @@ -28,9 +28,10 @@ func NewServiceContext(c config.Config) *ServiceContext { } binanceRepo := repo.MustBinanceRepository(repo.BinanceRepositoryParam{ - Conf: &c.Binance, - Redis: newRedis, - DB: cassandra, + Conf: &c.Binance, + Redis: newRedis, + DB: cassandra, + KeySpace: c.Cassandra.Keyspace, }) return &ServiceContext{ diff --git a/internal/usecase/binance.go b/internal/usecase/binance.go index fbf40ee..1cdea0b 100644 --- a/internal/usecase/binance.go +++ b/internal/usecase/binance.go @@ -28,7 +28,6 @@ func MustBinanceUseCase(param BinanceUseCaseParam) usecase.DataSourceUseCase { func (use *BinanceUseCase) GetSymbols(ctx context.Context) ([]*usecase.Symbol, error) { result, err := use.BinanceRepo.GetSymbols(ctx) if err != nil { - // 錯誤代碼 20-201-04 e := errs.ThirdPartyErrorL( blockchain.CodeBlockchain, blockchain.FailedToGetSymbolFormBinanceErrorCode, @@ -56,3 +55,42 @@ func (use *BinanceUseCase) GetSymbols(ctx context.Context) ([]*usecase.Symbol, e return rpy, nil } + +func (use *BinanceUseCase) UpsertKline(ctx context.Context, data usecase.QueryKline) error { + origianData, err := use.BinanceRepo.FetchHistoryKline(ctx, repository.QueryKline{ + Symbol: data.Symbol, + Interval: data.Interval, + StartUnixNano: data.StartUnixNano, + EndUnixNano: data.EndUnixNano, + }) + if err != nil { + e := errs.ThirdPartyErrorL( + blockchain.CodeBlockchain, + blockchain.FailedToUpsertBinanceErrorCode, + logx.WithContext(ctx), + []logx.LogField{ + {Key: "func", Value: "BinanceRepo.FetchHistoryKline"}, + {Key: "err", Value: err.Error()}, + }, + "failed to get kline history from binance").Wrap(err) + + return e + } + + err = use.BinanceRepo.SaveHistoryKline(ctx, origianData) + if err != nil { + e := errs.DatabaseErrorWithScopeL( + blockchain.CodeBlockchain, + blockchain.FailedToUpsertBinanceErrorCode, + logx.WithContext(ctx), + []logx.LogField{ + {Key: "func", Value: "BinanceRepo.SaveHistoryKline"}, + {Key: "err", Value: err.Error()}, + }, + "failed save data from binance").Wrap(err) + + return e + } + + return nil +}