package repository import ( "context" "haixun-backend/internal/library/clock" app "haixun-backend/internal/library/errors" "haixun-backend/internal/library/errors/code" "haixun-backend/internal/model/job/domain/entity" "haixun-backend/internal/model/job/domain/enum" domrepo "haixun-backend/internal/model/job/domain/repository" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) type mongoRunRepository struct { collection *mongo.Collection } func NewMongoRunRepository(db *mongo.Database) domrepo.RunRepository { if db == nil { return &mongoRunRepository{} } return &mongoRunRepository{collection: db.Collection(entity.RunCollectionName)} } func (r *mongoRunRepository) EnsureIndexes(ctx context.Context) error { if r.collection == nil { return nil } models := []mongo.IndexModel{ {Keys: bson.D{{Key: "template_type", Value: 1}, {Key: "scope", Value: 1}, {Key: "scope_id", Value: 1}, {Key: "status", Value: 1}}}, {Keys: bson.D{{Key: "tenant_id", Value: 1}, {Key: "owner_uid", Value: 1}, {Key: "create_at", Value: -1}}}, {Keys: bson.D{{Key: "payload.owner_uid", Value: 1}, {Key: "create_at", Value: -1}}}, {Keys: bson.D{{Key: "create_at", Value: -1}}}, {Keys: bson.D{{Key: "dedupe_key", Value: 1}}}, } _, err := r.collection.Indexes().CreateMany(ctx, models) return err } func (r *mongoRunRepository) Create(ctx context.Context, run *entity.Run) (*entity.Run, error) { if r.collection == nil { return nil, app.For(code.Job).DBUnavailable("Mongo is not configured") } now := clock.NowUnixNano() run.CreateAt = now run.UpdateAt = now res, err := r.collection.InsertOne(ctx, run) if err != nil { return nil, err } run.ID = res.InsertedID.(primitive.ObjectID) return run, nil } func (r *mongoRunRepository) Update(ctx context.Context, run *entity.Run) (*entity.Run, error) { if r.collection == nil { return nil, app.For(code.Job).DBUnavailable("Mongo is not configured") } run.UpdateAt = clock.NowUnixNano() filter := bson.M{"_id": run.ID} update, err := runSet(run) if err != nil { return nil, err } opts := options.FindOneAndUpdate().SetReturnDocument(options.After) var out entity.Run err = r.collection.FindOneAndUpdate(ctx, filter, update, opts).Decode(&out) if err == mongo.ErrNoDocuments { return nil, app.For(code.Job).ResNotFound("job run not found") } if err != nil { return nil, err } return &out, nil } func (r *mongoRunRepository) UpdateIfStatus(ctx context.Context, run *entity.Run, allowed []enum.RunStatus) (*entity.Run, error) { if r.collection == nil { return nil, app.For(code.Job).DBUnavailable("Mongo is not configured") } if len(allowed) == 0 { return nil, app.For(code.Job).ResInvalidState("allowed statuses are required") } run.UpdateAt = clock.NowUnixNano() filter := bson.M{"_id": run.ID, "status": bson.M{"$in": allowed}} return r.updateWithFilter(ctx, filter, run) } func (r *mongoRunRepository) UpdateIfLocked(ctx context.Context, run *entity.Run, workerID string, allowed []enum.RunStatus) (*entity.Run, error) { if r.collection == nil { return nil, app.For(code.Job).DBUnavailable("Mongo is not configured") } if workerID == "" { return nil, app.For(code.Job).InputMissingRequired("worker id is required") } if len(allowed) == 0 { return nil, app.For(code.Job).ResInvalidState("allowed statuses are required") } run.UpdateAt = clock.NowUnixNano() filter := bson.M{ "_id": run.ID, "locked_by": workerID, "status": bson.M{"$in": allowed}, } return r.updateWithFilter(ctx, filter, run) } func (r *mongoRunRepository) updateWithFilter(ctx context.Context, filter bson.M, run *entity.Run) (*entity.Run, error) { update, err := runSet(run) if err != nil { return nil, err } opts := options.FindOneAndUpdate().SetReturnDocument(options.After) var out entity.Run err = r.collection.FindOneAndUpdate(ctx, filter, update, opts).Decode(&out) if err == mongo.ErrNoDocuments { return nil, app.For(code.Job).ResInvalidState("job state changed; update rejected") } if err != nil { return nil, err } return &out, nil } func runSet(run *entity.Run) (bson.M, error) { raw, err := bson.Marshal(run) if err != nil { return nil, err } set := bson.M{} if err := bson.Unmarshal(raw, &set); err != nil { return nil, err } delete(set, "_id") return bson.M{"$set": set}, nil } func (r *mongoRunRepository) FindByID(ctx context.Context, id string) (*entity.Run, error) { if r.collection == nil { return nil, app.For(code.Job).DBUnavailable("Mongo is not configured") } objectID, err := primitive.ObjectIDFromHex(id) if err != nil { return nil, app.For(code.Job).InputInvalidFormat("invalid job id") } var run entity.Run err = r.collection.FindOne(ctx, bson.M{"_id": objectID}).Decode(&run) if err == mongo.ErrNoDocuments { return nil, app.For(code.Job).ResNotFound("job run not found") } if err != nil { return nil, err } return &run, nil } func (r *mongoRunRepository) List(ctx context.Context, filter domrepo.RunListFilter, offset, limit int64) ([]*entity.Run, int64, error) { if r.collection == nil { return nil, 0, app.For(code.Job).DBUnavailable("Mongo is not configured") } query := buildRunListQuery(filter) total, err := r.collection.CountDocuments(ctx, query) if err != nil { return nil, 0, err } cursor, err := r.collection.Find( ctx, query, options.Find().SetSort(bson.D{{Key: "create_at", Value: -1}}).SetSkip(offset).SetLimit(limit), ) if err != nil { return nil, 0, err } defer cursor.Close(ctx) var items []*entity.Run if err := cursor.All(ctx, &items); err != nil { return nil, 0, err } return items, total, nil } func (r *mongoRunRepository) FindActiveByScope(ctx context.Context, templateType, scope, scopeID string) ([]*entity.Run, error) { if r.collection == nil { return nil, app.For(code.Job).DBUnavailable("Mongo is not configured") } active := []enum.RunStatus{ enum.RunStatusPending, enum.RunStatusQueued, enum.RunStatusRunning, enum.RunStatusWaitingWorker, enum.RunStatusCancelRequested, } cursor, err := r.collection.Find(ctx, bson.M{ "template_type": templateType, "scope": scope, "scope_id": scopeID, "status": bson.M{"$in": active}, }) if err != nil { return nil, err } defer cursor.Close(ctx) var items []*entity.Run if err := cursor.All(ctx, &items); err != nil { return nil, err } return items, nil } func (r *mongoRunRepository) FindSucceededByDedupeKey(ctx context.Context, templateType, dedupeKey string) (*entity.Run, error) { if r.collection == nil { return nil, app.For(code.Job).DBUnavailable("Mongo is not configured") } var run entity.Run err := r.collection.FindOne(ctx, bson.M{ "template_type": templateType, "dedupe_key": dedupeKey, "status": enum.RunStatusSucceeded, }).Decode(&run) if err == mongo.ErrNoDocuments { return nil, nil } if err != nil { return nil, err } return &run, nil } func (r *mongoRunRepository) FindPendingDue(ctx context.Context, now int64, limit int64) ([]*entity.Run, error) { if r.collection == nil { return nil, app.For(code.Job).DBUnavailable("Mongo is not configured") } if limit <= 0 { limit = 50 } cursor, err := r.collection.Find(ctx, bson.M{ "status": enum.RunStatusPending, "scheduled_at": bson.M{ "$lte": now, "$ne": nil, }, }, options.Find().SetSort(bson.D{{Key: "scheduled_at", Value: 1}}).SetLimit(limit)) if err != nil { return nil, err } defer cursor.Close(ctx) var items []*entity.Run if err := cursor.All(ctx, &items); err != nil { return nil, err } return items, nil } func (r *mongoRunRepository) FindCancelRequestedBefore(ctx context.Context, before int64, limit int64) ([]*entity.Run, error) { if r.collection == nil { return nil, app.For(code.Job).DBUnavailable("Mongo is not configured") } if limit <= 0 { limit = 50 } cursor, err := r.collection.Find(ctx, bson.M{ "status": enum.RunStatusCancelRequested, "cancel_requested_at": bson.M{ "$lte": before, "$ne": nil, }, }, options.Find().SetSort(bson.D{{Key: "cancel_requested_at", Value: 1}}).SetLimit(limit)) if err != nil { return nil, err } defer cursor.Close(ctx) var items []*entity.Run if err := cursor.All(ctx, &items); err != nil { return nil, err } return items, nil } func (r *mongoRunRepository) FindRunningTimedOut(ctx context.Context, now int64, limit int64) ([]*entity.Run, error) { if r.collection == nil { return nil, app.For(code.Job).DBUnavailable("Mongo is not configured") } if limit <= 0 { limit = 50 } cursor, err := r.collection.Find(ctx, bson.M{ "status": bson.M{"$in": []enum.RunStatus{enum.RunStatusRunning, enum.RunStatusWaitingWorker}}, "locked_until": bson.M{ "$lte": now, "$ne": nil, }, }, options.Find().SetSort(bson.D{{Key: "locked_until", Value: 1}}).SetLimit(limit)) if err != nil { return nil, err } defer cursor.Close(ctx) var items []*entity.Run if err := cursor.All(ctx, &items); err != nil { return nil, err } return items, nil } func buildRunListQuery(filter domrepo.RunListFilter) bson.M { clauses := make([]bson.M, 0, 4) if filter.Scope != "" { clauses = append(clauses, bson.M{"scope": filter.Scope}) } if filter.ScopeID != "" { clauses = append(clauses, bson.M{"scope_id": filter.ScopeID}) } if filter.OwnerUID != "" { clauses = append(clauses, bson.M{"$or": []bson.M{ {"owner_uid": filter.OwnerUID}, {"payload.owner_uid": filter.OwnerUID}, {"scope": "user", "scope_id": filter.OwnerUID}, }}) } if filter.TenantID != "" { clauses = append(clauses, bson.M{"$or": []bson.M{ {"tenant_id": filter.TenantID}, {"payload.tenant_id": filter.TenantID}, {"$and": []bson.M{ {"$or": []bson.M{ {"tenant_id": bson.M{"$exists": false}}, {"tenant_id": ""}, }}, {"$or": []bson.M{ {"payload.tenant_id": bson.M{"$exists": false}}, {"payload.tenant_id": ""}, }}, }}, }}) } if len(filter.Statuses) > 0 { clauses = append(clauses, bson.M{"status": bson.M{"$in": filter.Statuses}}) } if len(clauses) == 0 { return bson.M{} } if len(clauses) == 1 { return clauses[0] } return bson.M{"$and": clauses} }