mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 18:41:05 +00:00
Make EventSubscription and FilterEvents Send-able (#471)
* Make EventSubscription and FilterEvents Send-able * Cargo fmt * clippy * Remove unused import
This commit is contained in:
@@ -84,7 +84,7 @@ pub struct EventSubscription<'a, T: Config, Evs: 'static> {
|
||||
#[derivative(Debug = "ignore")]
|
||||
at: Option<
|
||||
std::pin::Pin<
|
||||
Box<dyn Future<Output = Result<Events<'a, T, Evs>, BasicError>> + 'a>,
|
||||
Box<dyn Future<Output = Result<Events<'a, T, Evs>, BasicError>> + Send + 'a>,
|
||||
>,
|
||||
>,
|
||||
_event_type: std::marker::PhantomData<Evs>,
|
||||
@@ -175,3 +175,15 @@ impl<'a, T: Config, Evs: Decode> Stream for EventSubscription<'a, T, Evs> {
|
||||
Poll::Ready(Some(events))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
// Ensure `EventSubscription` can be sent; only actually a compile-time check.
|
||||
#[test]
|
||||
fn check_sendability() {
|
||||
fn assert_send<T: Send>() {}
|
||||
assert_send::<EventSubscription<crate::DefaultConfig, ()>>();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,7 +51,8 @@ pub struct FilterEvents<'a, Sub: 'a, T: Config, Filter: EventFilter> {
|
||||
FilteredEventDetails<T::Hash, Filter::ReturnType>,
|
||||
BasicError,
|
||||
>,
|
||||
> + 'a,
|
||||
> + Send
|
||||
+ 'a,
|
||||
>,
|
||||
>,
|
||||
}
|
||||
@@ -131,7 +132,8 @@ pub trait EventFilter: private::Sealed {
|
||||
FilteredEventDetails<T::Hash, Self::ReturnType>,
|
||||
BasicError,
|
||||
>,
|
||||
> + 'a,
|
||||
> + Send
|
||||
+ 'a,
|
||||
>;
|
||||
}
|
||||
|
||||
@@ -150,7 +152,9 @@ impl<Ev: Event> EventFilter for (Ev,) {
|
||||
fn filter<'a, T: Config, Evs: Decode + 'static>(
|
||||
events: Events<'a, T, Evs>,
|
||||
) -> Box<
|
||||
dyn Iterator<Item = Result<FilteredEventDetails<T::Hash, Ev>, BasicError>> + 'a,
|
||||
dyn Iterator<Item = Result<FilteredEventDetails<T::Hash, Ev>, BasicError>>
|
||||
+ Send
|
||||
+ 'a,
|
||||
> {
|
||||
let block_hash = events.block_hash();
|
||||
let mut iter = events.into_iter_raw();
|
||||
@@ -189,7 +193,7 @@ macro_rules! impl_event_filter {
|
||||
type ReturnType = ( $(Option<$ty>,)+ );
|
||||
fn filter<'a, T: Config, Evs: Decode + 'static>(
|
||||
events: Events<'a, T, Evs>
|
||||
) -> Box<dyn Iterator<Item=Result<FilteredEventDetails<T::Hash,Self::ReturnType>, BasicError>> + 'a> {
|
||||
) -> Box<dyn Iterator<Item=Result<FilteredEventDetails<T::Hash,Self::ReturnType>, BasicError>> + Send + 'a> {
|
||||
let block_hash = events.block_hash();
|
||||
let mut iter = events.into_iter_raw();
|
||||
Box::new(std::iter::from_fn(move || {
|
||||
|
||||
+4
-7
@@ -31,15 +31,12 @@ use crate::{
|
||||
storage::StorageKeyPrefix,
|
||||
Config,
|
||||
Metadata,
|
||||
PhantomDataSendSync,
|
||||
};
|
||||
use codec::{
|
||||
Decode,
|
||||
Encode,
|
||||
};
|
||||
use core::{
|
||||
convert::TryInto,
|
||||
marker::PhantomData,
|
||||
};
|
||||
use frame_metadata::RuntimeMetadataPrefixed;
|
||||
pub use jsonrpsee::{
|
||||
client_transport::ws::{
|
||||
@@ -211,14 +208,14 @@ pub struct ReadProof<Hash> {
|
||||
pub struct Rpc<T: Config> {
|
||||
/// Rpc client for sending requests.
|
||||
pub client: Arc<RpcClient>,
|
||||
marker: PhantomData<T>,
|
||||
_marker: PhantomDataSendSync<T>,
|
||||
}
|
||||
|
||||
impl<T: Config> Clone for Rpc<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
client: self.client.clone(),
|
||||
marker: PhantomData,
|
||||
_marker: PhantomDataSendSync::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -228,7 +225,7 @@ impl<T: Config> Rpc<T> {
|
||||
pub fn new(client: RpcClient) -> Self {
|
||||
Self {
|
||||
client: Arc::new(client),
|
||||
marker: PhantomData,
|
||||
_marker: PhantomDataSendSync::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -135,3 +135,45 @@ async fn balance_transfer_subscription() -> Result<(), subxt::BasicError> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// This is just a compile-time check that we can subscribe to events in
|
||||
// a context that requires the event subscription/filtering to be Send-able.
|
||||
// We test a typical use of EventSubscription and FilterEvents. We don't need
|
||||
// to run this code; just check that it compiles.
|
||||
#[allow(unused)]
|
||||
async fn check_events_are_sendable() {
|
||||
// check that EventSubscription can be used across await points.
|
||||
async_std::task::spawn(async {
|
||||
let ctx = test_context().await;
|
||||
|
||||
let mut event_sub = ctx.api.events().subscribe().await?;
|
||||
|
||||
while let Some(ev) = event_sub.next().await {
|
||||
// if `event_sub` doesn't implement Send, we can't hold
|
||||
// it across an await point inside of a tokio::spawn, which
|
||||
// requires Send. This will lead to a compile error.
|
||||
}
|
||||
|
||||
Ok::<_, subxt::BasicError>(())
|
||||
});
|
||||
|
||||
// Check that FilterEvents can be used across await points.
|
||||
async_std::task::spawn(async {
|
||||
let ctx = test_context().await;
|
||||
|
||||
let mut event_sub = ctx
|
||||
.api
|
||||
.events()
|
||||
.subscribe()
|
||||
.await?
|
||||
.filter_events::<(balances::events::Transfer,)>();
|
||||
|
||||
while let Some(ev) = event_sub.next().await {
|
||||
// if `event_sub` doesn't implement Send, we can't hold
|
||||
// it across an await point inside of a tokio::spawn, which
|
||||
// requires Send; This will lead to a compile error.
|
||||
}
|
||||
|
||||
Ok::<_, subxt::BasicError>(())
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user