feature/fanout #3

Merged
daniel.w merged 11 commits from feature/fanout into main 2024-09-03 11:45:06 +00:00
15 changed files with 464 additions and 84 deletions
Showing only changes of commit f135327805 - Show all commits

View File

@ -15,4 +15,8 @@ Mongo:
TimelineSetting: TimelineSetting:
Expire: 86400 Expire: 86400
MaxLength: 1000 MaxLength: 1000
RedisCluster:
Host: 127.0.0.1:7001
Type: cluster

View File

@ -188,14 +188,20 @@ service CommentService
message GetTimelineReq message GetTimelineReq
{ {
string uid = 1; // ID string uid = 1; // ID
int32 pageIndex = 2; // int64 pageIndex = 2; //
int32 pageSize = 3; // int64 pageSize = 3; //
} }
message GetTimelineResp message FetchTimelineResponse
{ {
repeated PostDetailItem posts = 1; // repeated FetchTimelineItem posts = 1; //
Pager page = 2; // Pager page = 2; //
}
message FetchTimelineItem
{
string post_id = 1;
int64 score = 2;
} }
message AddPostToTimelineReq message AddPostToTimelineReq
@ -207,18 +213,33 @@ message AddPostToTimelineReq
// //
message PostTimelineItem message PostTimelineItem
{ {
string post_id = 1; // ID string post_id = 1; // ID
int64 created_at = 7; // -> 使 int64 created_at = 7; // -> 使
} }
message DoNoMoreDataReq
{
string uid = 1;
}
message HasNoMoreDataResp
{
bool status = 1;
}
// TimelineService // TimelineService
service TimelineService service TimelineService
{ {
// AddPostToTimeline // AddPost
// 1000 1000 // 1000 1000
// //
rpc AddPostToTimeline(GetTimelineReq) returns (OKResp); rpc AddPost(AddPostToTimelineReq) returns (OKResp);
// GetTimeline // FetchTimeline
rpc GetTimeline(GetTimelineReq) returns (GetTimelineResp); rpc FetchTimeline(GetTimelineReq) returns (FetchTimelineResponse);
// SetNoMoreDataFlag
rpc SetNoMoreDataFlag(DoNoMoreDataReq) returns (OKResp);
// HasNoMoreData
rpc HasNoMoreData(DoNoMoreDataReq) returns (HasNoMoreDataResp);
// ClearNoMoreDataFlag "NoMoreData"
rpc ClearNoMoreDataFlag(DoNoMoreDataReq) returns (OKResp);
} }

View File

@ -1,6 +1,9 @@
package config package config
import "github.com/zeromicro/go-zero/zrpc" import (
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/zrpc"
)
type Config struct { type Config struct {
zrpc.RpcServerConf zrpc.RpcServerConf
@ -18,4 +21,7 @@ type Config struct {
Expire int64 // Second Expire int64 // Second
MaxLength int64 // 暫存筆數 MaxLength int64 // 暫存筆數
} }
// Redis Cluster
RedisCluster redis.RedisConf
} }

View File

