// Source listing: TypeScript, 166 lines, 5.7 KiB.
import { parseCount } from "@/lib/utils";
|
||
import { runWithConcurrency } from "@/lib/utils/concurrency";
|
||
import { withSharedContext, withPage } from "./browser";
|
||
import {
|
||
DEFAULT_STAGGER_MS,
|
||
getReplyFetchConcurrency,
|
||
humanLandingPause,
|
||
humanPause,
|
||
humanScrollPage,
|
||
} from "./human-behavior";
|
||
import { browserSessionOptions, type ActiveSession } from "./session";
|
||
import { extractFromPageScripts, extractRepliesFromJson } from "./extract";
|
||
import type { ThreadsReply } from "./types";
|
||
import { assertThreadsPageSafe, consumeCrawlerPageQuota } from "./safety";
|
||
|
||
// Threads home page. getReplies and getRepliesParallel open it (followed by
// humanLandingPause) before navigating to any permalink — presumably to warm
// up the session the way a normal visit would; confirm intent.
const HOME_URL = "https://www.threads.com/";
|
||
|
||
/**
 * DOM fallback: best-effort scrape of replies from an already-rendered post page.
 *
 * Walks at most the first 30 `a[href*="/post/"]` links, skipping index 0
 * (NOTE(review): presumably the main post's own link — confirm). For each link
 * it resolves a scope element: the nearest ancestor with
 * `data-pressable-container` containing "true", or otherwise an ancestor div.
 * From that scope it reads:
 *   - the first `dir="auto"` text node as the reply text,
 *   - the first `/@handle` link as the author,
 *   - a "<number> 讚 / likes" pattern in the scope's text as the like count.
 *
 * Every Playwright read is bounded by a short timeout. Without one,
 * `getAttribute` / `innerText` wait for a missing element up to the default
 * action timeout (30s unless configured otherwise). A reply without an author
 * link could then stall the loop for that long per link.
 *
 * @param page Page already navigated to a post permalink.
 * @returns Replies in DOM order. Links whose layout does not match are skipped.
 */
async function scrapeRepliesFromDom(page: import("playwright").Page): Promise<ThreadsReply[]> {
  const maxLinks = 30;
  const readTimeoutMs = 1200;
  const likePattern = /(\d[\d.,kKmM萬]*)\s*(讚|likes?)/i;

  const replies: ThreadsReply[] = [];
  const postLinks = page.locator('a[href*="/post/"]');
  const count = Math.min(await postLinks.count(), maxLinks);

  for (let i = 1; i < count; i++) {
    const link = postLinks.nth(i);
    try {
      const container = link.locator("xpath=ancestor::*[contains(@data-pressable-container,'true')][1]");
      const scope =
        (await container.count()) > 0 ? container : link.locator("xpath=ancestor::div[position()<=5]").first();

      const text = (
        await scope.locator('div[dir="auto"], span[dir="auto"]').first().innerText({ timeout: readTimeoutMs })
      ).trim();
      // Checked after trimming so whitespace-only fragments are dropped as well.
      if (text.length < 2) continue;

      const authorHref = await scope
        .locator('a[href*="/@"]')
        .first()
        .getAttribute("href", { timeout: readTimeoutMs })
        .catch(() => null);
      const authorName = authorHref?.match(/@([^/?]+)/)?.[1];

      const statsText = await scope.innerText({ timeout: readTimeoutMs }).catch(() => "");
      const likeMatch = statsText.match(likePattern);

      replies.push({
        text,
        authorName,
        likeCount: parseCount(likeMatch?.[1]),
      });
    } catch {
      // Best-effort fallback: a link whose surrounding DOM doesn't match the expected layout is skipped.
    }
  }

  return replies;
}
|
||
|
||
/**
 * Subscribes to `response` events on `page`. Every response whose URL contains
 * "graphql" or "threads" and whose content type contains "json" is parsed.
 * The parsed body goes to `extractRepliesFromJson`, together with `collected`.
 *
 * @returns A function that removes the listener. Call it once collection is
 *   finished. Otherwise the handler keeps parsing every later JSON response
 *   for the lifetime of the page, and handlers accumulate when the same page
 *   is reused. Ignoring the return value keeps the old behavior.
 */
