thread-master/docs/job-system-plan.md

382 lines
9.9 KiB
Markdown
Raw Permalink Normal View History

2026-06-26 08:37:04 +00:00
# Job 核心系統規劃
## 目標
建立一套通用 job system讓任何長任務、流程任務、定時任務未來都能共用。Job 不只是背景任務,而是「有模板、有設定、有狀態、有進度、有取消能力、有重跑策略、有排程能力」的工作單元。
## 核心設計
採用:
```text
Mongo = job/template/run/history 的真相來源
Redis = queue、distributed lock、schedule tick、短期 lease
```
```mermaid
flowchart LR
Api[GoAPI] --> Template[JobTemplate]
Api --> JobRun[JobRunMongo]
JobRun --> RedisQueue[RedisQueue]
Scheduler[SchedulerTick] --> RedisQueue
Worker[Worker] --> RedisQueue
Worker --> JobRun
Worker --> Step[JobStep]
```
## 核心概念
### JobTemplate
Template 定義「這種 job 要怎麼做」。例如:
```text
demo_long_task
external_worker_task
scheduled_report
multi_step_pipeline
```
Template 要回答:
- 這個 job 的輸入 payload schema 是什麼
- 有哪些 steps
- 最終狀態是什麼
- 可不可以重複執行
- 是否允許同 account / 同 target 同時跑
- retry policy 是什麼
- timeout 是多少
- 是否可被排程
- 是否支援取消,以及取消時 worker 要如何收斂
### JobRun
JobRun 是每一次執行實例。它引用 template保存當次 payload、狀態、進度、結果、錯誤與執行歷史。
### JobSchedule
JobSchedule 是「何時建立 JobRun」。支援
- cron
- enabled / disabled
- timezone
- payload template
- target scope例如 user/account/system
- nextRunAt / lastRunAt
### JobFlow
Flow 是多步驟流程。第一版不用做完整 DAG先支援線性 steps
```text
multi_step_pipeline:
1. prepare
2. execute
3. finalize
```
之後再擴成 DAG 或 conditional branch。
## Mongo Collections
### `job_templates`
```json
{
"_id": "...",
"type": "demo_long_task",
"version": 1,
"name": "示範長任務",
"description": "展示 job template、進度、取消、重跑與排程能力",
"enabled": true,
"repeatable": true,
"concurrencyPolicy": "reject_same_scope",
"dedupeKeys": ["scope_id", "target"],
"timeoutSeconds": 600,
"cancelPolicy": {
"supported": true,
"mode": "cooperative",
"graceSeconds": 30
},
"retryPolicy": {
"maxAttempts": 2,
"backoffSeconds": [30, 120]
},
"steps": [
{ "id": "prepare", "name": "準備資料", "workerType": "go", "timeoutSeconds": 60, "cancelable": true },
{ "id": "execute", "name": "執行任務", "workerType": "go", "timeoutSeconds": 300, "cancelable": true },
{ "id": "finalize", "name": "整理結果", "workerType": "go", "timeoutSeconds": 30, "cancelable": false }
],
"createAt": 0,
"updateAt": 0
}
```
### `job_runs`
```json
{
"_id": "...",
"templateType": "demo_long_task",
"templateVersion": 1,
"scope": "user",
"scopeId": "user_123",
"status": "pending",
"phase": "prepare",
"payload": {},
"progress": {
"summary": "等待 worker 執行",
"percentage": 20,
"steps": []
},
"result": null,
"error": null,
"attempt": 0,
"maxAttempts": 2,
"lockedBy": null,
"lockedUntil": null,
"cancelRequestedAt": null,
"cancelReason": null,
"scheduledAt": null,
"startedAt": null,
"completedAt": null,
"createAt": 0,
"updateAt": 0
}
```
### `job_schedules`
```json
{
"_id": "...",
"templateType": "demo_long_task",
"scope": "user",
"scopeId": "user_123",
"enabled": true,
"cron": "0 9 * * *",
"timezone": "Asia/Taipei",
"payloadTemplate": {},
"lastRunAt": null,
"nextRunAt": 0,
"createAt": 0,
"updateAt": 0
}
```
### `job_events`
用來觀察與 audit
```json
{
"_id": "...",
"jobId": "...",
"type": "status_changed",
"from": "pending",
"to": "running",
"message": "worker claimed job",
"metadata": {},
"createAt": 0
}
```
## Status Model
```text
pending = 已建立,等待 queue
queued = 已推進 Redis queue
running = worker 執行中
waiting_worker = 等外部 worker 回寫
cancel_requested = 使用者已要求取消,等待 worker cooperative stop
succeeded = 成功完成
failed = 最終失敗
cancelled = 使用者取消
expired = lock/timeout 過期後無法恢復
```
Step status
```text
pending | running | succeeded | failed | skipped | cancelled
```
## 取消語意
取消是第一版必做能力,採 cooperative cancellation
```mermaid
flowchart LR
User[User] --> ApiCancel[CancelAPI]
ApiCancel --> Run[JobRun cancel_requested]
Run --> RedisCancel[RedisCancelSignal]
Worker[Worker] -->|"poll cancel flag"| Run
Worker --> Stop[StopCurrentStep]
Stop --> Final[JobRun cancelled]
```
規則:
- `pending` / `queued`:取消後直接變 `cancelled`,並盡量從 Redis queue 移除若無法移除worker claim 時必須檢查狀態並跳過。
- `running`:狀態改為 `cancel_requested`,寫入 `cancelRequestedAt` / `cancelReason`worker 必須在 step 間或長任務 checkpoint 檢查取消旗標。
- `waiting_worker`:狀態改為 `cancel_requested`,同時寫 Redis cancel signal外部 worker 回寫前要檢查 job 狀態。
- `succeeded` / `failed` / `cancelled` / `expired`:不可取消,回傳 ResourceInvalidState。
- worker 收到取消後呼叫 `AcknowledgeCancel(jobId, workerId)`,釋放 lock寫入 `job_events`,狀態變 `cancelled`
-`cancel_requested` 超過 template 的 `cancelPolicy.graceSeconds`scheduler/reaper 可標記為 `cancelled``expired`,第一版建議標記 `cancelled` 並記錄 timeout event。
## 狀態與 Lock 安全規則
第一版已把最容易出問題的 race condition 收斂在 repository / usecase
- `ClaimNext` 只能從 `pending` / `queued` conditional update 成 `running`。如果 API 同時取消Mongo update 會被拒絕。
- `RequestCancel` 只能從 cancellable 狀態 conditional update`pending` / `queued` 直接變 `cancelled``running` / `waiting_worker``cancel_requested`
- `CompleteRun` / `FailRun` / `UpdateProgress` 必須帶 `workerID`,並且只能更新 `lockedBy == workerID` 的 job。
- Redis `jobs:lock:<jobId>` 的 value 是 `workerID``ReleaseLock` / `RefreshLock` 使用 owner check避免舊 worker 誤刪新 worker 的 lock。
- Worker 長任務要定期 heartbeat呼叫 `RefreshRunLock(jobId, workerID, ttlSeconds)`。自訂 step handler 可用 `StepContext.Heartbeat`
之後新增狀態轉移時,不要直接使用裸 `Update`;若是生命週期狀態,應新增明確的 guarded repository 方法或使用現有 conditional update。
## Redis Keys
```text
jobs:queue:<workerType> # list 或 streamworker 消費
jobs:lock:<jobId> # lease lock
jobs:scheduler:lock # scheduler singleton lock
jobs:dedupe:<template>:<hash> # 防止同 scope 重複跑
jobs:cancel:<jobId> # cancel signalworker checkpoint 讀取
```
第一版建議用 Redis List 即可,之後需要 ack/replay 再升級 Redis Streams。
## API 規劃
```text
GET /api/v1/job/templates
GET /api/v1/job/templates/:type
PUT /api/v1/job/templates/:type
POST /api/v1/jobs
GET /api/v1/jobs/:id
GET /api/v1/jobs?page=1&pageSize=20
POST /api/v1/jobs/:id/cancel
POST /api/v1/jobs/:id/retry
GET /api/v1/job/schedules?page=1&pageSize=20
POST /api/v1/job/schedules
PUT /api/v1/job/schedules/:id
POST /api/v1/job/schedules/:id/enable
POST /api/v1/job/schedules/:id/disable
```
所有列表回應使用目前標準:
```json
{
"code": 102000,
"message": "SUCCESS",
"data": {
"pagination": {
"total": 42,
"page": 1,
"pageSize": 10,
"totalPages": 5
},
"list": []
}
}
```
## 分層規劃
```text
internal/model/job/
domain/entity/template.go
domain/entity/run.go
domain/entity/schedule.go
domain/entity/event.go
domain/enum/status.go
domain/repository/*.go
domain/usecase/*.go
repository/mongo_*.go
repository/redis_queue.go
usecase/template_usecase.go
usecase/run_usecase.go
usecase/schedule_usecase.go
usecase/progress_usecase.go
usecase/worker_usecase.go
internal/logic/job/
create_job_logic.go
get_job_logic.go
list_jobs_logic.go
cancel_job_logic.go
retry_job_logic.go
template_logic.go
schedule_logic.go
internal/worker/job/
runner.go
scheduler.go
dispatcher.go
```
## Template 執行規則
### 可不可以重複做
`repeatable``concurrencyPolicy` 控制:
```text
repeatable=false
同 dedupe key 完成過就不再建立
repeatable=true + reject_same_scope
已有 running/pending job 時拒絕建立
repeatable=true + allow_parallel
允許平行跑
repeatable=true + replace_existing
取消舊 job建立新 job
```
### 最終狀態
Template 可定義成功條件:
```text
successWhen = all_steps_succeeded
```
第一版只支援 `all_steps_succeeded`。之後再加 `any_step_succeeded` 或 conditional flow。
### 取消能力
Template 用 `cancelPolicy` 控制取消:
```text
supported=false
API 不允許取消此 job
mode=cooperative
worker checkpoint 檢查取消旗標後收斂
graceSeconds=30
cancel_requested 超過 grace 後由 reaper 收斂
```
Step 用 `cancelable` 控制目前步驟是否可立即停止。若目前 step 不可取消worker 需要在 step 結束後停止後續 steps最後狀態仍為 `cancelled`
## 第一版建議實作順序
1.`model/job` 的 enum/entity/repository interface。
2. 實作 Mongo repositoriestemplate/run/schedule/event。
3. 實作 Redis queue/lock repository。
4. 實作 `CreateRun`:讀 template、檢查 repeat/concurrency、建立 run、push queue。
5. 實作 `ClaimNext`worker 從 Redis 取 jobMongo 設 lock/status。
6. 實作 `RequestCancel`:狀態轉 `cancel_requested` 或直接 `cancelled`,寫 Redis cancel signal 與 event。
7. 實作 `AcknowledgeCancel`worker 收斂後釋放 lock狀態轉 `cancelled`
8. 實作 `UpdateProgress`、`Complete`、`Fail`、`Retry`。
9. 實作 schedule tick`job_schedules.nextRunAt <= now`,建立 JobRun。
10. 實作 API 與文件。