backend/pkg/notification/usecase/delivery.go

348 lines
9.8 KiB
Go

package usecase
import (
"backend/pkg/notification/config"
"backend/pkg/notification/domain/entity"
"backend/pkg/notification/domain/repository"
"backend/pkg/notification/domain/usecase"
"context"
"fmt"
"math"
"sort"
"time"
errs "backend/pkg/library/errors"
)
// DeliveryUseCaseParam bundles the dependencies and settings handed to
// MustDeliveryUseCase when building a DeliveryUseCase.
type DeliveryUseCaseParam struct {
// SMSProviders are the SMS providers SendMessage may deliver through.
SMSProviders []usecase.SMSProvider
// EmailProviders are the email providers SendEmail may deliver through.
EmailProviders []usecase.EmailProvider
// DeliveryConfig holds the retry, backoff, timeout and history settings.
DeliveryConfig config.DeliveryConfig
HistoryRepo repository.HistoryRepository // optional history repository; history is only written when it is non-nil and DeliveryConfig.EnableHistory is set
Logger errs.Logger // logger used when reporting history and provider failures
}
// DeliveryUseCase is the notification delivery service. It sends SMS and
// email messages through the providers configured in its parameters.
type DeliveryUseCase struct {
// param is the configuration captured by MustDeliveryUseCase.
param DeliveryUseCaseParam
}
// MustDeliveryUseCase builds a delivery use case from param.
//
// Retry settings that are unset or non-positive are replaced with defaults,
// because such values cannot describe a working configuration (a negative
// MaxRetries would mean no send attempt at all, and a negative Timeout would
// produce a context that has already expired):
//
//   - MaxRetries:    3
//   - InitialDelay:  100ms
//   - BackoffFactor: 2.0
//   - MaxDelay:      30s
//   - Timeout:       30s
//
// param is received by value, so the caller's copy is not modified.
func MustDeliveryUseCase(param DeliveryUseCaseParam) usecase.DeliveryUseCase {
	cfg := &param.DeliveryConfig

	if cfg.MaxRetries <= 0 {
		cfg.MaxRetries = 3
	}
	if cfg.InitialDelay <= 0 {
		cfg.InitialDelay = 100 * time.Millisecond
	}
	if cfg.BackoffFactor <= 0 {
		cfg.BackoffFactor = 2.0
	}
	if cfg.MaxDelay <= 0 {
		cfg.MaxDelay = 30 * time.Second
	}
	if cfg.Timeout <= 0 {
		cfg.Timeout = 30 * time.Second
	}

	return &DeliveryUseCase{
		param: param,
	}
}
// SendMessage sends an SMS described by req through the configured SMS
// providers.
//
// A pending history record is built first and, when history is enabled and a
// repository is configured, persisted. A failure to persist it is only logged
// and does not stop the delivery. The returned error is whatever sendWithRetry
// reports.
func (use *DeliveryUseCase) SendMessage(ctx context.Context, req usecase.SMSMessageRequest) error {
	// Subject and AttemptCount are left at their zero values: an SMS has no
	// subject and no attempt has been made yet.
	record := &entity.DeliveryHistory{
		Type:      "sms",
		Recipient: req.PhoneNumber,
		Content:   req.MessageContent,
		Status:    entity.DeliveryStatusPending,
		CreatedAt: time.Now(),
		UpdatedAt: time.Now(),
	}

	historyEnabled := use.param.DeliveryConfig.EnableHistory && use.param.HistoryRepo != nil
	if historyEnabled {
		createErr := use.param.HistoryRepo.CreateHistory(ctx, record)
		if createErr != nil {
			// History is best effort: log the failure and keep going.
			fields := []errs.LogField{
				{Key: "type", Val: "sms"},
				{Key: "func", Val: "HistoryRepo.CreateHistory"},
				{Key: "err", Val: createErr.Error()},
			}
			_ = errs.DBErrorErrorL(use.param.Logger, fields, "Failed to create SMS history")
		}
	}

	sender := &smsProviderAdapter{
		providers: use.param.SMSProviders,
		request:   req,
	}
	return use.sendWithRetry(ctx, record, sender)
}
// SendEmail sends the email described by req through the configured email
// providers.
//
// A pending history record is built first (the recipient is stored as the
// default %v rendering of req.To) and, when history is enabled and a
// repository is configured, persisted. A failure to persist it is only logged
// and does not stop the delivery. The returned error is whatever sendWithRetry
// reports.
func (use *DeliveryUseCase) SendEmail(ctx context.Context, req usecase.MailReq) error {
	// AttemptCount is left at its zero value: no attempt has been made yet.
	record := &entity.DeliveryHistory{
		Type:      "email",
		Recipient: fmt.Sprintf("%v", req.To),
		Subject:   req.Subject,
		Content:   req.Body,
		Status:    entity.DeliveryStatusPending,
		CreatedAt: time.Now(),
		UpdatedAt: time.Now(),
	}

	historyEnabled := use.param.DeliveryConfig.EnableHistory && use.param.HistoryRepo != nil
	if historyEnabled {
		createErr := use.param.HistoryRepo.CreateHistory(ctx, record)
		if createErr != nil {
			// History is best effort: log the failure and keep going.
			fields := []errs.LogField{
				{Key: "type", Val: "email"},
				{Key: "func", Val: "HistoryRepo.CreateHistory"},
				{Key: "err", Val: createErr.Error()},
			}
			_ = errs.DBErrorErrorL(use.param.Logger, fields, "Failed to create email history")
		}
	}

	sender := &emailProviderAdapter{
		providers: use.param.EmailProviders,
		request:   req,
	}
	return use.sendWithRetry(ctx, record, sender)
}
// providerAdapter is the unified provider adapter interface. It lets
// sendWithRetry drive SMS and email providers through the same retry logic.
// Every index argument refers to a position in the adapter's provider list.
type providerAdapter interface {
// getProviderCount returns how many providers the adapter holds.
getProviderCount() int
// getProviderName returns the name recorded in history for the provider at index.
getProviderName(index int) string
// getProviderSort returns the sort key of the provider at index; lower values are tried first.
getProviderSort(index int) int64
// send performs one delivery attempt through the provider at providerIndex.
send(ctx context.Context, providerIndex int) error
// getType returns the channel label used in log and error messages.
getType() string
}
// smsProviderAdapter is the SMS provider adapter: it presents the SMS
// providers, together with the one request being sent, as a providerAdapter.
type smsProviderAdapter struct {
	providers []usecase.SMSProvider
	request   usecase.SMSMessageRequest
}

