377 lines
12 KiB
TypeScript
377 lines
12 KiB
TypeScript
import { getProfilePosts } from './threads-profile-scraper'
|
|
|
|
/** Envelope `code` value that the worker API helper treats as success. */
const SUCCESS_CODE = 102000

/**
 * Parses a numeric setting taken from the environment.
 *
 * Returns `fallback` when the variable is unset, blank, not a finite number,
 * or negative. A plain `Number(raw)` would yield `NaN` for malformed input
 * (and `0` for a blank string), which silently breaks timers and `<`
 * comparisons that consume the value.
 *
 * @param raw - Raw environment value, possibly undefined.
 * @param fallback - Value used when `raw` is missing or invalid.
 * @returns The parsed non-negative number, or `fallback`.
 */
function numberFromEnv(raw: string | undefined, fallback: number): number {
  if (raw === undefined || raw.trim() === '') return fallback
  const parsed = Number(raw)
  return Number.isFinite(parsed) && parsed >= 0 ? parsed : fallback
}

/** Base URL every worker API path is appended to. */
const API_BASE = process.env.HAIXUN_BACKEND_URL ?? 'http://127.0.0.1:8890'

/** Identifier sent as `worker_id`; defaults to `<HOSTNAME|local>-style-8d-node-<pid>`. */
const WORKER_ID =
  process.env.HAIXUN_NODE_WORKER_ID ??
  `${process.env.HOSTNAME ?? 'local'}-style-8d-node-${process.pid}`

/** Optional shared secret; when non-empty it is sent as the `X-Worker-Secret` header. */
const WORKER_SECRET = process.env.HAIXUN_WORKER_SECRET ?? ''

/** Delay in milliseconds between polls when no job was claimed or the loop errored. */
const POLL_INTERVAL_MS = numberFromEnv(process.env.HAIXUN_WORKER_POLL_MS, 3000)

/** Minimum number of scraped posts required before style analysis runs. */
const TARGET_SAMPLES = numberFromEnv(process.env.HAIXUN_8D_TARGET_SAMPLES, 4)
|
|
|
|
/**
 * JSON envelope returned by the worker API.
 *
 * The API helper in this file resolves with `data` only when `code` equals
 * `SUCCESS_CODE`; otherwise it throws using `message`.
 */
type ApiEnvelope<T> = {
  /** Application-level status code (not the HTTP status). */
  code: number
  /** Message used as the error text when `code` is not the success code. */
  message: string
  /** Payload; may be absent. */
  data?: T
}
|
|
|
|
/** Lifecycle state of a single job step. */
type StepStatus =
  | 'pending'
  | 'running'
  | 'succeeded'
  | 'failed'
  | 'cancelled'
|
|
|
|
/** Progress record for one step of a job, as exchanged with the worker API. */
type JobStep = {
  /** Step identifier (this worker uses the ids listed in `STYLE_STEPS`). */
  id: string
  status: StepStatus
  /** Set by `updateStep` from `nowNano()` the first time the step is touched. */
  started_at?: number
  /** Set by `updateStep` from `nowNano()` when the step reaches a terminal status. */
  ended_at?: number
  /** Human-readable message attached to the latest status change. */
  message?: string
}
|
|
|
|
/** Job record returned by the claim endpoint. */
type JobData = {
  id: string
  /** Job kind; this worker only processes `'style-8d'`. */
  template_type: string
  /** Used as the persona id when `payload.persona_id` is absent. */
  scope_id: string
  status: string
  phase: string
  /** Free-form job arguments; fields are read defensively via `String(... ?? '')`. */
  payload: Record<string, unknown>
  /**
   * Progress snapshot. Declared optional because every reader in this file
   * already accesses it with `job.progress?.…`, i.e. treats it as possibly
   * missing — presumably the backend can omit it for a fresh job; verify.
   */
  progress?: {
    summary: string
    percentage: number
    steps: JobStep[]
  }
}
|
|
|
|
/** One post sample as consumed from `getProfilePosts` by this worker. */
type ScrapedPost = {
  /** Post body text. */
  text: string
  permalink?: string
  /** Sent to the analysis endpoint as `like_count`, defaulting to 0 when absent. */
  likeCount?: number
  /** Sent to the analysis endpoint as `reply_count`, defaulting to 0 when absent. */
  replyCount?: number
}
|
|
|
|
/** Response payload of the threads-account session endpoint. */
type BrowserSessionData = {
  account_id: string
  /** Serialized browser storage state; handed to `getProfilePosts` as its first argument. */
  storage_state: string
  /** Last-update timestamp (unit not visible in this file). */
  update_at: number
}
|
|
|
|
/**
 * Ordered step ids of a style-8d job. `as const` keeps the order and the
 * literal ids immutable so they cannot be mutated at runtime by accident.
 */
