import { existsSync } from "fs"; import { getActiveAccountProfile, setWorkerActiveAccountId } from "@/lib/account-context"; import { prisma, resolvePersona } from "@/lib/db"; import { getActiveAccountConnectionSettings } from "@/lib/account-connection-settings"; import { getOrCreateSettings } from "@/lib/user-settings"; import { parseProviderApiKeys } from "@/lib/ai/keys"; import { generateInboundReplyDrafts } from "@/lib/ai/generate-replies"; import { runScanForAllActiveTopics } from "@/lib/services/scan"; import { generateDraftsForScan } from "@/lib/services/generate"; import { generateOutreachForScanItem } from "@/lib/services/outreach"; import { syncThreadsOwnPostsAndInsights } from "@/lib/services/threads-api-sync"; import { getActiveThreadsCredentials } from "@/lib/services/threads-credentials"; import { publishViaThreadsApi, replyViaThreadsApi } from "@/lib/threads-api"; import { ensureActiveSession, publish } from "@/lib/threads-browser"; import { deleteDraftImages, draftImageAbsolutePath, parseDraftImagePaths, } from "@/lib/drafts/images"; import { humanDelay } from "@/lib/utils"; import { cronMatches } from "./cron-match"; import { logAction, countTodaySuccess } from "./log"; import { getRulesForAccount } from "./rules"; import type { AutomationMode, AutomationTaskType } from "./types"; const OUTREACH_GENERATE_LIMIT = 8; async function setActiveAccount(accountId: string) { setWorkerActiveAccountId(accountId); } export interface TaskRunResult { taskType: AutomationTaskType; ok: boolean; summary: string; count?: number; } /** 對某帳號執行某項任務。triggeredBy=auto 代表排程觸發、會記入配額;manual 代表使用者按「立即執行」。 */ export async function runTaskForAccount( accountId: string, taskType: AutomationTaskType, opts: { triggeredBy: "auto" | "manual"; mode?: AutomationMode } ): Promise { const previousActive = (await getActiveAccountProfile())?.id ?? null; await setActiveAccount(accountId); try { const rule = (await getRulesForAccount(accountId)).find((r) => r.taskType === taskType); const mode: AutomationMode = opts.mode ?? (rule?.mode as AutomationMode) ?? "manual"; const dailyCap = rule?.dailyCap ?? 10; switch (taskType) { case "scan": return await runScanTask(accountId, opts.triggeredBy); case "generate": return await runGenerateTask(accountId, opts.triggeredBy); case "publish": return await runPublishTask(accountId, mode, dailyCap, opts.triggeredBy); case "outreach": return await runOutreachTask(accountId, mode, dailyCap, opts.triggeredBy); case "engagement": return await runEngagementTask(accountId, mode, dailyCap, opts.triggeredBy); default: return { taskType, ok: false, summary: "未知任務" }; } } catch (error) { const message = error instanceof Error ? error.message : "任務失敗"; await logAction({ accountId, taskType, action: taskType, mode: opts.triggeredBy, status: "failed", error: message, }); return { taskType, ok: false, summary: message }; } finally { if (previousActive && previousActive !== accountId) { await setActiveAccount(previousActive); } else { setWorkerActiveAccountId(null); } } } async function runScanTask( accountId: string, triggeredBy: "auto" | "manual" ): Promise { const scans = await runScanForAllActiveTopics(accountId); const valid = scans.filter(Boolean); await logAction({ accountId, taskType: "scan", action: "scan", mode: triggeredBy, status: "success", detail: `海巡 ${valid.length} 個主題`, }); return { taskType: "scan", ok: true, summary: `海巡完成,共 ${valid.length} 個主題`, count: valid.length }; } async function runGenerateTask( accountId: string, triggeredBy: "auto" | "manual" ): Promise { // 找近 24h、屬於本帳號、尚未生成草稿的海巡 const since = new Date(Date.now() - 24 * 60 * 60 * 1000); const scans = await prisma.scan.findMany({ where: { accountId, createdAt: { gte: since } }, orderBy: { createdAt: "desc" }, include: { _count: { select: { items: true } } }, }); let generated = 0; for (const scan of scans) { if (scan._count.items === 0) continue; const already = await prisma.draft.count({ where: { topicId: scan.topicId, createdAt: { gte: scan.createdAt } }, }); if (already > 0) continue; try { const drafts = await generateDraftsForScan(scan.id); generated += drafts.length; } catch { // 跳過個別失敗 } } await logAction({ accountId, taskType: "generate", action: "generate", mode: triggeredBy, status: "success", detail: `生成 ${generated} 篇草稿`, }); return { taskType: "generate", ok: true, summary: `生成 ${generated} 篇草稿`, count: generated }; } async function autoPublishDraft(draft: { id: string; text: string; angle: string | null; hook: string | null; rationale: string | null; draftType: string | null; accountId: string | null; topicId: string | null; imagePath?: string | null; imagePaths?: string | null; }): Promise<{ success: boolean; permalink?: string; error?: string }> { const connection = await getActiveAccountConnectionSettings(); const images = parseDraftImagePaths(draft).filter((p) => existsSync(draftImageAbsolutePath(p))); const credentials = connection.publishViaApi ? await getActiveThreadsCredentials() : null; let result: { success: boolean; permalink?: string; error?: string } | null = null; // API 優先(純文字):有圖時 API 需公開網址,直接走瀏覽器以保留配圖。 if (credentials && images.length === 0) { try { const apiResult = await publishViaThreadsApi(credentials, { text: draft.text }); result = apiResult.success ? { success: true, permalink: apiResult.permalink } : { success: false, error: apiResult.error }; } catch (error) { result = { success: false, error: error instanceof Error ? error.message : "API 發布失敗" }; } } // API 未連線、有配圖、或 API 發布失敗 → 退回瀏覽器 if (!result || !result.success) { try { const session = await ensureActiveSession(); const browserResult = await publish( session, draft.text, images.map((p) => draftImageAbsolutePath(p)) ); result = { success: browserResult.success, permalink: browserResult.permalink, error: browserResult.error, }; } catch (error) { result = result ?? { success: false, error: error instanceof Error ? error.message : "發布失敗", }; } } if (!result.success) return result; await prisma.$transaction(async (tx) => { await tx.published.create({ data: { accountId: draft.accountId, topicId: draft.topicId, text: draft.text, angle: draft.angle, hook: draft.hook, rationale: draft.rationale, draftType: draft.draftType, permalink: result.permalink, }, }); await deleteDraftImages(images); await tx.draft.delete({ where: { id: draft.id } }); }); return result; } async function runPublishTask( accountId: string, mode: AutomationMode, dailyCap: number, triggeredBy: "auto" | "manual" ): Promise { if (mode !== "auto") { return { taskType: "publish", ok: true, summary: "publish 為 manual 模式,草稿保留待人工審核", count: 0, }; } const used = await countTodaySuccess(accountId, "publish"); const remaining = Math.max(0, dailyCap - used); if (remaining === 0) { return { taskType: "publish", ok: true, summary: "今日發文已達上限", count: 0 }; } const drafts = await prisma.draft.findMany({ where: { accountId, status: "PENDING" }, orderBy: { createdAt: "asc" }, take: remaining, }); let published = 0; for (const draft of drafts) { const result = await autoPublishDraft(draft); if (result.success) { published += 1; await logAction({ accountId, taskType: "publish", action: "publish-post", mode: triggeredBy, status: "success", targetId: draft.id, permalink: result.permalink, detail: draft.text.slice(0, 60), }); await humanDelay(8000, 20000); } else { await logAction({ accountId, taskType: "publish", action: "publish-post", mode: triggeredBy, status: "failed", targetId: draft.id, error: result.error, }); } } return { taskType: "publish", ok: true, summary: `自動發布 ${published} 篇`, count: published }; } async function runOutreachTask( accountId: string, mode: AutomationMode, dailyCap: number, triggeredBy: "auto" | "manual" ): Promise { // Step 1:為近期、相關、尚未生成獲客留言的貼文生成草稿 const since = new Date(Date.now() - 3 * 24 * 60 * 60 * 1000); const candidates = await prisma.scanItem.findMany({ where: { externalId: { not: null }, OR: [{ qualityTier: null }, { qualityTier: { not: "EXCLUDE" } }], outreachTargets: { none: {} }, scan: { accountId, scanGoal: "placement", createdAt: { gte: since } }, }, orderBy: [{ combinedScore: "desc" }, { score: "desc" }], take: OUTREACH_GENERATE_LIMIT, }); let drafted = 0; for (const item of candidates) { try { await generateOutreachForScanItem(item.id); drafted += 1; } catch { // 略過個別失敗 } } if (mode !== "auto") { await logAction({ accountId, taskType: "outreach", action: "outreach-generate", mode: triggeredBy, status: "success", detail: `生成 ${drafted} 則獲客留言草稿(待審核)`, }); return { taskType: "outreach", ok: true, summary: `生成 ${drafted} 則留言草稿,待人工審核後留言`, count: drafted, }; } // Step 2:auto 模式 → 直接留言推廣(受配額限制) const used = await countTodaySuccess(accountId, "outreach"); const remaining = Math.max(0, dailyCap - used); let commented = 0; if (remaining > 0) { const credentials = await getActiveThreadsCredentials(); if (!credentials) { return { taskType: "outreach", ok: false, summary: `生成 ${drafted} 則草稿,但 Threads API 未連線,無法自動留言`, count: drafted, }; } const readyDrafts = await prisma.outreachDraft.findMany({ where: { status: "PENDING", outreachTarget: { status: "DRAFTED", scanItem: { externalId: { not: null }, scan: { accountId, scanGoal: "placement" }, }, }, }, orderBy: { createdAt: "asc" }, take: remaining, include: { outreachTarget: { include: { scanItem: true } } }, }); for (const draft of readyDrafts) { const externalId = draft.outreachTarget.scanItem.externalId; if (!externalId) continue; const result = await replyViaThreadsApi(credentials, { replyToId: externalId, text: draft.text, }); if (result.success) { commented += 1; await prisma.$transaction([ prisma.outreachDraft.update({ where: { id: draft.id }, data: { status: "PUBLISHED", publishedAt: new Date() }, }), prisma.outreachTarget.update({ where: { id: draft.outreachTargetId }, data: { status: "COMMENTED" }, }), ]); await logAction({ accountId, taskType: "outreach", action: "reply-outreach", mode: triggeredBy, status: "success", targetId: draft.id, externalId, detail: draft.text.slice(0, 60), }); await humanDelay(10000, 25000); } else { await logAction({ accountId, taskType: "outreach", action: "reply-outreach", mode: triggeredBy, status: "failed", targetId: draft.id, externalId, error: result.error, }); } } } return { taskType: "outreach", ok: true, summary: `生成 ${drafted} 則草稿,自動留言 ${commented} 則`, count: commented, }; } async function runEngagementTask( accountId: string, mode: AutomationMode, dailyCap: number, triggeredBy: "auto" | "manual" ): Promise { // Step 1:同步自己貼文與底下新留言 try { await syncThreadsOwnPostsAndInsights({ postsLimit: 25, repliesLimit: 25 }); } catch (error) { return { taskType: "engagement", ok: false, summary: error instanceof Error ? error.message : "同步留言失敗", }; } // Step 2:為 NEW 留言生成回覆草稿 const settings = await getOrCreateSettings(); const account = await getActiveAccountProfile(); const apiKeys = parseProviderApiKeys(settings.providerApiKeys); const newReplies = await prisma.inboundReply.findMany({ where: { status: "NEW", published: { accountId } }, orderBy: { createdAt: "desc" }, take: 20, include: { published: true }, }); let drafted = 0; for (const inbound of newReplies) { try { const generated = await generateInboundReplyDrafts({ persona: resolvePersona(settings, account), aiProvider: settings.aiProvider, aiModel: settings.aiModel, apiKeys, publishedText: inbound.published?.text, replyText: inbound.text, authorName: inbound.authorName, count: 2, }); await prisma.inboundReply.update({ where: { id: inbound.id }, data: { sentiment: generated.sentiment, intent: generated.intent, status: "DRAFTED" }, }); for (const draft of generated.drafts) { await prisma.replyDraft.create({ data: { inboundReplyId: inbound.id, text: draft.text, rationale: draft.rationale }, }); } drafted += 1; } catch { // 略過個別失敗 } } if (mode !== "auto") { await logAction({ accountId, taskType: "engagement", action: "engagement-generate", mode: triggeredBy, status: "success", detail: `生成 ${drafted} 則回覆草稿(待審核)`, }); return { taskType: "engagement", ok: true, summary: `生成 ${drafted} 則回覆草稿,待人工審核後回覆`, count: drafted, }; } // Step 3:auto 模式 → 直接回覆(受配額限制) const used = await countTodaySuccess(accountId, "engagement"); const remaining = Math.max(0, dailyCap - used); let replied = 0; if (remaining > 0) { const credentials = await getActiveThreadsCredentials(); if (credentials) { const readyDrafts = await prisma.replyDraft.findMany({ where: { status: "PENDING", inboundReply: { externalId: { not: null }, published: { accountId } }, }, orderBy: { createdAt: "asc" }, take: remaining, include: { inboundReply: true }, }); for (const draft of readyDrafts) { const externalId = draft.inboundReply.externalId; if (!externalId) continue; const result = await replyViaThreadsApi(credentials, { replyToId: externalId, text: draft.text, }); if (result.success) { replied += 1; await prisma.$transaction([ prisma.replyDraft.update({ where: { id: draft.id }, data: { status: "PUBLISHED", publishedAt: new Date() }, }), prisma.inboundReply.update({ where: { id: draft.inboundReplyId }, data: { status: "REPLIED" }, }), ]); await logAction({ accountId, taskType: "engagement", action: "reply-inbound", mode: triggeredBy, status: "success", targetId: draft.id, externalId, detail: draft.text.slice(0, 60), }); await humanDelay(8000, 18000); } else { await logAction({ accountId, taskType: "engagement", action: "reply-inbound", mode: triggeredBy, status: "failed", targetId: draft.id, externalId, error: result.error, }); } } } } return { taskType: "engagement", ok: true, summary: `生成 ${drafted} 則草稿,自動回覆 ${replied} 則`, count: replied, }; } /** 每分鐘輪詢:跑所有「總開關開啟」帳號中「已啟用且排程到點」的規則。 */ export async function runDueAutomationTick(now: Date = new Date()): Promise { const accounts = await prisma.account.findMany({ where: { automationEnabled: true, userId: { not: null } }, }); const results: TaskRunResult[] = []; for (const account of accounts) { const rules = await getRulesForAccount(account.id); for (const rule of rules) { if (!rule.enabled) continue; if (!cronMatches(rule.schedule, now)) continue; const result = await runTaskForAccount(account.id, rule.taskType as AutomationTaskType, { triggeredBy: "auto", }); results.push(result); await prisma.automationRule.update({ where: { id: rule.id }, data: { lastRunAt: now }, }); } } return results; } /** 總殺停:關掉所有帳號的自動化總開關。 */ export async function killAllAutomation(): Promise { const result = await prisma.account.updateMany({ data: { automationEnabled: false }, }); return result.count; }