2026-05-20 07:01:08 +00:00
|
|
|
package usecase
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
|
|
|
|
|
|
|
|
"gateway/internal/model/notification"
|
|
|
|
|
notifconfig "gateway/internal/model/notification/config"
|
|
|
|
|
domentity "gateway/internal/model/notification/domain/entity"
|
|
|
|
|
"gateway/internal/model/notification/domain/enum"
|
|
|
|
|
domrepo "gateway/internal/model/notification/domain/repository"
|
|
|
|
|
domtpl "gateway/internal/model/notification/domain/template"
|
|
|
|
|
domusecase "gateway/internal/model/notification/domain/usecase"
|
|
|
|
|
"gateway/internal/model/notification/provider/email"
|
|
|
|
|
"gateway/internal/model/notification/provider/sms"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// defaultPollInterval is the wait between retry-queue polls that NewRetryWorker
// falls back to when the supplied PollInterval is zero or negative.
const defaultPollInterval = time.Second
|
|
|
|
|
|
|
|
|
|
// RetryWorker processes async notification jobs from the retry queue.
|
|
|
|
|
type RetryWorker struct {
|
|
|
|
|
Repo domrepo.NotificationRepository
|
|
|
|
|
DLQ domrepo.NotificationDLQRepository
|
|
|
|
|
Queue domrepo.RetryQueue
|
|
|
|
|
delivery deliveryDeps
|
|
|
|
|
Config notifconfig.Config
|
|
|
|
|
|
|
|
|
|
pollInterval time.Duration
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// RetryWorkerParam wires async retry + DLQ processing.
|
|
|
|
|
type RetryWorkerParam struct {
|
|
|
|
|
Repo domrepo.NotificationRepository
|
|
|
|
|
DLQ domrepo.NotificationDLQRepository
|
|
|
|
|
Queue domrepo.RetryQueue
|
|
|
|
|
Renderer domtpl.Renderer
|
|
|
|
|
Email *email.Chain
|
|
|
|
|
SMS *sms.Chain
|
|
|
|
|
Config notifconfig.Config
|
|
|
|
|
PollInterval time.Duration
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewRetryWorker constructs a RetryWorker.
|
|
|
|
|
func NewRetryWorker(param RetryWorkerParam) *RetryWorker {
|
|
|
|
|
interval := param.PollInterval
|
|
|
|
|
if interval <= 0 {
|
|
|
|
|
interval = defaultPollInterval
|
|
|
|
|
}
|
|
|
|
|
return &RetryWorker{
|
|
|
|
|
Repo: param.Repo,
|
|
|
|
|
DLQ: param.DLQ,
|
|
|
|
|
Queue: param.Queue,
|
|
|
|
|
delivery: deliveryDeps{
|
|
|
|
|
Renderer: param.Renderer,
|
|
|
|
|
Email: param.Email,
|
|
|
|
|
SMS: param.SMS,
|
|
|
|
|
Config: param.Config,
|
|
|
|
|
},
|
|
|
|
|
Config: param.Config,
|
|
|
|
|
pollInterval: interval,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Run polls the retry queue until ctx is cancelled. Spawns Worker goroutines per tick.
|
|
|
|
|
func (w *RetryWorker) Run(ctx context.Context) {
|
|
|
|
|
workers := w.Config.Async.Worker
|
|
|
|
|
if workers < 1 {
|
|
|
|
|
workers = 1
|
|
|
|
|
}
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
default:
|
|
|
|
|
jobs, err := w.Queue.ClaimDue(ctx, time.Now().UTC().UnixMilli(), 32)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logx.WithContext(ctx).Errorf("notification retry: claim due: %v", err)
|
|
|
|
|
w.sleep(ctx)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if len(jobs) == 0 {
|
|
|
|
|
w.sleep(ctx)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
sem := make(chan struct{}, workers)
|
|
|
|
|
for _, job := range jobs {
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
sem <- struct{}{}
|
|
|
|
|
go func(j *domusecase.RetryJob) {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
defer func() { <-sem }()
|
|
|
|
|
if err := w.ProcessJob(ctx, j); err != nil {
|
|
|
|
|
logx.WithContext(ctx).Errorf("notification retry: job %s: %v", j.NotificationID, err)
|
|
|
|
|
}
|
|
|
|
|
}(job)
|
|
|
|
|
}
|
|
|
|
|
wg.Wait()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (w *RetryWorker) sleep(ctx context.Context) {
|
|
|
|
|
t := time.NewTimer(w.pollInterval)
|
|
|
|
|
defer t.Stop()
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
case <-t.C:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ProcessJob delivers one notification or schedules retry / DLQ.
|
|
|
|
|
func (w *RetryWorker) ProcessJob(ctx context.Context, job *domusecase.RetryJob) error {
|
|
|
|
|
if job == nil {
|
|
|
|
|
return fmt.Errorf("notification: retry job is nil")
|
|
|
|
|
}
|
|
|
|
|
doc, err := w.Repo.FindByID(ctx, job.TenantID, job.NotificationID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if errors.Is(err, notification.ErrNotFound) {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
2026-05-21 06:45:35 +00:00
|
|
|
return wrapRepoErr(err, "read notification failed")
|
2026-05-20 07:01:08 +00:00
|
|
|
}
|
|
|
|
|
if doc.Status == enum.NotifyStatusSent || doc.Status == enum.NotifyStatusDropped {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
req := sendRequestFromJob(job)
|
|
|
|
|
locale := req.Locale
|
|
|
|
|
if locale == "" {
|
|
|
|
|
locale = doc.Locale
|
|
|
|
|
}
|
|
|
|
|
rendered, err := w.delivery.Renderer.Render(req.Kind, locale, req.Data)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return w.handleFailure(ctx, doc, job, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
provider, providerMsgID, sendErr := w.delivery.deliver(ctx, req, rendered)
|
|
|
|
|
attempts := doc.Attempts + 1
|
|
|
|
|
if sendErr != nil {
|
|
|
|
|
return w.handleFailure(ctx, doc, job, fmt.Errorf("%s: %w", provider, sendErr))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
now := time.Now().UTC().UnixNano()
|
|
|
|
|
body := ""
|
|
|
|
|
if !req.DoNotPersistBody {
|
|
|
|
|
body = rendered.Body
|
|
|
|
|
}
|
|
|
|
|
if err := w.Repo.UpdateDelivery(ctx, doc.TenantID, doc.ID.Hex(), &domrepo.NotificationDeliveryUpdate{
|
|
|
|
|
Status: enum.NotifyStatusSent,
|
|
|
|
|
Provider: provider,
|
|
|
|
|
ProviderMessageID: providerMsgID,
|
|
|
|
|
Attempts: attempts,
|
|
|
|
|
Body: body,
|
|
|
|
|
DeliveredAt: &now,
|
|
|
|
|
}); err != nil {
|
2026-05-21 06:45:35 +00:00
|
|
|
return wrapRepoErr(err, "update notification delivery failed")
|
2026-05-20 07:01:08 +00:00
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (w *RetryWorker) handleFailure(ctx context.Context, doc *domentity.Notification, job *domusecase.RetryJob, cause error) error {
|
|
|
|
|
attempts := doc.Attempts + 1
|
|
|
|
|
maxRetry := effectiveMaxRetry(w.Config.Async)
|
|
|
|
|
lastErr := cause.Error()
|
|
|
|
|
|
|
|
|
|
if attempts >= maxRetry {
|
|
|
|
|
if err := w.Repo.UpdateDelivery(ctx, doc.TenantID, doc.ID.Hex(), &domrepo.NotificationDeliveryUpdate{
|
|
|
|
|
Status: enum.NotifyStatusDropped,
|
|
|
|
|
LastError: lastErr,
|
|
|
|
|
Attempts: attempts,
|
|
|
|
|
}); err != nil {
|
2026-05-21 06:45:35 +00:00
|
|
|
return wrapRepoErr(err, "update notification delivery failed")
|
2026-05-20 07:01:08 +00:00
|
|
|
}
|
|
|
|
|
return w.insertDLQ(ctx, doc, job, lastErr, attempts)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := w.Repo.UpdateDelivery(ctx, doc.TenantID, doc.ID.Hex(), &domrepo.NotificationDeliveryUpdate{
|
|
|
|
|
Status: enum.NotifyStatusRetrying,
|
|
|
|
|
LastError: lastErr,
|
|
|
|
|
Attempts: attempts,
|
|
|
|
|
}); err != nil {
|
2026-05-21 06:45:35 +00:00
|
|
|
return wrapRepoErr(err, "update notification delivery failed")
|
2026-05-20 07:01:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
runAt := time.Now().UTC().Add(retryDelay(w.Config.Async, attempts)).UnixMilli()
|
2026-05-21 06:45:35 +00:00
|
|
|
if err := w.Queue.Schedule(ctx, runAt, job); err != nil {
|
|
|
|
|
return wrapStoreErr(err, "schedule notification retry failed")
|
|
|
|
|
}
|
|
|
|
|
return nil
|
2026-05-20 07:01:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (w *RetryWorker) insertDLQ(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
doc *domentity.Notification,
|
|
|
|
|
job *domusecase.RetryJob,
|
|
|
|
|
lastErr string,
|
|
|
|
|
attempts int,
|
|
|
|
|
) error {
|
|
|
|
|
if w.DLQ == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
now := time.Now().UTC().UnixNano()
|
|
|
|
|
var payload *domentity.DLQDeliveryPayload
|
|
|
|
|
if job != nil {
|
|
|
|
|
payload = &domentity.DLQDeliveryPayload{
|
|
|
|
|
Locale: job.Locale,
|
|
|
|
|
Data: job.Data,
|
|
|
|
|
IdempotencyKey: doc.IdempotencyKey,
|
|
|
|
|
DoNotPersistBody: job.DoNotPersistBody,
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-05-21 06:45:35 +00:00
|
|
|
return wrapRepoErr(w.DLQ.Insert(ctx, &domentity.NotificationDLQ{
|
2026-05-20 07:01:08 +00:00
|
|
|
NotificationID: doc.ID.Hex(),
|
|
|
|
|
TenantID: doc.TenantID,
|
|
|
|
|
UID: doc.UID,
|
|
|
|
|
Channel: doc.Channel,
|
|
|
|
|
Kind: doc.Kind,
|
|
|
|
|
TargetHash: doc.TargetHash,
|
|
|
|
|
LastError: lastErr,
|
|
|
|
|
Attempts: attempts,
|
|
|
|
|
Payload: payload,
|
|
|
|
|
OccurredAt: doc.OccurredAt,
|
|
|
|
|
CreateAt: &now,
|
2026-05-21 06:45:35 +00:00
|
|
|
}), "insert notification dlq failed")
|
2026-05-20 07:01:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ScheduleImmediate enqueues a job to run now (used by Enqueue).
|
|
|
|
|
func ScheduleImmediate(ctx context.Context, queue domrepo.RetryQueue, job *domusecase.RetryJob) error {
|
|
|
|
|
if queue == nil {
|
2026-05-21 06:45:35 +00:00
|
|
|
return errb.SysNotImplemented("notification: retry queue is not configured")
|
2026-05-20 07:01:08 +00:00
|
|
|
}
|
|
|
|
|
return queue.Schedule(ctx, time.Now().UTC().UnixMilli(), job)
|
|
|
|
|
}
|