thread-master/worker/style-8d-worker.ts

377 lines
12 KiB
TypeScript

import { getProfilePosts } from './threads-profile-scraper'
/** Envelope `code` the backend uses to signal success; any other code is an error. */
const SUCCESS_CODE = 102000
/** Base URL of the backend that serves the internal worker endpoints. */
const API_BASE = process.env.HAIXUN_BACKEND_URL ?? 'http://127.0.0.1:8890'
/** Identity sent with every worker call; defaults to `<host>-style-8d-node-<pid>`. */
const WORKER_ID =
  process.env.HAIXUN_NODE_WORKER_ID ??
  `${process.env.HOSTNAME ?? 'local'}-style-8d-node-${process.pid}`
/** Shared secret sent as `X-Worker-Secret`; when empty the header is omitted. */
const WORKER_SECRET = process.env.HAIXUN_WORKER_SECRET ?? ''

/**
 * Parses a non-negative numeric setting from an environment-variable string.
 *
 * Falls back to `fallback` when the value is unset, blank, non-numeric, infinite or
 * negative. A bare `Number(raw)` would turn a typo such as `3s` into `NaN`. Used as a
 * poll delay, NaN makes setTimeout fire almost immediately (a busy loop). Used as a
 * sample threshold, `posts.length < NaN` is always false, which disables the check.
 * A blank value would likewise silently become `0`.
 *
 * @param raw - the raw environment value (possibly undefined)
 * @param fallback - value used when `raw` is missing or invalid
 * @returns the parsed number, or `fallback`
 */
function parseNonNegativeNumber(raw: string | undefined, fallback: number): number {
  if (raw === undefined || raw.trim() === '') return fallback
  const value = Number(raw)
  return Number.isFinite(value) && value >= 0 ? value : fallback
}

/** Delay between claim attempts when idle, and after a loop-level error. */
const POLL_INTERVAL_MS = parseNonNegativeNumber(process.env.HAIXUN_WORKER_POLL_MS, 3000)
/** Minimum number of scraped posts required before running the 8D analysis. */
const TARGET_SAMPLES = parseNonNegativeNumber(process.env.HAIXUN_8D_TARGET_SAMPLES, 4)
/** Envelope wrapping every backend worker response. */
type ApiEnvelope<T> = {
  /** SUCCESS_CODE on success; any other value is turned into a thrown Error by `api`. */
  code: number
  message: string
  data?: T
}

/** Lifecycle states of a single progress step. */
type StepStatus = 'pending' | 'running' | 'succeeded' | 'failed' | 'cancelled'

/** One entry of a job's progress checklist. */
type JobStep = {
  id: string
  status: StepStatus
  /** Timestamps written by this worker come from `nowNano` (epoch nanoseconds). */
  started_at?: number
  ended_at?: number
  message?: string
}

/** A job as returned by the claim endpoint. */
type JobData = {
  id: string
  template_type: string
  scope_id: string
  status: string
  phase: string
  /** Template-specific inputs, e.g. benchmark_username, persona_id, tenant_id, owner_uid. */
  payload: Record<string, unknown>
  /**
   * Last stored progress snapshot. Optional because every reader already accesses it
   * defensively (`job.progress?.…`); presumably a newly queued job has none — confirm
   * against the backend.
   */
  progress?: {
    summary: string
    percentage: number
    steps: JobStep[]
  }
}

/** A post as produced by `getProfilePosts`. */
type ScrapedPost = {
  text: string
  permalink?: string
  likeCount?: number
  replyCount?: number
}

/** Browser session returned by the threads-accounts session endpoint. */
type BrowserSessionData = {
  account_id: string
  /** Passed verbatim to the profile scraper as its storage state. */
  storage_state: string
  /** Field name as sent by the backend. */
  update_at: number
}
/**
 * Default progress percentage reported for each phase when `report` is not given one.
 * Declaration order is significant: it is also the display order of the step list below.
 */
