BFT gossip (#106)

* CLI options and keystore integration

* Replace multiqueue with future::mpsc

* BFT gossip

* Revert to app_dirs

* generate_from_seed commented
This commit is contained in:
Arkadiy Paronyan
2018-04-03 18:06:34 +02:00
committed by Robert Habermeier
parent 5fad9efc0a
commit 3ec6d2dde6
19 changed files with 288 additions and 179 deletions
+1 -1
View File
@@ -9,7 +9,7 @@ log = "0.3"
parking_lot = "0.4"
triehash = "0.1"
hex-literal = "0.1"
multiqueue = "0.3"
futures = "0.1.17"
ed25519 = { path = "../ed25519" }
substrate-bft = { path = "../bft" }
substrate-codec = { path = "../codec" }
+9 -17
View File
@@ -16,7 +16,7 @@
//! Substrate Client
use multiqueue;
use futures::sync::mpsc;
use parking_lot::Mutex;
use primitives::{self, block, AuthorityId};
use primitives::block::Id as BlockId;
@@ -30,17 +30,13 @@ use blockchain::{self, Info as ChainInfo, Backend as ChainBackend};
use {error, in_mem, block_builder, runtime_io, bft};
/// Type that implements `futures::Stream` of block import events.
pub type BlockchainEventStream = multiqueue::BroadcastFutReceiver<BlockImportNotification>;
//TODO: The queue is preallocated in multiqueue. Make it unbounded
const NOTIFICATION_QUEUE_SIZE: u64 = 1 << 16;
pub type BlockchainEventStream = mpsc::UnboundedReceiver<BlockImportNotification>;
/// Polkadot Client
pub struct Client<B, E> where B: backend::Backend {
backend: B,
executor: E,
import_notification_sink: Mutex<multiqueue::BroadcastFutSender<BlockImportNotification>>,
import_notification_stream: Mutex<multiqueue::BroadcastFutReceiver<BlockImportNotification>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification>>>,
}
/// A source of blockchain evenets.
@@ -165,7 +161,6 @@ impl<B, E> Client<B, E> where
where
F: FnOnce() -> (block::Header, Vec<(Vec<u8>, Vec<u8>)>)
{
let (sink, stream) = multiqueue::broadcast_fut_queue(NOTIFICATION_QUEUE_SIZE);
if backend.blockchain().header(BlockId::Number(0))?.is_none() {
trace!("Empty database, writing genesis block");
let (genesis_header, genesis_store) = build_genesis();
@@ -177,8 +172,7 @@ impl<B, E> Client<B, E> where
Ok(Client {
backend,
executor,
import_notification_sink: Mutex::new(sink),
import_notification_stream: Mutex::new(stream),
import_notification_sinks: Mutex::new(Vec::new()),
})
}
@@ -212,9 +206,7 @@ impl<B, E> Client<B, E> where
/// Close notification streams.
pub fn stop_notifications(&self) {
let (sink, stream) = multiqueue::broadcast_fut_queue(NOTIFICATION_QUEUE_SIZE);
*self.import_notification_sink.lock() = sink;
*self.import_notification_stream.lock() = stream;
self.import_notification_sinks.lock().clear();
}
/// Get the current set of authorities from storage.
@@ -325,9 +317,7 @@ impl<B, E> Client<B, E> where
header: header,
is_new_best: is_new_best,
};
if let Err(e) = self.import_notification_sink.lock().try_send(notification) {
warn!("Error queueing block import notification: {:?}", e);
}
self.import_notification_sinks.lock().retain(|sink| !sink.unbounded_send(notification.clone()).is_err());
}
Ok(ImportResult::Queued)
@@ -424,7 +414,9 @@ impl<B, E> BlockchainEvents for Client<B, E>
{
/// Get block import event stream.
fn import_notification_stream(&self) -> BlockchainEventStream {
self.import_notification_stream.lock().add_stream()
let (sink, stream) = mpsc::unbounded();
self.import_notification_sinks.lock().push(sink);
stream
}
}
+1 -1
View File
@@ -31,7 +31,7 @@ extern crate ed25519;
extern crate triehash;
extern crate parking_lot;
extern crate multiqueue;
extern crate futures;
#[cfg(test)] #[macro_use] extern crate hex_literal;
#[macro_use] extern crate error_chain;
#[macro_use] extern crate log;