backend/pkg/library/cassandra/README.md

16 KiB
Raw Blame History

Cassandra Client Library

一個基於 Go Generics 的 Cassandra 客戶端庫,提供類型安全的 Repository 模式和流暢的查詢構建器 API。

功能特色

  • 類型安全: 使用 Go Generics 提供編譯時類型檢查
  • Repository 模式: 簡潔的 CRUD 操作介面
  • 流暢查詢: 鏈式查詢構建器,支援條件、排序、限制
  • 分散式鎖: 基於 Cassandra 的 IF NOT EXISTS 實現分散式鎖
  • 批次操作: 支援批次插入、更新、刪除
  • SAI 索引支援: 完整的 SAI (Storage-Attached Indexing) 索引管理功能
  • Option 模式: 靈活的配置選項
  • 錯誤處理: 統一的錯誤處理機制
  • 高效能: 內建連接池、重試機制、Prepared Statement 快取

安裝

go get github.com/scylladb/gocqlx/v2
go get github.com/gocql/gocql

快速開始

1. 定義資料模型

package main

import (
	"time"
	"github.com/gocql/gocql"
	"backend/pkg/library/cassandra"
)

// User 定義用戶資料模型
type User struct {
	ID        gocql.UUID `db:"id" partition_key:"true"`
	Name      string     `db:"name"`
	Email     string     `db:"email"`
	Age       int        `db:"age"`
	CreatedAt time.Time  `db:"created_at"`
	UpdatedAt time.Time  `db:"updated_at"`
}

// TableName 實現 Table 介面
func (u User) TableName() string {
	return "users"
}

2. 初始化資料庫連接

package main

import (
	"context"
	"fmt"
	"log"
	
	"backend/pkg/library/cassandra"
	"github.com/gocql/gocql"
)