const STYLE_STEPS = ['session', 'samples', 'style', 'store'] as const

/**
 * Default progress percentage reported for each phase when the caller does
 * not pass an explicit one. Indexed by arbitrary phase strings, hence the
 * `string` key type.
 */
const STEP_PROGRESS: Readonly<Record<string, number>> = {
  session: 10,
  samples: 40,
  style: 75,
  store: 92,
}
|
|
|
|
/**
 * Returns a promise that resolves after `ms` milliseconds via `setTimeout`.
 *
 * @param ms - Delay in milliseconds.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms)
  })
}
|
|
|
|
/**
 * Current wall-clock time expressed in nanoseconds.
 *
 * The value is `Date.now()` (millisecond resolution) scaled by 1e6, so it
 * carries no sub-millisecond information. It also exceeds
 * `Number.MAX_SAFE_INTEGER`, so it is an approximation at the nanosecond
 * digit level rather than an exact integer.
 */
function nowNano(): number {
  return Date.now() * 1_000_000
}
|
|
|
|
/**
 * Calls a worker API endpoint and unwraps its JSON envelope.
 *
 * Sends `body` as JSON (omitted entirely when `body` is `undefined`) and adds
 * the `X-Worker-Secret` header when `WORKER_SECRET` is non-empty.
 *
 * @param path - Path appended to `API_BASE`.
 * @param body - Optional request payload, JSON-serialized.
 * @param method - HTTP method; defaults to `POST`.
 * @returns The envelope's `data` field (possibly `undefined`).
 * @throws Error when the response is not JSON, is not a JSON object, or when
 *   the envelope `code` differs from `SUCCESS_CODE`.
 */
async function api<T>(path: string, body?: unknown, method = 'POST'): Promise<T | undefined> {
  const headers: Record<string, string> = { 'Content-Type': 'application/json' }
  if (WORKER_SECRET) headers['X-Worker-Secret'] = WORKER_SECRET

  const res = await fetch(`${API_BASE}${path}`, {
    method,
    headers,
    body: body === undefined ? undefined : JSON.stringify(body),
  })
  const raw = await res.text()

  let parsed: unknown
  try {
    parsed = JSON.parse(raw)
  } catch {
    throw new Error(`worker API returned non-JSON response from ${path}: HTTP ${res.status}`)
  }
  // `null` and primitives are valid JSON but not an envelope; reading `.code`
  // on `null` would otherwise surface as an opaque TypeError.
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error(`worker API returned non-object JSON from ${path}: HTTP ${res.status}`)
  }

  const json = parsed as ApiEnvelope<T>
  if (json.code !== SUCCESS_CODE) {
    throw new Error(json.message || `request failed: ${path}`)
  }
  return json.data
}
|
|
|
|
/**
 * Asks the backend for the next job assigned to `node` workers.
 *
 * @returns The claimed job, or a nullish value when the endpoint returned no job.
 */
async function claimJob(): Promise<JobData | null | undefined> {
  return api<JobData | null>('/api/v1/internal/workers/jobs/claim', {
    worker_type: 'node',
    worker_id: WORKER_ID,
  })
}
|
|
|
|
/**
 * Sends a heartbeat for `jobID` with a 300-second TTL.
 *
 * @param jobID - Job identifier; URL-encoded into the path.
 */
