166 lines
4.6 KiB
Go
166 lines
4.6 KiB
Go
package repository
|
||
|
||
import (
|
||
"backend/pkg/chat/domain/entity"
|
||
"backend/pkg/chat/domain/repository"
|
||
"backend/pkg/library/cassandra"
|
||
"context"
|
||
"fmt"
|
||
"time"
|
||
|
||
"github.com/gocql/gocql"
|
||
)
|
||
|
||
type messageRepository struct {
|
||
repo cassandra.Repository[entity.Message]
|
||
dedupRepo cassandra.Repository[entity.MessageDedup]
|
||
db *cassandra.DB
|
||
keyspace string
|
||
}
|
||
|
||
// MessageRepositoryParam 創建 MessageRepository 所需的參數
|
||
type MessageRepositoryParam struct {
|
||
DB *cassandra.DB
|
||
Keyspace string
|
||
}
|
||
|
||
// MustMessageRepository 創建 MessageRepository(如果失敗會 panic)
|
||
func MustMessageRepository(param MessageRepositoryParam) repository.MessageRepository {
|
||
repo, err := NewMessageRepository(param.DB, param.Keyspace)
|
||
if err != nil {
|
||
panic(fmt.Sprintf("failed to create message repository: %v", err))
|
||
}
|
||
return repo
|
||
}
|
||
|
||
// NewMessageRepository 創建新的訊息 Repository
|
||
func NewMessageRepository(db *cassandra.DB, keyspace string) (repository.MessageRepository, error) {
|
||
repo, err := cassandra.NewRepository[entity.Message](db, keyspace)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
dedupRepo, err := cassandra.NewRepository[entity.MessageDedup](db, keyspace)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return &messageRepository{
|
||
repo: repo,
|
||
dedupRepo: dedupRepo,
|
||
db: db,
|
||
keyspace: keyspace,
|
||
}, nil
|
||
}
|
||
|
||
func (message *messageRepository) Insert(ctx context.Context, msg *entity.Message) error {
|
||
now := time.Now().UTC()
|
||
if msg.TS == 0 {
|
||
msg.TS = now.UnixNano()
|
||
}
|
||
// 只在 BucketDay 為空時才自動設置,保留先前傳入的值
|
||
if msg.BucketDay == "" {
|
||
msg.BucketDay = now.Format(time.DateOnly)
|
||
}
|
||
|
||
return message.repo.Insert(ctx, *msg)
|
||
}
|
||
|
||
func (message *messageRepository) ListMessages(ctx context.Context, param repository.ListMessagesReq) ([]entity.Message, error) {
|
||
// 設定預設分頁大小
|
||
if param.PageSize <= 0 {
|
||
param.PageSize = 20
|
||
}
|
||
|
||
// 將字串 RoomID 轉換為 UUID
|
||
roomUUID, err := gocql.ParseUUID(param.RoomID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 構建查詢條件
|
||
query := message.repo.Query().
|
||
Where(cassandra.Eq("room_id", roomUUID)).
|
||
Where(cassandra.Eq("bucket_day", param.BucketDay))
|
||
|
||
// 使用 cursor-based pagination:如果提供了 LastTS,則查詢 ts < LastTS 的訊息
|
||
// 因為排序是 DESC,所以使用 < 來獲取更早的訊息(下一頁)
|
||
if param.LastTS > 0 {
|
||
query = query.Where(cassandra.Lt("ts", param.LastTS))
|
||
}
|
||
|
||
// 添加排序和限制
|
||
query = query.
|
||
OrderBy("ts", cassandra.DESC).
|
||
Limit(int(param.PageSize))
|
||
|
||
// 執行查詢
|
||
var messages []entity.Message
|
||
if err := query.Scan(ctx, &messages); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return messages, nil
|
||
}
|
||
|
||
func (message *messageRepository) Count(ctx context.Context, roomID string) (int64, error) {
|
||
// 將字串 RoomID 轉換為 UUID
|
||
roomUUID, err := gocql.ParseUUID(roomID)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
// 注意:由於 partition key 是 (room_id, bucket_day),只用 room_id 查詢需要 ALLOW FILTERING
|
||
// 這在生產環境中效能較差,建議改用按 bucket_day 分別查詢後加總
|
||
count, err := message.repo.Query().
|
||
Where(cassandra.Eq("room_id", roomUUID)).
|
||
AllowFiltering().
|
||
Count(ctx)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
return count, nil
|
||
}
|
||
|
||
// CheckAndInsertDedup 檢查並插入去重記錄
|
||
// 使用 IF NOT EXISTS 來實現原子性的去重檢查
|
||
// 返回值:true 表示已存在(重複),false 表示成功插入(不重複)
|
||
func (message *messageRepository) CheckAndInsertDedup(ctx context.Context, param repository.CheckDupReq) (bool, error) {
|
||
// 將字串 RoomID 轉換為 UUID
|
||
roomUUID, err := gocql.ParseUUID(param.RoomID)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
|
||
// 使用 IF NOT EXISTS 來實現原子性的去重檢查
|
||
// 如果記錄已存在,INSERT 不會插入,且 applied = false
|
||
dedup := entity.MessageDedup{
|
||
RoomID: roomUUID,
|
||
UID: param.UID,
|
||
BucketSec: param.BucketSec,
|
||
ContentMD5: param.ContentMD5,
|
||
}
|
||
|
||
// 使用原生 CQL 語句來實現 IF NOT EXISTS
|
||
tableName := dedup.TableName()
|
||
stmt := fmt.Sprintf(
|
||
"INSERT INTO %s.%s (room_id, uid, bucket_sec, content_md5) VALUES (?, ?, ?, ?) IF NOT EXISTS",
|
||
message.keyspace,
|
||
tableName,
|
||
)
|
||
|
||
// 執行 INSERT IF NOT EXISTS
|
||
applied, err := message.db.GetSession().Query(stmt, nil).
|
||
Bind(roomUUID, param.UID, param.BucketSec, param.ContentMD5).
|
||
WithContext(ctx).
|
||
MapScanCAS(make(map[string]interface{}))
|
||
|
||
if err != nil {
|
||
return false, fmt.Errorf("failed to check dedup: %w", err)
|
||
}
|
||
|
||
// applied = false 表示記錄已存在(重複)
|
||
// applied = true 表示成功插入(不重複)
|
||
return !applied, nil
|
||
}
|