refactor+feat: allow subsystems to send only declared messages, generate graphviz (#5314)

Closes #3774
Closes #3826
Author: Bernhard Schuster
Date: 2022-05-12 17:39:05 +02:00
Committed by: GitHub
Parent: 26340b9054
Commit: 511891dcce

102 changed files with 3853 additions and 2514 deletions
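
The bulk of this diff applies one mechanical pattern across the collator-protocol subsystem: hand-written `where Context: SubsystemContext<Message = ...>` bounds are replaced by the `#[overseer::contextbounds(...)]` attribute, and helpers that only send messages take a narrowed sender (`impl overseer::SubsystemSender<M>` or the generated `overseer::CollatorProtocolSenderTrait`) instead of the whole context, so a subsystem can only emit the message types it has declared; those declarations are also what the graphviz output mentioned in the title is derived from. The snippet below condenses the before/after shape from the hunks that follow; it is an illustration, not a standalone compile unit.

// Before: every async helper spelled out its context bounds by hand.
async fn declare<Context>(ctx: &mut Context, state: &mut State, peer: PeerId)
where
    Context: SubsystemContext<Message = CollatorProtocolMessage>,
    Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
    /* ... */
}

// After: the attribute injects the bounds for the named subsystem.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn declare<Context>(ctx: &mut Context, state: &mut State, peer: PeerId) {
    /* ... */
}

// Send-only helpers drop the context entirely in favour of a narrowed sender,
// here the per-subsystem sender trait generated by the overseer macro:
async fn modify_reputation(
    sender: &mut impl overseer::CollatorProtocolSenderTrait,
    peer: PeerId,
    rep: Rep,
) {
    sender.send_message(NetworkBridgeMessage::ReportPeer(peer, rep)).await;
}
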
@@ -38,8 +38,10 @@ use polkadot_node_network_protocol::{
use polkadot_node_primitives::{CollationSecondedSignal, PoV, Statement};
use polkadot_node_subsystem::{
jaeger,
messages::{CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeMessage},
overseer, FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext,
messages::{
CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeMessage, RuntimeApiMessage,
},
overseer, FromOverseer, OverseerSignal, PerLeafSpan,
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
@@ -360,6 +362,7 @@ impl State {
/// or the relay-parent isn't in the active-leaves set, we ignore the message
/// as it must be invalid in that case - although this indicates a logic error
/// elsewhere in the node.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn distribute_collation<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
@@ -368,11 +371,7 @@ async fn distribute_collation<Context>(
receipt: CandidateReceipt,
pov: PoV,
result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
) -> Result<()> {
let relay_parent = receipt.descriptor.relay_parent;
// This collation is not in the active-leaves set.
@@ -398,7 +397,7 @@ where
// Determine which core the para collated-on is assigned to.
// If it is not scheduled then ignore the message.
let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
let (our_core, num_cores) = match determine_core(ctx.sender(), id, relay_parent).await? {
Some(core) => core,
None => {
gum::warn!(
@@ -461,16 +460,12 @@ where
/// Get the Id of the Core that is assigned to the para being collated on if any
/// and the total number of cores.
async fn determine_core<Context>(
ctx: &mut Context,
async fn determine_core(
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
para_id: ParaId,
relay_parent: Hash,
) -> Result<Option<(CoreIndex, usize)>>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
let cores = get_availability_cores(ctx, relay_parent).await?;
) -> Result<Option<(CoreIndex, usize)>> {
let cores = get_availability_cores(sender, relay_parent).await?;
for (idx, core) in cores.iter().enumerate() {
if let CoreState::Scheduled(occupied) = core {
@@ -493,17 +488,14 @@ struct GroupValidators {
/// Figure out current group of validators assigned to the para being collated on.
///
/// Returns [`ValidatorId`]'s of current group as determined based on the `relay_parent`.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn determine_our_validators<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
core_index: CoreIndex,
cores: usize,
relay_parent: Hash,
) -> Result<GroupValidators>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
) -> Result<GroupValidators> {
let session_index = runtime.get_session_index_for_child(ctx.sender(), relay_parent).await?;
let info = &runtime
.get_session_info_by_index(ctx.sender(), relay_parent, session_index)
@@ -511,7 +503,7 @@ where
.session_info;
gum::debug!(target: LOG_TARGET, ?session_index, "Received session info");
let groups = &info.validator_groups;
let rotation_info = get_group_rotation_info(ctx, relay_parent).await?;
let rotation_info = get_group_rotation_info(ctx.sender(), relay_parent).await?;
let current_group_index = rotation_info.group_for_core(core_index, cores);
let current_validators = groups
@@ -530,11 +522,8 @@ where
}
/// Issue a `Declare` collation message to the given `peer`.
async fn declare<Context>(ctx: &mut Context, state: &mut State, peer: PeerId)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn declare<Context>(ctx: &mut Context, state: &mut State, peer: PeerId) {
let declare_signature_payload = protocol_v1::declare_signature_payload(&state.local_peer_id);
if let Some(para_id) = state.collating_on {
@@ -554,11 +543,11 @@ where
/// Issue a connection request to a set of validators and
/// revoke the previous connection request.
async fn connect_to_validators<Context>(ctx: &mut Context, validator_ids: Vec<AuthorityDiscoveryId>)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn connect_to_validators<Context>(
ctx: &mut Context,
validator_ids: Vec<AuthorityDiscoveryId>,
) {
// ignore address resolution failure
// will reissue a new request on new collation
let (failed, _) = oneshot::channel();
@@ -574,15 +563,13 @@ where
///
/// This will only advertise a collation if there exists one for the given `relay_parent` and the given `peer` is
/// set as validator for our para at the given `relay_parent`.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn advertise_collation<Context>(
ctx: &mut Context,
state: &mut State,
relay_parent: Hash,
peer: PeerId,
) where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
) {
let should_advertise = state
.our_validators_groups
.get(&relay_parent)
@@ -635,16 +622,13 @@ async fn advertise_collation<Context>(
}
/// The main incoming message dispatching switch.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn process_msg<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
msg: CollatorProtocolMessage,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
) -> Result<()> {
use CollatorProtocolMessage::*;
match msg {
@@ -748,17 +732,14 @@ async fn send_collation(
}
/// A networking messages switch.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn handle_incoming_peer_message<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
origin: PeerId,
msg: protocol_v1::CollatorProtocolMessage,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
) -> Result<()> {
use protocol_v1::CollatorProtocolMessage::*;
match msg {
@@ -831,15 +812,12 @@ where
}
/// Process an incoming network request for a collation.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn handle_incoming_request<Context>(
ctx: &mut Context,
state: &mut State,
req: IncomingRequest<request_v1::CollationFetchingRequest>,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
) -> Result<()> {
let _span = state
.span_per_relay_parent
.get(&req.payload.relay_parent)
@@ -907,15 +885,13 @@ where
}
/// Our view has changed.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn handle_peer_view_change<Context>(
ctx: &mut Context,
state: &mut State,
peer_id: PeerId,
view: View,
) where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
) {
let current = state.peer_views.entry(peer_id.clone()).or_default();
let added: Vec<Hash> = view.difference(&*current).cloned().collect();
@@ -928,16 +904,13 @@ async fn handle_peer_view_change<Context>(
}
/// Bridge messages switch.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn handle_network_msg<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
bridge_message: NetworkBridgeEvent<net_protocol::CollatorProtocolMessage>,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
) -> Result<()> {
use NetworkBridgeEvent::*;
match bridge_message {
@@ -1021,17 +994,14 @@ async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()>
}
/// The collator protocol collator side main loop.
#[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)]
pub(crate) async fn run<Context>(
mut ctx: Context,
local_peer_id: PeerId,
collator_pair: CollatorPair,
mut req_receiver: IncomingRequestReceiver<request_v1::CollationFetchingRequest>,
metrics: Metrics,
) -> std::result::Result<(), FatalError>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
) -> std::result::Result<(), FatalError> {
use OverseerSignal::*;
let mut state = State::new(local_peer_id, collator_pair, metrics);
@@ -34,9 +34,7 @@ use polkadot_node_network_protocol::{
use polkadot_primitives::v2::CollatorPair;
use polkadot_node_subsystem::{
errors::SubsystemError,
messages::{CollatorProtocolMessage, NetworkBridgeMessage},
overseer, SpawnedSubsystem, SubsystemContext, SubsystemSender,
errors::SubsystemError, messages::NetworkBridgeMessage, overseer, SpawnedSubsystem,
};
mod error;
@@ -89,6 +87,7 @@ pub struct CollatorProtocolSubsystem {
protocol_side: ProtocolSide,
}
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
impl CollatorProtocolSubsystem {
/// Start the collator protocol.
/// If `id` is `Some` this is a collator side of the protocol.
@@ -98,11 +97,7 @@ impl CollatorProtocolSubsystem {
Self { protocol_side }
}
async fn run<Context>(self, ctx: Context) -> std::result::Result<(), error::FatalError>
where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
async fn run<Context>(self, ctx: Context) -> std::result::Result<(), error::FatalError> {
match self.protocol_side {
ProtocolSide::Validator { keystore, eviction_policy, metrics } =>
validator_side::run(ctx, keystore, eviction_policy, metrics).await,
@@ -112,12 +107,8 @@ impl CollatorProtocolSubsystem {
}
}
impl<Context> overseer::Subsystem<Context, SubsystemError> for CollatorProtocolSubsystem
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
<Context as SubsystemContext>::Sender: SubsystemSender,
{
#[overseer::subsystem(CollatorProtocol, error=SubsystemError, prefix=self::overseer)]
impl<Context> CollatorProtocolSubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = self
.run(ctx)
@@ -129,10 +120,11 @@ where
}
/// Modify the reputation of a peer based on its behavior.
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep)
where
Context: SubsystemContext,
{
async fn modify_reputation(
sender: &mut impl overseer::CollatorProtocolSenderTrait,
peer: PeerId,
rep: Rep,
) {
gum::trace!(
target: LOG_TARGET,
rep = ?rep,
@@ -140,5 +132,5 @@ where
"reputation change for peer",
);
ctx.send_message(NetworkBridgeMessage::ReportPeer(peer, rep)).await;
sender.send_message(NetworkBridgeMessage::ReportPeer(peer, rep)).await;
}
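
For context on the sender bound used above: taking `&mut impl overseer::CollatorProtocolSenderTrait` (or a single `SubsystemSender<RuntimeApiMessage>`) instead of the full context means a helper can only emit message types the subsystem has declared. The self-contained toy below, built from simplified stand-in types rather than the real overseer API, sketches that compile-time guarantee.

// Toy stand-ins for illustration only; not the real overseer types.
struct NetworkBridgeMessage(&'static str);
#[allow(dead_code)]
struct DisputeCoordinatorMessage;

trait SubsystemSender<M> {
    fn send_message(&mut self, msg: M);
}

// A sender that has only been granted the declared message type(s).
struct DeclaredSender;

impl SubsystemSender<NetworkBridgeMessage> for DeclaredSender {
    fn send_message(&mut self, msg: NetworkBridgeMessage) {
        println!("reporting peer {}", msg.0);
    }
}

// Mirrors the shape used in the diff: the helper names exactly the message
// type it needs, nothing more.
fn report_peer(sender: &mut impl SubsystemSender<NetworkBridgeMessage>, peer: &str) {
    sender.send_message(NetworkBridgeMessage(peer));
    // sender.send_message(DisputeCoordinatorMessage); // rejected: not declared
}

fn main() {
    report_peer(&mut DeclaredSender, "some-peer");
}

In the real code the `CollatorProtocolSenderTrait` bound is generated by the overseer macro from the subsystem's declared message list rather than written by hand.
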
@@ -48,9 +48,9 @@ use polkadot_node_subsystem::{
jaeger,
messages::{
CandidateBackingMessage, CollatorProtocolMessage, IfDisconnected, NetworkBridgeEvent,
NetworkBridgeMessage,
NetworkBridgeMessage, RuntimeApiMessage,
},
overseer, FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext, SubsystemSender,
overseer, FromOverseer, OverseerSignal, PerLeafSpan, SubsystemSender,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v2::{CandidateReceipt, CollatorId, Hash, Id as ParaId};
@@ -362,7 +362,7 @@ struct ActiveParas {
impl ActiveParas {
async fn assign_incoming(
&mut self,
sender: &mut impl SubsystemSender,
sender: &mut impl SubsystemSender<RuntimeApiMessage>,
keystore: &SyncCryptoStorePtr,
new_relay_parents: impl IntoIterator<Item = Hash>,
) {
@@ -630,25 +630,19 @@ fn collator_peer_id(
})
}
async fn disconnect_peer<Context>(ctx: &mut Context, peer_id: PeerId)
where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
ctx.send_message(NetworkBridgeMessage::DisconnectPeer(peer_id, PeerSet::Collation))
async fn disconnect_peer(sender: &mut impl overseer::CollatorProtocolSenderTrait, peer_id: PeerId) {
sender
.send_message(NetworkBridgeMessage::DisconnectPeer(peer_id, PeerSet::Collation))
.await
}
/// Another subsystem has requested to fetch collations on a particular leaf for some para.
async fn fetch_collation<Context>(
ctx: &mut Context,
async fn fetch_collation(
sender: &mut impl overseer::CollatorProtocolSenderTrait,
state: &mut State,
pc: PendingCollation,
id: CollatorId,
) where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
) {
let (tx, rx) = oneshot::channel();
let PendingCollation { relay_parent, para_id, peer_id, .. } = pc;
@@ -663,7 +657,7 @@ async fn fetch_collation<Context>(
if let Some(peer_data) = state.peer_data.get(&peer_id) {
if peer_data.has_advertised(&relay_parent) {
request_collation(ctx, state, relay_parent, para_id, peer_id, tx).await;
request_collation(sender, state, relay_parent, para_id, peer_id, tx).await;
} else {
gum::debug!(
target: LOG_TARGET,
@@ -687,51 +681,44 @@ async fn fetch_collation<Context>(
}
/// Report a collator for some malicious actions.
async fn report_collator<Context>(
ctx: &mut Context,
async fn report_collator(
sender: &mut impl overseer::CollatorProtocolSenderTrait,
peer_data: &HashMap<PeerId, PeerData>,
id: CollatorId,
) where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
) {
if let Some(peer_id) = collator_peer_id(peer_data, &id) {
modify_reputation(ctx, peer_id, COST_REPORT_BAD).await;
modify_reputation(sender, peer_id, COST_REPORT_BAD).await;
}
}
/// Some other subsystem has reported a collator as a good one, bump reputation.
async fn note_good_collation<Context>(
ctx: &mut Context,
async fn note_good_collation(
sender: &mut impl overseer::CollatorProtocolSenderTrait,
peer_data: &HashMap<PeerId, PeerData>,
id: CollatorId,
) where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
) {
if let Some(peer_id) = collator_peer_id(peer_data, &id) {
modify_reputation(ctx, peer_id, BENEFIT_NOTIFY_GOOD).await;
modify_reputation(sender, peer_id, BENEFIT_NOTIFY_GOOD).await;
}
}
/// Notify a collator that its collation got seconded.
async fn notify_collation_seconded<Context>(
ctx: &mut Context,
async fn notify_collation_seconded(
sender: &mut impl overseer::CollatorProtocolSenderTrait,
peer_id: PeerId,
relay_parent: Hash,
statement: SignedFullStatement,
) where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
) {
let wire_message =
protocol_v1::CollatorProtocolMessage::CollationSeconded(relay_parent, statement.into());
ctx.send_message(NetworkBridgeMessage::SendCollationMessage(
vec![peer_id],
Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message)),
))
.await;
sender
.send_message(NetworkBridgeMessage::SendCollationMessage(
vec![peer_id],
Versioned::V1(protocol_v1::CollationProtocol::CollatorProtocol(wire_message)),
))
.await;
modify_reputation(ctx, peer_id, BENEFIT_NOTIFY_GOOD).await;
modify_reputation(sender, peer_id, BENEFIT_NOTIFY_GOOD).await;
}
/// A peer's view has changed. A number of things should be done:
@@ -754,17 +741,14 @@ async fn handle_peer_view_change(state: &mut State, peer_id: PeerId, view: View)
/// - Check if the requested collation is in our view.
/// - Update `PerRequest` records with the `result` field if necessary.
/// And as such invocations of this function may rely on that.
async fn request_collation<Context>(
ctx: &mut Context,
async fn request_collation(
sender: &mut impl overseer::CollatorProtocolSenderTrait,
state: &mut State,
relay_parent: Hash,
para_id: ParaId,
peer_id: PeerId,
result: oneshot::Sender<(CandidateReceipt, PoV)>,
) where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
) {
if !state.view.contains(&relay_parent) {
gum::debug!(
target: LOG_TARGET,
@@ -815,29 +799,28 @@ async fn request_collation<Context>(
"Requesting collation",
);
ctx.send_message(NetworkBridgeMessage::SendRequests(
vec![requests],
IfDisconnected::ImmediateError,
))
.await;
sender
.send_message(NetworkBridgeMessage::SendRequests(
vec![requests],
IfDisconnected::ImmediateError,
))
.await;
}
/// Networking message has been received.
#[overseer::contextbounds(CollatorProtocol, prefix = overseer)]
async fn process_incoming_peer_message<Context>(
ctx: &mut Context,
state: &mut State,
origin: PeerId,
msg: protocol_v1::CollatorProtocolMessage,
) where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
) {
use protocol_v1::CollatorProtocolMessage::*;
use sp_runtime::traits::AppVerify;
match msg {
Declare(collator_id, para_id, signature) => {
if collator_peer_id(&state.peer_data, &collator_id).is_some() {
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(ctx.sender(), origin, COST_UNEXPECTED_MESSAGE).await;
return
}
@@ -850,7 +833,7 @@ async fn process_incoming_peer_message<Context>(
?para_id,
"Unknown peer",
);
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(ctx.sender(), origin, COST_UNEXPECTED_MESSAGE).await;
return
},
};
@@ -862,7 +845,7 @@ async fn process_incoming_peer_message<Context>(
?para_id,
"Peer is not in the collating state",
);
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(ctx.sender(), origin, COST_UNEXPECTED_MESSAGE).await;
return
}
@@ -873,7 +856,7 @@ async fn process_incoming_peer_message<Context>(
?para_id,
"Signature verification failure",
);
modify_reputation(ctx, origin, COST_INVALID_SIGNATURE).await;
modify_reputation(ctx.sender(), origin, COST_INVALID_SIGNATURE).await;
return
}
@@ -896,9 +879,9 @@ async fn process_incoming_peer_message<Context>(
"Declared as collator for unneeded para",
);
modify_reputation(ctx, origin.clone(), COST_UNNEEDED_COLLATOR).await;
modify_reputation(ctx.sender(), origin.clone(), COST_UNNEEDED_COLLATOR).await;
gum::trace!(target: LOG_TARGET, "Disconnecting unneeded collator");
disconnect_peer(ctx, origin).await;
disconnect_peer(ctx.sender(), origin).await;
}
},
AdvertiseCollation(relay_parent) => {
@@ -914,7 +897,7 @@ async fn process_incoming_peer_message<Context>(
"Advertise collation out of view",
);
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(ctx.sender(), origin, COST_UNEXPECTED_MESSAGE).await;
return
}
@@ -926,7 +909,7 @@ async fn process_incoming_peer_message<Context>(
?relay_parent,
"Advertise collation message has been received from an unknown peer",
);
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(ctx.sender(), origin, COST_UNEXPECTED_MESSAGE).await;
return
},
Some(p) => p,
@@ -962,7 +945,8 @@ async fn process_incoming_peer_message<Context>(
collations.status = CollationStatus::Fetching;
collations.waiting_collation = Some(id.clone());
fetch_collation(ctx, state, pending_collation.clone(), id).await;
fetch_collation(ctx.sender(), state, pending_collation.clone(), id)
.await;
},
CollationStatus::Seconded => {
gum::trace!(
@@ -984,7 +968,7 @@ async fn process_incoming_peer_message<Context>(
"Invalid advertisement",
);
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
modify_reputation(ctx.sender(), origin, COST_UNEXPECTED_MESSAGE).await;
},
}
},
@@ -1011,16 +995,13 @@ async fn remove_relay_parent(state: &mut State, relay_parent: Hash) -> Result<()
}
/// Our view has changed.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn handle_our_view_change<Context>(
ctx: &mut Context,
state: &mut State,
keystore: &SyncCryptoStorePtr,
view: OurView,
) -> Result<()>
where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
) -> Result<()> {
let old_view = std::mem::replace(&mut state.view, view);
let added: HashMap<Hash, Arc<jaeger::Span>> = state
@@ -1061,7 +1042,7 @@ where
?para_id,
"Disconnecting peer on view change (not current parachain id)"
);
disconnect_peer(ctx, peer_id.clone()).await;
disconnect_peer(ctx.sender(), peer_id.clone()).await;
}
}
}
@@ -1070,16 +1051,13 @@ where
}
/// Bridge event switch.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn handle_network_msg<Context>(
ctx: &mut Context,
state: &mut State,
keystore: &SyncCryptoStorePtr,
bridge_message: NetworkBridgeEvent<net_protocol::CollatorProtocolMessage>,
) -> Result<()>
where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
) -> Result<()> {
use NetworkBridgeEvent::*;
match bridge_message {
@@ -1109,15 +1087,13 @@ where
}
/// The main message receiver switch.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn process_msg<Context>(
ctx: &mut Context,
keystore: &SyncCryptoStorePtr,
msg: CollatorProtocolMessage,
state: &mut State,
) where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
) {
use CollatorProtocolMessage::*;
let _timer = state.metrics.time_process_msg();
@@ -1137,7 +1113,7 @@ async fn process_msg<Context>(
);
},
ReportCollator(id) => {
report_collator(ctx, &state.peer_data, id).await;
report_collator(ctx.sender(), &state.peer_data, id).await;
},
NetworkBridgeUpdate(event) => {
if let Err(e) = handle_network_msg(ctx, state, keystore, event).await {
@@ -1152,8 +1128,8 @@ async fn process_msg<Context>(
if let Some(collation_event) = state.pending_candidates.remove(&parent) {
let (collator_id, pending_collation) = collation_event;
let PendingCollation { relay_parent, peer_id, .. } = pending_collation;
note_good_collation(ctx, &state.peer_data, collator_id).await;
notify_collation_seconded(ctx, peer_id, relay_parent, stmt).await;
note_good_collation(ctx.sender(), &state.peer_data, collator_id).await;
notify_collation_seconded(ctx.sender(), peer_id, relay_parent, stmt).await;
if let Some(collations) = state.collations_per_relay_parent.get_mut(&parent) {
collations.status = CollationStatus::Seconded;
@@ -1184,7 +1160,7 @@ async fn process_msg<Context>(
Entry::Vacant(_) => return,
};
report_collator(ctx, &state.peer_data, id.clone()).await;
report_collator(ctx.sender(), &state.peer_data, id.clone()).await;
dequeue_next_collation_and_fetch(ctx, state, parent, id).await;
},
@@ -1211,16 +1187,13 @@ fn infinite_stream(every: Duration) -> impl FusedStream<Item = ()> {
}
/// The main run loop.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
pub(crate) async fn run<Context>(
mut ctx: Context,
keystore: SyncCryptoStorePtr,
eviction_policy: crate::CollatorEvictionPolicy,
metrics: Metrics,
) -> std::result::Result<(), crate::error::FatalError>
where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
) -> std::result::Result<(), crate::error::FatalError> {
let mut state = State { metrics, ..Default::default() };
let next_inactivity_stream = infinite_stream(ACTIVITY_POLL);
@@ -1247,7 +1220,7 @@ where
}
}
_ = next_inactivity_stream.next() => {
disconnect_inactive_peers(&mut ctx, &eviction_policy, &state.peer_data).await;
disconnect_inactive_peers(ctx.sender(), &eviction_policy, &state.peer_data).await;
}
res = state.collation_fetches.select_next_some() => {
handle_collation_fetched_result(&mut ctx, &mut state, res).await;
@@ -1270,7 +1243,7 @@ where
).await;
for (peer_id, rep) in reputation_changes {
modify_reputation(&mut ctx, peer_id, rep).await;
modify_reputation(ctx.sender(), peer_id, rep).await;
}
},
}
@@ -1304,9 +1277,9 @@ async fn poll_requests(
}
/// Dequeue another collation and fetch.
async fn dequeue_next_collation_and_fetch(
ctx: &mut (impl SubsystemContext<Message = CollatorProtocolMessage>
+ overseer::SubsystemContext<Message = CollatorProtocolMessage>),
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn dequeue_next_collation_and_fetch<Context>(
ctx: &mut Context,
state: &mut State,
relay_parent: Hash,
// The collator we tried to fetch from last.
@@ -1323,7 +1296,7 @@ async fn dequeue_next_collation_and_fetch(
?id,
"Successfully dequeued next advertisement - fetching ..."
);
fetch_collation(ctx, state, next, id).await;
fetch_collation(ctx.sender(), state, next, id).await;
} else {
gum::debug!(
target: LOG_TARGET,
@@ -1335,14 +1308,12 @@ async fn dequeue_next_collation_and_fetch(
}
/// Handle a fetched collation result.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn handle_collation_fetched_result<Context>(
ctx: &mut Context,
state: &mut State,
(mut collation_event, res): PendingCollationFetch,
) where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
) {
// If no prior collation for this relay parent has been seconded, then
// memorize the `collation_event` for that `relay_parent`, such that we may
// notify the collator of their successful second backing
@@ -1380,12 +1351,13 @@ async fn handle_collation_fetched_result<Context>(
if let Entry::Vacant(entry) = state.pending_candidates.entry(relay_parent) {
collation_event.1.commitments_hash = Some(candidate_receipt.commitments_hash);
ctx.send_message(CandidateBackingMessage::Second(
relay_parent.clone(),
candidate_receipt,
pov,
))
.await;
ctx.sender()
.send_message(CandidateBackingMessage::Second(
relay_parent.clone(),
candidate_receipt,
pov,
))
.await;
entry.insert(collation_event);
} else {
@@ -1401,18 +1373,15 @@ async fn handle_collation_fetched_result<Context>(
// This issues `NetworkBridge` notifications to disconnect from all inactive peers at the
// earliest possible point. This does not yet clean up any metadata, as that will be done upon
// receipt of the `PeerDisconnected` event.
async fn disconnect_inactive_peers<Context>(
ctx: &mut Context,
async fn disconnect_inactive_peers(
sender: &mut impl overseer::CollatorProtocolSenderTrait,
eviction_policy: &crate::CollatorEvictionPolicy,
peers: &HashMap<PeerId, PeerData>,
) where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
) {
for (peer, peer_data) in peers {
if peer_data.is_inactive(&eviction_policy) {
gum::trace!(target: LOG_TARGET, "Disconnecting inactive peer");
disconnect_peer(ctx, peer.clone()).await;
disconnect_peer(sender, peer.clone()).await;
}
}
}