PoV-block gossip (#930)

* add pov-block gossip message to network

* tests for pov-block gossip

* integrate pov-block gossip into main protocol

* message validation fetches pov blocks

* remove stray dbg! invocation

* test that pov-block is fetched from relay-parent topic
This commit is contained in:
Robert Habermeier
2020-03-31 17:05:42 -04:00
committed by GitHub
parent 5a84c64507
commit cb5defc91d
4 changed files with 549 additions and 74 deletions
+71 -20
View File
@@ -877,9 +877,17 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
&self.gossip_handle,
);
}
ServiceToWorkerMsg::FetchPoVBlock(_candidate, _sender) => {
// TODO https://github.com/paritytech/polkadot/issues/742:
// create a filter on gossip for it and send to sender.
ServiceToWorkerMsg::FetchPoVBlock(candidate, mut sender) => {
// The gossip system checks that the correct pov-block data is present
// before placing in the pool, so we can safely check by candidate hash.
let get_msg = fetch_pov_from_gossip(&candidate, &self.gossip_handle);
let _ = self.executor.spawn(async move {
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
if let Either::Left((pov_block, _)) = res {
let _ = sender.send(pov_block);
}
});
}
ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, mut sender) => {
let topic = crate::erasure_coding_topic(&candidate_hash);
@@ -1133,16 +1141,14 @@ async fn statement_import_loop<Api>(
statements.insert(0, statement);
let producers: Vec<_> = {
// TODO: fetch these from gossip.
// https://github.com/paritytech/polkadot/issues/742
fn ignore_pov_fetch_requests(_: &AbridgedCandidateReceipt)
-> future::Pending<Result<PoVBlock, std::io::Error>>
{
future::pending()
}
let gossip_handle = &gossip_handle;
let fetch_pov = |candidate: &AbridgedCandidateReceipt| fetch_pov_from_gossip(
candidate,
gossip_handle,
).map(Result::<_, std::io::Error>::Ok);
table.import_remote_statements(
&ignore_pov_fetch_requests,
&fetch_pov,
statements.iter().cloned(),
)
};
@@ -1192,6 +1198,33 @@ async fn statement_import_loop<Api>(
}
}
fn fetch_pov_from_gossip(
candidate: &AbridgedCandidateReceipt,
gossip_handle: &impl GossipOps,
) -> impl Future<Output = PoVBlock> + Send {
let candidate_hash = candidate.hash();
let topic = crate::legacy::gossip::pov_block_topic(candidate.relay_parent);
// The gossip system checks that the correct pov-block data is present
// before placing in the pool, so we can safely check by candidate hash.
gossip_handle.gossip_messages_for(topic)
.filter_map(move |(msg, _)| {
future::ready(match msg {
GossipMessage::PoVBlock(pov_block_message) =>
if pov_block_message.candidate_hash == candidate_hash {
Some(pov_block_message.pov_block)
} else {
None
},
_ => None,
})
})
.into_future()
.map(|(item, _)| item.expect(
"gossip message streams do not conclude early; qed"
))
}
// distribute a "local collation": this is the collation gotten by a validator
// from a collator. it needs to be distributed to other validators in the same
// group.
@@ -1206,19 +1239,37 @@ fn distribute_validated_collation(
let hash = receipt.hash();
let validated = Validated::collated_local(
receipt,
pov_block,
pov_block.clone(),
);
let statement = crate::legacy::gossip::GossipStatement::new(
instance.relay_parent,
match instance.statement_table.import_validated(validated) {
None => return,
Some(s) => s,
}
);
// gossip the signed statement.
{
let statement = crate::legacy::gossip::GossipStatement::new(
instance.relay_parent,
match instance.statement_table.import_validated(validated) {
None => return,
Some(s) => s,
}
);
gossip_handle.gossip_message(instance.attestation_topic, statement.into());
gossip_handle.gossip_message(instance.attestation_topic, statement.into());
}
// gossip the PoV block.
{
let pov_block_message = crate::legacy::gossip::GossipPoVBlock {
relay_chain_leaf: instance.relay_parent,
candidate_hash: hash,
pov_block,
};
gossip_handle.gossip_message(
crate::legacy::gossip::pov_block_topic(instance.relay_parent),
pov_block_message.into(),
);
}
// gossip erasure chunks.
for chunk in chunks.1 {
let message = crate::legacy::gossip::ErasureChunkMessage {
chunk,
+55 -3
View File
@@ -14,6 +14,7 @@
//! Tests for the protocol.
use super::*;
use crate::legacy::gossip::GossipPoVBlock;
use parking_lot::Mutex;
use polkadot_primitives::Block;
@@ -21,8 +22,9 @@ use polkadot_primitives::parachain::{
Id as ParaId, Chain, DutyRoster, ParachainHost, ValidatorId,
Retriable, CollatorId, AbridgedCandidateReceipt,
GlobalValidationSchedule, LocalValidationData, ErasureChunk, SigningContext,
PoVBlock, BlockData,
};
use polkadot_validation::SharedTable;
use polkadot_validation::{SharedTable, TableRouter};
use av_store::{Store as AvailabilityStore, ErasureNetworking};
use sc_network_gossip::TopicNotification;
@@ -276,7 +278,6 @@ fn worker_task_shuts_down_when_sender_dropped() {
fn consensus_instances_cleaned_up() {
let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
let relay_parent = [0; 32].into();
let authorities = Vec::new();
let signing_context = SigningContext {
session_index: Default::default(),
@@ -294,7 +295,7 @@ fn consensus_instances_cleaned_up() {
pool.spawner().spawn_local(worker_task).unwrap();
let router = pool.run_until(
service.build_table_router(table, &authorities)
service.build_table_router(table, &[])
).unwrap();
drop(router);
@@ -464,3 +465,54 @@ fn erasure_fetch_drop_also_drops_gossip_sender() {
pool.run_until(test_work);
}
#[test]
fn fetches_pov_block_from_gossip() {
let (service, gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
let relay_parent = [255; 32].into();
let pov_block = PoVBlock {
block_data: BlockData(vec![1, 2, 3]),
};
let mut candidate = AbridgedCandidateReceipt::default();
candidate.relay_parent = relay_parent;
candidate.pov_block_hash = pov_block.hash();
let candidate_hash = candidate.hash();
let signing_context = SigningContext {
session_index: Default::default(),
parent_hash: relay_parent,
};
let table = Arc::new(SharedTable::new(
Vec::new(),
HashMap::new(),
None,
signing_context,
AvailabilityStore::new_in_memory(service.clone()),
None,
));
let spawner = pool.spawner();
spawner.spawn_local(worker_task).unwrap();
let topic = crate::legacy::gossip::pov_block_topic(relay_parent);
let (mut gossip_tx, _gossip_taken_rx) = gossip.add_gossip_stream(topic);
let test_work = async move {
let router = service.build_table_router(table, &[]).await.unwrap();
let pov_block_listener = router.fetch_pov_block(&candidate);
let message = GossipMessage::PoVBlock(GossipPoVBlock {
relay_chain_leaf: relay_parent,
candidate_hash,
pov_block,
}).encode();
gossip_tx.send(TopicNotification { message, sender: None }).await.unwrap();
pov_block_listener.await
};
pool.run_until(test_work).unwrap();
}