import { parseCount } from "@/lib/utils";
import { runWithConcurrency } from "@/lib/utils/concurrency";
import { rankAndDedupe, type RankedPost } from "@/lib/ranking";
import type { ScanTask } from "@/lib/services/scan-tasks";
import { withSharedContext, type BrowserSessionOptions } from "./browser";
import {
  DEFAULT_TASK_STAGGER_MS,
  getBrowserConcurrency,
  humanLandingPause,
  humanPause,
  humanScrollPage,
} from "./human-behavior";
import { browserSessionOptions, type ActiveSession } from "./session";
import { extractFromPageScripts, extractPostsFromJson } from "./extract";
import { getProfilePostsOnPage } from "./profile";
import type { BrowserCrawlStep, BrowserProgressCallback } from "./progress";
import type { ThreadsPost } from "./types";
import {
  assertThreadsPageSafe,
  consumeCrawlerPageQuota,
  limitCrawlerTasks,
} from "./safety";

const HOME_URL = "https://www.threads.com/";
const IG_APP_ID = "238260118697367";

/** Upper bound on the number of post links inspected by the DOM fallback scraper. */
const MAX_DOM_POST_LINKS = 40;
/** Per-element timeout (ms) used when reading attributes/text in the DOM fallback. */
const DOM_READ_TIMEOUT_MS = 1200;
/** Posts whose visible text is shorter than this are skipped by the DOM fallback. */
const MIN_POST_TEXT_LENGTH = 5;

// Patterns are hoisted so they are compiled once instead of once per scraped link.
const AUTHOR_FROM_HREF = /@([^/]+)\/post/;
const EXTERNAL_ID_FROM_HREF = /\/post\/([^/?]+)/;
const LIKE_COUNT_PATTERN = /(\d[\d.,kKmM萬]*)\s*(讚|likes?)/i;
const REPLY_COUNT_PATTERN = /(\d[\d.,kKmM萬]*)\s*(則回覆|replies?|留言)/i;

/** Local alias for the Playwright page type, resolved without a runtime import. */
type Page = import("playwright").Page;

/**
 * Fallback scraper that reads posts straight from the rendered DOM.
 *
 * Walks up to {@link MAX_DOM_POST_LINKS} anchors whose href contains `/post/`,
 * de-duplicates them by href, and extracts text, author and like/reply counts
 * from the surrounding container. Links that cannot be read are skipped.
 *
 * @param page - Page that has already navigated to a listing of posts.
 * @returns The posts that could be read; may be empty.
 */
async function scrapeFromDom(page: Page): Promise<ThreadsPost[]> {
  const posts: ThreadsPost[] = [];
  const seen = new Set<string>();
  const postLinks = page.locator('a[href*="/post/"]');
  const count = await postLinks.count();
  const inspected = Math.min(count, MAX_DOM_POST_LINKS);

  for (let i = 0; i < inspected; i++) {
    const link = postLinks.nth(i);
    try {
      const href = await link.getAttribute("href", { timeout: DOM_READ_TIMEOUT_MS });
      if (!href || seen.has(href)) continue;
      seen.add(href);

      const permalink = href.startsWith("http") ? href : `https://www.threads.com${href}`;
      const authorName = href.match(AUTHOR_FROM_HREF)?.[1];

      // Prefer the nearest pressable container; otherwise fall back to a nearby ancestor div.
      const container = link.locator(
        "xpath=ancestor::*[contains(@data-pressable-container,'true')][1]"
      );
      const hasContainer = (await container.count()) > 0;
      const scope = hasContainer
        ? container
        : link.locator("xpath=ancestor::div[position()<=6]").first();

      const textEl = scope.locator('div[dir="auto"], span[dir="auto"]').first();
      const text = await textEl.innerText({ timeout: DOM_READ_TIMEOUT_MS }).catch(() => "");
      if (!text || text.length < MIN_POST_TEXT_LENGTH) continue;

      const statsText = await scope.innerText().catch(() => "");
      const likeMatch = statsText.match(LIKE_COUNT_PATTERN);
      const replyMatch = statsText.match(REPLY_COUNT_PATTERN);

      posts.push({
        text: text.trim(),
        permalink,
        authorName,
        likeCount: parseCount(likeMatch?.[1]),
        replyCount: parseCount(replyMatch?.[1]),
        externalId: href.match(EXTERNAL_ID_FROM_HREF)?.[1],
      });
    } catch {
      // skip unreadable post
    }
  }

  return posts;
}

/**
 * Opens the home page once to validate the stored session before crawling.
 *
 * @param page - Page used for the warm-up navigation.
 * @param onStep - Optional progress callback.
 * @throws Whatever `assertThreadsPageSafe` throws, or an `Error` when the
 *   navigation ends up on a URL containing `/login`.
 */
async function primeSession(page: Page, onStep?: BrowserProgressCallback): Promise<void> {
  await onStep?.("session_check");
  const response = await page.goto(HOME_URL, { waitUntil: "domcontentloaded", timeout: 45000 });
  await onStep?.("landing_pause", "首頁");
  await humanLandingPause(page);

  // Body text is best-effort here: a read failure is treated as an empty body.
  const bodyText = await page.locator("body").innerText().catch(() => "");
  await assertThreadsPageSafe(page, response?.status(), bodyText);

  if (page.url().includes("/login")) {
    throw new Error("Session 已失效,請到設定頁重新登入 Threads");
  }
}

