[companion] Fix request-response protocols backpressure mechanism (#7276)

* Replace request-response incoming requests queue with `async-channel`

* Fix unused imports

* Fix channel type in tests

* Fix channel type in tests (part 2)

* Fix unused imports

* update lockfile for {"substrate"}

---------

Co-authored-by: parity-processbot <>
This commit is contained in:
Dmitry Markin
2023-05-24 13:34:06 +03:00
committed by GitHub
parent d219260cac
commit 86564f1d66
7 changed files with 352 additions and 344 deletions
+338 -328
View File
File diff suppressed because it is too large Load Diff
@@ -19,7 +19,7 @@ use super::*;
use std::{collections::HashSet, sync::Arc, time::Duration};
use assert_matches::assert_matches;
use futures::{executor, future, Future, SinkExt};
use futures::{executor, future, Future};
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
@@ -25,6 +25,7 @@ lru = "0.9.0"
indexmap = "1.9.1"
[dev-dependencies]
async-channel = "1.8.0"
async-trait = "0.1.57"
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -26,9 +26,9 @@ use std::{
use assert_matches::assert_matches;
use futures::{
channel::{mpsc, oneshot},
channel::oneshot,
future::{poll_fn, ready},
pin_mut, Future, SinkExt,
pin_mut, Future,
};
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
@@ -590,14 +590,14 @@ fn dispute_retries_and_works_across_session_boundaries() {
}
async fn send_network_dispute_request(
req_tx: &mut mpsc::Sender<sc_network::config::IncomingRequest>,
req_tx: &mut async_channel::Sender<sc_network::config::IncomingRequest>,
peer: PeerId,
message: DisputeRequest,
) -> oneshot::Receiver<sc_network::config::OutgoingResponse> {
let (pending_response, rx_response) = oneshot::channel();
let req =
sc_network::config::IncomingRequest { peer, payload: message.encode(), pending_response };
req_tx.feed(req).await.unwrap();
req_tx.send(req).await.unwrap();
rx_response
}
@@ -606,7 +606,7 @@ async fn send_network_dispute_request(
/// Passed in function will be called while votes are still being imported.
async fn nested_network_dispute_request<'a, F, O>(
handle: &'a mut TestSubsystemContextHandle<DisputeDistributionMessage>,
req_tx: &'a mut mpsc::Sender<sc_network::config::IncomingRequest>,
req_tx: &'a mut async_channel::Sender<sc_network::config::IncomingRequest>,
peer: PeerId,
message: DisputeRequest,
import_result: ImportStatementsResult,
@@ -615,7 +615,7 @@ async fn nested_network_dispute_request<'a, F, O>(
) where
F: FnOnce(
&'a mut TestSubsystemContextHandle<DisputeDistributionMessage>,
&'a mut mpsc::Sender<sc_network::config::IncomingRequest>,
&'a mut async_channel::Sender<sc_network::config::IncomingRequest>,
DisputeRequest,
) -> O
+ 'a,
@@ -6,6 +6,7 @@ edition.workspace = true
description = "Primitives types for the Node-side"
[dependencies]
async-channel = "1.8.0"
async-trait = "0.1.57"
hex = "0.4.3"
polkadot-primitives = { path = "../../../primitives" }
@@ -16,10 +16,7 @@
use std::marker::PhantomData;
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use futures::{channel::oneshot, StreamExt};
use parity_scale_codec::{Decode, Encode};
@@ -208,7 +205,7 @@ pub struct OutgoingResponse<Response> {
///
/// Takes care of decoding and handling of invalid encoded requests.
pub struct IncomingRequestReceiver<Req> {
raw: mpsc::Receiver<netconfig::IncomingRequest>,
raw: async_channel::Receiver<netconfig::IncomingRequest>,
phantom: PhantomData<Req>,
}
@@ -34,7 +34,6 @@
use std::{collections::HashMap, time::Duration, u64};
use futures::channel::mpsc;
use polkadot_primitives::{MAX_CODE_SIZE, MAX_POV_SIZE};
use strum::{EnumIter, IntoEnumIterator};
@@ -144,8 +143,8 @@ impl Protocol {
pub fn get_config(
self,
req_protocol_names: &ReqProtocolNames,
) -> (mpsc::Receiver<network::IncomingRequest>, RequestResponseConfig) {
let (tx, rx) = mpsc::channel(self.get_channel_size());
) -> (async_channel::Receiver<network::IncomingRequest>, RequestResponseConfig) {
let (tx, rx) = async_channel::bounded(self.get_channel_size());
let cfg = self.create_config(req_protocol_names, Some(tx));
(rx, cfg)
}
@@ -153,7 +152,7 @@ impl Protocol {
fn create_config(
self,
req_protocol_names: &ReqProtocolNames,
tx: Option<mpsc::Sender<network::IncomingRequest>>,
tx: Option<async_channel::Sender<network::IncomingRequest>>,
) -> RequestResponseConfig {
let name = req_protocol_names.get_name(self);
let fallback_names = self.get_fallback_names();