diff --git a/subxt/src/events/event_subscription.rs b/subxt/src/events/event_subscription.rs index ee3c32e97e..afd7fb3220 100644 --- a/subxt/src/events/event_subscription.rs +++ b/subxt/src/events/event_subscription.rs @@ -84,7 +84,7 @@ pub struct EventSubscription<'a, T: Config, Evs: 'static> { #[derivative(Debug = "ignore")] at: Option< std::pin::Pin< - Box, BasicError>> + 'a>, + Box, BasicError>> + Send + 'a>, >, >, _event_type: std::marker::PhantomData, @@ -175,3 +175,15 @@ impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> { Poll::Ready(Some(events)) } } + +#[cfg(test)] +mod test { + use super::*; + + // Ensure `EventSubscription` can be sent; only actually a compile-time check. + #[test] + fn check_sendability() { + fn assert_send() {} + assert_send::>(); + } +} diff --git a/subxt/src/events/filter_events.rs b/subxt/src/events/filter_events.rs index 84f5479b45..51d65a567e 100644 --- a/subxt/src/events/filter_events.rs +++ b/subxt/src/events/filter_events.rs @@ -51,7 +51,8 @@ pub struct FilterEvents<'a, Sub: 'a, T: Config, Filter: EventFilter> { FilteredEventDetails, BasicError, >, - > + 'a, + > + Send + + 'a, >, >, } @@ -131,7 +132,8 @@ pub trait EventFilter: private::Sealed { FilteredEventDetails, BasicError, >, - > + 'a, + > + Send + + 'a, >; } @@ -150,7 +152,9 @@ impl EventFilter for (Ev,) { fn filter<'a, T: Config, Evs: Decode + 'static>( events: Events<'a, T, Evs>, ) -> Box< - dyn Iterator, BasicError>> + 'a, + dyn Iterator, BasicError>> + + Send + + 'a, > { let block_hash = events.block_hash(); let mut iter = events.into_iter_raw(); @@ -189,7 +193,7 @@ macro_rules! impl_event_filter { type ReturnType = ( $(Option<$ty>,)+ ); fn filter<'a, T: Config, Evs: Decode + 'static>( events: Events<'a, T, Evs> - ) -> Box, BasicError>> + 'a> { + ) -> Box, BasicError>> + Send + 'a> { let block_hash = events.block_hash(); let mut iter = events.into_iter_raw(); Box::new(std::iter::from_fn(move || { diff --git a/subxt/src/rpc.rs b/subxt/src/rpc.rs index 8f439e0e2d..73e39e5495 100644 --- a/subxt/src/rpc.rs +++ b/subxt/src/rpc.rs @@ -31,15 +31,12 @@ use crate::{ storage::StorageKeyPrefix, Config, Metadata, + PhantomDataSendSync, }; use codec::{ Decode, Encode, }; -use core::{ - convert::TryInto, - marker::PhantomData, -}; use frame_metadata::RuntimeMetadataPrefixed; pub use jsonrpsee::{ client_transport::ws::{ @@ -211,14 +208,14 @@ pub struct ReadProof { pub struct Rpc { /// Rpc client for sending requests. pub client: Arc, - marker: PhantomData, + _marker: PhantomDataSendSync, } impl Clone for Rpc { fn clone(&self) -> Self { Self { client: self.client.clone(), - marker: PhantomData, + _marker: PhantomDataSendSync::new(), } } } @@ -228,7 +225,7 @@ impl Rpc { pub fn new(client: RpcClient) -> Self { Self { client: Arc::new(client), - marker: PhantomData, + _marker: PhantomDataSendSync::new(), } } diff --git a/subxt/tests/integration/events.rs b/subxt/tests/integration/events.rs index 7741ee78da..fbeb7cb17e 100644 --- a/subxt/tests/integration/events.rs +++ b/subxt/tests/integration/events.rs @@ -135,3 +135,45 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> { Ok(()) } + +// This is just a compile-time check that we can subscribe to events in +// a context that requires the event subscription/filtering to be Send-able. +// We test a typical use of EventSubscription and FilterEvents. We don't need +// to run this code; just check that it compiles. +#[allow(unused)] +async fn check_events_are_sendable() { + // check that EventSubscription can be used across await points. + async_std::task::spawn(async { + let ctx = test_context().await; + + let mut event_sub = ctx.api.events().subscribe().await?; + + while let Some(ev) = event_sub.next().await { + // if `event_sub` doesn't implement Send, we can't hold + // it across an await point inside of a tokio::spawn, which + // requires Send. This will lead to a compile error. + } + + Ok::<_, subxt::BasicError>(()) + }); + + // Check that FilterEvents can be used across await points. + async_std::task::spawn(async { + let ctx = test_context().await; + + let mut event_sub = ctx + .api + .events() + .subscribe() + .await? + .filter_events::<(balances::events::Transfer,)>(); + + while let Some(ev) = event_sub.next().await { + // if `event_sub` doesn't implement Send, we can't hold + // it across an await point inside of a tokio::spawn, which + // requires Send; This will lead to a compile error. + } + + Ok::<_, subxt::BasicError>(()) + }); +}