Register protocols at the same time as we start (#385)

* Register protocols at the same time as we start

* Fix tests
This commit is contained in:
Pierre Krieger
2018-07-19 20:50:07 +02:00
committed by Gav Wood
parent 0fbf66b07e
commit 831810b155
4 changed files with 56 additions and 70 deletions
@@ -649,11 +649,6 @@ impl NetworkState {
self.disabled_peers.lock().insert(peer_info.id.clone(), timeout);
}
/// Returns true if a peer is disabled.
pub fn is_peer_disabled(&self, node_id: &PeerstorePeerId) -> bool {
is_peer_disabled(&self.disabled_peers, node_id)
}
/// Flushes the caches to the disk.
///
/// This is done in an atomical way, so that an error doesn't corrupt
@@ -887,7 +882,6 @@ fn open_priv_key_file<P>(path: P) -> Result<fs::File, IoError>
#[cfg(test)]
mod tests {
use futures::sync::mpsc;
use libp2p::core::{Endpoint, PublicKey};
use network_state::NetworkState;
@@ -138,30 +138,6 @@ impl NetworkService {
})
}
/// Register a new protocol handler with the event loop.
pub fn register_protocol(
&self,
handler: Arc<NetworkProtocolHandler + Send + Sync>,
protocol: ProtocolId,
versions: &[(u8, u8)]
) {
if self.shared.network_state.has_connected_peer() {
// TODO: figure out if that's correct
warn!(target: "sub-libp2p", "a new network protocol was registered \
while the service was already active ; this is a programmer \
error");
}
self.shared.protocols.write().0
.push(RegisteredProtocol::new(handler.clone(), protocol, versions));
handler.initialize(&NetworkContextImpl {
inner: self.shared.clone(),
protocol: protocol.clone(),
current_peer: None,
});
}
/// Returns network configuration.
pub fn config(&self) -> &NetworkConfiguration {
&self.shared.config
@@ -178,13 +154,23 @@ impl NetworkService {
)
}
/// Start network IO
/// Start network IO.
/// Note that we could use an iterator for `protocols`, but having a
/// generic here is too much and crashes the Rust compiler.
// TODO (design): the notion of having a `NetworkService` alive should mean
// that it is running ; the `start` and `stop` functions are bad design
pub fn start(&self) -> Result<(), (Error, Option<SocketAddr>)> {
pub fn start(
&self,
protocols: Vec<(Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, &[(u8, u8)])>
) -> Result<(), (Error, Option<SocketAddr>)> {
// TODO: check that service is started already?
*self.shared.protocols.write() = Default::default();
*self.shared.protocols.write() = RegisteredProtocols(
protocols.into_iter()
.map(|(handler, protocol, versions)|
RegisteredProtocol::new(handler.clone(), protocol, versions))
.collect()
);
// Channel we use to signal success or failure of the bg thread
// initialization process.
@@ -193,6 +179,17 @@ impl NetworkService {
// should stop
let (close_tx, close_rx) = oneshot::channel();
let (timeouts_register_tx, timeouts_register_rx) = mpsc::unbounded();
*self.shared.timeouts_register_tx.write() = timeouts_register_tx;
// Initialize all the protocols now.
for protocol in self.shared.protocols.read().0.iter() {
protocol.custom_data().initialize(&NetworkContextImpl {
inner: self.shared.clone(),
protocol: protocol.id().clone(),
current_peer: None,
});
}
let shared = self.shared.clone();
let join_handle = thread::spawn(move || {
// Tokio core that is going to run everything in this thread.
@@ -229,7 +226,6 @@ impl NetworkService {
.map_err(|err| (err, self.shared.config.listen_address.clone()))?;
*self.bg_thread.lock() = Some((close_tx, join_handle));
*self.shared.timeouts_register_tx.write() = timeouts_register_tx;
Ok(())
}
@@ -479,6 +475,7 @@ fn init_thread(
}
// Explicitely connect to the boostrap nodes as a temporary measure.
trace!(target: "sub-libp2p", "Dialing bootnodes");
for bootnode in shared.config.boot_nodes.iter() {
// TODO: this code is copy-pasted from `network_state`, but it is
// temporary anyway
@@ -491,7 +488,6 @@ fn init_thread(
_ => return Err(ErrorKind::BadProtocol.into()),
};
trace!(target: "sub-libp2p", "Dialing bootnode {:?}", peer_id);
for proto in shared.protocols.read().0.clone().into_iter() {
open_peer_custom_proto(
shared.clone(),
@@ -1002,11 +998,8 @@ fn open_peer_custom_proto<T, To, St, C>(
// Don't connect to ourselves.
// TODO: remove this eventually
if &expected_peer_id == shared.kad_system.local_peer_id() {
return
}
// Don't connect to a disabled peer.
if shared.network_state.is_peer_disabled(&expected_peer_id) {
trace!(target: "sub-libp2p", "Skipped connecting to {:?} because \
it is ourselves", expected_peer_id);
return
}
@@ -1058,15 +1051,20 @@ fn open_peer_custom_proto<T, To, St, C>(
}
});
if let Ok((peer_id, unique_connec)) = shared2.network_state
.custom_proto(node_id.clone(), proto_id, Endpoint::Dialer) {
if !unique_connec.is_alive() {
trace!(target: "sub-libp2p", "Opening connection to #{} {:?} with \
proto {:?}", peer_id, node_id, proto_id);
}
match shared2.network_state.custom_proto(node_id.clone(), proto_id, Endpoint::Dialer) {
Ok((peer_id, unique_connec)) => {
if !unique_connec.is_alive() {
trace!(target: "sub-libp2p", "Opening connection to #{} {:?} with \
proto {:?}", peer_id, node_id, proto_id);
}
// TODO: this future should be used
let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err);
// TODO: this future should be used
let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err);
},
Err(err) => {
trace!(target: "sub-libp2p", "Error while opening connection to
{:?} with proto {:?} => {:?}", node_id, proto_id, err);
},
}
}
@@ -1296,6 +1294,6 @@ mod tests {
fn builds_and_finishes_in_finite_time() {
// Checks that merely starting the network doesn't end up in an infinite loop.
let service = NetworkService::new(Default::default(), None).unwrap();
service.start().map_err(|(err, _)| err).unwrap();
service.start(vec![]).map_err(|(err, _)| err).unwrap();
}
}
@@ -47,12 +47,6 @@ impl TestProtocol {
drop_session: drop_session,
}
}
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService, drop_session: bool) -> Arc<TestProtocol> {
let handler = Arc::new(TestProtocol::new(drop_session));
service.register_protocol(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)]);
handler
}
pub fn got_packet(&self) -> bool {
self.packet.lock()[..] == b"hello"[..]
@@ -100,17 +94,16 @@ impl NetworkProtocolHandler for TestProtocol {
#[test]
fn net_service() {
let service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
service.start().unwrap();
service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)]);
service.start(vec![(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)])]).unwrap();
}
#[test]
fn net_start_stop() {
let config = NetworkConfiguration::new_local();
let service = NetworkService::new(config, None).unwrap();
service.start().unwrap();
service.start(vec![]).unwrap();
service.stop();
service.start().unwrap();
service.start(vec![]).unwrap();
}
#[test]
@@ -120,14 +113,14 @@ fn net_disconnect() {
let mut config1 = NetworkConfiguration::new_local();
config1.use_secret = Some(key1.secret().clone());
config1.boot_nodes = vec![ ];
let mut service1 = NetworkService::new(config1, None).unwrap();
service1.start().unwrap();
let handler1 = TestProtocol::register(&mut service1, false);
let service1 = NetworkService::new(config1, None).unwrap();
let handler1 = Arc::new(TestProtocol::new(false));
service1.start(vec![(handler1.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
let mut config2 = NetworkConfiguration::new_local();
config2.boot_nodes = vec![ service1.external_url().unwrap() ];
let mut service2 = NetworkService::new(config2, None).unwrap();
service2.start().unwrap();
let handler2 = TestProtocol::register(&mut service2, true);
let service2 = NetworkService::new(config2, None).unwrap();
let handler2 = Arc::new(TestProtocol::new(true));
service2.start(vec![(handler2.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
while !(handler1.got_disconnect() && handler2.got_disconnect()) {
thread::sleep(Duration::from_millis(50));
}
@@ -138,9 +131,9 @@ fn net_disconnect() {
#[test]
fn net_timeout() {
let config = NetworkConfiguration::new_local();
let mut service = NetworkService::new(config, None).unwrap();
service.start().unwrap();
let handler = TestProtocol::register(&mut service, false);
let service = NetworkService::new(config, None).unwrap();
let handler = Arc::new(TestProtocol::new(false));
service.start(vec![(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
while !handler.got_timeout() {
thread::sleep(Duration::from_millis(50));
}
+3 -2
View File
@@ -203,13 +203,14 @@ impl<B: BlockT + 'static, S: Specialization<B>> Service<B, S> {
}
fn start(&self) {
match self.network.start().map_err(|e| e.0.into()) {
let versions = [(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)];
let protocols = vec![(self.handler.clone() as Arc<_>, self.protocol_id, &versions[..])];
match self.network.start(protocols).map_err(|e| e.0.into()) {
Err(ErrorKind::Io(ref e)) if e.kind() == io::ErrorKind::AddrInUse =>
warn!("Network port is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option."),
Err(err) => warn!("Error starting network: {}", err),
_ => {},
};
self.network.register_protocol(self.handler.clone(), self.protocol_id, &[(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)]);
}
fn stop(&self) {