2026-05-20 07:01:08 +00:00
|
|
|
package usecase
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
|
|
|
|
|
domentity "gateway/internal/model/notification/domain/entity"
|
|
|
|
|
"gateway/internal/model/notification/domain/enum"
|
|
|
|
|
domrepo "gateway/internal/model/notification/domain/repository"
|
|
|
|
|
domusecase "gateway/internal/model/notification/domain/usecase"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type adminNotifierUseCase struct {
|
|
|
|
|
repo domrepo.NotificationRepository
|
|
|
|
|
dlq domrepo.NotificationDLQRepository
|
|
|
|
|
queue domrepo.RetryQueue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// AdminNotifierUseCaseParam wires admin notification operations.
|
|
|
|
|
type AdminNotifierUseCaseParam struct {
|
|
|
|
|
Repo domrepo.NotificationRepository
|
|
|
|
|
DLQ domrepo.NotificationDLQRepository
|
|
|
|
|
Queue domrepo.RetryQueue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewAdminNotifierUseCase constructs AdminNotifierUseCase.
|
|
|
|
|
func NewAdminNotifierUseCase(param AdminNotifierUseCaseParam) domusecase.AdminNotifierUseCase {
|
|
|
|
|
return &adminNotifierUseCase{
|
|
|
|
|
repo: param.Repo,
|
|
|
|
|
dlq: param.DLQ,
|
|
|
|
|
queue: param.Queue,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (uc *adminNotifierUseCase) ListDLQ(ctx context.Context, tenantID string, limit int64) ([]*domusecase.DLQEntryDTO, error) {
|
|
|
|
|
if tenantID == "" {
|
|
|
|
|
return nil, errb.InputMissingRequired("tenant_id is required")
|
|
|
|
|
}
|
|
|
|
|
rows, err := uc.dlq.ListByTenant(ctx, tenantID, limit)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, errb.DBError("list notification dlq failed").WithCause(err)
|
|
|
|
|
}
|
|
|
|
|
out := make([]*domusecase.DLQEntryDTO, 0, len(rows))
|
|
|
|
|
for _, row := range rows {
|
|
|
|
|
out = append(out, dlqToDTO(row))
|
|
|
|
|
}
|
|
|
|
|
return out, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (uc *adminNotifierUseCase) RetryDLQ(ctx context.Context, tenantID, dlqID, target string) (*domusecase.NotificationDTO, error) {
|
|
|
|
|
if tenantID == "" || dlqID == "" {
|
|
|
|
|
return nil, errb.InputMissingRequired("tenant_id and dlq_id are required")
|
|
|
|
|
}
|
|
|
|
|
if target == "" {
|
|
|
|
|
return nil, errb.InputMissingRequired("target is required to retry dlq delivery")
|
|
|
|
|
}
|
|
|
|
|
if uc.queue == nil {
|
2026-05-21 06:45:35 +00:00
|
|
|
return nil, errb.SysNotImplemented("retry queue is not configured")
|
2026-05-20 07:01:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
row, err := uc.dlq.FindByID(ctx, tenantID, dlqID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, wrapRepoErr(err)
|
|
|
|
|
}
|
|
|
|
|
if row.Payload == nil {
|
|
|
|
|
return nil, errb.ResInvalidState("dlq entry has no retry payload")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
doc, err := uc.repo.FindByID(ctx, tenantID, row.NotificationID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, wrapRepoErr(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := uc.repo.UpdateDelivery(ctx, tenantID, doc.ID.Hex(), &domrepo.NotificationDeliveryUpdate{
|
|
|
|
|
Status: enum.NotifyStatusPending,
|
|
|
|
|
Attempts: 0,
|
|
|
|
|
LastError: "",
|
|
|
|
|
}); err != nil {
|
|
|
|
|
return nil, wrapRepoErr(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
job := &domusecase.RetryJob{
|
|
|
|
|
NotificationID: row.NotificationID,
|
|
|
|
|
TenantID: tenantID,
|
|
|
|
|
UID: row.UID,
|
|
|
|
|
Channel: row.Channel,
|
|
|
|
|
Kind: row.Kind,
|
|
|
|
|
Target: target,
|
|
|
|
|
Locale: row.Payload.Locale,
|
|
|
|
|
Data: row.Payload.Data,
|
|
|
|
|
DoNotPersistBody: row.Payload.DoNotPersistBody,
|
|
|
|
|
}
|
|
|
|
|
if err := ScheduleImmediate(ctx, uc.queue, job); err != nil {
|
2026-05-21 06:45:35 +00:00
|
|
|
return nil, wrapStoreErr(err, "schedule dlq retry failed")
|
2026-05-20 07:01:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
doc.Status = enum.NotifyStatusPending
|
|
|
|
|
doc.Attempts = 0
|
|
|
|
|
doc.LastError = ""
|
|
|
|
|
return entityToDTO(doc), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func dlqToDTO(row *domentity.NotificationDLQ) *domusecase.DLQEntryDTO {
|
|
|
|
|
if row == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
dto := &domusecase.DLQEntryDTO{
|
|
|
|
|
ID: row.ID.Hex(),
|
|
|
|
|
NotificationID: row.NotificationID,
|
|
|
|
|
TenantID: row.TenantID,
|
|
|
|
|
UID: row.UID,
|
|
|
|
|
Channel: string(row.Channel),
|
|
|
|
|
Kind: string(row.Kind),
|
|
|
|
|
TargetHash: row.TargetHash,
|
|
|
|
|
LastError: row.LastError,
|
|
|
|
|
Attempts: row.Attempts,
|
|
|
|
|
HasRetryPayload: row.Payload != nil,
|
|
|
|
|
}
|
|
|
|
|
if row.OccurredAt != nil {
|
|
|
|
|
dto.OccurredAt = *row.OccurredAt
|
|
|
|
|
}
|
|
|
|
|
return dto
|
|
|
|
|
}
|