Dispute distribution implementation (#3282)

* Dispute protocol.

* Dispute distribution protocol.

* Get network requests routed.

* WIP: Basic dispute sender logic.

* Basic validator determination logic.

* WIP: Getting things to typecheck.

* Slightly larger timeout.

* More typechecking stuff.

* Cleanup.

* Finished most of the sending logic.

* Handle active leaves updates

- Cleanup dead disputes
- Update sends for new sessions
- Retry on errors

* Pass sessions in already.

* Startup dispute sending.

* Provide incoming decoding facilities

and use them in statement-distribution.

* Relaxed runtime util requirements.

We only need a `SubsystemSender` not a full `SubsystemContext`.

* Better usability of incoming requests.

Make it possible to consume stuff without clones.

* Add basic receiver functionality.

* Cleanup + fixes for sender.

* One more sender fix.

* Start receiver.

* Make sure to send responses back.

* WIP: Exposed authority discovery

* Make tests pass.

* Fully featured receiver.

* Decrease cost of `NotAValidator`.

* Make `RuntimeInfo` LRU cache size configurable.

* Cache more sessions.

* Fix collator protocol.

* Disable metrics for now.

* Make dispute-distribution a proper subsystem.

* Fix naming.

* Code style fixes.

* Factored out 4x copied mock function.

* WIP: Tests.

* Whitespace cleanup.

* Accessor functions.

* More testing.

* More Debug instances.

* Fix busy loop.

* Working tests.

* More tests.

* Cleanup.

* Fix build.

* Basic receiving test.

* Non validator message gets dropped.

* More receiving tests.

* Test nested and subsequent imports.

* Fix spaces.

* Better formatted imports.

* Import cleanup.

* Metrics.

* Message -> MuxedMessage

* Message -> MuxedMessage

* More review remarks.

* Add missing metrics.rs.

* Fix flaky test.

* Dispute coordinator - deliver confirmations.

* Send out `DisputeMessage` on issue local statement.

* Unwire dispute distribution.

* Review remarks.

* Review remarks.

* Better docs.
This commit is contained in:
Robert Klotzner
2021-07-09 04:29:53 +02:00
committed by GitHub
parent 20993b32b1
commit b5257b2407
52 changed files with 4040 additions and 407 deletions
@@ -27,7 +27,6 @@ polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpe
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures-timer = "3.0.2"
assert_matches = "1.4.0"
@@ -19,10 +19,8 @@
use futures::{FutureExt, channel::oneshot, future::BoxFuture};
use polkadot_subsystem::jaeger;
use polkadot_node_network_protocol::{
request_response::{OutgoingRequest, Recipient, request::{RequestError, Requests},
v1::{PoVFetchingRequest, PoVFetchingResponse}}
};
use polkadot_node_network_protocol::request_response::{OutgoingRequest, Recipient, request::{RequestError, Requests},
v1::{PoVFetchingRequest, PoVFetchingResponse}};
use polkadot_primitives::v1::{
CandidateHash, Hash, ValidatorIndex,
};
@@ -49,7 +47,7 @@ pub async fn fetch_pov<Context>(
where
Context: SubsystemContext,
{
let info = &runtime.get_session_info(ctx, parent).await?.session_info;
let info = &runtime.get_session_info(ctx.sender(), parent).await?.session_info;
let authority_id = info.discovery_keys.get(from_validator.0 as usize)
.ok_or(NonFatal::InvalidValidatorIndex)?
.clone();
@@ -129,11 +127,12 @@ mod tests {
use polkadot_primitives::v1::{CandidateHash, Hash, ValidatorIndex};
use polkadot_node_primitives::BlockData;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_subsystem::messages::{AllMessages, AvailabilityDistributionMessage, RuntimeApiMessage, RuntimeApiRequest};
use polkadot_subsystem::messages::{AvailabilityDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, AllMessages};
use test_helpers::mock::make_ferdie_keystore;
use super::*;
use crate::LOG_TARGET;
use crate::tests::mock::{make_session_info, make_ferdie_keystore};
use crate::tests::mock::{make_session_info};
#[test]
fn rejects_invalid_pov() {
@@ -106,7 +106,7 @@ impl SessionCache {
Context: SubsystemContext,
F: FnOnce(&SessionInfo) -> R,
{
let session_index = runtime.get_session_index(ctx, parent).await?;
let session_index = runtime.get_session_index(ctx.sender(), parent).await?;
if let Some(o_info) = self.session_info_cache.get(&session_index) {
tracing::trace!(target: LOG_TARGET, session_index, "Got session from lru");
@@ -183,7 +183,7 @@ impl SessionCache {
where
Context: SubsystemContext,
{
let info = runtime.get_session_info_by_index(ctx, parent, session_index).await?;
let info = runtime.get_session_info_by_index(ctx.sender(), parent, session_index).await?;
let discovery_keys = info.session_info.discovery_keys.clone();
let mut validator_groups = info.session_info.validator_groups.clone();
@@ -19,30 +19,16 @@
use std::sync::Arc;
use sc_keystore::LocalKeystore;
use sp_keyring::Sr25519Keyring;
use sp_application_crypto::AppKey;
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_primitives::v1::{
CandidateCommitments, CandidateDescriptor, CandidateHash,
CommittedCandidateReceipt, GroupIndex, Hash, HeadData, Id as ParaId,
OccupiedCore, PersistedValidationData, SessionInfo, ValidatorId, ValidatorIndex
OccupiedCore, PersistedValidationData, SessionInfo, ValidatorIndex
};
use polkadot_node_primitives::{PoV, ErasureChunk, AvailableData, BlockData};
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
/// Get mock keystore with `Ferdie` key.
pub fn make_ferdie_keystore() -> SyncCryptoStorePtr {
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
SyncCryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Ferdie.to_seed()),
)
.expect("Insert key into keystore");
keystore
}
/// Create dummy session info with two validator groups.
pub fn make_session_info() -> SessionInfo {
@@ -46,9 +46,9 @@ use polkadot_node_network_protocol::{
request_response::{IncomingRequest, OutgoingRequest, Requests, v1}
};
use polkadot_subsystem_testhelpers as test_helpers;
use test_helpers::SingleItemSink;
use test_helpers::{SingleItemSink, mock::make_ferdie_keystore};
use super::mock::{make_session_info, OccupiedCoreBuilder, make_ferdie_keystore};
use super::mock::{make_session_info, OccupiedCoreBuilder};
use crate::LOG_TARGET;
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>;
-1
View File
@@ -10,7 +10,6 @@ futures = "0.3.15"
tracing = "0.1.26"
polkadot-primitives = { path = "../../../primitives" }
parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
sc-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
+16 -5
View File
@@ -24,6 +24,7 @@ use parity_scale_codec::{Encode, Decode};
use parking_lot::Mutex;
use futures::prelude::*;
use futures::stream::BoxStream;
use polkadot_subsystem::messages::DisputeDistributionMessage;
use sc_network::Event as NetworkEvent;
use sp_consensus::SyncOracle;
@@ -57,7 +58,8 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus};
/// To be added to [`NetworkConfiguration::extra_sets`].
pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
use std::collections::{HashMap, hash_map, HashSet};
use std::collections::HashSet;
use std::collections::{HashMap, hash_map};
use std::sync::Arc;
mod validator_discovery;
@@ -66,12 +68,14 @@ mod validator_discovery;
///
/// Defines the `Network` trait with an implementation for an `Arc<NetworkService>`.
mod network;
use network::{Network, send_message, get_peer_id_by_authority_id};
use network::{Network, send_message};
/// Request multiplexer for combining the multiple request sources into a single `Stream` of `AllMessages`.
mod multiplexer;
pub use multiplexer::RequestMultiplexer;
use crate::network::get_peer_id_by_authority_id;
#[cfg(test)]
mod tests;
@@ -304,7 +308,7 @@ impl<N, AD> NetworkBridge<N, AD> {
impl<Net, AD, Context> Subsystem<Context, SubsystemError> for NetworkBridge<Net, AD>
where
Net: Network + Sync,
AD: validator_discovery::AuthorityDiscovery,
AD: validator_discovery::AuthorityDiscovery + Clone,
Context: SubsystemContext<Message = NetworkBridgeMessage> + overseer::SubsystemContext<Message = NetworkBridgeMessage>,
{
fn start(mut self, ctx: Context) -> SpawnedSubsystem {
@@ -380,7 +384,7 @@ where
Context: SubsystemContext<Message = NetworkBridgeMessage>,
Context: overseer::SubsystemContext<Message = NetworkBridgeMessage>,
N: Network,
AD: validator_discovery::AuthorityDiscovery,
AD: validator_discovery::AuthorityDiscovery + Clone,
{
// This is kept sorted, descending, by block number.
let mut live_heads: Vec<ActivatedLeaf> = Vec::with_capacity(MAX_VIEW_HEADS);
@@ -877,7 +881,7 @@ async fn run_network<N, AD, Context>(
) -> SubsystemResult<()>
where
N: Network,
AD: validator_discovery::AuthorityDiscovery,
AD: validator_discovery::AuthorityDiscovery + Clone,
Context: SubsystemContext<Message=NetworkBridgeMessage> + overseer::SubsystemContext<Message=NetworkBridgeMessage>,
{
let shared = Shared::default();
@@ -894,6 +898,10 @@ where
.get_statement_fetching()
.expect("Gets initialized, must be `Some` on startup. qed.");
let dispute_receiver = request_multiplexer
.get_dispute_sending()
.expect("Gets initialized, must be `Some` on startup. qed.");
let (remote, network_event_handler) = handle_network_messages::<>(
ctx.sender().clone(),
network_service.clone(),
@@ -906,6 +914,9 @@ where
ctx.spawn("network-bridge-network-worker", Box::pin(remote))?;
ctx.send_message(
DisputeDistributionMessage::DisputeSendingReceiver(dispute_receiver)
).await;
ctx.send_message(
StatementDistributionMessage::StatementFetchingReceiver(statement_receiver)
).await;
@@ -15,6 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::pin::Pin;
use std::unreachable;
use futures::channel::mpsc;
use futures::stream::{FusedStream, Stream};
@@ -42,6 +43,7 @@ use polkadot_overseer::AllMessages;
pub struct RequestMultiplexer {
receivers: Vec<(Protocol, mpsc::Receiver<network::IncomingRequest>)>,
statement_fetching: Option<mpsc::Receiver<network::IncomingRequest>>,
dispute_sending: Option<mpsc::Receiver<network::IncomingRequest>>,
next_poll: usize,
}
@@ -68,6 +70,8 @@ impl RequestMultiplexer {
})
.unzip();
// Ok this code is ugly as hell, it is also a hack, see https://github.com/paritytech/polkadot/issues/2842.
// But it works and is executed on startup so, if anything is wrong here it will be noticed immediately.
let index = receivers.iter().enumerate().find_map(|(i, (p, _))|
if let Protocol::StatementFetching = p {
Some(i)
@@ -77,10 +81,20 @@ impl RequestMultiplexer {
).expect("Statement fetching must be registered. qed.");
let statement_fetching = Some(receivers.remove(index).1);
let index = receivers.iter().enumerate().find_map(|(i, (p, _))|
if let Protocol::DisputeSending = p {
Some(i)
} else {
None
}
).expect("Dispute sending must be registered. qed.");
let dispute_sending = Some(receivers.remove(index).1);
(
Self {
receivers,
statement_fetching,
dispute_sending,
next_poll: 0,
},
cfgs,
@@ -93,6 +107,13 @@ impl RequestMultiplexer {
pub fn get_statement_fetching(&mut self) -> Option<mpsc::Receiver<network::IncomingRequest>> {
std::mem::take(&mut self.statement_fetching)
}
/// Get the receiver for handling dispute sending requests.
///
/// This function will only return `Some` once.
pub fn get_dispute_sending(&mut self) -> Option<mpsc::Receiver<network::IncomingRequest>> {
std::mem::take(&mut self.dispute_sending)
}
}
impl Stream for RequestMultiplexer {
@@ -174,6 +195,9 @@ fn multiplex_single(
Protocol::StatementFetching => {
unreachable!("Statement fetching requests are handled directly. qed.");
}
Protocol::DisputeSending => {
unreachable!("Dispute sending request are handled directly. qed.");
}
};
Ok(r)
}
+45 -2
View File
@@ -67,7 +67,7 @@ struct TestNetwork {
_req_configs: Vec<RequestResponseConfig>,
}
#[derive(Clone)]
#[derive(Clone, Debug)]
struct TestAuthorityDiscovery;
// The test's view of the network. This receives updates from the subsystem in the form
@@ -688,6 +688,12 @@ fn peer_view_updates_sent_via_overseer() {
let view = view![Hash::repeat_byte(1)];
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(_)
)
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
@@ -740,6 +746,12 @@ fn peer_messages_sent_via_overseer() {
ObservedRole::Full,
).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(_)
)
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
@@ -812,6 +824,12 @@ fn peer_disconnect_from_just_one_peerset() {
network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;
network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(_)
)
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
@@ -894,6 +912,12 @@ fn relays_collation_protocol_messages() {
let peer_a = PeerId::random();
let peer_b = PeerId::random();
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(_)
)
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
@@ -992,6 +1016,12 @@ fn different_views_on_different_peer_sets() {
network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;
network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(_)
)
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
@@ -1153,6 +1183,12 @@ fn send_messages_to_peers() {
network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;
network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(_)
)
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
@@ -1279,7 +1315,8 @@ fn spread_event_to_subsystems_is_up_to_date() {
AllMessages::ApprovalDistribution(_) => { cnt += 1; }
AllMessages::GossipSupport(_) => unreachable!("Not interested in network events"),
AllMessages::DisputeCoordinator(_) => unreachable!("Not interested in network events"),
AllMessages::DisputeParticipation(_) => unreachable!("Not interetsed in network events"),
AllMessages::DisputeParticipation(_) => unreachable!("Not interested in network events"),
AllMessages::DisputeDistribution(_) => unreachable!("Not interested in network events"),
AllMessages::ChainSelection(_) => unreachable!("Not interested in network events"),
// Add variants here as needed, `{ cnt += 1; }` for those that need to be
// notified, `unreachable!()` for those that should not.
@@ -1325,6 +1362,12 @@ fn our_view_updates_decreasing_order_and_limited_to_max() {
0,
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::DisputeDistribution(
DisputeDistributionMessage::DisputeSendingReceiver(_)
)
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
@@ -21,37 +21,16 @@ use crate::Network;
use core::marker::PhantomData;
use std::collections::HashSet;
use async_trait::async_trait;
use futures::channel::oneshot;
use sc_network::multiaddr::Multiaddr;
use sc_authority_discovery::Service as AuthorityDiscoveryService;
use polkadot_node_network_protocol::PeerId;
use polkadot_primitives::v1::AuthorityDiscoveryId;
use polkadot_node_network_protocol::peer_set::{PeerSet, PerPeerSet};
pub use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery;
const LOG_TARGET: &str = "parachain::validator-discovery";
/// An abstraction over the authority discovery service.
#[async_trait]
pub trait AuthorityDiscovery: Send + Clone + 'static {
/// Get the addresses for the given [`AuthorityId`] from the local address cache.
async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>>;
/// Get the [`AuthorityId`] for the given [`PeerId`] from the local address cache.
async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option<AuthorityDiscoveryId>;
}
#[async_trait]
impl AuthorityDiscovery for AuthorityDiscoveryService {
async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>> {
AuthorityDiscoveryService::get_addresses_by_authority_id(self, authority).await
}
async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option<AuthorityDiscoveryId> {
AuthorityDiscoveryService::get_authority_id_by_peer_id(self, peer_id).await
}
}
pub(super) struct Service<N, AD> {
state: PerPeerSet<StatePerPeerSet>,
// PhantomData used to make the struct generic instead of having generic methods
@@ -147,9 +126,10 @@ mod tests {
use std::{borrow::Cow, collections::HashMap};
use futures::stream::BoxStream;
use async_trait::async_trait;
use sc_network::{Event as NetworkEvent, IfDisconnected};
use sp_keyring::Sr25519Keyring;
use polkadot_node_network_protocol::request_response::request::Requests;
use polkadot_node_network_protocol::{PeerId, request_response::request::Requests};
fn new_service() -> Service<TestNetwork, TestAuthorityDiscovery> {
Service::new()
@@ -164,7 +144,7 @@ mod tests {
peers_set: HashSet<Multiaddr>,
}
#[derive(Default, Clone)]
#[derive(Default, Clone, Debug)]
struct TestAuthorityDiscovery {
by_authority_id: HashMap<AuthorityDiscoveryId, Multiaddr>,
by_peer_id: HashMap<PeerId, AuthorityDiscoveryId>,
@@ -450,8 +450,8 @@ where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
let session_index = runtime.get_session_index(ctx, relay_parent).await?;
let info = &runtime.get_session_info_by_index(ctx, relay_parent, session_index)
let session_index = runtime.get_session_index(ctx.sender(), relay_parent).await?;
let info = &runtime.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
.await?
.session_info;
tracing::debug!(target: LOG_TARGET, ?session_index, "Received session info");
@@ -798,7 +798,7 @@ where
"Collation seconded message received with none-seconded statement.",
);
} else {
let statement = runtime.check_signature(ctx, relay_parent, statement)
let statement = runtime.check_signature(ctx.sender(), relay_parent, statement)
.await?
.map_err(NonFatal::InvalidStatementSignature)?;
@@ -0,0 +1,38 @@
[package]
name = "polkadot-dispute-distribution"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
futures = "0.3.15"
tracing = "0.1.26"
parity-scale-codec = { version = "2.0.0", features = ["std"] }
polkadot-primitives = { path = "../../../primitives" }
polkadot-erasure-coding = { path = "../../../erasure-coding" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-core-runtime-api = { path = "../../core/runtime-api" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
thiserror = "1.0.23"
rand = "0.8.3"
lru = "0.6.5"
[dev-dependencies]
async-trait = "0.1.42"
polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpers", path = "../../subsystem-test-helpers" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = {git = "https://github.com/paritytech/substrate", branch = "master" }
futures-timer = "3.0.2"
assert_matches = "1.4.0"
maplit = "1.0"
smallvec = "1.6.1"
lazy_static = "1.4.0"
@@ -0,0 +1,101 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//
//! Error handling related code and Error/Result definitions.
use thiserror::Error;
use polkadot_node_subsystem_util::{Fault, runtime, unwrap_non_fatal};
use polkadot_subsystem::SubsystemError;
use crate::LOG_TARGET;
use crate::sender;
#[derive(Debug, Error)]
#[error(transparent)]
pub struct Error(pub Fault<NonFatal, Fatal>);
impl From<NonFatal> for Error {
fn from(e: NonFatal) -> Self {
Self(Fault::from_non_fatal(e))
}
}
impl From<Fatal> for Error {
fn from(f: Fatal) -> Self {
Self(Fault::from_fatal(f))
}
}
impl From<sender::Error> for Error {
fn from(e: sender::Error) -> Self {
match e.0 {
Fault::Fatal(f) => Self(Fault::Fatal(Fatal::Sender(f))),
Fault::Err(nf) => Self(Fault::Err(NonFatal::Sender(nf))),
}
}
}
/// Fatal errors of this subsystem.
#[derive(Debug, Error)]
pub enum Fatal {
/// Receiving subsystem message from overseer failed.
#[error("Receiving message from overseer failed")]
SubsystemReceive(#[source] SubsystemError),
/// Spawning a running task failed.
#[error("Spawning subsystem task failed")]
SpawnTask(#[source] SubsystemError),
/// DisputeSender mpsc receiver exhausted.
#[error("Erasure chunk requester stream exhausted")]
SenderExhausted,
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information")]
Runtime(#[from] #[source] runtime::Fatal),
/// Errors coming from DisputeSender
#[error("Error while accessing runtime information")]
Sender(#[from] #[source] sender::Fatal),
}
/// Non-fatal errors of this subsystem.
#[derive(Debug, Error)]
pub enum NonFatal {
/// Errors coming from DisputeSender
#[error("Error while accessing runtime information")]
Sender(#[from] #[source] sender::NonFatal),
}
pub type Result<T> = std::result::Result<T, Error>;
pub type FatalResult<T> = std::result::Result<T, Fatal>;
/// Utility for eating top level errors and log them.
///
/// We basically always want to try and continue on error. This utility function is meant to
/// consume top-level errors by simply logging them
pub fn log_error(result: Result<()>, ctx: &'static str)
-> std::result::Result<(), Fatal>
{
if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? {
tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
}
Ok(())
}
@@ -0,0 +1,271 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! # Sending and receiving of `DisputeRequest`s.
//!
//! This subsystem essentially consists of two parts:
//!
//! - a sender
//! - and a receiver
//!
//! The sender is responsible for getting our vote out, see [`sender`]. The receiver handles
//! incoming [`DisputeRequest`]s and offers spam protection, see [`receiver`].
use futures::channel::{mpsc};
use futures::{FutureExt, StreamExt, TryFutureExt};
use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery;
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_primitives::DISPUTE_WINDOW;
use polkadot_subsystem::{
overseer, messages::DisputeDistributionMessage, FromOverseer, OverseerSignal, SpawnedSubsystem,
SubsystemContext, SubsystemError,
};
use polkadot_node_subsystem_util::{
runtime,
runtime::RuntimeInfo,
};
/// ## The sender [`DisputeSender`]
///
/// The sender (`DisputeSender`) keeps track of live disputes and makes sure our vote gets out for
/// each one of those. The sender is responsible for sending our vote to each validator
/// participating in the dispute and to each authority currently authoring blocks. The sending can
/// be initiated by sending `DisputeDistributionMessage::SendDispute` message to this subsystem.
///
/// In addition the `DisputeSender` will query the coordinator for active disputes on each
/// [`DisputeSender::update_leaves`] call and will initiate sending (start a `SendTask`) for every,
/// to this subsystem, unknown dispute. This is to make sure, we get our vote out, even on
/// restarts.
///
/// The actual work of sending and keeping track of transmission attempts to each validator for a
/// particular dispute are done by [`SendTask`]. The purpose of the `DisputeSender` is to keep
/// track of all ongoing disputes and start and clean up `SendTask`s accordingly.
mod sender;
use self::sender::{DisputeSender, TaskFinish};
/// ## The receiver [`DisputesReceiver`]
///
/// The receiving side is implemented as `DisputesReceiver` and is run as a separate long running task within
/// this subsystem ([`DisputesReceiver::run`]).
///
/// Conceptually all the receiver has to do, is waiting for incoming requests which are passed in
/// via a dedicated channel and forwarding them to the dispute coordinator via
/// `DisputeCoordinatorMessage::ImportStatements`. Being the interface to the network and untrusted
/// nodes, the reality is not that simple of course. Before importing statements the receiver will
/// make sure as good as it can to filter out malicious/unwanted/spammy requests. For this it does
/// the following:
///
/// - Drop all messages from non validator nodes, for this it requires the [`AuthorityDiscovery`]
/// service.
/// - Drop messages from a node, if we are already importing a message from that node (flood).
/// - Drop messages from nodes, that provided us messages where the statement import failed.
/// - Drop any obviously invalid votes (invalid signatures for example).
/// - Ban peers whose votes were deemed invalid.
///
/// For successfully imported votes, we will confirm the receipt of the message back to the sender.
/// This way a received confirmation guarantees, that the vote has been stored to disk by the
/// receiver.
mod receiver;
use self::receiver::DisputesReceiver;
/// Error and [`Result`] type for this subsystem.
mod error;
use error::{Fatal, FatalResult};
use error::{Result, log_error};
#[cfg(test)]
mod tests;
mod metrics;
//// Prometheus `Metrics` for dispute distribution.
pub use metrics::Metrics;
const LOG_TARGET: &'static str = "parachain::dispute-distribution";
/// The dispute distribution subsystem.
pub struct DisputeDistributionSubsystem<AD> {
/// Easy and efficient runtime access for this subsystem.
runtime: RuntimeInfo,
/// Sender for our dispute requests.
disputes_sender: DisputeSender,
/// Receive messages from `SendTask`.
sender_rx: mpsc::Receiver<TaskFinish>,
/// Authority discovery service.
authority_discovery: AD,
/// Metrics for this subsystem.
metrics: Metrics,
}
impl<Context, AD> overseer::Subsystem<Context, SubsystemError> for DisputeDistributionSubsystem<AD>
where
Context: SubsystemContext<Message = DisputeDistributionMessage>
+ overseer::SubsystemContext<Message = DisputeDistributionMessage>
+ Sync + Send,
AD: AuthorityDiscovery + Clone,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self
.run(ctx)
.map_err(|e| SubsystemError::with_origin("dispute-distribution", e))
.boxed();
SpawnedSubsystem {
name: "dispute-distribution-subsystem",
future,
}
}
}
impl<AD> DisputeDistributionSubsystem<AD>
where
AD: AuthorityDiscovery + Clone,
{
/// Create a new instance of the availability distribution.
pub fn new(keystore: SyncCryptoStorePtr, authority_discovery: AD, metrics: Metrics) -> Self {
let runtime = RuntimeInfo::new_with_config(runtime::Config {
keystore: Some(keystore),
session_cache_lru_size: DISPUTE_WINDOW as usize,
});
let (tx, sender_rx) = mpsc::channel(1);
let disputes_sender = DisputeSender::new(tx, metrics.clone());
Self { runtime, disputes_sender, sender_rx, authority_discovery, metrics }
}
/// Start processing work as passed on from the Overseer.
async fn run<Context>(mut self, mut ctx: Context) -> std::result::Result<(), Fatal>
where
Context: SubsystemContext<Message = DisputeDistributionMessage>
+ overseer::SubsystemContext<Message = DisputeDistributionMessage>
+ Sync + Send,
{
loop {
let message = MuxedMessage::receive(&mut ctx, &mut self.sender_rx).await;
match message {
MuxedMessage::Subsystem(result) => {
let result = match result? {
FromOverseer::Signal(signal) => {
match self.handle_signals(&mut ctx, signal).await {
Ok(SignalResult::Conclude) => return Ok(()),
Ok(SignalResult::Continue) => Ok(()),
Err(f) => Err(f),
}
}
FromOverseer::Communication { msg } =>
self.handle_subsystem_message(&mut ctx, msg).await,
};
log_error(result, "on FromOverseer")?;
}
MuxedMessage::Sender(result) => {
self.disputes_sender.on_task_message(
result.ok_or(Fatal::SenderExhausted)?
)
.await;
}
}
}
}
/// Handle overseer signals.
async fn handle_signals<Context: SubsystemContext> (
&mut self,
ctx: &mut Context,
signal: OverseerSignal,
) -> Result<SignalResult>
{
match signal {
OverseerSignal::Conclude =>
return Ok(SignalResult::Conclude),
OverseerSignal::ActiveLeaves(update) => {
self.disputes_sender.update_leaves(
ctx,
&mut self.runtime,
update
)
.await?;
}
OverseerSignal::BlockFinalized(_,_) => {}
};
Ok(SignalResult::Continue)
}
/// Handle `DisputeDistributionMessage`s.
async fn handle_subsystem_message<Context: SubsystemContext> (
&mut self,
ctx: &mut Context,
msg: DisputeDistributionMessage
) -> Result<()>
{
match msg {
DisputeDistributionMessage::SendDispute(dispute_msg) =>
self.disputes_sender.start_sender(ctx, &mut self.runtime, dispute_msg).await?,
// This message will only arrive once:
DisputeDistributionMessage::DisputeSendingReceiver(receiver) => {
let receiver = DisputesReceiver::new(
ctx.sender().clone(),
receiver,
self.authority_discovery.clone(),
self.metrics.clone()
);
ctx
.spawn("disputes-receiver", receiver.run().boxed(),)
.map_err(Fatal::SpawnTask)?;
},
}
Ok(())
}
}
/// Messages to be handled in this subsystem.
#[derive(Debug)]
enum MuxedMessage {
/// Messages from other subsystems.
Subsystem(FatalResult<FromOverseer<DisputeDistributionMessage>>),
/// Messages from spawned sender background tasks.
Sender(Option<TaskFinish>),
}
impl MuxedMessage {
async fn receive(
ctx: &mut (impl SubsystemContext<Message = DisputeDistributionMessage> + overseer::SubsystemContext<Message = DisputeDistributionMessage>),
from_sender: &mut mpsc::Receiver<TaskFinish>,
) -> Self {
// We are only fusing here to make `select` happy, in reality we will quit if the stream
// ends.
let from_overseer = ctx.recv().fuse();
futures::pin_mut!(from_overseer, from_sender);
futures::select!(
msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(Fatal::SubsystemReceive)),
msg = from_sender.next() => MuxedMessage::Sender(msg),
)
}
}
/// Result of handling signal from overseer.
enum SignalResult {
/// Overseer asked us to conclude.
Conclude,
/// We can continue processing events.
Continue,
}
@@ -0,0 +1,109 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use polkadot_node_subsystem_util::metrics::prometheus::{Counter, U64, Registry, PrometheusError, CounterVec, Opts};
use polkadot_node_subsystem_util::metrics::prometheus;
use polkadot_node_subsystem_util::metrics;
/// Label for success counters.
pub const SUCCEEDED: &'static str = "succeeded";
/// Label for fail counters.
pub const FAILED: &'static str = "failed";
/// Dispute Distribution metrics.
#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);
#[derive(Clone)]
struct MetricsInner {
/// Number of sent dispute requests (succeeded and failed).
sent_requests: CounterVec<U64>,
/// Number of requests received.
///
/// This is all requests coming in, regardless of whether they are processed or dropped.
received_requests: Counter<U64>,
/// Number of requests for which `ImportStatements` returned.
///
/// We both have success full imports and failed imports here.
imported_requests: CounterVec<U64>,
}
impl Metrics {
/// Create new dummy metrics, not reporting anything.
pub fn new_dummy() -> Self {
Metrics(None)
}
/// Increment counter on finished request sending.
pub fn on_sent_request(&self, label: &'static str) {
if let Some(metrics) = &self.0 {
metrics.sent_requests.with_label_values(&[label]).inc()
}
}
/// Increment counter on served chunks.
pub fn on_received_request(&self) {
if let Some(metrics) = &self.0 {
metrics.received_requests.inc()
}
}
/// Statements have been imported.
pub fn on_imported(&self, label: &'static str) {
if let Some(metrics) = &self.0 {
metrics.imported_requests.with_label_values(&[label]).inc()
}
}
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &Registry) -> Result<Self, PrometheusError> {
let metrics = MetricsInner {
sent_requests: prometheus::register(
CounterVec::new(
Opts::new(
"parachain_dispute_distribution_sent_requests",
"Total number of sent requests.",
),
&["success"]
)?,
registry,
)?,
received_requests: prometheus::register(
Counter::new(
"parachain_dispute_distribution_received_requests",
"Total number of received dispute requests.",
)?,
registry,
)?,
imported_requests: prometheus::register(
CounterVec::new(
Opts::new(
"parachain_dispute_distribution_imported_requests",
"Total number of imported requests.",
),
&["success"]
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
@@ -0,0 +1,110 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//
//! Error handling related code and Error/Result definitions.
use thiserror::Error;
use polkadot_node_network_protocol::PeerId;
use polkadot_node_network_protocol::request_response::request::ReceiveError;
use polkadot_node_subsystem_util::{Fault, runtime, unwrap_non_fatal};
use crate::LOG_TARGET;
#[derive(Debug, Error)]
#[error(transparent)]
pub struct Error(pub Fault<NonFatal, Fatal>);
impl From<NonFatal> for Error {
fn from(e: NonFatal) -> Self {
Self(Fault::from_non_fatal(e))
}
}
impl From<Fatal> for Error {
fn from(f: Fatal) -> Self {
Self(Fault::from_fatal(f))
}
}
impl From<runtime::Error> for Error {
fn from(o: runtime::Error) -> Self {
Self(Fault::from_other(o))
}
}
/// Fatal errors of this subsystem.
#[derive(Debug, Error)]
pub enum Fatal {
/// Request channel returned `None`. Likely a system shutdown.
#[error("Request channel stream finished.")]
RequestChannelFinished,
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information")]
Runtime(#[from] #[source] runtime::Fatal),
}
/// Non-fatal errors of this subsystem.
#[derive(Debug, Error)]
pub enum NonFatal {
/// Answering request failed.
#[error("Sending back response to peer {0} failed.")]
SendResponse(PeerId),
/// Getting request from raw request failed.
#[error("Decoding request failed.")]
FromRawRequest(#[source] ReceiveError),
/// Setting reputation for peer failed.
#[error("Changing peer's ({0}) reputation failed.")]
SetPeerReputation(PeerId),
/// Peer sent us request with invalid signature.
#[error("Dispute request with invalid signatures, from peer {0}.")]
InvalidSignature(PeerId),
/// Import oneshot got canceled.
#[error("Import of dispute got canceled for peer {0} - import failed for some reason.")]
ImportCanceled(PeerId),
/// Non validator tried to participate in dispute.
#[error("Peer {0} is not a validator.")]
NotAValidator(PeerId),
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information")]
Runtime(#[from] #[source] runtime::NonFatal),
}
pub type Result<T> = std::result::Result<T, Error>;
pub type FatalResult<T> = std::result::Result<T, Fatal>;
pub type NonFatalResult<T> = std::result::Result<T, NonFatal>;
/// Utility for eating top level errors and log them.
///
/// We basically always want to try and continue on error. This utility function is meant to
/// consume top-level errors by simply logging them
pub fn log_error(result: Result<()>)
-> std::result::Result<(), Fatal>
{
if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? {
tracing::warn!(target: LOG_TARGET, error = ?error);
}
Ok(())
}
@@ -0,0 +1,429 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashSet;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::FutureExt;
use futures::Stream;
use futures::future::{BoxFuture, poll_fn};
use futures::stream::FusedStream;
use lru::LruCache;
use futures::{channel::mpsc, channel::oneshot, stream::StreamExt, stream::FuturesUnordered};
use polkadot_node_network_protocol::{
PeerId,
UnifiedReputationChange as Rep,
authority_discovery::AuthorityDiscovery,
request_response::{
IncomingRequest,
request::OutgoingResponse,
request::OutgoingResponseSender,
v1::DisputeRequest,
v1::DisputeResponse,
},
};
use polkadot_node_primitives::DISPUTE_WINDOW;
use polkadot_node_subsystem_util::{
runtime,
runtime::RuntimeInfo,
};
use polkadot_subsystem::{
SubsystemSender,
messages::{
AllMessages, DisputeCoordinatorMessage, ImportStatementsResult,
},
};
use crate::metrics::{FAILED, SUCCEEDED};
use crate::{LOG_TARGET, Metrics};
mod error;
use self::error::{log_error, FatalResult, NonFatalResult, NonFatal, Fatal, Result};
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Received message could not be decoded.");
const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Signatures were invalid.");
const COST_INVALID_CANDIDATE: Rep = Rep::Malicious("Reported candidate was not available.");
const COST_NOT_A_VALIDATOR: Rep = Rep::CostMajor("Reporting peer was not a validator.");
/// How many statement imports we want to issue in parallel:
pub const MAX_PARALLEL_IMPORTS: usize = 10;
/// State for handling incoming `DisputeRequest` messages.
///
/// This is supposed to run as its own task in order to easily impose back pressure on the incoming
/// request channel and at the same time to drop flood messages as fast as possible.
pub struct DisputesReceiver<Sender, AD> {
/// Access to session information.
runtime: RuntimeInfo,
/// Subsystem sender for communication with other subsystems.
sender: Sender,
/// Channel to retrieve incoming requests from.
receiver: mpsc::Receiver<sc_network::config::IncomingRequest>,
/// Authority discovery service:
authority_discovery: AD,
/// Imports currently being processed.
pending_imports: PendingImports,
/// We keep record of the last banned peers.
///
/// This is needed because once we ban a peer, we will very likely still have pending requests
/// in the incoming channel - we should not waste time recovering availability for those, as we
/// already know the peer is malicious.
banned_peers: LruCache<PeerId, ()>,
/// Log received requests.
metrics: Metrics,
}
/// Messages as handled by this receiver internally.
enum MuxedMessage {
/// An import got confirmed by the coordinator.
///
/// We need to handle those for two reasons:
///
/// - We need to make sure responses are actually sent (therefore we need to await futures
/// promptly).
/// - We need to update banned_peers accordingly to the result.
ConfirmedImport(NonFatalResult<(PeerId, ImportStatementsResult)>),
/// A new request has arrived and should be handled.
NewRequest(sc_network::config::IncomingRequest),
}
impl MuxedMessage {
async fn receive(
pending_imports: &mut PendingImports,
pending_requests: &mut mpsc::Receiver<sc_network::config::IncomingRequest>,
) -> FatalResult<MuxedMessage> {
poll_fn(|ctx| {
if let Poll::Ready(v) = pending_requests.poll_next_unpin(ctx) {
let r = match v {
None => Err(Fatal::RequestChannelFinished),
Some(msg) => Ok(MuxedMessage::NewRequest(msg)),
};
return Poll::Ready(r)
}
// In case of Ready(None) return `Pending` below - we want to wait for the next request
// in that case.
if let Poll::Ready(Some(v)) = pending_imports.poll_next_unpin(ctx) {
return Poll::Ready(Ok(MuxedMessage::ConfirmedImport(v)))
}
Poll::Pending
}).await
}
}
impl<Sender: SubsystemSender, AD> DisputesReceiver<Sender, AD>
where
AD: AuthorityDiscovery,
{
/// Create a new receiver which can be `run`.
pub fn new(
sender: Sender,
receiver: mpsc::Receiver<sc_network::config::IncomingRequest>,
authority_discovery: AD,
metrics: Metrics,
) -> Self {
let runtime = RuntimeInfo::new_with_config(runtime::Config {
keystore: None,
session_cache_lru_size: DISPUTE_WINDOW as usize,
});
Self {
runtime,
sender,
receiver,
authority_discovery,
pending_imports: PendingImports::new(),
// Size of MAX_PARALLEL_IMPORTS ensures we are going to immediately get rid of any
// malicious requests still pending in the incoming queue.
banned_peers: LruCache::new(MAX_PARALLEL_IMPORTS),
metrics,
}
}
/// Get that receiver started.
///
/// This is an endless loop and should be spawned into its own task.
pub async fn run(mut self) {
loop {
match log_error(self.run_inner().await) {
Ok(()) => {}
Err(Fatal::RequestChannelFinished) => {
tracing::debug!(
target: LOG_TARGET,
"Incoming request stream exhausted - shutting down?"
);
return
}
Err(err) => {
tracing::warn!(
target: LOG_TARGET,
?err,
"Dispute receiver died."
);
return
}
}
}
}
/// Actual work happening here.
async fn run_inner(&mut self) -> Result<()> {
let msg = MuxedMessage::receive(
&mut self.pending_imports,
&mut self.receiver
)
.await?;
let raw = match msg {
// We need to clean up futures, to make sure responses are sent:
MuxedMessage::ConfirmedImport(m_bad) => {
self.ban_bad_peer(m_bad)?;
return Ok(())
}
MuxedMessage::NewRequest(req) => req,
};
self.metrics.on_received_request();
let peer = raw.peer;
// Only accept messages from validators:
if self.authority_discovery.get_authority_id_by_peer_id(raw.peer).await.is_none() {
raw.pending_response.send(
sc_network::config::OutgoingResponse {
result: Err(()),
reputation_changes: vec![COST_NOT_A_VALIDATOR.into_base_rep()],
sent_feedback: None,
}
)
.map_err(|_| NonFatal::SendResponse(peer))?;
return Err(NonFatal::NotAValidator(peer).into())
}
let incoming = IncomingRequest::<DisputeRequest>::try_from_raw(
raw,
vec![COST_INVALID_REQUEST]
)
.map_err(NonFatal::FromRawRequest)?;
// Immediately drop requests from peers that already have requests in flight or have
// been banned recently (flood protection):
if self.pending_imports.peer_is_pending(&peer) || self.banned_peers.contains(&peer) {
tracing::trace!(
target: LOG_TARGET,
?peer,
"Dropping message from peer (banned/pending import)"
);
return Ok(())
}
// Wait for a free slot:
if self.pending_imports.len() >= MAX_PARALLEL_IMPORTS as usize {
// Wait for one to finish:
let r = self.pending_imports.next().await;
self.ban_bad_peer(r.expect("pending_imports.len() is greater 0. qed."))?;
}
// All good - initiate import.
self.start_import(incoming).await
}
/// Start importing votes for the given request.
async fn start_import(
&mut self,
incoming: IncomingRequest<DisputeRequest>,
) -> Result<()> {
let IncomingRequest {
peer, payload, pending_response,
} = incoming;
let info = self.runtime.get_session_info_by_index(
&mut self.sender,
payload.0.candidate_receipt.descriptor.relay_parent,
payload.0.session_index
)
.await?;
let votes_result = payload.0.try_into_signed_votes(&info.session_info);
let (candidate_receipt, valid_vote, invalid_vote) = match votes_result {
Err(()) => { // Signature invalid:
pending_response.send_outgoing_response(
OutgoingResponse {
result: Err(()),
reputation_changes: vec![COST_INVALID_SIGNATURE],
sent_feedback: None,
}
)
.map_err(|_| NonFatal::SetPeerReputation(peer))?;
return Err(From::from(NonFatal::InvalidSignature(peer)))
}
Ok(votes) => votes,
};
let (pending_confirmation, confirmation_rx) = oneshot::channel();
let candidate_hash = candidate_receipt.hash();
self.sender.send_message(
AllMessages::DisputeCoordinator(
DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt,
session: valid_vote.0.session_index(),
statements: vec![valid_vote, invalid_vote],
pending_confirmation,
}
)
)
.await;
self.pending_imports.push(peer, confirmation_rx, pending_response);
Ok(())
}
/// Await an import and ban any misbehaving peers.
///
/// In addition we report import metrics.
fn ban_bad_peer(
&mut self,
result: NonFatalResult<(PeerId, ImportStatementsResult)>
) -> NonFatalResult<()> {
match result? {
(_, ImportStatementsResult::ValidImport) => {
self.metrics.on_imported(SUCCEEDED);
}
(bad_peer, ImportStatementsResult::InvalidImport) => {
self.metrics.on_imported(FAILED);
self.banned_peers.put(bad_peer, ());
}
}
Ok(())
}
}
/// Manage pending imports in a way that preserves invariants.
struct PendingImports {
/// Futures in flight.
futures: FuturesUnordered<BoxFuture<'static, (PeerId, NonFatalResult<ImportStatementsResult>)>>,
/// Peers whose requests are currently in flight.
peers: HashSet<PeerId>,
}
impl PendingImports {
pub fn new() -> Self {
Self {
futures: FuturesUnordered::new(),
peers: HashSet::new(),
}
}
pub fn push(
&mut self,
peer: PeerId,
handled: oneshot::Receiver<ImportStatementsResult>,
pending_response: OutgoingResponseSender<DisputeRequest>
) {
self.peers.insert(peer);
self.futures.push(
async move {
let r = respond_to_request(peer, handled, pending_response).await;
(peer, r)
}.boxed()
)
}
/// Returns the number of contained futures.
pub fn len(&self) -> usize {
self.futures.len()
}
/// Check whether a peer has a pending import.
pub fn peer_is_pending(&self, peer: &PeerId) -> bool {
self.peers.contains(peer)
}
}
impl Stream for PendingImports {
type Item = NonFatalResult<(PeerId, ImportStatementsResult)>;
fn poll_next(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.futures).poll_next(ctx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some((peer, result))) => {
self.peers.remove(&peer);
Poll::Ready(Some(result.map(|r| (peer,r))))
}
}
}
}
impl FusedStream for PendingImports {
fn is_terminated(&self) -> bool {
self.futures.is_terminated()
}
}
// Future for `PendingImports`
//
// - Wait for import
// - Punish peer
// - Deliver result
async fn respond_to_request(
peer: PeerId,
handled: oneshot::Receiver<ImportStatementsResult>,
pending_response: OutgoingResponseSender<DisputeRequest>
) -> NonFatalResult<ImportStatementsResult> {
let result = handled
.await
.map_err(|_| NonFatal::ImportCanceled(peer))?
;
let response = match result {
ImportStatementsResult::ValidImport =>
OutgoingResponse {
result: Ok(DisputeResponse::Confirmed),
reputation_changes: Vec::new(),
sent_feedback: None,
},
ImportStatementsResult::InvalidImport =>
OutgoingResponse {
result: Err(()),
reputation_changes: vec![COST_INVALID_CANDIDATE],
sent_feedback: None,
},
};
pending_response
.send_outgoing_response(response)
.map_err(|_| NonFatal::SendResponse(peer))?;
Ok(result)
}
@@ -0,0 +1,107 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//
//! Error handling related code and Error/Result definitions.
use thiserror::Error;
use polkadot_node_subsystem_util::{Fault, runtime};
use polkadot_subsystem::SubsystemError;
use polkadot_node_primitives::disputes::DisputeMessageCheckError;
#[derive(Debug, Error)]
#[error(transparent)]
pub struct Error(pub Fault<NonFatal, Fatal>);
impl From<NonFatal> for Error {
fn from(e: NonFatal) -> Self {
Self(Fault::from_non_fatal(e))
}
}
impl From<Fatal> for Error {
fn from(f: Fatal) -> Self {
Self(Fault::from_fatal(f))
}
}
impl From<runtime::Error> for Error {
fn from(o: runtime::Error) -> Self {
Self(Fault::from_other(o))
}
}
/// Fatal errors of this subsystem.
#[derive(Debug, Error)]
pub enum Fatal {
/// Spawning a running task failed.
#[error("Spawning subsystem task failed")]
SpawnTask(#[source] SubsystemError),
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information")]
Runtime(#[from] #[source] runtime::Fatal),
}
/// Non-fatal errors of this subsystem.
#[derive(Debug, Error)]
pub enum NonFatal {
/// We need available active heads for finding relevant authorities.
#[error("No active heads available - needed for finding relevant authorities.")]
NoActiveHeads,
/// This error likely indicates a bug in the coordinator.
#[error("Oneshot for asking dispute coordinator for active disputes got canceled.")]
AskActiveDisputesCanceled,
/// This error likely indicates a bug in the coordinator.
#[error("Oneshot for asking dispute coordinator for candidate votes got canceled.")]
AskCandidateVotesCanceled,
/// This error does indicate a bug in the coordinator.
///
/// We were not able to successfully construct a `DisputeMessage` from disputes votes.
#[error("Invalid dispute encountered")]
InvalidDisputeFromCoordinator(#[source] DisputeMessageCheckError),
/// This error does indicate a bug in the coordinator.
///
/// We did not receive votes on both sides for `CandidateVotes` received from the coordinator.
#[error("Missing votes for valid dispute")]
MissingVotesFromCoordinator,
/// This error does indicate a bug in the coordinator.
///
/// `SignedDisputeStatement` could not be reconstructed from recorded statements.
#[error("Invalid statements from coordinator")]
InvalidStatementFromCoordinator,
/// This error does indicate a bug in the coordinator.
///
/// A statement's `ValidatorIndex` could not be looked up.
#[error("ValidatorIndex of statement could not be found")]
InvalidValidatorIndexFromCoordinator,
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information")]
Runtime(#[from] #[source] runtime::NonFatal),
}
pub type Result<T> = std::result::Result<T, Error>;
pub type NonFatalResult<T> = std::result::Result<T, NonFatal>;
@@ -0,0 +1,362 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashMap, HashSet, hash_map::Entry};
use futures::channel::{mpsc, oneshot};
use polkadot_node_network_protocol::request_response::v1::DisputeRequest;
use polkadot_node_primitives::{CandidateVotes, DisputeMessage, SignedDisputeStatement};
use polkadot_node_subsystem_util::runtime::RuntimeInfo;
use polkadot_primitives::v1::{CandidateHash, DisputeStatement, Hash, SessionIndex};
use polkadot_subsystem::{
ActiveLeavesUpdate, SubsystemContext,
messages::{AllMessages, DisputeCoordinatorMessage}
};
/// For each ongoing dispute we have a `SendTask` which takes care of it.
///
/// It is going to spawn real tasks as it sees fit for getting the votes of the particular dispute
/// out.
mod send_task;
use send_task::SendTask;
pub use send_task::TaskFinish;
/// Error and [`Result`] type for sender
mod error;
pub use error::{Result, Error, Fatal, NonFatal};
use crate::{LOG_TARGET, Metrics};
use self::error::NonFatalResult;
/// The `DisputeSender` keeps track of all ongoing disputes we need to send statements out.
///
/// For each dispute a `SendTask` is responsible of sending to the concerned validators for that
/// particular dispute. The `DisputeSender` keeps track of those tasks, informs them about new
/// sessions/validator sets and cleans them up when they become obsolete.
pub struct DisputeSender {
/// All heads we currently consider active.
active_heads: Vec<Hash>,
/// List of currently active sessions.
///
/// Value is the hash that was used for the query.
active_sessions: HashMap<SessionIndex, Hash>,
/// All ongoing dispute sendings this subsystem is aware of.
disputes: HashMap<CandidateHash, SendTask>,
/// Sender to be cloned for `SendTask`s.
tx: mpsc::Sender<TaskFinish>,
/// Metrics for reporting stats about sent requests.
metrics: Metrics,
}
impl DisputeSender
{
/// Create a new `DisputeSender` which can be used to start dispute sendings.
pub fn new(tx: mpsc::Sender<TaskFinish>, metrics: Metrics) -> Self {
Self {
active_heads: Vec::new(),
active_sessions: HashMap::new(),
disputes: HashMap::new(),
tx,
metrics,
}
}
/// Create a `SendTask` for a particular new dispute.
pub async fn start_sender<Context: SubsystemContext>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
msg: DisputeMessage,
) -> Result<()> {
let req: DisputeRequest = msg.into();
let candidate_hash = req.0.candidate_receipt.hash();
match self.disputes.entry(candidate_hash) {
Entry::Occupied(_) => {
tracing::trace!(
target: LOG_TARGET,
?candidate_hash,
"Dispute sending already active."
);
return Ok(())
}
Entry::Vacant(vacant) => {
let send_task = SendTask::new(
ctx,
runtime,
&self.active_sessions,
self.tx.clone(),
req,
)
.await?;
vacant.insert(send_task);
}
}
Ok(())
}
/// Take care of a change in active leaves.
///
/// - Initiate a retry of failed sends which are still active.
/// - Get new authorities to send messages to.
/// - Get rid of obsolete tasks and disputes.
/// - Get dispute sending started in case we missed one for some reason (e.g. on node startup)
pub async fn update_leaves<Context: SubsystemContext>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
update: ActiveLeavesUpdate,
) -> Result<()> {
let ActiveLeavesUpdate { activated, deactivated } = update;
let deactivated: HashSet<_> = deactivated.into_iter().collect();
self.active_heads.retain(|h| !deactivated.contains(h));
self.active_heads.extend(activated.into_iter().map(|l| l.hash));
let have_new_sessions = self.refresh_sessions(ctx, runtime).await?;
let active_disputes = get_active_disputes(ctx).await?;
let unknown_disputes = {
let mut disputes = active_disputes.clone();
disputes.retain(|(_, c)| !self.disputes.contains_key(c));
disputes
};
let active_disputes: HashSet<_> = active_disputes.into_iter().map(|(_, c)| c).collect();
// Cleanup obsolete senders:
self.disputes.retain(
|candidate_hash, _| active_disputes.contains(candidate_hash)
);
for dispute in self.disputes.values_mut() {
if have_new_sessions || dispute.has_failed_sends() {
dispute.refresh_sends(ctx, runtime, &self.active_sessions).await?;
}
}
// This should only be non-empty on startup, but if not - we got you covered:
for dispute in unknown_disputes {
self.start_send_for_dispute(ctx, runtime, dispute).await?
}
Ok(())
}
/// Receive message from a sending task.
pub async fn on_task_message(&mut self, msg: TaskFinish) {
let TaskFinish { candidate_hash, receiver, result } = msg;
self.metrics.on_sent_request(result.as_metrics_label());
let task = match self.disputes.get_mut(&candidate_hash) {
None => {
// Can happen when a dispute ends, with messages still in queue:
tracing::trace!(
target: LOG_TARGET,
?result,
"Received `FromSendingTask::Finished` for non existing dispute."
);
return
}
Some(task) => task,
};
task.on_finished_send(&receiver, result);
}
/// Call `start_sender` on all passed in disputes.
///
/// Recover necessary votes for building up `DisputeMessage` and start sending for all of them.
async fn start_send_for_dispute<Context: SubsystemContext>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
dispute: (SessionIndex, CandidateHash),
) -> Result<()> {
let (session_index, candidate_hash) = dispute;
// We need some relay chain head for context for receiving session info information:
let ref_head = self.active_sessions.values().next().ok_or(NonFatal::NoActiveHeads)?;
let info = runtime.get_session_info_by_index(ctx.sender(), *ref_head, session_index).await?;
let our_index = match info.validator_info.our_index {
None => {
tracing::trace!(
target: LOG_TARGET,
"Not a validator in that session - not starting dispute sending."
);
return Ok(())
}
Some(index) => index,
};
let votes = match get_candidate_votes(ctx, session_index, candidate_hash).await? {
None => {
tracing::debug!(
target: LOG_TARGET,
?session_index,
?candidate_hash,
"No votes for active dispute?! - possible, due to race."
);
return Ok(())
}
Some(votes) => votes,
};
let our_valid_vote = votes
.valid
.iter()
.find(|(_, i, _)| *i == our_index);
let our_invalid_vote = votes
.invalid
.iter()
.find(|(_, i, _)| *i == our_index);
let (valid_vote, invalid_vote) =
if let Some(our_valid_vote) = our_valid_vote {
// Get some invalid vote as well:
let invalid_vote = votes
.invalid
.get(0)
.ok_or(NonFatal::MissingVotesFromCoordinator)?;
(our_valid_vote, invalid_vote)
} else if let Some(our_invalid_vote) = our_invalid_vote {
// Get some valid vote as well:
let valid_vote = votes
.valid
.get(0)
.ok_or(NonFatal::MissingVotesFromCoordinator)?;
(valid_vote, our_invalid_vote)
} else {
return Err(From::from(NonFatal::MissingVotesFromCoordinator))
}
;
let (kind, valid_index, signature) = valid_vote;
let valid_public = info
.session_info
.validators
.get(valid_index.0 as usize)
.ok_or(NonFatal::InvalidStatementFromCoordinator)?;
let valid_signed = SignedDisputeStatement::new_checked(
DisputeStatement::Valid(kind.clone()),
candidate_hash,
session_index,
valid_public.clone(),
signature.clone(),
)
.map_err(|()| NonFatal::InvalidStatementFromCoordinator)?;
let (kind, invalid_index, signature) = invalid_vote;
let invalid_public = info
.session_info
.validators
.get(invalid_index.0 as usize)
.ok_or(NonFatal::InvalidValidatorIndexFromCoordinator)?;
let invalid_signed = SignedDisputeStatement::new_checked(
DisputeStatement::Invalid(kind.clone()),
candidate_hash,
session_index,
invalid_public.clone(),
signature.clone(),
)
.map_err(|()| NonFatal::InvalidValidatorIndexFromCoordinator)?;
// Reconstructing the checked signed dispute statements is hardly useful here and wasteful,
// but I don't want to enable a bypass for the below smart constructor and this code path
// is supposed to be only hit on startup basically.
//
// Revisit this decision when the `from_signed_statements` is unneded for the normal code
// path as well.
let message = DisputeMessage::from_signed_statements(
valid_signed,
*valid_index,
invalid_signed,
*invalid_index,
votes.candidate_receipt,
&info.session_info
)
.map_err(NonFatal::InvalidDisputeFromCoordinator)?;
// Finally, get the party started:
self.start_sender(ctx, runtime, message).await
}
/// Make active sessions correspond to currently active heads.
///
/// Returns: true if sessions changed.
async fn refresh_sessions<Context: SubsystemContext>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
) -> Result<bool> {
let new_sessions = get_active_session_indeces(ctx, runtime, &self.active_heads).await?;
let new_sessions_raw: HashSet<_> = new_sessions.keys().collect();
let old_sessions_raw: HashSet<_> = self.active_sessions.keys().collect();
let updated = new_sessions_raw != old_sessions_raw;
// Update in any case, so we use current heads for queries:
self.active_sessions = new_sessions;
Ok(updated)
}
}
/// Retrieve the currently active sessions.
///
/// List is all indeces of all active sessions together with the head that was used for the query.
async fn get_active_session_indeces<Context: SubsystemContext>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
active_heads: &Vec<Hash>,
) -> Result<HashMap<SessionIndex, Hash>> {
let mut indeces = HashMap::new();
for head in active_heads {
let session_index = runtime.get_session_index(ctx.sender(), *head).await?;
indeces.insert(session_index, *head);
}
Ok(indeces)
}
/// Retrieve Set of active disputes from the dispute coordinator.
async fn get_active_disputes<Context: SubsystemContext>(ctx: &mut Context)
-> NonFatalResult<Vec<(SessionIndex, CandidateHash)>> {
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::DisputeCoordinator(
DisputeCoordinatorMessage::ActiveDisputes(tx)
))
.await;
rx.await.map_err(|_| NonFatal::AskActiveDisputesCanceled)
}
/// Get all locally available dispute votes for a given dispute.
async fn get_candidate_votes<Context: SubsystemContext>(
ctx: &mut Context,
session_index: SessionIndex,
candidate_hash: CandidateHash,
) -> NonFatalResult<Option<CandidateVotes>> {
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::DisputeCoordinator(
DisputeCoordinatorMessage::QueryCandidateVotes(
session_index,
candidate_hash,
tx
)
))
.await;
rx.await.map_err(|_| NonFatal::AskCandidateVotesCanceled)
}
@@ -0,0 +1,328 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::collections::HashSet;
use futures::Future;
use futures::FutureExt;
use futures::SinkExt;
use futures::channel::mpsc;
use futures::future::RemoteHandle;
use polkadot_node_network_protocol::{
IfDisconnected,
request_response::{
OutgoingRequest, OutgoingResult, Recipient, Requests,
v1::{DisputeRequest, DisputeResponse},
}
};
use polkadot_node_subsystem_util::runtime::RuntimeInfo;
use polkadot_primitives::v1::{
AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, ValidatorIndex,
};
use polkadot_subsystem::{
SubsystemContext,
messages::{AllMessages, NetworkBridgeMessage},
};
use super::error::{Fatal, Result};
use crate::LOG_TARGET;
use crate::metrics::FAILED;
use crate::metrics::SUCCEEDED;
/// Delivery status for a particular dispute.
///
/// Keeps track of all the validators that have to be reached for a dispute.
pub struct SendTask {
/// The request we are supposed to get out to all parachain validators of the dispute's session
/// and to all current authorities.
request: DisputeRequest,
/// The set of authorities we need to send our messages to. This set will change at session
/// boundaries. It will always be at least the parachain validators of the session where the
/// dispute happened and the authorities of the current sessions as determined by active heads.
deliveries: HashMap<AuthorityDiscoveryId, DeliveryStatus>,
/// Whether or not we have any tasks failed since the last refresh.
has_failed_sends: bool,
/// Sender to be cloned for tasks.
tx: mpsc::Sender<TaskFinish>,
}
/// Status of a particular vote/statement delivery to a particular validator.
enum DeliveryStatus {
/// Request is still in flight.
Pending(RemoteHandle<()>),
/// Succeeded - no need to send request to this peer anymore.
Succeeded,
}
/// A sending task finishes with this result:
#[derive(Debug)]
pub struct TaskFinish {
/// The candidate this task was running for.
pub candidate_hash: CandidateHash,
/// The authority the request was sent to.
pub receiver: AuthorityDiscoveryId,
/// The result of the delivery attempt.
pub result: TaskResult,
}
#[derive(Debug)]
pub enum TaskResult {
/// Task succeeded in getting the request to its peer.
Succeeded,
/// Task was not able to get the request out to its peer.
///
/// It should be retried in that case.
Failed,
}
impl TaskResult {
pub fn as_metrics_label(&self) -> &'static str {
match self {
Self::Succeeded => SUCCEEDED,
Self::Failed => FAILED,
}
}
}
impl SendTask
{
/// Initiates sending a dispute message to peers.
pub async fn new<Context: SubsystemContext>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
active_sessions: &HashMap<SessionIndex,Hash>,
tx: mpsc::Sender<TaskFinish>,
request: DisputeRequest,
) -> Result<Self> {
let mut send_task = Self {
request,
deliveries: HashMap::new(),
has_failed_sends: false,
tx,
};
send_task.refresh_sends(
ctx,
runtime,
active_sessions,
).await?;
Ok(send_task)
}
/// Make sure we are sending to all relevant authorities.
///
/// This function is called at construction and should also be called whenever a session change
/// happens and on a regular basis to ensure we are retrying failed attempts.
pub async fn refresh_sends<Context: SubsystemContext>(
&mut self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
active_sessions: &HashMap<SessionIndex, Hash>,
) -> Result<()> {
let new_authorities = self.get_relevant_validators(ctx, runtime, active_sessions).await?;
let add_authorities = new_authorities
.iter()
.filter(|a| !self.deliveries.contains_key(a))
.map(Clone::clone)
.collect();
// Get rid of dead/irrelevant tasks/statuses:
self.deliveries.retain(|k, _| new_authorities.contains(k));
// Start any new tasks that are needed:
let new_statuses = send_requests(
ctx,
self.tx.clone(),
add_authorities,
self.request.clone(),
).await?;
self.deliveries.extend(new_statuses.into_iter());
self.has_failed_sends = false;
Ok(())
}
/// Whether or not any sends have failed since the last refreshed.
pub fn has_failed_sends(&self) -> bool {
self.has_failed_sends
}
/// Handle a finished response waiting task.
pub fn on_finished_send(&mut self, authority: &AuthorityDiscoveryId, result: TaskResult) {
match result {
TaskResult::Failed => {
tracing::warn!(
target: LOG_TARGET,
candidate = ?self.request.0.candidate_receipt.hash(),
?authority,
"Could not get our message out! If this keeps happening, then check chain whether the dispute made it there."
);
self.has_failed_sends = true;
// Remove state, so we know what to try again:
self.deliveries.remove(authority);
}
TaskResult::Succeeded => {
let status = match self.deliveries.get_mut(&authority) {
None => {
// Can happen when a sending became irrelevant while the response was already
// queued.
tracing::debug!(
target: LOG_TARGET,
candidate = ?self.request.0.candidate_receipt.hash(),
?authority,
?result,
"Received `FromSendingTask::Finished` for non existing task."
);
return
}
Some(status) => status,
};
// We are done here:
*status = DeliveryStatus::Succeeded;
}
}
}
/// Determine all validators that should receive the given dispute requests.
///
/// This is all parachain validators of the session the candidate occurred and all authorities
/// of all currently active sessions, determined by currently active heads.
async fn get_relevant_validators<Context: SubsystemContext>(
&self,
ctx: &mut Context,
runtime: &mut RuntimeInfo,
active_sessions: &HashMap<SessionIndex, Hash>,
) -> Result<HashSet<AuthorityDiscoveryId>> {
let ref_head = self.request.0.candidate_receipt.descriptor.relay_parent;
// Parachain validators:
let info = runtime
.get_session_info_by_index(ctx.sender(), ref_head, self.request.0.session_index)
.await?;
let session_info = &info.session_info;
let validator_count = session_info.validators.len();
let mut authorities: HashSet<_> = session_info
.discovery_keys
.iter()
.take(validator_count)
.enumerate()
.filter(|(i, _)| Some(ValidatorIndex(*i as _)) != info.validator_info.our_index)
.map(|(_, v)| v.clone())
.collect();
// Current authorities:
for (session_index, head) in active_sessions.iter() {
let info = runtime.get_session_info_by_index(ctx.sender(), *head, *session_index).await?;
let session_info = &info.session_info;
let new_set = session_info
.discovery_keys
.iter()
.enumerate()
.filter(|(i, _)| Some(ValidatorIndex(*i as _)) != info.validator_info.our_index)
.map(|(_, v)| v.clone());
authorities.extend(new_set);
}
Ok(authorities)
}
}
/// Start sending of the given msg to all given authorities.
///
/// And spawn tasks for handling the response.
async fn send_requests<Context: SubsystemContext>(
ctx: &mut Context,
tx: mpsc::Sender<TaskFinish>,
receivers: Vec<AuthorityDiscoveryId>,
req: DisputeRequest,
) -> Result<HashMap<AuthorityDiscoveryId, DeliveryStatus>> {
let mut statuses = HashMap::with_capacity(receivers.len());
let mut reqs = Vec::with_capacity(receivers.len());
for receiver in receivers {
let (outgoing, pending_response) = OutgoingRequest::new(
Recipient::Authority(receiver.clone()),
req.clone(),
);
reqs.push(Requests::DisputeSending(outgoing));
let fut = wait_response_task(
pending_response,
req.0.candidate_receipt.hash(),
receiver.clone(),
tx.clone(),
);
let (remote, remote_handle) = fut.remote_handle();
ctx.spawn("dispute-sender", remote.boxed())
.map_err(Fatal::SpawnTask)?;
statuses.insert(receiver, DeliveryStatus::Pending(remote_handle));
}
let msg = NetworkBridgeMessage::SendRequests(
reqs,
// We should be connected, but the hell - if not, try!
IfDisconnected::TryConnect,
);
ctx.send_message(AllMessages::NetworkBridge(msg)).await;
Ok(statuses)
}
/// Future to be spawned in a task for awaiting a response.
async fn wait_response_task(
pending_response: impl Future<Output = OutgoingResult<DisputeResponse>>,
candidate_hash: CandidateHash,
receiver: AuthorityDiscoveryId,
mut tx: mpsc::Sender<TaskFinish>,
) {
let result = pending_response.await;
let msg = match result {
Err(err) => {
tracing::warn!(
target: LOG_TARGET,
%candidate_hash,
%receiver,
%err,
"Error sending dispute statements to node."
);
TaskFinish { candidate_hash, receiver, result: TaskResult::Failed}
}
Ok(DisputeResponse::Confirmed) => {
tracing::trace!(
target: LOG_TARGET,
%candidate_hash,
%receiver,
"Sending dispute message succeeded"
);
TaskFinish { candidate_hash, receiver, result: TaskResult::Succeeded }
}
};
if let Err(err) = tx.feed(msg).await {
tracing::debug!(
target: LOG_TARGET,
%err,
"Failed to notify susystem about dispute sending result."
);
}
}
@@ -0,0 +1,195 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//
//! Mock data and utility functions for unit tests in this subsystem.
use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use lazy_static::lazy_static;
use polkadot_node_network_protocol::{PeerId, authority_discovery::AuthorityDiscovery};
use sc_keystore::LocalKeystore;
use sp_application_crypto::AppKey;
use sp_keyring::{Sr25519Keyring};
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use polkadot_node_primitives::{DisputeMessage, SignedDisputeStatement};
use polkadot_primitives::v1::{
CandidateDescriptor, CandidateHash, CandidateReceipt, Hash,
SessionIndex, SessionInfo, ValidatorId, ValidatorIndex, AuthorityDiscoveryId,
};
pub const MOCK_SESSION_INDEX: SessionIndex = 1;
pub const MOCK_NEXT_SESSION_INDEX: SessionIndex = 2;
pub const MOCK_VALIDATORS: [Sr25519Keyring; 6] = [
Sr25519Keyring::Ferdie,
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
Sr25519Keyring::Dave,
Sr25519Keyring::Eve,
];
pub const MOCK_AUTHORITIES_NEXT_SESSION: [Sr25519Keyring;2] = [
Sr25519Keyring::One,
Sr25519Keyring::Two,
];
pub const FERDIE_INDEX: ValidatorIndex = ValidatorIndex(0);
pub const ALICE_INDEX: ValidatorIndex = ValidatorIndex(1);
lazy_static! {
/// Mocked AuthorityDiscovery service.
pub static ref MOCK_AUTHORITY_DISCOVERY: MockAuthorityDiscovery = MockAuthorityDiscovery::new();
// Creating an innocent looking `SessionInfo` is really expensive in a debug build. Around
// 700ms on my machine, We therefore cache those keys here:
pub static ref MOCK_VALIDATORS_DISCOVERY_KEYS: HashMap<Sr25519Keyring, AuthorityDiscoveryId> =
MOCK_VALIDATORS
.iter()
.chain(MOCK_AUTHORITIES_NEXT_SESSION.iter())
.map(|v| (v.clone(), v.public().into()))
.collect()
;
pub static ref FERDIE_DISCOVERY_KEY: AuthorityDiscoveryId =
MOCK_VALIDATORS_DISCOVERY_KEYS.get(&Sr25519Keyring::Ferdie).unwrap().clone();
pub static ref MOCK_SESSION_INFO: SessionInfo =
SessionInfo {
validators: MOCK_VALIDATORS.iter().take(4).map(|k| k.public().into()).collect(),
discovery_keys: MOCK_VALIDATORS
.iter()
.map(|k| MOCK_VALIDATORS_DISCOVERY_KEYS.get(&k).unwrap().clone())
.collect(),
..Default::default()
};
/// SessionInfo for the second session. (No more validators, but two more authorities.
pub static ref MOCK_NEXT_SESSION_INFO: SessionInfo =
SessionInfo {
discovery_keys:
MOCK_AUTHORITIES_NEXT_SESSION
.iter()
.map(|k| MOCK_VALIDATORS_DISCOVERY_KEYS.get(&k).unwrap().clone())
.collect(),
..Default::default()
};
}
pub fn make_candidate_receipt(relay_parent: Hash) -> CandidateReceipt {
CandidateReceipt {
descriptor: CandidateDescriptor {
relay_parent,
..Default::default()
},
commitments_hash: Hash::random(),
}
}
pub async fn make_explicit_signed(
validator: Sr25519Keyring,
candidate_hash: CandidateHash,
valid: bool
) -> SignedDisputeStatement {
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
SyncCryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&validator.to_seed()),
)
.expect("Insert key into keystore");
SignedDisputeStatement::sign_explicit(
&keystore,
valid,
candidate_hash,
MOCK_SESSION_INDEX,
validator.public().into(),
)
.await
.expect("Keystore should be fine.")
.expect("Signing should work.")
}
pub async fn make_dispute_message(
candidate: CandidateReceipt,
valid_validator: ValidatorIndex,
invalid_validator: ValidatorIndex,
) -> DisputeMessage {
let candidate_hash = candidate.hash();
let valid_vote =
make_explicit_signed(MOCK_VALIDATORS[valid_validator.0 as usize], candidate_hash, true).await;
let invalid_vote =
make_explicit_signed(MOCK_VALIDATORS[invalid_validator.0 as usize], candidate_hash, false).await;
DisputeMessage::from_signed_statements(
valid_vote,
valid_validator,
invalid_vote,
invalid_validator,
candidate,
&MOCK_SESSION_INFO,
)
.expect("DisputeMessage construction should work.")
}
/// Dummy `AuthorityDiscovery` service.
#[derive(Debug, Clone)]
pub struct MockAuthorityDiscovery {
peer_ids: HashMap<Sr25519Keyring, PeerId>
}
impl MockAuthorityDiscovery {
pub fn new() -> Self {
let mut peer_ids = HashMap::new();
peer_ids.insert(Sr25519Keyring::Alice, PeerId::random());
peer_ids.insert(Sr25519Keyring::Bob, PeerId::random());
peer_ids.insert(Sr25519Keyring::Ferdie, PeerId::random());
peer_ids.insert(Sr25519Keyring::Charlie, PeerId::random());
peer_ids.insert(Sr25519Keyring::Dave, PeerId::random());
peer_ids.insert(Sr25519Keyring::Eve, PeerId::random());
peer_ids.insert(Sr25519Keyring::One, PeerId::random());
peer_ids.insert(Sr25519Keyring::Two, PeerId::random());
Self { peer_ids }
}
pub fn get_peer_id_by_authority(&self, authority: Sr25519Keyring) -> PeerId {
*self.peer_ids.get(&authority).expect("Tester only picks valid authorities")
}
}
#[async_trait]
impl AuthorityDiscovery for MockAuthorityDiscovery {
async fn get_addresses_by_authority_id(&mut self, _authority: polkadot_primitives::v1::AuthorityDiscoveryId)
-> Option<Vec<sc_network::Multiaddr>> {
panic!("Not implemented");
}
async fn get_authority_id_by_peer_id(&mut self, peer_id: polkadot_node_network_protocol::PeerId)
-> Option<polkadot_primitives::v1::AuthorityDiscoveryId> {
for (a, p) in self.peer_ids.iter() {
if p == &peer_id {
return Some(MOCK_VALIDATORS_DISCOVERY_KEYS.get(&a).unwrap().clone())
}
}
None
}
}
@@ -0,0 +1,765 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//
//! Subsystem unit tests
use std::collections::HashSet;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use assert_matches::assert_matches;
use futures::{
channel::{oneshot, mpsc},
future::poll_fn,
pin_mut,
SinkExt, Future
};
use futures_timer::Delay;
use parity_scale_codec::{Encode, Decode};
use polkadot_node_network_protocol::PeerId;
use polkadot_node_network_protocol::request_response::v1::DisputeRequest;
use sp_keyring::Sr25519Keyring;
use polkadot_node_network_protocol::{IfDisconnected, request_response::{Recipient, Requests, v1::DisputeResponse}};
use polkadot_node_primitives::{CandidateVotes, UncheckedDisputeMessage};
use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, SessionInfo};
use polkadot_subsystem::messages::{DisputeCoordinatorMessage, ImportStatementsResult};
use polkadot_subsystem::{
ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, LeafStatus, OverseerSignal, Span,
messages::{
AllMessages, DisputeDistributionMessage, NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest
},
};
use polkadot_subsystem_testhelpers::{TestSubsystemContextHandle, mock::make_ferdie_keystore, subsystem_test_harness};
use crate::{DisputeDistributionSubsystem, LOG_TARGET, Metrics};
use self::mock::{
ALICE_INDEX, FERDIE_INDEX, make_candidate_receipt, make_dispute_message,
MOCK_AUTHORITY_DISCOVERY, MOCK_SESSION_INDEX, MOCK_SESSION_INFO, MOCK_NEXT_SESSION_INDEX,
MOCK_NEXT_SESSION_INFO, FERDIE_DISCOVERY_KEY,
};
/// Useful mock providers.
pub mod mock;
#[test]
fn send_dispute_sends_dispute() {
let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>|
async move {
let (_, _) = handle_subsystem_startup(&mut handle, None).await;
let relay_parent = Hash::random();
let candidate = make_candidate_receipt(relay_parent);
let message =
make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX,).await;
handle.send(
FromOverseer::Communication {
msg: DisputeDistributionMessage::SendDispute(message.clone())
}
).await;
// Requests needed session info:
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::SessionInfo(session_index, tx)
)
) => {
assert_eq!(session_index, MOCK_SESSION_INDEX);
assert_eq!(
hash,
message.candidate_receipt().descriptor.relay_parent
);
tx.send(Ok(Some(MOCK_SESSION_INFO.clone()))).expect("Receiver should stay alive.");
}
);
let expected_receivers = {
let info = &MOCK_SESSION_INFO;
info.discovery_keys
.clone()
.into_iter()
.filter(|a| a != &Sr25519Keyring::Ferdie.public().into())
.collect()
// All validators are also authorities in the first session, so we are
// done here.
};
check_sent_requests(&mut handle, expected_receivers, true).await;
conclude(&mut handle).await;
};
test_harness(test);
}
#[test]
fn received_request_triggers_import() {
let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>|
async move {
let (_, mut req_tx) = handle_subsystem_startup(&mut handle, None).await;
let relay_parent = Hash::random();
let candidate = make_candidate_receipt(relay_parent);
let message =
make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX,).await;
// Non validator request should get dropped:
let rx_response = send_network_dispute_request(
&mut req_tx,
PeerId::random(),
message.clone().into()
).await;
assert_matches!(
rx_response.await,
Ok(resp) => {
let sc_network::config::OutgoingResponse {
result: _,
reputation_changes,
sent_feedback: _,
} = resp;
// Peer should get punished:
assert_eq!(reputation_changes.len(), 1);
}
);
// Nested valid and invalid import.
//
// Nested requests from same peer should get dropped. For the invalid request even
// subsequent requests should get dropped.
nested_network_dispute_request(
&mut handle,
&mut req_tx,
MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Alice),
message.clone().into(),
ImportStatementsResult::InvalidImport,
true,
move |handle, req_tx, message|
nested_network_dispute_request(
handle,
req_tx,
MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob),
message.clone().into(),
ImportStatementsResult::ValidImport,
false,
move |_, req_tx, message| async move {
// Another request from Alice should get dropped (request already in
// flight):
{
let rx_response = send_network_dispute_request(
req_tx,
MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Alice),
message.clone(),
).await;
assert_matches!(
rx_response.await,
Err(err) => {
tracing::trace!(
target: LOG_TARGET,
?err,
"Request got dropped - other request already in flight"
);
}
);
}
// Another request from Bob should get dropped (request already in
// flight):
{
let rx_response = send_network_dispute_request(
req_tx,
MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob),
message.clone(),
).await;
assert_matches!(
rx_response.await,
Err(err) => {
tracing::trace!(
target: LOG_TARGET,
?err,
"Request got dropped - other request already in flight"
);
}
);
}
}
)
).await;
// Subsequent sends from Alice should fail (peer is banned):
{
let rx_response = send_network_dispute_request(
&mut req_tx,
MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Alice),
message.clone().into()
).await;
assert_matches!(
rx_response.await,
Err(err) => {
tracing::trace!(
target: LOG_TARGET,
?err,
"Request got dropped - peer is banned."
);
}
);
}
// But should work fine for Bob:
nested_network_dispute_request(
&mut handle,
&mut req_tx,
MOCK_AUTHORITY_DISCOVERY.get_peer_id_by_authority(Sr25519Keyring::Bob),
message.clone().into(),
ImportStatementsResult::ValidImport,
false,
|_, _, _| async {}
).await;
tracing::trace!(target: LOG_TARGET, "Concluding.");
conclude(&mut handle).await;
};
test_harness(test);
}
#[test]
fn disputes_are_recovered_at_startup() {
let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>|
async move {
let relay_parent = Hash::random();
let candidate = make_candidate_receipt(relay_parent);
let (_, _) = handle_subsystem_startup(&mut handle, Some(candidate.hash())).await;
let message =
make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX,).await;
// Requests needed session info:
assert_matches!(
handle.recv().await,
AllMessages::DisputeCoordinator(
DisputeCoordinatorMessage::QueryCandidateVotes(
session_index,
candidate_hash,
tx,
)
) => {
assert_eq!(session_index, MOCK_SESSION_INDEX);
assert_eq!(candidate_hash, candidate.hash());
let unchecked: UncheckedDisputeMessage = message.into();
tx.send(Some(CandidateVotes {
candidate_receipt: candidate,
valid: vec![(
unchecked.valid_vote.kind,
unchecked.valid_vote.validator_index,
unchecked.valid_vote.signature
)],
invalid: vec![(
unchecked.invalid_vote.kind,
unchecked.invalid_vote.validator_index,
unchecked.invalid_vote.signature
)],
}))
.expect("Receiver should stay alive.");
}
);
let expected_receivers = {
let info = &MOCK_SESSION_INFO;
info.discovery_keys
.clone()
.into_iter()
.filter(|a| a != &Sr25519Keyring::Ferdie.public().into())
.collect()
// All validators are also authorities in the first session, so we are
// done here.
};
check_sent_requests(&mut handle, expected_receivers, true).await;
conclude(&mut handle).await;
};
test_harness(test);
}
#[test]
fn send_dispute_gets_cleaned_up() {
let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>|
async move {
let (old_head, _) = handle_subsystem_startup(&mut handle, None).await;
let relay_parent = Hash::random();
let candidate = make_candidate_receipt(relay_parent);
let message =
make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX,).await;
handle.send(
FromOverseer::Communication {
msg: DisputeDistributionMessage::SendDispute(message.clone())
}
).await;
// Requests needed session info:
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::SessionInfo(session_index, tx)
)
) => {
assert_eq!(session_index, MOCK_SESSION_INDEX);
assert_eq!(
hash,
message.candidate_receipt().descriptor.relay_parent
);
tx.send(Ok(Some(MOCK_SESSION_INFO.clone()))).expect("Receiver should stay alive.");
}
);
let expected_receivers = {
let info = &MOCK_SESSION_INFO;
info.discovery_keys
.clone()
.into_iter()
.filter(|a| a != &Sr25519Keyring::Ferdie.public().into())
.collect()
// All validators are also authorities in the first session, so we are
// done here.
};
check_sent_requests(&mut handle, expected_receivers, false).await;
// Give tasks a chance to finish:
Delay::new(Duration::from_millis(20)).await;
activate_leaf(
&mut handle,
Hash::random(),
Some(old_head),
MOCK_SESSION_INDEX,
None,
// No disputes any more:
Vec::new(),
).await;
// Yield, so subsystem can make progess:
Delay::new(Duration::from_millis(2)).await;
conclude(&mut handle).await;
};
test_harness(test);
}
#[test]
fn dispute_retries_and_works_across_session_boundaries() {
let test = |mut handle: TestSubsystemContextHandle<DisputeDistributionMessage>|
async move {
let (old_head, _) = handle_subsystem_startup(&mut handle, None).await;
let relay_parent = Hash::random();
let candidate = make_candidate_receipt(relay_parent);
let message =
make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX,).await;
handle.send(
FromOverseer::Communication {
msg: DisputeDistributionMessage::SendDispute(message.clone())
}
).await;
// Requests needed session info:
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::SessionInfo(session_index, tx)
)
) => {
assert_eq!(session_index, MOCK_SESSION_INDEX);
assert_eq!(
hash,
message.candidate_receipt().descriptor.relay_parent
);
tx.send(Ok(Some(MOCK_SESSION_INFO.clone()))).expect("Receiver should stay alive.");
}
);
let expected_receivers: HashSet<_> = {
let info = &MOCK_SESSION_INFO;
info.discovery_keys
.clone()
.into_iter()
.filter(|a| a != &Sr25519Keyring::Ferdie.public().into())
.collect()
// All validators are also authorities in the first session, so we are
// done here.
};
// Requests don't get confirmed - dispute is carried over to next session.
check_sent_requests(&mut handle, expected_receivers.clone(), false).await;
// Give tasks a chance to finish:
Delay::new(Duration::from_millis(20)).await;
// Trigger retry:
let old_head2 = Hash::random();
activate_leaf(
&mut handle,
old_head2,
Some(old_head),
MOCK_SESSION_INDEX,
None,
vec![(MOCK_SESSION_INDEX, candidate.hash())]
).await;
check_sent_requests(&mut handle, expected_receivers.clone(), false).await;
// Give tasks a chance to finish:
Delay::new(Duration::from_millis(20)).await;
// Session change:
activate_leaf(
&mut handle,
Hash::random(),
Some(old_head2),
MOCK_NEXT_SESSION_INDEX,
Some(MOCK_NEXT_SESSION_INFO.clone()),
vec![(MOCK_SESSION_INDEX, candidate.hash())]
).await;
let expected_receivers = {
let validator_count = MOCK_SESSION_INFO.validators.len();
let old_validators = MOCK_SESSION_INFO
.discovery_keys
.clone()
.into_iter()
.take(validator_count)
.filter(|a| *a != *FERDIE_DISCOVERY_KEY);
MOCK_NEXT_SESSION_INFO
.discovery_keys
.clone()
.into_iter()
.filter(|a| *a != *FERDIE_DISCOVERY_KEY)
.chain(old_validators)
.collect()
};
check_sent_requests(&mut handle, expected_receivers, true).await;
conclude(&mut handle).await;
};
test_harness(test);
}
async fn send_network_dispute_request(
req_tx: &mut mpsc::Sender<sc_network::config::IncomingRequest>,
peer: PeerId,
message: DisputeRequest,
) -> oneshot::Receiver<sc_network::config::OutgoingResponse> {
let (pending_response, rx_response) = oneshot::channel();
let req = sc_network::config::IncomingRequest {
peer,
payload: message.encode(),
pending_response,
};
req_tx.feed(req).await.unwrap();
rx_response
}
/// Send request and handle its reactions.
///
/// Passed in function will be called while votes are still being imported.
async fn nested_network_dispute_request<'a, F, O>(
handle: &'a mut TestSubsystemContextHandle<DisputeDistributionMessage>,
req_tx: &'a mut mpsc::Sender<sc_network::config::IncomingRequest>,
peer: PeerId,
message: DisputeRequest,
import_result: ImportStatementsResult,
need_session_info: bool,
inner: F,
)
where
F: FnOnce(
&'a mut TestSubsystemContextHandle<DisputeDistributionMessage>,
&'a mut mpsc::Sender<sc_network::config::IncomingRequest>,
DisputeRequest,
) -> O + 'a,
O: Future<Output = ()> + 'a
{
let rx_response = send_network_dispute_request(
req_tx,
peer,
message.clone().into()
).await;
if need_session_info {
// Subsystem might need `SessionInfo` for determining indices:
match handle.recv().await {
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
_,
RuntimeApiRequest::SessionInfo(_, tx)
)) => {
tx.send(Ok(Some(MOCK_SESSION_INFO.clone()))).expect("Receiver should stay alive.");
}
unexpected => panic!("Unexpected message {:?}", unexpected),
}
}
// Import should get initiated:
let pending_confirmation = assert_matches!(
handle.recv().await,
AllMessages::DisputeCoordinator(
DisputeCoordinatorMessage::ImportStatements {
candidate_hash,
candidate_receipt,
session,
statements,
pending_confirmation,
}
) => {
assert_eq!(session, MOCK_SESSION_INDEX);
assert_eq!(candidate_hash, message.0.candidate_receipt.hash());
assert_eq!(candidate_hash, candidate_receipt.hash());
assert_eq!(statements.len(), 2);
pending_confirmation
}
);
// Do the inner thing:
inner(handle, req_tx, message).await;
// Confirm import
pending_confirmation.send(import_result).unwrap();
assert_matches!(
rx_response.await,
Ok(resp) => {
let sc_network::config::OutgoingResponse {
result,
reputation_changes,
sent_feedback,
} = resp;
match import_result {
ImportStatementsResult::ValidImport => {
let result = result.unwrap();
let decoded =
<DisputeResponse as Decode>::decode(&mut result.as_slice()).unwrap();
assert!(decoded == DisputeResponse::Confirmed);
if let Some(sent_feedback) = sent_feedback {
sent_feedback.send(()).unwrap();
}
tracing::trace!(
target: LOG_TARGET,
"Valid import happened."
);
}
ImportStatementsResult::InvalidImport => {
// Peer should get punished:
assert_eq!(reputation_changes.len(), 1);
}
}
}
);
}
async fn conclude(
handle: &mut TestSubsystemContextHandle<DisputeDistributionMessage>,
) {
// No more messages should be in the queue:
poll_fn(|ctx| {
let fut = handle.recv();
pin_mut!(fut);
// No requests should be inititated, as there is no longer any dispute active:
assert_matches!(
fut.poll(ctx),
Poll::Pending,
"No requests expected"
);
Poll::Ready(())
}).await;
handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
}
/// Pass a `new_session` if you expect the subsystem to retrieve `SessionInfo` when given the
/// `session_index`.
async fn activate_leaf(
handle: &mut TestSubsystemContextHandle<DisputeDistributionMessage>,
activate: Hash,
deactivate: Option<Hash>,
session_index: SessionIndex,
// New session if we expect the subsystem to request it.
new_session: Option<SessionInfo>,
// Currently active disputes to send to the subsystem.
active_disputes: Vec<(SessionIndex, CandidateHash)>,
) {
let has_active_disputes = !active_disputes.is_empty();
handle.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate {
activated: [ActivatedLeaf {
hash: activate,
number: 10,
status: LeafStatus::Fresh,
span: Arc::new(Span::Disabled),
}][..]
.into(),
deactivated: deactivate.into_iter().collect(),
}
)))
.await;
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(tx)
)) => {
assert_eq!(h, activate);
tx.send(Ok(session_index)).expect("Receiver should stay alive.");
}
);
assert_matches!(
handle.recv().await,
AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ActiveDisputes(tx)) => {
tx.send(active_disputes).expect("Receiver should stay alive.");
}
);
let new_session = match (new_session, has_active_disputes) {
(Some(new_session), true) => new_session,
_ => return,
};
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(i, tx)
)) => {
assert_eq!(h, activate);
assert_eq!(i, session_index);
tx.send(Ok(Some(new_session))).expect("Receiver should stay alive.");
}
);
}
/// Check whether sent network bridge requests match the expectation.
async fn check_sent_requests(
handle: &mut TestSubsystemContextHandle<DisputeDistributionMessage>,
expected_receivers: HashSet<AuthorityDiscoveryId>,
confirm_receive: bool,
) {
let expected_receivers: HashSet<_> =
expected_receivers
.into_iter()
.map(Recipient::Authority)
.collect();
// Sends to concerned validators:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::TryConnect)
) => {
let reqs: Vec<_> = reqs.into_iter().map(|r|
assert_matches!(
r,
Requests::DisputeSending(req) => {req}
)
)
.collect();
let receivers_raw: Vec<_> = reqs.iter().map(|r| r.peer.clone()).collect();
let receivers: HashSet<_> = receivers_raw.clone().clone().into_iter().collect();
assert_eq!(receivers_raw.len(), receivers.len(), "No duplicates are expected.");
assert_eq!(receivers.len(), expected_receivers.len());
assert_eq!(receivers, expected_receivers);
if confirm_receive {
for req in reqs {
req.pending_response.send(
Ok(DisputeResponse::Confirmed.encode())
)
.expect("Subsystem should be listening for a response.");
}
}
}
);
}
/// Initialize subsystem and return request sender needed for sending incoming requests to the
/// subsystem.
async fn handle_subsystem_startup(
handle: &mut TestSubsystemContextHandle<DisputeDistributionMessage>,
ongoing_dispute: Option<CandidateHash>,
) -> (Hash, mpsc::Sender<sc_network::config::IncomingRequest>) {
let (request_tx, request_rx) = mpsc::channel(5);
handle.send(
FromOverseer::Communication {
msg: DisputeDistributionMessage::DisputeSendingReceiver(request_rx),
}
).await;
let relay_parent = Hash::random();
activate_leaf(
handle,
relay_parent,
None,
MOCK_SESSION_INDEX,
Some(MOCK_SESSION_INFO.clone()),
ongoing_dispute.into_iter().map(|c| (MOCK_SESSION_INDEX, c)).collect()
).await;
(relay_parent, request_tx)
}
/// Launch subsystem and provided test function
///
/// which simulates the overseer.
fn test_harness<TestFn, Fut>(test: TestFn)
where
TestFn: FnOnce(TestSubsystemContextHandle<DisputeDistributionMessage>) -> Fut,
Fut: Future<Output = ()>
{
sp_tracing::try_init_simple();
let keystore = make_ferdie_keystore();
let subsystem = DisputeDistributionSubsystem::new(
keystore,
MOCK_AUTHORITY_DISCOVERY.clone(),
Metrics::new_dummy()
);
let subsystem = |ctx| async {
match subsystem.run(ctx).await {
Ok(()) => {},
Err(fatal) => {
tracing::debug!(
target: LOG_TARGET,
?fatal,
"Dispute distribution exited with fatal error."
);
}
}
};
subsystem_test_harness(test, subsystem);
}
@@ -22,7 +22,6 @@ tracing = "0.1.26"
[dev-dependencies]
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
@@ -23,12 +23,11 @@ use polkadot_node_subsystem::{
};
use polkadot_node_subsystem_test_helpers as test_helpers;
use polkadot_node_subsystem_util::TimeoutExt as _;
use sc_keystore::LocalKeystore;
use sp_keyring::Sr25519Keyring;
use sp_keystore::SyncCryptoStore;
use sp_consensus_babe::{
Epoch as BabeEpoch, BabeEpochConfiguration, AllowedSlots,
};
use test_helpers::mock::make_ferdie_keystore;
use std::sync::Arc;
use std::time::Duration;
@@ -98,17 +97,6 @@ async fn overseer_recv(
msg
}
fn make_ferdie_keystore() -> SyncCryptoStorePtr {
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
SyncCryptoStore::sr25519_generate_new(
&*keystore,
AuthorityDiscoveryId::ID,
Some(&Sr25519Keyring::Ferdie.to_seed()),
)
.expect("Insert key into keystore");
keystore
}
fn authorities() -> Vec<AuthorityDiscoveryId> {
vec![
Sr25519Keyring::Alice.public().into(),
@@ -6,11 +6,13 @@ edition = "2018"
description = "Primitives types for the Node-side"
[dependencies]
async-trait = "0.1.42"
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-jaeger = { path = "../../jaeger" }
parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
strum = { version = "0.20", features = ["derive"] }
futures = "0.3.15"
thiserror = "1.0.23"
@@ -0,0 +1,48 @@
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Authority discovery service interfacing.
use std::fmt::Debug;
use async_trait::async_trait;
use sc_authority_discovery::Service as AuthorityDiscoveryService;
use polkadot_primitives::v1::AuthorityDiscoveryId;
use sc_network::{Multiaddr, PeerId};
/// An abstraction over the authority discovery service.
///
/// Needed for mocking in tests mostly.
#[async_trait]
pub trait AuthorityDiscovery: Send + Debug + 'static {
/// Get the addresses for the given [`AuthorityId`] from the local address cache.
async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>>;
/// Get the [`AuthorityId`] for the given [`PeerId`] from the local address cache.
async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option<AuthorityDiscoveryId>;
}
#[async_trait]
impl AuthorityDiscovery for AuthorityDiscoveryService {
async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>> {
AuthorityDiscoveryService::get_addresses_by_authority_id(self, authority).await
}
async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option<AuthorityDiscoveryId> {
AuthorityDiscoveryService::get_authority_id_by_peer_id(self, peer_id).await
}
}
@@ -38,6 +38,9 @@ pub mod peer_set;
/// Request/response protocols used in Polkadot.
pub mod request_response;
/// Accessing authority discovery service
pub mod authority_discovery;
/// A version of the protocol.
pub type ProtocolVersion = u32;
/// The minimum amount of peers to send gossip messages to.
@@ -66,6 +66,8 @@ pub enum Protocol {
AvailableDataFetching,
/// Fetching of statements that are too large for gossip.
StatementFetching,
/// Sending of dispute statements with application level confirmations.
DisputeSending,
}
@@ -98,7 +100,7 @@ const STATEMENTS_TIMEOUT: Duration = Duration::from_secs(1);
/// We don't want a slow peer to slow down all the others, at the same time we want to get out the
/// data quickly in full to at least some peers (as this will reduce load on us as they then can
/// start serving the data). So this value is a tradeoff. 3 seems to be sensible. So we would need
/// to have 3 slow noded connected, to delay transfer for others by `STATEMENTS_TIMEOUT`.
/// to have 3 slow nodes connected, to delay transfer for others by `STATEMENTS_TIMEOUT`.
pub const MAX_PARALLEL_STATEMENT_REQUESTS: u32 = 3;
impl Protocol {
@@ -106,9 +108,6 @@ impl Protocol {
///
/// Returns a receiver for messages received on this protocol and the requested
/// `ProtocolConfig`.
///
/// See also `dispatcher::RequestDispatcher`, which makes use of this function and provides a more
/// high-level interface.
pub fn get_config(
self,
) -> (
@@ -167,6 +166,17 @@ impl Protocol {
request_timeout: Duration::from_secs(1),
inbound_queue: Some(tx),
},
Protocol::DisputeSending => RequestResponseConfig {
name: p_name,
max_request_size: 1_000,
/// Responses are just confirmation, in essence not even a bit. So 100 seems
/// plenty.
max_response_size: 100,
/// We can have relative large timeouts here, there is no value of hitting a
/// timeout as we want to get statements through to each node in any case.
request_timeout: Duration::from_secs(12),
inbound_queue: Some(tx),
},
};
(rx, cfg)
}
@@ -195,7 +205,7 @@ impl Protocol {
// This is just a guess/estimate, with the following considerations: If we are
// faster than that, queue size will stay low anyway, even if not - requesters will
// get an immediate error, but if we are slower, requesters will run in a timeout -
// waisting precious time.
// wasting precious time.
let available_bandwidth = 7 * MIN_BANDWIDTH_BYTES / 10;
let size = u64::saturating_sub(
STATEMENTS_TIMEOUT.as_millis() as u64 * available_bandwidth / (1000 * MAX_CODE_SIZE as u64),
@@ -207,6 +217,10 @@ impl Protocol {
);
size as usize
}
// Incoming requests can get bursty, we should also be able to handle them fast on
// average, so something in the ballpark of 100 should be fine. Nodes will retry on
// failure, so having a good value here is mostly about performance tuning.
Protocol::DisputeSending => 100,
}
}
@@ -223,6 +237,7 @@ impl Protocol {
Protocol::PoVFetching => "/polkadot/req_pov/1",
Protocol::AvailableDataFetching => "/polkadot/req_available_data/1",
Protocol::StatementFetching => "/polkadot/req_statement/1",
Protocol::DisputeSending => "/polkadot/send_dispute/1",
}
}
}
@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::marker::PhantomData;
use futures::channel::oneshot;
use futures::prelude::Future;
@@ -54,6 +56,8 @@ pub enum Requests {
AvailableDataFetching(OutgoingRequest<v1::AvailableDataFetchingRequest>),
/// Requests for fetching large statements as part of statement distribution.
StatementFetching(OutgoingRequest<v1::StatementFetchingRequest>),
/// Requests for notifying about an ongoing dispute.
DisputeSending(OutgoingRequest<v1::DisputeRequest>),
}
impl Requests {
@@ -65,6 +69,7 @@ impl Requests {
Self::PoVFetching(_) => Protocol::PoVFetching,
Self::AvailableDataFetching(_) => Protocol::AvailableDataFetching,
Self::StatementFetching(_) => Protocol::StatementFetching,
Self::DisputeSending(_) => Protocol::DisputeSending,
}
}
@@ -82,12 +87,13 @@ impl Requests {
Self::PoVFetching(r) => r.encode_request(),
Self::AvailableDataFetching(r) => r.encode_request(),
Self::StatementFetching(r) => r.encode_request(),
Self::DisputeSending(r) => r.encode_request(),
}
}
}
/// Potential recipients of an outgoing request.
#[derive(Debug, Eq, Hash, PartialEq)]
#[derive(Debug, Eq, Hash, PartialEq, Clone)]
pub enum Recipient {
/// Recipient is a regular peer and we know its peer id.
Peer(PeerId),
@@ -131,6 +137,18 @@ pub enum RequestError {
Canceled(#[source] oneshot::Canceled),
}
/// Things that can go wrong when decoding an incoming request.
#[derive(Debug, Error)]
pub enum ReceiveError {
/// Decoding failed, we were able to change the peer's reputation accordingly.
#[error("Decoding request failed for peer {0}.")]
DecodingError(PeerId, #[source] DecodingError),
/// Decoding failed, but sending reputation change failed.
#[error("Decoding request failed for peer {0}, and changing reputation failed.")]
DecodingErrorNoReputationChange(PeerId, #[source] DecodingError),
}
/// Responses received for an `OutgoingRequest`.
pub type OutgoingResult<Res> = Result<Res, RequestError>;
@@ -205,43 +223,22 @@ pub struct IncomingRequest<Req> {
pub peer: PeerId,
/// The sent request.
pub payload: Req,
/// Sender for sending response back.
pub pending_response: OutgoingResponseSender<Req>,
}
/// Sender for sendinb back responses on an `IncomingRequest`.
#[derive(Debug)]
pub struct OutgoingResponseSender<Req>{
pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
phantom: PhantomData<Req>,
}
/// Typed variant of [`netconfig::OutgoingResponse`].
///
/// Responses to `IncomingRequest`s.
pub struct OutgoingResponse<Response> {
/// The payload of the response.
pub result: Result<Response, ()>,
/// Reputation changes accrued while handling the request. To be applied to the reputation of
/// the peer sending the request.
pub reputation_changes: Vec<UnifiedReputationChange>,
/// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
/// peer.
pub sent_feedback: Option<oneshot::Sender<()>>,
}
impl<Req> IncomingRequest<Req>
impl<Req> OutgoingResponseSender<Req>
where
Req: IsRequest,
Req: IsRequest + Decode,
Req::Response: Encode,
{
/// Create new `IncomingRequest`.
pub fn new(
peer: PeerId,
payload: Req,
pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
) -> Self {
Self {
peer,
payload,
pending_response,
}
}
/// Send the response back.
///
/// On success we return Ok(()), on error we return the not sent `Response`.
@@ -284,6 +281,100 @@ where
}
}
/// Typed variant of [`netconfig::OutgoingResponse`].
///
/// Responses to `IncomingRequest`s.
pub struct OutgoingResponse<Response> {
/// The payload of the response.
///
/// `Err(())` if none is available e.g. due an error while handling the request.
pub result: Result<Response, ()>,
/// Reputation changes accrued while handling the request. To be applied to the reputation of
/// the peer sending the request.
pub reputation_changes: Vec<UnifiedReputationChange>,
/// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
/// peer.
pub sent_feedback: Option<oneshot::Sender<()>>,
}
impl<Req> IncomingRequest<Req>
where
Req: IsRequest + Decode,
Req::Response: Encode,
{
/// Create new `IncomingRequest`.
pub fn new(
peer: PeerId,
payload: Req,
pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
) -> Self {
Self {
peer,
payload,
pending_response: OutgoingResponseSender {
pending_response,
phantom: PhantomData {},
},
}
}
/// Try building from raw substrate request.
///
/// This function will fail if the request cannot be decoded and will apply passed in
/// reputation changes in that case.
///
/// Params:
/// - The raw request to decode
/// - Reputation changes to apply for the peer in case decoding fails.
pub fn try_from_raw(
raw: sc_network::config::IncomingRequest,
reputation_changes: Vec<UnifiedReputationChange>
) -> Result<Self, ReceiveError> {
let sc_network::config::IncomingRequest {
payload,
peer,
pending_response,
} = raw;
let payload = match Req::decode(&mut payload.as_ref()) {
Ok(payload) => payload,
Err(err) => {
let reputation_changes = reputation_changes
.into_iter()
.map(|r| r.into_base_rep())
.collect();
let response = sc_network::config::OutgoingResponse {
result: Err(()),
reputation_changes,
sent_feedback: None,
};
if let Err(_) = pending_response.send(response) {
return Err(ReceiveError::DecodingErrorNoReputationChange(peer, err))
}
return Err(ReceiveError::DecodingError(peer, err))
}
};
Ok(Self::new(peer, payload, pending_response))
}
/// Send the response back.
///
/// Calls [`OutgoingResponseSender::send_response`].
pub fn send_response(self, resp: Req::Response) -> Result<(), Req::Response> {
self.pending_response.send_response(resp)
}
/// Send response with additional options.
///
/// Calls [`OutgoingResponseSender::send_outgoing_response`].
pub fn send_outgoing_response(self, resp: OutgoingResponse<<Req as IsRequest>::Response>)
-> Result<(), ()> {
self.pending_response.send_outgoing_response(resp)
}
}
/// Future for actually receiving a typed response for an OutgoingRequest.
async fn receive_response<Req>(
rec: oneshot::Receiver<Result<Vec<u8>, network::RequestFailure>>,
@@ -20,7 +20,7 @@ use parity_scale_codec::{Decode, Encode};
use polkadot_primitives::v1::{CandidateHash, CandidateReceipt, CommittedCandidateReceipt, Hash, ValidatorIndex};
use polkadot_primitives::v1::Id as ParaId;
use polkadot_node_primitives::{AvailableData, PoV, ErasureChunk};
use polkadot_node_primitives::{AvailableData, DisputeMessage, ErasureChunk, PoV, UncheckedDisputeMessage};
use super::request::IsRequest;
use super::Protocol;
@@ -192,3 +192,28 @@ impl IsRequest for StatementFetchingRequest {
type Response = StatementFetchingResponse;
const PROTOCOL: Protocol = Protocol::StatementFetching;
}
/// A dispute request.
///
/// Contains an invalid vote a valid one for a particular candidate in a given session.
#[derive(Clone, Encode, Decode, Debug)]
pub struct DisputeRequest(pub UncheckedDisputeMessage);
impl From<DisputeMessage> for DisputeRequest {
fn from(msg: DisputeMessage) -> Self {
Self(msg.into())
}
}
/// Possible responses to a `DisputeRequest`.
#[derive(Encode, Decode, Debug, PartialEq, Eq)]
pub enum DisputeResponse {
/// Recipient successfully processed the dispute request.
#[codec(index = 0)]
Confirmed
}
impl IsRequest for DisputeRequest {
type Response = DisputeResponse;
const PROTOCOL: Protocol = Protocol::DisputeSending;
}
@@ -60,11 +60,11 @@ impl From<runtime::Error> for Error {
#[derive(Debug, Error)]
pub enum Fatal {
/// Requester channel is never closed.
#[error("Requester receiver stream finished.")]
#[error("Requester receiver stream finished")]
RequesterReceiverFinished,
/// Responder channel is never closed.
#[error("Responder receiver stream finished.")]
#[error("Responder receiver stream finished")]
ResponderReceiverFinished,
/// Spawning a running task failed.
@@ -580,7 +580,7 @@ struct FetchingInfo {
}
/// Messages to be handled in this subsystem.
enum Message {
enum MuxedMessage {
/// Messages from other subsystems.
Subsystem(FatalResult<FromOverseer<StatementDistributionMessage>>),
/// Messages from spawned requester background tasks.
@@ -589,12 +589,12 @@ enum Message {
Responder(Option<ResponderMessage>)
}
impl Message {
impl MuxedMessage {
async fn receive(
ctx: &mut (impl SubsystemContext<Message = StatementDistributionMessage> + overseer::SubsystemContext<Message = StatementDistributionMessage>),
from_requester: &mut mpsc::Receiver<RequesterMessage>,
from_responder: &mut mpsc::Receiver<ResponderMessage>,
) -> Message {
) -> MuxedMessage {
// We are only fusing here to make `select` happy, in reality we will quit if one of those
// streams end:
let from_overseer = ctx.recv().fuse();
@@ -602,9 +602,9 @@ impl Message {
let from_responder = from_responder.next();
futures::pin_mut!(from_overseer, from_requester, from_responder);
futures::select! {
msg = from_overseer => Message::Subsystem(msg.map_err(Fatal::SubsystemReceive)),
msg = from_requester => Message::Requester(msg),
msg = from_responder => Message::Responder(msg),
msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(Fatal::SubsystemReceive)),
msg = from_requester => MuxedMessage::Requester(msg),
msg = from_responder => MuxedMessage::Responder(msg),
}
}
}
@@ -1614,9 +1614,9 @@ impl StatementDistribution {
let (res_sender, mut res_receiver) = mpsc::channel(1);
loop {
let message = Message::receive(&mut ctx, &mut req_receiver, &mut res_receiver).await;
let message = MuxedMessage::receive(&mut ctx, &mut req_receiver, &mut res_receiver).await;
match message {
Message::Subsystem(result) => {
MuxedMessage::Subsystem(result) => {
let result = self.handle_subsystem_message(
&mut ctx,
&mut runtime,
@@ -1637,7 +1637,7 @@ impl StatementDistribution {
tracing::debug!(target: LOG_TARGET, ?error)
}
}
Message::Requester(result) => {
MuxedMessage::Requester(result) => {
let result = self.handle_requester_message(
&mut ctx,
&gossip_peers,
@@ -1649,7 +1649,7 @@ impl StatementDistribution {
.await;
log_error(result.map_err(From::from), "handle_requester_message")?;
}
Message::Responder(result) => {
MuxedMessage::Responder(result) => {
let result = self.handle_responder_message(
&peers,
&mut active_heads,
@@ -1856,8 +1856,8 @@ impl StatementDistribution {
"New active leaf",
);
let session_index = runtime.get_session_index(ctx, relay_parent).await?;
let info = runtime.get_session_info_by_index(ctx, relay_parent, session_index).await?;
let session_index = runtime.get_session_index(ctx.sender(), relay_parent).await?;
let info = runtime.get_session_info_by_index(ctx.sender(), relay_parent, session_index).await?;
let session_info = &info.session_info;
active_heads.entry(relay_parent)
@@ -1899,7 +1899,7 @@ impl StatementDistribution {
}
}
let info = runtime.get_session_info(ctx, relay_parent).await?;
let info = runtime.get_session_info(ctx.sender(), relay_parent).await?;
let session_info = &info.session_info;
let validator_info = &info.validator_info;
@@ -16,8 +16,6 @@
use futures::{SinkExt, StreamExt, channel::{mpsc, oneshot}, stream::FuturesUnordered};
use parity_scale_codec::Decode;
use polkadot_node_network_protocol::{
PeerId, UnifiedReputationChange as Rep,
request_response::{
@@ -85,35 +83,26 @@ pub async fn respond(
Some(v) => v,
};
let sc_network::config::IncomingRequest {
payload,
peer,
pending_response,
} = raw;
let payload = match StatementFetchingRequest::decode(&mut payload.as_ref()) {
let req =
match IncomingRequest::<StatementFetchingRequest>::try_from_raw(
raw,
vec![COST_INVALID_REQUEST],
) {
Err(err) => {
tracing::debug!(
target: LOG_TARGET,
?err,
"Decoding request failed"
);
report_peer(pending_response, COST_INVALID_REQUEST);
continue
}
Ok(payload) => payload,
};
let req = IncomingRequest::new(
peer,
payload,
pending_response
);
let (tx, rx) = oneshot::channel();
if let Err(err) = sender.feed(
ResponderMessage::GetData {
requesting_peer: peer,
requesting_peer: req.peer,
relay_parent: req.payload.relay_parent,
candidate_hash: req.payload.candidate_hash,
tx,
@@ -152,20 +141,3 @@ pub async fn respond(
}
}
}
/// Report peer who sent us a request.
fn report_peer(
tx: oneshot::Sender<sc_network::config::OutgoingResponse>,
rep: Rep,
) {
if let Err(_) = tx.send(sc_network::config::OutgoingResponse {
result: Err(()),
reputation_changes: vec![rep.into_base_rep()],
sent_feedback: None,
}) {
tracing::debug!(
target: LOG_TARGET,
"Reporting peer failed."
);
}
}
@@ -18,6 +18,7 @@ use std::time::Duration;
use std::sync::Arc;
use std::iter::FromIterator as _;
use parity_scale_codec::{Decode, Encode};
use polkadot_node_subsystem_test_helpers::mock::make_ferdie_keystore;
use super::*;
use sp_keyring::Sr25519Keyring;
use sp_application_crypto::{AppKey, sr25519::Pair, Pair as TraitPair};
@@ -1704,14 +1705,3 @@ fn make_session_info(validators: Vec<Pair>, groups: Vec<Vec<u32>>) -> SessionInf
needed_approvals: 0,
}
}
pub fn make_ferdie_keystore() -> SyncCryptoStorePtr {
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
SyncCryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Ferdie.to_seed()),
)
.expect("Insert key into keystore");
keystore
}