// Copyright 2019 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . use std::sync::Arc; use futures::{Stream, Future, sync::mpsc}; use inherents::pool::InherentsPool; use log::{info, debug, warn}; use parity_codec::Decode; use primitives::OffchainExt; use runtime_primitives::{ generic::BlockId, traits::{self, Extrinsic}, }; use transaction_pool::txpool::{Pool, ChainApi}; /// A message between the offchain extension and the processing thread. enum ExtMessage { SubmitExtrinsic(Vec), } /// Asynchronous offchain API. /// /// NOTE this is done to prevent recursive calls into the runtime (which are not supported currently). pub(crate) struct AsyncApi(mpsc::UnboundedSender); impl OffchainExt for AsyncApi { fn submit_extrinsic(&mut self, ext: Vec) { let _ = self.0.unbounded_send(ExtMessage::SubmitExtrinsic(ext)); } } /// Offchain extensions implementation API pub(crate) struct Api { receiver: Option>, transaction_pool: Arc>, inherents_pool: Arc::Extrinsic>>, at: BlockId, } impl Api { pub fn new( transaction_pool: Arc>, inherents_pool: Arc::Extrinsic>>, at: BlockId, ) -> (AsyncApi, Self) { let (tx, rx) = mpsc::unbounded(); let api = Self { receiver: Some(rx), transaction_pool, inherents_pool, at, }; (AsyncApi(tx), api) } /// Run a processing task for the API pub fn process(mut self) -> impl Future { let receiver = self.receiver.take().expect("Take invoked only once."); receiver.for_each(move |msg| { match msg { ExtMessage::SubmitExtrinsic(ext) => self.submit_extrinsic(ext), } Ok(()) }) } fn submit_extrinsic(&mut self, ext: Vec) { let xt = match ::Extrinsic::decode(&mut &*ext) { Some(xt) => xt, None => { warn!("Unable to decode extrinsic: {:?}", ext); return }, }; info!("Submitting to the pool: {:?} (isSigned: {:?})", xt, xt.is_signed()); match self.transaction_pool.submit_one(&self.at, xt.clone()) { Ok(hash) => debug!("[{:?}] Offchain transaction added to the pool.", hash), Err(_) => { debug!("Offchain inherent added to the pool."); self.inherents_pool.add(xt); }, } } }