mirror of
https://github.com/pezkuwichain/revive-differential-tests.git
synced 2026-05-01 05:17:58 +00:00
74fdeb4a2e
* Implement a solution for the pre-fund account limit * Update the account pre-funding handling * Fix the lighthouse node tracing issue * refactor existing dt infra * Implement the platform driver * Wire up the cleaned up driver implementation * Implement the core benchmarking components * Remove some debug logging * Fix issues in the benchmarks driver * Implement a global concurrency limit on provider requests * Update the concurrency limit * Update the concurrency limit * Cleanups * Update the lighthouse ports * Ignore certain tests * Update the new geth test
70 lines
1.7 KiB
Rust
70 lines
1.7 KiB
Rust
use std::sync::Arc;
|
|
|
|
use alloy::transports::BoxFuture;
|
|
use tokio::sync::Semaphore;
|
|
use tower::{Layer, Service};
|
|
|
|
#[derive(Clone, Debug)]
|
|
pub struct ConcurrencyLimiterLayer {
|
|
semaphore: Arc<Semaphore>,
|
|
}
|
|
|
|
impl ConcurrencyLimiterLayer {
|
|
pub fn new(permit_count: usize) -> Self {
|
|
Self {
|
|
semaphore: Arc::new(Semaphore::new(permit_count)),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<S> Layer<S> for ConcurrencyLimiterLayer {
|
|
type Service = ConcurrencyLimiterService<S>;
|
|
|
|
fn layer(&self, inner: S) -> Self::Service {
|
|
ConcurrencyLimiterService {
|
|
service: inner,
|
|
semaphore: self.semaphore.clone(),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct ConcurrencyLimiterService<S> {
|
|
service: S,
|
|
semaphore: Arc<Semaphore>,
|
|
}
|
|
|
|
impl<S, Request> Service<Request> for ConcurrencyLimiterService<S>
|
|
where
|
|
S: Service<Request> + Send,
|
|
S::Future: Send + 'static,
|
|
{
|
|
type Response = S::Response;
|
|
type Error = S::Error;
|
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
|
|
|
fn poll_ready(
|
|
&mut self,
|
|
cx: &mut std::task::Context<'_>,
|
|
) -> std::task::Poll<Result<(), Self::Error>> {
|
|
self.service.poll_ready(cx)
|
|
}
|
|
|
|
fn call(&mut self, req: Request) -> Self::Future {
|
|
let semaphore = self.semaphore.clone();
|
|
let future = self.service.call(req);
|
|
|
|
Box::pin(async move {
|
|
let _permit = semaphore
|
|
.acquire()
|
|
.await
|
|
.expect("Semaphore has been closed");
|
|
tracing::debug!(
|
|
available_permits = semaphore.available_permits(),
|
|
"Acquired Semaphore Permit"
|
|
);
|
|
future.await
|
|
})
|
|
}
|
|
}
|