diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 5219306c7a..512d506241 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -6096,6 +6096,7 @@ dependencies = [ "futures 0.3.13", "futures-timer 3.0.2", "log", + "lru", "metered-channel", "parity-scale-codec", "parking_lot 0.11.1", diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 83ea629f49..11537b2e3e 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -592,7 +592,7 @@ impl metrics::Metrics for Metrics { #[cfg(test)] mod tests { - use super::*; + use super::*; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_primitives::v1::{HeadData, UpwardMessage}; use sp_core::testing::TaskExecutor; diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index 050b9d2b27..441ee502b0 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -191,7 +191,7 @@ impl State { event: NetworkBridgeEvent, ) { match event { - NetworkBridgeEvent::PeerConnected(peer_id, role) => { + NetworkBridgeEvent::PeerConnected(peer_id, role, _) => { // insert a blank view if none already present tracing::trace!( target: LOG_TARGET, diff --git a/polkadot/node/network/approval-distribution/src/tests.rs b/polkadot/node/network/approval-distribution/src/tests.rs index 8b33082e5d..47a444fc77 100644 --- a/polkadot/node/network/approval-distribution/src/tests.rs +++ b/polkadot/node/network/approval-distribution/src/tests.rs @@ -117,7 +117,7 @@ async fn setup_peer_with_view( overseer_send( virtual_overseer, ApprovalDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_id.clone(), ObservedRole::Full) + NetworkBridgeEvent::PeerConnected(peer_id.clone(), ObservedRole::Full, None) ) ).await; overseer_send( diff --git a/polkadot/node/network/availability-distribution/src/error.rs b/polkadot/node/network/availability-distribution/src/error.rs index 5f891325f3..c15669f406 100644 --- a/polkadot/node/network/availability-distribution/src/error.rs +++ b/polkadot/node/network/availability-distribution/src/error.rs @@ -18,12 +18,15 @@ //! Error handling related code and Error/Result definitions. use polkadot_node_network_protocol::request_response::request::RequestError; +use polkadot_primitives::v1::SessionIndex; use thiserror::Error; use futures::channel::oneshot; -use polkadot_node_subsystem_util::Error as UtilError; -use polkadot_primitives::v1::SessionIndex; +use polkadot_node_subsystem_util::{ + runtime, + Error as UtilError, +}; use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError}; use crate::LOG_TARGET; @@ -74,10 +77,6 @@ pub enum Error { #[error("Runtime API error")] RuntimeRequest(RuntimeApiError), - /// We tried fetching a session info which was not available. - #[error("There was no session with the given index")] - NoSuchSession(SessionIndex), - /// Fetching PoV failed with `RequestError`. #[error("FetchPoV request error")] FetchPoV(#[source] RequestError), @@ -92,10 +91,24 @@ pub enum Error { /// No validator with the index could be found in current session. #[error("Given validator index could not be found")] InvalidValidatorIndex, + + /// We tried fetching a session info which was not available. + #[error("There was no session with the given index")] + NoSuchSession(SessionIndex), + + /// Errors coming from runtime::Runtime. + #[error("Error while accessing runtime information")] + Runtime(#[source] runtime::Error), } pub type Result = std::result::Result; +impl From for Error { + fn from(err: runtime::Error) -> Self { + Self::Runtime(err) + } +} + impl From for Error { fn from(err: SubsystemError) -> Self { Self::IncomingMessageChannel(err) diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 7f9eb739a4..7c522859a0 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -28,9 +28,7 @@ mod error; pub use error::Error; use error::{Result, log_error}; -/// Runtime requests. -mod runtime; -use runtime::Runtime; +use polkadot_node_subsystem_util::runtime::Runtime; /// `Requester` taking care of requesting chunks for candidates pending availability. mod requester; diff --git a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs index 5aecdb26d7..7bb5f25397 100644 --- a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs @@ -33,8 +33,9 @@ use polkadot_subsystem::{ ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf, messages::{AllMessages, NetworkBridgeMessage, IfDisconnected} }; +use polkadot_node_subsystem_util::runtime::{Runtime, ValidatorInfo}; -use crate::{error::{Error, log_error}, runtime::{Runtime, ValidatorInfo}}; +use crate::error::{Error, log_error}; /// Number of sessions we want to keep in the LRU. const NUM_SESSIONS: usize = 2; @@ -274,7 +275,7 @@ mod tests { let (mut context, mut virtual_overseer) = test_helpers::make_subsystem_context::(pool.clone()); let keystore = make_ferdie_keystore(); - let mut runtime = crate::runtime::Runtime::new(keystore); + let mut runtime = polkadot_node_subsystem_util::runtime::Runtime::new(keystore); let (tx, rx) = oneshot::channel(); let testee = async { diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 971ac939ac..dbe9a61cf5 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -508,7 +508,7 @@ where let _timer = metrics.time_handle_network_msg(); match bridge_message { - NetworkBridgeEvent::PeerConnected(peerid, role) => { + NetworkBridgeEvent::PeerConnected(peerid, role, _) => { tracing::trace!( target: LOG_TARGET, ?peerid, @@ -1335,7 +1335,7 @@ mod test { &mut ctx, &mut state, &Default::default(), - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None), )); // make peer b interested diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index dc921047f9..fe89893a65 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -36,7 +36,7 @@ use polkadot_subsystem::messages::{ NetworkBridgeMessage, AllMessages, CollatorProtocolMessage, NetworkBridgeEvent, }; -use polkadot_primitives::v1::{Hash, BlockNumber}; +use polkadot_primitives::v1::{Hash, BlockNumber, AuthorityDiscoveryId}; use polkadot_node_network_protocol::{ PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView, UnifiedReputationChange as Rep, ObservedRole, @@ -316,7 +316,7 @@ impl From for UnexpectedAbort { // notifications to be passed through to the validator discovery worker. enum ValidatorDiscoveryNotification { - PeerConnected(PeerId, PeerSet), + PeerConnected(PeerId, PeerSet, Option), PeerDisconnected(PeerId, PeerSet), } @@ -540,11 +540,11 @@ where }, notification = validator_discovery_notifications.next().fuse() => match notification { None => return Ok(()), - Some(ValidatorDiscoveryNotification::PeerConnected(peer, peer_set)) => { + Some(ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_auth)) => { validator_discovery.on_peer_connected( peer.clone(), peer_set, - &mut authority_discovery_service, + maybe_auth, ).await; } Some(ValidatorDiscoveryNotification::PeerDisconnected(peer, peer_set)) => { @@ -555,9 +555,10 @@ where } } -async fn handle_network_messages( +async fn handle_network_messages( mut sender: impl SubsystemSender, mut network_service: impl Network, + mut authority_discovery_service: AD, mut request_multiplexer: RequestMultiplexer, mut validator_discovery_notifications: mpsc::Sender, metrics: Metrics, @@ -607,10 +608,14 @@ async fn handle_network_messages( shared.local_view.clone().unwrap_or(View::default()) }; + let maybe_authority = + authority_discovery_service + .get_authority_id_by_peer_id(peer).await; + // Failure here means that the other side of the network bridge // has concluded and this future will be dropped in due course. let _ = validator_discovery_notifications.send( - ValidatorDiscoveryNotification::PeerConnected(peer.clone(), peer_set) + ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_authority.clone()) ).await; @@ -618,7 +623,7 @@ async fn handle_network_messages( PeerSet::Validation => { dispatch_validation_events_to_all( vec![ - NetworkBridgeEvent::PeerConnected(peer.clone(), role), + NetworkBridgeEvent::PeerConnected(peer.clone(), role, maybe_authority), NetworkBridgeEvent::PeerViewChange( peer.clone(), View::default(), @@ -640,7 +645,7 @@ async fn handle_network_messages( PeerSet::Collation => { dispatch_collation_events_to_all( vec![ - NetworkBridgeEvent::PeerConnected(peer.clone(), role), + NetworkBridgeEvent::PeerConnected(peer.clone(), role, maybe_authority), NetworkBridgeEvent::PeerViewChange( peer.clone(), View::default(), @@ -858,6 +863,7 @@ where let (remote, network_event_handler) = handle_network_messages( ctx.sender().clone(), network_service.clone(), + authority_discovery_service.clone(), request_multiplexer, validation_worker_tx, metrics.clone(), @@ -1191,6 +1197,7 @@ mod tests { _req_configs: Vec, } + #[derive(Clone)] struct TestAuthorityDiscovery; // The test's view of the network. This receives updates from the subsystem in the form @@ -1796,7 +1803,7 @@ mod tests { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -1848,7 +1855,7 @@ mod tests { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -1920,7 +1927,7 @@ mod tests { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -1932,7 +1939,7 @@ mod tests { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -2004,7 +2011,7 @@ mod tests { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -2016,7 +2023,7 @@ mod tests { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -2099,7 +2106,7 @@ mod tests { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -2111,7 +2118,7 @@ mod tests { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -2259,7 +2266,7 @@ mod tests { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -2271,7 +2278,7 @@ mod tests { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; diff --git a/polkadot/node/network/bridge/src/validator_discovery.rs b/polkadot/node/network/bridge/src/validator_discovery.rs index e6170d6c80..4d43d66a23 100644 --- a/polkadot/node/network/bridge/src/validator_discovery.rs +++ b/polkadot/node/network/bridge/src/validator_discovery.rs @@ -34,7 +34,7 @@ const LOG_TARGET: &str = "parachain::validator-discovery"; /// An abstraction over the authority discovery service. #[async_trait] -pub trait AuthorityDiscovery: Send + 'static { +pub trait AuthorityDiscovery: Send + Clone + 'static { /// Get the addresses for the given [`AuthorityId`] from the local address cache. async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option>; /// Get the [`AuthorityId`] for the given [`PeerId`] from the local address cache. @@ -307,16 +307,15 @@ impl Service { } /// Should be called when a peer connected. - #[tracing::instrument(level = "trace", skip(self, authority_discovery_service), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] pub async fn on_peer_connected( &mut self, peer_id: PeerId, peer_set: PeerSet, - authority_discovery_service: &mut AD, + maybe_authority: Option, ) { let state = &mut self.state[peer_set]; // check if it's an authority we've been waiting for - let maybe_authority = authority_discovery_service.get_authority_id_by_peer_id(peer_id.clone()).await; if let Some(authority) = maybe_authority { for request in state.non_revoked_discovery_requests.iter_mut() { let _ = request.on_authority_connected(&authority, &peer_id); @@ -359,7 +358,7 @@ mod tests { peers_set: HashSet, } - #[derive(Default)] + #[derive(Default, Clone)] struct TestAuthorityDiscovery { by_authority_id: HashMap, by_peer_id: HashMap, @@ -469,7 +468,8 @@ mod tests { let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()]; let (sender, mut receiver) = mpsc::channel(2); - service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await; + let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await; + service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await; let _ = service.on_request( req1, @@ -509,12 +509,14 @@ mod tests { ).await; - service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await; + let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await; + service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await; let reply1 = receiver.next().await.unwrap(); assert_eq!(reply1.0, authority_ids[0]); assert_eq!(reply1.1, peer_ids[0]); - service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await; + let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await; + service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await; let reply2 = receiver.next().await.unwrap(); assert_eq!(reply2.0, authority_ids[1]); assert_eq!(reply2.1, peer_ids[1]); @@ -534,8 +536,10 @@ mod tests { futures::executor::block_on(async move { let (sender, mut receiver) = mpsc::channel(1); - service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await; - service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await; + let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await; + service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await; + let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await; + service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await; let (ns, ads) = service.on_request( vec![authority_ids[0].clone()], @@ -580,8 +584,10 @@ mod tests { futures::executor::block_on(async move { let (sender, mut receiver) = mpsc::channel(1); - service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await; - service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await; + let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await; + service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await; + let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await; + service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await; let (ns, ads) = service.on_request( vec![authority_ids[0].clone(), authority_ids[2].clone()], @@ -645,7 +651,8 @@ mod tests { futures::executor::block_on(async move { let (sender, mut receiver) = mpsc::channel(1); - service.on_peer_connected(validator_peer_id.clone(), PeerSet::Validation, &mut ads).await; + let maybe_authority = ads.get_authority_id_by_peer_id(validator_peer_id).await; + service.on_peer_connected(validator_peer_id.clone(), PeerSet::Validation, maybe_authority).await; let address = known_multiaddr()[0].clone().with(Protocol::P2p(validator_peer_id.clone().into())); ads.by_peer_id.insert(validator_peer_id.clone(), validator_id.clone()); diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 8c7fc3763e..99f6344aa9 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -791,7 +791,7 @@ async fn handle_network_msg( use NetworkBridgeEvent::*; match bridge_message { - PeerConnected(peer_id, observed_role) => { + PeerConnected(peer_id, observed_role, _) => { // If it is possible that a disconnected validator would attempt a reconnect // it should be handled here. tracing::trace!( @@ -1343,6 +1343,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer.clone(), polkadot_node_network_protocol::ObservedRole::Authority, + None, ), ), ).await; diff --git a/polkadot/node/network/collator-protocol/src/validator_side.rs b/polkadot/node/network/collator-protocol/src/validator_side.rs index 8bba328653..ae5ee6fd12 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side.rs @@ -873,7 +873,7 @@ where use NetworkBridgeEvent::*; match bridge_message { - PeerConnected(peer_id, _role) => { + PeerConnected(peer_id, _role, _) => { state.peer_data.entry(peer_id).or_default(); state.metrics.note_collator_peer_count(state.peer_data.len()); }, @@ -1469,6 +1469,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b, ObservedRole::Full, + None, ), ) ).await; @@ -1543,6 +1544,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b, ObservedRole::Full, + None, ), ) ).await; @@ -1553,6 +1555,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_c, ObservedRole::Full, + None, ), ) ).await; @@ -1636,6 +1639,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b, ObservedRole::Full, + None, ), ) ).await; @@ -1704,6 +1708,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b, ObservedRole::Full, + None, ), ) ).await; @@ -1714,6 +1719,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_c, ObservedRole::Full, + None, ), ) ).await; @@ -1918,6 +1924,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, + None, ) ) ).await; @@ -2012,6 +2019,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, + None, ) ) ).await; @@ -2153,6 +2161,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, + None, ) ) ).await; @@ -2199,6 +2208,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, + None, ) ) ).await; @@ -2270,6 +2280,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, + None, ) ) ).await; diff --git a/polkadot/node/network/protocol/src/request_response/mod.rs b/polkadot/node/network/protocol/src/request_response/mod.rs index 8310d0f9c8..1be3bf05d2 100644 --- a/polkadot/node/network/protocol/src/request_response/mod.rs +++ b/polkadot/node/network/protocol/src/request_response/mod.rs @@ -154,7 +154,7 @@ impl Protocol { name: p_name, max_request_size: 1_000, // Available data size is dominated code size. - // + 1000 to account for protocol overhead (should be way less). + // + 1000 to account for protocol overhead (should be way less). max_response_size: MAX_CODE_SIZE as u64 + 1000, // We need statement fetching to be fast and will try our best at the responding // side to answer requests within that timeout, assuming a bandwidth of 500Mbit/s @@ -199,7 +199,7 @@ impl Protocol { // waisting precious time. let available_bandwidth = 7 * MIN_BANDWIDTH_BYTES / 10; let size = u64::saturating_sub( - STATEMENTS_TIMEOUT.as_millis() as u64 * available_bandwidth / (1000 * MAX_CODE_SIZE as u64), + STATEMENTS_TIMEOUT.as_millis() as u64 * available_bandwidth / (1000 * MAX_CODE_SIZE as u64), MAX_PARALLEL_STATEMENT_REQUESTS as u64 ); debug_assert!( diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index b9c9089bc6..ab1068f2b7 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -1437,7 +1437,7 @@ async fn handle_network_update( metrics: &Metrics, ) { match update { - NetworkBridgeEvent::PeerConnected(peer, role) => { + NetworkBridgeEvent::PeerConnected(peer, role, _) => { tracing::trace!( target: LOG_TARGET, ?peer, @@ -2667,13 +2667,13 @@ mod tests { // notify of peers and view handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full) + NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None) ) }).await; handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full) + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None) ) }).await; @@ -2835,23 +2835,23 @@ mod tests { // notify of peers and view handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full) + NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None) ) }).await; handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full) + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None) ) }).await; handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_c.clone(), ObservedRole::Full) + NetworkBridgeEvent::PeerConnected(peer_c.clone(), ObservedRole::Full, None) ) }).await; handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full) + NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full, None) ) }).await; diff --git a/polkadot/node/subsystem-util/Cargo.toml b/polkadot/node/subsystem-util/Cargo.toml index 15db5e9c1a..cd84dc98b4 100644 --- a/polkadot/node/subsystem-util/Cargo.toml +++ b/polkadot/node/subsystem-util/Cargo.toml @@ -16,6 +16,7 @@ rand = "0.8.3" streamunordered = "0.5.1" thiserror = "1.0.23" tracing = "0.1.25" +lru = "0.6.5" polkadot-node-primitives = { path = "../primitives" } polkadot-node-subsystem = { path = "../subsystem" } diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 58aa2a27de..eb55d31cae 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -65,6 +65,9 @@ pub mod reexports { }; } +/// Convenient and efficient runtime info access. +pub mod runtime; + /// Duration a job will wait after sending a stop signal before hard-aborting. pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1); /// Capacity of channels to and from individual jobs diff --git a/polkadot/node/subsystem-util/src/runtime/error.rs b/polkadot/node/subsystem-util/src/runtime/error.rs new file mode 100644 index 0000000000..9b298f5279 --- /dev/null +++ b/polkadot/node/subsystem-util/src/runtime/error.rs @@ -0,0 +1,52 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . +// + +//! Error handling related code and Error/Result definitions. + +use thiserror::Error; +use futures::channel::oneshot; + +use polkadot_node_subsystem::errors::RuntimeApiError; +use polkadot_primitives::v1::SessionIndex; + +pub type Result = std::result::Result; + +/// Errors for fetching of runtime information. +#[derive(Debug, Error)] +pub enum Error { + /// Runtime API subsystem is down, which means we're shutting down. + #[error("Runtime request canceled")] + RuntimeRequestCanceled(oneshot::Canceled), + + /// Some request to the runtime failed. + /// For example if we prune a block we're requesting info about. + #[error("Runtime API error")] + RuntimeRequest(RuntimeApiError), + + /// We tried fetching a session info which was not available. + #[error("There was no session with the given index")] + NoSuchSession(SessionIndex), +} + +/// Receive a response from a runtime request and convert errors. +pub(crate) async fn recv_runtime( + r: oneshot::Receiver>, +) -> std::result::Result { + r.await + .map_err(Error::RuntimeRequestCanceled)? + .map_err(Error::RuntimeRequest) +} diff --git a/polkadot/node/network/availability-distribution/src/runtime.rs b/polkadot/node/subsystem-util/src/runtime/mod.rs similarity index 91% rename from polkadot/node/network/availability-distribution/src/runtime.rs rename to polkadot/node/subsystem-util/src/runtime/mod.rs index 39022e8250..266415e0f5 100644 --- a/polkadot/node/network/availability-distribution/src/runtime.rs +++ b/polkadot/node/subsystem-util/src/runtime/mod.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -//! Convenient interface to the runtime. +//! Convenient interface to runtime information. use lru::LruCache; @@ -22,18 +22,20 @@ use sp_application_crypto::AppKey; use sp_core::crypto::Public; use sp_keystore::{CryptoStore, SyncCryptoStorePtr}; -use polkadot_node_subsystem_util::{ +use polkadot_primitives::v1::{GroupIndex, Hash, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex}; +use polkadot_node_subsystem::SubsystemContext; + +use crate::{ request_session_index_for_child, request_session_info, }; -use polkadot_primitives::v1::{GroupIndex, Hash, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex}; -use polkadot_subsystem::SubsystemContext; -use super::{ - error::recv_runtime, - Error, -}; +/// Errors that can happen on runtime fetches. +mod error; -/// Caching of session info as needed by availability distribution. +use error::{recv_runtime, Result}; +pub use error::Error; + +/// Caching of session info. /// /// It should be ensured that a cached session stays live in the cache as long as we might need it. pub struct Runtime { @@ -72,8 +74,8 @@ impl Runtime { /// Create a new `Runtime` for convenient runtime fetches. pub fn new(keystore: SyncCryptoStorePtr) -> Self { Self { - // 5 relatively conservative, 1 to 2 should suffice: - session_index_cache: LruCache::new(5), + // Adjust, depending on how many forks we want to support. + session_index_cache: LruCache::new(10), // We need to cache the current and the last session the most: session_info_cache: LruCache::new(2), keystore, @@ -85,7 +87,7 @@ impl Runtime { &mut self, ctx: &mut Context, parent: Hash, - ) -> Result + ) -> Result where Context: SubsystemContext, { @@ -106,7 +108,7 @@ impl Runtime { &'a mut self, ctx: &mut Context, parent: Hash, - ) -> Result<&'a ExtendedSessionInfo, Error> + ) -> Result<&'a ExtendedSessionInfo> where Context: SubsystemContext, { @@ -124,7 +126,7 @@ impl Runtime { ctx: &mut Context, parent: Hash, session_index: SessionIndex, - ) -> Result<&'a ExtendedSessionInfo, Error> + ) -> Result<&'a ExtendedSessionInfo> where Context: SubsystemContext, { @@ -155,7 +157,7 @@ impl Runtime { async fn get_validator_info( &self, session_info: &SessionInfo, - ) -> Result + ) -> Result { if let Some(our_index) = self.get_our_index(&session_info.validators).await { // Get our group index: diff --git a/polkadot/node/subsystem/src/messages/network_bridge_event.rs b/polkadot/node/subsystem/src/messages/network_bridge_event.rs index 608175998c..c0a58ada77 100644 --- a/polkadot/node/subsystem/src/messages/network_bridge_event.rs +++ b/polkadot/node/subsystem/src/messages/network_bridge_event.rs @@ -17,13 +17,15 @@ use std::convert::TryFrom; pub use sc_network::{ReputationChange, PeerId}; + use polkadot_node_network_protocol::{WrongVariant, ObservedRole, OurView, View}; +use polkadot_primitives::v1::AuthorityDiscoveryId; /// Events from network. #[derive(Debug, Clone, PartialEq)] pub enum NetworkBridgeEvent { /// A peer has connected. - PeerConnected(PeerId, ObservedRole), + PeerConnected(PeerId, ObservedRole, Option), /// A peer has disconnected. PeerDisconnected(PeerId), @@ -58,8 +60,8 @@ impl NetworkBridgeEvent { where T: 'a + Clone, &'a T: TryFrom<&'a M, Error = WrongVariant> { Ok(match *self { - NetworkBridgeEvent::PeerConnected(ref peer, ref role) - => NetworkBridgeEvent::PeerConnected(peer.clone(), role.clone()), + NetworkBridgeEvent::PeerConnected(ref peer, ref role, ref authority_id) + => NetworkBridgeEvent::PeerConnected(peer.clone(), role.clone(), authority_id.clone()), NetworkBridgeEvent::PeerDisconnected(ref peer) => NetworkBridgeEvent::PeerDisconnected(peer.clone()), NetworkBridgeEvent::PeerMessage(ref peer, ref msg)