diff --git a/BotSharp.sln b/BotSharp.sln index ad95f29e8..6abc5b47b 100644 --- a/BotSharp.sln +++ b/BotSharp.sln @@ -157,6 +157,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Core.A2A", "src\In EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Plugin.MultiTenancy", "src\Plugins\BotSharp.Plugin.MultiTenancy\BotSharp.Plugin.MultiTenancy.csproj", "{562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Plugin.RabbitMQ", "src\Plugins\BotSharp.Plugin.RabbitMQ\BotSharp.Plugin.RabbitMQ.csproj", "{8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -669,6 +671,14 @@ Global {562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}.Release|Any CPU.Build.0 = Release|Any CPU {562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}.Release|x64.ActiveCfg = Release|Any CPU {562DD0C6-DAC8-02CC-C1DD-D43DF186CE76}.Release|x64.Build.0 = Release|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|x64.ActiveCfg = Debug|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Debug|x64.Build.0 = Debug|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|Any CPU.Build.0 = Release|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|x64.ActiveCfg = Release|Any CPU + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5}.Release|x64.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -745,6 +755,7 @@ Global {13223C71-9EAC-9835-28ED-5A4833E6F915} = {53E7CD86-0D19-40D9-A0FA-AB4613837E89} {E8D01281-D52A-BFF4-33DB-E35D91754272} = {E29DC6C4-5E57-48C5-BCB0-6B8F84782749} {562DD0C6-DAC8-02CC-C1DD-D43DF186CE76} = {51AFE054-AE99-497D-A593-69BAEFB5106F} + {8E609A1C-0421-5BB5-DEA9-5FDB68F6D1C5} = {64264688-0F5C-4AB0-8F2B-B59B717CCE00} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {A9969D89-C98B-40A5-A12B-FC87E55B3A19} diff --git a/Directory.Packages.props b/Directory.Packages.props index 1c198a828..96897fb92 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -10,6 +10,7 @@ + diff --git a/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs b/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs index 75c0985a8..2653feec2 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Agents/Models/AgentRule.cs @@ -1,3 +1,5 @@ +using System.Text.Json; + namespace BotSharp.Abstraction.Agents.Models; public class AgentRule @@ -8,6 +10,63 @@ public class AgentRule [JsonPropertyName("disabled")] public bool Disabled { get; set; } - [JsonPropertyName("criteria")] - public string Criteria { get; set; } = string.Empty; + [JsonPropertyName("rule_criteria")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public AgentRuleCriteria? RuleCriteria { get; set; } + + [JsonPropertyName("rule_actions")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public IEnumerable RuleActions { get; set; } = []; +} + +public class AgentRuleCriteria : AgentRuleConfigBase +{ + /// + /// Criteria + /// + [JsonPropertyName("criteria_text")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string CriteriaText { get; set; } = string.Empty; + + /// + /// Adaptive configuration for rule criteria. + /// This flexible JSON document can store any criteria-specific configuration. + /// The structure depends on the criteria executor + /// + [JsonPropertyName("config")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public override JsonDocument? Config { get; set; } } + +public class AgentRuleAction : AgentRuleConfigBase +{ + /// + /// Adaptive configuration for rule actions. + /// This flexible JSON document can store any action-specific configuration. + /// The structure depends on the action type: + /// - For "Http" action: contains http_context with base_url, relative_url, method, etc. + /// - For "MessageQueue" action: contains mq_config with topic_name, routing_key, etc. + /// - For custom actions: can contain any custom configuration structure + /// + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public override JsonDocument? Config { get; set; } + + /// + /// Skipping the number of actions using liquid template, starting from the action itself. + /// + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? SkippingExpression { get; set; } +} + +public class AgentRuleConfigBase +{ + public virtual string Name { get; set; } + + public virtual bool Disabled { get; set; } + + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public virtual JsonDocument? Config { get; set; } + + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public virtual string? JsonConfig { get; set; } +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs b/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs index e2ec38a5a..8442eae45 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Coding/Contexts/CodeExecutionContext.cs @@ -4,4 +4,5 @@ public class CodeExecutionContext { public AgentCodeScript CodeScript { get; set; } public List Arguments { get; set; } = []; + public string? InvokeFrom { get; set; } } diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs new file mode 100644 index 000000000..4df43dd0e --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQConsumer.cs @@ -0,0 +1,21 @@ +namespace BotSharp.Abstraction.Infrastructures.MessageQueues; + +/// +/// Abstract interface for message queue consumers. +/// Implement this interface to create consumers that are independent of MQ products (e.g., RabbitMQ, Kafka, Azure Service Bus). +/// +public interface IMQConsumer : IDisposable +{ + /// + /// Gets the consumer config + /// + object Config { get; } + + /// + /// Handles the received message from the queue. + /// + /// The consumer channel identifier + /// The message data as string + /// True if the message was handled successfully, false otherwise + Task HandleMessageAsync(string channel, string data); +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs new file mode 100644 index 000000000..672e539c1 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/IMQService.cs @@ -0,0 +1,31 @@ +using BotSharp.Abstraction.Infrastructures.MessageQueues.Models; + +namespace BotSharp.Abstraction.Infrastructures.MessageQueues; + +public interface IMQService : IDisposable +{ + /// + /// Subscribe a consumer to the message queue. + /// The consumer will be initialized with the appropriate MQ-specific infrastructure. + /// + /// Unique identifier for the consumer + /// The consumer implementing IMQConsumer interface + /// Task representing the async subscription operation + Task SubscribeAsync(string key, IMQConsumer consumer); + + /// + /// Unsubscribe a consumer from the message queue. + /// + /// Unique identifier for the consumer + /// Task representing the async unsubscription operation + Task UnsubscribeAsync(string key); + + /// + /// Publish payload to message queue + /// + /// + /// + /// + /// + Task PublishAsync(T payload, MQPublishOptions options); +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs new file mode 100644 index 000000000..cd66be1fd --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MQConsumerBase.cs @@ -0,0 +1,51 @@ +using Microsoft.Extensions.Logging; + +namespace BotSharp.Abstraction.Infrastructures.MessageQueues; + +/// +/// Abstract base class for RabbitMQ consumers. +/// Implements IMQConsumer to allow other projects to define consumers independently of RabbitMQ. +/// The RabbitMQ-specific infrastructure is handled by RabbitMQService. +/// +public abstract class MQConsumerBase : IMQConsumer +{ + protected readonly IServiceProvider _services; + protected readonly ILogger _logger; + private bool _disposed = false; + + /// + /// Gets the consumer config for this consumer. + /// Override this property to customize exchange, queue and routing configuration. + /// + public abstract object Config { get; } + + protected MQConsumerBase( + IServiceProvider services, + ILogger logger) + { + _services = services; + _logger = logger; + } + + /// + /// Handles the received message from the queue. + /// + /// The consumer channel identifier + /// The message data as string + /// True if the message was handled successfully, false otherwise + public abstract Task HandleMessageAsync(string channel, string data); + + public void Dispose() + { + if (_disposed) + { + return; + } + + var consumerName = GetType().Name; + _logger.LogWarning($"Disposing consumer: {consumerName}"); + _disposed = true; + GC.SuppressFinalize(this); + } +} + diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs new file mode 100644 index 000000000..b08a5a054 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/MessageQueueSettings.cs @@ -0,0 +1,7 @@ +namespace BotSharp.Abstraction.Infrastructures.MessageQueues; + +public class MessageQueueSettings +{ + public bool Enabled { get; set; } + public string Provider { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs new file mode 100644 index 000000000..e940aff01 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQMessage.cs @@ -0,0 +1,14 @@ +namespace BotSharp.Abstraction.Infrastructures.MessageQueues.Models; + +public class MQMessage +{ + public MQMessage(T payload, string messageId) + { + Payload = payload; + MessageId = messageId; + } + + public T Payload { get; set; } + public string MessageId { get; set; } + public DateTime CreateDate { get; set; } = DateTime.UtcNow; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs new file mode 100644 index 000000000..dead523be --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Infrastructures/MessageQueues/Models/MQPublishOptions.cs @@ -0,0 +1,40 @@ +using System.Text.Json; + +namespace BotSharp.Abstraction.Infrastructures.MessageQueues.Models; + +/// +/// Configuration options for publishing messages to a message queue. +/// These options are MQ-product agnostic and can be adapted by different implementations. +/// +public class MQPublishOptions +{ + /// + /// The topic name (exchange in RabbitMQ, topic in Kafka/Azure Service Bus). + /// + public string TopicName { get; set; } = string.Empty; + + /// + /// The routing key (partition key in some MQ systems, used for message routing). + /// + public string RoutingKey { get; set; } = string.Empty; + + /// + /// Delay in milliseconds before the message is delivered. + /// + public long DelayMilliseconds { get; set; } + + /// + /// Optional unique identifier for the message. + /// + public string? MessageId { get; set; } + + /// + /// Additional arguments for the publish configuration (MQ-specific). + /// + public Dictionary Arguments { get; set; } = []; + + /// + /// Json serializer options + /// + public JsonSerializerOptions? JsonOptions { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs b/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs index 54fc80a90..a39e4bf05 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Instructs/IInstructHook.cs @@ -12,5 +12,5 @@ public interface IInstructHook : IHookBase Task OnResponseGenerated(InstructResponseModel response) => Task.CompletedTask; Task BeforeCodeExecution(Agent agent, CodeExecutionContext context) => Task.CompletedTask; - Task AfterCodeExecution(Agent agent, CodeExecutionResponseModel response) => Task.CompletedTask; + Task AfterCodeExecution(Agent agent, CodeExecutionContext context, CodeExecutionResponseModel response) => Task.CompletedTask; } diff --git a/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs b/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs index f5758434c..81023135f 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Instructs/InstructHookBase.cs @@ -28,7 +28,7 @@ public virtual async Task BeforeCodeExecution(Agent agent, CodeExecutionContext await Task.CompletedTask; } - public virtual async Task AfterCodeExecution(Agent agent, CodeExecutionResponseModel response) + public virtual async Task AfterCodeExecution(Agent agent, CodeExecutionContext context, CodeExecutionResponseModel response) { await Task.CompletedTask; } diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs new file mode 100644 index 000000000..5d5c31fd6 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Hooks/IRuleTriggerHook.cs @@ -0,0 +1,13 @@ +using BotSharp.Abstraction.Hooks; +using BotSharp.Abstraction.Rules.Models; + +namespace BotSharp.Abstraction.Rules.Hooks; + +public interface IRuleTriggerHook : IHookBase +{ + Task BeforeRuleCriteriaExecuted(Agent agent, AgentRuleCriteria ruleCriteria, IRuleTrigger trigger, RuleCriteriaContext context) => Task.CompletedTask; + Task AfterRuleCriteriaExecuted(Agent agent, AgentRuleCriteria ruleCriteria, IRuleTrigger trigger, RuleCriteriaResult result) => Task.CompletedTask; + + Task BeforeRuleActionExecuted(Agent agent, AgentRuleAction ruleAction, IRuleTrigger trigger, RuleActionContext context) => Task.CompletedTask; + Task AfterRuleActionExecuted(Agent agent, AgentRuleAction ruleAction, IRuleTrigger trigger, RuleActionResult result) => Task.CompletedTask; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs index bebac5d6f..ee2d43e68 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleAction.cs @@ -1,5 +1,32 @@ +using BotSharp.Abstraction.Rules.Models; +using System.Text.Json; + namespace BotSharp.Abstraction.Rules; +/// +/// Base interface for rule actions that can be executed by the RuleEngine +/// public interface IRuleAction { -} + /// + /// The unique name of the rule action provider + /// + string Name { get; } + + /// + /// The default config json format. + /// + JsonDocument DefaultConfig => JsonDocument.Parse("{}"); + + /// + /// Execute the rule action + /// + /// The agent that triggered the rule + /// The rule trigger + /// The action context + /// The action execution result + Task ExecuteAsync( + Agent agent, + IRuleTrigger trigger, + RuleActionContext context); +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs index bc5022911..30658e582 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleCriteria.cs @@ -1,5 +1,14 @@ +using BotSharp.Abstraction.Rules.Models; +using System.Text.Json; + namespace BotSharp.Abstraction.Rules; public interface IRuleCriteria { + string Provider { get; } + + JsonDocument DefaultConfig => JsonDocument.Parse("{}"); + + Task ValidateAsync(Agent agent, IRuleTrigger trigger, RuleCriteriaContext context) + => Task.FromResult(new RuleCriteriaResult()); } diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs index c7a6d847b..ff539e87f 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/IRuleEngine.cs @@ -13,4 +13,14 @@ public interface IRuleEngine /// Task> Triggered(IRuleTrigger trigger, string text, IEnumerable? states = null, RuleTriggerOptions? options = null) => throw new NotImplementedException(); + + /// + /// Execute rule actions + /// + /// + /// + /// + /// + Task ExecuteActions(IRuleTrigger trigger, IEnumerable actions, RuleExecutionActionOptions options) + => Task.FromResult(false); } diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionContext.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionContext.cs new file mode 100644 index 000000000..aecbf2eed --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionContext.cs @@ -0,0 +1,12 @@ +using System.Text.Json; + +namespace BotSharp.Abstraction.Rules.Models; + +public class RuleActionContext +{ + public string Text { get; set; } = string.Empty; + public Dictionary Parameters { get; set; } = []; + public IEnumerable PrevStepResults { get; set; } = []; + public IEnumerable NextActions { get; set; } = []; + public JsonSerializerOptions? JsonOptions { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionResult.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionResult.cs new file mode 100644 index 000000000..453d75a4c --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleActionResult.cs @@ -0,0 +1,55 @@ +namespace BotSharp.Abstraction.Rules.Models; + +/// +/// Result of a rule action execution +/// +public class RuleActionResult +{ + /// + /// Whether the action executed successfully + /// + public bool Success { get; set; } + + /// + /// Response content from the action + /// + public string? Response { get; set; } + + /// + /// Result data + /// + public Dictionary Data { get; set; } = []; + + /// + /// Error message if the action failed + /// + public string? ErrorMessage { get; set; } + + /// + /// Whether the action is delayed + /// + public bool IsDelayed { get; set; } + + public static RuleActionResult Succeeded(string? response = null) + { + return new RuleActionResult + { + Success = true, + Response = response + }; + } + + public static RuleActionResult Failed(string errorMessage) + { + return new RuleActionResult + { + Success = false, + ErrorMessage = errorMessage + }; + } +} + +public class RuleActionStepResult : RuleActionResult +{ + public AgentRuleAction RuleAction { get; set; } +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaContext.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaContext.cs new file mode 100644 index 000000000..7fff33344 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaContext.cs @@ -0,0 +1,10 @@ +using System.Text.Json; + +namespace BotSharp.Abstraction.Rules.Models; + +public class RuleCriteriaContext +{ + public string Text { get; set; } = string.Empty; + public Dictionary Parameters { get; set; } = []; + public JsonSerializerOptions? JsonOptions { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaResult.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaResult.cs new file mode 100644 index 000000000..fc1df6ceb --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Models/RuleCriteriaResult.cs @@ -0,0 +1,19 @@ +namespace BotSharp.Abstraction.Rules.Models; + +public class RuleCriteriaResult +{ + /// + /// Whether the criteria executed successfully + /// + public bool Success { get; set; } + + /// + /// Response content from the action + /// + public bool IsValid { get; set; } + + /// + /// Error message if the criteria failed + /// + public string? ErrorMessage { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleCriteriaOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleCriteriaOptions.cs new file mode 100644 index 000000000..27b5167f4 --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleCriteriaOptions.cs @@ -0,0 +1,39 @@ +using System.Text.Json; + +namespace BotSharp.Abstraction.Rules.Options; + +public class RuleCriteriaOptions : CriteriaExecuteOptions +{ + /// + /// Criteria execution provider + /// + public string Provider { get; set; } = "botsharp-rule"; +} + +public class CriteriaExecuteOptions +{ + /// + /// Code processor provider + /// + public string? CodeProcessor { get; set; } + + /// + /// Code script name + /// + public string? CodeScriptName { get; set; } + + /// + /// Argument name as an input key to the code script + /// + public string? ArgumentName { get; set; } + + /// + /// Json arguments as an input value to the code script + /// + public JsonDocument? ArgumentContent { get; set; } + + /// + /// Custom parameters + /// + public Dictionary Parameters { get; set; } = []; +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleExecutionActionOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleExecutionActionOptions.cs new file mode 100644 index 000000000..48787e06e --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleExecutionActionOptions.cs @@ -0,0 +1,8 @@ +namespace BotSharp.Abstraction.Rules.Options; + +public class RuleExecutionActionOptions +{ + public string AgentId { get; set; } + public string Text { get; set; } + public IEnumerable States { get; set; } = []; +} diff --git a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs index 068052b0b..abba98115 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Rules/Options/RuleTriggerOptions.cs @@ -1,3 +1,4 @@ +using BotSharp.Abstraction.Repositories.Filters; using System.Text.Json; namespace BotSharp.Abstraction.Rules.Options; @@ -5,22 +6,12 @@ namespace BotSharp.Abstraction.Rules.Options; public class RuleTriggerOptions { /// - /// Code processor provider + /// Filter agents /// - public string? CodeProcessor { get; set; } + public AgentFilter? AgentFilter { get; set; } /// - /// Code script name + /// Json serializer options /// - public string? CodeScriptName { get; set; } - - /// - /// Argument name as an input key to the code script - /// - public string? ArgumentName { get; set; } - - /// - /// Json arguments as an input value to the code script - /// - public JsonDocument? ArgumentContent { get; set; } + public JsonSerializerOptions? JsonOptions { get; set; } } diff --git a/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs b/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs index a36516c8a..c0f858fd5 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Utilities/ObjectExtensions.cs @@ -65,4 +65,70 @@ public static class ObjectExtensions return null; } } + + public static T? TryGetValueOrDefault(this IDictionary dict, string key, T? defaultValue = default, JsonSerializerOptions? jsonOptions = null) + { + return dict.TryGetValue(key, out var value, jsonOptions) + ? value! + : defaultValue; + } + + public static bool TryGetValue(this IDictionary dict, string key, out T? result, JsonSerializerOptions? jsonOptions = null) + { + result = default; + + if (!dict.TryGetValue(key, out var value) || value is null) + { + return false; + } + + if (value is T t) + { + result = t; + return true; + } + + if (value is JsonElement je) + { + try + { + result = je.Deserialize(jsonOptions); + return true; + } + catch + { + return false; + } + } + + return false; + } + + + public static T? TryGetObjectValueOrDefault(this IDictionary dict, string key, T? defaultValue = default, JsonSerializerOptions? jsonOptions = null) where T : class + { + return dict.TryGetObjectValue(key, out var value, jsonOptions) + ? value! + : defaultValue; + } + + public static bool TryGetObjectValue(this IDictionary dict, string key, out T? result, JsonSerializerOptions? jsonOptions = null) where T : class + { + result = default; + + if (!dict.TryGetValue(key, out var value) || value is null) + { + return false; + } + + try + { + result = JsonSerializer.Deserialize(value, jsonOptions); + return true; + } + catch + { + return false; + } + } } diff --git a/src/Infrastructure/BotSharp.Core.Rules/Actions/ChatRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Actions/ChatRuleAction.cs new file mode 100644 index 000000000..eae15d74e --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Actions/ChatRuleAction.cs @@ -0,0 +1,77 @@ +namespace BotSharp.Core.Rules.Actions; + +public sealed class ChatRuleAction : IRuleAction +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + + public ChatRuleAction( + IServiceProvider services, + ILogger logger) + { + _services = services; + _logger = logger; + } + + public string Name => "send_message_to_agent"; + + public async Task ExecuteAsync( + Agent agent, + IRuleTrigger trigger, + RuleActionContext context) + { + using var scope = _services.CreateScope(); + var sp = scope.ServiceProvider; + + try + { + var channel = trigger.Channel; + var convService = sp.GetRequiredService(); + var conv = await convService.NewConversation(new Conversation + { + Channel = channel, + Title = context.Text, + AgentId = agent.Id + }); + + var message = new RoleDialogModel(AgentRole.User, context.Text); + + var allStates = new List + { + new("channel", channel) + }; + + if (!context.Parameters.IsNullOrEmpty()) + { + var states = context.Parameters.Where(x => x.Value != null).Select(x => new MessageState(x.Key, x.Value!)); + allStates.AddRange(states); + } + + await convService.SetConversationId(conv.Id, allStates); + await convService.SendMessage(agent.Id, + message, + null, + msg => Task.CompletedTask); + + await convService.SaveStates(); + + _logger.LogInformation("Chat rule action executed successfully for agent {AgentId}, conversation {ConversationId}", agent.Id, conv.Id); + + return new RuleActionResult + { + Success = true, + Response = conv.Id, + Data = new() + { + ["agent_id"] = agent.Id, + ["conversation_id"] = conv.Id + } + }; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error when sending chat via rule action for agent {AgentId} and trigger {TriggerName}", agent.Id, trigger.Name); + return RuleActionResult.Failed(ex.Message); + } + } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Actions/FunctionCallRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Actions/FunctionCallRuleAction.cs new file mode 100644 index 000000000..7251a24d7 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Actions/FunctionCallRuleAction.cs @@ -0,0 +1,50 @@ +using BotSharp.Abstraction.Functions; + +namespace BotSharp.Core.Rules.Actions; + +public sealed class FunctionCallRuleAction : IRuleAction +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + + public FunctionCallRuleAction( + IServiceProvider services, + ILogger logger) + { + _services = services; + _logger = logger; + } + + public string Name => "function_call"; + + public async Task ExecuteAsync( + Agent agent, + IRuleTrigger trigger, + RuleActionContext context) + { + var funcName = context.Parameters.TryGetValue("function_name", out var fName) ? fName : null; + var func = _services.GetServices().FirstOrDefault(x => x.Name.IsEqualTo(funcName)); + + if (func == null) + { + var errorMsg = $"Unable to find function '{funcName}' when running action {agent.Name}-{trigger.Name}"; + _logger.LogWarning(errorMsg); + return RuleActionResult.Failed(errorMsg); + } + + var funcArg = context.Parameters.TryGetObjectValueOrDefault("function_argument", new()) ?? new(); + await func.Execute(funcArg); + + return new RuleActionResult + { + Success = true, + Response = funcArg?.RichContent?.Message?.Text ?? funcArg?.Content, + Data = new() + { + ["function_name"] = funcName!, + ["function_argument"] = funcArg?.ConvertToString() ?? "{}", + ["function_call_result"] = funcArg?.RichContent?.Message?.Text ?? funcArg?.Content ?? string.Empty + } + }; + } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Actions/HttpRuleAction.cs b/src/Infrastructure/BotSharp.Core.Rules/Actions/HttpRuleAction.cs new file mode 100644 index 000000000..18aa2446a --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Actions/HttpRuleAction.cs @@ -0,0 +1,191 @@ +using System.Net.Mime; +using System.Text.Json; +using System.Web; + +namespace BotSharp.Core.Rules.Actions; + +public sealed class HttpRuleAction : IRuleAction +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + private readonly IHttpClientFactory _httpClientFactory; + + public HttpRuleAction( + IServiceProvider services, + ILogger logger, + IHttpClientFactory httpClientFactory) + { + _services = services; + _logger = logger; + _httpClientFactory = httpClientFactory; + } + + public string Name => "http_request"; + + public JsonDocument DefaultConfig => JsonDocument.Parse(JsonSerializer.Serialize(new + { + http_url = "https://dummy.example.com/api/v1/employees", + http_method = "GET" + })); + + public async Task ExecuteAsync( + Agent agent, + IRuleTrigger trigger, + RuleActionContext context) + { + try + { + var httpMethod = GetHttpMethod(context); + if (httpMethod == null) + { + var errorMsg = $"HTTP method is not supported in agent rule {agent.Name}-{trigger.Name}"; + _logger.LogWarning(errorMsg); + return RuleActionResult.Failed(errorMsg); + } + + // Build the full URL + var fullUrl = BuildUrl(context); + + using var client = _httpClientFactory.CreateClient(); + + // Add headers + AddHttpHeaders(client, context); + + // Create request + var request = new HttpRequestMessage(httpMethod, fullUrl); + + // Add request body if provided + var requestBodyStr = GetHttpRequestBody(context); + if (!string.IsNullOrEmpty(requestBodyStr)) + { + request.Content = new StringContent(requestBodyStr, Encoding.UTF8, MediaTypeNames.Application.Json); + } + + _logger.LogInformation("Executing HTTP rule action for agent {AgentId}, URL: {Url}, Method: {Method}", + agent.Id, fullUrl, httpMethod); + + // Send request + var response = await client.SendAsync(request); + var responseContent = await response.Content.ReadAsStringAsync(); + + if (response.IsSuccessStatusCode) + { + _logger.LogInformation("HTTP rule action executed successfully for agent {AgentId}, Status: {StatusCode}, Response: {Response}", + agent.Id, response.StatusCode, responseContent); + + return new RuleActionResult + { + Success = true, + Response = responseContent, + Data = new() + { + ["http_response"] = responseContent + } + }; + } + else + { + var errorMsg = $"HTTP request failed with status code {response.StatusCode}: {responseContent}"; + _logger.LogWarning(errorMsg); + return RuleActionResult.Failed(errorMsg); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error executing HTTP rule action for agent {AgentId} and trigger {TriggerName}", + agent.Id, trigger.Name); + return RuleActionResult.Failed(ex.Message); + } + } + + private string BuildUrl(RuleActionContext context) + { + var url = context.Parameters.GetValueOrDefault("http_url", string.Empty); + if (string.IsNullOrEmpty(url)) + { + throw new ArgumentNullException("Unable to find http_url in context"); + } + + // Fill in placeholders in url + foreach (var param in context.Parameters) + { + var value = param.Value?.ToString(); + if (string.IsNullOrEmpty(value)) + { + continue; + } + url = url.Replace($"{{{param.Key}}}", value); + } + + // Add query parameters + var queryParams = context.Parameters.TryGetObjectValueOrDefault>("http_query_params"); + if (!queryParams.IsNullOrEmpty()) + { + var builder = new UriBuilder(url); + var query = HttpUtility.ParseQueryString(builder.Query); + + // Add new query params + foreach (var kv in queryParams!.Where(x => x.Value != null)) + { + query[kv.Key] = kv.Value!; + } + + // Assign merged query back + builder.Query = query.ToString(); + url = builder.ToString(); + } + + _logger.LogInformation("HTTP url after filling: {Url}", url); + return url; + } + + private HttpMethod? GetHttpMethod(RuleActionContext context) + { + var method = context.Parameters.GetValueOrDefault("http_method", string.Empty); + var innerMethod = method?.Trim()?.ToUpper(); + HttpMethod? matchMethod = null; + + switch (innerMethod) + { + case "GET": + matchMethod = HttpMethod.Get; + break; + case "POST": + matchMethod = HttpMethod.Post; + break; + case "DELETE": + matchMethod = HttpMethod.Delete; + break; + case "PUT": + matchMethod = HttpMethod.Put; + break; + case "PATCH": + matchMethod = HttpMethod.Patch; + break; + default: + break; + + } + + return matchMethod; + } + + private void AddHttpHeaders(HttpClient client, RuleActionContext context) + { + var headerParams = context.Parameters.TryGetObjectValueOrDefault>("http_request_headers"); + if (!headerParams.IsNullOrEmpty()) + { + foreach (var header in headerParams!) + { + client.DefaultRequestHeaders.TryAddWithoutValidation(header.Key, header.Value); + } + } + } + + private string? GetHttpRequestBody(RuleActionContext context) + { + var body = context.Parameters.GetValueOrDefault("http_request_body"); + return body; + } +} + diff --git a/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs b/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs new file mode 100644 index 000000000..daf54db77 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Constants/RuleConstant.cs @@ -0,0 +1,6 @@ +namespace BotSharp.Core.Rules.Constants; + +public static class RuleConstant +{ + public const string DEFAULT_CRITERIA_PROVIDER = "code_script"; +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs b/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs new file mode 100644 index 000000000..feb314ef7 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Controllers/RuleController.cs @@ -0,0 +1,42 @@ +using BotSharp.Core.Rules.Models; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; + +namespace BotSharp.Core.Rules.Controllers; + +[Authorize] +[ApiController] +public class RuleController : ControllerBase +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + private readonly IRuleEngine _ruleEngine; + + public RuleController( + IServiceProvider services, + ILogger logger, + IRuleEngine ruleEngine) + { + _services = services; + _logger = logger; + _ruleEngine = ruleEngine; + } + + [HttpPost("/rule/trigger/action")] + public async Task RunAction([FromBody] RuleTriggerActionRequest request) + { + if (request == null) + { + return BadRequest(new { Success = false, Error = "Request cannnot be empty." }); + } + + var trigger = _services.GetServices().FirstOrDefault(x => x.Name.IsEqualTo(request.TriggerName)); + if (trigger == null) + { + return BadRequest(new { Success = false, Error = "Unable to find rule trigger." }); + } + + var result = await _ruleEngine.Triggered(trigger, request.Text, request.States, request.Options); + return Ok(new { Success = true }); + } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Criteria/CodeScriptRuleCriteria.cs b/src/Infrastructure/BotSharp.Core.Rules/Criteria/CodeScriptRuleCriteria.cs new file mode 100644 index 000000000..31536844f --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Criteria/CodeScriptRuleCriteria.cs @@ -0,0 +1,140 @@ +using System.Text.Json; + +namespace BotSharp.Core.Rules.Criteria; + +public class CodeScriptRuleCriteria : IRuleCriteria +{ + private readonly IServiceProvider _services; + private readonly ILogger _logger; + private readonly CodingSettings _codingSettings; + + public CodeScriptRuleCriteria( + IServiceProvider services, + ILogger logger, + CodingSettings codingSettings) + { + _services = services; + _logger = logger; + _codingSettings = codingSettings; + } + + public string Provider => RuleConstant.DEFAULT_CRITERIA_PROVIDER; + + public JsonDocument DefaultConfig => JsonDocument.Parse(JsonSerializer.Serialize(new + { + code_processor = BuiltInCodeProcessor.PyInterpreter, + code_script_name = "{trigger_name}_rule.py", + code_script_arg_name = "trigger_args", + code_script_arg_value = JsonDocument.Parse("{}") + })); + + public async Task ValidateAsync(Agent agent, IRuleTrigger trigger, RuleCriteriaContext context) + { + var result = new RuleCriteriaResult(); + + if (string.IsNullOrWhiteSpace(agent?.Id)) + { + return result; + } + + var provider = context.Parameters.GetValueOrDefault("code_processor", BuiltInCodeProcessor.PyInterpreter); + var processor = _services.GetServices().FirstOrDefault(x => x.Provider.IsEqualTo(provider)); + if (processor == null) + { + _logger.LogWarning($"Unable to find code processor: {provider}."); + return result; + } + + var agentService = _services.GetRequiredService(); + var scriptName = context.Parameters.GetValueOrDefault("code_script_name", $"{trigger.Name}_rule.py"); + var codeScript = await agentService.GetAgentCodeScript(agent.Id, scriptName, scriptType: AgentCodeScriptType.Src); + + var msg = $"rule trigger ({trigger.Name}) code script ({scriptName}) in agent ({agent.Name})."; + + if (codeScript == null || string.IsNullOrWhiteSpace(codeScript.Content)) + { + _logger.LogWarning($"Unable to find {msg}."); + return result; + } + + try + { + var hooks = _services.GetHooks(agent.Id); + + var argName = context.Parameters.GetValueOrDefault("code_script_arg_name", null); + var argValue = context.Parameters.TryGetValue("code_script_arg_value", out var val) && val != null ? JsonSerializer.Deserialize(val) : (JsonElement?)null; + var arguments = BuildArguments(argName, argValue); + var codeExecutionContext = new CodeExecutionContext + { + CodeScript = codeScript, + Arguments = arguments, + InvokeFrom = nameof(CodeScriptRuleCriteria) + }; + + foreach (var hook in hooks) + { + await hook.BeforeCodeExecution(agent, codeExecutionContext); + } + + codeScript = codeExecutionContext.CodeScript; + var (useLock, useProcess, timeoutSeconds) = CodingUtil.GetCodeExecutionConfig(_codingSettings); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds)); + var response = processor.Run(codeScript.Content, options: new() + { + ScriptName = scriptName, + Arguments = codeExecutionContext.Arguments, + UseLock = useLock, + UseProcess = useProcess + }, cancellationToken: cts.Token); + + var codeResponse = new CodeExecutionResponseModel + { + CodeProcessor = processor.Provider, + CodeScript = codeScript, + Arguments = arguments.DistinctBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value ?? string.Empty), + ExecutionResult = response + }; + + foreach (var hook in hooks) + { + await hook.AfterCodeExecution(agent, codeExecutionContext, codeResponse); + } + + if (response == null || !response.Success) + { + _logger.LogWarning($"Failed to handle {msg}"); + return result; + } + + LogLevel logLevel; + if (response.Result.IsEqualTo("true")) + { + logLevel = LogLevel.Information; + result.Success = true; + result.IsValid = true; + } + else + { + logLevel = LogLevel.Warning; + } + + _logger.Log(logLevel, $"Code script execution result ({response}) from {msg}"); + return result; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when handling {msg}"); + return result; + } + } + + private List BuildArguments(string? name, JsonElement? args) + { + var keyValues = new List(); + if (args != null) + { + keyValues.Add(new KeyValue(name ?? "trigger_args", args.Value.GetRawText())); + } + return keyValues; + } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/DemoRuleTrigger.cs b/src/Infrastructure/BotSharp.Core.Rules/DemoRuleTrigger.cs new file mode 100644 index 000000000..70a9eac22 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/DemoRuleTrigger.cs @@ -0,0 +1,10 @@ +namespace BotSharp.Core.Rules; + +public class DemoRuleTrigger : IRuleTrigger +{ + public string Channel => "crontab"; + public string Name => nameof(DemoRuleTrigger); + + public string EntityType { get; set; } = "DemoType"; + public string EntityId { get; set; } = "DemoId"; +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs b/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs index 5f68b722d..75a13d398 100644 --- a/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs +++ b/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs @@ -1,17 +1,4 @@ -using BotSharp.Abstraction.Agents.Models; -using BotSharp.Abstraction.Coding; -using BotSharp.Abstraction.Coding.Contexts; -using BotSharp.Abstraction.Coding.Enums; -using BotSharp.Abstraction.Coding.Models; -using BotSharp.Abstraction.Coding.Settings; -using BotSharp.Abstraction.Coding.Utils; -using BotSharp.Abstraction.Conversations; -using BotSharp.Abstraction.Hooks; -using BotSharp.Abstraction.Models; -using BotSharp.Abstraction.Repositories.Filters; -using BotSharp.Abstraction.Rules.Options; -using BotSharp.Abstraction.Utilities; -using Microsoft.Extensions.Logging; +using BotSharp.Abstraction.Templating; using System.Data; using System.Text.Json; @@ -21,16 +8,13 @@ public class RuleEngine : IRuleEngine { private readonly IServiceProvider _services; private readonly ILogger _logger; - private readonly CodingSettings _codingSettings; public RuleEngine( IServiceProvider services, - ILogger logger, - CodingSettings codingSettings) + ILogger logger) { _services = services; _logger = logger; - _codingSettings = codingSettings; } public async Task> Triggered(IRuleTrigger trigger, string text, IEnumerable? states = null, RuleTriggerOptions? options = null) @@ -39,7 +23,7 @@ public async Task> Triggered(IRuleTrigger trigger, string te // Pull all user defined rules var agentService = _services.GetRequiredService(); - var agents = await agentService.GetAgents(new AgentFilter + var agents = await agentService.GetAgents(options?.AgentFilter ?? new AgentFilter { Pager = new Pagination { @@ -51,154 +35,351 @@ public async Task> Triggered(IRuleTrigger trigger, string te var filteredAgents = agents.Items.Where(x => x.Rules.Exists(r => r.TriggerName.IsEqualTo(trigger.Name) && !x.Disabled)).ToList(); foreach (var agent in filteredAgents) { - // Code trigger - if (options != null) + var rule = agent.Rules.FirstOrDefault(x => x.TriggerName.IsEqualTo(trigger.Name) && !x.Disabled); + if (rule == null) { - var isTriggered = await TriggerCodeScript(agent, trigger.Name, options); - if (!isTriggered) + continue; + } + + // Criteria validation + if (!string.IsNullOrEmpty(rule.RuleCriteria?.Name) && !rule.RuleCriteria.Disabled) + { + var criteriaResult = await ExecuteCriteriaAsync(agent, rule.RuleCriteria, trigger, text, states, options); + if (criteriaResult?.IsValid == false) { + _logger.LogWarning("Criteria validation failed for agent {AgentId} with trigger {TriggerName}", agent.Id, trigger.Name); continue; } } - var convService = _services.GetRequiredService(); - var conv = await convService.NewConversation(new Conversation + // Execute action + var ruleActions = rule.RuleActions?.Where(x => x != null && !string.IsNullOrEmpty(x.Name) && !x.Disabled) ?? []; + if (ruleActions.IsNullOrEmpty()) { - Channel = trigger.Channel, - Title = text, - AgentId = agent.Id - }); - - var message = new RoleDialogModel(AgentRole.User, text); + continue; + } - var allStates = new List + var actionIdx = 0; + var stepResults = new List(); + while (actionIdx >= 0 && actionIdx < ruleActions.Count()) { - new("channel", trigger.Channel) - }; + var ruleAction = ruleActions.ElementAt(actionIdx); + var dict = BuildContextParameters(ruleAction.Config, states, stepResults); - if (states != null) - { - allStates.AddRange(states); - } + var skipSteps = RenderSkippingExpression(ruleAction.SkippingExpression, dict); + if (skipSteps.HasValue && skipSteps > 0) + { + actionIdx += skipSteps.Value; + continue; + } - await convService.SetConversationId(conv.Id, allStates); + var actionResult = await ExecuteActionAsync(agent, ruleAction, ruleActions.Skip(actionIdx + 1), trigger, text, dict, stepResults, options); + if (actionResult == null) + { + actionIdx++; + continue; + } - await convService.SendMessage(agent.Id, - message, - null, - msg => Task.CompletedTask); + if (!actionResult.Success) + { + break; + } + + stepResults.Add(new() + { + RuleAction = ruleAction, + Success = actionResult.Success, + Response = actionResult.Response, + ErrorMessage = actionResult.ErrorMessage, + Data = actionResult.Data + }); + + if (actionResult?.Success == true + && actionResult.Data.TryGetValue("conversation_id", out var convId) + && convId != null) + { + newConversationIds.Add(convId.ToString()!); + } - await convService.SaveStates(); - newConversationIds.Add(conv.Id); + if (actionResult?.IsDelayed == true) + { + break; + } + + actionIdx++; + } } return newConversationIds; } - #region Private methods - private async Task TriggerCodeScript(Agent agent, string triggerName, RuleTriggerOptions options) + public async Task ExecuteActions(IRuleTrigger trigger, IEnumerable actions, RuleExecutionActionOptions options) { - if (string.IsNullOrWhiteSpace(agent?.Id)) - { - return false; - } + var agentService = _services.GetRequiredService(); + var agent = await agentService.GetAgent(options.AgentId); - var provider = options.CodeProcessor ?? BuiltInCodeProcessor.PyInterpreter; - var processor = _services.GetServices().FirstOrDefault(x => x.Provider.IsEqualTo(provider)); - if (processor == null) + var actionIdx = 0; + var stepResults = new List(); + while (actionIdx >= 0 && actionIdx < actions.Count()) { - _logger.LogWarning($"Unable to find code processor: {provider}."); - return false; - } + var ruleAction = actions.ElementAt(actionIdx); + var dict = BuildContextParameters(ruleAction.Config, options.States, stepResults); - var agentService = _services.GetRequiredService(); - var scriptName = options.CodeScriptName ?? $"{triggerName}_rule.py"; - var codeScript = await agentService.GetAgentCodeScript(agent.Id, scriptName, scriptType: AgentCodeScriptType.Src); + var skipSteps = RenderSkippingExpression(ruleAction.SkippingExpression, dict); + if (skipSteps.HasValue && skipSteps > 0) + { + actionIdx += skipSteps.Value; + continue; + } - var msg = $"rule trigger ({triggerName}) code script ({scriptName}) in agent ({agent.Name}) => args: {options.ArgumentContent?.RootElement.GetRawText()}."; + var actionResult = await ExecuteActionAsync(agent, ruleAction, actions.Skip(actionIdx + 1), trigger, options.Text, dict, stepResults); + if (actionResult == null) + { + actionIdx++; + continue; + } - if (codeScript == null || string.IsNullOrWhiteSpace(codeScript.Content)) - { - _logger.LogWarning($"Unable to find {msg}."); - return false; + if (!actionResult.Success) + { + break; + } + + stepResults.Add(new() + { + RuleAction = ruleAction, + Success = actionResult.Success, + Response = actionResult.Response, + ErrorMessage = actionResult.ErrorMessage, + Data = actionResult.Data + }); + + if (actionResult?.IsDelayed == true) + { + break; + } + + actionIdx++; } + return true; + } + + + #region Criteria + private async Task ExecuteCriteriaAsync( + Agent agent, + AgentRuleCriteria ruleCriteria, + IRuleTrigger trigger, + string text, + IEnumerable? states, + RuleTriggerOptions? triggerOptions) + { + var result = new RuleCriteriaResult(); + try { - var hooks = _services.GetHooks(agent.Id); + var criteria = _services.GetServices() + .FirstOrDefault(x => x.Provider == ruleCriteria.Name); + + if (criteria == null) + { + return result; + } + - var arguments = BuildArguments(options.ArgumentName, options.ArgumentContent); - var context = new CodeExecutionContext + var context = new RuleCriteriaContext { - CodeScript = codeScript, - Arguments = arguments + Text = text, + Parameters = BuildContextParameters(ruleCriteria.Config, states), + JsonOptions = triggerOptions?.JsonOptions }; + _logger.LogInformation("Start execution rule criteria {CriteriaProvider} for agent {AgentId} with trigger {TriggerName}", + criteria.Provider, agent.Id, trigger.Name); + + var hooks = _services.GetHooks(agent.Id); foreach (var hook in hooks) { - await hook.BeforeCodeExecution(agent, context); + await hook.BeforeRuleCriteriaExecuted(agent, ruleCriteria, trigger, context); } - var (useLock, useProcess, timeoutSeconds) = CodingUtil.GetCodeExecutionConfig(_codingSettings); - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds)); - var response = processor.Run(codeScript.Content, options: new() + // Execute criteria + context.Parameters ??= []; + result = await criteria.ValidateAsync(agent, trigger, context); + + foreach (var hook in hooks) + { + await hook.AfterRuleCriteriaExecuted(agent, ruleCriteria, trigger, result); + } + + return result; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error executing rule criteria {CriteriaProvider} for agent {AgentId}", ruleCriteria.Name, agent.Id); + return result; + } + } + #endregion + + + #region Action + private async Task ExecuteActionAsync( + Agent agent, + AgentRuleAction curRuleAction, + IEnumerable nextRuleActions, + IRuleTrigger trigger, + string text, + Dictionary param, + IEnumerable prevStepResults, + RuleTriggerOptions? triggerOptions = null) + { + try + { + // Get all registered rule actions + var actions = _services.GetServices(); + + // Find the matching action + var foundAction = actions.FirstOrDefault(x => x.Name.IsEqualTo(curRuleAction.Name)); + + if (foundAction == null) { - ScriptName = scriptName, - Arguments = arguments, - UseLock = useLock, - UseProcess = useProcess - }, cancellationToken: cts.Token); + var errorMsg = $"No rule action {curRuleAction.Name} is found"; + _logger.LogWarning(errorMsg); + return RuleActionResult.Failed(errorMsg); + } - var codeResponse = new CodeExecutionResponseModel + var context = new RuleActionContext { - CodeProcessor = processor.Provider, - CodeScript = codeScript, - Arguments = arguments.DistinctBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value ?? string.Empty), - ExecutionResult = response + Text = text, + Parameters = param, + PrevStepResults = prevStepResults, + NextActions = nextRuleActions, + JsonOptions = triggerOptions?.JsonOptions }; + _logger.LogInformation("Start execution rule action {ActionName} for agent {AgentId} with trigger {TriggerName}", + foundAction.Name, agent.Id, trigger.Name); + + var hooks = _services.GetHooks(agent.Id); foreach (var hook in hooks) { - await hook.AfterCodeExecution(agent, codeResponse); + await hook.BeforeRuleActionExecuted(agent, curRuleAction, trigger, context); } - if (response == null || !response.Success) + // Execute action + context.Parameters ??= []; + var result = await foundAction.ExecuteAsync(agent, trigger, context); + + foreach (var hook in hooks) { - _logger.LogWarning($"Failed to handle {msg}"); - return false; + await hook.AfterRuleActionExecuted(agent, curRuleAction, trigger, result); } - bool result; - LogLevel logLevel; - if (response.Result.IsEqualTo("true")) + return result; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error executing rule action {ActionName} for agent {AgentId}", curRuleAction.Name, agent.Id); + return RuleActionResult.Failed(ex.Message); + } + } + #endregion + + + #region Private methods + private Dictionary BuildContextParameters(JsonDocument? config, IEnumerable? states, IEnumerable? stepResults = null) + { + var dict = new Dictionary(); + + if (config != null) + { + dict = ConvertToDictionary(config); + } + + if (!states.IsNullOrEmpty()) + { + foreach (var state in states!) { - logLevel = LogLevel.Information; - result = true; + dict[state.Key] = state.Value?.ConvertToString(); } - else + } + + if (!stepResults.IsNullOrEmpty()) + { + foreach (var result in stepResults!) { - logLevel = LogLevel.Warning; - result = false; - } + if (result.Data.IsNullOrEmpty()) continue; - _logger.Log(logLevel, $"Code script execution result ({response}) from {msg}"); - return result; + foreach (var item in result.Data) + { + dict[item.Key] = item.Value; + } + } } - catch (Exception ex) + + return dict; + } + + private static Dictionary ConvertToDictionary(JsonDocument doc) + { + var dict = new Dictionary(); + + foreach (var prop in doc.RootElement.EnumerateObject()) { - _logger.LogError(ex, $"Error when handling {msg}"); - return false; + object? value = prop.Value.ValueKind switch + { + JsonValueKind.String => prop.Value.GetString(), + JsonValueKind.Number when prop.Value.TryGetDecimal(out decimal decimalValue) => decimalValue, + JsonValueKind.Number when prop.Value.TryGetDouble(out double doubleValue) => doubleValue, + JsonValueKind.Number when prop.Value.TryGetInt32(out int intValue) => intValue, + JsonValueKind.Number when prop.Value.TryGetInt64(out long longValue) => longValue, + JsonValueKind.Number when prop.Value.TryGetDateTime(out DateTime dateTimeValue) => dateTimeValue, + JsonValueKind.Number when prop.Value.TryGetDateTimeOffset(out DateTimeOffset dateTimeOffsetValue) => dateTimeOffsetValue, + JsonValueKind.Number when prop.Value.TryGetGuid(out Guid guidValue) => guidValue, + JsonValueKind.Number => prop.Value.GetDouble(), + JsonValueKind.True => true, + JsonValueKind.False => false, + JsonValueKind.Null => null, + JsonValueKind.Undefined => null, + JsonValueKind.Array => prop.Value, + JsonValueKind.Object => prop.Value, + _ => prop.Value + }; + dict[prop.Name] = value?.ConvertToString(); } + + return dict; + #endregion } - private List BuildArguments(string? name, JsonDocument? args) + private int? RenderSkippingExpression(string? expression, Dictionary dict) { - var keyValues = new List(); - if (args != null) + int? steps = null; + + if (string.IsNullOrWhiteSpace(expression)) { - keyValues.Add(new KeyValue(name ?? "trigger_args", args.RootElement.GetRawText())); + return steps; } - return keyValues; + + var render = _services.GetRequiredService(); + var copy = dict != null + ? new Dictionary(dict.Where(x => x.Value != null).ToDictionary(x => x.Key, x => (object)x.Value!)) + : []; + var result = render.Render(expression, new Dictionary + { + { "states", copy } + }); + + if (int.TryParse(result, out var intVal)) + { + steps = intVal; + } + else if (bool.TryParse(result, out var boolVal) && boolVal) + { + steps = 1; + } + + return steps; } - #endregion } diff --git a/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs new file mode 100644 index 000000000..1c5e81d71 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleMessagePayload.cs @@ -0,0 +1,11 @@ +namespace BotSharp.Core.Rules.Models; + +public class RuleMessagePayload +{ + public string AgentId { get; set; } + public string TriggerName { get; set; } + public string Channel { get; set; } + public string Text { get; set; } + public Dictionary States { get; set; } + public DateTime Timestamp { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs new file mode 100644 index 000000000..0abea08b0 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core.Rules/Models/RuleTriggerActionRequest.cs @@ -0,0 +1,18 @@ +using System.Text.Json.Serialization; + +namespace BotSharp.Core.Rules.Models; + +public class RuleTriggerActionRequest +{ + [JsonPropertyName("trigger_name")] + public string TriggerName { get; set; } + + [JsonPropertyName("text")] + public string Text { get; set; } = string.Empty; + + [JsonPropertyName("states")] + public IEnumerable? States { get; set; } + + [JsonPropertyName("options")] + public RuleTriggerOptions? Options { get; set; } +} diff --git a/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs b/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs index 56e1fb8ae..c12ce1f3c 100644 --- a/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs +++ b/src/Infrastructure/BotSharp.Core.Rules/RulesPlugin.cs @@ -1,3 +1,5 @@ +using BotSharp.Core.Rules.Actions; +using BotSharp.Core.Rules.Criteria; using BotSharp.Core.Rules.Engines; namespace BotSharp.Core.Rules; @@ -17,5 +19,15 @@ public class RulesPlugin : IBotSharpPlugin public void RegisterDI(IServiceCollection services, IConfiguration config) { services.AddScoped(); + services.AddScoped(); + + // Register rule actions + services.AddScoped(); + services.AddScoped(); + services.AddScoped(); + +#if DEBUG + services.AddScoped(); +#endif } } diff --git a/src/Infrastructure/BotSharp.Core.Rules/Using.cs b/src/Infrastructure/BotSharp.Core.Rules/Using.cs index a4353c960..2d1dc6844 100644 --- a/src/Infrastructure/BotSharp.Core.Rules/Using.cs +++ b/src/Infrastructure/BotSharp.Core.Rules/Using.cs @@ -1,5 +1,7 @@ global using Microsoft.Extensions.Configuration; global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Logging; +global using System.Text; global using BotSharp.Abstraction.Agents.Enums; global using BotSharp.Abstraction.Plugins; @@ -8,4 +10,24 @@ global using BotSharp.Abstraction.Instructs; global using BotSharp.Abstraction.Instructs.Models; -global using BotSharp.Abstraction.Rules; \ No newline at end of file +global using BotSharp.Abstraction.Agents.Models; +global using BotSharp.Abstraction.Conversations; + +global using BotSharp.Abstraction.Infrastructures.MessageQueues; +global using BotSharp.Abstraction.Infrastructures.MessageQueues.Models; +global using BotSharp.Abstraction.Models; +global using BotSharp.Abstraction.Repositories.Filters; +global using BotSharp.Abstraction.Rules; +global using BotSharp.Abstraction.Rules.Options; +global using BotSharp.Abstraction.Rules.Models; +global using BotSharp.Abstraction.Rules.Hooks; +global using BotSharp.Abstraction.Utilities; +global using BotSharp.Abstraction.Coding; +global using BotSharp.Abstraction.Coding.Contexts; +global using BotSharp.Abstraction.Coding.Enums; +global using BotSharp.Abstraction.Coding.Models; +global using BotSharp.Abstraction.Coding.Utils; +global using BotSharp.Abstraction.Coding.Settings; +global using BotSharp.Abstraction.Hooks; + +global using BotSharp.Core.Rules.Constants; \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs b/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs index f0ec11c23..6995b49ec 100644 --- a/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs +++ b/src/Infrastructure/BotSharp.Core/Instructs/Services/InstructService.Execute.cs @@ -195,7 +195,7 @@ public async Task Execute( foreach (var hook in hooks) { await hook.AfterCompletion(agent, instructResult); - await hook.AfterCodeExecution(agent, codeExecution); + await hook.AfterCodeExecution(agent, context, codeExecution); } return instructResult; diff --git a/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs b/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs new file mode 100644 index 000000000..5c84fcb63 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core/Messaging/MessagingPlugin.cs @@ -0,0 +1,18 @@ +using BotSharp.Abstraction.Infrastructures.MessageQueues; +using Microsoft.Extensions.Configuration; + +namespace BotSharp.Core.Messaging; + +public class MessagingPlugin : IBotSharpPlugin +{ + public string Id => "52a0aa30-4820-42a9-9cae-df0be81bad2b"; + public string Name => "Messaging"; + public string Description => "Provides message queue services."; + + public void RegisterDI(IServiceCollection services, IConfiguration config) + { + var mqSettings = new MessageQueueSettings(); + config.Bind("MessageQueue", mqSettings); + services.AddSingleton(mqSettings); + } +} \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs b/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs index 6ef8c5935..117feaebe 100644 --- a/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs +++ b/src/Infrastructure/BotSharp.Logger/Hooks/InstructionLogHook.cs @@ -1,9 +1,11 @@ +using BotSharp.Abstraction.Coding.Contexts; using BotSharp.Abstraction.Coding.Models; using BotSharp.Abstraction.Instructs.Models; using BotSharp.Abstraction.Instructs.Settings; using BotSharp.Abstraction.Loggers.Models; using BotSharp.Abstraction.Users; using BotSharp.Abstraction.Utilities; +using System; namespace BotSharp.Logger.Hooks; @@ -61,7 +63,7 @@ await db.SaveInstructionLogs(new List await base.OnResponseGenerated(response); } - public override async Task AfterCodeExecution(Agent agent, CodeExecutionResponseModel response) + public override async Task AfterCodeExecution(Agent agent, CodeExecutionContext context, CodeExecutionResponseModel response) { if (response == null || !IsLoggingEnabled(agent?.Id)) { @@ -88,7 +90,7 @@ await db.SaveInstructionLogs(new List } }); - await base.AfterCodeExecution(agent, response); + await base.AfterCodeExecution(agent, context, response); } private bool IsLoggingEnabled(string? agentId) diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs index 52fd719fd..de5b21a14 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs +++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Agent/AgentController.Rule.cs @@ -1,4 +1,3 @@ -using BotSharp.Abstraction.Agents.Models; using BotSharp.Abstraction.Rules; namespace BotSharp.OpenAPI.Controllers; @@ -18,9 +17,23 @@ public IEnumerable GetRuleTriggers() }).OrderBy(x => x.TriggerName); } - [HttpGet("/rule/formalization")] - public async Task GetFormalizedRuleDefinition([FromBody] AgentRule rule) + [HttpGet("/rule/criteria-providers")] + public async Task> GetRuleCriteriaProviders() { - return "{}"; + return _services.GetServices().OrderBy(x => x.Provider).Select(x => new KeyValue + { + Key = x.Provider, + Value = x.DefaultConfig != null ? x.DefaultConfig.RootElement.GetRawText() : "{}" + }); + } + + [HttpGet("/rule/actions")] + public async Task> GetRuleActions() + { + return _services.GetServices().OrderBy(x => x.Name).Select(x => new KeyValue + { + Key = x.Name, + Value = x.DefaultConfig != null ? x.DefaultConfig.RootElement.GetRawText() : "{}" + }); } } diff --git a/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs b/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs index be189898e..8e29bb1bb 100644 --- a/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs +++ b/src/Plugins/BotSharp.Plugin.Graph/GraphDb.cs @@ -84,7 +84,7 @@ private async Task SendRequest(string url, GraphQueryRequest r } catch (Exception ex) { - _logger.LogError(ex, $"Error when fetching Lessen GLM response (Endpoint: {url})."); + _logger.LogError(ex, $"Error when fetching {Provider} Graph db response (Endpoint: {url})."); return result; } } diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs index 4205fdc46..1bd76635a 100644 --- a/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs +++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Models/AgentRuleMongoElement.cs @@ -1,4 +1,5 @@ using BotSharp.Abstraction.Agents.Models; +using System.Text.Json; namespace BotSharp.Plugin.MongoStorage.Models; @@ -7,7 +8,8 @@ public class AgentRuleMongoElement { public string TriggerName { get; set; } = default!; public bool Disabled { get; set; } - public string Criteria { get; set; } = default!; + public AgentRuleCriteriaMongoModel? RuleCriteria { get; set; } + public List RuleActions { get; set; } = []; public static AgentRuleMongoElement ToMongoElement(AgentRule rule) { @@ -15,7 +17,8 @@ public static AgentRuleMongoElement ToMongoElement(AgentRule rule) { TriggerName = rule.TriggerName, Disabled = rule.Disabled, - Criteria = rule.Criteria + RuleCriteria = AgentRuleCriteriaMongoModel.ToMongoModel(rule.RuleCriteria), + RuleActions = rule.RuleActions?.Where(x => x != null).Select(x => AgentRuleActionMongoElement.ToMongoElement(x)!)?.ToList() ?? [] }; } @@ -25,7 +28,92 @@ public static AgentRule ToDomainElement(AgentRuleMongoElement rule) { TriggerName = rule.TriggerName, Disabled = rule.Disabled, - Criteria = rule.Criteria + RuleCriteria = AgentRuleCriteriaMongoModel.ToDomainModel(rule.RuleCriteria), + RuleActions = rule.RuleActions?.Where(x => x != null).Select(x => AgentRuleActionMongoElement.ToDomainElement(x)!)?.ToList() ?? [] }; } } + +[BsonIgnoreExtraElements(Inherited = true)] +public class AgentRuleCriteriaMongoModel : AgentRuleConfigMongoModel +{ + public string CriteriaText { get; set; } + + public static AgentRuleCriteriaMongoModel? ToMongoModel(AgentRuleCriteria? criteria) + { + if (criteria == null) + { + return null; + } + + return new AgentRuleCriteriaMongoModel + { + Name = criteria.Name, + CriteriaText = criteria.CriteriaText, + Disabled = criteria.Disabled, + Config = criteria.Config != null ? BsonDocument.Parse(criteria.Config.RootElement.GetRawText()) : null + }; + } + + public static AgentRuleCriteria? ToDomainModel(AgentRuleCriteriaMongoModel? criteria) + { + if (criteria == null) + { + return null; + } + + return new AgentRuleCriteria + { + Name = criteria.Name, + CriteriaText = criteria.CriteriaText, + Disabled = criteria.Disabled, + Config = criteria.Config != null ? JsonDocument.Parse(criteria.Config.ToJson()) : null + }; + } +} + +[BsonIgnoreExtraElements(Inherited = true)] +public class AgentRuleActionMongoElement : AgentRuleConfigMongoModel +{ + public string? SkippingExpression { get; set; } + + public static AgentRuleActionMongoElement? ToMongoElement(AgentRuleAction? action) + { + if (action == null) + { + return null; + } + + return new AgentRuleActionMongoElement + { + Name = action.Name, + Disabled = action.Disabled, + Config = action.Config != null ? BsonDocument.Parse(action.Config.RootElement.GetRawText()) : null, + SkippingExpression = action.SkippingExpression + }; + } + + public static AgentRuleAction? ToDomainElement(AgentRuleActionMongoElement? action) + { + if (action == null) + { + return null; + } + + return new AgentRuleAction + { + Name = action.Name, + Disabled = action.Disabled, + Config = action.Config != null ? JsonDocument.Parse(action.Config.ToJson()) : null, + SkippingExpression = action.SkippingExpression + }; + } +} + +[BsonIgnoreExtraElements(Inherited = true)] +public class AgentRuleConfigMongoModel +{ + public string Name { get; set; } + public bool Disabled { get; set; } + public BsonDocument? Config { get; set; } +} \ No newline at end of file diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs index 300700a64..83b2f1658 100644 --- a/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs +++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs @@ -634,8 +634,7 @@ public async Task> TruncateConversation(string conversationId, stri continue; } - var values = state.Values.Where(x => x.MessageId != messageId) - .Where(x => x.UpdateTime < refTime) + var values = state.Values.Where(x => x.MessageId != messageId && x.UpdateTime < refTime) .ToList(); if (values.Count == 0) continue; diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj b/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj new file mode 100644 index 000000000..4a8f3ff20 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/BotSharp.Plugin.RabbitMQ.csproj @@ -0,0 +1,22 @@ + + + + $(TargetFramework) + enable + $(LangVersion) + $(BotSharpVersion) + $(GeneratePackageOnBuild) + $(GenerateDocumentationFile) + $(SolutionDir)packages + + + + + + + + + + + + diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs new file mode 100644 index 000000000..81c7de270 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPool.cs @@ -0,0 +1,73 @@ +using Microsoft.Extensions.ObjectPool; +using RabbitMQ.Client; + +namespace BotSharp.Plugin.RabbitMQ.Connections; + +public class RabbitMQChannelPool +{ + private readonly ObjectPool _pool; + private readonly ILogger _logger; + private readonly int _tryLimit = 3; + + public RabbitMQChannelPool( + IServiceProvider services, + IRabbitMQConnection mqConnection) + { + _logger = services.GetRequiredService().CreateLogger(); + var poolProvider = new DefaultObjectPoolProvider(); + var policy = new ChannelPoolPolicy(mqConnection.Connection); + _pool = poolProvider.Create(policy); + } + + public IChannel Get() + { + var count = 0; + var channel = _pool.Get(); + + while (count < _tryLimit && channel.IsClosed) + { + channel.Dispose(); + channel = _pool.Get(); + count++; + } + + if (channel.IsClosed) + { + _logger.LogWarning($"No open channel from the pool after {_tryLimit} retries."); + } + + return channel; + } + + public void Return(IChannel channel) + { + if (channel.IsOpen) + { + _pool.Return(channel); + } + else + { + channel.Dispose(); + } + } +} + +internal class ChannelPoolPolicy : IPooledObjectPolicy +{ + private readonly IConnection _connection; + + public ChannelPoolPolicy(IConnection connection) + { + _connection = connection; + } + + public IChannel Create() + { + return _connection.CreateChannelAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + } + + public bool Return(IChannel obj) + { + return true; + } +} \ No newline at end of file diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs new file mode 100644 index 000000000..989c0a7b7 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQChannelPoolFactory.cs @@ -0,0 +1,13 @@ +using System.Collections.Concurrent; + +namespace BotSharp.Plugin.RabbitMQ.Connections; + +public static class RabbitMQChannelPoolFactory +{ + private static readonly ConcurrentDictionary _poolDict = new(); + + public static RabbitMQChannelPool GetChannelPool(IServiceProvider services, IRabbitMQConnection rabbitMQConnection) + { + return _poolDict.GetOrAdd(rabbitMQConnection.Connection.ToString()!, key => new RabbitMQChannelPool(services, rabbitMQConnection)); + } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs new file mode 100644 index 000000000..dac9e8c07 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Connections/RabbitMQConnection.cs @@ -0,0 +1,154 @@ +using Polly; +using Polly.Retry; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using System.Threading; + +namespace BotSharp.Plugin.RabbitMQ.Connections; + +public class RabbitMQConnection : IRabbitMQConnection +{ + private readonly RabbitMQSettings _settings; + private readonly IConnectionFactory _connectionFactory; + private readonly SemaphoreSlim _lock = new(initialCount: 1, maxCount: 1); + private readonly ILogger _logger; + private readonly int _retryCount = 5; + + private IConnection _connection; + private bool _disposed = false; + + public RabbitMQConnection( + RabbitMQSettings settings, + ILogger logger) + { + _settings = settings; + _logger = logger; + _connectionFactory = new ConnectionFactory + { + HostName = settings.HostName, + Port = settings.Port, + UserName = settings.UserName, + Password = settings.Password, + VirtualHost = settings.VirtualHost, + ConsumerDispatchConcurrency = 1, + AutomaticRecoveryEnabled = true, + HandshakeContinuationTimeout = TimeSpan.FromSeconds(20) + }; + } + + public bool IsConnected => _connection != null && _connection.IsOpen && !_disposed; + + public IConnection Connection => _connection; + + public async Task CreateChannelAsync() + { + if (!IsConnected) + { + throw new InvalidOperationException("Rabbit MQ is not connectioned."); + } + return await _connection.CreateChannelAsync(); + } + + public async Task ConnectAsync() + { + await _lock.WaitAsync(); + + try + { + if (IsConnected) + { + return true; + } + + var policy = BuildRetryPolicy(); + await policy.Execute(async () => + { + _connection = await _connectionFactory.CreateConnectionAsync(); + }); + + if (IsConnected) + { + _connection.ConnectionShutdownAsync += OnConnectionShutdownAsync; + _connection.CallbackExceptionAsync += OnCallbackExceptionAsync; + _connection.ConnectionBlockedAsync += OnConnectionBlockedAsync; + _logger.LogInformation($"Rabbit MQ client connection success. host: {_connection.Endpoint.HostName}, port: {_connection.Endpoint.Port}, localPort:{_connection.LocalPort}"); + return true; + } + _logger.LogError("Rabbit MQ client connection error."); + return false; + } + finally + { + _lock.Release(); + } + + } + + private RetryPolicy BuildRetryPolicy() + { + return Policy.Handle().WaitAndRetry( + _retryCount, + retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), + (ex, time) => + { + _logger.LogError(ex, $"RabbitMQ cannot build connection: after {time.TotalSeconds:n1}s"); + }); + } + + private Task OnConnectionShutdownAsync(object sender, ShutdownEventArgs e) + { + if (_disposed) + { + return Task.CompletedTask; + } + + _logger.LogError($"Rabbit MQ connection is shutdown. {e}."); + return Task.CompletedTask; + } + + private Task OnCallbackExceptionAsync(object sender, CallbackExceptionEventArgs e) + { + if (_disposed) + { + return Task.CompletedTask; + } + + _logger.LogError($"Rabbit MQ connection throw exception. Trying to reconnect, {e.Exception}."); + return Task.CompletedTask; + } + + private Task OnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs e) + { + if (_disposed) + { + return Task.CompletedTask; + } + + _logger.LogError($"Rabbit MQ connection is shutdown. Trying to reconnect, {e.Reason}."); + return Task.CompletedTask; + } + + + public void Dispose() + { + if (_disposed) + { + return; + } + + _logger.LogWarning("Start disposing Rabbit MQ connection."); + + try + { + _connection.Dispose(); + _disposed = true; + _logger.LogWarning("Disposed Rabbit MQ connection."); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when disposing Rabbit MQ connection"); + } + + GC.SuppressFinalize(this); + } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs new file mode 100644 index 000000000..36af0df90 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/DummyMessageConsumer.cs @@ -0,0 +1,24 @@ +namespace BotSharp.Plugin.RabbitMQ.Consumers; + +public class DummyMessageConsumer : MQConsumerBase +{ + public override object Config => new RabbitMQConsumerConfig + { + ExchangeName = "my.exchange", + QueueName = "dummy.queue", + RoutingKey = "my.routing" + }; + + public DummyMessageConsumer( + IServiceProvider services, + ILogger logger) + : base(services, logger) + { + } + + public override async Task HandleMessageAsync(string channel, string data) + { + _logger.LogCritical($"Received delayed dummy message data: {data}"); + return await Task.FromResult(true); + } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs new file mode 100644 index 000000000..f6040dcd7 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Consumers/ScheduledMessageConsumer.cs @@ -0,0 +1,25 @@ +namespace BotSharp.Plugin.RabbitMQ.Consumers; + +public class ScheduledMessageConsumer : MQConsumerBase +{ + public override object Config => new RabbitMQConsumerConfig + { + ExchangeName = "my.exchange", + QueueName = "scheduled.queue", + RoutingKey = "my.routing" + }; + + public ScheduledMessageConsumer( + IServiceProvider services, + ILogger logger) + : base(services, logger) + { + } + + public override async Task HandleMessageAsync(string channel, string data) + { + _logger.LogCritical($"Received delayed scheduled message data: {data}"); + return await Task.FromResult(true); + } +} + diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs new file mode 100644 index 000000000..802e4fa1b --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Controllers/RabbitMQController.cs @@ -0,0 +1,89 @@ +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; + +namespace BotSharp.Plugin.RabbitMQ.Controllers; + +[Authorize] +[ApiController] +public class RabbitMQController : ControllerBase +{ + private readonly IServiceProvider _services; + private readonly IMQService _mqService; + private readonly ILogger _logger; + + public RabbitMQController( + IServiceProvider services, + IMQService mqService, + ILogger logger) + { + _services = services; + _mqService = mqService; + _logger = logger; + } + + /// + /// Publish a scheduled message to be delivered after a delay + /// + /// The scheduled message request + [HttpPost("/message-queue/publish")] + public async Task PublishScheduledMessage([FromBody] PublishScheduledMessageRequest request) + { + if (request == null) + { + return BadRequest(new PublishMessageResponse { Success = false, Error = "Request body is required." }); + } + + try + { + var payload = new ScheduledMessagePayload + { + Name = request.Name ?? "Hello" + }; + + var success = await _mqService.PublishAsync( + payload, + options: new() + { + TopicName = "my.exchange", + RoutingKey = "my.routing", + DelayMilliseconds = request.DelayMilliseconds ?? 10000, + MessageId = request.MessageId + }); + return Ok(new { Success = success }); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to publish scheduled message"); + return StatusCode(StatusCodes.Status500InternalServerError, + new PublishMessageResponse { Success = false, Error = ex.Message }); + } + } + + /// + /// Unsubscribe a consumer + /// + /// + /// + [HttpPost("/message-queue/unsubscribe/consumer")] + public async Task UnSubscribeConsuer([FromBody] UnsubscribeConsumerRequest request) + { + if (request == null) + { + return BadRequest(new { Success = false, Error = "Request body is required." }); + } + + try + { + var success = await _mqService.UnsubscribeAsync(request.Name); + return Ok(new { Success = success }); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Failed to unsubscribe consumer {request.Name}"); + return StatusCode(StatusCodes.Status500InternalServerError, + new { Success = false, Error = ex.Message }); + } + } +} + diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs new file mode 100644 index 000000000..cb89c2976 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Interfaces/IRabbitMQConnection.cs @@ -0,0 +1,11 @@ +using RabbitMQ.Client; + +namespace BotSharp.Plugin.RabbitMQ.Interfaces; + +public interface IRabbitMQConnection : IDisposable +{ + bool IsConnected { get; } + IConnection Connection { get; } + Task CreateChannelAsync(); + Task ConnectAsync(); +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs new file mode 100644 index 000000000..ad655b795 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/PublishDelayedMessageRequest.cs @@ -0,0 +1,31 @@ +namespace BotSharp.Plugin.RabbitMQ.Models; + +/// +/// Request model for publishing a scheduled message +/// +public class PublishScheduledMessageRequest +{ + public string? Name { get; set; } + + public long? DelayMilliseconds { get; set; } + + public string? MessageId { get; set; } +} + + +/// +/// Response model for publish operations +/// +public class PublishMessageResponse +{ + /// + /// Whether the message was successfully published + /// + public bool Success { get; set; } + + /// + /// Error message if publish failed + /// + public string? Error { get; set; } +} + diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs new file mode 100644 index 000000000..93754d455 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/RabbitMQConsumerConfig.cs @@ -0,0 +1,24 @@ +namespace BotSharp.Plugin.RabbitMQ.Models; + +internal class RabbitMQConsumerConfig +{ + /// + /// The exchange name (topic in some MQ systems). + /// + internal string ExchangeName { get; set; } = "rabbitmq.exchange"; + + /// + /// The queue name (subscription in some MQ systems). + /// + internal string QueueName { get; set; } = "rabbitmq.queue"; + + /// + /// The routing key (filter in some MQ systems). + /// + internal string RoutingKey { get; set; } = "rabbitmq.routing"; + + /// + /// Additional arguments for the consumer configuration. + /// + internal Dictionary Arguments { get; set; } = new(); +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs new file mode 100644 index 000000000..2180fb2d7 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/ScheduledMessagePayload.cs @@ -0,0 +1,9 @@ +namespace BotSharp.Plugin.RabbitMQ.Models; + +/// +/// Payload for scheduled/delayed messages +/// +public class ScheduledMessagePayload +{ + public string Name { get; set; } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs new file mode 100644 index 000000000..509d432b2 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Models/UnsubscribeConsumerRequest.cs @@ -0,0 +1,6 @@ +namespace BotSharp.Plugin.RabbitMQ.Models; + +public class UnsubscribeConsumerRequest +{ + public string Name { get; set; } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs new file mode 100644 index 000000000..ff45dfe48 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/RabbitMQPlugin.cs @@ -0,0 +1,56 @@ +using BotSharp.Plugin.RabbitMQ.Services; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.Configuration; + +namespace BotSharp.Plugin.RabbitMQ; + +public class RabbitMQPlugin : IBotSharpAppPlugin +{ + public string Id => "3f93407f-3c37-4e25-be28-142a2da9b514"; + public string Name => "RabbitMQ"; + public string Description => "Handle AI messages in RabbitMQ."; + public string IconUrl => "https://icon-library.com/images/message-queue-icon/message-queue-icon-13.jpg"; + + public void RegisterDI(IServiceCollection services, IConfiguration config) + { + var settings = new RabbitMQSettings(); + config.Bind("RabbitMQ", settings); + services.AddSingleton(settings); + + var mqSettings = new MessageQueueSettings(); + config.Bind("MessageQueue", mqSettings); + + if (mqSettings.Enabled && mqSettings.Provider.IsEqualTo("RabbitMQ")) + { + services.AddSingleton(); + services.AddSingleton(); + } + } + + public void Configure(IApplicationBuilder app) + { +#if DEBUG + var sp = app.ApplicationServices; + var mqSettings = sp.GetRequiredService(); + + if (mqSettings.Enabled && mqSettings.Provider.IsEqualTo("RabbitMQ")) + { + var mqService = sp.GetRequiredService(); + var loggerFactory = sp.GetRequiredService(); + + // Create and subscribe the consumer using the abstract interface + var scheduledConsumer = new ScheduledMessageConsumer(sp, loggerFactory.CreateLogger()); + mqService.SubscribeAsync(nameof(ScheduledMessageConsumer), scheduledConsumer) + .ConfigureAwait(false) + .GetAwaiter() + .GetResult(); + + var dummyConsumer = new DummyMessageConsumer(sp, loggerFactory.CreateLogger()); + mqService.SubscribeAsync(nameof(DummyMessageConsumer), dummyConsumer) + .ConfigureAwait(false) + .GetAwaiter() + .GetResult(); + } +#endif + } +} \ No newline at end of file diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs new file mode 100644 index 000000000..0117bad14 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Services/RabbitMQService.cs @@ -0,0 +1,318 @@ +using Polly; +using Polly.Retry; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using System.Collections.Concurrent; + +namespace BotSharp.Plugin.RabbitMQ.Services; + +public class RabbitMQService : IMQService +{ + private readonly IRabbitMQConnection _mqConnection; + private readonly IServiceProvider _services; + private readonly ILogger _logger; + + private readonly int _retryCount = 5; + private bool _disposed = false; + private static readonly ConcurrentDictionary _consumers = []; + + public RabbitMQService( + IRabbitMQConnection mqConnection, + IServiceProvider services, + ILogger logger) + { + _mqConnection = mqConnection; + _services = services; + _logger = logger; + } + + public async Task SubscribeAsync(string key, IMQConsumer consumer) + { + if (_consumers.ContainsKey(key)) + { + _logger.LogWarning($"Consumer with key '{key}' is already subscribed."); + return false; + } + + var registration = await CreateConsumerRegistrationAsync(consumer); + if (registration != null && _consumers.TryAdd(key, registration)) + { + var config = consumer.Config as RabbitMQConsumerConfig ?? new(); + _logger.LogInformation($"Consumer '{key}' subscribed to queue '{config.QueueName}'."); + return true; + } + + return false; + } + + public async Task UnsubscribeAsync(string key) + { + if (!_consumers.TryRemove(key, out var registration)) + { + return false; + } + + try + { + if (registration.Channel != null) + { + registration.Channel.Dispose(); + } + registration.Consumer.Dispose(); + _logger.LogInformation($"Consumer '{key}' unsubscribed."); + return true; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error unsubscribing consumer '{key}'."); + return false; + } + } + + private async Task CreateConsumerRegistrationAsync(IMQConsumer consumer) + { + try + { + var channel = await CreateChannelAsync(consumer); + + var config = consumer.Config as RabbitMQConsumerConfig ?? new(); + var registration = new ConsumerRegistration(consumer, channel); + + var asyncConsumer = new AsyncEventingBasicConsumer(channel); + asyncConsumer.ReceivedAsync += async (sender, eventArgs) => + { + await ConsumeEventAsync(registration, eventArgs); + }; + + await channel.BasicConsumeAsync( + queue: config.QueueName, + autoAck: false, + consumer: asyncConsumer); + + _logger.LogWarning($"RabbitMQ consuming queue '{config.QueueName}'."); + return registration; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when register consumer in RabbitMQ."); + return null; + } + } + + private async Task CreateChannelAsync(IMQConsumer consumer) + { + if (!_mqConnection.IsConnected) + { + await _mqConnection.ConnectAsync(); + } + + var config = consumer.Config as RabbitMQConsumerConfig ?? new(); + var channel = await _mqConnection.CreateChannelAsync(); + _logger.LogWarning($"Created RabbitMQ channel {channel.ChannelNumber} for queue '{config.QueueName}'"); + + var args = new Dictionary + { + ["x-delayed-type"] = "direct" + }; + + if (config.Arguments != null) + { + foreach (var kvp in config.Arguments) + { + args[kvp.Key] = kvp.Value; + } + } + + await channel.ExchangeDeclareAsync( + exchange: config.ExchangeName, + type: "x-delayed-message", + durable: true, + autoDelete: false, + arguments: args); + + await channel.QueueDeclareAsync( + queue: config.QueueName, + durable: true, + exclusive: false, + autoDelete: false); + + await channel.QueueBindAsync( + queue: config.QueueName, + exchange: config.ExchangeName, + routingKey: config.RoutingKey); + + return channel; + } + + private async Task ConsumeEventAsync(ConsumerRegistration registration, BasicDeliverEventArgs eventArgs) + { + var data = string.Empty; + var config = registration.Consumer.Config as RabbitMQConsumerConfig ?? new(); + + try + { + data = Encoding.UTF8.GetString(eventArgs.Body.Span); + _logger.LogInformation($"Message received on '{config.QueueName}', id: {eventArgs.BasicProperties?.MessageId}, data: {data}"); + + var isHandled = await registration.Consumer.HandleMessageAsync(config.QueueName, data); + if (registration.Channel?.IsOpen == true) + { + if (isHandled) + { + await registration.Channel.BasicAckAsync(eventArgs.DeliveryTag, multiple: false); + } + else + { + await registration.Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: false); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error consuming message on queue '{config.QueueName}': {data}"); + if (registration.Channel?.IsOpen == true) + { + await registration.Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: false); + } + } + } + + public async Task PublishAsync(T payload, MQPublishOptions options) + { + try + { + if (options == null) + { + return false; + } + + if (!_mqConnection.IsConnected) + { + await _mqConnection.ConnectAsync(); + } + + var isPublished = false; + var policy = BuildRetryPolicy(); + await policy.Execute(async () => + { + var channelPool = RabbitMQChannelPoolFactory.GetChannelPool(_services, _mqConnection); + var channel = channelPool.Get(); + + try + { + var args = new Dictionary + { + ["x-delayed-type"] = "direct" + }; + + if (!options.Arguments.IsNullOrEmpty()) + { + foreach (var kvp in options.Arguments) + { + args[kvp.Key] = kvp.Value; + } + } + + await channel.ExchangeDeclareAsync( + exchange: options.TopicName, + type: "x-delayed-message", + durable: true, + autoDelete: false, + arguments: args); + + var messageId = options.MessageId ?? Guid.NewGuid().ToString(); + var message = new MQMessage(payload, messageId); + var body = ConvertToBinary(message, options.JsonOptions); + var properties = new BasicProperties + { + MessageId = messageId, + DeliveryMode = DeliveryModes.Persistent, + Headers = new Dictionary + { + ["x-delay"] = options.DelayMilliseconds + } + }; + + await channel.BasicPublishAsync( + exchange: options.TopicName, + routingKey: options.RoutingKey, + mandatory: true, + basicProperties: properties, + body: body); + + isPublished = true; + } + catch (Exception) + { + throw; + } + finally + { + channelPool.Return(channel); + } + }); + + return isPublished; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when RabbitMQ publish message."); + return false; + } + } + + private RetryPolicy BuildRetryPolicy() + { + return Policy.Handle().WaitAndRetry( + _retryCount, + retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), + (ex, time) => + { + _logger.LogError(ex, $"RabbitMQ publish error: after {time.TotalSeconds:n1}s"); + }); + } + + private static byte[] ConvertToBinary(T data, JsonSerializerOptions? jsonOptions = null) + { + var jsonStr = JsonSerializer.Serialize(data, jsonOptions); + var body = Encoding.UTF8.GetBytes(jsonStr); + return body; + } + + public void Dispose() + { + if (_disposed) + { + return; + } + + _logger.LogWarning($"Disposing {nameof(RabbitMQService)}"); + + foreach (var item in _consumers) + { + if (item.Value.Channel != null) + { + item.Value.Channel.Dispose(); + } + item.Value.Consumer.Dispose(); + } + + _disposed = true; + GC.SuppressFinalize(this); + } + + /// + /// Internal class to track consumer registrations with their RabbitMQ channels. + /// + private class ConsumerRegistration + { + public IMQConsumer Consumer { get; } + public IChannel? Channel { get; } + + public ConsumerRegistration(IMQConsumer consumer, IChannel? channel) + { + Consumer = consumer; + Channel = channel; + } + } +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs new file mode 100644 index 000000000..0e61b5c71 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Settings/RabbitMQSettings.cs @@ -0,0 +1,10 @@ +namespace BotSharp.Plugin.RabbitMQ.Settings; + +public class RabbitMQSettings +{ + public string HostName { get; set; } = "localhost"; + public int Port { get; set; } = 5672; + public string UserName { get; set; } = "guest"; + public string Password { get; set; } = "guest"; + public string VirtualHost { get; set; } = "/"; +} diff --git a/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs b/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs new file mode 100644 index 000000000..0a8a8c3a5 --- /dev/null +++ b/src/Plugins/BotSharp.Plugin.RabbitMQ/Using.cs @@ -0,0 +1,38 @@ +global using System; +global using System.Collections.Generic; +global using System.Text; +global using System.Linq; +global using System.Text.Json; +global using System.Net.Mime; +global using System.Threading.Tasks; +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.Logging; +global using BotSharp.Abstraction.Agents; +global using BotSharp.Abstraction.Conversations; +global using BotSharp.Abstraction.Plugins; +global using BotSharp.Abstraction.Conversations.Models; +global using BotSharp.Abstraction.Functions; +global using BotSharp.Abstraction.Agents.Models; +global using BotSharp.Abstraction.Agents.Enums; +global using BotSharp.Abstraction.Files.Enums; +global using BotSharp.Abstraction.Files.Models; +global using BotSharp.Abstraction.Files.Converters; +global using BotSharp.Abstraction.Files; +global using BotSharp.Abstraction.MLTasks; +global using BotSharp.Abstraction.Utilities; +global using BotSharp.Abstraction.Agents.Settings; +global using BotSharp.Abstraction.Functions.Models; +global using BotSharp.Abstraction.Repositories; +global using BotSharp.Abstraction.Settings; +global using BotSharp.Abstraction.Messaging; +global using BotSharp.Abstraction.Messaging.Models.RichContent; +global using BotSharp.Abstraction.Options; +global using BotSharp.Abstraction.Models; +global using BotSharp.Abstraction.Infrastructures.MessageQueues; +global using BotSharp.Abstraction.Infrastructures.MessageQueues.Models; + +global using BotSharp.Plugin.RabbitMQ.Settings; +global using BotSharp.Plugin.RabbitMQ.Models; +global using BotSharp.Plugin.RabbitMQ.Interfaces; +global using BotSharp.Plugin.RabbitMQ.Consumers; +global using BotSharp.Plugin.RabbitMQ.Connections; \ No newline at end of file diff --git a/src/WebStarter/WebStarter.csproj b/src/WebStarter/WebStarter.csproj index 9374d95fd..be332a38e 100644 --- a/src/WebStarter/WebStarter.csproj +++ b/src/WebStarter/WebStarter.csproj @@ -42,6 +42,7 @@ + diff --git a/src/WebStarter/appsettings.json b/src/WebStarter/appsettings.json index 39587b64e..b3c2b35e9 100644 --- a/src/WebStarter/appsettings.json +++ b/src/WebStarter/appsettings.json @@ -1006,6 +1006,7 @@ "Language": "en" } }, + "A2AIntegration": { "Enabled": true, "DefaultTimeoutSeconds": 30, @@ -1018,12 +1019,27 @@ } ] }, + + "MessageQueue": { + "Enabled": false, + "Provider": "RabbitMQ" + }, + + "RabbitMQ": { + "HostName": "localhost", + "Port": 5672, + "UserName": "guest", + "Password": "guest", + "VirtualHost": "/" + }, + "PluginLoader": { "Assemblies": [ "BotSharp.Core", "BotSharp.Core.A2A", "BotSharp.Core.SideCar", "BotSharp.Core.Crontab", + "BotSharp.Core.Rules", "BotSharp.Core.Realtime", "BotSharp.Logger", "BotSharp.Plugin.MongoStorage", @@ -1061,7 +1077,8 @@ "BotSharp.Plugin.PythonInterpreter", "BotSharp.Plugin.FuzzySharp", "BotSharp.Plugin.MMPEmbedding", - "BotSharp.Plugin.MultiTenancy" + "BotSharp.Plugin.MultiTenancy", + "BotSharp.Plugin.RabbitMQ" ] },