mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 13:31:10 +00:00
Get event context on EventSubscription (#423)
* implement next_context * write test_context for method next_context * change how events are uniquely identified * undo local changes for test-runtime * introduce EventContext struct * adjust test_context to EventContext struct * fix return type for next_context * add suggestions by jsdw * ran cargo fmt and clippy
This commit is contained in:
+143
-20
@@ -39,6 +39,16 @@ use sp_core::{
|
||||
use sp_runtime::traits::Header;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
/// Raw bytes for an Event, including the block hash where it occurred and its
|
||||
/// corresponding event index.
|
||||
#[derive(Debug)]
|
||||
#[cfg_attr(test, derive(PartialEq, Clone))]
|
||||
pub struct EventContext<Hash> {
|
||||
pub block_hash: Hash,
|
||||
pub event_idx: usize,
|
||||
pub event: RawEvent,
|
||||
}
|
||||
|
||||
/// Event subscription simplifies filtering a storage change set stream for
|
||||
/// events of interest.
|
||||
pub struct EventSubscription<'a, T: Config> {
|
||||
@@ -46,7 +56,7 @@ pub struct EventSubscription<'a, T: Config> {
|
||||
block: Option<T::Hash>,
|
||||
extrinsic: Option<usize>,
|
||||
event: Option<(&'static str, &'static str)>,
|
||||
events: VecDeque<RawEvent>,
|
||||
events: VecDeque<EventContext<T::Hash>>,
|
||||
finished: bool,
|
||||
}
|
||||
|
||||
@@ -57,13 +67,19 @@ enum BlockReader<'a, T: Config> {
|
||||
},
|
||||
/// Mock event listener for unit tests
|
||||
#[cfg(test)]
|
||||
Mock(Box<dyn Iterator<Item = (T::Hash, Result<Vec<(Phase, RawEvent)>, BasicError>)>>),
|
||||
Mock(
|
||||
Box<
|
||||
dyn Iterator<
|
||||
Item = (T::Hash, Result<Vec<(Phase, usize, RawEvent)>, BasicError>),
|
||||
>,
|
||||
>,
|
||||
),
|
||||
}
|
||||
|
||||
impl<'a, T: Config> BlockReader<'a, T> {
|
||||
async fn next(
|
||||
&mut self,
|
||||
) -> Option<(T::Hash, Result<Vec<(Phase, RawEvent)>, BasicError>)> {
|
||||
) -> Option<(T::Hash, Result<Vec<(Phase, usize, RawEvent)>, BasicError>)> {
|
||||
match self {
|
||||
BlockReader::Decoder {
|
||||
subscription,
|
||||
@@ -78,7 +94,13 @@ impl<'a, T: Config> BlockReader<'a, T> {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let flattened_events = events.map(|x| x.into_iter().flatten().collect());
|
||||
let flattened_events = events.map(|x| {
|
||||
x.into_iter()
|
||||
.flatten()
|
||||
.enumerate()
|
||||
.map(|(event_idx, (phase, raw))| (phase, event_idx, raw))
|
||||
.collect()
|
||||
});
|
||||
Some((change_set.block, flattened_events))
|
||||
}
|
||||
#[cfg(test)]
|
||||
@@ -124,6 +146,15 @@ impl<'a, T: Config> EventSubscription<'a, T> {
|
||||
|
||||
/// Gets the next event.
|
||||
pub async fn next(&mut self) -> Option<Result<RawEvent, BasicError>> {
|
||||
self.next_context()
|
||||
.await
|
||||
.map(|res| res.map(|ctx| ctx.event))
|
||||
}
|
||||
/// Gets the next event with the associated block hash and its corresponding
|
||||
/// event index.
|
||||
pub async fn next_context(
|
||||
&mut self,
|
||||
) -> Option<Result<EventContext<T::Hash>, BasicError>> {
|
||||
loop {
|
||||
if let Some(raw_event) = self.events.pop_front() {
|
||||
return Some(Ok(raw_event))
|
||||
@@ -144,7 +175,7 @@ impl<'a, T: Config> EventSubscription<'a, T> {
|
||||
match events {
|
||||
Err(err) => return Some(Err(err)),
|
||||
Ok(raw_events) => {
|
||||
for (phase, raw) in raw_events {
|
||||
for (phase, event_idx, raw) in raw_events {
|
||||
if let Some(ext_index) = self.extrinsic {
|
||||
if !matches!(phase, Phase::ApplyExtrinsic(i) if i as usize == ext_index)
|
||||
{
|
||||
@@ -156,7 +187,11 @@ impl<'a, T: Config> EventSubscription<'a, T> {
|
||||
continue
|
||||
}
|
||||
}
|
||||
self.events.push_back(raw);
|
||||
self.events.push_back(EventContext {
|
||||
block_hash: received_hash,
|
||||
event_idx,
|
||||
event: raw,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -276,7 +311,7 @@ mod tests {
|
||||
#[async_std::test]
|
||||
/// test that filters work correctly, and are independent of each other
|
||||
async fn test_filters() {
|
||||
let mut events = vec![];
|
||||
let mut events: Vec<(H256, Phase, usize, RawEvent)> = vec![];
|
||||
// create all events
|
||||
for block_hash in [H256::from([0; 32]), H256::from([1; 32])] {
|
||||
for phase in [
|
||||
@@ -285,14 +320,24 @@ mod tests {
|
||||
Phase::ApplyExtrinsic(1),
|
||||
Phase::Finalization,
|
||||
] {
|
||||
for event in [named_event("a"), named_event("b")] {
|
||||
events.push((block_hash, phase.clone(), event))
|
||||
}
|
||||
[named_event("a"), named_event("b")]
|
||||
.iter()
|
||||
.enumerate()
|
||||
.for_each(|(idx, event)| {
|
||||
events.push((
|
||||
block_hash,
|
||||
phase.clone(),
|
||||
// The event index
|
||||
idx,
|
||||
event.clone(),
|
||||
))
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// set variant index so we can uniquely identify the event
|
||||
events.iter_mut().enumerate().for_each(|(idx, event)| {
|
||||
event.2.variant_index = idx as u8;
|
||||
event.3.variant_index = idx as u8;
|
||||
});
|
||||
|
||||
let half_len = events.len() / 2;
|
||||
@@ -309,8 +354,8 @@ mod tests {
|
||||
Ok(events
|
||||
.iter()
|
||||
.take(half_len)
|
||||
.map(|(_, phase, event)| {
|
||||
(phase.clone(), event.clone())
|
||||
.map(|(_, phase, idx, event)| {
|
||||
(phase.clone(), *idx, event.clone())
|
||||
})
|
||||
.collect()),
|
||||
),
|
||||
@@ -319,8 +364,8 @@ mod tests {
|
||||
Ok(events
|
||||
.iter()
|
||||
.skip(half_len)
|
||||
.map(|(_, phase, event)| {
|
||||
(phase.clone(), event.clone())
|
||||
.map(|(_, phase, idx, event)| {
|
||||
(phase.clone(), *idx, event.clone())
|
||||
})
|
||||
.collect()),
|
||||
),
|
||||
@@ -333,21 +378,24 @@ mod tests {
|
||||
events: Default::default(),
|
||||
finished: false,
|
||||
};
|
||||
let mut expected_events = events.clone();
|
||||
|
||||
let mut expected_events: Vec<(H256, Phase, usize, RawEvent)> =
|
||||
events.clone();
|
||||
|
||||
if let Some(hash) = block_filter {
|
||||
expected_events.retain(|(h, _, _)| h == &hash);
|
||||
expected_events.retain(|(h, _, _, _)| h == &hash);
|
||||
}
|
||||
if let Some(idx) = extrinsic_filter {
|
||||
expected_events.retain(|(_, phase, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx));
|
||||
expected_events.retain(|(_, phase, _, _)| matches!(phase, Phase::ApplyExtrinsic(i) if *i as usize == idx));
|
||||
}
|
||||
if let Some(name) = event_filter {
|
||||
expected_events.retain(|(_, _, event)| event.pallet == name.0);
|
||||
expected_events.retain(|(_, _, _, event)| event.pallet == name.0);
|
||||
}
|
||||
|
||||
for expected_event in expected_events {
|
||||
assert_eq!(
|
||||
subscription.next().await.unwrap().unwrap(),
|
||||
expected_event.2
|
||||
expected_event.3
|
||||
);
|
||||
}
|
||||
assert!(subscription.next().await.is_none());
|
||||
@@ -355,4 +403,79 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
async fn test_context() {
|
||||
let mut events = vec![];
|
||||
// create all events
|
||||
for block_hash in [H256::from([0; 32]), H256::from([1; 32])] {
|
||||
for phase in [
|
||||
Phase::Initialization,
|
||||
Phase::ApplyExtrinsic(0),
|
||||
Phase::ApplyExtrinsic(1),
|
||||
Phase::Finalization,
|
||||
] {
|
||||
[named_event("a"), named_event("b")]
|
||||
.iter()
|
||||
.enumerate()
|
||||
.for_each(|(idx, event)| {
|
||||
events.push((
|
||||
phase.clone(),
|
||||
EventContext {
|
||||
block_hash,
|
||||
event_idx: idx,
|
||||
event: event.clone(),
|
||||
},
|
||||
));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// set variant index so we can uniquely identify the event
|
||||
events.iter_mut().enumerate().for_each(|(idx, (_, ctx))| {
|
||||
ctx.event.variant_index = idx as u8;
|
||||
});
|
||||
|
||||
let half_len = events.len() / 2;
|
||||
|
||||
let mut subscription: EventSubscription<DefaultConfig> = EventSubscription {
|
||||
block_reader: BlockReader::Mock(Box::new(
|
||||
vec![
|
||||
(
|
||||
events[0].1.block_hash,
|
||||
Ok(events
|
||||
.iter()
|
||||
.take(half_len)
|
||||
.map(|(phase, ctx)| {
|
||||
(phase.clone(), ctx.event_idx, ctx.event.clone())
|
||||
})
|
||||
.collect()),
|
||||
),
|
||||
(
|
||||
events[half_len].1.block_hash,
|
||||
Ok(events
|
||||
.iter()
|
||||
.skip(half_len)
|
||||
.map(|(phase, ctx)| {
|
||||
(phase.clone(), ctx.event_idx, ctx.event.clone())
|
||||
})
|
||||
.collect()),
|
||||
),
|
||||
]
|
||||
.into_iter(),
|
||||
)),
|
||||
block: None,
|
||||
extrinsic: None,
|
||||
event: None,
|
||||
events: Default::default(),
|
||||
finished: false,
|
||||
};
|
||||
|
||||
let expected_events = events.clone();
|
||||
|
||||
for exp in expected_events {
|
||||
assert_eq!(subscription.next_context().await.unwrap().unwrap(), exp.1);
|
||||
}
|
||||
assert!(subscription.next().await.is_none());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user