haixunMaster/lib/threads-browser/replies.ts

166 lines
5.7 KiB
TypeScript
Raw Permalink Normal View History

2026-06-21 12:50:31 +00:00
import { parseCount } from "@/lib/utils";
import { runWithConcurrency } from "@/lib/utils/concurrency";
import { withSharedContext, withPage } from "./browser";
import {
DEFAULT_STAGGER_MS,
getReplyFetchConcurrency,
humanLandingPause,
humanPause,
humanScrollPage,
} from "./human-behavior";
import { browserSessionOptions, type ActiveSession } from "./session";
import { extractFromPageScripts, extractRepliesFromJson } from "./extract";
import type { ThreadsReply } from "./types";
import { assertThreadsPageSafe, consumeCrawlerPageQuota } from "./safety";
/**
 * Threads home page. `getReplies` and `getRepliesParallel` open it (followed by
 * `humanLandingPause`) before visiting any post permalink — presumably so the session
 * looks like an ordinary visit rather than a direct deep link; confirm against ./human-behavior.
 */
const HOME_URL = "https://www.threads.com/";
/**
 * Last-resort fallback: reads replies straight from the rendered DOM.
 *
 * Looks at the first 30 `a[href*="/post/"]` links, skipping index 0 (presumably the root
 * post's own link — TODO confirm). For each link it picks a scope element (the nearest
 * `data-pressable-container` ancestor, otherwise an ancestor `<div>`) and reads:
 * - the first `dir="auto"` text node as the reply text (skipped if shorter than 2 chars),
 * - the handle from the first `/@...` link, if any,
 * - a like count matched from the scope's full text (e.g. "12 讚" / "1.2K likes").
 *
 * Every read carries an explicit short timeout: Playwright locator reads auto-wait for a
 * matching element, so without one a reply lacking an author link would stall for the page's
 * default timeout before the `.catch` fallback kicked in.
 *
 * @param page - page already showing the post.
 * @returns replies in DOM order; may contain duplicates (callers dedupe).
 */
async function scrapeRepliesFromDom(page: import("playwright").Page): Promise<ThreadsReply[]> {
  const maxLinks = 30;
  const probeTimeoutMs = 1200;
  const likePattern = /(\d[\d.,kKmM萬]*)\s*(讚|likes?)/i;

  const replies: ThreadsReply[] = [];
  const postLinks = page.locator('a[href*="/post/"]');
  const linkCount = Math.min(await postLinks.count(), maxLinks);
  for (let i = 1; i < linkCount; i++) {
    const link = postLinks.nth(i);
    try {
      const container = link.locator("xpath=ancestor::*[contains(@data-pressable-container,'true')][1]");
      // NOTE(review): matches are ordered by document position, so `.first()` selects the
      // OUTERMOST of the (up to) five nearest <div> ancestors, not the closest — confirm intended.
      const scope =
        (await container.count()) > 0 ? container : link.locator("xpath=ancestor::div[position()<=5]").first();
      const text = await scope
        .locator('div[dir="auto"], span[dir="auto"]')
        .first()
        .innerText({ timeout: probeTimeoutMs });
      if (!text || text.length < 2) continue;
      const authorHref = await scope
        .locator('a[href*="/@"]')
        .first()
        .getAttribute("href", { timeout: probeTimeoutMs })
        .catch(() => null);
      const authorName = authorHref?.match(/@([^/?]+)/)?.[1];
      const statsText = await scope.innerText({ timeout: probeTimeoutMs }).catch(() => "");
      const likeMatch = statsText.match(likePattern);
      replies.push({
        text: text.trim(),
        authorName,
        likeCount: parseCount(likeMatch?.[1]),
      });
    } catch {
      // Best effort: skip links whose reply text cannot be located/read within the timeout.
    }
  }
  return replies;
}
/**
 * Registers a `response` listener on `page` that parses JSON responses whose URL contains
 * "graphql" or "threads" and lets `extractRepliesFromJson` append any replies to `collected`.
 *
 * @param page - page whose network responses are observed.
 * @param collected - array the extracted replies are pushed into (mutated in place).
 * @returns a function that removes the listener. Call it when collection is finished so a
 *   reused page does not accumulate listeners that keep parsing responses into stale arrays.
 */
