thread-master/backend/internal/model/job/usecase/test_mocks.go

446 lines
12 KiB
Go

package usecase
import (
"context"
"errors"
"haixun-backend/internal/library/clock"
"haixun-backend/internal/model/job/domain/entity"
"haixun-backend/internal/model/job/domain/enum"
domrepo "haixun-backend/internal/model/job/domain/repository"
domusecase "haixun-backend/internal/model/job/domain/usecase"
"sync"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// memoryTemplateRepo is an in-memory template repository that holds at most
// one template. It performs no locking.
type memoryTemplateRepo struct {
	template *entity.Template
}

// EnsureIndexes is a no-op; there is nothing to index in memory.
func (m *memoryTemplateRepo) EnsureIndexes(context.Context) error {
	return nil
}

// List returns the stored template as a one-element slice, or a nil slice
// when no template has been stored.
func (m *memoryTemplateRepo) List(context.Context) ([]*entity.Template, error) {
	var all []*entity.Template
	if stored := m.template; stored != nil {
		all = append(all, stored)
	}
	return all, nil
}

// FindByType returns the stored template regardless of the requested type;
// the type argument is ignored. The result is nil when nothing is stored.
func (m *memoryTemplateRepo) FindByType(context.Context, string) (*entity.Template, error) {
	stored := m.template
	return stored, nil
}

// Upsert replaces the stored template with the given one and returns the
// same pointer.
func (m *memoryTemplateRepo) Upsert(_ context.Context, template *entity.Template) (*entity.Template, error) {
	m.template = template
	return m.template, nil
}
// memoryRunRepo is an in-memory run repository that tracks a single run plus
// an index of succeeded runs keyed by "<templateType>:<dedupeKey>".
type memoryRunRepo struct {
	mu        sync.Mutex             // guards run and succeeded
	run       *entity.Run            // the one run this repo knows about; may be nil
	succeeded map[string]*entity.Run // succeeded runs by template type and dedupe key
}

// newMemoryRunRepo builds a repo seeded with run (which may be nil) and an
// empty succeeded index.
func newMemoryRunRepo(run *entity.Run) *memoryRunRepo {
	repo := &memoryRunRepo{run: run}
	repo.succeeded = make(map[string]*entity.Run)
	return repo
}

// EnsureIndexes is a no-op; there is nothing to index in memory.
func (m *memoryRunRepo) EnsureIndexes(context.Context) error {
	return nil
}
// Create stores run as the repo's single run, replacing any previous one.
// A zero ID is replaced with a freshly generated ObjectID, and CreateAt and
// UpdateAt are both set to the current clock value. The same pointer that was
// passed in is stored and returned.
func (m *memoryRunRepo) Create(ctx context.Context, run *entity.Run) (*entity.Run, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if run.ID.IsZero() {
		run.ID = primitive.NewObjectID()
	}

	stamp := clock.NowUnixNano()
	run.CreateAt, run.UpdateAt = stamp, stamp

	m.run = run
	return m.run, nil
}
// Update unconditionally replaces the stored run with run and refreshes its
// UpdateAt. When the run is in the succeeded status and carries a dedupe key,
// it is also recorded in the succeeded index under
// "<TemplateType>:<DedupeKey>".
func (m *memoryRunRepo) Update(ctx context.Context, run *entity.Run) (*entity.Run, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	run.UpdateAt = clock.NowUnixNano()
	m.run = run

	// Only succeeded runs with a dedupe key are indexed.
	if run.Status != enum.RunStatusSucceeded || run.DedupeKey == "" {
		return run, nil
	}
	indexKey := run.TemplateType + ":" + run.DedupeKey
	m.succeeded[indexKey] = run
	return run, nil
}
// UpdateIfStatus replaces the stored run with run only if the currently
// stored run's status is one of allowed; otherwise it returns an error and
// leaves the repo untouched. When no run is stored the check is skipped and
// the update is always applied. A succeeded run with a dedupe key is also
// recorded in the succeeded index.
func (m *memoryRunRepo) UpdateIfStatus(ctx context.Context, run *entity.Run, allowed []enum.RunStatus) (*entity.Run, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if current := m.run; current != nil {
		permitted := false
		for i := range allowed {
			if allowed[i] == current.Status {
				permitted = true
				break
			}
		}
		if !permitted {
			return nil, errors.New("job state changed; update rejected")
		}
	}

	run.UpdateAt = clock.NowUnixNano()
	m.run = run

	// Only succeeded runs with a dedupe key are indexed.
	if run.Status != enum.RunStatusSucceeded || run.DedupeKey == "" {
		return run, nil
	}
	indexKey := run.TemplateType + ":" + run.DedupeKey
	m.succeeded[indexKey] = run
	return run, nil
}
// UpdateIfLocked replaces the stored run with run, provided that the stored
// run is either unlocked or locked by workerID, and that its status is one of
// allowed. If either condition fails an error is returned and nothing is
// changed. When no run is stored both checks are skipped. A succeeded run
// with a dedupe key is also recorded in the succeeded index.
func (m *memoryRunRepo) UpdateIfLocked(ctx context.Context, run *entity.Run, workerID string, allowed []enum.RunStatus) (*entity.Run, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if current := m.run; current != nil {
		// The lock check comes first, so a foreign lock wins over a status mismatch.
		if owner := current.LockedBy; owner != "" && owner != workerID {
			return nil, errors.New("job lock held by another worker")
		}

		permitted := false
		for i := range allowed {
			if allowed[i] == current.Status {
				permitted = true
				break
			}
		}
		if !permitted {
			return nil, errors.New("job state changed; update rejected")
		}
	}

	run.UpdateAt = clock.NowUnixNano()
	m.run = run

	// Only succeeded runs with a dedupe key are indexed.
	if run.Status != enum.RunStatusSucceeded || run.DedupeKey == "" {
		return run, nil
	}
	indexKey := run.TemplateType + ":" + run.DedupeKey
	m.succeeded[indexKey] = run
	return run, nil
}
// FindByID returns a copy (via cloneRun) of the stored run, ignoring the
// requested id. The result is nil when no run is stored.
func (m *memoryRunRepo) FindByID(context.Context, string) (*entity.Run, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	snapshot := cloneRun(m.run)
	return snapshot, nil
}
// List is a stub: it always reports no runs and a total of zero.
func (m *memoryRunRepo) List(context.Context, domrepo.RunListFilter, int64, int64) ([]*entity.Run, int64, error) {
	var none []*entity.Run
	return none, 0, nil
}
// FindActiveByScope returns the stored run when its status is pending,
// queued, running, waiting-worker or cancel-requested; otherwise it returns
// nil. The three scope arguments are ignored. The returned element is the
// stored pointer itself, not a copy.
func (m *memoryRunRepo) FindActiveByScope(context.Context, string, string, string) ([]*entity.Run, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	current := m.run
	if current == nil {
		return nil, nil
	}

	status := current.Status
	active := status == enum.RunStatusPending ||
		status == enum.RunStatusQueued ||
		status == enum.RunStatusRunning ||
		status == enum.RunStatusWaitingWorker ||
		status == enum.RunStatusCancelRequested
	if !active {
		return nil, nil
	}
	return []*entity.Run{current}, nil
}
// FindSucceededByDedupeKey looks up the succeeded index under
// "<templateType>:<dedupeKey>" and returns the recorded run, or nil when
// there is no entry.
func (m *memoryRunRepo) FindSucceededByDedupeKey(_ context.Context, templateType, dedupeKey string) (*entity.Run, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	indexKey := templateType + ":" + dedupeKey
	found := m.succeeded[indexKey]
	return found, nil
}
// FindPendingDue returns the stored run when it is pending, has a
// ScheduledAt value, and that value is at or before now. In every other case
// it returns nil. The limit argument is ignored. The returned element is the
// stored pointer itself, not a copy.
func (m *memoryRunRepo) FindPendingDue(_ context.Context, now int64, _ int64) ([]*entity.Run, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	current := m.run
	if current == nil {
		return nil, nil
	}
	if current.Status != enum.RunStatusPending {
		return nil, nil
	}
	if current.ScheduledAt == nil || *current.ScheduledAt > now {
		return nil, nil
	}
	return []*entity.Run{current}, nil
}
// FindCancelRequestedBefore returns the stored run when it is in the
// cancel-requested status, has a CancelRequestedAt value, and that value is
// at or before the given bound. In every other case it returns nil. The limit
// argument is ignored. The returned element is the stored pointer itself,
// not a copy.
func (m *memoryRunRepo) FindCancelRequestedBefore(_ context.Context, before int64, _ int64) ([]*entity.Run, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	current := m.run
	if current == nil {
		return nil, nil
	}
	if current.Status != enum.RunStatusCancelRequested {
		return nil, nil
	}
	if current.CancelRequestedAt == nil || *current.CancelRequestedAt > before {
		return nil, nil
	}
	return []*entity.Run{current}, nil
}
// FindRunningTimedOut returns the stored run when it is running or
// waiting-worker, has a LockedUntil value, and that value is at or before
// now. In every other case it returns nil. The limit argument is ignored.
// The returned element is the stored pointer itself, not a copy.
func (m *memoryRunRepo) FindRunningTimedOut(_ context.Context, now int64, _ int64) ([]*entity.Run, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	current := m.run
	if current == nil || current.LockedUntil == nil {
		return nil, nil
	}

	switch current.Status {
	case enum.RunStatusRunning, enum.RunStatusWaitingWorker:
		// Only these statuses are eligible; fall through to the deadline check.
	default:
		return nil, nil
	}

	if *current.LockedUntil > now {
		return nil, nil
	}
	return []*entity.Run{current}, nil
}
// memoryScheduleRepo is an in-memory schedule repository backed by a slice.
// Its zero value is ready to use.
type memoryScheduleRepo struct {
	mu        sync.Mutex         // guards schedules
	schedules []*entity.Schedule // stored schedules in insertion order
}

// EnsureIndexes is a no-op; there is nothing to index in memory.
func (m *memoryScheduleRepo) EnsureIndexes(context.Context) error {
	return nil
}
// Create appends schedule to the repo. A zero ID is replaced with a freshly
// generated ObjectID, and CreateAt and UpdateAt are both set to the current
// clock value. The same pointer that was passed in is stored and returned.
func (m *memoryScheduleRepo) Create(_ context.Context, schedule *entity.Schedule) (*entity.Schedule, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if schedule.ID.IsZero() {
		schedule.ID = primitive.NewObjectID()
	}

	stamp := clock.NowUnixNano()
	schedule.CreateAt, schedule.UpdateAt = stamp, stamp

	m.schedules = append(m.schedules, schedule)
	return schedule, nil
}
// Update refreshes schedule.UpdateAt and stores the schedule: the first
// stored entry with the same ID is replaced in place, and if none matches the
// schedule is appended instead.
func (m *memoryScheduleRepo) Update(_ context.Context, schedule *entity.Schedule) (*entity.Schedule, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	schedule.UpdateAt = clock.NowUnixNano()

	for idx := range m.schedules {
		if m.schedules[idx].ID != schedule.ID {
			continue
		}
		m.schedules[idx] = schedule
		return schedule, nil
	}

	// No existing entry carries this ID, so the update acts as an insert.
	m.schedules = append(m.schedules, schedule)
	return schedule, nil
}
// FindByID returns the stored schedule whose ID equals the hex-encoded id,
// or (nil, nil) when there is none. The returned pointer is the stored
// schedule itself, not a copy.
//
// An id that is not valid ObjectID hex is reported as "not found". The parse
// failure is handled explicitly so that such an id is never compared as the
// zero ObjectID, which could otherwise match a schedule stored without an ID
// (Update appends schedules without assigning one).
func (m *memoryScheduleRepo) FindByID(_ context.Context, id string) (*entity.Schedule, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	objectID, err := primitive.ObjectIDFromHex(id)
	if err != nil {
		// Deliberately not surfaced as an error: a malformed id has always
		// produced a nil schedule with a nil error here.
		return nil, nil
	}

	for _, item := range m.schedules {
		if item.ID == objectID {
			return item, nil
		}
	}
	return nil, nil
}
// Delete removes the first stored schedule whose ID equals the hex-encoded
// id. It returns the parse error when id is not valid ObjectID hex, and nil
// otherwise, including when no schedule matches.
//
// The remaining schedules are copied into a fresh slice rather than shifted
// in place. List hands out schedule slices, and an in-place
// append(s[:i], s[i+1:]...) would rewrite the backing array that earlier
// results may still share.
func (m *memoryScheduleRepo) Delete(_ context.Context, id string) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	objectID, err := primitive.ObjectIDFromHex(id)
	if err != nil {
		return err
	}

	for i, item := range m.schedules {
		if item.ID != objectID {
			continue
		}
		remaining := make([]*entity.Schedule, 0, len(m.schedules)-1)
		remaining = append(remaining, m.schedules[:i]...)
		remaining = append(remaining, m.schedules[i+1:]...)
		m.schedules = remaining
		return nil
	}
	return nil
}
// List returns every stored schedule together with the total count. The
// filter and both paging arguments are ignored.
//
// The result is a copy of the internal slice, so callers cannot observe or
// cause changes to the repo's slice outside the mutex. The schedule pointers
// themselves are still shared. A nil internal slice is returned as nil.
func (m *memoryScheduleRepo) List(_ context.Context, _ domrepo.ScheduleListFilter, _, _ int64) ([]*entity.Schedule, int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.schedules == nil {
		return nil, 0, nil
	}
	snapshot := make([]*entity.Schedule, len(m.schedules))
	copy(snapshot, m.schedules)
	return snapshot, int64(len(snapshot)), nil
}
// FindDue returns the enabled schedules whose NextRunAt is at or before now,
// in stored order. The limit argument is ignored. The result is nil when
// nothing is due.
func (m *memoryScheduleRepo) FindDue(_ context.Context, now int64, _ int64) ([]*entity.Schedule, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	var due []*entity.Schedule
	for idx := range m.schedules {
		candidate := m.schedules[idx]
		if !candidate.Enabled || candidate.NextRunAt > now {
			continue
		}
		due = append(due, candidate)
	}
	return due, nil
}
// memoryEventRepo is an in-memory, append-only event store. Its zero value
// is ready to use.
//
// Access to events is serialized with mu, matching the other in-memory
// repositories in this file that guard their mutable state with a mutex.
type memoryEventRepo struct {
	mu     sync.Mutex      // guards events
	events []*entity.Event // recorded events in append order
}

