Don't drop UMP queue items if weight exhausted (#3784)

* Requeue UMP queue items if weight exhausted

* Reduce complexity and remove Deque

* Formatting

* Formatting

* Avoid needless storage writes

* Test

* Formatting

* Docs and cleanup

* fmt

* Remove now irrelevant comment.

* Simplify `take_processed` by using `mem::take`

* Clean up & fmt: use `upward_message` directly.

* Grumbles

Co-authored-by: Shawn Tabrizi <shawntabrizi@gmail.com>
Co-authored-by: Sergei Shulepov <sergei@parity.io>
This commit is contained in:
Gavin Wood
2021-09-08 19:57:26 +02:00
committed by GitHub
parent a14b667723
commit c5e25877f1
2 changed files with 158 additions and 240 deletions
+42 -4
View File
@@ -18,12 +18,15 @@
use crate::{ use crate::{
configuration, disputes, dmp, hrmp, inclusion, initializer, paras, paras_inherent, scheduler, configuration, disputes, dmp, hrmp, inclusion, initializer, paras, paras_inherent, scheduler,
session_info, shared, ump, session_info, shared,
ump::{self, MessageId, UmpSink},
ParaId,
}; };
use frame_support::{parameter_types, traits::GenesisBuild}; use frame_support::{parameter_types, traits::GenesisBuild, weights::Weight};
use frame_support_test::TestRandomness; use frame_support_test::TestRandomness;
use parity_scale_codec::Decode;
use primitives::v1::{ use primitives::v1::{
AuthorityDiscoveryId, Balance, BlockNumber, Header, SessionIndex, ValidatorIndex, AuthorityDiscoveryId, Balance, BlockNumber, Header, SessionIndex, UpwardMessage, ValidatorIndex,
}; };
use sp_core::H256; use sp_core::H256;
use sp_io::TestExternalities; use sp_io::TestExternalities;
@@ -128,7 +131,7 @@ parameter_types! {
impl crate::ump::Config for Test { impl crate::ump::Config for Test {
type Event = Event; type Event = Event;
type UmpSink = crate::ump::mock_sink::MockUmpSink; type UmpSink = TestUmpSink;
type FirstMessageFactorPercent = FirstMessageFactorPercent; type FirstMessageFactorPercent = FirstMessageFactorPercent;
} }
@@ -232,6 +235,41 @@ pub fn availability_rewards() -> HashMap<ValidatorIndex, usize> {
AVAILABILITY_REWARDS.with(|r| r.borrow().clone()) AVAILABILITY_REWARDS.with(|r| r.borrow().clone())
} }
std::thread_local! {
static PROCESSED: RefCell<Vec<(ParaId, UpwardMessage)>> = RefCell::new(vec![]);
}
/// Return which messages have been processed by `pocess_upward_message` and clear the buffer.
pub fn take_processed() -> Vec<(ParaId, UpwardMessage)> {
PROCESSED.with(|opt_hook| std::mem::take(&mut *opt_hook.borrow_mut()))
}
/// An implementation of a UMP sink that just records which messages were processed.
///
/// A message's weight is defined by the first 4 bytes of its data, which we decode into a
/// `u32`.
pub struct TestUmpSink;
impl UmpSink for TestUmpSink {
fn process_upward_message(
actual_origin: ParaId,
actual_msg: &[u8],
max_weight: Weight,
) -> Result<Weight, (MessageId, Weight)> {
let weight = match u32::decode(&mut &actual_msg[..]) {
Ok(w) => w as Weight,
Err(_) => return Ok(0), // same as the real `UmpSink`
};
if weight > max_weight {
let id = sp_io::hashing::blake2_256(actual_msg);
return Err((id, weight))
}
PROCESSED.with(|opt_hook| {
opt_hook.borrow_mut().push((actual_origin, actual_msg.to_owned()));
});
Ok(weight)
}
}
pub struct TestRewardValidators; pub struct TestRewardValidators;
impl inclusion::RewardValidators for TestRewardValidators { impl inclusion::RewardValidators for TestRewardValidators {
+110 -230
View File
@@ -21,11 +21,7 @@ use crate::{
use frame_support::pallet_prelude::*; use frame_support::pallet_prelude::*;
use primitives::v1::{Id as ParaId, UpwardMessage}; use primitives::v1::{Id as ParaId, UpwardMessage};
use sp_std::{ use sp_std::{
collections::{btree_map::BTreeMap, vec_deque::VecDeque}, collections::btree_map::BTreeMap, convert::TryFrom, fmt, marker::PhantomData, prelude::*,
convert::TryFrom,
fmt,
marker::PhantomData,
prelude::*,
}; };
use xcm::latest::Outcome; use xcm::latest::Outcome;
@@ -211,7 +207,7 @@ pub mod pallet {
/// The messages are processed in FIFO order. /// The messages are processed in FIFO order.
#[pallet::storage] #[pallet::storage]
pub type RelayDispatchQueues<T: Config> = pub type RelayDispatchQueues<T: Config> =
StorageMap<_, Twox64Concat, ParaId, VecDeque<UpwardMessage>, ValueQuery>; StorageMap<_, Twox64Concat, ParaId, Vec<UpwardMessage>, ValueQuery>;
/// Size of the dispatch queues. Caches sizes of the queues in `RelayDispatchQueue`. /// Size of the dispatch queues. Caches sizes of the queues in `RelayDispatchQueue`.
/// ///
@@ -407,23 +403,27 @@ impl<T: Config> Pallet<T> {
config.ump_service_total_weight - weight_used config.ump_service_total_weight - weight_used
}; };
// dequeue the next message from the queue of the dispatchee // attempt to process the next message from the queue of the dispatchee; if not beyond
let (upward_message, became_empty) = queue_cache.dequeue::<T>(dispatchee); // our remaining weight limit, then consume it.
if let Some(upward_message) = upward_message { let maybe_next = queue_cache.peek_front::<T>(dispatchee);
match T::UmpSink::process_upward_message( let became_empty = if let Some(upward_message) = maybe_next {
dispatchee, match T::UmpSink::process_upward_message(dispatchee, upward_message, max_weight) {
&upward_message[..], Ok(used) => {
max_weight, weight_used += used;
) { queue_cache.consume_front::<T>(dispatchee)
Ok(used) => weight_used += used, },
Err((id, required)) => { Err((id, required)) => {
// we process messages in order and don't drop them if we run out of weight, so need to break // we process messages in order and don't drop them if we run out of weight,
// here. // so need to break here without calling `consume_front`.
Self::deposit_event(Event::WeightExhausted(id, max_weight, required)); Self::deposit_event(Event::WeightExhausted(id, max_weight, required));
break break
}, },
} }
} } else {
// this should never happen, since the cursor should never point to an empty queue.
// it is resolved harmlessly here anyway.
true
};
if became_empty { if became_empty {
// the queue is empty now - this para doesn't need attention anymore. // the queue is empty now - this para doesn't need attention anymore.
@@ -442,8 +442,8 @@ impl<T: Config> Pallet<T> {
/// To avoid constant fetching, deserializing and serialization the queues are cached. /// To avoid constant fetching, deserializing and serialization the queues are cached.
/// ///
/// After an item dequeued from a queue for the first time, the queue is stored in this struct rather /// After an item dequeued from a queue for the first time, the queue is stored in this struct
/// than being serialized and persisted. /// rather than being serialized and persisted.
/// ///
/// This implementation works best when: /// This implementation works best when:
/// ///
@@ -461,9 +461,10 @@ impl<T: Config> Pallet<T> {
struct QueueCache(BTreeMap<ParaId, QueueCacheEntry>); struct QueueCache(BTreeMap<ParaId, QueueCacheEntry>);
struct QueueCacheEntry { struct QueueCacheEntry {
queue: VecDeque<UpwardMessage>, queue: Vec<UpwardMessage>,
count: u32,
total_size: u32, total_size: u32,
consumed_count: usize,
consumed_size: usize,
} }
impl QueueCache { impl QueueCache {
@@ -471,26 +472,35 @@ impl QueueCache {
Self(BTreeMap::new()) Self(BTreeMap::new())
} }
/// Dequeues one item from the upward message queue of the given para. fn ensure_cached<T: Config>(&mut self, para: ParaId) -> &mut QueueCacheEntry {
/// self.0.entry(para).or_insert_with(|| {
/// Returns `(upward_message, became_empty)`, where let queue = RelayDispatchQueues::<T>::get(&para);
/// let (_, total_size) = RelayDispatchQueueSize::<T>::get(&para);
/// - `upward_message` a dequeued message or `None` if the queue _was_ empty. QueueCacheEntry { queue, total_size, consumed_count: 0, consumed_size: 0 }
/// - `became_empty` is true if the queue _became_ empty. })
fn dequeue<T: Config>(&mut self, para: ParaId) -> (Option<UpwardMessage>, bool) {
let cache_entry = self.0.entry(para).or_insert_with(|| {
let queue = <Pallet<T> as Store>::RelayDispatchQueues::get(&para);
let (count, total_size) = <Pallet<T> as Store>::RelayDispatchQueueSize::get(&para);
QueueCacheEntry { queue, count, total_size }
});
let upward_message = cache_entry.queue.pop_front();
if let Some(ref msg) = upward_message {
cache_entry.count -= 1;
cache_entry.total_size -= msg.len() as u32;
} }
let became_empty = cache_entry.queue.is_empty(); /// Returns the message at the front of `para`'s queue, or `None` if the queue is empty.
(upward_message, became_empty) ///
/// Does not mutate the queue.
fn peek_front<T: Config>(&mut self, para: ParaId) -> Option<&UpwardMessage> {
let entry = self.ensure_cached::<T>(para);
entry.queue.get(entry.consumed_count)
}
/// Attempts to remove one message from the front of `para`'s queue. If the queue is empty, then
/// does nothing.
///
/// Returns `true` iff there are no more messages in the queue after the removal attempt.
fn consume_front<T: Config>(&mut self, para: ParaId) -> bool {
let cache_entry = self.ensure_cached::<T>(para);
let upward_message = cache_entry.queue.get(cache_entry.consumed_count);
if let Some(msg) = upward_message {
cache_entry.consumed_count += 1;
cache_entry.consumed_size += msg.len();
}
cache_entry.consumed_count >= cache_entry.queue.len()
} }
/// Flushes the updated queues into the storage. /// Flushes the updated queues into the storage.
@@ -498,14 +508,16 @@ impl QueueCache {
// NOTE we use an explicit method here instead of Drop impl because it has unwanted semantics // NOTE we use an explicit method here instead of Drop impl because it has unwanted semantics
// within runtime. It is dangerous to use because of double-panics and flushing on a panic // within runtime. It is dangerous to use because of double-panics and flushing on a panic
// is not necessary as well. // is not necessary as well.
for (para, QueueCacheEntry { queue, count, total_size }) in self.0 { for (para, entry) in self.0 {
if queue.is_empty() { if entry.consumed_count >= entry.queue.len() {
// remove the entries altogether. // remove the entries altogether.
<Pallet<T> as Store>::RelayDispatchQueues::remove(&para); RelayDispatchQueues::<T>::remove(&para);
<Pallet<T> as Store>::RelayDispatchQueueSize::remove(&para); RelayDispatchQueueSize::<T>::remove(&para);
} else { } else if entry.consumed_count > 0 {
<Pallet<T> as Store>::RelayDispatchQueues::insert(&para, queue); RelayDispatchQueues::<T>::insert(&para, &entry.queue[entry.consumed_count..]);
<Pallet<T> as Store>::RelayDispatchQueueSize::insert(&para, (count, total_size)); let count = (entry.queue.len() - entry.consumed_count) as u32;
let size = entry.total_size.saturating_sub(entry.consumed_size as u32);
RelayDispatchQueueSize::<T>::insert(&para, (count, size));
} }
} }
} }
@@ -586,137 +598,10 @@ impl NeedsDispatchCursor {
} }
#[cfg(test)] #[cfg(test)]
pub(crate) mod mock_sink { pub(crate) mod tests {
//! An implementation of a mock UMP sink that allows attaching a probe for mocking the weights use super::*;
//! and checking the sent messages. use crate::mock::{new_test_ext, take_processed, Configuration, MockGenesisConfig, Ump};
//!
//! A default behavior of the UMP sink is to ignore an incoming message and return 0 weight.
//!
//! A probe can be attached to the mock UMP sink. When attached, the mock sink would consult the
//! probe to check whether the received message was expected and what weight it should return.
//!
//! There are two rules on how to use a probe:
//!
//! 1. There can be only one active probe at a time. Creation of another probe while there is
//! already an active one leads to a panic. The probe is scoped to a thread where it was created.
//!
//! 2. All messages expected by the probe must be received by the time of dropping it. Unreceived
//! messages will lead to a panic while dropping a probe.
use super::{MessageId, ParaId, UmpSink, UpwardMessage};
use frame_support::weights::Weight; use frame_support::weights::Weight;
use std::{cell::RefCell, collections::vec_deque::VecDeque};
#[derive(Debug)]
struct UmpExpectation {
expected_origin: ParaId,
expected_msg: UpwardMessage,
mock_weight: Weight,
}
std::thread_local! {
// `Some` here indicates that there is an active probe.
static HOOK: RefCell<Option<VecDeque<UmpExpectation>>> = RefCell::new(None);
}
pub struct MockUmpSink;
impl UmpSink for MockUmpSink {
fn process_upward_message(
actual_origin: ParaId,
actual_msg: &[u8],
_max_weight: Weight,
) -> Result<Weight, (MessageId, Weight)> {
Ok(HOOK
.with(|opt_hook| {
opt_hook.borrow_mut().as_mut().map(|hook| {
let UmpExpectation { expected_origin, expected_msg, mock_weight } =
match hook.pop_front() {
Some(expectation) => expectation,
None => {
panic!(
"The probe is active but didn't expect the message:\n\n\t{:?}.",
actual_msg,
);
},
};
assert_eq!(expected_origin, actual_origin);
assert_eq!(expected_msg, &actual_msg[..]);
mock_weight
})
})
.unwrap_or(0))
}
}
pub struct Probe {
_private: (),
}
impl Probe {
pub fn new() -> Self {
HOOK.with(|opt_hook| {
let prev = opt_hook.borrow_mut().replace(VecDeque::default());
// that can trigger if there were two probes were created during one session which
// is may be a bit strict, but may save time figuring out what's wrong.
// if you land here and you do need the two probes in one session consider
// dropping the the existing probe explicitly.
assert!(prev.is_none());
});
Self { _private: () }
}
/// Add an expected message.
///
/// The enqueued messages are processed in FIFO order.
pub fn assert_msg(
&mut self,
expected_origin: ParaId,
expected_msg: UpwardMessage,
mock_weight: Weight,
) {
HOOK.with(|opt_hook| {
opt_hook.borrow_mut().as_mut().unwrap().push_back(UmpExpectation {
expected_origin,
expected_msg,
mock_weight,
})
});
}
}
impl Drop for Probe {
fn drop(&mut self) {
let _ = HOOK.try_with(|opt_hook| {
let prev = opt_hook.borrow_mut().take().expect(
"this probe was created and hasn't been yet destroyed;
the probe cannot be replaced;
there is only one probe at a time allowed;
thus it cannot be `None`;
qed",
);
if !prev.is_empty() {
// some messages are left unchecked. We should notify the developer about this.
// however, we do so only if the thread doesn't panic already. Otherwise, the
// developer would get a SIGILL or SIGABRT without a meaningful error message.
if !std::thread::panicking() {
panic!(
"the probe is dropped and not all expected messages arrived: {:?}",
prev
);
}
}
});
// an `Err` here signals here that the thread local was already destroyed.
}
}
}
#[cfg(test)]
mod tests {
use super::{mock_sink::Probe, *};
use crate::mock::{new_test_ext, Configuration, MockGenesisConfig, Ump};
use std::collections::HashSet; use std::collections::HashSet;
struct GenesisConfigBuilder { struct GenesisConfigBuilder {
@@ -826,15 +711,12 @@ mod tests {
#[test] #[test]
fn dispatch_single_message() { fn dispatch_single_message() {
let a = ParaId::from(228); let a = ParaId::from(228);
let msg = vec![1, 2, 3]; let msg = 1000u32.encode();
new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| { new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
let mut probe = Probe::new(); queue_upward_msg(a, msg.clone());
probe.assert_msg(a, msg.clone(), 0);
queue_upward_msg(a, msg);
Ump::process_pending_upward_messages(); Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![(a, msg)]);
assert_storage_consistency_exhaustive(); assert_storage_consistency_exhaustive();
}); });
@@ -846,11 +728,11 @@ mod tests {
let c = ParaId::from(228); let c = ParaId::from(228);
let q = ParaId::from(911); let q = ParaId::from(911);
let a_msg_1 = vec![1, 2, 3]; let a_msg_1 = (200u32, "a_msg_1").encode();
let a_msg_2 = vec![3, 2, 1]; let a_msg_2 = (100u32, "a_msg_2").encode();
let c_msg_1 = vec![4, 5, 6]; let c_msg_1 = (300u32, "c_msg_1").encode();
let c_msg_2 = vec![9, 8, 7]; let c_msg_2 = (100u32, "c_msg_2").encode();
let q_msg = b"we are Q".to_vec(); let q_msg = (500u32, "q_msg").encode();
new_test_ext( new_test_ext(
GenesisConfigBuilder { ump_service_total_weight: 500, ..Default::default() }.build(), GenesisConfigBuilder { ump_service_total_weight: 500, ..Default::default() }.build(),
@@ -864,52 +746,60 @@ mod tests {
assert_storage_consistency_exhaustive(); assert_storage_consistency_exhaustive();
// we expect only two first messages to fit in the first iteration. // we expect only two first messages to fit in the first iteration.
{
let mut probe = Probe::new();
probe.assert_msg(a, a_msg_1.clone(), 300);
probe.assert_msg(c, c_msg_1.clone(), 300);
Ump::process_pending_upward_messages(); Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![(a, a_msg_1), (c, c_msg_1)]);
assert_storage_consistency_exhaustive(); assert_storage_consistency_exhaustive();
drop(probe);
}
queue_upward_msg(c, c_msg_2.clone()); queue_upward_msg(c, c_msg_2.clone());
assert_storage_consistency_exhaustive(); assert_storage_consistency_exhaustive();
// second iteration should process the second message. // second iteration should process the second message.
{
let mut probe = Probe::new();
probe.assert_msg(q, q_msg.clone(), 500);
Ump::process_pending_upward_messages(); Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![(q, q_msg)]);
assert_storage_consistency_exhaustive(); assert_storage_consistency_exhaustive();
drop(probe);
}
// 3rd iteration. // 3rd iteration.
{
let mut probe = Probe::new();
probe.assert_msg(a, a_msg_2.clone(), 100);
probe.assert_msg(c, c_msg_2.clone(), 100);
Ump::process_pending_upward_messages(); Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![(a, a_msg_2), (c, c_msg_2)]);
assert_storage_consistency_exhaustive(); assert_storage_consistency_exhaustive();
drop(probe);
}
// finally, make sure that the queue is empty. // finally, make sure that the queue is empty.
{
let probe = Probe::new();
Ump::process_pending_upward_messages(); Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![]);
assert_storage_consistency_exhaustive();
});
}
#[test]
fn dispatch_keeps_message_after_weight_exhausted() {
let a = ParaId::from(128);
let a_msg_1 = (300u32, "a_msg_1").encode();
let a_msg_2 = (300u32, "a_msg_2").encode();
new_test_ext(
GenesisConfigBuilder { ump_service_total_weight: 500, ..Default::default() }.build(),
)
.execute_with(|| {
queue_upward_msg(a, a_msg_1.clone());
queue_upward_msg(a, a_msg_2.clone());
assert_storage_consistency_exhaustive(); assert_storage_consistency_exhaustive();
drop(probe); // we expect only one message to fit in the first iteration.
} Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![(a, a_msg_1)]);
assert_storage_consistency_exhaustive();
// second iteration should process the remaining message.
Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![(a, a_msg_2)]);
assert_storage_consistency_exhaustive();
// finally, make sure that the queue is empty.
Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![]);
assert_storage_consistency_exhaustive();
}); });
} }
@@ -918,9 +808,9 @@ mod tests {
let a = ParaId::from(1991); let a = ParaId::from(1991);
let b = ParaId::from(1999); let b = ParaId::from(1999);
let a_msg_1 = vec![1, 2, 3]; let a_msg_1 = (300u32, "a_msg_1").encode();
let a_msg_2 = vec![3, 2, 1]; let a_msg_2 = (300u32, "a_msg_2").encode();
let b_msg_1 = vec![4, 5, 6]; let b_msg_1 = (300u32, "b_msg_1").encode();
new_test_ext( new_test_ext(
GenesisConfigBuilder { ump_service_total_weight: 900, ..Default::default() }.build(), GenesisConfigBuilder { ump_service_total_weight: 900, ..Default::default() }.build(),
@@ -935,18 +825,8 @@ mod tests {
queue_upward_msg(a, a_msg_1.clone()); queue_upward_msg(a, a_msg_1.clone());
queue_upward_msg(a, a_msg_2.clone()); queue_upward_msg(a, a_msg_2.clone());
queue_upward_msg(b, b_msg_1.clone()); queue_upward_msg(b, b_msg_1.clone());
{
let mut probe = Probe::new();
probe.assert_msg(a, a_msg_1.clone(), 300);
probe.assert_msg(b, b_msg_1.clone(), 300);
probe.assert_msg(a, a_msg_2.clone(), 300);
Ump::process_pending_upward_messages(); Ump::process_pending_upward_messages();
assert_eq!(take_processed(), vec![(a, a_msg_1), (b, b_msg_1), (a, a_msg_2)]);
drop(probe);
}
}); });
} }