package cassandra import ( "context" "fmt" "github.com/gocql/gocql" "github.com/scylladb/gocqlx/v3" "github.com/scylladb/gocqlx/v3/qb" "github.com/scylladb/gocqlx/v3/table" "github.com/testcontainers/testcontainers-go/log" "reflect" "time" ) /* todo 目前尚未實作的部分,但因為目前使用上並沒有嚴格一致性,故目前簡易的版本可先行 1. 讀寫一致性問題 Cassandra 本身為最終一致性,如果在 Commit 期間網路有短暫中斷,可能造成部分操作成功、部分失敗的「半提交」狀態。 Commit 之後,再次掃描 Steps,看是否所有 IsExec 都為 true,若有 false,則觸發額外的重試或警示機制。 2. 反射收集欄位的可靠度 Update 方法透過反射與 isZero 來排除不更新欄位,但若結構體中出現自訂零值(如自訂型態有預設值),可能誤過濾掉真正要更新的欄位。 可能在資料模型層先明確標示「要更新的欄位列表」,或提供外部參數指明更新欄位,以減少反射過濾錯誤。 3. 交易邊界與隔離度 此實作並未提供交易隔離(Isolation),外部程式仍可能在交易尚未 Commit 時讀到中間狀態。 若對讀取一致性有嚴格要求,可考慮使用 Cassandra 的 Lightweight Transactions(LWT)搭配 IF NOT EXISTS / IF 條件,確保寫入前的前置檢查。 4. 錯誤重試與警示 當 Commit 中某個步驟失敗,直接返回錯誤,但沒有集中收集失敗資訊。 建議整合一個「監控與重試」機制,將失敗細節(step index、錯誤訊息)記錄到外部持久化系統,以便運維人員介入或自動重試。 5. 崩潰恢復 如果程式在 Commit 過程中程式本身當掉,記憶體中的 Steps 會丟失,無法回滾。 可以把 OperationLog 持久化到可靠的日誌表(Cassandra 或外部 DB),Commit 之前就先寫入,並在啟動時掃描未完成的交易回滾或重試。 */ type Action int64 const ( ActionUnknown Action = iota ActionInsert ActionDelete ActionUpdate ) // OperationLog 記錄操作日誌,用於補償回滾 type OperationLog struct { ID gocql.UUID // 操作ID,用來標識該操作 Action Action // 操作類型(增、刪、改) IsExec bool Exec []*gocqlx.Queryx // 這一個步驟要執行的東西 OldData any // 變更前的數據,僅對修改和刪除有效 NewData any // 變更後的數據,僅對新增和修改有效 } // Transaction 這裡是單個實體內的 TX ,不管一些競爭的問題,不管隔離,就可以確定一筆資料要嘛全成功,要嘛全失敗。 type Transaction interface { Insert(ctx context.Context, document any) error Delete(ctx context.Context, filter any) error Update(ctx context.Context, document any) error Rollback() error Commit() error } // transaction 定義補償操作的結構 type transaction struct { ctx context.Context keyspace string db *DB Steps []OperationLog // 用來記錄所有操作步驟的日誌 } func NewEZTransaction(ctx context.Context, keyspace string, db *DB) Transaction { return &transaction{ ctx: ctx, keyspace: keyspace, db: db, Steps: []OperationLog{}, } } func (tx *transaction) Insert(ctx context.Context, document any) error { metadata, err := GenerateTableMetadata(document, tx.keyspace) if err != nil { return err } t := table.New(metadata) q := tx.db.GetSession().Query(t.Insert()).BindStruct(document).WithContext(ctx).WithTimestamp(time.Now().UnixNano() / 1e3) logEntry := OperationLog{ ID: gocql.TimeUUID(), Action: ActionInsert, Exec: []*gocqlx.Queryx{q}, NewData: document, } tx.Steps = append(tx.Steps, logEntry) return nil } func (tx *transaction) Delete(ctx context.Context, filter any) error { metadata, err := GenerateTableMetadata(filter, tx.keyspace) if err != nil { return err } t := table.New(metadata) doc := filter get := tx.db.GetSession().Query(t.Get()).BindStruct(doc).WithContext(ctx) q := tx.db.GetSession().Query(t.Delete()).BindStruct(filter).WithContext(ctx).WithTimestamp(time.Now().UnixNano() / 1e3) logEntry := OperationLog{ ID: gocql.TimeUUID(), Action: ActionDelete, Exec: []*gocqlx.Queryx{get, q}, // 有順序,要先拿取保留舊資料, OldData: doc, // 保留結構體才有機會回復 } tx.Steps = append(tx.Steps, logEntry) return nil } func (tx *transaction) Update(ctx context.Context, document any) error { metadata, err := GenerateTableMetadata(document, tx.keyspace) if err != nil { return err } t := table.New(metadata) v := reflect.ValueOf(document) if v.Kind() == reflect.Ptr { v = v.Elem() } typ := v.Type() // 收集更新欄位與其值(排除零值,保留主鍵) setCols := make([]string, 0) setVals := make([]any, 0) whereCols := make([]string, 0) whereVals := make([]any, 0) for i := 0; i < typ.NumField(); i++ { field := typ.Field(i) tag := field.Tag.Get("db") if tag == "" || tag == "-" { continue } val := v.Field(i) if !val.IsValid() { continue } if contains(metadata.PartKey, tag) || contains(metadata.SortKey, tag) { whereCols = append(whereCols, tag) whereVals = append(whereVals, val.Interface()) continue } if isZero(val) { continue } setCols = append(setCols, tag) setVals = append(setVals, val.Interface()) } if len(setCols) == 0 { return fmt.Errorf("no non-zero update fields provided") } // Build UPDATE statement builder := qb.Update(metadata.Name).Set(setCols...) for _, col := range whereCols { builder = builder.Where(qb.Eq(col)) } stmt, names := builder.ToCql() args := append(setVals, whereVals...) q := tx.db.GetSession().Query(stmt, names).Bind(args...).WithContext(ctx).WithTimestamp(time.Now().UnixNano() / 1e3) doc := document get := tx.db.GetSession().Query(t.Get()).BindStruct(doc).WithContext(ctx) logEntry := OperationLog{ ID: gocql.TimeUUID(), Action: ActionUpdate, Exec: []*gocqlx.Queryx{get, q}, // 有順序,要先拿取保留舊資料,才可以 update OldData: doc, // 保留結構體才有機會回復 NewData: document, } tx.Steps = append(tx.Steps, logEntry) return nil } func (tx *transaction) Rollback() error { for _, item := range tx.Steps { // 沒有做過的就不用回復了 if !item.IsExec { continue } switch item.Action { case ActionInsert: err := tx.db.Delete(tx.ctx, item.NewData, tx.keyspace) if err != nil { // todo log 下來,最大程度保證,都有做完,真的有錯從 log 去補,目前不做 log.Printf("failed to delete since rollback, data: %v", item.NewData) continue } case ActionUpdate: err := tx.db.Update(tx.ctx, item.OldData, tx.keyspace) if err != nil { // todo log 下來,最大程度保證,都有做完,真的有錯從 log 去補,目前不做 log.Printf("failed to update since rollback, data: %v", item.OldData) continue } case ActionDelete: err := tx.db.Insert(tx.ctx, item.OldData, tx.keyspace) if err != nil { // todo log 下來,最大程度保證,都有做完,真的有錯從 log 去補,目前不做 log.Printf("failed to insert since rollback, data: %v", item.OldData) continue } } } return nil } func (tx *transaction) Commit() error { for i, step := range tx.Steps { switch step.Action { case ActionInsert: // 單純插入,不用回滾額外做事,插入的資料已經放在 New Data 裡面了 if err := step.Exec[0].ExecRelease(); err != nil { return fmt.Errorf("failed to insert: %w", err) } // 標示為以執行,如果有錯誤要回復,指座椅執行的就好 tx.Steps[i].IsExec = true case ActionUpdate: // 要先 get 之後再 Update // 單純插入,不用回滾額外做事,插入的資料已經放在 New Data 裡面了 if err := step.Exec[0].GetRelease(step.OldData); err != nil { return fmt.Errorf("failed to get: %w", err) } if err := step.Exec[1].ExecRelease(); err != nil { return fmt.Errorf("failed to update: %w", err) } // 標示為以執行,如果有錯誤要回復,指座椅執行的就好 tx.Steps[i].IsExec = true case ActionDelete: // 要先 get 之後再 Update // 單純插入,不用回滾額外做事,插入的資料已經放在 New Data 裡面了 if err := step.Exec[0].GetRelease(step.OldData); err != nil { return fmt.Errorf("failed to get: %w", err) } if err := step.Exec[1].ExecRelease(); err != nil { return fmt.Errorf("failed to delete: %w", err) } // 標示為以執行,如果有錯誤要回復,指座椅執行的就好 tx.Steps[i].IsExec = true default: return fmt.Errorf("unknown action: %v", step.Action) } } return nil }