diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 87d58da4832..34bd8e091ed 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -285,6 +285,7 @@ impl TestChainMonitor { Arc::clone(&persister), Arc::clone(&keys), keys.get_peer_storage_key(), + false, )), logger, keys, diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 39588bcdc50..5949f2c1a56 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -597,6 +597,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc) { Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) }), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, )); let network = Network::Bitcoin; diff --git a/fuzz/src/lsps_message.rs b/fuzz/src/lsps_message.rs index 547a27b70ee..d01e5632025 100644 --- a/fuzz/src/lsps_message.rs +++ b/fuzz/src/lsps_message.rs @@ -59,6 +59,7 @@ pub fn do_test(data: &[u8]) { Arc::clone(&kv_store), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, )); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index f052f3d8d4c..5ba799aee48 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1120,6 +1120,10 @@ where let mut futures = Joiner::new(); + // Capture the pending count before persisting. Only this many writes will be + // flushed afterward, so that updates arriving after persist aren't included. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); + if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); @@ -1317,6 +1321,10 @@ where res?; } + // Flush monitor operations that were pending before we persisted. New updates + // that arrived after are left for the next iteration. + let _ = chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + match check_and_reset_sleeper(&mut last_onion_message_handler_call, || { sleeper(ONION_MESSAGE_HANDLER_TIMER) }) { @@ -1373,6 +1381,7 @@ where // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); kv_store .write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1381,6 +1390,10 @@ where channel_manager.get_cm().encode(), ) .await?; + + // Flush monitor operations that were pending before final persistence. + let _ = chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(ref scorer) = scorer { kv_store .write( @@ -1684,6 +1697,11 @@ impl BackgroundProcessor { channel_manager.get_cm().timer_tick_occurred(); last_freshness_call = Instant::now(); } + + // Capture the pending count before persisting. Only this many writes will be + // flushed afterward, so that updates arriving after persist aren't included. 
+ let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); + if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); (kv_store.write( @@ -1695,6 +1713,10 @@ impl BackgroundProcessor { log_trace!(logger, "Done persisting ChannelManager."); } + // Flush monitor operations that were pending before we persisted. New + // updates that arrived after are left for the next iteration. + let _ = chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(liquidity_manager) = liquidity_manager.as_ref() { log_trace!(logger, "Persisting LiquidityManager..."); let _ = liquidity_manager.get_lm().persist().map_err(|e| { @@ -1809,12 +1831,17 @@ impl BackgroundProcessor { // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count(); kv_store.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, channel_manager.get_cm().encode(), )?; + + // Flush monitor operations that were pending before final persistence. + let _ = chain_monitor.get_cm().flush(pending_monitor_writes, &logger); + if let Some(ref scorer) = scorer { kv_store.write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -1896,9 +1923,10 @@ mod tests { use bitcoin::transaction::{Transaction, TxOut}; use bitcoin::{Amount, ScriptBuf, Txid}; use core::sync::atomic::{AtomicBool, Ordering}; + use lightning::chain::chainmonitor; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::transaction::OutPoint; - use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter}; + use lightning::chain::{BestBlock, Confirm, Filter}; use lightning::events::{Event, PathFailure, ReplayEvent}; use lightning::ln::channelmanager; use lightning::ln::channelmanager::{ @@ -2444,6 +2472,7 @@ mod tests { Arc::clone(&kv_store), Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + true, )); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; @@ -2567,6 +2596,8 @@ mod tests { (persist_dir, nodes) } + /// Opens a channel between two nodes without a running `BackgroundProcessor`, + /// so deferred monitor operations are flushed manually at each step. macro_rules! open_channel { ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ begin_open_channel!($node_a, $node_b, $channel_value); @@ -2582,12 +2613,20 @@ mod tests { tx.clone(), ) .unwrap(); + // funding_transaction_generated does not call watch_channel, so no + // deferred op is queued and FundingCreated is available immediately. let msg_a = get_event_msg!( $node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id() ); $node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a); + // Flush node_b's new monitor (watch_channel) so it releases the + // FundingSigned message. 
+ $node_b + .chain_monitor + .flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger) + .unwrap(); get_event!($node_b, Event::ChannelPending); let msg_b = get_event_msg!( $node_b, @@ -2595,6 +2634,12 @@ mod tests { $node_a.node.get_our_node_id() ); $node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b); + // Flush node_a's new monitor (watch_channel) queued by + // handle_funding_signed. + $node_a + .chain_monitor + .flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger) + .unwrap(); get_event!($node_a, Event::ChannelPending); tx }}; @@ -2720,6 +2765,20 @@ mod tests { confirm_transaction_depth(node, tx, ANTI_REORG_DELAY); } + /// Waits until the background processor has flushed all pending deferred monitor + /// operations for the given node. Panics if the pending count does not reach zero + /// within `EVENT_DEADLINE`. + fn wait_for_flushed(chain_monitor: &ChainMonitor) { + let start = std::time::Instant::now(); + while chain_monitor.pending_operation_count() > 0 { + assert!( + start.elapsed() < EVENT_DEADLINE, + "Pending monitor operations were not flushed within deadline" + ); + std::thread::sleep(Duration::from_millis(10)); + } + } + #[test] fn test_background_processor() { // Test that when a new channel is created, the ChannelManager needs to be re-persisted with @@ -3060,11 +3119,22 @@ mod tests { .node .funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone()) .unwrap(); + // funding_transaction_generated does not call watch_channel, so no deferred op is + // queued and the FundingCreated message is available immediately. let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id); nodes[1].node.handle_funding_created(node_0_id, &msg_0); + // Node 1 has no bg processor, flush its new monitor (watch_channel) manually so + // events and FundingSigned are released. + nodes[1] + .chain_monitor + .flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger) + .unwrap(); get_event!(nodes[1], Event::ChannelPending); let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id); nodes[0].node.handle_funding_signed(node_1_id, &msg_1); + // Wait for the bg processor to flush the new monitor (watch_channel) queued by + // handle_funding_signed. + wait_for_flushed(&nodes[0].chain_monitor); channel_pending_recv .recv_timeout(EVENT_DEADLINE) .expect("ChannelPending not handled within deadline"); @@ -3125,6 +3195,9 @@ mod tests { error_message.to_string(), ) .unwrap(); + // Wait for the bg processor to flush the monitor update triggered by force close + // so the commitment tx is broadcast. 
+ wait_for_flushed(&nodes[0].chain_monitor); let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32); diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 7db1b697c2b..1bee15372eb 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -60,12 +60,21 @@ use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync}; use crate::util::ser::{VecWriter, Writeable}; use crate::util::wakers::{Future, Notifier}; +use alloc::collections::VecDeque; use alloc::sync::Arc; #[cfg(peer_storage)] use core::iter::Cycle; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; +/// A pending operation queued for later execution when `ChainMonitor` is in deferred mode. +enum PendingMonitorOp { + /// A new monitor to insert and persist. + NewMonitor { channel_id: ChannelId, monitor: ChannelMonitor, update_id: u64 }, + /// An update to apply and persist. + Update { channel_id: ChannelId, update: ChannelMonitorUpdate }, +} + /// `Persist` defines behavior for persisting channel monitors: this could mean /// writing once to disk, and/or uploading to one or more backup services. /// @@ -371,6 +380,32 @@ pub struct ChainMonitor< #[cfg(peer_storage)] our_peerstorage_encryption_key: PeerStorageKey, + + /// When `true`, [`chain::Watch`] operations are queued rather than executed immediately. + deferred: bool, + /// Queued monitor operations awaiting flush. Unused when `deferred` is `false`. + /// + /// # Locking order with `monitors` + /// + /// The consistent lock order is `pending_ops` **before** `monitors`. + /// + /// [`watch_channel`] holds `pending_ops.lock()` **then** `monitors.read()` to atomically + /// check for duplicate channel IDs in both the pending queue and the flushed set. + /// + /// [`flush`] holds `pending_ops.lock()` across [`watch_channel_internal`] (which acquires + /// `monitors.write()`) for [`NewMonitor`] ops, ensuring mutual exclusion with + /// `watch_channel`: both acquire `pending_ops` first, so they serialize on that lock. + /// + /// For [`Update`] ops, `pending_ops` is released before calling + /// [`update_channel_internal`] so that concurrent `update_channel` queuing is not blocked. 
+ /// + /// [`watch_channel`]: chain::Watch::watch_channel + /// [`flush`]: Self::flush + /// [`watch_channel_internal`]: Self::watch_channel_internal + /// [`update_channel_internal`]: Self::update_channel_internal + /// [`NewMonitor`]: PendingMonitorOp::NewMonitor + /// [`Update`]: PendingMonitorOp::Update + pending_ops: Mutex>>, } impl< @@ -397,7 +432,7 @@ where pub fn new_async_beta( chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: MonitorUpdatingPersisterAsync, _entropy_source: ES, - _our_peerstorage_encryption_key: PeerStorageKey, + _our_peerstorage_encryption_key: PeerStorageKey, deferred: bool, ) -> Self { let event_notifier = Arc::new(Notifier::new()); Self { @@ -414,6 +449,8 @@ where pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, + deferred, + pending_ops: Mutex::new(VecDeque::new()), } } } @@ -603,7 +640,7 @@ where /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager pub fn new( chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: P, - _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, + _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, deferred: bool, ) -> Self { Self { monitors: RwLock::new(new_hash_map()), @@ -619,6 +656,8 @@ where pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, + deferred, + pending_ops: Mutex::new(VecDeque::new()), } } @@ -1058,6 +1097,262 @@ where Ok(ChannelMonitorUpdateStatus::Completed) } + + fn watch_channel_internal( + &self, channel_id: ChannelId, monitor: ChannelMonitor, + ) -> Result { + let logger = WithChannelMonitor::from(&self.logger, &monitor, None); + let mut monitors = self.monitors.write().unwrap(); + let entry = match monitors.entry(channel_id) { + hash_map::Entry::Occupied(_) => { + log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present"); + return Err(()); + }, + hash_map::Entry::Vacant(e) => e, + }; + log_trace!(logger, "Got new ChannelMonitor"); + let update_id = monitor.get_latest_update_id(); + let mut pending_monitor_updates = Vec::new(); + let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { + log_info!(logger, "Persistence of new ChannelMonitor in progress",); + pending_monitor_updates.push(update_id); + }, + ChannelMonitorUpdateStatus::Completed => { + log_info!(logger, "Persistence of new ChannelMonitor completed",); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + }, + } + if let Some(ref chain_source) = self.chain_source { + monitor.load_outputs_to_watch(chain_source, &self.logger); + } + entry.insert(MonitorHolder { + monitor, + pending_monitor_updates: Mutex::new(pending_monitor_updates), + }); + Ok(persist_res) + } + + fn update_channel_internal( + &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, + ) -> ChannelMonitorUpdateStatus { + // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those + // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. 
+ debug_assert_eq!(update.channel_id.unwrap(), channel_id); + // Update the monitor that watches the channel referred to by the given outpoint. + let monitors = self.monitors.read().unwrap(); + match monitors.get(&channel_id) { + None => { + let logger = WithContext::from(&self.logger, None, Some(channel_id), None); + log_error!(logger, "Failed to update channel monitor: no such monitor registered"); + + // We should never ever trigger this from within ChannelManager. Technically a + // user could use this object with some proxying in between which makes this + // possible, but in tests and fuzzing, this should be a panic. + #[cfg(debug_assertions)] + panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); + #[cfg(not(debug_assertions))] + ChannelMonitorUpdateStatus::InProgress + }, + Some(monitor_state) => { + let monitor = &monitor_state.monitor; + let logger = WithChannelMonitor::from(&self.logger, &monitor, None); + log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,); + + // We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we + // have well-ordered updates from the users' point of view. See the + // `pending_monitor_updates` docs for more. + let mut pending_monitor_updates = + monitor_state.pending_monitor_updates.lock().unwrap(); + let update_res = monitor.update_monitor( + update, + &self.broadcaster, + &self.fee_estimator, + &self.logger, + ); + + let update_id = update.update_id; + let persist_res = if update_res.is_err() { + // Even if updating the monitor returns an error, the monitor's state will + // still be changed. Therefore, we should persist the updated monitor despite the error. + // We don't want to persist a `monitor_update` which results in a failure to apply later + // while reading `channel_monitor` with updates from storage. Instead, we should persist + // the entire `channel_monitor` here. + log_warn!(logger, "Failed to update ChannelMonitor. Going ahead and persisting the entire ChannelMonitor"); + self.persister.update_persisted_channel( + monitor.persistence_key(), + None, + monitor, + ) + } else { + self.persister.update_persisted_channel( + monitor.persistence_key(), + Some(update), + monitor, + ) + }; + match persist_res { + ChannelMonitorUpdateStatus::InProgress => { + pending_monitor_updates.push(update_id); + log_debug!( + logger, + "Persistence of ChannelMonitorUpdate id {:?} in progress", + update_id, + ); + }, + ChannelMonitorUpdateStatus::Completed => { + log_debug!( + logger, + "Persistence of ChannelMonitorUpdate id {:?} completed", + update_id, + ); + }, + ChannelMonitorUpdateStatus::UnrecoverableError => { + // Take the monitors lock for writing so that we poison it and any future + // operations going forward fail immediately. + core::mem::drop(pending_monitor_updates); + core::mem::drop(monitors); + let _poison = self.monitors.write().unwrap(); + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + }, + } + + // We may need to start monitoring for any alternative funding transactions. 
+ if let Some(ref chain_source) = self.chain_source { + for (funding_outpoint, funding_script) in + update.internal_renegotiated_funding_data() + { + log_trace!( + logger, + "Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends", + funding_outpoint + ); + chain_source.register_tx(&funding_outpoint.txid, &funding_script); + chain_source.register_output(WatchedOutput { + block_hash: None, + outpoint: funding_outpoint, + script_pubkey: funding_script, + }); + } + } + + if update_res.is_err() { + ChannelMonitorUpdateStatus::InProgress + } else { + persist_res + } + }, + } + } + + /// Returns the number of pending monitor operations queued for later execution. + pub fn pending_operation_count(&self) -> usize { + self.pending_ops.lock().unwrap().len() + } + + /// Flushes up to `count` pending monitor operations. + /// + /// Returns `Ok(())` when all `count` operations (or the entire queue, if shorter) were + /// flushed. Returns `Err(())` if a `NewMonitor` operation failed, in which case + /// remaining operations are left in the queue for the next call. + pub fn flush(&self, count: usize, logger: &L) -> Result<(), ()> { + if count > 0 { + log_trace!(logger, "Flushing up to {} monitor operations", count); + eprintln!( + "[ChainMonitor::flush] count={} queue_len={} monitors_len={}", + count, + self.pending_ops.lock().unwrap().len(), + self.monitors.read().unwrap().len(), + ); + } + for i in 0..count { + let mut queue = self.pending_ops.lock().unwrap(); + let op = match queue.pop_front() { + Some(op) => op, + None => { + eprintln!("[ChainMonitor::flush] queue empty at iteration {}", i); + return Ok(()); + }, + }; + + let (channel_id, update_id, status) = match op { + PendingMonitorOp::NewMonitor { channel_id, monitor, update_id } => { + let logger = WithChannelMonitor::from(logger, &monitor, None); + log_trace!(logger, "Flushing new monitor"); + eprintln!("[ChainMonitor::flush] iter={} NewMonitor chan={}", i, channel_id); + // Hold `pending_ops` across the internal call so that + // `watch_channel` (which checks `monitors` + `pending_ops` + // atomically) cannot race with this insertion. + match self.watch_channel_internal(channel_id, monitor) { + Ok(status) => { + drop(queue); + eprintln!( + "[ChainMonitor::flush] NewMonitor chan={} ok status={:?}", + channel_id, status + ); + (channel_id, update_id, status) + }, + Err(()) => { + drop(queue); + // The monitor is consumed and cannot be retried. + // Any queued updates for this channel will also + // fail since no monitor was inserted. + log_error!(logger, "watch_channel failed"); + eprintln!( + "[ChainMonitor::flush] NewMonitor chan={} FAILED", + channel_id + ); + return Err(()); + }, + } + }, + PendingMonitorOp::Update { channel_id, update } => { + let logger = WithContext::from(logger, None, Some(channel_id), None); + log_trace!(logger, "Flushing monitor update {}", update.update_id); + eprintln!( + "[ChainMonitor::flush] iter={} Update chan={} update_id={}", + i, channel_id, update.update_id + ); + // Release `pending_ops` before the internal call so that + // concurrent `update_channel` queuing is not blocked. 
+ drop(queue); + let update_id = update.update_id; + let status = self.update_channel_internal(channel_id, &update); + eprintln!( + "[ChainMonitor::flush] Update chan={} status={:?}", + channel_id, status + ); + (channel_id, update_id, status) + }, + }; + + match status { + ChannelMonitorUpdateStatus::Completed => { + let logger = WithContext::from(logger, None, Some(channel_id), None); + if let Err(e) = self.channel_monitor_updated(channel_id, update_id) { + log_error!(logger, "channel_monitor_updated failed: {:?}", e); + eprintln!("[ChainMonitor::flush] channel_monitor_updated FAILED chan={} update_id={}: {:?}", channel_id, update_id, e); + } + }, + ChannelMonitorUpdateStatus::InProgress => {}, + ChannelMonitorUpdateStatus::UnrecoverableError => { + panic!("UnrecoverableError during monitor operation"); + }, + } + } + eprintln!( + "[ChainMonitor::flush] done monitors_len={}", + self.monitors.read().unwrap().len(), + ); + Ok(()) + } } impl< @@ -1272,155 +1567,56 @@ where fn watch_channel( &self, channel_id: ChannelId, monitor: ChannelMonitor, ) -> Result { - let logger = WithChannelMonitor::from(&self.logger, &monitor, None); - let mut monitors = self.monitors.write().unwrap(); - let entry = match monitors.entry(channel_id) { - hash_map::Entry::Occupied(_) => { - log_error!(logger, "Failed to add new channel data: channel monitor for given channel ID is already present"); - return Err(()); - }, - hash_map::Entry::Vacant(e) => e, - }; - log_trace!(logger, "Got new ChannelMonitor"); - let update_id = monitor.get_latest_update_id(); - let mut pending_monitor_updates = Vec::new(); - let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor); - match persist_res { - ChannelMonitorUpdateStatus::InProgress => { - log_info!(logger, "Persistence of new ChannelMonitor in progress",); - pending_monitor_updates.push(update_id); - }, - ChannelMonitorUpdateStatus::Completed => { - log_info!(logger, "Persistence of new ChannelMonitor completed",); - }, - ChannelMonitorUpdateStatus::UnrecoverableError => { - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); - panic!("{}", err_str); - }, + if !self.deferred { + return self.watch_channel_internal(channel_id, monitor); } - if let Some(ref chain_source) = self.chain_source { - monitor.load_outputs_to_watch(chain_source, &self.logger); + + let update_id = monitor.get_latest_update_id(); + // Atomically check for duplicates in both the pending queue and the + // flushed monitor set. Lock order: `pending_ops` before `monitors` + // (see `pending_ops` field doc). + let mut pending_ops = self.pending_ops.lock().unwrap(); + let monitors = self.monitors.read().unwrap(); + if monitors.contains_key(&channel_id) { + eprintln!( + "[ChainMonitor] watch_channel(deferred) chan={} ALREADY IN MONITORS", + channel_id + ); + return Err(()); } - entry.insert(MonitorHolder { - monitor, - pending_monitor_updates: Mutex::new(pending_monitor_updates), + let already_pending = pending_ops.iter().any(|op| match op { + PendingMonitorOp::NewMonitor { channel_id: id, .. 
} => *id == channel_id, + _ => false, }); - Ok(persist_res) + if already_pending { + eprintln!("[ChainMonitor] watch_channel(deferred) chan={} ALREADY PENDING", channel_id); + return Err(()); + } + eprintln!( + "[ChainMonitor] watch_channel(deferred) chan={} queued, pending_after={}", + channel_id, + pending_ops.len() + 1, + ); + pending_ops.push_back(PendingMonitorOp::NewMonitor { channel_id, monitor, update_id }); + Ok(ChannelMonitorUpdateStatus::InProgress) } fn update_channel( &self, channel_id: ChannelId, update: &ChannelMonitorUpdate, ) -> ChannelMonitorUpdateStatus { - // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those - // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`. - debug_assert_eq!(update.channel_id.unwrap(), channel_id); - // Update the monitor that watches the channel referred to by the given outpoint. - let monitors = self.monitors.read().unwrap(); - match monitors.get(&channel_id) { - None => { - let logger = WithContext::from(&self.logger, None, Some(channel_id), None); - log_error!(logger, "Failed to update channel monitor: no such monitor registered"); - - // We should never ever trigger this from within ChannelManager. Technically a - // user could use this object with some proxying in between which makes this - // possible, but in tests and fuzzing, this should be a panic. - #[cfg(debug_assertions)] - panic!("ChannelManager generated a channel update for a channel that was not yet registered!"); - #[cfg(not(debug_assertions))] - ChannelMonitorUpdateStatus::InProgress - }, - Some(monitor_state) => { - let monitor = &monitor_state.monitor; - let logger = WithChannelMonitor::from(&self.logger, &monitor, None); - log_trace!(logger, "Updating ChannelMonitor to id {}", update.update_id,); - - // We hold a `pending_monitor_updates` lock through `update_monitor` to ensure we - // have well-ordered updates from the users' point of view. See the - // `pending_monitor_updates` docs for more. - let mut pending_monitor_updates = - monitor_state.pending_monitor_updates.lock().unwrap(); - let update_res = monitor.update_monitor( - update, - &self.broadcaster, - &self.fee_estimator, - &self.logger, - ); - - let update_id = update.update_id; - let persist_res = if update_res.is_err() { - // Even if updating the monitor returns an error, the monitor's state will - // still be changed. Therefore, we should persist the updated monitor despite the error. - // We don't want to persist a `monitor_update` which results in a failure to apply later - // while reading `channel_monitor` with updates from storage. Instead, we should persist - // the entire `channel_monitor` here. - log_warn!(logger, "Failed to update ChannelMonitor. 
Going ahead and persisting the entire ChannelMonitor"); - self.persister.update_persisted_channel( - monitor.persistence_key(), - None, - monitor, - ) - } else { - self.persister.update_persisted_channel( - monitor.persistence_key(), - Some(update), - monitor, - ) - }; - match persist_res { - ChannelMonitorUpdateStatus::InProgress => { - pending_monitor_updates.push(update_id); - log_debug!( - logger, - "Persistence of ChannelMonitorUpdate id {:?} in progress", - update_id, - ); - }, - ChannelMonitorUpdateStatus::Completed => { - log_debug!( - logger, - "Persistence of ChannelMonitorUpdate id {:?} completed", - update_id, - ); - }, - ChannelMonitorUpdateStatus::UnrecoverableError => { - // Take the monitors lock for writing so that we poison it and any future - // operations going forward fail immediately. - core::mem::drop(pending_monitor_updates); - core::mem::drop(monitors); - let _poison = self.monitors.write().unwrap(); - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!(logger, "{}", err_str); - panic!("{}", err_str); - }, - } - - // We may need to start monitoring for any alternative funding transactions. - if let Some(ref chain_source) = self.chain_source { - for (funding_outpoint, funding_script) in - update.internal_renegotiated_funding_data() - { - log_trace!( - logger, - "Registering renegotiated funding outpoint {} with the filter to monitor confirmations and spends", - funding_outpoint - ); - chain_source.register_tx(&funding_outpoint.txid, &funding_script); - chain_source.register_output(WatchedOutput { - block_hash: None, - outpoint: funding_outpoint, - script_pubkey: funding_script, - }); - } - } - - if update_res.is_err() { - ChannelMonitorUpdateStatus::InProgress - } else { - persist_res - } - }, + if !self.deferred { + return self.update_channel_internal(channel_id, update); } + + let mut pending_ops = self.pending_ops.lock().unwrap(); + eprintln!( + "[ChainMonitor] update_channel(deferred) chan={} update_id={} queued, pending_after={}", + channel_id, + update.update_id, + pending_ops.len() + 1, + ); + pending_ops.push_back(PendingMonitorOp::Update { channel_id, update: update.clone() }); + ChannelMonitorUpdateStatus::InProgress } fn release_pending_monitor_events( @@ -1550,12 +1746,34 @@ where #[cfg(test)] mod tests { - use crate::chain::channelmonitor::ANTI_REORG_DELAY; + use super::ChainMonitor; + use crate::chain::channelmonitor::{ChannelMonitorUpdate, ANTI_REORG_DELAY}; + use crate::chain::transaction::OutPoint; use crate::chain::{ChannelMonitorUpdateStatus, Watch}; use crate::events::{ClosureReason, Event}; + use crate::ln::chan_utils::{ + ChannelTransactionParameters, CounterpartyChannelTransactionParameters, + HolderCommitmentTransaction, + }; + use crate::ln::channel_keys::{DelayedPaymentBasepoint, HtlcBasepoint, RevocationBasepoint}; use crate::ln::functional_test_utils::*; use crate::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, MessageSendEvent}; + use crate::ln::script::ShutdownScript; + use crate::ln::types::ChannelId; + use crate::sign::{ChannelSigner, InMemorySigner, NodeSigner}; + use crate::types::features::ChannelTypeFeatures; + use crate::util::dyn_signer::DynSigner; + use crate::util::test_channel_signer::TestChannelSigner; + use crate::util::test_utils::{ + TestBroadcaster, TestChainSource, TestFeeEstimator, TestKeysInterface, TestLogger, + TestPersister, + }; use crate::{expect_payment_path_successful, get_event_msg}; + use 
bitcoin::hash_types::Txid; + use bitcoin::hashes::Hash; + use bitcoin::script::ScriptBuf; + use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; + use bitcoin::Network; const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 5; @@ -1813,4 +2031,245 @@ mod tests { }) .is_err()); } + + /// Concrete `ChainMonitor` type wired to the standard test utilities in deferred mode. + type TestDeferredChainMonitor<'a> = ChainMonitor< + TestChannelSigner, + &'a TestChainSource, + &'a TestBroadcaster, + &'a TestFeeEstimator, + &'a TestLogger, + &'a TestPersister, + &'a TestKeysInterface, + >; + + /// Creates a minimal `ChannelMonitorUpdate` with no actual update steps. + fn dummy_update(update_id: u64, channel_id: ChannelId) -> ChannelMonitorUpdate { + ChannelMonitorUpdate { updates: vec![], update_id, channel_id: Some(channel_id) } + } + + /// Creates a minimal `ChannelMonitor` for the given `channel_id`. + fn dummy_monitor( + channel_id: ChannelId, + ) -> crate::chain::channelmonitor::ChannelMonitor { + let secp_ctx = Secp256k1::new(); + let dummy_key = + PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); + let keys = InMemorySigner::new( + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + true, + SecretKey::from_slice(&[41; 32]).unwrap(), + SecretKey::from_slice(&[41; 32]).unwrap(), + [41; 32], + [0; 32], + [0; 32], + ); + let counterparty_pubkeys = crate::ln::chan_utils::ChannelPublicKeys { + funding_pubkey: dummy_key, + revocation_basepoint: RevocationBasepoint::from(dummy_key), + payment_point: dummy_key, + delayed_payment_basepoint: DelayedPaymentBasepoint::from(dummy_key), + htlc_basepoint: HtlcBasepoint::from(dummy_key), + }; + let funding_outpoint = OutPoint { txid: Txid::all_zeros(), index: u16::MAX }; + let channel_parameters = ChannelTransactionParameters { + holder_pubkeys: keys.pubkeys(&secp_ctx), + holder_selected_contest_delay: 66, + is_outbound_from_holder: true, + counterparty_parameters: Some(CounterpartyChannelTransactionParameters { + pubkeys: counterparty_pubkeys, + selected_contest_delay: 67, + }), + funding_outpoint: Some(funding_outpoint), + splice_parent_funding_txid: None, + channel_type_features: ChannelTypeFeatures::only_static_remote_key(), + channel_value_satoshis: 0, + }; + let shutdown_script = ShutdownScript::new_p2wpkh_from_pubkey(dummy_key); + let best_block = crate::chain::BestBlock::from_network(Network::Testnet); + let signer = TestChannelSigner::new(DynSigner::new(keys)); + crate::chain::channelmonitor::ChannelMonitor::new( + secp_ctx, + signer, + Some(shutdown_script.into_inner()), + 0, + &ScriptBuf::new(), + &channel_parameters, + true, + 0, + HolderCommitmentTransaction::dummy(0, funding_outpoint, Vec::new()), + best_block, + dummy_key, + channel_id, + false, + ) + } + + fn create_deferred_chain_monitor<'a>( + chain_source: &'a TestChainSource, broadcaster: &'a TestBroadcaster, + logger: &'a TestLogger, fee_est: &'a TestFeeEstimator, persister: &'a TestPersister, + keys: &'a TestKeysInterface, + ) -> TestDeferredChainMonitor<'a> { + ChainMonitor::new( + Some(chain_source), + broadcaster, + logger, + fee_est, + persister, + keys, + keys.get_peer_storage_key(), + true, + ) + } + + /// Tests queueing and flushing of both `watch_channel` and `update_channel` operations + /// when `ChainMonitor` is in deferred mode, verifying that operations flow through to + /// `Persist` and that `channel_monitor_updated` 
is called on `Completed` status. + #[test] + fn test_queue_and_flush() { + let broadcaster = TestBroadcaster::new(Network::Testnet); + let fee_est = TestFeeEstimator::new(253); + let logger = TestLogger::new(); + let persister = TestPersister::new(); + let chain_source = TestChainSource::new(Network::Testnet); + let keys = TestKeysInterface::new(&[0; 32], Network::Testnet); + let deferred = create_deferred_chain_monitor( + &chain_source, + &broadcaster, + &logger, + &fee_est, + &persister, + &keys, + ); + + // Queue starts empty. + assert_eq!(deferred.pending_operation_count(), 0); + + // Queue a watch_channel, verifying InProgress status. + let chan = ChannelId::from_bytes([1u8; 32]); + let status = Watch::watch_channel(&deferred, chan, dummy_monitor(chan)); + assert_eq!(status, Ok(ChannelMonitorUpdateStatus::InProgress)); + assert_eq!(deferred.pending_operation_count(), 1); + + // Nothing persisted yet — operations are only queued. + assert!(persister.new_channel_persistences.lock().unwrap().is_empty()); + + // Queue two updates after the watch. Update IDs must be sequential (starting + // from 1 since the initial monitor has update_id 0). + assert_eq!( + Watch::update_channel(&deferred, chan, &dummy_update(1, chan)), + ChannelMonitorUpdateStatus::InProgress + ); + assert_eq!( + Watch::update_channel(&deferred, chan, &dummy_update(2, chan)), + ChannelMonitorUpdateStatus::InProgress + ); + assert_eq!(deferred.pending_operation_count(), 3); + + // Flush 2 of 3: persist_new_channel returns Completed (triggers + // channel_monitor_updated), update_persisted_channel returns InProgress (does not). + persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); + persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + deferred.flush(2, &&logger).unwrap(); + + assert_eq!(deferred.pending_operation_count(), 1); + + // persist_new_channel was called for the watch. + assert_eq!(persister.new_channel_persistences.lock().unwrap().len(), 1); + + // Because persist_new_channel returned Completed, channel_monitor_updated was called, + // so update_id 0 should no longer be pending. + let pending = deferred.list_pending_monitor_updates(); + #[cfg(not(c_bindings))] + let pending_for_chan = pending.get(&chan).unwrap(); + #[cfg(c_bindings)] + let pending_for_chan = &pending.iter().find(|(chan_id, _)| *chan_id == chan).unwrap().1; + assert!(!pending_for_chan.contains(&0)); + + // update_persisted_channel was called for update_id 1, and because it returned + // InProgress, update_id 1 remains pending. + let monitor_name = deferred.get_monitor(chan).unwrap().persistence_key(); + assert!(persister + .offchain_monitor_updates + .lock() + .unwrap() + .get(&monitor_name) + .unwrap() + .contains(&1)); + assert!(pending_for_chan.contains(&1)); + + // Flush remaining: update_persisted_channel returns Completed (default), triggers + // channel_monitor_updated. + deferred.flush(1, &&logger).unwrap(); + assert_eq!(deferred.pending_operation_count(), 0); + + // update_persisted_channel was called for update_id 2. + assert!(persister + .offchain_monitor_updates + .lock() + .unwrap() + .get(&monitor_name) + .unwrap() + .contains(&2)); + + // update_id 1 is still pending from the InProgress earlier, but update_id 2 was + // completed in this flush so it is no longer pending. 
+ let pending = deferred.list_pending_monitor_updates(); + #[cfg(not(c_bindings))] + let pending_for_chan = pending.get(&chan).unwrap(); + #[cfg(c_bindings)] + let pending_for_chan = &pending.iter().find(|(chan_id, _)| *chan_id == chan).unwrap().1; + assert!(pending_for_chan.contains(&1)); + assert!(!pending_for_chan.contains(&2)); + + // Flushing an empty queue is a no-op. + let persist_count_before = persister.new_channel_persistences.lock().unwrap().len(); + deferred.flush(5, &&logger).unwrap(); + assert_eq!(persister.new_channel_persistences.lock().unwrap().len(), persist_count_before); + } + + /// Tests that `ChainMonitor` in deferred mode properly defers `watch_channel` and + /// `update_channel` operations, verifying correctness through a complete channel open + /// and payment flow. Operations are auto-flushed via the `TestChainMonitor` + /// `release_pending_monitor_events` helper. + #[test] + fn test_deferred_monitor_payment() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs_deferred(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let chain_monitor_a = &nodes[0].chain_monitor.chain_monitor; + let chain_monitor_b = &nodes[1].chain_monitor.chain_monitor; + + eprintln!("[test_deferred] before create_announced_chan: a_monitors={} a_pending={} b_monitors={} b_pending={}", + chain_monitor_a.list_monitors().len(), chain_monitor_a.pending_operation_count(), + chain_monitor_b.list_monitors().len(), chain_monitor_b.pending_operation_count()); + + create_announced_chan_between_nodes(&nodes, 0, 1); + + eprintln!("[test_deferred] after create_announced_chan: a_monitors={} a_pending={} b_monitors={} b_pending={}", + chain_monitor_a.list_monitors().len(), chain_monitor_a.pending_operation_count(), + chain_monitor_b.list_monitors().len(), chain_monitor_b.pending_operation_count()); + + let (preimage, _hash, ..) 
= route_payment(&nodes[0], &[&nodes[1]], 10_000); + + eprintln!("[test_deferred] after route_payment: a_monitors={} a_pending={} b_monitors={} b_pending={}", + chain_monitor_a.list_monitors().len(), chain_monitor_a.pending_operation_count(), + chain_monitor_b.list_monitors().len(), chain_monitor_b.pending_operation_count()); + + claim_payment(&nodes[0], &[&nodes[1]], preimage); + + eprintln!("[test_deferred] after claim_payment: a_monitors={} a_pending={} b_monitors={} b_pending={}", + chain_monitor_a.list_monitors().len(), chain_monitor_a.pending_operation_count(), + chain_monitor_b.list_monitors().len(), chain_monitor_b.pending_operation_count()); + + assert_eq!(chain_monitor_a.list_monitors().len(), 1); + assert_eq!(chain_monitor_b.list_monitors().len(), 1); + assert_eq!(chain_monitor_a.pending_operation_count(), 0); + assert_eq!(chain_monitor_b.pending_operation_count(), 0); + } } diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 5a0c37bd61d..19715be6e8d 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -4918,6 +4918,7 @@ fn native_async_persist() { native_async_persister, Arc::clone(&keys_manager), keys_manager.get_peer_storage_key(), + false, ); // Write the initial ChannelMonitor async, testing primarily that the `MonitorEvent::Completed` diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 106cc7ccaed..5f74dfce806 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -20641,7 +20641,7 @@ pub mod bench { let seed_a = [1u8; 32]; let keys_manager_a = KeysManager::new(&seed_a, 42, 42, true); - let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a, &keys_manager_a, keys_manager_a.get_peer_storage_key()); + let chain_monitor_a = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_a, &keys_manager_a, keys_manager_a.get_peer_storage_key(), false); let node_a = ChannelManager::new(&fee_estimator, &chain_monitor_a, &tx_broadcaster, &router, &message_router, &logger_a, &keys_manager_a, &keys_manager_a, &keys_manager_a, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), @@ -20651,7 +20651,7 @@ pub mod bench { let logger_b = test_utils::TestLogger::with_id("node a".to_owned()); let seed_b = [2u8; 32]; let keys_manager_b = KeysManager::new(&seed_b, 42, 42, true); - let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b, &keys_manager_b, keys_manager_b.get_peer_storage_key()); + let chain_monitor_b = ChainMonitor::new(None, &tx_broadcaster, &logger_a, &fee_estimator, &persister_b, &keys_manager_b, keys_manager_b.get_peer_storage_key(), false); let node_b = ChannelManager::new(&fee_estimator, &chain_monitor_b, &tx_broadcaster, &router, &message_router, &logger_b, &keys_manager_b, &keys_manager_b, &keys_manager_b, config.clone(), ChainParameters { network, best_block: BestBlock::from_network(network), diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index c21c62d464c..09f23a2d8c5 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -4449,6 +4449,26 @@ fn create_node_cfgs_internal<'a, F>( node_count: usize, chanmon_cfgs: &'a Vec, persisters: Vec<&'a impl test_utils::SyncPersist>, message_router_constructor: F, ) -> Vec> +where + F: Fn( + 
Arc>, + &'a TestKeysInterface, + ) -> test_utils::TestMessageRouter<'a>, +{ + create_node_cfgs_internal_deferred( + node_count, + chanmon_cfgs, + persisters, + message_router_constructor, + false, + ) +} + +fn create_node_cfgs_internal_deferred<'a, F>( + node_count: usize, chanmon_cfgs: &'a Vec, + persisters: Vec<&'a impl test_utils::SyncPersist>, message_router_constructor: F, + deferred: bool, +) -> Vec> where F: Fn( Arc>, @@ -4460,14 +4480,25 @@ where for i in 0..node_count { let cfg = &chanmon_cfgs[i]; let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, &cfg.logger)); - let chain_monitor = test_utils::TestChainMonitor::new( - Some(&cfg.chain_source), - &cfg.tx_broadcaster, - &cfg.logger, - &cfg.fee_estimator, - persisters[i], - &cfg.keys_manager, - ); + let chain_monitor = if deferred { + test_utils::TestChainMonitor::new_deferred( + Some(&cfg.chain_source), + &cfg.tx_broadcaster, + &cfg.logger, + &cfg.fee_estimator, + persisters[i], + &cfg.keys_manager, + ) + } else { + test_utils::TestChainMonitor::new( + Some(&cfg.chain_source), + &cfg.tx_broadcaster, + &cfg.logger, + &cfg.fee_estimator, + persisters[i], + &cfg.keys_manager, + ) + }; let seed = [i as u8; 32]; nodes.push(NodeCfg { @@ -4507,6 +4538,19 @@ pub fn create_node_cfgs<'a>( ) } +pub fn create_node_cfgs_deferred<'a>( + node_count: usize, chanmon_cfgs: &'a Vec, +) -> Vec> { + let persisters = chanmon_cfgs.iter().map(|c| &c.persister).collect(); + create_node_cfgs_internal_deferred( + node_count, + chanmon_cfgs, + persisters, + test_utils::TestMessageRouter::new_default, + true, + ) +} + pub fn create_node_cfgs_with_persisters<'a>( node_count: usize, chanmon_cfgs: &'a Vec, persisters: Vec<&'a impl test_utils::SyncPersist>, diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index bcf39fde482..0e36e457aba 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -509,6 +509,7 @@ pub struct TestChainMonitor<'a> { &'a TestKeysInterface, >, pub keys_manager: &'a TestKeysInterface, + pub logger: &'a TestLogger, /// If this is set to Some(), the next update_channel call (not watch_channel) must be a /// ChannelForceClosed event for the given channel_id with should_broadcast set to the given /// boolean. 
@@ -524,6 +525,38 @@ impl<'a> TestChainMonitor<'a> { chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, + ) -> Self { + Self::with_deferred( + chain_source, + broadcaster, + logger, + fee_estimator, + persister, + keys_manager, + false, + ) + } + + pub fn new_deferred( + chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, + logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, + persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, + ) -> Self { + Self::with_deferred( + chain_source, + broadcaster, + logger, + fee_estimator, + persister, + keys_manager, + true, + ) + } + + fn with_deferred( + chain_source: Option<&'a TestChainSource>, broadcaster: &'a dyn SyncBroadcaster, + logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, + persister: &'a dyn SyncPersist, keys_manager: &'a TestKeysInterface, deferred: bool, ) -> Self { Self { added_monitors: Mutex::new(Vec::new()), @@ -537,8 +570,10 @@ impl<'a> TestChainMonitor<'a> { persister, keys_manager, keys_manager.get_peer_storage_key(), + deferred, ), keys_manager, + logger, expect_channel_force_closed: Mutex::new(None), expect_monitor_round_trip_fail: Mutex::new(None), #[cfg(feature = "std")] @@ -546,6 +581,10 @@ impl<'a> TestChainMonitor<'a> { } } + pub fn pending_operation_count(&self) -> usize { + self.chain_monitor.pending_operation_count() + } + pub fn complete_sole_pending_chan_update(&self, channel_id: &ChannelId) { let (_, latest_update) = self.latest_monitor_update_id.lock().unwrap().get(channel_id).unwrap().clone(); @@ -613,7 +652,17 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { .unwrap() .insert(channel_id, (monitor.get_latest_update_id(), monitor.get_latest_update_id())); self.added_monitors.lock().unwrap().push((channel_id, monitor)); - self.chain_monitor.watch_channel(channel_id, new_monitor) + let deferred = self.chain_monitor.pending_operation_count() > 0 + || self.chain_monitor.list_monitors().is_empty(); + let res = self.chain_monitor.watch_channel(channel_id, new_monitor); + eprintln!( + "[TestChainMonitor] watch_channel chan={} deferred_hint={} pending_after={} monitors_after={} result={:?}", + channel_id, deferred, + self.chain_monitor.pending_operation_count(), + self.chain_monitor.list_monitors().len(), + res.as_ref().map(|s| format!("{:?}", s)), + ); + res } fn update_channel( @@ -654,6 +703,19 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { let update_res = self.chain_monitor.update_channel(channel_id, update); // At every point where we get a monitor update, we should be able to send a useful monitor // to a watchtower and disk... + let in_monitors = self.chain_monitor.list_monitors().contains(&channel_id); + let pending_count = self.chain_monitor.pending_operation_count(); + eprintln!( + "[TestChainMonitor] update_channel chan={} update_id={} in_monitors={} pending={} update_res={:?}", + channel_id, update.update_id, in_monitors, pending_count, update_res, + ); + if !in_monitors { + eprintln!( + "[TestChainMonitor] WARNING: update_channel called but monitor not in monitors map! \ + Calling get_monitor will panic. 
pending_count={}", + pending_count, + ); + } let monitor = self.chain_monitor.get_monitor(channel_id).unwrap(); w.0.clear(); monitor.write(&mut w).unwrap(); @@ -676,6 +738,24 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + // Auto-flush pending operations so that the ChannelManager can pick up monitor + // completion events. When not in deferred mode the queue is empty so this only + // costs a lock acquisition. It ensures standard test helpers (route_payment, etc.) + // work with deferred chain monitors. + let count = self.chain_monitor.pending_operation_count(); + if count > 0 { + eprintln!( + "[TestChainMonitor] release_pending_monitor_events: flushing {} ops, monitors_before={}", + count, self.chain_monitor.list_monitors().len(), + ); + } + self.chain_monitor.flush(count, &self.logger).unwrap(); + if count > 0 { + eprintln!( + "[TestChainMonitor] release_pending_monitor_events: after flush monitors={}", + self.chain_monitor.list_monitors().len(), + ); + } return self.chain_monitor.release_pending_monitor_events(); } } @@ -835,6 +915,8 @@ pub struct TestPersister { /// The queue of update statuses we'll return. If none are queued, ::Completed will always be /// returned. pub update_rets: Mutex>, + /// When we get a persist_new_channel call, we push the monitor name here. + pub new_channel_persistences: Mutex>, /// When we get an update_persisted_channel call *with* a ChannelMonitorUpdate, we insert the /// [`ChannelMonitor::get_latest_update_id`] here. pub offchain_monitor_updates: Mutex>>, @@ -845,9 +927,15 @@ pub struct TestPersister { impl TestPersister { pub fn new() -> Self { let update_rets = Mutex::new(VecDeque::new()); + let new_channel_persistences = Mutex::new(Vec::new()); let offchain_monitor_updates = Mutex::new(new_hash_map()); let chain_sync_monitor_persistences = Mutex::new(VecDeque::new()); - Self { update_rets, offchain_monitor_updates, chain_sync_monitor_persistences } + Self { + update_rets, + new_channel_persistences, + offchain_monitor_updates, + chain_sync_monitor_persistences, + } } /// Queue an update status to return. @@ -857,8 +945,9 @@ impl TestPersister { } impl Persist for TestPersister { fn persist_new_channel( - &self, _monitor_name: MonitorName, _data: &ChannelMonitor, + &self, monitor_name: MonitorName, _data: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { + self.new_channel_persistences.lock().unwrap().push(monitor_name); if let Some(update_ret) = self.update_rets.lock().unwrap().pop_front() { return update_ret; }