// Copyright 2017-2020 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . //! On-demand requests service. use crate::protocol::light_dispatch::RequestData; use std::{collections::HashMap, pin::Pin, sync::Arc, task::Context, task::Poll}; use futures::{prelude::*, channel::mpsc, channel::oneshot}; use parking_lot::Mutex; use sp_blockchain::Error as ClientError; use sc_client_api::{ Fetcher, FetchChecker, RemoteHeaderRequest, RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, RemoteReadChildRequest, RemoteBodyRequest, }; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; /// Implements the `Fetcher` trait of the client. Makes it possible for the light client to perform /// network requests for some state. /// /// This implementation stores all the requests in a queue. The network, in parallel, is then /// responsible for pulling elements out of that queue and fulfilling them. pub struct OnDemand { /// Objects that checks whether what has been retrieved is correct. checker: Arc>, /// Queue of requests. Set to `Some` at initialization, then extracted by the network. /// /// Note that a better alternative would be to use a MPMC queue here, and add a `poll` method /// from the `OnDemand`. However there exists no popular implementation of MPMC channels in /// asynchronous Rust at the moment requests_queue: Mutex>>>, /// Sending side of `requests_queue`. requests_send: mpsc::UnboundedSender>, } impl OnDemand where B::Header: HeaderT, { /// Creates new on-demand service. pub fn new(checker: Arc>) -> Self { let (requests_send, requests_queue) = mpsc::unbounded(); let requests_queue = Mutex::new(Some(requests_queue)); OnDemand { checker, requests_queue, requests_send, } } /// Get checker reference. pub fn checker(&self) -> &Arc> { &self.checker } /// Extracts the queue of requests. /// /// Whenever one of the methods of the `Fetcher` trait is called, an element is pushed on this /// channel. /// /// If this function returns `None`, that means that the receiver has already been extracted in /// the past, and therefore that something already handles the requests. pub(crate) fn extract_receiver(&self) -> Option>> { self.requests_queue.lock().take() } } impl Fetcher for OnDemand where B: BlockT, B::Header: HeaderT, { type RemoteHeaderResult = RemoteResponse; type RemoteReadResult = RemoteResponse, Option>>>; type RemoteCallResult = RemoteResponse>; type RemoteChangesResult = RemoteResponse, u32)>>; type RemoteBodyResult = RemoteResponse>; fn remote_header(&self, request: RemoteHeaderRequest) -> Self::RemoteHeaderResult { let (sender, receiver) = oneshot::channel(); let _ = self.requests_send.unbounded_send(RequestData::RemoteHeader(request, sender)); RemoteResponse { receiver } } fn remote_read(&self, request: RemoteReadRequest) -> Self::RemoteReadResult { let (sender, receiver) = oneshot::channel(); let _ = self.requests_send.unbounded_send(RequestData::RemoteRead(request, sender)); RemoteResponse { receiver } } fn remote_read_child( &self, request: RemoteReadChildRequest ) -> Self::RemoteReadResult { let (sender, receiver) = oneshot::channel(); let _ = self.requests_send.unbounded_send(RequestData::RemoteReadChild(request, sender)); RemoteResponse { receiver } } fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult { let (sender, receiver) = oneshot::channel(); let _ = self.requests_send.unbounded_send(RequestData::RemoteCall(request, sender)); RemoteResponse { receiver } } fn remote_changes(&self, request: RemoteChangesRequest) -> Self::RemoteChangesResult { let (sender, receiver) = oneshot::channel(); let _ = self.requests_send.unbounded_send(RequestData::RemoteChanges(request, sender)); RemoteResponse { receiver } } fn remote_body(&self, request: RemoteBodyRequest) -> Self::RemoteBodyResult { let (sender, receiver) = oneshot::channel(); let _ = self.requests_send.unbounded_send(RequestData::RemoteBody(request, sender)); RemoteResponse { receiver } } } /// Future for an on-demand remote call response. pub struct RemoteResponse { receiver: oneshot::Receiver>, } impl Future for RemoteResponse { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.receiver.poll_unpin(cx) { Poll::Ready(Ok(res)) => Poll::Ready(res), Poll::Ready(Err(_)) => Poll::Ready(Err(From::from(ClientError::RemoteFetchCancelled))), Poll::Pending => Poll::Pending, } } }