# Job Core System Plan

## Goals

Build a general-purpose job system that any long-running task, multi-step flow task, or scheduled task can share in the future. A job is not just a background task. It is a unit of work that has a template, configuration, status, progress, cancellation support, a re-run (retry) policy, and scheduling support.

## Core Design

Approach:

```text
Mongo = source of truth for job/template/run/history
Redis = queue, distributed lock, schedule tick, short-lived lease
```

```mermaid
flowchart LR
  Api[GoAPI] --> Template[JobTemplate]
  Api --> JobRun[JobRunMongo]
  JobRun --> RedisQueue[RedisQueue]
  Scheduler[SchedulerTick] --> RedisQueue
  Worker[Worker] --> RedisQueue
  Worker --> JobRun
  Worker --> Step[JobStep]
```

## Core Concepts

### JobTemplate

A template defines how this kind of job is carried out. For example:

```text
demo_long_task
external_worker_task
scheduled_report
multi_step_pipeline
```

A template must answer:

- What the input payload schema for this job is
- Which steps it has
- What its terminal states are
- Whether it can be run repeatedly
- Whether runs for the same account / same target may run concurrently
- What the retry policy is
- What the timeout is
- Whether it can be scheduled
- Whether it supports cancellation, and how the worker should wind down when cancelled

### JobRun

A JobRun is a single execution instance. It references a template and stores that run's payload, status, progress, result, error, and execution history.

### JobSchedule

A JobSchedule determines when a JobRun is created. It supports:

- cron
- enabled / disabled
- timezone
- payload template
- target scope, e.g. user/account/system
- nextRunAt / lastRunAt

### JobFlow

A flow is a multi-step process. The first version does not need a full DAG; it starts with linear steps:

```text
multi_step_pipeline:
  1. prepare
  2. execute
  3. finalize
```

This can later be extended to a DAG or to conditional branches.

## Mongo Collections

### `job_templates`

```json
{
  "_id": "...",
  "type": "demo_long_task",
  "version": 1,
  "name": "Demo long task",
  "description": "Demonstrates job templates, progress, cancellation, re-runs, and scheduling",
  "enabled": true,
  "repeatable": true,
  "concurrencyPolicy": "reject_same_scope",
  "dedupeKeys": ["scope_id", "target"],
  "timeoutSeconds": 600,
  "cancelPolicy": {
    "supported": true,
    "mode": "cooperative",
    "graceSeconds": 30
  },
  "retryPolicy": {
    "maxAttempts": 2,
    "backoffSeconds": [30, 120]
  },
  "steps": [
    { "id": "prepare", "name": "Prepare data", "workerType": "go", "timeoutSeconds": 60, "cancelable": true },
    { "id": "execute", "name": "Execute task", "workerType": "go", "timeoutSeconds": 300, "cancelable": true },
    { "id": "finalize", "name": "Finalize results", "workerType": "go", "timeoutSeconds": 30, "cancelable": false }
  ],
  "createAt": 0,
  "updateAt": 0
}
```

### `job_runs`

```json
{
  "_id": "...",
  "templateType": "demo_long_task",
  "templateVersion": 1,
  "scope": "user",
  "scopeId": "user_123",
  "status": "pending",
  "phase": "prepare",
  "payload": {},
  "progress": {
    "summary": "Waiting for a worker to execute",
    "percentage": 0,
    "steps": []
  },
  "result": null,
  "error": null,
  "attempt": 0,
  "maxAttempts": 2,
  "lockedBy": null,
  "lockedUntil": null,
  "cancelRequestedAt": null,
  "cancelReason": null,
  "scheduledAt": null,
  "startedAt": null,
  "completedAt": null,
  "createAt": 0,
  "updateAt": 0
}
```

### `job_schedules`

```json
{
  "_id": "...",
  "templateType": "demo_long_task",
  "scope": "user",
  "scopeId": "user_123",
  "enabled": true,
  "cron": "0 9 * * *",
  "timezone": "Asia/Taipei",
  "payloadTemplate": {},
  "lastRunAt": null,
  "nextRunAt": 0,
  "createAt": 0,
  "updateAt": 0
}
```

### `job_events`

Used for observability and auditing:

```json
{
  "_id": "...",
  "jobId": "...",
  "type": "status_changed",
  "from": "pending",
  "to": "running",
  "message": "worker claimed job",
  "metadata": {},
  "createAt": 0
}
```

## Status Model

```text
pending          = created, waiting to be enqueued
queued           = pushed onto the Redis queue
running          = a worker is executing it
waiting_worker   = waiting for an external worker to write back
cancel_requested = the user has requested cancellation; waiting for the worker to stop cooperatively
succeeded        = completed successfully
failed           = failed permanently
cancelled        = cancelled by the user
expired          = the lock/timeout lapsed and the run could not be recovered
```

Step status:

```text
pending | running | succeeded | failed | skipped | cancelled
```

## Cancellation Semantics

Cancellation is a must-have for the first version and uses cooperative cancellation:

```mermaid
flowchart LR
  User[User] --> ApiCancel[CancelAPI]
  ApiCancel --> Run[JobRun cancel_requested]
  Run --> RedisCancel[RedisCancelSignal]
  Worker[Worker] -->|"poll cancel flag"| Run
  Worker --> Stop[StopCurrentStep]
  Stop --> Final[JobRun cancelled]
```

Rules:

- `pending` / `queued`: on cancel, transition directly to `cancelled` and make a best effort to remove the job from the Redis queue. If it cannot be removed, the worker must check the status when claiming and skip the job.
- `running`: set the status to `cancel_requested` and write `cancelRequestedAt` / `cancelReason`. The worker must check the cancel flag between steps or at checkpoints inside long-running tasks.
- `waiting_worker`: set the status to `cancel_requested` and also write a Redis cancel signal. The external worker must check the job status before writing back.
- `succeeded` / `failed` / `cancelled` / `expired`: cannot be cancelled; return ResourceInvalidState.
- Once the worker receives the cancellation, it calls `AcknowledgeCancel(jobId, workerId)`, which releases the lock, writes to `job_events`, and moves the status to `cancelled`.
- If a run stays in `cancel_requested` longer than the template's `cancelPolicy.graceSeconds`, the scheduler/reaper may mark it `cancelled` or `expired`. For the first version, marking it `cancelled` and recording a timeout event is recommended.

## State and Lock Safety Rules

The first version already confines the race conditions most likely to cause trouble to the repository / usecase layer:

- `ClaimNext` may only perform a conditional update from `pending` / `queued` to `running`. If the API cancels the job at the same time, the Mongo update is rejected.
- `RequestCancel` may only perform a conditional update from a cancellable status: `pending` / `queued` go directly to `cancelled`, and `running` / `waiting_worker` go to `cancel_requested`.
- `CompleteRun` / `FailRun` / `UpdateProgress` must carry the `workerID` and may only update a job whose `lockedBy == workerID`.
- The value of the Redis key `jobs:lock:` is the `workerID`. `ReleaseLock` / `RefreshLock` perform an owner check so that a stale worker cannot delete a newer worker's lock.
- Long-running worker tasks must send a heartbeat periodically by calling `RefreshRunLock(jobId, workerID, ttlSeconds)`. Custom step handlers can use `StepContext.Heartbeat`.

When adding new state transitions later, do not use a bare `Update`. For lifecycle state changes, add an explicit guarded repository method or use an existing conditional update.

## Redis Keys

```text
jobs:queue:           # list or stream, consumed by workers
jobs:lock:            # lease lock
jobs:scheduler:lock   # scheduler singleton lock
jobs:dedupe: