mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 17:01:09 +00:00
Rework ConnectionsRequests (#2081)
* Rework `ConnectionsRequests` Instead of implementing the `Stream` trait, this struct now provides a function `next()`. This enables us to encode into the type system that it will always return a value or block indefinitely. * Review feedback
This commit is contained in:
@@ -18,7 +18,7 @@ use std::collections::{HashMap, HashSet};
|
||||
|
||||
use super::{LOG_TARGET, Result};
|
||||
|
||||
use futures::{StreamExt, select, FutureExt};
|
||||
use futures::{select, FutureExt};
|
||||
|
||||
use polkadot_primitives::v1::{
|
||||
CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId,
|
||||
@@ -691,21 +691,15 @@ pub(crate) async fn run(
|
||||
|
||||
loop {
|
||||
select! {
|
||||
res = state.connection_requests.next() => {
|
||||
let (relay_parent, validator_id, peer_id) = match res {
|
||||
Some(res) => res,
|
||||
// Will never happen, but better to be safe.
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
res = state.connection_requests.next().fuse() => {
|
||||
let _timer = state.metrics.time_handle_connection_request();
|
||||
|
||||
handle_validator_connected(
|
||||
&mut ctx,
|
||||
&mut state,
|
||||
peer_id,
|
||||
validator_id,
|
||||
relay_parent,
|
||||
res.peer_id,
|
||||
res.validator_id,
|
||||
res.relay_parent,
|
||||
).await;
|
||||
},
|
||||
msg = ctx.recv().fuse() => match msg? {
|
||||
|
||||
@@ -700,14 +700,7 @@ impl PoVDistribution {
|
||||
// peer view update messages may be racy and we want connection notifications
|
||||
// first.
|
||||
futures::select_biased! {
|
||||
v = state.connection_requests.next() => {
|
||||
match v {
|
||||
Some((_relay_parent, _validator_id, peer_id)) => {
|
||||
handle_validator_connected(&mut state, peer_id);
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
v = state.connection_requests.next().fuse() => handle_validator_connected(&mut state, v.peer_id),
|
||||
v = ctx.recv().fuse() => {
|
||||
match v? {
|
||||
FromOverseer::Signal(signal) => if handle_signal(
|
||||
@@ -743,10 +736,8 @@ impl PoVDistribution {
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ use futures::{
|
||||
channel::mpsc,
|
||||
task::{Poll, self},
|
||||
stream,
|
||||
StreamExt,
|
||||
};
|
||||
use streamunordered::{StreamUnordered, StreamYield};
|
||||
|
||||
@@ -113,33 +114,60 @@ async fn connect_to_authorities<Context: SubsystemContext>(
|
||||
connected_rx
|
||||
}
|
||||
|
||||
/// A struct that assists performing multiple concurrent connection requests.
|
||||
/// Represents a discovered validator.
|
||||
///
|
||||
/// This allows concurrent connections to validator sets at different `relay_parents`
|
||||
/// and multiplexes their results into a single `Stream`.
|
||||
#[derive(Default)]
|
||||
pub struct ConnectionRequests {
|
||||
// added connection requests relay_parent -> StreamUnordered token
|
||||
id_map: HashMap<Hash, usize>,
|
||||
|
||||
// Connection requests themselves.
|
||||
requests: StreamUnordered<ConnectionRequest>,
|
||||
/// Result of [`ConnectionRequests::next`].
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct DiscoveredValidator {
|
||||
/// The relay parent associated with the connection request that returned a result.
|
||||
pub relay_parent: Hash,
|
||||
/// The [`ValidatorId`] that was resolved.
|
||||
pub validator_id: ValidatorId,
|
||||
/// The [`PeerId`] associated to the validator id.
|
||||
pub peer_id: PeerId,
|
||||
}
|
||||
|
||||
impl stream::FusedStream for ConnectionRequests {
|
||||
fn is_terminated(&self) -> bool {
|
||||
false
|
||||
/// Used by [`ConnectionRequests::requests`] to map a [`ConnectionRequest`] item to a [`DiscoveredValidator`].
|
||||
struct ConnectionRequestForRelayParent {
|
||||
request: ConnectionRequest,
|
||||
relay_parent: Hash,
|
||||
}
|
||||
|
||||
impl stream::Stream for ConnectionRequestForRelayParent {
|
||||
type Item = DiscoveredValidator;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
|
||||
self.request
|
||||
.poll_next_unpin(cx)
|
||||
.map(|r| r.map(|(validator_id, peer_id)| DiscoveredValidator {
|
||||
validator_id,
|
||||
peer_id,
|
||||
relay_parent: self.relay_parent,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
/// A struct that assists performing multiple concurrent connection requests.
|
||||
///
|
||||
/// This allows concurrent connections to validator sets at different `relay_parents`.
|
||||
/// Use [`ConnectionRequests::next`] to wait for results of the added connection requests.
|
||||
#[derive(Default)]
|
||||
pub struct ConnectionRequests {
|
||||
/// Connection requests relay_parent -> StreamUnordered token
|
||||
id_map: HashMap<Hash, usize>,
|
||||
|
||||
/// Connection requests themselves.
|
||||
requests: StreamUnordered<ConnectionRequestForRelayParent>,
|
||||
}
|
||||
|
||||
impl ConnectionRequests {
|
||||
/// Insert a new connection request.
|
||||
///
|
||||
/// If a `ConnectionRequest` under a given `relay_parent` already exists it will
|
||||
/// be revoked and substituted with a new one.
|
||||
/// be revoked and substituted with the given one.
|
||||
pub fn put(&mut self, relay_parent: Hash, request: ConnectionRequest) {
|
||||
self.remove(&relay_parent);
|
||||
let token = self.requests.push(request);
|
||||
let token = self.requests.push(ConnectionRequestForRelayParent { relay_parent, request });
|
||||
|
||||
self.id_map.insert(relay_parent, token);
|
||||
}
|
||||
@@ -155,39 +183,23 @@ impl ConnectionRequests {
|
||||
pub fn contains_request(&self, relay_parent: &Hash) -> bool {
|
||||
self.id_map.contains_key(relay_parent)
|
||||
}
|
||||
}
|
||||
|
||||
impl stream::Stream for ConnectionRequests {
|
||||
/// (relay_parent, validator_id, peer_id).
|
||||
type Item = (Hash, ValidatorId, PeerId);
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
|
||||
// If there are currently no requests going on, pend instead of
|
||||
// polling `StreamUnordered` which would lead to it terminating
|
||||
// and returning `Poll::Ready(None)`.
|
||||
if self.requests.is_empty() {
|
||||
return Poll::Pending;
|
||||
/// Returns the next available connection request result.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// When there are no active requests this will wait indefinitely, like an always pending future.
|
||||
pub async fn next(&mut self) -> DiscoveredValidator {
|
||||
loop {
|
||||
match self.requests.next().await {
|
||||
Some((StreamYield::Item(item), _)) => {
|
||||
return item
|
||||
},
|
||||
// Ignore finished requests, they are required to be removed.
|
||||
Some((StreamYield::Finished(_), _)) => (),
|
||||
None => futures::pending!(),
|
||||
}
|
||||
}
|
||||
|
||||
match Pin::new(&mut self.requests).poll_next(cx) {
|
||||
Poll::Ready(Some((yielded, token))) => {
|
||||
match yielded {
|
||||
StreamYield::Item(item) => {
|
||||
if let Some((relay_parent, _)) = self.id_map.iter()
|
||||
.find(|(_, &val)| val == token)
|
||||
{
|
||||
return Poll::Ready(Some((*relay_parent, item.0, item.1)));
|
||||
}
|
||||
}
|
||||
StreamYield::Finished(_) => {
|
||||
// `ConnectionRequest` is fullfilled, but not revoked
|
||||
}
|
||||
}
|
||||
},
|
||||
_ => {},
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
@@ -231,14 +243,20 @@ mod tests {
|
||||
use polkadot_primitives::v1::ValidatorPair;
|
||||
use sp_core::{Pair, Public};
|
||||
|
||||
use futures::{executor, poll, StreamExt, SinkExt};
|
||||
use futures::{executor, poll, SinkExt};
|
||||
|
||||
async fn check_next_is_pending(connection_requests: &mut ConnectionRequests) {
|
||||
let next = connection_requests.next();
|
||||
futures::pin_mut!(next);
|
||||
assert_eq!(poll!(next), Poll::Pending);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn adding_a_connection_request_works() {
|
||||
let mut connection_requests = ConnectionRequests::default();
|
||||
|
||||
executor::block_on(async move {
|
||||
assert_eq!(poll!(Pin::new(&mut connection_requests).next()), Poll::Pending);
|
||||
check_next_is_pending(&mut connection_requests).await;
|
||||
|
||||
let validator_1 = ValidatorPair::generate().0.public();
|
||||
let validator_2 = ValidatorPair::generate().0.public();
|
||||
@@ -267,16 +285,19 @@ mod tests {
|
||||
rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap();
|
||||
rq1_tx.send((auth_2, peer_id_2.clone())).await.unwrap();
|
||||
|
||||
let res = Pin::new(&mut connection_requests).next().await.unwrap();
|
||||
assert_eq!(res, (relay_parent_1, validator_1, peer_id_1));
|
||||
|
||||
let res = Pin::new(&mut connection_requests).next().await.unwrap();
|
||||
assert_eq!(res, (relay_parent_1, validator_2, peer_id_2));
|
||||
|
||||
let res = connection_requests.next().await;
|
||||
assert_eq!(
|
||||
poll!(Pin::new(&mut connection_requests).next()),
|
||||
Poll::Pending,
|
||||
res,
|
||||
DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_1, peer_id: peer_id_1 },
|
||||
);
|
||||
|
||||
let res = connection_requests.next().await;
|
||||
assert_eq!(
|
||||
res,
|
||||
DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_2, peer_id: peer_id_2 },
|
||||
);
|
||||
|
||||
check_next_is_pending(&mut connection_requests).await;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -285,7 +306,7 @@ mod tests {
|
||||
let mut connection_requests = ConnectionRequests::default();
|
||||
|
||||
executor::block_on(async move {
|
||||
assert_eq!(poll!(Pin::new(&mut connection_requests).next()), Poll::Pending);
|
||||
check_next_is_pending(&mut connection_requests).await;
|
||||
|
||||
let validator_1 = ValidatorPair::generate().0.public();
|
||||
let validator_2 = ValidatorPair::generate().0.public();
|
||||
@@ -325,16 +346,19 @@ mod tests {
|
||||
rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap();
|
||||
rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap();
|
||||
|
||||
let res = Pin::new(&mut connection_requests).next().await.unwrap();
|
||||
assert_eq!(res, (relay_parent_1, validator_1, peer_id_1));
|
||||
|
||||
let res = Pin::new(&mut connection_requests).next().await.unwrap();
|
||||
assert_eq!(res, (relay_parent_2, validator_2, peer_id_2));
|
||||
|
||||
let res = connection_requests.next().await;
|
||||
assert_eq!(
|
||||
poll!(Pin::new(&mut connection_requests).next()),
|
||||
Poll::Pending,
|
||||
res,
|
||||
DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_1, peer_id: peer_id_1 },
|
||||
);
|
||||
|
||||
let res = connection_requests.next().await;
|
||||
assert_eq!(
|
||||
res,
|
||||
DiscoveredValidator { relay_parent: relay_parent_2, validator_id: validator_2, peer_id: peer_id_2 },
|
||||
);
|
||||
|
||||
check_next_is_pending(&mut connection_requests).await;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -343,7 +367,7 @@ mod tests {
|
||||
let mut connection_requests = ConnectionRequests::default();
|
||||
|
||||
executor::block_on(async move {
|
||||
assert_eq!(poll!(Pin::new(&mut connection_requests).next()), Poll::Pending);
|
||||
check_next_is_pending(&mut connection_requests).await;
|
||||
|
||||
let validator_1 = ValidatorPair::generate().0.public();
|
||||
let validator_2 = ValidatorPair::generate().0.public();
|
||||
@@ -380,8 +404,8 @@ mod tests {
|
||||
|
||||
rq1_tx.send((auth_1.clone(), peer_id_1.clone())).await.unwrap();
|
||||
|
||||
let res = Pin::new(&mut connection_requests).next().await.unwrap();
|
||||
assert_eq!(res, (relay_parent, validator_1, peer_id_1.clone()));
|
||||
let res = connection_requests.next().await;
|
||||
assert_eq!(res, DiscoveredValidator { relay_parent, validator_id: validator_1, peer_id: peer_id_1.clone() });
|
||||
|
||||
connection_requests.put(relay_parent.clone(), connection_request_2);
|
||||
|
||||
@@ -389,13 +413,10 @@ mod tests {
|
||||
|
||||
rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap();
|
||||
|
||||
let res = Pin::new(&mut connection_requests).next().await.unwrap();
|
||||
assert_eq!(res, (relay_parent, validator_2, peer_id_2));
|
||||
let res = connection_requests.next().await;
|
||||
assert_eq!(res, DiscoveredValidator { relay_parent, validator_id: validator_2, peer_id: peer_id_2 });
|
||||
|
||||
assert_eq!(
|
||||
poll!(Pin::new(&mut connection_requests).next()),
|
||||
Poll::Pending,
|
||||
);
|
||||
check_next_is_pending(&mut connection_requests).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user