func main() {
	// 創建資料庫連接
	db, err := cassandra.New(
		cassandra.WithHosts("127.0.0.1"),
		cassandra.WithPort(9042),
		cassandra.WithKeyspace("my_keyspace"),
		cassandra.WithAuth("username", "password"),
		cassandra.WithConsistency(gocql.Quorum),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// 創建 Repository
	userRepo, err := cassandra.NewRepository[User](db, "my_keyspace")
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	
	// 使用 Repository...
	_ = userRepo
}

詳細範例

CRUD 操作

插入資料

// 插入單筆資料
user := User{
	ID:        gocql.TimeUUID(),
	Name:      "Alice",
	Email:     "alice@example.com",
	Age:       30,
	CreatedAt: time.Now(),
	UpdatedAt: time.Now(),
}

err := userRepo.Insert(ctx, user)
if err != nil {
	log.Printf("插入失敗: %v", err)
}

// 批次插入
users := []User{
	{ID: gocql.TimeUUID(), Name: "Bob", Email: "bob@example.com"},
	{ID: gocql.TimeUUID(), Name: "Charlie", Email: "charlie@example.com"},
}

err = userRepo.InsertMany(ctx, users)
if err != nil {
	log.Printf("批次插入失敗: %v", err)
}

查詢資料

// 根據主鍵查詢
userID := gocql.TimeUUID()
user, err := userRepo.Get(ctx, userID)
if err != nil {
	if cassandra.IsNotFound(err) {
		log.Println("用戶不存在")
	} else {
		log.Printf("查詢失敗: %v", err)
	}
	return
}

fmt.Printf("用戶: %+v\n", user)

更新資料

// 更新資料(只更新非零值欄位)
user.Name = "Alice Updated"
user.Email = "alice.updated@example.com"
err = userRepo.Update(ctx, user)
if err != nil {
	log.Printf("更新失敗: %v", err)
}

// 更新所有欄位(包括零值)
user.Age = 0 // 零值也會被更新
err = userRepo.UpdateAll(ctx, user)
if err != nil {
	log.Printf("更新失敗: %v", err)
}

刪除資料

// 刪除資料
err = userRepo.Delete(ctx, userID)
if err != nil {
	log.Printf("刪除失敗: %v", err)
}

查詢構建器

基本查詢

// 查詢所有符合條件的記錄
var users []User
err := userRepo.Query().
	Where(cassandra.Eq("age", 30)).
	OrderBy("created_at", cassandra.DESC).
	Limit(10).
	Scan(ctx, &users)

if err != nil {
	log.Printf("查詢失敗: %v", err)
}

// 查詢單筆記錄
user, err := userRepo.Query().
	Where(cassandra.Eq("email", "alice@example.com")).
	One(ctx)

if err != nil {
	if cassandra.IsNotFound(err) {
		log.Println("用戶不存在")
	} else {
		log.Printf("查詢失敗: %v", err)
	}
}

條件查詢

// 等於條件
userRepo.Query().Where(cassandra.Eq("name", "Alice"))

// IN 條件
userRepo.Query().Where(cassandra.In("id", []any{id1, id2, id3}))

// 大於條件
userRepo.Query().Where(cassandra.Gt("age", 18))

// 小於條件
userRepo.Query().Where(cassandra.Lt("age", 65))

// 組合多個條件
userRepo.Query().
	Where(cassandra.Eq("status", "active")).
	Where(cassandra.Gt("age", 18))

排序和限制

// 按建立時間降序排列,限制 20 筆
var users []User
err := userRepo.Query().
	OrderBy("created_at", cassandra.DESC).
	Limit(20).
	Scan(ctx, &users)

// 多欄位排序
err = userRepo.Query().
	OrderBy("status", cassandra.ASC).
	OrderBy("created_at", cassandra.DESC).
	Scan(ctx, &users)

選擇特定欄位

// 只查詢特定欄位
var users []User
err := userRepo.Query().
	Select("id", "name", "email").
	Where(cassandra.Eq("status", "active")).
	Scan(ctx, &users)

計數查詢

// 計算符合條件的記錄數
count, err := userRepo.Query().
	Where(cassandra.Eq("status", "active")).
	Count(ctx)

if err != nil {
	log.Printf("計數失敗: %v", err)
} else {
	fmt.Printf("活躍用戶數: %d\n", count)
}

分散式鎖

// 獲取鎖(預設 30 秒 TTL
lockUser := User{ID: userID}
err := userRepo.TryLock(ctx, lockUser)
if err != nil {
	if cassandra.IsLockFailed(err) {
		log.Println("獲取鎖失敗,資源已被鎖定")
	} else {
		log.Printf("鎖操作失敗: %v", err)
	}
	return
}

// 執行需要鎖定的操作
defer func() {
	// 釋放鎖
	if err := userRepo.UnLock(ctx, lockUser); err != nil {
		log.Printf("釋放鎖失敗: %v", err)
	}
}()

// 執行業務邏輯...

自訂鎖 TTL

// 設定鎖的 TTL 為 60 秒
err := userRepo.TryLock(ctx, lockUser, cassandra.WithLockTTL(60*time.Second))

// 永不自動解鎖
err := userRepo.TryLock(ctx, lockUser, cassandra.WithNoLockExpire())

複雜主鍵

複合主鍵Partition Key + Clustering Key

// 定義複合主鍵模型
type Order struct {
	UserID    gocql.UUID `db:"user_id" partition_key:"true"`
	OrderID   gocql.UUID `db:"order_id" clustering_key:"true"`
	ProductID string     `db:"product_id"`
	Quantity  int        `db:"quantity"`
	Price     float64    `db:"price"`
	CreatedAt time.Time  `db:"created_at"`
}

func (o Order) TableName() string {
	return "orders"
}

// 查詢時需要提供完整的主鍵
order, err := orderRepo.Get(ctx, Order{
	UserID:  userID,
	OrderID: orderID,
})

多欄位 Partition Key

type Message struct {
	ChatID    gocql.UUID `db:"chat_id" partition_key:"true"`
	MessageID gocql.UUID `db:"message_id" clustering_key:"true"`
	UserID    gocql.UUID `db:"user_id" partition_key:"true"`
	Content   string     `db:"content"`
	CreatedAt time.Time  `db:"created_at"`
}

func (m Message) TableName() string {
	return "messages"
}

// 查詢時需要提供所有 Partition Key
message, err := messageRepo.Get(ctx, Message{
	ChatID: chatID,
	UserID: userID,
	MessageID: messageID,
})

配置選項

連接選項

db, err := cassandra.New(
	// 主機列表
	cassandra.WithHosts("127.0.0.1", "127.0.0.2", "127.0.0.3"),
	
	// 連接埠
	cassandra.WithPort(9042),
	
	// Keyspace
	cassandra.WithKeyspace("my_keyspace"),
	
	// 認證
	cassandra.WithAuth("username", "password"),
	
	// 一致性級別
	cassandra.WithConsistency(gocql.Quorum),
	
	// 連接超時
	cassandra.WithConnectTimeout(10 * time.Second),
	
	// 每個節點的連接數
	cassandra.WithNumConns(10),
	
	// 重試次數
	cassandra.WithMaxRetries(3),
	
	// 重試間隔
	cassandra.WithRetryInterval(100*time.Millisecond, 1*time.Second),
	
	// 重連間隔
	cassandra.WithReconnectInterval(1*time.Second, 10*time.Second),
	
	// CQL 版本
	cassandra.WithCQLVersion("3.0.0"),
)

錯誤處理

錯誤類型

// 檢查是否為特定錯誤
if cassandra.IsNotFound(err) {
	// 記錄不存在
}

if cassandra.IsConflict(err) {
	// 衝突錯誤(如唯一鍵衝突)
}

if cassandra.IsLockFailed(err) {
	// 獲取鎖失敗
}

// 使用 errors.As 獲取詳細錯誤資訊
var cassandraErr *cassandra.Error
if errors.As(err, &cassandraErr) {
	fmt.Printf("錯誤代碼: %s\n", cassandraErr.Code)
	fmt.Printf("錯誤訊息: %s\n", cassandraErr.Message)
	fmt.Printf("資料表: %s\n", cassandraErr.Table)
}

錯誤代碼

  • NOT_FOUND: 記錄未找到
  • CONFLICT: 衝突(如唯一鍵衝突、鎖獲取失敗)
  • INVALID_INPUT: 輸入參數無效
  • MISSING_PARTITION_KEY: 缺少 Partition Key
  • NO_FIELDS_TO_UPDATE: 沒有欄位需要更新
  • MISSING_TABLE_NAME: 缺少 TableName 方法
  • MISSING_WHERE_CONDITION: 缺少 WHERE 條件

最佳實踐

1. 使用 Context

// 所有操作都應該傳入 context以便支援超時和取消
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

user, err := userRepo.Get(ctx, userID)

2. 錯誤處理

user, err := userRepo.Get(ctx, userID)
if err != nil {
	if cassandra.IsNotFound(err) {
		// 處理不存在的情況
		return nil, ErrUserNotFound
	}
	// 處理其他錯誤
	return nil, fmt.Errorf("查詢用戶失敗: %w", err)
}

3. 批次操作

// 對於大量資料,使用批次插入
const batchSize = 100
for i := 0; i < len(users); i += batchSize {
	end := i + batchSize
	if end > len(users) {
		end = len(users)
	}
	
	err := userRepo.InsertMany(ctx, users[i:end])
	if err != nil {
		log.Printf("批次插入失敗 (索引 %d-%d): %v", i, end, err)
	}
}

4. 使用分散式鎖

// 在需要保證原子性的操作中使用鎖
err := userRepo.TryLock(ctx, lockUser, cassandra.WithLockTTL(30*time.Second))
if err != nil {
	return fmt.Errorf("獲取鎖失敗: %w", err)
}
defer userRepo.UnLock(ctx, lockUser)

// 執行需要原子性的操作

5. 查詢優化

// 只選擇需要的欄位
var users []User
err := userRepo.Query().
	Select("id", "name", "email"). // 只選擇需要的欄位
	Where(cassandra.Eq("status", "active")).
	Scan(ctx, &users)

// 使用適當的限制
err = userRepo.Query().
	Where(cassandra.Eq("status", "active")).
	Limit(100). // 限制結果數量
	Scan(ctx, &users)

SAI 索引管理

建立 SAI 索引

// 檢查是否支援 SAI
if !db.SaiSupported() {
    log.Fatal("SAI is not supported in this Cassandra version")
}

// 建立標準索引
err := db.CreateSAIIndex(ctx, "my_keyspace", "users", "email", "users_email_idx", nil)
if err != nil {
    log.Printf("建立索引失敗: %v", err)
}

// 建立全文索引(不區分大小寫)
opts := &cassandra.SAIIndexOptions{
    IndexType:     cassandra.SAIIndexTypeFullText,
    IsAsync:       false,
    CaseSensitive: false,
}
err = db.CreateSAIIndex(ctx, "my_keyspace", "posts", "content", "posts_content_ft_idx", opts)

查詢 SAI 索引

// 列出資料表的所有 SAI 索引
indexes, err := db.ListSAIIndexes(ctx, "my_keyspace", "users")
if err != nil {
    log.Printf("查詢索引失敗: %v", err)
} else {
    for _, idx := range indexes {
        fmt.Printf("索引: %s, 欄位: %s, 類型: %s\n", idx.Name, idx.Column, idx.Type)
    }
}

// 檢查索引是否存在
exists, err := db.CheckSAIIndexExists(ctx, "my_keyspace", "users_email_idx")
if err != nil {
    log.Printf("檢查索引失敗: %v", err)
} else if exists {
    fmt.Println("索引存在")
}

刪除 SAI 索引

// 刪除索引
err := db.DropSAIIndex(ctx, "my_keyspace", "users_email_idx")
if err != nil {
    log.Printf("刪除索引失敗: %v", err)
}

SAI 索引類型

  • SAIIndexTypeStandard: 標準索引(等於查詢)
  • SAIIndexTypeCollection: 集合索引(用於 list、set、map
  • SAIIndexTypeFullText: 全文索引

SAI 索引選項

opts := &cassandra.SAIIndexOptions{
    IndexType:     cassandra.SAIIndexTypeFullText, // 索引類型
    IsAsync:       false,                          // 是否異步建立
    CaseSensitive: true,                           // 是否區分大小寫
}

注意事項

1. 主鍵要求

  • GetDelete 操作必須提供完整的主鍵(所有 Partition Key 和 Clustering Key
  • 單一主鍵值只適用於單一 Partition Key 且無 Clustering Key 的情況

2. 更新操作

  • Update 只更新非零值欄位
  • UpdateAll 更新所有欄位(包括零值)
  • 更新操作必須包含主鍵欄位

3. 查詢限制

  • Cassandra 的查詢必須包含所有 Partition Key
  • 排序只能按 Clustering Key 進行
  • 不支援 JOIN 操作

4. 分散式鎖

  • 鎖使用 IF NOT EXISTS 實現,預設 30 秒 TTL
  • 獲取鎖失敗時會返回 CONFLICT 錯誤
  • 釋放鎖時會自動重試,最多 3 次

5. 批次操作

  • 批次操作有大小限制(建議不超過 1000 筆)
  • 批次操作中的所有操作必須屬於同一個 Partition Key

6. SAI 索引

  • SAI 索引需要 Cassandra 4.0.9+ 版本(建議 5.0+
  • 建立索引前請先檢查 db.SaiSupported()
  • 索引建立是異步操作,可能需要一些時間
  • 刪除索引時使用 IF EXISTS,避免索引不存在時報錯
  • 使用 SAI 索引可以大幅提升非主鍵欄位的查詢效能
  • 全文索引支援不區分大小寫的搜尋

完整範例

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"backend/pkg/library/cassandra"
	"github.com/gocql/gocql"
)

type User struct {
	ID        gocql.UUID `db:"id" partition_key:"true"`
	Name      string     `db:"name"`
	Email     string     `db:"email"`
	Age       int        `db:"age"`
	Status    string     `db:"status"`
	CreatedAt time.Time  `db:"created_at"`
	UpdatedAt time.Time  `db:"updated_at"`
}

func (u User) TableName() string {
	return "users"
}

func main() {
	// 初始化資料庫連接
	db, err := cassandra.New(
		cassandra.WithHosts("127.0.0.1"),
		cassandra.WithPort(9042),
		cassandra.WithKeyspace("my_keyspace"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// 創建 Repository
	userRepo, err := cassandra.NewRepository[User](db, "my_keyspace")
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()

	// 插入用戶
	user := User{
		ID:        gocql.TimeUUID(),
		Name:      "Alice",
		Email:     "alice@example.com",
		Age:       30,
		Status:    "active",
		CreatedAt: time.Now(),
		UpdatedAt: time.Now(),
	}

	if err := userRepo.Insert(ctx, user); err != nil {
		log.Printf("插入失敗: %v", err)
		return
	}

	// 查詢用戶
	foundUser, err := userRepo.Get(ctx, user.ID)
	if err != nil {
		log.Printf("查詢失敗: %v", err)
		return
	}
	fmt.Printf("查詢到的用戶: %+v\n", foundUser)

	// 更新用戶
	user.Name = "Alice Updated"
	user.Email = "alice.updated@example.com"
	if err := userRepo.Update(ctx, user); err != nil {
		log.Printf("更新失敗: %v", err)
		return
	}

	// 查詢活躍用戶
	var activeUsers []User
	if err := userRepo.Query().
		Where(cassandra.Eq("status", "active")).
		OrderBy("created_at", cassandra.DESC).
		Limit(10).
		Scan(ctx, &activeUsers); err != nil {
		log.Printf("查詢失敗: %v", err)
		return
	}
	fmt.Printf("活躍用戶數: %d\n", len(activeUsers))

	// 使用分散式鎖
	if err := userRepo.TryLock(ctx, user, cassandra.WithLockTTL(30*time.Second)); err != nil {
		if cassandra.IsLockFailed(err) {
			log.Println("獲取鎖失敗")
		} else {
			log.Printf("鎖操作失敗: %v", err)
		}
		return
	}
	defer userRepo.UnLock(ctx, user)

	// 執行需要鎖定的操作
	fmt.Println("執行需要鎖定的操作...")

	// 刪除用戶
	if err := userRepo.Delete(ctx, user.ID); err != nil {
		log.Printf("刪除失敗: %v", err)
		return
	}

	fmt.Println("操作完成")
}

測試

套件包含完整的測試覆蓋,包括:

  • 單元測試table-driven tests
  • 集成測試(使用 testcontainers

運行測試:

go test ./pkg/library/cassandra/...

查看測試覆蓋率:

go test ./pkg/library/cassandra/... -cover

授權

本專案遵循專案的主要授權協議。