diff --git a/BotSharp.sln b/BotSharp.sln
index ad95f29e8..6abc5b47b 100644
--- a/BotSharp.sln
+++ b/BotSharp.sln
@@ -157,6 +157,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Core.A2A", "src\In
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Plugin.MultiTenancy", "src\Plugins\BotSharp.Plugin.MultiTenancy\BotSharp.Plugin.MultiTenancy.csproj", "{562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Plugin.RabbitMQ", "src\Plugins\BotSharp.Plugin.RabbitMQ\BotSharp.Plugin.RabbitMQ.csproj", "{8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -669,6 +671,14 @@ Global
{562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}.Release|Any CPU.Build.0 = Release|Any CPU
{562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}.Release|x64.ActiveCfg = Release|Any CPU
{562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}.Release|x64.Build.0 = Release|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|x64.Build.0 = Debug|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|Any CPU.Build.0 = Release|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|x64.ActiveCfg = Release|Any CPU
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -745,6 +755,7 @@ Global
{13223C71-9EAC-9835-28ED-5A4833E6F915} = {53E7CD86-0D19-40D9-A0FA-AB4613837E89}
{E8D01281-D52A-BFF4-33DB-E35D91754272} = {E29DC6C4-5E57-48C5-BCB0-6B8F84782749}
{562DD0C6-DAC8-02CC-C1DD-D43DF186CE76} = {51AFE054-AE99-497D-A593-69BAEFB5106F}
+ {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5} = {64264688-0F5C-4AB0-8F2B-B59B717CCE00}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A9969D89-C98B-40A5-A12B-FC87E55B3A19}
diff --git a/Directory.Packages.props b/Directory.Packages.props
index 1c198a828..96897fb92 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -10,6 +10,7 @@
+
diff --git a/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs b/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs
index 75c0985a8..2653feec2 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs
@@ -1,3 +1,5 @@
+using System.Text.Json;
+
namespace BotSharp.Abstraction.Agents.Models;
public class AgentRule
@@ -8,6 +10,63 @@ public class AgentRule
[JsonPropertyName("disabled")]
public bool Disabled { get; set; }
- [JsonPropertyName("criteria")]
- public string Criteria { get; set; } = string.Empty;
+ [JsonPropertyName("rule_criteria")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public AgentRuleCriteria? RuleCriteria { get; set; }
+
+ [JsonPropertyName("rule_actions")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public IEnumerable RuleActions { get; set; } = [];
+}
+
+public class AgentRuleCriteria : AgentRuleConfigBase
+{
+ ///
+ /// Criteria
+ ///
+ [JsonPropertyName("criteria_text")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public string CriteriaText { get; set; } = string.Empty;
+
+ ///
+ /// Adaptive configuration for rule criteria.
+ /// This flexible JSON document can store any criteria-specific configuration.
+ /// The structure depends on the criteria executor
+ ///
+ [JsonPropertyName("config")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public override JsonDocument? Config { get; set; }
}
+
+public class AgentRuleAction : AgentRuleConfigBase
+{
+ ///
+ /// Adaptive configuration for rule actions.
+ /// This flexible JSON document can store any action-specific configuration.
+ /// The structure depends on the action type:
+ /// - For "Http" action: contains http_context with base_url, relative_url, method, etc.
+ /// - For "MessageQueue" action: contains mq_config with topic_name, routing_key, etc.
+ /// - For custom actions: can contain any custom configuration structure
+ ///
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public override JsonDocument? Config { get; set; }
+
+ ///
+ /// Skipping the number of actions using liquid template, starting from the action itself.
+ ///
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public string? SkippingExpression { get; set; }
+}
+
+public class AgentRuleConfigBase
+{
+ public virtual string Name { get; set; }
+
+ public virtual bool Disabled { get; set; }
+
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public virtual JsonDocument? Config { get; set; }
+
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public virtual string? JsonConfig { get; set; }
+}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs b/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs
index e2ec38a5a..8442eae45 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs
@@ -4,4 +4,5 @@ public class CodeExecutionContext
{
public AgentCodeScript CodeScript { get; set; }
public List Arguments { get; set; } = [];
+ public string? InvokeFrom { get; set; }
}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs
new file mode 100644
index 000000000..4df43dd0e
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs
@@ -0,0 +1,21 @@
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues;
+
+///
+/// Abstract interface for message queue consumers.
+/// Implement this interface to create consumers that are independent of MQ products (e.g., RabbitMQ, Kafka, Azure Service Bus).
+///
+public interface IMQConsumer : IDisposable
+{
+ ///
+ /// Gets the consumer config
+ ///
+ object Config { get; }
+
+ ///
+ /// Handles the received message from the queue.
+ ///
+ /// The consumer channel identifier
+ /// The message data as string
+ /// True if the message was handled successfully, false otherwise
+ Task HandleMessageAsync(string channel, string data);
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs
new file mode 100644
index 000000000..672e539c1
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs
@@ -0,0 +1,31 @@
+using BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues;
+
+public interface IMQService : IDisposable
+{
+ ///
+ /// Subscribe a consumer to the message queue.
+ /// The consumer will be initialized with the appropriate MQ-specific infrastructure.
+ ///
+ /// Unique identifier for the consumer
+ /// The consumer implementing IMQConsumer interface
+ /// Task representing the async subscription operation
+ Task SubscribeAsync(string key, IMQConsumer consumer);
+
+ ///
+ /// Unsubscribe a consumer from the message queue.
+ ///
+ /// Unique identifier for the consumer
+ /// Task representing the async unsubscription operation
+ Task UnsubscribeAsync(string key);
+
+ ///
+ /// Publish payload to message queue
+ ///
+ ///
+ ///
+ ///
+ ///
+ Task PublishAsync(T payload, MQPublishOptions options);
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs
new file mode 100644
index 000000000..cd66be1fd
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs
@@ -0,0 +1,51 @@
+using Microsoft.Extensions.Logging;
+
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues;
+
+///
+/// Abstract base class for RabbitMQ consumers.
+/// Implements IMQConsumer to allow other projects to define consumers independently of RabbitMQ.
+/// The RabbitMQ-specific infrastructure is handled by RabbitMQService.
+///
+public abstract class MQConsumerBase : IMQConsumer
+{
+ protected readonly IServiceProvider _services;
+ protected readonly ILogger _logger;
+ private bool _disposed = false;
+
+ ///
+ /// Gets the consumer config for this consumer.
+ /// Override this property to customize exchange, queue and routing configuration.
+ ///
+ public abstract object Config { get; }
+
+ protected MQConsumerBase(
+ IServiceProvider services,
+ ILogger logger)
+ {
+ _services = services;
+ _logger = logger;
+ }
+
+ ///
+ /// Handles the received message from the queue.
+ ///
+ /// The consumer channel identifier
+ /// The message data as string
+ /// True if the message was handled successfully, false otherwise
+ public abstract Task HandleMessageAsync(string channel, string data);
+
+ public void Dispose()
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ var consumerName = GetType().Name;
+ _logger.LogWarning($"Disposing consumer: {consumerName}");
+ _disposed = true;
+ GC.SuppressFinalize(this);
+ }
+}
+
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs
new file mode 100644
index 000000000..b08a5a054
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs
@@ -0,0 +1,7 @@
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues;
+
+public class MessageQueueSettings
+{
+ public bool Enabled { get; set; }
+ public string Provider { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs
new file mode 100644
index 000000000..e940aff01
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs
@@ -0,0 +1,14 @@
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+
+public class MQMessage
+{
+ public MQMessage(T payload, string messageId)
+ {
+ Payload = payload;
+ MessageId = messageId;
+ }
+
+ public T Payload { get; set; }
+ public string MessageId { get; set; }
+ public DateTime CreateDate { get; set; } = DateTime.UtcNow;
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs
new file mode 100644
index 000000000..dead523be
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs
@@ -0,0 +1,40 @@
+using System.Text.Json;
+
+namespace BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+
+///
+/// Configuration options for publishing messages to a message queue.
+/// These options are MQ-product agnostic and can be adapted by different implementations.
+///
+public class MQPublishOptions
+{
+ ///
+ /// The topic name (exchange in RabbitMQ, topic in Kafka/Azure Service Bus).
+ ///
+ public string TopicName { get; set; } = string.Empty;
+
+ ///
+ /// The routing key (partition key in some MQ systems, used for message routing).
+ ///
+ public string RoutingKey { get; set; } = string.Empty;
+
+ ///
+ /// Delay in milliseconds before the message is delivered.
+ ///
+ public long DelayMilliseconds { get; set; }
+
+ ///
+ /// Optional unique identifier for the message.
+ ///
+ public string? MessageId { get; set; }
+
+ ///
+ /// Additional arguments for the publish configuration (MQ-specific).
+ ///
+ public Dictionary Arguments { get; set; } = [];
+
+ ///
+ /// Json serializer options
+ ///
+ public JsonSerializerOptions? JsonOptions { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs b/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs
index 54fc80a90..a39e4bf05 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs
@@ -12,5 +12,5 @@ public interface IInstructHook : IHookBase
Task OnResponseGenerated(InstructResponseModel response) => Task.CompletedTask;
Task BeforeCodeExecution(Agent agent, CodeExecutionContext context) => Task.CompletedTask;
- Task AfterCodeExecution(Agent agent, CodeExecutionResponseModel response) => Task.CompletedTask;
+ Task AfterCodeExecution(Agent agent, CodeExecutionContext context, CodeExecutionResponseModel response) => Task.CompletedTask;
}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs b/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs
index f5758434c..81023135f 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs
@@ -28,7 +28,7 @@ public virtual async Task BeforeCodeExecution(Agent agent, CodeExecutionContext
await Task.CompletedTask;
}
- public virtual async Task AfterCodeExecution(Agent agent, CodeExecutionResponseModel response)
+ public virtual async Task AfterCodeExecution(Agent agent, CodeExecutionContext context, CodeExecutionResponseModel response)
{
await Task.CompletedTask;
}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs
new file mode 100644
index 000000000..5d5c31fd6
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs
@@ -0,0 +1,13 @@
+using BotSharp.Abstraction.Hooks;
+using BotSharp.Abstraction.Rules.Models;
+
+namespace BotSharp.Abstraction.Rules.Hooks;
+
+public interface IRuleTriggerHook : IHookBase
+{
+ Task BeforeRuleCriteriaExecuted(Agent agent, AgentRuleCriteria ruleCriteria, IRuleTrigger trigger, RuleCriteriaContext context) => Task.CompletedTask;
+ Task AfterRuleCriteriaExecuted(Agent agent, AgentRuleCriteria ruleCriteria, IRuleTrigger trigger, RuleCriteriaResult result) => Task.CompletedTask;
+
+ Task BeforeRuleActionExecuted(Agent agent, AgentRuleAction ruleAction, IRuleTrigger trigger, RuleActionContext context) => Task.CompletedTask;
+ Task AfterRuleActionExecuted(Agent agent, AgentRuleAction ruleAction, IRuleTrigger trigger, RuleActionResult result) => Task.CompletedTask;
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs
index bebac5d6f..ee2d43e68 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs
@@ -1,5 +1,32 @@
+using BotSharp.Abstraction.Rules.Models;
+using System.Text.Json;
+
namespace BotSharp.Abstraction.Rules;
+///
+/// Base interface for rule actions that can be executed by the RuleEngine
+///
public interface IRuleAction
{
-}
+ ///
+ /// The unique name of the rule action provider
+ ///
+ string Name { get; }
+
+ ///
+ /// The default config json format.
+ ///
+ JsonDocument DefaultConfig => JsonDocument.Parse("{}");
+
+ ///
+ /// Execute the rule action
+ ///
+ /// The agent that triggered the rule
+ /// The rule trigger
+ /// The action context
+ /// The action execution result
+ Task ExecuteAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleActionContext context);
+}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs
index bc5022911..30658e582 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs
@@ -1,5 +1,14 @@
+using BotSharp.Abstraction.Rules.Models;
+using System.Text.Json;
+
namespace BotSharp.Abstraction.Rules;
public interface IRuleCriteria
{
+ string Provider { get; }
+
+ JsonDocument DefaultConfig => JsonDocument.Parse("{}");
+
+ Task ValidateAsync(Agent agent, IRuleTrigger trigger, RuleCriteriaContext context)
+ => Task.FromResult(new RuleCriteriaResult());
}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs
index c7a6d847b..ff539e87f 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs
@@ -13,4 +13,14 @@ public interface IRuleEngine
///
Task> Triggered(IRuleTrigger trigger, string text, IEnumerable? states = null, RuleTriggerOptions? options = null)
=> throw new NotImplementedException();
+
+ ///
+ /// Execute rule actions
+ ///
+ ///
+ ///
+ ///
+ ///
+ Task ExecuteActions(IRuleTrigger trigger, IEnumerable actions, RuleExecutionActionOptions options)
+ => Task.FromResult(false);
}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionContext.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionContext.cs
new file mode 100644
index 000000000..aecbf2eed
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionContext.cs
@@ -0,0 +1,12 @@
+using System.Text.Json;
+
+namespace BotSharp.Abstraction.Rules.Models;
+
+public class RuleActionContext
+{
+ public string Text { get; set; } = string.Empty;
+ public Dictionary Parameters { get; set; } = [];
+ public IEnumerable PrevStepResults { get; set; } = [];
+ public IEnumerable NextActions { get; set; } = [];
+ public JsonSerializerOptions? JsonOptions { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionResult.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionResult.cs
new file mode 100644
index 000000000..453d75a4c
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionResult.cs
@@ -0,0 +1,55 @@
+namespace BotSharp.Abstraction.Rules.Models;
+
+///
+/// Result of a rule action execution
+///
+public class RuleActionResult
+{
+ ///
+ /// Whether the action executed successfully
+ ///
+ public bool Success { get; set; }
+
+ ///
+ /// Response content from the action
+ ///
+ public string? Response { get; set; }
+
+ ///
+ /// Result data
+ ///
+ public Dictionary Data { get; set; } = [];
+
+ ///
+ /// Error message if the action failed
+ ///
+ public string? ErrorMessage { get; set; }
+
+ ///
+ /// Whether the action is delayed
+ ///
+ public bool IsDelayed { get; set; }
+
+ public static RuleActionResult Succeeded(string? response = null)
+ {
+ return new RuleActionResult
+ {
+ Success = true,
+ Response = response
+ };
+ }
+
+ public static RuleActionResult Failed(string errorMessage)
+ {
+ return new RuleActionResult
+ {
+ Success = false,
+ ErrorMessage = errorMessage
+ };
+ }
+}
+
+public class RuleActionStepResult : RuleActionResult
+{
+ public AgentRuleAction RuleAction { get; set; }
+}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaContext.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaContext.cs
new file mode 100644
index 000000000..7fff33344
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaContext.cs
@@ -0,0 +1,10 @@
+using System.Text.Json;
+
+namespace BotSharp.Abstraction.Rules.Models;
+
+public class RuleCriteriaContext
+{
+ public string Text { get; set; } = string.Empty;
+ public Dictionary Parameters { get; set; } = [];
+ public JsonSerializerOptions? JsonOptions { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaResult.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaResult.cs
new file mode 100644
index 000000000..fc1df6ceb
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaResult.cs
@@ -0,0 +1,19 @@
+namespace BotSharp.Abstraction.Rules.Models;
+
+public class RuleCriteriaResult
+{
+ ///
+ /// Whether the criteria executed successfully
+ ///
+ public bool Success { get; set; }
+
+ ///
+ /// Response content from the action
+ ///
+ public bool IsValid { get; set; }
+
+ ///
+ /// Error message if the criteria failed
+ ///
+ public string? ErrorMessage { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleCriteriaOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleCriteriaOptions.cs
new file mode 100644
index 000000000..27b5167f4
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleCriteriaOptions.cs
@@ -0,0 +1,39 @@
+using System.Text.Json;
+
+namespace BotSharp.Abstraction.Rules.Options;
+
+public class RuleCriteriaOptions : CriteriaExecuteOptions
+{
+ ///
+ /// Criteria execution provider
+ ///
+ public string Provider { get; set; } = "botsharp-rule";
+}
+
+public class CriteriaExecuteOptions
+{
+ ///
+ /// Code processor provider
+ ///
+ public string? CodeProcessor { get; set; }
+
+ ///
+ /// Code script name
+ ///
+ public string? CodeScriptName { get; set; }
+
+ ///
+ /// Argument name as an input key to the code script
+ ///
+ public string? ArgumentName { get; set; }
+
+ ///
+ /// Json arguments as an input value to the code script
+ ///
+ public JsonDocument? ArgumentContent { get; set; }
+
+ ///
+ /// Custom parameters
+ ///
+ public Dictionary Parameters { get; set; } = [];
+}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleExecutionActionOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleExecutionActionOptions.cs
new file mode 100644
index 000000000..48787e06e
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleExecutionActionOptions.cs
@@ -0,0 +1,8 @@
+namespace BotSharp.Abstraction.Rules.Options;
+
+public class RuleExecutionActionOptions
+{
+ public string AgentId { get; set; }
+ public string Text { get; set; }
+ public IEnumerable States { get; set; } = [];
+}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs
index 068052b0b..abba98115 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs
@@ -1,3 +1,4 @@
+using BotSharp.Abstraction.Repositories.Filters;
using System.Text.Json;
namespace BotSharp.Abstraction.Rules.Options;
@@ -5,22 +6,12 @@ namespace BotSharp.Abstraction.Rules.Options;
public class RuleTriggerOptions
{
///
- /// Code processor provider
+ /// Filter agents
///
- public string? CodeProcessor { get; set; }
+ public AgentFilter? AgentFilter { get; set; }
///
- /// Code script name
+ /// Json serializer options
///
- public string? CodeScriptName { get; set; }
-
- ///
- /// Argument name as an input key to the code script
- ///
- public string? ArgumentName { get; set; }
-
- ///
- /// Json arguments as an input value to the code script
- ///
- public JsonDocument? ArgumentContent { get; set; }
+ public JsonSerializerOptions? JsonOptions { get; set; }
}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs b/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs
index a36516c8a..c0f858fd5 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs
@@ -65,4 +65,70 @@ public static class ObjectExtensions
return null;
}
}
+
+ public static T? TryGetValueOrDefault(this IDictionary dict, string key, T? defaultValue = default, JsonSerializerOptions? jsonOptions = null)
+ {
+ return dict.TryGetValue(key, out var value, jsonOptions)
+ ? value!
+ : defaultValue;
+ }
+
+ public static bool TryGetValue(this IDictionary dict, string key, out T? result, JsonSerializerOptions? jsonOptions = null)
+ {
+ result = default;
+
+ if (!dict.TryGetValue(key, out var value) || value is null)
+ {
+ return false;
+ }
+
+ if (value is T t)
+ {
+ result = t;
+ return true;
+ }
+
+ if (value is JsonElement je)
+ {
+ try
+ {
+ result = je.Deserialize(jsonOptions);
+ return true;
+ }
+ catch
+ {
+ return false;
+ }
+ }
+
+ return false;
+ }
+
+
+ public static T? TryGetObjectValueOrDefault(this IDictionary dict, string key, T? defaultValue = default, JsonSerializerOptions? jsonOptions = null) where T : class
+ {
+ return dict.TryGetObjectValue(key, out var value, jsonOptions)
+ ? value!
+ : defaultValue;
+ }
+
+ public static bool TryGetObjectValue(this IDictionary dict, string key, out T? result, JsonSerializerOptions? jsonOptions = null) where T : class
+ {
+ result = default;
+
+ if (!dict.TryGetValue(key, out var value) || value is null)
+ {
+ return false;
+ }
+
+ try
+ {
+ result = JsonSerializer.Deserialize(value, jsonOptions);
+ return true;
+ }
+ catch
+ {
+ return false;
+ }
+ }
}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Actions/ChatRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Actions/ChatRuleAction.cs
new file mode 100644
index 000000000..eae15d74e
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Actions/ChatRuleAction.cs
@@ -0,0 +1,77 @@
+namespace BotSharp.Core.Rules.Actions;
+
+public sealed class ChatRuleAction : IRuleAction
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+
+ public ChatRuleAction(
+ IServiceProvider services,
+ ILogger logger)
+ {
+ _services = services;
+ _logger = logger;
+ }
+
+ public string Name => "send_message_to_agent";
+
+ public async Task ExecuteAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleActionContext context)
+ {
+ using var scope = _services.CreateScope();
+ var sp = scope.ServiceProvider;
+
+ try
+ {
+ var channel = trigger.Channel;
+ var convService = sp.GetRequiredService();
+ var conv = await convService.NewConversation(new Conversation
+ {
+ Channel = channel,
+ Title = context.Text,
+ AgentId = agent.Id
+ });
+
+ var message = new RoleDialogModel(AgentRole.User, context.Text);
+
+ var allStates = new List
+ {
+ new("channel", channel)
+ };
+
+ if (!context.Parameters.IsNullOrEmpty())
+ {
+ var states = context.Parameters.Where(x => x.Value != null).Select(x => new MessageState(x.Key, x.Value!));
+ allStates.AddRange(states);
+ }
+
+ await convService.SetConversationId(conv.Id, allStates);
+ await convService.SendMessage(agent.Id,
+ message,
+ null,
+ msg => Task.CompletedTask);
+
+ await convService.SaveStates();
+
+ _logger.LogInformation("Chat rule action executed successfully for agent {AgentId}, conversation {ConversationId}", agent.Id, conv.Id);
+
+ return new RuleActionResult
+ {
+ Success = true,
+ Response = conv.Id,
+ Data = new()
+ {
+ ["agent_id"] = agent.Id,
+ ["conversation_id"] = conv.Id
+ }
+ };
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error when sending chat via rule action for agent {AgentId} and trigger {TriggerName}", agent.Id, trigger.Name);
+ return RuleActionResult.Failed(ex.Message);
+ }
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Actions/FunctionCallRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Actions/FunctionCallRuleAction.cs
new file mode 100644
index 000000000..7251a24d7
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Actions/FunctionCallRuleAction.cs
@@ -0,0 +1,50 @@
+using BotSharp.Abstraction.Functions;
+
+namespace BotSharp.Core.Rules.Actions;
+
+public sealed class FunctionCallRuleAction : IRuleAction
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+
+ public FunctionCallRuleAction(
+ IServiceProvider services,
+ ILogger logger)
+ {
+ _services = services;
+ _logger = logger;
+ }
+
+ public string Name => "function_call";
+
+ public async Task ExecuteAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleActionContext context)
+ {
+ var funcName = context.Parameters.TryGetValue("function_name", out var fName) ? fName : null;
+ var func = _services.GetServices().FirstOrDefault(x => x.Name.IsEqualTo(funcName));
+
+ if (func == null)
+ {
+ var errorMsg = $"Unable to find function '{funcName}' when running action {agent.Name}-{trigger.Name}";
+ _logger.LogWarning(errorMsg);
+ return RuleActionResult.Failed(errorMsg);
+ }
+
+ var funcArg = context.Parameters.TryGetObjectValueOrDefault("function_argument", new()) ?? new();
+ await func.Execute(funcArg);
+
+ return new RuleActionResult
+ {
+ Success = true,
+ Response = funcArg?.RichContent?.Message?.Text ?? funcArg?.Content,
+ Data = new()
+ {
+ ["function_name"] = funcName!,
+ ["function_argument"] = funcArg?.ConvertToString() ?? "{}",
+ ["function_call_result"] = funcArg?.RichContent?.Message?.Text ?? funcArg?.Content ?? string.Empty
+ }
+ };
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Actions/HttpRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Actions/HttpRuleAction.cs
new file mode 100644
index 000000000..18aa2446a
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Actions/HttpRuleAction.cs
@@ -0,0 +1,191 @@
+using System.Net.Mime;
+using System.Text.Json;
+using System.Web;
+
+namespace BotSharp.Core.Rules.Actions;
+
+public sealed class HttpRuleAction : IRuleAction
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+ private readonly IHttpClientFactory _httpClientFactory;
+
+ public HttpRuleAction(
+ IServiceProvider services,
+ ILogger logger,
+ IHttpClientFactory httpClientFactory)
+ {
+ _services = services;
+ _logger = logger;
+ _httpClientFactory = httpClientFactory;
+ }
+
+ public string Name => "http_request";
+
+ public JsonDocument DefaultConfig => JsonDocument.Parse(JsonSerializer.Serialize(new
+ {
+ http_url = "https://dummy.example.com/api/v1/employees",
+ http_method = "GET"
+ }));
+
+ public async Task ExecuteAsync(
+ Agent agent,
+ IRuleTrigger trigger,
+ RuleActionContext context)
+ {
+ try
+ {
+ var httpMethod = GetHttpMethod(context);
+ if (httpMethod == null)
+ {
+ var errorMsg = $"HTTP method is not supported in agent rule {agent.Name}-{trigger.Name}";
+ _logger.LogWarning(errorMsg);
+ return RuleActionResult.Failed(errorMsg);
+ }
+
+ // Build the full URL
+ var fullUrl = BuildUrl(context);
+
+ using var client = _httpClientFactory.CreateClient();
+
+ // Add headers
+ AddHttpHeaders(client, context);
+
+ // Create request
+ var request = new HttpRequestMessage(httpMethod, fullUrl);
+
+ // Add request body if provided
+ var requestBodyStr = GetHttpRequestBody(context);
+ if (!string.IsNullOrEmpty(requestBodyStr))
+ {
+ request.Content = new StringContent(requestBodyStr, Encoding.UTF8, MediaTypeNames.Application.Json);
+ }
+
+ _logger.LogInformation("Executing HTTP rule action for agent {AgentId}, URL: {Url}, Method: {Method}",
+ agent.Id, fullUrl, httpMethod);
+
+ // Send request
+ var response = await client.SendAsync(request);
+ var responseContent = await response.Content.ReadAsStringAsync();
+
+ if (response.IsSuccessStatusCode)
+ {
+ _logger.LogInformation("HTTP rule action executed successfully for agent {AgentId}, Status: {StatusCode}, Response: {Response}",
+ agent.Id, response.StatusCode, responseContent);
+
+ return new RuleActionResult
+ {
+ Success = true,
+ Response = responseContent,
+ Data = new()
+ {
+ ["http_response"] = responseContent
+ }
+ };
+ }
+ else
+ {
+ var errorMsg = $"HTTP request failed with status code {response.StatusCode}: {responseContent}";
+ _logger.LogWarning(errorMsg);
+ return RuleActionResult.Failed(errorMsg);
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error executing HTTP rule action for agent {AgentId} and trigger {TriggerName}",
+ agent.Id, trigger.Name);
+ return RuleActionResult.Failed(ex.Message);
+ }
+ }
+
+ private string BuildUrl(RuleActionContext context)
+ {
+ var url = context.Parameters.GetValueOrDefault("http_url", string.Empty);
+ if (string.IsNullOrEmpty(url))
+ {
+ throw new ArgumentNullException("Unable to find http_url in context");
+ }
+
+ // Fill in placeholders in url
+ foreach (var param in context.Parameters)
+ {
+ var value = param.Value?.ToString();
+ if (string.IsNullOrEmpty(value))
+ {
+ continue;
+ }
+ url = url.Replace($"{{{param.Key}}}", value);
+ }
+
+ // Add query parameters
+ var queryParams = context.Parameters.TryGetObjectValueOrDefault>("http_query_params");
+ if (!queryParams.IsNullOrEmpty())
+ {
+ var builder = new UriBuilder(url);
+ var query = HttpUtility.ParseQueryString(builder.Query);
+
+ // Add new query params
+ foreach (var kv in queryParams!.Where(x => x.Value != null))
+ {
+ query[kv.Key] = kv.Value!;
+ }
+
+ // Assign merged query back
+ builder.Query = query.ToString();
+ url = builder.ToString();
+ }
+
+ _logger.LogInformation("HTTP url after filling: {Url}", url);
+ return url;
+ }
+
+ private HttpMethod? GetHttpMethod(RuleActionContext context)
+ {
+ var method = context.Parameters.GetValueOrDefault("http_method", string.Empty);
+ var innerMethod = method?.Trim()?.ToUpper();
+ HttpMethod? matchMethod = null;
+
+ switch (innerMethod)
+ {
+ case "GET":
+ matchMethod = HttpMethod.Get;
+ break;
+ case "POST":
+ matchMethod = HttpMethod.Post;
+ break;
+ case "DELETE":
+ matchMethod = HttpMethod.Delete;
+ break;
+ case "PUT":
+ matchMethod = HttpMethod.Put;
+ break;
+ case "PATCH":
+ matchMethod = HttpMethod.Patch;
+ break;
+ default:
+ break;
+
+ }
+
+ return matchMethod;
+ }
+
+ private void AddHttpHeaders(HttpClient client, RuleActionContext context)
+ {
+ var headerParams = context.Parameters.TryGetObjectValueOrDefault>("http_request_headers");
+ if (!headerParams.IsNullOrEmpty())
+ {
+ foreach (var header in headerParams!)
+ {
+ client.DefaultRequestHeaders.TryAddWithoutValidation(header.Key, header.Value);
+ }
+ }
+ }
+
+ private string? GetHttpRequestBody(RuleActionContext context)
+ {
+ var body = context.Parameters.GetValueOrDefault("http_request_body");
+ return body;
+ }
+}
+
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs b/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs
new file mode 100644
index 000000000..daf54db77
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs
@@ -0,0 +1,6 @@
+namespace BotSharp.Core.Rules.Constants;
+
+public static class RuleConstant
+{
+ public const string DEFAULT_CRITERIA_PROVIDER = "code_script";
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs b/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs
new file mode 100644
index 000000000..feb314ef7
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs
@@ -0,0 +1,42 @@
+using BotSharp.Core.Rules.Models;
+using Microsoft.AspNetCore.Authorization;
+using Microsoft.AspNetCore.Mvc;
+
+namespace BotSharp.Core.Rules.Controllers;
+
+[Authorize]
+[ApiController]
+public class RuleController : ControllerBase
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+ private readonly IRuleEngine _ruleEngine;
+
+ public RuleController(
+ IServiceProvider services,
+ ILogger logger,
+ IRuleEngine ruleEngine)
+ {
+ _services = services;
+ _logger = logger;
+ _ruleEngine = ruleEngine;
+ }
+
+ [HttpPost("/rule/trigger/action")]
+ public async Task RunAction([FromBody] RuleTriggerActionRequest request)
+ {
+ if (request == null)
+ {
+ return BadRequest(new { Success = false, Error = "Request cannnot be empty." });
+ }
+
+ var trigger = _services.GetServices().FirstOrDefault(x => x.Name.IsEqualTo(request.TriggerName));
+ if (trigger == null)
+ {
+ return BadRequest(new { Success = false, Error = "Unable to find rule trigger." });
+ }
+
+ var result = await _ruleEngine.Triggered(trigger, request.Text, request.States, request.Options);
+ return Ok(new { Success = true });
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Criteria/CodeScriptRuleCriteria.cs b/src/Infrastructure/BotSharp.Core.Rules/Criteria/CodeScriptRuleCriteria.cs
new file mode 100644
index 000000000..31536844f
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Criteria/CodeScriptRuleCriteria.cs
@@ -0,0 +1,140 @@
+using System.Text.Json;
+
+namespace BotSharp.Core.Rules.Criteria;
+
+public class CodeScriptRuleCriteria : IRuleCriteria
+{
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+ private readonly CodingSettings _codingSettings;
+
+ public CodeScriptRuleCriteria(
+ IServiceProvider services,
+ ILogger logger,
+ CodingSettings codingSettings)
+ {
+ _services = services;
+ _logger = logger;
+ _codingSettings = codingSettings;
+ }
+
+ public string Provider => RuleConstant.DEFAULT_CRITERIA_PROVIDER;
+
+ public JsonDocument DefaultConfig => JsonDocument.Parse(JsonSerializer.Serialize(new
+ {
+ code_processor = BuiltInCodeProcessor.PyInterpreter,
+ code_script_name = "{trigger_name}_rule.py",
+ code_script_arg_name = "trigger_args",
+ code_script_arg_value = JsonDocument.Parse("{}")
+ }));
+
+ public async Task ValidateAsync(Agent agent, IRuleTrigger trigger, RuleCriteriaContext context)
+ {
+ var result = new RuleCriteriaResult();
+
+ if (string.IsNullOrWhiteSpace(agent?.Id))
+ {
+ return result;
+ }
+
+ var provider = context.Parameters.GetValueOrDefault("code_processor", BuiltInCodeProcessor.PyInterpreter);
+ var processor = _services.GetServices().FirstOrDefault(x => x.Provider.IsEqualTo(provider));
+ if (processor == null)
+ {
+ _logger.LogWarning($"Unable to find code processor: {provider}.");
+ return result;
+ }
+
+ var agentService = _services.GetRequiredService();
+ var scriptName = context.Parameters.GetValueOrDefault("code_script_name", $"{trigger.Name}_rule.py");
+ var codeScript = await agentService.GetAgentCodeScript(agent.Id, scriptName, scriptType: AgentCodeScriptType.Src);
+
+ var msg = $"rule trigger ({trigger.Name}) code script ({scriptName}) in agent ({agent.Name}).";
+
+ if (codeScript == null || string.IsNullOrWhiteSpace(codeScript.Content))
+ {
+ _logger.LogWarning($"Unable to find {msg}.");
+ return result;
+ }
+
+ try
+ {
+ var hooks = _services.GetHooks(agent.Id);
+
+ var argName = context.Parameters.GetValueOrDefault("code_script_arg_name", null);
+ var argValue = context.Parameters.TryGetValue("code_script_arg_value", out var val) && val != null ? JsonSerializer.Deserialize(val) : (JsonElement?)null;
+ var arguments = BuildArguments(argName, argValue);
+ var codeExecutionContext = new CodeExecutionContext
+ {
+ CodeScript = codeScript,
+ Arguments = arguments,
+ InvokeFrom = nameof(CodeScriptRuleCriteria)
+ };
+
+ foreach (var hook in hooks)
+ {
+ await hook.BeforeCodeExecution(agent, codeExecutionContext);
+ }
+
+ codeScript = codeExecutionContext.CodeScript;
+ var (useLock, useProcess, timeoutSeconds) = CodingUtil.GetCodeExecutionConfig(_codingSettings);
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds));
+ var response = processor.Run(codeScript.Content, options: new()
+ {
+ ScriptName = scriptName,
+ Arguments = codeExecutionContext.Arguments,
+ UseLock = useLock,
+ UseProcess = useProcess
+ }, cancellationToken: cts.Token);
+
+ var codeResponse = new CodeExecutionResponseModel
+ {
+ CodeProcessor = processor.Provider,
+ CodeScript = codeScript,
+ Arguments = arguments.DistinctBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value ?? string.Empty),
+ ExecutionResult = response
+ };
+
+ foreach (var hook in hooks)
+ {
+ await hook.AfterCodeExecution(agent, codeExecutionContext, codeResponse);
+ }
+
+ if (response == null || !response.Success)
+ {
+ _logger.LogWarning($"Failed to handle {msg}");
+ return result;
+ }
+
+ LogLevel logLevel;
+ if (response.Result.IsEqualTo("true"))
+ {
+ logLevel = LogLevel.Information;
+ result.Success = true;
+ result.IsValid = true;
+ }
+ else
+ {
+ logLevel = LogLevel.Warning;
+ }
+
+ _logger.Log(logLevel, $"Code script execution result ({response}) from {msg}");
+ return result;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error when handling {msg}");
+ return result;
+ }
+ }
+
+ private List BuildArguments(string? name, JsonElement? args)
+ {
+ var keyValues = new List();
+ if (args != null)
+ {
+ keyValues.Add(new KeyValue(name ?? "trigger_args", args.Value.GetRawText()));
+ }
+ return keyValues;
+ }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/DemoRuleTrigger.cs b/src/Infrastructure/BotSharp.Core.Rules/DemoRuleTrigger.cs
new file mode 100644
index 000000000..70a9eac22
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/DemoRuleTrigger.cs
@@ -0,0 +1,10 @@
+namespace BotSharp.Core.Rules;
+
+public class DemoRuleTrigger : IRuleTrigger
+{
+ public string Channel => "crontab";
+ public string Name => nameof(DemoRuleTrigger);
+
+ public string EntityType { get; set; } = "DemoType";
+ public string EntityId { get; set; } = "DemoId";
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs b/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs
index 5f68b722d..75a13d398 100644
--- a/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs
+++ b/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs
@@ -1,17 +1,4 @@
-using BotSharp.Abstraction.Agents.Models;
-using BotSharp.Abstraction.Coding;
-using BotSharp.Abstraction.Coding.Contexts;
-using BotSharp.Abstraction.Coding.Enums;
-using BotSharp.Abstraction.Coding.Models;
-using BotSharp.Abstraction.Coding.Settings;
-using BotSharp.Abstraction.Coding.Utils;
-using BotSharp.Abstraction.Conversations;
-using BotSharp.Abstraction.Hooks;
-using BotSharp.Abstraction.Models;
-using BotSharp.Abstraction.Repositories.Filters;
-using BotSharp.Abstraction.Rules.Options;
-using BotSharp.Abstraction.Utilities;
-using Microsoft.Extensions.Logging;
+using BotSharp.Abstraction.Templating;
using System.Data;
using System.Text.Json;
@@ -21,16 +8,13 @@ public class RuleEngine : IRuleEngine
{
private readonly IServiceProvider _services;
private readonly ILogger _logger;
- private readonly CodingSettings _codingSettings;
public RuleEngine(
IServiceProvider services,
- ILogger logger,
- CodingSettings codingSettings)
+ ILogger logger)
{
_services = services;
_logger = logger;
- _codingSettings = codingSettings;
}
public async Task> Triggered(IRuleTrigger trigger, string text, IEnumerable? states = null, RuleTriggerOptions? options = null)
@@ -39,7 +23,7 @@ public async Task> Triggered(IRuleTrigger trigger, string te
// Pull all user defined rules
var agentService = _services.GetRequiredService();
- var agents = await agentService.GetAgents(new AgentFilter
+ var agents = await agentService.GetAgents(options?.AgentFilter ?? new AgentFilter
{
Pager = new Pagination
{
@@ -51,154 +35,351 @@ public async Task> Triggered(IRuleTrigger trigger, string te
var filteredAgents = agents.Items.Where(x => x.Rules.Exists(r => r.TriggerName.IsEqualTo(trigger.Name) && !x.Disabled)).ToList();
foreach (var agent in filteredAgents)
{
- // Code trigger
- if (options != null)
+ var rule = agent.Rules.FirstOrDefault(x => x.TriggerName.IsEqualTo(trigger.Name) && !x.Disabled);
+ if (rule == null)
{
- var isTriggered = await TriggerCodeScript(agent, trigger.Name, options);
- if (!isTriggered)
+ continue;
+ }
+
+ // Criteria validation
+ if (!string.IsNullOrEmpty(rule.RuleCriteria?.Name) && !rule.RuleCriteria.Disabled)
+ {
+ var criteriaResult = await ExecuteCriteriaAsync(agent, rule.RuleCriteria, trigger, text, states, options);
+ if (criteriaResult?.IsValid == false)
{
+ _logger.LogWarning("Criteria validation failed for agent {AgentId} with trigger {TriggerName}", agent.Id, trigger.Name);
continue;
}
}
- var convService = _services.GetRequiredService();
- var conv = await convService.NewConversation(new Conversation
+ // Execute action
+ var ruleActions = rule.RuleActions?.Where(x => x != null && !string.IsNullOrEmpty(x.Name) && !x.Disabled) ?? [];
+ if (ruleActions.IsNullOrEmpty())
{
- Channel = trigger.Channel,
- Title = text,
- AgentId = agent.Id
- });
-
- var message = new RoleDialogModel(AgentRole.User, text);
+ continue;
+ }
- var allStates = new List
+ var actionIdx = 0;
+ var stepResults = new List();
+ while (actionIdx >= 0 && actionIdx < ruleActions.Count())
{
- new("channel", trigger.Channel)
- };
+ var ruleAction = ruleActions.ElementAt(actionIdx);
+ var dict = BuildContextParameters(ruleAction.Config, states, stepResults);
- if (states != null)
- {
- allStates.AddRange(states);
- }
+ var skipSteps = RenderSkippingExpression(ruleAction.SkippingExpression, dict);
+ if (skipSteps.HasValue && skipSteps > 0)
+ {
+ actionIdx += skipSteps.Value;
+ continue;
+ }
- await convService.SetConversationId(conv.Id, allStates);
+ var actionResult = await ExecuteActionAsync(agent, ruleAction, ruleActions.Skip(actionIdx + 1), trigger, text, dict, stepResults, options);
+ if (actionResult == null)
+ {
+ actionIdx++;
+ continue;
+ }
- await convService.SendMessage(agent.Id,
- message,
- null,
- msg => Task.CompletedTask);
+ if (!actionResult.Success)
+ {
+ break;
+ }
+
+ stepResults.Add(new()
+ {
+ RuleAction = ruleAction,
+ Success = actionResult.Success,
+ Response = actionResult.Response,
+ ErrorMessage = actionResult.ErrorMessage,
+ Data = actionResult.Data
+ });
+
+ if (actionResult?.Success == true
+ && actionResult.Data.TryGetValue("conversation_id", out var convId)
+ && convId != null)
+ {
+ newConversationIds.Add(convId.ToString()!);
+ }
- await convService.SaveStates();
- newConversationIds.Add(conv.Id);
+ if (actionResult?.IsDelayed == true)
+ {
+ break;
+ }
+
+ actionIdx++;
+ }
}
return newConversationIds;
}
- #region Private methods
- private async Task TriggerCodeScript(Agent agent, string triggerName, RuleTriggerOptions options)
+ public async Task ExecuteActions(IRuleTrigger trigger, IEnumerable actions, RuleExecutionActionOptions options)
{
- if (string.IsNullOrWhiteSpace(agent?.Id))
- {
- return false;
- }
+ var agentService = _services.GetRequiredService();
+ var agent = await agentService.GetAgent(options.AgentId);
- var provider = options.CodeProcessor ?? BuiltInCodeProcessor.PyInterpreter;
- var processor = _services.GetServices().FirstOrDefault(x => x.Provider.IsEqualTo(provider));
- if (processor == null)
+ var actionIdx = 0;
+ var stepResults = new List();
+ while (actionIdx >= 0 && actionIdx < actions.Count())
{
- _logger.LogWarning($"Unable to find code processor: {provider}.");
- return false;
- }
+ var ruleAction = actions.ElementAt(actionIdx);
+ var dict = BuildContextParameters(ruleAction.Config, options.States, stepResults);
- var agentService = _services.GetRequiredService();
- var scriptName = options.CodeScriptName ?? $"{triggerName}_rule.py";
- var codeScript = await agentService.GetAgentCodeScript(agent.Id, scriptName, scriptType: AgentCodeScriptType.Src);
+ var skipSteps = RenderSkippingExpression(ruleAction.SkippingExpression, dict);
+ if (skipSteps.HasValue && skipSteps > 0)
+ {
+ actionIdx += skipSteps.Value;
+ continue;
+ }
- var msg = $"rule trigger ({triggerName}) code script ({scriptName}) in agent ({agent.Name}) => args: {options.ArgumentContent?.RootElement.GetRawText()}.";
+ var actionResult = await ExecuteActionAsync(agent, ruleAction, actions.Skip(actionIdx + 1), trigger, options.Text, dict, stepResults);
+ if (actionResult == null)
+ {
+ actionIdx++;
+ continue;
+ }
- if (codeScript == null || string.IsNullOrWhiteSpace(codeScript.Content))
- {
- _logger.LogWarning($"Unable to find {msg}.");
- return false;
+ if (!actionResult.Success)
+ {
+ break;
+ }
+
+ stepResults.Add(new()
+ {
+ RuleAction = ruleAction,
+ Success = actionResult.Success,
+ Response = actionResult.Response,
+ ErrorMessage = actionResult.ErrorMessage,
+ Data = actionResult.Data
+ });
+
+ if (actionResult?.IsDelayed == true)
+ {
+ break;
+ }
+
+ actionIdx++;
}
+ return true;
+ }
+
+
+ #region Criteria
+ private async Task ExecuteCriteriaAsync(
+ Agent agent,
+ AgentRuleCriteria ruleCriteria,
+ IRuleTrigger trigger,
+ string text,
+ IEnumerable? states,
+ RuleTriggerOptions? triggerOptions)
+ {
+ var result = new RuleCriteriaResult();
+
try
{
- var hooks = _services.GetHooks(agent.Id);
+ var criteria = _services.GetServices()
+ .FirstOrDefault(x => x.Provider == ruleCriteria.Name);
+
+ if (criteria == null)
+ {
+ return result;
+ }
+
- var arguments = BuildArguments(options.ArgumentName, options.ArgumentContent);
- var context = new CodeExecutionContext
+ var context = new RuleCriteriaContext
{
- CodeScript = codeScript,
- Arguments = arguments
+ Text = text,
+ Parameters = BuildContextParameters(ruleCriteria.Config, states),
+ JsonOptions = triggerOptions?.JsonOptions
};
+ _logger.LogInformation("Start execution rule criteria {CriteriaProvider} for agent {AgentId} with trigger {TriggerName}",
+ criteria.Provider, agent.Id, trigger.Name);
+
+ var hooks = _services.GetHooks(agent.Id);
foreach (var hook in hooks)
{
- await hook.BeforeCodeExecution(agent, context);
+ await hook.BeforeRuleCriteriaExecuted(agent, ruleCriteria, trigger, context);
}
- var (useLock, useProcess, timeoutSeconds) = CodingUtil.GetCodeExecutionConfig(_codingSettings);
- using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds));
- var response = processor.Run(codeScript.Content, options: new()
+ // Execute criteria
+ context.Parameters ??= [];
+ result = await criteria.ValidateAsync(agent, trigger, context);
+
+ foreach (var hook in hooks)
+ {
+ await hook.AfterRuleCriteriaExecuted(agent, ruleCriteria, trigger, result);
+ }
+
+ return result;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error executing rule criteria {CriteriaProvider} for agent {AgentId}", ruleCriteria.Name, agent.Id);
+ return result;
+ }
+ }
+ #endregion
+
+
+ #region Action
+ private async Task ExecuteActionAsync(
+ Agent agent,
+ AgentRuleAction curRuleAction,
+ IEnumerable nextRuleActions,
+ IRuleTrigger trigger,
+ string text,
+ Dictionary param,
+ IEnumerable prevStepResults,
+ RuleTriggerOptions? triggerOptions = null)
+ {
+ try
+ {
+ // Get all registered rule actions
+ var actions = _services.GetServices();
+
+ // Find the matching action
+ var foundAction = actions.FirstOrDefault(x => x.Name.IsEqualTo(curRuleAction.Name));
+
+ if (foundAction == null)
{
- ScriptName = scriptName,
- Arguments = arguments,
- UseLock = useLock,
- UseProcess = useProcess
- }, cancellationToken: cts.Token);
+ var errorMsg = $"No rule action {curRuleAction.Name} is found";
+ _logger.LogWarning(errorMsg);
+ return RuleActionResult.Failed(errorMsg);
+ }
- var codeResponse = new CodeExecutionResponseModel
+ var context = new RuleActionContext
{
- CodeProcessor = processor.Provider,
- CodeScript = codeScript,
- Arguments = arguments.DistinctBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value ?? string.Empty),
- ExecutionResult = response
+ Text = text,
+ Parameters = param,
+ PrevStepResults = prevStepResults,
+ NextActions = nextRuleActions,
+ JsonOptions = triggerOptions?.JsonOptions
};
+ _logger.LogInformation("Start execution rule action {ActionName} for agent {AgentId} with trigger {TriggerName}",
+ foundAction.Name, agent.Id, trigger.Name);
+
+ var hooks = _services.GetHooks(agent.Id);
foreach (var hook in hooks)
{
- await hook.AfterCodeExecution(agent, codeResponse);
+ await hook.BeforeRuleActionExecuted(agent, curRuleAction, trigger, context);
}
- if (response == null || !response.Success)
+ // Execute action
+ context.Parameters ??= [];
+ var result = await foundAction.ExecuteAsync(agent, trigger, context);
+
+ foreach (var hook in hooks)
{
- _logger.LogWarning($"Failed to handle {msg}");
- return false;
+ await hook.AfterRuleActionExecuted(agent, curRuleAction, trigger, result);
}
- bool result;
- LogLevel logLevel;
- if (response.Result.IsEqualTo("true"))
+ return result;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error executing rule action {ActionName} for agent {AgentId}", curRuleAction.Name, agent.Id);
+ return RuleActionResult.Failed(ex.Message);
+ }
+ }
+ #endregion
+
+
+ #region Private methods
+ private Dictionary BuildContextParameters(JsonDocument? config, IEnumerable? states, IEnumerable? stepResults = null)
+ {
+ var dict = new Dictionary();
+
+ if (config != null)
+ {
+ dict = ConvertToDictionary(config);
+ }
+
+ if (!states.IsNullOrEmpty())
+ {
+ foreach (var state in states!)
{
- logLevel = LogLevel.Information;
- result = true;
+ dict[state.Key] = state.Value?.ConvertToString();
}
- else
+ }
+
+ if (!stepResults.IsNullOrEmpty())
+ {
+ foreach (var result in stepResults!)
{
- logLevel = LogLevel.Warning;
- result = false;
- }
+ if (result.Data.IsNullOrEmpty()) continue;
- _logger.Log(logLevel, $"Code script execution result ({response}) from {msg}");
- return result;
+ foreach (var item in result.Data)
+ {
+ dict[item.Key] = item.Value;
+ }
+ }
}
- catch (Exception ex)
+
+ return dict;
+ }
+
+ private static Dictionary ConvertToDictionary(JsonDocument doc)
+ {
+ var dict = new Dictionary();
+
+ foreach (var prop in doc.RootElement.EnumerateObject())
{
- _logger.LogError(ex, $"Error when handling {msg}");
- return false;
+ object? value = prop.Value.ValueKind switch
+ {
+ JsonValueKind.String => prop.Value.GetString(),
+ JsonValueKind.Number when prop.Value.TryGetDecimal(out decimal decimalValue) => decimalValue,
+ JsonValueKind.Number when prop.Value.TryGetDouble(out double doubleValue) => doubleValue,
+ JsonValueKind.Number when prop.Value.TryGetInt32(out int intValue) => intValue,
+ JsonValueKind.Number when prop.Value.TryGetInt64(out long longValue) => longValue,
+ JsonValueKind.Number when prop.Value.TryGetDateTime(out DateTime dateTimeValue) => dateTimeValue,
+ JsonValueKind.Number when prop.Value.TryGetDateTimeOffset(out DateTimeOffset dateTimeOffsetValue) => dateTimeOffsetValue,
+ JsonValueKind.Number when prop.Value.TryGetGuid(out Guid guidValue) => guidValue,
+ JsonValueKind.Number => prop.Value.GetDouble(),
+ JsonValueKind.True => true,
+ JsonValueKind.False => false,
+ JsonValueKind.Null => null,
+ JsonValueKind.Undefined => null,
+ JsonValueKind.Array => prop.Value,
+ JsonValueKind.Object => prop.Value,
+ _ => prop.Value
+ };
+ dict[prop.Name] = value?.ConvertToString();
}
+
+ return dict;
+ #endregion
}
- private List BuildArguments(string? name, JsonDocument? args)
+ private int? RenderSkippingExpression(string? expression, Dictionary dict)
{
- var keyValues = new List();
- if (args != null)
+ int? steps = null;
+
+ if (string.IsNullOrWhiteSpace(expression))
{
- keyValues.Add(new KeyValue(name ?? "trigger_args", args.RootElement.GetRawText()));
+ return steps;
}
- return keyValues;
+
+ var render = _services.GetRequiredService();
+ var copy = dict != null
+ ? new Dictionary(dict.Where(x => x.Value != null).ToDictionary(x => x.Key, x => (object)x.Value!))
+ : [];
+ var result = render.Render(expression, new Dictionary
+ {
+ { "states", copy }
+ });
+
+ if (int.TryParse(result, out var intVal))
+ {
+ steps = intVal;
+ }
+ else if (bool.TryParse(result, out var boolVal) && boolVal)
+ {
+ steps = 1;
+ }
+
+ return steps;
}
- #endregion
}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs
new file mode 100644
index 000000000..1c5e81d71
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs
@@ -0,0 +1,11 @@
+namespace BotSharp.Core.Rules.Models;
+
+public class RuleMessagePayload
+{
+ public string AgentId { get; set; }
+ public string TriggerName { get; set; }
+ public string Channel { get; set; }
+ public string Text { get; set; }
+ public Dictionary States { get; set; }
+ public DateTime Timestamp { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs
new file mode 100644
index 000000000..0abea08b0
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs
@@ -0,0 +1,18 @@
+using System.Text.Json.Serialization;
+
+namespace BotSharp.Core.Rules.Models;
+
+public class RuleTriggerActionRequest
+{
+ [JsonPropertyName("trigger_name")]
+ public string TriggerName { get; set; }
+
+ [JsonPropertyName("text")]
+ public string Text { get; set; } = string.Empty;
+
+ [JsonPropertyName("states")]
+ public IEnumerable? States { get; set; }
+
+ [JsonPropertyName("options")]
+ public RuleTriggerOptions? Options { get; set; }
+}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs b/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs
index 56e1fb8ae..c12ce1f3c 100644
--- a/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs
+++ b/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs
@@ -1,3 +1,5 @@
+using BotSharp.Core.Rules.Actions;
+using BotSharp.Core.Rules.Criteria;
using BotSharp.Core.Rules.Engines;
namespace BotSharp.Core.Rules;
@@ -17,5 +19,15 @@ public class RulesPlugin : IBotSharpPlugin
public void RegisterDI(IServiceCollection services, IConfiguration config)
{
services.AddScoped();
+ services.AddScoped();
+
+ // Register rule actions
+ services.AddScoped();
+ services.AddScoped();
+ services.AddScoped();
+
+#if DEBUG
+ services.AddScoped();
+#endif
}
}
diff --git a/src/Infrastructure/BotSharp.Core.Rules/Using.cs b/src/Infrastructure/BotSharp.Core.Rules/Using.cs
index a4353c960..2d1dc6844 100644
--- a/src/Infrastructure/BotSharp.Core.Rules/Using.cs
+++ b/src/Infrastructure/BotSharp.Core.Rules/Using.cs
@@ -1,5 +1,7 @@
global using Microsoft.Extensions.Configuration;
global using Microsoft.Extensions.DependencyInjection;
+global using Microsoft.Extensions.Logging;
+global using System.Text;
global using BotSharp.Abstraction.Agents.Enums;
global using BotSharp.Abstraction.Plugins;
@@ -8,4 +10,24 @@
global using BotSharp.Abstraction.Instructs;
global using BotSharp.Abstraction.Instructs.Models;
-global using BotSharp.Abstraction.Rules;
\ No newline at end of file
+global using BotSharp.Abstraction.Agents.Models;
+global using BotSharp.Abstraction.Conversations;
+
+global using BotSharp.Abstraction.Infrastructures.MessageQueues;
+global using BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+global using BotSharp.Abstraction.Models;
+global using BotSharp.Abstraction.Repositories.Filters;
+global using BotSharp.Abstraction.Rules;
+global using BotSharp.Abstraction.Rules.Options;
+global using BotSharp.Abstraction.Rules.Models;
+global using BotSharp.Abstraction.Rules.Hooks;
+global using BotSharp.Abstraction.Utilities;
+global using BotSharp.Abstraction.Coding;
+global using BotSharp.Abstraction.Coding.Contexts;
+global using BotSharp.Abstraction.Coding.Enums;
+global using BotSharp.Abstraction.Coding.Models;
+global using BotSharp.Abstraction.Coding.Utils;
+global using BotSharp.Abstraction.Coding.Settings;
+global using BotSharp.Abstraction.Hooks;
+
+global using BotSharp.Core.Rules.Constants;
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs b/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs
index f0ec11c23..6995b49ec 100644
--- a/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs
+++ b/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs
@@ -195,7 +195,7 @@ public async Task Execute(
foreach (var hook in hooks)
{
await hook.AfterCompletion(agent, instructResult);
- await hook.AfterCodeExecution(agent, codeExecution);
+ await hook.AfterCodeExecution(agent, context, codeExecution);
}
return instructResult;
diff --git a/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs b/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs
new file mode 100644
index 000000000..5c84fcb63
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs
@@ -0,0 +1,18 @@
+using BotSharp.Abstraction.Infrastructures.MessageQueues;
+using Microsoft.Extensions.Configuration;
+
+namespace BotSharp.Core.Messaging;
+
+public class MessagingPlugin : IBotSharpPlugin
+{
+ public string Id => "52a0aa30-4820-42a9-9cae-df0be81bad2b";
+ public string Name => "Messaging";
+ public string Description => "Provides message queue services.";
+
+ public void RegisterDI(IServiceCollection services, IConfiguration config)
+ {
+ var mqSettings = new MessageQueueSettings();
+ config.Bind("MessageQueue", mqSettings);
+ services.AddSingleton(mqSettings);
+ }
+}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs b/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs
index 6ef8c5935..117feaebe 100644
--- a/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs
+++ b/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs
@@ -1,9 +1,11 @@
+using BotSharp.Abstraction.Coding.Contexts;
using BotSharp.Abstraction.Coding.Models;
using BotSharp.Abstraction.Instructs.Models;
using BotSharp.Abstraction.Instructs.Settings;
using BotSharp.Abstraction.Loggers.Models;
using BotSharp.Abstraction.Users;
using BotSharp.Abstraction.Utilities;
+using System;
namespace BotSharp.Logger.Hooks;
@@ -61,7 +63,7 @@ await db.SaveInstructionLogs(new List
await base.OnResponseGenerated(response);
}
- public override async Task AfterCodeExecution(Agent agent, CodeExecutionResponseModel response)
+ public override async Task AfterCodeExecution(Agent agent, CodeExecutionContext context, CodeExecutionResponseModel response)
{
if (response == null || !IsLoggingEnabled(agent?.Id))
{
@@ -88,7 +90,7 @@ await db.SaveInstructionLogs(new List
}
});
- await base.AfterCodeExecution(agent, response);
+ await base.AfterCodeExecution(agent, context, response);
}
private bool IsLoggingEnabled(string? agentId)
diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs
index 52fd719fd..de5b21a14 100644
--- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs
+++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs
@@ -1,4 +1,3 @@
-using BotSharp.Abstraction.Agents.Models;
using BotSharp.Abstraction.Rules;
namespace BotSharp.OpenAPI.Controllers;
@@ -18,9 +17,23 @@ public IEnumerable GetRuleTriggers()
}).OrderBy(x => x.TriggerName);
}
- [HttpGet("/rule/formalization")]
- public async Task GetFormalizedRuleDefinition([FromBody] AgentRule rule)
+ [HttpGet("/rule/criteria-providers")]
+ public async Task> GetRuleCriteriaProviders()
{
- return "{}";
+ return _services.GetServices().OrderBy(x => x.Provider).Select(x => new KeyValue
+ {
+ Key = x.Provider,
+ Value = x.DefaultConfig != null ? x.DefaultConfig.RootElement.GetRawText() : "{}"
+ });
+ }
+
+ [HttpGet("/rule/actions")]
+ public async Task> GetRuleActions()
+ {
+ return _services.GetServices().OrderBy(x => x.Name).Select(x => new KeyValue
+ {
+ Key = x.Name,
+ Value = x.DefaultConfig != null ? x.DefaultConfig.RootElement.GetRawText() : "{}"
+ });
}
}
diff --git a/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs b/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs
index be189898e..8e29bb1bb 100644
--- a/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs
+++ b/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs
@@ -84,7 +84,7 @@ private async Task SendRequest(string url, GraphQueryRequest r
}
catch (Exception ex)
{
- _logger.LogError(ex, $"Error when fetching Lessen GLM response (Endpoint: {url}).");
+ _logger.LogError(ex, $"Error when fetching {Provider} Graph db response (Endpoint: {url}).");
return result;
}
}
diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs
index 4205fdc46..1bd76635a 100644
--- a/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs
+++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs
@@ -1,4 +1,5 @@
using BotSharp.Abstraction.Agents.Models;
+using System.Text.Json;
namespace BotSharp.Plugin.MongoStorage.Models;
@@ -7,7 +8,8 @@ public class AgentRuleMongoElement
{
public string TriggerName { get; set; } = default!;
public bool Disabled { get; set; }
- public string Criteria { get; set; } = default!;
+ public AgentRuleCriteriaMongoModel? RuleCriteria { get; set; }
+ public List RuleActions { get; set; } = [];
public static AgentRuleMongoElement ToMongoElement(AgentRule rule)
{
@@ -15,7 +17,8 @@ public static AgentRuleMongoElement ToMongoElement(AgentRule rule)
{
TriggerName = rule.TriggerName,
Disabled = rule.Disabled,
- Criteria = rule.Criteria
+ RuleCriteria = AgentRuleCriteriaMongoModel.ToMongoModel(rule.RuleCriteria),
+ RuleActions = rule.RuleActions?.Where(x => x != null).Select(x => AgentRuleActionMongoElement.ToMongoElement(x)!)?.ToList() ?? []
};
}
@@ -25,7 +28,92 @@ public static AgentRule ToDomainElement(AgentRuleMongoElement rule)
{
TriggerName = rule.TriggerName,
Disabled = rule.Disabled,
- Criteria = rule.Criteria
+ RuleCriteria = AgentRuleCriteriaMongoModel.ToDomainModel(rule.RuleCriteria),
+ RuleActions = rule.RuleActions?.Where(x => x != null).Select(x => AgentRuleActionMongoElement.ToDomainElement(x)!)?.ToList() ?? []
};
}
}
+
+[BsonIgnoreExtraElements(Inherited = true)]
+public class AgentRuleCriteriaMongoModel : AgentRuleConfigMongoModel
+{
+ public string CriteriaText { get; set; }
+
+ public static AgentRuleCriteriaMongoModel? ToMongoModel(AgentRuleCriteria? criteria)
+ {
+ if (criteria == null)
+ {
+ return null;
+ }
+
+ return new AgentRuleCriteriaMongoModel
+ {
+ Name = criteria.Name,
+ CriteriaText = criteria.CriteriaText,
+ Disabled = criteria.Disabled,
+ Config = criteria.Config != null ? BsonDocument.Parse(criteria.Config.RootElement.GetRawText()) : null
+ };
+ }
+
+ public static AgentRuleCriteria? ToDomainModel(AgentRuleCriteriaMongoModel? criteria)
+ {
+ if (criteria == null)
+ {
+ return null;
+ }
+
+ return new AgentRuleCriteria
+ {
+ Name = criteria.Name,
+ CriteriaText = criteria.CriteriaText,
+ Disabled = criteria.Disabled,
+ Config = criteria.Config != null ? JsonDocument.Parse(criteria.Config.ToJson()) : null
+ };
+ }
+}
+
+[BsonIgnoreExtraElements(Inherited = true)]
+public class AgentRuleActionMongoElement : AgentRuleConfigMongoModel
+{
+ public string? SkippingExpression { get; set; }
+
+ public static AgentRuleActionMongoElement? ToMongoElement(AgentRuleAction? action)
+ {
+ if (action == null)
+ {
+ return null;
+ }
+
+ return new AgentRuleActionMongoElement
+ {
+ Name = action.Name,
+ Disabled = action.Disabled,
+ Config = action.Config != null ? BsonDocument.Parse(action.Config.RootElement.GetRawText()) : null,
+ SkippingExpression = action.SkippingExpression
+ };
+ }
+
+ public static AgentRuleAction? ToDomainElement(AgentRuleActionMongoElement? action)
+ {
+ if (action == null)
+ {
+ return null;
+ }
+
+ return new AgentRuleAction
+ {
+ Name = action.Name,
+ Disabled = action.Disabled,
+ Config = action.Config != null ? JsonDocument.Parse(action.Config.ToJson()) : null,
+ SkippingExpression = action.SkippingExpression
+ };
+ }
+}
+
+[BsonIgnoreExtraElements(Inherited = true)]
+public class AgentRuleConfigMongoModel
+{
+ public string Name { get; set; }
+ public bool Disabled { get; set; }
+ public BsonDocument? Config { get; set; }
+}
\ No newline at end of file
diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs
index 300700a64..83b2f1658 100644
--- a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs
+++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs
@@ -634,8 +634,7 @@ public async Task> TruncateConversation(string conversationId, stri
continue;
}
- var values = state.Values.Where(x => x.MessageId != messageId)
- .Where(x => x.UpdateTime < refTime)
+ var values = state.Values.Where(x => x.MessageId != messageId && x.UpdateTime < refTime)
.ToList();
if (values.Count == 0) continue;
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj b/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj
new file mode 100644
index 000000000..4a8f3ff20
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj
@@ -0,0 +1,22 @@
+
+
+
+ $(TargetFramework)
+ enable
+ $(LangVersion)
+ $(BotSharpVersion)
+ $(GeneratePackageOnBuild)
+ $(GenerateDocumentationFile)
+ $(SolutionDir)packages
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs
new file mode 100644
index 000000000..81c7de270
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs
@@ -0,0 +1,73 @@
+using Microsoft.Extensions.ObjectPool;
+using RabbitMQ.Client;
+
+namespace BotSharp.Plugin.RabbitMQ.Connections;
+
+public class RabbitMQChannelPool
+{
+ private readonly ObjectPool _pool;
+ private readonly ILogger _logger;
+ private readonly int _tryLimit = 3;
+
+ public RabbitMQChannelPool(
+ IServiceProvider services,
+ IRabbitMQConnection mqConnection)
+ {
+ _logger = services.GetRequiredService().CreateLogger();
+ var poolProvider = new DefaultObjectPoolProvider();
+ var policy = new ChannelPoolPolicy(mqConnection.Connection);
+ _pool = poolProvider.Create(policy);
+ }
+
+ public IChannel Get()
+ {
+ var count = 0;
+ var channel = _pool.Get();
+
+ while (count < _tryLimit && channel.IsClosed)
+ {
+ channel.Dispose();
+ channel = _pool.Get();
+ count++;
+ }
+
+ if (channel.IsClosed)
+ {
+ _logger.LogWarning($"No open channel from the pool after {_tryLimit} retries.");
+ }
+
+ return channel;
+ }
+
+ public void Return(IChannel channel)
+ {
+ if (channel.IsOpen)
+ {
+ _pool.Return(channel);
+ }
+ else
+ {
+ channel.Dispose();
+ }
+ }
+}
+
+internal class ChannelPoolPolicy : IPooledObjectPolicy
+{
+ private readonly IConnection _connection;
+
+ public ChannelPoolPolicy(IConnection connection)
+ {
+ _connection = connection;
+ }
+
+ public IChannel Create()
+ {
+ return _connection.CreateChannelAsync().ConfigureAwait(false).GetAwaiter().GetResult();
+ }
+
+ public bool Return(IChannel obj)
+ {
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs
new file mode 100644
index 000000000..989c0a7b7
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs
@@ -0,0 +1,13 @@
+using System.Collections.Concurrent;
+
+namespace BotSharp.Plugin.RabbitMQ.Connections;
+
+public static class RabbitMQChannelPoolFactory
+{
+ private static readonly ConcurrentDictionary _poolDict = new();
+
+ public static RabbitMQChannelPool GetChannelPool(IServiceProvider services, IRabbitMQConnection rabbitMQConnection)
+ {
+ return _poolDict.GetOrAdd(rabbitMQConnection.Connection.ToString()!, key => new RabbitMQChannelPool(services, rabbitMQConnection));
+ }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs
new file mode 100644
index 000000000..dac9e8c07
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs
@@ -0,0 +1,154 @@
+using Polly;
+using Polly.Retry;
+using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
+using System.Threading;
+
+namespace BotSharp.Plugin.RabbitMQ.Connections;
+
+public class RabbitMQConnection : IRabbitMQConnection
+{
+ private readonly RabbitMQSettings _settings;
+ private readonly IConnectionFactory _connectionFactory;
+ private readonly SemaphoreSlim _lock = new(initialCount: 1, maxCount: 1);
+ private readonly ILogger _logger;
+ private readonly int _retryCount = 5;
+
+ private IConnection _connection;
+ private bool _disposed = false;
+
+ public RabbitMQConnection(
+ RabbitMQSettings settings,
+ ILogger logger)
+ {
+ _settings = settings;
+ _logger = logger;
+ _connectionFactory = new ConnectionFactory
+ {
+ HostName = settings.HostName,
+ Port = settings.Port,
+ UserName = settings.UserName,
+ Password = settings.Password,
+ VirtualHost = settings.VirtualHost,
+ ConsumerDispatchConcurrency = 1,
+ AutomaticRecoveryEnabled = true,
+ HandshakeContinuationTimeout = TimeSpan.FromSeconds(20)
+ };
+ }
+
+ public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed;
+
+ public IConnection Connection => _connection;
+
+ public async Task CreateChannelAsync()
+ {
+ if (!IsConnected)
+ {
+ throw new InvalidOperationException("Rabbit MQ is not connectioned.");
+ }
+ return await _connection.CreateChannelAsync();
+ }
+
+ public async Task ConnectAsync()
+ {
+ await _lock.WaitAsync();
+
+ try
+ {
+ if (IsConnected)
+ {
+ return true;
+ }
+
+ var policy = BuildRetryPolicy();
+ await policy.Execute(async () =>
+ {
+ _connection = await _connectionFactory.CreateConnectionAsync();
+ });
+
+ if (IsConnected)
+ {
+ _connection.ConnectionShutdownAsync += OnConnectionShutdownAsync;
+ _connection.CallbackExceptionAsync += OnCallbackExceptionAsync;
+ _connection.ConnectionBlockedAsync += OnConnectionBlockedAsync;
+ _logger.LogInformation($"Rabbit MQ client connection success. host: {_connection.Endpoint.HostName}, port: {_connection.Endpoint.Port}, localPort:{_connection.LocalPort}");
+ return true;
+ }
+ _logger.LogError("Rabbit MQ client connection error.");
+ return false;
+ }
+ finally
+ {
+ _lock.Release();
+ }
+
+ }
+
+ private RetryPolicy BuildRetryPolicy()
+ {
+ return Policy.Handle().WaitAndRetry(
+ _retryCount,
+ retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
+ (ex, time) =>
+ {
+ _logger.LogError(ex, $"RabbitMQ cannot build connection: after {time.TotalSeconds:n1}s");
+ });
+ }
+
+ private Task OnConnectionShutdownAsync(object sender, ShutdownEventArgs e)
+ {
+ if (_disposed)
+ {
+ return Task.CompletedTask;
+ }
+
+ _logger.LogError($"Rabbit MQ connection is shutdown. {e}.");
+ return Task.CompletedTask;
+ }
+
+ private Task OnCallbackExceptionAsync(object sender, CallbackExceptionEventArgs e)
+ {
+ if (_disposed)
+ {
+ return Task.CompletedTask;
+ }
+
+ _logger.LogError($"Rabbit MQ connection throw exception. Trying to reconnect, {e.Exception}.");
+ return Task.CompletedTask;
+ }
+
+ private Task OnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs e)
+ {
+ if (_disposed)
+ {
+ return Task.CompletedTask;
+ }
+
+ _logger.LogError($"Rabbit MQ connection is shutdown. Trying to reconnect, {e.Reason}.");
+ return Task.CompletedTask;
+ }
+
+
+ public void Dispose()
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ _logger.LogWarning("Start disposing Rabbit MQ connection.");
+
+ try
+ {
+ _connection.Dispose();
+ _disposed = true;
+ _logger.LogWarning("Disposed Rabbit MQ connection.");
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error when disposing Rabbit MQ connection");
+ }
+
+ GC.SuppressFinalize(this);
+ }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs
new file mode 100644
index 000000000..36af0df90
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs
@@ -0,0 +1,24 @@
+namespace BotSharp.Plugin.RabbitMQ.Consumers;
+
+public class DummyMessageConsumer : MQConsumerBase
+{
+ public override object Config => new RabbitMQConsumerConfig
+ {
+ ExchangeName = "my.exchange",
+ QueueName = "dummy.queue",
+ RoutingKey = "my.routing"
+ };
+
+ public DummyMessageConsumer(
+ IServiceProvider services,
+ ILogger logger)
+ : base(services, logger)
+ {
+ }
+
+ public override async Task HandleMessageAsync(string channel, string data)
+ {
+ _logger.LogCritical($"Received delayed dummy message data: {data}");
+ return await Task.FromResult(true);
+ }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs
new file mode 100644
index 000000000..f6040dcd7
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs
@@ -0,0 +1,25 @@
+namespace BotSharp.Plugin.RabbitMQ.Consumers;
+
+public class ScheduledMessageConsumer : MQConsumerBase
+{
+ public override object Config => new RabbitMQConsumerConfig
+ {
+ ExchangeName = "my.exchange",
+ QueueName = "scheduled.queue",
+ RoutingKey = "my.routing"
+ };
+
+ public ScheduledMessageConsumer(
+ IServiceProvider services,
+ ILogger logger)
+ : base(services, logger)
+ {
+ }
+
+ public override async Task HandleMessageAsync(string channel, string data)
+ {
+ _logger.LogCritical($"Received delayed scheduled message data: {data}");
+ return await Task.FromResult(true);
+ }
+}
+
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs
new file mode 100644
index 000000000..802e4fa1b
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs
@@ -0,0 +1,89 @@
+using Microsoft.AspNetCore.Authorization;
+using Microsoft.AspNetCore.Http;
+using Microsoft.AspNetCore.Mvc;
+
+namespace BotSharp.Plugin.RabbitMQ.Controllers;
+
+[Authorize]
+[ApiController]
+public class RabbitMQController : ControllerBase
+{
+ private readonly IServiceProvider _services;
+ private readonly IMQService _mqService;
+ private readonly ILogger _logger;
+
+ public RabbitMQController(
+ IServiceProvider services,
+ IMQService mqService,
+ ILogger logger)
+ {
+ _services = services;
+ _mqService = mqService;
+ _logger = logger;
+ }
+
+ ///
+ /// Publish a scheduled message to be delivered after a delay
+ ///
+ /// The scheduled message request
+ [HttpPost("/message-queue/publish")]
+ public async Task PublishScheduledMessage([FromBody] PublishScheduledMessageRequest request)
+ {
+ if (request == null)
+ {
+ return BadRequest(new PublishMessageResponse { Success = false, Error = "Request body is required." });
+ }
+
+ try
+ {
+ var payload = new ScheduledMessagePayload
+ {
+ Name = request.Name ?? "Hello"
+ };
+
+ var success = await _mqService.PublishAsync(
+ payload,
+ options: new()
+ {
+ TopicName = "my.exchange",
+ RoutingKey = "my.routing",
+ DelayMilliseconds = request.DelayMilliseconds ?? 10000,
+ MessageId = request.MessageId
+ });
+ return Ok(new { Success = success });
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Failed to publish scheduled message");
+ return StatusCode(StatusCodes.Status500InternalServerError,
+ new PublishMessageResponse { Success = false, Error = ex.Message });
+ }
+ }
+
+ ///
+ /// Unsubscribe a consumer
+ ///
+ ///
+ ///
+ [HttpPost("/message-queue/unsubscribe/consumer")]
+ public async Task UnSubscribeConsuer([FromBody] UnsubscribeConsumerRequest request)
+ {
+ if (request == null)
+ {
+ return BadRequest(new { Success = false, Error = "Request body is required." });
+ }
+
+ try
+ {
+ var success = await _mqService.UnsubscribeAsync(request.Name);
+ return Ok(new { Success = success });
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Failed to unsubscribe consumer {request.Name}");
+ return StatusCode(StatusCodes.Status500InternalServerError,
+ new { Success = false, Error = ex.Message });
+ }
+ }
+}
+
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs
new file mode 100644
index 000000000..cb89c2976
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs
@@ -0,0 +1,11 @@
+using RabbitMQ.Client;
+
+namespace BotSharp.Plugin.RabbitMQ.Interfaces;
+
+public interface IRabbitMQConnection : IDisposable
+{
+ bool IsConnected { get; }
+ IConnection Connection { get; }
+ Task CreateChannelAsync();
+ Task ConnectAsync();
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs
new file mode 100644
index 000000000..ad655b795
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs
@@ -0,0 +1,31 @@
+namespace BotSharp.Plugin.RabbitMQ.Models;
+
+///
+/// Request model for publishing a scheduled message
+///
+public class PublishScheduledMessageRequest
+{
+ public string? Name { get; set; }
+
+ public long? DelayMilliseconds { get; set; }
+
+ public string? MessageId { get; set; }
+}
+
+
+///
+/// Response model for publish operations
+///
+public class PublishMessageResponse
+{
+ ///
+ /// Whether the message was successfully published
+ ///
+ public bool Success { get; set; }
+
+ ///
+ /// Error message if publish failed
+ ///
+ public string? Error { get; set; }
+}
+
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs
new file mode 100644
index 000000000..93754d455
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs
@@ -0,0 +1,24 @@
+namespace BotSharp.Plugin.RabbitMQ.Models;
+
+internal class RabbitMQConsumerConfig
+{
+ ///
+ /// The exchange name (topic in some MQ systems).
+ ///
+ internal string ExchangeName { get; set; } = "rabbitmq.exchange";
+
+ ///
+ /// The queue name (subscription in some MQ systems).
+ ///
+ internal string QueueName { get; set; } = "rabbitmq.queue";
+
+ ///
+ /// The routing key (filter in some MQ systems).
+ ///
+ internal string RoutingKey { get; set; } = "rabbitmq.routing";
+
+ ///
+ /// Additional arguments for the consumer configuration.
+ ///
+ internal Dictionary Arguments { get; set; } = new();
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs
new file mode 100644
index 000000000..2180fb2d7
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs
@@ -0,0 +1,9 @@
+namespace BotSharp.Plugin.RabbitMQ.Models;
+
+///
+/// Payload for scheduled/delayed messages
+///
+public class ScheduledMessagePayload
+{
+ public string Name { get; set; }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs
new file mode 100644
index 000000000..509d432b2
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs
@@ -0,0 +1,6 @@
+namespace BotSharp.Plugin.RabbitMQ.Models;
+
+public class UnsubscribeConsumerRequest
+{
+ public string Name { get; set; }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs
new file mode 100644
index 000000000..ff45dfe48
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs
@@ -0,0 +1,56 @@
+using BotSharp.Plugin.RabbitMQ.Services;
+using Microsoft.AspNetCore.Builder;
+using Microsoft.Extensions.Configuration;
+
+namespace BotSharp.Plugin.RabbitMQ;
+
+public class RabbitMQPlugin : IBotSharpAppPlugin
+{
+ public string Id => "3f93407f-3c37-4e25-be28-142a2da9b514";
+ public string Name => "RabbitMQ";
+ public string Description => "Handle AI messages in RabbitMQ.";
+ public string IconUrl => "https://icon-library.com/images/message-queue-icon/message-queue-icon-13.jpg";
+
+ public void RegisterDI(IServiceCollection services, IConfiguration config)
+ {
+ var settings = new RabbitMQSettings();
+ config.Bind("RabbitMQ", settings);
+ services.AddSingleton(settings);
+
+ var mqSettings = new MessageQueueSettings();
+ config.Bind("MessageQueue", mqSettings);
+
+ if (mqSettings.Enabled && mqSettings.Provider.IsEqualTo("RabbitMQ"))
+ {
+ services.AddSingleton();
+ services.AddSingleton();
+ }
+ }
+
+ public void Configure(IApplicationBuilder app)
+ {
+#if DEBUG
+ var sp = app.ApplicationServices;
+ var mqSettings = sp.GetRequiredService();
+
+ if (mqSettings.Enabled && mqSettings.Provider.IsEqualTo("RabbitMQ"))
+ {
+ var mqService = sp.GetRequiredService();
+ var loggerFactory = sp.GetRequiredService();
+
+ // Create and subscribe the consumer using the abstract interface
+ var scheduledConsumer = new ScheduledMessageConsumer(sp, loggerFactory.CreateLogger());
+ mqService.SubscribeAsync(nameof(ScheduledMessageConsumer), scheduledConsumer)
+ .ConfigureAwait(false)
+ .GetAwaiter()
+ .GetResult();
+
+ var dummyConsumer = new DummyMessageConsumer(sp, loggerFactory.CreateLogger());
+ mqService.SubscribeAsync(nameof(DummyMessageConsumer), dummyConsumer)
+ .ConfigureAwait(false)
+ .GetAwaiter()
+ .GetResult();
+ }
+#endif
+ }
+}
\ No newline at end of file
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs
new file mode 100644
index 000000000..0117bad14
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs
@@ -0,0 +1,318 @@
+using Polly;
+using Polly.Retry;
+using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
+using System.Collections.Concurrent;
+
+namespace BotSharp.Plugin.RabbitMQ.Services;
+
+public class RabbitMQService : IMQService
+{
+ private readonly IRabbitMQConnection _mqConnection;
+ private readonly IServiceProvider _services;
+ private readonly ILogger _logger;
+
+ private readonly int _retryCount = 5;
+ private bool _disposed = false;
+ private static readonly ConcurrentDictionary _consumers = [];
+
+ public RabbitMQService(
+ IRabbitMQConnection mqConnection,
+ IServiceProvider services,
+ ILogger logger)
+ {
+ _mqConnection = mqConnection;
+ _services = services;
+ _logger = logger;
+ }
+
+ public async Task SubscribeAsync(string key, IMQConsumer consumer)
+ {
+ if (_consumers.ContainsKey(key))
+ {
+ _logger.LogWarning($"Consumer with key '{key}' is already subscribed.");
+ return false;
+ }
+
+ var registration = await CreateConsumerRegistrationAsync(consumer);
+ if (registration != null && _consumers.TryAdd(key, registration))
+ {
+ var config = consumer.Config as RabbitMQConsumerConfig ?? new();
+ _logger.LogInformation($"Consumer '{key}' subscribed to queue '{config.QueueName}'.");
+ return true;
+ }
+
+ return false;
+ }
+
+ public async Task UnsubscribeAsync(string key)
+ {
+ if (!_consumers.TryRemove(key, out var registration))
+ {
+ return false;
+ }
+
+ try
+ {
+ if (registration.Channel != null)
+ {
+ registration.Channel.Dispose();
+ }
+ registration.Consumer.Dispose();
+ _logger.LogInformation($"Consumer '{key}' unsubscribed.");
+ return true;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error unsubscribing consumer '{key}'.");
+ return false;
+ }
+ }
+
+ private async Task CreateConsumerRegistrationAsync(IMQConsumer consumer)
+ {
+ try
+ {
+ var channel = await CreateChannelAsync(consumer);
+
+ var config = consumer.Config as RabbitMQConsumerConfig ?? new();
+ var registration = new ConsumerRegistration(consumer, channel);
+
+ var asyncConsumer = new AsyncEventingBasicConsumer(channel);
+ asyncConsumer.ReceivedAsync += async (sender, eventArgs) =>
+ {
+ await ConsumeEventAsync(registration, eventArgs);
+ };
+
+ await channel.BasicConsumeAsync(
+ queue: config.QueueName,
+ autoAck: false,
+ consumer: asyncConsumer);
+
+ _logger.LogWarning($"RabbitMQ consuming queue '{config.QueueName}'.");
+ return registration;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error when register consumer in RabbitMQ.");
+ return null;
+ }
+ }
+
+ private async Task CreateChannelAsync(IMQConsumer consumer)
+ {
+ if (!_mqConnection.IsConnected)
+ {
+ await _mqConnection.ConnectAsync();
+ }
+
+ var config = consumer.Config as RabbitMQConsumerConfig ?? new();
+ var channel = await _mqConnection.CreateChannelAsync();
+ _logger.LogWarning($"Created RabbitMQ channel {channel.ChannelNumber} for queue '{config.QueueName}'");
+
+ var args = new Dictionary
+ {
+ ["x-delayed-type"] = "direct"
+ };
+
+ if (config.Arguments != null)
+ {
+ foreach (var kvp in config.Arguments)
+ {
+ args[kvp.Key] = kvp.Value;
+ }
+ }
+
+ await channel.ExchangeDeclareAsync(
+ exchange: config.ExchangeName,
+ type: "x-delayed-message",
+ durable: true,
+ autoDelete: false,
+ arguments: args);
+
+ await channel.QueueDeclareAsync(
+ queue: config.QueueName,
+ durable: true,
+ exclusive: false,
+ autoDelete: false);
+
+ await channel.QueueBindAsync(
+ queue: config.QueueName,
+ exchange: config.ExchangeName,
+ routingKey: config.RoutingKey);
+
+ return channel;
+ }
+
+ private async Task ConsumeEventAsync(ConsumerRegistration registration, BasicDeliverEventArgs eventArgs)
+ {
+ var data = string.Empty;
+ var config = registration.Consumer.Config as RabbitMQConsumerConfig ?? new();
+
+ try
+ {
+ data = Encoding.UTF8.GetString(eventArgs.Body.Span);
+ _logger.LogInformation($"Message received on '{config.QueueName}', id: {eventArgs.BasicProperties?.MessageId}, data: {data}");
+
+ var isHandled = await registration.Consumer.HandleMessageAsync(config.QueueName, data);
+ if (registration.Channel?.IsOpen == true)
+ {
+ if (isHandled)
+ {
+ await registration.Channel.BasicAckAsync(eventArgs.DeliveryTag, multiple: false);
+ }
+ else
+ {
+ await registration.Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: false);
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error consuming message on queue '{config.QueueName}': {data}");
+ if (registration.Channel?.IsOpen == true)
+ {
+ await registration.Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: false);
+ }
+ }
+ }
+
+ public async Task PublishAsync(T payload, MQPublishOptions options)
+ {
+ try
+ {
+ if (options == null)
+ {
+ return false;
+ }
+
+ if (!_mqConnection.IsConnected)
+ {
+ await _mqConnection.ConnectAsync();
+ }
+
+ var isPublished = false;
+ var policy = BuildRetryPolicy();
+ await policy.Execute(async () =>
+ {
+ var channelPool = RabbitMQChannelPoolFactory.GetChannelPool(_services, _mqConnection);
+ var channel = channelPool.Get();
+
+ try
+ {
+ var args = new Dictionary
+ {
+ ["x-delayed-type"] = "direct"
+ };
+
+ if (!options.Arguments.IsNullOrEmpty())
+ {
+ foreach (var kvp in options.Arguments)
+ {
+ args[kvp.Key] = kvp.Value;
+ }
+ }
+
+ await channel.ExchangeDeclareAsync(
+ exchange: options.TopicName,
+ type: "x-delayed-message",
+ durable: true,
+ autoDelete: false,
+ arguments: args);
+
+ var messageId = options.MessageId ?? Guid.NewGuid().ToString();
+ var message = new MQMessage(payload, messageId);
+ var body = ConvertToBinary(message, options.JsonOptions);
+ var properties = new BasicProperties
+ {
+ MessageId = messageId,
+ DeliveryMode = DeliveryModes.Persistent,
+ Headers = new Dictionary
+ {
+ ["x-delay"] = options.DelayMilliseconds
+ }
+ };
+
+ await channel.BasicPublishAsync(
+ exchange: options.TopicName,
+ routingKey: options.RoutingKey,
+ mandatory: true,
+ basicProperties: properties,
+ body: body);
+
+ isPublished = true;
+ }
+ catch (Exception)
+ {
+ throw;
+ }
+ finally
+ {
+ channelPool.Return(channel);
+ }
+ });
+
+ return isPublished;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"Error when RabbitMQ publish message.");
+ return false;
+ }
+ }
+
+ private RetryPolicy BuildRetryPolicy()
+ {
+ return Policy.Handle().WaitAndRetry(
+ _retryCount,
+ retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
+ (ex, time) =>
+ {
+ _logger.LogError(ex, $"RabbitMQ publish error: after {time.TotalSeconds:n1}s");
+ });
+ }
+
+ private static byte[] ConvertToBinary(T data, JsonSerializerOptions? jsonOptions = null)
+ {
+ var jsonStr = JsonSerializer.Serialize(data, jsonOptions);
+ var body = Encoding.UTF8.GetBytes(jsonStr);
+ return body;
+ }
+
+ public void Dispose()
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ _logger.LogWarning($"Disposing {nameof(RabbitMQService)}");
+
+ foreach (var item in _consumers)
+ {
+ if (item.Value.Channel != null)
+ {
+ item.Value.Channel.Dispose();
+ }
+ item.Value.Consumer.Dispose();
+ }
+
+ _disposed = true;
+ GC.SuppressFinalize(this);
+ }
+
+ ///
+ /// Internal class to track consumer registrations with their RabbitMQ channels.
+ ///
+ private class ConsumerRegistration
+ {
+ public IMQConsumer Consumer { get; }
+ public IChannel? Channel { get; }
+
+ public ConsumerRegistration(IMQConsumer consumer, IChannel? channel)
+ {
+ Consumer = consumer;
+ Channel = channel;
+ }
+ }
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs
new file mode 100644
index 000000000..0e61b5c71
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs
@@ -0,0 +1,10 @@
+namespace BotSharp.Plugin.RabbitMQ.Settings;
+
+public class RabbitMQSettings
+{
+ public string HostName { get; set; } = "localhost";
+ public int Port { get; set; } = 5672;
+ public string UserName { get; set; } = "guest";
+ public string Password { get; set; } = "guest";
+ public string VirtualHost { get; set; } = "/";
+}
diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs
new file mode 100644
index 000000000..0a8a8c3a5
--- /dev/null
+++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs
@@ -0,0 +1,38 @@
+global using System;
+global using System.Collections.Generic;
+global using System.Text;
+global using System.Linq;
+global using System.Text.Json;
+global using System.Net.Mime;
+global using System.Threading.Tasks;
+global using Microsoft.Extensions.DependencyInjection;
+global using Microsoft.Extensions.Logging;
+global using BotSharp.Abstraction.Agents;
+global using BotSharp.Abstraction.Conversations;
+global using BotSharp.Abstraction.Plugins;
+global using BotSharp.Abstraction.Conversations.Models;
+global using BotSharp.Abstraction.Functions;
+global using BotSharp.Abstraction.Agents.Models;
+global using BotSharp.Abstraction.Agents.Enums;
+global using BotSharp.Abstraction.Files.Enums;
+global using BotSharp.Abstraction.Files.Models;
+global using BotSharp.Abstraction.Files.Converters;
+global using BotSharp.Abstraction.Files;
+global using BotSharp.Abstraction.MLTasks;
+global using BotSharp.Abstraction.Utilities;
+global using BotSharp.Abstraction.Agents.Settings;
+global using BotSharp.Abstraction.Functions.Models;
+global using BotSharp.Abstraction.Repositories;
+global using BotSharp.Abstraction.Settings;
+global using BotSharp.Abstraction.Messaging;
+global using BotSharp.Abstraction.Messaging.Models.RichContent;
+global using BotSharp.Abstraction.Options;
+global using BotSharp.Abstraction.Models;
+global using BotSharp.Abstraction.Infrastructures.MessageQueues;
+global using BotSharp.Abstraction.Infrastructures.MessageQueues.Models;
+
+global using BotSharp.Plugin.RabbitMQ.Settings;
+global using BotSharp.Plugin.RabbitMQ.Models;
+global using BotSharp.Plugin.RabbitMQ.Interfaces;
+global using BotSharp.Plugin.RabbitMQ.Consumers;
+global using BotSharp.Plugin.RabbitMQ.Connections;
\ No newline at end of file
diff --git a/src/WebStarter/WebStarter.csproj b/src/WebStarter/WebStarter.csproj
index 9374d95fd..be332a38e 100644
--- a/src/WebStarter/WebStarter.csproj
+++ b/src/WebStarter/WebStarter.csproj
@@ -42,6 +42,7 @@
+
diff --git a/src/WebStarter/appsettings.json b/src/WebStarter/appsettings.json
index 39587b64e..b3c2b35e9 100644
--- a/src/WebStarter/appsettings.json
+++ b/src/WebStarter/appsettings.json
@@ -1006,6 +1006,7 @@
"Language": "en"
}
},
+
"A2AIntegration": {
"Enabled": true,
"DefaultTimeoutSeconds": 30,
@@ -1018,12 +1019,27 @@
}
]
},
+
+ "MessageQueue": {
+ "Enabled": false,
+ "Provider": "RabbitMQ"
+ },
+
+ "RabbitMQ": {
+ "HostName": "localhost",
+ "Port": 5672,
+ "UserName": "guest",
+ "Password": "guest",
+ "VirtualHost": "/"
+ },
+
"PluginLoader": {
"Assemblies": [
"BotSharp.Core",
"BotSharp.Core.A2A",
"BotSharp.Core.SideCar",
"BotSharp.Core.Crontab",
+ "BotSharp.Core.Rules",
"BotSharp.Core.Realtime",
"BotSharp.Logger",
"BotSharp.Plugin.MongoStorage",
@@ -1061,7 +1077,8 @@
"BotSharp.Plugin.PythonInterpreter",
"BotSharp.Plugin.FuzzySharp",
"BotSharp.Plugin.MMPEmbedding",
- "BotSharp.Plugin.MultiTenancy"
+ "BotSharp.Plugin.MultiTenancy",
+ "BotSharp.Plugin.RabbitMQ"
]
},