Remove legacy network code (#860)

* expunge legacy code from polkadot-network

* mostly rip out old legacy protocol from service

* ensure validation work is spawned by incoming messages

* decouple availabliity store from network logic; clean up data flow

* av_store: test helpers and use futures-abort

* update polkadot-validation to pass n_validators when submitting chunks

* fallible erasure-chunk fetching

* implement `ErasureNetworking` for new network prot

* API for registering availability store in network

* fully integrate new network service into service

* fix validation tests

* scaffolding for porting collator over to new network

* track connected validators' peer IDs and distribute collators' collations

* helper in network for fetching all checked statements

* fix adder-collator

* actually register notifications protocol

* Update service/src/lib.rs

* merge with master
This commit is contained in:
Robert Habermeier
2020-03-05 10:11:21 -08:00
committed by GitHub
parent b49bf9d5b0
commit 7931380825
19 changed files with 863 additions and 3120 deletions
+30 -45
View File
@@ -32,7 +32,7 @@ use polkadot_primitives::{
ParachainHost, AvailableData, OmittedValidationData,
},
};
use sp_runtime::traits::{BlakeTwo256, Hash as HashT, HashFor};
use sp_runtime::traits::HashFor;
use sp_blockchain::{Result as ClientResult};
use client::{
BlockchainEvents, BlockBody,
@@ -55,7 +55,7 @@ pub use worker::AvailabilityBlockImport;
pub use store::AwaitedFrontierEntry;
use worker::{
Worker, WorkerHandle, Chunks, IncludedParachainBlocks, WorkerMsg, MakeAvailable,
Worker, WorkerHandle, IncludedParachainBlocks, WorkerMsg, MakeAvailable, Chunks
};
use store::{Store as InnerStore};
@@ -70,23 +70,7 @@ pub struct Config {
pub path: PathBuf,
}
/// Compute gossip topic for the erasure chunk messages given the relay parent,
/// root and the chunk index.
///
/// Since at this point we are not able to use [`network`] directly, but both
/// of them need to compute these topics, this lives here and not there.
///
/// [`network`]: ../polkadot_network/index.html
pub fn erasure_coding_topic(relay_parent: Hash, erasure_root: Hash, index: u32) -> Hash {
let mut v = relay_parent.as_ref().to_vec();
v.extend(erasure_root.as_ref());
v.extend(&index.to_le_bytes()[..]);
v.extend(b"erasure_chunks");
BlakeTwo256::hash(&v[..])
}
/// A trait that provides a shim for the [`NetworkService`] trait.
/// An abstraction around networking for the availablity-store.
///
/// Currently it is not possible to use the networking code in the availability store
/// core directly due to a number of loop dependencies it require:
@@ -95,26 +79,25 @@ pub fn erasure_coding_topic(relay_parent: Hash, erasure_root: Hash, index: u32)
///
/// `availability-store` -> `network` -> `validation` -> `availability-store`
///
/// So we provide this shim trait that gets implemented for a wrapper newtype in
/// the [`network`] module.
/// So we provide this trait that gets implemented for a type in
/// the [`network`] module or a mock in tests.
///
/// [`NetworkService`]: ../polkadot_network/trait.NetworkService.html
/// [`network`]: ../polkadot_network/index.html
pub trait ProvideGossipMessages {
/// Get a stream of gossip erasure chunk messages for a given topic.
///
/// Each item is a tuple (relay_parent, candidate_hash, erasure_chunk)
fn gossip_messages_for(
&self,
topic: Hash,
) -> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>;
pub trait ErasureNetworking {
/// Errors that can occur when fetching erasure chunks.
type Error: std::fmt::Debug + 'static;
/// Gossip an erasure chunk message.
fn gossip_erasure_chunk(
/// Fetch an erasure chunk from the networking service.
fn fetch_erasure_chunk(
&self,
candidate_hash: &Hash,
index: u32,
) -> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>>;
/// Distributes an erasure chunk to the correct validator node.
fn distribute_erasure_chunk(
&self,
relay_parent: Hash,
candidate_hash: Hash,
erasure_root: Hash,
chunk: ErasureChunk,
);
}
@@ -148,11 +131,11 @@ impl Store {
/// Creating a store among other things starts a background worker thread which
/// handles most of the write operations to the storage.
#[cfg(not(target_os = "unknown"))]
pub fn new<PGM>(config: Config, gossip: PGM) -> io::Result<Self>
where PGM: ProvideGossipMessages + Send + Sync + Clone + 'static
pub fn new<EN>(config: Config, network: EN) -> io::Result<Self>
where EN: ErasureNetworking + Send + Sync + Clone + 'static
{
let inner = InnerStore::new(config)?;
let worker = Arc::new(Worker::start(inner.clone(), gossip));
let worker = Arc::new(Worker::start(inner.clone(), network));
let to_worker = worker.to_worker().clone();
Ok(Self {
@@ -166,11 +149,11 @@ impl Store {
///
/// Creating a store among other things starts a background worker thread
/// which handles most of the write operations to the storage.
pub fn new_in_memory<PGM>(gossip: PGM) -> Self
where PGM: ProvideGossipMessages + Send + Sync + Clone + 'static
pub fn new_in_memory<EN>(network: EN) -> Self
where EN: ErasureNetworking + Send + Sync + Clone + 'static
{
let inner = InnerStore::new_in_memory();
let worker = Arc::new(Worker::start(inner.clone(), gossip));
let worker = Arc::new(Worker::start(inner.clone(), network));
let to_worker = worker.to_worker().clone();
Self {
@@ -204,7 +187,6 @@ impl Store {
let to_worker = self.to_worker.clone();
let import = AvailabilityBlockImport::new(
self.inner.clone(),
client,
wrapped_block_import,
spawner,
@@ -261,35 +243,38 @@ impl Store {
pub async fn add_erasure_chunk(
&self,
candidate: AbridgedCandidateReceipt,
n_validators: u32,
chunk: ErasureChunk,
) -> io::Result<()> {
self.add_erasure_chunks(candidate, vec![chunk]).await
self.add_erasure_chunks(candidate, n_validators, std::iter::once(chunk)).await
}
/// Adds a set of erasure chunks to storage.
///
/// The chunks should be checked for validity against the root of encoding
/// and it's proof prior to calling this.
/// and its proof prior to calling this.
///
/// This method will send the chunks to the background worker, allowing caller to
/// asynchrounously waiting for the result.
pub async fn add_erasure_chunks<I>(
&self,
candidate: AbridgedCandidateReceipt,
n_validators: u32,
chunks: I,
) -> io::Result<()>
where I: IntoIterator<Item = ErasureChunk>
{
let candidate_hash = candidate.hash();
let relay_parent = candidate.relay_parent;
self.add_candidate(candidate).await?;
let (s, r) = oneshot::channel();
let chunks = chunks.into_iter().collect();
let msg = WorkerMsg::Chunks(Chunks {
relay_parent,
candidate_hash,
chunks,
n_validators,
result: s,
});
+33 -36
View File
@@ -60,33 +60,31 @@ fn candidate_key(candidate_hash: &Hash) -> Vec<u8> {
(candidate_hash, 2i8).encode()
}
fn available_chunks_key(relay_parent: &Hash, erasure_root: &Hash) -> Vec<u8> {
(relay_parent, erasure_root, 3i8).encode()
}
fn candidates_with_relay_parent_key(relay_block: &Hash) -> Vec<u8> {
(relay_block, 4i8).encode()
}
// meta keys
fn awaited_chunks_key() -> [u8; 14] {
*b"awaited_chunks"
}
const AWAITED_CHUNKS_KEY: [u8; 14] = *b"awaited_chunks";
fn validator_index_and_n_validators_key(relay_parent: &Hash) -> Vec<u8> {
(relay_parent, 1i8).encode()
}
fn available_chunks_key(candidate_hash: &Hash) -> Vec<u8> {
(candidate_hash, 2i8).encode()
}
/// An entry in the awaited frontier of chunks we are interested in.
#[derive(Encode, Decode, Debug, Hash, PartialEq, Eq, Clone)]
pub struct AwaitedFrontierEntry {
/// The relay-chain parent block hash.
/// The hash of the candidate for which we want to fetch a chunk for.
/// There will be duplicate entries in the case of multiple candidates with
/// the same erasure-root, but this is unlikely.
pub candidate_hash: Hash,
/// Although the relay-parent is implicitly referenced by the candidate hash,
/// we include it here as well for convenience in pruning the set.
pub relay_parent: Hash,
/// The erasure-chunk trie root we are comparing against.
///
/// We index by erasure-root because there may be multiple candidates
/// with the same erasure root.
pub erasure_root: Hash,
/// The index of the validator we represent.
pub validator_index: u32,
}
@@ -153,7 +151,7 @@ impl Store {
/// Get a set of all chunks we are waiting for.
pub fn awaited_chunks(&self) -> Option<HashSet<AwaitedFrontierEntry>> {
self.query_inner(columns::META, &awaited_chunks_key()).map(|vec: Vec<AwaitedFrontierEntry>| {
self.query_inner(columns::META, &AWAITED_CHUNKS_KEY).map(|vec: Vec<AwaitedFrontierEntry>| {
HashSet::from_iter(vec.into_iter())
})
}
@@ -183,21 +181,21 @@ impl Store {
if let Some((validator_index, _)) = self.get_validator_index_and_n_validators(relay_parent) {
let candidates = candidates.clone();
let awaited_frontier: Vec<AwaitedFrontierEntry> = self
.query_inner(columns::META, &awaited_chunks_key())
.query_inner(columns::META, &AWAITED_CHUNKS_KEY)
.unwrap_or_else(|| Vec::new());
let mut awaited_frontier: HashSet<AwaitedFrontierEntry> =
HashSet::from_iter(awaited_frontier.into_iter());
awaited_frontier.extend(candidates.iter().filter_map(|candidate| {
self.get_candidate(&candidate).map(|receipt| AwaitedFrontierEntry {
awaited_frontier.extend(candidates.iter().cloned().map(|candidate_hash| {
AwaitedFrontierEntry {
relay_parent: relay_parent.clone(),
erasure_root: receipt.commitments.erasure_root,
candidate_hash,
validator_index,
})
}
}));
let awaited_frontier = Vec::from_iter(awaited_frontier.into_iter());
tx.put_vec(columns::META, &awaited_chunks_key(), awaited_frontier.encode());
tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode());
}
let mut descendent_candidates = self.get_candidates_with_relay_parent(relay_parent);
@@ -246,15 +244,12 @@ impl Store {
let mut v = self.query_inner(columns::DATA, &dbkey).unwrap_or(Vec::new());
let av_chunks_key = available_chunks_key(
&receipt.relay_parent,
&receipt.commitments.erasure_root,
);
let av_chunks_key = available_chunks_key(candidate_hash);
let mut have_chunks = self.query_inner(columns::META, &av_chunks_key).unwrap_or(Vec::new());
let awaited_frontier: Option<Vec<AwaitedFrontierEntry>> = self.query_inner(
columns::META,
&awaited_chunks_key(),
&AWAITED_CHUNKS_KEY,
);
for chunk in chunks.into_iter() {
@@ -268,19 +263,21 @@ impl Store {
awaited_frontier.retain(|entry| {
!(
entry.relay_parent == receipt.relay_parent &&
entry.erasure_root == receipt.commitments.erasure_root &&
&entry.candidate_hash == candidate_hash &&
have_chunks.contains(&entry.validator_index)
)
});
tx.put_vec(columns::META, &awaited_chunks_key(), awaited_frontier.encode());
tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode());
}
// If therea are no block data in the store at this point,
// If there are no block data in the store at this point,
// check that they can be reconstructed now and add them to store if they can.
if self.execution_data(&candidate_hash).is_none() {
if let Ok(available_data) = erasure::reconstruct(
n_validators as usize,
v.iter().map(|chunk| (chunk.chunk.as_ref(), chunk.index as usize))) {
v.iter().map(|chunk| (chunk.chunk.as_ref(), chunk.index as usize)),
)
{
self.make_available(*candidate_hash, available_data)?;
}
}
@@ -339,11 +336,11 @@ impl Store {
let mut tx = DBTransaction::new();
let awaited_frontier: Option<Vec<AwaitedFrontierEntry>> = self
.query_inner(columns::META, &awaited_chunks_key());
.query_inner(columns::META, &AWAITED_CHUNKS_KEY);
if let Some(mut awaited_frontier) = awaited_frontier {
awaited_frontier.retain(|entry| entry.relay_parent != relay_parent);
tx.put_vec(columns::META, &awaited_chunks_key(), awaited_frontier.encode());
tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode());
}
let candidates = self.get_candidates_with_relay_parent(&relay_parent);
@@ -354,6 +351,8 @@ impl Store {
tx.delete(columns::DATA, execution_data_key(&candidate).as_slice());
tx.delete(columns::DATA, &erasure_chunks_key(&candidate));
tx.delete(columns::DATA, &candidate_key(&candidate));
tx.delete(columns::META, &available_chunks_key(&candidate));
}
self.inner.write(tx)
@@ -576,7 +575,6 @@ mod tests {
proof: Vec::new(),
};
let candidates = vec![receipt_1_hash, receipt_2_hash];
let erasure_roots = vec![erasure_root_1, erasure_root_2];
let store = Store::new_in_memory();
@@ -596,10 +594,9 @@ mod tests {
let expected: HashSet<_> = candidates
.clone()
.into_iter()
.zip(erasure_roots.iter())
.map(|(_c, &e)| AwaitedFrontierEntry {
.map(|c| AwaitedFrontierEntry {
relay_parent,
erasure_root: e,
candidate_hash: c,
validator_index,
})
.collect();
@@ -612,7 +609,7 @@ mod tests {
// Now we wait for the other chunk that we haven't received yet.
let expected: HashSet<_> = vec![AwaitedFrontierEntry {
relay_parent,
erasure_root: erasure_roots[1],
candidate_hash: receipt_2_hash,
validator_index,
}].into_iter().collect();
+257 -333
View File
@@ -38,11 +38,12 @@ use polkadot_primitives::parachain::{
ValidatorPair, ErasureChunk,
};
use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::{Spawn, SpawnExt}};
use futures::future::AbortHandle;
use keystore::KeyStorePtr;
use tokio::runtime::{Handle, Runtime as LocalRuntime};
use crate::{LOG_TARGET, ProvideGossipMessages, erasure_coding_topic};
use crate::{LOG_TARGET, ErasureNetworking};
use crate::store::Store;
/// Errors that may occur.
@@ -52,8 +53,17 @@ pub(crate) enum Error {
StoreError(io::Error),
#[display(fmt = "Validator's id and number of validators at block with parent {} not found", relay_parent)]
IdAndNValidatorsNotFound { relay_parent: Hash },
#[display(fmt = "Candidate receipt with hash {} not found", candidate_hash)]
CandidateNotFound { candidate_hash: Hash },
}
/// Used in testing to interact with the worker thread.
#[cfg(test)]
pub(crate) struct WithWorker(Box<dyn FnOnce(&mut Worker) + Send>);
#[cfg(test)]
impl std::fmt::Debug for WithWorker {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "<boxed closure>")
}
}
/// Messages sent to the `Worker`.
@@ -66,10 +76,11 @@ pub(crate) enum Error {
#[derive(Debug)]
pub(crate) enum WorkerMsg {
IncludedParachainBlocks(IncludedParachainBlocks),
ListenForChunks(ListenForChunks),
Chunks(Chunks),
CandidatesFinalized(CandidatesFinalized),
MakeAvailable(MakeAvailable),
#[cfg(test)]
WithWorker(WithWorker),
}
/// A notification of a parachain block included in the relay chain.
@@ -90,26 +101,15 @@ pub(crate) struct IncludedParachainBlocks {
pub result: oneshot::Sender<Result<(), Error>>,
}
/// Listen gossip for these chunks.
#[derive(Debug)]
pub(crate) struct ListenForChunks {
/// The hash of the candidate chunk belongs to.
pub candidate_hash: Hash,
/// The index of the chunk we need.
pub index: u32,
/// A sender to signal the result asynchronously.
pub result: Option<oneshot::Sender<Result<(), Error>>>,
}
/// We have received some chunks.
/// We have received chunks we requested.
#[derive(Debug)]
pub(crate) struct Chunks {
/// The relay parent of the block these chunks belong to.
pub relay_parent: Hash,
/// The hash of the parachain candidate these chunks belong to.
pub candidate_hash: Hash,
/// The chunks.
/// The chunks
pub chunks: Vec<ErasureChunk>,
/// The number of validators present at the candidate's relay-parent.
pub n_validators: u32,
/// A sender to signal the result asynchronously.
pub result: oneshot::Sender<Result<(), Error>>,
}
@@ -134,20 +134,26 @@ pub(crate) struct MakeAvailable {
pub result: oneshot::Sender<Result<(), Error>>,
}
/// Description of a chunk we are listening for.
#[derive(Hash, Debug, PartialEq, Eq)]
struct ListeningKey {
candidate_hash: Hash,
index: u32,
}
/// An availability worker with it's inner state.
pub(super) struct Worker<PGM> {
pub(super) struct Worker {
availability_store: Store,
provide_gossip_messages: PGM,
registered_gossip_streams: HashMap<Hash, exit_future::Signal>,
listening_for: HashMap<ListeningKey, AbortHandle>,
sender: mpsc::UnboundedSender<WorkerMsg>,
}
/// The handle to the `Worker`.
pub(super) struct WorkerHandle {
exit_signal: Option<exit_future::Signal>,
thread: Option<thread::JoinHandle<io::Result<()>>>,
sender: mpsc::UnboundedSender<WorkerMsg>,
exit_signal: Option<exit_future::Signal>,
}
impl WorkerHandle {
@@ -170,34 +176,6 @@ impl Drop for WorkerHandle {
}
}
async fn listen_for_chunks<PGM, S>(
p: PGM,
topic: Hash,
mut sender: S
)
where
PGM: ProvideGossipMessages,
S: Sink<WorkerMsg> + Unpin,
{
trace!(target: LOG_TARGET, "Registering gossip listener for topic {}", topic);
let mut chunks_stream = p.gossip_messages_for(topic);
while let Some(item) = chunks_stream.next().await {
let (s, _) = oneshot::channel();
trace!(target: LOG_TARGET, "Received for {:?}", item);
let chunks = Chunks {
relay_parent: item.0,
candidate_hash: item.1,
chunks: vec![item.2],
result: s,
};
if let Err(_) = sender.send(WorkerMsg::Chunks(chunks)).await {
break;
}
}
}
fn fetch_candidates<P>(client: &P, extrinsics: Vec<<Block as BlockT>::Extrinsic>, parent: &BlockId)
-> ClientResult<Option<Vec<AbridgedCandidateReceipt>>>
@@ -293,76 +271,90 @@ where
}
}
impl<PGM> Drop for Worker<PGM> {
fn drop(&mut self) {
for (_, signal) in self.registered_gossip_streams.drain() {
let _ = signal.fire();
}
}
}
impl Worker {
impl<PGM> Worker<PGM>
where
PGM: ProvideGossipMessages + Clone + Send + 'static,
{
// Called on startup of the worker to register listeners for all awaited chunks.
fn register_listeners(
// Called on startup of the worker to initiate fetch from network for all awaited chunks.
fn initiate_all_fetches<EN: ErasureNetworking>(
&mut self,
runtime_handle: &Handle,
erasure_network: &EN,
sender: &mut mpsc::UnboundedSender<WorkerMsg>,
) {
if let Some(awaited_chunks) = self.availability_store.awaited_chunks() {
for awaited_chunk in awaited_chunks {
if let Err(e) = self.register_chunks_listener(
if let Err(e) = self.initiate_fetch(
runtime_handle,
erasure_network,
sender,
awaited_chunk.relay_parent,
awaited_chunk.erasure_root,
awaited_chunk.candidate_hash,
) {
warn!(target: LOG_TARGET, "Failed to register gossip listener: {}", e);
warn!(target: LOG_TARGET, "Failed to register network listener: {}", e);
}
}
}
}
fn register_chunks_listener(
// initiates a fetch from network for the described chunk, with our local index.
fn initiate_fetch<EN: ErasureNetworking>(
&mut self,
runtime_handle: &Handle,
erasure_network: &EN,
sender: &mut mpsc::UnboundedSender<WorkerMsg>,
relay_parent: Hash,
erasure_root: Hash,
candidate_hash: Hash,
) -> Result<(), Error> {
let (local_id, _) = self.availability_store
let (local_id, n_validators) = self.availability_store
.get_validator_index_and_n_validators(&relay_parent)
.ok_or(Error::IdAndNValidatorsNotFound { relay_parent })?;
let topic = erasure_coding_topic(relay_parent, erasure_root, local_id);
// fast exit for if we already have the chunk.
if self.availability_store.get_erasure_chunk(&candidate_hash, local_id as _).is_some() {
return Ok(())
}
trace!(
target: LOG_TARGET,
"Registering listener for erasure chunks topic {} for ({}, {})",
topic,
"Initiating fetch for erasure-chunk at parent {} with candidate-hash {}",
relay_parent,
erasure_root,
candidate_hash,
);
let (signal, exit) = exit_future::signal();
let fut = erasure_network.fetch_erasure_chunk(&candidate_hash, local_id);
let mut sender = sender.clone();
let (fut, signal) = future::abortable(async move {
let chunk = match fut.await {
Ok(chunk) => chunk,
Err(e) => {
warn!(target: LOG_TARGET, "Unable to fetch erasure-chunk from network: {:?}", e);
return
}
};
let (s, _) = oneshot::channel();
let _ = sender.send(WorkerMsg::Chunks(Chunks {
candidate_hash,
chunks: vec![chunk],
n_validators,
result: s,
})).await;
}.map(drop).boxed());
let fut = listen_for_chunks(
self.provide_gossip_messages.clone(),
topic,
sender.clone(),
);
self.registered_gossip_streams.insert(topic, signal);
let key = ListeningKey {
candidate_hash,
index: local_id,
};
let _ = runtime_handle.spawn(select(fut.boxed(), exit).map(drop));
self.listening_for.insert(key, signal);
let _ = runtime_handle.spawn(fut);
Ok(())
}
fn on_parachain_blocks_received(
fn on_parachain_blocks_received<EN: ErasureNetworking>(
&mut self,
runtime_handle: &Handle,
erasure_network: &EN,
sender: &mut mpsc::UnboundedSender<WorkerMsg>,
blocks: Vec<IncludedParachainBlock>,
) -> Result<(), Error> {
@@ -376,54 +368,47 @@ where
// Should we be breaking block into chunks here and gossiping it and so on?
}
if let Err(e) = self.register_chunks_listener(
runtime_handle,
sender,
candidate.relay_parent,
candidate.commitments.erasure_root,
) {
warn!(target: LOG_TARGET, "Failed to register chunk listener: {}", e);
}
// This leans on the codebase-wide assumption that the `relay_parent`
// of all candidates in a block matches the parent hash of that block.
//
// In the future this will not always be true.
let candidate_hash = candidate.hash();
let _ = self.availability_store.note_candidates_with_relay_parent(
&candidate.relay_parent,
&[candidate.hash()],
&[candidate_hash],
);
if let Err(e) = self.initiate_fetch(
runtime_handle,
erasure_network,
sender,
candidate.relay_parent,
candidate_hash,
) {
warn!(target: LOG_TARGET, "Failed to register chunk listener: {}", e);
}
}
Ok(())
}
// Processes chunks messages that contain awaited items.
//
// When an awaited item is received, it is placed into the availability store
// and removed from the frontier. Listener de-registered.
fn on_chunks_received(
// Handles chunks that were required.
fn on_chunks(
&mut self,
relay_parent: Hash,
candidate_hash: Hash,
chunks: Vec<ErasureChunk>,
n_validators: u32,
) -> Result<(), Error> {
let (_, n_validators) = self.availability_store
.get_validator_index_and_n_validators(&relay_parent)
.ok_or(Error::IdAndNValidatorsNotFound { relay_parent })?;
for c in &chunks {
let key = ListeningKey {
candidate_hash,
index: c.index,
};
let receipt = self.availability_store.get_candidate(&candidate_hash)
.ok_or(Error::CandidateNotFound { candidate_hash })?;
for chunk in &chunks {
let topic = erasure_coding_topic(
relay_parent,
receipt.commitments.erasure_root,
chunk.index,
);
// need to remove gossip listener and stop it.
if let Some(signal) = self.registered_gossip_streams.remove(&topic) {
let _ = signal.fire();
// remove bookkeeping so network does not attempt to fetch
// any longer.
if let Some(exit_signal) = self.listening_for.remove(&key) {
exit_signal.abort();
}
}
@@ -436,47 +421,16 @@ where
Ok(())
}
// Processes the `ListenForChunks` message.
//
// When the worker receives a `ListenForChunk` message, it double-checks that
// we don't have that piece, and then it registers a listener.
fn on_listen_for_chunks_received(
&mut self,
runtime_handle: &Handle,
sender: &mut mpsc::UnboundedSender<WorkerMsg>,
candidate_hash: Hash,
id: usize
) -> Result<(), Error> {
let candidate = self.availability_store.get_candidate(&candidate_hash)
.ok_or(Error::CandidateNotFound { candidate_hash })?;
if self.availability_store
.get_erasure_chunk(&candidate_hash, id)
.is_none() {
if let Err(e) = self.register_chunks_listener(
runtime_handle,
sender,
candidate.relay_parent,
candidate.commitments.erasure_root
) {
warn!(target: LOG_TARGET, "Failed to register a gossip listener: {}", e);
}
}
Ok(())
}
/// Starts a worker with a given availability store and a gossip messages provider.
pub fn start(
pub fn start<EN: ErasureNetworking + Send + 'static>(
availability_store: Store,
provide_gossip_messages: PGM,
erasure_network: EN,
) -> WorkerHandle {
let (sender, mut receiver) = mpsc::unbounded();
let mut worker = Self {
let mut worker = Worker {
availability_store,
provide_gossip_messages,
registered_gossip_streams: HashMap::new(),
listening_for: HashMap::new(),
sender: sender.clone(),
};
@@ -489,34 +443,15 @@ where
let runtime_handle = runtime.handle().clone();
// On startup, registers listeners (gossip streams) for all
// On startup, initiates fetch from network for all
// entries in the awaited frontier.
worker.register_listeners(runtime.handle(), &mut sender);
worker.initiate_all_fetches(runtime.handle(), &erasure_network, &mut sender);
let process_notification = async move {
while let Some(msg) = receiver.next().await {
trace!(target: LOG_TARGET, "Received message {:?}", msg);
let res = match msg {
WorkerMsg::ListenForChunks(msg) => {
let ListenForChunks {
candidate_hash,
index,
result,
} = msg;
let res = worker.on_listen_for_chunks_received(
&runtime_handle,
&mut sender,
candidate_hash,
index as usize,
);
if let Some(result) = result {
let _ = result.send(res);
}
Ok(())
}
WorkerMsg::IncludedParachainBlocks(msg) => {
let IncludedParachainBlocks {
blocks,
@@ -525,6 +460,7 @@ where
let res = worker.on_parachain_blocks_received(
&runtime_handle,
&erasure_network,
&mut sender,
blocks,
);
@@ -533,11 +469,17 @@ where
Ok(())
}
WorkerMsg::Chunks(msg) => {
let Chunks { relay_parent, candidate_hash, chunks, result } = msg;
let res = worker.on_chunks_received(
relay_parent,
let Chunks {
candidate_hash,
chunks,
n_validators,
result,
} = msg;
let res = worker.on_chunks(
candidate_hash,
chunks,
n_validators,
);
let _ = result.send(res);
@@ -559,6 +501,11 @@ where
let _ = result.send(res);
Ok(())
}
#[cfg(test)]
WorkerMsg::WithWorker(with_worker) => {
(with_worker.0)(&mut worker);
Ok(())
}
};
if let Err(_) = res {
@@ -569,7 +516,6 @@ where
};
runtime.spawn(select(process_notification.boxed(), exit.clone()).map(drop));
runtime.block_on(exit);
info!(target: LOG_TARGET, "Availability worker exiting");
@@ -591,19 +537,16 @@ where
///
/// [`BlockImport`]: https://substrate.dev/rustdocs/v1.0/substrate_consensus_common/trait.BlockImport.html
pub struct AvailabilityBlockImport<I, P> {
availability_store: Store,
inner: I,
client: Arc<P>,
keystore: KeyStorePtr,
to_worker: mpsc::UnboundedSender<WorkerMsg>,
exit_signal: Option<exit_future::Signal>,
exit_signal: AbortHandle,
}
impl<I, P> Drop for AvailabilityBlockImport<I, P> {
fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() {
let _ = signal.fire();
}
self.exit_signal.abort();
}
}
@@ -653,24 +596,6 @@ impl<I, P> BlockImport<Block> for AvailabilityBlockImport<I, P> where
candidates,
);
for candidate in &candidates {
let candidate_hash = candidate.hash();
// If we don't yet have our chunk of this candidate,
// tell the worker to listen for one.
if self.availability_store.get_erasure_chunk(
&candidate_hash,
our_id as usize,
).is_none() {
let msg = WorkerMsg::ListenForChunks(ListenForChunks {
candidate_hash,
index: our_id as u32,
result: None,
});
let _ = self.to_worker.unbounded_send(msg);
}
}
let (s, _) = oneshot::channel();
// Inform the worker about the included parachain blocks.
@@ -714,7 +639,6 @@ impl<I, P> BlockImport<Block> for AvailabilityBlockImport<I, P> where
impl<I, P> AvailabilityBlockImport<I, P> {
pub(crate) fn new(
availability_store: Store,
client: Arc<P>,
block_import: I,
spawner: impl Spawn,
@@ -728,26 +652,21 @@ impl<I, P> AvailabilityBlockImport<I, P> {
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
{
let (signal, exit) = exit_future::signal();
// This is not the right place to spawn the finality future,
// it would be more appropriate to spawn it in the `start` method of the `Worker`.
// However, this would make the type of the `Worker` and the `Store` itself
// dependent on the types of client and executor, which would prove
// not not so handy in the testing code.
let mut exit_signal = Some(signal);
let prune_available = select(
prune_unneeded_availability(client.clone(), to_worker.clone()).boxed(),
exit.clone()
).map(drop);
let (prune_available, exit_signal) = future::abortable(prune_unneeded_availability(
client.clone(),
to_worker.clone(),
));
if let Err(_) = spawner.spawn(prune_available) {
if let Err(_) = spawner.spawn(prune_available.map(drop)) {
error!(target: LOG_TARGET, "Failed to spawn availability pruning task");
exit_signal = None;
}
AvailabilityBlockImport {
availability_store,
client,
inner: block_import,
to_worker,
@@ -771,55 +690,54 @@ impl<I, P> AvailabilityBlockImport<I, P> {
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use futures::{stream, channel::mpsc, Stream};
use std::sync::{Arc, Mutex, Condvar};
use futures::channel::oneshot;
use std::sync::Arc;
use std::pin::Pin;
use tokio::runtime::Runtime;
use parking_lot::Mutex;
use crate::store::AwaitedFrontierEntry;
// Just contains topic->channel mapping to give to outer code on `gossip_messages_for` calls.
struct TestGossipMessages {
messages: Arc<Mutex<HashMap<
Hash,
(
Arc<(Mutex<bool>, Condvar)>,
mpsc::UnboundedReceiver<(Hash, Hash, ErasureChunk)>,
),
#[derive(Default, Clone)]
struct TestErasureNetwork {
chunk_receivers: Arc<Mutex<HashMap<
(Hash, u32),
oneshot::Receiver<ErasureChunk>
>>>,
}
impl ProvideGossipMessages for TestGossipMessages {
fn gossip_messages_for(&self, topic: Hash)
-> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>
impl TestErasureNetwork {
// adds a receiver. this returns a sender for the erasure-chunk
// along with an exit future that fires when the erasure chunk has
// been fully-processed
fn add_receiver(&self, candidate_hash: Hash, index: u32)
-> oneshot::Sender<ErasureChunk>
{
match self.messages.lock().unwrap().remove(&topic) {
Some((pair, receiver)) => {
let (lock, cvar) = &*pair;
let mut consumed = lock.lock().unwrap();
*consumed = true;
cvar.notify_one();
receiver.boxed()
},
None => stream::iter(vec![]).boxed(),
}
let (sender, receiver) = oneshot::channel();
self.chunk_receivers.lock().insert((candidate_hash, index), receiver);
sender
}
fn gossip_erasure_chunk(
&self,
_relay_parent: Hash,
_candidate_hash: Hash,
_erasure_root: Hash,
_chunk: ErasureChunk
) {}
}
impl Clone for TestGossipMessages {
fn clone(&self) -> Self {
TestGossipMessages {
messages: self.messages.clone(),
impl ErasureNetworking for TestErasureNetwork {
type Error = String;
fn fetch_erasure_chunk(&self, candidate_hash: &Hash, index: u32)
-> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>>
{
match self.chunk_receivers.lock().remove(&(*candidate_hash, index)) {
Some(receiver) => receiver.then(|x| match x {
Ok(x) => future::ready(Ok(x)).left_future(),
Err(_) => future::pending().right_future(),
}).boxed(),
None => future::pending().boxed(),
}
}
fn distribute_erasure_chunk(
&self,
_candidate_hash: Hash,
_chunk: ErasureChunk
) {}
}
// This test tests that as soon as the worker receives info about new parachain blocks
@@ -830,31 +748,21 @@ mod tests {
fn receiving_gossip_chunk_removes_from_frontier() {
let mut runtime = Runtime::new().unwrap();
let relay_parent = [1; 32].into();
let erasure_root = [2; 32].into();
let local_id = 2;
let n_validators = 4;
let store = Store::new_in_memory();
let mut candidate = AbridgedCandidateReceipt::default();
candidate.relay_parent = relay_parent;
let candidate_hash = candidate.hash();
// Tell the store our validator's position and the number of validators at given point.
store.note_validator_index_and_n_validators(&relay_parent, local_id, n_validators).unwrap();
let (gossip_sender, gossip_receiver) = mpsc::unbounded();
let topic = erasure_coding_topic(relay_parent, erasure_root, local_id);
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let messages = TestGossipMessages {
messages: Arc::new(Mutex::new(vec![
(topic, (pair.clone(), gossip_receiver))
].into_iter().collect()))
};
let mut candidate = AbridgedCandidateReceipt::default();
candidate.commitments.erasure_root = erasure_root;
candidate.relay_parent = relay_parent;
let candidate_hash = candidate.hash();
let network = TestErasureNetwork::default();
let chunk_sender = network.add_receiver(candidate_hash, local_id);
// At this point we shouldn't be waiting for any chunks.
assert!(store.awaited_chunks().is_none());
@@ -869,7 +777,7 @@ mod tests {
result: s,
});
let handle = Worker::start(store.clone(), messages);
let handle = Worker::start(store.clone(), network);
// Tell the worker that the new blocks have been included into the relay chain.
// This should trigger the registration of gossip message listeners for the
@@ -883,30 +791,36 @@ mod tests {
store.awaited_chunks().unwrap(),
vec![AwaitedFrontierEntry {
relay_parent,
erasure_root,
candidate_hash,
validator_index: local_id,
}].into_iter().collect()
);
let msg = (
relay_parent,
candidate_hash,
ErasureChunk {
chunk: vec![1, 2, 3],
index: local_id as u32,
proof: vec![],
// Complete the chunk request.
chunk_sender.send(ErasureChunk {
chunk: vec![1, 2, 3],
index: local_id as u32,
proof: vec![],
}).unwrap();
// wait until worker thread has de-registered the listener for a
// particular chunk.
loop {
let (s, r) = oneshot::channel();
handle.sender.unbounded_send(WorkerMsg::WithWorker(WithWorker(Box::new(move |worker| {
let key = ListeningKey {
candidate_hash,
index: local_id,
};
let is_waiting = worker.listening_for.contains_key(&key);
s.send(!is_waiting).unwrap(); // tell the test thread `true` if we are not waiting.
})))).unwrap();
if runtime.block_on(r).unwrap() {
break
}
);
// Send a gossip message with an awaited chunk
gossip_sender.unbounded_send(msg).unwrap();
// At the point the needed piece is received, the gossip listener for
// this topic is deregistered and it's receiver side is dropped.
// Wait for the sender side to become closed.
while !gossip_sender.is_closed() {
// Probably we can just .wait this somehow?
thread::sleep(Duration::from_millis(100));
}
// The awaited chunk has been received so at this point we no longer wait for any chunks.
@@ -914,7 +828,7 @@ mod tests {
}
#[test]
fn listen_for_chunk_registers_listener() {
fn included_parachain_blocks_registers_listener() {
let mut runtime = Runtime::new().unwrap();
let relay_parent = [1; 32].into();
let erasure_root_1 = [2; 32].into();
@@ -956,63 +870,73 @@ mod tests {
}],
).unwrap();
let (_, gossip_receiver_1) = mpsc::unbounded();
let (_, gossip_receiver_2) = mpsc::unbounded();
let network = TestErasureNetwork::default();
let _ = network.add_receiver(candidate_1_hash, local_id);
let _ = network.add_receiver(candidate_2_hash, local_id);
let topic_1 = erasure_coding_topic(relay_parent, erasure_root_1, local_id);
let topic_2 = erasure_coding_topic(relay_parent, erasure_root_2, local_id);
let handle = Worker::start(store.clone(), network.clone());
let cvar_pair1 = Arc::new((Mutex::new(false), Condvar::new()));
let cvar_pair2 = Arc::new((Mutex::new(false), Condvar::new()));
{
let (s, r) = oneshot::channel();
// Tell the worker to listen for chunks from candidate 2 (we alredy have a chunk from it).
let listen_msg_2 = WorkerMsg::IncludedParachainBlocks(IncludedParachainBlocks {
blocks: vec![IncludedParachainBlock {
candidate: candidate_2,
available_data: None,
}],
result: s,
});
let messages = TestGossipMessages {
messages: Arc::new(Mutex::new(
vec![
(topic_1, (cvar_pair1.clone(), gossip_receiver_1)),
(topic_2, (cvar_pair2, gossip_receiver_2)),
].into_iter().collect()))
};
handle.sender.unbounded_send(listen_msg_2).unwrap();
let handle = Worker::start(store.clone(), messages.clone());
runtime.block_on(r).unwrap().unwrap();
// The receiver for this chunk left intact => listener not registered.
assert!(network.chunk_receivers.lock().contains_key(&(candidate_2_hash, local_id)));
let (s2, r2) = oneshot::channel();
// Tell the worker to listen for chunks from candidate 2 (we alredy have a chunk from it).
let listen_msg_2 = WorkerMsg::ListenForChunks(ListenForChunks {
candidate_hash: candidate_2_hash,
index: local_id as u32,
result: Some(s2),
});
// more directly:
let (s, r) = oneshot::channel();
handle.sender.unbounded_send(WorkerMsg::WithWorker(WithWorker(Box::new(move |worker| {
let key = ListeningKey {
candidate_hash: candidate_2_hash,
index: local_id,
};
let _ = s.send(worker.listening_for.contains_key(&key));
})))).unwrap();
handle.sender.unbounded_send(listen_msg_2).unwrap();
runtime.block_on(r2).unwrap().unwrap();
// The gossip sender for this topic left intact => listener not registered.
assert!(messages.messages.lock().unwrap().contains_key(&topic_2));
let (s1, r1) = oneshot::channel();
// Tell the worker to listen for chunks from candidate 1.
// (we don't have a chunk from it yet).
let listen_msg_1 = WorkerMsg::ListenForChunks(ListenForChunks {
candidate_hash: candidate_1_hash,
index: local_id as u32,
result: Some(s1),
});
handle.sender.unbounded_send(listen_msg_1).unwrap();
runtime.block_on(r1).unwrap().unwrap();
// Here, we are racing against the worker thread that might have not yet
// reached the point when it requests the gossip messages for `topic_2`
// which will get them removed from `TestGossipMessages`. Therefore, the
// `Condvar` is used to wait for that event.
let (lock, cvar1) = &*cvar_pair1;
let mut gossip_stream_consumed = lock.lock().unwrap();
while !*gossip_stream_consumed {
gossip_stream_consumed = cvar1.wait(gossip_stream_consumed).unwrap();
assert!(!runtime.block_on(r).unwrap());
}
// The gossip sender taken => listener registered.
assert!(!messages.messages.lock().unwrap().contains_key(&topic_1));
{
let (s, r) = oneshot::channel();
// Tell the worker to listen for chunks from candidate 1.
// (we don't have a chunk from it yet).
let listen_msg_1 = WorkerMsg::IncludedParachainBlocks(IncludedParachainBlocks {
blocks: vec![IncludedParachainBlock {
candidate: candidate_1,
available_data: None,
}],
result: s,
});
handle.sender.unbounded_send(listen_msg_1).unwrap();
runtime.block_on(r).unwrap().unwrap();
// The receiver taken => listener registered.
assert!(!network.chunk_receivers.lock().contains_key(&(candidate_1_hash, local_id)));
// more directly:
let (s, r) = oneshot::channel();
handle.sender.unbounded_send(WorkerMsg::WithWorker(WithWorker(Box::new(move |worker| {
let key = ListeningKey {
candidate_hash: candidate_1_hash,
index: local_id,
};
let _ = s.send(worker.listening_for.contains_key(&key));
})))).unwrap();
assert!(runtime.block_on(r).unwrap());
}
}
}