update to latest substrate - protocol API update (#130)

* update to latest substrate - protocol API update

* nits

* remove the wait on a future

* use std mpsc for oneshot
This commit is contained in:
Gregory Terzian
2019-02-10 00:52:57 +08:00
committed by Gav Wood
parent 9fb22ce28d
commit cdb2faf155
5 changed files with 358 additions and 289 deletions
+272 -242
View File
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -307,7 +307,7 @@ impl<P, E> Worker for CollationNode<P, E> where
parachain_context, parachain_context,
key, key,
).map(move |collation| { ).map(move |collation| {
network.with_spec(|spec, ctx| spec.add_local_collation( network.with_spec(move |spec, ctx| spec.add_local_collation(
ctx, ctx,
relay_parent, relay_parent,
targets, targets,
+43 -17
View File
@@ -150,26 +150,35 @@ impl<P, E> Network for ConsensusNetwork<P,E> where
let attestation_topic = table_router.gossip_topic(); let attestation_topic = table_router.gossip_topic();
let exit = self.exit.clone(); let exit = self.exit.clone();
let (tx, rx) = std::sync::mpsc::channel();
self.network.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(attestation_topic);
let _ = tx.send(inner_rx);
});
let table_router_clone = table_router.clone();
let executor = task_executor.clone();
// spin up a task in the background that processes all incoming statements // spin up a task in the background that processes all incoming statements
// TODO: propagate statements on a timer? // TODO: propagate statements on a timer?
let inner_stream = self.network.consensus_gossip().write().messages_for(attestation_topic); self.network
let process_task = self.network .with_spec(move |spec, ctx| {
.with_spec(|spec, ctx| {
spec.new_consensus(ctx, parent_hash, CurrentConsensus { spec.new_consensus(ctx, parent_hash, CurrentConsensus {
knowledge, knowledge,
local_session_key, local_session_key,
}); });
let inner_stream = match rx.try_recv() {
MessageProcessTask { Ok(inner_stream) => inner_stream,
_ => unreachable!("1. The with_gossip closure executed first, 2. the reply should be available")
};
let process_task = MessageProcessTask {
inner_stream, inner_stream,
parent_hash, parent_hash,
table_router: table_router.clone(), table_router: table_router_clone,
exit, exit,
} };
}) executor.spawn(process_task);
.then(|_| Ok(())); });
task_executor.spawn(process_task);
table_router table_router
} }
@@ -180,14 +189,27 @@ impl<P, E> Network for ConsensusNetwork<P,E> where
pub struct NetworkDown; pub struct NetworkDown;
/// A future that resolves when a collation is received. /// A future that resolves when a collation is received.
pub struct AwaitingCollation(::futures::sync::oneshot::Receiver<Collation>); pub struct AwaitingCollation {
outer: ::futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver<Collation>>,
inner: Option<::futures::sync::oneshot::Receiver<Collation>>
}
impl Future for AwaitingCollation { impl Future for AwaitingCollation {
type Item = Collation; type Item = Collation;
type Error = NetworkDown; type Error = NetworkDown;
fn poll(&mut self) -> Poll<Collation, NetworkDown> { fn poll(&mut self) -> Poll<Collation, NetworkDown> {
self.0.poll().map_err(|_| NetworkDown) if let Some(ref mut inner) = self.inner {
return inner
.poll()
.map_err(|_| NetworkDown)
}
if let Ok(futures::Async::Ready(mut inner)) = self.outer.poll() {
let poll_result = inner.poll();
self.inner = Some(inner);
return poll_result.map_err(|_| NetworkDown)
}
Ok(futures::Async::NotReady)
} }
} }
@@ -198,13 +220,17 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E: Clone> Collators for Conse
type Collation = AwaitingCollation; type Collation = AwaitingCollation;
fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation { fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation {
AwaitingCollation( let (tx, rx) = ::futures::sync::oneshot::channel();
self.network.with_spec(|spec, _| spec.await_collation(relay_parent, parachain)) self.network.with_spec(move |spec, _| {
) let collation = spec.await_collation(relay_parent, parachain);
let _ = tx.send(collation);
});
AwaitingCollation{outer: rx, inner: None}
} }
fn note_bad_collator(&self, collator: AccountId) { fn note_bad_collator(&self, collator: AccountId) {
self.network.with_spec(|spec, ctx| spec.disconnect_bad_collator(ctx, collator)); self.network.with_spec(move |spec, ctx| spec.disconnect_bad_collator(ctx, collator));
} }
} }
+10 -10
View File
@@ -69,7 +69,7 @@ pub const DOT_PROTOCOL_ID: ::substrate_network::ProtocolId = *b"dot";
type FullStatus = GenericFullStatus<Block>; type FullStatus = GenericFullStatus<Block>;
/// Specialization of the network service for the polkadot protocol. /// Specialization of the network service for the polkadot protocol.
pub type NetworkService = ::substrate_network::Service<Block, PolkadotProtocol, Hash>; pub type NetworkService = ::substrate_network::Service<Block, PolkadotProtocol>;
/// Status of a Polkadot node. /// Status of a Polkadot node.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
@@ -311,7 +311,7 @@ impl PolkadotProtocol {
}; };
if !info.claimed_validator { if !info.claimed_validator {
ctx.report_peer(who, Severity::Bad("Session key broadcasted without setting authority role")); ctx.report_peer(who, Severity::Bad("Session key broadcasted without setting authority role".to_string()));
return; return;
} }
@@ -354,7 +354,7 @@ impl PolkadotProtocol {
self.pending.push(req); self.pending.push(req);
self.dispatch_pending_requests(ctx); self.dispatch_pending_requests(ctx);
} }
None => ctx.report_peer(who, Severity::Bad("Unexpected block data response")), None => ctx.report_peer(who, Severity::Bad("Unexpected block data response".to_string())),
} }
} }
@@ -373,7 +373,7 @@ impl PolkadotProtocol {
if info.validator_keys.as_slice().is_empty() { if info.validator_keys.as_slice().is_empty() {
ctx.report_peer( ctx.report_peer(
who, who,
Severity::Bad("Sent collator role without registering first as validator"), Severity::Bad("Sent collator role without registering first as validator".to_string()),
); );
} else { } else {
// update role for all saved session keys for this validator. // update role for all saved session keys for this validator.
@@ -419,7 +419,7 @@ impl Specialization<Block> for PolkadotProtocol {
if let Some((ref acc_id, ref para_id)) = local_status.collating_for { if let Some((ref acc_id, ref para_id)) = local_status.collating_for {
if self.collator_peer(acc_id.clone()).is_some() { if self.collator_peer(acc_id.clone()).is_some() {
ctx.report_peer(who, Severity::Useless("Unknown Polkadot-specific reason")); ctx.report_peer(who, Severity::Useless("Unknown Polkadot-specific reason".to_string()));
return return
} }
@@ -496,7 +496,7 @@ impl Specialization<Block> for PolkadotProtocol {
Some(msg) => self.on_polkadot_message(ctx, who, msg), Some(msg) => self.on_polkadot_message(ctx, who, msg),
None => { None => {
trace!(target: "p_net", "Bad message from {}", who); trace!(target: "p_net", "Bad message from {}", who);
ctx.report_peer(who, Severity::Bad("Invalid polkadot protocol message format")); ctx.report_peer(who, Severity::Bad("Invalid polkadot protocol message format".to_string()));
*message = Some(generic_message::Message::ChainSpecific(raw)); *message = Some(generic_message::Message::ChainSpecific(raw));
} }
} }
@@ -540,16 +540,16 @@ impl PolkadotProtocol {
let collated_acc = collation.receipt.collator; let collated_acc = collation.receipt.collator;
match self.peers.get(&from) { match self.peers.get(&from) {
None => ctx.report_peer(from, Severity::Useless("Unknown Polkadot specific reason")), None => ctx.report_peer(from, Severity::Useless("Unknown Polkadot specific reason".to_string())),
Some(peer_info) => match peer_info.collating_for { Some(peer_info) => match peer_info.collating_for {
None => ctx.report_peer(from, Severity::Bad("Sent collation without registering collator intent")), None => ctx.report_peer(from, Severity::Bad("Sent collation without registering collator intent".to_string())),
Some((ref acc_id, ref para_id)) => { Some((ref acc_id, ref para_id)) => {
let structurally_valid = para_id == &collation_para && acc_id == &collated_acc; let structurally_valid = para_id == &collation_para && acc_id == &collated_acc;
if structurally_valid && collation.receipt.check_signature().is_ok() { if structurally_valid && collation.receipt.check_signature().is_ok() {
debug!(target: "p_net", "Received collation for parachain {:?} from peer {}", para_id, from); debug!(target: "p_net", "Received collation for parachain {:?} from peer {}", para_id, from);
self.collators.on_collation(acc_id.clone(), relay_parent, collation) self.collators.on_collation(acc_id.clone(), relay_parent, collation)
} else { } else {
ctx.report_peer(from, Severity::Bad("Sent malformed collation")) ctx.report_peer(from, Severity::Bad("Sent malformed collation".to_string()))
}; };
} }
}, },
@@ -580,7 +580,7 @@ impl PolkadotProtocol {
// disconnect a collator by account-id. // disconnect a collator by account-id.
fn disconnect_bad_collator(&mut self, ctx: &mut Context<Block>, account_id: AccountId) { fn disconnect_bad_collator(&mut self, ctx: &mut Context<Block>, account_id: AccountId) {
if let Some((who, _)) = self.collator_peer(account_id) { if let Some((who, _)) = self.collator_peer(account_id) {
ctx.report_peer(who, Severity::Bad("Consensus layer determined the given collator misbehaved")) ctx.report_peer(who, Severity::Bad("Consensus layer determined the given collator misbehaved".to_string()))
} }
} }
} }
+32 -19
View File
@@ -170,14 +170,10 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P>
produced.extrinsic, produced.extrinsic,
); );
let mut gossip = network.consensus_gossip().write();
// propagate the statement. // propagate the statement.
// consider something more targeted than gossip in the future. // consider something more targeted than gossip in the future.
let signed = table.sign_and_import(produced.validity); let signed = table.sign_and_import(produced.validity);
network.with_spec(|_, ctx| network.gossip_consensus_message(attestation_topic, signed.encode(), false);
gossip.multicast(ctx, attestation_topic, signed.encode(), false)
);
}) })
.map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e)) .map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e))
} }
@@ -195,29 +191,32 @@ impl<P: ProvideRuntimeApi + Send> TableRouter for Router<P>
let candidate = self.table.sign_and_import(GenericStatement::Candidate(receipt)); let candidate = self.table.sign_and_import(GenericStatement::Candidate(receipt));
self.knowledge.lock().note_candidate(hash, Some(block_data), Some(extrinsic)); self.knowledge.lock().note_candidate(hash, Some(block_data), Some(extrinsic));
let mut gossip = self.network.consensus_gossip().write(); self.network.gossip_consensus_message(self.attestation_topic, candidate.encode(), false);
self.network.with_spec(|_spec, ctx| {
gossip.multicast(ctx, self.attestation_topic, candidate.encode(), false);
});
} }
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> BlockDataReceiver { fn fetch_block_data(&self, candidate: &CandidateReceipt) -> BlockDataReceiver {
let parent_hash = self.parent_hash; let parent_hash = self.parent_hash.clone();
let rx = self.network.with_spec(|spec, ctx| { spec.fetch_block_data(ctx, candidate, parent_hash) }); let candidate = candidate.clone();
BlockDataReceiver { inner: rx } let (tx, rx) = ::futures::sync::oneshot::channel();
self.network.with_spec(move |spec, ctx| {
let inner_rx = spec.fetch_block_data(ctx, &candidate, parent_hash);
let _ = tx.send(inner_rx);
});
BlockDataReceiver { outer: rx, inner: None }
} }
} }
impl<P> Drop for Router<P> { impl<P> Drop for Router<P> {
fn drop(&mut self) { fn drop(&mut self) {
let parent_hash = &self.parent_hash; let parent_hash = self.parent_hash.clone();
self.network.with_spec(|spec, _| spec.remove_consensus(parent_hash)); self.network.with_spec(move |spec, _| spec.remove_consensus(&parent_hash));
} }
} }
/// Receiver for block data. /// Receiver for block data.
pub struct BlockDataReceiver { pub struct BlockDataReceiver {
inner: ::futures::sync::oneshot::Receiver<BlockData>, outer: ::futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver<BlockData>>,
inner: Option<::futures::sync::oneshot::Receiver<BlockData>>
} }
impl Future for BlockDataReceiver { impl Future for BlockDataReceiver {
@@ -225,10 +224,24 @@ impl Future for BlockDataReceiver {
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<BlockData, io::Error> { fn poll(&mut self) -> Poll<BlockData, io::Error> {
self.inner.poll().map_err(|_| io::Error::new( if let Some(ref mut inner) = self.inner {
io::ErrorKind::Other, return inner
"Sending end of channel hung up", .poll()
)) .map_err(|_| io::Error::new(
io::ErrorKind::Other,
"Sending end of channel hung up",
))
}
if let Ok(futures::Async::Ready(mut inner)) = self.outer.poll() {
let poll_result = inner.poll();
self.inner = Some(inner);
return poll_result
.map_err(|_| io::Error::new(
io::ErrorKind::Other,
"Sending end of channel hung up",
))
}
Ok(futures::Async::NotReady)
} }
} }