213 lines
6.7 KiB
TypeScript
213 lines
6.7 KiB
TypeScript
import { formatAnalyzeError } from "@/lib/ai/analyze-topic";
|
||
import { JobCancelledError, isJobCancelled } from "@/lib/jobs/cancel";
|
||
import { prisma } from "@/lib/db";
|
||
import { getActiveAccountId } from "@/lib/account-context";
|
||
import { analyzeTopic } from "@/lib/services/analyze";
|
||
import { runScanForTopic } from "@/lib/services/scan";
|
||
import { runStyle8DAnalysis } from "@/lib/services/style-analysis";
|
||
import type { AnalyzeTopicPayload, JobType, ScanPayload, Style8DPayload } from "./types";
|
||
import { ZodError } from "zod";
|
||
|
||
/**
 * Longest time, per job type, that a job may go without a progress update
 * before it is treated as dead (server restart or a hung API call).
 */
const STALE_JOB_MS: Partial<Record<JobType, number>> = {
  "analyze-topic": 8 * 60 * 1000,
  "style-8d": 8 * 60 * 1000,
  scan: 25 * 60 * 1000,
};

/** Error text stored on a job that stale-job recovery marks as failed. */
const STALE_JOB_ERROR =
  "任務逾時或伺服器重啟導致中斷,請重新執行(若經常發生可改用較快的 AI 模型)。";
|
||
|
||
/**
 * Marks stuck `pending`/`running` jobs as failed so the UI does not spin forever.
 *
 * A job counts as stuck when its `updatedAt` is older than the limit for its
 * type in `STALE_JOB_MS` (12 minutes for types without an entry).
 *
 * @returns The number of jobs that were actually switched to `failed`.
 */
export async function recoverStaleBackgroundJobs(): Promise<number> {
  const active = await prisma.backgroundJob.findMany({
    where: { status: { in: ["pending", "running"] } },
    select: { id: true, type: true, updatedAt: true },
  });
  const now = Date.now();
  let recovered = 0;

  for (const job of active) {
    const maxMs = STALE_JOB_MS[job.type as JobType] ?? 12 * 60 * 1000;
    if (now - job.updatedAt.getTime() <= maxMs) continue;

    // The snapshot above may be outdated by the time we write. Repeat the
    // status and staleness conditions in the write itself so a job that has
    // since finished, been cancelled, or reported progress is left untouched.
    // `updateMany` also does not throw if the row was deleted in the meantime.
    const { count } = await prisma.backgroundJob.updateMany({
      where: {
        id: job.id,
        status: { in: ["pending", "running"] },
        updatedAt: { lt: new Date(now - maxMs) },
      },
      data: {
        status: "failed",
        progress: "失敗",
        error: STALE_JOB_ERROR,
        completedAt: new Date(),
      },
    });
    recovered += count;
  }

  return recovered;
}
|
||
|
||
/**
 * Converts whatever a job threw into the message stored on the job record.
 *
 * @param error - The thrown value; may be a string, an `Error`, or anything else.
 * @param type - The job type, which selects type-specific formatting.
 * @returns A message for display; a generic fallback when nothing better is available.
 */
function formatJobError(error: unknown, type: JobType): string {
  if (typeof error === "string") return error;
  if (type === "analyze-topic") return formatAnalyzeError(error);
  if (type === "style-8d" && error instanceof ZodError) {
    return "AI 有回覆,但 8D 欄位格式仍不完整。請重試一次;若持續發生,請在設定更換研究模型。";
  }
  // An Error with an empty message would otherwise be stored as a blank
  // failure reason, so it falls through to the generic text.
  if (error instanceof Error && error.message) return error.message;
  return "任務失敗";
}
|
||
|
||
/**
 * Decodes a job's stored JSON payload, treating a missing payload as `{}`.
 *
 * The caller chooses `T`; at runtime only "is a JSON object" is verified,
 * individual fields are not validated.
 *
 * @throws Error when the text is not valid JSON or does not decode to an object.
 */
function parseJobPayload<T>(raw: string | null | undefined): T {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw ?? "{}");
  } catch {
    throw new Error("任務資料損毀,請重新建立任務");
  }
  // Valid JSON such as `null`, `42` or `[]` would otherwise fail later with an
  // unrelated TypeError when a field is read from it.
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    throw new Error("任務資料損毀,請重新建立任務");
  }
  return parsed as T;
}

/** Writes a new progress message onto the job row. */
async function setJobProgress(jobId: string, progress: string): Promise<void> {
  await prisma.backgroundJob.update({
    where: { id: jobId },
    data: { progress },
  });
}

/**
 * Executes one queued background job and records its outcome on the job row.
 *
 * Does nothing when the job does not exist, is not `pending`, is already
 * cancelled, or was claimed by another caller first. Otherwise the job is
 * moved to `running`, dispatched by type, and finished as `completed`,
 * `cancelled`, or `failed` (with a formatted error message).
 *
 * @param jobId - Id of the `backgroundJob` row to run.
 */