const STEP_PROGRESS: Record<string, number> = {
  session: 10,
  samples: 40,
  style: 75,
  store: 92,
}
/**
 * Ordered ids of the steps tracked for an 8D job. Every tracked step has a default
 * percentage, so the ids are taken straight from STEP_PROGRESS (insertion order).
 */
const STYLE_STEPS = Object.keys(STEP_PROGRESS)
/**
 * Resolves with no value after roughly `ms` milliseconds.
 * Typed `Promise<void>` (instead of the inferred `Promise<unknown>`) because callers
 * only await it for the delay.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms))
}
/**
 * Current wall-clock time as Unix-epoch nanoseconds (millisecond resolution scaled up).
 * NOTE: present-day values are far above Number.MAX_SAFE_INTEGER, so the lowest digits
 * are subject to floating-point rounding; treat sub-microsecond digits as noise.
 */
function nowNano() {
  const epochMillis = Date.now()
  const nanosPerMilli = 1_000_000
  return epochMillis * nanosPerMilli
}
/**
 * Calls a backend worker endpoint and unwraps its `{ code, message, data }` envelope.
 *
 * Sends JSON, plus `X-Worker-Secret` when WORKER_SECRET is non-empty.
 *
 * @param path - endpoint path appended to API_BASE
 * @param body - request body, JSON-encoded; omitted when undefined
 * @param method - HTTP method (default POST)
 * @returns the envelope's `data` field (may be undefined)
 * @throws Error when the response is not JSON, is JSON but not an object envelope,
 *   or carries a code other than SUCCESS_CODE (using the backend's message when present)
 */
async function api<T>(path: string, body?: unknown, method = 'POST'): Promise<T | undefined> {
  const headers: Record<string, string> = { 'Content-Type': 'application/json' }
  if (WORKER_SECRET) headers['X-Worker-Secret'] = WORKER_SECRET
  const res = await fetch(`${API_BASE}${path}`, {
    method,
    headers,
    body: body === undefined ? undefined : JSON.stringify(body),
  })
  const raw = await res.text()
  let parsed: unknown
  try {
    parsed = JSON.parse(raw)
  } catch {
    throw new Error(`worker API returned non-JSON response from ${path}: HTTP ${res.status}`)
  }
  // `null` and primitives are valid JSON but not envelopes. Without this guard,
  // reading `.code` on `null` throws an opaque TypeError that hides the endpoint.
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error(`worker API returned malformed envelope from ${path}: HTTP ${res.status}`)
  }
  const json = parsed as ApiEnvelope<T>
  if (json.code !== SUCCESS_CODE) {
    throw new Error(json.message || `request failed: ${path}`)
  }
  return json.data
}
/**
 * Asks the backend for the next job for this node worker.
 * The caller (main) treats a null/undefined result as "no job available".
 */
async function claimJob() {
  const claimRequest = { worker_type: 'node', worker_id: WORKER_ID }
  return api<JobData | null>('/api/v1/internal/workers/jobs/claim', claimRequest)
}
/** Sends a heartbeat for `jobID` on behalf of this worker, requesting a 300-second TTL. */
async function heartbeat(jobID: string) {
  const endpoint = `/api/v1/internal/workers/jobs/${encodeURIComponent(jobID)}/heartbeat`
  await api(endpoint, { worker_id: WORKER_ID, ttl_seconds: 300 })
}
/**
 * Asks the backend whether a cancel has been requested for `jobID`.
 * @returns true only when the response's `cancelled` field is truthy.
 */
async function isCancelRequested(jobID: string) {
  const endpoint = `/api/v1/internal/workers/jobs/${encodeURIComponent(jobID)}/cancel-check`
  const reply = await api<{ cancelled: boolean }>(endpoint, { worker_id: WORKER_ID })
  return Boolean(reply?.cancelled)
}
/** Tells the backend this worker has observed and honoured the cancel request for `jobID`. */
async function ackCancel(jobID: string) {
  const endpoint = `/api/v1/internal/workers/jobs/${encodeURIComponent(jobID)}/cancel-ack`
  await api(endpoint, { worker_id: WORKER_ID })
}
/**
 * Marks `jobID` as failed.
 * @param error - an Error contributes its `message`; anything else is stringified
 * @param phase - optional phase name forwarded to the backend (dropped from JSON when undefined)
 */
