Comprehensive 60-minute tutorial on real-time streaming flow execution
Learn how to build real-time streaming applications with the Streaming Flow Execution system, inspired by Langflow's sophisticated event-driven architecture.
- Introduction
- Prerequisites
- Tutorial Series
- Core Concepts
- Architecture Overview
- Complete Examples
- Production Patterns
- Troubleshooting
The Streaming Flow Execution system provides real-time streaming capabilities for agent execution:
- Token-by-Token Streaming: See LLM responses as they're generated
- Event-Driven Architecture: Queue-based event management
- Progress Tracking: Know exactly what's happening at all times
- Multiple Listeners: Broadcast events to multiple consumers
- SSE Support: Native browser streaming with Server-Sent Events
Traditional execution:
$result = $agent->run($task);
echo $result->getOutput(); // Wait... then all at onceStreaming execution:
foreach ($executor->executeWithStreaming($agent, $task) as $event) {
echo $event['data']['token']; // Real-time, token-by-token
}This system is inspired by Langflow's event-driven architecture, adapted from Python's async patterns to PHP using Generators.
| Python (Langflow) | PHP (This Framework) |
|---|---|
async/await |
Generator/yield |
asyncio.Queue |
SplQueue |
| Async subscribers | Observer + Iterator |
- Basic PHP (variables, functions, classes)
- Understanding of generators (helpful but not required)
- Familiarity with claude-php-agent basics
- PHP 8.1+ installed
- Composer dependencies installed:
cd /path/to/claude-php-agent composer install - API Key configured:
export ANTHROPIC_API_KEY='your-api-key-here'
php -v # Should show PHP 8.1+
echo $ANTHROPIC_API_KEY # Should show your keyComplete the tutorials in order. Each builds on the previous ones.
File: examples/tutorials/streaming-flow/01-basic-streaming.php
Topics:
- Setting up the streaming executor
- Streaming token-by-token responses
- Handling different event types
- Displaying real-time progress
Run:
php examples/tutorials/streaming-flow/01-basic-streaming.phpWhat you'll see:
Step 1: Setting up the streaming executor
------------------------------------------
✅ ServiceManager initialized
✅ Streaming services registered
Step 2: Getting the streaming executor
---------------------------------------
✅ Streaming executor ready
🚀 Flow started
[Token streaming appears here in real-time...]
🔧 Using tool: calculator
Input: {"operation":"multiply","a":25,"b":4}
✅ Result: Result: 100
📊 Progress: 50.0%
✅ Flow completed
Key Takeaways:
- Streaming executor setup is simple with ServiceManager
- Events flow in real-time as they occur
- Different event types provide different information
File: examples/tutorials/streaming-flow/02-progress-tracking.php
Topics:
- Real-time progress bars
- Duration tracking
- Time remaining estimation
- Queue statistics monitoring
Run:
php examples/tutorials/streaming-flow/02-progress-tracking.phpWhat you'll see:
[========================================] 100.0%
📊 Progress: 3/3 iterations
⏱️ Duration: 2.5s
⏰ ETA: 0.0s
Queue Statistics:
Events in queue: 0
Max queue size: 100
Queue utilization: 0.0%
Dropped events: 0
Key Takeaways:
- Progress tracking provides user feedback
- Duration estimation helps set expectations
- Queue statistics help optimize performance
File: examples/tutorials/streaming-flow/03-event-listeners.php
Topics:
- Multiple subscribers to events
- Custom event handlers
- Listener lifecycle management
- Event broadcasting
Run:
php examples/tutorials/streaming-flow/03-event-listeners.phpWhat you'll see:
✅ Listener 1: Token Counter (ID: listener-...)
✅ Listener 2: Performance Monitor (ID: listener-...)
✅ Listener 3: Error Tracker (ID: listener-...)
✅ Listener 4: Event Logger (ID: listener-...)
[All listeners receive events simultaneously]
Listener Results:
Token Counter: 150 tokens
Performance Monitor: 2 iterations, 1 tool
Error Tracker: 0 errors
Event Logger: 45 events
Key Takeaways:
- Multiple listeners can monitor the same execution
- Each listener can track different aspects
- Listener management (subscribe/unsubscribe) is flexible
File: examples/tutorials/streaming-flow/04-sse-streaming.php
Topics:
- Server-Sent Events endpoints
- Browser-based streaming
- Event formatting
- Connection management
Run:
php examples/tutorials/streaming-flow/04-sse-streaming.php serve
# Open http://localhost:8080 in browserWhat you'll see:
- Interactive web interface
- Real-time streaming in browser
- Progress bars and event logs
- Connection status monitoring
Key Takeaways:
- SSE provides native browser streaming
- No WebSocket complexity needed
- Perfect for real-time web UIs
File: examples/tutorials/streaming-flow/05-custom-events.php
Topics:
- Defining custom event types
- Domain-specific events
- Custom event handlers
- Event aggregation
Run:
php examples/tutorials/streaming-flow/05-custom-events.phpWhat you'll see:
Custom event types defined:
- TRADE_EXECUTED
- PRICE_ALERT
- RISK_WARNING
- PORTFOLIO_UPDATE
💰 TRADE: AAPL 100.00 @ $175.50 = $17550.00
🔔 ALERT: TSLA at $245.75 (above target)
⚠️ RISK [MEDIUM]: Portfolio volatility increased
Key Takeaways:
- Extend FlowEvent for domain-specific needs
- Custom events enable specialized monitoring
- Event aggregation provides insights
File: examples/tutorials/streaming-flow/06-error-handling.php
Topics:
- Error handling in streaming
- Recovery strategies
- Error tracking and logging
- Graceful degradation
Run:
php examples/tutorials/streaming-flow/06-error-handling.phpWhat you'll see:
Test 1: Normal execution
▶️ Execution started...
✅ Tool completed: Result: 20
✅ Execution completed
Test 2: Error scenario (division by zero)
▶️ Execution started...
❌ Error caught in stream: Cannot divide by zero
✅ Error was properly caught and reported
Retry mechanism:
Attempt 1/3...
❌ Failed: Network timeout
Retrying in 0.5s...
Attempt 2/3...
✅ Success on attempt 2
Key Takeaways:
- Always wrap streaming in try-catch
- Subscribe error listeners before execution
- Implement retry logic with exponential backoff
- Errors don't stop the stream
File: examples/tutorials/streaming-flow/07-integration.php
Topics:
- Integration with existing agents
- ServiceManager integration
- Combining with other services
- Building complete applications
Run:
php examples/tutorials/streaming-flow/07-integration.phpWhat you'll see:
Service manager configured with:
- Cache service
- Telemetry service
- Flow event manager
- Streaming executor
✅ StreamingLoop configured with FlowEventManager
✅ Telemetry listener registered
✅ Cache integration ready
Telemetry Metrics:
streaming.tokens: 150
streaming.iterations: 3
streaming.tool_calls: 2
Cache Statistics:
Active keys: 5
Key Takeaways:
- Streaming integrates seamlessly with services
- Combine with cache, telemetry, tracing
- Build production-ready applications
- Monitor full stack metrics
FIFO queue for event management:
use ClaudeAgents\Events\EventQueue;
$queue = new EventQueue(maxSize: 100);
// Add events
$queue->enqueue($event);
// Remove events
$event = $queue->dequeue();
// Check status
$queue->isEmpty();
$queue->size();Configuration:
maxSize: Maximum events (default: 100)- Tracks dropped events when full
- Provides utilization statistics
Events represent execution milestones:
use ClaudeAgents\Events\FlowEvent;
// Flow lifecycle
$event = FlowEvent::flowStarted(['input' => 'task']);
$event = FlowEvent::flowCompleted(['output' => 'result']);
// Token streaming
$event = FlowEvent::token('Hello');
// Progress tracking
$event = FlowEvent::progress(50.0, ['step' => 'processing']);
// Tool execution
$event = FlowEvent::toolStarted('calculator', ['a' => 1, 'b' => 2]);Event Types (25+):
- Flow: started, completed, failed
- Token: received, chunk
- Iteration: started, completed, failed
- Tool: started, completed, failed
- Progress: update, step events
- Error: error, warning, info
Manages event registration and broadcasting:
use ClaudeAgents\Events\FlowEventManager;
$manager = new FlowEventManager($eventQueue);
// Register events
$manager->registerEvent('on_token', FlowEvent::TOKEN_RECEIVED);
// Register with callback
$manager->registerEvent('on_error', FlowEvent::ERROR, function($event) {
error_log($event->data['message']);
});
// Emit events
$manager->emit(FlowEvent::TOKEN_RECEIVED, ['token' => 'Hi']);
// Subscribe listeners
$id = $manager->subscribe(function($event) {
echo "Event: {$event->type}\n";
});Main execution engine:
use ClaudeAgents\Execution\StreamingFlowExecutor;
$executor = new StreamingFlowExecutor($eventManager, $eventQueue);
// Stream execution
foreach ($executor->executeWithStreaming($agent, $task) as $event) {
handleEvent($event);
}
// SSE streaming
foreach ($executor->streamSSE($agent, $task) as $sseData) {
echo $sseData;
flush();
}Track execution progress:
use ClaudeAgents\Execution\FlowProgress;
$progress = new FlowProgress(totalIterations: 10);
$progress->start();
$progress->startIteration(1);
echo $progress->getProgress(); // 10.0 (percentage)
echo $progress->getFormattedDuration(); // "1.5s"
echo $progress->getEstimatedTimeRemaining(); // 13.5 (seconds)┌─────────────────────┐
│ StreamingFlowExecutor│
│ Generator-based │
└──────┬──────┬───────┘
│ │
│ └─────────┐
▼ ▼
┌─────────────┐ ┌─────────────┐
│ EventManager│ │ EventQueue │
│ Broadcast │◄─┤ FIFO │
└─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ FlowEvent │
│ 25+ types │
└─────────────┘
Agent executes
│
▼
Event emitted ──► EventQueue
│
▼
StreamingFlowExecutor yields event
│
├──► Consumer 1
├──► Consumer 2
└──► Consumer N
<?php
require 'vendor/autoload.php';
use ClaudeAgents\Services\ServiceManager;
use ClaudeAgents\Services\ServiceType;
$executor = ServiceManager::getInstance()->get(ServiceType::FLOW_EXECUTOR);
foreach ($executor->executeWithStreaming($agent, $task) as $event) {
if ($event['type'] === 'token') {
echo $event['data']['token'];
} elseif ($event['type'] === 'progress') {
$percent = $event['data']['progress_percent'];
echo "\rProgress: " . round($percent) . "%";
}
}<?php
// api/stream.php
header('Content-Type: text/event-stream');
header('Cache-Control: no-cache');
$task = $_GET['task'] ?? 'Hello';
foreach ($executor->streamSSE($agent, $task) as $sseData) {
echo $sseData;
flush();
}<?php
$eventManager = ServiceManager::getInstance()->get(ServiceType::EVENT_MANAGER);
// Logger
$eventManager->subscribe(fn($e) => logToFile($e));
// Metrics
$eventManager->subscribe(fn($e) => recordMetric($e));
// UI Update
$eventManager->subscribe(fn($e) => updateUI($e));
// All receive events simultaneously<?php
$dashboard = [
'tokens' => 0,
'iterations' => 0,
'tools' => [],
'duration' => 0,
];
foreach ($executor->executeWithStreaming($agent, $task) as $event) {
match ($event['type']) {
'token' => $dashboard['tokens']++,
'iteration_end' => $dashboard['iterations']++,
'tool_end' => $dashboard['tools'][] = $event['data']['tool'],
'progress' => $dashboard['duration'] = $event['data']['duration'],
default => null
};
renderDashboard($dashboard);
}Reduce network calls by buffering tokens:
$buffer = '';
$bufferSize = 20;
foreach ($executor->executeWithStreaming($agent, $task) as $event) {
if ($event['type'] === 'token') {
$buffer .= $event['data']['token'];
if (strlen($buffer) >= $bufferSize) {
sendToClient($buffer);
$buffer = '';
}
}
}
sendToClient($buffer); // Flush remainingImplement robust error handling:
$maxRetries = 3;
$attempt = 0;
while ($attempt < $maxRetries) {
try {
foreach ($executor->executeWithStreaming($agent, $task) as $event) {
if ($event['type'] === 'error') {
throw new RuntimeException($event['data']['error']);
}
processEvent($event);
}
break; // Success
} catch (Exception $e) {
$attempt++;
if ($attempt >= $maxRetries) throw $e;
sleep(pow(2, $attempt)); // Exponential backoff
}
}Record events for replay or analysis:
class EventRecorder {
private array $events = [];
public function record(array $event): void {
$this->events[] = $event;
}
public function save(string $path): void {
file_put_contents($path, json_encode($this->events));
}
public function replay(): Generator {
foreach ($this->events as $event) {
yield $event;
}
}
}
$recorder = new EventRecorder();
foreach ($executor->executeWithStreaming($agent, $task) as $event) {
$recorder->record($event);
processEvent($event);
}
$recorder->save('execution_log.json');Track detailed metrics during execution:
$metrics = [
'start_time' => microtime(true),
'token_count' => 0,
'iteration_count' => 0,
'tool_calls' => [],
'errors' => 0,
];
foreach ($executor->executeWithStreaming($agent, $task) as $event) {
match ($event['type']) {
'token' => $metrics['token_count']++,
'iteration_end' => $metrics['iteration_count']++,
'tool_end' => $metrics['tool_calls'][] = $event['data']['tool'],
'error' => $metrics['errors']++,
default => null
};
}
$metrics['duration'] = microtime(true) - $metrics['start_time'];
$metrics['tokens_per_second'] = $metrics['token_count'] / max($metrics['duration'], 0.001);
saveMetrics($metrics);Stream different content based on conditions:
function conditionalStream(
StreamingFlowExecutor $executor,
Agent $agent,
string $task,
bool $verbose
): Generator {
foreach ($executor->executeWithStreaming($agent, $task) as $event) {
if ($verbose) {
yield $event; // All events
} else {
// Only essential events
if (in_array($event['type'], ['token', 'end', 'error'])) {
yield $event;
}
}
}
}Define domain-specific events:
class CustomFlowEvent extends FlowEvent
{
public const CUSTOM_MILESTONE = 'custom.milestone';
public const CUSTOM_METRIC = 'custom.metric';
public static function milestone(string $name, array $data = []): self
{
return new self(
self::CUSTOM_MILESTONE,
array_merge(['milestone' => $name], $data),
microtime(true)
);
}
}
$manager->registerEvent('on_milestone', CustomFlowEvent::CUSTOM_MILESTONE);
$manager->emit(CustomFlowEvent::CUSTOM_MILESTONE, ['milestone' => 'halfway']);Enhance existing StreamingLoop:
use ClaudeAgents\Streaming\StreamingLoop;
$loop = new StreamingLoop();
$loop->setFlowEventManager($eventManager);
// Loop now automatically emits:
// - iteration events
// - token events
// - tool execution events
$context = new AgentContext($client, $config);
$context->setTask($task);
$context->setTools($tools);
$result = $loop->execute($context);Use with full service stack:
$manager = ServiceManager::getInstance();
$manager
->registerFactory(new CacheServiceFactory())
->registerFactory(new TelemetryServiceFactory())
->registerFactory(new TracingServiceFactory())
->registerFactory(new FlowEventManagerServiceFactory())
->registerFactory(new StreamingFlowExecutorServiceFactory());
// All services work together
$cache = $manager->get(ServiceType::CACHE);
$telemetry = $manager->get(ServiceType::TELEMETRY);
$tracing = $manager->get(ServiceType::TRACING);
$executor = $manager->get(ServiceType::FLOW_EXECUTOR);Symptoms: Stream executes but no events appear
Solutions:
-
Ensure Generator is fully consumed:
foreach ($executor->executeWithStreaming(...) as $event) { // Must iterate through all events }
-
Check queue isn't full:
$stats = $eventQueue->getStats(); if ($stats['dropped_events'] > 0) { // Increase queue size $queue = new EventQueue(maxSize: 500); }
Symptoms: Memory consumption grows during execution
Solutions:
-
Reduce queue size:
$queue = new EventQueue(maxSize: 50);
-
Process events immediately:
foreach ($executor->executeWithStreaming(...) as $event) { processEvent($event); // Don't store }
-
Clear queue periodically:
if ($eventCount % 100 === 0) { gc_collect_cycles(); }
Symptoms: Browser doesn't receive events
Solutions:
-
Disable output buffering:
if (ob_get_level()) ob_end_clean();
-
Set correct headers:
header('Content-Type: text/event-stream'); header('Cache-Control: no-cache'); header('X-Accel-Buffering: no'); // For nginx
-
Flush regularly:
echo $sseData; flush();
Symptoms: Events arrive in unexpected order
Solutions:
- EventQueue is FIFO - check emission order
- Verify listeners aren't modifying queue
- Check for multiple event managers
Symptoms: Queue shows dropped events in stats
Solutions:
- Increase queue size
- Process events faster
- Reduce emission rate
- Use selective event registration
// ✅ Good: Full consumption
foreach ($executor->executeWithStreaming(...) as $event) {
processEvent($event);
}
// ❌ Bad: Partial consumption
$generator = $executor->executeWithStreaming(...);
$firstEvent = $generator->current(); // Only gets first eventforeach ($executor->executeWithStreaming($agent, $task) as $event) {
if ($event['type'] === 'error') {
logError($event['data']);
// Don't ignore errors!
}
}$stats = $eventQueue->getStats();
if ($stats['utilization'] > 80) {
logger->warning('Queue nearly full');
}
if ($stats['dropped_events'] > 0) {
logger->error("Lost {$stats['dropped_events']} events");
}// ✅ Good: Use ServiceManager
$executor = ServiceManager::getInstance()->get(ServiceType::FLOW_EXECUTOR);
// ❌ Bad: Manual instantiation (no DI)
$queue = new EventQueue();
$manager = new FlowEventManager($queue);
$executor = new StreamingFlowExecutor($manager, $queue);class StreamingService {
private array $listenerIds = [];
public function start() {
$this->listenerIds[] = $eventManager->subscribe(...);
}
public function stop() {
foreach ($this->listenerIds as $id) {
$eventManager->unsubscribe($id);
}
}
}- Queue Size: Balance memory vs event loss (100-500 typical)
- Selective Events: Only register events you need
- Lightweight Listeners: Keep handlers fast
- Batch Processing: Process events in batches for efficiency
- Clear Queue: Periodically clear processed events
After completing this tutorial series:
-
Read the docs:
-
Explore examples:
-
Build something:
- Real-time dashboard
- CLI with progress bars
- Web application with SSE
- Multi-agent monitoring system
- API Reference: See individual component documentation
- Examples:
examples/tutorials/streaming-flow/ - Tests:
tests/Unit/Events/andtests/Unit/Execution/ - GitHub Issues: Report bugs or ask questions
You now know how to:
✅ Set up streaming flow execution
✅ Handle real-time events
✅ Track progress and duration
✅ Build SSE endpoints
✅ Create custom events
✅ Handle errors gracefully
✅ Integrate with services
Total time: ~90 minutes
Skill level: Intermediate
Completion: Full-stack streaming applications
Last updated: February 4, 2026