mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 17:41:08 +00:00
import a local candidate when it is available
This commit is contained in:
@@ -24,7 +24,7 @@ use futures::prelude::*;
|
||||
use futures::stream::{Fuse, FuturesUnordered};
|
||||
use futures::sync::mpsc;
|
||||
|
||||
use table::{self, Statement, SignedStatement, Context as TableContext};
|
||||
use table::{self, Statement, Context as TableContext};
|
||||
|
||||
use super::{Context, CheckedMessage, SharedTable, TypeResolve};
|
||||
|
||||
@@ -94,16 +94,8 @@ impl<C: Context, I> HandleIncoming<C, I> {
|
||||
CheckResult::Unavailable => return, // no such statement and not provable.
|
||||
};
|
||||
|
||||
let signature = self.table.context().sign_table_statement(&statement);
|
||||
|
||||
let statement = SignedStatement {
|
||||
statement,
|
||||
signature,
|
||||
sender: self.local_id.clone(),
|
||||
};
|
||||
|
||||
// TODO: trigger broadcast to peers immediately?
|
||||
self.table.import_statement(statement, None);
|
||||
self.table.sign_and_import(statement);
|
||||
}
|
||||
|
||||
fn import_message(&mut self, origin: C::ValidatorId, message: CheckedMessage<C>) {
|
||||
|
||||
@@ -283,6 +283,20 @@ impl<C: Context> SharedTable<C> {
|
||||
self.inner.lock().import_statement(&*self.context, statement, received_from)
|
||||
}
|
||||
|
||||
/// Sign and import a local statement.
|
||||
pub fn sign_and_import(
|
||||
&self,
|
||||
statement: table::Statement<C::ParachainCandidate, C::Digest>,
|
||||
) -> Option<table::Summary<C::Digest, C::GroupId>> {
|
||||
let signed_statement = table::SignedStatement {
|
||||
signature: self.context.sign_table_statement(&statement),
|
||||
sender: self.context.local_id(),
|
||||
statement,
|
||||
};
|
||||
|
||||
self.import_statement(signed_statement, None)
|
||||
}
|
||||
|
||||
/// Import many statements at once.
|
||||
///
|
||||
/// Provide an iterator yielding pairs of (statement, received_from).
|
||||
@@ -486,6 +500,7 @@ pub fn agree<
|
||||
NetOut,
|
||||
Recovery,
|
||||
PropagateStatements,
|
||||
LocalCandidate,
|
||||
Err,
|
||||
>(
|
||||
params: AgreementParams<Context>,
|
||||
@@ -493,6 +508,7 @@ pub fn agree<
|
||||
net_out: NetOut,
|
||||
recovery: Recovery,
|
||||
propagate_statements: PropagateStatements,
|
||||
local_candidate: LocalCandidate,
|
||||
)
|
||||
-> Box<Future<Item=<Context as TypeResolve>::BftCommitted,Error=Error>>
|
||||
where
|
||||
@@ -503,6 +519,7 @@ pub fn agree<
|
||||
NetOut: Sink<SinkItem=OutgoingMessage<Context>> + 'static,
|
||||
Recovery: MessageRecovery<Context> + 'static,
|
||||
PropagateStatements: Stream<Item=Context::StatementBatch,Error=Err> + 'static,
|
||||
LocalCandidate: Future<Item=Context::ParachainCandidate> + 'static
|
||||
{
|
||||
let (bft_in_in, bft_in_out) = mpsc::unbounded();
|
||||
let (bft_out_in, bft_out_out) = mpsc::unbounded();
|
||||
@@ -537,7 +554,6 @@ pub fn agree<
|
||||
).map_err(|_| Error::IoTerminated)
|
||||
};
|
||||
|
||||
|
||||
let route_messages_out = {
|
||||
let table = params.table.clone();
|
||||
let periodic_table_statements = propagate_statements
|
||||
@@ -554,6 +570,15 @@ pub fn agree<
|
||||
net_out.sink_map_err(|_| Error::IoTerminated).send_all(complete_out_stream)
|
||||
};
|
||||
|
||||
let import_local_candidate = {
|
||||
let table = params.table.clone();
|
||||
local_candidate
|
||||
.map(table::Statement::Candidate)
|
||||
.map(Some)
|
||||
.or_else(|_| Ok(None))
|
||||
.map(move |s| if let Some(s) = s { table.sign_and_import(s); })
|
||||
};
|
||||
|
||||
let create_proposal_on_interval = {
|
||||
let table = params.table;
|
||||
params.timer.interval(params.form_proposal_interval)
|
||||
@@ -562,11 +587,12 @@ pub fn agree<
|
||||
};
|
||||
|
||||
// TODO: avoid having errors take down everything.
|
||||
let future = agreement.join4(
|
||||
let future = agreement.join5(
|
||||
route_messages_in,
|
||||
route_messages_out,
|
||||
create_proposal_on_interval
|
||||
).map(|(agreed, _, _, _)| agreed);
|
||||
create_proposal_on_interval,
|
||||
import_local_candidate,
|
||||
).map(|(agreed, _, _, _, _)| agreed);
|
||||
|
||||
Box::new(future)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user