Skip to content

Commit

Permalink
refactor(anvil): flip filter expiration timestamp (foundry-rs#2694)
Browse files Browse the repository at this point in the history
* refactor(anvil): flip filter expiration timestamp

* fix: use interval_at
  • Loading branch information
mattsse authored and iFrostizz committed Nov 9, 2022
1 parent 0536a6b commit af2baa9
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
21 changes: 13 additions & 8 deletions anvil/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ use anvil_core::eth::subscription::SubscriptionId;
use anvil_rpc::response::ResponseResult;
use ethers::{
prelude::{Log as EthersLog, H256 as TxHash},
types::FilteredParams,
types::{Filter, FilteredParams},
};
use futures::{channel::mpsc::Receiver, Stream, StreamExt};

use ethers::types::Filter;
use std::{
collections::HashMap,
pin::Pin,
Expand All @@ -23,6 +21,7 @@ use std::{
use tokio::sync::Mutex;
use tracing::{trace, warn};

/// Type alias for filters identified by their id and their expiration timestamp
type FilterMap = Arc<Mutex<HashMap<String, (EthFilter, Instant)>>>;

/// timeout after which to remove an active filter if it wasn't polled since then
Expand All @@ -45,19 +44,19 @@ impl Filters {
let id = new_id();
trace!(target: "node::filter", "Adding new filter id {}", id);
let mut filters = self.active_filters.lock().await;
filters.insert(id.clone(), (filter, Instant::now()));
filters.insert(id.clone(), (filter, self.next_deadline()));
id
}

pub async fn get_filter_changes(&self, id: &str) -> ResponseResult {
{
let mut filters = self.active_filters.lock().await;
if let Some((filter, timestamp)) = filters.get_mut(id) {
if let Some((filter, deadline)) = filters.get_mut(id) {
let resp = filter
.next()
.await
.unwrap_or_else(|| ResponseResult::success(Vec::<()>::new()));
*timestamp = Instant::now();
*deadline = self.next_deadline();
return resp
}
}
Expand Down Expand Up @@ -85,11 +84,17 @@ impl Filters {
self.keepalive
}

/// Returns the timestamp after which a filter should expire
fn next_deadline(&self) -> Instant {
Instant::now() + self.keep_alive()
}

/// Evict all filters that weren't updated and reached there deadline
pub async fn evict(&self) {
trace!(target: "node::filter", "Evicting stale filters");
let deadline = Instant::now() - self.keepalive;
let now = Instant::now();
let mut active_filters = self.active_filters.lock().await;
active_filters.retain(|_, (_, timestamp)| *timestamp > deadline);
active_filters.retain(|_, (_, deadline)| *deadline > now);
}
}

Expand Down
4 changes: 3 additions & 1 deletion anvil/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ impl NodeService {
fee_history: FeeHistoryService,
filters: Filters,
) -> Self {
let start = tokio::time::Instant::now() + filters.keep_alive();
let filter_eviction_interval = tokio::time::interval_at(start, filters.keep_alive());
Self {
pool,
block_producer: BlockProducer::new(backend),
miner,
fee_history,
filter_eviction_interval: tokio::time::interval(filters.keep_alive()),
filter_eviction_interval,
filters,
}
}
Expand Down

0 comments on commit af2baa9

Please sign in to comment.