export async function runBackgroundJob(jobId: string): Promise<void> {
  const job = await prisma.backgroundJob.findUnique({ where: { id: jobId } });
  if (!job || job.status !== "pending") return;
  if (await isJobCancelled(jobId)) return;

  // Claim the job with a conditional write: only the caller that flips
  // `pending` to `running` proceeds, so the same job cannot run twice.
  const claim = await prisma.backgroundJob.updateMany({
    where: { id: jobId, status: "pending" },
    data: { status: "running", progress: "執行中…" },
  });
  if (claim.count === 0) return;

  const type = job.type as JobType;
  const onProgress = async (message: string): Promise<void> => {
    await setJobProgress(jobId, message);
  };

  try {
    let result: unknown;

    switch (type) {
      case "analyze-topic": {
        const payload = parseJobPayload<AnalyzeTopicPayload>(job.payload);
        await setJobProgress(jobId, "AI 分析中,約需 1~4 分鐘…");
        result = await analyzeTopic(payload.topicId, payload.brief, {
          jobId,
          onProgress,
        });
        break;
      }
      case "style-8d": {
        const payload = parseJobPayload<Style8DPayload>(job.payload);
        result = await runStyle8DAnalysis(payload.accountId, payload.benchmarkUsername, jobId);
        break;
      }
      case "scan": {
        const payload = parseJobPayload<ScanPayload>(job.payload);
        // Persist the chosen tags on the topic before the scan starts.
        if (payload.selectedTags?.length) {
          await prisma.topic.update({
            where: { id: payload.topicId },
            data: { selectedTags: JSON.stringify(payload.selectedTags) },
          });
        }
        await setJobProgress(jobId, "海巡中,依標籤搜尋 Threads…");
        result = await runScanForTopic(payload.topicId, {
          useTags: payload.useTags,
          selectedTags: payload.selectedTags,
          jobId,
          onProgress,
        });
        break;
      }
      default:
        throw new Error(`未知的任務類型:${job.type}`);
    }

    // A cancellation that arrived while the work was running wins over the result.
    if (await isJobCancelled(jobId)) return;

    // Keep the last progress message the job reported as the final one.
    const latest = await prisma.backgroundJob.findUnique({ where: { id: jobId } });
    await prisma.backgroundJob.update({
      where: { id: jobId },
      data: {
        status: "completed",
        progress: latest?.progress ?? "完成",
        result: JSON.stringify(result),
        completedAt: new Date(),
      },
    });
  } catch (error) {
    // An error raised while the job is being cancelled is recorded as a
    // cancellation rather than a failure.
    if (error instanceof JobCancelledError || (await isJobCancelled(jobId))) {
      await prisma.backgroundJob.update({
        where: { id: jobId },
        data: {
          status: "cancelled",
          progress: "已停止",
          completedAt: new Date(),
        },
      });
      return;
    }

    const message = formatJobError(error, type);
    await prisma.backgroundJob.update({
      where: { id: jobId },
      data: {
        status: "failed",
        progress: "失敗",
        error: message,
        completedAt: new Date(),
      },
    });
  }
}
|
||
|
||
/**
 * Finds the most recently created `pending`/`running` job of a given type for a topic.
 *
 * Stale jobs are recovered first, so a job that was just marked failed is not
 * returned as active. When `getActiveAccountId()` yields an id, the lookup is
 * additionally restricted to that account.
 *
 * @param topicId - Topic the job belongs to.
 * @param type - Job type to look for.
 * @returns The matching job, or `null` when none is active.
 */
export async function findActiveJob(topicId: string, type: JobType) {
  await recoverStaleBackgroundJobs();
  const accountId = await getActiveAccountId();
  return prisma.backgroundJob.findFirst({
    where: {
      topicId,
      type,
      status: { in: ["pending", "running"] },
      ...(accountId ? { accountId } : {}),
    },
    orderBy: { createdAt: "desc" },
  });
}
|
||
|
||
/**
 * Finds the most recently created `pending`/`running` job of a given type for an account.
 *
 * Stale jobs are recovered first, so a job that was just marked failed is not
 * returned as active.
 *
 * @param accountId - Account the job belongs to.
 * @param type - Job type to look for.
 * @returns The matching job, or `null` when none is active.
 */
export async function findActiveAccountJob(accountId: string, type: JobType) {
  await recoverStaleBackgroundJobs();
  return prisma.backgroundJob.findFirst({
    where: {
      accountId,
      type,
      status: { in: ["pending", "running"] },
    },
    orderBy: { createdAt: "desc" },
  });
}
|
||
|
||
/**
 * Inserts a new background job row in the `pending` state.
 *
 * Stale jobs are recovered first. When `params.accountId` is not given, the
 * value from `getActiveAccountId()` is used. This function only creates the
 * row; it does not start the job.
 *
 * @param params - Job type, optional topic/account/label, and the payload to
 *   store as JSON.
 * @returns The created job row.
 */
export async function enqueueJob(params: {
  type: JobType;
  topicId?: string;
  accountId?: string;
  label?: string;
  payload: unknown;
}) {
  await recoverStaleBackgroundJobs();
  const accountId = params.accountId ?? (await getActiveAccountId());

  return prisma.backgroundJob.create({
    data: {
      accountId,
      type: params.type,
      topicId: params.topicId,
      label: params.label,
      // `JSON.stringify(undefined)` is `undefined` and `JSON.stringify(null)`
      // is "null"; store an empty object instead so the stored payload always
      // decodes to an object.
      payload: JSON.stringify(params.payload ?? {}),
      status: "pending",
      progress: "排隊中…",
    },
  });
}
|
||
|
||
export { cancelJob } from "./cancel";
|