222 lines
5.4 KiB
Go
222 lines
5.4 KiB
Go
package job
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log"
|
|
"time"
|
|
|
|
"haixun-backend/internal/library/clock"
|
|
"haixun-backend/internal/model/job/domain/entity"
|
|
"haixun-backend/internal/model/job/domain/enum"
|
|
domusecase "haixun-backend/internal/model/job/domain/usecase"
|
|
"haixun-backend/internal/model/job/resume"
|
|
)
|
|
|
|
type Runner struct {
|
|
workerID string
|
|
workerType string
|
|
jobs domusecase.UseCase
|
|
handlers map[string]StepHandler
|
|
}
|
|
|
|
type StepContext struct {
|
|
JobID string
|
|
WorkerID string
|
|
Run *entity.Run
|
|
Template *entity.Template
|
|
Step entity.TemplateStep
|
|
Heartbeat func(ctx context.Context) error
|
|
}
|
|
|
|
// StepHandler executes one template step. Returning a non-nil error makes the
// runner mark the step as failed and fail the run, unless the error is the
// package's cancellation sentinel, in which case the cancel is acknowledged.
type StepHandler func(ctx context.Context, step StepContext) error
|
|
|
|
func NewRunner(workerID, workerType string, jobs domusecase.UseCase) *Runner {
|
|
return &Runner{
|
|
workerID: workerID,
|
|
workerType: workerType,
|
|
jobs: jobs,
|
|
handlers: map[string]StepHandler{},
|
|
}
|
|
}
|
|
|
|
func (r *Runner) RegisterStepHandler(stepID string, handler StepHandler) {
|
|
if stepID == "" || handler == nil {
|
|
return
|
|
}
|
|
r.handlers[stepID] = handler
|
|
}
|
|
|
|
func (r *Runner) Start(ctx context.Context) {
|
|
log.Printf("job worker started: id=%s type=%s", r.workerID, r.workerType)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Printf("job worker stopped: id=%s", r.workerID)
|
|
return
|
|
default:
|
|
run, err := r.jobs.ClaimNext(ctx, domusecase.ClaimNextRequest{
|
|
WorkerType: r.workerType,
|
|
WorkerID: r.workerID,
|
|
})
|
|
if err != nil {
|
|
log.Printf("job worker claim error: %v", err)
|
|
time.Sleep(time.Second)
|
|
continue
|
|
}
|
|
if run == nil {
|
|
continue
|
|
}
|
|
r.execute(ctx, run)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Runner) execute(ctx context.Context, run *entity.Run) {
|
|
jobID := run.ID.Hex()
|
|
template, err := r.jobs.GetTemplate(ctx, run.TemplateType)
|
|
if err != nil {
|
|
_, _ = r.jobs.FailRun(ctx, domusecase.FailRunRequest{JobID: jobID, WorkerID: r.workerID, Error: err.Error()})
|
|
return
|
|
}
|
|
|
|
steps := make([]entity.StepProgress, len(run.Progress.Steps))
|
|
copy(steps, run.Progress.Steps)
|
|
totalSteps := len(template.Steps)
|
|
if totalSteps == 0 {
|
|
totalSteps = 1
|
|
}
|
|
completedBefore := resume.CountSucceededSteps(steps)
|
|
|
|
for _, step := range template.Steps {
|
|
existing := resume.StepProgressByID(steps, step.ID)
|
|
if existing != nil && resume.ShouldSkipStep(existing.Status) {
|
|
continue
|
|
}
|
|
|
|
if cancelled, _ := r.jobs.IsCancelRequested(ctx, jobID); cancelled {
|
|
_, _ = r.jobs.AcknowledgeCancel(ctx, domusecase.AcknowledgeCancelRequest{
|
|
JobID: jobID,
|
|
WorkerID: r.workerID,
|
|
})
|
|
return
|
|
}
|
|
|
|
now := clock.NowUnixNano()
|
|
for i := range steps {
|
|
if steps[i].ID == step.ID {
|
|
steps[i].Status = enum.StepStatusRunning
|
|
steps[i].StartedAt = &now
|
|
steps[i].Message = "running"
|
|
}
|
|
}
|
|
percentage := (completedBefore * 100) / totalSteps
|
|
_, _ = r.jobs.UpdateProgress(ctx, domusecase.UpdateProgressRequest{
|
|
JobID: jobID,
|
|
WorkerID: r.workerID,
|
|
Phase: step.ID,
|
|
Summary: "running step " + step.ID,
|
|
Percentage: percentage,
|
|
Steps: steps,
|
|
})
|
|
|
|
if err := r.runStep(ctx, run, template, step); err != nil {
|
|
if err == errJobCancelled {
|
|
_, _ = r.jobs.AcknowledgeCancel(ctx, domusecase.AcknowledgeCancelRequest{
|
|
JobID: jobID,
|
|
WorkerID: r.workerID,
|
|
})
|
|
return
|
|
}
|
|
for i := range steps {
|
|
if steps[i].ID == step.ID {
|
|
steps[i].Status = enum.StepStatusFailed
|
|
ended := clock.NowUnixNano()
|
|
steps[i].EndedAt = &ended
|
|
steps[i].Message = err.Error()
|
|
}
|
|
}
|
|
_, _ = r.jobs.UpdateProgress(ctx, domusecase.UpdateProgressRequest{
|
|
JobID: jobID, WorkerID: r.workerID, Phase: step.ID, Summary: "step failed", Steps: steps,
|
|
})
|
|
_, _ = r.jobs.FailRun(ctx, domusecase.FailRunRequest{
|
|
JobID: jobID, WorkerID: r.workerID, Error: err.Error(), Phase: step.ID,
|
|
})
|
|
return
|
|
}
|
|
|
|
ended := clock.NowUnixNano()
|
|
for i := range steps {
|
|
if steps[i].ID == step.ID {
|
|
steps[i].Status = enum.StepStatusSucceeded
|
|
steps[i].EndedAt = &ended
|
|
steps[i].Message = "done"
|
|
}
|
|
}
|
|
completedBefore++
|
|
percentage = (completedBefore * 100) / totalSteps
|
|
_, _ = r.jobs.UpdateProgress(ctx, domusecase.UpdateProgressRequest{
|
|
JobID: jobID,
|
|
WorkerID: r.workerID,
|
|
Phase: step.ID,
|
|
Summary: "completed step " + step.ID,
|
|
Percentage: percentage,
|
|
Steps: steps,
|
|
})
|
|
}
|
|
|
|
_, _ = r.jobs.CompleteRun(ctx, domusecase.CompleteRunRequest{
|
|
JobID: jobID,
|
|
WorkerID: r.workerID,
|
|
Result: map[string]any{
|
|
"message": "demo long task completed",
|
|
"steps": len(template.Steps),
|
|
},
|
|
})
|
|
}
|
|
|
|
// errJobCancelled is the sentinel runStep returns when a cancel request is
// observed mid-step; execute reacts to it by acknowledging the cancel instead
// of failing the run.
var errJobCancelled = errors.New("job cancelled")
|
|
|
|
func (r *Runner) runStep(ctx context.Context, run *entity.Run, template *entity.Template, step entity.TemplateStep) error {
|
|
jobID := run.ID.Hex()
|
|
if handler := r.handlers[step.ID]; handler != nil {
|
|
return handler(ctx, StepContext{
|
|
JobID: jobID,
|
|
WorkerID: r.workerID,
|
|
Run: run,
|
|
Template: template,
|
|
Step: step,
|
|
Heartbeat: func(ctx context.Context) error {
|
|
return r.jobs.RefreshRunLock(ctx, jobID, r.workerID, 300)
|
|
},
|
|
})
|
|
}
|
|
ticks := 10
|
|
sleepEach := 500 * time.Millisecond
|
|
if step.ID == "execute" {
|
|
ticks = 20
|
|
}
|
|
|
|
for i := 0; i < ticks; i++ {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
if err := r.jobs.RefreshRunLock(ctx, jobID, r.workerID, 300); err != nil {
|
|
return err
|
|
}
|
|
cancelled, err := r.jobs.IsCancelRequested(ctx, jobID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if cancelled {
|
|
return errJobCancelled
|
|
}
|
|
|
|
time.Sleep(sleepEach)
|
|
}
|
|
return nil
|
|
}
|