package bridge

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"os"
	"os/exec"
	"regexp"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/daniel/cursor-adapter/internal/workspace"
)

// Bridge defines the integration interface with the Cursor CLI.
type Bridge interface {
	// Execute runs a prompt and streams output chunks on the first channel.
	// Both channels are closed when the request finishes; at most one error
	// is delivered on the second channel.
	Execute(ctx context.Context, prompt string, model string, sessionKey string) (<-chan string, <-chan error)
	// ExecuteSync runs a prompt and returns the whole trimmed response.
	ExecuteSync(ctx context.Context, prompt string, model string, sessionKey string) (string, error)
	// ListModels returns the model IDs reported by the CLI.
	ListModels(ctx context.Context) ([]string, error)
	// CheckHealth verifies that the CLI binary can be executed.
	CheckHealth(ctx context.Context) error
}

// NewBridge creates a Bridge. With chatOnly=true every subprocess runs in an
// empty temp workspace, and env overrides redirect HOME / CURSOR_CONFIG_DIR
// to that temp dir so the Cursor agent cannot read any real project files or
// global rules.
//
// useACP selects the ACP (JSON-RPC over stdio) implementation; otherwise the
// one-shot CLI implementation is returned. Note that logger is only handed to
// the ACP implementation.
func NewBridge(cursorPath string, logger *slog.Logger, useACP bool, chatOnly bool, maxConcurrent int, timeout time.Duration) Bridge {
	if useACP {
		return NewACPBridge(cursorPath, logger, chatOnly, maxConcurrent, timeout)
	}
	return NewCLIBridge(cursorPath, chatOnly, maxConcurrent, timeout)
}

// --- CLI Bridge ---

// CLIBridge implements Bridge by spawning one Cursor CLI process per request.
type CLIBridge struct {
	cursorPath string
	// semaphore bounds the number of concurrently running subprocesses.
	semaphore chan struct{}
	// timeout is applied to every prompt execution via context.WithTimeout.
	timeout  time.Duration
	chatOnly bool
}

// buildCLICommandArgs assembles the argument list for a one-shot CLI call.
// The prompt is always the final argument. stream selects stream-json output
// with partial chunks; otherwise plain text output is requested. Empty
// workspaceDir or model values omit the corresponding flag.
func buildCLICommandArgs(prompt, model, workspaceDir string, stream, chatOnly bool) []string {
	args := []string{"--print", "--mode", "ask"}
	if chatOnly {
		args = append(args, "--trust")
	}
	if workspaceDir != "" {
		args = append(args, "--workspace", workspaceDir)
	}
	if model != "" {
		args = append(args, "--model", model)
	}
	if stream {
		args = append(args, "--stream-partial-output", "--output-format", "stream-json")
	} else {
		args = append(args, "--output-format", "text")
	}
	return append(args, prompt)
}

// NewCLIBridge creates a CLIBridge. A maxConcurrent value <= 0 is treated as 1.
func NewCLIBridge(cursorPath string, chatOnly bool, maxConcurrent int, timeout time.Duration) *CLIBridge {
	if maxConcurrent <= 0 {
		maxConcurrent = 1
	}
	return &CLIBridge{
		cursorPath: cursorPath,
		semaphore:  make(chan struct{}, maxConcurrent),
		timeout:    timeout,
		chatOnly:   chatOnly,
	}
}

// prepareWorkspace returns (workspaceDir, envOverrides, cleanup). When
// chatOnly is enabled, workspaceDir is a fresh temp dir and cleanup removes
// it. Otherwise workspaceDir falls back to the adapter's cwd with no
// cleanup. The same fallback is used when the chat-only setup fails.
func (b *CLIBridge) prepareWorkspace() (string, map[string]string, func()) {
	noop := func() {}
	if !b.chatOnly {
		// Best effort: an empty dir simply omits --workspace and cmd.Dir.
		ws, _ := os.Getwd()
		return ws, nil, noop
	}
	dir, env, err := workspace.ChatOnly("")
	if err != nil {
		slog.Warn("chat-only workspace setup failed, falling back to cwd", "err", err)
		ws, _ := os.Getwd()
		return ws, nil, noop
	}
	return dir, env, func() { _ = os.RemoveAll(dir) }
}

// Execute runs the prompt through a streaming CLI invocation. Every non-empty
// stdout line is forwarded (trimmed) on the output channel. Both channels are
// closed when the subprocess has finished, and at most one error is sent.
// sessionKey is not used by the CLI bridge.
func (b *CLIBridge) Execute(ctx context.Context, prompt string, model string, sessionKey string) (<-chan string, <-chan error) {
	outputChan := make(chan string, 64)
	errChan := make(chan error, 1)
	go func() {
		defer close(outputChan)
		defer close(errChan)
		// streamCLI yields a single error, so this send always fits the
		// one-slot buffer and can never block the goroutine.
		if err := b.streamCLI(ctx, prompt, model, outputChan); err != nil {
			errChan <- err
		}
	}()
	return outputChan, errChan
}

// streamCLI performs the work behind Execute: it takes a semaphore slot,
// starts the CLI and copies stdout lines to out until the process ends or
// the context is done.
func (b *CLIBridge) streamCLI(ctx context.Context, prompt, model string, out chan<- string) error {
	select {
	case b.semaphore <- struct{}{}:
		defer func() { <-b.semaphore }()
	case <-ctx.Done():
		return ctx.Err()
	}

	execCtx, cancel := context.WithTimeout(ctx, b.timeout)
	defer cancel()
	ws, envOverrides, cleanup := b.prepareWorkspace()
	defer cleanup()

	cmd := exec.CommandContext(execCtx, b.cursorPath, buildCLICommandArgs(prompt, model, ws, true, b.chatOnly)...)
	cmd.Dir = ws
	cmd.Env = workspace.MergeEnv(os.Environ(), envOverrides)
	stdoutPipe, err := cmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("stdout pipe: %w", err)
	}
	defer stdoutPipe.Close()
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("start command: %w", err)
	}

	scanner := bufio.NewScanner(stdoutPipe)
	// Same limits as the ACP reader: a single stream-json line can exceed
	// the scanner's 64 KiB default.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	delivering := true
	for delivering && scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			continue
		}
		select {
		case out <- line:
		case <-execCtx.Done():
			// The consumer went away or the deadline passed; stop so the
			// semaphore slot and the subprocess are released.
			delivering = false
		}
	}
	scanErr := scanner.Err()
	if scanErr != nil || !delivering {
		// Nobody reads stdout any more; kill the child so Wait cannot hang
		// on a process blocked writing to a full pipe.
		cancel()
	}

	waitErr := cmd.Wait()
	if scanErr != nil {
		return fmt.Errorf("stdout scanner: %w", scanErr)
	}
	if waitErr != nil {
		if ctxErr := execCtx.Err(); ctxErr != nil {
			return fmt.Errorf("%w: %w", ctxErr, waitErr)
		}
		return waitErr
	}
	return nil
}

// ExecuteSync runs the prompt through a non-streaming CLI invocation and
// returns the trimmed stdout. On failure the error carries the trimmed
// stderr. sessionKey is not used by the CLI bridge.
func (b *CLIBridge) ExecuteSync(ctx context.Context, prompt string, model string, sessionKey string) (string, error) {
	select {
	case b.semaphore <- struct{}{}:
		defer func() { <-b.semaphore }()
	case <-ctx.Done():
		return "", ctx.Err()
	}

	execCtx, cancel := context.WithTimeout(ctx, b.timeout)
	defer cancel()
	ws, envOverrides, cleanup := b.prepareWorkspace()
	defer cleanup()

	cmd := exec.CommandContext(execCtx, b.cursorPath, buildCLICommandArgs(prompt, model, ws, false, b.chatOnly)...)
	cmd.Dir = ws
	cmd.Env = workspace.MergeEnv(os.Environ(), envOverrides)
	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil {
		return "", fmt.Errorf("run command: %w (stderr: %s)", err, strings.TrimSpace(stderr.String()))
	}
	return strings.TrimSpace(stdout.String()), nil
}

