limit retry count in OnDemand (#513)

This commit is contained in:
Svyatoslav Nikolsky
2018-09-05 12:43:39 +03:00
committed by Gav Wood
parent 2ea89556b4
commit 951fbd1f3b
5 changed files with 128 additions and 14 deletions
@@ -196,6 +196,7 @@ impl<Block, S, F, H, C> StateBackend<H, C> for OnDemandState<Block, S, F>
block: self.block,
header: header.expect("if block above guarantees that header is_some(); qed"),
key: key.to_vec(),
retry_count: None,
})
.into_future().wait()
}
@@ -101,6 +101,7 @@ impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Bloc
.remote_header(RemoteHeaderRequest {
cht_root: self.storage.cht_root(cht::SIZE, number)?,
block: number,
retry_count: None,
})
.into_future().wait()
.map(Some)
@@ -73,6 +73,7 @@ impl<B, F, Block> CallExecutor<Block, KeccakHasher, RlpCodec> for RemoteCallExec
header: block_header,
method: method.into(),
call_data: call_data.to_vec(),
retry_count: None,
}).into_future().wait()
}
@@ -168,6 +169,7 @@ mod tests {
},
method: "authorities".into(),
call_data: vec![],
retry_count: None,
}, remote_execution_proof).unwrap();
}
}
@@ -43,6 +43,8 @@ pub struct RemoteCallRequest<Header: HeaderT> {
pub method: String,
/// Call data.
pub call_data: Vec<u8>,
/// Number of times to retry request. None means that default RETRY_COUNT is used.
pub retry_count: Option<usize>,
}
/// Remote canonical header request.
@@ -52,6 +54,8 @@ pub struct RemoteHeaderRequest<Header: HeaderT> {
pub cht_root: Header::Hash,
/// Number of the header to query.
pub block: Header::Number,
/// Number of times to retry request. None means that default RETRY_COUNT is used.
pub retry_count: Option<usize>,
}
/// Remote storage read request.
@@ -63,6 +67,8 @@ pub struct RemoteReadRequest<Header: HeaderT> {
pub header: Header,
/// Storage key to read.
pub key: Vec<u8>,
/// Number of times to retry request. None means that default RETRY_COUNT is used.
pub retry_count: Option<usize>,
}
/// Light client data fetcher. Implementations of this trait must check if remote data
@@ -264,6 +270,7 @@ pub mod tests {
block: remote_block_header.hash(),
header: remote_block_header,
key: b":auth:len".to_vec(),
retry_count: None,
}, remote_read_proof).unwrap().unwrap()[0], authorities_len as u8);
}
@@ -273,6 +280,7 @@ pub mod tests {
assert_eq!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
cht_root: local_cht_root,
block: 1,
retry_count: None,
}, Some(remote_block_header.clone()), remote_header_proof).unwrap(), remote_block_header);
}
@@ -283,6 +291,7 @@ pub mod tests {
assert!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
cht_root: Default::default(),
block: 1,
retry_count: None,
}, Some(remote_block_header.clone()), remote_header_proof).is_err());
}
@@ -293,6 +302,7 @@ pub mod tests {
assert!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
cht_root: local_cht_root,
block: 1,
retry_count: None,
}, Some(remote_block_header.clone()), remote_header_proof).is_err());
}
}
+114 -14
View File
@@ -35,6 +35,8 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
/// Remote request timeout.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
/// Default request retry count.
const RETRY_COUNT: usize = 1;
/// On-demand service API.
pub trait OnDemandService<Block: BlockT>: Send + Sync {
@@ -85,6 +87,7 @@ struct OnDemandCore<B: BlockT, E: service::ExecuteInContext<B>> {
struct Request<Block: BlockT> {
id: u64,
timestamp: Instant,
retry_count: usize,
data: RequestData<Block>,
}
@@ -139,9 +142,9 @@ impl<B: BlockT, E> OnDemand<B, E> where
}
/// Schedule && dispatch all scheduled requests.
fn schedule_request<R>(&self, data: RequestData<B>, result: R) -> R {
fn schedule_request<R>(&self, retry_count: Option<usize>, data: RequestData<B>, result: R) -> R {
let mut core = self.core.lock();
core.insert(data);
core.insert(retry_count.unwrap_or(RETRY_COUNT), data);
core.dispatch();
result
}
@@ -158,21 +161,31 @@ impl<B: BlockT, E> OnDemand<B, E> where
},
};
let retry_request_data = match try_accept(request) {
Accept::Ok => None,
let retry_count = request.retry_count;
let (retry_count, retry_request_data) = match try_accept(request) {
Accept::Ok => (retry_count, None),
Accept::CheckFailed(error, retry_request_data) => {
io.report_peer(peer, Severity::Bad(&format!("Failed to check remote {} response from peer: {}", rtype, error)));
core.remove_peer(peer);
Some(retry_request_data)
if retry_count > 0 {
(retry_count - 1, Some(retry_request_data))
} else {
trace!(target: "sync", "Failed to get remote {} response for given number of retries", rtype);
retry_request_data.fail(client::error::ErrorKind::RemoteFetchFailed.into());
(0, None)
}
},
Accept::Unexpected(retry_request_data) => {
trace!(target: "sync", "Unexpected response to remote {} from peer {}", rtype, peer);
Some(retry_request_data)
io.report_peer(peer, Severity::Bad(&format!("Unexpected response to remote {} from peer", rtype)));
core.remove_peer(peer);
(retry_count, Some(retry_request_data))
},
};
if let Some(request_data) = retry_request_data {
core.insert(request_data);
core.insert(retry_count, request_data);
}
core.dispatch();
@@ -262,19 +275,19 @@ impl<B, E> Fetcher<B> for OnDemand<B, E> where
fn remote_header(&self, request: RemoteHeaderRequest<B::Header>) -> Self::RemoteHeaderResult {
let (sender, receiver) = channel();
self.schedule_request(RequestData::RemoteHeader(request, sender),
self.schedule_request(request.retry_count.clone(), RequestData::RemoteHeader(request, sender),
RemoteResponse { receiver })
}
fn remote_read(&self, request: RemoteReadRequest<B::Header>) -> Self::RemoteReadResult {
let (sender, receiver) = channel();
self.schedule_request(RequestData::RemoteRead(request, sender),
self.schedule_request(request.retry_count.clone(), RequestData::RemoteRead(request, sender),
RemoteResponse { receiver })
}
fn remote_call(&self, request: RemoteCallRequest<B::Header>) -> Self::RemoteCallResult {
let (sender, receiver) = channel();
self.schedule_request(RequestData::RemoteCall(request, sender),
self.schedule_request(request.retry_count.clone(), RequestData::RemoteCall(request, sender),
RemoteResponse { receiver })
}
}
@@ -314,13 +327,14 @@ impl<B, E> OnDemandCore<B, E> where
}
}
pub fn insert(&mut self, data: RequestData<B>) {
pub fn insert(&mut self, retry_count: usize, data: RequestData<B>) {
let request_id = self.next_request_id;
self.next_request_id += 1;
self.pending_requests.push_back(Request {
id: request_id,
timestamp: Instant::now(),
retry_count,
data,
});
}
@@ -385,6 +399,17 @@ impl<Block: BlockT> Request<Block> {
}
}
impl<Block: BlockT> RequestData<Block> {
pub fn fail(self, error: client::error::Error) {
// don't care if anyone is listening
match self {
RequestData::RemoteHeader(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteCall(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteRead(_, sender) => { let _ = sender.send(Err(error)); },
}
}
}
#[cfg(test)]
pub mod tests {
use std::collections::VecDeque;
@@ -503,6 +528,7 @@ pub mod tests {
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
assert_eq!(vec![1], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
assert_eq!(vec![0], on_demand.core.lock().active_peers.keys().cloned().collect::<Vec<_>>());
@@ -526,6 +552,7 @@ pub mod tests {
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
receive_call_response(&*on_demand, &mut network, 0, 1);
assert!(network.to_disconnect.contains(&0));
@@ -542,6 +569,7 @@ pub mod tests {
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: Some(1),
});
on_demand.on_connect(0, Roles::FULL);
@@ -561,6 +589,72 @@ pub mod tests {
assert!(network.to_disconnect.contains(&0));
}
#[test]
fn disconnects_from_peer_on_wrong_response_type() {
let (_x, on_demand) = dummy(false);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: Some(1),
});
on_demand.on_remote_read_response(&mut network, 0, message::RemoteReadResponse {
id: 0,
proof: vec![vec![2]],
});
assert!(network.to_disconnect.contains(&0));
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
}
#[test]
fn receives_remote_failure_after_retry_count_failures() {
use parking_lot::{Condvar, Mutex};
let retry_count = 2;
let (_x, on_demand) = dummy(false);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
for i in 0..retry_count+1 {
on_demand.on_connect(i, Roles::FULL);
}
let sync = Arc::new((Mutex::new(0), Mutex::new(0), Condvar::new()));
let thread_sync = sync.clone();
let response = on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: Some(retry_count)
});
let thread = ::std::thread::spawn(move || {
let &(ref current, ref finished_at, ref finished) = &*thread_sync;
let _ = response.wait().unwrap_err();
*finished_at.lock() = *current.lock();
finished.notify_one();
});
let &(ref current, ref finished_at, ref finished) = &*sync;
for i in 0..retry_count+1 {
let mut current = current.lock();
*current = *current + 1;
receive_call_response(&*on_demand, &mut network, i, i as u64);
}
let mut finished_at = finished_at.lock();
assert!(!finished.wait_for(&mut finished_at, ::std::time::Duration::from_millis(1000)).timed_out());
assert_eq!(*finished_at, retry_count + 1);
thread.join().unwrap();
}
#[test]
fn receives_remote_call_response() {
let (_x, on_demand) = dummy(true);
@@ -573,6 +667,7 @@ pub mod tests {
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
@@ -593,7 +688,8 @@ pub mod tests {
let response = on_demand.remote_read(RemoteReadRequest {
header: dummy_header(),
block: Default::default(),
key: b":key".to_vec()
key: b":key".to_vec(),
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
@@ -614,7 +710,11 @@ pub mod tests {
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
let response = on_demand.remote_header(RemoteHeaderRequest { cht_root: Default::default(), block: 1 });
let response = on_demand.remote_header(RemoteHeaderRequest {
cht_root: Default::default(),
block: 1,
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
assert_eq!(result.hash(), "80729accb7bb10ff9c637a10e8bb59f21c52571aa7b46544c5885ca89ed190f4".into());