From 2e42079498dff292116b1e34725509c45ce7ece4 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Wed, 4 Feb 2026 11:48:13 +0100 Subject: [PATCH 01/11] Introduce new error type for session creation issues --- crates/hotfix/src/session.rs | 11 +++++++---- crates/hotfix/src/session/error.rs | 19 +++++++++++++++++-- crates/hotfix/src/session_schedule.rs | 20 ++++++++++---------- 3 files changed, 34 insertions(+), 16 deletions(-) diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index 2781f29..0d158f4 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -14,7 +14,7 @@ use crate::message::parser::RawFixMessage; use crate::message::{InboundMessage, generate_message}; use crate::store::MessageStore; use crate::transport::writer::WriterRef; -use anyhow::{Context, Result, anyhow, bail}; +use anyhow::{Context, Result, anyhow}; use chrono::Utc; use hotfix_message::dict::Dictionary; use hotfix_message::message::{Config as MessageConfig, Message}; @@ -36,6 +36,7 @@ use crate::message::verification::verify_message; use crate::message::verification_error::{CompIdType, MessageVerificationError}; use crate::message_utils::{is_admin, prepare_message_for_resend}; use crate::session::admin_request::AdminRequest; +use crate::session::error::SessionCreationError; pub use crate::session::error::{SendError, SendOutcome}; pub use crate::session::info::{SessionInfo, Status}; pub use crate::session::session_handle::SessionHandle; @@ -80,7 +81,7 @@ where config: SessionConfig, application: App, store: Store, - ) -> Result> { + ) -> std::result::Result, SessionCreationError> { let schedule_check_timer = sleep(Duration::from_secs(SCHEDULE_CHECK_INTERVAL)); let dictionary = Self::get_data_dictionary(&config)?; @@ -104,12 +105,14 @@ where Ok(session) } - fn get_data_dictionary(config: &SessionConfig) -> Result { + fn get_data_dictionary(config: &SessionConfig) -> Result { match &config.data_dictionary_path { None => match config.begin_string.as_str() { #[cfg(feature = "fix44")] "FIX.4.4" => Ok(Dictionary::fix44()), - _ => bail!("unsupported begin string: {}", config.begin_string), + _ => Err(SessionCreationError::UnsupportedBeginString( + config.begin_string.to_string(), + )), }, Some(dictionary_path) => Ok(Dictionary::load_from_file(dictionary_path)?), } diff --git a/crates/hotfix/src/session/error.rs b/crates/hotfix/src/session/error.rs index 42da203..b9b4202 100644 --- a/crates/hotfix/src/session/error.rs +++ b/crates/hotfix/src/session/error.rs @@ -3,7 +3,7 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum SessionError { - #[error("Schedule configuration is invalid: {0}")] + #[error("schedule configuration is invalid: {0}")] InvalidSchedule(String), #[error("store operation failed")] @@ -12,6 +12,21 @@ pub enum SessionError { pub type Result = std::result::Result; +#[derive(Debug, Error)] +pub enum SessionCreationError { + #[error("unsupported BeginString: {0}")] + UnsupportedBeginString(String), + + #[error("dictionary failed to parse")] + MalformedDictionary(#[from] hotfix_message::dict::ParseError), + + #[error("dictionary contents are invalid")] + InvalidDictionary(#[from] hotfix_message::error::ParserError), + + #[error("schedule configuration is invalid: {0}")] + InvalidSchedule(String), +} + /// Outcome of a successful message send operation. #[derive(Debug, Clone, PartialEq, Eq)] pub enum SendOutcome { @@ -21,7 +36,7 @@ pub enum SendOutcome { Dropped, } -/// Error that can occur when sending a message. +/// Error that can occur when sending an outbound message to the session. #[derive(Debug, Error)] pub enum SendError { #[error("session is disconnected")] diff --git a/crates/hotfix/src/session_schedule.rs b/crates/hotfix/src/session_schedule.rs index 08a4756..1178a84 100644 --- a/crates/hotfix/src/session_schedule.rs +++ b/crates/hotfix/src/session_schedule.rs @@ -1,5 +1,5 @@ use crate::config::ScheduleConfig; -use crate::session::error::SessionError; +use crate::session::error::{SessionCreationError, SessionError}; use chrono::{DateTime, Datelike, Days, NaiveDate, NaiveTime, TimeDelta, Utc, Weekday}; use chrono_tz::Tz; @@ -29,7 +29,6 @@ pub enum SessionSchedule { }, } -#[allow(dead_code)] impl SessionSchedule { pub fn is_active_at(&self, datetime: &DateTime) -> bool { match self { @@ -130,6 +129,7 @@ impl SessionSchedule { } #[allow(unused_variables)] + #[allow(dead_code)] fn check_weekly_schedule( datetime: &DateTime, start_day: Weekday, @@ -141,7 +141,7 @@ impl SessionSchedule { } impl TryFrom<&ScheduleConfig> for SessionSchedule { - type Error = SessionError; + type Error = SessionCreationError; fn try_from(config: &ScheduleConfig) -> Result { match config { @@ -175,7 +175,7 @@ impl TryFrom<&ScheduleConfig> for SessionSchedule { }) } } else if start == end { - Err(SessionError::InvalidSchedule( + Err(SessionCreationError::InvalidSchedule( "Start and end times cannot be equal when weekdays is set".to_string(), )) } else { @@ -199,8 +199,8 @@ impl TryFrom<&ScheduleConfig> for SessionSchedule { } => { // Weekdays should be empty for weekly sessions if !weekdays.is_empty() { - return Err(SessionError::InvalidSchedule( - "Weekly sessions cannot have weekdays specified".to_string(), + return Err(SessionCreationError::InvalidSchedule( + "weekly sessions cannot have weekdays specified".to_string(), )); } @@ -214,15 +214,15 @@ impl TryFrom<&ScheduleConfig> for SessionSchedule { } // Invalid combinations - _ => Err(SessionError::InvalidSchedule( - "Invalid schedule configuration: incomplete or conflicting parameters".to_string(), + _ => Err(SessionCreationError::InvalidSchedule( + "invalid schedule configuration: incomplete or conflicting parameters".to_string(), )), } } } impl TryFrom> for SessionSchedule { - type Error = SessionError; + type Error = SessionCreationError; fn try_from(maybe_schedule: Option<&ScheduleConfig>) -> Result { match maybe_schedule { @@ -1040,7 +1040,7 @@ mod tests { let result = SessionSchedule::try_from(&config); assert!(result.is_err()); match result.unwrap_err() { - SessionError::InvalidSchedule(msg) => { + SessionCreationError::InvalidSchedule(msg) => { assert!(msg.contains("Weekly sessions cannot have weekdays specified")); } other => panic!("unexpected error: {other}"), From aac4801fafe741e1d370ed6ed70564899d714772 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Wed, 4 Feb 2026 11:55:29 +0100 Subject: [PATCH 02/11] Improve error handling in session schedule code --- crates/hotfix/src/session.rs | 45 ++-- crates/hotfix/src/session/error.rs | 5 - crates/hotfix/src/session_schedule.rs | 288 +++++++++++++++++++------- 3 files changed, 245 insertions(+), 93 deletions(-) diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index 0d158f4..738f6d0 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -47,7 +47,7 @@ pub use crate::session::session_ref::InternalSessionRef; use crate::session::session_ref::OutboundRequest; use crate::session::state::SessionState; use crate::session::state::{AwaitingResendTransitionOutcome, TestRequestId}; -use crate::session_schedule::SessionSchedule; +use crate::session_schedule::{SessionPeriodComparison, SessionSchedule}; use event::SessionEvent; use hotfix_message::parsed_message::{InvalidReason, ParsedMessage}; use hotfix_message::session_fields::{ @@ -1052,10 +1052,10 @@ where .schedule .is_same_session_period(&self.store.creation_time(), &now) { - Ok(true) => { + Ok(SessionPeriodComparison::SamePeriod) => { // we are in the same period, nothing needs to be done } - Ok(false) => { + Ok(SessionPeriodComparison::DifferentPeriod) => { // the message store is for a previous session, // we need to terminate this session, reset the store, and reestablish the session self.logout_and_terminate("session period changed").await; @@ -1065,7 +1065,20 @@ where SessionState::new_disconnected(false, "unexpected error in reset"); } } + Ok(SessionPeriodComparison::OutsideSessionTime { .. }) => { + // the creation_time was recorded outside the session schedule, + // treat this similarly to a different period - reset the store + warn!("store creation time is outside session schedule, resetting store"); + self.logout_and_terminate("creation time outside schedule") + .await; + if let Err(err) = self.store.reset().await { + error!("error resetting session store: {err:}"); + self.state = + SessionState::new_disconnected(false, "unexpected error in reset"); + } + } Err(err) => { + // actual schedule calculation error (e.g., DST transition, date overflow) error!("error checking session period: {err:?}"); self.logout_and_terminate("internal error").await; } @@ -1373,6 +1386,8 @@ mod tests { #[tokio::test] async fn test_handle_schedule_check_active_different_period() { + use crate::session_schedule::SessionPeriodComparison; + // Use a Daily schedule that's currently active let schedule = create_active_schedule(); let writer = create_writer_ref(); @@ -1390,13 +1405,13 @@ mod tests { .schedule .is_same_session_period(&creation_time, &now); assert!( - matches!(same_period, Ok(false)), + matches!(same_period, Ok(SessionPeriodComparison::DifferentPeriod)), "Schedule should identify different periods" ); session.handle_schedule_check().await; - // Store reset should have been called (indicates Ok(false) branch was taken) + // Store reset should have been called (indicates DifferentPeriod branch was taken) // Note: logout_and_terminate disconnects the writer but state transition to // Disconnected happens asynchronously via event processing, not in this call assert!( @@ -1438,7 +1453,9 @@ mod tests { } #[tokio::test] - async fn test_handle_schedule_check_active_period_error() { + async fn test_handle_schedule_check_active_creation_time_outside_schedule() { + use crate::session_schedule::SessionPeriodComparison; + // Use a narrow schedule that's currently active but creation_time is outside let now = Utc::now(); let current_hour = now.time().hour(); @@ -1475,22 +1492,24 @@ mod tests { let mut session = create_test_session(schedule, state, store); - // Verify that is_same_session_period will return an error + // Verify that is_same_session_period returns OutsideSessionTime let same_period = session .schedule .is_same_session_period(&creation_time, &now); assert!( - same_period.is_err(), - "Schedule should return error when creation_time is outside active window" + matches!( + same_period, + Ok(SessionPeriodComparison::OutsideSessionTime { .. }) + ), + "Schedule should return OutsideSessionTime when creation_time is outside active window" ); session.handle_schedule_check().await; - // The Err branch calls logout_and_terminate which disconnects the writer. - // Store reset is NOT called in the Err branch, only in Ok(false). + // The OutsideSessionTime branch now triggers a store reset (same as DifferentPeriod) assert!( - !session.store.was_reset_called(), - "Store reset should not be called on period check error" + session.store.was_reset_called(), + "Store reset should be called when creation_time is outside schedule" ); } diff --git a/crates/hotfix/src/session/error.rs b/crates/hotfix/src/session/error.rs index b9b4202..9180d72 100644 --- a/crates/hotfix/src/session/error.rs +++ b/crates/hotfix/src/session/error.rs @@ -3,15 +3,10 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum SessionError { - #[error("schedule configuration is invalid: {0}")] - InvalidSchedule(String), - #[error("store operation failed")] Store(#[from] StoreError), } -pub type Result = std::result::Result; - #[derive(Debug, Error)] pub enum SessionCreationError { #[error("unsupported BeginString: {0}")] diff --git a/crates/hotfix/src/session_schedule.rs b/crates/hotfix/src/session_schedule.rs index 1178a84..c09e2f3 100644 --- a/crates/hotfix/src/session_schedule.rs +++ b/crates/hotfix/src/session_schedule.rs @@ -1,12 +1,38 @@ use crate::config::ScheduleConfig; -use crate::session::error::{SessionCreationError, SessionError}; +use crate::session::error::SessionCreationError; use chrono::{DateTime, Datelike, Days, NaiveDate, NaiveTime, TimeDelta, Utc, Weekday}; use chrono_tz::Tz; +use thiserror::Error; + +/// Result of comparing two times to determine if they fall within the same session period. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SessionPeriodComparison { + SamePeriod, + DifferentPeriod, + OutsideSessionTime { which: WhichTime }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum WhichTime { + First, + Second, + Both, +} + +#[derive(Debug, Error)] +pub enum ScheduleError { + #[error("ambiguous or missing time: {date} {time} in timezone {timezone} (DST transition)")] + AmbiguousOrMissingTime { + date: NaiveDate, + time: NaiveTime, + timezone: Tz, + }, -type Result = std::result::Result; + #[error("date calculation overflow: {context}")] + DateCalculationOverflow { context: String }, +} #[derive(Clone, Debug)] -#[allow(dead_code)] pub enum SessionSchedule { NonStop, Daily { @@ -20,6 +46,7 @@ pub enum SessionSchedule { weekdays: Vec, timezone: Tz, }, + #[allow(dead_code)] Weekly { start_day: Weekday, start_time: NaiveTime, @@ -54,21 +81,36 @@ impl SessionSchedule { } } - pub fn is_same_session_period(&self, dt1: &DateTime, dt2: &DateTime) -> Result { - if !self.is_active_at(dt1) || !self.is_active_at(dt2) { - return Err(SessionError::InvalidSchedule( - "Time doesn't fall in any session period".to_string(), - )); + pub fn is_same_session_period( + &self, + dt1: &DateTime, + dt2: &DateTime, + ) -> Result { + let dt1_active = self.is_active_at(dt1); + let dt2_active = self.is_active_at(dt2); + + if !dt1_active || !dt2_active { + let which = match (dt1_active, dt2_active) { + (false, false) => WhichTime::Both, + (false, true) => WhichTime::First, + (true, false) => WhichTime::Second, + (true, true) => unreachable!(), + }; + return Ok(SessionPeriodComparison::OutsideSessionTime { which }); } let (start, end) = self.get_session_bounds(dt1)?; - Ok(start <= *dt2 && *dt2 < end) + if start <= *dt2 && *dt2 < end { + Ok(SessionPeriodComparison::SamePeriod) + } else { + Ok(SessionPeriodComparison::DifferentPeriod) + } } fn get_session_bounds( &self, datetime: &DateTime, - ) -> Result<(DateTime, DateTime)> { + ) -> Result<(DateTime, DateTime), ScheduleError> { match self { SessionSchedule::NonStop => { Ok((DateTime::default(), Utc::now() + TimeDelta::weeks(1000))) @@ -204,13 +246,17 @@ impl TryFrom<&ScheduleConfig> for SessionSchedule { )); } - Ok(SessionSchedule::Weekly { + let _ = SessionSchedule::Weekly { start_day: *start_day, start_time: *start, end_day: *end_day, end_time: *end, timezone: timezone.unwrap_or(Tz::UTC), - }) + }; + + Err(SessionCreationError::InvalidSchedule( + "weekly sessions are not supported yet".to_string(), + )) } // Invalid combinations @@ -232,15 +278,19 @@ impl TryFrom> for SessionSchedule { } } -fn construct_utc(date: &NaiveDate, time: &NaiveTime, timezone: &Tz) -> Result> { - // TODO: do we want to handle Ambiguous and None outcomes? - // these variants correspond to Python's gap and fold: https://peps.python.org/pep-0495/#terminology +fn construct_utc( + date: &NaiveDate, + time: &NaiveTime, + timezone: &Tz, +) -> Result, ScheduleError> { if let Some(dt) = date.and_time(*time).and_local_timezone(*timezone).single() { Ok(dt.to_utc()) } else { - Err(SessionError::InvalidSchedule( - "Invalid schedule configuration: invalid time".to_string(), - )) + Err(ScheduleError::AmbiguousOrMissingTime { + date: *date, + time: *time, + timezone: *timezone, + }) } } @@ -249,7 +299,7 @@ fn calculate_single_day_session_bounds( start_time: &NaiveTime, end_time: &NaiveTime, timezone: &Tz, -) -> Result<(DateTime, DateTime)> { +) -> Result<(DateTime, DateTime), ScheduleError> { let local_datetime = datetime.with_timezone(timezone); if local_datetime.time() >= *start_time { @@ -261,7 +311,9 @@ fn calculate_single_day_session_bounds( local_datetime .date_naive() .checked_add_days(Days::new(1)) - .ok_or_else(|| SessionError::InvalidSchedule("Failed to add day".to_string()))? + .ok_or_else(|| ScheduleError::DateCalculationOverflow { + context: "failed to add day for end date".to_string(), + })? } else { local_datetime.date_naive() }; @@ -272,8 +324,8 @@ fn calculate_single_day_session_bounds( let start_date = local_datetime .date_naive() .checked_sub_days(Days::new(1)) - .ok_or_else(|| { - SessionError::InvalidSchedule("Failed to get previous day".to_string()) + .ok_or_else(|| ScheduleError::DateCalculationOverflow { + context: "failed to get previous day for start date".to_string(), })?; let start = construct_utc(&start_date, start_time, timezone)?; let end = construct_utc(&local_datetime.date_naive(), end_time, timezone)?; @@ -981,7 +1033,7 @@ mod tests { } #[test] - fn test_into_weekly_session() { + fn test_into_weekly_session_not_supported() { let config = ScheduleConfig { start_time: Some(NaiveTime::from_hms_opt(18, 0, 0).unwrap()), end_time: Some(NaiveTime::from_hms_opt(17, 0, 0).unwrap()), @@ -991,27 +1043,18 @@ mod tests { timezone: None, }; - let schedule = SessionSchedule::try_from(&config).unwrap(); - match schedule { - SessionSchedule::Weekly { - start_day, - start_time, - end_day, - end_time, - timezone, - } => { - assert_eq!(start_day, Weekday::Sun); - assert_eq!(start_time, NaiveTime::from_hms_opt(18, 0, 0).unwrap()); - assert_eq!(end_day, Weekday::Fri); - assert_eq!(end_time, NaiveTime::from_hms_opt(17, 0, 0).unwrap()); - assert_eq!(timezone, Tz::UTC); + let result = SessionSchedule::try_from(&config); + assert!(result.is_err()); + match result.unwrap_err() { + SessionCreationError::InvalidSchedule(msg) => { + assert!(msg.contains("weekly sessions are not supported yet")); } - _ => panic!("Expected Weekly schedule"), + other => panic!("unexpected error: {other}"), } } #[test] - fn test_into_weekly_session_with_equal_times_is_still_weekly() { + fn test_into_weekly_session_with_equal_times_not_supported() { let time = NaiveTime::from_hms_opt(12, 0, 0).unwrap(); let config = ScheduleConfig { start_time: Some(time), @@ -1022,8 +1065,14 @@ mod tests { timezone: None, }; - let schedule = SessionSchedule::try_from(&config).unwrap(); - assert!(matches!(schedule, SessionSchedule::Weekly { .. })); + let result = SessionSchedule::try_from(&config); + assert!(result.is_err()); + match result.unwrap_err() { + SessionCreationError::InvalidSchedule(msg) => { + assert!(msg.contains("weekly sessions are not supported yet")); + } + other => panic!("unexpected error: {other}"), + } } #[test] @@ -1041,7 +1090,7 @@ mod tests { assert!(result.is_err()); match result.unwrap_err() { SessionCreationError::InvalidSchedule(msg) => { - assert!(msg.contains("Weekly sessions cannot have weekdays specified")); + assert!(msg.contains("weekly sessions cannot have weekdays specified")); } other => panic!("unexpected error: {other}"), } @@ -1153,7 +1202,10 @@ mod tests { let dt2 = DateTime::parse_from_rfc3339("2026-01-15T23:30:00-05:00") .unwrap() .to_utc(); - assert!(schedule.is_same_session_period(&dt1, &dt2).unwrap()); + assert_eq!( + schedule.is_same_session_period(&dt1, &dt2).unwrap(), + SessionPeriodComparison::SamePeriod + ); } #[test] @@ -1164,7 +1216,7 @@ mod tests { timezone: Tz::UTC, }; - // two times within the same session period return true + // two times within the same session period return SamePeriod let dt1 = DateTime::from_naive_utc_and_offset( NaiveDate::from_ymd_opt(2025, 6, 27) .unwrap() @@ -1179,10 +1231,16 @@ mod tests { .unwrap(), Utc, ); - assert!(schedule.is_same_session_period(&dt1, &dt2).unwrap()); - assert!(schedule.is_same_session_period(&dt2, &dt1).unwrap()); + assert_eq!( + schedule.is_same_session_period(&dt1, &dt2).unwrap(), + SessionPeriodComparison::SamePeriod + ); + assert_eq!( + schedule.is_same_session_period(&dt2, &dt1).unwrap(), + SessionPeriodComparison::SamePeriod + ); - // time for the next session period returns false + // time for the next session period returns DifferentPeriod let dt3 = DateTime::from_naive_utc_and_offset( NaiveDate::from_ymd_opt(2025, 6, 28) .unwrap() @@ -1190,10 +1248,16 @@ mod tests { .unwrap(), Utc, ); - assert!(!schedule.is_same_session_period(&dt1, &dt3).unwrap()); - assert!(!schedule.is_same_session_period(&dt3, &dt1).unwrap()); + assert_eq!( + schedule.is_same_session_period(&dt1, &dt3).unwrap(), + SessionPeriodComparison::DifferentPeriod + ); + assert_eq!( + schedule.is_same_session_period(&dt3, &dt1).unwrap(), + SessionPeriodComparison::DifferentPeriod + ); - // time on the same day but outside session time returns error + // time on the same day but outside session time returns OutsideSessionTime let dt4 = DateTime::from_naive_utc_and_offset( NaiveDate::from_ymd_opt(2025, 6, 27) .unwrap() @@ -1201,8 +1265,18 @@ mod tests { .unwrap(), Utc, ); - assert!(schedule.is_same_session_period(&dt1, &dt4).is_err()); - assert!(schedule.is_same_session_period(&dt4, &dt1).is_err()); + assert!(matches!( + schedule.is_same_session_period(&dt1, &dt4).unwrap(), + SessionPeriodComparison::OutsideSessionTime { + which: WhichTime::Second + } + )); + assert!(matches!( + schedule.is_same_session_period(&dt4, &dt1).unwrap(), + SessionPeriodComparison::OutsideSessionTime { + which: WhichTime::First + } + )); } #[test] @@ -1220,7 +1294,10 @@ mod tests { let dt2 = DateTime::parse_from_rfc3339("2025-01-15T22:45:00-05:00") .unwrap() .to_utc(); - assert!(schedule.is_same_session_period(&dt1, &dt2).unwrap()); + assert_eq!( + schedule.is_same_session_period(&dt1, &dt2).unwrap(), + SessionPeriodComparison::SamePeriod + ); // different session periods on consecutive days let dt3 = DateTime::parse_from_rfc3339("2024-01-15T22:30:00-05:00") @@ -1229,7 +1306,10 @@ mod tests { let dt4 = DateTime::parse_from_rfc3339("2024-01-16T02:30:00-05:00") .unwrap() .to_utc(); - assert!(!schedule.is_same_session_period(&dt3, &dt4).unwrap()); + assert_eq!( + schedule.is_same_session_period(&dt3, &dt4).unwrap(), + SessionPeriodComparison::DifferentPeriod + ); // session boundary testing - end of session vs start of next session let dt5 = DateTime::parse_from_rfc3339("2024-01-15T22:59:59-05:00") @@ -1238,7 +1318,10 @@ mod tests { let dt6 = DateTime::parse_from_rfc3339("2024-01-16T01:00:01-05:00") .unwrap() .to_utc(); - assert!(!schedule.is_same_session_period(&dt5, &dt6).unwrap()); + assert_eq!( + schedule.is_same_session_period(&dt5, &dt6).unwrap(), + SessionPeriodComparison::DifferentPeriod + ); // time that doesn't fall into any session period let dt7 = DateTime::parse_from_rfc3339("2024-01-15T23:30:00-05:00") @@ -1247,7 +1330,12 @@ mod tests { let dt8 = DateTime::parse_from_rfc3339("2024-01-15T10:00:00-05:00") .unwrap() .to_utc(); - assert!(schedule.is_same_session_period(&dt7, &dt8).is_err()); + assert!(matches!( + schedule.is_same_session_period(&dt7, &dt8).unwrap(), + SessionPeriodComparison::OutsideSessionTime { + which: WhichTime::First + } + )); } #[test] @@ -1266,8 +1354,14 @@ mod tests { let dt2 = DateTime::parse_from_rfc3339("2025-01-16T00:45:00-05:00") .unwrap() .to_utc(); - assert!(schedule.is_same_session_period(&dt1, &dt2).unwrap()); - assert!(schedule.is_same_session_period(&dt2, &dt1).unwrap()); + assert_eq!( + schedule.is_same_session_period(&dt1, &dt2).unwrap(), + SessionPeriodComparison::SamePeriod + ); + assert_eq!( + schedule.is_same_session_period(&dt2, &dt1).unwrap(), + SessionPeriodComparison::SamePeriod + ); // different session period on the same day let dt1 = DateTime::parse_from_rfc3339("2025-01-15T15:30:00-05:00") @@ -1276,8 +1370,14 @@ mod tests { let dt2 = DateTime::parse_from_rfc3339("2025-01-15T00:45:00-05:00") .unwrap() .to_utc(); - assert!(!schedule.is_same_session_period(&dt1, &dt2).unwrap()); - assert!(!schedule.is_same_session_period(&dt2, &dt1).unwrap()); + assert_eq!( + schedule.is_same_session_period(&dt1, &dt2).unwrap(), + SessionPeriodComparison::DifferentPeriod + ); + assert_eq!( + schedule.is_same_session_period(&dt2, &dt1).unwrap(), + SessionPeriodComparison::DifferentPeriod + ); } #[test] @@ -1296,7 +1396,7 @@ mod tests { }; // 10/07/2025 is a Thursday - // two times within the same session period return true + // two times within the same session period return SamePeriod let dt1 = DateTime::from_naive_utc_and_offset( NaiveDate::from_ymd_opt(2025, 7, 10) .unwrap() @@ -1311,10 +1411,16 @@ mod tests { .unwrap(), Utc, ); - assert!(schedule.is_same_session_period(&dt1, &dt2).unwrap()); - assert!(schedule.is_same_session_period(&dt2, &dt1).unwrap()); + assert_eq!( + schedule.is_same_session_period(&dt1, &dt2).unwrap(), + SessionPeriodComparison::SamePeriod + ); + assert_eq!( + schedule.is_same_session_period(&dt2, &dt1).unwrap(), + SessionPeriodComparison::SamePeriod + ); - // time for the next session period returns false + // time for the next session period returns DifferentPeriod let dt3 = DateTime::from_naive_utc_and_offset( NaiveDate::from_ymd_opt(2025, 7, 11) .unwrap() @@ -1322,10 +1428,16 @@ mod tests { .unwrap(), Utc, ); - assert!(!schedule.is_same_session_period(&dt1, &dt3).unwrap()); - assert!(!schedule.is_same_session_period(&dt3, &dt1).unwrap()); + assert_eq!( + schedule.is_same_session_period(&dt1, &dt3).unwrap(), + SessionPeriodComparison::DifferentPeriod + ); + assert_eq!( + schedule.is_same_session_period(&dt3, &dt1).unwrap(), + SessionPeriodComparison::DifferentPeriod + ); - // time on the same day but outside session time returns error + // time on the same day but outside session time returns OutsideSessionTime let dt4 = DateTime::from_naive_utc_and_offset( NaiveDate::from_ymd_opt(2025, 7, 10) .unwrap() @@ -1333,19 +1445,39 @@ mod tests { .unwrap(), Utc, ); - assert!(schedule.is_same_session_period(&dt1, &dt4).is_err()); - assert!(schedule.is_same_session_period(&dt4, &dt1).is_err()); + assert!(matches!( + schedule.is_same_session_period(&dt1, &dt4).unwrap(), + SessionPeriodComparison::OutsideSessionTime { + which: WhichTime::Second + } + )); + assert!(matches!( + schedule.is_same_session_period(&dt4, &dt1).unwrap(), + SessionPeriodComparison::OutsideSessionTime { + which: WhichTime::First + } + )); - // time falls on the Saturday (outside session period) returns error - let dt4 = DateTime::from_naive_utc_and_offset( + // time falls on the Saturday (outside session period) returns OutsideSessionTime + let dt5 = DateTime::from_naive_utc_and_offset( NaiveDate::from_ymd_opt(2025, 7, 12) .unwrap() .and_hms_opt(13, 0, 0) .unwrap(), Utc, ); - assert!(schedule.is_same_session_period(&dt1, &dt4).is_err()); - assert!(schedule.is_same_session_period(&dt4, &dt1).is_err()); + assert!(matches!( + schedule.is_same_session_period(&dt1, &dt5).unwrap(), + SessionPeriodComparison::OutsideSessionTime { + which: WhichTime::Second + } + )); + assert!(matches!( + schedule.is_same_session_period(&dt5, &dt1).unwrap(), + SessionPeriodComparison::OutsideSessionTime { + which: WhichTime::First + } + )); } #[test] @@ -1357,7 +1489,10 @@ mod tests { let result = construct_utc(&date, &time, &timezone); - assert!(result.is_err()); + assert!(matches!( + result, + Err(ScheduleError::AmbiguousOrMissingTime { .. }) + )); } #[test] @@ -1369,6 +1504,9 @@ mod tests { let result = construct_utc(&date, &time, &timezone); - assert!(result.is_err()); + assert!(matches!( + result, + Err(ScheduleError::AmbiguousOrMissingTime { .. }) + )); } } From 6698e5222a8cecc63f6ab4c855566d738db07236 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Wed, 4 Feb 2026 12:59:29 +0100 Subject: [PATCH 03/11] Expect message sequence numbers once message has passed validation --- crates/hotfix/src/session.rs | 57 ++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index 738f6d0..4502b67 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -14,7 +14,7 @@ use crate::message::parser::RawFixMessage; use crate::message::{InboundMessage, generate_message}; use crate::store::MessageStore; use crate::transport::writer::WriterRef; -use anyhow::{Context, Result, anyhow}; +use anyhow::{Context, Result}; use chrono::Utc; use hotfix_message::dict::Dictionary; use hotfix_message::message::{Config as MessageConfig, Message}; @@ -190,10 +190,7 @@ where let message_type = message.header().get(MSG_TYPE)?; if let SessionState::AwaitingResend(state) = &mut self.state { - let seq_number: u64 = message - .header() - .get(MSG_SEQ_NUM) - .map_err(|e| anyhow!("failed to get seq number: {:?}", e))?; + let seq_number = get_msg_seq_num(&message); if seq_number > state.end_seq_number { state.inbound_queue.push_back(message); return Ok(()); @@ -411,14 +408,9 @@ where let begin_seq_number: u64 = match message.get(BEGIN_SEQ_NO) { Ok(seq_number) => seq_number, Err(_) => { - let reject = Reject::new( - message - .header() - .get(MSG_SEQ_NUM) - .map_err(|_| anyhow!("failed to get seq number"))?, - ) - .session_reject_reason(SessionRejectReason::RequiredTagMissing) - .text("missing begin sequence number for resend request"); + let reject = Reject::new(get_msg_seq_num(message)) + .session_reject_reason(SessionRejectReason::RequiredTagMissing) + .text("missing begin sequence number for resend request"); self.send_message(reject) .await .context("failed to send reject for invalid resend request")?; @@ -436,14 +428,9 @@ where } } Err(_) => { - let reject = Reject::new( - message - .header() - .get(MSG_SEQ_NUM) - .map_err(|_| anyhow!("failed to get seq number"))?, - ) - .session_reject_reason(SessionRejectReason::RequiredTagMissing) - .text("missing end sequence number for resend request"); + let reject = Reject::new(get_msg_seq_num(message)) + .session_reject_reason(SessionRejectReason::RequiredTagMissing) + .text("missing end sequence number for resend request"); self.send_message(reject) .await .context("failed to send reject for invalid resend request")?; @@ -474,10 +461,7 @@ where } async fn on_sequence_reset(&mut self, message: &Message) -> Result<()> { - let msg_seq_num = message - .header() - .get(MSG_SEQ_NUM) - .map_err(|_| anyhow!("failed to get seq number"))?; + let msg_seq_num = get_msg_seq_num(message); let is_gap_fill: bool = message.get(GAP_FILL_FLAG).unwrap_or(false); if let Err(err) = self.verify_message(message, is_gap_fill) { self.handle_verification_error(err).await?; @@ -727,12 +711,7 @@ where .build(msg.as_slice()) .into_message() .with_context(|| format!("failed to build message for raw message: {msg:?}"))?; - sequence_number = message.header().get::(MSG_SEQ_NUM).map_err(|e| { - anyhow!( - "sequence number in message to resend is unexpectedly missing: {:?}", - e - ) - })?; + sequence_number = get_msg_seq_num(&message); let message_type: String = message .header() .get::<&str>(MSG_TYPE) @@ -1107,6 +1086,22 @@ where } } +/// Extracts MsgSeqNum from a message header. +/// +/// To be removed once https://github.com/Validus-Risk-Management/hotfix/issues/301 +/// is implemented. +/// +/// # Panics +/// Panics if the message does not contain a valid MsgSeqNum field. +/// This should never happen for messages that have passed validation. +#[allow(clippy::expect_used)] +fn get_msg_seq_num(message: &Message) -> u64 { + message + .header() + .get(MSG_SEQ_NUM) + .expect("MsgSeqNum missing from validated message - parser bug") +} + async fn run_session( mut session: Session, mut event_receiver: mpsc::Receiver, From 1dd8dbc710021cf01653f5e82fb7c9b7eb25b498 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Wed, 4 Feb 2026 15:32:19 +0100 Subject: [PATCH 04/11] Introduce new internal error types in session code --- crates/hotfix/src/session.rs | 249 +++++++++++++------- crates/hotfix/src/session/error.rs | 65 ++++- crates/hotfix/src/session/session_handle.rs | 2 +- 3 files changed, 234 insertions(+), 82 deletions(-) diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index 4502b67..0be0355 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -12,9 +12,9 @@ use crate::message::heartbeat::Heartbeat; use crate::message::logon::{Logon, ResetSeqNumConfig}; use crate::message::parser::RawFixMessage; use crate::message::{InboundMessage, generate_message}; +use crate::session::error::{InternalSendError, SessionOperationError}; use crate::store::MessageStore; use crate::transport::writer::WriterRef; -use anyhow::{Context, Result}; use chrono::Utc; use hotfix_message::dict::Dictionary; use hotfix_message::message::{Config as MessageConfig, Message}; @@ -118,7 +118,10 @@ where } } - async fn on_incoming(&mut self, raw_message: RawFixMessage) -> Result<()> { + async fn on_incoming( + &mut self, + raw_message: RawFixMessage, + ) -> Result<(), SessionOperationError> { debug!("received message: {}", raw_message); if !self.state.is_expecting_test_response() { // if we are not awaiting a specific test response, any message can reset the timer @@ -144,9 +147,12 @@ where let reject = Reject::new(msg_seq_num) .session_reject_reason(SessionRejectReason::InvalidTagNumber) .text(&format!("invalid field {tag}")); - self.send_message(reject) - .await - .context("failed to send reject")?; + self.send_message(reject).await.map_err(|e| { + SessionOperationError::Send { + source: e, + context: "reject for invalid field", + } + })?; } Err(err) => { error!("failed to get message seq num: {:?}", err); @@ -168,9 +174,12 @@ where SessionRejectReason::RepeatingGroupFieldsOutOfOrder, ) .text(&format!("field appears in incorrect order:{tag}")); - self.send_message(reject) - .await - .context("failed to send reject")?; + self.send_message(reject).await.map_err(|e| { + SessionOperationError::Send { + source: e, + context: "reject for invalid group order", + } + })?; } Err(err) => { error!("failed to get message seq num: {:?}", err); @@ -186,8 +195,11 @@ where Ok(()) } - async fn process_message(&mut self, message: Message) -> Result<()> { - let message_type = message.header().get(MSG_TYPE)?; + async fn process_message(&mut self, message: Message) -> Result<(), SessionOperationError> { + let message_type: &str = message + .header() + .get(MSG_TYPE) + .map_err(|_| SessionOperationError::MissingField("MSG_TYPE"))?; if let SessionState::AwaitingResend(state) = &mut self.state { let seq_number = get_msg_seq_num(&message); @@ -233,7 +245,10 @@ where Ok(()) } - async fn process_app_message(&mut self, message: &Message) -> Result<()> { + async fn process_app_message( + &mut self, + message: &Message, + ) -> Result<(), SessionOperationError> { match self.verify_message(message, true) { Ok(_) => { let parsed_message = Inbound::parse(message); @@ -244,18 +259,18 @@ where error!("failed to send inbound message to application"); self.state.disconnect_writer().await; } - self.store.increment_target_seq_number().await?; + self.store + .increment_target_seq_number() + .await + .map_err(SessionOperationError::Store)?; } - Err(err) => self - .handle_verification_error(err) - .await - .context("failed to handle verification error")?, + Err(err) => self.handle_verification_error(err).await?, } Ok(()) } - async fn check_end_of_resend(&mut self) -> Result<()> { + async fn check_end_of_resend(&mut self) -> Result<(), SessionOperationError> { let ended_state = if let SessionState::AwaitingResend(state) = &mut self.state { if self.store.next_target_seq_number() > state.end_seq_number { let new_state = @@ -299,7 +314,7 @@ where verify_message(message, &self.config, expected_seq_number) } - async fn on_connect(&mut self, writer: WriterRef) -> Result<()> { + async fn on_connect(&mut self, writer: WriterRef) -> Result<(), SessionOperationError> { self.state = SessionState::AwaitingLogon { writer, logon_sent: false, @@ -328,7 +343,7 @@ where } } - async fn on_logon(&mut self, message: &Message) -> Result<()> { + async fn on_logon(&mut self, message: &Message) -> Result<(), SessionOperationError> { if let SessionState::AwaitingLogon { writer, .. } = &self.state { match self.verify_message(message, true) { Ok(_) => { @@ -336,12 +351,12 @@ where self.state = SessionState::new_active(writer.clone(), self.config.heartbeat_interval); self.application.on_logon().await; - self.store.increment_target_seq_number().await?; + self.store + .increment_target_seq_number() + .await + .map_err(SessionOperationError::Store)?; } - Err(err) => self - .handle_verification_error(err) - .await - .context("failed to handle verification error")?, + Err(err) => self.handle_verification_error(err).await?, } } else { error!("received unexpected logon message"); @@ -350,7 +365,7 @@ where Ok(()) } - async fn on_logout(&mut self) -> Result<()> { + async fn on_logout(&mut self) -> Result<(), SessionOperationError> { if self.state.is_logged_on() { self.send_logout("Logout acknowledged").await?; } @@ -367,11 +382,14 @@ where } } - self.store.increment_target_seq_number().await?; + self.store + .increment_target_seq_number() + .await + .map_err(SessionOperationError::Store)?; Ok(()) } - async fn on_heartbeat(&mut self, message: &Message) -> Result<()> { + async fn on_heartbeat(&mut self, message: &Message) -> Result<(), SessionOperationError> { if let (Some(expected_req_id), Ok(message_req_id)) = ( &self.state.expected_test_response_id(), message.get::<&str>(TEST_REQ_ID), @@ -381,26 +399,35 @@ where self.reset_peer_timer(None); } - self.store.increment_target_seq_number().await?; + self.store + .increment_target_seq_number() + .await + .map_err(SessionOperationError::Store)?; Ok(()) } - async fn on_test_request(&mut self, message: &Message) -> Result<()> { + async fn on_test_request(&mut self, message: &Message) -> Result<(), SessionOperationError> { let req_id: &str = message.get(TEST_REQ_ID).unwrap_or_else(|_| { // TODO: send reject? todo!() }); - self.store.increment_target_seq_number().await?; + self.store + .increment_target_seq_number() + .await + .map_err(SessionOperationError::Store)?; self.send_message(Heartbeat::for_request(req_id.to_string())) .await - .context("failed to send heartbeat in response to test request")?; + .map_err(|e| SessionOperationError::Send { + source: e, + context: "heartbeat response", + })?; Ok(()) } - async fn on_resend_request(&mut self, message: &Message) -> Result<()> { + async fn on_resend_request(&mut self, message: &Message) -> Result<(), SessionOperationError> { if !self.state.is_connected() { warn!("received resend request while disconnected, ignoring"); } @@ -413,7 +440,10 @@ where .text("missing begin sequence number for resend request"); self.send_message(reject) .await - .context("failed to send reject for invalid resend request")?; + .map_err(|e| SessionOperationError::Send { + source: e, + context: "reject for missing BEGIN_SEQ_NO", + })?; return Ok(()); } }; @@ -433,12 +463,18 @@ where .text("missing end sequence number for resend request"); self.send_message(reject) .await - .context("failed to send reject for invalid resend request")?; + .map_err(|e| SessionOperationError::Send { + source: e, + context: "reject for missing END_SEQ_NO", + })?; return Ok(()); } }; - self.store.increment_target_seq_number().await?; + self.store + .increment_target_seq_number() + .await + .map_err(SessionOperationError::Store)?; self.resend_messages(begin_seq_number, end_seq_number, message) .await?; @@ -450,17 +486,20 @@ where /// /// Returns whether the message should be processed as usual /// and whether the target sequence number should be incremented. - async fn on_reject(&mut self, message: &Message) -> Result<()> { + async fn on_reject(&mut self, message: &Message) -> Result<(), SessionOperationError> { if let Ok(seq_num) = message.get::(MSG_SEQ_NUM) && seq_num == self.store.next_target_seq_number() { - self.store.increment_target_seq_number().await?; + self.store + .increment_target_seq_number() + .await + .map_err(SessionOperationError::Store)?; } Ok(()) } - async fn on_sequence_reset(&mut self, message: &Message) -> Result<()> { + async fn on_sequence_reset(&mut self, message: &Message) -> Result<(), SessionOperationError> { let msg_seq_num = get_msg_seq_num(message); let is_gap_fill: bool = message.get(GAP_FILL_FLAG).unwrap_or(false); if let Err(err) = self.verify_message(message, is_gap_fill) { @@ -478,9 +517,12 @@ where let reject = Reject::new(msg_seq_num) .session_reject_reason(SessionRejectReason::RequiredTagMissing) .text("missing NewSeqNo tag in sequence reset message"); - self.send_message(reject).await.context( - "failed to send reject message in response to invalid sequence reset message", - )?; + self.send_message(reject) + .await + .map_err(|e| SessionOperationError::Send { + source: e, + context: "reject for missing NEW_SEQ_NO", + })?; // note: we don't increment the target seq number here // this is an ambiguous case in the specification, but leaving the @@ -500,17 +542,26 @@ where let reject = Reject::new(msg_seq_num) .session_reject_reason(SessionRejectReason::ValueIsIncorrect) .text(&text); - self.send_message(reject).await.context( - "failed to send reject message in response to invalid sequence reset message", - )?; + self.send_message(reject) + .await + .map_err(|e| SessionOperationError::Send { + source: e, + context: "reject for invalid sequence reset", + })?; return Ok(()); } - self.store.set_target_seq_number(end - 1).await?; + self.store + .set_target_seq_number(end - 1) + .await + .map_err(SessionOperationError::Store)?; Ok(()) } - async fn handle_verification_error(&mut self, error: MessageVerificationError) -> Result<()> { + async fn handle_verification_error( + &mut self, + error: MessageVerificationError, + ) -> Result<(), SessionOperationError> { match error { MessageVerificationError::SeqNumberTooLow { expected, @@ -607,7 +658,11 @@ where self.state = SessionState::new_disconnected(false, &reason); } - async fn handle_sequence_number_too_high(&mut self, expected: u64, actual: u64) -> Result<()> { + async fn handle_sequence_number_too_high( + &mut self, + expected: u64, + actual: u64, + ) -> Result<(), SessionOperationError> { match self .state .try_transition_to_awaiting_resend(expected, actual) @@ -616,9 +671,7 @@ where debug!( "we are behind target (ours: {expected}, theirs: {actual}), requesting resend." ); - self.send_resend_request(expected, actual) - .await - .context("failed to send resend request")?; + self.send_resend_request(expected, actual).await?; } AwaitingResendTransitionOutcome::InvalidState(reason) => { error!("failed to request resend: {reason}"); @@ -691,13 +744,18 @@ where }; } - async fn resend_messages(&mut self, begin: u64, end: u64, _message: &Message) -> Result<()> { + async fn resend_messages( + &mut self, + begin: u64, + end: u64, + _message: &Message, + ) -> Result<(), SessionOperationError> { info!(begin, end, "resending messages as requested"); let messages = self .store .get_slice(begin as usize, end as usize) .await - .context("failed to retrieve messages from store")?; + .map_err(SessionOperationError::Store)?; let no = messages.len(); debug!(number_of_messages = no, "number of messages"); @@ -710,12 +768,16 @@ where .message_builder .build(msg.as_slice()) .into_message() - .with_context(|| format!("failed to build message for raw message: {msg:?}"))?; + .ok_or_else(|| { + SessionOperationError::StoredMessageParse(format!( + "failed to build message for raw message: {msg:?}" + )) + })?; sequence_number = get_msg_seq_num(&message); let message_type: String = message .header() .get::<&str>(MSG_TYPE) - .context("message type in message to resend is unexpectedly missing")? + .map_err(|_| SessionOperationError::MissingField("MSG_TYPE"))? .to_string(); if is_admin(message_type.as_str()) { @@ -728,9 +790,7 @@ where if let Some(begin) = reset_start { let end = sequence_number; Self::log_skipped_admin_messages(begin, end); - self.send_sequence_reset(begin, end) - .await - .context("failed to send sequence reset")?; + self.send_sequence_reset(begin, end).await?; reset_start = None; } @@ -744,7 +804,7 @@ where message_type.as_bytes(), message .encode(&self.message_config) - .context("failed to encode message")?, + .map_err(SessionOperationError::MessageEncoding)?, ) .await; @@ -759,9 +819,7 @@ where // the final reset if needed let end = sequence_number; Self::log_skipped_admin_messages(begin, end); - self.send_sequence_reset(begin, end) - .await - .context("failed to send sequence reset")?; + self.send_sequence_reset(begin, end).await?; } Ok(()) @@ -791,7 +849,7 @@ where match self.application.on_outbound_message(&message).await { OutboundDecision::Send => { - let sequence_number = self.send_message(message).await?; + let sequence_number = self.send_message(message).await.map_err(SendError::from)?; Ok(SendOutcome::Sent { sequence_number }) } OutboundDecision::Drop => { @@ -806,7 +864,10 @@ where } } - async fn send_message(&mut self, message: impl OutboundMessage) -> Result { + async fn send_message( + &mut self, + message: impl OutboundMessage, + ) -> Result { let seq_num = self.store.next_sender_seq_number(); let msg_type = message.message_type().as_bytes().to_vec(); let msg = generate_message( @@ -817,7 +878,7 @@ where message, ) .map_err(|e| { - SendError::Persist(crate::store::StoreError::PersistMessage { + InternalSendError::Persist(crate::store::StoreError::PersistMessage { sequence_number: seq_num, source: e.into(), }) @@ -826,12 +887,12 @@ where self.store .increment_sender_seq_number() .await - .map_err(SendError::SequenceNumber)?; + .map_err(InternalSendError::SequenceNumber)?; self.store .add(seq_num, &msg) .await - .map_err(SendError::Persist)?; + .map_err(InternalSendError::Persist)?; self.send_raw(&msg_type, msg).await; @@ -845,7 +906,11 @@ where self.reset_heartbeat_timer(); } - async fn send_sequence_reset(&mut self, begin: u64, end: u64) -> Result<()> { + async fn send_sequence_reset( + &mut self, + begin: u64, + end: u64, + ) -> Result<(), SessionOperationError> { let sequence_reset = SequenceReset { gap_fill: true, new_seq_no: end, @@ -857,7 +922,7 @@ where begin, sequence_reset, ) - .context("failed to generate message")?; + .map_err(SessionOperationError::MessageEncoding)?; self.send_raw(b"4", raw_message).await; debug!(begin, end, "sent reset sequence"); @@ -865,15 +930,27 @@ where Ok(()) } - async fn send_resend_request(&mut self, begin: u64, end: u64) -> Result<()> { + async fn send_resend_request( + &mut self, + begin: u64, + end: u64, + ) -> Result<(), SessionOperationError> { let request = ResendRequest::new(begin, end); - self.send_message(request).await.map(|_| ())?; - Ok(()) + self.send_message(request) + .await + .map(|_| ()) + .map_err(|e| SessionOperationError::Send { + source: e, + context: "resend request", + }) } - async fn send_logon(&mut self) -> Result<()> { + async fn send_logon(&mut self) -> Result<(), SessionOperationError> { let reset_config = if self.config.reset_on_logon || self.reset_on_next_logon { - self.store.reset().await?; + self.store + .reset() + .await + .map_err(SessionOperationError::Store)?; ResetSeqNumConfig::Reset } else { ResetSeqNumConfig::NoReset(Some(self.store.next_target_seq_number())) @@ -882,14 +959,24 @@ where let logon = Logon::new(self.config.heartbeat_interval, reset_config); - self.send_message(logon).await.map(|_| ())?; - Ok(()) + self.send_message(logon) + .await + .map(|_| ()) + .map_err(|e| SessionOperationError::Send { + source: e, + context: "logon", + }) } - async fn send_logout(&mut self, reason: &str) -> Result<()> { + async fn send_logout(&mut self, reason: &str) -> Result<(), SessionOperationError> { let logout = Logout::with_reason(reason.to_string()); - self.send_message(logout).await.map(|_| ())?; - Ok(()) + self.send_message(logout) + .await + .map(|_| ()) + .map_err(|e| SessionOperationError::Send { + source: e, + context: "logout", + }) } /// Sends a logout message and immediately disconnects the counterparty. @@ -911,7 +998,11 @@ where /// The session waits for a configurable timeout period for the counterparty to /// respond with a `Logout` message. If no response is received within the timeout /// period, it disconnects the counterparty. - async fn initiate_graceful_logout(&mut self, reason: &str, reconnect: bool) -> Result<()> { + async fn initiate_graceful_logout( + &mut self, + reason: &str, + reconnect: bool, + ) -> Result<(), SessionOperationError> { if self.state.try_transition_to_awaiting_logout( Duration::from_secs(self.config.logout_timeout), reconnect, diff --git a/crates/hotfix/src/session/error.rs b/crates/hotfix/src/session/error.rs index 9180d72..5643f03 100644 --- a/crates/hotfix/src/session/error.rs +++ b/crates/hotfix/src/session/error.rs @@ -1,4 +1,5 @@ use crate::store::StoreError; +use hotfix_message::error::EncodingError; use thiserror::Error; #[derive(Debug, Error)] @@ -46,6 +47,66 @@ pub enum SendError { #[error("session terminated by application")] SessionTerminated, - #[error("confirmation channel closed")] - ConfirmationLost, + /// The session task is no longer running. + #[error("session is no longer available")] + SessionGone, +} + +/// Error that can occur when sending a message internally within the session. +/// +/// This is a subset of `SendError` without `SessionTerminated` and `SessionGone`, +/// which only make sense in the context of the public API. +#[derive(Debug, Error)] +pub(crate) enum InternalSendError { + /// The session is disconnected. Currently unused as internal sends don't + /// check connection state, but kept for symmetry with `SendError`. + #[allow(dead_code)] + #[error("session is disconnected")] + Disconnected, + + #[error("failed to persist message")] + Persist(#[source] StoreError), + + #[error("failed to update sequence number")] + SequenceNumber(#[source] StoreError), +} + +impl From for SendError { + fn from(err: InternalSendError) -> Self { + match err { + InternalSendError::Disconnected => SendError::Disconnected, + InternalSendError::Persist(e) => SendError::Persist(e), + InternalSendError::SequenceNumber(e) => SendError::SequenceNumber(e), + } + } +} + +/// Error that can occur during internal session operations. +/// +/// This replaces anyhow::Context wrapping with structured error variants. +#[derive(Debug, Error)] +pub(crate) enum SessionOperationError { + /// Failed to send a message. + #[error("failed to send {context}")] + Send { + #[source] + source: InternalSendError, + context: &'static str, + }, + + /// A store operation failed. + #[error("store operation failed")] + Store(#[source] StoreError), + + /// Failed to encode a message. + #[error("failed to encode message")] + MessageEncoding(#[source] EncodingError), + + /// Failed to parse a stored message. + #[error("failed to parse stored message: {0}")] + StoredMessageParse(String), + + /// A required field was missing from a message. + #[error("missing required field: {0}")] + MissingField(&'static str), } diff --git a/crates/hotfix/src/session/session_handle.rs b/crates/hotfix/src/session/session_handle.rs index f4a37e9..bb5ba38 100644 --- a/crates/hotfix/src/session/session_handle.rs +++ b/crates/hotfix/src/session/session_handle.rs @@ -41,7 +41,7 @@ impl SessionHandle { .await .map_err(|_| SendError::Disconnected)?; - rx.await.map_err(|_| SendError::ConfirmationLost)? + rx.await.map_err(|_| SendError::SessionGone)? } /// Sends a message without waiting for confirmation. From f3c004b01fec966194c8371baca5409baf09c916 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Wed, 4 Feb 2026 17:03:36 +0100 Subject: [PATCH 05/11] Make fix44 a default feature --- Cargo.lock | 2 -- crates/hotfix/Cargo.toml | 4 +--- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6bc59a9..e3bfefa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1286,7 +1286,6 @@ dependencies = [ "async-trait", "chrono", "chrono-tz", - "futures", "hotfix-message", "hotfix-store", "hotfix-store-mongodb", @@ -1302,7 +1301,6 @@ dependencies = [ "tokio-rustls", "toml", "tracing", - "uuid", "webpki-roots", ] diff --git a/crates/hotfix/Cargo.toml b/crates/hotfix/Cargo.toml index e9ffc0b..7913175 100644 --- a/crates/hotfix/Cargo.toml +++ b/crates/hotfix/Cargo.toml @@ -12,7 +12,7 @@ keywords.workspace = true categories.workspace = true [features] -default = ["test-utils"] +default = ["fix44", "test-utils"] fix44 = ["hotfix-message/fix44"] mongodb = ["dep:hotfix-store-mongodb"] test-utils = ["hotfix-store/test-utils"] @@ -29,7 +29,6 @@ anyhow = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } chrono-tz = { workspace = true, features = ["serde"] } -futures = { workspace = true } rustls-pki-types = { workspace = true } rustls = { workspace = true } rustls-native-certs = { workspace = true } @@ -41,7 +40,6 @@ tokio = { workspace = true, features = ["full"] } tokio-rustls = { workspace = true } toml = { workspace = true } tracing = { workspace = true } -uuid = { workspace = true, features = ["v4"] } [dev-dependencies] hotfix-message = { version = "0.3.0", path = "../hotfix-message", features = ["fix44", "utils-chrono"] } From 976df058f3df38836c32f970bcaf53063086a584 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Wed, 4 Feb 2026 17:10:55 +0100 Subject: [PATCH 06/11] Clean up error mapping in session layer --- crates/hotfix/src/session.rs | 145 ++++++++--------------------- crates/hotfix/src/session/error.rs | 16 +++- 2 files changed, 53 insertions(+), 108 deletions(-) diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index 0be0355..675e333 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -6,15 +6,6 @@ mod session_handle; pub mod session_ref; mod state; -use crate::config::SessionConfig; -use crate::message::OutboundMessage; -use crate::message::heartbeat::Heartbeat; -use crate::message::logon::{Logon, ResetSeqNumConfig}; -use crate::message::parser::RawFixMessage; -use crate::message::{InboundMessage, generate_message}; -use crate::session::error::{InternalSendError, SessionOperationError}; -use crate::store::MessageStore; -use crate::transport::writer::WriterRef; use chrono::Utc; use hotfix_message::dict::Dictionary; use hotfix_message::message::{Config as MessageConfig, Message}; @@ -27,16 +18,23 @@ use tracing::{debug, enabled, error, info, warn}; use crate::Application; use crate::application::{InboundDecision, OutboundDecision}; +use crate::config::SessionConfig; +use crate::message::OutboundMessage; +use crate::message::heartbeat::Heartbeat; +use crate::message::logon::{Logon, ResetSeqNumConfig}; use crate::message::logout::Logout; +use crate::message::parser::RawFixMessage; use crate::message::reject::Reject; use crate::message::resend_request::ResendRequest; use crate::message::sequence_reset::SequenceReset; use crate::message::test_request::TestRequest; use crate::message::verification::verify_message; use crate::message::verification_error::{CompIdType, MessageVerificationError}; +use crate::message::{InboundMessage, generate_message}; use crate::message_utils::{is_admin, prepare_message_for_resend}; use crate::session::admin_request::AdminRequest; use crate::session::error::SessionCreationError; +use crate::session::error::{InternalSendError, InternalSendResultExt, SessionOperationError}; pub use crate::session::error::{SendError, SendOutcome}; pub use crate::session::info::{SessionInfo, Status}; pub use crate::session::session_handle::SessionHandle; @@ -48,6 +46,8 @@ use crate::session::session_ref::OutboundRequest; use crate::session::state::SessionState; use crate::session::state::{AwaitingResendTransitionOutcome, TestRequestId}; use crate::session_schedule::{SessionPeriodComparison, SessionSchedule}; +use crate::store::MessageStore; +use crate::transport::writer::WriterRef; use event::SessionEvent; use hotfix_message::parsed_message::{InvalidReason, ParsedMessage}; use hotfix_message::session_fields::{ @@ -147,12 +147,9 @@ where let reject = Reject::new(msg_seq_num) .session_reject_reason(SessionRejectReason::InvalidTagNumber) .text(&format!("invalid field {tag}")); - self.send_message(reject).await.map_err(|e| { - SessionOperationError::Send { - source: e, - context: "reject for invalid field", - } - })?; + self.send_message(reject) + .await + .with_send_context("reject for invalid field")?; } Err(err) => { error!("failed to get message seq num: {:?}", err); @@ -174,12 +171,9 @@ where SessionRejectReason::RepeatingGroupFieldsOutOfOrder, ) .text(&format!("field appears in incorrect order:{tag}")); - self.send_message(reject).await.map_err(|e| { - SessionOperationError::Send { - source: e, - context: "reject for invalid group order", - } - })?; + self.send_message(reject) + .await + .with_send_context("reject for invalid group order")?; } Err(err) => { error!("failed to get message seq num: {:?}", err); @@ -259,10 +253,7 @@ where error!("failed to send inbound message to application"); self.state.disconnect_writer().await; } - self.store - .increment_target_seq_number() - .await - .map_err(SessionOperationError::Store)?; + self.store.increment_target_seq_number().await?; } Err(err) => self.handle_verification_error(err).await?, } @@ -351,10 +342,7 @@ where self.state = SessionState::new_active(writer.clone(), self.config.heartbeat_interval); self.application.on_logon().await; - self.store - .increment_target_seq_number() - .await - .map_err(SessionOperationError::Store)?; + self.store.increment_target_seq_number().await?; } Err(err) => self.handle_verification_error(err).await?, } @@ -382,10 +370,7 @@ where } } - self.store - .increment_target_seq_number() - .await - .map_err(SessionOperationError::Store)?; + self.store.increment_target_seq_number().await?; Ok(()) } @@ -399,10 +384,7 @@ where self.reset_peer_timer(None); } - self.store - .increment_target_seq_number() - .await - .map_err(SessionOperationError::Store)?; + self.store.increment_target_seq_number().await?; Ok(()) } @@ -412,17 +394,11 @@ where todo!() }); - self.store - .increment_target_seq_number() - .await - .map_err(SessionOperationError::Store)?; + self.store.increment_target_seq_number().await?; self.send_message(Heartbeat::for_request(req_id.to_string())) .await - .map_err(|e| SessionOperationError::Send { - source: e, - context: "heartbeat response", - })?; + .with_send_context("heartbeat response")?; Ok(()) } @@ -440,10 +416,7 @@ where .text("missing begin sequence number for resend request"); self.send_message(reject) .await - .map_err(|e| SessionOperationError::Send { - source: e, - context: "reject for missing BEGIN_SEQ_NO", - })?; + .with_send_context("reject for missing BEGIN_SEQ_NO")?; return Ok(()); } }; @@ -463,18 +436,12 @@ where .text("missing end sequence number for resend request"); self.send_message(reject) .await - .map_err(|e| SessionOperationError::Send { - source: e, - context: "reject for missing END_SEQ_NO", - })?; + .with_send_context("reject for missing END_SEQ_NO")?; return Ok(()); } }; - self.store - .increment_target_seq_number() - .await - .map_err(SessionOperationError::Store)?; + self.store.increment_target_seq_number().await?; self.resend_messages(begin_seq_number, end_seq_number, message) .await?; @@ -490,10 +457,7 @@ where if let Ok(seq_num) = message.get::(MSG_SEQ_NUM) && seq_num == self.store.next_target_seq_number() { - self.store - .increment_target_seq_number() - .await - .map_err(SessionOperationError::Store)?; + self.store.increment_target_seq_number().await?; } Ok(()) @@ -519,10 +483,7 @@ where .text("missing NewSeqNo tag in sequence reset message"); self.send_message(reject) .await - .map_err(|e| SessionOperationError::Send { - source: e, - context: "reject for missing NEW_SEQ_NO", - })?; + .with_send_context("reject for missing NEW_SEQ_NO")?; // note: we don't increment the target seq number here // this is an ambiguous case in the specification, but leaving the @@ -544,17 +505,11 @@ where .text(&text); self.send_message(reject) .await - .map_err(|e| SessionOperationError::Send { - source: e, - context: "reject for invalid sequence reset", - })?; + .with_send_context("reject for invalid sequence reset")?; return Ok(()); } - self.store - .set_target_seq_number(end - 1) - .await - .map_err(SessionOperationError::Store)?; + self.store.set_target_seq_number(end - 1).await?; Ok(()) } @@ -751,11 +706,7 @@ where _message: &Message, ) -> Result<(), SessionOperationError> { info!(begin, end, "resending messages as requested"); - let messages = self - .store - .get_slice(begin as usize, end as usize) - .await - .map_err(SessionOperationError::Store)?; + let messages = self.store.get_slice(begin as usize, end as usize).await?; let no = messages.len(); debug!(number_of_messages = no, "number of messages"); @@ -802,9 +753,7 @@ where } self.send_raw( message_type.as_bytes(), - message - .encode(&self.message_config) - .map_err(SessionOperationError::MessageEncoding)?, + message.encode(&self.message_config)?, ) .await; @@ -849,7 +798,7 @@ where match self.application.on_outbound_message(&message).await { OutboundDecision::Send => { - let sequence_number = self.send_message(message).await.map_err(SendError::from)?; + let sequence_number = self.send_message(message).await?; Ok(SendOutcome::Sent { sequence_number }) } OutboundDecision::Drop => { @@ -921,8 +870,7 @@ where &self.config.target_comp_id, begin, sequence_reset, - ) - .map_err(SessionOperationError::MessageEncoding)?; + )?; self.send_raw(b"4", raw_message).await; debug!(begin, end, "sent reset sequence"); @@ -938,19 +886,13 @@ where let request = ResendRequest::new(begin, end); self.send_message(request) .await - .map(|_| ()) - .map_err(|e| SessionOperationError::Send { - source: e, - context: "resend request", - }) + .with_send_context("resend request")?; + Ok(()) } async fn send_logon(&mut self) -> Result<(), SessionOperationError> { let reset_config = if self.config.reset_on_logon || self.reset_on_next_logon { - self.store - .reset() - .await - .map_err(SessionOperationError::Store)?; + self.store.reset().await?; ResetSeqNumConfig::Reset } else { ResetSeqNumConfig::NoReset(Some(self.store.next_target_seq_number())) @@ -958,25 +900,16 @@ where self.reset_on_next_logon = false; let logon = Logon::new(self.config.heartbeat_interval, reset_config); - - self.send_message(logon) - .await - .map(|_| ()) - .map_err(|e| SessionOperationError::Send { - source: e, - context: "logon", - }) + self.send_message(logon).await.with_send_context("logon")?; + Ok(()) } async fn send_logout(&mut self, reason: &str) -> Result<(), SessionOperationError> { let logout = Logout::with_reason(reason.to_string()); self.send_message(logout) .await - .map(|_| ()) - .map_err(|e| SessionOperationError::Send { - source: e, - context: "logout", - }) + .with_send_context("logout")?; + Ok(()) } /// Sends a logout message and immediately disconnects the counterparty. diff --git a/crates/hotfix/src/session/error.rs b/crates/hotfix/src/session/error.rs index 5643f03..3e85f9f 100644 --- a/crates/hotfix/src/session/error.rs +++ b/crates/hotfix/src/session/error.rs @@ -96,11 +96,11 @@ pub(crate) enum SessionOperationError { /// A store operation failed. #[error("store operation failed")] - Store(#[source] StoreError), + Store(#[from] StoreError), /// Failed to encode a message. #[error("failed to encode message")] - MessageEncoding(#[source] EncodingError), + MessageEncoding(#[from] EncodingError), /// Failed to parse a stored message. #[error("failed to parse stored message: {0}")] @@ -110,3 +110,15 @@ pub(crate) enum SessionOperationError { #[error("missing required field: {0}")] MissingField(&'static str), } + +/// Extension trait to convert `Result` to `Result` +/// with context about what send operation failed. +pub(crate) trait InternalSendResultExt { + fn with_send_context(self, context: &'static str) -> Result; +} + +impl InternalSendResultExt for Result { + fn with_send_context(self, context: &'static str) -> Result { + self.map_err(|source| SessionOperationError::Send { source, context }) + } +} From f8446fc93e516ac01164640b047fb2b2256dcbef Mon Sep 17 00:00:00 2001 From: David Steiner Date: Wed, 4 Feb 2026 17:44:25 +0100 Subject: [PATCH 07/11] Remove anyhow from session handle and initiator APIs --- crates/hotfix-web/src/session_controller.rs | 6 ++++-- crates/hotfix/src/initiator.rs | 11 ++++++----- crates/hotfix/src/session/error.rs | 12 ++++++++++++ crates/hotfix/src/session/session_handle.rs | 20 +++++--------------- crates/hotfix/src/session/session_ref.rs | 5 ++--- examples/load-testing/src/main.rs | 10 ++++++---- examples/simple-new-order/src/main.rs | 10 ++++++---- 7 files changed, 41 insertions(+), 33 deletions(-) diff --git a/crates/hotfix-web/src/session_controller.rs b/crates/hotfix-web/src/session_controller.rs index c2d99f2..c7ce02e 100644 --- a/crates/hotfix-web/src/session_controller.rs +++ b/crates/hotfix-web/src/session_controller.rs @@ -22,11 +22,13 @@ impl SessionController for HttpSessionController anyhow::Result<()> { - self.session_handle.request_reset_on_next_logon().await + self.session_handle.request_reset_on_next_logon().await?; + Ok(()) } async fn shutdown(&self, reconnect: bool) -> anyhow::Result<()> { - self.session_handle.shutdown(reconnect).await + self.session_handle.shutdown(reconnect).await?; + Ok(()) } } diff --git a/crates/hotfix/src/initiator.rs b/crates/hotfix/src/initiator.rs index 77087cd..d09705f 100644 --- a/crates/hotfix/src/initiator.rs +++ b/crates/hotfix/src/initiator.rs @@ -6,7 +6,6 @@ //! The initiator establishes the transport layer connection with //! the peer, and sends the initial Logon (35=A) message. For transport, //! `HotFIX` supports plain TCP and encrypted TLS over TCP connections. -use anyhow::Result; use std::time::Duration; use tokio::sync::watch; use tokio::time::sleep; @@ -15,7 +14,7 @@ use tracing::{debug, warn}; use crate::application::Application; use crate::config::SessionConfig; use crate::message::{InboundMessage, OutboundMessage}; -use crate::session::error::{SendError, SendOutcome}; +use crate::session::error::{SendError, SendOutcome, SessionCreationError}; use crate::session::{InternalSessionRef, SessionHandle}; use crate::store::MessageStore; use crate::transport::connect; @@ -32,7 +31,7 @@ impl Initiator { config: SessionConfig, application: impl Application, store: impl MessageStore + 'static, - ) -> Result { + ) -> Result { let session_ref = InternalSessionRef::new(config.clone(), application, store)?; let (completion_tx, completion_rx) = watch::channel(false); @@ -76,9 +75,11 @@ impl Initiator { self.session_handle.clone() } - pub async fn shutdown(self, reconnect: bool) -> Result<()> { + pub async fn shutdown(self, reconnect: bool) -> Result<(), SendError> { self.session_handle.shutdown(reconnect).await?; - tokio::time::timeout(Duration::from_secs(5), self.wait_for_shutdown()).await?; + tokio::time::timeout(Duration::from_secs(5), self.wait_for_shutdown()) + .await + .map_err(|_| SendError::SessionGone)?; Ok(()) } diff --git a/crates/hotfix/src/session/error.rs b/crates/hotfix/src/session/error.rs index 3e85f9f..858da9d 100644 --- a/crates/hotfix/src/session/error.rs +++ b/crates/hotfix/src/session/error.rs @@ -52,6 +52,18 @@ pub enum SendError { SessionGone, } +impl From> for SendError { + fn from(_: tokio::sync::mpsc::error::SendError) -> Self { + SendError::SessionGone + } +} + +impl From for SendError { + fn from(_: tokio::sync::oneshot::error::RecvError) -> Self { + SendError::SessionGone + } +} + /// Error that can occur when sending a message internally within the session. /// /// This is a subset of `SendError` without `SessionTerminated` and `SessionGone`, diff --git a/crates/hotfix/src/session/session_handle.rs b/crates/hotfix/src/session/session_handle.rs index bb5ba38..f02a62b 100644 --- a/crates/hotfix/src/session/session_handle.rs +++ b/crates/hotfix/src/session/session_handle.rs @@ -36,12 +36,8 @@ impl SessionHandle { message: msg, confirm: Some(tx), }; - self.outbound_message_sender - .send(request) - .await - .map_err(|_| SendError::Disconnected)?; - - rx.await.map_err(|_| SendError::SessionGone)? + self.outbound_message_sender.send(request).await?; + rx.await? } /// Sends a message without waiting for confirmation. @@ -53,27 +49,21 @@ impl SessionHandle { message: msg, confirm: None, }; - self.outbound_message_sender - .send(request) - .await - .map_err(|_| SendError::Disconnected)?; - + self.outbound_message_sender.send(request).await?; Ok(()) } - pub async fn shutdown(&self, reconnect: bool) -> anyhow::Result<()> { + pub async fn shutdown(&self, reconnect: bool) -> Result<(), SendError> { self.admin_request_sender .send(AdminRequest::InitiateGracefulShutdown { reconnect }) .await?; - Ok(()) } - pub async fn request_reset_on_next_logon(&self) -> anyhow::Result<()> { + pub async fn request_reset_on_next_logon(&self) -> Result<(), SendError> { self.admin_request_sender .send(AdminRequest::ResetSequenceNumbersOnNextLogon) .await?; - Ok(()) } } diff --git a/crates/hotfix/src/session/session_ref.rs b/crates/hotfix/src/session/session_ref.rs index d7e8095..5165d7c 100644 --- a/crates/hotfix/src/session/session_ref.rs +++ b/crates/hotfix/src/session/session_ref.rs @@ -1,4 +1,3 @@ -use anyhow::Result; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; use tracing::debug; @@ -7,7 +6,7 @@ use crate::config::SessionConfig; use crate::message::{InboundMessage, OutboundMessage, RawFixMessage}; use crate::session::Session; use crate::session::admin_request::AdminRequest; -use crate::session::error::{SendError, SendOutcome}; +use crate::session::error::{SendError, SendOutcome, SessionCreationError}; use crate::session::event::{AwaitingActiveSessionResponse, SessionEvent}; use crate::store::MessageStore; use crate::transport::writer::WriterRef; @@ -31,7 +30,7 @@ impl InternalSessionRef { config: SessionConfig, application: impl Application, store: impl MessageStore + 'static, - ) -> Result { + ) -> Result { let (event_sender, event_receiver) = mpsc::channel::(100); let (outbound_message_sender, outbound_message_receiver) = mpsc::channel::>(10); diff --git a/examples/load-testing/src/main.rs b/examples/load-testing/src/main.rs index 69ee6ee..7f2ebe1 100644 --- a/examples/load-testing/src/main.rs +++ b/examples/load-testing/src/main.rs @@ -94,17 +94,19 @@ async fn start_session( db_config: Database, app: LoadTestingApplication, ) -> Result> { - match db_config { + let initiator = match db_config { Database::Memory => { let store = hotfix::store::in_memory::InMemoryMessageStore::default(); - Initiator::start(session_config, app, store).await + Initiator::start(session_config, app, store).await? } Database::File => { let store = hotfix::store::file::FileStore::new("data", "load-testing-store") .expect("be able to create store"); - Initiator::start(session_config, app, store).await + Initiator::start(session_config, app, store).await? } - } + }; + + Ok(initiator) } async fn submit_messages(session_handle: SessionHandle, message_count: u32) { diff --git a/examples/simple-new-order/src/main.rs b/examples/simple-new-order/src/main.rs index f5c03b0..f56a4ac 100644 --- a/examples/simple-new-order/src/main.rs +++ b/examples/simple-new-order/src/main.rs @@ -152,17 +152,19 @@ async fn start_session( .pop() .context("config must include a session")?; - match db_config { + let initiator = match db_config { Database::Memory => { let store = hotfix::store::in_memory::InMemoryMessageStore::default(); - Initiator::start(session_config, app, store).await + Initiator::start(session_config, app, store).await? } Database::File => { let store = hotfix::store::file::FileStore::new("data", "simple-new-order-store") .context("failed to create file store")?; - Initiator::start(session_config, app, store).await + Initiator::start(session_config, app, store).await? } - } + }; + + Ok(initiator) } async fn start_web_service( From ea77308ada2dbc3fe7b6e03b2f44831795eb2aac Mon Sep 17 00:00:00 2001 From: David Steiner Date: Wed, 4 Feb 2026 17:49:44 +0100 Subject: [PATCH 08/11] Remove remaining traces of anyhow in core hotfix crate --- Cargo.toml | 4 +++- crates/hotfix-web/src/session_controller.rs | 2 +- crates/hotfix/Cargo.toml | 6 +++--- crates/hotfix/src/session/session_handle.rs | 2 +- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 94a1583..fa7a253 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,13 +48,14 @@ rustls = "0.23" rustls-native-certs = "0.8" rustls-pemfile = "2.2" rustls-pki-types = { version = "1" } -webpki-roots = "1.0" +rcgen = "0.13" serde = "^1.0.177" serde_json = "1.0.143" smartstring = "1" strum = "0.27" strum_macros = "0.27" syn = "2" +tempfile = "3" testcontainers = "0.25" thiserror = "2" tokio = { version = "1" } @@ -65,6 +66,7 @@ tower = "0.5" tracing = "0.1" tracing-subscriber = "0.3" uuid = { version = "1.5.0" } +webpki-roots = "1.0" wiremock = "0.6" [workspace.lints.rust] diff --git a/crates/hotfix-web/src/session_controller.rs b/crates/hotfix-web/src/session_controller.rs index c7ce02e..73451ab 100644 --- a/crates/hotfix-web/src/session_controller.rs +++ b/crates/hotfix-web/src/session_controller.rs @@ -18,7 +18,7 @@ pub struct HttpSessionController { #[async_trait::async_trait] impl SessionController for HttpSessionController { async fn get_session_info(&self) -> anyhow::Result { - self.session_handle.get_session_info().await + Ok(self.session_handle.get_session_info().await?) } async fn request_reset_on_next_logon(&self) -> anyhow::Result<()> { diff --git a/crates/hotfix/Cargo.toml b/crates/hotfix/Cargo.toml index 7913175..12952e9 100644 --- a/crates/hotfix/Cargo.toml +++ b/crates/hotfix/Cargo.toml @@ -25,7 +25,6 @@ hotfix-message = { version = "0.3.0", path = "../hotfix-message", features = ["u hotfix-store = { version = "0.1.1", path = "../hotfix-store" } hotfix-store-mongodb = { version = "0.1.3", path = "../hotfix-store-mongodb", optional = true } -anyhow = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } chrono-tz = { workspace = true, features = ["serde"] } @@ -44,7 +43,8 @@ tracing = { workspace = true } [dev-dependencies] hotfix-message = { version = "0.3.0", path = "../hotfix-message", features = ["fix44", "utils-chrono"] } -rcgen = "0.13" +anyhow = { workspace = true } +rcgen = { workspace = true } rustls = { workspace = true, features = ["ring"] } -tempfile = "3" +tempfile = { workspace = true } tokio = { workspace = true, features = ["test-util"] } diff --git a/crates/hotfix/src/session/session_handle.rs b/crates/hotfix/src/session/session_handle.rs index f02a62b..6a5e710 100644 --- a/crates/hotfix/src/session/session_handle.rs +++ b/crates/hotfix/src/session/session_handle.rs @@ -17,7 +17,7 @@ pub struct SessionHandle { } impl SessionHandle { - pub async fn get_session_info(&self) -> anyhow::Result { + pub async fn get_session_info(&self) -> Result { let (sender, receiver) = oneshot::channel::(); self.admin_request_sender .send(AdminRequest::RequestSessionInfo(sender)) From 1fe5adf9ad7b778442b9211ae06d841b6390b96d Mon Sep 17 00:00:00 2001 From: David Steiner Date: Wed, 4 Feb 2026 18:28:41 +0100 Subject: [PATCH 09/11] Add unit tests for session errors --- crates/hotfix/src/session/error.rs | 68 ++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/crates/hotfix/src/session/error.rs b/crates/hotfix/src/session/error.rs index 858da9d..e0d7688 100644 --- a/crates/hotfix/src/session/error.rs +++ b/crates/hotfix/src/session/error.rs @@ -134,3 +134,71 @@ impl InternalSendResultExt for Result { self.map_err(|source| SessionOperationError::Send { source, context }) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn test_store_error() -> StoreError { + StoreError::Initialization("test".into()) + } + + #[test] + fn mpsc_send_error_converts_to_session_gone() { + let err: SendError = tokio::sync::mpsc::error::SendError(()).into(); + assert!(matches!(err, SendError::SessionGone)); + } + + #[tokio::test] + async fn oneshot_recv_error_converts_to_session_gone() { + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + drop(tx); + // await the receiver to get RecvError (not TryRecvError) + let recv_err = rx.await.unwrap_err(); + + let err: SendError = recv_err.into(); + assert!(matches!(err, SendError::SessionGone)); + } + + #[test] + fn internal_send_error_disconnected_converts_to_send_error() { + let internal_err = InternalSendError::Disconnected; + let send_err: SendError = internal_err.into(); + assert!(matches!(send_err, SendError::Disconnected)); + } + + #[test] + fn internal_send_error_persist_converts_to_send_error() { + let internal_err = InternalSendError::Persist(test_store_error()); + let send_err: SendError = internal_err.into(); + assert!(matches!(send_err, SendError::Persist(_))); + } + + #[test] + fn internal_send_error_sequence_number_converts_to_send_error() { + let internal_err = InternalSendError::SequenceNumber(test_store_error()); + let send_err: SendError = internal_err.into(); + assert!(matches!(send_err, SendError::SequenceNumber(_))); + } + + #[test] + fn with_send_context_converts_error() { + let result: Result<(), InternalSendError> = + Err(InternalSendError::Persist(test_store_error())); + + let op_err = result.with_send_context("heartbeat").unwrap_err(); + match op_err { + SessionOperationError::Send { context, .. } => { + assert_eq!(context, "heartbeat"); + } + _ => panic!("expected SessionOperationError::Send"), + } + } + + #[test] + fn with_send_context_passes_through_ok() { + let result: Result = Ok(42); + let op_result = result.with_send_context("heartbeat"); + assert_eq!(op_result.unwrap(), 42); + } +} From cf2fac18cf4e46c57a4e6f454aed01b5bce15eae Mon Sep 17 00:00:00 2001 From: David Steiner Date: Wed, 4 Feb 2026 18:52:50 +0100 Subject: [PATCH 10/11] Add better test coverage for initiator and fix bug with shut down session trying to reconnect --- crates/hotfix/src/initiator.rs | 155 ++++++++++++++++++++++++++++++++- crates/hotfix/src/session.rs | 7 +- 2 files changed, 158 insertions(+), 4 deletions(-) diff --git a/crates/hotfix/src/initiator.rs b/crates/hotfix/src/initiator.rs index d09705f..0ebe9dc 100644 --- a/crates/hotfix/src/initiator.rs +++ b/crates/hotfix/src/initiator.rs @@ -152,15 +152,22 @@ async fn establish_connection( completion_tx.send_replace(true); } -#[cfg(all(test, feature = "fix44"))] +#[cfg(test)] +#[allow(clippy::expect_used)] mod tests { use super::*; use crate::application::{Application, InboundDecision, OutboundDecision}; - use crate::message::InboundMessage; + use crate::message::logon::{Logon, ResetSeqNumConfig}; + use crate::message::logout::Logout; + use crate::message::parser::Parser; + use crate::message::{InboundMessage, generate_message}; use crate::store::in_memory::InMemoryMessageStore; + use hotfix_message::Part; use hotfix_message::message::Message; + use hotfix_message::session_fields::MSG_TYPE; use std::time::Duration; - use tokio::net::TcpListener; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::net::{TcpListener, TcpStream}; // Minimal message type for tests #[derive(Clone)] @@ -194,6 +201,90 @@ mod tests { async fn on_logon(&mut self) {} } + /// A minimal FIX counterparty for testing the Initiator over TCP. + struct TestCounterparty { + stream: TcpStream, + parser: Parser, + seq_num: u64, + // Counterparty's view: sender is TEST-TARGET, target is TEST-SENDER + sender_comp_id: String, + target_comp_id: String, + } + + impl TestCounterparty { + async fn accept(listener: &TcpListener, config: &SessionConfig) -> Self { + let (stream, _) = tokio::time::timeout(Duration::from_secs(2), listener.accept()) + .await + .expect("timeout waiting for connection") + .expect("failed to accept connection"); + + Self { + stream, + parser: Parser::default(), + seq_num: 1, + // Swap sender/target for counterparty perspective + sender_comp_id: config.target_comp_id.clone(), + target_comp_id: config.sender_comp_id.clone(), + } + } + + async fn read_message(&mut self) -> Message { + let mut buf = [0u8; 4096]; + loop { + let n = self.stream.read(&mut buf).await.expect("read failed"); + if n == 0 { + panic!("connection closed before receiving complete message"); + } + let messages = self.parser.parse(&buf[..n]); + if let Some(raw_msg) = messages.into_iter().next() { + let builder = hotfix_message::MessageBuilder::new( + hotfix_message::dict::Dictionary::fix44(), + hotfix_message::message::Config::default(), + ) + .expect("failed to create message builder"); + match builder.build(raw_msg.as_bytes()) { + hotfix_message::parsed_message::ParsedMessage::Valid(msg) => return msg, + _ => panic!("received invalid FIX message"), + } + } + } + } + + async fn expect_message(&mut self, expected_type: &str) -> Message { + let msg = tokio::time::timeout(Duration::from_secs(2), self.read_message()) + .await + .expect("timeout waiting for message"); + let msg_type: &str = msg.header().get(MSG_TYPE).expect("missing MSG_TYPE"); + assert_eq!(msg_type, expected_type, "unexpected message type"); + msg + } + + async fn send_logon(&mut self, heartbeat_interval: u64) { + let logon = Logon::new(heartbeat_interval, ResetSeqNumConfig::NoReset(None)); + self.send_message(logon).await; + } + + async fn send_logout(&mut self) { + self.send_message(Logout::default()).await; + } + + async fn send_message(&mut self, message: impl OutboundMessage) { + let raw = generate_message( + "FIX.4.4", + &self.sender_comp_id, + &self.target_comp_id, + self.seq_num, + message, + ) + .expect("failed to generate message"); + self.seq_num += 1; + self.stream + .write_all(&raw) + .await + .expect("failed to send message"); + } + } + fn create_test_config(host: &str, port: u16) -> SessionConfig { SessionConfig { begin_string: "FIX.4.4".to_string(), @@ -212,6 +303,27 @@ mod tests { } } + async fn given_logged_on_initiator() -> (Initiator, TestCounterparty) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let config = create_test_config("127.0.0.1", port); + + let initiator = Initiator::start(config.clone(), NoOpApp, InMemoryMessageStore::default()) + .await + .unwrap(); + + let mut counterparty = TestCounterparty::accept(&listener, &config).await; + + // Complete the logon handshake + counterparty.expect_message("A").await; // Receive Logon + counterparty.send_logon(30).await; // Send Logon response + + // Give the session a moment to process the logon + sleep(Duration::from_millis(50)).await; + + (initiator, counterparty) + } + #[tokio::test] async fn test_start_creates_initiator_successfully() { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -321,4 +433,41 @@ mod tests { let result = initiator.send_forget(DummyMessage).await; assert!(result.is_ok()); } + + #[tokio::test] + async fn test_session_handle_returns_working_handle() { + use crate::session::error::SendOutcome; + + let (initiator, mut counterparty) = given_logged_on_initiator().await; + + // Get the session handle and use it to send a message + let handle = initiator.session_handle(); + let result = handle.send(DummyMessage).await; + + assert!(matches!(result, Ok(SendOutcome::Sent { .. }))); + + // Verify counterparty received the message (msg type "0" = Heartbeat) + counterparty.expect_message("0").await; + } + + #[tokio::test] + async fn test_shutdown_with_logout_handshake() { + let (initiator, mut counterparty) = given_logged_on_initiator().await; + + assert!(!initiator.is_shutdown()); + + // Spawn shutdown in background - it sends Logout and waits for response + let shutdown_handle = tokio::spawn(async move { initiator.shutdown(false).await }); + + // Counterparty receives Logout and responds + counterparty.expect_message("5").await; // Logout + counterparty.send_logout().await; + + // Close the TCP connection - this completes the disconnect + drop(counterparty); + + // Shutdown should complete successfully + let result = shutdown_handle.await.expect("shutdown task panicked"); + assert!(result.is_ok(), "Shutdown should complete, got {:?}", result); + } } diff --git a/crates/hotfix/src/session.rs b/crates/hotfix/src/session.rs index 675e333..e7f5855 100644 --- a/crates/hotfix/src/session.rs +++ b/crates/hotfix/src/session.rs @@ -363,7 +363,12 @@ where match self.state { // if the session is already disconnected, we have nothing else to do SessionState::Disconnected(..) => {} - // otherwise set the state to disconnected and assume it makes sense to try to reconnect + // if we initiated the logout, preserve the reconnect flag + SessionState::AwaitingLogout { reconnect, .. } => { + self.state.disconnect_writer().await; + self.state = SessionState::new_disconnected(reconnect, "logout completed"); + } + // otherwise assume it makes sense to try to reconnect _ => { self.state.disconnect_writer().await; self.state = SessionState::new_disconnected(true, "peer has logged us out") From d85499b2889e7005681944834450495f8b73697f Mon Sep 17 00:00:00 2001 From: David Steiner Date: Wed, 4 Feb 2026 18:55:14 +0100 Subject: [PATCH 11/11] Remove unused Disconnected internal send error variant --- crates/hotfix/src/initiator.rs | 6 +++--- crates/hotfix/src/session/error.rs | 14 -------------- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/crates/hotfix/src/initiator.rs b/crates/hotfix/src/initiator.rs index 0ebe9dc..b7ef35e 100644 --- a/crates/hotfix/src/initiator.rs +++ b/crates/hotfix/src/initiator.rs @@ -303,7 +303,7 @@ mod tests { } } - async fn given_logged_on_initiator() -> (Initiator, TestCounterparty) { + async fn create_logged_on_initiator() -> (Initiator, TestCounterparty) { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let port = listener.local_addr().unwrap().port(); let config = create_test_config("127.0.0.1", port); @@ -438,7 +438,7 @@ mod tests { async fn test_session_handle_returns_working_handle() { use crate::session::error::SendOutcome; - let (initiator, mut counterparty) = given_logged_on_initiator().await; + let (initiator, mut counterparty) = create_logged_on_initiator().await; // Get the session handle and use it to send a message let handle = initiator.session_handle(); @@ -452,7 +452,7 @@ mod tests { #[tokio::test] async fn test_shutdown_with_logout_handshake() { - let (initiator, mut counterparty) = given_logged_on_initiator().await; + let (initiator, mut counterparty) = create_logged_on_initiator().await; assert!(!initiator.is_shutdown()); diff --git a/crates/hotfix/src/session/error.rs b/crates/hotfix/src/session/error.rs index e0d7688..50d3b29 100644 --- a/crates/hotfix/src/session/error.rs +++ b/crates/hotfix/src/session/error.rs @@ -70,12 +70,6 @@ impl From for SendError { /// which only make sense in the context of the public API. #[derive(Debug, Error)] pub(crate) enum InternalSendError { - /// The session is disconnected. Currently unused as internal sends don't - /// check connection state, but kept for symmetry with `SendError`. - #[allow(dead_code)] - #[error("session is disconnected")] - Disconnected, - #[error("failed to persist message")] Persist(#[source] StoreError), @@ -86,7 +80,6 @@ pub(crate) enum InternalSendError { impl From for SendError { fn from(err: InternalSendError) -> Self { match err { - InternalSendError::Disconnected => SendError::Disconnected, InternalSendError::Persist(e) => SendError::Persist(e), InternalSendError::SequenceNumber(e) => SendError::SequenceNumber(e), } @@ -160,13 +153,6 @@ mod tests { assert!(matches!(err, SendError::SessionGone)); } - #[test] - fn internal_send_error_disconnected_converts_to_send_error() { - let internal_err = InternalSendError::Disconnected; - let send_err: SendError = internal_err.into(); - assert!(matches!(send_err, SendError::Disconnected)); - } - #[test] fn internal_send_error_persist_converts_to_send_error() { let internal_err = InternalSendError::Persist(test_store_error());