thread-master/internal/model/scan_post/repository/mongo.go

503 lines
15 KiB
Go

package repository
import (
"context"
"strings"
"time"
app "haixun-backend/internal/library/errors"
"haixun-backend/internal/library/errors/code"
libmongo "haixun-backend/internal/library/mongo"
"haixun-backend/internal/model/scan_post/domain/entity"
domrepo "haixun-backend/internal/model/scan_post/domain/repository"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
type mongoRepository struct {
collection *mongo.Collection
}
func NewMongoRepository(db *mongo.Database) domrepo.Repository {
if db == nil {
return &mongoRepository{}
}
return &mongoRepository{collection: db.Collection(entity.CollectionName)}
}
func (r *mongoRepository) EnsureIndexes(ctx context.Context) error {
if r.collection == nil {
return nil
}
return libmongo.EnsureIndexes(ctx, r.collection, []mongo.IndexModel{
{
Keys: bson.D{{Key: "tenant_id", Value: 1}, {Key: "owner_uid", Value: 1}, {Key: "brand_id", Value: 1}, {Key: "permalink", Value: 1}},
Options: options.Index().SetUnique(true).SetPartialFilterExpression(bson.M{"brand_id": bson.M{"$gt": ""}, "topic_id": bson.M{"$in": []interface{}{nil, ""}}})},
{
Keys: bson.D{{Key: "tenant_id", Value: 1}, {Key: "owner_uid", Value: 1}, {Key: "topic_id", Value: 1}, {Key: "permalink", Value: 1}},
Options: options.Index().SetUnique(true).SetPartialFilterExpression(bson.M{"topic_id": bson.M{"$gt": ""}})},
{
Keys: bson.D{{Key: "tenant_id", Value: 1}, {Key: "owner_uid", Value: 1}, {Key: "persona_id", Value: 1}, {Key: "permalink", Value: 1}},
Options: options.Index().SetUnique(true).SetPartialFilterExpression(bson.M{"persona_id": bson.M{"$gt": ""}})},
{
Keys: bson.D{{Key: "tenant_id", Value: 1}, {Key: "owner_uid", Value: 1}, {Key: "brand_id", Value: 1}, {Key: "priority", Value: 1}},
},
{
Keys: bson.D{{Key: "tenant_id", Value: 1}, {Key: "owner_uid", Value: 1}, {Key: "persona_id", Value: 1}, {Key: "priority", Value: 1}},
},
{
Keys: bson.D{{Key: "scan_job_id", Value: 1}},
},
})
}
func brandOwnerFilter(tenantID, ownerUID, brandID string) bson.M {
filter := bson.M{
"tenant_id": tenantID,
"owner_uid": ownerUID,
}
for k, v := range libmongo.BrandScopeFilter(brandID) {
filter[k] = v
}
return filter
}
func personaViralFilter(tenantID, ownerUID, personaID string) bson.M {
return bson.M{
"tenant_id": tenantID,
"owner_uid": ownerUID,
"persona_id": strings.TrimSpace(personaID),
"flow": entity.FlowViral,
}
}
func personaViralScopeFilter(tenantID, ownerUID, personaID, copyMissionID string) bson.M {
filter := personaViralFilter(tenantID, ownerUID, personaID)
copyMissionID = strings.TrimSpace(copyMissionID)
if copyMissionID != "" {
filter["copy_mission_id"] = copyMissionID
} else {
filter["$or"] = []bson.M{
{"copy_mission_id": bson.M{"$exists": false}},
{"copy_mission_id": ""},
}
}
return filter
}
func (r *mongoRepository) ReplaceForViralScan(ctx context.Context, tenantID, ownerUID, personaID, copyMissionID, scanJobID string, posts []entity.ScanPost) error {
if r.collection == nil {
return app.For(code.Persona).DBUnavailable("Mongo is not configured")
}
_, err := r.collection.DeleteMany(ctx, personaViralScopeFilter(tenantID, ownerUID, personaID, copyMissionID))
if err != nil {
return err
}
if len(posts) == 0 {
return nil
}
docs := make([]any, 0, len(posts))
for _, post := range posts {
docs = append(docs, post)
}
_, err = r.collection.InsertMany(ctx, docs)
return err
}
func placementWriteFilter(tenantID, ownerUID, brandID, topicID string) bson.M {
topicID = strings.TrimSpace(topicID)
if topicID != "" {
return bson.M{
"tenant_id": tenantID,
"owner_uid": ownerUID,
"topic_id": topicID,
}
}
return brandOwnerFilter(tenantID, ownerUID, brandID)
}
func untaggedTopicFilter() bson.M {
return bson.M{
"$or": []bson.M{
{"topic_id": bson.M{"$exists": false}},
{"topic_id": ""},
},
}
}
func (r *mongoRepository) ClearForPlacementScan(ctx context.Context, tenantID, ownerUID, brandID, topicID string) error {
if r.collection == nil {
return app.For(code.Brand).DBUnavailable("Mongo is not configured")
}
topicID = strings.TrimSpace(topicID)
filter := bson.M{
"tenant_id": tenantID,
"owner_uid": ownerUID,
}
if topicID != "" {
filter["topic_id"] = topicID
} else {
for k, v := range brandOwnerFilter(tenantID, ownerUID, brandID) {
filter[k] = v
}
for k, v := range untaggedTopicFilter() {
filter[k] = v
}
}
_, err := r.collection.DeleteMany(ctx, filter)
return err
}
func (r *mongoRepository) UpsertBatchForScan(ctx context.Context, tenantID, ownerUID, brandID, topicID string, posts []entity.ScanPost) (int, error) {
if r.collection == nil {
return 0, app.For(code.Brand).DBUnavailable("Mongo is not configured")
}
if len(posts) == 0 {
return 0, nil
}
upserted := 0
for _, post := range posts {
permalink := strings.TrimSpace(post.Permalink)
if permalink == "" {
continue
}
writeTopicID := strings.TrimSpace(topicID)
if writeTopicID == "" && strings.TrimSpace(post.TopicID) != "" {
writeTopicID = strings.TrimSpace(post.TopicID)
}
filter := placementWriteFilter(tenantID, ownerUID, brandID, writeTopicID)
filter["permalink"] = permalink
var existing entity.ScanPost
err := r.collection.FindOne(ctx, filter).Decode(&existing)
if err == nil {
post.ID = existing.ID
if existing.CreateAt > 0 {
post.CreateAt = existing.CreateAt
}
if strings.TrimSpace(existing.OutreachStatus) != "" && existing.OutreachStatus != entity.OutreachStatusPending {
post.OutreachStatus = existing.OutreachStatus
post.PublishedReplyID = existing.PublishedReplyID
post.PublishedPermalink = existing.PublishedPermalink
post.OutreachUpdateAt = existing.OutreachUpdateAt
}
if strings.TrimSpace(existing.PostedAt) != "" && strings.TrimSpace(post.PostedAt) == "" {
post.PostedAt = existing.PostedAt
}
}
if writeTopicID != "" {
post.TopicID = writeTopicID
}
if strings.TrimSpace(post.BrandID) == "" {
post.BrandID = brandID
}
if strings.TrimSpace(post.ID) == "" {
continue
}
if post.CreateAt == 0 {
post.CreateAt = time.Now().UnixNano()
}
raw, err := bson.Marshal(post)
if err != nil {
return upserted, err
}
var doc bson.M
if err := bson.Unmarshal(raw, &doc); err != nil {
return upserted, err
}
delete(doc, "_id")
_, err = r.collection.UpdateOne(
ctx,
filter,
bson.M{"$set": doc, "$setOnInsert": bson.M{"_id": post.ID}},
options.Update().SetUpsert(true),
)
if err != nil {
return upserted, err
}
upserted++
}
return upserted, nil
}
func (r *mongoRepository) PruneScanJobPosts(ctx context.Context, tenantID, ownerUID, brandID, topicID, scanJobID string, keepPermalinks []string) error {
if r.collection == nil {
return app.For(code.Brand).DBUnavailable("Mongo is not configured")
}
filter := placementWriteFilter(tenantID, ownerUID, brandID, topicID)
filter["scan_job_id"] = strings.TrimSpace(scanJobID)
if len(keepPermalinks) > 0 {
filter["permalink"] = bson.M{"$nin": keepPermalinks}
}
_, err := r.collection.DeleteMany(ctx, filter)
return err
}
func (r *mongoRepository) ReplaceForScan(ctx context.Context, tenantID, ownerUID, brandID, scanJobID string, posts []entity.ScanPost) error {
if r.collection == nil {
return app.For(code.Brand).DBUnavailable("Mongo is not configured")
}
_, err := r.collection.DeleteMany(ctx, brandOwnerFilter(tenantID, ownerUID, brandID))
if err != nil {
return err
}
if len(posts) == 0 {
return nil
}
docs := make([]any, 0, len(posts))
for _, post := range posts {
docs = append(docs, post)
}
_, err = r.collection.InsertMany(ctx, docs)
return err
}
func (r *mongoRepository) Get(ctx context.Context, tenantID, ownerUID, brandID, postID string) (*entity.ScanPost, error) {
if r.collection == nil {
return nil, app.For(code.Brand).DBUnavailable("Mongo is not configured")
}
filter := brandOwnerFilter(tenantID, ownerUID, brandID)
filter["_id"] = strings.TrimSpace(postID)
var out entity.ScanPost
err := r.collection.FindOne(ctx, filter).Decode(&out)
if err == mongo.ErrNoDocuments {
return nil, app.For(code.Brand).ResNotFound("scan post not found")
}
if err != nil {
return nil, err
}
return &out, nil
}
func (r *mongoRepository) UpdateOutreach(
ctx context.Context,
tenantID, ownerUID, brandID, postID string,
patch entity.OutreachPatch,
) (*entity.ScanPost, error) {
if r.collection == nil {
return nil, app.For(code.Brand).DBUnavailable("Mongo is not configured")
}
set := bson.M{}
if strings.TrimSpace(patch.Status) != "" {
set["outreach_status"] = strings.TrimSpace(patch.Status)
}
if strings.TrimSpace(patch.PublishedReplyID) != "" {
set["published_reply_id"] = strings.TrimSpace(patch.PublishedReplyID)
}
if strings.TrimSpace(patch.PublishedPermalink) != "" {
set["published_permalink"] = strings.TrimSpace(patch.PublishedPermalink)
}
if len(set) == 0 {
return r.Get(ctx, tenantID, ownerUID, brandID, postID)
}
set["outreach_update_at"] = time.Now().UnixNano()
filter := brandOwnerFilter(tenantID, ownerUID, brandID)
filter["_id"] = strings.TrimSpace(postID)
opts := options.FindOneAndUpdate().SetReturnDocument(options.After)
var out entity.ScanPost
err := r.collection.FindOneAndUpdate(ctx, filter, bson.M{"$set": set}, opts).Decode(&out)
if err == mongo.ErrNoDocuments {
return nil, app.For(code.Brand).ResNotFound("scan post not found")
}
if err != nil {
return nil, err
}
return &out, nil
}
func (r *mongoRepository) Delete(ctx context.Context, tenantID, ownerUID, brandID, topicID, postID string) error {
if r.collection == nil {
return app.For(code.Brand).DBUnavailable("Mongo is not configured")
}
postID = strings.TrimSpace(postID)
if postID == "" {
return app.For(code.Brand).InputMissingRequired("scan post id is required")
}
filter := topicScopeFilter(tenantID, ownerUID, topicID, brandID)
filter["_id"] = postID
result, err := r.collection.DeleteOne(ctx, filter)
if err != nil {
return err
}
if result.DeletedCount == 0 {
return nil
}
return nil
}
func (r *mongoRepository) DeleteMany(ctx context.Context, tenantID, ownerUID, brandID, topicID string, postIDs []string) (int, error) {
if r.collection == nil {
return 0, app.For(code.Brand).DBUnavailable("Mongo is not configured")
}
ids := make([]string, 0, len(postIDs))
seen := map[string]struct{}{}
for _, postID := range postIDs {
id := strings.TrimSpace(postID)
if id == "" {
continue
}
if _, ok := seen[id]; ok {
continue
}
seen[id] = struct{}{}
ids = append(ids, id)
}
if len(ids) == 0 {
return 0, nil
}
filter := topicScopeFilter(tenantID, ownerUID, topicID, brandID)
filter["_id"] = bson.M{"$in": ids}
result, err := r.collection.DeleteMany(ctx, filter)
if err != nil {
return 0, err
}
return int(result.DeletedCount), nil
}
func (r *mongoRepository) GetForPersona(ctx context.Context, tenantID, ownerUID, personaID, postID string) (*entity.ScanPost, error) {
if r.collection == nil {
return nil, app.For(code.Persona).DBUnavailable("Mongo is not configured")
}
filter := personaViralFilter(tenantID, ownerUID, personaID)
filter["_id"] = strings.TrimSpace(postID)
var out entity.ScanPost
err := r.collection.FindOne(ctx, filter).Decode(&out)
if err == mongo.ErrNoDocuments {
return nil, app.For(code.Persona).ResNotFound("viral scan post not found")
}
if err != nil {
return nil, err
}
return &out, nil
}
func (r *mongoRepository) ListForPersona(ctx context.Context, tenantID, ownerUID string, filter domrepo.PersonaListFilter) ([]entity.ScanPost, error) {
if r.collection == nil {
return nil, app.For(code.Persona).DBUnavailable("Mongo is not configured")
}
query := personaViralScopeFilter(tenantID, ownerUID, filter.PersonaID, filter.CopyMissionID)
if flow := strings.TrimSpace(filter.Flow); flow != "" {
query["flow"] = flow
}
limit := filter.Limit
if limit <= 0 {
limit = 100
}
if limit > 500 {
limit = 500
}
opts := options.Find().
SetSort(bson.D{{Key: "engagement_score", Value: -1}, {Key: "create_at", Value: -1}}).
SetLimit(int64(limit))
cur, err := r.collection.Find(ctx, query, opts)
if err != nil {
return nil, err
}
defer cur.Close(ctx)
var out []entity.ScanPost
if err := cur.All(ctx, &out); err != nil {
return nil, err
}
return out, nil
}
func topicScopeFilter(tenantID, ownerUID, topicID, brandID string) bson.M {
topicID = strings.TrimSpace(topicID)
if topicID != "" {
return bson.M{
"tenant_id": tenantID,
"owner_uid": ownerUID,
"topic_id": topicID,
}
}
return brandOwnerFilter(tenantID, ownerUID, brandID)
}
func (r *mongoRepository) AttachTopicID(ctx context.Context, tenantID, ownerUID, brandID, topicID string) error {
if r.collection == nil {
return app.For(code.Brand).DBUnavailable("Mongo is not configured")
}
filter := brandOwnerFilter(tenantID, ownerUID, brandID)
for k, v := range untaggedTopicFilter() {
filter[k] = v
}
_, err := r.collection.UpdateMany(
ctx,
filter,
bson.M{"$set": bson.M{"topic_id": strings.TrimSpace(topicID)}},
)
return err
}
func (r *mongoRepository) List(ctx context.Context, tenantID, ownerUID string, filter domrepo.ListFilter) ([]entity.ScanPost, error) {
if r.collection == nil {
return nil, app.For(code.Brand).DBUnavailable("Mongo is not configured")
}
query := topicScopeFilter(tenantID, ownerUID, filter.TopicID, filter.BrandID)
if priorityQuery := priorityListFilter(filter.Priority); priorityQuery != nil {
for key, value := range priorityQuery {
query[key] = value
}
}
if filter.ProductFitMin > 0 {
query["product_fit_score"] = bson.M{"$gte": filter.ProductFitMin}
}
limit := filter.Limit
if limit <= 0 {
limit = 100
}
if limit > 500 {
limit = 500
}
opts := options.Find().
SetSort(bson.D{{Key: "placement_score", Value: -1}, {Key: "create_at", Value: -1}}).
SetLimit(int64(limit))
cur, err := r.collection.Find(ctx, query, opts)
if err != nil {
return nil, err
}
defer cur.Close(ctx)
var out []entity.ScanPost
if err := cur.All(ctx, &out); err != nil {
return nil, err
}
if filter.Recent7dOnly {
filtered := make([]entity.ScanPost, 0, len(out))
for _, item := range out {
if item.Priority == "gold" || item.Priority == "recent" {
filtered = append(filtered, item)
}
}
return filtered, nil
}
return out, nil
}
// priorityListFilter maps UI track filters to stored priority values.
// gold posts hit both relevance and recency tracks, so they belong in either filter.
func priorityListFilter(priority string) bson.M {
switch strings.TrimSpace(priority) {
case "relevant":
return bson.M{"priority": bson.M{"$in": []string{"relevant", "gold"}}}
case "recent":
return bson.M{"priority": bson.M{"$in": []string{"recent", "gold"}}}
case "gold":
return bson.M{"priority": "gold"}
default:
return nil
}
}
func (r *mongoRepository) CountByBrand(ctx context.Context, tenantID, ownerUID, brandID string) (int, error) {
if r.collection == nil {
return 0, app.For(code.Brand).DBUnavailable("Mongo is not configured")
}
count, err := r.collection.CountDocuments(ctx, brandOwnerFilter(tenantID, ownerUID, brandID))
return int(count), err
}