feature/fanout #3
|
@ -20,3 +20,12 @@ TimelineSetting:
|
||||||
RedisCluster:
|
RedisCluster:
|
||||||
Host: 127.0.0.1:7001
|
Host: 127.0.0.1:7001
|
||||||
Type: cluster
|
Type: cluster
|
||||||
|
|
||||||
|
Neo4J:
|
||||||
|
URI: neo4j://localhost:7687
|
||||||
|
Username: neo4j
|
||||||
|
Password: yyyytttt
|
||||||
|
MaxConnectionPoolSize: 20
|
||||||
|
MaxConnectionLifetime: 200s
|
||||||
|
ConnectionTimeout : 200s
|
||||||
|
LogLevel : debug
|
|
@ -0,0 +1,5 @@
|
||||||
|
// 企業版才能用,社群版只能用預設的
|
||||||
|
CREATE DATABASE relation;
|
||||||
|
|
||||||
|
// 創建 User 節點 UID 是唯一鍵
|
||||||
|
CREATE CONSTRAINT FOR (u:User) REQUIRE u.UID IS UNIQUE
|
|
@ -243,3 +243,15 @@ service TimelineService
|
||||||
// ClearNoMoreDataFlag 清除時間線的 "NoMoreData" 標誌。
|
// ClearNoMoreDataFlag 清除時間線的 "NoMoreData" 標誌。
|
||||||
rpc ClearNoMoreDataFlag(DoNoMoreDataReq) returns (OKResp);
|
rpc ClearNoMoreDataFlag(DoNoMoreDataReq) returns (OKResp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ========== Social Network (關係網路) ==========
|
||||||
|
|
||||||
|
message AddUserToNetworkReq
|
||||||
|
{
|
||||||
|
string uid = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
service SocialNetworkService
|
||||||
|
{
|
||||||
|
rpc AddUserToNetwork(AddUserToNetworkReq) returns (OKResp);
|
||||||
|
}
|
1
go.mod
1
go.mod
|
@ -6,6 +6,7 @@ require (
|
||||||
code.30cm.net/digimon/library-go/errs v1.2.4
|
code.30cm.net/digimon/library-go/errs v1.2.4
|
||||||
code.30cm.net/digimon/library-go/validator v1.0.0
|
code.30cm.net/digimon/library-go/validator v1.0.0
|
||||||
github.com/alicebob/miniredis/v2 v2.33.0
|
github.com/alicebob/miniredis/v2 v2.33.0
|
||||||
|
github.com/neo4j/neo4j-go-driver/v5 v5.24.0
|
||||||
github.com/stretchr/testify v1.9.0
|
github.com/stretchr/testify v1.9.0
|
||||||
github.com/zeromicro/go-zero v1.7.0
|
github.com/zeromicro/go-zero v1.7.0
|
||||||
go.mongodb.org/mongo-driver v1.16.0
|
go.mongodb.org/mongo-driver v1.16.0
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/zeromicro/go-zero/core/stores/redis"
|
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||||
"github.com/zeromicro/go-zero/zrpc"
|
"github.com/zeromicro/go-zero/zrpc"
|
||||||
)
|
)
|
||||||
|
@ -24,4 +26,15 @@ type Config struct {
|
||||||
|
|
||||||
// Redis Cluster
|
// Redis Cluster
|
||||||
RedisCluster redis.RedisConf
|
RedisCluster redis.RedisConf
|
||||||
|
|
||||||
|
// 圖形話資料庫
|
||||||
|
Neo4J struct {
|
||||||
|
URI string
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
|
MaxConnectionPoolSize int
|
||||||
|
MaxConnectionLifetime time.Duration
|
||||||
|
ConnectionTimeout time.Duration
|
||||||
|
LogLevel string
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
package repository
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type SocialNetworkRepository interface {
|
||||||
|
CreateUserNode(ctx context.Context, uid string) error
|
||||||
|
}
|
|
@ -0,0 +1,14 @@
|
||||||
|
package neo4j
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
// Config holds the configuration for Neo4j connection.
|
||||||
|
type Config struct {
|
||||||
|
URI string
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
|
MaxConnectionPoolSize int
|
||||||
|
MaxConnectionLifetime time.Duration
|
||||||
|
ConnectionTimeout time.Duration
|
||||||
|
LogLevel string
|
||||||
|
}
|
|
@ -0,0 +1,54 @@
|
||||||
|
package neo4j
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
|
||||||
|
n4Cfg "github.com/neo4j/neo4j-go-driver/v5/neo4j/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewNeo4J initializes a Neo4jInit using the provided Config and options.
|
||||||
|
// If opts is not provided, it will initialize Neo4jInit with default configuration.
|
||||||
|
func NewNeo4J(conf *Config, opts ...Option) *Client {
|
||||||
|
driverConfig := &n4Cfg.Config{
|
||||||
|
MaxConnectionLifetime: conf.MaxConnectionLifetime,
|
||||||
|
MaxConnectionPoolSize: conf.MaxConnectionPoolSize,
|
||||||
|
ConnectionAcquisitionTimeout: conf.ConnectionTimeout,
|
||||||
|
}
|
||||||
|
|
||||||
|
neo4ji := &Client{
|
||||||
|
neo4jConf: driverConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(neo4ji)
|
||||||
|
}
|
||||||
|
|
||||||
|
return neo4ji
|
||||||
|
}
|
||||||
|
|
||||||
|
// Conn initiates connection to the database and returns a Neo4j driver instance.
|
||||||
|
func (c *Client) Conn() (neo4j.DriverWithContext, error) {
|
||||||
|
auth := neo4j.BasicAuth(c.serviceConf.Username, c.serviceConf.Password, "")
|
||||||
|
driver, err := neo4j.NewDriverWithContext(c.serviceConf.URI, auth, func(c *n4Cfg.Config) {})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("neo4j driver initialization error: %w", err)
|
||||||
|
}
|
||||||
|
defer func(driver neo4j.DriverWithContext, ctx context.Context) {
|
||||||
|
err := driver.Close(ctx)
|
||||||
|
if err != nil {
|
||||||
|
// fmt
|
||||||
|
|
||||||
|
}
|
||||||
|
}(driver, context.Background())
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
// Verify the connection to Neo4j.
|
||||||
|
err = driver.VerifyConnectivity(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("neo4j connectivity verification error: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return driver, nil
|
||||||
|
}
|
|
@ -0,0 +1,60 @@
|
||||||
|
package neo4j
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
n4Cfg "github.com/neo4j/neo4j-go-driver/v5/neo4j/config"
|
||||||
|
"github.com/neo4j/neo4j-go-driver/v5/neo4j/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultMaxConnectionLifetime = 5 * time.Minute
|
||||||
|
defaultMaxConnectionPoolSize = 25
|
||||||
|
defaultConnectionTimeout = 5 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// Option configures Neo4jInit behaviour.
|
||||||
|
type Option func(*Client)
|
||||||
|
|
||||||
|
type Client struct {
|
||||||
|
neo4jConf *n4Cfg.Config
|
||||||
|
serviceConf Config
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithLogLevel sets the log level for the Neo4j driver.
|
||||||
|
func WithLogLevel(level string) Option {
|
||||||
|
return func(neo4ji *Client) {
|
||||||
|
var logger log.Logger
|
||||||
|
|
||||||
|
switch strings.ToLower(level) {
|
||||||
|
case "panic", "fatal", "error":
|
||||||
|
logger = log.ToConsole(log.ERROR)
|
||||||
|
case "warn", "warning":
|
||||||
|
logger = log.ToConsole(log.WARNING)
|
||||||
|
case "info", "debug", "trace":
|
||||||
|
logger = log.ToConsole(log.INFO)
|
||||||
|
default:
|
||||||
|
logger = log.ToConsole(log.ERROR)
|
||||||
|
}
|
||||||
|
|
||||||
|
neo4ji.neo4jConf.Log = logger
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithPerformance configures the Neo4j driver for performance by setting connection pool size and lifetime.
|
||||||
|
func WithPerformance() Option {
|
||||||
|
return func(neo4ji *Client) {
|
||||||
|
if neo4ji.serviceConf.MaxConnectionPoolSize > 0 {
|
||||||
|
neo4ji.neo4jConf.MaxConnectionPoolSize = neo4ji.serviceConf.MaxConnectionPoolSize
|
||||||
|
} else {
|
||||||
|
neo4ji.neo4jConf.MaxConnectionPoolSize = defaultMaxConnectionPoolSize
|
||||||
|
}
|
||||||
|
|
||||||
|
if neo4ji.serviceConf.MaxConnectionLifetime > 0 {
|
||||||
|
neo4ji.neo4jConf.MaxConnectionLifetime = neo4ji.serviceConf.MaxConnectionLifetime
|
||||||
|
} else {
|
||||||
|
neo4ji.neo4jConf.MaxConnectionLifetime = defaultMaxConnectionLifetime
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,36 @@
|
||||||
|
package socialnetworkservicelogic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
|
||||||
|
"app-cloudep-tweeting-service/internal/svc"
|
||||||
|
|
||||||
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AddUserToNetworkLogic struct {
|
||||||
|
ctx context.Context
|
||||||
|
svcCtx *svc.ServiceContext
|
||||||
|
logx.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAddUserToNetworkLogic(ctx context.Context, svcCtx *svc.ServiceContext) *AddUserToNetworkLogic {
|
||||||
|
return &AddUserToNetworkLogic{
|
||||||
|
ctx: ctx,
|
||||||
|
svcCtx: svcCtx,
|
||||||
|
Logger: logx.WithContext(ctx),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *AddUserToNetworkLogic) AddUserToNetwork(in *tweeting.AddUserToNetworkReq) (*tweeting.OKResp, error) {
|
||||||
|
// todo: add your logic here and delete this line
|
||||||
|
err := l.svcCtx.SocialNetworkRepository.CreateUserNode(l.ctx, in.GetUid())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
fmt.Println(err)
|
||||||
|
|
||||||
|
return &tweeting.OKResp{}, nil
|
||||||
|
}
|
|
@ -0,0 +1,53 @@
|
||||||
|
package repository
|
||||||
|
|
||||||
|
import (
|
||||||
|
"app-cloudep-tweeting-service/internal/config"
|
||||||
|
"app-cloudep-tweeting-service/internal/domain/repository"
|
||||||
|
client4J "app-cloudep-tweeting-service/internal/lib/neo4j"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
ers "code.30cm.net/digimon/library-go/errs"
|
||||||
|
n4 "github.com/neo4j/neo4j-go-driver/v5/neo4j"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SocialNetworkParam struct {
|
||||||
|
Config config.Config
|
||||||
|
Neo4jClient *client4J.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
type SocialNetworkRepository struct {
|
||||||
|
cfg config.Config
|
||||||
|
neo4jClient *client4J.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func MustSocialNetworkRepository(param SocialNetworkParam) repository.SocialNetworkRepository {
|
||||||
|
return &SocialNetworkRepository{
|
||||||
|
cfg: param.Config,
|
||||||
|
neo4jClient: param.Neo4jClient,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s SocialNetworkRepository) CreateUserNode(ctx context.Context, uid string) error {
|
||||||
|
// 執行 Cypher
|
||||||
|
conn, err := s.neo4jClient.Conn()
|
||||||
|
if err != nil {
|
||||||
|
return ers.DBError("failed to connect to node4j", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
insert := map[string]any{
|
||||||
|
"uid": uid,
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := conn.NewSession(ctx, n4.SessionConfig{
|
||||||
|
AccessMode: n4.AccessModeWrite,
|
||||||
|
}).Run(ctx, "CREATE (n:Person {name: $name}) RETURN n", insert)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println(result.Keys())
|
||||||
|
|
||||||
|
return nil
|
||||||
|
|
||||||
|
}
|
|
@ -24,7 +24,7 @@ type TimelineRepository struct {
|
||||||
redis redis.Redis
|
redis redis.Redis
|
||||||
}
|
}
|
||||||
|
|
||||||
func MustGenerateUseCase(param TimelineRepositoryParam) repository.TimelineRepository {
|
func MustGenerateRepository(param TimelineRepositoryParam) repository.TimelineRepository {
|
||||||
return &TimelineRepository{
|
return &TimelineRepository{
|
||||||
cfg: param.Config,
|
cfg: param.Config,
|
||||||
redis: param.Redis,
|
redis: param.Redis,
|
||||||
|
|
|
@ -7,7 +7,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
|
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
|
||||||
commentservicelogic "app-cloudep-tweeting-service/internal/logic/commentservice"
|
"app-cloudep-tweeting-service/internal/logic/commentservice"
|
||||||
"app-cloudep-tweeting-service/internal/svc"
|
"app-cloudep-tweeting-service/internal/svc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
|
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
|
||||||
postservicelogic "app-cloudep-tweeting-service/internal/logic/postservice"
|
"app-cloudep-tweeting-service/internal/logic/postservice"
|
||||||
"app-cloudep-tweeting-service/internal/svc"
|
"app-cloudep-tweeting-service/internal/svc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
// Code generated by goctl. DO NOT EDIT.
|
||||||
|
// Source: tweeting.proto
|
||||||
|
|
||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
|
||||||
|
"app-cloudep-tweeting-service/internal/logic/socialnetworkservice"
|
||||||
|
"app-cloudep-tweeting-service/internal/svc"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SocialNetworkServiceServer struct {
|
||||||
|
svcCtx *svc.ServiceContext
|
||||||
|
tweeting.UnimplementedSocialNetworkServiceServer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSocialNetworkServiceServer(svcCtx *svc.ServiceContext) *SocialNetworkServiceServer {
|
||||||
|
return &SocialNetworkServiceServer{
|
||||||
|
svcCtx: svcCtx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SocialNetworkServiceServer) AddUserToNetwork(ctx context.Context, in *tweeting.AddUserToNetworkReq) (*tweeting.OKResp, error) {
|
||||||
|
l := socialnetworkservicelogic.NewAddUserToNetworkLogic(ctx, s.svcCtx)
|
||||||
|
return l.AddUserToNetwork(in)
|
||||||
|
}
|
|
@ -7,7 +7,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
|
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
|
||||||
timelineservicelogic "app-cloudep-tweeting-service/internal/logic/timelineservice"
|
"app-cloudep-tweeting-service/internal/logic/timelineservice"
|
||||||
"app-cloudep-tweeting-service/internal/svc"
|
"app-cloudep-tweeting-service/internal/svc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ package svc
|
||||||
import (
|
import (
|
||||||
"app-cloudep-tweeting-service/internal/config"
|
"app-cloudep-tweeting-service/internal/config"
|
||||||
domainRepo "app-cloudep-tweeting-service/internal/domain/repository"
|
domainRepo "app-cloudep-tweeting-service/internal/domain/repository"
|
||||||
|
"app-cloudep-tweeting-service/internal/lib/neo4j"
|
||||||
model "app-cloudep-tweeting-service/internal/model/mongo"
|
model "app-cloudep-tweeting-service/internal/model/mongo"
|
||||||
"app-cloudep-tweeting-service/internal/repository"
|
"app-cloudep-tweeting-service/internal/repository"
|
||||||
|
|
||||||
|
@ -14,11 +15,10 @@ import (
|
||||||
type ServiceContext struct {
|
type ServiceContext struct {
|
||||||
Config config.Config
|
Config config.Config
|
||||||
Validate vi.Validate
|
Validate vi.Validate
|
||||||
|
|
||||||
PostModel model.PostModel
|
PostModel model.PostModel
|
||||||
CommentModel model.CommentModel
|
CommentModel model.CommentModel
|
||||||
|
|
||||||
TimelineRepo domainRepo.TimelineRepository
|
TimelineRepo domainRepo.TimelineRepository
|
||||||
|
SocialNetworkRepository domainRepo.SocialNetworkRepository
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServiceContext(c config.Config) *ServiceContext {
|
func NewServiceContext(c config.Config) *ServiceContext {
|
||||||
|
@ -27,14 +27,27 @@ func NewServiceContext(c config.Config) *ServiceContext {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
neoClient := neo4j.NewNeo4J(&neo4j.Config{
|
||||||
|
URI: c.Neo4J.URI,
|
||||||
|
Username: c.Neo4J.Username,
|
||||||
|
Password: c.Neo4J.Password,
|
||||||
|
MaxConnectionPoolSize: c.Neo4J.MaxConnectionPoolSize,
|
||||||
|
MaxConnectionLifetime: c.Neo4J.MaxConnectionLifetime,
|
||||||
|
ConnectionTimeout: c.Neo4J.ConnectionTimeout,
|
||||||
|
}, neo4j.WithPerformance(), neo4j.WithLogLevel(c.Neo4J.LogLevel))
|
||||||
|
|
||||||
return &ServiceContext{
|
return &ServiceContext{
|
||||||
Config: c,
|
Config: c,
|
||||||
Validate: vi.MustValidator(),
|
Validate: vi.MustValidator(),
|
||||||
PostModel: MustPostModel(c),
|
PostModel: MustPostModel(c),
|
||||||
CommentModel: MustCommentModel(c),
|
CommentModel: MustCommentModel(c),
|
||||||
TimelineRepo: repository.MustGenerateUseCase(repository.TimelineRepositoryParam{
|
TimelineRepo: repository.MustGenerateRepository(repository.TimelineRepositoryParam{
|
||||||
Config: c,
|
Config: c,
|
||||||
Redis: *newRedis,
|
Redis: *newRedis,
|
||||||
}),
|
}),
|
||||||
|
SocialNetworkRepository: repository.MustSocialNetworkRepository(repository.SocialNetworkParam{
|
||||||
|
Config: c,
|
||||||
|
Neo4jClient: neoClient,
|
||||||
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue