293 lines
9.9 KiB
TypeScript
293 lines
9.9 KiB
TypeScript
|
|
import { parseCount } from "@/lib/utils";
|
|||
|
|
import { runWithConcurrency } from "@/lib/utils/concurrency";
|
|||
|
|
import { rankAndDedupe, type RankedPost } from "@/lib/ranking";
|
|||
|
|
import type { ScanTask } from "@/lib/services/scan-tasks";
|
|||
|
|
import { withSharedContext, type BrowserSessionOptions } from "./browser";
|
|||
|
|
import {
|
|||
|
|
DEFAULT_TASK_STAGGER_MS,
|
|||
|
|
getBrowserConcurrency,
|
|||
|
|
humanLandingPause,
|
|||
|
|
humanPause,
|
|||
|
|
humanScrollPage,
|
|||
|
|
} from "./human-behavior";
|
|||
|
|
import { browserSessionOptions, type ActiveSession } from "./session";
|
|||
|
|
import { extractFromPageScripts, extractPostsFromJson } from "./extract";
|
|||
|
|
import { getProfilePostsOnPage } from "./profile";
|
|||
|
|
import type { BrowserCrawlStep, BrowserProgressCallback } from "./progress";
|
|||
|
|
import type { ThreadsPost } from "./types";
|
|||
|
|
import {
|
|||
|
|
assertThreadsPageSafe,
|
|||
|
|
consumeCrawlerPageQuota,
|
|||
|
|
limitCrawlerTasks,
|
|||
|
|
} from "./safety";
|
|||
|
|
|
|||
|
|
// Threads home page; primeSession loads it first and checks that the stored session is not redirected to /login.
const HOME_URL = "https://www.threads.com/";
|
|||
|
|
// Not read anywhere in this module; it is only re-exported at the bottom of the file.
// NOTE(review): presumably the Instagram web app id that Threads web requests send
// (e.g. as an X-IG-App-ID header) — confirm against the consumers of this export.
const IG_APP_ID = "238260118697367";
|
|||
|
|
|
|||
|
|
/**
 * Last-resort extraction: scrape post cards directly from the rendered DOM.
 *
 * Inspects at most `maxLinks` anchors whose href contains "/post/", de-duplicating
 * them by raw href. For each link it locates the surrounding card: the nearest
 * `data-pressable-container` ancestor, else an ancestor `div`. It then reads the
 * first `dir="auto"` text node in that card and scans the card's full text for
 * like/reply counts.
 *
 * Posts whose trimmed text is shorter than 5 characters are skipped. Any Playwright
 * failure while reading a single link skips only that link.
 *
 * @param page - Playwright page that is already showing a Threads listing.
 * @param maxLinks - Upper bound on the number of post links inspected (default 40).
 * @returns Posts in DOM order. Counts are `parseCount` of the matched number, or of
 *   `undefined` when no count text was found.
 */
async function scrapeFromDom(page: import("playwright").Page, maxLinks = 40): Promise<ThreadsPost[]> {
  // Every DOM read is bounded so one detached or slow node cannot stall the whole scrape.
  const READ_TIMEOUT_MS = 1200;
  // Hoisted out of the loop; none of these carry the /g flag, so reusing them is stateless.
  const likeRe = /(\d[\d.,kKmM萬]*)\s*(讚|likes?)/i;
  const replyRe = /(\d[\d.,kKmM萬]*)\s*(則回覆|replies?|留言)/i;
  const authorRe = /@([^/]+)\/post/;
  const postIdRe = /\/post\/([^/?]+)/;

  const posts: ThreadsPost[] = [];
  const seen = new Set<string>();

  const postLinks = page.locator('a[href*="/post/"]');
  const limit = Math.min(await postLinks.count(), maxLinks);

  for (let i = 0; i < limit; i++) {
    const link = postLinks.nth(i);
    try {
      const href = await link.getAttribute("href", { timeout: READ_TIMEOUT_MS });
      if (!href || seen.has(href)) continue;
      seen.add(href);

      const permalink = href.startsWith("http") ? href : `https://www.threads.com${href}`;
      const authorName = href.match(authorRe)?.[1];

      // Prefer the nearest "pressable" card container; fall back to an ancestor div otherwise.
      const container = link.locator("xpath=ancestor::*[contains(@data-pressable-container,'true')][1]");
      const hasContainer = (await container.count()) > 0;
      const scope = hasContainer ? container : link.locator("xpath=ancestor::div[position()<=6]").first();

      const rawText = await scope
        .locator('div[dir="auto"], span[dir="auto"]')
        .first()
        .innerText({ timeout: READ_TIMEOUT_MS })
        .catch(() => "");
      // Trim before the length check so whitespace-padded fragments don't become empty posts.
      const text = rawText.trim();
      if (text.length < 5) continue;

      // Bounded like the other reads (previously this fell back to Playwright's default timeout).
      const statsText = await scope.innerText({ timeout: READ_TIMEOUT_MS }).catch(() => "");
      const likeMatch = statsText.match(likeRe);
      const replyMatch = statsText.match(replyRe);

      posts.push({
        text,
        permalink,
        authorName,
        likeCount: parseCount(likeMatch?.[1]),
        replyCount: parseCount(replyMatch?.[1]),
        externalId: href.match(postIdRe)?.[1],
      });
    } catch {
      // Best-effort: a post whose DOM cannot be read is skipped, not fatal.
    }
  }

  return posts;
}
|
|||
|
|
|
|||
|
|
/**
 * Opens the Threads home page to confirm the stored session is still usable.
 *
 * Reports the "session_check" and "landing_pause" steps, waits like a human on
 * landing, and runs the shared page-safety assertion against the response status
 * and the visible body text.
 *
 * @param page - Fresh Playwright page in the crawler's browser context.
 * @param onStep - Optional progress callback.
 * @throws Error when the home page redirects to a /login URL, i.e. the session expired.
 *   Whatever `assertThreadsPageSafe` throws is also propagated.
 */
async function primeSession(
  page: import("playwright").Page,
  onStep?: BrowserProgressCallback
): Promise<void> {
  await onStep?.("session_check");
  const homeResponse = await page.goto(HOME_URL, { waitUntil: "domcontentloaded", timeout: 45000 });

  await onStep?.("landing_pause", "首頁");
  await humanLandingPause(page);

  // An unreadable body is treated as empty text rather than as a failure.
  const pageText = await page
    .locator("body")
    .innerText()
    .catch(() => "");
  await assertThreadsPageSafe(page, homeResponse?.status(), pageText);

  const redirectedToLogin = page.url().includes("/login");
  if (!redirectedToLogin) return;
  throw new Error("Session 已失效,請到設定頁重新登入 Threads");
}
|
|||
|
|
|
|||
|
|
/**
 * Subscribes to the page's `response` events. Every JSON response whose URL
 * mentions "graphql", "threads" or "instagram" is passed to
 * `extractPostsFromJson`, which appends posts into `collected`.
 *
 * Non-JSON responses and bodies that cannot be read or parsed are skipped.
 *
 * @param page - Page to listen on.
 * @param collected - Array that extracted posts are appended to.
 * @returns A function that removes the listener. After it is called, responses
 *   still being parsed are discarded instead of being appended to `collected`.
 */
function attachGraphqlCollector(
  page: import("playwright").Page,
  collected: ThreadsPost[]
): () => void {
  let active = true;

  const onResponse = async (response: import("playwright").Response): Promise<void> => {
    const url = response.url();
    if (!url.includes("graphql") && !url.includes("threads") && !url.includes("instagram")) {
      return;
    }
    try {
      const contentType = response.headers()["content-type"] ?? "";
      if (!contentType.includes("json")) return;
      const json = await response.json();
      // The body may finish downloading after the caller detached; leave its array alone then.
      if (!active) return;
      extractPostsFromJson(json, collected);
    } catch {
      // Best-effort: bodies that are unavailable or not valid JSON are ignored.
    }
  };

  page.on("response", onResponse);
  return () => {
    active = false;
    page.off("response", onResponse);
  };
}

/**
 * Runs one keyword search on an already-primed page and returns ranked posts.
 *
 * Posts are collected from intercepted JSON network responses first. If fewer than
 * 3 were found, inline page scripts are parsed next. If there are still fewer than
 * 3, the rendered DOM is scraped. The combined list goes through `rankAndDedupe`.
 *
 * The network listener is removed before returning, so the caller may reuse the page.
 *
 * @param page - Page in a logged-in context; it is navigated to the search URL.
 * @param query - Search keyword (URL-encoded here).
 * @param limit - Maximum number of ranked posts to return.
 * @param onStep - Optional progress callback.
 * @returns Ranked posts; an empty array when Threads shows its "not found" page.
 * @throws Whatever navigation or `assertThreadsPageSafe` throws.
 */
export async function searchQueryOnPage(
  page: import("playwright").Page,
  query: string,
  limit = 20,
  onStep?: BrowserProgressCallback
): Promise<RankedPost[]> {
  const collected: ThreadsPost[] = [];
  const detachCollector = attachGraphqlCollector(page, collected);

  try {
    const searchUrl = `https://www.threads.com/search?q=${encodeURIComponent(query)}&serp_type=default`;
    await onStep?.("open_page", `搜尋「${query}」`);
    const response = await page.goto(searchUrl, { waitUntil: "domcontentloaded", timeout: 45000 });
    await onStep?.("landing_pause");
    await humanLandingPause(page);

    const bodyText = await page.locator("body").innerText();
    await assertThreadsPageSafe(page, response?.status(), bodyText);
    // Threads' "page not found / you're lost" screen: no results to collect.
    if (bodyText.includes("走丟") || bodyText.includes("頁面不存在")) {
      await onStep?.("done", "0 篇");
      return [];
    }

    await onStep?.("wait_content");
    // Results may legitimately be empty; a timeout here just means "carry on".
    await page.waitForSelector('a[href*="/post/"]', { timeout: 12000 }).catch(() => undefined);
    await onStep?.("scroll");
    await humanScrollPage(page, { minPasses: 2, maxPasses: 4 });

    if (collected.length < 3) {
      await onStep?.("parse_scripts");
      const { posts: scriptPosts } = await extractFromPageScripts(page);
      collected.push(...scriptPosts);
    } else {
      await onStep?.("parse_network", `${collected.length} 篇`);
    }

    if (collected.length < 3) {
      await onStep?.("parse_dom");
      const domPosts = await scrapeFromDom(page);
      collected.push(...domPosts);
    }

    const ranked = rankAndDedupe(collected, limit);
    await onStep?.("done", `${ranked.length} 篇`);
    return ranked;
  } finally {
    // The page is caller-owned and may be reused; don't leave a listener that keeps
    // parsing every later response into this call's (already returned) array.
    detachCollector();
  }
}
|
|||
|
|
|
|||
|
|
/**
 * Converts an optional active session into browser-session options.
 *
 * @param session - Active session, if any.
 * @returns `browserSessionOptions(session)` when a session is given, otherwise `undefined`.
 */
function resolveSessionOptions(session?: ActiveSession): BrowserSessionOptions | undefined {
  if (!session) {
    return undefined;
  }
  return browserSessionOptions(session);
}
|
|||
|
|
|
|||
|
|
/** Optional settings accepted by `search`. */
export interface BrowserSearchOptions {
  /**
   * Progress callback. It receives the steps emitted while priming the session
   * and while running the search, each with an optional detail string.
   */
  onStep?: BrowserProgressCallback;
}
|
|||
|
|
|
|||
|
|
/**
 * Runs a single keyword search in a browser context built from `storageState`.
 *
 * Opens one page, primes the session on the Threads home page, runs the search on
 * that same page, and closes the page before returning.
 *
 * @param storageState - Serialized Playwright storage state for the logged-in account.
 * @param query - Search keyword.
 * @param limit - Maximum number of ranked posts to return (default 20).
 * @param session - Optional active session, converted to browser-session options.
 * @param options - Optional progress callback.
 * @returns Ranked posts for the query.
 * @throws Whatever `primeSession` or `searchQueryOnPage` throws (e.g. an expired session).
 */
export async function search(
  storageState: string,
  query: string,
  limit = 20,
  session?: ActiveSession,
  options?: BrowserSearchOptions
): Promise<RankedPost[]> {
  return withSharedContext(
    storageState,
    async (context) => {
      const page = await context.newPage();
      try {
        await primeSession(page, options?.onStep);
        return await searchQueryOnPage(page, query, limit, options?.onStep);
      } finally {
        // Close the tab explicitly, as executeScanTasks does, so repeated searches don't
        // accumulate open pages in the shared context. A close failure must not mask the
        // real result or error.
        await page.close().catch(() => undefined);
      }
    },
    resolveSessionOptions(session)
  );
}
|
|||
|
|
|
|||
|
|
/** A ranked post annotated with the label of the scan task that produced it. */
export type RankedPostWithTag = RankedPost & { searchTag?: string };

/**
 * Flattens per-task result batches into one de-duplicated list sorted by descending score.
 *
 * Identity is `externalId` when present. Otherwise it is the author name plus the first
 * 120 characters of the trimmed text. Among duplicates the post with the strictly higher
 * `score` wins, so ties keep the first one encountered.
 */
function mergePosts(batches: RankedPostWithTag[][]): RankedPostWithTag[] {
  const identityOf = (post: RankedPostWithTag): string =>
    post.externalId ?? `${post.authorName ?? ""}:${post.text.trim().slice(0, 120)}`;

  const bestByIdentity = new Map<string, RankedPostWithTag>();
  for (const candidate of batches.flat()) {
    const identity = identityOf(candidate);
    const incumbent = bestByIdentity.get(identity);
    const candidateWins = incumbent === undefined || candidate.score > incumbent.score;
    if (candidateWins) {
      bestByIdentity.set(identity, candidate);
    }
  }

  const unique = Array.from(bestByIdentity.values());
  unique.sort((left, right) => right.score - left.score);
  return unique;
}
|
|||
|
|
|
|||
|
|
/**
 * Runs a mixed batch of scan tasks — keyword searches plus account patrols — with
 * bounded parallelism (recommended ≤ 3).
 *
 * The task list is first capped by `limitCrawlerTasks`. One page primes the session,
 * then each task runs on its own page. Each task consumes crawler page quota, and
 * there is a human-like pause after every page. Results from all tasks are merged
 * with `mergePosts`; each post carries its task label as `searchTag`.
 *
 * @param storageState - Serialized Playwright storage state for the logged-in account.
 * @param tasks - Tasks to run (may be truncated by `limitCrawlerTasks`).
 * @param options - Concurrency, abort check, and per-task lifecycle/progress callbacks.
 *   `onProgress` receives the label of the task that actually finished.
 * @returns Merged, de-duplicated posts sorted by descending score; `[]` when no tasks remain.
 * @throws JobCancelledError when the run is aborted (`"__JOB_ABORT__"`); other errors propagate.
 */
