haixunMaster/lib/threads-browser/search.ts

293 lines
9.9 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { parseCount } from "@/lib/utils";
import { runWithConcurrency } from "@/lib/utils/concurrency";
import { rankAndDedupe, type RankedPost } from "@/lib/ranking";
import type { ScanTask } from "@/lib/services/scan-tasks";
import { withSharedContext, type BrowserSessionOptions } from "./browser";
import {
DEFAULT_TASK_STAGGER_MS,
getBrowserConcurrency,
humanLandingPause,
humanPause,
humanScrollPage,
} from "./human-behavior";
import { browserSessionOptions, type ActiveSession } from "./session";
import { extractFromPageScripts, extractPostsFromJson } from "./extract";
import { getProfilePostsOnPage } from "./profile";
import type { BrowserCrawlStep, BrowserProgressCallback } from "./progress";
import type { ThreadsPost } from "./types";
import {
assertThreadsPageSafe,
consumeCrawlerPageQuota,
limitCrawlerTasks,
} from "./safety";
/** Threads landing page; navigated to first so the stored session can be checked before any crawl. */
const HOME_URL = "https://www.threads.com/";
/**
 * App identifier constant. It is not read by the code visible in this module, only re-exported.
 * NOTE(review): presumably the Instagram web app id sent as a request header — confirm with importers.
 */
const IG_APP_ID = "238260118697367";
/**
 * Reads posts directly from the rendered DOM by walking the permalink anchors on the page.
 *
 * At most the first 40 `a[href*="/post/"]` anchors are inspected. For each distinct href the
 * nearest pressable container (or, failing that, a nearby ancestor `div`) is used as the scope
 * from which the post text and the like/reply counters are read.
 *
 * @param page Playwright page that already shows a Threads listing.
 * @returns The posts that could be read; anchors that fail to resolve are skipped silently.
 */
async function scrapeFromDom(page: import("playwright").Page): Promise<ThreadsPost[]> {
  const MAX_LINKS = 40;
  // Every DOM read shares one short timeout so a single stale node cannot stall the scan.
  const READ_TIMEOUT_MS = 1200;
  const MIN_TEXT_LENGTH = 5;
  // Hoisted out of the loop: the patterns never change between iterations.
  const likePattern = /(\d[\d.,kKmM萬]*)\s*(讚|likes?)/i;
  const replyPattern = /(\d[\d.,kKmM萬]*)\s*(則回覆|replies?|留言)/i;

  const posts: ThreadsPost[] = [];
  const seen = new Set<string>();
  const postLinks = page.locator('a[href*="/post/"]');
  const count = await postLinks.count();

  for (let i = 0; i < Math.min(count, MAX_LINKS); i++) {
    const link = postLinks.nth(i);
    try {
      const href = await link.getAttribute("href", { timeout: READ_TIMEOUT_MS });
      if (!href || seen.has(href)) continue;
      seen.add(href);

      const permalink = href.startsWith("http") ? href : `https://www.threads.com${href}`;
      const authorName = href.match(/@([^/]+)\/post/)?.[1];

      // Prefer the nearest pressable container; otherwise fall back to a close ancestor div.
      const container = link.locator("xpath=ancestor::*[contains(@data-pressable-container,'true')][1]");
      const hasContainer = (await container.count()) > 0;
      const scope = hasContainer ? container : link.locator("xpath=ancestor::div[position()<=6]").first();

      const textEl = scope.locator('div[dir="auto"], span[dir="auto"]').first();
      const rawText = await textEl.innerText({ timeout: READ_TIMEOUT_MS }).catch(() => "");
      // Measure the trimmed text so whitespace padding cannot satisfy the minimum length.
      const text = rawText.trim();
      if (text.length < MIN_TEXT_LENGTH) continue;

      // Bounded like the other reads; without a timeout this call waits for Playwright's default.
      const statsText = await scope.innerText({ timeout: READ_TIMEOUT_MS }).catch(() => "");
      const likeMatch = statsText.match(likePattern);
      const replyMatch = statsText.match(replyPattern);

      posts.push({
        text,
        permalink,
        authorName,
        likeCount: parseCount(likeMatch?.[1]),
        replyCount: parseCount(replyMatch?.[1]),
        externalId: href.match(/\/post\/([^/?]+)/)?.[1],
      });
    } catch {
      // Best effort: skip a post whose anchor could not be read.
    }
  }
  return posts;
}
/**
 * Opens the Threads home page to confirm the stored session is still usable.
 *
 * Reports the `session_check` and `landing_pause` steps, runs the landing pause and the
 * page-safety assertion, and finally rejects when the browser ended up on a login URL.
 *
 * @param page Page used for the warm-up navigation.
 * @param onStep Optional progress callback.
 * @throws Error when the final URL contains `/login`.
 */
