Refactor some types in XCMP queue (#813)

* Rename InboundXcmpStatus and its related types

* Rename OutboundXcmpStatus and its relevant types

* Add documentation to the newly created types

* Ensure storage prefixes are kept the same

* Add code comments

* Revert storage name changes

* typo fix

Co-authored-by: Alexander Popiak <alexander.popiak@parity.io>
This commit is contained in:
Keith Yeung
2021-12-01 16:24:04 -08:00
committed by GitHub
parent a3798775d8
commit ad8fa5fc07
+129 -59
View File
@@ -124,11 +124,8 @@ pub mod pallet {
/// Status of the inbound XCMP channels.
#[pallet::storage]
pub(super) type InboundXcmpStatus<T: Config> = StorageValue<
_,
Vec<(ParaId, InboundStatus, Vec<(RelayBlockNumber, XcmpMessageFormat)>)>,
ValueQuery,
>;
pub(super) type InboundXcmpStatus<T: Config> =
StorageValue<_, Vec<InboundChannelDetails>, ValueQuery>;
/// Inbound aggregate XCMP messages. It can only be one per ParaId/block.
#[pallet::storage]
@@ -150,7 +147,7 @@ pub mod pallet {
/// The bool is true if there is a signal message waiting to be sent.
#[pallet::storage]
pub(super) type OutboundXcmpStatus<T: Config> =
StorageValue<_, Vec<(ParaId, OutboundStatus, bool, u16, u16)>, ValueQuery>;
StorageValue<_, Vec<OutboundChannelDetails>, ValueQuery>;
// The new way of doing it:
/// The messages outbound in a given XCMP channel.
@@ -169,17 +166,68 @@ pub mod pallet {
}
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Encode, Decode, RuntimeDebug, TypeInfo)]
pub enum InboundStatus {
pub enum InboundState {
Ok,
Suspended,
}
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
pub enum OutboundStatus {
pub enum OutboundState {
Ok,
Suspended,
}
/// Struct containing detailed information about the inbound channel.
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Encode, Decode, TypeInfo)]
pub struct InboundChannelDetails {
/// The `ParaId` of the parachain that this channel is connected with.
sender: ParaId,
/// The state of the channel.
state: InboundState,
/// The ordered metadata of each inbound message.
///
/// Contains info about the relay block number that the message was sent at, and the format
/// of the incoming message.
message_metadata: Vec<(RelayBlockNumber, XcmpMessageFormat)>,
}
/// Struct containing detailed information about the outbound channel.
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo)]
pub struct OutboundChannelDetails {
/// The `ParaId` of the parachain that this channel is connected with.
recipient: ParaId,
/// The state of the channel.
state: OutboundState,
/// Whether or not any signals exist in this channel.
signals_exist: bool,
/// The index of the first outbound message.
first_index: u16,
/// The index of the last outbound message.
last_index: u16,
}
impl OutboundChannelDetails {
pub fn new(recipient: ParaId) -> OutboundChannelDetails {
OutboundChannelDetails {
recipient,
state: OutboundState::Ok,
signals_exist: false,
first_index: 0,
last_index: 0,
}
}
pub fn with_signals(mut self) -> OutboundChannelDetails {
self.signals_exist = true;
self
}
pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
self.state = OutboundState::Suspended;
self
}
}
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
pub struct QueueConfigData {
/// The number of pages of messages which must be in the queue for the other side to be told to
@@ -242,13 +290,13 @@ impl<T: Config> Pallet<T> {
}
let mut s = <OutboundXcmpStatus<T>>::get();
let index = s.iter().position(|item| item.0 == recipient).unwrap_or_else(|| {
s.push((recipient, OutboundStatus::Ok, false, 0, 0));
let index = s.iter().position(|item| item.recipient == recipient).unwrap_or_else(|| {
s.push(OutboundChannelDetails::new(recipient));
s.len() - 1
});
let have_active = s[index].4 > s[index].3;
let have_active = s[index].last_index > s[index].first_index;
let appended = have_active &&
<OutboundXcmpMessages<T>>::mutate(recipient, s[index].4 - 1, |s| {
<OutboundXcmpMessages<T>>::mutate(recipient, s[index].last_index - 1, |s| {
if XcmpMessageFormat::decode_and_advance_with_depth_limit(
MAX_XCM_DECODE_DEPTH,
&mut &s[..],
@@ -263,15 +311,15 @@ impl<T: Config> Pallet<T> {
return true
});
if appended {
Ok((s[index].4 - s[index].3 - 1) as u32)
Ok((s[index].last_index - s[index].first_index - 1) as u32)
} else {
// Need to add a new page.
let page_index = s[index].4;
s[index].4 += 1;
let page_index = s[index].last_index;
s[index].last_index += 1;
let mut new_page = format.encode();
new_page.extend_from_slice(&data[..]);
<OutboundXcmpMessages<T>>::insert(recipient, page_index, new_page);
let r = (s[index].4 - s[index].3 - 1) as u32;
let r = (s[index].last_index - s[index].first_index - 1) as u32;
<OutboundXcmpStatus<T>>::put(s);
Ok(r)
}
@@ -281,10 +329,10 @@ impl<T: Config> Pallet<T> {
/// block.
fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), ()> {
let mut s = <OutboundXcmpStatus<T>>::get();
if let Some(index) = s.iter().position(|item| item.0 == dest) {
s[index].2 = true;
if let Some(index) = s.iter().position(|item| item.recipient == dest) {
s[index].signals_exist = true;
} else {
s.push((dest, OutboundStatus::Ok, true, 0, 0));
s.push(OutboundChannelDetails::new(dest).with_signals());
}
<SignalMessages<T>>::mutate(dest, |page| {
if page.is_empty() {
@@ -493,7 +541,7 @@ impl<T: Config> Pallet<T> {
max_weight.saturating_sub(weight_used) >= threshold_weight
{
let index = shuffled[shuffle_index];
let sender = status[index].0;
let sender = status[index].sender;
if weight_available != max_weight {
// Get incrementally closer to freeing up max_weight for message execution over the
@@ -510,34 +558,37 @@ impl<T: Config> Pallet<T> {
}
}
let weight_processed = if status[index].2.is_empty() {
let weight_processed = if status[index].message_metadata.is_empty() {
debug_assert!(false, "channel exists in status; there must be messages; qed");
0
} else {
// Process up to one block's worth for now.
let weight_remaining = weight_available.saturating_sub(weight_used);
let (weight_processed, is_empty) =
Self::process_xcmp_message(sender, status[index].2[0], weight_remaining);
let (weight_processed, is_empty) = Self::process_xcmp_message(
sender,
status[index].message_metadata[0],
weight_remaining,
);
if is_empty {
status[index].2.remove(0);
status[index].message_metadata.remove(0);
}
weight_processed
};
weight_used += weight_processed;
if status[index].2.len() as u32 <= resume_threshold &&
status[index].1 == InboundStatus::Suspended
if status[index].message_metadata.len() as u32 <= resume_threshold &&
status[index].state == InboundState::Suspended
{
// Resume
let r = Self::send_signal(sender, ChannelSignal::Resume);
debug_assert!(r.is_ok(), "WARNING: Failed sending resume into suspended channel");
status[index].1 = InboundStatus::Ok;
status[index].state = InboundState::Ok;
}
// If there are more and we're making progress, we process them after we've given the
// other channels a look in. If we've still not unlocked all weight, then we set them
// up for processing a second time anyway.
if !status[index].2.is_empty() &&
if !status[index].message_metadata.is_empty() &&
(weight_processed > 0 || weight_available != max_weight)
{
if shuffle_index + 1 == shuffled.len() {
@@ -550,7 +601,7 @@ impl<T: Config> Pallet<T> {
}
// Only retain the senders that have non-empty queues.
status.retain(|item| !item.2.is_empty());
status.retain(|item| !item.message_metadata.is_empty());
<InboundXcmpStatus<T>>::put(status);
weight_used
@@ -558,28 +609,28 @@ impl<T: Config> Pallet<T> {
fn suspend_channel(target: ParaId) {
<OutboundXcmpStatus<T>>::mutate(|s| {
if let Some(index) = s.iter().position(|item| item.0 == target) {
let ok = s[index].1 == OutboundStatus::Ok;
if let Some(index) = s.iter().position(|item| item.recipient == target) {
let ok = s[index].state == OutboundState::Ok;
debug_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
s[index].1 = OutboundStatus::Suspended;
s[index].state = OutboundState::Suspended;
} else {
s.push((target, OutboundStatus::Suspended, false, 0, 0));
s.push(OutboundChannelDetails::new(target).with_suspended_state());
}
});
}
fn resume_channel(target: ParaId) {
<OutboundXcmpStatus<T>>::mutate(|s| {
if let Some(index) = s.iter().position(|item| item.0 == target) {
let suspended = s[index].1 == OutboundStatus::Suspended;
if let Some(index) = s.iter().position(|item| item.recipient == target) {
let suspended = s[index].state == OutboundState::Suspended;
debug_assert!(
suspended,
"WARNING: Attempt to resume channel that was not suspended."
);
if s[index].3 == s[index].4 {
if s[index].first_index == s[index].last_index {
s.remove(index);
} else {
s[index].1 = OutboundStatus::Ok;
s[index].state = OutboundState::Ok;
}
} else {
debug_assert!(false, "WARNING: Attempt to resume channel that was not suspended.");
@@ -621,11 +672,12 @@ impl<T: Config> XcmpMessageHandler for Pallet<T> {
}
} else {
// Record the fact we received it.
match status.binary_search_by_key(&sender, |item| item.0) {
match status.binary_search_by_key(&sender, |item| item.sender) {
Ok(i) => {
let count = status[i].2.len();
if count as u32 >= suspend_threshold && status[i].1 == InboundStatus::Ok {
status[i].1 = InboundStatus::Suspended;
let count = status[i].message_metadata.len();
if count as u32 >= suspend_threshold && status[i].state == InboundState::Ok
{
status[i].state = InboundState::Suspended;
let r = Self::send_signal(sender, ChannelSignal::Suspend);
if r.is_err() {
log::warn!(
@@ -634,7 +686,7 @@ impl<T: Config> XcmpMessageHandler for Pallet<T> {
}
}
if (count as u32) < drop_threshold {
status[i].2.push((sent_at, format));
status[i].message_metadata.push((sent_at, format));
} else {
debug_assert!(
false,
@@ -642,7 +694,11 @@ impl<T: Config> XcmpMessageHandler for Pallet<T> {
);
}
},
Err(_) => status.push((sender, InboundStatus::Ok, vec![(sent_at, format)])),
Err(_) => status.push(InboundChannelDetails {
sender,
state: InboundState::Ok,
message_metadata: vec![(sent_at, format)],
}),
}
// Queue the payload for later execution.
<InboundXcmpMessages<T>>::insert(sender, sent_at, data_ref);
@@ -666,47 +722,53 @@ impl<T: Config> XcmpMessageSource for Pallet<T> {
let mut result = Vec::with_capacity(max_message_count);
for status in statuses.iter_mut() {
let (para_id, outbound_status, mut signalling, mut begin, mut end) = *status;
let OutboundChannelDetails {
recipient: para_id,
state: outbound_state,
mut signals_exist,
mut first_index,
mut last_index,
} = *status;
if result.len() == max_message_count {
// We check this condition in the beginning of the loop so that we don't include
// a message where the limit is 0.
break
}
if outbound_status == OutboundStatus::Suspended {
if outbound_state == OutboundState::Suspended {
continue
}
let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
ChannelStatus::Closed => {
// This means that there is no such channel anymore. Nothing to be done but
// swallow the messages and discard the status.
for i in begin..end {
for i in first_index..last_index {
<OutboundXcmpMessages<T>>::remove(para_id, i);
}
if signalling {
if signals_exist {
<SignalMessages<T>>::remove(para_id);
}
*status = (para_id, OutboundStatus::Ok, false, 0, 0);
*status = OutboundChannelDetails::new(para_id);
continue
},
ChannelStatus::Full => continue,
ChannelStatus::Ready(n, e) => (n, e),
};
let page = if signalling {
let page = if signals_exist {
let page = <SignalMessages<T>>::get(para_id);
if page.len() < max_size_now {
<SignalMessages<T>>::remove(para_id);
signalling = false;
signals_exist = false;
page
} else {
continue
}
} else if end > begin {
let page = <OutboundXcmpMessages<T>>::get(para_id, begin);
} else if last_index > first_index {
let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
if page.len() < max_size_now {
<OutboundXcmpMessages<T>>::remove(para_id, begin);
begin += 1;
<OutboundXcmpMessages<T>>::remove(para_id, first_index);
first_index += 1;
page
} else {
continue
@@ -714,9 +776,9 @@ impl<T: Config> XcmpMessageSource for Pallet<T> {
} else {
continue
};
if begin == end {
begin = 0;
end = 0;
if first_index == last_index {
first_index = 0;
last_index = 0;
}
if page.len() > max_size_ever {
@@ -728,7 +790,13 @@ impl<T: Config> XcmpMessageSource for Pallet<T> {
result.push((para_id, page));
}
*status = (para_id, outbound_status, signalling, begin, end);
*status = OutboundChannelDetails {
recipient: para_id,
state: outbound_state,
signals_exist,
first_index,
last_index,
};
}
// Sort the outbound messages by ascending recipient para id to satisfy the acceptance
@@ -743,7 +811,9 @@ impl<T: Config> XcmpMessageSource for Pallet<T> {
//
// To mitigate this we shift all processed elements towards the end of the vector using
// `rotate_left`. To get intuition how it works see the examples in its rustdoc.
statuses.retain(|x| x.1 == OutboundStatus::Suspended || x.2 || x.3 < x.4);
statuses.retain(|x| {
x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
});
// old_status_len must be >= status.len() since we never add anything to status.
let pruned = old_statuses_len - statuses.len();