async function heartbeat(jobID: string): Promise<void> {
  await api(`/api/v1/internal/workers/jobs/${encodeURIComponent(jobID)}/heartbeat`, {
    worker_id: WORKER_ID,
    ttl_seconds: 300,
  })
}
|
|
|
|
/**
 * Asks the backend whether cancellation was requested for `jobID`.
 *
 * @param jobID - Job identifier; URL-encoded into the path.
 * @returns `true` only when the response carries a truthy `cancelled` flag.
 */
async function isCancelRequested(jobID: string): Promise<boolean> {
  const data = await api<{ cancelled: boolean }>(
    `/api/v1/internal/workers/jobs/${encodeURIComponent(jobID)}/cancel-check`,
    { worker_id: WORKER_ID },
  )
  return Boolean(data?.cancelled)
}
|
|
|
|
/**
 * Acknowledges a cancellation request for `jobID` on behalf of this worker.
 *
 * @param jobID - Job identifier; URL-encoded into the path.
 */
async function ackCancel(jobID: string): Promise<void> {
  await api(`/api/v1/internal/workers/jobs/${encodeURIComponent(jobID)}/cancel-ack`, {
    worker_id: WORKER_ID,
  })
}
|
|
|
|
/**
 * Reports `jobID` as failed.
 *
 * @param jobID - Job identifier; URL-encoded into the path.
 * @param error - Failure cause; `Error` instances contribute their `message`,
 *   anything else is stringified with `String()`.
 * @param phase - Optional phase label forwarded as-is.
 */
async function failJob(jobID: string, error: unknown, phase?: string): Promise<void> {
  const message = error instanceof Error ? error.message : String(error)
  await api(`/api/v1/internal/workers/jobs/${encodeURIComponent(jobID)}/fail`, {
    worker_id: WORKER_ID,
    error: message,
    phase,
  })
}
|
|
|
|
/**
 * Reports `jobID` as completed together with its result payload.
 *
 * @param jobID - Job identifier; URL-encoded into the path.
 * @param result - Arbitrary result object sent as `result`.
 */
async function completeJob(jobID: string, result: Record<string, unknown>): Promise<void> {
  await api(`/api/v1/internal/workers/jobs/${encodeURIComponent(jobID)}/complete`, {
    worker_id: WORKER_ID,
    result,
  })
}
|
|
|
|
/**
 * Pushes a progress update for `job`.
 *
 * @param job - Job being processed.
 * @param phase - Current phase label.
 * @param summary - Human-readable progress summary.
 * @param steps - Full step list to store with the update.
 * @param percentage - Explicit percentage. When omitted, falls back to
 *   `STEP_PROGRESS[phase]`, then to the job's last known percentage, then 0.
 */
async function report(
  job: JobData,
  phase: string,
  summary: string,
  steps: JobStep[],
  percentage?: number,
): Promise<void> {
  await api(`/api/v1/internal/workers/jobs/${encodeURIComponent(job.id)}/progress`, {
    worker_id: WORKER_ID,
    phase,
    summary,
    percentage: percentage ?? STEP_PROGRESS[phase] ?? job.progress?.percentage ?? 0,
    steps,
  })
}
|
|
|
|
/**
 * Builds the step list for a job in `STYLE_STEPS` order.
 *
 * Steps already present in `job.progress.steps` are reused (matched by id);
 * missing ones are created as `pending`. Steps whose id is not in
 * `STYLE_STEPS` are dropped.
 */
function normalizeSteps(job: JobData): JobStep[] {
  const existing = new Map((job.progress?.steps ?? []).map((step) => [step.id, step]))
  return STYLE_STEPS.map((id) => existing.get(id) ?? { id, status: 'pending' as const })
}
|
|
|
|
/**
 * Returns a copy of `steps` with the step `id` moved to `status`.
 *
 * The input array and its step objects are not mutated. `started_at` is kept
 * if already set, otherwise stamped now; `ended_at` is stamped only when the
 * new status is terminal (`succeeded`, `failed`, `cancelled`) and otherwise
 * left unchanged. Unknown ids leave the list unchanged.
 *
 * @param steps - Current step list.
 * @param id - Id of the step to update.
 * @param status - New status.
 * @param message - Message stored on the step.
 */
