package repository

import (
	"context"
	"fmt"
	"time"

	app "haixun-backend/internal/library/errors"
	"haixun-backend/internal/library/errors/code"
	domrepo "haixun-backend/internal/model/job/domain/repository"

	goredis "github.com/redis/go-redis/v9"
)

// Redis key layout used by the job queue repository.
const (
	queueKeyPrefix   = "jobs:queue:"
	lockKeyPrefix    = "jobs:lock:"
	cancelKeyPrefix  = "jobs:cancel:"
	dedupeKeyPrefix  = "jobs:dedupe:"
	schedulerLockKey = "jobs:scheduler:lock"
)

// redisQueueRepository implements domrepo.QueueRepository on top of a Redis
// client. A nil client is tolerated at construction time; every method then
// fails with a "Redis is not configured" error instead of panicking.
type redisQueueRepository struct {
	client *goredis.Client
}

// NewRedisQueueRepository wraps client in a domrepo.QueueRepository.
func NewRedisQueueRepository(client *goredis.Client) domrepo.QueueRepository {
	return &redisQueueRepository{client: client}
}

// requireRedis returns a DBUnavailable error when no Redis client was
// configured, and nil otherwise.
func (r *redisQueueRepository) requireRedis() error {
	if r.client == nil {
		return app.For(code.Job).DBUnavailable("Redis is not configured")
	}
	return nil
}

// queueKey returns the Redis list key holding pending jobs for workerType.
func queueKey(workerType string) string {
	return queueKeyPrefix + workerType
}

// lockKey returns the Redis key of the per-job execution lock.
func lockKey(jobID string) string {
	return lockKeyPrefix + jobID
}

// cancelKey returns the Redis key of the per-job cancel signal.
func cancelKey(jobID string) string {
	return cancelKeyPrefix + jobID
}

// dedupeRedisKey returns the Redis key of the dedupe marker for the given
// template type and dedupe hash.
func dedupeRedisKey(templateType, dedupeHash string) string {
	return dedupeKeyPrefix + templateType + ":" + dedupeHash
}

// deleteIfOwner deletes key only while its value still equals owner.
//
// The comparison and the delete run inside a single Lua script so they are
// atomic on the Redis side: a lock that expired and was re-acquired by
// another owner in the meantime is never removed. A missing key or a
// different owner is a silent no-op.
func (r *redisQueueRepository) deleteIfOwner(ctx context.Context, key, owner string) error {
	const script = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("DEL", KEYS[1])
end
return 0
`
	return r.client.Eval(ctx, script, []string{key}, owner).Err()
}

// Enqueue pushes jobID onto the head of the queue list for workerType.
func (r *redisQueueRepository) Enqueue(ctx context.Context, workerType, jobID string) error {
	if err := r.requireRedis(); err != nil {
		return err
	}
	return r.client.LPush(ctx, queueKey(workerType), jobID).Err()
}

// Dequeue blocks for up to timeoutSeconds waiting for a job on the queue of
// workerType and pops it from the tail of the list (so, combined with
// Enqueue's LPUSH, the oldest job comes out first).
//
// A non-positive timeoutSeconds falls back to 2 seconds. When the timeout
// elapses without a job, it returns "" and a nil error.
func (r *redisQueueRepository) Dequeue(ctx context.Context, workerType string, timeoutSeconds int) (string, error) {
	if err := r.requireRedis(); err != nil {
		return "", err
	}
	if timeoutSeconds <= 0 {
		timeoutSeconds = 2
	}

	result, err := r.client.BRPop(ctx, time.Duration(timeoutSeconds)*time.Second, queueKey(workerType)).Result()
	if err == goredis.Nil {
		// Timed out with nothing to pop: not an error for the caller.
		return "", nil
	}
	if err != nil {
		return "", err
	}
	// BRPOP replies with [key, value]; anything shorter carries no job ID.
	if len(result) < 2 {
		return "", nil
	}
	return result[1], nil
}

// RemoveJob removes every occurrence of jobID from the queue of workerType.
func (r *redisQueueRepository) RemoveJob(ctx context.Context, workerType, jobID string) error {
	if err := r.requireRedis(); err != nil {
		return err
	}
	// Count 0 tells LREM to remove all matching elements.
	return r.client.LRem(ctx, queueKey(workerType), 0, jobID).Err()
}

// SetCancelSignal stores a cancel signal for jobID that expires after 24
// hours. An empty reason is stored as "cancel_requested".
func (r *redisQueueRepository) SetCancelSignal(ctx context.Context, jobID, reason string) error {
	if err := r.requireRedis(); err != nil {
		return err
	}
	payload := reason
	if payload == "" {
		payload = "cancel_requested"
	}
	return r.client.Set(ctx, cancelKey(jobID), payload, 24*time.Hour).Err()
}

// GetCancelSignal reports whether a cancel signal exists for jobID and, if
// so, returns the stored reason. A missing signal yields (false, "", nil).
func (r *redisQueueRepository) GetCancelSignal(ctx context.Context, jobID string) (bool, string, error) {
	if err := r.requireRedis(); err != nil {
		return false, "", err
	}
	value, err := r.client.Get(ctx, cancelKey(jobID)).Result()
	if err == goredis.Nil {
		return false, "", nil
	}
	if err != nil {
		return false, "", err
	}
	return true, value, nil
}

// ClearCancelSignal deletes the cancel signal of jobID, if any.
func (r *redisQueueRepository) ClearCancelSignal(ctx context.Context, jobID string) error {
	if err := r.requireRedis(); err != nil {
		return err
	}
	return r.client.Del(ctx, cancelKey(jobID)).Err()
}

// TryLock attempts to take the execution lock of jobID on behalf of
// workerID using SET NX with a TTL. It returns true only when the lock was
// newly acquired. A non-positive ttlSeconds falls back to 300 seconds.
func (r *redisQueueRepository) TryLock(ctx context.Context, jobID, workerID string, ttlSeconds int) (bool, error) {
	if err := r.requireRedis(); err != nil {
		return false, err
	}
	if ttlSeconds <= 0 {
		ttlSeconds = 300
	}
	ok, err := r.client.SetNX(ctx, lockKey(jobID), workerID, time.Duration(ttlSeconds)*time.Second).Result()
	if err != nil {
		return false, err
	}
	return ok, nil
}

// ReleaseLock releases the execution lock of jobID, but only if it is still
// held by workerID. Releasing a lock that is missing or owned by another
// worker is a no-op. An empty workerID is rejected as missing input.
func (r *redisQueueRepository) ReleaseLock(ctx context.Context, jobID, workerID string) error {
	if err := r.requireRedis(); err != nil {
		return err
	}
	if workerID == "" {
		return app.For(code.Job).InputMissingRequired("worker id is required")
	}
	return r.deleteIfOwner(ctx, lockKey(jobID), workerID)
}

// RefreshLock extends the TTL of the execution lock of jobID to ttlSeconds,
// provided the lock is still held by workerID. A non-positive ttlSeconds
// falls back to 300 seconds.
//
// It returns an InputMissingRequired error for an empty workerID and a
// ResInvalidState error when the lock is not held by workerID (including
// when it no longer exists).
func (r *redisQueueRepository) RefreshLock(ctx context.Context, jobID, workerID string, ttlSeconds int) error {
	if err := r.requireRedis(); err != nil {
		return err
	}
	if workerID == "" {
		return app.For(code.Job).InputMissingRequired("worker id is required")
	}
	if ttlSeconds <= 0 {
		ttlSeconds = 300
	}

	// Owner check and EXPIRE run in one script so the TTL of a lock that
	// changed hands is never extended.
	const script = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("EXPIRE", KEYS[1], ARGV[2])
end
return 0
`
	result, err := r.client.Eval(ctx, script, []string{lockKey(jobID)}, workerID, ttlSeconds).Int()
	if err != nil {
		return err
	}
	if result == 0 {
		return app.For(code.Job).ResInvalidState(fmt.Sprintf("job lock is not held by worker %s", workerID))
	}
	return nil
}

