Improve dispute-coordinator message burstiness handling (#5471)

* Increase message channel size to 2048

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Use unbounded channel for reading data

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
This commit is contained in:
Andrei Sandu
2022-05-09 13:00:05 +03:00
committed by GitHub
parent 992705d127
commit e29d8e91da
3 changed files with 14 additions and 11 deletions
+6 -4
View File
@@ -599,7 +599,8 @@ async fn request_disputes(
RequestType::Recent => DisputeCoordinatorMessage::RecentDisputes(tx), RequestType::Recent => DisputeCoordinatorMessage::RecentDisputes(tx),
RequestType::Active => DisputeCoordinatorMessage::ActiveDisputes(tx), RequestType::Active => DisputeCoordinatorMessage::ActiveDisputes(tx),
}; };
sender.send_message(msg.into()).await; // Bounded by block production - `ProvisionerMessage::RequestInherentData`.
sender.send_unbounded_message(msg.into());
let recent_disputes = match rx.await { let recent_disputes = match rx.await {
Ok(r) => r, Ok(r) => r,
@@ -617,9 +618,10 @@ async fn request_votes(
disputes_to_query: Vec<(SessionIndex, CandidateHash)>, disputes_to_query: Vec<(SessionIndex, CandidateHash)>,
) -> Vec<(SessionIndex, CandidateHash, CandidateVotes)> { ) -> Vec<(SessionIndex, CandidateHash, CandidateVotes)> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
sender // Bounded by block production - `ProvisionerMessage::RequestInherentData`.
.send_message(DisputeCoordinatorMessage::QueryCandidateVotes(disputes_to_query, tx).into()) sender.send_unbounded_message(
.await; DisputeCoordinatorMessage::QueryCandidateVotes(disputes_to_query, tx).into(),
);
match rx.await { match rx.await {
Ok(v) => v, Ok(v) => v,
@@ -340,10 +340,10 @@ async fn get_active_disputes<Context: SubsystemContext>(
ctx: &mut Context, ctx: &mut Context,
) -> JfyiErrorResult<Vec<(SessionIndex, CandidateHash)>> { ) -> JfyiErrorResult<Vec<(SessionIndex, CandidateHash)>> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ActiveDisputes( // Caller scope is in `update_leaves` and this is bounded by fork count.
tx, ctx.send_unbounded_message(AllMessages::DisputeCoordinator(
))) DisputeCoordinatorMessage::ActiveDisputes(tx),
.await; ));
rx.await.map_err(|_| JfyiError::AskActiveDisputesCanceled) rx.await.map_err(|_| JfyiError::AskActiveDisputesCanceled)
} }
@@ -354,10 +354,10 @@ async fn get_candidate_votes<Context: SubsystemContext>(
candidate_hash: CandidateHash, candidate_hash: CandidateHash,
) -> JfyiErrorResult<Option<CandidateVotes>> { ) -> JfyiErrorResult<Option<CandidateVotes>> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::DisputeCoordinator( // Caller scope is in `update_leaves` and this is bounded by fork count.
ctx.send_unbounded_message(AllMessages::DisputeCoordinator(
DisputeCoordinatorMessage::QueryCandidateVotes(vec![(session_index, candidate_hash)], tx), DisputeCoordinatorMessage::QueryCandidateVotes(vec![(session_index, candidate_hash)], tx),
)) ));
.await;
rx.await rx.await
.map(|v| v.get(0).map(|inner| inner.to_owned().2)) .map(|v| v.get(0).map(|inner| inner.to_owned().2))
.map_err(|_| JfyiError::AskCandidateVotesCanceled) .map_err(|_| JfyiError::AskCandidateVotesCanceled)
+1
View File
@@ -415,6 +415,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut hand
signal=OverseerSignal, signal=OverseerSignal,
error=SubsystemError, error=SubsystemError,
network=NetworkBridgeEvent<VersionedValidationProtocol>, network=NetworkBridgeEvent<VersionedValidationProtocol>,
message_capacity=2048,
)] )]
pub struct Overseer<SupportsParachains> { pub struct Overseer<SupportsParachains> {
#[subsystem(no_dispatch, CandidateValidationMessage)] #[subsystem(no_dispatch, CandidateValidationMessage)]