// ListModels runs "<cursor> models" under the concurrency semaphore and
// parses the combined output into model IDs.
func (b *CLIBridge) ListModels(ctx context.Context) ([]string, error) {
	select {
	case b.semaphore <- struct{}{}:
		defer func() { <-b.semaphore }()
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	output, err := exec.CommandContext(ctx, b.cursorPath, "models").CombinedOutput()
	if err != nil {
		return nil, fmt.Errorf("list models: %w", err)
	}
	return parseModelsOutput(string(output)), nil
}

// CheckHealth runs "<cursor> --version" and reports whether it succeeded.
func (b *CLIBridge) CheckHealth(ctx context.Context) error {
	if _, err := exec.CommandContext(ctx, b.cursorPath, "--version").CombinedOutput(); err != nil {
		return fmt.Errorf("health check: %w", err)
	}
	return nil
}

// ansiEscapeRe matches ANSI CSI escape sequences such as colour codes.
var ansiEscapeRe = regexp.MustCompile(`\x1b\[[0-9;]*[A-Za-z]`)

// parseModelsOutput extracts model IDs from the output of the "models"
// command. ANSI escapes are stripped, blank lines and lines containing
// "Loading", "Available", "Tip:" or "Error" are skipped, and for lines of
// the form "id - description" only the id is kept.
func parseModelsOutput(output string) []string {
	clean := ansiEscapeRe.ReplaceAllString(output, "")
	var models []string
	for _, line := range strings.Split(clean, "\n") {
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		if strings.Contains(line, "Loading") || strings.Contains(line, "Available") ||
			strings.Contains(line, "Tip:") || strings.Contains(line, "Error") {
			continue
		}
		// line is trimmed, so " - " can never sit at index 0 and Cut is
		// equivalent to an Index search with idx > 0.
		id, _, _ := strings.Cut(line, " - ")
		if id = strings.TrimSpace(id); id != "" {
			models = append(models, id)
		}
	}
	return models
}

// --- ACP Bridge (full per-request flow, modeled on cursor-api-proxy) ---

// ACPBridge implements Bridge on top of a fixed pool of long-lived ACP worker
// processes and remembers which worker/session served each sessionKey.
type ACPBridge struct {
	cursorPath string
	logger     *slog.Logger
	timeout    time.Duration
	chatOnly   bool
	workers    []*acpWorker
	// nextWorker is the round-robin cursor used by pickWorker.
	nextWorker atomic.Uint32
	// sessionsMu guards sessions.
	sessionsMu sync.Mutex
	sessions   map[string]acpSessionHandle
	// sessionTTL is the idle time after which a stored session is dropped.
	sessionTTL time.Duration
}

// acpSessionHandle records where a session lives so a later request with the
// same session key can be routed back to it.
type acpSessionHandle struct {
	WorkerIndex int
	SessionID   string
	Model       string
	// Generation is the worker generation observed when the handle was
	// stored; a different current value means the worker was restarted.
	Generation uint64
	LastUsedAt time.Time
}

// NewACPBridge creates an ACPBridge with maxConcurrent workers (at least 1)
// and a 30 minute session TTL. Worker processes are started lazily.
func NewACPBridge(cursorPath string, logger *slog.Logger, chatOnly bool, maxConcurrent int, timeout time.Duration) *ACPBridge {
	if maxConcurrent <= 0 {
		maxConcurrent = 1
	}
	bridge := &ACPBridge{
		cursorPath: cursorPath,
		logger:     logger,
		timeout:    timeout,
		chatOnly:   chatOnly,
		workers:    make([]*acpWorker, 0, maxConcurrent),
		sessions:   make(map[string]acpSessionHandle),
		sessionTTL: 30 * time.Minute,
	}
	for i := 0; i < maxConcurrent; i++ {
		bridge.workers = append(bridge.workers, newACPWorker(cursorPath, logger, chatOnly, timeout))
	}
	return bridge
}

// buildACPCommandArgs assembles the arguments that start the CLI in ACP mode.
// The --model flag is omitted for "", "auto" and "default".
func buildACPCommandArgs(workspace, model string) []string {
	args := []string{"--workspace", workspace}
	if model != "" && model != "auto" && model != "default" {
		args = append(args, "--model", model)
	}
	return append(args, "acp")
}

// acpMessage defines the ACP JSON-RPC message format.
type acpMessage struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      *int            `json:"id,omitempty"`
	Method  string          `json:"method,omitempty"`
	Params  json.RawMessage `json:"params,omitempty"`
	Result  json.RawMessage `json:"result,omitempty"`
	Error   *acpError       `json:"error,omitempty"`
}

// acpError is the error object of a JSON-RPC response.
type acpError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

// acpSession is the result payload of "session/new".
type acpSession struct {
	SessionID string `json:"sessionId"`
}

// acpResponse is what the reader goroutine hands to a waiting request.
type acpResponse struct {
	result json.RawMessage
	err    error
}

// acpWorker owns one ACP subprocess and serialises requests to it.
type acpWorker struct {
	cursorPath string
	logger     *slog.Logger
	timeout    time.Duration
	chatOnly   bool

	// reqMu serialises whole requests (run); methods with the "Locked"
	// suffix are called with it held.
	reqMu sync.Mutex
	// writeMu serialises writes to the subprocess stdin.
	writeMu sync.Mutex
	// stateMu guards the fields shared with the reader goroutine.
	stateMu sync.Mutex

	workspace    string
	envOverrides map[string]string
	currentModel string
	cmd          *exec.Cmd
	stdin        io.WriteCloser
	pending      map[int]chan acpResponse
	nextID       int
	readerErr    error
	readerDone   chan struct{}
	activeSink   func(string)
	// generation is bumped on every process start and reset.
	generation atomic.Uint64
}

// newACPWorker creates an idle worker; its process starts on first use.
func newACPWorker(cursorPath string, logger *slog.Logger, chatOnly bool, timeout time.Duration) *acpWorker {
	return &acpWorker{
		cursorPath: cursorPath,
		logger:     logger,
		timeout:    timeout,
		chatOnly:   chatOnly,
	}
}