async function failJob(jobID: string, error: unknown, phase?: string) {
  let reason: string
  if (error instanceof Error) {
    reason = error.message
  } else {
    reason = String(error)
  }
  const endpoint = `/api/v1/internal/workers/jobs/${encodeURIComponent(jobID)}/fail`
  await api(endpoint, { worker_id: WORKER_ID, error: reason, phase })
}
/** Marks `jobID` as completed and attaches `result` for the backend to store. */
async function completeJob(jobID: string, result: Record<string, unknown>) {
  const completion = { worker_id: WORKER_ID, result }
  await api(`/api/v1/internal/workers/jobs/${encodeURIComponent(jobID)}/complete`, completion)
}
/**
 * Pushes a progress snapshot (phase, summary, full step list) for `job`.
 *
 * The percentage is chosen in priority order: the explicit `percentage` argument,
 * the phase default from STEP_PROGRESS, the job's last stored percentage, and finally 0.
 */
async function report(job: JobData, phase: string, summary: string, steps: JobStep[], percentage?: number) {
  const endpoint = `/api/v1/internal/workers/jobs/${encodeURIComponent(job.id)}/progress`
  const effectivePercentage = percentage ?? STEP_PROGRESS[phase] ?? job.progress?.percentage ?? 0
  await api(endpoint, {
    worker_id: WORKER_ID,
    phase,
    summary,
    percentage: effectivePercentage,
    steps,
  })
}
/**
 * Builds the step list in STYLE_STEPS order.
 *
 * Steps already stored on the job are reused as-is; when an id appears more than once,
 * the last occurrence wins. Missing ids get a fresh `pending` entry, and stored steps
 * whose id is not in STYLE_STEPS are dropped.
 */
function normalizeSteps(job: JobData): JobStep[] {
  const storedById = new Map<string, JobStep>()
  for (const stored of job.progress?.steps ?? []) {
    storedById.set(stored.id, stored)
  }
  return STYLE_STEPS.map((id): JobStep => storedById.get(id) ?? { id, status: 'pending' })
}
/**
 * Returns a copy of `steps` in which step `id` has the new `status` and `message`.
 * Other steps are returned by reference, unchanged, and the input array is not mutated.
 *
 * Timestamps (from nowNano):
 * - `running` opens a fresh window: `started_at` = now and `ended_at` is cleared.
 *   This matters because normalizeSteps carries steps over from a previous attempt of
 *   the job; without it, a re-run step could show a stale start time and even an end
 *   time while still running.
 * - `succeeded` / `failed` / `cancelled` stamp `ended_at` = now. They keep an existing
 *   `started_at`, defaulting it to now for steps that never ran (e.g. `store`).
 * - `pending` keeps `ended_at` and defaults `started_at` to now.
 */
function updateStep(steps: JobStep[], id: string, status: StepStatus, message: string): JobStep[] {
  const at = nowNano()
  const isTerminal = status === 'succeeded' || status === 'failed' || status === 'cancelled'
  return steps.map((step) => {
    if (step.id !== id) return step
    if (status === 'running') {
      // NOTE(review): undefined fields are dropped by JSON.stringify. If the backend
      // merges step patches instead of replacing them, confirm it clears ended_at.
      return { ...step, status, message, started_at: at, ended_at: undefined }
    }
    return {
      ...step,
      status,
      message,
      started_at: step.started_at ?? at,
      ended_at: isTerminal ? at : step.ended_at,
    }
  })
}
/**
 * Runs one tracked step of a job and reports every transition.
 *
 * The sequence is:
 * 1. Mark the step running and report it.
 * 2. If a cancel was requested, mark the step cancelled ('使用者已取消', "cancelled by
 *    user"), report it, acknowledge the cancel, and throw.
 * 3. Run `fn`, then mark the step succeeded or failed.
 *
 * @param phase - the step id, also used as the reported phase
 * @param fn - the step's work
 * @returns `fn`'s result together with the updated step list
 * @throws Error('cancelled') when a cancel was requested before `fn` ran; otherwise
 *   rethrows exactly what `fn` threw
 */
