thread-master/backend/internal/worker/job/runner.go

232 lines
5.9 KiB
Go
Raw Permalink Normal View History

2026-06-26 08:37:04 +00:00
package job
import (
"context"
"errors"
"log"
"time"
"haixun-backend/internal/library/clock"
"haixun-backend/internal/model/job/domain/entity"
"haixun-backend/internal/model/job/domain/enum"
domusecase "haixun-backend/internal/model/job/domain/usecase"
"haixun-backend/internal/model/job/resume"
)
// Runner claims job runs through the job use case and executes the steps of
// each run's template, dispatching a step to its registered StepHandler when
// one exists.
type Runner struct {
	// workerID is sent with every call this worker makes to the job use case.
	workerID string
	// workerType is sent with each claim request.
	workerType string
	// jobs is the job use case used to claim runs and report their progress.
	jobs domusecase.UseCase
	// handlers maps a template step ID to the handler that executes it.
	handlers map[string]StepHandler
}
// StepContext is the value handed to a StepHandler for one step execution.
type StepContext struct {
	// JobID is the hex form of the run's ID.
	JobID string
	// WorkerID is the ID of the worker executing the step.
	WorkerID string
	// Run is the claimed run the step belongs to.
	Run *entity.Run
	// Template is the template whose steps are being executed.
	Template *entity.Template
	// Step is the template step to execute.
	Step entity.TemplateStep
	// Heartbeat refreshes the run lock held by this worker. The Runner does
	// not call it while a handler is running, so calling it is up to the
	// handler.
	Heartbeat func(ctx context.Context) error
}
// StepHandler executes one template step. The error it returns is passed back
// unchanged by Runner.runStep; nil means the step succeeded.
type StepHandler func(ctx context.Context, step StepContext) error
// NewRunner builds a Runner bound to the given worker identity and job use
// case. The returned Runner has an empty handler table; add handlers with
// RegisterStepHandler.
func NewRunner(workerID, workerType string, jobs domusecase.UseCase) *Runner {
	runner := new(Runner)
	runner.workerID = workerID
	runner.workerType = workerType
	runner.jobs = jobs
	runner.handlers = make(map[string]StepHandler)
	return runner
}
// RegisterStepHandler associates handler with the template step stepID,
// replacing any handler previously registered for that ID. A call with an
// empty stepID or a nil handler is ignored.
//
// The handler table is accessed without locking.
// NOTE(review): presumably all handlers are registered before Start runs —
// confirm against callers.
func (r *Runner) RegisterStepHandler(stepID string, handler StepHandler) {
	if stepID == "" || handler == nil {
		return
	}
	if r.handlers == nil {
		// A Runner that was not built by NewRunner has a nil map, and
		// assigning into a nil map panics.
		r.handlers = make(map[string]StepHandler)
	}
	r.handlers[stepID] = handler
}
// Start runs the claim/execute loop and blocks until ctx is cancelled.
//
// Each iteration asks the job use case for the next run for this worker type.
// A claim error is logged and retried after one second; the wait is cut short
// when ctx is cancelled so shutdown is not delayed. A claimed run is executed
// synchronously before the next claim is attempted.
func (r *Runner) Start(ctx context.Context) {
	log.Printf("job worker started: id=%s type=%s", r.workerID, r.workerType)
	for {
		// Single exit point: every path below loops back here.
		select {
		case <-ctx.Done():
			log.Printf("job worker stopped: id=%s", r.workerID)
			return
		default:
		}

		run, err := r.jobs.ClaimNext(ctx, domusecase.ClaimNextRequest{
			WorkerType: r.workerType,
			WorkerID:   r.workerID,
		})
		if err != nil {
			log.Printf("job worker claim error: %v", err)
			// Back off before retrying, but wake immediately on shutdown.
			retry := time.NewTimer(time.Second)
			select {
			case <-ctx.Done():
				retry.Stop()
			case <-retry.C:
			}
			continue
		}
		if run == nil {
			// NOTE(review): nothing was claimed and the loop retries with no
			// delay. Unless ClaimNext itself blocks while the queue is empty,
			// this spins — confirm and add an idle backoff if needed.
			continue
		}
		r.execute(ctx, run)
	}
}
func (r *Runner) execute(ctx context.Context, run *entity.Run) {
jobID := run.ID.Hex()
template, err := r.jobs.GetTemplate(ctx, run.TemplateType)
if err != nil {
_, _ = r.jobs.FailRun(ctx, domusecase.FailRunRequest{JobID: jobID, WorkerID: r.workerID, Error: err.Error()})
return
}
steps := make([]entity.StepProgress, len(run.Progress.Steps))
copy(steps, run.Progress.Steps)
totalSteps := len(template.Steps)
if totalSteps == 0 {
totalSteps = 1
}
completedBefore := resume.CountSucceededSteps(steps)
for _, step := range template.Steps {
existing := resume.StepProgressByID(steps, step.ID)
if existing != nil && resume.ShouldSkipStep(existing.Status) {
continue
}
if cancelled, _ := r.jobs.IsCancelRequested(ctx, jobID); cancelled {
_, _ = r.jobs.AcknowledgeCancel(ctx, domusecase.AcknowledgeCancelRequest{
JobID: jobID,
WorkerID: r.workerID,
})
return
}
now := clock.NowUnixNano()
for i := range steps {
if steps[i].ID == step.ID {
steps[i].Status = enum.StepStatusRunning
steps[i].StartedAt = &now
steps[i].Message = "running"
}
}
percentage := (completedBefore * 100) / totalSteps
2026-06-26 16:02:06 +00:00
if _, err := r.jobs.UpdateProgress(ctx, domusecase.UpdateProgressRequest{
2026-06-26 08:37:04 +00:00
JobID: jobID,
WorkerID: r.workerID,
Phase: step.ID,
Summary: "running step " + step.ID,
Percentage: percentage,
Steps: steps,
2026-06-26 16:02:06 +00:00
}); err != nil {
log.Printf("job worker update progress (running) failed: job=%s step=%s err=%v", jobID, step.ID, err)
}
2026-06-26 08:37:04 +00:00
if err := r.runStep(ctx, run, template, step); err != nil {
if err == errJobCancelled {
_, _ = r.jobs.AcknowledgeCancel(ctx, domusecase.AcknowledgeCancelRequest{
JobID: jobID,
WorkerID: r.workerID,
})
return
}
for i := range steps {
if steps[i].ID == step.ID {
steps[i].Status = enum.StepStatusFailed
ended := clock.NowUnixNano()
steps[i].EndedAt = &ended
steps[i].Message = err.Error()
}
}
_, _ = r.jobs.UpdateProgress(ctx, domusecase.UpdateProgressRequest{
JobID: jobID, WorkerID: r.workerID, Phase: step.ID, Summary: "step failed", Steps: steps,
})
_, _ = r.jobs.FailRun(ctx, domusecase.FailRunRequest{
JobID: jobID, WorkerID: r.workerID, Error: err.Error(), Phase: step.ID,
})
return
}
ended := clock.NowUnixNano()
for i := range steps {
if steps[i].ID == step.ID {
steps[i].Status = enum.StepStatusSucceeded
steps[i].EndedAt = &ended
steps[i].Message = "done"
}
}
completedBefore++
percentage = (completedBefore * 100) / totalSteps
2026-06-26 16:02:06 +00:00
if _, err := r.jobs.UpdateProgress(ctx, domusecase.UpdateProgressRequest{
2026-06-26 08:37:04 +00:00
JobID: jobID,
WorkerID: r.workerID,
Phase: step.ID,
Summary: "completed step " + step.ID,
Percentage: percentage,
Steps: steps,
2026-06-26 16:02:06 +00:00
}); err != nil {
log.Printf("job worker update progress (completed step) failed: job=%s step=%s err=%v", jobID, step.ID, err)
}
2026-06-26 08:37:04 +00:00
}
fresh, err := r.jobs.GetRun(ctx, jobID)
if err == nil && fresh != nil && fresh.Status.IsTerminal() {
return
}
2026-06-26 16:02:06 +00:00
if _, err := r.jobs.CompleteRun(ctx, domusecase.CompleteRunRequest{
2026-06-26 08:37:04 +00:00
JobID: jobID,
WorkerID: r.workerID,
Result: map[string]any{
2026-06-26 16:02:06 +00:00
"template": run.TemplateType,
"steps": len(template.Steps),
2026-06-26 08:37:04 +00:00
},
2026-06-26 16:02:06 +00:00
}); err != nil {
log.Printf("job worker complete run failed: job=%s err=%v", jobID, err)
}
2026-06-26 08:37:04 +00:00
}
// errJobCancelled is returned by runStep when it observes a cancel request
// for the job. execute responds by acknowledging the cancel instead of
// marking the step as failed.
var errJobCancelled = errors.New("job cancelled")
// runStep executes a single template step and returns its error, if any.
//
// When a handler is registered for step.ID, the handler is called with a
// StepContext whose Heartbeat refreshes the run lock, and its result is
// returned unchanged.
//
// Without a handler no step-specific work is done: the method loops for a
// fixed number of ticks (20 for the "execute" step, 10 otherwise), and on each
// tick refreshes the run lock, polls the cancel flag and waits 500ms. It
// returns errJobCancelled when a cancel is requested, ctx.Err() when ctx is
// cancelled, or the first error from the lock refresh or cancel check.
// NOTE(review): the handler-less path looks like a placeholder — confirm.
func (r *Runner) runStep(ctx context.Context, run *entity.Run, template *entity.Template, step entity.TemplateStep) error {
	const (
		// Lock lifetimes passed to RefreshRunLock.
		// NOTE(review): presumably seconds — confirm against RefreshRunLock.
		handlerLockTTL  = 600
		fallbackLockTTL = 300

		defaultTicks = 10
		executeTicks = 20
		tickInterval = 500 * time.Millisecond
	)

	jobID := run.ID.Hex()
	if handler := r.handlers[step.ID]; handler != nil {
		return handler(ctx, StepContext{
			JobID:    jobID,
			WorkerID: r.workerID,
			Run:      run,
			Template: template,
			Step:     step,
			Heartbeat: func(ctx context.Context) error {
				return r.jobs.RefreshRunLock(ctx, jobID, r.workerID, handlerLockTTL)
			},
		})
	}

	ticks := defaultTicks
	if step.ID == "execute" {
		ticks = executeTicks
	}
	for i := 0; i < ticks; i++ {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := r.jobs.RefreshRunLock(ctx, jobID, r.workerID, fallbackLockTTL); err != nil {
			return err
		}
		cancelled, err := r.jobs.IsCancelRequested(ctx, jobID)
		if err != nil {
			return err
		}
		if cancelled {
			return errJobCancelled
		}

		// Wait one tick, but return at once if ctx is cancelled meanwhile.
		wait := time.NewTimer(tickInterval)
		select {
		case <-ctx.Done():
			wait.Stop()
			return ctx.Err()
		case <-wait.C:
		}
	}
	return nil
}