diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 016a884fe9..8fbc6dba3a 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -49,7 +49,7 @@ use parking_lot::Mutex; use polkadot_consensus::{Statement, SignedStatement, GenericStatement}; use polkadot_primitives::{AccountId, Block, SessionKey, Hash, Header}; use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt, Collation}; -use substrate_network::{PeerId, RequestId, Context, Severity}; +use substrate_network::{NodeIndex, RequestId, Context, Severity}; use substrate_network::consensus_gossip::ConsensusGossip; use substrate_network::{message, generic_message}; use substrate_network::specialization::Specialization; @@ -244,7 +244,7 @@ impl Decode for Message { } } -fn send_polkadot_message(ctx: &mut Context, to: PeerId, message: Message) { +fn send_polkadot_message(ctx: &mut Context, to: NodeIndex, message: Message) { trace!(target: "p_net", "Sending polkadot message to {}: {:?}", to, message); let encoded = message.encode(); ctx.send_message(to, generic_message::Message::ChainSpecific(encoded)) @@ -252,14 +252,14 @@ fn send_polkadot_message(ctx: &mut Context, to: PeerId, message: Message) /// Polkadot protocol attachment for substrate. pub struct PolkadotProtocol { - peers: HashMap, + peers: HashMap, collating_for: Option<(AccountId, ParaId)>, consensus_gossip: ConsensusGossip, collators: CollatorPool, - validators: HashMap, + validators: HashMap, local_collations: LocalCollations, live_consensus: Option, - in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>, + in_flight: HashMap<(RequestId, NodeIndex), BlockDataRequest>, pending: Vec, next_req_id: u64, } @@ -352,17 +352,17 @@ impl PolkadotProtocol { .map(|(_, id)| id); // dispatch to peer - if let Some(peer_id) = next_peer { + if let Some(who) = next_peer { let req_id = self.next_req_id; self.next_req_id += 1; send_polkadot_message( ctx, - peer_id, + who, Message::RequestBlockData(req_id, pending.candidate_hash) ); - self.in_flight.insert((req_id, peer_id), pending); + self.in_flight.insert((req_id, who), pending); continue; } @@ -374,36 +374,36 @@ impl PolkadotProtocol { self.pending = new_pending; } - fn on_polkadot_message(&mut self, ctx: &mut Context, peer_id: PeerId, raw: Vec, msg: Message) { - trace!(target: "p_net", "Polkadot message from {}: {:?}", peer_id, msg); + fn on_polkadot_message(&mut self, ctx: &mut Context, who: NodeIndex, raw: Vec, msg: Message) { + trace!(target: "p_net", "Polkadot message from {}: {:?}", who, msg); match msg { Message::Statement(parent_hash, _statement) => - self.consensus_gossip.on_chain_specific(ctx, peer_id, raw, parent_hash), - Message::SessionKey(key) => self.on_session_key(ctx, peer_id, key), + self.consensus_gossip.on_chain_specific(ctx, who, raw, parent_hash), + Message::SessionKey(key) => self.on_session_key(ctx, who, key), Message::RequestBlockData(req_id, hash) => { let block_data = self.live_consensus.as_ref() .and_then(|c| c.block_data(&hash)); - send_polkadot_message(ctx, peer_id, Message::BlockData(req_id, block_data)); + send_polkadot_message(ctx, who, Message::BlockData(req_id, block_data)); } - Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data), - Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation), - Message::CollatorRole(role) => self.on_new_role(ctx, peer_id, role), + Message::BlockData(req_id, data) => self.on_block_data(ctx, who, req_id, data), + Message::Collation(relay_parent, collation) => self.on_collation(ctx, who, relay_parent, collation), + Message::CollatorRole(role) => self.on_new_role(ctx, who, role), } } - fn on_session_key(&mut self, ctx: &mut Context, peer_id: PeerId, key: SessionKey) { + fn on_session_key(&mut self, ctx: &mut Context, who: NodeIndex, key: SessionKey) { { - let info = match self.peers.get_mut(&peer_id) { + let info = match self.peers.get_mut(&who) { Some(peer) => peer, None => { - trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", peer_id); + trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who); return } }; if !info.claimed_validator { - ctx.report_peer(peer_id, Severity::Bad("Session key broadcasted without setting authority role")); + ctx.report_peer(who, Severity::Bad("Session key broadcasted without setting authority role")); return; } @@ -413,20 +413,20 @@ impl PolkadotProtocol { for (relay_parent, collation) in self.local_collations.fresh_key(&old_key, &key) { send_polkadot_message( ctx, - peer_id, + who, Message::Collation(relay_parent, collation), ) } } - self.validators.insert(key, peer_id); + self.validators.insert(key, who); } self.dispatch_pending_requests(ctx); } - fn on_block_data(&mut self, ctx: &mut Context, peer_id: PeerId, req_id: RequestId, data: Option) { - match self.in_flight.remove(&(req_id, peer_id)) { + fn on_block_data(&mut self, ctx: &mut Context, who: NodeIndex, req_id: RequestId, data: Option) { + match self.in_flight.remove(&(req_id, who)) { Some(req) => { if let Some(data) = data { if data.hash() == req.block_data_hash { @@ -438,29 +438,29 @@ impl PolkadotProtocol { self.pending.push(req); self.dispatch_pending_requests(ctx); } - None => ctx.report_peer(peer_id, Severity::Bad("Unexpected block data response")), + None => ctx.report_peer(who, Severity::Bad("Unexpected block data response")), } } // when a validator sends us (a collator) a new role. - fn on_new_role(&mut self, ctx: &mut Context, peer_id: PeerId, role: Role) { - let info = match self.peers.get(&peer_id) { + fn on_new_role(&mut self, ctx: &mut Context, who: NodeIndex, role: Role) { + let info = match self.peers.get(&who) { Some(peer) => peer, None => { - trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", peer_id); + trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who); return } }; match info.validator_key { None => ctx.report_peer( - peer_id, + who, Severity::Bad("Sent collator role without registering first as validator"), ), Some(key) => for (relay_parent, collation) in self.local_collations.note_validator_role(key, role) { send_polkadot_message( ctx, - peer_id, + who, Message::Collation(relay_parent, collation), ) }, @@ -473,7 +473,7 @@ impl Specialization for PolkadotProtocol { Status { collating_for: self.collating_for.clone() }.encode() } - fn on_connect(&mut self, ctx: &mut Context, peer_id: PeerId, status: FullStatus) { + fn on_connect(&mut self, ctx: &mut Context, who: NodeIndex, status: FullStatus) { let local_status = match Status::decode(&mut &status.chain_status[..]) { Some(status) => status, None => { @@ -483,14 +483,14 @@ impl Specialization for PolkadotProtocol { if let Some((ref acc_id, ref para_id)) = local_status.collating_for { if self.collator_peer_id(acc_id.clone()).is_some() { - ctx.report_peer(peer_id, Severity::Useless("Unknown Polkadot-specific reason")); + ctx.report_peer(who, Severity::Useless("Unknown Polkadot-specific reason")); return } let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone()); send_polkadot_message( ctx, - peer_id, + who, Message::CollatorRole(collator_role), ); } @@ -498,17 +498,17 @@ impl Specialization for PolkadotProtocol { let validator = status.roles.contains(substrate_network::Roles::AUTHORITY); let send_key = validator || local_status.collating_for.is_some(); - self.peers.insert(peer_id, PeerInfo { + self.peers.insert(who, PeerInfo { collating_for: local_status.collating_for, validator_key: None, claimed_validator: validator, }); - self.consensus_gossip.new_peer(ctx, peer_id, status.roles); + self.consensus_gossip.new_peer(ctx, who, status.roles); if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) { send_polkadot_message( ctx, - peer_id, + who, Message::SessionKey(consensus.local_session_key) ); } @@ -516,8 +516,8 @@ impl Specialization for PolkadotProtocol { self.dispatch_pending_requests(ctx); } - fn on_disconnect(&mut self, ctx: &mut Context, peer_id: PeerId) { - if let Some(info) = self.peers.remove(&peer_id) { + fn on_disconnect(&mut self, ctx: &mut Context, who: NodeIndex) { + if let Some(info) = self.peers.remove(&who) { if let Some((acc_id, _)) = info.collating_for { let new_primary = self.collators.on_disconnect(acc_id) .and_then(|new_primary| self.collator_peer_id(new_primary)); @@ -539,7 +539,7 @@ impl Specialization for PolkadotProtocol { { let pending = &mut self.pending; self.in_flight.retain(|&(_, ref peer), val| { - let retain = peer != &peer_id; + let retain = peer != &who; if !retain { let (sender, _) = oneshot::channel(); pending.push(::std::mem::replace(val, BlockDataRequest { @@ -554,24 +554,24 @@ impl Specialization for PolkadotProtocol { retain }); } - self.consensus_gossip.peer_disconnected(ctx, peer_id); + self.consensus_gossip.peer_disconnected(ctx, who); self.dispatch_pending_requests(ctx); } } - fn on_message(&mut self, ctx: &mut Context, peer_id: PeerId, message: message::Message) { + fn on_message(&mut self, ctx: &mut Context, who: NodeIndex, message: message::Message) { match message { generic_message::Message::BftMessage(msg) => { - trace!(target: "p_net", "Polkadot BFT message from {}: {:?}", peer_id, msg); + trace!(target: "p_net", "Polkadot BFT message from {}: {:?}", who, msg); // TODO: check signature here? what if relevant block is unknown? - self.consensus_gossip.on_bft_message(ctx, peer_id, msg) + self.consensus_gossip.on_bft_message(ctx, who, msg) } generic_message::Message::ChainSpecific(raw) => { match Message::decode(&mut raw.as_slice()) { - Some(msg) => self.on_polkadot_message(ctx, peer_id, raw, msg), + Some(msg) => self.on_polkadot_message(ctx, who, raw, msg), None => { - trace!(target: "p_net", "Bad message from {}", peer_id); - ctx.report_peer(peer_id, Severity::Bad("Invalid polkadot protocol message format")); + trace!(target: "p_net", "Bad message from {}", who); + ctx.report_peer(who, Severity::Bad("Invalid polkadot protocol message format")); } } } @@ -611,7 +611,7 @@ impl Specialization for PolkadotProtocol { impl PolkadotProtocol { // we received a collation from a peer - fn on_collation(&mut self, ctx: &mut Context, from: PeerId, relay_parent: Hash, collation: Collation) { + fn on_collation(&mut self, ctx: &mut Context, from: NodeIndex, relay_parent: Hash, collation: Collation) { let collation_para = collation.receipt.parachain_index; let collated_acc = collation.receipt.collator; @@ -638,7 +638,7 @@ impl PolkadotProtocol { } // get connected peer with given account ID for collation. - fn collator_peer_id(&self, account_id: AccountId) -> Option { + fn collator_peer_id(&self, account_id: AccountId) -> Option { let check_info = |info: &PeerInfo| info .collating_for .as_ref() @@ -647,14 +647,14 @@ impl PolkadotProtocol { self.peers .iter() .filter(|&(_, info)| check_info(info)) - .map(|(peer_id, _)| *peer_id) + .map(|(who, _)| *who) .next() } // disconnect a collator by account-id. fn disconnect_bad_collator(&self, ctx: &mut Context, account_id: AccountId) { - if let Some(peer_id) = self.collator_peer_id(account_id) { - ctx.report_peer(peer_id, Severity::Bad("Consensus layer determined the given collator misbehaved")) + if let Some(who) = self.collator_peer_id(account_id) { + ctx.report_peer(who, Severity::Bad("Consensus layer determined the given collator misbehaved")) } } } @@ -670,9 +670,9 @@ impl PolkadotProtocol { ) { for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { match self.validators.get(&primary) { - Some(peer_id) => send_polkadot_message( + Some(who) => send_polkadot_message( ctx, - *peer_id, + *who, Message::Collation(relay_parent, cloned_collation), ), None => diff --git a/polkadot/network/src/tests.rs b/polkadot/network/src/tests.rs index 6d7fde3fe5..356d272e3f 100644 --- a/polkadot/network/src/tests.rs +++ b/polkadot/network/src/tests.rs @@ -24,16 +24,16 @@ use polkadot_primitives::{Block, Hash, SessionKey}; use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData}; use substrate_primitives::H512; use codec::Encode; -use substrate_network::{Severity, PeerId, PeerInfo, ClientHandle, Context, Roles, message::Message as SubstrateMessage, specialization::Specialization, generic_message::Message as GenericMessage}; +use substrate_network::{Severity, NodeIndex, PeerInfo, ClientHandle, Context, Roles, message::Message as SubstrateMessage, specialization::Specialization, generic_message::Message as GenericMessage}; use std::sync::Arc; use futures::Future; #[derive(Default)] struct TestContext { - disabled: Vec, - disconnected: Vec, - messages: Vec<(PeerId, SubstrateMessage)>, + disabled: Vec, + disconnected: Vec, + messages: Vec<(NodeIndex, SubstrateMessage)>, } impl Context for TestContext { @@ -41,24 +41,24 @@ impl Context for TestContext { unimplemented!() } - fn report_peer(&mut self, peer: PeerId, reason: Severity) { + fn report_peer(&mut self, peer: NodeIndex, reason: Severity) { match reason { Severity::Bad(_) => self.disabled.push(peer), _ => self.disconnected.push(peer), } } - fn peer_info(&self, _peer: PeerId) -> Option> { + fn peer_info(&self, _peer: NodeIndex) -> Option> { unimplemented!() } - fn send_message(&mut self, peer_id: PeerId, data: SubstrateMessage) { - self.messages.push((peer_id, data)) + fn send_message(&mut self, who: NodeIndex, data: SubstrateMessage) { + self.messages.push((who, data)) } } impl TestContext { - fn has_message(&self, to: PeerId, message: Message) -> bool { + fn has_message(&self, to: NodeIndex, message: Message) -> bool { use substrate_network::generic_message::Message as GenericMessage; let encoded = message.encode(); @@ -91,7 +91,7 @@ fn make_consensus(parent_hash: Hash, local_key: SessionKey) -> (CurrentConsensus (c, knowledge) } -fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: PeerId, message: Message) { +fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: NodeIndex, message: Message) { let encoded = message.encode(); protocol.on_message(ctx, from, GenericMessage::ChainSpecific(encoded)); } @@ -209,19 +209,19 @@ fn fetches_from_those_with_knowledge() { fn remove_bad_collator() { let mut protocol = PolkadotProtocol::new(None); - let peer_id = 1; + let who = 1; let account_id = [2; 32].into(); let status = Status { collating_for: Some((account_id, 5.into())) }; { let mut ctx = TestContext::default(); - protocol.on_connect(&mut ctx, peer_id, make_status(&status, Roles::NONE)); + protocol.on_connect(&mut ctx, who, make_status(&status, Roles::NONE)); } { let mut ctx = TestContext::default(); protocol.disconnect_bad_collator(&mut ctx, account_id); - assert!(ctx.disabled.contains(&peer_id)); + assert!(ctx.disabled.contains(&who)); } }