// Execute runs the prompt on an ACP worker and streams non-empty text chunks
// on the output channel. On success the resulting session is stored under
// sessionKey. Both channels are closed when the request finishes.
func (b *ACPBridge) Execute(ctx context.Context, prompt string, model string, sessionKey string) (<-chan string, <-chan error) {
	outputChan := make(chan string, 64)
	errChan := make(chan error, 1)
	go func() {
		defer close(outputChan)
		defer close(errChan)

		worker, sessionID := b.resolveSession(sessionKey, model)
		forward := func(text string) {
			if text == "" {
				return
			}
			// Drop the chunk instead of blocking once the caller is gone.
			select {
			case outputChan <- text:
			case <-ctx.Done():
			}
		}
		finalSessionID, err := worker.run(ctx, prompt, model, sessionID, forward)
		if err != nil {
			errChan <- err
			return
		}
		b.storeSession(sessionKey, model, worker, finalSessionID)
	}()
	return outputChan, errChan
}

// ExecuteSync runs the prompt on an ACP worker and returns the concatenated,
// trimmed text. On success the resulting session is stored under sessionKey.
func (b *ACPBridge) ExecuteSync(ctx context.Context, prompt string, model string, sessionKey string) (string, error) {
	var content strings.Builder
	worker, sessionID := b.resolveSession(sessionKey, model)
	finalSessionID, err := worker.run(ctx, prompt, model, sessionID, func(text string) {
		content.WriteString(text)
	})
	if err != nil {
		return "", err
	}
	b.storeSession(sessionKey, model, worker, finalSessionID)
	return strings.TrimSpace(content.String()), nil
}

// pickWorker returns the next worker in round-robin order. If the pool is
// empty a fresh, unpooled worker is returned.
func (b *ACPBridge) pickWorker() *acpWorker {
	if len(b.workers) == 0 {
		return newACPWorker(b.cursorPath, b.logger, b.chatOnly, b.timeout)
	}
	// Reduce while still unsigned: converting the counter to int first
	// could yield a negative index where int is 32 bits wide.
	idx := (b.nextWorker.Add(1) - 1) % uint32(len(b.workers))
	return b.workers[idx]
}

// resolveSession picks the worker, and the session ID to resume if any, for a
// request. An empty session ID means a new session must be created. A stored
// handle is discarded when its worker index is out of range, when the model
// changed, or when the worker's generation no longer matches.
func (b *ACPBridge) resolveSession(sessionKey, model string) (*acpWorker, string) {
	if sessionKey == "" {
		return b.pickWorker(), ""
	}
	normalizedModel := normalizeModel(model)

	b.sessionsMu.Lock()
	defer b.sessionsMu.Unlock()
	b.cleanupExpiredSessionsLocked()

	handle, ok := b.sessions[sessionKey]
	if !ok {
		return b.pickWorker(), ""
	}
	// Validate the index before it is used to index b.workers below.
	if handle.WorkerIndex < 0 || handle.WorkerIndex >= len(b.workers) {
		delete(b.sessions, sessionKey)
		return b.pickWorker(), ""
	}
	worker := b.workers[handle.WorkerIndex]
	if handle.Model != normalizedModel || worker.Generation() != handle.Generation {
		delete(b.sessions, sessionKey)
		return worker, ""
	}
	handle.LastUsedAt = time.Now()
	b.sessions[sessionKey] = handle
	return worker, handle.SessionID
}

// storeSession remembers which worker and session served sessionKey. It does
// nothing when either ID is empty or the worker is not part of the pool.
func (b *ACPBridge) storeSession(sessionKey, model string, worker *acpWorker, sessionID string) {
	if sessionKey == "" || sessionID == "" {
		return
	}
	workerIndex := -1
	for i, candidate := range b.workers {
		if candidate == worker {
			workerIndex = i
			break
		}
	}
	if workerIndex == -1 {
		return
	}

	b.sessionsMu.Lock()
	defer b.sessionsMu.Unlock()
	b.cleanupExpiredSessionsLocked()
	b.sessions[sessionKey] = acpSessionHandle{
		WorkerIndex: workerIndex,
		SessionID:   sessionID,
		Model:       normalizeModel(model),
		Generation:  worker.Generation(),
		LastUsedAt:  time.Now(),
	}
}

// cleanupExpiredSessionsLocked drops sessions idle for longer than
// sessionTTL. The caller must hold sessionsMu. A TTL <= 0 disables expiry.
func (b *ACPBridge) cleanupExpiredSessionsLocked() {
	if b.sessionTTL <= 0 {
		return
	}
	cutoff := time.Now().Add(-b.sessionTTL)
	for key, handle := range b.sessions {
		if handle.LastUsedAt.Before(cutoff) {
			delete(b.sessions, key)
		}
	}
}

