diff --git a/client/blockchainservice/blockchain_service.go b/client/blockchainservice/blockchain_service.go index 8ce2ec6..03d832b 100644 --- a/client/blockchainservice/blockchain_service.go +++ b/client/blockchainservice/blockchain_service.go @@ -1,5 +1,5 @@ // Code generated by goctl. DO NOT EDIT. -// goctl 1.8.1 +// goctl 1.8.5 // Source: blockchain.proto package blockchainservice @@ -15,6 +15,7 @@ import ( type ( HistoryReq = app_cloudep_blockchain.HistoryReq + ListCandleDataResp = app_cloudep_blockchain.ListCandleDataResp ListSymbolsRequest = app_cloudep_blockchain.ListSymbolsRequest ListSymbolsResponse = app_cloudep_blockchain.ListSymbolsResponse NoneReq = app_cloudep_blockchain.NoneReq @@ -25,6 +26,7 @@ type ( // ListSymbols retrieves all available trading symbols. ListSymbols(ctx context.Context, in *ListSymbolsRequest, opts ...grpc.CallOption) (*ListSymbolsResponse, error) GetHistoryKlineData(ctx context.Context, in *HistoryReq, opts ...grpc.CallOption) (*OKResp, error) + ListCandleData(ctx context.Context, in *HistoryReq, opts ...grpc.CallOption) (*ListCandleDataResp, error) // Ping is a health-check endpoint. Ping(ctx context.Context, in *NoneReq, opts ...grpc.CallOption) (*OKResp, error) } @@ -51,6 +53,11 @@ func (m *defaultBlockchainService) GetHistoryKlineData(ctx context.Context, in * return client.GetHistoryKlineData(ctx, in, opts...) } +func (m *defaultBlockchainService) ListCandleData(ctx context.Context, in *HistoryReq, opts ...grpc.CallOption) (*ListCandleDataResp, error) { + client := app_cloudep_blockchain.NewBlockchainServiceClient(m.cli.Conn()) + return client.ListCandleData(ctx, in, opts...) +} + // Ping is a health-check endpoint. func (m *defaultBlockchainService) Ping(ctx context.Context, in *NoneReq, opts ...grpc.CallOption) (*OKResp, error) { client := app_cloudep_blockchain.NewBlockchainServiceClient(m.cli.Conn()) diff --git a/etc/blockchain.yaml b/etc/blockchain.yaml index 9d6ba1c..04c764d 100644 --- a/etc/blockchain.yaml +++ b/etc/blockchain.yaml @@ -12,14 +12,14 @@ Binance: WorkerSize: 20 RedisCluster: - Host: 10.0.0.13:6379 + Host: localhost:6379 Type: node Cassandra: Hosts: - - 10.0.0.13 + - localhost Port: 9042 - Keyspace: digimon + Keyspace: kline UseAuth: true Username: cassandra Password: cassandra diff --git a/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain.pb.go b/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain.pb.go index 26c7afc..5260b8a 100644 --- a/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain.pb.go +++ b/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.1 +// protoc-gen-go v1.36.5 // protoc v3.19.4 // source: generate/rpc/blockchain.proto @@ -11,6 +11,7 @@ import ( protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + unsafe "unsafe" ) const ( @@ -330,9 +331,45 @@ func (x *HistoryReq) GetEndTime() string { return "" } +type ListCandleDataResp struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ListCandleDataResp) Reset() { + *x = ListCandleDataResp{} + mi := &file_generate_rpc_blockchain_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ListCandleDataResp) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListCandleDataResp) ProtoMessage() {} + +func (x *ListCandleDataResp) ProtoReflect() protoreflect.Message { + mi := &file_generate_rpc_blockchain_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListCandleDataResp.ProtoReflect.Descriptor instead. +func (*ListCandleDataResp) Descriptor() ([]byte, []int) { + return file_generate_rpc_blockchain_proto_rawDescGZIP(), []int{6} +} + var File_generate_rpc_blockchain_proto protoreflect.FileDescriptor -var file_generate_rpc_blockchain_proto_rawDesc = []byte{ +var file_generate_rpc_blockchain_proto_rawDesc = string([]byte{ 0x0a, 0x1d, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x22, 0x08, 0x0a, 0x06, 0x4f, @@ -364,17 +401,23 @@ var file_generate_rpc_blockchain_proto_rawDesc = []byte{ 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x32, 0xd7, 0x01, 0x0a, 0x11, 0x42, 0x6c, 0x6f, - 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4e, - 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x79, 0x6d, 0x62, 0x6f, 0x6c, 0x73, 0x12, 0x1e, 0x2e, - 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, - 0x79, 0x6d, 0x62, 0x6f, 0x6c, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, - 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, - 0x79, 0x6d, 0x62, 0x6f, 0x6c, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, - 0x0a, 0x13, 0x47, 0x65, 0x74, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x4b, 0x6c, 0x69, 0x6e, - 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, 0x16, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, - 0x69, 0x6e, 0x2e, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, 0x71, 0x1a, 0x12, 0x2e, - 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x2e, 0x4f, 0x4b, 0x52, 0x65, 0x73, + 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x14, 0x0a, 0x12, 0x4c, 0x69, 0x73, 0x74, + 0x43, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x32, 0xa1, + 0x02, 0x0a, 0x11, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x53, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x12, 0x4e, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x79, 0x6d, 0x62, + 0x6f, 0x6c, 0x73, 0x12, 0x1e, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, + 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x79, 0x6d, 0x62, 0x6f, 0x6c, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, + 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x79, 0x6d, 0x62, 0x6f, 0x6c, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x13, 0x47, 0x65, 0x74, 0x48, 0x69, 0x73, 0x74, 0x6f, + 0x72, 0x79, 0x4b, 0x6c, 0x69, 0x6e, 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, 0x16, 0x2e, 0x62, 0x6c, + 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x2e, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, + 0x52, 0x65, 0x71, 0x1a, 0x12, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, + 0x2e, 0x4f, 0x4b, 0x52, 0x65, 0x73, 0x70, 0x12, 0x48, 0x0a, 0x0e, 0x4c, 0x69, 0x73, 0x74, 0x43, + 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, 0x16, 0x2e, 0x62, 0x6c, 0x6f, 0x63, + 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x2e, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, + 0x71, 0x1a, 0x1e, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x2e, 0x4c, + 0x69, 0x73, 0x74, 0x43, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x12, 0x2f, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x13, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x2e, 0x4e, 0x6f, 0x6e, 0x65, 0x52, 0x65, 0x71, 0x1a, 0x12, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x2e, 0x4f, 0x4b, 0x52, 0x65, @@ -382,21 +425,21 @@ var file_generate_rpc_blockchain_proto_rawDesc = []byte{ 0x6e, 0x65, 0x74, 0x2f, 0x64, 0x69, 0x67, 0x69, 0x6d, 0x6f, 0x6e, 0x2f, 0x61, 0x70, 0x70, 0x2d, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x70, 0x2d, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} +}) var ( file_generate_rpc_blockchain_proto_rawDescOnce sync.Once - file_generate_rpc_blockchain_proto_rawDescData = file_generate_rpc_blockchain_proto_rawDesc + file_generate_rpc_blockchain_proto_rawDescData []byte ) func file_generate_rpc_blockchain_proto_rawDescGZIP() []byte { file_generate_rpc_blockchain_proto_rawDescOnce.Do(func() { - file_generate_rpc_blockchain_proto_rawDescData = protoimpl.X.CompressGZIP(file_generate_rpc_blockchain_proto_rawDescData) + file_generate_rpc_blockchain_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_generate_rpc_blockchain_proto_rawDesc), len(file_generate_rpc_blockchain_proto_rawDesc))) }) return file_generate_rpc_blockchain_proto_rawDescData } -var file_generate_rpc_blockchain_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_generate_rpc_blockchain_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_generate_rpc_blockchain_proto_goTypes = []any{ (*OKResp)(nil), // 0: blockchain.OKResp (*NoneReq)(nil), // 1: blockchain.NoneReq @@ -404,17 +447,20 @@ var file_generate_rpc_blockchain_proto_goTypes = []any{ (*ListSymbolsResponse)(nil), // 3: blockchain.ListSymbolsResponse (*Symbol)(nil), // 4: blockchain.Symbol (*HistoryReq)(nil), // 5: blockchain.HistoryReq + (*ListCandleDataResp)(nil), // 6: blockchain.ListCandleDataResp } var file_generate_rpc_blockchain_proto_depIdxs = []int32{ 4, // 0: blockchain.ListSymbolsResponse.symbols:type_name -> blockchain.Symbol 2, // 1: blockchain.BlockchainService.ListSymbols:input_type -> blockchain.ListSymbolsRequest 5, // 2: blockchain.BlockchainService.GetHistoryKlineData:input_type -> blockchain.HistoryReq - 1, // 3: blockchain.BlockchainService.Ping:input_type -> blockchain.NoneReq - 3, // 4: blockchain.BlockchainService.ListSymbols:output_type -> blockchain.ListSymbolsResponse - 0, // 5: blockchain.BlockchainService.GetHistoryKlineData:output_type -> blockchain.OKResp - 0, // 6: blockchain.BlockchainService.Ping:output_type -> blockchain.OKResp - 4, // [4:7] is the sub-list for method output_type - 1, // [1:4] is the sub-list for method input_type + 5, // 3: blockchain.BlockchainService.ListCandleData:input_type -> blockchain.HistoryReq + 1, // 4: blockchain.BlockchainService.Ping:input_type -> blockchain.NoneReq + 3, // 5: blockchain.BlockchainService.ListSymbols:output_type -> blockchain.ListSymbolsResponse + 0, // 6: blockchain.BlockchainService.GetHistoryKlineData:output_type -> blockchain.OKResp + 6, // 7: blockchain.BlockchainService.ListCandleData:output_type -> blockchain.ListCandleDataResp + 0, // 8: blockchain.BlockchainService.Ping:output_type -> blockchain.OKResp + 5, // [5:9] is the sub-list for method output_type + 1, // [1:5] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name 1, // [1:1] is the sub-list for extension extendee 0, // [0:1] is the sub-list for field type_name @@ -429,9 +475,9 @@ func file_generate_rpc_blockchain_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_generate_rpc_blockchain_proto_rawDesc, + RawDescriptor: unsafe.Slice(unsafe.StringData(file_generate_rpc_blockchain_proto_rawDesc), len(file_generate_rpc_blockchain_proto_rawDesc)), NumEnums: 0, - NumMessages: 6, + NumMessages: 7, NumExtensions: 0, NumServices: 1, }, @@ -440,7 +486,6 @@ func file_generate_rpc_blockchain_proto_init() { MessageInfos: file_generate_rpc_blockchain_proto_msgTypes, }.Build() File_generate_rpc_blockchain_proto = out.File - file_generate_rpc_blockchain_proto_rawDesc = nil file_generate_rpc_blockchain_proto_goTypes = nil file_generate_rpc_blockchain_proto_depIdxs = nil } diff --git a/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain_grpc.pb.go b/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain_grpc.pb.go index 5855f47..1a5c5a1 100644 --- a/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain_grpc.pb.go +++ b/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain/blockchain_grpc.pb.go @@ -21,6 +21,7 @@ const _ = grpc.SupportPackageIsVersion9 const ( BlockchainService_ListSymbols_FullMethodName = "/blockchain.BlockchainService/ListSymbols" BlockchainService_GetHistoryKlineData_FullMethodName = "/blockchain.BlockchainService/GetHistoryKlineData" + BlockchainService_ListCandleData_FullMethodName = "/blockchain.BlockchainService/ListCandleData" BlockchainService_Ping_FullMethodName = "/blockchain.BlockchainService/Ping" ) @@ -31,6 +32,7 @@ type BlockchainServiceClient interface { // ListSymbols retrieves all available trading symbols. ListSymbols(ctx context.Context, in *ListSymbolsRequest, opts ...grpc.CallOption) (*ListSymbolsResponse, error) GetHistoryKlineData(ctx context.Context, in *HistoryReq, opts ...grpc.CallOption) (*OKResp, error) + ListCandleData(ctx context.Context, in *HistoryReq, opts ...grpc.CallOption) (*ListCandleDataResp, error) // Ping is a health-check endpoint. Ping(ctx context.Context, in *NoneReq, opts ...grpc.CallOption) (*OKResp, error) } @@ -63,6 +65,16 @@ func (c *blockchainServiceClient) GetHistoryKlineData(ctx context.Context, in *H return out, nil } +func (c *blockchainServiceClient) ListCandleData(ctx context.Context, in *HistoryReq, opts ...grpc.CallOption) (*ListCandleDataResp, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ListCandleDataResp) + err := c.cc.Invoke(ctx, BlockchainService_ListCandleData_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *blockchainServiceClient) Ping(ctx context.Context, in *NoneReq, opts ...grpc.CallOption) (*OKResp, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(OKResp) @@ -80,6 +92,7 @@ type BlockchainServiceServer interface { // ListSymbols retrieves all available trading symbols. ListSymbols(context.Context, *ListSymbolsRequest) (*ListSymbolsResponse, error) GetHistoryKlineData(context.Context, *HistoryReq) (*OKResp, error) + ListCandleData(context.Context, *HistoryReq) (*ListCandleDataResp, error) // Ping is a health-check endpoint. Ping(context.Context, *NoneReq) (*OKResp, error) mustEmbedUnimplementedBlockchainServiceServer() @@ -98,6 +111,9 @@ func (UnimplementedBlockchainServiceServer) ListSymbols(context.Context, *ListSy func (UnimplementedBlockchainServiceServer) GetHistoryKlineData(context.Context, *HistoryReq) (*OKResp, error) { return nil, status.Errorf(codes.Unimplemented, "method GetHistoryKlineData not implemented") } +func (UnimplementedBlockchainServiceServer) ListCandleData(context.Context, *HistoryReq) (*ListCandleDataResp, error) { + return nil, status.Errorf(codes.Unimplemented, "method ListCandleData not implemented") +} func (UnimplementedBlockchainServiceServer) Ping(context.Context, *NoneReq) (*OKResp, error) { return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") } @@ -158,6 +174,24 @@ func _BlockchainService_GetHistoryKlineData_Handler(srv interface{}, ctx context return interceptor(ctx, in, info, handler) } +func _BlockchainService_ListCandleData_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HistoryReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BlockchainServiceServer).ListCandleData(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: BlockchainService_ListCandleData_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BlockchainServiceServer).ListCandleData(ctx, req.(*HistoryReq)) + } + return interceptor(ctx, in, info, handler) +} + func _BlockchainService_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(NoneReq) if err := dec(in); err != nil { @@ -191,6 +225,10 @@ var BlockchainService_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetHistoryKlineData", Handler: _BlockchainService_GetHistoryKlineData_Handler, }, + { + MethodName: "ListCandleData", + Handler: _BlockchainService_ListCandleData_Handler, + }, { MethodName: "Ping", Handler: _BlockchainService_Ping_Handler, diff --git a/generate/rpc/blockchain.proto b/generate/rpc/blockchain.proto index 8de42f1..189334c 100644 --- a/generate/rpc/blockchain.proto +++ b/generate/rpc/blockchain.proto @@ -33,10 +33,15 @@ message HistoryReq { string end_time=4; } +message ListCandleDataResp { + +} + service BlockchainService{ // ListSymbols retrieves all available trading symbols. rpc ListSymbols(ListSymbolsRequest) returns(ListSymbolsResponse); rpc GetHistoryKlineData(HistoryReq)returns(OKResp); + rpc ListCandleData(HistoryReq)returns(ListCandleDataResp); // Ping is a health-check endpoint. rpc Ping(NoneReq) returns(OKResp); } diff --git a/go.mod b/go.mod index 4a948d8..29dffb5 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/lxzan/gws v1.8.9 github.com/panjf2000/ants/v2 v2.11.3 github.com/scylladb/gocqlx/v3 v3.0.1 + github.com/shopspring/decimal v1.4.0 github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go v0.38.0 github.com/zeromicro/go-zero v1.8.5 @@ -55,7 +56,6 @@ require ( github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/scylladb/go-reflectx v1.0.1 // indirect github.com/shirou/gopsutil/v4 v4.25.5 // indirect - github.com/shopspring/decimal v1.4.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect diff --git a/internal/logic/blockchainservice/get_history_kline_data_logic.go b/internal/logic/blockchainservice/get_history_kline_data_logic.go index f158fd0..8996b2c 100644 --- a/internal/logic/blockchainservice/get_history_kline_data_logic.go +++ b/internal/logic/blockchainservice/get_history_kline_data_logic.go @@ -3,6 +3,7 @@ package blockchainservicelogic import ( "blockchain/internal/domain/usecase" "context" + "fmt" "time" "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" @@ -26,17 +27,81 @@ func NewGetHistoryKlineDataLogic(ctx context.Context, svcCtx *svc.ServiceContext } func (l *GetHistoryKlineDataLogic) GetHistoryKlineData(in *app_cloudep_blockchain.HistoryReq) (*app_cloudep_blockchain.OKResp, error) { - go func() { - st, _ := time.Parse(time.RFC3339, in.GetStartTime()) - et, _ := time.Parse(time.RFC3339, in.GetEndTime()) - _ = l.svcCtx.BinanceDataSource.UpsertKline(context.Background(), usecase.QueryKline{ - Symbol: in.GetSymbol(), - Interval: in.GetInterval(), - StartTime: st.UnixNano(), - EndTime: et.UnixNano(), + // 1) 先驗證輸入(避免把錯誤時間丟進背景 goroutine) + st, err := time.Parse(time.RFC3339, in.GetStartTime()) + if err != nil { + return nil, fmt.Errorf("invalid startTime (RFC3339): %w", err) + } + et, err := time.Parse(time.RFC3339, in.GetEndTime()) + if err != nil { + return nil, fmt.Errorf("invalid endTime (RFC3339): %w", err) + } + if !st.Before(et) { + return nil, fmt.Errorf("startTime must be before endTime") + } + + chunks := splitByWeeksUTC(st, et) + if len(chunks) == 0 { + return &app_cloudep_blockchain.OKResp{}, nil + } + + for _, ch := range chunks { + timeBucket := ch + err := l.svcCtx.WorkerPools.Submit(func() { + _ = l.svcCtx.BinanceDataSource.UpsertKline(context.Background(), usecase.QueryKline{ + Symbol: in.GetSymbol(), + Interval: in.GetInterval(), + StartTime: timeBucket.start.UnixNano(), + EndTime: timeBucket.end.UnixNano(), + }) }) - return - }() + if err != nil { + return nil, err + } + } return &app_cloudep_blockchain.OKResp{}, nil } + +// --------- 小工具:按「週一起」切段(UTC) --------- + +type timeRange struct { + start time.Time // 含 + end time.Time // 不含 +} + +// 取得該時間所在「週一 00:00:00」(UTC) +func weekStartMondayUTC(t time.Time) time.Time { + t = t.UTC() + // Go: Sunday=0 ... Saturday=6;我們要週一=起始 + wd := int(t.Weekday()) + if wd == 0 { // Sunday + wd = 7 + } + // 將時間歸零到當天 00:00 + d0 := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC) + // 回推到週一 + return d0.AddDate(0, 0, -(wd - 1)) +} + +// 將 [start,end) 切成多個「週粒度」的區間 +func splitByWeeksUTC(start, end time.Time) []timeRange { + var chunks []timeRange + if !start.Before(end) { + return chunks + } + cur := start.UTC() + end = end.UTC() + + for cur.Before(end) { + // 下一個週一 00:00 + nextWeekStart := weekStartMondayUTC(cur).AddDate(0, 0, 7) + chunkEnd := end + if nextWeekStart.Before(end) { + chunkEnd = nextWeekStart + } + chunks = append(chunks, timeRange{start: cur, end: chunkEnd}) + cur = chunkEnd + } + return chunks +} diff --git a/internal/logic/blockchainservice/list_candle_data_logic.go b/internal/logic/blockchainservice/list_candle_data_logic.go new file mode 100644 index 0000000..960eafa --- /dev/null +++ b/internal/logic/blockchainservice/list_candle_data_logic.go @@ -0,0 +1,53 @@ +package blockchainservicelogic + +import ( + "blockchain/internal/domain/usecase" + "context" + "fmt" + "time" + + "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" + "blockchain/internal/svc" + + "github.com/zeromicro/go-zero/core/logx" +) + +type ListCandleDataLogic struct { + ctx context.Context + svcCtx *svc.ServiceContext + logx.Logger +} + +func NewListCandleDataLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ListCandleDataLogic { + return &ListCandleDataLogic{ + ctx: ctx, + svcCtx: svcCtx, + Logger: logx.WithContext(ctx), + } +} + +func (l *ListCandleDataLogic) ListCandleData(in *app_cloudep_blockchain.HistoryReq) (*app_cloudep_blockchain.ListCandleDataResp, error) { + // 1) 先驗證輸入(避免把錯誤時間丟進背景 goroutine) + st, err := time.Parse(time.RFC3339, in.GetStartTime()) + if err != nil { + return nil, fmt.Errorf("invalid startTime (RFC3339): %w", err) + } + et, err := time.Parse(time.RFC3339, in.GetEndTime()) + if err != nil { + return nil, fmt.Errorf("invalid endTime (RFC3339): %w", err) + } + + kline, err := l.svcCtx.BinanceDataSource.ListKline(l.ctx, usecase.QueryKline{ + Symbol: in.GetSymbol(), + Interval: in.GetInterval(), + StartTime: st.UnixMilli(), + EndTime: et.UnixMilli(), + }) + if err != nil { + return nil, err + } + + fmt.Println(kline) + + return &app_cloudep_blockchain.ListCandleDataResp{}, nil +} diff --git a/internal/logic/blockchainservice/ping_logic.go b/internal/logic/blockchainservice/ping_logic.go index 35abdfa..0a35691 100644 --- a/internal/logic/blockchainservice/ping_logic.go +++ b/internal/logic/blockchainservice/ping_logic.go @@ -2,10 +2,8 @@ package blockchainservicelogic import ( app_cloudep_blockchain "blockchain/gen_result/pb/code.30cm.net/digimon/app-cloudep-blockchain" - "blockchain/internal/domain/usecase" "blockchain/internal/svc" "context" - "fmt" "github.com/zeromicro/go-zero/core/logx" ) @@ -26,15 +24,15 @@ func NewPingLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PingLogic { func (l *PingLogic) Ping(_ *app_cloudep_blockchain.NoneReq) (*app_cloudep_blockchain.OKResp, error) { - kline, err := l.svcCtx.BinanceDataSource.ListKline(l.ctx, usecase.QueryKline{ - Symbol: "ETHUSDT", - Interval: "1d", - StartTime: 1502928000000, - EndTime: 1503964800000, - }) - if err != nil { - return nil, err - } + //kline, err := l.svcCtx.BinanceDataSource.ListKline(l.ctx, usecase.QueryKline{ + // Symbol: "ETHUSDT", + // Interval: "1d", + // StartTime: 1502928000000, + // EndTime: 1503964800000, + //}) + //if err != nil { + // return nil, err + //} return &app_cloudep_blockchain.OKResp{}, nil } diff --git a/internal/repository/data_source_binance.go b/internal/repository/data_source_binance.go index c1724e3..4f90726 100644 --- a/internal/repository/data_source_binance.go +++ b/internal/repository/data_source_binance.go @@ -207,6 +207,7 @@ func (repo *BinanceRepository) GetKline(ctx context.Context, param repository.Qu qb.LtOrEqNamed("open_time", "end"), ).ToCql() + fmt.Println(stmt, param.Symbol, param.Interval, param.StartTime, param.EndTime) var result []entity.Kline err := repo.db.GetSession().Query(stmt, names).BindMap(qb.M{ "symbol": param.Symbol, diff --git a/internal/server/blockchainservice/blockchain_service_server.go b/internal/server/blockchainservice/blockchain_service_server.go index 18536d8..581dbf7 100644 --- a/internal/server/blockchainservice/blockchain_service_server.go +++ b/internal/server/blockchainservice/blockchain_service_server.go @@ -1,5 +1,5 @@ // Code generated by goctl. DO NOT EDIT. -// goctl 1.8.1 +// goctl 1.8.5 // Source: blockchain.proto package server @@ -34,6 +34,11 @@ func (s *BlockchainServiceServer) GetHistoryKlineData(ctx context.Context, in *a return l.GetHistoryKlineData(in) } +func (s *BlockchainServiceServer) ListCandleData(ctx context.Context, in *app_cloudep_blockchain.HistoryReq) (*app_cloudep_blockchain.ListCandleDataResp, error) { + l := blockchainservicelogic.NewListCandleDataLogic(ctx, s.svcCtx) + return l.ListCandleData(in) +} + // Ping is a health-check endpoint. func (s *BlockchainServiceServer) Ping(ctx context.Context, in *app_cloudep_blockchain.NoneReq) (*app_cloudep_blockchain.OKResp, error) { l := blockchainservicelogic.NewPingLogic(ctx, s.svcCtx) diff --git a/internal/svc/service_context.go b/internal/svc/service_context.go index 21a90f9..6bcafef 100644 --- a/internal/svc/service_context.go +++ b/internal/svc/service_context.go @@ -6,6 +6,7 @@ import ( "blockchain/internal/domain/usecase" repo "blockchain/internal/repository" uc "blockchain/internal/usecase" + "github.com/panjf2000/ants/v2" "github.com/zeromicro/go-zero/core/stores/redis" ) @@ -13,6 +14,8 @@ type ServiceContext struct { Config config.Config BinanceDataSource usecase.DataSourceUseCase BinanceRepo repository.DataSourceRepository + + WorkerPools *ants.Pool } func NewServiceContext(c config.Config) *ServiceContext { @@ -32,6 +35,11 @@ func NewServiceContext(c config.Config) *ServiceContext { DB: cassandra, KeySpace: c.Cassandra.Keyspace, }) + + wp, err := ants.NewPool(1024) + if err != nil { + panic(err) + } // InitBinanceKLineWebsocket() return &ServiceContext{ Config: c, @@ -39,5 +47,6 @@ func NewServiceContext(c config.Config) *ServiceContext { BinanceDataSource: uc.MustBinanceUseCase(uc.BinanceUseCaseParam{ BinanceRepo: binanceRepo, }), + WorkerPools: wp, } }