From 151d73af5bb6d7e01469c143ac0e394acb6e9afd Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 8 Jul 2020 18:15:39 -0400 Subject: [PATCH] Implement PoV Distribution Subsystem (#1344) * introduce candidatedescriptor type * add PoVDistribution message type * loosen bound on PoV Distribution to account for equivocations * re-export some types from the messages module * begin PoV Distribution subsystem * remove redundant index from PoV distribution * define state machine for pov distribution * handle overseer signals * set up control flow * remove `ValidatorStatement` section * implement PoV fetching * implement distribution logic * add missing ` * implement some network bridge event handlers * stub for message processing, handle our view change * control flow for handling messages * handle `awaiting` message * handle any incoming PoVs and redistribute * actually provide a subsystem implementation * remove set-builder notation * begin testing PoV distribution * test that we send awaiting messages only to peers with same view * ensure we distribute awaited PoVs to peers on view changes * test that peers can complete fetch and are rewarded * test some reporting logic * ensure peer is reported for flooding * test punishing peers diverging from awaited protocol * test that we eagerly complete peers' awaited PoVs based on what we receive * test that we prune the awaited set after receiving * expand pov-distribution in guide to match a change I made * remove unneeded import --- polkadot/Cargo.lock | 19 + polkadot/Cargo.toml | 1 + polkadot/node/network/README.md | 2 +- .../node/network/pov-distribution/Cargo.toml | 22 + .../node/network/pov-distribution/src/lib.rs | 1457 +++++++++++++++++ polkadot/node/subsystem/src/messages.rs | 20 + polkadot/primitives/src/parachain.rs | 31 + .../src/node/backing/pov-distribution.md | 22 +- .../src/types/overseer-protocol.md | 7 +- 9 files changed, 1565 insertions(+), 16 deletions(-) create mode 100644 
polkadot/node/network/pov-distribution/Cargo.toml create mode 100644 polkadot/node/network/pov-distribution/src/lib.rs diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index e10ee33e1c..9b53e65c3f 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4540,6 +4540,25 @@ dependencies = [ "sp-wasm-interface", ] +[[package]] +name = "polkadot-pov-distribution" +version = "0.1.0" +dependencies = [ + "assert_matches", + "futures 0.3.5", + "futures-timer 3.0.2", + "log 0.4.8", + "parity-scale-codec", + "parking_lot 0.10.2", + "polkadot-node-primitives", + "polkadot-node-subsystem", + "polkadot-primitives", + "polkadot-subsystem-test-helpers", + "sc-network", + "sp-runtime", + "streamunordered", +] + [[package]] name = "polkadot-primitives" version = "0.8.14" diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml index 622355ecb3..fa5fa16279 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -45,6 +45,7 @@ members = [ "node/core/proposer", "node/network/bridge", + "node/network/pov-distribution", "node/network/statement-distribution", "node/overseer", "node/primitives", diff --git a/polkadot/node/network/README.md b/polkadot/node/network/README.md index 64f0f11af5..e035485b85 100644 --- a/polkadot/node/network/README.md +++ b/polkadot/node/network/README.md @@ -1 +1 @@ -Stub - This folder will hold networking subsystem implementations, each with their own crate. +This folder holds all networking subsystem implementations, each with their own crate. 
diff --git a/polkadot/node/network/pov-distribution/Cargo.toml b/polkadot/node/network/pov-distribution/Cargo.toml new file mode 100644 index 0000000000..a99e5e3d56 --- /dev/null +++ b/polkadot/node/network/pov-distribution/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "polkadot-pov-distribution" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" + +[dependencies] +futures = "0.3.5" +log = "0.4.8" +futures-timer = "3.0.2" +streamunordered = "0.5.1" +polkadot-primitives = { path = "../../../primitives" } +node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" } +parity-scale-codec = "1.3.0" +sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } + +[dev-dependencies] +parking_lot = "0.10.0" +subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" } +assert_matches = "1.3.0" diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs new file mode 100644 index 0000000000..589e33fde0 --- /dev/null +++ b/polkadot/node/network/pov-distribution/src/lib.rs @@ -0,0 +1,1457 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! PoV Distribution Subsystem of Polkadot.
+//!
+//! This is a gossip implementation of code that is responsible for distributing PoVs
+//! among validators.
+
+use polkadot_primitives::Hash;
+use polkadot_primitives::parachain::{PoVBlock as PoV, CandidateDescriptor};
+use polkadot_subsystem::{
+    OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, FromOverseer, SpawnedSubsystem,
+};
+use polkadot_subsystem::messages::{
+    PoVDistributionMessage, NetworkBridgeEvent, ReputationChange as Rep, PeerId,
+    RuntimeApiMessage, RuntimeApiRequest, AllMessages, NetworkBridgeMessage,
+};
+use node_primitives::{View, ProtocolId};
+
+use futures::prelude::*;
+use futures::channel::oneshot;
+use parity_scale_codec::{Encode, Decode};
+
+use std::collections::{hash_map::{Entry, HashMap}, HashSet};
+use std::sync::Arc;
+
+// Reputation penalties applied to misbehaving peers.
+const COST_APPARENT_FLOOD: Rep = Rep::new(-500, "Peer appears to be flooding us with PoV requests");
+const COST_UNEXPECTED_POV: Rep = Rep::new(-500, "Peer sent us an unexpected PoV");
+const COST_MALFORMED_MESSAGE: Rep = Rep::new(-500, "Peer sent us a malformed message");
+const COST_AWAITED_NOT_IN_VIEW: Rep
+    = Rep::new(-100, "Peer claims to be awaiting something outside of its view");
+
+// Reputation rewards for peers that supply a PoV we asked for.
+const BENEFIT_FRESH_POV: Rep = Rep::new(25, "Peer supplied us with an awaited PoV");
+const BENEFIT_LATE_POV: Rep = Rep::new(10, "Peer supplied us with an awaited PoV, \
+    but was not the first to do so");
+
+// Identifier under which this subsystem registers with the network bridge.
+const PROTOCOL_V1: ProtocolId = *b"pvd1";
+
+/// Messages exchanged between peers over the PoV distribution protocol.
+#[derive(Encode, Decode)]
+enum WireMessage {
+    /// Notification that we are awaiting the given PoVs (by hash) against a
+    /// specific relay-parent hash.
+    /// (relay_parent, pov_hashes)
+    #[codec(index = "0")]
+    Awaiting(Hash, Vec<Hash>),
+    /// Notification of an awaited PoV, in a given relay-parent context.
+    /// (relay_parent, pov_hash, pov)
+    #[codec(index = "1")]
+    SendPoV(Hash, Hash, PoV),
+}
+
+/// The PoV Distribution Subsystem.
+pub struct PoVDistribution;
+
+// NOTE(review): the type parameters on this impl were missing from the text and have been
+// reconstructed (`C` is used by `start` but was never declared); the `Message` binding is
+// inferred from `run` matching on `PoVDistributionMessage` -- confirm against the trait.
+impl<C> Subsystem<C> for PoVDistribution
+    where C: SubsystemContext<Message = PoVDistributionMessage>
+{
+    fn start(self, ctx: C) -> SpawnedSubsystem {
+        // Swallow error because failure is fatal to the node and we log with more precision
+        // within `run`.
+        SpawnedSubsystem(run(ctx).map(|_| ()).boxed())
+    }
+}
+
+/// All mutable state of the subsystem.
+struct State {
+    /// Per-relay-parent bookkeeping; entries are added on `StartWork` and dropped on `StopWork`.
+    relay_parent_state: HashMap<Hash, BlockBasedState>,
+    /// Bookkeeping for every currently connected peer.
+    peer_state: HashMap<PeerId, PeerState>,
+    /// Our own view, as last reported by the network bridge.
+    our_view: View,
+}
+
+/// State kept for a single relay-parent.
+struct BlockBasedState {
+    /// PoVs we hold at this relay-parent, keyed by PoV hash.
+    known: HashMap<Hash, Arc<PoV>>,
+    /// All the PoVs we are or were fetching, coupled with channels expecting the data.
+    ///
+    /// This may be an empty list, which indicates that we were once awaiting this PoV but have
+    /// received it already.
+    fetching: HashMap<Hash, Vec<oneshot::Sender<Arc<PoV>>>>,
+    /// Size of the validator set at this relay-parent; used to bound how many PoVs may
+    /// plausibly be awaited (`2 * n_validators`).
+    n_validators: usize,
+}
+
+#[derive(Default)]
+struct PeerState {
+    /// A set of awaited PoV-hashes for each relay-parent in the peer's view.
+    awaited: HashMap<Hash, HashSet<Hash>>,
+}
+
+/// Handles the signal. If successful, returns `true` if the subsystem should conclude,
+/// `false` otherwise.
+///
+/// `StartWork` asks the runtime API for the validator set at the new relay-parent and waits
+/// for the answer before creating that relay-parent's state; `StopWork` discards the state.
+/// Errors from sending the request or from the response channel are propagated.
+async fn handle_signal(
+    state: &mut State,
+    ctx: &mut impl SubsystemContext,
+    signal: OverseerSignal,
+) -> SubsystemResult<bool> {
+    match signal {
+        OverseerSignal::Conclude => Ok(true),
+        OverseerSignal::StartWork(relay_parent) => {
+            let (vals_tx, vals_rx) = oneshot::channel();
+            ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+                relay_parent,
+                RuntimeApiRequest::Validators(vals_tx),
+            ))).await?;
+
+            // Only the number of validators matters here.
+            let n_validators = vals_rx.await?.len();
+
+            state.relay_parent_state.insert(relay_parent, BlockBasedState {
+                known: HashMap::new(),
+                fetching: HashMap::new(),
+                n_validators,
+            });
+
+            Ok(false)
+        }
+        OverseerSignal::StopWork(relay_parent) => {
+            state.relay_parent_state.remove(&relay_parent);
+
+            Ok(false)
+        }
+    }
+}
+
+/// Notify peers that we are awaiting a given PoV hash.
+///
+/// This only notifies peers who have the relay parent in their view.
+async fn notify_all_we_are_awaiting(
+    peers: &mut HashMap<PeerId, PeerState>,
+    ctx: &mut impl SubsystemContext,
+    relay_parent: Hash,
+    pov_hash: Hash,
+) -> SubsystemResult<()> {
+    // We use `awaited` as a proxy for which heads are in the peer's view.
+    let peers_to_send: Vec<_> = peers.iter()
+        .filter(|(_, state)| state.awaited.contains_key(&relay_parent))
+        .map(|(peer, _)| peer.clone())
+        .collect();
+
+    // Nobody to tell: avoid sending an empty-recipient message to the bridge.
+    if peers_to_send.is_empty() { return Ok(()) }
+
+    let payload = WireMessage::Awaiting(relay_parent, vec![pov_hash]).encode();
+
+    ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage(
+        peers_to_send,
+        PROTOCOL_V1,
+        payload,
+    ))).await
+}
+
+/// Notify one peer about everything we're awaiting at a given relay-parent.
+///
+/// Sends nothing if we have no state for `relay_parent` or no outstanding fetches there.
+async fn notify_one_we_are_awaiting_many(
+    peer: &PeerId,
+    ctx: &mut impl SubsystemContext,
+    relay_parent_state: &HashMap<Hash, BlockBasedState>,
+    relay_parent: Hash,
+) -> SubsystemResult<()> {
+    let awaiting_hashes = relay_parent_state.get(&relay_parent).into_iter().flat_map(|s| {
+        // Send the peer everything we are fetching at this relay-parent
+        s.fetching.iter()
+            .filter(|(_, senders)| !senders.is_empty()) // that has not been completed already.
+            .map(|(pov_hash, _)| *pov_hash)
+    }).collect::<Vec<_>>();
+
+    if awaiting_hashes.is_empty() { return Ok(()) }
+
+    let payload = WireMessage::Awaiting(relay_parent, awaiting_hashes).encode();
+
+    ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage(
+        vec![peer.clone()],
+        PROTOCOL_V1,
+        payload,
+    ))).await
+}
+
+/// Distribute a PoV to peers who are awaiting it.
+///
+/// Every peer that had `pov_hash` in its awaited set for `relay_parent` is sent the PoV and
+/// has the hash removed from that set.
+async fn distribute_to_awaiting(
+    peers: &mut HashMap<PeerId, PeerState>,
+    ctx: &mut impl SubsystemContext,
+    relay_parent: Hash,
+    pov_hash: Hash,
+    pov: &PoV,
+) -> SubsystemResult<()> {
+    // Send to all peers who are awaiting the PoV and have that relay-parent in their view.
+    //
+    // Also removes it from their awaiting set.
+    let peers_to_send: Vec<_> = peers.iter_mut()
+        .filter_map(|(peer, state)| state.awaited.get_mut(&relay_parent).and_then(|awaited| {
+            // `remove` returns whether the hash was present, i.e. whether the peer awaited it.
+            if awaited.remove(&pov_hash) {
+                Some(peer.clone())
+            } else {
+                None
+            }
+        }))
+        .collect();
+
+    if peers_to_send.is_empty() { return Ok(()) }
+
+    let payload = WireMessage::SendPoV(relay_parent, pov_hash, pov.clone()).encode();
+
+    ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage(
+        peers_to_send,
+        PROTOCOL_V1,
+        payload,
+    ))).await
+}
+
+/// Handles a `FetchPoV` message.
+///
+/// If the PoV is already known it is returned immediately through `response_sender`.
+/// Otherwise the sender is queued under the PoV hash and, when this is the first request for
+/// that hash, peers with the relay-parent in their view are told that we are awaiting it.
+/// Requests for a relay-parent we hold no state for are dropped, which closes the channel.
+async fn handle_fetch(
+    state: &mut State,
+    ctx: &mut impl SubsystemContext,
+    relay_parent: Hash,
+    descriptor: CandidateDescriptor,
+    response_sender: oneshot::Sender<Arc<PoV>>,
+) -> SubsystemResult<()> {
+    let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
+        Some(s) => s,
+        None => return Ok(()),
+    };
+
+    if let Some(pov) = relay_parent_state.known.get(&descriptor.pov_hash) {
+        // The receiver may have gone away; that is not our problem.
+        let _ = response_sender.send(pov.clone());
+        return Ok(());
+    }
+
+    match relay_parent_state.fetching.entry(descriptor.pov_hash) {
+        Entry::Occupied(mut e) => {
+            // we are already awaiting this PoV if there is an entry.
+            e.get_mut().push(response_sender);
+            return Ok(());
+        }
+        Entry::Vacant(e) => {
+            e.insert(vec![response_sender]);
+        }
+    }
+
+    // The request stays queued, but we refuse to gossip an implausible number of fetches.
+    if relay_parent_state.fetching.len() > 2 * relay_parent_state.n_validators {
+        log::warn!("Other subsystems have requested PoV distribution to \
+            fetch more PoVs than reasonably expected: {}", relay_parent_state.fetching.len());
+        return Ok(());
+    }
+
+    // Issue an `Awaiting` message to all peers with this in their view.
+    notify_all_we_are_awaiting(
+        &mut state.peer_state,
+        ctx,
+        relay_parent,
+        descriptor.pov_hash
+    ).await
+}
+
+/// Handles a `DistributePoV` message.
+async fn handle_distribute(
+    state: &mut State,
+    ctx: &mut impl SubsystemContext,
+    relay_parent: Hash,
+    descriptor: CandidateDescriptor,
+    pov: Arc<PoV>,
+) -> SubsystemResult<()> {
+    // Nothing to do for relay-parents we hold no state for.
+    let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
+        None => return Ok(()),
+        Some(s) => s,
+    };
+
+    if let Some(our_awaited) = relay_parent_state.fetching.get_mut(&descriptor.pov_hash) {
+        // Drain all the senders, but keep the entry in the map around intentionally.
+        //
+        // It signals that we were at one point awaiting this, so we will be able to tell
+        // why peers are sending it to us.
+        for response_sender in our_awaited.drain(..) {
+            // A dropped receiver is fine; the requester simply lost interest.
+            let _ = response_sender.send(pov.clone());
+        }
+    }
+
+    relay_parent_state.known.insert(descriptor.pov_hash, pov.clone());
+
+    distribute_to_awaiting(
+        &mut state.peer_state,
+        ctx,
+        relay_parent,
+        descriptor.pov_hash,
+        &*pov,
+    ).await
+}
+
+/// Report a reputation change for a peer.
+///
+/// The change is forwarded to the network bridge; any error from sending is returned.
+async fn report_peer(
+    ctx: &mut impl SubsystemContext,
+    peer: PeerId,
+    rep: Rep,
+) -> SubsystemResult<()> {
+    ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(peer, rep))).await
+}
+
+/// Handle a notification from a peer that they are awaiting some PoVs.
+async fn handle_awaiting(
+    state: &mut State,
+    ctx: &mut impl SubsystemContext,
+    peer: PeerId,
+    relay_parent: Hash,
+    pov_hashes: Vec<Hash>,
+) -> SubsystemResult<()> {
+    // The relay-parent must be in our own view...
+    if !state.our_view.0.contains(&relay_parent) {
+        report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await?;
+        return Ok(());
+    }
+
+    let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
+        None => {
+            log::warn!("PoV Distribution relay parent state out-of-sync with our view");
+            return Ok(());
+        }
+        Some(s) => s,
+    };
+
+    // ...and in the peer's view; an unknown peer is treated the same way.
+    let peer_awaiting = match
+        state.peer_state.get_mut(&peer).and_then(|s| s.awaited.get_mut(&relay_parent))
+    {
+        None => {
+            report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await?;
+            return Ok(());
+        }
+        Some(a) => a,
+    };
+
+    // Bound the bookkeeping per peer and relay-parent. If the request would exceed the bound
+    // it is rejected as a whole: nothing is recorded and nothing is sent.
+    let will_be_awaited = peer_awaiting.len() + pov_hashes.len();
+    if will_be_awaited <= 2 * relay_parent_state.n_validators {
+        for pov_hash in pov_hashes {
+            // For all requested PoV hashes, if we have it, we complete the request immediately.
+            // Otherwise, we note that the peer is awaiting the PoV.
+            if let Some(pov) = relay_parent_state.known.get(&pov_hash) {
+                ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage(
+                    vec![peer.clone()],
+                    PROTOCOL_V1,
+                    WireMessage::SendPoV(relay_parent, pov_hash, (**pov).clone()).encode(),
+                ))).await?;
+            } else {
+                peer_awaiting.insert(pov_hash);
+            }
+        }
+    } else {
+        report_peer(ctx, peer, COST_APPARENT_FLOOD).await?;
+    }
+
+    Ok(())
+}
+
+/// Handle an incoming PoV from our peer. Reports them if unexpected, rewards them if not.
+///
+/// Completes any requests awaiting that PoV.
+async fn handle_incoming_pov(
+    state: &mut State,
+    ctx: &mut impl SubsystemContext,
+    peer: PeerId,
+    relay_parent: Hash,
+    pov_hash: Hash,
+    pov: PoV,
+) -> SubsystemResult<()> {
+    // A PoV for a relay-parent we are not working on cannot have been requested by us.
+    let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
+        None => {
+            report_peer(ctx, peer, COST_UNEXPECTED_POV).await?;
+            return Ok(());
+        },
+        Some(r) => r,
+    };
+
+    let pov = {
+        // Do validity checks and complete all senders awaiting this PoV.
+        let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) {
+            None => {
+                // We never asked for this hash.
+                report_peer(ctx, peer, COST_UNEXPECTED_POV).await?;
+                return Ok(());
+            }
+            Some(f) => f,
+        };
+
+        // The payload must actually hash to what the peer claims it is.
+        let hash = pov.hash();
+        if hash != pov_hash {
+            report_peer(ctx, peer, COST_UNEXPECTED_POV).await?;
+            return Ok(());
+        }
+
+        let pov = Arc::new(pov);
+
+        if fetching.is_empty() {
+            // fetching is empty whenever we were awaiting something and
+            // it was completed afterwards.
+            report_peer(ctx, peer.clone(), BENEFIT_LATE_POV).await?;
+        } else {
+            // fetching is non-empty when the peer just provided us with data we needed.
+            report_peer(ctx, peer.clone(), BENEFIT_FRESH_POV).await?;
+        }
+
+        // Keep the (now empty) entry: it records that we once awaited this PoV.
+        for response_sender in fetching.drain(..) {
+            let _ = response_sender.send(pov.clone());
+        }
+
+        pov
+    };
+
+    // Remember the PoV. Without this, a later `FetchPoV` for the same hash would be queued
+    // behind the already-completed `fetching` entry and never answered, and later `Awaiting`
+    // notifications from peers could not be served from what we already hold.
+    relay_parent_state.known.insert(pov_hash, pov.clone());
+
+    // make sure we don't consider this peer as awaiting that PoV anymore.
+    //
+    // `awaited` is keyed by relay-parent, so the PoV hash has to be removed from the set
+    // stored under `relay_parent`; removing `pov_hash` from the outer map would leave the
+    // set untouched and the PoV would be sent straight back to the peer below.
+    if let Some(awaited) = state.peer_state.get_mut(&peer)
+        .and_then(|peer_state| peer_state.awaited.get_mut(&relay_parent))
+    {
+        awaited.remove(&pov_hash);
+    }
+
+    // distribute the PoV to all other peers who are awaiting it.
+    distribute_to_awaiting(
+        &mut state.peer_state,
+        ctx,
+        relay_parent,
+        pov_hash,
+        &*pov,
+    ).await
+}
+
+/// Handles a network bridge update.
+async fn handle_network_update( + state: &mut State, + ctx: &mut impl SubsystemContext, + update: NetworkBridgeEvent, +) -> SubsystemResult<()> { + match update { + NetworkBridgeEvent::PeerConnected(peer, _observed_role) => { + state.peer_state.insert(peer, PeerState { awaited: HashMap::new() }); + Ok(()) + } + NetworkBridgeEvent::PeerDisconnected(peer) => { + state.peer_state.remove(&peer); + Ok(()) + } + NetworkBridgeEvent::PeerViewChange(peer_id, view) => { + if let Some(peer_state) = state.peer_state.get_mut(&peer_id) { + // prune anything not in the new view. + peer_state.awaited.retain(|relay_parent, _| view.0.contains(&relay_parent)); + + // introduce things from the new view. + for relay_parent in view.0.iter() { + if let Entry::Vacant(entry) = peer_state.awaited.entry(*relay_parent) { + entry.insert(HashSet::new()); + + // Notify the peer about everything we're awaiting at the new relay-parent. + notify_one_we_are_awaiting_many( + &peer_id, + ctx, + &state.relay_parent_state, + *relay_parent, + ).await?; + } + } + } + + Ok(()) + } + NetworkBridgeEvent::PeerMessage(peer, bytes) => { + match WireMessage::decode(&mut &bytes[..]) { + Ok(msg) => match msg { + WireMessage::Awaiting(relay_parent, pov_hashes) => handle_awaiting( + state, + ctx, + peer, + relay_parent, + pov_hashes, + ).await, + WireMessage::SendPoV(relay_parent, pov_hash, pov) => handle_incoming_pov( + state, + ctx, + peer, + relay_parent, + pov_hash, + pov, + ).await, + }, + Err(_) => { + report_peer(ctx, peer, COST_MALFORMED_MESSAGE).await?; + Ok(()) + } + } + } + NetworkBridgeEvent::OurViewChange(view) => { + state.our_view = view; + Ok(()) + } + } +} + +fn network_update_message(update: NetworkBridgeEvent) -> AllMessages { + AllMessages::PoVDistribution(PoVDistributionMessage::NetworkBridgeUpdate(update)) +} + +async fn run( + mut ctx: impl SubsystemContext, +) -> SubsystemResult<()> { + // startup: register the network protocol with the bridge. 
+ ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::RegisterEventProducer( + PROTOCOL_V1, + network_update_message, + ))).await?; + + let mut state = State { + relay_parent_state: HashMap::new(), + peer_state: HashMap::new(), + our_view: View(Vec::new()), + }; + + loop { + match ctx.recv().await? { + FromOverseer::Signal(signal) => if handle_signal(&mut state, &mut ctx, signal).await? { + return Ok(()); + }, + FromOverseer::Communication { msg } => match msg { + PoVDistributionMessage::FetchPoV(relay_parent, descriptor, response_sender) => + handle_fetch( + &mut state, + &mut ctx, + relay_parent, + descriptor, + response_sender, + ).await?, + PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) => + handle_distribute( + &mut state, + &mut ctx, + relay_parent, + descriptor, + pov, + ).await?, + PoVDistributionMessage::NetworkBridgeUpdate(event) => + handle_network_update( + &mut state, + &mut ctx, + event, + ).await?, + }, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::executor::{self, ThreadPool}; + use polkadot_primitives::parachain::BlockData; + use assert_matches::assert_matches; + + fn make_pov(data: Vec) -> PoV { + PoV { block_data: BlockData(data) } + } + + fn make_peer_state(awaited: Vec<(Hash, Vec)>) + -> PeerState + { + PeerState { + awaited: awaited.into_iter().map(|(rp, h)| (rp, h.into_iter().collect())).collect() + } + } + + #[test] + fn distributes_to_those_awaiting_and_completes_local() { + let hash_a: Hash = [0; 32].into(); + let hash_b: Hash = [1; 32].into(); + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + let peer_c = PeerId::random(); + + let (pov_send, pov_recv) = oneshot::channel(); + let pov = make_pov(vec![1, 2, 3]); + let pov_hash = pov.hash(); + + let mut state = State { + relay_parent_state: { + let mut s = HashMap::new(); + let mut b = BlockBasedState { + known: HashMap::new(), + fetching: HashMap::new(), + n_validators: 10, + }; + + b.fetching.insert(pov_hash, 
vec![pov_send]); + s.insert(hash_a, b); + s + }, + peer_state: { + let mut s = HashMap::new(); + + // peer A has hash_a in its view and is awaiting the PoV. + s.insert( + peer_a.clone(), + make_peer_state(vec![(hash_a, vec![pov_hash])]), + ); + + // peer B has hash_a in its view but is not awaiting. + s.insert( + peer_b.clone(), + make_peer_state(vec![(hash_a, vec![])]), + ); + + // peer C doesn't have hash_a in its view but is awaiting the PoV under hash_b. + s.insert( + peer_c.clone(), + make_peer_state(vec![(hash_b, vec![pov_hash])]), + ); + + s + }, + our_view: View(vec![hash_a, hash_b]), + }; + + let pool = ThreadPool::new().unwrap(); + let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let mut descriptor = CandidateDescriptor::default(); + descriptor.pov_hash = pov_hash; + + executor::block_on(async move { + handle_distribute( + &mut state, + &mut ctx, + hash_a, + descriptor, + Arc::new(pov.clone()), + ).await.unwrap(); + + assert!(!state.peer_state[&peer_a].awaited[&hash_a].contains(&pov_hash)); + assert!(state.peer_state[&peer_c].awaited[&hash_b].contains(&pov_hash)); + + // our local sender also completed + assert_eq!(&*pov_recv.await.unwrap(), &pov); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendMessage(peers, protocol, message) + ) => { + assert_eq!(peers, vec![peer_a.clone()]); + assert_eq!(protocol, PROTOCOL_V1); + assert_eq!( + message, + WireMessage::SendPoV(hash_a, pov_hash, pov.clone()).encode(), + ); + } + ) + }); + } + + #[test] + fn we_inform_peers_with_same_view_we_are_awaiting() { + let hash_a: Hash = [0; 32].into(); + let hash_b: Hash = [1; 32].into(); + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + + let (pov_send, _) = oneshot::channel(); + let pov = make_pov(vec![1, 2, 3]); + let pov_hash = pov.hash(); + + let mut state = State { + relay_parent_state: { + let mut s = HashMap::new(); + let b = BlockBasedState { + known: HashMap::new(), + 
fetching: HashMap::new(), + n_validators: 10, + }; + + s.insert(hash_a, b); + s + }, + peer_state: { + let mut s = HashMap::new(); + + // peer A has hash_a in its view. + s.insert( + peer_a.clone(), + make_peer_state(vec![(hash_a, vec![])]), + ); + + // peer B doesn't have hash_a in its view. + s.insert( + peer_b.clone(), + make_peer_state(vec![(hash_b, vec![])]), + ); + + s + }, + our_view: View(vec![hash_a]), + }; + + let pool = ThreadPool::new().unwrap(); + let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + let mut descriptor = CandidateDescriptor::default(); + descriptor.pov_hash = pov_hash; + + executor::block_on(async move { + handle_fetch( + &mut state, + &mut ctx, + hash_a, + descriptor, + pov_send, + ).await.unwrap(); + + assert_eq!(state.relay_parent_state[&hash_a].fetching[&pov_hash].len(), 1); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendMessage(peers, protocol, message) + ) => { + assert_eq!(peers, vec![peer_a.clone()]); + assert_eq!(protocol, PROTOCOL_V1); + assert_eq!( + message, + WireMessage::Awaiting(hash_a, vec![pov_hash]).encode(), + ); + } + ) + }); + } + + #[test] + fn peer_view_change_leads_to_us_informing() { + let hash_a: Hash = [0; 32].into(); + let hash_b: Hash = [1; 32].into(); + + let peer_a = PeerId::random(); + + let (pov_a_send, _) = oneshot::channel(); + + let pov_a = make_pov(vec![1, 2, 3]); + let pov_a_hash = pov_a.hash(); + + let pov_b = make_pov(vec![4, 5, 6]); + let pov_b_hash = pov_b.hash(); + + let mut state = State { + relay_parent_state: { + let mut s = HashMap::new(); + let mut b = BlockBasedState { + known: HashMap::new(), + fetching: HashMap::new(), + n_validators: 10, + }; + + // pov_a is still being fetched, whereas the fetch of pov_b has already + // completed, as implied by the empty vector. 
+ b.fetching.insert(pov_a_hash, vec![pov_a_send]); + b.fetching.insert(pov_b_hash, vec![]); + + s.insert(hash_a, b); + s + }, + peer_state: { + let mut s = HashMap::new(); + + // peer A doesn't yet have hash_a in its view. + s.insert( + peer_a.clone(), + make_peer_state(vec![(hash_b, vec![])]), + ); + + s + }, + our_view: View(vec![hash_a]), + }; + + let pool = ThreadPool::new().unwrap(); + let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + + executor::block_on(async move { + handle_network_update( + &mut state, + &mut ctx, + NetworkBridgeEvent::PeerViewChange(peer_a.clone(), View(vec![hash_a, hash_b])), + ).await.unwrap(); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendMessage(peers, protocol, message) + ) => { + assert_eq!(peers, vec![peer_a.clone()]); + assert_eq!(protocol, PROTOCOL_V1); + assert_eq!( + message, + WireMessage::Awaiting(hash_a, vec![pov_a_hash]).encode(), + ); + } + ) + }); + } + + #[test] + fn peer_complete_fetch_and_is_rewarded() { + let hash_a: Hash = [0; 32].into(); + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + + let (pov_send, pov_recv) = oneshot::channel(); + + let pov = make_pov(vec![1, 2, 3]); + let pov_hash = pov.hash(); + + let mut state = State { + relay_parent_state: { + let mut s = HashMap::new(); + let mut b = BlockBasedState { + known: HashMap::new(), + fetching: HashMap::new(), + n_validators: 10, + }; + + // pov is being fetched. + b.fetching.insert(pov_hash, vec![pov_send]); + + s.insert(hash_a, b); + s + }, + peer_state: { + let mut s = HashMap::new(); + + // peers A and B are functionally the same. 
+ s.insert( + peer_a.clone(), + make_peer_state(vec![(hash_a, vec![])]), + ); + + s.insert( + peer_b.clone(), + make_peer_state(vec![(hash_a, vec![])]), + ); + + s + }, + our_view: View(vec![hash_a]), + }; + + let pool = ThreadPool::new().unwrap(); + let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + + executor::block_on(async move { + // Peer A answers our request before peer B. + handle_network_update( + &mut state, + &mut ctx, + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + WireMessage::SendPoV(hash_a, pov_hash, pov.clone()).encode(), + ), + ).await.unwrap(); + + handle_network_update( + &mut state, + &mut ctx, + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + WireMessage::SendPoV(hash_a, pov_hash, pov.clone()).encode(), + ), + ).await.unwrap(); + + assert_eq!(&*pov_recv.await.unwrap(), &pov); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep) + ) => { + assert_eq!(peer, peer_a); + assert_eq!(rep, BENEFIT_FRESH_POV); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep) + ) => { + assert_eq!(peer, peer_b); + assert_eq!(rep, BENEFIT_LATE_POV); + } + ); + }); + } + + #[test] + fn peer_punished_for_sending_bad_pov() { + let hash_a: Hash = [0; 32].into(); + + let peer_a = PeerId::random(); + + let (pov_send, _) = oneshot::channel(); + + let pov = make_pov(vec![1, 2, 3]); + let pov_hash = pov.hash(); + + let bad_pov = make_pov(vec![6, 6, 6]); + + let mut state = State { + relay_parent_state: { + let mut s = HashMap::new(); + let mut b = BlockBasedState { + known: HashMap::new(), + fetching: HashMap::new(), + n_validators: 10, + }; + + // pov is being fetched. 
+ b.fetching.insert(pov_hash, vec![pov_send]); + + s.insert(hash_a, b); + s + }, + peer_state: { + let mut s = HashMap::new(); + + s.insert( + peer_a.clone(), + make_peer_state(vec![(hash_a, vec![])]), + ); + + s + }, + our_view: View(vec![hash_a]), + }; + + let pool = ThreadPool::new().unwrap(); + let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + + executor::block_on(async move { + // Peer A answers our request: right relay parent, awaited hash, wrong PoV. + handle_network_update( + &mut state, + &mut ctx, + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + WireMessage::SendPoV(hash_a, pov_hash, bad_pov.clone()).encode(), + ), + ).await.unwrap(); + + // didn't complete our sender. + assert_eq!(state.relay_parent_state[&hash_a].fetching[&pov_hash].len(), 1); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep) + ) => { + assert_eq!(peer, peer_a); + assert_eq!(rep, COST_UNEXPECTED_POV); + } + ); + }); + } + + #[test] + fn peer_punished_for_sending_unexpected_pov() { + let hash_a: Hash = [0; 32].into(); + + let peer_a = PeerId::random(); + + let pov = make_pov(vec![1, 2, 3]); + let pov_hash = pov.hash(); + + let mut state = State { + relay_parent_state: { + let mut s = HashMap::new(); + let b = BlockBasedState { + known: HashMap::new(), + fetching: HashMap::new(), + n_validators: 10, + }; + + s.insert(hash_a, b); + s + }, + peer_state: { + let mut s = HashMap::new(); + + s.insert( + peer_a.clone(), + make_peer_state(vec![(hash_a, vec![])]), + ); + + s + }, + our_view: View(vec![hash_a]), + }; + + let pool = ThreadPool::new().unwrap(); + let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + + executor::block_on(async move { + // Peer A answers our request: right relay parent, awaited hash, wrong PoV. 
+ handle_network_update( + &mut state, + &mut ctx, + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + WireMessage::SendPoV(hash_a, pov_hash, pov.clone()).encode(), + ), + ).await.unwrap(); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep) + ) => { + assert_eq!(peer, peer_a); + assert_eq!(rep, COST_UNEXPECTED_POV); + } + ); + }); + } + + #[test] + fn peer_punished_for_sending_pov_out_of_our_view() { + let hash_a: Hash = [0; 32].into(); + let hash_b: Hash = [1; 32].into(); + + let peer_a = PeerId::random(); + + let pov = make_pov(vec![1, 2, 3]); + let pov_hash = pov.hash(); + + let mut state = State { + relay_parent_state: { + let mut s = HashMap::new(); + let b = BlockBasedState { + known: HashMap::new(), + fetching: HashMap::new(), + n_validators: 10, + }; + + s.insert(hash_a, b); + s + }, + peer_state: { + let mut s = HashMap::new(); + + s.insert( + peer_a.clone(), + make_peer_state(vec![(hash_a, vec![])]), + ); + + s + }, + our_view: View(vec![hash_a]), + }; + + let pool = ThreadPool::new().unwrap(); + let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + + executor::block_on(async move { + // Peer A answers our request: right relay parent, awaited hash, wrong PoV. 
+ handle_network_update( + &mut state, + &mut ctx, + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + WireMessage::SendPoV(hash_b, pov_hash, pov.clone()).encode(), + ), + ).await.unwrap(); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep) + ) => { + assert_eq!(peer, peer_a); + assert_eq!(rep, COST_UNEXPECTED_POV); + } + ); + }); + } + + #[test] + fn peer_reported_for_awaiting_too_much() { + let hash_a: Hash = [0; 32].into(); + + let peer_a = PeerId::random(); + let n_validators = 10; + + let mut state = State { + relay_parent_state: { + let mut s = HashMap::new(); + let b = BlockBasedState { + known: HashMap::new(), + fetching: HashMap::new(), + n_validators, + }; + + s.insert(hash_a, b); + s + }, + peer_state: { + let mut s = HashMap::new(); + + s.insert( + peer_a.clone(), + make_peer_state(vec![(hash_a, vec![])]), + ); + + s + }, + our_view: View(vec![hash_a]), + }; + + let pool = ThreadPool::new().unwrap(); + let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + + executor::block_on(async move { + let max_plausibly_awaited = n_validators * 2; + + // The peer awaits a plausible (albeit unlikely) amount of PoVs. + for i in 0..max_plausibly_awaited { + let pov_hash = make_pov(vec![i as u8; 32]).hash(); + handle_network_update( + &mut state, + &mut ctx, + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + WireMessage::Awaiting(hash_a, vec![pov_hash]).encode(), + ), + ).await.unwrap(); + } + + assert_eq!(state.peer_state[&peer_a].awaited[&hash_a].len(), max_plausibly_awaited); + + // The last straw: + let last_pov_hash = make_pov(vec![max_plausibly_awaited as u8; 32]).hash(); + handle_network_update( + &mut state, + &mut ctx, + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + WireMessage::Awaiting(hash_a, vec![last_pov_hash]).encode(), + ), + ).await.unwrap(); + + // No more bookkeeping for you! 
+ assert_eq!(state.peer_state[&peer_a].awaited[&hash_a].len(), max_plausibly_awaited); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep) + ) => { + assert_eq!(peer, peer_a); + assert_eq!(rep, COST_APPARENT_FLOOD); + } + ); + }); + } + + #[test] + fn peer_reported_for_awaiting_outside_their_view() { + let hash_a: Hash = [0; 32].into(); + let hash_b: Hash = [1; 32].into(); + + let peer_a = PeerId::random(); + + let mut state = State { + relay_parent_state: { + let mut s = HashMap::new(); + s.insert(hash_a, BlockBasedState { + known: HashMap::new(), + fetching: HashMap::new(), + n_validators: 10, + }); + + s.insert(hash_b, BlockBasedState { + known: HashMap::new(), + fetching: HashMap::new(), + n_validators: 10, + }); + + s + }, + peer_state: { + let mut s = HashMap::new(); + + // Peer has only hash A in its view. + s.insert( + peer_a.clone(), + make_peer_state(vec![(hash_a, vec![])]), + ); + + s + }, + our_view: View(vec![hash_a, hash_b]), + }; + + let pool = ThreadPool::new().unwrap(); + let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + + executor::block_on(async move { + let pov_hash = make_pov(vec![1, 2, 3]).hash(); + + // Hash B is in our view but not the peer's + handle_network_update( + &mut state, + &mut ctx, + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + WireMessage::Awaiting(hash_b, vec![pov_hash]).encode(), + ), + ).await.unwrap(); + + assert!(state.peer_state[&peer_a].awaited.get(&hash_b).is_none()); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep) + ) => { + assert_eq!(peer, peer_a); + assert_eq!(rep, COST_AWAITED_NOT_IN_VIEW); + } + ); + }); + } + + #[test] + fn peer_reported_for_awaiting_outside_our_view() { + let hash_a: Hash = [0; 32].into(); + let hash_b: Hash = [1; 32].into(); + + let peer_a = PeerId::random(); + + let mut state = State { + relay_parent_state: { + let mut s = 
HashMap::new(); + s.insert(hash_a, BlockBasedState { + known: HashMap::new(), + fetching: HashMap::new(), + n_validators: 10, + }); + + s + }, + peer_state: { + let mut s = HashMap::new(); + + // Peer has hashes A and B in their view. + s.insert( + peer_a.clone(), + make_peer_state(vec![(hash_a, vec![]), (hash_b, vec![])]), + ); + + s + }, + our_view: View(vec![hash_a]), + }; + + let pool = ThreadPool::new().unwrap(); + let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + + executor::block_on(async move { + let pov_hash = make_pov(vec![1, 2, 3]).hash(); + + // Hash B is in peer's view but not ours. + handle_network_update( + &mut state, + &mut ctx, + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + WireMessage::Awaiting(hash_b, vec![pov_hash]).encode(), + ), + ).await.unwrap(); + + // Illegal `awaited` is ignored. + assert!(state.peer_state[&peer_a].awaited[&hash_b].is_empty()); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep) + ) => { + assert_eq!(peer, peer_a); + assert_eq!(rep, COST_AWAITED_NOT_IN_VIEW); + } + ); + }); + } + + #[test] + fn peer_complete_fetch_leads_to_us_completing_others() { + let hash_a: Hash = [0; 32].into(); + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + + let (pov_send, pov_recv) = oneshot::channel(); + + let pov = make_pov(vec![1, 2, 3]); + let pov_hash = pov.hash(); + + let mut state = State { + relay_parent_state: { + let mut s = HashMap::new(); + let mut b = BlockBasedState { + known: HashMap::new(), + fetching: HashMap::new(), + n_validators: 10, + }; + + // pov is being fetched. + b.fetching.insert(pov_hash, vec![pov_send]); + + s.insert(hash_a, b); + s + }, + peer_state: { + let mut s = HashMap::new(); + + s.insert( + peer_a.clone(), + make_peer_state(vec![(hash_a, vec![])]), + ); + + // peer B is awaiting peer A's request. 
+ s.insert( + peer_b.clone(), + make_peer_state(vec![(hash_a, vec![pov_hash])]), + ); + + s + }, + our_view: View(vec![hash_a]), + }; + + let pool = ThreadPool::new().unwrap(); + let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + + executor::block_on(async move { + handle_network_update( + &mut state, + &mut ctx, + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + WireMessage::SendPoV(hash_a, pov_hash, pov.clone()).encode(), + ), + ).await.unwrap(); + + assert_eq!(&*pov_recv.await.unwrap(), &pov); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep) + ) => { + assert_eq!(peer, peer_a); + assert_eq!(rep, BENEFIT_FRESH_POV); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendMessage(peers, protocol, message) + ) => { + assert_eq!(peers, vec![peer_b.clone()]); + assert_eq!(protocol, PROTOCOL_V1); + assert_eq!( + message, + WireMessage::SendPoV(hash_a, pov_hash, pov.clone()).encode(), + ); + } + ); + + assert!(!state.peer_state[&peer_b].awaited[&hash_a].contains(&pov_hash)); + }); + } + + // A peer that was awaiting a PoV and then sends it to us is no longer considered to be awaiting it. + #[test] + fn peer_completing_request_no_longer_awaiting() { + let hash_a: Hash = [0; 32].into(); + + let peer_a = PeerId::random(); + + let (pov_send, pov_recv) = oneshot::channel(); + + let pov = make_pov(vec![1, 2, 3]); + let pov_hash = pov.hash(); + + let mut state = State { + relay_parent_state: { + let mut s = HashMap::new(); + let mut b = BlockBasedState { + known: HashMap::new(), + fetching: HashMap::new(), + n_validators: 10, + }; + + // pov is being fetched. + b.fetching.insert(pov_hash, vec![pov_send]); + + s.insert(hash_a, b); + s + }, + peer_state: { + let mut s = HashMap::new(); + + // peer A is registered as awaiting.
+ s.insert( + peer_a.clone(), + make_peer_state(vec![(hash_a, vec![pov_hash])]), + ); + + s + }, + our_view: View(vec![hash_a]), + }; + + let pool = ThreadPool::new().unwrap(); + let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool); + + executor::block_on(async move { + handle_network_update( + &mut state, + &mut ctx, + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + WireMessage::SendPoV(hash_a, pov_hash, pov.clone()).encode(), + ), + ).await.unwrap(); + + assert_eq!(&*pov_recv.await.unwrap(), &pov); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep) + ) => { + assert_eq!(peer, peer_a); + assert_eq!(rep, BENEFIT_FRESH_POV); + } + ); + + // We received the PoV from peer A, so we do not consider it awaited by peer A anymore. + assert!(!state.peer_state[&peer_a].awaited[&hash_a].contains(&pov_hash)); + }); + } +} diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 1ff4986b06..73371e28fd 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -28,11 +28,14 @@ use polkadot_primitives::{BlockNumber, Hash, Signature}; use polkadot_primitives::parachain::{ AbridgedCandidateReceipt, PoVBlock, ErasureChunk, BackedCandidate, Id as ParaId, SignedAvailabilityBitfield, SigningContext, ValidatorId, ValidationCode, ValidatorIndex, + CandidateDescriptor, }; use polkadot_node_primitives::{ MisbehaviorReport, SignedFullStatement, View, ProtocolId, }; +use std::sync::Arc; + pub use sc_network::{ObservedRole, ReputationChange, PeerId}; /// A notification of a new backed candidate. @@ -214,6 +217,21 @@ pub enum ProvisionerMessage { ProvisionableData(ProvisionableData), } +/// Message to the PoV Distribution Subsystem. +#[derive(Debug)] +pub enum PoVDistributionMessage { + /// Fetch a PoV from the network. 
+ /// + /// This `CandidateDescriptor` should correspond to a candidate seconded under the provided + /// relay-parent hash. + FetchPoV(Hash, CandidateDescriptor, oneshot::Sender<Arc<PoVBlock>>), + /// Distribute a PoV for the given relay-parent and CandidateDescriptor. + /// The PoV should correctly hash to the PoV hash mentioned in the CandidateDescriptor + DistributePoV(Hash, CandidateDescriptor, Arc<PoVBlock>), + /// An update from the network bridge. + NetworkBridgeUpdate(NetworkBridgeEvent), +} + /// A message type tying together all message types that are used across Subsystems. #[derive(Debug)] pub enum AllMessages { @@ -231,6 +249,8 @@ pub enum AllMessages { BitfieldDistribution(BitfieldDistributionMessage), /// Message for the Provisioner subsystem. Provisioner(ProvisionerMessage), + /// Message for the PoV Distribution subsystem. + PoVDistribution(PoVDistributionMessage), /// Message for the Runtime API subsystem. RuntimeApi(RuntimeApiMessage), /// Message for the availability store subsystem. diff --git a/polkadot/primitives/src/parachain.rs b/polkadot/primitives/src/parachain.rs index b187d488e1..d0d3dce558 100644 --- a/polkadot/primitives/src/parachain.rs +++ b/polkadot/primitives/src/parachain.rs @@ -460,6 +460,17 @@ impl AbridgedCandidateReceipt { pov_block_hash: *pov_block_hash, } } + + /// Clone the relevant portions of the `AbridgedCandidateReceipt` to form a `CandidateDescriptor`. + pub fn to_descriptor(&self) -> CandidateDescriptor { + CandidateDescriptor { + para_id: self.parachain_index, + relay_parent: self.relay_parent, + collator: self.collator.clone(), + signature: self.signature.clone(), + pov_hash: self.pov_block_hash.clone(), + } + } } @@ -478,6 +489,26 @@ impl Ord for AbridgedCandidateReceipt { } } +/// A unique descriptor of the candidate receipt, in a lightweight format. +#[derive(PartialEq, Eq, Clone, Encode, Decode)] +#[cfg_attr(feature = "std", derive(Debug, Default))] +pub struct CandidateDescriptor<H = Hash> { + /// The ID of the para this is a candidate for.
+ pub para_id: Id, + /// The hash of the relay-chain block this should be executed in + /// the context of. + // NOTE: the fact that the hash includes this value means that code depends + // on this for deduplication. Removing this field is likely to break things. + pub relay_parent: H, + /// The collator's relay-chain account ID + pub collator: CollatorId, + /// Signature on blake2-256 of components of this receipt: + /// The para ID, the relay parent, and the pov_hash. + pub signature: CollatorSignature, + /// The hash of the pov-block. + pub pov_hash: H, +} + /// A collation sent by a collator. #[derive(PartialEq, Eq, Clone, Encode, Decode)] #[cfg_attr(feature = "std", derive(Debug, Default))] diff --git a/polkadot/roadmap/implementers-guide/src/node/backing/pov-distribution.md b/polkadot/roadmap/implementers-guide/src/node/backing/pov-distribution.md index 1cede76674..32c8fd3788 100644 --- a/polkadot/roadmap/implementers-guide/src/node/backing/pov-distribution.md +++ b/polkadot/roadmap/implementers-guide/src/node/backing/pov-distribution.md @@ -26,11 +26,13 @@ This protocol is described in terms of "us" and our peers, with the understandin As we are gossiping, we need to track which PoVs our peers are waiting for to avoid sending them data that they are not expecting. It is not reasonable to expect our peers to buffer unexpected PoVs, just as we will not buffer unexpected PoVs. So notifying our peers about what is being awaited is key. However it is important that the notifications system is also bounded. -For this, in order to avoid reaching into the internals of the [Statement Distribution](statement-distribution.md) Subsystem, we can rely on an expected propery of candidate backing: that each validator can only second one candidate at each chain head. So we can set a cap on the number of PoVs each peer is allowed to notify us that they are waiting for at a given relay-parent. This cap will be the number of validators at that relay-parent. 
And the view update mechanism of the [Network Bridge](../utility/network-bridge.md) ensures that peers are only allowed to consider a certain set of relay-parents as live. So this bounding mechanism caps the amount of data we need to store per peer at any time at `sum({ n_validators_at_head(head) | head in view_heads })`. Additionally, peers should only be allowed to notify us of PoV hashes they are waiting for in the context of relay-parents in our own local view, which means that `n_validators_at_head` is implied to be `0` for relay-parents not in our own local view. +For this, in order to avoid reaching into the internals of the [Statement Distribution](statement-distribution.md) Subsystem, we can rely on an expected property of candidate backing: that each validator can second up to 2 candidates per chain head. This will typically be only one, because they are only supposed to issue one, but they can equivocate if they are willing to be slashed. So we can set a cap on the number of PoVs each peer is allowed to notify us that they are waiting for at a given relay-parent. This cap will be twice the number of validators at that relay-parent. In practice, this is a very lax upper bound that can be reduced much further if desired. + +The view update mechanism of the [Network Bridge](../utility/network-bridge.md) ensures that peers are only allowed to consider a certain set of relay-parents as live. So this bounding mechanism caps the amount of data we need to store per peer at any time at `sum({ 2 * n_validators_at_head(head) * sizeof(hash) for head in view_heads })`. Additionally, peers should only be allowed to notify us of PoV hashes they are waiting for in the context of relay-parents in our own local view, which means that `n_validators_at_head` is implied to be `0` for relay-parents not in our own local view. View updates from peers and our own view updates are received from the network bridge.
These will lag somewhat behind the `StartWork` and `StopWork` messages received from the overseer, which will influence the actual data we store. The `OurViewUpdate`s from the [`NetworkBridgeEvent`](../../types/overseer-protocol.md#network-bridge-update) must be considered canonical in terms of our peers' perception of us. -Lastly, the system needs to be bootstrapped with our own perception of which PoVs we are cognizant of but awaiting data for. This is done by receipt of the [`PoVDistributionMessage`](../../types/overseer-protocol.md#pov-distribution-message)::ValidatorStatement variant. We can ignore anything except for `Seconded` statements. +Lastly, the system needs to be bootstrapped with our own perception of which PoVs we are cognizant of but awaiting data for. This is done by receipt of the [`PoVDistributionMessage`](../../types/overseer-protocol.md#pov-distribution-message)::FetchPoV variant. Proper operation of this subsystem depends on the descriptors passed faithfully representing candidates which have been seconded by other validators. ## Formal Description @@ -45,7 +47,6 @@ struct State { struct BlockBasedState { known: Map<Hash, PoV>, // should be a shared PoV in practice. these things are heavy. - awaited: Set<Hash>, // awaited PoVs by blake2-256 hash. fetching: Map<Hash, [ResponseChannel<PoV>]>, n_validators: usize, } @@ -79,14 +80,11 @@ Here is the logic of the state machine: - On `Concluded`: conclude. *PoV Distribution Messages* -- On `ValidatorStatement(relay_parent, statement)` - - If this is not `Statement::Seconded`, ignore. - - If there is an entry under `relay_parent` in `relay_parent_state`, add the `pov_hash` of the seconded Candidate's [`CandidateDescriptor`](../../types/candidate.md#candidate-descriptor) to the `awaited` set of the entry. - - If the `pov_hash` was not previously awaited and there are `n_validators` or fewer entries in the `awaited` set, send `NetworkMessage::Awaiting(relay_parent, vec![pov_hash])` to all peers.
- On `FetchPoV(relay_parent, descriptor, response_channel)` - If there is no entry in `relay_parent_state` under `relay_parent`, ignore. - If there is a PoV under `descriptor.pov_hash` in the `known` map, send that PoV on the channel and return. - Otherwise, place the `response_channel` in the `fetching` map under `descriptor.pov_hash`. + - If the `pov_hash` had no previous entry in `fetching` and there are `2 * n_validators` or fewer entries in the `fetching` set, send `NetworkMessage::Awaiting(relay_parent, vec![pov_hash])` to all peers. - On `DistributePoV(relay_parent, descriptor, PoV)` - If there is no entry in `relay_parent_state` under `relay_parent`, ignore. - Complete and remove any channels under `descriptor.pov_hash` in the `fetching` map. @@ -96,26 +94,28 @@ Here is the logic of the state machine: *Network Bridge Updates* - On `PeerConnected(peer_id, observed_role)` - Make a fresh entry in the `peer_state` map for the `peer_id`. -- On `PeerDisconnected(peer_id) +- On `PeerDisconnected(peer_id)` - Remove the entry for `peer_id` from the `peer_state` map. - On `PeerMessage(peer_id, bytes)` - If the bytes do not decode to a `NetworkMessage` or the `peer_id` has no entry in the `peer_state` map, report and ignore. - If this is `NetworkMessage::Awaiting(relay_parent, pov_hashes)`: - If there is no entry under `peer_state.awaited` for the `relay_parent`, report and ignore. - If `relay_parent` is not contained within `our_view`, report and ignore. - - Otherwise, if the `awaited` map combined with the `pov_hashes` would have more than `relay_parent_state[relay_parent].n_validators` entries, report and ignore. Note that we are leaning on the property of the network bridge that it sets our view based on `StartWork` messages. + - Otherwise, if the peer's `awaited` map combined with the `pov_hashes` would have more than ` 2 * relay_parent_state[relay_parent].n_validators` entries, report and ignore. 
Note that we are leaning on the property of the network bridge that it sets our view based on `StartWork` messages. - For each new `pov_hash` in `pov_hashes`, if there is a `pov` under `pov_hash` in the `known` map, send the peer a `NetworkMessage::SendPoV(relay_parent, pov_hash, pov)`. - Otherwise, add the `pov_hash` to the `awaited` map - If this is `NetworkMessage::SendPoV(relay_parent, pov_hash, pov)`: - - If there is no entry under `relay_parent` in `relay_parent_state` or no entry under `pov_hash` in our `awaited` map for that `relay_parent`, report and ignore. + - If there is no entry under `relay_parent` in `relay_parent_state` or no entry under `pov_hash` in our `fetching` map for that `relay_parent`, report and ignore. - If the blake2-256 hash of the pov doesn't equal `pov_hash`, report and ignore. - - Complete and remove any listeners in the `fetching` map under `pov_hash`. + - Complete and remove any listeners in the `fetching` map under `pov_hash`. However, leave an empty set of listeners in the `fetching` map to denote that this was something we once awaited. This will allow us to recognize peers who have sent us something we were expecting, but just a little late. - Add to `known` map. + - Remove the `pov_hash` from the `peer.awaited` map, if any. - Send `NetworkMessage::SendPoV(relay_parent, descriptor.pov_hash, PoV)` to all peers who have the `descriptor.pov_hash` in the set under `relay_parent` in the `peer.awaited` map and remove the entry from `peer.awaited`. - On `PeerViewChange(peer_id, view)` - If Peer is unknown, ignore. - Ensure there is an entry under `relay_parent` for each `relay_parent` in `view` within the `peer.awaited` map, creating blank `awaited` lists as necessary. - Remove all entries under `peer.awaited` that are not within `view`. + - For all hashes in `view` that were not within the old view, send the peer all the keys in our `fetching` map under the block-based state for that hash - i.e.
notify the peer of everything we are awaiting at that hash. - On `OurViewChange(view)` - Update `our_view` to `view` diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md index 7908def940..799d1e2a7f 100644 --- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -172,11 +172,10 @@ If this subsystem chooses to second a parachain block, it dispatches a `Candidat ```rust enum PoVDistributionMessage { - /// Note a statement by a validator on a relay-parent. `Seconded` statements must always - /// have been passed in before `Valid` or `Invalid` statements. - ValidatorStatement(Hash, SignedFullStatement), /// Fetch a PoV from the network. - /// (relay_parent, PoV-hash, Response channel). + /// + /// This `CandidateDescriptor` should correspond to a candidate seconded under the provided + /// relay-parent hash. FetchPoV(Hash, CandidateDescriptor, ResponseChannel), /// Distribute a PoV for the given relay-parent and CandidateDescriptor. /// The PoV should correctly hash to the PoV hash mentioned in the CandidateDescriptor