function updateStep(steps: JobStep[], id: string, status: StepStatus, message: string): JobStep[] {
  const at = nowNano()
  const isTerminal = status === 'succeeded' || status === 'failed' || status === 'cancelled'
  return steps.map((step) => {
    if (step.id !== id) return step
    return {
      ...step,
      status,
      message,
      started_at: step.started_at ?? at,
      ended_at: isTerminal ? at : step.ended_at,
    }
  })
}
|
|
|
|
/**
 * Runs one job step with progress reporting and a cancellation check.
 *
 * Sequence: mark the step `running` and report it, check for a cancel
 * request, run `fn`, then mark and report the step as `succeeded` or `failed`.
 *
 * @param job - Job being processed.
 * @param steps - Current step list (not mutated).
 * @param phase - Step id / phase label.
 * @param runningMessage - Message reported while the step runs.
 * @param doneMessage - Message reported on success.
 * @param fn - The step's work.
 * @returns The value produced by `fn` and the updated step list.
 * @throws Error with message `'cancelled'` when a cancel request is pending;
 *   otherwise rethrows whatever `fn` threw.
 */
async function runStep<T>(
  job: JobData,
  steps: JobStep[],
  phase: string,
  runningMessage: string,
  doneMessage: string,
  fn: () => Promise<T>,
): Promise<{ result: T; steps: JobStep[] }> {
  let nextSteps = updateStep(steps, phase, 'running', runningMessage)
  await report(job, phase, runningMessage, nextSteps)
  if (await isCancelRequested(job.id)) {
    nextSteps = updateStep(nextSteps, phase, 'cancelled', '使用者已取消')
    await report(job, phase, '任務已收到取消請求', nextSteps)
    await ackCancel(job.id)
    throw new Error('cancelled')
  }

  let result: T
  try {
    result = await fn()
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error)
    nextSteps = updateStep(nextSteps, phase, 'failed', message)
    // Reporting the failure is best-effort: if this call rejects, the step's
    // real error must still reach the caller instead of being replaced by the
    // reporting error.
    try {
      await report(job, phase, message, nextSteps)
    } catch (reportError) {
      console.warn(`[8d-worker] failed to report step failure phase=${phase}: ${reportError}`)
    }
    throw error
  }
  nextSteps = updateStep(nextSteps, phase, 'succeeded', doneMessage)
  await report(job, phase, doneMessage, nextSteps)
  return { result, steps: nextSteps }
}
|
|
|
|
/**
 * Fetches the stored browser session state for the job's Threads account.
 *
 * Reads `threads_account_id`, `tenant_id` and `owner_uid` from the job
 * payload. Never throws: any API failure is logged and turned into `''`.
 *
 * @returns The `storage_state` string, or `''` when a required payload field
 *   is missing, the response has no state, or the request failed.
 */
async function getBrowserStorageState(job: JobData): Promise<string> {
  const accountID = String(job.payload.threads_account_id ?? '').trim()
  const tenantID = String(job.payload.tenant_id ?? '').trim()
  const ownerUID = String(job.payload.owner_uid ?? '').trim()
  if (!accountID || !tenantID || !ownerUID) return ''
  try {
    const data = await api<BrowserSessionData>(
      `/api/v1/internal/workers/threads-accounts/${encodeURIComponent(accountID)}/session`,
      {
        tenant_id: tenantID,
        owner_uid: ownerUID,
      },
    )
    return data?.storage_state ?? ''
  } catch (error) {
    // NOTE(review): the log text mentions a public-page fallback, but the
    // caller in this file treats an empty state as an error — confirm intent.
    console.warn(`[8d-worker] browser session unavailable, fallback to public page: ${error}`)
    return ''
  }
}
|
|
|
|
/**
 * Projects steps onto the five fields sent to the backend
 * (`id`, `status`, `started_at`, `ended_at`, `message`), dropping any other
 * properties the step objects may carry.
 */
