haixunMaster/lib/threads-browser/search.ts

293 lines
9.9 KiB
TypeScript
Raw Permalink Normal View History

2026-06-21 12:50:31 +00:00
import { parseCount } from "@/lib/utils";
import { runWithConcurrency } from "@/lib/utils/concurrency";
import { rankAndDedupe, type RankedPost } from "@/lib/ranking";
import type { ScanTask } from "@/lib/services/scan-tasks";
import { withSharedContext, type BrowserSessionOptions } from "./browser";
import {
DEFAULT_TASK_STAGGER_MS,
getBrowserConcurrency,
humanLandingPause,
humanPause,
humanScrollPage,
} from "./human-behavior";
import { browserSessionOptions, type ActiveSession } from "./session";
import { extractFromPageScripts, extractPostsFromJson } from "./extract";
import { getProfilePostsOnPage } from "./profile";
import type { BrowserCrawlStep, BrowserProgressCallback } from "./progress";
import type { ThreadsPost } from "./types";
import {
assertThreadsPageSafe,
consumeCrawlerPageQuota,
limitCrawlerTasks,
} from "./safety";
// Landing page that primeSession navigates to before any crawl.
const HOME_URL = "https://www.threads.com/";
// Not referenced in this file apart from the re-export at the bottom of the module.
// NOTE(review): presumably the Instagram web app id expected by Threads endpoints — confirm with consumers.
const IG_APP_ID = "238260118697367";
/**
 * Scrapes posts straight from the rendered DOM by walking anchors whose href
 * contains "/post/". `searchQueryOnPage` only calls this when the network and
 * inline-script sources yielded fewer than three posts.
 *
 * At most the first 40 matching links are inspected. A link is skipped when its
 * href is missing or already seen, when the nearby text is shorter than five
 * characters, or when any DOM read for it throws.
 *
 * @param page Page that already shows a post listing.
 * @returns The posts that could be read; may be empty.
 */
async function scrapeFromDom(page: import("playwright").Page): Promise<ThreadsPost[]> {
  const maxLinks = 40;
  // Every per-element read uses the same short timeout so one slow node cannot
  // stall the whole scrape for Playwright's much longer default timeout.
  const readTimeoutMs = 1200;
  // Loop-invariant patterns: a count followed by a like / reply label (zh-TW or English).
  const likePattern = /(\d[\d.,kKmM萬]*)\s*(讚|likes?)/i;
  const replyPattern = /(\d[\d.,kKmM萬]*)\s*(則回覆|replies?|留言)/i;

  const posts: ThreadsPost[] = [];
  const seen = new Set<string>();
  const postLinks = page.locator('a[href*="/post/"]');
  const linkCount = Math.min(await postLinks.count(), maxLinks);

  for (let i = 0; i < linkCount; i++) {
    const link = postLinks.nth(i);
    try {
      const href = await link.getAttribute("href", { timeout: readTimeoutMs });
      if (!href || seen.has(href)) continue;
      seen.add(href);

      const permalink = href.startsWith("http") ? href : `https://www.threads.com${href}`;
      const authorName = href.match(/@([^/]+)\/post/)?.[1];

      // Prefer the nearest pressable container; otherwise fall back to one of
      // the six closest ancestor <div>s.
      const container = link.locator("xpath=ancestor::*[contains(@data-pressable-container,'true')][1]");
      const hasContainer = (await container.count()) > 0;
      const scope = hasContainer ? container : link.locator("xpath=ancestor::div[position()<=6]").first();

      const textEl = scope.locator('div[dir="auto"], span[dir="auto"]').first();
      const text = await textEl.innerText({ timeout: readTimeoutMs }).catch(() => "");
      // The length check runs on the untrimmed text, as before.
      if (!text || text.length < 5) continue;

      // Previously the only read without a timeout; now bounded like its siblings.
      const statsText = await scope.innerText({ timeout: readTimeoutMs }).catch(() => "");
      const likeMatch = statsText.match(likePattern);
      const replyMatch = statsText.match(replyPattern);

      posts.push({
        text: text.trim(),
        permalink,
        authorName,
        likeCount: parseCount(likeMatch?.[1]),
        replyCount: parseCount(replyMatch?.[1]),
        externalId: href.match(/\/post\/([^/?]+)/)?.[1],
      });
    } catch {
      // Best-effort scrape: an unreadable post is skipped, not fatal.
    }
  }
  return posts;
}
/**
 * Opens the Threads home page on `page` to confirm the stored session is
 * still usable before any crawling starts.
 *
 * Reports the "session_check" and "landing_pause" steps through `onStep`,
 * runs the page-safety assertion on the landing page, and throws when the
 * browser ended up on a login URL.
 *
 * @param page Page used for the check.
 * @param onStep Optional progress callback.
 * @throws Error when the final URL contains "/login"; also propagates whatever
 *   `assertThreadsPageSafe` or the navigation throws.
 */
