Buffered connection management for collator-protocol (#6022)

* Extract metrics into a separate module

* Introduce validators buffer

* Integrate buffer into the subsystem

* Only reconnect on new advertisements

* Test

* comma

* doc comment

* Make capacity buffer compile time non-zero

* Add doc comments

* nits

* remove derives

* review

* better naming

* check timeout

* Extract interval stream into lib

* Ensure collator disconnects after timeout

* spellcheck

* rename buf

* Remove double interval

* Add a log on timeout

* Cleanup buffer on timeout
This commit is contained in:
Chris Sosnin
2022-10-05 11:48:50 +04:00
committed by GitHub
parent e0e836671f
commit b13e07bc47
8 changed files with 707 additions and 165 deletions
+3 -2
View File
@@ -555,9 +555,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitvec"
version = "1.0.0"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1489fcb93a5bb47da0462ca93ad252ad6af2145cce58d10d46a83931ba9f016b"
checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c"
dependencies = [
"funty",
"radium",
@@ -6290,6 +6290,7 @@ version = "0.9.29"
dependencies = [
"always-assert",
"assert_matches",
"bitvec",
"env_logger 0.9.0",
"fatality",
"futures",
@@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
always-assert = "0.1.2"
bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] }
futures = "0.3.21"
futures-timer = "3"
gum = { package = "tracing-gum", path = "../../gum" }
@@ -0,0 +1,123 @@
// Copyright 2017-2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use polkadot_node_subsystem_util::metrics::{self, prometheus};
/// Collator-side metrics handle.
///
/// Wraps an optional [`MetricsInner`]. When the inner value is `None` (which is what
/// the derived `Default` produces), the `on_*` methods do nothing and the `time_*`
/// methods return `None`.
#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
pub fn on_advertisment_made(&self) {
if let Some(metrics) = &self.0 {
metrics.advertisements_made.inc();
}
}
pub fn on_collation_sent_requested(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_send_requested.inc();
}
}
pub fn on_collation_sent(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_sent.inc();
}
}
/// Provide a timer for `process_msg` which observes on drop.
pub fn time_process_msg(&self) -> Option<prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
}
/// Provide a timer for `distribute_collation` which observes on drop.
pub fn time_collation_distribution(
&self,
label: &'static str,
) -> Option<prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| {
metrics.collation_distribution_time.with_label_values(&[label]).start_timer()
})
}
}
/// The Prometheus collectors backing [`Metrics`].
#[derive(Clone)]
struct MetricsInner {
/// Counter incremented by `Metrics::on_advertisment_made`.
advertisements_made: prometheus::Counter<prometheus::U64>,
/// Counter incremented by `Metrics::on_collation_sent`.
collations_sent: prometheus::Counter<prometheus::U64>,
/// Counter incremented by `Metrics::on_collation_sent_requested`.
collations_send_requested: prometheus::Counter<prometheus::U64>,
/// Histogram whose timers are handed out by `Metrics::time_process_msg`.
process_msg: prometheus::Histogram,
/// Labelled histogram whose timers are handed out by
/// `Metrics::time_collation_distribution`.
collation_distribution_time: prometheus::HistogramVec,
}
impl metrics::Metrics for Metrics {
    /// Creates every collator-side collector and registers it with `registry`.
    ///
    /// Collectors are created and registered one after another; the first failure
    /// is returned and nothing after it is attempted.
    fn try_register(
        registry: &prometheus::Registry,
    ) -> std::result::Result<Self, prometheus::PrometheusError> {
        // Both histograms below use the same bucket boundaries.
        let buckets = vec![
            0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75, 1.0,
        ];

        let advertisements_made: prometheus::Counter<prometheus::U64> = prometheus::register(
            prometheus::Counter::new(
                "polkadot_parachain_collation_advertisements_made_total",
                "A number of collation advertisements sent to validators.",
            )?,
            registry,
        )?;

        let collations_send_requested: prometheus::Counter<prometheus::U64> =
            prometheus::register(
                prometheus::Counter::new(
                    "polkadot_parachain_collations_sent_requested_total",
                    "A number of collations requested to be sent to validators.",
                )?,
                registry,
            )?;

        let collations_sent: prometheus::Counter<prometheus::U64> = prometheus::register(
            prometheus::Counter::new(
                "polkadot_parachain_collations_sent_total",
                "A number of collations sent to validators.",
            )?,
            registry,
        )?;

        let process_msg: prometheus::Histogram = prometheus::register(
            prometheus::Histogram::with_opts(
                prometheus::HistogramOpts::new(
                    "polkadot_parachain_collator_protocol_collator_process_msg",
                    "Time spent within `collator_protocol_collator::process_msg`",
                )
                .buckets(buckets.clone()),
            )?,
            registry,
        )?;

        let collation_distribution_time: prometheus::HistogramVec = prometheus::register(
            prometheus::HistogramVec::new(
                prometheus::HistogramOpts::new(
                    "polkadot_parachain_collator_protocol_collator_distribution_time",
                    "Time spent within `collator_protocol_collator::distribute_collation`",
                )
                .buckets(buckets),
                &["state"],
            )?,
            registry,
        )?;

        Ok(Metrics(Some(MetricsInner {
            advertisements_made,
            collations_sent,
            collations_send_requested,
            process_msg,
            collation_distribution_time,
        })))
    }
}
@@ -17,7 +17,7 @@
use std::{
collections::{HashMap, HashSet, VecDeque},
pin::Pin,
time::Duration,
time::{Duration, Instant},
};
use futures::{
@@ -44,19 +44,25 @@ use polkadot_node_subsystem::{
overseer, FromOrchestra, OverseerSignal, PerLeafSpan,
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
runtime::{get_availability_cores, get_group_rotation_info, RuntimeInfo},
TimeoutExt,
};
use polkadot_primitives::v2::{
AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState,
Hash, Id as ParaId,
GroupIndex, Hash, Id as ParaId, SessionIndex,
};
use super::LOG_TARGET;
use crate::error::{log_error, Error, FatalError, Result};
use fatality::Split;
mod metrics;
mod validators_buffer;
use validators_buffer::{ValidatorGroupsBuffer, VALIDATORS_BUFFER_CAPACITY};
pub use metrics::Metrics;
#[cfg(test)]
mod tests;
@@ -73,111 +79,16 @@ const COST_APPARENT_FLOOD: Rep =
/// For considerations on this value, see: https://github.com/paritytech/polkadot/issues/4386
const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(150);
#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);
/// Ensure that collator issues a connection request at least once every this many seconds.
/// Usually it's done when advertising new collation. However, if the core stays occupied or
/// it's not our turn to produce a candidate, it's important to disconnect from previous
/// peers.
///
/// Validators are obtained from [`ValidatorGroupsBuffer::validators_to_connect`].
const RECONNECT_TIMEOUT: Duration = Duration::from_secs(12);
impl Metrics {
fn on_advertisment_made(&self) {
if let Some(metrics) = &self.0 {
metrics.advertisements_made.inc();
}
}
fn on_collation_sent_requested(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_send_requested.inc();
}
}
fn on_collation_sent(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_sent.inc();
}
}
/// Provide a timer for `process_msg` which observes on drop.
fn time_process_msg(&self) -> Option<prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
}
/// Provide a timer for `distribute_collation` which observes on drop.
fn time_collation_distribution(
&self,
label: &'static str,
) -> Option<prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| {
metrics.collation_distribution_time.with_label_values(&[label]).start_timer()
})
}
}
#[derive(Clone)]
struct MetricsInner {
advertisements_made: prometheus::Counter<prometheus::U64>,
collations_sent: prometheus::Counter<prometheus::U64>,
collations_send_requested: prometheus::Counter<prometheus::U64>,
process_msg: prometheus::Histogram,
collation_distribution_time: prometheus::HistogramVec,
}
impl metrics::Metrics for Metrics {
fn try_register(
registry: &prometheus::Registry,
) -> std::result::Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
advertisements_made: prometheus::register(
prometheus::Counter::new(
"polkadot_parachain_collation_advertisements_made_total",
"A number of collation advertisements sent to validators.",
)?,
registry,
)?,
collations_send_requested: prometheus::register(
prometheus::Counter::new(
"polkadot_parachain_collations_sent_requested_total",
"A number of collations requested to be sent to validators.",
)?,
registry,
)?,
collations_sent: prometheus::register(
prometheus::Counter::new(
"polkadot_parachain_collations_sent_total",
"A number of collations sent to validators.",
)?,
registry,
)?,
process_msg: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_collator_protocol_collator_process_msg",
"Time spent within `collator_protocol_collator::process_msg`",
)
.buckets(vec![
0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
1.0,
]),
)?,
registry,
)?,
collation_distribution_time: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"polkadot_parachain_collator_protocol_collator_distribution_time",
"Time spent within `collator_protocol_collator::distribute_collation`",
)
.buckets(vec![
0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
1.0,
]),
&["state"],
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
/// How often to check for reconnect timeout.
const RECONNECT_POLL: Duration = Duration::from_secs(1);
/// Info about validators we are currently connected to.
///
@@ -269,8 +180,14 @@ struct WaitingCollationFetches {
waiting_peers: HashSet<PeerId>,
}
struct CollationSendResult {
relay_parent: Hash,
peer_id: PeerId,
timed_out: bool,
}
type ActiveCollationFetches =
FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, PeerId)> + Send + 'static>>>;
FuturesUnordered<Pin<Box<dyn Future<Output = CollationSendResult> + Send + 'static>>>;
struct State {
/// Our network peer id.
@@ -308,6 +225,13 @@ struct State {
/// by `PeerConnected` events.
peer_ids: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
/// Tracks which validators we want to stay connected to.
validator_groups_buf: ValidatorGroupsBuffer,
/// Timestamp of the last connection request to a non-empty list of validators,
/// `None` otherwise.
last_connected_at: Option<Instant>,
/// Metrics.
metrics: Metrics,
@@ -339,6 +263,8 @@ impl State {
collation_result_senders: Default::default(),
our_validators_groups: Default::default(),
peer_ids: Default::default(),
validator_groups_buf: ValidatorGroupsBuffer::with_capacity(VALIDATORS_BUFFER_CAPACITY),
last_connected_at: None,
waiting_collation_fetches: Default::default(),
active_collation_fetches: Default::default(),
}
@@ -373,6 +299,7 @@ async fn distribute_collation<Context>(
result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
) -> Result<()> {
let relay_parent = receipt.descriptor.relay_parent;
let candidate_hash = receipt.hash();
// This collation is not in the active-leaves set.
if !state.view.contains(&relay_parent) {
@@ -412,10 +339,10 @@ async fn distribute_collation<Context>(
};
// Determine the group on that core.
let current_validators =
let GroupValidators { validators, session_index, group_index } =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;
if current_validators.validators.is_empty() {
if validators.is_empty() {
gum::warn!(
target: LOG_TARGET,
core = ?our_core,
@@ -425,24 +352,36 @@ async fn distribute_collation<Context>(
return Ok(())
}
// It's important to insert new collation bits **before**
// issuing a connection request.
//
// If a validator managed to fetch all the relevant collations
// but still assigned to our core, we keep the connection alive.
state.validator_groups_buf.note_collation_advertised(
relay_parent,
session_index,
group_index,
&validators,
);
gum::debug!(
target: LOG_TARGET,
para_id = %id,
relay_parent = %relay_parent,
candidate_hash = ?receipt.hash(),
?candidate_hash,
pov_hash = ?pov.hash(),
core = ?our_core,
?current_validators,
current_validators = ?validators,
"Accepted collation, connecting to validators."
);
// Issue a discovery request for the validators of the current group:
connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await;
// Update a set of connected validators if necessary.
state.last_connected_at = connect_to_validators(ctx, &state.validator_groups_buf).await;
state.our_validators_groups.insert(relay_parent, ValidatorGroup::new());
if let Some(result_sender) = result_sender {
state.collation_result_senders.insert(receipt.hash(), result_sender);
state.collation_result_senders.insert(candidate_hash, result_sender);
}
state
@@ -483,6 +422,9 @@ async fn determine_core(
struct GroupValidators {
/// The validators of above group (their discovery keys).
validators: Vec<AuthorityDiscoveryId>,
session_index: SessionIndex,
group_index: GroupIndex,
}
/// Figure out current group of validators assigned to the para being collated on.
@@ -516,7 +458,11 @@ async fn determine_our_validators<Context>(
let current_validators =
current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
let current_validators = GroupValidators { validators: current_validators };
let current_validators = GroupValidators {
validators: current_validators,
session_index,
group_index: current_group_index,
};
Ok(current_validators)
}
@@ -541,13 +487,19 @@ async fn declare<Context>(ctx: &mut Context, state: &mut State, peer: PeerId) {
}
}
/// Issue a connection request to a set of validators and
/// revoke the previous connection request.
/// Updates a set of connected validators based on their advertisement-bits
/// in a validators buffer.
///
/// Returns current timestamp if the connection request was non-empty, `None`
/// otherwise.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn connect_to_validators<Context>(
ctx: &mut Context,
validator_ids: Vec<AuthorityDiscoveryId>,
) {
validator_groups_buf: &ValidatorGroupsBuffer,
) -> Option<Instant> {
let validator_ids = validator_groups_buf.validators_to_connect();
let is_disconnect = validator_ids.is_empty();
// ignore address resolution failure
// will reissue a new request on new collation
let (failed, _) = oneshot::channel();
@@ -557,6 +509,8 @@ async fn connect_to_validators<Context>(
failed,
})
.await;
(!is_disconnect).then_some(Instant::now())
}
/// Advertise collation to the given `peer`.
@@ -715,15 +669,9 @@ async fn send_collation(
state.active_collation_fetches.push(
async move {
let r = rx.timeout(MAX_UNSHARED_UPLOAD_TIME).await;
if r.is_none() {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
?peer_id,
"Sending collation to validator timed out, carrying on with next validator."
);
}
(relay_parent, peer_id)
let timed_out = r.is_none();
CollationSendResult { relay_parent, peer_id, timed_out }
}
.boxed(),
);
@@ -986,6 +934,7 @@ async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()>
state.our_validators_groups.remove(removed);
state.span_per_relay_parent.remove(removed);
state.waiting_collation_fetches.remove(removed);
state.validator_groups_buf.remove_relay_parent(removed);
}
state.view = view;
@@ -1007,6 +956,9 @@ pub(crate) async fn run<Context>(
let mut state = State::new(local_peer_id, collator_pair, metrics);
let mut runtime = RuntimeInfo::new(None);
let reconnect_stream = super::tick_stream(RECONNECT_POLL);
pin_mut!(reconnect_stream);
loop {
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
pin_mut!(recv_req);
@@ -1022,7 +974,25 @@ pub(crate) async fn run<Context>(
FromOrchestra::Signal(BlockFinalized(..)) => {}
FromOrchestra::Signal(Conclude) => return Ok(()),
},
(relay_parent, peer_id) = state.active_collation_fetches.select_next_some() => {
CollationSendResult {
relay_parent,
peer_id,
timed_out,
} = state.active_collation_fetches.select_next_some() => {
if timed_out {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
?peer_id,
"Sending collation to validator timed out, carrying on with next validator",
);
} else {
for authority_id in state.peer_ids.get(&peer_id).into_iter().flatten() {
// Timeout not hit, this peer is no longer interested in this relay parent.
state.validator_groups_buf.reset_validator_interest(relay_parent, authority_id);
}
}
let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) {
waiting.waiting_peers.remove(&peer_id);
if let Some(next) = waiting.waiting.pop_front() {
@@ -1042,7 +1012,29 @@ pub(crate) async fn run<Context>(
send_collation(&mut state, next, receipt, pov).await;
}
}
},
_ = reconnect_stream.next() => {
let now = Instant::now();
if state
.last_connected_at
.map_or(false, |timestamp| now - timestamp > RECONNECT_TIMEOUT)
{
// Remove all advertisements from the buffer if the timeout was hit.
// Usually, it shouldn't be necessary as leaves get deactivated, rather
// serves as a safeguard against finality lags.
state.validator_groups_buf.clear_advertisements();
// Returns `None` if connection request is empty.
state.last_connected_at =
connect_to_validators(&mut ctx, &state.validator_groups_buf).await;
gum::debug!(
target: LOG_TARGET,
timeout = ?RECONNECT_TIMEOUT,
"Timeout hit, sent a connection request. Disconnected from all validators = {}",
state.last_connected_at.is_none(),
);
}
},
in_req = recv_req => {
match in_req {
Ok(req) => {
@@ -56,7 +56,7 @@ struct TestState {
group_rotation_info: GroupRotationInfo,
validator_peer_id: Vec<PeerId>,
relay_parent: Hash,
availability_core: CoreState,
availability_cores: Vec<CoreState>,
local_peer_id: PeerId,
collator_pair: CollatorPair,
session_index: SessionIndex,
@@ -88,14 +88,15 @@ impl Default for TestState {
let validator_peer_id =
std::iter::repeat_with(|| PeerId::random()).take(discovery_keys.len()).collect();
let validator_groups = vec![vec![2, 0, 4], vec![3, 2, 4]]
let validator_groups = vec![vec![2, 0, 4], vec![1, 3]]
.into_iter()
.map(|g| g.into_iter().map(ValidatorIndex).collect())
.collect();
let group_rotation_info =
GroupRotationInfo { session_start_block: 0, group_rotation_frequency: 100, now: 1 };
let availability_core = CoreState::Scheduled(ScheduledCore { para_id, collator: None });
let availability_cores =
vec![CoreState::Scheduled(ScheduledCore { para_id, collator: None }), CoreState::Free];
let relay_parent = Hash::random();
@@ -122,7 +123,7 @@ impl Default for TestState {
group_rotation_info,
validator_peer_id,
relay_parent,
availability_core,
availability_cores,
local_peer_id,
collator_pair,
session_index: 1,
@@ -132,7 +133,9 @@ impl Default for TestState {
impl TestState {
fn current_group_validator_indices(&self) -> &[ValidatorIndex] {
&self.session_info.validator_groups[0]
let core_num = self.availability_cores.len();
let GroupIndex(group_idx) = self.group_rotation_info.group_for_core(CoreIndex(0), core_num);
&self.session_info.validator_groups[group_idx as usize]
}
fn current_session_index(&self) -> SessionIndex {
@@ -333,7 +336,7 @@ async fn distribute_collation(
RuntimeApiRequest::AvailabilityCores(tx)
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap();
tx.send(Ok(test_state.availability_cores.clone())).unwrap();
}
);
@@ -987,3 +990,104 @@ where
test_harness
});
}
/// After a group rotation, a new advertisement must produce a connection request
/// that covers validators of both the previous and the current group, except for
/// the validator that has already fetched the previously advertised collation.
#[test]
fn connect_to_buffered_groups() {
let mut test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let mut req_cfg = test_harness.req_cfg;
setup_system(&mut virtual_overseer, &test_state).await;
let group_a = test_state.current_group_validator_authority_ids();
let peers_a = test_state.current_group_validator_peer_ids();
// More than one validator is needed so that part of group A stays "interested"
// after one of them fetches the collation.
assert!(group_a.len() > 1);
// First advertisement: the connection request must contain exactly group A.
distribute_collation(&mut virtual_overseer, &test_state, false).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. }
) => {
assert_eq!(group_a, validator_ids);
}
);
let head_a = test_state.relay_parent;
// Connect every validator of group A and expect a declare message for each.
for (val, peer) in group_a.iter().zip(&peers_a) {
connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await;
}
for peer_id in &peers_a {
expect_declare_msg(&mut virtual_overseer, &test_state, peer_id).await;
}
// Update views.
for peed_id in &peers_a {
send_peer_view_change(&mut virtual_overseer, peed_id, vec![head_a]).await;
expect_advertise_collation_msg(&mut virtual_overseer, peed_id, head_a).await;
}
let peer = peers_a[0];
// Peer from the group fetches the collation.
let (pending_response, rx) = oneshot::channel();
req_cfg
.inbound_queue
.as_mut()
.unwrap()
.send(RawIncomingRequest {
peer,
payload: CollationFetchingRequest {
relay_parent: head_a,
para_id: test_state.para_id,
}
.encode(),
pending_response,
})
.await
.unwrap();
// The request must be answered with a decodable `Collation` response.
assert_matches!(
rx.await,
Ok(full_response) => {
let CollationFetchingResponse::Collation(..): CollationFetchingResponse =
CollationFetchingResponse::decode(
&mut full_response.result.expect("We should have a proper answer").as_ref(),
)
.expect("Decoding should work");
}
);
// Move on to a new relay parent and bump the group rotation; the assertions
// below confirm that both the head and the assigned group actually changed.
test_state.advance_to_new_round(&mut virtual_overseer, true).await;
test_state.group_rotation_info = test_state.group_rotation_info.bump_rotation();
let head_b = test_state.relay_parent;
let group_b = test_state.current_group_validator_authority_ids();
assert_ne!(head_a, head_b);
assert_ne!(group_a, group_b);
distribute_collation(&mut virtual_overseer, &test_state, false).await;
// Should be connected to both groups except for the validator that fetched advertised
// collation.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridgeTx(
NetworkBridgeTxMessage::ConnectToValidators { validator_ids, .. }
) => {
assert!(!validator_ids.contains(&group_a[0]));
for validator in group_a[1..].iter().chain(&group_b) {
assert!(validator_ids.contains(validator));
}
}
);
TestHarness { virtual_overseer, req_cfg }
});
}
@@ -0,0 +1,317 @@
// Copyright 2017-2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Validator groups buffer for connection management.
//!
//! Solves 2 problems:
//! 1. A collator may want to stay connected to multiple groups on rotation boundaries.
//! 2. It's important to disconnect from validator when there're no collations to be fetched.
//!
//! We keep a simple FIFO buffer of N validator groups and a bitvec for each advertisement,
//! 1 indicating we want to be connected to i-th validator in a buffer, 0 otherwise.
//!
//! The bit is set to 1 for the whole **group** whenever it's inserted into the buffer. Given a relay
//! parent, one can reset a bit back to 0 for particular **validator**. For example, if a collation
//! was fetched or some timeout has been hit.
//!
//! The bitwise OR over known advertisements gives us validators indices for connection request.
use std::{
collections::{HashMap, VecDeque},
num::NonZeroUsize,
ops::Range,
};
use bitvec::{bitvec, vec::BitVec};
use polkadot_primitives::v2::{AuthorityDiscoveryId, GroupIndex, Hash, SessionIndex};
/// The ring buffer stores at most this many unique validator groups.
///
/// This value should be chosen in a way that all groups assigned to our para
/// in the view can fit into the buffer.
// `NonZeroUsize::new` returns an `Option`. Matching on it inside the `const` initializer
// means a zero literal would reach the `panic!` while the constant is being evaluated,
// turning a zero capacity into a compile-time error instead of a runtime one.
pub const VALIDATORS_BUFFER_CAPACITY: NonZeroUsize = match NonZeroUsize::new(3) {
Some(cap) => cap,
None => panic!("buffer capacity must be non-zero"),
};
/// Unique identifier of a validators group.
///
/// Two advertisements are treated as belonging to the same group when both
/// `session_index` and `group_index` match.
#[derive(Debug)]
struct ValidatorsGroupInfo {
/// Number of validators in the group.
len: usize,
/// Session index the group was advertised with.
session_index: SessionIndex,
/// Index of the group as passed in with the advertisement.
group_index: GroupIndex,
}
/// Ring buffer of validator groups.
///
/// Tracks which peers we want to be connected to with respect to advertised collations.
#[derive(Debug)]
pub struct ValidatorGroupsBuffer {
/// Validator groups identifiers we **had** advertisements for.
///
/// New groups are appended at the back; when the buffer is full the group at the
/// front is dropped.
group_infos: VecDeque<ValidatorsGroupInfo>,
/// Contiguous buffer of validators discovery keys.
///
/// Keys are stored group by group in the same order as `group_infos`, so the first
/// validator of the i-th group sits at the sum of the lengths of the groups before it.
validators: VecDeque<AuthorityDiscoveryId>,
/// Mapping from relay-parent to bit-vectors with bits for all `validators`.
/// Invariants kept: All bit-vectors are guaranteed to have the same size.
should_be_connected: HashMap<Hash, BitVec>,
/// Buffer capacity, limits the number of **groups** tracked.
cap: NonZeroUsize,
}
impl ValidatorGroupsBuffer {
    /// Builds an empty buffer that tracks at most `cap` validator groups.
    pub fn with_capacity(cap: NonZeroUsize) -> Self {
        Self {
            group_infos: VecDeque::default(),
            validators: VecDeque::default(),
            should_be_connected: HashMap::default(),
            cap,
        }
    }

    /// Collects the discovery ids of every buffered validator whose bit is set for
    /// at least one tracked relay parent.
    ///
    /// Ids are returned in the order in which they are stored in the buffer.
    pub fn validators_to_connect(&self) -> Vec<AuthorityDiscoveryId> {
        // Bitwise OR over the bit-vectors of all known advertisements.
        let mut wanted = bitvec![0; self.validators.len()];
        for advertisement_bits in self.should_be_connected.values() {
            wanted |= advertisement_bits;
        }

        self.validators
            .iter()
            .enumerate()
            .filter(|(position, _)| wanted[*position])
            .map(|(_, authority_id)| authority_id.clone())
            .collect()
    }

    /// Records an advertisement for `relay_parent`, marking every validator of the
    /// given group as one we want to be connected to.
    ///
    /// A group is recognised by its `(session_index, group_index)` pair. If it is
    /// already buffered, only the bits are set; otherwise the group is appended,
    /// evicting the oldest group when the buffer is at capacity. An empty
    /// `validators` slice is ignored.
    pub fn note_collation_advertised(
        &mut self,
        relay_parent: Hash,
        session_index: SessionIndex,
        group_index: GroupIndex,
        validators: &[AuthorityDiscoveryId],
    ) {
        if validators.is_empty() {
            return
        }

        let known_position = self.group_infos.iter().position(|info| {
            info.session_index == session_index && info.group_index == group_index
        });

        if let Some(position) = known_position {
            // Offset of the group's first validator within `self.validators`.
            let first = self.group_lengths_iter().take(position).sum::<usize>();
            let group_len = self.group_infos[position].len;
            self.set_bits(relay_parent, first..(first + group_len));
        } else {
            self.push(relay_parent, session_index, group_index, validators);
        }
    }

    /// Clears the bit of `authority_id` for the advertisement at `relay_parent`.
    ///
    /// Does nothing if the relay parent is not tracked. Every occurrence of the id
    /// in the buffer is cleared.
    pub fn reset_validator_interest(
        &mut self,
        relay_parent: Hash,
        authority_id: &AuthorityDiscoveryId,
    ) {
        if let Some(bits) = self.should_be_connected.get_mut(&relay_parent) {
            let matching = self.validators.iter().enumerate().filter(|(_, id)| *id == authority_id);
            for (position, _) in matching {
                bits.set(position, false);
            }
        }
    }

    /// Stops tracking the advertisement made for `relay_parent`.
    ///
    /// The groups and validators themselves stay in the buffer; only the bit-vector
    /// associated with this relay parent is dropped.
    pub fn remove_relay_parent(&mut self, relay_parent: &Hash) {
        self.should_be_connected.remove(relay_parent);
    }

    /// Drops the bit-vectors of all advertisements, leaving the buffered groups intact.
    pub fn clear_advertisements(&mut self) {
        self.should_be_connected.clear();
    }

    /// Appends a new group together with its advertisement, setting the bits of all
    /// its validators to 1 for `relay_parent`.
    ///
    /// When the buffer already holds `cap` groups, the oldest group (front of the
    /// queue) is removed first and every bit-vector is shifted accordingly.
    fn push(
        &mut self,
        relay_parent: Hash,
        session_index: SessionIndex,
        group_index: GroupIndex,
        validators: &[AuthorityDiscoveryId],
    ) {
        if self.group_infos.len() >= self.cap.get() {
            let evicted = self.group_infos.pop_front().expect("buf is not empty; qed");
            self.validators.drain(..evicted.len);
            // Realign the remaining bits with the shortened validators buffer.
            for bits in self.should_be_connected.values_mut() {
                bits.as_mut_bitslice().shift_left(evicted.len);
            }
        }

        // The new group starts right after all groups that are still buffered.
        let first: usize = self.group_lengths_iter().sum();

        self.validators.extend(validators.iter().cloned());
        self.group_infos.push_back(ValidatorsGroupInfo {
            len: validators.len(),
            session_index,
            group_index,
        });

        // Keep every bit-vector as long as the validators buffer.
        let total = self.validators.len();
        for bits in self.should_be_connected.values_mut() {
            bits.resize(total, false);
        }

        self.set_bits(relay_parent, first..(first + validators.len()));
    }

    /// Sets the bits in `range` to 1 for the advertisement at `relay_parent`,
    /// creating an all-zero bit-vector for it first if the relay parent is unknown.
    ///
    /// The caller must make sure that `range` is within bounds.
    fn set_bits(&mut self, relay_parent: Hash, range: Range<usize>) {
        let total = self.validators.len();
        let bits = self
            .should_be_connected
            .entry(relay_parent)
            .or_insert_with(|| bitvec![0; total]);

        bits[range].fill(true);
    }

    /// Iterates over the sizes of the buffered groups, front to back.
    ///
    /// Summing the first `i` items gives the index of the first validator of the
    /// i-th group.
    fn group_lengths_iter(&self) -> impl Iterator<Item = usize> + '_ {
        self.group_infos.iter().map(|info| info.len)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use sp_keyring::Sr25519Keyring;
/// With a capacity of one group, advertising a second group replaces the first.
#[test]
fn one_capacity_buffer() {
let cap = NonZeroUsize::new(1).unwrap();
let mut buf = ValidatorGroupsBuffer::with_capacity(cap);
let hash_a = Hash::repeat_byte(0x1);
let hash_b = Hash::repeat_byte(0x2);
let validators: Vec<_> = [
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
Sr25519Keyring::Dave,
Sr25519Keyring::Ferdie,
]
.into_iter()
.map(|key| AuthorityDiscoveryId::from(key.public()))
.collect();
// Nothing was advertised yet.
assert!(buf.validators_to_connect().is_empty());
// Group 0 = first two validators; both are wanted after the advertisement.
buf.note_collation_advertised(hash_a, 0, GroupIndex(0), &validators[..2]);
assert_eq!(buf.validators_to_connect(), validators[..2].to_vec());
// Resetting one validator leaves only the other one.
buf.reset_validator_interest(hash_a, &validators[1]);
assert_eq!(buf.validators_to_connect(), vec![validators[0].clone()]);
// Group 1 does not fit next to group 0, so group 0 is dropped and only the
// validators of group 1 remain.
buf.note_collation_advertised(hash_b, 0, GroupIndex(1), &validators[2..]);
assert_eq!(buf.validators_to_connect(), validators[2..].to_vec());
for validator in &validators[2..] {
buf.reset_validator_interest(hash_b, validator);
}
assert!(buf.validators_to_connect().is_empty());
}
/// Exercises multiple advertisements per group, per-validator resets and
/// eviction of the oldest group once a fourth group is pushed into a
/// buffer of capacity three.
#[test]
fn buffer_works() {
let cap = NonZeroUsize::new(3).unwrap();
let mut buf = ValidatorGroupsBuffer::with_capacity(cap);
let hashes: Vec<_> = (0..5).map(Hash::repeat_byte).collect();
let validators: Vec<_> = [
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
Sr25519Keyring::Dave,
Sr25519Keyring::Ferdie,
]
.into_iter()
.map(|key| AuthorityDiscoveryId::from(key.public()))
.collect();
// Two advertisements for group 0, then the same advertisement for group 1 twice
// (the repeated call must not change anything).
buf.note_collation_advertised(hashes[0], 0, GroupIndex(0), &validators[..2]);
buf.note_collation_advertised(hashes[1], 0, GroupIndex(0), &validators[..2]);
buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]);
buf.note_collation_advertised(hashes[2], 0, GroupIndex(1), &validators[2..4]);
assert_eq!(buf.validators_to_connect(), validators[..4].to_vec());
// Group 1 loses all interest for its only advertisement.
for validator in &validators[2..4] {
buf.reset_validator_interest(hashes[2], validator);
}
// validators[0] is reset for hashes[1] only; it is still wanted because of hashes[0].
buf.reset_validator_interest(hashes[1], &validators[0]);
assert_eq!(buf.validators_to_connect(), validators[..2].to_vec());
// Now validators[0] is reset for both advertisements of group 0.
buf.reset_validator_interest(hashes[0], &validators[0]);
assert_eq!(buf.validators_to_connect(), vec![validators[1].clone()]);
// New advertisement for the already buffered group 1, then a third group.
buf.note_collation_advertised(hashes[3], 0, GroupIndex(1), &validators[2..4]);
buf.note_collation_advertised(
hashes[4],
0,
GroupIndex(2),
std::slice::from_ref(&validators[4]),
);
buf.reset_validator_interest(hashes[3], &validators[2]);
// A fourth group exceeds the capacity: group 0 is evicted together with its
// validators, and validators[0] re-enters the buffer as part of the new group.
buf.note_collation_advertised(
hashes[4],
0,
GroupIndex(3),
std::slice::from_ref(&validators[0]),
);
assert_eq!(
buf.validators_to_connect(),
vec![validators[3].clone(), validators[4].clone(), validators[0].clone()]
);
}
}
@@ -21,9 +21,12 @@
#![deny(unused_crate_dependencies)]
#![recursion_limit = "256"]
use std::time::Duration;
use std::time::{Duration, Instant};
use futures::{FutureExt, TryFutureExt};
use futures::{
stream::{FusedStream, StreamExt},
FutureExt, TryFutureExt,
};
use sp_keystore::SyncCryptoStorePtr;
@@ -134,3 +137,23 @@ async fn modify_reputation(
sender.send_message(NetworkBridgeTxMessage::ReportPeer(peer, rep)).await;
}
/// Waits for the tick scheduled at `last_poll + period`.
///
/// If that instant is still in the future, sleeps for the remaining time; otherwise
/// returns immediately. In both cases the current time is returned, which the caller
/// can pass back as `last_poll` to schedule the following tick.
async fn wait_until_next_tick(last_poll: Instant, period: Duration) -> Instant {
    let started_at = Instant::now();
    let deadline = last_poll + period;

    // Zero when the deadline has already been reached.
    let remaining = deadline.saturating_duration_since(started_at);
    if !remaining.is_zero() {
        futures_timer::Delay::new(remaining).await
    }

    Instant::now()
}
/// Builds a fused, never-ending stream that yields `()` roughly every `period`.
///
/// The stream state is the timestamp of the previous tick, starting at the moment
/// of creation; each item is produced once `wait_until_next_tick` completes.
fn tick_stream(period: Duration) -> impl FusedStream<Item = ()> {
    let ticks = futures::stream::unfold(Instant::now(), move |previous_tick| async move {
        let current_tick = wait_until_next_tick(previous_tick, period).await;
        Some(((), current_tick))
    });

    ticks.fuse()
}
@@ -19,7 +19,7 @@ use futures::{
channel::oneshot,
future::{BoxFuture, Fuse, FusedFuture},
select,
stream::{FusedStream, FuturesUnordered},
stream::FuturesUnordered,
FutureExt, StreamExt,
};
use futures_timer::Delay;
@@ -57,7 +57,7 @@ use polkadot_primitives::v2::{CandidateReceipt, CollatorId, Hash, Id as ParaId};
use crate::error::Result;
use super::{modify_reputation, LOG_TARGET};
use super::{modify_reputation, tick_stream, LOG_TARGET};
#[cfg(test)]
mod tests;
@@ -97,7 +97,7 @@ const ACTIVITY_POLL: Duration = Duration::from_millis(10);
// How often to poll collation responses.
// This is a hack that should be removed in a refactoring.
// See https://github.com/paritytech/polkadot/issues/4182
const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(5);
const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(50);
#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);
@@ -1167,25 +1167,6 @@ async fn process_msg<Context>(
}
}
// wait until next inactivity check. returns the instant for the following check.
async fn wait_until_next_check(last_poll: Instant) -> Instant {
let now = Instant::now();
let next_poll = last_poll + ACTIVITY_POLL;
if next_poll > now {
Delay::new(next_poll - now).await
}
Instant::now()
}
fn infinite_stream(every: Duration) -> impl FusedStream<Item = ()> {
futures::stream::unfold(Instant::now() + every, |next_check| async move {
Some(((), wait_until_next_check(next_check).await))
})
.fuse()
}
/// The main run loop.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
pub(crate) async fn run<Context>(
@@ -1196,10 +1177,10 @@ pub(crate) async fn run<Context>(
) -> std::result::Result<(), crate::error::FatalError> {
let mut state = State { metrics, ..Default::default() };
let next_inactivity_stream = infinite_stream(ACTIVITY_POLL);
let next_inactivity_stream = tick_stream(ACTIVITY_POLL);
futures::pin_mut!(next_inactivity_stream);
let check_collations_stream = infinite_stream(CHECK_COLLATIONS_POLL);
let check_collations_stream = tick_stream(CHECK_COLLATIONS_POLL);
futures::pin_mut!(check_collations_stream);
loop {