213 lines
6.7 KiB
TypeScript
213 lines
6.7 KiB
TypeScript
|
|
import { formatAnalyzeError } from "@/lib/ai/analyze-topic";
|
|||
|
|
import { JobCancelledError, isJobCancelled } from "@/lib/jobs/cancel";
|
|||
|
|
import { prisma } from "@/lib/db";
|
|||
|
|
import { getActiveAccountId } from "@/lib/account-context";
|
|||
|
|
import { analyzeTopic } from "@/lib/services/analyze";
|
|||
|
|
import { runScanForTopic } from "@/lib/services/scan";
|
|||
|
|
import { runStyle8DAnalysis } from "@/lib/services/style-analysis";
|
|||
|
|
import type { AnalyzeTopicPayload, JobType, ScanPayload, Style8DPayload } from "./types";
|
|||
|
|
import { ZodError } from "zod";
|
|||
|
|
|
|||
|
|
/** Number of milliseconds in one minute. */
const MINUTE_MS = 60 * 1000;

/**
 * Per-job-type staleness limits, in milliseconds. A job whose progress has not
 * been updated for longer than its limit is treated as the victim of a server
 * restart or a hung API call.
 */
const STALE_JOB_MS: Partial<Record<JobType, number>> = {
  "analyze-topic": 8 * MINUTE_MS,
  "style-8d": 8 * MINUTE_MS,
  scan: 25 * MINUTE_MS,
};

/** User-facing error text stored on a job that is failed for being stale. */
const STALE_JOB_ERROR = "任務逾時或伺服器重啟導致中斷,請重新執行(若經常發生可改用較快的 AI 模型)。";
|
|||
|
|
|
|||
|
|
/**
 * Marks stuck pending/running jobs as failed so the UI does not spin forever.
 *
 * A job counts as stuck when its `updatedAt` is older than the limit for its
 * type in `STALE_JOB_MS` (12 minutes for job types without an entry).
 *
 * @returns The number of jobs that were actually transitioned to `failed`.
 */
export async function recoverStaleBackgroundJobs(): Promise<number> {
  const active = await prisma.backgroundJob.findMany({
    where: { status: { in: ["pending", "running"] } },
    select: { id: true, type: true, updatedAt: true },
  });
  const now = Date.now();
  let recovered = 0;

  for (const job of active) {
    const maxMs = STALE_JOB_MS[job.type as JobType] ?? 12 * 60 * 1000;
    if (now - job.updatedAt.getTime() <= maxMs) continue;

    // Repeat the status and staleness conditions inside the write itself.
    // The snapshot above may be outdated by the time this row is reached: the
    // job could have completed, been cancelled, or reported fresh progress.
    // An unconditional update by id would overwrite that outcome with "failed".
    const { count } = await prisma.backgroundJob.updateMany({
      where: {
        id: job.id,
        status: { in: ["pending", "running"] },
        updatedAt: { lt: new Date(now - maxMs) },
      },
      data: {
        status: "failed",
        progress: "失敗",
        error: STALE_JOB_ERROR,
        completedAt: new Date(),
      },
    });
    // Count only rows that were really changed (0 when the job moved on).
    recovered += count;
  }

  return recovered;
}
|
|||
|
|
|
|||
|
|
/**
 * Turns a thrown value into the error message stored on a failed job.
 *
 * - A thrown string is returned unchanged, whatever the job type.
 * - "analyze-topic" jobs delegate to `formatAnalyzeError`.
 * - "style-8d" jobs that failed with a `ZodError` get a fixed retry hint.
 * - Otherwise the `Error` message is used, or a generic fallback text when the
 *   value is not an `Error`.
 *
 * @param error The value caught from the job run.
 * @param type  The type of the job that failed.
 * @returns The message to persist for the job.
 */
function formatJobError(error: unknown, type: JobType): string {
  if (typeof error === "string") {
    return error;
  }

  switch (type) {
    case "analyze-topic":
      return formatAnalyzeError(error);
    case "style-8d":
      if (error instanceof ZodError) {
        return "AI 有回覆,但 8D 欄位格式仍不完整。請重試一次;若持續發生,請在設定更換研究模型。";
      }
      break;
    default:
      break;
  }

  if (error instanceof Error) {
    return error.message;
  }
  return "任務失敗";
}
|
|||
|
|
|
|||
|
|
/**
 * Parses the JSON payload stored on a job row (an absent payload parses as `{}`).
 *
 * @throws Error with a user-facing message when the stored text is not valid JSON.
 */
function parseJobPayload<T>(raw: string | null | undefined): T {
  try {
    return JSON.parse(raw ?? "{}") as T;
  } catch {
    throw new Error("任務資料損毀,請重新建立任務");
  }
}

/** Overwrites the progress text of the given job. */
async function setJobProgress(jobId: string, progress: string): Promise<void> {
  await prisma.backgroundJob.update({
    where: { id: jobId },
    data: { progress },
  });
}

/**
 * Executes a single pending background job and records its outcome.
 *
 * Nothing happens when the job does not exist, is not `pending`, is already
 * cancelled, or has been claimed by another invocation. Otherwise the job is
 * moved to `running`, dispatched by type ("analyze-topic", "style-8d", "scan"),
 * and finally stored as `completed` (with the JSON-serialised result),
 * `cancelled`, or `failed` (with a formatted error message).
 *
 * Errors raised while running the job are written to the job row, not rethrown.
 *
 * @param jobId Id of the background job to run.
 */