/**
 * Subscribes to the page's network responses and appends any posts found in
 * JSON payloads to `collected` (mutated in place).
 *
 * Only responses whose URL mentions `graphql`, `threads` or `instagram` and
 * whose content type contains `json` are inspected.
 */
function attachGraphqlCollector(page: Page, collected: ThreadsPost[]): void {
  page.on("response", async (response) => {
    const url = response.url();
    if (!url.includes("graphql") && !url.includes("threads") && !url.includes("instagram")) {
      return;
    }
    try {
      const contentType = response.headers()["content-type"] ?? "";
      if (!contentType.includes("json")) return;
      const json = await response.json();
      extractPostsFromJson(json, collected);
    } catch {
      // ignore parse errors
    }
  });
}

/**
 * Runs a keyword search on an already-open page and returns ranked posts.
 *
 * Posts are gathered from three sources in order of preference: intercepted
 * network JSON, embedded page scripts, and finally the rendered DOM. Each
 * fallback is used only while fewer than three posts have been collected.
 *
 * @param page - Page to navigate; a response listener is attached to it.
 * @param query - Search keyword (URL-encoded internally).
 * @param limit - Limit forwarded to `rankAndDedupe`.
 * @param onStep - Optional progress callback.
 * @returns Ranked, de-duplicated posts; empty when the page reports "not found".
 */
export async function searchQueryOnPage(
  page: Page,
  query: string,
  limit = 20,
  onStep?: BrowserProgressCallback
): Promise<RankedPost[]> {
  const collected: ThreadsPost[] = [];
  attachGraphqlCollector(page, collected);

  const searchUrl = `https://www.threads.com/search?q=${encodeURIComponent(query)}&serp_type=default`;
  await onStep?.("open_page", `搜尋「${query}」`);
  const response = await page.goto(searchUrl, { waitUntil: "domcontentloaded", timeout: 45000 });
  await onStep?.("landing_pause");
  await humanLandingPause(page);

  // Unlike primeSession, a failure to read the body propagates to the caller here.
  const bodyText = await page.locator("body").innerText();
  await assertThreadsPageSafe(page, response?.status(), bodyText);

  // Localized "page not found" markers: nothing to scrape.
  if (bodyText.includes("走丟") || bodyText.includes("頁面不存在")) {
    await onStep?.("done", "0 篇");
    return [];
  }

  await onStep?.("wait_content");
  // A missing post link is not fatal; the fallbacks below may still find content.
  await page.waitForSelector('a[href*="/post/"]', { timeout: 12000 }).catch(() => undefined);
  await onStep?.("scroll");
  await humanScrollPage(page, { minPasses: 2, maxPasses: 4 });

  if (collected.length < 3) {
    await onStep?.("parse_scripts");
    const { posts: scriptPosts } = await extractFromPageScripts(page);
    collected.push(...scriptPosts);
  } else {
    await onStep?.("parse_network", `${collected.length} 篇`);
  }

  if (collected.length < 3) {
    await onStep?.("parse_dom");
    const domPosts = await scrapeFromDom(page);
    collected.push(...domPosts);
  }

  const ranked = rankAndDedupe(collected, limit);
  await onStep?.("done", `${ranked.length} 篇`);
  return ranked;
}

/** Maps an optional active session to browser session options. */
function resolveSessionOptions(session?: ActiveSession): BrowserSessionOptions | undefined {
  return session ? browserSessionOptions(session) : undefined;
}

export interface BrowserSearchOptions {
  onStep?: BrowserProgressCallback;
}

/**
 * Runs a single keyword search in a shared browser context.
 *
 * @param storageState - Storage state handed to `withSharedContext`.
 * @param query - Search keyword.
 * @param limit - Limit forwarded to {@link searchQueryOnPage}.
 * @param session - Optional active session used to derive browser options.
 * @param options - Optional progress callback holder.
 * @returns Ranked, de-duplicated posts for the query.
 */
export async function search(
  storageState: string,
  query: string,
  limit = 20,
  session?: ActiveSession,
  options?: BrowserSearchOptions
): Promise<RankedPost[]> {
  return withSharedContext(
    storageState,
    async (context) => {
      const page = await context.newPage();
      await primeSession(page, options?.onStep);
      return searchQueryOnPage(page, query, limit, options?.onStep);
    },
    resolveSessionOptions(session)
  );
}

export type RankedPostWithTag = RankedPost & { searchTag?: string };

/**
 * Flattens per-task batches into one list sorted by descending score.
 *
 * Posts are keyed by `externalId` when present, otherwise by author plus the
 * first 120 characters of the trimmed text; for duplicate keys the
 * higher-scoring post wins.
 */
