backend/pkg/library/cassandra/ez_transaction.go

286 lines
9.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package cassandra
import (
"context"
"fmt"
"reflect"
"github.com/gocql/gocql"
"github.com/scylladb/gocqlx/v3"
"github.com/scylladb/gocqlx/v3/qb"
"github.com/scylladb/gocqlx/v3/table"
)
/*
todo 目前尚未實作的部分,但因為目前使用上並沒有嚴格一致性,故目前簡易的版本可先行
1. 讀寫一致性問題
Cassandra 本身為最終一致性,如果在 Commit 期間網路有短暫中斷,可能造成部分操作成功、部分失敗的「半提交」狀態。
Commit 之後,再次掃描 Steps看是否所有 IsExec 都為 true若有 false則觸發額外的重試或警示機制。
2. 反射收集欄位的可靠度
Update 方法透過反射與 isZero 來排除不更新欄位,但若結構體中出現自訂零值(如自訂型態有預設值),可能誤過濾掉真正要更新的欄位。
可能在資料模型層先明確標示「要更新的欄位列表」,或提供外部參數指明更新欄位,以減少反射過濾錯誤。
3. 交易邊界與隔離度
此實作並未提供交易隔離Isolation外部程式仍可能在交易尚未 Commit 時讀到中間狀態。
若對讀取一致性有嚴格要求,可考慮使用 Cassandra 的 Lightweight TransactionsLWT搭配 IF NOT EXISTS / IF 條件,確保寫入前的前置檢查。
4. 錯誤重試與警示
當 Commit 中某個步驟失敗,直接返回錯誤,但沒有集中收集失敗資訊。
建議整合一個「監控與重試」機制將失敗細節step index、錯誤訊息記錄到外部持久化系統以便運維人員介入或自動重試。
5. 崩潰恢復
如果程式在 Commit 過程中程式本身當掉,記憶體中的 Steps 會丟失,無法回滾。
可以把 OperationLog 持久化到可靠的日誌表Cassandra 或外部 DBCommit 之前就先寫入,並在啟動時掃描未完成的交易回滾或重試。
*/
type Action int64
const (
ActionUnknown Action = iota
ActionInsert
ActionDelete
ActionUpdate
)
// OperationLog 記錄操作日誌,用於補償回滾
type OperationLog struct {
ID gocql.UUID // 操作ID用來標識該操作
Action Action // 操作類型(增、刪、改)
IsExec bool
Exec []*gocqlx.Queryx // 這一個步驟要執行的東西
OldData any // 變更前的數據,僅對修改和刪除有效
NewData any // 變更後的數據,僅對新增和修改有效
}
// CompensatingTransaction 補償式交易介面
// 這是一個基於補償操作Compensating Action的交易模式適用於最終一致性場景
// 與傳統 ACID 交易不同,它不提供隔離性保證,但可以確保「要嘛全成功,要嘛全失敗」
// 注意:這不是真正的原子性交易,而是透過記錄操作日誌並在失敗時執行補償操作來實現
type CompensatingTransaction interface {
Insert(ctx context.Context, document any) error
Delete(ctx context.Context, filter any) error
Update(ctx context.Context, document any) error
Rollback() error
Commit() error
}
// transaction 定義補償操作的結構
type transaction struct {
ctx context.Context
keyspace string
db *CassandraDB
Steps []OperationLog // 用來記錄所有操作步驟的日誌
}
// NewCompensatingTransaction 創建一個新的補償式交易
// keyspace 如果為空,則使用初始化時設定的預設 keyspace
func NewCompensatingTransaction(ctx context.Context, keyspace string, db *CassandraDB) CompensatingTransaction {
keyspace = getKeyspace(db, keyspace)
return &transaction{
ctx: ctx,
keyspace: keyspace,
db: db,
Steps: []OperationLog{},
}
}
// NewEZTransaction 創建一個新的補償式交易(向後相容的別名)
// Deprecated: 使用 NewCompensatingTransaction 代替
func NewEZTransaction(ctx context.Context, keyspace string, db *CassandraDB) CompensatingTransaction {
return NewCompensatingTransaction(ctx, keyspace, db)
}
func (tx *transaction) Insert(ctx context.Context, document any) error {
metadata, err := GenerateTableMetadata(document, tx.keyspace)
if err != nil {
return err
}
t := table.New(metadata)
q := qh.withContextAndTimestamp(ctx, tx.db.GetSession().Query(t.Insert()).BindStruct(document))
logEntry := OperationLog{
ID: gocql.TimeUUID(),
Action: ActionInsert,
Exec: []*gocqlx.Queryx{q},
NewData: document,
}
tx.Steps = append(tx.Steps, logEntry)
return nil
}
func (tx *transaction) Delete(ctx context.Context, filter any) error {
metadata, err := GenerateTableMetadata(filter, tx.keyspace)
if err != nil {
return err
}
t := table.New(metadata)
doc := filter
get := tx.db.GetSession().Query(t.Get()).BindStruct(doc).WithContext(ctx)
q := qh.withContextAndTimestamp(ctx, tx.db.GetSession().Query(t.Delete()).BindStruct(filter))
logEntry := OperationLog{
ID: gocql.TimeUUID(),
Action: ActionDelete,
Exec: []*gocqlx.Queryx{get, q}, // 有順序,要先拿取保留舊資料,
OldData: doc, // 保留結構體才有機會回復
}
tx.Steps = append(tx.Steps, logEntry)
return nil
}
func (tx *transaction) Update(ctx context.Context, document any) error {
metadata, err := GenerateTableMetadata(document, tx.keyspace)
if err != nil {
return err
}
t := table.New(metadata)
v := reflect.ValueOf(document)
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
typ := v.Type()
// 收集更新欄位與其值(排除零值,保留主鍵)
setCols := make([]string, 0)
setVals := make([]any, 0)
whereCols := make([]string, 0)
whereVals := make([]any, 0)
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
tag := field.Tag.Get("db")
if tag == "" || tag == "-" {
continue
}
val := v.Field(i)
if !val.IsValid() {
continue
}
if contains(metadata.PartKey, tag) || contains(metadata.SortKey, tag) {
whereCols = append(whereCols, tag)
whereVals = append(whereVals, val.Interface())
continue
}
if isZero(val) {
continue
}
setCols = append(setCols, tag)
setVals = append(setVals, val.Interface())
}
if len(setCols) == 0 {
return ErrNoFieldsToUpdate.WithTable(metadata.Name)
}
// Build UPDATE statement
builder := qb.Update(metadata.Name).Set(setCols...)
for _, col := range whereCols {
builder = builder.Where(qb.Eq(col))
}
stmt, names := builder.ToCql()
setVals = append(setVals, whereVals...)
q := qh.withContextAndTimestamp(ctx, tx.db.GetSession().Query(stmt, names).Bind(setVals...))
doc := document
get := tx.db.GetSession().Query(t.Get()).BindStruct(doc).WithContext(ctx)
logEntry := OperationLog{
ID: gocql.TimeUUID(),
Action: ActionUpdate,
Exec: []*gocqlx.Queryx{get, q}, // 有順序,要先拿取保留舊資料,才可以 update
OldData: doc, // 保留結構體才有機會回復
NewData: document,
}
tx.Steps = append(tx.Steps, logEntry)
return nil
}
func (tx *transaction) Rollback() error {
for _, item := range tx.Steps {
// 沒有做過的就不用回復了
if !item.IsExec {
continue
}
switch item.Action {
case ActionInsert:
err := tx.db.Delete(tx.ctx, item.NewData, tx.keyspace)
if err != nil {
// Rollback 失敗時繼續處理其他步驟,但最終會返回錯誤
// 注意:這裡不記錄日誌,因為 library 包不應該直接記錄日誌
// 調用者應該根據返回的錯誤進行日誌記錄
continue
}
case ActionUpdate:
err := tx.db.Update(tx.ctx, item.OldData, tx.keyspace)
if err != nil {
// Rollback 失敗時繼續處理其他步驟,但最終會返回錯誤
continue
}
case ActionDelete:
err := tx.db.Insert(tx.ctx, item.OldData, tx.keyspace)
if err != nil {
// Rollback 失敗時繼續處理其他步驟,但最終會返回錯誤
continue
}
}
}
return nil
}
func (tx *transaction) Commit() error {
for i, step := range tx.Steps {
switch step.Action {
case ActionInsert:
// 單純插入,不用回滾額外做事,插入的資料已經放在 New Data 裡面了
if err := step.Exec[0].ExecRelease(); err != nil {
return fmt.Errorf("failed to insert: %w", err)
}
// 標示為以執行,如果有錯誤要回復,指座椅執行的就好
tx.Steps[i].IsExec = true
case ActionUpdate:
// 要先 get 之後再 Update
// 單純插入,不用回滾額外做事,插入的資料已經放在 New Data 裡面了
if err := step.Exec[0].GetRelease(step.OldData); err != nil {
return fmt.Errorf("failed to get: %w", err)
}
if err := step.Exec[1].ExecRelease(); err != nil {
return fmt.Errorf("failed to update: %w", err)
}
// 標示為以執行,如果有錯誤要回復,指座椅執行的就好
tx.Steps[i].IsExec = true
case ActionDelete:
// 要先 get 之後再 Update
// 單純插入,不用回滾額外做事,插入的資料已經放在 New Data 裡面了
if err := step.Exec[0].GetRelease(step.OldData); err != nil {
return fmt.Errorf("failed to get: %w", err)
}
if err := step.Exec[1].ExecRelease(); err != nil {
return fmt.Errorf("failed to delete: %w", err)
}
// 標示為以執行,如果有錯誤要回復,指座椅執行的就好
tx.Steps[i].IsExec = true
default:
return fmt.Errorf("unknown action: %v", step.Action)
}
}
return nil
}