mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 17:31:05 +00:00
fix: keep processing a block's events after encountering a dispatch error (#310)
* fix: keep processing a block's events after encountering a dispatch error * test: unit test for subscription
This commit is contained in:
@@ -42,6 +42,7 @@ use sp_core::Bytes;
|
||||
|
||||
/// Raw bytes for an Event
|
||||
#[derive(Debug)]
|
||||
#[cfg_attr(test, derive(PartialEq, Clone))]
|
||||
pub struct RawEvent {
|
||||
/// The name of the pallet from whence the Event originated.
|
||||
pub pallet: String,
|
||||
|
||||
+214
-22
@@ -44,15 +44,49 @@ use crate::{
|
||||
/// Event subscription simplifies filtering a storage change set stream for
|
||||
/// events of interest.
|
||||
pub struct EventSubscription<'a, T: Config> {
|
||||
subscription: EventStorageSubscription<T>,
|
||||
decoder: &'a EventsDecoder<T>,
|
||||
block_reader: BlockReader<'a, T>,
|
||||
block: Option<T::Hash>,
|
||||
extrinsic: Option<usize>,
|
||||
event: Option<(&'static str, &'static str)>,
|
||||
events: VecDeque<RawEvent>,
|
||||
events: VecDeque<Raw>,
|
||||
finished: bool,
|
||||
}
|
||||
|
||||
enum BlockReader<'a, T: Config> {
|
||||
Decoder {
|
||||
subscription: EventStorageSubscription<T>,
|
||||
decoder: &'a EventsDecoder<T>,
|
||||
},
|
||||
/// Mock event listener for unit tests
|
||||
#[cfg(test)]
|
||||
Mock(Box<dyn Iterator<Item = (T::Hash, Result<Vec<(Phase, Raw)>, Error>)>>),
|
||||
}
|
||||
|
||||
impl<'a, T: Config> BlockReader<'a, T> {
|
||||
async fn next(&mut self) -> Option<(T::Hash, Result<Vec<(Phase, Raw)>, Error>)> {
|
||||
match self {
|
||||
BlockReader::Decoder {
|
||||
subscription,
|
||||
decoder,
|
||||
} => {
|
||||
let change_set = subscription.next().await?;
|
||||
let events: Result<Vec<_>, _> = change_set
|
||||
.changes
|
||||
.into_iter()
|
||||
.filter_map(|(_key, change)| {
|
||||
Some(decoder.decode_events(&mut change?.0.as_slice()))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let flattened_events = events.map(|x| x.into_iter().flatten().collect());
|
||||
Some((change_set.block, flattened_events))
|
||||
}
|
||||
#[cfg(test)]
|
||||
BlockReader::Mock(it) => it.next(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: Config> EventSubscription<'a, T> {
|
||||
/// Creates a new event subscription.
|
||||
pub fn new(
|
||||
@@ -60,8 +94,10 @@ impl<'a, T: Config> EventSubscription<'a, T> {
|
||||
decoder: &'a EventsDecoder<T>,
|
||||
) -> Self {
|
||||
Self {
|
||||
subscription,
|
||||
decoder,
|
||||
block_reader: BlockReader::Decoder {
|
||||
subscription,
|
||||
decoder,
|
||||
},
|
||||
block: None,
|
||||
extrinsic: None,
|
||||
event: None,
|
||||
@@ -89,27 +125,28 @@ impl<'a, T: Config> EventSubscription<'a, T> {
|
||||
/// Gets the next event.
|
||||
pub async fn next(&mut self) -> Option<Result<RawEvent, Error>> {
|
||||
loop {
|
||||
if let Some(event) = self.events.pop_front() {
|
||||
return Some(Ok(event))
|
||||
if let Some(raw_event) = self.events.pop_front() {
|
||||
match raw_event {
|
||||
Raw::Event(event) => return Some(Ok(event)),
|
||||
Raw::Error(err) => return Some(Err(err.into())),
|
||||
};
|
||||
}
|
||||
if self.finished {
|
||||
return None
|
||||
}
|
||||
// always return None if subscription has closed
|
||||
let change_set = self.subscription.next().await?;
|
||||
let (received_hash, events) = self.block_reader.next().await?;
|
||||
if let Some(hash) = self.block.as_ref() {
|
||||
if &change_set.block == hash {
|
||||
if &received_hash == hash {
|
||||
self.finished = true;
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
for (_key, data) in change_set.changes {
|
||||
if let Some(data) = data {
|
||||
let raw_events = match self.decoder.decode_events(&mut &data.0[..]) {
|
||||
Ok(events) => events,
|
||||
Err(error) => return Some(Err(error)),
|
||||
};
|
||||
|
||||
match events {
|
||||
Err(err) => return Some(Err(err)),
|
||||
Ok(raw_events) => {
|
||||
for (phase, raw) in raw_events {
|
||||
if let Phase::ApplyExtrinsic(i) = phase {
|
||||
if let Some(ext_index) = self.extrinsic {
|
||||
@@ -117,16 +154,15 @@ impl<'a, T: Config> EventSubscription<'a, T> {
|
||||
continue
|
||||
}
|
||||
}
|
||||
let event = match raw {
|
||||
Raw::Event(event) => event,
|
||||
Raw::Error(err) => return Some(Err(err.into())),
|
||||
};
|
||||
if let Some((module, variant)) = self.event {
|
||||
if event.pallet != module || event.variant != variant {
|
||||
continue
|
||||
if let Raw::Event(ref event) = raw {
|
||||
if event.pallet != module || event.variant != variant
|
||||
{
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
self.events.push_back(event);
|
||||
self.events.push_back(raw);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -227,3 +263,159 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::RuntimeError;
|
||||
|
||||
use super::*;
|
||||
use sp_core::H256;
|
||||
#[derive(Clone)]
|
||||
struct MockConfig;
|
||||
|
||||
impl Config for MockConfig {
|
||||
type Index = u32;
|
||||
type BlockNumber = u32;
|
||||
type Hash = sp_core::H256;
|
||||
type Hashing = sp_runtime::traits::BlakeTwo256;
|
||||
type AccountId = sp_runtime::AccountId32;
|
||||
type Address = sp_runtime::MultiAddress<Self::AccountId, u32>;
|
||||
type Header = sp_runtime::generic::Header<
|
||||
Self::BlockNumber,
|
||||
sp_runtime::traits::BlakeTwo256,
|
||||
>;
|
||||
type Signature = sp_runtime::MultiSignature;
|
||||
type Extrinsic = sp_runtime::OpaqueExtrinsic;
|
||||
}
|
||||
|
||||
fn named_event(event_name: &str) -> RawEvent {
|
||||
RawEvent {
|
||||
data: sp_core::Bytes::from(Vec::new()),
|
||||
pallet: event_name.to_string(),
|
||||
variant: event_name.to_string(),
|
||||
pallet_index: 0,
|
||||
variant_index: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn raw_event(id: u8) -> RawEvent {
|
||||
RawEvent {
|
||||
data: sp_core::Bytes::from(Vec::new()),
|
||||
pallet: "SomePallet".to_string(),
|
||||
variant: "SomeVariant".to_string(),
|
||||
pallet_index: id,
|
||||
variant_index: id,
|
||||
}
|
||||
}
|
||||
|
||||
fn event(id: u8) -> Raw {
|
||||
Raw::Event(raw_event(id))
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
async fn test_error_does_not_stop_subscription() {
|
||||
let mut subscription: EventSubscription<MockConfig> = EventSubscription {
|
||||
block_reader: BlockReader::Mock(Box::new(
|
||||
vec![(
|
||||
H256::from([0; 32]),
|
||||
Ok(vec![
|
||||
(
|
||||
Phase::ApplyExtrinsic(0),
|
||||
Raw::Error(RuntimeError::BadOrigin),
|
||||
),
|
||||
(Phase::ApplyExtrinsic(0), event(1)),
|
||||
]),
|
||||
)]
|
||||
.into_iter(),
|
||||
)),
|
||||
block: None,
|
||||
extrinsic: None,
|
||||
event: None,
|
||||
events: Default::default(),
|
||||
finished: false,
|
||||
};
|
||||
|
||||
assert!(matches!(
|
||||
subscription.next().await.unwrap().unwrap_err(),
|
||||
Error::Runtime(RuntimeError::BadOrigin)
|
||||
));
|
||||
assert_eq!(subscription.next().await.unwrap().unwrap(), raw_event(1));
|
||||
assert!(subscription.next().await.is_none());
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
/// test that filters work correctly, and are independent of each other
|
||||
async fn test_filters() {
|
||||
let mut events = vec![];
|
||||
// create all events
|
||||
for block_hash in [H256::from([0; 32]), H256::from([1; 32])] {
|
||||
for phase in [Phase::ApplyExtrinsic(0), Phase::ApplyExtrinsic(1)] {
|
||||
for event in [named_event("a"), named_event("b")] {
|
||||
events.push((block_hash, phase.clone(), event))
|
||||
}
|
||||
}
|
||||
}
|
||||
// set variant index so we can uniquely identify the event
|
||||
events.iter_mut().enumerate().for_each(|(idx, event)| {
|
||||
event.2.variant_index = idx as u8;
|
||||
});
|
||||
|
||||
for block_filter in [None, Some(H256::from([1; 32]))] {
|
||||
for extrinsic_filter in [None, Some(1)] {
|
||||
for event_filter in [None, Some(("b", "b"))] {
|
||||
let mut subscription: EventSubscription<MockConfig> =
|
||||
EventSubscription {
|
||||
block_reader: BlockReader::Mock(Box::new(
|
||||
vec![
|
||||
(
|
||||
events[0].0,
|
||||
Ok(events
|
||||
.iter()
|
||||
.take(4)
|
||||
.map(|(_, phase, event)| {
|
||||
(phase.clone(), Raw::Event(event.clone()))
|
||||
})
|
||||
.collect()),
|
||||
),
|
||||
(
|
||||
events[4].0,
|
||||
Ok(events
|
||||
.iter()
|
||||
.skip(4)
|
||||
.take(4)
|
||||
.map(|(_, phase, event)| {
|
||||
(phase.clone(), Raw::Event(event.clone()))
|
||||
})
|
||||
.collect()),
|
||||
),
|
||||
]
|
||||
.into_iter(),
|
||||
)),
|
||||
block: block_filter.clone(),
|
||||
extrinsic: extrinsic_filter.clone(),
|
||||
event: event_filter.clone(),
|
||||
events: Default::default(),
|
||||
finished: false,
|
||||
};
|
||||
let mut expected_events = events.clone();
|
||||
if let Some(hash) = block_filter {
|
||||
expected_events.retain(|(h, _, _)| h == &hash);
|
||||
}
|
||||
if let Some(idx) = extrinsic_filter {
|
||||
expected_events.retain(|(_, phase, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx));
|
||||
}
|
||||
if let Some(name) = event_filter {
|
||||
expected_events.retain(|(_, _, event)| event.pallet == name.0);
|
||||
}
|
||||
for expected_event in expected_events {
|
||||
assert_eq!(
|
||||
subscription.next().await.unwrap().unwrap(),
|
||||
expected_event.2
|
||||
);
|
||||
}
|
||||
assert!(subscription.next().await.is_none());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user