350 lines
10 KiB
Go
350 lines
10 KiB
Go
package usecase
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
errs "gateway/internal/library/errors"
|
|
"gateway/internal/library/errors/code"
|
|
"gateway/internal/model/notification"
|
|
notifconfig "gateway/internal/model/notification/config"
|
|
domentity "gateway/internal/model/notification/domain/entity"
|
|
"gateway/internal/model/notification/domain/enum"
|
|
domrepo "gateway/internal/model/notification/domain/repository"
|
|
domtpl "gateway/internal/model/notification/domain/template"
|
|
domusecase "gateway/internal/model/notification/domain/usecase"
|
|
"gateway/internal/model/notification/provider/email"
|
|
"gateway/internal/model/notification/provider/sms"
|
|
"gateway/internal/model/notification/template"
|
|
)
|
|
|
|
const (
|
|
idempotencyCacheTTL = 24 * time.Hour
|
|
quotaKeyTTL = 25 * time.Hour
|
|
)
|
|
|
|
var errb = errs.For(code.Facade)
|
|
|
|
// NotifierUseCaseParam wires dependencies for NotifierUseCase.
|
|
type NotifierUseCaseParam struct {
|
|
Repo domrepo.NotificationRepository
|
|
Idempotency domrepo.IdempotencyCache // optional
|
|
Quota domrepo.QuotaCounter // optional
|
|
RetryQueue domrepo.RetryQueue // required for Enqueue
|
|
Renderer domtpl.Renderer
|
|
Email *email.Chain
|
|
SMS *sms.Chain
|
|
Config notifconfig.Config
|
|
}
|
|
|
|
type notifierUseCase struct {
|
|
NotifierUseCaseParam
|
|
}
|
|
|
|
// MustNotifierUseCase constructs NotifierUseCase.
|
|
func MustNotifierUseCase(param NotifierUseCaseParam) domusecase.NotifierUseCase {
|
|
if param.Renderer == nil {
|
|
param.Renderer = template.NewRenderer(template.DefaultRegistry(), domtpl.LocaleZhTW, domtpl.LocaleEnUS)
|
|
}
|
|
return ¬ifierUseCase{param}
|
|
}
|
|
|
|
func (uc *notifierUseCase) Send(ctx context.Context, req *domusecase.SendRequest) (*domusecase.NotificationDTO, error) {
|
|
if err := validateSendRequest(req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if dto, ok, err := uc.lookupIdempotent(ctx, req); err != nil {
|
|
return nil, err
|
|
} else if ok {
|
|
return dto, nil
|
|
}
|
|
|
|
if err := uc.checkQuota(ctx, req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
locale := uc.resolveLocale(req.Locale)
|
|
doc := uc.newNotificationEntity(req, locale, enum.NotifyStatusPending, 0)
|
|
|
|
if err := uc.Repo.Insert(ctx, doc); err != nil {
|
|
if errors.Is(err, notification.ErrDuplicateIdempotency) {
|
|
existing, findErr := uc.Repo.FindByIdempotency(ctx, req.TenantID, req.Kind, req.IdempotencyKey)
|
|
if findErr != nil {
|
|
return nil, wrapRepoErr(findErr)
|
|
}
|
|
dto := entityToDTO(existing)
|
|
uc.cacheIdempotent(ctx, req, dto)
|
|
return dto, nil
|
|
}
|
|
return nil, wrapRepoErr(err)
|
|
}
|
|
|
|
rendered, err := uc.Renderer.Render(req.Kind, locale, req.Data)
|
|
if err != nil {
|
|
return nil, uc.markFailed(ctx, doc, err)
|
|
}
|
|
|
|
provider, providerMsgID, sendErr := uc.delivery().deliver(ctx, req, rendered)
|
|
attempts := 1
|
|
if sendErr != nil {
|
|
if updateErr := uc.Repo.UpdateDelivery(ctx, req.TenantID, doc.ID.Hex(), &domrepo.NotificationDeliveryUpdate{
|
|
Status: enum.NotifyStatusFailed,
|
|
Provider: provider,
|
|
LastError: sendErr.Error(),
|
|
Attempts: attempts,
|
|
}); updateErr != nil {
|
|
sendErr = fmt.Errorf("%w; persist failed status: %w", sendErr, updateErr)
|
|
}
|
|
doc.Status = enum.NotifyStatusFailed
|
|
doc.Attempts = attempts
|
|
doc.LastError = sendErr.Error()
|
|
doc.Provider = provider
|
|
dto := entityToDTO(doc)
|
|
uc.cacheIdempotent(ctx, req, dto)
|
|
return nil, errb.SvcThirdParty("notification delivery failed").WithCause(sendErr)
|
|
}
|
|
|
|
now := time.Now().UTC().UnixNano()
|
|
body := ""
|
|
if !req.DoNotPersistBody {
|
|
body = rendered.Body
|
|
}
|
|
if err := uc.Repo.UpdateDelivery(ctx, req.TenantID, doc.ID.Hex(), &domrepo.NotificationDeliveryUpdate{
|
|
Status: enum.NotifyStatusSent,
|
|
Provider: provider,
|
|
ProviderMessageID: providerMsgID,
|
|
Attempts: attempts,
|
|
Body: body,
|
|
DeliveredAt: &now,
|
|
}); err != nil {
|
|
return nil, wrapRepoErr(err)
|
|
}
|
|
|
|
doc.Status = enum.NotifyStatusSent
|
|
doc.Attempts = attempts
|
|
doc.Provider = provider
|
|
doc.ProviderMessageID = providerMsgID
|
|
doc.DeliveredAt = &now
|
|
if !req.DoNotPersistBody {
|
|
doc.Body = body
|
|
}
|
|
|
|
dto := entityToDTO(doc)
|
|
uc.cacheIdempotent(ctx, req, dto)
|
|
return dto, nil
|
|
}
|
|
|
|
func (uc *notifierUseCase) Enqueue(ctx context.Context, req *domusecase.SendRequest) (*domusecase.NotificationDTO, error) {
|
|
if err := validateSendRequest(req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if dto, ok, err := uc.lookupIdempotent(ctx, req); err != nil {
|
|
return nil, err
|
|
} else if ok {
|
|
return dto, nil
|
|
}
|
|
|
|
if err := uc.checkQuota(ctx, req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
locale := uc.resolveLocale(req.Locale)
|
|
doc := uc.newNotificationEntity(req, locale, enum.NotifyStatusPending, 0)
|
|
|
|
if err := uc.Repo.Insert(ctx, doc); err != nil {
|
|
if errors.Is(err, notification.ErrDuplicateIdempotency) {
|
|
existing, findErr := uc.Repo.FindByIdempotency(ctx, req.TenantID, req.Kind, req.IdempotencyKey)
|
|
if findErr != nil {
|
|
return nil, wrapRepoErr(findErr)
|
|
}
|
|
dto := entityToDTO(existing)
|
|
uc.cacheIdempotent(ctx, req, dto)
|
|
return dto, nil
|
|
}
|
|
return nil, wrapRepoErr(err)
|
|
}
|
|
|
|
if uc.RetryQueue == nil {
|
|
return nil, errb.SysInternal("async notification requires redis retry queue")
|
|
}
|
|
job := &domusecase.RetryJob{
|
|
NotificationID: doc.ID.Hex(),
|
|
TenantID: req.TenantID,
|
|
UID: req.UID,
|
|
Channel: req.Channel,
|
|
Kind: req.Kind,
|
|
Target: req.Target,
|
|
Locale: locale,
|
|
Data: req.Data,
|
|
DoNotPersistBody: req.DoNotPersistBody,
|
|
}
|
|
if err := ScheduleImmediate(ctx, uc.RetryQueue, job); err != nil {
|
|
return nil, errb.SysInternal("failed to schedule notification").WithCause(err)
|
|
}
|
|
|
|
dto := entityToDTO(doc)
|
|
uc.cacheIdempotent(ctx, req, dto)
|
|
return dto, nil
|
|
}
|
|
|
|
func (uc *notifierUseCase) Get(ctx context.Context, tenantID, notificationID string) (*domusecase.NotificationDTO, error) {
|
|
doc, err := uc.Repo.FindByID(ctx, tenantID, notificationID)
|
|
if err != nil {
|
|
return nil, wrapRepoErr(err)
|
|
}
|
|
return entityToDTO(doc), nil
|
|
}
|
|
|
|
func (uc *notifierUseCase) delivery() deliveryDeps {
|
|
return deliveryDeps{
|
|
Renderer: uc.Renderer,
|
|
Email: uc.Email,
|
|
SMS: uc.SMS,
|
|
Config: uc.Config,
|
|
}
|
|
}
|
|
|
|
func (uc *notifierUseCase) newNotificationEntity(req *domusecase.SendRequest, locale string, status enum.NotifyStatus, attempts int) *domentity.Notification {
|
|
severity := req.Severity
|
|
if severity == "" {
|
|
severity = enum.SeverityInfo
|
|
}
|
|
return &domentity.Notification{
|
|
TenantID: req.TenantID,
|
|
UID: req.UID,
|
|
Channel: req.Channel,
|
|
Kind: req.Kind,
|
|
TargetHash: hashTarget(req.Target),
|
|
TemplateKey: string(req.Kind),
|
|
Locale: locale,
|
|
Status: status,
|
|
Attempts: attempts,
|
|
IdempotencyKey: req.IdempotencyKey,
|
|
Severity: severity,
|
|
}
|
|
}
|
|
|
|
func (uc *notifierUseCase) resolveLocale(locale string) string {
|
|
if locale != "" {
|
|
return locale
|
|
}
|
|
if uc.Config.DefaultLocale != "" {
|
|
return uc.Config.DefaultLocale
|
|
}
|
|
return domtpl.LocaleZhTW
|
|
}
|
|
|
|
func (uc *notifierUseCase) lookupIdempotent(ctx context.Context, req *domusecase.SendRequest) (*domusecase.NotificationDTO, bool, error) {
|
|
if uc.Idempotency != nil {
|
|
key := notification.GetIdempotencyRedisKey(req.TenantID, string(req.Kind), req.IdempotencyKey)
|
|
raw, err := uc.Idempotency.Get(ctx, key)
|
|
if err != nil {
|
|
return nil, false, errb.SysInternal("idempotency cache read failed").WithCause(err)
|
|
}
|
|
if len(raw) > 0 {
|
|
var dto domusecase.NotificationDTO
|
|
if err := json.Unmarshal(raw, &dto); err != nil {
|
|
return nil, false, errb.SysInternal("idempotency cache decode failed").WithCause(err)
|
|
}
|
|
return &dto, true, nil
|
|
}
|
|
}
|
|
|
|
existing, err := uc.Repo.FindByIdempotency(ctx, req.TenantID, req.Kind, req.IdempotencyKey)
|
|
if err != nil {
|
|
if errors.Is(err, notification.ErrNotFound) {
|
|
return nil, false, nil
|
|
}
|
|
return nil, false, wrapRepoErr(err)
|
|
}
|
|
dto := entityToDTO(existing)
|
|
uc.cacheIdempotent(ctx, req, dto)
|
|
return dto, true, nil
|
|
}
|
|
|
|
func (uc *notifierUseCase) cacheIdempotent(ctx context.Context, req *domusecase.SendRequest, dto *domusecase.NotificationDTO) {
|
|
if uc.Idempotency == nil || dto == nil {
|
|
return
|
|
}
|
|
raw, err := json.Marshal(dto)
|
|
if err != nil {
|
|
return
|
|
}
|
|
key := notification.GetIdempotencyRedisKey(req.TenantID, string(req.Kind), req.IdempotencyKey)
|
|
if err := uc.Idempotency.Set(ctx, key, raw, idempotencyCacheTTL); err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
func (uc *notifierUseCase) checkQuota(ctx context.Context, req *domusecase.SendRequest) error {
|
|
if uc.Quota == nil {
|
|
return nil
|
|
}
|
|
limit := uc.quotaLimit(req.Channel)
|
|
if limit <= 0 {
|
|
return nil
|
|
}
|
|
day := time.Now().UTC().Format("20060102")
|
|
key := notification.GetQuotaRedisKey(req.TenantID, string(req.Channel), day)
|
|
count, err := uc.Quota.Incr(ctx, key, quotaKeyTTL)
|
|
if err != nil {
|
|
return errb.SysInternal("quota check failed").WithCause(err)
|
|
}
|
|
if count > int64(limit) {
|
|
return errb.ResInsufficientQuota("notification daily quota exceeded")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (uc *notifierUseCase) quotaLimit(ch enum.Channel) int {
|
|
switch ch {
|
|
case enum.ChannelEmail:
|
|
return uc.Config.RatePerTenant.Email
|
|
case enum.ChannelSMS:
|
|
return uc.Config.RatePerTenant.SMS
|
|
default:
|
|
return 0
|
|
}
|
|
}
|
|
|
|
func (uc *notifierUseCase) markFailed(ctx context.Context, doc *domentity.Notification, cause error) error {
|
|
if updateErr := uc.Repo.UpdateDelivery(ctx, doc.TenantID, doc.ID.Hex(), &domrepo.NotificationDeliveryUpdate{
|
|
Status: enum.NotifyStatusFailed,
|
|
LastError: cause.Error(),
|
|
Attempts: 1,
|
|
}); updateErr != nil {
|
|
cause = fmt.Errorf("%w; persist failed status: %w", cause, updateErr)
|
|
}
|
|
return errb.SysInternal("notification template render failed").WithCause(cause)
|
|
}
|
|
|
|
func validateSendRequest(req *domusecase.SendRequest) error {
|
|
if req == nil {
|
|
return errb.InputMissingRequired("send request is required")
|
|
}
|
|
if req.TenantID == "" {
|
|
return errb.InputMissingRequired("tenant_id is required")
|
|
}
|
|
if req.IdempotencyKey == "" {
|
|
return errb.InputMissingRequired("idempotency_key is required")
|
|
}
|
|
if !req.Channel.IsValid() {
|
|
return errb.InputInvalidFormat("invalid notification channel")
|
|
}
|
|
if req.Target == "" {
|
|
return errb.InputMissingRequired("target is required")
|
|
}
|
|
if req.Kind == "" {
|
|
return errb.InputMissingRequired("kind is required")
|
|
}
|
|
if !req.Kind.IsValid() {
|
|
return errb.InputInvalidFormat("invalid notification kind")
|
|
}
|
|
return nil
|
|
}
|