thread-master/internal/worker/job/scan_placement.go

366 lines
10 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package job
import (
"context"
"fmt"
"strings"
app "haixun-backend/internal/library/errors"
"haixun-backend/internal/library/errors/code"
libkg "haixun-backend/internal/library/knowledge"
"haixun-backend/internal/library/placement"
"haixun-backend/internal/library/websearch"
branddomain "haixun-backend/internal/model/brand/domain/usecase"
jobdom "haixun-backend/internal/model/job/domain/usecase"
kgusecase "haixun-backend/internal/model/knowledge_graph/domain/usecase"
placementusecase "haixun-backend/internal/model/placement/usecase"
placementtopicdomain "haixun-backend/internal/model/placement_topic/domain/usecase"
scanpostusecase "haixun-backend/internal/model/scan_post/domain/usecase"
threadsaccountdomain "haixun-backend/internal/model/threads_account/domain/usecase"
)
// ScanPlacementDeps bundles the use cases that the placement scan step
// (runScanPlacement) calls while processing a job.
type ScanPlacementDeps struct {
// Jobs receives progress updates and the final run completion.
Jobs jobdom.UseCase
// Brand loads the brand named by the job payload.
Brand branddomain.UseCase
// PlacementTopic loads the topic when a topic ID is present. It is the only
// dependency that runScanPlacement nil-checks before use.
PlacementTopic placementtopicdomain.UseCase
// KnowledgeGraph resolves the graph for the brand, or for the topic when a
// topic ID is present.
KnowledgeGraph kgusecase.UseCase
// ScanPost clears the previous scan, stores checkpoint batches and
// finalizes the scan result.
ScanPost scanpostusecase.UseCase
// ThreadsAccount resolves the member's placement context from the research
// settings.
ThreadsAccount threadsaccountdomain.UseCase
// Placement provides the research settings for the tenant/owner.
Placement placementusecase.UseCase
}
// RegisterScanPlacementHandler installs the placement scan as the runner's
// handler for the "crawl" step, binding deps into the handler closure.
// A nil runner is ignored.
func RegisterScanPlacementHandler(runner *Runner, deps ScanPlacementDeps) {
	if runner != nil {
		crawl := func(ctx context.Context, step StepContext) error {
			return runScanPlacement(ctx, step, deps)
		}
		runner.RegisterStepHandler("crawl", crawl)
	}
}
// runScanPlacement executes the "crawl" step of a placement scan job.
//
// Steps, in order:
//  1. Read tenant_id, owner_uid, brand ID, graph_id and patrol_mode from the
//     run payload and load the brand.
//  2. Resolve the knowledge graph (by topic when a topic ID is known). In
//     patrol mode a missing graph is tolerated; otherwise it is an error.
//  3. In patrol mode resolve the patrol keywords; otherwise require at least
//     one node chosen via payload "node_ids" or the nodes' SelectedForScan flag.
//  4. Load research settings, resolve the member placement context and check
//     that the connection mode can actually scan.
//  5. Clear the previous scan, run dual-track discovery (saving checkpoint
//     batches as they arrive), optionally attach replies, finalize the scan.
//  6. Complete the job run with counters and a handoff payload.
//
// It returns the first error encountered. Progress/heartbeat updates are
// best-effort and never fail the step.
func runScanPlacement(ctx context.Context, step StepContext, deps ScanPlacementDeps) error {
	// The payload and scope both live on step.Run, so a missing run must be
	// rejected before the first dereference instead of panicking.
	if step.Run == nil {
		return fmt.Errorf("placement-scan step has no run attached")
	}
	payload := step.Run.Payload
	tenantID := stringField(payload, "tenant_id")
	ownerUID := stringField(payload, "owner_uid")
	brandID := brandIDFromPayload(payload)
	graphID := stringField(payload, "graph_id")
	if tenantID == "" || ownerUID == "" || brandID == "" {
		return fmt.Errorf("placement-scan payload missing tenant_id, owner_uid, or brand_id")
	}
	patrolMode := boolField(payload, "patrol_mode")

	brand, brandErr := deps.Brand.Get(ctx, tenantID, ownerUID, brandID)
	if brandErr != nil {
		return brandErr
	}
	if brand == nil {
		return fmt.Errorf("brand not found")
	}

	// Topic ID comes from the payload, falling back to the run scope when the
	// run is scoped to a placement topic.
	topicID := topicIDFromPayload(payload)
	if topicID == "" && strings.TrimSpace(step.Run.Scope) == "placement_topic" {
		topicID = strings.TrimSpace(step.Run.ScopeID)
	}

	var graphSummary *kgusecase.GraphSummary
	var graphErr error
	if topicID != "" {
		graphSummary, graphErr = deps.KnowledgeGraph.GetByTopic(ctx, tenantID, ownerUID, topicID, brandID)
	} else {
		graphSummary, graphErr = deps.KnowledgeGraph.Get(ctx, tenantID, ownerUID, brandID)
	}
	if graphErr != nil {
		// Only patrol mode may proceed without a graph, and only when the
		// failure is a "not found".
		if !patrolMode || !isKnowledgeGraphNotFound(graphErr) {
			return graphErr
		}
		graphSummary = nil
	}
	if graphID == "" {
		// Prefer the resolved graph's ID; without a graph fall back to the
		// topic ID, then the brand ID. The nil check also covers a lookup that
		// returns (nil, nil), which the later nil checks already anticipate.
		switch {
		case graphSummary != nil:
			graphID = graphSummary.ID
		case topicID != "":
			graphID = topicID
		default:
			graphID = brandID
		}
	}

	// All nodes of the resolved graph; empty when no graph is available.
	graphNodes := []libkg.Node{}
	if graphSummary != nil {
		graphNodes = graphSummary.Nodes
	}

	// A topic, when it can be loaded, overrides the brand's research map and
	// product ID. A failed topic lookup is deliberately ignored here.
	researchMap := brand.ResearchMap
	if topicID != "" && deps.PlacementTopic != nil {
		if topic, topicErr := deps.PlacementTopic.Get(ctx, tenantID, ownerUID, topicID); topicErr == nil && topic != nil {
			researchMap = topic.ResearchMap
			brand.ProductID = topic.ProductID
		}
	}

	patrolKeywords := []string{}
	if patrolMode {
		// The formatted product context, when non-empty, replaces the plain
		// product brief.
		productBrief := strings.TrimSpace(brand.ProductBrief)
		if formatted := placement.ProductBriefFromContext(brand.ProductContext); formatted != "" {
			productBrief = formatted
		}
		patrolKeywords = libkg.ResolveScanPatrolKeywords(
			stringSliceField(payload, "patrol_keywords"),
			researchMap.PatrolKeywords,
			libkg.PatrolTagInputFromBrand(brand, productBrief),
			graphNodes,
		)
		if len(patrolKeywords) == 0 {
			return fmt.Errorf("請先在研究地圖填寫要回覆的海巡關鍵字")
		}
	} else {
		if graphSummary == nil {
			return fmt.Errorf("請先產生延伸知識圖譜,或改用手動海巡關鍵字")
		}
		// Explicit node_ids in the payload take precedence over the nodes'
		// SelectedForScan flag.
		var picked []libkg.Node
		if ids := stringSliceField(payload, "node_ids"); len(ids) > 0 {
			picked = filterNodesByIDs(graphSummary.Nodes, ids)
		} else {
			picked = selectedNodes(graphSummary.Nodes)
		}
		if len(picked) == 0 {
			return fmt.Errorf("請先勾選要海巡的節點並儲存")
		}
		// NOTE(review): the selection above is only validated; discovery below
		// still receives every graph node. Confirm whether
		// RunDualTrackDiscover is expected to apply the selection itself or
		// whether the picked nodes should be passed instead.
	}

	research, err := deps.Placement.ResearchSettings(ctx, tenantID, ownerUID)
	if err != nil {
		return err
	}
	memberCtx, err := deps.ThreadsAccount.ResolveMemberPlacementContext(ctx, tenantID, ownerUID, research)
	if err != nil {
		return err
	}
	if !memberCtx.AllowsBrave && !memberCtx.AllowsThreadsAPI && !memberCtx.AllowsCrawler {
		return fmt.Errorf("目前連線模式無法海巡,請確認 Threads API、Brave 或 Chrome Session 設定")
	}
	if placement.MemberNeedsWebSearchKey(memberCtx) && strings.TrimSpace(memberCtx.WebSearchAPIKey()) == "" {
		return fmt.Errorf("%s", placement.WebSearchKeyRequiredMessage(placement.ResearchSettings{
			WebSearchProvider: memberCtx.WebSearchProvider,
			BraveAPIKey:       memberCtx.BraveAPIKey,
			ExaAPIKey:         memberCtx.ExaAPIKey,
		}))
	}
	if memberCtx.DevMode && !memberCtx.BrowserConnected {
		return fmt.Errorf("開發模式需先同步 Chrome Session")
	}
	if !memberCtx.DevMode && memberCtx.AllowsThreadsAPI && !memberCtx.ApiConnected {
		return fmt.Errorf("正式模式需先完成 Threads API 連線")
	}

	// updateProgress sends a heartbeat and a progress update. Both results are
	// intentionally discarded: progress reporting is best-effort and must not
	// abort the scan.
	updateProgress := func(summary string, percentage int) {
		_ = step.Heartbeat(ctx)
		_, _ = deps.Jobs.UpdateProgress(ctx, jobdom.UpdateProgressRequest{
			JobID:      step.JobID,
			WorkerID:   step.WorkerID,
			Phase:      "crawl",
			Summary:    summary,
			Percentage: percentage,
		})
	}

	// Copy the exclusions so the research map's slice is never aliased.
	exclusions := append([]string{}, researchMap.Exclusions...)
	if len(patrolKeywords) > 0 {
		updateProgress(fmt.Sprintf("依 %d 組海巡關鍵字準備雙軌搜尋…", len(patrolKeywords)), 5)
	} else {
		updateProgress("準備置入海巡…", 5)
	}

	if err := deps.ScanPost.ClearPlacementScan(ctx, tenantID, ownerUID, brandID, topicID); err != nil {
		return err
	}

	webClient := websearch.New(memberCtx.WebSearchConfig())
	crawlerFn := placement.WrapPoliteCrawler(makeCrawlerSearchFn(deps, tenantID, ownerUID))
	checkpointReq := scanpostusecase.CheckpointRequest{
		TenantID:  tenantID,
		OwnerUID:  ownerUID,
		BrandID:   brandID,
		TopicID:   topicID,
		GraphID:   graphID,
		ScanJobID: step.JobID,
	}
	candidates, err := placement.RunDualTrackDiscover(ctx, placement.DualTrackInput{
		Nodes:          graphNodes,
		PatrolKeywords: patrolKeywords,
		Exclusions:     exclusions,
		Member:         memberCtx,
		WebSearch:      webClient,
		Crawler:        crawlerFn,
		// Persist each non-empty batch as it is discovered.
		OnCheckpoint: func(batch []placement.ScanCandidate) error {
			if len(batch) == 0 {
				return nil
			}
			checkpointReq.Posts = batch
			saved, err := deps.ScanPost.UpsertScanCheckpoint(ctx, checkpointReq)
			if err != nil {
				return err
			}
			// NOTE(review): -1 presumably means "leave the percentage
			// unchanged" — confirm against Jobs.UpdateProgress.
			updateProgress(fmt.Sprintf("已儲存 %d 篇候選貼文(邊爬邊存)", saved), -1)
			return nil
		},
	}, updateProgress)
	if err != nil {
		return err
	}

	// An explicit boolean in the payload wins; otherwise a connected Threads
	// API with a token turns reply scraping on.
	scrapeReplies := memberCtx.ScrapeReplies
	if v, ok := payload["scrape_replies"].(bool); ok {
		scrapeReplies = v
	} else if memberCtx.ApiConnected && strings.TrimSpace(memberCtx.ThreadsAPIAccessToken) != "" {
		// Formal Threads API mode can fetch replies without browser session.
		scrapeReplies = true
	}
	if scrapeReplies {
		updateProgress("抓取高優先貼文留言…", 88)
		candidates = placement.AttachReplies(ctx, placement.ScrapeRepliesInput{
			Posts:          candidates,
			Member:         memberCtx,
			RepliesPerPost: memberCtx.RepliesPerPost,
		})
	}

	updateProgress(fmt.Sprintf("整理 %d 篇海巡結果…", len(candidates)), 92)
	count, err := deps.ScanPost.FinalizeScan(ctx, scanpostusecase.ReplaceRequest{
		TenantID:  tenantID,
		OwnerUID:  ownerUID,
		BrandID:   brandID,
		TopicID:   topicID,
		GraphID:   graphID,
		ScanJobID: step.JobID,
		Posts:     candidates,
	})
	if err != nil {
		return err
	}

	// Tally result counters. "recent" counts gold posts as well as recent ones.
	gold := 0
	recent := 0
	solved := 0
	replyCount := 0
	for _, item := range candidates {
		replyCount += len(item.Replies)
		if item.Priority == "gold" {
			gold++
		}
		if item.Priority == "gold" || item.Priority == "recent" {
			recent++
		}
		if item.SolvedByProduct {
			solved++
		}
	}

	handoff := map[string]any{
		"flow":     "placement",
		"brand_id": brandID,
		"summary": fmt.Sprintf(
			"雙軌海巡完成:%d 篇gold %d、近期軌 %d、產品可解 %d",
			count, gold, recent, solved,
		),
		"pain_breakdown": map[string]any{
			"posts":          count,
			"gold":           gold,
			"recent_7d":      recent,
			"solved_by_prod": solved,
			"replies":        replyCount,
		},
		"next_route":                "/outreach?brand=" + brandID,
		"needs_supplemental_expand": false,
		"search_source_mode":        string(memberCtx.SearchSourceMode),
		"dev_mode":                  memberCtx.DevMode,
	}
	_, err = deps.Jobs.CompleteRun(ctx, jobdom.CompleteRunRequest{
		JobID:    step.JobID,
		WorkerID: step.WorkerID,
		Result: map[string]any{
			"post_count":         count,
			"gold_count":         gold,
			"recent_count":       recent,
			"solved_count":       solved,
			"reply_count":        replyCount,
			"search_source_mode": string(memberCtx.SearchSourceMode),
			"handoff":            handoff,
		},
	})
	return err
}
// selectedNodes returns, in their original order, the nodes whose
// SelectedForScan flag is set. The result is always a non-nil slice.
func selectedNodes(nodes []libkg.Node) []libkg.Node {
	picked := make([]libkg.Node, 0, len(nodes))
	for i := range nodes {
		if !nodes[i].SelectedForScan {
			continue
		}
		picked = append(picked, nodes[i])
	}
	return picked
}
// filterNodesByIDs returns, in graph order, the nodes whose ID appears in ids.
// Each requested ID is whitespace-trimmed and blank entries are skipped; node
// IDs themselves are compared as-is. The result is always a non-nil slice.
func filterNodesByIDs(nodes []libkg.Node, ids []string) []libkg.Node {
	wanted := make(map[string]bool, len(ids))
	for _, raw := range ids {
		if trimmed := strings.TrimSpace(raw); trimmed != "" {
			wanted[trimmed] = true
		}
	}

	matched := make([]libkg.Node, 0, len(ids))
	for i := range nodes {
		if wanted[nodes[i].ID] {
			matched = append(matched, nodes[i])
		}
	}
	return matched
}
// isKnowledgeGraphNotFound reports whether err signals a missing knowledge
// graph: either the application error category is code.ResNotFound, or the
// error text contains "knowledge graph not found" (case-insensitive).
// A nil error is never "not found".
func isKnowledgeGraphNotFound(err error) bool {
	if err == nil {
		return false
	}
	appErr := app.FromError(err)
	byCategory := appErr != nil && appErr.Category() == code.ResNotFound
	// The message match is a fallback and is only evaluated when the category
	// check did not already decide.
	return byCategory || strings.Contains(strings.ToLower(err.Error()), "knowledge graph not found")
}
// stringSliceField reads payload[key] as a list of strings.
//
// A []string value is returned as-is (not copied). A []any value yields a new
// slice holding only its string elements, in order. A missing key, a nil
// value, a nil payload, or any other value type yields nil.
func stringSliceField(payload map[string]any, key string) []string {
	// Looking up a key in a nil map is safe and yields the zero value, so no
	// separate nil-payload guard is needed.
	value := payload[key]

	if direct, isStrings := value.([]string); isStrings {
		return direct
	}

	items, isList := value.([]any)
	if !isList {
		return nil
	}
	collected := make([]string, 0, len(items))
	for i := range items {
		text, isString := items[i].(string)
		if !isString {
			continue
		}
		collected = append(collected, text)
	}
	return collected
}