mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 19:51:05 +00:00
av-store: clean up StoreAvailableData message (#3984)
* av-store: clean up StoreAvailableData message * fmt * use named fields
This commit is contained in:
@@ -1074,19 +1074,18 @@ fn process_message(
|
||||
},
|
||||
}
|
||||
},
|
||||
AvailabilityStoreMessage::StoreAvailableData(
|
||||
candidate,
|
||||
_our_index,
|
||||
AvailabilityStoreMessage::StoreAvailableData {
|
||||
candidate_hash,
|
||||
n_validators,
|
||||
available_data,
|
||||
tx,
|
||||
) => {
|
||||
} => {
|
||||
subsystem.metrics.on_chunks_received(n_validators as _);
|
||||
|
||||
let _timer = subsystem.metrics.time_store_available_data();
|
||||
|
||||
let res =
|
||||
store_available_data(&subsystem, candidate, n_validators as _, available_data);
|
||||
store_available_data(&subsystem, candidate_hash, n_validators as _, available_data);
|
||||
|
||||
match res {
|
||||
Ok(()) => {
|
||||
|
||||
@@ -420,13 +420,12 @@ fn store_block_works() {
|
||||
};
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||
let block_msg = AvailabilityStoreMessage::StoreAvailableData {
|
||||
candidate_hash,
|
||||
Some(validator_index),
|
||||
n_validators,
|
||||
available_data.clone(),
|
||||
available_data: available_data.clone(),
|
||||
tx,
|
||||
);
|
||||
};
|
||||
|
||||
virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await;
|
||||
assert_eq!(rx.await.unwrap(), Ok(()));
|
||||
@@ -474,13 +473,12 @@ fn store_pov_and_query_chunk_works() {
|
||||
erasure::obtain_chunks_v1(n_validators as _, &available_data).unwrap();
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||
let block_msg = AvailabilityStoreMessage::StoreAvailableData {
|
||||
candidate_hash,
|
||||
None,
|
||||
n_validators,
|
||||
available_data,
|
||||
tx,
|
||||
);
|
||||
};
|
||||
|
||||
virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await;
|
||||
|
||||
@@ -521,13 +519,12 @@ fn query_all_chunks_works() {
|
||||
|
||||
{
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||
candidate_hash_1,
|
||||
None,
|
||||
let block_msg = AvailabilityStoreMessage::StoreAvailableData {
|
||||
candidate_hash: candidate_hash_1,
|
||||
n_validators,
|
||||
available_data,
|
||||
tx,
|
||||
);
|
||||
};
|
||||
|
||||
virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await;
|
||||
assert_eq!(rx.await.unwrap(), Ok(()));
|
||||
@@ -610,13 +607,12 @@ fn stored_but_not_included_data_is_pruned() {
|
||||
};
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||
let block_msg = AvailabilityStoreMessage::StoreAvailableData {
|
||||
candidate_hash,
|
||||
None,
|
||||
n_validators,
|
||||
available_data.clone(),
|
||||
available_data: available_data.clone(),
|
||||
tx,
|
||||
);
|
||||
};
|
||||
|
||||
virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await;
|
||||
|
||||
@@ -663,13 +659,12 @@ fn stored_data_kept_until_finalized() {
|
||||
let block_number = 10;
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||
let block_msg = AvailabilityStoreMessage::StoreAvailableData {
|
||||
candidate_hash,
|
||||
None,
|
||||
n_validators,
|
||||
available_data.clone(),
|
||||
available_data: available_data.clone(),
|
||||
tx,
|
||||
);
|
||||
};
|
||||
|
||||
virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await;
|
||||
|
||||
@@ -899,26 +894,24 @@ fn forkfullness_works() {
|
||||
};
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||
candidate_1_hash,
|
||||
None,
|
||||
let msg = AvailabilityStoreMessage::StoreAvailableData {
|
||||
candidate_hash: candidate_1_hash,
|
||||
n_validators,
|
||||
available_data_1.clone(),
|
||||
available_data: available_data_1.clone(),
|
||||
tx,
|
||||
);
|
||||
};
|
||||
|
||||
virtual_overseer.send(FromOverseer::Communication { msg }).await;
|
||||
|
||||
rx.await.unwrap().unwrap();
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let msg = AvailabilityStoreMessage::StoreAvailableData(
|
||||
candidate_2_hash,
|
||||
None,
|
||||
let msg = AvailabilityStoreMessage::StoreAvailableData {
|
||||
candidate_hash: candidate_2_hash,
|
||||
n_validators,
|
||||
available_data_2.clone(),
|
||||
available_data: available_data_2.clone(),
|
||||
tx,
|
||||
);
|
||||
};
|
||||
|
||||
virtual_overseer.send(FromOverseer::Communication { msg }).await;
|
||||
|
||||
|
||||
@@ -294,20 +294,18 @@ fn table_attested_to_backed(
|
||||
|
||||
async fn store_available_data(
|
||||
sender: &mut JobSender<impl SubsystemSender>,
|
||||
id: Option<ValidatorIndex>,
|
||||
n_validators: u32,
|
||||
candidate_hash: CandidateHash,
|
||||
available_data: AvailableData,
|
||||
) -> Result<(), Error> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
sender
|
||||
.send_message(AvailabilityStoreMessage::StoreAvailableData(
|
||||
.send_message(AvailabilityStoreMessage::StoreAvailableData {
|
||||
candidate_hash,
|
||||
id,
|
||||
n_validators,
|
||||
available_data,
|
||||
tx,
|
||||
))
|
||||
})
|
||||
.await;
|
||||
|
||||
let _ = rx.await.map_err(Error::StoreAvailableData)?;
|
||||
@@ -321,7 +319,6 @@ async fn store_available_data(
|
||||
// This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`.
|
||||
async fn make_pov_available(
|
||||
sender: &mut JobSender<impl SubsystemSender>,
|
||||
validator_index: Option<ValidatorIndex>,
|
||||
n_validators: usize,
|
||||
pov: Arc<PoV>,
|
||||
candidate_hash: CandidateHash,
|
||||
@@ -347,14 +344,7 @@ async fn make_pov_available(
|
||||
{
|
||||
let _span = span.as_ref().map(|s| s.child("store-data").with_candidate(candidate_hash));
|
||||
|
||||
store_available_data(
|
||||
sender,
|
||||
validator_index,
|
||||
n_validators as u32,
|
||||
candidate_hash,
|
||||
available_data,
|
||||
)
|
||||
.await?;
|
||||
store_available_data(sender, n_validators as u32, candidate_hash, available_data).await?;
|
||||
}
|
||||
|
||||
Ok(Ok(()))
|
||||
@@ -409,7 +399,6 @@ struct BackgroundValidationParams<S: overseer::SubsystemSender<AllMessages>, F>
|
||||
candidate: CandidateReceipt,
|
||||
relay_parent: Hash,
|
||||
pov: PoVData,
|
||||
validator_index: Option<ValidatorIndex>,
|
||||
n_validators: usize,
|
||||
span: Option<jaeger::Span>,
|
||||
make_command: F,
|
||||
@@ -427,7 +416,6 @@ async fn validate_and_make_available(
|
||||
candidate,
|
||||
relay_parent,
|
||||
pov,
|
||||
validator_index,
|
||||
n_validators,
|
||||
span,
|
||||
make_command,
|
||||
@@ -484,7 +472,6 @@ async fn validate_and_make_available(
|
||||
} else {
|
||||
let erasure_valid = make_pov_available(
|
||||
&mut sender,
|
||||
validator_index,
|
||||
n_validators,
|
||||
pov.clone(),
|
||||
candidate.hash(),
|
||||
@@ -719,7 +706,6 @@ impl CandidateBackingJob {
|
||||
candidate: candidate.clone(),
|
||||
relay_parent: self.parent,
|
||||
pov: PoVData::Ready(pov),
|
||||
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
|
||||
n_validators: self.table_context.validators.len(),
|
||||
span,
|
||||
make_command: ValidatedCandidateCommand::Second,
|
||||
@@ -1033,7 +1019,6 @@ impl CandidateBackingJob {
|
||||
candidate: attesting.candidate,
|
||||
relay_parent: self.parent,
|
||||
pov,
|
||||
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
|
||||
n_validators: self.table_context.validators.len(),
|
||||
span,
|
||||
make_command: ValidatedCandidateCommand::Attest,
|
||||
|
||||
@@ -336,7 +336,7 @@ fn backing_second_works() {
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::AvailabilityStore(
|
||||
AvailabilityStoreMessage::StoreAvailableData(candidate_hash, _, _, _, tx)
|
||||
AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. }
|
||||
) if candidate_hash == candidate.hash() => {
|
||||
tx.send(Ok(())).unwrap();
|
||||
}
|
||||
@@ -495,7 +495,7 @@ fn backing_works() {
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::AvailabilityStore(
|
||||
AvailabilityStoreMessage::StoreAvailableData(candidate_hash, _, _, _, tx)
|
||||
AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. }
|
||||
) if candidate_hash == candidate_a.hash() => {
|
||||
tx.send(Ok(())).unwrap();
|
||||
}
|
||||
@@ -853,7 +853,7 @@ fn backing_misbehavior_works() {
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::AvailabilityStore(
|
||||
AvailabilityStoreMessage::StoreAvailableData(candidate_hash, _, _, _, tx)
|
||||
AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. }
|
||||
) if candidate_hash == candidate_a.hash() => {
|
||||
tx.send(Ok(())).unwrap();
|
||||
}
|
||||
@@ -1027,7 +1027,7 @@ fn backing_dont_second_invalid() {
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::AvailabilityStore(
|
||||
AvailabilityStoreMessage::StoreAvailableData(candidate_hash, _, _, _, tx)
|
||||
AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. }
|
||||
) if candidate_hash == candidate_b.hash() => {
|
||||
tx.send(Ok(())).unwrap();
|
||||
}
|
||||
|
||||
@@ -248,13 +248,12 @@ async fn participate(
|
||||
// we dispatch a request to store the available data for the candidate. we
|
||||
// want to maximize data availability for other potential checkers involved
|
||||
// in the dispute
|
||||
ctx.send_message(AvailabilityStoreMessage::StoreAvailableData(
|
||||
ctx.send_message(AvailabilityStoreMessage::StoreAvailableData {
|
||||
candidate_hash,
|
||||
None,
|
||||
n_validators,
|
||||
available_data.clone(),
|
||||
store_available_data_tx,
|
||||
))
|
||||
available_data: available_data.clone(),
|
||||
tx: store_available_data_tx,
|
||||
})
|
||||
.await;
|
||||
|
||||
match store_available_data_rx.await? {
|
||||
|
||||
@@ -150,13 +150,7 @@ async fn fetch_validation_code(virtual_overseer: &mut VirtualOverseer) {
|
||||
async fn store_available_data(virtual_overseer: &mut VirtualOverseer, success: bool) {
|
||||
assert_matches!(
|
||||
virtual_overseer.recv().await,
|
||||
AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreAvailableData(
|
||||
_,
|
||||
_,
|
||||
_,
|
||||
_,
|
||||
tx,
|
||||
)) => {
|
||||
AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreAvailableData { tx, .. }) => {
|
||||
if success {
|
||||
tx.send(Ok(())).unwrap();
|
||||
} else {
|
||||
|
||||
@@ -486,17 +486,19 @@ pub enum AvailabilityStoreMessage {
|
||||
tx: oneshot::Sender<Result<(), ()>>,
|
||||
},
|
||||
|
||||
/// Store a `AvailableData` in the AV store.
|
||||
/// If `ValidatorIndex` is present store corresponding chunk also.
|
||||
/// Store a `AvailableData` and all of its chunks in the AV store.
|
||||
///
|
||||
/// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed.
|
||||
StoreAvailableData(
|
||||
CandidateHash,
|
||||
Option<ValidatorIndex>,
|
||||
u32,
|
||||
AvailableData,
|
||||
oneshot::Sender<Result<(), ()>>,
|
||||
),
|
||||
StoreAvailableData {
|
||||
/// A hash of the candidate this `available_data` belongs to.
|
||||
candidate_hash: CandidateHash,
|
||||
/// The number of validators in the session.
|
||||
n_validators: u32,
|
||||
/// The `AvailableData` itself.
|
||||
available_data: AvailableData,
|
||||
/// Sending side of the channel to send result to.
|
||||
tx: oneshot::Sender<Result<(), ()>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl AvailabilityStoreMessage {
|
||||
|
||||
Reference in New Issue
Block a user