From edaf0b5ac3c8c43dd3cc1d20daa886e6dd55bf26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=A7=E9=A9=8A?= Date: Wed, 6 Aug 2025 07:41:29 +0800 Subject: [PATCH] add cassandra --- etc/blockchain.yaml | 2 +- .../database/202508051001001.kline.up.cql | 16 + internal/config/config.go | 19 + internal/domain/entity/kline.go | 26 +- internal/domain/repository/data_source.go | 3 +- internal/lib/cassandra/batch.go | 111 ++++ internal/lib/cassandra/batch_test.go | 69 +++ internal/lib/cassandra/cassandra.go | 216 +++++++ internal/lib/cassandra/cassandra_test.go | 170 ++++++ internal/lib/cassandra/const.go | 22 + internal/lib/cassandra/crud.go | 206 +++++++ internal/lib/cassandra/crud_test.go | 566 ++++++++++++++++++ internal/lib/cassandra/errors.go | 5 + internal/lib/cassandra/ez_transaction.go | 275 +++++++++ internal/lib/cassandra/ez_transaction_test.go | 291 +++++++++ internal/lib/cassandra/go.mod | 70 +++ internal/lib/cassandra/go.sum | 224 +++++++ .../init_cassandra_container_test.go | 93 +++ internal/lib/cassandra/lock.go | 125 ++++ internal/lib/cassandra/lock_test.go | 1 + internal/lib/cassandra/metadata.go | 68 +++ internal/lib/cassandra/metadata_test.go | 73 +++ internal/lib/cassandra/option.go | 102 ++++ internal/lib/cassandra/option_test.go | 108 ++++ internal/lib/cassandra/table.go | 332 ++++++++++ internal/lib/cassandra/table_test.go | 161 +++++ internal/lib/cassandra/utils.go | 42 ++ internal/lib/cassandra/utils_test.go | 85 +++ internal/repository/data_source_binance.go | 10 + internal/svc/cassandra.go | 45 ++ internal/svc/service_context.go | 6 + 31 files changed, 3527 insertions(+), 15 deletions(-) create mode 100644 generate/database/202508051001001.kline.up.cql create mode 100644 internal/lib/cassandra/batch.go create mode 100644 internal/lib/cassandra/batch_test.go create mode 100644 internal/lib/cassandra/cassandra.go create mode 100644 internal/lib/cassandra/cassandra_test.go create mode 100644 internal/lib/cassandra/const.go create mode 100644 internal/lib/cassandra/crud.go create mode 100644 internal/lib/cassandra/crud_test.go create mode 100644 internal/lib/cassandra/errors.go create mode 100644 internal/lib/cassandra/ez_transaction.go create mode 100644 internal/lib/cassandra/ez_transaction_test.go create mode 100644 internal/lib/cassandra/go.mod create mode 100644 internal/lib/cassandra/go.sum create mode 100644 internal/lib/cassandra/init_cassandra_container_test.go create mode 100644 internal/lib/cassandra/lock.go create mode 100644 internal/lib/cassandra/lock_test.go create mode 100644 internal/lib/cassandra/metadata.go create mode 100644 internal/lib/cassandra/metadata_test.go create mode 100644 internal/lib/cassandra/option.go create mode 100644 internal/lib/cassandra/option_test.go create mode 100644 internal/lib/cassandra/table.go create mode 100644 internal/lib/cassandra/table_test.go create mode 100644 internal/lib/cassandra/utils.go create mode 100644 internal/lib/cassandra/utils_test.go create mode 100644 internal/svc/cassandra.go diff --git a/etc/blockchain.yaml b/etc/blockchain.yaml index 79f7a95..0bb8e20 100644 --- a/etc/blockchain.yaml +++ b/etc/blockchain.yaml @@ -8,7 +8,7 @@ Binance: Key: "" Secret: "" TestMode: true - WorkerSize: 2048 + WorkerSize: 10 RedisCluster: Host: 127.0.0.1:6379 diff --git a/generate/database/202508051001001.kline.up.cql b/generate/database/202508051001001.kline.up.cql new file mode 100644 index 0000000..3d62d2f --- /dev/null +++ b/generate/database/202508051001001.kline.up.cql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS kline ( + symbol TEXT, + interval TEXT, + open_time BIGINT, + open TEXT, + high TEXT, + low TEXT, + close TEXT, + volume TEXT, + close_time BIGINT, + quote_asset_volume TEXT, + number_of_trades INT, + taker_buy_base_asset_volume TEXT, + taker_buy_quote_asset_volume TEXT, + PRIMARY KEY ((symbol, interval), open_time) +) WITH CLUSTERING ORDER BY (open_time ASC); \ No newline at end of file diff --git a/internal/config/config.go b/internal/config/config.go index 170d38e..8d678fe 100755 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,6 +3,7 @@ package config import ( "github.com/zeromicro/go-zero/core/stores/redis" "github.com/zeromicro/go-zero/zrpc" + "time" ) type Config struct { @@ -10,6 +11,7 @@ type Config struct { Binance Binance // Redis Cluster RedisCluster redis.RedisConf + Cassandra Cassandra } type Binance struct { @@ -18,3 +20,20 @@ type Binance struct { TestMode bool WorkerSize int64 } + +// Cassandra 資料庫設定 +type Cassandra struct { + Hosts []string `json:"hosts" env:"CASSANDRA_HOSTS"` // Cassandra 主機列表(逗號分隔) + Port int `json:"port" env:"CASSANDRA_PORT"` // 連線埠 + Keyspace string `json:"keyspace" env:"CASSANDRA_KEYSPACE"` // 使用的 Keyspace 名稱 + UseAuth bool `json:"useAuth" env:"CASSANDRA_USE_AUTH"` // 是否使用帳號密碼 + Username string `json:"username" env:"CASSANDRA_USERNAME"` // 認證用戶名 + Password string `json:"password" env:"CASSANDRA_PASSWORD"` // 認證密碼 + ConnectTimeoutSec int `json:"connectTimeoutSec" env:"CASSANDRA_CONNECT_TIMEOUT"` // 連線逾時(秒) + NumConns int `json:"numConns" env:"CASSANDRA_NUM_CONNS"` // 每節點連線數 + MaxRetries int `json:"maxRetries" env:"CASSANDRA_MAX_RETRIES"` // 重試次數 + RetryMin time.Duration `json:"retryMin" env:"CASSANDRA_RETRY_MIN"` // 最小重試間隔 + RetryMax time.Duration `json:"retryMax" env:"CASSANDRA_RETRY_MAX"` // 最大重試間隔 + ReconnectInitial time.Duration `json:"reconnectInitial" env:"CASSANDRA_RECONNECT_INITIAL"` // 初始重連間隔 + ReconnectMax time.Duration `json:"reconnectMax" env:"CASSANDRA_RECONNECT_MAX"` // 最大重連間隔 +} diff --git a/internal/domain/entity/kline.go b/internal/domain/entity/kline.go index 6e2ee3b..780d6c0 100644 --- a/internal/domain/entity/kline.go +++ b/internal/domain/entity/kline.go @@ -1,19 +1,19 @@ package entity type Kline struct { - OpenTime int64 `csv:"open_time"` // 開盤時間(毫秒) - Open string `csv:"open"` // 開盤價 - High string `csv:"high"` // 最高價 - Low string `csv:"low"` // 最低價 - Close string `csv:"close"` // 收盤價 - Volume string `csv:"volume"` // 成交量 - CloseTime int64 `csv:"close_time"` // 收盤時間(毫秒) - QuoteAssetVolume string `csv:"quote_asset_volume"` // 成交額(以報價資產計) - NumberOfTrades int `csv:"number_of_trades"` // 交易筆數 - TakerBuyBaseAssetVolume string `csv:"taker_buy_base_asset_volume"` // 主動買入成交量 - TakerBuyQuoteAssetVolume string `csv:"taker_buy_quote_asset_volume"` // 主動買入成交額 - Symbol string // 交易對 - Interval string // 12h,15m,1d,1h,1m,1s,2h,30m,3m,4h,5m,6h,8h + OpenTime int64 `csv:"open_time" db:"open_time" cql:"open_time" clustering_key:"true"` // 開盤時間(毫秒),clustering key,用於時序查詢 + Open string `csv:"open" db:"open" cql:"open"` // 開盤價 + High string `csv:"high" db:"high" cql:"high"` // 最高價 + Low string `csv:"low" db:"low" cql:"low"` // 最低價 + Close string `csv:"close" db:"close" cql:"close"` // 收盤價 + Volume string `csv:"volume" db:"volume" cql:"volume"` // 成交量 + CloseTime int64 `csv:"close_time" db:"close_time" cql:"close_time"` // 收盤時間(毫秒) + QuoteAssetVolume string `csv:"quote_asset_volume" db:"quote_asset_volume" cql:"quote_asset_volume"` // 成交額(以報價資產計) + NumberOfTrades int `csv:"number_of_trades" db:"number_of_trades" cql:"number_of_trades"` // 交易筆數 + TakerBuyBaseAssetVolume string `csv:"taker_buy_base_asset_volume" db:"taker_buy_base_asset_volume" cql:"taker_buy_base_asset_volume"` // 主動買入成交量 + TakerBuyQuoteAssetVolume string `csv:"taker_buy_quote_asset_volume" db:"taker_buy_quote_asset_volume" cql:"taker_buy_quote_asset_volume"` // 主動買入成交額 + Symbol string `db:"symbol" partition_key:"true" cql:"symbol"` // 交易對,partition key + Interval string `db:"interval" partition_key:"true" cql:"interval"` // K 線時間區間,partition key // 12h,15m,1d,1h,1m,1s,2h,30m,3m,4h,5m,6h,8h } func (s *Kline) TableName() string { diff --git a/internal/domain/repository/data_source.go b/internal/domain/repository/data_source.go index 6a85ee5..55a990a 100644 --- a/internal/domain/repository/data_source.go +++ b/internal/domain/repository/data_source.go @@ -11,8 +11,9 @@ type DataSourceRepository interface { } type KlineDownloader interface { - // FetchHistoryKline 抓歷史K線資料,startMillis=0 表示從最早,endMillis=0 表示到最新 + // FetchHistoryKline 抓歷史 K 線資料 FetchHistoryKline(ctx context.Context, param QueryKline) ([]*entity.Kline, error) + SaveHistoryKline(ctx context.Context, data []*entity.Kline) error } type QueryKline struct { diff --git a/internal/lib/cassandra/batch.go b/internal/lib/cassandra/batch.go new file mode 100644 index 0000000..afdc6ef --- /dev/null +++ b/internal/lib/cassandra/batch.go @@ -0,0 +1,111 @@ +package cassandra + +import ( + "context" + "fmt" + "reflect" + + "github.com/gocql/gocql" + "github.com/scylladb/gocqlx/v3" + "github.com/scylladb/gocqlx/v3/qb" + "github.com/scylladb/gocqlx/v3/table" +) + +// TODO: 只保證同一個 PK 下有一致性,中間有失敗的話可能只有失敗不會寫入,其他成功的還是會成功。 +// 之後會朝兩個方向走 +// 1. 最終一致性:目前的設計是直接寫入副表,然後透過 background worker 讀取 sync_task 表,補寫副表資料。 +// 2. 研究 自己做 TX_ID 以及 STATUS 的方案 +// 這個是已知問題,一定要解決 + +func (db *DB) NewBatch(ctx context.Context, keyspace string) *Batch { + session := db.GetSession() + return &Batch{ + ctx: ctx, + keyspace: keyspace, + db: db, + batch: gocqlx.Batch{ + Batch: session.NewBatch(gocql.LoggedBatch).WithContext(ctx), + }, + } +} + +type Batch struct { + ctx context.Context + keyspace string + db *DB + batch gocqlx.Batch +} + +func (tx *Batch) Insert(doc any) error { + metadata, err := GenerateTableMetadata(doc, tx.keyspace) + if err != nil { + return err + } + tbl := table.New(metadata) + stmt, names := tbl.Insert() + return tx.batch.BindStruct(tx.db.GetSession().Query(stmt, names), doc) +} + +func (tx *Batch) Delete(doc any) error { + metadata, err := GenerateTableMetadata(doc, tx.keyspace) + if err != nil { + return err + } + tbl := table.New(metadata) + stmt, names := tbl.Delete() + return tx.batch.BindStruct(tx.db.GetSession().Query(stmt, names), doc) +} + +func (tx *Batch) Update(doc any) error { + metadata, err := GenerateTableMetadata(doc, tx.keyspace) + if err != nil { + return err + } + v := reflect.ValueOf(doc) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + typ := v.Type() + + setCols := make([]string, 0) + setVals := make([]any, 0) + whereCols := make([]string, 0) + whereVals := make([]any, 0) + + for i := 0; i < typ.NumField(); i++ { + field := typ.Field(i) + tag := field.Tag.Get("db") + if tag == "" || tag == "-" { + continue + } + val := v.Field(i) + if !val.IsValid() { + continue + } + if contains(metadata.PartKey, tag) || contains(metadata.SortKey, tag) { + whereCols = append(whereCols, tag) + whereVals = append(whereVals, val.Interface()) + } else if !isZero(val) { + setCols = append(setCols, tag) + setVals = append(setVals, val.Interface()) + } + } + + if len(setCols) == 0 { + return fmt.Errorf("update: no non-zero fields in %+v", doc) + } + + builder := qb.Update(metadata.Name).Set(setCols...) + for _, col := range whereCols { + builder = builder.Where(qb.Eq(col)) + } + stmt, names := builder.ToCql() + args := append(setVals, whereVals...) + return tx.batch.Bind(tx.db.GetSession().Query(stmt, names), args...) +} + +func (tx *Batch) Commit() error { + session := tx.db.GetSession() + + return session.ExecuteBatch(&tx.batch) +} diff --git a/internal/lib/cassandra/batch_test.go b/internal/lib/cassandra/batch_test.go new file mode 100644 index 0000000..83b3cd8 --- /dev/null +++ b/internal/lib/cassandra/batch_test.go @@ -0,0 +1,69 @@ +package cassandra + +import ( + "testing" + "time" + + "github.com/gocql/gocql" + "github.com/stretchr/testify/assert" +) + +func TestBatchTx_AllSuccess(t *testing.T) { + ctx, cassandraContainer, host, port := setupCassandraContainer(t) + defer cassandraContainer.Terminate(ctx) + + db, err := NewCassandraDB( + []string{host}, + WithPort(port), + WithConsistency(gocql.One), + WithNumConns(2), + ) + assert.NoError(t, err) + + // 建立 keyspace 和 table + err = db.EnsureTable(` +CREATE KEYSPACE IF NOT EXISTS my_keyspace +WITH replication = { + 'class': 'SimpleStrategy', + 'replication_factor': 1 +};`) + assert.NoError(t, err) + + err = db.EnsureTable(` +CREATE TABLE IF NOT EXISTS my_keyspace.monkey_entity ( + id UUID, + name TEXT, + update_at TIMESTAMP, + create_at TIMESTAMP, + PRIMARY KEY ((id), name) +);`) + assert.NoError(t, err) + + now := time.Now() + id1 := gocql.TimeUUID() + id2 := gocql.TimeUUID() + + tx := db.NewBatch(ctx, "my_keyspace") + err = tx.Insert(&MonkeyEntity{ID: id1, Name: "Alice", UpdateAt: now, CreateAt: now}) + assert.NoError(t, err) + err = tx.Insert(&MonkeyEntity{ID: id2, Name: "Bob", UpdateAt: now, CreateAt: now}) + assert.NoError(t, err) + err = tx.Update(&MonkeyEntity{ID: id1, Name: "Alice", UpdateAt: now.Add(5 * time.Minute)}) + assert.NoError(t, err) + err = tx.Delete(&MonkeyEntity{ID: id2, Name: "Bob"}) + assert.NoError(t, err) + + err = tx.Commit() + assert.NoError(t, err) + + // Alice 應該還在,且被更新 + var alice MonkeyEntity + alice.ID, alice.Name = id1, "Alice" + err = db.Get(ctx, &alice, "my_keyspace") + assert.NoError(t, err) + assert.WithinDuration(t, now.Add(5*time.Minute), alice.UpdateAt, time.Second) + + // Bob 應該被刪除 + err = db.Get(ctx, &MonkeyEntity{ID: id2, Name: "Bob"}, "my_keyspace") + assert.Error(t, err) +} diff --git a/internal/lib/cassandra/cassandra.go b/internal/lib/cassandra/cassandra.go new file mode 100644 index 0000000..c1966dc --- /dev/null +++ b/internal/lib/cassandra/cassandra.go @@ -0,0 +1,216 @@ +package cassandra + +import ( + "fmt" + "github.com/gocql/gocql" + "github.com/scylladb/gocqlx/v3" + "github.com/zeromicro/go-zero/core/logx" + "strconv" + "strings" + "time" +) + +// conf 是初始化 CassandraDB 所需的內部設定(私有) +type conf struct { + Hosts []string // Cassandra 主機列表 + Port int // 連線埠 + Keyspace string // 預設使用的 Keyspace + Username string // 認證用戶名 + Password string // 認證密碼 + Consistency gocql.Consistency // 一致性級別 + ConnectTimeoutSec int // 連線逾時秒數 + NumConnect int // 每個節點連線數 + MaxRetries int // 重試次數 + UseAuth bool // 是否使用帳號密碼驗證 + RetryMin time.Duration // 重試間隔最小值 + RetryMax time.Duration // 重試間隔最大值 + ReconnectInitial time.Duration // 重連初始間隔 + ReconnectMax time.Duration // 重連最大間隔 + CQLVersion string // 執行連線的CQL 版本號 +} + +// DB 是封裝了 Cassandra 資料庫 session 的結構 +type DB struct { + session gocqlx.Session + SaiSupported bool // 是否支援 sai + Version string // 資料庫版本 +} + +// NewDB 初始化並建立 Cassandra 資料庫連線,使用預設設定並可透過Option修改 +func NewDB(hosts []string, opts ...Option) (*DB, error) { + config := &conf{ + Hosts: hosts, + Port: defaultPort, + Consistency: defaultConsistency, + ConnectTimeoutSec: defaultTimeoutSec, + NumConnect: defaultNumConnections, + MaxRetries: defaultMaxRetries, + RetryMin: defaultRetryMin, + RetryMax: defaultRetryMax, + ReconnectInitial: defaultReconnectInitial, + ReconnectMax: defaultReconnectMax, + CQLVersion: defaultCqlVersion, + } + + // 套用Option設定選項 + for _, opt := range opts { + opt(config) + } + + // 建立連線設定 + cluster := gocql.NewCluster(config.Hosts...) + cluster.Port = config.Port + cluster.Consistency = config.Consistency + cluster.Timeout = time.Duration(config.ConnectTimeoutSec) * time.Second + cluster.NumConns = config.NumConnect + cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{ + NumRetries: config.MaxRetries, + Min: config.RetryMin, + Max: config.RetryMax, + } + + cluster.ReconnectionPolicy = &gocql.ExponentialReconnectionPolicy{ + MaxRetries: config.MaxRetries, + InitialInterval: config.ReconnectInitial, + MaxInterval: config.ReconnectMax, + } + var session *gocql.Session + var err error + for i := 1; i <= config.MaxRetries; i++ { + if session != nil { + session.Close() + } + session, err = cluster.CreateSession() + if err == nil { + break + } + + waitInterval := cluster.ReconnectionPolicy.GetInterval(i) + logx.Errorf("[CassandraDB] Retry attempt #%d, waiting %s...", i, waitInterval) + time.Sleep(waitInterval) + } + + if session == nil { + panic("failed to connect ....") + } + + // 若有提供 Keyspace 則指定 + if config.Keyspace != "" { + cluster.Keyspace = config.Keyspace + } + + // 若啟用驗證則設定帳號密碼 + if config.UseAuth { + cluster.Authenticator = gocql.PasswordAuthenticator{ + Username: config.Username, + Password: config.Password, + } + } + + logx.Infof("[CassandraDB] try to connect to Cassandra cluster %v, port: %d", config.Hosts, config.Port) + // 建立 Session + s, err := gocqlx.WrapSession(session, nil) + if err != nil { + return nil, fmt.Errorf("failed to connect to Cassandra cluster: %s", err) + } + logx.Infof("[CassandraDB] success init Cassandra cluster") + + db := &DB{ + session: s, + } + version, err := db.getReleaseVersion() + if err != nil { + return nil, fmt.Errorf("failed to get DB version: %s", err) + } + + db.Version = version + db.SaiSupported = isSAISupported(version) + + return db, nil +} + +// NewDBFromSession 用現成的 gocql.Session 封裝 +func NewDBFromSession(session *gocql.Session) (*DB, error) { + s, err := gocqlx.WrapSession(session, nil) + if err != nil { + return nil, fmt.Errorf("failed to wrap gocql session: %w", err) + } + + db := &DB{ + session: s, + } + version, err := db.getReleaseVersion() + if err != nil { + return nil, fmt.Errorf("failed to get DB version: %s", err) + } + + db.Version = version + db.SaiSupported = isSAISupported(version) + + return db, nil +} + +// Close 關閉 Cassandra 資料庫連線 +func (db *DB) Close() { + db.session.Close() +} + +// GetSession 返回目前使用的 Cassandra Session +func (db *DB) GetSession() gocqlx.Session { + return db.session +} + +// EnsureTable 確認並建立資料表 +func (db *DB) EnsureTable(schema string) error { + return db.session.ExecStmt(schema) +} + +func (db *DB) InitVersionSupport() error { + version, err := db.getReleaseVersion() + if err != nil { + return err + } + db.Version = version + db.SaiSupported = isSAISupported(version) + return nil +} + +func (db *DB) getReleaseVersion() (string, error) { + var version string + stmt := "SELECT release_version FROM system.local" + err := db.GetSession().Query(stmt, []string{"release_version"}).Consistency(gocql.One).Scan(&version) + return version, err +} + +func isSAISupported(version string) bool { + // 只要 major >=5 就支援 + // 4.0.9+ 才有 SAI,但不穩,強烈建議 5.0+ + parts := strings.Split(version, ".") + if len(parts) < 2 { + return false + } + major, _ := strconv.Atoi(parts[0]) + minor, _ := strconv.Atoi(parts[1]) + + if major > 5 { + return true + } + if major == 5 { + return true + } + if major == 4 { + if minor > 0 { // 4.1.x、4.2.x 直接支援 + return true + } + if minor == 0 { + patch := 0 + if len(parts) >= 3 { + patch, _ = strconv.Atoi(parts[2]) + } + if patch >= 9 { + return true + } + } + } + return false +} diff --git a/internal/lib/cassandra/cassandra_test.go b/internal/lib/cassandra/cassandra_test.go new file mode 100644 index 0000000..2e156fc --- /dev/null +++ b/internal/lib/cassandra/cassandra_test.go @@ -0,0 +1,170 @@ +package cassandra + +import ( + "fmt" + "github.com/gocql/gocql" + "github.com/stretchr/testify/assert" + "testing" +) + +// TestCassandraDB_Integration_TableDriven 使用 table-driven 方式整合測試 +func TestCassandraDB_Integration_TableDriven(t *testing.T) { + // 啟動 Cassandra container + dbContainer, err := initCassandraContainer("5.0.4") + defer func() { + _ = dbContainer.Container.Terminate(dbContainer.Ctx) + fmt.Println("[TEST] Container terminated") + }() + + // 建立 CassandraDB 連線 + hosts := []string{dbContainer.Host} + db, err := NewDB( + hosts, + WithPort(dbContainer.Port), + WithConsistency(gocql.One), + WithNumConnects(2), + ) + assert.NoError(t, err, "should success create CassandraDB") + assert.NotNil(t, db, "db should not be nil") + assert.NotNil(t, db.GetSession(), "get Session should not be nil") + + err = db.EnsureTable("CREATE KEYSPACE my_keyspace\nWITH replication = {\n 'class': 'SimpleStrategy',\n 'replication_factor': 1\n};\n") + assert.NoError(t, err, "should success ensure table") + // 注意:由於 Close 會關閉 session,因此請把測試 Close 的子案例放在所有使用 session 的子案例之後 + tests := []struct { + name string + action func() error + wantErr bool + }{ + { + name: "ok", + action: func() error { + // 建立一個合法的資料表 (使用 IF NOT EXISTS 避免重複建立錯誤) + schema := "CREATE TABLE IF NOT EXISTS my_keyspace.test (id uuid PRIMARY KEY, name text)" + return db.EnsureTable(schema) + }, + wantErr: false, + }, + { + name: "failed to ensure table since wrong schema", + action: func() error { + // 傳入無效的 CQL 語法,預期應回傳錯誤 + schema := "CREATE TABLE invalid schema" + return db.EnsureTable(schema) + }, + wantErr: true, + }, + { + name: "GetSession 返回有效 Session", + action: func() error { + if db.GetSession().Session == nil { + return fmt.Errorf("session is nil") + } + + return nil + }, + wantErr: false, + }, + { + name: "Close close Session", + action: func() error { + db.Close() + // 無法直接驗證內部是否已關閉,但可避免再次使用 session 產生 panic + return nil + }, + wantErr: false, + }, + } + + // 依序執行各子案例 + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.action() + if (err != nil) != tc.wantErr { + t.Errorf("%s havs error = %v, wantErr %v", tc.name, err, tc.wantErr) + } + }) + } +} + +func TestIsSAISupported(t *testing.T) { + tests := []struct { + version string + expected bool + }{ + {"5.0.0", true}, // 5.x 支援 + {"5.1.2", true}, // 5.x 支援 + {"6.0.0", true}, // 6.x 理論上也支援 + {"4.0.8", false}, // 4.0.8 不支援 + {"4.0.9", true}, // 4.0.9 支援 + {"4.1.0", true}, // 4.1.0 支援 + {"4.2.2", true}, // 4.2.2 支援 + {"3.11.10", false}, // 3.x 不支援 + {"3.0.0", false}, + {"", false}, // 空字串,不支援 + {"unknown", false}, // 無效格式 + {"4", false}, // 缺 patch,不支援 + {"4.0", false}, // 缺 patch,不支援 + {"5", false}, // 缺 minor + {"5.0", true}, // 5.0 預設支援 + } + + for _, tt := range tests { + t.Run(tt.version, func(t *testing.T) { + result := isSAISupported(tt.version) + assert.Equal(t, tt.expected, result, "version: %s", tt.version) + }) + } +} + +func TestCassandraDB_getReleaseVersion(t *testing.T) { + type fields struct { + Version string + } + + tests := []struct { + name string + fields fields + want string + wantError bool + }{ + { + name: "3", + fields: fields{Version: "3.11"}, + want: "3.11.19", + wantError: false, + }, + { + name: "5", + fields: fields{Version: "5.0.4"}, + want: "5.0.4", + wantError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + container, err := initCassandraContainer(tt.fields.Version) + defer func() { + _ = container.Container.Terminate(container.Ctx) + fmt.Println("[TEST] Container terminated") + }() + + if !tt.wantError { + assert.NoError(t, err) + // 建立 CassandraDB 連線 + hosts := []string{container.Host} + db, err := NewDB( + hosts, + WithPort(container.Port), + WithConsistency(gocql.One), + WithNumConnects(2), + ) + assert.NoError(t, err) + version, err := db.getReleaseVersion() + assert.NoError(t, err) + assert.Equal(t, version, tt.want) + } + }) + } +} diff --git a/internal/lib/cassandra/const.go b/internal/lib/cassandra/const.go new file mode 100644 index 0000000..6a8eeb3 --- /dev/null +++ b/internal/lib/cassandra/const.go @@ -0,0 +1,22 @@ +package cassandra + +import ( + "time" + + "github.com/gocql/gocql" +) + +// 預設設定常數 +const ( + defaultNumConnections = 10 // 預設每個節點的連線數量 + defaultCqlVersion = "3.0.0" + defaultTimeoutSec = 10 // 預設連線逾時秒數 + defaultMaxRetries = 3 // 預設重試次數 + defaultPort = 9042 + defaultConsistency = gocql.Quorum + defaultRetryMin = 1 * time.Second + defaultRetryMax = 30 * time.Second + defaultReconnectInitial = 1 * time.Second + defaultReconnectMax = 60 * time.Second + spanName = "cassandra" +) diff --git a/internal/lib/cassandra/crud.go b/internal/lib/cassandra/crud.go new file mode 100644 index 0000000..4398bac --- /dev/null +++ b/internal/lib/cassandra/crud.go @@ -0,0 +1,206 @@ +package cassandra + +import ( + "context" + "errors" + "fmt" + "github.com/gocql/gocql" + "github.com/scylladb/gocqlx/v3/qb" + "github.com/scylladb/gocqlx/v3/table" + "reflect" + "time" +) + +// Deprecated: is deprecate please use new func Query.InsertOne() +func (db *DB) Insert(ctx context.Context, document any, keyspace string) error { + metadata, err := GenerateTableMetadata(document, keyspace) + if err != nil { + return err + } + t := table.New(metadata) + q := db.GetSession().Query(t.Insert()).BindStruct(document).WithContext(ctx).WithTimestamp(time.Now().UnixNano() / 1e3) + err = q.ExecRelease() + + return err +} + +// Get 根據 struct 的 Primary Key 查詢單筆資料(Get ByPK) +// - filter 為目標資料 struct,其欄位需對應表格的 Primary Key 欄位(Partition Key + Clustering Key) +// - Cassandra 中 Primary Key 是由 Partition Key 與 Clustering Key 組成的整體,作為唯一識別一筆資料的 key +// - Cassandra 並不保證 Partition Key 或 Clustering Key 單獨具有唯一性,只有整個 Primary Key 才是唯一 +// - Partition Key 的作用是將資料分布到不同節點(Node),Clustering Key 則是節點內排序資料用 +// - 如果僅提供 Partition Key,會查到分區內的多筆資料,但由於 .Get() 預設加 LIMIT 1,僅會取得其中一筆(排序第一) +// - 若想查詢特定欄位(如 name)但該欄位不是 Primary Key 組成部分,則無法使用 .Get() 查詢,也無法用該欄位直接篩選資料(會報錯) +// - 解法是:1. 改變 table 結構使欲查欄位成為 PK,或 2. 建立額外 table 以該欄位為 Partition Key,或 3. 使用 ALLOW FILTERING(不建議) +// Deprecated: is deprecate please use new func Query.Get() +func (db *DB) Get(ctx context.Context, dest any, keyspace string) error { + metadata, err := GenerateTableMetadata(dest, keyspace) + if err != nil { + return err + } + t := table.New(metadata) + q := db.GetSession().Query(t.Get()).BindStruct(dest).WithContext(ctx).WithTimestamp(time.Now().UnixNano() / 1e3) + err = q.GetRelease(dest) + + if errors.Is(err, gocql.ErrNotFound) { + return ErrNotFound + } else if err != nil { + return err + } + + return nil +} + +// Delete 依據 document 的主鍵產生 DELETE 語句並執行 +// Deprecated: is deprecate please use new func Query.Delete() +func (db *DB) Delete(ctx context.Context, filter any, keyspace string) error { + metadata, err := GenerateTableMetadata(filter, keyspace) + if err != nil { + return err + } + t := table.New(metadata) + stmt, names := t.Delete() + q := db.GetSession().Query(stmt, names).BindStruct(filter).WithContext(ctx).WithTimestamp(time.Now().UnixNano() / 1e3) + + return q.ExecRelease() +} + +// TODO: Cassandra 不支援 OFFSET 方式的分頁(例如查詢第 N 頁) +// 原因:Cassandra 是分散式資料庫,設計上不允許像傳統 SQL 那樣用 OFFSET 跳頁,會導致效能極差 +// ✅ 正確方式為使用 PagingState 做游標式(Cursor-based)分頁,一頁一頁往後翻 +// ✅ 如果需要快取第 N 頁位置,應在應用層儲存每一頁的 PagingState 以供跳轉 +// ❌ Cassandra 不適合直接實作全站排行榜或全表分頁查詢,除非搭配 ElasticSearch 或針對 Partition Key 分頁設計 +// 若未來有特定分區(如 user_id)條件,可考慮實作分區內的分頁邏輯以提高效能 + +// GetAll 取得指定 struct 類型在 Cassandra 中的所有資料 +// - structInstance:用來推斷 table 結構的範例物件(可為指標) +// - result:要寫入的 slice 指標,如 *[]MyStruct +// Deprecated: is deprecate please use new func Query.GetAll() +func (db *DB) GetAll(ctx context.Context, filter any, result any, keyspace string) error { + metadata, err := GenerateTableMetadata(filter, keyspace) + if err != nil { + return err + } + t := table.New(metadata) + + stmt, names := qb.Select(t.Name()).Columns(metadata.Columns...).ToCql() + q := db.GetSession().Query(stmt, names).WithContext(ctx).WithTimestamp(time.Now().UnixNano() / 1e3) + + return q.SelectRelease(result) +} + +// QueryBuilder executes a query with optional conditions on Cassandra table +func (db *DB) QueryBuilder( + ctx context.Context, + tableStruct any, + result any, + keyspace string, + opts ...QueryOption, +) error { + metadata, err := GenerateTableMetadata(tableStruct, keyspace) + if err != nil { + return err + } + tbl := table.New(metadata) + + builder := qb.Select(tbl.Name()).Columns(metadata.Columns...) + bindMap := qb.M{} + for _, opt := range opts { + opt(builder, bindMap) + } + + stmt, names := builder.ToCql() + query := db.GetSession().Query(stmt, names).WithContext(ctx).BindMap(bindMap).WithTimestamp(time.Now().UnixNano() / 1e3) + + return query.SelectRelease(result) +} + +// Update 根據 document 欄位產生 UPDATE 語句並執行 +// - 只會更新非零值或非 nil 的欄位(零值欄位會被排除) +// - 主鍵欄位一定會保留,作為 WHERE 條件使用 +// Update 根據 document 產生 UPDATE 語句並執行(只更新非零值欄位,保留主鍵) +// Deprecated: is deprecate please use new func Query.Update() +func (db *DB) Update(ctx context.Context, document any, keyspace string) error { + metadata, err := GenerateTableMetadata(document, keyspace) + if err != nil { + return err + } + + v := reflect.ValueOf(document) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + typ := v.Type() + + // 收集更新欄位與其值(排除零值,保留主鍵) + setCols := make([]string, 0) + setVals := make([]any, 0) + whereCols := make([]string, 0) + whereVals := make([]any, 0) + + for i := 0; i < typ.NumField(); i++ { + field := typ.Field(i) + tag := field.Tag.Get("db") + if tag == "" || tag == "-" { + continue + } + + val := v.Field(i) + if !val.IsValid() { + continue + } + + if contains(metadata.PartKey, tag) || contains(metadata.SortKey, tag) { + whereCols = append(whereCols, tag) + whereVals = append(whereVals, val.Interface()) + continue + } + + if isZero(val) { + continue + } + + setCols = append(setCols, tag) + setVals = append(setVals, val.Interface()) + } + + for i := 0; i < typ.NumField(); i++ { + field := typ.Field(i) + tag := field.Tag.Get("db") + if tag == "" || tag == "-" { + continue + } + val := v.Field(i) + if !val.IsValid() { + continue + } + + if contains(metadata.PartKey, tag) || contains(metadata.SortKey, tag) { + whereCols = append(whereCols, tag) + whereVals = append(whereVals, val.Interface()) + continue + } + + // 只要 pointer 不為 nil,就 update(不管是不是 zero value) + if field.Type.Kind() == reflect.Ptr && !val.IsNil() { + setCols = append(setCols, tag) + setVals = append(setVals, val.Elem().Interface()) + } + } + + if len(setCols) == 0 { + return fmt.Errorf("no non-zero update fields provided") + } + + // Build UPDATE statement + builder := qb.Update(metadata.Name).Set(setCols...) + for _, col := range whereCols { + builder = builder.Where(qb.Eq(col)) + } + stmt, names := builder.ToCql() + + args := append(setVals, whereVals...) + q := db.GetSession().Query(stmt, names).Bind(args...).WithContext(ctx).WithTimestamp(time.Now().UnixNano() / 1e3) + + return q.ExecRelease() +} diff --git a/internal/lib/cassandra/crud_test.go b/internal/lib/cassandra/crud_test.go new file mode 100644 index 0000000..f781a4b --- /dev/null +++ b/internal/lib/cassandra/crud_test.go @@ -0,0 +1,566 @@ +package cassandra + +import ( + "context" + "testing" + "time" + + "github.com/gocql/gocql" + "github.com/scylladb/gocqlx/v3/qb" + "github.com/stretchr/testify/assert" + "github.com/testcontainers/testcontainers-go" +) + +type Consistency struct { + ID gocql.UUID `db:"id" partition:"true"` + ConsistencyName string `db:"consistency_name" sai:"true"` // can editor + ConsistencyType string `db:"consistency_type"` + LastTaskID string `db:"last_task_id"` // ConsistencyTask ID + Target string `db:"target"` // file name can editor + Status string `db:"status" sai:"true"` + ConsistencyMap string `db:"consistency_map"` // JSON string + CreateAT int64 `db:"create_at"` + UpdateAT int64 `db:"update_at"` +} + +func (c *Consistency) TableName() string { + return "consistency" +} + +func TestInsert(t *testing.T) { + ctx, cassandraContainer, host, port := setupCassandraContainer(t) + defer cassandraContainer.Terminate(ctx) + + // 連線 + hosts := []string{host} + db, err := NewCassandraDB( + hosts, + WithPort(port), + WithConsistency(gocql.One), + WithNumConns(2), + ) + assert.NoError(t, err) + assert.NotNil(t, db) + + // 建立 keyspace + table + err = db.EnsureTable("CREATE KEYSPACE my_keyspace\nWITH replication = {\n 'class': 'SimpleStrategy',\n 'replication_factor': 1\n};\n") + assert.NoError(t, err, "should success ensure table") + + err = db.EnsureTable(` +CREATE TABLE IF NOT EXISTS my_keyspace.monkey_entity ( + id UUID, + name TEXT, + update_at TIMESTAMP, + create_at TIMESTAMP, + PRIMARY KEY ((id), name) +);`) + assert.NoError(t, err) + + now := time.Now() + // 測試案例(可擴充) + tests := []struct { + name string + input MonkeyEntity + }{ + { + name: "insert George", + input: MonkeyEntity{ + ID: gocql.TimeUUID(), + Name: "George", + UpdateAt: now, + CreateAt: now, + }, + }, + { + name: "insert Bob", + input: MonkeyEntity{ + ID: gocql.TimeUUID(), + Name: "Bob", + UpdateAt: now, + CreateAt: now, + }, + }, + { + name: "insert Alice", + input: MonkeyEntity{ + ID: gocql.TimeUUID(), + Name: "Alice", + UpdateAt: now, + CreateAt: now, + }, + }, + } + + // 執行測試 + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := db.Insert(ctx, &tc.input, "my_keyspace") + assert.NoError(t, err) + + // 驗證寫入 + var name string + q := db.GetSession().Query("SELECT name FROM my_keyspace.monkey_entity WHERE id = ?", []string{"name"}) + err = q.Bind(tc.input.ID).GetRelease(&name) + assert.NoError(t, err) + assert.Equal(t, tc.input.Name, name) + }) + } +} + +func TestGet(t *testing.T) { + ctx, cassandraContainer, host, port := setupCassandraContainer(t) + defer cassandraContainer.Terminate(ctx) + + db, err := NewCassandraDB( + []string{host}, + WithPort(port), + WithConsistency(gocql.One), + WithNumConns(2), + ) + assert.NoError(t, err) + + err = db.EnsureTable(` +CREATE KEYSPACE IF NOT EXISTS my_keyspace +WITH replication = { + 'class': 'SimpleStrategy', + 'replication_factor': 1 +};`) + assert.NoError(t, err) + + err = db.EnsureTable(` +CREATE TABLE IF NOT EXISTS my_keyspace.monkey_entity ( + id UUID, + name TEXT, + update_at TIMESTAMP, + create_at TIMESTAMP, + PRIMARY KEY ((id), name) +);`) + assert.NoError(t, err) + + now := time.Now() + monkey := MonkeyEntity{ + ID: gocql.TimeUUID(), + Name: "George", + UpdateAt: now, + CreateAt: now, + } + + // 插入一筆資料 + err = db.Insert(ctx, &monkey, "my_keyspace") + assert.NoError(t, err) + + tests := []struct { + name string + filter MonkeyEntity + expect string + }{ + { + name: "Get existing monkey", + filter: MonkeyEntity{ID: monkey.ID, Name: monkey.Name}, + expect: "George", + }, + { + name: "Get non-existent monkey", + filter: MonkeyEntity{ID: gocql.TimeUUID(), Name: "GG"}, + expect: "", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + result := tc.filter // 預設填入主鍵 + err := db.Get(ctx, &result, "my_keyspace") + + if tc.expect == "" { + assert.Error(t, err, "expected error for missing record") + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expect, result.Name) + } + }) + } +} + +func TestDelete(t *testing.T) { + ctx, cassandraContainer, host, port := setupCassandraContainer(t) + defer cassandraContainer.Terminate(ctx) + + db, err := NewCassandraDB( + []string{host}, + WithPort(port), + WithConsistency(gocql.One), + WithNumConns(2), + ) + assert.NoError(t, err) + + // 建立 keyspace & table + err = db.EnsureTable(` +CREATE KEYSPACE IF NOT EXISTS my_keyspace +WITH replication = { + 'class': 'SimpleStrategy', + 'replication_factor': 1 +};`) + assert.NoError(t, err) + + err = db.EnsureTable(` +CREATE TABLE IF NOT EXISTS my_keyspace.monkey_entity ( + id UUID, + name TEXT, + update_at TIMESTAMP, + create_at TIMESTAMP, + PRIMARY KEY ((id), name) +);`) + assert.NoError(t, err) + + now := time.Now() + monkey := MonkeyEntity{ + ID: gocql.TimeUUID(), + Name: "DeleteMe", + UpdateAt: now, + CreateAt: now, + } + + // 插入資料 + err = db.Insert(ctx, &monkey, "my_keyspace") + assert.NoError(t, err) + + // 先確認有插入成功 + verify := MonkeyEntity{ID: monkey.ID, Name: monkey.Name} + err = db.Get(ctx, &verify, "my_keyspace") + assert.NoError(t, err) + assert.Equal(t, "DeleteMe", verify.Name) + + // 執行刪除 + err = db.Delete(ctx, &monkey, "my_keyspace") + assert.NoError(t, err) + + // 再查,應該查不到 + result := MonkeyEntity{ID: monkey.ID, Name: monkey.Name} + err = db.Get(ctx, &result, "my_keyspace") + assert.Error(t, err, "expected error because record should be deleted") +} + +func TestUpdate(t *testing.T) { + ctx, cassandraContainer, host, port := setupCassandraContainer(t) + defer cassandraContainer.Terminate(ctx) + + db, err := NewCassandraDB( + []string{host}, + WithPort(port), + WithConsistency(gocql.One), + WithNumConns(2), + ) + assert.NoError(t, err) + + // 建立 keyspace & table + err = db.EnsureTable(` +CREATE KEYSPACE IF NOT EXISTS my_keyspace +WITH replication = { + 'class': 'SimpleStrategy', + 'replication_factor': 1 +};`) + assert.NoError(t, err) + + err = db.EnsureTable(` +CREATE TABLE IF NOT EXISTS my_keyspace.monkey_entity ( + id UUID, + name TEXT, + update_at TIMESTAMP, + create_at TIMESTAMP, + PRIMARY KEY ((id), name) +);`) + assert.NoError(t, err) + + now := time.Now() + id := gocql.TimeUUID() + + // Step 1: 插入初始資料 + monkey := MonkeyEntity{ + ID: id, + Name: "OldName", + UpdateAt: now, + CreateAt: now, + } + err = db.Insert(ctx, &monkey, "my_keyspace") + assert.NoError(t, err) + + // Step 2: 更新 UpdateAt 欄位(模擬只更新一欄) + updatedTime := now.Add(10 * time.Minute) + updateDoc := MonkeyEntity{ + ID: id, + Name: "OldName", // 主鍵 + UpdateAt: updatedTime, + // CreateAt 是零值,不會被更新 + } + err = db.Update(ctx, &updateDoc, "my_keyspace") + assert.NoError(t, err) + + // Step 3: 查詢回來驗證更新 + result := MonkeyEntity{ + ID: id, + Name: "OldName", + } + err = db.Get(ctx, &result, "my_keyspace") + assert.NoError(t, err) + assert.WithinDuration(t, updatedTime, result.UpdateAt, time.Second) + assert.WithinDuration(t, now, result.CreateAt, time.Second) // 未被更新 +} + +func setupTestQueryBuilder(t *testing.T) (*CassandraDB, testcontainers.Container, context.Context) { + ctx, cassandraContainer, host, port := setupCassandraContainer(t) + + // 連線 + hosts := []string{host} + db, err := NewCassandraDB( + hosts, + WithPort(port), + WithConsistency(gocql.One), + WithNumConns(2), + ) + assert.NoError(t, err) + assert.NotNil(t, db) + + // 建立 keyspace + table + err = db.EnsureTable("CREATE KEYSPACE my_keyspace\nWITH replication = {\n 'class': 'SimpleStrategy',\n 'replication_factor': 1\n};\n") + assert.NoError(t, err, "should success ensure table") + + err = db.EnsureTable(` + CREATE TABLE IF NOT EXISTS my_keyspace.consistency ( + id UUID, + consistency_name TEXT, + last_task_id TEXT, + target TEXT, + status TEXT, + consistency_type TEXT, + consistency_map TEXT, + create_at BIGINT, + update_at BIGINT, + PRIMARY KEY ((id)) + );`) + assert.NoError(t, err) + + return db, cassandraContainer, ctx +} + +func insertSampleConsistency(t *testing.T, db *CassandraDB, ctx context.Context, keyspace string) *Consistency { + c := &Consistency{ + ID: gocql.TimeUUID(), + ConsistencyName: "query-test", + LastTaskID: "task-1", + Target: "test.csv", + Status: "Running", + ConsistencyType: "simple", + ConsistencyMap: `{"example": "value"}`, + CreateAT: time.Now().UnixNano(), + UpdateAT: time.Now().UnixNano(), + } + + err := db.Insert(ctx, c, keyspace) + assert.NoError(t, err) + return c +} + +func TestQueryBuilder_WithWhere(t *testing.T) { + db, def, ctx := setupTestQueryBuilder(t) + defer def.Terminate(ctx) + + saved := insertSampleConsistency(t, db, ctx, "my_keyspace") + + t.Run("query by id", func(t *testing.T) { + var results []*Consistency + e := &Consistency{} + field := GetCqlTag(e, &e.ID) + err := db.QueryBuilder( + ctx, + &Consistency{}, + &results, + "my_keyspace", + WithWhere( + []qb.Cmp{qb.Eq(field)}, + map[string]any{field: saved.ID.String()}, + ), + ) + + assert.NoError(t, err) + assert.NotEmpty(t, results) + + found := false + for _, r := range results { + if r.ID == saved.ID { + found = true + break + } + } + assert.True(t, found, "should find inserted consistency") + }) + + t.Run("query with unmatched id", func(t *testing.T) { + var results []*Consistency + + e := &Consistency{} + field := GetCqlTag(e, &e.ID) + err := db.QueryBuilder( + ctx, + &Consistency{}, + &results, + "my_keyspace", + WithWhere( + []qb.Cmp{qb.Eq(field)}, + map[string]any{field: "NonExist"}, + ), + ) + + assert.Error(t, err) + assert.Empty(t, results) + }) + + t.Run("query by in", func(t *testing.T) { + var results []*Consistency + e := &Consistency{} + field := GetCqlTag(e, &e.ID) + err := db.QueryBuilder( + ctx, + &Consistency{}, + &results, + "my_keyspace", + WithWhere( + []qb.Cmp{qb.In(field)}, + map[string]any{field: []gocql.UUID{saved.ID}}, + ), + ) + + assert.NoError(t, err) + assert.NotEmpty(t, results) + + found := false + for _, r := range results { + if r.ID == saved.ID { + found = true + break + } + } + assert.True(t, found, "should find inserted consistency") + }) + + t.Run("query by one is not in", func(t *testing.T) { + var results []*Consistency + e := &Consistency{} + field := GetCqlTag(e, &e.ID) + err := db.QueryBuilder( + ctx, + &Consistency{}, + &results, + "my_keyspace", + WithWhere( + []qb.Cmp{qb.In(field)}, + map[string]any{field: []gocql.UUID{saved.ID, gocql.TimeUUID()}}, + ), + ) + + assert.NoError(t, err) + assert.NotEmpty(t, results) + + found := false + for _, r := range results { + if r.ID == saved.ID { + found = true + break + } + } + assert.True(t, found, "should find inserted consistency") + }) + + t.Run("query get all", func(t *testing.T) { + var results []*Consistency + e := &Consistency{} + err := db.QueryBuilder( + ctx, + e, + &results, + "my_keyspace", + ) + + assert.NoError(t, err) + assert.NotEmpty(t, results) + + found := false + for _, r := range results { + if r.ID == saved.ID { + found = true + break + } + } + assert.True(t, found, "should find inserted consistency") + }) + +} + +// ====================================================================================================================== +func TestSearchBySAIFields(t *testing.T) { + container, err := initCassandraContainer("5.0.4") + + ctx := context.Background() + defer container.Container.Terminate(ctx) + + // 連線 + hosts := []string{container.Host} + db, err := NewCassandraDB( + hosts, + WithPort(container.Port), + WithConsistency(gocql.One), + WithNumConns(2), + ) + assert.NoError(t, err) + assert.NotNil(t, db) + + // 建立 keyspace + table + err = db.EnsureTable("CREATE KEYSPACE my_keyspace\nWITH replication = {\n 'class': 'SimpleStrategy',\n 'replication_factor': 1\n};\n") + assert.NoError(t, err, "should success ensure table") + + err = db.EnsureTable(` + CREATE TABLE IF NOT EXISTS my_keyspace.consistency ( + id UUID, + consistency_name TEXT, + last_task_id TEXT, + target TEXT, + status TEXT, + consistency_type TEXT, + consistency_map TEXT, + create_at BIGINT, + update_at BIGINT, + PRIMARY KEY ((id)) + );`) + assert.NoError(t, err) + _ = db.AutoCreateSAIIndexes(&Consistency{}, "my_keyspace") + + c := &Consistency{ + ID: gocql.TimeUUID(), + ConsistencyName: "query-test", + LastTaskID: "task-1", + Target: "test.csv", + Status: "Running", + ConsistencyType: "simple", + ConsistencyMap: `{"example": "value"}`, + CreateAT: time.Now().UnixNano(), + UpdateAT: time.Now().UnixNano(), + } + + err = db.Insert(ctx, c, "my_keyspace") + assert.NoError(t, err) + + results := []Consistency{} + err = db.SearchBySAIFields(ctx, &Consistency{}, &results, "my_keyspace", + []qb.Cmp{qb.Eq("consistency_name")}, + map[string]any{"consistency_name": "query-test"}, + ) + assert.NoError(t, err) + assert.Len(t, results, 1) + + results2 := []Consistency{} + err = db.SearchBySAIFields(ctx, &Consistency{}, &results, "my_keyspace", + []qb.Cmp{qb.Eq("consistency_name")}, + map[string]any{"consistency_name": "vvvvvvv"}, + ) + assert.NoError(t, err) + assert.Len(t, results2, 0) +} diff --git a/internal/lib/cassandra/errors.go b/internal/lib/cassandra/errors.go new file mode 100644 index 0000000..adfd95c --- /dev/null +++ b/internal/lib/cassandra/errors.go @@ -0,0 +1,5 @@ +package cassandra + +import "fmt" + +var ErrNotFound = fmt.Errorf("not found") diff --git a/internal/lib/cassandra/ez_transaction.go b/internal/lib/cassandra/ez_transaction.go new file mode 100644 index 0000000..81ab185 --- /dev/null +++ b/internal/lib/cassandra/ez_transaction.go @@ -0,0 +1,275 @@ +package cassandra + +import ( + "context" + "fmt" + "github.com/gocql/gocql" + "github.com/scylladb/gocqlx/v3" + "github.com/scylladb/gocqlx/v3/qb" + "github.com/scylladb/gocqlx/v3/table" + "github.com/testcontainers/testcontainers-go/log" + "reflect" + "time" +) + +/* +todo 目前尚未實作的部分,但因為目前使用上並沒有嚴格一致性,故目前簡易的版本可先行 + + 1. 讀寫一致性問題 + Cassandra 本身為最終一致性,如果在 Commit 期間網路有短暫中斷,可能造成部分操作成功、部分失敗的「半提交」狀態。 + Commit 之後,再次掃描 Steps,看是否所有 IsExec 都為 true,若有 false,則觸發額外的重試或警示機制。 + +2. 反射收集欄位的可靠度 + Update 方法透過反射與 isZero 來排除不更新欄位,但若結構體中出現自訂零值(如自訂型態有預設值),可能誤過濾掉真正要更新的欄位。 + 可能在資料模型層先明確標示「要更新的欄位列表」,或提供外部參數指明更新欄位,以減少反射過濾錯誤。 + +3. 交易邊界與隔離度 + 此實作並未提供交易隔離(Isolation),外部程式仍可能在交易尚未 Commit 時讀到中間狀態。 + 若對讀取一致性有嚴格要求,可考慮使用 Cassandra 的 Lightweight Transactions(LWT)搭配 IF NOT EXISTS / IF 條件,確保寫入前的前置檢查。 + +4. 錯誤重試與警示 + 當 Commit 中某個步驟失敗,直接返回錯誤,但沒有集中收集失敗資訊。 + 建議整合一個「監控與重試」機制,將失敗細節(step index、錯誤訊息)記錄到外部持久化系統,以便運維人員介入或自動重試。 + +5. 崩潰恢復 + 如果程式在 Commit 過程中程式本身當掉,記憶體中的 Steps 會丟失,無法回滾。 + 可以把 OperationLog 持久化到可靠的日誌表(Cassandra 或外部 DB),Commit 之前就先寫入,並在啟動時掃描未完成的交易回滾或重試。 +*/ + +type Action int64 + +const ( + ActionUnknown Action = iota + ActionInsert + ActionDelete + ActionUpdate +) + +// OperationLog 記錄操作日誌,用於補償回滾 +type OperationLog struct { + ID gocql.UUID // 操作ID,用來標識該操作 + Action Action // 操作類型(增、刪、改) + IsExec bool + Exec []*gocqlx.Queryx // 這一個步驟要執行的東西 + OldData any // 變更前的數據,僅對修改和刪除有效 + NewData any // 變更後的數據,僅對新增和修改有效 +} + +// Transaction 這裡是單個實體內的 TX ,不管一些競爭的問題,不管隔離,就可以確定一筆資料要嘛全成功,要嘛全失敗。 +type Transaction interface { + Insert(ctx context.Context, document any) error + Delete(ctx context.Context, filter any) error + Update(ctx context.Context, document any) error + Rollback() error + Commit() error +} + +// transaction 定義補償操作的結構 +type transaction struct { + ctx context.Context + keyspace string + db *DB + Steps []OperationLog // 用來記錄所有操作步驟的日誌 +} + +func NewEZTransaction(ctx context.Context, keyspace string, db *DB) Transaction { + return &transaction{ + ctx: ctx, + keyspace: keyspace, + db: db, + Steps: []OperationLog{}, + } +} + +func (tx *transaction) Insert(ctx context.Context, document any) error { + metadata, err := GenerateTableMetadata(document, tx.keyspace) + if err != nil { + return err + } + t := table.New(metadata) + + q := tx.db.GetSession().Query(t.Insert()).BindStruct(document).WithContext(ctx).WithTimestamp(time.Now().UnixNano() / 1e3) + logEntry := OperationLog{ + ID: gocql.TimeUUID(), + Action: ActionInsert, + Exec: []*gocqlx.Queryx{q}, + NewData: document, + } + tx.Steps = append(tx.Steps, logEntry) + + return nil +} + +func (tx *transaction) Delete(ctx context.Context, filter any) error { + metadata, err := GenerateTableMetadata(filter, tx.keyspace) + if err != nil { + return err + } + t := table.New(metadata) + + doc := filter + + get := tx.db.GetSession().Query(t.Get()).BindStruct(doc).WithContext(ctx) + q := tx.db.GetSession().Query(t.Delete()).BindStruct(filter).WithContext(ctx).WithTimestamp(time.Now().UnixNano() / 1e3) + + logEntry := OperationLog{ + ID: gocql.TimeUUID(), + Action: ActionDelete, + Exec: []*gocqlx.Queryx{get, q}, // 有順序,要先拿取保留舊資料, + OldData: doc, // 保留結構體才有機會回復 + } + tx.Steps = append(tx.Steps, logEntry) + + return nil +} + +func (tx *transaction) Update(ctx context.Context, document any) error { + metadata, err := GenerateTableMetadata(document, tx.keyspace) + if err != nil { + return err + } + t := table.New(metadata) + + v := reflect.ValueOf(document) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + typ := v.Type() + + // 收集更新欄位與其值(排除零值,保留主鍵) + setCols := make([]string, 0) + setVals := make([]any, 0) + whereCols := make([]string, 0) + whereVals := make([]any, 0) + + for i := 0; i < typ.NumField(); i++ { + field := typ.Field(i) + tag := field.Tag.Get("db") + if tag == "" || tag == "-" { + continue + } + + val := v.Field(i) + if !val.IsValid() { + continue + } + + if contains(metadata.PartKey, tag) || contains(metadata.SortKey, tag) { + whereCols = append(whereCols, tag) + whereVals = append(whereVals, val.Interface()) + continue + } + + if isZero(val) { + continue + } + + setCols = append(setCols, tag) + setVals = append(setVals, val.Interface()) + } + + if len(setCols) == 0 { + return fmt.Errorf("no non-zero update fields provided") + } + + // Build UPDATE statement + builder := qb.Update(metadata.Name).Set(setCols...) + for _, col := range whereCols { + builder = builder.Where(qb.Eq(col)) + } + stmt, names := builder.ToCql() + + args := append(setVals, whereVals...) + q := tx.db.GetSession().Query(stmt, names).Bind(args...).WithContext(ctx).WithTimestamp(time.Now().UnixNano() / 1e3) + + doc := document + get := tx.db.GetSession().Query(t.Get()).BindStruct(doc).WithContext(ctx) + + logEntry := OperationLog{ + ID: gocql.TimeUUID(), + Action: ActionUpdate, + Exec: []*gocqlx.Queryx{get, q}, // 有順序,要先拿取保留舊資料,才可以 update + OldData: doc, // 保留結構體才有機會回復 + NewData: document, + } + tx.Steps = append(tx.Steps, logEntry) + + return nil +} + +func (tx *transaction) Rollback() error { + for _, item := range tx.Steps { + // 沒有做過的就不用回復了 + if !item.IsExec { + continue + } + + switch item.Action { + case ActionInsert: + err := tx.db.Delete(tx.ctx, item.NewData, tx.keyspace) + if err != nil { + // todo log 下來,最大程度保證,都有做完,真的有錯從 log 去補,目前不做 + log.Printf("failed to delete since rollback, data: %v", item.NewData) + continue + } + case ActionUpdate: + err := tx.db.Update(tx.ctx, item.OldData, tx.keyspace) + if err != nil { + // todo log 下來,最大程度保證,都有做完,真的有錯從 log 去補,目前不做 + log.Printf("failed to update since rollback, data: %v", item.OldData) + continue + } + case ActionDelete: + err := tx.db.Insert(tx.ctx, item.OldData, tx.keyspace) + if err != nil { + // todo log 下來,最大程度保證,都有做完,真的有錯從 log 去補,目前不做 + log.Printf("failed to insert since rollback, data: %v", item.OldData) + continue + } + } + } + + return nil +} + +func (tx *transaction) Commit() error { + for i, step := range tx.Steps { + switch step.Action { + case ActionInsert: + // 單純插入,不用回滾額外做事,插入的資料已經放在 New Data 裡面了 + if err := step.Exec[0].ExecRelease(); err != nil { + return fmt.Errorf("failed to insert: %w", err) + } + // 標示為以執行,如果有錯誤要回復,指座椅執行的就好 + tx.Steps[i].IsExec = true + case ActionUpdate: + // 要先 get 之後再 Update + // 單純插入,不用回滾額外做事,插入的資料已經放在 New Data 裡面了 + if err := step.Exec[0].GetRelease(step.OldData); err != nil { + return fmt.Errorf("failed to get: %w", err) + } + + if err := step.Exec[1].ExecRelease(); err != nil { + return fmt.Errorf("failed to update: %w", err) + } + + // 標示為以執行,如果有錯誤要回復,指座椅執行的就好 + tx.Steps[i].IsExec = true + case ActionDelete: + // 要先 get 之後再 Update + // 單純插入,不用回滾額外做事,插入的資料已經放在 New Data 裡面了 + if err := step.Exec[0].GetRelease(step.OldData); err != nil { + return fmt.Errorf("failed to get: %w", err) + } + if err := step.Exec[1].ExecRelease(); err != nil { + return fmt.Errorf("failed to delete: %w", err) + } + // 標示為以執行,如果有錯誤要回復,指座椅執行的就好 + tx.Steps[i].IsExec = true + default: + return fmt.Errorf("unknown action: %v", step.Action) + } + } + + return nil +} diff --git a/internal/lib/cassandra/ez_transaction_test.go b/internal/lib/cassandra/ez_transaction_test.go new file mode 100644 index 0000000..69c483b --- /dev/null +++ b/internal/lib/cassandra/ez_transaction_test.go @@ -0,0 +1,291 @@ +package cassandra + +import ( + "github.com/gocql/gocql" + "github.com/stretchr/testify/assert" + "testing" +) + +type TE struct { + ID gocql.UUID `cql:"id" partition:"true"` + Name string `cql:"name"` +} + +func (m *TE) TableName() string { + return "test_entity" +} + +func TestNewEZTransactionInsert(t *testing.T) { + ctx, cassandraContainer, host, port := setupCassandraContainer(t) + defer cassandraContainer.Terminate(ctx) + + // 連線 + hosts := []string{host} + db, err := NewCassandraDB( + hosts, + WithPort(port), + WithConsistency(gocql.One), + WithNumConns(2), + ) + assert.NoError(t, err) + assert.NotNil(t, db) + + // 建立 keyspace + table + err = db.EnsureTable("CREATE KEYSPACE my_keyspace\nWITH replication = {\n 'class': 'SimpleStrategy',\n 'replication_factor': 1\n};\n") + assert.NoError(t, err, "should success ensure table") + + err = db.EnsureTable(` +CREATE TABLE IF NOT EXISTS my_keyspace.test_entity ( + id UUID PRIMARY KEY, + name TEXT +);`) + assert.NoError(t, err) + + // 定義 table-driven 測試案例 + tests := []struct { + name string + doc TE + }{ + { + name: "insert_record_alice", + doc: TE{ + ID: gocql.TimeUUID(), + Name: "Alice", + }, + }, + { + name: "insert_record_bob", + doc: TE{ + ID: gocql.TimeUUID(), + Name: "Bob", + }, + }, + { + name: "insert_record_empty_name", + doc: TE{ + ID: gocql.TimeUUID(), + Name: "", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // 每個子案例都使用新的 transaction + tx := NewEZTransaction(ctx, "my_keyspace", db) + + // 1. 呼叫 Insert + err := tx.Insert(ctx, &tt.doc) + assert.NoError(t, err, "Insert() 應該不會錯誤") + + // 2. 呼叫 Commit,真正寫入 Cassandra + err = tx.Commit() + assert.NoError(t, err, "Commit() 應該不會錯誤") + + // 3. 從 Cassandra 查回資料,驗證 + var got TE + got.ID = tt.doc.ID + + err = db.Get(ctx, &got, "my_keyspace") + assert.NoError(t, err) + // 驗證欄位值符合 + assert.Equal(t, tt.doc.ID, got.ID, "ID 應一致") + assert.Equal(t, tt.doc.Name, got.Name, "Name 應一致") + }) + } +} + +func TestNewEZTransactionDelete(t *testing.T) { + ctx, cassandraContainer, host, port := setupCassandraContainer(t) + defer cassandraContainer.Terminate(ctx) + + // 連線 + hosts := []string{host} + db, err := NewCassandraDB( + hosts, + WithPort(port), + WithConsistency(gocql.One), + WithNumConns(2), + ) + assert.NoError(t, err) + assert.NotNil(t, db) + + // 建立 keyspace + table + err = db.EnsureTable("CREATE KEYSPACE my_keyspace\nWITH replication = {\n 'class': 'SimpleStrategy',\n 'replication_factor': 1\n};\n") + assert.NoError(t, err, "should success ensure table") + + err = db.EnsureTable(` +CREATE TABLE IF NOT EXISTS my_keyspace.test_entity ( + id UUID PRIMARY KEY, + name TEXT +);`) + assert.NoError(t, err) + + // 定義 table-driven 測試案例 + tests := []struct { + name string + doc TE + }{ + { + name: "ok", + doc: TE{ + ID: gocql.TimeUUID(), + Name: "Alice", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // 每個子案例都使用新的 transaction + tx := NewEZTransaction(ctx, "my_keyspace", db) + + // 1. 呼叫 Delete + err := tx.Insert(ctx, &tt.doc) + assert.NoError(t, err, "Insert() 應該不會錯誤") + + // 2. 呼叫 Delete + err = tx.Delete(ctx, &tt.doc) + assert.NoError(t, err, "Delete() 應該不會錯誤") + + // 3. 呼叫 Commit,真正寫入 Cassandra + err = tx.Commit() + assert.NoError(t, err, "Commit() 應該不會錯誤") + // + // 4. 從 Cassandra 查回資料,驗證 + var got TE + got.ID = tt.doc.ID + + err = db.Get(ctx, &got, "my_keyspace") + assert.Equal(t, err, gocql.ErrNotFound) + }) + } +} + +func TestNewEZTransactionUpdate(t *testing.T) { + ctx, cassandraContainer, host, port := setupCassandraContainer(t) + t.Cleanup(func() { cassandraContainer.Terminate(ctx) }) + + // 1. 連線並建立 keyspace + table + db, err := NewCassandraDB( + []string{host}, + WithPort(port), + WithConsistency(gocql.One), + WithNumConns(2), + ) + assert.NoError(t, err) + assert.NotNil(t, db) + + assert.NoError(t, db.EnsureTable(` +CREATE KEYSPACE IF NOT EXISTS my_keyspace +WITH replication = {'class':'SimpleStrategy','replication_factor':1}; +`)) + assert.NoError(t, db.EnsureTable(` +CREATE TABLE IF NOT EXISTS my_keyspace.test_entity ( + id UUID PRIMARY KEY, + name TEXT +); +`)) + + // 2. 插入初始資料 + id := gocql.TimeUUID() + before := TE{ID: id, Name: "Before"} + assert.NoError(t, db.Insert(ctx, &before, "my_keyspace")) + + // 定義多組更新案例 + tests := []struct { + name string + newName string + wantErr bool + }{ + {name: "update_to_Alice", newName: "Alice"}, + {name: "update_to_empty", newName: "", wantErr: true}, + {name: "update_to_Bob", newName: "Bob"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // 為每個案例都重置為 Before + // 重新 insert 一次(覆蓋舊值) + assert.NoError(t, db.Insert(ctx, &before, "my_keyspace")) + + // 3. 建立 transaction 並呼叫 Update + tx := NewEZTransaction(ctx, "my_keyspace", db) + updateDoc := TE{ID: id, Name: tt.newName} + err := tx.Update(ctx, &updateDoc) + if tt.wantErr { + assert.Error(t, err, "Update() 應該會出錯") + return + } + + assert.NoError(t, err, "Update() 不應出錯") + + // 4. Commit 實際寫入 + err = tx.Commit() + assert.NoError(t, err, "Commit() 不應出錯") + + // 5. 查詢並驗證 + var got TE + got.ID = id + err = db.Get(ctx, &got, "my_keyspace") + assert.NoError(t, err, "db.Get() 應成功") + + assert.Equal(t, id, got.ID, "ID 應一致") + assert.Equal(t, tt.newName, got.Name, "Name 應被更新為最新值") + }) + } +} + +func Test_Rollback(t *testing.T) { + ctx, cassandraContainer, host, port := setupCassandraContainer(t) + t.Cleanup(func() { cassandraContainer.Terminate(ctx) }) + + // 1. 連線並建立 keyspace + table + db, err := NewCassandraDB( + []string{host}, + WithPort(port), + WithConsistency(gocql.One), + WithNumConns(2), + ) + assert.NoError(t, err) + assert.NotNil(t, db) + + assert.NoError(t, db.EnsureTable(` +CREATE KEYSPACE IF NOT EXISTS my_keyspace +WITH replication = {'class':'SimpleStrategy','replication_factor':1}; +`)) + assert.NoError(t, db.EnsureTable(` +CREATE TABLE IF NOT EXISTS my_keyspace.test_entity ( + id UUID PRIMARY KEY, + name TEXT +); +`)) + + // 3. 用 Transaction 插入一筆資料,並 Commit + id := gocql.TimeUUID() + doc := TE{ID: id, Name: "Alice"} + tx := NewEZTransaction(ctx, "my_keyspace", db) + err = tx.Insert(ctx, &doc) + assert.NoError(t, err) + err = tx.Commit() + assert.NoError(t, err) + // 4. Query 確認資料已存在 + var got TE + got.ID = id + err = db.Get(ctx, &got, "my_keyspace") + assert.NoError(t, err) + assert.Equal(t, got.Name, doc.Name) + + // 5. 呼叫 Rollback,應自動刪除剛剛那筆 + err = tx.Rollback() + assert.NoError(t, err) + + var afterGot TE + afterGot.ID = id + err = db.Get(ctx, &afterGot, "my_keyspace") + assert.Error(t, err) + + // Output: + // after commit: Alice + // after rollback: not found +} diff --git a/internal/lib/cassandra/go.mod b/internal/lib/cassandra/go.mod new file mode 100644 index 0000000..f572fa3 --- /dev/null +++ b/internal/lib/cassandra/go.mod @@ -0,0 +1,70 @@ +module gitlab.supermicro.com/storage/cassandra + +go 1.24.2 + +require ( + github.com/gocql/gocql v1.7.0 + github.com/scylladb/gocqlx/v3 v3.0.1 + github.com/stretchr/testify v1.10.0 + github.com/testcontainers/testcontainers-go v0.37.0 + github.com/zeromicro/go-zero v1.8.3 +) + +require ( + dario.cat/mergo v1.0.1 // indirect + github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect + github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/containerd/log v0.1.0 // indirect + github.com/containerd/platforms v0.2.1 // indirect + github.com/cpuguy83/dockercfg v0.3.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/distribution/reference v0.6.0 // indirect + github.com/docker/docker v28.0.1+incompatible // indirect + github.com/docker/go-connections v0.5.0 // indirect + github.com/docker/go-units v0.5.0 // indirect + github.com/ebitengine/purego v0.8.2 // indirect + github.com/fatih/color v1.18.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect + github.com/klauspost/compress v1.17.11 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/magiconair/properties v1.8.10 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/moby/docker-image-spec v1.3.1 // indirect + github.com/moby/patternmatcher v0.6.0 // indirect + github.com/moby/sys/sequential v0.5.0 // indirect + github.com/moby/sys/user v0.1.0 // indirect + github.com/moby/sys/userns v0.1.0 // indirect + github.com/moby/term v0.5.0 // indirect + github.com/morikuni/aec v1.0.0 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect + github.com/scylladb/go-reflectx v1.0.1 // indirect + github.com/shirou/gopsutil/v4 v4.25.1 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect + go.opentelemetry.io/otel v1.35.0 // indirect + go.opentelemetry.io/otel/metric v1.35.0 // indirect + go.opentelemetry.io/otel/trace v1.35.0 // indirect + go.uber.org/automaxprocs v1.6.0 // indirect + golang.org/x/crypto v0.37.0 // indirect + golang.org/x/sys v0.32.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/internal/lib/cassandra/go.sum b/internal/lib/cassandra/go.sum new file mode 100644 index 0000000..15828b0 --- /dev/null +++ b/internal/lib/cassandra/go.sum @@ -0,0 +1,224 @@ +dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= +dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= +github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= +github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= +github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= +github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A= +github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw= +github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA= +github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= +github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= +github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= +github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= +github.com/docker/docker v28.0.1+incompatible h1:FCHjSRdXhNRFjlHMTv4jUNlIBbTeRjrWfeFuJp7jpo0= +github.com/docker/docker v28.0.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= +github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= +github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/ebitengine/purego v0.8.2 h1:jPPGWs2sZ1UgOSgD2bClL0MJIqu58nOmIcBuXr62z1I= +github.com/ebitengine/purego v0.8.2/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= +github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= +github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus= +github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= +github.com/magiconair/properties v1.8.10 h1:s31yESBquKXCV9a/ScB3ESkOjUYYv+X0rg8SYxI99mE= +github.com/magiconair/properties v1.8.10/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= +github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= +github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk= +github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc= +github.com/moby/sys/sequential v0.5.0 h1:OPvI35Lzn9K04PBbCLW0g4LcFAJgHsvXsRyewg5lXtc= +github.com/moby/sys/sequential v0.5.0/go.mod h1:tH2cOOs5V9MlPiXcQzRC+eEyab644PWKGRYaaV5ZZlo= +github.com/moby/sys/user v0.1.0 h1:WmZ93f5Ux6het5iituh9x2zAG7NFY9Aqi49jjE1PaQg= +github.com/moby/sys/user v0.1.0/go.mod h1:fKJhFOnsCN6xZ5gSfbM6zaHGgDJMrqt9/reuj4T7MmU= +github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g= +github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28= +github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= +github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= +github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= +github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= +github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= +github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/scylladb/go-reflectx v1.0.1 h1:b917wZM7189pZdlND9PbIJ6NQxfDPfBvUaQ7cjj1iZQ= +github.com/scylladb/go-reflectx v1.0.1/go.mod h1:rWnOfDIRWBGN0miMLIcoPt/Dhi2doCMZqwMCJ3KupFc= +github.com/scylladb/gocqlx/v3 v3.0.1 h1:JBvOUBz62LQ2lbIgJqQbwVMiDftbtrJSi63KVxvRYOQ= +github.com/scylladb/gocqlx/v3 v3.0.1/go.mod h1:EjbSZM0VR2a57ZUxCRQ3v3CSoWIkH1WTMwxeDbFQorY= +github.com/shirou/gopsutil/v4 v4.25.1 h1:QSWkTc+fu9LTAWfkZwZ6j8MSUk4A2LV7rbH0ZqmLjXs= +github.com/shirou/gopsutil/v4 v4.25.1/go.mod h1:RoUCUpndaJFtT+2zsZzzmhvbfGoDCJ7nFXKJf8GqJbI= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/testcontainers/testcontainers-go v0.37.0 h1:L2Qc0vkTw2EHWQ08djon0D2uw7Z/PtHS/QzZZ5Ra/hg= +github.com/testcontainers/testcontainers-go v0.37.0/go.mod h1:QPzbxZhQ6Bclip9igjLFj6z0hs01bU8lrl2dHQmgFGM= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= +github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +github.com/zeromicro/go-zero v1.8.3 h1:AwpBJQLAsZAt4OOnK0eR8UU1Ja2RFBIXfKkHdnXQKfc= +github.com/zeromicro/go-zero v1.8.3/go.mod h1:EnuEA3XdIQvAvc4WWTskRTO0jM2/aQi7OXv1gKWRNJ0= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= +go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= +go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 h1:t6wl9SPayj+c7lEIFgm4ooDBZVb01IhLB4InpomhRw8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0/go.mod h1:iSDOcsnSA5INXzZtwaBPrKp/lWu/V14Dd+llD0oI2EA= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 h1:Xw8U6u2f8DK2XAkGRFV7BBLENgnTGX9i4rQRxJf+/vs= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0/go.mod h1:6KW1Fm6R/s6Z3PGXwSJN2K4eT6wQB3vXX6CVnYX9NmM= +go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= +go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= +go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= +go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= +go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= +go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= +go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= +go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= +golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/term v0.31.0 h1:erwDkOK1Msy6offm1mOgvspSkslFnIGsFnxOKoufg3o= +golang.org/x/term v0.31.0/go.mod h1:R4BeIy7D95HzImkxGkTW1UQTtP54tio2RyHz7PwK0aw= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4= +golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d h1:kHjw/5UfflP/L5EbledDrcG4C2597RtymmGRZvHiCuY= +google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d/go.mod h1:mw8MG/Qz5wfgYr6VqVCiZcHe/GJEfI+oGGDCohaVgB0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 h1:BwIjyKYGsK9dMCBOorzRri8MQwmi7mT9rGHsCEinZkA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= +gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= diff --git a/internal/lib/cassandra/init_cassandra_container_test.go b/internal/lib/cassandra/init_cassandra_container_test.go new file mode 100644 index 0000000..3be2784 --- /dev/null +++ b/internal/lib/cassandra/init_cassandra_container_test.go @@ -0,0 +1,93 @@ +package cassandra + +import ( + "context" + "fmt" + "github.com/gocql/gocql" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" + "time" +) + +type Container struct { + Ctx context.Context + Container testcontainers.Container + Host string + Port int +} + +func initCassandraContainer(version string) (Container, error) { + ctx := context.Background() + req := testcontainers.ContainerRequest{ + Image: fmt.Sprintf("cassandra:%s", version), + Env: map[string]string{ + "CASSANDRA_START_RPC": "true", + "CASSANDRA_NUM_TOKENS": "1", + "CASSANDRA_ENDPOINT_SNITCH": "GossipingPropertyFileSnitch", + "CASSANDRA_DC": "datacenter1", + "CASSANDRA_RACK": "rack1", + "MAX_HEAP_SIZE": "256M", + "HEAP_NEWSIZE": "100M", + }, + ExposedPorts: []string{"9042/tcp"}, + // 等待 Cassandra 啟動完成的指標字串,依據實際啟動 log 可調整 + WaitingFor: wait.ForLog("Created default superuser role 'cassandra'"). + WithStartupTimeout(2 * time.Minute), + } + cassandraContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + return Container{}, err + } + + host, err := cassandraContainer.Host(ctx) + if err != nil { + return Container{}, err + } + + mappedPort, err := cassandraContainer.MappedPort(ctx, "9042") + if err != nil { + return Container{}, err + } + + return Container{ctx, cassandraContainer, host, mappedPort.Int()}, nil +} + +// Animal 為不實作 TableName 方法的範例 struct,則會以型別名稱轉換成 snake_case +type Animal struct { + ID gocql.UUID `db:"id" partition_key:"true"` + Type string `db:"type"` +} + +func (m *Animal) TableName() string { + return "animal" +} + +// InvalidEntity 為無 partition key 的範例 struct,預期產生錯誤 +type InvalidEntity struct { + Field string `db:"field"` +} + +type MonkeyEntity struct { + ID gocql.UUID `db:"id" partition_key:"true"` + Name string `db:"name" clustering_key:"true" sai:"true"` + UpdateAt time.Time `db:"update_at"` + CreateAt time.Time `db:"create_at"` +} + +func (m *MonkeyEntity) TableName() string { + return "monkey_entity" +} + +type CatEntity struct { + ID *gocql.UUID `db:"id" partition_key:"true"` + Name *string `db:"name" partition_key:"true"` + UpdateAt *time.Time `db:"update_at"` + CreateAt *time.Time `db:"create_at" clustering_key:"true"` +} + +func (m *CatEntity) TableName() string { + return "cat_entity" +} diff --git a/internal/lib/cassandra/lock.go b/internal/lib/cassandra/lock.go new file mode 100644 index 0000000..327771e --- /dev/null +++ b/internal/lib/cassandra/lock.go @@ -0,0 +1,125 @@ +package cassandra + +import ( + "context" + "fmt" + "github.com/scylladb/gocqlx/v3/qb" + "time" +) + +const ( + defaultTTLSec = 30 + defaultRetry = 3 + baseDelay = 100 * time.Millisecond +) + +// LockOption 用來設定 TryLock 的 TTL 行為 +type LockOption func(*lockOptions) + +type lockOptions struct { + ttlSeconds int // TTL,單位秒;<=0 代表不 expire +} + +func WithLockTTL(d time.Duration) LockOption { + return func(o *lockOptions) { + o.ttlSeconds = int(d.Seconds()) + } +} + +// WithNoLockExpire 永不自動解鎖 +func WithNoLockExpire() LockOption { + return func(o *lockOptions) { + o.ttlSeconds = 0 + } +} + +// TryLock 嘗試在表上插入一筆唯一鍵(IF NOT EXISTS)作為鎖 +// 預設 30 秒 TTL,可透過 option 調整或取消 TTL +func (db *DB) TryLock( + ctx context.Context, + document any, + keyspace string, + opts ...LockOption, +) error { + // 1. 解析 metadata + metadata, err := GenerateTableMetadata(document, keyspace) + if err != nil { + return err + } + + // 2. 組合 option + options := &lockOptions{ttlSeconds: defaultTTLSec} + for _, opt := range opts { + opt(options) + } + + // 3. 建 TTL 子句 + builder := qb.Insert(metadata.Name). + Unique(). // IF NOT EXISTS + Columns(metadata.Columns...) + + if options.ttlSeconds > 0 { + ttl := time.Duration(options.ttlSeconds) * time.Second + builder = builder.TTL(ttl) + } + stmt, names := builder.ToCql() + + // 4. 執行 CAS + q := db.GetSession().Query(stmt, names).BindStruct(document). + WithContext(ctx). + WithTimestamp(time.Now().UnixNano() / 1e3) + + applied, err := q.ExecCASRelease() + if err != nil { + return err + } + if !applied { + return fmt.Errorf("failed to acquire lock") + } + return nil +} + +// UnLock 砍掉鎖,其實就是 Delete +func (db *DB) UnLock(ctx context.Context, filter any, keyspace string) error { + if filter == nil { + return fmt.Errorf("unlock failed: nil filter") + } + + metadata, err := GenerateTableMetadata(filter, keyspace) + if err != nil { + return fmt.Errorf("unlock: generate metadata failed: %w", err) + } + if len(metadata.Columns) == 0 { + return fmt.Errorf("unlock failed: missing primary key in struct") + } + + var lastErr error + + for i := 0; i < defaultRetry; i++ { + builder := qb.Delete(metadata.Name).Existing() + + // 動態添加 WHERE 條件 + for _, key := range metadata.PartKey { + builder = builder.Where(qb.Eq(key)) + } + stmt, names := builder.ToCql() + q := db.GetSession().Query(stmt, names).BindStruct(filter). + WithContext(ctx). + WithTimestamp(time.Now().UnixNano() / 1e3) + + applied, err := q.ExecCASRelease() + if err == nil && applied { + return nil + } + + if err != nil { + lastErr = fmt.Errorf("unlock error: %w", err) + } else if !applied { + lastErr = fmt.Errorf("unlock not applied: row not found or not visible yet") + } + + time.Sleep(baseDelay * time.Duration(1< 0 { + b = b.Where(where...) + for k, v := range args { + bind[k] = v + } + } + } +} diff --git a/internal/lib/cassandra/option_test.go b/internal/lib/cassandra/option_test.go new file mode 100644 index 0000000..91f1124 --- /dev/null +++ b/internal/lib/cassandra/option_test.go @@ -0,0 +1,108 @@ +package cassandra + +import ( + "github.com/gocql/gocql" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestOptions(t *testing.T) { + tests := []struct { + name string + option Option + check func(conf *conf) + }{ + { + name: "WithPort", + option: WithPort(1234), + check: func(conf *conf) { + assert.Equal(t, 1234, conf.Port, "Port 設定錯誤") + }, + }, + { + name: "WithKeyspace", + option: WithKeyspace("my_keyspace"), + check: func(conf *conf) { + assert.Equal(t, "my_keyspace", conf.Keyspace, "Keyspace 設定錯誤") + }, + }, + { + name: "WithAuth", + option: WithAuth("user", "pass"), + check: func(conf *conf) { + assert.Equal(t, "user", conf.Username, "Username 設定錯誤") + assert.Equal(t, "pass", conf.Password, "Password 設定錯誤") + assert.True(t, conf.UseAuth, "UseAuth 應該為 true") + }, + }, + { + name: "WithConsistency", + option: WithConsistency(gocql.Quorum), + check: func(conf *conf) { + assert.Equal(t, gocql.Quorum, conf.Consistency, "Consistency 設定錯誤") + }, + }, + { + name: "WithConnectTimeoutSec", + option: WithConnectTimeoutSec(45), + check: func(conf *conf) { + assert.Equal(t, 45, conf.ConnectTimeoutSec, "ConnectTimeoutSec 設定錯誤") + }, + }, + { + name: "WithNumConns", + option: WithNumConnects(10), + check: func(conf *conf) { + assert.Equal(t, 10, conf.NumConnect, "NumConns 設定錯誤") + }, + }, + { + name: "WithMaxRetries", + option: WithMaxRetries(5), + check: func(conf *conf) { + assert.Equal(t, 5, conf.MaxRetries, "MaxRetries 設定錯誤") + }, + }, + { + name: "WithRetryMin", + option: WithRetryMin(2 * time.Second), + check: func(conf *conf) { + assert.Equal(t, 2*time.Second, conf.RetryMin, "RetryMin 設定錯誤") + }, + }, + { + name: "WithRetryMax", + option: WithRetryMax(10 * time.Second), + check: func(conf *conf) { + assert.Equal(t, 10*time.Second, conf.RetryMax, "RetryMax 設定錯誤") + }, + }, + { + name: "WithReconnectInitial", + option: WithReconnectInitial(1 * time.Second), + check: func(conf *conf) { + assert.Equal(t, 1*time.Second, conf.ReconnectInitial, "ReconnectInitial 設定錯誤") + }, + }, + { + name: "WithReconnectMax", + option: WithReconnectMax(5 * time.Second), + check: func(conf *conf) { + assert.Equal(t, 5*time.Second, conf.ReconnectMax, "ReconnectMax 設定錯誤") + }, + }, + } + + for _, tc := range tests { + tc := tc // 避免 closure 捕捉迴圈變數 + t.Run(tc.name, func(t *testing.T) { + // 為每個測試案例產生一個新的 cassandraConf 實例 + conf := &conf{} + // 套用 Option + tc.option(conf) + // 執行檢查 + tc.check(conf) + }) + } +} diff --git a/internal/lib/cassandra/table.go b/internal/lib/cassandra/table.go new file mode 100644 index 0000000..caf34fd --- /dev/null +++ b/internal/lib/cassandra/table.go @@ -0,0 +1,332 @@ +package cassandra + +import ( + "context" + "fmt" + "github.com/scylladb/gocqlx/v3/qb" + "github.com/scylladb/gocqlx/v3/table" + "reflect" + "time" +) + +func (db *DB) AutoCreateSAIIndexes(doc any, keyspace string) error { + metadata, err := GenerateTableMetadata(doc, keyspace) + if err != nil { + return err + } + t := reflect.TypeOf(doc) + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + for i := 0; i < t.NumField(); i++ { + f := t.Field(i) + if f.Tag.Get("sai") == "true" { + col := f.Tag.Get("cql") + if col == "" { + col = toSnakeCase(f.Name) + } + stmt := fmt.Sprintf("CREATE INDEX IF NOT EXISTS ON %s (%s) USING 'sai';", metadata.Name, col) + if err := db.GetSession().ExecStmt(stmt); err != nil { + return fmt.Errorf("SAI index create fail: %w", err) + } + } + } + return nil +} + +type Query struct { + db *DB + ctx context.Context + table string + keyspace string + columns []string + cmps []qb.Cmp + bindMap map[string]any + orders []orderBy + limit uint + document any + sets []setField // 欲更新欄位及其值 +} + +type orderBy struct { + Column string + Order qb.Order +} + +type setField struct { + Col string + Val any +} + +func (db *DB) Model(ctx context.Context, document any, keyspace string) *Query { + metadata, _ := GenerateTableMetadata(document, keyspace) + + return &Query{ + db: db, + ctx: ctx, + table: metadata.Name, + keyspace: keyspace, + columns: make([]string, 0), + cmps: make([]qb.Cmp, 0), + bindMap: make(map[string]any), + orders: make([]orderBy, 0), + limit: 0, + document: document, + } +} + +// Where 只允許 partition key 或有 sai index 的欄位進行 where 查詢 +func (q *Query) Where(cmp qb.Cmp, args map[string]any) *Query { + metadata, _ := GenerateTableMetadata(q.document, q.keyspace) + for k := range args { + // 允許 partition_key 或 sai 欄位 + isPartition := contains(metadata.PartKey, k) + isSAI := IsSAIField(q.document, k) + if !isPartition && !isSAI { + panic(fmt.Sprintf("field %s must be partition key or SAI index", k)) + } + q.bindMap[k] = args[k] + } + q.cmps = append(q.cmps, cmp) + for k, v := range args { + q.bindMap[k] = v + } + + return q +} + +func (q *Query) Select(cols ...string) *Query { + q.columns = append(q.columns, cols...) + return q +} + +func (q *Query) OrderBy(column string, order qb.Order) *Query { + q.orders = append(q.orders, orderBy{Column: column, Order: order}) + return q +} + +func (q *Query) Limit(limit uint) *Query { + q.limit = limit + + return q +} + +func (q *Query) Set(col string, val any) *Query { + q.sets = append(q.sets, setField{Col: col, Val: val}) + return q +} + +func (q *Query) Scan(dest any) error { + builder := qb.Select(q.table) + if len(q.columns) > 0 { + builder = builder.Columns(q.columns...) + } + if len(q.cmps) > 0 { + builder = builder.Where(q.cmps...) + } + if len(q.orders) > 0 { + for _, o := range q.orders { + builder = builder.OrderBy(o.Column, o.Order) + } + } + if q.limit > 0 { + builder = builder.Limit(q.limit) + } + + stmt, names := builder.ToCql() + query := q.db.GetSession().Query(stmt, names).WithContext(q.ctx) + if q.bindMap == nil { + q.bindMap = qb.M{} + } + query = query.BindMap(q.bindMap) + + return query.SelectRelease(dest) +} + +func (q *Query) Take(dest any) error { + q.limit = 1 + return q.Scan(dest) +} + +func (q *Query) Delete() error { + // 拿 partition key 清單 + metadata, err := GenerateTableMetadata(q.document, q.keyspace) + if err != nil { + return err + } + missingKeys := make([]string, 0) + for _, pk := range metadata.PartKey { + if _, ok := q.bindMap[pk]; !ok { + missingKeys = append(missingKeys, pk) + } + } + if len(missingKeys) > 0 { + return fmt.Errorf("delete operation requires all partition keys in WHERE: missing %v", missingKeys) + } + if len(q.cmps) == 0 { + return fmt.Errorf("delete operation requires at least one WHERE condition for safety") + } + + // 組 Delete 語句 + builder := qb.Delete(q.table) + builder = builder.Where(q.cmps...) + stmt, names := builder.ToCql() + query := q.db.GetSession().Query(stmt, names).WithContext(q.ctx) + if q.bindMap == nil { + q.bindMap = qb.M{} + } + query = query.BindMap(q.bindMap) + + return query.ExecRelease() +} + +func (q *Query) Update() error { + if q.document == nil { + return fmt.Errorf("update requires modelType to check partition keys") + } + + metadata, err := GenerateTableMetadata(q.document, q.keyspace) + if err != nil { + return fmt.Errorf("update: failed to get table metadata: %w", err) + } + + // 檢查 partition key 是否都在 bindMap + missingKeys := make([]string, 0) + for _, pk := range metadata.PartKey { + if _, ok := q.bindMap[pk]; !ok { + missingKeys = append(missingKeys, pk) + } + } + if len(missingKeys) > 0 { + return fmt.Errorf("update operation requires all partition keys in WHERE: missing %v", missingKeys) + } + + // 至少要有一個 set 欄位 + if len(q.sets) == 0 { + return fmt.Errorf("update requires at least one field to set") + } + // 至少一個 where + if len(q.cmps) == 0 { + return fmt.Errorf("update operation requires at least one WHERE condition for safety") + } + + // 組合 set 欄位 + setCols := make([]string, 0, len(q.sets)) + setVals := make([]any, 0, len(q.sets)) + for _, s := range q.sets { + setCols = append(setCols, s.Col) + setVals = append(setVals, s.Val) + } + + // 組合 CQL + builder := qb.Update(q.table).Set(setCols...) + builder = builder.Where(q.cmps...) + stmt, names := builder.ToCql() + + // setVals 要先,剩下的 where bind 順序依照 names + bindVals := append([]any{}, setVals...) + for _, name := range names[len(setCols):] { + if v, ok := q.bindMap[name]; ok { + bindVals = append(bindVals, v) + } + } + + query := q.db.GetSession().Query(stmt, names).WithContext(q.ctx) + if len(bindVals) > 0 { + query = query.Bind(bindVals...) + } + return query.ExecRelease() +} + +func (q *Query) InsertOne(data any) error { + metadata, err := GenerateTableMetadata(q.document, q.keyspace) + if err != nil { + return err + } + tbl := table.New(metadata) + qry := q.db.GetSession().Query(tbl.Insert()).WithContext(q.ctx).WithTimestamp(time.Now().UnixNano() / 1e3) + + switch reflect.TypeOf(data).Kind() { + case reflect.Map: + qry = qry.BindMap(data.(map[string]any)) + default: + qry = qry.BindStruct(data) + } + return qry.ExecRelease() +} + +func (q *Query) InsertMany(documents any) error { + v := reflect.ValueOf(documents) + if v.Kind() != reflect.Slice { + return fmt.Errorf("InsertMany: input must be a slice") + } + if v.Len() == 0 { + return nil + } + + for i := 0; i < v.Len(); i++ { + item := v.Index(i).Interface() + if err := q.InsertOne(item); err != nil { + return fmt.Errorf("InsertMany: failed at idx %d: %w", i, err) + } + } + return nil +} + +func (q *Query) GetAll(dest any) error { + metadata, err := GenerateTableMetadata(q.document, q.keyspace) + if err != nil { + return err + } + t := table.New(metadata) + + stmt, names := qb.Select(t.Name()).Columns(metadata.Columns...).ToCql() + exec := q.db.GetSession().Query(stmt, names).WithContext(q.ctx).WithTimestamp(time.Now().UnixNano() / 1e3) + + return exec.SelectRelease(dest) +} + +func (q *Query) Count() (int64, error) { + metadata, err := GenerateTableMetadata(q.document, q.keyspace) + if err != nil { + return 0, err + } + + t := table.New(metadata) + builder := qb.Select(t.Name()).Columns("COUNT(*)") + if len(q.cmps) > 0 { + builder = builder.Where(q.cmps...) + } + + stmt, names := builder.ToCql() + query := q.db.GetSession().Query(stmt, names).WithContext(q.ctx).WithTimestamp(time.Now().UnixNano() / 1e3) + if q.bindMap == nil { + q.bindMap = qb.M{} + } + query = query.BindMap(q.bindMap) + + var count int64 + if err := query.GetRelease(&count); err != nil { + return 0, err + } + return count, nil +} + +func IsSAIField(model any, fieldName string) bool { + t := reflect.TypeOf(model) + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + for i := 0; i < t.NumField(); i++ { + f := t.Field(i) + tag := f.Tag.Get("sai") + col := f.Tag.Get("db") + if col == "" { + col = toSnakeCase(f.Name) + } + if (col == fieldName || f.Name == fieldName) && tag == "true" { + return true + } + } + return false +} diff --git a/internal/lib/cassandra/table_test.go b/internal/lib/cassandra/table_test.go new file mode 100644 index 0000000..aed2c9e --- /dev/null +++ b/internal/lib/cassandra/table_test.go @@ -0,0 +1,161 @@ +package cassandra + +import ( + "context" + "testing" + + "github.com/scylladb/gocqlx/v3/qb" + "github.com/stretchr/testify/assert" +) + +func TestQueryBuilder_TableDriven(t *testing.T) { + ctx := context.Background() + db := &DB{} // 可以用 mock DB + + type args struct { + cmp qb.Cmp + whereArg map[string]any + selects []string + orderCol string + order qb.Order + limit uint + setCol string + setVal any + } + + tests := []struct { + name string + args args + wantPanic bool + wantColumns []string + wantOrderCol string + wantOrder qb.Order + wantLimit uint + wantSetCol string + wantSetVal any + }{ + { + name: "where by partition key", + args: args{ + cmp: qb.Eq("id"), + whereArg: map[string]any{"id": "abc"}, + selects: []string{"id", "name"}, + orderCol: "id", + order: qb.ASC, + limit: 1, + setCol: "name", + setVal: "Daniel", + }, + wantPanic: false, + wantColumns: []string{"id", "name"}, + wantOrderCol: "id", + wantOrder: qb.ASC, + wantLimit: 1, + wantSetCol: "name", + wantSetVal: "Daniel", + }, + { + name: "where by sai index", + args: args{ + cmp: qb.Eq("name"), + whereArg: map[string]any{"name": "daniel"}, + selects: []string{"id", "name"}, + orderCol: "name", + order: qb.DESC, + limit: 2, + setCol: "name", + setVal: "Jacky", + }, + wantPanic: false, + wantColumns: []string{"id", "name"}, + wantOrderCol: "name", + wantOrder: qb.DESC, + wantLimit: 2, + wantSetCol: "name", + wantSetVal: "Jacky", + }, + { + name: "where by non-partition-non-sai", + args: args{ + cmp: qb.Eq("age"), + whereArg: map[string]any{"age": 18}, + selects: []string{"id", "name"}, + orderCol: "age", + order: qb.ASC, + limit: 3, + setCol: "age", + setVal: 20, + }, + wantPanic: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + run := func() { + q := db.Model(ctx, &MonkeyEntity{}, "my_keyspace"). + Where(tc.args.cmp, tc.args.whereArg). + Select(tc.args.selects...). + OrderBy(tc.args.orderCol, tc.args.order). + Limit(tc.args.limit). + Set(tc.args.setCol, tc.args.setVal) + + assert.Equal(t, tc.wantColumns, q.columns) + if len(q.orders) > 0 { + assert.Equal(t, tc.wantOrderCol, q.orders[0].Column) + assert.Equal(t, tc.wantOrder, q.orders[0].Order) + } + assert.Equal(t, tc.wantLimit, q.limit) + if len(q.sets) > 0 { + assert.Equal(t, tc.wantSetCol, q.sets[0].Col) + assert.Equal(t, tc.wantSetVal, q.sets[0].Val) + } + } + + if tc.wantPanic { + assert.Panics(t, run) + } else { + assert.NotPanics(t, run) + } + }) + } +} + +func TestQuery_Select_TableDriven(t *testing.T) { + tests := []struct { + name string + selectCalls [][]string + wantColumns []string + }{ + { + name: "select one col", + selectCalls: [][]string{{"id"}}, + wantColumns: []string{"id"}, + }, + { + name: "select multi col in one call", + selectCalls: [][]string{{"id", "name"}}, + wantColumns: []string{"id", "name"}, + }, + { + name: "multiple select calls append columns", + selectCalls: [][]string{{"id"}, {"name"}, {"age"}}, + wantColumns: []string{"id", "name", "age"}, + }, + { + name: "multiple select calls with overlap", + selectCalls: [][]string{{"id"}, {"id", "name"}, {"name", "age"}}, + wantColumns: []string{"id", "id", "name", "name", "age"}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + q := &Query{columns: make([]string, 0)} + for _, call := range tc.selectCalls { + q = q.Select(call...) + } + assert.Equal(t, tc.wantColumns, q.columns) + }) + } +} diff --git a/internal/lib/cassandra/utils.go b/internal/lib/cassandra/utils.go new file mode 100644 index 0000000..fe6bd85 --- /dev/null +++ b/internal/lib/cassandra/utils.go @@ -0,0 +1,42 @@ +package cassandra + +import ( + "reflect" + "unicode" +) + +// 判斷字串是否存在於 slice 中 +func contains(list []string, target string) bool { + for _, item := range list { + if item == target { + return true + } + } + return false +} + +// toSnakeCase 將 CamelCase 字串轉換為 snake_case +func toSnakeCase(s string) string { + var result []rune + for i, r := range s { + if unicode.IsUpper(r) { + if i > 0 { + result = append(result, '_') + } + result = append(result, unicode.ToLower(r)) + } else { + result = append(result, r) + } + } + return string(result) +} + +// 判斷欄位是否為零值或 nil +func isZero(v reflect.Value) bool { + switch v.Kind() { + case reflect.Ptr, reflect.Interface, reflect.Map, reflect.Slice: + return v.IsNil() + default: + return reflect.DeepEqual(v.Interface(), reflect.Zero(v.Type()).Interface()) + } +} diff --git a/internal/lib/cassandra/utils_test.go b/internal/lib/cassandra/utils_test.go new file mode 100644 index 0000000..2d88a4a --- /dev/null +++ b/internal/lib/cassandra/utils_test.go @@ -0,0 +1,85 @@ +package cassandra + +import ( + "github.com/stretchr/testify/assert" + "reflect" + "testing" +) + +func TestContains(t *testing.T) { + type testCase struct { + name string + list []string + target string + expected bool + } + + tests := []testCase{ + {"contains first", []string{"a", "b", "c"}, "a", true}, + {"contains middle", []string{"a", "b", "c"}, "b", true}, + {"contains last", []string{"a", "b", "c"}, "c", true}, + {"not contains", []string{"a", "b", "c"}, "d", false}, + {"empty list", []string{}, "a", false}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + actual := contains(tc.list, tc.target) + if actual != tc.expected { + t.Errorf("contains(%v, %q) = %v; want %v", tc.list, tc.target, actual, tc.expected) + } + }) + } +} + +// TestToSnakeCase 測試 toSnakeCase 函式 +func TestToSnakeCase(t *testing.T) { + testCases := []struct { + input string + expected string + }{ + {"CamelCase", "camel_case"}, + {"snake_case", "snake_case"}, + {"HttpServer", "http_server"}, + {"A", "a"}, + {"Already_Snake", "already__snake"}, // 依照實作,"Already_Snake" 轉換後會產生 double underscore + } + + for _, tc := range testCases { + t.Run(tc.input, func(t *testing.T) { + result := toSnakeCase(tc.input) + assert.Equal(t, tc.expected, result) + }) + } +} + +func TestIsZero(t *testing.T) { + type testCase struct { + name string + input any + expected bool + } + + tests := []testCase{ + {"zero int", 0, true}, + {"non-zero int", 42, false}, + {"zero string", "", true}, + {"non-zero string", "hello", false}, + {"zero bool", false, true}, + {"non-zero bool", true, false}, + {"nil slice", []string(nil), true}, + {"empty slice", []string{}, false}, + {"nil pointer", (*int)(nil), true}, + {"non-nil pointer", new(int), false}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + v := reflect.ValueOf(tc.input) + actual := isZero(v) + if actual != tc.expected { + t.Errorf("isZero(%v) = %v; want %v", tc.input, actual, tc.expected) + } + }) + } +} diff --git a/internal/repository/data_source_binance.go b/internal/repository/data_source_binance.go index 71dc0e7..1926c10 100644 --- a/internal/repository/data_source_binance.go +++ b/internal/repository/data_source_binance.go @@ -6,6 +6,7 @@ import ( "blockchain/internal/domain/blockchain" "blockchain/internal/domain/entity" "blockchain/internal/domain/repository" + "blockchain/internal/lib/cassandra" "bytes" "context" "encoding/csv" @@ -31,14 +32,17 @@ import ( type BinanceRepositoryParam struct { Conf *config.Binance Redis *redis.Redis + DB *cassandra.DB } type BinanceRepository struct { Client *binance.Client + db *cassandra.DB rds *redis.Redis barrier syncx.SingleFlight workers *ants.Pool workerSize int64 + KeySpace string } func MustBinanceRepository(param BinanceRepositoryParam) repository.DataSourceRepository { @@ -52,6 +56,7 @@ func MustBinanceRepository(param BinanceRepositoryParam) repository.DataSourceRe return &BinanceRepository{ Client: client, + db: param.DB, rds: param.Redis, barrier: syncx.NewSingleFlight(), workerSize: param.Conf.WorkerSize, @@ -167,6 +172,11 @@ func (repo *BinanceRepository) FetchHistoryKline(ctx context.Context, param repo return allKlines, nil } +func (repo *BinanceRepository) SaveHistoryKline(ctx context.Context, data []*entity.Kline) error { + + //repo.db.Insert(ctx, repo.KeySpace) +} + // ============= func (repo *BinanceRepository) getSymbolsFromSource(ctx context.Context) ([]binance.Symbol, error) { if repo.Client == nil { diff --git a/internal/svc/cassandra.go b/internal/svc/cassandra.go new file mode 100644 index 0000000..bd060c5 --- /dev/null +++ b/internal/svc/cassandra.go @@ -0,0 +1,45 @@ +package svc + +import ( + "blockchain/internal/config" + "blockchain/internal/lib/cassandra" +) + +// NewDB 傳入 config 返回 CassandraDB +func NewDB(cfg config.Config) (*cassandra.DB, error) { + var opts []cassandra.Option + + // 必填欄位 + opts = append(opts, cassandra.WithPort(cfg.Cassandra.Port)) + opts = append(opts, cassandra.WithKeyspace(cfg.Cassandra.Keyspace)) + + // 其他選填值,僅在不為零時加入 + if cfg.Cassandra.ConnectTimeoutSec > 0 { + opts = append(opts, cassandra.WithConnectTimeoutSec(cfg.Cassandra.ConnectTimeoutSec)) + } + if cfg.Cassandra.NumConns > 0 { + opts = append(opts, cassandra.WithNumConnects(cfg.Cassandra.NumConns)) + } + if cfg.Cassandra.MaxRetries > 0 { + opts = append(opts, cassandra.WithMaxRetries(cfg.Cassandra.MaxRetries)) + } + if cfg.Cassandra.RetryMin > 0 { + opts = append(opts, cassandra.WithRetryMin(cfg.Cassandra.RetryMin)) + } + if cfg.Cassandra.RetryMax > 0 { + opts = append(opts, cassandra.WithRetryMax(cfg.Cassandra.RetryMax)) + } + if cfg.Cassandra.ReconnectInitial > 0 { + opts = append(opts, cassandra.WithReconnectInitial(cfg.Cassandra.ReconnectInitial)) + } + if cfg.Cassandra.ReconnectMax > 0 { + opts = append(opts, cassandra.WithReconnectMax(cfg.Cassandra.ReconnectMax)) + } + + // 使用驗證時才加入帳號密碼 + if cfg.Cassandra.UseAuth { + opts = append(opts, cassandra.WithAuth(cfg.Cassandra.Username, cfg.Cassandra.Password)) + } + + return cassandra.NewDB(cfg.Cassandra.Hosts, opts...) +} diff --git a/internal/svc/service_context.go b/internal/svc/service_context.go index bc5d08a..460de33 100644 --- a/internal/svc/service_context.go +++ b/internal/svc/service_context.go @@ -4,6 +4,7 @@ import ( "blockchain/internal/config" "blockchain/internal/domain/repository" "blockchain/internal/domain/usecase" + "blockchain/internal/lib/cassandra" repo "blockchain/internal/repository" uc "blockchain/internal/usecase" @@ -26,6 +27,11 @@ func NewServiceContext(c config.Config) *ServiceContext { Redis: newRedis, }) + cassandra, err := NewDB(c) + if err != nil { + return nil + } + return &ServiceContext{ Config: c, BinanceRepo: binanceRepo,