mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 02:21:14 +00:00
cleanup stream polls (#3397)
* metered-channel: remove dead code * we don't need no fuse * even more
This commit is contained in:
@@ -191,7 +191,7 @@ impl ProvisioningJob {
|
||||
};
|
||||
loop {
|
||||
futures::select! {
|
||||
msg = self.receiver.next().fuse() => match msg {
|
||||
msg = self.receiver.next() => match msg {
|
||||
Some(RequestInherentData(_, return_sender)) => {
|
||||
let _span = span.child("req-inherent-data");
|
||||
let _timer = self.metrics.time_request_inherent_data();
|
||||
|
||||
@@ -153,34 +153,3 @@ impl<T> MeteredSender<T> {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> futures::sink::Sink<T> for MeteredSender<T> {
|
||||
type Error = mpsc::SendError;
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
|
||||
Pin::new(&mut self.inner).start_send(item)
|
||||
}
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Pin::new(&mut self.inner).poll_ready(cx)
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
match Pin::new(&mut self.inner).poll_close(cx) {
|
||||
val @ Poll::Ready(_)=> {
|
||||
val
|
||||
}
|
||||
other => other,
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
match Pin::new(&mut self.inner).poll_flush(cx) {
|
||||
val @ Poll::Ready(_)=> {
|
||||
self.meter.note_sent();
|
||||
val
|
||||
}
|
||||
other => other,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -830,7 +830,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
|
||||
}
|
||||
}
|
||||
},
|
||||
req_res_event = request_multiplexer.next().fuse() => match req_res_event {
|
||||
req_res_event = request_multiplexer.next() => match req_res_event {
|
||||
None => return Err(UnexpectedAbort::RequestStreamConcluded),
|
||||
Some(Err(err)) => {
|
||||
network_service.report_peer(err.peer, MALFORMED_MESSAGE_COST);
|
||||
|
||||
@@ -595,14 +595,14 @@ impl Message {
|
||||
// We are only fusing here to make `select` happy, in reality we will quit if one of those
|
||||
// streams end:
|
||||
let from_overseer = ctx.recv().fuse();
|
||||
let from_requester = from_requester.next().fuse();
|
||||
let from_responder = from_responder.next().fuse();
|
||||
let from_requester = from_requester.next();
|
||||
let from_responder = from_responder.next();
|
||||
futures::pin_mut!(from_overseer, from_requester, from_responder);
|
||||
futures::select!(
|
||||
futures::select! {
|
||||
msg = from_overseer => Message::Subsystem(msg.map_err(Fatal::SubsystemReceive)),
|
||||
msg = from_requester => Message::Requester(msg),
|
||||
msg = from_responder => Message::Responder(msg),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -909,8 +909,8 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
|
||||
}
|
||||
}
|
||||
|
||||
let mut await_message = self.messages.next().fuse();
|
||||
let mut await_signal = self.signals.next().fuse();
|
||||
let mut await_message = self.messages.next();
|
||||
let mut await_signal = self.signals.next();
|
||||
let signals_received = self.signals_received.load();
|
||||
let pending_incoming = &mut self.pending_incoming;
|
||||
|
||||
@@ -1901,13 +1901,7 @@ where
|
||||
|
||||
loop {
|
||||
select! {
|
||||
msg = self.events_rx.next().fuse() => {
|
||||
let msg = if let Some(msg) = msg {
|
||||
msg
|
||||
} else {
|
||||
continue
|
||||
};
|
||||
|
||||
msg = self.events_rx.select_next_some() => {
|
||||
match msg {
|
||||
Event::MsgToSubsystem(msg) => {
|
||||
self.route_message(msg.into()).await?;
|
||||
@@ -1927,16 +1921,7 @@ where
|
||||
}
|
||||
}
|
||||
},
|
||||
msg = self.to_overseer_rx.next() => {
|
||||
let msg = match msg {
|
||||
Some(m) => m,
|
||||
None => {
|
||||
// This is a fused stream so we will shut down after receiving all
|
||||
// shutdown notifications.
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
msg = self.to_overseer_rx.select_next_some() => {
|
||||
match msg {
|
||||
ToOverseer::SpawnJob { name, s } => {
|
||||
self.spawn_job(name, s);
|
||||
@@ -1946,16 +1931,14 @@ where
|
||||
}
|
||||
}
|
||||
},
|
||||
res = self.running_subsystems.next().fuse() => {
|
||||
let finished = if let Some(finished) = res {
|
||||
finished
|
||||
} else {
|
||||
continue
|
||||
};
|
||||
|
||||
tracing::error!(target: LOG_TARGET, subsystem = ?finished, "subsystem finished unexpectedly");
|
||||
res = self.running_subsystems.select_next_some() => {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
subsystem = ?res,
|
||||
"subsystem finished unexpectedly",
|
||||
);
|
||||
self.stop().await;
|
||||
return finished;
|
||||
return res;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user