async function runStep<T>(
  job: JobData,
  steps: JobStep[],
  phase: string,
  runningMessage: string,
  doneMessage: string,
  fn: () => Promise<T>,
): Promise<{ result: T; steps: JobStep[] }> {
  let nextSteps = updateStep(steps, phase, 'running', runningMessage)
  await report(job, phase, runningMessage, nextSteps)
  if (await isCancelRequested(job.id)) {
    nextSteps = updateStep(nextSteps, phase, 'cancelled', '使用者已取消')
    await report(job, phase, '任務已收到取消請求', nextSteps)
    await ackCancel(job.id)
    throw new Error('cancelled')
  }
  let result: T
  try {
    result = await fn()
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error)
    nextSteps = updateStep(nextSteps, phase, 'failed', message)
    try {
      await report(job, phase, message, nextSteps)
    } catch (reportError) {
      // A failed progress report must not replace the step's own error. The caller
      // forwards the thrown error to failJob, and that is the message users need.
      console.warn(`[8d-worker] failed to report step failure phase=${phase}: ${reportError}`)
    }
    throw error
  }
  nextSteps = updateStep(nextSteps, phase, 'succeeded', doneMessage)
  await report(job, phase, doneMessage, nextSteps)
  return { result, steps: nextSteps }
}
/**
 * Fetches the Chrome-extension-synced browser storage state for the job's Threads account.
 *
 * @returns the storage state string, or '' when the payload lacks threads_account_id /
 *   tenant_id / owner_uid, when the backend returns no state, or when the request fails
 *   (the failure is logged as a warning rather than thrown)
 */
async function getBrowserStorageState(job: JobData) {
  const readId = (key: string) => String(job.payload[key] ?? '').trim()
  const accountID = readId('threads_account_id')
  const tenantID = readId('tenant_id')
  const ownerUID = readId('owner_uid')
  if (!(accountID && tenantID && ownerUID)) return ''

  const sessionPath = `/api/v1/internal/workers/threads-accounts/${encodeURIComponent(accountID)}/session`
  try {
    const session = await api<BrowserSessionData>(sessionPath, { tenant_id: tenantID, owner_uid: ownerUID })
    return session?.storage_state ?? ''
  } catch (error) {
    console.warn(`[8d-worker] browser session unavailable, fallback to public page: ${error}`)
    return ''
  }
}
/**
 * Projects steps onto exactly the five wire fields (id, status, started_at, ended_at,
 * message) and drops any extra properties. Keys are always present, possibly undefined.
 */
function toProgressSteps(steps: JobStep[]) {
  return steps.map(({ id, status, started_at, ended_at, message }) => ({
    id,
    status,
    started_at,
    ended_at,
    message,
  }))
}
/**
 * Sends the scraped posts to the backend's analyze-style8d endpoint and returns its result.
 *
 * The persona id comes from payload.persona_id, falling back to job.scope_id. Missing
 * like/reply counts are sent as 0.
 *
 * @throws Error if persona/tenant/owner/account ids are missing, or if the response has
 *   no non-blank style_profile
 */