// getProviderCount reports the number of SMS providers held.
func (s *smsProviderAdapter) getProviderCount() int {
	total := len(s.providers)
	return total
}

// getProviderName derives a name from the provider's position in the list.
func (s *smsProviderAdapter) getProviderName(index int) string {
	const pattern = "sms_provider_%d"
	return fmt.Sprintf(pattern, index)
}

// getProviderSort returns the Sort value of the provider at index.
func (s *smsProviderAdapter) getProviderSort(index int) int64 {
	chosen := s.providers[index]
	return chosen.Sort
}

// send forwards the stored request to the provider at providerIndex.
func (s *smsProviderAdapter) send(ctx context.Context, providerIndex int) error {
	target := s.providers[providerIndex].Repo
	payload := repository.SMSMessageRequest{
		PhoneNumber:    s.request.PhoneNumber,
		RecipientName:  s.request.RecipientName,
		MessageContent: s.request.MessageContent,
	}
	return target.SendSMS(ctx, payload)
}

// getType returns the channel label used in log and error messages.
func (s *smsProviderAdapter) getType() string {
	const channel = "SMS"
	return channel
}
// emailProviderAdapter is the email provider adapter: it presents the email
// providers, together with the one request being sent, as a providerAdapter.
type emailProviderAdapter struct {
	providers []usecase.EmailProvider
	request   usecase.MailReq
}

// getProviderCount reports the number of email providers held.
func (e *emailProviderAdapter) getProviderCount() int {
	total := len(e.providers)
	return total
}

// getProviderName derives a name from the provider's position in the list.
func (e *emailProviderAdapter) getProviderName(index int) string {
	const pattern = "email_provider_%d"
	return fmt.Sprintf(pattern, index)
}