export async function executeScanTasks(
  storageState: string,
  tasks: ScanTask[],
  options?: {
    session?: ActiveSession;
    concurrency?: number;
    onProgress?: (done: number, total: number, label: string) => void;
    shouldAbort?: () => boolean | Promise<boolean>;
    onTaskStart?: (task: ScanTask) => void | Promise<void>;
    onTaskDone?: (task: ScanTask, found: number) => void | Promise<void>;
    onTaskFail?: (task: ScanTask, error: unknown) => void | Promise<void>;
    onTaskStep?: (
      task: ScanTask,
      step: BrowserCrawlStep,
      detail?: string
    ) => void | Promise<void>;
  }
): Promise<RankedPostWithTag[]> {
  const safeTasks = limitCrawlerTasks(tasks);
  if (safeTasks.length === 0) return [];

  try {
    return await withSharedContext(storageState, async (context) => {
      const primePage = await context.newPage();
      try {
        // Priming steps are attributed to the first task; the placeholder only satisfies the type checker.
        await primeSession(primePage, (step, detail) =>
          options?.onTaskStep?.(safeTasks[0] ?? { id: "prime", kind: "keyword", query: "", label: "初始化", limit: 0 }, step, detail)
        );
      } finally {
        // Close even when priming fails; a close error must not mask the priming error.
        await primePage.close().catch(() => undefined);
      }

      // Labels of finished tasks in completion order. With concurrency > 1, tasks finish out of
      // submission order, so safeTasks[done - 1] does not identify the task that just completed.
      const finishedLabels: string[] = [];

      const results = await runWithConcurrency(
        safeTasks,
        async (task) => {
          consumeCrawlerPageQuota(options?.session?.id);
          await options?.onTaskStart?.(task);
          const page = await context.newPage();
          const onStep: BrowserProgressCallback = (step, detail) =>
            options?.onTaskStep?.(task, step, detail);
          try {
            let posts: RankedPost[];
            if (task.kind === "account") {
              posts = await getProfilePostsOnPage(page, task.query, task.limit, onStep);
            } else {
              posts = await searchQueryOnPage(page, task.query, task.limit, onStep);
            }
            const mapped = posts.map((post) => ({ ...post, searchTag: task.label }));
            await options?.onTaskDone?.(task, mapped.length);
            return mapped;
          } catch (error) {
            await options?.onTaskFail?.(task, error);
            throw error;
          } finally {
            await onStep("between_pages");
            await humanPause("betweenPages");
            await page.close();
            // Recorded last, right before this task settles, so the order matches completion order.
            finishedLabels.push(task.label);
          }
        },
        {
          concurrency: options?.concurrency ?? getBrowserConcurrency(),
          staggerMs: DEFAULT_TASK_STAGGER_MS,
          shouldAbort: options?.shouldAbort,
          onProgress: (done, total) => {
            const label = finishedLabels[done - 1] ?? safeTasks[done - 1]?.label ?? "";
            options?.onProgress?.(done, total, label);
          },
        }
      );

      return mergePosts(results);
    }, resolveSessionOptions(options?.session));
  } catch (error) {
    // Sentinel abort error (presumably raised via shouldAbort by runWithConcurrency) → typed cancellation.
    if (error instanceof Error && error.message === "__JOB_ABORT__") {
      const { JobCancelledError } = await import("@/lib/jobs/cancel");
      throw new JobCancelledError();
    }
    throw error;
  }
}
|
|||
|
|
|
|||
|
|
/**
 * Searches several keywords in parallel by turning each tag into a keyword scan task.
 *
 * @deprecated Use `executeScanTasks` instead.
 * @param storageState - Serialized Playwright storage state for the logged-in account.
 * @param tags - Keywords to search; each also becomes the result's `searchTag`.
 * @param perTagLimit - Maximum ranked posts per keyword.
 * @param options - Concurrency and progress callback, forwarded unchanged.
 * @returns Merged posts from all keyword searches.
 */
export async function searchTagsParallel(
  storageState: string,
  tags: string[],
  perTagLimit: number,
  options?: {
    concurrency?: number;
    onProgress?: (done: number, total: number, tag: string) => void;
  }
): Promise<RankedPostWithTag[]> {
  const toKeywordTask = (tag: string): ScanTask => ({
    id: `keyword:${tag}`,
    kind: "keyword",
    query: tag,
    label: tag,
    limit: perTagLimit,
  });
  const keywordTasks = tags.map(toKeywordTask);
  return executeScanTasks(storageState, keywordTasks, options);
}
|
|||
|
|
|
|||
|
|
// Re-exported for other modules; nothing in this file reads IG_APP_ID.
export { IG_APP_ID };
|