2026-06-23 09:54:27 +00:00
|
|
|
package repository
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
|
|
|
|
|
"haixun-backend/internal/library/clock"
|
|
|
|
|
app "haixun-backend/internal/library/errors"
|
|
|
|
|
"haixun-backend/internal/library/errors/code"
|
|
|
|
|
"haixun-backend/internal/model/job/domain/entity"
|
|
|
|
|
domrepo "haixun-backend/internal/model/job/domain/repository"
|
|
|
|
|
|
|
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
|
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
|
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
|
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type mongoScheduleRepository struct {
|
|
|
|
|
collection *mongo.Collection
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewMongoScheduleRepository(db *mongo.Database) domrepo.ScheduleRepository {
|
|
|
|
|
if db == nil {
|
|
|
|
|
return &mongoScheduleRepository{}
|
|
|
|
|
}
|
|
|
|
|
return &mongoScheduleRepository{collection: db.Collection(entity.ScheduleCollectionName)}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *mongoScheduleRepository) EnsureIndexes(ctx context.Context) error {
|
|
|
|
|
if r.collection == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
models := []mongo.IndexModel{
|
|
|
|
|
{Keys: bson.D{{Key: "enabled", Value: 1}, {Key: "next_run_at", Value: 1}}},
|
|
|
|
|
{Keys: bson.D{{Key: "template_type", Value: 1}, {Key: "scope", Value: 1}, {Key: "scope_id", Value: 1}}},
|
|
|
|
|
}
|
|
|
|
|
_, err := r.collection.Indexes().CreateMany(ctx, models)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *mongoScheduleRepository) Create(ctx context.Context, schedule *entity.Schedule) (*entity.Schedule, error) {
|
|
|
|
|
if r.collection == nil {
|
|
|
|
|
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
|
|
|
|
}
|
|
|
|
|
now := clock.NowUnixNano()
|
|
|
|
|
schedule.CreateAt = now
|
|
|
|
|
schedule.UpdateAt = now
|
|
|
|
|
res, err := r.collection.InsertOne(ctx, schedule)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
schedule.ID = res.InsertedID.(primitive.ObjectID)
|
|
|
|
|
return schedule, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *mongoScheduleRepository) Update(ctx context.Context, schedule *entity.Schedule) (*entity.Schedule, error) {
|
|
|
|
|
if r.collection == nil {
|
|
|
|
|
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
|
|
|
|
}
|
|
|
|
|
schedule.UpdateAt = clock.NowUnixNano()
|
|
|
|
|
filter := bson.M{"_id": schedule.ID}
|
|
|
|
|
update := bson.M{"$set": schedule}
|
|
|
|
|
opts := options.FindOneAndUpdate().SetReturnDocument(options.After)
|
|
|
|
|
var out entity.Schedule
|
|
|
|
|
err := r.collection.FindOneAndUpdate(ctx, filter, update, opts).Decode(&out)
|
|
|
|
|
if err == mongo.ErrNoDocuments {
|
|
|
|
|
return nil, app.For(code.Job).ResNotFound("job schedule not found")
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return &out, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-06-25 09:34:28 +00:00
|
|
|
func (r *mongoScheduleRepository) Delete(ctx context.Context, id string) error {
|
|
|
|
|
if r.collection == nil {
|
|
|
|
|
return app.For(code.Job).DBUnavailable("Mongo is not configured")
|
|
|
|
|
}
|
|
|
|
|
objectID, err := primitive.ObjectIDFromHex(id)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return app.For(code.Job).InputInvalidFormat("invalid schedule id")
|
|
|
|
|
}
|
|
|
|
|
res, err := r.collection.DeleteOne(ctx, bson.M{"_id": objectID})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if res.DeletedCount == 0 {
|
|
|
|
|
return app.For(code.Job).ResNotFound("job schedule not found")
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-06-23 09:54:27 +00:00
|
|
|
func (r *mongoScheduleRepository) FindByID(ctx context.Context, id string) (*entity.Schedule, error) {
|
|
|
|
|
if r.collection == nil {
|
|
|
|
|
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
|
|
|
|
}
|
|
|
|
|
objectID, err := primitive.ObjectIDFromHex(id)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, app.For(code.Job).InputInvalidFormat("invalid schedule id")
|
|
|
|
|
}
|
|
|
|
|
var schedule entity.Schedule
|
|
|
|
|
err = r.collection.FindOne(ctx, bson.M{"_id": objectID}).Decode(&schedule)
|
|
|
|
|
if err == mongo.ErrNoDocuments {
|
|
|
|
|
return nil, app.For(code.Job).ResNotFound("job schedule not found")
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return &schedule, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *mongoScheduleRepository) List(ctx context.Context, filter domrepo.ScheduleListFilter, offset, limit int64) ([]*entity.Schedule, int64, error) {
|
|
|
|
|
if r.collection == nil {
|
|
|
|
|
return nil, 0, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
|
|
|
|
}
|
|
|
|
|
query := bson.M{}
|
|
|
|
|
if filter.Scope != "" {
|
|
|
|
|
query["scope"] = filter.Scope
|
|
|
|
|
}
|
|
|
|
|
if filter.ScopeID != "" {
|
|
|
|
|
query["scope_id"] = filter.ScopeID
|
|
|
|
|
}
|
|
|
|
|
total, err := r.collection.CountDocuments(ctx, query)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, 0, err
|
|
|
|
|
}
|
|
|
|
|
cursor, err := r.collection.Find(
|
|
|
|
|
ctx,
|
|
|
|
|
query,
|
|
|
|
|
options.Find().SetSort(bson.D{{Key: "next_run_at", Value: 1}}).SetSkip(offset).SetLimit(limit),
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, 0, err
|
|
|
|
|
}
|
|
|
|
|
defer cursor.Close(ctx)
|
|
|
|
|
var items []*entity.Schedule
|
|
|
|
|
if err := cursor.All(ctx, &items); err != nil {
|
|
|
|
|
return nil, 0, err
|
|
|
|
|
}
|
|
|
|
|
return items, total, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *mongoScheduleRepository) FindDue(ctx context.Context, now int64, limit int64) ([]*entity.Schedule, error) {
|
|
|
|
|
if r.collection == nil {
|
|
|
|
|
return nil, app.For(code.Job).DBUnavailable("Mongo is not configured")
|
|
|
|
|
}
|
|
|
|
|
if limit <= 0 {
|
|
|
|
|
limit = 50
|
|
|
|
|
}
|
|
|
|
|
cursor, err := r.collection.Find(ctx, bson.M{
|
|
|
|
|
"enabled": true,
|
|
|
|
|
"next_run_at": bson.M{"$lte": now},
|
|
|
|
|
}, options.Find().SetSort(bson.D{{Key: "next_run_at", Value: 1}}).SetLimit(limit))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
defer cursor.Close(ctx)
|
|
|
|
|
var items []*entity.Schedule
|
|
|
|
|
if err := cursor.All(ctx, &items); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return items, nil
|
|
|
|
|
}
|