import { formatAnalyzeError } from "@/lib/ai/analyze-topic"; import { JobCancelledError, isJobCancelled } from "@/lib/jobs/cancel"; import { prisma } from "@/lib/db"; import { getActiveAccountId } from "@/lib/account-context"; import { analyzeTopic } from "@/lib/services/analyze"; import { runScanForTopic } from "@/lib/services/scan"; import { runStyle8DAnalysis } from "@/lib/services/style-analysis"; import type { AnalyzeTopicPayload, JobType, ScanPayload, Style8DPayload } from "./types"; import { ZodError } from "zod"; /** 超過此時間未更新進度,視為伺服器重啟或 API 掛死 */ const STALE_JOB_MS: Partial> = { "analyze-topic": 8 * 60 * 1000, "style-8d": 8 * 60 * 1000, scan: 25 * 60 * 1000, }; const STALE_JOB_ERROR = "任務逾時或伺服器重啟導致中斷,請重新執行(若經常發生可改用較快的 AI 模型)。"; /** 將卡死的 pending/running 任務標為失敗,避免 UI 永遠轉圈 */ export async function recoverStaleBackgroundJobs(): Promise { const active = await prisma.backgroundJob.findMany({ where: { status: { in: ["pending", "running"] } }, select: { id: true, type: true, updatedAt: true }, }); const now = Date.now(); let recovered = 0; for (const job of active) { const maxMs = STALE_JOB_MS[job.type as JobType] ?? 12 * 60 * 1000; if (now - job.updatedAt.getTime() <= maxMs) continue; await prisma.backgroundJob.update({ where: { id: job.id }, data: { status: "failed", progress: "失敗", error: STALE_JOB_ERROR, completedAt: new Date(), }, }); recovered += 1; } return recovered; } function formatJobError(error: unknown, type: JobType): string { if (typeof error === "string") return error; if (type === "analyze-topic") return formatAnalyzeError(error); if (type === "style-8d" && error instanceof ZodError) { return "AI 有回覆,但 8D 欄位格式仍不完整。請重試一次;若持續發生,請在設定更換研究模型。"; } return error instanceof Error ? error.message : "任務失敗"; } export async function runBackgroundJob(jobId: string) { const job = await prisma.backgroundJob.findUnique({ where: { id: jobId } }); if (!job || job.status !== "pending") return; if (await isJobCancelled(jobId)) return; await prisma.backgroundJob.update({ where: { id: jobId }, data: { status: "running", progress: "執行中…" }, }); const type = job.type as JobType; try { let result: unknown; if (type === "analyze-topic") { let payload: AnalyzeTopicPayload; try { payload = JSON.parse(job.payload ?? "{}") as AnalyzeTopicPayload; } catch { throw new Error("任務資料損毀,請重新建立任務"); } await prisma.backgroundJob.update({ where: { id: jobId }, data: { progress: "AI 分析中,約需 1~4 分鐘…" }, }); result = await analyzeTopic(payload.topicId, payload.brief, { jobId, onProgress: async (message) => { await prisma.backgroundJob.update({ where: { id: jobId }, data: { progress: message }, }); }, }); } else if (type === "style-8d") { let payload: Style8DPayload; try { payload = JSON.parse(job.payload ?? "{}") as Style8DPayload; } catch { throw new Error("任務資料損毀,請重新建立任務"); } result = await runStyle8DAnalysis(payload.accountId, payload.benchmarkUsername, jobId); } else if (type === "scan") { let payload: ScanPayload; try { payload = JSON.parse(job.payload ?? "{}") as ScanPayload; } catch { throw new Error("任務資料損毀,請重新建立任務"); } if (payload.selectedTags?.length) { await prisma.topic.update({ where: { id: payload.topicId }, data: { selectedTags: JSON.stringify(payload.selectedTags) }, }); } await prisma.backgroundJob.update({ where: { id: jobId }, data: { progress: "海巡中,依標籤搜尋 Threads…" }, }); result = await runScanForTopic(payload.topicId, { useTags: payload.useTags, selectedTags: payload.selectedTags, jobId, onProgress: async (message) => { await prisma.backgroundJob.update({ where: { id: jobId }, data: { progress: message }, }); }, }); } else { throw new Error(`未知的任務類型:${job.type}`); } if (await isJobCancelled(jobId)) return; const latest = await prisma.backgroundJob.findUnique({ where: { id: jobId } }); await prisma.backgroundJob.update({ where: { id: jobId }, data: { status: "completed", progress: latest?.progress ?? "完成", result: JSON.stringify(result), completedAt: new Date(), }, }); } catch (error) { if (error instanceof JobCancelledError || (await isJobCancelled(jobId))) { await prisma.backgroundJob.update({ where: { id: jobId }, data: { status: "cancelled", progress: "已停止", completedAt: new Date(), }, }); return; } const message = formatJobError(error, type); await prisma.backgroundJob.update({ where: { id: jobId }, data: { status: "failed", progress: "失敗", error: message, completedAt: new Date(), }, }); } } export async function findActiveJob(topicId: string, type: JobType) { await recoverStaleBackgroundJobs(); const accountId = await getActiveAccountId(); return prisma.backgroundJob.findFirst({ where: { topicId, type, status: { in: ["pending", "running"] }, ...(accountId ? { accountId } : {}) }, orderBy: { createdAt: "desc" }, }); } export async function findActiveAccountJob(accountId: string, type: JobType) { await recoverStaleBackgroundJobs(); return prisma.backgroundJob.findFirst({ where: { accountId, type, status: { in: ["pending", "running"] } }, orderBy: { createdAt: "desc" }, }); } export async function enqueueJob(params: { type: JobType; topicId?: string; accountId?: string; label?: string; payload: unknown; }) { await recoverStaleBackgroundJobs(); const accountId = params.accountId ?? (await getActiveAccountId()); const job = await prisma.backgroundJob.create({ data: { accountId, type: params.type, topicId: params.topicId, label: params.label, payload: JSON.stringify(params.payload), status: "pending", progress: "排隊中…", }, }); return job; } export { cancelJob } from "./cancel";