|
|
||
|---|---|---|
| .. | ||
| README.md | ||
| const.go | ||
| db.go | ||
| db_test.go | ||
| errors.go | ||
| errors_test.go | ||
| lock.go | ||
| lock_test.go | ||
| metadata.go | ||
| metadata_test.go | ||
| option.go | ||
| option_test.go | ||
| query.go | ||
| query_test.go | ||
| repository.go | ||
| repository_test.go | ||
| sai.go | ||
| sai_test.go | ||
| testhelper.go | ||
| types.go | ||
| types_test.go | ||
README.md
Cassandra Client Library
一個基於 Go Generics 的 Cassandra 客戶端庫,提供類型安全的 Repository 模式和流暢的查詢構建器 API。
功能特色
- 類型安全: 使用 Go Generics 提供編譯時類型檢查
- Repository 模式: 簡潔的 CRUD 操作介面
- 流暢查詢: 鏈式查詢構建器,支援條件、排序、限制
- 分散式鎖: 基於 Cassandra 的 IF NOT EXISTS 實現分散式鎖
- 批次操作: 支援批次插入、更新、刪除
- SAI 索引支援: 完整的 SAI (Storage-Attached Indexing) 索引管理功能
- Option 模式: 靈活的配置選項
- 錯誤處理: 統一的錯誤處理機制
- 高效能: 內建連接池、重試機制、Prepared Statement 快取
安裝
go get github.com/scylladb/gocqlx/v2
go get github.com/gocql/gocql
快速開始
1. 定義資料模型
package main
import (
"time"
"github.com/gocql/gocql"
"backend/pkg/library/cassandra"
)
// User 定義用戶資料模型
type User struct {
ID gocql.UUID `db:"id" partition_key:"true"`
Name string `db:"name"`
Email string `db:"email"`
Age int `db:"age"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
}
// TableName 實現 Table 介面
func (u User) TableName() string {
return "users"
}
2. 初始化資料庫連接
package main
import (
"context"
"fmt"
"log"
"backend/pkg/library/cassandra"
"github.com/gocql/gocql"
)
func main() {
// 創建資料庫連接
db, err := cassandra.New(
cassandra.WithHosts("127.0.0.1"),
cassandra.WithPort(9042),
cassandra.WithKeyspace("my_keyspace"),
cassandra.WithAuth("username", "password"),
cassandra.WithConsistency(gocql.Quorum),
)
if err != nil {
log.Fatal(err)
}
defer db.Close()
// 創建 Repository
userRepo, err := cassandra.NewRepository[User](db, "my_keyspace")
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
// 使用 Repository...
_ = userRepo
}
詳細範例
CRUD 操作
插入資料
// 插入單筆資料
user := User{
ID: gocql.TimeUUID(),
Name: "Alice",
Email: "alice@example.com",
Age: 30,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
err := userRepo.Insert(ctx, user)
if err != nil {
log.Printf("插入失敗: %v", err)
}
// 批次插入
users := []User{
{ID: gocql.TimeUUID(), Name: "Bob", Email: "bob@example.com"},
{ID: gocql.TimeUUID(), Name: "Charlie", Email: "charlie@example.com"},
}
err = userRepo.InsertMany(ctx, users)
if err != nil {
log.Printf("批次插入失敗: %v", err)
}
查詢資料
// 根據主鍵查詢
userID := gocql.TimeUUID()
user, err := userRepo.Get(ctx, userID)
if err != nil {
if cassandra.IsNotFound(err) {
log.Println("用戶不存在")
} else {
log.Printf("查詢失敗: %v", err)
}
return
}
fmt.Printf("用戶: %+v\n", user)
更新資料
// 更新資料(只更新非零值欄位)
user.Name = "Alice Updated"
user.Email = "alice.updated@example.com"
err = userRepo.Update(ctx, user)
if err != nil {
log.Printf("更新失敗: %v", err)
}
// 更新所有欄位(包括零值)
user.Age = 0 // 零值也會被更新
err = userRepo.UpdateAll(ctx, user)
if err != nil {
log.Printf("更新失敗: %v", err)
}
刪除資料
// 刪除資料
err = userRepo.Delete(ctx, userID)
if err != nil {
log.Printf("刪除失敗: %v", err)
}
查詢構建器
基本查詢
// 查詢所有符合條件的記錄
var users []User
err := userRepo.Query().
Where(cassandra.Eq("age", 30)).
OrderBy("created_at", cassandra.DESC).
Limit(10).
Scan(ctx, &users)
if err != nil {
log.Printf("查詢失敗: %v", err)
}
// 查詢單筆記錄
user, err := userRepo.Query().
Where(cassandra.Eq("email", "alice@example.com")).
One(ctx)
if err != nil {
if cassandra.IsNotFound(err) {
log.Println("用戶不存在")
} else {
log.Printf("查詢失敗: %v", err)
}
}
條件查詢
// 等於條件
userRepo.Query().Where(cassandra.Eq("name", "Alice"))
// IN 條件
userRepo.Query().Where(cassandra.In("id", []any{id1, id2, id3}))
// 大於條件
userRepo.Query().Where(cassandra.Gt("age", 18))
// 小於條件
userRepo.Query().Where(cassandra.Lt("age", 65))
// 組合多個條件
userRepo.Query().
Where(cassandra.Eq("status", "active")).
Where(cassandra.Gt("age", 18))
排序和限制
// 按建立時間降序排列,限制 20 筆
var users []User
err := userRepo.Query().
OrderBy("created_at", cassandra.DESC).
Limit(20).
Scan(ctx, &users)
// 多欄位排序
err = userRepo.Query().
OrderBy("status", cassandra.ASC).
OrderBy("created_at", cassandra.DESC).
Scan(ctx, &users)
選擇特定欄位
// 只查詢特定欄位
var users []User
err := userRepo.Query().
Select("id", "name", "email").
Where(cassandra.Eq("status", "active")).
Scan(ctx, &users)
計數查詢
// 計算符合條件的記錄數
count, err := userRepo.Query().
Where(cassandra.Eq("status", "active")).
Count(ctx)
if err != nil {
log.Printf("計數失敗: %v", err)
} else {
fmt.Printf("活躍用戶數: %d\n", count)
}
分散式鎖
// 獲取鎖(預設 30 秒 TTL)
lockUser := User{ID: userID}
err := userRepo.TryLock(ctx, lockUser)
if err != nil {
if cassandra.IsLockFailed(err) {
log.Println("獲取鎖失敗,資源已被鎖定")
} else {
log.Printf("鎖操作失敗: %v", err)
}
return
}
// 執行需要鎖定的操作
defer func() {
// 釋放鎖
if err := userRepo.UnLock(ctx, lockUser); err != nil {
log.Printf("釋放鎖失敗: %v", err)
}
}()
// 執行業務邏輯...
自訂鎖 TTL
// 設定鎖的 TTL 為 60 秒
err := userRepo.TryLock(ctx, lockUser, cassandra.WithLockTTL(60*time.Second))
// 永不自動解鎖
err := userRepo.TryLock(ctx, lockUser, cassandra.WithNoLockExpire())
複雜主鍵
複合主鍵(Partition Key + Clustering Key)
// 定義複合主鍵模型
type Order struct {
UserID gocql.UUID `db:"user_id" partition_key:"true"`
OrderID gocql.UUID `db:"order_id" clustering_key:"true"`
ProductID string `db:"product_id"`
Quantity int `db:"quantity"`
Price float64 `db:"price"`
CreatedAt time.Time `db:"created_at"`
}
func (o Order) TableName() string {
return "orders"
}
// 查詢時需要提供完整的主鍵
order, err := orderRepo.Get(ctx, Order{
UserID: userID,
OrderID: orderID,
})
多欄位 Partition Key
type Message struct {
ChatID gocql.UUID `db:"chat_id" partition_key:"true"`
MessageID gocql.UUID `db:"message_id" clustering_key:"true"`
UserID gocql.UUID `db:"user_id" partition_key:"true"`
Content string `db:"content"`
CreatedAt time.Time `db:"created_at"`
}
func (m Message) TableName() string {
return "messages"
}
// 查詢時需要提供所有 Partition Key
message, err := messageRepo.Get(ctx, Message{
ChatID: chatID,
UserID: userID,
MessageID: messageID,
})
配置選項
連接選項
db, err := cassandra.New(
// 主機列表
cassandra.WithHosts("127.0.0.1", "127.0.0.2", "127.0.0.3"),
// 連接埠
cassandra.WithPort(9042),
// Keyspace
cassandra.WithKeyspace("my_keyspace"),
// 認證
cassandra.WithAuth("username", "password"),
// 一致性級別
cassandra.WithConsistency(gocql.Quorum),
// 連接超時
cassandra.WithConnectTimeout(10 * time.Second),
// 每個節點的連接數
cassandra.WithNumConns(10),
// 重試次數
cassandra.WithMaxRetries(3),
// 重試間隔
cassandra.WithRetryInterval(100*time.Millisecond, 1*time.Second),
// 重連間隔
cassandra.WithReconnectInterval(1*time.Second, 10*time.Second),
// CQL 版本
cassandra.WithCQLVersion("3.0.0"),
)
錯誤處理
錯誤類型
// 檢查是否為特定錯誤
if cassandra.IsNotFound(err) {
// 記錄不存在
}
if cassandra.IsConflict(err) {
// 衝突錯誤(如唯一鍵衝突)
}
if cassandra.IsLockFailed(err) {
// 獲取鎖失敗
}
// 使用 errors.As 獲取詳細錯誤資訊
var cassandraErr *cassandra.Error
if errors.As(err, &cassandraErr) {
fmt.Printf("錯誤代碼: %s\n", cassandraErr.Code)
fmt.Printf("錯誤訊息: %s\n", cassandraErr.Message)
fmt.Printf("資料表: %s\n", cassandraErr.Table)
}
錯誤代碼
NOT_FOUND: 記錄未找到CONFLICT: 衝突(如唯一鍵衝突、鎖獲取失敗)INVALID_INPUT: 輸入參數無效MISSING_PARTITION_KEY: 缺少 Partition KeyNO_FIELDS_TO_UPDATE: 沒有欄位需要更新MISSING_TABLE_NAME: 缺少 TableName 方法MISSING_WHERE_CONDITION: 缺少 WHERE 條件
最佳實踐
1. 使用 Context
// 所有操作都應該傳入 context,以便支援超時和取消
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
user, err := userRepo.Get(ctx, userID)
2. 錯誤處理
user, err := userRepo.Get(ctx, userID)
if err != nil {
if cassandra.IsNotFound(err) {
// 處理不存在的情況
return nil, ErrUserNotFound
}
// 處理其他錯誤
return nil, fmt.Errorf("查詢用戶失敗: %w", err)
}
3. 批次操作
// 對於大量資料,使用批次插入
const batchSize = 100
for i := 0; i < len(users); i += batchSize {
end := i + batchSize
if end > len(users) {
end = len(users)
}
err := userRepo.InsertMany(ctx, users[i:end])
if err != nil {
log.Printf("批次插入失敗 (索引 %d-%d): %v", i, end, err)
}
}
4. 使用分散式鎖
// 在需要保證原子性的操作中使用鎖
err := userRepo.TryLock(ctx, lockUser, cassandra.WithLockTTL(30*time.Second))
if err != nil {
return fmt.Errorf("獲取鎖失敗: %w", err)
}
defer userRepo.UnLock(ctx, lockUser)
// 執行需要原子性的操作
5. 查詢優化
// 只選擇需要的欄位
var users []User
err := userRepo.Query().
Select("id", "name", "email"). // 只選擇需要的欄位
Where(cassandra.Eq("status", "active")).
Scan(ctx, &users)
// 使用適當的限制
err = userRepo.Query().
Where(cassandra.Eq("status", "active")).
Limit(100). // 限制結果數量
Scan(ctx, &users)
注意事項
1. 主鍵要求
Get和Delete操作必須提供完整的主鍵(所有 Partition Key 和 Clustering Key)- 單一主鍵值只適用於單一 Partition Key 且無 Clustering Key 的情況
2. 更新操作
Update只更新非零值欄位UpdateAll更新所有欄位(包括零值)- 更新操作必須包含主鍵欄位
3. 查詢限制
- Cassandra 的查詢必須包含所有 Partition Key
- 排序只能按 Clustering Key 進行
- 不支援 JOIN 操作
4. 分散式鎖
- 鎖使用 IF NOT EXISTS 實現,預設 30 秒 TTL
- 獲取鎖失敗時會返回
CONFLICT錯誤 - 釋放鎖時會自動重試,最多 3 次
5. 批次操作
- 批次操作有大小限制(建議不超過 1000 筆)
- 批次操作中的所有操作必須屬於同一個 Partition Key
6. SAI 索引
- SAI 索引需要 Cassandra 4.0.9+ 版本(建議 5.0+)
- 建立索引前請先檢查
db.SaiSupported() - 索引建立是異步操作,可能需要一些時間
- 刪除索引時使用
IF EXISTS,避免索引不存在時報錯
完整範例
package main
import (
"context"
"fmt"
"log"
"time"
"backend/pkg/library/cassandra"
"github.com/gocql/gocql"
)
type User struct {
ID gocql.UUID `db:"id" partition_key:"true"`
Name string `db:"name"`
Email string `db:"email"`
Age int `db:"age"`
Status string `db:"status"`
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
}
func (u User) TableName() string {
return "users"
}
func main() {
// 初始化資料庫連接
db, err := cassandra.New(
cassandra.WithHosts("127.0.0.1"),
cassandra.WithPort(9042),
cassandra.WithKeyspace("my_keyspace"),
)
if err != nil {
log.Fatal(err)
}
defer db.Close()
// 創建 Repository
userRepo, err := cassandra.NewRepository[User](db, "my_keyspace")
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
// 插入用戶
user := User{
ID: gocql.TimeUUID(),
Name: "Alice",
Email: "alice@example.com",
Age: 30,
Status: "active",
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
if err := userRepo.Insert(ctx, user); err != nil {
log.Printf("插入失敗: %v", err)
return
}
// 查詢用戶
foundUser, err := userRepo.Get(ctx, user.ID)
if err != nil {
log.Printf("查詢失敗: %v", err)
return
}
fmt.Printf("查詢到的用戶: %+v\n", foundUser)
// 更新用戶
user.Name = "Alice Updated"
user.Email = "alice.updated@example.com"
if err := userRepo.Update(ctx, user); err != nil {
log.Printf("更新失敗: %v", err)
return
}
// 查詢活躍用戶
var activeUsers []User
if err := userRepo.Query().
Where(cassandra.Eq("status", "active")).
OrderBy("created_at", cassandra.DESC).
Limit(10).
Scan(ctx, &activeUsers); err != nil {
log.Printf("查詢失敗: %v", err)
return
}
fmt.Printf("活躍用戶數: %d\n", len(activeUsers))
// 使用分散式鎖
if err := userRepo.TryLock(ctx, user, cassandra.WithLockTTL(30*time.Second)); err != nil {
if cassandra.IsLockFailed(err) {
log.Println("獲取鎖失敗")
} else {
log.Printf("鎖操作失敗: %v", err)
}
return
}
defer userRepo.UnLock(ctx, user)
// 執行需要鎖定的操作
fmt.Println("執行需要鎖定的操作...")
// 刪除用戶
if err := userRepo.Delete(ctx, user.ID); err != nil {
log.Printf("刪除失敗: %v", err)
return
}
fmt.Println("操作完成")
}
測試
套件包含完整的測試覆蓋,包括:
- 單元測試(table-driven tests)
- 集成測試(使用 testcontainers)
運行測試:
go test ./pkg/library/cassandra/...
查看測試覆蓋率:
go test ./pkg/library/cassandra/... -cover
授權
本專案遵循專案的主要授權協議。