package usecase import ( "context" domentity "gateway/internal/model/notification/domain/entity" "gateway/internal/model/notification/domain/enum" domrepo "gateway/internal/model/notification/domain/repository" domusecase "gateway/internal/model/notification/domain/usecase" ) type adminNotifierUseCase struct { repo domrepo.NotificationRepository dlq domrepo.NotificationDLQRepository queue domrepo.RetryQueue } // AdminNotifierUseCaseParam wires admin notification operations. type AdminNotifierUseCaseParam struct { Repo domrepo.NotificationRepository DLQ domrepo.NotificationDLQRepository Queue domrepo.RetryQueue } // NewAdminNotifierUseCase constructs AdminNotifierUseCase. func NewAdminNotifierUseCase(param AdminNotifierUseCaseParam) domusecase.AdminNotifierUseCase { return &adminNotifierUseCase{ repo: param.Repo, dlq: param.DLQ, queue: param.Queue, } } func (uc *adminNotifierUseCase) ListDLQ(ctx context.Context, tenantID string, limit int64) ([]*domusecase.DLQEntryDTO, error) { if tenantID == "" { return nil, errb.InputMissingRequired("tenant_id is required") } rows, err := uc.dlq.ListByTenant(ctx, tenantID, limit) if err != nil { return nil, errb.DBError("list notification dlq failed").WithCause(err) } out := make([]*domusecase.DLQEntryDTO, 0, len(rows)) for _, row := range rows { out = append(out, dlqToDTO(row)) } return out, nil } func (uc *adminNotifierUseCase) RetryDLQ(ctx context.Context, tenantID, dlqID, target string) (*domusecase.NotificationDTO, error) { if tenantID == "" || dlqID == "" { return nil, errb.InputMissingRequired("tenant_id and dlq_id are required") } if target == "" { return nil, errb.InputMissingRequired("target is required to retry dlq delivery") } if uc.queue == nil { return nil, errb.SysInternal("retry queue is not configured") } row, err := uc.dlq.FindByID(ctx, tenantID, dlqID) if err != nil { return nil, wrapRepoErr(err) } if row.Payload == nil { return nil, errb.ResInvalidState("dlq entry has no retry payload") } doc, err := uc.repo.FindByID(ctx, tenantID, row.NotificationID) if err != nil { return nil, wrapRepoErr(err) } if err := uc.repo.UpdateDelivery(ctx, tenantID, doc.ID.Hex(), &domrepo.NotificationDeliveryUpdate{ Status: enum.NotifyStatusPending, Attempts: 0, LastError: "", }); err != nil { return nil, wrapRepoErr(err) } job := &domusecase.RetryJob{ NotificationID: row.NotificationID, TenantID: tenantID, UID: row.UID, Channel: row.Channel, Kind: row.Kind, Target: target, Locale: row.Payload.Locale, Data: row.Payload.Data, DoNotPersistBody: row.Payload.DoNotPersistBody, } if err := ScheduleImmediate(ctx, uc.queue, job); err != nil { return nil, errb.SysInternal("schedule dlq retry failed").WithCause(err) } doc.Status = enum.NotifyStatusPending doc.Attempts = 0 doc.LastError = "" return entityToDTO(doc), nil } func dlqToDTO(row *domentity.NotificationDLQ) *domusecase.DLQEntryDTO { if row == nil { return nil } dto := &domusecase.DLQEntryDTO{ ID: row.ID.Hex(), NotificationID: row.NotificationID, TenantID: row.TenantID, UID: row.UID, Channel: string(row.Channel), Kind: string(row.Kind), TargetHash: row.TargetHash, LastError: row.LastError, Attempts: row.Attempts, HasRetryPayload: row.Payload != nil, } if row.OccurredAt != nil { dto.OccurredAt = *row.OccurredAt } return dto }