// TryAcquireDedupe attempts to claim the dedupe marker for the given
// template type and hash, storing jobID as its value via SET NX with a TTL.
// It returns true only when the marker was newly created. A non-positive
// ttlSeconds falls back to 3600 seconds.
func (r *redisQueueRepository) TryAcquireDedupe(ctx context.Context, templateType, dedupeHash, jobID string, ttlSeconds int) (bool, error) {
	if err := r.requireRedis(); err != nil {
		return false, err
	}
	if ttlSeconds <= 0 {
		ttlSeconds = 3600
	}
	ok, err := r.client.SetNX(ctx, dedupeRedisKey(templateType, dedupeHash), jobID, time.Duration(ttlSeconds)*time.Second).Result()
	if err != nil {
		return false, err
	}
	return ok, nil
}

// ReleaseDedupe deletes the dedupe marker for the given template type and
// hash. The delete is unconditional: the stored job ID is not checked.
func (r *redisQueueRepository) ReleaseDedupe(ctx context.Context, templateType, dedupeHash string) error {
	if err := r.requireRedis(); err != nil {
		return err
	}
	return r.client.Del(ctx, dedupeRedisKey(templateType, dedupeHash)).Err()
}

// TrySchedulerLock attempts to take the global scheduler lock on behalf of
// holder using SET NX with a TTL. It returns true only when the lock was
// newly acquired. A non-positive ttlSeconds falls back to 60 seconds.
func (r *redisQueueRepository) TrySchedulerLock(ctx context.Context, holder string, ttlSeconds int) (bool, error) {
	if err := r.requireRedis(); err != nil {
		return false, err
	}
	if ttlSeconds <= 0 {
		ttlSeconds = 60
	}
	ok, err := r.client.SetNX(ctx, schedulerLockKey, holder, time.Duration(ttlSeconds)*time.Second).Result()
	if err != nil {
		return false, err
	}
	return ok, nil
}

// ReleaseSchedulerLock releases the global scheduler lock, but only if it is
// still held by holder. A missing lock or a lock owned by someone else is a
// no-op.
//
// The owner check and the delete are performed atomically in Redis. A
// separate GET followed by DEL would leave a window in which the lock could
// expire and be re-acquired by another scheduler, whose lock would then be
// deleted by mistake.
func (r *redisQueueRepository) ReleaseSchedulerLock(ctx context.Context, holder string) error {
	if err := r.requireRedis(); err != nil {
		return err
	}
	return r.deleteIfOwner(ctx, schedulerLockKey, holder)
}