From ae6c3bbac098d7938891d93d2ea57242af156565 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Feb 2026 13:55:24 +0100 Subject: [PATCH 1/3] Extract watch_channel_internal/update_channel_internal from Watch impl Pure refactor: move the bodies of Watch::watch_channel and Watch::update_channel into methods on ChainMonitor, and have the Watch trait methods delegate to them. This prepares for adding deferred mode where the Watch methods will conditionally queue operations instead of executing them immediately. Co-Authored-By: Claude Opus 4.6 --- lightning/src/chain/chainmonitor.rs | 300 +++++++++++++++------------- 1 file changed, 156 insertions(+), 144 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 7db1b697c2b..ecad0db6550 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -1058,6 +1058,160 @@ where Ok(ChannelMonitorUpdateStatus::Completed) } + + fn watch_channel_internal( + &self, channel_id: ChannelId, monitor: ChannelMonitor, + ) -> Result { + let logger = WithChannelMonitor::from(&self.logger, &monitor, None); + let mut monitors = self.monitors.write().unwrap(); + let entry = match monitors.entry(channel_id) { + hash_map::Entry::Occupied(_) => { + log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present"); + return Err(()); + }, + hash_map::Entry::Vacant(e) => e, + }; + log_trace!(logger, "Got new ChannelMonitor"); + let update_id = monitor.get_latest_update_id(); + let mut pending_monitor_updates = Vec::new(); + let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { + log_info!(logger, "Persistence of new ChannelMonitor in progress",); + pending_monitor_updates.push(update_id); + }, + ChannelMonitorUpdateStatus::Completed => { + log_info!(logger, "Persistence of new ChannelMonitor completed",); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + }, + } + if let Some(ref chain_source) = self.chain_source { + monitor.load_outputs_to_watch(chain_source, &self.logger); + } + entry.insert(MonitorHolder { + monitor, + pending_monitor_updates: Mutex::new(pending_monitor_updates), + }); + Ok(persist_res) + } + + fn update_channel_internal( + &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, + ) -> ChannelMonitorUpdateStatus { + // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those + // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. + debug_assert_eq!(update.channel_id.unwrap(), channel_id); + // Update the monitor that watches the channel referred to by the given outpoint. + let monitors = self.monitors.read().unwrap(); + match monitors.get(&channel_id) { + None => { + let logger = WithContext::from(&self.logger, None, Some(channel_id), None); + log_error!(logger, "Failed to update channel monitor: no such monitor registered"); + + // We should never ever trigger this from within ChannelManager. Technically a + // user could use this object with some proxying in between which makes this + // possible, but in tests and fuzzing, this should be a panic. 
+ #[cfg(debug_assertions)] + panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); + #[cfg(not(debug_assertions))] + ChannelMonitorUpdateStatus::InProgress + }, + Some(monitor_state) => { + let monitor = &monitor_state.monitor; + let logger = WithChannelMonitor::from(&self.logger, &monitor, None); + log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,); + + // We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we + // have well-ordered updates from the users' point of view. See the + // `pending_monitor_updates` docs for more. + let mut pending_monitor_updates = + monitor_state.pending_monitor_updates.lock().unwrap(); + let update_res = monitor.update_monitor( + update, + &self.broadcaster, + &self.fee_estimator, + &self.logger, + ); + + let update_id = update.update_id; + let persist_res = if update_res.is_err() { + // Even if updating the monitor returns an error, the monitor's state will + // still be changed. Therefore, we should persist the updated monitor despite the error. + // We don't want to persist a `monitor_update` which results in a failure to apply later + // while reading `channel_monitor` with updates from storage. Instead, we should persist + // the entire `channel_monitor` here. + log_warn!(logger, "Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor"); + self.persister.update_persisted_channel( + monitor.persistence_key(), + None, + monitor, + ) + } else { + self.persister.update_persisted_channel( + monitor.persistence_key(), + Some(update), + monitor, + ) + }; + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { + pending_monitor_updates.push(update_id); + log_debug!( + logger, + "Persistence of ChannelMonitorUpdate id {:?} in progress", + update_id, + ); + }, + ChannelMonitorUpdateStatus::Completed => { + log_debug!( + logger, + "Persistence of ChannelMonitorUpdate id {:?} completed", + update_id, + ); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. + core::mem::drop(pending_monitor_updates); + core::mem::drop(monitors); + let _poison = self.monitors.write().unwrap(); + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + }, + } + + // We may need to start monitoring for any alternative funding transactions. 
+ if let Some(ref chain_source) = self.chain_source { + for (funding_outpoint, funding_script) in + update.internal_renegotiated_funding_data() + { + log_trace!( + logger, + "Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends", + funding_outpoint + ); + chain_source.register_tx(&funding_outpoint.txid, &funding_script); + chain_source.register_output(WatchedOutput { + block_hash: None, + outpoint: funding_outpoint, + script_pubkey: funding_script, + }); + } + } + + if update_res.is_err() { + ChannelMonitorUpdateStatus::InProgress + } else { + persist_res + } + }, + } + } } impl< @@ -1272,155 +1426,13 @@ where fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, ) -> Result { - let logger = WithChannelMonitor::from(&self.logger, &monitor, None); - let mut monitors = self.monitors.write().unwrap(); - let entry = match monitors.entry(channel_id) { - hash_map::Entry::Occupied(_) => { - log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present"); - return Err(()); - }, - hash_map::Entry::Vacant(e) => e, - }; - log_trace!(logger, "Got new ChannelMonitor"); - let update_id = monitor.get_latest_update_id(); - let mut pending_monitor_updates = Vec::new(); - let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); - match persist_res { - ChannelMonitorUpdateStatus::InProgress => { - log_info!(logger, "Persistence of new ChannelMonitor in progress",); - pending_monitor_updates.push(update_id); - }, - ChannelMonitorUpdateStatus::Completed => { - log_info!(logger, "Persistence of new ChannelMonitor completed",); - }, - ChannelMonitorUpdateStatus::UnrecoverableError => { - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); - panic!("{}", err_str); - }, - } - if let Some(ref chain_source) = self.chain_source { - monitor.load_outputs_to_watch(chain_source, &self.logger); - } - entry.insert(MonitorHolder { - monitor, - pending_monitor_updates: Mutex::new(pending_monitor_updates), - }); - Ok(persist_res) + self.watch_channel_internal(channel_id, monitor) } fn update_channel( &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, ) -> ChannelMonitorUpdateStatus { - // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those - // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. - debug_assert_eq!(update.channel_id.unwrap(), channel_id); - // Update the monitor that watches the channel referred to by the given outpoint. - let monitors = self.monitors.read().unwrap(); - match monitors.get(&channel_id) { - None => { - let logger = WithContext::from(&self.logger, None, Some(channel_id), None); - log_error!(logger, "Failed to update channel monitor: no such monitor registered"); - - // We should never ever trigger this from within ChannelManager. Technically a - // user could use this object with some proxying in between which makes this - // possible, but in tests and fuzzing, this should be a panic. 
- #[cfg(debug_assertions)] - panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); - #[cfg(not(debug_assertions))] - ChannelMonitorUpdateStatus::InProgress - }, - Some(monitor_state) => { - let monitor = &monitor_state.monitor; - let logger = WithChannelMonitor::from(&self.logger, &monitor, None); - log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,); - - // We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we - // have well-ordered updates from the users' point of view. See the - // `pending_monitor_updates` docs for more. - let mut pending_monitor_updates = - monitor_state.pending_monitor_updates.lock().unwrap(); - let update_res = monitor.update_monitor( - update, - &self.broadcaster, - &self.fee_estimator, - &self.logger, - ); - - let update_id = update.update_id; - let persist_res = if update_res.is_err() { - // Even if updating the monitor returns an error, the monitor's state will - // still be changed. Therefore, we should persist the updated monitor despite the error. - // We don't want to persist a `monitor_update` which results in a failure to apply later - // while reading `channel_monitor` with updates from storage. Instead, we should persist - // the entire `channel_monitor` here. - log_warn!(logger, "Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor"); - self.persister.update_persisted_channel( - monitor.persistence_key(), - None, - monitor, - ) - } else { - self.persister.update_persisted_channel( - monitor.persistence_key(), - Some(update), - monitor, - ) - }; - match persist_res { - ChannelMonitorUpdateStatus::InProgress => { - pending_monitor_updates.push(update_id); - log_debug!( - logger, - "Persistence of ChannelMonitorUpdate id {:?} in progress", - update_id, - ); - }, - ChannelMonitorUpdateStatus::Completed => { - log_debug!( - logger, - "Persistence of ChannelMonitorUpdate id {:?} completed", - update_id, - ); - }, - ChannelMonitorUpdateStatus::UnrecoverableError => { - // Take the monitors lock for writing so that we poison it and any future - // operations going forward fail immediately. - core::mem::drop(pending_monitor_updates); - core::mem::drop(monitors); - let _poison = self.monitors.write().unwrap(); - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); - panic!("{}", err_str); - }, - } - - // We may need to start monitoring for any alternative funding transactions. 
- if let Some(ref chain_source) = self.chain_source { - for (funding_outpoint, funding_script) in - update.internal_renegotiated_funding_data() - { - log_trace!( - logger, - "Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends", - funding_outpoint - ); - chain_source.register_tx(&funding_outpoint.txid, &funding_script); - chain_source.register_output(WatchedOutput { - block_hash: None, - outpoint: funding_outpoint, - script_pubkey: funding_script, - }); - } - } - - if update_res.is_err() { - ChannelMonitorUpdateStatus::InProgress - } else { - persist_res - } - }, - } + self.update_channel_internal(channel_id, update) } fn release_pending_monitor_events( From 0a4efd61f8040a0048f112eb7cfc9796c88b7fa8 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Feb 2026 14:46:35 +0100 Subject: [PATCH 2/3] Add deferred bool to ChainMonitor Add a `deferred` parameter to `ChainMonitor::new` and `ChainMonitor::new_async_beta`. When set to true, the Watch trait methods (watch_channel and update_channel) will unimplemented!() for now. All existing callers pass false to preserve current behavior. Co-Authored-By: Claude Opus 4.6 --- fuzz/src/chanmon_consistency.rs | 1 + fuzz/src/full_stack.rs | 1 + fuzz/src/lsps_message.rs | 1 + lightning/src/chain/chainmonitor.rs | 21 +++++++++++++++---- lightning/src/ln/chanmon_update_fail_tests.rs | 1 + lightning/src/ln/channelmanager.rs | 4 ++-- lightning/src/util/test_utils.rs | 1 + 7 files changed, 24 insertions(+), 6 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 87d58da4832..34bd8e091ed 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -285,6 +285,7 @@ impl TestChainMonitor { Arc::clone(&persister), Arc::clone(&keys), keys.get_peer_storage_key(), + false, )), logger, keys, diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 39588bcdc50..5949f2c1a56 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -597,6 +597,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc) { Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) }), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, )); let network = Network::Bitcoin; diff --git a/fuzz/src/lsps_message.rs b/fuzz/src/lsps_message.rs index 547a27b70ee..d01e5632025 100644 --- a/fuzz/src/lsps_message.rs +++ b/fuzz/src/lsps_message.rs @@ -59,6 +59,7 @@ pub fn do_test(data: &[u8]) { Arc::clone(&kv_store), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, )); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index ecad0db6550..14524d59ea5 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -371,6 +371,9 @@ pub struct ChainMonitor< #[cfg(peer_storage)] our_peerstorage_encryption_key: PeerStorageKey, + + /// When `true`, [`chain::Watch`] operations are queued rather than executed immediately. 
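+	/// When `false`, they are executed inline, preserving the pre-existing behavior.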
+ deferred: bool, } impl< @@ -397,7 +400,7 @@ where pub fn new_async_beta( chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: MonitorUpdatingPersisterAsync, _entropy_source: ES, - _our_peerstorage_encryption_key: PeerStorageKey, + _our_peerstorage_encryption_key: PeerStorageKey, deferred: bool, ) -> Self { let event_notifier = Arc::new(Notifier::new()); Self { @@ -414,6 +417,7 @@ where pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, + deferred, } } } @@ -603,7 +607,7 @@ where /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager pub fn new( chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: P, - _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, + _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, deferred: bool, ) -> Self { Self { monitors: RwLock::new(new_hash_map()), @@ -619,6 +623,7 @@ where pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, + deferred, } } @@ -1426,13 +1431,21 @@ where fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, ) -> Result { - self.watch_channel_internal(channel_id, monitor) + if !self.deferred { + return self.watch_channel_internal(channel_id, monitor); + } + + unimplemented!(); } fn update_channel( &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, ) -> ChannelMonitorUpdateStatus { - self.update_channel_internal(channel_id, update) + if !self.deferred { + return self.update_channel_internal(channel_id, update); + } + + unimplemented!(); } fn release_pending_monitor_events( diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 5a0c37bd61d..19715be6e8d 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -4918,6 +4918,7 @@ fn native_async_persist() { native_async_persister, Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, ); // Write the initial ChannelMonitor async, testing primarily that the `MonitorEvent::Completed` diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 106cc7ccaed..5f74dfce806 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -20641,7 +20641,7 @@ pub mod bench { let seed_a = [1u8; 32]; let keys_manager_a = KeysManager::new(&seed_a, 42, 42, true); - let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a, &keys_manager_a, keys_manager_a.get_peer_storage_key()); + let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a, &keys_manager_a, keys_manager_a.get_peer_storage_key(), false); let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &router, &message_router, &logger_a, &keys_manager_a, &keys_manager_a, &keys_manager_a, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), @@ -20651,7 +20651,7 @@ pub mod bench { let logger_b = test_utils::TestLogger::with_id("node a".to_owned()); let seed_b = [2u8; 32]; let keys_manager_b = KeysManager::new(&seed_b, 42, 42, true); - let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b, &keys_manager_b, keys_manager_b.get_peer_storage_key()); + let chain_monitor_b = ChainMonitor::new(None, 
&tx_broadcaster, &logger_a, &fee_estimator, &persister_b, &keys_manager_b, keys_manager_b.get_peer_storage_key(), false); let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &router, &message_router, &logger_b, &keys_manager_b, &keys_manager_b, &keys_manager_b, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index bcf39fde482..667cc2ae850 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -537,6 +537,7 @@ impl<'a> TestChainMonitor<'a> { persister, keys_manager, keys_manager.get_peer_storage_key(), + false, ), keys_manager, expect_channel_force_closed: Mutex::new(None), From b140bf901a84662d7383015c6f6effcde8b49582 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Feb 2026 14:48:31 +0100 Subject: [PATCH 3/3] Implement deferred monitor write queueing and flushing Replace the unimplemented!() stubs with a full deferred write implementation. When ChainMonitor has deferred=true, Watch trait operations queue PendingMonitorOp entries instead of executing immediately. A new flush() method drains the queue and forwards operations to the internal watch/update methods, calling channel_monitor_updated on Completed status. The BackgroundProcessor is updated to capture pending_operation_count before persisting the ChannelManager, then flush that many writes afterward - ensuring monitor writes happen in the correct order relative to manager persistence. Key changes: - Add PendingMonitorOp enum and pending_ops queue to ChainMonitor - Implement flush() and pending_operation_count() public methods - Integrate flush calls in BackgroundProcessor (both sync and async) - Add TestChainMonitor::new_deferred, flush helpers, and auto-flush in release_pending_monitor_events for test compatibility - Add create_node_cfgs_deferred for deferred-mode test networks - Add unit tests for queue/flush mechanics and full payment flow Co-Authored-By: Claude Opus 4.6 --- lightning-background-processor/src/lib.rs | 75 ++++- lightning/src/chain/chainmonitor.rs | 376 +++++++++++++++++++++- lightning/src/ln/functional_test_utils.rs | 60 +++- lightning/src/util/test_utils.rs | 59 +++- 4 files changed, 555 insertions(+), 15 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index f052f3d8d4c..5ba799aee48 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1120,6 +1120,10 @@ where let mut futures = Joiner::new(); + // Capture the pending count before persisting. Only this many writes will be + // flushed afterward, so that updates arriving after persist aren't included. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); + if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); @@ -1317,6 +1321,10 @@ where res?; } + // Flush monitor operations that were pending before we persisted. New updates + // that arrived after are left for the next iteration. 
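+			// A flush error is non-fatal here: any operations that were not flushed remain
+			// queued and will be attempted again on the next iteration.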
+ let _ = chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + match check_and_reset_sleeper(&mut last_onion_message_handler_call, || { sleeper(ONION_MESSAGE_HANDLER_TIMER) }) { @@ -1373,6 +1381,7 @@ where // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); kv_store .write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1381,6 +1390,10 @@ where channel_manager.get_cm().encode(), ) .await?; + + // Flush monitor operations that were pending before final persistence. + let _ = chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(ref scorer) = scorer { kv_store .write( @@ -1684,6 +1697,11 @@ impl BackgroundProcessor { channel_manager.get_cm().timer_tick_occurred(); last_freshness_call = Instant::now(); } + + // Capture the pending count before persisting. Only this many writes will be + // flushed afterward, so that updates arriving after persist aren't included. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); + if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); (kv_store.write( @@ -1695,6 +1713,10 @@ impl BackgroundProcessor { log_trace!(logger, "Done persisting ChannelManager."); } + // Flush monitor operations that were pending before we persisted. New + // updates that arrived after are left for the next iteration. + let _ = chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(liquidity_manager) = liquidity_manager.as_ref() { log_trace!(logger, "Persisting LiquidityManager..."); let _ = liquidity_manager.get_lm().persist().map_err(|e| { @@ -1809,12 +1831,17 @@ impl BackgroundProcessor { // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); kv_store.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, channel_manager.get_cm().encode(), )?; + + // Flush monitor operations that were pending before final persistence. 
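+		// As above, only the writes queued before the final ChannelManager snapshot are
+		// flushed; anything queued later simply remains in the queue.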
+ let _ = chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(ref scorer) = scorer { kv_store.write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1896,9 +1923,10 @@ mod tests { use bitcoin::transaction::{Transaction, TxOut}; use bitcoin::{Amount, ScriptBuf, Txid}; use core::sync::atomic::{AtomicBool, Ordering}; + use lightning::chain::chainmonitor; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::transaction::OutPoint; - use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter}; + use lightning::chain::{BestBlock, Confirm, Filter}; use lightning::events::{Event, PathFailure, ReplayEvent}; use lightning::ln::channelmanager; use lightning::ln::channelmanager::{ @@ -2444,6 +2472,7 @@ mod tests { Arc::clone(&kv_store), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + true, )); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; @@ -2567,6 +2596,8 @@ mod tests { (persist_dir, nodes) } + /// Opens a channel between two nodes without a running `BackgroundProcessor`, + /// so deferred monitor operations are flushed manually at each step. macro_rules! open_channel { ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ begin_open_channel!($node_a, $node_b, $channel_value); @@ -2582,12 +2613,20 @@ mod tests { tx.clone(), ) .unwrap(); + // funding_transaction_generated does not call watch_channel, so no + // deferred op is queued and FundingCreated is available immediately. let msg_a = get_event_msg!( $node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id() ); $node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a); + // Flush node_b's new monitor (watch_channel) so it releases the + // FundingSigned message. + $node_b + .chain_monitor + .flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger) + .unwrap(); get_event!($node_b, Event::ChannelPending); let msg_b = get_event_msg!( $node_b, @@ -2595,6 +2634,12 @@ mod tests { $node_a.node.get_our_node_id() ); $node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b); + // Flush node_a's new monitor (watch_channel) queued by + // handle_funding_signed. + $node_a + .chain_monitor + .flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger) + .unwrap(); get_event!($node_a, Event::ChannelPending); tx }}; @@ -2720,6 +2765,20 @@ mod tests { confirm_transaction_depth(node, tx, ANTI_REORG_DELAY); } + /// Waits until the background processor has flushed all pending deferred monitor + /// operations for the given node. Panics if the pending count does not reach zero + /// within `EVENT_DEADLINE`. + fn wait_for_flushed(chain_monitor: &ChainMonitor) { + let start = std::time::Instant::now(); + while chain_monitor.pending_operation_count() > 0 { + assert!( + start.elapsed() < EVENT_DEADLINE, + "Pending monitor operations were not flushed within deadline" + ); + std::thread::sleep(Duration::from_millis(10)); + } + } + #[test] fn test_background_processor() { // Test that when a new channel is created, the ChannelManager needs to be re-persisted with @@ -3060,11 +3119,22 @@ mod tests { .node .funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone()) .unwrap(); + // funding_transaction_generated does not call watch_channel, so no deferred op is + // queued and the FundingCreated message is available immediately. 
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id); nodes[1].node.handle_funding_created(node_0_id, &msg_0); + // Node 1 has no bg processor, flush its new monitor (watch_channel) manually so + // events and FundingSigned are released. + nodes[1] + .chain_monitor + .flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger) + .unwrap(); get_event!(nodes[1], Event::ChannelPending); let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id); nodes[0].node.handle_funding_signed(node_1_id, &msg_1); + // Wait for the bg processor to flush the new monitor (watch_channel) queued by + // handle_funding_signed. + wait_for_flushed(&nodes[0].chain_monitor); channel_pending_recv .recv_timeout(EVENT_DEADLINE) .expect("ChannelPending not handled within deadline"); @@ -3125,6 +3195,9 @@ mod tests { error_message.to_string(), ) .unwrap(); + // Wait for the bg processor to flush the monitor update triggered by force close + // so the commitment tx is broadcast. + wait_for_flushed(&nodes[0].chain_monitor); let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32); diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 14524d59ea5..0e3ce5563e0 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -60,12 +60,21 @@ use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync}; use crate::util::ser::{VecWriter, Writeable}; use crate::util::wakers::{Future, Notifier}; +use alloc::collections::VecDeque; use alloc::sync::Arc; #[cfg(peer_storage)] use core::iter::Cycle; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; +/// A pending operation queued for later execution when `ChainMonitor` is in deferred mode. +enum PendingMonitorOp { + /// A new monitor to insert and persist. + NewMonitor { channel_id: ChannelId, monitor: ChannelMonitor, update_id: u64 }, + /// An update to apply and persist. + Update { channel_id: ChannelId, update: ChannelMonitorUpdate }, +} + /// `Persist` defines behavior for persisting channel monitors: this could mean /// writing once to disk, and/or uploading to one or more backup services. /// @@ -374,6 +383,29 @@ pub struct ChainMonitor< /// When `true`, [`chain::Watch`] operations are queued rather than executed immediately. deferred: bool, + /// Queued monitor operations awaiting flush. Unused when `deferred` is `false`. + /// + /// # Locking order with `monitors` + /// + /// The consistent lock order is `pending_ops` **before** `monitors`. + /// + /// [`watch_channel`] holds `pending_ops.lock()` **then** `monitors.read()` to atomically + /// check for duplicate channel IDs in both the pending queue and the flushed set. + /// + /// [`flush`] holds `pending_ops.lock()` across [`watch_channel_internal`] (which acquires + /// `monitors.write()`) for [`NewMonitor`] ops, ensuring mutual exclusion with + /// `watch_channel`: both acquire `pending_ops` first, so they serialize on that lock. + /// + /// For [`Update`] ops, `pending_ops` is released before calling + /// [`update_channel_internal`] so that concurrent `update_channel` queuing is not blocked. 
+ /// + /// [`watch_channel`]: chain::Watch::watch_channel + /// [`flush`]: Self::flush + /// [`watch_channel_internal`]: Self::watch_channel_internal + /// [`update_channel_internal`]: Self::update_channel_internal + /// [`NewMonitor`]: PendingMonitorOp::NewMonitor + /// [`Update`]: PendingMonitorOp::Update + pending_ops: Mutex>>, } impl< @@ -418,6 +450,7 @@ where #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, deferred, + pending_ops: Mutex::new(VecDeque::new()), } } } @@ -624,6 +657,7 @@ where #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, deferred, + pending_ops: Mutex::new(VecDeque::new()), } } @@ -1217,6 +1251,77 @@ where }, } } + + /// Returns the number of pending monitor operations queued for later execution. + pub fn pending_operation_count(&self) -> usize { + self.pending_ops.lock().unwrap().len() + } + + /// Flushes up to `count` pending monitor operations. + /// + /// Returns `Ok(())` when all `count` operations (or the entire queue, if shorter) were + /// flushed. Returns `Err(())` if a `NewMonitor` operation failed, in which case + /// remaining operations are left in the queue for the next call. + pub fn flush(&self, count: usize, logger: &L) -> Result<(), ()> { + if count > 0 { + log_trace!(logger, "Flushing up to {} monitor operations", count); + } + for _ in 0..count { + let mut queue = self.pending_ops.lock().unwrap(); + let op = match queue.pop_front() { + Some(op) => op, + None => return Ok(()), + }; + + let (channel_id, update_id, status) = match op { + PendingMonitorOp::NewMonitor { channel_id, monitor, update_id } => { + let logger = WithChannelMonitor::from(logger, &monitor, None); + log_trace!(logger, "Flushing new monitor"); + // Hold `pending_ops` across the internal call so that + // `watch_channel` (which checks `monitors` + `pending_ops` + // atomically) cannot race with this insertion. + match self.watch_channel_internal(channel_id, monitor) { + Ok(status) => { + drop(queue); + (channel_id, update_id, status) + }, + Err(()) => { + drop(queue); + // The monitor is consumed and cannot be retried. + // Any queued updates for this channel will also + // fail since no monitor was inserted. + log_error!(logger, "watch_channel failed"); + return Err(()); + }, + } + }, + PendingMonitorOp::Update { channel_id, update } => { + let logger = WithContext::from(logger, None, Some(channel_id), None); + log_trace!(logger, "Flushing monitor update {}", update.update_id); + // Release `pending_ops` before the internal call so that + // concurrent `update_channel` queuing is not blocked. + drop(queue); + let update_id = update.update_id; + let status = self.update_channel_internal(channel_id, &update); + (channel_id, update_id, status) + }, + }; + + match status { + ChannelMonitorUpdateStatus::Completed => { + let logger = WithContext::from(logger, None, Some(channel_id), None); + if let Err(e) = self.channel_monitor_updated(channel_id, update_id) { + log_error!(logger, "channel_monitor_updated failed: {:?}", e); + } + }, + ChannelMonitorUpdateStatus::InProgress => {}, + ChannelMonitorUpdateStatus::UnrecoverableError => { + panic!("UnrecoverableError during monitor operation"); + }, + } + } + Ok(()) + } } impl< @@ -1435,7 +1540,24 @@ where return self.watch_channel_internal(channel_id, monitor); } - unimplemented!(); + let update_id = monitor.get_latest_update_id(); + // Atomically check for duplicates in both the pending queue and the + // flushed monitor set. 
Lock order: `pending_ops` before `monitors` + // (see `pending_ops` field doc). + let mut pending_ops = self.pending_ops.lock().unwrap(); + let monitors = self.monitors.read().unwrap(); + if monitors.contains_key(&channel_id) { + return Err(()); + } + let already_pending = pending_ops.iter().any(|op| match op { + PendingMonitorOp::NewMonitor { channel_id: id, .. } => *id == channel_id, + _ => false, + }); + if already_pending { + return Err(()); + } + pending_ops.push_back(PendingMonitorOp::NewMonitor { channel_id, monitor, update_id }); + Ok(ChannelMonitorUpdateStatus::InProgress) } fn update_channel( @@ -1445,7 +1567,9 @@ where return self.update_channel_internal(channel_id, update); } - unimplemented!(); + let mut pending_ops = self.pending_ops.lock().unwrap(); + pending_ops.push_back(PendingMonitorOp::Update { channel_id, update: update.clone() }); + ChannelMonitorUpdateStatus::InProgress } fn release_pending_monitor_events( @@ -1575,12 +1699,34 @@ where #[cfg(test)] mod tests { - use crate::chain::channelmonitor::ANTI_REORG_DELAY; + use super::ChainMonitor; + use crate::chain::channelmonitor::{ChannelMonitorUpdate, ANTI_REORG_DELAY}; + use crate::chain::transaction::OutPoint; use crate::chain::{ChannelMonitorUpdateStatus, Watch}; use crate::events::{ClosureReason, Event}; + use crate::ln::chan_utils::{ + ChannelTransactionParameters, CounterpartyChannelTransactionParameters, + HolderCommitmentTransaction, + }; + use crate::ln::channel_keys::{DelayedPaymentBasepoint, HtlcBasepoint, RevocationBasepoint}; use crate::ln::functional_test_utils::*; use crate::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, MessageSendEvent}; + use crate::ln::script::ShutdownScript; + use crate::ln::types::ChannelId; + use crate::sign::{ChannelSigner, InMemorySigner, NodeSigner}; + use crate::types::features::ChannelTypeFeatures; + use crate::util::dyn_signer::DynSigner; + use crate::util::test_channel_signer::TestChannelSigner; + use crate::util::test_utils::{ + TestBroadcaster, TestChainSource, TestFeeEstimator, TestKeysInterface, TestLogger, + TestPersister, + }; use crate::{expect_payment_path_successful, get_event_msg}; + use bitcoin::hash_types::Txid; + use bitcoin::hashes::Hash; + use bitcoin::script::ScriptBuf; + use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; + use bitcoin::Network; const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 5; @@ -1838,4 +1984,228 @@ mod tests { }) .is_err()); } + + /// Concrete `ChainMonitor` type wired to the standard test utilities in deferred mode. + type TestDeferredChainMonitor<'a> = ChainMonitor< + TestChannelSigner, + &'a TestChainSource, + &'a TestBroadcaster, + &'a TestFeeEstimator, + &'a TestLogger, + &'a TestPersister, + &'a TestKeysInterface, + >; + + /// Creates a minimal `ChannelMonitorUpdate` with no actual update steps. + fn dummy_update(update_id: u64, channel_id: ChannelId) -> ChannelMonitorUpdate { + ChannelMonitorUpdate { updates: vec![], update_id, channel_id: Some(channel_id) } + } + + /// Creates a minimal `ChannelMonitor` for the given `channel_id`. 
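+	/// The monitor is built from placeholder keys and an all-zero funding outpoint, so it is
+	/// only suitable for exercising the deferred queue/flush plumbing in these tests.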
+ fn dummy_monitor( + channel_id: ChannelId, + ) -> crate::chain::channelmonitor::ChannelMonitor { + let secp_ctx = Secp256k1::new(); + let dummy_key = + PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); + let keys = InMemorySigner::new( + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + true, + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + [41; 32], + [0; 32], + [0; 32], + ); + let counterparty_pubkeys = crate::ln::chan_utils::ChannelPublicKeys { + funding_pubkey: dummy_key, + revocation_basepoint: RevocationBasepoint::from(dummy_key), + payment_point: dummy_key, + delayed_payment_basepoint: DelayedPaymentBasepoint::from(dummy_key), + htlc_basepoint: HtlcBasepoint::from(dummy_key), + }; + let funding_outpoint = OutPoint { txid: Txid::all_zeros(), index: u16::MAX }; + let channel_parameters = ChannelTransactionParameters { + holder_pubkeys: keys.pubkeys(&secp_ctx), + holder_selected_contest_delay: 66, + is_outbound_from_holder: true, + counterparty_parameters: Some(CounterpartyChannelTransactionParameters { + pubkeys: counterparty_pubkeys, + selected_contest_delay: 67, + }), + funding_outpoint: Some(funding_outpoint), + splice_parent_funding_txid: None, + channel_type_features: ChannelTypeFeatures::only_static_remote_key(), + channel_value_satoshis: 0, + }; + let shutdown_script = ShutdownScript::new_p2wpkh_from_pubkey(dummy_key); + let best_block = crate::chain::BestBlock::from_network(Network::Testnet); + let signer = TestChannelSigner::new(DynSigner::new(keys)); + crate::chain::channelmonitor::ChannelMonitor::new( + secp_ctx, + signer, + Some(shutdown_script.into_inner()), + 0, + &ScriptBuf::new(), + &channel_parameters, + true, + 0, + HolderCommitmentTransaction::dummy(0, funding_outpoint, Vec::new()), + best_block, + dummy_key, + channel_id, + false, + ) + } + + fn create_deferred_chain_monitor<'a>( + chain_source: &'a TestChainSource, broadcaster: &'a TestBroadcaster, + logger: &'a TestLogger, fee_est: &'a TestFeeEstimator, persister: &'a TestPersister, + keys: &'a TestKeysInterface, + ) -> TestDeferredChainMonitor<'a> { + ChainMonitor::new( + Some(chain_source), + broadcaster, + logger, + fee_est, + persister, + keys, + keys.get_peer_storage_key(), + true, + ) + } + + /// Tests queueing and flushing of both `watch_channel` and `update_channel` operations + /// when `ChainMonitor` is in deferred mode, verifying that operations flow through to + /// `Persist` and that `channel_monitor_updated` is called on `Completed` status. + #[test] + fn test_queue_and_flush() { + let broadcaster = TestBroadcaster::new(Network::Testnet); + let fee_est = TestFeeEstimator::new(253); + let logger = TestLogger::new(); + let persister = TestPersister::new(); + let chain_source = TestChainSource::new(Network::Testnet); + let keys = TestKeysInterface::new(&[0; 32], Network::Testnet); + let deferred = create_deferred_chain_monitor( + &chain_source, + &broadcaster, + &logger, + &fee_est, + &persister, + &keys, + ); + + // Queue starts empty. + assert_eq!(deferred.pending_operation_count(), 0); + + // Queue a watch_channel, verifying InProgress status. 
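+		// In deferred mode the persist has not run yet, so the reported status is InProgress.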
+ let chan = ChannelId::from_bytes([1u8; 32]); + let status = Watch::watch_channel(&deferred, chan, dummy_monitor(chan)); + assert_eq!(status, Ok(ChannelMonitorUpdateStatus::InProgress)); + assert_eq!(deferred.pending_operation_count(), 1); + + // Nothing persisted yet — operations are only queued. + assert!(persister.new_channel_persistences.lock().unwrap().is_empty()); + + // Queue two updates after the watch. Update IDs must be sequential (starting + // from 1 since the initial monitor has update_id 0). + assert_eq!( + Watch::update_channel(&deferred, chan, &dummy_update(1, chan)), + ChannelMonitorUpdateStatus::InProgress + ); + assert_eq!( + Watch::update_channel(&deferred, chan, &dummy_update(2, chan)), + ChannelMonitorUpdateStatus::InProgress + ); + assert_eq!(deferred.pending_operation_count(), 3); + + // Flush 2 of 3: persist_new_channel returns Completed (triggers + // channel_monitor_updated), update_persisted_channel returns InProgress (does not). + persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); + persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + deferred.flush(2, &&logger).unwrap(); + + assert_eq!(deferred.pending_operation_count(), 1); + + // persist_new_channel was called for the watch. + assert_eq!(persister.new_channel_persistences.lock().unwrap().len(), 1); + + // Because persist_new_channel returned Completed, channel_monitor_updated was called, + // so update_id 0 should no longer be pending. + let pending = deferred.list_pending_monitor_updates(); + #[cfg(not(c_bindings))] + let pending_for_chan = pending.get(&chan).unwrap(); + #[cfg(c_bindings)] + let pending_for_chan = &pending.iter().find(|(chan_id, _)| *chan_id == chan).unwrap().1; + assert!(!pending_for_chan.contains(&0)); + + // update_persisted_channel was called for update_id 1, and because it returned + // InProgress, update_id 1 remains pending. + let monitor_name = deferred.get_monitor(chan).unwrap().persistence_key(); + assert!(persister + .offchain_monitor_updates + .lock() + .unwrap() + .get(&monitor_name) + .unwrap() + .contains(&1)); + assert!(pending_for_chan.contains(&1)); + + // Flush remaining: update_persisted_channel returns Completed (default), triggers + // channel_monitor_updated. + deferred.flush(1, &&logger).unwrap(); + assert_eq!(deferred.pending_operation_count(), 0); + + // update_persisted_channel was called for update_id 2. + assert!(persister + .offchain_monitor_updates + .lock() + .unwrap() + .get(&monitor_name) + .unwrap() + .contains(&2)); + + // update_id 1 is still pending from the InProgress earlier, but update_id 2 was + // completed in this flush so it is no longer pending. + let pending = deferred.list_pending_monitor_updates(); + #[cfg(not(c_bindings))] + let pending_for_chan = pending.get(&chan).unwrap(); + #[cfg(c_bindings)] + let pending_for_chan = &pending.iter().find(|(chan_id, _)| *chan_id == chan).unwrap().1; + assert!(pending_for_chan.contains(&1)); + assert!(!pending_for_chan.contains(&2)); + + // Flushing an empty queue is a no-op. + let persist_count_before = persister.new_channel_persistences.lock().unwrap().len(); + deferred.flush(5, &&logger).unwrap(); + assert_eq!(persister.new_channel_persistences.lock().unwrap().len(), persist_count_before); + } + + /// Tests that `ChainMonitor` in deferred mode properly defers `watch_channel` and + /// `update_channel` operations, verifying correctness through a complete channel open + /// and payment flow. 
Operations are auto-flushed via the `TestChainMonitor` + /// `release_pending_monitor_events` helper. + #[test] + fn test_deferred_monitor_payment() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs_deferred(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let chain_monitor_a = &nodes[0].chain_monitor.chain_monitor; + let chain_monitor_b = &nodes[1].chain_monitor.chain_monitor; + + create_announced_chan_between_nodes(&nodes, 0, 1); + + let (preimage, _hash, ..) = route_payment(&nodes[0], &[&nodes[1]], 10_000); + claim_payment(&nodes[0], &[&nodes[1]], preimage); + + assert_eq!(chain_monitor_a.list_monitors().len(), 1); + assert_eq!(chain_monitor_b.list_monitors().len(), 1); + assert_eq!(chain_monitor_a.pending_operation_count(), 0); + assert_eq!(chain_monitor_b.pending_operation_count(), 0); + } } diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index c21c62d464c..09f23a2d8c5 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -4449,6 +4449,26 @@ fn create_node_cfgs_internal<'a, F>( node_count: usize, chanmon_cfgs: &'a Vec, persisters: Vec<&'a impl test_utils::SyncPersist>, message_router_constructor: F, ) -> Vec> +where + F: Fn( + Arc>, + &'a TestKeysInterface, + ) -> test_utils::TestMessageRouter<'a>, +{ + create_node_cfgs_internal_deferred( + node_count, + chanmon_cfgs, + persisters, + message_router_constructor, + false, + ) +} + +fn create_node_cfgs_internal_deferred<'a, F>( + node_count: usize, chanmon_cfgs: &'a Vec, + persisters: Vec<&'a impl test_utils::SyncPersist>, message_router_constructor: F, + deferred: bool, +) -> Vec> where F: Fn( Arc>, @@ -4460,14 +4480,25 @@ where for i in 0..node_count { let cfg = &chanmon_cfgs[i]; let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, &cfg.logger)); - let chain_monitor = test_utils::TestChainMonitor::new( - Some(&cfg.chain_source), - &cfg.tx_broadcaster, - &cfg.logger, - &cfg.fee_estimator, - persisters[i], - &cfg.keys_manager, - ); + let chain_monitor = if deferred { + test_utils::TestChainMonitor::new_deferred( + Some(&cfg.chain_source), + &cfg.tx_broadcaster, + &cfg.logger, + &cfg.fee_estimator, + persisters[i], + &cfg.keys_manager, + ) + } else { + test_utils::TestChainMonitor::new( + Some(&cfg.chain_source), + &cfg.tx_broadcaster, + &cfg.logger, + &cfg.fee_estimator, + persisters[i], + &cfg.keys_manager, + ) + }; let seed = [i as u8; 32]; nodes.push(NodeCfg { @@ -4507,6 +4538,19 @@ pub fn create_node_cfgs<'a>( ) } +pub fn create_node_cfgs_deferred<'a>( + node_count: usize, chanmon_cfgs: &'a Vec, +) -> Vec> { + let persisters = chanmon_cfgs.iter().map(|c| &c.persister).collect(); + create_node_cfgs_internal_deferred( + node_count, + chanmon_cfgs, + persisters, + test_utils::TestMessageRouter::new_default, + true, + ) +} + pub fn create_node_cfgs_with_persisters<'a>( node_count: usize, chanmon_cfgs: &'a Vec, persisters: Vec<&'a impl test_utils::SyncPersist>, diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 667cc2ae850..cbe73e16e6a 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -509,6 +509,7 @@ pub struct TestChainMonitor<'a> { &'a TestKeysInterface, >, pub keys_manager: &'a TestKeysInterface, + pub logger: &'a TestLogger, /// If this is set to Some(), the next update_channel call (not watch_channel) must be 
a /// ChannelForceClosed event for the given channel_id with should_broadcast set to the given /// boolean. @@ -524,6 +525,38 @@ impl<'a> TestChainMonitor<'a> { chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, + ) -> Self { + Self::with_deferred( + chain_source, + broadcaster, + logger, + fee_estimator, + persister, + keys_manager, + false, + ) + } + + pub fn new_deferred( + chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, + logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, + persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, + ) -> Self { + Self::with_deferred( + chain_source, + broadcaster, + logger, + fee_estimator, + persister, + keys_manager, + true, + ) + } + + fn with_deferred( + chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, + logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, + persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, deferred: bool, ) -> Self { Self { added_monitors: Mutex::new(Vec::new()), @@ -537,9 +570,10 @@ impl<'a> TestChainMonitor<'a> { persister, keys_manager, keys_manager.get_peer_storage_key(), - false, + deferred, ), keys_manager, + logger, expect_channel_force_closed: Mutex::new(None), expect_monitor_round_trip_fail: Mutex::new(None), #[cfg(feature = "std")] @@ -547,6 +581,10 @@ impl<'a> TestChainMonitor<'a> { } } + pub fn pending_operation_count(&self) -> usize { + self.chain_monitor.pending_operation_count() + } + pub fn complete_sole_pending_chan_update(&self, channel_id: &ChannelId) { let (_, latest_update) = self.latest_monitor_update_id.lock().unwrap().get(channel_id).unwrap().clone(); @@ -677,6 +715,12 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + // Auto-flush pending operations so that the ChannelManager can pick up monitor + // completion events. When not in deferred mode the queue is empty so this only + // costs a lock acquisition. It ensures standard test helpers (route_payment, etc.) + // work with deferred chain monitors. + let count = self.chain_monitor.pending_operation_count(); + self.chain_monitor.flush(count, &self.logger).unwrap(); return self.chain_monitor.release_pending_monitor_events(); } } @@ -836,6 +880,8 @@ pub struct TestPersister { /// The queue of update statuses we'll return. If none are queued, ::Completed will always be /// returned. pub update_rets: Mutex>, + /// When we get a persist_new_channel call, we push the monitor name here. + pub new_channel_persistences: Mutex>, /// When we get an update_persisted_channel call *with* a ChannelMonitorUpdate, we insert the /// [`ChannelMonitor::get_latest_update_id`] here. pub offchain_monitor_updates: Mutex>>, @@ -846,9 +892,15 @@ pub struct TestPersister { impl TestPersister { pub fn new() -> Self { let update_rets = Mutex::new(VecDeque::new()); + let new_channel_persistences = Mutex::new(Vec::new()); let offchain_monitor_updates = Mutex::new(new_hash_map()); let chain_sync_monitor_persistences = Mutex::new(VecDeque::new()); - Self { update_rets, offchain_monitor_updates, chain_sync_monitor_persistences } + Self { + update_rets, + new_channel_persistences, + offchain_monitor_updates, + chain_sync_monitor_persistences, + } } /// Queue an update status to return. 
@@ -858,8 +910,9 @@ impl TestPersister { } impl Persist for TestPersister { fn persist_new_channel( - &self, _monitor_name: MonitorName, _data: &ChannelMonitor, + &self, monitor_name: MonitorName, _data: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { + self.new_channel_persistences.lock().unwrap().push(monitor_name); if let Some(update_ret) = self.update_rets.lock().unwrap().pop_front() { return update_ret; }