mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-09 10:37:58 +00:00
Remove grandpa liveness oracle (#1271)
* core: support broadcasting consensus message to all peers * grandpa: remove liveness oracle * node: always start grandpa on full nodes * core: don't check for grandpa justifications on failed block imports * core: fix network connectivity test
This commit is contained in:
committed by
Robert Habermeier
parent
b2ce2f4bd9
commit
45d53ad022
@@ -96,14 +96,13 @@ use runtime_primitives::traits::{
|
||||
use fg_primitives::GrandpaApi;
|
||||
use runtime_primitives::generic::BlockId;
|
||||
use substrate_primitives::{ed25519, H256, AuthorityId, Blake2Hasher};
|
||||
use tokio::timer::{Delay, Interval};
|
||||
use tokio::timer::Delay;
|
||||
|
||||
use grandpa::Error as GrandpaError;
|
||||
use grandpa::{voter, round::State as RoundState, Equivocation, BlockNumberOps};
|
||||
|
||||
use network::{Service as NetworkService, ExHashT};
|
||||
use network::consensus_gossip::{ConsensusMessage};
|
||||
use parking_lot::Mutex;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
@@ -266,7 +265,7 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT
|
||||
|
||||
fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) {
|
||||
let topic = message_topic::<B>(round, set_id);
|
||||
self.service.gossip_consensus_message(topic, message);
|
||||
self.service.gossip_consensus_message(topic, message, false);
|
||||
}
|
||||
|
||||
fn drop_messages(&self, round: u64, set_id: u64) {
|
||||
@@ -280,7 +279,7 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>, H: ExHashT
|
||||
|
||||
fn send_commit(&self, set_id: u64, message: Vec<u8>) {
|
||||
let topic = commit_topic::<B>(set_id);
|
||||
self.service.gossip_consensus_message(topic, message);
|
||||
self.service.gossip_consensus_message(topic, message, true);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -809,63 +808,6 @@ fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>(
|
||||
}
|
||||
}
|
||||
|
||||
/// An oracle for liveness checking of a GRANDPA authority set. This is used
|
||||
/// when importing blocks, if the block enacts an authority set change then
|
||||
/// either it must provide a justification or if the GRANDPA authority set is
|
||||
/// still live then the block can be imported unjustified since the block will
|
||||
/// still be finalized by GRANDPA in a future round. The current heuristic for
|
||||
/// deciding whether an authority set is live is to check if there were any
|
||||
/// recent commit messages on an unfiltered stream).
|
||||
struct GrandpaOracle<Block: BlockT> {
|
||||
unfiltered_commits_stream: Box<dyn Stream<Item=(u64, CompactCommit<Block>), Error=Error> + Send>,
|
||||
last_commit_target: Option<(Instant, Block::Hash, NumberFor<Block>)>,
|
||||
}
|
||||
|
||||
impl<Block: BlockT> GrandpaOracle<Block> {
|
||||
fn new(stream: Box<dyn Stream<Item=(u64, CompactCommit<Block>), Error=Error> + Send>) -> GrandpaOracle<Block> {
|
||||
GrandpaOracle {
|
||||
unfiltered_commits_stream: stream,
|
||||
last_commit_target: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn poll(&mut self) {
|
||||
while let Ok(Async::Ready(Some((_, commit)))) = self.unfiltered_commits_stream.poll() {
|
||||
self.last_commit_target = Some((Instant::now(), commit.target_hash, commit.target_number));
|
||||
}
|
||||
}
|
||||
|
||||
fn is_live(&self) -> bool {
|
||||
self.last_commit_target.map(|(instant, _, _)| {
|
||||
instant.elapsed() < Duration::from_secs(30)
|
||||
}).unwrap_or(false)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct SharedGrandpaOracle<Block: BlockT> {
|
||||
inner: Arc<Mutex<Option<GrandpaOracle<Block>>>>,
|
||||
}
|
||||
|
||||
impl<Block: BlockT> SharedGrandpaOracle<Block> {
|
||||
fn empty() -> SharedGrandpaOracle<Block> {
|
||||
SharedGrandpaOracle { inner: Arc::new(Mutex::new(None)) }
|
||||
}
|
||||
|
||||
fn poll(&self) {
|
||||
if let Some(inner) = self.inner.lock().as_mut() {
|
||||
inner.poll();
|
||||
}
|
||||
}
|
||||
|
||||
fn is_live(&self) -> bool {
|
||||
self.inner.lock()
|
||||
.as_ref()
|
||||
.map(|inner| inner.is_live())
|
||||
.unwrap_or(false)
|
||||
}
|
||||
}
|
||||
|
||||
/// A block-import handler for GRANDPA.
|
||||
///
|
||||
/// This scans each imported block for signals of changing authority set.
|
||||
@@ -879,7 +821,6 @@ pub struct GrandpaBlockImport<B, E, Block: BlockT<Hash=H256>, RA, PRA> {
|
||||
inner: Arc<Client<B, E, Block, RA>>,
|
||||
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
|
||||
authority_set_change: mpsc::UnboundedSender<NewAuthoritySet<Block::Hash, NumberFor<Block>>>,
|
||||
authority_set_oracle: SharedGrandpaOracle<Block>,
|
||||
api: Arc<PRA>,
|
||||
}
|
||||
|
||||
@@ -900,49 +841,46 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
|
||||
{
|
||||
use authorities::PendingChange;
|
||||
|
||||
// we don't want to finalize on `inner.import_block`
|
||||
let justification = block.justification.take();
|
||||
let number = block.header.number().clone();
|
||||
let hash = block.post_header().hash();
|
||||
let parent_hash = *block.header.parent_hash();
|
||||
let digest = block.header.digest().clone();
|
||||
let is_live = self.authority_set_oracle.is_live();
|
||||
|
||||
let import_result = self.inner.import_block(block, new_authorities)?;
|
||||
if import_result != ImportResult::Queued {
|
||||
return Ok(import_result);
|
||||
}
|
||||
let number = block.header.number().clone();
|
||||
|
||||
let maybe_change = self.api.runtime_api().grandpa_pending_change(
|
||||
&BlockId::hash(parent_hash),
|
||||
&digest,
|
||||
&BlockId::hash(*block.header.parent_hash()),
|
||||
&block.header.digest().clone(),
|
||||
)?;
|
||||
|
||||
let is_equal_or_descendent_of = |base: &Block::Hash| -> Result<(), ClientError> {
|
||||
let error = || {
|
||||
Err(ClientErrorKind::Backend(
|
||||
"invalid authority set change: multiple pending changes on the same chain".to_string()
|
||||
).into())
|
||||
// when we update the authorities, we need to hold the lock
|
||||
// until the block is written to prevent a race if we need to restore
|
||||
// the old authority set on error.
|
||||
let just_in_case = if let Some(change) = maybe_change {
|
||||
let parent_hash = *block.header.parent_hash();
|
||||
|
||||
let mut authorities = self.authority_set.inner().write();
|
||||
let old_set = authorities.clone();
|
||||
|
||||
let is_equal_or_descendent_of = |base: &Block::Hash| -> Result<(), ClientError> {
|
||||
let error = || {
|
||||
Err(ClientErrorKind::Backend(
|
||||
"invalid authority set change: multiple pending changes on the same chain".to_string()
|
||||
).into())
|
||||
};
|
||||
|
||||
if *base == hash { return error(); }
|
||||
if *base == parent_hash { return error(); }
|
||||
|
||||
let tree_route = ::client::blockchain::tree_route(
|
||||
self.inner.backend().blockchain(),
|
||||
BlockId::Hash(parent_hash),
|
||||
BlockId::Hash(*base),
|
||||
)?;
|
||||
|
||||
if tree_route.common_block().hash == *base {
|
||||
return error();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
if *base == hash { return error(); }
|
||||
if *base == parent_hash { return error(); }
|
||||
|
||||
let tree_route = ::client::blockchain::tree_route(
|
||||
self.inner.backend().blockchain(),
|
||||
BlockId::Hash(parent_hash),
|
||||
BlockId::Hash(*base),
|
||||
)?;
|
||||
|
||||
if tree_route.common_block().hash == *base {
|
||||
return error();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
if let Some(change) = maybe_change {
|
||||
let mut authorities = self.authority_set.inner().write();
|
||||
authorities.add_pending_change(
|
||||
PendingChange {
|
||||
next_authorities: change.next_authorities,
|
||||
@@ -953,62 +891,74 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block>
|
||||
is_equal_or_descendent_of,
|
||||
)?;
|
||||
|
||||
let encoded = authorities.encode();
|
||||
Backend::insert_aux(&**self.inner.backend(), &[(AUTHORITY_SET_KEY, &encoded[..])], &[])?;
|
||||
block.auxiliary.push((AUTHORITY_SET_KEY.to_vec(), Some(authorities.encode())));
|
||||
Some((old_set, authorities))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// we don't want to finalize on `inner.import_block`
|
||||
let justification = block.justification.take();
|
||||
let import_result = self.inner.import_block(block, new_authorities).map_err(|e| {
|
||||
if let Some((old_set, mut authorities)) = just_in_case {
|
||||
debug!(target: "afg", "Restoring old set after block import error: {:?}", e);
|
||||
*authorities = old_set;
|
||||
}
|
||||
e
|
||||
})?;
|
||||
|
||||
if import_result != ImportResult::Queued {
|
||||
return Ok(import_result);
|
||||
}
|
||||
|
||||
let enacts_change = self.authority_set.inner().read().enacts_change(number, |canon_number| {
|
||||
canonical_at_height(&self.inner, (hash, number), canon_number)
|
||||
})?;
|
||||
|
||||
// a pending change is enacted by the given block, if the current
|
||||
// grandpa authority set isn't live anymore the provided `ImportBlock`
|
||||
// should include a justification for finalizing the block.
|
||||
if !enacts_change {
|
||||
return Ok(import_result);
|
||||
}
|
||||
|
||||
match justification {
|
||||
Some(justification) => {
|
||||
if enacts_change && !is_live {
|
||||
let justification = GrandpaJustification::decode_and_verify(
|
||||
justification,
|
||||
self.authority_set.set_id(),
|
||||
&self.authority_set.current_authorities(),
|
||||
)?;
|
||||
let justification = GrandpaJustification::decode_and_verify(
|
||||
justification,
|
||||
self.authority_set.set_id(),
|
||||
&self.authority_set.current_authorities(),
|
||||
)?;
|
||||
|
||||
let result = finalize_block(
|
||||
&*self.inner,
|
||||
&self.authority_set,
|
||||
hash,
|
||||
number,
|
||||
justification.into(),
|
||||
);
|
||||
let result = finalize_block(
|
||||
&*self.inner,
|
||||
&self.authority_set,
|
||||
hash,
|
||||
number,
|
||||
justification.into(),
|
||||
);
|
||||
|
||||
match result {
|
||||
Ok(_) => {
|
||||
unreachable!("returns Ok when no authority set change should be enacted; \
|
||||
verified previously that finalizing the current block enacts a change; \
|
||||
qed;");
|
||||
},
|
||||
Err(ExitOrError::AuthoritiesChanged(new)) => {
|
||||
debug!(target: "finality", "Imported justified block #{} that enacts authority set change, signalling voter.", number);
|
||||
if let Err(_) = self.authority_set_change.unbounded_send(new) {
|
||||
return Err(ClientErrorKind::Backend(
|
||||
"imported and finalized change block but grandpa voter is no longer running".to_string()
|
||||
).into());
|
||||
}
|
||||
},
|
||||
Err(ExitOrError::Error(_)) => {
|
||||
match result {
|
||||
Ok(_) => {
|
||||
unreachable!("returns Ok when no authority set change should be enacted; \
|
||||
verified previously that finalizing the current block enacts a change; \
|
||||
qed;");
|
||||
},
|
||||
Err(ExitOrError::AuthoritiesChanged(new)) => {
|
||||
debug!(target: "finality", "Imported justified block #{} that enacts authority set change, signalling voter.", number);
|
||||
if let Err(_) = self.authority_set_change.unbounded_send(new) {
|
||||
return Err(ClientErrorKind::Backend(
|
||||
"imported change block but failed to finalize it, node may be in an inconsistent state".to_string()
|
||||
"imported and finalized change block but grandpa voter is no longer running".to_string()
|
||||
).into());
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(ExitOrError::Error(_)) => {
|
||||
return Err(ClientErrorKind::Backend(
|
||||
"imported change block but failed to finalize it, node may be in an inconsistent state".to_string()
|
||||
).into());
|
||||
},
|
||||
}
|
||||
},
|
||||
None if enacts_change && !is_live => {
|
||||
return Err(ClientErrorKind::BadJustification(
|
||||
"missing justification for block that enacts authority set change".to_string()
|
||||
).into());
|
||||
},
|
||||
_ => {}
|
||||
None => {
|
||||
trace!(target: "finality", "Imported unjustified block #{} that enacts authority set change, waiting for finality for enactment.", number);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(import_result)
|
||||
@@ -1085,7 +1035,6 @@ pub struct LinkHalf<B, E, Block: BlockT<Hash=H256>, RA> {
|
||||
client: Arc<Client<B, E, Block, RA>>,
|
||||
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
|
||||
authority_set_change: mpsc::UnboundedReceiver<NewAuthoritySet<Block::Hash, NumberFor<Block>>>,
|
||||
authority_set_oracle: SharedGrandpaOracle<Block>,
|
||||
}
|
||||
|
||||
struct AncestryChain<Block: BlockT> {
|
||||
@@ -1170,21 +1119,17 @@ pub fn block_import<B, E, Block: BlockT<Hash=H256>, RA, PRA>(
|
||||
|
||||
let (authority_set_change_tx, authority_set_change_rx) = mpsc::unbounded();
|
||||
|
||||
let authority_set_oracle = SharedGrandpaOracle::empty();
|
||||
|
||||
Ok((
|
||||
GrandpaBlockImport {
|
||||
inner: client.clone(),
|
||||
authority_set: authority_set.clone(),
|
||||
authority_set_change: authority_set_change_tx,
|
||||
authority_set_oracle: authority_set_oracle.clone(),
|
||||
api
|
||||
},
|
||||
LinkHalf {
|
||||
client,
|
||||
authority_set,
|
||||
authority_set_change: authority_set_change_rx,
|
||||
authority_set_oracle,
|
||||
},
|
||||
))
|
||||
}
|
||||
@@ -1241,10 +1186,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
|
||||
config: Config,
|
||||
link: LinkHalf<B, E, Block, RA>,
|
||||
network: N,
|
||||
) -> ::client::error::Result<(
|
||||
impl Future<Item=(),Error=()> + Send + 'static,
|
||||
impl Future<Item=(),Error=()> + Send + 'static,
|
||||
)> where
|
||||
) -> ::client::error::Result<impl Future<Item=(),Error=()> + Send + 'static> where
|
||||
Block::Hash: Ord,
|
||||
B: Backend<Block, Blake2Hasher> + 'static,
|
||||
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
|
||||
@@ -1261,16 +1203,8 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
|
||||
client,
|
||||
authority_set,
|
||||
authority_set_change,
|
||||
authority_set_oracle
|
||||
} = link;
|
||||
|
||||
let oracle_work = {
|
||||
let authority_set_oracle = authority_set_oracle.clone();
|
||||
Interval::new(Instant::now(), Duration::from_secs(1))
|
||||
.for_each(move |_| Ok(authority_set_oracle.poll()))
|
||||
.map_err(|_| ())
|
||||
};
|
||||
|
||||
let chain_info = client.info()?;
|
||||
let genesis_hash = chain_info.chain.genesis_hash;
|
||||
|
||||
@@ -1315,14 +1249,6 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
|
||||
&network,
|
||||
);
|
||||
|
||||
let unfiltered_commits_stream = Box::new(::communication::checked_commit_stream::<Block, _>(
|
||||
env.set_id,
|
||||
network.commit_messages(env.set_id),
|
||||
env.voters.clone(),
|
||||
));
|
||||
|
||||
*authority_set_oracle.inner.lock() = Some(GrandpaOracle::new(unfiltered_commits_stream));
|
||||
|
||||
let voters = (*env.voters).clone();
|
||||
|
||||
let voter = voter::Voter::new(
|
||||
@@ -1386,5 +1312,5 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>(
|
||||
}))
|
||||
}).map_err(|e| warn!("GRANDPA Voter failed: {:?}", e));
|
||||
|
||||
Ok((voter_work, oracle_work))
|
||||
Ok(voter_work)
|
||||
}
|
||||
|
||||
@@ -192,7 +192,7 @@ impl Network for MessageRouting {
|
||||
|
||||
fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) {
|
||||
let mut inner = self.inner.lock();
|
||||
inner.peer(self.peer_id).gossip_message(make_topic(round, set_id), message);
|
||||
inner.peer(self.peer_id).gossip_message(make_topic(round, set_id), message, false);
|
||||
inner.route_until_complete();
|
||||
}
|
||||
|
||||
@@ -223,7 +223,7 @@ impl Network for MessageRouting {
|
||||
|
||||
fn send_commit(&self, set_id: u64, message: Vec<u8>) {
|
||||
let mut inner = self.inner.lock();
|
||||
inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), message);
|
||||
inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), message, true);
|
||||
inner.route_until_complete();
|
||||
}
|
||||
}
|
||||
@@ -368,7 +368,7 @@ fn finalize_3_voters_no_observers() {
|
||||
);
|
||||
fn assert_send<T: Send>(_: &T) { }
|
||||
|
||||
let (voter, oracle) = run_grandpa(
|
||||
let voter = run_grandpa(
|
||||
Config {
|
||||
gossip_duration: TEST_GOSSIP_DURATION,
|
||||
local_key: Some(Arc::new(key.clone().into())),
|
||||
@@ -380,7 +380,6 @@ fn finalize_3_voters_no_observers() {
|
||||
|
||||
assert_send(&voter);
|
||||
|
||||
runtime.spawn(oracle);
|
||||
runtime.spawn(voter);
|
||||
}
|
||||
|
||||
@@ -429,7 +428,7 @@ fn finalize_3_voters_1_observer() {
|
||||
.take_while(|n| Ok(n.header.number() < &20))
|
||||
.for_each(move |_| Ok(()))
|
||||
);
|
||||
let (voter, oracle) = run_grandpa(
|
||||
let voter = run_grandpa(
|
||||
Config {
|
||||
gossip_duration: TEST_GOSSIP_DURATION,
|
||||
local_key,
|
||||
@@ -439,7 +438,6 @@ fn finalize_3_voters_1_observer() {
|
||||
MessageRouting::new(net.clone(), peer_id),
|
||||
).expect("all in order with client and network");
|
||||
|
||||
runtime.spawn(oracle);
|
||||
runtime.spawn(voter);
|
||||
}
|
||||
|
||||
@@ -508,7 +506,6 @@ fn transition_3_voters_twice_1_observer() {
|
||||
transitions.lock().insert(parent_hash, change);
|
||||
};
|
||||
let peers_c = peers_c.clone();
|
||||
let executor = runtime.executor().clone();
|
||||
|
||||
// wait for blocks to be finalized before generating new ones
|
||||
let block_production = client.finality_notification_stream()
|
||||
@@ -533,34 +530,18 @@ fn transition_3_voters_twice_1_observer() {
|
||||
net.lock().peer(0).push_blocks(5, false);
|
||||
},
|
||||
20 => {
|
||||
let net = net.clone();
|
||||
let add_transition = add_transition.clone();
|
||||
|
||||
// at block 21 we do another transition, but this time instant.
|
||||
// add more until we have 30.
|
||||
let generate_blocks = move || {
|
||||
net.lock().peer(0).generate_blocks(1, BlockOrigin::File, |builder| {
|
||||
let block = builder.bake().unwrap();
|
||||
add_transition(*block.header.parent_hash(), ScheduledChange {
|
||||
next_authorities: make_ids(&peers_c),
|
||||
delay: 0,
|
||||
});
|
||||
|
||||
block
|
||||
net.lock().peer(0).generate_blocks(1, BlockOrigin::File, |builder| {
|
||||
let block = builder.bake().unwrap();
|
||||
add_transition(*block.header.parent_hash(), ScheduledChange {
|
||||
next_authorities: make_ids(&peers_c),
|
||||
delay: 0,
|
||||
});
|
||||
net.lock().peer(0).push_blocks(9, false);
|
||||
};
|
||||
|
||||
// delay block generation for a bit for the liveness tracker to be
|
||||
// able to update due to the authority set change
|
||||
let delay_generate = Delay::new(Instant::now() + Duration::from_millis(5000))
|
||||
.and_then(move |_| {
|
||||
generate_blocks();
|
||||
Ok(())
|
||||
})
|
||||
.map_err(|_| ());
|
||||
|
||||
executor.spawn(delay_generate);
|
||||
block
|
||||
});
|
||||
net.lock().peer(0).push_blocks(9, false);
|
||||
},
|
||||
_ => {},
|
||||
}
|
||||
@@ -603,7 +584,7 @@ fn transition_3_voters_twice_1_observer() {
|
||||
assert!(set.pending_changes().is_empty());
|
||||
})
|
||||
);
|
||||
let (voter, oracle) = run_grandpa(
|
||||
let voter = run_grandpa(
|
||||
Config {
|
||||
gossip_duration: TEST_GOSSIP_DURATION,
|
||||
local_key,
|
||||
@@ -613,7 +594,6 @@ fn transition_3_voters_twice_1_observer() {
|
||||
MessageRouting::new(net.clone(), peer_id),
|
||||
).expect("all in order with client and network");
|
||||
|
||||
runtime.spawn(oracle);
|
||||
runtime.spawn(voter);
|
||||
}
|
||||
|
||||
|
||||
@@ -40,6 +40,7 @@ struct MessageEntry<B: BlockT> {
|
||||
topic: B::Hash,
|
||||
message_hash: B::Hash,
|
||||
message: ConsensusMessage,
|
||||
broadcast: bool,
|
||||
instant: Instant,
|
||||
}
|
||||
|
||||
@@ -78,7 +79,7 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
let mut known_messages = HashSet::new();
|
||||
for entry in self.messages.iter() {
|
||||
known_messages.insert((entry.topic, entry.message_hash));
|
||||
protocol.send_message(who, Message::Consensus(entry.topic.clone(), entry.message.clone()));
|
||||
protocol.send_message(who, Message::Consensus(entry.topic.clone(), entry.message.clone(), entry.broadcast));
|
||||
}
|
||||
self.peers.insert(who, PeerConsensus {
|
||||
known_messages,
|
||||
@@ -98,10 +99,27 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
protocol: &mut Context<B>,
|
||||
message_hash: B::Hash,
|
||||
topic: B::Hash,
|
||||
broadcast: bool,
|
||||
get_message: F,
|
||||
)
|
||||
where F: Fn() -> ConsensusMessage,
|
||||
{
|
||||
if broadcast {
|
||||
for (id, ref mut peer) in self.peers.iter_mut() {
|
||||
if peer.known_messages.insert((topic.clone(), message_hash.clone())) {
|
||||
let message = get_message();
|
||||
if peer.is_authority {
|
||||
trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message);
|
||||
} else {
|
||||
trace!(target:"gossip", "Propagating to {}: {:?}", id, message);
|
||||
}
|
||||
protocol.send_message(*id, Message::Consensus(topic, message, broadcast));
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
let mut non_authorities: Vec<_> = self.peers.iter()
|
||||
.filter_map(|(id, ref peer)| if !peer.is_authority && !peer.known_messages.contains(&(topic, message_hash)) { Some(*id) } else { None })
|
||||
.collect();
|
||||
@@ -118,24 +136,25 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
if peer.known_messages.insert((topic.clone(), message_hash.clone())) {
|
||||
let message = get_message();
|
||||
trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message);
|
||||
protocol.send_message(*id, Message::Consensus(topic, message));
|
||||
protocol.send_message(*id, Message::Consensus(topic, message, broadcast));
|
||||
}
|
||||
} else if non_authorities.contains(&id) {
|
||||
let message = get_message();
|
||||
trace!(target:"gossip", "Propagating to {}: {:?}", id, message);
|
||||
peer.known_messages.insert((topic.clone(), message_hash.clone()));
|
||||
protocol.send_message(*id, Message::Consensus(topic, message));
|
||||
protocol.send_message(*id, Message::Consensus(topic, message, broadcast));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn register_message<F>(&mut self, message_hash: B::Hash, topic: B::Hash, get_message: F)
|
||||
fn register_message<F>(&mut self, message_hash: B::Hash, topic: B::Hash, broadcast: bool, get_message: F)
|
||||
where F: Fn() -> ConsensusMessage
|
||||
{
|
||||
if self.known_messages.insert((topic, message_hash)) {
|
||||
self.messages.push(MessageEntry {
|
||||
topic,
|
||||
message_hash,
|
||||
broadcast,
|
||||
instant: Instant::now(),
|
||||
message: get_message(),
|
||||
});
|
||||
@@ -193,6 +212,7 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
who: NodeIndex,
|
||||
topic: B::Hash,
|
||||
message: ConsensusMessage,
|
||||
broadcast: bool,
|
||||
) -> Option<(B::Hash, ConsensusMessage)> {
|
||||
let message_hash = HashFor::<B>::hash(&message[..]);
|
||||
|
||||
@@ -236,21 +256,34 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
return None;
|
||||
}
|
||||
|
||||
self.multicast_inner(protocol, message_hash, topic, || message.clone());
|
||||
self.multicast_inner(protocol, message_hash, topic, broadcast, || message.clone());
|
||||
Some((topic, message))
|
||||
}
|
||||
|
||||
/// Multicast a message to all peers.
|
||||
pub fn multicast(&mut self, protocol: &mut Context<B>, topic: B::Hash, message: ConsensusMessage) {
|
||||
pub fn multicast(
|
||||
&mut self,
|
||||
protocol: &mut Context<B>,
|
||||
topic: B::Hash,
|
||||
message: ConsensusMessage,
|
||||
broadcast: bool,
|
||||
) {
|
||||
let message_hash = HashFor::<B>::hash(&message);
|
||||
self.multicast_inner(protocol, message_hash, topic, || message.clone());
|
||||
self.multicast_inner(protocol, message_hash, topic, broadcast, || message.clone());
|
||||
}
|
||||
|
||||
fn multicast_inner<F>(&mut self, protocol: &mut Context<B>, message_hash: B::Hash, topic: B::Hash, get_message: F)
|
||||
fn multicast_inner<F>(
|
||||
&mut self,
|
||||
protocol: &mut Context<B>,
|
||||
message_hash: B::Hash,
|
||||
topic: B::Hash,
|
||||
broadcast: bool,
|
||||
get_message: F,
|
||||
)
|
||||
where F: Fn() -> ConsensusMessage
|
||||
{
|
||||
self.register_message(message_hash, topic, &get_message);
|
||||
self.propagate(protocol, message_hash, topic, get_message);
|
||||
self.register_message(message_hash, topic, broadcast, &get_message);
|
||||
self.propagate(protocol, message_hash, topic, broadcast, get_message);
|
||||
}
|
||||
|
||||
/// Note new consensus session.
|
||||
@@ -287,6 +320,7 @@ mod tests {
|
||||
message_hash: $hash,
|
||||
instant: $now,
|
||||
message: $m,
|
||||
broadcast: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -331,7 +365,7 @@ mod tests {
|
||||
let message_hash = HashFor::<Block>::hash(&message);
|
||||
let topic = HashFor::<Block>::hash(&[1,2,3]);
|
||||
|
||||
consensus.register_message(message_hash, topic, || message.clone());
|
||||
consensus.register_message(message_hash, topic, false, || message.clone());
|
||||
let stream = consensus.messages_for(topic);
|
||||
|
||||
assert_eq!(stream.wait().next(), Some(Ok(message)));
|
||||
@@ -345,8 +379,8 @@ mod tests {
|
||||
let msg_a = vec![1, 2, 3];
|
||||
let msg_b = vec![4, 5, 6];
|
||||
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_a), topic, || msg_a.clone());
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_b), topic, || msg_b.clone());
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_a), topic, false, || msg_a.clone());
|
||||
consensus.register_message(HashFor::<Block>::hash(&msg_b), topic, false, || msg_b.clone());
|
||||
|
||||
assert_eq!(consensus.messages.len(), 2);
|
||||
}
|
||||
@@ -362,7 +396,7 @@ mod tests {
|
||||
let message_hash = HashFor::<Block>::hash(&message);
|
||||
let topic = HashFor::<Block>::hash(&[1,2,3]);
|
||||
|
||||
consensus.register_message(message_hash, topic, || message.clone());
|
||||
consensus.register_message(message_hash, topic, false, || message.clone());
|
||||
|
||||
let stream1 = consensus.messages_for(topic);
|
||||
let stream2 = consensus.messages_for(topic);
|
||||
|
||||
@@ -173,7 +173,7 @@ pub mod generic {
|
||||
/// Transactions.
|
||||
Transactions(Transactions<Extrinsic>),
|
||||
/// Consensus protocol message.
|
||||
Consensus(Hash, ConsensusMessage), // topic, opaque Vec<u8>
|
||||
Consensus(Hash, ConsensusMessage, bool), // topic, opaque Vec<u8>, broadcast
|
||||
/// Remote method call request.
|
||||
RemoteCallRequest(RemoteCallRequest<Hash>),
|
||||
/// Remote method call response.
|
||||
|
||||
@@ -285,8 +285,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(io, who, response),
|
||||
GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(io, who, request),
|
||||
GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(io, who, response),
|
||||
GenericMessage::Consensus(topic, msg) => {
|
||||
self.consensus_gossip.write().on_incoming(&mut ProtocolContext::new(&self.context_data, io), who, topic, msg);
|
||||
GenericMessage::Consensus(topic, msg, broadcast) => {
|
||||
self.consensus_gossip.write().on_incoming(&mut ProtocolContext::new(&self.context_data, io), who, topic, msg, broadcast);
|
||||
},
|
||||
other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, &mut Some(other)),
|
||||
}
|
||||
@@ -296,10 +296,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
send_message::<B, H>(&self.context_data.peers, io, who, message)
|
||||
}
|
||||
|
||||
pub fn gossip_consensus_message(&self, io: &mut SyncIo, topic: B::Hash, message: Vec<u8>) {
|
||||
pub fn gossip_consensus_message(&self, io: &mut SyncIo, topic: B::Hash, message: Vec<u8>, broadcast: bool) {
|
||||
let gossip = self.consensus_gossip();
|
||||
self.with_spec(io, move |_s, context|{
|
||||
gossip.write().multicast(context, topic, message);
|
||||
gossip.write().multicast(context, topic, message, broadcast);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -127,11 +127,13 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Service<B, S,
|
||||
}
|
||||
|
||||
/// Send a consensus message through the gossip
|
||||
pub fn gossip_consensus_message(&self, topic: B::Hash, message: Vec<u8>) {
|
||||
pub fn gossip_consensus_message(&self, topic: B::Hash, message: Vec<u8>, broadcast: bool) {
|
||||
self.handler.gossip_consensus_message(
|
||||
&mut NetSyncIo::new(&self.network, self.protocol_id),
|
||||
topic,
|
||||
message)
|
||||
message,
|
||||
broadcast,
|
||||
)
|
||||
}
|
||||
/// Execute a closure with the chain-specific network specialization.
|
||||
pub fn with_spec<F, U>(&self, f: F) -> U
|
||||
|
||||
@@ -229,8 +229,8 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
|
||||
|
||||
/// Push a message into the gossip network and relay to peers.
|
||||
/// `TestNet::sync_step` needs to be called to ensure it's propagated.
|
||||
pub fn gossip_message(&self, topic: Hash, data: Vec<u8>) {
|
||||
self.sync.gossip_consensus_message(&mut TestIo::new(&self.queue, None), topic, data);
|
||||
pub fn gossip_message(&self, topic: Hash, data: Vec<u8>, broadcast: bool) {
|
||||
self.sync.gossip_consensus_message(&mut TestIo::new(&self.queue, None), topic, data, broadcast);
|
||||
}
|
||||
|
||||
/// Add blocks to the peer -- edit the block before adding
|
||||
|
||||
@@ -578,11 +578,8 @@ macro_rules! construct_service_factory {
|
||||
) -> Result<Self::FullService, $crate::Error>
|
||||
{
|
||||
( $( $full_service_init )* ) (config, executor.clone()).and_then(|service| {
|
||||
if let Some(key) = (&service).authority_key() {
|
||||
($( $authority_setup )*)(service, executor, Arc::new(key))
|
||||
} else {
|
||||
Ok(service)
|
||||
}
|
||||
let key = (&service).authority_key().map(Arc::new);
|
||||
($( $authority_setup )*)(service, executor, key)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -204,7 +204,7 @@ pub fn connectivity<F: ServiceFactory, Inherent>(spec: FactoryChainSpec<F>) wher
|
||||
{
|
||||
let temp = TempDir::new("substrate-connectivity-test").expect("Error creating test dir");
|
||||
{
|
||||
let mut network = TestNet::<F>::new(&temp, spec, NUM_NODES as u32, 0, vec![], 30400);
|
||||
let mut network = TestNet::<F>::new(&temp, spec, NUM_NODES, 0, vec![], 30500);
|
||||
info!("Checking linked topology");
|
||||
let mut address = network.full_nodes[0].1.network().node_id().expect("No node address");
|
||||
for (_, service) in network.full_nodes.iter().skip(1) {
|
||||
|
||||
@@ -311,7 +311,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_connectiviy() {
|
||||
fn test_connectivity() {
|
||||
service_test::connectivity::<Factory, node_primitives::InherentData>(integration_test_config());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,42 +77,51 @@ construct_service_factory! {
|
||||
{ |config: FactoryFullConfiguration<Self>, executor: TaskExecutor|
|
||||
FullComponents::<Factory>::new(config, executor) },
|
||||
AuthoritySetup = {
|
||||
|mut service: Self::FullService, executor: TaskExecutor, key: Arc<Pair>| {
|
||||
|mut service: Self::FullService, executor: TaskExecutor, key: Option<Arc<Pair>>| {
|
||||
let (block_import, link_half) = service.config.custom.grandpa_import_setup.take()
|
||||
.expect("Link Half and Block Import are present for Full Services or setup failed before. qed");
|
||||
|
||||
if service.config.custom.grandpa_authority {
|
||||
info!("Running Grandpa session as Authority {}", key.public());
|
||||
let (voter, oracle) = grandpa::run_grandpa(
|
||||
grandpa::Config {
|
||||
gossip_duration: Duration::new(4, 0), // FIXME: make this available through chainspec?
|
||||
local_key: Some(key.clone()),
|
||||
name: Some(service.config.name.clone())
|
||||
},
|
||||
link_half,
|
||||
grandpa::NetworkBridge::new(service.network()),
|
||||
)?;
|
||||
let local_key = if let Some(key) = key {
|
||||
if !service.config.custom.grandpa_authority_only {
|
||||
info!("Using authority key {}", key.public());
|
||||
let proposer = Arc::new(substrate_service::ProposerFactory {
|
||||
client: service.client(),
|
||||
transaction_pool: service.transaction_pool(),
|
||||
});
|
||||
|
||||
executor.spawn(oracle);
|
||||
executor.spawn(voter);
|
||||
}
|
||||
if !service.config.custom.grandpa_authority_only {
|
||||
info!("Using authority key {}", key.public());
|
||||
let proposer = Arc::new(substrate_service::ProposerFactory {
|
||||
client: service.client(),
|
||||
transaction_pool: service.transaction_pool(),
|
||||
});
|
||||
let client = service.client();
|
||||
executor.spawn(start_aura(
|
||||
SlotDuration::get_or_compute(&*client)?,
|
||||
key.clone(),
|
||||
client,
|
||||
block_import.clone(),
|
||||
proposer,
|
||||
service.network(),
|
||||
));
|
||||
}
|
||||
|
||||
if service.config.custom.grandpa_authority {
|
||||
info!("Running Grandpa session as Authority {}", key.public());
|
||||
Some(key)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let voter = grandpa::run_grandpa(
|
||||
grandpa::Config {
|
||||
local_key,
|
||||
gossip_duration: Duration::new(4, 0), // FIXME: make this available through chainspec?
|
||||
name: Some(service.config.name.clone())
|
||||
},
|
||||
link_half,
|
||||
grandpa::NetworkBridge::new(service.network()),
|
||||
)?;
|
||||
|
||||
executor.spawn(voter);
|
||||
|
||||
let client = service.client();
|
||||
executor.spawn(start_aura(
|
||||
SlotDuration::get_or_compute(&*client)?,
|
||||
key,
|
||||
client,
|
||||
block_import.clone(),
|
||||
proposer,
|
||||
service.network(),
|
||||
));
|
||||
}
|
||||
Ok(service)
|
||||
}
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user