diff --git a/substrate/core/rpc/src/chain/mod.rs b/substrate/core/rpc/src/chain/mod.rs index 308a9996ef..8542d2ff4c 100644 --- a/substrate/core/rpc/src/chain/mod.rs +++ b/substrate/core/rpc/src/chain/mod.rs @@ -56,6 +56,10 @@ build_rpc_trait! { #[rpc(name = "chain_getBlockHash", alias = ["chain_getHead", ])] fn block_hash(&self, Trailing) -> Result>; + /// Get hash of the last finalised block in the canon chain. + #[rpc(name = "chain_getFinalisedHead")] + fn finalised_head(&self) -> Result; + /// Get the runtime version. #[rpc(name = "chain_getRuntimeVersion")] fn runtime_version(&self, Trailing) -> Result; @@ -70,6 +74,16 @@ build_rpc_trait! { fn unsubscribe_new_head(&self, SubscriptionId) -> RpcResult; } + #[pubsub(name = "chain_finalisedHead")] { + /// New head subscription + #[rpc(name = "chain_subscribeFinalisedHeads")] + fn subscribe_finalised_heads(&self, Self::Metadata, pubsub::Subscriber
); + + /// Unsubscribe from new head subscription. + #[rpc(name = "chain_unsubscribeFinalisedHeads")] + fn unsubscribe_finalised_heads(&self, SubscriptionId) -> RpcResult; + } + #[pubsub(name = "chain_runtimeVersion")] { /// New runtime version subscription #[rpc(name = "chain_subscribeRuntimeVersion")] @@ -111,6 +125,42 @@ impl Chain where Some(hash) => hash, }) } + + fn subscribe_headers( + &self, + subscriber: pubsub::Subscriber, + best_block_hash: G, + stream: F, + ) where + F: FnOnce() -> S, + G: FnOnce() -> Result>, + ERR: ::std::fmt::Debug, + S: Stream + Send + 'static, + { + self.subscriptions.add(subscriber, |sink| { + // send current head right at the start. + let header = best_block_hash() + .and_then(|hash| self.header(hash.into())) + .and_then(|header| { + header.ok_or_else(|| self::error::ErrorKind::Unimplemented.into()) + }) + .map_err(Into::into); + + // send further subscriptions + let stream = stream() + .map(|res| Ok(res)) + .map_err(|e| warn!("Block notification stream error: {:?}", e)); + + sink + .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all( + stream::iter_result(vec![Ok(header)]) + .chain(stream) + ) + // we ignore the resulting Stream (if the first stream is over we are unsubscribed) + .map(|_| ()) + }); + } } impl ChainApi, Block::Extrinsic> for Chain where @@ -139,42 +189,41 @@ impl ChainApi, Block:: }) } + fn finalised_head(&self) -> Result { + Ok(self.client.info()?.chain.finalized_hash) + } + fn runtime_version(&self, at: Trailing) -> Result { let at = self.unwrap_or_best(at)?; Ok(self.client.runtime_version_at(&BlockId::Hash(at))?) } fn subscribe_new_head(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber) { - self.subscriptions.add(subscriber, |sink| { - // send current head right at the start. - let header = self.block_hash(None.into()) - .and_then(|hash| self.header(hash.into())) - .and_then(|header| { - header.ok_or_else(|| self::error::ErrorKind::Unimplemented.into()) - }) - .map_err(Into::into); - - // send further subscriptions - let stream = self.client.import_notification_stream() + self.subscribe_headers( + subscriber, + || self.block_hash(None.into()), + || self.client.import_notification_stream() .filter(|notification| notification.is_new_best) - .map(|notification| Ok(notification.header)) - .map_err(|e| warn!("Block notification stream error: {:?}", e)); - - sink - .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) - .send_all( - stream::iter_result(vec![Ok(header)]) - .chain(stream) - ) - // we ignore the resulting Stream (if the first stream is over we are unsubscribed) - .map(|_| ()) - }); + .map(|notification| notification.header), + ) } fn unsubscribe_new_head(&self, id: SubscriptionId) -> RpcResult { Ok(self.subscriptions.cancel(id)) } + fn subscribe_finalised_heads(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber) { + self.subscribe_headers( + subscriber, + || Ok(Some(self.client.info()?.chain.finalized_hash)), + || self.client.finality_notification_stream() + .map(|notification| notification.header), + ) + } + + fn unsubscribe_finalised_heads(&self, id: SubscriptionId) -> RpcResult { + Ok(self.subscriptions.cancel(id)) + } fn subscribe_runtime_version(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber) { let stream = match self.client.storage_changes_notification_stream(Some(&[storage::StorageKey(storage::well_known_keys::CODE.to_vec())])) { diff --git a/substrate/core/rpc/src/chain/tests.rs b/substrate/core/rpc/src/chain/tests.rs index e673fe0af7..172223b941 100644 --- a/substrate/core/rpc/src/chain/tests.rs +++ b/substrate/core/rpc/src/chain/tests.rs @@ -150,7 +150,39 @@ fn should_return_block_hash() { client.block_hash(Some(1u64).into()), Ok(Some(ref x)) if x == &block.hash() ); +} + +#[test] +fn should_return_finalised_hash() { + let core = ::tokio::runtime::Runtime::new().unwrap(); + let remote = core.executor(); + + let client = Chain { + client: Arc::new(test_client::new()), + subscriptions: Subscriptions::new(remote), + }; + + assert_matches!( + client.finalised_head(), + Ok(ref x) if x == &client.client.genesis_hash() + ); + + // import new block + let builder = client.client.new_block().unwrap(); + client.client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); + // no finalisation yet + assert_matches!( + client.finalised_head(), + Ok(ref x) if x == &client.client.genesis_hash() + ); + + // finalise + client.client.finalize_block(BlockId::number(1), true).unwrap(); + assert_matches!( + client.finalised_head(), + Ok(ref x) if x == &client.client.block_hash(1).unwrap().unwrap() + ); } #[test] @@ -184,6 +216,38 @@ fn should_notify_about_latest_block() { assert_eq!(core.block_on(next.into_future()).unwrap().0, None); } +#[test] +fn should_notify_about_finalised_block() { + let mut core = ::tokio::runtime::Runtime::new().unwrap(); + let remote = core.executor(); + let (subscriber, id, transport) = pubsub::Subscriber::new_test("test"); + + { + let api = Chain { + client: Arc::new(test_client::new()), + subscriptions: Subscriptions::new(remote), + }; + + api.subscribe_finalised_heads(Default::default(), subscriber); + + // assert id assigned + assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1)))); + + let builder = api.client.new_block().unwrap(); + api.client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); + api.client.finalize_block(BlockId::number(1), true).unwrap(); + } + + // assert initial head sent. + let (notification, next) = core.block_on(transport.into_future()).unwrap(); + assert!(notification.is_some()); + // assert notification sent to transport + let (notification, next) = core.block_on(next.into_future()).unwrap(); + assert!(notification.is_some()); + // no more notifications on this channel + assert_eq!(core.block_on(next.into_future()).unwrap().0, None); +} + #[test] fn should_return_runtime_version() { let core = ::tokio::runtime::Runtime::new().unwrap();