@ -34,7 +34,11 @@ const (
) )
const ( const (
TimeLineErrorCode ErrorCode = iota + 20 AddTimeLineErrorCode ErrorCode = iota + 20
FetchTimeLineErrorCode
ClearNoMoreDataErrorCode
HasNoMoreDataErrorCode
SetNoMoreDataErrorCode
) )
func CommentError(ec ErrorCode, s ...string) *ers.LibError { func CommentError(ec ErrorCode, s ...string) *ers.LibError {

View File

@ -0,0 +1,76 @@
package timelineservicelogic
import (
"app-cloudep-tweeting-service/internal/domain"
"app-cloudep-tweeting-service/internal/domain/repository"
"context"
ers "code.30cm.net/digimon/library-go/errs"
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
"app-cloudep-tweeting-service/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type AddPostLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewAddPostLogic(ctx context.Context, svcCtx *svc.ServiceContext) *AddPostLogic {
return &AddPostLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
type addPostReq struct {
UID string `json:"uid" validate:"required"`
}
// AddPost 加入貼文,只管一股腦全塞,這裡會自動判斷
func (l *AddPostLogic) AddPost(in *tweeting.AddPostToTimelineReq) (*tweeting.OKResp, error) {
// 驗證資料
if err := l.svcCtx.Validate.ValidateAll(&addPostReq{
UID: in.GetUid(),
}); err != nil {
// 錯誤代碼 05-011-00
return nil, ers.InvalidFormat(err.Error())
}
if len(in.GetPosts()) == 0 {
// 沒資料,直接 OK
return &tweeting.OKResp{}, nil
}
post := make([]repository.TimelineItem, 0, len(in.GetPosts()))
for _, item := range in.GetPosts() {
post = append(post, repository.TimelineItem{
PostID: item.PostId,
Score: item.CreatedAt,
})
}
err := l.svcCtx.TimelineRepo.AddPost(l.ctx, repository.AddPostRequest{
UID: in.GetUid(),
PostItems: post,
})
if err != nil {
// 錯誤代碼 05-021-20
e := domain.CommentErrorL(
domain.AddTimeLineErrorCode,
logx.WithContext(l.ctx),
[]logx.LogField{
{Key: "req", Value: in},
{Key: "func", Value: "TimelineRepo.AddPost"},
{Key: "err", Value: err},
},
"failed to insert timeline repo :", in.GetUid()).Wrap(err)
return nil, e
}
return &tweeting.OKResp{}, nil
}

View File

@ -1,31 +0,0 @@
package timelineservicelogic
import (
"context"
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
"app-cloudep-tweeting-service/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type AddPostToTimelineLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewAddPostToTimelineLogic(ctx context.Context, svcCtx *svc.ServiceContext) *AddPostToTimelineLogic {
return &AddPostToTimelineLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
// AddPostToTimeline 加入貼文,只管一股腦全塞,這裡會自動判斷
func (l *AddPostToTimelineLogic) AddPostToTimeline(in *tweeting.GetTimelineReq) (*tweeting.OKResp, error) {
// todo: add your logic here and delete this line
return &tweeting.OKResp{}, nil
}

View File

@ -0,0 +1,59 @@
package timelineservicelogic
import (
"app-cloudep-tweeting-service/internal/domain"
"context"
ers "code.30cm.net/digimon/library-go/errs"
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
"app-cloudep-tweeting-service/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type ClearNoMoreDataFlagLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewClearNoMoreDataFlagLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ClearNoMoreDataFlagLogic {
return &ClearNoMoreDataFlagLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
type clearNoMoreDataFlagReq struct {
UID string `json:"uid" validate:"required"`
}
// ClearNoMoreDataFlag 清除時間線的 "NoMoreData" 標誌。
func (l *ClearNoMoreDataFlagLogic) ClearNoMoreDataFlag(in *tweeting.DoNoMoreDataReq) (*tweeting.OKResp, error) {
// 驗證資料
if err := l.svcCtx.Validate.ValidateAll(&clearNoMoreDataFlagReq{
UID: in.GetUid(),
}); err != nil {
// 錯誤代碼 05-011-00
return nil, ers.InvalidFormat(err.Error())
}
err := l.svcCtx.TimelineRepo.ClearNoMoreDataFlag(l.ctx, in.GetUid())
if err != nil {
// 錯誤代碼 05-021-22
e := domain.CommentErrorL(
domain.ClearNoMoreDataErrorCode,
logx.WithContext(l.ctx),
[]logx.LogField{
{Key: "req", Value: in},
{Key: "func", Value: "TimelineRepo.ClearNoMoreDataFlag"},
{Key: "err", Value: err},
},
"failed to clear no more data flag timeline repo :", in.GetUid()).Wrap(err)
return nil, e
}
return &tweeting.OKResp{}, nil
}

View File

@ -0,0 +1,83 @@
package timelineservicelogic
import (
"app-cloudep-tweeting-service/internal/domain"
"app-cloudep-tweeting-service/internal/domain/repository"
"context"
ers "code.30cm.net/digimon/library-go/errs"
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
"app-cloudep-tweeting-service/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type FetchTimelineLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewFetchTimelineLogic(ctx context.Context, svcCtx *svc.ServiceContext) *FetchTimelineLogic {
return &FetchTimelineLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
type fetchTimelineReq struct {
UID string `json:"uid" validate:"required"`
PageSize int64 `json:"page_size" validate:"required"`
PageIndex int64 `json:"page_index" validate:"required"`
}
// FetchTimeline 取得這個人的動態時報
func (l *FetchTimelineLogic) FetchTimeline(in *tweeting.GetTimelineReq) (*tweeting.FetchTimelineResponse, error) {
// 驗證資料
if err := l.svcCtx.Validate.ValidateAll(&fetchTimelineReq{
UID: in.GetUid(),
PageSize: in.PageSize,
PageIndex: in.PageIndex,
}); err != nil {
// 錯誤代碼 05-011-00
return nil, ers.InvalidFormat(err.Error())
}
resp, err := l.svcCtx.TimelineRepo.FetchTimeline(l.ctx, repository.FetchTimelineRequest{
UID: in.GetUid(),
PageIndex: in.GetPageIndex(),
PageSize: in.GetPageSize(),
})
if err != nil {
// 錯誤代碼 05-021-21
e := domain.CommentErrorL(
domain.FetchTimeLineErrorCode,
logx.WithContext(l.ctx),
[]logx.LogField{
{Key: "req", Value: in},
{Key: "func", Value: "TimelineRepo.FetchTimeline"},
{Key: "err", Value: err},
},
"failed to fetch timeline repo :", in.GetUid()).Wrap(err)
return nil, e
}
result := make([]*tweeting.FetchTimelineItem, 0, resp.Page.Size)
for _, item := range resp.Items {
result = append(result, &tweeting.FetchTimelineItem{
PostId: item.PostID,
Score: item.Score,
})
}
return &tweeting.FetchTimelineResponse{
Posts: result,
Page: &tweeting.Pager{
Total: resp.Page.Total,
Index: resp.Page.Index,
Size: resp.Page.Size,
},
}, nil
}

View File

@ -1,31 +0,0 @@
package timelineservicelogic
import (
"context"
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
"app-cloudep-tweeting-service/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type GetTimelineLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewGetTimelineLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetTimelineLogic {
return &GetTimelineLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
// GetTimeline 取得這個人的動態時報
func (l *GetTimelineLogic) GetTimeline(in *tweeting.GetTimelineReq) (*tweeting.GetTimelineResp, error) {
// todo: add your logic here and delete this line
return &tweeting.GetTimelineResp{}, nil
}

View File

@ -0,0 +1,61 @@
package timelineservicelogic
import (
"app-cloudep-tweeting-service/internal/domain"
"context"
ers "code.30cm.net/digimon/library-go/errs"
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
"app-cloudep-tweeting-service/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type HasNoMoreDataLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewHasNoMoreDataLogic(ctx context.Context, svcCtx *svc.ServiceContext) *HasNoMoreDataLogic {
return &HasNoMoreDataLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
type hasNoMoreDataReq struct {
UID string `json:"uid" validate:"required"`
}
// HasNoMoreData 檢查時間線是否已完整,決定是否需要查詢資料庫。
func (l *HasNoMoreDataLogic) HasNoMoreData(in *tweeting.DoNoMoreDataReq) (*tweeting.HasNoMoreDataResp, error) {
// 驗證資料
if err := l.svcCtx.Validate.ValidateAll(&hasNoMoreDataReq{
UID: in.GetUid(),
}); err != nil {
// 錯誤代碼 05-011-00
return nil, ers.InvalidFormat(err.Error())
}
res, err := l.svcCtx.TimelineRepo.HasNoMoreData(l.ctx, in.GetUid())
if err != nil {
// 錯誤代碼 05-021-23
e := domain.CommentErrorL(
domain.HasNoMoreDataErrorCode,
logx.WithContext(l.ctx),
[]logx.LogField{
{Key: "req", Value: in},
{Key: "func", Value: "TimelineRepo.HasNoMoreData"},
{Key: "err", Value: err},
},
"failed to get no more data flag:", in.GetUid()).Wrap(err)
return nil, e
}
return &tweeting.HasNoMoreDataResp{
Status: res,
}, nil
}

View File

@ -0,0 +1,59 @@
package timelineservicelogic
import (
"app-cloudep-tweeting-service/internal/domain"
"context"
ers "code.30cm.net/digimon/library-go/errs"
"app-cloudep-tweeting-service/gen_result/pb/tweeting"
"app-cloudep-tweeting-service/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type SetNoMoreDataFlagLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewSetNoMoreDataFlagLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SetNoMoreDataFlagLogic {
return &SetNoMoreDataFlagLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
type sasNoMoreDataReq struct {
UID string `json:"uid" validate:"required"`
}
// SetNoMoreDataFlag 標記時間線已完整,避免繼續查詢資料庫。
func (l *SetNoMoreDataFlagLogic) SetNoMoreDataFlag(in *tweeting.DoNoMoreDataReq) (*tweeting.OKResp, error) {
// 驗證資料
if err := l.svcCtx.Validate.ValidateAll(&sasNoMoreDataReq{
UID: in.GetUid(),
}); err != nil {
// 錯誤代碼 05-011-00
return nil, ers.InvalidFormat(err.Error())
}
err := l.svcCtx.TimelineRepo.SetNoMoreDataFlag(l.ctx, in.GetUid())
if err != nil {
// 錯誤代碼 05-021-24
e := domain.CommentErrorL(
domain.SetNoMoreDataErrorCode,
logx.WithContext(l.ctx),
[]logx.LogField{
{Key: "req", Value: in},
{Key: "func", Value: "TimelineRepo.SetNoMoreDataErrorCode"},
{Key: "err", Value: err},
},
"failed to set no more data flag:", in.GetUid()).Wrap(err)
return nil, e
}
return &tweeting.OKResp{}, nil
}

View File

@ -109,6 +109,40 @@ func TestAddPost(t *testing.T) {
}, },
expectErr: true, expectErr: true,
}, },
{
name: "duplicate Key",
action: func(t *testing.T, repo repository.TimelineRepository) error {
ctx := context.Background()
uid := "OOOOODUP"
// 第一次插入
err := repo.AddPost(ctx, repository.AddPostRequest{
UID: uid,
PostItems: []repository.TimelineItem{
{PostID: "post1", Score: 100},
},
})
if err != nil {
return err
}
// 第二次插入,使用相同的 PostID 但不同的 Score
return repo.AddPost(ctx, repository.AddPostRequest{
UID: uid,
PostItems: []repository.TimelineItem{
{PostID: "post1", Score: 200},
},
})
},
expectErr: false,
validate: func(t *testing.T, r1 *miniredis.Miniredis) {
uid := "OOOOODUP"
key := domain.TimelineRedisKey.With(uid).ToString()
score, err := r1.ZScore(key, "post1")
assert.NoError(t, err)
assert.Equal(t, float64(200), score) // 應該是第二次插入的分數
},
},
} }
for _, tt := range tests { for _, tt := range tests {

View File

@ -22,14 +22,32 @@ func NewTimelineServiceServer(svcCtx *svc.ServiceContext) *TimelineServiceServer
} }
} }
// AddPostToTimeline 加入貼文,只管一股腦全塞,這裡會自動判斷 // AddPost 加入貼文,只管一股腦全塞,這裡會自動判斷
func (s *TimelineServiceServer) AddPostToTimeline(ctx context.Context, in *tweeting.GetTimelineReq) (*tweeting.OKResp, error) { func (s *TimelineServiceServer) AddPost(ctx context.Context, in *tweeting.AddPostToTimelineReq) (*tweeting.OKResp, error) {
l := timelineservicelogic.NewAddPostToTimelineLogic(ctx, s.svcCtx) l := timelineservicelogic.NewAddPostLogic(ctx, s.svcCtx)
return l.AddPostToTimeline(in) return l.AddPost(in)
} }
// GetTimeline 取得這個人的動態時報 // FetchTimeline 取得這個人的動態時報
func (s *TimelineServiceServer) GetTimeline(ctx context.Context, in *tweeting.GetTimelineReq) (*tweeting.GetTimelineResp, error) { func (s *TimelineServiceServer) FetchTimeline(ctx context.Context, in *tweeting.GetTimelineReq) (*tweeting.FetchTimelineResponse, error) {
l := timelineservicelogic.NewGetTimelineLogic(ctx, s.svcCtx) l := timelineservicelogic.NewFetchTimelineLogic(ctx, s.svcCtx)
return l.GetTimeline(in) return l.FetchTimeline(in)
}
// SetNoMoreDataFlag 標記時間線已完整,避免繼續查詢資料庫。
func (s *TimelineServiceServer) SetNoMoreDataFlag(ctx context.Context, in *tweeting.DoNoMoreDataReq) (*tweeting.OKResp, error) {
l := timelineservicelogic.NewSetNoMoreDataFlagLogic(ctx, s.svcCtx)
return l.SetNoMoreDataFlag(in)
}
// HasNoMoreData 檢查時間線是否已完整,決定是否需要查詢資料庫。
func (s *TimelineServiceServer) HasNoMoreData(ctx context.Context, in *tweeting.DoNoMoreDataReq) (*tweeting.HasNoMoreDataResp, error) {
l := timelineservicelogic.NewHasNoMoreDataLogic(ctx, s.svcCtx)
return l.HasNoMoreData(in)
}
// ClearNoMoreDataFlag 清除時間線的 "NoMoreData" 標誌。
func (s *TimelineServiceServer) ClearNoMoreDataFlag(ctx context.Context, in *tweeting.DoNoMoreDataReq) (*tweeting.OKResp, error) {
l := timelineservicelogic.NewClearNoMoreDataFlagLogic(ctx, s.svcCtx)
return l.ClearNoMoreDataFlag(in)
} }

View File

@ -2,7 +2,11 @@ package svc
import ( import (
"app-cloudep-tweeting-service/internal/config" "app-cloudep-tweeting-service/internal/config"
domainRepo "app-cloudep-tweeting-service/internal/domain/repository"
model "app-cloudep-tweeting-service/internal/model/mongo" model "app-cloudep-tweeting-service/internal/model/mongo"
"app-cloudep-tweeting-service/internal/repository"
"github.com/zeromicro/go-zero/core/stores/redis"
vi "code.30cm.net/digimon/library-go/validator" vi "code.30cm.net/digimon/library-go/validator"
) )
@ -13,13 +17,24 @@ type ServiceContext struct {
PostModel model.PostModel PostModel model.PostModel
CommentModel model.CommentModel CommentModel model.CommentModel
TimelineRepo domainRepo.TimelineRepository
} }
func NewServiceContext(c config.Config) *ServiceContext { func NewServiceContext(c config.Config) *ServiceContext {
newRedis, err := redis.NewRedis(c.RedisCluster, redis.Cluster())
if err != nil {
panic(err)
}
return &ServiceContext{ return &ServiceContext{
Config: c, Config: c,
Validate: vi.MustValidator(), Validate: vi.MustValidator(),
PostModel: MustPostModel(c), PostModel: MustPostModel(c),
CommentModel: MustCommentModel(c), CommentModel: MustCommentModel(c),
TimelineRepo: repository.MustGenerateUseCase(repository.TimelineRepositoryParam{
Config: c,
Redis: *newRedis,
}),
} }
} }

View File

@ -18,6 +18,8 @@ import (
var configFile = flag.String("f", "etc/tweeting.yaml", "the config file") var configFile = flag.String("f", "etc/tweeting.yaml", "the config file")
// TODO 要把每一個錯誤代碼修改的更詳細,目前都資料庫錯誤
func main() { func main() {
flag.Parse() flag.Parse()