async function analyzeStyle8DWithGo(job: JobData, username: string, posts: ScrapedPost[], steps: JobStep[]) {
  const fromPayload = (key: string) => String(job.payload[key] ?? '').trim()
  const personaID = String(job.payload.persona_id ?? job.scope_id ?? '').trim()
  const tenantID = fromPayload('tenant_id')
  const ownerUID = fromPayload('owner_uid')
  const accountID = fromPayload('threads_account_id')
  const missingIdentity = [personaID, tenantID, ownerUID, accountID].some((value) => !value)
  if (missingIdentity) {
    throw new Error('job payload 缺少 persona_id / tenant_id / owner_uid / threads_account_id')
  }

  const samplePayload = posts.map(({ text, permalink, likeCount, replyCount }) => ({
    text,
    permalink,
    like_count: likeCount ?? 0,
    reply_count: replyCount ?? 0,
  }))
  const endpoint = `/api/v1/internal/workers/jobs/${encodeURIComponent(job.id)}/analyze-style8d`
  const analysis = await api<{
    persona_id: string
    post_count: number
    style_profile: string
    style_benchmark: string
  }>(endpoint, {
    worker_id: WORKER_ID,
    tenant_id: tenantID,
    owner_uid: ownerUID,
    persona_id: personaID,
    threads_account_id: accountID,
    username,
    posts: samplePayload,
    steps: toProgressSteps(steps),
  })
  if (!analysis?.style_profile?.trim()) {
    throw new Error('Go 8D 分析未回傳 style_profile')
  }
  return analysis
}
/**
 * Executes one `style-8d` job end to end. The steps are:
 * 1. session — load the synced Chrome session (required).
 * 2. samples — scrape up to 12 recent posts of the benchmark account and require at
 *    least TARGET_SAMPLES of them.
 * 3. style — ask the Go backend to derive the 8D style profile.
 * 4. store — report completion.
 *
 * A heartbeat is sent every 30 s while the job runs.
 *
 * @throws Error('cancelled') when a cancel request is seen between steps; any other error
 *   means the job failed and is left for the caller to report via failJob
 */
