haixunMaster/haixun-backend/internal/worker/job/scan_placement.go

366 lines
10 KiB
Go
Raw Permalink Normal View History

2026-06-24 10:02:42 +00:00
package job
import (
"context"
"fmt"
"strings"
2026-06-24 16:48:56 +00:00
app "haixun-backend/internal/library/errors"
"haixun-backend/internal/library/errors/code"
2026-06-24 10:02:42 +00:00
libkg "haixun-backend/internal/library/knowledge"
"haixun-backend/internal/library/placement"
2026-06-25 08:20:03 +00:00
"haixun-backend/internal/library/websearch"
2026-06-24 10:02:42 +00:00
branddomain "haixun-backend/internal/model/brand/domain/usecase"
jobdom "haixun-backend/internal/model/job/domain/usecase"
kgusecase "haixun-backend/internal/model/knowledge_graph/domain/usecase"
placementusecase "haixun-backend/internal/model/placement/usecase"
2026-06-25 08:20:03 +00:00
placementtopicdomain "haixun-backend/internal/model/placement_topic/domain/usecase"
2026-06-24 10:02:42 +00:00
scanpostusecase "haixun-backend/internal/model/scan_post/domain/usecase"
threadsaccountdomain "haixun-backend/internal/model/threads_account/domain/usecase"
)
type ScanPlacementDeps struct {
Jobs jobdom.UseCase
Brand branddomain.UseCase
2026-06-24 17:30:47 +00:00
PlacementTopic placementtopicdomain.UseCase
2026-06-24 10:02:42 +00:00
KnowledgeGraph kgusecase.UseCase
ScanPost scanpostusecase.UseCase
ThreadsAccount threadsaccountdomain.UseCase
Placement placementusecase.UseCase
}
// RegisterScanPlacementHandler installs the placement scan as the runner's
// handler for the "crawl" step. A nil runner is silently ignored.
func RegisterScanPlacementHandler(runner *Runner, deps ScanPlacementDeps) {
	if runner != nil {
		runner.RegisterStepHandler("crawl", func(stepCtx context.Context, current StepContext) error {
			// deps is captured by value so every invocation sees the same wiring.
			return runScanPlacement(stepCtx, current, deps)
		})
	}
}
func runScanPlacement(ctx context.Context, step StepContext, deps ScanPlacementDeps) error {
payload := step.Run.Payload
tenantID := stringField(payload, "tenant_id")
ownerUID := stringField(payload, "owner_uid")
brandID := brandIDFromPayload(payload)
graphID := stringField(payload, "graph_id")
if tenantID == "" || ownerUID == "" || brandID == "" {
return fmt.Errorf("placement-scan payload missing tenant_id, owner_uid, or brand_id")
}
2026-06-24 16:48:56 +00:00
patrolMode := boolField(payload, "patrol_mode")
brand, brandErr := deps.Brand.Get(ctx, tenantID, ownerUID, brandID)
if brandErr != nil {
return brandErr
2026-06-24 10:02:42 +00:00
}
2026-06-24 16:48:56 +00:00
if brand == nil {
return fmt.Errorf("brand not found")
}
2026-06-24 17:30:47 +00:00
topicID := topicIDFromPayload(payload)
if topicID == "" && step.Run != nil && strings.TrimSpace(step.Run.Scope) == "placement_topic" {
topicID = strings.TrimSpace(step.Run.ScopeID)
}
var graphSummary *kgusecase.GraphSummary
var graphErr error
if topicID != "" {
2026-06-25 08:20:03 +00:00
graphSummary, graphErr = deps.KnowledgeGraph.GetByTopic(ctx, tenantID, ownerUID, topicID, brandID)
2026-06-24 17:30:47 +00:00
} else {
graphSummary, graphErr = deps.KnowledgeGraph.Get(ctx, tenantID, ownerUID, brandID)
}
2026-06-24 16:48:56 +00:00
if graphErr != nil {
if patrolMode && isKnowledgeGraphNotFound(graphErr) {
2026-06-24 17:30:47 +00:00
graphSummary = nil
2026-06-24 16:48:56 +00:00
if graphID == "" {
2026-06-24 17:30:47 +00:00
if topicID != "" {
graphID = topicID
} else {
graphID = brandID
}
2026-06-24 16:48:56 +00:00
}
} else {
return graphErr
}
} else if graphID == "" {
2026-06-24 17:30:47 +00:00
graphID = graphSummary.ID
}
researchMap := brand.ResearchMap
if topicID != "" && deps.PlacementTopic != nil {
if topic, topicErr := deps.PlacementTopic.Get(ctx, tenantID, ownerUID, topicID); topicErr == nil && topic != nil {
researchMap = topic.ResearchMap
brand.ProductID = topic.ProductID
}
2026-06-24 10:02:42 +00:00
}
2026-06-24 16:48:56 +00:00
patrolKeywords := []string{}
if patrolMode {
2026-06-24 17:30:47 +00:00
productBrief := strings.TrimSpace(brand.ProductBrief)
if formatted := placement.ProductBriefFromContext(brand.ProductContext); formatted != "" {
productBrief = formatted
2026-06-24 16:48:56 +00:00
}
2026-06-24 17:30:47 +00:00
patrolNodes := []libkg.Node{}
if graphSummary != nil {
patrolNodes = graphSummary.Nodes
}
patrolKeywords = libkg.ResolveScanPatrolKeywords(
stringSliceField(payload, "patrol_keywords"),
researchMap.PatrolKeywords,
libkg.PatrolTagInputFromBrand(brand, productBrief),
patrolNodes,
)
2026-06-24 16:48:56 +00:00
if len(patrolKeywords) == 0 {
return fmt.Errorf("請先在研究地圖填寫要回覆的海巡關鍵字")
}
}
nodes := []libkg.Node{}
2026-06-24 17:30:47 +00:00
if graphSummary != nil {
nodes = graphSummary.Nodes
2026-06-24 10:02:42 +00:00
}
2026-06-24 16:48:56 +00:00
if !patrolMode {
2026-06-24 17:30:47 +00:00
if graphSummary == nil {
2026-06-24 16:48:56 +00:00
return fmt.Errorf("請先產生延伸知識圖譜,或改用手動海巡關鍵字")
}
if ids := stringSliceField(payload, "node_ids"); len(ids) > 0 {
2026-06-24 17:30:47 +00:00
nodes = filterNodesByIDs(graphSummary.Nodes, ids)
2026-06-24 16:48:56 +00:00
} else {
2026-06-24 17:30:47 +00:00
nodes = selectedNodes(graphSummary.Nodes)
2026-06-24 16:48:56 +00:00
}
if len(nodes) == 0 {
2026-06-24 17:30:47 +00:00
return fmt.Errorf("請先勾選要海巡的節點並儲存")
2026-06-24 16:48:56 +00:00
}
2026-06-24 10:02:42 +00:00
}
research, err := deps.Placement.ResearchSettings(ctx, tenantID, ownerUID)
if err != nil {
return err
}
memberCtx, err := deps.ThreadsAccount.ResolveMemberPlacementContext(ctx, tenantID, ownerUID, research)
if err != nil {
return err
}
if !memberCtx.AllowsBrave && !memberCtx.AllowsThreadsAPI && !memberCtx.AllowsCrawler {
return fmt.Errorf("目前連線模式無法海巡,請確認 Threads API、Brave 或 Chrome Session 設定")
}
2026-06-25 08:20:03 +00:00
if placement.MemberNeedsWebSearchKey(memberCtx) && strings.TrimSpace(memberCtx.WebSearchAPIKey()) == "" {
return fmt.Errorf("%s", placement.WebSearchKeyRequiredMessage(placement.ResearchSettings{
WebSearchProvider: memberCtx.WebSearchProvider,
BraveAPIKey: memberCtx.BraveAPIKey,
ExaAPIKey: memberCtx.ExaAPIKey,
}))
2026-06-24 10:02:42 +00:00
}
if memberCtx.DevMode && !memberCtx.BrowserConnected {
return fmt.Errorf("開發模式需先同步 Chrome Session")
}
2026-06-24 16:48:56 +00:00
if !memberCtx.DevMode && memberCtx.AllowsThreadsAPI && !memberCtx.ApiConnected {
return fmt.Errorf("正式模式需先完成 Threads API 連線")
}
2026-06-24 10:02:42 +00:00
updateProgress := func(summary string, percentage int) {
_ = step.Heartbeat(ctx)
_, _ = deps.Jobs.UpdateProgress(ctx, jobdom.UpdateProgressRequest{
JobID: step.JobID,
WorkerID: step.WorkerID,
Phase: "crawl",
Summary: summary,
Percentage: percentage,
})
}
2026-06-24 17:30:47 +00:00
exclusions := append([]string{}, researchMap.Exclusions...)
2026-06-24 16:48:56 +00:00
if len(patrolKeywords) > 0 {
updateProgress(fmt.Sprintf("依 %d 組海巡關鍵字準備雙軌搜尋…", len(patrolKeywords)), 5)
} else {
updateProgress("準備置入海巡…", 5)
2026-06-24 10:02:42 +00:00
}
2026-06-24 17:30:47 +00:00
if err := deps.ScanPost.ClearPlacementScan(ctx, tenantID, ownerUID, brandID, topicID); err != nil {
2026-06-24 16:48:56 +00:00
return err
}
2026-06-24 10:02:42 +00:00
2026-06-25 08:20:03 +00:00
webClient := websearch.New(memberCtx.WebSearchConfig())
2026-06-24 16:48:56 +00:00
crawlerFn := placement.WrapPoliteCrawler(makeCrawlerSearchFn(deps, tenantID, ownerUID))
graphNodes := []libkg.Node{}
2026-06-24 17:30:47 +00:00
if graphSummary != nil {
graphNodes = graphSummary.Nodes
2026-06-24 16:48:56 +00:00
}
checkpointReq := scanpostusecase.CheckpointRequest{
TenantID: tenantID,
OwnerUID: ownerUID,
BrandID: brandID,
2026-06-24 17:30:47 +00:00
TopicID: topicID,
2026-06-24 16:48:56 +00:00
GraphID: graphID,
ScanJobID: step.JobID,
}
2026-06-24 10:02:42 +00:00
candidates, err := placement.RunDualTrackDiscover(ctx, placement.DualTrackInput{
2026-06-24 16:48:56 +00:00
Nodes: graphNodes,
PatrolKeywords: patrolKeywords,
Exclusions: exclusions,
Member: memberCtx,
2026-06-25 08:20:03 +00:00
WebSearch: webClient,
2026-06-24 16:48:56 +00:00
Crawler: crawlerFn,
OnCheckpoint: func(batch []placement.ScanCandidate) error {
if len(batch) == 0 {
return nil
}
checkpointReq.Posts = batch
saved, err := deps.ScanPost.UpsertScanCheckpoint(ctx, checkpointReq)
if err != nil {
return err
}
updateProgress(fmt.Sprintf("已儲存 %d 篇候選貼文(邊爬邊存)", saved), -1)
return nil
},
2026-06-24 10:02:42 +00:00
}, updateProgress)
if err != nil {
return err
}
scrapeReplies := memberCtx.ScrapeReplies
if v, ok := payload["scrape_replies"].(bool); ok {
scrapeReplies = v
} else if memberCtx.ApiConnected && strings.TrimSpace(memberCtx.ThreadsAPIAccessToken) != "" {
// Formal Threads API mode can fetch replies without browser session.
scrapeReplies = true
}
if scrapeReplies {
updateProgress("抓取高優先貼文留言…", 88)
candidates = placement.AttachReplies(ctx, placement.ScrapeRepliesInput{
Posts: candidates,
Member: memberCtx,
RepliesPerPost: memberCtx.RepliesPerPost,
})
}
2026-06-24 16:48:56 +00:00
updateProgress(fmt.Sprintf("整理 %d 篇海巡結果…", len(candidates)), 92)
count, err := deps.ScanPost.FinalizeScan(ctx, scanpostusecase.ReplaceRequest{
2026-06-24 10:02:42 +00:00
TenantID: tenantID,
OwnerUID: ownerUID,
BrandID: brandID,
2026-06-24 17:30:47 +00:00
TopicID: topicID,
2026-06-24 10:02:42 +00:00
GraphID: graphID,
ScanJobID: step.JobID,
Posts: candidates,
})
if err != nil {
return err
}
gold := 0
recent := 0
solved := 0
replyCount := 0
for _, item := range candidates {
replyCount += len(item.Replies)
if item.Priority == "gold" {
gold++
}
if item.Priority == "gold" || item.Priority == "recent" {
recent++
}
if item.SolvedByProduct {
solved++
}
}
handoff := map[string]any{
"flow": "placement",
"brand_id": brandID,
"summary": fmt.Sprintf(
"雙軌海巡完成:%d 篇gold %d、近期軌 %d、產品可解 %d",
count, gold, recent, solved,
),
"pain_breakdown": map[string]any{
"posts": count,
"gold": gold,
"recent_7d": recent,
"solved_by_prod": solved,
"replies": replyCount,
},
"next_route": "/outreach?brand=" + brandID,
"needs_supplemental_expand": false,
"search_source_mode": string(memberCtx.SearchSourceMode),
"dev_mode": memberCtx.DevMode,
}
_, err = deps.Jobs.CompleteRun(ctx, jobdom.CompleteRunRequest{
JobID: step.JobID,
WorkerID: step.WorkerID,
Result: map[string]any{
"post_count": count,
"gold_count": gold,
"recent_count": recent,
"solved_count": solved,
"reply_count": replyCount,
"search_source_mode": string(memberCtx.SearchSourceMode),
"handoff": handoff,
},
})
return err
}
// selectedNodes returns, in their original order, the nodes whose
// SelectedForScan flag is set. The result is never nil.
func selectedNodes(nodes []libkg.Node) []libkg.Node {
	picked := make([]libkg.Node, 0, len(nodes))
	for i := range nodes {
		if !nodes[i].SelectedForScan {
			continue
		}
		picked = append(picked, nodes[i])
	}
	return picked
}
// filterNodesByIDs returns, in their original order, the nodes whose ID
// appears in ids. Each id is whitespace-trimmed before matching and blank ids
// are ignored. The result is never nil.
func filterNodesByIDs(nodes []libkg.Node, ids []string) []libkg.Node {
	wanted := map[string]struct{}{}
	for _, raw := range ids {
		if trimmed := strings.TrimSpace(raw); trimmed != "" {
			wanted[trimmed] = struct{}{}
		}
	}

	matched := make([]libkg.Node, 0, len(ids))
	for i := range nodes {
		if _, hit := wanted[nodes[i].ID]; !hit {
			continue
		}
		matched = append(matched, nodes[i])
	}
	return matched
}
2026-06-24 16:48:56 +00:00
// isKnowledgeGraphNotFound reports whether err denotes a missing knowledge
// graph: either an application error in the ResNotFound category, or any error
// whose message contains "knowledge graph not found" (case-insensitive).
// A nil error is never "not found".
func isKnowledgeGraphNotFound(err error) bool {
	if err == nil {
		return false
	}
	if typed := app.FromError(err); typed != nil {
		if typed.Category() == code.ResNotFound {
			return true
		}
	}
	// Fall back to the message text for errors without a usable category.
	lowered := strings.ToLower(err.Error())
	return strings.Contains(lowered, "knowledge graph not found")
}
2026-06-24 10:02:42 +00:00
// stringSliceField reads payload[key] as a list of strings.
//
// A []string value is returned as-is. A []any value yields a new, non-nil
// slice holding only its string elements; other elements are dropped. A nil
// payload, a missing key, a nil value, or any other value type yields nil.
func stringSliceField(payload map[string]any, key string) []string {
	// Indexing a nil map is safe and reports the key as absent.
	value, present := payload[key]
	if !present {
		return nil
	}
	if direct, isStrings := value.([]string); isStrings {
		return direct
	}
	mixed, isList := value.([]any)
	if !isList {
		return nil
	}
	collected := make([]string, 0, len(mixed))
	for i := range mixed {
		if text, isString := mixed[i].(string); isString {
			collected = append(collected, text)
		}
	}
	return collected
}