async function primeSession(
  page: import("playwright").Page,
  onStep?: BrowserProgressCallback
): Promise<void> {
  await onStep?.("session_check");
  const navigation = await page.goto(HOME_URL, {
    waitUntil: "domcontentloaded",
    timeout: 45000,
  });

  await onStep?.("landing_pause", "首頁");
  await humanLandingPause(page);

  // A failed body read is treated as an empty page rather than an error.
  const pageText = await page
    .locator("body")
    .innerText()
    .catch(() => "");
  await assertThreadsPageSafe(page, navigation?.status(), pageText);

  const landedOnLogin = page.url().includes("/login");
  if (landedOnLogin) {
    throw new Error("Session 已失效,請到設定頁重新登入 Threads");
  }
}
/**
 * Registers a "response" listener on `page` that feeds JSON responses into
 * `extractPostsFromJson`, which receives `collected` as its output argument.
 *
 * Only responses whose URL contains "graphql", "threads" or "instagram" and
 * whose content-type header contains "json" are parsed. Any failure while
 * reading or parsing a response is ignored.
 *
 * @param page Page whose network traffic is observed.
 * @param collected Array handed to `extractPostsFromJson` for every parsed payload.
 */
function attachGraphqlCollector(page: import("playwright").Page, collected: ThreadsPost[]) {
  const urlMarkers = ["graphql", "threads", "instagram"];

  page.on("response", async (response) => {
    const url = response.url();
    const looksRelevant = urlMarkers.some((marker) => url.includes(marker));
    if (!looksRelevant) return;

    try {
      const headers = response.headers();
      const contentType = headers["content-type"] ?? "";
      if (contentType.includes("json")) {
        const payload = await response.json();
        extractPostsFromJson(payload, collected);
      }
    } catch {
      // Unparseable or already-disposed responses are not interesting.
    }
  });
}
/**
 * Runs one keyword search on an already-open page and returns ranked posts.
 *
 * Posts are gathered from up to three sources, in this order:
 * 1. JSON network responses captured by `attachGraphqlCollector`;
 * 2. inline page scripts (`extractFromPageScripts`), when fewer than three
 *    posts were captured from the network;
 * 3. the rendered DOM (`scrapeFromDom`), when there are still fewer than three.
 *
 * The combined list is passed to `rankAndDedupe` together with `limit`.
 *
 * NOTE(review): the response listener registered here is never detached, so
 * calling this repeatedly with the same page stacks listeners — confirm that
 * callers always use a fresh page.
 *
 * @param page Page to navigate; its session is expected to be primed already.
 * @param query Search text; it is URL-encoded before use.
 * @param limit Forwarded to `rankAndDedupe` (default 20).
 * @param onStep Optional progress callback invoked at each crawl stage.
 * @returns Ranked posts, or an empty array when the page shows a "not found" message.
 * @throws Propagates errors from navigation, the body read and `assertThreadsPageSafe`.
 */
export async function searchQueryOnPage(
  page: import("playwright").Page,
  query: string,
  limit = 20,
  onStep?: BrowserProgressCallback
): Promise<RankedPost[]> {
  const collected: ThreadsPost[] = [];
  // Attach before navigating so responses from the initial load are captured too.
  attachGraphqlCollector(page, collected);

  const searchUrl = `https://www.threads.com/search?q=${encodeURIComponent(query)}&serp_type=default`;
  // Fixed: the label opened a 「 quote without closing it.
  await onStep?.("open_page", `搜尋「${query}」`);
  const response = await page.goto(searchUrl, { waitUntil: "domcontentloaded", timeout: 45000 });

  await onStep?.("landing_pause");
  await humanLandingPause(page);

  const bodyText = await page.locator("body").innerText();
  await assertThreadsPageSafe(page, response?.status(), bodyText);

  // Threads' "page not found" wording: nothing to collect.
  if (bodyText.includes("走丟") || bodyText.includes("頁面不存在")) {
    await onStep?.("done", "0 篇");
    return [];
  }

  await onStep?.("wait_content");
  // A timeout here is tolerated; the fallbacks below still run.
  await page.waitForSelector('a[href*="/post/"]', { timeout: 12000 }).catch(() => undefined);

  await onStep?.("scroll");
  await humanScrollPage(page, { minPasses: 2, maxPasses: 4 });

  if (collected.length < 3) {
    await onStep?.("parse_scripts");
    const { posts: scriptPosts } = await extractFromPageScripts(page);
    collected.push(...scriptPosts);
  } else {
    await onStep?.("parse_network", `${collected.length}`);
  }

  if (collected.length < 3) {
    await onStep?.("parse_dom");
    const domPosts = await scrapeFromDom(page);
    collected.push(...domPosts);
  }

  const ranked = rankAndDedupe(collected, limit);
  await onStep?.("done", `${ranked.length}`);
  return ranked;
}
/**
 * Converts an optional active session into browser session options.
 *
 * @param session Active session, if any.
 * @returns The result of `browserSessionOptions(session)`, or `undefined` when no session is given.
 */
