Request based collation fetching (#2621)

* Introduce collation fetching protocol

also move to mod.rs

* Allow `PeerId`s in requests to network bridge.

* Fix availability distribution tests.

* Move CompressedPoV to primitives.

* Request based collator protocol: validator side

- Missing: tests
- Collator side
- don't connect, if not connected

* Fixes.

* Basic request based collator side.

* Minor fix on collator side.

* Don't connect in requests in collation protocol.

Also some cleanup.

* Fix PoV distribution

* Bump substrate

* Add back metrics + whitespace fixes.

* Add back missing spans.

* More cleanup.

* Guide update.

* Fix tests

* Handle results in tests.

* Fix weird compilation issue.

* Add missing )

* Get rid of dead code.

* Get rid of redundant import.

* Fix runtime build.

* Cleanup.

* Fix wasm build.

* Format fixes.

Thanks @andronik !
This commit is contained in:
Robert Klotzner
2021-03-18 09:06:36 +01:00
committed by GitHub
parent f33f6badac
commit 503e2b74f9
24 changed files with 576 additions and 737 deletions
@@ -21,7 +21,9 @@ use super::{LOG_TARGET, Result};
use futures::{select, FutureExt, channel::oneshot};
use polkadot_primitives::v1::{
CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, CandidateHash,
CandidateHash, CandidateReceipt, CollatorId, CompressedPoV, CoreIndex,
CoreState, Hash, Id as ParaId,
PoV, ValidatorId
};
use polkadot_subsystem::{
jaeger, PerLeafSpan,
@@ -29,7 +31,9 @@ use polkadot_subsystem::{
messages::{AllMessages, CollatorProtocolMessage, NetworkBridgeMessage, NetworkBridgeEvent},
};
use polkadot_node_network_protocol::{
peer_set::PeerSet, v1 as protocol_v1, View, PeerId, RequestId, OurView,
OurView, PeerId, View, peer_set::PeerSet,
request_response::{IncomingRequest, v1::{CollationFetchingRequest, CollationFetchingResponse}},
v1 as protocol_v1
};
use polkadot_node_subsystem_util::{
validator_discovery,
@@ -562,25 +566,61 @@ async fn process_msg(
);
}
},
CollationFetchingRequest(incoming) => {
let _span = state.span_per_relay_parent.get(&incoming.payload.relay_parent).map(|s| s.child("request-collation"));
match state.collating_on {
Some(our_para_id) => {
if our_para_id == incoming.payload.para_id {
let (receipt, pov) = if let Some(collation) = state.collations.get_mut(&incoming.payload.relay_parent) {
collation.status.advance_to_requested();
(collation.receipt.clone(), collation.pov.clone())
} else {
tracing::warn!(
target: LOG_TARGET,
relay_parent = %incoming.payload.relay_parent,
"received a `RequestCollation` for a relay parent we don't have collation stored.",
);
return Ok(());
};
let _span = _span.as_ref().map(|s| s.child("sending"));
send_collation(state, incoming, receipt, pov).await;
} else {
tracing::warn!(
target: LOG_TARGET,
for_para_id = %incoming.payload.para_id,
our_para_id = %our_para_id,
"received a `CollationFetchingRequest` for unexpected para_id",
);
}
}
None => {
tracing::warn!(
target: LOG_TARGET,
for_para_id = %incoming.payload.para_id,
"received a `RequestCollation` while not collating on any para",
);
}
}
}
}
Ok(())
}
/// Issue a response to a previously requested collation.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(state, pov), fields(subsystem = LOG_TARGET))]
async fn send_collation(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
request_id: RequestId,
origin: PeerId,
request: IncomingRequest<CollationFetchingRequest>,
receipt: CandidateReceipt,
pov: PoV,
) {
let pov = match protocol_v1::CompressedPoV::compress(&pov) {
let pov = match CompressedPoV::compress(&pov) {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
tracing::error!(
target: LOG_TARGET,
error = ?error,
"Failed to create `CompressedPov`",
@@ -589,22 +629,18 @@ async fn send_collation(
}
};
let wire_message = protocol_v1::CollatorProtocolMessage::Collation(request_id, receipt, pov);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
vec![origin],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await;
if let Err(_) = request.send_response(CollationFetchingResponse::Collation(receipt, pov)) {
tracing::warn!(
target: LOG_TARGET,
"Sending collation response failed",
);
}
state.metrics.on_collation_sent();
}
/// A networking messages switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))]
async fn handle_incoming_peer_message(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
origin: PeerId,
msg: protocol_v1::CollatorProtocolMessage,
@@ -624,50 +660,6 @@ async fn handle_incoming_peer_message(
"AdvertiseCollation message is not expected on the collator side of the protocol",
);
}
RequestCollation(request_id, relay_parent, para_id) => {
let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("request-collation"));
match state.collating_on {
Some(our_para_id) => {
if our_para_id == para_id {
let (receipt, pov) = if let Some(collation) = state.collations.get_mut(&relay_parent) {
collation.status.advance_to_requested();
(collation.receipt.clone(), collation.pov.clone())
} else {
tracing::warn!(
target: LOG_TARGET,
relay_parent = %relay_parent,
"received a `RequestCollation` for a relay parent we don't have collation stored.",
);
return Ok(());
};
let _span = _span.as_ref().map(|s| s.child("sending"));
send_collation(ctx, state, request_id, origin, receipt, pov).await;
} else {
tracing::warn!(
target: LOG_TARGET,
for_para_id = %para_id,
our_para_id = %our_para_id,
"received a `RequestCollation` for unexpected para_id",
);
}
}
None => {
tracing::warn!(
target: LOG_TARGET,
for_para_id = %para_id,
"received a `RequestCollation` while not collating on any para",
);
}
}
}
Collation(_, _, _) => {
tracing::warn!(
target: LOG_TARGET,
"Collation message is not expected on the collator side of the protocol",
);
}
CollationSeconded(statement) => {
if !matches!(statement.payload(), Statement::Seconded(_)) {
tracing::warn!(
@@ -759,7 +751,7 @@ async fn handle_network_msg(
handle_our_view_change(state, view).await?;
}
PeerMessage(remote, msg) => {
handle_incoming_peer_message(ctx, state, remote, msg).await?;
handle_incoming_peer_message(state, remote, msg).await?;
}
}
@@ -861,7 +853,7 @@ mod tests {
use assert_matches::assert_matches;
use futures::{executor, future, Future, channel::mpsc};
use sp_core::crypto::Pair;
use sp_core::{crypto::Pair, Decode};
use sp_keyring::Sr25519Keyring;
use polkadot_primitives::v1::{
@@ -872,7 +864,11 @@ mod tests {
use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_node_network_protocol::{view, our_view};
use polkadot_node_network_protocol::{
our_view,
view,
request_response::request::IncomingRequest,
};
#[derive(Default)]
struct TestCandidateBuilder {
@@ -1380,41 +1376,33 @@ mod tests {
// advertise it.
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await;
let request_id = 42;
// Request a collation.
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer.clone(),
protocol_v1::CollatorProtocolMessage::RequestCollation(
request_id,
test_state.relay_parent,
test_state.para_id,
)
CollatorProtocolMessage::CollationFetchingRequest(
IncomingRequest::new(
peer,
CollationFetchingRequest {
relay_parent: test_state.relay_parent,
para_id: test_state.para_id,
},
tx,
)
)
).await;
// Wait for the reply.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
to,
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
rx.await,
Ok(full_response) => {
let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
= CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
) => {
assert_eq!(to, vec![peer]);
assert_matches!(
wire_message,
protocol_v1::CollatorProtocolMessage::Collation(req_id, receipt, pov) => {
assert_eq!(req_id, request_id);
assert_eq!(receipt, candidate);
assert_eq!(pov.decompress().unwrap(), pov_block);
}
);
.expect("Decoding should work");
assert_eq!(receipt, candidate);
assert_eq!(pov.decompress().unwrap(), pov_block);
}
);
@@ -1424,19 +1412,25 @@ mod tests {
let peer = test_state.validator_peer_id[2].clone();
// Re-request a collation.
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer.clone(),
protocol_v1::CollatorProtocolMessage::RequestCollation(
43,
old_relay_parent,
test_state.para_id,
)
CollatorProtocolMessage::CollationFetchingRequest(
IncomingRequest::new(
peer,
CollationFetchingRequest {
relay_parent: old_relay_parent,
para_id: test_state.para_id,
},
tx,
)
)
).await;
// Re-requesting collation should fail:
assert_matches!(
rx.await,
Err(_) => {}
);
assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
@@ -20,7 +20,6 @@
#![deny(missing_docs, unused_crate_dependencies)]
#![recursion_limit="256"]
use std::time::Duration;
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use thiserror::Error;
@@ -44,7 +43,6 @@ mod collator_side;
mod validator_side;
const LOG_TARGET: &'static str = "parachain::collator-protocol";
const REQUEST_TIMEOUT: Duration = Duration::from_secs(1);
#[derive(Debug, Error)]
enum Error {
@@ -94,7 +92,6 @@ impl CollatorProtocolSubsystem {
match self.protocol_side {
ProtocolSide::Validator(metrics) => validator_side::run(
ctx,
REQUEST_TIMEOUT,
metrics,
).await,
ProtocolSide::Collator(id, metrics) => collator_side::run(
@@ -129,7 +126,7 @@ where
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: SubsystemContext,
{
tracing::trace!(
target: LOG_TARGET,
@@ -14,15 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{collections::{HashMap, HashSet}, time::Duration, task::Poll, sync::Arc};
use std::{collections::{HashMap, HashSet}, sync::Arc, task::Poll};
use futures::{
StreamExt,
FutureExt,
channel::oneshot,
future::BoxFuture,
stream::FuturesUnordered,
};
use futures::{FutureExt, channel::oneshot, future::{Fuse, FusedFuture, BoxFuture}};
use always_assert::never;
use polkadot_primitives::v1::{
Id as ParaId, CandidateReceipt, CollatorId, Hash, PoV,
@@ -32,18 +27,25 @@ use polkadot_subsystem::{
FromOverseer, OverseerSignal, SubsystemContext,
messages::{
AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage,
NetworkBridgeEvent,
NetworkBridgeEvent, IfDisconnected,
},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, OurView, PeerId, RequestId, UnifiedReputationChange as Rep,
OurView, PeerId, UnifiedReputationChange as Rep, View,
request_response::{OutgoingRequest, Requests, request::{Recipient, RequestError}}, v1 as protocol_v1
};
use polkadot_node_subsystem_util::{TimeoutExt as _, metrics::{self, prometheus}};
use polkadot_node_network_protocol::request_response::v1::{CollationFetchingRequest, CollationFetchingResponse};
use polkadot_node_network_protocol::request_response as req_res;
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_primitives::{Statement, SignedFullStatement};
use super::{modify_reputation, LOG_TARGET, Result};
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
/// Message could not be decoded properly.
const COST_CORRUPTED_MESSAGE: Rep = Rep::CostMinor("Message was corrupt");
/// Network errors that originated at the remote host should have same cost as timeout.
const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error");
const COST_REQUEST_TIMED_OUT: Rep = Rep::CostMinor("A collation request has timed out");
const COST_REPORT_BAD: Rep = Rep::CostMajor("A collator was reported by another subsystem");
const BENEFIT_NOTIFY_GOOD: Rep = Rep::BenefitMinor("A collator was noted good by another subsystem");
@@ -51,6 +53,7 @@ const BENEFIT_NOTIFY_GOOD: Rep = Rep::BenefitMinor("A collator was noted good by
#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_request(&self, succeeded: std::result::Result<(), ()>) {
if let Some(metrics) = &self.0 {
@@ -118,60 +121,13 @@ impl metrics::Metrics for Metrics {
}
}
#[derive(Debug)]
enum CollationRequestResult {
Received(RequestId),
Timeout(RequestId),
}
/// A Future representing an ongoing collation request.
/// It may time out or end in a graceful fashion if a requested
/// collation has been received successfully or the chain has moved on.
struct CollationRequest {
// The response for this request has been received successfully or
// chain has moved forward and this request is no longer relevant.
received: oneshot::Receiver<()>,
// The timeout of this request.
timeout: Duration,
// The id of this request.
request_id: RequestId,
// A jaeger span corresponding to the lifetime of the request.
span: Option<jaeger::Span>,
}
impl CollationRequest {
async fn wait(self) -> CollationRequestResult {
use CollationRequestResult::*;
let CollationRequest {
received,
timeout,
request_id,
mut span,
} = self;
match received.timeout(timeout).await {
None => {
span.as_mut().map(|s| s.add_string_tag("success", "false"));
Timeout(request_id)
}
Some(_) => {
span.as_mut().map(|s| s.add_string_tag("success", "true"));
Received(request_id)
}
}
}
}
struct PerRequest {
// The sender side to signal the `CollationRequest` to resolve successfully.
received: oneshot::Sender<()>,
// Send result here.
result: oneshot::Sender<(CandidateReceipt, PoV)>,
/// Responses from collator.
from_collator: Fuse<BoxFuture<'static, req_res::OutgoingResult<CollationFetchingResponse>>>,
/// Sender to forward to initial requester.
to_requester: oneshot::Sender<(CandidateReceipt, PoV)>,
/// A jaeger span corresponding to the lifetime of the request.
span: Option<jaeger::Span>,
}
/// All state relevant for the validator side of the protocol lives here.
@@ -190,31 +146,12 @@ struct State {
/// per collator per source per relay-parent.
advertisements: HashMap<PeerId, HashSet<(ParaId, Hash)>>,
/// Derive RequestIds from this.
next_request_id: RequestId,
/// The collations we have requested by relay parent and para id.
///
/// For each relay parent and para id we may be connected to a number
/// of collators each of those may have advertised a different collation.
/// So we group such cases here.
requested_collations: HashMap<(Hash, ParaId, PeerId), RequestId>,
/// Housekeeping handles we need to have per request to:
/// - cancel ongoing requests
/// - reply with collations to other subsystems.
requests_info: HashMap<RequestId, PerRequest>,
/// Collation requests that are currently in progress.
requests_in_progress: FuturesUnordered<BoxFuture<'static, CollationRequestResult>>,
/// Delay after which a collation request would time out.
request_timeout: Duration,
/// Leaves have recently moved out of scope.
/// These are looked into when we receive previously requested collations that we
/// are no longer interested in.
recently_removed_heads: HashSet<Hash>,
requested_collations: HashMap<(Hash, ParaId, PeerId), PerRequest>,
/// Metrics.
metrics: Metrics,
@@ -336,92 +273,13 @@ async fn handle_peer_view_change(
advertisements.retain(|(_, relay_parent)| !removed.contains(relay_parent));
}
let mut requests_to_cancel = Vec::new();
for removed in removed.into_iter() {
state.requested_collations.retain(|k, v| {
if k.0 == removed {
requests_to_cancel.push(*v);
false
} else {
true
}
});
}
for r in requests_to_cancel.into_iter() {
if let Some(per_request) = state.requests_info.remove(&r) {
per_request.received.send(()).map_err(|_| oneshot::Canceled)?;
}
state.requested_collations.retain(|k, _| k.0 != removed);
}
Ok(())
}
/// We have received a collation.
/// - Cancel all ongoing requests
/// - Reply to interested parties if any
/// - Store collation.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
async fn received_collation<Context>(
ctx: &mut Context,
state: &mut State,
origin: PeerId,
request_id: RequestId,
receipt: CandidateReceipt,
pov: protocol_v1::CompressedPoV,
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
let relay_parent = receipt.descriptor.relay_parent;
let para_id = receipt.descriptor.para_id;
if let Some(id) = state.requested_collations.remove(
&(relay_parent, para_id, origin.clone())
) {
if id == request_id {
if let Some(per_request) = state.requests_info.remove(&id) {
let _ = per_request.received.send(());
if state.known_collators.get(&origin).is_some() {
let pov = match pov.decompress() {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
%request_id,
?error,
"Failed to extract PoV",
);
return;
}
};
let _span = jaeger::pov_span(&pov, "received-collation");
tracing::debug!(
target: LOG_TARGET,
%request_id,
?para_id,
?relay_parent,
candidate_hash = ?receipt.hash(),
"Received collation",
);
let _ = per_request.result.send((receipt.clone(), pov.clone()));
state.metrics.on_request(Ok(()));
}
}
}
} else {
// If this collation is not just a delayed one that we were expecting,
// but our view has moved on, in that case modify peer's reputation.
if !state.recently_removed_heads.contains(&relay_parent) {
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
}
}
}
/// Request a collation from the network.
/// This function will
/// - Check for duplicate requests.
@@ -452,7 +310,7 @@ where
}
if state.requested_collations.contains_key(&(relay_parent, para_id.clone(), peer_id.clone())) {
tracing::trace!(
tracing::warn!(
target: LOG_TARGET,
peer_id = %peer_id,
%para_id,
@@ -462,54 +320,37 @@ where
return;
}
let request_id = state.next_request_id;
state.next_request_id += 1;
let (tx, rx) = oneshot::channel();
let (full_request, response_recv) =
OutgoingRequest::new(Recipient::Peer(peer_id), CollationFetchingRequest {
relay_parent,
para_id,
});
let requests = Requests::CollationFetching(full_request);
let per_request = PerRequest {
received: tx,
result,
};
let request = CollationRequest {
received: rx,
timeout: state.request_timeout,
request_id,
from_collator: response_recv.boxed().fuse(),
to_requester: result,
span: state.span_per_relay_parent.get(&relay_parent).map(|s| {
s.child_builder("collation-request")
.with_para_id(para_id)
.build()
}),
};
state.requested_collations.insert((relay_parent, para_id.clone(), peer_id.clone()), request_id);
state.requests_info.insert(request_id, per_request);
state.requests_in_progress.push(request.wait().boxed());
state.requested_collations.insert((relay_parent, para_id.clone(), peer_id.clone()), per_request);
tracing::debug!(
target: LOG_TARGET,
peer_id = %peer_id,
%para_id,
%request_id,
?relay_parent,
"Requesting collation",
);
let wire_message = protocol_v1::CollatorProtocolMessage::RequestCollation(
request_id,
relay_parent,
para_id,
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
vec![peer_id],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await;
NetworkBridgeMessage::SendRequests(vec![requests], IfDisconnected::ImmediateError))
).await;
}
/// Notify `CandidateSelectionSubsystem` that a collation has been advertised.
@@ -564,16 +405,12 @@ where
);
}
}
RequestCollation(_, _, _) => {
// This is a validator side of the protocol, collation requests are not expected here.
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
}
Collation(request_id, receipt, pov) => {
let _span = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent)
.map(|s| s.child("received-collation"));
received_collation(ctx, state, origin, request_id, receipt, pov).await;
}
CollationSeconded(_) => {
tracing::warn!(
target: LOG_TARGET,
peer_id = ?origin,
"Unexpected `CollationSeconded` message, decreasing reputation",
);
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
}
}
@@ -587,21 +424,9 @@ async fn remove_relay_parent(
state: &mut State,
relay_parent: Hash,
) -> Result<()> {
let mut remove_these = Vec::new();
state.requested_collations.retain(|k, v| {
if k.0 == relay_parent {
remove_these.push(*v);
}
state.requested_collations.retain(|k, _| {
k.0 != relay_parent
});
for id in remove_these.into_iter() {
if let Some(info) = state.requests_info.remove(&id) {
info.received.send(()).map_err(|_| oneshot::Canceled)?;
}
}
Ok(())
}
@@ -628,11 +453,7 @@ async fn handle_our_view_change(
.cloned()
.collect::<Vec<_>>();
// Update the set of recently removed chain heads.
state.recently_removed_heads.clear();
for removed in removed.into_iter() {
state.recently_removed_heads.insert(removed.clone());
remove_relay_parent(state, removed).await?;
state.span_per_relay_parent.remove(&removed);
}
@@ -640,30 +461,6 @@ async fn handle_our_view_change(
Ok(())
}
/// A request has timed out.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn request_timed_out<Context>(
ctx: &mut Context,
state: &mut State,
id: RequestId,
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
state.metrics.on_request(Err(()));
// We have to go backwards in the map, again.
if let Some(key) = find_val_in_map(&state.requested_collations, &id) {
if let Some(_) = state.requested_collations.remove(&key) {
if let Some(_) = state.requests_info.remove(&id) {
let peer_id = key.2;
modify_reputation(ctx, peer_id, COST_REQUEST_TIMED_OUT).await;
}
}
}
}
/// Bridge event switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_network_msg<Context>(
@@ -753,6 +550,12 @@ where
);
}
}
CollationFetchingRequest(_) => {
tracing::warn!(
target: LOG_TARGET,
"CollationFetchingRequest message is not expected on the validator side of the protocol",
);
}
}
}
@@ -760,7 +563,6 @@ where
#[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
pub(crate) async fn run<Context>(
mut ctx: Context,
request_timeout: Duration,
metrics: Metrics,
) -> Result<()>
where
@@ -770,7 +572,6 @@ where
use OverseerSignal::*;
let mut state = State {
request_timeout,
metrics,
..Default::default()
};
@@ -789,46 +590,166 @@ where
continue;
}
while let Poll::Ready(Some(request)) = futures::poll!(state.requests_in_progress.next()) {
let _timer = state.metrics.time_handle_collation_request_result();
// Request has timed out, we need to penalize the collator and re-send the request
// if the chain has not moved on yet.
match request {
CollationRequestResult::Timeout(id) => {
tracing::debug!(target: LOG_TARGET, request_id=%id, "Collation timed out");
request_timed_out(&mut ctx, &mut state, id).await;
}
CollationRequestResult::Received(id) => {
state.requests_info.remove(&id);
}
let mut retained_requested = HashSet::new();
for ((hash, para_id, peer_id), per_req) in state.requested_collations.iter_mut() {
// Despite the await, this won't block:
let finished = poll_collation_response(
&mut ctx, &state.metrics, &state.span_per_relay_parent,
hash, para_id, peer_id, per_req
).await;
if !finished {
retained_requested.insert((*hash, *para_id, *peer_id));
}
}
state.requested_collations.retain(|k, _| retained_requested.contains(k));
futures::pending!();
}
Ok(())
}
fn find_val_in_map<K: Clone, V: Eq>(map: &HashMap<K, V>, val: &V) -> Option<K> {
map
.iter()
.find_map(|(k, v)| if v == val { Some(k.clone()) } else { None })
/// Poll collation response, return immediately if there is none.
///
/// Ready responses are handled, by logging and decreasing peer's reputation on error and by
/// forwarding proper responses to the requester.
///
/// Returns: `true` if `from_collator` future was ready.
async fn poll_collation_response<Context>(
ctx: &mut Context,
metrics: &Metrics,
spans: &HashMap<Hash, PerLeafSpan>,
hash: &Hash,
para_id: &ParaId,
peer_id: &PeerId,
per_req: &mut PerRequest
)
-> bool
where
Context: SubsystemContext
{
if never!(per_req.from_collator.is_terminated()) {
tracing::error!(
target: LOG_TARGET,
"We remove pending responses once received, this should not happen."
);
return true
}
if let Poll::Ready(response) = futures::poll!(&mut per_req.from_collator) {
let _span = spans.get(&hash)
.map(|s| s.child("received-collation"));
let _timer = metrics.time_handle_collation_request_result();
let mut metrics_result = Err(());
let mut success = "false";
match response {
Err(RequestError::InvalidResponse(err)) => {
tracing::warn!(
target: LOG_TARGET,
hash = ?hash,
para_id = ?para_id,
peer_id = ?peer_id,
err = ?err,
"Collator provided response that could not be decoded"
);
modify_reputation(ctx, *peer_id, COST_CORRUPTED_MESSAGE).await;
}
Err(RequestError::NetworkError(err)) => {
tracing::warn!(
target: LOG_TARGET,
hash = ?hash,
para_id = ?para_id,
peer_id = ?peer_id,
err = ?err,
"Fetching collation failed due to network error"
);
// A minor decrease in reputation for any network failure seems
// sensible. In theory this could be exploited by DoSing this node,
// which would result in reduced reputation for proper nodes, but the
// same can happen for penalties on timeouts, which we also have.
modify_reputation(ctx, *peer_id, COST_NETWORK_ERROR).await;
}
Err(RequestError::Canceled(_)) => {
tracing::warn!(
target: LOG_TARGET,
hash = ?hash,
para_id = ?para_id,
peer_id = ?peer_id,
"Request timed out"
);
// A minor decrease in reputation for any network failure seems
// sensible. In theory this could be exploited by DoSing this node,
// which would result in reduced reputation for proper nodes, but the
// same can happen for penalties on timeouts, which we also have.
modify_reputation(ctx, *peer_id, COST_REQUEST_TIMED_OUT).await;
}
Ok(CollationFetchingResponse::Collation(receipt, compressed_pov)) => {
match compressed_pov.decompress() {
Ok(pov) => {
tracing::debug!(
target: LOG_TARGET,
para_id = ?para_id,
hash = ?hash,
candidate_hash = ?receipt.hash(),
"Received collation",
);
// Actual sending:
let _span = jaeger::pov_span(&pov, "received-collation");
let (mut tx, _) = oneshot::channel();
std::mem::swap(&mut tx, &mut (per_req.to_requester));
let result = tx.send((receipt, pov));
if let Err(_) = result {
tracing::warn!(
target: LOG_TARGET,
hash = ?hash,
para_id = ?para_id,
peer_id = ?peer_id,
"Sending response back to requester failed (receiving side closed)"
);
} else {
metrics_result = Ok(());
success = "true";
}
}
Err(error) => {
tracing::warn!(
target: LOG_TARGET,
hash = ?hash,
para_id = ?para_id,
peer_id = ?peer_id,
?error,
"Failed to extract PoV",
);
modify_reputation(ctx, *peer_id, COST_CORRUPTED_MESSAGE).await;
}
};
}
};
metrics.on_request(metrics_result);
per_req.span.as_mut().map(|s| s.add_string_tag("success", success));
true
} else {
false
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::iter;
use std::{iter, time::Duration};
use futures::{executor, future, Future};
use sp_core::crypto::Pair;
use polkadot_node_subsystem_util::TimeoutExt;
use sp_core::{crypto::Pair, Encode};
use assert_matches::assert_matches;
use futures_timer::Delay;
use polkadot_primitives::v1::{BlockData, CollatorPair};
use polkadot_primitives::v1::{BlockData, CollatorPair, CompressedPoV};
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_node_network_protocol::our_view;
use polkadot_node_network_protocol::{our_view,
request_response::Requests
};
#[derive(Clone)]
struct TestState {
@@ -878,7 +799,7 @@ mod tests {
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = run(context, Duration::from_millis(50), Metrics::default());
let subsystem = run(context, Metrics::default());
let test_fut = test(TestHarness { virtual_overseer });
@@ -986,125 +907,6 @@ mod tests {
});
}
// Test that an issued request times out a number of times until our view moves on.
#[test]
fn collation_request_times_out() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
let TestHarness {
mut virtual_overseer,
} = test_harness;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent])
)
).await;
let peer_b = PeerId::random();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
protocol_v1::CollatorProtocolMessage::Declare(
test_state.collators[0].public(),
),
)
)
).await;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
test_state.relay_parent,
test_state.chain_ids[0],
)
)
)
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CandidateSelection(CandidateSelectionMessage::Collation(
relay_parent,
para_id,
collator,
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
assert_eq!(para_id, test_state.chain_ids[0]);
assert_eq!(collator, test_state.collators[0].public());
}
);
let (tx, _rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::FetchCollation(
test_state.relay_parent,
test_state.collators[0].public(),
test_state.chain_ids[0],
tx,
)
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendCollationMessage(
peers,
protocol_v1::CollationProtocol::CollatorProtocol(
protocol_v1::CollatorProtocolMessage::RequestCollation(
_id,
relay_parent,
para_id,
)
)
)
) => {
assert_eq!(relay_parent, test_state.relay_parent);
assert_eq!(peers, vec![peer_b.clone()]);
assert_eq!(para_id, test_state.chain_ids[0]);
});
// Don't send a response and we should see reputation penalties to the
// collator.
Delay::new(Duration::from_millis(50)).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_REQUEST_TIMED_OUT);
}
);
// Deactivate the relay parent in question.
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(our_view![Hash::repeat_byte(0x42)])
)
).await;
// After we've deactivated it we are not expecting any more requests
// for timed out collations.
assert!(
overseer_recv_with_timeout(
&mut virtual_overseer,
Duration::from_secs(1),
).await.is_none()
);
});
}
// Test that other subsystems may modify collators' reputations.
#[test]
fn collator_reporting_works() {
@@ -1309,81 +1111,64 @@ mod tests {
)
).await;
let (request_id, peer_id) = assert_matches!(
let response_channel = assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendCollationMessage(
peers,
protocol_v1::CollationProtocol::CollatorProtocol(
protocol_v1::CollatorProtocolMessage::RequestCollation(
id,
relay_parent,
para_id,
)
)
)
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError)
) => {
assert_eq!(relay_parent, test_state.relay_parent);
assert_eq!(para_id, test_state.chain_ids[0]);
(id, peers[0].clone())
let req = reqs.into_iter().next()
.expect("There should be exactly one request");
match req {
Requests::CollationFetching(req) => {
let payload = req.payload;
assert_eq!(payload.relay_parent, test_state.relay_parent);
assert_eq!(payload.para_id, test_state.chain_ids[0]);
req.pending_response
}
_ => panic!("Unexpected request"),
}
});
let mut candidate_a = CandidateReceipt::default();
candidate_a.descriptor.para_id = test_state.chain_ids[0];
candidate_a.descriptor.relay_parent = test_state.relay_parent;
response_channel.send(Ok(
CollationFetchingResponse::Collation(
candidate_a.clone(),
CompressedPoV::compress(&PoV {
block_data: BlockData(vec![]),
}).unwrap(),
).encode()
)).expect("Sending response should succeed");
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_id,
protocol_v1::CollatorProtocolMessage::Collation(
request_id,
candidate_a.clone(),
protocol_v1::CompressedPoV::compress(&PoV {
block_data: BlockData(vec![]),
}).unwrap(),
)
)
)
).await;
let (request_id, peer_id) = assert_matches!(
let response_channel = assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendCollationMessage(
peers,
protocol_v1::CollationProtocol::CollatorProtocol(
protocol_v1::CollatorProtocolMessage::RequestCollation(
id,
relay_parent,
para_id,
)
)
)
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError)
) => {
assert_eq!(relay_parent, test_state.relay_parent);
assert_eq!(para_id, test_state.chain_ids[0]);
(id, peers[0].clone())
let req = reqs.into_iter().next()
.expect("There should be exactly one request");
match req {
Requests::CollationFetching(req) => {
let payload = req.payload;
assert_eq!(payload.relay_parent, test_state.relay_parent);
assert_eq!(payload.para_id, test_state.chain_ids[0]);
req.pending_response
}
_ => panic!("Unexpected request"),
}
});
let mut candidate_b = CandidateReceipt::default();
candidate_b.descriptor.para_id = test_state.chain_ids[0];
candidate_b.descriptor.relay_parent = test_state.relay_parent;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_id,
protocol_v1::CollatorProtocolMessage::Collation(
request_id,
candidate_b.clone(),
protocol_v1::CompressedPoV::compress(&PoV {
block_data: BlockData(vec![1, 2, 3]),
}).unwrap(),
)
)
)
).await;
response_channel.send(Ok(
CollationFetchingResponse::Collation(
candidate_b.clone(),
CompressedPoV::compress(&PoV {
block_data: BlockData(vec![1, 2, 3]),
}).unwrap(),
).encode()
)).expect("Sending response should succeed");
let collation_0 = rx_0.await.unwrap();
let collation_1 = rx_1.await.unwrap();