haixunMaster/lib/jobs/runner.ts

213 lines
6.7 KiB
TypeScript
Raw Normal View History

2026-06-21 12:50:31 +00:00
import { formatAnalyzeError } from "@/lib/ai/analyze-topic";
import { JobCancelledError, isJobCancelled } from "@/lib/jobs/cancel";
import { prisma } from "@/lib/db";
import { getActiveAccountId } from "@/lib/account-context";
import { analyzeTopic } from "@/lib/services/analyze";
import { runScanForTopic } from "@/lib/services/scan";
import { runStyle8DAnalysis } from "@/lib/services/style-analysis";
import type { AnalyzeTopicPayload, JobType, ScanPayload, Style8DPayload } from "./types";
import { ZodError } from "zod";
/** 超過此時間未更新進度,視為伺服器重啟或 API 掛死 */
const STALE_JOB_MS: Partial<Record<JobType, number>> = {
"analyze-topic": 8 * 60 * 1000,
"style-8d": 8 * 60 * 1000,
scan: 25 * 60 * 1000,
};
const STALE_JOB_ERROR =
"任務逾時或伺服器重啟導致中斷,請重新執行(若經常發生可改用較快的 AI 模型)。";
/** 將卡死的 pending/running 任務標為失敗,避免 UI 永遠轉圈 */
export async function recoverStaleBackgroundJobs(): Promise<number> {
const active = await prisma.backgroundJob.findMany({
where: { status: { in: ["pending", "running"] } },
select: { id: true, type: true, updatedAt: true },
});
const now = Date.now();
let recovered = 0;
for (const job of active) {
const maxMs = STALE_JOB_MS[job.type as JobType] ?? 12 * 60 * 1000;
if (now - job.updatedAt.getTime() <= maxMs) continue;
await prisma.backgroundJob.update({
where: { id: job.id },
data: {
status: "failed",
progress: "失敗",
error: STALE_JOB_ERROR,
completedAt: new Date(),
},
});
recovered += 1;
}
return recovered;
}
function formatJobError(error: unknown, type: JobType): string {
if (typeof error === "string") return error;
if (type === "analyze-topic") return formatAnalyzeError(error);
if (type === "style-8d" && error instanceof ZodError) {
return "AI 有回覆,但 8D 欄位格式仍不完整。請重試一次;若持續發生,請在設定更換研究模型。";
}
return error instanceof Error ? error.message : "任務失敗";
}
export async function runBackgroundJob(jobId: string) {
const job = await prisma.backgroundJob.findUnique({ where: { id: jobId } });
if (!job || job.status !== "pending") return;
if (await isJobCancelled(jobId)) return;
await prisma.backgroundJob.update({
where: { id: jobId },
data: { status: "running", progress: "執行中…" },
});
const type = job.type as JobType;
try {
let result: unknown;
if (type === "analyze-topic") {
let payload: AnalyzeTopicPayload;
try {
payload = JSON.parse(job.payload ?? "{}") as AnalyzeTopicPayload;
} catch {
throw new Error("任務資料損毀,請重新建立任務");
}
await prisma.backgroundJob.update({
where: { id: jobId },
data: { progress: "AI 分析中,約需 14 分鐘…" },
});
result = await analyzeTopic(payload.topicId, payload.brief, {
jobId,
onProgress: async (message) => {
await prisma.backgroundJob.update({
where: { id: jobId },
data: { progress: message },
});
},
});
} else if (type === "style-8d") {
let payload: Style8DPayload;
try {
payload = JSON.parse(job.payload ?? "{}") as Style8DPayload;
} catch {
throw new Error("任務資料損毀,請重新建立任務");
}
result = await runStyle8DAnalysis(payload.accountId, payload.benchmarkUsername, jobId);
} else if (type === "scan") {
let payload: ScanPayload;
try {
payload = JSON.parse(job.payload ?? "{}") as ScanPayload;
} catch {
throw new Error("任務資料損毀,請重新建立任務");
}
if (payload.selectedTags?.length) {
await prisma.topic.update({
where: { id: payload.topicId },
data: { selectedTags: JSON.stringify(payload.selectedTags) },
});
}
await prisma.backgroundJob.update({
where: { id: jobId },
data: { progress: "海巡中,依標籤搜尋 Threads…" },
});
result = await runScanForTopic(payload.topicId, {
useTags: payload.useTags,
selectedTags: payload.selectedTags,
jobId,
onProgress: async (message) => {
await prisma.backgroundJob.update({
where: { id: jobId },
data: { progress: message },
});
},
});
} else {
throw new Error(`未知的任務類型:${job.type}`);
}
if (await isJobCancelled(jobId)) return;
const latest = await prisma.backgroundJob.findUnique({ where: { id: jobId } });
await prisma.backgroundJob.update({
where: { id: jobId },
data: {
status: "completed",
progress: latest?.progress ?? "完成",
result: JSON.stringify(result),
completedAt: new Date(),
},
});
} catch (error) {
if (error instanceof JobCancelledError || (await isJobCancelled(jobId))) {
await prisma.backgroundJob.update({
where: { id: jobId },
data: {
status: "cancelled",
progress: "已停止",
completedAt: new Date(),
},
});
return;
}
const message = formatJobError(error, type);
await prisma.backgroundJob.update({
where: { id: jobId },
data: {
status: "failed",
progress: "失敗",
error: message,
completedAt: new Date(),
},
});
}
}
export async function findActiveJob(topicId: string, type: JobType) {
await recoverStaleBackgroundJobs();
const accountId = await getActiveAccountId();
return prisma.backgroundJob.findFirst({
where: { topicId, type, status: { in: ["pending", "running"] }, ...(accountId ? { accountId } : {}) },
orderBy: { createdAt: "desc" },
});
}
export async function findActiveAccountJob(accountId: string, type: JobType) {
await recoverStaleBackgroundJobs();
return prisma.backgroundJob.findFirst({
where: { accountId, type, status: { in: ["pending", "running"] } },
orderBy: { createdAt: "desc" },
});
}
export async function enqueueJob(params: {
type: JobType;
topicId?: string;
accountId?: string;
label?: string;
payload: unknown;
}) {
await recoverStaleBackgroundJobs();
const accountId = params.accountId ?? (await getActiveAccountId());
const job = await prisma.backgroundJob.create({
data: {
accountId,
type: params.type,
topicId: params.topicId,
label: params.label,
payload: JSON.stringify(params.payload),
status: "pending",
progress: "排隊中…",
},
});
return job;
}
export { cancelJob } from "./cancel";