overseer: send_msg should not return an error (#1995)

* send_message should not return an error

* Apply suggestions from code review

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* s/send_logging_error/send_and_log_error

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
This commit is contained in:
Andronik Ordian
2020-11-23 12:42:14 +01:00
committed by GitHub
parent 8cdb063f72
commit 69b103b1d5
21 changed files with 330 additions and 462 deletions
@@ -95,10 +95,7 @@ impl CollationGenerationSubsystem {
},
msg = receiver.next().fuse() => {
if let Some(msg) = msg {
if let Err(err) = ctx.send_message(msg).await {
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to forward message to overseer");
break;
}
ctx.send_message(msg).await;
}
},
}
+2 -2
View File
@@ -690,7 +690,7 @@ where
RuntimeApiRequest::CandidateEvents(tx),
));
ctx.send_message(msg.into()).await?;
ctx.send_message(msg.into()).await;
Ok(rx.await??)
}
@@ -858,7 +858,7 @@ where
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::ChainApi(ChainApiMessage::BlockNumber(block_hash, tx))).await?;
ctx.send_message(AllMessages::ChainApi(ChainApiMessage::BlockNumber(block_hash, tx))).await;
Ok(rx.await??.map(|number| number).unwrap_or_default())
}
@@ -171,7 +171,7 @@ async fn runtime_api_request<T>(
relay_parent,
request,
))
).await?;
).await;
receiver.await.map_err(Into::into)
}
+2 -2
View File
@@ -143,13 +143,13 @@ where
let (sender, receiver) = futures::channel::oneshot::channel();
overseer.wait_for_activation(parent_header_hash, sender).await?;
overseer.wait_for_activation(parent_header_hash, sender).await;
receiver.await.map_err(|_| Error::ClosedChannelAwaitingActivation)??;
let (sender, receiver) = futures::channel::oneshot::channel();
overseer.send_msg(AllMessages::Provisioner(
ProvisionerMessage::RequestInherentData(parent_header_hash, sender),
)).await?;
)).await;
let mut timeout = futures_timer::Delay::new(PROPOSE_TIMEOUT).fuse();
@@ -56,62 +56,40 @@ const LOG_TARGET: &'static str = "availability_distribution";
#[derive(Debug, Error)]
enum Error {
#[error("Sending PendingAvailability query failed")]
QueryPendingAvailabilitySendQuery(#[source] SubsystemError),
#[error("Response channel to obtain PendingAvailability failed")]
QueryPendingAvailabilityResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain PendingAvailability failed")]
QueryPendingAvailability(#[source] RuntimeApiError),
#[error("Sending StoreChunk query failed")]
StoreChunkSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain StoreChunk failed")]
StoreChunkResponseChannel(#[source] oneshot::Canceled),
#[error("Sending QueryChunk query failed")]
QueryChunkSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryChunk failed")]
QueryChunkResponseChannel(#[source] oneshot::Canceled),
#[error("Sending QueryAncestors query failed")]
QueryAncestorsSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryAncestors failed")]
QueryAncestorsResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QueryAncestors failed")]
QueryAncestors(#[source] ChainApiError),
#[error("Sending QuerySession query failed")]
QuerySessionSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QuerySession failed")]
QuerySessionResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QuerySession failed")]
QuerySession(#[source] RuntimeApiError),
#[error("Sending QueryValidators query failed")]
QueryValidatorsSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryValidators failed")]
QueryValidatorsResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QueryValidators failed")]
QueryValidators(#[source] RuntimeApiError),
#[error("Sending AvailabilityCores query failed")]
AvailabilityCoresSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain AvailabilityCores failed")]
AvailabilityCoresResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain AvailabilityCores failed")]
AvailabilityCores(#[source] RuntimeApiError),
#[error("Sending AvailabilityCores query failed")]
QueryAvailabilitySendQuery(#[source] SubsystemError),
#[error("Response channel to obtain AvailabilityCores failed")]
QueryAvailabilityResponseChannel(#[source] oneshot::Canceled),
#[error("Sending out a peer report message")]
ReportPeerMessageSend(#[source] SubsystemError),
#[error("Sending a gossip message")]
TrackedGossipMessage(#[source] SubsystemError),
#[error("Receive channel closed")]
IncomingMessageChannel(#[source] SubsystemError),
}
@@ -290,7 +268,7 @@ impl ProtocolState {
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn remove_relay_parent(&mut self, relay_parent: &Hash) -> Result<()> {
fn remove_relay_parent(&mut self, relay_parent: &Hash) {
// we might be ancestor of some other relay_parent
if let Some(ref mut descendants) = self.ancestry.get_mut(relay_parent) {
// if we were the last user, and it is
@@ -324,7 +302,6 @@ impl ProtocolState {
}
}
}
Ok(())
}
}
@@ -351,7 +328,7 @@ where
state.peer_views.remove(&peerid);
}
NetworkBridgeEvent::PeerViewChange(peerid, view) => {
handle_peer_view_change(ctx, state, peerid, view, metrics).await?;
handle_peer_view_change(ctx, state, peerid, view, metrics).await;
}
NetworkBridgeEvent::OurViewChange(view) => {
handle_our_view_change(ctx, keystore, state, view, metrics).await?;
@@ -472,14 +449,14 @@ where
};
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message)
.await?;
.await;
}
}
// cleanup the removed relay parents and their states
let removed = old_view.difference(&view).collect::<Vec<_>>();
for removed in removed {
state.remove_relay_parent(&removed)?;
state.remove_relay_parent(&removed);
}
Ok(())
}
@@ -491,7 +468,7 @@ async fn send_tracked_gossip_message_to_peers<Context>(
metrics: &Metrics,
peers: Vec<PeerId>,
message: AvailabilityGossipMessage,
) -> Result<()>
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
@@ -506,7 +483,7 @@ async fn send_tracked_gossip_messages_to_peer<Context>(
metrics: &Metrics,
peer: PeerId,
message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
) -> Result<()>
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
@@ -521,12 +498,12 @@ async fn send_tracked_gossip_messages_to_peers<Context>(
metrics: &Metrics,
peers: Vec<PeerId>,
message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
) -> Result<()>
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
if peers.is_empty() {
return Ok(());
return;
}
for message in message_iter {
for peer in peers.iter() {
@@ -553,13 +530,10 @@ where
protocol_v1::ValidationProtocol::AvailabilityDistribution(wire_message),
),
))
.await
.map_err(|e| Error::TrackedGossipMessage(e))?;
.await;
metrics.on_chunk_distributed();
}
Ok(())
}
// Send the difference between two views which were not sent
@@ -571,7 +545,7 @@ async fn handle_peer_view_change<Context>(
origin: PeerId,
view: View,
metrics: &Metrics,
) -> Result<()>
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
@@ -616,9 +590,8 @@ where
.collect::<HashSet<_>>();
send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages)
.await?;
.await;
}
Ok(())
}
/// Obtain the first key which has a signing key.
@@ -662,7 +635,8 @@ where
let live_candidate = if let Some(live_candidate) = live_candidates.get(&message.candidate_hash) {
live_candidate
} else {
return modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await;
modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await;
return Ok(());
};
// check the merkle proof
@@ -674,12 +648,14 @@ where
) {
hash
} else {
return modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
return Ok(());
};
let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk);
if anticipated_hash != erasure_chunk_hash {
return modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
return Ok(());
}
// an internal unique identifier of this message
@@ -695,7 +671,8 @@ where
.entry(origin.clone())
.or_default();
if received_set.contains(&message_id) {
return modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
return Ok(());
} else {
received_set.insert(message_id.clone());
}
@@ -707,9 +684,9 @@ where
.insert(message_id.1, message.clone())
.is_some()
{
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await?;
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
} else {
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await?;
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await;
// save the chunk for our index
if let Some(validator_index) = per_candidate.validator_index {
@@ -762,7 +739,8 @@ where
.collect::<Vec<_>>();
// gossip that message to interested peers
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await;
Ok(())
}
/// The bitfield distribution subsystem.
@@ -947,8 +925,7 @@ where
relay_parent,
RuntimeApiRequest::AvailabilityCores(tx),
)))
.await
.map_err(|e| Error::AvailabilityCoresSendQuery(e))?;
.await;
let all_para_ids: Vec<_> = rx
.await
@@ -970,7 +947,7 @@ where
/// Modify the reputation of a peer based on its behavior.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep) -> Result<()>
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
@@ -982,9 +959,7 @@ where
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep),
))
.await
.map_err(|e| Error::ReportPeerMessageSend(e))
)).await;
}
/// Query the proof of validity for a particular candidate hash.
@@ -996,9 +971,8 @@ where
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx),
))
.await
.map_err(|e| Error::QueryAvailabilitySendQuery(e))?;
)).await;
rx.await
.map_err(|e| Error::QueryAvailabilityResponseChannel(e))
}
@@ -1015,9 +989,8 @@ where
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx),
))
.await
.map_err(|e| Error::QueryChunkSendQuery(e))?;
)).await;
rx.await.map_err(|e| Error::QueryChunkResponseChannel(e))
}
@@ -1033,17 +1006,15 @@ where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreChunk {
candidate_hash,
relay_parent,
validator_index,
chunk: erasure_chunk,
tx,
}
)).await
.map_err(|e| Error::StoreChunkSendQuery(e))?;
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreChunk {
candidate_hash,
relay_parent,
validator_index,
chunk: erasure_chunk,
tx,
}
)).await;
rx.await.map_err(|e| Error::StoreChunkResponseChannel(e))
}
@@ -1062,9 +1033,7 @@ where
ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::CandidatePendingAvailability(para, tx),
)))
.await
.map_err(|e| Error::QueryPendingAvailabilitySendQuery(e))?;
))).await;
rx.await
.map_err(|e| Error::QueryPendingAvailabilityResponseChannel(e))?
@@ -1087,8 +1056,7 @@ where
));
ctx.send_message(query_validators)
.await
.map_err(|e| Error::QueryValidatorsSendQuery(e))?;
.await;
rx.await
.map_err(|e| Error::QueryValidatorsResponseChannel(e))?
.map_err(|e| Error::QueryValidators(e))
@@ -1112,8 +1080,7 @@ where
});
ctx.send_message(query_ancestors)
.await
.map_err(|e| Error::QueryAncestorsSendQuery(e))?;
.await;
rx.await
.map_err(|e| Error::QueryAncestorsResponseChannel(e))?
.map_err(|e| Error::QueryAncestors(e))
@@ -1135,8 +1102,7 @@ where
));
ctx.send_message(query_session_idx_for_child)
.await
.map_err(|e| Error::QuerySessionSendQuery(e))?;
.await;
rx.await
.map_err(|e| Error::QuerySessionResponseChannel(e))?
.map_err(|e| Error::QuerySession(e))
@@ -163,24 +163,20 @@ impl BitfieldDistribution {
msg: BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability),
} => {
tracing::trace!(target: LOG_TARGET, "Processing DistributeBitfield");
if let Err(err) = handle_bitfield_distribution(
handle_bitfield_distribution(
&mut ctx,
&mut state,
&self.metrics,
hash,
signed_availability,
).await {
tracing::warn!(target: LOG_TARGET, err = ?err, "Failed to reply to `DistributeBitfield` message");
}
).await;
}
FromOverseer::Communication {
msg: BitfieldDistributionMessage::NetworkBridgeUpdateV1(event),
} => {
tracing::trace!(target: LOG_TARGET, "Processing NetworkMessage");
// a network message was received
if let Err(e) = handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await {
tracing::warn!(target: LOG_TARGET, err = ?e, "Failed to handle incoming network messages");
}
handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await;
}
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => {
let _timer = self.metrics.time_active_leaves_update();
@@ -234,7 +230,7 @@ async fn modify_reputation<Context>(
ctx: &mut Context,
peer: PeerId,
rep: ReputationChange,
) -> SubsystemResult<()>
)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
@@ -255,7 +251,7 @@ async fn handle_bitfield_distribution<Context>(
metrics: &Metrics,
relay_parent: Hash,
signed_availability: SignedAvailabilityBitfield,
) -> SubsystemResult<()>
)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
@@ -272,12 +268,12 @@ where
"Not supposed to work on relay parent related data",
);
return Ok(());
return;
};
let validator_set = &job_data.validator_set;
if validator_set.is_empty() {
tracing::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "validator set is empty");
return Ok(());
return;
}
let validator_index = signed_availability.validator_index() as usize;
@@ -285,7 +281,7 @@ where
validator.clone()
} else {
tracing::trace!(target: LOG_TARGET, "Could not find a validator for index {}", validator_index);
return Ok(());
return;
};
let peer_views = &mut state.peer_views;
@@ -294,11 +290,9 @@ where
signed_availability,
};
relay_message(ctx, job_data, peer_views, validator, msg).await?;
relay_message(ctx, job_data, peer_views, validator, msg).await;
metrics.on_own_bitfield_gossipped();
Ok(())
}
/// Distribute a given valid and signature checked bitfield message.
@@ -311,7 +305,7 @@ async fn relay_message<Context>(
peer_views: &mut HashMap<PeerId, View>,
validator: ValidatorId,
message: BitfieldGossipMessage,
) -> SubsystemResult<()>
)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
@@ -325,7 +319,7 @@ where
),
),
))
.await?;
.await;
let message_sent_to_peer = &mut (job_data.message_sent_to_peer);
@@ -361,9 +355,8 @@ where
message.into_validation_protocol(),
),
))
.await?;
.await;
}
Ok(())
}
/// Handle an incoming message from a peer.
@@ -374,13 +367,14 @@ async fn process_incoming_peer_message<Context>(
metrics: &Metrics,
origin: PeerId,
message: BitfieldGossipMessage,
) -> SubsystemResult<()>
)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
// we don't care about this, not part of our view.
if !state.view.contains(&message.relay_parent) {
return modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
return;
}
// Ignore anything the overseer did not tell this subsystem to work on.
@@ -388,7 +382,8 @@ where
let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
job_data
} else {
return modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
return;
};
let validator_set = &job_data.validator_set;
@@ -398,7 +393,8 @@ where
relay_parent = %message.relay_parent,
"Validator set is empty",
);
return modify_reputation(ctx, origin, COST_MISSING_PEER_SESSION_KEY).await;
modify_reputation(ctx, origin, COST_MISSING_PEER_SESSION_KEY).await;
return;
}
// Use the (untrusted) validator index provided by the signed payload
@@ -408,7 +404,8 @@ where
let validator = if let Some(validator) = validator_set.get(validator_index) {
validator.clone()
} else {
return modify_reputation(ctx, origin, COST_VALIDATOR_INDEX_INVALID).await;
modify_reputation(ctx, origin, COST_VALIDATOR_INDEX_INVALID).await;
return;
};
// Check if the peer already sent us a message for the validator denoted in the message earlier.
@@ -422,7 +419,8 @@ where
if !received_set.contains(&validator) {
received_set.insert(validator.clone());
} else {
return modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
return;
};
if message
@@ -440,12 +438,12 @@ where
validator_index,
"already received a message for validator",
);
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await?;
return Ok(());
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
return;
}
one_per_validator.insert(validator.clone(), message.clone());
relay_message(ctx, job_data, &mut state.peer_views, validator, message).await?;
relay_message(ctx, job_data, &mut state.peer_views, validator, message).await;
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await
} else {
@@ -461,7 +459,7 @@ async fn handle_network_msg<Context>(
state: &mut ProtocolState,
metrics: &Metrics,
bridge_message: NetworkBridgeEvent<protocol_v1::BitfieldDistributionMessage>,
) -> SubsystemResult<()>
)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
@@ -477,10 +475,10 @@ where
state.peer_views.remove(&peerid);
}
NetworkBridgeEvent::PeerViewChange(peerid, view) => {
handle_peer_view_change(ctx, state, peerid, view).await?;
handle_peer_view_change(ctx, state, peerid, view).await;
}
NetworkBridgeEvent::OurViewChange(view) => {
handle_our_view_change(state, view)?;
handle_our_view_change(state, view);
}
NetworkBridgeEvent::PeerMessage(remote, message) => {
match message {
@@ -490,17 +488,16 @@ where
relay_parent,
signed_availability: bitfield,
};
process_incoming_peer_message(ctx, state, metrics, remote, gossiped_bitfield).await?;
process_incoming_peer_message(ctx, state, metrics, remote, gossiped_bitfield).await;
}
}
}
}
Ok(())
}
/// Handle the changes necassary when our view changes.
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
fn handle_our_view_change(state: &mut ProtocolState, view: View) -> SubsystemResult<()> {
fn handle_our_view_change(state: &mut ProtocolState, view: View) {
let old_view = std::mem::replace(&mut (state.view), view);
for added in state.view.difference(&old_view) {
@@ -517,7 +514,6 @@ fn handle_our_view_change(state: &mut ProtocolState, view: View) -> SubsystemRes
// cleanup relay parents we are not interested in any more
let _ = state.per_relay_parent.remove(&removed);
}
Ok(())
}
@@ -529,7 +525,7 @@ async fn handle_peer_view_change<Context>(
state: &mut ProtocolState,
origin: PeerId,
view: View,
) -> SubsystemResult<()>
)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
@@ -567,10 +563,8 @@ where
.collect();
for (validator, message) in delta_set.into_iter() {
send_tracked_gossip_message(ctx, state, origin.clone(), validator, message).await?;
send_tracked_gossip_message(ctx, state, origin.clone(), validator, message).await;
}
Ok(())
}
/// Send a gossip message and track it in the per relay parent data.
@@ -581,14 +575,14 @@ async fn send_tracked_gossip_message<Context>(
dest: PeerId,
validator: ValidatorId,
message: BitfieldGossipMessage,
) -> SubsystemResult<()>
)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
let job_data = if let Some(job_data) = state.per_relay_parent.get_mut(&message.relay_parent) {
job_data
} else {
return Ok(());
return;
};
let message_sent_to_peer = &mut (job_data.message_sent_to_peer);
@@ -602,10 +596,7 @@ where
vec![dest],
message.into_validation_protocol(),
),
))
.await?;
Ok(())
)).await;
}
impl<C> Subsystem<C> for BitfieldDistribution
@@ -647,7 +638,7 @@ where
));
ctx.send_messages(std::iter::once(query_validators).chain(std::iter::once(query_signing)))
.await?;
.await;
match (validators_rx.await?, session_rx.await?) {
(Ok(v), Ok(s)) => Ok(Some((
@@ -788,7 +779,6 @@ mod test {
.timeout(Duration::from_millis(10))
.await
.expect("10ms is more than enough for sending messages.")
.expect("Error values should really never occur.")
};
}
+12 -52
View File
@@ -374,21 +374,15 @@ async fn update_view(
WireMessage::ViewUpdate(new_view.clone()),
).await?;
if let Err(e) = dispatch_validation_event_to_all(
dispatch_validation_event_to_all(
NetworkBridgeEvent::OurViewChange(new_view.clone()),
ctx,
).await {
tracing::warn!(target: LOG_TARGET, err = ?e, "Aborting - Failure to dispatch messages to overseer");
return Err(e)
}
).await;
if let Err(e) = dispatch_collation_event_to_all(
dispatch_collation_event_to_all(
NetworkBridgeEvent::OurViewChange(new_view.clone()),
ctx,
).await {
tracing::warn!(target: LOG_TARGET, err = ?e, "Aborting - Failure to dispatch messages to overseer");
return Err(e)
}
).await;
Ok(())
}
@@ -507,14 +501,14 @@ async fn send_message<M, I>(
async fn dispatch_validation_event_to_all(
event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>,
ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
) -> SubsystemResult<()> {
) {
dispatch_validation_events_to_all(std::iter::once(event), ctx).await
}
async fn dispatch_collation_event_to_all(
event: NetworkBridgeEvent<protocol_v1::CollationProtocol>,
ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
) -> SubsystemResult<()> {
) {
dispatch_collation_events_to_all(std::iter::once(event), ctx).await
}
@@ -522,7 +516,7 @@ async fn dispatch_collation_event_to_all(
async fn dispatch_validation_events_to_all<I>(
events: I,
ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
) -> SubsystemResult<()>
)
where
I: IntoIterator<Item = NetworkBridgeEvent<protocol_v1::ValidationProtocol>>,
I::IntoIter: Send,
@@ -554,7 +548,7 @@ async fn dispatch_validation_events_to_all<I>(
async fn dispatch_collation_events_to_all<I>(
events: I,
ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
) -> SubsystemResult<()>
)
where
I: IntoIterator<Item = NetworkBridgeEvent<protocol_v1::CollationProtocol>>,
I::IntoIter: Send,
@@ -665,7 +659,7 @@ where
view: View(Vec::new()),
});
let res = match peer_set {
match peer_set {
PeerSet::Validation => dispatch_validation_events_to_all(
vec![
NetworkBridgeEvent::PeerConnected(peer.clone(), role),
@@ -686,11 +680,6 @@ where
],
&mut ctx,
).await,
};
if let Err(e) = res {
tracing::warn!(err = ?e, "Aborting - Failure to dispatch messages to overseer");
return Err(e);
}
}
}
@@ -704,7 +693,7 @@ where
validator_discovery.on_peer_disconnected(&peer);
if peer_map.remove(&peer).is_some() {
let res = match peer_set {
match peer_set {
PeerSet::Validation => dispatch_validation_event_to_all(
NetworkBridgeEvent::PeerDisconnected(peer),
&mut ctx,
@@ -713,15 +702,6 @@ where
NetworkBridgeEvent::PeerDisconnected(peer),
&mut ctx,
).await,
};
if let Err(e) = res {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Aborting - Failure to dispatch messages to overseer",
);
return Err(e)
}
}
},
@@ -734,17 +714,7 @@ where
&mut network_service,
).await?;
if let Err(e) = dispatch_validation_events_to_all(
events,
&mut ctx,
).await {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Aborting - Failure to dispatch messages to overseer",
);
return Err(e)
}
dispatch_validation_events_to_all(events, &mut ctx).await;
}
if !c_messages.is_empty() {
@@ -755,17 +725,7 @@ where
&mut network_service,
).await?;
if let Err(e) = dispatch_collation_events_to_all(
events,
&mut ctx,
).await {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Aborting - Failure to dispatch messages to overseer",
);
return Err(e)
}
dispatch_collation_events_to_all(events, &mut ctx).await;
}
},
}
@@ -274,7 +274,7 @@ async fn distribute_collation(
"there are no validators assigned to core",
);
return Ok(())
return Ok(());
}
// Issue a discovery request for the validators of the current group and the next group.
@@ -342,7 +342,7 @@ async fn declare(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
peer: PeerId,
) -> Result<()> {
) {
let wire_message = protocol_v1::CollatorProtocolMessage::Declare(state.our_id.clone());
ctx.send_message(AllMessages::NetworkBridge(
@@ -350,9 +350,7 @@ async fn declare(
vec![peer],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await?;
Ok(())
)).await;
}
/// Issue a connection request to a set of validators and
@@ -385,12 +383,10 @@ async fn advertise_collation(
state: &mut State,
relay_parent: Hash,
peer: PeerId,
) -> Result<()> {
) {
let collating_on = match state.collating_on {
Some(collating_on) => collating_on,
None => {
return Ok(());
}
None => return,
};
let should_advertise = state.our_validators_groups
@@ -399,7 +395,7 @@ async fn advertise_collation(
.unwrap_or(false);
if !state.collations.contains_key(&relay_parent) || !should_advertise {
return Ok(())
return;
}
let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation(relay_parent, collating_on);
@@ -409,15 +405,13 @@ async fn advertise_collation(
vec![peer.clone()],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await?;
)).await;
if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) {
validators.advertised_to_peer(&peer);
}
state.metrics.on_advertisment_made();
Ok(())
}
/// The main incoming message dispatching switch.
@@ -504,7 +498,7 @@ async fn send_collation(
origin: PeerId,
receipt: CandidateReceipt,
pov: PoV,
) -> Result<()> {
) {
let wire_message = protocol_v1::CollatorProtocolMessage::Collation(
request_id,
receipt,
@@ -516,11 +510,9 @@ async fn send_collation(
vec![origin],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await?;
)).await;
state.metrics.on_collation_sent();
Ok(())
}
/// A networking messages switch.
@@ -551,7 +543,7 @@ async fn handle_incoming_peer_message(
Some(our_para_id) => {
if our_para_id == para_id {
if let Some(collation) = state.collations.get(&relay_parent).cloned() {
send_collation(ctx, state, request_id, origin, collation.0, collation.1).await?;
send_collation(ctx, state, request_id, origin, collation.0, collation.1).await;
}
} else {
tracing::warn!(
@@ -589,7 +581,7 @@ async fn handle_peer_view_change(
state: &mut State,
peer_id: PeerId,
view: View,
) -> Result<()> {
) {
let current = state.peer_views.entry(peer_id.clone()).or_default();
let added: Vec<Hash> = view.difference(&*current).cloned().collect();
@@ -597,10 +589,8 @@ async fn handle_peer_view_change(
*current = view;
for added in added.into_iter() {
advertise_collation(ctx, state, added, peer_id.clone()).await?;
advertise_collation(ctx, state, added, peer_id.clone()).await;
}
Ok(())
}
/// A validator is connected.
@@ -613,11 +603,11 @@ async fn handle_validator_connected(
peer_id: PeerId,
validator_id: ValidatorId,
relay_parent: Hash,
) -> Result<()> {
) {
let not_declared = state.declared_at.insert(peer_id.clone());
if not_declared {
declare(ctx, state, peer_id.clone()).await?;
declare(ctx, state, peer_id.clone()).await;
}
// Store the PeerId and find out if we should advertise to this peer.
@@ -630,10 +620,8 @@ async fn handle_validator_connected(
};
if advertise && state.peer_interested_in_leaf(&peer_id, &relay_parent) {
advertise_collation(ctx, state, relay_parent, peer_id).await?;
advertise_collation(ctx, state, relay_parent, peer_id).await;
}
Ok(())
}
/// Bridge messages switch.
@@ -651,7 +639,7 @@ async fn handle_network_msg(
// it should be handled here.
}
PeerViewChange(peer_id, view) => {
handle_peer_view_change(ctx, state, peer_id, view).await?;
handle_peer_view_change(ctx, state, peer_id, view).await;
}
PeerDisconnected(peer_id) => {
state.peer_views.remove(&peer_id);
@@ -712,19 +700,13 @@ pub(crate) async fn run(
let _timer = state.metrics.time_handle_connection_request();
if let Err(err) = handle_validator_connected(
handle_validator_connected(
&mut ctx,
&mut state,
peer_id,
validator_id,
relay_parent,
).await {
tracing::warn!(
target: LOG_TARGET,
err = ?err,
"Failed to declare our collator id",
);
}
).await;
},
msg = ctx.recv().fuse() => match msg? {
Communication { msg } => {
@@ -137,7 +137,7 @@ where
/// Modify the reputation of a peer based on its behavior.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep) -> Result<()>
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
@@ -150,7 +150,5 @@ where
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep),
)).await?;
Ok(())
)).await;
}
@@ -225,7 +225,7 @@ async fn fetch_collation<Context>(
collator_id: CollatorId,
para_id: ParaId,
tx: oneshot::Sender<(CandidateReceipt, PoV)>
) -> Result<()>
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
@@ -242,7 +242,7 @@ where
"Failed to send collation",
);
}
return Ok(());
return;
}
}
}
@@ -262,10 +262,8 @@ where
// Request the collation.
// Assume it is `request_collation`'s job to check and ignore duplicate requests.
if let Some(relevant_advertiser) = relevant_advertiser {
request_collation(ctx, state, relay_parent, para_id, relevant_advertiser, tx).await?;
request_collation(ctx, state, relay_parent, para_id, relevant_advertiser, tx).await;
}
Ok(())
}
/// Report a collator for some malicious actions.
@@ -274,7 +272,7 @@ async fn report_collator<Context>(
ctx: &mut Context,
state: &mut State,
id: CollatorId,
) -> Result<()>
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
@@ -283,11 +281,9 @@ where
// is a tolerable thing to do.
for (k, v) in state.known_collators.iter() {
if *v == id {
modify_reputation(ctx, k.clone(), COST_REPORT_BAD).await?;
modify_reputation(ctx, k.clone(), COST_REPORT_BAD).await;
}
}
Ok(())
}
/// Some other subsystem has reported a collator as a good one, bump reputation.
@@ -296,17 +292,15 @@ async fn note_good_collation<Context>(
ctx: &mut Context,
state: &mut State,
id: CollatorId,
) -> Result<()>
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
for (peer_id, collator_id) in state.known_collators.iter() {
if id == *collator_id {
modify_reputation(ctx, peer_id.clone(), BENEFIT_NOTIFY_GOOD).await?;
modify_reputation(ctx, peer_id.clone(), BENEFIT_NOTIFY_GOOD).await;
}
}
Ok(())
}
/// A peer's view has changed. A number of things should be done:
@@ -362,7 +356,7 @@ async fn received_collation<Context>(
request_id: RequestId,
receipt: CandidateReceipt,
pov: PoV,
) -> Result<()>
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
@@ -390,11 +384,9 @@ where
// If this collation is not just a delayed one that we were expecting,
// but our view has moved on, in that case modify peer's reputation.
if !state.recently_removed_heads.contains(&relay_parent) {
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await?;
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
}
}
Ok(())
}
/// Request a collation from the network.
@@ -411,7 +403,7 @@ async fn request_collation<Context>(
para_id: ParaId,
peer_id: PeerId,
result: oneshot::Sender<(CandidateReceipt, PoV)>,
) -> Result<()>
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
@@ -423,7 +415,7 @@ where
relay_parent = %relay_parent,
"collation is no longer in view",
);
return Ok(());
return;
}
if state.requested_collations.contains_key(&(relay_parent, para_id.clone(), peer_id.clone())) {
@@ -434,7 +426,7 @@ where
relay_parent = %relay_parent,
"collation has already been requested",
);
return Ok(());
return;
}
let request_id = state.next_request_id;
@@ -470,9 +462,7 @@ where
vec![peer_id],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await?;
Ok(())
)).await;
}
/// Notify `CandidateSelectionSubsystem` that a collation has been advertised.
@@ -482,7 +472,7 @@ async fn notify_candidate_selection<Context>(
collator: CollatorId,
relay_parent: Hash,
para_id: ParaId,
) -> Result<()>
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
@@ -492,9 +482,7 @@ where
para_id,
collator,
)
)).await?;
Ok(())
)).await;
}
/// Networking message has been received.
@@ -504,7 +492,7 @@ async fn process_incoming_peer_message<Context>(
state: &mut State,
origin: PeerId,
msg: protocol_v1::CollatorProtocolMessage,
)-> Result<()>
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
@@ -519,19 +507,17 @@ where
state.advertisements.entry(origin.clone()).or_default().insert((para_id, relay_parent));
if let Some(collator) = state.known_collators.get(&origin) {
notify_candidate_selection(ctx, collator.clone(), relay_parent, para_id).await?;
notify_candidate_selection(ctx, collator.clone(), relay_parent, para_id).await;
}
}
RequestCollation(_, _, _) => {
// This is a validator side of the protocol, collation requests are not expected here.
return modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
}
Collation(request_id, receipt, pov) => {
received_collation(ctx, state, origin, request_id, receipt, pov).await?;
received_collation(ctx, state, origin, request_id, receipt, pov).await;
}
}
Ok(())
}
/// A leaf has become inactive so we want to
@@ -592,7 +578,7 @@ async fn request_timed_out<Context>(
ctx: &mut Context,
state: &mut State,
id: RequestId,
) -> Result<()>
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
@@ -604,12 +590,10 @@ where
if let Some(_) = state.requests_info.remove(&id) {
let peer_id = key.2;
modify_reputation(ctx, peer_id, COST_REQUEST_TIMED_OUT).await?;
modify_reputation(ctx, peer_id, COST_REQUEST_TIMED_OUT).await;
}
}
}
Ok(())
}
/// Bridge event switch.
@@ -639,7 +623,7 @@ where
handle_our_view_change(state, view).await?;
},
PeerMessage(remote, msg) => {
process_incoming_peer_message(ctx, state, remote, msg).await?;
process_incoming_peer_message(ctx, state, remote, msg).await;
}
}
@@ -652,7 +636,7 @@ async fn process_msg<Context>(
ctx: &mut Context,
msg: CollatorProtocolMessage,
state: &mut State,
) -> Result<()>
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
@@ -675,13 +659,13 @@ where
);
}
FetchCollation(relay_parent, collator_id, para_id, tx) => {
fetch_collation(ctx, state, relay_parent, collator_id, para_id, tx).await?;
fetch_collation(ctx, state, relay_parent, collator_id, para_id, tx).await;
}
ReportCollator(id) => {
report_collator(ctx, state, id).await?;
report_collator(ctx, state, id).await;
}
NoteGoodCollation(id) => {
note_good_collation(ctx, state, id).await?;
note_good_collation(ctx, state, id).await;
}
NetworkBridgeUpdateV1(event) => {
if let Err(e) = handle_network_msg(
@@ -697,8 +681,6 @@ where
}
}
}
Ok(())
}
/// The main run loop.
@@ -726,7 +708,7 @@ where
tracing::trace!(target: LOG_TARGET, msg = ?msg, "received a message");
match msg {
Communication { msg } => process_msg(&mut ctx, msg, &mut state).await?,
Communication { msg } => process_msg(&mut ctx, msg, &mut state).await,
Signal(BlockFinalized(_)) => {}
Signal(ActiveLeaves(_)) => {}
Signal(Conclude) => { break }
@@ -742,7 +724,7 @@ where
match request {
CollationRequestResult::Timeout(id) => {
tracing::trace!(target: LOG_TARGET, id, "request timed out");
request_timed_out(&mut ctx, &mut state, id).await?;
request_timed_out(&mut ctx, &mut state, id).await;
}
CollationRequestResult::Received(id) => {
state.requests_info.remove(&id);
@@ -132,7 +132,7 @@ async fn handle_signal(
ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::Validators(vals_tx),
))).await?;
))).await;
let n_validators = match vals_rx.await? {
Ok(v) => v.len(),
@@ -178,7 +178,7 @@ async fn notify_all_we_are_awaiting(
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
relay_parent: Hash,
pov_hash: Hash,
) -> SubsystemResult<()> {
) {
// We use `awaited` as a proxy for which heads are in the peer's view.
let peers_to_send: Vec<_> = peers.iter()
.filter_map(|(peer, state)| if state.awaited.contains_key(&relay_parent) {
@@ -188,7 +188,9 @@ async fn notify_all_we_are_awaiting(
})
.collect();
if peers_to_send.is_empty() { return Ok(()) }
if peers_to_send.is_empty() {
return;
}
let payload = awaiting_message(relay_parent, vec![pov_hash]);
@@ -205,7 +207,7 @@ async fn notify_one_we_are_awaiting_many(
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
relay_parent_state: &HashMap<Hash, BlockBasedState>,
relay_parent: Hash,
) -> SubsystemResult<()> {
) {
let awaiting_hashes = relay_parent_state.get(&relay_parent).into_iter().flat_map(|s| {
// Send the peer everything we are fetching at this relay-parent
s.fetching.iter()
@@ -213,7 +215,9 @@ async fn notify_one_we_are_awaiting_many(
.map(|(pov_hash, _)| *pov_hash)
}).collect::<Vec<_>>();
if awaiting_hashes.is_empty() { return Ok(()) }
if awaiting_hashes.is_empty() {
return;
}
let payload = awaiting_message(relay_parent, awaiting_hashes);
@@ -232,7 +236,7 @@ async fn distribute_to_awaiting(
relay_parent: Hash,
pov_hash: Hash,
pov: &PoV,
) -> SubsystemResult<()> {
) {
// Send to all peers who are awaiting the PoV and have that relay-parent in their view.
//
// Also removes it from their awaiting set.
@@ -246,18 +250,16 @@ async fn distribute_to_awaiting(
}))
.collect();
if peers_to_send.is_empty() { return Ok(()) }
if peers_to_send.is_empty() { return; }
let payload = send_pov_message(relay_parent, pov_hash, pov.clone());
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send,
payload,
))).await?;
))).await;
metrics.on_pov_distributed();
Ok(())
}
/// Handles a `FetchPoV` message.
@@ -268,17 +270,17 @@ async fn handle_fetch(
relay_parent: Hash,
descriptor: CandidateDescriptor,
response_sender: oneshot::Sender<Arc<PoV>>,
) -> SubsystemResult<()> {
) {
let _timer = state.metrics.time_handle_fetch();
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
Some(s) => s,
None => return Ok(()),
None => return,
};
if let Some(pov) = relay_parent_state.known.get(&descriptor.pov_hash) {
let _ = response_sender.send(pov.clone());
return Ok(());
return;
}
{
@@ -286,7 +288,7 @@ async fn handle_fetch(
Entry::Occupied(mut e) => {
// we are already awaiting this PoV if there is an entry.
e.get_mut().push(response_sender);
return Ok(());
return;
}
Entry::Vacant(e) => {
e.insert(vec![response_sender]);
@@ -299,7 +301,7 @@ async fn handle_fetch(
relay_parent_state.fetching.len = relay_parent_state.fetching.len(),
"other subsystems have requested PoV distribution to fetch more PoVs than reasonably expected",
);
return Ok(());
return;
}
// Issue an `Awaiting` message to all peers with this in their view.
@@ -319,12 +321,12 @@ async fn handle_distribute(
relay_parent: Hash,
descriptor: CandidateDescriptor,
pov: Arc<PoV>,
) -> SubsystemResult<()> {
) {
let _timer = state.metrics.time_handle_distribute();
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
None => return Ok(()),
Some(s) => s,
None => return,
};
if let Some(our_awaited) = relay_parent_state.fetching.get_mut(&descriptor.pov_hash) {
@@ -355,7 +357,7 @@ async fn report_peer(
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
peer: PeerId,
rep: Rep,
) -> SubsystemResult<()> {
) {
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(peer, rep))).await
}
@@ -367,16 +369,16 @@ async fn handle_awaiting(
peer: PeerId,
relay_parent: Hash,
pov_hashes: Vec<Hash>,
) -> SubsystemResult<()> {
) {
if !state.our_view.0.contains(&relay_parent) {
report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await?;
return Ok(());
report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await;
return;
}
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
None => {
tracing::warn!("PoV Distribution relay parent state out-of-sync with our view");
return Ok(());
return;
}
Some(s) => s,
};
@@ -385,8 +387,8 @@ async fn handle_awaiting(
state.peer_state.get_mut(&peer).and_then(|s| s.awaited.get_mut(&relay_parent))
{
None => {
report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await?;
return Ok(());
report_peer(ctx, peer, COST_AWAITED_NOT_IN_VIEW).await;
return;
}
Some(a) => a,
};
@@ -400,16 +402,14 @@ async fn handle_awaiting(
let payload = send_pov_message(relay_parent, pov_hash, (&**pov).clone());
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await?;
)).await;
} else {
peer_awaiting.insert(pov_hash);
}
}
} else {
report_peer(ctx, peer, COST_APPARENT_FLOOD).await?;
report_peer(ctx, peer, COST_APPARENT_FLOOD).await;
}
Ok(())
}
/// Handle an incoming PoV from our peer. Reports them if unexpected, rewards them if not.
@@ -423,11 +423,11 @@ async fn handle_incoming_pov(
relay_parent: Hash,
pov_hash: Hash,
pov: PoV,
) -> SubsystemResult<()> {
) {
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
None => {
report_peer(ctx, peer, COST_UNEXPECTED_POV).await?;
return Ok(());
report_peer(ctx, peer, COST_UNEXPECTED_POV).await;
return;
},
Some(r) => r,
};
@@ -436,16 +436,16 @@ async fn handle_incoming_pov(
// Do validity checks and complete all senders awaiting this PoV.
let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) {
None => {
report_peer(ctx, peer, COST_UNEXPECTED_POV).await?;
return Ok(());
report_peer(ctx, peer, COST_UNEXPECTED_POV).await;
return;
}
Some(f) => f,
};
let hash = pov.hash();
if hash != pov_hash {
report_peer(ctx, peer, COST_UNEXPECTED_POV).await?;
return Ok(());
report_peer(ctx, peer, COST_UNEXPECTED_POV).await;
return;
}
let pov = Arc::new(pov);
@@ -453,10 +453,10 @@ async fn handle_incoming_pov(
if fetching.is_empty() {
// fetching is empty whenever we were awaiting something and
// it was completed afterwards.
report_peer(ctx, peer.clone(), BENEFIT_LATE_POV).await?;
report_peer(ctx, peer.clone(), BENEFIT_LATE_POV).await;
} else {
// fetching is non-empty when the peer just provided us with data we needed.
report_peer(ctx, peer.clone(), BENEFIT_FRESH_POV).await?;
report_peer(ctx, peer.clone(), BENEFIT_FRESH_POV).await;
}
for response_sender in fetching.drain(..) {
@@ -488,17 +488,15 @@ async fn handle_network_update(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
update: NetworkBridgeEvent<protocol_v1::PoVDistributionMessage>,
) -> SubsystemResult<()> {
) {
let _timer = state.metrics.time_handle_network_update();
match update {
NetworkBridgeEvent::PeerConnected(peer, _observed_role) => {
state.peer_state.insert(peer, PeerState { awaited: HashMap::new() });
Ok(())
}
NetworkBridgeEvent::PeerDisconnected(peer) => {
state.peer_state.remove(&peer);
Ok(())
}
NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
if let Some(peer_state) = state.peer_state.get_mut(&peer_id) {
@@ -516,12 +514,11 @@ async fn handle_network_update(
ctx,
&state.relay_parent_state,
*relay_parent,
).await?;
).await;
}
}
}
Ok(())
}
NetworkBridgeEvent::PeerMessage(peer, message) => {
match message {
@@ -546,7 +543,6 @@ async fn handle_network_update(
}
NetworkBridgeEvent::OurViewChange(view) => {
state.our_view = view;
Ok(())
}
}
}
@@ -582,7 +578,7 @@ impl PoVDistribution {
relay_parent,
descriptor,
response_sender,
).await?,
).await,
PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) =>
handle_distribute(
&mut state,
@@ -590,13 +586,13 @@ impl PoVDistribution {
relay_parent,
descriptor,
pov,
).await?,
).await,
PoVDistributionMessage::NetworkBridgeUpdateV1(event) =>
handle_network_update(
&mut state,
&mut ctx,
event,
).await?,
).await,
},
}
}
@@ -80,7 +80,7 @@ fn distributes_to_those_awaiting_and_completes_local() {
hash_a,
descriptor,
Arc::new(pov.clone()),
).await.unwrap();
).await;
assert!(!state.peer_state[&peer_a].awaited[&hash_a].contains(&pov_hash));
assert!(state.peer_state[&peer_c].awaited[&hash_b].contains(&pov_hash));
@@ -160,7 +160,7 @@ fn we_inform_peers_with_same_view_we_are_awaiting() {
hash_a,
descriptor,
pov_send,
).await.unwrap();
).await;
assert_eq!(state.relay_parent_state[&hash_a].fetching[&pov_hash].len(), 1);
@@ -234,7 +234,7 @@ fn peer_view_change_leads_to_us_informing() {
&mut state,
&mut ctx,
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), View(vec![hash_a, hash_b])),
).await.unwrap();
).await;
assert_matches!(
handle.recv().await,
@@ -310,7 +310,7 @@ fn peer_complete_fetch_and_is_rewarded() {
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
).focus().unwrap(),
).await.unwrap();
).await;
handle_network_update(
&mut state,
@@ -319,7 +319,7 @@ fn peer_complete_fetch_and_is_rewarded() {
peer_b.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
).focus().unwrap(),
).await.unwrap();
).await;
assert_eq!(&*pov_recv.await.unwrap(), &pov);
@@ -399,7 +399,7 @@ fn peer_punished_for_sending_bad_pov() {
peer_a.clone(),
send_pov_message(hash_a, pov_hash, bad_pov.clone()),
).focus().unwrap(),
).await.unwrap();
).await;
// didn't complete our sender.
assert_eq!(state.relay_parent_state[&hash_a].fetching[&pov_hash].len(), 1);
@@ -463,7 +463,7 @@ fn peer_punished_for_sending_unexpected_pov() {
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
).focus().unwrap(),
).await.unwrap();
).await;
assert_matches!(
handle.recv().await,
@@ -525,7 +525,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() {
peer_a.clone(),
send_pov_message(hash_b, pov_hash, pov.clone()),
).focus().unwrap(),
).await.unwrap();
).await;
assert_matches!(
handle.recv().await,
@@ -588,7 +588,7 @@ fn peer_reported_for_awaiting_too_much() {
peer_a.clone(),
awaiting_message(hash_a, vec![pov_hash]),
).focus().unwrap(),
).await.unwrap();
).await;
}
assert_eq!(state.peer_state[&peer_a].awaited[&hash_a].len(), max_plausibly_awaited);
@@ -602,7 +602,7 @@ fn peer_reported_for_awaiting_too_much() {
peer_a.clone(),
awaiting_message(hash_a, vec![last_pov_hash]),
).focus().unwrap(),
).await.unwrap();
).await;
// No more bookkeeping for you!
assert_eq!(state.peer_state[&peer_a].awaited[&hash_a].len(), max_plausibly_awaited);
@@ -672,7 +672,7 @@ fn peer_reported_for_awaiting_outside_their_view() {
peer_a.clone(),
awaiting_message(hash_b, vec![pov_hash]),
).focus().unwrap(),
).await.unwrap();
).await;
assert!(state.peer_state[&peer_a].awaited.get(&hash_b).is_none());
@@ -735,7 +735,7 @@ fn peer_reported_for_awaiting_outside_our_view() {
peer_a.clone(),
awaiting_message(hash_b, vec![pov_hash]),
).focus().unwrap(),
).await.unwrap();
).await;
// Illegal `awaited` is ignored.
assert!(state.peer_state[&peer_a].awaited[&hash_b].is_empty());
@@ -810,7 +810,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() {
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
).focus().unwrap(),
).await.unwrap();
).await;
assert_eq!(&*pov_recv.await.unwrap(), &pov);
@@ -893,7 +893,7 @@ fn peer_completing_request_no_longer_awaiting() {
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
).focus().unwrap(),
).await.unwrap();
).await;
assert_eq!(&*pov_recv.await.unwrap(), &pov);
@@ -523,7 +523,7 @@ async fn circulate_statement_and_dependents(
relay_parent: Hash,
statement: SignedFullStatement,
metrics: &Metrics,
) -> SubsystemResult<()> {
) {
if let Some(active_head)= active_heads.get_mut(&relay_parent) {
// First circulate the statement directly to all peers needing it.
@@ -532,7 +532,7 @@ async fn circulate_statement_and_dependents(
match active_head.note_statement(statement) {
NotedStatement::Fresh(stored) => Some((
*stored.compact().candidate_hash(),
circulate_statement(peers, ctx, relay_parent, stored).await?,
circulate_statement(peers, ctx, relay_parent, stored).await,
)),
_ => None,
}
@@ -552,13 +552,11 @@ async fn circulate_statement_and_dependents(
candidate_hash,
&*active_head,
metrics,
).await?;
).await;
}
}
}
}
Ok(())
}
fn statement_message(relay_parent: Hash, statement: SignedFullStatement)
@@ -577,7 +575,7 @@ async fn circulate_statement(
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
relay_parent: Hash,
stored: &StoredStatement,
) -> SubsystemResult<Vec<PeerId>> {
) -> Vec<PeerId> {
let fingerprint = stored.fingerprint();
let mut peers_to_send = HashMap::new();
@@ -594,14 +592,14 @@ async fn circulate_statement(
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send.keys().cloned().collect(),
payload,
))).await?;
))).await;
}
Ok(peers_to_send.into_iter().filter_map(|(peer, needs_dependent)| if needs_dependent {
peers_to_send.into_iter().filter_map(|(peer, needs_dependent)| if needs_dependent {
Some(peer)
} else {
None
}).collect())
}).collect()
}
/// Send all statements about a given candidate hash to a peer.
@@ -614,7 +612,7 @@ async fn send_statements_about(
candidate_hash: CandidateHash,
active_head: &ActiveHeadData,
metrics: &Metrics,
) -> SubsystemResult<()> {
) {
for statement in active_head.statements_about(candidate_hash) {
if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() {
let payload = statement_message(
@@ -624,13 +622,11 @@ async fn send_statements_about(
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await?;
)).await;
metrics.on_statement_distributed();
}
}
Ok(())
}
/// Send all statements at a given relay-parent to a peer.
@@ -642,7 +638,7 @@ async fn send_statements(
relay_parent: Hash,
active_head: &ActiveHeadData,
metrics: &Metrics,
) -> SubsystemResult<()> {
) {
for statement in active_head.statements() {
if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() {
let payload = statement_message(
@@ -652,20 +648,18 @@ async fn send_statements(
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await?;
)).await;
metrics.on_statement_distributed();
}
}
Ok(())
}
async fn report_peer(
ctx: &mut impl SubsystemContext,
peer: PeerId,
rep: Rep,
) -> SubsystemResult<()> {
) {
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
)).await
@@ -685,13 +679,14 @@ async fn handle_incoming_message<'a>(
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
message: protocol_v1::StatementDistributionMessage,
metrics: &Metrics,
) -> SubsystemResult<Option<(Hash, &'a StoredStatement)>> {
) -> Option<(Hash, &'a StoredStatement)> {
let (relay_parent, statement) = match message {
protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s),
};
if !our_view.contains(&relay_parent) {
return report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await.map(|_| None);
report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await;
return None;
}
let active_head = match active_heads.get_mut(&relay_parent) {
@@ -703,13 +698,14 @@ async fn handle_incoming_message<'a>(
requested_relay_parent = %relay_parent,
"our view out-of-sync with active heads; head not found",
);
return Ok(None);
return None;
}
};
// check the signature on the statement.
if let Err(()) = check_statement_signature(&active_head, relay_parent, &statement) {
return report_peer(ctx, peer, COST_INVALID_SIGNATURE).await.map(|_| None);
report_peer(ctx, peer, COST_INVALID_SIGNATURE).await;
return None;
}
// Ensure the statement is stored in the peer data.
@@ -720,8 +716,8 @@ async fn handle_incoming_message<'a>(
let max_message_count = active_head.validators.len() * 2;
match peer_data.receive(&relay_parent, &fingerprint, max_message_count) {
Err(rep) => {
report_peer(ctx, peer, rep).await?;
return Ok(None)
report_peer(ctx, peer, rep).await;
return None;
}
Ok(true) => {
// Send the peer all statements concerning the candidate that we have,
@@ -734,7 +730,7 @@ async fn handle_incoming_message<'a>(
fingerprint.0.candidate_hash().clone(),
&*active_head,
metrics,
).await?
).await;
}
Ok(false) => {}
}
@@ -742,14 +738,14 @@ async fn handle_incoming_message<'a>(
// Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation
// or unpinned to a seconded candidate. So it is safe to place it into the storage.
match active_head.note_statement(statement) {
NotedStatement::NotUseful => Ok(None),
NotedStatement::NotUseful => None,
NotedStatement::UsefulButKnown => {
report_peer(ctx, peer, BENEFIT_VALID_STATEMENT).await?;
Ok(None)
report_peer(ctx, peer, BENEFIT_VALID_STATEMENT).await;
None
}
NotedStatement::Fresh(statement) => {
report_peer(ctx, peer, BENEFIT_VALID_STATEMENT_FIRST).await?;
Ok(Some((relay_parent, statement)))
report_peer(ctx, peer, BENEFIT_VALID_STATEMENT_FIRST).await;
Some((relay_parent, statement))
}
}
}
@@ -763,7 +759,7 @@ async fn update_peer_view_and_send_unlocked(
active_heads: &HashMap<Hash, ActiveHeadData>,
new_view: View,
metrics: &Metrics,
) -> SubsystemResult<()> {
) {
let old_view = std::mem::replace(&mut peer_data.view, new_view);
// Remove entries for all relay-parents in the old view but not the new.
@@ -785,11 +781,9 @@ async fn update_peer_view_and_send_unlocked(
new,
active_head,
metrics,
).await?;
).await;
}
}
Ok(())
}
#[tracing::instrument(level = "trace", skip(peers, active_heads, ctx, metrics), fields(subsystem = LOG_TARGET))]
@@ -800,19 +794,16 @@ async fn handle_network_update(
our_view: &mut View,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
metrics: &Metrics,
) -> SubsystemResult<()> {
) {
match update {
NetworkBridgeEvent::PeerConnected(peer, _role) => {
peers.insert(peer, PeerData {
view: Default::default(),
view_knowledge: Default::default(),
});
Ok(())
}
NetworkBridgeEvent::PeerDisconnected(peer) => {
peers.remove(&peer);
Ok(())
}
NetworkBridgeEvent::PeerMessage(peer, message) => {
match peers.get_mut(&peer) {
@@ -825,7 +816,7 @@ async fn handle_network_update(
ctx,
message,
metrics,
).await?;
).await;
if let Some((relay_parent, new)) = new_stored {
// When we receive a new message from a peer, we forward it to the
@@ -833,12 +824,10 @@ async fn handle_network_update(
let message = AllMessages::CandidateBacking(
CandidateBackingMessage::Statement(relay_parent, new.statement.clone())
);
ctx.send_message(message).await?;
ctx.send_message(message).await;
}
Ok(())
}
None => Ok(()),
None => (),
}
}
@@ -854,7 +843,7 @@ async fn handle_network_update(
metrics,
).await
}
None => Ok(()),
None => (),
}
}
NetworkBridgeEvent::OurViewChange(view) => {
@@ -872,8 +861,6 @@ async fn handle_network_update(
);
}
}
Ok(())
}
}
@@ -917,7 +904,7 @@ impl StatementDistribution {
ctx.send_messages(
std::iter::once(val_message).chain(std::iter::once(session_message))
).await?;
).await;
match (val_rx.await?, session_rx.await?) {
(Ok(v), Ok(s)) => (v, s),
@@ -959,7 +946,7 @@ impl StatementDistribution {
relay_parent,
statement,
&metrics,
).await?;
).await;
}
StatementDistributionMessage::NetworkBridgeUpdateV1(event) => {
let _timer = metrics.time_network_bridge_update_v1();
@@ -971,7 +958,7 @@ impl StatementDistribution {
&mut our_view,
event,
&metrics,
).await?
).await;
}
StatementDistributionMessage::RegisterStatementListener(tx) => {
statement_listeners.push(tx);
@@ -1428,7 +1415,7 @@ mod tests {
&active_heads,
new_view.clone(),
&Default::default(),
).await.unwrap();
).await;
assert_eq!(peer_data.view, new_view);
assert!(!peer_data.view_knowledge.contains_key(&hash_a));
@@ -1544,7 +1531,7 @@ mod tests {
&mut ctx,
hash_b,
&statement,
).await.unwrap();
).await;
{
assert_eq!(needs_dependents.len(), 2);
@@ -64,7 +64,7 @@ impl Subsystem1 {
}.into(),
tx,
)
)).await.unwrap();
)).await;
}
}
}
+71 -48
View File
@@ -193,20 +193,20 @@ pub struct OverseerHandler {
impl OverseerHandler {
/// Inform the `Overseer` that that some block was imported.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
pub async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockImported(block)).await.map_err(Into::into)
pub async fn block_imported(&mut self, block: BlockInfo) {
self.send_and_log_error(Event::BlockImported(block)).await
}
/// Send some message to one of the `Subsystem`s.
#[tracing::instrument(level = "trace", skip(self, msg), fields(subsystem = LOG_TARGET))]
pub async fn send_msg(&mut self, msg: impl Into<AllMessages>) -> SubsystemResult<()> {
self.events_tx.send(Event::MsgToSubsystem(msg.into())).await.map_err(Into::into)
pub async fn send_msg(&mut self, msg: impl Into<AllMessages>) {
self.send_and_log_error(Event::MsgToSubsystem(msg.into())).await
}
/// Inform the `Overseer` that that some block was finalized.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
pub async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockFinalized(block)).await.map_err(Into::into)
pub async fn block_finalized(&mut self, block: BlockInfo) {
self.send_and_log_error(Event::BlockFinalized(block)).await
}
/// Wait for a block with the given hash to be in the active-leaves set.
@@ -217,17 +217,23 @@ impl OverseerHandler {
/// the response channel may never return if the hash was deactivated before this call.
/// In this case, it's the caller's responsibility to ensure a timeout is set.
#[tracing::instrument(level = "trace", skip(self, response_channel), fields(subsystem = LOG_TARGET))]
pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender<SubsystemResult<()>>) -> SubsystemResult<()> {
self.events_tx.send(Event::ExternalRequest(ExternalRequest::WaitForActivation {
pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender<SubsystemResult<()>>) {
self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
hash,
response_channel
})).await.map_err(Into::into)
})).await
}
/// Tell `Overseer` to shutdown.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
pub async fn stop(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::Stop).await.map_err(Into::into)
pub async fn stop(&mut self) {
self.send_and_log_error(Event::Stop).await
}
async fn send_and_log_error(&mut self, event: Event) {
if self.events_tx.send(event).await.is_err() {
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
}
}
}
@@ -239,7 +245,7 @@ impl OverseerHandler {
pub async fn forward_events<P: BlockchainEvents<Block>>(
client: Arc<P>,
mut handler: OverseerHandler,
) -> SubsystemResult<()> {
) {
let mut finality = client.finality_notification_stream();
let mut imports = client.import_notification_stream();
@@ -248,7 +254,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(
f = finality.next() => {
match f {
Some(block) => {
handler.block_finalized(block.into()).await?;
handler.block_finalized(block.into()).await;
}
None => break,
}
@@ -256,7 +262,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(
i = imports.next() => {
match i {
Some(block) => {
handler.block_imported(block.into()).await?;
handler.block_imported(block.into()).await;
}
None => break,
}
@@ -264,8 +270,6 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(
complete => break,
}
}
Ok(())
}
impl Debug for ToOverseer {
@@ -338,15 +342,34 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
}).await.map_err(Into::into)
}
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.tx.send(ToOverseer::SubsystemMessage(msg)).await.map_err(Into::into)
async fn send_message(&mut self, msg: AllMessages) {
self.send_and_log_error(ToOverseer::SubsystemMessage(msg)).await
}
async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
async fn send_messages<T>(&mut self, msgs: T)
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
{
let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok));
self.tx.send_all(&mut msgs).await.map_err(Into::into)
if self.tx.send_all(&mut msgs).await.is_err() {
tracing::debug!(
target: LOG_TARGET,
msg_type = std::any::type_name::<M>(),
"Failed to send messages to Overseer",
);
}
}
}
impl<M> OverseerSubsystemContext<M> {
async fn send_and_log_error(&mut self, msg: ToOverseer) {
if self.tx.send(msg).await.is_err() {
tracing::debug!(
target: LOG_TARGET,
msg_type = std::any::type_name::<M>(),
"Failed to send a message to Overseer",
);
}
}
}
@@ -1712,7 +1735,7 @@ mod tests {
tx,
)
)
).await.unwrap();
).await;
c += 1;
continue;
}
@@ -1786,7 +1809,7 @@ mod tests {
Some(msg) => {
s1_results.push(msg);
if s1_results.len() == 10 {
handler.stop().await.unwrap();
handler.stop().await;
}
}
None => break,
@@ -1844,10 +1867,10 @@ mod tests {
pin_mut!(overseer_fut);
handler.block_imported(second_block).await.unwrap();
handler.block_imported(third_block).await.unwrap();
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await.unwrap();
handler.stop().await.unwrap();
handler.block_imported(second_block).await;
handler.block_imported(third_block).await;
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
handler.stop().await;
select! {
res = overseer_fut => {
@@ -2012,8 +2035,8 @@ mod tests {
let mut ss5_results = Vec::new();
let mut ss6_results = Vec::new();
handler.block_imported(second_block).await.unwrap();
handler.block_imported(third_block).await.unwrap();
handler.block_imported(second_block).await;
handler.block_imported(third_block).await;
let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(first_block_hash)),
@@ -2048,7 +2071,7 @@ mod tests {
if ss5_results.len() == expected_heartbeats.len() &&
ss6_results.len() == expected_heartbeats.len() {
handler.stop().await.unwrap();
handler.stop().await;
}
}
@@ -2106,7 +2129,7 @@ mod tests {
let mut ss6_results = Vec::new();
// this should stop work on both forks we started with earlier.
handler.block_finalized(third_block).await.unwrap();
handler.block_finalized(third_block).await;
let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
@@ -2141,7 +2164,7 @@ mod tests {
if ss5_results.len() == expected_heartbeats.len() &&
ss6_results.len() == expected_heartbeats.len() {
handler.stop().await.unwrap();
handler.stop().await;
}
}
@@ -2343,28 +2366,28 @@ mod tests {
hash: Default::default(),
parent_hash: Default::default(),
number: Default::default(),
}).await.unwrap();
}).await;
// send a msg to each subsystem
// except for BitfieldSigning as the message is not instantiable
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await.unwrap();
handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await.unwrap();
handler.send_msg(AllMessages::CandidateSelection(test_candidate_selection_msg())).await.unwrap();
handler.send_msg(AllMessages::CollationGeneration(test_collator_generation_msg())).await.unwrap();
handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await.unwrap();
handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await.unwrap();
handler.send_msg(AllMessages::AvailabilityDistribution(test_availability_distribution_msg())).await.unwrap();
// handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await.unwrap();
handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await.unwrap();
handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await.unwrap();
handler.send_msg(AllMessages::PoVDistribution(test_pov_distribution_msg())).await.unwrap();
handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await.unwrap();
handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await.unwrap();
handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await.unwrap();
handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await.unwrap();
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await;
handler.send_msg(AllMessages::CandidateSelection(test_candidate_selection_msg())).await;
handler.send_msg(AllMessages::CollationGeneration(test_collator_generation_msg())).await;
handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
handler.send_msg(AllMessages::AvailabilityDistribution(test_availability_distribution_msg())).await;
// handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await;
handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await;
handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await;
handler.send_msg(AllMessages::PoVDistribution(test_pov_distribution_msg())).await;
handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await;
handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await;
handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await;
handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await;
// send a stop signal to each subsystems
handler.stop().await.unwrap();
handler.stop().await;
select! {
res = overseer_fut => {
@@ -191,15 +191,14 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
Ok(())
}
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
async fn send_message(&mut self, msg: AllMessages) {
self.tx
.send(msg)
.await
.expect("test overseer no longer live");
Ok(())
}
async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
async fn send_messages<T>(&mut self, msgs: T)
where
T: IntoIterator<Item = AllMessages> + Send,
T::IntoIter: Send,
@@ -209,8 +208,6 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
.send_all(&mut iter)
.await
.expect("test overseer no longer live");
Ok(())
}
}
@@ -341,7 +338,7 @@ mod tests {
spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());
block_on(handler.send_msg(CandidateSelectionMessage::Invalid(Default::default(), Default::default()))).unwrap();
block_on(handler.send_msg(CandidateSelectionMessage::Invalid(Default::default(), Default::default())));
assert!(matches!(block_on(rx.into_future()).0.unwrap(), CandidateSelectionMessage::Invalid(_, _)));
}
}
+7 -13
View File
@@ -207,13 +207,11 @@ where
{
let (tx, rx) = oneshot::channel();
ctx
.send_message(
AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
.try_into()
.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
)
.await?;
ctx.send_message(
AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx)))
.try_into()
.map_err(|err| Error::SenderConversion(format!("{:?}", err)))?,
).await;
Ok(rx)
}
@@ -752,7 +750,7 @@ where
break
},
outgoing = jobs.next().fuse() =>
Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await,
Self::handle_outgoing(outgoing, &mut ctx).await,
complete => break,
}
}
@@ -866,13 +864,9 @@ where
async fn handle_outgoing(
outgoing: Option<Job::FromJob>,
ctx: &mut Context,
err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
) {
let msg = outgoing.expect("the Jobs stream never ends; qed");
if let Err(e) = ctx.send_message(msg.into()).await {
let e = JobsError::Utility(e.into());
Self::fwd_err(None, e, err_tx).await;
}
ctx.send_message(msg.into()).await;
}
}
@@ -63,7 +63,7 @@ pub async fn connect_to_validators<Context: SubsystemContext>(
relay_parent,
RuntimeApiRequest::ValidatorDiscovery(validators.clone(), tx),
)
)).await?;
)).await;
let maybe_authorities = rx.await??;
let authorities: Vec<_> = maybe_authorities.iter()
@@ -97,7 +97,7 @@ async fn connect_to_authorities<Context: SubsystemContext>(
validator_ids,
connected,
}
)).await?;
)).await;
Ok(connected_rx)
}
+2 -2
View File
@@ -204,10 +204,10 @@ pub trait SubsystemContext: Send + 'static {
) -> SubsystemResult<()>;
/// Send a direct message to some other `Subsystem`, routed based on message type.
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()>;
async fn send_message(&mut self, msg: AllMessages);
/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
async fn send_messages<T>(&mut self, msgs: T)
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send;
}
+2 -4
View File
@@ -336,13 +336,11 @@ impl PolkadotTestNode {
self.overseer_handler
.send_msg(CollationGenerationMessage::Initialize(config))
.await
.expect("Registers the collator");
.await;
self.overseer_handler
.send_msg(CollatorProtocolMessage::CollateOn(para_id))
.await
.expect("Sends CollateOn");
.await;
}
}
@@ -86,13 +86,11 @@ fn main() -> Result<()> {
};
overseer_handler
.send_msg(CollationGenerationMessage::Initialize(config))
.await
.expect("Registers collator");
.await;
overseer_handler
.send_msg(CollatorProtocolMessage::CollateOn(PARA_ID))
.await
.expect("Collates on");
.await;
Ok(full_node.task_manager)
}