thread-master/internal/model/job/usecase/maintenance.go

77 lines
2.2 KiB
Go
Raw Permalink Normal View History

2026-06-26 08:37:04 +00:00
package usecase
import (
"context"
"haixun-backend/internal/library/clock"
"haixun-backend/internal/model/job/domain/enum"
domusecase "haixun-backend/internal/model/job/domain/usecase"
)
// RunMaintenance performs one maintenance sweep over job runs and reports how
// many runs each phase handled.
//
// The sweep runs three phases in order:
//
//  1. Pending runs returned by FindPendingDue are handed to enqueueRun; each
//     successful enqueue increments EnqueuedPending.
//  2. Runs returned by FindCancelRequestedBefore are marked cancelled once the
//     grace period from their template's cancel policy has elapsed since
//     CancelRequestedAt; each one increments ReapedCancelGrace.
//  3. Runs returned by FindRunningTimedOut are marked expired; each one
//     increments ReapedExpiredLocks.
//
// A failing Find query aborts the sweep and returns the error together with the
// counts gathered so far. Failures that affect a single run (enqueue, template
// lookup, update) only skip that run.
func (u *jobUseCase) RunMaintenance(ctx context.Context) (domusecase.MaintenanceResult, error) {
	// Upper bound on how many runs each phase fetches per sweep.
	const batchSize = 50

	result := domusecase.MaintenanceResult{}

	// Phase 1: enqueue pending runs that are due.
	pending, err := u.runs.FindPendingDue(ctx, clock.NowUnixNano(), batchSize)
	if err != nil {
		return result, err
	}
	for _, run := range pending {
		// Best effort: a run that fails to enqueue is skipped and not counted.
		if _, err := u.enqueueRun(ctx, run); err == nil {
			result.EnqueuedPending++
		}
	}

	// A single timestamp is shared by both reaping phases so that every grace
	// comparison and completion time in this sweep uses the same instant.
	now := clock.NowUnixNano()

	// Phase 2: force-cancel runs whose cancel grace period has run out.
	cancelRuns, err := u.runs.FindCancelRequestedBefore(ctx, now, batchSize)
	if err != nil {
		return result, err
	}
	for _, run := range cancelRuns {
		// Without a request timestamp the grace period cannot be evaluated, so
		// skip the run before paying for the template lookup.
		if run.CancelRequestedAt == nil {
			continue
		}
		tmpl, err := u.templates.FindByType(ctx, run.TemplateType)
		if err != nil {
			continue
		}
		grace := clock.SecondsToNanos(tmpl.CancelPolicy.GraceSeconds)
		if now-*run.CancelRequestedAt < grace {
			continue
		}

		jobID := run.ID.Hex()
		fromStatus := string(run.Status)
		// Give each run its own copy of the timestamp so the stored pointers do
		// not alias one shared variable.
		completedAt := now
		run.Status = enum.RunStatusCancelled
		run.CompletedAt = &completedAt
		run.Progress.Summary = "cancelled after grace timeout"
		updated, err := u.runs.Update(ctx, run)
		if err != nil {
			continue
		}

		// The status change is already persisted; queue cleanup and the audit
		// event are best effort, so their errors are deliberately ignored.
		_ = u.queue.ReleaseLock(ctx, jobID, run.LockedBy)
		_ = u.queue.ClearCancelSignal(ctx, jobID)
		u.releaseDedupe(ctx, updated)
		_ = u.appendEvent(ctx, jobID, "status_changed", fromStatus, string(enum.RunStatusCancelled), "cancel grace timeout", nil)
		result.ReapedCancelGrace++
	}

	// Phase 3: expire running runs reported as timed out.
	expiredRuns, err := u.runs.FindRunningTimedOut(ctx, now, batchSize)
	if err != nil {
		return result, err
	}
	for _, run := range expiredRuns {
		jobID := run.ID.Hex()
		fromStatus := string(run.Status)
		// Per-run copy of the timestamp, for the same aliasing reason as above.
		completedAt := now
		run.Status = enum.RunStatusExpired
		run.CompletedAt = &completedAt
		run.Progress.Summary = "job lock expired"
		updated, err := u.runs.Update(ctx, run)
		if err != nil {
			continue
		}

		// The status change is already persisted; queue cleanup and the audit
		// event are best effort, so their errors are deliberately ignored.
		_ = u.queue.ReleaseLock(ctx, jobID, run.LockedBy)
		_ = u.queue.ClearCancelSignal(ctx, jobID)
		u.releaseDedupe(ctx, updated)
		_ = u.appendEvent(ctx, jobID, "status_changed", fromStatus, string(enum.RunStatusExpired), "job lock expired", nil)
		result.ReapedExpiredLocks++
	}

	return result, nil
}