mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 17:31:05 +00:00
@@ -412,7 +412,7 @@ async fn handle_new_activations<Context: SubsystemContext>(
|
||||
"failed to send collation result",
|
||||
);
|
||||
}
|
||||
})).await?;
|
||||
}))?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2100,7 +2100,6 @@ async fn launch_approval(
|
||||
|
||||
let (background, remote_handle) = background.remote_handle();
|
||||
ctx.spawn("approval-checks", Box::pin(background))
|
||||
.await
|
||||
.map(move |()| Some(remote_handle))
|
||||
}
|
||||
|
||||
|
||||
@@ -107,7 +107,7 @@ async fn run(
|
||||
let (mut validation_host, task) = polkadot_node_core_pvf::start(
|
||||
polkadot_node_core_pvf::Config::new(cache_path, program_path),
|
||||
);
|
||||
ctx.spawn_blocking("pvf-validation-host", task.boxed()).await?;
|
||||
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;
|
||||
|
||||
loop {
|
||||
match ctx.recv().await? {
|
||||
|
||||
@@ -134,20 +134,20 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn spawn(
|
||||
fn spawn(
|
||||
&mut self,
|
||||
name: &'static str,
|
||||
s: Pin<Box<dyn Future<Output = ()> + Send>>,
|
||||
) -> SubsystemResult<()> {
|
||||
self.inner.spawn(name, s).await
|
||||
self.inner.spawn(name, s)
|
||||
}
|
||||
|
||||
async fn spawn_blocking(
|
||||
fn spawn_blocking(
|
||||
&mut self,
|
||||
name: &'static str,
|
||||
s: Pin<Box<dyn Future<Output = ()> + Send>>,
|
||||
) -> SubsystemResult<()> {
|
||||
self.inner.spawn_blocking(name, s).await
|
||||
self.inner.spawn_blocking(name, s)
|
||||
}
|
||||
|
||||
fn sender(&mut self) -> &mut Self::Sender {
|
||||
|
||||
@@ -169,14 +169,13 @@ mod tests {
|
||||
#[test]
|
||||
fn failed_send_does_not_inc_sent() {
|
||||
let (mut bounded, _) = channel::<Msg>(5);
|
||||
let (mut unbounded, _) = unbounded::<Msg>();
|
||||
let (unbounded, _) = unbounded::<Msg>();
|
||||
|
||||
block_on(async move {
|
||||
assert!(bounded.send(Msg::default()).await.is_err());
|
||||
assert!(bounded.try_send(Msg::default()).is_err());
|
||||
assert_eq!(bounded.meter().read(), Readout { sent: 0, received: 0 });
|
||||
|
||||
assert!(unbounded.send(Msg::default()).await.is_err());
|
||||
assert!(unbounded.unbounded_send(Msg::default()).is_err());
|
||||
assert_eq!(unbounded.meter().read(), Readout { sent: 0, received: 0 });
|
||||
});
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
//! Metered variant of unbounded mpsc channels to be able to extract metrics.
|
||||
|
||||
use futures::{channel::mpsc, task::Poll, task::Context, sink::SinkExt, stream::Stream};
|
||||
use futures::{channel::mpsc, task::Poll, task::Context, stream::Stream};
|
||||
|
||||
use std::result;
|
||||
use std::pin::Pin;
|
||||
@@ -130,21 +130,6 @@ impl<T> UnboundedMeteredSender<T> {
|
||||
&self.meter
|
||||
}
|
||||
|
||||
/// Send message, wait until capacity is available.
|
||||
pub async fn send(&mut self, item: T) -> result::Result<(), mpsc::SendError>
|
||||
where
|
||||
Self: Unpin,
|
||||
{
|
||||
self.meter.note_sent();
|
||||
let fut = self.inner.send(item);
|
||||
futures::pin_mut!(fut);
|
||||
fut.await.map_err(|e| {
|
||||
self.meter.retract_sent();
|
||||
e
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
/// Attempt to send message or fail immediately.
|
||||
pub fn unbounded_send(&self, msg: T) -> result::Result<(), mpsc::TrySendError<T>> {
|
||||
self.meter.note_sent();
|
||||
@@ -154,34 +139,3 @@ impl<T> UnboundedMeteredSender<T> {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> futures::sink::Sink<T> for UnboundedMeteredSender<T> {
|
||||
type Error = <futures::channel::mpsc::UnboundedSender<T> as futures::sink::Sink<T>>::Error;
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
|
||||
Pin::new(&mut self.inner).start_send(item)
|
||||
}
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Pin::new(&mut self.inner).poll_ready(cx)
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
match Pin::new(&mut self.inner).poll_ready(cx) {
|
||||
val @ Poll::Ready(_)=> {
|
||||
val
|
||||
}
|
||||
other => other,
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
match Pin::new(&mut self.inner).poll_ready(cx) {
|
||||
val @ Poll::Ready(_)=> {
|
||||
self.meter.note_sent();
|
||||
val
|
||||
}
|
||||
other => other,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,7 +76,6 @@ where
|
||||
.with_validator_index(from_validator)
|
||||
.with_relay_parent(parent);
|
||||
ctx.spawn("pov-fetcher", fetch_pov_job(pov_hash, pending_response.boxed(), span, tx).boxed())
|
||||
.await
|
||||
.map_err(|e| Fatal::SpawnTask(e))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -189,7 +189,6 @@ impl FetchTask {
|
||||
let (handle, kill) = oneshot::channel();
|
||||
|
||||
ctx.spawn("chunk-fetcher", running.run(kill).boxed())
|
||||
.await
|
||||
.map_err(|e| Fatal::SpawnTask(e))?;
|
||||
|
||||
Ok(FetchTask {
|
||||
|
||||
@@ -650,7 +650,7 @@ async fn launch_interaction(
|
||||
awaiting: vec![response_sender],
|
||||
});
|
||||
|
||||
if let Err(e) = ctx.spawn("recovery interaction", Box::pin(remote)).await {
|
||||
if let Err(e) = ctx.spawn("recovery interaction", Box::pin(remote)) {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
err = ?e,
|
||||
|
||||
@@ -887,7 +887,7 @@ where
|
||||
shared.clone(),
|
||||
).remote_handle();
|
||||
|
||||
ctx.spawn("network-bridge-network-worker", Box::pin(remote)).await?;
|
||||
ctx.spawn("network-bridge-network-worker", Box::pin(remote))?;
|
||||
|
||||
ctx.send_message(AllMessages::StatementDistribution(
|
||||
StatementDistributionMessage::StatementFetchingReceiver(statement_receiver)
|
||||
|
||||
@@ -1238,8 +1238,7 @@ async fn launch_request(
|
||||
)
|
||||
.remote_handle();
|
||||
|
||||
let result = ctx.spawn("large-statement-fetcher", task.boxed())
|
||||
.await;
|
||||
let result = ctx.spawn("large-statement-fetcher", task.boxed());
|
||||
if let Err(err) = result {
|
||||
tracing::error!(target: LOG_TARGET, ?err, "Spawning task failed.");
|
||||
return None
|
||||
@@ -1952,9 +1951,7 @@ impl StatementDistribution {
|
||||
ctx.spawn(
|
||||
"large-statement-responder",
|
||||
respond(receiver, res_sender.clone()).boxed()
|
||||
)
|
||||
.await
|
||||
.map_err(Fatal::SpawnTask)?;
|
||||
).map_err(Fatal::SpawnTask)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,7 +103,7 @@ impl Subsystem2 {
|
||||
Delay::new(Duration::from_secs(1)).await;
|
||||
}
|
||||
}),
|
||||
).await.unwrap();
|
||||
).unwrap();
|
||||
|
||||
loop {
|
||||
match ctx.try_recv().await {
|
||||
|
||||
@@ -926,22 +926,22 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
|
||||
fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
|
||||
-> SubsystemResult<()>
|
||||
{
|
||||
self.to_overseer.send(ToOverseer::SpawnJob {
|
||||
self.to_overseer.unbounded_send(ToOverseer::SpawnJob {
|
||||
name,
|
||||
s,
|
||||
}).await.map_err(Into::into)
|
||||
}).map_err(|_| SubsystemError::TaskSpawn(name))
|
||||
}
|
||||
|
||||
async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
|
||||
fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
|
||||
-> SubsystemResult<()>
|
||||
{
|
||||
self.to_overseer.send(ToOverseer::SpawnBlockingJob {
|
||||
self.to_overseer.unbounded_send(ToOverseer::SpawnBlockingJob {
|
||||
name,
|
||||
s,
|
||||
}).await.map_err(Into::into)
|
||||
}).map_err(|_| SubsystemError::TaskSpawn(name))
|
||||
}
|
||||
|
||||
fn sender(&mut self) -> &mut OverseerSubsystemSender {
|
||||
|
||||
@@ -224,7 +224,7 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
|
||||
.ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned()))
|
||||
}
|
||||
|
||||
async fn spawn(
|
||||
fn spawn(
|
||||
&mut self,
|
||||
name: &'static str,
|
||||
s: Pin<Box<dyn Future<Output = ()> + Send>>,
|
||||
@@ -233,7 +233,7 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
|
||||
fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
|
||||
-> SubsystemResult<()>
|
||||
{
|
||||
self.spawn.spawn_blocking(name, s);
|
||||
|
||||
@@ -726,9 +726,9 @@ impl<Job: JobTrait, Spawner> JobSubsystem<Job, Spawner> {
|
||||
}
|
||||
outgoing = jobs.next() => {
|
||||
let res = match outgoing.expect("the Jobs stream never ends; qed") {
|
||||
FromJobCommand::Spawn(name, task) => ctx.spawn(name, task).await,
|
||||
FromJobCommand::Spawn(name, task) => ctx.spawn(name, task),
|
||||
FromJobCommand::SpawnBlocking(name, task)
|
||||
=> ctx.spawn_blocking(name, task).await,
|
||||
=> ctx.spawn_blocking(name, task),
|
||||
};
|
||||
|
||||
if let Err(e) = res {
|
||||
|
||||
@@ -197,8 +197,8 @@ pub enum SubsystemError {
|
||||
#[error(transparent)]
|
||||
QueueError(#[from] mpsc::SendError),
|
||||
|
||||
#[error(transparent)]
|
||||
TaskSpawn(#[from] futures::task::SpawnError),
|
||||
#[error("Failed to spawn a task: {0}")]
|
||||
TaskSpawn(&'static str),
|
||||
|
||||
#[error(transparent)]
|
||||
Infallible(#[from] std::convert::Infallible),
|
||||
@@ -293,10 +293,10 @@ pub trait SubsystemContext: Send + Sized + 'static {
|
||||
async fn recv(&mut self) -> SubsystemResult<FromOverseer<Self::Message>>;
|
||||
|
||||
/// Spawn a child task on the executor.
|
||||
async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()>;
|
||||
fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()>;
|
||||
|
||||
/// Spawn a blocking child task on the executor's dedicated thread pool.
|
||||
async fn spawn_blocking(
|
||||
fn spawn_blocking(
|
||||
&mut self,
|
||||
name: &'static str,
|
||||
s: Pin<Box<dyn Future<Output = ()> + Send>>,
|
||||
|
||||
Reference in New Issue
Block a user