template-monorepo/internal/model/notification/repository/retry_queue_redis.go

68 lines
1.9 KiB
Go
Raw Normal View History

package repository
import (
"context"
"encoding/json"
"fmt"
redislib "gateway/internal/library/redis"
domrepo "gateway/internal/model/notification/domain/repository"
domusecase "gateway/internal/model/notification/domain/usecase"
"github.com/zeromicro/go-zero/core/stores/redis"
)
// RedisRetryQueue implements RetryQueue on a Redis sorted set.
// Schedule adds each job as a JSON-encoded member scored by its run-at time
// in milliseconds; ClaimDue lists the members that are due and removes them.
type RedisRetryQueue struct {
// client is the go-zero Redis handle that every queue command is issued on.
client *redis.Redis
// key is the Redis key of the sorted set that holds the pending jobs.
key string
}
// NewRedisRetryQueue builds a RetryQueue that keeps its jobs under key, using
// the go-zero handle exposed by client.Zero().
//
// It panics when client is nil, when client.Zero() returns nil, or when key
// is empty.
func NewRedisRetryQueue(client *redislib.Client, key string) domrepo.RetryQueue {
	// Case expressions are evaluated left to right and stop at the first
	// match, so client.Zero() is never called on a nil client.
	switch {
	case client == nil, client.Zero() == nil:
		panic("notification: redis client is required for retry queue")
	case key == "":
		panic("notification: retry queue redis key is required")
	}

	queue := new(RedisRetryQueue)
	queue.client = client.Zero()
	queue.key = key
	return queue
}
// Schedule JSON-encodes job and adds it to the queue's sorted set with
// runAtMs as its score.
//
// It returns an error when job is nil, when the job cannot be marshalled, or
// when the Redis command fails; the Redis error is returned unwrapped.
//
// NOTE(review): the JSON payload itself is the set member, so two
// byte-identical jobs would presumably collapse into a single entry — confirm
// this is intended.
func (q *RedisRetryQueue) Schedule(ctx context.Context, runAtMs int64, job *domusecase.RetryJob) error {
	if job == nil {
		return fmt.Errorf("notification: retry job is nil")
	}

	payload, marshalErr := json.Marshal(job)
	if marshalErr != nil {
		return fmt.Errorf("notification: marshal retry job: %w", marshalErr)
	}

	if _, addErr := q.client.ZaddCtx(ctx, q.key, runAtMs, string(payload)); addErr != nil {
		return addErr
	}
	return nil
}
// ClaimDue removes and returns up to limit jobs whose score lies in
// [0, nowMs]. A non-positive limit falls back to 32.
//
// Claiming is two-step: the due members are listed first, then each one is
// removed individually. Only a removal that reports a non-zero count yields a
// job; a zero count means the member was already gone (presumably taken by a
// concurrent claimer), so it is skipped.
//
// If a removal or a decode fails part-way through, ClaimDue stops and returns
// the jobs claimed so far together with the error. Those jobs have already
// been deleted from Redis, so the caller must still process them; discarding
// the slice on error loses them. A member that fails to decode has also been
// removed and is not returned.
func (q *RedisRetryQueue) ClaimDue(ctx context.Context, nowMs int64, limit int) ([]*domusecase.RetryJob, error) {
	if limit <= 0 {
		limit = 32
	}

	pairs, err := q.client.ZrangebyscoreWithScoresAndLimitCtx(ctx, q.key, 0, nowMs, 0, limit)
	if err != nil {
		return nil, err
	}

	claimed := make([]*domusecase.RetryJob, 0, len(pairs))
	for _, p := range pairs {
		// The removal count decides ownership: whoever deletes the member
		// owns the job.
		removed, remErr := q.client.ZremCtx(ctx, q.key, p.Key)
		if remErr != nil {
			// Hand back what is already claimed instead of dropping it.
			return claimed, remErr
		}
		if removed == 0 {
			continue
		}

		job := new(domusecase.RetryJob)
		if decErr := json.Unmarshal([]byte(p.Key), job); decErr != nil {
			// The undecodable member is already gone from Redis; the jobs
			// claimed before it must still reach the caller.
			return claimed, fmt.Errorf("notification: unmarshal retry job: %w", decErr)
		}
		claimed = append(claimed, job)
	}
	return claimed, nil
}