thread-master/backend/internal/model/job/usecase/schedule.go

182 lines
5.1 KiB
Go
Raw Permalink Normal View History

2026-06-26 08:37:04 +00:00
package usecase
import (
"context"
"strings"
"haixun-backend/internal/library/clock"
app "haixun-backend/internal/library/errors"
"haixun-backend/internal/library/errors/code"
jobcron "haixun-backend/internal/model/job/cron"
"haixun-backend/internal/model/job/domain/entity"
domrepo "haixun-backend/internal/model/job/domain/repository"
domusecase "haixun-backend/internal/model/job/domain/usecase"
)
// ListSchedules returns one page of schedules filtered by scope and scopeID.
//
// A non-positive page is treated as 1. A non-positive pageSize becomes 20 and
// anything above 200 is capped at 200. The results are, in order: the page of
// schedules, the total reported by the repository, the effective page, the
// effective page size, and the number of pages (0 when the total is 0).
// A repository error is returned unchanged with all other results zeroed.
func (u *jobUseCase) ListSchedules(ctx context.Context, scope, scopeID string, page, pageSize int64) ([]*entity.Schedule, int64, int64, int64, int64, error) {
	if page < 1 {
		page = 1
	}
	switch {
	case pageSize < 1:
		pageSize = 20
	case pageSize > 200:
		pageSize = 200
	}

	filter := domrepo.ScheduleListFilter{Scope: scope, ScopeID: scopeID}
	schedules, count, err := u.schedules.List(ctx, filter, (page-1)*pageSize, pageSize)
	if err != nil {
		return nil, 0, 0, 0, 0, err
	}

	var pages int64
	if count > 0 {
		// Ceiling division: a partially filled last page still counts.
		pages = (count + pageSize - 1) / pageSize
	}
	return schedules, count, page, pageSize, pages, nil
}
// GetSchedule looks up a single schedule by ID in the schedule repository and
// hands back the repository's result and error unchanged.
func (u *jobUseCase) GetSchedule(ctx context.Context, scheduleID string) (*entity.Schedule, error) {
	found, err := u.schedules.FindByID(ctx, scheduleID)
	return found, err
}
// CreateSchedule validates req, computes the first run time, and persists a
// new schedule through the schedule repository.
//
// TemplateType, Scope, ScopeID, Cron, and Timezone are whitespace-trimmed
// before use. An empty Timezone (after trimming) defaults to "UTC".
//
// Errors:
//   - InputMissingRequired when template type, scope, scope ID, or cron is
//     empty after trimming.
//   - whatever templates.FindByType returns when the template lookup fails.
//   - InputInvalidFormat carrying the message from jobcron.NextRunAt when the
//     next run time cannot be computed from the cron expression and timezone.
//   - whatever schedules.Create returns.
func (u *jobUseCase) CreateSchedule(ctx context.Context, req domusecase.CreateScheduleRequest) (*entity.Schedule, error) {
	templateType := strings.TrimSpace(req.TemplateType)
	scope := strings.TrimSpace(req.Scope)
	scopeID := strings.TrimSpace(req.ScopeID)
	cronExpr := strings.TrimSpace(req.Cron)
	if templateType == "" || scope == "" || scopeID == "" || cronExpr == "" {
		return nil, app.For(code.Job).InputMissingRequired("template_type, scope, scope_id, and cron are required")
	}

	// The looked-up template itself is discarded; only the lookup error is
	// propagated.
	if _, err := u.templates.FindByType(ctx, templateType); err != nil {
		return nil, err
	}

	// Trim the timezone like every other string input (UpdateSchedule already
	// does) so a whitespace-only value falls back to the default and a padded
	// name is not passed on verbatim.
	timezone := strings.TrimSpace(req.Timezone)
	if timezone == "" {
		timezone = "UTC"
	}

	nextRunAt, err := jobcron.NextRunAt(cronExpr, timezone, clock.Now())
	if err != nil {
		return nil, app.For(code.Job).InputInvalidFormat(err.Error())
	}

	schedule := &entity.Schedule{
		TemplateType:    templateType,
		Scope:           scope,
		ScopeID:         scopeID,
		Enabled:         req.Enabled,
		Cron:            cronExpr,
		Timezone:        timezone,
		PayloadTemplate: req.PayloadTemplate,
		NextRunAt:       nextRunAt,
	}
	return u.schedules.Create(ctx, schedule)
}
// UpdateSchedule loads the schedule identified by req.ID, overlays the fields
// supplied in req, recomputes the next run time, and saves the result.
//
// Cron and Timezone are applied only when non-empty after trimming;
// PayloadTemplate and Enabled are applied only when non-nil. The next run time
// is always recalculated from the resulting cron and timezone. A failure
// there is reported as InputInvalidFormat; lookup and save errors are returned
// as-is.
func (u *jobUseCase) UpdateSchedule(ctx context.Context, req domusecase.UpdateScheduleRequest) (*entity.Schedule, error) {
	current, err := u.schedules.FindByID(ctx, req.ID)
	if err != nil {
		return nil, err
	}

	// Overlay only the fields the caller actually provided.
	newCron := strings.TrimSpace(req.Cron)
	if newCron != "" {
		current.Cron = newCron
	}
	newTimezone := strings.TrimSpace(req.Timezone)
	if newTimezone != "" {
		current.Timezone = newTimezone
	}
	if req.PayloadTemplate != nil {
		current.PayloadTemplate = req.PayloadTemplate
	}
	if req.Enabled != nil {
		current.Enabled = *req.Enabled
	}

	next, err := jobcron.NextRunAt(current.Cron, current.Timezone, clock.Now())
	if err != nil {
		return nil, app.For(code.Job).InputInvalidFormat(err.Error())
	}
	current.NextRunAt = next

	return u.schedules.Update(ctx, current)
}
// EnableSchedule marks the schedule with the given ID as enabled, recomputes
// its next run time from the stored cron and timezone relative to the current
// clock time, and saves it.
//
// Lookup and save errors are returned as-is; a failure to compute the next
// run time is reported as InputInvalidFormat.
func (u *jobUseCase) EnableSchedule(ctx context.Context, scheduleID string) (*entity.Schedule, error) {
	target, err := u.schedules.FindByID(ctx, scheduleID)
	if err != nil {
		return nil, err
	}
	target.Enabled = true

	next, calcErr := jobcron.NextRunAt(target.Cron, target.Timezone, clock.Now())
	if calcErr != nil {
		return nil, app.For(code.Job).InputInvalidFormat(calcErr.Error())
	}
	target.NextRunAt = next

	return u.schedules.Update(ctx, target)
}
// DisableSchedule clears the Enabled flag on the schedule with the given ID
// and saves it. No other field is modified here. Lookup and save errors are
// returned as-is.
func (u *jobUseCase) DisableSchedule(ctx context.Context, scheduleID string) (*entity.Schedule, error) {
	target, findErr := u.schedules.FindByID(ctx, scheduleID)
	if findErr != nil {
		return nil, findErr
	}

	target.Enabled = false
	return u.schedules.Update(ctx, target)
}
// DeleteSchedule removes the schedule with the given ID.
//
// A blank (empty or whitespace-only) ID yields InputMissingRequired. The
// schedule is looked up first, so a lookup failure is returned before any
// delete is attempted; otherwise the repository's Delete result is returned.
func (u *jobUseCase) DeleteSchedule(ctx context.Context, scheduleID string) error {
	if len(strings.TrimSpace(scheduleID)) == 0 {
		return app.For(code.Job).InputMissingRequired("schedule id is required")
	}

	// Only the lookup error is of interest; the schedule itself is not used.
	_, err := u.schedules.FindByID(ctx, scheduleID)
	if err != nil {
		return err
	}

	return u.schedules.Delete(ctx, scheduleID)
}
// RunSchedulerTick performs one scheduler pass on behalf of holder: it tries
// to take the scheduler lock, fetches up to 50 due schedules, creates a run for
// each, and advances each schedule's LastRunAt/NextRunAt.
//
// It returns the number of runs created and the first error encountered while
// processing schedules. A failure on one schedule does not stop the pass; the
// remaining schedules are still processed. When the lock is not obtained the
// result is (0, nil). Errors from acquiring the lock or from fetching due
// schedules abort the pass and are returned directly.
func (u *jobUseCase) RunSchedulerTick(ctx context.Context, holder string) (int, error) {
	// NOTE(review): 55 is the lock duration argument; presumably seconds, just
	// under a one-minute tick interval. Confirm against the queue
	// implementation.
	ok, err := u.queue.TrySchedulerLock(ctx, holder, 55)
	if err != nil {
		return 0, err
	}
	if !ok {
		return 0, nil
	}
	// Best-effort release: a failure here is deliberately ignored.
	defer func() { _ = u.queue.ReleaseSchedulerLock(ctx, holder) }()

	now := clock.NowUnixNano()
	due, err := u.schedules.FindDue(ctx, now, 50)
	if err != nil {
		return 0, err
	}

	created := 0
	var firstErr error
	// recordErr keeps only the first failure so the pass can continue while
	// the caller still learns that something went wrong.
	recordErr := func(e error) {
		if firstErr == nil {
			firstErr = e
		}
	}

	for _, schedule := range due {
		payload := buildScheduleRunPayload(schedule, now)
		if _, err := u.CreateRun(ctx, domusecase.CreateRunRequest{
			TemplateType: schedule.TemplateType,
			Scope:        schedule.Scope,
			ScopeID:      schedule.ScopeID,
			Payload:      payload,
		}); err != nil {
			recordErr(err)
			continue
		}
		created++

		// Copy now so each schedule gets its own pointer target.
		lastRun := now
		schedule.LastRunAt = &lastRun

		nextRunAt, calcErr := jobcron.NextRunAt(schedule.Cron, schedule.Timezone, clock.Now())
		if calcErr != nil {
			// The run was created but the schedule cannot be advanced, so it
			// keeps its old NextRunAt. Surface this instead of dropping it.
			recordErr(calcErr)
			continue
		}
		schedule.NextRunAt = nextRunAt

		// A failed save also leaves the stored NextRunAt unchanged, so report
		// it rather than discarding the error.
		if _, err := u.schedules.Update(ctx, schedule); err != nil {
			recordErr(err)
		}
	}
	return created, firstErr
}