function toProgressSteps(steps: JobStep[]) {
  return steps.map((step) => ({
    id: step.id,
    status: step.status,
    started_at: step.started_at,
    ended_at: step.ended_at,
    message: step.message,
  }))
}
|
|
|
|
/**
 * Sends the scraped posts to the backend's `analyze-style8d` endpoint.
 *
 * The persona id is taken from `payload.persona_id`, falling back to
 * `job.scope_id`.
 *
 * @param job - Job being processed.
 * @param username - Benchmark account name.
 * @param posts - Scraped samples; counts default to 0 when absent.
 * @param steps - Current step list, forwarded via `toProgressSteps`.
 * @returns The endpoint's response data.
 * @throws Error when a required payload id is missing, or when the response
 *   has no non-blank `style_profile`.
 */
async function analyzeStyle8DWithGo(job: JobData, username: string, posts: ScrapedPost[], steps: JobStep[]) {
  const personaID = String(job.payload.persona_id ?? job.scope_id ?? '').trim()
  const tenantID = String(job.payload.tenant_id ?? '').trim()
  const ownerUID = String(job.payload.owner_uid ?? '').trim()
  const accountID = String(job.payload.threads_account_id ?? '').trim()
  if (!personaID || !tenantID || !ownerUID || !accountID) {
    throw new Error('job payload 缺少 persona_id / tenant_id / owner_uid / threads_account_id')
  }

  const data = await api<{
    persona_id: string
    post_count: number
    style_profile: string
    style_benchmark: string
  }>(
    `/api/v1/internal/workers/jobs/${encodeURIComponent(job.id)}/analyze-style8d`,
    {
      worker_id: WORKER_ID,
      tenant_id: tenantID,
      owner_uid: ownerUID,
      persona_id: personaID,
      threads_account_id: accountID,
      username,
      posts: posts.map((post) => ({
        text: post.text,
        permalink: post.permalink,
        like_count: post.likeCount ?? 0,
        reply_count: post.replyCount ?? 0,
      })),
      steps: toProgressSteps(steps),
    },
  )
  if (!data?.style_profile?.trim()) {
    throw new Error('Go 8D 分析未回傳 style_profile')
  }
  return data
}
|
|
|
|
/**
 * Processes one style-8d job end to end.
 *
 * Steps: load the synced browser session, scrape recent posts of the
 * benchmark account, hand them to the backend analysis endpoint, then mark
 * the job complete. A heartbeat is sent every 30 seconds while the job runs
 * and is always stopped on exit.
 *
 * @param job - The claimed job.
 * @throws Error with message `'cancelled'` when a cancel request is seen;
 *   any other error when a step fails or the payload is incomplete.
 */
