277 lines
8.7 KiB
Go
277 lines
8.7 KiB
Go
package cassandra
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"reflect"
|
||
"time"
|
||
|
||
"github.com/gocql/gocql"
|
||
"github.com/scylladb/gocqlx/v3"
|
||
"github.com/scylladb/gocqlx/v3/qb"
|
||
"github.com/scylladb/gocqlx/v3/table"
|
||
"github.com/testcontainers/testcontainers-go/log"
|
||
)
|
||
|
||
/*
|
||
todo 目前尚未實作的部分,但因為目前使用上並沒有嚴格一致性,故目前簡易的版本可先行
|
||
|
||
1. 讀寫一致性問題
|
||
Cassandra 本身為最終一致性,如果在 Commit 期間網路有短暫中斷,可能造成部分操作成功、部分失敗的「半提交」狀態。
|
||
Commit 之後,再次掃描 Steps,看是否所有 IsExec 都為 true,若有 false,則觸發額外的重試或警示機制。
|
||
|
||
2. 反射收集欄位的可靠度
|
||
Update 方法透過反射與 isZero 來排除不更新欄位,但若結構體中出現自訂零值(如自訂型態有預設值),可能誤過濾掉真正要更新的欄位。
|
||
可能在資料模型層先明確標示「要更新的欄位列表」,或提供外部參數指明更新欄位,以減少反射過濾錯誤。
|
||
|
||
3. 交易邊界與隔離度
|
||
此實作並未提供交易隔離(Isolation),外部程式仍可能在交易尚未 Commit 時讀到中間狀態。
|
||
若對讀取一致性有嚴格要求,可考慮使用 Cassandra 的 Lightweight Transactions(LWT)搭配 IF NOT EXISTS / IF 條件,確保寫入前的前置檢查。
|
||
|
||
4. 錯誤重試與警示
|
||
當 Commit 中某個步驟失敗,直接返回錯誤,但沒有集中收集失敗資訊。
|
||
建議整合一個「監控與重試」機制,將失敗細節(step index、錯誤訊息)記錄到外部持久化系統,以便運維人員介入或自動重試。
|
||
|
||
5. 崩潰恢復
|
||
如果程式在 Commit 過程中程式本身當掉,記憶體中的 Steps 會丟失,無法回滾。
|
||
可以把 OperationLog 持久化到可靠的日誌表(Cassandra 或外部 DB),Commit 之前就先寫入,並在啟動時掃描未完成的交易回滾或重試。
|
||
*/
|
||
|
||
type Action int64
|
||
|
||
const (
|
||
ActionUnknown Action = iota
|
||
ActionInsert
|
||
ActionDelete
|
||
ActionUpdate
|
||
)
|
||
|
||
// OperationLog 記錄操作日誌,用於補償回滾
|
||
type OperationLog struct {
|
||
ID gocql.UUID // 操作ID,用來標識該操作
|
||
Action Action // 操作類型(增、刪、改)
|
||
IsExec bool
|
||
Exec []*gocqlx.Queryx // 這一個步驟要執行的東西
|
||
OldData any // 變更前的數據,僅對修改和刪除有效
|
||
NewData any // 變更後的數據,僅對新增和修改有效
|
||
}
|
||
|
||
// Transaction 這裡是單個實體內的 TX ,不管一些競爭的問題,不管隔離,就可以確定一筆資料要嘛全成功,要嘛全失敗。
|
||
type Transaction interface {
|
||
Insert(ctx context.Context, document any) error
|
||
Delete(ctx context.Context, filter any) error
|
||
Update(ctx context.Context, document any) error
|
||
Rollback() error
|
||
Commit() error
|
||
}
|
||
|
||
// transaction 定義補償操作的結構
|
||
type transaction struct {
|
||
ctx context.Context
|
||
keyspace string
|
||
db *CassandraDB
|
||
Steps []OperationLog // 用來記錄所有操作步驟的日誌
|
||
}
|
||
|
||
func NewEZTransaction(ctx context.Context, keyspace string, db *CassandraDB) Transaction {
|
||
return &transaction{
|
||
ctx: ctx,
|
||
keyspace: keyspace,
|
||
db: db,
|
||
Steps: []OperationLog{},
|
||
}
|
||
}
|
||
|
||
func (tx *transaction) Insert(ctx context.Context, document any) error {
|
||
metadata, err := GenerateTableMetadata(document, tx.keyspace)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
t := table.New(metadata)
|
||
|
||
q := tx.db.GetSession().Query(t.Insert()).BindStruct(document).WithContext(ctx).WithTimestamp(time.Now().UnixNano() / 1e3)
|
||
logEntry := OperationLog{
|
||
ID: gocql.TimeUUID(),
|
||
Action: ActionInsert,
|
||
Exec: []*gocqlx.Queryx{q},
|
||
NewData: document,
|
||
}
|
||
tx.Steps = append(tx.Steps, logEntry)
|
||
|
||
return nil
|
||
}
|
||
|
||
func (tx *transaction) Delete(ctx context.Context, filter any) error {
|
||
metadata, err := GenerateTableMetadata(filter, tx.keyspace)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
t := table.New(metadata)
|
||
|
||
doc := filter
|
||
|
||
get := tx.db.GetSession().Query(t.Get()).BindStruct(doc).WithContext(ctx)
|
||
q := tx.db.GetSession().Query(t.Delete()).BindStruct(filter).WithContext(ctx).WithTimestamp(time.Now().UnixNano() / 1e3)
|
||
|
||
logEntry := OperationLog{
|
||
ID: gocql.TimeUUID(),
|
||
Action: ActionDelete,
|
||
Exec: []*gocqlx.Queryx{get, q}, // 有順序,要先拿取保留舊資料,
|
||
OldData: doc, // 保留結構體才有機會回復
|
||
}
|
||
tx.Steps = append(tx.Steps, logEntry)
|
||
|
||
return nil
|
||
}
|
||
|
||
func (tx *transaction) Update(ctx context.Context, document any) error {
|
||
metadata, err := GenerateTableMetadata(document, tx.keyspace)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
t := table.New(metadata)
|
||
|
||
v := reflect.ValueOf(document)
|
||
if v.Kind() == reflect.Ptr {
|
||
v = v.Elem()
|
||
}
|
||
typ := v.Type()
|
||
|
||
// 收集更新欄位與其值(排除零值,保留主鍵)
|
||
setCols := make([]string, 0)
|
||
setVals := make([]any, 0)
|
||
whereCols := make([]string, 0)
|
||
whereVals := make([]any, 0)
|
||
|
||
for i := 0; i < typ.NumField(); i++ {
|
||
field := typ.Field(i)
|
||
tag := field.Tag.Get("db")
|
||
if tag == "" || tag == "-" {
|
||
continue
|
||
}
|
||
|
||
val := v.Field(i)
|
||
if !val.IsValid() {
|
||
continue
|
||
}
|
||
|
||
if contains(metadata.PartKey, tag) || contains(metadata.SortKey, tag) {
|
||
whereCols = append(whereCols, tag)
|
||
whereVals = append(whereVals, val.Interface())
|
||
continue
|
||
}
|
||
|
||
if isZero(val) {
|
||
continue
|
||
}
|
||
|
||
setCols = append(setCols, tag)
|
||
setVals = append(setVals, val.Interface())
|
||
}
|
||
|
||
if len(setCols) == 0 {
|
||
return fmt.Errorf("no non-zero update fields provided")
|
||
}
|
||
|
||
// Build UPDATE statement
|
||
builder := qb.Update(metadata.Name).Set(setCols...)
|
||
for _, col := range whereCols {
|
||
builder = builder.Where(qb.Eq(col))
|
||
}
|
||
stmt, names := builder.ToCql()
|
||
|
||
args := append(setVals, whereVals...)
|
||
q := tx.db.GetSession().Query(stmt, names).Bind(args...).WithContext(ctx).WithTimestamp(time.Now().UnixNano() / 1e3)
|
||
|
||
doc := document
|
||
get := tx.db.GetSession().Query(t.Get()).BindStruct(doc).WithContext(ctx)
|
||
|
||
logEntry := OperationLog{
|
||
ID: gocql.TimeUUID(),
|
||
Action: ActionUpdate,
|
||
Exec: []*gocqlx.Queryx{get, q}, // 有順序,要先拿取保留舊資料,才可以 update
|
||
OldData: doc, // 保留結構體才有機會回復
|
||
NewData: document,
|
||
}
|
||
tx.Steps = append(tx.Steps, logEntry)
|
||
|
||
return nil
|
||
}
|
||
|
||
func (tx *transaction) Rollback() error {
|
||
for _, item := range tx.Steps {
|
||
// 沒有做過的就不用回復了
|
||
if !item.IsExec {
|
||
continue
|
||
}
|
||
|
||
switch item.Action {
|
||
case ActionInsert:
|
||
err := tx.db.Delete(tx.ctx, item.NewData, tx.keyspace)
|
||
if err != nil {
|
||
// todo log 下來,最大程度保證,都有做完,真的有錯從 log 去補,目前不做
|
||
log.Printf("failed to delete since rollback, data: %v", item.NewData)
|
||
continue
|
||
}
|
||
case ActionUpdate:
|
||
err := tx.db.Update(tx.ctx, item.OldData, tx.keyspace)
|
||
if err != nil {
|
||
// todo log 下來,最大程度保證,都有做完,真的有錯從 log 去補,目前不做
|
||
log.Printf("failed to update since rollback, data: %v", item.OldData)
|
||
continue
|
||
}
|
||
case ActionDelete:
|
||
err := tx.db.Insert(tx.ctx, item.OldData, tx.keyspace)
|
||
if err != nil {
|
||
// todo log 下來,最大程度保證,都有做完,真的有錯從 log 去補,目前不做
|
||
log.Printf("failed to insert since rollback, data: %v", item.OldData)
|
||
continue
|
||
}
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (tx *transaction) Commit() error {
|
||
for i, step := range tx.Steps {
|
||
switch step.Action {
|
||
case ActionInsert:
|
||
// 單純插入,不用回滾額外做事,插入的資料已經放在 New Data 裡面了
|
||
if err := step.Exec[0].ExecRelease(); err != nil {
|
||
return fmt.Errorf("failed to insert: %w", err)
|
||
}
|
||
// 標示為以執行,如果有錯誤要回復,指座椅執行的就好
|
||
tx.Steps[i].IsExec = true
|
||
case ActionUpdate:
|
||
// 要先 get 之後再 Update
|
||
// 單純插入,不用回滾額外做事,插入的資料已經放在 New Data 裡面了
|
||
if err := step.Exec[0].GetRelease(step.OldData); err != nil {
|
||
return fmt.Errorf("failed to get: %w", err)
|
||
}
|
||
|
||
if err := step.Exec[1].ExecRelease(); err != nil {
|
||
return fmt.Errorf("failed to update: %w", err)
|
||
}
|
||
|
||
// 標示為以執行,如果有錯誤要回復,指座椅執行的就好
|
||
tx.Steps[i].IsExec = true
|
||
case ActionDelete:
|
||
// 要先 get 之後再 Update
|
||
// 單純插入,不用回滾額外做事,插入的資料已經放在 New Data 裡面了
|
||
if err := step.Exec[0].GetRelease(step.OldData); err != nil {
|
||
return fmt.Errorf("failed to get: %w", err)
|
||
}
|
||
if err := step.Exec[1].ExecRelease(); err != nil {
|
||
return fmt.Errorf("failed to delete: %w", err)
|
||
}
|
||
// 標示為以執行,如果有錯誤要回復,指座椅執行的就好
|
||
tx.Steps[i].IsExec = true
|
||
default:
|
||
return fmt.Errorf("unknown action: %v", step.Action)
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|