mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 01:11:10 +00:00
Remove sleep in import blocks (#6124)
* Add Delay and info logging * Switch from Duration to Delay in enum declaration * Remove sleep from import_blocks fn * Set back constans and remove unnecessary code * Fix hot loop * Reset timer once poll is ready, not when it's pending
This commit is contained in:
@@ -39,7 +39,9 @@ use sp_core::storage::{StorageKey, well_known_keys, ChildInfo, Storage, StorageC
|
||||
use sc_client_api::{StorageProvider, BlockBackend, UsageProvider};
|
||||
|
||||
use std::{io::{Read, Write, Seek}, pin::Pin, collections::HashMap};
|
||||
use std::{thread, time::{Duration, Instant}};
|
||||
use std::time::{Duration, Instant};
|
||||
use futures_timer::Delay;
|
||||
use std::task::Poll;
|
||||
use serde_json::{de::IoRead as JsonIoRead, Deserializer, StreamDeserializer};
|
||||
use std::convert::{TryFrom, TryInto};
|
||||
use sp_runtime::traits::{CheckedDiv, Saturating};
|
||||
@@ -272,14 +274,14 @@ enum ImportState<R, B> where
|
||||
/// The queue is full (contains at least MAX_PENDING_BLOCKS blocks) and we are waiting for it to catch up.
|
||||
WaitingForImportQueueToCatchUp{
|
||||
block_iter: BlockIter<R, B>,
|
||||
delay: Duration,
|
||||
delay: Delay,
|
||||
block: SignedBlock<B>
|
||||
},
|
||||
// We have added all the blocks to the queue but they are still being processed.
|
||||
WaitingForImportQueueToFinish{
|
||||
num_expected_blocks: Option<u64>,
|
||||
read_block_count: u64,
|
||||
delay: Duration,
|
||||
delay: Delay,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -373,7 +375,7 @@ impl<
|
||||
// The iterator is over: we now need to wait for the import queue to finish.
|
||||
let num_expected_blocks = block_iter.num_expected_blocks();
|
||||
let read_block_count = block_iter.read_block_count();
|
||||
let delay = Duration::from_millis(DELAY_TIME);
|
||||
let delay = Delay::new(Duration::from_millis(DELAY_TIME));
|
||||
state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
|
||||
},
|
||||
Some(block_result) => {
|
||||
@@ -383,7 +385,7 @@ impl<
|
||||
if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
|
||||
// The queue is full, so do not add this block and simply wait until
|
||||
// the queue has made some progress.
|
||||
let delay = Duration::from_millis(DELAY_TIME);
|
||||
let delay = Delay::new(Duration::from_millis(DELAY_TIME));
|
||||
state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
|
||||
} else {
|
||||
// Queue is not full, we can keep on adding blocks to the queue.
|
||||
@@ -392,18 +394,26 @@ impl<
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return std::task::Poll::Ready(
|
||||
return Poll::Ready(
|
||||
Err(Error::Other(format!("Error reading block #{}: {}", read_block_count, e))))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block} => {
|
||||
ImportState::WaitingForImportQueueToCatchUp{block_iter, mut delay, block} => {
|
||||
let read_block_count = block_iter.read_block_count();
|
||||
if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
|
||||
thread::sleep(delay);
|
||||
// Queue is still full, so wait until there is room to insert our block.
|
||||
match Pin::new(&mut delay).poll(cx) {
|
||||
Poll::Pending => {
|
||||
state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
|
||||
return Poll::Pending
|
||||
},
|
||||
Poll::Ready(_) => {
|
||||
delay.reset(Duration::from_millis(DELAY_TIME));
|
||||
},
|
||||
}
|
||||
state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
|
||||
} else {
|
||||
// Queue is no longer full, so we can add our block to the queue.
|
||||
@@ -412,7 +422,7 @@ impl<
|
||||
state = Some(ImportState::Reading{block_iter});
|
||||
}
|
||||
},
|
||||
ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay} => {
|
||||
ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, mut delay} => {
|
||||
// All the blocks have been added to the queue, which doesn't mean they
|
||||
// have all been properly imported.
|
||||
if importing_is_done(num_expected_blocks, read_block_count, link.imported_blocks) {
|
||||
@@ -421,10 +431,20 @@ impl<
|
||||
"🎉 Imported {} blocks. Best: #{}",
|
||||
read_block_count, client.chain_info().best_number
|
||||
);
|
||||
return std::task::Poll::Ready(Ok(()))
|
||||
return Poll::Ready(Ok(()))
|
||||
} else {
|
||||
thread::sleep(delay);
|
||||
// Importing is not done, we still have to wait for the queue to finish.
|
||||
// Wait for the delay, because we know the queue is lagging behind.
|
||||
match Pin::new(&mut delay).poll(cx) {
|
||||
Poll::Pending => {
|
||||
state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
|
||||
return Poll::Pending
|
||||
},
|
||||
Poll::Ready(_) => {
|
||||
delay.reset(Duration::from_millis(DELAY_TIME));
|
||||
},
|
||||
}
|
||||
|
||||
state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
|
||||
}
|
||||
}
|
||||
@@ -436,7 +456,7 @@ impl<
|
||||
speedometer.notify_user(best_number);
|
||||
|
||||
if link.has_error {
|
||||
return std::task::Poll::Ready(Err(
|
||||
return Poll::Ready(Err(
|
||||
Error::Other(
|
||||
format!("Stopping after #{} blocks because of an error", link.imported_blocks)
|
||||
)
|
||||
@@ -444,7 +464,7 @@ impl<
|
||||
}
|
||||
|
||||
cx.waker().wake_by_ref();
|
||||
std::task::Poll::Pending
|
||||
Poll::Pending
|
||||
});
|
||||
Box::pin(import)
|
||||
}
|
||||
@@ -477,7 +497,7 @@ impl<
|
||||
let client = &self.client;
|
||||
|
||||
if last < block {
|
||||
return std::task::Poll::Ready(Err("Invalid block range specified".into()));
|
||||
return Poll::Ready(Err("Invalid block range specified".into()));
|
||||
}
|
||||
|
||||
if !wrote_header {
|
||||
@@ -501,19 +521,19 @@ impl<
|
||||
}
|
||||
},
|
||||
// Reached end of the chain.
|
||||
None => return std::task::Poll::Ready(Ok(())),
|
||||
None => return Poll::Ready(Ok(())),
|
||||
}
|
||||
if (block % 10000.into()).is_zero() {
|
||||
info!("#{}", block);
|
||||
}
|
||||
if block == last {
|
||||
return std::task::Poll::Ready(Ok(()));
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
block += One::one();
|
||||
|
||||
// Re-schedule the task in order to continue the operation.
|
||||
cx.waker().wake_by_ref();
|
||||
std::task::Poll::Pending
|
||||
Poll::Pending
|
||||
});
|
||||
|
||||
Box::pin(export)
|
||||
|
||||
Reference in New Issue
Block a user