template-monorepo/internal/model/notification/usecase/retry_worker.go

237 lines
6.2 KiB
Go
Raw Normal View History

package usecase
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/zeromicro/go-zero/core/logx"
"gateway/internal/model/notification"
notifconfig "gateway/internal/model/notification/config"
domentity "gateway/internal/model/notification/domain/entity"
"gateway/internal/model/notification/domain/enum"
domrepo "gateway/internal/model/notification/domain/repository"
domtpl "gateway/internal/model/notification/domain/template"
domusecase "gateway/internal/model/notification/domain/usecase"
"gateway/internal/model/notification/provider/email"
"gateway/internal/model/notification/provider/sms"
)
// defaultPollInterval is the poll interval NewRetryWorker falls back to when
// RetryWorkerParam.PollInterval is zero or negative.
const defaultPollInterval = time.Second
// RetryWorker processes async notification jobs from the retry queue.
//
// Run claims due jobs from Queue and hands each one to ProcessJob, which
// either delivers the notification, reschedules it on Queue, or marks it
// dropped and records it in DLQ.
type RetryWorker struct {
// Repo loads notification documents (FindByID) and persists delivery
// outcomes (UpdateDelivery).
Repo domrepo.NotificationRepository
// DLQ receives an entry for each notification that is dropped after its
// attempts reach the retry limit. It may be nil, in which case the DLQ
// insert is skipped.
DLQ domrepo.NotificationDLQRepository
// Queue supplies due jobs (ClaimDue) and accepts rescheduled ones (Schedule).
Queue domrepo.RetryQueue
// delivery bundles the renderer, provider chains and config used to render
// and send a notification.
delivery deliveryDeps
// Config is the notification configuration; Config.Async is read for the
// worker count and is passed to the retry-limit and retry-delay helpers.
Config notifconfig.Config
// pollInterval is how long Run sleeps when claiming fails or no job is due.
pollInterval time.Duration
}
// RetryWorkerParam wires async retry + DLQ processing.
//
// It is the input to NewRetryWorker, which copies these dependencies into a
// RetryWorker.
type RetryWorkerParam struct {
// Repo, DLQ and Queue are copied to the RetryWorker fields of the same name.
Repo domrepo.NotificationRepository
DLQ domrepo.NotificationDLQRepository
Queue domrepo.RetryQueue
// Renderer, Email and SMS are placed into the worker's delivery dependencies.
Renderer domtpl.Renderer
Email *email.Chain
SMS *sms.Chain
// Config is copied both to the worker and to its delivery dependencies.
Config notifconfig.Config
// PollInterval is the worker's poll interval; a zero or negative value
// selects defaultPollInterval.
PollInterval time.Duration
}
// NewRetryWorker builds a RetryWorker from param.
//
// The repository, DLQ and queue are copied as-is; the renderer and provider
// chains are grouped, together with the config, into the worker's delivery
// dependencies. A non-positive param.PollInterval is replaced by
// defaultPollInterval.
func NewRetryWorker(param RetryWorkerParam) *RetryWorker {
	worker := &RetryWorker{
		Repo:         param.Repo,
		DLQ:          param.DLQ,
		Queue:        param.Queue,
		Config:       param.Config,
		pollInterval: defaultPollInterval,
	}
	worker.delivery = deliveryDeps{
		Renderer: param.Renderer,
		Email:    param.Email,
		SMS:      param.SMS,
		Config:   param.Config,
	}
	// Only a strictly positive interval overrides the default.
	if param.PollInterval > 0 {
		worker.pollInterval = param.PollInterval
	}
	return worker
}
// Run polls the retry queue until ctx is cancelled.
//
// Each iteration claims up to claimBatchSize due jobs and processes them
// concurrently, with at most Config.Async.Worker (minimum 1) jobs in flight.
// The whole batch is awaited before the next claim. When claiming fails or
// returns no jobs, Run sleeps for the poll interval (or until ctx is done)
// before trying again. Per-job errors are logged and never stop the loop.
func (w *RetryWorker) Run(ctx context.Context) {
	// claimBatchSize is the maximum number of jobs claimed per iteration.
	const claimBatchSize = 32

	workers := w.Config.Async.Worker
	if workers < 1 {
		workers = 1
	}

	for {
		// Non-blocking cancellation check before each claim.
		select {
		case <-ctx.Done():
			return
		default:
		}

		jobs, err := w.Queue.ClaimDue(ctx, time.Now().UTC().UnixMilli(), claimBatchSize)
		if err != nil {
			logx.WithContext(ctx).Errorf("notification retry: claim due: %v", err)
			w.sleep(ctx)
			continue
		}
		if len(jobs) == 0 {
			w.sleep(ctx)
			continue
		}

		var wg sync.WaitGroup
		// sem bounds the number of jobs processed at the same time.
		sem := make(chan struct{}, workers)
		for _, job := range jobs {
			if job == nil {
				// ProcessJob rejects a nil job with an error, and the error
				// log below reads j.NotificationID, which would panic on a
				// nil pointer and take the whole worker down. Skip instead.
				logx.WithContext(ctx).Errorf("notification retry: skipping nil job")
				continue
			}
			wg.Add(1)
			sem <- struct{}{}
			go func(j *domusecase.RetryJob) {
				defer wg.Done()
				defer func() { <-sem }()
				if err := w.ProcessJob(ctx, j); err != nil {
					logx.WithContext(ctx).Errorf("notification retry: job %s: %v", j.NotificationID, err)
				}
			}(job)
		}
		wg.Wait()
	}
}
// sleep blocks for the worker's poll interval, returning early if ctx is
// done first.
func (w *RetryWorker) sleep(ctx context.Context) {
	timer := time.NewTimer(w.pollInterval)
	defer timer.Stop()

	select {
	case <-timer.C:
		// Interval elapsed.
	case <-ctx.Done():
		// Cancelled while waiting.
	}
}
// ProcessJob attempts delivery of the notification referenced by job.
//
// A nil job is an error. If the notification cannot be found, or is already
// sent or dropped, the job is treated as done and nil is returned. Otherwise
// the template is rendered (using the job's locale, falling back to the
// stored document's locale) and handed to the delivery dependencies. A render
// or send failure is passed to handleFailure, which decides between retry and
// DLQ. On success the document is updated to the sent status with the
// provider details, the incremented attempt count and the delivery time; the
// rendered body is stored unless the request asks not to persist it.
func (w *RetryWorker) ProcessJob(ctx context.Context, job *domusecase.RetryJob) error {
	if job == nil {
		return fmt.Errorf("notification: retry job is nil")
	}

	doc, err := w.Repo.FindByID(ctx, job.TenantID, job.NotificationID)
	switch {
	case err == nil:
		// Found; continue below.
	case errors.Is(err, notification.ErrNotFound):
		// Nothing left to deliver for this job.
		return nil
	default:
		return err
	}

	// Terminal states need no further work.
	switch doc.Status {
	case enum.NotifyStatusSent, enum.NotifyStatusDropped:
		return nil
	}

	req := sendRequestFromJob(job)
	locale := req.Locale
	if locale == "" {
		locale = doc.Locale
	}

	rendered, err := w.delivery.Renderer.Render(req.Kind, locale, req.Data)
	if err != nil {
		return w.handleFailure(ctx, doc, job, err)
	}

	provider, providerMsgID, sendErr := w.delivery.deliver(ctx, req, rendered)
	if sendErr != nil {
		return w.handleFailure(ctx, doc, job, fmt.Errorf("%s: %w", provider, sendErr))
	}

	deliveredAt := time.Now().UTC().UnixNano()
	var body string
	if !req.DoNotPersistBody {
		body = rendered.Body
	}

	return w.Repo.UpdateDelivery(ctx, doc.TenantID, doc.ID.Hex(), &domrepo.NotificationDeliveryUpdate{
		Status:            enum.NotifyStatusSent,
		Provider:          provider,
		ProviderMessageID: providerMsgID,
		Attempts:          doc.Attempts + 1,
		Body:              body,
		DeliveredAt:       &deliveredAt,
	})
}
// handleFailure records a failed delivery attempt for doc and decides what
// happens next.
//
// The attempt count is incremented and stored together with cause's message.
// If the new count has reached the limit returned by effectiveMaxRetry, the
// notification is marked dropped and an entry is written to the DLQ.
// Otherwise it is marked retrying and job is rescheduled on the queue after
// the delay returned by retryDelay. Any repository, DLQ or queue error is
// returned to the caller.
func (w *RetryWorker) handleFailure(ctx context.Context, doc *domentity.Notification, job *domusecase.RetryJob, cause error) error {
	attempts := doc.Attempts + 1
	exhausted := attempts >= effectiveMaxRetry(w.Config.Async)
	lastErr := cause.Error()

	update := &domrepo.NotificationDeliveryUpdate{
		LastError: lastErr,
		Attempts:  attempts,
	}
	if exhausted {
		update.Status = enum.NotifyStatusDropped
	} else {
		update.Status = enum.NotifyStatusRetrying
	}
	if err := w.Repo.UpdateDelivery(ctx, doc.TenantID, doc.ID.Hex(), update); err != nil {
		return err
	}

	if exhausted {
		return w.insertDLQ(ctx, doc, job, lastErr, attempts)
	}

	runAt := time.Now().UTC().Add(retryDelay(w.Config.Async, attempts)).UnixMilli()
	return w.Queue.Schedule(ctx, runAt, job)
}
// insertDLQ writes a dead-letter entry for doc.
//
// It is a no-op returning nil when no DLQ repository is configured. The entry
// copies the identifying fields from doc, the supplied lastErr and attempts,
// and the current UTC time in nanoseconds as its creation time. When job is
// non-nil, its locale, data and do-not-persist flag are attached as the
// payload together with doc's idempotency key; otherwise the payload is left
// nil.
func (w *RetryWorker) insertDLQ(
	ctx context.Context,
	doc *domentity.Notification,
	job *domusecase.RetryJob,
	lastErr string,
	attempts int,
) error {
	if w.DLQ == nil {
		return nil
	}

	createdAt := time.Now().UTC().UnixNano()
	entry := &domentity.NotificationDLQ{
		NotificationID: doc.ID.Hex(),
		TenantID:       doc.TenantID,
		UID:            doc.UID,
		Channel:        doc.Channel,
		Kind:           doc.Kind,
		TargetHash:     doc.TargetHash,
		LastError:      lastErr,
		Attempts:       attempts,
		OccurredAt:     doc.OccurredAt,
		CreateAt:       &createdAt,
	}
	if job != nil {
		entry.Payload = &domentity.DLQDeliveryPayload{
			Locale:           job.Locale,
			Data:             job.Data,
			IdempotencyKey:   doc.IdempotencyKey,
			DoNotPersistBody: job.DoNotPersistBody,
		}
	}
	return w.DLQ.Insert(ctx, entry)
}
// ScheduleImmediate puts job on queue with a run time of the current UTC
// instant in milliseconds, so it is due straight away (used by Enqueue).
// It returns an error when queue is nil, and otherwise whatever
// queue.Schedule returns.
func ScheduleImmediate(ctx context.Context, queue domrepo.RetryQueue, job *domusecase.RetryJob) error {
	if queue == nil {
		return fmt.Errorf("notification: retry queue is not configured")
	}
	nowMillis := time.Now().UTC().UnixMilli()
	return queue.Schedule(ctx, nowMillis, job)
}