From dacde443f7751dcd2079cbffe456bb7c94292545 Mon Sep 17 00:00:00 2001 From: Robert Klotzner Date: Fri, 16 Apr 2021 21:42:20 +0200 Subject: [PATCH] Infrastructure improvements (#2897) * Factor out runtime module into utils. * Add maybe_authority information to `PeerConnected` event. We already gather this information in authority discovery, so we might as well share it with others. This opens up an easy path to trigger validators differently from normal nodes, e.g. for prioritization. This change has become more important now, that we just connect to all validators and therefore just have a long peer list without any information about those nodes. * Test fix. --- polkadot/Cargo.lock | 1 + .../node/core/candidate-validation/src/lib.rs | 2 +- .../network/approval-distribution/src/lib.rs | 2 +- .../approval-distribution/src/tests.rs | 2 +- .../availability-distribution/src/error.rs | 25 ++++++--- .../availability-distribution/src/lib.rs | 4 +- .../src/pov_requester/mod.rs | 5 +- .../network/bitfield-distribution/src/lib.rs | 4 +- polkadot/node/network/bridge/src/lib.rs | 43 ++++++++------- .../network/bridge/src/validator_discovery.rs | 33 +++++++----- .../collator-protocol/src/collator_side.rs | 3 +- .../collator-protocol/src/validator_side.rs | 13 ++++- .../protocol/src/request_response/mod.rs | 4 +- .../network/statement-distribution/src/lib.rs | 14 ++--- polkadot/node/subsystem-util/Cargo.toml | 1 + polkadot/node/subsystem-util/src/lib.rs | 3 ++ .../node/subsystem-util/src/runtime/error.rs | 52 +++++++++++++++++++ .../src/runtime/mod.rs} | 32 ++++++------ .../src/messages/network_bridge_event.rs | 8 +-- 19 files changed, 175 insertions(+), 76 deletions(-) create mode 100644 polkadot/node/subsystem-util/src/runtime/error.rs rename polkadot/node/{network/availability-distribution/src/runtime.rs => subsystem-util/src/runtime/mod.rs} (91%) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 5219306c7a..512d506241 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -6096,6 +6096,7 @@ dependencies = [ "futures 0.3.13", "futures-timer 3.0.2", "log", + "lru", "metered-channel", "parity-scale-codec", "parking_lot 0.11.1", diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 83ea629f49..11537b2e3e 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -592,7 +592,7 @@ impl metrics::Metrics for Metrics { #[cfg(test)] mod tests { - use super::*; + use super::*; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_primitives::v1::{HeadData, UpwardMessage}; use sp_core::testing::TaskExecutor; diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index 050b9d2b27..441ee502b0 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -191,7 +191,7 @@ impl State { event: NetworkBridgeEvent, ) { match event { - NetworkBridgeEvent::PeerConnected(peer_id, role) => { + NetworkBridgeEvent::PeerConnected(peer_id, role, _) => { // insert a blank view if none already present tracing::trace!( target: LOG_TARGET, diff --git a/polkadot/node/network/approval-distribution/src/tests.rs b/polkadot/node/network/approval-distribution/src/tests.rs index 8b33082e5d..47a444fc77 100644 --- a/polkadot/node/network/approval-distribution/src/tests.rs +++ b/polkadot/node/network/approval-distribution/src/tests.rs @@ -117,7 +117,7 @@ async fn setup_peer_with_view( overseer_send( virtual_overseer, ApprovalDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_id.clone(), ObservedRole::Full) + NetworkBridgeEvent::PeerConnected(peer_id.clone(), ObservedRole::Full, None) ) ).await; overseer_send( diff --git a/polkadot/node/network/availability-distribution/src/error.rs b/polkadot/node/network/availability-distribution/src/error.rs index 5f891325f3..c15669f406 100644 --- a/polkadot/node/network/availability-distribution/src/error.rs +++ b/polkadot/node/network/availability-distribution/src/error.rs @@ -18,12 +18,15 @@ //! Error handling related code and Error/Result definitions. use polkadot_node_network_protocol::request_response::request::RequestError; +use polkadot_primitives::v1::SessionIndex; use thiserror::Error; use futures::channel::oneshot; -use polkadot_node_subsystem_util::Error as UtilError; -use polkadot_primitives::v1::SessionIndex; +use polkadot_node_subsystem_util::{ + runtime, + Error as UtilError, +}; use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError}; use crate::LOG_TARGET; @@ -74,10 +77,6 @@ pub enum Error { #[error("Runtime API error")] RuntimeRequest(RuntimeApiError), - /// We tried fetching a session info which was not available. - #[error("There was no session with the given index")] - NoSuchSession(SessionIndex), - /// Fetching PoV failed with `RequestError`. #[error("FetchPoV request error")] FetchPoV(#[source] RequestError), @@ -92,10 +91,24 @@ pub enum Error { /// No validator with the index could be found in current session. #[error("Given validator index could not be found")] InvalidValidatorIndex, + + /// We tried fetching a session info which was not available. + #[error("There was no session with the given index")] + NoSuchSession(SessionIndex), + + /// Errors coming from runtime::Runtime. + #[error("Error while accessing runtime information")] + Runtime(#[source] runtime::Error), } pub type Result = std::result::Result; +impl From for Error { + fn from(err: runtime::Error) -> Self { + Self::Runtime(err) + } +} + impl From for Error { fn from(err: SubsystemError) -> Self { Self::IncomingMessageChannel(err) diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 7f9eb739a4..7c522859a0 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -28,9 +28,7 @@ mod error; pub use error::Error; use error::{Result, log_error}; -/// Runtime requests. -mod runtime; -use runtime::Runtime; +use polkadot_node_subsystem_util::runtime::Runtime; /// `Requester` taking care of requesting chunks for candidates pending availability. mod requester; diff --git a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs index 5aecdb26d7..7bb5f25397 100644 --- a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs @@ -33,8 +33,9 @@ use polkadot_subsystem::{ ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf, messages::{AllMessages, NetworkBridgeMessage, IfDisconnected} }; +use polkadot_node_subsystem_util::runtime::{Runtime, ValidatorInfo}; -use crate::{error::{Error, log_error}, runtime::{Runtime, ValidatorInfo}}; +use crate::error::{Error, log_error}; /// Number of sessions we want to keep in the LRU. const NUM_SESSIONS: usize = 2; @@ -274,7 +275,7 @@ mod tests { let (mut context, mut virtual_overseer) = test_helpers::make_subsystem_context::(pool.clone()); let keystore = make_ferdie_keystore(); - let mut runtime = crate::runtime::Runtime::new(keystore); + let mut runtime = polkadot_node_subsystem_util::runtime::Runtime::new(keystore); let (tx, rx) = oneshot::channel(); let testee = async { diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 971ac939ac..dbe9a61cf5 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -508,7 +508,7 @@ where let _timer = metrics.time_handle_network_msg(); match bridge_message { - NetworkBridgeEvent::PeerConnected(peerid, role) => { + NetworkBridgeEvent::PeerConnected(peerid, role, _) => { tracing::trace!( target: LOG_TARGET, ?peerid, @@ -1335,7 +1335,7 @@ mod test { &mut ctx, &mut state, &Default::default(), - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None), )); // make peer b interested diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index dc921047f9..fe89893a65 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -36,7 +36,7 @@ use polkadot_subsystem::messages::{ NetworkBridgeMessage, AllMessages, CollatorProtocolMessage, NetworkBridgeEvent, }; -use polkadot_primitives::v1::{Hash, BlockNumber}; +use polkadot_primitives::v1::{Hash, BlockNumber, AuthorityDiscoveryId}; use polkadot_node_network_protocol::{ PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView, UnifiedReputationChange as Rep, ObservedRole, @@ -316,7 +316,7 @@ impl From for UnexpectedAbort { // notifications to be passed through to the validator discovery worker. enum ValidatorDiscoveryNotification { - PeerConnected(PeerId, PeerSet), + PeerConnected(PeerId, PeerSet, Option), PeerDisconnected(PeerId, PeerSet), } @@ -540,11 +540,11 @@ where }, notification = validator_discovery_notifications.next().fuse() => match notification { None => return Ok(()), - Some(ValidatorDiscoveryNotification::PeerConnected(peer, peer_set)) => { + Some(ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_auth)) => { validator_discovery.on_peer_connected( peer.clone(), peer_set, - &mut authority_discovery_service, + maybe_auth, ).await; } Some(ValidatorDiscoveryNotification::PeerDisconnected(peer, peer_set)) => { @@ -555,9 +555,10 @@ where } } -async fn handle_network_messages( +async fn handle_network_messages( mut sender: impl SubsystemSender, mut network_service: impl Network, + mut authority_discovery_service: AD, mut request_multiplexer: RequestMultiplexer, mut validator_discovery_notifications: mpsc::Sender, metrics: Metrics, @@ -607,10 +608,14 @@ async fn handle_network_messages( shared.local_view.clone().unwrap_or(View::default()) }; + let maybe_authority = + authority_discovery_service + .get_authority_id_by_peer_id(peer).await; + // Failure here means that the other side of the network bridge // has concluded and this future will be dropped in due course. let _ = validator_discovery_notifications.send( - ValidatorDiscoveryNotification::PeerConnected(peer.clone(), peer_set) + ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_authority.clone()) ).await; @@ -618,7 +623,7 @@ async fn handle_network_messages( PeerSet::Validation => { dispatch_validation_events_to_all( vec![ - NetworkBridgeEvent::PeerConnected(peer.clone(), role), + NetworkBridgeEvent::PeerConnected(peer.clone(), role, maybe_authority), NetworkBridgeEvent::PeerViewChange( peer.clone(), View::default(), @@ -640,7 +645,7 @@ async fn handle_network_messages( PeerSet::Collation => { dispatch_collation_events_to_all( vec![ - NetworkBridgeEvent::PeerConnected(peer.clone(), role), + NetworkBridgeEvent::PeerConnected(peer.clone(), role, maybe_authority), NetworkBridgeEvent::PeerViewChange( peer.clone(), View::default(), @@ -858,6 +863,7 @@ where let (remote, network_event_handler) = handle_network_messages( ctx.sender().clone(), network_service.clone(), + authority_discovery_service.clone(), request_multiplexer, validation_worker_tx, metrics.clone(), @@ -1191,6 +1197,7 @@ mod tests { _req_configs: Vec, } + #[derive(Clone)] struct TestAuthorityDiscovery; // The test's view of the network. This receives updates from the subsystem in the form @@ -1796,7 +1803,7 @@ mod tests { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -1848,7 +1855,7 @@ mod tests { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -1920,7 +1927,7 @@ mod tests { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -1932,7 +1939,7 @@ mod tests { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -2004,7 +2011,7 @@ mod tests { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -2016,7 +2023,7 @@ mod tests { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -2099,7 +2106,7 @@ mod tests { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -2111,7 +2118,7 @@ mod tests { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -2259,7 +2266,7 @@ mod tests { // bridge will inform about all connected peers. { assert_sends_validation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; @@ -2271,7 +2278,7 @@ mod tests { { assert_sends_collation_event_to_all( - NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None), &mut virtual_overseer, ).await; diff --git a/polkadot/node/network/bridge/src/validator_discovery.rs b/polkadot/node/network/bridge/src/validator_discovery.rs index e6170d6c80..4d43d66a23 100644 --- a/polkadot/node/network/bridge/src/validator_discovery.rs +++ b/polkadot/node/network/bridge/src/validator_discovery.rs @@ -34,7 +34,7 @@ const LOG_TARGET: &str = "parachain::validator-discovery"; /// An abstraction over the authority discovery service. #[async_trait] -pub trait AuthorityDiscovery: Send + 'static { +pub trait AuthorityDiscovery: Send + Clone + 'static { /// Get the addresses for the given [`AuthorityId`] from the local address cache. async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option>; /// Get the [`AuthorityId`] for the given [`PeerId`] from the local address cache. @@ -307,16 +307,15 @@ impl Service { } /// Should be called when a peer connected. - #[tracing::instrument(level = "trace", skip(self, authority_discovery_service), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] pub async fn on_peer_connected( &mut self, peer_id: PeerId, peer_set: PeerSet, - authority_discovery_service: &mut AD, + maybe_authority: Option, ) { let state = &mut self.state[peer_set]; // check if it's an authority we've been waiting for - let maybe_authority = authority_discovery_service.get_authority_id_by_peer_id(peer_id.clone()).await; if let Some(authority) = maybe_authority { for request in state.non_revoked_discovery_requests.iter_mut() { let _ = request.on_authority_connected(&authority, &peer_id); @@ -359,7 +358,7 @@ mod tests { peers_set: HashSet, } - #[derive(Default)] + #[derive(Default, Clone)] struct TestAuthorityDiscovery { by_authority_id: HashMap, by_peer_id: HashMap, @@ -469,7 +468,8 @@ mod tests { let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()]; let (sender, mut receiver) = mpsc::channel(2); - service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await; + let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await; + service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await; let _ = service.on_request( req1, @@ -509,12 +509,14 @@ mod tests { ).await; - service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await; + let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await; + service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await; let reply1 = receiver.next().await.unwrap(); assert_eq!(reply1.0, authority_ids[0]); assert_eq!(reply1.1, peer_ids[0]); - service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await; + let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await; + service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await; let reply2 = receiver.next().await.unwrap(); assert_eq!(reply2.0, authority_ids[1]); assert_eq!(reply2.1, peer_ids[1]); @@ -534,8 +536,10 @@ mod tests { futures::executor::block_on(async move { let (sender, mut receiver) = mpsc::channel(1); - service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await; - service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await; + let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await; + service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await; + let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await; + service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await; let (ns, ads) = service.on_request( vec![authority_ids[0].clone()], @@ -580,8 +584,10 @@ mod tests { futures::executor::block_on(async move { let (sender, mut receiver) = mpsc::channel(1); - service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await; - service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await; + let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await; + service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await; + let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await; + service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await; let (ns, ads) = service.on_request( vec![authority_ids[0].clone(), authority_ids[2].clone()], @@ -645,7 +651,8 @@ mod tests { futures::executor::block_on(async move { let (sender, mut receiver) = mpsc::channel(1); - service.on_peer_connected(validator_peer_id.clone(), PeerSet::Validation, &mut ads).await; + let maybe_authority = ads.get_authority_id_by_peer_id(validator_peer_id).await; + service.on_peer_connected(validator_peer_id.clone(), PeerSet::Validation, maybe_authority).await; let address = known_multiaddr()[0].clone().with(Protocol::P2p(validator_peer_id.clone().into())); ads.by_peer_id.insert(validator_peer_id.clone(), validator_id.clone()); diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 8c7fc3763e..99f6344aa9 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -791,7 +791,7 @@ async fn handle_network_msg( use NetworkBridgeEvent::*; match bridge_message { - PeerConnected(peer_id, observed_role) => { + PeerConnected(peer_id, observed_role, _) => { // If it is possible that a disconnected validator would attempt a reconnect // it should be handled here. tracing::trace!( @@ -1343,6 +1343,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer.clone(), polkadot_node_network_protocol::ObservedRole::Authority, + None, ), ), ).await; diff --git a/polkadot/node/network/collator-protocol/src/validator_side.rs b/polkadot/node/network/collator-protocol/src/validator_side.rs index 8bba328653..ae5ee6fd12 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side.rs @@ -873,7 +873,7 @@ where use NetworkBridgeEvent::*; match bridge_message { - PeerConnected(peer_id, _role) => { + PeerConnected(peer_id, _role, _) => { state.peer_data.entry(peer_id).or_default(); state.metrics.note_collator_peer_count(state.peer_data.len()); }, @@ -1469,6 +1469,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b, ObservedRole::Full, + None, ), ) ).await; @@ -1543,6 +1544,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b, ObservedRole::Full, + None, ), ) ).await; @@ -1553,6 +1555,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_c, ObservedRole::Full, + None, ), ) ).await; @@ -1636,6 +1639,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b, ObservedRole::Full, + None, ), ) ).await; @@ -1704,6 +1708,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b, ObservedRole::Full, + None, ), ) ).await; @@ -1714,6 +1719,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_c, ObservedRole::Full, + None, ), ) ).await; @@ -1918,6 +1924,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, + None, ) ) ).await; @@ -2012,6 +2019,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, + None, ) ) ).await; @@ -2153,6 +2161,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, + None, ) ) ).await; @@ -2199,6 +2208,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, + None, ) ) ).await; @@ -2270,6 +2280,7 @@ mod tests { NetworkBridgeEvent::PeerConnected( peer_b.clone(), ObservedRole::Full, + None, ) ) ).await; diff --git a/polkadot/node/network/protocol/src/request_response/mod.rs b/polkadot/node/network/protocol/src/request_response/mod.rs index 8310d0f9c8..1be3bf05d2 100644 --- a/polkadot/node/network/protocol/src/request_response/mod.rs +++ b/polkadot/node/network/protocol/src/request_response/mod.rs @@ -154,7 +154,7 @@ impl Protocol { name: p_name, max_request_size: 1_000, // Available data size is dominated code size. - // + 1000 to account for protocol overhead (should be way less). + // + 1000 to account for protocol overhead (should be way less). max_response_size: MAX_CODE_SIZE as u64 + 1000, // We need statement fetching to be fast and will try our best at the responding // side to answer requests within that timeout, assuming a bandwidth of 500Mbit/s @@ -199,7 +199,7 @@ impl Protocol { // waisting precious time. let available_bandwidth = 7 * MIN_BANDWIDTH_BYTES / 10; let size = u64::saturating_sub( - STATEMENTS_TIMEOUT.as_millis() as u64 * available_bandwidth / (1000 * MAX_CODE_SIZE as u64), + STATEMENTS_TIMEOUT.as_millis() as u64 * available_bandwidth / (1000 * MAX_CODE_SIZE as u64), MAX_PARALLEL_STATEMENT_REQUESTS as u64 ); debug_assert!( diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index b9c9089bc6..ab1068f2b7 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -1437,7 +1437,7 @@ async fn handle_network_update( metrics: &Metrics, ) { match update { - NetworkBridgeEvent::PeerConnected(peer, role) => { + NetworkBridgeEvent::PeerConnected(peer, role, _) => { tracing::trace!( target: LOG_TARGET, ?peer, @@ -2667,13 +2667,13 @@ mod tests { // notify of peers and view handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full) + NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None) ) }).await; handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full) + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None) ) }).await; @@ -2835,23 +2835,23 @@ mod tests { // notify of peers and view handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full) + NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None) ) }).await; handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full) + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None) ) }).await; handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_c.clone(), ObservedRole::Full) + NetworkBridgeEvent::PeerConnected(peer_c.clone(), ObservedRole::Full, None) ) }).await; handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full) + NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full, None) ) }).await; diff --git a/polkadot/node/subsystem-util/Cargo.toml b/polkadot/node/subsystem-util/Cargo.toml index 15db5e9c1a..cd84dc98b4 100644 --- a/polkadot/node/subsystem-util/Cargo.toml +++ b/polkadot/node/subsystem-util/Cargo.toml @@ -16,6 +16,7 @@ rand = "0.8.3" streamunordered = "0.5.1" thiserror = "1.0.23" tracing = "0.1.25" +lru = "0.6.5" polkadot-node-primitives = { path = "../primitives" } polkadot-node-subsystem = { path = "../subsystem" } diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 58aa2a27de..eb55d31cae 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -65,6 +65,9 @@ pub mod reexports { }; } +/// Convenient and efficient runtime info access. +pub mod runtime; + /// Duration a job will wait after sending a stop signal before hard-aborting. pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1); /// Capacity of channels to and from individual jobs diff --git a/polkadot/node/subsystem-util/src/runtime/error.rs b/polkadot/node/subsystem-util/src/runtime/error.rs new file mode 100644 index 0000000000..9b298f5279 --- /dev/null +++ b/polkadot/node/subsystem-util/src/runtime/error.rs @@ -0,0 +1,52 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . +// + +//! Error handling related code and Error/Result definitions. + +use thiserror::Error; +use futures::channel::oneshot; + +use polkadot_node_subsystem::errors::RuntimeApiError; +use polkadot_primitives::v1::SessionIndex; + +pub type Result = std::result::Result; + +/// Errors for fetching of runtime information. +#[derive(Debug, Error)] +pub enum Error { + /// Runtime API subsystem is down, which means we're shutting down. + #[error("Runtime request canceled")] + RuntimeRequestCanceled(oneshot::Canceled), + + /// Some request to the runtime failed. + /// For example if we prune a block we're requesting info about. + #[error("Runtime API error")] + RuntimeRequest(RuntimeApiError), + + /// We tried fetching a session info which was not available. + #[error("There was no session with the given index")] + NoSuchSession(SessionIndex), +} + +/// Receive a response from a runtime request and convert errors. +pub(crate) async fn recv_runtime( + r: oneshot::Receiver>, +) -> std::result::Result { + r.await + .map_err(Error::RuntimeRequestCanceled)? + .map_err(Error::RuntimeRequest) +} diff --git a/polkadot/node/network/availability-distribution/src/runtime.rs b/polkadot/node/subsystem-util/src/runtime/mod.rs similarity index 91% rename from polkadot/node/network/availability-distribution/src/runtime.rs rename to polkadot/node/subsystem-util/src/runtime/mod.rs index 39022e8250..266415e0f5 100644 --- a/polkadot/node/network/availability-distribution/src/runtime.rs +++ b/polkadot/node/subsystem-util/src/runtime/mod.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -//! Convenient interface to the runtime. +//! Convenient interface to runtime information. use lru::LruCache; @@ -22,18 +22,20 @@ use sp_application_crypto::AppKey; use sp_core::crypto::Public; use sp_keystore::{CryptoStore, SyncCryptoStorePtr}; -use polkadot_node_subsystem_util::{ +use polkadot_primitives::v1::{GroupIndex, Hash, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex}; +use polkadot_node_subsystem::SubsystemContext; + +use crate::{ request_session_index_for_child, request_session_info, }; -use polkadot_primitives::v1::{GroupIndex, Hash, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex}; -use polkadot_subsystem::SubsystemContext; -use super::{ - error::recv_runtime, - Error, -}; +/// Errors that can happen on runtime fetches. +mod error; -/// Caching of session info as needed by availability distribution. +use error::{recv_runtime, Result}; +pub use error::Error; + +/// Caching of session info. /// /// It should be ensured that a cached session stays live in the cache as long as we might need it. pub struct Runtime { @@ -72,8 +74,8 @@ impl Runtime { /// Create a new `Runtime` for convenient runtime fetches. pub fn new(keystore: SyncCryptoStorePtr) -> Self { Self { - // 5 relatively conservative, 1 to 2 should suffice: - session_index_cache: LruCache::new(5), + // Adjust, depending on how many forks we want to support. + session_index_cache: LruCache::new(10), // We need to cache the current and the last session the most: session_info_cache: LruCache::new(2), keystore, @@ -85,7 +87,7 @@ impl Runtime { &mut self, ctx: &mut Context, parent: Hash, - ) -> Result + ) -> Result where Context: SubsystemContext, { @@ -106,7 +108,7 @@ impl Runtime { &'a mut self, ctx: &mut Context, parent: Hash, - ) -> Result<&'a ExtendedSessionInfo, Error> + ) -> Result<&'a ExtendedSessionInfo> where Context: SubsystemContext, { @@ -124,7 +126,7 @@ impl Runtime { ctx: &mut Context, parent: Hash, session_index: SessionIndex, - ) -> Result<&'a ExtendedSessionInfo, Error> + ) -> Result<&'a ExtendedSessionInfo> where Context: SubsystemContext, { @@ -155,7 +157,7 @@ impl Runtime { async fn get_validator_info( &self, session_info: &SessionInfo, - ) -> Result + ) -> Result { if let Some(our_index) = self.get_our_index(&session_info.validators).await { // Get our group index: diff --git a/polkadot/node/subsystem/src/messages/network_bridge_event.rs b/polkadot/node/subsystem/src/messages/network_bridge_event.rs index 608175998c..c0a58ada77 100644 --- a/polkadot/node/subsystem/src/messages/network_bridge_event.rs +++ b/polkadot/node/subsystem/src/messages/network_bridge_event.rs @@ -17,13 +17,15 @@ use std::convert::TryFrom; pub use sc_network::{ReputationChange, PeerId}; + use polkadot_node_network_protocol::{WrongVariant, ObservedRole, OurView, View}; +use polkadot_primitives::v1::AuthorityDiscoveryId; /// Events from network. #[derive(Debug, Clone, PartialEq)] pub enum NetworkBridgeEvent { /// A peer has connected. - PeerConnected(PeerId, ObservedRole), + PeerConnected(PeerId, ObservedRole, Option), /// A peer has disconnected. PeerDisconnected(PeerId), @@ -58,8 +60,8 @@ impl NetworkBridgeEvent { where T: 'a + Clone, &'a T: TryFrom<&'a M, Error = WrongVariant> { Ok(match *self { - NetworkBridgeEvent::PeerConnected(ref peer, ref role) - => NetworkBridgeEvent::PeerConnected(peer.clone(), role.clone()), + NetworkBridgeEvent::PeerConnected(ref peer, ref role, ref authority_id) + => NetworkBridgeEvent::PeerConnected(peer.clone(), role.clone(), authority_id.clone()), NetworkBridgeEvent::PeerDisconnected(ref peer) => NetworkBridgeEvent::PeerDisconnected(peer.clone()), NetworkBridgeEvent::PeerMessage(ref peer, ref msg)