Erasure encoding availability (#345)

* Erasure encoding availability initial commit

 * Modifications to availability store to keep chunks as well as
   reconstructed blocks and extrinsics.
 * Gossip messages containig signed erasure chunks.
 * Requesting eraure chunks with polkadot-specific messages.
 * Validation of erasure chunk messages.

* Apply suggestions from code review

Co-Authored-By: Luke Schoen <ltfschoen@users.noreply.github.com>

* Fix build after a merge

* Gossip erasure chunk messages under their own topic

* erasure_chunks should use the appropriate topic

* Updates Cargo.lock

* Fixes after merge

* Removes a couple of leftover pieces of code

* Fixes simple stuff from review

* Updates erasure and storage for more flexible logic

* Changes validation and candidate receipt production.

* Adds add_erasure_chunks method

* Fixes most of the nits

* Better validate_collation and validate_receipt functions

* Fixes the tests

* Apply suggestions from code review

Co-Authored-By: Robert Habermeier <rphmeier@gmail.com>

* Removes unwrap() calls

* Removes ErasureChunks primitive

* Removes redundant fields from ErasureChunk struct

* AvailabilityStore should store CandidateReceipt

* Changes the way chunk messages are imported and validated.

 * Availability store now stores a validator_index and n_validators for
 each relay_parent.
 * Availability store now also stores candidate receipts.
 * Removes importing chunks in the table and moves it into network
 gossip validation.
 * Validation of erasure messages id done against receipts that are
 stored in the availability store.

* Correctly compute topics for erasure messages

* Removes an unused parameter

* Refactors availability db querying into a helper

* Adds the apis described in the writeup

* Adds a runtime api to extract erasure roots form raw extrinsics.

* Adds a barebone BlockImport impl for avalability store

* Adds the implementation of the availability worker

* Fix build after the merge with master.

* Make availability store API async

* Bring back the default wasmtime feature

* Lines width

* Bump runtime version

* Formatting and dead code elimination

* some style nits (#1)

* More nits and api cleanup

* Disable wasm CI for availability-store

* Another nit

* Formatting
This commit is contained in:
Fedor Sakharov
2019-12-03 17:49:07 +03:00
committed by Robert Habermeier
parent ec54d5b1e4
commit 99d164b5e7
29 changed files with 2957 additions and 572 deletions
+149 -71
View File
@@ -20,12 +20,12 @@
use std::collections::hash_map::{HashMap, Entry};
use std::sync::Arc;
use availability_store::{Data, Store as AvailabilityStore};
use availability_store::{Store as AvailabilityStore};
use table::{self, Table, Context as TableContextTrait};
use polkadot_primitives::{Block, BlockId, Hash};
use polkadot_primitives::parachain::{
Id as ParaId, Collation, OutgoingMessages, CandidateReceipt, ValidatorPair, ValidatorId,
AttestedCandidate, ParachainHost, PoVBlock, ValidatorIndex
Id as ParaId, OutgoingMessages, CandidateReceipt, ValidatorPair, ValidatorId,
AttestedCandidate, ParachainHost, PoVBlock, ValidatorIndex, ErasureChunk,
};
use parking_lot::Mutex;
@@ -146,7 +146,6 @@ impl SharedTableInner {
let local_index = context.local_index()?;
let para_member = context.is_member_of(local_index, &summary.group_id);
let digest = &summary.candidate;
// TODO: consider a strategy based on the number of candidate votes as well.
@@ -189,6 +188,7 @@ impl SharedTableInner {
availability_store: self.availability_store.clone(),
relay_parent: context.parent_hash.clone(),
work,
local_index: local_index as usize,
max_block_data_size,
})
}
@@ -262,6 +262,7 @@ impl Validated {
pub struct ParachainWork<Fetch> {
work: Work<Fetch>,
relay_parent: Hash,
local_index: usize,
availability_store: AvailabilityStore,
max_block_data_size: Option<u64>,
}
@@ -272,23 +273,28 @@ impl<Fetch: Future> ParachainWork<Fetch> {
pub fn prime<P: ProvideRuntimeApi>(self, api: Arc<P>)
-> PrimedParachainWork<
Fetch,
impl Send + FnMut(&BlockId, &Collation) -> Result<OutgoingMessages, ()>,
impl Send + FnMut(&BlockId, &PoVBlock, &CandidateReceipt) -> Result<(OutgoingMessages, ErasureChunk), ()>,
>
where
P: Send + Sync + 'static,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
{
let max_block_data_size = self.max_block_data_size;
let validate = move |id: &_, collation: &_| {
let res = crate::collation::validate_collation(
let local_index = self.local_index;
let validate = move |id: &_, pov_block: &_, receipt: &_| {
let res = crate::collation::validate_receipt(
&*api,
id,
collation,
pov_block,
receipt,
max_block_data_size,
);
match res {
Ok(e) => Ok(e),
Ok((messages, mut chunks)) => {
Ok((messages, chunks.swap_remove(local_index)))
}
Err(e) => {
debug!(target: "validation", "Encountered bad collation: {}", e);
Err(())
@@ -301,7 +307,7 @@ impl<Fetch: Future> ParachainWork<Fetch> {
/// Prime the parachain work with a custom validation function.
pub fn prime_with<F>(self, validate: F) -> PrimedParachainWork<Fetch, F>
where F: FnMut(&BlockId, &Collation) -> Result<OutgoingMessages, ()>
where F: FnMut(&BlockId, &PoVBlock, &CandidateReceipt) -> Result<(OutgoingMessages, ErasureChunk), ()>
{
PrimedParachainWork { inner: self, validate }
}
@@ -318,23 +324,21 @@ pub struct PrimedParachainWork<Fetch, F> {
validate: F,
}
impl<Fetch, F, Err> Future for PrimedParachainWork<Fetch, F>
impl<Fetch, F, Err> PrimedParachainWork<Fetch, F>
where
Fetch: Future<Item=PoVBlock,Error=Err>,
F: FnMut(&BlockId, &Collation) -> Result<OutgoingMessages, ()>,
F: FnMut(&BlockId, &PoVBlock, &CandidateReceipt) -> Result<(OutgoingMessages, ErasureChunk), ()>,
Err: From<::std::io::Error>,
{
type Item = Validated;
type Error = Err;
pub async fn validate(mut self) -> Result<(Validated, Option<ErasureChunk>), Err> {
use futures03::compat::Future01CompatExt;
let candidate = &self.inner.work.candidate_receipt;
let pov_block = self.inner.work.fetch.compat().await?;
fn poll(&mut self) -> Poll<Validated, Err> {
let work = &mut self.inner.work;
let candidate = &work.candidate_receipt;
let pov_block = futures::try_ready!(work.fetch.poll());
let validation_res = (self.validate)(
&BlockId::hash(self.inner.relay_parent),
&Collation { pov: pov_block.clone(), receipt: candidate.clone() },
&pov_block,
&candidate,
);
let candidate_hash = candidate.hash();
@@ -342,35 +346,30 @@ impl<Fetch, F, Err> Future for PrimedParachainWork<Fetch, F>
debug!(target: "validation", "Making validity statement about candidate {}: is_good? {:?}",
candidate_hash, validation_res.is_ok());
let (validity_statement, result) = match validation_res {
Err(()) => (
GenericStatement::Invalid(candidate_hash),
Validation::Invalid(pov_block),
),
Ok(outgoing_targeted) => {
let outgoing_queues = crate::outgoing_queues(&outgoing_targeted)
.map(|(_target, root, data)| (root, data))
.collect();
match validation_res {
Err(()) => Ok((
Validated {
statement: GenericStatement::Invalid(candidate_hash),
result: Validation::Invalid(pov_block),
},
None,
)),
Ok((outgoing_targeted, our_chunk)) => {
self.inner.availability_store.add_erasure_chunk(
self.inner.relay_parent,
candidate.clone(),
our_chunk.clone(),
).await?;
self.inner.availability_store.make_available(Data {
relay_parent: self.inner.relay_parent,
parachain_id: work.candidate_receipt.parachain_index,
candidate_hash,
block_data: pov_block.block_data.clone(),
outgoing_queues: Some(outgoing_queues),
})?;
(
GenericStatement::Valid(candidate_hash),
Validation::Valid(pov_block, outgoing_targeted)
)
Ok((
Validated {
statement: GenericStatement::Valid(candidate_hash),
result: Validation::Valid(pov_block, outgoing_targeted),
},
Some(our_chunk),
))
}
};
Ok(Async::Ready(Validated {
statement: validity_statement,
result,
}))
}
}
}
@@ -573,8 +572,11 @@ mod tests {
use super::*;
use sp_keyring::Sr25519Keyring;
use primitives::crypto::UncheckedInto;
use polkadot_primitives::parachain::{BlockData, ConsolidatedIngress};
use futures::future;
use polkadot_primitives::parachain::{AvailableMessages, BlockData, ConsolidatedIngress, Collation};
use polkadot_erasure_coding::{self as erasure};
use availability_store::ProvideGossipMessages;
use futures::{future};
fn pov_block_with_data(data: Vec<u8>) -> PoVBlock {
PoVBlock {
@@ -583,14 +585,39 @@ mod tests {
}
}
#[derive(Clone)]
struct DummyGossipMessages;
impl ProvideGossipMessages for DummyGossipMessages {
fn gossip_messages_for(
&self,
_topic: Hash
) -> Box<dyn futures03::Stream<Item = (Hash, Hash, ErasureChunk)> + Unpin + Send> {
Box::new(futures03::stream::empty())
}
fn gossip_erasure_chunk(
&self,
_relay_parent: Hash,
_candidate_hash: Hash,
_erasure_root: Hash,
_chunk: ErasureChunk,
) {}
}
#[derive(Clone)]
struct DummyRouter;
impl TableRouter for DummyRouter {
type Error = ::std::io::Error;
type FetchValidationProof = future::FutureResult<PoVBlock,Self::Error>;
fn local_collation(&self, _collation: Collation, _outgoing: OutgoingMessages) {
}
fn local_collation(
&self,
_collation: Collation,
_candidate: CandidateReceipt,
_outgoing: OutgoingMessages,
_chunks: (ValidatorIndex, &[ErasureChunk])
) {}
fn fetch_pov_block(&self, _candidate: &CandidateReceipt) -> Self::FetchValidationProof {
future::ok(pov_block_with_data(vec![1, 2, 3, 4, 5]))
@@ -622,7 +649,7 @@ mod tests {
groups,
Some(local_key.clone()),
parent_hash,
AvailabilityStore::new_in_memory(),
AvailabilityStore::new_in_memory(DummyGossipMessages),
None,
);
@@ -635,6 +662,7 @@ mod tests {
fees: 1_000_000,
block_data_hash: [2; 32].into(),
upward_messages: Vec::new(),
erasure_root: [1u8; 32].into(),
};
let candidate_statement = GenericStatement::Candidate(candidate);
@@ -677,7 +705,7 @@ mod tests {
groups,
Some(local_key.clone()),
parent_hash,
AvailabilityStore::new_in_memory(),
AvailabilityStore::new_in_memory(DummyGossipMessages),
None,
);
@@ -690,6 +718,7 @@ mod tests {
fees: 1_000_000,
block_data_hash: [2; 32].into(),
upward_messages: Vec::new(),
erasure_root: [1u8; 32].into(),
};
let candidate_statement = GenericStatement::Candidate(candidate);
@@ -709,10 +738,13 @@ mod tests {
#[test]
fn evaluate_makes_block_data_available() {
let store = AvailabilityStore::new_in_memory();
let store = AvailabilityStore::new_in_memory(DummyGossipMessages);
let relay_parent = [0; 32].into();
let para_id = 5.into();
let pov_block = pov_block_with_data(vec![1, 2, 3]);
let block_data_hash = [2; 32].into();
let local_index = 0;
let n_validators = 2;
let candidate = CandidateReceipt {
parachain_index: para_id,
@@ -721,39 +753,62 @@ mod tests {
head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
block_data_hash: [2; 32].into(),
block_data_hash,
upward_messages: Vec::new(),
erasure_root: [1u8; 32].into(),
};
let hash = candidate.hash();
store.add_validator_index_and_n_validators(
&relay_parent,
local_index as u32,
n_validators as u32,
).unwrap();
let producer: ParachainWork<future::FutureResult<_, ::std::io::Error>> = ParachainWork {
work: Work {
candidate_receipt: candidate,
fetch: future::ok(pov_block.clone()),
},
local_index,
relay_parent,
availability_store: store.clone(),
max_block_data_size: None,
};
let validated = producer.prime_with(|_, _| Ok(OutgoingMessages { outgoing_messages: Vec::new() }))
.wait()
.unwrap();
let validated = futures03::executor::block_on(producer.prime_with(|_, _, _| Ok((
OutgoingMessages { outgoing_messages: Vec::new() },
ErasureChunk {
chunk: vec![1, 2, 3],
index: local_index as u32,
proof: vec![],
},
))).validate()).unwrap();
assert_eq!(validated.pov_block(), &pov_block);
assert_eq!(validated.statement, GenericStatement::Valid(hash));
assert_eq!(validated.0.pov_block(), &pov_block);
assert_eq!(validated.0.statement, GenericStatement::Valid(hash));
assert_eq!(store.block_data(relay_parent, hash).unwrap(), pov_block.block_data);
// TODO: check that a message queue is included by root.
if let Some(messages) = validated.0.outgoing_messages() {
let available_messages: AvailableMessages = messages.clone().into();
for (root, queue) in available_messages.0 {
assert_eq!(store.queue_by_root(&root), Some(queue));
}
}
assert!(store.get_erasure_chunk(&relay_parent, block_data_hash, local_index).is_some());
assert!(store.get_erasure_chunk(&relay_parent, block_data_hash, local_index + 1).is_none());
}
#[test]
fn full_availability() {
let store = AvailabilityStore::new_in_memory();
let store = AvailabilityStore::new_in_memory(DummyGossipMessages);
let relay_parent = [0; 32].into();
let para_id = 5.into();
let pov_block = pov_block_with_data(vec![1, 2, 3]);
let block_data_hash = pov_block.block_data.hash();
let local_index = 0;
let n_validators = 2;
let ex = Some(AvailableMessages(Vec::new()));
let candidate = CandidateReceipt {
parachain_index: para_id,
@@ -764,27 +819,48 @@ mod tests {
fees: 1_000_000,
block_data_hash: [2; 32].into(),
upward_messages: Vec::new(),
erasure_root: [1u8; 32].into(),
};
let hash = candidate.hash();
let chunks = erasure::obtain_chunks(n_validators, &pov_block.block_data, ex.as_ref()).unwrap();
store.add_validator_index_and_n_validators(
&relay_parent,
local_index as u32,
n_validators as u32,
).unwrap();
let producer = ParachainWork {
work: Work {
candidate_receipt: candidate,
fetch: future::ok::<_, ::std::io::Error>(pov_block.clone()),
},
local_index,
relay_parent,
availability_store: store.clone(),
max_block_data_size: None,
};
let validated = producer.prime_with(|_, _| Ok(OutgoingMessages { outgoing_messages: Vec::new() }))
.wait()
.unwrap();
let validated = futures03::executor::block_on(producer.prime_with(|_, _, _| Ok((
OutgoingMessages { outgoing_messages: Vec::new() },
ErasureChunk {
chunk: chunks[local_index].clone(),
index: local_index as u32,
proof: vec![],
},
))).validate()).unwrap();
assert_eq!(validated.pov_block(), &pov_block);
assert_eq!(validated.0.pov_block(), &pov_block);
assert_eq!(store.block_data(relay_parent, hash).unwrap(), pov_block.block_data);
if let Some(messages) = validated.0.outgoing_messages() {
let available_messages: AvailableMessages = messages.clone().into();
for (root, queue) in available_messages.0 {
assert_eq!(store.queue_by_root(&root), Some(queue));
}
}
// This works since there are only two validators and one erasure chunk should be
// enough to reconstruct the block data.
assert_eq!(store.block_data(relay_parent, block_data_hash).unwrap(), pov_block.block_data);
// TODO: check that a message queue is included by root.
}
@@ -813,7 +889,7 @@ mod tests {
groups,
Some(local_key.clone()),
parent_hash,
AvailabilityStore::new_in_memory(),
AvailabilityStore::new_in_memory(DummyGossipMessages),
None,
);
@@ -826,6 +902,7 @@ mod tests {
fees: 1_000_000,
block_data_hash: [2; 32].into(),
upward_messages: Vec::new(),
erasure_root: [1u8; 32].into(),
};
let hash = candidate.hash();
@@ -879,7 +956,7 @@ mod tests {
groups,
Some(local_key.clone()),
parent_hash,
AvailabilityStore::new_in_memory(),
AvailabilityStore::new_in_memory(DummyGossipMessages),
None,
);
@@ -892,6 +969,7 @@ mod tests {
fees: 1_000_000,
block_data_hash: [2; 32].into(),
upward_messages: Vec::new(),
erasure_root: [1u8; 32].into(),
};
let hash = candidate.hash();