diff --git a/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs b/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs index 803a27d00f..5b5babed0d 100644 --- a/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs +++ b/cumulus/client/relay-chain-rpc-interface/src/reconnecting_ws_client.rs @@ -414,11 +414,11 @@ impl ReconnectingWebsocketWorker { let urls = std::mem::take(&mut self.ws_urls); let Ok(mut client_manager) = ClientManager::new(urls).await else { tracing::error!(target: LOG_TARGET, "No valid RPC url found. Stopping RPC worker."); - return; + return }; let Ok(mut subscriptions) = client_manager.get_subscriptions().await else { tracing::error!(target: LOG_TARGET, "Unable to fetch subscriptions on initial connection."); - return; + return }; let mut imported_blocks_cache = diff --git a/cumulus/pallets/parachain-system/src/lib.rs b/cumulus/pallets/parachain-system/src/lib.rs index cda0ffe6b3..2173c5999d 100644 --- a/cumulus/pallets/parachain-system/src/lib.rs +++ b/cumulus/pallets/parachain-system/src/lib.rs @@ -229,7 +229,7 @@ pub mod pallet { }; >::mutate(|up| { - let queue_size = relevant_messaging_state.relay_dispatch_queue_size; + let queue_size = relevant_messaging_state.relay_dispatch_queue_remaining_capacity; let available_capacity = cmp::min( queue_size.remaining_count, @@ -1052,7 +1052,7 @@ impl Pallet { pub fn open_outbound_hrmp_channel_for_benchmarks(target_parachain: ParaId) { RelevantMessagingState::::put(MessagingStateSnapshot { dmq_mqc_head: Default::default(), - relay_dispatch_queue_size: Default::default(), + relay_dispatch_queue_remaining_capacity: Default::default(), ingress_channels: Default::default(), egress_channels: vec![( target_parachain, diff --git a/cumulus/pallets/parachain-system/src/relay_state_snapshot.rs b/cumulus/pallets/parachain-system/src/relay_state_snapshot.rs index e3763549d4..306252ca77 100644 --- a/cumulus/pallets/parachain-system/src/relay_state_snapshot.rs +++ b/cumulus/pallets/parachain-system/src/relay_state_snapshot.rs @@ -28,7 +28,7 @@ use sp_trie::{HashDBT, MemoryDB, StorageProof, EMPTY_PREFIX}; // The field order should stay the same as the data can be found in the proof to ensure both are // have the same encoded representation. #[derive(Clone, Encode, Decode, TypeInfo, Default)] -pub struct RelayDispachQueueSize { +pub struct RelayDispatchQueueRemainingCapacity { /// The number of additional messages that can be enqueued. pub remaining_count: u32, /// The total size of additional messages that can be enqueued. @@ -48,7 +48,7 @@ pub struct MessagingStateSnapshot { pub dmq_mqc_head: relay_chain::Hash, /// The current capacity of the upward message queue of the current parachain on the relay chain. - pub relay_dispatch_queue_size: RelayDispachQueueSize, + pub relay_dispatch_queue_remaining_capacity: RelayDispatchQueueRemainingCapacity, /// Information about all the inbound HRMP channels. /// @@ -86,7 +86,7 @@ pub enum Error { /// The DMQ MQC head cannot be extracted. DmqMqcHead(ReadEntryErr), /// Relay dispatch queue cannot be extracted. - RelayDispatchQueueSize(ReadEntryErr), + RelayDispatchQueueRemainingCapacity(ReadEntryErr), /// The hrmp inress channel index cannot be extracted. HrmpIngressChannelIndex(ReadEntryErr), /// The hrmp egress channel index cannot be extracted. @@ -183,7 +183,10 @@ impl RelayChainStateProof { ) .map_err(Error::DmqMqcHead)?; - let relay_dispatch_queue_size = read_optional_entry::( + let relay_dispatch_queue_remaining_capacity = read_optional_entry::< + RelayDispatchQueueRemainingCapacity, + _, + >( &self.trie_backend, &relay_chain::well_known_keys::relay_dispatch_queue_remaining_capacity(self.para_id) .key, @@ -195,22 +198,26 @@ impl RelayChainStateProof { // this code here needs to be removed and above needs to be changed to `read_entry` that // returns an error if `relay_dispatch_queue_remaining_capacity` can not be found/decoded. // - // For now we just fallback to the old dispatch queue size if there is an error. - let relay_dispatch_queue_size = match relay_dispatch_queue_size { + // For now we just fallback to the old dispatch queue size on `ReadEntryErr::Absent`. + // `ReadEntryErr::Decode` and `ReadEntryErr::Proof` are potentially subject to meddling + // by malicious collators, so we reject the block in those cases. + let relay_dispatch_queue_remaining_capacity = match relay_dispatch_queue_remaining_capacity + { Ok(Some(r)) => r, - _ => { + Ok(None) => { let res = read_entry::<(u32, u32), _>( &self.trie_backend, #[allow(deprecated)] &relay_chain::well_known_keys::relay_dispatch_queue_size(self.para_id), Some((0, 0)), ) - .map_err(Error::RelayDispatchQueueSize)?; + .map_err(Error::RelayDispatchQueueRemainingCapacity)?; let remaining_count = host_config.max_upward_queue_count.saturating_sub(res.0); let remaining_size = host_config.max_upward_queue_size.saturating_sub(res.1); - RelayDispachQueueSize { remaining_count, remaining_size } + RelayDispatchQueueRemainingCapacity { remaining_count, remaining_size } }, + Err(e) => return Err(Error::RelayDispatchQueueRemainingCapacity(e)), }; let ingress_channel_index: Vec = read_entry( @@ -255,7 +262,7 @@ impl RelayChainStateProof { // by relying on the fact that `ingress_channel_index` and `egress_channel_index` are themselves sorted. Ok(MessagingStateSnapshot { dmq_mqc_head, - relay_dispatch_queue_size, + relay_dispatch_queue_remaining_capacity, ingress_channels, egress_channels, }) diff --git a/cumulus/test/relay-sproof-builder/src/lib.rs b/cumulus/test/relay-sproof-builder/src/lib.rs index fd98899ed3..d497025d15 100644 --- a/cumulus/test/relay-sproof-builder/src/lib.rs +++ b/cumulus/test/relay-sproof-builder/src/lib.rs @@ -124,13 +124,15 @@ impl RelayStateSproofBuilder { dmq_mqc_head.encode(), ); } - if let Some(relay_dispatch_queue_size) = self.relay_dispatch_queue_remaining_capacity { + if let Some(relay_dispatch_queue_remaining_capacity) = + self.relay_dispatch_queue_remaining_capacity + { insert( relay_chain::well_known_keys::relay_dispatch_queue_remaining_capacity( self.para_id, ) .key, - relay_dispatch_queue_size.encode(), + relay_dispatch_queue_remaining_capacity.encode(), ); } if let Some(upgrade_go_ahead) = self.upgrade_go_ahead {