haixunMaster/haixun-backend/internal/model/job/repository/mongo_run.go

354 lines
10 KiB
Go

package repository
import (
"context"
"haixun-backend/internal/library/clock"
app "haixun-backend/internal/library/errors"
"haixun-backend/internal/library/errors/code"
"haixun-backend/internal/model/job/domain/entity"
"haixun-backend/internal/model/job/domain/enum"
domrepo "haixun-backend/internal/model/job/domain/repository"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
type mongoRunRepository struct {
collection *mongo.Collection
}
func NewMongoRunRepository(db *mongo.Database) domrepo.RunRepository {
if db == nil {
return &mongoRunRepository{}
}
return &mongoRunRepository{collection: db.Collection(entity.RunCollectionName)}
}
func (r *mongoRunRepository) EnsureIndexes(ctx context.Context) error {
if r.collection == nil {
return nil
}
models := []mongo.IndexModel{
{Keys: bson.D{{Key: "template_type", Value: 1}, {Key: "scope", Value: 1}, {Key: "scope_id", Value: 1}, {Key: "status", Value: 1}}},
{Keys: bson.D{{Key: "tenant_id", Value: 1}, {Key: "owner_uid", Value: 1}, {Key: "create_at", Value: -1}}},
{Keys: bson.D{{Key: "payload.owner_uid", Value: 1}, {Key: "create_at", Value: -1}}},
{Keys: bson.D{{Key: "create_at", Value: -1}}},
{Keys: bson.D{{Key: "dedupe_key", Value: 1}}},
}
_, err := r.collection.Indexes().CreateMany(ctx, models)
return err
}
func (r *mongoRunRepository) Create(ctx context.Context, run *entity.Run) (*entity.Run, error) {
if r.collection == nil {
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
}
now := clock.NowUnixNano()
run.CreateAt = now
run.UpdateAt = now
res, err := r.collection.InsertOne(ctx, run)
if err != nil {
return nil, err
}
run.ID = res.InsertedID.(primitive.ObjectID)
return run, nil
}
func (r *mongoRunRepository) Update(ctx context.Context, run *entity.Run) (*entity.Run, error) {
if r.collection == nil {
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
}
run.UpdateAt = clock.NowUnixNano()
filter := bson.M{"_id": run.ID}
update, err := runSet(run)
if err != nil {
return nil, err
}
opts := options.FindOneAndUpdate().SetReturnDocument(options.After)
var out entity.Run
err = r.collection.FindOneAndUpdate(ctx, filter, update, opts).Decode(&out)
if err == mongo.ErrNoDocuments {
return nil, app.For(code.Job).ResNotFound("job run not found")
}
if err != nil {
return nil, err
}
return &out, nil
}
func (r *mongoRunRepository) UpdateIfStatus(ctx context.Context, run *entity.Run, allowed []enum.RunStatus) (*entity.Run, error) {
if r.collection == nil {
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
}
if len(allowed) == 0 {
return nil, app.For(code.Job).ResInvalidState("allowed statuses are required")
}
run.UpdateAt = clock.NowUnixNano()
filter := bson.M{"_id": run.ID, "status": bson.M{"$in": allowed}}
return r.updateWithFilter(ctx, filter, run)
}
func (r *mongoRunRepository) UpdateIfLocked(ctx context.Context, run *entity.Run, workerID string, allowed []enum.RunStatus) (*entity.Run, error) {
if r.collection == nil {
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
}
if workerID == "" {
return nil, app.For(code.Job).InputMissingRequired("worker id is required")
}
if len(allowed) == 0 {
return nil, app.For(code.Job).ResInvalidState("allowed statuses are required")
}
run.UpdateAt = clock.NowUnixNano()
filter := bson.M{
"_id": run.ID,
"locked_by": workerID,
"status": bson.M{"$in": allowed},
}
return r.updateWithFilter(ctx, filter, run)
}
func (r *mongoRunRepository) updateWithFilter(ctx context.Context, filter bson.M, run *entity.Run) (*entity.Run, error) {
update, err := runSet(run)
if err != nil {
return nil, err
}
opts := options.FindOneAndUpdate().SetReturnDocument(options.After)
var out entity.Run
err = r.collection.FindOneAndUpdate(ctx, filter, update, opts).Decode(&out)
if err == mongo.ErrNoDocuments {
return nil, app.For(code.Job).ResInvalidState("job state changed; update rejected")
}
if err != nil {
return nil, err
}
return &out, nil
}
func runSet(run *entity.Run) (bson.M, error) {
raw, err := bson.Marshal(run)
if err != nil {
return nil, err
}
set := bson.M{}
if err := bson.Unmarshal(raw, &set); err != nil {
return nil, err
}
delete(set, "_id")
return bson.M{"$set": set}, nil
}
func (r *mongoRunRepository) FindByID(ctx context.Context, id string) (*entity.Run, error) {
if r.collection == nil {
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
}
objectID, err := primitive.ObjectIDFromHex(id)
if err != nil {
return nil, app.For(code.Job).InputInvalidFormat("invalid job id")
}
var run entity.Run
err = r.collection.FindOne(ctx, bson.M{"_id": objectID}).Decode(&run)
if err == mongo.ErrNoDocuments {
return nil, app.For(code.Job).ResNotFound("job run not found")
}
if err != nil {
return nil, err
}
return &run, nil
}
func (r *mongoRunRepository) List(ctx context.Context, filter domrepo.RunListFilter, offset, limit int64) ([]*entity.Run, int64, error) {
if r.collection == nil {
return nil, 0, app.For(code.Job).DBUnavailable("Mongo is not configured")
}
query := buildRunListQuery(filter)
total, err := r.collection.CountDocuments(ctx, query)
if err != nil {
return nil, 0, err
}
cursor, err := r.collection.Find(
ctx,
query,
options.Find().SetSort(bson.D{{Key: "create_at", Value: -1}}).SetSkip(offset).SetLimit(limit),
)
if err != nil {
return nil, 0, err
}
defer cursor.Close(ctx)
var items []*entity.Run
if err := cursor.All(ctx, &items); err != nil {
return nil, 0, err
}
return items, total, nil
}
func (r *mongoRunRepository) FindActiveByScope(ctx context.Context, templateType, scope, scopeID string) ([]*entity.Run, error) {
if r.collection == nil {
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
}
active := []enum.RunStatus{
enum.RunStatusPending,
enum.RunStatusQueued,
enum.RunStatusRunning,
enum.RunStatusWaitingWorker,
enum.RunStatusCancelRequested,
}
cursor, err := r.collection.Find(ctx, bson.M{
"template_type": templateType,
"scope": scope,
"scope_id": scopeID,
"status": bson.M{"$in": active},
})
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
var items []*entity.Run
if err := cursor.All(ctx, &items); err != nil {
return nil, err
}
return items, nil
}
func (r *mongoRunRepository) FindSucceededByDedupeKey(ctx context.Context, templateType, dedupeKey string) (*entity.Run, error) {
if r.collection == nil {
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
}
var run entity.Run
err := r.collection.FindOne(ctx, bson.M{
"template_type": templateType,
"dedupe_key": dedupeKey,
"status": enum.RunStatusSucceeded,
}).Decode(&run)
if err == mongo.ErrNoDocuments {
return nil, nil
}
if err != nil {
return nil, err
}
return &run, nil
}
func (r *mongoRunRepository) FindPendingDue(ctx context.Context, now int64, limit int64) ([]*entity.Run, error) {
if r.collection == nil {
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
}
if limit <= 0 {
limit = 50
}
cursor, err := r.collection.Find(ctx, bson.M{
"status": enum.RunStatusPending,
"scheduled_at": bson.M{
"$lte": now,
"$ne": nil,
},
}, options.Find().SetSort(bson.D{{Key: "scheduled_at", Value: 1}}).SetLimit(limit))
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
var items []*entity.Run
if err := cursor.All(ctx, &items); err != nil {
return nil, err
}
return items, nil
}
func (r *mongoRunRepository) FindCancelRequestedBefore(ctx context.Context, before int64, limit int64) ([]*entity.Run, error) {
if r.collection == nil {
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
}
if limit <= 0 {
limit = 50
}
cursor, err := r.collection.Find(ctx, bson.M{
"status": enum.RunStatusCancelRequested,
"cancel_requested_at": bson.M{
"$lte": before,
"$ne": nil,
},
}, options.Find().SetSort(bson.D{{Key: "cancel_requested_at", Value: 1}}).SetLimit(limit))
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
var items []*entity.Run
if err := cursor.All(ctx, &items); err != nil {
return nil, err
}
return items, nil
}
func (r *mongoRunRepository) FindRunningTimedOut(ctx context.Context, now int64, limit int64) ([]*entity.Run, error) {
if r.collection == nil {
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
}
if limit <= 0 {
limit = 50
}
cursor, err := r.collection.Find(ctx, bson.M{
"status": bson.M{"$in": []enum.RunStatus{enum.RunStatusRunning, enum.RunStatusWaitingWorker}},
"locked_until": bson.M{
"$lte": now,
"$ne": nil,
},
}, options.Find().SetSort(bson.D{{Key: "locked_until", Value: 1}}).SetLimit(limit))
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
var items []*entity.Run
if err := cursor.All(ctx, &items); err != nil {
return nil, err
}
return items, nil
}
func buildRunListQuery(filter domrepo.RunListFilter) bson.M {
clauses := make([]bson.M, 0, 4)
if filter.Scope != "" {
clauses = append(clauses, bson.M{"scope": filter.Scope})
}
if filter.ScopeID != "" {
clauses = append(clauses, bson.M{"scope_id": filter.ScopeID})
}
if filter.OwnerUID != "" {
clauses = append(clauses, bson.M{"$or": []bson.M{
{"owner_uid": filter.OwnerUID},
{"payload.owner_uid": filter.OwnerUID},
{"scope": "user", "scope_id": filter.OwnerUID},
}})
}
if filter.TenantID != "" {
clauses = append(clauses, bson.M{"$or": []bson.M{
{"tenant_id": filter.TenantID},
{"payload.tenant_id": filter.TenantID},
{"$and": []bson.M{
{"$or": []bson.M{
{"tenant_id": bson.M{"$exists": false}},
{"tenant_id": ""},
}},
{"$or": []bson.M{
{"payload.tenant_id": bson.M{"$exists": false}},
{"payload.tenant_id": ""},
}},
}},
}})
}
if len(filter.Statuses) > 0 {
clauses = append(clauses, bson.M{"status": bson.M{"$in": filter.Statuses}})
}
if len(clauses) == 0 {
return bson.M{}
}
if len(clauses) == 1 {
return clauses[0]
}
return bson.M{"$and": clauses}
}