mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-20 03:31:03 +00:00
Introduce a Proof-of-Validation block type and use that in place of BlockData (#227)
* validators expect collators to give them parachain messages * mostly port network to use pov_block * network tests pass * verify ingress when fetching pov block * fix runtime compilation * all tests build * fix some grumbles * Update validation/src/collation.rs Co-Authored-By: rphmeier <rphmeier@gmail.com> * Update primitives/src/parachain.rs Co-Authored-By: rphmeier <rphmeier@gmail.com> * Update network/src/lib.rs Co-Authored-By: rphmeier <rphmeier@gmail.com>
This commit is contained in:
committed by
GitHub
parent
2bbfa0ae98
commit
1437c8e224
@@ -220,9 +220,18 @@ impl CollatorPool {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use substrate_primitives::crypto::UncheckedInto;
|
||||
use polkadot_primitives::parachain::{CandidateReceipt, BlockData, HeadData};
|
||||
use polkadot_primitives::parachain::{
|
||||
CandidateReceipt, BlockData, PoVBlock, HeadData, ConsolidatedIngress,
|
||||
};
|
||||
use futures::Future;
|
||||
|
||||
fn make_pov(block_data: Vec<u8>) -> PoVBlock {
|
||||
PoVBlock {
|
||||
block_data: BlockData(block_data),
|
||||
ingress: ConsolidatedIngress(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn disconnect_primary_gives_new_primary() {
|
||||
let mut pool = CollatorPool::new();
|
||||
@@ -272,7 +281,7 @@ mod tests {
|
||||
fees: 0,
|
||||
block_data_hash: [3; 32].into(),
|
||||
},
|
||||
block_data: BlockData(vec![4, 5, 6]),
|
||||
pov: make_pov(vec![4, 5, 6]),
|
||||
});
|
||||
|
||||
rx1.wait().unwrap();
|
||||
@@ -299,7 +308,7 @@ mod tests {
|
||||
fees: 0,
|
||||
block_data_hash: [3; 32].into(),
|
||||
},
|
||||
block_data: BlockData(vec![4, 5, 6]),
|
||||
pov: make_pov(vec![4, 5, 6]),
|
||||
});
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
+77
-21
@@ -56,7 +56,10 @@ pub mod gossip;
|
||||
use codec::{Decode, Encode};
|
||||
use futures::sync::oneshot;
|
||||
use polkadot_primitives::{Block, SessionKey, Hash, Header};
|
||||
use polkadot_primitives::parachain::{Id as ParaId, CollatorId, BlockData, CandidateReceipt, Collation};
|
||||
use polkadot_primitives::parachain::{
|
||||
Id as ParaId, BlockData, CollatorId, CandidateReceipt, Collation, PoVBlock,
|
||||
ConsolidatedIngressRoots,
|
||||
};
|
||||
use substrate_network::{PeerId, RequestId, Context, Severity};
|
||||
use substrate_network::{message, generic_message};
|
||||
use substrate_network::specialization::NetworkSpecialization as Specialization;
|
||||
@@ -84,12 +87,33 @@ pub struct Status {
|
||||
collating_for: Option<(CollatorId, ParaId)>,
|
||||
}
|
||||
|
||||
struct BlockDataRequest {
|
||||
struct PoVBlockRequest {
|
||||
attempted_peers: HashSet<SessionKey>,
|
||||
validation_session_parent: Hash,
|
||||
candidate_hash: Hash,
|
||||
block_data_hash: Hash,
|
||||
sender: oneshot::Sender<BlockData>,
|
||||
sender: oneshot::Sender<PoVBlock>,
|
||||
canon_roots: ConsolidatedIngressRoots,
|
||||
}
|
||||
|
||||
impl PoVBlockRequest {
|
||||
// Attempt to process a response. If the provided block is invalid,
|
||||
// this returns an error result containing the unmodified request.
|
||||
//
|
||||
// If `Ok(())` is returned, that indicates that the request has been processed.
|
||||
fn process_response(self, pov_block: PoVBlock) -> Result<(), Self> {
|
||||
if pov_block.block_data.hash() != self.block_data_hash {
|
||||
return Err(self);
|
||||
}
|
||||
|
||||
match polkadot_validation::validate_incoming(&self.canon_roots, &pov_block.ingress) {
|
||||
Ok(()) => {
|
||||
let _ = self.sender.send(pov_block);
|
||||
Ok(())
|
||||
}
|
||||
Err(_) => Err(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ensures collator-protocol messages are sent in correct order.
|
||||
@@ -147,9 +171,13 @@ pub enum Message {
|
||||
// TODO: do this with a cryptographic proof of some kind
|
||||
// https://github.com/paritytech/polkadot/issues/47
|
||||
SessionKey(SessionKey),
|
||||
/// Requesting parachain block data by (relay_parent, candidate_hash).
|
||||
/// Requesting parachain proof-of-validation block (relay_parent, candidate_hash).
|
||||
RequestPovBlock(RequestId, Hash, Hash),
|
||||
/// Provide requested proof-of-validation block data by candidate hash or nothing if unknown.
|
||||
PovBlock(RequestId, Option<PoVBlock>),
|
||||
/// Request block data (relay_parent, candidate_hash)
|
||||
RequestBlockData(RequestId, Hash, Hash),
|
||||
/// Provide block data by candidate hash or nothing if unknown.
|
||||
/// Provide requested block data by candidate hash or nothing.
|
||||
BlockData(RequestId, Option<BlockData>),
|
||||
/// Tell a collator their role.
|
||||
CollatorRole(Role),
|
||||
@@ -171,8 +199,8 @@ pub struct PolkadotProtocol {
|
||||
validators: HashMap<SessionKey, PeerId>,
|
||||
local_collations: LocalCollations<Collation>,
|
||||
live_validation_sessions: LiveValidationSessions,
|
||||
in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>,
|
||||
pending: Vec<BlockDataRequest>,
|
||||
in_flight: HashMap<(RequestId, PeerId), PoVBlockRequest>,
|
||||
pending: Vec<PoVBlockRequest>,
|
||||
extrinsic_store: Option<::av_store::Store>,
|
||||
next_req_id: u64,
|
||||
}
|
||||
@@ -195,15 +223,22 @@ impl PolkadotProtocol {
|
||||
}
|
||||
|
||||
/// Fetch block data by candidate receipt.
|
||||
fn fetch_block_data(&mut self, ctx: &mut Context<Block>, candidate: &CandidateReceipt, relay_parent: Hash) -> oneshot::Receiver<BlockData> {
|
||||
fn fetch_pov_block(
|
||||
&mut self,
|
||||
ctx: &mut Context<Block>,
|
||||
candidate: &CandidateReceipt,
|
||||
relay_parent: Hash,
|
||||
canon_roots: ConsolidatedIngressRoots,
|
||||
) -> oneshot::Receiver<PoVBlock> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.pending.push(BlockDataRequest {
|
||||
self.pending.push(PoVBlockRequest {
|
||||
attempted_peers: Default::default(),
|
||||
validation_session_parent: relay_parent,
|
||||
candidate_hash: candidate.hash(),
|
||||
block_data_hash: candidate.block_data_hash,
|
||||
sender: tx,
|
||||
canon_roots,
|
||||
});
|
||||
|
||||
self.dispatch_pending_requests(ctx);
|
||||
@@ -250,7 +285,7 @@ impl PolkadotProtocol {
|
||||
let parent = pending.validation_session_parent;
|
||||
let c_hash = pending.candidate_hash;
|
||||
|
||||
let still_pending = self.live_validation_sessions.with_block_data(&parent, &c_hash, |x| match x {
|
||||
let still_pending = self.live_validation_sessions.with_pov_block(&parent, &c_hash, |x| match x {
|
||||
Ok(data @ &_) => {
|
||||
// answer locally.
|
||||
let _ = pending.sender.send(data.clone());
|
||||
@@ -270,7 +305,7 @@ impl PolkadotProtocol {
|
||||
send_polkadot_message(
|
||||
ctx,
|
||||
who.clone(),
|
||||
Message::RequestBlockData(req_id, parent, c_hash),
|
||||
Message::RequestPovBlock(req_id, parent, c_hash),
|
||||
);
|
||||
|
||||
in_flight.insert((req_id, who), pending);
|
||||
@@ -295,12 +330,21 @@ impl PolkadotProtocol {
|
||||
trace!(target: "p_net", "Polkadot message from {}: {:?}", who, msg);
|
||||
match msg {
|
||||
Message::SessionKey(key) => self.on_session_key(ctx, who, key),
|
||||
Message::RequestPovBlock(req_id, relay_parent, candidate_hash) => {
|
||||
let pov_block = self.live_validation_sessions.with_pov_block(
|
||||
&relay_parent,
|
||||
&candidate_hash,
|
||||
|res| res.ok().map(|b| b.clone()),
|
||||
);
|
||||
|
||||
send_polkadot_message(ctx, who, Message::PovBlock(req_id, pov_block));
|
||||
}
|
||||
Message::RequestBlockData(req_id, relay_parent, candidate_hash) => {
|
||||
let block_data = self.live_validation_sessions
|
||||
.with_block_data(
|
||||
.with_pov_block(
|
||||
&relay_parent,
|
||||
&candidate_hash,
|
||||
|res| res.ok().map(|b| b.clone()),
|
||||
|res| res.ok().map(|b| b.block_data.clone()),
|
||||
)
|
||||
.or_else(|| self.extrinsic_store.as_ref()
|
||||
.and_then(|s| s.block_data(relay_parent, candidate_hash))
|
||||
@@ -308,7 +352,11 @@ impl PolkadotProtocol {
|
||||
|
||||
send_polkadot_message(ctx, who, Message::BlockData(req_id, block_data));
|
||||
}
|
||||
Message::BlockData(req_id, data) => self.on_block_data(ctx, who, req_id, data),
|
||||
Message::PovBlock(req_id, data) => self.on_pov_block(ctx, who, req_id, data),
|
||||
Message::BlockData(_req_id, _data) => {
|
||||
// current block data is never requested bare by the node.
|
||||
ctx.report_peer(who, Severity::Bad("Peer sent un-requested block data".to_string()));
|
||||
}
|
||||
Message::Collation(relay_parent, collation) => self.on_collation(ctx, who, relay_parent, collation),
|
||||
Message::CollatorRole(role) => self.on_new_role(ctx, who, role),
|
||||
}
|
||||
@@ -355,13 +403,19 @@ impl PolkadotProtocol {
|
||||
self.dispatch_pending_requests(ctx);
|
||||
}
|
||||
|
||||
fn on_block_data(&mut self, ctx: &mut Context<Block>, who: PeerId, req_id: RequestId, data: Option<BlockData>) {
|
||||
fn on_pov_block(
|
||||
&mut self,
|
||||
ctx: &mut Context<Block>,
|
||||
who: PeerId,
|
||||
req_id: RequestId,
|
||||
pov_block: Option<PoVBlock>,
|
||||
) {
|
||||
match self.in_flight.remove(&(req_id, who.clone())) {
|
||||
Some(req) => {
|
||||
if let Some(data) = data {
|
||||
if data.hash() == req.block_data_hash {
|
||||
let _ = req.sender.send(data);
|
||||
return
|
||||
Some(mut req) => {
|
||||
if let Some(pov_block) = pov_block {
|
||||
match req.process_response(pov_block) {
|
||||
Ok(()) => return,
|
||||
Err(r) => { req = r; }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -486,12 +540,14 @@ impl Specialization<Block> for PolkadotProtocol {
|
||||
self.in_flight.retain(|&(_, ref peer), val| {
|
||||
let retain = peer != &who;
|
||||
if !retain {
|
||||
// swap with a dummy value which will be dropped immediately.
|
||||
let (sender, _) = oneshot::channel();
|
||||
pending.push(::std::mem::replace(val, BlockDataRequest {
|
||||
pending.push(::std::mem::replace(val, PoVBlockRequest {
|
||||
attempted_peers: Default::default(),
|
||||
validation_session_parent: Default::default(),
|
||||
candidate_hash: Default::default(),
|
||||
block_data_hash: Default::default(),
|
||||
canon_roots: ConsolidatedIngressRoots(Vec::new()),
|
||||
sender,
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -29,7 +29,8 @@ use polkadot_validation::{
|
||||
};
|
||||
use polkadot_primitives::{Block, Hash, SessionKey};
|
||||
use polkadot_primitives::parachain::{
|
||||
BlockData, Extrinsic, CandidateReceipt, ParachainHost, Id as ParaId, Message
|
||||
Extrinsic, CandidateReceipt, ParachainHost, Id as ParaId, Message,
|
||||
Collation, PoVBlock,
|
||||
};
|
||||
use gossip::RegisteredMessageValidator;
|
||||
|
||||
@@ -41,7 +42,7 @@ use std::collections::{HashMap, HashSet};
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
|
||||
use validation::{self, SessionDataFetcher, NetworkService, Executor, Incoming};
|
||||
use validation::{self, SessionDataFetcher, NetworkService, Executor};
|
||||
|
||||
type IngressPairRef<'a> = (ParaId, &'a [Message]);
|
||||
|
||||
@@ -92,6 +93,12 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
|
||||
.map(|msg| msg.statement)
|
||||
}
|
||||
|
||||
/// Get access to the session data fetcher.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn fetcher(&self) -> &SessionDataFetcher<P, E, N, T> {
|
||||
&self.fetcher
|
||||
}
|
||||
|
||||
fn parent_hash(&self) -> Hash {
|
||||
self.fetcher.parent_hash()
|
||||
}
|
||||
@@ -201,7 +208,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
|
||||
fn create_work<D>(&self, candidate_hash: Hash, producer: ParachainWork<D>)
|
||||
-> impl Future<Item=(),Error=()> + Send + 'static
|
||||
where
|
||||
D: Future<Item=(BlockData, Incoming),Error=io::Error> + Send + 'static,
|
||||
D: Future<Item=PoVBlock,Error=io::Error> + Send + 'static,
|
||||
{
|
||||
let table = self.table.clone();
|
||||
let network = self.network().clone();
|
||||
@@ -213,7 +220,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
|
||||
// store the data before broadcasting statements, so other peers can fetch.
|
||||
knowledge.lock().note_candidate(
|
||||
candidate_hash,
|
||||
Some(validated.block_data().clone()),
|
||||
Some(validated.pov_block().clone()),
|
||||
validated.extrinsic().cloned(),
|
||||
);
|
||||
|
||||
@@ -234,26 +241,21 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
|
||||
E: Future<Item=(),Error=()> + Clone + Send + 'static,
|
||||
{
|
||||
type Error = io::Error;
|
||||
type FetchCandidate = validation::BlockDataReceiver;
|
||||
type FetchIncoming = validation::IncomingReceiver;
|
||||
type FetchValidationProof = validation::PoVReceiver;
|
||||
|
||||
fn local_candidate(&self, receipt: CandidateReceipt, block_data: BlockData, extrinsic: Extrinsic) {
|
||||
fn local_collation(&self, collation: Collation, extrinsic: Extrinsic) {
|
||||
// produce a signed statement
|
||||
let hash = receipt.hash();
|
||||
let validated = Validated::collated_local(receipt, block_data.clone(), extrinsic.clone());
|
||||
let hash = collation.receipt.hash();
|
||||
let validated = Validated::collated_local(collation.receipt, collation.pov.clone(), extrinsic.clone());
|
||||
let statement = self.table.import_validated(validated);
|
||||
|
||||
// give to network to make available.
|
||||
self.fetcher.knowledge().lock().note_candidate(hash, Some(block_data), Some(extrinsic));
|
||||
self.fetcher.knowledge().lock().note_candidate(hash, Some(collation.pov), Some(extrinsic));
|
||||
self.network().gossip_message(self.attestation_topic, statement.encode());
|
||||
}
|
||||
|
||||
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate {
|
||||
self.fetcher.fetch_block_data(candidate)
|
||||
}
|
||||
|
||||
fn fetch_incoming(&self, parachain: ParaId) -> Self::FetchIncoming {
|
||||
self.fetcher.fetch_incoming(parachain)
|
||||
fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof {
|
||||
self.fetcher.fetch_pov_block(candidate)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -21,7 +21,10 @@ use validation::SessionParams;
|
||||
|
||||
use polkadot_validation::GenericStatement;
|
||||
use polkadot_primitives::{Block, Hash, SessionKey};
|
||||
use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData, CollatorId, ValidatorId};
|
||||
use polkadot_primitives::parachain::{
|
||||
CandidateReceipt, HeadData, PoVBlock, BlockData, CollatorId, ValidatorId,
|
||||
ConsolidatedIngressRoots,
|
||||
};
|
||||
use substrate_primitives::crypto::UncheckedInto;
|
||||
use codec::Encode;
|
||||
use substrate_network::{
|
||||
@@ -74,6 +77,14 @@ impl TestContext {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fn make_pov(block_data: Vec<u8>) -> PoVBlock {
|
||||
PoVBlock {
|
||||
block_data: BlockData(block_data),
|
||||
ingress: polkadot_primitives::parachain::ConsolidatedIngress(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
fn make_status(status: &Status, roles: Roles) -> FullStatus {
|
||||
FullStatus {
|
||||
version: 1,
|
||||
@@ -164,7 +175,13 @@ fn fetches_from_those_with_knowledge() {
|
||||
let knowledge = session.knowledge();
|
||||
|
||||
knowledge.lock().note_statement(a_key.clone(), &GenericStatement::Valid(candidate_hash));
|
||||
let recv = protocol.fetch_block_data(&mut TestContext::default(), &candidate_receipt, parent_hash);
|
||||
let canon_roots = ConsolidatedIngressRoots(Vec::new());
|
||||
let recv = protocol.fetch_pov_block(
|
||||
&mut TestContext::default(),
|
||||
&candidate_receipt,
|
||||
parent_hash,
|
||||
canon_roots,
|
||||
);
|
||||
|
||||
// connect peer A
|
||||
{
|
||||
@@ -178,7 +195,7 @@ fn fetches_from_those_with_knowledge() {
|
||||
let mut ctx = TestContext::default();
|
||||
on_message(&mut protocol, &mut ctx, peer_a.clone(), Message::SessionKey(a_key.clone()));
|
||||
assert!(protocol.validators.contains_key(&a_key));
|
||||
assert!(ctx.has_message(peer_a.clone(), Message::RequestBlockData(1, parent_hash, candidate_hash)));
|
||||
assert!(ctx.has_message(peer_a.clone(), Message::RequestPovBlock(1, parent_hash, candidate_hash)));
|
||||
}
|
||||
|
||||
knowledge.lock().note_statement(b_key.clone(), &GenericStatement::Valid(candidate_hash));
|
||||
@@ -188,7 +205,7 @@ fn fetches_from_those_with_knowledge() {
|
||||
let mut ctx = TestContext::default();
|
||||
protocol.on_connect(&mut ctx, peer_b.clone(), make_status(&status, Roles::AUTHORITY));
|
||||
on_message(&mut protocol, &mut ctx, peer_b.clone(), Message::SessionKey(b_key.clone()));
|
||||
assert!(!ctx.has_message(peer_b.clone(), Message::RequestBlockData(2, parent_hash, candidate_hash)));
|
||||
assert!(!ctx.has_message(peer_b.clone(), Message::RequestPovBlock(2, parent_hash, candidate_hash)));
|
||||
|
||||
}
|
||||
|
||||
@@ -197,15 +214,16 @@ fn fetches_from_those_with_knowledge() {
|
||||
let mut ctx = TestContext::default();
|
||||
protocol.on_disconnect(&mut ctx, peer_a.clone());
|
||||
assert!(!protocol.validators.contains_key(&a_key));
|
||||
assert!(ctx.has_message(peer_b.clone(), Message::RequestBlockData(2, parent_hash, candidate_hash)));
|
||||
assert!(ctx.has_message(peer_b.clone(), Message::RequestPovBlock(2, parent_hash, candidate_hash)));
|
||||
}
|
||||
|
||||
// peer B comes back with block data.
|
||||
{
|
||||
let mut ctx = TestContext::default();
|
||||
on_message(&mut protocol, &mut ctx, peer_b, Message::BlockData(2, Some(block_data.clone())));
|
||||
let pov_block = make_pov(block_data.0);
|
||||
on_message(&mut protocol, &mut ctx, peer_b, Message::PovBlock(2, Some(pov_block.clone())));
|
||||
drop(protocol);
|
||||
assert_eq!(recv.wait().unwrap(), block_data);
|
||||
assert_eq!(recv.wait().unwrap(), pov_block);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,10 +22,12 @@ use substrate_primitives::{NativeOrEncoded, ExecutionContext};
|
||||
use substrate_keyring::AuthorityKeyring;
|
||||
use {PolkadotProtocol};
|
||||
|
||||
use polkadot_validation::{SharedTable, MessagesFrom, Network, TableRouter};
|
||||
use polkadot_validation::{SharedTable, MessagesFrom, Network};
|
||||
use polkadot_primitives::{SessionKey, Block, Hash, Header, BlockId};
|
||||
use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, ParachainHost, OutgoingMessage,
|
||||
ValidatorId};
|
||||
use polkadot_primitives::parachain::{
|
||||
Id as ParaId, Chain, DutyRoster, ParachainHost, OutgoingMessage,
|
||||
ValidatorId, ConsolidatedIngressRoots,
|
||||
};
|
||||
use parking_lot::Mutex;
|
||||
use substrate_client::error::Result as ClientResult;
|
||||
use substrate_client::runtime_api::{Core, RuntimeVersion, ApiExt};
|
||||
@@ -158,7 +160,7 @@ struct ApiData {
|
||||
validators: Vec<ValidatorId>,
|
||||
duties: Vec<Chain>,
|
||||
active_parachains: Vec<ParaId>,
|
||||
ingress: HashMap<ParaId, Vec<(ParaId, Hash)>>,
|
||||
ingress: HashMap<ParaId, ConsolidatedIngressRoots>,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
@@ -293,7 +295,7 @@ impl ParachainHost<Block> for RuntimeApi {
|
||||
_: ExecutionContext,
|
||||
id: Option<ParaId>,
|
||||
_: Vec<u8>,
|
||||
) -> ClientResult<NativeOrEncoded<Option<Vec<(ParaId, Hash)>>>> {
|
||||
) -> ClientResult<NativeOrEncoded<Option<ConsolidatedIngressRoots>>> {
|
||||
let id = id.unwrap();
|
||||
Ok(NativeOrEncoded::Native(self.data.lock().ingress.get(&id).cloned()))
|
||||
}
|
||||
@@ -358,7 +360,7 @@ impl IngressBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
fn build(self) -> HashMap<ParaId, Vec<(ParaId, Hash)>> {
|
||||
fn build(self) -> HashMap<ParaId, ConsolidatedIngressRoots> {
|
||||
let mut map = HashMap::new();
|
||||
for ((source, target), messages) in self.egress {
|
||||
map.entry(target).or_insert_with(Vec::new)
|
||||
@@ -369,7 +371,7 @@ impl IngressBuilder {
|
||||
roots.sort_by_key(|&(para_id, _)| para_id);
|
||||
}
|
||||
|
||||
map
|
||||
map.into_iter().map(|(k, v)| (k, ConsolidatedIngressRoots(v))).collect()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -471,11 +473,11 @@ fn ingress_fetch_works() {
|
||||
};
|
||||
|
||||
// make sure everyone can get ingress for their own parachain.
|
||||
let fetch_a = router_a.then(move |r| r.unwrap()
|
||||
let fetch_a = router_a.then(move |r| r.unwrap().fetcher()
|
||||
.fetch_incoming(id_a).map_err(|_| format!("Could not fetch ingress_a")));
|
||||
let fetch_b = router_b.then(move |r| r.unwrap()
|
||||
let fetch_b = router_b.then(move |r| r.unwrap().fetcher()
|
||||
.fetch_incoming(id_b).map_err(|_| format!("Could not fetch ingress_b")));
|
||||
let fetch_c = router_c.then(move |r| r.unwrap()
|
||||
let fetch_c = router_c.then(move |r| r.unwrap().fetcher()
|
||||
.fetch_incoming(id_c).map_err(|_| format!("Could not fetch ingress_c")));
|
||||
|
||||
let work = fetch_a.join3(fetch_b, fetch_c);
|
||||
|
||||
@@ -22,10 +22,10 @@
|
||||
use sr_primitives::traits::{BlakeTwo256, ProvideRuntimeApi, Hash as HashT};
|
||||
use substrate_network::Context as NetContext;
|
||||
use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement};
|
||||
use polkadot_primitives::{Block, Hash, SessionKey};
|
||||
use polkadot_primitives::{Block, BlockId, Hash, SessionKey};
|
||||
use polkadot_primitives::parachain::{
|
||||
Id as ParaId, Collation, Extrinsic, ParachainHost, BlockData, Message, CandidateReceipt,
|
||||
CollatorId, ValidatorId,
|
||||
Id as ParaId, Collation, Extrinsic, ParachainHost, Message, CandidateReceipt,
|
||||
CollatorId, ValidatorId, PoVBlock,
|
||||
};
|
||||
use codec::{Encode, Decode};
|
||||
|
||||
@@ -325,7 +325,7 @@ impl<P, E: Clone, N, T: Clone> Collators for ValidationNetwork<P, E, N, T> where
|
||||
struct KnowledgeEntry {
|
||||
knows_block_data: Vec<ValidatorId>,
|
||||
knows_extrinsic: Vec<ValidatorId>,
|
||||
block_data: Option<BlockData>,
|
||||
pov: Option<PoVBlock>,
|
||||
extrinsic: Option<Extrinsic>,
|
||||
}
|
||||
|
||||
@@ -366,9 +366,9 @@ impl Knowledge {
|
||||
}
|
||||
|
||||
/// Note a candidate collated or seen locally.
|
||||
pub(crate) fn note_candidate(&mut self, hash: Hash, block_data: Option<BlockData>, extrinsic: Option<Extrinsic>) {
|
||||
pub(crate) fn note_candidate(&mut self, hash: Hash, pov: Option<PoVBlock>, extrinsic: Option<Extrinsic>) {
|
||||
let entry = self.candidates.entry(hash).or_insert_with(Default::default);
|
||||
entry.block_data = entry.block_data.take().or(block_data);
|
||||
entry.pov = entry.pov.take().or(pov);
|
||||
entry.extrinsic = entry.extrinsic.take().or(extrinsic);
|
||||
}
|
||||
}
|
||||
@@ -436,15 +436,15 @@ impl ValidationSession {
|
||||
&self.fetch_incoming
|
||||
}
|
||||
|
||||
// execute a closure with locally stored block data for a candidate, or a slice of session identities
|
||||
// execute a closure with locally stored proof-of-validation for a candidate, or a slice of session identities
|
||||
// we believe should have the data.
|
||||
fn with_block_data<F, U>(&self, hash: &Hash, f: F) -> U
|
||||
where F: FnOnce(Result<&BlockData, &[ValidatorId]>) -> U
|
||||
fn with_pov_block<F, U>(&self, hash: &Hash, f: F) -> U
|
||||
where F: FnOnce(Result<&PoVBlock, &[ValidatorId]>) -> U
|
||||
{
|
||||
let knowledge = self.knowledge.lock();
|
||||
let res = knowledge.candidates.get(hash)
|
||||
.ok_or(&[] as &_)
|
||||
.and_then(|entry| entry.block_data.as_ref().ok_or(&entry.knows_block_data[..]));
|
||||
.and_then(|entry| entry.pov.as_ref().ok_or(&entry.knows_block_data[..]));
|
||||
|
||||
f(res)
|
||||
}
|
||||
@@ -590,32 +590,33 @@ impl LiveValidationSessions {
|
||||
self.recent.as_slice()
|
||||
}
|
||||
|
||||
/// Call a closure with block data from validation session at parent hash.
|
||||
/// Call a closure with pov-data from validation session at parent hash for a given
|
||||
/// candidate-receipt hash.
|
||||
///
|
||||
/// This calls the closure with `Some(data)` where the session and data are live,
|
||||
/// `Err(Some(keys))` when the session is live but the data unknown, with a list of keys
|
||||
/// who have the data, and `Err(None)` where the session is unknown.
|
||||
pub(crate) fn with_block_data<F, U>(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U
|
||||
where F: FnOnce(Result<&BlockData, Option<&[ValidatorId]>>) -> U
|
||||
pub(crate) fn with_pov_block<F, U>(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U
|
||||
where F: FnOnce(Result<&PoVBlock, Option<&[ValidatorId]>>) -> U
|
||||
{
|
||||
match self.live_instances.get(parent_hash) {
|
||||
Some(c) => c.1.with_block_data(c_hash, |res| f(res.map_err(Some))),
|
||||
Some(c) => c.1.with_pov_block(c_hash, |res| f(res.map_err(Some))),
|
||||
None => f(Err(None))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Receiver for block data.
|
||||
pub struct BlockDataReceiver {
|
||||
outer: Receiver<Receiver<BlockData>>,
|
||||
inner: Option<Receiver<BlockData>>
|
||||
pub struct PoVReceiver {
|
||||
outer: Receiver<Receiver<PoVBlock>>,
|
||||
inner: Option<Receiver<PoVBlock>>
|
||||
}
|
||||
|
||||
impl Future for BlockDataReceiver {
|
||||
type Item = BlockData;
|
||||
impl Future for PoVReceiver {
|
||||
type Item = PoVBlock;
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<BlockData, io::Error> {
|
||||
fn poll(&mut self) -> Poll<PoVBlock, io::Error> {
|
||||
let map_err = |_| io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Sending end of channel hung up",
|
||||
@@ -746,22 +747,34 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where
|
||||
T: Clone + Executor + Send + 'static,
|
||||
E: Future<Item=(),Error=()> + Clone + Send + 'static,
|
||||
{
|
||||
/// Fetch block data for the given candidate receipt.
|
||||
pub fn fetch_block_data(&self, candidate: &CandidateReceipt) -> BlockDataReceiver {
|
||||
/// Fetch PoV block for the given candidate receipt.
|
||||
pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver {
|
||||
let parachain = candidate.parachain_index.clone();
|
||||
let parent_hash = self.parent_hash;
|
||||
|
||||
let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain)
|
||||
.map_err(|e|
|
||||
format!(
|
||||
"Cannot fetch ingress for parachain {:?} at {:?}: {:?}",
|
||||
parachain,
|
||||
parent_hash,
|
||||
e,
|
||||
)
|
||||
);
|
||||
|
||||
let candidate = candidate.clone();
|
||||
let (tx, rx) = ::futures::sync::oneshot::channel();
|
||||
self.network.with_spec(move |spec, ctx| {
|
||||
let inner_rx = spec.fetch_block_data(ctx, &candidate, parent_hash);
|
||||
let _ = tx.send(inner_rx);
|
||||
if let Ok(Some(canon_roots)) = canon_roots {
|
||||
let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots);
|
||||
let _ = tx.send(inner_rx);
|
||||
}
|
||||
});
|
||||
BlockDataReceiver { outer: rx, inner: None }
|
||||
PoVReceiver { outer: rx, inner: None }
|
||||
}
|
||||
|
||||
/// Fetch incoming messages for a parachain.
|
||||
pub fn fetch_incoming(&self, parachain: ParaId) -> IncomingReceiver {
|
||||
use polkadot_primitives::BlockId;
|
||||
|
||||
let (rx, work) = self.fetch_incoming.lock().fetch_with_work(parachain.clone(), move || {
|
||||
let parent_hash: Hash = self.parent_hash();
|
||||
let topic = incoming_message_topic(parent_hash, parachain);
|
||||
@@ -778,7 +791,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where
|
||||
canon_roots.into_future()
|
||||
.and_then(move |ingress_roots| match ingress_roots {
|
||||
None => Err(format!("No parachain {:?} registered at {}", parachain, parent_hash)),
|
||||
Some(roots) => Ok(roots.into_iter().collect())
|
||||
Some(roots) => Ok(roots.0.into_iter().collect())
|
||||
})
|
||||
.and_then(move |ingress_roots| ComputeIngress {
|
||||
inner: gossip_messages,
|
||||
|
||||
Reference in New Issue
Block a user