diff --git a/src/api/providers/__tests__/openai.spec.ts b/src/api/providers/__tests__/openai.spec.ts index 73b542dbc73..c75279de282 100644 --- a/src/api/providers/__tests__/openai.spec.ts +++ b/src/api/providers/__tests__/openai.spec.ts @@ -1280,3 +1280,256 @@ describe("getOpenAiModels", () => { expect(result).toEqual(["gpt-4", "gpt-3.5-turbo"]) }) }) + +describe("Ollama streaming patterns", () => { + // These tests verify that the OpenAI handler correctly processes + // Ollama's non-standard streaming format for thinking models like kimi-k2.5. + // Ollama differs from OpenAI in several ways: + // 1. Tool call IDs use "functions.tool_name:N" format (not "call_xxx") + // 2. Entire tool calls arrive in a single chunk (not incrementally) + // 3. reasoning_content field for thinking tokens + // 4. role: "assistant" on every delta (not just first) + // 5. content: "" (empty string) on deltas + + let handler: OpenAiHandler + const systemPrompt = "You are a helpful assistant." + const messages: Anthropic.Messages.MessageParam[] = [ + { role: "user", content: [{ type: "text" as const, text: "Read /etc/hostname" }] }, + ] + + beforeEach(() => { + handler = new OpenAiHandler({ + openAiApiKey: "test-key", + openAiModelId: "kimi-k2.5:cloud", + openAiBaseUrl: "http://localhost:4141/v1", + }) + mockCreate.mockClear() + }) + + it("should yield reasoning chunks from reasoning_content field", async () => { + mockCreate.mockImplementation(async () => ({ + [Symbol.asyncIterator]: async function* () { + yield { + choices: [ + { + delta: { role: "assistant", reasoning_content: "Let me think about " }, + finish_reason: null, + }, + ], + } + yield { + choices: [ + { + delta: { role: "assistant", reasoning_content: "this request." }, + finish_reason: null, + }, + ], + } + yield { + choices: [ + { + delta: { role: "assistant", content: "Here is the result." }, + finish_reason: null, + }, + ], + } + yield { + choices: [ + { + delta: {}, + finish_reason: "stop", + }, + ], + usage: { prompt_tokens: 10, completion_tokens: 20, total_tokens: 30 }, + } + }, + })) + + const stream = handler.createMessage(systemPrompt, messages) + const chunks: any[] = [] + for await (const chunk of stream) { + chunks.push(chunk) + } + + const reasoningChunks = chunks.filter((c) => c.type === "reasoning") + expect(reasoningChunks).toHaveLength(2) + expect(reasoningChunks[0].text).toBe("Let me think about ") + expect(reasoningChunks[1].text).toBe("this request.") + + const textChunks = chunks.filter((c) => c.type === "text") + expect(textChunks).toHaveLength(1) + expect(textChunks[0].text).toBe("Here is the result.") + }) + + it("should yield reasoning chunks from non-standard 'reasoning' field (Ollama)", async () => { + mockCreate.mockImplementation(async () => ({ + [Symbol.asyncIterator]: async function* () { + yield { + choices: [ + { + delta: { role: "assistant", reasoning: "Thinking via Ollama field" }, + finish_reason: null, + }, + ], + } + yield { + choices: [ + { + delta: { role: "assistant", content: "Done." 
}, + finish_reason: null, + }, + ], + } + yield { + choices: [{ delta: {}, finish_reason: "stop" }], + usage: { prompt_tokens: 5, completion_tokens: 10, total_tokens: 15 }, + } + }, + })) + + const stream = handler.createMessage(systemPrompt, messages) + const chunks: any[] = [] + for await (const chunk of stream) { + chunks.push(chunk) + } + + const reasoningChunks = chunks.filter((c) => c.type === "reasoning") + expect(reasoningChunks).toHaveLength(1) + expect(reasoningChunks[0].text).toBe("Thinking via Ollama field") + }) + + it("should handle Ollama single-chunk tool call with non-standard ID", async () => { + // This is the exact pattern Ollama sends for kimi-k2.5 tool calls: + // reasoning chunks → single tool call chunk → finish_reason: "tool_calls" + mockCreate.mockImplementation(async () => ({ + [Symbol.asyncIterator]: async function* () { + // Reasoning chunks + yield { + choices: [ + { + delta: { role: "assistant", reasoning_content: "I should read the file." }, + finish_reason: null, + }, + ], + } + // Single chunk with complete tool call (Ollama style) + yield { + choices: [ + { + delta: { + role: "assistant", + tool_calls: [ + { + index: 0, + id: "functions.read_file:0", + type: "function", + function: { + name: "read_file", + arguments: '{"path":"/etc/hostname"}', + }, + }, + ], + }, + finish_reason: null, + }, + ], + } + // Empty delta then finish + yield { + choices: [ + { + delta: { role: "assistant" }, + finish_reason: "tool_calls", + }, + ], + } + // Usage + yield { + choices: [], + usage: { prompt_tokens: 57, completion_tokens: 44, total_tokens: 101 }, + } + }, + })) + + const stream = handler.createMessage(systemPrompt, messages) + const chunks: any[] = [] + for await (const chunk of stream) { + chunks.push(chunk) + } + + // Should have reasoning chunk + const reasoningChunks = chunks.filter((c) => c.type === "reasoning") + expect(reasoningChunks).toHaveLength(1) + expect(reasoningChunks[0].text).toBe("I should read the file.") + + // Should have tool_call_partial with Ollama-style ID + const toolPartials = chunks.filter((c) => c.type === "tool_call_partial") + expect(toolPartials).toHaveLength(1) + expect(toolPartials[0]).toEqual({ + type: "tool_call_partial", + index: 0, + id: "functions.read_file:0", + name: "read_file", + arguments: '{"path":"/etc/hostname"}', + }) + + // Should have tool_call_end when finish_reason is "tool_calls" + const toolEnds = chunks.filter((c) => c.type === "tool_call_end") + expect(toolEnds).toHaveLength(1) + expect(toolEnds[0].id).toBe("functions.read_file:0") + + // Should have usage + const usageChunks = chunks.filter((c) => c.type === "usage") + expect(usageChunks).toHaveLength(1) + }) + + it("should handle non-streaming Ollama response with reasoning and tool calls", async () => { + mockCreate.mockResolvedValueOnce({ + choices: [ + { + message: { + role: "assistant", + content: null, + reasoning_content: "Let me read the file.", + tool_calls: [ + { + id: "functions.read_file:0", + type: "function", + function: { + name: "read_file", + arguments: '{"path":"/etc/hostname"}', + }, + }, + ], + }, + finish_reason: "tool_calls", + }, + ], + usage: { prompt_tokens: 50, completion_tokens: 40, total_tokens: 90 }, + }) + + const nonStreamHandler = new OpenAiHandler({ + openAiApiKey: "test-key", + openAiModelId: "kimi-k2.5:cloud", + openAiBaseUrl: "http://localhost:4141/v1", + openAiStreamingEnabled: false, + }) + + const stream = nonStreamHandler.createMessage(systemPrompt, messages) + const chunks: any[] = [] + for await (const chunk of 
stream) { + chunks.push(chunk) + } + + // Should have tool_call + const toolCalls = chunks.filter((c) => c.type === "tool_call") + expect(toolCalls).toHaveLength(1) + expect(toolCalls[0].id).toBe("functions.read_file:0") + expect(toolCalls[0].name).toBe("read_file") + + // Should have reasoning + const reasoningChunks = chunks.filter((c) => c.type === "reasoning") + expect(reasoningChunks).toHaveLength(1) + expect(reasoningChunks[0].text).toBe("Let me read the file.") + }) +}) diff --git a/src/api/providers/openai.ts b/src/api/providers/openai.ts index 33b29abcafe..a4ccd47f66f 100644 --- a/src/api/providers/openai.ts +++ b/src/api/providers/openai.ts @@ -199,10 +199,16 @@ export class OpenAiHandler extends BaseProvider implements SingleCompletionHandl } } - if ("reasoning_content" in delta && delta.reasoning_content) { + // Handle reasoning/thinking tokens from various providers: + // - reasoning_content: DeepSeek, OpenAI (standard) + // - reasoning: Ollama /v1/ (non-standard, used by Kimi K2.5, etc.) + const reasoningText = + ("reasoning_content" in delta && (delta as any).reasoning_content) || + ("reasoning" in delta && (delta as any).reasoning) + if (reasoningText) { yield { type: "reasoning", - text: (delta.reasoning_content as string | undefined) || "", + text: String(reasoningText), } } @@ -260,6 +266,18 @@ export class OpenAiHandler extends BaseProvider implements SingleCompletionHandl } } + // Yield reasoning content from non-streaming responses + // Thinking models (Kimi K2.5, DeepSeek-R1) may return reasoning + // in the message alongside (or instead of) regular content + const messageAny = message as any + const nonStreamReasoning = messageAny?.reasoning_content || messageAny?.reasoning + if (nonStreamReasoning) { + yield { + type: "reasoning", + text: String(nonStreamReasoning), + } + } + yield { type: "text", text: message?.content || "", diff --git a/src/core/assistant-message/__tests__/NativeToolCallParser.spec.ts b/src/core/assistant-message/__tests__/NativeToolCallParser.spec.ts index 2c15e12069c..4842c4def51 100644 --- a/src/core/assistant-message/__tests__/NativeToolCallParser.spec.ts +++ b/src/core/assistant-message/__tests__/NativeToolCallParser.spec.ts @@ -343,4 +343,148 @@ describe("NativeToolCallParser", () => { }) }) }) + + describe("Ollama single-chunk tool call pattern", () => { + // Ollama sends the entire tool call (id + name + full arguments) in a single chunk, + // unlike OpenAI which streams them incrementally across multiple chunks. + // Ollama also uses non-standard tool call IDs like "functions.read_file:0" + // instead of OpenAI's "call_abc123" format. 
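+		//
+		// For reference, the single raw chunk exercised by these tests (and fed to
+		// processRawChunk below) looks roughly like:
+		//   { index: 0, id: "functions.read_file:0", name: "read_file", arguments: '{"path":"/etc/hostname"}' }
+		// i.e. the id, name, and complete JSON arguments arrive together, rather than
+		// the arguments trickling in as separate deltas.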
+ + it("should handle Ollama-style single-chunk tool call with non-standard ID", () => { + // Simulate exactly what Ollama sends through the proxy: + // One chunk with id, name, and complete arguments all at once + const events = NativeToolCallParser.processRawChunk({ + index: 0, + id: "functions.read_file:0", + name: "read_file", + arguments: '{"path":"/etc/hostname"}', + }) + + // Should emit tool_call_start followed by tool_call_delta + expect(events.length).toBeGreaterThanOrEqual(2) + expect(events[0].type).toBe("tool_call_start") + expect(events[0]).toEqual({ + type: "tool_call_start", + id: "functions.read_file:0", + name: "read_file", + }) + expect(events[1].type).toBe("tool_call_delta") + expect(events[1]).toEqual({ + type: "tool_call_delta", + id: "functions.read_file:0", + delta: '{"path":"/etc/hostname"}', + }) + }) + + it("should finalize Ollama tool call via finalizeRawChunks", () => { + // Step 1: Process the single chunk (simulating what Task.ts does) + const rawEvents = NativeToolCallParser.processRawChunk({ + index: 0, + id: "functions.read_file:0", + name: "read_file", + arguments: '{"path":"/etc/hostname"}', + }) + + // Step 2: Start streaming tool call (simulating Task.ts tool_call_start handler) + const startEvent = rawEvents.find((e) => e.type === "tool_call_start") + expect(startEvent).toBeDefined() + NativeToolCallParser.startStreamingToolCall( + startEvent!.id, + startEvent!.type === "tool_call_start" ? (startEvent as any).name : "", + ) + + // Step 3: Process delta (simulating Task.ts tool_call_delta handler) + const deltaEvent = rawEvents.find((e) => e.type === "tool_call_delta") + expect(deltaEvent).toBeDefined() + if (deltaEvent?.type === "tool_call_delta") { + NativeToolCallParser.processStreamingChunk(deltaEvent.id, deltaEvent.delta) + } + + // Step 4: Finalize via finalizeRawChunks (simulating end of stream in Task.ts) + // This is what happens when the stream ends — Task.ts calls finalizeRawChunks() + const finalEvents = NativeToolCallParser.finalizeRawChunks() + expect(finalEvents.length).toBe(1) + expect(finalEvents[0].type).toBe("tool_call_end") + expect(finalEvents[0].id).toBe("functions.read_file:0") + + // Step 5: Finalize the streaming tool call (simulating Task.ts handling tool_call_end) + const finalToolUse = NativeToolCallParser.finalizeStreamingToolCall("functions.read_file:0") + expect(finalToolUse).not.toBeNull() + expect(finalToolUse?.type).toBe("tool_use") + if (finalToolUse?.type === "tool_use") { + expect(finalToolUse.name).toBe("read_file") + const nativeArgs = finalToolUse.nativeArgs as { path: string } + expect(nativeArgs.path).toBe("/etc/hostname") + } + }) + + it("should finalize Ollama tool call via processFinishReason", () => { + // Step 1: Process the single chunk + NativeToolCallParser.processRawChunk({ + index: 0, + id: "functions.read_file:0", + name: "read_file", + arguments: '{"path":"/etc/hostname"}', + }) + + // Step 2: Process finish_reason (Ollama sends finish_reason: "tool_calls") + const endEvents = NativeToolCallParser.processFinishReason("tool_calls") + expect(endEvents.length).toBe(1) + expect(endEvents[0].type).toBe("tool_call_end") + expect(endEvents[0].id).toBe("functions.read_file:0") + }) + + it("should handle Ollama execute_command tool call", () => { + const rawEvents = NativeToolCallParser.processRawChunk({ + index: 0, + id: "functions.execute_command:0", + name: "execute_command", + arguments: '{"command":"ls -la","cwd":"/tmp"}', + }) + + expect(rawEvents.length).toBeGreaterThanOrEqual(2) + + // Start and 
process streaming + NativeToolCallParser.startStreamingToolCall("functions.execute_command:0", "execute_command") + const deltaEvent = rawEvents.find((e) => e.type === "tool_call_delta") + if (deltaEvent?.type === "tool_call_delta") { + NativeToolCallParser.processStreamingChunk(deltaEvent.id, deltaEvent.delta) + } + + // Finalize + const finalToolUse = NativeToolCallParser.finalizeStreamingToolCall("functions.execute_command:0") + expect(finalToolUse).not.toBeNull() + expect(finalToolUse?.type).toBe("tool_use") + if (finalToolUse?.type === "tool_use") { + expect(finalToolUse.name).toBe("execute_command") + const nativeArgs = finalToolUse.nativeArgs as { command: string; cwd?: string } + expect(nativeArgs.command).toBe("ls -la") + expect(nativeArgs.cwd).toBe("/tmp") + } + }) + + it("should handle multiple Ollama tool calls in sequence", () => { + // First tool call + const events1 = NativeToolCallParser.processRawChunk({ + index: 0, + id: "functions.read_file:0", + name: "read_file", + arguments: '{"path":"/etc/hostname"}', + }) + expect(events1[0].type).toBe("tool_call_start") + + // Second tool call + const events2 = NativeToolCallParser.processRawChunk({ + index: 1, + id: "functions.read_file:1", + name: "read_file", + arguments: '{"path":"/etc/hosts"}', + }) + expect(events2[0].type).toBe("tool_call_start") + + // Finish both via processFinishReason + const endEvents = NativeToolCallParser.processFinishReason("tool_calls") + expect(endEvents.length).toBe(2) + }) + }) }) diff --git a/src/core/task/Task.ts b/src/core/task/Task.ts index 3feb695e104..0e4f9215927 100644 --- a/src/core/task/Task.ts +++ b/src/core/task/Task.ts @@ -3401,14 +3401,19 @@ export class Task extends EventEmitter implements TaskLike { // the assistant message is already in history. Otherwise, tool_result blocks would appear // BEFORE their corresponding tool_use blocks, causing API errors. - // Check if we have any content to process (text or tool uses) + // Check if we have any content to process (text, reasoning, or tool uses) + // Thinking models (e.g., Kimi K2, DeepSeek-R1, QwQ) may produce only + // reasoning_content with no regular text content. This should not be + // treated as an empty/failed response — the model did respond, just + // entirely in reasoning tokens. const hasTextContent = assistantMessage.length > 0 + const hasReasoningContent = reasoningMessage.length > 0 const hasToolUses = this.assistantMessageContent.some( (block) => block.type === "tool_use" || block.type === "mcp_tool_use", ) - if (hasTextContent || hasToolUses) { + if (hasTextContent || hasToolUses || hasReasoningContent) { // Reset counter when we get a successful response with content this.consecutiveNoAssistantMessagesCount = 0 // Display grounding sources to the user if they exist @@ -3559,22 +3564,15 @@ export class Task extends EventEmitter implements TaskLike { presentAssistantMessage(this) } - if (hasTextContent || hasToolUses) { - // NOTE: This comment is here for future reference - this was a - // workaround for `userMessageContent` not getting set to true. - // It was due to it not recursively calling for partial blocks - // when `didRejectTool`, so it would get stuck waiting for a - // partial block to complete before it could continue. 
- // In case the content blocks finished it may be the api stream
- // finished after the last parsed content block was executed, so
- // we are able to detect out of bounds and set
- // `userMessageContentReady` to true (note you should not call
- // `presentAssistantMessage` since if the last block is
- // completed it will be presented again).
- // const completeBlocks = this.assistantMessageContent.filter((block) => !block.partial) // If there are any partial blocks after the stream ended we can consider them invalid.
- // if (this.currentStreamingContentIndex >= completeBlocks.length) {
- // this.userMessageContentReady = true
- // }
+ if (hasTextContent || hasToolUses || hasReasoningContent) {
+ // When the model produces only reasoning content (no text blocks,
+ // no tool uses), assistantMessageContent is empty. In that case,
+ // presentAssistantMessage is never called, so userMessageContentReady
+ // would never be set to true. We must set it directly to avoid
+ // blocking forever on the pWaitFor below.
+ if (this.assistantMessageContent.length === 0) {
+ this.userMessageContentReady = true
+ }
 await pWaitFor(() => this.userMessageContentReady)