async function primeSession(
  page: import("playwright").Page,
  onStep?: BrowserProgressCallback
): Promise<void> {
  await onStep?.("session_check");
  const landing = await page.goto(HOME_URL, { timeout: 45000, waitUntil: "domcontentloaded" });

  await onStep?.("landing_pause", "首頁");
  await humanLandingPause(page);

  // An unreadable body is treated as empty text rather than as a failure.
  const body = page.locator("body");
  let pageText: string;
  try {
    pageText = await body.innerText();
  } catch {
    pageText = "";
  }
  await assertThreadsPageSafe(page, landing?.status(), pageText);

  const landedOnLogin = page.url().includes("/login");
  if (!landedOnLogin) {
    return;
  }
  throw new Error("Session 已失效,請到設定頁重新登入 Threads");
}
/**
 * Subscribes to the page's network responses and feeds every JSON payload whose URL mentions
 * `graphql`, `threads` or `instagram` into `extractPostsFromJson`, which appends to `collected`.
 *
 * @param page Page whose responses are observed.
 * @param collected Accumulator handed to the extractor.
 */
function attachGraphqlCollector(page: import("playwright").Page, collected: ThreadsPost[]) {
  const urlMarkers = ["graphql", "threads", "instagram"];

  const handleResponse = async (response: import("playwright").Response) => {
    const target = response.url();
    const isRelevant = urlMarkers.some((marker) => target.includes(marker));
    if (!isRelevant) {
      return;
    }
    try {
      const mime = response.headers()["content-type"] ?? "";
      if (mime.includes("json")) {
        const payload = await response.json();
        extractPostsFromJson(payload, collected);
      }
    } catch {
      // Bodies that cannot be read or parsed are ignored.
    }
  };

  page.on("response", handleResponse);
}
/**
 * Runs one Threads keyword search on an already-open page and returns ranked, de-duplicated posts.
 *
 * Posts are gathered from up to three sources, each tried only while fewer than three posts
 * have been collected: JSON network responses seen during navigation and scrolling, then data
 * extracted from the page's scripts, then direct DOM scraping.
 *
 * @param page Page to navigate; a response listener is attached to it.
 * @param query Search text, URL-encoded into the search address.
 * @param limit Maximum number of posts handed to the ranking step (default 20).
 * @param onStep Optional progress callback notified at each crawl step.
 * @returns The ranked posts, or an empty array when the page reports that it does not exist.
 */
export async function searchQueryOnPage(
  page: import("playwright").Page,
  query: string,
  limit = 20,
  onStep?: BrowserProgressCallback
): Promise<RankedPost[]> {
  const collected: ThreadsPost[] = [];
  // Attach before navigating so responses triggered by the first load are captured too.
  attachGraphqlCollector(page, collected);

  const searchUrl = `https://www.threads.com/search?q=${encodeURIComponent(query)}&serp_type=default`;
  // The quoted query is now wrapped in a matched 「…」 pair (the closing bracket was missing).
  await onStep?.("open_page", `搜尋「${query}」`);
  const response = await page.goto(searchUrl, { waitUntil: "domcontentloaded", timeout: 45000 });

  await onStep?.("landing_pause");
  await humanLandingPause(page);

  const bodyText = await page.locator("body").innerText();
  await assertThreadsPageSafe(page, response?.status(), bodyText);

  // The "lost / page does not exist" placeholder text means there is nothing to collect.
  if (bodyText.includes("走丟") || bodyText.includes("頁面不存在")) {
    await onStep?.("done", "0 篇");
    return [];
  }

  await onStep?.("wait_content");
  // A missing post link is not fatal: the later fallbacks may still find data.
  await page.waitForSelector('a[href*="/post/"]', { timeout: 12000 }).catch(() => undefined);

  await onStep?.("scroll");
  await humanScrollPage(page, { minPasses: 2, maxPasses: 4 });

  if (collected.length < 3) {
    await onStep?.("parse_scripts");
    const { posts: scriptPosts } = await extractFromPageScripts(page);
    collected.push(...scriptPosts);
  } else {
    await onStep?.("parse_network", `${collected.length}`);
  }

  if (collected.length < 3) {
    await onStep?.("parse_dom");
    const domPosts = await scrapeFromDom(page);
    collected.push(...domPosts);
  }

  const ranked = rankAndDedupe(collected, limit);
  await onStep?.("done", `${ranked.length}`);
  return ranked;
}
/**
 * Converts an optional active session into browser session options.
 *
 * @param session Active session, if any.
 * @returns The result of `browserSessionOptions(session)`, or `undefined` when no session is given.
 */
