diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index a15f1f3650..60fc04871d 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -883,7 +883,7 @@ dependencies = [ "log 0.4.11", "regalloc", "serde", - "smallvec 1.4.1", + "smallvec 1.4.2", "target-lexicon", "thiserror", ] @@ -921,7 +921,7 @@ checksum = "2ef419efb4f94ecc02e5d9fbcc910d2bb7f0040e2de570e63a454f883bc891d6" dependencies = [ "cranelift-codegen", "log 0.4.11", - "smallvec 1.4.1", + "smallvec 1.4.2", "target-lexicon", ] @@ -1566,7 +1566,7 @@ dependencies = [ "parity-scale-codec", "paste", "serde", - "smallvec 1.4.1", + "smallvec 1.4.2", "sp-arithmetic", "sp-core", "sp-inherents", @@ -2665,7 +2665,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "smallvec 1.4.1", + "smallvec 1.4.2", "sp-api", "sp-authority-discovery", "sp-block-builder", @@ -2703,7 +2703,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0315ef2f688e33844400b31f11c263f2b3dc21d8b9355c6891c5f185fae43f9a" dependencies = [ "parity-util-mem", - "smallvec 1.4.1", + "smallvec 1.4.2", ] [[package]] @@ -2732,7 +2732,7 @@ dependencies = [ "parking_lot 0.10.2", "regex", "rocksdb", - "smallvec 1.4.1", + "smallvec 1.4.2", ] [[package]] @@ -2828,7 +2828,7 @@ dependencies = [ "parity-multiaddr", "parking_lot 0.10.2", "pin-project", - "smallvec 1.4.1", + "smallvec 1.4.2", "wasm-timer", ] @@ -2859,7 +2859,7 @@ dependencies = [ "ring", "rw-stream-sink", "sha2 0.8.2", - "smallvec 1.4.1", + "smallvec 1.4.2", "thiserror", "unsigned-varint 0.4.0", "void", @@ -2912,7 +2912,7 @@ dependencies = [ "prost", "prost-build", "rand 0.7.3", - "smallvec 1.4.1", + "smallvec 1.4.2", ] [[package]] @@ -2936,7 +2936,7 @@ dependencies = [ "prost-build", "rand 0.7.3", "sha2 0.8.2", - "smallvec 1.4.1", + "smallvec 1.4.2", "unsigned-varint 0.4.0", "wasm-timer", ] @@ -2953,7 +2953,7 @@ dependencies = [ "log 0.4.11", "prost", "prost-build", - "smallvec 1.4.1", + "smallvec 1.4.2", "wasm-timer", ] @@ -2977,7 +2977,7 @@ 
dependencies = [ "prost-build", "rand 0.7.3", "sha2 0.8.2", - "smallvec 1.4.1", + "smallvec 1.4.2", "uint", "unsigned-varint 0.4.0", "void", @@ -3001,7 +3001,7 @@ dependencies = [ "log 0.4.11", "net2", "rand 0.7.3", - "smallvec 1.4.1", + "smallvec 1.4.2", "void", "wasm-timer", ] @@ -3104,7 +3104,7 @@ dependencies = [ "log 0.4.11", "lru 0.6.0", "rand 0.7.3", - "smallvec 1.4.1", + "smallvec 1.4.2", "wasm-timer", ] @@ -3148,7 +3148,7 @@ dependencies = [ "libp2p-core", "log 0.4.11", "rand 0.7.3", - "smallvec 1.4.1", + "smallvec 1.4.2", "void", "wasm-timer", ] @@ -3604,7 +3604,7 @@ dependencies = [ "futures 0.3.5", "log 0.4.11", "pin-project", - "smallvec 1.4.1", + "smallvec 1.4.2", "unsigned-varint 0.4.0", ] @@ -4307,7 +4307,7 @@ dependencies = [ "pallet-transaction-payment-rpc-runtime-api", "parity-scale-codec", "serde", - "smallvec 1.4.1", + "smallvec 1.4.2", "sp-core", "sp-io", "sp-runtime", @@ -4486,7 +4486,7 @@ dependencies = [ "parity-util-mem-derive", "parking_lot 0.10.2", "primitive-types", - "smallvec 1.4.1", + "smallvec 1.4.2", "winapi 0.3.9", ] @@ -4593,7 +4593,7 @@ dependencies = [ "cloudabi 0.0.3", "libc", "redox_syscall", - "smallvec 1.4.1", + "smallvec 1.4.2", "winapi 0.3.9", ] @@ -4608,7 +4608,7 @@ dependencies = [ "instant", "libc", "redox_syscall", - "smallvec 1.4.1", + "smallvec 1.4.2", "winapi 0.3.9", ] @@ -4751,6 +4751,7 @@ dependencies = [ "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", + "polkadot-node-subsystem-util", "polkadot-primitives", "sc-network", "smol 0.3.3", @@ -4776,9 +4777,10 @@ dependencies = [ "polkadot-node-network-protocol", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", + "polkadot-node-subsystem-util", "polkadot-primitives", "sc-keystore", - "smallvec 1.4.1", + "smallvec 1.4.2", "sp-core", "sp-keyring", "streamunordered", @@ -4810,6 +4812,29 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "polkadot-collator-protocol" +version = "0.1.0" 
+dependencies = [ + "assert_matches", + "derive_more 0.99.9", + "env_logger", + "futures 0.3.5", + "futures-timer 3.0.2", + "log 0.4.11", + "parity-scale-codec", + "polkadot-network-bridge", + "polkadot-node-network-protocol", + "polkadot-node-subsystem", + "polkadot-node-subsystem-test-helpers", + "polkadot-node-subsystem-util", + "polkadot-primitives", + "smallvec 1.4.2", + "smol-timeout", + "sp-core", + "sp-keyring", +] + [[package]] name = "polkadot-core-primitives" version = "0.7.30" @@ -5073,7 +5098,7 @@ dependencies = [ "polkadot-primitives", "polkadot-statement-table", "sc-network", - "smallvec 1.4.1", + "smallvec 1.4.2", "sp-core", "substrate-prometheus-endpoint", ] @@ -5092,10 +5117,11 @@ dependencies = [ "pin-project", "polkadot-node-primitives", "polkadot-node-subsystem", + "polkadot-node-subsystem-util", "polkadot-primitives", "polkadot-statement-table", "sc-network", - "smallvec 1.4.1", + "smallvec 1.4.2", "sp-core", ] @@ -5120,7 +5146,7 @@ dependencies = [ "polkadot-statement-table", "sc-keystore", "sc-network", - "smallvec 1.4.1", + "smallvec 1.4.2", "sp-core", "streamunordered", ] @@ -5286,7 +5312,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "smallvec 1.4.1", + "smallvec 1.4.2", "sp-api", "sp-authority-discovery", "sp-block-builder", @@ -5588,7 +5614,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "smallvec 1.4.1", + "smallvec 1.4.2", "sp-api", "sp-authority-discovery", "sp-block-builder", @@ -6229,7 +6255,7 @@ version = "4.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a415a013dd7c5d4221382329a5a3482566da675737494935cbbbcdec04662f9d" dependencies = [ - "smallvec 1.4.1", + "smallvec 1.4.2", ] [[package]] @@ -6260,7 +6286,7 @@ checksum = "b9ba8aaf5fe7cf307c6dbdaeed85478961d29e25e3bee5169e11b92fa9f027a8" dependencies = [ "log 0.4.11", "rustc-hash", - "smallvec 1.4.1", + "smallvec 1.4.2", ] [[package]] @@ -6393,7 +6419,7 @@ dependencies = [ "polkadot-runtime-parachains", "serde", 
"serde_derive", - "smallvec 1.4.1", + "smallvec 1.4.2", "sp-api", "sp-authority-discovery", "sp-block-builder", @@ -7767,9 +7793,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.4.1" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3757cb9d89161a2f24e1cf78efa0c1fcff485d18e3f55e0aa3480824ddaa0f3f" +checksum = "fbee7696b84bbf3d89a1c2eccff0850e3047ed46bfcd2e92c29a2d074d57e252" [[package]] name = "smol" @@ -7808,6 +7834,16 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "smol-timeout" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "024818c1f00b80e8171ddcfcee33860134293aa3aced60c9cbd7a5a2d41db392" +dependencies = [ + "pin-project", + "smol 0.1.18", +] + [[package]] name = "snow" version = "0.7.1" @@ -8334,7 +8370,7 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.10.2", "rand 0.7.3", - "smallvec 1.4.1", + "smallvec 1.4.2", "sp-core", "sp-externalities", "sp-panic-handler", @@ -9312,7 +9348,7 @@ dependencies = [ "serde", "serde_json", "sharded-slab", - "smallvec 1.4.1", + "smallvec 1.4.2", "thread_local", "tracing-core", "tracing-log", @@ -9335,7 +9371,7 @@ dependencies = [ "hashbrown 0.8.0", "log 0.4.11", "rustc-hex", - "smallvec 1.4.1", + "smallvec 1.4.2", ] [[package]] @@ -9703,7 +9739,7 @@ dependencies = [ "log 0.4.11", "region", "rustc-demangle", - "smallvec 1.4.1", + "smallvec 1.4.2", "target-lexicon", "wasmparser 0.59.0", "wasmtime-environ", @@ -9964,7 +10000,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "smallvec 1.4.1", + "smallvec 1.4.2", "sp-api", "sp-authority-discovery", "sp-block-builder", diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml index 4e75139010..e66ef5a329 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -54,6 +54,7 @@ members = [ "node/network/statement-distribution", "node/network/bitfield-distribution", "node/network/availability-distribution", + "node/network/collator-protocol", 
"node/overseer", "node/primitives", "node/service", diff --git a/polkadot/node/core/candidate-selection/src/lib.rs b/polkadot/node/core/candidate-selection/src/lib.rs index 565cc3e9c8..7d31c0318e 100644 --- a/polkadot/node/core/candidate-selection/src/lib.rs +++ b/polkadot/node/core/candidate-selection/src/lib.rs @@ -212,7 +212,12 @@ impl CandidateSelectionJob { ) { if self.seconded_candidate.is_none() { let (candidate_receipt, pov) = - match get_collation(relay_parent, para_id, self.sender.clone()).await { + match get_collation( + relay_parent, + para_id, + collator_id.clone(), + self.sender.clone(), + ).await { Ok(response) => response, Err(err) => { log::warn!( @@ -296,12 +301,14 @@ impl CandidateSelectionJob { async fn get_collation( relay_parent: Hash, para_id: ParaId, + collator_id: CollatorId, mut sender: mpsc::Sender, ) -> Result<(CandidateReceipt, PoV), Error> { let (tx, rx) = oneshot::channel(); sender .send(FromJob::Collator(CollatorProtocolMessage::FetchCollation( relay_parent, + collator_id, para_id, tx, ))) @@ -514,7 +521,7 @@ mod tests { CandidateSelectionMessage::Collation( relay_parent, para_id, - collator_id_clone, + collator_id_clone.clone(), ), )) .await @@ -525,11 +532,13 @@ mod tests { match msg { FromJob::Collator(CollatorProtocolMessage::FetchCollation( got_relay_parent, + collator_id, got_para_id, return_sender, )) => { assert_eq!(got_relay_parent, relay_parent); assert_eq!(got_para_id, para_id); + assert_eq!(collator_id, collator_id_clone); return_sender .send((candidate_receipt.clone(), pov.clone())) diff --git a/polkadot/node/network/availability-distribution/Cargo.toml b/polkadot/node/network/availability-distribution/Cargo.toml index 18c1769a52..fd2028e538 100644 --- a/polkadot/node/network/availability-distribution/Cargo.toml +++ b/polkadot/node/network/availability-distribution/Cargo.toml @@ -14,6 +14,7 @@ polkadot-erasure-coding = { path = "../../../erasure-coding" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = 
"../../subsystem" } polkadot-network-bridge = { path = "../../network/bridge" } polkadot-node-network-protocol = { path = "../../network/protocol" } +polkadot-node-subsystem-util = { path = "../../subsystem-util" } sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } derive_more = "0.99.9" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } diff --git a/polkadot/node/network/availability-distribution/src/tests.rs b/polkadot/node/network/availability-distribution/src/tests.rs index 9b2b1b35e5..0c27e4c212 100644 --- a/polkadot/node/network/availability-distribution/src/tests.rs +++ b/polkadot/node/network/availability-distribution/src/tests.rs @@ -22,7 +22,8 @@ use polkadot_primitives::v1::{ GroupRotationInfo, HeadData, PersistedValidationData, OccupiedCore, PoV, ScheduledCore, ValidatorPair, }; -use polkadot_subsystem_testhelpers::{self as test_helpers, TimeoutExt}; +use polkadot_subsystem_testhelpers::{self as test_helpers}; +use polkadot_node_subsystem_util::TimeoutExt; use polkadot_node_network_protocol::ObservedRole; use futures::{executor, future, Future}; diff --git a/polkadot/node/network/bitfield-distribution/Cargo.toml b/polkadot/node/network/bitfield-distribution/Cargo.toml index 67c2671d41..b8b19cbecb 100644 --- a/polkadot/node/network/bitfield-distribution/Cargo.toml +++ b/polkadot/node/network/bitfield-distribution/Cargo.toml @@ -13,6 +13,7 @@ codec = { package="parity-scale-codec", version = "1.3.4" } node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" } polkadot-primitives = { path = "../../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } +polkadot-node-subsystem-util = { package = "polkadot-node-subsystem-util", path = "../../subsystem-util" } polkadot-network-bridge = { path = "../../network/bridge" } polkadot-node-network-protocol = { path = "../../network/protocol" } sc-network = { 
git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index dd09712cf6..9b0e5c747d 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -614,7 +614,8 @@ mod test { use futures::executor; use maplit::hashmap; use polkadot_primitives::v1::{Signed, ValidatorPair, AvailabilityBitfield}; - use polkadot_node_subsystem_test_helpers::{make_subsystem_context, TimeoutExt}; + use polkadot_node_subsystem_test_helpers::make_subsystem_context; + use polkadot_node_subsystem_util::TimeoutExt; use sp_core::crypto::Pair; use std::time::Duration; use assert_matches::assert_matches; diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml new file mode 100644 index 0000000000..8469bc5b94 --- /dev/null +++ b/polkadot/node/network/collator-protocol/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "polkadot-collator-protocol" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" + +[dependencies] +futures = "0.3.5" +log = "0.4.11" +derive_more = "0.99.9" + +codec = { package="parity-scale-codec", version = "1.3.4", features = ["std"] } + +polkadot-primitives = { path = "../../../primitives" } +polkadot-network-bridge = { path = "../../network/bridge" } +polkadot-node-network-protocol = { path = "../../network/protocol" } +polkadot-node-subsystem-util = { path = "../../subsystem-util" } +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } + +[dev-dependencies] +env_logger = "0.7.1" +assert_matches = "1.3.0" +smol-timeout = "0.1.0" +smallvec = "1.4.2" +futures-timer = "3.0.2" + +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } +sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } + 
+polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpers", path = "../../subsystem-test-helpers" } diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs new file mode 100644 index 0000000000..297ccce7e3 --- /dev/null +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -0,0 +1,1023 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use std::collections::HashMap; + +use futures::channel::oneshot; +use log::{trace, warn}; +use polkadot_primitives::v1::{ + CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, + PoV, ValidatorId, +}; +use super::{TARGET, Result}; +use polkadot_subsystem::{ + FromOverseer, OverseerSignal, SubsystemContext, + messages::{ + AllMessages, CollatorProtocolMessage, RuntimeApiMessage, RuntimeApiRequest, + NetworkBridgeMessage, + }, +}; +use polkadot_node_network_protocol::{ + v1 as protocol_v1, View, PeerId, PeerSet, NetworkBridgeEvent, RequestId, +}; +use polkadot_node_subsystem_util::{ + request_validators_ctx, + request_validator_groups_ctx, +}; + +#[derive(Default)] +struct State { + /// Our id. + our_id: CollatorId, + + /// The para this collator is collating on. + /// Starts as `None` and is updated with every `CollateOn` message. 
+ collating_on: Option, + + /// Track all active peers and their views + /// to determine what is relevant to them. + peer_views: HashMap, + + /// Our own view. + view: View, + + /// Possessed collations. + /// + /// We will keep up to one local collation per relay-parent. + collations: HashMap, + + /// Our validator groups active leafs. + our_validators_groups: HashMap>, + + /// Validators we know about via `ConnectToValidators` message. + /// + /// These are the only validators we are interested in talking to and as such + /// all actions from peers not in this map will be ignored. + /// Entries in this map will be cleared as validator groups in `our_validator_groups` + /// go out of scope with their respective deactivated leafs. + known_validators: HashMap, +} + +/// Distribute a collation. +/// +/// Figure out the core our para is assigned to and the relevant validators. +/// Issue a connection request to these validators. +/// If the para is not scheduled or next up on any core, at the relay-parent, +/// or the relay-parent isn't in the active-leaves set, we ignore the message +/// as it must be invalid in that case - although this indicates a logic error +/// elsewhere in the node. +async fn distribute_collation( + ctx: &mut Context, + state: &mut State, + id: ParaId, + receipt: CandidateReceipt, + pov: PoV, +) -> Result<()> +where + Context: SubsystemContext +{ + let relay_parent = receipt.descriptor.relay_parent; + + // This collation is not in the active-leaves set. + if !state.view.contains(&relay_parent) { + warn!( + target: TARGET, + "Distribute collation message parent {:?} is outside of our view", + relay_parent, + ); + + return Ok(()); + } + + // We have already seen collation for this relay parent. + if state.collations.contains_key(&relay_parent) { + return Ok(()); + } + + // Determine which core the para collated-on is assigned to. + // If it is not scheduled then ignore the message. 
+ let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? { + Some(core) => core, + None => { + warn!( + target: TARGET, + "Looks like no core is assigned to {:?} at {:?}", id, relay_parent, + ); + return Ok(()); + } + }; + + // Determine the group on that core and the next group on that core. + let our_validators = match determine_our_validators(ctx, our_core, num_cores, relay_parent).await? { + Some(validators) => validators, + None => { + warn!( + target: TARGET, + "There are no validators assigned to {:?} core", our_core, + ); + + return Ok(()); + } + }; + + state.our_validators_groups.insert(relay_parent, our_validators.clone()); + + // Issue a discovery request for the validators of the current group and the next group. + connect_to_validators(ctx, state, our_validators).await?; + + state.collations.insert(relay_parent, (receipt, pov)); + + Ok(()) +} + +/// Get the Id of the Core that is assigned to the para being collated on if any +/// and the total number of cores. +async fn determine_core( + ctx: &mut Context, + para_id: ParaId, + relay_parent: Hash, +) -> Result> +where + Context: SubsystemContext +{ + let (tx, rx) = oneshot::channel(); + + ctx.send_message(AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::AvailabilityCores(tx), + ) + )).await?; + + let cores = rx.await??; + + for (idx, core) in cores.iter().enumerate() { + if let CoreState::Scheduled(occupied) = core { + if occupied.para_id == para_id { + return Ok(Some(((idx as u32).into(), cores.len()))); + } + } + } + + Ok(None) +} + +/// Figure out a group of validators assigned to the para being collated on. +/// +/// This returns validators for the current group and the next group. 
+async fn determine_our_validators( + ctx: &mut Context, + core_index: CoreIndex, + cores: usize, + relay_parent: Hash, +) -> Result>> +where + Context: SubsystemContext +{ + let groups = request_validator_groups_ctx(relay_parent, ctx).await?; + + let groups = groups.await??; + + let current_group_index = groups.1.group_for_core(core_index, cores); + + let mut connect_to_validators = match groups.0.get(current_group_index.0 as usize) { + Some(group) => group.clone(), + None => return Ok(None), + }; + + let next_group_idx = (current_group_index.0 as usize + 1) % groups.0.len(); + + if let Some(next_group) = groups.0.get(next_group_idx) { + connect_to_validators.extend_from_slice(&next_group); + } + + let validators = request_validators_ctx(relay_parent, ctx).await?; + + let validators = validators.await??; + + let validators = connect_to_validators + .into_iter() + .map(|idx| validators[idx as usize].clone()) + .collect(); + + Ok(Some(validators)) +} + +/// Issue a `Declare` collation message to a set of peers. +async fn declare( + ctx: &mut Context, + state: &mut State, + to: Vec, +) -> Result<()> +where + Context: SubsystemContext +{ + let wire_message = protocol_v1::CollatorProtocolMessage::Declare(state.our_id.clone()); + + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + to, + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + )).await?; + + Ok(()) +} + +/// Issue a connection request to a set of validators. +async fn connect_to_validators( + ctx: &mut Context, + state: &mut State, + validators: Vec, +) -> Result<()> +where + Context: SubsystemContext +{ + let (tx, rx) = oneshot::channel(); + + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::ConnectToValidators(PeerSet::Collation, validators, tx), + )).await?; + + let mut validators_ids = rx.await?; + + for id in validators_ids.drain(..) 
{ + state.known_validators.insert(id.1, id.0); + } + + Ok(()) +} + +/// Advertise collation to a set of relay chain validators. +async fn advertise_collation( + ctx: &mut Context, + state: &mut State, + relay_parent: Hash, + to: Vec, +) -> Result<()> +where + Context: SubsystemContext +{ + let collating_on = match state.collating_on { + Some(collating_on) => collating_on, + None => { + return Ok(()); + } + }; + + let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation(relay_parent, collating_on); + + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + to, + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + )).await?; + + Ok(()) +} + +/// The main incoming message dispatching switch. +async fn process_msg( + ctx: &mut Context, + state: &mut State, + msg: CollatorProtocolMessage, +) -> Result<()> +where + Context: SubsystemContext +{ + use CollatorProtocolMessage::*; + + match msg { + CollateOn(id) => { + state.collating_on = Some(id); + } + DistributeCollation(receipt, pov) => { + match state.collating_on { + Some(id) if receipt.descriptor.para_id != id => { + // If the ParaId of a collation requested to be distributed does not match + // the one we expect, we ignore the message. 
+ warn!( + target: TARGET, + "DistributeCollation message for para {:?} while collating on {:?}", + receipt.descriptor.para_id, + id, + ); + } + Some(id) => { + distribute_collation(ctx, state, id, receipt, pov).await?; + } + None => { + warn!( + target: TARGET, + "DistributeCollation message for para {:?} while not collating on any", + receipt.descriptor.para_id, + ); + } + } + } + FetchCollation(_, _, _, _) => { + warn!( + target: TARGET, + "FetchCollation message is not expected on the collator side of the protocol", + ); + } + ReportCollator(_) => { + warn!( + target: TARGET, + "ReportCollator message is not expected on the collator side of the protocol", + ); + } + NoteGoodCollation(_) => { + warn!( + target: TARGET, + "NoteGoodCollation message is not expected on the collator side of the protocol", + ); + } + NetworkBridgeUpdateV1(event) => { + if let Err(e) = handle_network_msg( + ctx, + state, + event, + ).await { + warn!( + target: TARGET, + "Failed to handle incoming network message: {:?}", e, + ); + } + }, + } + + Ok(()) +} + +/// Issue a response to a previously requested collation. +async fn send_collation( + ctx: &mut Context, + request_id: RequestId, + origin: PeerId, + receipt: CandidateReceipt, + pov: PoV, +) -> Result<()> +where + Context: SubsystemContext +{ + let wire_message = protocol_v1::CollatorProtocolMessage::Collation( + request_id, + receipt, + pov, + ); + + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + vec![origin], + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + )).await?; + + Ok(()) +} + +/// A networking messages switch. 
+async fn handle_incoming_peer_message( + ctx: &mut Context, + state: &mut State, + origin: PeerId, + msg: protocol_v1::CollatorProtocolMessage, +) -> Result<()> +where + Context: SubsystemContext +{ + use protocol_v1::CollatorProtocolMessage::*; + + match msg { + Declare(_) => { + warn!( + target: TARGET, + "Declare message is not expected on the collator side of the protocol", + ); + } + AdvertiseCollation(_, _) => { + warn!( + target: TARGET, + "AdvertiseCollation message is not expected on the collator side of the protocol", + ); + } + RequestCollation(request_id, relay_parent, para_id) => { + match state.collating_on { + Some(our_para_id) => { + if our_para_id == para_id { + if let Some(collation) = state.collations.get(&relay_parent).cloned() { + send_collation(ctx, request_id, origin, collation.0, collation.1).await?; + } + } else { + warn!( + target: TARGET, + "Received a RequestCollation for {:?} while collating on {:?}", + para_id, our_para_id, + ); + } + } + None => { + warn!( + target: TARGET, + "Received a RequestCollation for {:?} while not collating on any para", + para_id, + ); + } + } + } + Collation(_, _, _) => { + warn!( + target: TARGET, + "Collation message is not expected on the collator side of the protocol", + ); + } + } + + Ok(()) +} + +/// Our view has changed. +async fn handle_peer_view_change( + ctx: &mut Context, + state: &mut State, + peer_id: PeerId, + view: View, +) -> Result<()> +where + Context: SubsystemContext +{ + let current = state.peer_views.entry(peer_id.clone()).or_default(); + + let added: Vec = view.difference(&*current).cloned().collect(); + + *current = view; + + for added in added.into_iter() { + if state.collations.contains_key(&added) { + advertise_collation(ctx, state, added.clone(), vec![peer_id.clone()]).await?; + } + } + + Ok(()) +} + +/// A peer is connected. 
+/// +/// We first want to check if this is a validator we are expecting to talk to +/// and if so `Declare` that we are a collator with a given `CollatorId`. +async fn handle_peer_connected( + ctx: &mut Context, + state: &mut State, + peer_id: PeerId, +) -> Result<()> +where + Context: SubsystemContext +{ + if !state.known_validators.contains_key(&peer_id) { + trace!(target: TARGET, "An unknown peer has connected {:?}", peer_id); + + return Ok(()) + } + + state.peer_views.entry(peer_id.clone()).or_default(); + + declare(ctx, state, vec![peer_id]).await?; + + Ok(()) +} + +/// Bridge messages switch. +async fn handle_network_msg( + ctx: &mut Context, + state: &mut State, + bridge_message: NetworkBridgeEvent, +) -> Result<()> +where + Context: SubsystemContext +{ + use NetworkBridgeEvent::*; + + match bridge_message { + PeerConnected(peer_id, _observed_role) => { + handle_peer_connected(ctx, state, peer_id).await?; + } + PeerViewChange(peer_id, view) => { + handle_peer_view_change(ctx, state, peer_id, view).await?; + } + PeerDisconnected(peer_id) => { + state.peer_views.remove(&peer_id); + } + OurViewChange(view) => { + handle_our_view_change(state, view).await?; + } + PeerMessage(remote, msg) => { + handle_incoming_peer_message(ctx, state, remote, msg).await?; + } + } + + Ok(()) +} + +/// Handles our view changes. +async fn handle_our_view_change( + state: &mut State, + view: View, +) -> Result<()> { + let old_view = std::mem::replace(&mut (state.view), view); + + let view = state.view.clone(); + + let removed = old_view.difference(&view).collect::>(); + + for removed in removed.into_iter() { + state.collations.remove(&removed); + if let Some(group) = state.our_validators_groups.remove(&removed) { + state.known_validators.retain(|_, v| !group.contains(v)); + } + } + + Ok(()) +} + +/// The collator protocol collator side main loop. 
+pub(crate) async fn run(mut ctx: Context, our_id: CollatorId) -> Result<()> +where + Context: SubsystemContext +{ + use FromOverseer::*; + use OverseerSignal::*; + + let mut state = State::default(); + + state.our_id = our_id; + + loop { + match ctx.recv().await? { + Communication { msg } => process_msg(&mut ctx, &mut state, msg).await?, + Signal(ActiveLeaves(_update)) => {} + Signal(BlockFinalized(_)) => {} + Signal(Conclude) => break, + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + use log::trace; + use std::time::Duration; + use futures::{executor, future, Future}; + use assert_matches::assert_matches; + use smallvec::smallvec; + + use sp_core::crypto::Pair; + use sp_keyring::Sr25519Keyring; + + use polkadot_primitives::v1::{ + BlockData, CandidateDescriptor, CollatorPair, ScheduledCore, + ValidatorIndex, GroupRotationInfo, + }; + use polkadot_subsystem::ActiveLeavesUpdate; + use polkadot_node_subsystem_util::TimeoutExt; + use polkadot_subsystem_testhelpers::{self as test_helpers}; + use polkadot_node_network_protocol::ObservedRole; + + #[derive(Default)] + struct TestCandidateBuilder { + para_id: ParaId, + pov_hash: Hash, + relay_parent: Hash, + commitments_hash: Hash, + } + + impl TestCandidateBuilder { + fn build(self) -> CandidateReceipt { + CandidateReceipt { + descriptor: CandidateDescriptor { + para_id: self.para_id, + pov_hash: self.pov_hash, + relay_parent: self.relay_parent, + ..Default::default() + }, + commitments_hash: self.commitments_hash, + } + } + } + + #[derive(Clone)] + struct TestState { + chain_ids: Vec, + validators: Vec, + validator_public: Vec, + validator_peer_id: Vec, + validator_groups: (Vec>, GroupRotationInfo), + relay_parent: Hash, + availability_cores: Vec, + our_collator_pair: CollatorPair, + } + + fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec { + val_ids.iter().map(|v| v.public().into()).collect() + } + + impl Default for TestState { + fn default() -> Self { + let chain_a = ParaId::from(1); + let 
chain_b = ParaId::from(2); + + let chain_ids = vec![chain_a, chain_b]; + + let validators = vec![ + Sr25519Keyring::Alice, + Sr25519Keyring::Bob, + Sr25519Keyring::Charlie, + Sr25519Keyring::Dave, + Sr25519Keyring::Ferdie, + ]; + + let validator_public = validator_pubkeys(&validators); + + let validator_peer_id = std::iter::repeat_with(|| PeerId::random()) + .take(validator_public.len()) + .collect(); + + let validator_groups = vec![vec![2, 0, 4], vec![1], vec![3]]; + let group_rotation_info = GroupRotationInfo { + session_start_block: 0, + group_rotation_frequency: 100, + now: 1, + }; + let validator_groups = (validator_groups, group_rotation_info); + + let availability_cores = vec![ + CoreState::Scheduled(ScheduledCore { + para_id: chain_ids[0], + collator: None, + }), + CoreState::Scheduled(ScheduledCore { + para_id: chain_ids[1], + collator: None, + }), + ]; + + let relay_parent = Hash::repeat_byte(0x05); + + let our_collator_pair = CollatorPair::generate().0; + + Self { + chain_ids, + validators, + validator_public, + validator_peer_id, + validator_groups, + relay_parent, + availability_cores, + our_collator_pair, + } + } + } + + struct TestHarness { + virtual_overseer: test_helpers::TestSubsystemContextHandle, + } + + fn test_harness>( + collator_id: CollatorId, + test: impl FnOnce(TestHarness) -> T, + ) { + let _ = env_logger::builder() + .is_test(true) + .filter( + Some("polkadot_collator_protocol"), + log::LevelFilter::Trace, + ) + .filter( + Some(TARGET), + log::LevelFilter::Trace, + ) + .try_init(); + + let pool = sp_core::testing::TaskExecutor::new(); + + let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); + + let subsystem = run(context, collator_id); + + let test_fut = test(TestHarness { virtual_overseer }); + + futures::pin_mut!(test_fut); + futures::pin_mut!(subsystem); + + executor::block_on(future::select(test_fut, subsystem)); + } + + const TIMEOUT: Duration = Duration::from_millis(100); + + async fn 
overseer_send( + overseer: &mut test_helpers::TestSubsystemContextHandle, + msg: CollatorProtocolMessage, + ) { + trace!("Sending message:\n{:?}", &msg); + overseer + .send(FromOverseer::Communication { msg }) + .timeout(TIMEOUT) + .await + .expect(&format!("{:?} is more than enough for sending messages.", TIMEOUT)); + } + + async fn overseer_recv( + overseer: &mut test_helpers::TestSubsystemContextHandle, + ) -> AllMessages { + let msg = overseer_recv_with_timeout(overseer, TIMEOUT) + .await + .expect(&format!("{:?} is more than enough to receive messages", TIMEOUT)); + + trace!("Received message:\n{:?}", &msg); + + msg + } + + async fn overseer_recv_with_timeout( + overseer: &mut test_helpers::TestSubsystemContextHandle, + timeout: Duration, + ) -> Option { + trace!("Waiting for message..."); + overseer + .recv() + .timeout(timeout) + .await + } + + async fn overseer_signal( + overseer: &mut test_helpers::TestSubsystemContextHandle, + signal: OverseerSignal, + ) { + overseer + .send(FromOverseer::Signal(signal)) + .timeout(TIMEOUT) + .await + .expect(&format!("{:?} is more than enough for sending signals.", TIMEOUT)); + } + + #[test] + fn advertise_and_send_collation() { + let test_state = TestState::default(); + + test_harness(test_state.our_collator_pair.public(), |test_harness| async move { + let current = test_state.relay_parent; + let TestHarness { + mut virtual_overseer, + } = test_harness; + + let pov_block = PoV { + block_data: BlockData(vec![42, 43, 44]), + }; + + let pov_hash = pov_block.hash(); + + let candidate = TestCandidateBuilder { + para_id: test_state.chain_ids[0], + relay_parent: test_state.relay_parent, + pov_hash, + ..Default::default() + }.build(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::CollateOn(test_state.chain_ids[0]) + ).await; + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![current.clone()], + deactivated: smallvec![], + }), + 
).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(View(vec![current])), + ), + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()), + ).await; + + // obtain the availability cores. + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::AvailabilityCores(tx) + )) => { + assert_eq!(relay_parent, current); + tx.send(Ok(test_state.availability_cores.clone())).unwrap(); + } + ); + + // Obtain the validator groups + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::ValidatorGroups(tx) + )) => { + assert_eq!(relay_parent, current); + tx.send(Ok(test_state.validator_groups.clone())).unwrap(); + } + ); + + // obtain the validators per relay parent + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(tx), + )) => { + assert_eq!(relay_parent, current); + tx.send(Ok(test_state.validator_public.clone())).unwrap(); + } + ); + + // We now should connect to our validator group. 
+ assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ConnectToValidators( + peer_set, + validators, + tx, + ) + ) => { + assert_eq!(peer_set, PeerSet::Collation); + assert_eq!(validators.len(), 4); + assert!(validators.contains(&test_state.validator_public[2])); + assert!(validators.contains(&test_state.validator_public[0])); + assert!(validators.contains(&test_state.validator_public[4])); + assert!(validators.contains(&test_state.validator_public[1])); + + tx.send(vec![ + (test_state.validator_public[2].clone(), test_state.validator_peer_id[2].clone()), + (test_state.validator_public[0].clone(), test_state.validator_peer_id[0].clone()), + (test_state.validator_public[4].clone(), test_state.validator_peer_id[4].clone()), + (test_state.validator_public[1].clone(), test_state.validator_peer_id[1].clone()), + ]).unwrap(); + } + ); + + // Validator 2 connects. + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + test_state.validator_peer_id[2].clone(), + ObservedRole::Authority, + ) + ), + ).await; + + // We declare to the connected validator that we are a collator. + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + to, + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + ) => { + assert_eq!(to, vec![test_state.validator_peer_id[2].clone()]); + assert_matches!( + wire_message, + protocol_v1::CollatorProtocolMessage::Declare(collator_id) => { + assert_eq!(collator_id, test_state.our_collator_pair.public()); + } + ); + } + ); + + // Send info about peer's view. 
+ overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange( + test_state.validator_peer_id[2].clone(), + View(vec![current]), + ) + ) + ).await; + + // The peer is interested in a leaf that we have a collation for; + // advertise it. + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + to, + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + ) => { + assert_eq!(to, vec![test_state.validator_peer_id[2].clone()]); + assert_matches!( + wire_message, + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + relay_parent, + collating_on, + ) => { + assert_eq!(relay_parent, current); + assert_eq!(collating_on, test_state.chain_ids[0]); + } + ); + } + ); + + let request_id = 42; + + // Request a collation. + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + test_state.validator_peer_id[2].clone(), + protocol_v1::CollatorProtocolMessage::RequestCollation( + request_id, + current, + test_state.chain_ids[0], + ) + ) + ) + ).await; + + // Wait for the reply. + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + to, + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + ) => { + assert_eq!(to, vec![test_state.validator_peer_id[2].clone()]); + assert_matches!( + wire_message, + protocol_v1::CollatorProtocolMessage::Collation(req_id, receipt, pov) => { + assert_eq!(req_id, request_id); + assert_eq!(receipt, candidate); + assert_eq!(pov, pov_block); + } + ); + } + ); + + let new_head = Hash::repeat_byte(0xA); + + // Collator's view moves on. 
+ overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(View(vec![new_head])), + ), + ).await; + + let request_id = 43; + + // Re-request a collation. + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + test_state.validator_peer_id[2].clone(), + protocol_v1::CollatorProtocolMessage::RequestCollation( + request_id, + current, + test_state.chain_ids[0], + ) + ) + ) + ).await; + + assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none()); + }); + } +} diff --git a/polkadot/node/network/collator-protocol/src/lib.rs b/polkadot/node/network/collator-protocol/src/lib.rs new file mode 100644 index 0000000000..bf1c009b44 --- /dev/null +++ b/polkadot/node/network/collator-protocol/src/lib.rs @@ -0,0 +1,136 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! The Collator Protocol allows collators and validators talk to each other. +//! This subsystem implements both sides of the collator protocol. 
+ +#![deny(missing_docs)] + +use std::time::Duration; +use futures::{channel::oneshot, FutureExt}; +use log::trace; + +use polkadot_subsystem::{ + Subsystem, SubsystemContext, SubsystemError, SpawnedSubsystem, + errors::RuntimeApiError, + metrics::{self, prometheus}, + messages::{ + AllMessages, CollatorProtocolMessage, NetworkBridgeMessage, + }, +}; +use polkadot_node_network_protocol::{ + PeerId, ReputationChange as Rep, +}; +use polkadot_primitives::v1::CollatorId; +use polkadot_node_subsystem_util as util; + +mod collator_side; +mod validator_side; + +const TARGET: &'static str = "colp"; +const REQUEST_TIMEOUT: Duration = Duration::from_secs(1); + +#[derive(Debug, derive_more::From)] +enum Error { + #[from] + Subsystem(SubsystemError), + #[from] + Oneshot(oneshot::Canceled), + #[from] + RuntimeApi(RuntimeApiError), + #[from] + UtilError(util::Error), +} + +type Result = std::result::Result; + +enum ProtocolSide { + Validator, + Collator(CollatorId), +} + +/// The collator protocol subsystem. +pub struct CollatorProtocolSubsystem { + protocol_side: ProtocolSide, +} + +impl CollatorProtocolSubsystem { + /// Start the collator protocol. + /// If `id` is `Some` this is a collator side of the protocol. + /// If `id` is `None` this is a validator side of the protocol. + pub fn new(id: Option) -> Self { + let protocol_side = match id { + Some(id) => ProtocolSide::Collator(id), + None => ProtocolSide::Validator, + }; + + Self { + protocol_side, + } + } + + async fn run(self, ctx: Context) -> Result<()> + where + Context: SubsystemContext, + { + match self.protocol_side { + ProtocolSide::Validator => validator_side::run(ctx, REQUEST_TIMEOUT).await, + ProtocolSide::Collator(id) => collator_side::run(ctx, id).await, + } + } +} + +/// Collator protocol metrics. 
+#[derive(Default, Clone)] +pub struct Metrics; + +impl metrics::Metrics for Metrics { + fn try_register(_registry: &prometheus::Registry) + -> std::result::Result { + Ok(Metrics) + } +} + +impl Subsystem for CollatorProtocolSubsystem +where + Context: SubsystemContext + Sync + Send, +{ + type Metrics = Metrics; + + fn start(self, ctx: Context) -> SpawnedSubsystem { + SpawnedSubsystem { + name: "collator-protocol-subsystem", + future: Box::pin(async move { self.run(ctx) }.map(|_| ())), + } + } +} + +/// Modify the reputation of a peer based on its behavior. +async fn modify_reputation(ctx: &mut Context, peer: PeerId, rep: Rep) -> Result<()> +where + Context: SubsystemContext, +{ + trace!( + target: TARGET, + "Reputation change of {:?} for peer {:?}", rep, peer, + ); + + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep), + )).await?; + + Ok(()) +} diff --git a/polkadot/node/network/collator-protocol/src/validator_side.rs b/polkadot/node/network/collator-protocol/src/validator_side.rs new file mode 100644 index 0000000000..a41d2628b5 --- /dev/null +++ b/polkadot/node/network/collator-protocol/src/validator_side.rs @@ -0,0 +1,1224 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . 
+ +use std::collections::{HashMap, HashSet}; +use std::time::Duration; +use std::task::Poll; + +use futures::{ + StreamExt, + channel::oneshot, + future::BoxFuture, + stream::FuturesUnordered, +}; +use log::{trace, warn}; + +use polkadot_primitives::v1::{ + Id as ParaId, CandidateReceipt, CollatorId, Hash, PoV, +}; +use polkadot_subsystem::{ + FromOverseer, OverseerSignal, SubsystemContext, + messages::{ + AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage, + }, +}; +use polkadot_node_network_protocol::{ + v1 as protocol_v1, View, PeerId, ReputationChange as Rep, RequestId, + NetworkBridgeEvent, +}; +use polkadot_node_subsystem_util::TimeoutExt; + +use super::{modify_reputation, TARGET, Result}; + +const COST_UNEXPECTED_MESSAGE: Rep = Rep::new(-10, "An unexpected message"); +const COST_REQUEST_TIMED_OUT: Rep = Rep::new(-20, "A collation request has timed out"); +const COST_REPORT_BAD: Rep = Rep::new(-50, "A collator was reported by another subsystem"); +const BENEFIT_NOTIFY_GOOD: Rep = Rep::new(50, "A collator was noted good by another subsystem"); + +#[derive(Debug)] +enum CollationRequestResult { + Received(RequestId), + Timeout(RequestId), +} + +/// A Future representing an ongoing collation request. +/// It may timeout or end in a graceful fashion if a requested +/// collation has been received sucessfully or chain has moved on. +struct CollationRequest { + // The response for this request has been received successfully or + // chain has moved forward and this request is no longer relevant. + received: oneshot::Receiver<()>, + + // The timeout of this request. + timeout: Duration, + + // The id of this request. 
+ request_id: RequestId, +} + +impl CollationRequest { + async fn wait(self) -> CollationRequestResult { + use CollationRequestResult::*; + + let CollationRequest { + received, + timeout, + request_id, + } = self; + + + match received.timeout(timeout).await { + None => Timeout(request_id), + Some(_) => Received(request_id), + } + } +} + +struct PerRequest { + // The sender side to signal the `CollationRequest` to resolve successfully. + received: oneshot::Sender<()>, + + // Send result here. + result: oneshot::Sender<(CandidateReceipt, PoV)>, +} + +/// All state relevant for the validator side of the protocol lives here. +#[derive(Default)] +struct State { + /// Our own view. + view: View, + + /// Track all active collators and their views. + peer_views: HashMap, + + /// Peers that have declared themselves as collators. + known_collators: HashMap, + + /// Advertisments received from collators. We accept one advertisment + /// per collator per source per relay-parent. + advertisments: HashMap>, + + /// Derive RequestIds from this. + next_request_id: RequestId, + + /// The collations we have requested by relay parent and para id. + /// + /// For each relay parent and para id we may be connected to a number + /// of collators each of those may have advertised a different collation. + /// So we group such cases here. + requested_collations: HashMap<(Hash, ParaId, PeerId), RequestId>, + + /// Housekeeping handles we need to have per request to: + /// - cancel ongoing requests + /// - reply with collations to other subsystems. + requests_info: HashMap, + + /// Collation requests that are currently in progress. + requests_in_progress: FuturesUnordered>, + + /// Delay after which a collation request would time out. + request_timeout: Duration, + + /// Possessed collations. + collations: HashMap<(Hash, ParaId), Vec<(CollatorId, CandidateReceipt, PoV)>>, +} + +/// Another subsystem has requested to fetch collations on a particular leaf for some para. 
+async fn fetch_collation( + ctx: &mut Context, + state: &mut State, + relay_parent: Hash, + collator_id: CollatorId, + para_id: ParaId, + tx: oneshot::Sender<(CandidateReceipt, PoV)> +) -> Result<()> +where + Context: SubsystemContext +{ + // First take a look if we have already stored some of the relevant collations. + if let Some(collations) = state.collations.get(&(relay_parent, para_id)) { + for collation in collations.iter() { + if collation.0 == collator_id { + if let Err(e) = tx.send((collation.1.clone(), collation.2.clone())) { + // We do not want this to be fatal because the receving subsystem + // may have closed the results channel for some reason. + trace!( + target: TARGET, + "Failed to send collation: {:?}", e, + ); + } + return Ok(()); + } + } + } + + // Dodge multiple references to `state`. + let mut relevant_advertiser = None; + + // Has the collator in question advertised a relevant collation? + for (k, v) in state.advertisments.iter() { + if v.contains(&(para_id, relay_parent)) { + if state.known_collators.get(k) == Some(&collator_id) { + relevant_advertiser = Some(k.clone()); + } + } + } + + // Request the collation. + // Assume it is `request_collation`'s job to check and ignore duplicate requests. + if let Some(relevant_advertiser) = relevant_advertiser { + request_collation(ctx, state, relay_parent, para_id, relevant_advertiser, tx).await?; + } + + Ok(()) +} + +/// Report a collator for some malicious actions. +async fn report_collator( + ctx: &mut Context, + state: &mut State, + id: CollatorId, +) -> Result<()> +where + Context: SubsystemContext +{ + // Since we have a one way map of PeerId -> CollatorId we have to + // iterate here. Since a huge amount of peers is not expected this + // is a tolerable thing to do. + for (k, v) in state.known_collators.iter() { + if *v == id { + modify_reputation(ctx, k.clone(), COST_REPORT_BAD).await?; + } + } + + Ok(()) +} + +/// Some other subsystem has reported a collator as a good one, bump reputation. 
+async fn note_good_collation( + ctx: &mut Context, + state: &mut State, + id: CollatorId, +) -> Result<()> +where + Context: SubsystemContext +{ + for (peer_id, collator_id) in state.known_collators.iter() { + if id == *collator_id { + modify_reputation(ctx, peer_id.clone(), BENEFIT_NOTIFY_GOOD).await?; + } + } + + Ok(()) +} + +/// A peer's view has changed. A number of things should be done: +/// - Ongoing collation requests have to be cancelled. +/// - Advertisments by this peer that are no longer relevant have to be removed. +async fn handle_peer_view_change( + state: &mut State, + peer_id: PeerId, + view: View, +) -> Result<()> { + let current = state.peer_views.entry(peer_id.clone()).or_default(); + + let removed: Vec<_> = current.difference(&view).cloned().collect(); + + *current = view; + + if let Some(advertisments) = state.advertisments.get_mut(&peer_id) { + advertisments.retain(|(_, relay_parent)| !removed.contains(relay_parent)); + } + + let mut requests_to_cancel = Vec::new(); + + for removed in removed.into_iter() { + state.requested_collations.retain(|k, v| { + if k.0 == removed { + requests_to_cancel.push(*v); + false + } else { + true + } + }); + } + + for r in requests_to_cancel.into_iter() { + if let Some(per_request) = state.requests_info.remove(&r) { + per_request.received.send(()).map_err(|_| oneshot::Canceled)?; + } + } + + Ok(()) +} + +/// We have received a collation. +/// - Cancel all ongoing requests +/// - Reply to interested parties if any +/// - Store collation. 
+async fn received_collation( + ctx: &mut Context, + state: &mut State, + origin: PeerId, + request_id: RequestId, + receipt: CandidateReceipt, + pov: PoV, +) -> Result<()> +where + Context: SubsystemContext +{ + let relay_parent = receipt.descriptor.relay_parent; + let para_id = receipt.descriptor.para_id; + + if let Some(id) = state.requested_collations.remove( + &(relay_parent, para_id, origin.clone()) + ) { + if id == request_id { + if let Some(per_request) = state.requests_info.remove(&id) { + let _ = per_request.received.send(()); + if let Some(collator_id) = state.known_collators.get(&origin) { + let _ = per_request.result.send((receipt.clone(), pov.clone())); + + state.collations + .entry((relay_parent, para_id)) + .or_default() + .push((collator_id.clone(), receipt, pov)); + } + } + } + } else { + // TODO: https://github.com/paritytech/polkadot/issues/1694 + // This is tricky. If our chain has moved on, we have already canceled + // the relevant request and removed it from the map; so and we are not expecting + // this reply although technically it is not a malicious behaviur. + modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await?; + } + + Ok(()) +} + +/// Request a collation from the network. +/// This function will +/// - Check for duplicate requests. +/// - Check if the requested collation is in our view. +/// - Update PerRequest records with the `result` field if necessary. +/// And as such invocations of this function may rely on that. 
+async fn request_collation( + ctx: &mut Context, + state: &mut State, + relay_parent: Hash, + para_id: ParaId, + peer_id: PeerId, + result: oneshot::Sender<(CandidateReceipt, PoV)>, +) -> Result<()> +where + Context: SubsystemContext +{ + if !state.view.contains(&relay_parent) { + trace!( + target: TARGET, + "Collation by {} on {} on relay parent {} is no longer in view", + peer_id, para_id, relay_parent, + ); + return Ok(()); + } + + if state.requested_collations.contains_key(&(relay_parent, para_id.clone(), peer_id.clone())) { + trace!( + target: TARGET, + "Collation by {} on {} on relay parent {} has already been requested", + peer_id, para_id, relay_parent, + ); + return Ok(()); + } + + let request_id = state.next_request_id; + state.next_request_id += 1; + + let (tx, rx) = oneshot::channel(); + + let per_request = PerRequest { + received: tx, + result, + }; + + let request = CollationRequest { + received: rx, + timeout: state.request_timeout, + request_id, + }; + + state.requested_collations.insert((relay_parent, para_id.clone(), peer_id.clone()), request_id); + + state.requests_info.insert(request_id, per_request); + + state.requests_in_progress.push(Box::pin(async move { + request.wait().await + })); + + let wire_message = protocol_v1::CollatorProtocolMessage::RequestCollation( + request_id, + relay_parent, + para_id, + ); + + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + vec![peer_id], + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + )).await?; + + Ok(()) +} + +/// Notify `CandidateSelectionSubsystem` that a collation has been advertised. 
+async fn notify_candidate_selection( + ctx: &mut Context, + collator: CollatorId, + relay_parent: Hash, + para_id: ParaId, +) -> Result<()> +where + Context: SubsystemContext +{ + ctx.send_message(AllMessages::CandidateSelection( + CandidateSelectionMessage::Collation( + relay_parent, + para_id, + collator, + ) + )).await?; + + Ok(()) +} + +/// Networking message has been received. +async fn process_incoming_peer_message( + ctx: &mut Context, + state: &mut State, + origin: PeerId, + msg: protocol_v1::CollatorProtocolMessage, +)-> Result<()> +where + Context: SubsystemContext +{ + use protocol_v1::CollatorProtocolMessage::*; + + match msg { + Declare(id) => { + state.known_collators.insert(origin.clone(), id); + state.peer_views.entry(origin).or_default(); + } + AdvertiseCollation(relay_parent, para_id) => { + state.advertisments.entry(origin.clone()).or_default().insert((para_id, relay_parent)); + + if let Some(collator) = state.known_collators.get(&origin) { + notify_candidate_selection(ctx, collator.clone(), relay_parent, para_id).await?; + } + } + RequestCollation(_, _, _) => { + // This is a validator side of the protocol, collation requests are not expected here. + return modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; + } + Collation(request_id, receipt, pov) => { + received_collation(ctx, state, origin, request_id, receipt, pov).await?; + } + } + + Ok(()) +} + +/// A leaf has become inactive so we want to +/// - Cancel all ongoing collation requests that are on top of that leaf. +/// - Remove all stored collations relevant to that leaf. 
+async fn remove_relay_parent( + state: &mut State, + relay_parent: Hash, +) -> Result<()> { + let mut remove_these = Vec::new(); + + state.requested_collations.retain(|k, v| { + if k.0 == relay_parent { + remove_these.push(*v); + } + k.0 != relay_parent + }); + + for id in remove_these.into_iter() { + if let Some(info) = state.requests_info.remove(&id) { + info.received.send(()).map_err(|_| oneshot::Canceled)?; + } + } + + state.collations.retain(|k, _| k.0 != relay_parent); + + Ok(()) +} + +/// Our view has changed. +async fn handle_our_view_change( + state: &mut State, + view: View, +) -> Result<()> { + let old_view = std::mem::replace(&mut (state.view), view); + + let removed = old_view + .difference(&state.view) + .cloned() + .collect::>(); + + for removed in removed.into_iter() { + remove_relay_parent(state, removed).await?; + } + + Ok(()) +} + +/// A request has timed out. +async fn request_timed_out( + ctx: &mut Context, + state: &mut State, + id: RequestId, +) -> Result<()> +where + Context: SubsystemContext +{ + // We have to go backwards in the map, again. + if let Some(key) = find_val_in_map(&state.requested_collations, &id) { + if let Some(_) = state.requested_collations.remove(&key) { + if let Some(_) = state.requests_info.remove(&id) { + let peer_id = key.2; + + modify_reputation(ctx, peer_id, COST_REQUEST_TIMED_OUT).await?; + } + } + } + + Ok(()) +} + +/// Bridge event switch. +async fn handle_network_msg( + ctx: &mut Context, + state: &mut State, + bridge_message: NetworkBridgeEvent, +) -> Result<()> +where + Context: SubsystemContext +{ + use NetworkBridgeEvent::*; + + match bridge_message { + PeerConnected(_id, _role) => { + // A peer has connected. Until it issues a `Declare` message we do not + // want to track it's view or take any other actions. 
+ }, + PeerDisconnected(peer_id) => { + state.peer_views.remove(&peer_id); + }, + PeerViewChange(peer_id, view) => { + handle_peer_view_change(state, peer_id, view).await?; + }, + OurViewChange(view) => { + handle_our_view_change(state, view).await?; + }, + PeerMessage(remote, msg) => { + process_incoming_peer_message(ctx, state, remote, msg).await?; + } + } + + Ok(()) +} + +/// The main message receiver switch. +async fn process_msg( + ctx: &mut Context, + msg: CollatorProtocolMessage, + state: &mut State, +) -> Result<()> +where + Context: SubsystemContext +{ + use CollatorProtocolMessage::*; + + match msg { + CollateOn(id) => { + warn!( + target: TARGET, + "CollateOn({}) message is not expected on the validator side of the protocol", id, + ); + } + DistributeCollation(_, _) => { + warn!( + target: TARGET, + "DistributeCollation message is not expected on the validator side of the protocol", + ); + } + FetchCollation(relay_parent, collator_id, para_id, tx) => { + fetch_collation(ctx, state, relay_parent, collator_id, para_id, tx).await?; + } + ReportCollator(id) => { + report_collator(ctx, state, id).await?; + } + NoteGoodCollation(id) => { + note_good_collation(ctx, state, id).await?; + } + NetworkBridgeUpdateV1(event) => { + if let Err(e) = handle_network_msg( + ctx, + state, + event, + ).await { + warn!( + target: TARGET, + "Failed to handle incoming network message: {:?}", e, + ); + } + } + } + + Ok(()) +} + +/// The main run loop. 
+pub(crate) async fn run(mut ctx: Context, request_timeout: Duration) -> Result<()> +where + Context: SubsystemContext +{ + use FromOverseer::*; + use OverseerSignal::*; + + let mut state = State { + request_timeout, + ..Default::default() + }; + + loop { + if let Poll::Ready(msg) = futures::poll!(ctx.recv()) { + let msg = msg?; + trace!(target: TARGET, "Received a message {:?}", msg); + + match msg { + Communication { msg } => process_msg(&mut ctx, msg, &mut state).await?, + Signal(BlockFinalized(_)) => {} + Signal(ActiveLeaves(_)) => {} + Signal(Conclude) => { break } + } + continue; + } + + while let Poll::Ready(Some(request)) = futures::poll!(state.requests_in_progress.next()) { + // Request has timed out, we need to penalize the collator and re-send the request + // if the chain has not moved on yet. + match request { + CollationRequestResult::Timeout(id) => { + trace!(target: TARGET, "Request timed out {}", id); + request_timed_out(&mut ctx, &mut state, id).await?; + } + CollationRequestResult::Received(id) => { + state.requests_info.remove(&id); + } + } + } + + futures::pending!(); + } + + Ok(()) +} + +fn find_val_in_map(map: &HashMap, val: &V) -> Option { + map + .iter() + .find_map(|(k, v)| if v == val { Some(k.clone()) } else { None }) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::iter; + use futures::{executor, future, Future}; + use sp_core::crypto::Pair; + use assert_matches::assert_matches; + use futures_timer::Delay; + + use polkadot_primitives::v1::{BlockData, CollatorPair}; + use polkadot_subsystem_testhelpers as test_helpers; + + #[derive(Clone)] + struct TestState { + chain_ids: Vec, + relay_parent: Hash, + collators: Vec, + } + + impl Default for TestState { + fn default() -> Self { + let chain_a = ParaId::from(1); + let chain_b = ParaId::from(2); + + let chain_ids = vec![chain_a, chain_b]; + let relay_parent = Hash::repeat_byte(0x05); + let collators = iter::repeat(()) + .map(|_| CollatorPair::generate().0) + .take(4) + .collect(); 
+ + Self { + chain_ids, + relay_parent, + collators, + } + } + } + + struct TestHarness { + virtual_overseer: test_helpers::TestSubsystemContextHandle, + } + + fn test_harness>(test: impl FnOnce(TestHarness) -> T) { + let _ = env_logger::builder() + .is_test(true) + .filter( + Some("polkadot_collator_protocol"), + log::LevelFilter::Trace, + ) + .filter( + Some(TARGET), + log::LevelFilter::Trace, + ) + .try_init(); + + let pool = sp_core::testing::TaskExecutor::new(); + + let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); + + let subsystem = run(context, Duration::from_millis(50)); + + let test_fut = test(TestHarness { virtual_overseer }); + + futures::pin_mut!(test_fut); + futures::pin_mut!(subsystem); + + executor::block_on(future::select(test_fut, subsystem)); + } + + const TIMEOUT: Duration = Duration::from_millis(100); + + async fn overseer_send( + overseer: &mut test_helpers::TestSubsystemContextHandle, + msg: CollatorProtocolMessage, + ) { + log::trace!("Sending message:\n{:?}", &msg); + overseer + .send(FromOverseer::Communication { msg }) + .timeout(TIMEOUT) + .await + .expect(&format!("{:?} is enough for sending messages.", TIMEOUT)); + } + + async fn overseer_recv( + overseer: &mut test_helpers::TestSubsystemContextHandle, + ) -> AllMessages { + let msg = overseer_recv_with_timeout(overseer, TIMEOUT) + .await + .expect(&format!("{:?} is enough to receive messages.", TIMEOUT)); + + log::trace!("Received message:\n{:?}", &msg); + + msg + } + + async fn overseer_recv_with_timeout( + overseer: &mut test_helpers::TestSubsystemContextHandle, + timeout: Duration, + ) -> Option { + log::trace!("Waiting for message..."); + overseer + .recv() + .timeout(timeout) + .await + } + + // As we receive a relevan advertisment act on it and issue a collation request. 
+ #[test] + fn act_on_advertisment() { + let test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { + mut virtual_overseer, + } = test_harness; + + let pair = CollatorPair::generate().0; + log::trace!("activating"); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(View(vec![test_state.relay_parent])) + ) + ).await; + + + let peer_b = PeerId::random(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::Declare(pair.public()), + ) + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + test_state.relay_parent, + test_state.chain_ids[0], + ) + ) + ) + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( + relay_parent, + para_id, + collator, + ) + ) => { + assert_eq!(relay_parent, test_state.relay_parent); + assert_eq!(para_id, test_state.chain_ids[0]); + assert_eq!(collator, pair.public()); + }); + }); + } + + // Test that an issued request times out a number of times until our view moves on. 
+ #[test] + fn collation_request_times_out() { + let test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { + mut virtual_overseer, + } = test_harness; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(View(vec![test_state.relay_parent])) + ) + ).await; + + let peer_b = PeerId::random(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::Declare( + test_state.collators[0].public(), + ), + ) + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + test_state.relay_parent, + test_state.chain_ids[0], + ) + ) + ) + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( + relay_parent, + para_id, + collator, + )) => { + assert_eq!(relay_parent, test_state.relay_parent); + assert_eq!(para_id, test_state.chain_ids[0]); + assert_eq!(collator, test_state.collators[0].public()); + } + ); + + let (tx, _rx) = oneshot::channel(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::FetchCollation( + test_state.relay_parent, + test_state.collators[0].public(), + test_state.chain_ids[0], + tx, + ) + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendCollationMessage( + peers, + protocol_v1::CollationProtocol::CollatorProtocol( + protocol_v1::CollatorProtocolMessage::RequestCollation( + _id, + relay_parent, + para_id, + ) + ) + ) + ) => { + assert_eq!(relay_parent, test_state.relay_parent); + assert_eq!(peers, vec![peer_b.clone()]); + assert_eq!(para_id, 
test_state.chain_ids[0]); + }); + + // Don't send a response and we should see reputation penalties to the + // collator. + Delay::new(Duration::from_millis(50)).await; + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep) + ) => { + assert_eq!(peer, peer_b); + assert_eq!(rep, COST_REQUEST_TIMED_OUT); + } + ); + + // Deactivate the relay parent in question. + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(View(vec![Hash::repeat_byte(0x42)])) + ) + ).await; + + // After we've deactivated it we are not expecting any more requests + // for timed out collations. + assert!( + overseer_recv_with_timeout( + &mut virtual_overseer, + Duration::from_secs(1), + ).await.is_none() + ); + }); + } + + // Test that other subsystems may modify collators' reputations. + #[test] + fn collator_reporting_works() { + let test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { + mut virtual_overseer, + } = test_harness; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(View(vec![test_state.relay_parent])) + ) + ).await; + + let peer_b = PeerId::random(); + let peer_c = PeerId::random(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::Declare( + test_state.collators[0].public(), + ), + ) + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_c.clone(), + protocol_v1::CollatorProtocolMessage::Declare( + test_state.collators[1].public(), + ), + ) + ) + ).await; + + overseer_send( + &mut virtual_overseer, +
CollatorProtocolMessage::ReportCollator(test_state.collators[0].public()), + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep), + ) => { + assert_eq!(peer, peer_b); + assert_eq!(rep, COST_REPORT_BAD); + } + ); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NoteGoodCollation(test_state.collators[1].public()), + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep), + ) => { + assert_eq!(peer, peer_c); + assert_eq!(rep, BENEFIT_NOTIFY_GOOD); + } + ); + }); + } + + // A test scenario that takes the following steps + // - Two collators connect, declare themselves and advertise a collation relevant to + // our view. + // - This results in the subsystem acting upon these advertisements and issuing two messages to + // the CandidateBacking subsystem. + // - CandidateBacking requests both of the collations. + // - Collation protocol requests these collations. + // - The collations are sent to it. + // - Collations are fetched correctly.
+ #[test] + fn fetch_collations_works() { + let test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { + mut virtual_overseer, + } = test_harness; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(View(vec![test_state.relay_parent])) + ) + ).await; + + let peer_b = PeerId::random(); + let peer_c = PeerId::random(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::Declare( + test_state.collators[0].public(), + ) + ) + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_c.clone(), + protocol_v1::CollatorProtocolMessage::Declare( + test_state.collators[1].public(), + ) + ) + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + test_state.relay_parent, + test_state.chain_ids[0], + ) + ) + ) + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( + relay_parent, + para_id, + collator, + ) + ) => { + assert_eq!(relay_parent, test_state.relay_parent); + assert_eq!(para_id, test_state.chain_ids[0]); + assert_eq!(collator, test_state.collators[0].public()); + }); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_c.clone(), + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + test_state.relay_parent, + test_state.chain_ids[0], + ) + ) + ) + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + 
AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( + relay_parent, + para_id, + collator, + ) + ) => { + assert_eq!(relay_parent, test_state.relay_parent); + assert_eq!(para_id, test_state.chain_ids[0]); + assert_eq!(collator, test_state.collators[1].public()); + }); + + let (tx_0, rx_0) = oneshot::channel(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::FetchCollation( + test_state.relay_parent, + test_state.collators[0].public(), + test_state.chain_ids[0], + tx_0, + ) + ).await; + + let (tx_1, rx_1) = oneshot::channel(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::FetchCollation( + test_state.relay_parent, + test_state.collators[1].public(), + test_state.chain_ids[0], + tx_1, + ) + ).await; + + let (request_id, peer_id) = assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendCollationMessage( + peers, + protocol_v1::CollationProtocol::CollatorProtocol( + protocol_v1::CollatorProtocolMessage::RequestCollation( + id, + relay_parent, + para_id, + ) + ) + ) + ) => { + assert_eq!(relay_parent, test_state.relay_parent); + assert_eq!(para_id, test_state.chain_ids[0]); + (id, peers[0].clone()) + }); + + let mut candidate_a = CandidateReceipt::default(); + candidate_a.descriptor.para_id = test_state.chain_ids[0]; + candidate_a.descriptor.relay_parent = test_state.relay_parent; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_id, + protocol_v1::CollatorProtocolMessage::Collation( + request_id, + candidate_a.clone(), + PoV { + block_data: BlockData(vec![]), + }, + ) + ) + ) + ).await; + + let (request_id, peer_id) = assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendCollationMessage( + peers, + protocol_v1::CollationProtocol::CollatorProtocol( + 
protocol_v1::CollatorProtocolMessage::RequestCollation( + id, + relay_parent, + para_id, + ) + ) + ) + ) => { + assert_eq!(relay_parent, test_state.relay_parent); + assert_eq!(para_id, test_state.chain_ids[0]); + (id, peers[0].clone()) + }); + + let mut candidate_b = CandidateReceipt::default(); + candidate_b.descriptor.para_id = test_state.chain_ids[0]; + candidate_b.descriptor.relay_parent = test_state.relay_parent; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_id, + protocol_v1::CollatorProtocolMessage::Collation( + request_id, + candidate_b.clone(), + PoV { + block_data: BlockData(vec![1, 2, 3]), + }, + ) + ) + ) + ).await; + + let collation_0 = rx_0.await.unwrap(); + let collation_1 = rx_1.await.unwrap(); + + assert_eq!(collation_0.0, candidate_a); + assert_eq!(collation_1.0, candidate_b); + }); + } +} diff --git a/polkadot/node/subsystem-test-helpers/Cargo.toml b/polkadot/node/subsystem-test-helpers/Cargo.toml index 869fb74d06..220af40deb 100644 --- a/polkadot/node/subsystem-test-helpers/Cargo.toml +++ b/polkadot/node/subsystem-test-helpers/Cargo.toml @@ -16,6 +16,7 @@ parking_lot = "0.10.0" pin-project = "0.4.23" polkadot-node-primitives = { path = "../primitives" } polkadot-node-subsystem = { path = "../subsystem" } +polkadot-node-subsystem-util = { path = "../subsystem-util" } polkadot-primitives = { path = "../../primitives" } polkadot-statement-table = { path = "../../statement-table" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index 4d5be56cdf..817cad37f0 100644 --- a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -18,13 +18,12 @@ use polkadot_node_subsystem::messages::AllMessages; use polkadot_node_subsystem::{FromOverseer, SubsystemContext, SubsystemError, 
SubsystemResult}; +use polkadot_node_subsystem_util::TimeoutExt; use futures::channel::mpsc; use futures::poll; use futures::prelude::*; -use futures_timer::Delay; use parking_lot::Mutex; -use pin_project::pin_project; use sp_core::{testing::TaskExecutor, traits::SpawnNamed}; use std::convert::Infallible; @@ -284,45 +283,3 @@ pub fn subsystem_test_harness( .expect("test timed out instead of completing") }); } - -/// A future that wraps another future with a `Delay` allowing for time-limited futures. -#[pin_project] -pub struct Timeout { - #[pin] - future: F, - #[pin] - delay: Delay, -} - -/// Extends `Future` to allow time-limited futures. -pub trait TimeoutExt: Future { - fn timeout(self, duration: Duration) -> Timeout - where - Self: Sized, - { - Timeout { - future: self, - delay: Delay::new(duration), - } - } -} - -impl TimeoutExt for F {} - -impl Future for Timeout { - type Output = Option; - - fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll { - let this = self.project(); - - if this.delay.poll(ctx).is_ready() { - return Poll::Ready(None); - } - - if let Poll::Ready(output) = this.future.poll(ctx) { - return Poll::Ready(Some(output)); - } - - Poll::Pending - } -} diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 0cd7a05c31..0a44a327d6 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -50,6 +50,7 @@ use std::{ convert::{TryFrom, TryInto}, marker::Unpin, pin::Pin, + task::{Poll, Context}, time::Duration, }; use streamunordered::{StreamUnordered, StreamYield}; @@ -973,9 +974,51 @@ macro_rules! delegated_subsystem { }; } +/// A future that wraps another future with a `Delay` allowing for time-limited futures. +#[pin_project] +pub struct Timeout { + #[pin] + future: F, + #[pin] + delay: Delay, +} + +/// Extends `Future` to allow time-limited futures. 
+pub trait TimeoutExt: Future { + fn timeout(self, duration: Duration) -> Timeout + where + Self: Sized, + { + Timeout { + future: self, + delay: Delay::new(duration), + } + } +} + +impl TimeoutExt for F {} + +impl Future for Timeout { + type Output = Option; + + fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll { + let this = self.project(); + + if this.delay.poll(ctx).is_ready() { + return Poll::Ready(None); + } + + if let Poll::Ready(output) = this.future.poll(ctx) { + return Poll::Ready(Some(output)); + } + + Poll::Pending + } +} + #[cfg(test)] mod tests { - use super::{Error as UtilError, JobManager, JobTrait, JobsError, ToJobTrait}; + use super::{Error as UtilError, JobManager, JobTrait, JobsError, TimeoutExt, ToJobTrait}; use polkadot_node_subsystem::{ messages::{AllMessages, CandidateSelectionMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, @@ -988,7 +1031,7 @@ mod tests { future, Future, FutureExt, SinkExt, }; use polkadot_primitives::v1::Hash; - use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context, TimeoutExt as _}; + use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context}; use std::{collections::HashMap, convert::TryFrom, pin::Pin, time::Duration}; // basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do; diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index afc1667867..ad8b5a3bb1 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -160,7 +160,7 @@ pub enum CollatorProtocolMessage { /// Provide a collation to distribute to validators. DistributeCollation(CandidateReceipt, PoV), /// Fetch a collation under the given relay-parent for the given ParaId. 
- FetchCollation(Hash, ParaId, oneshot::Sender<(CandidateReceipt, PoV)>), + FetchCollation(Hash, CollatorId, ParaId, oneshot::Sender<(CandidateReceipt, PoV)>), /// Report a collator as having provided an invalid collation. This should lead to disconnect /// and blacklist of the collator. ReportCollator(CollatorId), @@ -176,7 +176,7 @@ impl CollatorProtocolMessage { match self { Self::CollateOn(_) => None, Self::DistributeCollation(receipt, _) => Some(receipt.descriptor().relay_parent), - Self::FetchCollation(relay_parent, _, _) => Some(*relay_parent), + Self::FetchCollation(relay_parent, _, _, _) => Some(*relay_parent), Self::ReportCollator(_) => None, Self::NoteGoodCollation(_) => None, Self::NetworkBridgeUpdateV1(_) => None, diff --git a/polkadot/runtime/westend/src/lib.rs b/polkadot/runtime/westend/src/lib.rs index 687324fa77..8093b44a1f 100644 --- a/polkadot/runtime/westend/src/lib.rs +++ b/polkadot/runtime/westend/src/lib.rs @@ -85,7 +85,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { impl_name: create_runtime_str!("parity-westend"), authoring_version: 2, spec_version: 43, - impl_version: 0, + impl_version: 1, #[cfg(not(feature = "disable-runtime-api"))] apis: RUNTIME_API_VERSIONS, #[cfg(feature = "disable-runtime-api")]