Implement request-responses protocols (#6634)

* Implement request-responses protocols

* Add tests

* Fix sc-cli

* Apply suggestions from code review

Co-authored-by: Max Inden <mail@max-inden.de>

* Fix naming

* Fix other issues

* Other naming fix

* Fix error logging

* Max sizes to u64

* Don't kill connections on refusal to process

* Adjust comment

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Pierre Krieger
2020-08-27 14:53:20 +02:00
committed by GitHub
parent 5b7af66384
commit 37d0e00d83
9 changed files with 1183 additions and 65 deletions
+191 -34
View File
@@ -29,7 +29,7 @@
use crate::{
ExHashT, NetworkStateInfo,
behaviour::{Behaviour, BehaviourOut},
behaviour::{self, Behaviour, BehaviourOut},
config::{parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig},
DhtEvent,
discovery::DiscoveryConfig,
@@ -42,7 +42,7 @@ use crate::{
protocol::{self, event::Event, NotifsHandlerError, LegacyConnectionKillError, NotificationsSink, Ready, sync::SyncState, PeerInfo, Protocol},
transport, ReputationChange,
};
use futures::prelude::*;
use futures::{channel::oneshot, prelude::*};
use libp2p::{PeerId, multiaddr, Multiaddr};
use libp2p::core::{ConnectedPoint, Executor, connection::{ConnectionError, PendingConnectionError}, either::EitherError};
use libp2p::kad::record;
@@ -76,6 +76,9 @@ use std::{
},
task::Poll,
};
use wasm_timer::Instant;
pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure};
mod out_events;
#[cfg(test)]
@@ -309,16 +312,28 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
config
};
let mut behaviour = Behaviour::new(
protocol,
params.role,
user_agent,
local_public,
block_requests,
finality_proof_requests,
light_client_handler,
discovery_config
);
let mut behaviour = {
let result = Behaviour::new(
protocol,
params.role,
user_agent,
local_public,
block_requests,
finality_proof_requests,
light_client_handler,
discovery_config,
params.network_config.request_response_protocols,
);
match result {
Ok(b) => b,
Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) => {
return Err(Error::DuplicateRequestResponseProtocol {
protocol: proto,
})
},
}
};
for (engine_id, protocol_name) in &params.network_config.notifications_protocols {
behaviour.register_notifications_protocol(*engine_id, protocol_name.clone());
@@ -404,6 +419,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
peers_notifications_sinks,
metrics,
boot_node_ids,
pending_requests: HashMap::with_capacity(128),
})
}
@@ -752,12 +768,50 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// parameter is a `&'static str`, and not a `String`, in order to avoid accidentally having
/// an unbounded set of Prometheus metrics, which would be quite bad in terms of memory
pub fn event_stream(&self, name: &'static str) -> impl Stream<Item = Event> {
// Note: when transitioning to stable futures, remove the `Error` entirely
let (tx, rx) = out_events::channel(name);
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
rx
}
/// Sends a single targeted request to a specific peer. On success, returns the response of
/// the peer.
///
/// Request-response protocols are a way to complement notifications protocols, but
/// notifications should remain the default ways of communicating information. For example, a
/// peer can announce something through a notification, after which the recipient can obtain
/// more information by performing a request.
/// As such, this function is meant to be called only with peers we are already connected to.
/// Calling this method with a `target` we are not connected to will *not* attempt to connect
/// to said peer.
///
/// No limit or throttling of concurrent outbound requests per peer and protocol are enforced.
/// Such restrictions, if desired, need to be enforced at the call site(s).
///
/// The protocol must have been registered through
/// [`NetworkConfiguration::request_response_protocols`].
pub async fn request(
&self,
target: PeerId,
protocol: impl Into<Cow<'static, str>>,
request: Vec<u8>
) -> Result<Vec<u8>, RequestFailure> {
let (tx, rx) = oneshot::channel();
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
target,
protocol: protocol.into(),
request,
pending_response: tx
});
match rx.await {
Ok(v) => v,
// The channel can only be closed if the network worker no longer exists. If the
// network worker no longer exists, then all connections to `target` are necessarily
// closed, and we legitimately report this situation as a "ConnectionClosed".
Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
}
}
/// Registers a new notifications protocol.
///
/// After a protocol has been registered, you can call `write_notifications`.
@@ -1096,6 +1150,12 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
AddKnownAddress(PeerId, Multiaddr),
SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>),
EventStream(out_events::Sender),
Request {
target: PeerId,
protocol: Cow<'static, str>,
request: Vec<u8>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
RegisterNotifProtocol {
engine_id: ConsensusEngineId,
protocol_name: Cow<'static, [u8]>,
@@ -1132,6 +1192,13 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
metrics: Option<Metrics>,
/// The `PeerId`'s of all boot nodes.
boot_node_ids: Arc<HashSet<PeerId>>,
/// Requests started using [`NetworkService::request`]. Includes the channel to send back the
/// response, when the request has started, and the name of the protocol for diagnostic
/// purposes.
pending_requests: HashMap<
behaviour::RequestId,
(oneshot::Sender<Result<Vec<u8>, RequestFailure>>, Instant, String)
>,
/// For each peer and protocol combination, an object that allows sending notifications to
/// that peer. Shared with the [`NetworkService`].
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ConsensusEngineId), NotificationsSink>>>,
@@ -1165,8 +1232,10 @@ struct Metrics {
peerset_num_requested: Gauge<U64>,
pending_connections: Gauge<U64>,
pending_connections_errors_total: CounterVec<U64>,
requests_in_total: HistogramVec,
requests_out_finished: HistogramVec,
requests_in_failure_total: CounterVec<U64>,
requests_in_success_total: HistogramVec,
requests_out_failure_total: CounterVec<U64>,
requests_out_success_total: HistogramVec,
requests_out_started_total: CounterVec<U64>,
}
@@ -1347,10 +1416,17 @@ impl Metrics {
),
&["reason"]
)?, registry)?,
requests_in_total: register(HistogramVec::new(
requests_in_failure_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_requests_in_failure_total",
"Total number of incoming requests that the node has failed to answer"
),
&["protocol", "reason"]
)?, registry)?,
requests_in_success_total: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_requests_in_total",
"sub_libp2p_requests_in_success_total",
"Total number of requests received and answered"
),
buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16)
@@ -1358,11 +1434,18 @@ impl Metrics {
},
&["protocol"]
)?, registry)?,
requests_out_finished: register(HistogramVec::new(
requests_out_failure_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_requests_out_failure_total",
"Total number of requests that have failed"
),
&["protocol", "reason"]
)?, registry)?,
requests_out_success_total: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_requests_out_finished",
"Time between a request's start and finish (successful or not)"
"sub_libp2p_requests_out_success_total",
"For successful requests, time between a request's start and finish"
),
buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16)
.expect("parameters are always valid values; qed"),
@@ -1446,6 +1529,31 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
ServiceToWorkerMsg::EventStream(sender) =>
this.event_streams.push(sender),
ServiceToWorkerMsg::Request { target, protocol, request, pending_response } => {
// Calling `send_request` can fail immediately in some circumstances.
// This is handled by sending back an error on the channel.
match this.network_service.send_request(&target, &protocol, request) {
Ok(request_id) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.requests_out_started_total
.with_label_values(&[&protocol])
.inc();
}
this.pending_requests.insert(
request_id,
(pending_response, Instant::now(), protocol.to_string())
);
},
Err(behaviour::SendRequestError::NotConnected) => {
let err = RequestFailure::Network(OutboundFailure::ConnectionClosed);
let _ = pending_response.send(Err(err));
},
Err(behaviour::SendRequestError::UnknownProtocol) => {
let err = RequestFailure::Network(OutboundFailure::UnsupportedProtocols);
let _ = pending_response.send(Err(err));
},
}
},
ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => {
this.network_service
.register_notifications_protocol(engine_id, protocol_name);
@@ -1494,23 +1602,72 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
}
this.import_queue.import_finality_proof(origin, hash, nb, proof);
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::AnsweredRequest { protocol, build_time, .. })) => {
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. })) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.requests_in_total
.with_label_values(&[&protocol])
.observe(build_time.as_secs_f64());
match result {
Ok(serve_time) => {
metrics.requests_in_success_total
.with_label_values(&[&protocol])
.observe(serve_time.as_secs_f64());
}
Err(err) => {
let reason = match err {
ResponseFailure::Busy => "busy",
ResponseFailure::Network(InboundFailure::Timeout) => "timeout",
ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
"unsupported",
};
metrics.requests_in_failure_total
.with_label_values(&[&protocol, reason])
.inc();
}
}
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestStarted { protocol, .. })) => {
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { request_id, result })) => {
if let Some((send_back, started, protocol)) = this.pending_requests.remove(&request_id) {
if let Some(metrics) = this.metrics.as_ref() {
match &result {
Ok(_) => {
metrics.requests_out_success_total
.with_label_values(&[&protocol])
.observe(started.elapsed().as_secs_f64());
}
Err(err) => {
let reason = match err {
RequestFailure::Refused => "refused",
RequestFailure::Network(OutboundFailure::DialFailure) =>
"dial-failure",
RequestFailure::Network(OutboundFailure::Timeout) =>
"timeout",
RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
"connection-closed",
RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
"unsupported",
};
metrics.requests_out_failure_total
.with_label_values(&[&protocol, reason])
.inc();
}
}
}
let _ = send_back.send(result);
} else {
error!("Request not in pending_requests");
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::OpaqueRequestStarted { protocol, .. })) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.requests_out_started_total
.with_label_values(&[&protocol])
.inc();
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { protocol, request_duration, .. })) => {
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::OpaqueRequestFinished { protocol, request_duration, .. })) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.requests_out_finished
metrics.requests_out_success_total
.with_label_values(&[&protocol])
.observe(request_duration.as_secs_f64());
}
@@ -1635,14 +1792,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
let reason = match cause {
Some(ConnectionError::IO(_)) => "transport-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::B(
EitherError::A(PingFailure::Timeout))))))))) => "ping-timeout",
EitherError::A(EitherError::A(EitherError::A(EitherError::B(
EitherError::A(PingFailure::Timeout)))))))))) => "ping-timeout",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::Legacy(LegacyConnectionKillError))))))))) => "force-closed",
EitherError::A(EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::Legacy(LegacyConnectionKillError)))))))))) => "force-closed",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::SyncNotificationsClogged)))))))) => "sync-notifications-clogged",
EitherError::A(EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::SyncNotificationsClogged))))))))) => "sync-notifications-clogged",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) => "protocol-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout)) => "keep-alive-timeout",
None => "actively-closed",
@@ -1800,7 +1957,7 @@ impl<B: BlockT + 'static, H: ExHashT> Unpin for NetworkWorker<B, H> {
/// Turns bytes that are potentially UTF-8 into a reasonable representable string.
///
/// Meant to be used only for debugging or metrics-reporting purposes.
fn maybe_utf8_bytes_to_string(id: &[u8]) -> Cow<str> {
pub(crate) fn maybe_utf8_bytes_to_string(id: &[u8]) -> Cow<str> {
if let Ok(s) = std::str::from_utf8(&id[..]) {
Cow::Borrowed(s)
} else {