// EnsureIndexes is a no-op; there is nothing to index in memory.
func (m *memoryEventRepo) EnsureIndexes(context.Context) error { return nil }

// Append records event at the end of the event list. The pointer is stored
// as given, not copied.
func (m *memoryEventRepo) Append(_ context.Context, event *entity.Event) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.events = append(m.events, event)
	return nil
}

// ListByJobID returns the recorded events whose JobID equals jobID, in
// append order, truncated to at most limit entries. A non-positive limit is
// treated as 50. The result is nil when no event matches.
func (m *memoryEventRepo) ListByJobID(_ context.Context, jobID string, limit int64) ([]*entity.Event, error) {
	if limit <= 0 {
		limit = 50
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	var out []*entity.Event
	for _, event := range m.events {
		if event.JobID == jobID {
			out = append(out, event)
		}
	}
	if int64(len(out)) > limit {
		out = out[:limit]
	}
	return out, nil
}
// memoryQueueRepo is an in-memory queue repository. It keeps per-worker-type
// job queues, cancel signals, job locks, dedupe reservations and a single
// scheduler lock, all guarded by one mutex. Build it with newMemoryQueueRepo
// so the maps are initialized.
type memoryQueueRepo struct {
	mu            sync.Mutex
	queues        map[string][]string // worker type -> queued job IDs, in enqueue order
	cancel        map[string]string   // job ID -> cancel reason
	locks         map[string]string   // job ID -> worker ID holding the lock
	dedupe        map[string]string   // "<templateType>:<dedupeHash>" -> job ID
	schedulerLock string              // current scheduler lock holder; "" means free
}

// newMemoryQueueRepo returns a repo with all of its maps allocated and empty.
func newMemoryQueueRepo() *memoryQueueRepo {
	repo := new(memoryQueueRepo)
	repo.queues = make(map[string][]string)
	repo.cancel = make(map[string]string)
	repo.locks = make(map[string]string)
	repo.dedupe = make(map[string]string)
	return repo
}

// Enqueue appends jobID to the queue of workerType.
func (m *memoryQueueRepo) Enqueue(_ context.Context, workerType, jobID string) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	pending := m.queues[workerType]
	m.queues[workerType] = append(pending, jobID)
	return nil
}