function attachReplyCollector(page: import("playwright").Page, collected: ThreadsReply[]): () => void {
  const onResponse = async (response: import("playwright").Response): Promise<void> => {
    const url = response.url();
    if (!url.includes("graphql") && !url.includes("threads")) return;
    try {
      const contentType = response.headers()["content-type"] ?? "";
      if (!contentType.includes("json")) return;
      const json = await response.json();
      extractRepliesFromJson(json, collected);
    } catch {
      // Best effort: a body may be unavailable (e.g. redirects, page closed) or not valid JSON.
    }
  };
  page.on("response", onResponse);
  return () => {
    page.off("response", onResponse);
  };
}

/**
 * Removes duplicate replies and keeps the most-liked ones.
 *
 * Two replies are duplicates when the first 80 characters of `text` match; the first
 * occurrence wins. The survivors are ordered by `likeCount` descending (missing counts count
 * as 0; ties keep their original order) and truncated to `limit`. The input is not mutated.
 */
function dedupeReplies(collected: ThreadsReply[], limit: number): ThreadsReply[] {
  const seen = new Set<string>();
  const unique: ThreadsReply[] = [];
  for (const reply of collected) {
    const key = reply.text.slice(0, 80);
    if (seen.has(key)) continue;
    seen.add(key);
    unique.push(reply);
  }
  return unique.sort((a, b) => (b.likeCount ?? 0) - (a.likeCount ?? 0)).slice(0, limit);
}

/**
 * Scrapes the replies of one post on an existing page (network JSON/GraphQL first).
 *
 * Navigates `page` to `permalink`, runs the safety check, scrolls, and gathers replies from
 * captured JSON responses. Only if fewer than two replies were captured does it fall back to
 * the page's embedded scripts, and then — if still fewer than two — to DOM scraping.
 * The response listener is always removed before returning (also on error), so the same page
 * can be reused for further calls.
 *
 * @param page - page to navigate; it is left on `permalink` afterwards.
 * @param permalink - URL of the post.
 * @param limit - maximum number of replies to return.
 * @returns up to `limit` unique replies, most-liked first.
 * @throws errors from `page.goto` and from `assertThreadsPageSafe` are propagated.
 */
export async function getRepliesOnPage(
  page: import("playwright").Page,
  permalink: string,
  limit = 10
): Promise<ThreadsReply[]> {
  const collected: ThreadsReply[] = [];
  const detachCollector = attachReplyCollector(page, collected);
  try {
    const response = await page.goto(permalink, { waitUntil: "domcontentloaded", timeout: 45000 });
    await humanLandingPause(page);
    const bodyText = await page.locator("body").innerText().catch(() => "");
    await assertThreadsPageSafe(page, response?.status(), bodyText);
    // Scrolling presumably triggers lazy loading, producing more reply responses.
    await humanScrollPage(page, { minPasses: 2, maxPasses: 3 });
    if (collected.length < 2) {
      const { replies: scriptReplies } = await extractFromPageScripts(page);
      collected.push(...scriptReplies);
    }
    if (collected.length < 2) {
      const domReplies = await scrapeRepliesFromDom(page);
      collected.push(...domReplies);
    }
    return dedupeReplies(collected, limit);
  } finally {
    detachCollector();
  }
}
/**
 * Fetches the replies of a single post in a dedicated page.
 *
 * The page comes from `withPage(storageState, ...)`. It first lands on the Threads home
 * page with a human-like pause, then delegates to `getRepliesOnPage` for `permalink`.
 *
 * @param storageState - forwarded to `withPage` (presumably serialized login state — confirm).
 * @param permalink - URL of the post.
 * @param limit - maximum number of replies to return.
 * @param session - when given, `browserSessionOptions(session)` is passed to `withPage`.
 * @returns up to `limit` unique replies, most-liked first.
 */
