From 0a8b68a0411c0dd8fd5b4d5393690ed1c0e10e2e Mon Sep 17 00:00:00 2001 From: David Steiner Date: Fri, 6 Feb 2026 11:09:09 +0100 Subject: [PATCH 1/7] Fix flaky message verification test by addressing race condition in original sending time --- crates/hotfix/src/message/verification.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/hotfix/src/message/verification.rs b/crates/hotfix/src/message/verification.rs index 0ecc6cf..9e884fd 100644 --- a/crates/hotfix/src/message/verification.rs +++ b/crates/hotfix/src/message/verification.rs @@ -323,9 +323,9 @@ mod tests { fn test_seq_number_too_low_with_poss_dup_flag() { let config = build_test_config(); let mut msg = build_test_message("FIX.4.4", "TARGET", "SENDER", 40); + let sending_time: Timestamp = msg.header().get(fix44::SENDING_TIME).unwrap(); msg.header_mut().set(fix44::POSS_DUP_FLAG, true); - msg.header_mut() - .set(fix44::ORIG_SENDING_TIME, Timestamp::utc_now()); + msg.header_mut().set(fix44::ORIG_SENDING_TIME, sending_time); let result = verify_message(&msg, &config, Some(42)); From a4ec94381c2dc861f8d294d9369b28311670ad17 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Fri, 6 Feb 2026 11:10:17 +0100 Subject: [PATCH 2/7] Remove unnecessary fully qualified paths for Result --- crates/hotfix/src/session.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index e7f5855..a705dd3 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -81,7 +81,7 @@ where config: SessionConfig, application: App, store: Store, - ) -> std::result::Result, SessionCreationError> { + ) -> Result, SessionCreationError> { let schedule_check_timer = sleep(Duration::from_secs(SCHEDULE_CHECK_INTERVAL)); let dictionary = Self::get_data_dictionary(&config)?; @@ -296,7 +296,7 @@ where &self, message: &Message, verify_target_seq_number: bool, - ) -> std::result::Result<(), MessageVerificationError> { + ) -> Result<(), MessageVerificationError> { let expected_seq_number = if verify_target_seq_number { Some(self.store.next_target_seq_number()) } else { From fe14683a10545b61458e0927ec6a531386e678e1 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Fri, 6 Feb 2026 11:49:17 +0100 Subject: [PATCH 3/7] Make application take unparsed messages --- crates/hotfix-message/src/message.rs | 1 + crates/hotfix-message/src/parts/body.rs | 2 +- crates/hotfix-message/src/parts/header.rs | 2 +- crates/hotfix-message/src/parts/trailer.rs | 2 +- crates/hotfix/src/application.rs | 8 +- crates/hotfix/src/initiator.rs | 29 ++---- crates/hotfix/src/message.rs | 4 - crates/hotfix/src/message/heartbeat.rs | 9 +- crates/hotfix/src/message/reject.rs | 32 +++---- crates/hotfix/src/session.rs | 37 +++---- crates/hotfix/src/session/session_ref.rs | 6 +- .../tests/connection_test_cases/helpers.rs | 12 +-- .../session_test_cases/business_tests.rs | 10 +- .../session_test_cases/common/assertions.rs | 2 +- .../common/fakes/fake_application.rs | 13 +-- .../common/fakes/session_spy.rs | 7 +- .../common/test_messages.rs | 96 +++++++++---------- .../tests/session_test_cases/resend_tests.rs | 18 ++-- .../send_confirmation_tests.rs | 2 +- examples/load-testing/src/application.rs | 8 +- examples/load-testing/src/messages.rs | 40 ++++---- examples/simple-new-order/src/application.rs | 19 +--- examples/simple-new-order/src/messages.rs | 14 +-- 23 files changed, 161 insertions(+), 212 deletions(-) diff --git a/crates/hotfix-message/src/message.rs b/crates/hotfix-message/src/message.rs index 254fb45..6fddf40 100644 --- a/crates/hotfix-message/src/message.rs +++ b/crates/hotfix-message/src/message.rs @@ -10,6 +10,7 @@ use crate::parts::{Body, Header, Part, RepeatingGroup, Trailer}; use crate::session_fields::{BEGIN_STRING, BODY_LENGTH, CHECK_SUM, MSG_TYPE}; use hotfix_dictionary::{FieldLocation, IsFieldDefinition}; +#[derive(Clone)] pub struct Message { pub(crate) header: Header, pub(crate) body: Body, diff --git a/crates/hotfix-message/src/parts/body.rs b/crates/hotfix-message/src/parts/body.rs index 9ee58dd..d732880 100644 --- a/crates/hotfix-message/src/parts/body.rs +++ b/crates/hotfix-message/src/parts/body.rs @@ -1,7 +1,7 @@ use crate::field_map::FieldMap; use crate::parts::Part; -#[derive(Default)] +#[derive(Clone, Default)] pub struct Body { pub(crate) fields: FieldMap, } diff --git a/crates/hotfix-message/src/parts/header.rs b/crates/hotfix-message/src/parts/header.rs index 277341c..1c40740 100644 --- a/crates/hotfix-message/src/parts/header.rs +++ b/crates/hotfix-message/src/parts/header.rs @@ -4,7 +4,7 @@ use crate::field_map::FieldMap; use crate::parts::Part; use crate::session_fields; -#[derive(Default)] +#[derive(Clone, Default)] pub struct Header { pub fields: FieldMap, } diff --git a/crates/hotfix-message/src/parts/trailer.rs b/crates/hotfix-message/src/parts/trailer.rs index 01e2fee..3568092 100644 --- a/crates/hotfix-message/src/parts/trailer.rs +++ b/crates/hotfix-message/src/parts/trailer.rs @@ -3,7 +3,7 @@ use crate::parts::Part; use crate::session_fields; use hotfix_dictionary::IsFieldDefinition; -#[derive(Default)] +#[derive(Clone, Default)] pub struct Trailer { pub(crate) fields: FieldMap, } diff --git a/crates/hotfix/src/application.rs b/crates/hotfix/src/application.rs index d57b08f..8539d2e 100644 --- a/crates/hotfix/src/application.rs +++ b/crates/hotfix/src/application.rs @@ -1,14 +1,16 @@ +use hotfix_message::message::Message; + #[async_trait::async_trait] /// The application users of HotFIX can implement to hook into the engine. -pub trait Application: Send + Sync + 'static { +pub trait Application: Send + Sync + 'static { /// Called when a message is sent to the engine to be sent to the counterparty. /// /// This is invoked before the raw message is persisted in the message store. async fn on_outbound_message(&self, msg: &Outbound) -> OutboundDecision; /// Called when a message is received from the counterparty. /// - /// This is invoked after the message is verified and parsed into a typed message. - async fn on_inbound_message(&self, msg: Inbound) -> InboundDecision; + /// This is invoked after the message is verified by the session layer. + async fn on_inbound_message(&self, msg: &Message) -> InboundDecision; /// Called when the session is logged out. async fn on_logout(&mut self, reason: &str); /// Called when the session is logged on. diff --git a/crates/hotfix/src/initiator.rs b/crates/hotfix/src/initiator.rs index b7ef35e..9989168 100644 --- a/crates/hotfix/src/initiator.rs +++ b/crates/hotfix/src/initiator.rs @@ -13,7 +13,7 @@ use tracing::{debug, warn}; use crate::application::Application; use crate::config::SessionConfig; -use crate::message::{InboundMessage, OutboundMessage}; +use crate::message::OutboundMessage; use crate::session::error::{SendError, SendOutcome, SessionCreationError}; use crate::session::{InternalSessionRef, SessionHandle}; use crate::store::MessageStore; @@ -27,9 +27,9 @@ pub struct Initiator { } impl Initiator { - pub async fn start( + pub async fn start( config: SessionConfig, - application: impl Application, + application: impl Application, store: impl MessageStore + 'static, ) -> Result { let session_ref = InternalSessionRef::new(config.clone(), application, store)?; @@ -157,10 +157,10 @@ async fn establish_connection( mod tests { use super::*; use crate::application::{Application, InboundDecision, OutboundDecision}; + use crate::message::generate_message; use crate::message::logon::{Logon, ResetSeqNumConfig}; use crate::message::logout::Logout; use crate::message::parser::Parser; - use crate::message::{InboundMessage, generate_message}; use crate::store::in_memory::InMemoryMessageStore; use hotfix_message::Part; use hotfix_message::message::Message; @@ -180,21 +180,15 @@ mod tests { } } - impl InboundMessage for DummyMessage { - fn parse(_message: &Message) -> Self { - DummyMessage - } - } - // No-op application struct NoOpApp; #[async_trait::async_trait] - impl Application for NoOpApp { + impl Application for NoOpApp { async fn on_outbound_message(&self, _msg: &DummyMessage) -> OutboundDecision { OutboundDecision::Send } - async fn on_inbound_message(&self, _msg: DummyMessage) -> InboundDecision { + async fn on_inbound_message(&self, _msg: &Message) -> InboundDecision { InboundDecision::Accept } async fn on_logout(&mut self, _reason: &str) {} @@ -366,13 +360,10 @@ mod tests { let mut config = create_test_config("127.0.0.1", port); config.reconnect_interval = 1; // Short interval for test - let _initiator = Initiator::::start::( - config, - NoOpApp, - InMemoryMessageStore::default(), - ) - .await - .unwrap(); + let _initiator = + Initiator::::start(config, NoOpApp, InMemoryMessageStore::default()) + .await + .unwrap(); // Accept first connection let (conn1, _) = tokio::time::timeout(Duration::from_secs(2), listener.accept()) diff --git a/crates/hotfix/src/message.rs b/crates/hotfix/src/message.rs index 5d2ba92..2bafbf0 100644 --- a/crates/hotfix/src/message.rs +++ b/crates/hotfix/src/message.rs @@ -25,10 +25,6 @@ pub trait OutboundMessage: Clone + Send + 'static { fn message_type(&self) -> &str; } -pub trait InboundMessage: Clone + Send + 'static { - fn parse(message: &Message) -> Self; -} - pub fn generate_message( begin_string: &str, sender_comp_id: &str, diff --git a/crates/hotfix/src/message/heartbeat.rs b/crates/hotfix/src/message/heartbeat.rs index 9d4c639..56470fe 100644 --- a/crates/hotfix/src/message/heartbeat.rs +++ b/crates/hotfix/src/message/heartbeat.rs @@ -1,4 +1,4 @@ -use crate::message::{InboundMessage, OutboundMessage}; +use crate::message::OutboundMessage; use hotfix_message::Part; use hotfix_message::message::Message; use hotfix_message::session_fields::TEST_REQ_ID; @@ -27,10 +27,3 @@ impl OutboundMessage for Heartbeat { "0" } } - -impl InboundMessage for Heartbeat { - fn parse(_message: &Message) -> Self { - // TODO: this needs to be implemented properly when we're implementing Test Requests - Heartbeat { test_req_id: None } - } -} diff --git a/crates/hotfix/src/message/reject.rs b/crates/hotfix/src/message/reject.rs index bf81ec0..d0c1e54 100644 --- a/crates/hotfix/src/message/reject.rs +++ b/crates/hotfix/src/message/reject.rs @@ -1,4 +1,4 @@ -use crate::message::{InboundMessage, OutboundMessage}; +use crate::message::OutboundMessage; use hotfix_message::Part; use hotfix_message::message::Message; use hotfix_message::session_fields::{ @@ -50,6 +50,20 @@ impl Reject { self.text = Some(text.to_string()); self } + + #[cfg(test)] + fn parse(message: &Message) -> Self { + Self { + #[allow(clippy::expect_used)] + ref_seq_num: message + .get(REF_SEQ_NUM) + .expect("ref_seq_num should be present"), + ref_tag_id: message.get(REF_TAG_ID).ok(), + ref_msg_type: message.get(REF_MSG_TYPE).ok(), + session_reject_reason: message.get(SESSION_REJECT_REASON).ok(), + text: message.get::<&str>(TEXT).ok().map(|s| s.to_string()), + } + } } impl OutboundMessage for Reject { @@ -75,22 +89,6 @@ impl OutboundMessage for Reject { } } -impl InboundMessage for Reject { - fn parse(message: &Message) -> Self { - Self { - // TODO: how do we handle errors in parsing messages? - #[allow(clippy::expect_used)] - ref_seq_num: message - .get(REF_SEQ_NUM) - .expect("ref_seq_num should be present"), - ref_tag_id: message.get(REF_TAG_ID).ok(), - ref_msg_type: message.get(REF_MSG_TYPE).ok(), - session_reject_reason: message.get(SESSION_REJECT_REASON).ok(), - text: message.get::<&str>(TEXT).ok().map(|s| s.to_string()), - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index a705dd3..b55ae3f 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -20,6 +20,7 @@ use crate::Application; use crate::application::{InboundDecision, OutboundDecision}; use crate::config::SessionConfig; use crate::message::OutboundMessage; +use crate::message::generate_message; use crate::message::heartbeat::Heartbeat; use crate::message::logon::{Logon, ResetSeqNumConfig}; use crate::message::logout::Logout; @@ -30,7 +31,6 @@ use crate::message::sequence_reset::SequenceReset; use crate::message::test_request::TestRequest; use crate::message::verification::verify_message; use crate::message::verification_error::{CompIdType, MessageVerificationError}; -use crate::message::{InboundMessage, generate_message}; use crate::message_utils::{is_admin, prepare_message_for_resend}; use crate::session::admin_request::AdminRequest; use crate::session::error::SessionCreationError; @@ -57,7 +57,7 @@ use hotfix_message::session_fields::{ const SCHEDULE_CHECK_INTERVAL: u64 = 1; -struct Session { +struct Session { message_config: MessageConfig, config: SessionConfig, schedule: SessionSchedule, @@ -67,13 +67,12 @@ struct Session { store: S, schedule_check_timer: Pin>, reset_on_next_logon: bool, - _phantom: std::marker::PhantomData (I, O)>, + _phantom: std::marker::PhantomData O>, } -impl Session +impl Session where - App: Application, - Inbound: InboundMessage, + App: Application, Outbound: OutboundMessage, Store: MessageStore, { @@ -81,7 +80,7 @@ where config: SessionConfig, application: App, store: Store, - ) -> Result, SessionCreationError> { + ) -> Result, SessionCreationError> { let schedule_check_timer = sleep(Duration::from_secs(SCHEDULE_CHECK_INTERVAL)); let dictionary = Self::get_data_dictionary(&config)?; @@ -245,9 +244,8 @@ where ) -> Result<(), SessionOperationError> { match self.verify_message(message, true) { Ok(_) => { - let parsed_message = Inbound::parse(message); if matches!( - self.application.on_inbound_message(parsed_message).await, + self.application.on_inbound_message(message).await, InboundDecision::TerminateSession ) { error!("failed to send inbound message to application"); @@ -1131,14 +1129,13 @@ fn get_msg_seq_num(message: &Message) -> u64 { .expect("MsgSeqNum missing from validated message - parser bug") } -async fn run_session( - mut session: Session, +async fn run_session( + mut session: Session, mut event_receiver: mpsc::Receiver, mut outbound_message_receiver: mpsc::Receiver>, mut admin_request_receiver: mpsc::Receiver, ) where - App: Application, - Inbound: InboundMessage, + App: Application, Outbound: OutboundMessage, Store: MessageStore + Send + 'static, { @@ -1196,7 +1193,7 @@ async fn run_session( mod tests { use super::*; use crate::application::{InboundDecision, OutboundDecision}; - use crate::message::{InboundMessage, OutboundMessage}; + use crate::message::OutboundMessage; use crate::store::{Result as StoreResult, StoreError}; use chrono::{DateTime, Datelike, NaiveDate, NaiveTime, TimeDelta, Timelike}; use chrono_tz::Tz; @@ -1293,21 +1290,15 @@ mod tests { } } - impl InboundMessage for DummyMessage { - fn parse(_message: &Message) -> Self { - DummyMessage - } - } - /// Minimal no-op application for testing struct NoOpApp; #[async_trait::async_trait] - impl Application for NoOpApp { + impl Application for NoOpApp { async fn on_outbound_message(&self, _: &DummyMessage) -> OutboundDecision { OutboundDecision::Send } - async fn on_inbound_message(&self, _: DummyMessage) -> InboundDecision { + async fn on_inbound_message(&self, _: &Message) -> InboundDecision { InboundDecision::Accept } async fn on_logout(&mut self, _: &str) {} @@ -1341,7 +1332,7 @@ mod tests { schedule: SessionSchedule, state: SessionState, store: TestStore, - ) -> Session { + ) -> Session { let config = create_test_config(); let message_config = MessageConfig::default(); let dictionary = Dictionary::fix44(); diff --git a/crates/hotfix/src/session/session_ref.rs b/crates/hotfix/src/session/session_ref.rs index 5165d7c..7b0c3c8 100644 --- a/crates/hotfix/src/session/session_ref.rs +++ b/crates/hotfix/src/session/session_ref.rs @@ -3,7 +3,7 @@ use tokio::sync::{mpsc, oneshot}; use tracing::debug; use crate::config::SessionConfig; -use crate::message::{InboundMessage, OutboundMessage, RawFixMessage}; +use crate::message::{OutboundMessage, RawFixMessage}; use crate::session::Session; use crate::session::admin_request::AdminRequest; use crate::session::error::{SendError, SendOutcome, SessionCreationError}; @@ -26,9 +26,9 @@ pub struct InternalSessionRef { } impl InternalSessionRef { - pub fn new( + pub fn new( config: SessionConfig, - application: impl Application, + application: impl Application, store: impl MessageStore + 'static, ) -> Result { let (event_sender, event_receiver) = mpsc::channel::(100); diff --git a/crates/hotfix/tests/connection_test_cases/helpers.rs b/crates/hotfix/tests/connection_test_cases/helpers.rs index 02e6fb5..70be1ce 100644 --- a/crates/hotfix/tests/connection_test_cases/helpers.rs +++ b/crates/hotfix/tests/connection_test_cases/helpers.rs @@ -9,7 +9,7 @@ use std::sync::{Arc, Once}; use hotfix::Application; use hotfix::application::{InboundDecision, OutboundDecision}; -use hotfix::message::{InboundMessage, OutboundMessage}; +use hotfix::message::OutboundMessage; use hotfix_message::message::Message; use rcgen::{CertificateParams, DnType, IsCa, KeyPair, KeyUsagePurpose, SanType}; use rustls::ServerConfig; @@ -337,22 +337,16 @@ impl OutboundMessage for MinimalMessage { } } -impl InboundMessage for MinimalMessage { - fn parse(_message: &Message) -> Self { - MinimalMessage - } -} - /// A minimal Application implementation for testing transport connectivity. pub struct MinimalApplication; #[async_trait::async_trait] -impl Application for MinimalApplication { +impl Application for MinimalApplication { async fn on_outbound_message(&self, _msg: &MinimalMessage) -> OutboundDecision { OutboundDecision::Send } - async fn on_inbound_message(&self, _msg: MinimalMessage) -> InboundDecision { + async fn on_inbound_message(&self, _msg: &Message) -> InboundDecision { InboundDecision::Accept } diff --git a/crates/hotfix/tests/session_test_cases/business_tests.rs b/crates/hotfix/tests/session_test_cases/business_tests.rs index f480199..ba42097 100644 --- a/crates/hotfix/tests/session_test_cases/business_tests.rs +++ b/crates/hotfix/tests/session_test_cases/business_tests.rs @@ -3,8 +3,9 @@ use crate::common::assertions::then; use crate::common::cleanup::finally; use crate::common::setup::given_an_active_session; use crate::common::test_messages::TestMessage; -use hotfix::message::{InboundMessage, OutboundMessage}; -use hotfix_message::{FieldType, fix44::MsgType}; +use hotfix::message::OutboundMessage; +use hotfix_message::fix44::{MSG_TYPE, MsgType}; +use hotfix_message::{FieldType, Part}; #[tokio::test] async fn test_new_order_single() { @@ -25,7 +26,10 @@ async fn test_new_order_single() { .sends_message(TestMessage::dummy_execution_report()) .await; then(&mut session) - .receives(|msg| assert_eq!(msg.message_type(), MsgType::ExecutionReport.to_string())) + .receives(|msg| { + let msg_type: &str = msg.header().get(MSG_TYPE).unwrap(); + assert_eq!(msg_type, MsgType::ExecutionReport.to_string()); + }) .await; finally(&session, &mut counterparty).disconnect().await; diff --git a/crates/hotfix/tests/session_test_cases/common/assertions.rs b/crates/hotfix/tests/session_test_cases/common/assertions.rs index c18bea0..1b54f47 100644 --- a/crates/hotfix/tests/session_test_cases/common/assertions.rs +++ b/crates/hotfix/tests/session_test_cases/common/assertions.rs @@ -24,7 +24,7 @@ impl Then<&mut SessionSpy> { pub async fn receives(self, assertion: F) where - F: FnOnce(&TestMessage), + F: FnOnce(&Message), { self.target .assert_next_with_timeout(assertion, DEFAULT_TIMEOUT) diff --git a/crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs b/crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs index 3685d63..06f8cbc 100644 --- a/crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs +++ b/crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs @@ -1,14 +1,15 @@ use crate::common::test_messages::TestMessage; use hotfix::Application; use hotfix::application::{InboundDecision, OutboundDecision}; +use hotfix_message::message::Message; pub struct FakeApplication { - message_sender: tokio::sync::mpsc::UnboundedSender, + message_sender: tokio::sync::mpsc::UnboundedSender, outbound_decision: OutboundDecision, } impl FakeApplication { - pub fn new(message_sender: tokio::sync::mpsc::UnboundedSender) -> Self { + pub fn new(message_sender: tokio::sync::mpsc::UnboundedSender) -> Self { Self { message_sender, outbound_decision: OutboundDecision::Send, @@ -16,7 +17,7 @@ impl FakeApplication { } pub fn with_outbound_decision( - message_sender: tokio::sync::mpsc::UnboundedSender, + message_sender: tokio::sync::mpsc::UnboundedSender, decision: OutboundDecision, ) -> Self { Self { @@ -27,13 +28,13 @@ impl FakeApplication { } #[async_trait::async_trait] -impl Application for FakeApplication { +impl Application for FakeApplication { async fn on_outbound_message(&self, _msg: &TestMessage) -> OutboundDecision { self.outbound_decision } - async fn on_inbound_message(&self, msg: TestMessage) -> InboundDecision { - self.message_sender.send(msg).unwrap(); + async fn on_inbound_message(&self, msg: &Message) -> InboundDecision { + self.message_sender.send(msg.clone()).unwrap(); InboundDecision::Accept } diff --git a/crates/hotfix/tests/session_test_cases/common/fakes/session_spy.rs b/crates/hotfix/tests/session_test_cases/common/fakes/session_spy.rs index 7025e27..340eb05 100644 --- a/crates/hotfix/tests/session_test_cases/common/fakes/session_spy.rs +++ b/crates/hotfix/tests/session_test_cases/common/fakes/session_spy.rs @@ -1,15 +1,16 @@ use crate::common::test_messages::TestMessage; use hotfix::session::SessionHandle; +use hotfix_message::message::Message; pub struct SessionSpy { session_handle: SessionHandle, - message_receiver: tokio::sync::mpsc::UnboundedReceiver, + message_receiver: tokio::sync::mpsc::UnboundedReceiver, } impl SessionSpy { pub fn new( session_handle: SessionHandle, - message_receiver: tokio::sync::mpsc::UnboundedReceiver, + message_receiver: tokio::sync::mpsc::UnboundedReceiver, ) -> Self { Self { session_handle, @@ -23,7 +24,7 @@ impl SessionSpy { pub async fn assert_next_with_timeout(&mut self, assertion: F, timeout: std::time::Duration) where - F: FnOnce(&TestMessage), + F: FnOnce(&Message), { match tokio::time::timeout(timeout, self.message_receiver.recv()).await { Ok(Some(message)) => { diff --git a/crates/hotfix/tests/session_test_cases/common/test_messages.rs b/crates/hotfix/tests/session_test_cases/common/test_messages.rs index f17e605..fbfa9fd 100644 --- a/crates/hotfix/tests/session_test_cases/common/test_messages.rs +++ b/crates/hotfix/tests/session_test_cases/common/test_messages.rs @@ -1,7 +1,7 @@ use crate::common::setup::{COUNTERPARTY_COMP_ID, OUR_COMP_ID}; use chrono::TimeDelta; use hotfix::Message as HotfixMessage; -use hotfix::message::{InboundMessage, OutboundMessage, generate_message}; +use hotfix::message::{OutboundMessage, generate_message}; use hotfix_message::dict::{FieldLocation, FixDatatype}; use hotfix_message::field_types::Timestamp; use hotfix_message::message::{Config, Message}; @@ -61,6 +61,51 @@ impl TestMessage { price: 100.0, } } + + pub fn parse(msg: &HotfixMessage) -> Self { + let msg_type: &str = msg.header().get(fix44::MSG_TYPE).unwrap(); + match msg_type { + "8" => { + let order_id: &str = msg.get(fix44::ORDER_ID).unwrap(); + let exec_id: &str = msg.get(fix44::EXEC_ID).unwrap(); + let exec_type = msg.get(fix44::EXEC_TYPE).unwrap(); + let ord_status = msg.get(fix44::ORD_STATUS).unwrap(); + let side = msg.get(fix44::SIDE).unwrap(); + let symbol: &str = msg.get(fix44::SYMBOL).unwrap(); + let order_qty = msg.get(fix44::ORDER_QTY).unwrap(); + let price = msg.get(fix44::PRICE).unwrap(); + + Self::ExecutionReport { + order_id: order_id.to_string(), + exec_id: exec_id.to_string(), + exec_type, + ord_status, + side, + symbol: symbol.to_string(), + order_qty, + price, + } + } + "D" => { + let cl_ord_id: &str = msg.get(fix44::CL_ORD_ID).unwrap(); + let side = msg.get(fix44::SIDE).unwrap(); + let symbol: &str = msg.get(fix44::SYMBOL).unwrap(); + let order_qty = msg.get(fix44::ORDER_QTY).unwrap(); + let ord_type = msg.get(fix44::ORD_TYPE).unwrap(); + let price = msg.get(fix44::PRICE).unwrap(); + + Self::NewOrderSingle { + cl_ord_id: cl_ord_id.to_string(), + side, + symbol: symbol.to_string(), + order_qty, + ord_type, + price, + } + } + _ => panic!("Invalid message type: {msg_type}"), + } + } } impl OutboundMessage for TestMessage { @@ -111,55 +156,6 @@ impl OutboundMessage for TestMessage { } } -impl InboundMessage for TestMessage { - fn parse(msg: &HotfixMessage) -> Self { - let msg_type: &str = msg.header().get(fix44::MSG_TYPE).unwrap(); - match msg_type { - "8" => { - // Execution Report - let order_id: &str = msg.get(fix44::ORDER_ID).unwrap(); - let exec_id: &str = msg.get(fix44::EXEC_ID).unwrap(); - let exec_type = msg.get(fix44::EXEC_TYPE).unwrap(); - let ord_status = msg.get(fix44::ORD_STATUS).unwrap(); - let side = msg.get(fix44::SIDE).unwrap(); - let symbol: &str = msg.get(fix44::SYMBOL).unwrap(); - let order_qty = msg.get(fix44::ORDER_QTY).unwrap(); - let price = msg.get(fix44::PRICE).unwrap(); - - Self::ExecutionReport { - order_id: order_id.to_string(), - exec_id: exec_id.to_string(), - exec_type, - ord_status, - side, - symbol: symbol.to_string(), - order_qty, - price, - } - } - "D" => { - // New Order Single - let cl_ord_id: &str = msg.get(fix44::CL_ORD_ID).unwrap(); - let side = msg.get(fix44::SIDE).unwrap(); - let symbol: &str = msg.get(fix44::SYMBOL).unwrap(); - let order_qty = msg.get(fix44::ORDER_QTY).unwrap(); - let ord_type = msg.get(fix44::ORD_TYPE).unwrap(); - let price = msg.get(fix44::PRICE).unwrap(); - - Self::NewOrderSingle { - cl_ord_id: cl_ord_id.to_string(), - side, - symbol: symbol.to_string(), - order_qty, - ord_type, - price, - } - } - _ => panic!("Invalid message type: {msg_type}"), - } - } -} - /// A new order message with an extra, invalid field. #[derive(Clone, Debug)] pub struct ExecutionReportWithInvalidField { diff --git a/crates/hotfix/tests/session_test_cases/resend_tests.rs b/crates/hotfix/tests/session_test_cases/resend_tests.rs index 82278cf..a54246f 100644 --- a/crates/hotfix/tests/session_test_cases/resend_tests.rs +++ b/crates/hotfix/tests/session_test_cases/resend_tests.rs @@ -5,9 +5,9 @@ use crate::common::setup::{HEARTBEAT_INTERVAL, given_an_active_session}; use crate::common::test_messages::{ TestMessage, build_execution_report_with_incorrect_body_length, build_invalid_resend_request, }; -use hotfix::message::{OutboundMessage, ResendRequest}; +use hotfix::message::ResendRequest; use hotfix::session::Status; -use hotfix_message::fix44::{GAP_FILL_FLAG, MsgType, NEW_SEQ_NO}; +use hotfix_message::fix44::{GAP_FILL_FLAG, MSG_TYPE, MsgType, NEW_SEQ_NO, ORDER_ID}; use hotfix_message::{FieldType, Part}; use std::time::Duration; @@ -116,7 +116,10 @@ async fn test_resent_message_previously_received_is_ignored() { .sends_message(TestMessage::dummy_execution_report()) .await; then(&mut session) - .receives(|msg| assert_eq!(msg.message_type(), MsgType::ExecutionReport.to_string())) + .receives(|msg| { + let msg_type: &str = msg.header().get(MSG_TYPE).unwrap(); + assert_eq!(msg_type, MsgType::ExecutionReport.to_string()); + }) .await; then(&mut session).target_sequence_number_reaches(2).await; @@ -132,11 +135,10 @@ async fn test_resent_message_previously_received_is_ignored() { .await; then(&mut session) .receives(|msg| { - if let TestMessage::ExecutionReport { order_id, .. } = msg { - assert_eq!(order_id, &new_report_order_id); - } else { - panic!("Unexpected message: {:?}", msg); - } + let msg_type: &str = msg.header().get(MSG_TYPE).unwrap(); + assert_eq!(msg_type, MsgType::ExecutionReport.to_string()); + let order_id: &str = msg.get(ORDER_ID).unwrap(); + assert_eq!(order_id, &new_report_order_id); }) .await; diff --git a/crates/hotfix/tests/session_test_cases/send_confirmation_tests.rs b/crates/hotfix/tests/session_test_cases/send_confirmation_tests.rs index b76f6c3..c6f8f0d 100644 --- a/crates/hotfix/tests/session_test_cases/send_confirmation_tests.rs +++ b/crates/hotfix/tests/session_test_cases/send_confirmation_tests.rs @@ -7,7 +7,7 @@ use crate::common::setup::{ }; use crate::common::test_messages::TestMessage; use hotfix::application::OutboundDecision; -use hotfix::message::{InboundMessage, OutboundMessage}; +use hotfix::message::OutboundMessage; use hotfix::session::{SendError, SendOutcome}; #[tokio::test] diff --git a/examples/load-testing/src/application.rs b/examples/load-testing/src/application.rs index 981c42e..575459f 100644 --- a/examples/load-testing/src/application.rs +++ b/examples/load-testing/src/application.rs @@ -1,5 +1,6 @@ use crate::messages::{ExecutionReport, InboundMsg, OutboundMsg}; use hotfix::Application; +use hotfix::Message; use hotfix::application::{InboundDecision, OutboundDecision}; use tokio::sync::mpsc::UnboundedSender; use tracing::info; @@ -15,13 +16,14 @@ impl LoadTestingApplication { } #[async_trait::async_trait] -impl Application for LoadTestingApplication { +impl Application for LoadTestingApplication { async fn on_outbound_message(&self, _msg: &OutboundMsg) -> OutboundDecision { OutboundDecision::Send } - async fn on_inbound_message(&self, msg: InboundMsg) -> InboundDecision { - match msg { + async fn on_inbound_message(&self, msg: &Message) -> InboundDecision { + let parsed = InboundMsg::parse(msg); + match parsed { InboundMsg::Unimplemented(data) => { let pretty_bytes: Vec = data .iter() diff --git a/examples/load-testing/src/messages.rs b/examples/load-testing/src/messages.rs index 7e54600..ba00b2f 100644 --- a/examples/load-testing/src/messages.rs +++ b/examples/load-testing/src/messages.rs @@ -2,7 +2,7 @@ use hotfix::Message as HotfixMessage; use hotfix::field_types::{Date, Timestamp}; use hotfix::fix44; use hotfix::fix44::{OrdStatus, OrdType, Side}; -use hotfix::message::{InboundMessage, OutboundMessage, Part, RepeatingGroup}; +use hotfix::message::{OutboundMessage, Part, RepeatingGroup}; #[derive(Debug, Clone)] #[allow(dead_code)] @@ -48,24 +48,6 @@ pub enum OutboundMsg { NewOrderSingle(NewOrderSingle), } -impl InboundMsg { - fn parse_execution_report_ack(message: &HotfixMessage) -> Self { - let report = ExecutionReport { - order_id: message.get::<&str>(fix44::ORDER_ID).unwrap().to_string(), - cl_ord_id: message.get::<&str>(fix44::CL_ORD_ID).unwrap().to_string(), - exec_id: message.get::<&str>(fix44::EXEC_ID).unwrap().to_string(), - exec_type: message.get::(fix44::EXEC_TYPE).unwrap(), - ord_status: message.get::(fix44::ORD_STATUS).unwrap(), - side: message.get::(fix44::SIDE).unwrap(), - symbol: message.get::<&str>(fix44::SYMBOL).unwrap().to_string(), - leaves_qty: message.get::(fix44::LEAVES_QTY).unwrap(), - cum_qty: message.get::(fix44::CUM_QTY).unwrap(), - avx_px: message.get::(fix44::AVG_PX).unwrap(), - }; - Self::ExecutionReport(report) - } -} - impl OutboundMessage for OutboundMsg { fn write(&self, msg: &mut HotfixMessage) { match self { @@ -97,8 +79,8 @@ impl OutboundMessage for OutboundMsg { } } -impl InboundMessage for InboundMsg { - fn parse(message: &HotfixMessage) -> Self { +impl InboundMsg { + pub fn parse(message: &HotfixMessage) -> Self { let message_type: &str = message.header().get(fix44::MSG_TYPE).unwrap(); if message_type == "8" { Self::parse_execution_report_ack(message) @@ -106,4 +88,20 @@ impl InboundMessage for InboundMsg { Self::Unimplemented(message_type.as_bytes().to_vec()) } } + + fn parse_execution_report_ack(message: &HotfixMessage) -> Self { + let report = ExecutionReport { + order_id: message.get::<&str>(fix44::ORDER_ID).unwrap().to_string(), + cl_ord_id: message.get::<&str>(fix44::CL_ORD_ID).unwrap().to_string(), + exec_id: message.get::<&str>(fix44::EXEC_ID).unwrap().to_string(), + exec_type: message.get::(fix44::EXEC_TYPE).unwrap(), + ord_status: message.get::(fix44::ORD_STATUS).unwrap(), + side: message.get::(fix44::SIDE).unwrap(), + symbol: message.get::<&str>(fix44::SYMBOL).unwrap().to_string(), + leaves_qty: message.get::(fix44::LEAVES_QTY).unwrap(), + cum_qty: message.get::(fix44::CUM_QTY).unwrap(), + avx_px: message.get::(fix44::AVG_PX).unwrap(), + }; + Self::ExecutionReport(report) + } } diff --git a/examples/simple-new-order/src/application.rs b/examples/simple-new-order/src/application.rs index 9f0219f..5c5977b 100644 --- a/examples/simple-new-order/src/application.rs +++ b/examples/simple-new-order/src/application.rs @@ -1,5 +1,6 @@ -use crate::messages::{InboundMsg, OutboundMsg}; +use crate::messages::OutboundMsg; use hotfix::Application; +use hotfix::Message; use hotfix::application::{InboundDecision, OutboundDecision}; use tracing::info; @@ -7,23 +8,13 @@ use tracing::info; pub struct TestApplication {} #[async_trait::async_trait] -impl Application for TestApplication { +impl Application for TestApplication { async fn on_outbound_message(&self, _msg: &OutboundMsg) -> OutboundDecision { OutboundDecision::Send } - async fn on_inbound_message(&self, msg: InboundMsg) -> InboundDecision { - match msg { - InboundMsg::Unimplemented(data) => { - let pretty_bytes: Vec = data - .iter() - .map(|b| if *b == b'\x01' { b'|' } else { *b }) - .collect(); - let s = std::str::from_utf8(&pretty_bytes).unwrap_or("invalid characters"); - info!("received message: {:?}", s); - } - } - + async fn on_inbound_message(&self, _msg: &Message) -> InboundDecision { + info!("received inbound message"); InboundDecision::Accept } diff --git a/examples/simple-new-order/src/messages.rs b/examples/simple-new-order/src/messages.rs index 5f7092f..614aad2 100644 --- a/examples/simple-new-order/src/messages.rs +++ b/examples/simple-new-order/src/messages.rs @@ -1,7 +1,7 @@ use hotfix::Message as HotfixMessage; use hotfix::field_types::{Date, Timestamp}; use hotfix::fix44; -use hotfix::message::{InboundMessage, OutboundMessage, Part, RepeatingGroup}; +use hotfix::message::{OutboundMessage, Part, RepeatingGroup}; #[derive(Debug, Clone)] pub struct NewOrderSingle { @@ -20,11 +20,6 @@ pub struct NewOrderSingle { pub allocation_quantity: u32, } -#[derive(Debug, Clone)] -pub enum InboundMsg { - Unimplemented(Vec), -} - #[derive(Debug, Clone)] pub enum OutboundMsg { NewOrderSingle(NewOrderSingle), @@ -59,10 +54,3 @@ impl OutboundMessage for OutboundMsg { } } } - -impl InboundMessage for InboundMsg { - fn parse(message: &HotfixMessage) -> Self { - let message_type: &str = message.header().get(fix44::MSG_TYPE).unwrap(); - Self::Unimplemented(message_type.as_bytes().to_vec()) - } -} From f3569173bfe97d0a637446838cc3b23b45ddbddd Mon Sep 17 00:00:00 2001 From: David Steiner Date: Fri, 6 Feb 2026 13:01:49 +0100 Subject: [PATCH 4/7] Convert outbound message type to associated type --- crates/hotfix/src/application.rs | 7 +++-- crates/hotfix/src/initiator.rs | 6 ++-- crates/hotfix/src/session.rs | 31 +++++++++---------- crates/hotfix/src/session/session_ref.rs | 2 +- .../tests/connection_test_cases/helpers.rs | 4 ++- .../common/fakes/fake_application.rs | 4 ++- examples/load-testing/src/application.rs | 4 ++- examples/simple-new-order/src/application.rs | 4 ++- 8 files changed, 36 insertions(+), 26 deletions(-) diff --git a/crates/hotfix/src/application.rs b/crates/hotfix/src/application.rs index 8539d2e..5d93277 100644 --- a/crates/hotfix/src/application.rs +++ b/crates/hotfix/src/application.rs @@ -1,12 +1,15 @@ +use crate::message::OutboundMessage; use hotfix_message::message::Message; #[async_trait::async_trait] /// The application users of HotFIX can implement to hook into the engine. -pub trait Application: Send + Sync + 'static { +pub trait Application: Send + Sync + 'static { + type Outbound: OutboundMessage; + /// Called when a message is sent to the engine to be sent to the counterparty. /// /// This is invoked before the raw message is persisted in the message store. - async fn on_outbound_message(&self, msg: &Outbound) -> OutboundDecision; + async fn on_outbound_message(&self, msg: &Self::Outbound) -> OutboundDecision; /// Called when a message is received from the counterparty. /// /// This is invoked after the message is verified by the session layer. diff --git a/crates/hotfix/src/initiator.rs b/crates/hotfix/src/initiator.rs index 9989168..19543e4 100644 --- a/crates/hotfix/src/initiator.rs +++ b/crates/hotfix/src/initiator.rs @@ -29,7 +29,7 @@ pub struct Initiator { impl Initiator { pub async fn start( config: SessionConfig, - application: impl Application, + application: impl Application, store: impl MessageStore + 'static, ) -> Result { let session_ref = InternalSessionRef::new(config.clone(), application, store)?; @@ -184,7 +184,9 @@ mod tests { struct NoOpApp; #[async_trait::async_trait] - impl Application for NoOpApp { + impl Application for NoOpApp { + type Outbound = DummyMessage; + async fn on_outbound_message(&self, _msg: &DummyMessage) -> OutboundDecision { OutboundDecision::Send } diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index b55ae3f..9cca4b1 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -57,7 +57,7 @@ use hotfix_message::session_fields::{ const SCHEDULE_CHECK_INTERVAL: u64 = 1; -struct Session { +struct Session { message_config: MessageConfig, config: SessionConfig, schedule: SessionSchedule, @@ -67,20 +67,18 @@ struct Session { store: S, schedule_check_timer: Pin>, reset_on_next_logon: bool, - _phantom: std::marker::PhantomData O>, } -impl Session +impl Session where - App: Application, - Outbound: OutboundMessage, + App: Application, Store: MessageStore, { fn new( config: SessionConfig, application: App, store: Store, - ) -> Result, SessionCreationError> { + ) -> Result, SessionCreationError> { let schedule_check_timer = sleep(Duration::from_secs(SCHEDULE_CHECK_INTERVAL)); let dictionary = Self::get_data_dictionary(&config)?; @@ -98,7 +96,6 @@ where store, schedule_check_timer: Box::pin(schedule_check_timer), reset_on_next_logon: false, - _phantom: std::marker::PhantomData, }; Ok(session) @@ -794,7 +791,7 @@ where .reset_peer_timer(self.config.heartbeat_interval, test_request_id); } - async fn send_app_message(&mut self, message: Outbound) -> Result { + async fn send_app_message(&mut self, message: App::Outbound) -> Result { if !self.state.is_connected() { return Err(SendError::Disconnected); } @@ -981,7 +978,7 @@ where } } - async fn handle_outbound_message(&mut self, request: OutboundRequest) { + async fn handle_outbound_message(&mut self, request: OutboundRequest) { let OutboundRequest { message, confirm } = request; let result = self.send_app_message(message).await; match confirm { @@ -1129,14 +1126,13 @@ fn get_msg_seq_num(message: &Message) -> u64 { .expect("MsgSeqNum missing from validated message - parser bug") } -async fn run_session( - mut session: Session, +async fn run_session( + mut session: Session, mut event_receiver: mpsc::Receiver, - mut outbound_message_receiver: mpsc::Receiver>, + mut outbound_message_receiver: mpsc::Receiver>, mut admin_request_receiver: mpsc::Receiver, ) where - App: Application, - Outbound: OutboundMessage, + App: Application, Store: MessageStore + Send + 'static, { loop { @@ -1294,7 +1290,9 @@ mod tests { struct NoOpApp; #[async_trait::async_trait] - impl Application for NoOpApp { + impl Application for NoOpApp { + type Outbound = DummyMessage; + async fn on_outbound_message(&self, _: &DummyMessage) -> OutboundDecision { OutboundDecision::Send } @@ -1332,7 +1330,7 @@ mod tests { schedule: SessionSchedule, state: SessionState, store: TestStore, - ) -> Session { + ) -> Session { let config = create_test_config(); let message_config = MessageConfig::default(); let dictionary = Dictionary::fix44(); @@ -1348,7 +1346,6 @@ mod tests { store, schedule_check_timer: Box::pin(sleep(Duration::from_secs(1))), reset_on_next_logon: false, - _phantom: std::marker::PhantomData, } } diff --git a/crates/hotfix/src/session/session_ref.rs b/crates/hotfix/src/session/session_ref.rs index 7b0c3c8..e04ba2d 100644 --- a/crates/hotfix/src/session/session_ref.rs +++ b/crates/hotfix/src/session/session_ref.rs @@ -28,7 +28,7 @@ pub struct InternalSessionRef { impl InternalSessionRef { pub fn new( config: SessionConfig, - application: impl Application, + application: impl Application, store: impl MessageStore + 'static, ) -> Result { let (event_sender, event_receiver) = mpsc::channel::(100); diff --git a/crates/hotfix/tests/connection_test_cases/helpers.rs b/crates/hotfix/tests/connection_test_cases/helpers.rs index 70be1ce..0b35c26 100644 --- a/crates/hotfix/tests/connection_test_cases/helpers.rs +++ b/crates/hotfix/tests/connection_test_cases/helpers.rs @@ -341,7 +341,9 @@ impl OutboundMessage for MinimalMessage { pub struct MinimalApplication; #[async_trait::async_trait] -impl Application for MinimalApplication { +impl Application for MinimalApplication { + type Outbound = MinimalMessage; + async fn on_outbound_message(&self, _msg: &MinimalMessage) -> OutboundDecision { OutboundDecision::Send } diff --git a/crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs b/crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs index 06f8cbc..d4bcf5c 100644 --- a/crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs +++ b/crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs @@ -28,7 +28,9 @@ impl FakeApplication { } #[async_trait::async_trait] -impl Application for FakeApplication { +impl Application for FakeApplication { + type Outbound = TestMessage; + async fn on_outbound_message(&self, _msg: &TestMessage) -> OutboundDecision { self.outbound_decision } diff --git a/examples/load-testing/src/application.rs b/examples/load-testing/src/application.rs index 575459f..da74713 100644 --- a/examples/load-testing/src/application.rs +++ b/examples/load-testing/src/application.rs @@ -16,7 +16,9 @@ impl LoadTestingApplication { } #[async_trait::async_trait] -impl Application for LoadTestingApplication { +impl Application for LoadTestingApplication { + type Outbound = OutboundMsg; + async fn on_outbound_message(&self, _msg: &OutboundMsg) -> OutboundDecision { OutboundDecision::Send } diff --git a/examples/simple-new-order/src/application.rs b/examples/simple-new-order/src/application.rs index 5c5977b..d3d7735 100644 --- a/examples/simple-new-order/src/application.rs +++ b/examples/simple-new-order/src/application.rs @@ -8,7 +8,9 @@ use tracing::info; pub struct TestApplication {} #[async_trait::async_trait] -impl Application for TestApplication { +impl Application for TestApplication { + type Outbound = OutboundMsg; + async fn on_outbound_message(&self, _msg: &OutboundMsg) -> OutboundDecision { OutboundDecision::Send } From cbdf26013451289d00a9e1d43568ab749c575381 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Fri, 6 Feb 2026 13:34:52 +0100 Subject: [PATCH 5/7] Add reject as inbound decision variant --- crates/hotfix/src/application.rs | 18 ++ crates/hotfix/src/message.rs | 1 + crates/hotfix/src/message/business_reject.rs | 220 +++++++++++++++++++ crates/hotfix/src/session.rs | 28 ++- 4 files changed, 261 insertions(+), 6 deletions(-) create mode 100644 crates/hotfix/src/message/business_reject.rs diff --git a/crates/hotfix/src/application.rs b/crates/hotfix/src/application.rs index 5d93277..72022c2 100644 --- a/crates/hotfix/src/application.rs +++ b/crates/hotfix/src/application.rs @@ -20,8 +20,26 @@ pub trait Application: Send + Sync + 'static { async fn on_logon(&mut self); } +/// Standard FIX Business Reject Reason values (tag 380). +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[repr(u32)] +pub enum BusinessRejectReason { + Other = 0, + UnknownId = 1, + UnknownSecurity = 2, + UnsupportedMessageType = 3, + ApplicationNotAvailable = 4, + ConditionallyRequiredFieldMissing = 5, + NotAuthorized = 6, + DeliverToFirmNotAvailable = 7, +} + pub enum InboundDecision { Accept, + Reject { + reason: BusinessRejectReason, + text: Option, + }, TerminateSession, } diff --git a/crates/hotfix/src/message.rs b/crates/hotfix/src/message.rs index 2bafbf0..77f2e59 100644 --- a/crates/hotfix/src/message.rs +++ b/crates/hotfix/src/message.rs @@ -5,6 +5,7 @@ pub(crate) use hotfix_message::message::{Config, Message}; use hotfix_message::session_fields::{MSG_SEQ_NUM, SENDER_COMP_ID, SENDING_TIME, TARGET_COMP_ID}; pub use hotfix_message::{Part, RepeatingGroup}; +pub mod business_reject; pub mod heartbeat; pub mod logon; pub mod logout; diff --git a/crates/hotfix/src/message/business_reject.rs b/crates/hotfix/src/message/business_reject.rs new file mode 100644 index 0000000..c194bf4 --- /dev/null +++ b/crates/hotfix/src/message/business_reject.rs @@ -0,0 +1,220 @@ +use crate::application::BusinessRejectReason; +use crate::message::OutboundMessage; +use hotfix_message::dict::{FieldLocation, FixDatatype}; +use hotfix_message::message::Message; +use hotfix_message::session_fields::{REF_MSG_TYPE, REF_SEQ_NUM, TEXT}; +use hotfix_message::{Buffer, FieldType, HardCodedFixFieldDefinition, Part}; + +#[allow(dead_code)] +const BUSINESS_REJECT_REF_ID: &HardCodedFixFieldDefinition = &HardCodedFixFieldDefinition { + name: "BusinessRejectRefID", + tag: 379, + data_type: FixDatatype::String, + location: FieldLocation::Body, +}; + +const BUSINESS_REJECT_REASON: &HardCodedFixFieldDefinition = &HardCodedFixFieldDefinition { + name: "BusinessRejectReason", + tag: 380, + data_type: FixDatatype::Int, + location: FieldLocation::Body, +}; + +impl<'a> FieldType<'a> for BusinessRejectReason { + type Error = (); + type SerializeSettings = (); + + fn serialize_with(&self, buffer: &mut B, _settings: Self::SerializeSettings) -> usize + where + B: Buffer, + { + let value = *self as u32; + value.serialize(buffer) + } + + fn deserialize(data: &'a [u8]) -> Result { + let value = u32::deserialize(data).map_err(|_| ())?; + match value { + 0 => Ok(Self::Other), + 1 => Ok(Self::UnknownId), + 2 => Ok(Self::UnknownSecurity), + 3 => Ok(Self::UnsupportedMessageType), + 4 => Ok(Self::ApplicationNotAvailable), + 5 => Ok(Self::ConditionallyRequiredFieldMissing), + 6 => Ok(Self::NotAuthorized), + 7 => Ok(Self::DeliverToFirmNotAvailable), + _ => Err(()), + } + } +} + +#[derive(Clone, Debug)] +pub(crate) struct BusinessReject { + ref_msg_type: String, + reason: BusinessRejectReason, + ref_seq_num: Option, + text: Option, +} + +impl BusinessReject { + pub(crate) fn new(ref_msg_type: &str, reason: BusinessRejectReason) -> Self { + Self { + ref_msg_type: ref_msg_type.to_string(), + reason, + ref_seq_num: None, + text: None, + } + } + + pub(crate) fn ref_seq_num(mut self, ref_seq_num: u64) -> Self { + self.ref_seq_num = Some(ref_seq_num); + self + } + + pub(crate) fn text(mut self, text: &str) -> Self { + self.text = Some(text.to_string()); + self + } + + #[cfg(test)] + fn parse(message: &Message) -> Self { + Self { + #[allow(clippy::expect_used)] + ref_msg_type: message + .get::<&str>(REF_MSG_TYPE) + .expect("ref_msg_type should be present") + .to_string(), + #[allow(clippy::expect_used)] + reason: message + .get(BUSINESS_REJECT_REASON) + .expect("reason should be present"), + ref_seq_num: message.get(REF_SEQ_NUM).ok(), + text: message.get::<&str>(TEXT).ok().map(|s| s.to_string()), + } + } +} + +impl OutboundMessage for BusinessReject { + fn write(&self, msg: &mut Message) { + msg.set(REF_MSG_TYPE, self.ref_msg_type.as_str()); + msg.set(BUSINESS_REJECT_REASON, self.reason); + + if let Some(ref_seq_num) = self.ref_seq_num { + msg.set(REF_SEQ_NUM, ref_seq_num); + } + if let Some(text) = &self.text { + msg.set(TEXT, text.as_str()); + } + } + + fn message_type(&self) -> &str { + "j" + } +} + +#[cfg(test)] +mod tests { + use super::*; + use hotfix_message::message::Message; + + #[test] + fn test_write_business_reject_with_required_fields_only() { + let reject = BusinessReject::new("D", BusinessRejectReason::UnsupportedMessageType); + + let mut msg = Message::new("FIX.4.4", "j"); + reject.write(&mut msg); + + assert_eq!(msg.get::<&str>(REF_MSG_TYPE).unwrap(), "D"); + assert_eq!( + msg.get::(BUSINESS_REJECT_REASON) + .unwrap(), + BusinessRejectReason::UnsupportedMessageType + ); + assert!(msg.get::(REF_SEQ_NUM).is_err()); + assert!(msg.get::<&str>(TEXT).is_err()); + } + + #[test] + fn test_write_business_reject_with_all_fields() { + let reject = BusinessReject::new("8", BusinessRejectReason::NotAuthorized) + .ref_seq_num(456) + .text("Not authorized for execution reports"); + + let mut msg = Message::new("FIX.4.4", "j"); + reject.write(&mut msg); + + assert_eq!(msg.get::<&str>(REF_MSG_TYPE).unwrap(), "8"); + assert_eq!( + msg.get::(BUSINESS_REJECT_REASON) + .unwrap(), + BusinessRejectReason::NotAuthorized + ); + assert_eq!(msg.get::(REF_SEQ_NUM).unwrap(), 456); + assert_eq!( + msg.get::<&str>(TEXT).unwrap(), + "Not authorized for execution reports" + ); + } + + #[test] + fn test_round_trip_serialization() { + let original = + BusinessReject::new("D", BusinessRejectReason::ConditionallyRequiredFieldMissing) + .ref_seq_num(789) + .text("ClOrdID is required"); + + let mut msg = Message::new("FIX.4.4", "j"); + original.write(&mut msg); + + let parsed = BusinessReject::parse(&msg); + + assert_eq!(parsed.ref_msg_type, original.ref_msg_type); + assert_eq!(parsed.reason, original.reason); + assert_eq!(parsed.ref_seq_num, original.ref_seq_num); + assert_eq!(parsed.text, original.text); + } + + #[test] + fn test_round_trip_with_minimal_fields() { + let original = BusinessReject::new("0", BusinessRejectReason::Other); + + let mut msg = Message::new("FIX.4.4", "j"); + original.write(&mut msg); + + let parsed = BusinessReject::parse(&msg); + + assert_eq!(parsed.ref_msg_type, original.ref_msg_type); + assert_eq!(parsed.reason, original.reason); + assert_eq!(parsed.ref_seq_num, original.ref_seq_num); + assert_eq!(parsed.text, original.text); + } + + #[test] + fn test_message_type() { + let reject = BusinessReject::new("D", BusinessRejectReason::Other); + assert_eq!(reject.message_type(), "j"); + } + + #[test] + fn test_all_reject_reasons_round_trip() { + let reasons = [ + BusinessRejectReason::Other, + BusinessRejectReason::UnknownId, + BusinessRejectReason::UnknownSecurity, + BusinessRejectReason::UnsupportedMessageType, + BusinessRejectReason::ApplicationNotAvailable, + BusinessRejectReason::ConditionallyRequiredFieldMissing, + BusinessRejectReason::NotAuthorized, + BusinessRejectReason::DeliverToFirmNotAvailable, + ]; + + for reason in reasons { + let reject = BusinessReject::new("D", reason); + let mut msg = Message::new("FIX.4.4", "j"); + reject.write(&mut msg); + + let parsed = BusinessReject::parse(&msg); + assert_eq!(parsed.reason, reason, "Round-trip failed for {reason:?}"); + } + } +} diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index 9cca4b1..1ed606a 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -25,6 +25,7 @@ use crate::message::heartbeat::Heartbeat; use crate::message::logon::{Logon, ResetSeqNumConfig}; use crate::message::logout::Logout; use crate::message::parser::RawFixMessage; +use crate::message::business_reject::BusinessReject; use crate::message::reject::Reject; use crate::message::resend_request::ResendRequest; use crate::message::sequence_reset::SequenceReset; @@ -241,12 +242,27 @@ where ) -> Result<(), SessionOperationError> { match self.verify_message(message, true) { Ok(_) => { - if matches!( - self.application.on_inbound_message(message).await, - InboundDecision::TerminateSession - ) { - error!("failed to send inbound message to application"); - self.state.disconnect_writer().await; + match self.application.on_inbound_message(message).await { + InboundDecision::Accept => {} + InboundDecision::Reject { reason, text } => { + let msg_type: &str = message + .header() + .get(MSG_TYPE) + .map_err(|_| SessionOperationError::MissingField("MSG_TYPE"))?; + let mut reject = + BusinessReject::new(msg_type, reason) + .ref_seq_num(get_msg_seq_num(message)); + if let Some(text) = text { + reject = reject.text(&text); + } + self.send_message(reject) + .await + .with_send_context("business message reject")?; + } + InboundDecision::TerminateSession => { + error!("failed to send inbound message to application"); + self.state.disconnect_writer().await; + } } self.store.increment_target_seq_number().await?; } From c0732d97978828c039c056f6b10b72b660aa1d08 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Fri, 6 Feb 2026 14:14:44 +0100 Subject: [PATCH 6/7] Add test case for business rejects coming from the application --- crates/hotfix/src/session.rs | 7 +- .../business_reject_tests.rs | 96 +++++++++++++++++++ .../common/fakes/fake_application.rs | 58 ++++++++--- .../tests/session_test_cases/common/setup.rs | 26 +++-- crates/hotfix/tests/session_test_cases/mod.rs | 1 + .../send_confirmation_tests.rs | 18 ++-- 6 files changed, 173 insertions(+), 33 deletions(-) create mode 100644 crates/hotfix/tests/session_test_cases/business_reject_tests.rs diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index 1ed606a..bbfa3e9 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -20,12 +20,12 @@ use crate::Application; use crate::application::{InboundDecision, OutboundDecision}; use crate::config::SessionConfig; use crate::message::OutboundMessage; +use crate::message::business_reject::BusinessReject; use crate::message::generate_message; use crate::message::heartbeat::Heartbeat; use crate::message::logon::{Logon, ResetSeqNumConfig}; use crate::message::logout::Logout; use crate::message::parser::RawFixMessage; -use crate::message::business_reject::BusinessReject; use crate::message::reject::Reject; use crate::message::resend_request::ResendRequest; use crate::message::sequence_reset::SequenceReset; @@ -249,9 +249,8 @@ where .header() .get(MSG_TYPE) .map_err(|_| SessionOperationError::MissingField("MSG_TYPE"))?; - let mut reject = - BusinessReject::new(msg_type, reason) - .ref_seq_num(get_msg_seq_num(message)); + let mut reject = BusinessReject::new(msg_type, reason) + .ref_seq_num(get_msg_seq_num(message)); if let Some(text) = text { reject = reject.text(&text); } diff --git a/crates/hotfix/tests/session_test_cases/business_reject_tests.rs b/crates/hotfix/tests/session_test_cases/business_reject_tests.rs new file mode 100644 index 0000000..86405e5 --- /dev/null +++ b/crates/hotfix/tests/session_test_cases/business_reject_tests.rs @@ -0,0 +1,96 @@ +use crate::common::actions::when; +use crate::common::assertions::then; +use crate::common::cleanup::finally; +use crate::common::fakes::FakeApplication; +use crate::common::setup::given_an_active_session_with_app; +use crate::common::test_messages::TestMessage; +use hotfix::application::{BusinessRejectReason, InboundDecision}; +use hotfix_message::Part; +use hotfix_message::fix44::{MSG_TYPE, REF_MSG_TYPE, REF_SEQ_NUM, TEXT}; + +/// Tests that when the application returns InboundDecision::Reject, +/// the session sends a Business Message Reject (MsgType "j") back to the counterparty. +#[tokio::test] +async fn test_inbound_reject_sends_business_message_reject() { + let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel(); + let app = FakeApplication::builder(message_tx) + .inbound_decision_fn(|_| InboundDecision::Reject { + reason: BusinessRejectReason::NotAuthorized, + text: Some("Not authorized for this message".to_string()), + }) + .build(); + let (session, mut counterparty) = given_an_active_session_with_app(app, message_rx).await; + + // counterparty sends an execution report + when(&mut counterparty) + .sends_message(TestMessage::dummy_execution_report()) + .await; + + // session should respond with a Business Message Reject (MsgType "j") + then(&mut counterparty) + .receives(|msg| { + let msg_type: &str = msg.header().get(MSG_TYPE).unwrap(); + assert_eq!(msg_type, "j"); + + // RefMsgType should be the original message type ("8" for ExecutionReport) + let ref_msg_type: &str = msg.get(REF_MSG_TYPE).unwrap(); + assert_eq!(ref_msg_type, "8"); + + // BusinessRejectReason (tag 380) should be 6 (NotAuthorized) + let reason: u32 = msg.get(BUSINESS_REJECT_REASON).unwrap(); + assert_eq!(reason, 6); + + // RefSeqNum should be the sequence number of the rejected message + let ref_seq_num: u64 = msg.get(REF_SEQ_NUM).unwrap(); + assert!(ref_seq_num > 0); + + // Text should contain our reject reason + let text: &str = msg.get(TEXT).unwrap(); + assert_eq!(text, "Not authorized for this message"); + }) + .await; + + finally(&session, &mut counterparty).disconnect().await; +} + +/// Tests that when the application returns InboundDecision::Reject without text, +/// the Business Message Reject is sent without the Text field. +#[tokio::test] +async fn test_inbound_reject_without_text() { + let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel(); + let app = FakeApplication::builder(message_tx) + .inbound_decision_fn(|_| InboundDecision::Reject { + reason: BusinessRejectReason::UnsupportedMessageType, + text: None, + }) + .build(); + let (session, mut counterparty) = given_an_active_session_with_app(app, message_rx).await; + + when(&mut counterparty) + .sends_message(TestMessage::dummy_execution_report()) + .await; + + then(&mut counterparty) + .receives(|msg| { + let msg_type: &str = msg.header().get(MSG_TYPE).unwrap(); + assert_eq!(msg_type, "j"); + + let reason: u32 = msg.get(BUSINESS_REJECT_REASON).unwrap(); + assert_eq!(reason, 3); + + // Text field should not be present + assert!(msg.get::<&str>(TEXT).is_err()); + }) + .await; + + finally(&session, &mut counterparty).disconnect().await; +} + +/// Field definition for BusinessRejectReason (tag 380), used for assertions only. +const BUSINESS_REJECT_REASON: &hotfix_message::HardCodedFixFieldDefinition = + &hotfix_message::HardCodedFixFieldDefinition { + name: "BusinessRejectReason", + tag: 380, + data_type: hotfix_message::dict::FixDatatype::Int, + location: hotfix_message::dict::FieldLocation::Body, + }; diff --git a/crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs b/crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs index d4bcf5c..ec89df2 100644 --- a/crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs +++ b/crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs @@ -2,27 +2,57 @@ use crate::common::test_messages::TestMessage; use hotfix::Application; use hotfix::application::{InboundDecision, OutboundDecision}; use hotfix_message::message::Message; +use std::sync::Mutex; + +type OutboundDecisionFn = Box OutboundDecision + Send>; +type InboundDecisionFn = Box InboundDecision + Send>; pub struct FakeApplication { message_sender: tokio::sync::mpsc::UnboundedSender, - outbound_decision: OutboundDecision, + outbound_decision_fn: Mutex, + inbound_decision_fn: Mutex, } impl FakeApplication { - pub fn new(message_sender: tokio::sync::mpsc::UnboundedSender) -> Self { - Self { + pub fn builder( + message_sender: tokio::sync::mpsc::UnboundedSender, + ) -> FakeApplicationBuilder { + FakeApplicationBuilder { message_sender, - outbound_decision: OutboundDecision::Send, + outbound_decision_fn: Box::new(|_| OutboundDecision::Send), + inbound_decision_fn: Box::new(|_| InboundDecision::Accept), } } +} - pub fn with_outbound_decision( - message_sender: tokio::sync::mpsc::UnboundedSender, - decision: OutboundDecision, +pub struct FakeApplicationBuilder { + message_sender: tokio::sync::mpsc::UnboundedSender, + outbound_decision_fn: OutboundDecisionFn, + inbound_decision_fn: InboundDecisionFn, +} + +impl FakeApplicationBuilder { + pub fn outbound_decision_fn( + mut self, + f: impl Fn(&TestMessage) -> OutboundDecision + Send + 'static, ) -> Self { - Self { - message_sender, - outbound_decision: decision, + self.outbound_decision_fn = Box::new(f); + self + } + + pub fn inbound_decision_fn( + mut self, + f: impl Fn(&Message) -> InboundDecision + Send + 'static, + ) -> Self { + self.inbound_decision_fn = Box::new(f); + self + } + + pub fn build(self) -> FakeApplication { + FakeApplication { + message_sender: self.message_sender, + outbound_decision_fn: Mutex::new(self.outbound_decision_fn), + inbound_decision_fn: Mutex::new(self.inbound_decision_fn), } } } @@ -31,13 +61,15 @@ impl FakeApplication { impl Application for FakeApplication { type Outbound = TestMessage; - async fn on_outbound_message(&self, _msg: &TestMessage) -> OutboundDecision { - self.outbound_decision + async fn on_outbound_message(&self, msg: &TestMessage) -> OutboundDecision { + let decision_fn = self.outbound_decision_fn.lock().unwrap(); + (decision_fn)(msg) } async fn on_inbound_message(&self, msg: &Message) -> InboundDecision { self.message_sender.send(msg.clone()).unwrap(); - InboundDecision::Accept + let decision_fn = self.inbound_decision_fn.lock().unwrap(); + (decision_fn)(msg) } async fn on_logout(&mut self, _reason: &str) {} diff --git a/crates/hotfix/tests/session_test_cases/common/setup.rs b/crates/hotfix/tests/session_test_cases/common/setup.rs index 9230ba1..b6faeea 100644 --- a/crates/hotfix/tests/session_test_cases/common/setup.rs +++ b/crates/hotfix/tests/session_test_cases/common/setup.rs @@ -3,13 +3,13 @@ use crate::common::assertions::then; use crate::common::fakes::{FakeApplication, FakeCounterparty, SessionSpy}; use crate::common::test_messages::TestMessage; use crate::session_test_cases::common::fakes::DisconnectedSession; -use hotfix::application::OutboundDecision; use hotfix::config::SessionConfig; use hotfix::session::InternalSessionRef; use hotfix::session::Status; use hotfix::store::in_memory::InMemoryMessageStore; use hotfix_message::Part; use hotfix_message::fix44::MSG_TYPE; +use hotfix_message::message::Message; pub const HEARTBEAT_INTERVAL: u64 = 30; pub const LOGON_TIMEOUT: u64 = 10; @@ -30,8 +30,12 @@ pub async fn given_a_connected_session_with_store( let counterparty_config = create_counterparty_session_config(config.clone()); let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel(); - let session = InternalSessionRef::new(config, FakeApplication::new(message_tx), message_store) - .expect("session to be created successfully"); + let session = InternalSessionRef::new( + config, + FakeApplication::builder(message_tx).build(), + message_store, + ) + .expect("session to be created successfully"); let session_spy = SessionSpy::new(session.clone().into(), message_rx); let mock_counterparty = FakeCounterparty::start(session.clone(), counterparty_config) @@ -42,15 +46,14 @@ pub async fn given_a_connected_session_with_store( } /// Creates an active session with a configurable application. -pub async fn given_an_active_session_with_outbound_decision( - decision: OutboundDecision, +pub async fn given_an_active_session_with_app( + app: FakeApplication, + message_rx: tokio::sync::mpsc::UnboundedReceiver, ) -> (SessionSpy, FakeCounterparty) { let config = create_session_config(); let counterparty_config = create_counterparty_session_config(config.clone()); let message_store = InMemoryMessageStore::default(); - let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel(); - let app = FakeApplication::with_outbound_decision(message_tx, decision); let session = InternalSessionRef::new(config, app, message_store) .expect("session to be created successfully"); @@ -77,9 +80,12 @@ pub fn given_a_disconnected_session() -> DisconnectedSession { let message_store = InMemoryMessageStore::default(); let (message_tx, _message_rx) = tokio::sync::mpsc::unbounded_channel(); - let session_ref = - InternalSessionRef::new(config, FakeApplication::new(message_tx), message_store) - .expect("session to be created successfully"); + let session_ref = InternalSessionRef::new( + config, + FakeApplication::builder(message_tx).build(), + message_store, + ) + .expect("session to be created successfully"); let session_handle = session_ref.clone().into(); diff --git a/crates/hotfix/tests/session_test_cases/mod.rs b/crates/hotfix/tests/session_test_cases/mod.rs index a9e9b40..c2dcb73 100644 --- a/crates/hotfix/tests/session_test_cases/mod.rs +++ b/crates/hotfix/tests/session_test_cases/mod.rs @@ -1,4 +1,5 @@ mod admin_request_tests; +mod business_reject_tests; mod business_tests; pub(crate) mod common; mod heartbeat_tests; diff --git a/crates/hotfix/tests/session_test_cases/send_confirmation_tests.rs b/crates/hotfix/tests/session_test_cases/send_confirmation_tests.rs index c6f8f0d..fb86a28 100644 --- a/crates/hotfix/tests/session_test_cases/send_confirmation_tests.rs +++ b/crates/hotfix/tests/session_test_cases/send_confirmation_tests.rs @@ -1,9 +1,9 @@ use crate::common::actions::when; use crate::common::assertions::then; use crate::common::cleanup::finally; +use crate::common::fakes::FakeApplication; use crate::common::setup::{ - given_a_disconnected_session, given_an_active_session, - given_an_active_session_with_outbound_decision, + given_a_disconnected_session, given_an_active_session, given_an_active_session_with_app, }; use crate::common::test_messages::TestMessage; use hotfix::application::OutboundDecision; @@ -129,8 +129,11 @@ async fn test_send_returns_disconnected_when_not_connected() { #[tokio::test] async fn test_send_returns_dropped_when_app_drops_message() { // Create an active session with an application configured to drop messages - let (session, mut counterparty) = - given_an_active_session_with_outbound_decision(OutboundDecision::Drop).await; + let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel(); + let app = FakeApplication::builder(message_tx) + .outbound_decision_fn(|_| OutboundDecision::Drop) + .build(); + let (session, mut counterparty) = given_an_active_session_with_app(app, message_rx).await; // Send a message - should be dropped by the application let result = when(&session) @@ -153,8 +156,11 @@ async fn test_send_returns_dropped_when_app_drops_message() { #[tokio::test] async fn test_send_returns_session_terminated_when_app_terminates() { // Create an active session with an application configured to terminate session - let (session, mut counterparty) = - given_an_active_session_with_outbound_decision(OutboundDecision::TerminateSession).await; + let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel(); + let app = FakeApplication::builder(message_tx) + .outbound_decision_fn(|_| OutboundDecision::TerminateSession) + .build(); + let (session, mut counterparty) = given_an_active_session_with_app(app, message_rx).await; // Send a message - should cause session termination let result = when(&session) From 83d2f70cb6bf1a21f5509deef97af2d939dcdd53 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Fri, 6 Feb 2026 14:43:59 +0100 Subject: [PATCH 7/7] Remove dead code --- crates/hotfix/src/message/business_reject.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/crates/hotfix/src/message/business_reject.rs b/crates/hotfix/src/message/business_reject.rs index c194bf4..302a2e0 100644 --- a/crates/hotfix/src/message/business_reject.rs +++ b/crates/hotfix/src/message/business_reject.rs @@ -5,14 +5,6 @@ use hotfix_message::message::Message; use hotfix_message::session_fields::{REF_MSG_TYPE, REF_SEQ_NUM, TEXT}; use hotfix_message::{Buffer, FieldType, HardCodedFixFieldDefinition, Part}; -#[allow(dead_code)] -const BUSINESS_REJECT_REF_ID: &HardCodedFixFieldDefinition = &HardCodedFixFieldDefinition { - name: "BusinessRejectRefID", - tag: 379, - data_type: FixDatatype::String, - location: FieldLocation::Body, -}; - const BUSINESS_REJECT_REASON: &HardCodedFixFieldDefinition = &HardCodedFixFieldDefinition { name: "BusinessRejectReason", tag: 380,