Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/user_guide/en/execution_logic.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ Execute according to the topological order:
After completing each round of in-cycle execution, the system checks these exit conditions:
- **Exit edge triggered**: If any in-cycle node triggers an edge to an out-of-cycle node, exit the loop
- **Maximum iterations reached**: If the configured maximum (default 100) is reached, force termination
- **Time limit reached**: If a `loop_timer` node within the cycle reaches its configured time limit, exit the loop
- **Initial node not re-triggered**: If the initial node isn't re-triggered by in-cycle predecessors, the loop naturally terminates

If none of the conditions are met, return to Step 2 for the next iteration.
Expand Down
159 changes: 159 additions & 0 deletions docs/user_guide/en/nodes/loop_timer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Loop Timer Node

The Loop Timer node is a loop control node used to limit the duration of a loop in a workflow. Through a time-tracking mechanism, it suppresses output before reaching the preset time limit, and only releases the message to trigger outgoing edges when the time limit is reached, thereby terminating the loop.

## Configuration

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| `max_duration` | float | Yes | `60.0` | Maximum loop duration, must be > 0 |
| `duration_unit` | string | Yes | `"seconds"` | Time unit: "seconds", "minutes", or "hours" |
| `reset_on_emit` | bool | No | `true` | Whether to reset the timer after reaching the limit |
| `message` | text | No | - | Message content to send to downstream when time limit is reached |

## Core Concepts

### How It Works

The Loop Timer node maintains an internal timer with the following behavior:

1. **On first trigger**: Timer starts tracking elapsed time
2. **Elapsed time < `max_duration`**: **No output is produced**, outgoing edges are not triggered
3. **Elapsed time >= `max_duration`**: Output message is produced, triggering outgoing edges

This "suppress-release" mechanism allows the Loop Timer to precisely control when a loop terminates based on time rather than iteration count.

### Topological Structure Requirements

The Loop Timer node has special placement requirements in the graph structure:

```
┌──────────────────────────────────────┐
▼ │
Agent ──► Human ─────► Loop Timer ──┬──┘
▲ │ │
└─────────┘ ▼
End Node (outside loop)
```

> **Important**: Since Loop Timer **produces no output until the time limit is reached**:
> - **Human must connect to both Agent and Loop Timer**: This way the "continue loop" edge is handled by Human → Agent, while Loop Timer only handles time tracking
> - **Loop Timer must connect to Agent (inside loop)**: So it's recognized as an in-loop node, avoiding premature loop termination
> - **Loop Timer must connect to End Node (outside loop)**: When the time limit is reached, trigger the out-of-loop node to terminate the entire loop execution

### Timer State

- Timer state persists throughout the entire workflow execution
- Timer starts on the first trigger to the Loop Timer node
- When `reset_on_emit: true`, the timer resets after reaching the limit
- When `reset_on_emit: false`, the timer continues running after reaching the limit, outputting on every subsequent trigger

## When to Use

- **Time-based constraints**: Enforce time limits for loops (e.g., "review must complete within 5 minutes")
- **Timeout protection**: Serve as a "circuit breaker" to prevent runaway processes
- **Variable iteration time**: When each loop iteration takes unpredictable time, but total duration must be bounded

## Examples

### Basic Usage

```yaml
nodes:
- id: Time Guard
type: loop_timer
config:
max_duration: 5
duration_unit: minutes
reset_on_emit: true
message: Time limit reached (5 minutes), process terminated.
```

### Time-Limited Review Loop

This is the most typical use case for Loop Timer:

```yaml
graph:
id: timed_review_loop
description: Review loop with 5-minute time limit

nodes:
- id: Writer
type: agent
config:
provider: openai
name: gpt-4o
role: Improve articles based on user feedback

- id: Reviewer
type: human
config:
description: |
Review the article, enter ACCEPT to accept or provide modification suggestions.

- id: Loop Gate
type: loop_timer
config:
max_duration: 5
duration_unit: minutes
message: Time limit (5 minutes) reached, process automatically ended.

- id: Final Output
type: passthrough
config: {}

edges:
# Main loop: Writer -> Reviewer
- from: Writer
to: Reviewer

# Condition 1: User enters ACCEPT -> End
- from: Reviewer
to: Final Output
condition:
type: keyword
config:
any: [ACCEPT]

# Condition 2: User enters modification suggestions -> Trigger both Writer to continue loop AND Loop Gate to track time
- from: Reviewer
to: Writer
condition:
type: keyword
config:
none: [ACCEPT]

- from: Reviewer
to: Loop Gate
condition:
type: keyword
config:
none: [ACCEPT]

# Loop Gate connects to Writer (keeps it inside the loop)
- from: Loop Gate
to: Writer

# When Loop Gate reaches time limit: Trigger Final Output to end the process
- from: Loop Gate
to: Final Output

start: [Writer]
end: [Final Output]
```

