// File: haixunMaster/haixun-backend/internal/model/job/usecase/helpers.go

package usecase
import (
"context"
"crypto/sha256"
"encoding/hex"
"strings"
"haixun-backend/internal/library/clock"
app "haixun-backend/internal/library/errors"
"haixun-backend/internal/library/errors/code"
"haixun-backend/internal/model/job/domain/entity"
"haixun-backend/internal/model/job/domain/enum"
)
func buildDedupeKey(template *entity.Template, scope, scopeID string, payload map[string]any) string {
parts := []string{template.Type, scope}
for _, key := range template.DedupeKeys {
switch key {
case "scope_id":
parts = append(parts, scopeID)
case "target":
if payload != nil {
if target, ok := payload["target"].(string); ok {
parts = append(parts, target)
}
}
2026-06-23 16:55:10 +00:00
case "benchmark_username":
if payload != nil {
if username, ok := payload["benchmark_username"].(string); ok {
parts = append(parts, username)
}
}
2026-06-23 09:54:27 +00:00
}
}
raw := strings.Join(parts, "|")
sum := sha256.Sum256([]byte(raw))
return hex.EncodeToString(sum[:])
}
// firstPhaseFromTemplateSteps returns the ID of the first element of steps,
// or the empty string when steps has no elements.
func firstPhaseFromTemplateSteps(steps []entity.StepProgress) string {
	var phase string
	if len(steps) > 0 {
		phase = steps[0].ID
	}
	return phase
}
// firstPhaseFromTemplate returns the ID of the template's first step, or the
// empty string when the template declares no steps.
func firstPhaseFromTemplate(template *entity.Template) string {
	if declared := template.Steps; len(declared) > 0 {
		return declared[0].ID
	}
	return ""
}
// buildStepProgress creates one StepProgress per template step, in template
// order, copying the step ID and setting the status to
// enum.StepStatusPending.
func buildStepProgress(template *entity.Template) []entity.StepProgress {
	progress := make([]entity.StepProgress, len(template.Steps))
	for i := range template.Steps {
		progress[i] = entity.StepProgress{
			ID:     template.Steps[i].ID,
			Status: enum.StepStatusPending,
		}
	}
	return progress
}
// workerTypeFromTemplate returns the WorkerType of the template's first step.
// When the template has no steps, or that WorkerType is empty or
// whitespace-only, it returns string(enum.WorkerTypeGo) instead. A non-blank
// WorkerType is returned exactly as stored, without trimming.
func workerTypeFromTemplate(template *entity.Template) string {
	fallback := string(enum.WorkerTypeGo)
	if len(template.Steps) == 0 {
		return fallback
	}
	declared := template.Steps[0].WorkerType
	if strings.TrimSpace(declared) == "" {
		return fallback
	}
	return declared
}
// appendEvent builds an entity.Event from the given fields and hands it to
// u.events.Append, returning whatever error Append reports.
func (u *jobUseCase) appendEvent(ctx context.Context, jobID, eventType, from, to, message string, metadata map[string]any) error {
	event := &entity.Event{
		JobID:    jobID,
		Type:     eventType,
		From:     from,
		To:       to,
		Message:  message,
		Metadata: metadata,
	}
	return u.events.Append(ctx, event)
}
// enqueueRun pushes run onto the queue for its worker type and marks it as
// queued.
//
// A run whose ScheduledAt is set and still later than clock.NowUnixNano() is
// returned unchanged without being enqueued. Otherwise the run's hex ID is
// enqueued, its Status is set to enum.RunStatusQueued, the run is saved via
// u.runs.Update, and a "status_changed" event is appended. The value returned
// by Update is what the caller receives. Errors from Enqueue or Update are
// returned with a nil run.
//
// NOTE(review): the enqueue happens before the status is persisted, so a
// failing Update returns an error after the job ID was already queued —
// confirm that consumers tolerate this.
func (u *jobUseCase) enqueueRun(ctx context.Context, run *entity.Run) (*entity.Run, error) {
	jobID := run.ID.Hex()

	// Not due yet: hand the run back untouched.
	if due := run.ScheduledAt; due != nil && *due > clock.NowUnixNano() {
		return run, nil
	}

	if err := u.queue.Enqueue(ctx, run.WorkerType, jobID); err != nil {
		return nil, err
	}

	previous := string(run.Status)
	run.Status = enum.RunStatusQueued
	stored, err := u.runs.Update(ctx, run)
	if err != nil {
		return nil, err
	}

	// The event append result is deliberately discarded: a failure to record
	// the event does not fail the enqueue.
	_ = u.appendEvent(ctx, jobID, "status_changed", previous, string(enum.RunStatusQueued), "job enqueued", nil)
	return stored, nil
}
// releaseDedupe asks the queue to release the dedupe entry identified by the
// run's TemplateType and DedupeKey. It does nothing when the run has no
// dedupe key, and the result of ReleaseDedupe is discarded.
func (u *jobUseCase) releaseDedupe(ctx context.Context, run *entity.Run) {
	if key := run.DedupeKey; key != "" {
		// Result intentionally ignored; this helper has no error return.
		_ = u.queue.ReleaseDedupe(ctx, run.TemplateType, key)
	}
}
// enforceDedupe rejects a new job when a non-repeatable template already has
// a succeeded run for the same dedupe key.
//
// It returns nil when dedupeKey is empty or template.Repeatable is true.
// Otherwise it queries u.runs.FindSucceededByDedupeKey: a lookup error is
// returned as is, and a non-nil result yields the error built by
// app.For(code.Job).ResInvalidState.
func (u *jobUseCase) enforceDedupe(ctx context.Context, template *entity.Template, dedupeKey string) error {
	if dedupeKey == "" || template.Repeatable {
		return nil
	}

	existing, err := u.runs.FindSucceededByDedupeKey(ctx, template.Type, dedupeKey)
	switch {
	case err != nil:
		return err
	case existing != nil:
		return app.For(code.Job).ResInvalidState("job already completed for dedupe key")
	default:
		return nil
	}
}
// acquireDedupeLease tries to take the dedupe lease for dedupeKey on behalf
// of jobID via u.queue.TryAcquireDedupe.
//
// An empty dedupeKey is a no-op that returns nil. The TTL passed to the queue
// is template.TimeoutSeconds, replaced by 3600 when that value is not
// positive (presumably seconds, going by the field name — confirm against the
// queue implementation). A queue error is returned as is; when the lease is
// not granted, the error built by app.For(code.Job).ResInvalidState is
// returned.
func (u *jobUseCase) acquireDedupeLease(ctx context.Context, template *entity.Template, dedupeKey, jobID string) error {
	if dedupeKey == "" {
		return nil
	}

	leaseTTL := template.TimeoutSeconds
	if leaseTTL <= 0 {
		leaseTTL = 3600
	}

	acquired, err := u.queue.TryAcquireDedupe(ctx, template.Type, dedupeKey, jobID, leaseTTL)
	switch {
	case err != nil:
		return err
	case !acquired:
		return app.For(code.Job).ResInvalidState("duplicate job already in progress")
	}
	return nil
}
// retryBackoffSeconds picks the backoff for the given attempt from
// template.RetryPolicy.BackoffSeconds.
//
// It returns 0 when no backoffs are configured. Otherwise the entry at index
// attempt-2 is returned, with the index clamped to the valid range: attempts
// of 2 or lower use the first entry, and attempts past the end of the list
// use the last entry.
func retryBackoffSeconds(template *entity.Template, attempt int) int {
	backoffs := template.RetryPolicy.BackoffSeconds
	last := len(backoffs) - 1
	if last < 0 {
		return 0
	}

	switch idx := attempt - 2; {
	case idx < 0:
		return backoffs[0]
	case idx > last:
		return backoffs[last]
	default:
		return backoffs[idx]
	}
}