thread-master/docs/job-system-plan.md

9.9 KiB
Raw Permalink Blame History

Job 核心系統規劃

目標

建立一套通用 job system讓任何長任務、流程任務、定時任務未來都能共用。Job 不只是背景任務,而是「有模板、有設定、有狀態、有進度、有取消能力、有重跑策略、有排程能力」的工作單元。

核心設計

採用:

Mongo = job/template/run/history 的真相來源
Redis = queue、distributed lock、schedule tick、短期 lease
flowchart LR
  Api[GoAPI] --> Template[JobTemplate]
  Api --> JobRun[JobRunMongo]
  JobRun --> RedisQueue[RedisQueue]
  Scheduler[SchedulerTick] --> RedisQueue
  Worker[Worker] --> RedisQueue
  Worker --> JobRun
  Worker --> Step[JobStep]

核心概念

JobTemplate

Template 定義「這種 job 要怎麼做」。例如:

demo_long_task
external_worker_task
scheduled_report
multi_step_pipeline

Template 要回答:

  • 這個 job 的輸入 payload schema 是什麼
  • 有哪些 steps
  • 最終狀態是什麼
  • 可不可以重複執行
  • 是否允許同 account / 同 target 同時跑
  • retry policy 是什麼
  • timeout 是多少
  • 是否可被排程
  • 是否支援取消,以及取消時 worker 要如何收斂

JobRun

JobRun 是每一次執行實例。它引用 template保存當次 payload、狀態、進度、結果、錯誤與執行歷史。

JobSchedule

JobSchedule 是「何時建立 JobRun」。支援

  • cron
  • enabled / disabled
  • timezone
  • payload template
  • target scope例如 user/account/system
  • nextRunAt / lastRunAt

JobFlow

Flow 是多步驟流程。第一版不用做完整 DAG先支援線性 steps

multi_step_pipeline:
  1. prepare
  2. execute
  3. finalize

之後再擴成 DAG 或 conditional branch。

Mongo Collections

job_templates

{
  "_id": "...",
  "type": "demo_long_task",
  "version": 1,
  "name": "示範長任務",
  "description": "展示 job template、進度、取消、重跑與排程能力",
  "enabled": true,
  "repeatable": true,
  "concurrencyPolicy": "reject_same_scope",
  "dedupeKeys": ["scope_id", "target"],
  "timeoutSeconds": 600,
  "cancelPolicy": {
    "supported": true,
    "mode": "cooperative",
    "graceSeconds": 30
  },
  "retryPolicy": {
    "maxAttempts": 2,
    "backoffSeconds": [30, 120]
  },
  "steps": [
    { "id": "prepare", "name": "準備資料", "workerType": "go", "timeoutSeconds": 60, "cancelable": true },
    { "id": "execute", "name": "執行任務", "workerType": "go", "timeoutSeconds": 300, "cancelable": true },
    { "id": "finalize", "name": "整理結果", "workerType": "go", "timeoutSeconds": 30, "cancelable": false }
  ],
  "createAt": 0,
  "updateAt": 0
}

job_runs

{
  "_id": "...",
  "templateType": "demo_long_task",
  "templateVersion": 1,
  "scope": "user",
  "scopeId": "user_123",
  "status": "pending",
  "phase": "prepare",
  "payload": {},
  "progress": {
    "summary": "等待 worker 執行",
    "percentage": 20,
    "steps": []
  },
  "result": null,
  "error": null,
  "attempt": 0,
  "maxAttempts": 2,
  "lockedBy": null,
  "lockedUntil": null,
  "cancelRequestedAt": null,
  "cancelReason": null,
  "scheduledAt": null,
  "startedAt": null,
  "completedAt": null,
  "createAt": 0,
  "updateAt": 0
}

job_schedules

{
  "_id": "...",
  "templateType": "demo_long_task",
  "scope": "user",
  "scopeId": "user_123",
  "enabled": true,
  "cron": "0 9 * * *",
  "timezone": "Asia/Taipei",
  "payloadTemplate": {},
  "lastRunAt": null,
  "nextRunAt": 0,
  "createAt": 0,
  "updateAt": 0
}

job_events

用來觀察與 audit

{
  "_id": "...",
  "jobId": "...",
  "type": "status_changed",
  "from": "pending",
  "to": "running",
  "message": "worker claimed job",
  "metadata": {},
  "createAt": 0
}

Status Model

pending          = 已建立,等待 queue
queued           = 已推進 Redis queue
running          = worker 執行中
waiting_worker   = 等外部 worker 回寫
cancel_requested = 使用者已要求取消,等待 worker cooperative stop
succeeded        = 成功完成
failed           = 最終失敗
cancelled        = 使用者取消
expired          = lock/timeout 過期後無法恢復

Step status

pending | running | succeeded | failed | skipped | cancelled

取消語意

取消是第一版必做能力,採 cooperative cancellation

flowchart LR
  User[User] --> ApiCancel[CancelAPI]
  ApiCancel --> Run[JobRun cancel_requested]
  Run --> RedisCancel[RedisCancelSignal]
  Worker[Worker] -->|"poll cancel flag"| Run
  Worker --> Stop[StopCurrentStep]
  Stop --> Final[JobRun cancelled]

