Better scoped fallback in read_messaging_state_snapshot() (#2852)

* moved changes to master

* spelling

* one last name change
This commit is contained in:
Bradley Olson
2023-07-11 04:30:47 -07:00
committed by GitHub
parent 7074281392
commit cc5e0ae9ab
4 changed files with 25 additions and 16 deletions
@@ -414,11 +414,11 @@ impl ReconnectingWebsocketWorker {
let urls = std::mem::take(&mut self.ws_urls); let urls = std::mem::take(&mut self.ws_urls);
let Ok(mut client_manager) = ClientManager::new(urls).await else { let Ok(mut client_manager) = ClientManager::new(urls).await else {
tracing::error!(target: LOG_TARGET, "No valid RPC url found. Stopping RPC worker."); tracing::error!(target: LOG_TARGET, "No valid RPC url found. Stopping RPC worker.");
return; return
}; };
let Ok(mut subscriptions) = client_manager.get_subscriptions().await else { let Ok(mut subscriptions) = client_manager.get_subscriptions().await else {
tracing::error!(target: LOG_TARGET, "Unable to fetch subscriptions on initial connection."); tracing::error!(target: LOG_TARGET, "Unable to fetch subscriptions on initial connection.");
return; return
}; };
let mut imported_blocks_cache = let mut imported_blocks_cache =
+2 -2
View File
@@ -229,7 +229,7 @@ pub mod pallet {
}; };
<PendingUpwardMessages<T>>::mutate(|up| { <PendingUpwardMessages<T>>::mutate(|up| {
let queue_size = relevant_messaging_state.relay_dispatch_queue_size; let queue_size = relevant_messaging_state.relay_dispatch_queue_remaining_capacity;
let available_capacity = cmp::min( let available_capacity = cmp::min(
queue_size.remaining_count, queue_size.remaining_count,
@@ -1052,7 +1052,7 @@ impl<T: Config> Pallet<T> {
pub fn open_outbound_hrmp_channel_for_benchmarks(target_parachain: ParaId) { pub fn open_outbound_hrmp_channel_for_benchmarks(target_parachain: ParaId) {
RelevantMessagingState::<T>::put(MessagingStateSnapshot { RelevantMessagingState::<T>::put(MessagingStateSnapshot {
dmq_mqc_head: Default::default(), dmq_mqc_head: Default::default(),
relay_dispatch_queue_size: Default::default(), relay_dispatch_queue_remaining_capacity: Default::default(),
ingress_channels: Default::default(), ingress_channels: Default::default(),
egress_channels: vec![( egress_channels: vec![(
target_parachain, target_parachain,
@@ -28,7 +28,7 @@ use sp_trie::{HashDBT, MemoryDB, StorageProof, EMPTY_PREFIX};
// The field order should stay the same as the data can be found in the proof to ensure both are // The field order should stay the same as the data can be found in the proof to ensure both are
// have the same encoded representation. // have the same encoded representation.
#[derive(Clone, Encode, Decode, TypeInfo, Default)] #[derive(Clone, Encode, Decode, TypeInfo, Default)]
pub struct RelayDispachQueueSize { pub struct RelayDispatchQueueRemainingCapacity {
/// The number of additional messages that can be enqueued. /// The number of additional messages that can be enqueued.
pub remaining_count: u32, pub remaining_count: u32,
/// The total size of additional messages that can be enqueued. /// The total size of additional messages that can be enqueued.
@@ -48,7 +48,7 @@ pub struct MessagingStateSnapshot {
pub dmq_mqc_head: relay_chain::Hash, pub dmq_mqc_head: relay_chain::Hash,
/// The current capacity of the upward message queue of the current parachain on the relay chain. /// The current capacity of the upward message queue of the current parachain on the relay chain.
pub relay_dispatch_queue_size: RelayDispachQueueSize, pub relay_dispatch_queue_remaining_capacity: RelayDispatchQueueRemainingCapacity,
/// Information about all the inbound HRMP channels. /// Information about all the inbound HRMP channels.
/// ///
@@ -86,7 +86,7 @@ pub enum Error {
/// The DMQ MQC head cannot be extracted. /// The DMQ MQC head cannot be extracted.
DmqMqcHead(ReadEntryErr), DmqMqcHead(ReadEntryErr),
/// Relay dispatch queue cannot be extracted. /// Relay dispatch queue cannot be extracted.
RelayDispatchQueueSize(ReadEntryErr), RelayDispatchQueueRemainingCapacity(ReadEntryErr),
/// The hrmp inress channel index cannot be extracted. /// The hrmp inress channel index cannot be extracted.
HrmpIngressChannelIndex(ReadEntryErr), HrmpIngressChannelIndex(ReadEntryErr),
/// The hrmp egress channel index cannot be extracted. /// The hrmp egress channel index cannot be extracted.
@@ -183,7 +183,10 @@ impl RelayChainStateProof {
) )
.map_err(Error::DmqMqcHead)?; .map_err(Error::DmqMqcHead)?;
let relay_dispatch_queue_size = read_optional_entry::<RelayDispachQueueSize, _>( let relay_dispatch_queue_remaining_capacity = read_optional_entry::<
RelayDispatchQueueRemainingCapacity,
_,
>(
&self.trie_backend, &self.trie_backend,
&relay_chain::well_known_keys::relay_dispatch_queue_remaining_capacity(self.para_id) &relay_chain::well_known_keys::relay_dispatch_queue_remaining_capacity(self.para_id)
.key, .key,
@@ -195,22 +198,26 @@ impl RelayChainStateProof {
// this code here needs to be removed and above needs to be changed to `read_entry` that // this code here needs to be removed and above needs to be changed to `read_entry` that
// returns an error if `relay_dispatch_queue_remaining_capacity` can not be found/decoded. // returns an error if `relay_dispatch_queue_remaining_capacity` can not be found/decoded.
// //
// For now we just fallback to the old dispatch queue size if there is an error. // For now we just fallback to the old dispatch queue size on `ReadEntryErr::Absent`.
let relay_dispatch_queue_size = match relay_dispatch_queue_size { // `ReadEntryErr::Decode` and `ReadEntryErr::Proof` are potentially subject to meddling
// by malicious collators, so we reject the block in those cases.
let relay_dispatch_queue_remaining_capacity = match relay_dispatch_queue_remaining_capacity
{
Ok(Some(r)) => r, Ok(Some(r)) => r,
_ => { Ok(None) => {
let res = read_entry::<(u32, u32), _>( let res = read_entry::<(u32, u32), _>(
&self.trie_backend, &self.trie_backend,
#[allow(deprecated)] #[allow(deprecated)]
&relay_chain::well_known_keys::relay_dispatch_queue_size(self.para_id), &relay_chain::well_known_keys::relay_dispatch_queue_size(self.para_id),
Some((0, 0)), Some((0, 0)),
) )
.map_err(Error::RelayDispatchQueueSize)?; .map_err(Error::RelayDispatchQueueRemainingCapacity)?;
let remaining_count = host_config.max_upward_queue_count.saturating_sub(res.0); let remaining_count = host_config.max_upward_queue_count.saturating_sub(res.0);
let remaining_size = host_config.max_upward_queue_size.saturating_sub(res.1); let remaining_size = host_config.max_upward_queue_size.saturating_sub(res.1);
RelayDispachQueueSize { remaining_count, remaining_size } RelayDispatchQueueRemainingCapacity { remaining_count, remaining_size }
}, },
Err(e) => return Err(Error::RelayDispatchQueueRemainingCapacity(e)),
}; };
let ingress_channel_index: Vec<ParaId> = read_entry( let ingress_channel_index: Vec<ParaId> = read_entry(
@@ -255,7 +262,7 @@ impl RelayChainStateProof {
// by relying on the fact that `ingress_channel_index` and `egress_channel_index` are themselves sorted. // by relying on the fact that `ingress_channel_index` and `egress_channel_index` are themselves sorted.
Ok(MessagingStateSnapshot { Ok(MessagingStateSnapshot {
dmq_mqc_head, dmq_mqc_head,
relay_dispatch_queue_size, relay_dispatch_queue_remaining_capacity,
ingress_channels, ingress_channels,
egress_channels, egress_channels,
}) })
+4 -2
View File
@@ -124,13 +124,15 @@ impl RelayStateSproofBuilder {
dmq_mqc_head.encode(), dmq_mqc_head.encode(),
); );
} }
if let Some(relay_dispatch_queue_size) = self.relay_dispatch_queue_remaining_capacity { if let Some(relay_dispatch_queue_remaining_capacity) =
self.relay_dispatch_queue_remaining_capacity
{
insert( insert(
relay_chain::well_known_keys::relay_dispatch_queue_remaining_capacity( relay_chain::well_known_keys::relay_dispatch_queue_remaining_capacity(
self.para_id, self.para_id,
) )
.key, .key,
relay_dispatch_queue_size.encode(), relay_dispatch_queue_remaining_capacity.encode(),
); );
} }
if let Some(upgrade_go_ahead) = self.upgrade_go_ahead { if let Some(upgrade_go_ahead) = self.upgrade_go_ahead {