// ListModels runs "<cursor> models" and parses the combined output.
func (b *ACPBridge) ListModels(ctx context.Context) ([]string, error) {
	output, err := exec.CommandContext(ctx, b.cursorPath, "models").CombinedOutput()
	if err != nil {
		return nil, fmt.Errorf("list models: %w", err)
	}
	return parseModelsOutput(string(output)), nil
}

// CheckHealth runs "<cursor> --version" and reports whether it succeeded.
func (b *ACPBridge) CheckHealth(ctx context.Context) error {
	if _, err := exec.CommandContext(ctx, b.cursorPath, "--version").CombinedOutput(); err != nil {
		return fmt.Errorf("health check: %w", err)
	}
	return nil
}

// sendACP sends a JSON-RPC request as a single newline-terminated line.
// A nil params value omits the "params" member. Encoding failures are
// returned instead of sending a request with missing params.
func sendACP(stdin io.Writer, id int, method string, params any) error {
	msg := acpMessage{JSONRPC: "2.0", ID: &id, Method: method}
	if params != nil {
		data, err := json.Marshal(params)
		if err != nil {
			return fmt.Errorf("marshal %s params: %w", method, err)
		}
		msg.Params = data
	}
	data, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("marshal %s request: %w", method, err)
	}
	_, err = fmt.Fprintf(stdin, "%s\n", data)
	return err
}

// respondACP writes a JSON-RPC response for a request the agent sent to us.
func respondACP(stdin io.Writer, id int, result any) error {
	data, err := json.Marshal(map[string]any{
		"jsonrpc": "2.0",
		"id":      id,
		"result":  result,
	})
	if err != nil {
		return fmt.Errorf("marshal response %d: %w", id, err)
	}
	_, err = fmt.Fprintf(stdin, "%s\n", data)
	return err
}

// extractACPText pulls the text out of a "session/update" params payload.
// Updates whose "sessionUpdate" kind is set but does not start with
// "agent_message" yield "". The "content" member may be an object with a
// "text" field, an array of such objects (optionally nested one level under
// "content"), or a plain string. Anything else yields "".
func extractACPText(params json.RawMessage) string {
	var envelope map[string]any
	if err := json.Unmarshal(params, &envelope); err != nil {
		return ""
	}
	update, _ := envelope["update"].(map[string]any)
	if kind, _ := update["sessionUpdate"].(string); kind != "" && !strings.HasPrefix(kind, "agent_message") {
		return ""
	}

	switch content := update["content"].(type) {
	case string:
		return content
	case map[string]any:
		text, _ := content["text"].(string)
		return text
	case []any:
		var sb strings.Builder
		for _, item := range content {
			entry, ok := item.(map[string]any)
			if !ok {
				continue
			}
			// Prefer the nested content.text form, then the flat text form.
			if nested, ok := entry["content"].(map[string]any); ok {
				if text, ok := nested["text"].(string); ok {
					sb.WriteString(text)
					continue
				}
			}
			if text, ok := entry["text"].(string); ok {
				sb.WriteString(text)
			}
		}
		return sb.String()
	}
	return ""
}

// Generation returns the worker's current generation counter.
func (w *acpWorker) Generation() uint64 {
	return w.generation.Load()
}

// normalizeModel trims the model name and maps "" and "default" to "auto".
func normalizeModel(m string) string {
	m = strings.TrimSpace(m)
	if m == "" || m == "default" {
		return "auto"
	}
	return m
}

// run executes one prompt on this worker and returns the ID of the session
// that served it. The worker process is (re)started when it is not running,
// when the model changed, or when its reader has already exited; in the
// latter two cases sessionID is discarded and a new session is created.
// sink receives every text chunk emitted while the prompt is running. Any
// failure after start-up resets the worker.
func (w *acpWorker) run(ctx context.Context, prompt string, model string, sessionID string, sink func(string)) (string, error) {
	w.reqMu.Lock()
	defer w.reqMu.Unlock()

	t0 := time.Now()
	wantModel := normalizeModel(model)
	if w.cmd != nil {
		switch {
		case w.currentModel != wantModel:
			slog.Debug("acp: model changed, restarting worker", "from", w.currentModel, "to", wantModel)
			w.resetLocked()
			sessionID = ""
		case w.readerExited():
			// The process died between requests; restart instead of
			// failing this request against a dead pipe.
			slog.Debug("acp: worker process exited, restarting worker")
			w.resetLocked()
			sessionID = ""
		}
	}
	if err := w.ensureStartedLocked(ctx, wantModel); err != nil {
		return "", err
	}
	slog.Debug("acp: ensureStarted", "model", wantModel, "elapsed", time.Since(t0))

	newSession := sessionID == ""
	if newSession {
		var err error
		sessionID, err = w.createSessionLocked(ctx)
		if err != nil {
			w.resetLocked()
			return "", err
		}
		slog.Debug("acp: session created", "elapsed", time.Since(t0))
	}

	w.setActiveSinkLocked(sink)
	defer w.setActiveSinkLocked(nil)

	slog.Debug("acp: sending prompt", "newSession", newSession, "elapsed", time.Since(t0))
	promptParams := map[string]any{
		"sessionId": sessionID,
		"prompt": []any{map[string]any{
			"type": "text",
			"text": prompt,
		}},
	}
	if _, err := w.sendRequestLocked(ctx, "session/prompt", promptParams); err != nil {
		w.resetLocked()
		return "", err
	}
	slog.Debug("acp: prompt complete", "elapsed", time.Since(t0))
	return sessionID, nil
}

