From 8eea4769180f3eb0af4e2cf744d30a1af3b13a49 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 12 Oct 2025 13:48:04 +0000 Subject: [PATCH 01/19] Update BestBlock to store ANTI_REORG_DELAY * 2 recent block hashes On restart, LDK expects the chain to be replayed starting from where it was when objects were last serialized. This is fine in the normal case, but if there was a reorg and the node which we were syncing from either resynced or was changed, the last block that we were synced as of might no longer be available. As a result, it becomes impossible to figure out where the fork point is, and thus to replay the chain. Luckily, changing the block source during a reorg isn't exactly common, but we shouldn't end up with a bricked node. To address this, `lightning-block-sync` allows the user to pass in `Cache` which can be used to cache recent blocks and thus allow for reorg handling in this case. However, serialization for, and a reasonable default implementation of a `Cache` was never built. Instead, here, we start taking a different approach. To avoid developers having to persist yet another object, we move `BestBlock` to storing some number of recent block hashes. This allows us to find the fork point with just the serialized state. In conjunction with 403dc1a48bb71ae794f6883ae0b760aad44cda39 (which allows us to disconnect blocks without having the stored header), this should allow us to replay chain state after a reorg even if we no longer have access to the top few blocks of the old chain tip. While we only really need to store `ANTI_REORG_DELAY` blocks (as we generally assume that any deeper reorg won't happen and thus we don't guarantee we handle it correctly), it's nice to store a few more to be able to handle more than a six block reorg. 
While other parts of the codebase may not be entirely robust against such a reorg if the transactions confirmed change out from under us, it's entirely possible (and, indeed, common) for reorgs to contain nearly identical transactions. --- lightning/src/chain/mod.rs | 77 ++++++++++++++++++++++++++++++++++++-- lightning/src/util/ser.rs | 27 +++++++++++++ 2 files changed, 101 insertions(+), 3 deletions(-) diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index bc47f1b1db6..fe5c01509d7 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -18,7 +18,9 @@ use bitcoin::network::Network; use bitcoin::script::{Script, ScriptBuf}; use bitcoin::secp256k1::PublicKey; -use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, MonitorEvent}; +use crate::chain::channelmonitor::{ + ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, ANTI_REORG_DELAY, +}; use crate::chain::transaction::{OutPoint, TransactionData}; use crate::ln::types::ChannelId; use crate::sign::ecdsa::EcdsaChannelSigner; @@ -43,13 +45,20 @@ pub struct BestBlock { pub block_hash: BlockHash, /// The height at which the block was confirmed. pub height: u32, + /// Previous blocks immediately before [`Self::block_hash`], in reverse chronological order. + /// + /// These ensure we can find the fork point of a reorg if our block source no longer has the + /// previous best tip after a restart. + pub previous_blocks: [Option<BlockHash>; ANTI_REORG_DELAY as usize * 2], } impl BestBlock { /// Constructs a `BestBlock` that represents the genesis block at height 0 of the given /// network. pub fn from_network(network: Network) -> Self { - BestBlock { block_hash: genesis_block(network).header.block_hash(), height: 0 } + let block_hash = genesis_block(network).header.block_hash(); + let previous_blocks = [None; ANTI_REORG_DELAY as usize * 2]; + BestBlock { block_hash, height: 0, previous_blocks } } /// Returns a `BestBlock` as identified by the given block hash and height. 
@@ -57,13 +66,75 @@ impl BestBlock { /// This is not exported to bindings users directly as the bindings auto-generate an /// equivalent `new`. pub fn new(block_hash: BlockHash, height: u32) -> Self { - BestBlock { block_hash, height } + let previous_blocks = [None; ANTI_REORG_DELAY as usize * 2]; + BestBlock { block_hash, height, previous_blocks } + } + + /// Advances to a new block at height [`Self::height`] + 1. + pub fn advance(&mut self, new_hash: BlockHash) { + // Shift all block hashes to the right (making room for the old tip at index 0) + for i in (1..self.previous_blocks.len()).rev() { + self.previous_blocks[i] = self.previous_blocks[i - 1]; + } + + // The old tip becomes the new index 0 (tip-1) + self.previous_blocks[0] = Some(self.block_hash); + + // Update to the new tip + self.block_hash = new_hash; + self.height += 1; + } + + /// Returns the block hash at the given height, if available in our history. + pub fn get_hash_at_height(&self, height: u32) -> Option { + if height > self.height { + return None; + } + if height == self.height { + return Some(self.block_hash); + } + + // offset = 1 means we want tip-1, which is block_hashes[0] + // offset = 2 means we want tip-2, which is block_hashes[1], etc. + let offset = self.height.saturating_sub(height) as usize; + if offset >= 1 && offset <= self.previous_blocks.len() { + self.previous_blocks[offset - 1] + } else { + None + } + } + + /// Find the most recent common ancestor between two BestBlocks by searching their block hash + /// histories. + /// + /// Returns the common block hash and height, or None if no common block is found in the + /// available histories. 
+ pub fn find_common_ancestor(&self, other: &BestBlock) -> Option<(BlockHash, u32)> { + // First check if either tip matches + if self.block_hash == other.block_hash && self.height == other.height { + return Some((self.block_hash, self.height)); + } + + // Check all heights covered by self's history + let min_height = self.height.saturating_sub(self.previous_blocks.len() as u32); + for check_height in (min_height..=self.height).rev() { + if let Some(self_hash) = self.get_hash_at_height(check_height) { + if let Some(other_hash) = other.get_hash_at_height(check_height) { + if self_hash == other_hash { + return Some((self_hash, check_height)); + } + } + } + } + None } } impl_writeable_tlv_based!(BestBlock, { (0, block_hash, required), + (1, previous_blocks_read, (legacy, [Option; ANTI_REORG_DELAY as usize * 2], |us: &BestBlock| Some(us.previous_blocks))), (2, height, required), + (unused, previous_blocks, (static_value, previous_blocks_read.unwrap_or([None; ANTI_REORG_DELAY as usize * 2]))), }); /// The `Listen` trait is used to notify when blocks have been connected or disconnected from the diff --git a/lightning/src/util/ser.rs b/lightning/src/util/ser.rs index f821aa5afc0..2bd8a9eb879 100644 --- a/lightning/src/util/ser.rs +++ b/lightning/src/util/ser.rs @@ -1448,6 +1448,33 @@ impl Readable for BlockHash { } } +impl Writeable for [Option; 12] { + fn write(&self, w: &mut W) -> Result<(), io::Error> { + for hash_opt in self { + match hash_opt { + Some(hash) => hash.write(w)?, + None => ([0u8; 32]).write(w)?, + } + } + Ok(()) + } +} + +impl Readable for [Option; 12] { + fn read(r: &mut R) -> Result { + use bitcoin::hashes::Hash; + + let mut res = [None; 12]; + for hash_opt in res.iter_mut() { + let buf: [u8; 32] = Readable::read(r)?; + if buf != [0; 32] { + *hash_opt = Some(BlockHash::from_slice(&buf[..]).unwrap()); + } + } + Ok(res) + } +} + impl Writeable for ChainHash { fn write(&self, w: &mut W) -> Result<(), io::Error> { w.write_all(self.as_bytes()) From 
ac4cfb060b7e913cb4890034913de47bdd6de989 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 15:38:34 +0000 Subject: [PATCH 02/19] f add claude's bestblock test --- lightning/src/chain/mod.rs | 42 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index fe5c01509d7..7d60fd2e518 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -566,3 +566,45 @@ impl ClaimId { ClaimId(Sha256::from_engine(engine).to_byte_array()) } } + +#[cfg(test)] +mod tests { + use super::*; + use bitcoin::hashes::Hash; + + #[test] + fn test_best_block() { + let hash1 = BlockHash::from_slice(&[1; 32]).unwrap(); + let mut chain_a = BestBlock::new(hash1, 100); + let mut chain_b = BestBlock::new(hash1, 100); + + // Test get_hash_at_height on initial block + assert_eq!(chain_a.get_hash_at_height(100), Some(hash1)); + assert_eq!(chain_a.get_hash_at_height(101), None); + assert_eq!(chain_a.get_hash_at_height(99), None); + + // Test find_common_ancestor with identical blocks + assert_eq!(chain_a.find_common_ancestor(&chain_b), Some((hash1, 100))); + + let hash2 = BlockHash::from_slice(&[2; 32]).unwrap(); + chain_a.advance(hash2); + assert_eq!(chain_a.height, 101); + assert_eq!(chain_a.block_hash, hash2); + assert_eq!(chain_a.previous_blocks[0], Some(hash1)); + assert_eq!(chain_a.get_hash_at_height(101), Some(hash2)); + assert_eq!(chain_a.get_hash_at_height(100), Some(hash1)); + + // Test find_common_ancestor with different heights + assert_eq!(chain_a.find_common_ancestor(&chain_b), Some((hash1, 100))); + + // Test find_common_ancestor with diverged chains but the same height + let hash_b3 = BlockHash::from_slice(&[33; 32]).unwrap(); + chain_b.advance(hash_b3); + assert_eq!(chain_a.find_common_ancestor(&chain_b), Some((hash1, 100))); + + // Test find_common_ancestor with no common history + let hash_other = BlockHash::from_slice(&[99; 32]).unwrap(); + let chain_c = 
BestBlock::new(hash_other, 200); + assert_eq!(chain_a.find_common_ancestor(&chain_c), None); + } +} From 513e73096149f93b108e87728a85a60f49cf82de Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 15:38:19 +0000 Subject: [PATCH 03/19] f use hard-coded values to break constant changes leading to ser break --- lightning/src/chain/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 7d60fd2e518..6c3ab8c6b9d 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -132,9 +132,11 @@ impl BestBlock { impl_writeable_tlv_based!(BestBlock, { (0, block_hash, required), - (1, previous_blocks_read, (legacy, [Option; ANTI_REORG_DELAY as usize * 2], |us: &BestBlock| Some(us.previous_blocks))), + // Note that any change to the previous_blocks array length will change the serialization + // format and thus it is specified without constants here. + (1, previous_blocks_read, (legacy, [Option; 6 * 2], |us: &BestBlock| Some(us.previous_blocks))), (2, height, required), - (unused, previous_blocks, (static_value, previous_blocks_read.unwrap_or([None; ANTI_REORG_DELAY as usize * 2]))), + (unused, previous_blocks, (static_value, previous_blocks_read.unwrap_or([None; 6 * 2]))), }); /// The `Listen` trait is used to notify when blocks have been connected or disconnected from the From 35f9565888a382456e96c65dedaa6242c5a1d05d Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 10 Feb 2026 01:30:46 +0000 Subject: [PATCH 04/19] f rebase --- lightning/src/chain/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 6c3ab8c6b9d..2937b768178 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -134,7 +134,7 @@ impl_writeable_tlv_based!(BestBlock, { (0, block_hash, required), // Note that any change to the previous_blocks array length will change the serialization // format 
and thus it is specified without constants here. - (1, previous_blocks_read, (legacy, [Option<BlockHash>; 6 * 2], |us: &BestBlock| Some(us.previous_blocks))), + (1, previous_blocks_read, (legacy, [Option<BlockHash>; 6 * 2], |_| Ok(()), |us: &BestBlock| Some(us.previous_blocks))), (2, height, required), (unused, previous_blocks, (static_value, previous_blocks_read.unwrap_or([None; 6 * 2]))), }); From bea44d25a8a693bf9bfe2de3e78c250e49d963cc Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 12 Oct 2025 14:08:16 +0000 Subject: [PATCH 05/19] Return `BestBlock` when deserializing chain-synced structs The deserialization of `ChannelMonitor`, `ChannelManager`, and `OutputSweeper` is implemented for a `(BlockHash, ...)` pair rather than on the object itself. This ensures developers are pushed to think about initial chain sync after deserialization and provides the latest chain sync state conveniently at deserialization-time. In the previous commit we started storing additional recent block hashes in `BestBlock` for use during initial sync to ensure we can handle reorgs while offline if the chain source loses the reorged-out blocks. Here, we move the deserialization routines to be on a `(BestBlock, ...)` pair instead of `(BlockHash, ...)`, providing access to those recent block hashes at deserialization-time. 
--- lightning-block-sync/src/init.rs | 17 ++++---- lightning/src/chain/channelmonitor.rs | 14 +++---- lightning/src/ln/chanmon_update_fail_tests.rs | 4 +- lightning/src/ln/channelmanager.rs | 24 ++++++------ lightning/src/ln/functional_test_utils.rs | 8 ++-- lightning/src/ln/functional_tests.rs | 7 ++-- lightning/src/ln/reload_tests.rs | 11 +++--- lightning/src/util/persist.rs | 39 ++++++++++--------- lightning/src/util/test_utils.rs | 9 +++-- 9 files changed, 66 insertions(+), 67 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index a870f8ca88c..61f44c6139e 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -40,11 +40,10 @@ where /// switching to [`SpvClient`]. For example: /// /// ``` -/// use bitcoin::hash_types::BlockHash; /// use bitcoin::network::Network; /// /// use lightning::chain; -/// use lightning::chain::Watch; +/// use lightning::chain::{BestBlock, Watch}; /// use lightning::chain::chainmonitor; /// use lightning::chain::chainmonitor::ChainMonitor; /// use lightning::chain::channelmonitor::ChannelMonitor; @@ -89,14 +88,14 @@ where /// logger: &L, /// persister: &P, /// ) { -/// // Read a serialized channel monitor paired with the block hash when it was persisted. +/// // Read a serialized channel monitor paired with the best block when it was persisted. /// let serialized_monitor = "..."; -/// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor)>::read( +/// let (monitor_best_block, mut monitor) = <(BestBlock, ChannelMonitor)>::read( /// &mut Cursor::new(&serialized_monitor), (entropy_source, signer_provider)).unwrap(); /// -/// // Read the channel manager paired with the block hash when it was persisted. +/// // Read the channel manager paired with the best block when it was persisted. 
/// let serialized_manager = "..."; -/// let (manager_block_hash, mut manager) = { +/// let (manager_best_block, mut manager) = { /// let read_args = ChannelManagerReadArgs::new( /// entropy_source, /// node_signer, @@ -110,7 +109,7 @@ where /// config, /// vec![&mut monitor], /// ); -/// <(BlockHash, ChannelManager<&ChainMonitor, &T, &ES, &NS, &SP, &F, &R, &MR, &L>)>::read( +/// <(BestBlock, ChannelManager<&ChainMonitor, &T, &ES, &NS, &SP, &F, &R, &MR, &L>)>::read( /// &mut Cursor::new(&serialized_manager), read_args).unwrap() /// }; /// @@ -118,8 +117,8 @@ where /// let mut cache = UnboundedCache::new(); /// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger); /// let listeners = vec![ -/// (monitor_block_hash, &monitor_listener as &dyn chain::Listen), -/// (manager_block_hash, &manager as &dyn chain::Listen), +/// (monitor_best_block.block_hash, &monitor_listener as &dyn chain::Listen), +/// (manager_best_block.block_hash, &manager as &dyn chain::Listen), /// ]; /// let chain_tip = init::synchronize_listeners( /// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap(); diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 20c41e5abcd..ecd6aa99476 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1058,7 +1058,7 @@ impl Readable for IrrevocablyResolvedHTLC { /// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date /// information and are actively monitoring the chain. /// -/// Like the [`ChannelManager`], deserialization is implemented for `(BlockHash, ChannelMonitor)`, +/// Like the [`ChannelManager`], deserialization is implemented for `(BestBlock, ChannelMonitor)`, /// providing you with the last block hash which was connected before shutting down. You must begin /// syncing the chain from that point, disconnecting and connecting blocks as required to get to /// the best chain on startup. 
Note that all [`ChannelMonitor`]s passed to a [`ChainMonitor`] must @@ -1066,7 +1066,7 @@ impl Readable for IrrevocablyResolvedHTLC { /// initialization. /// /// For those loading potentially-ancient [`ChannelMonitor`]s, deserialization is also implemented -/// for `Option<(BlockHash, ChannelMonitor)>`. LDK can no longer deserialize a [`ChannelMonitor`] +/// for `Option<(BestBlock, ChannelMonitor)>`. LDK can no longer deserialize a [`ChannelMonitor`] /// that was first created in LDK prior to 0.0.110 and last updated prior to LDK 0.0.119. In such /// cases, the `Option<(..)>` deserialization option may return `Ok(None)` rather than failing to /// deserialize, allowing you to differentiate between the two cases. @@ -6295,7 +6295,7 @@ where const MAX_ALLOC_SIZE: usize = 64 * 1024; impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP)> - for (BlockHash, ChannelMonitor) + for (BestBlock, ChannelMonitor) { fn read(reader: &mut R, args: (&'a ES, &'b SP)) -> Result { match >::read(reader, args) { @@ -6307,7 +6307,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP)> - for Option<(BlockHash, ChannelMonitor)> + for Option<(BestBlock, ChannelMonitor)> { #[rustfmt::skip] fn read(reader: &mut R, args: (&'a ES, &'b SP)) -> Result { @@ -6735,14 +6735,14 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP To continue, run a v0.1 release, send/route a payment over the channel or close it."); } } - Ok(Some((best_block.block_hash, monitor))) + Ok(Some((best_block, monitor))) } } #[cfg(test)] mod tests { use bitcoin::amount::Amount; - use bitcoin::hash_types::{BlockHash, Txid}; + use bitcoin::hash_types::Txid; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; use bitcoin::hex::FromHex; @@ -6841,7 +6841,7 @@ mod tests { 
nodes[1].chain_monitor.chain_monitor.transactions_confirmed(&new_header, &[(0, broadcast_tx)], conf_height); - let (_, pre_update_monitor) = <(BlockHash, ChannelMonitor<_>)>::read( + let (_, pre_update_monitor) = <(BestBlock, ChannelMonitor<_>)>::read( &mut io::Cursor::new(&get_monitor!(nodes[1], channel.2).encode()), (&nodes[1].keys_manager.backing, &nodes[1].keys_manager.backing)).unwrap(); diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 5a0c37bd61d..af8f0d4e475 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -16,7 +16,7 @@ use crate::chain::chaininterface::LowerBoundedFeeEstimator; use crate::chain::chainmonitor::ChainMonitor; use crate::chain::channelmonitor::{ChannelMonitor, MonitorEvent, ANTI_REORG_DELAY}; use crate::chain::transaction::OutPoint; -use crate::chain::{ChannelMonitorUpdateStatus, Listen, Watch}; +use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Listen, Watch}; use crate::events::{ClosureReason, Event, HTLCHandlingFailureType, PaymentPurpose}; use crate::ln::channel::AnnouncementSigsState; use crate::ln::channelmanager::{PaymentId, RAACommitmentOrder}; @@ -89,7 +89,7 @@ fn test_monitor_and_persister_update_fail() { let chain_mon = { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan.2).unwrap(); - let (_, new_monitor) = <(BlockHash, ChannelMonitor)>::read( + let (_, new_monitor) = <(BestBlock, ChannelMonitor)>::read( &mut &monitor.encode()[..], (nodes[0].keys_manager, nodes[0].keys_manager), ) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 106cc7ccaed..efc4060555e 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1922,7 +1922,6 @@ impl< /// detailed in the [`ChannelManagerReadArgs`] documentation. 
/// /// ``` -/// use bitcoin::BlockHash; /// use bitcoin::network::Network; /// use lightning::chain::BestBlock; /// # use lightning::chain::channelmonitor::ChannelMonitor; @@ -1971,8 +1970,8 @@ impl< /// entropy_source, node_signer, signer_provider, fee_estimator, chain_monitor, tx_broadcaster, /// router, message_router, logger, config, channel_monitors.iter().collect(), /// ); -/// let (block_hash, channel_manager) = -/// <(BlockHash, ChannelManager<_, _, _, _, _, _, _, _, _>)>::read(&mut reader, args)?; +/// let (best_block, channel_manager) = +/// <(BestBlock, ChannelManager<_, _, _, _, _, _, _, _, _>)>::read(&mut reader, args)?; /// /// // Update the ChannelManager and ChannelMonitors with the latest chain data /// // ... @@ -2539,7 +2538,7 @@ impl< /// [`read`], those channels will be force-closed based on the `ChannelMonitor` state and no funds /// will be lost (modulo on-chain transaction fees). /// -/// Note that the deserializer is only implemented for `(`[`BlockHash`]`, `[`ChannelManager`]`)`, which +/// Note that the deserializer is only implemented for `(`[`BestBlock`]`, `[`ChannelManager`]`)`, which /// tells you the last block hash which was connected. You should get the best block tip before using the manager. /// See [`chain::Listen`] and [`chain::Confirm`] for more details. /// @@ -2606,7 +2605,6 @@ impl< /// [`peer_disconnected`]: msgs::BaseMessageHandler::peer_disconnected /// [`funding_created`]: msgs::FundingCreated /// [`funding_transaction_generated`]: Self::funding_transaction_generated -/// [`BlockHash`]: bitcoin::hash_types::BlockHash /// [`update_channel`]: chain::Watch::update_channel /// [`ChannelUpdate`]: msgs::ChannelUpdate /// [`read`]: ReadableArgs::read @@ -17617,7 +17615,7 @@ impl<'a, ES: EntropySource, NS: NodeSigner, SP: SignerProvider, L: Logger> /// is: /// 1) Deserialize all stored [`ChannelMonitor`]s. 
/// 2) Deserialize the [`ChannelManager`] by filling in this struct and calling: -/// `<(BlockHash, ChannelManager)>::read(reader, args)` +/// `<(BestBlock, ChannelManager)>::read(reader, args)` /// This may result in closing some channels if the [`ChannelMonitor`] is newer than the stored /// [`ChannelManager`] state to ensure no loss of funds. Thus, transactions may be broadcasted. /// 3) If you are not fetching full blocks, register all relevant [`ChannelMonitor`] outpoints the @@ -17807,14 +17805,14 @@ impl< MR: MessageRouter, L: Logger + Clone, > ReadableArgs> - for (BlockHash, Arc>) + for (BestBlock, Arc>) { fn read( reader: &mut Reader, args: ChannelManagerReadArgs<'a, M, T, ES, NS, SP, F, R, MR, L>, ) -> Result { - let (blockhash, chan_manager) = - <(BlockHash, ChannelManager)>::read(reader, args)?; - Ok((blockhash, Arc::new(chan_manager))) + let (best_block, chan_manager) = + <(BestBlock, ChannelManager)>::read(reader, args)?; + Ok((best_block, Arc::new(chan_manager))) } } @@ -17830,7 +17828,7 @@ impl< MR: MessageRouter, L: Logger + Clone, > ReadableArgs> - for (BlockHash, ChannelManager) + for (BestBlock, ChannelManager) { fn read( reader: &mut Reader, args: ChannelManagerReadArgs<'a, M, T, ES, NS, SP, F, R, MR, L>, @@ -17875,7 +17873,7 @@ impl< pub(super) fn from_channel_manager_data( data: ChannelManagerData, mut args: ChannelManagerReadArgs<'_, M, T, ES, NS, SP, F, R, MR, L>, - ) -> Result<(BlockHash, Self), DecodeError> { + ) -> Result<(BestBlock, Self), DecodeError> { let ChannelManagerData { chain_hash, best_block_height, @@ -19459,7 +19457,7 @@ impl< //TODO: Broadcast channel update for closed channels, but only after we've made a //connection or two. 
- Ok((best_block_hash, channel_manager)) + Ok((best_block, channel_manager)) } } diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index c21c62d464c..fd1d2028b23 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -849,7 +849,7 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { let mon = self.chain_monitor.chain_monitor.get_monitor(channel_id).unwrap(); mon.write(&mut w).unwrap(); let (_, deserialized_monitor) = - <(BlockHash, ChannelMonitor)>::read( + <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) @@ -878,7 +878,7 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { let mut w = test_utils::TestVecWriter(Vec::new()); self.node.write(&mut w).unwrap(); <( - BlockHash, + BestBlock, ChannelManager< &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, @@ -1307,7 +1307,7 @@ pub fn _reload_node<'a, 'b, 'c>( let mut monitors_read = Vec::with_capacity(monitors_encoded.len()); for encoded in monitors_encoded { let mut monitor_read = &encoded[..]; - let (_, monitor) = <(BlockHash, ChannelMonitor)>::read( + let (_, monitor) = <(BestBlock, ChannelMonitor)>::read( &mut monitor_read, (node.keys_manager, node.keys_manager), ) @@ -1322,7 +1322,7 @@ pub fn _reload_node<'a, 'b, 'c>( for monitor in monitors_read.iter() { assert!(channel_monitors.insert(monitor.channel_id(), monitor).is_none()); } - <(BlockHash, TestChannelManager<'b, 'c>)>::read( + <(BestBlock, TestChannelManager<'b, 'c>)>::read( &mut node_read, ChannelManagerReadArgs { config, diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 6fe0c83dfe8..d013c6804c9 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -19,6 +19,7 @@ use crate::chain::channelmonitor::{ LATENCY_GRACE_PERIOD_BLOCKS, }; use crate::chain::transaction::OutPoint; +use crate::chain::BestBlock; use 
crate::chain::{ChannelMonitorUpdateStatus, Confirm, Listen, Watch}; use crate::events::{ ClosureReason, Event, HTLCHandlingFailureType, PathFailure, PaymentFailureReason, @@ -7382,7 +7383,7 @@ pub fn test_update_err_monitor_lockdown() { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan_1.2).unwrap(); let new_monitor = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut io::Cursor::new(&monitor.encode()), (nodes[0].keys_manager, nodes[0].keys_manager), ) @@ -7490,7 +7491,7 @@ pub fn test_concurrent_monitor_claim() { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan_1.2).unwrap(); let new_monitor = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut io::Cursor::new(&monitor.encode()), (nodes[0].keys_manager, nodes[0].keys_manager), ) @@ -7540,7 +7541,7 @@ pub fn test_concurrent_monitor_claim() { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan_1.2).unwrap(); let new_monitor = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut io::Cursor::new(&monitor.encode()), (nodes[0].keys_manager, nodes[0].keys_manager), ) diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index 6bd190ae12c..aed15a4196c 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -11,7 +11,7 @@ //! Functional tests which test for correct behavior across node restarts. 
-use crate::chain::{ChannelMonitorUpdateStatus, Watch}; +use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Watch}; use crate::chain::chaininterface::LowerBoundedFeeEstimator; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateStep}; use crate::routing::router::{PaymentParameters, RouteParameters}; @@ -30,7 +30,6 @@ use crate::util::ser::{Writeable, ReadableArgs}; use crate::util::config::{HTLCInterceptionFlags, UserConfig}; use bitcoin::hashes::Hash; -use bitcoin::hash_types::BlockHash; use types::payment::{PaymentHash, PaymentPreimage}; use crate::prelude::*; @@ -412,7 +411,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let mut node_0_stale_monitors = Vec::new(); for serialized in node_0_stale_monitors_serialized.iter() { let mut read = &serialized[..]; - let (_, monitor) = <(BlockHash, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); + let (_, monitor) = <(BestBlock, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); assert!(read.is_empty()); node_0_stale_monitors.push(monitor); } @@ -420,14 +419,14 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let mut node_0_monitors = Vec::new(); for serialized in node_0_monitors_serialized.iter() { let mut read = &serialized[..]; - let (_, monitor) = <(BlockHash, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); + let (_, monitor) = <(BestBlock, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); assert!(read.is_empty()); node_0_monitors.push(monitor); } let mut nodes_0_read = &nodes_0_serialized[..]; if let Err(msgs::DecodeError::DangerousValue) = - <(BlockHash, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut 
nodes_0_read, ChannelManagerReadArgs { + <(BestBlock, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs { config: UserConfig::default(), entropy_source: keys_manager, node_signer: keys_manager, @@ -445,7 +444,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let mut nodes_0_read = &nodes_0_serialized[..]; let (_, nodes_0_deserialized_tmp) = - <(BlockHash, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs { + <(BestBlock, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs { config: UserConfig::default(), entropy_source: keys_manager, node_signer: keys_manager, diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index cb4bdeb6a51..d17ecdd3696 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -14,7 +14,7 @@ use alloc::sync::Arc; use bitcoin::hashes::hex::FromHex; -use bitcoin::{BlockHash, Txid}; +use bitcoin::Txid; use core::convert::Infallible; use core::future::Future; @@ -32,6 +32,7 @@ use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::chainmonitor::Persist; use crate::chain::channelmonitor::{ChannelMonitor, 
ChannelMonitorUpdate}; use crate::chain::transaction::OutPoint; +use crate::chain::BestBlock; use crate::ln::types::ChannelId; use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; use crate::sync::Mutex; @@ -467,7 +468,7 @@ impl Persist( kv_store: K, entropy_source: ES, signer_provider: SP, -) -> Result)>, io::Error> +) -> Result)>, io::Error> where K::Target: KVStoreSync, { @@ -477,7 +478,7 @@ where CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, )? { - match )>>::read( + match )>>::read( &mut io::Cursor::new(kv_store.read( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, @@ -485,7 +486,7 @@ where )?), (&entropy_source, &signer_provider), ) { - Ok(Some((block_hash, channel_monitor))) => { + Ok(Some((best_block, channel_monitor))) => { let monitor_name = MonitorName::from_str(&stored_key)?; if channel_monitor.persistence_key() != monitor_name { return Err(io::Error::new( @@ -494,7 +495,7 @@ where )); } - res.push((block_hash, channel_monitor)); + res.push((best_block, channel_monitor)); }, Ok(None) => {}, Err(_) => { @@ -670,7 +671,7 @@ where /// Reads all stored channel monitors, along with any stored updates for them. pub fn read_all_channel_monitors_with_updates( &self, - ) -> Result)>, io::Error> { + ) -> Result)>, io::Error> { poll_sync_future(self.0.read_all_channel_monitors_with_updates()) } @@ -691,7 +692,7 @@ where /// function to accomplish this. Take care to limit the number of parallel readers. pub fn read_channel_monitor_with_updates( &self, monitor_key: &str, - ) -> Result<(BlockHash, ChannelMonitor), io::Error> { + ) -> Result<(BestBlock, ChannelMonitor), io::Error> { poll_sync_future(self.0.read_channel_monitor_with_updates(monitor_key)) } @@ -858,7 +859,7 @@ impl< /// deserialization as well. 
pub async fn read_all_channel_monitors_with_updates( &self, - ) -> Result)>, io::Error> { + ) -> Result)>, io::Error> { let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; let monitor_list = self.0.kv_store.list(primary, secondary).await?; @@ -889,7 +890,7 @@ impl< /// `Arc` that can live for `'static` and be sent and accessed across threads. pub async fn read_all_channel_monitors_with_updates_parallel( self: &Arc, - ) -> Result)>, io::Error> + ) -> Result)>, io::Error> where K: MaybeSend + MaybeSync + 'static, L: MaybeSend + MaybeSync + 'static, @@ -939,7 +940,7 @@ impl< /// function to accomplish this. Take care to limit the number of parallel readers. pub async fn read_channel_monitor_with_updates( &self, monitor_key: &str, - ) -> Result<(BlockHash, ChannelMonitor), io::Error> { + ) -> Result<(BestBlock, ChannelMonitor), io::Error> { self.0.read_channel_monitor_with_updates(monitor_key).await } @@ -1050,7 +1051,7 @@ impl< { pub async fn read_channel_monitor_with_updates( &self, monitor_key: &str, - ) -> Result<(BlockHash, ChannelMonitor), io::Error> { + ) -> Result<(BestBlock, ChannelMonitor), io::Error> { match self.maybe_read_channel_monitor_with_updates(monitor_key).await? { Some(res) => Ok(res), None => Err(io::Error::new( @@ -1067,14 +1068,14 @@ impl< async fn maybe_read_channel_monitor_with_updates( &self, monitor_key: &str, - ) -> Result)>, io::Error> { + ) -> Result)>, io::Error> { let monitor_name = MonitorName::from_str(monitor_key)?; let read_future = pin!(self.maybe_read_monitor(&monitor_name, monitor_key)); let list_future = pin!(self .kv_store .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key)); let (read_res, list_res) = TwoFutureJoiner::new(read_future, list_future).await; - let (block_hash, monitor) = match read_res? { + let (best_block, monitor) = match read_res? 
{ Some(res) => res, None => return Ok(None), }; @@ -1105,13 +1106,13 @@ impl< io::Error::new(io::ErrorKind::Other, "Monitor update failed") })?; } - Ok(Some((block_hash, monitor))) + Ok(Some((best_block, monitor))) } /// Read a channel monitor. async fn maybe_read_monitor( &self, monitor_name: &MonitorName, monitor_key: &str, - ) -> Result)>, io::Error> { + ) -> Result)>, io::Error> { let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; let monitor_bytes = self.kv_store.read(primary, secondary, monitor_key).await?; @@ -1120,12 +1121,12 @@ impl< if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) { monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64); } - match )>>::read( + match )>>::read( &mut monitor_cursor, (&self.entropy_source, &self.signer_provider), ) { Ok(None) => Ok(None), - Ok(Some((blockhash, channel_monitor))) => { + Ok(Some((best_block, channel_monitor))) => { if channel_monitor.persistence_key() != *monitor_name { log_error!( self.logger, @@ -1137,7 +1138,7 @@ impl< "ChannelMonitor was stored under the wrong key", )) } else { - Ok(Some((blockhash, channel_monitor))) + Ok(Some((best_block, channel_monitor))) } }, Err(e) => { @@ -1316,7 +1317,7 @@ impl< async fn archive_persisted_channel(&self, monitor_name: MonitorName) { let monitor_key = monitor_name.to_string(); let monitor = match self.read_channel_monitor_with_updates(&monitor_key).await { - Ok((_block_hash, monitor)) => monitor, + Ok((_best_block, monitor)) => monitor, Err(_) => return, }; let primary = ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index bcf39fde482..077f6d62687 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -20,6 +20,7 @@ use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, 
ChannelMonitorUpdateStep, MonitorEvent, }; use crate::chain::transaction::OutPoint; +use crate::chain::BestBlock; use crate::chain::WatchedOutput; use crate::events::bump_transaction::sync::WalletSourceSync; use crate::events::bump_transaction::Utxo; @@ -67,7 +68,7 @@ use bitcoin::amount::Amount; use bitcoin::block::Block; use bitcoin::constants::genesis_block; use bitcoin::constants::ChainHash; -use bitcoin::hash_types::{BlockHash, Txid}; +use bitcoin::hash_types::Txid; use bitcoin::hashes::{hex::FromHex, Hash}; use bitcoin::network::Network; use bitcoin::script::{Builder, Script, ScriptBuf}; @@ -564,7 +565,7 @@ impl<'a> TestChainMonitor<'a> { // underlying `ChainMonitor`. let mut w = TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); - let new_monitor = <(BlockHash, ChannelMonitor)>::read( + let new_monitor = <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) @@ -601,7 +602,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { // monitor to a serialized copy and get he same one back. 
let mut w = TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); - let new_monitor = <(BlockHash, ChannelMonitor)>::read( + let new_monitor = <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) @@ -657,7 +658,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { let monitor = self.chain_monitor.get_monitor(channel_id).unwrap(); w.0.clear(); monitor.write(&mut w).unwrap(); - let new_monitor = <(BlockHash, ChannelMonitor)>::read( + let new_monitor = <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) From 32d09831b07aedd20486dc7788619d0c3fb46611 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 12 Oct 2025 16:01:55 +0000 Subject: [PATCH 06/19] Replace `Cache::block_disconnected` with `blocks_disconnected` In 403dc1a48bb71ae794f6883ae0b760aad44cda39 we converted the `Listen` disconnect semantics to only pass the fork point, rather than each block being disconnected. We did not, however, update the semantics of `lightning-block-sync`'s `Cache` to reduce patch size. Here we go ahead and do so, dropping `ChainDifference::disconnected_blocks` as well, as it's no longer needed.
--- lightning-block-sync/src/init.rs | 8 +++---- lightning-block-sync/src/lib.rs | 37 +++++++++++++------------------- 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 61f44c6139e..07575c6f523 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -175,7 +175,9 @@ where let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; let difference = chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?; - chain_notifier.disconnect_blocks(difference.disconnected_blocks); + if difference.common_ancestor != old_header { + chain_notifier.disconnect_blocks(difference.common_ancestor); + } (difference.common_ancestor, difference.connected_blocks) }; @@ -215,9 +217,7 @@ impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> { unreachable!() } - fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option { - None - } + fn blocks_disconnected(&mut self, _fork_point: &ValidatedBlockHeader) {} } /// Wrapper for supporting dynamically sized chain listeners. diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 02593047658..91de5beaca3 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -202,9 +202,11 @@ pub trait Cache { /// disconnected later if needed. fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); - /// Called when a block has been disconnected from the best chain. Once disconnected, a block's - /// header is no longer needed and thus can be removed. - fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option; + /// Called when blocks have been disconnected from the best chain. Only the fork point + /// (best common ancestor) is provided. + /// + /// Once disconnected, a block's header is no longer needed and thus can be removed.
+ fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader); } /// Unbounded cache of block headers keyed by block hash. @@ -219,8 +221,8 @@ impl Cache for UnboundedCache { self.insert(block_hash, block_header); } - fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option { - self.remove(block_hash) + fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { + self.retain(|_, block_info| block_info.height < fork_point.height); } } @@ -315,9 +317,6 @@ struct ChainDifference { /// If there are any disconnected blocks, this is where the chain forked. common_ancestor: ValidatedBlockHeader, - /// Blocks that were disconnected from the chain since the last poll. - disconnected_blocks: Vec, - /// Blocks that were connected to the chain since the last poll. connected_blocks: Vec, } @@ -341,7 +340,9 @@ where .find_difference(new_header, old_header, chain_poller) .await .map_err(|e| (e, None))?; - self.disconnect_blocks(difference.disconnected_blocks); + if difference.common_ancestor != *old_header { + self.disconnect_blocks(difference.common_ancestor); + } self.connect_blocks(difference.common_ancestor, difference.connected_blocks, chain_poller) .await } @@ -354,7 +355,6 @@ where &self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader, chain_poller: &mut P, ) -> BlockSourceResult { - let mut disconnected_blocks = Vec::new(); let mut connected_blocks = Vec::new(); let mut current = current_header; let mut previous = *prev_header; @@ -369,7 +369,6 @@ where let current_height = current.height; let previous_height = previous.height; if current_height <= previous_height { - disconnected_blocks.push(previous); previous = self.look_up_previous_header(chain_poller, &previous).await?; } if current_height >= previous_height { @@ -379,7 +378,7 @@ where } let common_ancestor = current; - Ok(ChainDifference { common_ancestor, disconnected_blocks, connected_blocks }) + Ok(ChainDifference { common_ancestor, connected_blocks }) } 
/// Returns the previous header for the given header, either by looking it up in the cache or @@ -394,16 +393,10 @@ where } /// Notifies the chain listeners of disconnected blocks. - fn disconnect_blocks(&mut self, disconnected_blocks: Vec) { - for header in disconnected_blocks.iter() { - if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) { - assert_eq!(cached_header, *header); - } - } - if let Some(block) = disconnected_blocks.last() { - let fork_point = BestBlock::new(block.header.prev_blockhash, block.height - 1); - self.chain_listener.blocks_disconnected(fork_point); - } + fn disconnect_blocks(&mut self, fork_point: ValidatedBlockHeader) { + self.header_cache.blocks_disconnected(&fork_point); + let best_block = BestBlock::new(fork_point.block_hash, fork_point.height); + self.chain_listener.blocks_disconnected(best_block); } /// Notifies the chain listeners of connected blocks. From e64a9392a450b284787de81d9842853faf070ed6 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 12 Oct 2025 15:49:11 +0000 Subject: [PATCH 07/19] Pass a `BestBlock` to `init::synchronize_listeners` On restart, LDK expects the chain to be replayed starting from where it was when objects were last serialized. This is fine in the normal case, but if there was a reorg and the node which we were syncing from either resynced or was changed, the last block that we were synced as of might no longer be available. As a result, it becomes impossible to figure out where the fork point is, and thus to replay the chain. Luckily, changing the block source during a reorg isn't exactly common, but we shouldn't end up with a bricked node. To address this, `lightning-block-sync` allows the user to pass in `Cache` which can be used to cache recent blocks and thus allow for reorg handling in this case. However, serialization for, and a reasonable default implementation of a `Cache` was never built. Instead, here, we start taking a different approach. 
To avoid developers having to persist yet another object, we move `BestBlock` to storing some number of recent block hashes. This allows us to find the fork point with just the serialized state. In a previous commit, we moved deserialization of various structs to return the `BestBlock` rather than a `BlockHash`. Here we move to actually using it, taking a `BestBlock` in place of `BlockHash` to `init::synchronize_listeners` and walking the `previous_blocks` list to find the fork point rather than relying on the `Cache`. --- lightning-block-sync/src/init.rs | 51 ++++++++++---------------- lightning-block-sync/src/lib.rs | 45 ++++++++++++++++++++++- lightning-block-sync/src/poll.rs | 13 +++++++ lightning-block-sync/src/test_utils.rs | 17 +++++++++ 4 files changed, 93 insertions(+), 33 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 07575c6f523..4fdd3efabd8 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -117,8 +117,8 @@ where /// let mut cache = UnboundedCache::new(); /// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger); /// let listeners = vec![ -/// (monitor_best_block.block_hash, &monitor_listener as &dyn chain::Listen), -/// (manager_best_block.block_hash, &manager as &dyn chain::Listen), +/// (monitor_best_block, &monitor_listener as &dyn chain::Listen), +/// (manager_best_block, &manager as &dyn chain::Listen), /// ]; /// let chain_tip = init::synchronize_listeners( /// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap(); @@ -143,39 +143,28 @@ pub async fn synchronize_listeners< L: chain::Listen + ?Sized, >( block_source: B, network: Network, header_cache: &mut C, - mut chain_listeners: Vec<(BlockHash, &L)>, + mut chain_listeners: Vec<(BestBlock, &L)>, ) -> BlockSourceResult where B::Target: BlockSource, { let best_header = validate_best_block_header(&*block_source).await?; - // Fetch the header for the block hash paired 
with each listener. - let mut chain_listeners_with_old_headers = Vec::new(); - for (old_block_hash, chain_listener) in chain_listeners.drain(..) { - let old_header = match header_cache.look_up(&old_block_hash) { - Some(header) => *header, - None => { - block_source.get_header(&old_block_hash, None).await?.validate(old_block_hash)? - }, - }; - chain_listeners_with_old_headers.push((old_header, chain_listener)) - } - // Find differences and disconnect blocks for each listener individually. let mut chain_poller = ChainPoller::new(block_source, network); let mut chain_listeners_at_height = Vec::new(); let mut most_common_ancestor = None; let mut most_connected_blocks = Vec::new(); - for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) { + for (old_best_block, chain_listener) in chain_listeners.drain(..) { // Disconnect any stale blocks, but keep them in the cache for the next iteration. let header_cache = &mut ReadOnlyCache(header_cache); let (common_ancestor, connected_blocks) = { let chain_listener = &DynamicChainListener(chain_listener); let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; - let difference = - chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?; - if difference.common_ancestor != old_header { + let difference = chain_notifier + .find_difference_from_best_block(best_header, old_best_block, &mut chain_poller) + .await?; + if difference.common_ancestor.block_hash != old_best_block.block_hash { chain_notifier.disconnect_blocks(difference.common_ancestor); } (difference.common_ancestor, difference.connected_blocks) @@ -281,9 +270,9 @@ mod tests { let listener_3 = MockChainListener::new().expect_block_connected(*chain.at_height(4)); let listeners = vec![ - (chain.at_height(1).block_hash, &listener_1 as &dyn chain::Listen), - (chain.at_height(2).block_hash, &listener_2 as &dyn chain::Listen), - (chain.at_height(3).block_hash, &listener_3 as &dyn chain::Listen), + 
(chain.best_block_at_height(1), &listener_1 as &dyn chain::Listen), + (chain.best_block_at_height(2), &listener_2 as &dyn chain::Listen), + (chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen), ]; let mut cache = chain.header_cache(0..=4); match synchronize_listeners(&chain, Network::Bitcoin, &mut cache, listeners).await { @@ -313,9 +302,9 @@ mod tests { .expect_block_connected(*main_chain.at_height(4)); let listeners = vec![ - (fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen), - (fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen), - (fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen), + (fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen), + (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), + (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; let mut cache = fork_chain_1.header_cache(2..=4); cache.extend(fork_chain_2.header_cache(3..=4)); @@ -350,9 +339,9 @@ mod tests { .expect_block_connected(*main_chain.at_height(4)); let listeners = vec![ - (fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen), - (fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen), - (fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen), + (fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen), + (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), + (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; let mut cache = fork_chain_1.header_cache(2..=4); cache.extend(fork_chain_2.header_cache(3..=4)); @@ -368,18 +357,18 @@ mod tests { let main_chain = Blockchain::default().with_height(2); let fork_chain = main_chain.fork_at_height(1); let new_tip = main_chain.tip(); - let old_tip = fork_chain.tip(); + let old_best_block = fork_chain.best_block(); let listener = MockChainListener::new() .expect_blocks_disconnected(*fork_chain.at_height(1)) .expect_block_connected(*new_tip); - let listeners = vec![(old_tip.block_hash, &listener 
as &dyn chain::Listen)]; + let listeners = vec![(old_best_block, &listener as &dyn chain::Listen)]; let mut cache = fork_chain.header_cache(2..=2); match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { Ok(_) => { assert!(cache.contains_key(&new_tip.block_hash)); - assert!(cache.contains_key(&old_tip.block_hash)); + assert!(cache.contains_key(&old_best_block.block_hash)); }, Err(e) => panic!("Unexpected error: {:?}", e), } diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 91de5beaca3..ad2cb4215b3 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -337,7 +337,7 @@ where chain_poller: &mut P, ) -> Result<(), (BlockSourceError, Option)> { let difference = self - .find_difference(new_header, old_header, chain_poller) + .find_difference_from_header(new_header, old_header, chain_poller) .await .map_err(|e| (e, None))?; if difference.common_ancestor != *old_header { @@ -347,11 +347,52 @@ where .await } + /// Returns the changes needed to produce the chain with `current_header` as its tip from the + /// chain with `prev_best_block` as its tip. + /// + /// First resolves `prev_best_block` to a `ValidatedBlockHeader` using the `previous_blocks` + /// field as fallback if needed, then finds the common ancestor. + async fn find_difference_from_best_block( + &self, current_header: ValidatedBlockHeader, prev_best_block: BestBlock, + chain_poller: &mut P, + ) -> BlockSourceResult { + // Try to resolve the header for the previous best block. First try the block_hash, + // then fall back to previous_blocks if that fails. 
+ let cur_tip = core::iter::once((0, &prev_best_block.block_hash)); + let prev_tips = + prev_best_block.previous_blocks.iter().enumerate().filter_map(|(idx, hash_opt)| { + if let Some(block_hash) = hash_opt { + Some((idx as u32 + 1, block_hash)) + } else { + None + } + }); + let mut found_header = None; + for (height_diff, block_hash) in cur_tip.chain(prev_tips) { + if let Some(header) = self.header_cache.look_up(block_hash) { + found_header = Some(*header); + break; + } + let height = prev_best_block.height.checked_sub(height_diff).ok_or( + BlockSourceError::persistent("BestBlock had more previous_blocks than its height"), + )?; + if let Ok(header) = chain_poller.get_header(block_hash, Some(height)).await { + found_header = Some(header); + break; + } + } + let found_header = found_header.ok_or_else(|| { + BlockSourceError::persistent("could not resolve any block from BestBlock") + })?; + + self.find_difference_from_header(current_header, &found_header, chain_poller).await + } + /// Returns the changes needed to produce the chain with `current_header` as its tip from the /// chain with `prev_header` as its tip. /// /// Walks backwards from `current_header` and `prev_header`, finding the common ancestor. - async fn find_difference( + async fn find_difference_from_header( &self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader, chain_poller: &mut P, ) -> BlockSourceResult { diff --git a/lightning-block-sync/src/poll.rs b/lightning-block-sync/src/poll.rs index 13e0403c3b6..fd8c546c56f 100644 --- a/lightning-block-sync/src/poll.rs +++ b/lightning-block-sync/src/poll.rs @@ -31,6 +31,11 @@ pub trait Poll { fn fetch_block<'a>( &'a self, header: &'a ValidatedBlockHeader, ) -> impl Future> + Send + 'a; + + /// Returns the header for a given hash and optional height hint. 
+ fn get_header<'a>( + &'a self, block_hash: &'a BlockHash, height_hint: Option, + ) -> impl Future> + Send + 'a; } /// A chain tip relative to another chain tip in terms of block hash and chainwork. @@ -258,6 +263,14 @@ impl + Sized + Send + Sync, T: BlockSource + ?Sized> Poll ) -> impl Future> + Send + 'a { async move { self.block_source.get_block(&header.block_hash).await?.validate(header.block_hash) } } + + fn get_header<'a>( + &'a self, block_hash: &'a BlockHash, height_hint: Option, + ) -> impl Future> + Send + 'a { + Box::pin(async move { + self.block_source.get_header(block_hash, height_hint).await?.validate(*block_hash) + }) + } } #[cfg(test)] diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index 40788e4d08c..3d7870afb1e 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -104,6 +104,18 @@ impl Blockchain { block_header.validate(block_hash).unwrap() } + pub fn best_block_at_height(&self, height: usize) -> BestBlock { + let mut previous_blocks = [None; 12]; + for (i, height) in (0..height).rev().take(12).enumerate() { + previous_blocks[i] = Some(self.blocks[height].block_hash()); + } + BestBlock { + height: height as u32, + block_hash: self.blocks[height].block_hash(), + previous_blocks, + } + } + fn at_height_unvalidated(&self, height: usize) -> BlockHeaderData { assert!(!self.blocks.is_empty()); assert!(height < self.blocks.len()); @@ -123,6 +135,11 @@ impl Blockchain { self.at_height(self.blocks.len() - 1) } + pub fn best_block(&self) -> BestBlock { + assert!(!self.blocks.is_empty()); + self.best_block_at_height(self.blocks.len() - 1) + } + pub fn disconnect_tip(&mut self) -> Option { self.blocks.pop() } From d03ed920229057257ef323b09c9558c56cf6d360 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 00:37:20 +0000 Subject: [PATCH 08/19] Make the `Cache` trait priv, just use `UnboundedCache` publicly In the previous commit, we moved to relying on 
`BestBlock::previous_blocks` to find the fork point in `lightning-block-sync`'s `init::synchronize_listeners`. Here we now drop the `Cache` parameter as we no longer rely on it. Because we now have no reason to want a persistent `Cache`, we remove the trait from the public interface. However, to keep disconnections reliable we return the `UnboundedCache` we built up during initial sync from `init::synchronize_listeners` which we expect developers to pass to `SpvClient::new`. --- lightning-block-sync/src/init.rs | 94 +++++++------------------------- lightning-block-sync/src/lib.rs | 61 ++++++++++++--------- 2 files changed, 56 insertions(+), 99 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 4fdd3efabd8..5e1c344e3f5 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -2,7 +2,7 @@ //! from disk. use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; -use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier}; +use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier, UnboundedCache}; use bitcoin::block::Header; use bitcoin::hash_types::BlockHash; @@ -32,9 +32,9 @@ where /// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each /// listener's view of the chain from its paired block hash to `block_source`'s best chain tip. /// -/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of -/// failure, each listener may be left at a different block hash than the one it was originally -/// paired with. +/// Upon success, the returned header and header cache can be used to initialize [`SpvClient`]. In +/// the case of failure, each listener may be left at a different block hash than the one it was +/// originally paired with. /// /// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before /// switching to [`SpvClient`]. 
For example: @@ -114,14 +114,13 @@ where /// }; /// /// // Synchronize any channel monitors and the channel manager to be on the best block. -/// let mut cache = UnboundedCache::new(); /// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger); /// let listeners = vec![ /// (monitor_best_block, &monitor_listener as &dyn chain::Listen), /// (manager_best_block, &manager as &dyn chain::Listen), /// ]; -/// let chain_tip = init::synchronize_listeners( -/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap(); +/// let (chain_cache, chain_tip) = init::synchronize_listeners( +/// block_source, Network::Bitcoin, listeners).await.unwrap(); /// /// // Allow the chain monitor to watch any channels. /// let monitor = monitor_listener.0; @@ -130,21 +129,16 @@ where /// // Create an SPV client to notify the chain monitor and channel manager of block events. /// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin); /// let mut chain_listener = (chain_monitor, &manager); -/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener); +/// let spv_client = SpvClient::new(chain_tip, chain_poller, chain_cache, &chain_listener); /// } /// ``` /// /// [`SpvClient`]: crate::SpvClient /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor -pub async fn synchronize_listeners< - B: Deref + Sized + Send + Sync, - C: Cache, - L: chain::Listen + ?Sized, ->( - block_source: B, network: Network, header_cache: &mut C, - mut chain_listeners: Vec<(BestBlock, &L)>, -) -> BlockSourceResult +pub async fn synchronize_listeners( + block_source: B, network: Network, mut chain_listeners: Vec<(BestBlock, &L)>, +) -> BlockSourceResult<(UnboundedCache, ValidatedBlockHeader)> where B::Target: BlockSource, { @@ -155,12 +149,13 @@ where let mut chain_listeners_at_height = Vec::new(); let mut most_common_ancestor = None; let mut 
most_connected_blocks = Vec::new(); + let mut header_cache = UnboundedCache::new(); for (old_best_block, chain_listener) in chain_listeners.drain(..) { // Disconnect any stale blocks, but keep them in the cache for the next iteration. - let header_cache = &mut ReadOnlyCache(header_cache); let (common_ancestor, connected_blocks) = { let chain_listener = &DynamicChainListener(chain_listener); - let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; + let mut chain_notifier = + ChainNotifier { header_cache: &mut header_cache, chain_listener }; let difference = chain_notifier .find_difference_from_best_block(best_header, old_best_block, &mut chain_poller) .await?; @@ -181,32 +176,14 @@ where // Connect new blocks for all listeners at once to avoid re-fetching blocks. if let Some(common_ancestor) = most_common_ancestor { let chain_listener = &ChainListenerSet(chain_listeners_at_height); - let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; + let mut chain_notifier = ChainNotifier { header_cache: &mut header_cache, chain_listener }; chain_notifier .connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller) .await .map_err(|(e, _)| e)?; } - Ok(best_header) -} - -/// A wrapper to make a cache read-only. -/// -/// Used to prevent losing headers that may be needed to disconnect blocks common to more than one -/// listener. -struct ReadOnlyCache<'a, C: Cache>(&'a mut C); - -impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> { - fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { - self.0.look_up(block_hash) - } - - fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) { - unreachable!() - } - - fn blocks_disconnected(&mut self, _fork_point: &ValidatedBlockHeader) {} + Ok((header_cache, best_header)) } /// Wrapper for supporting dynamically sized chain listeners. 
@@ -274,9 +251,8 @@ mod tests { (chain.best_block_at_height(2), &listener_2 as &dyn chain::Listen), (chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen), ]; - let mut cache = chain.header_cache(0..=4); - match synchronize_listeners(&chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(header) => assert_eq!(header, chain.tip()), + match synchronize_listeners(&chain, Network::Bitcoin, listeners).await { + Ok((_, header)) => assert_eq!(header, chain.tip()), Err(e) => panic!("Unexpected error: {:?}", e), } } @@ -306,11 +282,8 @@ mod tests { (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; - let mut cache = fork_chain_1.header_cache(2..=4); - cache.extend(fork_chain_2.header_cache(3..=4)); - cache.extend(fork_chain_3.header_cache(4..=4)); - match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(header) => assert_eq!(header, main_chain.tip()), + match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { + Ok((_, header)) => assert_eq!(header, main_chain.tip()), Err(e) => panic!("Unexpected error: {:?}", e), } } @@ -343,33 +316,8 @@ mod tests { (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; - let mut cache = fork_chain_1.header_cache(2..=4); - cache.extend(fork_chain_2.header_cache(3..=4)); - cache.extend(fork_chain_3.header_cache(4..=4)); - match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(header) => assert_eq!(header, main_chain.tip()), - Err(e) => panic!("Unexpected error: {:?}", e), - } - } - - #[tokio::test] - async fn cache_connected_and_keep_disconnected_blocks() { - let main_chain = Blockchain::default().with_height(2); - let fork_chain = main_chain.fork_at_height(1); - let new_tip = main_chain.tip(); - let old_best_block = fork_chain.best_block(); - - let listener = 
MockChainListener::new() - .expect_blocks_disconnected(*fork_chain.at_height(1)) - .expect_block_connected(*new_tip); - - let listeners = vec![(old_best_block, &listener as &dyn chain::Listen)]; - let mut cache = fork_chain.header_cache(2..=2); - match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(_) => { - assert!(cache.contains_key(&new_tip.block_hash)); - assert!(cache.contains_key(&old_best_block.block_hash)); - }, + match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { + Ok((_, header)) => assert_eq!(header, main_chain.tip()), Err(e) => panic!("Unexpected error: {:?}", e), } } diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index ad2cb4215b3..4ed8a466629 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -170,18 +170,13 @@ pub enum BlockData { /// sources for the best chain tip. During this process it detects any chain forks, determines which /// constitutes the best chain, and updates the listener accordingly with any blocks that were /// connected or disconnected since the last poll. -/// -/// Block headers for the best chain are maintained in the parameterized cache, allowing for a -/// custom cache eviction policy. This offers flexibility to those sensitive to resource usage. -/// Hence, there is a trade-off between a lower memory footprint and potentially increased network -/// I/O as headers are re-fetched during fork detection. -pub struct SpvClient<'a, P: Poll, C: Cache, L: Deref> +pub struct SpvClient where L::Target: chain::Listen, { chain_tip: ValidatedBlockHeader, chain_poller: P, - chain_notifier: ChainNotifier<'a, C, L>, + chain_notifier: ChainNotifier, } /// The `Cache` trait defines behavior for managing a block header cache, where block headers are @@ -194,7 +189,7 @@ where /// Implementations may define how long to retain headers such that it's unlikely they will ever be /// needed to disconnect a block. 
In cases where block sources provide access to headers on stale /// forks reliably, caches may be entirely unnecessary. -pub trait Cache { +pub(crate) trait Cache { /// Retrieves the block header keyed by the given block hash. fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>; @@ -226,7 +221,21 @@ impl Cache for UnboundedCache { } } -impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L> +impl Cache for &mut UnboundedCache { + fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { + self.get(block_hash) + } + + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + self.insert(block_hash, block_header); + } + + fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { + self.retain(|_, block_info| block_info.height < fork_point.height); + } +} + +impl SpvClient where L::Target: chain::Listen, { @@ -241,7 +250,7 @@ where /// /// [`poll_best_tip`]: SpvClient::poll_best_tip pub fn new( - chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: &'a mut C, + chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: UnboundedCache, chain_listener: L, ) -> Self { let chain_notifier = ChainNotifier { header_cache, chain_listener }; @@ -295,15 +304,15 @@ where /// Notifies [listeners] of blocks that have been connected or disconnected from the chain. /// /// [listeners]: lightning::chain::Listen -pub struct ChainNotifier<'a, C: Cache, L: Deref> +pub(crate) struct ChainNotifier where L::Target: chain::Listen, { /// Cache for looking up headers before fetching from a block source. - header_cache: &'a mut C, + pub(crate) header_cache: C, /// Listener that will be notified of connected or disconnected blocks. 
- chain_listener: L, + pub(crate) chain_listener: L, } /// Changes made to the chain between subsequent polls that transformed it from having one chain tip @@ -321,7 +330,7 @@ struct ChainDifference { connected_blocks: Vec, } -impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> +impl ChainNotifier where L::Target: chain::Listen, { @@ -481,9 +490,9 @@ mod spv_client_tests { let best_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => { assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); @@ -500,9 +509,9 @@ mod spv_client_tests { let common_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(common_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(common_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -520,9 +529,9 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -540,9 +549,9 @@ mod spv_client_tests { let old_tip = 
chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -560,9 +569,9 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -581,9 +590,9 @@ mod spv_client_tests { let worse_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { From c83cb72029b56c69d07e8187e051945125f0307e Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 00:37:58 +0000 Subject: [PATCH 09/19] Make `UnboundedCache` bounded In the previous commit we moved to hard-coding `UnboundedCache` in the `lightning-block-sync` interface. 
This is great, except that it's an unbounded cache that can use arbitrary amounts of memory (though never really all that much - it's just headers that come in while we're running). Here we simply limit the size, and while we're at it give it a more generic `HeaderCache` name. --- lightning-block-sync/src/init.rs | 7 ++-- lightning-block-sync/src/lib.rs | 49 ++++++++++++++++---------- lightning-block-sync/src/test_utils.rs | 9 ++--- 3 files changed, 39 insertions(+), 26 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 5e1c344e3f5..ddb90d6d97f 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -2,10 +2,9 @@ //! from disk. use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; -use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier, UnboundedCache}; +use crate::{BlockSource, BlockSourceResult, ChainNotifier, HeaderCache}; use bitcoin::block::Header; -use bitcoin::hash_types::BlockHash; use bitcoin::network::Network; use lightning::chain; @@ -138,7 +137,7 @@ where /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor pub async fn synchronize_listeners( block_source: B, network: Network, mut chain_listeners: Vec<(BestBlock, &L)>, -) -> BlockSourceResult<(UnboundedCache, ValidatedBlockHeader)> +) -> BlockSourceResult<(HeaderCache, ValidatedBlockHeader)> where B::Target: BlockSource, { @@ -149,7 +148,7 @@ where let mut chain_listeners_at_height = Vec::new(); let mut most_common_ancestor = None; let mut most_connected_blocks = Vec::new(); - let mut header_cache = UnboundedCache::new(); + let mut header_cache = HeaderCache::new(); for (old_best_block, chain_listener) in chain_listeners.drain(..) { // Disconnect any stale blocks, but keep them in the cache for the next iteration.
let (common_ancestor, connected_blocks) = { diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 4ed8a466629..8f8bd84fd2a 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -176,7 +176,7 @@ where { chain_tip: ValidatedBlockHeader, chain_poller: P, - chain_notifier: ChainNotifier, + chain_notifier: ChainNotifier, } /// The `Cache` trait defines behavior for managing a block header cache, where block headers are @@ -204,34 +204,47 @@ pub(crate) trait Cache { fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader); } -/// Unbounded cache of block headers keyed by block hash. -pub type UnboundedCache = std::collections::HashMap; +/// Bounded cache of block headers keyed by block hash. +/// +/// Retains only the last `ANTI_REORG_DELAY * 2` block headers based on height. +pub struct HeaderCache(std::collections::HashMap); -impl Cache for UnboundedCache { +impl HeaderCache { + /// Creates a new empty header cache. + pub fn new() -> Self { + Self(std::collections::HashMap::new()) + } +} + +impl Cache for HeaderCache { fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { - self.get(block_hash) + self.0.get(block_hash) } fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { - self.insert(block_hash, block_header); + self.0.insert(block_hash, block_header); + + // Remove headers older than a week. 
+ let cutoff_height = block_header.height.saturating_sub(6 * 24 * 7); + self.0.retain(|_, header| header.height >= cutoff_height); } fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { - self.retain(|_, block_info| block_info.height < fork_point.height); + self.0.retain(|_, block_info| block_info.height < fork_point.height); } } -impl Cache for &mut UnboundedCache { +impl Cache for &mut HeaderCache { fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { - self.get(block_hash) + self.0.get(block_hash) } fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { - self.insert(block_hash, block_header); + (*self).block_connected(block_hash, block_header); } fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { - self.retain(|_, block_info| block_info.height < fork_point.height); + self.0.retain(|_, block_info| block_info.height < fork_point.height); } } @@ -250,7 +263,7 @@ where /// /// [`poll_best_tip`]: SpvClient::poll_best_tip pub fn new( - chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: UnboundedCache, + chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: HeaderCache, chain_listener: L, ) -> Self { let chain_notifier = ChainNotifier { header_cache, chain_listener }; @@ -490,7 +503,7 @@ mod spv_client_tests { let best_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); match client.poll_best_tip().await { @@ -509,7 +522,7 @@ mod spv_client_tests { let common_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(common_tip, poller, cache, &mut listener); 
match client.poll_best_tip().await { @@ -529,7 +542,7 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { @@ -549,7 +562,7 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { @@ -569,7 +582,7 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { @@ -590,7 +603,7 @@ mod spv_client_tests { let worse_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); match client.poll_best_tip().await { diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index 3d7870afb1e..89cb3e81d60 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -1,6 +1,7 @@ use crate::poll::{Validate, ValidatedBlockHeader}; use crate::{ - BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult, UnboundedCache, + BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult, Cache, + HeaderCache, }; use 
bitcoin::block::{Block, Header, Version}; @@ -144,12 +145,12 @@ impl Blockchain { self.blocks.pop() } - pub fn header_cache(&self, heights: std::ops::RangeInclusive) -> UnboundedCache { - let mut cache = UnboundedCache::new(); + pub fn header_cache(&self, heights: std::ops::RangeInclusive) -> HeaderCache { + let mut cache = HeaderCache::new(); for i in heights { let value = self.at_height(i); let key = value.header.block_hash(); - assert!(cache.insert(key, value).is_none()); + cache.block_connected(key, value); } cache } From 0a9b8a15a721431f0c5baedf55da6abb48b4867f Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 15:38:11 +0000 Subject: [PATCH 10/19] f correct cache retention on reorg --- lightning-block-sync/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 8f8bd84fd2a..37d87339e06 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -230,7 +230,7 @@ impl Cache for HeaderCache { } fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { - self.0.retain(|_, block_info| block_info.height < fork_point.height); + self.0.retain(|_, block_info| block_info.height <= fork_point.height); } } @@ -244,7 +244,7 @@ impl Cache for &mut HeaderCache { } fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { - self.0.retain(|_, block_info| block_info.height < fork_point.height); + self.0.retain(|_, block_info| block_info.height <= fork_point.height); } } From fd0230b509ae2cc0adac9a3ac228860491acc111 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 16:51:38 +0000 Subject: [PATCH 11/19] f correct + constify header cache limit --- lightning-block-sync/src/lib.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 37d87339e06..5f1ae88fcc5 100644 --- a/lightning-block-sync/src/lib.rs +++ 
b/lightning-block-sync/src/lib.rs @@ -204,9 +204,12 @@ pub(crate) trait Cache { fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader); } +/// The maximum number of [`ValidatedBlockHeader`]s stored in a [`HeaderCache`]. +pub const HEADER_CACHE_LIMIT: u32 = 6 * 24 * 7; + /// Bounded cache of block headers keyed by block hash. /// -/// Retains only the last `ANTI_REORG_DELAY * 2` block headers based on height. +/// Retains only the latest [`HEADER_CACHE_LIMIT`] block headers based on height. pub struct HeaderCache(std::collections::HashMap); impl HeaderCache { @@ -225,7 +228,7 @@ impl Cache for HeaderCache { self.0.insert(block_hash, block_header); // Remove headers older than a week. - let cutoff_height = block_header.height.saturating_sub(6 * 24 * 7); + let cutoff_height = block_header.height.saturating_sub(HEADER_CACHE_LIMIT); self.0.retain(|_, header| header.height >= cutoff_height); } From 4ba4f0ed2726867e625a254d02c848623ba795e7 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 8 Dec 2025 01:10:30 +0000 Subject: [PATCH 12/19] Consolidate all the pub aync utils to `native_async` --- lightning-background-processor/src/lib.rs | 4 +-- lightning/src/chain/chainmonitor.rs | 3 +- lightning/src/events/bump_transaction/mod.rs | 2 +- lightning/src/events/bump_transaction/sync.rs | 3 +- lightning/src/sign/mod.rs | 2 +- lightning/src/util/async_poll.rs | 28 ----------------- lightning/src/util/mod.rs | 2 +- lightning/src/util/native_async.rs | 31 ++++++++++++++++++- lightning/src/util/persist.rs | 4 +-- lightning/src/util/test_utils.rs | 2 +- 10 files changed, 41 insertions(+), 40 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index f052f3d8d4c..56b557d2af5 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -55,9 +55,9 @@ use lightning::routing::utxo::UtxoLookup; #[cfg(not(c_bindings))] use lightning::sign::EntropySource; use 
lightning::sign::{ChangeDestinationSource, ChangeDestinationSourceSync, OutputSpender}; -#[cfg(not(c_bindings))] -use lightning::util::async_poll::MaybeSend; use lightning::util::logger::Logger; +#[cfg(not(c_bindings))] +use lightning::util::native_async::MaybeSend; use lightning::util::persist::{ KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 7db1b697c2b..0cd4c8ab7f9 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -51,10 +51,9 @@ use crate::sign::ecdsa::EcdsaChannelSigner; use crate::sign::{EntropySource, PeerStorageKey, SignerProvider}; use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard}; use crate::types::features::{InitFeatures, NodeFeatures}; -use crate::util::async_poll::{MaybeSend, MaybeSync}; use crate::util::errors::APIError; use crate::util::logger::{Logger, WithContext}; -use crate::util::native_async::FutureSpawner; +use crate::util::native_async::{FutureSpawner, MaybeSend, MaybeSync}; use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync}; #[cfg(peer_storage)] use crate::util::ser::{VecWriter, Writeable}; diff --git a/lightning/src/events/bump_transaction/mod.rs b/lightning/src/events/bump_transaction/mod.rs index ff034176385..a17feae4569 100644 --- a/lightning/src/events/bump_transaction/mod.rs +++ b/lightning/src/events/bump_transaction/mod.rs @@ -38,8 +38,8 @@ use crate::sign::{ P2WPKH_WITNESS_WEIGHT, }; use crate::sync::Mutex; -use crate::util::async_poll::{MaybeSend, MaybeSync}; use crate::util::logger::Logger; +use crate::util::native_async::{MaybeSend, MaybeSync}; use bitcoin::amount::Amount; use bitcoin::consensus::Encodable; diff --git a/lightning/src/events/bump_transaction/sync.rs b/lightning/src/events/bump_transaction/sync.rs index 
f4245cd5194..a367b8bb9e1 100644 --- a/lightning/src/events/bump_transaction/sync.rs +++ b/lightning/src/events/bump_transaction/sync.rs @@ -18,8 +18,9 @@ use crate::chain::chaininterface::BroadcasterInterface; use crate::chain::ClaimId; use crate::prelude::*; use crate::sign::SignerProvider; -use crate::util::async_poll::{dummy_waker, MaybeSend, MaybeSync}; +use crate::util::async_poll::dummy_waker; use crate::util::logger::Logger; +use crate::util::native_async::{MaybeSend, MaybeSync}; use bitcoin::{Psbt, ScriptBuf, Transaction, TxOut}; diff --git a/lightning/src/sign/mod.rs b/lightning/src/sign/mod.rs index 84bfbb902ea..10a3c155374 100644 --- a/lightning/src/sign/mod.rs +++ b/lightning/src/sign/mod.rs @@ -58,7 +58,7 @@ use crate::ln::script::ShutdownScript; use crate::offers::invoice::UnsignedBolt12Invoice; use crate::types::features::ChannelTypeFeatures; use crate::types::payment::PaymentPreimage; -use crate::util::async_poll::MaybeSend; +use crate::util::native_async::MaybeSend; use crate::util::ser::{ReadableArgs, Writeable}; use crate::util::transaction_utils; diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs index 57df5b26cb0..23ca1aad603 100644 --- a/lightning/src/util/async_poll.rs +++ b/lightning/src/util/async_poll.rs @@ -164,31 +164,3 @@ const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } } - -/// Marker trait to optionally implement `Sync` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. -#[cfg(feature = "std")] -pub use core::marker::Sync as MaybeSync; - -#[cfg(not(feature = "std"))] -/// Marker trait to optionally implement `Sync` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. 
-pub trait MaybeSync {} -#[cfg(not(feature = "std"))] -impl MaybeSync for T where T: ?Sized {} - -/// Marker trait to optionally implement `Send` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. -#[cfg(feature = "std")] -pub use core::marker::Send as MaybeSend; - -#[cfg(not(feature = "std"))] -/// Marker trait to optionally implement `Send` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. -pub trait MaybeSend {} -#[cfg(not(feature = "std"))] -impl MaybeSend for T where T: ?Sized {} diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index dcbea904b51..51e608d185f 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -20,7 +20,7 @@ pub mod mut_global; pub mod anchor_channel_reserves; -pub mod async_poll; +pub(crate) mod async_poll; #[cfg(fuzzing)] pub mod base32; #[cfg(not(fuzzing))] diff --git a/lightning/src/util/native_async.rs b/lightning/src/util/native_async.rs index 0c380f2b1d1..31b07c2f3b5 100644 --- a/lightning/src/util/native_async.rs +++ b/lightning/src/util/native_async.rs @@ -9,8 +9,9 @@ #[cfg(all(test, feature = "std"))] use crate::sync::{Arc, Mutex}; -use crate::util::async_poll::{MaybeSend, MaybeSync}; +#[cfg(test)] +use alloc::boxed::Box; #[cfg(all(test, not(feature = "std")))] use alloc::rc::Rc; @@ -53,6 +54,34 @@ trait MaybeSendableFuture: Future + MaybeSend + 'static {} #[cfg(test)] impl + MaybeSend + 'static> MaybeSendableFuture for F {} +/// Marker trait to optionally implement `Sync` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. +#[cfg(feature = "std")] +pub use core::marker::Sync as MaybeSync; + +#[cfg(not(feature = "std"))] +/// Marker trait to optionally implement `Sync` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. 
+pub trait MaybeSync {} +#[cfg(not(feature = "std"))] +impl MaybeSync for T where T: ?Sized {} + +/// Marker trait to optionally implement `Send` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. +#[cfg(feature = "std")] +pub use core::marker::Send as MaybeSend; + +#[cfg(not(feature = "std"))] +/// Marker trait to optionally implement `Send` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. +pub trait MaybeSend {} +#[cfg(not(feature = "std"))] +impl MaybeSend for T where T: ?Sized {} + /// A simple [`FutureSpawner`] which holds [`Future`]s until they are manually polled via /// [`Self::poll_futures`]. #[cfg(all(test, feature = "std"))] diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index d17ecdd3696..47aed70ca93 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -37,10 +37,10 @@ use crate::ln::types::ChannelId; use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; use crate::sync::Mutex; use crate::util::async_poll::{ - dummy_waker, MaybeSend, MaybeSync, MultiResultFuturePoller, ResultFuture, TwoFutureJoiner, + dummy_waker, MultiResultFuturePoller, ResultFuture, TwoFutureJoiner, }; use crate::util::logger::Logger; -use crate::util::native_async::FutureSpawner; +use crate::util::native_async::{FutureSpawner, MaybeSend, MaybeSync}; use crate::util::ser::{Readable, ReadableArgs, Writeable}; use crate::util::wakers::Notifier; diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 077f6d62687..d80c1b68173 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -51,7 +51,6 @@ use crate::sign::{self, ReceiveAuthKey}; use crate::sign::{ChannelSigner, PeerStorageKey}; use crate::sync::RwLock; use crate::types::features::{ChannelFeatures, InitFeatures, NodeFeatures}; -use crate::util::async_poll::MaybeSend; use 
crate::util::config::UserConfig; use crate::util::dyn_signer::{ DynKeysInterface, DynKeysInterfaceTrait, DynPhantomKeysInterface, DynSigner, @@ -59,6 +58,7 @@ use crate::util::dyn_signer::{ use crate::util::logger::{Logger, Record}; #[cfg(feature = "std")] use crate::util::mut_global::MutGlobal; +use crate::util::native_async::MaybeSend; use crate::util::persist::{KVStore, KVStoreSync, MonitorName}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use crate::util::test_channel_signer::{EnforcementState, TestChannelSigner}; From efa3eb01c99cc1ea6a30c2b34778406ba31d0d6f Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 18 Oct 2025 01:36:01 +0000 Subject: [PATCH 13/19] Add `async_poll.rs` to `lightning-block-sync` In the next commit we'll fetch blocks during initial connection in parallel, which requires a multi-future poller. Here we add a symlink to the existing `lightning` `async_poll.rs` file, making it available in `lightning-block-sync` --- lightning-block-sync/src/async_poll.rs | 1 + lightning-block-sync/src/lib.rs | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) create mode 120000 lightning-block-sync/src/async_poll.rs diff --git a/lightning-block-sync/src/async_poll.rs b/lightning-block-sync/src/async_poll.rs new file mode 120000 index 00000000000..eb85cdac697 --- /dev/null +++ b/lightning-block-sync/src/async_poll.rs @@ -0,0 +1 @@ +../../lightning/src/util/async_poll.rs \ No newline at end of file diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 5f1ae88fcc5..7788103e6a2 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -16,9 +16,11 @@ #![deny(rustdoc::broken_intra_doc_links)] #![deny(rustdoc::private_intra_doc_links)] #![deny(missing_docs)] -#![deny(unsafe_code)] #![cfg_attr(docsrs, feature(doc_cfg))] +extern crate alloc; +extern crate core; + #[cfg(any(feature = "rest-client", feature = "rpc-client"))] pub mod http; @@ -42,6 +44,9 @@ mod test_utils; 
#[cfg(any(feature = "rest-client", feature = "rpc-client"))] mod utils; +#[allow(unused)] +mod async_poll; + use crate::poll::{ChainTip, Poll, ValidatedBlockHeader}; use bitcoin::block::{Block, Header}; From cdc7b2b98bb3777f45a975829e97632f5fb16908 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 7 Dec 2025 23:30:57 +0000 Subject: [PATCH 14/19] Fetch blocks from source in parallel during initial sync In `init::synchronize_listeners` we may end up spending a decent chunk of our time just fetching block data. Here we parallelize that step across up to 36 blocks at a time. On my node with bitcoind on localhost, the impact of this is somewhat muted by block deserialization being the bulk of the work, however a networked bitcoind would likely change that. Even still, fetching a batch of 36 blocks in parallel happens on my node in ~615 ms vs ~815ms in serial. --- lightning-block-sync/src/init.rs | 85 +++++++++++++++++--------------- 1 file changed, 46 insertions(+), 39 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index ddb90d6d97f..0a0ee3c4bc1 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -1,8 +1,9 @@ //! Utilities to assist in the initial sync required to initialize or reload Rust-Lightning objects //! from disk. -use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; -use crate::{BlockSource, BlockSourceResult, ChainNotifier, HeaderCache}; +use crate::async_poll::{MultiResultFuturePoller, ResultFuture}; +use crate::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader}; +use crate::{BlockData, BlockSource, BlockSourceResult, ChainNotifier, HeaderCache}; use bitcoin::block::Header; use bitcoin::network::Network; @@ -146,7 +147,6 @@ where // Find differences and disconnect blocks for each listener individually. 
let mut chain_poller = ChainPoller::new(block_source, network); let mut chain_listeners_at_height = Vec::new(); - let mut most_common_ancestor = None; let mut most_connected_blocks = Vec::new(); let mut header_cache = HeaderCache::new(); for (old_best_block, chain_listener) in chain_listeners.drain(..) { @@ -167,19 +167,53 @@ where // Keep track of the most common ancestor and all blocks connected across all listeners. chain_listeners_at_height.push((common_ancestor.height, chain_listener)); if connected_blocks.len() > most_connected_blocks.len() { - most_common_ancestor = Some(common_ancestor); most_connected_blocks = connected_blocks; } } - // Connect new blocks for all listeners at once to avoid re-fetching blocks. - if let Some(common_ancestor) = most_common_ancestor { - let chain_listener = &ChainListenerSet(chain_listeners_at_height); - let mut chain_notifier = ChainNotifier { header_cache: &mut header_cache, chain_listener }; - chain_notifier - .connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller) - .await - .map_err(|(e, _)| e)?; + while !most_connected_blocks.is_empty() { + #[cfg(not(test))] + const MAX_BLOCKS_AT_ONCE: usize = 6 * 6; // Six hours of blocks, 144MiB encoded + #[cfg(test)] + const MAX_BLOCKS_AT_ONCE: usize = 2; + + let mut fetch_block_futures = + Vec::with_capacity(core::cmp::min(MAX_BLOCKS_AT_ONCE, most_connected_blocks.len())); + for header in most_connected_blocks.iter().rev().take(MAX_BLOCKS_AT_ONCE) { + let fetch_future = chain_poller.fetch_block(header); + fetch_block_futures + .push(ResultFuture::Pending(Box::pin(async move { (header, fetch_future.await) }))); + } + let results = MultiResultFuturePoller::new(fetch_block_futures).await.into_iter(); + + let mut fetched_blocks = [const { None }; MAX_BLOCKS_AT_ONCE]; + for ((header, block_res), result) in results.into_iter().zip(fetched_blocks.iter_mut()) { + *result = Some((header.height, block_res?)); + } + 
debug_assert!(fetched_blocks.iter().take(most_connected_blocks.len()).all(|r| r.is_some())); + debug_assert!(fetched_blocks + .is_sorted_by_key(|r| r.as_ref().map(|(height, _)| *height).unwrap_or(u32::MAX))); + + for (listener_height, listener) in chain_listeners_at_height.iter() { + // Connect blocks for this listener. + for result in fetched_blocks.iter() { + if let Some((height, block_data)) = result { + if *height > *listener_height { + match &**block_data { + BlockData::FullBlock(block) => { + listener.block_connected(&block, *height); + }, + BlockData::HeaderOnly(header_data) => { + listener.filtered_block_connected(&header_data, &[], *height); + }, + } + } + } + } + } + + most_connected_blocks + .truncate(most_connected_blocks.len().saturating_sub(MAX_BLOCKS_AT_ONCE)); } Ok((header_cache, best_header)) @@ -200,33 +234,6 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L } } -/// A set of dynamically sized chain listeners, each paired with a starting block height. 
-struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>); - -impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> { - fn block_connected(&self, block: &bitcoin::Block, height: u32) { - for (starting_height, chain_listener) in self.0.iter() { - if height > *starting_height { - chain_listener.block_connected(block, height); - } - } - } - - fn filtered_block_connected( - &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, - ) { - for (starting_height, chain_listener) in self.0.iter() { - if height > *starting_height { - chain_listener.filtered_block_connected(header, txdata, height); - } - } - } - - fn blocks_disconnected(&self, _fork_point: BestBlock) { - unreachable!() - } -} - #[cfg(test)] mod tests { use super::*; From 323646ab2be3c7f3d1699d09a16b8fb14bf6ebba Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 10 Feb 2026 01:36:21 +0000 Subject: [PATCH 15/19] f msrv --- lightning-block-sync/src/init.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 0a0ee3c4bc1..cb2ecf9992b 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -191,8 +191,15 @@ where *result = Some((header.height, block_res?)); } debug_assert!(fetched_blocks.iter().take(most_connected_blocks.len()).all(|r| r.is_some())); - debug_assert!(fetched_blocks - .is_sorted_by_key(|r| r.as_ref().map(|(height, _)| *height).unwrap_or(u32::MAX))); + // TODO: When our MSRV is 1.82, use is_sorted_by_key + debug_assert!(fetched_blocks.windows(2).all(|blocks| { + if let (Some(a), Some(b)) = (&blocks[0], &blocks[1]) { + a.0 < b.0 + } else { + // Any non-None blocks have to come before any None entries + blocks[1].is_none() + } + })); for (listener_height, listener) in chain_listeners_at_height.iter() { // Connect blocks for this listener. 
From 12abae10fc05bbae769542a56084104bd58f2e0e Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 10 Feb 2026 11:50:20 +0000 Subject: [PATCH 16/19] f msrv --- lightning-block-sync/src/init.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index cb2ecf9992b..e440e0fe78d 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -186,7 +186,8 @@ where } let results = MultiResultFuturePoller::new(fetch_block_futures).await.into_iter(); - let mut fetched_blocks = [const { None }; MAX_BLOCKS_AT_ONCE]; + const NO_BLOCK: Option<(u32, crate::poll::ValidatedBlock)> = None; + let mut fetched_blocks = [NO_BLOCK; MAX_BLOCKS_AT_ONCE]; for ((header, block_res), result) in results.into_iter().zip(fetched_blocks.iter_mut()) { *result = Some((header.height, block_res?)); } From 348dfee397d3834e58fe6d3dbc0acc2486dd12ee Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 8 Dec 2025 12:18:01 +0000 Subject: [PATCH 17/19] Silence "elided lifetime has a name" warnings in no-std locking --- lightning/src/sync/nostd_sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lightning/src/sync/nostd_sync.rs b/lightning/src/sync/nostd_sync.rs index 12070741918..18055d1ebe4 100644 --- a/lightning/src/sync/nostd_sync.rs +++ b/lightning/src/sync/nostd_sync.rs @@ -61,7 +61,7 @@ impl<'a, T: 'a> LockTestExt<'a> for Mutex { } type ExclLock = MutexGuard<'a, T>; #[inline] - fn unsafe_well_ordered_double_lock_self(&'a self) -> MutexGuard { + fn unsafe_well_ordered_double_lock_self(&'a self) -> MutexGuard<'a, T> { self.lock().unwrap() } } @@ -132,7 +132,7 @@ impl<'a, T: 'a> LockTestExt<'a> for RwLock { } type ExclLock = RwLockWriteGuard<'a, T>; #[inline] - fn unsafe_well_ordered_double_lock_self(&'a self) -> RwLockWriteGuard { + fn unsafe_well_ordered_double_lock_self(&'a self) -> RwLockWriteGuard<'a, T> { self.write().unwrap() } } From 
f7b403c4ee1ce256ba01c22dce634a5de048c2bf Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 8 Dec 2025 14:15:47 +0000 Subject: [PATCH 18/19] Use the header cache across listeners during initial disconnect In `lightning-blocksync::init::synchronize_listeners`, we may have many listeners we want to do a chain diff on. When doing so, we should make sure we utilize our header cache, rather than querying our chain source for every header we need for each listener. Here we do so, inserting into the cache as we do chain diffs. On my node with a bitcoind on localhost, this brings the calculate-differences step of `init::synchronize_listeners` from ~500ms to under 150ms. --- lightning-block-sync/src/lib.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 7788103e6a2..bb3fcc7f1ff 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -198,6 +198,10 @@ pub(crate) trait Cache { /// Retrieves the block header keyed by the given block hash. fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>; + /// Inserts the given block header during a find_difference operation, implying it might not be + /// the best header. + fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); + /// Called when a block has been connected to the best chain to ensure it is available to be /// disconnected later if needed. fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); @@ -229,6 +233,15 @@ impl Cache for HeaderCache { self.0.get(block_hash) } + fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + self.0.insert(block_hash, block_header); + + // Remove headers older than our newest header minus a week. 
+ let best_height = self.0.iter().map(|(_, header)| header.height).max().unwrap_or(0); + let cutoff_height = best_height.saturating_sub(HEADER_CACHE_LIMIT); + self.0.retain(|_, header| header.height >= cutoff_height); + } + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { self.0.insert(block_hash, block_header); @@ -247,6 +260,10 @@ impl Cache for &mut HeaderCache { self.0.get(block_hash) } + fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + (*self).insert_during_diff(block_hash, block_header); + } + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { (*self).block_connected(block_hash, block_header); } @@ -383,7 +400,7 @@ where /// First resolves `prev_best_block` to a `ValidatedBlockHeader` using the `previous_blocks` /// field as fallback if needed, then finds the common ancestor. async fn find_difference_from_best_block( - &self, current_header: ValidatedBlockHeader, prev_best_block: BestBlock, + &mut self, current_header: ValidatedBlockHeader, prev_best_block: BestBlock, chain_poller: &mut P, ) -> BlockSourceResult { // Try to resolve the header for the previous best block. First try the block_hash, @@ -408,6 +425,7 @@ where )?; if let Ok(header) = chain_poller.get_header(block_hash, Some(height)).await { found_header = Some(header); + self.header_cache.insert_during_diff(*block_hash, header); break; } } From 2454f0cec05071baa15166c78d400f283db4c2ae Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 17:22:06 +0000 Subject: [PATCH 19/19] Include recent blocks in the `synchronize_listeners`-returned cache When `synchronize_listeners` runs, it returns a cache of the headers it needed when doing chain difference-finding. This allows us to ensure that when we start running normally we have all the recent headers in case we need them to reorg. Sadly, in some cases it was returning a mostly-empty cache. 
Because it was only being filled during block difference reconciliation it would only get a block around each listener's fork point. Worse, because we were calling `disconnect_blocks` with the cache the cache would assume we were reorging against the main chain and drop blocks we actually want. Instead, we avoid dropping blocks on `disconnect_blocks` calls and ensure we always add connected blocks to the cache. --- lightning-block-sync/src/init.rs | 62 ++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 6 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index e440e0fe78d..45b0248b9f7 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -3,7 +3,7 @@ use crate::async_poll::{MultiResultFuturePoller, ResultFuture}; use crate::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader}; -use crate::{BlockData, BlockSource, BlockSourceResult, ChainNotifier, HeaderCache}; +use crate::{BlockData, BlockSource, BlockSourceResult, Cache, ChainNotifier, HeaderCache}; use bitcoin::block::Header; use bitcoin::network::Network; @@ -153,8 +153,9 @@ where // Disconnect any stale blocks, but keep them in the cache for the next iteration. 
let (common_ancestor, connected_blocks) = { let chain_listener = &DynamicChainListener(chain_listener); + let mut cache_wrapper = HeaderCacheNoDisconnect(&mut header_cache); let mut chain_notifier = - ChainNotifier { header_cache: &mut header_cache, chain_listener }; + ChainNotifier { header_cache: &mut cache_wrapper, chain_listener }; let difference = chain_notifier .find_difference_from_best_block(best_header, old_best_block, &mut chain_poller) .await?; @@ -189,7 +190,9 @@ where const NO_BLOCK: Option<(u32, crate::poll::ValidatedBlock)> = None; let mut fetched_blocks = [NO_BLOCK; MAX_BLOCKS_AT_ONCE]; for ((header, block_res), result) in results.into_iter().zip(fetched_blocks.iter_mut()) { - *result = Some((header.height, block_res?)); + let block = block_res?; + header_cache.block_connected(header.block_hash, *header); + *result = Some((header.height, block)); } debug_assert!(fetched_blocks.iter().take(most_connected_blocks.len()).all(|r| r.is_some())); // TODO: When our MSRV is 1.82, use is_sorted_by_key @@ -242,10 +245,34 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L } } +/// Wrapper around HeaderCache that ignores `blocks_disconnected` calls, retaining disconnected +/// blocks in the cache. This is useful during initial sync to keep headers available across +/// multiple listeners. 
+struct HeaderCacheNoDisconnect<'a>(&'a mut HeaderCache); + +impl<'a> crate::Cache for &mut HeaderCacheNoDisconnect<'a> { + fn look_up(&self, block_hash: &bitcoin::hash_types::BlockHash) -> Option<&ValidatedBlockHeader> { + self.0.look_up(block_hash) + } + + fn insert_during_diff(&mut self, block_hash: bitcoin::hash_types::BlockHash, block_header: ValidatedBlockHeader) { + self.0.insert_during_diff(block_hash, block_header); + } + + fn block_connected(&mut self, block_hash: bitcoin::hash_types::BlockHash, block_header: ValidatedBlockHeader) { + self.0.block_connected(block_hash, block_header); + } + + fn blocks_disconnected(&mut self, _fork_point: &ValidatedBlockHeader) { + // Intentionally ignore disconnections to retain blocks in cache + } +} + #[cfg(test)] mod tests { use super::*; use crate::test_utils::{Blockchain, MockChainListener}; + use crate::Cache; #[tokio::test] async fn sync_from_same_chain() { @@ -266,7 +293,13 @@ mod tests { (chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen), ]; match synchronize_listeners(&chain, Network::Bitcoin, listeners).await { - Ok((_, header)) => assert_eq!(header, chain.tip()), + Ok((cache, header)) => { + assert_eq!(header, chain.tip()); + assert!(cache.look_up(&chain.at_height(1).block_hash).is_some()); + assert!(cache.look_up(&chain.at_height(2).block_hash).is_some()); + assert!(cache.look_up(&chain.at_height(3).block_hash).is_some()); + assert!(cache.look_up(&chain.at_height(4).block_hash).is_some()); + }, Err(e) => panic!("Unexpected error: {:?}", e), } } @@ -297,7 +330,15 @@ mod tests { (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { - Ok((_, header)) => assert_eq!(header, main_chain.tip()), + Ok((cache, header)) => { + assert_eq!(header, main_chain.tip()); + assert!(cache.look_up(&main_chain.at_height(1).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(2).block_hash).is_some()); + 
assert!(cache.look_up(&main_chain.at_height(3).block_hash).is_some()); + assert!(cache.look_up(&fork_chain_1.at_height(2).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_2.at_height(3).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_3.at_height(4).block_hash).is_none()); + }, Err(e) => panic!("Unexpected error: {:?}", e), } } @@ -331,7 +372,16 @@ mod tests { (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { - Ok((_, header)) => assert_eq!(header, main_chain.tip()), + Ok((cache, header)) => { + assert_eq!(header, main_chain.tip()); + assert!(cache.look_up(&main_chain.at_height(1).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(2).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(3).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(4).block_hash).is_some()); + assert!(cache.look_up(&fork_chain_1.at_height(2).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_1.at_height(3).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_1.at_height(4).block_hash).is_none()); + }, Err(e) => panic!("Unexpected error: {:?}", e), } }