Tests for new network code (#897)

* move protocol.rs into subfolder

* add trait for mocking network behavior

* add a mock version of network ops

* remove some redundant parameters from service messages

* ensure fetching erasure chunks automatically cancels

* introduce dummy ProvideRuntimeApi

* abstract over gossip somewhat

* add mock gossip handler

* skeleton test

* remove dependence of shared table on router

* remove worker dependence on its own sender

* test shutdown

* add tests

* test that gossip streams are cleaned up correctly

* refactor worker out into its own struct and reduce bound on executor

* remove reliance of tests on global thread pool
This commit is contained in:
Robert Habermeier
2020-03-16 06:17:08 -04:00
committed by GitHub
parent a81d8cb220
commit 260b2fa336
5 changed files with 924 additions and 179 deletions
@@ -335,6 +335,11 @@ pub struct NewLeafActions {
} }
impl NewLeafActions { impl NewLeafActions {
#[cfg(test)]
pub fn new() -> Self {
NewLeafActions { actions: Vec::new() }
}
/// Perform the queued actions, feeding into gossip. /// Perform the queued actions, feeding into gossip.
pub fn perform( pub fn perform(
self, self,
@@ -42,10 +42,11 @@ use polkadot_validation::{
}; };
use sc_network::{config::Roles, Event, PeerId}; use sc_network::{config::Roles, Event, PeerId};
use sp_api::ProvideRuntimeApi; use sp_api::ProvideRuntimeApi;
use sp_runtime::ConsensusEngineId;
use std::collections::{hash_map::{Entry, HashMap}, HashSet}; use std::collections::{hash_map::{Entry, HashMap}, HashSet};
use std::pin::Pin; use std::pin::Pin;
use std::sync::{Arc, Weak}; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use super::{cost, benefit, PolkadotNetworkService}; use super::{cost, benefit, PolkadotNetworkService};
@@ -58,12 +59,15 @@ pub const VERSION: u32 = 1;
pub const MIN_SUPPORTED_VERSION: u32 = 1; pub const MIN_SUPPORTED_VERSION: u32 = 1;
/// The engine ID of the polkadot network protocol. /// The engine ID of the polkadot network protocol.
pub const POLKADOT_ENGINE_ID: sp_runtime::ConsensusEngineId = *b"dot2"; pub const POLKADOT_ENGINE_ID: ConsensusEngineId = *b"dot2";
/// The protocol name. /// The protocol name.
pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/1"; pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/1";
pub use crate::legacy::gossip::ChainContext; pub use crate::legacy::gossip::ChainContext;
#[cfg(test)]
mod tests;
// Messages from the service API or network adapter. // Messages from the service API or network adapter.
enum ServiceToWorkerMsg { enum ServiceToWorkerMsg {
// basic peer messages. // basic peer messages.
@@ -72,16 +76,14 @@ enum ServiceToWorkerMsg {
PeerDisconnected(PeerId), PeerDisconnected(PeerId),
// service messages. // service messages.
BuildConsensusNetworking(Arc<SharedTable>, Vec<ValidatorId>, oneshot::Sender<Router>), BuildConsensusNetworking(Arc<SharedTable>, Vec<ValidatorId>),
DropConsensusNetworking(Hash), DropConsensusNetworking(Hash),
SubmitValidatedCollation( SubmitValidatedCollation(
Hash, // relay-parent
AbridgedCandidateReceipt, AbridgedCandidateReceipt,
PoVBlock, PoVBlock,
(ValidatorIndex, Vec<ErasureChunk>), (ValidatorIndex, Vec<ErasureChunk>),
), ),
FetchPoVBlock( FetchPoVBlock(
Hash, // relay-parent
AbridgedCandidateReceipt, AbridgedCandidateReceipt,
oneshot::Sender<PoVBlock>, oneshot::Sender<PoVBlock>,
), ),
@@ -113,13 +115,87 @@ enum ServiceToWorkerMsg {
Hash, // relay-parent, Hash, // relay-parent,
oneshot::Sender<Pin<Box<dyn Stream<Item = SignedStatement> + Send>>>, oneshot::Sender<Pin<Box<dyn Stream<Item = SignedStatement> + Send>>>,
), ),
/// Used in tests to ensure that all other messages sent from the same
/// thread have been flushed. Also executes arbitrary logic with the protocl
/// handler.
#[cfg(test)]
Synchronize(Box<dyn FnOnce(&mut ProtocolHandler) + Send>),
}
/// Messages from a background task to the main worker task.
enum BackgroundToWorkerMsg {
// Spawn a given future.
Spawn(future::BoxFuture<'static, ()>),
}
/// Operations that a handle to an underlying network service should provide.
trait NetworkServiceOps: Send + Sync {
/// Report the peer as having a particular positive or negative value.
fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange);
/// Write a notification to a given peer.
fn write_notification(
&self,
peer: PeerId,
engine_id: ConsensusEngineId,
notification: Vec<u8>,
);
}
impl NetworkServiceOps for PolkadotNetworkService {
fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange) {
PolkadotNetworkService::report_peer(self, peer, value);
}
fn write_notification(
&self,
peer: PeerId,
engine_id: ConsensusEngineId,
notification: Vec<u8>,
) {
PolkadotNetworkService::write_notification(self, peer, engine_id, notification);
}
}
/// Operations that a handle to a gossip network should provide.
trait GossipOps: Clone + Send + crate::legacy::GossipService + 'static {
fn new_local_leaf(
&self,
relay_parent: Hash,
validation_data: crate::legacy::gossip::MessageValidationData,
) -> crate::legacy::gossip::NewLeafActions;
/// Register an availability store in the gossip service to evaluate incoming
/// messages with.
fn register_availability_store(
&self,
store: av_store::Store,
);
}
impl GossipOps for RegisteredMessageValidator {
fn new_local_leaf(
&self,
relay_parent: Hash,
validation_data: crate::legacy::gossip::MessageValidationData,
) -> crate::legacy::gossip::NewLeafActions {
RegisteredMessageValidator::new_local_leaf(self, relay_parent, validation_data)
}
fn register_availability_store(
&self,
store: av_store::Store,
) {
RegisteredMessageValidator::register_availability_store(self, store);
}
} }
/// An async handle to the network service. /// An async handle to the network service.
#[derive(Clone)] #[derive(Clone)]
pub struct Service { pub struct Service {
sender: mpsc::Sender<ServiceToWorkerMsg>, sender: mpsc::Sender<ServiceToWorkerMsg>,
network_service: Arc<PolkadotNetworkService>, network_service: Arc<dyn NetworkServiceOps>,
} }
/// Registers the protocol. /// Registers the protocol.
@@ -153,7 +229,6 @@ pub fn start<C, Api, SP>(
config, config,
service.clone(), service.clone(),
gossip_validator, gossip_validator,
worker_sender.clone(),
api, api,
worker_receiver, worker_receiver,
executor.clone(), executor.clone(),
@@ -305,6 +380,28 @@ struct ConsensusNetworkingInstance {
_drop_signal: exit_future::Signal, _drop_signal: exit_future::Signal,
} }
/// A utility future that resolves when the receiving end of a channel has hung up.
///
/// This is an `.await`-friendly interface around `poll_canceled`.
// TODO: remove in favor of https://github.com/rust-lang/futures-rs/pull/2092/
// once published.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct AwaitCanceled<'a, T> {
inner: &'a mut oneshot::Sender<T>,
}
impl<T> Future for AwaitCanceled<'_, T> {
type Output = ();
fn poll(
mut self: Pin<&mut Self>,
cx: &mut futures::task::Context<'_>,
) -> futures::task::Poll<()> {
self.inner.poll_canceled(cx)
}
}
/// Protocol configuration. /// Protocol configuration.
#[derive(Default)] #[derive(Default)]
pub struct Config { pub struct Config {
@@ -356,33 +453,37 @@ impl RecentValidatorIds {
} }
struct ProtocolHandler { struct ProtocolHandler {
service: Arc<PolkadotNetworkService>, service: Arc<dyn NetworkServiceOps>,
peers: HashMap<PeerId, PeerData>, peers: HashMap<PeerId, PeerData>,
// reverse mapping from validator-ID to PeerID. Multiple peers can represent // reverse mapping from validator-ID to PeerID. Multiple peers can represent
// the same validator because of sentry nodes. // the same validator because of sentry nodes.
connected_validators: HashMap<ValidatorId, HashSet<PeerId>>, connected_validators: HashMap<ValidatorId, HashSet<PeerId>>,
consensus_instances: HashMap<Hash, ConsensusNetworkingInstance>,
collators: crate::legacy::collator_pool::CollatorPool, collators: crate::legacy::collator_pool::CollatorPool,
local_collations: crate::legacy::local_collations::LocalCollations<Collation>, local_collations: crate::legacy::local_collations::LocalCollations<Collation>,
config: Config, config: Config,
local_keys: RecentValidatorIds,
} }
impl ProtocolHandler { impl ProtocolHandler {
fn new( fn new(
service: Arc<PolkadotNetworkService>, service: Arc<dyn NetworkServiceOps>,
config: Config, config: Config,
) -> Self { ) -> Self {
ProtocolHandler { ProtocolHandler {
service, service,
peers: HashMap::new(), peers: HashMap::new(),
connected_validators: HashMap::new(), connected_validators: HashMap::new(),
consensus_instances: HashMap::new(),
collators: Default::default(), collators: Default::default(),
local_collations: Default::default(), local_collations: Default::default(),
local_keys: Default::default(),
config, config,
} }
} }
fn on_connect(&mut self, peer: PeerId, roles: Roles) { fn on_connect(&mut self, peer: PeerId, roles: Roles) {
let claimed_validator = roles.contains(sc_network::config::Roles::AUTHORITY); let claimed_validator = roles.contains(Roles::AUTHORITY);
self.peers.insert(peer.clone(), PeerData { self.peers.insert(peer.clone(), PeerData {
claimed_validator, claimed_validator,
@@ -586,6 +687,7 @@ impl ProtocolHandler {
if let Some(invalidated) = invalidated_key { if let Some(invalidated) = invalidated_key {
self.validator_representative_removed(invalidated, &remote); self.validator_representative_removed(invalidated, &remote);
} }
self.connected_validators.entry(key).or_insert_with(HashSet::new).insert(remote.clone());
send_peer_collations(&*self.service, remote, collations_to_send); send_peer_collations(&*self.service, remote, collations_to_send);
} }
@@ -658,10 +760,15 @@ impl ProtocolHandler {
} }
} }
} }
fn drop_consensus_networking(&mut self, relay_parent: &Hash) {
// this triggers an abort of the background task.
self.consensus_instances.remove(relay_parent);
}
} }
fn send_peer_collations( fn send_peer_collations(
service: &PolkadotNetworkService, service: &dyn NetworkServiceOps,
remote: PeerId, remote: PeerId,
collations: impl IntoIterator<Item=(Hash, Collation)>, collations: impl IntoIterator<Item=(Hash, Collation)>,
) { ) {
@@ -674,102 +781,90 @@ fn send_peer_collations(
} }
} }
async fn worker_loop<Api, Sp>( struct Worker<Api, Sp, Gossip> {
config: Config, protocol_handler: ProtocolHandler,
service: Arc<PolkadotNetworkService>,
gossip_handle: RegisteredMessageValidator,
sender: mpsc::Sender<ServiceToWorkerMsg>,
api: Arc<Api>, api: Arc<Api>,
mut receiver: mpsc::Receiver<ServiceToWorkerMsg>,
executor: Sp, executor: Sp,
) where gossip_handle: Gossip,
background_to_main_sender: mpsc::Sender<BackgroundToWorkerMsg>,
background_receiver: mpsc::Receiver<BackgroundToWorkerMsg>,
service_receiver: mpsc::Receiver<ServiceToWorkerMsg>,
}
impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static, Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>, Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
Sp: Spawn + Clone + Send + 'static, Sp: Spawn + Clone,
Gossip: GossipOps,
{ {
const COLLECT_GARBAGE_INTERVAL: Duration = Duration::from_secs(29); // spawns a background task to spawn consensus networking.
fn build_consensus_networking(
&mut self,
table: Arc<SharedTable>,
authorities: Vec<ValidatorId>,
) {
// glue: let gossip know about our new local leaf.
let relay_parent = table.consensus_parent_hash().clone();
let (signal, exit) = exit_future::signal();
let mut protocol_handler = ProtocolHandler::new(service, config); let key = table.session_key();
let mut consensus_instances = HashMap::new(); if let Some(key) = key {
let mut local_keys = RecentValidatorIds::default(); if let InsertedRecentKey::New(_) = self.protocol_handler.local_keys.insert(key.clone()) {
self.protocol_handler.distribute_new_session_key(key);
let mut collect_garbage = stream::unfold((), move |_| {
futures_timer::Delay::new(COLLECT_GARBAGE_INTERVAL).map(|_| Some(((), ())))
}).map(drop);
loop {
let message = match future::select(receiver.next(), collect_garbage.next()).await {
Either::Left((None, _)) | Either::Right((None, _)) => break,
Either::Left((Some(message), _)) => message,
Either::Right(_) => {
protocol_handler.collect_garbage();
continue
} }
}; }
let new_leaf_actions = self.gossip_handle.new_local_leaf(
relay_parent,
crate::legacy::gossip::MessageValidationData { authorities },
);
new_leaf_actions.perform(&self.gossip_handle);
self.protocol_handler.consensus_instances.insert(
relay_parent,
ConsensusNetworkingInstance {
statement_table: table.clone(),
relay_parent,
attestation_topic: crate::legacy::gossip::attestation_topic(relay_parent),
_drop_signal: signal,
},
);
// glue the incoming messages, shared table, and validation
// work together.
let _ = self.executor.spawn(statement_import_loop(
relay_parent,
table,
self.api.clone(),
self.gossip_handle.clone(),
self.background_to_main_sender.clone(),
exit,
));
}
fn handle_service_message(&mut self, message: ServiceToWorkerMsg) {
match message { match message {
ServiceToWorkerMsg::PeerConnected(remote, roles) => { ServiceToWorkerMsg::PeerConnected(remote, roles) => {
protocol_handler.on_connect(remote, roles); self.protocol_handler.on_connect(remote, roles);
} }
ServiceToWorkerMsg::PeerDisconnected(remote) => { ServiceToWorkerMsg::PeerDisconnected(remote) => {
protocol_handler.on_disconnect(remote); self.protocol_handler.on_disconnect(remote);
} }
ServiceToWorkerMsg::PeerMessage(remote, messages) => { ServiceToWorkerMsg::PeerMessage(remote, messages) => {
protocol_handler.on_raw_messages(remote, messages) self.protocol_handler.on_raw_messages(remote, messages)
} }
ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities, router_sender) => { ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities) => {
// glue: let gossip know about our new local leaf. self.build_consensus_networking(table, authorities);
let relay_parent = table.consensus_parent_hash().clone();
let (signal, exit) = exit_future::signal();
let router = Router {
inner: Arc::new(RouterInner { relay_parent, sender: sender.clone() }),
};
let key = table.session_key();
if let Some(key) = key {
if let InsertedRecentKey::New(_) = local_keys.insert(key.clone()) {
protocol_handler.distribute_new_session_key(key);
}
}
let new_leaf_actions = gossip_handle.new_local_leaf(
relay_parent,
crate::legacy::gossip::MessageValidationData { authorities },
);
new_leaf_actions.perform(&gossip_handle);
consensus_instances.insert(relay_parent, ConsensusNetworkingInstance {
statement_table: table.clone(),
relay_parent,
attestation_topic: crate::legacy::gossip::attestation_topic(relay_parent),
_drop_signal: signal,
});
let weak_router = Arc::downgrade(&router.inner);
// glue the incoming messages, shared table, and validation
// work together.
let _ = executor.spawn(statement_import_loop(
relay_parent,
table,
api.clone(),
weak_router,
gossip_handle.clone(),
exit,
executor.clone(),
));
let _ = router_sender.send(router);
} }
ServiceToWorkerMsg::DropConsensusNetworking(relay_parent) => { ServiceToWorkerMsg::DropConsensusNetworking(relay_parent) => {
consensus_instances.remove(&relay_parent); self.protocol_handler.drop_consensus_networking(&relay_parent);
} }
ServiceToWorkerMsg::SubmitValidatedCollation(relay_parent, receipt, pov_block, chunks) => { ServiceToWorkerMsg::SubmitValidatedCollation(receipt, pov_block, chunks) => {
let instance = match consensus_instances.get(&relay_parent) { let relay_parent = receipt.relay_parent;
None => continue, let instance = match self.protocol_handler.consensus_instances.get(&relay_parent) {
None => return,
Some(instance) => instance, Some(instance) => instance,
}; };
@@ -778,21 +873,21 @@ async fn worker_loop<Api, Sp>(
receipt, receipt,
pov_block, pov_block,
chunks, chunks,
&gossip_handle, &self.gossip_handle,
); );
} }
ServiceToWorkerMsg::FetchPoVBlock(_relay_parent, _candidate, _sender) => { ServiceToWorkerMsg::FetchPoVBlock(_candidate, _sender) => {
// TODO https://github.com/paritytech/polkadot/issues/742: // TODO https://github.com/paritytech/polkadot/issues/742:
// create a filter on gossip for it and send to sender. // create a filter on gossip for it and send to sender.
} }
ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, sender) => { ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, mut sender) => {
let topic = crate::erasure_coding_topic(&candidate_hash); let topic = crate::erasure_coding_topic(&candidate_hash);
// for every erasure-root, relay-parent pair, there should only be one // for every erasure-root, relay-parent pair, there should only be one
// valid chunk with the given index. // valid chunk with the given index.
// //
// so we only care about the first item of the filtered stream. // so we only care about the first item of the filtered stream.
let get_msg = gossip_handle.gossip_messages_for(topic) let get_msg = self.gossip_handle.gossip_messages_for(topic)
.filter_map(move |(msg, _)| { .filter_map(move |(msg, _)| {
future::ready(match msg { future::ready(match msg {
GossipMessage::ErasureChunk(chunk) => GossipMessage::ErasureChunk(chunk) =>
@@ -809,14 +904,16 @@ async fn worker_loop<Api, Sp>(
"gossip message streams do not conclude early; qed" "gossip message streams do not conclude early; qed"
)); ));
let _ = executor.spawn(async move { let _ = self.executor.spawn(async move {
let chunk = get_msg.await; let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
let _ = sender.send(chunk); if let Either::Left((chunk, _)) = res {
let _ = sender.send(chunk);
}
}); });
} }
ServiceToWorkerMsg::DistributeErasureChunk(candidate_hash, erasure_chunk) => { ServiceToWorkerMsg::DistributeErasureChunk(candidate_hash, erasure_chunk) => {
let topic = crate::erasure_coding_topic(&candidate_hash); let topic = crate::erasure_coding_topic(&candidate_hash);
gossip_handle.gossip_message( self.gossip_handle.gossip_message(
topic, topic,
GossipMessage::ErasureChunk(ErasureChunkMessage { GossipMessage::ErasureChunk(ErasureChunkMessage {
chunk: erasure_chunk, chunk: erasure_chunk,
@@ -826,20 +923,20 @@ async fn worker_loop<Api, Sp>(
} }
ServiceToWorkerMsg::AwaitCollation(relay_parent, para_id, sender) => { ServiceToWorkerMsg::AwaitCollation(relay_parent, para_id, sender) => {
debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent); debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent);
protocol_handler.await_collation(relay_parent, para_id, sender) self.protocol_handler.await_collation(relay_parent, para_id, sender)
} }
ServiceToWorkerMsg::NoteBadCollator(collator) => { ServiceToWorkerMsg::NoteBadCollator(collator) => {
protocol_handler.note_bad_collator(collator); self.protocol_handler.note_bad_collator(collator);
} }
ServiceToWorkerMsg::RegisterAvailabilityStore(store) => { ServiceToWorkerMsg::RegisterAvailabilityStore(store) => {
gossip_handle.register_availability_store(store); self.gossip_handle.register_availability_store(store);
} }
ServiceToWorkerMsg::OurCollation(targets, collation) => { ServiceToWorkerMsg::OurCollation(targets, collation) => {
protocol_handler.distribute_our_collation(targets, collation); self.protocol_handler.distribute_our_collation(targets, collation);
} }
ServiceToWorkerMsg::ListenCheckedStatements(relay_parent, sender) => { ServiceToWorkerMsg::ListenCheckedStatements(relay_parent, sender) => {
let topic = crate::legacy::gossip::attestation_topic(relay_parent); let topic = crate::legacy::gossip::attestation_topic(relay_parent);
let checked_messages = gossip_handle.gossip_messages_for(topic) let checked_messages = self.gossip_handle.gossip_messages_for(topic)
.filter_map(|msg| match msg.0 { .filter_map(|msg| match msg.0 {
GossipMessage::Statement(s) => future::ready(Some(s.signed_statement)), GossipMessage::Statement(s) => future::ready(Some(s.signed_statement)),
_ => future::ready(None), _ => future::ready(None),
@@ -848,8 +945,72 @@ async fn worker_loop<Api, Sp>(
let _ = sender.send(checked_messages); let _ = sender.send(checked_messages);
} }
#[cfg(test)]
ServiceToWorkerMsg::Synchronize(callback) => {
(callback)(&mut self.protocol_handler)
}
} }
} }
fn handle_background_message(&mut self, message: BackgroundToWorkerMsg) {
match message {
BackgroundToWorkerMsg::Spawn(task) => {
let _ = self.executor.spawn(task);
}
}
}
async fn main_loop(&mut self) {
const COLLECT_GARBAGE_INTERVAL: Duration = Duration::from_secs(29);
let mut collect_garbage = stream::unfold((), move |_| {
futures_timer::Delay::new(COLLECT_GARBAGE_INTERVAL).map(|_| Some(((), ())))
}).map(drop);
loop {
futures::select! {
_do_collect = collect_garbage.next() => {
self.protocol_handler.collect_garbage();
}
service_msg = self.service_receiver.next() => match service_msg {
Some(msg) => self.handle_service_message(msg),
None => return,
},
background_msg = self.background_receiver.next() => match background_msg {
Some(msg) => self.handle_background_message(msg),
None => return,
},
}
}
}
}
async fn worker_loop<Api, Sp>(
config: Config,
service: Arc<dyn NetworkServiceOps>,
gossip_handle: impl GossipOps,
api: Arc<Api>,
receiver: mpsc::Receiver<ServiceToWorkerMsg>,
executor: Sp,
) where
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
Sp: Spawn + Clone,
{
const BACKGROUND_TO_MAIN_BUF: usize = 16;
let (background_tx, background_rx) = mpsc::channel(BACKGROUND_TO_MAIN_BUF);
let mut worker = Worker {
protocol_handler: ProtocolHandler::new(service, config),
api,
executor,
gossip_handle,
background_to_main_sender: background_tx,
background_receiver: background_rx,
service_receiver: receiver,
};
worker.main_loop().await
} }
// A unique trace for valid statements issued by a validator. // A unique trace for valid statements issued by a validator.
@@ -917,10 +1078,9 @@ async fn statement_import_loop<Api>(
relay_parent: Hash, relay_parent: Hash,
table: Arc<SharedTable>, table: Arc<SharedTable>,
api: Arc<Api>, api: Arc<Api>,
weak_router: Weak<RouterInner>, gossip_handle: impl GossipOps,
gossip_handle: RegisteredMessageValidator, mut to_worker: mpsc::Sender<BackgroundToWorkerMsg>,
mut exit: exit_future::Exit, mut exit: exit_future::Exit,
executor: impl Spawn,
) where ) where
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static, Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>, Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
@@ -972,14 +1132,16 @@ async fn statement_import_loop<Api>(
statements.insert(0, statement); statements.insert(0, statement);
let producers: Vec<_> = { let producers: Vec<_> = {
// create a temporary router handle for importing all of these statements // TODO: fetch these from gossip.
let temp_router = match weak_router.upgrade() { // https://github.com/paritytech/polkadot/issues/742
None => break, fn ignore_pov_fetch_requests(_: &AbridgedCandidateReceipt)
Some(inner) => Router { inner }, -> future::Pending<Result<PoVBlock, std::io::Error>>
}; {
future::pending()
}
table.import_remote_statements( table.import_remote_statements(
&temp_router, &ignore_pov_fetch_requests,
statements.iter().cloned(), statements.iter().cloned(),
) )
}; };
@@ -1015,7 +1177,14 @@ async fn statement_import_loop<Api>(
}); });
let work = future::select(work.boxed(), exit.clone()).map(drop); let work = future::select(work.boxed(), exit.clone()).map(drop);
let _ = executor.spawn(work); if let Err(_) = to_worker.send(
BackgroundToWorkerMsg::Spawn(work.boxed())
).await {
// can fail only if remote has hung up - worker is dead,
// we should die too. this is defensive, since the exit future
// would fire shortly anyway.
return
}
} }
} }
} }
@@ -1030,7 +1199,7 @@ fn distribute_validated_collation(
receipt: AbridgedCandidateReceipt, receipt: AbridgedCandidateReceipt,
pov_block: PoVBlock, pov_block: PoVBlock,
chunks: (ValidatorIndex, Vec<ErasureChunk>), chunks: (ValidatorIndex, Vec<ErasureChunk>),
gossip_handle: &RegisteredMessageValidator, gossip_handle: &impl GossipOps,
) { ) {
// produce a signed statement. // produce a signed statement.
let hash = receipt.hash(); let hash = receipt.hash();
@@ -1144,7 +1313,7 @@ impl Service {
} }
impl ParachainNetwork for Service { impl ParachainNetwork for Service {
type Error = future::Either<mpsc::SendError, oneshot::Canceled>; type Error = mpsc::SendError;
type TableRouter = Router; type TableRouter = Router;
type BuildTableRouter = Pin<Box<dyn Future<Output=Result<Router,Self::Error>> + Send>>; type BuildTableRouter = Pin<Box<dyn Future<Output=Result<Router,Self::Error>> + Send>>;
@@ -1155,14 +1324,19 @@ impl ParachainNetwork for Service {
) -> Self::BuildTableRouter { ) -> Self::BuildTableRouter {
let authorities = authorities.to_vec(); let authorities = authorities.to_vec();
let mut sender = self.sender.clone(); let mut sender = self.sender.clone();
let relay_parent = table.consensus_parent_hash().clone();
let (tx, rx) = oneshot::channel();
Box::pin(async move { Box::pin(async move {
sender.send( sender.send(
ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities, tx) ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities)
).map_err(future::Either::Left).await?; ).await?;
rx.map_err(future::Either::Right).await Ok(Router {
inner: Arc::new(RouterInner {
relay_parent,
sender,
})
})
}) })
} }
} }
@@ -1228,6 +1402,8 @@ pub enum RouterError {
Canceled(oneshot::Canceled), Canceled(oneshot::Canceled),
#[display(fmt = "Could not reach worker with request: {}", _0)] #[display(fmt = "Could not reach worker with request: {}", _0)]
SendError(mpsc::SendError), SendError(mpsc::SendError),
#[display(fmt = "Provided candidate receipt does not have expected relay parent {}", _0)]
IncorrectRelayParent(Hash),
} }
impl TableRouter for Router { impl TableRouter for Router {
@@ -1241,8 +1417,13 @@ impl TableRouter for Router {
pov_block: PoVBlock, pov_block: PoVBlock,
chunks: (ValidatorIndex, &[ErasureChunk]), chunks: (ValidatorIndex, &[ErasureChunk]),
) -> Self::SendLocalCollation { ) -> Self::SendLocalCollation {
if receipt.relay_parent != self.inner.relay_parent {
return Box::pin(
future::ready(Err(RouterError::IncorrectRelayParent(self.inner.relay_parent)))
);
}
let message = ServiceToWorkerMsg::SubmitValidatedCollation( let message = ServiceToWorkerMsg::SubmitValidatedCollation(
self.inner.relay_parent.clone(),
receipt, receipt,
pov_block, pov_block,
(chunks.0, chunks.1.to_vec()), (chunks.0, chunks.1.to_vec()),
@@ -1254,9 +1435,14 @@ impl TableRouter for Router {
} }
fn fetch_pov_block(&self, candidate: &AbridgedCandidateReceipt) -> Self::FetchValidationProof { fn fetch_pov_block(&self, candidate: &AbridgedCandidateReceipt) -> Self::FetchValidationProof {
if candidate.relay_parent != self.inner.relay_parent {
return Box::pin(
future::ready(Err(RouterError::IncorrectRelayParent(self.inner.relay_parent)))
);
}
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let message = ServiceToWorkerMsg::FetchPoVBlock( let message = ServiceToWorkerMsg::FetchPoVBlock(
self.inner.relay_parent.clone(),
candidate.clone(), candidate.clone(),
tx, tx,
); );
@@ -1268,24 +1454,3 @@ impl TableRouter for Router {
}) })
} }
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn router_inner_drop_sends_worker_message() {
let parent = [1; 32].into();
let (sender, mut receiver) = mpsc::channel(0);
drop(RouterInner {
relay_parent: parent,
sender,
});
match receiver.try_next() {
Ok(Some(ServiceToWorkerMsg::DropConsensusNetworking(x))) => assert_eq!(parent, x),
_ => panic!("message not sent"),
}
}
}
+573
View File
@@ -0,0 +1,573 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//! Tests for the protocol.
use super::*;
use parking_lot::Mutex;
use polkadot_primitives::{Block, Header, BlockId};
use polkadot_primitives::parachain::{
Id as ParaId, Chain, DutyRoster, ParachainHost, ValidatorId,
Retriable, CollatorId, AbridgedCandidateReceipt,
GlobalValidationSchedule, LocalValidationData, ErasureChunk,
};
use polkadot_validation::SharedTable;
use av_store::{Store as AvailabilityStore, ErasureNetworking};
use sc_network_gossip::TopicNotification;
use sp_blockchain::Result as ClientResult;
use sp_api::{ApiRef, Core, RuntimeVersion, StorageProof, ApiErrorExt, ApiExt, ProvideRuntimeApi};
use sp_runtime::traits::{Block as BlockT, HashFor, NumberFor};
use sp_state_machine::ChangesTrieState;
use sp_core::{crypto::Pair, NativeOrEncoded, ExecutionContext};
use sp_keyring::Sr25519Keyring;
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
#[derive(Default)]
struct MockNetworkOps {
recorded: Mutex<Recorded>,
}
#[derive(Default)]
struct Recorded {
peer_reputations: HashMap<PeerId, i32>,
notifications: Vec<(PeerId, Message)>,
}
// Test setup registers receivers of gossip messages as well as signals that
// fire when they are taken.
type GossipStreamEntry = (mpsc::UnboundedReceiver<TopicNotification>, oneshot::Sender<()>);
#[derive(Default, Clone)]
struct MockGossip {
inner: Arc<Mutex<HashMap<Hash, GossipStreamEntry>>>,
}
impl MockGossip {
fn add_gossip_stream(&self, topic: Hash)
-> (mpsc::UnboundedSender<TopicNotification>, oneshot::Receiver<()>)
{
let (tx, rx) = mpsc::unbounded();
let (o_tx, o_rx) = oneshot::channel();
self.inner.lock().insert(topic, (rx, o_tx));
(tx, o_rx)
}
fn contains_listener(&self, topic: &Hash) -> bool {
self.inner.lock().contains_key(topic)
}
}
impl NetworkServiceOps for MockNetworkOps {
fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange) {
let mut recorded = self.recorded.lock();
let total_rep = recorded.peer_reputations.entry(peer).or_insert(0);
*total_rep = total_rep.saturating_add(value.value);
}
fn write_notification(
&self,
peer: PeerId,
engine_id: ConsensusEngineId,
notification: Vec<u8>,
) {
assert_eq!(engine_id, POLKADOT_ENGINE_ID);
let message = Message::decode(&mut &notification[..]).expect("invalid notification");
self.recorded.lock().notifications.push((peer, message));
}
}
impl crate::legacy::GossipService for MockGossip {
fn gossip_messages_for(&self, topic: Hash) -> crate::legacy::GossipMessageStream {
crate::legacy::GossipMessageStream::new(match self.inner.lock().remove(&topic) {
None => Box::pin(stream::empty()),
Some((rx, o_rx)) => {
let _ = o_rx.send(());
Box::pin(rx)
}
})
}
fn gossip_message(&self, _topic: Hash, _message: GossipMessage) {
}
fn send_message(&self, _who: PeerId, _message: GossipMessage) {
}
}
impl GossipOps for MockGossip {
fn new_local_leaf(
&self,
_relay_parent: Hash,
_validation_data: crate::legacy::gossip::MessageValidationData,
) -> crate::legacy::gossip::NewLeafActions {
crate::legacy::gossip::NewLeafActions::new()
}
fn register_availability_store(
&self,
_store: av_store::Store,
) {
}
}
#[derive(Default)]
struct ApiData {
validators: Vec<ValidatorId>,
duties: Vec<Chain>,
active_parachains: Vec<(ParaId, Option<(CollatorId, Retriable)>)>,
}
#[derive(Default, Clone)]
struct TestApi {
data: Arc<Mutex<ApiData>>,
}
#[derive(Default)]
struct RuntimeApi {
data: Arc<Mutex<ApiData>>,
}
impl ProvideRuntimeApi<Block> for TestApi {
type Api = RuntimeApi;
fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> {
RuntimeApi { data: self.data.clone() }.into()
}
}
impl Core<Block> for RuntimeApi {
fn Core_version_runtime_api_impl(
&self,
_: &BlockId,
_: ExecutionContext,
_: Option<()>,
_: Vec<u8>,
) -> ClientResult<NativeOrEncoded<RuntimeVersion>> {
unimplemented!("Not required for testing!")
}
fn Core_execute_block_runtime_api_impl(
&self,
_: &BlockId,
_: ExecutionContext,
_: Option<Block>,
_: Vec<u8>,
) -> ClientResult<NativeOrEncoded<()>> {
unimplemented!("Not required for testing!")
}
fn Core_initialize_block_runtime_api_impl(
&self,
_: &BlockId,
_: ExecutionContext,
_: Option<&Header>,
_: Vec<u8>,
) -> ClientResult<NativeOrEncoded<()>> {
unimplemented!("Not required for testing!")
}
}
impl ApiErrorExt for RuntimeApi {
type Error = sp_blockchain::Error;
}
impl ApiExt<Block> for RuntimeApi {
type StateBackend = sp_state_machine::InMemoryBackend<sp_api::HashFor<Block>>;
fn map_api_result<F: FnOnce(&Self) -> Result<R, E>, R, E>(
&self,
_: F
) -> Result<R, E> {
unimplemented!("Not required for testing!")
}
fn runtime_version_at(&self, _: &BlockId) -> ClientResult<RuntimeVersion> {
unimplemented!("Not required for testing!")
}
fn record_proof(&mut self) { }
fn extract_proof(&mut self) -> Option<StorageProof> {
None
}
fn into_storage_changes(
&self,
_: &Self::StateBackend,
_: Option<&ChangesTrieState<HashFor<Block>, NumberFor<Block>>>,
_: <Block as sp_api::BlockT>::Hash,
) -> std::result::Result<sp_api::StorageChanges<Self::StateBackend, Block>, String>
where Self: Sized
{
unimplemented!("Not required for testing!")
}
}
impl ParachainHost<Block> for RuntimeApi {
fn ParachainHost_validators_runtime_api_impl(
&self,
_at: &BlockId,
_: ExecutionContext,
_: Option<()>,
_: Vec<u8>,
) -> ClientResult<NativeOrEncoded<Vec<ValidatorId>>> {
Ok(NativeOrEncoded::Native(self.data.lock().validators.clone()))
}
fn ParachainHost_duty_roster_runtime_api_impl(
&self,
_at: &BlockId,
_: ExecutionContext,
_: Option<()>,
_: Vec<u8>,
) -> ClientResult<NativeOrEncoded<DutyRoster>> {
Ok(NativeOrEncoded::Native(DutyRoster {
validator_duty: self.data.lock().duties.clone(),
}))
}
fn ParachainHost_active_parachains_runtime_api_impl(
&self,
_at: &BlockId,
_: ExecutionContext,
_: Option<()>,
_: Vec<u8>,
) -> ClientResult<NativeOrEncoded<Vec<(ParaId, Option<(CollatorId, Retriable)>)>>> {
Ok(NativeOrEncoded::Native(self.data.lock().active_parachains.clone()))
}
fn ParachainHost_parachain_code_runtime_api_impl(
&self,
_at: &BlockId,
_: ExecutionContext,
_: Option<ParaId>,
_: Vec<u8>,
) -> ClientResult<NativeOrEncoded<Option<Vec<u8>>>> {
Ok(NativeOrEncoded::Native(Some(Vec::new())))
}
fn ParachainHost_global_validation_schedule_runtime_api_impl(
&self,
_at: &BlockId,
_: ExecutionContext,
_: Option<()>,
_: Vec<u8>,
) -> ClientResult<NativeOrEncoded<GlobalValidationSchedule>> {
Ok(NativeOrEncoded::Native(Default::default()))
}
fn ParachainHost_local_validation_data_runtime_api_impl(
&self,
_at: &BlockId,
_: ExecutionContext,
_: Option<ParaId>,
_: Vec<u8>,
) -> ClientResult<NativeOrEncoded<Option<LocalValidationData>>> {
Ok(NativeOrEncoded::Native(Some(Default::default())))
}
fn ParachainHost_get_heads_runtime_api_impl(
&self,
_at: &BlockId,
_: ExecutionContext,
_extrinsics: Option<Vec<<Block as BlockT>::Extrinsic>>,
_: Vec<u8>,
) -> ClientResult<NativeOrEncoded<Option<Vec<AbridgedCandidateReceipt>>>> {
Ok(NativeOrEncoded::Native(Some(Vec::new())))
}
}
impl super::Service {
async fn connect_peer(&mut self, peer: PeerId, roles: Roles) {
self.sender.send(ServiceToWorkerMsg::PeerConnected(peer, roles)).await.unwrap();
}
async fn peer_message(&mut self, peer: PeerId, message: Message) {
let bytes = message.encode().into();
self.sender.send(ServiceToWorkerMsg::PeerMessage(peer, vec![bytes])).await.unwrap();
}
async fn disconnect_peer(&mut self, peer: PeerId) {
self.sender.send(ServiceToWorkerMsg::PeerDisconnected(peer)).await.unwrap();
}
async fn synchronize<T: Send + 'static>(
&mut self,
callback: impl FnOnce(&mut ProtocolHandler) -> T + Send + 'static,
) -> T {
let (tx, rx) = oneshot::channel();
let msg = ServiceToWorkerMsg::Synchronize(Box::new(move |proto| {
let res = callback(proto);
if let Err(_) = tx.send(res) {
log::warn!(target: "p_net", "Failed to send synchronization result");
}
}));
self.sender.send(msg).await.expect("Worker thread unexpectedly hung up");
rx.await.expect("Worker thread failed to send back result")
}
}
fn test_setup(config: Config) -> (
Service,
MockGossip,
LocalPool,
impl Future<Output = ()> + 'static,
) {
let pool = LocalPool::new();
let network_ops = Arc::new(MockNetworkOps::default());
let mock_gossip = MockGossip::default();
let (worker_tx, worker_rx) = mpsc::channel(0);
let api = Arc::new(TestApi::default());
let worker_task = worker_loop(
config,
network_ops.clone(),
mock_gossip.clone(),
api.clone(),
worker_rx,
pool.spawner(),
);
let service = Service {
sender: worker_tx,
network_service: network_ops,
};
(service, mock_gossip, pool, worker_task)
}
#[test]
fn router_inner_drop_sends_worker_message() {
let parent = [1; 32].into();
let (sender, mut receiver) = mpsc::channel(0);
drop(RouterInner {
relay_parent: parent,
sender,
});
match receiver.try_next() {
Ok(Some(ServiceToWorkerMsg::DropConsensusNetworking(x))) => assert_eq!(parent, x),
_ => panic!("message not sent"),
}
}
#[test]
fn worker_task_shuts_down_when_sender_dropped() {
let (service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
drop(service);
let _ = pool.run_until(worker_task);
}
#[test]
fn consensus_instances_cleaned_up() {
let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
let relay_parent = [0; 32].into();
let authorities = Vec::new();
let table = Arc::new(SharedTable::new(
Vec::new(),
HashMap::new(),
None,
relay_parent,
AvailabilityStore::new_in_memory(service.clone()),
None,
));
pool.spawner().spawn_local(worker_task).unwrap();
let router = pool.run_until(
service.build_table_router(table, &authorities)
).unwrap();
drop(router);
assert!(pool.run_until(service.synchronize(move |proto| {
!proto.consensus_instances.contains_key(&relay_parent)
})));
}
#[test]
fn validator_peer_cleaned_up() {
let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
let peer = PeerId::random();
let validator_key = Sr25519Keyring::Alice.pair();
let validator_id = ValidatorId::from(validator_key.public());
pool.spawner().spawn_local(worker_task).unwrap();
pool.run_until(async move {
service.connect_peer(peer.clone(), Roles::AUTHORITY).await;
service.peer_message(peer.clone(), Message::Status(Status {
version: VERSION,
collating_for: None,
})).await;
service.peer_message(peer.clone(), Message::ValidatorId(validator_id.clone())).await;
let p = peer.clone();
let v = validator_id.clone();
let (peer_has_key, reverse_lookup) = service.synchronize(move |proto| {
let peer_has_key = proto.peers.get(&p).map_or(
false,
|p_data| p_data.session_keys.as_slice().contains(&v),
);
let reverse_lookup = proto.connected_validators.get(&v).map_or(
false,
|reps| reps.contains(&p),
);
(peer_has_key, reverse_lookup)
}).await;
assert!(peer_has_key);
assert!(reverse_lookup);
service.disconnect_peer(peer.clone()).await;
let p = peer.clone();
let v = validator_id.clone();
let (peer_removed, rev_removed) = service.synchronize(move |proto| {
let peer_removed = !proto.peers.contains_key(&p);
let reverse_mapping_removed = !proto.connected_validators.contains_key(&v);
(peer_removed, reverse_mapping_removed)
}).await;
assert!(peer_removed);
assert!(rev_removed);
});
}
#[test]
fn validator_key_spillover_cleaned() {
let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
let peer = PeerId::random();
let make_validator_id = |ring: Sr25519Keyring| ValidatorId::from(ring.public());
// We will push 1 extra beyond what is normally kept.
assert_eq!(RECENT_SESSIONS, 3);
let key_a = make_validator_id(Sr25519Keyring::Alice);
let key_b = make_validator_id(Sr25519Keyring::Bob);
let key_c = make_validator_id(Sr25519Keyring::Charlie);
let key_d = make_validator_id(Sr25519Keyring::Dave);
let keys = vec![key_a, key_b, key_c, key_d];
pool.spawner().spawn_local(worker_task).unwrap();
pool.run_until(async move {
service.connect_peer(peer.clone(), Roles::AUTHORITY).await;
service.peer_message(peer.clone(), Message::Status(Status {
version: VERSION,
collating_for: None,
})).await;
for key in &keys {
service.peer_message(peer.clone(), Message::ValidatorId(key.clone())).await;
}
let p = peer.clone();
let active_keys = keys[1..].to_vec();
let discarded_key = keys[0].clone();
assert!(service.synchronize(move |proto| {
let active_correct = proto.peers.get(&p).map_or(false, |p_data| {
p_data.session_keys.as_slice() == &active_keys[..]
});
let active_lookup = active_keys.iter().all(|k| {
proto.connected_validators.get(&k).map_or(false, |m| m.contains(&p))
});
let discarded = !proto.connected_validators.contains_key(&discarded_key);
active_correct && active_lookup && discarded
}).await);
});
}
#[test]
fn erasure_fetch_drop_also_drops_gossip_sender() {
let (service, gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
let candidate_hash = [1; 32].into();
let expected_index = 1;
let spawner = pool.spawner();
spawner.spawn_local(worker_task).unwrap();
let topic = crate::erasure_coding_topic(&candidate_hash);
let (mut gossip_tx, gossip_taken_rx) = gossip.add_gossip_stream(topic);
let test_work = async move {
let chunk_listener = service.fetch_erasure_chunk(
&candidate_hash,
expected_index,
);
// spawn an abortable handle to the chunk listener future.
// we will wait until this future has proceeded enough to start grabbing
// messages from gossip, and then we will abort the future.
let (chunk_listener, abort_handle) = future::abortable(chunk_listener);
let handle = spawner.spawn_with_handle(chunk_listener).unwrap();
gossip_taken_rx.await.unwrap();
// gossip listener was taken. and is active.
assert!(!gossip.contains_listener(&topic));
assert!(!gossip_tx.is_closed());
abort_handle.abort();
// we must `await` this, otherwise context may never transfer over
// to the spawned `Abortable` future.
assert!(handle.await.is_err());
loop {
// if dropping the sender leads to the gossip listener
// being cleaned up, we will eventually be unable to send a message
// on the sender.
if gossip_tx.is_closed() { break }
let fake_chunk = GossipMessage::ErasureChunk(
crate::legacy::gossip::ErasureChunkMessage {
chunk: ErasureChunk {
chunk: vec![],
index: expected_index + 1,
proof: vec![],
},
candidate_hash,
}
).encode();
match gossip_tx.send(TopicNotification { message: fake_chunk, sender: None }).await {
Err(e) => { assert!(e.is_disconnected()); break },
Ok(_) => continue,
}
}
};
pool.run_until(test_work);
}
+1 -3
View File
@@ -197,7 +197,6 @@ impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, Tx
parent_id, parent_id,
client, client,
transaction_pool, transaction_pool,
table,
inherent_data: Some(inherent_data), inherent_data: Some(inherent_data),
inherent_digests, inherent_digests,
// leave some time for the proposal finalisation // leave some time for the proposal finalisation
@@ -216,7 +215,7 @@ impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, Tx
Delay::new(enough_candidates).await; Delay::new(enough_candidates).await;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let proposed_candidates = data.table.proposed_set(); let proposed_candidates = table.proposed_set();
data.propose_with(proposed_candidates) data.propose_with(proposed_candidates)
}) })
.await? .await?
@@ -235,7 +234,6 @@ struct CreateProposalData<Client, TxPool, Backend> {
parent_id: BlockId, parent_id: BlockId,
client: Arc<Client>, client: Arc<Client>,
transaction_pool: Arc<TxPool>, transaction_pool: Arc<TxPool>,
table: Arc<crate::SharedTable>,
inherent_data: Option<InherentData>, inherent_data: Option<InherentData>,
inherent_digests: DigestFor<Block>, inherent_digests: DigestFor<Block>,
deadline: Instant, deadline: Instant,
+40 -36
View File
@@ -34,7 +34,7 @@ use futures::channel::oneshot;
use log::{warn, debug}; use log::{warn, debug};
use bitvec::bitvec; use bitvec::bitvec;
use super::{GroupInfo, TableRouter}; use super::GroupInfo;
use self::includable::IncludabilitySender; use self::includable::IncludabilitySender;
use primitives::Pair; use primitives::Pair;
use sp_api::ProvideRuntimeApi; use sp_api::ProvideRuntimeApi;
@@ -135,14 +135,14 @@ impl SharedTableInner {
// //
// the statement producer, if any, will produce only statements concerning the same candidate // the statement producer, if any, will produce only statements concerning the same candidate
// as the one just imported // as the one just imported
fn import_remote_statement<R: TableRouter>( fn import_remote_statement<Fetch>(
&mut self, &mut self,
context: &TableContext, context: &TableContext,
router: &R, fetch_pov_block: impl Fn(&AbridgedCandidateReceipt) -> Fetch,
statement: table::SignedStatement, statement: table::SignedStatement,
max_block_data_size: Option<u64>, max_block_data_size: Option<u64>,
) -> Option<ParachainWork< ) -> Option<ParachainWork<
R::FetchValidationProof Fetch,
>> { >> {
let summary = self.table.import_statement(context, statement)?; let summary = self.table.import_statement(context, statement)?;
self.update_trackers(&summary.candidate, context); self.update_trackers(&summary.candidate, context);
@@ -175,7 +175,7 @@ impl SharedTableInner {
None None
} }
Some(candidate) => { Some(candidate) => {
let fetch = router.fetch_pov_block(candidate); let fetch = fetch_pov_block(candidate);
Some(Work { Some(Work {
candidate_receipt: candidate.clone(), candidate_receipt: candidate.clone(),
@@ -446,14 +446,19 @@ impl SharedTable {
/// ///
/// The ParachainWork, if any, will produce only statements concerning the same candidate /// The ParachainWork, if any, will produce only statements concerning the same candidate
/// as the one just imported /// as the one just imported
pub fn import_remote_statement<R: TableRouter>( pub fn import_remote_statement<Fetch>(
&self, &self,
router: &R, fetch_pov_block: impl Fn(&AbridgedCandidateReceipt) -> Fetch,
statement: table::SignedStatement, statement: table::SignedStatement,
) -> Option<ParachainWork< ) -> Option<ParachainWork<
R::FetchValidationProof, Fetch,
>> { >> {
self.inner.lock().import_remote_statement(&*self.context, router, statement, self.max_block_data_size) self.inner.lock().import_remote_statement(
&*self.context,
fetch_pov_block,
statement,
self.max_block_data_size,
)
} }
/// Import many statements at once. /// Import many statements at once.
@@ -464,18 +469,26 @@ impl SharedTable {
/// ///
/// The ParachainWork, if any, will produce only statements concerning the same candidate /// The ParachainWork, if any, will produce only statements concerning the same candidate
/// as the one just imported /// as the one just imported
pub fn import_remote_statements<R, I, U>(&self, router: &R, iterable: I) -> U pub fn import_remote_statements<Fetch, I, U>(
&self,
fetch_pov_block: impl Fn(&AbridgedCandidateReceipt) -> Fetch,
iterable: I,
) -> U
where where
R: TableRouter,
I: IntoIterator<Item=table::SignedStatement>, I: IntoIterator<Item=table::SignedStatement>,
U: ::std::iter::FromIterator<Option<ParachainWork< U: ::std::iter::FromIterator<Option<ParachainWork<
R::FetchValidationProof, Fetch,
>>>, >>>,
{ {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
iterable.into_iter().map(move |statement| { iterable.into_iter().map(move |statement| {
inner.import_remote_statement(&*self.context, router, statement, self.max_block_data_size) inner.import_remote_statement(
&*self.context,
&fetch_pov_block,
statement,
self.max_block_data_size,
)
}).collect() }).collect()
} }
@@ -562,7 +575,7 @@ impl SharedTable {
self.inner.lock().table.get_misbehavior().clone() self.inner.lock().table.get_misbehavior().clone()
} }
/// Track includability of a given set of candidate hashes. /// Track includability of a given set of candidate hashes.
pub fn track_includability<I>(&self, iterable: I) -> oneshot::Receiver<()> pub fn track_includability<I>(&self, iterable: I) -> oneshot::Receiver<()>
where I: IntoIterator<Item=Hash> where I: IntoIterator<Item=Hash>
{ {
@@ -626,23 +639,14 @@ mod tests {
) {} ) {}
} }
#[derive(Clone)] fn lazy_fetch_pov()
struct DummyRouter; -> Box<
impl TableRouter for DummyRouter { dyn Fn(&AbridgedCandidateReceipt) -> future::Ready<
type Error = ::std::io::Error; Result<PoVBlock, std::io::Error>
type SendLocalCollation = future::Ready<Result<(),Self::Error>>; >
type FetchValidationProof = future::Ready<Result<PoVBlock,Self::Error>>; >
{
fn local_collation( Box::new(|_| future::ok(pov_block_with_data(vec![1, 2, 3, 4, 5])))
&self,
_candidate: AbridgedCandidateReceipt,
_pov_block: PoVBlock,
_chunks: (ValidatorIndex, &[ErasureChunk])
) -> Self::SendLocalCollation { future::ready(Ok(())) }
fn fetch_pov_block(&self, _candidate: &AbridgedCandidateReceipt) -> Self::FetchValidationProof {
future::ok(pov_block_with_data(vec![1, 2, 3, 4, 5]))
}
} }
#[test] #[test]
@@ -688,7 +692,7 @@ mod tests {
}; };
shared_table.import_remote_statement( shared_table.import_remote_statement(
&DummyRouter, lazy_fetch_pov(),
signed_statement, signed_statement,
).expect("candidate and local validity group are same"); ).expect("candidate and local validity group are same");
} }
@@ -736,7 +740,7 @@ mod tests {
}; };
shared_table.import_remote_statement( shared_table.import_remote_statement(
&DummyRouter, lazy_fetch_pov(),
signed_statement, signed_statement,
).expect("should produce work"); ).expect("should produce work");
} }
@@ -909,7 +913,7 @@ mod tests {
}; };
let _a = shared_table.import_remote_statement( let _a = shared_table.import_remote_statement(
&DummyRouter, lazy_fetch_pov(),
signed_statement.clone(), signed_statement.clone(),
).expect("should produce work"); ).expect("should produce work");
@@ -917,7 +921,7 @@ mod tests {
.expect("validation has started").is_in_progress()); .expect("validation has started").is_in_progress());
let b = shared_table.import_remote_statement( let b = shared_table.import_remote_statement(
&DummyRouter, lazy_fetch_pov(),
signed_statement.clone(), signed_statement.clone(),
); );
@@ -967,7 +971,7 @@ mod tests {
.expect("validation has started").is_done()); .expect("validation has started").is_done());
let a = shared_table.import_remote_statement( let a = shared_table.import_remote_statement(
&DummyRouter, lazy_fetch_pov(),
signed_statement, signed_statement,
); );