diff --git a/.github/workflows/upstream-sync.yml b/.github/workflows/upstream-sync.yml new file mode 100644 index 00000000..e236006b --- /dev/null +++ b/.github/workflows/upstream-sync.yml @@ -0,0 +1,261 @@ +name: Upstream Sync + +on: + schedule: + # 每天 UTC 23:00(台灣時間 07:00) + - cron: "0 23 * * *" + workflow_dispatch: + inputs: + dry_run: + description: "Dry run (不建立 PR,只顯示差異)" + required: false + default: "false" + +permissions: + contents: write + pull-requests: write + +jobs: + sync: + runs-on: ubuntu-latest + steps: + - name: Checkout fork + uses: actions/checkout@v4 + with: + fetch-depth: 0 + token: ${{ secrets.PAT }} + + - name: Configure git + run: | + git config user.name "github-actions[bot]" + git config user.email "github-actions[bot]@users.noreply.github.com" + + - name: Add upstream remote + run: | + git remote add upstream https://github.com/github/copilot-sdk.git || true + git fetch upstream main + + - name: Check for new commits + id: check + run: | + UPSTREAM_SHA=$(git rev-parse upstream/main) + LOCAL_SHA=$(git merge-base HEAD upstream/main) + + if [ "$UPSTREAM_SHA" = "$LOCAL_SHA" ]; then + echo "already_up_to_date=true" >> "$GITHUB_OUTPUT" + echo "✅ Already up to date with upstream" + else + COMMIT_COUNT=$(git rev-list --count "$LOCAL_SHA".."$UPSTREAM_SHA") + COMMIT_LOG=$(git log --oneline "$LOCAL_SHA".."$UPSTREAM_SHA" | head -20) + echo "already_up_to_date=false" >> "$GITHUB_OUTPUT" + echo "commit_count=$COMMIT_COUNT" >> "$GITHUB_OUTPUT" + echo "upstream_sha=$UPSTREAM_SHA" >> "$GITHUB_OUTPUT" + echo "base_sha=$LOCAL_SHA" >> "$GITHUB_OUTPUT" + + # Store commit log for PR body (multiline) + { + echo "commit_log<> "$GITHUB_OUTPUT" + + echo "📦 Found $COMMIT_COUNT new upstream commits" + fi + + - name: Attempt merge + if: steps.check.outputs.already_up_to_date != 'true' + id: merge + run: | + BRANCH="upstream-sync/$(date +%Y%m%d-%H%M%S)" + git checkout -b "$BRANCH" + echo "branch=$BRANCH" >> "$GITHUB_OUTPUT" + + # Attempt merge (allow conflicts) + if git merge upstream/main --no-commit --no-ff 2>/dev/null; then + echo "merge_conflicts=false" >> "$GITHUB_OUTPUT" + echo "✅ Merge completed without conflicts" + else + echo "merge_conflicts=true" >> "$GITHUB_OUTPUT" + echo "⚠️ Merge has conflicts, attempting auto-resolve..." + + # === 自動保留我們的 ACP 檔案 === + ACP_RESOLVED=false + for path in nodejs/src/protocols/ nodejs/test/protocols/ nodejs/test/e2e/acp.test.ts; do + if git diff --name-only --diff-filter=U | grep -q "^${path}"; then + echo " 保留 ours: $path" + git checkout --ours -- "$path" 2>/dev/null || true + git add "$path" 2>/dev/null || true + ACP_RESOLVED=true + fi + done + + if [ "$ACP_RESOLVED" = "true" ]; then + echo "acp_auto_resolved=true" >> "$GITHUB_OUTPUT" + else + echo "acp_auto_resolved=false" >> "$GITHUB_OUTPUT" + fi + + # === 檢查核心檔案衝突(需要手動處理)=== + MANUAL_REVIEW=false + CONFLICT_FILES="" + for path in nodejs/src/client.ts nodejs/src/types.ts; do + if git diff --name-only --diff-filter=U | grep -q "^${path}"; then + echo " ❌ 核心檔案衝突: $path" + MANUAL_REVIEW=true + CONFLICT_FILES="$CONFLICT_FILES $path" + fi + done + + echo "needs_manual_review=$MANUAL_REVIEW" >> "$GITHUB_OUTPUT" + echo "conflict_files=$CONFLICT_FILES" >> "$GITHUB_OUTPUT" + + # 如果還有未解決的衝突,檢查是否都在可自動解決的路徑中 + REMAINING=$(git diff --name-only --diff-filter=U 2>/dev/null || true) + if [ -n "$REMAINING" ]; then + echo " 剩餘未解決的衝突:" + echo "$REMAINING" + if [ "$MANUAL_REVIEW" = "false" ]; then + # 非核心檔案的衝突,也標記為需要手動處理 + echo "needs_manual_review=true" >> "$GITHUB_OUTPUT" + echo "conflict_files=$REMAINING" >> "$GITHUB_OUTPUT" + fi + fi + fi + + - name: Commit and push + if: steps.check.outputs.already_up_to_date != 'true' && steps.merge.outputs.needs_manual_review != 'true' + run: | + # Check if there are changes to commit + if git diff --cached --quiet && git diff --quiet; then + echo "No changes to commit" + exit 0 + fi + + git commit -m "chore: sync upstream main (${{ steps.check.outputs.commit_count }} commits) + + Upstream: github/copilot-sdk@${{ steps.check.outputs.upstream_sha }} + ACP auto-resolved: ${{ steps.merge.outputs.acp_auto_resolved || 'false' }}" + + if [ "${{ github.event.inputs.dry_run }}" = "true" ]; then + echo "🏃 Dry run — skipping push" + git log --oneline -5 + else + git push origin "${{ steps.merge.outputs.branch }}" + # Wait for GitHub to index the new branch + sleep 30 + fi + + - name: Commit conflict state for manual review + if: steps.check.outputs.already_up_to_date != 'true' && steps.merge.outputs.needs_manual_review == 'true' + run: | + # 對衝突檔案保留 ours 版本以便 push(PR 中會標示需要手動處理) + git checkout --ours -- . 2>/dev/null || true + git add -A + + git commit -m "chore: sync upstream main (conflicts need manual review) + + Upstream: github/copilot-sdk@${{ steps.check.outputs.upstream_sha }} + Conflicting files: ${{ steps.merge.outputs.conflict_files }} + + ⚠️ This PR has unresolved conflicts that need manual review." + + if [ "${{ github.event.inputs.dry_run }}" = "true" ]; then + echo "🏃 Dry run — skipping push" + else + git push origin "${{ steps.merge.outputs.branch }}" + sleep 30 + fi + + - name: Create PR (clean merge) + if: steps.check.outputs.already_up_to_date != 'true' && steps.merge.outputs.needs_manual_review != 'true' && github.event.inputs.dry_run != 'true' + env: + GH_TOKEN: ${{ secrets.PAT }} + run: | + BODY=$(cat <<'HEREDOC' + ## Upstream Sync + + **Commits:** ${{ steps.check.outputs.commit_count }} new commits from upstream + **Upstream SHA:** `${{ steps.check.outputs.upstream_sha }}` + **Base SHA:** `${{ steps.check.outputs.base_sha }}` + + ### Upstream Changes + ``` + ${{ steps.check.outputs.commit_log }} + ``` + + ### Conflict Resolution + - ACP paths auto-resolved: ${{ steps.merge.outputs.acp_auto_resolved || 'none' }} + - Manual review needed: none + + ### After Merge + ```bash + # Rebase feature branches + git fetch origin main + git rebase origin/main + ``` + + --- + Auto-generated by upstream-sync workflow + HEREDOC + ) + + # Use REST API instead of GraphQL to avoid shallow clone issues + gh api repos/${{ github.repository }}/pulls \ + -f base=main \ + -f head="${{ steps.merge.outputs.branch }}" \ + -f title="chore: sync upstream (${{ steps.check.outputs.commit_count }} commits)" \ + -f body="$BODY" + + - name: Create PR (needs manual review) + if: steps.check.outputs.already_up_to_date != 'true' && steps.merge.outputs.needs_manual_review == 'true' && github.event.inputs.dry_run != 'true' + env: + GH_TOKEN: ${{ secrets.PAT }} + run: | + BODY=$(cat <<'HEREDOC' + ## Upstream Sync — Needs Manual Review + + **Commits:** ${{ steps.check.outputs.commit_count }} new commits from upstream + **Upstream SHA:** `${{ steps.check.outputs.upstream_sha }}` + **Base SHA:** `${{ steps.check.outputs.base_sha }}` + + ### Upstream Changes + ``` + ${{ steps.check.outputs.commit_log }} + ``` + + ### Conflicting Files (need manual resolution) + ``` + ${{ steps.merge.outputs.conflict_files }} + ``` + + These core files have conflicts between upstream changes and our modifications. + Please resolve manually: + + ```bash + # Fetch and attempt merge locally + git fetch upstream main + git checkout main + git merge upstream/main + + # Resolve conflicts in: + ${{ steps.merge.outputs.conflict_files }} + + # Then commit and push + git add -A + git commit + git push + ``` + + ### ACP Paths + - ACP auto-resolved: ${{ steps.merge.outputs.acp_auto_resolved || 'false' }} + + --- + Auto-generated by upstream-sync workflow + HEREDOC + ) + + gh api repos/${{ github.repository }}/pulls \ + -f base=main \ + -f head="${{ steps.merge.outputs.branch }}" \ + -f title="chore: sync upstream (needs manual review)" \ + -f body="$BODY" diff --git a/nodejs/src/client.ts b/nodejs/src/client.ts index 7df64e50..b32ff0a7 100644 --- a/nodejs/src/client.ts +++ b/nodejs/src/client.ts @@ -25,6 +25,8 @@ import { import { createServerRpc } from "./generated/rpc.js"; import { getSdkProtocolVersion } from "./sdkProtocolVersion.js"; import { CopilotSession } from "./session.js"; +import type { ProtocolAdapter, ProtocolConnection } from "./protocols/protocol-adapter.js"; +import { AcpProtocolAdapter } from "./protocols/acp/index.js"; import type { ConnectionState, CopilotClientOptions, @@ -137,13 +139,15 @@ export class CopilotClient { private sessions: Map = new Map(); private stderrBuffer: string = ""; // Captures CLI stderr for error messages private options: Required< - Omit + Omit > & { cliUrl?: string; githubToken?: string; useLoggedInUser?: boolean; + protocol: "copilot" | "acp"; }; private isExternalServer: boolean = false; + private protocolAdapter: ProtocolAdapter | null = null; private forceStopping: boolean = false; private modelsCache: ModelInfo[] | null = null; private modelsCacheLock: Promise = Promise.resolve(); @@ -225,6 +229,7 @@ export class CopilotClient { githubToken: options.githubToken, // Default useLoggedInUser to false when githubToken is provided, otherwise true useLoggedInUser: options.useLoggedInUser ?? (options.githubToken ? false : true), + protocol: options.protocol ?? "copilot", }; } @@ -285,6 +290,25 @@ export class CopilotClient { this.state = "connecting"; try { + // Use ACP protocol adapter for ACP mode + if (this.options.protocol === "acp") { + this.protocolAdapter = new AcpProtocolAdapter(this.options); + await this.protocolAdapter.start(); + + // Get the protocol connection and wrap it for MessageConnection compatibility + const protoConn = this.protocolAdapter.getConnection(); + this.connection = this.wrapProtocolConnection(protoConn); + this.attachConnectionHandlers(); + protoConn.listen(); + + // Verify protocol version + await this.protocolAdapter.verifyProtocolVersion(); + + this.state = "connected"; + return; + } + + // Standard Copilot protocol path // Only start CLI server process if not connecting to external server if (!this.isExternalServer) { await this.startCLIServer(); @@ -357,6 +381,17 @@ export class CopilotClient { } this.sessions.clear(); + // For ACP mode, use the protocol adapter's stop + if (this.protocolAdapter) { + const adapterErrors = await this.protocolAdapter.stop(); + errors.push(...adapterErrors); + this.protocolAdapter = null; + this.connection = null; + this.modelsCache = null; + this.state = "disconnected"; + return errors; + } + // Close connection if (this.connection) { try { @@ -441,6 +476,16 @@ export class CopilotClient { // Clear sessions immediately without trying to destroy them this.sessions.clear(); + // For ACP mode, use the protocol adapter's forceStop + if (this.protocolAdapter) { + await this.protocolAdapter.forceStop(); + this.protocolAdapter = null; + this.connection = null; + this.modelsCache = null; + this.state = "disconnected"; + return; + } + // Force close connection if (this.connection) { try { @@ -1539,6 +1584,57 @@ export class CopilotClient { }; } + /** + * Wraps a ProtocolConnection to provide MessageConnection interface compatibility. + * This allows ACP connections to work with the existing session management code. + */ + private wrapProtocolConnection(protoConn: ProtocolConnection): MessageConnection { + // Create a minimal MessageConnection-like wrapper + // We cast through unknown because we're implementing a subset of MessageConnection + const wrapper = { + sendRequest: async (method: string, params?: unknown) => { + return await protoConn.sendRequest(method, params); + }, + sendNotification: (method: string, params?: unknown) => { + protoConn.sendNotification(method, params); + }, + onNotification: (method: string, handler: (params: unknown) => void) => { + protoConn.onNotification(method, handler); + return { dispose: () => {} }; + }, + onRequest: (method: string, handler: (params: unknown) => Promise) => { + protoConn.onRequest(method, handler); + return { dispose: () => {} }; + }, + onClose: (handler: () => void) => { + protoConn.onClose(handler); + return { dispose: () => {} }; + }, + onError: (handler: (error: Error) => void) => { + protoConn.onError(handler); + return { dispose: () => {} }; + }, + dispose: () => { + protoConn.dispose(); + }, + listen: () => { + protoConn.listen(); + }, + // Additional MessageConnection methods - stubs for interface compatibility + onUnhandledNotification: () => ({ dispose: () => {} }), + onUnhandledRequest: () => ({ dispose: () => {} }), + onDispose: () => ({ dispose: () => {} }), + hasPendingResponse: () => false, + onProgress: () => ({ dispose: () => {} }), + sendProgress: () => {}, + onUnhandledProgress: () => ({ dispose: () => {} }), + trace: () => {}, + end: () => {}, + inspect: () => {}, + }; + return wrapper as unknown as MessageConnection; + } + /** * Attempt to reconnect to the server */ diff --git a/nodejs/src/protocols/acp/acp-adapter.ts b/nodejs/src/protocols/acp/acp-adapter.ts new file mode 100644 index 00000000..4fdbc14d --- /dev/null +++ b/nodejs/src/protocols/acp/acp-adapter.ts @@ -0,0 +1,668 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +/** + * ACP (Agent Client Protocol) adapter implementation. + * @module protocols/acp/acp-adapter + */ + +import { spawn, type ChildProcess } from "node:child_process"; +import type { ProtocolAdapter, ProtocolConnection } from "../protocol-adapter.js"; +import type { CopilotClientOptions, SessionConfig } from "../../types.js"; +import { AcpTransport } from "./acp-transport.js"; +import { + ACP_PROTOCOL_VERSION, + UNSUPPORTED_ACP_METHODS, + type AcpSessionUpdateParams, + type AcpInitializeResult, + type AcpSessionNewResult, + type AcpSessionPromptResult, + type AcpRequestPermissionParams, + type AcpRequestPermissionResult, + type AcpToolCallUpdateInner, + type AcpToolCallUpdateUpdateInner, +} from "./acp-types.js"; +import { + stringToAcpContent, + acpSessionUpdateToSessionEvent, + acpToolCallToSessionEvent, + copilotSessionConfigToAcpParams, +} from "./acp-mapper.js"; + +/** + * ACP protocol connection implementation. + * Translates Copilot SDK method calls to ACP protocol format. + */ +class AcpConnection implements ProtocolConnection { + private transport: AcpTransport; + private requestId = 0; + private notificationHandlers: Map void>> = new Map(); + private requestHandlers: Map Promise> = new Map(); + private closeHandlers: Set<() => void> = new Set(); + private errorHandlers: Set<(error: Error) => void> = new Set(); + + /** + * Accumulates message_delta content per session so we can synthesize + * an assistant.message event when session.idle fires. + * Gemini only sends agent_message_chunk (delta), not agent_message (complete). + */ + private deltaAccumulator: Map = new Map(); + + constructor(transport: AcpTransport) { + this.transport = transport; + + // Set up transport notification handler for session/update + this.transport.onNotification("session/update", (params) => { + this.handleAcpUpdate(params as AcpSessionUpdateParams); + }); + + // Set up transport request handler for session/request_permission + this.transport.onRequest("session/request_permission", async (id, params) => { + await this.handleAcpPermissionRequest(id, params as AcpRequestPermissionParams); + }); + + this.transport.onClose(() => { + for (const handler of this.closeHandlers) { + try { + handler(); + } catch { + // Ignore handler errors + } + } + }); + + this.transport.onError((error) => { + for (const handler of this.errorHandlers) { + try { + handler(error); + } catch { + // Ignore handler errors + } + } + }); + } + + async sendRequest(method: string, params?: unknown): Promise { + // Check for unsupported methods + if (UNSUPPORTED_ACP_METHODS.includes(method as (typeof UNSUPPORTED_ACP_METHODS)[number])) { + throw new Error( + `Method '${method}' is not supported in ACP mode. ` + + `ACP protocol has limited feature support compared to the full Copilot protocol.` + ); + } + + // Translate method and params + const { acpMethod, acpParams } = this.translateRequest(method, params); + + // Handle no-op methods (like session.destroy which ACP doesn't support) + if (acpMethod === "_noop") { + return {} as T; + } + + const id = ++this.requestId; + const result = await this.transport.sendRequest(id, acpMethod, acpParams); + + // Translate response if needed + const translated = this.translateResponse(method, result, params); + + // After session.create, apply session config (model, mode, etc.) + if (method === "session.create") { + await this.applySessionConfig(translated, params); + } + + return translated as T; + } + + sendNotification(method: string, params?: unknown): void { + this.transport.sendNotification(method, params); + } + + onNotification(method: string, handler: (params: unknown) => void): void { + if (!this.notificationHandlers.has(method)) { + this.notificationHandlers.set(method, new Set()); + } + this.notificationHandlers.get(method)!.add(handler); + } + + onRequest(method: string, handler: (params: unknown) => Promise): void { + // Store the handler for ACP requests (permission.request maps to session/request_permission) + this.requestHandlers.set(method, handler); + } + + onClose(handler: () => void): void { + this.closeHandlers.add(handler); + } + + onError(handler: (error: Error) => void): void { + this.errorHandlers.add(handler); + } + + dispose(): void { + this.transport.dispose(); + this.notificationHandlers.clear(); + this.requestHandlers.clear(); + this.closeHandlers.clear(); + this.errorHandlers.clear(); + } + + listen(): void { + this.transport.listen(); + } + + private translateRequest( + method: string, + params?: unknown + ): { acpMethod: string; acpParams?: unknown } { + switch (method) { + case "ping": + return { + acpMethod: "initialize", + acpParams: { protocolVersion: ACP_PROTOCOL_VERSION }, + }; + + case "session.create": { + const config = params as SessionConfig & { workingDirectory?: string }; + const acpParams = copilotSessionConfigToAcpParams(config); + // ACP requires cwd and mcpServers (as array) + // Gemini expects env as array of "KEY=value" strings + const mcpServers = acpParams.mcpServers + ? Object.entries(acpParams.mcpServers).map(([name, serverConfig]) => { + const envArray: string[] = serverConfig.env + ? Object.entries(serverConfig.env).map( + ([key, value]) => `${key}=${value}` + ) + : []; + return { + name, + command: serverConfig.command, + args: serverConfig.args ?? [], + env: envArray, + }; + }) + : []; + return { + acpMethod: "session/new", + acpParams: { + cwd: acpParams.cwd || process.cwd(), + mcpServers, + }, + }; + } + + case "session.send": { + const sendParams = params as { sessionId: string; prompt: string }; + return { + acpMethod: "session/prompt", + acpParams: { + sessionId: sendParams.sessionId, + prompt: stringToAcpContent(sendParams.prompt), + }, + }; + } + + case "session.destroy": { + // ACP doesn't have session/end - just return success + // The session will be cleaned up when the process exits + return { + acpMethod: "_noop", + acpParams: params, + }; + } + + case "session.abort": { + return { + acpMethod: "session/abort", + acpParams: params, + }; + } + + default: + // Pass through unrecognized methods + return { acpMethod: method, acpParams: params }; + } + } + + private translateResponse(method: string, result: unknown, originalParams?: unknown): unknown { + switch (method) { + case "ping": { + const acpResult = result as AcpInitializeResult; + return { + message: "pong", + timestamp: Date.now(), + protocolVersion: acpResult.protocolVersion, + }; + } + + case "session.create": { + const acpResult = result as AcpSessionNewResult; + return { + sessionId: acpResult.sessionId, + }; + } + + case "session.send": { + const acpResult = result as AcpSessionPromptResult; + const sessionId = (originalParams as { sessionId?: string })?.sessionId ?? ""; + + // Gemini returns stopReason in the response instead of sending + // a separate end_turn notification. Emit session.idle event. + if (acpResult.stopReason === "end_turn") { + // Dispatch events after a microtask to ensure + // they're processed after the send() promise resolves + queueMicrotask(() => { + const handlers = this.notificationHandlers.get("session.event"); + if (!handlers) return; + + const dispatch = (event: unknown) => { + const notification = { sessionId, event }; + for (const handler of handlers) { + try { + handler(notification); + } catch { + // Ignore handler errors + } + } + }; + + // Gemini only sends message_delta, not a complete message. + // Synthesize assistant.message from accumulated deltas. + const accumulated = this.deltaAccumulator.get(sessionId); + if (accumulated) { + dispatch({ + id: `acp-msg-${Date.now()}`, + timestamp: new Date().toISOString(), + parentId: null, + ephemeral: false, + type: "assistant.message", + data: { + messageId: acpResult.messageId ?? `acp-msg-${Date.now()}`, + content: accumulated, + toolRequests: [], + }, + }); + this.deltaAccumulator.delete(sessionId); + } + + // Then dispatch session.idle + dispatch({ + id: `acp-idle-${Date.now()}`, + timestamp: new Date().toISOString(), + parentId: null, + ephemeral: true, + type: "session.idle", + data: {}, + }); + }); + } + + return { + messageId: acpResult.messageId, + }; + } + + default: + return result; + } + } + + private async applySessionConfig( + createResult: unknown, + originalParams: unknown + ): Promise { + const sessionId = (createResult as { sessionId?: string })?.sessionId; + const config = originalParams as SessionConfig | undefined; + if (!sessionId || !config) return; + + if (config.model) { + const id = ++this.requestId; + await this.transport.sendRequest(id, "session/set_config_option", { + sessionId, + configId: "model", + value: config.model, + }); + } + } + + private handleAcpUpdate(updateParams: AcpSessionUpdateParams): void { + const { update } = updateParams; + + // Check if this is a tool call update + if (update.sessionUpdate === "tool_call" || update.sessionUpdate === "tool_call_update") { + const toolEvent = acpToolCallToSessionEvent( + update as AcpToolCallUpdateInner | AcpToolCallUpdateUpdateInner + ); + if (toolEvent) { + this.dispatchSessionEvent(updateParams.sessionId, toolEvent); + } + return; + } + + // Handle message updates + const sessionEvent = acpSessionUpdateToSessionEvent(updateParams); + if (!sessionEvent) { + return; + } + + this.dispatchSessionEvent(updateParams.sessionId, sessionEvent); + } + + private dispatchSessionEvent(sessionId: string, event: unknown): void { + // Accumulate message_delta content for synthesizing assistant.message later + const typedEvent = event as { type?: string; data?: { deltaContent?: string } }; + if (typedEvent.type === "assistant.message_delta" && typedEvent.data?.deltaContent) { + const existing = this.deltaAccumulator.get(sessionId) ?? ""; + this.deltaAccumulator.set(sessionId, existing + typedEvent.data.deltaContent); + } + + const handlers = this.notificationHandlers.get("session.event"); + if (handlers) { + const notification = { + sessionId, + event, + }; + + for (const handler of handlers) { + try { + handler(notification); + } catch { + // Ignore handler errors + } + } + } + } + + private async handleAcpPermissionRequest( + id: string | number, + params: AcpRequestPermissionParams + ): Promise { + const handler = this.requestHandlers.get("permission.request"); + + if (!handler) { + // No handler registered, return cancelled + const result: AcpRequestPermissionResult = { outcome: "cancelled" }; + this.transport.sendResponse(id, result); + return; + } + + try { + // Translate ACP permission request to Copilot format + const copilotParams = { + sessionId: params.sessionId, + permissionRequest: { + toolCallId: params.toolCall.toolCallId, + title: params.toolCall.title, + kind: params.toolCall.kind, + rawInput: params.toolCall.rawInput, + options: params.options, + }, + }; + + const response = (await handler(copilotParams)) as { result?: { optionId?: string } }; + + // Translate response back to ACP format + const result: AcpRequestPermissionResult = { + outcome: "selected", + optionId: response.result?.optionId, + }; + this.transport.sendResponse(id, result); + } catch { + // Handler rejected or threw - return cancelled + const result: AcpRequestPermissionResult = { outcome: "cancelled" }; + this.transport.sendResponse(id, result); + } + } +} + +/** + * ACP protocol adapter implementation. + * Manages CLI process lifecycle and provides a ProtocolConnection for ACP communication. + */ +export class AcpProtocolAdapter implements ProtocolAdapter { + private cliProcess: ChildProcess | null = null; + private connection: AcpConnection | null = null; + private options: CopilotClientOptions; + private forceStopping = false; + + // Store bound handlers for cleanup + private stderrHandler: ((data: Buffer) => void) | null = null; + private stdinErrorHandler: ((err: Error) => void) | null = null; + private exitHandler: (() => void) | null = null; + + constructor(options: CopilotClientOptions) { + this.options = options; + } + + async start(): Promise { + await this.startCliProcess(); + this.createConnection(); + + // Register process exit handler to cleanup child process + this.exitHandler = () => { + if (this.cliProcess) { + try { + this.cliProcess.kill("SIGKILL"); + } catch { + // Ignore errors during exit cleanup + } + } + }; + process.on("exit", this.exitHandler); + process.on("SIGINT", this.exitHandler); + process.on("SIGTERM", this.exitHandler); + } + + async stop(): Promise { + const errors: Error[] = []; + + if (this.connection) { + try { + this.connection.dispose(); + } catch (error) { + errors.push( + new Error( + `Failed to dispose connection: ${error instanceof Error ? error.message : String(error)}` + ) + ); + } + this.connection = null; + } + + if (this.cliProcess) { + const childProcess = this.cliProcess; + this.cliProcess = null; + + // Remove event listeners to allow process to exit + if (this.stderrHandler && childProcess.stderr) { + childProcess.stderr.removeListener("data", this.stderrHandler); + this.stderrHandler = null; + } + if (this.stdinErrorHandler && childProcess.stdin) { + childProcess.stdin.removeListener("error", this.stdinErrorHandler); + this.stdinErrorHandler = null; + } + + // Remove all listeners from process and streams + childProcess.removeAllListeners(); + childProcess.stdout?.removeAllListeners(); + childProcess.stderr?.removeAllListeners(); + childProcess.stdin?.removeAllListeners(); + + try { + // Kill the process first + childProcess.kill(); + + // Destroy all streams to properly close them + childProcess.stdin?.destroy(); + childProcess.stdout?.destroy(); + childProcess.stderr?.destroy(); + } catch { + // Process may already be dead + } + } + + // Remove process exit handlers + if (this.exitHandler) { + process.removeListener("exit", this.exitHandler); + process.removeListener("SIGINT", this.exitHandler); + process.removeListener("SIGTERM", this.exitHandler); + this.exitHandler = null; + } + + return errors; + } + + async forceStop(): Promise { + this.forceStopping = true; + + if (this.connection) { + try { + this.connection.dispose(); + } catch { + // Ignore errors during force stop + } + this.connection = null; + } + + if (this.cliProcess) { + // Remove event listeners + if (this.stderrHandler && this.cliProcess.stderr) { + this.cliProcess.stderr.removeListener("data", this.stderrHandler); + this.stderrHandler = null; + } + if (this.stdinErrorHandler && this.cliProcess.stdin) { + this.cliProcess.stdin.removeListener("error", this.stdinErrorHandler); + this.stdinErrorHandler = null; + } + this.cliProcess.removeAllListeners(); + + try { + this.cliProcess.kill("SIGKILL"); + } catch { + // Ignore errors + } + this.cliProcess = null; + } + + // Remove process exit handlers + if (this.exitHandler) { + process.removeListener("exit", this.exitHandler); + process.removeListener("SIGINT", this.exitHandler); + process.removeListener("SIGTERM", this.exitHandler); + this.exitHandler = null; + } + } + + getConnection(): ProtocolConnection { + if (!this.connection) { + throw new Error("Connection not established. Call start() first."); + } + return this.connection; + } + + async verifyProtocolVersion(): Promise { + const result = (await this.getConnection().sendRequest("ping", {})) as { + protocolVersion?: number; + }; + + if (result.protocolVersion !== ACP_PROTOCOL_VERSION) { + throw new Error( + `ACP protocol version mismatch: SDK expects version ${ACP_PROTOCOL_VERSION}, ` + + `but server reports version ${result.protocolVersion}.` + ); + } + } + + private startCliProcess(): Promise { + return new Promise((resolve, reject) => { + // For ACP mode, we only use cliArgs - don't add --headless, --stdio, --log-level + const args = [...(this.options.cliArgs ?? [])]; + + // Suppress debug output + const env = { ...(this.options.env ?? process.env) }; + delete env.NODE_DEBUG; + + // Handle different CLI path formats + const cliPath = this.options.cliPath ?? "gemini"; + const isJsFile = cliPath.endsWith(".js"); + const isAbsolutePath = cliPath.startsWith("/") || /^[a-zA-Z]:/.test(cliPath); + + let command: string; + let spawnArgs: string[]; + + if (isJsFile) { + command = "node"; + spawnArgs = [cliPath, ...args]; + } else if (process.platform === "win32" && !isAbsolutePath) { + command = "cmd"; + spawnArgs = ["/c", cliPath, ...args]; + } else { + command = cliPath; + spawnArgs = args; + } + + this.cliProcess = spawn(command, spawnArgs, { + stdio: ["pipe", "pipe", "pipe"], + cwd: this.options.cwd ?? process.cwd(), + env, + }); + + let resolved = false; + + // For ACP mode with stdio, we're ready immediately after spawn + resolved = true; + resolve(); + + // Store handler for cleanup + this.stderrHandler = (data: Buffer) => { + // Forward CLI stderr to parent's stderr + const lines = data.toString().split("\n"); + for (const line of lines) { + if (line.trim()) { + process.stderr.write(`[ACP CLI] ${line}\n`); + } + } + }; + this.cliProcess.stderr?.on("data", this.stderrHandler); + + this.cliProcess.on("error", (error) => { + if (!resolved) { + resolved = true; + reject(new Error(`Failed to start ACP CLI: ${error.message}`)); + } + }); + + this.cliProcess.on("exit", (code) => { + if (!resolved) { + resolved = true; + reject(new Error(`ACP CLI exited with code ${code}`)); + } + }); + + // Handle stdin errors during force stop + this.stdinErrorHandler = (err: Error) => { + if (!this.forceStopping) { + throw err; + } + }; + this.cliProcess.stdin?.on("error", this.stdinErrorHandler); + + // Timeout after 10 seconds + setTimeout(() => { + if (!resolved) { + resolved = true; + reject(new Error("Timeout waiting for ACP CLI to start")); + } + }, 10000); + }); + } + + private createConnection(): void { + if (!this.cliProcess || !this.cliProcess.stdout || !this.cliProcess.stdin) { + throw new Error("CLI process not started"); + } + + const transport = new AcpTransport(this.cliProcess.stdout, this.cliProcess.stdin); + this.connection = new AcpConnection(transport); + } +} diff --git a/nodejs/src/protocols/acp/acp-mapper.ts b/nodejs/src/protocols/acp/acp-mapper.ts new file mode 100644 index 00000000..83e87cc0 --- /dev/null +++ b/nodejs/src/protocols/acp/acp-mapper.ts @@ -0,0 +1,334 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +/** + * ACP format translation utilities. + * Maps between Copilot SDK formats and ACP formats. + * @module protocols/acp/acp-mapper + */ + +import type { + SessionEvent, + SessionConfig, + MessageOptions, + MCPLocalServerConfig, +} from "../../types.js"; +import type { + AcpContentPart, + AcpSessionNewParams, + AcpSessionPromptParams, + AcpSessionUpdate, + AcpSessionUpdateParams, + AcpMcpServerConfig, + AcpToolCallUpdateInner, + AcpToolCallUpdateUpdateInner, +} from "./acp-types.js"; + +let eventIdCounter = 0; + +/** + * Generates a unique event ID for ACP-mapped events. + */ +function generateEventId(): string { + return `acp-${Date.now()}-${++eventIdCounter}`; +} + +/** + * Creates common event metadata fields. + */ +function createEventMetadata(): { id: string; timestamp: string; parentId: null } { + return { + id: generateEventId(), + timestamp: new Date().toISOString(), + parentId: null, + }; +} + +/** + * Converts a string prompt to ACP content array format. + */ +export function stringToAcpContent(prompt: string): AcpContentPart[] { + return [{ type: "text", text: prompt }]; +} + +/** + * Maps an ACP session/update notification (Gemini format) to a Copilot SessionEvent. + * This handles the actual wire format from Gemini CLI. + */ +export function acpSessionUpdateToSessionEvent( + params: AcpSessionUpdateParams +): SessionEvent | null { + const meta = createEventMetadata(); + const { update } = params; + + switch (update.sessionUpdate) { + case "agent_message_chunk": + return { + ...meta, + ephemeral: true, + type: "assistant.message_delta", + data: { + messageId: meta.id, + deltaContent: update.content?.text ?? "", + }, + }; + + case "agent_thought_chunk": + return { + ...meta, + ephemeral: true, + type: "assistant.reasoning_delta", + data: { + reasoningId: meta.id, + deltaContent: update.content?.text ?? "", + }, + }; + + case "agent_message": + return { + ...meta, + type: "assistant.message", + data: { + messageId: meta.id, + content: update.content?.text ?? "", + toolRequests: [], + }, + }; + + case "end_turn": + return { + ...meta, + ephemeral: true, + type: "session.idle", + data: {}, + }; + + case "error": + return { + ...meta, + type: "session.error", + data: { + errorType: "internal", + message: update.message ?? "Unknown error", + }, + }; + + default: + return null; + } +} + +/** + * Maps a flat ACP session update (legacy/test format) to a Copilot SessionEvent. + * This is primarily used for unit tests. + */ +export function acpUpdateToSessionEvent(update: AcpSessionUpdate): SessionEvent | null { + const meta = createEventMetadata(); + + switch (update.type) { + case "agent_message_chunk": + return { + ...meta, + ephemeral: true, + type: "assistant.message_delta", + data: { + messageId: update.messageId, + deltaContent: update.content, + }, + }; + + case "agent_thought_chunk": + return { + ...meta, + ephemeral: true, + type: "assistant.reasoning_delta", + data: { + reasoningId: update.messageId, + deltaContent: update.content, + }, + }; + + case "agent_message": + return { + ...meta, + type: "assistant.message", + data: { + messageId: update.messageId, + content: update.content, + toolRequests: [], + }, + }; + + case "end_turn": + return { + ...meta, + ephemeral: true, + type: "session.idle", + data: {}, + }; + + case "error": + return { + ...meta, + type: "session.error", + data: { + errorType: "internal", + message: update.message, + }, + }; + + default: + return null; + } +} + +/** + * Helper to extract text content from ACP tool call content array. + */ +function extractToolCallContent(content?: { type: string; text: string }[]): string { + if (!content || content.length === 0) { + return ""; + } + return content.map((c) => c.text).join("\n"); +} + +/** + * Maps ACP tool_call or tool_call_update to Copilot SessionEvent. + * Handles both initial tool calls and updates to existing tool calls. + */ +export function acpToolCallToSessionEvent( + update: AcpToolCallUpdateInner | AcpToolCallUpdateUpdateInner +): SessionEvent | null { + const meta = createEventMetadata(); + + if (update.sessionUpdate === "tool_call") { + const toolCall = update as AcpToolCallUpdateInner; + + // If status is completed or failed, return tool.execution_complete + if (toolCall.status === "completed" || toolCall.status === "failed") { + const content = extractToolCallContent(toolCall.content); + const success = toolCall.status === "completed"; + + return { + ...meta, + type: "tool.execution_complete", + data: { + toolCallId: toolCall.toolCallId, + success, + ...(success + ? { result: { content } } + : { error: { message: content || "Tool execution failed" } }), + }, + }; + } + + // Otherwise, return tool.execution_start + return { + ...meta, + type: "tool.execution_start", + data: { + toolCallId: toolCall.toolCallId, + toolName: toolCall.kind, + ...(toolCall.rawInput !== undefined && { arguments: toolCall.rawInput }), + }, + }; + } + + if (update.sessionUpdate === "tool_call_update") { + const toolCallUpdate = update as AcpToolCallUpdateUpdateInner; + const content = extractToolCallContent(toolCallUpdate.content); + + // If status is completed, return tool.execution_complete with success + if (toolCallUpdate.status === "completed") { + return { + ...meta, + type: "tool.execution_complete", + data: { + toolCallId: toolCallUpdate.toolCallId, + success: true, + result: { content }, + }, + }; + } + + // If status is failed, return tool.execution_complete with error + if (toolCallUpdate.status === "failed") { + return { + ...meta, + type: "tool.execution_complete", + data: { + toolCallId: toolCallUpdate.toolCallId, + success: false, + error: { message: content || "Tool execution failed" }, + }, + }; + } + + // If status is running (or undefined), return tool.execution_progress + return { + ...meta, + ephemeral: true, + type: "tool.execution_progress", + data: { + toolCallId: toolCallUpdate.toolCallId, + progressMessage: content, + }, + }; + } + + return null; +} + +/** + * Converts Copilot SessionConfig to ACP session/new params. + */ +export function copilotSessionConfigToAcpParams(config: SessionConfig): AcpSessionNewParams { + const params: AcpSessionNewParams = {}; + + if (config.workingDirectory) { + params.cwd = config.workingDirectory; + } + + if (config.mcpServers) { + const acpMcpServers: Record = {}; + + for (const [name, serverConfig] of Object.entries(config.mcpServers)) { + // Only include local/stdio servers, not remote ones + if (serverConfig.type === "http" || serverConfig.type === "sse") { + continue; + } + + const localConfig = serverConfig as MCPLocalServerConfig; + const acpConfig: AcpMcpServerConfig = { + command: localConfig.command, + args: localConfig.args, + }; + + if (localConfig.env) { + acpConfig.env = localConfig.env; + } + + acpMcpServers[name] = acpConfig; + } + + if (Object.keys(acpMcpServers).length > 0) { + params.mcpServers = acpMcpServers; + } + } + + return params; +} + +/** + * Converts Copilot MessageOptions to ACP session/prompt params. + */ +export function copilotMessageOptionsToAcpParams( + sessionId: string, + options: MessageOptions +): AcpSessionPromptParams { + return { + sessionId, + prompt: stringToAcpContent(options.prompt), + }; +} diff --git a/nodejs/src/protocols/acp/acp-transport.ts b/nodejs/src/protocols/acp/acp-transport.ts new file mode 100644 index 00000000..ee5da073 --- /dev/null +++ b/nodejs/src/protocols/acp/acp-transport.ts @@ -0,0 +1,375 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +/** + * NDJSON (Newline-Delimited JSON) transport for ACP protocol. + * @module protocols/acp/acp-transport + */ + +import type { Readable, Writable } from "node:stream"; +import type { + AcpMessage, + AcpNotification, + AcpRequest, + AcpResponse, + AcpError, +} from "./acp-types.js"; + +/** + * Pending request tracker + */ +interface PendingRequest { + resolve: (result: unknown) => void; + reject: (error: Error) => void; +} + +/** + * NDJSON transport for ACP protocol. + * Handles reading and writing JSON-RPC 2.0 messages over newline-delimited streams. + */ +export class AcpTransport { + private buffer = ""; + private pendingRequests: Map = new Map(); + private notificationHandlers: Map void>> = new Map(); + private requestHandlers: Map Promise> = + new Map(); + private messageHandlers: Set<(message: AcpMessage) => void> = new Set(); + private closeHandlers: Set<() => void> = new Set(); + private errorHandlers: Set<(error: Error) => void> = new Set(); + private isListening = false; + private disposed = false; + + // Bound handlers for proper removal + private boundDataHandler: ((chunk: Buffer) => void) | null = null; + private boundEndHandler: (() => void) | null = null; + private boundErrorHandler: ((error: Error) => void) | null = null; + + constructor( + private readonly inputStream: Readable, + private readonly outputStream: Writable + ) {} + + /** + * Starts listening for incoming messages. + */ + listen(): void { + if (this.isListening || this.disposed) { + return; + } + this.isListening = true; + + // Create bound handlers so we can remove them later + this.boundDataHandler = (chunk: Buffer) => { + if (this.disposed) return; + this.handleData(chunk.toString()); + }; + + this.boundEndHandler = () => { + if (this.disposed) return; + this.handleClose(); + }; + + this.boundErrorHandler = (error: Error) => { + if (this.disposed) return; + this.emitError(error); + }; + + this.inputStream.on("data", this.boundDataHandler); + this.inputStream.on("end", this.boundEndHandler); + this.inputStream.on("error", this.boundErrorHandler); + } + + /** + * Sends a raw message. + */ + send(message: AcpMessage): void { + const json = JSON.stringify(message) + "\n"; + this.outputStream.write(json); + } + + /** + * Sends a request and returns a promise for the response. + */ + sendRequest(id: string | number, method: string, params?: unknown): Promise { + return new Promise((resolve, reject) => { + const request: AcpRequest = { + jsonrpc: "2.0", + id, + method, + params, + }; + + this.pendingRequests.set(id, { + resolve: resolve as (result: unknown) => void, + reject, + }); + + this.send(request); + }); + } + + /** + * Sends a notification (no response expected). + */ + sendNotification(method: string, params?: unknown): void { + const notification: AcpNotification = { + jsonrpc: "2.0", + method, + params, + }; + this.send(notification); + } + + /** + * Registers a handler for all incoming messages. + */ + onMessage(handler: (message: AcpMessage) => void): void { + this.messageHandlers.add(handler); + } + + /** + * Registers a handler for a specific notification method. + */ + onNotification(method: string, handler: (params: unknown) => void): void { + if (!this.notificationHandlers.has(method)) { + this.notificationHandlers.set(method, new Set()); + } + this.notificationHandlers.get(method)!.add(handler); + } + + /** + * Registers a handler for incoming requests (server-to-client). + */ + onRequest( + method: string, + handler: (id: string | number, params: unknown) => Promise + ): void { + this.requestHandlers.set(method, handler); + } + + /** + * Sends a response to an incoming request. + */ + sendResponse(id: string | number, result: unknown): void { + const response: AcpResponse = { + jsonrpc: "2.0", + id, + result, + }; + this.send(response); + } + + /** + * Registers a handler for close events. + */ + onClose(handler: () => void): void { + this.closeHandlers.add(handler); + } + + /** + * Registers a handler for error events. + */ + onError(handler: (error: Error) => void): void { + this.errorHandlers.add(handler); + } + + /** + * Disposes of the transport and releases resources. + */ + dispose(): void { + this.disposed = true; + this.isListening = false; + + // Remove event listeners from input stream + if (this.boundDataHandler) { + this.inputStream.removeListener("data", this.boundDataHandler); + this.boundDataHandler = null; + } + if (this.boundEndHandler) { + this.inputStream.removeListener("end", this.boundEndHandler); + this.boundEndHandler = null; + } + if (this.boundErrorHandler) { + this.inputStream.removeListener("error", this.boundErrorHandler); + this.boundErrorHandler = null; + } + + // Remove all remaining listeners from streams + this.inputStream.removeAllListeners(); + this.outputStream.removeAllListeners(); + + // Reject all pending requests + for (const pending of this.pendingRequests.values()) { + pending.reject(new Error("Transport disposed")); + } + this.pendingRequests.clear(); + + // Clear handlers + this.messageHandlers.clear(); + this.notificationHandlers.clear(); + this.requestHandlers.clear(); + this.closeHandlers.clear(); + this.errorHandlers.clear(); + } + + private handleData(data: string): void { + this.buffer += data; + + // Process complete lines + let newlineIndex: number; + while ((newlineIndex = this.buffer.indexOf("\n")) !== -1) { + const line = this.buffer.slice(0, newlineIndex); + this.buffer = this.buffer.slice(newlineIndex + 1); + + // Skip empty lines + if (line.trim() === "") { + continue; + } + + this.processLine(line); + } + } + + private processLine(line: string): void { + let message: AcpMessage; + + try { + message = JSON.parse(line) as AcpMessage; + } catch (_error) { + this.emitError(new Error(`Failed to parse JSON: ${line}`)); + return; + } + + // Emit to general message handlers + for (const handler of this.messageHandlers) { + try { + handler(message); + } catch { + // Ignore handler errors + } + } + + // Check if it's a response (has id but no method) + if ("id" in message && !("method" in message)) { + this.handleResponse(message as AcpResponse); + return; + } + + // Check if it's a notification (has method but no id) + if ("method" in message && !("id" in message)) { + this.handleNotification(message as AcpNotification); + return; + } + + // Request with both id and method - incoming request from server + if ("id" in message && "method" in message) { + this.handleIncomingRequest(message as AcpRequest); + return; + } + } + + private handleIncomingRequest(request: AcpRequest): void { + const handler = this.requestHandlers.get(request.method); + if (!handler) { + // No handler registered, send error response + const errorResponse: AcpResponse = { + jsonrpc: "2.0", + id: request.id, + error: { + code: -32601, + message: `Method not found: ${request.method}`, + }, + }; + this.send(errorResponse); + return; + } + + // Call the handler - it's responsible for sending the response + handler(request.id, request.params).catch((error) => { + // If handler fails, send error response + const errorResponse: AcpResponse = { + jsonrpc: "2.0", + id: request.id, + error: { + code: -32603, + message: error instanceof Error ? error.message : "Internal error", + }, + }; + this.send(errorResponse); + }); + } + + private handleResponse(response: AcpResponse): void { + const pending = this.pendingRequests.get(response.id); + if (!pending) { + return; + } + + this.pendingRequests.delete(response.id); + + if (response.error) { + const error = this.createError(response.error); + pending.reject(error); + } else { + pending.resolve(response.result); + } + } + + private handleNotification(notification: AcpNotification): void { + const handlers = this.notificationHandlers.get(notification.method); + if (!handlers) { + return; + } + + for (const handler of handlers) { + try { + handler(notification.params); + } catch { + // Ignore handler errors + } + } + } + + private handleClose(): void { + // Reject all pending requests + for (const pending of this.pendingRequests.values()) { + pending.reject(new Error("Connection closed")); + } + this.pendingRequests.clear(); + + // Emit close event + for (const handler of this.closeHandlers) { + try { + handler(); + } catch { + // Ignore handler errors + } + } + } + + private emitError(error: Error): void { + for (const handler of this.errorHandlers) { + try { + handler(error); + } catch { + // Ignore handler errors + } + } + } + + private createError(acpError: AcpError): Error { + // Include data in error message if available for better debugging + let message = acpError.message; + if (acpError.data) { + const dataStr = + typeof acpError.data === "string" ? acpError.data : JSON.stringify(acpError.data); + message = `${message}: ${dataStr}`; + } + + const error = new Error(message); + (error as Error & { code?: number; data?: unknown }).code = acpError.code; + (error as Error & { code?: number; data?: unknown }).data = acpError.data; + return error; + } +} diff --git a/nodejs/src/protocols/acp/acp-types.ts b/nodejs/src/protocols/acp/acp-types.ts new file mode 100644 index 00000000..be2b2d88 --- /dev/null +++ b/nodejs/src/protocols/acp/acp-types.ts @@ -0,0 +1,393 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +/** + * ACP (Agent Client Protocol) message types. + * @module protocols/acp/acp-types + */ + +/** + * ACP protocol version + */ +export const ACP_PROTOCOL_VERSION = 1; + +/** + * Base ACP message structure (JSON-RPC 2.0 compatible) + */ +export interface AcpMessage { + jsonrpc: "2.0"; + id?: string | number; +} + +/** + * ACP request message + */ +export interface AcpRequest extends AcpMessage { + id: string | number; + method: string; + params?: unknown; +} + +/** + * ACP response message + */ +export interface AcpResponse extends AcpMessage { + id: string | number; + result?: unknown; + error?: AcpError; +} + +/** + * ACP notification message (no id, no response expected) + */ +export interface AcpNotification extends AcpMessage { + method: string; + params?: unknown; +} + +/** + * ACP error object + */ +export interface AcpError { + code: number; + message: string; + data?: unknown; +} + +// ============================================================================ +// ACP Initialize Types +// ============================================================================ + +/** + * ACP initialize request params + */ +export interface AcpInitializeParams { + protocolVersion: number; +} + +/** + * ACP initialize response + */ +export interface AcpInitializeResult { + protocolVersion: number; + capabilities?: AcpServerCapabilities; +} + +/** + * Server capabilities reported by ACP server + */ +export interface AcpServerCapabilities { + streaming?: boolean; + tools?: boolean; +} + +// ============================================================================ +// ACP Session Configuration Types +// ============================================================================ + +/** + * ACP session/set_config_option request params + * @see https://agentclientprotocol.com/protocol/session-config-options + */ +export interface AcpSetConfigOptionParams { + sessionId: string; + configId: string; + value: string; +} + +/** + * ACP session/set_config_option response + * Returns the complete configuration state after the change. + */ +export interface AcpSetConfigOptionResult { + [configId: string]: string; +} + +// ============================================================================ +// ACP Session Types +// ============================================================================ + +/** + * ACP session/new request params + */ +export interface AcpSessionNewParams { + cwd?: string; + mcpServers?: Record; +} + +/** + * ACP MCP server configuration + */ +export interface AcpMcpServerConfig { + command: string; + args?: string[]; + env?: Record; +} + +/** + * ACP session/new response + */ +export interface AcpSessionNewResult { + sessionId: string; +} + +/** + * ACP content part (for prompt messages) + */ +export interface AcpContentPart { + type: "text"; + text: string; +} + +/** + * ACP session/prompt request params + */ +export interface AcpSessionPromptParams { + sessionId: string; + prompt: AcpContentPart[]; +} + +/** + * ACP session/prompt response + */ +export interface AcpSessionPromptResult { + messageId?: string; + /** + * Indicates why the turn ended (Gemini CLI returns this in the response + * instead of sending a separate end_turn notification) + */ + stopReason?: "end_turn" | "max_tokens" | "stop_sequence"; +} + +// ============================================================================ +// ACP Update Notification Types +// ============================================================================ + +/** + * ACP content structure + */ +export interface AcpTextContent { + type: "text"; + text: string; +} + +/** + * ACP tool call status + */ +export type AcpToolCallStatus = "running" | "completed" | "failed"; + +/** + * ACP tool call kind (affects UI treatment) + */ +export type AcpToolCallKind = "file_edit" | "file_read" | "command" | "search" | "other"; + +/** + * ACP tool call content result + */ +export interface AcpToolCallContent { + type: "text"; + text: string; +} + +/** + * ACP tool call location (affected file paths) + */ +export interface AcpToolCallLocation { + path: string; +} + +/** + * ACP update inner structure for message updates + */ +export interface AcpMessageUpdateInner { + sessionUpdate: + | "agent_message_chunk" + | "agent_thought_chunk" + | "agent_message" + | "end_turn" + | "error"; + content?: AcpTextContent; + message?: string; + code?: string; +} + +/** + * ACP update inner structure for tool_call + */ +export interface AcpToolCallUpdateInner { + sessionUpdate: "tool_call"; + toolCallId: string; + title: string; + kind: AcpToolCallKind; + status: AcpToolCallStatus; + rawInput?: unknown; + rawOutput?: unknown; + content?: AcpToolCallContent[]; + locations?: AcpToolCallLocation[]; +} + +/** + * ACP update inner structure for tool_call_update + */ +export interface AcpToolCallUpdateUpdateInner { + sessionUpdate: "tool_call_update"; + toolCallId: string; + status?: AcpToolCallStatus; + rawOutput?: unknown; + content?: AcpToolCallContent[]; + locations?: AcpToolCallLocation[]; +} + +/** + * ACP update inner structure (union of all update types) + */ +export type AcpUpdateInner = + | AcpMessageUpdateInner + | AcpToolCallUpdateInner + | AcpToolCallUpdateUpdateInner; + +/** + * ACP session/update notification params (actual Gemini format) + */ +export interface AcpSessionUpdateParams { + sessionId: string; + update: AcpUpdateInner; +} + +/** + * Legacy flat format types (for backwards compatibility in tests) + */ +export interface AcpAgentMessageChunkParams { + sessionId: string; + type: "agent_message_chunk"; + messageId: string; + content: string; +} + +export interface AcpAgentThoughtChunkParams { + sessionId: string; + type: "agent_thought_chunk"; + messageId: string; + content: string; +} + +export interface AcpAgentMessageParams { + sessionId: string; + type: "agent_message"; + messageId: string; + content: string; +} + +export interface AcpEndTurnParams { + sessionId: string; + type: "end_turn"; +} + +export interface AcpErrorParams { + sessionId: string; + type: "error"; + message: string; + code?: string; +} + +/** + * Union of legacy flat update types + */ +export type AcpSessionUpdate = + | AcpAgentMessageChunkParams + | AcpAgentThoughtChunkParams + | AcpAgentMessageParams + | AcpEndTurnParams + | AcpErrorParams; + +// ============================================================================ +// ACP Permission Request Types +// ============================================================================ + +/** + * ACP permission option kind (semantic hint for UI) + */ +export type AcpPermissionOptionKind = + | "allow_once" + | "allow_always" + | "reject_once" + | "reject_always"; + +/** + * ACP permission option + */ +export interface AcpPermissionOption { + optionId: string; + name: string; + kind: AcpPermissionOptionKind; +} + +/** + * ACP tool call info for permission requests + */ +export interface AcpPermissionToolCall { + toolCallId: string; + title: string; + kind: AcpToolCallKind; + rawInput?: unknown; +} + +/** + * ACP session/request_permission request params + */ +export interface AcpRequestPermissionParams { + sessionId: string; + toolCall: AcpPermissionToolCall; + options: AcpPermissionOption[]; +} + +/** + * ACP session/request_permission response + */ +export interface AcpRequestPermissionResult { + outcome: "selected" | "cancelled"; + optionId?: string; +} + +// ============================================================================ +// Method Name Mappings +// ============================================================================ + +/** + * Copilot to ACP method name mapping + */ +export const COPILOT_TO_ACP_METHODS: Record = { + ping: "initialize", + "session.create": "session/new", + "session.send": "session/prompt", +}; + +/** + * ACP to Copilot notification mapping + */ +export const ACP_TO_COPILOT_NOTIFICATIONS: Record = { + "session/update": "session.event", +}; + +/** + * Methods that are not supported in ACP mode + */ +export const UNSUPPORTED_ACP_METHODS = [ + "models.list", + "session.resume", + "session.getMessages", + "session.list", + "session.getLastId", + "session.delete", + "session.getForeground", + "session.setForeground", + "status.get", + "auth.getStatus", + // permission.request is now supported via session/request_permission + "userInput.request", + "hooks.invoke", + // tool.call is now supported via session/update tool_call events +] as const; + +export type UnsupportedAcpMethod = (typeof UNSUPPORTED_ACP_METHODS)[number]; diff --git a/nodejs/src/protocols/acp/index.ts b/nodejs/src/protocols/acp/index.ts new file mode 100644 index 00000000..a1fe9552 --- /dev/null +++ b/nodejs/src/protocols/acp/index.ts @@ -0,0 +1,12 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +/** + * ACP (Agent Client Protocol) support + */ + +export { AcpProtocolAdapter } from "./acp-adapter.js"; +export { AcpTransport } from "./acp-transport.js"; +export * from "./acp-types.js"; +export * from "./acp-mapper.js"; diff --git a/nodejs/src/protocols/index.ts b/nodejs/src/protocols/index.ts new file mode 100644 index 00000000..de275235 --- /dev/null +++ b/nodejs/src/protocols/index.ts @@ -0,0 +1,10 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +/** + * Protocol adapters for Copilot SDK + */ + +export type { ProtocolAdapter, ProtocolConnection } from "./protocol-adapter.js"; +export { AcpProtocolAdapter } from "./acp/index.js"; diff --git a/nodejs/src/protocols/protocol-adapter.ts b/nodejs/src/protocols/protocol-adapter.ts new file mode 100644 index 00000000..104c8168 --- /dev/null +++ b/nodejs/src/protocols/protocol-adapter.ts @@ -0,0 +1,101 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + *--------------------------------------------------------------------------------------------*/ + +/** + * Protocol adapter interfaces for supporting multiple CLI protocols. + * @module protocols/protocol-adapter + */ + +/** + * Abstract connection interface for protocol adapters. + * Provides a unified API for sending requests and notifications regardless of the underlying protocol. + */ +export interface ProtocolConnection { + /** + * Sends a request and waits for a response. + * @param method - The method name + * @param params - Optional parameters + * @returns A promise resolving to the response + */ + sendRequest(method: string, params?: unknown): Promise; + + /** + * Sends a notification (no response expected). + * @param method - The method name + * @param params - Optional parameters + */ + sendNotification(method: string, params?: unknown): void; + + /** + * Registers a handler for incoming notifications. + * @param method - The method name to handle + * @param handler - The handler function + */ + onNotification(method: string, handler: (params: unknown) => void): void; + + /** + * Registers a handler for incoming requests. + * @param method - The method name to handle + * @param handler - The handler function returning a response + */ + onRequest(method: string, handler: (params: unknown) => Promise): void; + + /** + * Registers a handler for connection close events. + * @param handler - The handler function + */ + onClose(handler: () => void): void; + + /** + * Registers a handler for connection errors. + * @param handler - The handler function + */ + onError(handler: (error: Error) => void): void; + + /** + * Disposes of the connection and releases resources. + */ + dispose(): void; + + /** + * Starts listening for incoming messages. + */ + listen(): void; +} + +/** + * Protocol adapter interface for managing CLI server lifecycle and connection. + * Implementations handle protocol-specific details like message framing and method translation. + */ +export interface ProtocolAdapter { + /** + * Starts the CLI server process (if not connecting to external server). + * @returns A promise that resolves when the server is ready + */ + start(): Promise; + + /** + * Gracefully stops the CLI server and closes all resources. + * @returns A promise resolving to an array of errors encountered during cleanup + */ + stop(): Promise; + + /** + * Forcefully stops the CLI server without graceful cleanup. + * @returns A promise that resolves when the force stop is complete + */ + forceStop(): Promise; + + /** + * Gets the protocol connection for sending requests and handling notifications. + * @returns The protocol connection + */ + getConnection(): ProtocolConnection; + + /** + * Verifies that the server's protocol version is compatible with the SDK. + * @returns A promise that resolves if compatible, rejects otherwise + */ + verifyProtocolVersion(): Promise; +} diff --git a/nodejs/src/types.ts b/nodejs/src/types.ts index 79692b78..6ed46969 100644 --- a/nodejs/src/types.ts +++ b/nodejs/src/types.ts @@ -89,6 +89,14 @@ export interface CopilotClientOptions { * @default true (but defaults to false when githubToken is provided) */ useLoggedInUser?: boolean; + + /** + * Protocol to use for communication with the CLI. + * - "copilot": Standard Copilot CLI protocol (JSON-RPC over LSP Content-Length framing) + * - "acp": Agent Client Protocol (NDJSON framing, different method names) + * @default "copilot" + */ + protocol?: "copilot" | "acp"; } /** diff --git a/nodejs/test/client.test.ts b/nodejs/test/client.test.ts index 5d1ed8ac..1ab89e7c 100644 --- a/nodejs/test/client.test.ts +++ b/nodejs/test/client.test.ts @@ -177,6 +177,34 @@ describe("CopilotClient", () => { }); }); + describe("Protocol options", () => { + it("should default to copilot protocol", () => { + const client = new CopilotClient({ + logLevel: "error", + }); + + expect((client as any).options.protocol).toBe("copilot"); + }); + + it("should accept acp protocol option", () => { + const client = new CopilotClient({ + protocol: "acp", + logLevel: "error", + }); + + expect((client as any).options.protocol).toBe("acp"); + }); + + it("should accept copilot protocol option explicitly", () => { + const client = new CopilotClient({ + protocol: "copilot", + logLevel: "error", + }); + + expect((client as any).options.protocol).toBe("copilot"); + }); + }); + describe("Auth options", () => { it("should accept githubToken option", () => { const client = new CopilotClient({ diff --git a/nodejs/test/e2e/acp.test.ts b/nodejs/test/e2e/acp.test.ts new file mode 100644 index 00000000..0b25eb94 --- /dev/null +++ b/nodejs/test/e2e/acp.test.ts @@ -0,0 +1,349 @@ +/** + * E2E tests for ACP (Agent Client Protocol) support. + * + * These tests require an ACP-compatible CLI to be available. + * By default, tests look for 'gemini' CLI with --experimental-acp flag. + * + * To run these tests: + * 1. Install Gemini CLI: npm install -g @anthropic/gemini-cli + * 2. Set environment variable: ACP_CLI_PATH=gemini (or path to your ACP CLI) + * 3. Run: npm test -- --run test/e2e/acp.test.ts + * + * Skip these tests by not setting ACP_CLI_PATH environment variable. + */ + +import { describe, it, expect, beforeAll, beforeEach, afterEach } from "vitest"; +import { CopilotClient } from "../../src/index.js"; +import type { SessionEvent } from "../../src/types.js"; + +// Get ACP CLI path from environment, skip tests if not set +const ACP_CLI_PATH = process.env.ACP_CLI_PATH; +const ACP_CLI_ARGS = process.env.ACP_CLI_ARGS?.split(" ") ?? ["--experimental-acp"]; + +// Helper to check if ACP CLI is available +async function isAcpCliAvailable(): Promise { + if (!ACP_CLI_PATH) { + return false; + } + + try { + const { execSync } = await import("node:child_process"); + execSync(`which ${ACP_CLI_PATH}`, { stdio: "ignore" }); + return true; + } catch { + return false; + } +} + +describe.skipIf(!ACP_CLI_PATH)("ACP E2E Tests", () => { + let client: CopilotClient; + let cliAvailable: boolean; + + beforeAll(async () => { + cliAvailable = await isAcpCliAvailable(); + if (!cliAvailable) { + console.warn( + `Skipping ACP E2E tests: CLI '${ACP_CLI_PATH}' not found. ` + + `Set ACP_CLI_PATH environment variable to enable these tests.` + ); + } + }); + + beforeEach(() => { + if (!cliAvailable) return; + + client = new CopilotClient({ + cliPath: ACP_CLI_PATH!, + cliArgs: ACP_CLI_ARGS, + protocol: "acp", + autoStart: false, + }); + }); + + afterEach(async () => { + if (client) { + await client.forceStop(); + } + }); + + describe("connection", () => { + it.skipIf(!ACP_CLI_PATH)( + "should connect to ACP CLI and verify protocol version", + async () => { + await client.start(); + + expect(client.getState()).toBe("connected"); + } + ); + + it.skipIf(!ACP_CLI_PATH)("should handle ping request", async () => { + await client.start(); + + const response = await client.ping("test"); + + expect(response.message).toBe("pong"); + expect(response.protocolVersion).toBeDefined(); + expect(typeof response.timestamp).toBe("number"); + }); + }); + + describe("session", () => { + it.skipIf(!ACP_CLI_PATH)("should create a session", async () => { + await client.start(); + + const session = await client.createSession(); + + expect(session.sessionId).toBeDefined(); + expect(typeof session.sessionId).toBe("string"); + }); + + it.skipIf(!ACP_CLI_PATH)("should send a message and receive response", async () => { + await client.start(); + + const session = await client.createSession(); + + const events: SessionEvent[] = []; + session.on((event) => { + events.push(event); + }); + + // Send a simple prompt + await session.send({ prompt: "Say hello in exactly 3 words." }); + + // Wait for session.idle event (with timeout) + const timeout = 30000; + const startTime = Date.now(); + + while (Date.now() - startTime < timeout) { + if (events.some((e) => e.type === "session.idle")) { + break; + } + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + // Verify we received events + expect(events.length).toBeGreaterThan(0); + + // Should have received either streaming deltas or a final message + const hasDelta = events.some((e) => e.type === "assistant.message_delta"); + const hasMessage = events.some((e) => e.type === "assistant.message"); + const hasIdle = events.some((e) => e.type === "session.idle"); + + expect(hasDelta || hasMessage).toBe(true); + expect(hasIdle).toBe(true); + }); + + it.skipIf(!ACP_CLI_PATH)( + "should receive streaming content via assistant.message_delta", + async () => { + await client.start(); + + const session = await client.createSession(); + + const deltas: string[] = []; + session.on("assistant.message_delta", (event) => { + deltas.push(event.data.deltaContent); + }); + + let idleReceived = false; + session.on("session.idle", () => { + idleReceived = true; + }); + + await session.send({ prompt: "Count from 1 to 5, one number per line." }); + + // Wait for idle + const timeout = 30000; + const startTime = Date.now(); + while (!idleReceived && Date.now() - startTime < timeout) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + // Should have received multiple deltas for streaming + expect(deltas.length).toBeGreaterThan(0); + + // Concatenated content should contain the numbers + const fullContent = deltas.join(""); + expect(fullContent.length).toBeGreaterThan(0); + } + ); + }); + + describe("tool calls", () => { + it.skipIf(!ACP_CLI_PATH)("should receive tool events when agent uses tools", async () => { + await client.start(); + + const session = await client.createSession({ + workingDirectory: process.cwd(), + }); + + const toolEvents: SessionEvent[] = []; + const allEvents: SessionEvent[] = []; + + session.on((event) => { + allEvents.push(event); + if ( + event.type === "tool.execution_start" || + event.type === "tool.execution_progress" || + event.type === "tool.execution_complete" + ) { + toolEvents.push(event); + } + }); + + // Prompt that should trigger file read tool + await session.send({ + prompt: "Read the package.json file in the current directory and tell me the package name.", + }); + + // Wait for session.idle + const timeout = 60000; // Longer timeout for tool operations + const startTime = Date.now(); + let idleReceived = false; + + while (!idleReceived && Date.now() - startTime < timeout) { + if (allEvents.some((e) => e.type === "session.idle")) { + idleReceived = true; + break; + } + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + expect(idleReceived).toBe(true); + + // Log for debugging + console.log(`Received ${toolEvents.length} tool events:`); + for (const event of toolEvents) { + console.log(` - ${event.type}:`, JSON.stringify(event.data, null, 2)); + } + + // We may or may not get tool events depending on how the agent responds + // Just verify we completed without error + expect(allEvents.length).toBeGreaterThan(0); + }); + + it.skipIf(!ACP_CLI_PATH)( + "should receive tool.execution_start and tool.execution_complete", + async () => { + await client.start(); + + const session = await client.createSession({ + workingDirectory: process.cwd(), + }); + + let toolStarted = false; + let toolCompleted = false; + + session.on("tool.execution_start", (event) => { + console.log("Tool started:", event.data.toolName, event.data.toolCallId); + toolStarted = true; + }); + + session.on("tool.execution_complete", (event) => { + console.log( + "Tool completed:", + event.data.toolCallId, + event.data.success ? "success" : "failed" + ); + toolCompleted = true; + }); + + let idleReceived = false; + session.on("session.idle", () => { + idleReceived = true; + }); + + // Ask to list files - should trigger tool call + await session.send({ + prompt: "List the files in the current directory.", + }); + + // Wait for idle + const timeout = 60000; + const startTime = Date.now(); + while (!idleReceived && Date.now() - startTime < timeout) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + expect(idleReceived).toBe(true); + + // Log whether tool events were received + console.log(`Tool started: ${toolStarted}, Tool completed: ${toolCompleted}`); + + // Note: Tool events depend on whether Gemini decides to use tools + // This test verifies the event handling works when tools are used + } + ); + }); + + describe("error handling", () => { + it.skipIf(!ACP_CLI_PATH)("should throw for unsupported methods", async () => { + await client.start(); + + // listModels is not supported in ACP mode + await expect(client.listModels()).rejects.toThrow(/not supported in ACP mode/); + }); + + it.skipIf(!ACP_CLI_PATH)("should throw for resumeSession", async () => { + await client.start(); + + await expect(client.resumeSession("fake-session-id")).rejects.toThrow( + /not supported in ACP mode/ + ); + }); + + it.skipIf(!ACP_CLI_PATH)("should throw for session.getMessages", async () => { + await client.start(); + const session = await client.createSession(); + + await expect(session.getMessages()).rejects.toThrow(/not supported in ACP mode/); + }); + }); + + describe("cleanup", () => { + it.skipIf(!ACP_CLI_PATH)("should stop cleanly", async () => { + await client.start(); + await client.createSession(); + + const errors = await client.stop(); + + expect(errors.length).toBe(0); + expect(client.getState()).toBe("disconnected"); + }); + + it.skipIf(!ACP_CLI_PATH)("should force stop cleanly", async () => { + await client.start(); + await client.createSession(); + + await client.forceStop(); + + expect(client.getState()).toBe("disconnected"); + }); + }); +}); + +describe("ACP Protocol Selection", () => { + it("should use ACP adapter when protocol is 'acp'", () => { + const client = new CopilotClient({ + cliPath: "fake-cli", + protocol: "acp", + autoStart: false, + }); + + // Verify internal state - protocol should be set + expect((client as unknown as { options: { protocol: string } }).options.protocol).toBe( + "acp" + ); + }); + + it("should default to copilot protocol", () => { + const client = new CopilotClient({ + cliPath: "fake-cli", + autoStart: false, + }); + + expect((client as unknown as { options: { protocol: string } }).options.protocol).toBe( + "copilot" + ); + }); +}); diff --git a/nodejs/test/protocols/acp/acp-adapter.test.ts b/nodejs/test/protocols/acp/acp-adapter.test.ts new file mode 100644 index 00000000..ee4ff597 --- /dev/null +++ b/nodejs/test/protocols/acp/acp-adapter.test.ts @@ -0,0 +1,677 @@ +import { describe, expect, it, vi, beforeEach, afterEach } from "vitest"; +import { spawn } from "node:child_process"; +import { PassThrough } from "node:stream"; +import { AcpProtocolAdapter } from "../../../src/protocols/acp/acp-adapter.js"; +import type { CopilotClientOptions } from "../../../src/types.js"; +import { ACP_PROTOCOL_VERSION } from "../../../src/protocols/acp/acp-types.js"; + +// Mock child_process +vi.mock("node:child_process", () => ({ + spawn: vi.fn(), +})); + +describe("AcpProtocolAdapter", () => { + let mockProcess: { + stdin: PassThrough; + stdout: PassThrough; + stderr: PassThrough; + on: ReturnType; + kill: ReturnType; + removeAllListeners: ReturnType; + }; + let adapter: AcpProtocolAdapter; + let options: CopilotClientOptions; + + beforeEach(() => { + mockProcess = { + stdin: new PassThrough(), + stdout: new PassThrough(), + stderr: new PassThrough(), + on: vi.fn(), + kill: vi.fn(), + removeAllListeners: vi.fn(), + }; + + (spawn as ReturnType).mockReturnValue(mockProcess); + + options = { + cliPath: "gemini", + cliArgs: ["--experimental-acp"], + cwd: "/test/dir", + protocol: "acp", + }; + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + describe("start", () => { + it("should spawn CLI with correct arguments for ACP (no --headless, --stdio, --log-level)", async () => { + adapter = new AcpProtocolAdapter(options); + + // Don't await, just trigger start + void adapter.start(); + + // Simulate successful spawn - process stays running + await new Promise((resolve) => setImmediate(resolve)); + + // Verify spawn was called with only cliArgs (no SDK-managed args) + expect(spawn).toHaveBeenCalledWith( + "gemini", + ["--experimental-acp"], + expect.objectContaining({ + cwd: "/test/dir", + stdio: ["pipe", "pipe", "pipe"], + }) + ); + + // Clean up by simulating process ready + adapter.forceStop(); + }); + + it("should not add --headless flag for ACP mode", async () => { + adapter = new AcpProtocolAdapter(options); + + void adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const spawnCall = (spawn as ReturnType).mock.calls[0]; + const args = spawnCall[1] as string[]; + + expect(args).not.toContain("--headless"); + expect(args).not.toContain("--stdio"); + expect(args).not.toContain("--log-level"); + + adapter.forceStop(); + }); + }); + + describe("connection methods", () => { + beforeEach(() => { + adapter = new AcpProtocolAdapter(options); + }); + + afterEach(async () => { + await adapter.forceStop(); + }); + + it("should translate ping to initialize request", async () => { + void adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + // Call ping + const pingPromise = connection.sendRequest("ping", { message: "test" }); + + // Read what was sent + const sentData = mockProcess.stdin.read(); + const sentMessage = JSON.parse(sentData.toString().trim()); + + expect(sentMessage.method).toBe("initialize"); + expect(sentMessage.params).toEqual({ protocolVersion: ACP_PROTOCOL_VERSION }); + + // Send response + const response = { + jsonrpc: "2.0", + id: sentMessage.id, + result: { protocolVersion: ACP_PROTOCOL_VERSION }, + }; + mockProcess.stdout.write(JSON.stringify(response) + "\n"); + + const result = await pingPromise; + expect(result).toEqual({ + message: "pong", + timestamp: expect.any(Number), + protocolVersion: ACP_PROTOCOL_VERSION, + }); + }); + + it("should translate session.create to session/new", async () => { + adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + const createPromise = connection.sendRequest("session.create", { + model: "gemini-pro", + workingDirectory: "/test/project", + }); + + const sentData = mockProcess.stdin.read(); + const sentMessage = JSON.parse(sentData.toString().trim()); + + expect(sentMessage.method).toBe("session/new"); + expect(sentMessage.params).toEqual({ + cwd: "/test/project", + mcpServers: [], + }); + + // Send response for session/new + const response = { + jsonrpc: "2.0", + id: sentMessage.id, + result: { sessionId: "acp-session-123" }, + }; + mockProcess.stdout.write(JSON.stringify(response) + "\n"); + + // session.create with model triggers session/set_config_option + await new Promise((resolve) => setImmediate(resolve)); + const sentData2 = mockProcess.stdin.read(); + const sentMessage2 = JSON.parse(sentData2.toString().trim()); + expect(sentMessage2.method).toBe("session/set_config_option"); + expect(sentMessage2.params).toEqual({ + sessionId: "acp-session-123", + configId: "model", + value: "gemini-pro", + }); + + const response2 = { + jsonrpc: "2.0", + id: sentMessage2.id, + result: {}, + }; + mockProcess.stdout.write(JSON.stringify(response2) + "\n"); + + const result = await createPromise; + expect(result).toEqual({ sessionId: "acp-session-123" }); + }); + + it("should call session/set_config_option after session.create when model is provided", async () => { + adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + const createPromise = connection.sendRequest("session.create", { + model: "claude-sonnet-4-5-20250929", + workingDirectory: "/test/project", + }); + + // Read the session/new request + const sentData1 = mockProcess.stdin.read(); + const sentMessage1 = JSON.parse(sentData1.toString().trim()); + expect(sentMessage1.method).toBe("session/new"); + + // Respond to session/new + const response1 = { + jsonrpc: "2.0", + id: sentMessage1.id, + result: { sessionId: "acp-session-456" }, + }; + mockProcess.stdout.write(JSON.stringify(response1) + "\n"); + + // Wait for the set_model request to be sent + await new Promise((resolve) => setImmediate(resolve)); + + // Read the session/set_config_option request + const sentData2 = mockProcess.stdin.read(); + const sentMessage2 = JSON.parse(sentData2.toString().trim()); + expect(sentMessage2.method).toBe("session/set_config_option"); + expect(sentMessage2.params).toEqual({ + sessionId: "acp-session-456", + configId: "model", + value: "claude-sonnet-4-5-20250929", + }); + + // Respond to session/set_config_option + const response2 = { + jsonrpc: "2.0", + id: sentMessage2.id, + result: {}, + }; + mockProcess.stdout.write(JSON.stringify(response2) + "\n"); + + const result = await createPromise; + expect(result).toEqual({ sessionId: "acp-session-456" }); + }); + + it("should not call session/set_config_option when model is not provided", async () => { + adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + const createPromise = connection.sendRequest("session.create", { + workingDirectory: "/test/project", + }); + + // Read the session/new request + const sentData1 = mockProcess.stdin.read(); + const sentMessage1 = JSON.parse(sentData1.toString().trim()); + expect(sentMessage1.method).toBe("session/new"); + + // Respond to session/new + const response1 = { + jsonrpc: "2.0", + id: sentMessage1.id, + result: { sessionId: "acp-session-789" }, + }; + mockProcess.stdout.write(JSON.stringify(response1) + "\n"); + + const result = await createPromise; + expect(result).toEqual({ sessionId: "acp-session-789" }); + + // Verify no additional request was sent (no session/set_config_option) + await new Promise((resolve) => setImmediate(resolve)); + const extraData = mockProcess.stdin.read(); + expect(extraData).toBeNull(); + }); + + it("should translate session.send to session/prompt with prompt array", async () => { + adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + const sendPromise = connection.sendRequest("session.send", { + sessionId: "sess-123", + prompt: "Hello, world!", + }); + + const sentData = mockProcess.stdin.read(); + const sentMessage = JSON.parse(sentData.toString().trim()); + + expect(sentMessage.method).toBe("session/prompt"); + expect(sentMessage.params).toEqual({ + sessionId: "sess-123", + prompt: [{ type: "text", text: "Hello, world!" }], + }); + + // Send response + const response = { + jsonrpc: "2.0", + id: sentMessage.id, + result: { messageId: "msg-456" }, + }; + mockProcess.stdout.write(JSON.stringify(response) + "\n"); + + const result = await sendPromise; + expect(result).toEqual({ messageId: "msg-456" }); + }); + + it("should emit session.idle when response contains stopReason end_turn", async () => { + adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + const eventHandler = vi.fn(); + connection.onNotification("session.event", eventHandler); + + const sendPromise = connection.sendRequest("session.send", { + sessionId: "sess-123", + prompt: "Hello!", + }); + + const sentData = mockProcess.stdin.read(); + const sentMessage = JSON.parse(sentData.toString().trim()); + + // Gemini returns stopReason in the response instead of a separate notification + const response = { + jsonrpc: "2.0", + id: sentMessage.id, + result: { stopReason: "end_turn" }, + }; + mockProcess.stdout.write(JSON.stringify(response) + "\n"); + + await sendPromise; + + // Wait for queueMicrotask to execute + await new Promise((resolve) => setImmediate(resolve)); + + expect(eventHandler).toHaveBeenCalled(); + const callArg = eventHandler.mock.calls[0][0]; + expect(callArg.sessionId).toBe("sess-123"); + expect(callArg.event.type).toBe("session.idle"); + expect(callArg.event.data).toEqual({}); + expect(callArg.event.ephemeral).toBe(true); + }); + + it("should throw for unsupported methods", async () => { + adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + await expect(connection.sendRequest("models.list", {})).rejects.toThrow( + /not supported in ACP mode/ + ); + + await expect(connection.sendRequest("session.resume", {})).rejects.toThrow( + /not supported in ACP mode/ + ); + + await expect(connection.sendRequest("session.getMessages", {})).rejects.toThrow( + /not supported in ACP mode/ + ); + }); + }); + + describe("notification handling", () => { + beforeEach(() => { + adapter = new AcpProtocolAdapter(options); + }); + + afterEach(async () => { + await adapter.forceStop(); + }); + + it("should translate session/update notifications to session.event format", async () => { + adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + const eventHandler = vi.fn(); + connection.onNotification("session.event", eventHandler); + + // Send ACP update notification (Gemini format with nested update object) + const acpNotification = { + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "sess-123", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Hello" }, + }, + }, + }; + mockProcess.stdout.write(JSON.stringify(acpNotification) + "\n"); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(eventHandler).toHaveBeenCalled(); + const callArg = eventHandler.mock.calls[0][0]; + expect(callArg.sessionId).toBe("sess-123"); + expect(callArg.event.type).toBe("assistant.message_delta"); + expect(callArg.event.data.deltaContent).toBe("Hello"); + expect(callArg.event.ephemeral).toBe(true); + expect(callArg.event.id).toBeDefined(); + expect(callArg.event.timestamp).toBeDefined(); + }); + + it("should translate end_turn to session.idle", async () => { + adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + const eventHandler = vi.fn(); + connection.onNotification("session.event", eventHandler); + + // Gemini format for end_turn + const acpNotification = { + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "sess-123", + update: { + sessionUpdate: "end_turn", + }, + }, + }; + mockProcess.stdout.write(JSON.stringify(acpNotification) + "\n"); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(eventHandler).toHaveBeenCalled(); + const callArg = eventHandler.mock.calls[0][0]; + expect(callArg.sessionId).toBe("sess-123"); + expect(callArg.event.type).toBe("session.idle"); + expect(callArg.event.data).toEqual({}); + expect(callArg.event.ephemeral).toBe(true); + }); + }); + + describe("tool_call handling", () => { + beforeEach(() => { + adapter = new AcpProtocolAdapter(options); + }); + + afterEach(async () => { + await adapter.forceStop(); + }); + + it("should translate tool_call updates to tool events", async () => { + adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + const eventHandler = vi.fn(); + connection.onNotification("session.event", eventHandler); + + // Send ACP tool_call notification + const acpNotification = { + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "sess-123", + update: { + sessionUpdate: "tool_call", + toolCallId: "tool-456", + title: "Reading file", + kind: "file_read", + status: "running", + rawInput: { path: "test.txt" }, + }, + }, + }; + mockProcess.stdout.write(JSON.stringify(acpNotification) + "\n"); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(eventHandler).toHaveBeenCalled(); + const callArg = eventHandler.mock.calls[0][0]; + expect(callArg.sessionId).toBe("sess-123"); + expect(callArg.event.type).toBe("tool.execution_start"); + expect(callArg.event.data.toolCallId).toBe("tool-456"); + expect(callArg.event.data.toolName).toBe("file_read"); + }); + + it("should translate tool_call_update with completed status", async () => { + adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + const eventHandler = vi.fn(); + connection.onNotification("session.event", eventHandler); + + const acpNotification = { + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "sess-123", + update: { + sessionUpdate: "tool_call_update", + toolCallId: "tool-456", + status: "completed", + content: [{ type: "text", text: "File content here" }], + }, + }, + }; + mockProcess.stdout.write(JSON.stringify(acpNotification) + "\n"); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(eventHandler).toHaveBeenCalled(); + const callArg = eventHandler.mock.calls[0][0]; + expect(callArg.event.type).toBe("tool.execution_complete"); + expect(callArg.event.data.toolCallId).toBe("tool-456"); + expect(callArg.event.data.success).toBe(true); + expect(callArg.event.data.result.content).toBe("File content here"); + }); + }); + + describe("permission request handling", () => { + beforeEach(() => { + adapter = new AcpProtocolAdapter(options); + }); + + afterEach(async () => { + await adapter.forceStop(); + }); + + it("should handle session/request_permission and return user choice", async () => { + adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + // Register a permission handler that always allows + const permissionHandler = vi.fn().mockResolvedValue({ + result: { optionId: "allow_once" }, + }); + connection.onRequest("permission.request", permissionHandler); + + // Send ACP permission request + const acpRequest = { + jsonrpc: "2.0", + id: 100, + method: "session/request_permission", + params: { + sessionId: "sess-123", + toolCall: { + toolCallId: "tool-456", + title: "Write to file", + kind: "file_edit", + rawInput: { path: "test.txt" }, + }, + options: [ + { optionId: "allow_once", name: "Allow once", kind: "allow_once" }, + { optionId: "reject_once", name: "Reject", kind: "reject_once" }, + ], + }, + }; + mockProcess.stdout.write(JSON.stringify(acpRequest) + "\n"); + + await new Promise((resolve) => setImmediate(resolve)); + + // Check that the permission handler was called + expect(permissionHandler).toHaveBeenCalled(); + + // Read the response sent back + const sentData = mockProcess.stdin.read(); + if (sentData) { + const sentMessage = JSON.parse(sentData.toString().trim()); + expect(sentMessage.id).toBe(100); + expect(sentMessage.result).toMatchObject({ + outcome: "selected", + optionId: "allow_once", + }); + } + }); + + it("should return cancelled when permission handler rejects", async () => { + adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + // Register a permission handler that rejects + const permissionHandler = vi.fn().mockRejectedValue(new Error("User cancelled")); + connection.onRequest("permission.request", permissionHandler); + + // Send ACP permission request + const acpRequest = { + jsonrpc: "2.0", + id: 101, + method: "session/request_permission", + params: { + sessionId: "sess-123", + toolCall: { + toolCallId: "tool-789", + title: "Delete file", + kind: "file_edit", + }, + options: [{ optionId: "allow_once", name: "Allow", kind: "allow_once" }], + }, + }; + mockProcess.stdout.write(JSON.stringify(acpRequest) + "\n"); + + await new Promise((resolve) => setImmediate(resolve)); + + // Read the response - should indicate cancelled + const sentData = mockProcess.stdin.read(); + if (sentData) { + const sentMessage = JSON.parse(sentData.toString().trim()); + expect(sentMessage.id).toBe(101); + expect(sentMessage.result.outcome).toBe("cancelled"); + } + }); + }); + + describe("verifyProtocolVersion", () => { + beforeEach(() => { + adapter = new AcpProtocolAdapter(options); + }); + + afterEach(async () => { + await adapter.forceStop(); + }); + + it("should verify ACP protocol version", async () => { + adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + const verifyPromise = adapter.verifyProtocolVersion(); + + // Read the initialize request + const sentData = mockProcess.stdin.read(); + const sentMessage = JSON.parse(sentData.toString().trim()); + + // Respond with matching version + const response = { + jsonrpc: "2.0", + id: sentMessage.id, + result: { protocolVersion: ACP_PROTOCOL_VERSION }, + }; + mockProcess.stdout.write(JSON.stringify(response) + "\n"); + + await expect(verifyPromise).resolves.toBeUndefined(); + }); + + it("should reject on version mismatch", async () => { + adapter.start(); + await new Promise((resolve) => setImmediate(resolve)); + + const connection = adapter.getConnection(); + connection.listen(); + + const verifyPromise = adapter.verifyProtocolVersion(); + + const sentData = mockProcess.stdin.read(); + const sentMessage = JSON.parse(sentData.toString().trim()); + + // Respond with different version + const response = { + jsonrpc: "2.0", + id: sentMessage.id, + result: { protocolVersion: 999 }, + }; + mockProcess.stdout.write(JSON.stringify(response) + "\n"); + + await expect(verifyPromise).rejects.toThrow(/version mismatch/i); + }); + }); +}); diff --git a/nodejs/test/protocols/acp/acp-mapper.test.ts b/nodejs/test/protocols/acp/acp-mapper.test.ts new file mode 100644 index 00000000..eaec56bb --- /dev/null +++ b/nodejs/test/protocols/acp/acp-mapper.test.ts @@ -0,0 +1,396 @@ +import { describe, expect, it } from "vitest"; +import { + stringToAcpContent, + acpUpdateToSessionEvent, + acpToolCallToSessionEvent, + copilotSessionConfigToAcpParams, + copilotMessageOptionsToAcpParams, +} from "../../../src/protocols/acp/acp-mapper.js"; +import type { + AcpAgentMessageChunkParams, + AcpAgentThoughtChunkParams, + AcpAgentMessageParams, + AcpEndTurnParams, + AcpErrorParams, + AcpToolCallUpdateInner, + AcpToolCallUpdateUpdateInner, +} from "../../../src/protocols/acp/acp-types.js"; + +describe("AcpMapper", () => { + describe("stringToAcpContent", () => { + it("should convert string prompt to ACP content array", () => { + const result = stringToAcpContent("Hello, world!"); + + expect(result).toEqual([{ type: "text", text: "Hello, world!" }]); + }); + + it("should handle empty string", () => { + const result = stringToAcpContent(""); + + expect(result).toEqual([{ type: "text", text: "" }]); + }); + + it("should handle multi-line string", () => { + const result = stringToAcpContent("Line 1\nLine 2\nLine 3"); + + expect(result).toEqual([{ type: "text", text: "Line 1\nLine 2\nLine 3" }]); + }); + }); + + describe("acpUpdateToSessionEvent", () => { + it("should map agent_message_chunk to assistant.message_delta", () => { + const update: AcpAgentMessageChunkParams = { + sessionId: "sess-123", + type: "agent_message_chunk", + messageId: "msg-456", + content: "Hello", + }; + + const event = acpUpdateToSessionEvent(update); + + expect(event).toMatchObject({ + type: "assistant.message_delta", + ephemeral: true, + data: { + messageId: "msg-456", + deltaContent: "Hello", + }, + }); + expect(event).toHaveProperty("id"); + expect(event).toHaveProperty("timestamp"); + expect(event).toHaveProperty("parentId", null); + }); + + it("should map agent_thought_chunk to assistant.reasoning_delta", () => { + const update: AcpAgentThoughtChunkParams = { + sessionId: "sess-123", + type: "agent_thought_chunk", + messageId: "msg-456", + content: "Let me think...", + }; + + const event = acpUpdateToSessionEvent(update); + + expect(event).toMatchObject({ + type: "assistant.reasoning_delta", + ephemeral: true, + data: { + reasoningId: "msg-456", + deltaContent: "Let me think...", + }, + }); + expect(event).toHaveProperty("id"); + expect(event).toHaveProperty("timestamp"); + }); + + it("should map agent_message to assistant.message", () => { + const update: AcpAgentMessageParams = { + sessionId: "sess-123", + type: "agent_message", + messageId: "msg-456", + content: "Complete response here", + }; + + const event = acpUpdateToSessionEvent(update); + + expect(event).toMatchObject({ + type: "assistant.message", + data: { + messageId: "msg-456", + content: "Complete response here", + toolRequests: [], + }, + }); + expect(event).toHaveProperty("id"); + expect(event).toHaveProperty("timestamp"); + }); + + it("should map end_turn to session.idle", () => { + const update: AcpEndTurnParams = { + sessionId: "sess-123", + type: "end_turn", + }; + + const event = acpUpdateToSessionEvent(update); + + expect(event).toMatchObject({ + type: "session.idle", + ephemeral: true, + data: {}, + }); + expect(event).toHaveProperty("id"); + expect(event).toHaveProperty("timestamp"); + }); + + it("should map error to session.error", () => { + const update: AcpErrorParams = { + sessionId: "sess-123", + type: "error", + message: "Something went wrong", + code: "INTERNAL_ERROR", + }; + + const event = acpUpdateToSessionEvent(update); + + expect(event).toMatchObject({ + type: "session.error", + data: { + errorType: "internal", + message: "Something went wrong", + }, + }); + expect(event).toHaveProperty("id"); + expect(event).toHaveProperty("timestamp"); + }); + + it("should return null for unknown update type", () => { + const update = { + sessionId: "sess-123", + type: "unknown_type" as const, + } as unknown as AcpAgentMessageChunkParams; + + const event = acpUpdateToSessionEvent(update); + + expect(event).toBeNull(); + }); + }); + + describe("copilotSessionConfigToAcpParams", () => { + it("should convert basic session config", () => { + const result = copilotSessionConfigToAcpParams({}); + + expect(result).toEqual({}); + }); + + it("should include cwd from workingDirectory", () => { + const result = copilotSessionConfigToAcpParams({ + workingDirectory: "/home/user/project", + }); + + expect(result).toEqual({ + cwd: "/home/user/project", + }); + }); + + it("should convert mcpServers to ACP format", () => { + const result = copilotSessionConfigToAcpParams({ + mcpServers: { + myServer: { + type: "local", + command: "node", + args: ["server.js"], + tools: ["*"], + env: { DEBUG: "true" }, + }, + }, + }); + + expect(result).toEqual({ + mcpServers: { + myServer: { + command: "node", + args: ["server.js"], + env: { DEBUG: "true" }, + }, + }, + }); + }); + + it("should filter out remote MCP servers", () => { + const result = copilotSessionConfigToAcpParams({ + mcpServers: { + localServer: { + type: "local", + command: "node", + args: ["local.js"], + tools: ["*"], + }, + remoteServer: { + type: "http", + url: "http://example.com", + tools: ["*"], + }, + }, + }); + + expect(result).toEqual({ + mcpServers: { + localServer: { + command: "node", + args: ["local.js"], + }, + }, + }); + }); + }); + + describe("copilotMessageOptionsToAcpParams", () => { + it("should convert prompt to prompt array", () => { + const result = copilotMessageOptionsToAcpParams("sess-123", { + prompt: "Hello!", + }); + + expect(result).toEqual({ + sessionId: "sess-123", + prompt: [{ type: "text", text: "Hello!" }], + }); + }); + + it("should handle prompt with attachments (attachments ignored in ACP)", () => { + const result = copilotMessageOptionsToAcpParams("sess-123", { + prompt: "Analyze this file", + attachments: [{ type: "file", path: "/path/to/file.ts" }], + }); + + // ACP doesn't support attachments in the same way, so we just include prompt + expect(result).toEqual({ + sessionId: "sess-123", + prompt: [{ type: "text", text: "Analyze this file" }], + }); + }); + }); + + describe("acpToolCallToSessionEvent", () => { + it("should map tool_call with status running to tool.execution_start", () => { + const update: AcpToolCallUpdateInner = { + sessionUpdate: "tool_call", + toolCallId: "tool-123", + title: "Reading file config.json", + kind: "file_read", + status: "running", + rawInput: { path: "config.json" }, + }; + + const event = acpToolCallToSessionEvent(update); + + expect(event).toMatchObject({ + type: "tool.execution_start", + data: { + toolCallId: "tool-123", + toolName: "file_read", + arguments: { path: "config.json" }, + }, + }); + expect(event).toHaveProperty("id"); + expect(event).toHaveProperty("timestamp"); + expect(event).toHaveProperty("parentId", null); + }); + + it("should map tool_call_update with status completed to tool.execution_complete", () => { + const update: AcpToolCallUpdateUpdateInner = { + sessionUpdate: "tool_call_update", + toolCallId: "tool-123", + status: "completed", + rawOutput: { success: true }, + content: [{ type: "text", text: "File content here" }], + }; + + const event = acpToolCallToSessionEvent(update); + + expect(event).toMatchObject({ + type: "tool.execution_complete", + data: { + toolCallId: "tool-123", + success: true, + result: { + content: "File content here", + }, + }, + }); + expect(event).toHaveProperty("id"); + expect(event).toHaveProperty("timestamp"); + }); + + it("should map tool_call_update with status failed to tool.execution_complete with error", () => { + const update: AcpToolCallUpdateUpdateInner = { + sessionUpdate: "tool_call_update", + toolCallId: "tool-123", + status: "failed", + content: [{ type: "text", text: "File not found" }], + }; + + const event = acpToolCallToSessionEvent(update); + + expect(event).toMatchObject({ + type: "tool.execution_complete", + data: { + toolCallId: "tool-123", + success: false, + error: { + message: "File not found", + }, + }, + }); + expect(event).toHaveProperty("id"); + expect(event).toHaveProperty("timestamp"); + }); + + it("should map tool_call_update with status running to tool.execution_progress", () => { + const update: AcpToolCallUpdateUpdateInner = { + sessionUpdate: "tool_call_update", + toolCallId: "tool-123", + status: "running", + content: [{ type: "text", text: "Processing 50%..." }], + }; + + const event = acpToolCallToSessionEvent(update); + + expect(event).toMatchObject({ + type: "tool.execution_progress", + ephemeral: true, + data: { + toolCallId: "tool-123", + progressMessage: "Processing 50%...", + }, + }); + expect(event).toHaveProperty("id"); + expect(event).toHaveProperty("timestamp"); + }); + + it("should handle tool_call with completed status directly", () => { + const update: AcpToolCallUpdateInner = { + sessionUpdate: "tool_call", + toolCallId: "tool-456", + title: "Quick command", + kind: "command", + status: "completed", + rawInput: { command: "echo hello" }, + rawOutput: { exitCode: 0 }, + content: [{ type: "text", text: "hello" }], + }; + + const event = acpToolCallToSessionEvent(update); + + expect(event).toMatchObject({ + type: "tool.execution_complete", + data: { + toolCallId: "tool-456", + success: true, + result: { + content: "hello", + }, + }, + }); + }); + + it("should handle tool_call with no content", () => { + const update: AcpToolCallUpdateInner = { + sessionUpdate: "tool_call", + toolCallId: "tool-789", + title: "Silent operation", + kind: "other", + status: "running", + }; + + const event = acpToolCallToSessionEvent(update); + + expect(event).toMatchObject({ + type: "tool.execution_start", + data: { + toolCallId: "tool-789", + toolName: "other", + }, + }); + }); + }); +}); diff --git a/nodejs/test/protocols/acp/acp-transport.test.ts b/nodejs/test/protocols/acp/acp-transport.test.ts new file mode 100644 index 00000000..27bba4af --- /dev/null +++ b/nodejs/test/protocols/acp/acp-transport.test.ts @@ -0,0 +1,278 @@ +import { describe, expect, it, vi, beforeEach } from "vitest"; +import { PassThrough } from "node:stream"; +import { AcpTransport } from "../../../src/protocols/acp/acp-transport.js"; + +describe("AcpTransport", () => { + let inputStream: PassThrough; + let outputStream: PassThrough; + let transport: AcpTransport; + + beforeEach(() => { + inputStream = new PassThrough(); + outputStream = new PassThrough(); + transport = new AcpTransport(inputStream, outputStream); + }); + + describe("NDJSON parsing", () => { + it("should parse a single complete JSON message", async () => { + const handler = vi.fn(); + transport.onMessage(handler); + transport.listen(); + + const message = { jsonrpc: "2.0", id: 1, method: "test" }; + inputStream.write(JSON.stringify(message) + "\n"); + + // Give time for the message to be processed + await new Promise((resolve) => setImmediate(resolve)); + + expect(handler).toHaveBeenCalledWith(message); + }); + + it("should parse multiple messages in one chunk", async () => { + const handler = vi.fn(); + transport.onMessage(handler); + transport.listen(); + + const message1 = { jsonrpc: "2.0", id: 1, method: "test1" }; + const message2 = { jsonrpc: "2.0", id: 2, method: "test2" }; + inputStream.write(JSON.stringify(message1) + "\n" + JSON.stringify(message2) + "\n"); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(handler).toHaveBeenCalledTimes(2); + expect(handler).toHaveBeenNthCalledWith(1, message1); + expect(handler).toHaveBeenNthCalledWith(2, message2); + }); + + it("should handle partial messages across chunks", async () => { + const handler = vi.fn(); + transport.onMessage(handler); + transport.listen(); + + const message = { jsonrpc: "2.0", id: 1, method: "test", params: { data: "value" } }; + const jsonStr = JSON.stringify(message); + + // Split the message across two chunks + const mid = Math.floor(jsonStr.length / 2); + inputStream.write(jsonStr.slice(0, mid)); + + await new Promise((resolve) => setImmediate(resolve)); + expect(handler).not.toHaveBeenCalled(); + + inputStream.write(jsonStr.slice(mid) + "\n"); + + await new Promise((resolve) => setImmediate(resolve)); + expect(handler).toHaveBeenCalledWith(message); + }); + + it("should handle messages split across newline", async () => { + const handler = vi.fn(); + transport.onMessage(handler); + transport.listen(); + + const message = { jsonrpc: "2.0", id: 1, method: "test" }; + inputStream.write(JSON.stringify(message)); + + await new Promise((resolve) => setImmediate(resolve)); + expect(handler).not.toHaveBeenCalled(); + + inputStream.write("\n"); + + await new Promise((resolve) => setImmediate(resolve)); + expect(handler).toHaveBeenCalledWith(message); + }); + + it("should skip empty lines", async () => { + const handler = vi.fn(); + transport.onMessage(handler); + transport.listen(); + + const message = { jsonrpc: "2.0", id: 1, method: "test" }; + inputStream.write("\n\n" + JSON.stringify(message) + "\n\n"); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(handler).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledWith(message); + }); + + it("should emit error for invalid JSON", async () => { + const errorHandler = vi.fn(); + transport.onError(errorHandler); + transport.listen(); + + inputStream.write("not valid json\n"); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(errorHandler).toHaveBeenCalled(); + expect(errorHandler.mock.calls[0][0]).toBeInstanceOf(Error); + }); + }); + + describe("sending messages", () => { + it("should write JSON followed by newline", () => { + const message = { jsonrpc: "2.0", id: 1, method: "test" }; + transport.send(message); + + const written = outputStream.read(); + expect(written.toString()).toBe(JSON.stringify(message) + "\n"); + }); + + it("should write multiple messages", () => { + const message1 = { jsonrpc: "2.0", id: 1, method: "test1" }; + const message2 = { jsonrpc: "2.0", id: 2, method: "test2" }; + + transport.send(message1); + transport.send(message2); + + const written = outputStream.read(); + expect(written.toString()).toBe( + JSON.stringify(message1) + "\n" + JSON.stringify(message2) + "\n" + ); + }); + }); + + describe("request/response matching", () => { + it("should resolve request when response arrives", async () => { + transport.listen(); + + const requestPromise = transport.sendRequest(1, "test.method", { arg: "value" }); + + // Simulate response + const response = { jsonrpc: "2.0", id: 1, result: { data: "response" } }; + inputStream.write(JSON.stringify(response) + "\n"); + + const result = await requestPromise; + expect(result).toEqual({ data: "response" }); + }); + + it("should reject request when error response arrives", async () => { + transport.listen(); + + const requestPromise = transport.sendRequest(2, "test.method"); + + const errorResponse = { + jsonrpc: "2.0", + id: 2, + error: { code: -32600, message: "Invalid request" }, + }; + inputStream.write(JSON.stringify(errorResponse) + "\n"); + + await expect(requestPromise).rejects.toThrow("Invalid request"); + }); + + it("should handle multiple concurrent requests", async () => { + transport.listen(); + + const promise1 = transport.sendRequest(1, "method1"); + const promise2 = transport.sendRequest(2, "method2"); + const promise3 = transport.sendRequest(3, "method3"); + + // Respond out of order + inputStream.write(JSON.stringify({ jsonrpc: "2.0", id: 2, result: "result2" }) + "\n"); + inputStream.write(JSON.stringify({ jsonrpc: "2.0", id: 3, result: "result3" }) + "\n"); + inputStream.write(JSON.stringify({ jsonrpc: "2.0", id: 1, result: "result1" }) + "\n"); + + expect(await promise1).toBe("result1"); + expect(await promise2).toBe("result2"); + expect(await promise3).toBe("result3"); + }); + }); + + describe("notification handling", () => { + it("should dispatch notifications to handlers", async () => { + const handler = vi.fn(); + transport.onNotification("session/update", handler); + transport.listen(); + + const notification = { + jsonrpc: "2.0", + method: "session/update", + params: { sessionId: "123", type: "end_turn" }, + }; + inputStream.write(JSON.stringify(notification) + "\n"); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(handler).toHaveBeenCalledWith({ sessionId: "123", type: "end_turn" }); + }); + + it("should handle multiple notification handlers for same method", async () => { + const handler1 = vi.fn(); + const handler2 = vi.fn(); + transport.onNotification("session/update", handler1); + transport.onNotification("session/update", handler2); + transport.listen(); + + const notification = { + jsonrpc: "2.0", + method: "session/update", + params: { data: "test" }, + }; + inputStream.write(JSON.stringify(notification) + "\n"); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(handler1).toHaveBeenCalledWith({ data: "test" }); + expect(handler2).toHaveBeenCalledWith({ data: "test" }); + }); + + it("should not call handlers for unregistered notifications", async () => { + const handler = vi.fn(); + transport.onNotification("session/update", handler); + transport.listen(); + + const notification = { + jsonrpc: "2.0", + method: "other/method", + params: {}, + }; + inputStream.write(JSON.stringify(notification) + "\n"); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(handler).not.toHaveBeenCalled(); + }); + }); + + describe("close handling", () => { + it("should emit close when input stream ends", async () => { + const closeHandler = vi.fn(); + transport.onClose(closeHandler); + transport.listen(); + + inputStream.end(); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(closeHandler).toHaveBeenCalled(); + }); + + it("should reject pending requests when stream closes", async () => { + transport.listen(); + + const requestPromise = transport.sendRequest(1, "test.method"); + + inputStream.end(); + + await expect(requestPromise).rejects.toThrow(/closed|ended/i); + }); + }); + + describe("dispose", () => { + it("should stop listening for messages", async () => { + const handler = vi.fn(); + transport.onMessage(handler); + transport.listen(); + + transport.dispose(); + + inputStream.write(JSON.stringify({ jsonrpc: "2.0", id: 1 }) + "\n"); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(handler).not.toHaveBeenCalled(); + }); + }); +});