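# Job Core System Design

## Goal

Build a general-purpose job system that any long-running, multi-step, or scheduled task can share in the future. A job is more than a background task: it is a unit of work with a template, configuration, status, progress, cancellation, a rerun (retry) policy, and scheduling.
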
## Core Design

The design splits responsibilities as follows:

```text
Mongo = source of truth for jobs, templates, runs, and history
Redis = queue, distributed lock, schedule tick, short-lived lease
```

```mermaid
flowchart LR
Api[GoAPI] --> Template[JobTemplate]
Api --> JobRun[JobRunMongo]
JobRun --> RedisQueue[RedisQueue]
Scheduler[SchedulerTick] --> RedisQueue
Worker[Worker] --> RedisQueue
Worker --> JobRun
Worker --> Step[JobStep]
```

## Core Concepts

### JobTemplate

A template defines how a given kind of job is carried out. For example:

```text
demo_long_task
external_worker_task
scheduled_report
multi_step_pipeline
```

A template must answer:

- What is the schema of the job's input payload?
- Which steps does it have?
- How is the final status determined (what counts as success)?
- Can it run more than once?
- Can runs for the same account / same target execute concurrently?
- What is the retry policy?
- What is the timeout?
- Can it be scheduled?
- Does it support cancellation, and if so, how should the worker wind down when cancelled?

### JobRun

A JobRun is a single execution instance. It references a template and stores that run's payload, status, progress, result, error, and execution history.

### JobSchedule

A JobSchedule defines when JobRuns are created. It supports:

- cron
- enabled / disabled
- timezone
- payload template
- target scope, e.g., user/account/system
- nextRunAt / lastRunAt

### JobFlow

A flow is a multi-step process. The first version does not need a full DAG; it supports linear steps only:

```text
multi_step_pipeline:
1. prepare
2. execute
3. finalize
```

DAGs and conditional branches can be added later.

## Mongo Collections

### `job_templates`

```json
{
  "_id": "...",
  "type": "demo_long_task",
  "version": 1,
  "name": "Demo long task",
  "description": "Demonstrates job templates, progress, cancellation, reruns, and scheduling",
  "enabled": true,
  "repeatable": true,
  "concurrencyPolicy": "reject_same_scope",
  "dedupeKeys": ["scope_id", "target"],
  "timeoutSeconds": 600,
  "cancelPolicy": {
    "supported": true,
    "mode": "cooperative",
    "graceSeconds": 30
  },
  "retryPolicy": {
    "maxAttempts": 2,
    "backoffSeconds": [30, 120]
  },
  "steps": [
    { "id": "prepare", "name": "Prepare data", "workerType": "go", "timeoutSeconds": 60, "cancelable": true },
    { "id": "execute", "name": "Run task", "workerType": "go", "timeoutSeconds": 300, "cancelable": true },
    { "id": "finalize", "name": "Collect results", "workerType": "go", "timeoutSeconds": 30, "cancelable": false }
  ],
  "createAt": 0,
  "updateAt": 0
}
```
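
A Go entity for `domain/entity/template.go` might mirror this document roughly as below. This is a minimal sketch, not the project's code: the type names, `ParseTemplate`, and its checks (a type and at least one step, a known `concurrencyPolicy` when `repeatable` is true, unique step IDs) are assumptions. Only `json` tags are shown; reading through the Mongo driver would also need `bson` tags.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// CancelPolicy mirrors the cancelPolicy object of a job_templates document.
type CancelPolicy struct {
	Supported    bool   `json:"supported"`
	Mode         string `json:"mode"` // "cooperative" in the first version
	GraceSeconds int    `json:"graceSeconds"`
}

// RetryPolicy mirrors the retryPolicy object.
type RetryPolicy struct {
	MaxAttempts    int   `json:"maxAttempts"`
	BackoffSeconds []int `json:"backoffSeconds"`
}

// StepDef is one linear step declared by the template.
type StepDef struct {
	ID             string `json:"id"`
	Name           string `json:"name"`
	WorkerType     string `json:"workerType"`
	TimeoutSeconds int    `json:"timeoutSeconds"`
	Cancelable     bool   `json:"cancelable"`
}

// JobTemplate mirrors a job_templates document; _id is left out of this sketch.
type JobTemplate struct {
	Type              string       `json:"type"`
	Version           int          `json:"version"`
	Name              string       `json:"name"`
	Description       string       `json:"description"`
	Enabled           bool         `json:"enabled"`
	Repeatable        bool         `json:"repeatable"`
	ConcurrencyPolicy string       `json:"concurrencyPolicy"`
	DedupeKeys        []string     `json:"dedupeKeys"`
	TimeoutSeconds    int          `json:"timeoutSeconds"`
	CancelPolicy      CancelPolicy `json:"cancelPolicy"`
	RetryPolicy       RetryPolicy  `json:"retryPolicy"`
	Steps             []StepDef    `json:"steps"`
	CreateAt          int64        `json:"createAt"`
	UpdateAt          int64        `json:"updateAt"`
}

// Policies named in the "Template Execution Rules" section.
var knownPolicies = map[string]bool{
	"reject_same_scope": true, "allow_parallel": true, "replace_existing": true,
}

// ParseTemplate decodes a template document and applies basic sanity checks.
func ParseTemplate(data []byte) (JobTemplate, error) {
	var t JobTemplate
	if err := json.Unmarshal(data, &t); err != nil {
		return JobTemplate{}, err
	}
	if t.Type == "" || len(t.Steps) == 0 {
		return JobTemplate{}, errors.New("template needs a type and at least one step")
	}
	if t.Repeatable && !knownPolicies[t.ConcurrencyPolicy] {
		return JobTemplate{}, fmt.Errorf("unknown concurrencyPolicy %q", t.ConcurrencyPolicy)
	}
	seen := make(map[string]bool, len(t.Steps))
	for _, s := range t.Steps {
		if seen[s.ID] {
			return JobTemplate{}, fmt.Errorf("duplicate step id %q", s.ID)
		}
		seen[s.ID] = true
	}
	return t, nil
}

func main() {
	doc := `{"type":"demo_long_task","version":1,"repeatable":true,
	  "concurrencyPolicy":"reject_same_scope","timeoutSeconds":600,
	  "cancelPolicy":{"supported":true,"mode":"cooperative","graceSeconds":30},
	  "steps":[{"id":"prepare","cancelable":true},{"id":"execute","cancelable":true},
	           {"id":"finalize","cancelable":false}]}`
	t, err := ParseTemplate([]byte(doc))
	fmt.Println(t.Type, len(t.Steps), err) // demo_long_task 3 <nil>
}
```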

### `job_runs`

```json
{
  "_id": "...",
  "templateType": "demo_long_task",
  "templateVersion": 1,
  "scope": "user",
  "scopeId": "user_123",
  "status": "pending",
  "phase": "prepare",
  "payload": {},
  "progress": {
    "summary": "Waiting for a worker to run it",
    "percentage": 20,
    "steps": []
  },
  "result": null,
  "error": null,
  "attempt": 0,
  "maxAttempts": 2,
  "lockedBy": null,
  "lockedUntil": null,
  "cancelRequestedAt": null,
  "cancelReason": null,
  "scheduledAt": null,
  "startedAt": null,
  "completedAt": null,
  "createAt": 0,
  "updateAt": 0
}
```

### `job_schedules`

```json
{
  "_id": "...",
  "templateType": "demo_long_task",
  "scope": "user",
  "scopeId": "user_123",
  "enabled": true,
  "cron": "0 9 * * *",
  "timezone": "Asia/Taipei",
  "payloadTemplate": {},
  "lastRunAt": null,
  "nextRunAt": 0,
  "createAt": 0,
  "updateAt": 0
}
```

### `job_events`

Used for observability and auditing:

```json
{
  "_id": "...",
  "jobId": "...",
  "type": "status_changed",
  "from": "pending",
  "to": "running",
  "message": "worker claimed job",
  "metadata": {},
  "createAt": 0
}
```


## Status Model

```text
pending          = created, waiting to be enqueued
queued           = pushed onto the Redis queue
running          = being executed by a worker
waiting_worker   = waiting for an external worker to write back
cancel_requested = the user requested cancellation; waiting for the worker to stop cooperatively
succeeded        = completed successfully
failed           = failed permanently
cancelled        = cancelled by the user
expired          = the lock/timeout expired and the run cannot be recovered
```

Step status:

```text
pending | running | succeeded | failed | skipped | cancelled
```
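
The status model could be encoded as string enums in `domain/enum/status.go`, roughly as follows. The helper names (`ParseRunStatus`, `IsTerminal`) are hypothetical; the terminal set matches the statuses that the cancellation rules treat as non-cancellable.

```go
package main

import "fmt"

// RunStatus is the lifecycle status of a JobRun.
type RunStatus string

const (
	StatusPending         RunStatus = "pending"
	StatusQueued          RunStatus = "queued"
	StatusRunning         RunStatus = "running"
	StatusWaitingWorker   RunStatus = "waiting_worker"
	StatusCancelRequested RunStatus = "cancel_requested"
	StatusSucceeded       RunStatus = "succeeded"
	StatusFailed          RunStatus = "failed"
	StatusCancelled       RunStatus = "cancelled"
	StatusExpired         RunStatus = "expired"
)

// StepStatus is the status of a single step inside a run.
type StepStatus string

const (
	StepPending   StepStatus = "pending"
	StepRunning   StepStatus = "running"
	StepSucceeded StepStatus = "succeeded"
	StepFailed    StepStatus = "failed"
	StepSkipped   StepStatus = "skipped"
	StepCancelled StepStatus = "cancelled"
)

var runStatuses = []RunStatus{
	StatusPending, StatusQueued, StatusRunning, StatusWaitingWorker, StatusCancelRequested,
	StatusSucceeded, StatusFailed, StatusCancelled, StatusExpired,
}

// ParseRunStatus rejects strings outside the status model (e.g. bad Mongo data or API input).
func ParseRunStatus(s string) (RunStatus, error) {
	for _, st := range runStatuses {
		if string(st) == s {
			return st, nil
		}
	}
	return "", fmt.Errorf("unknown job status %q", s)
}

// IsTerminal reports whether the run has reached a final status.
func (s RunStatus) IsTerminal() bool {
	switch s {
	case StatusSucceeded, StatusFailed, StatusCancelled, StatusExpired:
		return true
	}
	return false
}

func main() {
	s, err := ParseRunStatus("cancel_requested")
	fmt.Println(s, s.IsTerminal(), err) // cancel_requested false <nil>
}
```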

## Cancellation Semantics

Cancellation is a required capability in the first version and uses cooperative cancellation:

```mermaid
flowchart LR
User[User] --> ApiCancel[CancelAPI]
ApiCancel --> Run[JobRun cancel_requested]
Run --> RedisCancel[RedisCancelSignal]
Worker[Worker] -->|"poll cancel flag"| Run
Worker --> Stop[StopCurrentStep]
Stop --> Final[JobRun cancelled]
```

Rules:

- `pending` / `queued`: cancelling moves the run directly to `cancelled` and makes a best effort to remove it from the Redis queue; if it cannot be removed, the worker must check the status when claiming and skip the job.
- `running`: the status becomes `cancel_requested` and `cancelRequestedAt` / `cancelReason` are written; the worker must check the cancel flag between steps or at checkpoints inside long tasks.
- `waiting_worker`: the status becomes `cancel_requested` and a Redis cancel signal is written at the same time; the external worker must check the job status before writing back.
- `succeeded` / `failed` / `cancelled` / `expired`: cannot be cancelled; return ResourceInvalidState.
- After receiving the cancellation, the worker calls `AcknowledgeCancel(jobId, workerId)`, which releases the lock, writes to `job_events`, and sets the status to `cancelled`.
- If a run stays in `cancel_requested` longer than the template's `cancelPolicy.graceSeconds`, the scheduler/reaper may mark it `cancelled` or `expired`. For the first version, marking it `cancelled` and recording a timeout event is recommended.

## State and Lock Safety Rules

The first version already confines the most error-prone race conditions to the repository / usecase layer:

- `ClaimNext` may only conditionally update a run from `pending` / `queued` to `running`. If the API cancels the run at the same time, the Mongo update is rejected.
- `RequestCancel` may only conditionally update a run that is in a cancellable status: `pending` / `queued` go directly to `cancelled`, and `running` / `waiting_worker` go to `cancel_requested`.
- `CompleteRun` / `FailRun` / `UpdateProgress` must carry the `workerID` and may only update a job whose `lockedBy == workerID`.
- The value of the Redis key `jobs:lock:<jobId>` is the `workerID`; `ReleaseLock` / `RefreshLock` perform an owner check so a stale worker cannot delete a newer worker's lock.
- Long-running worker tasks must heartbeat periodically by calling `RefreshRunLock(jobId, workerID, ttlSeconds)`. Custom step handlers can use `StepContext.Heartbeat`.

When adding new state transitions later, do not use a bare `Update`; for lifecycle status changes, add an explicit guarded repository method or use an existing conditional update.

## Redis Keys

```text
jobs:queue:<workerType>        # list or stream, consumed by workers
jobs:lock:<jobId>              # lease lock
jobs:scheduler:lock            # scheduler singleton lock
jobs:dedupe:<template>:<hash>  # prevents duplicate runs in the same scope
jobs:cancel:<jobId>            # cancel signal, read by workers at checkpoints
```

A Redis List is sufficient for the first version; upgrade to Redis Streams later if ack/replay is needed.

## API Plan

```text
GET  /api/v1/job/templates
GET  /api/v1/job/templates/:type
PUT  /api/v1/job/templates/:type

POST /api/v1/jobs
GET  /api/v1/jobs/:id
GET  /api/v1/jobs?page=1&pageSize=20
POST /api/v1/jobs/:id/cancel
POST /api/v1/jobs/:id/retry

GET  /api/v1/job/schedules?page=1&pageSize=20
POST /api/v1/job/schedules
PUT  /api/v1/job/schedules/:id
POST /api/v1/job/schedules/:id/enable
POST /api/v1/job/schedules/:id/disable
```

All list responses use the current standard format:

```json
{
  "code": 102000,
  "message": "SUCCESS",
  "data": {
    "pagination": {
      "total": 42,
      "page": 1,
      "pageSize": 10,
      "totalPages": 5
    },
    "list": []
  }
}
```
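
The list envelope maps naturally onto generic Go types. `Response`, `ListData`, `NewPagination`, and `Skip` are hypothetical names; `totalPages` is the ceiling of `total / pageSize`, which gives 5 for the example's 42 items at 10 per page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Pagination mirrors data.pagination in the list envelope.
type Pagination struct {
	Total      int64 `json:"total"`
	Page       int64 `json:"page"`
	PageSize   int64 `json:"pageSize"`
	TotalPages int64 `json:"totalPages"`
}

// ListData is the data payload of every list endpoint.
type ListData[T any] struct {
	Pagination Pagination `json:"pagination"`
	List       []T        `json:"list"`
}

// Response is the outer envelope.
type Response[T any] struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
	Data    T      `json:"data"`
}

// NewPagination derives totalPages as ceil(total / pageSize).
func NewPagination(total, page, pageSize int64) Pagination {
	var totalPages int64
	if pageSize > 0 {
		totalPages = (total + pageSize - 1) / pageSize
	}
	return Pagination{Total: total, Page: page, PageSize: pageSize, TotalPages: totalPages}
}

// Skip is the offset a Mongo find would use for this 1-based page.
func (p Pagination) Skip() int64 {
	if p.Page < 1 {
		return 0
	}
	return (p.Page - 1) * p.PageSize
}

func main() {
	resp := Response[ListData[string]]{
		Code:    102000,
		Message: "SUCCESS",
		Data:    ListData[string]{Pagination: NewPagination(42, 1, 10), List: []string{}},
	}
	out, err := json.Marshal(resp)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	// {"code":102000,"message":"SUCCESS","data":{"pagination":{"total":42,"page":1,"pageSize":10,"totalPages":5},"list":[]}}
}
```

Initializing `List` to an empty slice makes an empty page serialize as `[]` rather than `null`.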

## Layering Plan

```text
internal/model/job/
  domain/entity/template.go
  domain/entity/run.go
  domain/entity/schedule.go
  domain/entity/event.go
  domain/enum/status.go
  domain/repository/*.go
  domain/usecase/*.go
  repository/mongo_*.go
  repository/redis_queue.go
  usecase/template_usecase.go
  usecase/run_usecase.go
  usecase/schedule_usecase.go
  usecase/progress_usecase.go
  usecase/worker_usecase.go

internal/logic/job/
  create_job_logic.go
  get_job_logic.go
  list_jobs_logic.go
  cancel_job_logic.go
  retry_job_logic.go
  template_logic.go
  schedule_logic.go

internal/worker/job/
  runner.go
  scheduler.go
  dispatcher.go
```

## Template Execution Rules

### Repeatability

Controlled by `repeatable` and `concurrencyPolicy`:

```text
repeatable=false
  once a run with the same dedupe key has completed, no new run is created

repeatable=true + reject_same_scope
  reject creation while a running/pending job already exists

repeatable=true + allow_parallel
  allow runs in parallel

repeatable=true + replace_existing
  cancel the old job and create a new one
```

### Final Status

A template can define its success condition:

```text
successWhen = all_steps_succeeded
```

The first version supports only `all_steps_succeeded`. `any_step_succeeded` or conditional flows can be added later.
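
Evaluating `successWhen` at the end of a run is a small switch that leaves room for future conditions. The function name is hypothetical, and two choices are assumptions: a `skipped` step does not count as succeeded, and a run with no steps is not a success.

```go
package main

import "fmt"

type StepStatus string

const (
	StepSucceeded StepStatus = "succeeded"
	StepFailed    StepStatus = "failed"
	StepSkipped   StepStatus = "skipped"
)

// Succeeded evaluates a template's successWhen against the final step statuses.
func Succeeded(successWhen string, steps []StepStatus) (bool, error) {
	switch successWhen {
	case "all_steps_succeeded":
		if len(steps) == 0 {
			return false, nil // assumption: a run with no steps is not a success
		}
		for _, s := range steps {
			if s != StepSucceeded { // "skipped" does not count as succeeded here
				return false, nil
			}
		}
		return true, nil
	default:
		return false, fmt.Errorf("successWhen %q is not supported in the first version", successWhen)
	}
}

func main() {
	ok, err := Succeeded("all_steps_succeeded", []StepStatus{StepSucceeded, StepSkipped})
	fmt.Println(ok, err) // false <nil>
}
```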
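
### Cancellation Support

A template controls cancellation with `cancelPolicy`:

```text
supported=false
  the API does not allow this job to be cancelled

mode=cooperative
  the worker checks the cancel flag at checkpoints and winds down

graceSeconds=30
  a run still in cancel_requested after the grace period is wound down by the reaper
```

A step's `cancelable` flag controls whether the current step can be stopped immediately. If the current step is not cancelable, the worker lets it finish and then skips the remaining steps; the final status is still `cancelled`.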

## Suggested Implementation Order for the First Version

1. Create the enum/entity/repository interfaces under `model/job`.
2. Implement the Mongo repositories: template/run/schedule/event.
3. Implement the Redis queue/lock repository.
4. Implement `CreateRun`: read the template, check the repeat/concurrency rules, create the run, and push it onto the queue.
5. Implement `ClaimNext`: the worker takes a job from Redis and sets the lock/status in Mongo.
6. Implement `RequestCancel`: move the status to `cancel_requested` or directly to `cancelled`, and write the Redis cancel signal and an event.
7. Implement `AcknowledgeCancel`: after the worker winds down, release the lock and move the status to `cancelled`.
8. Implement `UpdateProgress`, `Complete`, `Fail`, and `Retry`.
9. Implement the schedule tick: scan `job_schedules` for `nextRunAt <= now` and create JobRuns.
10. Implement the API and documentation.