mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 15:11:02 +00:00
Minimal parachains part 2: Parachain statement and data routing (#173)
* dynamic inclusion threshold calculator * collators interface * collation helpers * initial proposal-creation future * create proposer when asked to propose * remove local_availability duty * statement table tracks includable parachain count * beginnings of timing future * finish proposal logic * remove stray println * extract shared table to separate module * change ordering * includability tracking * fix doc * initial changes to parachains module * initialise dummy block before API calls * give polkadot control over round proposer based on random seed * propose only after enough candidates * flesh out parachains module a bit more * set_heads * actually introduce set_heads to runtime * update block_builder to accept parachains * split block validity errors from real errors in evaluation * update WASM runtimes * polkadot-api methods for parachains additions * delay evaluation until candidates are ready * comments * fix dynamic inclusion with zero initial * test for includability tracker * wasm validation of parachain candidates * move primitives to primitives crate * remove runtime-std dependency from codec * adjust doc * polkadot-parachain-primitives * kill legacy polkadot-validator crate * basic-add test chain * test for basic_add parachain * move to test-chains dir * use wasm-build * new wasm directory layout * reorganize a bit more * Fix for rh-minimal-parachain (#141) * Remove extern "C" We already encountered such behavior (bug?) in pwasm-std, I believe. * Fix `panic_fmt` signature by adding `_col` Wrong `panic_fmt` signature can inhibit some optimizations in LTO mode. * Add linker flags and use wasm-gc in build script Pass --import-memory to LLD to emit wasm binary with imported memory. Also use wasm-gc instead of wasm-build. * Fix effective_max. I'm not sure why it was the way it was actually. * Recompile wasm. * Fix indent * more basic_add tests * validate parachain WASM * produce statements on receiving statements * tests for reactive statement production * fix build * add OOM lang item to runtime-io * use dynamic_inclusion when evaluating as well * fix update_includable_count * remove dead code * grumbles * actually defer round_proposer logic * update wasm * address a few more grumbles * schedule collation work as soon as BFT is started * impl future in collator * fix comment * governance proposals for adding and removing parachains * bump protocol version * tear out polkadot-specific pieces of substrate-network * extract out polkadot-specific stuff from substrate-network * begin polkadot network subsystem * grumbles * update WASM checkins * parse status from polkadot peer * allow invoke of network specialization * begin statement router implementation * remove dependency on tokio-timer * fix sanity check and have proposer factory create communication streams * pull out statement routing from consensus library * fix comments * adjust typedefs * extract consensus_gossip out of main network protocol handler * port substrate-bft to new tokio * port polkadot-consensus to new tokio * fix typo * start message processing task * initial consensus network implementation * remove known tracking from statement-table crate * extract router into separate module * defer statements until later * double signature is invalid * propagating statements * grumbles * request block data * fix compilation * embed new consensus network into service * port demo CLI to tokio * all test crates compile * some tests for fetching block data * whitespace * adjusting some tokio stuff * update exit-future * remove overly noisy warning * clean up collation work a bit * address review grumbles * fix lock order in protocol handler * rebuild wasm artifacts * tag AuthorityId::from_slice for std only * address formatting grumbles * rename event_loop to executor * some more docs for polkadot-network crate
This commit is contained in:
committed by
GitHub
parent
b8115b257f
commit
6bfcbd6d59
@@ -0,0 +1,349 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Statement routing and consensus table router implementation.
|
||||
//!
|
||||
//! During the consensus process, validators exchange statements on validity and availability
|
||||
//! of parachain candidates.
|
||||
//! The `Router` in this file hooks into the underlying network to fulfill
|
||||
//! the `TableRouter` trait from `polkadot-consensus`, which is expected to call into a shared statement table
|
||||
//! and dispatch evaluation work as necessary when new statements come in.
|
||||
|
||||
use polkadot_api::{PolkadotApi, LocalPolkadotApi};
|
||||
use polkadot_consensus::{SharedTable, TableRouter, SignedStatement, GenericStatement, StatementProducer};
|
||||
use polkadot_primitives::{Hash, BlockId, SessionKey};
|
||||
use polkadot_primitives::parachain::{BlockData, Extrinsic, CandidateReceipt, Id as ParaId};
|
||||
|
||||
use futures::prelude::*;
|
||||
use tokio::runtime::TaskExecutor;
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::{NetworkService, Knowledge};
|
||||
|
||||
/// Table routing implementation.
|
||||
pub struct Router<P: PolkadotApi> {
|
||||
table: Arc<SharedTable>,
|
||||
network: Arc<NetworkService>,
|
||||
api: Arc<P>,
|
||||
task_executor: TaskExecutor,
|
||||
parent_hash: Hash,
|
||||
knowledge: Arc<Mutex<Knowledge>>,
|
||||
deferred_statements: Arc<Mutex<DeferredStatements>>,
|
||||
}
|
||||
|
||||
impl<P: PolkadotApi> Router<P> {
|
||||
pub(crate) fn new(
|
||||
table: Arc<SharedTable>,
|
||||
network: Arc<NetworkService>,
|
||||
api: Arc<P>,
|
||||
task_executor: TaskExecutor,
|
||||
parent_hash: Hash,
|
||||
knowledge: Arc<Mutex<Knowledge>>,
|
||||
) -> Self {
|
||||
Router {
|
||||
table,
|
||||
network,
|
||||
api,
|
||||
task_executor,
|
||||
parent_hash,
|
||||
knowledge,
|
||||
deferred_statements: Arc::new(Mutex::new(DeferredStatements::new())),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn session_key(&self) -> SessionKey {
|
||||
self.table.session_key()
|
||||
}
|
||||
}
|
||||
|
||||
impl<P: PolkadotApi> Clone for Router<P> {
|
||||
fn clone(&self) -> Self {
|
||||
Router {
|
||||
table: self.table.clone(),
|
||||
network: self.network.clone(),
|
||||
api: self.api.clone(),
|
||||
task_executor: self.task_executor.clone(),
|
||||
parent_hash: self.parent_hash.clone(),
|
||||
deferred_statements: self.deferred_statements.clone(),
|
||||
knowledge: self.knowledge.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<P: LocalPolkadotApi + Send + Sync + 'static> Router<P> {
|
||||
/// Import a statement whose signature has been checked already.
|
||||
pub(crate) fn import_statement(&self, statement: SignedStatement) {
|
||||
// defer any statements for which we haven't imported the candidate yet
|
||||
let (c_hash, parachain_index) = {
|
||||
let candidate_data = match statement.statement {
|
||||
GenericStatement::Candidate(ref c) => Some((c.hash(), c.parachain_index)),
|
||||
GenericStatement::Valid(ref hash)
|
||||
| GenericStatement::Invalid(ref hash)
|
||||
| GenericStatement::Available(ref hash)
|
||||
=> self.table.with_candidate(hash, |c| c.map(|c| (*hash, c.parachain_index))),
|
||||
};
|
||||
match candidate_data {
|
||||
Some(x) => x,
|
||||
None => {
|
||||
self.deferred_statements.lock().push(statement);
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// import all statements pending on this candidate
|
||||
let (mut statements, _traces) = if let GenericStatement::Candidate(_) = statement.statement {
|
||||
self.deferred_statements.lock().get_deferred(&c_hash)
|
||||
} else {
|
||||
(Vec::new(), Vec::new())
|
||||
};
|
||||
|
||||
// prepend the candidate statement.
|
||||
statements.insert(0, statement);
|
||||
let producers: Vec<_> = self.table.import_remote_statements(
|
||||
self,
|
||||
statements.iter().cloned(),
|
||||
);
|
||||
// dispatch future work as necessary.
|
||||
for (producer, statement) in producers.into_iter().zip(statements) {
|
||||
let producer = match producer {
|
||||
Some(p) => p,
|
||||
None => continue, // statement redundant
|
||||
};
|
||||
|
||||
self.knowledge.lock().note_statement(statement.sender, &statement.statement);
|
||||
self.dispatch_work(c_hash, producer, parachain_index);
|
||||
}
|
||||
}
|
||||
|
||||
fn dispatch_work<D, E>(&self, candidate_hash: Hash, producer: StatementProducer<D, E>, parachain: ParaId) where
|
||||
D: Future<Item=BlockData,Error=()> + Send + 'static,
|
||||
E: Future<Item=Extrinsic,Error=()> + Send + 'static,
|
||||
{
|
||||
let parent_hash = self.parent_hash.clone();
|
||||
|
||||
let api = self.api.clone();
|
||||
let validate = move |collation| -> Option<bool> {
|
||||
let id = BlockId::hash(parent_hash);
|
||||
match ::polkadot_consensus::validate_collation(&*api, &id, &collation) {
|
||||
Ok(()) => Some(true),
|
||||
Err(e) => {
|
||||
debug!(target: "p_net", "Encountered bad collation: {}", e);
|
||||
Some(false)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let table = self.table.clone();
|
||||
let network = self.network.clone();
|
||||
let knowledge = self.knowledge.clone();
|
||||
|
||||
let work = producer.prime(validate).map(move |produced| {
|
||||
// store the data before broadcasting statements, so other peers can fetch.
|
||||
knowledge.lock().note_candidate(candidate_hash, produced.block_data, produced.extrinsic);
|
||||
|
||||
// propagate the statements
|
||||
if let Some(validity) = produced.validity {
|
||||
let signed = table.sign_and_import(validity.clone());
|
||||
route_statement(&*network, &*table, parachain, parent_hash, signed);
|
||||
}
|
||||
|
||||
if let Some(availability) = produced.availability {
|
||||
let signed = table.sign_and_import(availability);
|
||||
route_statement(&*network, &*table, parachain, parent_hash, signed);
|
||||
}
|
||||
});
|
||||
|
||||
self.task_executor.spawn(work);
|
||||
}
|
||||
}
|
||||
|
||||
impl<P: LocalPolkadotApi + Send> TableRouter for Router<P> {
|
||||
type Error = ();
|
||||
type FetchCandidate = BlockDataReceiver;
|
||||
type FetchExtrinsic = Result<Extrinsic, Self::Error>;
|
||||
|
||||
fn local_candidate(&self, receipt: CandidateReceipt, block_data: BlockData, extrinsic: Extrinsic) {
|
||||
// give to network to make available.
|
||||
let hash = receipt.hash();
|
||||
let para_id = receipt.parachain_index;
|
||||
let signed = self.table.sign_and_import(GenericStatement::Candidate(receipt));
|
||||
|
||||
self.knowledge.lock().note_candidate(hash, Some(block_data), Some(extrinsic));
|
||||
route_statement(&*self.network, &*self.table, para_id, self.parent_hash, signed);
|
||||
}
|
||||
|
||||
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> BlockDataReceiver {
|
||||
let parent_hash = self.parent_hash;
|
||||
let rx = self.network.with_spec(|spec, ctx| { spec.fetch_block_data(ctx, candidate, parent_hash) });
|
||||
BlockDataReceiver { inner: rx }
|
||||
}
|
||||
|
||||
fn fetch_extrinsic_data(&self, _candidate: &CandidateReceipt) -> Self::FetchExtrinsic {
|
||||
Ok(Extrinsic)
|
||||
}
|
||||
}
|
||||
|
||||
/// Receiver for block data.
|
||||
pub struct BlockDataReceiver {
|
||||
inner: Option<::futures::sync::oneshot::Receiver<BlockData>>,
|
||||
}
|
||||
|
||||
impl Future for BlockDataReceiver {
|
||||
type Item = BlockData;
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<BlockData, ()> {
|
||||
match self.inner {
|
||||
Some(ref mut inner) => inner.poll().map_err(|_| ()),
|
||||
None => return Err(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// get statement to relevant validators.
|
||||
fn route_statement(network: &NetworkService, table: &SharedTable, para_id: ParaId, parent_hash: Hash, statement: SignedStatement) {
|
||||
let broadcast = |i: &mut Iterator<Item=&SessionKey>| {
|
||||
let local_key = table.session_key();
|
||||
network.with_spec(|spec, ctx| {
|
||||
for val in i.filter(|&x| x != &local_key) {
|
||||
spec.send_statement(ctx, *val, parent_hash, statement.clone());
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
let g_info = table
|
||||
.group_info()
|
||||
.get(¶_id)
|
||||
.expect("statements only produced about groups which exist");
|
||||
|
||||
match statement.statement {
|
||||
GenericStatement::Candidate(_) =>
|
||||
broadcast(&mut g_info.validity_guarantors.iter().chain(g_info.availability_guarantors.iter())),
|
||||
GenericStatement::Valid(_) | GenericStatement::Invalid(_) =>
|
||||
broadcast(&mut g_info.validity_guarantors.iter()),
|
||||
GenericStatement::Available(_) =>
|
||||
broadcast(&mut g_info.availability_guarantors.iter()),
|
||||
}
|
||||
}
|
||||
|
||||
// A unique trace for valid statements issued by a validator.
|
||||
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
|
||||
enum StatementTrace {
|
||||
Valid(SessionKey, Hash),
|
||||
Invalid(SessionKey, Hash),
|
||||
Available(SessionKey, Hash),
|
||||
}
|
||||
|
||||
// helper for deferring statements whose associated candidate is unknown.
|
||||
struct DeferredStatements {
|
||||
deferred: HashMap<Hash, Vec<SignedStatement>>,
|
||||
known_traces: HashSet<StatementTrace>,
|
||||
}
|
||||
|
||||
impl DeferredStatements {
|
||||
fn new() -> Self {
|
||||
DeferredStatements {
|
||||
deferred: HashMap::new(),
|
||||
known_traces: HashSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn push(&mut self, statement: SignedStatement) {
|
||||
let (hash, trace) = match statement.statement {
|
||||
GenericStatement::Candidate(_) => return,
|
||||
GenericStatement::Valid(hash) => (hash, StatementTrace::Valid(statement.sender, hash)),
|
||||
GenericStatement::Invalid(hash) => (hash, StatementTrace::Invalid(statement.sender, hash)),
|
||||
GenericStatement::Available(hash) => (hash, StatementTrace::Available(statement.sender, hash)),
|
||||
};
|
||||
|
||||
if self.known_traces.insert(trace) {
|
||||
self.deferred.entry(hash).or_insert_with(Vec::new).push(statement);
|
||||
}
|
||||
}
|
||||
|
||||
fn get_deferred(&mut self, hash: &Hash) -> (Vec<SignedStatement>, Vec<StatementTrace>) {
|
||||
match self.deferred.remove(hash) {
|
||||
None => (Vec::new(), Vec::new()),
|
||||
Some(deferred) => {
|
||||
let mut traces = Vec::new();
|
||||
for statement in deferred.iter() {
|
||||
let trace = match statement.statement {
|
||||
GenericStatement::Candidate(_) => continue,
|
||||
GenericStatement::Valid(hash) => StatementTrace::Valid(statement.sender, hash),
|
||||
GenericStatement::Invalid(hash) => StatementTrace::Invalid(statement.sender, hash),
|
||||
GenericStatement::Available(hash) => StatementTrace::Available(statement.sender, hash),
|
||||
};
|
||||
|
||||
self.known_traces.remove(&trace);
|
||||
traces.push(trace);
|
||||
}
|
||||
|
||||
(deferred, traces)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use substrate_primitives::H512;
|
||||
|
||||
#[test]
|
||||
fn deferred_statements_works() {
|
||||
let mut deferred = DeferredStatements::new();
|
||||
let hash = [1; 32].into();
|
||||
let sig = H512([2; 64]).into();
|
||||
let sender = [255; 32].into();
|
||||
|
||||
let statement = SignedStatement {
|
||||
statement: GenericStatement::Valid(hash),
|
||||
sender,
|
||||
signature: sig,
|
||||
};
|
||||
|
||||
// pre-push.
|
||||
{
|
||||
let (signed, traces) = deferred.get_deferred(&hash);
|
||||
assert!(signed.is_empty());
|
||||
assert!(traces.is_empty());
|
||||
}
|
||||
|
||||
deferred.push(statement.clone());
|
||||
deferred.push(statement.clone());
|
||||
|
||||
// draining: second push should have been ignored.
|
||||
{
|
||||
let (signed, traces) = deferred.get_deferred(&hash);
|
||||
assert_eq!(signed.len(), 1);
|
||||
|
||||
assert_eq!(traces.len(), 1);
|
||||
assert_eq!(signed[0].clone(), statement);
|
||||
assert_eq!(traces[0].clone(), StatementTrace::Valid(sender, hash));
|
||||
}
|
||||
|
||||
// after draining
|
||||
{
|
||||
let (signed, traces) = deferred.get_deferred(&hash);
|
||||
assert!(signed.is_empty());
|
||||
assert!(traces.is_empty());
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user