function attachReplyCollector(page: import("playwright").Page, collected: ThreadsReply[]): () => void {
  const onResponse = async (response: import("playwright").Response): Promise<void> => {
    const url = response.url();
    if (!url.includes("graphql") && !url.includes("threads")) return;
    try {
      const contentType = response.headers()["content-type"] ?? "";
      if (!contentType.includes("json")) return;
      const json = await response.json();
      extractRepliesFromJson(json, collected);
    } catch {
      // Best-effort: body unavailable or not valid JSON — ignore this response.
    }
  };

  page.on("response", onResponse);
  return () => {
    page.off("response", onResponse);
  };
}

/**
 * Removes near-duplicate replies and keeps the most-liked ones.
 *
 * Two replies are treated as duplicates when their first 80 characters are
 * equal; the first occurrence wins. The survivors are sorted by `likeCount`
 * descending (missing counts rank as 0; ties keep collection order because
 * `Array#sort` is stable). At most `limit` replies are returned.
 */
function dedupeReplies(collected: ThreadsReply[], limit: number): ThreadsReply[] {
  const seen = new Set<string>();
  const unique = collected.filter((reply) => {
    const key = reply.text.slice(0, 80);
    if (seen.has(key)) return false;
    seen.add(key);
    return true;
  });
  return unique.sort((a, b) => (b.likeCount ?? 0) - (a.likeCount ?? 0)).slice(0, limit);
}

/**
 * Scrapes replies for `permalink` on a caller-supplied page, preferring GraphQL data.
 *
 * Sources are tried in this order:
 *   1. JSON network responses captured while the page loads and is scrolled.
 *   2. If fewer than 2 replies were captured, data extracted from page scripts.
 *   3. If there are still fewer than 2, a DOM scrape.
 *
 * The network listener is removed before returning, including on error, so
 * the page can be reused for further calls without handlers piling up.
 *
 * @param page Open Playwright page; it is navigated to `permalink`.
 * @param permalink Post URL to open.
 * @param limit Maximum number of replies to return (default 10).
 * @returns Deduplicated replies, most-liked first.
 * @throws Errors from navigation, or from `assertThreadsPageSafe` (NOTE(review):
 *   presumably thrown when the page looks blocked/unsafe — see the safety module).
 */
export async function getRepliesOnPage(
  page: import("playwright").Page,
  permalink: string,
  limit = 10
): Promise<ThreadsReply[]> {
  const collected: ThreadsReply[] = [];
  const detachCollector = attachReplyCollector(page, collected);

  try {
    const response = await page.goto(permalink, { waitUntil: "domcontentloaded", timeout: 45000 });
    await humanLandingPause(page);
    const bodyText = await page.locator("body").innerText().catch(() => "");
    await assertThreadsPageSafe(page, response?.status(), bodyText);
    await humanScrollPage(page, { minPasses: 2, maxPasses: 3 });

    // Fallback 1: replies embedded in the page's scripts.
    if (collected.length < 2) {
      const { replies: scriptReplies } = await extractFromPageScripts(page);
      collected.push(...scriptReplies);
    }

    // Fallback 2: scrape the rendered DOM.
    if (collected.length < 2) {
      const domReplies = await scrapeRepliesFromDom(page);
      collected.push(...domReplies);
    }

    return dedupeReplies(collected, limit);
  } finally {
    // Detach only now, so responses arriving during the fallbacks are still collected (as before).
    detachCollector();
  }
}
|
||
|
||
/**
 * Fetches replies for a single post in its own page.
 *
 * The page is created by `withPage` from `storageState`. It first lands on
 * the Threads home page (with a landing pause), then delegates to
 * `getRepliesOnPage`.
 *
 * @param storageState Storage state handed to `withPage`.
 * @param permalink Post URL.
 * @param limit Maximum number of replies (default 10).
 * @param session Optional active session; when present its browser options are passed to `withPage`.
 * @returns Deduplicated replies, most-liked first.
 */