function resolveSessionOptions(session?: ActiveSession): BrowserSessionOptions | undefined {
  if (!session) {
    return undefined;
  }
  return browserSessionOptions(session);
}
/** Optional hooks accepted by `search`. */
export interface BrowserSearchOptions {
  /** Progress callback passed to both the session priming and the search itself. */
  onStep?: BrowserProgressCallback;
}
/**
 * Runs a single keyword search inside a shared browser context.
 *
 * A fresh page is opened, the session is primed against the home page, the query is searched,
 * and the page is closed again whether the search succeeded or failed.
 *
 * @param storageState Storage state handed to `withSharedContext`.
 * @param query Search text.
 * @param limit Maximum number of ranked posts to return (default 20).
 * @param session Optional active session used to derive browser session options.
 * @param options Optional hooks; `onStep` receives crawl progress.
 * @returns Ranked, de-duplicated posts for the query.
 */
export async function search(
  storageState: string,
  query: string,
  limit = 20,
  session?: ActiveSession,
  options?: BrowserSearchOptions
): Promise<RankedPost[]> {
  return withSharedContext(
    storageState,
    async (context) => {
      const page = await context.newPage();
      try {
        await primeSession(page, options?.onStep);
        // `return await` keeps the page open until the search has actually finished.
        return await searchQueryOnPage(page, query, limit, options?.onStep);
      } finally {
        // Release the page so repeated searches do not accumulate open tabs in the context.
        await page.close();
      }
    },
    resolveSessionOptions(session)
  );
}
/** A ranked post optionally annotated with `searchTag`, which `executeScanTasks` sets to the label of the task that found it. */
export type RankedPostWithTag = RankedPost & { searchTag?: string };
/**
 * Flattens several result batches into one list, keeping a single entry per post.
 *
 * Posts are keyed by `externalId` when present, otherwise by the author name plus the first
 * 120 characters of the trimmed text. When two posts share a key, the one with the strictly
 * higher score wins. The result is ordered by descending score.
 *
 * @param batches Result batches, one per task.
 * @returns De-duplicated posts, highest score first.
 */
function mergePosts(batches: RankedPostWithTag[][]): RankedPostWithTag[] {
  const keyOf = (post: RankedPostWithTag): string => {
    if (post.externalId != null) {
      return post.externalId;
    }
    const author = post.authorName ?? "";
    const snippet = post.text.trim().slice(0, 120);
    return `${author}:${snippet}`;
  };

  const bestByKey = new Map<string, RankedPostWithTag>();
  for (const batch of batches) {
    for (const candidate of batch) {
      const key = keyOf(candidate);
      const incumbent = bestByKey.get(key);
      // Only a strictly higher score displaces an existing entry.
      const keepIncumbent = Boolean(incumbent) && !(candidate.score > incumbent!.score);
      if (keepIncumbent) {
        continue;
      }
      bestByKey.set(key, candidate);
    }
  }

  const unique = Array.from(bestByKey.values());
  unique.sort((left, right) => right.score - left.score);
  return unique;
}
/**
 * Runs a mixed batch of scan tasks — keyword searches and account patrols — with bounded
 * parallelism (3 or fewer concurrent pages is the recommended setting).
 *
 * The task list is first passed through `limitCrawlerTasks`. The session is primed once on a
 * throwaway page, then each task runs on its own page: `account` tasks read a profile, every
 * other kind runs a search. Each returned post is tagged with its task's label, and all
 * batches are merged and de-duplicated.
 *
 * @param storageState Storage state handed to `withSharedContext`.
 * @param tasks Tasks to run.
 * @param options Optional session, concurrency override, abort check and lifecycle callbacks.
 * @returns Merged posts from every task, highest score first; empty when no task remains.
 * @throws JobCancelledError when the run fails with the `__JOB_ABORT__` sentinel message.
 * @throws Any other error raised while priming the session or running a task.
 */
