816 lines
25 KiB
Go
816 lines
25 KiB
Go
|
|
package usecase
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"strings"
|
||
|
|
|
||
|
|
"haixun-backend/internal/library/clock"
|
||
|
|
app "haixun-backend/internal/library/errors"
|
||
|
|
"haixun-backend/internal/library/errors/code"
|
||
|
|
"haixun-backend/internal/model/job/domain/entity"
|
||
|
|
"haixun-backend/internal/model/job/domain/enum"
|
||
|
|
domrepo "haixun-backend/internal/model/job/domain/repository"
|
||
|
|
domusecase "haixun-backend/internal/model/job/domain/usecase"
|
||
|
|
)
|
||
|
|
|
||
|
|
// Type identifiers of the built-in job templates. Each one is the Type of the
// template produced by the matching builder function in this file.
const (
	demoTemplateType               = "demo_long_task"
	style8DTemplateType            = "style-8d"
	expandGraphTemplateType        = "expand-graph"
	placementScanTemplateType      = "placement-scan"
	scanViralTemplateType          = "scan-viral"
	analyzeCopyMissionTemplateType = "analyze-copy-mission"
	generateCopyMatrixTemplateType = "generate-copy-matrix"
	generateCopyDraftTemplateType  = "generate-copy-draft"
)

