181 lines
4.7 KiB
Go
181 lines
4.7 KiB
Go
package usecase
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"strings"
|
|
|
|
"haixun-backend/internal/library/clock"
|
|
app "haixun-backend/internal/library/errors"
|
|
"haixun-backend/internal/library/errors/code"
|
|
"haixun-backend/internal/model/job/domain/entity"
|
|
"haixun-backend/internal/model/job/domain/enum"
|
|
domusecase "haixun-backend/internal/model/job/domain/usecase"
|
|
)
|
|
|
|
func stringFromAny(value any) string {
|
|
switch v := value.(type) {
|
|
case string:
|
|
return strings.TrimSpace(v)
|
|
default:
|
|
return ""
|
|
}
|
|
}
|
|
|
|
func extractRunActor(req domusecase.CreateRunRequest) (tenantID, ownerUID string) {
|
|
tenantID = strings.TrimSpace(req.TenantID)
|
|
ownerUID = strings.TrimSpace(req.OwnerUID)
|
|
if req.Payload != nil {
|
|
if tenantID == "" {
|
|
tenantID = stringFromAny(req.Payload["tenant_id"])
|
|
}
|
|
if ownerUID == "" {
|
|
ownerUID = stringFromAny(req.Payload["owner_uid"])
|
|
}
|
|
}
|
|
scope := strings.TrimSpace(req.Scope)
|
|
if ownerUID == "" && scope == "user" {
|
|
ownerUID = strings.TrimSpace(req.ScopeID)
|
|
}
|
|
return tenantID, ownerUID
|
|
}
|
|
|
|
func buildDedupeKey(template *entity.Template, scope, scopeID string, payload map[string]any) string {
|
|
parts := []string{template.Type, scope}
|
|
for _, key := range template.DedupeKeys {
|
|
switch key {
|
|
case "scope_id":
|
|
parts = append(parts, scopeID)
|
|
case "target":
|
|
if payload != nil {
|
|
if target, ok := payload["target"].(string); ok {
|
|
parts = append(parts, target)
|
|
}
|
|
}
|
|
case "benchmark_username":
|
|
if payload != nil {
|
|
if username, ok := payload["benchmark_username"].(string); ok {
|
|
parts = append(parts, username)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
raw := strings.Join(parts, "|")
|
|
sum := sha256.Sum256([]byte(raw))
|
|
return hex.EncodeToString(sum[:])
|
|
}
|
|
|
|
func firstPhaseFromTemplateSteps(steps []entity.StepProgress) string {
|
|
if len(steps) == 0 {
|
|
return ""
|
|
}
|
|
return steps[0].ID
|
|
}
|
|
|
|
func firstPhaseFromTemplate(template *entity.Template) string {
|
|
if len(template.Steps) == 0 {
|
|
return ""
|
|
}
|
|
return template.Steps[0].ID
|
|
}
|
|
|
|
func buildStepProgress(template *entity.Template) []entity.StepProgress {
|
|
steps := make([]entity.StepProgress, 0, len(template.Steps))
|
|
for _, step := range template.Steps {
|
|
steps = append(steps, entity.StepProgress{ID: step.ID, Status: enum.StepStatusPending})
|
|
}
|
|
return steps
|
|
}
|
|
|
|
func workerTypeFromTemplate(template *entity.Template) string {
|
|
if len(template.Steps) > 0 && strings.TrimSpace(template.Steps[0].WorkerType) != "" {
|
|
return template.Steps[0].WorkerType
|
|
}
|
|
return string(enum.WorkerTypeGo)
|
|
}
|
|
|
|
func (u *jobUseCase) appendEvent(ctx context.Context, jobID, eventType, from, to, message string, metadata map[string]any) error {
|
|
return u.events.Append(ctx, &entity.Event{
|
|
JobID: jobID,
|
|
Type: eventType,
|
|
From: from,
|
|
To: to,
|
|
Message: message,
|
|
Metadata: metadata,
|
|
})
|
|
}
|
|
|
|
func (u *jobUseCase) enqueueRun(ctx context.Context, run *entity.Run) (*entity.Run, error) {
|
|
jobID := run.ID.Hex()
|
|
if run.ScheduledAt != nil && *run.ScheduledAt > clock.NowUnixNano() {
|
|
return run, nil
|
|
}
|
|
if err := u.queue.Enqueue(ctx, run.WorkerType, jobID); err != nil {
|
|
return nil, err
|
|
}
|
|
fromStatus := string(run.Status)
|
|
run.Status = enum.RunStatusQueued
|
|
updated, err := u.runs.Update(ctx, run)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
_ = u.appendEvent(ctx, jobID, "status_changed", fromStatus, string(enum.RunStatusQueued), "job enqueued", nil)
|
|
return updated, nil
|
|
}
|
|
|
|
func (u *jobUseCase) releaseDedupe(ctx context.Context, run *entity.Run) {
|
|
if run.DedupeKey == "" {
|
|
return
|
|
}
|
|
_ = u.queue.ReleaseDedupe(ctx, run.TemplateType, run.DedupeKey)
|
|
}
|
|
|
|
func (u *jobUseCase) enforceDedupe(ctx context.Context, template *entity.Template, dedupeKey string) error {
|
|
if dedupeKey == "" {
|
|
return nil
|
|
}
|
|
if !template.Repeatable {
|
|
existing, err := u.runs.FindSucceededByDedupeKey(ctx, template.Type, dedupeKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if existing != nil {
|
|
return app.For(code.Job).ResInvalidState("job already completed for dedupe key")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (u *jobUseCase) acquireDedupeLease(ctx context.Context, template *entity.Template, dedupeKey, jobID string) error {
|
|
if dedupeKey == "" {
|
|
return nil
|
|
}
|
|
ttl := template.TimeoutSeconds
|
|
if ttl <= 0 {
|
|
ttl = 3600
|
|
}
|
|
ok, err := u.queue.TryAcquireDedupe(ctx, template.Type, dedupeKey, jobID, ttl)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !ok {
|
|
return app.For(code.Job).ResInvalidState("duplicate job already in progress")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func retryBackoffSeconds(template *entity.Template, attempt int) int {
|
|
if len(template.RetryPolicy.BackoffSeconds) == 0 {
|
|
return 0
|
|
}
|
|
idx := attempt - 2
|
|
if idx < 0 {
|
|
idx = 0
|
|
}
|
|
if idx >= len(template.RetryPolicy.BackoffSeconds) {
|
|
idx = len(template.RetryPolicy.BackoffSeconds) - 1
|
|
}
|
|
return template.RetryPolicy.BackoffSeconds[idx]
|
|
}
|