opencode-cursor-agent/pkg/infrastructure/parser/stream.go

111 lines
2.8 KiB
Go
Raw Normal View History

package parser
import "encoding/json"
// StreamParser is the signature of a function that processes one line of
// stream output, passed in as a string.
type StreamParser func(line string)
// Parser bundles the two entry points of a stream parser as returned by
// CreateStreamParser and CreateStreamParserWithThinking.
type Parser struct {
// Parse handles a single line of stream output.
Parse StreamParser
// Flush ends the stream; as built by CreateStreamParserWithThinking it
// invokes the completion callback if that callback has not fired yet.
Flush func()
}
// CreateStreamParser builds a stream parser that reports text only. It is kept
// for backward compatibility and does not forward thinking content.
func CreateStreamParser(onText func(string), onDone func()) Parser {
	// A nil thinking callback makes the underlying parser ignore thinking output.
	var ignoreThinking func(string)
	return CreateStreamParserWithThinking(onText, ignoreThinking, onDone)
}
// CreateStreamParserWithThinking builds a stream parser that can also report
// thinking content.
//
// Each line given to Parse is decoded as a JSON object; lines that fail to
// decode are ignored. For an object whose type is "assistant" and whose
// message is present, the "text" parts and the "thinking" parts of
// message.content are concatenated separately, and only the portion that has
// not been emitted before is passed to onText / onThinking. An object whose
// type is "result" and subtype is "success" ends the stream: onDone is called
// and every later Parse call does nothing. Flush calls onDone if it has not
// been called yet, so onDone runs at most once.
//
// onThinking may be nil, meaning thinking content is ignored. A nil onText or
// onDone is tolerated as well and treated as a no-op callback.
func CreateStreamParserWithThinking(onText func(string), onThinking func(string), onDone func()) Parser {
	// Substitute no-op callbacks so a nil onText/onDone cannot panic later.
	if onText == nil {
		onText = func(string) {}
	}
	if onDone == nil {
		onDone = func() {}
	}

	// Concatenation of everything emitted so far, per output channel.
	var emittedText, emittedThinking string
	done := false

	// finish marks the stream as done and fires onDone, at most once.
	finish := func() {
		if done {
			return
		}
		done = true
		onDone()
	}

	parse := func(line string) {
		if done {
			return
		}
		var obj struct {
			Type    string `json:"type"`
			Subtype string `json:"subtype"`
			Message *struct {
				Content []struct {
					Type     string `json:"type"`
					Text     string `json:"text"`
					Thinking string `json:"thinking"`
				} `json:"content"`
			} `json:"message"`
		}
		if err := json.Unmarshal([]byte(line), &obj); err != nil {
			// Lines that do not decode into the expected shape are skipped.
			return
		}

		switch {
		case obj.Type == "assistant" && obj.Message != nil:
			var fullText, fullThinking string
			for _, part := range obj.Message.Content {
				switch part.Type {
				case "text":
					fullText += part.Text
				case "thinking":
					fullThinking += part.Thinking
				}
			}
			// Thinking is handled first and never short-circuits, so text
			// carried by the same line is still processed afterwards.
			if onThinking != nil {
				if delta, updated := nextStreamDelta(emittedThinking, fullThinking); delta != "" {
					onThinking(delta)
					emittedThinking = updated
				}
			}
			if delta, updated := nextStreamDelta(emittedText, fullText); delta != "" {
				onText(delta)
				emittedText = updated
			}
		case obj.Type == "result" && obj.Subtype == "success":
			finish()
		}
	}

	return Parser{Parse: parse, Flush: finish}
}

// nextStreamDelta decides which part of full is new relative to emitted, the
// content already forwarded on the same channel. It returns the text to emit
// (empty when there is nothing new) and the value the emitted content should
// take once that text has been forwarded.
//
//   - full is empty or equal to emitted: nothing is new.
//   - full starts with emitted (cumulative update): the remaining suffix is
//     new and full becomes the emitted content.
//   - otherwise (independent fragment): all of full is new and it is appended
//     to the emitted content.
func nextStreamDelta(emitted, full string) (delta, updated string) {
	switch {
	case full == "" || full == emitted:
		return "", emitted
	case len(full) >= len(emitted) && full[:len(emitted)] == emitted:
		// Hand-rolled prefix test; keeps the file's import set unchanged.
		return full[len(emitted):], full
	default:
		return full, emitted + full
	}
}