thread-master/backend/internal/svc/service_context.go

417 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package svc
import (
"context"
"fmt"
"net/http"
"os"
"time"
"haixun-backend/internal/config"
libcrypto "haixun-backend/internal/library/crypto"
libmongo "haixun-backend/internal/library/mongo"
libredis "haixun-backend/internal/library/redis"
"haixun-backend/internal/library/validate"
"haixun-backend/internal/middleware"
aisettings "haixun-backend/internal/model/ai/usecase"
authrepodomain "haixun-backend/internal/model/auth/domain/repository"
authdomain "haixun-backend/internal/model/auth/domain/usecase"
authrepo "haixun-backend/internal/model/auth/repository"
authuc "haixun-backend/internal/model/auth/usecase"
branddomain "haixun-backend/internal/model/brand/domain/usecase"
brandrepo "haixun-backend/internal/model/brand/repository"
brandusecase "haixun-backend/internal/model/brand/usecase"
cmatrixdomain "haixun-backend/internal/model/content_matrix/domain/usecase"
cmatrixrepo "haixun-backend/internal/model/content_matrix/repository"
cmatrixusecase "haixun-backend/internal/model/content_matrix/usecase"
copydraftdomain "haixun-backend/internal/model/copy_draft/domain/usecase"
copydraftrepo "haixun-backend/internal/model/copy_draft/repository"
copydraftusecase "haixun-backend/internal/model/copy_draft/usecase"
copymissiondomain "haixun-backend/internal/model/copy_mission/domain/usecase"
copymissionrepo "haixun-backend/internal/model/copy_mission/repository"
copymissionusecase "haixun-backend/internal/model/copy_mission/usecase"
jobrepo "haixun-backend/internal/model/job/repository"
jobusecase "haixun-backend/internal/model/job/usecase"
kgdomain "haixun-backend/internal/model/knowledge_graph/domain/usecase"
kgrepo "haixun-backend/internal/model/knowledge_graph/repository"
kgusecase "haixun-backend/internal/model/knowledge_graph/usecase"
memberdomain "haixun-backend/internal/model/member/domain/usecase"
memberrepo "haixun-backend/internal/model/member/repository"
memberuc "haixun-backend/internal/model/member/usecase"
outreachdraftdomain "haixun-backend/internal/model/outreach_draft/domain/usecase"
outreachdraftrepo "haixun-backend/internal/model/outreach_draft/repository"
outreachdraftusecase "haixun-backend/internal/model/outreach_draft/usecase"
permissiondomain "haixun-backend/internal/model/permission/domain/usecase"
permissionrepo "haixun-backend/internal/model/permission/repository"
permissionuc "haixun-backend/internal/model/permission/usecase"
personadomain "haixun-backend/internal/model/persona/domain/usecase"
personarepo "haixun-backend/internal/model/persona/repository"
personausecase "haixun-backend/internal/model/persona/usecase"
placementusecase "haixun-backend/internal/model/placement/usecase"
placementtopicdomain "haixun-backend/internal/model/placement_topic/domain/usecase"
placementtopicrepo "haixun-backend/internal/model/placement_topic/repository"
placementtopicusecase "haixun-backend/internal/model/placement_topic/usecase"
scanpostdomain "haixun-backend/internal/model/scan_post/domain/usecase"
scanpostrepo "haixun-backend/internal/model/scan_post/repository"
scanpostusecase "haixun-backend/internal/model/scan_post/usecase"
settingrepo "haixun-backend/internal/model/setting/repository"
settingusecase "haixun-backend/internal/model/setting/usecase"
threadsaccountdomain "haixun-backend/internal/model/threads_account/domain/usecase"
threadsaccountrepo "haixun-backend/internal/model/threads_account/repository"
threadsaccountusecase "haixun-backend/internal/model/threads_account/usecase"
jobworker "haixun-backend/internal/worker/job"
goredis "github.com/redis/go-redis/v9"
"github.com/zeromicro/go-zero/rest"
)
// ServiceContext is the dependency container produced by NewServiceContext.
// It carries the configuration, the infrastructure clients, one use case per
// domain area, the HTTP middlewares, and the cancel functions of the
// background job goroutines.
type ServiceContext struct {
	// Configuration and infrastructure. Redis is nil-checked by both
	// NewServiceContext and Close, so a nil value is a supported state.
	Config    config.Config
	Validator *validate.Validate
	Mongo     *libmongo.Client
	Redis     *goredis.Client

	// Use cases, one per domain area.
	Setting        settingusecase.UseCase
	AI             aisettings.UseCase
	Job            jobusecase.UseCase
	AuthToken      authdomain.TokenUseCase
	Member         memberdomain.UseCase
	Permission     permissiondomain.UseCase
	Persona        personadomain.UseCase
	CopyMission    copymissiondomain.UseCase
	Brand          branddomain.UseCase
	PlacementTopic placementtopicdomain.UseCase
	KnowledgeGraph kgdomain.UseCase
	Placement      placementusecase.UseCase
	ScanPost       scanpostdomain.UseCase
	OutreachDraft  outreachdraftdomain.UseCase
	ContentMatrix  cmatrixdomain.UseCase
	CopyDraft      copydraftdomain.UseCase
	ThreadsAccount threadsaccountdomain.UseCase

	// Middlewares mounted per route group via generate/api `middleware:` directive.
	AuthJWT      rest.Middleware
	MemberAuth   rest.Middleware
	WorkerSecret rest.Middleware

	// Cancel functions for the background goroutines. Each stays nil unless
	// NewServiceContext started the corresponding goroutine; Close invokes the
	// non-nil ones.
	stopWorker    context.CancelFunc
	stopScheduler context.CancelFunc
	stopReaper    context.CancelFunc
}
// NewServiceContext wires every dependency of the backend from c and returns
// the populated container.
//
// In order, it:
//   - connects to MongoDB, creates the Redis client and the secrets cipher;
//   - builds each repository, ensures its indexes, and builds its use case;
//   - calls the permission use case's EnsureDefault* methods and the job use
//     case's Ensure*Template methods;
//   - starts the job worker, scheduler and reaper goroutines for every
//     component that is enabled in c, provided the Redis client is non-nil;
//   - composes the HTTP middlewares.
//
// Any start-up failure panics. The panic value is an error that names the
// failing step and wraps the underlying cause with %w, so errors.Is and
// errors.As still reach the original error.
func NewServiceContext(c config.Config) *ServiceContext {
	ctx := context.Background()

	// must aborts start-up when err is non-nil, tagging the panic with the step
	// that failed. It is a closure so that no package-level name is introduced.
	must := func(step string, err error) {
		if err != nil {
			panic(fmt.Errorf("svc: %s: %w", step, err))
		}
	}

	// --- Infrastructure -----------------------------------------------------
	mongoClient, err := libmongo.NewClient(ctx, c.Mongo)
	must("connect to mongo", err)
	redisClient := libredis.NewClient(c.Redis)
	secretsCipher, err := libcrypto.New(c.Secrets.EncryptionKey)
	must("create secrets cipher", err)

	// --- Settings -----------------------------------------------------------
	settingRepository := settingrepo.NewMongoRepository(mongoClient.Database())
	must("ensure setting indexes", settingRepository.EnsureIndexes(ctx))
	settingUseCase := settingusecase.NewUseCase(settingRepository)

	// --- Auth tokens --------------------------------------------------------
	// The store is left as a nil interface when there is no Redis client;
	// assigning a nil *concrete* value here would make the interface non-nil.
	var tokenRevokeStore authrepodomain.TokenRevokeStore
	if redisClient != nil {
		tokenRevokeStore = authrepo.NewRedisTokenRevokeStore(redisClient)
	}
	authTokenUseCase := authuc.NewTokenUseCase(c.Auth, tokenRevokeStore)

	// --- Members ------------------------------------------------------------
	memberRepository := memberrepo.NewMongoRepository(mongoClient.Database())
	must("ensure member indexes", memberRepository.EnsureIndexes(ctx))
	memberUseCase := memberuc.NewUseCase(memberRepository, authTokenUseCase)

	// --- Permissions --------------------------------------------------------
	permissionRepository := permissionrepo.NewMongoPermissionRepository(mongoClient.Database())
	rolePermissionRepository := permissionrepo.NewMongoRolePermissionRepository(mongoClient.Database())
	must("ensure permission indexes", permissionRepository.EnsureIndexes(ctx))
	must("ensure role-permission indexes", rolePermissionRepository.EnsureIndexes(ctx))
	permissionUseCase := permissionuc.NewUseCase(permissionRepository, rolePermissionRepository)
	must("ensure default permissions", permissionUseCase.EnsureDefaultPermissions(ctx))
	must("ensure default role permissions", permissionUseCase.EnsureDefaultRolePermissions(ctx, "default"))

	// --- Jobs ---------------------------------------------------------------
	jobTemplateRepository := jobrepo.NewMongoTemplateRepository(mongoClient.Database())
	jobRunRepository := jobrepo.NewMongoRunRepository(mongoClient.Database())
	jobScheduleRepository := jobrepo.NewMongoScheduleRepository(mongoClient.Database())
	jobEventRepository := jobrepo.NewMongoEventRepository(mongoClient.Database())
	// NOTE(review): redisClient is passed without a nil check here although it
	// is nil-checked elsewhere in this function — confirm the queue repository
	// tolerates a nil client.
	jobQueueRepository := jobrepo.NewRedisQueueRepository(redisClient)
	must("ensure job template indexes", jobTemplateRepository.EnsureIndexes(ctx))
	must("ensure job run indexes", jobRunRepository.EnsureIndexes(ctx))
	must("ensure job schedule indexes", jobScheduleRepository.EnsureIndexes(ctx))
	must("ensure job event indexes", jobEventRepository.EnsureIndexes(ctx))
	jobUseCase := jobusecase.NewUseCase(jobTemplateRepository, jobRunRepository, jobScheduleRepository, jobEventRepository, jobQueueRepository)
	must("ensure demo job template", jobUseCase.EnsureDemoTemplate(ctx))
	must("ensure style-8d job template", jobUseCase.EnsureStyle8DTemplate(ctx))
	must("ensure expand-graph job template", jobUseCase.EnsureExpandGraphTemplate(ctx))
	must("ensure placement-scan job template", jobUseCase.EnsurePlacementScanTemplate(ctx))
	must("ensure scan-viral job template", jobUseCase.EnsureScanViralTemplate(ctx))
	must("ensure analyze-copy-mission job template", jobUseCase.EnsureAnalyzeCopyMissionTemplate(ctx))
	must("ensure generate-copy-matrix job template", jobUseCase.EnsureGenerateCopyMatrixTemplate(ctx))
	must("ensure generate-copy-draft job template", jobUseCase.EnsureGenerateCopyDraftTemplate(ctx))

	// --- Domain repositories and use cases ----------------------------------
	copyMissionRepository := copymissionrepo.NewMongoRepository(mongoClient.Database())
	must("ensure copy mission indexes", copyMissionRepository.EnsureIndexes(ctx))
	copyMissionUseCase := copymissionusecase.NewUseCase(copyMissionRepository)

	copyDraftRepository := copydraftrepo.NewMongoRepository(mongoClient.Database())
	must("ensure copy draft indexes", copyDraftRepository.EnsureIndexes(ctx))
	copyDraftUseCase := copydraftusecase.NewUseCase(copyDraftRepository)

	scanPostRepository := scanpostrepo.NewMongoRepository(mongoClient.Database())
	must("ensure scan post indexes", scanPostRepository.EnsureIndexes(ctx))
	scanPostUseCase := scanpostusecase.NewUseCase(scanPostRepository)

	outreachDraftRepository := outreachdraftrepo.NewMongoRepository(mongoClient.Database())
	must("ensure outreach draft indexes", outreachDraftRepository.EnsureIndexes(ctx))
	outreachDraftUseCase := outreachdraftusecase.NewUseCase(outreachDraftRepository)

	contentMatrixRepository := cmatrixrepo.NewMongoRepository(mongoClient.Database())
	must("ensure content matrix indexes", contentMatrixRepository.EnsureIndexes(ctx))
	contentMatrixUseCase := cmatrixusecase.NewUseCase(contentMatrixRepository)

	knowledgeGraphRepository := kgrepo.NewMongoRepository(mongoClient.Database())
	must("ensure knowledge graph indexes", knowledgeGraphRepository.EnsureIndexes(ctx))
	knowledgeGraphUseCase := kgusecase.NewUseCase(knowledgeGraphRepository)

	personaRepository := personarepo.NewMongoRepository(mongoClient.Database())
	must("ensure persona indexes", personaRepository.EnsureIndexes(ctx))
	personaUseCase := personausecase.NewUseCase(personaRepository)

	brandRepository := brandrepo.NewMongoRepository(mongoClient.Database())
	must("ensure brand indexes", brandRepository.EnsureIndexes(ctx))
	brandUseCase := brandusecase.NewUseCase(brandRepository)

	placementTopicRepository := placementtopicrepo.NewMongoRepository(mongoClient.Database())
	must("ensure placement topic indexes", placementTopicRepository.EnsureIndexes(ctx))
	placementTopicUseCase := placementtopicusecase.NewUseCase(
		placementTopicRepository,
		brandRepository,
		knowledgeGraphUseCase,
		scanPostRepository,
	)

	threadsAccountRepository := threadsaccountrepo.NewMongoRepository(mongoClient.Database())
	threadsAccountSecretsRepository := threadsaccountrepo.NewSecretsMongoRepository(mongoClient.Database(), secretsCipher)
	must("ensure threads account indexes", threadsAccountRepository.EnsureIndexes(ctx))
	must("ensure threads account secrets indexes", threadsAccountSecretsRepository.EnsureIndexes(ctx))
	threadsAccountUseCase := threadsaccountusecase.NewUseCase(
		threadsAccountRepository,
		threadsAccountSecretsRepository,
		memberUseCase,
		settingUseCase,
		personaUseCase,
		secretsCipher,
	)

	placementUseCase := placementusecase.NewUseCase(settingUseCase, secretsCipher)

	// --- Container ----------------------------------------------------------
	sc := &ServiceContext{
		Config:         c,
		Validator:      validate.New(),
		Mongo:          mongoClient,
		Redis:          redisClient,
		Setting:        settingUseCase,
		AI:             aisettings.NewUseCase(),
		Job:            jobUseCase,
		AuthToken:      authTokenUseCase,
		Member:         memberUseCase,
		Permission:     permissionUseCase,
		Persona:        personaUseCase,
		CopyMission:    copyMissionUseCase,
		Brand:          brandUseCase,
		PlacementTopic: placementTopicUseCase,
		KnowledgeGraph: knowledgeGraphUseCase,
		Placement:      placementUseCase,
		ScanPost:       scanPostUseCase,
		OutreachDraft:  outreachDraftUseCase,
		ContentMatrix:  contentMatrixUseCase,
		CopyDraft:      copyDraftUseCase,
		ThreadsAccount: threadsAccountUseCase,
	}

	// The hostname seeds the default worker ID and the scheduler holder name.
	// If it cannot be determined, fall back to a per-process value so that two
	// instances never end up sharing the degenerate identity "-go-worker" /
	// "-scheduler".
	hostname, err := os.Hostname()
	if err != nil || hostname == "" {
		hostname = fmt.Sprintf("unknown-%d-%d", os.Getpid(), time.Now().UnixNano())
	}

	// --- Background job worker ----------------------------------------------
	if c.JobWorker.Enabled && redisClient != nil {
		workerType := c.JobWorker.WorkerType
		if workerType == "" {
			workerType = "go"
		}
		workerID := c.JobWorker.WorkerID
		if workerID == "" {
			workerID = fmt.Sprintf("%s-%s-worker", hostname, workerType)
		}
		// The worker gets its own cancellable context; Close cancels it.
		workerCtx, cancel := context.WithCancel(context.Background())
		sc.stopWorker = cancel
		runner := jobworker.NewRunner(workerID, workerType, jobUseCase)
		jobworker.RegisterExpandGraphHandler(runner, jobworker.ExpandGraphDeps{
			Jobs:           jobUseCase,
			Brand:          brandUseCase,
			PlacementTopic: placementTopicUseCase,
			KnowledgeGraph: knowledgeGraphUseCase,
			ThreadsAccount: threadsAccountUseCase,
			Placement:      placementUseCase,
			AI:             sc.AI,
		})
		jobworker.RegisterScanPlacementHandler(runner, jobworker.ScanPlacementDeps{
			Jobs:           jobUseCase,
			Brand:          brandUseCase,
			PlacementTopic: placementTopicUseCase,
			KnowledgeGraph: knowledgeGraphUseCase,
			ScanPost:       scanPostUseCase,
			ThreadsAccount: threadsAccountUseCase,
			Placement:      placementUseCase,
		})
		jobworker.RegisterScanViralHandler(runner, jobworker.ScanViralDeps{
			Jobs:           jobUseCase,
			CopyMission:    copyMissionUseCase,
			Persona:        personaUseCase,
			ScanPost:       scanPostUseCase,
			CopyDraft:      copyDraftUseCase,
			ThreadsAccount: threadsAccountUseCase,
			Placement:      placementUseCase,
			AI:             sc.AI,
		})
		jobworker.RegisterAnalyzeCopyMissionHandler(runner, jobworker.AnalyzeCopyMissionDeps{
			Jobs:           jobUseCase,
			CopyMission:    copyMissionUseCase,
			Persona:        personaUseCase,
			ThreadsAccount: threadsAccountUseCase,
			Placement:      placementUseCase,
			AI:             sc.AI,
		})
		jobworker.RegisterGenerateCopyMatrixHandler(runner, jobworker.GenerateCopyMatrixDeps{
			Jobs:           jobUseCase,
			CopyMission:    copyMissionUseCase,
			Persona:        personaUseCase,
			ScanPost:       scanPostUseCase,
			CopyDraft:      copyDraftUseCase,
			ThreadsAccount: threadsAccountUseCase,
			AI:             sc.AI,
		})
		jobworker.RegisterGenerateCopyDraftHandler(runner, jobworker.GenerateCopyDraftDeps{
			Jobs:           jobUseCase,
			CopyMission:    copyMissionUseCase,
			Persona:        personaUseCase,
			ScanPost:       scanPostUseCase,
			CopyDraft:      copyDraftUseCase,
			ThreadsAccount: threadsAccountUseCase,
			AI:             sc.AI,
		})
		go runner.Start(workerCtx)
	}

	// --- Background job scheduler -------------------------------------------
	if c.JobScheduler.Enabled && redisClient != nil {
		schedulerHolder := fmt.Sprintf("%s-scheduler", hostname)
		interval := time.Duration(c.JobScheduler.IntervalSeconds) * time.Second
		if interval <= 0 {
			// Unset or invalid configuration: tick once a minute.
			interval = time.Minute
		}
		schedulerCtx, cancel := context.WithCancel(context.Background())
		sc.stopScheduler = cancel
		scheduler := jobworker.NewScheduler(schedulerHolder, jobUseCase)
		go scheduler.Start(schedulerCtx, interval)
	}

	// --- Background job reaper ----------------------------------------------
	if c.JobReaper.Enabled && redisClient != nil {
		interval := time.Duration(c.JobReaper.IntervalSeconds) * time.Second
		if interval <= 0 {
			// Unset or invalid configuration: tick every 30 seconds.
			interval = 30 * time.Second
		}
		reaperCtx, cancel := context.WithCancel(context.Background())
		sc.stopReaper = cancel
		reaper := jobworker.NewReaper(jobUseCase)
		go reaper.Start(reaperCtx, interval)
	}

	// Authentication + RBAC: verify the JWT first and put the actor into the
	// context, then authorize at route level using the catalog permissions.
	// The two are composed and mounted on the protected route groups
	// (routes.go references them as AuthJWT / MemberAuth).
	authJWT := middleware.NewAuthJWTMiddleware(sc.AuthToken, sc.Config.Auth).Handle
	memberAuth := middleware.NewMemberAuthMiddleware(sc.AuthToken, sc.Config.Auth).Handle
	rbac := middleware.NewPermissionRBACMiddleware(sc.Member, sc.Permission).Handle
	sc.AuthJWT = func(next http.HandlerFunc) http.HandlerFunc { return authJWT(rbac(next)) }
	sc.MemberAuth = func(next http.HandlerFunc) http.HandlerFunc { return memberAuth(rbac(next)) }
	sc.WorkerSecret = middleware.NewWorkerSecretMiddleware(sc.Config.InternalWorker).Handle

	return sc
}
// Close stops the background job goroutines and releases the connections held
// by sc.
//
// It invokes the worker, scheduler and reaper cancel functions that were set,
// then closes the Mongo client with ctx and finally the Redis client. A nil
// receiver, a nil Mongo client and a nil Redis client are all tolerated, so
// Close can be called on a partially populated ServiceContext.
func (sc *ServiceContext) Close(ctx context.Context) {
	if sc == nil {
		return
	}

	// Signal every background goroutine that was started.
	// NOTE(review): this only cancels their contexts; nothing here waits for
	// the goroutines to return before the clients below are closed — confirm
	// the runner, scheduler and reaper tolerate that.
	for _, stop := range []context.CancelFunc{sc.stopWorker, sc.stopScheduler, sc.stopReaper} {
		if stop != nil {
			stop()
		}
	}

	// Close errors are deliberately discarded: this method has no error return
	// and is best-effort shutdown cleanup.
	if sc.Mongo != nil {
		_ = sc.Mongo.Close(ctx)
	}
	if sc.Redis != nil {
		_ = sc.Redis.Close()
	}
}