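# Job Core System Plan
## Goals
Build a general-purpose job system that any long-running task, multi-step workflow, or scheduled task can share in the future. A job is more than a background task: it is a unit of work that has a template, configuration, status, progress, cancellation support, a retry policy, and scheduling support.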
## Core Design
The design splits responsibilities as follows:
```text
Mongo = source of truth for job/template/run/history
Redis = queue, distributed lock, schedule tick, short-lived lease
```
```mermaid
flowchart LR
Api[GoAPI] --> Template[JobTemplate]
Api --> JobRun[JobRunMongo]
JobRun --> RedisQueue[RedisQueue]
Scheduler[SchedulerTick] --> RedisQueue
Worker[Worker] --> RedisQueue
Worker --> JobRun
Worker --> Step[JobStep]
```
## Core Concepts
### JobTemplate
A template defines how a given kind of job is carried out. For example:
```text
demo_long_task
external_worker_task
scheduled_report
multi_step_pipeline
```
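A template answers:
- What is the schema of this job's input payload?
- Which steps does it have?
- What determines its final status?
- Can it be run more than once?
- May jobs for the same account / target run at the same time?
- What is the retry policy?
- What is the timeout?
- Can it be scheduled?
- Is cancellation supported, and if so, how does the worker wind down when cancelled?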
### JobRun
A JobRun is a single execution instance. It references a template and stores that run's payload, status, progress, result, error, and execution history.
### JobSchedule
A JobSchedule determines when a JobRun is created. It supports:
- cron
- enabled / disabled
- timezone
- payload template
- target scope (e.g. user/account/system)
- nextRunAt / lastRunAt
### JobFlow
A flow is a multi-step process. The first version does not need a full DAG; it only needs to support linear steps:
```text
multi_step_pipeline:
1. prepare
2. execute
3. finalize
```
Later this can be extended to a DAG or conditional branches.
## Mongo Collections
### `job_templates`
```json
{
"_id": "...",
"type": "demo_long_task",
"version": 1,
"name": "Demo long task",
"description": "Demonstrates job templates, progress, cancellation, retries, and scheduling",
"enabled": true,
"repeatable": true,
"concurrencyPolicy": "reject_same_scope",
"dedupeKeys": ["scope_id", "target"],
"timeoutSeconds": 600,
"cancelPolicy": {
"supported": true,
"mode": "cooperative",
"graceSeconds": 30
},
"retryPolicy": {
"maxAttempts": 2,
"backoffSeconds": [30, 120]
},
"steps": [
{ "id": "prepare", "name": "Prepare data", "workerType": "go", "timeoutSeconds": 60, "cancelable": true },
{ "id": "execute", "name": "Execute task", "workerType": "go", "timeoutSeconds": 300, "cancelable": true },
{ "id": "finalize", "name": "Collect results", "workerType": "go", "timeoutSeconds": 30, "cancelable": false }
],
"createAt": 0,
"updateAt": 0
}
```
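For the Go entity layer (`domain/entity/template.go` in the layering plan), the template document could map onto structs like the ones below. This is a sketch, assuming `_id` is stored as a string and that the `json`/`bson` tags simply reuse the field names from the JSON sample; `ParseTemplate` and its two checks are hypothetical additions.

```go
package main

import (
	"encoding/json"
	"errors"
)

// CancelPolicy mirrors job_templates.cancelPolicy.
type CancelPolicy struct {
	Supported    bool   `json:"supported" bson:"supported"`
	Mode         string `json:"mode" bson:"mode"`
	GraceSeconds int    `json:"graceSeconds" bson:"graceSeconds"`
}

// RetryPolicy mirrors job_templates.retryPolicy.
type RetryPolicy struct {
	MaxAttempts    int   `json:"maxAttempts" bson:"maxAttempts"`
	BackoffSeconds []int `json:"backoffSeconds" bson:"backoffSeconds"`
}

// TemplateStep mirrors one entry of job_templates.steps.
type TemplateStep struct {
	ID             string `json:"id" bson:"id"`
	Name           string `json:"name" bson:"name"`
	WorkerType     string `json:"workerType" bson:"workerType"`
	TimeoutSeconds int    `json:"timeoutSeconds" bson:"timeoutSeconds"`
	Cancelable     bool   `json:"cancelable" bson:"cancelable"`
}

// JobTemplate mirrors a job_templates document (string _id is an assumption).
type JobTemplate struct {
	ID                string         `json:"_id" bson:"_id"`
	Type              string         `json:"type" bson:"type"`
	Version           int            `json:"version" bson:"version"`
	Name              string         `json:"name" bson:"name"`
	Description       string         `json:"description" bson:"description"`
	Enabled           bool           `json:"enabled" bson:"enabled"`
	Repeatable        bool           `json:"repeatable" bson:"repeatable"`
	ConcurrencyPolicy string         `json:"concurrencyPolicy" bson:"concurrencyPolicy"`
	DedupeKeys        []string       `json:"dedupeKeys" bson:"dedupeKeys"`
	TimeoutSeconds    int            `json:"timeoutSeconds" bson:"timeoutSeconds"`
	CancelPolicy      CancelPolicy   `json:"cancelPolicy" bson:"cancelPolicy"`
	RetryPolicy       RetryPolicy    `json:"retryPolicy" bson:"retryPolicy"`
	Steps             []TemplateStep `json:"steps" bson:"steps"`
	CreateAt          int64          `json:"createAt" bson:"createAt"`
	UpdateAt          int64          `json:"updateAt" bson:"updateAt"`
}

// ParseTemplate decodes a template document and rejects obviously unusable ones.
func ParseTemplate(raw []byte) (JobTemplate, error) {
	var t JobTemplate
	if err := json.Unmarshal(raw, &t); err != nil {
		return JobTemplate{}, err
	}
	if t.Type == "" {
		return JobTemplate{}, errors.New("template type is required")
	}
	if len(t.Steps) == 0 {
		return JobTemplate{}, errors.New("template needs at least one step")
	}
	return t, nil
}
```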
### `job_runs`
```json
{
"_id": "...",
"templateType": "demo_long_task",
"templateVersion": 1,
"scope": "user",
"scopeId": "user_123",
"status": "pending",
"phase": "prepare",
"payload": {},
"progress": {
"summary": "Waiting for a worker",
"percentage": 0,
"steps": []
},
"result": null,
"error": null,
"attempt": 0,
"maxAttempts": 2,
"lockedBy": null,
"lockedUntil": null,
"cancelRequestedAt": null,
"cancelReason": null,
"scheduledAt": null,
"startedAt": null,
"completedAt": null,
"createAt": 0,
"updateAt": 0
}
```
### `job_schedules`
```json
{
"_id": "...",
"templateType": "demo_long_task",
"scope": "user",
"scopeId": "user_123",
"enabled": true,
"cron": "0 9 * * *",
"timezone": "Asia/Taipei",
"payloadTemplate": {},
"lastRunAt": null,
"nextRunAt": 0,
"createAt": 0,
"updateAt": 0
}
```
### `job_events`
Used for observability and auditing:
```json
{
"_id": "...",
"jobId": "...",
"type": "status_changed",
"from": "pending",
"to": "running",
"message": "worker claimed job",
"metadata": {},
"createAt": 0
}
```
## Status Model
```text
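pending = created, waiting to be enqueued
queued = pushed onto the Redis queue
running = a worker is executing it
waiting_worker = waiting for an external worker to write back
cancel_requested = the user requested cancellation; waiting for the worker to stop cooperatively
succeeded = completed successfully
failed = failed permanently
cancelled = cancelled by the user
expired = the lock/timeout expired and the job cannot be recovered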
```
Step status:
```text
pending | running | succeeded | failed | skipped | cancelled
```
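One way to make these statuses enforceable in code is an explicit transition table. The table below is an assumption inferred from this plan's status model and cancellation rules (retries and re-queueing are deliberately left out), and all names are illustrative.

```go
package main

// RunStatus is a job run lifecycle status.
type RunStatus string

const (
	StatusPending         RunStatus = "pending"
	StatusQueued          RunStatus = "queued"
	StatusRunning         RunStatus = "running"
	StatusWaitingWorker   RunStatus = "waiting_worker"
	StatusCancelRequested RunStatus = "cancel_requested"
	StatusSucceeded       RunStatus = "succeeded"
	StatusFailed          RunStatus = "failed"
	StatusCancelled       RunStatus = "cancelled"
	StatusExpired         RunStatus = "expired"
)

// allowed is an assumed transition table; terminal statuses have no entry.
var allowed = map[RunStatus][]RunStatus{
	StatusPending:         {StatusQueued, StatusRunning, StatusCancelled},
	StatusQueued:          {StatusRunning, StatusCancelled},
	StatusRunning:         {StatusWaitingWorker, StatusCancelRequested, StatusSucceeded, StatusFailed, StatusExpired},
	StatusWaitingWorker:   {StatusCancelRequested, StatusSucceeded, StatusFailed, StatusExpired},
	StatusCancelRequested: {StatusCancelled, StatusExpired},
}

// CanTransition reports whether from -> to is a permitted lifecycle move.
func CanTransition(from, to RunStatus) bool {
	for _, s := range allowed[from] {
		if s == to {
			return true
		}
	}
	return false
}

// IsTerminal reports whether a status is final and can no longer change.
func IsTerminal(s RunStatus) bool {
	switch s {
	case StatusSucceeded, StatusFailed, StatusCancelled, StatusExpired:
		return true
	}
	return false
}
```

A guarded repository method would check `CanTransition` before building its conditional update, so an unlisted move fails loudly instead of silently overwriting a status.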
## Cancellation Semantics
Cancellation is a required capability in the first version and uses cooperative cancellation:
```mermaid
flowchart LR
User[User] --> ApiCancel[CancelAPI]
ApiCancel --> Run[JobRun cancel_requested]
Run --> RedisCancel[RedisCancelSignal]
Worker[Worker] -->|"poll cancel flag"| Run
Worker --> Stop[StopCurrentStep]
Stop --> Final[JobRun cancelled]
```
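Rules:
- `pending` / `queued`: cancelling moves the job directly to `cancelled`, and the job is removed from the Redis queue if possible. If it cannot be removed, the worker must check the status when claiming the job and skip it.
- `running`: the status changes to `cancel_requested` and `cancelRequestedAt` / `cancelReason` are written; the worker must check the cancel flag between steps or at checkpoints inside long tasks.
- `waiting_worker`: the status changes to `cancel_requested` and a Redis cancel signal is written at the same time; the external worker must check the job status before writing back.
- `succeeded` / `failed` / `cancelled` / `expired`: cannot be cancelled; the API returns ResourceInvalidState.
- After receiving a cancellation, the worker calls `AcknowledgeCancel(jobId, workerId)`, which releases the lock, writes a `job_events` entry, and moves the status to `cancelled`.
- If a job stays in `cancel_requested` longer than the template's `cancelPolicy.graceSeconds`, the scheduler/reaper may mark it `cancelled` or `expired`; for the first version, marking it `cancelled` and recording a timeout event is recommended.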
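## Status and Lock Safety Rules
The first version already confines the race conditions most likely to cause trouble to the repository / usecase layer:
- `ClaimNext` may only move a job from `pending` / `queued` to `running` through a conditional update. If the API cancels the job concurrently, the Mongo update is rejected.
- `RequestCancel` may only perform a conditional update from a cancellable status: `pending` / `queued` go directly to `cancelled`, and `running` / `waiting_worker` go to `cancel_requested`.
- `CompleteRun` / `FailRun` / `UpdateProgress` must carry a `workerID` and may only update jobs where `lockedBy == workerID`.
- The value of the Redis key `jobs:lock:<jobId>` is the `workerID`; `ReleaseLock` / `RefreshLock` perform an owner check so that a stale worker cannot delete a newer worker's lock.
- Long-running worker tasks must send periodic heartbeats by calling `RefreshRunLock(jobId, workerID, ttlSeconds)`. Custom step handlers can use `StepContext.Heartbeat`.
When adding new status transitions later, do not use a bare `Update`; for lifecycle statuses, add an explicit guarded repository method or reuse an existing conditional update.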
## Redis Keys
```text
jobs:queue:<workerType> # list or stream consumed by workers
jobs:lock:<jobId> # lease lock
jobs:scheduler:lock # scheduler singleton lock
jobs:dedupe:<template>:<hash> # prevents duplicate runs in the same scope
jobs:cancel:<jobId> # cancel signal, read by workers at checkpoints
```
A Redis List is enough for the first version; upgrade to Redis Streams later if ack/replay is needed.
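A small key-builder module keeps these names consistent across the API, scheduler, and workers. In this sketch the dedupe hash is an assumption (SHA-256 over the JSON encoding of the dedupe fields); the two Lua scripts follow the common owner-checked pattern for releasing and refreshing a lock whose value is the owner ID, executed with `EVAL`. The lock itself would be taken with `SET <key> <workerID> NX EX <ttl>`.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
)

// SchedulerLockKey guards the singleton scheduler.
const SchedulerLockKey = "jobs:scheduler:lock"

func QueueKey(workerType string) string { return "jobs:queue:" + workerType }
func LockKey(jobID string) string       { return "jobs:lock:" + jobID }
func CancelKey(jobID string) string     { return "jobs:cancel:" + jobID }

// DedupeKey builds jobs:dedupe:<template>:<hash> from the template's dedupe fields.
func DedupeKey(templateType string, fields map[string]string) string {
	// encoding/json writes map keys in sorted order, so equal field sets always
	// yield identical bytes; marshalling a map[string]string cannot fail.
	canonical, _ := json.Marshal(fields)
	sum := sha256.Sum256(canonical)
	return "jobs:dedupe:" + templateType + ":" + hex.EncodeToString(sum[:])
}

// ReleaseLockScript deletes the lock only if ARGV[1] still owns it.
// EVAL with KEYS[1] = LockKey(jobID), ARGV[1] = workerID.
const ReleaseLockScript = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
  return redis.call("DEL", KEYS[1])
end
return 0`

// RefreshLockScript extends the lease only for the owner; ARGV[2] = TTL in seconds.
const RefreshLockScript = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
  return redis.call("EXPIRE", KEYS[1], ARGV[2])
end
return 0`
```

Running the check and the delete inside one script matters: a separate `GET` followed by `DEL` could delete a lock that expired and was re-acquired by another worker in between.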
## API 規劃
```text
GET /api/v1/job/templates
GET /api/v1/job/templates/:type
PUT /api/v1/job/templates/:type
POST /api/v1/jobs
GET /api/v1/jobs/:id
GET /api/v1/jobs?page=1&pageSize=20
POST /api/v1/jobs/:id/cancel
POST /api/v1/jobs/:id/retry
GET /api/v1/job/schedules?page=1&pageSize=20
POST /api/v1/job/schedules
PUT /api/v1/job/schedules/:id
POST /api/v1/job/schedules/:id/enable
POST /api/v1/job/schedules/:id/disable
```
All list responses use the current standard format:
```json
{
"code": 102000,
"message": "SUCCESS",
"data": {
"pagination": {
"total": 42,
"page": 1,
"pageSize": 10,
"totalPages": 5
},
"list": []
}
}
```
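A generic Go envelope can produce this shape for any list endpoint. In this sketch `totalPages` is the ceiling of `total / pageSize` (42 items at 10 per page gives 5 pages, as in the example), and an empty result is serialized as `[]` rather than `null`. The type names and the fallback page size of 20 (taken from the example queries) are assumptions.

```go
package main

// Pagination matches the "pagination" object in list responses.
type Pagination struct {
	Total      int64 `json:"total"`
	Page       int64 `json:"page"`
	PageSize   int64 `json:"pageSize"`
	TotalPages int64 `json:"totalPages"`
}

// ListData is the "data" object of a list response.
type ListData[T any] struct {
	Pagination Pagination `json:"pagination"`
	List       []T        `json:"list"`
}

// Response is the standard envelope.
type Response[T any] struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
	Data    T      `json:"data"`
}

// NewPagination normalizes page/pageSize and derives totalPages by ceiling division.
func NewPagination(total, page, pageSize int64) Pagination {
	if pageSize <= 0 {
		pageSize = 20 // assumed default, matching the example queries
	}
	if page <= 0 {
		page = 1
	}
	return Pagination{
		Total:      total,
		Page:       page,
		PageSize:   pageSize,
		TotalPages: (total + pageSize - 1) / pageSize,
	}
}

// Skip is the number of documents to skip for this page.
func (p Pagination) Skip() int64 { return (p.Page - 1) * p.PageSize }

// Success wraps a page of items; a nil slice becomes [] so JSON never shows null.
func Success[T any](items []T, p Pagination) Response[ListData[T]] {
	if items == nil {
		items = []T{}
	}
	return Response[ListData[T]]{
		Code:    102000,
		Message: "SUCCESS",
		Data:    ListData[T]{Pagination: p, List: items},
	}
}
```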
## Layering Plan
```text
internal/model/job/
  domain/entity/template.go
  domain/entity/run.go
  domain/entity/schedule.go
  domain/entity/event.go
  domain/enum/status.go
  domain/repository/*.go
  domain/usecase/*.go
  repository/mongo_*.go
  repository/redis_queue.go
  usecase/template_usecase.go
  usecase/run_usecase.go
  usecase/schedule_usecase.go
  usecase/progress_usecase.go
  usecase/worker_usecase.go
internal/logic/job/
  create_job_logic.go
  get_job_logic.go
  list_jobs_logic.go
  cancel_job_logic.go
  retry_job_logic.go
  template_logic.go
  schedule_logic.go
internal/worker/job/
  runner.go
  scheduler.go
  dispatcher.go
```
## Template Execution Rules
### Repeatability
Controlled by `repeatable` and `concurrencyPolicy`:
```text
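repeatable=false
  never create again once a job with the same dedupe key has completed
repeatable=true + reject_same_scope
  reject creation while a running/pending job exists
repeatable=true + allow_parallel
  allow parallel runs
repeatable=true + replace_existing
  cancel the old job and create a new one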
```
### Final Status
A template can define its success condition:
```text
successWhen = all_steps_succeeded
```
The first version supports only `all_steps_succeeded`; `any_step_succeeded` or conditional flows can be added later.
### Cancellation Support
A template controls cancellation with `cancelPolicy`:
```text
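supported=false
  the API does not allow cancelling this job
mode=cooperative
  the worker winds down after checking the cancel flag at a checkpoint
graceSeconds=30
  once cancel_requested exceeds the grace period, the reaper winds the job down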
```
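A step's `cancelable` flag controls whether the current step can be stopped immediately. If the current step is not cancelable, the worker must let it finish and then skip the remaining steps; the final status is still `cancelled`.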
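The worker side of cooperative cancellation might look like this sketch. It assumes a hypothetical `cancelRequested` hook (for example, reading `jobs:cancel:<jobId>` or the run's status); cancelable steps receive that hook as their checkpoint, while non-cancelable steps get a checkpoint that never fires and therefore run to completion before the loop stops. `WorkerStep`, `RunWithCancel`, and `ErrStepCancelled` are illustrative names.

```go
package main

import "errors"

// ErrStepCancelled is returned by a step that stopped at a checkpoint.
var ErrStepCancelled = errors.New("step stopped at cancel checkpoint")

// WorkerStep is a hypothetical step definition for the worker loop.
type WorkerStep struct {
	ID         string
	Cancelable bool
	// Run does the work. Long steps should call checkpoint periodically and
	// return ErrStepCancelled as soon as it reports true.
	Run func(checkpoint func() bool) error
}

// RunWithCancel runs steps in order and returns the final run status plus the
// IDs of the steps that completed.
func RunWithCancel(steps []WorkerStep, cancelRequested func() bool) (string, []string) {
	var done []string
	for _, st := range steps {
		// Checkpoint between steps: never start a new step after a cancel request.
		if cancelRequested() {
			return "cancelled", done
		}
		// Non-cancelable steps get a checkpoint that never fires.
		checkpoint := func() bool { return false }
		if st.Cancelable {
			checkpoint = cancelRequested
		}
		err := st.Run(checkpoint)
		if errors.Is(err, ErrStepCancelled) {
			return "cancelled", done
		}
		if err != nil {
			return "failed", done
		}
		done = append(done, st.ID)
	}
	// A cancel that arrived during the last step still ends as cancelled,
	// because the run is already in cancel_requested.
	if cancelRequested() {
		return "cancelled", done
	}
	return "succeeded", done
}
```

For a `cancelled` result, the worker would then call `AcknowledgeCancel` so the lock is released and the event is recorded.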
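## Suggested Implementation Order for the First Version
1. Define the enums, entities, and repository interfaces in `model/job`.
2. Implement the Mongo repositories: template/run/schedule/event.
3. Implement the Redis queue/lock repository.
4. Implement `CreateRun`: read the template, check the repeat/concurrency rules, create the run, and push it onto the queue.
5. Implement `ClaimNext`: the worker takes a job from Redis, and Mongo sets the lock/status.
6. Implement `RequestCancel`: move the status to `cancel_requested` or directly to `cancelled`, and write the Redis cancel signal and an event.
7. Implement `AcknowledgeCancel`: after the worker winds down, release the lock and move the status to `cancelled`.
8. Implement `UpdateProgress`, `Complete`, `Fail`, and `Retry`.
9. Implement the schedule tick: for each `job_schedules` entry with `nextRunAt <= now`, create a JobRun.
10. Implement the API and documentation.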