template-monorepo/internal/model/notification/usecase/notifier_usecase.go

346 lines
10 KiB
Go
Raw Permalink Normal View History

package usecase
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"gateway/internal/model/notification"
notifconfig "gateway/internal/model/notification/config"
domentity "gateway/internal/model/notification/domain/entity"
"gateway/internal/model/notification/domain/enum"
domrepo "gateway/internal/model/notification/domain/repository"
domtpl "gateway/internal/model/notification/domain/template"
domusecase "gateway/internal/model/notification/domain/usecase"
"gateway/internal/model/notification/provider/email"
"gateway/internal/model/notification/provider/sms"
"gateway/internal/model/notification/template"
)
const (
// idempotencyCacheTTL is the expiry passed to IdempotencyCache.Set when a
// notification DTO is cached under its idempotency key.
idempotencyCacheTTL = 24 * time.Hour
// quotaKeyTTL is the expiry passed to QuotaCounter.Incr for the quota key,
// which is built from tenant, channel and the current UTC day (yyyymmdd).
// NOTE(review): presumably 25h so the counter outlives the UTC day it
// covers — confirm.
quotaKeyTTL = 25 * time.Hour
)
// NotifierUseCaseParam wires dependencies for NotifierUseCase.
type NotifierUseCaseParam struct {
// Repo stores notification records; it is used to insert, look up (by ID
// and by idempotency key) and update delivery state.
Repo domrepo.NotificationRepository
// Idempotency caches serialized DTOs by idempotency key. When nil, the
// cache is skipped and lookups go straight to Repo.
Idempotency domrepo.IdempotencyCache // optional
// Quota counts sends per tenant/channel/day. When nil, no quota is enforced.
Quota domrepo.QuotaCounter // optional
// RetryQueue receives jobs scheduled by Enqueue; Enqueue returns a
// not-implemented error when it is nil.
RetryQueue domrepo.RetryQueue // required for Enqueue
// Renderer renders templates for Send. MustNotifierUseCase substitutes a
// default renderer when this is nil.
Renderer domtpl.Renderer
// Email and SMS are handed to the delivery helper together with Renderer
// and Config.
Email *email.Chain
SMS *sms.Chain
// Config supplies DefaultLocale and the RatePerTenant quota limits read
// by this use case.
Config notifconfig.Config
}
// notifierUseCase is the concrete type returned by MustNotifierUseCase. It
// embeds NotifierUseCaseParam so its methods reach the injected dependencies
// directly as fields.
type notifierUseCase struct {
NotifierUseCaseParam
}
// MustNotifierUseCase builds the NotifierUseCase implementation from param.
//
// When param.Renderer is nil it is replaced by a renderer created from
// template.DefaultRegistry() with domtpl.LocaleZhTW and domtpl.LocaleEnUS.
// No other dependency is validated here.
func MustNotifierUseCase(param NotifierUseCaseParam) domusecase.NotifierUseCase {
	uc := &notifierUseCase{NotifierUseCaseParam: param}
	if uc.Renderer == nil {
		uc.Renderer = template.NewRenderer(template.DefaultRegistry(), domtpl.LocaleZhTW, domtpl.LocaleEnUS)
	}
	return uc
}
// Send delivers a notification synchronously and returns its recorded state.
//
// Steps, in order:
//  1. validate req;
//  2. return the DTO already known for the idempotency key, if any;
//  3. charge the tenant quota;
//  4. insert a pending record (a duplicate-idempotency insert error is
//     resolved by returning the existing record instead);
//  5. render the template — on failure the record is marked failed;
//  6. deliver, persist the outcome and cache the resulting DTO.
//
// When delivery fails, the failed DTO is still cached under the idempotency
// key, but the caller receives a nil DTO and a third-party service error.
func (uc *notifierUseCase) Send(ctx context.Context, req *domusecase.SendRequest) (*domusecase.NotificationDTO, error) {
	if err := validateSendRequest(req); err != nil {
		return nil, err
	}

	prior, found, lookupErr := uc.lookupIdempotent(ctx, req)
	if lookupErr != nil {
		return nil, lookupErr
	}
	if found {
		return prior, nil
	}

	if err := uc.checkQuota(ctx, req); err != nil {
		return nil, err
	}

	locale := uc.resolveLocale(req.Locale)
	doc := uc.newNotificationEntity(req, locale, enum.NotifyStatusPending, 0)
	if insertErr := uc.Repo.Insert(ctx, doc); insertErr != nil {
		if !errors.Is(insertErr, notification.ErrDuplicateIdempotency) {
			return nil, wrapRepoErr(insertErr)
		}
		// A record with the same idempotency key already exists: hand back
		// that record rather than sending again.
		existing, findErr := uc.Repo.FindByIdempotency(ctx, req.TenantID, req.Kind, req.IdempotencyKey)
		if findErr != nil {
			return nil, wrapRepoErr(findErr)
		}
		winner := entityToDTO(existing)
		uc.cacheIdempotent(ctx, req, winner)
		return winner, nil
	}

	rendered, renderErr := uc.Renderer.Render(req.Kind, locale, req.Data)
	if renderErr != nil {
		return nil, uc.markFailed(ctx, doc, renderErr)
	}

	// Send performs exactly one delivery attempt.
	const attempts = 1
	notificationID := doc.ID.Hex()

	provider, providerMsgID, sendErr := uc.delivery().deliver(ctx, req, rendered)
	if sendErr != nil {
		failure := &domrepo.NotificationDeliveryUpdate{
			Status:    enum.NotifyStatusFailed,
			Provider:  provider,
			LastError: sendErr.Error(),
			Attempts:  attempts,
		}
		if updateErr := uc.Repo.UpdateDelivery(ctx, req.TenantID, notificationID, failure); updateErr != nil {
			// Keep both causes visible to the caller.
			sendErr = fmt.Errorf("%w; persist failed status: %w", sendErr, updateErr)
		}

		doc.Status = enum.NotifyStatusFailed
		doc.Attempts = attempts
		doc.LastError = sendErr.Error()
		doc.Provider = provider
		uc.cacheIdempotent(ctx, req, entityToDTO(doc))
		return nil, errb.SvcThirdParty("notification delivery failed").WithCause(sendErr)
	}

	deliveredAt := time.Now().UTC().UnixNano()
	persistBody := !req.DoNotPersistBody
	var storedBody string
	if persistBody {
		storedBody = rendered.Body
	}

	success := &domrepo.NotificationDeliveryUpdate{
		Status:            enum.NotifyStatusSent,
		Provider:          provider,
		ProviderMessageID: providerMsgID,
		Attempts:          attempts,
		Body:              storedBody,
		DeliveredAt:       &deliveredAt,
	}
	if err := uc.Repo.UpdateDelivery(ctx, req.TenantID, notificationID, success); err != nil {
		return nil, wrapRepoErr(err)
	}

	// Mirror the persisted update onto the in-memory entity for the DTO.
	doc.Status = enum.NotifyStatusSent
	doc.Attempts = attempts
	doc.Provider = provider
	doc.ProviderMessageID = providerMsgID
	doc.DeliveredAt = &deliveredAt
	if persistBody {
		doc.Body = storedBody
	}

	result := entityToDTO(doc)
	uc.cacheIdempotent(ctx, req, result)
	return result, nil
}
// Enqueue records a pending notification and schedules it for asynchronous
// delivery through RetryQueue, returning the pending DTO.
//
// Steps, in order:
//  1. validate req;
//  2. return the DTO already known for the idempotency key, if any;
//  3. fail with a not-implemented error when no RetryQueue is configured;
//  4. charge the tenant quota;
//  5. insert a pending record (a duplicate-idempotency insert error is
//     resolved by returning the existing record instead);
//  6. schedule the job and cache the pending DTO.
//
// The RetryQueue check runs before the quota counter is incremented and before
// the record is inserted, so a missing queue does not consume quota or leave
// behind a pending record that could never be delivered.
func (uc *notifierUseCase) Enqueue(ctx context.Context, req *domusecase.SendRequest) (*domusecase.NotificationDTO, error) {
	if err := validateSendRequest(req); err != nil {
		return nil, err
	}

	prior, found, lookupErr := uc.lookupIdempotent(ctx, req)
	if lookupErr != nil {
		return nil, lookupErr
	}
	if found {
		return prior, nil
	}

	// Fail fast: everything below has side effects (quota, repository) that
	// are pointless when the job cannot be scheduled at all.
	if uc.RetryQueue == nil {
		return nil, errb.SysNotImplemented("async notification requires redis retry queue")
	}

	if err := uc.checkQuota(ctx, req); err != nil {
		return nil, err
	}

	locale := uc.resolveLocale(req.Locale)
	doc := uc.newNotificationEntity(req, locale, enum.NotifyStatusPending, 0)
	if insertErr := uc.Repo.Insert(ctx, doc); insertErr != nil {
		if !errors.Is(insertErr, notification.ErrDuplicateIdempotency) {
			return nil, wrapRepoErr(insertErr)
		}
		// A record with the same idempotency key already exists: hand back
		// that record rather than enqueueing again.
		existing, findErr := uc.Repo.FindByIdempotency(ctx, req.TenantID, req.Kind, req.IdempotencyKey)
		if findErr != nil {
			return nil, wrapRepoErr(findErr)
		}
		winner := entityToDTO(existing)
		uc.cacheIdempotent(ctx, req, winner)
		return winner, nil
	}

	job := &domusecase.RetryJob{
		NotificationID:   doc.ID.Hex(),
		TenantID:         req.TenantID,
		UID:              req.UID,
		Channel:          req.Channel,
		Kind:             req.Kind,
		Target:           req.Target,
		Locale:           locale,
		Data:             req.Data,
		DoNotPersistBody: req.DoNotPersistBody,
	}
	// NOTE(review): if scheduling fails, the pending record inserted above
	// stays in the repository and a later call with the same idempotency key
	// will return it without re-scheduling — confirm this is intended.
	if err := ScheduleImmediate(ctx, uc.RetryQueue, job); err != nil {
		return nil, wrapStoreErr(err, "failed to schedule notification")
	}

	result := entityToDTO(doc)
	uc.cacheIdempotent(ctx, req, result)
	return result, nil
}
// Get loads the notification identified by notificationID within tenantID
// and returns it as a DTO. Repository errors are passed through wrapRepoErr.
func (uc *notifierUseCase) Get(ctx context.Context, tenantID, notificationID string) (*domusecase.NotificationDTO, error) {
	record, findErr := uc.Repo.FindByID(ctx, tenantID, notificationID)
	if findErr != nil {
		return nil, wrapRepoErr(findErr)
	}

	dto := entityToDTO(record)
	return dto, nil
}
// delivery packages the renderer, provider chains and config held by the use
// case into a deliveryDeps value.
func (uc *notifierUseCase) delivery() deliveryDeps {
	var deps deliveryDeps
	deps.Renderer = uc.Renderer
	deps.Email = uc.Email
	deps.SMS = uc.SMS
	deps.Config = uc.Config
	return deps
}
// newNotificationEntity builds the repository entity for req with the given
// locale, status and attempt count.
//
// The raw target is not stored; only hashTarget(req.Target) is. The template
// key is the string form of req.Kind, and an empty req.Severity falls back to
// enum.SeverityInfo.
func (uc *notifierUseCase) newNotificationEntity(req *domusecase.SendRequest, locale string, status enum.NotifyStatus, attempts int) *domentity.Notification {
	severity := req.Severity
	if severity == "" {
		severity = enum.SeverityInfo
	}

	entity := new(domentity.Notification)
	entity.TenantID = req.TenantID
	entity.UID = req.UID
	entity.Channel = req.Channel
	entity.Kind = req.Kind
	entity.TargetHash = hashTarget(req.Target)
	entity.TemplateKey = string(req.Kind)
	entity.Locale = locale
	entity.Status = status
	entity.Attempts = attempts
	entity.IdempotencyKey = req.IdempotencyKey
	entity.Severity = severity
	return entity
}
// resolveLocale picks the locale to use: the requested one when non-empty,
// otherwise Config.DefaultLocale when non-empty, otherwise domtpl.LocaleZhTW.
func (uc *notifierUseCase) resolveLocale(locale string) string {
	switch {
	case locale != "":
		return locale
	case uc.Config.DefaultLocale != "":
		return uc.Config.DefaultLocale
	default:
		return domtpl.LocaleZhTW
	}
}
// lookupIdempotent reports whether a notification already exists for the
// request's tenant, kind and idempotency key.
//
// The idempotency cache (when configured) is consulted first; a non-empty
// cached payload is decoded and returned. Otherwise the repository is queried,
// and a hit there is written back to the cache. The boolean result is true
// only when a DTO is returned. A notification.ErrNotFound from the repository
// is reported as (nil, false, nil); cache read/decode failures and other
// repository errors are returned as errors.
func (uc *notifierUseCase) lookupIdempotent(ctx context.Context, req *domusecase.SendRequest) (*domusecase.NotificationDTO, bool, error) {
	if uc.Idempotency != nil {
		cacheKey := notification.GetIdempotencyRedisKey(req.TenantID, string(req.Kind), req.IdempotencyKey)
		payload, readErr := uc.Idempotency.Get(ctx, cacheKey)
		if readErr != nil {
			return nil, false, wrapStoreErr(readErr, "idempotency cache read failed")
		}
		if len(payload) != 0 {
			cached := new(domusecase.NotificationDTO)
			if decodeErr := json.Unmarshal(payload, cached); decodeErr != nil {
				return nil, false, errb.DBDataConvert("idempotency cache decode failed").WithCause(decodeErr)
			}
			return cached, true, nil
		}
		// Empty payload: treat as a cache miss and fall through to the repository.
	}

	stored, findErr := uc.Repo.FindByIdempotency(ctx, req.TenantID, req.Kind, req.IdempotencyKey)
	switch {
	case findErr == nil:
		// Found; handled below.
	case errors.Is(findErr, notification.ErrNotFound):
		return nil, false, nil
	default:
		return nil, false, wrapRepoErr(findErr)
	}

	fromRepo := entityToDTO(stored)
	uc.cacheIdempotent(ctx, req, fromRepo)
	return fromRepo, true, nil
}
// cacheIdempotent stores dto, JSON-encoded, in the idempotency cache under the
// request's idempotency key with idempotencyCacheTTL.
//
// It is best effort: it does nothing when no cache is configured or dto is
// nil, and encoding or cache-write failures are deliberately dropped.
func (uc *notifierUseCase) cacheIdempotent(ctx context.Context, req *domusecase.SendRequest, dto *domusecase.NotificationDTO) {
	if dto == nil || uc.Idempotency == nil {
		return
	}

	payload, encodeErr := json.Marshal(dto)
	if encodeErr != nil {
		return
	}

	cacheKey := notification.GetIdempotencyRedisKey(req.TenantID, string(req.Kind), req.IdempotencyKey)
	// The write error is intentionally ignored: a failed cache write is not
	// reported to the caller.
	_ = uc.Idempotency.Set(ctx, cacheKey, payload, idempotencyCacheTTL)
}
// checkQuota increments the per-tenant, per-channel counter for the current
// UTC day and returns an insufficient-quota error once the incremented value
// exceeds the configured limit.
//
// No check is made (and nothing is incremented) when no Quota counter is
// configured or when the channel's limit is zero or negative. The counter is
// incremented before the comparison, so rejected requests are counted too.
func (uc *notifierUseCase) checkQuota(ctx context.Context, req *domusecase.SendRequest) error {
	if uc.Quota == nil {
		return nil
	}

	dailyLimit := int64(uc.quotaLimit(req.Channel))
	if dailyLimit <= 0 {
		return nil
	}

	today := time.Now().UTC().Format("20060102")
	quotaKey := notification.GetQuotaRedisKey(req.TenantID, string(req.Channel), today)
	used, incrErr := uc.Quota.Incr(ctx, quotaKey, quotaKeyTTL)
	if incrErr != nil {
		return wrapStoreErr(incrErr, "quota check failed")
	}

	if used <= dailyLimit {
		return nil
	}
	return errb.ResInsufficientQuota("notification daily quota exceeded")
}
// quotaLimit returns the configured RatePerTenant limit for ch: the Email
// limit for enum.ChannelEmail, the SMS limit for enum.ChannelSMS, and 0 for
// any other channel.
func (uc *notifierUseCase) quotaLimit(ch enum.Channel) int {
	if ch == enum.ChannelEmail {
		return uc.Config.RatePerTenant.Email
	}
	if ch == enum.ChannelSMS {
		return uc.Config.RatePerTenant.SMS
	}
	return 0
}
// markFailed persists a failed status (one attempt, cause as the last error)
// for doc and returns the internal "template render failed" error wrapping
// cause. If persisting the status fails as well, that error is chained onto
// cause so both are reported.
func (uc *notifierUseCase) markFailed(ctx context.Context, doc *domentity.Notification, cause error) error {
	failure := &domrepo.NotificationDeliveryUpdate{
		Status:    enum.NotifyStatusFailed,
		LastError: cause.Error(),
		Attempts:  1,
	}

	updateErr := uc.Repo.UpdateDelivery(ctx, doc.TenantID, doc.ID.Hex(), failure)
	if updateErr != nil {
		cause = fmt.Errorf("%w; persist failed status: %w", cause, updateErr)
	}

	return errb.SvcInternal("notification template render failed").WithCause(cause)
}
// validateSendRequest checks the fields Send and Enqueue rely on and returns
// the first violation found, or nil when req is acceptable.
//
// Checks run in this order: req non-nil, tenant ID present, idempotency key
// present, channel valid, target present, kind present, kind valid.
func validateSendRequest(req *domusecase.SendRequest) error {
	switch {
	case req == nil:
		return errb.InputMissingRequired("send request is required")
	case req.TenantID == "":
		return errb.InputMissingRequired("tenant_id is required")
	case req.IdempotencyKey == "":
		return errb.InputMissingRequired("idempotency_key is required")
	case !req.Channel.IsValid():
		return errb.InputInvalidFormat("invalid notification channel")
	case req.Target == "":
		return errb.InputMissingRequired("target is required")
	case req.Kind == "":
		return errb.InputMissingRequired("kind is required")
	case !req.Kind.IsValid():
		return errb.InputInvalidFormat("invalid notification kind")
	}
	return nil
}