thread-master/internal/model/job/repository/redis_queue.go

221 lines
5.7 KiB
Go
Raw Normal View History

2026-06-26 08:37:04 +00:00
package repository
import (
"context"
"fmt"
"time"
app "haixun-backend/internal/library/errors"
"haixun-backend/internal/library/errors/code"
domrepo "haixun-backend/internal/model/job/domain/repository"
goredis "github.com/redis/go-redis/v9"
)
const (
queueKeyPrefix = "jobs:queue:"
lockKeyPrefix = "jobs:lock:"
cancelKeyPrefix = "jobs:cancel:"
dedupeKeyPrefix = "jobs:dedupe:"
schedulerLockKey = "jobs:scheduler:lock"
)
type redisQueueRepository struct {
client *goredis.Client
}
func NewRedisQueueRepository(client *goredis.Client) domrepo.QueueRepository {
return &redisQueueRepository{client: client}
}
func (r *redisQueueRepository) requireRedis() error {
if r.client == nil {
return app.For(code.Job).DBUnavailable("Redis is not configured")
}
return nil
}
func queueKey(workerType string) string {
return queueKeyPrefix + workerType
}
func lockKey(jobID string) string {
return lockKeyPrefix + jobID
}
func cancelKey(jobID string) string {
return cancelKeyPrefix + jobID
}
func (r *redisQueueRepository) Enqueue(ctx context.Context, workerType, jobID string) error {
if err := r.requireRedis(); err != nil {
return err
}
return r.client.LPush(ctx, queueKey(workerType), jobID).Err()
}
func (r *redisQueueRepository) Dequeue(ctx context.Context, workerType string, timeoutSeconds int) (string, error) {
if err := r.requireRedis(); err != nil {
return "", err
}
if timeoutSeconds <= 0 {
timeoutSeconds = 2
}
result, err := r.client.BRPop(ctx, time.Duration(timeoutSeconds)*time.Second, queueKey(workerType)).Result()
if err == goredis.Nil {
return "", nil
}
if err != nil {
return "", err
}
if len(result) < 2 {
return "", nil
}
return result[1], nil
}
func (r *redisQueueRepository) RemoveJob(ctx context.Context, workerType, jobID string) error {
if err := r.requireRedis(); err != nil {
return err
}
return r.client.LRem(ctx, queueKey(workerType), 0, jobID).Err()
}
func (r *redisQueueRepository) SetCancelSignal(ctx context.Context, jobID, reason string) error {
if err := r.requireRedis(); err != nil {
return err
}
payload := reason
if payload == "" {
payload = "cancel_requested"
}
return r.client.Set(ctx, cancelKey(jobID), payload, 24*time.Hour).Err()
}
func (r *redisQueueRepository) GetCancelSignal(ctx context.Context, jobID string) (bool, string, error) {
if err := r.requireRedis(); err != nil {
return false, "", err
}
value, err := r.client.Get(ctx, cancelKey(jobID)).Result()
if err == goredis.Nil {
return false, "", nil
}
if err != nil {
return false, "", err
}
return true, value, nil
}
func (r *redisQueueRepository) ClearCancelSignal(ctx context.Context, jobID string) error {
if err := r.requireRedis(); err != nil {
return err
}
return r.client.Del(ctx, cancelKey(jobID)).Err()
}
func (r *redisQueueRepository) TryLock(ctx context.Context, jobID, workerID string, ttlSeconds int) (bool, error) {
if err := r.requireRedis(); err != nil {
return false, err
}
if ttlSeconds <= 0 {
ttlSeconds = 300
}
ok, err := r.client.SetNX(ctx, lockKey(jobID), workerID, time.Duration(ttlSeconds)*time.Second).Result()
if err != nil {
return false, err
}
return ok, nil
}
func (r *redisQueueRepository) ReleaseLock(ctx context.Context, jobID, workerID string) error {
if err := r.requireRedis(); err != nil {
return err
}
if workerID == "" {
return app.For(code.Job).InputMissingRequired("worker id is required")
}
const script = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`
return r.client.Eval(ctx, script, []string{lockKey(jobID)}, workerID).Err()
}
func dedupeRedisKey(templateType, dedupeHash string) string {
return dedupeKeyPrefix + templateType + ":" + dedupeHash
}
func (r *redisQueueRepository) TryAcquireDedupe(ctx context.Context, templateType, dedupeHash, jobID string, ttlSeconds int) (bool, error) {
if err := r.requireRedis(); err != nil {
return false, err
}
if ttlSeconds <= 0 {
ttlSeconds = 3600
}
ok, err := r.client.SetNX(ctx, dedupeRedisKey(templateType, dedupeHash), jobID, time.Duration(ttlSeconds)*time.Second).Result()
return ok, err
}
func (r *redisQueueRepository) ReleaseDedupe(ctx context.Context, templateType, dedupeHash string) error {
if err := r.requireRedis(); err != nil {
return err
}
return r.client.Del(ctx, dedupeRedisKey(templateType, dedupeHash)).Err()
}
func (r *redisQueueRepository) TrySchedulerLock(ctx context.Context, holder string, ttlSeconds int) (bool, error) {
if err := r.requireRedis(); err != nil {
return false, err
}
if ttlSeconds <= 0 {
ttlSeconds = 60
}
ok, err := r.client.SetNX(ctx, schedulerLockKey, holder, time.Duration(ttlSeconds)*time.Second).Result()
return ok, err
}
func (r *redisQueueRepository) ReleaseSchedulerLock(ctx context.Context, holder string) error {
if err := r.requireRedis(); err != nil {
return err
}
current, err := r.client.Get(ctx, schedulerLockKey).Result()
if err == goredis.Nil {
return nil
}
if err != nil {
return err
}
if current != holder {
return nil
}
return r.client.Del(ctx, schedulerLockKey).Err()
}
func (r *redisQueueRepository) RefreshLock(ctx context.Context, jobID, workerID string, ttlSeconds int) error {
if err := r.requireRedis(); err != nil {
return err
}
if workerID == "" {
return app.For(code.Job).InputMissingRequired("worker id is required")
}
if ttlSeconds <= 0 {
ttlSeconds = 300
}
const script = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("EXPIRE", KEYS[1], ARGV[2])
end
return 0
`
result, err := r.client.Eval(ctx, script, []string{lockKey(jobID)}, workerID, ttlSeconds).Int()
if err != nil {
return err
}
if result == 0 {
return app.For(code.Job).ResInvalidState(fmt.Sprintf("job lock is not held by worker %s", workerID))
}
return nil
}