mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-20 11:41:02 +00:00
Parachain node should not recover blocks while syncing (#2462)
This commit is contained in:
@@ -47,7 +47,7 @@
|
|||||||
|
|
||||||
use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
|
use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
|
||||||
use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
|
use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
|
||||||
use sp_consensus::{BlockOrigin, BlockStatus};
|
use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
|
||||||
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
|
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
|
||||||
|
|
||||||
use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT};
|
use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT};
|
||||||
@@ -228,6 +228,7 @@ pub struct PoVRecovery<Block: BlockT, PC, RC> {
|
|||||||
recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
|
recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
|
||||||
/// Blocks that we are retrying currently
|
/// Blocks that we are retrying currently
|
||||||
candidates_in_retry: HashSet<Block::Hash>,
|
candidates_in_retry: HashSet<Block::Hash>,
|
||||||
|
parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
|
impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
|
||||||
@@ -244,6 +245,7 @@ where
|
|||||||
relay_chain_interface: RCInterface,
|
relay_chain_interface: RCInterface,
|
||||||
para_id: ParaId,
|
para_id: ParaId,
|
||||||
recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
|
recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
|
||||||
|
parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
candidates: HashMap::new(),
|
candidates: HashMap::new(),
|
||||||
@@ -256,6 +258,7 @@ where
|
|||||||
para_id,
|
para_id,
|
||||||
candidates_in_retry: HashSet::new(),
|
candidates_in_retry: HashSet::new(),
|
||||||
recovery_chan_rx,
|
recovery_chan_rx,
|
||||||
|
parachain_sync_service,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -538,14 +541,19 @@ where
|
|||||||
pub async fn run(mut self) {
|
pub async fn run(mut self) {
|
||||||
let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
|
let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
|
||||||
let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
|
let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
|
||||||
let pending_candidates =
|
let pending_candidates = match pending_candidates(
|
||||||
match pending_candidates(self.relay_chain_interface.clone(), self.para_id).await {
|
self.relay_chain_interface.clone(),
|
||||||
Ok(pending_candidate_stream) => pending_candidate_stream.fuse(),
|
self.para_id,
|
||||||
Err(err) => {
|
self.parachain_sync_service.clone(),
|
||||||
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
|
)
|
||||||
return
|
.await
|
||||||
},
|
{
|
||||||
};
|
Ok(pending_candidate_stream) => pending_candidate_stream.fuse(),
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
|
||||||
|
return
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
futures::pin_mut!(pending_candidates);
|
futures::pin_mut!(pending_candidates);
|
||||||
|
|
||||||
@@ -600,13 +608,24 @@ where
|
|||||||
async fn pending_candidates(
|
async fn pending_candidates(
|
||||||
relay_chain_client: impl RelayChainInterface + Clone,
|
relay_chain_client: impl RelayChainInterface + Clone,
|
||||||
para_id: ParaId,
|
para_id: ParaId,
|
||||||
|
sync_service: Arc<dyn SyncOracle + Sync + Send>,
|
||||||
) -> RelayChainResult<impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)>> {
|
) -> RelayChainResult<impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)>> {
|
||||||
let import_notification_stream = relay_chain_client.import_notification_stream().await?;
|
let import_notification_stream = relay_chain_client.import_notification_stream().await?;
|
||||||
|
|
||||||
let filtered_stream = import_notification_stream.filter_map(move |n| {
|
let filtered_stream = import_notification_stream.filter_map(move |n| {
|
||||||
let client_for_closure = relay_chain_client.clone();
|
let client_for_closure = relay_chain_client.clone();
|
||||||
|
let sync_oracle = sync_service.clone();
|
||||||
async move {
|
async move {
|
||||||
let hash = n.hash();
|
let hash = n.hash();
|
||||||
|
if sync_oracle.is_major_syncing() {
|
||||||
|
tracing::debug!(
|
||||||
|
target: LOG_TARGET,
|
||||||
|
relay_hash = ?hash,
|
||||||
|
"Skipping candidate due to sync.",
|
||||||
|
);
|
||||||
|
return None
|
||||||
|
}
|
||||||
|
|
||||||
let pending_availability_result = client_for_closure
|
let pending_availability_result = client_for_closure
|
||||||
.candidate_pending_availability(hash, para_id)
|
.candidate_pending_availability(hash, para_id)
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -70,6 +70,7 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawn
|
|||||||
pub collator_key: CollatorPair,
|
pub collator_key: CollatorPair,
|
||||||
pub relay_chain_slot_duration: Duration,
|
pub relay_chain_slot_duration: Duration,
|
||||||
pub recovery_handle: Box<dyn RecoveryHandle>,
|
pub recovery_handle: Box<dyn RecoveryHandle>,
|
||||||
|
pub sync_service: Arc<SyncingService<Block>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start a collator node for a parachain.
|
/// Start a collator node for a parachain.
|
||||||
@@ -91,6 +92,7 @@ pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner
|
|||||||
collator_key,
|
collator_key,
|
||||||
relay_chain_slot_duration,
|
relay_chain_slot_duration,
|
||||||
recovery_handle,
|
recovery_handle,
|
||||||
|
sync_service,
|
||||||
}: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner>,
|
}: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner>,
|
||||||
) -> sc_service::error::Result<()>
|
) -> sc_service::error::Result<()>
|
||||||
where
|
where
|
||||||
@@ -136,6 +138,7 @@ where
|
|||||||
relay_chain_interface.clone(),
|
relay_chain_interface.clone(),
|
||||||
para_id,
|
para_id,
|
||||||
recovery_chan_rx,
|
recovery_chan_rx,
|
||||||
|
sync_service,
|
||||||
);
|
);
|
||||||
|
|
||||||
task_manager
|
task_manager
|
||||||
@@ -170,6 +173,7 @@ pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface> {
|
|||||||
pub relay_chain_slot_duration: Duration,
|
pub relay_chain_slot_duration: Duration,
|
||||||
pub import_queue: Box<dyn ImportQueueService<Block>>,
|
pub import_queue: Box<dyn ImportQueueService<Block>>,
|
||||||
pub recovery_handle: Box<dyn RecoveryHandle>,
|
pub recovery_handle: Box<dyn RecoveryHandle>,
|
||||||
|
pub sync_service: Arc<SyncingService<Block>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start a full node for a parachain.
|
/// Start a full node for a parachain.
|
||||||
@@ -186,6 +190,7 @@ pub fn start_full_node<Block, Client, Backend, RCInterface>(
|
|||||||
relay_chain_slot_duration,
|
relay_chain_slot_duration,
|
||||||
import_queue,
|
import_queue,
|
||||||
recovery_handle,
|
recovery_handle,
|
||||||
|
sync_service,
|
||||||
}: StartFullNodeParams<Block, Client, RCInterface>,
|
}: StartFullNodeParams<Block, Client, RCInterface>,
|
||||||
) -> sc_service::error::Result<()>
|
) -> sc_service::error::Result<()>
|
||||||
where
|
where
|
||||||
@@ -231,6 +236,7 @@ where
|
|||||||
relay_chain_interface,
|
relay_chain_interface,
|
||||||
para_id,
|
para_id,
|
||||||
recovery_chan_rx,
|
recovery_chan_rx,
|
||||||
|
sync_service,
|
||||||
);
|
);
|
||||||
|
|
||||||
task_manager
|
task_manager
|
||||||
|
|||||||
@@ -271,7 +271,7 @@ async fn start_node_impl(
|
|||||||
&task_manager,
|
&task_manager,
|
||||||
relay_chain_interface.clone(),
|
relay_chain_interface.clone(),
|
||||||
transaction_pool,
|
transaction_pool,
|
||||||
sync_service,
|
sync_service.clone(),
|
||||||
params.keystore_container.keystore(),
|
params.keystore_container.keystore(),
|
||||||
force_authoring,
|
force_authoring,
|
||||||
para_id,
|
para_id,
|
||||||
@@ -291,6 +291,7 @@ async fn start_node_impl(
|
|||||||
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
|
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
|
||||||
relay_chain_slot_duration,
|
relay_chain_slot_duration,
|
||||||
recovery_handle: Box::new(overseer_handle),
|
recovery_handle: Box::new(overseer_handle),
|
||||||
|
sync_service,
|
||||||
};
|
};
|
||||||
|
|
||||||
start_collator(params).await?;
|
start_collator(params).await?;
|
||||||
@@ -304,6 +305,7 @@ async fn start_node_impl(
|
|||||||
relay_chain_slot_duration,
|
relay_chain_slot_duration,
|
||||||
import_queue: import_queue_service,
|
import_queue: import_queue_service,
|
||||||
recovery_handle: Box::new(overseer_handle),
|
recovery_handle: Box::new(overseer_handle),
|
||||||
|
sync_service,
|
||||||
};
|
};
|
||||||
|
|
||||||
start_full_node(params)?;
|
start_full_node(params)?;
|
||||||
|
|||||||
@@ -460,7 +460,7 @@ where
|
|||||||
&task_manager,
|
&task_manager,
|
||||||
relay_chain_interface.clone(),
|
relay_chain_interface.clone(),
|
||||||
transaction_pool,
|
transaction_pool,
|
||||||
sync_service,
|
sync_service.clone(),
|
||||||
params.keystore_container.keystore(),
|
params.keystore_container.keystore(),
|
||||||
force_authoring,
|
force_authoring,
|
||||||
)?;
|
)?;
|
||||||
@@ -480,6 +480,7 @@ where
|
|||||||
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
|
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
|
||||||
relay_chain_slot_duration,
|
relay_chain_slot_duration,
|
||||||
recovery_handle: Box::new(overseer_handle),
|
recovery_handle: Box::new(overseer_handle),
|
||||||
|
sync_service,
|
||||||
};
|
};
|
||||||
|
|
||||||
start_collator(params).await?;
|
start_collator(params).await?;
|
||||||
@@ -493,6 +494,7 @@ where
|
|||||||
relay_chain_slot_duration,
|
relay_chain_slot_duration,
|
||||||
import_queue: import_queue_service,
|
import_queue: import_queue_service,
|
||||||
recovery_handle: Box::new(overseer_handle),
|
recovery_handle: Box::new(overseer_handle),
|
||||||
|
sync_service,
|
||||||
};
|
};
|
||||||
|
|
||||||
start_full_node(params)?;
|
start_full_node(params)?;
|
||||||
@@ -659,7 +661,7 @@ where
|
|||||||
&task_manager,
|
&task_manager,
|
||||||
relay_chain_interface.clone(),
|
relay_chain_interface.clone(),
|
||||||
transaction_pool,
|
transaction_pool,
|
||||||
sync_service,
|
sync_service.clone(),
|
||||||
params.keystore_container.keystore(),
|
params.keystore_container.keystore(),
|
||||||
force_authoring,
|
force_authoring,
|
||||||
)?;
|
)?;
|
||||||
@@ -679,6 +681,7 @@ where
|
|||||||
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
|
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
|
||||||
relay_chain_slot_duration,
|
relay_chain_slot_duration,
|
||||||
recovery_handle: Box::new(overseer_handle),
|
recovery_handle: Box::new(overseer_handle),
|
||||||
|
sync_service,
|
||||||
};
|
};
|
||||||
|
|
||||||
start_collator(params).await?;
|
start_collator(params).await?;
|
||||||
@@ -692,6 +695,7 @@ where
|
|||||||
relay_chain_slot_duration,
|
relay_chain_slot_duration,
|
||||||
import_queue: import_queue_service,
|
import_queue: import_queue_service,
|
||||||
recovery_handle: Box::new(overseer_handle),
|
recovery_handle: Box::new(overseer_handle),
|
||||||
|
sync_service,
|
||||||
};
|
};
|
||||||
|
|
||||||
start_full_node(params)?;
|
start_full_node(params)?;
|
||||||
@@ -1429,7 +1433,7 @@ where
|
|||||||
&task_manager,
|
&task_manager,
|
||||||
relay_chain_interface.clone(),
|
relay_chain_interface.clone(),
|
||||||
transaction_pool,
|
transaction_pool,
|
||||||
sync_service,
|
sync_service.clone(),
|
||||||
params.keystore_container.keystore(),
|
params.keystore_container.keystore(),
|
||||||
force_authoring,
|
force_authoring,
|
||||||
)?;
|
)?;
|
||||||
@@ -1449,6 +1453,7 @@ where
|
|||||||
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
|
collator_key: collator_key.expect("Command line arguments do not allow this. qed"),
|
||||||
relay_chain_slot_duration,
|
relay_chain_slot_duration,
|
||||||
recovery_handle: Box::new(overseer_handle),
|
recovery_handle: Box::new(overseer_handle),
|
||||||
|
sync_service,
|
||||||
};
|
};
|
||||||
|
|
||||||
start_collator(params).await?;
|
start_collator(params).await?;
|
||||||
@@ -1462,6 +1467,7 @@ where
|
|||||||
relay_chain_slot_duration,
|
relay_chain_slot_duration,
|
||||||
import_queue: import_queue_service,
|
import_queue: import_queue_service,
|
||||||
recovery_handle: Box::new(overseer_handle),
|
recovery_handle: Box::new(overseer_handle),
|
||||||
|
sync_service,
|
||||||
};
|
};
|
||||||
|
|
||||||
start_full_node(params)?;
|
start_full_node(params)?;
|
||||||
|
|||||||
@@ -433,6 +433,7 @@ where
|
|||||||
import_queue: import_queue_service,
|
import_queue: import_queue_service,
|
||||||
relay_chain_slot_duration: Duration::from_secs(6),
|
relay_chain_slot_duration: Duration::from_secs(6),
|
||||||
recovery_handle,
|
recovery_handle,
|
||||||
|
sync_service,
|
||||||
};
|
};
|
||||||
|
|
||||||
start_collator(params).await?;
|
start_collator(params).await?;
|
||||||
@@ -446,6 +447,7 @@ where
|
|||||||
import_queue: import_queue_service,
|
import_queue: import_queue_service,
|
||||||
relay_chain_slot_duration: Duration::from_secs(6),
|
relay_chain_slot_duration: Duration::from_secs(6),
|
||||||
recovery_handle,
|
recovery_handle,
|
||||||
|
sync_service,
|
||||||
};
|
};
|
||||||
|
|
||||||
start_full_node(params)?;
|
start_full_node(params)?;
|
||||||
|
|||||||
Reference in New Issue
Block a user