mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 08:47:57 +00:00
Only fetch one collation at a time per relay parent (#3333)
* Only fetch one collation at a time per relay parent Before a validator would fetch all collations that were advertised to him. This pr changes the behavior to always just fetch one collation at a time. If fetching fails, the validator will start fetching one of the other collations. * Use enum to be more explicit * Review comments
This commit is contained in:
@@ -164,6 +164,7 @@ struct PerRequest {
|
||||
span: Option<jaeger::Span>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct CollatingPeerState {
|
||||
collator_id: CollatorId,
|
||||
para_id: ParaId,
|
||||
@@ -172,6 +173,7 @@ struct CollatingPeerState {
|
||||
last_active: Instant,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum PeerState {
|
||||
// The peer has connected at the given instant.
|
||||
Connected(Instant),
|
||||
@@ -186,6 +188,7 @@ enum AdvertisementError {
|
||||
UndeclaredCollator,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PeerData {
|
||||
view: View,
|
||||
state: PeerState,
|
||||
@@ -465,8 +468,7 @@ struct PendingCollation {
|
||||
|
||||
impl PendingCollation {
|
||||
fn new(relay_parent: Hash, para_id: &ParaId, peer_id: &PeerId) -> Self {
|
||||
let commitments_hash = None;
|
||||
Self { relay_parent, para_id: para_id.clone(), peer_id: peer_id.clone(), commitments_hash }
|
||||
Self { relay_parent, para_id: para_id.clone(), peer_id: peer_id.clone(), commitments_hash: None }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -477,6 +479,32 @@ type PendingCollationFetch = (
|
||||
std::result::Result<(CandidateReceipt, PoV), oneshot::Canceled>,
|
||||
);
|
||||
|
||||
/// The status of the collations in [`CollationsPerRelayParent`].
|
||||
#[derive(Debug)]
|
||||
enum CollationStatus {
|
||||
/// We are waiting for a collation to be advertised to us.
|
||||
Waiting,
|
||||
/// We are currently fetching a collation.
|
||||
Fetching,
|
||||
/// We have seconded a collation.
|
||||
Seconded,
|
||||
}
|
||||
|
||||
impl Default for CollationStatus {
|
||||
fn default() -> Self {
|
||||
Self::Waiting
|
||||
}
|
||||
}
|
||||
|
||||
/// Information about collations per relay parent.
|
||||
#[derive(Default)]
|
||||
struct CollationsPerRelayParent {
|
||||
/// What is the current status in regards to a collation for this relay parent?
|
||||
status: CollationStatus,
|
||||
/// Collation that were advertised to us, but we did not yet fetch.
|
||||
unfetched_collations: Vec<(PendingCollation, CollatorId)>,
|
||||
}
|
||||
|
||||
/// All state relevant for the validator side of the protocol lives here.
|
||||
#[derive(Default)]
|
||||
struct State {
|
||||
@@ -503,7 +531,10 @@ struct State {
|
||||
span_per_relay_parent: HashMap<Hash, PerLeafSpan>,
|
||||
|
||||
/// Keep track of all fetch collation requests
|
||||
collations: FuturesUnordered<BoxFuture<'static, PendingCollationFetch>>,
|
||||
collation_fetches: FuturesUnordered<BoxFuture<'static, PendingCollationFetch>>,
|
||||
|
||||
/// Information about the collations per relay parent.
|
||||
collations_per_relay_parent: HashMap<Hash, CollationsPerRelayParent>,
|
||||
|
||||
/// Keep track of all pending candidate collations
|
||||
pending_candidates: HashMap<Hash, CollationEvent>,
|
||||
@@ -528,19 +559,20 @@ async fn disconnect_peer(ctx: &mut impl SubsystemContext, peer_id: PeerId) {
|
||||
}
|
||||
|
||||
/// Another subsystem has requested to fetch collations on a particular leaf for some para.
|
||||
async fn fetch_collation<Context>(
|
||||
ctx: &mut Context,
|
||||
async fn fetch_collation(
|
||||
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
|
||||
state: &mut State,
|
||||
pc: PendingCollation,
|
||||
tx: oneshot::Sender<(CandidateReceipt, PoV)>
|
||||
)
|
||||
where
|
||||
Context: SubsystemContext<Message = CollatorProtocolMessage>
|
||||
{
|
||||
id: CollatorId,
|
||||
) {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let PendingCollation { relay_parent, para_id, peer_id, .. } = pc;
|
||||
if state.peer_data.get(&peer_id).map_or(false, |d| d.has_advertised(&relay_parent)) {
|
||||
request_collation(ctx, state, relay_parent, para_id, peer_id, tx).await;
|
||||
}
|
||||
|
||||
state.collation_fetches.push(rx.map(|r| ((id, pc), r)).boxed());
|
||||
}
|
||||
|
||||
/// Report a collator for some malicious actions.
|
||||
@@ -770,22 +802,26 @@ where
|
||||
?relay_parent,
|
||||
"Received advertise collation",
|
||||
);
|
||||
let (tx, rx) = oneshot::channel::<(
|
||||
CandidateReceipt,
|
||||
PoV,
|
||||
)>();
|
||||
|
||||
let pending_collation = PendingCollation::new(
|
||||
relay_parent,
|
||||
¶_id,
|
||||
&origin,
|
||||
);
|
||||
fetch_collation(ctx, state, pending_collation.clone(), tx).await;
|
||||
|
||||
let future = rx.map(|r|
|
||||
((id, pending_collation), r)
|
||||
);
|
||||
state.collations.push(Box::pin(future));
|
||||
let collations = state.collations_per_relay_parent.entry(relay_parent).or_default();
|
||||
|
||||
match collations.status {
|
||||
CollationStatus::Fetching =>
|
||||
collations.unfetched_collations.push((pending_collation, id)),
|
||||
CollationStatus::Waiting => {
|
||||
collations.status = CollationStatus::Fetching;
|
||||
drop(collations);
|
||||
|
||||
fetch_collation(ctx, state, pending_collation.clone(), id).await;
|
||||
},
|
||||
CollationStatus::Seconded => {},
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
tracing::debug!(
|
||||
@@ -824,6 +860,8 @@ async fn remove_relay_parent(
|
||||
state.pending_candidates.retain(|k, _| {
|
||||
k != &relay_parent
|
||||
});
|
||||
|
||||
state.collations_per_relay_parent.remove(&relay_parent);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -973,6 +1011,10 @@ where
|
||||
let PendingCollation { relay_parent, peer_id, .. } = pending_collation;
|
||||
note_good_collation(ctx, &state.peer_data, collator_id).await;
|
||||
notify_collation_seconded(ctx, peer_id, relay_parent, stmt).await;
|
||||
|
||||
if let Some(collations) = state.collations_per_relay_parent.get_mut(&parent) {
|
||||
collations.status = CollationStatus::Seconded;
|
||||
}
|
||||
} else {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
@@ -982,12 +1024,11 @@ where
|
||||
}
|
||||
}
|
||||
Invalid(parent, candidate_receipt) => {
|
||||
if match state.pending_candidates.get(&parent) {
|
||||
Some(collation_event)
|
||||
if Some(candidate_receipt.commitments_hash) == collation_event.1.commitments_hash
|
||||
=> true,
|
||||
_ => false,
|
||||
} {
|
||||
if state.pending_candidates
|
||||
.get(&parent)
|
||||
.map(|e| e.1.commitments_hash == Some(candidate_receipt.commitments_hash))
|
||||
.unwrap_or_default()
|
||||
{
|
||||
if let Some((id, _)) = state.pending_candidates.remove(&parent) {
|
||||
report_collator(ctx, &state.peer_data, id).await;
|
||||
}
|
||||
@@ -1022,7 +1063,6 @@ pub(crate) async fn run<Context>(
|
||||
|
||||
let mut state = State {
|
||||
metrics,
|
||||
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -1053,53 +1093,9 @@ pub(crate) async fn run<Context>(
|
||||
_ = next_inactivity_stream.next() => {
|
||||
disconnect_inactive_peers(&mut ctx, &eviction_policy, &state.peer_data).await;
|
||||
}
|
||||
res = state.collations.next() => {
|
||||
// If no prior collation for this relay parent has been seconded, then
|
||||
// memoize the collation_event for that relay_parent, such that we may
|
||||
// notify the collator of their successful second backing
|
||||
if let Some((relay_parent, collation_event)) = match res {
|
||||
Some(
|
||||
(mut collation_event, Ok((candidate_receipt, pov)))
|
||||
) => {
|
||||
let relay_parent = &collation_event.1.relay_parent;
|
||||
// Verify whether this relay_parent has already been seconded
|
||||
if state.pending_candidates.get(relay_parent).is_none() {
|
||||
// Forward Candidate Receipt and PoV to candidate backing [CB]
|
||||
collation_event.1
|
||||
.commitments_hash = Some(candidate_receipt.commitments_hash);
|
||||
ctx.send_message(
|
||||
CandidateBackingMessage::Second(
|
||||
relay_parent.clone(),
|
||||
candidate_receipt,
|
||||
pov,
|
||||
).into()
|
||||
).await;
|
||||
Some((relay_parent.clone(), collation_event))
|
||||
} else {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = ?relay_parent,
|
||||
collator_id = ?collation_event.0,
|
||||
"Collation for this relay parent has already been seconded.",
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
Some(
|
||||
(collation_event, _)
|
||||
) => {
|
||||
let (id, pending_collation) = collation_event;
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = ?pending_collation.relay_parent,
|
||||
collator_id = ?id,
|
||||
"Collation fetching has timed out.",
|
||||
);
|
||||
None
|
||||
}
|
||||
_ => None,
|
||||
} {
|
||||
state.pending_candidates.insert(relay_parent, collation_event);
|
||||
res = state.collation_fetches.next() => {
|
||||
if let Some(res) = res {
|
||||
handle_collation_fetched_result(&mut ctx, &mut state, res).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1119,6 +1115,68 @@ pub(crate) async fn run<Context>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle a fetched collation result.
|
||||
async fn handle_collation_fetched_result(
|
||||
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
|
||||
state: &mut State,
|
||||
(mut collation_event, res): PendingCollationFetch,
|
||||
) {
|
||||
// If no prior collation for this relay parent has been seconded, then
|
||||
// memoize the collation_event for that relay_parent, such that we may
|
||||
// notify the collator of their successful second backing
|
||||
let relay_parent = collation_event.1.relay_parent;
|
||||
|
||||
let (candidate_receipt, pov) = match res {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
tracing::debug!(
|
||||
target: LOG_TARGET,
|
||||
relay_parent = ?collation_event.1.relay_parent,
|
||||
para_id = ?collation_event.1.para_id,
|
||||
peer_id = ?collation_event.1.peer_id,
|
||||
collator_id = ?collation_event.0,
|
||||
error = ?e,
|
||||
"Failed to fetch collation.",
|
||||
);
|
||||
|
||||
let (next_try, id) = if let Some(collations) = state.collations_per_relay_parent.get_mut(&relay_parent) {
|
||||
if let Some(next_try) = collations.unfetched_collations.pop() {
|
||||
next_try
|
||||
} else if matches!(collations.status, CollationStatus::Fetching) {
|
||||
collations.status = CollationStatus::Waiting;
|
||||
return
|
||||
} else {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
status = ?collations.status,
|
||||
"Expected status `CollationStatus::Fetching` but got unexpected status."
|
||||
);
|
||||
return
|
||||
}
|
||||
} else {
|
||||
return
|
||||
};
|
||||
|
||||
fetch_collation(ctx, state, next_try, id).await;
|
||||
|
||||
return
|
||||
},
|
||||
};
|
||||
|
||||
if let Entry::Vacant(entry) = state.pending_candidates.entry(relay_parent) {
|
||||
collation_event.1.commitments_hash = Some(candidate_receipt.commitments_hash);
|
||||
ctx.send_message(
|
||||
CandidateBackingMessage::Second(
|
||||
relay_parent.clone(),
|
||||
candidate_receipt,
|
||||
pov,
|
||||
).into()
|
||||
).await;
|
||||
|
||||
entry.insert(collation_event);
|
||||
}
|
||||
}
|
||||
|
||||
// This issues `NetworkBridge` notifications to disconnect from all inactive peers at the
|
||||
// earliest possible point. This does not yet clean up any metadata, as that will be done upon
|
||||
// receipt of the `PeerDisconnected` event.
|
||||
@@ -1145,7 +1203,7 @@ async fn poll_collation_response<Context>(
|
||||
metrics: &Metrics,
|
||||
spans: &HashMap<Hash, PerLeafSpan>,
|
||||
pending_collation: &PendingCollation,
|
||||
per_req: &mut PerRequest
|
||||
per_req: &mut PerRequest,
|
||||
)
|
||||
-> bool
|
||||
where
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user