mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 06:21:11 +00:00
Generic request/response infrastructure for Polkadot (#2352)
* Move NetworkBridgeEvent to subsystem::messages. It is not protocol related at all, it is in fact only part of the subsystem communication as it gets wrapped into messages of each subsystem. * Request/response infrastructure is taking shape. WIP: Does not compile. * Multiplexer variant not supported by Rusts type system. * request_response::request type checks. * Cleanup. * Minor fixes for request_response. * Implement request sending + move multiplexer. Request multiplexer is moved to bridge, as there the implementation is more straight forward as we can specialize on `AllMessages` for the multiplexing target. Sending of requests is mostly complete, apart from a few `From` instances. Receiving is also almost done, initializtion needs to be fixed and the multiplexer needs to be invoked. * Remove obsolete multiplexer. * Initialize bridge with multiplexer. * Finish generic request sending/receiving. Subsystems are now able to receive and send requests and responses via the overseer. * Doc update. * Fixes. * Link issue for not yet implemented code. * Fixes suggested by @ordian - thanks! - start encoding at 0 - don't crash on zero protocols - don't panic on not yet implemented request handling * Update node/network/protocol/src/request_response/v1.rs Use index 0 instead of 1. Co-authored-by: Andronik Ordian <write@reusable.software> * Update node/network/protocol/src/request_response.rs Co-authored-by: Andronik Ordian <write@reusable.software> * Fix existing tests. * Better avoidance of division by zoro errors. * Doc fixes. * send_request -> start_request. * Fix missing renamings. * Update substrate. * Pass TryConnect instead of true. * Actually import `IfDisconnected`. * Fix wrong import. * Update node/network/bridge/src/lib.rs typo Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com> * Update node/network/bridge/src/multiplexer.rs Remove redundant import. Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com> * Stop doing tracing from within `From` instance. Thanks for the catch @tomaka! * Get rid of redundant import. * Formatting cleanup. * Fix tests. * Add link to issue. * Clarify comments some more. * Fix tests. * Formatting fix. * tabs * Fix link Co-authored-by: Bernhard Schuster <bernhard@ahoi.io> * Use map_err. Co-authored-by: Bernhard Schuster <bernhard@ahoi.io> * Improvements inspired by suggestions by @drahnr. - Channel size is now determined by function. - Explicitely scope NetworkService::start_request. Co-authored-by: Andronik Ordian <write@reusable.software> Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com> Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
This commit is contained in:
@@ -22,13 +22,14 @@ use polkadot_node_network_protocol::{
|
||||
peer_set::PeerSet, v1 as protocol_v1, PeerId, ReputationChange,
|
||||
};
|
||||
use polkadot_primitives::v1::{AuthorityDiscoveryId, BlockNumber};
|
||||
use polkadot_subsystem::messages::NetworkBridgeMessage;
|
||||
use polkadot_subsystem::messages::{AllMessages, NetworkBridgeMessage};
|
||||
use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal};
|
||||
use sc_network::Event as NetworkEvent;
|
||||
|
||||
use polkadot_node_network_protocol::ObservedRole;
|
||||
use polkadot_node_network_protocol::{request_response::Requests, ObservedRole};
|
||||
|
||||
use super::{WireMessage, LOG_TARGET, MALFORMED_MESSAGE_COST};
|
||||
use super::multiplexer::RequestMultiplexError;
|
||||
use super::{WireMessage, MALFORMED_MESSAGE_COST};
|
||||
|
||||
/// Internal type combining all actions a `NetworkBridge` might perform.
|
||||
///
|
||||
@@ -43,6 +44,9 @@ pub(crate) enum Action {
|
||||
/// Ask network to send a collation message.
|
||||
SendCollationMessages(Vec<(Vec<PeerId>, protocol_v1::CollationProtocol)>),
|
||||
|
||||
/// Ask network to send requests.
|
||||
SendRequests(Vec<Requests>),
|
||||
|
||||
/// Ask network to connect to validators.
|
||||
ConnectToValidators {
|
||||
validator_ids: Vec<AuthorityDiscoveryId>,
|
||||
@@ -76,13 +80,32 @@ pub(crate) enum Action {
|
||||
Vec<WireMessage<protocol_v1::CollationProtocol>>,
|
||||
),
|
||||
|
||||
Abort,
|
||||
/// Send a message to another subsystem or the overseer.
|
||||
///
|
||||
/// Used for handling incoming requests.
|
||||
SendMessage(AllMessages),
|
||||
|
||||
/// Abort with reason.
|
||||
Abort(AbortReason),
|
||||
Nop,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum AbortReason {
|
||||
/// Received error from overseer:
|
||||
SubsystemError(polkadot_subsystem::SubsystemError),
|
||||
/// The stream of incoming events concluded.
|
||||
EventStreamConcluded,
|
||||
/// The stream of incoming requests concluded.
|
||||
RequestStreamConcluded,
|
||||
/// We received OverseerSignal::Conclude
|
||||
OverseerConcluded,
|
||||
}
|
||||
|
||||
impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>> for Action {
|
||||
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
|
||||
fn from(res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>) -> Self {
|
||||
fn from(
|
||||
res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>,
|
||||
) -> Self {
|
||||
match res {
|
||||
Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves))) => {
|
||||
Action::ActiveLeaves(active_leaves)
|
||||
@@ -90,7 +113,9 @@ impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>
|
||||
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number))) => {
|
||||
Action::BlockFinalized(number)
|
||||
}
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort,
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => {
|
||||
Action::Abort(AbortReason::OverseerConcluded)
|
||||
}
|
||||
Ok(FromOverseer::Communication { msg }) => match msg {
|
||||
NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
|
||||
NetworkBridgeMessage::SendValidationMessage(peers, msg) => {
|
||||
@@ -99,6 +124,7 @@ impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>
|
||||
NetworkBridgeMessage::SendCollationMessage(peers, msg) => {
|
||||
Action::SendCollationMessages(vec![(peers, msg)])
|
||||
}
|
||||
NetworkBridgeMessage::SendRequests(reqs) => Action::SendRequests(reqs),
|
||||
NetworkBridgeMessage::SendValidationMessages(msgs) => {
|
||||
Action::SendValidationMessages(msgs)
|
||||
}
|
||||
@@ -113,25 +139,15 @@ impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>
|
||||
connected,
|
||||
},
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::warn!(target: LOG_TARGET, err = ?e, "Shutting down Network Bridge due to error");
|
||||
Action::Abort
|
||||
}
|
||||
Err(e) => Action::Abort(AbortReason::SubsystemError(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Option<NetworkEvent>> for Action {
|
||||
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
|
||||
fn from(event: Option<NetworkEvent>) -> Action {
|
||||
match event {
|
||||
None => {
|
||||
tracing::info!(
|
||||
target: LOG_TARGET,
|
||||
"Shutting down Network Bridge: underlying event stream concluded"
|
||||
);
|
||||
Action::Abort
|
||||
}
|
||||
None => Action::Abort(AbortReason::EventStreamConcluded),
|
||||
Some(NetworkEvent::Dht(_))
|
||||
| Some(NetworkEvent::SyncConnected { .. })
|
||||
| Some(NetworkEvent::SyncDisconnected { .. }) => Action::Nop,
|
||||
@@ -153,7 +169,9 @@ impl From<Option<NetworkEvent>> for Action {
|
||||
Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
|
||||
let v_messages: Result<Vec<_>, _> = messages
|
||||
.iter()
|
||||
.filter(|(protocol, _)| protocol == &PeerSet::Validation.into_protocol_name())
|
||||
.filter(|(protocol, _)| {
|
||||
protocol == &PeerSet::Validation.into_protocol_name()
|
||||
})
|
||||
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
|
||||
.collect();
|
||||
|
||||
@@ -164,7 +182,9 @@ impl From<Option<NetworkEvent>> for Action {
|
||||
|
||||
let c_messages: Result<Vec<_>, _> = messages
|
||||
.iter()
|
||||
.filter(|(protocol, _)| protocol == &PeerSet::Collation.into_protocol_name())
|
||||
.filter(|(protocol, _)| {
|
||||
protocol == &PeerSet::Collation.into_protocol_name()
|
||||
})
|
||||
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
|
||||
.collect();
|
||||
|
||||
@@ -182,3 +202,13 @@ impl From<Option<NetworkEvent>> for Action {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Option<Result<AllMessages, RequestMultiplexError>>> for Action {
|
||||
fn from(event: Option<Result<AllMessages, RequestMultiplexError>>) -> Self {
|
||||
match event {
|
||||
None => Action::Abort(AbortReason::RequestStreamConcluded),
|
||||
Some(Err(err)) => Action::ReportPeer(err.peer, MALFORMED_MESSAGE_COST),
|
||||
Some(Ok(msg)) => Action::SendMessage(msg),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,11 +30,11 @@ use polkadot_subsystem::{
|
||||
use polkadot_subsystem::messages::{
|
||||
NetworkBridgeMessage, AllMessages, AvailabilityDistributionMessage,
|
||||
BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage,
|
||||
CollatorProtocolMessage, ApprovalDistributionMessage,
|
||||
CollatorProtocolMessage, ApprovalDistributionMessage, NetworkBridgeEvent,
|
||||
};
|
||||
use polkadot_primitives::v1::{Hash, BlockNumber};
|
||||
use polkadot_node_network_protocol::{
|
||||
ReputationChange, PeerId, peer_set::PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1, OurView,
|
||||
ReputationChange, PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView,
|
||||
};
|
||||
|
||||
/// Peer set infos for network initialization.
|
||||
@@ -53,7 +53,7 @@ mod validator_discovery;
|
||||
/// All requested `NetworkBridgeMessage` user actions and `NetworkEvent` network messages are
|
||||
/// translated to `Action` before being processed by `run_network`.
|
||||
mod action;
|
||||
use action::Action;
|
||||
use action::{Action, AbortReason};
|
||||
|
||||
/// Actual interfacing to the network based on the `Network` trait.
|
||||
///
|
||||
@@ -61,6 +61,10 @@ use action::Action;
|
||||
mod network;
|
||||
use network::{Network, send_message};
|
||||
|
||||
/// Request multiplexer for combining the multiple request sources into a single `Stream` of `AllMessages`.
|
||||
mod multiplexer;
|
||||
pub use multiplexer::RequestMultiplexer;
|
||||
|
||||
|
||||
/// The maximum amount of heads a peer is allowed to have in their view at any time.
|
||||
///
|
||||
@@ -95,6 +99,7 @@ pub struct NetworkBridge<N, AD> {
|
||||
/// `Network` trait implementing type.
|
||||
network_service: N,
|
||||
authority_discovery_service: AD,
|
||||
request_multiplexer: RequestMultiplexer,
|
||||
}
|
||||
|
||||
impl<N, AD> NetworkBridge<N, AD> {
|
||||
@@ -102,10 +107,11 @@ impl<N, AD> NetworkBridge<N, AD> {
|
||||
///
|
||||
/// This assumes that the network service has had the notifications protocol for the network
|
||||
/// bridge already registered. See [`peers_sets_info`](peers_sets_info).
|
||||
pub fn new(network_service: N, authority_discovery_service: AD) -> Self {
|
||||
pub fn new(network_service: N, authority_discovery_service: AD, request_multiplexer: RequestMultiplexer) -> Self {
|
||||
NetworkBridge {
|
||||
network_service,
|
||||
authority_discovery_service,
|
||||
request_multiplexer,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -119,12 +125,7 @@ impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD>
|
||||
fn start(self, ctx: Context) -> SpawnedSubsystem {
|
||||
// Swallow error because failure is fatal to the node and we log with more precision
|
||||
// within `run_network`.
|
||||
let Self { network_service, authority_discovery_service } = self;
|
||||
let future = run_network(
|
||||
network_service,
|
||||
authority_discovery_service,
|
||||
ctx,
|
||||
)
|
||||
let future = run_network(self, ctx)
|
||||
.map_err(|e| {
|
||||
SubsystemError::with_origin("network-bridge", e)
|
||||
})
|
||||
@@ -142,17 +143,16 @@ struct PeerData {
|
||||
}
|
||||
|
||||
/// Main driver, processing network events and messages from other subsystems.
|
||||
#[tracing::instrument(skip(network_service, authority_discovery_service, ctx), fields(subsystem = LOG_TARGET))]
|
||||
#[tracing::instrument(skip(bridge, ctx), fields(subsystem = LOG_TARGET))]
|
||||
async fn run_network<N, AD>(
|
||||
mut network_service: N,
|
||||
mut authority_discovery_service: AD,
|
||||
mut bridge: NetworkBridge<N, AD>,
|
||||
mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
|
||||
) -> SubsystemResult<()>
|
||||
where
|
||||
N: Network + validator_discovery::Network,
|
||||
AD: validator_discovery::AuthorityDiscovery,
|
||||
{
|
||||
let mut event_stream = network_service.event_stream().fuse();
|
||||
let mut event_stream = bridge.network_service.event_stream().fuse();
|
||||
|
||||
// Most recent heads are at the back.
|
||||
let mut live_heads: Vec<(Hash, Arc<JaegerSpan>)> = Vec::with_capacity(MAX_VIEW_HEADS);
|
||||
@@ -169,22 +169,55 @@ where
|
||||
let action = {
|
||||
let subsystem_next = ctx.recv().fuse();
|
||||
let mut net_event_next = event_stream.next().fuse();
|
||||
let mut req_res_event_next = bridge.request_multiplexer.next().fuse();
|
||||
futures::pin_mut!(subsystem_next);
|
||||
|
||||
futures::select! {
|
||||
subsystem_msg = subsystem_next => Action::from(subsystem_msg),
|
||||
net_event = net_event_next => Action::from(net_event),
|
||||
req_res_event = req_res_event_next => Action::from(req_res_event),
|
||||
}
|
||||
};
|
||||
|
||||
match action {
|
||||
Action::Nop => {}
|
||||
Action::Abort => return Ok(()),
|
||||
Action::Abort(reason) => match reason {
|
||||
AbortReason::SubsystemError(err) => {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
err = ?err,
|
||||
"Shutting down Network Bridge due to error"
|
||||
);
|
||||
return Err(SubsystemError::Context(format!(
|
||||
"Received SubsystemError from overseer: {:?}",
|
||||
err
|
||||
)));
|
||||
}
|
||||
AbortReason::EventStreamConcluded => {
|
||||
tracing::info!(
|
||||
target: LOG_TARGET,
|
||||
"Shutting down Network Bridge: underlying request stream concluded"
|
||||
);
|
||||
return Err(SubsystemError::Context(
|
||||
"Incoming network event stream concluded.".to_string(),
|
||||
));
|
||||
}
|
||||
AbortReason::RequestStreamConcluded => {
|
||||
tracing::info!(
|
||||
target: LOG_TARGET,
|
||||
"Shutting down Network Bridge: underlying request stream concluded"
|
||||
);
|
||||
return Err(SubsystemError::Context(
|
||||
"Incoming network request stream concluded".to_string(),
|
||||
));
|
||||
}
|
||||
AbortReason::OverseerConcluded => return Ok(()),
|
||||
}
|
||||
|
||||
Action::SendValidationMessages(msgs) => {
|
||||
for (peers, msg) in msgs {
|
||||
send_message(
|
||||
&mut network_service,
|
||||
&mut bridge.network_service,
|
||||
peers,
|
||||
PeerSet::Validation,
|
||||
WireMessage::ProtocolMessage(msg),
|
||||
@@ -195,7 +228,7 @@ where
|
||||
Action::SendCollationMessages(msgs) => {
|
||||
for (peers, msg) in msgs {
|
||||
send_message(
|
||||
&mut network_service,
|
||||
&mut bridge.network_service,
|
||||
peers,
|
||||
PeerSet::Collation,
|
||||
WireMessage::ProtocolMessage(msg),
|
||||
@@ -203,6 +236,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
Action::SendRequests(reqs) => {
|
||||
for req in reqs {
|
||||
bridge.network_service.start_request(req);
|
||||
}
|
||||
},
|
||||
|
||||
Action::ConnectToValidators {
|
||||
validator_ids,
|
||||
connected,
|
||||
@@ -210,21 +249,28 @@ where
|
||||
let (ns, ads) = validator_discovery.on_request(
|
||||
validator_ids,
|
||||
connected,
|
||||
network_service,
|
||||
authority_discovery_service,
|
||||
bridge.network_service,
|
||||
bridge.authority_discovery_service,
|
||||
).await;
|
||||
network_service = ns;
|
||||
authority_discovery_service = ads;
|
||||
bridge.network_service = ns;
|
||||
bridge.authority_discovery_service = ads;
|
||||
},
|
||||
|
||||
Action::ReportPeer(peer, rep) => network_service.report_peer(peer, rep).await?,
|
||||
Action::ReportPeer(peer, rep) => {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
peer = ?peer,
|
||||
"Peer sent us an invalid request",
|
||||
);
|
||||
bridge.network_service.report_peer(peer, rep).await?
|
||||
}
|
||||
|
||||
Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
|
||||
live_heads.extend(activated);
|
||||
live_heads.retain(|h| !deactivated.contains(&h.0));
|
||||
|
||||
update_our_view(
|
||||
&mut network_service,
|
||||
&mut bridge.network_service,
|
||||
&mut ctx,
|
||||
&live_heads,
|
||||
&mut local_view,
|
||||
@@ -250,7 +296,7 @@ where
|
||||
PeerSet::Collation => &mut collation_peers,
|
||||
};
|
||||
|
||||
validator_discovery.on_peer_connected(&peer, &mut authority_discovery_service).await;
|
||||
validator_discovery.on_peer_connected(&peer, &mut bridge.authority_discovery_service).await;
|
||||
|
||||
match peer_map.entry(peer.clone()) {
|
||||
hash_map::Entry::Occupied(_) => continue,
|
||||
@@ -311,7 +357,7 @@ where
|
||||
peer.clone(),
|
||||
&mut validation_peers,
|
||||
v_messages,
|
||||
&mut network_service,
|
||||
&mut bridge.network_service,
|
||||
).await?;
|
||||
|
||||
dispatch_validation_events_to_all(events, &mut ctx).await;
|
||||
@@ -322,12 +368,13 @@ where
|
||||
peer.clone(),
|
||||
&mut collation_peers,
|
||||
c_messages,
|
||||
&mut network_service,
|
||||
&mut bridge.network_service,
|
||||
).await?;
|
||||
|
||||
dispatch_collation_events_to_all(events, &mut ctx).await;
|
||||
}
|
||||
},
|
||||
Action::SendMessage(msg) => ctx.send_message(msg).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -562,9 +609,10 @@ mod tests {
|
||||
use polkadot_node_subsystem_util::metered;
|
||||
use polkadot_node_network_protocol::view;
|
||||
use sc_network::Multiaddr;
|
||||
use sc_network::config::RequestResponseConfig;
|
||||
use sp_keyring::Sr25519Keyring;
|
||||
use polkadot_primitives::v1::AuthorityDiscoveryId;
|
||||
use polkadot_node_network_protocol::ObservedRole;
|
||||
use polkadot_node_network_protocol::{ObservedRole, request_response::request::Requests};
|
||||
|
||||
use crate::network::{Network, NetworkAction};
|
||||
|
||||
@@ -572,6 +620,7 @@ mod tests {
|
||||
struct TestNetwork {
|
||||
net_events: Arc<Mutex<Option<SingleItemStream<NetworkEvent>>>>,
|
||||
action_tx: metered::UnboundedMeteredSender<NetworkAction>,
|
||||
_req_configs: Vec<RequestResponseConfig>,
|
||||
}
|
||||
|
||||
struct TestAuthorityDiscovery;
|
||||
@@ -583,7 +632,7 @@ mod tests {
|
||||
net_tx: SingleItemSink<NetworkEvent>,
|
||||
}
|
||||
|
||||
fn new_test_network() -> (
|
||||
fn new_test_network(req_configs: Vec<RequestResponseConfig>) -> (
|
||||
TestNetwork,
|
||||
TestNetworkHandle,
|
||||
TestAuthorityDiscovery,
|
||||
@@ -595,6 +644,7 @@ mod tests {
|
||||
TestNetwork {
|
||||
net_events: Arc::new(Mutex::new(Some(net_rx))),
|
||||
action_tx,
|
||||
_req_configs: req_configs,
|
||||
},
|
||||
TestNetworkHandle {
|
||||
action_rx,
|
||||
@@ -617,6 +667,9 @@ mod tests {
|
||||
{
|
||||
Box::pin((&mut self.action_tx).sink_map_err(Into::into))
|
||||
}
|
||||
|
||||
fn start_request(&self, _: Requests) {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -698,12 +751,18 @@ mod tests {
|
||||
|
||||
fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) {
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (network, network_handle, discovery) = new_test_network();
|
||||
let (request_multiplexer, req_configs) = RequestMultiplexer::new();
|
||||
let (network, network_handle, discovery) = new_test_network(req_configs);
|
||||
let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
|
||||
|
||||
let bridge = NetworkBridge {
|
||||
network_service: network,
|
||||
authority_discovery_service: discovery,
|
||||
request_multiplexer,
|
||||
};
|
||||
|
||||
let network_bridge = run_network(
|
||||
network,
|
||||
discovery,
|
||||
bridge,
|
||||
context,
|
||||
)
|
||||
.map_err(|_| panic!("subsystem execution failed"))
|
||||
|
||||
@@ -0,0 +1,132 @@
|
||||
// Copyright 2021 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::pin::Pin;
|
||||
|
||||
use futures::channel::mpsc;
|
||||
use futures::stream::Stream;
|
||||
use futures::task::{Context, Poll};
|
||||
use strum::IntoEnumIterator;
|
||||
|
||||
use parity_scale_codec::{Decode, Error as DecodingError};
|
||||
|
||||
use sc_network::config as network;
|
||||
use sc_network::PeerId;
|
||||
|
||||
use polkadot_node_network_protocol::request_response::{
|
||||
request::IncomingRequest, v1, Protocol, RequestResponseConfig,
|
||||
};
|
||||
use polkadot_subsystem::messages::AllMessages;
|
||||
|
||||
/// Multiplex incoming network requests.
|
||||
///
|
||||
/// This multiplexer consumes all request streams and makes them a `Stream` of a single message
|
||||
/// type, useful for the network bridge to send them via the `Overseer` to other subsystems.
|
||||
pub struct RequestMultiplexer {
|
||||
receivers: Vec<(Protocol, mpsc::Receiver<network::IncomingRequest>)>,
|
||||
next_poll: usize,
|
||||
}
|
||||
|
||||
/// Multiplexing can fail in case of invalid messages.
|
||||
pub struct RequestMultiplexError {
|
||||
/// The peer that sent the invalid message.
|
||||
pub peer: PeerId,
|
||||
/// The error that occurred.
|
||||
pub error: DecodingError,
|
||||
}
|
||||
|
||||
impl RequestMultiplexer {
|
||||
/// Create a new `RequestMultiplexer`.
|
||||
///
|
||||
/// This function uses `Protocol::get_config` for each available protocol and creates a
|
||||
/// `RequestMultiplexer` from it. The returned `RequestResponseConfig`s must be passed to the
|
||||
/// network implementation.
|
||||
pub fn new() -> (Self, Vec<RequestResponseConfig>) {
|
||||
let (receivers, cfgs): (Vec<_>, Vec<_>) = Protocol::iter()
|
||||
.map(|p| {
|
||||
let (rx, cfg) = p.get_config();
|
||||
((p, rx), cfg)
|
||||
})
|
||||
.unzip();
|
||||
|
||||
(
|
||||
Self {
|
||||
receivers,
|
||||
next_poll: 0,
|
||||
},
|
||||
cfgs,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for RequestMultiplexer {
|
||||
type Item = Result<AllMessages, RequestMultiplexError>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let len = self.receivers.len();
|
||||
let mut count = len;
|
||||
let mut i = self.next_poll;
|
||||
let mut result = Poll::Ready(None);
|
||||
// Poll streams in round robin fashion:
|
||||
while count > 0 {
|
||||
// % safe, because count initialized to len, loop would not be entered if 0, also
|
||||
// length of receivers is fixed.
|
||||
let (p, rx): &mut (_, _) = &mut self.receivers[i % len];
|
||||
i += 1;
|
||||
count -= 1;
|
||||
match Pin::new(rx).poll_next(cx) {
|
||||
// If at least one stream is pending, then we are not done yet (No
|
||||
// Ready(None)).
|
||||
Poll::Pending => result = Poll::Pending,
|
||||
// Receiver is a fused stream, which allows for this simple handling of
|
||||
// exhausted ones.
|
||||
Poll::Ready(None) => {}
|
||||
Poll::Ready(Some(v)) => {
|
||||
result = Poll::Ready(Some(multiplex_single(*p, v)));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
self.next_poll = i;
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert a single raw incoming request into a `MultiplexMessage`.
|
||||
fn multiplex_single(
|
||||
p: Protocol,
|
||||
network::IncomingRequest {
|
||||
payload,
|
||||
peer,
|
||||
pending_response,
|
||||
}: network::IncomingRequest,
|
||||
) -> Result<AllMessages, RequestMultiplexError> {
|
||||
let r = match p {
|
||||
Protocol::AvailabilityFetching => From::from(IncomingRequest::new(
|
||||
peer,
|
||||
decode_with_peer::<v1::AvailabilityFetchingRequest>(peer, payload)?,
|
||||
pending_response,
|
||||
)),
|
||||
};
|
||||
Ok(r)
|
||||
}
|
||||
|
||||
fn decode_with_peer<Req: Decode>(
|
||||
peer: PeerId,
|
||||
payload: Vec<u8>,
|
||||
) -> Result<Req, RequestMultiplexError> {
|
||||
Req::decode(&mut payload.as_ref()).map_err(|error| RequestMultiplexError { peer, error })
|
||||
}
|
||||
@@ -24,12 +24,18 @@ use futures::stream::BoxStream;
|
||||
use parity_scale_codec::Encode;
|
||||
|
||||
use sc_network::Event as NetworkEvent;
|
||||
use sc_network::{NetworkService, IfDisconnected};
|
||||
|
||||
use super::LOG_TARGET;
|
||||
use polkadot_node_network_protocol::{peer_set::PeerSet, PeerId, ReputationChange};
|
||||
use polkadot_node_network_protocol::{
|
||||
peer_set::PeerSet,
|
||||
request_response::{OutgoingRequest, Requests},
|
||||
PeerId, ReputationChange,
|
||||
};
|
||||
use polkadot_primitives::v1::{Block, Hash};
|
||||
use polkadot_subsystem::{SubsystemError, SubsystemResult};
|
||||
|
||||
use super::LOG_TARGET;
|
||||
|
||||
/// Send a message to the network.
|
||||
///
|
||||
/// This function is only used internally by the network-bridge, which is responsible to only send
|
||||
@@ -86,7 +92,6 @@ pub enum NetworkAction {
|
||||
}
|
||||
|
||||
/// An abstraction over networking for the purposes of this subsystem.
|
||||
///
|
||||
pub trait Network: Send + 'static {
|
||||
/// Get a stream of all events occurring on the network. This may include events unrelated
|
||||
/// to the Polkadot protocol - the user of this function should filter only for events related
|
||||
@@ -99,6 +104,9 @@ pub trait Network: Send + 'static {
|
||||
&'a mut self,
|
||||
) -> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>;
|
||||
|
||||
/// Send a request to a remote peer.
|
||||
fn start_request(&self, req: Requests);
|
||||
|
||||
/// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
|
||||
fn report_peer(
|
||||
&mut self,
|
||||
@@ -129,9 +137,9 @@ pub trait Network: Send + 'static {
|
||||
}
|
||||
}
|
||||
|
||||
impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
|
||||
impl Network for Arc<NetworkService<Block, Hash>> {
|
||||
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
|
||||
sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
|
||||
NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
||||
@@ -141,7 +149,7 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
|
||||
use futures::task::{Context, Poll};
|
||||
|
||||
// wrapper around a NetworkService to make it act like a sink.
|
||||
struct ActionSink<'b>(&'b sc_network::NetworkService<Block, Hash>);
|
||||
struct ActionSink<'b>(&'b NetworkService<Block, Hash>);
|
||||
|
||||
impl<'b> Sink<NetworkAction> for ActionSink<'b> {
|
||||
type Error = SubsystemError;
|
||||
@@ -180,4 +188,23 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
|
||||
|
||||
Box::pin(ActionSink(&**self))
|
||||
}
|
||||
|
||||
fn start_request(&self, req: Requests) {
|
||||
let (
|
||||
protocol,
|
||||
OutgoingRequest {
|
||||
peer,
|
||||
payload,
|
||||
pending_response,
|
||||
},
|
||||
) = req.encode_request();
|
||||
|
||||
NetworkService::start_request(&*self,
|
||||
peer,
|
||||
protocol.into_protocol_name(),
|
||||
payload,
|
||||
pending_response,
|
||||
IfDisconnected::TryConnect,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user