export async function getReplies(
  storageState: string,
  permalink: string,
  limit = 10,
  session?: ActiveSession
): Promise<ThreadsReply[]> {
  const pageOptions = session ? browserSessionOptions(session) : undefined;
  return withPage(
    storageState,
    async (page) => {
      await page.goto(HOME_URL, { waitUntil: "domcontentloaded", timeout: 45000 });
      await humanLandingPause(page);
      return getRepliesOnPage(page, permalink, limit);
    },
    pageOptions
  );
}
/** Default number of permalinks `getRepliesParallel` visits per call. */
const DEFAULT_MAX_PARALLEL_POSTS = 4;

/**
 * Fetches replies for several posts in parallel within one shared browser context (same session).
 *
 * Lands on the Threads home page once in a throw-away page, then opens one page per permalink.
 * Pages run with bounded concurrency and are staggered by `DEFAULT_STAGGER_MS`; each is scraped
 * with `getRepliesOnPage`. Only the first `maxPosts` permalinks (default 4) are visited; the
 * rest are not fetched and are absent from the result.
 *
 * Before opening each page it calls `consumeCrawlerPageQuota(session?.id)`. A failure on one post
 * records an empty array for it. The exception is an `Error` whose message contains
 * "已立即停止海巡" (presumably the stop signal raised by the ./safety checks), which is rethrown.
 *
 * @param storageState - forwarded to `withSharedContext`.
 * @param permalinks - post URLs, in priority order.
 * @param limitPerPost - maximum replies kept per post.
 * @param options.concurrency - parallel pages; defaults to `getReplyFetchConcurrency()`.
 * @param options.session - when given, supplies browser options and the quota key.
 * @param options.onProgress - called after each visited post finishes. It receives the count
 *   finished so far, the number scheduled, and the permalink that just finished. It is
 *   fire-and-forget: a returned promise is not awaited.
 * @param options.maxPosts - maximum number of permalinks to visit (default 4; `Infinity` = all).
 * @returns map from each visited permalink to its replies.
 */
export async function getRepliesParallel(
  storageState: string,
  permalinks: string[],
  limitPerPost: number,
  options?: {
    concurrency?: number;
    session?: ActiveSession;
    onProgress?: (done: number, total: number, permalink: string) => void | Promise<void>;
    maxPosts?: number;
  }
): Promise<Map<string, ThreadsReply[]>> {
  const result = new Map<string, ThreadsReply[]>();
  const maxPosts = Math.max(0, options?.maxPosts ?? DEFAULT_MAX_PARALLEL_POSTS);
  const targets = permalinks.slice(0, maxPosts);
  if (targets.length === 0) return result;

  let finished = 0;
  await withSharedContext(storageState, async (context) => {
    const primePage = await context.newPage();
    try {
      await primePage.goto(HOME_URL, { waitUntil: "domcontentloaded", timeout: 45000 });
      await humanLandingPause(primePage);
    } finally {
      await primePage.close();
    }

    await runWithConcurrency(
      targets,
      async (permalink) => {
        consumeCrawlerPageQuota(options?.session?.id);
        const page = await context.newPage();
        try {
          const replies = await getRepliesOnPage(page, permalink, limitPerPost);
          result.set(permalink, replies);
        } catch (error) {
          if (error instanceof Error && error.message.includes("已立即停止海巡")) throw error;
          result.set(permalink, []);
        } finally {
          await humanPause("betweenPages");
          await page.close();
        }
        // Report the permalink that actually finished. Under concurrency, tasks complete out of
        // order, so indexing the input list by the completion count would name the wrong post.
        finished += 1;
        void options?.onProgress?.(finished, targets.length, permalink);
      },
      {
        concurrency: options?.concurrency ?? getReplyFetchConcurrency(),
        staggerMs: DEFAULT_STAGGER_MS,
      }
    );
  }, options?.session ? browserSessionOptions(options.session) : undefined);
  return result;
}