function mergePosts(batches: RankedPostWithTag[][]): RankedPostWithTag[] {
  const merged = new Map<string, RankedPostWithTag>();
  for (const batch of batches) {
    for (const post of batch) {
      const key =
        post.externalId ?? `${post.authorName ?? ""}:${post.text.trim().slice(0, 120)}`;
      const existing = merged.get(key);
      if (!existing || post.score > existing.score) {
        merged.set(key, post);
      }
    }
  }
  return Array.from(merged.values()).sort((a, b) => b.score - a.score);
}

/**
 * Runs a mixed batch of tasks (keyword searches and account scans) with
 * bounded parallelism (3 or fewer recommended).
 *
 * The task list is first passed through `limitCrawlerTasks`; a warm-up
 * navigation validates the session, then each task runs on its own page.
 *
 * @param storageState - Storage state handed to `withSharedContext`.
 * @param tasks - Tasks to run.
 * @param options - Session, concurrency override and lifecycle callbacks.
 * @returns Merged, de-duplicated posts tagged with the originating task label.
 * @throws JobCancelledError when the run fails with the `__JOB_ABORT__` sentinel;
 *   any other error is rethrown unchanged.
 */
export async function executeScanTasks(
  storageState: string,
  tasks: ScanTask[],
  options?: {
    session?: ActiveSession;
    concurrency?: number;
    onProgress?: (done: number, total: number, label: string) => void;
    shouldAbort?: () => boolean | Promise<boolean>;
    onTaskStart?: (task: ScanTask) => void | Promise<void>;
    onTaskDone?: (task: ScanTask, found: number) => void | Promise<void>;
    onTaskFail?: (task: ScanTask, error: unknown) => void | Promise<void>;
    onTaskStep?: (
      task: ScanTask,
      step: BrowserCrawlStep,
      detail?: string
    ) => void | Promise<void>;
  }
): Promise<RankedPostWithTag[]> {
  const safeTasks = limitCrawlerTasks(tasks);
  if (safeTasks.length === 0) return [];

  // Warm-up steps are reported against the first task; the literal is only a typing fallback.
  const primeTask: ScanTask = safeTasks[0] ?? {
    id: "prime",
    kind: "keyword",
    query: "",
    label: "初始化",
    limit: 0,
  };

  try {
    return await withSharedContext(
      storageState,
      async (context) => {
        const primePage = await context.newPage();
        try {
          await primeSession(primePage, (step, detail) =>
            options?.onTaskStep?.(primeTask, step, detail)
          );
        } finally {
          // Close the warm-up page even when session validation fails.
          await primePage.close();
        }

        const results = await runWithConcurrency(
          safeTasks,
          async (task) => {
            consumeCrawlerPageQuota(options?.session?.id);
            await options?.onTaskStart?.(task);
            const page = await context.newPage();
            const onStep: BrowserProgressCallback = (step, detail) =>
              options?.onTaskStep?.(task, step, detail);

            try {
              let posts: RankedPost[];
              if (task.kind === "account") {
                posts = await getProfilePostsOnPage(page, task.query, task.limit, onStep);
              } else {
                posts = await searchQueryOnPage(page, task.query, task.limit, onStep);
              }
              const mapped = posts.map((post) => ({ ...post, searchTag: task.label }));
              await options?.onTaskDone?.(task, mapped.length);
              return mapped;
            } catch (error) {
              await options?.onTaskFail?.(task, error);
              throw error;
            } finally {
              try {
                await onStep("between_pages");
                await humanPause("betweenPages");
              } finally {
                // Always release the page, even if the progress callback or pause rejects.
                await page.close();
              }
            }
          },
          {
            concurrency: options?.concurrency ?? getBrowserConcurrency(),
            staggerMs: DEFAULT_TASK_STAGGER_MS,
            shouldAbort: options?.shouldAbort,
            onProgress: (done, total) => {
              // NOTE(review): this labels progress with the task at index `done - 1`,
              // which presumes tasks complete in input order — confirm against
              // runWithConcurrency, since parallel tasks may finish out of order.
              const task = safeTasks[done - 1];
              options?.onProgress?.(done, total, task?.label ?? "");
            },
          }
        );

        return mergePosts(results);
      },
      resolveSessionOptions(options?.session)
    );
  } catch (error) {
    // Translate the abort sentinel into the job-level cancellation error.
    if (error instanceof Error && error.message === "__JOB_ABORT__") {
      const { JobCancelledError } = await import("@/lib/jobs/cancel");
      throw new JobCancelledError();
    }
    throw error;
  }
}

/**
 * Searches several keywords by converting each tag into a keyword task.
 *
 * @deprecated Use {@link executeScanTasks} instead.
 */
export async function searchTagsParallel(
  storageState: string,
  tags: string[],
  perTagLimit: number,
  options?: {
    concurrency?: number;
    onProgress?: (done: number, total: number, tag: string) => void;
  }
): Promise<RankedPostWithTag[]> {
  const tasks: ScanTask[] = tags.map((tag) => ({
    id: `keyword:${tag}`,
    kind: "keyword",
    query: tag,
    label: tag,
    limit: perTagLimit,
  }));
  return executeScanTasks(storageState, tasks, options);
}

export { IG_APP_ID };