規則:

  • pending / queued:取消後直接變 cancelled,並盡量從 Redis queue 移除若無法移除worker claim 時必須檢查狀態並跳過。
  • running:狀態改為 cancel_requested,寫入 cancelRequestedAt / cancelReasonworker 必須在 step 間或長任務 checkpoint 檢查取消旗標。
  • waiting_worker:狀態改為 cancel_requested,同時寫 Redis cancel signal外部 worker 回寫前要檢查 job 狀態。
  • succeeded / failed / cancelled / expired:不可取消,回傳 ResourceInvalidState。
  • worker 收到取消後呼叫 AcknowledgeCancel(jobId, workerId),釋放 lock寫入 job_events,狀態變 cancelled
  • cancel_requested 超過 template 的 cancelPolicy.graceSecondsscheduler/reaper 可標記為 cancelledexpired,第一版建議標記 cancelled 並記錄 timeout event。

狀態與 Lock 安全規則

第一版已把最容易出問題的 race condition 收斂在 repository / usecase

  • ClaimNext 只能從 pending / queued conditional update 成 running。如果 API 同時取消Mongo update 會被拒絕。
  • RequestCancel 只能從 cancellable 狀態 conditional updatepending / queued 直接變 cancelledrunning / waiting_workercancel_requested
  • CompleteRun / FailRun / UpdateProgress 必須帶 workerID,並且只能更新 lockedBy == workerID 的 job。
  • Redis jobs:lock:<jobId> 的 value 是 workerIDReleaseLock / RefreshLock 使用 owner check避免舊 worker 誤刪新 worker 的 lock。
  • Worker 長任務要定期 heartbeat呼叫 RefreshRunLock(jobId, workerID, ttlSeconds)。自訂 step handler 可用 StepContext.Heartbeat

之後新增狀態轉移時,不要直接使用裸 Update;若是生命週期狀態,應新增明確的 guarded repository 方法或使用現有 conditional update。

Redis Keys

jobs:queue:<workerType>           # list 或 streamworker 消費
jobs:lock:<jobId>                 # lease lock
jobs:scheduler:lock               # scheduler singleton lock
jobs:dedupe:<template>:<hash>     # 防止同 scope 重複跑
jobs:cancel:<jobId>               # cancel signalworker checkpoint 讀取

第一版建議用 Redis List 即可,之後需要 ack/replay 再升級 Redis Streams。

API 規劃

GET  /api/v1/job/templates
GET  /api/v1/job/templates/:type
PUT  /api/v1/job/templates/:type

POST /api/v1/jobs
GET  /api/v1/jobs/:id
GET  /api/v1/jobs?page=1&pageSize=20
POST /api/v1/jobs/:id/cancel
POST /api/v1/jobs/:id/retry

GET  /api/v1/job/schedules?page=1&pageSize=20
POST /api/v1/job/schedules
PUT  /api/v1/job/schedules/:id
POST /api/v1/job/schedules/:id/enable
POST /api/v1/job/schedules/:id/disable

所有列表回應使用目前標準:

{
  "code": 102000,
  "message": "SUCCESS",
  "data": {
    "pagination": {
      "total": 42,
      "page": 1,
      "pageSize": 10,
      "totalPages": 5
    },
    "list": []
  }
}

分層規劃

internal/model/job/
  domain/entity/template.go
  domain/entity/run.go
  domain/entity/schedule.go
  domain/entity/event.go
  domain/enum/status.go
  domain/repository/*.go
  domain/usecase/*.go
  repository/mongo_*.go
  repository/redis_queue.go
  usecase/template_usecase.go
  usecase/run_usecase.go
  usecase/schedule_usecase.go
  usecase/progress_usecase.go
  usecase/worker_usecase.go

internal/logic/job/
  create_job_logic.go
  get_job_logic.go
  list_jobs_logic.go
  cancel_job_logic.go
  retry_job_logic.go
  template_logic.go
  schedule_logic.go

internal/worker/job/
  runner.go
  scheduler.go
  dispatcher.go

Template 執行規則

可不可以重複做

repeatableconcurrencyPolicy 控制:

repeatable=false
  同 dedupe key 完成過就不再建立

repeatable=true + reject_same_scope
  已有 running/pending job 時拒絕建立

repeatable=true + allow_parallel
  允許平行跑

repeatable=true + replace_existing
  取消舊 job建立新 job

最終狀態

Template 可定義成功條件:

successWhen = all_steps_succeeded

第一版只支援 all_steps_succeeded。之後再加 any_step_succeeded 或 conditional flow。

取消能力

Template 用 cancelPolicy 控制取消:

supported=false
  API 不允許取消此 job

mode=cooperative
  worker checkpoint 檢查取消旗標後收斂

graceSeconds=30
  cancel_requested 超過 grace 後由 reaper 收斂

Step 用 cancelable 控制目前步驟是否可立即停止。若目前 step 不可取消worker 需要在 step 結束後停止後續 steps最後狀態仍為 cancelled

第一版建議實作順序

  1. model/job 的 enum/entity/repository interface。
  2. 實作 Mongo repositoriestemplate/run/schedule/event。
  3. 實作 Redis queue/lock repository。
  4. 實作 CreateRun:讀 template、檢查 repeat/concurrency、建立 run、push queue。
  5. 實作 ClaimNextworker 從 Redis 取 jobMongo 設 lock/status。
  6. 實作 RequestCancel:狀態轉 cancel_requested 或直接 cancelled,寫 Redis cancel signal 與 event。
  7. 實作 AcknowledgeCancelworker 收斂後釋放 lock狀態轉 cancelled
  8. 實作 UpdateProgressCompleteFailRetry
  9. 實作 schedule tickjob_schedules.nextRunAt <= now,建立 JobRun。
  10. 實作 API 與文件。