mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 09:57:56 +00:00
Rewrite reconnect_after_disconnect test (#5351)
This commit is contained in:
@@ -345,13 +345,21 @@ fn reconnect_after_disconnect() {
|
||||
let mut service1_state = ServiceState::NotConnected;
|
||||
let mut service2_state = ServiceState::NotConnected;
|
||||
|
||||
// Run the events loops.
|
||||
futures::executor::block_on(future::poll_fn(|cx| -> Poll<Result<_, io::Error>> {
|
||||
futures::executor::block_on(async move {
|
||||
loop {
|
||||
let mut service1_not_ready = false;
|
||||
// Grab next event from services.
|
||||
let event = {
|
||||
let s1 = service1.next();
|
||||
let s2 = service2.next();
|
||||
futures::pin_mut!(s1, s2);
|
||||
match future::select(s1, s2).await {
|
||||
future::Either::Left((ev, _)) => future::Either::Left(ev),
|
||||
future::Either::Right((ev, _)) => future::Either::Right(ev),
|
||||
}
|
||||
};
|
||||
|
||||
match service1.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(GenericProtoOut::CustomProtocolOpen { .. })) => {
|
||||
match event {
|
||||
future::Either::Left(GenericProtoOut::CustomProtocolOpen { .. }) => {
|
||||
match service1_state {
|
||||
ServiceState::NotConnected => {
|
||||
service1_state = ServiceState::FirstConnec;
|
||||
@@ -363,19 +371,14 @@ fn reconnect_after_disconnect() {
|
||||
ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(),
|
||||
}
|
||||
},
|
||||
Poll::Ready(Some(GenericProtoOut::CustomProtocolClosed { .. })) => {
|
||||
future::Either::Left(GenericProtoOut::CustomProtocolClosed { .. }) => {
|
||||
match service1_state {
|
||||
ServiceState::FirstConnec => service1_state = ServiceState::Disconnected,
|
||||
ServiceState::ConnectedAgain| ServiceState::NotConnected |
|
||||
ServiceState::Disconnected => panic!(),
|
||||
}
|
||||
},
|
||||
Poll::Pending => service1_not_ready = true,
|
||||
_ => panic!()
|
||||
}
|
||||
|
||||
match service2.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(GenericProtoOut::CustomProtocolOpen { .. })) => {
|
||||
future::Either::Right(GenericProtoOut::CustomProtocolOpen { .. }) => {
|
||||
match service2_state {
|
||||
ServiceState::NotConnected => {
|
||||
service2_state = ServiceState::FirstConnec;
|
||||
@@ -387,43 +390,43 @@ fn reconnect_after_disconnect() {
|
||||
ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(),
|
||||
}
|
||||
},
|
||||
Poll::Ready(Some(GenericProtoOut::CustomProtocolClosed { .. })) => {
|
||||
future::Either::Right(GenericProtoOut::CustomProtocolClosed { .. }) => {
|
||||
match service2_state {
|
||||
ServiceState::FirstConnec => service2_state = ServiceState::Disconnected,
|
||||
ServiceState::ConnectedAgain| ServiceState::NotConnected |
|
||||
ServiceState::Disconnected => panic!(),
|
||||
}
|
||||
},
|
||||
Poll::Pending if service1_not_ready => break,
|
||||
Poll::Pending => {}
|
||||
_ => panic!()
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if service1_state == ServiceState::ConnectedAgain && service2_state == ServiceState::ConnectedAgain {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if service1_state == ServiceState::ConnectedAgain && service2_state == ServiceState::ConnectedAgain {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
})).unwrap();
|
||||
// Now that the two services have disconnected and reconnected, wait for 3 seconds and
|
||||
// check whether they're still connected.
|
||||
let mut delay = futures_timer::Delay::new(Duration::from_secs(3));
|
||||
|
||||
// Do a second 3-seconds run to make sure we don't get disconnected immediately again.
|
||||
let mut delay = futures_timer::Delay::new(Duration::from_secs(3));
|
||||
futures::executor::block_on(future::poll_fn(|cx| -> Poll<Result<_, io::Error>> {
|
||||
match service1.poll_next_unpin(cx) {
|
||||
Poll::Pending => {},
|
||||
_ => panic!()
|
||||
}
|
||||
loop {
|
||||
// Grab next event from services.
|
||||
let event = {
|
||||
let s1 = service1.next();
|
||||
let s2 = service2.next();
|
||||
futures::pin_mut!(s1, s2);
|
||||
match future::select(future::select(s1, s2), &mut delay).await {
|
||||
future::Either::Right(_) => break, // success
|
||||
future::Either::Left((future::Either::Left((ev, _)), _)) => ev,
|
||||
future::Either::Left((future::Either::Right((ev, _)), _)) => ev,
|
||||
}
|
||||
};
|
||||
|
||||
match service2.poll_next_unpin(cx) {
|
||||
Poll::Pending => {},
|
||||
_ => panic!()
|
||||
match event {
|
||||
GenericProtoOut::CustomProtocolOpen { .. } |
|
||||
GenericProtoOut::CustomProtocolClosed { .. } => panic!(),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
if let Poll::Ready(()) = delay.poll_unpin(cx) {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
})).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user