166 lines
4.6 KiB
Go
166 lines
4.6 KiB
Go
|
|
package repository
|
|||
|
|
|
|||
|
|
import (
|
|||
|
|
"backend/pkg/chat/domain/entity"
|
|||
|
|
"backend/pkg/chat/domain/repository"
|
|||
|
|
"backend/pkg/library/cassandra"
|
|||
|
|
"context"
|
|||
|
|
"fmt"
|
|||
|
|
"time"
|
|||
|
|
|
|||
|
|
"github.com/gocql/gocql"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
type messageRepository struct {
|
|||
|
|
repo cassandra.Repository[entity.Message]
|
|||
|
|
dedupRepo cassandra.Repository[entity.MessageDedup]
|
|||
|
|
db *cassandra.DB
|
|||
|
|
keyspace string
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// MessageRepositoryParam 創建 MessageRepository 所需的參數
|
|||
|
|
type MessageRepositoryParam struct {
|
|||
|
|
DB *cassandra.DB
|
|||
|
|
Keyspace string
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// MustMessageRepository 創建 MessageRepository(如果失敗會 panic)
|
|||
|
|
func MustMessageRepository(param MessageRepositoryParam) repository.MessageRepository {
|
|||
|
|
repo, err := NewMessageRepository(param.DB, param.Keyspace)
|
|||
|
|
if err != nil {
|
|||
|
|
panic(fmt.Sprintf("failed to create message repository: %v", err))
|
|||
|
|
}
|
|||
|
|
return repo
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// NewMessageRepository 創建新的訊息 Repository
|
|||
|
|
func NewMessageRepository(db *cassandra.DB, keyspace string) (repository.MessageRepository, error) {
|
|||
|
|
repo, err := cassandra.NewRepository[entity.Message](db, keyspace)
|
|||
|
|
if err != nil {
|
|||
|
|
return nil, err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
dedupRepo, err := cassandra.NewRepository[entity.MessageDedup](db, keyspace)
|
|||
|
|
if err != nil {
|
|||
|
|
return nil, err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return &messageRepository{
|
|||
|
|
repo: repo,
|
|||
|
|
dedupRepo: dedupRepo,
|
|||
|
|
db: db,
|
|||
|
|
keyspace: keyspace,
|
|||
|
|
}, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (message *messageRepository) Insert(ctx context.Context, msg *entity.Message) error {
|
|||
|
|
now := time.Now().UTC()
|
|||
|
|
if msg.TS == 0 {
|
|||
|
|
msg.TS = now.UnixNano()
|
|||
|
|
}
|
|||
|
|
// 只在 BucketDay 為空時才自動設置,保留先前傳入的值
|
|||
|
|
if msg.BucketDay == "" {
|
|||
|
|
msg.BucketDay = now.Format(time.DateOnly)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return message.repo.Insert(ctx, *msg)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (message *messageRepository) ListMessages(ctx context.Context, param repository.ListMessagesReq) ([]entity.Message, error) {
|
|||
|
|
// 設定預設分頁大小
|
|||
|
|
if param.PageSize <= 0 {
|
|||
|
|
param.PageSize = 20
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 將字串 RoomID 轉換為 UUID
|
|||
|
|
roomUUID, err := gocql.ParseUUID(param.RoomID)
|
|||
|
|
if err != nil {
|
|||
|
|
return nil, err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 構建查詢條件
|
|||
|
|
query := message.repo.Query().
|
|||
|
|
Where(cassandra.Eq("room_id", roomUUID)).
|
|||
|
|
Where(cassandra.Eq("bucket_day", param.BucketDay))
|
|||
|
|
|
|||
|
|
// 使用 cursor-based pagination:如果提供了 LastTS,則查詢 ts < LastTS 的訊息
|
|||
|
|
// 因為排序是 DESC,所以使用 < 來獲取更早的訊息(下一頁)
|
|||
|
|
if param.LastTS > 0 {
|
|||
|
|
query = query.Where(cassandra.Lt("ts", param.LastTS))
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 添加排序和限制
|
|||
|
|
query = query.
|
|||
|
|
OrderBy("ts", cassandra.DESC).
|
|||
|
|
Limit(int(param.PageSize))
|
|||
|
|
|
|||
|
|
// 執行查詢
|
|||
|
|
var messages []entity.Message
|
|||
|
|
if err := query.Scan(ctx, &messages); err != nil {
|
|||
|
|
return nil, err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return messages, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
func (message *messageRepository) Count(ctx context.Context, roomID string) (int64, error) {
|
|||
|
|
// 將字串 RoomID 轉換為 UUID
|
|||
|
|
roomUUID, err := gocql.ParseUUID(roomID)
|
|||
|
|
if err != nil {
|
|||
|
|
return 0, err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 注意:由於 partition key 是 (room_id, bucket_day),只用 room_id 查詢需要 ALLOW FILTERING
|
|||
|
|
// 這在生產環境中效能較差,建議改用按 bucket_day 分別查詢後加總
|
|||
|
|
count, err := message.repo.Query().
|
|||
|
|
Where(cassandra.Eq("room_id", roomUUID)).
|
|||
|
|
AllowFiltering().
|
|||
|
|
Count(ctx)
|
|||
|
|
if err != nil {
|
|||
|
|
return 0, err
|
|||
|
|
}
|
|||
|
|
return count, nil
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// CheckAndInsertDedup 檢查並插入去重記錄
|
|||
|
|
// 使用 IF NOT EXISTS 來實現原子性的去重檢查
|
|||
|
|
// 返回值:true 表示已存在(重複),false 表示成功插入(不重複)
|
|||
|
|
func (message *messageRepository) CheckAndInsertDedup(ctx context.Context, param repository.CheckDupReq) (bool, error) {
|
|||
|
|
// 將字串 RoomID 轉換為 UUID
|
|||
|
|
roomUUID, err := gocql.ParseUUID(param.RoomID)
|
|||
|
|
if err != nil {
|
|||
|
|
return false, err
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 使用 IF NOT EXISTS 來實現原子性的去重檢查
|
|||
|
|
// 如果記錄已存在,INSERT 不會插入,且 applied = false
|
|||
|
|
dedup := entity.MessageDedup{
|
|||
|
|
RoomID: roomUUID,
|
|||
|
|
UID: param.UID,
|
|||
|
|
BucketSec: param.BucketSec,
|
|||
|
|
ContentMD5: param.ContentMD5,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 使用原生 CQL 語句來實現 IF NOT EXISTS
|
|||
|
|
tableName := dedup.TableName()
|
|||
|
|
stmt := fmt.Sprintf(
|
|||
|
|
"INSERT INTO %s.%s (room_id, uid, bucket_sec, content_md5) VALUES (?, ?, ?, ?) IF NOT EXISTS",
|
|||
|
|
message.keyspace,
|
|||
|
|
tableName,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
// 執行 INSERT IF NOT EXISTS
|
|||
|
|
applied, err := message.db.GetSession().Query(stmt, nil).
|
|||
|
|
Bind(roomUUID, param.UID, param.BucketSec, param.ContentMD5).
|
|||
|
|
WithContext(ctx).
|
|||
|
|
MapScanCAS(make(map[string]interface{}))
|
|||
|
|
|
|||
|
|
if err != nil {
|
|||
|
|
return false, fmt.Errorf("failed to check dedup: %w", err)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// applied = false 表示記錄已存在(重複)
|
|||
|
|
// applied = true 表示成功插入(不重複)
|
|||
|
|
return !applied, nil
|
|||
|
|
}
|