mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 15:21:08 +00:00
fill batch statements from table
This commit is contained in:
@@ -52,6 +52,9 @@ mod handle_incoming;
|
||||
mod round_robin;
|
||||
mod table;
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests;
|
||||
|
||||
/// Context necessary for agreement.
|
||||
pub trait Context: Send + Clone {
|
||||
/// A validator ID
|
||||
@@ -78,6 +81,12 @@ pub trait Context: Send + Clone {
|
||||
/// data is checked.
|
||||
type CheckAvailability: IntoFuture<Item=bool>;
|
||||
|
||||
/// The statement batch type.
|
||||
type StatementBatch: StatementBatch<
|
||||
Self::ValidatorId,
|
||||
table::SignedStatement<Self::ParachainCandidate, Self::Digest, Self::ValidatorId, Self::Signature>,
|
||||
>;
|
||||
|
||||
/// Get the digest of a candidate.
|
||||
fn candidate_digest(candidate: &Self::ParachainCandidate) -> Self::Digest;
|
||||
|
||||
@@ -128,12 +137,14 @@ pub trait Context: Send + Clone {
|
||||
pub trait TypeResolve {
|
||||
type SignedTableStatement;
|
||||
type BftCommunication;
|
||||
type BftCommitted;
|
||||
type Misbehavior;
|
||||
}
|
||||
|
||||
impl<C: Context> TypeResolve for C {
|
||||
type SignedTableStatement = table::SignedStatement<C::ParachainCandidate, C::Digest, C::ValidatorId, C::Signature>;
|
||||
type BftCommunication = bft::Communication<C::Proposal, C::Digest, C::ValidatorId, C::Signature>;
|
||||
type BftCommitted = bft::Committed<C::Proposal,C::Digest,C::Signature>;
|
||||
type Misbehavior = table::Misbehavior<C::ParachainCandidate, C::Digest, C::ValidatorId, C::Signature>;
|
||||
}
|
||||
|
||||
@@ -318,6 +329,11 @@ impl<C: Context> SharedTable<C> {
|
||||
self.inner.lock().table.get_misbehavior().clone()
|
||||
}
|
||||
|
||||
/// Fill a statement batch.
|
||||
pub fn fill_batch(&self, batch: &mut C::StatementBatch) {
|
||||
self.inner.lock().table.fill_batch(batch);
|
||||
}
|
||||
|
||||
// Get a handle to the table context.
|
||||
fn context(&self) -> &TableContext<C> {
|
||||
&*self.context
|
||||
@@ -425,8 +441,6 @@ pub struct AgreementParams<C: Context> {
|
||||
pub message_buffer_size: usize,
|
||||
/// Interval to attempt forming proposals over.
|
||||
pub form_proposal_interval: Duration,
|
||||
/// Interval to create table statement packets over.
|
||||
pub table_broadcast_interval: Duration,
|
||||
}
|
||||
|
||||
/// Recovery for messages
|
||||
@@ -435,6 +449,19 @@ pub trait MessageRecovery<C: Context> {
|
||||
fn check_message(&self, UncheckedMessage) -> Option<CheckedMessage<C>>;
|
||||
}
|
||||
|
||||
/// A batch of statements to send out.
|
||||
pub trait StatementBatch<V, T> {
|
||||
/// Get the target authorities of these statements.
|
||||
fn targets(&self) -> &[V];
|
||||
|
||||
/// Push a statement onto the batch. Returns false when the batch is full.
|
||||
///
|
||||
/// This is meant to do work like incrementally serializing the statements
|
||||
/// into a vector of bytes while making sure the length is below a certain
|
||||
/// amount.
|
||||
fn push(&mut self, statement: T) -> bool;
|
||||
}
|
||||
|
||||
/// Recovered and fully checked messages.
|
||||
pub enum CheckedMessage<C: Context> {
|
||||
/// Messages meant for the BFT agreement logic.
|
||||
@@ -443,19 +470,42 @@ pub enum CheckedMessage<C: Context> {
|
||||
Table(Vec<<C as TypeResolve>::SignedTableStatement>),
|
||||
}
|
||||
|
||||
/// Outgoing messages to the network.
|
||||
pub enum OutgoingMessage<C: Context> {
|
||||
/// Messages meant for BFT agreement peers.
|
||||
Bft(<C as TypeResolve>::BftCommunication),
|
||||
/// Batches of table statements.
|
||||
Table(C::StatementBatch),
|
||||
}
|
||||
|
||||
/// Create an agreement future, and I/O streams.
|
||||
pub fn agree<C, I, O, R, E>(params: AgreementParams<C>, net_in: I, net_out: O, recovery: R)
|
||||
-> Box<Future<Item=bft::Committed<C::Proposal,C::Digest,C::Signature>,Error=Error>>
|
||||
// TODO: kill 'static bounds and use impl Future.
|
||||
pub fn agree<
|
||||
Context,
|
||||
NetIn,
|
||||
NetOut,
|
||||
Recovery,
|
||||
PropagateStatements,
|
||||
Err,
|
||||
>(
|
||||
params: AgreementParams<Context>,
|
||||
net_in: NetIn,
|
||||
net_out: NetOut,
|
||||
recovery: Recovery,
|
||||
propagate_statements: PropagateStatements,
|
||||
)
|
||||
-> Box<Future<Item=<Context as TypeResolve>::BftCommitted,Error=Error>>
|
||||
where
|
||||
C: Context + 'static,
|
||||
C::CheckCandidate: IntoFuture<Error=E>,
|
||||
C::CheckAvailability: IntoFuture<Error=E>,
|
||||
I: Stream<Item=(C::ValidatorId, Vec<UncheckedMessage>),Error=E> + 'static,
|
||||
O: Sink<SinkItem=CheckedMessage<C>> + 'static,
|
||||
R: MessageRecovery<C> + 'static,
|
||||
Context: ::Context + 'static,
|
||||
Context::CheckCandidate: IntoFuture<Error=Err>,
|
||||
Context::CheckAvailability: IntoFuture<Error=Err>,
|
||||
NetIn: Stream<Item=(Context::ValidatorId, Vec<UncheckedMessage>),Error=Err> + 'static,
|
||||
NetOut: Sink<SinkItem=OutgoingMessage<Context>> + 'static,
|
||||
Recovery: MessageRecovery<Context> + 'static,
|
||||
PropagateStatements: Stream<Item=Context::StatementBatch,Error=Err> + 'static,
|
||||
{
|
||||
let (bft_in_in, bft_in_out) = mpsc::unbounded();
|
||||
let (bft_out_in, bft_out_out) = mpsc::unbounded::<bft::ContextCommunication<BftContext<C>>>();
|
||||
let (bft_out_in, bft_out_out) = mpsc::unbounded();
|
||||
|
||||
let agreement = {
|
||||
let bft_context = BftContext {
|
||||
@@ -489,14 +539,16 @@ pub fn agree<C, I, O, R, E>(params: AgreementParams<C>, net_in: I, net_out: O, r
|
||||
|
||||
|
||||
let route_messages_out = {
|
||||
let periodic_table_statements = params.timer.interval(params.table_broadcast_interval)
|
||||
.map_err(|_| Error::FaultyTimer)
|
||||
.map(|()| unimplemented!()); // create table statements to send. but to _who_ and how many?
|
||||
let table = params.table.clone();
|
||||
let periodic_table_statements = propagate_statements
|
||||
.or_else(|_| ::futures::future::empty()) // halt the stream instead of error.
|
||||
.map(move |mut batch| { table.fill_batch(&mut batch); batch })
|
||||
.map(OutgoingMessage::Table);
|
||||
|
||||
let complete_out_stream = bft_out_out
|
||||
.map_err(|_| Error::IoTerminated)
|
||||
.map(|bft::ContextCommunication(x)| x)
|
||||
.map(CheckedMessage::Bft)
|
||||
.map(OutgoingMessage::Bft)
|
||||
.select(periodic_table_statements);
|
||||
|
||||
net_out.sink_map_err(|_| Error::IoTerminated).send_all(complete_out_stream)
|
||||
|
||||
@@ -32,6 +32,8 @@ use std::collections::hash_map::{HashMap, Entry};
|
||||
use std::hash::Hash;
|
||||
use std::fmt::Debug;
|
||||
|
||||
use super::StatementBatch;
|
||||
|
||||
/// Context for the statement table.
|
||||
pub trait Context {
|
||||
/// A validator ID
|
||||
@@ -238,6 +240,15 @@ struct ValidatorData<C: Context> {
|
||||
known_statements: HashSet<StatementTrace<C::ValidatorId, C::Digest>>,
|
||||
}
|
||||
|
||||
impl<C: Context> Default for ValidatorData<C> {
|
||||
fn default() -> Self {
|
||||
ValidatorData {
|
||||
proposal: None,
|
||||
known_statements: HashSet::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores votes
|
||||
pub struct Table<C: Context> {
|
||||
validator_data: HashMap<C::ValidatorId, ValidatorData<C>>,
|
||||
@@ -369,6 +380,120 @@ impl<C: Context> Table<C> {
|
||||
&self.detected_misbehavior
|
||||
}
|
||||
|
||||
/// Fill a statement batch and note messages seen by the targets.
|
||||
pub fn fill_batch<B>(&mut self, batch: &mut B)
|
||||
where B: StatementBatch<
|
||||
C::ValidatorId,
|
||||
SignedStatement<C::Candidate, C::Digest, C::ValidatorId, C::Signature>,
|
||||
>
|
||||
{
|
||||
// naively iterate all statements so far, taking any that
|
||||
// at least one of the targets has not seen.
|
||||
|
||||
// workaround for the fact that it's inconvenient to borrow multiple
|
||||
// entries out of a hashmap mutably -- we just move them out and
|
||||
// replace them when we're done.
|
||||
struct SwappedTargetData<'a, C: 'a + Context> {
|
||||
validator_data: &'a mut HashMap<C::ValidatorId, ValidatorData<C>>,
|
||||
target_data: Vec<(C::ValidatorId, ValidatorData<C>)>,
|
||||
}
|
||||
|
||||
impl<'a, C: 'a + Context> Drop for SwappedTargetData<'a, C> {
|
||||
fn drop(&mut self) {
|
||||
for (id, data) in self.target_data.drain(..) {
|
||||
self.validator_data.insert(id, data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pre-fetch authority data for all the targets.
|
||||
let mut target_data = {
|
||||
let validator_data = &mut self.validator_data;
|
||||
let mut target_data = Vec::with_capacity(batch.targets().len());
|
||||
for target in batch.targets() {
|
||||
let active_data = match validator_data.get_mut(target) {
|
||||
None => Default::default(),
|
||||
Some(x) => ::std::mem::replace(x, Default::default()),
|
||||
};
|
||||
|
||||
target_data.push((target.clone(), active_data));
|
||||
}
|
||||
|
||||
SwappedTargetData {
|
||||
validator_data,
|
||||
target_data
|
||||
}
|
||||
};
|
||||
|
||||
let target_data = &mut target_data.target_data;
|
||||
|
||||
macro_rules! attempt_send {
|
||||
($trace:expr, sender=$sender:expr, sig=$sig:expr, statement=$statement:expr) => {{
|
||||
let trace = $trace;
|
||||
let can_send = target_data.iter()
|
||||
.any(|t| t.1.known_statements.contains(&trace));
|
||||
|
||||
if can_send {
|
||||
let statement = SignedStatement {
|
||||
statement: $statement,
|
||||
signature: $sig,
|
||||
sender: $sender,
|
||||
};
|
||||
|
||||
if batch.push(statement) {
|
||||
for target in target_data.iter_mut() {
|
||||
target.1.known_statements.insert(trace.clone());
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}}
|
||||
}
|
||||
|
||||
// reconstruct statements for anything whose trace passes the filter.
|
||||
for (digest, candidate) in self.candidate_votes.iter() {
|
||||
for (sender, vote) in candidate.validity_votes.iter() {
|
||||
match *vote {
|
||||
ValidityVote::Issued(ref sig) => {
|
||||
attempt_send!(
|
||||
StatementTrace::Candidate(sender.clone()),
|
||||
sender = sender.clone(),
|
||||
sig = sig.clone(),
|
||||
statement = Statement::Candidate(candidate.candidate.clone())
|
||||
)
|
||||
}
|
||||
ValidityVote::Valid(ref sig) => {
|
||||
attempt_send!(
|
||||
StatementTrace::Valid(sender.clone(), digest.clone()),
|
||||
sender = sender.clone(),
|
||||
sig = sig.clone(),
|
||||
statement = Statement::Valid(digest.clone())
|
||||
)
|
||||
}
|
||||
ValidityVote::Invalid(ref sig) => {
|
||||
attempt_send!(
|
||||
StatementTrace::Invalid(sender.clone(), digest.clone()),
|
||||
sender = sender.clone(),
|
||||
sig = sig.clone(),
|
||||
statement = Statement::Invalid(digest.clone())
|
||||
)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
for (sender, sig) in candidate.availability_votes.iter() {
|
||||
attempt_send!(
|
||||
StatementTrace::Available(sender.clone(), digest.clone()),
|
||||
sender = sender.clone(),
|
||||
sig = sig.clone(),
|
||||
statement = Statement::Available(digest.clone())
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn note_trace_seen(&mut self, trace: StatementTrace<C::ValidatorId, C::Digest>, known_by: C::ValidatorId) {
|
||||
self.validator_data.entry(known_by).or_insert_with(|| ValidatorData {
|
||||
proposal: None,
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Tests and test helpers for the candidate agreement.
|
||||
|
||||
const VALIDITY_CHECK_DELAY_MS: isize = 400;
|
||||
const AVAILABILITY_CHECK_DELAY_MS: isize = 200;
|
||||
|
||||
use super::*;
|
||||
Reference in New Issue
Block a user