From f9c90a90f5e2b2f715c315d46246b330609a023e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=A7=E9=A9=8A?= Date: Tue, 20 Aug 2024 15:33:52 +0000 Subject: [PATCH] feature/worker_pools (#5) Co-authored-by: daniel.w Reviewed-on: https://code.30cm.net/digimon/library-go/pulls/5 --- .golangci.yaml | 140 ++++++++++++++++++++++++++ Makefile | 9 ++ errors/code/define.go | 2 +- errors/easy_func.go | 167 +++++++++++++++++++------------- errors/easy_func_test.go | 9 +- errors/errors.go | 49 ++++++---- errors/errors_test.go | 67 ++++++------- go.work | 7 +- validator/validate.go | 1 + worker_pool/go.mod | 15 +++ worker_pool/worker_pool.go | 71 ++++++++++++++ worker_pool/worker_pool_test.go | 89 +++++++++++++++++ 12 files changed, 496 insertions(+), 130 deletions(-) create mode 100644 .golangci.yaml create mode 100644 Makefile create mode 100644 worker_pool/go.mod create mode 100644 worker_pool/worker_pool.go create mode 100644 worker_pool/worker_pool_test.go diff --git a/.golangci.yaml b/.golangci.yaml new file mode 100644 index 0000000..5518484 --- /dev/null +++ b/.golangci.yaml @@ -0,0 +1,140 @@ +run: + timeout: 3m + # Exit code when at least one issue was found. + # Default: 1 + issues-exit-code: 2 + # Include test files or not. + # Default: true + tests: false + +# Reference URL: https://golangci-lint.run/usage/linters/ +linters: + # Disable everything by default so upgrades to not include new - default + # enabled- linters. + disable-all: true + # Specifically enable linters we want to use. + enable: + # - depguard + - errcheck + # - godot + - gofmt + - goimports + - gosimple + - govet + - ineffassign + - misspell + - revive + # - staticcheck + - typecheck + - unused + # - wsl + - asasalint + - asciicheck + - bidichk + - bodyclose + # - containedctx + - contextcheck + # - cyclop + # - varnamelen + # - gci + - wastedassign + - whitespace + # - wrapcheck + - thelper + - tparallel + - unconvert + - unparam + - usestdlibvars + - tenv + - testableexamples + - stylecheck + - sqlclosecheck + - nosprintfhostport + - paralleltest + - prealloc + - predeclared + - promlinter + - reassign + - rowserrcheck + - nakedret + - nestif + - nilerr + - nilnil + - nlreturn + - noctx + - nolintlint + - nonamedreturns + - decorder + - dogsled + # - dupl + - dupword + - durationcheck + - errchkjson + - errname + - errorlint + # - execinquery + - exhaustive + - exportloopref + - forbidigo + - forcetypeassert + # - gochecknoglobals + - gochecknoinits + - gocognit + - goconst + - gocritic + - gocyclo + # - godox + # - goerr113 + # - gofumpt + - goheader + - gomoddirectives + # - gomodguard always failed + - goprintffuncname + - gosec + - grouper + - importas + - interfacebloat + # - ireturn + - lll + - loggercheck + - maintidx + - makezero + +issues: + exclude-rules: + - path: _test\.go + linters: + - funlen + - goconst + - interfacer + - dupl + - lll + - goerr113 + - errcheck + - gocritic + - cyclop + - wrapcheck + - gocognit + - contextcheck + +linters-settings: + gci: + sections: + - standard # Standard section: captures all standard packages. + - default # Default section: contains all imports that could not be matched to another section type. + gocognit: + # Minimal code complexity to report. + # Default: 30 (but we recommend 10-20) + min-complexity: 40 + nestif: + # Minimal complexity of if statements to report. + # Default: 5 + min-complexity: 10 + lll: + # Max line length, lines longer will be reported. + # '\t' is counted as 1 character by default, and can be changed with the tab-width option. + # Default: 120. + line-length: 200 + # Tab width in spaces. + # Default: 1 + tab-width: 1 diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f3676cd --- /dev/null +++ b/Makefile @@ -0,0 +1,9 @@ +.PHONY: test +test: # 進行測試 + go test -v --cover ./... + +.PHONY: fmt +fmt: # 格式優化 + $(GOFMT) -w $(GOFILES) + goimports -w ./ + diff --git a/errors/code/define.go b/errors/code/define.go index bc36826..daa7ad0 100644 --- a/errors/code/define.go +++ b/errors/code/define.go @@ -95,5 +95,5 @@ const ( const ( _ = iota + CatArk ArkInternal - ArkHttp400 + ArkHTTP400 ) diff --git a/errors/easy_func.go b/errors/easy_func.go index d305435..db28447 100644 --- a/errors/easy_func.go +++ b/errors/easy_func.go @@ -1,20 +1,21 @@ -package error +package errors import ( - "code.30cm.net/digimon/library-go/errors/code" "errors" "fmt" "strings" + "code.30cm.net/digimon/library-go/errors/code" + "github.com/zeromicro/go-zero/core/logx" - _ "github.com/zeromicro/go-zero/core/logx" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -func newErr(scope, detail uint32, msg string) *Err { +func newErr(scope, detail uint32, msg string) *LibError { cat := detail / 100 * 100 - return &Err{ + + return &LibError{ category: cat, code: detail, scope: scope, @@ -22,8 +23,8 @@ func newErr(scope, detail uint32, msg string) *Err { } } -func newBuiltinGRPCErr(scope, detail uint32, msg string) *Err { - return &Err{ +func newBuiltinGRPCErr(scope, detail uint32, msg string) *LibError { + return &LibError{ category: code.CatGRPC, code: detail, scope: scope, @@ -34,12 +35,12 @@ func newBuiltinGRPCErr(scope, detail uint32, msg string) *Err { // FromError tries to let error as Err // it supports to unwrap error that has Err // return nil if failed to transfer -func FromError(err error) *Err { +func FromError(err error) *LibError { if err == nil { return nil } - var e *Err + var e *LibError if errors.As(err, &e) { return e } @@ -52,10 +53,11 @@ func FromError(err error) *Err { // 12 represents Scope // 03 represents Category // 14 represents Detail error code -func FromCode(code uint32) *Err { +func FromCode(code uint32) *LibError { scope := code / 10000 detail := code % 10000 - return &Err{ + + return &LibError{ category: detail / 100 * 100, code: detail, scope: scope, @@ -65,7 +67,7 @@ func FromCode(code uint32) *Err { // FromGRPCError transfer error to Err // useful for gRPC client -func FromGRPCError(err error) *Err { +func FromGRPCError(err error) *LibError { s, _ := status.FromError(err) e := FromCode(uint32(s.Code())) e.msg = s.Message() @@ -81,311 +83,335 @@ func FromGRPCError(err error) *Err { // Deprecated: check GRPCStatus() in Errs struct // ToGRPCError returns the status.Status // Useful to return error in gRPC server -func ToGRPCError(e *Err) error { +func ToGRPCError(e *LibError) error { return status.New(codes.Code(e.FullCode()), e.Error()).Err() } /*** System ***/ // SystemTimeoutError returns Err -func SystemTimeoutError(s ...string) *Err { +func SystemTimeoutError(s ...string) *LibError { return newErr(Scope, code.SystemTimeoutError, fmt.Sprintf("system timeout: %s", strings.Join(s, " "))) } // SystemTimeoutErrorL logs error message and returns Err -func SystemTimeoutErrorL(l logx.Logger, s ...string) *Err { +func SystemTimeoutErrorL(l logx.Logger, s ...string) *LibError { e := SystemTimeoutError(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // SystemInternalError returns Err struct -func SystemInternalError(s ...string) *Err { +func SystemInternalError(s ...string) *LibError { return newErr(Scope, code.SystemInternalError, fmt.Sprintf("internal error: %s", strings.Join(s, " "))) } // SystemInternalErrorL logs error message and returns Err -func SystemInternalErrorL(l logx.Logger, s ...string) *Err { +func SystemInternalErrorL(l logx.Logger, s ...string) *LibError { e := SystemInternalError(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // SystemMaintainErrorL logs error message and returns Err -func SystemMaintainErrorL(l logx.Logger, s ...string) *Err { +func SystemMaintainErrorL(l logx.Logger, s ...string) *LibError { e := SystemMaintainError(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // SystemMaintainError returns Err struct -func SystemMaintainError(s ...string) *Err { +func SystemMaintainError(s ...string) *LibError { return newErr(Scope, code.SystemMaintainError, fmt.Sprintf("service under maintenance: %s", strings.Join(s, " "))) } /*** CatInput ***/ // InvalidFormat returns Err struct -func InvalidFormat(s ...string) *Err { +func InvalidFormat(s ...string) *LibError { return newErr(Scope, code.InvalidFormat, fmt.Sprintf("invalid format: %s", strings.Join(s, " "))) } // InvalidFormatL logs error message and returns Err -func InvalidFormatL(l logx.Logger, s ...string) *Err { +func InvalidFormatL(l logx.Logger, s ...string) *LibError { e := InvalidFormat(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // InvalidRange returns Err struct -func InvalidRange(s ...string) *Err { +func InvalidRange(s ...string) *LibError { return newErr(Scope, code.InvalidRange, fmt.Sprintf("invalid range: %s", strings.Join(s, " "))) } // InvalidRangeL logs error message and returns Err -func InvalidRangeL(l logx.Logger, s ...string) *Err { +func InvalidRangeL(l logx.Logger, s ...string) *LibError { e := InvalidRange(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // NotValidImplementation returns Err struct -func NotValidImplementation(s ...string) *Err { +func NotValidImplementation(s ...string) *LibError { return newErr(Scope, code.NotValidImplementation, fmt.Sprintf("not valid implementation: %s", strings.Join(s, " "))) } // NotValidImplementationL logs error message and returns Err -func NotValidImplementationL(l logx.Logger, s ...string) *Err { +func NotValidImplementationL(l logx.Logger, s ...string) *LibError { e := NotValidImplementation(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } /*** CatDB ***/ // DBError returns Err -func DBError(s ...string) *Err { +func DBError(s ...string) *LibError { return newErr(Scope, code.DBError, fmt.Sprintf("db error: %s", strings.Join(s, " "))) } // DBErrorL logs error message and returns Err -func DBErrorL(l logx.Logger, s ...string) *Err { +func DBErrorL(l logx.Logger, s ...string) *LibError { e := DBError(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // DBDataConvert returns Err -func DBDataConvert(s ...string) *Err { +func DBDataConvert(s ...string) *LibError { return newErr(Scope, code.DBDataConvert, fmt.Sprintf("data from db convert error: %s", strings.Join(s, " "))) } // DBDataConvertL logs error message and returns Err -func DBDataConvertL(l logx.Logger, s ...string) *Err { +func DBDataConvertL(l logx.Logger, s ...string) *LibError { e := DBDataConvert(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // DBDuplicate returns Err -func DBDuplicate(s ...string) *Err { +func DBDuplicate(s ...string) *LibError { return newErr(Scope, code.DBDuplicate, fmt.Sprintf("data Duplicate key error: %s", strings.Join(s, " "))) } // DBDuplicateL logs error message and returns Err -func DBDuplicateL(l logx.Logger, s ...string) *Err { +func DBDuplicateL(l logx.Logger, s ...string) *LibError { e := DBDuplicate(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } /*** CatResource ***/ // ResourceNotFound returns Err and logging -func ResourceNotFound(s ...string) *Err { +func ResourceNotFound(s ...string) *LibError { return newErr(Scope, code.ResourceNotFound, fmt.Sprintf("resource not found: %s", strings.Join(s, " "))) } // ResourceNotFoundL logs error message and returns Err -func ResourceNotFoundL(l logx.Logger, s ...string) *Err { +func ResourceNotFoundL(l logx.Logger, s ...string) *LibError { e := ResourceNotFound(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // InvalidResourceFormat returns Err -func InvalidResourceFormat(s ...string) *Err { +func InvalidResourceFormat(s ...string) *LibError { return newErr(Scope, code.InvalidResourceFormat, fmt.Sprintf("invalid resource format: %s", strings.Join(s, " "))) } // InvalidResourceFormatL logs error message and returns Err -func InvalidResourceFormatL(l logx.Logger, s ...string) *Err { +func InvalidResourceFormatL(l logx.Logger, s ...string) *LibError { e := InvalidResourceFormat(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // InvalidResourceState returns status not correct. // for example: company should be destroy, agent should be no-sensor/fail-install ... -func InvalidResourceState(s ...string) *Err { +func InvalidResourceState(s ...string) *LibError { return newErr(Scope, code.InvalidResourceState, fmt.Sprintf("invalid resource state: %s", strings.Join(s, " "))) } // InvalidResourceStateL logs error message and returns status not correct. -func InvalidResourceStateL(l logx.Logger, s ...string) *Err { +func InvalidResourceStateL(l logx.Logger, s ...string) *LibError { e := InvalidResourceState(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } -func ResourceInsufficient(s ...string) *Err { +func ResourceInsufficient(s ...string) *LibError { return newErr(Scope, code.ResourceInsufficient, fmt.Sprintf("insufficient resource: %s", strings.Join(s, " "))) } -func ResourceInsufficientL(l logx.Logger, s ...string) *Err { +func ResourceInsufficientL(l logx.Logger, s ...string) *LibError { e := ResourceInsufficient(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // InsufficientPermission returns Err -func InsufficientPermission(s ...string) *Err { +func InsufficientPermission(s ...string) *LibError { return newErr(Scope, code.InsufficientPermission, fmt.Sprintf("insufficient permission: %s", strings.Join(s, " "))) } // InsufficientPermissionL returns Err and log -func InsufficientPermissionL(l logx.Logger, s ...string) *Err { +func InsufficientPermissionL(l logx.Logger, s ...string) *LibError { e := InsufficientPermission(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // ResourceAlreadyExist returns Err -func ResourceAlreadyExist(s ...string) *Err { +func ResourceAlreadyExist(s ...string) *LibError { return newErr(Scope, code.ResourceAlreadyExist, fmt.Sprintf("resource already exist: %s", strings.Join(s, " "))) } // ResourceAlreadyExistL logs error message and returns Err -func ResourceAlreadyExistL(l logx.Logger, s ...string) *Err { +func ResourceAlreadyExistL(l logx.Logger, s ...string) *LibError { e := ResourceAlreadyExist(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // InvalidMeasurementID returns Err -func InvalidMeasurementID(s ...string) *Err { +func InvalidMeasurementID(s ...string) *LibError { return newErr(Scope, code.InvalidMeasurementID, fmt.Sprintf("missing measurement id: %s", strings.Join(s, " "))) } // InvalidMeasurementIDL logs error message and returns Err -func InvalidMeasurementIDL(l logx.Logger, s ...string) *Err { +func InvalidMeasurementIDL(l logx.Logger, s ...string) *LibError { e := InvalidMeasurementID(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // ResourceExpired returns Err -func ResourceExpired(s ...string) *Err { +func ResourceExpired(s ...string) *LibError { return newErr(Scope, code.ResourceExpired, fmt.Sprintf("resource expired: %s", strings.Join(s, " "))) } // ResourceExpiredL logs error message and returns Err -func ResourceExpiredL(l logx.Logger, s ...string) *Err { +func ResourceExpiredL(l logx.Logger, s ...string) *LibError { e := ResourceExpired(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // ResourceMigrated returns Err -func ResourceMigrated(s ...string) *Err { +func ResourceMigrated(s ...string) *LibError { return newErr(Scope, code.ResourceMigrated, fmt.Sprintf("resource migrated: %s", strings.Join(s, " "))) } // ResourceMigratedL logs error message and returns Err -func ResourceMigratedL(l logx.Logger, s ...string) *Err { +func ResourceMigratedL(l logx.Logger, s ...string) *LibError { e := ResourceMigrated(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // InsufficientQuota returns Err -func InsufficientQuota(s ...string) *Err { +func InsufficientQuota(s ...string) *LibError { return newErr(Scope, code.InsufficientQuota, fmt.Sprintf("insufficient quota: %s", strings.Join(s, " "))) } // InsufficientQuotaL logs error message and returns Err -func InsufficientQuotaL(l logx.Logger, s ...string) *Err { +func InsufficientQuotaL(l logx.Logger, s ...string) *LibError { e := InsufficientQuota(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } /*** CatAuth ***/ // Unauthorized returns Err -func Unauthorized(s ...string) *Err { +func Unauthorized(s ...string) *LibError { return newErr(Scope, code.Unauthorized, fmt.Sprintf("unauthorized: %s", strings.Join(s, " "))) } // UnauthorizedL logs error message and returns Err -func UnauthorizedL(l logx.Logger, s ...string) *Err { +func UnauthorizedL(l logx.Logger, s ...string) *LibError { e := Unauthorized(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // AuthExpired returns Err -func AuthExpired(s ...string) *Err { +func AuthExpired(s ...string) *LibError { return newErr(Scope, code.AuthExpired, fmt.Sprintf("expired: %s", strings.Join(s, " "))) } // AuthExpiredL logs error message and returns Err -func AuthExpiredL(l logx.Logger, s ...string) *Err { +func AuthExpiredL(l logx.Logger, s ...string) *LibError { e := AuthExpired(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // InvalidPosixTime returns Err -func InvalidPosixTime(s ...string) *Err { +func InvalidPosixTime(s ...string) *LibError { return newErr(Scope, code.InvalidPosixTime, fmt.Sprintf("invalid posix time: %s", strings.Join(s, " "))) } // InvalidPosixTimeL logs error message and returns Err -func InvalidPosixTimeL(l logx.Logger, s ...string) *Err { +func InvalidPosixTimeL(l logx.Logger, s ...string) *LibError { e := InvalidPosixTime(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // SigAndPayloadNotMatched returns Err -func SigAndPayloadNotMatched(s ...string) *Err { +func SigAndPayloadNotMatched(s ...string) *LibError { return newErr(Scope, code.SigAndPayloadNotMatched, fmt.Sprintf("signature and the payload are not match: %s", strings.Join(s, " "))) } // SigAndPayloadNotMatchedL logs error message and returns Err -func SigAndPayloadNotMatchedL(l logx.Logger, s ...string) *Err { +func SigAndPayloadNotMatchedL(l logx.Logger, s ...string) *LibError { e := SigAndPayloadNotMatched(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // Forbidden returns Err -func Forbidden(s ...string) *Err { +func Forbidden(s ...string) *LibError { return newErr(Scope, code.Forbidden, fmt.Sprintf("forbidden: %s", strings.Join(s, " "))) } // ForbiddenL logs error message and returns Err -func ForbiddenL(l logx.Logger, s ...string) *Err { +func ForbiddenL(l logx.Logger, s ...string) *LibError { e := Forbidden(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // IsAuthUnauthorizedError check the err is unauthorized error -func IsAuthUnauthorizedError(err *Err) bool { +func IsAuthUnauthorizedError(err *LibError) bool { switch err.Code() { case code.Unauthorized, code.AuthExpired, code.InvalidPosixTime, code.SigAndPayloadNotMatched, code.Forbidden, @@ -399,44 +425,47 @@ func IsAuthUnauthorizedError(err *Err) bool { /*** CatXBC ***/ // ArkInternal returns Err -func ArkInternal(s ...string) *Err { +func ArkInternal(s ...string) *LibError { return newErr(Scope, code.ArkInternal, fmt.Sprintf("ark internal error: %s", strings.Join(s, " "))) } // ArkInternalL logs error message and returns Err -func ArkInternalL(l logx.Logger, s ...string) *Err { +func ArkInternalL(l logx.Logger, s ...string) *LibError { e := ArkInternal(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } /*** CatPubSub ***/ // Publish returns Err -func Publish(s ...string) *Err { +func Publish(s ...string) *LibError { return newErr(Scope, code.Publish, fmt.Sprintf("publish: %s", strings.Join(s, " "))) } // PublishL logs error message and returns Err -func PublishL(l logx.Logger, s ...string) *Err { +func PublishL(l logx.Logger, s ...string) *LibError { e := Publish(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } // Consume returns Err -func Consume(s ...string) *Err { +func Consume(s ...string) *LibError { return newErr(Scope, code.Consume, fmt.Sprintf("consume: %s", strings.Join(s, " "))) } // MsgSizeTooLarge returns Err -func MsgSizeTooLarge(s ...string) *Err { +func MsgSizeTooLarge(s ...string) *LibError { return newErr(Scope, code.MsgSizeTooLarge, fmt.Sprintf("kafka error: %s", strings.Join(s, " "))) } // MsgSizeTooLargeL logs error message and returns Err -func MsgSizeTooLargeL(l logx.Logger, s ...string) *Err { +func MsgSizeTooLargeL(l logx.Logger, s ...string) *LibError { e := MsgSizeTooLarge(s...) l.WithCallerSkip(1).Error(e.Error()) + return e } diff --git a/errors/easy_func_test.go b/errors/easy_func_test.go index 146f696..a3ef45c 100644 --- a/errors/easy_func_test.go +++ b/errors/easy_func_test.go @@ -1,7 +1,6 @@ -package error +package errors import ( - "code.30cm.net/digimon/library-go/errors/code" "context" "errors" "fmt" @@ -9,6 +8,8 @@ import ( "strconv" "testing" + "code.30cm.net/digimon/library-go/errors/code" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/zeromicro/go-zero/core/logx" @@ -74,7 +75,7 @@ func TestFromGRPCError_GivenGeneralError_ShouldReturnErr_Scope0_CatGRPC_DetailGR func TestToGRPCError_GivenErr_StatusShouldHave_Code112233(t *testing.T) { // setup - e := Err{scope: 11, code: 2233, msg: "FAKE MSG"} + e := LibError{scope: 11, code: 2233, msg: "FAKE MSG"} // act err := ToGRPCError(&e) @@ -998,7 +999,7 @@ func TestFromError(t *testing.T) { tests := []struct { name string givenError error - want *Err + want *LibError }{ { "given nil error should return nil", diff --git a/errors/errors.go b/errors/errors.go index 8786848..dec0bf9 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -1,11 +1,12 @@ -package error +package errors import ( - code2 "code.30cm.net/digimon/library-go/errors/code" "errors" "fmt" "net/http" + code2 "code.30cm.net/digimon/library-go/errors/code" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -13,8 +14,8 @@ import ( // Scope 全域變數應由服務或模組設置 var Scope = code2.Unset -// Err 6 碼,服務 2, 大類 2, 詳細錯誤 2 -type Err struct { +// LibError 6 碼,服務 2, 大類 2, 詳細錯誤 2 +type LibError struct { category uint32 code uint32 scope uint32 @@ -24,7 +25,7 @@ type Err struct { // Error 是錯誤的介面 // 私有屬性 "msg" 的 getter 函數 -func (e *Err) Error() string { +func (e *LibError) Error() string { if e == nil { return "" } @@ -39,6 +40,7 @@ func (e *Err) Error() string { if internalErrStr != "" { return fmt.Sprintf("%s: %s", e.msg, internalErrStr) } + return e.msg } @@ -46,19 +48,21 @@ func (e *Err) Error() string { if internalErrStr != "" { return fmt.Sprintf("%s: %s", generalErrStr, internalErrStr) } + return generalErrStr } // Category 私有屬性 "category" 的 getter 函數 -func (e *Err) Category() uint32 { +func (e *LibError) Category() uint32 { if e == nil { return 0 } + return e.category } // Scope 私有屬性 "scope" 的 getter 函數 -func (e *Err) Scope() uint32 { +func (e *LibError) Scope() uint32 { if e == nil { return code2.Unset } @@ -67,7 +71,7 @@ func (e *Err) Scope() uint32 { } // CodeStr 返回帶有零填充的錯誤代碼字串 -func (e *Err) CodeStr() string { +func (e *LibError) CodeStr() string { if e == nil { return "00000" } @@ -80,7 +84,7 @@ func (e *Err) CodeStr() string { } // Code 私有屬性 "code" 的 getter 函數 -func (e *Err) Code() uint32 { +func (e *LibError) Code() uint32 { if e == nil { return code2.OK } @@ -88,7 +92,7 @@ func (e *Err) Code() uint32 { return e.code } -func (e *Err) FullCode() uint32 { +func (e *LibError) FullCode() uint32 { if e == nil { return 0 } @@ -101,7 +105,7 @@ func (e *Err) FullCode() uint32 { } // HTTPStatus 返回對應的 HTTP 狀態碼 -func (e *Err) HTTPStatus() int { +func (e *LibError) HTTPStatus() int { if e == nil || e.Code() == code2.OK { return http.StatusOK } @@ -143,7 +147,7 @@ func (e *Err) HTTPStatus() int { // GeneralError 轉換 category 級別錯誤訊息 // 這是給客戶或 API 調用者的一般錯誤訊息 -func (e *Err) GeneralError() string { +func (e *LibError) GeneralError() string { if e == nil { return "" } @@ -160,34 +164,37 @@ func (e *Err) GeneralError() string { // 除非你非常確定你在做什麼,否則不要直接使用這個函數。 // 請使用 errors.Is 代替。 // 此函數比較兩個錯誤變量是否都是 *Err,並且具有相同的 code(不檢查包裹的內部錯誤) -func (e *Err) Is(f error) bool { - var err *Err +func (e *LibError) Is(f error) bool { + var err *LibError ok := errors.As(f, &err) if !ok { return false } + return e.Code() == err.Code() } // Unwrap 返回底層錯誤 // 解除包裹錯誤的結果本身可能具有 Unwrap 方法; // 我們稱通過反覆解除包裹產生的錯誤序列為錯誤鏈。 -func (e *Err) Unwrap() error { +func (e *LibError) Unwrap() error { if e == nil { return nil } + return e.internalErr } // Wrap 將內部錯誤設置到 Err 結構 -func (e *Err) Wrap(internalErr error) *Err { +func (e *LibError) Wrap(internalErr error) *LibError { if e != nil { e.internalErr = internalErr } + return e } -func (e *Err) GRPCStatus() *status.Status { +func (e *LibError) GRPCStatus() *status.Status { if e == nil { return status.New(codes.OK, "") } @@ -198,8 +205,8 @@ func (e *Err) GRPCStatus() *status.Status { // 工廠函數 // NewErr 創建新的 Err -func NewErr(scope, category, detail uint32, msg string) *Err { - return &Err{ +func NewErr(scope, category, detail uint32, msg string) *LibError { + return &LibError{ category: category, code: detail, scope: scope, @@ -208,8 +215,8 @@ func NewErr(scope, category, detail uint32, msg string) *Err { } // NewGRPCErr 創建新的 gRPC Err -func NewGRPCErr(scope, detail uint32, msg string) *Err { - return &Err{ +func NewGRPCErr(scope, detail uint32, msg string) *LibError { + return &LibError{ category: code2.CatGRPC, code: detail, scope: scope, diff --git a/errors/errors_test.go b/errors/errors_test.go index dc31791..f7bd604 100644 --- a/errors/errors_test.go +++ b/errors/errors_test.go @@ -1,12 +1,13 @@ -package error +package errors import ( - code2 "code.30cm.net/digimon/library-go/errors/code" "errors" "fmt" "net/http" "testing" + code2 "code.30cm.net/digimon/library-go/errors/code" + "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -14,7 +15,7 @@ import ( func TestCode_GivenNilReceiver_CodeReturnOK_CodeStrReturns00000(t *testing.T) { // setup - var e *Err = nil + var e *LibError = nil // act & assert assert.Equal(t, code2.OK, e.Code()) @@ -24,7 +25,7 @@ func TestCode_GivenNilReceiver_CodeReturnOK_CodeStrReturns00000(t *testing.T) { func TestCode_GivenScope99DetailCode6687_ShouldReturn996687(t *testing.T) { // setup - e := Err{scope: 99, code: 6687} + e := LibError{scope: 99, code: 6687} // act & assert assert.Equal(t, uint32(6687), e.Code()) @@ -33,7 +34,7 @@ func TestCode_GivenScope99DetailCode6687_ShouldReturn996687(t *testing.T) { func TestCode_GivenScope0DetailCode87_ShouldReturn87(t *testing.T) { // setup - e := Err{scope: 0, code: 87} + e := LibError{scope: 0, code: 87} // act & assert assert.Equal(t, uint32(87), e.Code()) @@ -87,7 +88,7 @@ func TestErr_ShouldImplementErrorFunction(t *testing.T) { func TestGeneralError_GivenNilErr_ShouldReturnEmptyString(t *testing.T) { // setup - var e *Err = nil + var e *LibError = nil // act & assert assert.Equal(t, "", e.GeneralError()) @@ -95,7 +96,7 @@ func TestGeneralError_GivenNilErr_ShouldReturnEmptyString(t *testing.T) { func TestGeneralError_GivenNotExistCat_ShouldReturnEmptyString(t *testing.T) { // setup - e := Err{category: 123456} + e := LibError{category: 123456} // act & assert assert.Equal(t, "", e.GeneralError()) @@ -103,7 +104,7 @@ func TestGeneralError_GivenNotExistCat_ShouldReturnEmptyString(t *testing.T) { func TestGeneralError_GivenCatDB_ShouldReturnDBError(t *testing.T) { // setup - e := Err{category: code2.CatDB} + e := LibError{category: code2.CatDB} catErrStr := code2.CatToStr[code2.CatDB] // act & assert @@ -112,7 +113,7 @@ func TestGeneralError_GivenCatDB_ShouldReturnDBError(t *testing.T) { func TestError_GivenEmptyMsg_ShouldReturnCatGeneralErrorMessage(t *testing.T) { // setup - e := Err{category: code2.CatDB, msg: ""} + e := LibError{category: code2.CatDB, msg: ""} // act errMsg := e.Error() @@ -123,7 +124,7 @@ func TestError_GivenEmptyMsg_ShouldReturnCatGeneralErrorMessage(t *testing.T) { func TestError_GivenMsg_ShouldReturnGiveMsg(t *testing.T) { // setup - e := Err{msg: "FAKE"} + e := LibError{msg: "FAKE"} // act errMsg := e.Error() @@ -133,7 +134,7 @@ func TestError_GivenMsg_ShouldReturnGiveMsg(t *testing.T) { } func TestIs_GivenNilErr_ShouldReturnFalse(t *testing.T) { - var nilErrs *Err + var nilErrs *LibError // act result := errors.Is(nilErrs, DBError()) result2 := errors.Is(DBError(), nilErrs) @@ -154,7 +155,7 @@ func TestIs_GivenNil_ShouldReturnFalse(t *testing.T) { } func TestIs_GivenNilReceiver_ShouldReturnCorrectResult(t *testing.T) { - var nilErr *Err = nil + var nilErr *LibError = nil // test 1: nilErr != DBError var dbErr error = DBError("fake db error") @@ -165,7 +166,7 @@ func TestIs_GivenNilReceiver_ShouldReturnCorrectResult(t *testing.T) { assert.False(t, nilErr.Is(nilError)) // test 3: nilErr == another nilErr - var nilErr2 *Err = nil + var nilErr2 *LibError = nil assert.True(t, nilErr.Is(nilErr2)) } @@ -198,7 +199,7 @@ func TestIs_GivenDBErrorAssignToErrorType_ShouldReturnTrue(t *testing.T) { func TestWrap_GivenNilErr_ShouldNoPanic(t *testing.T) { // act & assert assert.NotPanics(t, func() { - var e *Err = nil + var e *LibError = nil _ = e.Wrap(fmt.Errorf("test")) }) } @@ -213,19 +214,19 @@ func TestWrap_GivenErrorToWrap_ShouldReturnErrorWithWrappedError(t *testing.T) { } func TestUnwrap_GivenNilErr_ShouldReturnNil(t *testing.T) { - var e *Err = nil + var e *LibError = nil internalErr := e.Unwrap() assert.Nil(t, internalErr) } func TestErrorsIs_GivenNilErr_ShouldReturnFalse(t *testing.T) { - var e *Err = nil + var e *LibError = nil assert.False(t, errors.Is(e, fmt.Errorf("test"))) } func TestErrorsAs_GivenNilErr_ShouldReturnFalse(t *testing.T) { var internalErr *testErr - var e *Err = nil + var e *LibError = nil assert.False(t, errors.As(e, &internalErr)) } @@ -233,7 +234,7 @@ func TestGRPCStatus(t *testing.T) { // setup table driven tests tests := []struct { name string - given *Err + given *LibError expect *status.Status expectConvert error }{ @@ -265,24 +266,24 @@ func TestGRPCStatus(t *testing.T) { func TestErr_HTTPStatus(t *testing.T) { tests := []struct { name string - err *Err + err *LibError want int }{ {name: "nil error", err: nil, want: http.StatusOK}, - {name: "invalid measurement id", err: &Err{category: code2.CatResource, code: code2.InvalidMeasurementID}, want: http.StatusInternalServerError}, - {name: "resource already exists", err: &Err{category: code2.CatResource, code: code2.ResourceAlreadyExist}, want: http.StatusConflict}, - {name: "invalid resource state", err: &Err{category: code2.CatResource, code: code2.InvalidResourceState}, want: http.StatusConflict}, - {name: "invalid posix time", err: &Err{category: code2.CatAuth, code: code2.InvalidPosixTime}, want: http.StatusForbidden}, - {name: "unauthorized", err: &Err{category: code2.CatAuth, code: code2.Unauthorized}, want: http.StatusUnauthorized}, - {name: "db error", err: &Err{category: code2.CatDB, code: code2.DBError}, want: http.StatusInternalServerError}, - {name: "insufficient permission", err: &Err{category: code2.CatResource, code: code2.InsufficientPermission}, want: http.StatusUnauthorized}, - {name: "resource insufficient", err: &Err{category: code2.CatResource, code: code2.ResourceInsufficient}, want: http.StatusBadRequest}, - {name: "invalid format", err: &Err{category: code2.CatInput, code: code2.InvalidFormat}, want: http.StatusBadRequest}, - {name: "resource not found", err: &Err{code: code2.ResourceNotFound}, want: http.StatusNotFound}, - {name: "ok", err: &Err{code: code2.OK}, want: http.StatusOK}, - {name: "not valid implementation", err: &Err{category: code2.CatInput, code: code2.NotValidImplementation}, want: http.StatusNotImplemented}, - {name: "forbidden", err: &Err{category: code2.CatAuth, code: code2.Forbidden}, want: http.StatusForbidden}, - {name: "insufficient quota", err: &Err{category: code2.CatResource, code: code2.InsufficientQuota}, want: http.StatusPaymentRequired}, + {name: "invalid measurement id", err: &LibError{category: code2.CatResource, code: code2.InvalidMeasurementID}, want: http.StatusInternalServerError}, + {name: "resource already exists", err: &LibError{category: code2.CatResource, code: code2.ResourceAlreadyExist}, want: http.StatusConflict}, + {name: "invalid resource state", err: &LibError{category: code2.CatResource, code: code2.InvalidResourceState}, want: http.StatusConflict}, + {name: "invalid posix time", err: &LibError{category: code2.CatAuth, code: code2.InvalidPosixTime}, want: http.StatusForbidden}, + {name: "unauthorized", err: &LibError{category: code2.CatAuth, code: code2.Unauthorized}, want: http.StatusUnauthorized}, + {name: "db error", err: &LibError{category: code2.CatDB, code: code2.DBError}, want: http.StatusInternalServerError}, + {name: "insufficient permission", err: &LibError{category: code2.CatResource, code: code2.InsufficientPermission}, want: http.StatusUnauthorized}, + {name: "resource insufficient", err: &LibError{category: code2.CatResource, code: code2.ResourceInsufficient}, want: http.StatusBadRequest}, + {name: "invalid format", err: &LibError{category: code2.CatInput, code: code2.InvalidFormat}, want: http.StatusBadRequest}, + {name: "resource not found", err: &LibError{code: code2.ResourceNotFound}, want: http.StatusNotFound}, + {name: "ok", err: &LibError{code: code2.OK}, want: http.StatusOK}, + {name: "not valid implementation", err: &LibError{category: code2.CatInput, code: code2.NotValidImplementation}, want: http.StatusNotImplemented}, + {name: "forbidden", err: &LibError{category: code2.CatAuth, code: code2.Forbidden}, want: http.StatusForbidden}, + {name: "insufficient quota", err: &LibError{category: code2.CatResource, code: code2.InsufficientQuota}, want: http.StatusPaymentRequired}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/go.work b/go.work index aa5172a..d9fccaf 100644 --- a/go.work +++ b/go.work @@ -1,4 +1,7 @@ go 1.22.3 -use ./errors -use ./validator +use ( + ./errors + ./validator + ./worker_pool +) diff --git a/validator/validate.go b/validator/validate.go index b94b6b3..d53fb48 100644 --- a/validator/validate.go +++ b/validator/validate.go @@ -2,6 +2,7 @@ package required import ( "fmt" + "github.com/go-playground/validator/v10" "github.com/zeromicro/go-zero/core/logx" diff --git a/worker_pool/go.mod b/worker_pool/go.mod new file mode 100644 index 0000000..87d2917 --- /dev/null +++ b/worker_pool/go.mod @@ -0,0 +1,15 @@ +module code.30cm.net/digimon/library-go/worker_pool + +go 1.22.3 + +require ( + github.com/panjf2000/ants/v2 v2.10.0 + github.com/stretchr/testify v1.8.2 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/sync v0.3.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/worker_pool/worker_pool.go b/worker_pool/worker_pool.go new file mode 100644 index 0000000..79c1f19 --- /dev/null +++ b/worker_pool/worker_pool.go @@ -0,0 +1,71 @@ +package workerpool + +import ( + "sync" + + "github.com/panjf2000/ants/v2" +) + +const defaultWorkerPoolSize = 2000 + +type WorkerPool interface { + Submit(task func()) error + SubmitAndWaitAll(tasks ...func() error) (taskErr chan error, submitErr error) +} + +type workerPool struct { + p *ants.Pool +} + +func NewWorkerPool(size int) WorkerPool { + if size <= 0 { + size = defaultWorkerPoolSize + } + + p, err := ants.NewPool( + size, + ants.WithDisablePurge(true), + ) + if err != nil { + return &workerPool{p: nil} + } + + return &workerPool{p: p} +} + +func (p *workerPool) Submit(task func()) error { + if p.p == nil { + return ants.Submit(task) + } + + return p.p.Submit(task) +} + +func (p *workerPool) SubmitAndWaitAll(tasks ...func() error) (chan error, error) { + taskErrCh := make(chan error, len(tasks)) + submitErrCh := make(chan error, len(tasks)) + wg := sync.WaitGroup{} + wg.Add(len(tasks)) + + for i := range tasks { + task := tasks[i] + err := p.Submit(func() { + defer wg.Done() + if err := task(); err != nil { + taskErrCh <- err + } + }) + if err != nil { + submitErrCh <- err + wg.Done() + } + } + + wg.Wait() + + if len(submitErrCh) != 0 { + return nil, <-submitErrCh + } + + return taskErrCh, nil +} diff --git a/worker_pool/worker_pool_test.go b/worker_pool/worker_pool_test.go new file mode 100644 index 0000000..acff213 --- /dev/null +++ b/worker_pool/worker_pool_test.go @@ -0,0 +1,89 @@ +package workerpool + +import ( + "errors" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewWorkerPool(t *testing.T) { + t.Run("default size pool", func(t *testing.T) { + pool := NewWorkerPool(0) + assert.NotNil(t, pool) + }) + + t.Run("custom size pool", func(t *testing.T) { + size := 100 + pool := NewWorkerPool(size) + assert.NotNil(t, pool) + }) +} + +func TestSubmit(t *testing.T) { + t.Run("submit task to worker pool", func(t *testing.T) { + pool := NewWorkerPool(10) + var wg sync.WaitGroup + wg.Add(1) + + err := pool.Submit(func() { + defer wg.Done() + time.Sleep(100 * time.Millisecond) + }) + assert.NoError(t, err) + + wg.Wait() + }) +} + +func TestSubmitAndWaitAll(t *testing.T) { + t.Run("submit and wait all tasks succeed", func(t *testing.T) { + pool := NewWorkerPool(10) + tasks := []func() error{ + func() error { + time.Sleep(100 * time.Millisecond) + return nil + }, + func() error { + time.Sleep(50 * time.Millisecond) + return nil + }, + } + + taskErrCh, submitErr := pool.SubmitAndWaitAll(tasks...) + assert.NoError(t, submitErr) + close(taskErrCh) + for err := range taskErrCh { + assert.NoError(t, err) + } + }) + + t.Run("submit and wait all tasks with errors", func(t *testing.T) { + pool := NewWorkerPool(10) + expectedError := errors.New("task error") + tasks := []func() error{ + func() error { + time.Sleep(100 * time.Millisecond) + return nil + }, + func() error { + time.Sleep(50 * time.Millisecond) + return expectedError + }, + } + + taskErrCh, submitErr := pool.SubmitAndWaitAll(tasks...) + assert.NoError(t, submitErr) + close(taskErrCh) + foundError := false + for err := range taskErrCh { + if err != nil { + foundError = true + assert.Equal(t, expectedError, err) + } + } + assert.True(t, foundError) + }) +}