Replace request-response incoming requests queue with async-channel (#14199)

This commit is contained in:
Dmitry Markin
2023-05-24 12:24:09 +03:00
committed by GitHub
parent 4766ec5531
commit db90f3b622
13 changed files with 53 additions and 53 deletions
@@ -23,10 +23,7 @@ use crate::{
};
use codec::{Decode, Encode};
use futures::{
channel::{mpsc, oneshot},
stream::StreamExt,
};
use futures::{channel::oneshot, stream::StreamExt};
use libp2p::PeerId;
use log::debug;
use lru::LruCache;
@@ -136,7 +133,7 @@ enum SeenRequestsValue {
/// Handler for incoming block requests from a remote peer.
pub struct BlockRequestHandler<B: BlockT, Client> {
client: Arc<Client>,
request_receiver: mpsc::Receiver<IncomingRequest>,
request_receiver: async_channel::Receiver<IncomingRequest>,
/// Maps from request to number of times we have seen this request.
///
/// This is used to check if a peer is spamming us with the same request.
@@ -157,7 +154,7 @@ where
) -> (Self, ProtocolConfig) {
// Reserve enough request slots for one request per peer when we are at the maximum
// number of peers.
let (tx, request_receiver) = mpsc::channel(num_peer_hint);
let (tx, request_receiver) = async_channel::bounded(num_peer_hint);
let mut protocol_config = generate_protocol_config(
protocol_id,
@@ -20,10 +20,7 @@
use crate::schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse};
use codec::{Decode, Encode};
use futures::{
channel::{mpsc, oneshot},
stream::StreamExt,
};
use futures::{channel::oneshot, stream::StreamExt};
use libp2p::PeerId;
use log::{debug, trace};
use lru::LruCache;
@@ -114,7 +111,7 @@ enum SeenRequestsValue {
/// Handler for incoming state requests from a remote peer.
pub struct StateRequestHandler<B: BlockT, Client> {
client: Arc<Client>,
request_receiver: mpsc::Receiver<IncomingRequest>,
request_receiver: async_channel::Receiver<IncomingRequest>,
/// Maps from request to number of times we have seen this request.
///
/// This is used to check if a peer is spamming us with the same request.
@@ -135,7 +132,7 @@ where
) -> (Self, ProtocolConfig) {
// Reserve enough request slots for one request per peer when we are at the maximum
// number of peers.
let (tx, request_receiver) = mpsc::channel(num_peer_hint);
let (tx, request_receiver) = async_channel::bounded(num_peer_hint);
let mut protocol_config = generate_protocol_config(
protocol_id,
@@ -17,10 +17,7 @@
//! Helper for handling (i.e. answering) grandpa warp sync requests from a remote peer.
use codec::Decode;
use futures::{
channel::{mpsc, oneshot},
stream::StreamExt,
};
use futures::{channel::oneshot, stream::StreamExt};
use log::debug;
use sc_network::{
@@ -36,6 +33,9 @@ use std::{sync::Arc, time::Duration};
const MAX_RESPONSE_SIZE: u64 = 16 * 1024 * 1024;
/// Incoming warp requests bounded queue size.
const MAX_WARP_REQUEST_QUEUE: usize = 20;
/// Generates a [`RequestResponseConfig`] for the grandpa warp sync request protocol, refusing
/// incoming requests.
pub fn generate_request_response_config<Hash: AsRef<[u8]>>(
@@ -72,7 +72,7 @@ fn generate_legacy_protocol_name(protocol_id: ProtocolId) -> String {
/// Handler for incoming grandpa warp sync requests from a remote peer.
pub struct RequestHandler<TBlock: BlockT> {
backend: Arc<dyn WarpSyncProvider<TBlock>>,
request_receiver: mpsc::Receiver<IncomingRequest>,
request_receiver: async_channel::Receiver<IncomingRequest>,
}
impl<TBlock: BlockT> RequestHandler<TBlock> {
@@ -83,7 +83,7 @@ impl<TBlock: BlockT> RequestHandler<TBlock> {
fork_id: Option<&str>,
backend: Arc<dyn WarpSyncProvider<TBlock>>,
) -> (Self, RequestResponseConfig) {
let (tx, request_receiver) = mpsc::channel(20);
let (tx, request_receiver) = async_channel::bounded(MAX_WARP_REQUEST_QUEUE);
let mut request_response_config =
generate_request_response_config(protocol_id, genesis_hash, fork_id);