Consensus Engines Implementation: Aura (#911)

* Generalize BlockImport

 - move ImportBlock, BlockOrigin, ImportResult into shared sr-primitives
 - let Consensus provide  and  traits again
 - update consensus traits to latest development
 - implement traits on client::Client, test_client::TestClient
 - update RHD to use the new import_block API

* Move ImportBlock into consensus-common
* Send import notification in aura tests
* Integrating aura into service
* Make Signatures more generic
* Aura Block Production with the given key
* run aura on the thread pool
* start at exact step start in aura
* Add needed wasm blob, in leiu of better solutions.
* Make API ids consistent with traits and bring upstream for sharing.
* Add decrease_free_balance to Balances module
* Encode `Metadata` once instead of two times
* Bitops include xor
* Upgrade key module.
* Default pages to somewhat bigger.
* Introduce upgrade key into node
* Add `Created` event
This commit is contained in:
Benjamin Kampmann
2018-10-27 15:59:18 +02:00
committed by GitHub
parent c0f7021427
commit 50adea6220
82 changed files with 3125 additions and 1902 deletions
+9 -4
View File
@@ -16,10 +16,12 @@
//! Blockchain access trait
use client::{self, Client as SubstrateClient, ImportBlock, ImportResult, ClientInfo, BlockStatus, CallExecutor};
use client::{self, Client as SubstrateClient, ClientInfo, BlockStatus, CallExecutor};
use client::error::Error;
use consensus::BlockImport;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use runtime_primitives::generic::BlockId;
use runtime_primitives::generic::{BlockId};
use consensus::{ImportBlock, ImportResult};
use runtime_primitives::Justification;
use primitives::{Blake2Hasher, AuthorityId};
@@ -69,9 +71,12 @@ pub trait Client<Block: BlockT>: Send + Sync {
impl<B, E, Block> Client<Block> for SubstrateClient<B, E, Block> where
B: client::backend::Backend<Block, Blake2Hasher> + Send + Sync + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
Block: BlockT,
Self: BlockImport<Block, Error=Error>,
Block: BlockT
{
fn import(&self, block: ImportBlock<Block>, new_authorities: Option<Vec<AuthorityId>>) -> Result<ImportResult, Error> {
fn import(&self, block: ImportBlock<Block>, new_authorities: Option<Vec<AuthorityId>>)
-> Result<ImportResult, Error>
{
(self as &SubstrateClient<B, E, Block>).import_block(block, new_authorities)
}
+6 -7
View File
@@ -28,8 +28,6 @@ use std::collections::{HashSet, VecDeque};
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use parking_lot::{Condvar, Mutex, RwLock};
pub use client::{BlockOrigin, ImportBlock, ImportResult};
use network_libp2p::{NodeIndex, Severity};
use primitives::AuthorityId;
@@ -42,6 +40,9 @@ use protocol::Context;
use service::ExecuteInContext;
use sync::ChainSync;
pub use consensus::{ImportBlock, ImportResult, BlockOrigin};
#[cfg(any(test, feature = "test-helpers"))]
use std::cell::RefCell;
@@ -65,7 +66,6 @@ pub trait ImportQueue<B: BlockT>: Send + Sync {
///
/// This is called automatically by the network service when synchronization
/// begins.
fn start<E>(
&self,
_sync: Weak<RwLock<ChainSync<B>>>,
@@ -284,7 +284,7 @@ struct SyncLink<'a, B: 'a + BlockT, E: 'a + ExecuteInContext<B>> {
}
impl<'a, B: 'static + BlockT, E: 'a + ExecuteInContext<B>> SyncLink<'a, B, E> {
/// Execute closure with locked ChainSync.
/// Execute closure with locked ChainSync.
fn with_sync<F: Fn(&mut ChainSync<B>, &mut Context<B>)>(&mut self, closure: F) {
let service = self.context;
let sync = self.chain;
@@ -440,7 +440,6 @@ fn import_single_block<B: BlockT, V: Verifier<B>>(
trace!(target: "sync", "Verifying {}({}) failed: {}", number, hash, msg);
}
BlockImportError::VerificationFailed(peer, msg)
})?;
match chain.import(import_block, new_authorities) {
@@ -545,7 +544,7 @@ unsafe impl<B: BlockT> Sync for ImportCB<B> {}
#[cfg(any(test, feature = "test-helpers"))]
/// A Verifier that accepts all blocks and passes them on with the configured
/// finality to be imported.
/// finality to be imported.
pub struct PassThroughVerifier(pub bool);
#[cfg(any(test, feature = "test-helpers"))]
@@ -564,7 +563,7 @@ impl<B: BlockT> Verifier<B> for PassThroughVerifier {
body,
finalized: self.0,
external_justification: justification,
internal_justification: vec![],
post_runtime_digests: vec![],
auxiliary: Vec::new(),
}, None))
}
+1
View File
@@ -28,6 +28,7 @@ extern crate substrate_primitives as primitives;
extern crate substrate_client as client;
extern crate sr_primitives as runtime_primitives;
extern crate substrate_network_libp2p as network_libp2p;
extern crate substrate_consensus_common as consensus;
extern crate parity_codec as codec;
extern crate futures;
extern crate rustc_hex;
+16 -16
View File
@@ -776,41 +776,41 @@ macro_rules! construct_simple_protocol {
fn on_connect(
&mut self,
ctx: &mut $crate::Context<$block>,
who: $crate::NodeIndex,
status: $crate::StatusMessage<$block>
_ctx: &mut $crate::Context<$block>,
_who: $crate::NodeIndex,
_status: $crate::StatusMessage<$block>
) {
$( self.$sub_protocol_name.on_connect(ctx, who, status); )*
$( self.$sub_protocol_name.on_connect(_ctx, _who, _status); )*
}
fn on_disconnect(&mut self, ctx: &mut $crate::Context<$block>, who: $crate::NodeIndex) {
$( self.$sub_protocol_name.on_disconnect(ctx, who); )*
fn on_disconnect(&mut self, _ctx: &mut $crate::Context<$block>, _who: $crate::NodeIndex) {
$( self.$sub_protocol_name.on_disconnect(_ctx, _who); )*
}
fn on_message(
&mut self,
ctx: &mut $crate::Context<$block>,
who: $crate::NodeIndex,
message: &mut Option<$crate::message::Message<$block>>
_ctx: &mut $crate::Context<$block>,
_who: $crate::NodeIndex,
_message: &mut Option<$crate::message::Message<$block>>
) {
$( self.$sub_protocol_name.on_message(ctx, who, message); )*
$( self.$sub_protocol_name.on_message(_ctx, _who, _message); )*
}
fn on_abort(&mut self) {
$( self.$sub_protocol_name.on_abort(); )*
}
fn maintain_peers(&mut self, ctx: &mut $crate::Context<$block>) {
$( self.$sub_protocol_name.maintain_peers(ctx); )*
fn maintain_peers(&mut self, _ctx: &mut $crate::Context<$block>) {
$( self.$sub_protocol_name.maintain_peers(_ctx); )*
}
fn on_block_imported(
&mut self,
ctx: &mut $crate::Context<$block>,
hash: <$block as $crate::BlockT>::Hash,
header: &<$block as $crate::BlockT>::Header
_ctx: &mut $crate::Context<$block>,
_hash: <$block as $crate::BlockT>::Hash,
_header: &<$block as $crate::BlockT>::Header
) {
$( self.$sub_protocol_name.on_block_imported(ctx, hash, header); )*
$( self.$sub_protocol_name.on_block_imported(_ctx, _hash, _header); )*
}
}
}
+6
View File
@@ -178,6 +178,12 @@ impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> {
}
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> ::consensus::SyncOracle for Service<B, S, H> {
fn is_major_syncing(&self) -> bool {
self.handler.sync().read().status().is_major_syncing()
}
}
impl<B: BlockT + 'static, S: Specialization<B>, H:ExHashT> Drop for Service<B, S, H> {
fn drop(&mut self) {
self.handler.stop();
@@ -25,9 +25,6 @@ pub trait Specialization<B: BlockT>: Send + Sync + 'static {
/// Get the current specialization-status.
fn status(&self) -> Vec<u8>;
/// Called on start-up.
fn on_start(&mut self) { }
/// Called when a peer successfully handshakes.
fn on_connect(&mut self, ctx: &mut Context<B>, who: NodeIndex, status: ::message::Status<B>);
+13 -1
View File
@@ -18,7 +18,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use protocol::Context;
use network_libp2p::{Severity, NodeIndex};
use client::{BlockStatus, BlockOrigin, ClientInfo};
use client::{BlockStatus, ClientInfo};
use consensus::BlockOrigin;
use client::error::Error as ClientError;
use blocks::{self, BlockCollection};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor};
@@ -77,6 +78,17 @@ pub struct Status<B: BlockT> {
pub best_seen_block: Option<NumberFor<B>>,
}
impl<B: BlockT> Status<B> {
/// Whether the synchronization status is doing major downloading work or
/// is near the head of the chain.
pub fn is_major_syncing(&self) -> bool {
match self.state {
SyncState::Idle => false,
SyncState::Downloading => true,
}
}
}
impl<B: BlockT> ChainSync<B> {
/// Create a new instance.
pub(crate) fn new(role: Roles, info: &ClientInfo<B>, import_queue: Arc<ImportQueue<B>>) -> Self {
+152 -75
View File
@@ -34,14 +34,16 @@ use service::TransactionPool;
use network_libp2p::{NodeIndex, PeerId, Severity};
use keyring::Keyring;
use codec::Encode;
use import_queue::{SyncImportQueue, PassThroughVerifier};
use test_client::{self, TestClient};
use import_queue::{SyncImportQueue, PassThroughVerifier, Verifier};
use consensus::BlockOrigin;
use specialization::Specialization;
use consensus_gossip::ConsensusGossip;
use import_queue::ImportQueue;
use service::ExecuteInContext;
use test_client;
pub use test_client::runtime::{Block, Hash, Transfer, Extrinsic};
pub use test_client::TestClient;
struct DummyContextExecutor(Arc<Protocol<Block, DummySpecialization, Hash>>, Arc<RwLock<VecDeque<TestPacket>>>);
unsafe impl Send for DummyContextExecutor {}
@@ -135,20 +137,22 @@ pub struct TestPacket {
recipient: NodeIndex,
}
pub struct Peer {
client: Arc<client::Client<test_client::Backend, test_client::Executor, Block>>,
pub type PeersClient = client::Client<test_client::Backend, test_client::Executor, Block>;
pub struct Peer<V: Verifier<Block>> {
client: Arc<PeersClient>,
pub sync: Arc<Protocol<Block, DummySpecialization, Hash>>,
pub queue: Arc<RwLock<VecDeque<TestPacket>>>,
import_queue: Arc<SyncImportQueue<Block, PassThroughVerifier>>,
import_queue: Arc<SyncImportQueue<Block, V>>,
executor: Arc<DummyContextExecutor>,
}
impl Peer {
impl<V: 'static + Verifier<Block>> Peer<V> {
fn new(
client: Arc<client::Client<test_client::Backend, test_client::Executor, Block>>,
client: Arc<PeersClient>,
sync: Arc<Protocol<Block, DummySpecialization, Hash>>,
queue: Arc<RwLock<VecDeque<TestPacket>>>,
import_queue: Arc<SyncImportQueue<Block, PassThroughVerifier>>,
import_queue: Arc<SyncImportQueue<Block, V>>,
) -> Self {
let executor = Arc::new(DummyContextExecutor(sync.clone(), queue.clone()));
Peer { client, sync, queue, import_queue, executor}
@@ -201,6 +205,13 @@ impl Peer {
self.sync.tick(&mut TestIo::new(&self.queue, None));
}
/// Send block import notifications.
fn send_import_notifications(&self) {
let info = self.client.info().expect("In-mem client does not fail");
let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap();
self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header);
}
/// Restart sync for a peer.
fn restart_sync(&self) {
self.sync.abort();
@@ -218,15 +229,18 @@ impl Peer {
}
/// Add blocks to the peer -- edit the block before adding
pub fn generate_blocks<F>(&self, count: usize, mut edit_block: F)
pub fn generate_blocks<F>(&self, count: usize, origin: BlockOrigin, mut edit_block: F)
where F: FnMut(&mut BlockBuilder<test_client::Backend, test_client::Executor, Block, Blake2Hasher>)
{
for _ in 0 .. count {
let mut builder = self.client.new_block().unwrap();
edit_block(&mut builder);
let block = builder.bake().unwrap();
trace!("Generating {}, (#{}, parent={})", block.header.hash(), block.header.number, block.header.parent_hash);
self.client.justify_and_import(client::BlockOrigin::File, block).unwrap();
let hash = block.header.hash();
trace!("Generating {}, (#{}, parent={})", hash, block.header.number, block.header.parent_hash);
let header = block.header.clone();
self.client.justify_and_import(origin, block).unwrap();
self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), hash, &header);
}
}
@@ -234,7 +248,7 @@ impl Peer {
pub fn push_blocks(&self, count: usize, with_tx: bool) {
let mut nonce = 0;
if with_tx {
self.generate_blocks(count, |builder| {
self.generate_blocks(count, BlockOrigin::File, |builder| {
let transfer = Transfer {
from: Keyring::Alice.to_raw_public().into(),
to: Keyring::Alice.to_raw_public().into(),
@@ -246,7 +260,7 @@ impl Peer {
nonce = nonce + 1;
});
} else {
self.generate_blocks(count, |_| ());
self.generate_blocks(count, BlockOrigin::File, |_| ());
}
}
@@ -258,7 +272,7 @@ impl Peer {
}
/// Get a reference to the client.
pub fn client(&self) -> &Arc<client::Client<test_client::Backend, test_client::Executor, Block>> {
pub fn client(&self) -> &Arc<PeersClient> {
&self.client
}
}
@@ -277,25 +291,30 @@ impl TransactionPool<Hash, Block> for EmptyTransactionPool {
fn on_broadcasted(&self, _: HashMap<Hash, Vec<String>>) {}
}
pub struct TestNet {
peers: Vec<Arc<Peer>>,
started: bool,
disconnect_events: Vec<(NodeIndex, NodeIndex)>, //disconnected (initiated by, to)
}
pub trait TestNetFactory: Sized {
type Verifier: 'static + Verifier<Block>;
impl TestNet {
/// Create new test network with this many peers.
pub fn new(n: usize) -> Self {
Self::new_with_config(n, ProtocolConfig::default())
/// These two need to be implemented!
fn from_config(config: &ProtocolConfig) -> Self;
fn make_verifier(&self, client: Arc<PeersClient>, config: &ProtocolConfig) -> Arc<Self::Verifier>;
/// Get reference to peer.
fn peer(&self, i: usize) -> &Peer<Self::Verifier>;
fn peers(&self) -> &Vec<Arc<Peer<Self::Verifier>>>;
fn mut_peers<F: Fn(&mut Vec<Arc<Peer<Self::Verifier>>>)>(&mut self, closure: F );
fn started(&self) -> bool;
fn set_started(&mut self, now: bool);
fn default_config() -> ProtocolConfig {
ProtocolConfig::default()
}
/// Create new test network with peers and given config.
pub fn new_with_config(n: usize, config: ProtocolConfig) -> Self {
let mut net = TestNet {
peers: Vec::new(),
started: false,
disconnect_events: Vec::new(),
};
/// Create new test network with this many peers.
fn new(n: usize) -> Self {
let config = Self::default_config();
let mut net = Self::from_config(&config);
for _ in 0..n {
net.add_peer(&config);
@@ -304,10 +323,11 @@ impl TestNet {
}
/// Add a peer.
pub fn add_peer(&mut self, config: &ProtocolConfig) {
fn add_peer(&mut self, config: &ProtocolConfig) {
let client = Arc::new(test_client::new());
let tx_pool = Arc::new(EmptyTransactionPool);
let import_queue = Arc::new(SyncImportQueue::new(Arc::new(PassThroughVerifier(false))));
let verifier = self.make_verifier(client.clone(), config);
let import_queue = Arc::new(SyncImportQueue::new(verifier));
let specialization = DummySpecialization {
gossip: ConsensusGossip::new(),
};
@@ -320,93 +340,107 @@ impl TestNet {
specialization
).unwrap();
self.peers.push(Arc::new(Peer::new(
let peer = Arc::new(Peer::new(
client,
Arc::new(sync),
Arc::new(RwLock::new(VecDeque::new())),
import_queue
)));
}
));
/// Get reference to peer.
pub fn peer(&self, i: usize) -> &Peer {
&self.peers[i]
self.mut_peers(|peers| {
peers.push(peer.clone())
});
}
/// Start network.
fn start(&mut self) {
if self.started {
if self.started() {
return;
}
for peer in 0..self.peers.len() {
self.peers[peer].start();
for client in 0..self.peers.len() {
if peer != client {
self.peers[peer].on_connect(client as NodeIndex);
self.mut_peers(|peers| {
for peer in 0..peers.len() {
peers[peer].start();
for client in 0..peers.len() {
if peer != client {
peers[peer].on_connect(client as NodeIndex);
}
}
}
}
self.started = true;
});
self.set_started(true);
}
/// Do one step of routing.
pub fn route(&mut self) {
for peer in 0..self.peers.len() {
let packet = self.peers[peer].pending_message();
if let Some(packet) = packet {
let disconnecting = {
let recipient = packet.recipient;
trace!("--- {} -> {} ---", peer, recipient);
let to_disconnect = self.peers[recipient].receive_message(peer as NodeIndex, packet);
for d in &to_disconnect {
// notify this that disconnecting peers are disconnecting
self.peers[recipient].on_disconnect(*d as NodeIndex);
self.disconnect_events.push((peer, *d));
fn route(&mut self) {
self.mut_peers(move |peers| {
for peer in 0..peers.len() {
let packet = peers[peer].pending_message();
if let Some(packet) = packet {
let disconnecting = {
let recipient = packet.recipient;
trace!(target: "sync", "--- {} -> {} ---", peer, recipient);
let to_disconnect = peers[recipient].receive_message(peer as NodeIndex, packet);
for d in &to_disconnect {
// notify this that disconnecting peers are disconnecting
peers[recipient].on_disconnect(*d as NodeIndex);
}
to_disconnect
};
for d in &disconnecting {
// notify other peers that this peer is disconnecting
peers[*d].on_disconnect(peer as NodeIndex);
}
to_disconnect
};
for d in &disconnecting {
// notify other peers that this peer is disconnecting
self.peers[*d].on_disconnect(peer as NodeIndex);
}
}
}
});
}
/// Route messages between peers until all queues are empty.
pub fn route_until_complete(&mut self) {
fn route_until_complete(&mut self) {
while !self.done() {
self.route()
}
}
/// Do a step of synchronization.
pub fn sync_step(&mut self) {
fn sync_step(&mut self) {
self.route();
for peer in &mut self.peers {
peer.sync_step();
}
self.mut_peers(|peers| {
for peer in peers {
peer.sync_step();
}
})
}
/// Send block import notifications for all peers.
fn send_import_notifications(&mut self) {
self.mut_peers(|peers| {
for peer in peers {
peer.send_import_notifications();
}
})
}
/// Restart sync for a peer.
pub fn restart_peer(&mut self, i: usize) {
self.peers[i].restart_sync();
fn restart_peer(&mut self, i: usize) {
self.peers()[i].restart_sync();
}
/// Perform synchronization until complete.
pub fn sync(&mut self) -> u32 {
fn sync(&mut self) -> u32 {
self.start();
let mut total_steps = 0;
while !self.done() {
self.sync_step();
total_steps += 1;
self.route();
}
total_steps
}
/// Do the given amount of sync steps.
pub fn sync_steps(&mut self, count: usize) {
fn sync_steps(&mut self, count: usize) {
self.start();
for _ in 0..count {
self.sync_step();
@@ -414,7 +448,50 @@ impl TestNet {
}
/// Whether all peers have synced.
pub fn done(&self) -> bool {
self.peers.iter().all(|p| p.is_done())
fn done(&self) -> bool {
self.peers().iter().all(|p| p.is_done())
}
}
pub struct TestNet {
peers: Vec<Arc<Peer<PassThroughVerifier>>>,
started: bool
}
impl TestNetFactory for TestNet {
type Verifier = PassThroughVerifier;
/// Create new test network with peers and given config.
fn from_config(_config: &ProtocolConfig) -> Self {
TestNet {
peers: Vec::new(),
started: false
}
}
fn make_verifier(&self, _client: Arc<PeersClient>, _config: &ProtocolConfig)
-> Arc<Self::Verifier>
{
Arc::new(PassThroughVerifier(false))
}
fn peer(&self, i: usize) -> &Peer<Self::Verifier> {
&self.peers[i]
}
fn peers(&self) -> &Vec<Arc<Peer<Self::Verifier>>> {
&self.peers
}
fn mut_peers<F: Fn(&mut Vec<Arc<Peer<Self::Verifier>>>)>(&mut self, closure: F ) {
closure(&mut self.peers);
}
fn started(&self) -> bool {
self.started
}
fn set_started(&mut self, new: bool) {
self.started = new;
}
}
+16
View File
@@ -16,6 +16,7 @@
use client::backend::Backend;
use client::blockchain::HeaderBackend as BlockchainHeaderBackend;
use consensus::BlockOrigin;
use sync::SyncState;
use Roles;
use super::*;
@@ -68,6 +69,7 @@ fn sync_no_common_longer_chain_fails() {
fn sync_after_fork_works() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.sync_step();
net.peer(0).push_blocks(30, false);
net.peer(1).push_blocks(30, false);
net.peer(2).push_blocks(30, false);
@@ -87,6 +89,20 @@ fn sync_after_fork_works() {
assert!(net.peer(2).client.backend().blockchain().canon_equals_to(&peer1_chain));
}
#[test]
fn own_blocks_are_announced() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.sync(); // connect'em
net.peer(0).generate_blocks(1, BlockOrigin::Own, |_| ());
net.sync();
assert_eq!(net.peer(0).client.backend().blockchain().info().unwrap().best_number, 1);
assert_eq!(net.peer(1).client.backend().blockchain().info().unwrap().best_number, 1);
let peer0_chain = net.peer(0).client.backend().blockchain().clone();
assert!(net.peer(1).client.backend().blockchain().canon_equals_to(&peer0_chain));
assert!(net.peer(2).client.backend().blockchain().canon_equals_to(&peer0_chain));
}
#[test]
fn blocks_are_not_announced_by_light_nodes() {
::env_logger::init().ok();