package repository

import (
	"context"
	"encoding/json"
	"fmt"

	redislib "gateway/internal/library/redis"
	domrepo "gateway/internal/model/notification/domain/repository"
	domusecase "gateway/internal/model/notification/domain/usecase"

	"github.com/zeromicro/go-zero/core/stores/redis"
)

// RedisRetryQueue implements RetryQueue on a Redis sorted set.
//
// Each member of the sorted set is the JSON encoding of a retry job and its
// score is the time (in milliseconds) at which the job becomes due.
type RedisRetryQueue struct {
	client *redis.Redis // go-zero Redis client used for every command
	key    string       // Redis key of the sorted set holding the jobs
}

// NewRedisRetryQueue creates a retry queue backed by the shared Redis client.
//
// It panics when client (or its underlying go-zero client) is nil, or when
// key is empty.
func NewRedisRetryQueue(client *redislib.Client, key string) domrepo.RetryQueue {
	if client == nil || client.Zero() == nil {
		panic("notification: redis client is required for retry queue")
	}
	if key == "" {
		panic("notification: retry queue redis key is required")
	}
	return &RedisRetryQueue{client: client.Zero(), key: key}
}

// Schedule stores job in the sorted set with runAtMs as its score.
//
// The member is the JSON encoding of job, so scheduling a job whose encoding
// is byte-identical to an existing member only updates that member's score.
// It returns an error when job is nil, when it cannot be marshalled, or when
// the Redis command fails.
func (q *RedisRetryQueue) Schedule(ctx context.Context, runAtMs int64, job *domusecase.RetryJob) error {
	if job == nil {
		return fmt.Errorf("notification: retry job is nil")
	}
	raw, err := json.Marshal(job)
	if err != nil {
		return fmt.Errorf("notification: marshal retry job: %w", err)
	}
	_, err = q.client.ZaddCtx(ctx, q.key, runAtMs, string(raw))
	return err
}

// ClaimDue removes and returns up to limit jobs whose score lies in
// [0, nowMs]. A non-positive limit falls back to 32.
//
// A member is claimed by removing it with ZREM; when ZREM reports that
// nothing was removed the member is skipped (presumably another consumer
// claimed it first).
//
// Jobs that were already removed from Redis are never discarded: if a later
// ZREM fails, the jobs claimed so far are returned together with that error,
// and a member that cannot be decoded is skipped (it has already been removed)
// while the first decode error is reported after the rest of the batch has
// been processed. Callers should therefore handle the returned jobs even when
// the error is non-nil.
func (q *RedisRetryQueue) ClaimDue(ctx context.Context, nowMs int64, limit int) ([]*domusecase.RetryJob, error) {
	const defaultLimit = 32
	if limit <= 0 {
		limit = defaultLimit
	}

	pairs, err := q.client.ZrangebyscoreWithScoresAndLimitCtx(ctx, q.key, 0, nowMs, 0, limit)
	if err != nil {
		return nil, err
	}

	claimed := make([]*domusecase.RetryJob, 0, len(pairs))
	var decodeErr error
	for _, p := range pairs {
		member := p.Key

		removed, err := q.client.ZremCtx(ctx, q.key, member)
		if err != nil {
			// Everything in claimed is already gone from Redis; hand it back
			// instead of dropping it along with the error.
			return claimed, err
		}
		if removed == 0 {
			// Nothing was removed, so this call does not own the member.
			continue
		}

		job := new(domusecase.RetryJob)
		if err := json.Unmarshal([]byte(member), job); err != nil {
			// The undecodable member has been removed and cannot be returned;
			// remember the first failure but keep delivering the other jobs.
			if decodeErr == nil {
				decodeErr = fmt.Errorf("notification: unmarshal retry job: %w", err)
			}
			continue
		}
		claimed = append(claimed, job)
	}
	return claimed, decodeErr
}