366 lines
10 KiB
Go
366 lines
10 KiB
Go
package job
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"strings"
|
||
|
||
app "haixun-backend/internal/library/errors"
|
||
"haixun-backend/internal/library/errors/code"
|
||
libkg "haixun-backend/internal/library/knowledge"
|
||
"haixun-backend/internal/library/placement"
|
||
"haixun-backend/internal/library/websearch"
|
||
branddomain "haixun-backend/internal/model/brand/domain/usecase"
|
||
jobdom "haixun-backend/internal/model/job/domain/usecase"
|
||
kgusecase "haixun-backend/internal/model/knowledge_graph/domain/usecase"
|
||
placementusecase "haixun-backend/internal/model/placement/usecase"
|
||
placementtopicdomain "haixun-backend/internal/model/placement_topic/domain/usecase"
|
||
scanpostusecase "haixun-backend/internal/model/scan_post/domain/usecase"
|
||
threadsaccountdomain "haixun-backend/internal/model/threads_account/domain/usecase"
|
||
)
|
||
|
||
type ScanPlacementDeps struct {
|
||
Jobs jobdom.UseCase
|
||
Brand branddomain.UseCase
|
||
PlacementTopic placementtopicdomain.UseCase
|
||
KnowledgeGraph kgusecase.UseCase
|
||
ScanPost scanpostusecase.UseCase
|
||
ThreadsAccount threadsaccountdomain.UseCase
|
||
Placement placementusecase.UseCase
|
||
}
|
||
|
||
func RegisterScanPlacementHandler(runner *Runner, deps ScanPlacementDeps) {
|
||
if runner == nil {
|
||
return
|
||
}
|
||
runner.RegisterStepHandler("crawl", func(ctx context.Context, step StepContext) error {
|
||
return runScanPlacement(ctx, step, deps)
|
||
})
|
||
}
|
||
|
||
func runScanPlacement(ctx context.Context, step StepContext, deps ScanPlacementDeps) error {
|
||
payload := step.Run.Payload
|
||
tenantID := stringField(payload, "tenant_id")
|
||
ownerUID := stringField(payload, "owner_uid")
|
||
brandID := brandIDFromPayload(payload)
|
||
graphID := stringField(payload, "graph_id")
|
||
|
||
if tenantID == "" || ownerUID == "" || brandID == "" {
|
||
return fmt.Errorf("placement-scan payload missing tenant_id, owner_uid, or brand_id")
|
||
}
|
||
|
||
patrolMode := boolField(payload, "patrol_mode")
|
||
|
||
brand, brandErr := deps.Brand.Get(ctx, tenantID, ownerUID, brandID)
|
||
if brandErr != nil {
|
||
return brandErr
|
||
}
|
||
if brand == nil {
|
||
return fmt.Errorf("brand not found")
|
||
}
|
||
|
||
topicID := topicIDFromPayload(payload)
|
||
if topicID == "" && step.Run != nil && strings.TrimSpace(step.Run.Scope) == "placement_topic" {
|
||
topicID = strings.TrimSpace(step.Run.ScopeID)
|
||
}
|
||
var graphSummary *kgusecase.GraphSummary
|
||
var graphErr error
|
||
if topicID != "" {
|
||
graphSummary, graphErr = deps.KnowledgeGraph.GetByTopic(ctx, tenantID, ownerUID, topicID, brandID)
|
||
} else {
|
||
graphSummary, graphErr = deps.KnowledgeGraph.Get(ctx, tenantID, ownerUID, brandID)
|
||
}
|
||
if graphErr != nil {
|
||
if patrolMode && isKnowledgeGraphNotFound(graphErr) {
|
||
graphSummary = nil
|
||
if graphID == "" {
|
||
if topicID != "" {
|
||
graphID = topicID
|
||
} else {
|
||
graphID = brandID
|
||
}
|
||
}
|
||
} else {
|
||
return graphErr
|
||
}
|
||
} else if graphID == "" {
|
||
graphID = graphSummary.ID
|
||
}
|
||
|
||
researchMap := brand.ResearchMap
|
||
if topicID != "" && deps.PlacementTopic != nil {
|
||
if topic, topicErr := deps.PlacementTopic.Get(ctx, tenantID, ownerUID, topicID); topicErr == nil && topic != nil {
|
||
researchMap = topic.ResearchMap
|
||
brand.ProductID = topic.ProductID
|
||
}
|
||
}
|
||
|
||
patrolKeywords := []string{}
|
||
if patrolMode {
|
||
productBrief := strings.TrimSpace(brand.ProductBrief)
|
||
if formatted := placement.ProductBriefFromContext(brand.ProductContext); formatted != "" {
|
||
productBrief = formatted
|
||
}
|
||
patrolNodes := []libkg.Node{}
|
||
if graphSummary != nil {
|
||
patrolNodes = graphSummary.Nodes
|
||
}
|
||
patrolKeywords = libkg.ResolveScanPatrolKeywords(
|
||
stringSliceField(payload, "patrol_keywords"),
|
||
researchMap.PatrolKeywords,
|
||
libkg.PatrolTagInputFromBrand(brand, productBrief),
|
||
patrolNodes,
|
||
)
|
||
if len(patrolKeywords) == 0 {
|
||
return fmt.Errorf("請先在研究地圖填寫要回覆的海巡關鍵字")
|
||
}
|
||
}
|
||
|
||
nodes := []libkg.Node{}
|
||
if graphSummary != nil {
|
||
nodes = graphSummary.Nodes
|
||
}
|
||
if !patrolMode {
|
||
if graphSummary == nil {
|
||
return fmt.Errorf("請先產生延伸知識圖譜,或改用手動海巡關鍵字")
|
||
}
|
||
if ids := stringSliceField(payload, "node_ids"); len(ids) > 0 {
|
||
nodes = filterNodesByIDs(graphSummary.Nodes, ids)
|
||
} else {
|
||
nodes = selectedNodes(graphSummary.Nodes)
|
||
}
|
||
if len(nodes) == 0 {
|
||
return fmt.Errorf("請先勾選要海巡的節點並儲存")
|
||
}
|
||
}
|
||
|
||
research, err := deps.Placement.ResearchSettings(ctx, tenantID, ownerUID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
memberCtx, err := deps.ThreadsAccount.ResolveMemberPlacementContext(ctx, tenantID, ownerUID, research)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if !memberCtx.AllowsBrave && !memberCtx.AllowsThreadsAPI && !memberCtx.AllowsCrawler {
|
||
return fmt.Errorf("目前連線模式無法海巡,請確認 Threads API、Brave 或 Chrome Session 設定")
|
||
}
|
||
if placement.MemberNeedsWebSearchKey(memberCtx) && strings.TrimSpace(memberCtx.WebSearchAPIKey()) == "" {
|
||
return fmt.Errorf("%s", placement.WebSearchKeyRequiredMessage(placement.ResearchSettings{
|
||
WebSearchProvider: memberCtx.WebSearchProvider,
|
||
BraveAPIKey: memberCtx.BraveAPIKey,
|
||
ExaAPIKey: memberCtx.ExaAPIKey,
|
||
}))
|
||
}
|
||
if memberCtx.DevMode && !memberCtx.BrowserConnected {
|
||
return fmt.Errorf("開發模式需先同步 Chrome Session")
|
||
}
|
||
if !memberCtx.DevMode && memberCtx.AllowsThreadsAPI && !memberCtx.ApiConnected {
|
||
return fmt.Errorf("正式模式需先完成 Threads API 連線")
|
||
}
|
||
|
||
updateProgress := func(summary string, percentage int) {
|
||
_ = step.Heartbeat(ctx)
|
||
_, _ = deps.Jobs.UpdateProgress(ctx, jobdom.UpdateProgressRequest{
|
||
JobID: step.JobID,
|
||
WorkerID: step.WorkerID,
|
||
Phase: "crawl",
|
||
Summary: summary,
|
||
Percentage: percentage,
|
||
})
|
||
}
|
||
|
||
exclusions := append([]string{}, researchMap.Exclusions...)
|
||
|
||
if len(patrolKeywords) > 0 {
|
||
updateProgress(fmt.Sprintf("依 %d 組海巡關鍵字準備雙軌搜尋…", len(patrolKeywords)), 5)
|
||
} else {
|
||
updateProgress("準備置入海巡…", 5)
|
||
}
|
||
|
||
if err := deps.ScanPost.ClearPlacementScan(ctx, tenantID, ownerUID, brandID, topicID); err != nil {
|
||
return err
|
||
}
|
||
|
||
webClient := websearch.New(memberCtx.WebSearchConfig())
|
||
crawlerFn := placement.WrapPoliteCrawler(makeCrawlerSearchFn(deps, tenantID, ownerUID))
|
||
graphNodes := []libkg.Node{}
|
||
if graphSummary != nil {
|
||
graphNodes = graphSummary.Nodes
|
||
}
|
||
checkpointReq := scanpostusecase.CheckpointRequest{
|
||
TenantID: tenantID,
|
||
OwnerUID: ownerUID,
|
||
BrandID: brandID,
|
||
TopicID: topicID,
|
||
GraphID: graphID,
|
||
ScanJobID: step.JobID,
|
||
}
|
||
candidates, err := placement.RunDualTrackDiscover(ctx, placement.DualTrackInput{
|
||
Nodes: graphNodes,
|
||
PatrolKeywords: patrolKeywords,
|
||
Exclusions: exclusions,
|
||
Member: memberCtx,
|
||
WebSearch: webClient,
|
||
Crawler: crawlerFn,
|
||
OnCheckpoint: func(batch []placement.ScanCandidate) error {
|
||
if len(batch) == 0 {
|
||
return nil
|
||
}
|
||
checkpointReq.Posts = batch
|
||
saved, err := deps.ScanPost.UpsertScanCheckpoint(ctx, checkpointReq)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
updateProgress(fmt.Sprintf("已儲存 %d 篇候選貼文(邊爬邊存)", saved), -1)
|
||
return nil
|
||
},
|
||
}, updateProgress)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
scrapeReplies := memberCtx.ScrapeReplies
|
||
if v, ok := payload["scrape_replies"].(bool); ok {
|
||
scrapeReplies = v
|
||
} else if memberCtx.ApiConnected && strings.TrimSpace(memberCtx.ThreadsAPIAccessToken) != "" {
|
||
// Formal Threads API mode can fetch replies without browser session.
|
||
scrapeReplies = true
|
||
}
|
||
if scrapeReplies {
|
||
updateProgress("抓取高優先貼文留言…", 88)
|
||
candidates = placement.AttachReplies(ctx, placement.ScrapeRepliesInput{
|
||
Posts: candidates,
|
||
Member: memberCtx,
|
||
RepliesPerPost: memberCtx.RepliesPerPost,
|
||
})
|
||
}
|
||
|
||
updateProgress(fmt.Sprintf("整理 %d 篇海巡結果…", len(candidates)), 92)
|
||
count, err := deps.ScanPost.FinalizeScan(ctx, scanpostusecase.ReplaceRequest{
|
||
TenantID: tenantID,
|
||
OwnerUID: ownerUID,
|
||
BrandID: brandID,
|
||
TopicID: topicID,
|
||
GraphID: graphID,
|
||
ScanJobID: step.JobID,
|
||
Posts: candidates,
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
gold := 0
|
||
recent := 0
|
||
solved := 0
|
||
replyCount := 0
|
||
for _, item := range candidates {
|
||
replyCount += len(item.Replies)
|
||
if item.Priority == "gold" {
|
||
gold++
|
||
}
|
||
if item.Priority == "gold" || item.Priority == "recent" {
|
||
recent++
|
||
}
|
||
if item.SolvedByProduct {
|
||
solved++
|
||
}
|
||
}
|
||
|
||
handoff := map[string]any{
|
||
"flow": "placement",
|
||
"brand_id": brandID,
|
||
"summary": fmt.Sprintf(
|
||
"雙軌海巡完成:%d 篇(gold %d、近期軌 %d、產品可解 %d)",
|
||
count, gold, recent, solved,
|
||
),
|
||
"pain_breakdown": map[string]any{
|
||
"posts": count,
|
||
"gold": gold,
|
||
"recent_7d": recent,
|
||
"solved_by_prod": solved,
|
||
"replies": replyCount,
|
||
},
|
||
"next_route": "/outreach?brand=" + brandID,
|
||
"needs_supplemental_expand": false,
|
||
"search_source_mode": string(memberCtx.SearchSourceMode),
|
||
"dev_mode": memberCtx.DevMode,
|
||
}
|
||
|
||
_, err = deps.Jobs.CompleteRun(ctx, jobdom.CompleteRunRequest{
|
||
JobID: step.JobID,
|
||
WorkerID: step.WorkerID,
|
||
Result: map[string]any{
|
||
"post_count": count,
|
||
"gold_count": gold,
|
||
"recent_count": recent,
|
||
"solved_count": solved,
|
||
"reply_count": replyCount,
|
||
"search_source_mode": string(memberCtx.SearchSourceMode),
|
||
"handoff": handoff,
|
||
},
|
||
})
|
||
return err
|
||
}
|
||
|
||
func selectedNodes(nodes []libkg.Node) []libkg.Node {
|
||
out := make([]libkg.Node, 0, len(nodes))
|
||
for _, node := range nodes {
|
||
if node.SelectedForScan {
|
||
out = append(out, node)
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
func filterNodesByIDs(nodes []libkg.Node, ids []string) []libkg.Node {
|
||
allowed := map[string]struct{}{}
|
||
for _, id := range ids {
|
||
id = strings.TrimSpace(id)
|
||
if id != "" {
|
||
allowed[id] = struct{}{}
|
||
}
|
||
}
|
||
out := make([]libkg.Node, 0, len(ids))
|
||
for _, node := range nodes {
|
||
if _, ok := allowed[node.ID]; ok {
|
||
out = append(out, node)
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
func isKnowledgeGraphNotFound(err error) bool {
|
||
if err == nil {
|
||
return false
|
||
}
|
||
if e := app.FromError(err); e != nil && e.Category() == code.ResNotFound {
|
||
return true
|
||
}
|
||
return strings.Contains(strings.ToLower(err.Error()), "knowledge graph not found")
|
||
}
|
||
|
||
// stringSliceField extracts a list of strings stored under key in payload.
//
// A []string value is returned as-is (not copied). A []any value, as typically
// produced when JSON is decoded into map[string]any, is converted by keeping
// only its string elements in order; that result is non-nil even when empty.
// A nil payload, a missing key, a nil value or any other type yields nil.
func stringSliceField(payload map[string]any, key string) []string {
	// Indexing a nil map is safe in Go and yields nil, which falls through.
	switch raw := payload[key].(type) {
	case []string:
		return raw
	case []any:
		strs := make([]string, 0, len(raw))
		for _, elem := range raw {
			if s, isString := elem.(string); isString {
				strs = append(strs, s)
			}
		}
		return strs
	}
	return nil
}
|