Req/res optimization for statement distribution (#2803)

* Wip

* Increase proposer timeout.

* WIP.

* Better timeout values now that we are going to be connected to all nodes. (#2778)

* Better timeout values.

* Fix typo.

* Fix validator bandwidth.

* Fix compilation.

* Better and more consistent sizes.

Most importantly code size is now 5 Meg, which is the limit we currently
want to support in statement distribution.

* Introduce statement fetching request.

* WIP

* Statement cache retrieval logic.

* Review remarks by @rphmeier

* Fixes.

* Better requester logic.

* WIP: Handle requester messages.

* Missing dep.

* Fix request launching logic.

* Finish fetching logic.

* Sending logic.

* Redo code size calculations.

Now that max code size is compressed size.

* Update Cargo.lock (new dep)

* Get request receiver to statement distribution.

* Expose new functionality for responding to requests.

* Cleanup.

* Responder logic.

* Fixes + Cleanup.

* Cargo.lock

* Whitespace.

* Add lost copyright.

* Launch responder task.

* Typo.

* info -> warn

* Typo.

* Fix.

* Fix.

* Update comment.

* Doc fix.

* Better large statement heuristics.

* Fix tests.

* Fix network bridge tests.

* Add test for size estimate.

* Very simple test that checks we get LargeStatement.

* Basic check that fetching of large candidates is performed.

* More tests.

* Basic metrics for responder.

* More metrics.

* Use Encode::encoded_size().

* Some useful spans.

* Get rid of redundant metrics.

* Don't add peer on duplicate.

* Properly check hash

instead of relying on signatures alone.

* Preserve ordering + better flood protection.

* Get rid of redundant clone.

* Don't shutdown responder on failed query.

And add test for this.

* Smaller fixes.

* Quotes.

* Better queue size calculation.

* A bit saner response sizes.

* Fixes.
This commit is contained in:
Robert Klotzner
2021-04-09 23:30:12 +02:00
committed by GitHub
parent 69bd6d8ef2
commit 305375e1e4
19 changed files with 1711 additions and 190 deletions
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,230 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Large statement requesting background task logic.
use std::time::Duration;
use futures::{SinkExt, channel::{mpsc, oneshot}};
use polkadot_node_network_protocol::{
PeerId, UnifiedReputationChange,
request_response::{
OutgoingRequest, Recipient, Requests,
v1::{
StatementFetchingRequest, StatementFetchingResponse
}
}};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v1::{CandidateHash, CommittedCandidateReceipt, Hash};
use polkadot_subsystem::{Span, Stage};
use crate::{LOG_TARGET, Metrics, COST_WRONG_HASH};
// In case we failed fetching from our known peers, how long we should wait before attempting a
// retry, even though we have not yet discovered any new peers. Or in other words how long to
// wait before retrying peers that already failed.
const RETRY_TIMEOUT: Duration = Duration::from_millis(500);

/// Messages coming from a background task.
pub enum RequesterMessage {
	/// Get an update of available peers to try for fetching a given statement.
	GetMorePeers {
		/// Relay parent the wanted candidate is in the context of.
		relay_parent: Hash,
		/// Hash of the candidate we are trying to fetch.
		candidate_hash: CandidateHash,
		/// Channel for the subsystem to answer with an updated set of peers to try.
		tx: oneshot::Sender<Vec<PeerId>>
	},
	/// Fetching finished, ask for verification. If verification fails, task will continue asking
	/// peers for data.
	Finished {
		/// Relay parent this candidate is in the context of.
		relay_parent: Hash,
		/// The candidate we fetched data for.
		candidate_hash: CandidateHash,
		/// Data was fetched from this peer.
		from_peer: PeerId,
		/// Response we received from above peer.
		response: CommittedCandidateReceipt,
		/// Peers which failed providing the data.
		bad_peers: Vec<PeerId>,
	},
	/// Report a peer which behaved worse than just not providing data:
	ReportPeer(PeerId, UnifiedReputationChange),
	/// Ask subsystem to send a request for us.
	SendRequest(Requests),
}
/// A fetching task, taking care of fetching large statements via request/response.
///
/// A fetch task does not know about a particular `Statement`, instead it just tries fetching a
/// `CommittedCandidateReceipt` from peers; whether or not this can be used to re-assemble one or
/// many `SignedFullStatement`s needs to be verified by the caller.
pub async fn fetch(
	relay_parent: Hash,
	candidate_hash: CandidateHash,
	peers: Vec<PeerId>,
	mut sender: mpsc::Sender<RequesterMessage>,
	metrics: Metrics,
) {
	let span = Span::new(candidate_hash, "fetch-large-statement")
		.with_relay_parent(relay_parent)
		.with_stage(Stage::StatementDistribution);

	// Peers we already tried (and failed).
	let mut tried_peers = Vec::new();
	// Peers left for trying out.
	let mut new_peers = peers;

	let req = StatementFetchingRequest {
		relay_parent,
		candidate_hash,
	};

	// We retry endlessly (with sleep periods), and rely on the subsystem to kill us eventually.
	loop {
		let span = span.child("try-available-peers");

		while let Some(peer) = new_peers.pop() {
			let _span = span.child("try-peer")
				.with_peer_id(&peer);
			let (outgoing, pending_response) = OutgoingRequest::new(
				Recipient::Peer(peer),
				req.clone(),
			);
			// Hand the request over to the subsystem for actual sending.
			if let Err(err) = sender.feed(
				RequesterMessage::SendRequest(Requests::StatementFetching(outgoing))
			).await {
				tracing::info!(
					target: LOG_TARGET,
					?err,
					"Sending request failed, node might be shutting down - exiting."
				);
				return
			}
			metrics.on_sent_request();

			match pending_response.await {
				Ok(StatementFetchingResponse::Statement(statement)) => {
					// Defend against bogus responses: the received receipt must actually hash
					// to the candidate we asked for.
					if statement.hash() != candidate_hash {
						metrics.on_received_response(false);
						if let Err(err) = sender.feed(
							RequesterMessage::ReportPeer(peer, COST_WRONG_HASH)
						).await {
							tracing::warn!(
								target: LOG_TARGET,
								?err,
								"Sending reputation change failed: This should not happen."
							);
						}
						// We want to get rid of this peer:
						// (`continue` deliberately skips the `tried_peers.push` below, so this
						// peer will never be retried)
						continue
					}

					// Ask the subsystem to verify the fetched data; `tried_peers` is handed over
					// as the set of peers that failed to provide it.
					if let Err(err) = sender.feed(
						RequesterMessage::Finished {
							relay_parent,
							candidate_hash,
							from_peer: peer,
							response: statement,
							bad_peers: tried_peers,
						}
					).await {
						tracing::warn!(
							target: LOG_TARGET,
							?err,
							"Sending task response failed: This should not happen."
						);
					}
					metrics.on_received_response(true);

					// We are done now.
					return
				},
				Err(err) => {
					tracing::debug!(
						target: LOG_TARGET,
						?err,
						"Receiving response failed with error - trying next peer."
					);
					metrics.on_received_response(false);
				}
			}

			tried_peers.push(peer);
		}

		// Exhausted the current set - recycle failed peers for another round.
		new_peers = std::mem::take(&mut tried_peers);

		// All our peers failed us - try getting new ones before trying again:
		match try_get_new_peers(relay_parent, candidate_hash, &mut sender, &span).await {
			Ok(Some(mut peers)) => {
				// New arrivals will be tried first:
				new_peers.append(&mut peers);
			}
			// No new peers, try the old ones again (if we have any):
			Ok(None) => {
				// Note: In case we don't have any more peers, we will just keep asking for new
				// peers, which is exactly what we want.
			},
			Err(()) => return,
		}
	}
}
/// Ask the subsystem for an updated set of peers to try.
///
/// Returns `Ok(None)` if no new peers showed up within `RETRY_TIMEOUT`, `Err(())` if the
/// subsystem is gone (channel send failed or answer channel got canceled).
async fn try_get_new_peers(
	relay_parent: Hash,
	candidate_hash: CandidateHash,
	sender: &mut mpsc::Sender<RequesterMessage>,
	span: &Span,
) -> Result<Option<Vec<PeerId>>, ()> {
	let _span = span.child("wait-for-peers");

	let (tx, rx) = oneshot::channel();
	sender
		.send(RequesterMessage::GetMorePeers { relay_parent, candidate_hash, tx })
		.await
		.map_err(|err| {
			tracing::debug!(
				target: LOG_TARGET,
				?err,
				"Failed sending background task message, subsystem probably moved on."
			);
		})?;

	// `timeout` yields `None` on elapse (mapped to `Ok(None)` by `transpose`), while a canceled
	// answer channel surfaces as the `Err` arm.
	rx.timeout(RETRY_TIMEOUT)
		.await
		.transpose()
		.map_err(|_| {
			tracing::debug!(
				target: LOG_TARGET,
				"Failed fetching more peers."
			);
		})
}
@@ -0,0 +1,169 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Large statement responding background task logic.
use futures::{SinkExt, StreamExt, channel::{mpsc, oneshot}, stream::FuturesUnordered};
use parity_scale_codec::Decode;
use polkadot_node_network_protocol::{
UnifiedReputationChange as Rep,
request_response::{
IncomingRequest, MAX_PARALLEL_STATEMENT_REQUESTS, request::OutgoingResponse,
v1::{
StatementFetchingRequest, StatementFetchingResponse
}
}
};
use polkadot_primitives::v1::{CandidateHash, CommittedCandidateReceipt, Hash};
use crate::LOG_TARGET;
// Reputation cost for peers whose request could not even be decoded.
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");

/// Messages coming from a background task.
pub enum ResponderMessage {
	/// Ask the subsystem for the data needed to answer a statement fetching request.
	GetData {
		/// Relay parent the requested candidate is in the context of.
		relay_parent: Hash,
		/// Hash of the requested candidate.
		candidate_hash: CandidateHash,
		/// Channel for the subsystem to answer with the candidate receipt; dropping it
		/// signals the data is not available.
		tx: oneshot::Sender<CommittedCandidateReceipt>
	},
}
/// Large statement responder task.
///
/// Answers incoming `StatementFetchingRequest`s: for each decoded request it asks the subsystem
/// (via `sender`) for the matching `CommittedCandidateReceipt` and sends it back to the
/// requesting peer. Runs until either the incoming request stream or the subsystem channel
/// closes.
pub async fn respond(
	mut receiver: mpsc::Receiver<sc_network::config::IncomingRequest>,
	mut sender: mpsc::Sender<ResponderMessage>,
) {
	// Feedback channels of responses currently in flight.
	let mut pending_out = FuturesUnordered::new();
	loop {
		// Ensure we are not handling too many requests in parallel.
		// We do this for three reasons:
		//
		// 1. We want some requesters to have full data fast, rather than lots of them having
		// it late, as each requester having the data will help distributing it.
		// 2. If we take too long, the requests timing out will not yet have had any data sent,
		// thus we wasted no bandwidth.
		// 3. If the queue is full, requesters will get an immediate error instead of running
		// into a timeout, thus they can immediately try another peer and be faster.
		//
		// From this perspective we would not want parallel response sending at all, but we
		// don't want a single slow requester slowing everyone down, so we want some parallelism
		// for that reason.
		if pending_out.len() >= MAX_PARALLEL_STATEMENT_REQUESTS as usize {
			// Wait for one to finish:
			pending_out.next().await;
		}

		// `None` means the incoming request stream is finished - time to shut down.
		let raw = match receiver.next().await {
			None => {
				tracing::debug!(
					target: LOG_TARGET,
					"Shutting down request responder"
				);
				return
			}
			Some(v) => v,
		};

		let sc_network::config::IncomingRequest {
			payload,
			peer,
			pending_response,
		} = raw;

		// Decode the raw payload; peers sending garbage get their reputation docked.
		let payload = match StatementFetchingRequest::decode(&mut payload.as_ref()) {
			Err(err) => {
				tracing::debug!(
					target: LOG_TARGET,
					?err,
					"Decoding request failed"
				);
				report_peer(pending_response, COST_INVALID_REQUEST);
				continue
			}
			Ok(payload) => payload,
		};
		let req = IncomingRequest::new(
			peer,
			payload,
			pending_response
		);

		// Ask the subsystem for the requested candidate data.
		let (tx, rx) = oneshot::channel();
		if let Err(err) = sender.feed(
			ResponderMessage::GetData {
				relay_parent: req.payload.relay_parent,
				candidate_hash: req.payload.candidate_hash,
				tx,
			}
		).await {
			tracing::debug!(
				target: LOG_TARGET,
				?err,
				"Shutting down responder"
			);
			return
		}
		// A canceled answer channel means the subsystem does not have the data; answer the
		// peer with an error but keep the responder running.
		let response = match rx.await {
			Err(err) => {
				tracing::debug!(
					target: LOG_TARGET,
					?err,
					"Requested data not found."
				);
				Err(())
			}
			Ok(v) => Ok(StatementFetchingResponse::Statement(v)),
		};

		// Request feedback once the response has actually been sent, so `pending_out` can
		// bound the number of parallel sends (see comment at the top of the loop).
		let (pending_sent_tx, pending_sent_rx) = oneshot::channel();
		let response = OutgoingResponse {
			result: response,
			reputation_changes: Vec::new(),
			sent_feedback: Some(pending_sent_tx),
		};
		pending_out.push(pending_sent_rx);
		if let Err(_) = req.send_outgoing_response(response) {
			tracing::debug!(
				target: LOG_TARGET,
				"Sending response failed"
			);
		}
	}
}
/// Report a peer who sent us an invalid request.
///
/// Answers the request with an error response carrying the given reputation change (the peer
/// receives no data). A send failure only means the receiving end was already dropped, so it
/// is merely logged.
fn report_peer(
	tx: oneshot::Sender<sc_network::config::OutgoingResponse>,
	rep: Rep,
) {
	let response = sc_network::config::OutgoingResponse {
		result: Err(()),
		reputation_changes: vec![rep.into_base_rep()],
		sent_feedback: None,
	};
	// Idiomatic `is_err()` instead of `if let Err(_)` (clippy: redundant_pattern_matching).
	if tx.send(response).is_err() {
		tracing::debug!(
			target: LOG_TARGET,
			"Reporting peer failed."
		);
	}
}