function resolveSessionOptions(session?: ActiveSession): BrowserSessionOptions | undefined {
  if (!session) {
    return undefined;
  }
  return browserSessionOptions(session);
}
/** Optional settings accepted by `search`. */
export interface BrowserSearchOptions {
/** Progress callback; `search` forwards it to both the session check and the query run. */
onStep?: BrowserProgressCallback;
}
/**
 * Runs a single keyword search inside a shared browser context.
 *
 * A new page is opened, the session is verified with `primeSession`, and the
 * query is executed with `searchQueryOnPage`. The page is always closed
 * afterwards, including on failure. Previously it was left open, unlike
 * `executeScanTasks`, which closes every page it creates.
 *
 * @param storageState Storage state handed to `withSharedContext`.
 * @param query Search text.
 * @param limit Forwarded to `searchQueryOnPage` (default 20).
 * @param session Optional active session used to derive browser session options.
 * @param options Optional progress callback holder.
 * @returns Ranked posts for the query.
 * @throws Propagates errors from session priming, the search and closing the page.
 */
export async function search(
  storageState: string,
  query: string,
  limit = 20,
  session?: ActiveSession,
  options?: BrowserSearchOptions
): Promise<RankedPost[]> {
  return withSharedContext(
    storageState,
    async (context) => {
      const page = await context.newPage();
      try {
        await primeSession(page, options?.onStep);
        return await searchQueryOnPage(page, query, limit, options?.onStep);
      } finally {
        // Release the page even when priming or searching throws.
        await page.close();
      }
    },
    resolveSessionOptions(session)
  );
}
/** A ranked post with an optional `searchTag`; `executeScanTasks` sets it to the label of the task that produced the post. */
export type RankedPostWithTag = RankedPost & { searchTag?: string };
/**
 * Flattens per-task result batches into one list without duplicates, sorted
 * by descending score.
 *
 * Two posts are the same when they share an `externalId`; posts without one
 * are matched on author name plus the first 120 characters of the trimmed
 * text. The higher-scoring duplicate wins, and on a tie the one seen first
 * is kept.
 *
 * @param batches One array of posts per completed task.
 * @returns De-duplicated posts, highest score first.
 */
function mergePosts(batches: RankedPostWithTag[][]): RankedPostWithTag[] {
  const bestByKey = new Map<string, RankedPostWithTag>();

  for (const candidate of batches.flat()) {
    const dedupeKey =
      candidate.externalId ?? `${candidate.authorName ?? ""}:${candidate.text.trim().slice(0, 120)}`;
    const current = bestByKey.get(dedupeKey);
    // Keep the entry already stored unless the candidate strictly outranks it.
    if (current !== undefined && !(candidate.score > current.score)) continue;
    bestByKey.set(dedupeKey, candidate);
  }

  return [...bestByKey.values()].sort((left, right) => right.score - left.score);
}
/**
 * Runs a mixed batch of scan tasks (keyword searches and account sweeps) in
 * one shared browser context with bounded parallelism (3 or fewer concurrent
 * pages is the suggested setting).
 *
 * Flow:
 * 1. `tasks` is passed through `limitCrawlerTasks`; an empty result returns `[]`.
 * 2. A throw-away page verifies the session via `primeSession` and is closed
 *    again, even when the check fails.
 * 3. Each task runs on its own page through `runWithConcurrency`: page quota
 *    is consumed, `onTaskStart` fires, then `getProfilePostsOnPage` (kind
 *    "account") or `searchQueryOnPage` (any other kind) is called. Resulting
 *    posts are tagged with the task label and `onTaskDone` receives the count.
 *    On error `onTaskFail` fires and the error is rethrown.
 * 4. After every task a "between_pages" step is reported, a human-like pause
 *    is taken, and the page is closed, even if the step callback or the
 *    pause throws.
 * 5. All batches are merged and de-duplicated by `mergePosts`.
 *
 * @param storageState Storage state handed to `withSharedContext`.
 * @param tasks Tasks to run.
 * @param options Optional session, concurrency override, abort probe and
 *   lifecycle / progress callbacks.
 * @returns Merged posts from all tasks, highest score first.
 * @throws JobCancelledError when an error with the message "__JOB_ABORT__"
 *   surfaces (presumably raised by `runWithConcurrency` once `shouldAbort`
 *   returns true — confirm); any other error is rethrown unchanged.
 */
