haixunMaster/haixun-backend/internal/model/job/repository/mongo_schedule.go

164 lines
4.8 KiB
Go
Raw Permalink Normal View History

2026-06-23 09:54:27 +00:00
package repository
import (
	"context"
	"errors"

	"haixun-backend/internal/library/clock"
	app "haixun-backend/internal/library/errors"
	"haixun-backend/internal/library/errors/code"
	"haixun-backend/internal/model/job/domain/entity"
	domrepo "haixun-backend/internal/model/job/domain/repository"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)
// mongoScheduleRepository is the MongoDB-backed value that
// NewMongoScheduleRepository hands out as a domrepo.ScheduleRepository.
type mongoScheduleRepository struct {
	// collection is the Mongo collection holding schedules. It is nil when the
	// repository was constructed without a database; the methods check for
	// that before touching it.
	collection *mongo.Collection
}
// NewMongoScheduleRepository builds a domrepo.ScheduleRepository backed by the
// collection named entity.ScheduleCollectionName in db.
//
// A nil db is accepted: the returned repository then has no collection bound.
func NewMongoScheduleRepository(db *mongo.Database) domrepo.ScheduleRepository {
	repo := &mongoScheduleRepository{}
	if db != nil {
		repo.collection = db.Collection(entity.ScheduleCollectionName)
	}
	return repo
}
// EnsureIndexes asks MongoDB to create two ascending compound indexes on the
// schedule collection: (enabled, next_run_at) and
// (template_type, scope, scope_id).
//
// It is a no-op returning nil when no collection is configured; otherwise it
// returns whatever error the driver's CreateMany reports.
func (r *mongoScheduleRepository) EnsureIndexes(ctx context.Context) error {
	if r.collection == nil {
		return nil
	}

	byEnabledAndNextRun := mongo.IndexModel{
		Keys: bson.D{
			{Key: "enabled", Value: 1},
			{Key: "next_run_at", Value: 1},
		},
	}
	byTemplateAndScope := mongo.IndexModel{
		Keys: bson.D{
			{Key: "template_type", Value: 1},
			{Key: "scope", Value: 1},
			{Key: "scope_id", Value: 1},
		},
	}

	if _, err := r.collection.Indexes().CreateMany(ctx, []mongo.IndexModel{
		byEnabledAndNextRun,
		byTemplateAndScope,
	}); err != nil {
		return err
	}
	return nil
}
// Create stamps schedule with the current clock.NowUnixNano value in both
// CreateAt and UpdateAt, inserts it, and writes the inserted _id back into
// schedule.ID. The same pointer that was passed in is returned on success.
//
// Errors:
//   - DBUnavailable when no collection is configured.
//   - InputInvalidFormat when schedule is nil.
//   - The raw driver error when InsertOne fails.
//   - A plain error when the driver reports an _id that is not an ObjectID.
func (r *mongoScheduleRepository) Create(ctx context.Context, schedule *entity.Schedule) (*entity.Schedule, error) {
	if r.collection == nil {
		return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
	}
	// Guard before the timestamp writes below, which would otherwise
	// dereference a nil pointer.
	if schedule == nil {
		return nil, app.For(code.Job).InputInvalidFormat("schedule is required")
	}

	now := clock.NowUnixNano()
	schedule.CreateAt = now
	schedule.UpdateAt = now

	res, err := r.collection.InsertOne(ctx, schedule)
	if err != nil {
		return nil, err
	}

	// InsertedID is an interface value; use the checked form so an unexpected
	// _id type surfaces as an error instead of a panic.
	id, ok := res.InsertedID.(primitive.ObjectID)
	if !ok {
		return nil, errors.New("job schedule insert returned a non-ObjectID _id")
	}
	schedule.ID = id
	return schedule, nil
}
// Update refreshes schedule.UpdateAt with clock.NowUnixNano, then applies the
// whole schedule as a $set to the document whose _id equals schedule.ID and
// returns the document as stored after the update.
//
// Errors:
//   - DBUnavailable when no collection is configured.
//   - InputInvalidFormat when schedule is nil.
//   - ResNotFound when no document matches schedule.ID.
//   - The raw driver error for any other failure.
func (r *mongoScheduleRepository) Update(ctx context.Context, schedule *entity.Schedule) (*entity.Schedule, error) {
	if r.collection == nil {
		return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
	}
	// Guard before the UpdateAt write below, which would otherwise
	// dereference a nil pointer.
	if schedule == nil {
		return nil, app.For(code.Job).InputInvalidFormat("schedule is required")
	}

	schedule.UpdateAt = clock.NowUnixNano()

	filter := bson.M{"_id": schedule.ID}
	update := bson.M{"$set": schedule}
	// ReturnDocument(After) makes the driver hand back the post-update state.
	opts := options.FindOneAndUpdate().SetReturnDocument(options.After)

	var out entity.Schedule
	err := r.collection.FindOneAndUpdate(ctx, filter, update, opts).Decode(&out)
	// errors.Is keeps the not-found mapping working even if the sentinel
	// arrives wrapped.
	if errors.Is(err, mongo.ErrNoDocuments) {
		return nil, app.For(code.Job).ResNotFound("job schedule not found")
	}
	if err != nil {
		return nil, err
	}
	return &out, nil
}
2026-06-25 09:34:28 +00:00
// Delete removes the schedule whose _id is the ObjectID encoded by the hex
// string id.
//
// Errors:
//   - DBUnavailable when no collection is configured.
//   - InputInvalidFormat when id is not a valid ObjectID hex string.
//   - ResNotFound when the delete matched no document.
//   - The raw driver error when DeleteOne fails.
func (r *mongoScheduleRepository) Delete(ctx context.Context, id string) error {
	if r.collection == nil {
		return app.For(code.Job).DBUnavailable("Mongo is not configured")
	}

	oid, parseErr := primitive.ObjectIDFromHex(id)
	if parseErr != nil {
		return app.For(code.Job).InputInvalidFormat("invalid schedule id")
	}

	byID := bson.M{"_id": oid}
	outcome, err := r.collection.DeleteOne(ctx, byID)
	switch {
	case err != nil:
		return err
	case outcome.DeletedCount == 0:
		// The command succeeded but nothing had that _id.
		return app.For(code.Job).ResNotFound("job schedule not found")
	default:
		return nil
	}
}
2026-06-23 09:54:27 +00:00
// FindByID loads the schedule whose _id is the ObjectID encoded by the hex
// string id.
//
// Errors:
//   - DBUnavailable when no collection is configured.
//   - InputInvalidFormat when id is not a valid ObjectID hex string.
//   - ResNotFound when no document has that _id.
//   - The raw driver error for any other lookup or decode failure.
func (r *mongoScheduleRepository) FindByID(ctx context.Context, id string) (*entity.Schedule, error) {
	if r.collection == nil {
		return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
	}

	objectID, err := primitive.ObjectIDFromHex(id)
	if err != nil {
		return nil, app.For(code.Job).InputInvalidFormat("invalid schedule id")
	}

	var schedule entity.Schedule
	err = r.collection.FindOne(ctx, bson.M{"_id": objectID}).Decode(&schedule)
	// errors.Is keeps the not-found mapping working even if the sentinel
	// arrives wrapped.
	if errors.Is(err, mongo.ErrNoDocuments) {
		return nil, app.For(code.Job).ResNotFound("job schedule not found")
	}
	if err != nil {
		return nil, err
	}
	return &schedule, nil
}
// List returns one page of schedules together with the total number of
// documents matching filter.
//
// filter.Scope and filter.ScopeID are each applied as an equality condition
// only when non-empty. offset and limit are forwarded to the driver as skip
// and limit without validation. Results are ordered by next_run_at ascending,
// then by _id ascending.
//
// Errors:
//   - DBUnavailable when no collection is configured.
//   - The raw driver error when counting, querying or decoding fails.
func (r *mongoScheduleRepository) List(ctx context.Context, filter domrepo.ScheduleListFilter, offset, limit int64) ([]*entity.Schedule, int64, error) {
	if r.collection == nil {
		return nil, 0, app.For(code.Job).DBUnavailable("Mongo is not configured")
	}

	query := bson.M{}
	if filter.Scope != "" {
		query["scope"] = filter.Scope
	}
	if filter.ScopeID != "" {
		query["scope_id"] = filter.ScopeID
	}

	total, err := r.collection.CountDocuments(ctx, query)
	if err != nil {
		return nil, 0, err
	}

	// next_run_at is not unique, and MongoDB does not promise a stable order
	// among equal sort keys. With skip/limit paging that can repeat or drop
	// rows across pages, so _id is added as a unique tie-breaker.
	findOpts := options.Find().
		SetSort(bson.D{
			{Key: "next_run_at", Value: 1},
			{Key: "_id", Value: 1},
		}).
		SetSkip(offset).
		SetLimit(limit)

	cursor, err := r.collection.Find(ctx, query, findOpts)
	if err != nil {
		return nil, 0, err
	}
	defer cursor.Close(ctx)

	var items []*entity.Schedule
	if err := cursor.All(ctx, &items); err != nil {
		return nil, 0, err
	}
	return items, total, nil
}
// FindDue returns enabled schedules whose next_run_at is less than or equal
// to now, ordered by next_run_at ascending and capped at limit documents.
// A limit of zero or less is replaced by 50.
//
// NOTE(review): now is compared directly against the stored next_run_at, so
// it presumably must use the same time unit as that field; confirm with the
// caller.
//
// Errors:
//   - DBUnavailable when no collection is configured.
//   - The raw driver error when querying or decoding fails.
func (r *mongoScheduleRepository) FindDue(ctx context.Context, now int64, limit int64) ([]*entity.Schedule, error) {
	if r.collection == nil {
		return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
	}
	if limit < 1 {
		limit = 50
	}

	dueFilter := bson.M{
		"enabled":     true,
		"next_run_at": bson.M{"$lte": now},
	}
	findOpts := options.Find().
		SetSort(bson.D{{Key: "next_run_at", Value: 1}}).
		SetLimit(limit)

	cur, err := r.collection.Find(ctx, dueFilter, findOpts)
	if err != nil {
		return nil, err
	}
	defer cur.Close(ctx)

	var due []*entity.Schedule
	if decodeErr := cur.All(ctx, &due); decodeErr != nil {
		return nil, decodeErr
	}
	return due, nil
}