// style8DWorkerType is the worker type assigned to the Threads-facing steps
// (session, samples) of the 8D style template; its other steps use the Go
// worker type.
const style8DWorkerType = "node"
|
||
|
|
|
||
|
|
// UseCase is a type alias (not a new type) for domusecase.UseCase, re-exported
// from this package; NewUseCase returns an implementation of it.
type UseCase = domusecase.UseCase
|
||
|
|
|
||
|
|
type jobUseCase struct {
|
||
|
|
templates domrepo.TemplateRepository
|
||
|
|
runs domrepo.RunRepository
|
||
|
|
schedules domrepo.ScheduleRepository
|
||
|
|
events domrepo.EventRepository
|
||
|
|
queue domrepo.QueueRepository
|
||
|
|
}
|
||
|
|
|
||
|
|
func NewUseCase(
|
||
|
|
templates domrepo.TemplateRepository,
|
||
|
|
runs domrepo.RunRepository,
|
||
|
|
schedules domrepo.ScheduleRepository,
|
||
|
|
events domrepo.EventRepository,
|
||
|
|
queue domrepo.QueueRepository,
|
||
|
|
) domusecase.UseCase {
|
||
|
|
return &jobUseCase{
|
||
|
|
templates: templates,
|
||
|
|
runs: runs,
|
||
|
|
schedules: schedules,
|
||
|
|
events: events,
|
||
|
|
queue: queue,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) ListTemplates(ctx context.Context) ([]*entity.Template, error) {
|
||
|
|
return u.templates.List(ctx)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) GetTemplate(ctx context.Context, templateType string) (*entity.Template, error) {
|
||
|
|
return u.templates.FindByType(ctx, templateType)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) EnsureDemoTemplate(ctx context.Context) error {
|
||
|
|
_, err := u.templates.Upsert(ctx, demoTemplate())
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) EnsureStyle8DTemplate(ctx context.Context) error {
|
||
|
|
_, err := u.templates.Upsert(ctx, style8DTemplate())
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) EnsureExpandGraphTemplate(ctx context.Context) error {
|
||
|
|
_, err := u.templates.Upsert(ctx, expandGraphTemplate())
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) EnsurePlacementScanTemplate(ctx context.Context) error {
|
||
|
|
_, err := u.templates.Upsert(ctx, placementScanTemplate())
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) EnsureScanViralTemplate(ctx context.Context) error {
|
||
|
|
_, err := u.templates.Upsert(ctx, scanViralTemplate())
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) EnsureAnalyzeCopyMissionTemplate(ctx context.Context) error {
|
||
|
|
_, err := u.templates.Upsert(ctx, analyzeCopyMissionTemplate())
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) EnsureGenerateCopyMatrixTemplate(ctx context.Context) error {
|
||
|
|
_, err := u.templates.Upsert(ctx, generateCopyMatrixTemplate())
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) EnsureGenerateCopyDraftTemplate(ctx context.Context) error {
|
||
|
|
_, err := u.templates.Upsert(ctx, generateCopyDraftTemplate())
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
func analyzeCopyMissionTemplate() *entity.Template {
|
||
|
|
return &entity.Template{
|
||
|
|
Type: analyzeCopyMissionTemplateType,
|
||
|
|
Version: 1,
|
||
|
|
Name: "Analyze Copy Mission",
|
||
|
|
Description: "LLM research map and default search tags for a copy ninja mission",
|
||
|
|
Enabled: true,
|
||
|
|
Repeatable: true,
|
||
|
|
ConcurrencyPolicy: string(enum.ConcurrencyRejectSameScope),
|
||
|
|
DedupeKeys: []string{"scope_id"},
|
||
|
|
TimeoutSeconds: 300,
|
||
|
|
CancelPolicy: entity.CancelPolicy{
|
||
|
|
Supported: true,
|
||
|
|
Mode: "cooperative",
|
||
|
|
GraceSeconds: 30,
|
||
|
|
},
|
||
|
|
RetryPolicy: entity.RetryPolicy{
|
||
|
|
MaxAttempts: 1,
|
||
|
|
BackoffSeconds: []int{},
|
||
|
|
},
|
||
|
|
Steps: []entity.TemplateStep{
|
||
|
|
{ID: "copy_mission_map", Name: "Copy mission research map", WorkerType: string(enum.WorkerTypeGo), TimeoutSeconds: 300, Cancelable: true},
|
||
|
|
},
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func generateCopyMatrixTemplate() *entity.Template {
|
||
|
|
return &entity.Template{
|
||
|
|
Type: generateCopyMatrixTemplateType,
|
||
|
|
Version: 1,
|
||
|
|
Name: "Generate Copy Matrix",
|
||
|
|
Description: "LLM content matrix drafts for a copy ninja mission",
|
||
|
|
Enabled: true,
|
||
|
|
Repeatable: true,
|
||
|
|
ConcurrencyPolicy: string(enum.ConcurrencyRejectSameScope),
|
||
|
|
DedupeKeys: []string{"scope_id"},
|
||
|
|
TimeoutSeconds: 600,
|
||
|
|
CancelPolicy: entity.CancelPolicy{
|
||
|
|
Supported: true,
|
||
|
|
Mode: "cooperative",
|
||
|
|
GraceSeconds: 30,
|
||
|
|
},
|
||
|
|
RetryPolicy: entity.RetryPolicy{
|
||
|
|
MaxAttempts: 1,
|
||
|
|
BackoffSeconds: []int{},
|
||
|
|
},
|
||
|
|
Steps: []entity.TemplateStep{
|
||
|
|
{ID: "copy_matrix_generate", Name: "Copy matrix generate", WorkerType: string(enum.WorkerTypeGo), TimeoutSeconds: 600, Cancelable: true},
|
||
|
|
},
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func generateCopyDraftTemplate() *entity.Template {
|
||
|
|
return &entity.Template{
|
||
|
|
Type: generateCopyDraftTemplateType,
|
||
|
|
Version: 1,
|
||
|
|
Name: "Generate Copy Draft",
|
||
|
|
Description: "Deep replicate draft from a viral scan post",
|
||
|
|
Enabled: true,
|
||
|
|
Repeatable: true,
|
||
|
|
ConcurrencyPolicy: string(enum.ConcurrencyAllowParallel),
|
||
|
|
DedupeKeys: []string{"scan_post_id"},
|
||
|
|
TimeoutSeconds: 600,
|
||
|
|
CancelPolicy: entity.CancelPolicy{
|
||
|
|
Supported: true,
|
||
|
|
Mode: "cooperative",
|
||
|
|
GraceSeconds: 30,
|
||
|
|
},
|
||
|
|
RetryPolicy: entity.RetryPolicy{
|
||
|
|
MaxAttempts: 1,
|
||
|
|
BackoffSeconds: []int{},
|
||
|
|
},
|
||
|
|
Steps: []entity.TemplateStep{
|
||
|
|
{ID: "copy_draft_generate", Name: "Copy draft generate", WorkerType: string(enum.WorkerTypeGo), TimeoutSeconds: 600, Cancelable: true},
|
||
|
|
},
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func scanViralTemplate() *entity.Template {
|
||
|
|
return &entity.Template{
|
||
|
|
Type: scanViralTemplateType,
|
||
|
|
Version: 1,
|
||
|
|
Name: "Viral Threads Scan",
|
||
|
|
Description: "Keyword crawl for copy-ninja viral candidates (Flow A)",
|
||
|
|
Enabled: true,
|
||
|
|
Repeatable: true,
|
||
|
|
ConcurrencyPolicy: string(enum.ConcurrencyRejectSameScope),
|
||
|
|
DedupeKeys: []string{"scope_id"},
|
||
|
|
TimeoutSeconds: 600,
|
||
|
|
CancelPolicy: entity.CancelPolicy{
|
||
|
|
Supported: true,
|
||
|
|
Mode: "cooperative",
|
||
|
|
GraceSeconds: 30,
|
||
|
|
},
|
||
|
|
RetryPolicy: entity.RetryPolicy{
|
||
|
|
MaxAttempts: 1,
|
||
|
|
BackoffSeconds: []int{},
|
||
|
|
},
|
||
|
|
Steps: []entity.TemplateStep{
|
||
|
|
{ID: "viral_crawl", Name: "Viral keyword crawl", WorkerType: string(enum.WorkerTypeGo), TimeoutSeconds: 600, Cancelable: true},
|
||
|
|
},
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func placementScanTemplate() *entity.Template {
|
||
|
|
return &entity.Template{
|
||
|
|
Type: placementScanTemplateType,
|
||
|
|
Version: 1,
|
||
|
|
Name: "Placement Dual-Track Scan",
|
||
|
|
Description: "Brave/Threads API dual-track crawl for selected knowledge graph tags",
|
||
|
|
Enabled: true,
|
||
|
|
Repeatable: true,
|
||
|
|
ConcurrencyPolicy: string(enum.ConcurrencyRejectSameScope),
|
||
|
|
DedupeKeys: []string{"scope_id"},
|
||
|
|
TimeoutSeconds: 7200,
|
||
|
|
CancelPolicy: entity.CancelPolicy{
|
||
|
|
Supported: true,
|
||
|
|
Mode: "cooperative",
|
||
|
|
GraceSeconds: 30,
|
||
|
|
},
|
||
|
|
RetryPolicy: entity.RetryPolicy{
|
||
|
|
MaxAttempts: 1,
|
||
|
|
BackoffSeconds: []int{},
|
||
|
|
},
|
||
|
|
Steps: []entity.TemplateStep{
|
||
|
|
{ID: "crawl", Name: "Dual-track crawl", WorkerType: string(enum.WorkerTypeGo), TimeoutSeconds: 7200, Cancelable: true},
|
||
|
|
},
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func expandGraphTemplate() *entity.Template {
|
||
|
|
return &entity.Template{
|
||
|
|
Type: expandGraphTemplateType,
|
||
|
|
Version: 1,
|
||
|
|
Name: "Topic Knowledge Graph Expand",
|
||
|
|
Description: "Brave knowledge_expand + AI synthesis for placement research tags",
|
||
|
|
Enabled: true,
|
||
|
|
Repeatable: true,
|
||
|
|
ConcurrencyPolicy: string(enum.ConcurrencyRejectSameScope),
|
||
|
|
DedupeKeys: []string{"scope_id", "seed_query"},
|
||
|
|
TimeoutSeconds: 600,
|
||
|
|
CancelPolicy: entity.CancelPolicy{
|
||
|
|
Supported: true,
|
||
|
|
Mode: "cooperative",
|
||
|
|
GraceSeconds: 30,
|
||
|
|
},
|
||
|
|
RetryPolicy: entity.RetryPolicy{
|
||
|
|
MaxAttempts: 1,
|
||
|
|
BackoffSeconds: []int{},
|
||
|
|
},
|
||
|
|
Steps: []entity.TemplateStep{
|
||
|
|
{ID: "expand", Name: "Expand knowledge graph", WorkerType: string(enum.WorkerTypeGo), TimeoutSeconds: 600, Cancelable: true},
|
||
|
|
},
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func style8DTemplate() *entity.Template {
|
||
|
|
return &entity.Template{
|
||
|
|
Type: style8DTemplateType,
|
||
|
|
Version: 1,
|
||
|
|
Name: "Threads 8D Style Analysis",
|
||
|
|
Description: "Scrape benchmark posts and analyze D1-D8 style profile for a Threads account",
|
||
|
|
Enabled: true,
|
||
|
|
Repeatable: true,
|
||
|
|
ConcurrencyPolicy: string(enum.ConcurrencyRejectSameScope),
|
||
|
|
DedupeKeys: []string{"scope_id", "benchmark_username"},
|
||
|
|
TimeoutSeconds: 480,
|
||
|
|
CancelPolicy: entity.CancelPolicy{
|
||
|
|
Supported: true,
|
||
|
|
Mode: "cooperative",
|
||
|
|
GraceSeconds: 30,
|
||
|
|
},
|
||
|
|
RetryPolicy: entity.RetryPolicy{
|
||
|
|
MaxAttempts: 1,
|
||
|
|
BackoffSeconds: []int{},
|
||
|
|
},
|
||
|
|
Steps: []entity.TemplateStep{
|
||
|
|
{ID: "session", Name: "Confirm Threads connection", WorkerType: style8DWorkerType, TimeoutSeconds: 60, Cancelable: true},
|
||
|
|
{ID: "samples", Name: "Fetch recent posts", WorkerType: style8DWorkerType, TimeoutSeconds: 180, Cancelable: true},
|
||
|
|
{ID: "style", Name: "AI 8D analysis", WorkerType: string(enum.WorkerTypeGo), TimeoutSeconds: 240, Cancelable: true},
|
||
|
|
{ID: "store", Name: "Save persona strategy", WorkerType: string(enum.WorkerTypeGo), TimeoutSeconds: 60, Cancelable: false},
|
||
|
|
},
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func demoTemplate() *entity.Template {
|
||
|
|
return &entity.Template{
|
||
|
|
Type: demoTemplateType,
|
||
|
|
Version: 1,
|
||
|
|
Name: "Demo Long Task",
|
||
|
|
Description: "Demonstrates job progress, cancel, retry, and worker cooperative stop",
|
||
|
|
Enabled: true,
|
||
|
|
Repeatable: true,
|
||
|
|
ConcurrencyPolicy: string(enum.ConcurrencyRejectSameScope),
|
||
|
|
DedupeKeys: []string{"scope_id"},
|
||
|
|
TimeoutSeconds: 600,
|
||
|
|
CancelPolicy: entity.CancelPolicy{
|
||
|
|
Supported: true,
|
||
|
|
Mode: "cooperative",
|
||
|
|
GraceSeconds: 30,
|
||
|
|
},
|
||
|
|
RetryPolicy: entity.RetryPolicy{
|
||
|
|
MaxAttempts: 2,
|
||
|
|
BackoffSeconds: []int{30, 120},
|
||
|
|
},
|
||
|
|
Steps: []entity.TemplateStep{
|
||
|
|
{ID: "prepare", Name: "Prepare data", WorkerType: string(enum.WorkerTypeGo), TimeoutSeconds: 60, Cancelable: true},
|
||
|
|
{ID: "execute", Name: "Execute task", WorkerType: string(enum.WorkerTypeGo), TimeoutSeconds: 300, Cancelable: true},
|
||
|
|
{ID: "finalize", Name: "Finalize result", WorkerType: string(enum.WorkerTypeGo), TimeoutSeconds: 30, Cancelable: false},
|
||
|
|
},
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) CreateRun(ctx context.Context, req domusecase.CreateRunRequest) (*entity.Run, error) {
|
||
|
|
templateType := strings.TrimSpace(req.TemplateType)
|
||
|
|
scope := strings.TrimSpace(req.Scope)
|
||
|
|
scopeID := strings.TrimSpace(req.ScopeID)
|
||
|
|
if templateType == "" || scope == "" || scopeID == "" {
|
||
|
|
return nil, app.For(code.Job).InputMissingRequired("template_type, scope, and scope_id are required")
|
||
|
|
}
|
||
|
|
|
||
|
|
template, err := u.templates.FindByType(ctx, templateType)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
if !template.Enabled {
|
||
|
|
return nil, app.For(code.Job).ResInvalidState("job template is disabled")
|
||
|
|
}
|
||
|
|
|
||
|
|
if err := u.enforceConcurrency(ctx, template, scope, scopeID); err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
|
||
|
|
dedupeKey := buildDedupeKey(template, scope, scopeID, req.Payload)
|
||
|
|
if err := u.enforceDedupe(ctx, template, dedupeKey); err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
|
||
|
|
tenantID, ownerUID := extractRunActor(req)
|
||
|
|
run := &entity.Run{
|
||
|
|
TenantID: tenantID,
|
||
|
|
OwnerUID: ownerUID,
|
||
|
|
TemplateType: template.Type,
|
||
|
|
TemplateVersion: template.Version,
|
||
|
|
Scope: scope,
|
||
|
|
ScopeID: scopeID,
|
||
|
|
Status: enum.RunStatusPending,
|
||
|
|
Phase: firstPhaseFromTemplate(template),
|
||
|
|
WorkerType: workerTypeFromTemplate(template),
|
||
|
|
Payload: req.Payload,
|
||
|
|
Progress: entity.RunProgress{
|
||
|
|
Summary: "waiting for worker",
|
||
|
|
Percentage: 0,
|
||
|
|
Steps: buildStepProgress(template),
|
||
|
|
},
|
||
|
|
Attempt: 1,
|
||
|
|
MaxAttempts: maxAttemptsOrDefault(template.RetryPolicy.MaxAttempts),
|
||
|
|
DedupeKey: dedupeKey,
|
||
|
|
}
|
||
|
|
|
||
|
|
created, err := u.runs.Create(ctx, run)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
jobID := created.ID.Hex()
|
||
|
|
if err := u.acquireDedupeLease(ctx, template, dedupeKey, jobID); err != nil {
|
||
|
|
created.Status = enum.RunStatusCancelled
|
||
|
|
_, _ = u.runs.Update(ctx, created)
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
|
||
|
|
return u.enqueueRun(ctx, created)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) enforceConcurrency(ctx context.Context, template *entity.Template, scope, scopeID string) error {
|
||
|
|
active, err := u.runs.FindActiveByScope(ctx, template.Type, scope, scopeID)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
if len(active) == 0 {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
switch enum.ConcurrencyPolicy(template.ConcurrencyPolicy) {
|
||
|
|
case enum.ConcurrencyAllowParallel:
|
||
|
|
return nil
|
||
|
|
case enum.ConcurrencyRejectSameScope:
|
||
|
|
return app.For(code.Job).ResInvalidState("此範圍已有進行中的任務,請等待任務結束(含取消完成)後再試")
|
||
|
|
case enum.ConcurrencyReplaceExisting:
|
||
|
|
for _, run := range active {
|
||
|
|
if _, err := u.RequestCancel(ctx, domusecase.CancelRunRequest{
|
||
|
|
JobID: run.ID.Hex(),
|
||
|
|
Reason: "replaced by new job",
|
||
|
|
}); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
default:
|
||
|
|
return app.For(code.Job).ResInvalidState("此範圍已有進行中的任務,請等待任務結束(含取消完成)後再試")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) GetRun(ctx context.Context, jobID string) (*entity.Run, error) {
|
||
|
|
return u.runs.FindByID(ctx, jobID)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) ListRuns(ctx context.Context, filter domrepo.RunListFilter, page, pageSize int64) ([]*entity.Run, int64, int64, int64, int64, error) {
|
||
|
|
if page <= 0 {
|
||
|
|
page = 1
|
||
|
|
}
|
||
|
|
if pageSize <= 0 {
|
||
|
|
pageSize = 20
|
||
|
|
}
|
||
|
|
if pageSize > 200 {
|
||
|
|
pageSize = 200
|
||
|
|
}
|
||
|
|
offset := (page - 1) * pageSize
|
||
|
|
items, total, err := u.runs.List(ctx, filter, offset, pageSize)
|
||
|
|
if err != nil {
|
||
|
|
return nil, 0, 0, 0, 0, err
|
||
|
|
}
|
||
|
|
totalPages := int64(0)
|
||
|
|
if total > 0 {
|
||
|
|
totalPages = (total + pageSize - 1) / pageSize
|
||
|
|
}
|
||
|
|
return items, total, page, pageSize, totalPages, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) RequestCancel(ctx context.Context, req domusecase.CancelRunRequest) (*entity.Run, error) {
|
||
|
|
jobID := strings.TrimSpace(req.JobID)
|
||
|
|
if jobID == "" {
|
||
|
|
return nil, app.For(code.Job).InputMissingRequired("job id is required")
|
||
|
|
}
|
||
|
|
|
||
|
|
run, err := u.runs.FindByID(ctx, jobID)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
if !run.Status.IsCancellable() {
|
||
|
|
return nil, app.For(code.Job).ResInvalidState("job cannot be cancelled in current status")
|
||
|
|
}
|
||
|
|
|
||
|
|
template, err := u.templates.FindByType(ctx, run.TemplateType)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
if !template.CancelPolicy.Supported {
|
||
|
|
return nil, app.For(code.Job).ResInvalidState("job template does not support cancel")
|
||
|
|
}
|
||
|
|
|
||
|
|
reason := strings.TrimSpace(req.Reason)
|
||
|
|
if reason == "" {
|
||
|
|
reason = "user requested cancel"
|
||
|
|
}
|
||
|
|
now := clock.NowUnixNano()
|
||
|
|
fromStatus := string(run.Status)
|
||
|
|
|
||
|
|
switch run.Status {
|
||
|
|
case enum.RunStatusPending, enum.RunStatusQueued:
|
||
|
|
_ = u.queue.RemoveJob(ctx, run.WorkerType, jobID)
|
||
|
|
run.Status = enum.RunStatusCancelled
|
||
|
|
run.CancelReason = reason
|
||
|
|
run.CompletedAt = &now
|
||
|
|
updated, err := u.runs.UpdateIfStatus(ctx, run, []enum.RunStatus{
|
||
|
|
enum.RunStatusPending,
|
||
|
|
enum.RunStatusQueued,
|
||
|
|
})
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
u.releaseDedupe(ctx, updated)
|
||
|
|
_ = u.appendEvent(ctx, jobID, "status_changed", fromStatus, string(enum.RunStatusCancelled), "job cancelled before execution", map[string]any{"reason": reason})
|
||
|
|
return updated, nil
|
||
|
|
|
||
|
|
case enum.RunStatusRunning, enum.RunStatusWaitingWorker:
|
||
|
|
run.Status = enum.RunStatusCancelRequested
|
||
|
|
run.CancelRequestedAt = &now
|
||
|
|
run.CancelReason = reason
|
||
|
|
updated, err := u.runs.UpdateIfStatus(ctx, run, []enum.RunStatus{
|
||
|
|
enum.RunStatusRunning,
|
||
|
|
enum.RunStatusWaitingWorker,
|
||
|
|
})
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
// Poke running worker via Redis cancel signal.
|
||
|
|
if err := u.queue.SetCancelSignal(ctx, jobID, reason); err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
_ = u.appendEvent(ctx, jobID, "cancel_requested", fromStatus, string(enum.RunStatusCancelRequested), "cancel signal sent to worker", map[string]any{"reason": reason})
|
||
|
|
return updated, nil
|
||
|
|
|
||
|
|
default:
|
||
|
|
return nil, app.For(code.Job).ResInvalidState("job cannot be cancelled in current status")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) RetryRun(ctx context.Context, jobID string) (*entity.Run, error) {
|
||
|
|
run, err := u.runs.FindByID(ctx, jobID)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
if run.Status != enum.RunStatusFailed &&
|
||
|
|
run.Status != enum.RunStatusCancelled &&
|
||
|
|
run.Status != enum.RunStatusExpired {
|
||
|
|
return nil, app.For(code.Job).ResInvalidState("only failed, cancelled, or expired jobs can be retried")
|
||
|
|
}
|
||
|
|
if run.Attempt >= run.MaxAttempts {
|
||
|
|
return nil, app.For(code.Job).ResInvalidState("job has reached max attempts")
|
||
|
|
}
|
||
|
|
|
||
|
|
fromStatus := string(run.Status)
|
||
|
|
template, err := u.templates.FindByType(ctx, run.TemplateType)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
if err := u.acquireDedupeLease(ctx, template, run.DedupeKey, jobID); err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
|
||
|
|
run.Status = enum.RunStatusPending
|
||
|
|
run.Progress.Steps = prepareStepsForRetry(run.Progress.Steps)
|
||
|
|
run.Phase = firstResumablePhase(run.Progress.Steps)
|
||
|
|
run.Error = ""
|
||
|
|
run.Result = nil
|
||
|
|
run.Attempt++
|
||
|
|
run.LockedBy = ""
|
||
|
|
run.LockedUntil = nil
|
||
|
|
run.CancelRequestedAt = nil
|
||
|
|
run.CancelReason = ""
|
||
|
|
run.StartedAt = nil
|
||
|
|
run.CompletedAt = nil
|
||
|
|
run.Progress.Summary = "waiting for retry"
|
||
|
|
run.Progress.Percentage = calcProgressPercentage(run.Progress.Steps)
|
||
|
|
|
||
|
|
run.ScheduledAt = nil
|
||
|
|
updated, err := u.runs.UpdateIfStatus(ctx, run, []enum.RunStatus{
|
||
|
|
enum.RunStatusFailed,
|
||
|
|
enum.RunStatusCancelled,
|
||
|
|
enum.RunStatusExpired,
|
||
|
|
})
|
||
|
|
if err != nil {
|
||
|
|
u.releaseDedupe(ctx, run)
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
|
||
|
|
backoff := retryBackoffSeconds(template, updated.Attempt)
|
||
|
|
if backoff > 0 {
|
||
|
|
scheduled := clock.AddSecondsFromNow(backoff)
|
||
|
|
updated.ScheduledAt = &scheduled
|
||
|
|
updated, err = u.runs.Update(ctx, updated)
|
||
|
|
if err != nil {
|
||
|
|
u.releaseDedupe(ctx, updated)
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
_ = u.appendEvent(ctx, jobID, "retried", fromStatus, string(enum.RunStatusPending), "job scheduled for retry", map[string]any{"attempt": updated.Attempt, "backoff_seconds": backoff})
|
||
|
|
return updated, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
updated, err = u.enqueueRun(ctx, updated)
|
||
|
|
if err != nil {
|
||
|
|
u.releaseDedupe(ctx, updated)
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
_ = u.appendEvent(ctx, jobID, "retried", fromStatus, string(enum.RunStatusQueued), "job requeued", map[string]any{"attempt": updated.Attempt})
|
||
|
|
return updated, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) ClaimNext(ctx context.Context, req domusecase.ClaimNextRequest) (*entity.Run, error) {
|
||
|
|
workerType := strings.TrimSpace(req.WorkerType)
|
||
|
|
workerID := strings.TrimSpace(req.WorkerID)
|
||
|
|
if workerType == "" || workerID == "" {
|
||
|
|
return nil, app.For(code.Job).InputMissingRequired("worker_type and worker_id are required")
|
||
|
|
}
|
||
|
|
|
||
|
|
for {
|
||
|
|
jobID, err := u.queue.Dequeue(ctx, workerType, 2)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
if jobID == "" {
|
||
|
|
return nil, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
run, err := u.runs.FindByID(ctx, jobID)
|
||
|
|
if err != nil {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
if run.Status.IsTerminal() || run.Status == enum.RunStatusCancelRequested {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
|
||
|
|
ok, err := u.queue.TryLock(ctx, jobID, workerID, 600)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
if !ok {
|
||
|
|
_ = u.queue.Enqueue(ctx, workerType, jobID)
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
|
||
|
|
cancelled, _, err := u.queue.GetCancelSignal(ctx, jobID)
|
||
|
|
if err != nil {
|
||
|
|
_ = u.queue.ReleaseLock(ctx, jobID, workerID)
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
if cancelled || run.Status == enum.RunStatusCancelRequested {
|
||
|
|
_ = u.queue.ReleaseLock(ctx, jobID, workerID)
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
|
||
|
|
now := clock.NowUnixNano()
|
||
|
|
fromStatus := string(run.Status)
|
||
|
|
lockUntil := now + clock.SecondsToNanos(600)
|
||
|
|
run.Status = enum.RunStatusRunning
|
||
|
|
run.LockedBy = workerID
|
||
|
|
run.LockedUntil = &lockUntil
|
||
|
|
run.StartedAt = &now
|
||
|
|
run.Progress.Summary = "worker claimed job"
|
||
|
|
|
||
|
|
updated, err := u.runs.UpdateIfStatus(ctx, run, []enum.RunStatus{
|
||
|
|
enum.RunStatusPending,
|
||
|
|
enum.RunStatusQueued,
|
||
|
|
})
|
||
|
|
if err != nil {
|
||
|
|
_ = u.queue.ReleaseLock(ctx, jobID, workerID)
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
_ = u.appendEvent(ctx, jobID, "status_changed", fromStatus, string(enum.RunStatusRunning), "worker claimed job", map[string]any{"worker_id": workerID})
|
||
|
|
return updated, nil
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) RefreshRunLock(ctx context.Context, jobID, workerID string, ttlSeconds int) error {
|
||
|
|
jobID = strings.TrimSpace(jobID)
|
||
|
|
workerID = strings.TrimSpace(workerID)
|
||
|
|
if jobID == "" || workerID == "" {
|
||
|
|
return app.For(code.Job).InputMissingRequired("job id and worker id are required")
|
||
|
|
}
|
||
|
|
if ttlSeconds <= 0 {
|
||
|
|
ttlSeconds = 300
|
||
|
|
}
|
||
|
|
if err := u.queue.RefreshLock(ctx, jobID, workerID, ttlSeconds); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
run, err := u.runs.FindByID(ctx, jobID)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
if run.LockedBy != workerID {
|
||
|
|
return app.For(code.Job).ResInvalidState("job is locked by another worker")
|
||
|
|
}
|
||
|
|
lockUntil := clock.NowUnixNano() + clock.SecondsToNanos(ttlSeconds)
|
||
|
|
run.LockedUntil = &lockUntil
|
||
|
|
_, err = u.runs.UpdateIfLocked(ctx, run, workerID, []enum.RunStatus{
|
||
|
|
enum.RunStatusRunning,
|
||
|
|
enum.RunStatusWaitingWorker,
|
||
|
|
})
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) IsCancelRequested(ctx context.Context, jobID string) (bool, error) {
|
||
|
|
run, err := u.runs.FindByID(ctx, jobID)
|
||
|
|
if err != nil {
|
||
|
|
return false, err
|
||
|
|
}
|
||
|
|
if run.Status == enum.RunStatusCancelRequested || run.Status == enum.RunStatusCancelled {
|
||
|
|
return true, nil
|
||
|
|
}
|
||
|
|
requested, _, err := u.queue.GetCancelSignal(ctx, jobID)
|
||
|
|
return requested, err
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) AcknowledgeCancel(ctx context.Context, req domusecase.AcknowledgeCancelRequest) (*entity.Run, error) {
|
||
|
|
jobID := strings.TrimSpace(req.JobID)
|
||
|
|
workerID := strings.TrimSpace(req.WorkerID)
|
||
|
|
if jobID == "" || workerID == "" {
|
||
|
|
return nil, app.For(code.Job).InputMissingRequired("job id and worker id are required")
|
||
|
|
}
|
||
|
|
|
||
|
|
run, err := u.runs.FindByID(ctx, jobID)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
if run.LockedBy != "" && run.LockedBy != workerID {
|
||
|
|
return nil, app.For(code.Job).ResInvalidState("job is locked by another worker")
|
||
|
|
}
|
||
|
|
|
||
|
|
now := clock.NowUnixNano()
|
||
|
|
fromStatus := string(run.Status)
|
||
|
|
run.Status = enum.RunStatusCancelled
|
||
|
|
run.CompletedAt = &now
|
||
|
|
run.LockedBy = ""
|
||
|
|
run.LockedUntil = nil
|
||
|
|
run.Progress.Summary = "job cancelled"
|
||
|
|
|
||
|
|
for i := range run.Progress.Steps {
|
||
|
|
if run.Progress.Steps[i].Status == enum.StepStatusRunning {
|
||
|
|
run.Progress.Steps[i].Status = enum.StepStatusCancelled
|
||
|
|
run.Progress.Steps[i].EndedAt = &now
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
updated, err := u.runs.UpdateIfLocked(ctx, run, workerID, []enum.RunStatus{
|
||
|
|
enum.RunStatusRunning,
|
||
|
|
enum.RunStatusCancelRequested,
|
||
|
|
})
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
_ = u.queue.ReleaseLock(ctx, jobID, workerID)
|
||
|
|
_ = u.queue.ClearCancelSignal(ctx, jobID)
|
||
|
|
u.releaseDedupe(ctx, updated)
|
||
|
|
_ = u.appendEvent(ctx, jobID, "status_changed", fromStatus, string(enum.RunStatusCancelled), "worker acknowledged cancel", map[string]any{"worker_id": workerID})
|
||
|
|
return updated, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) UpdateProgress(ctx context.Context, req domusecase.UpdateProgressRequest) (*entity.Run, error) {
|
||
|
|
run, err := u.runs.FindByID(ctx, req.JobID)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
if req.Phase != "" {
|
||
|
|
run.Phase = req.Phase
|
||
|
|
}
|
||
|
|
if req.Summary != "" {
|
||
|
|
run.Progress.Summary = req.Summary
|
||
|
|
}
|
||
|
|
if req.Percentage >= 0 {
|
||
|
|
run.Progress.Percentage = req.Percentage
|
||
|
|
}
|
||
|
|
if len(req.Steps) > 0 {
|
||
|
|
run.Progress.Steps = req.Steps
|
||
|
|
}
|
||
|
|
if strings.TrimSpace(req.WorkerID) != "" && run.LockedBy == req.WorkerID {
|
||
|
|
lockUntil := clock.NowUnixNano() + clock.SecondsToNanos(600)
|
||
|
|
run.LockedUntil = &lockUntil
|
||
|
|
}
|
||
|
|
return u.runs.UpdateIfLocked(ctx, run, req.WorkerID, []enum.RunStatus{enum.RunStatusRunning})
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) CompleteRun(ctx context.Context, req domusecase.CompleteRunRequest) (*entity.Run, error) {
|
||
|
|
run, err := u.runs.FindByID(ctx, req.JobID)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
workerID := strings.TrimSpace(req.WorkerID)
|
||
|
|
if workerID == "" {
|
||
|
|
workerID = run.LockedBy
|
||
|
|
}
|
||
|
|
now := clock.NowUnixNano()
|
||
|
|
fromStatus := string(run.Status)
|
||
|
|
run.Status = enum.RunStatusSucceeded
|
||
|
|
run.Result = req.Result
|
||
|
|
run.CompletedAt = &now
|
||
|
|
run.Progress.Summary = "job completed"
|
||
|
|
run.Progress.Percentage = 100
|
||
|
|
run.LockedBy = ""
|
||
|
|
run.LockedUntil = nil
|
||
|
|
|
||
|
|
updated, err := u.runs.UpdateIfLocked(ctx, run, workerID, []enum.RunStatus{enum.RunStatusRunning})
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
_ = u.queue.ReleaseLock(ctx, req.JobID, workerID)
|
||
|
|
_ = u.queue.ClearCancelSignal(ctx, req.JobID)
|
||
|
|
u.releaseDedupe(ctx, updated)
|
||
|
|
_ = u.appendEvent(ctx, req.JobID, "status_changed", fromStatus, string(enum.RunStatusSucceeded), "job completed", nil)
|
||
|
|
return updated, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (u *jobUseCase) FailRun(ctx context.Context, req domusecase.FailRunRequest) (*entity.Run, error) {
|
||
|
|
run, err := u.runs.FindByID(ctx, req.JobID)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
workerID := strings.TrimSpace(req.WorkerID)
|
||
|
|
if workerID == "" {
|
||
|
|
workerID = run.LockedBy
|
||
|
|
}
|
||
|
|
now := clock.NowUnixNano()
|
||
|
|
fromStatus := string(run.Status)
|
||
|
|
run.Status = enum.RunStatusFailed
|
||
|
|
run.Error = req.Error
|
||
|
|
if req.Phase != "" {
|
||
|
|
run.Phase = req.Phase
|
||
|
|
}
|
||
|
|
run.CompletedAt = &now
|
||
|
|
run.Progress.Summary = "job failed"
|
||
|
|
run.Progress.Percentage = calcProgressPercentage(run.Progress.Steps)
|
||
|
|
run.LockedBy = ""
|
||
|
|
run.LockedUntil = nil
|
||
|
|
|
||
|
|
updated, err := u.runs.UpdateIfLocked(ctx, run, workerID, []enum.RunStatus{
|
||
|
|
enum.RunStatusRunning,
|
||
|
|
enum.RunStatusCancelRequested,
|
||
|
|
})
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
_ = u.queue.ReleaseLock(ctx, req.JobID, workerID)
|
||
|
|
_ = u.queue.ClearCancelSignal(ctx, req.JobID)
|
||
|
|
u.releaseDedupe(ctx, updated)
|
||
|
|
_ = u.appendEvent(ctx, req.JobID, "status_changed", fromStatus, string(enum.RunStatusFailed), req.Error, nil)
|
||
|
|
return updated, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// maxAttemptsOrDefault returns maxAttempts when it is positive and 1
// otherwise, so every run gets at least its initial attempt.
func maxAttemptsOrDefault(maxAttempts int) int {
	if maxAttempts > 0 {
		return maxAttempts
	}
	return 1
}
|