// getProviderSort returns the Sort value of the provider at index.
func (e *emailProviderAdapter) getProviderSort(index int) int64 {
	chosen := e.providers[index]
	return chosen.Sort
}

// send forwards the stored request to the provider at providerIndex.
func (e *emailProviderAdapter) send(ctx context.Context, providerIndex int) error {
	target := e.providers[providerIndex].Repo
	payload := repository.MailReq{
		From:    e.request.From,
		To:      e.request.To,
		Subject: e.request.Subject,
		Body:    e.request.Body,
	}
	return target.SendMail(ctx, payload)
}

// getType returns the channel label used in log and error messages.
func (e *emailProviderAdapter) getType() string {
	const channel = "Email"
	return channel
}
// providerWithIndex is the helper structure used for sorting: it pairs a
// provider's position in the adapter with its sort key so sendWithRetry can
// order providers without moving the providers themselves.
type providerWithIndex struct {
// index is the provider's position in the adapter's provider list.
index int
// sort is the provider's sort key as returned by getProviderSort.
sort int64
}
// sendWithRetry is the shared send-and-retry logic for every channel.
//
// Providers are tried in ascending Sort order; providers with the same Sort
// value keep the order in which they were configured. Each provider gets up
// to DeliveryConfig.MaxRetries attempts, every attempt runs under its own
// DeliveryConfig.Timeout, and failed attempts are separated by the delay from
// calculateDelay. Once a provider has used all of its attempts the next
// provider is tried immediately.
//
// history is mutated in place and persisted (best effort) on every status
// change, and each attempt is recorded against history.ID.
//
// Returns nil as soon as one attempt succeeds, ctx.Err() when ctx is done
// while waiting between retries, and a third-party service error when every
// provider has exhausted its attempts.
func (use *DeliveryUseCase) sendWithRetry(
	ctx context.Context,
	history *entity.DeliveryHistory,
	adapter providerAdapter,
) error {
	// Order the providers by their Sort field. Only indexes are sorted, so
	// the adapter's own provider list is left untouched.
	providerCount := adapter.getProviderCount()
	sortedProviders := make([]providerWithIndex, providerCount)
	for i := range sortedProviders {
		sortedProviders[i] = providerWithIndex{
			index: i,
			sort:  adapter.getProviderSort(i),
		}
	}
	// A stable sort keeps the fallback order deterministic when several
	// providers share the same Sort value.
	sort.SliceStable(sortedProviders, func(i, j int) bool {
		return sortedProviders[i].sort < sortedProviders[j].sort
	})

	maxRetries := use.param.DeliveryConfig.MaxRetries

	var lastErr error
	totalAttempts := 0

	for _, provider := range sortedProviders {
		providerIndex := provider.index

		for attempt := 0; attempt < maxRetries; attempt++ {
			totalAttempts++

			// Mark the record as in flight before contacting the provider.
			history.Status = entity.DeliveryStatusSending
			history.Provider = adapter.getProviderName(providerIndex)
			history.AttemptCount = totalAttempts
			history.UpdatedAt = time.Now()
			use.updateHistory(ctx, history)

			// Bound this single attempt; cancel right away so the timeout
			// context does not outlive the call.
			attemptStart := time.Now()
			sendCtx, cancel := context.WithTimeout(ctx, use.param.DeliveryConfig.Timeout)
			err := adapter.send(sendCtx, providerIndex)
			cancel()

			attemptRecord := entity.DeliveryAttempt{
				Provider:     history.Provider,
				AttemptAt:    attemptStart,
				Success:      err == nil,
				ErrorMessage: "",
				Duration:     time.Since(attemptStart).Milliseconds(),
			}

			if err == nil {
				// Delivered: close the record with a single timestamp.
				now := time.Now()
				history.Status = entity.DeliveryStatusSuccess
				history.UpdatedAt = now
				history.CompletedAt = &now
				use.updateHistory(ctx, history)
				use.addAttemptRecord(ctx, history.ID, attemptRecord)
				return nil
			}

			attemptRecord.ErrorMessage = err.Error()
			lastErr = err
			_ = errs.SvcThirdPartyErrorL(use.param.Logger,
				[]errs.LogField{
					{Key: "type", Val: adapter.getType()},
					{Key: "attempt", Val: attempt + 1},
					{Key: "provider", Val: providerIndex},
					{Key: "err", Val: err.Error()},
				},
				fmt.Sprintf("%s send attempt %d failed for provider %d", adapter.getType(), attempt+1, providerIndex))

			if attempt >= maxRetries-1 {
				// Last attempt for this provider: record it and move on to
				// the next provider without waiting.
				use.addAttemptRecord(ctx, history.ID, attemptRecord)
				continue
			}

			history.Status = entity.DeliveryStatusRetrying
			history.UpdatedAt = time.Now()
			use.updateHistory(ctx, history)
			use.addAttemptRecord(ctx, history.ID, attemptRecord)

			// Back off before the next attempt. An explicit timer is used so
			// it can be released as soon as the caller gives up.
			timer := time.NewTimer(use.calculateDelay(attempt))
			select {
			case <-ctx.Done():
				timer.Stop()
				// NOTE(review): the history record is left in the retrying
				// state here; finalizing it would need a context that is not
				// already done.
				return ctx.Err()
			case <-timer.C:
			}
		}
	}

	// Every provider has been exhausted (or there was nothing to try).
	failure := fmt.Sprintf("No send attempt was made (%d providers configured)", providerCount)
	if lastErr != nil {
		failure = fmt.Sprintf("All providers failed. Last error: %v", lastErr)
	}
	now := time.Now()
	history.Status = entity.DeliveryStatusFailed
	history.ErrorMessage = failure
	history.UpdatedAt = now
	history.CompletedAt = &now
	use.updateHistory(ctx, history)

	return errs.SvcThirdPartyError(
		fmt.Sprintf("Failed to send %s after %d attempts across %d providers",
			adapter.getType(), totalAttempts, providerCount))
}
// calculateDelay computes the exponential backoff delay for a retry:
// InitialDelay * BackoffFactor^attempt, capped at MaxDelay. attempt is
// zero-based, so the first retry waits InitialDelay.
func (use *DeliveryUseCase) calculateDelay(attempt int) time.Duration {
	cfg := &use.param.DeliveryConfig

	ceiling := float64(cfg.MaxDelay)
	growth := math.Pow(cfg.BackoffFactor, float64(attempt))
	backoff := float64(cfg.InitialDelay) * growth

	// The comparison is done in floating point so that a very large backoff
	// is capped before it is converted back to a Duration.
	if backoff > ceiling {
		return time.Duration(ceiling)
	}
	return time.Duration(backoff)
}
// updateHistory persists the current state of the history record. It does
// nothing when history is disabled or no repository is configured, and a
// persistence failure is only logged.
func (use *DeliveryUseCase) updateHistory(ctx context.Context, history *entity.DeliveryHistory) {
	repo := use.param.HistoryRepo
	if !use.param.DeliveryConfig.EnableHistory || repo == nil {
		return
	}

	updateErr := repo.UpdateHistory(ctx, history)
	if updateErr == nil {
		return
	}

	fields := []errs.LogField{
		{Key: "func", Val: "HistoryRepo.UpdateHistory"},
		{Key: "history_id", Val: history.ID},
		{Key: "err", Val: updateErr.Error()},
	}
	_ = errs.DBErrorErrorL(use.param.Logger, fields, "Failed to update delivery history")
}
// addAttemptRecord appends one send attempt to the history record identified
// by historyID. It does nothing when history is disabled or no repository is
// configured, and a persistence failure is only logged.
func (use *DeliveryUseCase) addAttemptRecord(ctx context.Context, historyID string, attempt entity.DeliveryAttempt) {
	repo := use.param.HistoryRepo
	if !use.param.DeliveryConfig.EnableHistory || repo == nil {
		return
	}

	addErr := repo.AddAttempt(ctx, historyID, attempt)
	if addErr == nil {
		return
	}

	fields := []errs.LogField{
		{Key: "func", Val: "HistoryRepo.AddAttempt"},
		{Key: "history_id", Val: historyID},
		{Key: "err", Val: addErr.Error()},
	}
	_ = errs.DBErrorErrorL(use.param.Logger, fields, "Failed to add attempt record")
}