package usecase import ( "context" "errors" "fmt" "sync" "time" "github.com/zeromicro/go-zero/core/logx" "gateway/internal/model/notification" notifconfig "gateway/internal/model/notification/config" domentity "gateway/internal/model/notification/domain/entity" "gateway/internal/model/notification/domain/enum" domrepo "gateway/internal/model/notification/domain/repository" domtpl "gateway/internal/model/notification/domain/template" domusecase "gateway/internal/model/notification/domain/usecase" "gateway/internal/model/notification/provider/email" "gateway/internal/model/notification/provider/sms" ) const defaultPollInterval = time.Second // RetryWorker processes async notification jobs from the retry queue. type RetryWorker struct { Repo domrepo.NotificationRepository DLQ domrepo.NotificationDLQRepository Queue domrepo.RetryQueue delivery deliveryDeps Config notifconfig.Config pollInterval time.Duration } // RetryWorkerParam wires async retry + DLQ processing. type RetryWorkerParam struct { Repo domrepo.NotificationRepository DLQ domrepo.NotificationDLQRepository Queue domrepo.RetryQueue Renderer domtpl.Renderer Email *email.Chain SMS *sms.Chain Config notifconfig.Config PollInterval time.Duration } // NewRetryWorker constructs a RetryWorker. func NewRetryWorker(param RetryWorkerParam) *RetryWorker { interval := param.PollInterval if interval <= 0 { interval = defaultPollInterval } return &RetryWorker{ Repo: param.Repo, DLQ: param.DLQ, Queue: param.Queue, delivery: deliveryDeps{ Renderer: param.Renderer, Email: param.Email, SMS: param.SMS, Config: param.Config, }, Config: param.Config, pollInterval: interval, } } // Run polls the retry queue until ctx is cancelled. Spawns Worker goroutines per tick. func (w *RetryWorker) Run(ctx context.Context) { workers := w.Config.Async.Worker if workers < 1 { workers = 1 } for { select { case <-ctx.Done(): return default: jobs, err := w.Queue.ClaimDue(ctx, time.Now().UTC().UnixMilli(), 32) if err != nil { logx.WithContext(ctx).Errorf("notification retry: claim due: %v", err) w.sleep(ctx) continue } if len(jobs) == 0 { w.sleep(ctx) continue } var wg sync.WaitGroup sem := make(chan struct{}, workers) for _, job := range jobs { wg.Add(1) sem <- struct{}{} go func(j *domusecase.RetryJob) { defer wg.Done() defer func() { <-sem }() if err := w.ProcessJob(ctx, j); err != nil { logx.WithContext(ctx).Errorf("notification retry: job %s: %v", j.NotificationID, err) } }(job) } wg.Wait() } } } func (w *RetryWorker) sleep(ctx context.Context) { t := time.NewTimer(w.pollInterval) defer t.Stop() select { case <-ctx.Done(): case <-t.C: } } // ProcessJob delivers one notification or schedules retry / DLQ. func (w *RetryWorker) ProcessJob(ctx context.Context, job *domusecase.RetryJob) error { if job == nil { return fmt.Errorf("notification: retry job is nil") } doc, err := w.Repo.FindByID(ctx, job.TenantID, job.NotificationID) if err != nil { if errors.Is(err, notification.ErrNotFound) { return nil } return wrapRepoErr(err, "read notification failed") } if doc.Status == enum.NotifyStatusSent || doc.Status == enum.NotifyStatusDropped { return nil } req := sendRequestFromJob(job) locale := req.Locale if locale == "" { locale = doc.Locale } rendered, err := w.delivery.Renderer.Render(req.Kind, locale, req.Data) if err != nil { return w.handleFailure(ctx, doc, job, err) } provider, providerMsgID, sendErr := w.delivery.deliver(ctx, req, rendered) attempts := doc.Attempts + 1 if sendErr != nil { return w.handleFailure(ctx, doc, job, fmt.Errorf("%s: %w", provider, sendErr)) } now := time.Now().UTC().UnixNano() body := "" if !req.DoNotPersistBody { body = rendered.Body } if err := w.Repo.UpdateDelivery(ctx, doc.TenantID, doc.ID.Hex(), &domrepo.NotificationDeliveryUpdate{ Status: enum.NotifyStatusSent, Provider: provider, ProviderMessageID: providerMsgID, Attempts: attempts, Body: body, DeliveredAt: &now, }); err != nil { return wrapRepoErr(err, "update notification delivery failed") } return nil } func (w *RetryWorker) handleFailure(ctx context.Context, doc *domentity.Notification, job *domusecase.RetryJob, cause error) error { attempts := doc.Attempts + 1 maxRetry := effectiveMaxRetry(w.Config.Async) lastErr := cause.Error() if attempts >= maxRetry { if err := w.Repo.UpdateDelivery(ctx, doc.TenantID, doc.ID.Hex(), &domrepo.NotificationDeliveryUpdate{ Status: enum.NotifyStatusDropped, LastError: lastErr, Attempts: attempts, }); err != nil { return wrapRepoErr(err, "update notification delivery failed") } return w.insertDLQ(ctx, doc, job, lastErr, attempts) } if err := w.Repo.UpdateDelivery(ctx, doc.TenantID, doc.ID.Hex(), &domrepo.NotificationDeliveryUpdate{ Status: enum.NotifyStatusRetrying, LastError: lastErr, Attempts: attempts, }); err != nil { return wrapRepoErr(err, "update notification delivery failed") } runAt := time.Now().UTC().Add(retryDelay(w.Config.Async, attempts)).UnixMilli() if err := w.Queue.Schedule(ctx, runAt, job); err != nil { return wrapStoreErr(err, "schedule notification retry failed") } return nil } func (w *RetryWorker) insertDLQ( ctx context.Context, doc *domentity.Notification, job *domusecase.RetryJob, lastErr string, attempts int, ) error { if w.DLQ == nil { return nil } now := time.Now().UTC().UnixNano() var payload *domentity.DLQDeliveryPayload if job != nil { payload = &domentity.DLQDeliveryPayload{ Locale: job.Locale, Data: job.Data, IdempotencyKey: doc.IdempotencyKey, DoNotPersistBody: job.DoNotPersistBody, } } return wrapRepoErr(w.DLQ.Insert(ctx, &domentity.NotificationDLQ{ NotificationID: doc.ID.Hex(), TenantID: doc.TenantID, UID: doc.UID, Channel: doc.Channel, Kind: doc.Kind, TargetHash: doc.TargetHash, LastError: lastErr, Attempts: attempts, Payload: payload, OccurredAt: doc.OccurredAt, CreateAt: &now, }), "insert notification dlq failed") } // ScheduleImmediate enqueues a job to run now (used by Enqueue). func ScheduleImmediate(ctx context.Context, queue domrepo.RetryQueue, job *domusecase.RetryJob) error { if queue == nil { return errb.SysNotImplemented("notification: retry queue is not configured") } return queue.Schedule(ctx, time.Now().UTC().UnixMilli(), job) }