backend/pkg/library/cassandra/sai.go

290 lines
8.1 KiB
Go
Raw Permalink Normal View History

2025-11-19 05:33:06 +00:00
package cassandra
import (
"context"
"fmt"
"strings"
"github.com/gocql/gocql"
)
// SAIIndexType identifies the kind of SAI (Storage-Attached Index) to create.
type SAIIndexType string

// Supported SAI index kinds.
const (
	// SAIIndexTypeStandard is a standard index (equality queries).
	SAIIndexTypeStandard SAIIndexType = "STANDARD"
	// SAIIndexTypeCollection is a collection index (for list, set and map columns).
	SAIIndexTypeCollection SAIIndexType = "COLLECTION"
	// SAIIndexTypeFullText is a full-text index.
	SAIIndexTypeFullText SAIIndexType = "FULL_TEXT"
)
// SAIIndexOptions 定義 SAI 索引選項
type SAIIndexOptions struct {
2025-11-19 09:06:44 +00:00
IndexType SAIIndexType // 索引類型
IsAsync bool // 是否異步建立索引
CaseSensitive bool // 是否區分大小寫(用於全文索引)
2025-11-19 05:33:06 +00:00
}
2025-11-19 09:06:44 +00:00
// DefaultSAIIndexOptions 返回預設的 SAI 索引選項
func DefaultSAIIndexOptions() *SAIIndexOptions {
return &SAIIndexOptions{
IndexType: SAIIndexTypeStandard,
IsAsync: false,
CaseSensitive: true,
}
2025-11-19 05:33:06 +00:00
}
// CreateSAIIndex 建立 SAI 索引
2025-11-19 09:06:44 +00:00
// keyspace: keyspace 名稱
// table: 資料表名稱
2025-11-19 05:33:06 +00:00
// column: 欄位名稱
// indexName: 索引名稱(可選,如果為空則自動生成)
2025-11-19 09:06:44 +00:00
// opts: 索引選項(可選,如果為 nil 則使用預設選項)
func (db *DB) CreateSAIIndex(ctx context.Context, keyspace, table, column, indexName string, opts *SAIIndexOptions) error {
// 檢查是否支援 SAI
2025-11-19 05:33:06 +00:00
if !db.saiSupported {
2025-11-19 09:06:44 +00:00
return ErrInvalidInput.WithError(fmt.Errorf("SAI is not supported in Cassandra version %s (requires 4.0.9+ or 5.0+)", db.version))
2025-11-19 05:33:06 +00:00
}
2025-11-19 09:06:44 +00:00
// 驗證參數
2025-11-19 05:33:06 +00:00
if keyspace == "" {
return ErrInvalidInput.WithError(fmt.Errorf("keyspace is required"))
}
if table == "" {
return ErrInvalidInput.WithError(fmt.Errorf("table is required"))
}
if column == "" {
return ErrInvalidInput.WithError(fmt.Errorf("column is required"))
}
2025-11-19 09:06:44 +00:00
// 使用預設選項如果未提供
if opts == nil {
opts = DefaultSAIIndexOptions()
}
// 生成索引名稱如果未提供
2025-11-19 05:33:06 +00:00
if indexName == "" {
2025-11-19 09:06:44 +00:00
indexName = fmt.Sprintf("%s_%s_sai_idx", table, column)
2025-11-19 05:33:06 +00:00
}
// 構建 CREATE INDEX 語句
2025-11-19 09:06:44 +00:00
var stmt strings.Builder
stmt.WriteString("CREATE CUSTOM INDEX IF NOT EXISTS ")
stmt.WriteString(indexName)
stmt.WriteString(" ON ")
stmt.WriteString(keyspace)
stmt.WriteString(".")
stmt.WriteString(table)
stmt.WriteString(" (")
stmt.WriteString(column)
stmt.WriteString(") USING 'StorageAttachedIndex'")
2025-11-19 05:33:06 +00:00
// 添加選項
2025-11-19 09:06:44 +00:00
var options []string
if opts.IsAsync {
options = append(options, "'async'='true'")
}
// 根據索引類型添加特定選項
switch opts.IndexType {
case SAIIndexTypeFullText:
if !opts.CaseSensitive {
options = append(options, "'case_sensitive'='false'")
} else {
options = append(options, "'case_sensitive'='true'")
2025-11-19 05:33:06 +00:00
}
2025-11-19 09:06:44 +00:00
case SAIIndexTypeCollection:
// Collection 索引不需要額外選項
}
// 如果有選項,添加到語句中
if len(options) > 0 {
stmt.WriteString(" WITH OPTIONS = {")
stmt.WriteString(strings.Join(options, ", "))
stmt.WriteString("}")
2025-11-19 05:33:06 +00:00
}
2025-11-19 09:06:44 +00:00
// 執行建立索引語句
query := db.session.Query(stmt.String(), nil).
WithContext(ctx).
Consistency(gocql.Quorum)
err := query.ExecRelease()
if err != nil {
return ErrInvalidInput.WithError(fmt.Errorf("failed to create SAI index: %w", err))
2025-11-19 05:33:06 +00:00
}
return nil
}
// DropSAIIndex 刪除 SAI 索引
2025-11-19 09:06:44 +00:00
// keyspace: keyspace 名稱
2025-11-19 05:33:06 +00:00
// indexName: 索引名稱
func (db *DB) DropSAIIndex(ctx context.Context, keyspace, indexName string) error {
2025-11-19 09:06:44 +00:00
// 驗證參數
2025-11-19 05:33:06 +00:00
if keyspace == "" {
return ErrInvalidInput.WithError(fmt.Errorf("keyspace is required"))
}
if indexName == "" {
return ErrInvalidInput.WithError(fmt.Errorf("index name is required"))
}
// 構建 DROP INDEX 語句
stmt := fmt.Sprintf("DROP INDEX IF EXISTS %s.%s", keyspace, indexName)
2025-11-19 09:06:44 +00:00
// 執行刪除索引語句
query := db.session.Query(stmt, nil).
WithContext(ctx).
Consistency(gocql.Quorum)
err := query.ExecRelease()
if err != nil {
2025-11-19 05:33:06 +00:00
return ErrInvalidInput.WithError(fmt.Errorf("failed to drop SAI index: %w", err))
}
return nil
}
2025-11-19 09:06:44 +00:00
// ListSAIIndexes 列出指定資料表的所有 SAI 索引
// keyspace: keyspace 名稱
// table: 資料表名稱
2025-11-19 05:33:06 +00:00
func (db *DB) ListSAIIndexes(ctx context.Context, keyspace, table string) ([]SAIIndexInfo, error) {
2025-11-19 09:06:44 +00:00
// 驗證參數
2025-11-19 05:33:06 +00:00
if keyspace == "" {
return nil, ErrInvalidInput.WithError(fmt.Errorf("keyspace is required"))
}
2025-11-19 09:06:44 +00:00
if table == "" {
return nil, ErrInvalidInput.WithError(fmt.Errorf("table is required"))
2025-11-19 05:33:06 +00:00
}
2025-11-19 09:06:44 +00:00
// 查詢系統表獲取索引資訊
// system_schema.indexes 表的結構keyspace_name, table_name, index_name, kind, options
stmt := `
SELECT index_name, kind, options
FROM system_schema.indexes
WHERE keyspace_name = ? AND table_name = ?
`
2025-11-19 05:33:06 +00:00
var indexes []SAIIndexInfo
2025-11-19 09:06:44 +00:00
iter := db.session.Query(stmt, []string{"keyspace_name", "table_name"}).
WithContext(ctx).
Consistency(gocql.One).
Bind(keyspace, table).
Iter()
2025-11-19 05:33:06 +00:00
2025-11-19 09:06:44 +00:00
var indexName, kind string
2025-11-19 05:33:06 +00:00
var options map[string]string
2025-11-19 09:06:44 +00:00
for iter.Scan(&indexName, &kind, &options) {
// 檢查是否為 SAI 索引kind = 'CUSTOM' 且 class_name 包含 StorageAttachedIndex
if kind == "CUSTOM" {
if className, ok := options["class_name"]; ok && strings.Contains(className, "StorageAttachedIndex") {
// 從 options 中提取 target欄位名稱
columnName := ""
if target, ok := options["target"]; ok {
columnName = strings.Trim(target, "()\"'")
}
indexes = append(indexes, SAIIndexInfo{
Name: indexName,
Type: "StorageAttachedIndex",
Options: options,
Column: columnName,
})
}
2025-11-19 05:33:06 +00:00
}
}
if err := iter.Close(); err != nil {
return nil, ErrInvalidInput.WithError(fmt.Errorf("failed to list SAI indexes: %w", err))
}
return indexes, nil
}
2025-11-19 09:06:44 +00:00
// SAIIndexInfo describes one SAI index.
type SAIIndexInfo struct {
	// Name is the index name.
	Name string
	// Type is the index type.
	Type string
	// Options holds the index options.
	Options map[string]string
	// Column is the name of the indexed column.
	Column string
}
2025-11-19 05:33:06 +00:00
2025-11-19 09:06:44 +00:00
// CheckSAIIndexExists 檢查 SAI 索引是否存在
// keyspace: keyspace 名稱
// indexName: 索引名稱
func (db *DB) CheckSAIIndexExists(ctx context.Context, keyspace, indexName string) (bool, error) {
// 驗證參數
2025-11-19 05:33:06 +00:00
if keyspace == "" {
2025-11-19 09:06:44 +00:00
return false, ErrInvalidInput.WithError(fmt.Errorf("keyspace is required"))
2025-11-19 05:33:06 +00:00
}
if indexName == "" {
2025-11-19 09:06:44 +00:00
return false, ErrInvalidInput.WithError(fmt.Errorf("index name is required"))
2025-11-19 05:33:06 +00:00
}
2025-11-19 09:06:44 +00:00
// 查詢系統表檢查索引是否存在
stmt := `
SELECT index_name, kind, options
FROM system_schema.indexes
WHERE keyspace_name = ? AND index_name = ?
LIMIT 1
`
2025-11-19 05:33:06 +00:00
2025-11-19 09:06:44 +00:00
var foundIndexName, kind string
2025-11-19 05:33:06 +00:00
var options map[string]string
2025-11-19 09:06:44 +00:00
err := db.session.Query(stmt, []string{"keyspace_name", "index_name"}).
WithContext(ctx).
Consistency(gocql.One).
Bind(keyspace, indexName).
Scan(&foundIndexName, &kind, &options)
if err == gocql.ErrNotFound {
return false, nil
}
2025-11-19 05:33:06 +00:00
if err != nil {
2025-11-19 09:06:44 +00:00
return false, ErrInvalidInput.WithError(fmt.Errorf("failed to check SAI index existence: %w", err))
2025-11-19 05:33:06 +00:00
}
// 檢查是否為 SAI 索引
2025-11-19 09:06:44 +00:00
if kind == "CUSTOM" {
if className, ok := options["class_name"]; ok && strings.Contains(className, "StorageAttachedIndex") {
return true, nil
}
2025-11-19 05:33:06 +00:00
}
2025-11-19 09:06:44 +00:00
return false, nil
}
// WaitForSAIIndex 等待 SAI 索引建立完成(用於異步建立)
// keyspace: keyspace 名稱
// indexName: 索引名稱
// maxWaitTime: 最大等待時間(秒)
func (db *DB) WaitForSAIIndex(ctx context.Context, keyspace, indexName string, maxWaitTime int) error {
// 驗證參數
if keyspace == "" {
return ErrInvalidInput.WithError(fmt.Errorf("keyspace is required"))
}
if indexName == "" {
return ErrInvalidInput.WithError(fmt.Errorf("index name is required"))
2025-11-19 05:33:06 +00:00
}
2025-11-19 09:06:44 +00:00
// 查詢索引狀態
// 注意Cassandra 沒有直接的索引狀態查詢,這裡需要通過檢查索引是否可用來判斷
// 實際實作可能需要根據具體的 Cassandra 版本調整
// 簡單實作:檢查索引是否存在
exists, err := db.CheckSAIIndexExists(ctx, keyspace, indexName)
if err != nil {
return err
}
if !exists {
return ErrInvalidInput.WithError(fmt.Errorf("index %s does not exist", indexName))
}
// 注意:實際的等待邏輯可能需要查詢系統表或使用其他方法
// 這裡只是基本框架,實際使用時可能需要根據具體需求調整
return nil
2025-11-19 05:33:06 +00:00
}