add fallback request for req-response protocols (#2771)

Previously, it was only possible to retry the same request on a
different protocol name that had the exact same binary payloads.

Introduce a way of trying a different request on a different protocol if
the first one fails with `OutboundFailure::UnsupportedProtocols`.

This helps with adding new req-response versions in polkadot while
preserving compatibility with unupgraded nodes.

Previously, req-response protocols were bumped by bundling them with an
upgrade of some other notifications protocol, as was done for async
backing. That approach is more complicated, especially if the feature does
not require any changes to a notifications protocol. This change will be
needed for implementing https://github.com/polkadot-fellows/RFCs/pull/47

TODO:
- [x] add tests
- [x] add guidance docs in polkadot about req-response protocol versioning
This commit is contained in:
Alin Dima
2024-01-10 15:19:50 +02:00
committed by GitHub
parent af2e30e383
commit f2a750ee86
29 changed files with 802 additions and 304 deletions
+11 -4
View File
@@ -231,13 +231,20 @@ impl<B: BlockT> Behaviour<B> {
pub fn send_request(
&mut self,
target: &PeerId,
protocol: &str,
protocol: ProtocolName,
request: Vec<u8>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
connect: IfDisconnected,
) {
self.request_responses
.send_request(target, protocol, request, pending_response, connect)
self.request_responses.send_request(
target,
protocol,
request,
fallback_request,
pending_response,
connect,
)
}
/// Returns a shared reference to the user protocol.
+369 -45
View File
@@ -56,6 +56,7 @@ use libp2p::{
use std::{
collections::{hash_map::Entry, HashMap},
io, iter,
ops::Deref,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
@@ -172,6 +173,13 @@ pub struct OutgoingResponse {
pub sent_feedback: Option<oneshot::Sender<()>>,
}
/// Information stored about a pending request.
///
/// One value is kept per outbound request that has been passed down to a request-response
/// behaviour and is still awaiting a reply.
struct PendingRequest {
/// Instant at which the request was handed to the behaviour.
started_at: Instant,
/// Channel on which the outcome is reported to the caller. On success it carries the
/// response bytes together with the name of the protocol that produced the response.
response_tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
/// Optional `(request, protocol)` pair to send instead if this request fails with
/// `OutboundFailure::UnsupportedProtocols`.
fallback_request: Option<(Vec<u8>, ProtocolName)>,
}
/// When sending a request, what to do on a disconnected recipient.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
@@ -264,8 +272,7 @@ pub struct RequestResponsesBehaviour {
>,
/// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply.
pending_requests:
HashMap<ProtocolRequestId, (Instant, oneshot::Sender<Result<Vec<u8>, RequestFailure>>)>,
pending_requests: HashMap<ProtocolRequestId, PendingRequest>,
/// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
/// start time and the response to send back to the remote.
@@ -348,29 +355,25 @@ impl RequestResponsesBehaviour {
pub fn send_request(
&mut self,
target: &PeerId,
protocol_name: &str,
protocol_name: ProtocolName,
request: Vec<u8>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
connect: IfDisconnected,
) {
log::trace!(target: "sub-libp2p", "send request to {target} ({protocol_name:?}), {} bytes", request.len());
if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
if protocol.is_connected(target) || connect.should_connect() {
let request_id = protocol.send_request(target, request);
let prev_req_id = self.pending_requests.insert(
(protocol_name.to_string().into(), request_id).into(),
(Instant::now(), pending_response),
);
debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
} else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
log::debug!(
target: "sub-libp2p",
"Not connected to peer {:?}. At the same time local \
node is no longer interested in the result.",
target,
);
}
if let Some((protocol, _)) = self.protocols.get_mut(protocol_name.deref()) {
Self::send_request_inner(
protocol,
&mut self.pending_requests,
target,
protocol_name,
request,
fallback_request,
pending_response,
connect,
)
} else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
log::debug!(
target: "sub-libp2p",
@@ -380,6 +383,37 @@ impl RequestResponsesBehaviour {
);
}
}
/// Sends `request` to `target` through `behaviour` and records it in `pending_requests`.
///
/// The request is only sent if `target` is already connected or `connect` allows dialing.
/// Otherwise `RequestFailure::NotConnected` is reported on `pending_response`.
///
/// `behaviour` and `pending_requests` are passed separately instead of `&mut self`, so that
/// callers which already hold a mutable borrow into `self.protocols` can still update the
/// pending-request map.
///
/// `fallback_request` is stored alongside the pending request. This function never sends it
/// itself.
fn send_request_inner(
    behaviour: &mut Behaviour<GenericCodec>,
    pending_requests: &mut HashMap<ProtocolRequestId, PendingRequest>,
    target: &PeerId,
    protocol_name: ProtocolName,
    request: Vec<u8>,
    fallback_request: Option<(Vec<u8>, ProtocolName)>,
    pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
    connect: IfDisconnected,
) {
    if behaviour.is_connected(target) || connect.should_connect() {
        let request_id = behaviour.send_request(target, request);
        // `protocol_name` is owned and not used afterwards, so move it into the key rather
        // than re-allocating it through `to_string()`.
        let prev_req_id = pending_requests.insert(
            (protocol_name, request_id).into(),
            PendingRequest {
                started_at: Instant::now(),
                response_tx: pending_response,
                fallback_request,
            },
        );
        debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
    } else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
        // The receiving side was dropped, so nobody is waiting for the error any more.
        log::debug!(
            target: "sub-libp2p",
            "Not connected to peer {:?}. At the same time local \
             node is no longer interested in the result.",
            target,
        );
    }
}
}
impl NetworkBehaviour for RequestResponsesBehaviour {
@@ -596,8 +630,10 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
}
}
let mut fallback_requests = vec![];
// Poll request-responses protocols.
for (protocol, (behaviour, resp_builder)) in &mut self.protocols {
for (protocol, (ref mut behaviour, ref mut resp_builder)) in &mut self.protocols {
'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx, params) {
let ev = match ev {
// Main events we are interested in.
@@ -698,17 +734,21 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
.pending_requests
.remove(&(protocol.clone(), request_id).into())
{
Some((started, pending_response)) => {
Some(PendingRequest { started_at, response_tx, .. }) => {
log::trace!(
target: "sub-libp2p",
"received response from {peer} ({protocol:?}), {} bytes",
response.as_ref().map_or(0usize, |response| response.len()),
);
let delivered = pending_response
.send(response.map_err(|()| RequestFailure::Refused))
let delivered = response_tx
.send(
response
.map_err(|()| RequestFailure::Refused)
.map(|resp| (resp, protocol.clone())),
)
.map_err(|_| RequestFailure::Obsolete);
(started, delivered)
(started_at, delivered)
},
None => {
log::warn!(
@@ -742,8 +782,34 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
.pending_requests
.remove(&(protocol.clone(), request_id).into())
{
Some((started, pending_response)) => {
if pending_response
Some(PendingRequest {
started_at,
response_tx,
fallback_request,
}) => {
// Try using the fallback request if the protocol was not
// supported.
if let OutboundFailure::UnsupportedProtocols = error {
if let Some((fallback_request, fallback_protocol)) =
fallback_request
{
log::trace!(
target: "sub-libp2p",
"Request with id {:?} failed. Trying the fallback protocol. {}",
request_id,
fallback_protocol.deref()
);
fallback_requests.push((
peer,
fallback_protocol,
fallback_request,
response_tx,
));
continue
}
}
if response_tx
.send(Err(RequestFailure::Network(error.clone())))
.is_err()
{
@@ -754,7 +820,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
request_id,
);
}
started
started_at
},
None => {
log::warn!(
@@ -825,6 +891,25 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
}
}
// Send out fallback requests.
for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
if let Some((behaviour, _)) = self.protocols.get_mut(&protocol) {
Self::send_request_inner(
behaviour,
&mut self.pending_requests,
&peer,
protocol,
request,
None,
pending_response,
// We can error if not connected because the
// previous attempt would have tried to establish a
// connection already or errored and we wouldn't have gotten here.
IfDisconnected::ImmediateError,
);
}
}
break Poll::Pending
}
}
@@ -976,6 +1061,7 @@ mod tests {
use super::*;
use crate::mock::MockPeerStore;
use assert_matches::assert_matches;
use futures::{channel::oneshot, executor::LocalPool, task::Spawn};
use libp2p::{
core::{
@@ -1025,7 +1111,7 @@ mod tests {
#[test]
fn basic_request_response_works() {
let protocol_name = "/test/req-resp/1";
let protocol_name = ProtocolName::from("/test/req-resp/1");
let mut pool = LocalPool::new();
// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
@@ -1053,7 +1139,7 @@ mod tests {
.unwrap();
let protocol_config = ProtocolConfig {
name: From::from(protocol_name),
name: protocol_name.clone(),
fallback_names: Vec::new(),
max_request_size: 1024,
max_response_size: 1024 * 1024,
@@ -1102,8 +1188,9 @@ mod tests {
let (sender, receiver) = oneshot::channel();
swarm.behaviour_mut().send_request(
&peer_id,
protocol_name,
protocol_name.clone(),
b"this is a request".to_vec(),
None,
sender,
IfDisconnected::ImmediateError,
);
@@ -1118,13 +1205,16 @@ mod tests {
}
}
assert_eq!(response_receiver.unwrap().await.unwrap().unwrap(), b"this is a response");
assert_eq!(
response_receiver.unwrap().await.unwrap().unwrap(),
(b"this is a response".to_vec(), protocol_name)
);
});
}
#[test]
fn max_response_size_exceeded() {
let protocol_name = "/test/req-resp/1";
let protocol_name = ProtocolName::from("/test/req-resp/1");
let mut pool = LocalPool::new();
// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
@@ -1150,7 +1240,7 @@ mod tests {
.unwrap();
let protocol_config = ProtocolConfig {
name: From::from(protocol_name),
name: protocol_name.clone(),
fallback_names: Vec::new(),
max_request_size: 1024,
max_response_size: 8, // <-- important for the test
@@ -1201,8 +1291,9 @@ mod tests {
let (sender, receiver) = oneshot::channel();
swarm.behaviour_mut().send_request(
&peer_id,
protocol_name,
protocol_name.clone(),
b"this is a request".to_vec(),
None,
sender,
IfDisconnected::ImmediateError,
);
@@ -1236,14 +1327,14 @@ mod tests {
/// See [`ProtocolRequestId`] for additional information.
#[test]
fn request_id_collision() {
let protocol_name_1 = "/test/req-resp-1/1";
let protocol_name_2 = "/test/req-resp-2/1";
let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
let mut pool = LocalPool::new();
let mut swarm_1 = {
let protocol_configs = vec![
ProtocolConfig {
name: From::from(protocol_name_1),
name: protocol_name_1.clone(),
fallback_names: Vec::new(),
max_request_size: 1024,
max_response_size: 1024 * 1024,
@@ -1251,7 +1342,7 @@ mod tests {
inbound_queue: None,
},
ProtocolConfig {
name: From::from(protocol_name_2),
name: protocol_name_2.clone(),
fallback_names: Vec::new(),
max_request_size: 1024,
max_response_size: 1024 * 1024,
@@ -1269,7 +1360,7 @@ mod tests {
let protocol_configs = vec![
ProtocolConfig {
name: From::from(protocol_name_1),
name: protocol_name_1.clone(),
fallback_names: Vec::new(),
max_request_size: 1024,
max_response_size: 1024 * 1024,
@@ -1277,7 +1368,7 @@ mod tests {
inbound_queue: Some(tx_1),
},
ProtocolConfig {
name: From::from(protocol_name_2),
name: protocol_name_2.clone(),
fallback_names: Vec::new(),
max_request_size: 1024,
max_response_size: 1024 * 1024,
@@ -1359,15 +1450,17 @@ mod tests {
let (sender_2, receiver_2) = oneshot::channel();
swarm_1.behaviour_mut().send_request(
&peer_id,
protocol_name_1,
protocol_name_1.clone(),
b"this is a request".to_vec(),
None,
sender_1,
IfDisconnected::ImmediateError,
);
swarm_1.behaviour_mut().send_request(
&peer_id,
protocol_name_2,
protocol_name_2.clone(),
b"this is a request".to_vec(),
None,
sender_2,
IfDisconnected::ImmediateError,
);
@@ -1385,8 +1478,239 @@ mod tests {
}
}
let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
assert_eq!(response_receiver_1.await.unwrap().unwrap(), b"this is a response");
assert_eq!(response_receiver_2.await.unwrap().unwrap(), b"this is a response");
assert_eq!(
response_receiver_1.await.unwrap().unwrap(),
(b"this is a response".to_vec(), protocol_name_1)
);
assert_eq!(
response_receiver_2.await.unwrap().unwrap(),
(b"this is a response".to_vec(), protocol_name_2)
);
});
}
/// Checks the fallback-request mechanism between a node that speaks every protocol and an
/// "older" node that only speaks `/test/req-resp/1` and `/test/another`.
///
/// Four scenarios are exercised in order from the newer node:
/// 1. request on `/test/req-resp/2` with a fallback on `/test/req-resp/1`: the fallback is used;
/// 2. request on `/test/req-resp/1` with a fallback that must not be used;
/// 3. request on `/test/req-resp/2` without a fallback: fails with `UnsupportedProtocols`;
/// 4. request on `/test/another` without a fallback: succeeds.
#[test]
fn request_fallback() {
let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
let protocol_name_2 = ProtocolName::from("/test/another");
let mut pool = LocalPool::new();
let protocol_config_1 = ProtocolConfig {
name: protocol_name_1.clone(),
fallback_names: Vec::new(),
max_request_size: 1024,
max_response_size: 1024 * 1024,
request_timeout: Duration::from_secs(30),
inbound_queue: None,
};
let protocol_config_1_fallback = ProtocolConfig {
name: protocol_name_1_fallback.clone(),
fallback_names: Vec::new(),
max_request_size: 1024,
max_response_size: 1024 * 1024,
request_timeout: Duration::from_secs(30),
inbound_queue: None,
};
let protocol_config_2 = ProtocolConfig {
name: protocol_name_2.clone(),
fallback_names: Vec::new(),
max_request_size: 1024,
max_response_size: 1024 * 1024,
request_timeout: Duration::from_secs(30),
inbound_queue: None,
};
// This swarm only speaks protocol_name_1_fallback and protocol_name_2.
// It only responds to requests.
let mut older_swarm = {
let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
protocol_config_1_fallback.inbound_queue = Some(tx_1);
let mut protocol_config_2 = protocol_config_2.clone();
protocol_config_2.inbound_queue = Some(tx_2);
pool.spawner()
.spawn_obj(
async move {
// Two requests are expected on `/test/req-resp/1`: the one that arrives through the
// fallback and the one sent on that protocol directly.
for _ in 0..2 {
if let Some(rq) = rx_1.next().await {
let (fb_tx, fb_rx) = oneshot::channel();
// Any other payload (e.g. the dummy fallback request) fails the test here.
assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
let _ = rq.pending_response.send(super::OutgoingResponse {
result: Ok(
b"this is a response on protocol /test/req-resp/1".to_vec()
),
reputation_changes: Vec::new(),
sent_feedback: Some(fb_tx),
});
fb_rx.await.unwrap();
}
}
if let Some(rq) = rx_2.next().await {
let (fb_tx, fb_rx) = oneshot::channel();
// The text is only payload bytes and has to match what the requester sends below;
// it is not the protocol name (which is `/test/another`).
assert_eq!(rq.payload, b"request on protocol /test/other");
let _ = rq.pending_response.send(super::OutgoingResponse {
result: Ok(b"this is a response on protocol /test/other".to_vec()),
reputation_changes: Vec::new(),
sent_feedback: Some(fb_tx),
});
fb_rx.await.unwrap();
}
}
.boxed()
.into(),
)
.unwrap();
build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
};
// This swarm speaks all protocols.
let mut new_swarm = build_swarm(
vec![
protocol_config_1.clone(),
protocol_config_1_fallback.clone(),
protocol_config_2.clone(),
]
.into_iter(),
);
{
let dial_addr = older_swarm.1.clone();
Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
}
// Run `older_swarm` in the background.
pool.spawner()
.spawn_obj({
async move {
loop {
_ = older_swarm.0.select_next_some().await;
}
}
.boxed()
.into()
})
.unwrap();
// Run the newer swarm. Attempt to make requests on all protocols.
let (mut swarm, _) = new_swarm;
let mut older_peer_id = None;
pool.run_until(async move {
let mut response_receiver = None;
// Try the new protocol with a fallback.
loop {
match swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
// Remember the peer so the later scenarios can target it.
older_peer_id = Some(peer_id);
let (sender, receiver) = oneshot::channel();
swarm.behaviour_mut().send_request(
&peer_id,
protocol_name_1.clone(),
b"request on protocol /test/req-resp/2".to_vec(),
Some((
b"request on protocol /test/req-resp/1".to_vec(),
protocol_config_1_fallback.name.clone(),
)),
sender,
IfDisconnected::ImmediateError,
);
response_receiver = Some(receiver);
},
SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
result.unwrap();
break
},
_ => {},
}
}
// The response must be attributed to the fallback protocol, not the one first requested.
assert_eq!(
response_receiver.unwrap().await.unwrap().unwrap(),
(
b"this is a response on protocol /test/req-resp/1".to_vec(),
protocol_name_1_fallback.clone()
)
);
// Try the old protocol with a useless fallback.
let (sender, response_receiver) = oneshot::channel();
swarm.behaviour_mut().send_request(
older_peer_id.as_ref().unwrap(),
protocol_name_1_fallback.clone(),
b"request on protocol /test/req-resp/1".to_vec(),
Some((
b"dummy request, will fail if processed".to_vec(),
protocol_config_1_fallback.name.clone(),
)),
sender,
IfDisconnected::ImmediateError,
);
loop {
match swarm.select_next_some().await {
SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
result.unwrap();
break
},
_ => {},
}
}
assert_eq!(
response_receiver.await.unwrap().unwrap(),
(
b"this is a response on protocol /test/req-resp/1".to_vec(),
protocol_name_1_fallback.clone()
)
);
// Try the new protocol with no fallback. Should fail.
let (sender, response_receiver) = oneshot::channel();
swarm.behaviour_mut().send_request(
older_peer_id.as_ref().unwrap(),
protocol_name_1.clone(),
b"request on protocol /test/req-resp-2".to_vec(),
None,
sender,
IfDisconnected::ImmediateError,
);
loop {
match swarm.select_next_some().await {
SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
assert_matches!(
result.unwrap_err(),
RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
);
break
},
_ => {},
}
}
// The caller-side channel must observe the failure as well.
assert!(response_receiver.await.unwrap().is_err());
// Try the other protocol with no fallback.
let (sender, response_receiver) = oneshot::channel();
swarm.behaviour_mut().send_request(
older_peer_id.as_ref().unwrap(),
protocol_name_2.clone(),
b"request on protocol /test/other".to_vec(),
None,
sender,
IfDisconnected::ImmediateError,
);
loop {
match swarm.select_next_some().await {
SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
result.unwrap();
break
},
_ => {},
}
}
assert_eq!(
response_receiver.await.unwrap().unwrap(),
(b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
);
});
}
}
+11 -5
View File
@@ -1048,11 +1048,12 @@ where
target: PeerId,
protocol: ProtocolName,
request: Vec<u8>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
connect: IfDisconnected,
) -> Result<Vec<u8>, RequestFailure> {
) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
let (tx, rx) = oneshot::channel();
self.start_request(target, protocol, request, tx, connect);
self.start_request(target, protocol, request, fallback_request, tx, connect);
match rx.await {
Ok(v) => v,
@@ -1068,13 +1069,15 @@ where
target: PeerId,
protocol: ProtocolName,
request: Vec<u8>,
tx: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
connect: IfDisconnected,
) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
target,
protocol: protocol.into(),
request,
fallback_request,
pending_response: tx,
connect,
});
@@ -1160,7 +1163,8 @@ enum ServiceToWorkerMsg {
target: PeerId,
protocol: ProtocolName,
request: Vec<u8>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
connect: IfDisconnected,
},
NetworkStatus {
@@ -1287,13 +1291,15 @@ where
target,
protocol,
request,
fallback_request,
pending_response,
connect,
} => {
self.network_service.behaviour_mut().send_request(
&target,
&protocol,
protocol,
request,
fallback_request,
pending_response,
connect,
);
+16 -6
View File
@@ -551,8 +551,9 @@ pub trait NetworkRequest {
target: PeerId,
protocol: ProtocolName,
request: Vec<u8>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
connect: IfDisconnected,
) -> Result<Vec<u8>, RequestFailure>;
) -> Result<(Vec<u8>, ProtocolName), RequestFailure>;
/// Variation of `request` which starts a request whose response is delivered on a provided
/// channel.
@@ -569,7 +570,8 @@ pub trait NetworkRequest {
target: PeerId,
protocol: ProtocolName,
request: Vec<u8>,
tx: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
connect: IfDisconnected,
);
}
@@ -585,13 +587,20 @@ where
target: PeerId,
protocol: ProtocolName,
request: Vec<u8>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
connect: IfDisconnected,
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, RequestFailure>> + Send + 'async_trait>>
) -> Pin<
Box<
dyn Future<Output = Result<(Vec<u8>, ProtocolName), RequestFailure>>
+ Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
T::request(self, target, protocol, request, connect)
T::request(self, target, protocol, request, fallback_request, connect)
}
fn start_request(
@@ -599,10 +608,11 @@ where
target: PeerId,
protocol: ProtocolName,
request: Vec<u8>,
tx: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
connect: IfDisconnected,
) {
T::start_request(self, target, protocol, request, tx, connect)
T::start_request(self, target, protocol, request, fallback_request, tx, connect)
}
}