Only send one collation per relay parent at a time to validators (#3360)

* Only send one collation per relay parent at a time to validators

This changes the way we are sending collations to validators. Before we
answered every collation request immediately. Now we only answer one
pov request at a time per relay parent. This should bring down the
bandwidth requirements and should help parachains to include bigger
blocks more easily.

* Guide updates

* Review feedback.
This commit is contained in:
Bastian Köcher
2021-06-28 13:07:44 +02:00
committed by GitHub
parent 30ebd26558
commit 2ea1587e8d
3 changed files with 252 additions and 29 deletions
@@ -14,12 +14,15 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashMap, HashSet};
use std::{collections::{HashMap, HashSet, VecDeque}, pin::Pin};
use futures::{FutureExt, channel::oneshot};
use futures::{FutureExt, StreamExt, channel::oneshot, stream::FuturesUnordered, select, Future};
use sp_core::Pair;
use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState, GroupIndex, Hash, Id as ParaId};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState, GroupIndex, Hash,
Id as ParaId,
};
use polkadot_subsystem::{
FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext, jaeger,
messages::{
@@ -27,13 +30,11 @@ use polkadot_subsystem::{
},
};
use polkadot_node_network_protocol::{
OurView, PeerId, View, peer_set::PeerSet,
OurView, PeerId, UnifiedReputationChange as Rep, View, peer_set::PeerSet,
request_response::{
IncomingRequest,
v1::{CollationFetchingRequest, CollationFetchingResponse},
IncomingRequest, request::OutgoingResponse, v1::{CollationFetchingRequest, CollationFetchingResponse}
},
v1 as protocol_v1,
UnifiedReputationChange as Rep,
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
@@ -59,6 +60,12 @@ impl Metrics {
}
}
fn on_collation_sent_requested(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_send_requested.inc();
}
}
fn on_collation_sent(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_sent.inc();
@@ -75,6 +82,7 @@ impl Metrics {
/// The concrete Prometheus metrics backing [`Metrics`].
struct MetricsInner {
	/// Number of collation advertisements made to validators.
	advertisements_made: prometheus::Counter<prometheus::U64>,
	/// Number of collations sent to validators.
	collations_sent: prometheus::Counter<prometheus::U64>,
	/// Number of collations validators requested to be sent
	/// (registered as `parachain_collations_sent_requested_total`).
	collations_send_requested: prometheus::Counter<prometheus::U64>,
	/// Histogram for message processing — presumably measures the duration of
	/// `process_msg`; registration not visible here, confirm against it.
	process_msg: prometheus::Histogram,
}
@@ -90,6 +98,13 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
collations_send_requested: prometheus::register(
prometheus::Counter::new(
"parachain_collations_sent_requested_total",
"A number of collations requested to be sent to validators.",
)?,
registry,
)?,
collations_sent: prometheus::register(
prometheus::Counter::new(
"parachain_collations_sent_total",
@@ -185,6 +200,17 @@ struct Collation {
status: CollationStatus,
}
/// Stores the state for waiting collation fetches.
///
/// Used to ensure only one collation per relay parent is being sent at a time:
/// while a fetch is active, further incoming requests are queued in `waiting`.
#[derive(Default)]
struct WaitingCollationFetches {
	/// Is there currently a collation getting fetched?
	collation_fetch_active: bool,
	/// The collation fetches waiting to be fulfilled.
	waiting: VecDeque<IncomingRequest<CollationFetchingRequest>>,
}
/// The set of in-flight collation sends; each future resolves to the relay
/// parent of the collation fetch it tracked, once that fetch has finished.
type ActiveCollationFetches = FuturesUnordered<Pin<Box<dyn Future<Output = Hash> + Send + 'static>>>;
struct State {
/// Our network peer id.
local_peer_id: PeerId,
@@ -217,11 +243,23 @@ struct State {
/// Our validator groups per active leaf.
our_validators_groups: HashMap<Hash, ValidatorGroup>,
/// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s by `PeerConnected` events.
/// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s
/// by `PeerConnected` events.
peer_ids: HashMap<PeerId, AuthorityDiscoveryId>,
/// Metrics.
metrics: Metrics,
/// All collation fetching requests that are still waiting to be answered.
///
/// They are stored per relay parent, when our view changes and the relay parent moves out, we will cancel the fetch
/// request.
waiting_collation_fetches: HashMap<Hash, WaitingCollationFetches>,
/// Active collation fetches.
///
/// Each future returns the relay parent of the finished collation fetch.
active_collation_fetches: ActiveCollationFetches,
}
impl State {
@@ -240,6 +278,8 @@ impl State {
collation_result_senders: Default::default(),
our_validators_groups: Default::default(),
peer_ids: Default::default(),
waiting_collation_fetches: Default::default(),
active_collation_fetches: Default::default(),
}
}
@@ -349,8 +389,9 @@ async fn distribute_collation(
state.collations.insert(relay_parent, Collation { receipt, pov, status: CollationStatus::Created });
let interested = state.peers_interested_in_leaf(&relay_parent);
// Make sure already connected peers get collations:
for peer_id in state.peers_interested_in_leaf(&relay_parent) {
for peer_id in interested {
advertise_collation(ctx, state, relay_parent, peer_id).await;
}
@@ -373,6 +414,7 @@ async fn determine_core(
}
}
}
Ok(None)
}
@@ -455,7 +497,7 @@ async fn declare(
async fn connect_to_validators(
ctx: &mut impl SubsystemContext,
validator_ids: Vec<AuthorityDiscoveryId>,
) {
) {
// ignore address resolution failure
// will reissue a new request on new collation
let (failed, _) = oneshot::channel();
@@ -607,8 +649,18 @@ async fn process_msg(
return Ok(());
};
state.metrics.on_collation_sent_requested();
let _span = _span.as_ref().map(|s| s.child("sending"));
send_collation(state, incoming, receipt, pov).await;
let waiting = state.waiting_collation_fetches.entry(incoming.payload.relay_parent).or_default();
if waiting.collation_fetch_active {
waiting.waiting.push_back(incoming);
} else {
waiting.collation_fetch_active = true;
send_collation(state, incoming, receipt, pov).await;
}
} else {
tracing::warn!(
target: LOG_TARGET,
@@ -640,12 +692,28 @@ async fn send_collation(
receipt: CandidateReceipt,
pov: PoV,
) {
if let Err(_) = request.send_response(CollationFetchingResponse::Collation(receipt, pov)) {
let (tx, rx) = oneshot::channel();
let relay_parent = request.payload.relay_parent;
let response = OutgoingResponse {
result: Ok(CollationFetchingResponse::Collation(receipt, pov)),
reputation_changes: Vec::new(),
sent_feedback: Some(tx),
};
if let Err(_) = request.send_outgoing_response(response) {
tracing::warn!(
target: LOG_TARGET,
"Sending collation response failed",
);
}
state.active_collation_fetches.push(async move {
let _ = rx.await;
relay_parent
}.boxed());
state.metrics.on_collation_sent();
}
@@ -840,6 +908,7 @@ async fn handle_our_view_change(
}
state.our_validators_groups.remove(removed);
state.span_per_relay_parent.remove(removed);
state.waiting_collation_fetches.remove(removed);
}
state.view = view;
@@ -861,17 +930,38 @@ pub(crate) async fn run(
let mut runtime = RuntimeInfo::new(None);
loop {
let msg = ctx.recv().fuse().await.map_err(Fatal::SubsystemReceive)?;
match msg {
Communication { msg } => {
log_error(
process_msg(&mut ctx, &mut runtime, &mut state, msg).await,
"Failed to process message"
)?;
select! {
msg = ctx.recv().fuse() => match msg.map_err(Fatal::SubsystemReceive)? {
Communication { msg } => {
log_error(
process_msg(&mut ctx, &mut runtime, &mut state, msg).await,
"Failed to process message"
)?;
},
Signal(ActiveLeaves(_update)) => {}
Signal(BlockFinalized(..)) => {}
Signal(Conclude) => return Ok(()),
},
Signal(ActiveLeaves(_update)) => {}
Signal(BlockFinalized(..)) => {}
Signal(Conclude) => return Ok(()),
relay_parent = state.active_collation_fetches.select_next_some() => {
let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) {
if let Some(next) = waiting.waiting.pop_front() {
next
} else {
waiting.collation_fetch_active = false;
continue
}
} else {
// No waiting collation fetches means we already removed the relay parent from our view.
continue
};
if let Some(collation) = state.collations.get(&relay_parent) {
let receipt = collation.receipt.clone();
let pov = collation.pov.clone();
send_collation(&mut state, next, receipt, pov).await;
}
}
}
}
}
@@ -20,6 +20,7 @@ use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use futures::{executor, future, Future};
use futures_timer::Delay;
use sp_core::{crypto::Pair, Decode};
use sp_keyring::Sr25519Keyring;
@@ -31,7 +32,10 @@ use polkadot_node_network_protocol::{
request_response::request::IncomingRequest,
};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateDescriptor, CollatorPair, GroupRotationInfo, ScheduledCore, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, CandidateDescriptor, CollatorPair, GroupRotationInfo, ScheduledCore, SessionIndex,
SessionInfo, ValidatorId, ValidatorIndex,
};
use polkadot_node_primitives::BlockData;
use polkadot_subsystem::{
jaeger,
@@ -196,6 +200,18 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
collator_pair: CollatorPair,
test: impl FnOnce(TestHarness) -> T,
) {
let _ = env_logger::builder()
.is_test(true)
.filter(
Some("polkadot_collator_protocol"),
log::LevelFilter::Trace,
)
.filter(
Some(LOG_TARGET),
log::LevelFilter::Trace,
)
.try_init();
let pool = sp_core::testing::TaskExecutor::new();
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
@@ -580,10 +596,7 @@ fn advertise_and_send_collation() {
)
).await;
// Re-requesting collation should fail:
assert_matches!(
rx.await,
Err(_) => {}
);
rx.await.unwrap_err();
assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
@@ -605,6 +618,126 @@ fn advertise_and_send_collation() {
});
}
/// Ensures that, for a given relay parent, the subsystem answers only one
/// collation fetching request at a time: a second validator's request is held
/// back until the first transfer signals completion via the feedback channel.
#[test]
fn send_only_one_collation_per_relay_parent_at_a_time() {
	let test_state = TestState::default();
	let local_peer_id = test_state.local_peer_id.clone();
	let collator_pair = test_state.collator_pair.clone();

	test_harness(local_peer_id, collator_pair, |test_harness| async move {
		let mut virtual_overseer = test_harness.virtual_overseer;

		setup_system(&mut virtual_overseer, &test_state).await;

		let DistributeCollation { candidate, pov_block } =
			distribute_collation(&mut virtual_overseer, &test_state, true).await;

		for (val, peer) in test_state.current_group_validator_authority_ids()
			.into_iter()
			.zip(test_state.current_group_validator_peer_ids())
		{
			connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await;
		}

		// We declare to the connected validators that we are a collator.
		// We need to catch all `Declare` messages to the validators we've
		// previously connected to.
		for peer_id in test_state.current_group_validator_peer_ids() {
			expect_declare_msg(&mut virtual_overseer, &test_state, &peer_id).await;
		}

		let validator_0 = test_state.current_group_validator_peer_ids()[0].clone();
		let validator_1 = test_state.current_group_validator_peer_ids()[1].clone();

		// Send info about peer's view.
		send_peer_view_change(&mut virtual_overseer, &validator_0, vec![test_state.relay_parent]).await;
		send_peer_view_change(&mut virtual_overseer, &validator_1, vec![test_state.relay_parent]).await;

		// The peer is interested in a leaf that we have a collation for;
		// advertise it.
		expect_advertise_collation_msg(&mut virtual_overseer, &validator_0, test_state.relay_parent).await;
		expect_advertise_collation_msg(&mut virtual_overseer, &validator_1, test_state.relay_parent).await;

		// Request a collation.
		let (tx, rx) = oneshot::channel();
		overseer_send(
			&mut virtual_overseer,
			CollatorProtocolMessage::CollationFetchingRequest(
				IncomingRequest::new(
					validator_0,
					CollationFetchingRequest {
						relay_parent: test_state.relay_parent,
						para_id: test_state.para_id,
					},
					tx,
				)
			)
		).await;

		// Keep the feedback channel alive because we need to use it to inform about the finished transfer.
		let feedback_tx = assert_matches!(
			rx.await,
			Ok(full_response) => {
				let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
					= CollationFetchingResponse::decode(
						&mut full_response.result
							.expect("We should have a proper answer").as_ref()
					)
					.expect("Decoding should work");
				assert_eq!(receipt, candidate);
				assert_eq!(pov, pov_block);

				full_response.sent_feedback.expect("Feedback channel is always set")
			}
		);

		// Let the second validator request the collation.
		let (tx, mut rx) = oneshot::channel();
		overseer_send(
			&mut virtual_overseer,
			CollatorProtocolMessage::CollationFetchingRequest(
				IncomingRequest::new(
					validator_1,
					CollationFetchingRequest {
						relay_parent: test_state.relay_parent,
						para_id: test_state.para_id,
					},
					tx,
				)
			)
		).await;

		Delay::new(Duration::from_millis(500)).await;

		// The first fetch has not been acknowledged yet, so the second request
		// must still be queued.
		assert!(
			rx.try_recv().unwrap().is_none(),
			"We should not have sent the collation yet to the second validator",
		);

		// Signal that the collation fetch is finished
		feedback_tx.send(()).expect("Sending collation fetch finished");

		// Now we should send it to the second validator
		assert_matches!(
			rx.await,
			Ok(full_response) => {
				let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
					= CollationFetchingResponse::decode(
						&mut full_response.result
							.expect("We should have a proper answer").as_ref()
					)
					.expect("Decoding should work");
				assert_eq!(receipt, candidate);
				assert_eq!(pov, pov_block);

				full_response.sent_feedback.expect("Feedback channel is always set")
			}
		);

		virtual_overseer
	});
}
#[test]
fn collators_declare_to_connected_peers() {
let test_state = TestState::default();