export async function runBackgroundJob(jobId: string): Promise<void> {
  const job = await prisma.backgroundJob.findUnique({ where: { id: jobId } });
  if (!job || job.status !== "pending") return;
  if (await isJobCancelled(jobId)) return;

  // Claim the job atomically: the transition succeeds only while the row is
  // still "pending". Without the status condition, two concurrent invocations
  // could both pass the check above and execute the same job twice.
  const claim = await prisma.backgroundJob.updateMany({
    where: { id: jobId, status: "pending" },
    data: { status: "running", progress: "執行中…" },
  });
  if (claim.count === 0) return;

  const type = job.type as JobType;

  try {
    let result: unknown;

    if (type === "analyze-topic") {
      const payload = parseJobPayload<AnalyzeTopicPayload>(job.payload);
      await setJobProgress(jobId, "AI 分析中,約需 1~4 分鐘…");
      result = await analyzeTopic(payload.topicId, payload.brief, {
        jobId,
        onProgress: async (message) => {
          await setJobProgress(jobId, message);
        },
      });
    } else if (type === "style-8d") {
      const payload = parseJobPayload<Style8DPayload>(job.payload);
      result = await runStyle8DAnalysis(payload.accountId, payload.benchmarkUsername, jobId);
    } else if (type === "scan") {
      const payload = parseJobPayload<ScanPayload>(job.payload);
      // Persist the tag selection on the topic before the scan starts.
      if (payload.selectedTags?.length) {
        await prisma.topic.update({
          where: { id: payload.topicId },
          data: { selectedTags: JSON.stringify(payload.selectedTags) },
        });
      }
      await setJobProgress(jobId, "海巡中,依標籤搜尋 Threads…");
      result = await runScanForTopic(payload.topicId, {
        useTags: payload.useTags,
        selectedTags: payload.selectedTags,
        jobId,
        onProgress: async (message) => {
          await setJobProgress(jobId, message);
        },
      });
    } else {
      throw new Error(`未知的任務類型:${job.type}`);
    }

    // A job cancelled while it was running must not be marked as completed.
    if (await isJobCancelled(jobId)) return;

    // Keep the last progress text reported by the job as its final progress.
    const latest = await prisma.backgroundJob.findUnique({ where: { id: jobId } });
    await prisma.backgroundJob.update({
      where: { id: jobId },
      data: {
        status: "completed",
        progress: latest?.progress ?? "完成",
        result: JSON.stringify(result),
        completedAt: new Date(),
      },
    });
  } catch (error) {
    if (error instanceof JobCancelledError || (await isJobCancelled(jobId))) {
      await prisma.backgroundJob.update({
        where: { id: jobId },
        data: {
          status: "cancelled",
          progress: "已停止",
          completedAt: new Date(),
        },
      });
      return;
    }

    const message = formatJobError(error, type);
    await prisma.backgroundJob.update({
      where: { id: jobId },
      data: {
        status: "failed",
        progress: "失敗",
        error: message,
        completedAt: new Date(),
      },
    });
  }
}
|
|||
|
|
|
|||
|
|
/**
 * Looks up the most recently created pending or running job of the given type
 * for a topic. Stale jobs are recovered first. When an active account id is
 * available, the lookup is additionally restricted to that account.
 *
 * @param topicId Topic the job belongs to.
 * @param type    Job type to look for.
 * @returns The newest matching job, or `null` when there is none.
 */
export async function findActiveJob(topicId: string, type: JobType) {
  await recoverStaleBackgroundJobs();

  const activeAccountId = await getActiveAccountId();
  // Only filter by account when one is actually active.
  const accountScope = activeAccountId ? { accountId: activeAccountId } : {};

  const newestActive = await prisma.backgroundJob.findFirst({
    where: {
      topicId,
      type,
      status: { in: ["pending", "running"] },
      ...accountScope,
    },
    orderBy: { createdAt: "desc" },
  });
  return newestActive;
}
|
|||
|
|
|
|||
|
|
/**
 * Looks up the most recently created pending or running job of the given type
 * for an account. Stale jobs are recovered before the lookup.
 *
 * @param accountId Account the job belongs to.
 * @param type      Job type to look for.
 * @returns The newest matching job, or `null` when there is none.
 */
export async function findActiveAccountJob(accountId: string, type: JobType) {
  await recoverStaleBackgroundJobs();

  const newestActive = await prisma.backgroundJob.findFirst({
    where: {
      accountId,
      type,
      status: { in: ["pending", "running"] },
    },
    orderBy: { createdAt: "desc" },
  });
  return newestActive;
}
|
|||
|
|
|
|||
|
|
/**
 * Creates a new background job row in the `pending` state and returns it.
 * Stale jobs are recovered first. This function only inserts the row.
 *
 * @param params.type      Job type.
 * @param params.topicId   Optional topic the job is attached to.
 * @param params.accountId Optional owning account; when omitted, the active
 *                         account id is looked up instead.
 * @param params.label     Optional label stored on the job.
 * @param params.payload   Job input, stored as JSON text.
 * @returns The created job record.
 */
export async function enqueueJob(params: {
  type: JobType;
  topicId?: string;
  accountId?: string;
  label?: string;
  payload: unknown;
}) {
  await recoverStaleBackgroundJobs();

  const { type, topicId, label, payload } = params;
  // Fall back to the active account only when the caller did not name one.
  const accountId = params.accountId ?? (await getActiveAccountId());

  const created = await prisma.backgroundJob.create({
    data: {
      accountId,
      type,
      topicId,
      label,
      payload: JSON.stringify(payload),
      status: "pending",
      progress: "排隊中…",
    },
  });
  return created;
}
|
|||
|
|
|
|||
|
|
export { cancelJob } from "./cancel";
|