export async function executeScanTasks(
  storageState: string,
  tasks: ScanTask[],
  options?: {
    session?: ActiveSession;
    concurrency?: number;
    onProgress?: (done: number, total: number, label: string) => void;
    shouldAbort?: () => boolean | Promise<boolean>;
    onTaskStart?: (task: ScanTask) => void | Promise<void>;
    onTaskDone?: (task: ScanTask, found: number) => void | Promise<void>;
    onTaskFail?: (task: ScanTask, error: unknown) => void | Promise<void>;
    onTaskStep?: (
      task: ScanTask,
      step: BrowserCrawlStep,
      detail?: string
    ) => void | Promise<void>;
  }
): Promise<RankedPostWithTag[]> {
  const safeTasks = limitCrawlerTasks(tasks);
  if (safeTasks.length === 0) return [];
  try {
    return await withSharedContext(storageState, async (context) => {
      const primePage = await context.newPage();
      try {
        // Priming steps are reported against the first task so the caller sees early progress.
        await primeSession(primePage, (step, detail) =>
          options?.onTaskStep?.(safeTasks[0] ?? { id: "prime", kind: "keyword", query: "", label: "初始化", limit: 0 }, step, detail)
        );
      } finally {
        // Close the warm-up page even when priming fails (for example on an expired session).
        await primePage.close();
      }
      const results = await runWithConcurrency(
        safeTasks,
        async (task) => {
          consumeCrawlerPageQuota(options?.session?.id);
          await options?.onTaskStart?.(task);
          const page = await context.newPage();
          const onStep: BrowserProgressCallback = (step, detail) =>
            options?.onTaskStep?.(task, step, detail);
          try {
            let posts: RankedPost[];
            if (task.kind === "account") {
              posts = await getProfilePostsOnPage(page, task.query, task.limit, onStep);
            } else {
              posts = await searchQueryOnPage(page, task.query, task.limit, onStep);
            }
            const mapped = posts.map((post) => ({ ...post, searchTag: task.label }));
            await options?.onTaskDone?.(task, mapped.length);
            return mapped;
          } catch (error) {
            await options?.onTaskFail?.(task, error);
            throw error;
          } finally {
            try {
              await onStep("between_pages");
              await humanPause("betweenPages");
            } finally {
              // Always release the page, even if the step callback or the pause throws.
              await page.close();
            }
          }
        },
        {
          concurrency: options?.concurrency ?? getBrowserConcurrency(),
          staggerMs: DEFAULT_TASK_STAGGER_MS,
          shouldAbort: options?.shouldAbort,
          onProgress: (done, total) => {
            // NOTE(review): the label is looked up by completion count, which matches the
            // finished task only if tasks complete in input order — confirm against
            // runWithConcurrency.
            const task = safeTasks[done - 1];
            options?.onProgress?.(done, total, task?.label ?? "");
          },
        }
      );
      return mergePosts(results);
    }, resolveSessionOptions(options?.session));
  } catch (error) {
    // Translate the abort sentinel into the typed cancellation error.
    if (error instanceof Error && error.message === "__JOB_ABORT__") {
      const { JobCancelledError } = await import("@/lib/jobs/cancel");
      throw new JobCancelledError();
    }
    throw error;
  }
}
/**
 * Searches several tags by turning each into a keyword task and delegating to `executeScanTasks`.
 *
 * @deprecated Use `executeScanTasks` instead.
 * @param storageState Storage state forwarded to `executeScanTasks`.
 * @param tags Tags to search; each becomes one keyword task labelled with the tag itself.
 * @param perTagLimit Post limit applied to every tag.
 * @param options Optional concurrency override and progress callback.
 * @returns Merged posts across all tags.
 */
export async function searchTagsParallel(
  storageState: string,
  tags: string[],
  perTagLimit: number,
  options?: {
    concurrency?: number;
    onProgress?: (done: number, total: number, tag: string) => void;
  }
): Promise<RankedPostWithTag[]> {
  const toKeywordTask = (tag: string): ScanTask => ({
    id: `keyword:${tag}`,
    kind: "keyword",
    query: tag,
    label: tag,
    limit: perTagLimit,
  });
  const keywordTasks = tags.map((tag) => toKeywordTask(tag));
  return executeScanTasks(storageState, keywordTasks, options);
}
// Exposes the module-level IG_APP_ID constant to importers of this file.
export { IG_APP_ID };