// readerExited reports whether the reader goroutine of the current process
// has finished, which means the process can no longer answer requests.
func (w *acpWorker) readerExited() bool {
	w.stateMu.Lock()
	done := w.readerDone
	w.stateMu.Unlock()
	if done == nil {
		return false
	}
	select {
	case <-done:
		return true
	default:
		return false
	}
}

// ensureStartedLocked starts the ACP subprocess if it is not running and
// performs the "initialize" and "authenticate" handshake. The workspace dir
// is created on first use: a chat-only workspace when chatOnly is set,
// otherwise a fresh temp dir. If the handshake fails the worker is reset so
// a half-initialised process is never reused. The caller must hold reqMu.
func (w *acpWorker) ensureStartedLocked(ctx context.Context, model string) error {
	if w.cmd != nil {
		return nil
	}
	if w.workspace == "" {
		var (
			dir string
			env map[string]string
			err error
		)
		if w.chatOnly {
			dir, env, err = workspace.ChatOnly("")
			if err != nil {
				return fmt.Errorf("chat-only workspace: %w", err)
			}
		} else {
			dir, err = os.MkdirTemp("", "cursor-acp-worker-*")
			if err != nil {
				return fmt.Errorf("temp workspace: %w", err)
			}
		}
		w.workspace = dir
		w.envOverrides = env
	}
	w.currentModel = model

	cmd := exec.Command(w.cursorPath, buildACPCommandArgs(w.workspace, model)...)
	cmd.Dir = w.workspace
	cmd.Env = workspace.MergeEnv(os.Environ(), w.envOverrides)
	stdin, err := cmd.StdinPipe()
	if err != nil {
		return fmt.Errorf("stdin pipe: %w", err)
	}
	stdoutPipe, err := cmd.StdoutPipe()
	if err != nil {
		_ = stdin.Close()
		return fmt.Errorf("stdout pipe: %w", err)
	}
	if err := cmd.Start(); err != nil {
		_ = stdin.Close()
		_ = stdoutPipe.Close()
		return fmt.Errorf("start acp: %w", err)
	}

	done := make(chan struct{})
	// A reader left over from a previous process may still inspect this
	// state, so publish it under stateMu.
	w.stateMu.Lock()
	w.cmd = cmd
	w.stdin = stdin
	w.pending = make(map[int]chan acpResponse)
	w.nextID = 1
	w.readerDone = done
	w.readerErr = nil
	w.stateMu.Unlock()
	w.generation.Add(1)
	go w.serveReader(cmd, stdoutPipe, done)

	initParams := map[string]any{
		"protocolVersion": 1,
		"clientCapabilities": map[string]any{
			"promptCapabilities": map[string]any{"text": true},
			"fs":                 map[string]any{"readTextFile": false, "writeTextFile": false},
			"terminal":           false,
		},
		"clientInfo": map[string]any{
			"name":    "cursor-adapter",
			"version": "0.2.0",
		},
	}
	if _, err := w.sendRequestLocked(ctx, "initialize", initParams); err != nil {
		w.resetLocked()
		return fmt.Errorf("initialize: %w", err)
	}
	authParams := map[string]any{"methodId": "cursor_login"}
	if _, err := w.sendRequestLocked(ctx, "authenticate", authParams); err != nil {
		w.resetLocked()
		return fmt.Errorf("authenticate: %w", err)
	}
	return nil
}

