diff --git a/cumulus/pallets/xcmp-queue/src/lib.rs b/cumulus/pallets/xcmp-queue/src/lib.rs index 4ef989dae4..2444aaabe4 100644 --- a/cumulus/pallets/xcmp-queue/src/lib.rs +++ b/cumulus/pallets/xcmp-queue/src/lib.rs @@ -124,11 +124,8 @@ pub mod pallet { /// Status of the inbound XCMP channels. #[pallet::storage] - pub(super) type InboundXcmpStatus = StorageValue< - _, - Vec<(ParaId, InboundStatus, Vec<(RelayBlockNumber, XcmpMessageFormat)>)>, - ValueQuery, - >; + pub(super) type InboundXcmpStatus = + StorageValue<_, Vec, ValueQuery>; /// Inbound aggregate XCMP messages. It can only be one per ParaId/block. #[pallet::storage] @@ -150,7 +147,7 @@ pub mod pallet { /// The bool is true if there is a signal message waiting to be sent. #[pallet::storage] pub(super) type OutboundXcmpStatus = - StorageValue<_, Vec<(ParaId, OutboundStatus, bool, u16, u16)>, ValueQuery>; + StorageValue<_, Vec, ValueQuery>; // The new way of doing it: /// The messages outbound in a given XCMP channel. @@ -169,17 +166,68 @@ pub mod pallet { } #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Encode, Decode, RuntimeDebug, TypeInfo)] -pub enum InboundStatus { +pub enum InboundState { Ok, Suspended, } #[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)] -pub enum OutboundStatus { +pub enum OutboundState { Ok, Suspended, } +/// Struct containing detailed information about the inbound channel. +#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Encode, Decode, TypeInfo)] +pub struct InboundChannelDetails { + /// The `ParaId` of the parachain that this channel is connected with. + sender: ParaId, + /// The state of the channel. + state: InboundState, + /// The ordered metadata of each inbound message. + /// + /// Contains info about the relay block number that the message was sent at, and the format + /// of the incoming message. + message_metadata: Vec<(RelayBlockNumber, XcmpMessageFormat)>, +} + +/// Struct containing detailed information about the outbound channel. +#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo)] +pub struct OutboundChannelDetails { + /// The `ParaId` of the parachain that this channel is connected with. + recipient: ParaId, + /// The state of the channel. + state: OutboundState, + /// Whether or not any signals exist in this channel. + signals_exist: bool, + /// The index of the first outbound message. + first_index: u16, + /// The index of the last outbound message. + last_index: u16, +} + +impl OutboundChannelDetails { + pub fn new(recipient: ParaId) -> OutboundChannelDetails { + OutboundChannelDetails { + recipient, + state: OutboundState::Ok, + signals_exist: false, + first_index: 0, + last_index: 0, + } + } + + pub fn with_signals(mut self) -> OutboundChannelDetails { + self.signals_exist = true; + self + } + + pub fn with_suspended_state(mut self) -> OutboundChannelDetails { + self.state = OutboundState::Suspended; + self + } +} + #[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)] pub struct QueueConfigData { /// The number of pages of messages which must be in the queue for the other side to be told to @@ -242,13 +290,13 @@ impl Pallet { } let mut s = >::get(); - let index = s.iter().position(|item| item.0 == recipient).unwrap_or_else(|| { - s.push((recipient, OutboundStatus::Ok, false, 0, 0)); + let index = s.iter().position(|item| item.recipient == recipient).unwrap_or_else(|| { + s.push(OutboundChannelDetails::new(recipient)); s.len() - 1 }); - let have_active = s[index].4 > s[index].3; + let have_active = s[index].last_index > s[index].first_index; let appended = have_active && - >::mutate(recipient, s[index].4 - 1, |s| { + >::mutate(recipient, s[index].last_index - 1, |s| { if XcmpMessageFormat::decode_and_advance_with_depth_limit( MAX_XCM_DECODE_DEPTH, &mut &s[..], @@ -263,15 +311,15 @@ impl Pallet { return true }); if appended { - Ok((s[index].4 - s[index].3 - 1) as u32) + Ok((s[index].last_index - s[index].first_index - 1) as u32) } else { // Need to add a new page. - let page_index = s[index].4; - s[index].4 += 1; + let page_index = s[index].last_index; + s[index].last_index += 1; let mut new_page = format.encode(); new_page.extend_from_slice(&data[..]); >::insert(recipient, page_index, new_page); - let r = (s[index].4 - s[index].3 - 1) as u32; + let r = (s[index].last_index - s[index].first_index - 1) as u32; >::put(s); Ok(r) } @@ -281,10 +329,10 @@ impl Pallet { /// block. fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), ()> { let mut s = >::get(); - if let Some(index) = s.iter().position(|item| item.0 == dest) { - s[index].2 = true; + if let Some(index) = s.iter().position(|item| item.recipient == dest) { + s[index].signals_exist = true; } else { - s.push((dest, OutboundStatus::Ok, true, 0, 0)); + s.push(OutboundChannelDetails::new(dest).with_signals()); } >::mutate(dest, |page| { if page.is_empty() { @@ -493,7 +541,7 @@ impl Pallet { max_weight.saturating_sub(weight_used) >= threshold_weight { let index = shuffled[shuffle_index]; - let sender = status[index].0; + let sender = status[index].sender; if weight_available != max_weight { // Get incrementally closer to freeing up max_weight for message execution over the @@ -510,34 +558,37 @@ impl Pallet { } } - let weight_processed = if status[index].2.is_empty() { + let weight_processed = if status[index].message_metadata.is_empty() { debug_assert!(false, "channel exists in status; there must be messages; qed"); 0 } else { // Process up to one block's worth for now. let weight_remaining = weight_available.saturating_sub(weight_used); - let (weight_processed, is_empty) = - Self::process_xcmp_message(sender, status[index].2[0], weight_remaining); + let (weight_processed, is_empty) = Self::process_xcmp_message( + sender, + status[index].message_metadata[0], + weight_remaining, + ); if is_empty { - status[index].2.remove(0); + status[index].message_metadata.remove(0); } weight_processed }; weight_used += weight_processed; - if status[index].2.len() as u32 <= resume_threshold && - status[index].1 == InboundStatus::Suspended + if status[index].message_metadata.len() as u32 <= resume_threshold && + status[index].state == InboundState::Suspended { // Resume let r = Self::send_signal(sender, ChannelSignal::Resume); debug_assert!(r.is_ok(), "WARNING: Failed sending resume into suspended channel"); - status[index].1 = InboundStatus::Ok; + status[index].state = InboundState::Ok; } // If there are more and we're making progress, we process them after we've given the // other channels a look in. If we've still not unlocked all weight, then we set them // up for processing a second time anyway. - if !status[index].2.is_empty() && + if !status[index].message_metadata.is_empty() && (weight_processed > 0 || weight_available != max_weight) { if shuffle_index + 1 == shuffled.len() { @@ -550,7 +601,7 @@ impl Pallet { } // Only retain the senders that have non-empty queues. - status.retain(|item| !item.2.is_empty()); + status.retain(|item| !item.message_metadata.is_empty()); >::put(status); weight_used @@ -558,28 +609,28 @@ impl Pallet { fn suspend_channel(target: ParaId) { >::mutate(|s| { - if let Some(index) = s.iter().position(|item| item.0 == target) { - let ok = s[index].1 == OutboundStatus::Ok; + if let Some(index) = s.iter().position(|item| item.recipient == target) { + let ok = s[index].state == OutboundState::Ok; debug_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok."); - s[index].1 = OutboundStatus::Suspended; + s[index].state = OutboundState::Suspended; } else { - s.push((target, OutboundStatus::Suspended, false, 0, 0)); + s.push(OutboundChannelDetails::new(target).with_suspended_state()); } }); } fn resume_channel(target: ParaId) { >::mutate(|s| { - if let Some(index) = s.iter().position(|item| item.0 == target) { - let suspended = s[index].1 == OutboundStatus::Suspended; + if let Some(index) = s.iter().position(|item| item.recipient == target) { + let suspended = s[index].state == OutboundState::Suspended; debug_assert!( suspended, "WARNING: Attempt to resume channel that was not suspended." ); - if s[index].3 == s[index].4 { + if s[index].first_index == s[index].last_index { s.remove(index); } else { - s[index].1 = OutboundStatus::Ok; + s[index].state = OutboundState::Ok; } } else { debug_assert!(false, "WARNING: Attempt to resume channel that was not suspended."); @@ -621,11 +672,12 @@ impl XcmpMessageHandler for Pallet { } } else { // Record the fact we received it. - match status.binary_search_by_key(&sender, |item| item.0) { + match status.binary_search_by_key(&sender, |item| item.sender) { Ok(i) => { - let count = status[i].2.len(); - if count as u32 >= suspend_threshold && status[i].1 == InboundStatus::Ok { - status[i].1 = InboundStatus::Suspended; + let count = status[i].message_metadata.len(); + if count as u32 >= suspend_threshold && status[i].state == InboundState::Ok + { + status[i].state = InboundState::Suspended; let r = Self::send_signal(sender, ChannelSignal::Suspend); if r.is_err() { log::warn!( @@ -634,7 +686,7 @@ impl XcmpMessageHandler for Pallet { } } if (count as u32) < drop_threshold { - status[i].2.push((sent_at, format)); + status[i].message_metadata.push((sent_at, format)); } else { debug_assert!( false, @@ -642,7 +694,11 @@ impl XcmpMessageHandler for Pallet { ); } }, - Err(_) => status.push((sender, InboundStatus::Ok, vec![(sent_at, format)])), + Err(_) => status.push(InboundChannelDetails { + sender, + state: InboundState::Ok, + message_metadata: vec![(sent_at, format)], + }), } // Queue the payload for later execution. >::insert(sender, sent_at, data_ref); @@ -666,47 +722,53 @@ impl XcmpMessageSource for Pallet { let mut result = Vec::with_capacity(max_message_count); for status in statuses.iter_mut() { - let (para_id, outbound_status, mut signalling, mut begin, mut end) = *status; + let OutboundChannelDetails { + recipient: para_id, + state: outbound_state, + mut signals_exist, + mut first_index, + mut last_index, + } = *status; if result.len() == max_message_count { // We check this condition in the beginning of the loop so that we don't include // a message where the limit is 0. break } - if outbound_status == OutboundStatus::Suspended { + if outbound_state == OutboundState::Suspended { continue } let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) { ChannelStatus::Closed => { // This means that there is no such channel anymore. Nothing to be done but // swallow the messages and discard the status. - for i in begin..end { + for i in first_index..last_index { >::remove(para_id, i); } - if signalling { + if signals_exist { >::remove(para_id); } - *status = (para_id, OutboundStatus::Ok, false, 0, 0); + *status = OutboundChannelDetails::new(para_id); continue }, ChannelStatus::Full => continue, ChannelStatus::Ready(n, e) => (n, e), }; - let page = if signalling { + let page = if signals_exist { let page = >::get(para_id); if page.len() < max_size_now { >::remove(para_id); - signalling = false; + signals_exist = false; page } else { continue } - } else if end > begin { - let page = >::get(para_id, begin); + } else if last_index > first_index { + let page = >::get(para_id, first_index); if page.len() < max_size_now { - >::remove(para_id, begin); - begin += 1; + >::remove(para_id, first_index); + first_index += 1; page } else { continue @@ -714,9 +776,9 @@ impl XcmpMessageSource for Pallet { } else { continue }; - if begin == end { - begin = 0; - end = 0; + if first_index == last_index { + first_index = 0; + last_index = 0; } if page.len() > max_size_ever { @@ -728,7 +790,13 @@ impl XcmpMessageSource for Pallet { result.push((para_id, page)); } - *status = (para_id, outbound_status, signalling, begin, end); + *status = OutboundChannelDetails { + recipient: para_id, + state: outbound_state, + signals_exist, + first_index, + last_index, + }; } // Sort the outbound messages by ascending recipient para id to satisfy the acceptance @@ -743,7 +811,9 @@ impl XcmpMessageSource for Pallet { // // To mitigate this we shift all processed elements towards the end of the vector using // `rotate_left`. To get intuition how it works see the examples in its rustdoc. - statuses.retain(|x| x.1 == OutboundStatus::Suspended || x.2 || x.3 < x.4); + statuses.retain(|x| { + x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index + }); // old_status_len must be >= status.len() since we never add anything to status. let pruned = old_statuses_len - statuses.len();