221 lines
5.7 KiB
Go
221 lines
5.7 KiB
Go
package repository
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
app "haixun-backend/internal/library/errors"
|
|
"haixun-backend/internal/library/errors/code"
|
|
domrepo "haixun-backend/internal/model/job/domain/repository"
|
|
|
|
goredis "github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
// Redis key names and prefixes used by the job queue repository.
const (
	// queueKeyPrefix prefixes the per-worker-type list of queued job IDs.
	queueKeyPrefix = "jobs:queue:"
	// lockKeyPrefix prefixes the per-job execution lock key.
	lockKeyPrefix = "jobs:lock:"
	// cancelKeyPrefix prefixes the per-job cancellation signal key.
	cancelKeyPrefix = "jobs:cancel:"
	// dedupeKeyPrefix prefixes deduplication markers, which are further
	// qualified by template type and dedupe hash.
	dedupeKeyPrefix = "jobs:dedupe:"
	// schedulerLockKey is the single key that holds the scheduler lock.
	schedulerLockKey = "jobs:scheduler:lock"
)
|
|
|
|
type redisQueueRepository struct {
|
|
client *goredis.Client
|
|
}
|
|
|
|
func NewRedisQueueRepository(client *goredis.Client) domrepo.QueueRepository {
|
|
return &redisQueueRepository{client: client}
|
|
}
|
|
|
|
func (r *redisQueueRepository) requireRedis() error {
|
|
if r.client == nil {
|
|
return app.For(code.Job).DBUnavailable("Redis is not configured")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func queueKey(workerType string) string {
|
|
return queueKeyPrefix + workerType
|
|
}
|
|
|
|
func lockKey(jobID string) string {
|
|
return lockKeyPrefix + jobID
|
|
}
|
|
|
|
func cancelKey(jobID string) string {
|
|
return cancelKeyPrefix + jobID
|
|
}
|
|
|
|
func (r *redisQueueRepository) Enqueue(ctx context.Context, workerType, jobID string) error {
|
|
if err := r.requireRedis(); err != nil {
|
|
return err
|
|
}
|
|
return r.client.LPush(ctx, queueKey(workerType), jobID).Err()
|
|
}
|
|
|
|
func (r *redisQueueRepository) Dequeue(ctx context.Context, workerType string, timeoutSeconds int) (string, error) {
|
|
if err := r.requireRedis(); err != nil {
|
|
return "", err
|
|
}
|
|
if timeoutSeconds <= 0 {
|
|
timeoutSeconds = 2
|
|
}
|
|
result, err := r.client.BRPop(ctx, time.Duration(timeoutSeconds)*time.Second, queueKey(workerType)).Result()
|
|
if err == goredis.Nil {
|
|
return "", nil
|
|
}
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if len(result) < 2 {
|
|
return "", nil
|
|
}
|
|
return result[1], nil
|
|
}
|
|
|
|
func (r *redisQueueRepository) RemoveJob(ctx context.Context, workerType, jobID string) error {
|
|
if err := r.requireRedis(); err != nil {
|
|
return err
|
|
}
|
|
return r.client.LRem(ctx, queueKey(workerType), 0, jobID).Err()
|
|
}
|
|
|
|
func (r *redisQueueRepository) SetCancelSignal(ctx context.Context, jobID, reason string) error {
|
|
if err := r.requireRedis(); err != nil {
|
|
return err
|
|
}
|
|
payload := reason
|
|
if payload == "" {
|
|
payload = "cancel_requested"
|
|
}
|
|
return r.client.Set(ctx, cancelKey(jobID), payload, 24*time.Hour).Err()
|
|
}
|
|
|
|
func (r *redisQueueRepository) GetCancelSignal(ctx context.Context, jobID string) (bool, string, error) {
|
|
if err := r.requireRedis(); err != nil {
|
|
return false, "", err
|
|
}
|
|
value, err := r.client.Get(ctx, cancelKey(jobID)).Result()
|
|
if err == goredis.Nil {
|
|
return false, "", nil
|
|
}
|
|
if err != nil {
|
|
return false, "", err
|
|
}
|
|
return true, value, nil
|
|
}
|
|
|
|
func (r *redisQueueRepository) ClearCancelSignal(ctx context.Context, jobID string) error {
|
|
if err := r.requireRedis(); err != nil {
|
|
return err
|
|
}
|
|
return r.client.Del(ctx, cancelKey(jobID)).Err()
|
|
}
|
|
|
|
func (r *redisQueueRepository) TryLock(ctx context.Context, jobID, workerID string, ttlSeconds int) (bool, error) {
|
|
if err := r.requireRedis(); err != nil {
|
|
return false, err
|
|
}
|
|
if ttlSeconds <= 0 {
|
|
ttlSeconds = 300
|
|
}
|
|
ok, err := r.client.SetNX(ctx, lockKey(jobID), workerID, time.Duration(ttlSeconds)*time.Second).Result()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return ok, nil
|
|
}
|
|
|
|
func (r *redisQueueRepository) ReleaseLock(ctx context.Context, jobID, workerID string) error {
|
|
if err := r.requireRedis(); err != nil {
|
|
return err
|
|
}
|
|
if workerID == "" {
|
|
return app.For(code.Job).InputMissingRequired("worker id is required")
|
|
}
|
|
const script = `
|
|
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
|
return redis.call("DEL", KEYS[1])
|
|
end
|
|
return 0
|
|
`
|
|
return r.client.Eval(ctx, script, []string{lockKey(jobID)}, workerID).Err()
|
|
}
|
|
|
|
func dedupeRedisKey(templateType, dedupeHash string) string {
|
|
return dedupeKeyPrefix + templateType + ":" + dedupeHash
|
|
}
|
|
|
|
func (r *redisQueueRepository) TryAcquireDedupe(ctx context.Context, templateType, dedupeHash, jobID string, ttlSeconds int) (bool, error) {
|
|
if err := r.requireRedis(); err != nil {
|
|
return false, err
|
|
}
|
|
if ttlSeconds <= 0 {
|
|
ttlSeconds = 3600
|
|
}
|
|
ok, err := r.client.SetNX(ctx, dedupeRedisKey(templateType, dedupeHash), jobID, time.Duration(ttlSeconds)*time.Second).Result()
|
|
return ok, err
|
|
}
|
|
|
|
func (r *redisQueueRepository) ReleaseDedupe(ctx context.Context, templateType, dedupeHash string) error {
|
|
if err := r.requireRedis(); err != nil {
|
|
return err
|
|
}
|
|
return r.client.Del(ctx, dedupeRedisKey(templateType, dedupeHash)).Err()
|
|
}
|
|
|
|
func (r *redisQueueRepository) TrySchedulerLock(ctx context.Context, holder string, ttlSeconds int) (bool, error) {
|
|
if err := r.requireRedis(); err != nil {
|
|
return false, err
|
|
}
|
|
if ttlSeconds <= 0 {
|
|
ttlSeconds = 60
|
|
}
|
|
ok, err := r.client.SetNX(ctx, schedulerLockKey, holder, time.Duration(ttlSeconds)*time.Second).Result()
|
|
return ok, err
|
|
}
|
|
|
|
func (r *redisQueueRepository) ReleaseSchedulerLock(ctx context.Context, holder string) error {
|
|
if err := r.requireRedis(); err != nil {
|
|
return err
|
|
}
|
|
current, err := r.client.Get(ctx, schedulerLockKey).Result()
|
|
if err == goredis.Nil {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if current != holder {
|
|
return nil
|
|
}
|
|
return r.client.Del(ctx, schedulerLockKey).Err()
|
|
}
|
|
|
|
func (r *redisQueueRepository) RefreshLock(ctx context.Context, jobID, workerID string, ttlSeconds int) error {
|
|
if err := r.requireRedis(); err != nil {
|
|
return err
|
|
}
|
|
if workerID == "" {
|
|
return app.For(code.Job).InputMissingRequired("worker id is required")
|
|
}
|
|
if ttlSeconds <= 0 {
|
|
ttlSeconds = 300
|
|
}
|
|
const script = `
|
|
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
|
return redis.call("EXPIRE", KEYS[1], ARGV[2])
|
|
end
|
|
return 0
|
|
`
|
|
result, err := r.client.Eval(ctx, script, []string{lockKey(jobID)}, workerID, ttlSeconds).Int()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if result == 0 {
|
|
return app.For(code.Job).ResInvalidState(fmt.Sprintf("job lock is not held by worker %s", workerID))
|
|
}
|
|
return nil
|
|
}
|