// createSessionLocked issues "session/new" with the worker's workspace as cwd
// and returns the new session ID. The caller must hold reqMu.
func (w *acpWorker) createSessionLocked(ctx context.Context) (string, error) {
	resp, err := w.sendRequestLocked(ctx, "session/new", map[string]any{
		"cwd":        w.workspace,
		"mcpServers": []any{},
	})
	if err != nil {
		return "", fmt.Errorf("session/new: %w", err)
	}
	var session acpSession
	if err := json.Unmarshal(resp, &session); err != nil || session.SessionID == "" {
		return "", fmt.Errorf("session/new invalid response: %s", string(resp))
	}
	return session.SessionID, nil
}

// setConfigLocked issues "session/set_config_option" for the given session.
// The caller must hold reqMu.
func (w *acpWorker) setConfigLocked(ctx context.Context, sessionID, configID string, value any) error {
	_, err := w.sendRequestLocked(ctx, "session/set_config_option", map[string]any{
		"sessionId": sessionID,
		"configId":  configID,
		"value":     value,
	})
	if err != nil {
		return fmt.Errorf("session/set_config_option(%s=%v): %w", configID, value, err)
	}
	return nil
}

// sendRequestLocked sends a JSON-RPC request and waits for its response. It
// returns early when ctx is done, when the reader goroutine exits, or when
// the worker timeout elapses. The caller must hold reqMu.
func (w *acpWorker) sendRequestLocked(ctx context.Context, method string, params any) (json.RawMessage, error) {
	if w.stdin == nil {
		return nil, fmt.Errorf("acp stdin unavailable")
	}
	id := w.nextID
	w.nextID++
	// Buffered so the reader never blocks on delivery if we stop waiting.
	respCh := make(chan acpResponse, 1)

	w.stateMu.Lock()
	w.pending[id] = respCh
	readerDone := w.readerDone
	w.stateMu.Unlock()

	if err := w.writeJSONRPCLocked(id, method, params); err != nil {
		w.removePending(id)
		return nil, err
	}

	timer := time.NewTimer(w.timeout)
	defer timer.Stop()
	select {
	case resp := <-respCh:
		return resp.result, resp.err
	case <-ctx.Done():
		w.removePending(id)
		return nil, ctx.Err()
	case <-readerDone:
		return nil, w.getReaderErr()
	case <-timer.C:
		w.removePending(id)
		return nil, fmt.Errorf("acp %s timed out after %s", method, w.timeout)
	}
}

// writeJSONRPCLocked writes one request to stdin under writeMu so it cannot
// interleave with replies written by the reader goroutine.
func (w *acpWorker) writeJSONRPCLocked(id int, method string, params any) error {
	w.writeMu.Lock()
	defer w.writeMu.Unlock()
	return sendACP(w.stdin, id, method, params)
}

// readLoop runs the reader against the worker's current process and done
// channel. ensureStartedLocked calls serveReader directly so the reader is
// bound to the exact process it was started for.
func (w *acpWorker) readLoop(stdout io.ReadCloser) {
	w.stateMu.Lock()
	cmd := w.cmd
	done := w.readerDone
	w.stateMu.Unlock()
	w.serveReader(cmd, stdout, done)
}

// serveReader reads newline-delimited JSON-RPC messages from stdout until EOF
// or a read error. Responses are routed to the waiting request; everything
// else goes to handleACPNotification. Lines that are not valid JSON are
// skipped. On exit it fails the outstanding requests, reaps cmd and closes
// done. cmd and done are passed in, not read from the worker, so a reader
// that outlives a reset still reaps its own process and never touches the
// state of a newer one.
func (w *acpWorker) serveReader(cmd *exec.Cmd, stdout io.ReadCloser, done chan struct{}) {
	defer stdout.Close()

	scanner := bufio.NewScanner(stdout)
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			continue
		}
		var msg acpMessage
		if err := json.Unmarshal([]byte(line), &msg); err != nil {
			continue
		}
		if msg.ID != nil && (msg.Result != nil || msg.Error != nil) {
			w.deliverResponse(*msg.ID, msg)
			continue
		}
		w.handleACPNotification(msg)
	}
	scanErr := scanner.Err()
	failure := scanErr
	if failure == nil {
		failure = fmt.Errorf("acp process exited")
	}

	var orphaned map[int]chan acpResponse
	w.stateMu.Lock()
	// Only touch worker state that still belongs to this reader.
	if w.readerDone == done {
		w.readerErr = scanErr
		orphaned = w.pending
		w.pending = make(map[int]chan acpResponse)
	}
	w.stateMu.Unlock()
	for _, ch := range orphaned {
		ch <- acpResponse{err: failure}
		close(ch)
	}

	if cmd != nil {
		if scanErr != nil && cmd.Process != nil {
			// Reading failed while the process may still be alive; nobody
			// will drain its stdout any more, so stop it before waiting.
			_ = cmd.Process.Kill()
		}
		_ = cmd.Wait()
	}
	if done != nil {
		close(done)
	}
}

