# Job Core System Plan

## Goal

Build a general-purpose job system that any long-running task, workflow task, or scheduled task can share in the future. A job is not just a background task; it is a unit of work that has a template, configuration, status, progress, cancellation support, a rerun policy, and scheduling support.

## Core Design

The design uses:

```text
Mongo = source of truth for job/template/run/history
Redis = queue, distributed lock, schedule tick, short-lived lease
```

```mermaid
flowchart LR
  Api[GoAPI] --> Template[JobTemplate]
  Api --> JobRun[JobRunMongo]
  JobRun --> RedisQueue[RedisQueue]
  Scheduler[SchedulerTick] --> RedisQueue
  Worker[Worker] --> RedisQueue
  Worker --> JobRun
  Worker --> Step[JobStep]
```

## Core Concepts

### JobTemplate

A template defines how a given kind of job is carried out. For example:

```text
demo_long_task
external_worker_task
scheduled_report
multi_step_pipeline
```

A template must answer:

- What the input payload schema of the job is
- Which steps it has
- What the final state is
- Whether it can be run repeatedly
- Whether the same account / same target may run concurrently
- What the retry policy is
- What the timeout is
- Whether it can be scheduled
- Whether cancellation is supported, and how the worker should wind down on cancel

### JobRun

A JobRun is a single execution instance. It references a template and stores that run's payload, status, progress, result, error, and execution history.

### JobSchedule

A JobSchedule defines when a JobRun is created. It supports:

- cron
- enabled / disabled
- timezone
- payload template
- target scope, e.g. user/account/system
- nextRunAt / lastRunAt

### JobFlow

A flow is a multi-step process. The first version does not need a full DAG; it supports linear steps first:

```text
multi_step_pipeline:
  1. prepare
  2. execute
  3. finalize
```

It can later be extended to a DAG or conditional branches.

## Mongo Collections

### `job_templates`

```json
{
  "_id": "...",
  "type": "demo_long_task",
  "version": 1,
  "name": "Demo long task",
  "description": "Demonstrates job template, progress, cancellation, rerun, and scheduling capabilities",
  "enabled": true,
  "repeatable": true,
  "concurrencyPolicy": "reject_same_scope",
  "dedupeKeys": ["scope_id", "target"],
  "timeoutSeconds": 600,
  "cancelPolicy": {
    "supported": true,
    "mode": "cooperative",
    "graceSeconds": 30
  },
  "retryPolicy": {
    "maxAttempts": 2,
    "backoffSeconds": [30, 120]
  },
  "steps": [
    { "id": "prepare", "name": "Prepare data", "workerType": "go", "timeoutSeconds": 60, "cancelable": true },
    { "id": "execute", "name": "Execute task", "workerType": "go", "timeoutSeconds": 300, "cancelable": true },
    { "id": "finalize", "name": "Finalize result", "workerType": "go", "timeoutSeconds": 30, "cancelable": false }
  ],
  "createAt": 0,
  "updateAt": 0
}
```

### `job_runs`

```json
{
  "_id": "...",
  "templateType": "demo_long_task",
  "templateVersion": 1,
  "scope": "user",
  "scopeId": "user_123",
  "status": "pending",
  "phase": "prepare",
  "payload": {},
  "progress": {
    "summary": "Waiting for a worker to execute",
    "percentage": 0,
    "steps": []
  },
  "result": null,
  "error": null,
  "attempt": 0,
  "maxAttempts": 2,
  "lockedBy": null,
  "lockedUntil": null,
  "cancelRequestedAt": null,
  "cancelReason": null,
  "scheduledAt": null,
  "startedAt": null,
  "completedAt": null,
  "createAt": 0,
  "updateAt": 0
}
```

### `job_schedules`

```json
{
  "_id": "...",
  "templateType": "demo_long_task",
  "scope": "user",
  "scopeId": "user_123",
  "enabled": true,
  "cron": "0 9 * * *",
  "timezone": "Asia/Taipei",
  "payloadTemplate": {},
  "lastRunAt": null,
  "nextRunAt": 0,
  "createAt": 0,
  "updateAt": 0
}
```

### `job_events`

Used for observability and auditing:

```json
{
  "_id": "...",
  "jobId": "...",
  "type": "status_changed",
  "from": "pending",
  "to": "running",
  "message": "worker claimed job",
  "metadata": {},
  "createAt": 0
}
```

## Status Model

```text
pending = created, waiting to be queued
queued = pushed to the Redis queue
running = being executed by a worker
waiting_worker = waiting for an external worker to write back
cancel_requested = the user has requested cancellation, waiting for the worker's cooperative stop
succeeded = completed successfully
failed = failed permanently
cancelled = cancelled by the user
expired = cannot be recovered after the lock/timeout expired
```

Step status:

```text
pending | running | succeeded | failed | skipped | cancelled
```
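
The status model above maps naturally onto a string enum. The sketch below is one possible shape for `domain/enum/status.go`; the `Status` type, the constant names, and the `IsTerminal` / `IsValid` helpers are hypothetical. Treating `succeeded`, `failed`, `cancelled`, and `expired` as terminal follows the cancellation rules in this plan, which list exactly those four as not cancellable.

```go
package main

import "fmt"

// Status is a job run lifecycle status, using the names from the status model.
type Status string

const (
	StatusPending         Status = "pending"
	StatusQueued          Status = "queued"
	StatusRunning         Status = "running"
	StatusWaitingWorker   Status = "waiting_worker"
	StatusCancelRequested Status = "cancel_requested"
	StatusSucceeded       Status = "succeeded"
	StatusFailed          Status = "failed"
	StatusCancelled       Status = "cancelled"
	StatusExpired         Status = "expired"
)

// IsTerminal reports whether a worker is expected to make no further
// lifecycle transition from this status.
func (s Status) IsTerminal() bool {
	switch s {
	case StatusSucceeded, StatusFailed, StatusCancelled, StatusExpired:
		return true
	}
	return false
}

// IsValid reports whether s is one of the nine documented statuses.
func (s Status) IsValid() bool {
	switch s {
	case StatusPending, StatusQueued, StatusRunning,
		StatusWaitingWorker, StatusCancelRequested:
		return true
	}
	return s.IsTerminal()
}

func main() {
	for _, s := range []Status{StatusPending, StatusRunning, StatusCancelled} {
		fmt.Printf("%s terminal=%v\n", s, s.IsTerminal())
	}
}
```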

## Cancellation Semantics

Cancellation is a required capability in the first version and uses cooperative cancellation:

```mermaid
flowchart LR
  User[User] --> ApiCancel[CancelAPI]
  ApiCancel --> Run[JobRun cancel_requested]
  Run --> RedisCancel[RedisCancelSignal]
  Worker[Worker] -->|"poll cancel flag"| Run
  Worker --> Stop[StopCurrentStep]
  Stop --> Final[JobRun cancelled]
```

Rules:

- `pending` / `queued`: cancelling moves the job directly to `cancelled` and removes it from the Redis queue on a best-effort basis; if it cannot be removed, the worker must check the status when claiming and skip the job.
- `running`: the status changes to `cancel_requested`, `cancelRequestedAt` / `cancelReason` are written, and the worker must check the cancel flag between steps or at long-task checkpoints.
- `waiting_worker`: the status changes to `cancel_requested` and a Redis cancel signal is written at the same time; the external worker must check the job status before writing back.
- `succeeded` / `failed` / `cancelled` / `expired`: cannot be cancelled; return ResourceInvalidState.
- After receiving the cancellation, the worker calls `AcknowledgeCancel(jobId, workerId)`, releases the lock, and writes to `job_events`; the status becomes `cancelled`.
- If `cancel_requested` lasts longer than the template's `cancelPolicy.graceSeconds`, the scheduler/reaper may mark the job `cancelled` or `expired`. For the first version, marking it `cancelled` and recording a timeout event is recommended.
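
The rules above amount to a small decision table keyed by the current status. The following is a minimal sketch of that table; `DecideCancel`, `CancelDecision`, `GraceExpired`, and `ErrResourceInvalidState` are hypothetical names. Two points are assumptions of the sketch: a job that is already `cancel_requested` is treated as not cancellable again, and timestamps are taken to be Unix seconds.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrResourceInvalidState stands in for the ResourceInvalidState error named above.
var ErrResourceInvalidState = errors.New("ResourceInvalidState")

// CancelDecision describes what a cancel request should do for a given status.
type CancelDecision struct {
	NextStatus        string // status to write to the run
	RemoveFromQueue   bool   // best-effort removal from the Redis queue
	WriteCancelSignal bool   // set the Redis cancel signal for the worker
}

// DecideCancel maps the current status to the action a cancel request takes.
func DecideCancel(status string) (CancelDecision, error) {
	switch status {
	case "pending", "queued":
		return CancelDecision{NextStatus: "cancelled", RemoveFromQueue: true}, nil
	case "running", "waiting_worker":
		return CancelDecision{NextStatus: "cancel_requested", WriteCancelSignal: true}, nil
	default:
		// Terminal statuses, and (by assumption) cancel_requested itself.
		return CancelDecision{}, ErrResourceInvalidState
	}
}

// GraceExpired reports whether the reaper may finalize a cancel_requested run.
// All arguments are in seconds.
func GraceExpired(cancelRequestedAt, now, graceSeconds int64) bool {
	return now-cancelRequestedAt > graceSeconds
}

func main() {
	for _, st := range []string{"queued", "running", "succeeded"} {
		d, err := DecideCancel(st)
		fmt.Println(st, "->", d.NextStatus, err)
	}
}
```

Keeping the decision pure makes it easy to unit test; the actual write would still need to be a conditional update so that a concurrent claim cannot slip in between the read and the write.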

## Status and Lock Safety Rules

The first version already confines the most error-prone race conditions to the repository / usecase layer:

- `ClaimNext` may only conditionally update from `pending` / `queued` to `running`. If the API cancels at the same time, the Mongo update is rejected.
- `RequestCancel` may only conditionally update from a cancellable status; `pending` / `queued` become `cancelled` directly, and `running` / `waiting_worker` become `cancel_requested`.
- `CompleteRun` / `FailRun` / `UpdateProgress` must carry a `workerID` and may only update jobs where `lockedBy == workerID`.
- The value of Redis `jobs:lock:<jobId>` is the `workerID`; `ReleaseLock` / `RefreshLock` use an owner check so that a stale worker cannot delete a newer worker's lock by mistake.
- Workers running long tasks must heartbeat periodically by calling `RefreshRunLock(jobId, workerID, ttlSeconds)`. Custom step handlers can use `StepContext.Heartbeat`.

When adding new status transitions later, do not use a bare `Update` directly. For lifecycle statuses, add an explicit guarded repository method or use the existing conditional update.
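
To make the idea of a guarded update concrete, here is a minimal in-memory sketch. It is not the Mongo repository: the `Store` type, its `transition` helper, and the `Claim` / `Complete` methods are hypothetical stand-ins that use a mutex where the real code would use a filtered update (matching on status and `lockedBy`).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrConflict is returned when the guard does not match, which is the
// in-memory analogue of a conditional update that matched no document.
var ErrConflict = errors.New("conditional update rejected")

// Run holds only the fields needed for the guard checks.
type Run struct {
	Status   string
	LockedBy string
}

// Store is an in-memory stand-in for the job_runs collection.
type Store struct {
	mu   sync.Mutex
	runs map[string]*Run
}

// transition applies the change only if the current run satisfies guard.
func (s *Store) transition(id string, guard func(Run) bool, apply func(*Run)) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	r, ok := s.runs[id]
	if !ok || !guard(*r) {
		return ErrConflict
	}
	apply(r)
	return nil
}

// Claim moves pending/queued -> running and records the lock owner.
func (s *Store) Claim(id, workerID string) error {
	return s.transition(id,
		func(r Run) bool { return r.Status == "pending" || r.Status == "queued" },
		func(r *Run) { r.Status, r.LockedBy = "running", workerID })
}

// Complete succeeds only for the worker that currently holds the lock.
func (s *Store) Complete(id, workerID string) error {
	return s.transition(id,
		func(r Run) bool { return workerID != "" && r.LockedBy == workerID },
		func(r *Run) { r.Status, r.LockedBy = "succeeded", "" })
}

func main() {
	s := &Store{runs: map[string]*Run{"job1": {Status: "queued"}}}
	fmt.Println(s.Claim("job1", "worker-a"))    // first claim wins
	fmt.Println(s.Claim("job1", "worker-b"))    // second claim is rejected
	fmt.Println(s.Complete("job1", "worker-b")) // non-owner is rejected
	fmt.Println(s.Complete("job1", "worker-a")) // owner completes
}
```

Routing every lifecycle change through one guarded helper is what keeps a bare update from bypassing the checks.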

## Redis Keys

```text
jobs:queue:<workerType>        # list or stream, consumed by workers
jobs:lock:<jobId>              # lease lock
jobs:scheduler:lock            # scheduler singleton lock
jobs:dedupe:<template>:<hash>  # prevents duplicate runs in the same scope
jobs:cancel:<jobId>            # cancel signal, read at worker checkpoints
```

For the first version a Redis List is enough; upgrade to Redis Streams later if ack/replay is needed.
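
Centralizing key construction helps keep the key layout above consistent across the API, scheduler, and workers. The helpers below are a hypothetical sketch. How `<hash>` is computed is not specified in this plan, so hashing the dedupe values with SHA-256 after joining them with a NUL byte is purely an assumption.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// SchedulerLockKey is the singleton lock key for the scheduler.
const SchedulerLockKey = "jobs:scheduler:lock"

// QueueKey returns the queue key for one worker type.
func QueueKey(workerType string) string { return "jobs:queue:" + workerType }

// LockKey returns the lease lock key for one job.
func LockKey(jobID string) string { return "jobs:lock:" + jobID }

// CancelKey returns the cancel signal key for one job.
func CancelKey(jobID string) string { return "jobs:cancel:" + jobID }

// DedupeKey hashes the dedupe values so the key has a fixed length.
// Joining with "\x00" and using SHA-256 are assumptions of this sketch.
func DedupeKey(template string, values ...string) string {
	sum := sha256.Sum256([]byte(strings.Join(values, "\x00")))
	return "jobs:dedupe:" + template + ":" + hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(QueueKey("go"))
	fmt.Println(LockKey("job_1"))
	fmt.Println(CancelKey("job_1"))
	fmt.Println(SchedulerLockKey)
	fmt.Println(DedupeKey("demo_long_task", "user_123", "target_a"))
}
```

The separator keeps `("ab", "c")` and `("a", "bc")` from producing the same hash input.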

## API Plan

```text
GET  /api/v1/job/templates
GET  /api/v1/job/templates/:type
PUT  /api/v1/job/templates/:type

POST /api/v1/jobs
GET  /api/v1/jobs/:id
GET  /api/v1/jobs?page=1&pageSize=20
POST /api/v1/jobs/:id/cancel
POST /api/v1/jobs/:id/retry

GET  /api/v1/job/schedules?page=1&pageSize=20
POST /api/v1/job/schedules
PUT  /api/v1/job/schedules/:id
POST /api/v1/job/schedules/:id/enable
POST /api/v1/job/schedules/:id/disable
```

All list responses use the current standard format:

```json
{
  "code": 102000,
  "message": "SUCCESS",
  "data": {
    "pagination": {
      "total": 42,
      "page": 1,
      "pageSize": 10,
      "totalPages": 5
    },
    "list": []
  }
}
```
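
The list envelope above can be modelled once with generics and reused by every list endpoint. This is a sketch with hypothetical type names (`Response`, `ListData`, `Pagination`, `NewPagination`); it assumes `totalPages` is `total / pageSize` rounded up, which matches the sample values (42 items at 10 per page gives 5 pages).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Pagination mirrors the "pagination" object of the list response.
type Pagination struct {
	Total      int `json:"total"`
	Page       int `json:"page"`
	PageSize   int `json:"pageSize"`
	TotalPages int `json:"totalPages"`
}

// ListData is the "data" object of a list response for items of type T.
type ListData[T any] struct {
	Pagination Pagination `json:"pagination"`
	List       []T        `json:"list"`
}

// Response is the outer envelope shared by all responses.
type Response[T any] struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
	Data    T      `json:"data"`
}

// NewPagination derives totalPages by rounding total/pageSize up.
func NewPagination(total, page, pageSize int) Pagination {
	pages := 0
	if pageSize > 0 {
		pages = (total + pageSize - 1) / pageSize
	}
	return Pagination{Total: total, Page: page, PageSize: pageSize, TotalPages: pages}
}

func main() {
	resp := Response[ListData[string]]{
		Code:    102000,
		Message: "SUCCESS",
		Data: ListData[string]{
			Pagination: NewPagination(42, 1, 10),
			List:       []string{}, // non-nil so it encodes as [] rather than null
		},
	}
	out, err := json.Marshal(resp)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```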

## Layering Plan

```text
internal/model/job/
  domain/entity/template.go
  domain/entity/run.go
  domain/entity/schedule.go
  domain/entity/event.go
  domain/enum/status.go
  domain/repository/*.go
  domain/usecase/*.go
  repository/mongo_*.go
  repository/redis_queue.go
  usecase/template_usecase.go
  usecase/run_usecase.go
  usecase/schedule_usecase.go
  usecase/progress_usecase.go
  usecase/worker_usecase.go

internal/logic/job/
  create_job_logic.go
  get_job_logic.go
  list_jobs_logic.go
  cancel_job_logic.go
  retry_job_logic.go
  template_logic.go
  schedule_logic.go

internal/worker/job/
  runner.go
  scheduler.go
  dispatcher.go
```

## Template Execution Rules

### Whether a job can be repeated

Controlled by `repeatable` and `concurrencyPolicy`:

```text
repeatable=false
  once a run with the same dedupe key has completed, no new run is created

repeatable=true + reject_same_scope
  creation is rejected while a running/pending job exists

repeatable=true + allow_parallel
  parallel runs are allowed

repeatable=true + replace_existing
  cancel the old job and create a new one
```
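
The four cases above can be expressed as a single admission check that `CreateRun` calls before inserting a run. The sketch below uses hypothetical names (`Admit`, `Existing`, and the two error values). The plan does not say what happens for `repeatable=false` while an earlier run is still active, so the sketch allows creation in that case; treat that as an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	ErrAlreadyCompleted = errors.New("a run with this dedupe key already completed")
	ErrActiveRunExists  = errors.New("an active run exists for this scope")
)

// Existing summarizes earlier runs that share the same dedupe key.
type Existing struct {
	Completed bool     // at least one earlier run completed
	ActiveIDs []string // runs that are still pending or running
}

// Admit returns the IDs of runs to cancel before creating the new run,
// or an error when creation must be rejected.
func Admit(repeatable bool, policy string, ex Existing) ([]string, error) {
	if !repeatable {
		if ex.Completed {
			return nil, ErrAlreadyCompleted
		}
		return nil, nil
	}
	switch policy {
	case "reject_same_scope":
		if len(ex.ActiveIDs) > 0 {
			return nil, ErrActiveRunExists
		}
		return nil, nil
	case "allow_parallel":
		return nil, nil
	case "replace_existing":
		return ex.ActiveIDs, nil
	default:
		return nil, fmt.Errorf("unknown concurrencyPolicy %q", policy)
	}
}

func main() {
	ex := Existing{ActiveIDs: []string{"job_1"}}
	_, err := Admit(true, "reject_same_scope", ex)
	fmt.Println(err)
	toCancel, err := Admit(true, "replace_existing", ex)
	fmt.Println(toCancel, err)
}
```

Because the check and the insert are separate steps, the real implementation would still rely on something atomic, such as the `jobs:dedupe:<template>:<hash>` key, to close the gap between them.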

### Final state

A template can define its success condition:

```text
successWhen = all_steps_succeeded
```

The first version supports only `all_steps_succeeded`. `any_step_succeeded` or conditional flow can be added later.

### Cancellation capability

A template controls cancellation through `cancelPolicy`:

```text
supported=false
  the API does not allow cancelling this job

mode=cooperative
  the worker checks the cancel flag at checkpoints and then winds down

graceSeconds=30
  once cancel_requested exceeds the grace period, the reaper finalizes the job
```

A step uses `cancelable` to control whether the current step can be stopped immediately. If the current step is not cancellable, the worker must stop the remaining steps after that step finishes, and the final status is still `cancelled`.
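
The interaction between the job-level cancel flag and the per-step `cancelable` setting can be sketched as a linear runner loop. `Step`, `RunSteps`, and the probe function are hypothetical; in the real worker the probe would read the Redis cancel signal or the run status. One case is not covered by the text above: a cancel that arrives during the last step when that step is not cancellable. This sketch reports `succeeded` there, which is an assumption.

```go
package main

import "fmt"

// Step is one linear step. Run receives a cancel probe and returns true
// if it stopped early because the probe reported a cancellation.
type Step struct {
	ID         string
	Cancelable bool
	Run        func(cancelled func() bool) bool
}

// RunSteps executes steps in order and returns the final status plus the
// IDs of the steps that ran to completion.
func RunSteps(steps []Step, cancelled func() bool) (string, []string) {
	never := func() bool { return false }
	var done []string
	for _, st := range steps {
		if cancelled() { // checkpoint between steps
			return "cancelled", done
		}
		probe := never // a non-cancelable step never sees the cancel flag
		if st.Cancelable {
			probe = cancelled
		}
		if stopped := st.Run(probe); stopped {
			return "cancelled", done
		}
		done = append(done, st.ID)
	}
	return "succeeded", done
}

func main() {
	flag := false
	steps := []Step{
		{ID: "prepare", Cancelable: true, Run: func(c func() bool) bool { return c() }},
		// The cancel request arrives while this non-cancelable step is running.
		{ID: "execute", Cancelable: false, Run: func(c func() bool) bool { flag = true; return c() }},
		{ID: "finalize", Cancelable: false, Run: func(c func() bool) bool { return c() }},
	}
	status, done := RunSteps(steps, func() bool { return flag })
	fmt.Println(status, done) // prints: cancelled [prepare execute]
}
```

In the example, `execute` finishes even though the cancel arrived while it was running, and `finalize` is skipped, which matches the rule that later steps stop and the run ends as `cancelled`.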

## Recommended Implementation Order for the First Version

1. Create the enum/entity/repository interfaces under `model/job`.
2. Implement the Mongo repositories: template/run/schedule/event.
3. Implement the Redis queue/lock repository.
4. Implement `CreateRun`: read the template, check repeat/concurrency, create the run, and push it to the queue.
5. Implement `ClaimNext`: the worker takes a job from Redis, and Mongo sets the lock/status.
6. Implement `RequestCancel`: transition to `cancel_requested` or directly to `cancelled`, and write the Redis cancel signal and an event.
7. Implement `AcknowledgeCancel`: after the worker winds down, release the lock and transition to `cancelled`.
8. Implement `UpdateProgress`, `Complete`, `Fail`, and `Retry`.
9. Implement the schedule tick: scan `job_schedules.nextRunAt <= now` and create JobRuns.
10. Implement the API and documentation.
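
Step 9 in the list above (the schedule tick) can be sketched as a pure loop over due schedules. Everything here is hypothetical: the `Schedule` struct carries only the fields the loop needs, timestamps are assumed to be Unix seconds, and the `next` callback stands in for cron and timezone evaluation, which this sketch does not implement.

```go
package main

import "fmt"

// Schedule holds the job_schedules fields the tick needs.
type Schedule struct {
	ID           string
	TemplateType string
	Enabled      bool
	NextRunAt    int64 // Unix seconds in this sketch
	LastRunAt    int64
}

// Tick creates one run per enabled schedule whose nextRunAt is due and
// advances nextRunAt. It returns the number of runs created.
func Tick(
	schedules []*Schedule,
	now int64,
	next func(s *Schedule, now int64) int64,
	createRun func(templateType string) error,
) int {
	created := 0
	for _, s := range schedules {
		if !s.Enabled || s.NextRunAt > now {
			continue
		}
		if err := createRun(s.TemplateType); err != nil {
			continue // leave nextRunAt untouched so a later tick retries
		}
		s.LastRunAt = now
		s.NextRunAt = next(s, now)
		created++
	}
	return created
}

func main() {
	// A fixed 24-hour step replaces real cron evaluation here.
	daily := func(s *Schedule, now int64) int64 { return s.NextRunAt + 86400 }
	schedules := []*Schedule{
		{ID: "s1", TemplateType: "demo_long_task", Enabled: true, NextRunAt: 100},
		{ID: "s2", TemplateType: "scheduled_report", Enabled: false, NextRunAt: 100},
		{ID: "s3", TemplateType: "scheduled_report", Enabled: true, NextRunAt: 500},
	}
	n := Tick(schedules, 200, daily, func(t string) error {
		fmt.Println("create run for", t)
		return nil
	})
	fmt.Println("created:", n)
}
```

With more than one scheduler instance, the loop would run only while holding `jobs:scheduler:lock`, so that each due schedule produces a single JobRun.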