export async function executeScanTasks(
  storageState: string,
  tasks: ScanTask[],
  options?: {
    session?: ActiveSession;
    concurrency?: number;
    onProgress?: (done: number, total: number, label: string) => void;
    shouldAbort?: () => boolean | Promise<boolean>;
    onTaskStart?: (task: ScanTask) => void | Promise<void>;
    onTaskDone?: (task: ScanTask, found: number) => void | Promise<void>;
    onTaskFail?: (task: ScanTask, error: unknown) => void | Promise<void>;
    onTaskStep?: (
      task: ScanTask,
      step: BrowserCrawlStep,
      detail?: string
    ) => void | Promise<void>;
  }
): Promise<RankedPostWithTag[]> {
  const safeTasks = limitCrawlerTasks(tasks);
  if (safeTasks.length === 0) return [];

  try {
    return await withSharedContext(
      storageState,
      async (context) => {
        // Session check on a dedicated page; its steps are reported against
        // the first task (the literal fallback only satisfies the type checker).
        const primePage = await context.newPage();
        try {
          await primeSession(primePage, (step, detail) =>
            options?.onTaskStep?.(
              safeTasks[0] ?? { id: "prime", kind: "keyword", query: "", label: "初始化", limit: 0 },
              step,
              detail
            )
          );
        } finally {
          // Close the prime page even when the session check fails.
          await primePage.close();
        }

        const results = await runWithConcurrency(
          safeTasks,
          async (task) => {
            consumeCrawlerPageQuota(options?.session?.id);
            await options?.onTaskStart?.(task);
            const page = await context.newPage();
            const onStep: BrowserProgressCallback = (step, detail) =>
              options?.onTaskStep?.(task, step, detail);
            try {
              const posts: RankedPost[] =
                task.kind === "account"
                  ? await getProfilePostsOnPage(page, task.query, task.limit, onStep)
                  : await searchQueryOnPage(page, task.query, task.limit, onStep);
              const mapped = posts.map((post) => ({ ...post, searchTag: task.label }));
              await options?.onTaskDone?.(task, mapped.length);
              return mapped;
            } catch (error) {
              await options?.onTaskFail?.(task, error);
              throw error;
            } finally {
              try {
                await onStep("between_pages");
                await humanPause("betweenPages");
              } finally {
                // Must run even if the step callback or the pause throws,
                // otherwise the page would stay open in the shared context.
                await page.close();
              }
            }
          },
          {
            concurrency: options?.concurrency ?? getBrowserConcurrency(),
            staggerMs: DEFAULT_TASK_STAGGER_MS,
            shouldAbort: options?.shouldAbort,
            onProgress: (done, total) => {
              // NOTE(review): `done` looks like a completion count. With more
              // than one worker, tasks can finish out of input order, so
              // safeTasks[done - 1] may not be the task that just finished —
              // confirm against runWithConcurrency.
              const task = safeTasks[done - 1];
              options?.onProgress?.(done, total, task?.label ?? "");
            },
          }
        );
        return mergePosts(results);
      },
      resolveSessionOptions(options?.session)
    );
  } catch (error) {
    // Translate the abort sentinel into the job framework's cancellation error.
    if (error instanceof Error && error.message === "__JOB_ABORT__") {
      const { JobCancelledError } = await import("@/lib/jobs/cancel");
      throw new JobCancelledError();
    }
    throw error;
  }
}
/**
 * Searches several tags by turning each into a keyword scan task and
 * delegating to `executeScanTasks`.
 *
 * @deprecated Use `executeScanTasks` instead.
 * @param storageState Storage state forwarded to `executeScanTasks`.
 * @param tags Tags to search; each becomes one keyword task labelled with the tag.
 * @param perTagLimit Per-task limit applied to every tag.
 * @param options Optional concurrency override and progress callback.
 * @returns Whatever `executeScanTasks` returns for the generated tasks.
 */
export async function searchTagsParallel(
  storageState: string,
  tags: string[],
  perTagLimit: number,
  options?: {
    concurrency?: number;
    onProgress?: (done: number, total: number, tag: string) => void;
  }
): Promise<RankedPostWithTag[]> {
  const toKeywordTask = (tag: string): ScanTask => ({
    id: `keyword:${tag}`,
    kind: "keyword",
    query: tag,
    label: tag,
    limit: perTagLimit,
  });
  const keywordTasks = tags.map((tag) => toKeywordTask(tag));
  return executeScanTasks(storageState, keywordTasks, options);
}
// Re-export the module-level IG_APP_ID constant so other modules can import it from here.
export { IG_APP_ID };