export async function getReplies(
  storageState: string,
  permalink: string,
  limit = 10,
  session?: ActiveSession
): Promise<ThreadsReply[]> {
  const pageOptions = session ? browserSessionOptions(session) : undefined;

  return withPage(
    storageState,
    async (page) => {
      // Land on the home page before opening the permalink.
      await page.goto(HOME_URL, { waitUntil: "domcontentloaded", timeout: 45000 });
      await humanLandingPause(page);
      return getRepliesOnPage(page, permalink, limit);
    },
    pageOptions
  );
}
|
||
|
||
/**
 * Fetches replies for several posts in parallel, sharing one browser context and session.
 *
 * Flow:
 *   1. The shared context is primed once by opening the Threads home page
 *      (with a landing pause).
 *   2. Each post gets its own page. One crawler-page quota unit is consumed,
 *      replies are collected via `getRepliesOnPage`, a "betweenPages" pause
 *      is taken, and the page is closed.
 *
 * Only the first `options.maxPosts` permalinks (default 4) are fetched; the
 * remaining ones get no entry in the returned map.
 *
 * Per-post failures:
 *   - An error whose message contains "已立即停止海巡" (the stop signal) is
 *     rethrown and aborts the batch.
 *   - Any other error is swallowed and the post is recorded with `[]`.
 *
 * @param storageState Storage state passed to `withSharedContext`.
 * @param permalinks Post URLs to fetch.
 * @param limitPerPost Maximum replies kept per post.
 * @param options.concurrency Number of parallel pages (defaults to `getReplyFetchConcurrency()`).
 * @param options.session Active session; supplies browser options and the quota key.
 * @param options.maxPosts Maximum number of permalinks to fetch (default 4).
 * @param options.onProgress Called after each post finishes. It receives the
 *   number of finished posts, the total, and the permalink that just finished.
 *   It is not awaited, and a rejected promise from it is ignored.
 * @returns Map from permalink to its replies.
 */
export async function getRepliesParallel(
  storageState: string,
  permalinks: string[],
  limitPerPost: number,
  options?: {
    concurrency?: number;
    session?: ActiveSession;
    /** Maximum number of permalinks fetched; the rest are ignored. Defaults to 4. */
    maxPosts?: number;
    onProgress?: (done: number, total: number, permalink: string) => void | Promise<void>;
  }
): Promise<Map<string, ThreadsReply[]>> {
  const result = new Map<string, ThreadsReply[]>();
  // Math.max guards against a negative cap, which slice() would treat as "drop from the end".
  const targets = permalinks.slice(0, Math.max(0, options?.maxPosts ?? 4));
  if (targets.length === 0) return result;

  const isStopSignal = (error: unknown): boolean =>
    error instanceof Error && error.message.includes("已立即停止海巡");

  // Count completions here, so the reported permalink is the one that actually
  // finished (under concurrency, tasks complete out of input order).
  let done = 0;
  const reportProgress = (permalink: string): void => {
    done += 1;
    const pending = options?.onProgress?.(done, targets.length, permalink);
    // Fire-and-forget, but never leave a rejected promise unhandled.
    void Promise.resolve(pending).catch(() => undefined);
  };

  await withSharedContext(storageState, async (context) => {
    // Prime the shared context by landing on the home page once.
    const primePage = await context.newPage();
    try {
      await primePage.goto(HOME_URL, { waitUntil: "domcontentloaded", timeout: 45000 });
      await humanLandingPause(primePage);
    } finally {
      await primePage.close();
    }

    await runWithConcurrency(
      targets,
      async (permalink) => {
        consumeCrawlerPageQuota(options?.session?.id);
        const page = await context.newPage();
        try {
          const replies = await getRepliesOnPage(page, permalink, limitPerPost);
          result.set(permalink, replies);
        } catch (error) {
          if (isStopSignal(error)) throw error;
          // Best-effort: one failing post must not sink the whole batch.
          result.set(permalink, []);
        } finally {
          try {
            await humanPause("betweenPages");
          } finally {
            // Close the page even if the pause itself fails.
            await page.close();
          }
        }
        reportProgress(permalink);
      },
      {
        concurrency: options?.concurrency ?? getReplyFetchConcurrency(),
        staggerMs: DEFAULT_STAGGER_MS,
      }
    );
  }, options?.session ? browserSessionOptions(options.session) : undefined);

  return result;
}
|