NetworkBridge: validator (authorities) discovery api (#1699)

* stupid, but it compiles

* redo

* cleanup

* add ValidatorDiscovery to msgs

* sketch network bridge code

* ConnectToAuthorities instead of validators

* more stuff

* cleanup

* more stuff

* complete ConnectToAuthoritiesState

* Update node/network/bridge/src/lib.rs

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Collator protocol subsystem (#1659)

* WIP

* The initial implementation of the collator side.

* Improve comments

* Multiple collation requests

* Add more tests and comments to validator side

* Add comments, remove dead code

* Apply suggestions from code review

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Fix build after suggested changes

* Also connect to the next validator group

* Remove a Future impl and move TimeoutExt to util

* Minor nits

* Fix build

* Change FetchCollations back to FetchCollation

* Try this

* Final fixes

* Fix build

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* handle multiple in-flight connection requests

* handle cancelled requests

* Update node/core/runtime-api/src/lib.rs

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

* redo it again

* more stuff

* redo it again

* update comments

* workaround Future is not Send

* fix trailing spaces

* clarify comments

* bridge: fix compilation in tests

* update more comments

* small fixes

* port collator protocol to new validator discovery api

* collator tests compile

* collator tests pass

* do not revoke a request when the stream receiver is closed

* make revoking opt-in

* fix is_fulfilled

* handle request revokation in collator

* tests

* wait for validator connections asyncronously

* fix compilation

* relabel my todos

* apply Fedor's patch

* resolve reconnection TODO

* resolve revoking TODO

* resolve channel capacity TODO

* resolve peer cloning TODO

* resolve peer disconnected TODO

* resolve PeerSet TODO

* wip tests

* more tests

* resolve Arc TODO

* rename pending to non_revoked

* one more test

* extract utility function into util crate

* fix compilation in tests

* Apply suggestions from code review

Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>

* revert pin_project removal

* fix while let loop

* Revert "revert pin_project removal"

This reverts commit ae7f529d8de982ef66c3007dd1ff74c6ddce80d2.

* fix compilation

* Update node/subsystem/src/messages.rs

* docs on pub items

* guide updates

* remove a TODO

* small guide update

* fix a typo

* link to the issue

* validator discovery: on_request docs

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>
Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
This commit is contained in:
Andronik Ordian
2020-10-06 13:34:57 +02:00
committed by GitHub
parent 96f6b5ae2d
commit ca89e3edbe
20 changed files with 1102 additions and 124 deletions
@@ -16,13 +16,17 @@
use std::collections::HashMap;
use super::{TARGET, Result};
use futures::channel::oneshot;
use log::{trace, warn};
use futures::stream::StreamExt as _;
use futures::task::Poll;
use log::warn;
use polkadot_primitives::v1::{
CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt,
PoV, ValidatorId,
};
use super::{TARGET, Result};
use polkadot_subsystem::{
FromOverseer, OverseerSignal, SubsystemContext,
messages::{
@@ -31,9 +35,10 @@ use polkadot_subsystem::{
},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, PeerSet, NetworkBridgeEvent, RequestId,
v1 as protocol_v1, View, PeerId, NetworkBridgeEvent, RequestId,
};
use polkadot_node_subsystem_util::{
validator_discovery,
request_validators_ctx,
request_validator_groups_ctx,
metrics::{self, prometheus},
@@ -119,6 +124,9 @@ struct State {
/// go out of scope with their respective deactivated leafs.
known_validators: HashMap<PeerId, ValidatorId>,
/// Use to await for the next validator connection and revoke the request.
last_connection_request: Option<validator_discovery::ConnectionRequest>,
/// Metrics.
metrics: Metrics,
}
@@ -188,7 +196,7 @@ where
state.our_validators_groups.insert(relay_parent, our_validators.clone());
// Issue a discovery request for the validators of the current group and the next group.
connect_to_validators(ctx, state, our_validators).await?;
connect_to_validators(ctx, relay_parent, state, our_validators).await?;
state.collations.insert(relay_parent, (receipt, pov));
@@ -289,27 +297,29 @@ where
Ok(())
}
/// Issue a connection request to a set of validators.
/// Issue a connection request to a set of validators and
/// revoke the previous connection request.
async fn connect_to_validators<Context>(
ctx: &mut Context,
relay_parent: Hash,
state: &mut State,
validators: Vec<ValidatorId>,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ConnectToValidators(PeerSet::Collation, validators, tx),
)).await?;
let mut validators_ids = rx.await?;
for id in validators_ids.drain(..) {
state.known_validators.insert(id.1, id.0);
if let Some(request) = state.last_connection_request.take() {
request.revoke();
}
let request = validator_discovery::connect_to_validators(
ctx,
relay_parent,
validators,
).await?;
state.last_connection_request = Some(request);
Ok(())
}
@@ -533,11 +543,10 @@ where
Ok(())
}
/// A peer is connected.
/// A validator is connected.
///
/// We first want to check if this is a validator we are expecting to talk to
/// and if so `Declare` that we are a collator with a given `CollatorId`.
async fn handle_peer_connected<Context>(
/// `Declare` that we are a collator with a given `CollatorId`.
async fn handle_validator_connected<Context>(
ctx: &mut Context,
state: &mut State,
peer_id: PeerId,
@@ -545,12 +554,6 @@ async fn handle_peer_connected<Context>(
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
if !state.known_validators.contains_key(&peer_id) {
trace!(target: TARGET, "An unknown peer has connected {:?}", peer_id);
return Ok(())
}
state.peer_views.entry(peer_id.clone()).or_default();
declare(ctx, state, vec![peer_id]).await?;
@@ -570,8 +573,8 @@ where
use NetworkBridgeEvent::*;
match bridge_message {
PeerConnected(peer_id, _observed_role) => {
handle_peer_connected(ctx, state, peer_id).await?;
PeerConnected(_peer_id, _observed_role) => {
// validators first connection is handled by `handle_validator_connected`
}
PeerViewChange(peer_id, view) => {
handle_peer_view_change(ctx, state, peer_id, view).await?;
@@ -602,8 +605,8 @@ async fn handle_our_view_change(
let removed = old_view.difference(&view).collect::<Vec<_>>();
for removed in removed.into_iter() {
state.collations.remove(&removed);
if let Some(group) = state.our_validators_groups.remove(&removed) {
state.collations.remove(removed);
if let Some(group) = state.our_validators_groups.remove(removed) {
state.known_validators.retain(|_, v| !group.contains(v));
}
}
@@ -631,25 +634,43 @@ where
state.our_id = our_id;
loop {
match ctx.recv().await? {
Communication { msg } => process_msg(&mut ctx, &mut state, msg).await?,
Signal(ActiveLeaves(_update)) => {}
Signal(BlockFinalized(_)) => {}
Signal(Conclude) => break,
if let Some(mut request) = state.last_connection_request.take() {
while let Poll::Ready(Some((validator_id, peer_id))) = futures::poll!(request.next()) {
state.known_validators.insert(peer_id.clone(), validator_id);
if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id).await {
warn!(
target: TARGET,
"Failed to declare our collator id: {:?}",
err,
);
}
}
// put it back
state.last_connection_request = Some(request);
}
}
Ok(())
while let Poll::Ready(msg) = futures::poll!(ctx.recv()) {
match msg? {
Communication { msg } => process_msg(&mut ctx, &mut state, msg).await?,
Signal(ActiveLeaves(_update)) => {}
Signal(BlockFinalized(_)) => {}
Signal(Conclude) => return Ok(()),
}
}
futures::pending!()
}
}
#[cfg(test)]
mod tests {
use super::*;
use log::trace;
use std::time::Duration;
use futures::{executor, future, Future};
use assert_matches::assert_matches;
use futures::{executor, future, Future};
use log::trace;
use smallvec::smallvec;
use sp_core::crypto::Pair;
@@ -657,12 +678,11 @@ mod tests {
use polkadot_primitives::v1::{
BlockData, CandidateDescriptor, CollatorPair, ScheduledCore,
ValidatorIndex, GroupRotationInfo,
ValidatorIndex, GroupRotationInfo, AuthorityDiscoveryId,
};
use polkadot_subsystem::ActiveLeavesUpdate;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_node_network_protocol::ObservedRole;
#[derive(Default)]
struct TestCandidateBuilder {
@@ -691,6 +711,7 @@ mod tests {
chain_ids: Vec<ParaId>,
validators: Vec<Sr25519Keyring>,
validator_public: Vec<ValidatorId>,
validator_authority_id: Vec<AuthorityDiscoveryId>,
validator_peer_id: Vec<PeerId>,
validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
relay_parent: Hash,
@@ -702,6 +723,10 @@ mod tests {
val_ids.iter().map(|v| v.public().into()).collect()
}
fn validator_authority_id(val_ids: &[Sr25519Keyring]) -> Vec<AuthorityDiscoveryId> {
val_ids.iter().map(|v| v.public().into()).collect()
}
impl Default for TestState {
fn default() -> Self {
let chain_a = ParaId::from(1);
@@ -718,6 +743,7 @@ mod tests {
];
let validator_public = validator_pubkeys(&validators);
let validator_authority_id = validator_authority_id(&validators);
let validator_peer_id = std::iter::repeat_with(|| PeerId::random())
.take(validator_public.len())
@@ -750,6 +776,7 @@ mod tests {
chain_ids,
validators,
validator_public,
validator_authority_id,
validator_peer_id,
validator_groups,
relay_parent,
@@ -925,62 +952,82 @@ mod tests {
}
);
// We now should connect to our validator group.
// obtain the validator_id to authority_id mapping
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ConnectToValidators(
peer_set,
validators,
tx,
)
) => {
assert_eq!(peer_set, PeerSet::Collation);
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorDiscovery(validators, tx),
)) => {
assert_eq!(relay_parent, current);
assert_eq!(validators.len(), 4);
assert!(validators.contains(&test_state.validator_public[2]));
assert!(validators.contains(&test_state.validator_public[0]));
assert!(validators.contains(&test_state.validator_public[4]));
assert!(validators.contains(&test_state.validator_public[1]));
tx.send(vec![
(test_state.validator_public[2].clone(), test_state.validator_peer_id[2].clone()),
(test_state.validator_public[0].clone(), test_state.validator_peer_id[0].clone()),
(test_state.validator_public[4].clone(), test_state.validator_peer_id[4].clone()),
(test_state.validator_public[1].clone(), test_state.validator_peer_id[1].clone()),
]).unwrap();
let result = vec![
Some(test_state.validator_authority_id[2].clone()),
Some(test_state.validator_authority_id[0].clone()),
Some(test_state.validator_authority_id[4].clone()),
Some(test_state.validator_authority_id[1].clone()),
];
tx.send(Ok(result)).unwrap();
}
);
// Validator 2 connects.
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
test_state.validator_peer_id[2].clone(),
ObservedRole::Authority,
)
),
).await;
// We declare to the connected validator that we are a collator.
// We now should connect to our validator group.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
to,
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
NetworkBridgeMessage::ConnectToValidators {
validator_ids,
mut connected,
..
}
) => {
assert_eq!(to, vec![test_state.validator_peer_id[2].clone()]);
assert_matches!(
wire_message,
protocol_v1::CollatorProtocolMessage::Declare(collator_id) => {
assert_eq!(collator_id, test_state.our_collator_pair.public());
}
);
assert_eq!(validator_ids.len(), 4);
assert!(validator_ids.contains(&test_state.validator_authority_id[2]));
assert!(validator_ids.contains(&test_state.validator_authority_id[0]));
assert!(validator_ids.contains(&test_state.validator_authority_id[4]));
assert!(validator_ids.contains(&test_state.validator_authority_id[1]));
let result = vec![
(test_state.validator_authority_id[2].clone(), test_state.validator_peer_id[2].clone()),
(test_state.validator_authority_id[0].clone(), test_state.validator_peer_id[0].clone()),
(test_state.validator_authority_id[4].clone(), test_state.validator_peer_id[4].clone()),
(test_state.validator_authority_id[1].clone(), test_state.validator_peer_id[1].clone()),
];
for (id, peer_id) in result.into_iter() {
connected.try_send((id, peer_id)).unwrap();
}
}
);
// We declare to the connected validators that we are a collator.
// We need to catch all `Declare` messages to the validators we've
// previosly connected to.
for i in vec![2, 0, 4, 1].into_iter() {
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
to,
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
) => {
assert_eq!(to, vec![test_state.validator_peer_id[i].clone()]);
assert_matches!(
wire_message,
protocol_v1::CollatorProtocolMessage::Declare(collator_id) => {
assert_eq!(collator_id, test_state.our_collator_pair.public());
}
);
}
);
}
// Send info about peer's view.
overseer_send(
&mut virtual_overseer,
@@ -59,6 +59,16 @@ enum Error {
Prometheus(prometheus::PrometheusError),
}
impl From<util::validator_discovery::Error> for Error {
fn from(me: util::validator_discovery::Error) -> Self {
match me {
util::validator_discovery::Error::Subsystem(s) => Error::Subsystem(s),
util::validator_discovery::Error::RuntimeApi(ra) => Error::RuntimeApi(ra),
util::validator_discovery::Error::Oneshot(c) => Error::Oneshot(c),
}
}
}
type Result<T> = std::result::Result<T, Error>;
enum ProtocolSide {