Runtime agnostic Events (#20)

* Introduce OpaqueEvent

* Look up event by module and variant

* Index events by module

* Get events by module

* Dynamically decode events

* Decode System events and EventRecord topics

* Use type sizes to decode raw events

* Remove unused imports

* rustfmt

* Unify error types, fix some compiler errors

* Make dynamic event decoding work

- fix compilation errors
- skip modules with no events when indexing
- preallocate vec for raw event data

* Remove printlns, replace where required with log

* Remove unused import

* Check missing type sizes

* Ignore unknown event arg type sizes

* Decode concrete System events, assumes every Runtime has the module

* Reorganise usings

* pub use some types

* Code docs

* Export Error

* Error Display impls

* Format code
This commit is contained in:
Andrew Jones
2019-09-26 17:48:25 +01:00
committed by GitHub
parent f9ae14bdb9
commit ee6db12917
8 changed files with 595 additions and 99 deletions
+113 -47
View File
@@ -16,8 +16,15 @@
use crate::{
error::Error,
events::{
EventsDecoder,
RuntimeEvent,
},
metadata::Metadata,
srml::system::System,
srml::{
balances::Balances,
system::System,
},
};
use futures::future::{
self,
@@ -29,6 +36,7 @@ use num_traits::bounds::Bounded;
use parity_scale_codec::{
Decode,
Encode,
Error as CodecError,
};
use runtime_metadata::RuntimeMetadataPrefixed;
@@ -107,22 +115,29 @@ impl<T: System> Rpc<T> {
}
/// Get a block hash, returns hash of latest block by default
pub fn block_hash(&self, hash: Option<BlockNumber<T>>) -> impl Future<Item = Option<T::Hash>, Error = Error> {
pub fn block_hash(
&self,
hash: Option<BlockNumber<T>>,
) -> impl Future<Item = Option<T::Hash>, Error = Error> {
self.chain.block_hash(hash).map_err(Into::into)
}
/// Get a Block
pub fn block(&self, hash: Option<T::Hash>) -> impl Future<Item = Option<ChainBlock<T>>, Error = Error> {
pub fn block(
&self,
hash: Option<T::Hash>,
) -> impl Future<Item = Option<ChainBlock<T>>, Error = Error> {
self.chain.block(hash).map_err(Into::into)
}
/// Fetch the runtime version
pub fn runtime_version(&self, at: Option<T::Hash>) -> impl Future<Item = RuntimeVersion, Error = Error> {
pub fn runtime_version(
&self,
at: Option<T::Hash>,
) -> impl Future<Item = RuntimeVersion, Error = Error> {
self.state.runtime_version(at).map_err(Into::into)
}
}
use crate::ExtrinsicSuccess;
use futures::{
future::IntoFuture,
stream::{
@@ -132,17 +147,22 @@ use futures::{
};
use jsonrpc_core_client::TypedSubscriptionStream;
use runtime_primitives::traits::Hash;
use srml_system::EventRecord;
use substrate_primitives::{
storage::StorageChangeSet,
twox_128,
};
use transaction_pool::txpool::watcher::Status;
use crate::{
events::RawEvent,
srml::system::SystemEvent,
};
use srml_system::Phase;
type MapClosure<T> = Box<dyn Fn(T) -> T + Send>;
pub type MapStream<T> = stream::Map<TypedSubscriptionStream<T>, MapClosure<T>>;
impl<T: System> Rpc<T> {
impl<T: System + Balances + 'static> Rpc<T> {
/// Subscribe to substrate System Events
pub fn subscribe_events(
&self,
@@ -205,9 +225,10 @@ impl<T: System> Rpc<T> {
}
/// Create and submit an extrinsic and return corresponding Event if successful
pub fn submit_and_watch_extrinsic<E>(
pub fn submit_and_watch_extrinsic<E: 'static>(
self,
extrinsic: E,
decoder: EventsDecoder<T>,
) -> impl Future<Item = ExtrinsicSuccess<T>, Error = Error>
where
E: Encode,
@@ -267,16 +288,71 @@ impl<T: System> Rpc<T> {
bh,
sb.block.extrinsics.len()
);
wait_for_block_events::<T>(ext_hash, &sb, bh, events)
wait_for_block_events(decoder, ext_hash, sb, bh, events)
})
})
}
}
/// Captures data for when an extrinsic is successfully included in a block
#[derive(Debug)]
pub struct ExtrinsicSuccess<T: System> {
/// Block hash.
pub block: T::Hash,
/// Extrinsic hash.
pub extrinsic: T::Hash,
/// Raw runtime events, can be decoded by the caller.
pub events: Vec<RuntimeEvent>,
}
impl<T: System> ExtrinsicSuccess<T> {
/// Find the Event for the given module/variant, with raw encoded event data.
/// Returns `None` if the Event is not found.
pub fn find_event_raw(&self, module: &str, variant: &str) -> Option<&RawEvent> {
self.events.iter().find_map(|evt| {
match evt {
RuntimeEvent::Raw(ref raw)
if raw.module == module && raw.variant == variant =>
{
Some(raw)
}
_ => None,
}
})
}
/// Returns all System Events
pub fn system_events(&self) -> Vec<&SystemEvent> {
self.events
.iter()
.filter_map(|evt| {
match evt {
RuntimeEvent::System(evt) => Some(evt),
_ => None,
}
})
.collect()
}
/// Find the Event for the given module/variant, attempting to decode the event data.
/// Returns `None` if the Event is not found.
/// Returns `Err` if the data fails to decode into the supplied type
pub fn find_event<E: Decode>(
&self,
module: &str,
variant: &str,
) -> Option<Result<E, CodecError>> {
self.find_event_raw(module, variant)
.map(|evt| E::decode(&mut &evt.data[..]))
}
}
/// Waits for events for the block triggered by the extrinsic
fn wait_for_block_events<T: System>(
pub fn wait_for_block_events<T: System + Balances + 'static>(
decoder: EventsDecoder<T>,
ext_hash: T::Hash,
signed_block: &ChainBlock<T>,
signed_block: ChainBlock<T>,
block_hash: T::Hash,
events_stream: MapStream<StorageChangeSet<T::Hash>>,
) -> impl Future<Item = ExtrinsicSuccess<T>, Error = Error> {
@@ -292,49 +368,39 @@ fn wait_for_block_events<T: System>(
.into_future();
let block_hash = block_hash.clone();
let block_events = events_stream
events_stream
.filter(move |event| event.block == block_hash)
.into_future()
.map_err(|(e, _)| e.into())
.and_then(|(change_set, _)| {
match change_set {
None => future::ok(Vec::new()),
Some(change_set) => {
let events = change_set
.changes
.iter()
.filter_map(|(_key, data)| {
data.as_ref().map(|data| Decode::decode(&mut &data.0[..]))
})
.collect::<Result<Vec<Vec<EventRecord<T::Event, T::Hash>>>, _>>()
.map(|events| events.into_iter().flat_map(|es| es).collect())
.map_err(Into::into);
future::result(events)
}
}
});
block_events
.join(ext_index)
.map(move |(events, ext_index)| {
let events: Vec<T::Event> = events
.iter()
.filter_map(|e| {
if let srml_system::Phase::ApplyExtrinsic(i) = e.phase {
if i as usize == ext_index {
Some(e.event.clone())
} else {
None
.and_then(move |((change_set, _), ext_index)| {
let events = match change_set {
None => Vec::new(),
Some(change_set) => {
let mut events = Vec::new();
for (_key, data) in change_set.changes {
if let Some(data) = data {
match decoder.decode_events(&mut &data.0[..]) {
Ok(raw_events) => {
for (phase, event) in raw_events {
if let Phase::ApplyExtrinsic(i) = phase {
if i as usize == ext_index {
events.push(event)
}
}
}
}
Err(err) => return future::err(err.into()),
}
}
} else {
None
}
})
.collect::<Vec<T::Event>>();
ExtrinsicSuccess {
events
}
};
future::ok(ExtrinsicSuccess {
block: block_hash,
extrinsic: ext_hash,
events,
}
})
})
}