package usecase import ( "context" "strings" "haixun-backend/internal/library/clock" app "haixun-backend/internal/library/errors" "haixun-backend/internal/library/errors/code" "haixun-backend/internal/model/job/domain/entity" "haixun-backend/internal/model/job/domain/enum" domrepo "haixun-backend/internal/model/job/domain/repository" domusecase "haixun-backend/internal/model/job/domain/usecase" ) const demoTemplateType = "demo_long_task" type UseCase = domusecase.UseCase type jobUseCase struct { templates domrepo.TemplateRepository runs domrepo.RunRepository schedules domrepo.ScheduleRepository events domrepo.EventRepository queue domrepo.QueueRepository } func NewUseCase( templates domrepo.TemplateRepository, runs domrepo.RunRepository, schedules domrepo.ScheduleRepository, events domrepo.EventRepository, queue domrepo.QueueRepository, ) domusecase.UseCase { return &jobUseCase{ templates: templates, runs: runs, schedules: schedules, events: events, queue: queue, } } func (u *jobUseCase) ListTemplates(ctx context.Context) ([]*entity.Template, error) { return u.templates.List(ctx) } func (u *jobUseCase) GetTemplate(ctx context.Context, templateType string) (*entity.Template, error) { return u.templates.FindByType(ctx, templateType) } func (u *jobUseCase) EnsureDemoTemplate(ctx context.Context) error { _, err := u.templates.Upsert(ctx, demoTemplate()) return err } func demoTemplate() *entity.Template { return &entity.Template{ Type: demoTemplateType, Version: 1, Name: "Demo Long Task", Description: "Demonstrates job progress, cancel, retry, and worker cooperative stop", Enabled: true, Repeatable: true, ConcurrencyPolicy: string(enum.ConcurrencyRejectSameScope), DedupeKeys: []string{"scope_id"}, TimeoutSeconds: 600, CancelPolicy: entity.CancelPolicy{ Supported: true, Mode: "cooperative", GraceSeconds: 30, }, RetryPolicy: entity.RetryPolicy{ MaxAttempts: 2, BackoffSeconds: []int{30, 120}, }, Steps: []entity.TemplateStep{ {ID: "prepare", Name: "Prepare data", WorkerType: string(enum.WorkerTypeGo), TimeoutSeconds: 60, Cancelable: true}, {ID: "execute", Name: "Execute task", WorkerType: string(enum.WorkerTypeGo), TimeoutSeconds: 300, Cancelable: true}, {ID: "finalize", Name: "Finalize result", WorkerType: string(enum.WorkerTypeGo), TimeoutSeconds: 30, Cancelable: false}, }, } } func (u *jobUseCase) CreateRun(ctx context.Context, req domusecase.CreateRunRequest) (*entity.Run, error) { templateType := strings.TrimSpace(req.TemplateType) scope := strings.TrimSpace(req.Scope) scopeID := strings.TrimSpace(req.ScopeID) if templateType == "" || scope == "" || scopeID == "" { return nil, app.For(code.Job).InputMissingRequired("template_type, scope, and scope_id are required") } template, err := u.templates.FindByType(ctx, templateType) if err != nil { return nil, err } if !template.Enabled { return nil, app.For(code.Job).ResInvalidState("job template is disabled") } if err := u.enforceConcurrency(ctx, template, scope, scopeID); err != nil { return nil, err } dedupeKey := buildDedupeKey(template, scope, scopeID, req.Payload) if err := u.enforceDedupe(ctx, template, dedupeKey); err != nil { return nil, err } run := &entity.Run{ TemplateType: template.Type, TemplateVersion: template.Version, Scope: scope, ScopeID: scopeID, Status: enum.RunStatusPending, Phase: firstPhaseFromTemplate(template), WorkerType: workerTypeFromTemplate(template), Payload: req.Payload, Progress: entity.RunProgress{ Summary: "waiting for worker", Percentage: 0, Steps: buildStepProgress(template), }, Attempt: 1, MaxAttempts: maxAttemptsOrDefault(template.RetryPolicy.MaxAttempts), DedupeKey: dedupeKey, } created, err := u.runs.Create(ctx, run) if err != nil { return nil, err } jobID := created.ID.Hex() if err := u.acquireDedupeLease(ctx, template, dedupeKey, jobID); err != nil { created.Status = enum.RunStatusCancelled _, _ = u.runs.Update(ctx, created) return nil, err } return u.enqueueRun(ctx, created) } func (u *jobUseCase) enforceConcurrency(ctx context.Context, template *entity.Template, scope, scopeID string) error { active, err := u.runs.FindActiveByScope(ctx, template.Type, scope, scopeID) if err != nil { return err } if len(active) == 0 { return nil } switch enum.ConcurrencyPolicy(template.ConcurrencyPolicy) { case enum.ConcurrencyAllowParallel: return nil case enum.ConcurrencyRejectSameScope: return app.For(code.Job).ResInvalidState("an active job already exists for this scope") case enum.ConcurrencyReplaceExisting: for _, run := range active { if _, err := u.RequestCancel(ctx, domusecase.CancelRunRequest{ JobID: run.ID.Hex(), Reason: "replaced by new job", }); err != nil { return err } } return nil default: return app.For(code.Job).ResInvalidState("an active job already exists for this scope") } } func (u *jobUseCase) GetRun(ctx context.Context, jobID string) (*entity.Run, error) { return u.runs.FindByID(ctx, jobID) } func (u *jobUseCase) ListRuns(ctx context.Context, scope, scopeID string, page, pageSize int64) ([]*entity.Run, int64, int64, int64, int64, error) { if page <= 0 { page = 1 } if pageSize <= 0 { pageSize = 20 } if pageSize > 200 { pageSize = 200 } offset := (page - 1) * pageSize items, total, err := u.runs.List(ctx, domrepo.RunListFilter{ Scope: scope, ScopeID: scopeID, }, offset, pageSize) if err != nil { return nil, 0, 0, 0, 0, err } totalPages := int64(0) if total > 0 { totalPages = (total + pageSize - 1) / pageSize } return items, total, page, pageSize, totalPages, nil } func (u *jobUseCase) RequestCancel(ctx context.Context, req domusecase.CancelRunRequest) (*entity.Run, error) { jobID := strings.TrimSpace(req.JobID) if jobID == "" { return nil, app.For(code.Job).InputMissingRequired("job id is required") } run, err := u.runs.FindByID(ctx, jobID) if err != nil { return nil, err } if !run.Status.IsCancellable() { return nil, app.For(code.Job).ResInvalidState("job cannot be cancelled in current status") } template, err := u.templates.FindByType(ctx, run.TemplateType) if err != nil { return nil, err } if !template.CancelPolicy.Supported { return nil, app.For(code.Job).ResInvalidState("job template does not support cancel") } reason := strings.TrimSpace(req.Reason) if reason == "" { reason = "user requested cancel" } now := clock.NowUnixNano() fromStatus := string(run.Status) switch run.Status { case enum.RunStatusPending, enum.RunStatusQueued: _ = u.queue.RemoveJob(ctx, run.WorkerType, jobID) run.Status = enum.RunStatusCancelled run.CancelReason = reason run.CompletedAt = &now updated, err := u.runs.UpdateIfStatus(ctx, run, []enum.RunStatus{ enum.RunStatusPending, enum.RunStatusQueued, }) if err != nil { return nil, err } u.releaseDedupe(ctx, updated) _ = u.appendEvent(ctx, jobID, "status_changed", fromStatus, string(enum.RunStatusCancelled), "job cancelled before execution", map[string]any{"reason": reason}) return updated, nil case enum.RunStatusRunning, enum.RunStatusWaitingWorker: run.Status = enum.RunStatusCancelRequested run.CancelRequestedAt = &now run.CancelReason = reason updated, err := u.runs.UpdateIfStatus(ctx, run, []enum.RunStatus{ enum.RunStatusRunning, enum.RunStatusWaitingWorker, }) if err != nil { return nil, err } // Poke running worker via Redis cancel signal. if err := u.queue.SetCancelSignal(ctx, jobID, reason); err != nil { return nil, err } _ = u.appendEvent(ctx, jobID, "cancel_requested", fromStatus, string(enum.RunStatusCancelRequested), "cancel signal sent to worker", map[string]any{"reason": reason}) return updated, nil default: return nil, app.For(code.Job).ResInvalidState("job cannot be cancelled in current status") } } func (u *jobUseCase) RetryRun(ctx context.Context, jobID string) (*entity.Run, error) { run, err := u.runs.FindByID(ctx, jobID) if err != nil { return nil, err } if run.Status != enum.RunStatusFailed && run.Status != enum.RunStatusCancelled && run.Status != enum.RunStatusExpired { return nil, app.For(code.Job).ResInvalidState("only failed, cancelled, or expired jobs can be retried") } if run.Attempt >= run.MaxAttempts { return nil, app.For(code.Job).ResInvalidState("job has reached max attempts") } fromStatus := string(run.Status) template, err := u.templates.FindByType(ctx, run.TemplateType) if err != nil { return nil, err } if err := u.acquireDedupeLease(ctx, template, run.DedupeKey, jobID); err != nil { return nil, err } run.Status = enum.RunStatusPending run.Progress.Steps = prepareStepsForRetry(run.Progress.Steps) run.Phase = firstResumablePhase(run.Progress.Steps) run.Error = "" run.Result = nil run.Attempt++ run.LockedBy = "" run.LockedUntil = nil run.CancelRequestedAt = nil run.CancelReason = "" run.StartedAt = nil run.CompletedAt = nil run.Progress.Summary = "waiting for retry" run.Progress.Percentage = calcProgressPercentage(run.Progress.Steps) run.ScheduledAt = nil updated, err := u.runs.UpdateIfStatus(ctx, run, []enum.RunStatus{ enum.RunStatusFailed, enum.RunStatusCancelled, enum.RunStatusExpired, }) if err != nil { u.releaseDedupe(ctx, run) return nil, err } backoff := retryBackoffSeconds(template, updated.Attempt) if backoff > 0 { scheduled := clock.AddSecondsFromNow(backoff) updated.ScheduledAt = &scheduled updated, err = u.runs.Update(ctx, updated) if err != nil { u.releaseDedupe(ctx, updated) return nil, err } _ = u.appendEvent(ctx, jobID, "retried", fromStatus, string(enum.RunStatusPending), "job scheduled for retry", map[string]any{"attempt": updated.Attempt, "backoff_seconds": backoff}) return updated, nil } updated, err = u.enqueueRun(ctx, updated) if err != nil { u.releaseDedupe(ctx, updated) return nil, err } _ = u.appendEvent(ctx, jobID, "retried", fromStatus, string(enum.RunStatusQueued), "job requeued", map[string]any{"attempt": updated.Attempt}) return updated, nil } func (u *jobUseCase) ClaimNext(ctx context.Context, req domusecase.ClaimNextRequest) (*entity.Run, error) { workerType := strings.TrimSpace(req.WorkerType) workerID := strings.TrimSpace(req.WorkerID) if workerType == "" || workerID == "" { return nil, app.For(code.Job).InputMissingRequired("worker_type and worker_id are required") } for { jobID, err := u.queue.Dequeue(ctx, workerType, 2) if err != nil { return nil, err } if jobID == "" { return nil, nil } run, err := u.runs.FindByID(ctx, jobID) if err != nil { continue } if run.Status.IsTerminal() || run.Status == enum.RunStatusCancelRequested { continue } ok, err := u.queue.TryLock(ctx, jobID, workerID, 300) if err != nil { return nil, err } if !ok { _ = u.queue.Enqueue(ctx, workerType, jobID) continue } cancelled, _, err := u.queue.GetCancelSignal(ctx, jobID) if err != nil { _ = u.queue.ReleaseLock(ctx, jobID, workerID) return nil, err } if cancelled || run.Status == enum.RunStatusCancelRequested { _ = u.queue.ReleaseLock(ctx, jobID, workerID) continue } now := clock.NowUnixNano() fromStatus := string(run.Status) lockUntil := now + clock.SecondsToNanos(300) run.Status = enum.RunStatusRunning run.LockedBy = workerID run.LockedUntil = &lockUntil run.StartedAt = &now run.Progress.Summary = "worker claimed job" updated, err := u.runs.UpdateIfStatus(ctx, run, []enum.RunStatus{ enum.RunStatusPending, enum.RunStatusQueued, }) if err != nil { _ = u.queue.ReleaseLock(ctx, jobID, workerID) return nil, err } _ = u.appendEvent(ctx, jobID, "status_changed", fromStatus, string(enum.RunStatusRunning), "worker claimed job", map[string]any{"worker_id": workerID}) return updated, nil } } func (u *jobUseCase) RefreshRunLock(ctx context.Context, jobID, workerID string, ttlSeconds int) error { jobID = strings.TrimSpace(jobID) workerID = strings.TrimSpace(workerID) if jobID == "" || workerID == "" { return app.For(code.Job).InputMissingRequired("job id and worker id are required") } return u.queue.RefreshLock(ctx, jobID, workerID, ttlSeconds) } func (u *jobUseCase) IsCancelRequested(ctx context.Context, jobID string) (bool, error) { run, err := u.runs.FindByID(ctx, jobID) if err != nil { return false, err } if run.Status == enum.RunStatusCancelRequested || run.Status == enum.RunStatusCancelled { return true, nil } requested, _, err := u.queue.GetCancelSignal(ctx, jobID) return requested, err } func (u *jobUseCase) AcknowledgeCancel(ctx context.Context, req domusecase.AcknowledgeCancelRequest) (*entity.Run, error) { jobID := strings.TrimSpace(req.JobID) workerID := strings.TrimSpace(req.WorkerID) if jobID == "" || workerID == "" { return nil, app.For(code.Job).InputMissingRequired("job id and worker id are required") } run, err := u.runs.FindByID(ctx, jobID) if err != nil { return nil, err } if run.LockedBy != "" && run.LockedBy != workerID { return nil, app.For(code.Job).ResInvalidState("job is locked by another worker") } now := clock.NowUnixNano() fromStatus := string(run.Status) run.Status = enum.RunStatusCancelled run.CompletedAt = &now run.LockedBy = "" run.LockedUntil = nil run.Progress.Summary = "job cancelled" for i := range run.Progress.Steps { if run.Progress.Steps[i].Status == enum.StepStatusRunning { run.Progress.Steps[i].Status = enum.StepStatusCancelled run.Progress.Steps[i].EndedAt = &now } } updated, err := u.runs.UpdateIfLocked(ctx, run, workerID, []enum.RunStatus{ enum.RunStatusRunning, enum.RunStatusCancelRequested, }) if err != nil { return nil, err } _ = u.queue.ReleaseLock(ctx, jobID, workerID) _ = u.queue.ClearCancelSignal(ctx, jobID) u.releaseDedupe(ctx, updated) _ = u.appendEvent(ctx, jobID, "status_changed", fromStatus, string(enum.RunStatusCancelled), "worker acknowledged cancel", map[string]any{"worker_id": workerID}) return updated, nil } func (u *jobUseCase) UpdateProgress(ctx context.Context, req domusecase.UpdateProgressRequest) (*entity.Run, error) { run, err := u.runs.FindByID(ctx, req.JobID) if err != nil { return nil, err } if req.Phase != "" { run.Phase = req.Phase } if req.Summary != "" { run.Progress.Summary = req.Summary } if req.Percentage >= 0 { run.Progress.Percentage = req.Percentage } if len(req.Steps) > 0 { run.Progress.Steps = req.Steps } return u.runs.UpdateIfLocked(ctx, run, req.WorkerID, []enum.RunStatus{enum.RunStatusRunning}) } func (u *jobUseCase) CompleteRun(ctx context.Context, req domusecase.CompleteRunRequest) (*entity.Run, error) { run, err := u.runs.FindByID(ctx, req.JobID) if err != nil { return nil, err } workerID := strings.TrimSpace(req.WorkerID) if workerID == "" { workerID = run.LockedBy } now := clock.NowUnixNano() fromStatus := string(run.Status) run.Status = enum.RunStatusSucceeded run.Result = req.Result run.CompletedAt = &now run.Progress.Summary = "job completed" run.Progress.Percentage = 100 run.LockedBy = "" run.LockedUntil = nil updated, err := u.runs.UpdateIfLocked(ctx, run, workerID, []enum.RunStatus{enum.RunStatusRunning}) if err != nil { return nil, err } _ = u.queue.ReleaseLock(ctx, req.JobID, workerID) _ = u.queue.ClearCancelSignal(ctx, req.JobID) u.releaseDedupe(ctx, updated) _ = u.appendEvent(ctx, req.JobID, "status_changed", fromStatus, string(enum.RunStatusSucceeded), "job completed", nil) return updated, nil } func (u *jobUseCase) FailRun(ctx context.Context, req domusecase.FailRunRequest) (*entity.Run, error) { run, err := u.runs.FindByID(ctx, req.JobID) if err != nil { return nil, err } workerID := strings.TrimSpace(req.WorkerID) if workerID == "" { workerID = run.LockedBy } now := clock.NowUnixNano() fromStatus := string(run.Status) run.Status = enum.RunStatusFailed run.Error = req.Error if req.Phase != "" { run.Phase = req.Phase } run.CompletedAt = &now run.Progress.Summary = "job failed" run.Progress.Percentage = calcProgressPercentage(run.Progress.Steps) run.LockedBy = "" run.LockedUntil = nil updated, err := u.runs.UpdateIfLocked(ctx, run, workerID, []enum.RunStatus{ enum.RunStatusRunning, enum.RunStatusCancelRequested, }) if err != nil { return nil, err } _ = u.queue.ReleaseLock(ctx, req.JobID, workerID) _ = u.queue.ClearCancelSignal(ctx, req.JobID) u.releaseDedupe(ctx, updated) _ = u.appendEvent(ctx, req.JobID, "status_changed", fromStatus, string(enum.RunStatusFailed), req.Error, nil) return updated, nil } func maxAttemptsOrDefault(maxAttempts int) int { if maxAttempts <= 0 { return 1 } return maxAttempts }