**Execution Flow Explanation**:
1. User first enters modification suggestions → Triggers both Writer (continue loop) and Loop Gate (track time, no output)
2. User enters modification suggestions again → Triggers both Writer (continue loop) and Loop Gate (track time, no output)
3. After 5 minutes of elapsed time → Loop Gate outputs message triggering Final Output, terminating the loop
4. Or at any time user enters ACCEPT → Goes directly to Final Output to end

## Notes

- `max_duration` must be a positive number (> 0)
- `duration_unit` must be one of: "seconds", "minutes", "hours"
- Loop Timer **produces no output until the time limit is reached**, outgoing edges will not trigger
- Ensure Loop Timer connects to both in-loop and out-of-loop nodes
- The `message` field is optional, default message is `"Time limit reached (N units)"`
- Timer starts on the first trigger to the Loop Timer node
1 change: 1 addition & 0 deletions docs/user_guide/en/workflow_authoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Further reading: `docs/user_guide/en/field_specs.md` (field catalog), `docs/user
| `passthrough` | Pass-through node that forwards only the last message by default and can be configured to forward all messages; used for context filtering and graph structure optimization. | `only_last_message` | [passthrough.md](nodes/passthrough.md) |
| `literal` | Emits a fixed text payload whenever triggered and discards inputs. | `content`, `role` (`user`/`assistant`) | [literal.md](nodes/literal.md) |
| `loop_counter` | Guard node that limits loop iterations before releasing downstream edges. | `max_iterations`, `reset_on_emit`, `message` | [loop_counter.md](nodes/loop_counter.md) |
| `loop_timer` | Guard node that limits loop duration before releasing downstream edges. | `max_duration`, `duration_unit`, `reset_on_emit`, `message`, `passthrough` | [loop_timer.md](nodes/loop_timer.md) |

Fetch the full schema via `POST /api/config/schema` or inspect the dataclasses inside `entity/configs/`.

Expand Down
133 changes: 133 additions & 0 deletions entity/configs/node/loop_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""Configuration for loop timer guard nodes."""

from dataclasses import dataclass
from typing import Mapping, Any, Optional

from entity.configs.base import (
BaseConfig,
ConfigError,
ConfigFieldSpec,
EnumOption,
require_mapping,
extend_path,
optional_str,
)


@dataclass
class LoopTimerConfig(BaseConfig):
"""Configuration schema for the loop timer node type."""

max_duration: float = 60.0
duration_unit: str = "seconds"
reset_on_emit: bool = True
message: Optional[str] = None
passthrough: bool = False

@classmethod
def from_dict(
cls, data: Mapping[str, Any] | None, *, path: str
) -> "LoopTimerConfig":
mapping = require_mapping(data or {}, path)
max_duration_raw = mapping.get("max_duration", 60.0)
try:
max_duration = float(max_duration_raw)
except (TypeError, ValueError) as exc: # pragma: no cover - defensive
raise ConfigError(
"max_duration must be a number",
extend_path(path, "max_duration"),
) from exc

if max_duration <= 0:
raise ConfigError(
"max_duration must be > 0", extend_path(path, "max_duration")
)

duration_unit = str(mapping.get("duration_unit", "seconds"))
valid_units = ["seconds", "minutes", "hours"]
if duration_unit not in valid_units:
raise ConfigError(
f"duration_unit must be one of: {', '.join(valid_units)}",
extend_path(path, "duration_unit"),
)

reset_on_emit = bool(mapping.get("reset_on_emit", True))
message = optional_str(mapping, "message", path)
passthrough = bool(mapping.get("passthrough", False))

return cls(
max_duration=max_duration,
duration_unit=duration_unit,
reset_on_emit=reset_on_emit,
message=message,
passthrough=passthrough,
path=path,
)

def validate(self) -> None:
if self.max_duration <= 0:
raise ConfigError(
"max_duration must be > 0", extend_path(self.path, "max_duration")
)

valid_units = ["seconds", "minutes", "hours"]
if self.duration_unit not in valid_units:
raise ConfigError(
f"duration_unit must be one of: {', '.join(valid_units)}",
extend_path(self.path, "duration_unit"),
)

FIELD_SPECS = {
"max_duration": ConfigFieldSpec(
name="max_duration",
display_name="Maximum Duration",
type_hint="float",
required=True,
default=60.0,
description="How long the loop can run before this node emits an output.",
),
"duration_unit": ConfigFieldSpec(
name="duration_unit",
display_name="Duration Unit",
type_hint="str",
required=True,
default="seconds",
description="Unit of time for max_duration: 'seconds', 'minutes', or 'hours'.",
enum=["seconds", "minutes", "hours"],
enum_options=[
EnumOption(
value="seconds", label="Seconds", description="Time in seconds"
),
EnumOption(
value="minutes", label="Minutes", description="Time in minutes"
),
EnumOption(value="hours", label="Hours", description="Time in hours"),
],
),
"reset_on_emit": ConfigFieldSpec(
name="reset_on_emit",
display_name="Reset After Emit",
type_hint="bool",
required=False,
default=True,
description="Whether to reset the internal timer after reaching the limit.",
advance=True,
),
"message": ConfigFieldSpec(
name="message",
display_name="Release Message",
type_hint="text",
required=False,
description="Optional text sent downstream once the time limit is reached.",
advance=True,
),
"passthrough": ConfigFieldSpec(
name="passthrough",
display_name="Passthrough Mode",
type_hint="bool",
required=False,
default=False,
description="If true, after emitting the limit message, all subsequent inputs pass through unchanged.",
advance=True,
),
}
21 changes: 15 additions & 6 deletions runtime/node/builtin_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
from entity.configs.node.literal import LiteralNodeConfig
from entity.configs.node.python_runner import PythonRunnerConfig
from entity.configs.node.loop_counter import LoopCounterConfig
from entity.configs.node.loop_timer import LoopTimerConfig
from runtime.node.executor.agent_executor import AgentNodeExecutor
from runtime.node.executor.human_executor import HumanNodeExecutor
from runtime.node.executor.passthrough_executor import PassthroughNodeExecutor
from runtime.node.executor.literal_executor import LiteralNodeExecutor
from runtime.node.executor.python_executor import PythonNodeExecutor
from runtime.node.executor.subgraph_executor import SubgraphNodeExecutor
from runtime.node.executor.loop_counter_executor import LoopCounterNodeExecutor
from runtime.node.executor.loop_timer_executor import LoopTimerNodeExecutor
from runtime.node.registry import NodeCapabilities, register_node_type


Expand Down Expand Up @@ -48,9 +50,10 @@
"subgraph",
config_cls=SubgraphConfig,
executor_cls=SubgraphNodeExecutor,
capabilities=NodeCapabilities(
capabilities=NodeCapabilities(),
executor_factory=lambda context, subgraphs=None: SubgraphNodeExecutor(
context, subgraphs or {}
),
executor_factory=lambda context, subgraphs=None: SubgraphNodeExecutor(context, subgraphs or {}),
summary="Embeds (through file path or inline config) and runs another named subgraph within the current workflow",
)

Expand All @@ -69,17 +72,15 @@
"passthrough",
config_cls=PassthroughConfig,
executor_cls=PassthroughNodeExecutor,
capabilities=NodeCapabilities(
),
capabilities=NodeCapabilities(),
summary="Forwards prior node output downstream without modification",
)

register_node_type(
"literal",
config_cls=LiteralNodeConfig,
executor_cls=LiteralNodeExecutor,
capabilities=NodeCapabilities(
),
capabilities=NodeCapabilities(),
summary="Emits the configured text message every time it is triggered",
)

Expand All @@ -91,6 +92,14 @@
summary="Blocks downstream edges until the configured iteration limit is reached, then emits a message to release the loop.",
)

register_node_type(
"loop_timer",
config_cls=LoopTimerConfig,
executor_cls=LoopTimerNodeExecutor,
capabilities=NodeCapabilities(),
summary="Blocks downstream edges until the configured time limit is reached, then emits a message to release the loop.",
)

# Register subgraph source types (file-based and inline config)
register_subgraph_source(
"config",
Expand Down
Loading
Loading