diff --git a/polkadot/node/core/chain-api/src/lib.rs b/polkadot/node/core/chain-api/src/lib.rs index 3c7a2a7241..aa8b8ae6e9 100644 --- a/polkadot/node/core/chain-api/src/lib.rs +++ b/polkadot/node/core/chain-api/src/lib.rs @@ -44,7 +44,7 @@ use std::sync::Arc; use futures::prelude::*; -const LOG_TARGET: &str = "ChainApiSubsystem"; +const LOG_TARGET: &str = "chain_api"; /// The Chain API Subsystem implementation. pub struct ChainApiSubsystem { diff --git a/polkadot/node/core/runtime-api/src/lib.rs b/polkadot/node/core/runtime-api/src/lib.rs index c957302737..4dec90b33c 100644 --- a/polkadot/node/core/runtime-api/src/lib.rs +++ b/polkadot/node/core/runtime-api/src/lib.rs @@ -40,7 +40,7 @@ use sp_api::{ProvideRuntimeApi}; use futures::prelude::*; -const LOG_TARGET: &str = "RuntimeApi"; +const LOG_TARGET: &str = "runtime_api"; /// The `RuntimeApiSubsystem`. See module docs for more details. pub struct RuntimeApiSubsystem { diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 24d48acc9e..920075e042 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -52,7 +52,7 @@ use std::collections::{HashMap, HashSet}; use std::iter; use thiserror::Error; -const LOG_TARGET: &'static str = "AvailabilityDistribution"; +const LOG_TARGET: &'static str = "availability_distribution"; #[derive(Debug, Error)] enum Error { diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 97744c5c77..ed58cd9097 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -24,7 +24,7 @@ use parity_scale_codec::{Encode, Decode}; use futures::prelude::*; use futures::future::BoxFuture; use futures::stream::BoxStream; -use futures::channel::{mpsc, oneshot}; +use futures::channel::mpsc; use sc_network::Event as NetworkEvent; @@ -246,7 +246,6 @@ enum Action { ConnectToValidators { validator_ids: Vec, connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, - revoke: oneshot::Receiver<()>, }, ReportPeer(PeerId, ReputationChange), @@ -278,11 +277,8 @@ fn action_from_overseer_message( => Action::SendValidationMessage(peers, msg), NetworkBridgeMessage::SendCollationMessage(peers, msg) => Action::SendCollationMessage(peers, msg), - NetworkBridgeMessage::ConnectToValidators { - validator_ids, - connected, - revoke, - } => Action::ConnectToValidators { validator_ids, connected, revoke }, + NetworkBridgeMessage::ConnectToValidators { validator_ids, connected } + => Action::ConnectToValidators { validator_ids, connected }, }, Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_))) => Action::Nop, @@ -627,12 +623,10 @@ where Action::ConnectToValidators { validator_ids, connected, - revoke, } => { let (ns, ads) = validator_discovery.on_request( validator_ids, connected, - revoke, network_service, authority_discovery_service, ).await; diff --git a/polkadot/node/network/bridge/src/validator_discovery.rs b/polkadot/node/network/bridge/src/validator_discovery.rs index d0cc146191..71a3d4a566 100644 --- a/polkadot/node/network/bridge/src/validator_discovery.rs +++ b/polkadot/node/network/bridge/src/validator_discovery.rs @@ -21,7 +21,7 @@ use std::collections::{HashSet, HashMap, hash_map}; use std::sync::Arc; use async_trait::async_trait; -use futures::channel::{mpsc, oneshot}; +use futures::channel::mpsc; use sc_network::multiaddr::{Multiaddr, Protocol}; use sc_authority_discovery::Service as AuthorityDiscoveryService; @@ -29,7 +29,7 @@ use polkadot_node_network_protocol::PeerId; use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash}; const PRIORITY_GROUP: &'static str = "parachain_validators"; -const LOG_TARGET: &str = "ValidatorDiscovery"; +const LOG_TARGET: &str = "validator_discovery"; /// An abstraction over networking for the purposes of validator discovery service. #[async_trait] @@ -76,7 +76,6 @@ struct NonRevokedConnectionRequestState { requested: Vec, pending: HashSet, sender: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, - revoke: oneshot::Receiver<()>, } impl NonRevokedConnectionRequestState { @@ -85,13 +84,11 @@ impl NonRevokedConnectionRequestState { requested: Vec, pending: HashSet, sender: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, - revoke: oneshot::Receiver<()>, ) -> Self { Self { requested, pending, sender, - revoke, } } @@ -105,9 +102,7 @@ impl NonRevokedConnectionRequestState { /// Returns `true` if the request is revoked. pub fn is_revoked(&mut self) -> bool { - self.revoke - .try_recv() - .map_or(true, |r| r.is_some()) + self.sender.is_closed() } pub fn requested(&self) -> &[AuthorityDiscoveryId] { @@ -187,7 +182,6 @@ impl Service { if let Some(ids) = self.connected_peers.get_mut(&peer_id) { ids.insert(id.clone()); result.insert(id.clone(), peer_id.clone()); - continue; } } } @@ -203,12 +197,11 @@ impl Service { /// This method will also clean up all previously revoked requests. /// it takes `network_service` and `authority_discovery_service` by value /// and returns them as a workaround for the Future: Send requirement imposed by async fn impl. - #[tracing::instrument(level = "trace", skip(self, connected, revoke, network_service, authority_discovery_service), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(level = "trace", skip(self, connected, network_service, authority_discovery_service), fields(subsystem = LOG_TARGET))] pub async fn on_request( &mut self, validator_ids: Vec, mut connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, - revoke: oneshot::Receiver<()>, mut network_service: N, mut authority_discovery_service: AD, ) -> (N, AD) { @@ -276,7 +269,7 @@ impl Service { for id in revoked_validators.into_iter() { let result = authority_discovery_service.get_addresses_by_authority_id(id).await; if let Some(addresses) = result { - multiaddr_to_remove.extend(addresses.into_iter().take(MAX_ADDR_PER_PEER)); + multiaddr_to_remove.extend(addresses.into_iter()); } } @@ -300,7 +293,6 @@ impl Service { validator_ids, pending, connected, - revoke, )); (network_service, authority_discovery_service) @@ -418,39 +410,18 @@ mod tests { } #[test] - fn request_is_revoked_on_send() { - let (revoke_tx, revoke_rx) = oneshot::channel(); - let (sender, _receiver) = mpsc::channel(0); + fn request_is_revoked_when_the_receiver_is_dropped() { + let (sender, receiver) = mpsc::channel(0); let mut request = NonRevokedConnectionRequestState::new( Vec::new(), HashSet::new(), sender, - revoke_rx, ); assert!(!request.is_revoked()); - revoke_tx.send(()).unwrap(); - - assert!(request.is_revoked()); - } - - #[test] - fn request_is_revoked_when_the_sender_is_dropped() { - let (revoke_tx, revoke_rx) = oneshot::channel(); - let (sender, _receiver) = mpsc::channel(0); - - let mut request = NonRevokedConnectionRequestState::new( - Vec::new(), - HashSet::new(), - sender, - revoke_rx, - ); - - assert!(!request.is_revoked()); - - drop(revoke_tx); + drop(receiver); assert!(request.is_revoked()); } @@ -467,14 +438,12 @@ mod tests { futures::executor::block_on(async move { let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()]; let (sender, mut receiver) = mpsc::channel(2); - let (_revoke_tx, revoke_rx) = oneshot::channel(); service.on_peer_connected(&peer_ids[0], &mut ads).await; let _ = service.on_request( req1, sender, - revoke_rx, ns, ads, ).await; @@ -499,12 +468,10 @@ mod tests { futures::executor::block_on(async move { let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()]; let (sender, mut receiver) = mpsc::channel(2); - let (_revoke_tx, revoke_rx) = oneshot::channel(); let (_, mut ads) = service.on_request( req1, sender, - revoke_rx, ns, ads, ).await; @@ -534,7 +501,6 @@ mod tests { futures::executor::block_on(async move { let (sender, mut receiver) = mpsc::channel(1); - let (revoke_tx, revoke_rx) = oneshot::channel(); service.on_peer_connected(&peer_ids[0], &mut ads).await; service.on_peer_connected(&peer_ids[1], &mut ads).await; @@ -542,22 +508,19 @@ mod tests { let (ns, ads) = service.on_request( vec![authority_ids[0].clone()], sender, - revoke_rx, ns, ads, ).await; let _ = receiver.next().await.unwrap(); // revoke the request - revoke_tx.send(()).unwrap(); + drop(receiver); let (sender, mut receiver) = mpsc::channel(1); - let (_revoke_tx, revoke_rx) = oneshot::channel(); let _ = service.on_request( vec![authority_ids[1].clone()], sender, - revoke_rx, ns, ads, ).await; @@ -581,7 +544,6 @@ mod tests { futures::executor::block_on(async move { let (sender, mut receiver) = mpsc::channel(1); - let (revoke_tx, revoke_rx) = oneshot::channel(); service.on_peer_connected(&peer_ids[0], &mut ads).await; service.on_peer_connected(&peer_ids[1], &mut ads).await; @@ -589,22 +551,19 @@ mod tests { let (ns, ads) = service.on_request( vec![authority_ids[0].clone(), authority_ids[2].clone()], sender, - revoke_rx, ns, ads, ).await; let _ = receiver.next().await.unwrap(); // revoke the first request - revoke_tx.send(()).unwrap(); + drop(receiver); let (sender, mut receiver) = mpsc::channel(1); - let (revoke_tx, revoke_rx) = oneshot::channel(); let (ns, ads) = service.on_request( vec![authority_ids[0].clone(), authority_ids[1].clone()], sender, - revoke_rx, ns, ads, ).await; @@ -614,15 +573,13 @@ mod tests { assert_eq!(ns.priority_group.len(), 2); // revoke the second request - revoke_tx.send(()).unwrap(); + drop(receiver); let (sender, mut receiver) = mpsc::channel(1); - let (_revoke_tx, revoke_rx) = oneshot::channel(); let (ns, _) = service.on_request( vec![authority_ids[0].clone()], sender, - revoke_rx, ns, ads, ).await; @@ -647,7 +604,6 @@ mod tests { futures::executor::block_on(async move { let (sender, mut receiver) = mpsc::channel(1); - let (_revoke_tx, revoke_rx) = oneshot::channel(); service.on_peer_connected(&validator_peer_id, &mut ads).await; @@ -658,7 +614,6 @@ mod tests { let _ = service.on_request( vec![validator_id.clone()], sender, - revoke_rx, ns, ads, ).await; diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 853221e0d7..5a2d48003a 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -349,7 +349,7 @@ where Context: SubsystemContext { if let Some(request) = state.last_connection_request.take() { - request.revoke(); + drop(request); } let request = validator_discovery::connect_to_validators( diff --git a/polkadot/node/subsystem-util/src/validator_discovery.rs b/polkadot/node/subsystem-util/src/validator_discovery.rs index fe0df2b17e..0952ad048c 100644 --- a/polkadot/node/subsystem-util/src/validator_discovery.rs +++ b/polkadot/node/subsystem-util/src/validator_discovery.rs @@ -76,33 +76,30 @@ pub async fn connect_to_validators( .filter_map(|(k, v)| v.map(|v| (v, k))) .collect::>(); - let (connections, revoke) = connect_to_authorities(ctx, authorities).await?; + let connections = connect_to_authorities(ctx, authorities).await?; Ok(ConnectionRequest { validator_map, connections, - revoke, }) } async fn connect_to_authorities( ctx: &mut Context, validator_ids: Vec, -) -> Result<(mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>, oneshot::Sender<()>), Error> { +) -> Result, Error> { const PEERS_CAPACITY: usize = 8; - let (revoke_tx, revoke) = oneshot::channel(); let (connected, connected_rx) = mpsc::channel(PEERS_CAPACITY); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::ConnectToValidators { validator_ids, connected, - revoke, } )).await?; - Ok((connected_rx, revoke_tx)) + Ok(connected_rx) } /// A struct that assists performing multiple concurrent connection requests. @@ -176,15 +173,12 @@ impl stream::Stream for ConnectionRequests { /// This struct implements `Stream` to allow for asynchronous /// discovery of validator addresses. /// -/// NOTE: you should call `revoke` on this struct -/// when you're no longer interested in the requested validators. +/// NOTE: the request will be revoked on drop. #[must_use = "dropping a request will result in its immediate revokation"] pub struct ConnectionRequest { validator_map: HashMap, #[must_use = "streams do nothing unless polled"] connections: mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>, - #[must_use = "a request should be revoked at some point"] - revoke: oneshot::Sender<()>, } impl stream::Stream for ConnectionRequest { @@ -209,29 +203,13 @@ impl stream::Stream for ConnectionRequest { } } -impl ConnectionRequest { - /// By revoking the request the caller allows the network to - /// free some peer slots thus freeing the resources. - /// It doesn't necessarily lead to peers disconnection though. - /// The revokation is enacted on in the next connection request. - /// - /// This can be done either by calling this function or dropping the request. - pub fn revoke(self) { - if let Err(_) = self.revoke.send(()) { - tracing::warn!( - "Failed to revoke a validator connection request", - ); - } - } -} - #[cfg(test)] mod tests { use super::*; use polkadot_primitives::v1::ValidatorPair; use sp_core::{Pair, Public}; - use futures::{executor, poll, channel::{mpsc, oneshot}, StreamExt, SinkExt}; + use futures::{executor, poll, StreamExt, SinkExt}; #[test] fn adding_a_connection_request_works() { @@ -251,7 +229,6 @@ mod tests { validator_map.insert(auth_2.clone(), validator_2.clone()); let (mut rq1_tx, rq1_rx) = mpsc::channel(8); - let (revoke_1_tx, _revoke_1_rx) = oneshot::channel(); let peer_id_1 = PeerId::random(); let peer_id_2 = PeerId::random(); @@ -259,7 +236,6 @@ mod tests { let connection_request_1 = ConnectionRequest { validator_map, connections: rq1_rx, - revoke: revoke_1_tx, }; let relay_parent_1 = Hash::repeat_byte(1); @@ -302,10 +278,8 @@ mod tests { validator_map_2.insert(auth_2.clone(), validator_2.clone()); let (mut rq1_tx, rq1_rx) = mpsc::channel(8); - let (revoke_1_tx, _revoke_1_rx) = oneshot::channel(); let (mut rq2_tx, rq2_rx) = mpsc::channel(8); - let (revoke_2_tx, _revoke_2_rx) = oneshot::channel(); let peer_id_1 = PeerId::random(); let peer_id_2 = PeerId::random(); @@ -313,13 +287,11 @@ mod tests { let connection_request_1 = ConnectionRequest { validator_map: validator_map_1, connections: rq1_rx, - revoke: revoke_1_tx, }; let connection_request_2 = ConnectionRequest { validator_map: validator_map_2, connections: rq2_rx, - revoke: revoke_2_tx, }; let relay_parent_1 = Hash::repeat_byte(1); @@ -364,10 +336,8 @@ mod tests { validator_map_2.insert(auth_2.clone(), validator_2.clone()); let (mut rq1_tx, rq1_rx) = mpsc::channel(8); - let (revoke_1_tx, _revoke_1_rx) = oneshot::channel(); let (mut rq2_tx, rq2_rx) = mpsc::channel(8); - let (revoke_2_tx, _revoke_2_rx) = oneshot::channel(); let peer_id_1 = PeerId::random(); let peer_id_2 = PeerId::random(); @@ -375,13 +345,11 @@ mod tests { let connection_request_1 = ConnectionRequest { validator_map: validator_map_1, connections: rq1_rx, - revoke: revoke_1_tx, }; let connection_request_2 = ConnectionRequest { validator_map: validator_map_2, connections: rq2_rx, - revoke: revoke_2_tx, }; let relay_parent = Hash::repeat_byte(3); diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 0d395f2167..87d2981b54 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -208,6 +208,7 @@ pub enum NetworkBridgeMessage { /// /// Also ask the network to stay connected to these peers at least /// until the request is revoked. + /// This can be done by dropping the receiver. ConnectToValidators { /// Ids of the validators to connect to. validator_ids: Vec, @@ -215,13 +216,6 @@ pub enum NetworkBridgeMessage { /// the validators as they are connected. /// The response is sent immediately for already connected peers. connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, - /// By revoking the request the caller allows the network to - /// free some peer slots thus freeing the resources. - /// It doesn't necessarily lead to peers disconnection though. - /// The revokation is enacted on in the next connection request. - /// - /// This can be done by sending to the channel or dropping the sender. - revoke: oneshot::Receiver<()>, }, } diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md index e0ab023a16..3b4b0d5b02 100644 --- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -54,8 +54,8 @@ enum ApprovalVotingMessage { /// Check if the assignment is valid and can be accepted by our view of the protocol. /// Should not be sent unless the block hash is known. CheckAndImportAssignment( - Hash, - AssignmentCert, + Hash, + AssignmentCert, ValidatorIndex, ResponseChannel, ), @@ -68,11 +68,11 @@ enum ApprovalVotingMessage { ResponseChannel, ), /// Returns the highest possible ancestor hash of the provided block hash which is - /// acceptable to vote on finality for. + /// acceptable to vote on finality for. /// The `BlockNumber` provided is the number of the block's ancestor which is the /// earliest possible vote. - /// - /// It can also return the same block hash, if that is acceptable to vote upon. + /// + /// It can also return the same block hash, if that is acceptable to vote upon. /// Return `None` if the input hash is unrecognized. ApprovedAncestor(Hash, BlockNumber, ResponseChannel>), } @@ -122,8 +122,8 @@ Messages received by the availability recovery subsystem. enum AvailabilityRecoveryMessage { /// Recover available data from validators on the network. RecoverAvailableData( - CandidateDescriptor, - SessionIndex, + CandidateDescriptor, + SessionIndex, ResponseChannel>, ), } @@ -293,6 +293,7 @@ enum NetworkBridgeMessage { /// /// Also ask the network to stay connected to these peers at least /// until the request is revoked. + /// This can be done by dropping the receiver. ConnectToValidators { /// Ids of the validators to connect to. validator_ids: Vec, @@ -300,13 +301,6 @@ enum NetworkBridgeMessage { /// the validators as they are connected. /// The response is sent immediately for already connected peers. connected: ResponseStream<(AuthorityDiscoveryId, PeerId)>, - /// By revoking the request the caller allows the network to - /// free some peer slots thus freeing the resources. - /// It doesn't necessarily lead to peers disconnection though. - /// The revokation is enacted on in the next connection request. - /// - /// This can be done by sending to the channel or dropping the sender. - revoke: ReceiverChannel<()>, }, } ``` @@ -409,7 +403,7 @@ enum RuntimeApiRequest { SessionIndex(ResponseChannel), /// Get the validation code for a specific para, using the given occupied core assumption. ValidationCode(ParaId, OccupiedCoreAssumption, ResponseChannel>), - /// Fetch the historical validation code used by a para for candidates executed in + /// Fetch the historical validation code used by a para for candidates executed in /// the context of a given block height in the current chain. HistoricalValidationCode(ParaId, BlockNumber, ResponseChannel>), /// with the given occupied core assumption.