diff --git a/substrate/candidate-agreement/src/handle_incoming.rs b/substrate/candidate-agreement/src/handle_incoming.rs index 331ee7a923..9071ba4b58 100644 --- a/substrate/candidate-agreement/src/handle_incoming.rs +++ b/substrate/candidate-agreement/src/handle_incoming.rs @@ -24,7 +24,7 @@ use futures::prelude::*; use futures::stream::{Fuse, FuturesUnordered}; use futures::sync::mpsc; -use table::{self, Statement, SignedStatement, Context as TableContext}; +use table::{self, Statement, Context as TableContext}; use super::{Context, CheckedMessage, SharedTable, TypeResolve}; @@ -94,16 +94,8 @@ impl HandleIncoming { CheckResult::Unavailable => return, // no such statement and not provable. }; - let signature = self.table.context().sign_table_statement(&statement); - - let statement = SignedStatement { - statement, - signature, - sender: self.local_id.clone(), - }; - // TODO: trigger broadcast to peers immediately? - self.table.import_statement(statement, None); + self.table.sign_and_import(statement); } fn import_message(&mut self, origin: C::ValidatorId, message: CheckedMessage) { diff --git a/substrate/candidate-agreement/src/lib.rs b/substrate/candidate-agreement/src/lib.rs index 8aec09d076..7f5d374201 100644 --- a/substrate/candidate-agreement/src/lib.rs +++ b/substrate/candidate-agreement/src/lib.rs @@ -283,6 +283,20 @@ impl SharedTable { self.inner.lock().import_statement(&*self.context, statement, received_from) } + /// Sign and import a local statement. + pub fn sign_and_import( + &self, + statement: table::Statement, + ) -> Option> { + let signed_statement = table::SignedStatement { + signature: self.context.sign_table_statement(&statement), + sender: self.context.local_id(), + statement, + }; + + self.import_statement(signed_statement, None) + } + /// Import many statements at once. /// /// Provide an iterator yielding pairs of (statement, received_from). @@ -486,6 +500,7 @@ pub fn agree< NetOut, Recovery, PropagateStatements, + LocalCandidate, Err, >( params: AgreementParams, @@ -493,6 +508,7 @@ pub fn agree< net_out: NetOut, recovery: Recovery, propagate_statements: PropagateStatements, + local_candidate: LocalCandidate, ) -> Box::BftCommitted,Error=Error>> where @@ -503,6 +519,7 @@ pub fn agree< NetOut: Sink> + 'static, Recovery: MessageRecovery + 'static, PropagateStatements: Stream + 'static, + LocalCandidate: Future + 'static { let (bft_in_in, bft_in_out) = mpsc::unbounded(); let (bft_out_in, bft_out_out) = mpsc::unbounded(); @@ -537,7 +554,6 @@ pub fn agree< ).map_err(|_| Error::IoTerminated) }; - let route_messages_out = { let table = params.table.clone(); let periodic_table_statements = propagate_statements @@ -554,6 +570,15 @@ pub fn agree< net_out.sink_map_err(|_| Error::IoTerminated).send_all(complete_out_stream) }; + let import_local_candidate = { + let table = params.table.clone(); + local_candidate + .map(table::Statement::Candidate) + .map(Some) + .or_else(|_| Ok(None)) + .map(move |s| if let Some(s) = s { table.sign_and_import(s); }) + }; + let create_proposal_on_interval = { let table = params.table; params.timer.interval(params.form_proposal_interval) @@ -562,11 +587,12 @@ pub fn agree< }; // TODO: avoid having errors take down everything. - let future = agreement.join4( + let future = agreement.join5( route_messages_in, route_messages_out, - create_proposal_on_interval - ).map(|(agreed, _, _, _)| agreed); + create_proposal_on_interval, + import_local_candidate, + ).map(|(agreed, _, _, _, _)| agreed); Box::new(future) }