diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 0510a7f810..d2fcf0b861 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -83,12 +83,12 @@ pub trait Network: Send + Sync { /// The returned stream will not terminate, so it is required to make sure that the stream is /// dropped when it is not required anymore. Otherwise, it will stick around in memory /// infinitely. - fn checked_statements(&self, relay_parent: Hash) -> Pin>>; + fn checked_statements(&self, relay_parent: Hash) -> Pin + Send>>; } impl Network for polkadot_network::protocol::Service { - fn checked_statements(&self, relay_parent: Hash) -> Pin>> { - polkadot_network::protocol::Service::checked_statements(self, relay_parent) + fn checked_statements(&self, relay_parent: Hash) -> Pin + Send>> { + polkadot_network::protocol::Service::checked_statements(self, relay_parent).boxed() } } @@ -241,12 +241,14 @@ fn build_collator_service( let (service, handles) = service; let spawner = service.spawn_task_handle(); - let polkadot_network = match handles.polkadot_network { - None => return Err( - "Collator cannot run when Polkadot-specific networking has not been started".into() - ), - Some(n) => n, - }; + let polkadot_network = handles.polkadot_network + .ok_or_else(|| "Collator cannot run when Polkadot-specific networking has not been started")?; + + // We don't require this here, but we need to make sure that the validation service is started. + // This service makes sure the collator is joining the correct gossip topics and receives the appropiate + // messages. + handles.validation_service_handle + .ok_or_else(|| "Collator cannot run when validation networking has not been started")?; let client = service.client(); diff --git a/polkadot/network/src/protocol/mod.rs b/polkadot/network/src/protocol/mod.rs index 7368dc338a..d9257650eb 100644 --- a/polkadot/network/src/protocol/mod.rs +++ b/polkadot/network/src/protocol/mod.rs @@ -1375,7 +1375,7 @@ impl Service { /// Take care to drop the stream, as the sending side will not be cleaned /// up until it is. pub fn checked_statements(&self, relay_parent: Hash) - -> Pin>> { + -> impl Stream + Send { let (tx, rx) = oneshot::channel(); let mut sender = self.sender.clone(); @@ -1400,7 +1400,6 @@ impl Service { } }) .flatten_stream() - .boxed() } } diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index ca9af8e329..f27f19e1c7 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -320,9 +320,12 @@ pub fn westend_new_full( /// Handles to other sub-services that full nodes instantiate, which consumers /// of the node may use. #[cfg(feature = "full-node")] +#[derive(Default)] pub struct FullNodeHandles { /// A handle to the Polkadot networking protocol. pub polkadot_network: Option, + /// A handle to the validation service. + pub validation_service_handle: Option, } /// Builds a new service for a full client. @@ -389,7 +392,7 @@ pub fn new_full( let client = service.client(); let known_oracle = client.clone(); - let mut handles = FullNodeHandles { polkadot_network: None }; + let mut handles = FullNodeHandles::default(); let select_chain = if let Some(select_chain) = service.select_chain() { select_chain } else { @@ -427,7 +430,7 @@ pub fn new_full( service.spawn_task_handle(), ).map_err(|e| format!("Could not spawn network worker: {:?}", e))?; - if let Role::Authority { .. } = &role { + let authority_handles = if is_collator || role.is_authority() { let availability_store = { use std::path::PathBuf; @@ -464,6 +467,18 @@ pub fn new_full( service.spawn_essential_task("validation-service", Box::pin(validation_service)); + handles.validation_service_handle = Some(validation_service_handle.clone()); + + Some((validation_service_handle, availability_store)) + } else { + None + }; + + if role.is_authority() { + let (validation_service_handle, availability_store) = authority_handles + .clone() + .expect("Authority handles are set for authority nodes; qed"); + let proposer = consensus::ProposerFactory::new( client.clone(), service.transaction_pool(),