// deliverResponse hands a response to the request waiting on id, if any.
// A JSON-RPC error object is converted into an error carrying its message.
func (w *acpWorker) deliverResponse(id int, msg acpMessage) {
	w.stateMu.Lock()
	ch := w.pending[id]
	delete(w.pending, id)
	w.stateMu.Unlock()
	if ch == nil {
		return
	}
	resp := acpResponse{result: msg.Result}
	if msg.Error != nil {
		resp.err = fmt.Errorf("%s", msg.Error.Message)
	}
	ch <- resp
	close(ch)
}

// handleACPNotification processes messages initiated by the agent and reports
// whether the message was recognised. "session/update" text is forwarded to
// the active sink. "session/request_permission" is answered with
// "reject-once". Requests in the "cursor/" namespace are answered
// automatically: ask_question selects the first option, create_plan is
// approved, and any other method gets an empty result.
func (w *acpWorker) handleACPNotification(msg acpMessage) bool {
	switch msg.Method {
	case "session/update":
		if text := extractACPText(msg.Params); text != "" {
			w.stateMu.Lock()
			sink := w.activeSink
			w.stateMu.Unlock()
			if sink != nil {
				sink(text)
			}
		}
		return true
	case "session/request_permission":
		if msg.ID != nil {
			w.reply(*msg.ID, map[string]any{
				"outcome": map[string]any{
					"outcome":  "selected",
					"optionId": "reject-once",
				},
			})
		}
		return true
	}

	if msg.ID == nil || !strings.HasPrefix(msg.Method, "cursor/") {
		return false
	}
	switch msg.Method {
	case "cursor/ask_question":
		// Best effort: malformed params simply leave selectedId empty.
		var params map[string]any
		_ = json.Unmarshal(msg.Params, &params)
		selectedID := ""
		if options, ok := params["options"].([]any); ok && len(options) > 0 {
			if first, ok := options[0].(map[string]any); ok {
				if id, ok := first["id"].(string); ok {
					selectedID = id
				}
			}
		}
		w.reply(*msg.ID, map[string]any{"selectedId": selectedID})
	case "cursor/create_plan":
		w.reply(*msg.ID, map[string]any{"approved": true})
	default:
		w.reply(*msg.ID, map[string]any{})
	}
	return true
}

// reply writes a JSON-RPC response to the agent. It is a no-op when stdin has
// already been torn down by a reset; write failures are only logged because
// the reader has no caller to report them to.
func (w *acpWorker) reply(id int, result any) {
	w.stateMu.Lock()
	stdin := w.stdin
	w.stateMu.Unlock()
	if stdin == nil {
		return
	}
	w.writeMu.Lock()
	defer w.writeMu.Unlock()
	if err := respondACP(stdin, id, result); err != nil {
		slog.Debug("acp: reply failed", "id", id, "err", err)
	}
}

// setActiveSinkLocked installs (or clears, with nil) the callback that
// receives streamed text.
func (w *acpWorker) setActiveSinkLocked(sink func(string)) {
	w.stateMu.Lock()
	w.activeSink = sink
	w.stateMu.Unlock()
}

// removePending forgets a request that is no longer being waited on.
func (w *acpWorker) removePending(id int) {
	w.stateMu.Lock()
	delete(w.pending, id)
	w.stateMu.Unlock()
}

// getReaderErr returns the reader's recorded error, or a generic
// "acp process exited" error when none was recorded.
func (w *acpWorker) getReaderErr() error {
	w.stateMu.Lock()
	defer w.stateMu.Unlock()
	if w.readerErr != nil {
		return w.readerErr
	}
	return fmt.Errorf("acp process exited")
}

// resetLocked tears the worker down: it clears the process state, bumps the
// generation, closes stdin, kills the process, waits up to two seconds for
// the reader to finish and removes the workspace dir. The caller must hold
// reqMu.
func (w *acpWorker) resetLocked() {
	w.stateMu.Lock()
	cmd := w.cmd
	stdin := w.stdin
	done := w.readerDone
	w.cmd = nil
	w.stdin = nil
	w.pending = make(map[int]chan acpResponse)
	w.nextID = 1
	w.readerDone = nil
	w.readerErr = nil
	w.activeSink = nil
	w.stateMu.Unlock()
	w.generation.Add(1)

	if stdin != nil {
		_ = stdin.Close()
	}
	if cmd != nil && cmd.Process != nil {
		_ = cmd.Process.Kill()
	}
	if done != nil {
		// Bounded wait so a stuck reader cannot block the request path.
		select {
		case <-done:
		case <-time.After(2 * time.Second):
		}
	}
	if w.workspace != "" {
		_ = os.RemoveAll(w.workspace)
		w.workspace = ""
	}
	w.envOverrides = nil
}