finish GRANDPA test: dynamic authority sets

This commit is contained in:
Robert Habermeier
2018-11-01 20:15:51 +01:00
parent 9630f34775
commit 6704f15e99
5 changed files with 120 additions and 65 deletions
@@ -25,3 +25,4 @@ features = ["derive-codec"]
substrate-network = { path = "../network", features = ["test-helpers"] }
substrate-keyring = { path = "../keyring" }
substrate-test-client = { path = "../test-client"}
env_logger = "0.5"
@@ -120,6 +120,7 @@ impl<H: Eq, N> AuthoritySet<H, N>
}
/// Inspect pending changes.
#[cfg(test)]
pub(crate) fn pending_changes(&self) -> &[PendingChange<H, N>] {
&self.pending_changes
}
+40 -8
View File
@@ -41,6 +41,9 @@ extern crate substrate_keyring as keyring;
#[cfg(test)]
extern crate substrate_test_client as test_client;
#[cfg(test)]
extern crate env_logger;
#[macro_use]
extern crate parity_codec_derive;
@@ -100,6 +103,14 @@ pub struct Config {
pub gossip_duration: Duration,
/// The local signing key.
pub local_key: Option<Arc<ed25519::Pair>>,
/// Some local identifier of the voter.
pub name: Option<String>,
}
impl Config {
fn name(&self) -> &str {
self.name.as_ref().map(|s| s.as_str()).unwrap_or("<unknown>")
}
}
/// Errors that can occur while voting in GRANDPA.
@@ -133,13 +144,13 @@ pub trait Network: Clone {
/// Get a stream of messages for a specific round. This stream should
/// never logically conclude.
fn messages_for(&self, round: u64) -> Self::In;
fn messages_for(&self, round: u64, set_id: u64) -> Self::In;
/// Send a message at a specific round out.
fn send_message(&self, round: u64, message: Vec<u8>);
fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>);
/// Clean up messages for a round.
fn drop_messages(&self, round: u64);
fn drop_messages(&self, round: u64, set_id: u64);
}
/// Something which can determine if a block is known.
@@ -297,6 +308,7 @@ impl<Block: BlockT, Status: BlockStatus<Block>, I> Stream for UntilImported<Bloc
// clears the network messages for inner round on drop.
struct ClearOnDrop<I, N: Network> {
round: u64,
set_id: u64,
inner: I,
network: N,
}
@@ -320,7 +332,7 @@ impl<I: Sink, N: Network> Sink for ClearOnDrop<I, N> {
impl<I, N: Network> Drop for ClearOnDrop<I, N> {
fn drop(&mut self) {
self.network.drop_messages(self.round);
self.network.drop_messages(self.round, self.set_id);
}
}
@@ -406,7 +418,7 @@ fn outgoing_messages<Block: BlockT, N: Network>(
};
// forward to network.
network.send_message(round, signed.encode());
network.send_message(round, set_id, signed.encode());
Some(signed)
} else {
None
@@ -473,6 +485,8 @@ impl<Block: BlockT, B, E, N> grandpa::Chain<Block::Hash, NumberFor<Block>> for E
// once blocks are finalized that make that transition irrelevant or activate it,
// we will proceed onwards. most of the time there will be no pending transition.
let limit = self.authority_set.current_limit();
trace!(target: "afg", "Finding best chain containing block {:?} with number limit {:?}", block, limit);
match self.inner.best_containing(block, limit) {
Ok(Some(hash)) => {
let header = self.inner.header(&BlockId::Hash(hash)).ok()?
@@ -564,7 +578,7 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
let incoming = checked_message_stream::<Block, _>(
round,
self.set_id,
self.network.messages_for(round),
self.network.messages_for(round, self.set_id),
self.voters.clone(),
);
@@ -585,11 +599,12 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
);
// join incoming network messages with locally originating ones.
let incoming = Box::new(incoming.select(out_rx).map_err(Into::into));
let incoming = Box::new(out_rx.select(incoming).map_err(Into::into));
// schedule network message cleanup when sink drops.
let outgoing = Box::new(ClearOnDrop {
round,
set_id: self.set_id,
network: self.network.clone(),
inner: outgoing.sink_map_err(Into::into),
});
@@ -604,6 +619,15 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
}
fn completed(&self, round: u64, state: RoundState<Block::Hash, NumberFor<Block>>) -> Result<(), Self::Error> {
debug!(
target: "afg", "Voter {} completed round {} in set {}. Estimate = {:?}, Finalized in round = {:?}",
self.config.name(),
round,
self.set_id,
state.estimate.as_ref().map(|e| e.1),
state.finalized.as_ref().map(|e| e.1),
);
let encoded_state = (round, state).encode();
if let Err(e) = self.inner.backend()
.insert_aux(&[(LAST_COMPLETED_KEY, &encoded_state[..])], &[])
@@ -669,6 +693,12 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
if let Some((canon_hash, canon_number)) = status.new_set_block {
// the authority set has changed.
let (new_id, set_ref) = authority_set.current();
if set_ref.len() > 16 {
info!("Applying GRANDPA set change to new set with {} authorities", set_ref.len());
} else {
info!("Applying GRANDPA set change to new set {:?}", set_ref);
}
Err(ExitOrError::AuthoritiesChanged(NewAuthoritySet {
canon_hash,
canon_number,
@@ -772,7 +802,7 @@ impl<B, E, Block: BlockT, Api> BlockImport<Block> for GrandpaBlockImport<B, E, B
let old_set = authorities.clone();
authorities.add_pending_change(PendingChange {
next_authorities: change.next_authorities,
finalization_depth: number + change.delay,
finalization_depth: change.delay,
canon_height: number,
canon_hash: hash,
});
@@ -883,6 +913,8 @@ pub fn run_grandpa<B, E, Block: BlockT, N>(
let work = future::loop_fn((initial_environment, last_round_number, last_state), move |params| {
let (env, last_round_number, last_state) = params;
debug!(target: "afg", "{}: Starting new voter with set ID {}", config.name(), env.set_id);
let chain_info = match client.info() {
Ok(i) => i,
Err(e) => return future::Either::B(future::err(Error::Client(e))),
+77 -57
View File
@@ -27,6 +27,7 @@ use client::BlockchainEvents;
use test_client::{self, runtime::BlockNumber};
use codec::Decode;
use consensus_common::BlockOrigin;
use std::collections::HashSet;
use authorities::AuthoritySet;
@@ -69,6 +70,13 @@ impl TestNetFactory for GrandpaTestNet {
}
}
fn default_config() -> ProtocolConfig {
// the authority role ensures gossip hits all nodes here.
ProtocolConfig {
roles: ::network::Roles::AUTHORITY,
}
}
fn make_verifier(&self, _client: Arc<PeersClient>, _cfg: &ProtocolConfig)
-> Arc<Self::Verifier>
{
@@ -118,21 +126,25 @@ impl MessageRouting {
}
}
fn round_to_topic(round: u64) -> Hash {
fn make_topic(round: u64, set_id: u64) -> Hash {
let mut hash = Hash::default();
round.using_encoded(|s| {
let raw = hash.as_mut();
raw[..8].copy_from_slice(s);
});
set_id.using_encoded(|s| {
let raw = hash.as_mut();
raw[8..16].copy_from_slice(s);
});
hash
}
impl Network for MessageRouting {
type In = Box<Stream<Item=Vec<u8>,Error=()>>;
fn messages_for(&self, round: u64) -> Self::In {
fn messages_for(&self, round: u64, set_id: u64) -> Self::In {
let messages = self.inner.lock().peer(self.peer_id)
.with_spec(|spec, _| spec.gossip.messages_for(round_to_topic(round)));
.with_spec(|spec, _| spec.gossip.messages_for(make_topic(round, set_id)));
let messages = messages.map_err(
move |_| panic!("Messages for round {} dropped too early", round)
@@ -141,14 +153,14 @@ impl Network for MessageRouting {
Box::new(messages)
}
fn send_message(&self, round: u64, message: Vec<u8>) {
fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) {
let mut inner = self.inner.lock();
inner.peer(self.peer_id).gossip_message(round_to_topic(round), message);
inner.route();
inner.peer(self.peer_id).gossip_message(make_topic(round, set_id), message);
inner.route_until_complete();
}
fn drop_messages(&self, round: u64) {
let topic = round_to_topic(round);
fn drop_messages(&self, round: u64, set_id: u64) {
let topic = make_topic(round, set_id);
self.inner.lock().peer(self.peer_id)
.with_spec(|spec, _| spec.gossip.collect_garbage(|t| t == &topic));
}
@@ -179,12 +191,7 @@ impl ApiClient<Block> for TestApi {
{
// we take only scheduled changes at given block number where there are no
// extrinsics.
let change = self.scheduled_changes.lock().get(&header.hash()).map(|c| c.clone());
if change.is_some() {
println!("Found transition for {:?}", header.hash());
}
Ok(change)
Ok(self.scheduled_changes.lock().get(&header.hash()).map(|c| c.clone()))
}
}
@@ -236,6 +243,7 @@ fn finalize_3_voters_no_observers() {
Config {
gossip_duration: TEST_GOSSIP_DURATION,
local_key: Some(Arc::new(key.clone().into())),
name: Some(format!("peer#{}", peer_id)),
},
link,
MessageRouting::new(net.clone(), peer_id),
@@ -293,6 +301,7 @@ fn finalize_3_voters_1_observer() {
Config {
gossip_duration: TEST_GOSSIP_DURATION,
local_key,
name: Some(format!("peer#{}", peer_id)),
},
link,
MessageRouting::new(net.clone(), peer_id),
@@ -344,14 +353,13 @@ fn transition_3_voters_twice_1_observer() {
transitions.lock().insert(hash, change);
};
let mut net = GrandpaTestNet::new(api, 8);
let mut net = GrandpaTestNet::new(api, 9);
// first 20 blocks: transition at 15, applied at 20.
{
net.peer(0).push_blocks(14, false);
net.peer(0).generate_blocks(1, BlockOrigin::File, |builder| {
let block = builder.bake().unwrap();
println!("Adding transition for {:?}", block.header.hash());
add_transition(block.header.hash(), ScheduledChange {
next_authorities: make_ids(peers_b),
delay: 4,
@@ -367,7 +375,6 @@ fn transition_3_voters_twice_1_observer() {
{
net.peer(0).generate_blocks(1, BlockOrigin::File, |builder| {
let block = builder.bake().unwrap();
println!("Adding transition for {:?}", block.header.hash());
add_transition(block.header.hash(), ScheduledChange {
next_authorities: make_ids(peers_c),
delay: 0,
@@ -391,50 +398,63 @@ fn transition_3_voters_twice_1_observer() {
assert_eq!(set.pending_changes().len(), 2);
}
// let net = Arc::new(Mutex::new(net));
// let mut finality_notifications = Vec::new();
let net = Arc::new(Mutex::new(net));
let mut finality_notifications = Vec::new();
// let mut runtime = current_thread::Runtime::new().unwrap();
// let all_peers = peers.iter()
// .cloned()
// .map(|key| Some(Arc::new(key.into())))
// .chain(::std::iter::once(None));
let mut runtime = current_thread::Runtime::new().unwrap();
let all_peers = peers_a.iter()
.chain(peers_b)
.chain(peers_c)
.chain(observer)
.cloned()
.collect::<HashSet<_>>() // deduplicate
.into_iter()
.map(|key| Some(Arc::new(key.into())))
.enumerate();
// for (peer_id, local_key) in all_peers.enumerate() {
// let (client, link) = {
// let mut net = net.lock();
// let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed");
// (
// net.peers[peer_id].client().clone(),
// link,
// )
// };
// finality_notifications.push(
// client.finality_notification_stream()
// .take_while(|n| Ok(n.header.number() < &20))
// .for_each(move |_| Ok(()))
// );
// let voter = run_grandpa(
// Config {
// gossip_duration: TEST_GOSSIP_DURATION,
// local_key,
// },
// link,
// MessageRouting::new(net.clone(), peer_id),
// ).expect("all in order with client and network");
for (peer_id, local_key) in all_peers {
let (client, link) = {
let mut net = net.lock();
let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed");
(
net.peers[peer_id].client().clone(),
link,
)
};
finality_notifications.push(
client.finality_notification_stream()
.take_while(|n| Ok(n.header.number() < &30))
.for_each(move |_| Ok(()))
.map(move |()| {
let set_raw = client.backend().get_aux(::AUTHORITY_SET_KEY).unwrap().unwrap();
let set = AuthoritySet::<Hash, BlockNumber>::decode(&mut &set_raw[..]).unwrap();
// runtime.spawn(voter);
// }
assert_eq!(set.current(), (2, make_ids(peers_c).as_slice()));
assert!(set.pending_changes().is_empty());
})
);
let voter = run_grandpa(
Config {
gossip_duration: TEST_GOSSIP_DURATION,
local_key,
name: Some(format!("peer#{}", peer_id)),
},
link,
MessageRouting::new(net.clone(), peer_id),
).expect("all in order with client and network");
// // wait for all finalized on each.
// let wait_for = ::futures::future::join_all(finality_notifications)
// .map(|_| ())
// .map_err(|_| ());
runtime.spawn(voter);
}
// let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
// .for_each(move |_| { net.lock().route_until_complete(); Ok(()) })
// .map(|_| ())
// .map_err(|_| ());
// wait for all finalized on each.
let wait_for = ::futures::future::join_all(finality_notifications)
.map(|_| ())
.map_err(|_| ());
// runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| { net.lock().route_until_complete(); Ok(()) })
.map(|_| ())
.map_err(|_| ());
runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
}