// Copyright 2017-2019 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . //! Substrate blockchain API. use std::sync::Arc; use log::warn; use client::{self, Client, BlockchainEvents}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; use primitives::{H256, Blake2Hasher}; use crate::rpc::Result as RpcResult; use crate::rpc::futures::{stream, Future, Sink, Stream}; use runtime_primitives::generic::{BlockId, SignedBlock}; use runtime_primitives::traits::{Block as BlockT, Header, NumberFor}; use crate::subscriptions::Subscriptions; mod error; #[cfg(test)] mod tests; mod number; use self::error::Result; /// Substrate blockchain API #[rpc] pub trait ChainApi { /// RPC metadata type Metadata; /// Get header of a relay chain block. #[rpc(name = "chain_getHeader")] fn header(&self, hash: Option) -> Result>; /// Get header and body of a relay chain block. #[rpc(name = "chain_getBlock")] fn block(&self, hash: Option) -> Result>; /// Get hash of the n-th block in the canon chain. /// /// By default returns latest block hash. #[rpc(name = "chain_getBlockHash", alias("chain_getHead"))] fn block_hash(&self, hash: Option>) -> Result>; /// Get hash of the last finalized block in the canon chain. #[rpc(name = "chain_getFinalizedHead", alias("chain_getFinalisedHead"))] fn finalized_head(&self) -> Result; /// New head subscription #[pubsub( subscription = "chain_newHead", subscribe, name = "chain_subscribeNewHead", alias("subscribe_newHead") )] fn subscribe_new_head(&self, metadata: Self::Metadata, subscriber: Subscriber
); /// Unsubscribe from new head subscription. #[pubsub( subscription = "chain_newHead", unsubscribe, name = "chain_unsubscribeNewHead", alias("unsubscribe_newHead") )] fn unsubscribe_new_head(&self, metadata: Option, id: SubscriptionId) -> RpcResult; /// New head subscription #[pubsub( subscription = "chain_finalizedHead", subscribe, name = "chain_subscribeFinalizedHeads", alias("chain_subscribeFinalisedHeads") )] fn subscribe_finalized_heads(&self, metadata: Self::Metadata, subscriber: Subscriber
); /// Unsubscribe from new head subscription. #[pubsub( subscription = "chain_finalizedHead", unsubscribe, name = "chain_unsubscribeFinalizedHeads", alias("chain_unsubscribeFinalisedHeads") )] fn unsubscribe_finalized_heads(&self, metadata: Option, id: SubscriptionId) -> RpcResult; } /// Chain API with subscriptions support. pub struct Chain { /// Substrate client. client: Arc>, /// Current subscriptions. subscriptions: Subscriptions, } impl Chain { /// Create new Chain API RPC handler. pub fn new(client: Arc>, subscriptions: Subscriptions) -> Self { Self { client, subscriptions, } } } impl Chain where Block: BlockT + 'static, B: client::backend::Backend + Send + Sync + 'static, E: client::CallExecutor + Send + Sync + 'static, RA: Send + Sync + 'static { fn unwrap_or_best(&self, hash: Option) -> Result { Ok(match hash.into() { None => self.client.info()?.chain.best_hash, Some(hash) => hash, }) } fn subscribe_headers( &self, subscriber: Subscriber, best_block_hash: G, stream: F, ) where F: FnOnce() -> S, G: FnOnce() -> Result>, ERR: ::std::fmt::Debug, S: Stream + Send + 'static, { self.subscriptions.add(subscriber, |sink| { // send current head right at the start. let header = best_block_hash() .and_then(|hash| self.header(hash.into())) .and_then(|header| { header.ok_or_else(|| self::error::ErrorKind::Unimplemented.into()) }) .map_err(Into::into); // send further subscriptions let stream = stream() .map(|res| Ok(res)) .map_err(|e| warn!("Block notification stream error: {:?}", e)); sink .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) .send_all( stream::iter_result(vec![Ok(header)]) .chain(stream) ) // we ignore the resulting Stream (if the first stream is over we are unsubscribed) .map(|_| ()) }); } } impl ChainApi, Block::Hash, Block::Header, SignedBlock> for Chain where Block: BlockT + 'static, B: client::backend::Backend + Send + Sync + 'static, E: client::CallExecutor + Send + Sync + 'static, RA: Send + Sync + 'static { type Metadata = crate::metadata::Metadata; fn header(&self, hash: Option) -> Result> { let hash = self.unwrap_or_best(hash)?; Ok(self.client.header(&BlockId::Hash(hash))?) } fn block(&self, hash: Option) -> Result>> { let hash = self.unwrap_or_best(hash)?; Ok(self.client.block(&BlockId::Hash(hash))?) } fn block_hash(&self, number: Option>>) -> Result> { Ok(match number { None => Some(self.client.info()?.chain.best_hash), Some(num_or_hex) => self.client.header(&BlockId::number(num_or_hex.to_number()?))?.map(|h| h.hash()), }) } fn finalized_head(&self) -> Result { Ok(self.client.info()?.chain.finalized_hash) } fn subscribe_new_head(&self, _metadata: Self::Metadata, subscriber: Subscriber) { self.subscribe_headers( subscriber, || self.block_hash(None.into()), || self.client.import_notification_stream() .filter(|notification| notification.is_new_best) .map(|notification| notification.header), ) } fn unsubscribe_new_head(&self, _metadata: Option, id: SubscriptionId) -> RpcResult { Ok(self.subscriptions.cancel(id)) } fn subscribe_finalized_heads(&self, _meta: Self::Metadata, subscriber: Subscriber) { self.subscribe_headers( subscriber, || Ok(Some(self.client.info()?.chain.finalized_hash)), || self.client.finality_notification_stream() .map(|notification| notification.header), ) } fn unsubscribe_finalized_heads(&self, _metadata: Option, id: SubscriptionId) -> RpcResult { Ok(self.subscriptions.cancel(id)) } }