async function processStyle8D(job: JobData) {
  const username = String(job.payload.benchmark_username ?? '').replace(/^@/, '').trim()
  if (!username) throw new Error('job payload 缺少 benchmark_username')
  let steps = normalizeSteps(job)
  // Keep refreshing the job's heartbeat while scraping / analysis run.
  const hb = setInterval(() => {
    heartbeat(job.id).catch((error) => console.warn(`[8d-worker] heartbeat failed: ${error}`))
  }, 30_000)
  try {
    // Step 1: a synced browser session is mandatory. The error explains how to sync one.
    const session = await runStep(
      job,
      steps,
      'session',
      '確認 Chrome extension session 是否已同步…',
      'Chrome session 已就緒',
      async () => {
        await heartbeat(job.id)
        const state = await getBrowserStorageState(job)
        if (!state.trim()) {
          const accountID = String(job.payload.threads_account_id ?? '').trim()
          if (!accountID) {
            throw new Error(
              '尚未選定經營帳號。請在頂部切換 Threads 帳號,到連線頁用擴充套件同步登入態後再跑 8D。',
            )
          }
          throw new Error(
            'Chrome session 尚未同步到此帳號。請到該帳號連線頁,確認 Threads 已在 Chrome 登入,再按擴充套件同步。',
          )
        }
        return state
      },
    )
    steps = session.steps
    const storageState = session.result

    // Step 2: the minimum-count check runs inside the step. A short sample therefore
    // marks `samples` as failed, instead of reporting it succeeded just before the job fails.
    const samples = await runStep(
      job,
      steps,
      'samples',
      `使用登入態抓取 @${username} 近期貼文樣本…`,
      '貼文樣本已抓取',
      async (): Promise<ScrapedPost[]> => {
        const scraped: ScrapedPost[] = await getProfilePosts(storageState, username, 12)
        if (scraped.length < TARGET_SAMPLES) {
          throw new Error(
            `可讀取的近期貼文不足 ${TARGET_SAMPLES} 篇(目前 ${scraped.length} 篇)。請確認對標帳號存在、有公開貼文,且 Chrome session 仍有效(可重新同步擴充套件)。`,
          )
        }
        return scraped
      },
    )
    steps = samples.steps
    const posts = samples.result

    // Step 3: handled inline (not via runStep) because it reports a custom 55% and the
    // Go endpoint also receives the current step list.
    const styleMessage = `已取得 ${posts.length} 篇,交由 Go LLM 分析八個維度…`
    steps = updateStep(steps, 'style', 'running', styleMessage)
    await report(job, 'style', styleMessage, steps, 55)
    if (await isCancelRequested(job.id)) {
      steps = updateStep(steps, 'style', 'cancelled', '使用者已取消')
      await report(job, 'style', '任務已收到取消請求', steps)
      await ackCancel(job.id)
      throw new Error('cancelled')
    }
    let analyzed: Awaited<ReturnType<typeof analyzeStyle8DWithGo>>
    try {
      analyzed = await analyzeStyle8DWithGo(job, username, posts, steps)
    } catch (error) {
      // Mirror runStep: record the failure on the `style` step so it is not left
      // "running". Do not let a failed report mask the analysis error.
      const message = error instanceof Error ? error.message : String(error)
      steps = updateStep(steps, 'style', 'failed', message)
      try {
        await report(job, 'style', message, steps, 55)
      } catch (reportError) {
        console.warn(`[8d-worker] failed to report style failure: ${reportError}`)
      }
      throw error
    }

    // Step 4: mark analysis and storage done, then complete the job.
    steps = updateStep(
      updateStep(steps, 'style', 'succeeded', '8D 風格策略已產生'),
      'store',
      'succeeded',
      '8D 策略已寫入人設',
    )
    await report(job, 'store', '8D 策略已寫入人設', steps, 92)
    await completeJob(job.id, {
      // Prefer the persona id the backend reports. payload.persona_id is absent when
      // analyzeStyle8DWithGo resolved the persona from job.scope_id.
      persona_id: analyzed.persona_id || job.payload.persona_id,
      benchmark_username: username,
      post_count: analyzed.post_count,
      message: '8D style profile analyzed by Go LLM and stored',
    })
    console.log(`[8d-worker] completed job=${job.id} username=${username} posts=${posts.length}`)
  } finally {
    clearInterval(hb)
  }
}
/**
 * Worker entry point: polls the backend for jobs forever.
 *
 * `style-8d` jobs are executed; any other template is failed immediately. A job that
 * throws Error('cancelled') is only logged, and other job errors are reported via
 * failJob. Loop-level errors (including a failing failJob) are logged, then the loop
 * sleeps POLL_INTERVAL_MS and retries.
 */
async function main() {
  console.log(`[8d-worker] started id=${WORKER_ID} api=${API_BASE}`)

  // Handles one claimed job; errors thrown from failJob propagate to the poll loop.
  const handleClaimed = async (job: JobData) => {
    console.log(`[8d-worker] claimed job=${job.id} template=${job.template_type}`)
    if (job.template_type !== 'style-8d') {
      await failJob(job.id, new Error(`unsupported node job template: ${job.template_type}`))
      return
    }
    try {
      await processStyle8D(job)
    } catch (error) {
      const wasCancelled = error instanceof Error && error.message === 'cancelled'
      if (wasCancelled) {
        console.log(`[8d-worker] cancelled job=${job.id}`)
        return
      }
      console.error(`[8d-worker] failed job=${job.id}`, error)
      await failJob(job.id, error, job.phase)
    }
  }

  for (;;) {
    try {
      const job = await claimJob()
      if (job) {
        await handleClaimed(job)
      } else {
        await sleep(POLL_INTERVAL_MS)
      }
    } catch (error) {
      console.error('[8d-worker] loop error', error)
      await sleep(POLL_INTERVAL_MS)
    }
  }
}
// Start the worker. Any rejection escaping main() is fatal: log it and exit with status 1.
void main().catch((fatal: unknown) => {
  console.error('[8d-worker] fatal', fatal)
  process.exit(1)
})