diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index cb59e04104..015056f42a 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -555,9 +555,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitvec"
-version = "1.0.0"
+version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1489fcb93a5bb47da0462ca93ad252ad6af2145cce58d10d46a83931ba9f016b"
+checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c"
dependencies = [
"funty",
"radium",
@@ -6290,6 +6290,7 @@ version = "0.9.29"
dependencies = [
"always-assert",
"assert_matches",
+ "bitvec",
"env_logger 0.9.0",
"fatality",
"futures",
diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml
index df9e75c9e9..e089719106 100644
--- a/polkadot/node/network/collator-protocol/Cargo.toml
+++ b/polkadot/node/network/collator-protocol/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
always-assert = "0.1.2"
+bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] }
futures = "0.3.21"
futures-timer = "3"
gum = { package = "tracing-gum", path = "../../gum" }
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/metrics.rs b/polkadot/node/network/collator-protocol/src/collator_side/metrics.rs
new file mode 100644
index 0000000000..85e00406b9
--- /dev/null
+++ b/polkadot/node/network/collator-protocol/src/collator_side/metrics.rs
@@ -0,0 +1,123 @@
+// Copyright 2017-2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+use polkadot_node_subsystem_util::metrics::{self, prometheus};
+
+#[derive(Clone, Default)]
+pub struct Metrics(Option);
+
+impl Metrics {
+ pub fn on_advertisment_made(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.advertisements_made.inc();
+ }
+ }
+
+ pub fn on_collation_sent_requested(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.collations_send_requested.inc();
+ }
+ }
+
+ pub fn on_collation_sent(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.collations_sent.inc();
+ }
+ }
+
+ /// Provide a timer for `process_msg` which observes on drop.
+ pub fn time_process_msg(&self) -> Option {
+ self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
+ }
+
+ /// Provide a timer for `distribute_collation` which observes on drop.
+ pub fn time_collation_distribution(
+ &self,
+ label: &'static str,
+ ) -> Option {
+ self.0.as_ref().map(|metrics| {
+ metrics.collation_distribution_time.with_label_values(&[label]).start_timer()
+ })
+ }
+}
+
+#[derive(Clone)]
+struct MetricsInner {
+ advertisements_made: prometheus::Counter,
+ collations_sent: prometheus::Counter,
+ collations_send_requested: prometheus::Counter,
+ process_msg: prometheus::Histogram,
+ collation_distribution_time: prometheus::HistogramVec,
+}
+
+impl metrics::Metrics for Metrics {
+ fn try_register(
+ registry: &prometheus::Registry,
+ ) -> std::result::Result {
+ let metrics = MetricsInner {
+ advertisements_made: prometheus::register(
+ prometheus::Counter::new(
+ "polkadot_parachain_collation_advertisements_made_total",
+ "A number of collation advertisements sent to validators.",
+ )?,
+ registry,
+ )?,
+ collations_send_requested: prometheus::register(
+ prometheus::Counter::new(
+ "polkadot_parachain_collations_sent_requested_total",
+ "A number of collations requested to be sent to validators.",
+ )?,
+ registry,
+ )?,
+ collations_sent: prometheus::register(
+ prometheus::Counter::new(
+ "polkadot_parachain_collations_sent_total",
+ "A number of collations sent to validators.",
+ )?,
+ registry,
+ )?,
+ process_msg: prometheus::register(
+ prometheus::Histogram::with_opts(
+ prometheus::HistogramOpts::new(
+ "polkadot_parachain_collator_protocol_collator_process_msg",
+ "Time spent within `collator_protocol_collator::process_msg`",
+ )
+ .buckets(vec![
+ 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
+ 1.0,
+ ]),
+ )?,
+ registry,
+ )?,
+ collation_distribution_time: prometheus::register(
+ prometheus::HistogramVec::new(
+ prometheus::HistogramOpts::new(
+ "polkadot_parachain_collator_protocol_collator_distribution_time",
+ "Time spent within `collator_protocol_collator::distribute_collation`",
+ )
+ .buckets(vec![
+ 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
+ 1.0,
+ ]),
+ &["state"],
+ )?,
+ registry,
+ )?,
+ };
+
+ Ok(Metrics(Some(metrics)))
+ }
+}
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
index c1a20a2a67..4f2eea2ca7 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs
@@ -17,7 +17,7 @@
use std::{
collections::{HashMap, HashSet, VecDeque},
pin::Pin,
- time::Duration,
+ time::{Duration, Instant},
};
use futures::{
@@ -44,19 +44,25 @@ use polkadot_node_subsystem::{
overseer, FromOrchestra, OverseerSignal, PerLeafSpan,
};
use polkadot_node_subsystem_util::{
- metrics::{self, prometheus},
runtime::{get_availability_cores, get_group_rotation_info, RuntimeInfo},
TimeoutExt,
};
use polkadot_primitives::v2::{
AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState,
- Hash, Id as ParaId,
+ GroupIndex, Hash, Id as ParaId, SessionIndex,
};
use super::LOG_TARGET;
use crate::error::{log_error, Error, FatalError, Result};
use fatality::Split;
+mod metrics;
+mod validators_buffer;
+
+use validators_buffer::{ValidatorGroupsBuffer, VALIDATORS_BUFFER_CAPACITY};
+
+pub use metrics::Metrics;
+
#[cfg(test)]
mod tests;
@@ -73,111 +79,16 @@ const COST_APPARENT_FLOOD: Rep =
/// For considerations on this value, see: https://github.com/paritytech/polkadot/issues/4386
const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(150);
-#[derive(Clone, Default)]
-pub struct Metrics(Option);
+/// Ensure that collator issues a connection request at least once every this many seconds.
+/// Usually it's done when advertising new collation. However, if the core stays occupied or
+/// it's not our turn to produce a candidate, it's important to disconnect from previous
+/// peers.
+///
+/// Validators are obtained from [`ValidatorGroupsBuffer::validators_to_connect`].
+const RECONNECT_TIMEOUT: Duration = Duration::from_secs(12);
-impl Metrics {
- fn on_advertisment_made(&self) {
- if let Some(metrics) = &self.0 {
- metrics.advertisements_made.inc();
- }
- }
-
- fn on_collation_sent_requested(&self) {
- if let Some(metrics) = &self.0 {
- metrics.collations_send_requested.inc();
- }
- }
-
- fn on_collation_sent(&self) {
- if let Some(metrics) = &self.0 {
- metrics.collations_sent.inc();
- }
- }
-
- /// Provide a timer for `process_msg` which observes on drop.
- fn time_process_msg(&self) -> Option {
- self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
- }
-
- /// Provide a timer for `distribute_collation` which observes on drop.
- fn time_collation_distribution(
- &self,
- label: &'static str,
- ) -> Option {
- self.0.as_ref().map(|metrics| {
- metrics.collation_distribution_time.with_label_values(&[label]).start_timer()
- })
- }
-}
-
-#[derive(Clone)]
-struct MetricsInner {
- advertisements_made: prometheus::Counter,
- collations_sent: prometheus::Counter,
- collations_send_requested: prometheus::Counter,
- process_msg: prometheus::Histogram,
- collation_distribution_time: prometheus::HistogramVec,
-}
-
-impl metrics::Metrics for Metrics {
- fn try_register(
- registry: &prometheus::Registry,
- ) -> std::result::Result {
- let metrics = MetricsInner {
- advertisements_made: prometheus::register(
- prometheus::Counter::new(
- "polkadot_parachain_collation_advertisements_made_total",
- "A number of collation advertisements sent to validators.",
- )?,
- registry,
- )?,
- collations_send_requested: prometheus::register(
- prometheus::Counter::new(
- "polkadot_parachain_collations_sent_requested_total",
- "A number of collations requested to be sent to validators.",
- )?,
- registry,
- )?,
- collations_sent: prometheus::register(
- prometheus::Counter::new(
- "polkadot_parachain_collations_sent_total",
- "A number of collations sent to validators.",
- )?,
- registry,
- )?,
- process_msg: prometheus::register(
- prometheus::Histogram::with_opts(
- prometheus::HistogramOpts::new(
- "polkadot_parachain_collator_protocol_collator_process_msg",
- "Time spent within `collator_protocol_collator::process_msg`",
- )
- .buckets(vec![
- 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
- 1.0,
- ]),
- )?,
- registry,
- )?,
- collation_distribution_time: prometheus::register(
- prometheus::HistogramVec::new(
- prometheus::HistogramOpts::new(
- "polkadot_parachain_collator_protocol_collator_distribution_time",
- "Time spent within `collator_protocol_collator::distribute_collation`",
- )
- .buckets(vec![
- 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
- 1.0,
- ]),
- &["state"],
- )?,
- registry,
- )?,
- };
-
- Ok(Metrics(Some(metrics)))
- }
-}
+/// How often to check for reconnect timeout.
+const RECONNECT_POLL: Duration = Duration::from_secs(1);
/// Info about validators we are currently connected to.
///
@@ -269,8 +180,14 @@ struct WaitingCollationFetches {
waiting_peers: HashSet,
}
+struct CollationSendResult {
+ relay_parent: Hash,
+ peer_id: PeerId,
+ timed_out: bool,
+}
+
type ActiveCollationFetches =
- FuturesUnordered + Send + 'static>>>;
+ FuturesUnordered + Send + 'static>>>;
struct State {
/// Our network peer id.
@@ -308,6 +225,13 @@ struct State {
/// by `PeerConnected` events.
peer_ids: HashMap>,
+ /// Tracks which validators we want to stay connected to.
+ validator_groups_buf: ValidatorGroupsBuffer,
+
+ /// Timestamp of the last connection request to a non-empty list of validators,
+ /// `None` otherwise.
+ last_connected_at: Option,
+
/// Metrics.
metrics: Metrics,
@@ -339,6 +263,8 @@ impl State {
collation_result_senders: Default::default(),
our_validators_groups: Default::default(),
peer_ids: Default::default(),
+ validator_groups_buf: ValidatorGroupsBuffer::with_capacity(VALIDATORS_BUFFER_CAPACITY),
+ last_connected_at: None,
waiting_collation_fetches: Default::default(),
active_collation_fetches: Default::default(),
}
@@ -373,6 +299,7 @@ async fn distribute_collation(
result_sender: Option>,
) -> Result<()> {
let relay_parent = receipt.descriptor.relay_parent;
+ let candidate_hash = receipt.hash();
// This collation is not in the active-leaves set.
if !state.view.contains(&relay_parent) {
@@ -412,10 +339,10 @@ async fn distribute_collation(
};
// Determine the group on that core.
- let current_validators =
+ let GroupValidators { validators, session_index, group_index } =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;
- if current_validators.validators.is_empty() {
+ if validators.is_empty() {
gum::warn!(
target: LOG_TARGET,
core = ?our_core,
@@ -425,24 +352,36 @@ async fn distribute_collation(
return Ok(())
}
+ // It's important to insert new collation bits **before**
+ // issuing a connection request.
+ //
+ // If a validator managed to fetch all the relevant collations
+ // but still assigned to our core, we keep the connection alive.
+ state.validator_groups_buf.note_collation_advertised(
+ relay_parent,
+ session_index,
+ group_index,
+ &validators,
+ );
+
gum::debug!(
target: LOG_TARGET,
para_id = %id,
relay_parent = %relay_parent,
- candidate_hash = ?receipt.hash(),
+ ?candidate_hash,
pov_hash = ?pov.hash(),
core = ?our_core,
- ?current_validators,
+ current_validators = ?validators,
"Accepted collation, connecting to validators."
);
- // Issue a discovery request for the validators of the current group:
- connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await;
+ // Update a set of connected validators if necessary.
+ state.last_connected_at = connect_to_validators(ctx, &state.validator_groups_buf).await;
state.our_validators_groups.insert(relay_parent, ValidatorGroup::new());
if let Some(result_sender) = result_sender {
- state.collation_result_senders.insert(receipt.hash(), result_sender);
+ state.collation_result_senders.insert(candidate_hash, result_sender);
}
state
@@ -483,6 +422,9 @@ async fn determine_core(
struct GroupValidators {
/// The validators of above group (their discovery keys).
validators: Vec,
+
+ session_index: SessionIndex,
+ group_index: GroupIndex,
}
/// Figure out current group of validators assigned to the para being collated on.
@@ -516,7 +458,11 @@ async fn determine_our_validators(
let current_validators =
current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
- let current_validators = GroupValidators { validators: current_validators };
+ let current_validators = GroupValidators {
+ validators: current_validators,
+ session_index,
+ group_index: current_group_index,
+ };
Ok(current_validators)
}
@@ -541,13 +487,19 @@ async fn declare(ctx: &mut Context, state: &mut State, peer: PeerId) {
}
}
-/// Issue a connection request to a set of validators and
-/// revoke the previous connection request.
+/// Updates a set of connected validators based on their advertisement-bits
+/// in a validators buffer.
+///
+/// Returns current timestamp if the connection request was non-empty, `None`
+/// otherwise.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn connect_to_validators(
ctx: &mut Context,
- validator_ids: Vec,
-) {
+ validator_groups_buf: &ValidatorGroupsBuffer,
+) -> Option {
+ let validator_ids = validator_groups_buf.validators_to_connect();
+ let is_disconnect = validator_ids.is_empty();
+
// ignore address resolution failure
// will reissue a new request on new collation
let (failed, _) = oneshot::channel();
@@ -557,6 +509,8 @@ async fn connect_to_validators(
failed,
})
.await;
+
+ (!is_disconnect).then_some(Instant::now())
}
/// Advertise collation to the given `peer`.
@@ -715,15 +669,9 @@ async fn send_collation(
state.active_collation_fetches.push(
async move {
let r = rx.timeout(MAX_UNSHARED_UPLOAD_TIME).await;
- if r.is_none() {
- gum::debug!(
- target: LOG_TARGET,
- ?relay_parent,
- ?peer_id,
- "Sending collation to validator timed out, carrying on with next validator."
- );
- }
- (relay_parent, peer_id)
+ let timed_out = r.is_none();
+
+ CollationSendResult { relay_parent, peer_id, timed_out }
}
.boxed(),
);
@@ -986,6 +934,7 @@ async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()>
state.our_validators_groups.remove(removed);
state.span_per_relay_parent.remove(removed);
state.waiting_collation_fetches.remove(removed);
+ state.validator_groups_buf.remove_relay_parent(removed);
}
state.view = view;
@@ -1007,6 +956,9 @@ pub(crate) async fn run(
let mut state = State::new(local_peer_id, collator_pair, metrics);
let mut runtime = RuntimeInfo::new(None);
+ let reconnect_stream = super::tick_stream(RECONNECT_POLL);
+ pin_mut!(reconnect_stream);
+
loop {
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
pin_mut!(recv_req);
@@ -1022,7 +974,25 @@ pub(crate) async fn run(
FromOrchestra::Signal(BlockFinalized(..)) => {}
FromOrchestra::Signal(Conclude) => return Ok(()),
},
- (relay_parent, peer_id) = state.active_collation_fetches.select_next_some() => {
+ CollationSendResult {
+ relay_parent,
+ peer_id,
+ timed_out,
+ } = state.active_collation_fetches.select_next_some() => {
+ if timed_out {
+ gum::debug!(
+ target: LOG_TARGET,
+ ?relay_parent,
+ ?peer_id,
+ "Sending collation to validator timed out, carrying on with next validator",
+ );
+ } else {
+ for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() {
+ // Timeout not hit, this peer is no longer interested in this relay parent.
+ state.validator_groups_buf.reset_validator_interest(relay_parent, authority_id);
+ }
+ }
+
let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) {
waiting.waiting_peers.remove(&peer_id);
if let Some(next) = waiting.waiting.pop_front() {
@@ -1042,7 +1012,29 @@ pub(crate) async fn run(
send_collation(&mut state, next, receipt, pov).await;
}
- }
+ },
+ _ = reconnect_stream.next() => {
+ let now = Instant::now();
+ if state
+ .last_connected_at
+ .map_or(false, |timestamp| now - timestamp > RECONNECT_TIMEOUT)
+ {
+ // Remove all advertisements from the buffer if the timeout was hit.
+ // Usually, it shouldn't be necessary as leaves get deactivated, rather
+ // serves as a safeguard against finality lags.
+ state.validator_groups_buf.clear_advertisements();
+ // Returns `None` if connection request is empty.
+ state.last_connected_at =
+ connect_to_validators(&mut ctx, &state.validator_groups_buf).await;
+
+ gum::debug!(
+ target: LOG_TARGET,
+ timeout = ?RECONNECT_TIMEOUT,
+ "Timeout hit, sent a connection request. Disconnected from all validators = {}",
+ state.last_connected_at.is_none(),
+ );
+ }
+ },
in_req = recv_req => {
match in_req {
Ok(req) => {
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests.rs
index 2d2f2cf043..c20a2d6c97 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side/tests.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side/tests.rs
@@ -56,7 +56,7 @@ struct TestState {
group_rotation_info: GroupRotationInfo,
validator_peer_id: Vec,
relay_parent: Hash,
- availability_core: CoreState,
+ availability_cores: Vec,
local_peer_id: PeerId,
collator_pair: CollatorPair,
session_index: SessionIndex,
@@ -88,14 +88,15 @@ impl Default for TestState {
let validator_peer_id =
std::iter::repeat_with(|| PeerId::random()).take(discovery_keys.len()).collect();
- let validator_groups = vec![vec![2, 0, 4], vec![3, 2, 4]]
+ let validator_groups = vec![vec![2, 0, 4], vec![1, 3]]
.into_iter()
.map(|g| g.into_iter().map(ValidatorIndex).collect())
.collect();
let group_rotation_info =
GroupRotationInfo { session_start_block: 0, group_rotation_frequency: 100, now: 1 };
- let availability_core = CoreState::Scheduled(ScheduledCore { para_id, collator: None });
+ let availability_cores =
+ vec![CoreState::Scheduled(ScheduledCore { para_id, collator: None }), CoreState::Free];
let relay_parent = Hash::random();
@@ -122,7 +123,7 @@ impl Default for TestState {
group_rotation_info,
validator_peer_id,
relay_parent,
- availability_core,
+ availability_cores,
local_peer_id,
collator_pair,
session_index: 1,
@@ -132,7 +133,9 @@ impl Default for TestState {
impl TestState {
fn current_group_validator_indices(&self) -> &[ValidatorIndex] {
- &self.session_info.validator_groups[0]
+ let core_num = self.availability_cores.len();
+ let GroupIndex(group_idx) = self.group_rotation_info.group_for_core(CoreIndex(0), core_num);
+ &self.session_info.validator_groups[group_idx as usize]
}
fn current_session_index(&self) -> SessionIndex {
@@ -333,7 +336,7 @@ async fn distribute_collation(
RuntimeApiRequest::AvailabilityCores(tx)
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
- tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap();
+ tx.send(Ok(test_state.availability_cores.clone())).unwrap();
}
);
@@ -987,3 +990,104 @@ where
test_harness
});
}
+
+#[test]
+fn connect_to_buffered_groups() {
+ let mut test_state = TestState::default();
+ let local_peer_id = test_state.local_peer_id.clone();
+ let collator_pair = test_state.collator_pair.clone();
+
+ test_harness(local_peer_id, collator_pair, |test_harness| async move {
+ let mut virtual_overseer = test_harness.virtual_overseer;
+ let mut req_cfg = test_harness.req_cfg;
+
+ setup_system(&mut virtual_overseer, &test_state).await;
+
+ let group_a = test_state.current_group_validator_authority_ids();
+ let peers_a = test_state.current_group_validator_peer_ids();
+ assert!(group_a.len() > 1);
+
+ distribute_collation(&mut virtual_overseer, &test_state, false).await;
+
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::NetworkBridgeTx(
+ NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. }
+ ) => {
+ assert_eq!(group_a, validator_ids);
+ }
+ );
+
+ let head_a = test_state.relay_parent;
+
+ for (val, peer) in group_a.iter().zip(&peers_a) {
+ connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await;
+ }
+
+ for peer_id in &peers_a {
+ expect_declare_msg(&mut virtual_overseer, &test_state, peer_id).await;
+ }
+
+ // Update views.
+ for peed_id in &peers_a {
+ send_peer_view_change(&mut virtual_overseer, peed_id, vec![head_a]).await;
+ expect_advertise_collation_msg(&mut virtual_overseer, peed_id, head_a).await;
+ }
+
+ let peer = peers_a[0];
+ // Peer from the group fetches the collation.
+ let (pending_response, rx) = oneshot::channel();
+ req_cfg
+ .inbound_queue
+ .as_mut()
+ .unwrap()
+ .send(RawIncomingRequest {
+ peer,
+ payload: CollationFetchingRequest {
+ relay_parent: head_a,
+ para_id: test_state.para_id,
+ }
+ .encode(),
+ pending_response,
+ })
+ .await
+ .unwrap();
+ assert_matches!(
+ rx.await,
+ Ok(full_response) => {
+ let CollationFetchingResponse::Collation(..): CollationFetchingResponse =
+ CollationFetchingResponse::decode(
+ &mut full_response.result.expect("We should have a proper answer").as_ref(),
+ )
+ .expect("Decoding should work");
+ }
+ );
+
+ test_state.advance_to_new_round(&mut virtual_overseer, true).await;
+ test_state.group_rotation_info = test_state.group_rotation_info.bump_rotation();
+
+ let head_b = test_state.relay_parent;
+ let group_b = test_state.current_group_validator_authority_ids();
+ assert_ne!(head_a, head_b);
+ assert_ne!(group_a, group_b);
+
+ distribute_collation(&mut virtual_overseer, &test_state, false).await;
+
+ // Should be connected to both groups except for the validator that fetched advertised
+ // collation.
+ assert_matches!(
+ overseer_recv(&mut virtual_overseer).await,
+ AllMessages::NetworkBridgeTx(
+ NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. }
+ ) => {
+ assert!(!validator_ids.contains(&group_a[0]));
+
+ for validator in group_a[1..].iter().chain(&group_b) {
+ assert!(validator_ids.contains(validator));
+ }
+ }
+ );
+
+ TestHarness { virtual_overseer, req_cfg }
+ });
+}
diff --git a/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs
new file mode 100644
index 0000000000..5bb31c72d6
--- /dev/null
+++ b/polkadot/node/network/collator-protocol/src/collator_side/validators_buffer.rs
@@ -0,0 +1,317 @@
+// Copyright 2017-2022 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! Validator groups buffer for connection managements.
+//!
+//! Solves 2 problems:
+//! 1. A collator may want to stay connected to multiple groups on rotation boundaries.
+//! 2. It's important to disconnect from validator when there're no collations to be fetched.
+//!
+//! We keep a simple FIFO buffer of N validator groups and a bitvec for each advertisement,
+//! 1 indicating we want to be connected to i-th validator in a buffer, 0 otherwise.
+//!
+//! The bit is set to 1 for the whole **group** whenever it's inserted into the buffer. Given a relay
+//! parent, one can reset a bit back to 0 for particular **validator**. For example, if a collation
+//! was fetched or some timeout has been hit.
+//!
+//! The bitwise OR over known advertisements gives us validators indices for connection request.
+
+use std::{
+ collections::{HashMap, VecDeque},
+ num::NonZeroUsize,
+ ops::Range,
+};
+
+use bitvec::{bitvec, vec::BitVec};
+
+use polkadot_primitives::v2::{AuthorityDiscoveryId, GroupIndex, Hash, SessionIndex};
+
+/// The ring buffer stores at most this many unique validator groups.
+///
+/// This value should be chosen in way that all groups assigned to our para
+/// in the view can fit into the buffer.
+pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = match NonZeroUsize::new(3) {
+ Some(cap) => cap,
+ None => panic!("buffer capacity must be non-zero"),
+};
+
+/// Unique identifier of a validators group.
+#[derive(Debug)]
+struct ValidatorsGroupInfo {
+ /// Number of validators in the group.
+ len: usize,
+ session_index: SessionIndex,
+ group_index: GroupIndex,
+}
+
+/// Ring buffer of validator groups.
+///
+/// Tracks which peers we want to be connected to with respect to advertised collations.
+#[derive(Debug)]
+pub struct ValidatorGroupsBuffer {
+ /// Validator groups identifiers we **had** advertisements for.
+ group_infos: VecDeque,
+ /// Continuous buffer of validators discovery keys.
+ validators: VecDeque,
+ /// Mapping from relay-parent to bit-vectors with bits for all `validators`.
+ /// Invariants kept: All bit-vectors are guaranteed to have the same size.
+ should_be_connected: HashMap,
+ /// Buffer capacity, limits the number of **groups** tracked.
+ cap: NonZeroUsize,
+}
+
+impl ValidatorGroupsBuffer {
+ /// Creates a new buffer with a non-zero capacity.
+ pub fn with_capacity(cap: NonZeroUsize) -> Self {
+ Self {
+ group_infos: VecDeque::new(),
+ validators: VecDeque::new(),
+ should_be_connected: HashMap::new(),
+ cap,
+ }
+ }
+
+ /// Returns discovery ids of validators we have at least one advertised-but-not-fetched
+ /// collation for.
+ pub fn validators_to_connect(&self) -> Vec {
+ let validators_num = self.validators.len();
+ let bits = self
+ .should_be_connected
+ .values()
+ .fold(bitvec![0; validators_num], |acc, next| acc | next);
+
+ self.validators
+ .iter()
+ .enumerate()
+ .filter_map(|(idx, authority_id)| bits[idx].then_some(authority_id.clone()))
+ .collect()
+ }
+
+ /// Note a new advertisement, marking that we want to be connected to validators
+ /// from this group.
+ ///
+ /// If max capacity is reached and the group is new, drops validators from the back
+ /// of the buffer.
+ pub fn note_collation_advertised(
+ &mut self,
+ relay_parent: Hash,
+ session_index: SessionIndex,
+ group_index: GroupIndex,
+ validators: &[AuthorityDiscoveryId],
+ ) {
+ if validators.is_empty() {
+ return
+ }
+
+ match self.group_infos.iter().enumerate().find(|(_, group)| {
+ group.session_index == session_index && group.group_index == group_index
+ }) {
+ Some((idx, group)) => {
+ let group_start_idx = self.group_lengths_iter().take(idx).sum();
+ self.set_bits(relay_parent, group_start_idx..(group_start_idx + group.len));
+ },
+ None => self.push(relay_parent, session_index, group_index, validators),
+ }
+ }
+
+ /// Note that a validator is no longer interested in a given relay parent.
+ pub fn reset_validator_interest(
+ &mut self,
+ relay_parent: Hash,
+ authority_id: &AuthorityDiscoveryId,
+ ) {
+ let bits = match self.should_be_connected.get_mut(&relay_parent) {
+ Some(bits) => bits,
+ None => return,
+ };
+
+ for (idx, auth_id) in self.validators.iter().enumerate() {
+ if auth_id == authority_id {
+ bits.set(idx, false);
+ }
+ }
+ }
+
+ /// Remove relay parent from the buffer.
+ ///
+ /// The buffer will no longer track which validators are interested in a corresponding
+ /// advertisement.
+ pub fn remove_relay_parent(&mut self, relay_parent: &Hash) {
+ self.should_be_connected.remove(relay_parent);
+ }
+
+ /// Removes all advertisements from the buffer.
+ pub fn clear_advertisements(&mut self) {
+ self.should_be_connected.clear();
+ }
+
+ /// Pushes a new group to the buffer along with advertisement, setting all validators
+ /// bits to 1.
+ ///
+ /// If the buffer is full, drops group from the tail.
+ fn push(
+ &mut self,
+ relay_parent: Hash,
+ session_index: SessionIndex,
+ group_index: GroupIndex,
+ validators: &[AuthorityDiscoveryId],
+ ) {
+ let new_group_info =
+ ValidatorsGroupInfo { len: validators.len(), session_index, group_index };
+
+ let buf = &mut self.group_infos;
+ let cap = self.cap.get();
+
+ if buf.len() >= cap {
+ let pruned_group = buf.pop_front().expect("buf is not empty; qed");
+ self.validators.drain(..pruned_group.len);
+
+ self.should_be_connected.values_mut().for_each(|bits| {
+ bits.as_mut_bitslice().shift_left(pruned_group.len);
+ });
+ }
+
+ self.validators.extend(validators.iter().cloned());
+ buf.push_back(new_group_info);
+ let buf_len = buf.len();
+ let group_start_idx = self.group_lengths_iter().take(buf_len - 1).sum();
+
+ let new_len = self.validators.len();
+ self.should_be_connected
+ .values_mut()
+ .for_each(|bits| bits.resize(new_len, false));
+ self.set_bits(relay_parent, group_start_idx..(group_start_idx + validators.len()));
+ }
+
+ /// Sets advertisement bits to 1 in a given range (usually corresponding to some group).
+ /// If the relay parent is unknown, inserts 0-initialized bitvec first.
+ ///
+ /// The range must be ensured to be within bounds.
+ fn set_bits(&mut self, relay_parent: Hash, range: Range) {
+ let bits = self
+ .should_be_connected
+ .entry(relay_parent)
+ .or_insert_with(|| bitvec![0; self.validators.len()]);
+
+ bits[range].fill(true);
+ }
+
+ /// Returns iterator over numbers of validators in groups.
+ ///
+ /// Useful for getting an index of the first validator in i-th group.
+ fn group_lengths_iter(&self) -> impl Iterator- + '_ {
+ self.group_infos.iter().map(|group| group.len)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use sp_keyring::Sr25519Keyring;
+
+ #[test]
+ fn one_capacity_buffer() {
+ let cap = NonZeroUsize::new(1).unwrap();
+ let mut buf = ValidatorGroupsBuffer::with_capacity(cap);
+
+ let hash_a = Hash::repeat_byte(0x1);
+ let hash_b = Hash::repeat_byte(0x2);
+
+ let validators: Vec<_> = [
+ Sr25519Keyring::Alice,
+ Sr25519Keyring::Bob,
+ Sr25519Keyring::Charlie,
+ Sr25519Keyring::Dave,
+ Sr25519Keyring::Ferdie,
+ ]
+ .into_iter()
+ .map(|key| AuthorityDiscoveryId::from(key.public()))
+ .collect();
+
+ assert!(buf.validators_to_connect().is_empty());
+
+ buf.note_collation_advertised(hash_a, 0, GroupIndex(0), &validators[..2]);
+ assert_eq!(buf.validators_to_connect(), validators[..2].to_vec());
+
+ buf.reset_validator_interest(hash_a, &validators[1]);
+ assert_eq!(buf.validators_to_connect(), vec![validators[0].clone()]);
+
+ buf.note_collation_advertised(hash_b, 0, GroupIndex(1), &validators[2..]);
+ assert_eq!(buf.validators_to_connect(), validators[2..].to_vec());
+
+ for validator in &validators[2..] {
+ buf.reset_validator_interest(hash_b, validator);
+ }
+ assert!(buf.validators_to_connect().is_empty());
+ }
+
+ #[test]
+ fn buffer_works() {
+ let cap = NonZeroUsize::new(3).unwrap();
+ let mut buf = ValidatorGroupsBuffer::with_capacity(cap);
+
+ let hashes: Vec<_> = (0..5).map(Hash::repeat_byte).collect();
+
+ let validators: Vec<_> = [
+ Sr25519Keyring::Alice,
+ Sr25519Keyring::Bob,
+ Sr25519Keyring::Charlie,
+ Sr25519Keyring::Dave,
+ Sr25519Keyring::Ferdie,
+ ]
+ .into_iter()
+ .map(|key| AuthorityDiscoveryId::from(key.public()))
+ .collect();
+
+ buf.note_collation_advertised(hashes[0], 0, GroupIndex(0), &validators[..2]);
+ buf.note_collation_advertised(hashes[1], 0, GroupIndex(0), &validators[..2]);
+ buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]);
+ buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]);
+
+ assert_eq!(buf.validators_to_connect(), validators[..4].to_vec());
+
+ for validator in &validators[2..4] {
+ buf.reset_validator_interest(hashes[2], validator);
+ }
+
+ buf.reset_validator_interest(hashes[1], &validators[0]);
+ assert_eq!(buf.validators_to_connect(), validators[..2].to_vec());
+
+ buf.reset_validator_interest(hashes[0], &validators[0]);
+ assert_eq!(buf.validators_to_connect(), vec![validators[1].clone()]);
+
+ buf.note_collation_advertised(hashes[3], 0, GroupIndex(1), &validators[2..4]);
+ buf.note_collation_advertised(
+ hashes[4],
+ 0,
+ GroupIndex(2),
+ std::slice::from_ref(&validators[4]),
+ );
+
+ buf.reset_validator_interest(hashes[3], &validators[2]);
+ buf.note_collation_advertised(
+ hashes[4],
+ 0,
+ GroupIndex(3),
+ std::slice::from_ref(&validators[0]),
+ );
+
+ assert_eq!(
+ buf.validators_to_connect(),
+ vec![validators[3].clone(), validators[4].clone(), validators[0].clone()]
+ );
+ }
+}
diff --git a/polkadot/node/network/collator-protocol/src/lib.rs b/polkadot/node/network/collator-protocol/src/lib.rs
index 66659e4b5b..b71acc127c 100644
--- a/polkadot/node/network/collator-protocol/src/lib.rs
+++ b/polkadot/node/network/collator-protocol/src/lib.rs
@@ -21,9 +21,12 @@
#![deny(unused_crate_dependencies)]
#![recursion_limit = "256"]
-use std::time::Duration;
+use std::time::{Duration, Instant};
-use futures::{FutureExt, TryFutureExt};
+use futures::{
+ stream::{FusedStream, StreamExt},
+ FutureExt, TryFutureExt,
+};
use sp_keystore::SyncCryptoStorePtr;
@@ -134,3 +137,23 @@ async fn modify_reputation(
sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer, rep)).await;
}
+
+/// Wait until tick and return the timestamp for the following one.
+async fn wait_until_next_tick(last_poll: Instant, period: Duration) -> Instant {
+ let now = Instant::now();
+ let next_poll = last_poll + period;
+
+ if next_poll > now {
+ futures_timer::Delay::new(next_poll - now).await
+ }
+
+ Instant::now()
+}
+
+/// Returns an infinite stream that yields with an interval of `period`.
+fn tick_stream(period: Duration) -> impl FusedStream
- {
+ futures::stream::unfold(Instant::now(), move |next_check| async move {
+ Some(((), wait_until_next_tick(next_check, period).await))
+ })
+ .fuse()
+}
diff --git a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs
index 47795aac0c..b74c1d5b5a 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs
@@ -19,7 +19,7 @@ use futures::{
channel::oneshot,
future::{BoxFuture, Fuse, FusedFuture},
select,
- stream::{FusedStream, FuturesUnordered},
+ stream::FuturesUnordered,
FutureExt, StreamExt,
};
use futures_timer::Delay;
@@ -57,7 +57,7 @@ use polkadot_primitives::v2::{CandidateReceipt, CollatorId, Hash, Id as ParaId};
use crate::error::Result;
-use super::{modify_reputation, LOG_TARGET};
+use super::{modify_reputation, tick_stream, LOG_TARGET};
#[cfg(test)]
mod tests;
@@ -97,7 +97,7 @@ const ACTIVITY_POLL: Duration = Duration::from_millis(10);
// How often to poll collation responses.
// This is a hack that should be removed in a refactoring.
// See https://github.com/paritytech/polkadot/issues/4182
-const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(5);
+const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(50);
#[derive(Clone, Default)]
pub struct Metrics(Option);
@@ -1167,25 +1167,6 @@ async fn process_msg(
}
}
-// wait until next inactivity check. returns the instant for the following check.
-async fn wait_until_next_check(last_poll: Instant) -> Instant {
- let now = Instant::now();
- let next_poll = last_poll + ACTIVITY_POLL;
-
- if next_poll > now {
- Delay::new(next_poll - now).await
- }
-
- Instant::now()
-}
-
-fn infinite_stream(every: Duration) -> impl FusedStream
- {
- futures::stream::unfold(Instant::now() + every, |next_check| async move {
- Some(((), wait_until_next_check(next_check).await))
- })
- .fuse()
-}
-
/// The main run loop.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
pub(crate) async fn run(
@@ -1196,10 +1177,10 @@ pub(crate) async fn run(
) -> std::result::Result<(), crate::error::FatalError> {
let mut state = State { metrics, ..Default::default() };
- let next_inactivity_stream = infinite_stream(ACTIVITY_POLL);
+ let next_inactivity_stream = tick_stream(ACTIVITY_POLL);
futures::pin_mut!(next_inactivity_stream);
- let check_collations_stream = infinite_stream(CHECK_COLLATIONS_POLL);
+ let check_collations_stream = tick_stream(CHECK_COLLATIONS_POLL);
futures::pin_mut!(check_collations_stream);
loop {