// Dequeue is a stub: it never pops anything and always returns an empty job
// ID with a nil error.
func (m *memoryQueueRepo) Dequeue(context.Context, string, int) (string, error) {
	return "", nil
}

// RemoveJob drops every occurrence of jobID from the queue of workerType,
// keeping the order of the remaining entries.
func (m *memoryQueueRepo) RemoveJob(_ context.Context, workerType, jobID string) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	pending := m.queues[workerType]
	kept := make([]string, 0, len(pending))
	for i := range pending {
		if pending[i] == jobID {
			continue
		}
		kept = append(kept, pending[i])
	}
	m.queues[workerType] = kept
	return nil
}

// SetCancelSignal records reason as the cancel signal for jobID, overwriting
// any earlier one.
func (m *memoryQueueRepo) SetCancelSignal(_ context.Context, jobID, reason string) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.cancel[jobID] = reason
	return nil
}

// GetCancelSignal reports whether a cancel signal exists for jobID and, if
// so, its reason.
func (m *memoryQueueRepo) GetCancelSignal(_ context.Context, jobID string) (bool, string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	why, present := m.cancel[jobID]
	return present, why, nil
}

// ClearCancelSignal removes the cancel signal for jobID, if any.
func (m *memoryQueueRepo) ClearCancelSignal(_ context.Context, jobID string) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	delete(m.cancel, jobID)
	return nil
}