async function processStyle8D(job: JobData) {
  // Trim before stripping the leading "@": with the reverse order an input
  // such as " @name" would keep its "@".
  const username = String(job.payload.benchmark_username ?? '')
    .trim()
    .replace(/^@/, '')
    .trim()
  if (!username) throw new Error('job payload 缺少 benchmark_username')

  let steps = normalizeSteps(job)
  const hb = setInterval(() => {
    heartbeat(job.id).catch((error) => console.warn(`[8d-worker] heartbeat failed: ${error}`))
  }, 30_000)

  try {
    let storageState = ''
    ;({ result: storageState, steps } = await runStep(
      job,
      steps,
      'session',
      '確認 Chrome extension session 是否已同步…',
      'Chrome session 已就緒',
      async () => {
        await heartbeat(job.id)
        const state = await getBrowserStorageState(job)
        if (!state.trim()) {
          const accountID = String(job.payload.threads_account_id ?? '').trim()
          if (!accountID) {
            throw new Error(
              '尚未選定經營帳號。請在頂部切換 Threads 帳號,到連線頁用擴充套件同步登入態後再跑 8D。',
            )
          }
          throw new Error(
            'Chrome session 尚未同步到此帳號。請到該帳號連線頁,確認 Threads 已在 Chrome 登入,再按擴充套件同步。',
          )
        }
        return state
      },
    ))

    let posts: ScrapedPost[] = []
    ;({ result: posts, steps } = await runStep(
      job,
      steps,
      'samples',
      `使用登入態抓取 @${username} 近期貼文樣本…`,
      '貼文樣本已抓取',
      async () => getProfilePosts(storageState, username, 12),
    ))

    if (posts.length < TARGET_SAMPLES) {
      throw new Error(
        `可讀取的近期貼文不足 ${TARGET_SAMPLES} 篇(目前 ${posts.length} 篇)。請確認對標帳號存在、有公開貼文,且 Chrome session 仍有效(可重新同步擴充套件)。`,
      )
    }

    const analyzingMessage = `已取得 ${posts.length} 篇,交由 Go LLM 分析八個維度…`
    steps = updateStep(steps, 'style', 'running', analyzingMessage)
    await report(job, 'style', analyzingMessage, steps, 55)
    if (await isCancelRequested(job.id)) {
      steps = updateStep(steps, 'style', 'cancelled', '使用者已取消')
      await report(job, 'style', '任務已收到取消請求', steps)
      await ackCancel(job.id)
      throw new Error('cancelled')
    }

    let analyzed: Awaited<ReturnType<typeof analyzeStyle8DWithGo>>
    try {
      analyzed = await analyzeStyle8DWithGo(job, username, posts, steps)
    } catch (error) {
      // Mirror runStep: a failed analysis must not leave the "style" step
      // stuck in "running". Reporting is best-effort so the analysis error
      // itself is what propagates.
      const message = error instanceof Error ? error.message : String(error)
      steps = updateStep(steps, 'style', 'failed', message)
      try {
        await report(job, 'style', message, steps, 55)
      } catch (reportError) {
        console.warn(`[8d-worker] failed to report step failure phase=style: ${reportError}`)
      }
      throw error
    }

    steps = updateStep(
      updateStep(steps, 'style', 'succeeded', '8D 風格策略已產生'),
      'store',
      'succeeded',
      '8D 策略已寫入人設',
    )
    await report(job, 'store', '8D 策略已寫入人設', steps, 92)

    await completeJob(job.id, {
      // Same lookup the analysis request uses, so the reported persona matches
      // the one analyzed when the payload omits persona_id.
      persona_id: job.payload.persona_id ?? job.scope_id,
      benchmark_username: username,
      post_count: analyzed.post_count,
      message: '8D style profile analyzed by Go LLM and stored',
    })
    console.log(`[8d-worker] completed job=${job.id} username=${username} posts=${posts.length}`)
  } finally {
    clearInterval(hb)
  }
}
|
|
|
|
/**
 * Worker loop: claim a job, process it, report failure, repeat forever.
 *
 * - No job available: sleep `POLL_INTERVAL_MS` and poll again.
 * - Unsupported `template_type`: the job is failed immediately.
 * - Processing error with message `'cancelled'`: logged only (the
 *   cancellation was already acknowledged where it was detected).
 * - Any other processing error: the job is reported as failed.
 * - Errors from claiming or from failure reporting: logged, then the loop
 *   sleeps `POLL_INTERVAL_MS` before retrying.
 */
async function main() {
  console.log(`[8d-worker] started id=${WORKER_ID} api=${API_BASE}`)
  while (true) {
    try {
      const job = await claimJob()
      if (!job) {
        await sleep(POLL_INTERVAL_MS)
        continue
      }
      console.log(`[8d-worker] claimed job=${job.id} template=${job.template_type}`)
      if (job.template_type !== 'style-8d') {
        await failJob(job.id, new Error(`unsupported node job template: ${job.template_type}`))
        continue
      }
      try {
        await processStyle8D(job)
      } catch (error) {
        // NOTE(review): cancellation is recognised purely by the error
        // message, so an unrelated error whose message is exactly
        // "cancelled" would be treated the same way.
        if (error instanceof Error && error.message === 'cancelled') {
          console.log(`[8d-worker] cancelled job=${job.id}`)
        } else {
          console.error(`[8d-worker] failed job=${job.id}`, error)
          // NOTE(review): job.phase is the phase recorded at claim time, not
          // necessarily the phase that failed.
          await failJob(job.id, error, job.phase)
        }
      }
    } catch (error) {
      console.error('[8d-worker] loop error', error)
      await sleep(POLL_INTERVAL_MS)
    }
  }
}
|
|
|
|
// Entry point: start the worker loop; if it ever rejects, log the error and
// exit with a non-zero status.
main().catch((error) => {
  console.error('[8d-worker] fatal', error)
  process.exit(1)
})
|