mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 19:11:04 +00:00
Persistent Local Storage for offchain workers. (#2894)
* WiP. * Implement offchain storage APIs. * Change compare_and_set to return bool. * Add offchain http test. * Fix tests. * Bump spec version. * Fix warnings and test. * Fix compilation. * Remove unused code. * Introduce Local (fork-aware) and Persistent storage. * Fix borked merge. * Prevent warning on depreacated client.backend * Fix long lines. * Clean up dependencies. * Update core/primitives/src/offchain.rs Co-Authored-By: André Silva <andre.beat@gmail.com> * Update core/primitives/src/offchain.rs Co-Authored-By: André Silva <andre.beat@gmail.com>
This commit is contained in:
committed by
Gavin Wood
parent
24aa882ebc
commit
2217c1e9a1
@@ -15,6 +15,7 @@
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::sync::Arc;
|
||||
use client::backend::OffchainStorage;
|
||||
use futures::{Stream, Future, sync::mpsc};
|
||||
use log::{info, debug, warn, error};
|
||||
use parity_codec::Decode;
|
||||
@@ -22,6 +23,7 @@ use primitives::offchain::{
|
||||
Timestamp, HttpRequestId, HttpRequestStatus, HttpError,
|
||||
Externalities as OffchainExt,
|
||||
CryptoKind, CryptoKeyId,
|
||||
StorageKind,
|
||||
};
|
||||
use runtime_primitives::{
|
||||
generic::BlockId,
|
||||
@@ -37,17 +39,24 @@ enum ExtMessage {
|
||||
/// Asynchronous offchain API.
|
||||
///
|
||||
/// NOTE this is done to prevent recursive calls into the runtime (which are not supported currently).
|
||||
pub(crate) struct AsyncApi(mpsc::UnboundedSender<ExtMessage>);
|
||||
pub(crate) struct Api<S> {
|
||||
sender: mpsc::UnboundedSender<ExtMessage>,
|
||||
db: S,
|
||||
}
|
||||
|
||||
fn unavailable_yet<R: Default>(name: &str) -> R {
|
||||
error!("This {:?} API is not available for offchain workers yet. Follow
|
||||
error!("The {:?} API is not available for offchain workers yet. Follow \
|
||||
https://github.com/paritytech/substrate/issues/1458 for details", name);
|
||||
Default::default()
|
||||
}
|
||||
|
||||
impl OffchainExt for AsyncApi {
|
||||
const LOCAL_DB: &str = "LOCAL (fork-aware) DB";
|
||||
const STORAGE_PREFIX: &[u8] = b"storage";
|
||||
|
||||
impl<S: OffchainStorage> OffchainExt for Api<S> {
|
||||
fn submit_transaction(&mut self, ext: Vec<u8>) -> Result<(), ()> {
|
||||
self.0.unbounded_send(ExtMessage::SubmitExtrinsic(ext))
|
||||
self.sender
|
||||
.unbounded_send(ExtMessage::SubmitExtrinsic(ext))
|
||||
.map(|_| ())
|
||||
.map_err(|_| ())
|
||||
}
|
||||
@@ -89,16 +98,33 @@ impl OffchainExt for AsyncApi {
|
||||
unavailable_yet("random_seed")
|
||||
}
|
||||
|
||||
fn local_storage_set(&mut self, _key: &[u8], _value: &[u8]) {
|
||||
unavailable_yet("local_storage_set")
|
||||
fn local_storage_set(&mut self, kind: StorageKind, key: &[u8], value: &[u8]) {
|
||||
match kind {
|
||||
StorageKind::PERSISTENT => self.db.set(STORAGE_PREFIX, key, value),
|
||||
StorageKind::LOCAL => unavailable_yet(LOCAL_DB),
|
||||
}
|
||||
}
|
||||
|
||||
fn local_storage_compare_and_set(&mut self, _key: &[u8], _old_value: &[u8], _new_value: &[u8]) {
|
||||
unavailable_yet("local_storage_compare_and_set")
|
||||
fn local_storage_compare_and_set(
|
||||
&mut self,
|
||||
kind: StorageKind,
|
||||
key: &[u8],
|
||||
old_value: &[u8],
|
||||
new_value: &[u8],
|
||||
) -> bool {
|
||||
match kind {
|
||||
StorageKind::PERSISTENT => {
|
||||
self.db.compare_and_set(STORAGE_PREFIX, key, old_value, new_value)
|
||||
},
|
||||
StorageKind::LOCAL => unavailable_yet(LOCAL_DB),
|
||||
}
|
||||
}
|
||||
|
||||
fn local_storage_get(&mut self, _key: &[u8]) -> Option<Vec<u8>> {
|
||||
unavailable_yet("local_storage_get")
|
||||
fn local_storage_get(&mut self, kind: StorageKind, key: &[u8]) -> Option<Vec<u8>> {
|
||||
match kind {
|
||||
StorageKind::PERSISTENT => self.db.get(STORAGE_PREFIX, key),
|
||||
StorageKind::LOCAL => unavailable_yet(LOCAL_DB),
|
||||
}
|
||||
}
|
||||
|
||||
fn http_request_start(
|
||||
@@ -159,24 +185,35 @@ impl OffchainExt for AsyncApi {
|
||||
}
|
||||
|
||||
/// Offchain extensions implementation API
|
||||
pub(crate) struct Api<A: ChainApi> {
|
||||
///
|
||||
/// This is the asynchronous processing part of the API.
|
||||
pub(crate) struct AsyncApi<A: ChainApi> {
|
||||
receiver: Option<mpsc::UnboundedReceiver<ExtMessage>>,
|
||||
transaction_pool: Arc<Pool<A>>,
|
||||
at: BlockId<A::Block>,
|
||||
}
|
||||
|
||||
impl<A: ChainApi> Api<A> {
|
||||
pub fn new(
|
||||
impl<A: ChainApi> AsyncApi<A> {
|
||||
/// Creates new Offchain extensions API implementation an the asynchronous processing part.
|
||||
pub fn new<S: OffchainStorage>(
|
||||
transaction_pool: Arc<Pool<A>>,
|
||||
db: S,
|
||||
at: BlockId<A::Block>,
|
||||
) -> (AsyncApi, Self) {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
let api = Self {
|
||||
) -> (Api<S>, AsyncApi<A>) {
|
||||
let (sender, rx) = mpsc::unbounded();
|
||||
|
||||
let api = Api {
|
||||
sender,
|
||||
db,
|
||||
};
|
||||
|
||||
let async_api = AsyncApi {
|
||||
receiver: Some(rx),
|
||||
transaction_pool,
|
||||
at,
|
||||
};
|
||||
(AsyncApi(tx), api)
|
||||
|
||||
(api, async_api)
|
||||
}
|
||||
|
||||
/// Run a processing task for the API
|
||||
@@ -209,3 +246,52 @@ impl<A: ChainApi> Api<A> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use client_db::offchain::LocalStorage;
|
||||
|
||||
fn offchain_api() -> (Api<LocalStorage>, AsyncApi<impl ChainApi>) {
|
||||
let _ = env_logger::try_init();
|
||||
let db = LocalStorage::new_test();
|
||||
let client = Arc::new(test_client::new());
|
||||
let pool = Arc::new(
|
||||
Pool::new(Default::default(), transaction_pool::ChainApi::new(client.clone()))
|
||||
);
|
||||
|
||||
AsyncApi::new(pool, db, BlockId::Number(0))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_set_and_get_local_storage() {
|
||||
// given
|
||||
let kind = StorageKind::PERSISTENT;
|
||||
let mut api = offchain_api().0;
|
||||
let key = b"test";
|
||||
|
||||
// when
|
||||
assert_eq!(api.local_storage_get(kind, key), None);
|
||||
api.local_storage_set(kind, key, b"value");
|
||||
|
||||
// then
|
||||
assert_eq!(api.local_storage_get(kind, key), Some(b"value".to_vec()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_compare_and_set_local_storage() {
|
||||
// given
|
||||
let kind = StorageKind::PERSISTENT;
|
||||
let mut api = offchain_api().0;
|
||||
let key = b"test";
|
||||
api.local_storage_set(kind, key, b"value");
|
||||
|
||||
// when
|
||||
assert_eq!(api.local_storage_compare_and_set(kind, key, b"val", b"xxx"), false);
|
||||
assert_eq!(api.local_storage_get(kind, key), Some(b"value".to_vec()));
|
||||
|
||||
// when
|
||||
assert_eq!(api.local_storage_compare_and_set(kind, key, b"value", b"xxx"), true);
|
||||
assert_eq!(api.local_storage_get(kind, key), Some(b"xxx".to_vec()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,31 +56,35 @@ pub mod testing;
|
||||
pub use offchain_primitives::OffchainWorkerApi;
|
||||
|
||||
/// An offchain workers manager.
|
||||
pub struct OffchainWorkers<C, Block: traits::Block> {
|
||||
pub struct OffchainWorkers<C, S, Block: traits::Block> {
|
||||
client: Arc<C>,
|
||||
db: S,
|
||||
_block: PhantomData<Block>,
|
||||
}
|
||||
|
||||
impl<C, Block: traits::Block> fmt::Debug for OffchainWorkers<C, Block> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_tuple("OffchainWorkers").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, Block: traits::Block> OffchainWorkers<C, Block> {
|
||||
impl<C, S, Block: traits::Block> OffchainWorkers<C, S, Block> {
|
||||
/// Creates new `OffchainWorkers`.
|
||||
pub fn new(
|
||||
client: Arc<C>,
|
||||
db: S,
|
||||
) -> Self {
|
||||
Self {
|
||||
client,
|
||||
db,
|
||||
_block: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, Block> OffchainWorkers<C, Block> where
|
||||
impl<C, S, Block: traits::Block> fmt::Debug for OffchainWorkers<C, S, Block> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_tuple("OffchainWorkers").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, S, Block> OffchainWorkers<C, S, Block> where
|
||||
Block: traits::Block,
|
||||
S: client::backend::OffchainStorage + 'static,
|
||||
C: ProvideRuntimeApi,
|
||||
C::Api: OffchainWorkerApi<Block>,
|
||||
{
|
||||
@@ -99,7 +103,11 @@ impl<C, Block> OffchainWorkers<C, Block> where
|
||||
debug!("Checking offchain workers at {:?}: {:?}", at, has_api);
|
||||
|
||||
if has_api.unwrap_or(false) {
|
||||
let (api, runner) = api::Api::new(pool.clone(), at.clone());
|
||||
let (api, runner) = api::AsyncApi::new(
|
||||
pool.clone(),
|
||||
self.db.clone(),
|
||||
at.clone(),
|
||||
);
|
||||
debug!("Running offchain workers at {:?}", at);
|
||||
let api = Box::new(api);
|
||||
runtime.offchain_worker_with_context(&at, ExecutionContext::OffchainWorker(api), *number).unwrap();
|
||||
@@ -122,9 +130,10 @@ mod tests {
|
||||
let runtime = tokio::runtime::Runtime::new().unwrap();
|
||||
let client = Arc::new(test_client::new());
|
||||
let pool = Arc::new(Pool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone())));
|
||||
let db = client_db::offchain::LocalStorage::new_test();
|
||||
|
||||
// when
|
||||
let offchain = OffchainWorkers::new(client);
|
||||
let offchain = OffchainWorkers::new(client, db);
|
||||
runtime.executor().spawn(offchain.on_block_imported(&0u64, &pool));
|
||||
|
||||
// then
|
||||
|
||||
@@ -20,6 +20,7 @@ use std::{
|
||||
collections::BTreeMap,
|
||||
sync::Arc,
|
||||
};
|
||||
use client::backend::OffchainStorage;
|
||||
use parking_lot::RwLock;
|
||||
use primitives::offchain::{
|
||||
self,
|
||||
@@ -29,6 +30,7 @@ use primitives::offchain::{
|
||||
Timestamp,
|
||||
CryptoKind,
|
||||
CryptoKeyId,
|
||||
StorageKind,
|
||||
};
|
||||
|
||||
/// Pending request.
|
||||
@@ -61,6 +63,11 @@ pub struct PendingRequest {
|
||||
pub struct State {
|
||||
/// A list of pending requests.
|
||||
pub requests: BTreeMap<RequestId, PendingRequest>,
|
||||
expected_requests: BTreeMap<RequestId, PendingRequest>,
|
||||
/// Persistent local storage
|
||||
pub persistent_storage: client::in_mem::OffchainStorage,
|
||||
/// Local storage
|
||||
pub local_storage: client::in_mem::OffchainStorage,
|
||||
}
|
||||
|
||||
impl State {
|
||||
@@ -74,7 +81,7 @@ impl State {
|
||||
) {
|
||||
match self.requests.get_mut(&RequestId(id)) {
|
||||
None => {
|
||||
panic!("Missing expected request: {:?}.\n\nAll: {:?}", id, self.requests);
|
||||
panic!("Missing pending request: {:?}.\n\nAll: {:?}", id, self.requests);
|
||||
}
|
||||
Some(req) => {
|
||||
assert_eq!(
|
||||
@@ -86,12 +93,47 @@ impl State {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn fulfill_expected(&mut self, id: u16) {
|
||||
if let Some(mut req) = self.expected_requests.remove(&RequestId(id)) {
|
||||
let response = std::mem::replace(&mut req.response, vec![]);
|
||||
let headers = std::mem::replace(&mut req.response_headers, vec![]);
|
||||
self.fulfill_pending_request(id, req, response, headers);
|
||||
}
|
||||
}
|
||||
|
||||
/// Add expected HTTP request.
|
||||
///
|
||||
/// This method can be used to initialize expected HTTP requests and their responses
|
||||
/// before running the actual code that utilizes them (for instance before calling into runtime).
|
||||
/// Expected request has to be fulfilled before this struct is dropped,
|
||||
/// the `response` and `response_headers` fields will be used to return results to the callers.
|
||||
pub fn expect_request(&mut self, id: u16, expected: PendingRequest) {
|
||||
self.expected_requests.insert(RequestId(id), expected);
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for State {
|
||||
fn drop(&mut self) {
|
||||
if !self.expected_requests.is_empty() {
|
||||
panic!("Unfulfilled expected requests: {:?}", self.expected_requests);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Implementation of offchain externalities used for tests.
|
||||
#[derive(Clone, Default, Debug)]
|
||||
pub struct TestOffchainExt(pub Arc<RwLock<State>>);
|
||||
|
||||
impl TestOffchainExt {
|
||||
/// Create new `TestOffchainExt` and a reference to the internal state.
|
||||
pub fn new() -> (Self, Arc<RwLock<State>>) {
|
||||
let ext = Self::default();
|
||||
let state = ext.0.clone();
|
||||
(ext, state)
|
||||
}
|
||||
}
|
||||
|
||||
impl offchain::Externalities for TestOffchainExt {
|
||||
fn submit_transaction(&mut self, _ex: Vec<u8>) -> Result<(), ()> {
|
||||
unimplemented!("not needed in tests so far")
|
||||
@@ -129,21 +171,34 @@ impl offchain::Externalities for TestOffchainExt {
|
||||
unimplemented!("not needed in tests so far")
|
||||
}
|
||||
|
||||
fn local_storage_set(&mut self, _key: &[u8], _value: &[u8]) {
|
||||
unimplemented!("not needed in tests so far")
|
||||
fn local_storage_set(&mut self, kind: StorageKind, key: &[u8], value: &[u8]) {
|
||||
let mut state = self.0.write();
|
||||
match kind {
|
||||
StorageKind::LOCAL => &mut state.local_storage,
|
||||
StorageKind::PERSISTENT => &mut state.persistent_storage,
|
||||
}.set(b"", key, value);
|
||||
}
|
||||
|
||||
fn local_storage_compare_and_set(
|
||||
&mut self,
|
||||
_key: &[u8],
|
||||
_old_value: &[u8],
|
||||
_new_value: &[u8]
|
||||
) {
|
||||
unimplemented!("not needed in tests so far")
|
||||
kind: StorageKind,
|
||||
key: &[u8],
|
||||
old_value: &[u8],
|
||||
new_value: &[u8]
|
||||
) -> bool {
|
||||
let mut state = self.0.write();
|
||||
match kind {
|
||||
StorageKind::LOCAL => &mut state.local_storage,
|
||||
StorageKind::PERSISTENT => &mut state.persistent_storage,
|
||||
}.compare_and_set(b"", key, old_value, new_value)
|
||||
}
|
||||
|
||||
fn local_storage_get(&mut self, _key: &[u8]) -> Option<Vec<u8>> {
|
||||
unimplemented!("not needed in tests so far")
|
||||
fn local_storage_get(&mut self, kind: StorageKind, key: &[u8]) -> Option<Vec<u8>> {
|
||||
let state = self.0.read();
|
||||
match kind {
|
||||
StorageKind::LOCAL => &state.local_storage,
|
||||
StorageKind::PERSISTENT => &state.persistent_storage,
|
||||
}.get(b"", key)
|
||||
}
|
||||
|
||||
fn http_request_start(&mut self, method: &str, uri: &str, meta: &[u8]) -> Result<RequestId, ()> {
|
||||
@@ -180,15 +235,21 @@ impl offchain::Externalities for TestOffchainExt {
|
||||
_deadline: Option<Timestamp>
|
||||
) -> Result<(), HttpError> {
|
||||
let mut state = self.0.write();
|
||||
if let Some(req) = state.requests.get_mut(&request_id) {
|
||||
|
||||
let sent = {
|
||||
let req = state.requests.get_mut(&request_id).ok_or(HttpError::IoError)?;
|
||||
req.body.extend(chunk);
|
||||
if chunk.is_empty() {
|
||||
req.sent = true;
|
||||
}
|
||||
req.body.extend(chunk);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(HttpError::IoError)
|
||||
req.sent
|
||||
};
|
||||
|
||||
if sent {
|
||||
state.fulfill_expected(request_id.0);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn http_response_wait(
|
||||
|
||||
Reference in New Issue
Block a user