// TryLock acquires the lock on jobID for workerID. It fails (false) only
// when a different worker already holds the lock; re-acquiring one's own lock
// succeeds. The TTL argument is ignored.
func (m *memoryQueueRepo) TryLock(_ context.Context, jobID, workerID string, _ int) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	owner, held := m.locks[jobID]
	if held && owner != workerID {
		return false, nil
	}
	m.locks[jobID] = workerID
	return true, nil
}

// RefreshLock succeeds only when workerID currently holds the lock on jobID;
// otherwise it returns an error. The TTL argument is ignored.
func (m *memoryQueueRepo) RefreshLock(_ context.Context, jobID, workerID string, _ int) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	owner, held := m.locks[jobID]
	if !held || owner != workerID {
		return errors.New("job lock is not held by worker")
	}
	return nil
}

// ReleaseLock drops the lock on jobID when workerID is its holder. Releasing
// a lock held by someone else, or not held at all, is a silent no-op.
func (m *memoryQueueRepo) ReleaseLock(_ context.Context, jobID, workerID string) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	owner, held := m.locks[jobID]
	if !held || owner != workerID {
		return nil
	}
	delete(m.locks, jobID)
	return nil
}

// TryAcquireDedupe reserves "<templateType>:<dedupeHash>" for jobID. It
// returns false when the key is already reserved, by any job. The TTL
// argument is ignored.
func (m *memoryQueueRepo) TryAcquireDedupe(_ context.Context, templateType, dedupeHash, jobID string, _ int) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	reservation := templateType + ":" + dedupeHash
	_, taken := m.dedupe[reservation]
	if taken {
		return false, nil
	}
	m.dedupe[reservation] = jobID
	return true, nil
}

