backend/pkg/library/cassandra/lock.go

121 lines
2.9 KiB
Go
Raw Permalink Normal View History

2025-11-17 09:31:58 +00:00
package cassandra
import (
"context"
"errors"
"fmt"
"time"
"github.com/gocql/gocql"
2025-11-19 05:33:06 +00:00
"github.com/scylladb/gocqlx/v2/qb"
2025-11-17 09:31:58 +00:00
)
const (
2025-11-18 09:45:38 +00:00
defaultLockTTLSec = 30
defaultLockRetry = 3
lockBaseDelay = 100 * time.Millisecond
2025-11-17 09:31:58 +00:00
)
// LockOption 用來設定 TryLock 的 TTL 行為
type LockOption func(*lockOptions)
type lockOptions struct {
ttlSeconds int // TTL單位秒<=0 代表不 expire
}
2025-11-18 09:45:38 +00:00
// WithLockTTL 設定鎖的 TTL
2025-11-17 09:31:58 +00:00
func WithLockTTL(d time.Duration) LockOption {
return func(o *lockOptions) {
o.ttlSeconds = int(d.Seconds())
}
}
// WithNoLockExpire 永不自動解鎖
func WithNoLockExpire() LockOption {
return func(o *lockOptions) {
o.ttlSeconds = 0
}
}
// TryLock 嘗試在表上插入一筆唯一鍵IF NOT EXISTS作為鎖
// 預設 30 秒 TTL可透過 option 調整或取消 TTL
2025-11-18 09:45:38 +00:00
func (r *repository[T]) TryLock(ctx context.Context, doc T, opts ...LockOption) error {
// 組合 option
options := &lockOptions{ttlSeconds: defaultLockTTLSec}
2025-11-17 09:31:58 +00:00
for _, opt := range opts {
opt(options)
}
2025-11-18 09:45:38 +00:00
// 建 TTL 子句
builder := qb.Insert(r.table).
2025-11-17 09:31:58 +00:00
Unique(). // IF NOT EXISTS
2025-11-18 09:45:38 +00:00
Columns(r.metadata.Columns...)
2025-11-17 09:31:58 +00:00
if options.ttlSeconds > 0 {
ttl := time.Duration(options.ttlSeconds) * time.Second
builder = builder.TTL(ttl)
}
stmt, names := builder.ToCql()
2025-11-18 09:45:38 +00:00
// 執行 CAS
q := r.db.session.Query(stmt, names).BindStruct(doc).
2025-11-17 09:31:58 +00:00
WithContext(ctx).
WithTimestamp(time.Now().UnixNano() / 1e3).
SerialConsistency(gocql.Serial)
applied, err := q.ExecCASRelease()
if err != nil {
2025-11-18 09:45:38 +00:00
return ErrInvalidInput.WithTable(r.table).WithError(err)
2025-11-17 09:31:58 +00:00
}
if !applied {
2025-11-18 09:45:38 +00:00
return NewError(ErrCodeConflict, "acquire lock failed").WithTable(r.table)
2025-11-17 09:31:58 +00:00
}
return nil
}
// UnLock 釋放鎖,其實就是 Delete
2025-11-18 09:45:38 +00:00
func (r *repository[T]) UnLock(ctx context.Context, doc T) error {
2025-11-17 09:31:58 +00:00
var lastErr error
2025-11-18 09:45:38 +00:00
for i := 0; i < defaultLockRetry; i++ {
builder := qb.Delete(r.table).Existing()
2025-11-17 09:31:58 +00:00
2025-11-18 09:45:38 +00:00
// 動態添加 WHERE 條件(使用 Partition Key
for _, key := range r.metadata.PartKey {
2025-11-17 09:31:58 +00:00
builder = builder.Where(qb.Eq(key))
}
stmt, names := builder.ToCql()
2025-11-18 09:45:38 +00:00
q := r.db.session.Query(stmt, names).BindStruct(doc).
2025-11-17 09:31:58 +00:00
WithContext(ctx).
WithTimestamp(time.Now().UnixNano() / 1e3).
SerialConsistency(gocql.Serial)
applied, err := q.ExecCASRelease()
if err == nil && applied {
return nil
}
if err != nil {
2025-11-18 09:45:38 +00:00
lastErr = fmt.Errorf("unlock error: %w", err)
2025-11-17 09:31:58 +00:00
} else if !applied {
2025-11-18 09:45:38 +00:00
lastErr = fmt.Errorf("unlock not applied: row not found or not visible yet")
2025-11-17 09:31:58 +00:00
}
2025-11-18 09:45:38 +00:00
time.Sleep(lockBaseDelay * time.Duration(1<<i)) // 100ms → 200ms → 400ms
2025-11-17 09:31:58 +00:00
}
2025-11-18 09:45:38 +00:00
return ErrInvalidInput.WithTable(r.table).WithError(
fmt.Errorf("unlock failed after %d retries: %w", defaultLockRetry, lastErr),
)
}
// IsLockFailed 檢查錯誤是否為獲取鎖失敗
func IsLockFailed(err error) bool {
var e *Error
if errors.As(err, &e) {
return e.Code == ErrCodeConflict && e.Message == "acquire lock failed"
}
return false
2025-11-17 09:31:58 +00:00
}