Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 50 additions & 21 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8154,6 +8154,19 @@ impl<
self.apply_post_close_monitor_update(counterparty_node_id, channel_id, funding_txo, update);
},
BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => {
// Now that we can finally handle the background event, remove all in-flight
// monitor updates for this channel, as they have already been persisted to the
// monitor and can be applied to our internal state such that the channel
// resumes operation.
{
let per_peer_state = self.per_peer_state.read().unwrap();
let mut peer_state_lock;
let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
if peer_state_mutex_opt.is_none() { continue; }
peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
let peer_state = &mut *peer_state_lock;
peer_state.in_flight_monitor_updates.remove(&channel_id);
}
self.channel_monitor_updated(&channel_id, None, &counterparty_node_id);
},
}
Expand Down Expand Up @@ -18134,39 +18147,55 @@ impl<
($counterparty_node_id: expr, $chan_in_flight_upds: expr, $monitor: expr,
$peer_state: expr, $logger: expr, $channel_info_log: expr
) => { {
let mut max_in_flight_update_id = 0;
let starting_len = $chan_in_flight_upds.len();
$chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id());
if $chan_in_flight_upds.len() < starting_len {
// When all in-flight updates have completed after we were last serialized, we
// need to remove them. However, we can't guarantee that the next serialization
// will have happened after processing the
// `BackgroundEvent::MonitorUpdatesComplete`, so removing them now could lead to the
// channel never being resumed as the event would not be regenerated after another
// reload. At the same time, we don't want to resume the channel now because there
// may be post-update actions to handle. Therefore, we're forced to keep tracking
// the completed in-flight updates (but only when they have all completed) until we
// are processing the `BackgroundEvent::MonitorUpdatesComplete`.
let num_updates_completed = $chan_in_flight_upds
.iter()
.filter(|upd| upd.update_id <= $monitor.get_latest_update_id())
.count();
if num_updates_completed > 0 {
log_debug!(
$logger,
"{} ChannelMonitorUpdates completed after ChannelManager was last serialized",
starting_len - $chan_in_flight_upds.len()
num_updates_completed,
);
}
let all_updates_completed = num_updates_completed == $chan_in_flight_upds.len();

let funding_txo = $monitor.get_funding_txo();
for update in $chan_in_flight_upds.iter() {
log_debug!($logger, "Replaying ChannelMonitorUpdate {} for {}channel {}",
update.update_id, $channel_info_log, &$monitor.channel_id());
max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id);
pending_background_events.push(
BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id: $counterparty_node_id,
funding_txo: funding_txo,
channel_id: $monitor.channel_id(),
update: update.clone(),
});
}
if $chan_in_flight_upds.is_empty() {
// We had some updates to apply, but it turns out they had completed before we
// were serialized, we just weren't notified of that. Thus, we may have to run
// the completion actions for any monitor updates, but otherwise are done.
let mut max_in_flight_update_id = 0;
if all_updates_completed {
log_debug!($logger, "All monitor updates completed since the ChannelManager was last serialized");
pending_background_events.push(
BackgroundEvent::MonitorUpdatesComplete {
counterparty_node_id: $counterparty_node_id,
channel_id: $monitor.channel_id(),
});
} else {
$chan_in_flight_upds.retain(|update| {
let replay = update.update_id > $monitor.get_latest_update_id();
if replay {
log_debug!($logger, "Replaying ChannelMonitorUpdate {} for {}channel {}",
update.update_id, $channel_info_log, &$monitor.channel_id());
max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id);
pending_background_events.push(
BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id: $counterparty_node_id,
funding_txo: funding_txo,
channel_id: $monitor.channel_id(),
update: update.clone(),
}
);
}
replay
});
$peer_state.closed_channel_monitor_update_ids.entry($monitor.channel_id())
.and_modify(|v| *v = cmp::max(max_in_flight_update_id, *v))
.or_insert(max_in_flight_update_id);
Expand Down
88 changes: 88 additions & 0 deletions lightning/src/ln/reload_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ use crate::prelude::*;

use crate::ln::functional_test_utils::*;

fn get_latest_mon_update_id<'a, 'b, 'c>(
node: &Node<'a, 'b, 'c>, channel_id: ChannelId,
) -> (u64, u64) {
let monitor_id_state = node.chain_monitor.latest_monitor_update_id.lock().unwrap();
monitor_id_state.get(&channel_id).unwrap().clone()
}

#[test]
fn test_funding_peer_disconnect() {
// Test that we can lock in our funding tx while disconnected
Expand Down Expand Up @@ -1566,3 +1573,84 @@ fn test_peer_storage() {
assert!(res.is_err());
}

#[test]
fn test_hold_completed_inflight_monitor_updates_upon_manager_reload() {
// Test that if a `ChannelMonitorUpdate` completes after the `ChannelManager` is serialized,
// but before it is deserialized, we hold any completed in-flight updates until background event
// processing. Previously, we would remove completed monitor updates from
// `in_flight_monitor_updates` during deserialization, relying on
// [`ChannelManager::process_background_events`] to eventually be called before the
// `ChannelManager` is serialized again such that the channel is resumed and further updates can
// be made.
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let (persister_a, persister_b);
let (chain_monitor_a, chain_monitor_b);

let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes_0_deserialized_a;
let nodes_0_deserialized_b;

let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;

send_payment(&nodes[0], &[&nodes[1]], 1_000_000);

chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);

// Send a payment that will be pending due to an async monitor update.
let (route, payment_hash, _, payment_secret) =
get_route_and_payment_hash!(nodes[0], nodes[1], 1_000_000);
let payment_id = PaymentId(payment_hash.0);
let onion = RecipientOnionFields::secret_only(payment_secret);
nodes[0].node.send_payment_with_route(route, payment_hash, onion, payment_id).unwrap();
check_added_monitors(&nodes[0], 1);

assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());

// Serialize the ChannelManager while the monitor update is still in-flight.
let node_0_serialized = nodes[0].node.encode();

// Now complete the monitor update by calling force_channel_monitor_updated.
// This updates the monitor's state, but the ChannelManager still thinks it's pending.
let (_, latest_update_id) = get_latest_mon_update_id(&nodes[0], chan_id);
nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_id, latest_update_id);
let monitor_serialized_updated = get_monitor!(nodes[0], chan_id).encode();

// Reload the node with the updated monitor. Upon deserialization, the ChannelManager will
// detect that the monitor update completed (monitor's update_id >= the in-flight update_id)
// and queue a `BackgroundEvent::MonitorUpdatesComplete`.
nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id());
nodes[1].node.peer_disconnected(nodes[0].node.get_our_node_id());
reload_node!(
nodes[0],
test_default_channel_config(),
&node_0_serialized,
&[&monitor_serialized_updated[..]],
persister_a,
chain_monitor_a,
nodes_0_deserialized_a
);

// If we serialize again, even though we haven't processed any background events yet, we should
// still see the `BackgroundEvent::MonitorUpdatesComplete` be regenerated on startup.
let node_0_serialized = nodes[0].node.encode();
reload_node!(
nodes[0],
test_default_channel_config(),
&node_0_serialized,
&[&monitor_serialized_updated[..]],
persister_b,
chain_monitor_b,
nodes_0_deserialized_b
);

// Reconnect the nodes. We should finally see the `update_add_htlc` go out, as the reconnection
// should first process `BackgroundEvent::MonitorUpdatesComplete, allowing the channel to be
// resumed.
let mut reconnect_args = ReconnectArgs::new(&nodes[0], &nodes[1]);
reconnect_args.pending_htlc_adds = (0, 1);
reconnect_nodes(reconnect_args);
}