// ReleaseDedupe removes the reservation for "<templateType>:<dedupeHash>",
// if any.
func (m *memoryQueueRepo) ReleaseDedupe(_ context.Context, templateType, dedupeHash string) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	reservation := templateType + ":" + dedupeHash
	delete(m.dedupe, reservation)
	return nil
}

// TrySchedulerLock acquires the scheduler lock for holder. It fails (false)
// only when a different, non-empty holder owns the lock. The TTL argument is
// ignored.
func (m *memoryQueueRepo) TrySchedulerLock(_ context.Context, holder string, _ int) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	switch m.schedulerLock {
	case "", holder:
		m.schedulerLock = holder
		return true, nil
	default:
		return false, nil
	}
}

// ReleaseSchedulerLock frees the scheduler lock when holder owns it;
// otherwise it does nothing.
func (m *memoryQueueRepo) ReleaseSchedulerLock(_ context.Context, holder string) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.schedulerLock != holder {
		return nil
	}
	m.schedulerLock = ""
	return nil
}

// queued returns a copy of the job IDs currently queued for workerType, so
// callers can inspect it without holding the mutex. The result is nil when
// the queue is empty.
func (m *memoryQueueRepo) queued(workerType string) []string {
	m.mu.Lock()
	defer m.mu.Unlock()

	var snapshot []string
	snapshot = append(snapshot, m.queues[workerType]...)
	return snapshot
}
// testUseCase builds a use case wired entirely to fresh in-memory
// repositories: a template repo holding template, a run repo seeded with run,
// and empty schedule, event and queue repos.
func testUseCase(template *entity.Template, run *entity.Run) domusecase.UseCase {
	templates := &memoryTemplateRepo{template: template}
	runs := newMemoryRunRepo(run)
	schedules := &memoryScheduleRepo{}
	events := &memoryEventRepo{}
	queue := newMemoryQueueRepo()

	return NewUseCase(templates, runs, schedules, events, queue)
}
// testUseCaseFull builds a use case from caller-supplied in-memory
// repositories, letting callers keep handles to the run, schedule and queue
// repos. A nil schedules or queue is replaced with a fresh empty one; runs is
// passed through as given. The template and event repos are always created
// fresh.
func testUseCaseFull(template *entity.Template, runs *memoryRunRepo, schedules *memoryScheduleRepo, queue *memoryQueueRepo) domusecase.UseCase {
	scheduleRepo := schedules
	if scheduleRepo == nil {
		scheduleRepo = &memoryScheduleRepo{}
	}

	queueRepo := queue
	if queueRepo == nil {
		queueRepo = newMemoryQueueRepo()
	}

	templates := &memoryTemplateRepo{template: template}
	events := &memoryEventRepo{}
	return NewUseCase(templates, runs, scheduleRepo, events, queueRepo)
}
// cloneRun returns a copy of run that can be modified without affecting the
// original, or nil when run is nil.
//
// The struct is copied by value, then the reference-typed fields this file
// knows about are detached:
//   - Progress.Steps is copied element by element, and an empty non-nil slice
//     stays non-nil;
//   - Payload and Result get new maps, though their values are copied
//     shallowly;
//   - the optional timestamps ScheduledAt, CancelRequestedAt and LockedUntil
//     get their own storage, so writing through a clone's pointer cannot
//     change the original.
//
// Any other pointer, slice or map field of entity.Run is still shared.
func cloneRun(run *entity.Run) *entity.Run {
	if run == nil {
		return nil
	}
	out := *run

	if run.Progress.Steps != nil {
		out.Progress.Steps = make([]entity.StepProgress, len(run.Progress.Steps))
		copy(out.Progress.Steps, run.Progress.Steps)
	}
	if run.Payload != nil {
		out.Payload = make(map[string]any, len(run.Payload))
		for k, v := range run.Payload {
			out.Payload[k] = v
		}
	}
	if run.Result != nil {
		out.Result = make(map[string]any, len(run.Result))
		for k, v := range run.Result {
			out.Result[k] = v
		}
	}

	// copyStamp duplicates an optional timestamp; nil stays nil.
	copyStamp := func(p *int64) *int64 {
		if p == nil {
			return nil
		}
		v := *p
		return &v
	}
	out.ScheduledAt = copyStamp(run.ScheduledAt)
	out.CancelRequestedAt = copyStamp(run.CancelRequestedAt)
	out.LockedUntil = copyStamp(run.LockedUntil)

	return &out
}