290 lines
8.1 KiB
Go
290 lines
8.1 KiB
Go
package cassandra
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"strings"
|
||
|
||
"github.com/gocql/gocql"
|
||
)
|
||
|
||
// SAIIndexType names a kind of SAI (Storage-Attached Index).
type SAIIndexType string

// Supported SAI index kinds.
const (
	// SAIIndexTypeStandard is the standard index, used for equality queries.
	SAIIndexTypeStandard = SAIIndexType("STANDARD")
	// SAIIndexTypeCollection is the collection index, used for list, set and map columns.
	SAIIndexTypeCollection = SAIIndexType("COLLECTION")
	// SAIIndexTypeFullText is the full-text index.
	SAIIndexTypeFullText = SAIIndexType("FULL_TEXT")
)
|
||
|
||
// SAIIndexOptions 定義 SAI 索引選項
|
||
type SAIIndexOptions struct {
|
||
IndexType SAIIndexType // 索引類型
|
||
IsAsync bool // 是否異步建立索引
|
||
CaseSensitive bool // 是否區分大小寫(用於全文索引)
|
||
}
|
||
|
||
// DefaultSAIIndexOptions 返回預設的 SAI 索引選項
|
||
func DefaultSAIIndexOptions() *SAIIndexOptions {
|
||
return &SAIIndexOptions{
|
||
IndexType: SAIIndexTypeStandard,
|
||
IsAsync: false,
|
||
CaseSensitive: true,
|
||
}
|
||
}
|
||
|
||
// CreateSAIIndex 建立 SAI 索引
|
||
// keyspace: keyspace 名稱
|
||
// table: 資料表名稱
|
||
// column: 欄位名稱
|
||
// indexName: 索引名稱(可選,如果為空則自動生成)
|
||
// opts: 索引選項(可選,如果為 nil 則使用預設選項)
|
||
func (db *DB) CreateSAIIndex(ctx context.Context, keyspace, table, column, indexName string, opts *SAIIndexOptions) error {
|
||
// 檢查是否支援 SAI
|
||
if !db.saiSupported {
|
||
return ErrInvalidInput.WithError(fmt.Errorf("SAI is not supported in Cassandra version %s (requires 4.0.9+ or 5.0+)", db.version))
|
||
}
|
||
|
||
// 驗證參數
|
||
if keyspace == "" {
|
||
return ErrInvalidInput.WithError(fmt.Errorf("keyspace is required"))
|
||
}
|
||
if table == "" {
|
||
return ErrInvalidInput.WithError(fmt.Errorf("table is required"))
|
||
}
|
||
if column == "" {
|
||
return ErrInvalidInput.WithError(fmt.Errorf("column is required"))
|
||
}
|
||
|
||
// 使用預設選項如果未提供
|
||
if opts == nil {
|
||
opts = DefaultSAIIndexOptions()
|
||
}
|
||
|
||
// 生成索引名稱如果未提供
|
||
if indexName == "" {
|
||
indexName = fmt.Sprintf("%s_%s_sai_idx", table, column)
|
||
}
|
||
|
||
// 構建 CREATE INDEX 語句
|
||
var stmt strings.Builder
|
||
stmt.WriteString("CREATE CUSTOM INDEX IF NOT EXISTS ")
|
||
stmt.WriteString(indexName)
|
||
stmt.WriteString(" ON ")
|
||
stmt.WriteString(keyspace)
|
||
stmt.WriteString(".")
|
||
stmt.WriteString(table)
|
||
stmt.WriteString(" (")
|
||
stmt.WriteString(column)
|
||
stmt.WriteString(") USING 'StorageAttachedIndex'")
|
||
|
||
// 添加選項
|
||
var options []string
|
||
if opts.IsAsync {
|
||
options = append(options, "'async'='true'")
|
||
}
|
||
|
||
// 根據索引類型添加特定選項
|
||
switch opts.IndexType {
|
||
case SAIIndexTypeFullText:
|
||
if !opts.CaseSensitive {
|
||
options = append(options, "'case_sensitive'='false'")
|
||
} else {
|
||
options = append(options, "'case_sensitive'='true'")
|
||
}
|
||
case SAIIndexTypeCollection:
|
||
// Collection 索引不需要額外選項
|
||
}
|
||
|
||
// 如果有選項,添加到語句中
|
||
if len(options) > 0 {
|
||
stmt.WriteString(" WITH OPTIONS = {")
|
||
stmt.WriteString(strings.Join(options, ", "))
|
||
stmt.WriteString("}")
|
||
}
|
||
|
||
// 執行建立索引語句
|
||
query := db.session.Query(stmt.String(), nil).
|
||
WithContext(ctx).
|
||
Consistency(gocql.Quorum)
|
||
|
||
err := query.ExecRelease()
|
||
if err != nil {
|
||
return ErrInvalidInput.WithError(fmt.Errorf("failed to create SAI index: %w", err))
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// DropSAIIndex 刪除 SAI 索引
|
||
// keyspace: keyspace 名稱
|
||
// indexName: 索引名稱
|
||
func (db *DB) DropSAIIndex(ctx context.Context, keyspace, indexName string) error {
|
||
// 驗證參數
|
||
if keyspace == "" {
|
||
return ErrInvalidInput.WithError(fmt.Errorf("keyspace is required"))
|
||
}
|
||
if indexName == "" {
|
||
return ErrInvalidInput.WithError(fmt.Errorf("index name is required"))
|
||
}
|
||
|
||
// 構建 DROP INDEX 語句
|
||
stmt := fmt.Sprintf("DROP INDEX IF EXISTS %s.%s", keyspace, indexName)
|
||
|
||
// 執行刪除索引語句
|
||
query := db.session.Query(stmt, nil).
|
||
WithContext(ctx).
|
||
Consistency(gocql.Quorum)
|
||
|
||
err := query.ExecRelease()
|
||
if err != nil {
|
||
return ErrInvalidInput.WithError(fmt.Errorf("failed to drop SAI index: %w", err))
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// ListSAIIndexes 列出指定資料表的所有 SAI 索引
|
||
// keyspace: keyspace 名稱
|
||
// table: 資料表名稱
|
||
func (db *DB) ListSAIIndexes(ctx context.Context, keyspace, table string) ([]SAIIndexInfo, error) {
|
||
// 驗證參數
|
||
if keyspace == "" {
|
||
return nil, ErrInvalidInput.WithError(fmt.Errorf("keyspace is required"))
|
||
}
|
||
if table == "" {
|
||
return nil, ErrInvalidInput.WithError(fmt.Errorf("table is required"))
|
||
}
|
||
|
||
// 查詢系統表獲取索引資訊
|
||
// system_schema.indexes 表的結構:keyspace_name, table_name, index_name, kind, options
|
||
stmt := `
|
||
SELECT index_name, kind, options
|
||
FROM system_schema.indexes
|
||
WHERE keyspace_name = ? AND table_name = ?
|
||
`
|
||
|
||
var indexes []SAIIndexInfo
|
||
iter := db.session.Query(stmt, []string{"keyspace_name", "table_name"}).
|
||
WithContext(ctx).
|
||
Consistency(gocql.One).
|
||
Bind(keyspace, table).
|
||
Iter()
|
||
|
||
var indexName, kind string
|
||
var options map[string]string
|
||
for iter.Scan(&indexName, &kind, &options) {
|
||
// 檢查是否為 SAI 索引(kind = 'CUSTOM' 且 class_name 包含 StorageAttachedIndex)
|
||
if kind == "CUSTOM" {
|
||
if className, ok := options["class_name"]; ok && strings.Contains(className, "StorageAttachedIndex") {
|
||
// 從 options 中提取 target(欄位名稱)
|
||
columnName := ""
|
||
if target, ok := options["target"]; ok {
|
||
columnName = strings.Trim(target, "()\"'")
|
||
}
|
||
indexes = append(indexes, SAIIndexInfo{
|
||
Name: indexName,
|
||
Type: "StorageAttachedIndex",
|
||
Options: options,
|
||
Column: columnName,
|
||
})
|
||
}
|
||
}
|
||
}
|
||
|
||
if err := iter.Close(); err != nil {
|
||
return nil, ErrInvalidInput.WithError(fmt.Errorf("failed to list SAI indexes: %w", err))
|
||
}
|
||
|
||
return indexes, nil
|
||
}
|
||
|
||
// SAIIndexInfo describes one SAI index.
type SAIIndexInfo struct {
	// Name is the index name.
	Name string
	// Type is the index type.
	Type string
	// Options holds the index options.
	Options map[string]string
	// Column is the name of the indexed column.
	Column string
}
|
||
|
||
// CheckSAIIndexExists 檢查 SAI 索引是否存在
|
||
// keyspace: keyspace 名稱
|
||
// indexName: 索引名稱
|
||
func (db *DB) CheckSAIIndexExists(ctx context.Context, keyspace, indexName string) (bool, error) {
|
||
// 驗證參數
|
||
if keyspace == "" {
|
||
return false, ErrInvalidInput.WithError(fmt.Errorf("keyspace is required"))
|
||
}
|
||
if indexName == "" {
|
||
return false, ErrInvalidInput.WithError(fmt.Errorf("index name is required"))
|
||
}
|
||
|
||
// 查詢系統表檢查索引是否存在
|
||
stmt := `
|
||
SELECT index_name, kind, options
|
||
FROM system_schema.indexes
|
||
WHERE keyspace_name = ? AND index_name = ?
|
||
LIMIT 1
|
||
`
|
||
|
||
var foundIndexName, kind string
|
||
var options map[string]string
|
||
err := db.session.Query(stmt, []string{"keyspace_name", "index_name"}).
|
||
WithContext(ctx).
|
||
Consistency(gocql.One).
|
||
Bind(keyspace, indexName).
|
||
Scan(&foundIndexName, &kind, &options)
|
||
|
||
if err == gocql.ErrNotFound {
|
||
return false, nil
|
||
}
|
||
if err != nil {
|
||
return false, ErrInvalidInput.WithError(fmt.Errorf("failed to check SAI index existence: %w", err))
|
||
}
|
||
|
||
// 檢查是否為 SAI 索引
|
||
if kind == "CUSTOM" {
|
||
if className, ok := options["class_name"]; ok && strings.Contains(className, "StorageAttachedIndex") {
|
||
return true, nil
|
||
}
|
||
}
|
||
|
||
return false, nil
|
||
}
|
||
|
||
// WaitForSAIIndex 等待 SAI 索引建立完成(用於異步建立)
|
||
// keyspace: keyspace 名稱
|
||
// indexName: 索引名稱
|
||
// maxWaitTime: 最大等待時間(秒)
|
||
func (db *DB) WaitForSAIIndex(ctx context.Context, keyspace, indexName string, maxWaitTime int) error {
|
||
// 驗證參數
|
||
if keyspace == "" {
|
||
return ErrInvalidInput.WithError(fmt.Errorf("keyspace is required"))
|
||
}
|
||
if indexName == "" {
|
||
return ErrInvalidInput.WithError(fmt.Errorf("index name is required"))
|
||
}
|
||
|
||
// 查詢索引狀態
|
||
// 注意:Cassandra 沒有直接的索引狀態查詢,這裡需要通過檢查索引是否可用來判斷
|
||
// 實際實作可能需要根據具體的 Cassandra 版本調整
|
||
|
||
// 簡單實作:檢查索引是否存在
|
||
exists, err := db.CheckSAIIndexExists(ctx, keyspace, indexName)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if !exists {
|
||
return ErrInvalidInput.WithError(fmt.Errorf("index %s does not exist", indexName))
|
||
}
|
||
|
||
// 注意:實際的等待邏輯可能需要查詢系統表或使用其他方法
|
||
// 這裡只是基本框架,實際使用時可能需要根據具體需求調整
|
||
|
||
return nil
|
||
}
|