-
Notifications
You must be signed in to change notification settings - Fork 2
feat: add agent graph tracker #89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,7 @@ | ||
| import time | ||
| from dataclasses import dataclass | ||
| from enum import Enum | ||
| from typing import Any, Dict, Optional | ||
| from typing import Any, Dict, List, Optional | ||
|
|
||
| from ldclient import Context, LDClient | ||
|
|
||
|
|
@@ -407,3 +407,244 @@ def _openai_to_token_usage(data: dict) -> TokenUsage: | |
| input=data.get("prompt_tokens", 0), | ||
| output=data.get("completion_tokens", 0), | ||
| ) | ||
|
|
||
|
|
||
| class AIGraphTracker: | ||
| """ | ||
| Tracks graph-level, node-level, and edge-level metrics for AI agent graph operations. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| ld_client: LDClient, | ||
| variation_key: str, | ||
| graph_key: str, | ||
| version: int, | ||
| context: Context, | ||
| ): | ||
| """ | ||
| Initialize an AI Graph tracker. | ||
|
|
||
| :param ld_client: LaunchDarkly client instance. | ||
| :param variation_key: Variation key for tracking. | ||
| :param graph_key: Graph configuration key for tracking. | ||
| :param version: Version of the variation. | ||
| :param context: Context for evaluation. | ||
| """ | ||
| self._ld_client = ld_client | ||
| self._variation_key = variation_key | ||
| self._graph_key = graph_key | ||
| self._version = version | ||
| self._context = context | ||
|
|
||
| def __get_track_data(self): | ||
| """ | ||
| Get tracking data for events. | ||
|
|
||
| :return: Dictionary containing variation, graph key, and version. | ||
| """ | ||
| track_data = { | ||
| "variationKey": self._variation_key, | ||
| "graphKey": self._graph_key, | ||
| "version": self._version, | ||
| } | ||
| return track_data | ||
|
|
||
| def track_invocation_success(self) -> None: | ||
| """ | ||
| Track a successful graph invocation. | ||
| """ | ||
| self._ld_client.track( | ||
| "$ld:ai:graph:invocation_success", | ||
| self._context, | ||
| self.__get_track_data(), | ||
| 1, | ||
| ) | ||
|
|
||
| def track_invocation_failure(self) -> None: | ||
| """ | ||
| Track an unsuccessful graph invocation. | ||
| """ | ||
| self._ld_client.track( | ||
| "$ld:ai:graph:invocation_failure", | ||
| self._context, | ||
| self.__get_track_data(), | ||
| 1, | ||
| ) | ||
|
|
||
| def track_latency(self, duration: int) -> None: | ||
| """ | ||
| Track the total latency of graph execution. | ||
|
|
||
| :param duration: Duration in milliseconds. | ||
| """ | ||
| self._ld_client.track( | ||
| "$ld:ai:graph:latency", | ||
| self._context, | ||
| self.__get_track_data(), | ||
| duration, | ||
| ) | ||
|
|
||
| def track_total_tokens(self, tokens: TokenUsage) -> None: | ||
| """ | ||
| Track aggregated token usage across the entire graph invocation. | ||
|
|
||
| :param tokens: Token usage data. | ||
| """ | ||
| self._ld_client.track( | ||
| "$ld:ai:graph:total_tokens", | ||
| self._context, | ||
| self.__get_track_data(), | ||
| tokens.total, | ||
| ) | ||
|
|
||
| def track_path(self, path: List[str]) -> None: | ||
| """ | ||
| Track the execution path through the graph. | ||
|
|
||
| :param path: An array of configuration keys representing the sequence of nodes executed during graph traversal. | ||
| """ | ||
| track_data = {**self.__get_track_data(), "path": path} | ||
| self._ld_client.track( | ||
| "$ld:ai:graph:path", | ||
| self._context, | ||
| track_data, | ||
| 1, | ||
| ) | ||
|
|
||
| def track_judge_response(self, response: Any) -> None: | ||
| """ | ||
| Track judge responses for the final graph output. | ||
|
|
||
| :param response: JudgeResponse object containing evals and success status. | ||
| """ | ||
| from ldai.providers.types import EvalScore, JudgeResponse | ||
|
|
||
| if isinstance(response, JudgeResponse): | ||
| if response.evals: | ||
| track_data = self.__get_track_data() | ||
| if response.judge_config_key: | ||
| track_data = {**track_data, "judgeConfigKey": response.judge_config_key} | ||
|
|
||
| for metric_key, eval_score in response.evals.items(): | ||
| if isinstance(eval_score, EvalScore): | ||
| self._ld_client.track( | ||
| metric_key, | ||
| self._context, | ||
| track_data, | ||
| eval_score.score, | ||
| ) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Duplicated judge response tracking logic within same classLow Severity
Additional Locations (1) |
||
|
|
||
| def track_node_invocation(self, config_key: str) -> None: | ||
| """ | ||
| Track when a node is invoked during graph execution. | ||
|
|
||
| :param config_key: The configuration key of the node being invoked. | ||
| """ | ||
| track_data = {**self.__get_track_data(), "configKey": config_key} | ||
| self._ld_client.track( | ||
| "$ld:ai:graph:node_invocation", | ||
| self._context, | ||
| track_data, | ||
| 1, | ||
| ) | ||
|
|
||
| def track_tool_call(self, config_key: str, tool_key: str) -> None: | ||
| """ | ||
| Track tool calls made by nodes during graph execution. | ||
|
|
||
| :param config_key: The configuration key of the node making the tool call. | ||
| :param tool_key: The key of the tool being called. | ||
| """ | ||
| track_data = { | ||
| **self.__get_track_data(), | ||
| "configKey": config_key, | ||
| "toolKey": tool_key, | ||
| } | ||
| self._ld_client.track( | ||
| "$ld:ai:graph:tool_call", | ||
| self._context, | ||
| track_data, | ||
| 1, | ||
| ) | ||
|
|
||
| def track_node_judge_response(self, config_key: str, response: Any) -> None: | ||
| """ | ||
| Track judge responses for a specific node. | ||
|
|
||
| :param config_key: The configuration key of the node being evaluated. | ||
| :param response: JudgeResponse object containing evals and success status. | ||
| """ | ||
| from ldai.providers.types import EvalScore, JudgeResponse | ||
|
|
||
| if isinstance(response, JudgeResponse): | ||
| if response.evals: | ||
| track_data = {**self.__get_track_data(), "configKey": config_key} | ||
| if response.judge_config_key: | ||
| track_data = {**track_data, "judgeConfigKey": response.judge_config_key} | ||
|
|
||
| for metric_key, eval_score in response.evals.items(): | ||
| if isinstance(eval_score, EvalScore): | ||
| self._ld_client.track( | ||
| metric_key, | ||
| self._context, | ||
| track_data, | ||
| eval_score.score, | ||
| ) | ||
|
|
||
| def track_redirect(self, source_key: str, redirected_target: str) -> None: | ||
| """ | ||
| Track when a node redirects to a different target than originally specified. | ||
|
|
||
| :param source_key: The configuration key of the source node. | ||
| :param redirected_target: The configuration key of the target node that was redirected to. | ||
| """ | ||
| track_data = { | ||
| **self.__get_track_data(), | ||
| "sourceKey": source_key, | ||
| "redirectedTarget": redirected_target, | ||
| } | ||
| self._ld_client.track( | ||
| "$ld:ai:graph:redirect", | ||
| self._context, | ||
| track_data, | ||
| 1, | ||
| ) | ||
|
|
||
| def track_handoff_success(self, source_key: str, target_key: str) -> None: | ||
| """ | ||
| Track successful handoffs between nodes. | ||
|
|
||
| :param source_key: The configuration key of the source node. | ||
| :param target_key: The configuration key of the target node. | ||
| """ | ||
| track_data = { | ||
| **self.__get_track_data(), | ||
| "sourceKey": source_key, | ||
| "targetKey": target_key, | ||
| } | ||
| self._ld_client.track( | ||
| "$ld:ai:graph:handoff_success", | ||
| self._context, | ||
| track_data, | ||
| 1, | ||
| ) | ||
|
|
||
| def track_handoff_failure(self, source_key: str, target_key: str) -> None: | ||
| """ | ||
| Track failed handoffs between nodes. | ||
|
|
||
| :param source_key: The configuration key of the source node. | ||
| :param target_key: The configuration key of the target node. | ||
| """ | ||
| track_data = { | ||
| **self.__get_track_data(), | ||
| "sourceKey": source_key, | ||
| "targetKey": target_key, | ||
| } | ||
| self._ld_client.track( | ||
| "$ld:ai:graph:handoff_failure", | ||
| self._context, | ||
| track_data, | ||
| 1, | ||
| ) | ||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused import here, not sure why it wasn't picked up in previous lint