318 lines
8.9 KiB
Go
318 lines
8.9 KiB
Go
|
|
package repository
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
|
||
|
|
"haixun-backend/internal/library/clock"
|
||
|
|
app "haixun-backend/internal/library/errors"
|
||
|
|
"haixun-backend/internal/library/errors/code"
|
||
|
|
"haixun-backend/internal/model/job/domain/entity"
|
||
|
|
"haixun-backend/internal/model/job/domain/enum"
|
||
|
|
domrepo "haixun-backend/internal/model/job/domain/repository"
|
||
|
|
|
||
|
|
"go.mongodb.org/mongo-driver/bson"
|
||
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||
|
|
"go.mongodb.org/mongo-driver/mongo"
|
||
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||
|
|
)
|
||
|
|
|
||
|
|
type mongoRunRepository struct {
|
||
|
|
collection *mongo.Collection
|
||
|
|
}
|
||
|
|
|
||
|
|
func NewMongoRunRepository(db *mongo.Database) domrepo.RunRepository {
|
||
|
|
if db == nil {
|
||
|
|
return &mongoRunRepository{}
|
||
|
|
}
|
||
|
|
return &mongoRunRepository{collection: db.Collection(entity.RunCollectionName)}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *mongoRunRepository) EnsureIndexes(ctx context.Context) error {
|
||
|
|
if r.collection == nil {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
models := []mongo.IndexModel{
|
||
|
|
{Keys: bson.D{{Key: "template_type", Value: 1}, {Key: "scope", Value: 1}, {Key: "scope_id", Value: 1}, {Key: "status", Value: 1}}},
|
||
|
|
{Keys: bson.D{{Key: "create_at", Value: -1}}},
|
||
|
|
{Keys: bson.D{{Key: "dedupe_key", Value: 1}}},
|
||
|
|
}
|
||
|
|
_, err := r.collection.Indexes().CreateMany(ctx, models)
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *mongoRunRepository) Create(ctx context.Context, run *entity.Run) (*entity.Run, error) {
|
||
|
|
if r.collection == nil {
|
||
|
|
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
||
|
|
}
|
||
|
|
now := clock.NowUnixNano()
|
||
|
|
run.CreateAt = now
|
||
|
|
run.UpdateAt = now
|
||
|
|
res, err := r.collection.InsertOne(ctx, run)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
run.ID = res.InsertedID.(primitive.ObjectID)
|
||
|
|
return run, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *mongoRunRepository) Update(ctx context.Context, run *entity.Run) (*entity.Run, error) {
|
||
|
|
if r.collection == nil {
|
||
|
|
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
||
|
|
}
|
||
|
|
run.UpdateAt = clock.NowUnixNano()
|
||
|
|
filter := bson.M{"_id": run.ID}
|
||
|
|
update, err := runSet(run)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
opts := options.FindOneAndUpdate().SetReturnDocument(options.After)
|
||
|
|
var out entity.Run
|
||
|
|
err = r.collection.FindOneAndUpdate(ctx, filter, update, opts).Decode(&out)
|
||
|
|
if err == mongo.ErrNoDocuments {
|
||
|
|
return nil, app.For(code.Job).ResNotFound("job run not found")
|
||
|
|
}
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
return &out, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *mongoRunRepository) UpdateIfStatus(ctx context.Context, run *entity.Run, allowed []enum.RunStatus) (*entity.Run, error) {
|
||
|
|
if r.collection == nil {
|
||
|
|
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
||
|
|
}
|
||
|
|
if len(allowed) == 0 {
|
||
|
|
return nil, app.For(code.Job).ResInvalidState("allowed statuses are required")
|
||
|
|
}
|
||
|
|
run.UpdateAt = clock.NowUnixNano()
|
||
|
|
filter := bson.M{"_id": run.ID, "status": bson.M{"$in": allowed}}
|
||
|
|
return r.updateWithFilter(ctx, filter, run)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *mongoRunRepository) UpdateIfLocked(ctx context.Context, run *entity.Run, workerID string, allowed []enum.RunStatus) (*entity.Run, error) {
|
||
|
|
if r.collection == nil {
|
||
|
|
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
||
|
|
}
|
||
|
|
if workerID == "" {
|
||
|
|
return nil, app.For(code.Job).InputMissingRequired("worker id is required")
|
||
|
|
}
|
||
|
|
if len(allowed) == 0 {
|
||
|
|
return nil, app.For(code.Job).ResInvalidState("allowed statuses are required")
|
||
|
|
}
|
||
|
|
run.UpdateAt = clock.NowUnixNano()
|
||
|
|
filter := bson.M{
|
||
|
|
"_id": run.ID,
|
||
|
|
"locked_by": workerID,
|
||
|
|
"status": bson.M{"$in": allowed},
|
||
|
|
}
|
||
|
|
return r.updateWithFilter(ctx, filter, run)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *mongoRunRepository) updateWithFilter(ctx context.Context, filter bson.M, run *entity.Run) (*entity.Run, error) {
|
||
|
|
update, err := runSet(run)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
opts := options.FindOneAndUpdate().SetReturnDocument(options.After)
|
||
|
|
var out entity.Run
|
||
|
|
err = r.collection.FindOneAndUpdate(ctx, filter, update, opts).Decode(&out)
|
||
|
|
if err == mongo.ErrNoDocuments {
|
||
|
|
return nil, app.For(code.Job).ResInvalidState("job state changed; update rejected")
|
||
|
|
}
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
return &out, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func runSet(run *entity.Run) (bson.M, error) {
|
||
|
|
raw, err := bson.Marshal(run)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
set := bson.M{}
|
||
|
|
if err := bson.Unmarshal(raw, &set); err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
delete(set, "_id")
|
||
|
|
return bson.M{"$set": set}, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *mongoRunRepository) FindByID(ctx context.Context, id string) (*entity.Run, error) {
|
||
|
|
if r.collection == nil {
|
||
|
|
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
||
|
|
}
|
||
|
|
objectID, err := primitive.ObjectIDFromHex(id)
|
||
|
|
if err != nil {
|
||
|
|
return nil, app.For(code.Job).InputInvalidFormat("invalid job id")
|
||
|
|
}
|
||
|
|
var run entity.Run
|
||
|
|
err = r.collection.FindOne(ctx, bson.M{"_id": objectID}).Decode(&run)
|
||
|
|
if err == mongo.ErrNoDocuments {
|
||
|
|
return nil, app.For(code.Job).ResNotFound("job run not found")
|
||
|
|
}
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
return &run, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *mongoRunRepository) List(ctx context.Context, filter domrepo.RunListFilter, offset, limit int64) ([]*entity.Run, int64, error) {
|
||
|
|
if r.collection == nil {
|
||
|
|
return nil, 0, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
||
|
|
}
|
||
|
|
query := bson.M{}
|
||
|
|
if filter.Scope != "" {
|
||
|
|
query["scope"] = filter.Scope
|
||
|
|
}
|
||
|
|
if filter.ScopeID != "" {
|
||
|
|
query["scope_id"] = filter.ScopeID
|
||
|
|
}
|
||
|
|
if len(filter.Statuses) > 0 {
|
||
|
|
query["status"] = bson.M{"$in": filter.Statuses}
|
||
|
|
}
|
||
|
|
|
||
|
|
total, err := r.collection.CountDocuments(ctx, query)
|
||
|
|
if err != nil {
|
||
|
|
return nil, 0, err
|
||
|
|
}
|
||
|
|
cursor, err := r.collection.Find(
|
||
|
|
ctx,
|
||
|
|
query,
|
||
|
|
options.Find().SetSort(bson.D{{Key: "create_at", Value: -1}}).SetSkip(offset).SetLimit(limit),
|
||
|
|
)
|
||
|
|
if err != nil {
|
||
|
|
return nil, 0, err
|
||
|
|
}
|
||
|
|
defer cursor.Close(ctx)
|
||
|
|
|
||
|
|
var items []*entity.Run
|
||
|
|
if err := cursor.All(ctx, &items); err != nil {
|
||
|
|
return nil, 0, err
|
||
|
|
}
|
||
|
|
return items, total, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *mongoRunRepository) FindActiveByScope(ctx context.Context, templateType, scope, scopeID string) ([]*entity.Run, error) {
|
||
|
|
if r.collection == nil {
|
||
|
|
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
||
|
|
}
|
||
|
|
active := []enum.RunStatus{
|
||
|
|
enum.RunStatusPending,
|
||
|
|
enum.RunStatusQueued,
|
||
|
|
enum.RunStatusRunning,
|
||
|
|
enum.RunStatusWaitingWorker,
|
||
|
|
enum.RunStatusCancelRequested,
|
||
|
|
}
|
||
|
|
cursor, err := r.collection.Find(ctx, bson.M{
|
||
|
|
"template_type": templateType,
|
||
|
|
"scope": scope,
|
||
|
|
"scope_id": scopeID,
|
||
|
|
"status": bson.M{"$in": active},
|
||
|
|
})
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
defer cursor.Close(ctx)
|
||
|
|
|
||
|
|
var items []*entity.Run
|
||
|
|
if err := cursor.All(ctx, &items); err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
return items, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *mongoRunRepository) FindSucceededByDedupeKey(ctx context.Context, templateType, dedupeKey string) (*entity.Run, error) {
|
||
|
|
if r.collection == nil {
|
||
|
|
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
||
|
|
}
|
||
|
|
var run entity.Run
|
||
|
|
err := r.collection.FindOne(ctx, bson.M{
|
||
|
|
"template_type": templateType,
|
||
|
|
"dedupe_key": dedupeKey,
|
||
|
|
"status": enum.RunStatusSucceeded,
|
||
|
|
}).Decode(&run)
|
||
|
|
if err == mongo.ErrNoDocuments {
|
||
|
|
return nil, nil
|
||
|
|
}
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
return &run, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *mongoRunRepository) FindPendingDue(ctx context.Context, now int64, limit int64) ([]*entity.Run, error) {
|
||
|
|
if r.collection == nil {
|
||
|
|
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
||
|
|
}
|
||
|
|
if limit <= 0 {
|
||
|
|
limit = 50
|
||
|
|
}
|
||
|
|
cursor, err := r.collection.Find(ctx, bson.M{
|
||
|
|
"status": enum.RunStatusPending,
|
||
|
|
"scheduled_at": bson.M{
|
||
|
|
"$lte": now,
|
||
|
|
"$ne": nil,
|
||
|
|
},
|
||
|
|
}, options.Find().SetSort(bson.D{{Key: "scheduled_at", Value: 1}}).SetLimit(limit))
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
defer cursor.Close(ctx)
|
||
|
|
var items []*entity.Run
|
||
|
|
if err := cursor.All(ctx, &items); err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
return items, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *mongoRunRepository) FindCancelRequestedBefore(ctx context.Context, before int64, limit int64) ([]*entity.Run, error) {
|
||
|
|
if r.collection == nil {
|
||
|
|
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
||
|
|
}
|
||
|
|
if limit <= 0 {
|
||
|
|
limit = 50
|
||
|
|
}
|
||
|
|
cursor, err := r.collection.Find(ctx, bson.M{
|
||
|
|
"status": enum.RunStatusCancelRequested,
|
||
|
|
"cancel_requested_at": bson.M{
|
||
|
|
"$lte": before,
|
||
|
|
"$ne": nil,
|
||
|
|
},
|
||
|
|
}, options.Find().SetSort(bson.D{{Key: "cancel_requested_at", Value: 1}}).SetLimit(limit))
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
defer cursor.Close(ctx)
|
||
|
|
var items []*entity.Run
|
||
|
|
if err := cursor.All(ctx, &items); err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
return items, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *mongoRunRepository) FindRunningTimedOut(ctx context.Context, now int64, limit int64) ([]*entity.Run, error) {
|
||
|
|
if r.collection == nil {
|
||
|
|
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
||
|
|
}
|
||
|
|
if limit <= 0 {
|
||
|
|
limit = 50
|
||
|
|
}
|
||
|
|
cursor, err := r.collection.Find(ctx, bson.M{
|
||
|
|
"status": bson.M{"$in": []enum.RunStatus{enum.RunStatusRunning, enum.RunStatusWaitingWorker}},
|
||
|
|
"locked_until": bson.M{
|
||
|
|
"$lte": now,
|
||
|
|
"$ne": nil,
|
||
|
|
},
|
||
|
|
}, options.Find().SetSort(bson.D{{Key: "locked_until", Value: 1}}).SetLimit(limit))
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
defer cursor.Close(ctx)
|
||
|
|
var items []*entity.Run
|
||
|
|
if err := cursor.All(ctx, &items); err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
return items, nil
|
||
|
|
}
|