// This file is part of Substrate. // Copyright (C) 2017-2021 Parity Technologies (UK) Ltd. // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with this program. If not, see . //! On-demand requests service. use crate::light_client_requests; use futures::{channel::oneshot, prelude::*}; use parking_lot::Mutex; use sc_client_api::{ ChangesProof, FetchChecker, Fetcher, RemoteBodyRequest, RemoteCallRequest, RemoteChangesRequest, RemoteHeaderRequest, RemoteReadChildRequest, RemoteReadRequest, StorageProof, }; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_blockchain::Error as ClientError; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use std::{ collections::HashMap, pin::Pin, sync::Arc, task::{Context, Poll}, }; /// Implements the `Fetcher` trait of the client. Makes it possible for the light client to perform /// network requests for some state. /// /// This implementation stores all the requests in a queue. The network, in parallel, is then /// responsible for pulling elements out of that queue and fulfilling them. pub struct OnDemand { /// Objects that checks whether what has been retrieved is correct. checker: Arc>, /// Queue of requests. Set to `Some` at initialization, then extracted by the network. /// /// Note that a better alternative would be to use a MPMC queue here, and add a `poll` method /// from the `OnDemand`. However there exists no popular implementation of MPMC channels in /// asynchronous Rust at the moment requests_queue: Mutex>>>, /// Sending side of `requests_queue`. requests_send: TracingUnboundedSender>, } #[derive(Debug, thiserror::Error)] #[error("AlwaysBadChecker")] struct ErrorAlwaysBadChecker; impl Into for ErrorAlwaysBadChecker { fn into(self) -> ClientError { ClientError::Application(Box::new(self)) } } /// Dummy implementation of `FetchChecker` that always assumes that responses are bad. /// /// Considering that it is the responsibility of the client to build the fetcher, it can use this /// implementation if it knows that it will never perform any request. #[derive(Default, Clone)] pub struct AlwaysBadChecker; impl FetchChecker for AlwaysBadChecker { fn check_header_proof( &self, _request: &RemoteHeaderRequest, _remote_header: Option, _remote_proof: StorageProof, ) -> Result { Err(ErrorAlwaysBadChecker.into()) } fn check_read_proof( &self, _request: &RemoteReadRequest, _remote_proof: StorageProof, ) -> Result, Option>>, ClientError> { Err(ErrorAlwaysBadChecker.into()) } fn check_read_child_proof( &self, _request: &RemoteReadChildRequest, _remote_proof: StorageProof, ) -> Result, Option>>, ClientError> { Err(ErrorAlwaysBadChecker.into()) } fn check_execution_proof( &self, _request: &RemoteCallRequest, _remote_proof: StorageProof, ) -> Result, ClientError> { Err(ErrorAlwaysBadChecker.into()) } fn check_changes_proof( &self, _request: &RemoteChangesRequest, _remote_proof: ChangesProof, ) -> Result, u32)>, ClientError> { Err(ErrorAlwaysBadChecker.into()) } fn check_body_proof( &self, _request: &RemoteBodyRequest, _body: Vec, ) -> Result, ClientError> { Err(ErrorAlwaysBadChecker.into()) } } impl OnDemand where B::Header: HeaderT, { /// Creates new on-demand service. pub fn new(checker: Arc>) -> Self { let (requests_send, requests_queue) = tracing_unbounded("mpsc_ondemand"); let requests_queue = Mutex::new(Some(requests_queue)); Self { checker, requests_queue, requests_send } } /// Get checker reference. pub fn checker(&self) -> &Arc> { &self.checker } /// Extracts the queue of requests. /// /// Whenever one of the methods of the `Fetcher` trait is called, an element is pushed on this /// channel. /// /// If this function returns `None`, that means that the receiver has already been extracted in /// the past, and therefore that something already handles the requests. pub(crate) fn extract_receiver( &self, ) -> Option>> { self.requests_queue.lock().take() } } impl Fetcher for OnDemand where B: BlockT, B::Header: HeaderT, { type RemoteHeaderResult = RemoteResponse; type RemoteReadResult = RemoteResponse, Option>>>; type RemoteCallResult = RemoteResponse>; type RemoteChangesResult = RemoteResponse, u32)>>; type RemoteBodyResult = RemoteResponse>; fn remote_header(&self, request: RemoteHeaderRequest) -> Self::RemoteHeaderResult { let (sender, receiver) = oneshot::channel(); let _ = self .requests_send .unbounded_send(light_client_requests::sender::Request::Header { request, sender }); RemoteResponse { receiver } } fn remote_read(&self, request: RemoteReadRequest) -> Self::RemoteReadResult { let (sender, receiver) = oneshot::channel(); let _ = self .requests_send .unbounded_send(light_client_requests::sender::Request::Read { request, sender }); RemoteResponse { receiver } } fn remote_read_child( &self, request: RemoteReadChildRequest, ) -> Self::RemoteReadResult { let (sender, receiver) = oneshot::channel(); let _ = self .requests_send .unbounded_send(light_client_requests::sender::Request::ReadChild { request, sender }); RemoteResponse { receiver } } fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult { let (sender, receiver) = oneshot::channel(); let _ = self .requests_send .unbounded_send(light_client_requests::sender::Request::Call { request, sender }); RemoteResponse { receiver } } fn remote_changes( &self, request: RemoteChangesRequest, ) -> Self::RemoteChangesResult { let (sender, receiver) = oneshot::channel(); let _ = self .requests_send .unbounded_send(light_client_requests::sender::Request::Changes { request, sender }); RemoteResponse { receiver } } fn remote_body(&self, request: RemoteBodyRequest) -> Self::RemoteBodyResult { let (sender, receiver) = oneshot::channel(); let _ = self .requests_send .unbounded_send(light_client_requests::sender::Request::Body { request, sender }); RemoteResponse { receiver } } } /// Future for an on-demand remote call response. pub struct RemoteResponse { receiver: oneshot::Receiver>, } impl Future for RemoteResponse { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.receiver.poll_unpin(cx) { Poll::Ready(Ok(res)) => Poll::Ready(res), Poll::Ready(Err(_)) => Poll::Ready(Err(ClientError::RemoteFetchCancelled)), Poll::Pending => Poll::Pending, } } }