diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 046d2ac453..cfc4f6ed86 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4579,6 +4579,25 @@ dependencies = [ "sp-runtime", ] +[[package]] +name = "polkadot-node-core-av-store" +version = "0.1.0" +dependencies = [ + "assert_matches", + "derive_more 0.99.9", + "futures 0.3.5", + "kvdb", + "kvdb-memorydb", + "kvdb-rocksdb", + "log 0.4.8", + "parity-scale-codec", + "polkadot-erasure-coding", + "polkadot-node-subsystem", + "polkadot-overseer", + "polkadot-primitives", + "sp-core", +] + [[package]] name = "polkadot-node-core-backing" version = "0.1.0" diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml index a18c98f83d..f595fef855 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -45,6 +45,7 @@ members = [ "node/core/bitfield-signing", "node/core/proposer", + "node/core/av-store", "node/network/bridge", "node/network/pov-distribution", "node/network/statement-distribution", diff --git a/polkadot/node/core/av-store/Cargo.toml b/polkadot/node/core/av-store/Cargo.toml new file mode 100644 index 0000000000..e89ef7a6ec --- /dev/null +++ b/polkadot/node/core/av-store/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "polkadot-node-core-av-store" +version = "0.1.0" +authors = ["Parity Technologies <admin@parity.io>"] +edition = "2018" + +[dependencies] +futures = "0.3.5" +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } +polkadot-overseer = { path = "../../overseer" } +polkadot-primitives = { path = "../../../primitives" } +erasure = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" } +kvdb = "0.7.0" +kvdb-rocksdb = "0.9.0" +codec = { package = "parity-scale-codec", version = "1.3.1", features = ["derive"] } +log = "0.4.8" +derive_more = "0.99.9" + +[dev-dependencies] +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +futures = { version = "0.3.5", features = ["thread-pool"] } +polkadot-subsystem = { package = 
"polkadot-node-subsystem", path = "../../subsystem", features = [ "test-helpers" ] } +kvdb-memorydb = "0.7.0" +assert_matches = "1.3.0" diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs new file mode 100644 index 0000000000..386ff99f27 --- /dev/null +++ b/polkadot/node/core/av-store/src/lib.rs @@ -0,0 +1,553 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. +//! Implements a `AvailabilityStoreSubsystem`. 
+ +#![recursion_limit="256"] +#![warn(missing_docs)] + +use std::collections::HashMap; +use std::io; +use std::path::PathBuf; +use std::sync::Arc; + +use codec::{Encode, Decode}; +use futures::{select, channel::oneshot, FutureExt}; +use kvdb_rocksdb::{Database, DatabaseConfig}; +use kvdb::{KeyValueDB, DBTransaction}; + +use polkadot_primitives::v1::{ + Hash, AvailableData, ErasureChunk, ValidatorIndex, +}; +use polkadot_subsystem::{ + FromOverseer, SubsystemError, Subsystem, SubsystemContext, SpawnedSubsystem, +}; +use polkadot_subsystem::messages::AvailabilityStoreMessage; + +const LOG_TARGET: &str = "availability"; + +mod columns { + pub const DATA: u32 = 0; + pub const NUM_COLUMNS: u32 = 1; +} + +#[derive(Debug, derive_more::From)] +enum Error { + #[from] + Erasure(erasure::Error), + #[from] + Io(io::Error), + #[from] + Oneshot(oneshot::Canceled), + #[from] + Subsystem(SubsystemError), +} + +/// An implementation of the Availability Store subsystem. +pub struct AvailabilityStoreSubsystem { + inner: Arc, +} + +fn available_data_key(candidate_hash: &Hash) -> Vec { + (candidate_hash, 0i8).encode() +} + +fn erasure_chunk_key(candidate_hash: &Hash, index: u32) -> Vec { + (candidate_hash, index, 0i8).encode() +} + +#[derive(Encode, Decode)] +struct StoredAvailableData { + data: AvailableData, + n_validators: u32, +} + +/// Configuration for the availability store. +pub struct Config { + /// Total cache size in megabytes. If `None` the default (128 MiB per column) is used. + pub cache_size: Option, + /// Path to the database. + pub path: PathBuf, +} + +impl AvailabilityStoreSubsystem { + /// Create a new `AvailabilityStoreSubsystem` with a given config on disk. 
+ pub fn new_on_disk(config: Config) -> io::Result { + let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS); + + if let Some(cache_size) = config.cache_size { + let mut memory_budget = HashMap::new(); + + for i in 0..columns::NUM_COLUMNS { + memory_budget.insert(i, cache_size / columns::NUM_COLUMNS as usize); + } + db_config.memory_budget = memory_budget; + } + + let path = config.path.to_str().ok_or_else(|| io::Error::new( + io::ErrorKind::Other, + format!("Bad database path: {:?}", config.path), + ))?; + + let db = Database::open(&db_config, &path)?; + + Ok(Self { + inner: Arc::new(db), + }) + } + + #[cfg(test)] + fn new_in_memory(inner: Arc) -> Self { + Self { + inner, + } + } +} + +async fn run(subsystem: AvailabilityStoreSubsystem, mut ctx: Context) + -> Result<(), Error> +where + Context: SubsystemContext, +{ + let ctx = &mut ctx; + loop { + select! { + incoming = ctx.recv().fuse() => { + match incoming { + Ok(FromOverseer::Signal(Conclude)) => break, + Ok(FromOverseer::Signal(_)) => (), + Ok(FromOverseer::Communication { msg }) => { + process_message(&subsystem.inner, msg)?; + } + Err(_) => break, + } + } + complete => break, + } + } + + Ok(()) +} + +fn process_message(db: &Arc, msg: AvailabilityStoreMessage) -> Result<(), Error> { + use AvailabilityStoreMessage::*; + match msg { + QueryAvailableData(hash, tx) => { + tx.send(available_data(db, &hash).map(|d| d.data)).map_err(|_| oneshot::Canceled)?; + } + QueryDataAvailability(hash, tx) => { + let result = match available_data(db, &hash) { + Some(_) => true, + None => false, + }; + + tx.send(result).map_err(|_| oneshot::Canceled)?; + } + QueryChunk(hash, id, tx) => { + tx.send(get_chunk(db, &hash, id)?).map_err(|_| oneshot::Canceled)?; + } + StoreChunk(hash, id, chunk, tx) => { + match store_chunk(db, &hash, id, chunk) { + Err(e) => { + tx.send(Err(())).map_err(|_| oneshot::Canceled)?; + return Err(e); + } + Ok(()) => { + tx.send(Ok(())).map_err(|_| oneshot::Canceled)?; + } + } + } + 
StoreAvailableData(hash, id, n_validators, av_data, tx) => { + match store_available_data(db, &hash, id, n_validators, av_data) { + Err(e) => { + tx.send(Err(())).map_err(|_| oneshot::Canceled)?; + return Err(e); + } + Ok(()) => { + tx.send(Ok(())).map_err(|_| oneshot::Canceled)?; + } + } + } + } + + Ok(()) +} + +fn available_data(db: &Arc, candidate_hash: &Hash) -> Option { + query_inner(db, columns::DATA, &available_data_key(candidate_hash)) +} + +fn store_available_data( + db: &Arc, + candidate_hash: &Hash, + id: Option, + n_validators: u32, + available_data: AvailableData, +) -> Result<(), Error> { + let mut tx = DBTransaction::new(); + + if let Some(index) = id { + let chunks = get_chunks(&available_data, n_validators as usize)?; + store_chunk(db, candidate_hash, n_validators, chunks[index as usize].clone())?; + } + + let stored_data = StoredAvailableData { + data: available_data, + n_validators, + }; + + tx.put_vec( + columns::DATA, + available_data_key(&candidate_hash).as_slice(), + stored_data.encode(), + ); + + db.write(tx)?; + + Ok(()) +} + +fn store_chunk(db: &Arc, candidate_hash: &Hash, _n_validators: u32, chunk: ErasureChunk) + -> Result<(), Error> +{ + let mut tx = DBTransaction::new(); + + let dbkey = erasure_chunk_key(candidate_hash, chunk.index); + + tx.put_vec(columns::DATA, &dbkey, chunk.encode()); + db.write(tx)?; + + Ok(()) +} + +fn get_chunk(db: &Arc, candidate_hash: &Hash, index: u32) + -> Result, Error> +{ + if let Some(chunk) = query_inner( + db, + columns::DATA, + &erasure_chunk_key(candidate_hash, index)) { + return Ok(Some(chunk)); + } + + if let Some(data) = available_data(db, candidate_hash) { + let mut chunks = get_chunks(&data.data, data.n_validators as usize)?; + let desired_chunk = chunks.get(index as usize).cloned(); + for chunk in chunks.drain(..) 
{ + store_chunk(db, candidate_hash, data.n_validators, chunk)?; + } + return Ok(desired_chunk); + } + + Ok(None) +} + +fn query_inner(db: &Arc, column: u32, key: &[u8]) -> Option { + match db.get(column, key) { + Ok(Some(raw)) => { + let res = D::decode(&mut &raw[..]).expect("all stored data serialized correctly; qed"); + Some(res) + } + Ok(None) => None, + Err(e) => { + log::warn!(target: LOG_TARGET, "Error reading from the availability store: {:?}", e); + None + } + } +} + +impl Subsystem for AvailabilityStoreSubsystem + where + Context: SubsystemContext, +{ + fn start(self, ctx: Context) -> SpawnedSubsystem { + let future = Box::pin(async move { + if let Err(e) = run(self, ctx).await { + log::error!(target: "availabilitystore", "Subsystem exited with an error {:?}", e); + } + }); + + SpawnedSubsystem { + name: "availability-store-subsystem", + future, + } + } +} + +fn get_chunks(data: &AvailableData, n_validators: usize) -> Result, Error> { + let chunks = erasure::obtain_chunks_v1(n_validators, data)?; + let branches = erasure::branches(chunks.as_ref()); + + Ok(chunks + .iter() + .zip(branches.map(|(proof, _)| proof)) + .enumerate() + .map(|(index, (chunk, proof))| ErasureChunk { + chunk: chunk.clone(), + proof, + index: index as u32, + }) + .collect() + ) +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::{ + future, + channel::oneshot, + executor, + Future, + }; + use std::cell::RefCell; + use polkadot_primitives::v1::{ + AvailableData, BlockData, HeadData, GlobalValidationData, LocalValidationData, PoV, + OmittedValidationData, + }; + use polkadot_subsystem::test_helpers; + + struct TestHarness { + virtual_overseer: test_helpers::TestSubsystemContextHandle, + } + + thread_local! 
{ + static TIME_NOW: RefCell> = RefCell::new(None); + } + + struct TestState { + global_validation_schedule: GlobalValidationData, + local_validation_data: LocalValidationData, + } + + impl Default for TestState { + fn default() -> Self { + + let local_validation_data = LocalValidationData { + parent_head: HeadData(vec![7, 8, 9]), + balance: Default::default(), + code_upgrade_allowed: None, + validation_code_hash: Default::default(), + }; + + let global_validation_schedule = GlobalValidationData { + max_code_size: 1000, + max_head_data_size: 1000, + block_number: Default::default(), + }; + + Self { + local_validation_data, + global_validation_schedule, + } + } + } + + fn test_harness>( + store: Arc, + test: impl FnOnce(TestHarness) -> T, + ) { + let pool = sp_core::testing::TaskExecutor::new(); + let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); + + let subsystem = AvailabilityStoreSubsystem::new_in_memory(store); + let subsystem = run(subsystem, context); + + let test_fut = test(TestHarness { + virtual_overseer, + }); + + futures::pin_mut!(test_fut); + futures::pin_mut!(subsystem); + + executor::block_on(future::select(test_fut, subsystem)); + } + + #[test] + fn store_chunk_works() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + test_harness(store.clone(), |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + let relay_parent = Hash::from([1; 32]); + let validator_index = 5; + + let chunk = ErasureChunk { + chunk: vec![1, 2, 3], + index: validator_index, + proof: vec![vec![3, 4, 5]], + }; + + let (tx, rx) = oneshot::channel(); + + let chunk_msg = AvailabilityStoreMessage::StoreChunk( + relay_parent, + validator_index, + chunk.clone(), + tx, + ); + + virtual_overseer.send(FromOverseer::Communication{ msg: chunk_msg }).await; + assert_eq!(rx.await.unwrap(), Ok(())); + + let (tx, rx) = oneshot::channel(); + let query_chunk = AvailabilityStoreMessage::QueryChunk( + 
relay_parent, + validator_index, + tx, + ); + + virtual_overseer.send(FromOverseer::Communication{ msg: query_chunk }).await; + + assert_eq!(rx.await.unwrap().unwrap(), chunk); + }); + } + + #[test] + fn store_block_works() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + let test_state = TestState::default(); + test_harness(store.clone(), |test_harness| async move { + let TestHarness { mut virtual_overseer } = test_harness; + let candidate_hash = Hash::from([1; 32]); + let validator_index = 5; + let n_validators = 10; + + let pov = PoV { + block_data: BlockData(vec![4, 5, 6]), + }; + + let global_validation = test_state.global_validation_schedule; + let local_validation = test_state.local_validation_data; + + let omitted_validation = OmittedValidationData { + global_validation, + local_validation, + }; + + let available_data = AvailableData { + pov, + omitted_validation, + }; + + + let (tx, rx) = oneshot::channel(); + let block_msg = AvailabilityStoreMessage::StoreAvailableData( + candidate_hash, + Some(validator_index), + n_validators, + available_data.clone(), + tx, + ); + + virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await; + assert_eq!(rx.await.unwrap(), Ok(())); + + let pov = query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap(); + assert_eq!(pov, available_data); + + let chunk = query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(); + + let chunks = erasure::obtain_chunks_v1(10, &available_data).unwrap(); + + let mut branches = erasure::branches(chunks.as_ref()); + + let branch = branches.nth(5).unwrap(); + let expected_chunk = ErasureChunk { + chunk: branch.1.to_vec(), + index: 5, + proof: branch.0, + }; + + assert_eq!(chunk, expected_chunk); + }); + } + + + #[test] + fn store_pov_and_query_chunk_works() { + let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS)); + let test_state = TestState::default(); + + test_harness(store.clone(), |test_harness| 
async move { + let TestHarness { mut virtual_overseer } = test_harness; + let candidate_hash = Hash::from([1; 32]); + let n_validators = 10; + + let pov = PoV { + block_data: BlockData(vec![4, 5, 6]), + }; + + let global_validation = test_state.global_validation_schedule; + let local_validation = test_state.local_validation_data; + + let omitted_validation = OmittedValidationData { + global_validation, + local_validation, + }; + + let available_data = AvailableData { + pov, + omitted_validation, + }; + + let chunks_expected = get_chunks(&available_data, n_validators as usize).unwrap(); + + let (tx, rx) = oneshot::channel(); + let block_msg = AvailabilityStoreMessage::StoreAvailableData( + candidate_hash, + None, + n_validators, + available_data, + tx, + ); + + virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await; + + assert_eq!(rx.await.unwrap(), Ok(())); + + for validator_index in 0..n_validators { + let chunk = query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap(); + + assert_eq!(chunk, chunks_expected[validator_index as usize]); + } + }); + } + + async fn query_available_data( + virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + candidate_hash: Hash, + ) -> Option { + let (tx, rx) = oneshot::channel(); + + let query = AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx); + virtual_overseer.send(FromOverseer::Communication{ msg: query }).await; + + rx.await.unwrap() + } + + async fn query_chunk( + virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, + candidate_hash: Hash, + index: u32, + ) -> Option { + let (tx, rx) = oneshot::channel(); + + let query = AvailabilityStoreMessage::QueryChunk(candidate_hash, index, tx); + virtual_overseer.send(FromOverseer::Communication{ msg: query }).await; + + rx.await.unwrap() + } +} diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 89da1716dc..837bcac767 100644 --- 
a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -31,7 +31,7 @@ use keystore::KeyStorePtr; use polkadot_primitives::v1::{ CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId, ValidatorIndex, SigningContext, PoV, OmittedValidationData, - CandidateDescriptor, AvailableData, ErasureChunk, ValidatorSignature, Hash, CandidateReceipt, + CandidateDescriptor, AvailableData, ValidatorSignature, Hash, CandidateReceipt, CandidateCommitments, }; use polkadot_node_primitives::{ @@ -67,6 +67,7 @@ use statement_table::{ enum Error { CandidateNotFound, InvalidSignature, + StoreFailed, #[from] Erasure(erasure_coding::Error), #[from] @@ -581,20 +582,30 @@ impl CandidateBackingJob { Ok(rx.await??) } - async fn store_chunk( + async fn store_available_data( &mut self, - id: ValidatorIndex, - chunk: ErasureChunk, + id: Option, + n_validators: u32, + available_data: AvailableData, ) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); self.tx_from.send(FromJob::AvailabilityStore( - AvailabilityStoreMessage::StoreChunk(self.parent, id, chunk) + AvailabilityStoreMessage::StoreAvailableData( + self.parent, + id, + n_validators, + available_data, + tx, + ) ) ).await?; + rx.await?.map_err(|_| Error::StoreFailed)?; + Ok(()) } - // Compute the erasure-coding and make it available. + // Make a `PoV` available. // // This calls an inspection function before making the PoV available for any last checks // that need to be done. 
If the inspection function returns an error, this function returns @@ -636,15 +647,11 @@ impl CandidateBackingJob { Err(e) => return Ok(Err(e)), }; - for (index, (proof, chunk)) in branches.enumerate() { - let chunk = ErasureChunk { - chunk: chunk.to_vec(), - index: index as u32, - proof, - }; - - self.store_chunk(index as ValidatorIndex, chunk).await?; - } + self.store_available_data( + self.table_context.validator.as_ref().map(|v| v.index()), + self.table_context.validators.len() as u32, + available_data, + ).await?; Ok(Ok(res)) } @@ -1059,14 +1066,14 @@ mod tests { } ); - for _ in 0..test_state.validators.len() { - assert_matches!( - virtual_overseer.recv().await, - AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreChunk(parent_hash, _, _) - ) if parent_hash == test_state.relay_parent - ); - } + assert_matches!( + virtual_overseer.recv().await, + AllMessages::AvailabilityStore( + AvailabilityStoreMessage::StoreAvailableData(parent_hash, _, _, _, tx) + ) if parent_hash == test_state.relay_parent => { + tx.send(Ok(())).unwrap(); + } + ); assert_matches!( virtual_overseer.recv().await, @@ -1171,6 +1178,15 @@ mod tests { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::AvailabilityStore( + AvailabilityStoreMessage::StoreAvailableData(parent_hash, _, _, _, tx) + ) if parent_hash == test_state.relay_parent => { + tx.send(Ok(())).unwrap(); + } + ); + let statement = CandidateBackingMessage::Statement( test_state.relay_parent, signed_b.clone(), @@ -1292,14 +1308,14 @@ mod tests { } ); - for _ in 0..test_state.validators.len() { - assert_matches!( - virtual_overseer.recv().await, - AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreChunk(parent_hash, _, _) - ) if parent_hash == test_state.relay_parent - ); - } + assert_matches!( + virtual_overseer.recv().await, + AllMessages::AvailabilityStore( + AvailabilityStoreMessage::StoreAvailableData(parent_hash, _, _, _, tx) + ) if parent_hash == test_state.relay_parent => { + 
tx.send(Ok(())).unwrap(); + } + ); assert_matches!( virtual_overseer.recv().await, @@ -1449,14 +1465,14 @@ mod tests { } ); - for _ in 0..test_state.validators.len() { - assert_matches!( - virtual_overseer.recv().await, - AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreChunk(parent_hash, _, _) - ) if parent_hash == test_state.relay_parent - ); - } + assert_matches!( + virtual_overseer.recv().await, + AllMessages::AvailabilityStore( + AvailabilityStoreMessage::StoreAvailableData(parent_hash, _, _, _, tx) + ) if parent_hash == test_state.relay_parent => { + tx.send(Ok(())).unwrap(); + } + ); assert_matches!( virtual_overseer.recv().await, diff --git a/polkadot/node/core/bitfield-signing/src/lib.rs b/polkadot/node/core/bitfield-signing/src/lib.rs index b973f4b3ae..806e3c1418 100644 --- a/polkadot/node/core/bitfield-signing/src/lib.rs +++ b/polkadot/node/core/bitfield-signing/src/lib.rs @@ -135,7 +135,7 @@ async fn get_core_availability( sender: &mpsc::Sender, ) -> Result { use messages::{ - AvailabilityStoreMessage::QueryPoVAvailable, + AvailabilityStoreMessage::QueryDataAvailability, RuntimeApiRequest::CandidatePendingAvailability, }; use FromJob::{AvailabilityStore, RuntimeApi}; @@ -159,7 +159,7 @@ async fn get_core_availability( }; let (tx, rx) = oneshot::channel(); sender - .send(AvailabilityStore(QueryPoVAvailable( + .send(AvailabilityStore(QueryDataAvailability( committed_candidate_receipt.descriptor.pov_hash, tx, ))) diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 9517504c1f..11c9563cb0 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -179,6 +179,9 @@ impl BitfieldDistribution { // defer the cleanup to the view change } } + FromOverseer::Signal(OverseerSignal::BlockFinalized(hash)) => { + trace!(target: "bitd", "Block finalized {:?}", hash); + } 
FromOverseer::Signal(OverseerSignal::Conclude) => { trace!(target: "bitd", "Conclude"); return Ok(()); diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index a1f5694568..56117732f9 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -213,6 +213,7 @@ enum Action { PeerMessages(PeerId, Vec), Abort, + Nop, } fn action_from_overseer_message( @@ -229,6 +230,8 @@ fn action_from_overseer_message( NetworkBridgeMessage::SendMessage(peers, protocol, message) => Action::SendMessage(peers, protocol, message), }, + Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_))) + => Action::Nop, Err(e) => { log::warn!(target: TARGET, "Shutting down Network Bridge due to error {:?}", e); Action::Abort @@ -502,6 +505,7 @@ async fn run_network( }, Action::Abort => return Ok(()), + Action::Nop => (), } } } diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs index 55e3755266..24f14df3b0 100644 --- a/polkadot/node/network/pov-distribution/src/lib.rs +++ b/polkadot/node/network/pov-distribution/src/lib.rs @@ -128,6 +128,7 @@ async fn handle_signal( Ok(false) } + OverseerSignal::BlockFinalized(_) => Ok(false), } } diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 5d9445a920..86a3efbe91 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -864,6 +864,9 @@ async fn run( .or_insert(ActiveHeadData::new(validators, session_index)); } } + FromOverseer::Signal(OverseerSignal::BlockFinalized(_block_hash)) => { + // do nothing + } FromOverseer::Signal(OverseerSignal::Conclude) => break, FromOverseer::Communication { msg } => match msg { StatementDistributionMessage::Share(relay_parent, statement) => diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs 
index d7b8b1df6f..9aa3468c0d 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -808,6 +808,8 @@ where self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash)).await?; + Ok(()) } @@ -1388,6 +1390,7 @@ mod tests { deactivated: [first_block_hash, second_block_hash].as_ref().into(), ..Default::default() }), + OverseerSignal::BlockFinalized(third_block_hash), ]; loop { diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index 0bdc9ffcf8..6ed552a3f5 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -84,6 +84,8 @@ impl PartialEq for ActiveLeavesUpdate { pub enum OverseerSignal { /// Subsystems should adjust their jobs to start and stop work on appropriate block hashes. ActiveLeaves(ActiveLeavesUpdate), + /// `Subsystem` is informed of a finalized block by its block hash. + BlockFinalized(Hash), /// Conclude the work of the `Overseer` and all `Subsystem`s. 
Conclude, } diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index a2f0bf8bbc..710ba77ce9 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -25,11 +25,11 @@ use futures::channel::{mpsc, oneshot}; use polkadot_primitives::v1::{ - BlockNumber, Hash, - CandidateReceipt, CommittedCandidateReceipt, PoV, ErasureChunk, BackedCandidate, Id as ParaId, + BlockNumber, Hash, CommittedCandidateReceipt, + CandidateReceipt, PoV, ErasureChunk, BackedCandidate, Id as ParaId, SignedAvailabilityBitfield, SigningContext, ValidatorId, ValidationCode, ValidatorIndex, CoreAssignment, CoreOccupied, HeadData, CandidateDescriptor, - ValidatorSignature, OmittedValidationData, + ValidatorSignature, OmittedValidationData, AvailableData, }; use polkadot_node_primitives::{ MisbehaviorReport, SignedFullStatement, View, ProtocolId, ValidationResult, @@ -235,31 +235,40 @@ impl BitfieldSigningMessage { /// Availability store subsystem message. #[derive(Debug)] pub enum AvailabilityStoreMessage { - /// Query a `PoV` from the AV store. - QueryPoV(Hash, oneshot::Sender>), + /// Query a `AvailableData` from the AV store. + QueryAvailableData(Hash, oneshot::Sender>), - /// Query whether a `PoV` exists within the AV Store. + /// Query whether a `AvailableData` exists within the AV Store. /// /// This is useful in cases like bitfield signing, when existence /// matters, but we don't want to necessarily pass around multiple /// megabytes of data to get a single bit of information. - QueryPoVAvailable(Hash, oneshot::Sender), + QueryDataAvailability(Hash, oneshot::Sender), /// Query an `ErasureChunk` from the AV store. - QueryChunk(Hash, ValidatorIndex, oneshot::Sender), + QueryChunk(Hash, ValidatorIndex, oneshot::Sender>), /// Store an `ErasureChunk` in the AV store. - StoreChunk(Hash, ValidatorIndex, ErasureChunk), + /// + /// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed. 
+ StoreChunk(Hash, ValidatorIndex, ErasureChunk, oneshot::Sender>), + + /// Store a `AvailableData` in the AV store. + /// If `ValidatorIndex` is present store corresponding chunk also. + /// + /// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed. + StoreAvailableData(Hash, Option, u32, AvailableData, oneshot::Sender>), } impl AvailabilityStoreMessage { /// If the current variant contains the relay parent hash, return it. pub fn relay_parent(&self) -> Option { match self { - Self::QueryPoV(hash, _) => Some(*hash), - Self::QueryPoVAvailable(hash, _) => Some(*hash), + Self::QueryAvailableData(hash, _) => Some(*hash), + Self::QueryDataAvailability(hash, _) => Some(*hash), Self::QueryChunk(hash, _, _) => Some(*hash), - Self::StoreChunk(hash, _, _) => Some(*hash), + Self::StoreChunk(hash, _, _, _) => Some(*hash), + Self::StoreAvailableData(hash, _, _, _, _) => Some(*hash), } } } diff --git a/polkadot/node/subsystem/src/util.rs b/polkadot/node/subsystem/src/util.rs index d37e46ad05..ced6b21bd9 100644 --- a/polkadot/node/subsystem/src/util.rs +++ b/polkadot/node/subsystem/src/util.rs @@ -603,7 +603,7 @@ where ) -> bool { use crate::FromOverseer::{Communication, Signal}; use crate::ActiveLeavesUpdate; - use crate::OverseerSignal::{Conclude, ActiveLeaves}; + use crate::OverseerSignal::{BlockFinalized, Conclude, ActiveLeaves}; match incoming { Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }))) => { @@ -667,6 +667,7 @@ where } } } + Ok(Signal(BlockFinalized(_))) => {} Err(err) => { log::error!("error receiving message from subsystem context: {:?}", err); Self::fwd_err(None, Error::from(err).into(), err_tx).await; diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md index d777460176..d6d62fbdaf 100644 --- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md +++ 
b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -59,14 +59,19 @@ Messages to and from the availability store. ```rust enum AvailabilityStoreMessage { - /// Query the PoV of a candidate by hash. - QueryPoV(Hash, ResponseChannel), + /// Query the `AvailableData` of a candidate by hash. + QueryAvailableData(Hash, ResponseChannel), + /// Query whether an `AvailableData` exists within the AV Store. + QueryDataAvailability(Hash, ResponseChannel), /// Query a specific availability chunk of the candidate's erasure-coding by validator index. /// Returns the chunk and its inclusion proof against the candidate's erasure-root. QueryChunk(Hash, ValidatorIndex, ResponseChannel), /// Store a specific chunk of the candidate's erasure-coding by validator index, with an /// accompanying proof. - StoreChunk(Hash, ValidatorIndex, AvailabilityChunkAndProof), + StoreChunk(Hash, ValidatorIndex, AvailabilityChunkAndProof, ResponseChannel>), + /// Store `AvailableData`. If `ValidatorIndex` is provided, also store this validator's + /// `AvailabilityChunkAndProof`. + StoreAvailableData(Hash, Option, u32, AvailableData, ResponseChannel>), } ```