Skip to content

Commit

Permalink
finish tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fl0rek committed Aug 13, 2024
1 parent 8dcc18f commit 4df9b96
Showing 1 changed file with 92 additions and 26 deletions.
118 changes: 92 additions & 26 deletions node/src/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::syncer::SYNCING_WINDOW;

// pruning window is 1 hour behind syncing window
const PRUNING_WINDOW: Duration = SYNCING_WINDOW.saturating_add(Duration::from_secs(60 * 60));
const PRUNING_INTERVAL: Duration = Duration::from_secs(60);
const PRUNING_INTERVAL: Duration = Duration::from_secs(12);

type Result<T, E = PrunerError> = std::result::Result<T, E>;

Expand Down Expand Up @@ -185,7 +185,9 @@ mod test {
use crate::blockstore::InMemoryBlockstore;
use crate::events::{EventChannel, TryRecvError};
use crate::store::{InMemoryStore, SamplingStatus};
use crate::test_utils::{async_test, gen_filled_store, new_block_ranges, ExtendedHeaderGeneratorExt};
use crate::test_utils::{
async_test, gen_filled_store, new_block_ranges, ExtendedHeaderGeneratorExt,
};

const TEST_CODEC: u64 = 0x0D;
const TEST_MH_CODE: u64 = 0x0D;
Expand Down Expand Up @@ -239,26 +241,46 @@ mod test {
event_subscriber.try_recv().unwrap_err(),
TryRecvError::Empty
));
assert_eq!(store.get_stored_header_ranges().await.unwrap(), new_block_ranges([1..=100]));
assert_eq!(
store.get_stored_header_ranges().await.unwrap(),
new_block_ranges([1..=100])
);
}

#[async_test]
async fn prune_large_tail() {
async fn prune_large_tail_with_cids() {
let events = EventChannel::new();
let store = Arc::new(InMemoryStore::new());
let mut gen = ExtendedHeaderGenerator::new();

let blockstore = Arc::new(InMemoryBlockstore::new());
let mut event_subscriber = events.subscribe();

let first_header_time = (Time::now() - (PRUNING_WINDOW + Duration::from_secs(30 * 24 * 60 * 60))).unwrap();
let first_header_time =
(Time::now() - (PRUNING_WINDOW + Duration::from_secs(30 * 24 * 60 * 60))).unwrap();
gen.set_time(first_header_time, Duration::from_secs(1));

let blocks_with_sampling = (1..=1000)
.map(|height| {
let block = TestBlock::from(height);
let sampling_status = match height % 3 {
0 => SamplingStatus::Unknown,
1 => SamplingStatus::Accepted,
2 => SamplingStatus::Accepted,
_ => unreachable!(),
};
(height, block, block.cid().unwrap(), sampling_status)
})
.collect::<Vec<_>>();

store.insert(gen.next_many_verified(1_000)).await.unwrap();
for i in 1..=1000 {
let block = TestBlock::from(i);
blockstore.put(block).await.unwrap();
store.update_sampling_metadata(i, SamplingStatus::Unknown, vec![block.cid().unwrap()]).await.unwrap();

for (height, block, cid, status) in &blocks_with_sampling {
blockstore.put_keyed(cid, block.data()).await.unwrap();
store
.update_sampling_metadata(*height, *status, vec![*cid])
.await
.unwrap()
}

let pruner = Pruner::start(PrunerArgs {
Expand All @@ -270,49 +292,93 @@ mod test {
sleep(Duration::from_secs(1)).await;

let pruner_event = event_subscriber.recv().await.unwrap().event;
assert!(matches!(pruner_event, NodeEvent::PrunedHeaders { to_height: 1_000 }));
assert!(matches!(
pruner_event,
NodeEvent::PrunedHeaders { to_height: 1_000 }
));
assert!(store.get_stored_header_ranges().await.unwrap().is_empty());

for i in 1..=1000 {
let block = TestBlock::from(i);
assert!(!blockstore.has(&block.cid().unwrap()).await.unwrap());
for (height, _, cid, _) in &blocks_with_sampling {
assert!(matches!(
store.get_sampling_metadata(*height).await.unwrap_err(),
StoreError::NotFound
));
assert!(!blockstore.has(cid).await.unwrap());
}

pruner.stop();

assert!(matches!(
event_subscriber.try_recv().unwrap_err(),
TryRecvError::Empty
));
}

//#[async_test]
#[async_test]
async fn prune_tail() {
const BLOCK_TIME: Duration = Duration::from_millis(10);

let events = EventChannel::new();
let store = Arc::new(InMemoryStore::new());
let mut gen = ExtendedHeaderGenerator::new();
// TODO: insert cids
let blockstore = Arc::new(InMemoryBlockstore::new());
let mut event_subscriber = events.subscribe();

let first_header_time = (Time::now() - (PRUNING_WINDOW + Duration::from_secs(20 * 10 + 5))).unwrap();
// large block time to avoid timing issues
gen.set_time(first_header_time, Duration::from_secs(10));
// 50 headers before pruning window edge
let before_pruning_edge = (Time::now() - (PRUNING_WINDOW + BLOCK_TIME * 100)).unwrap();
gen.set_time(before_pruning_edge, BLOCK_TIME);
store.insert(gen.next_many_verified(50)).await.unwrap();

// 10 headers within 1sec of pruning window edge
let after_pruning_edge = (Time::now() - (PRUNING_WINDOW - BLOCK_TIME * 10)).unwrap();
gen.set_time(after_pruning_edge, BLOCK_TIME);
store.insert(gen.next_many_verified(10)).await.unwrap();

// 20 before and 20 after pruning window
store.insert(gen.next_many_verified(40)).await.unwrap();
// 50 headers at current time
gen.set_time(Time::now(), BLOCK_TIME);
store.insert(gen.next_many_verified(10)).await.unwrap();

let _pruner = Pruner::start(PrunerArgs {
let pruner = Pruner::start(PrunerArgs {
store: store.clone(),
blockstore,
event_pub: events.publisher(),
});

sleep(Duration::from_secs(1)).await;

let pruner_event = event_subscriber.recv().await.unwrap().event;
assert!(matches!(pruner_event, NodeEvent::PrunedHeaders { to_height: 20 }));
assert_eq!(store.get_stored_header_ranges().await.unwrap(), new_block_ranges([21..=40]));
assert!(matches!(
pruner_event,
NodeEvent::PrunedHeaders { to_height: 50 }
));
assert_eq!(
store.get_stored_header_ranges().await.unwrap(),
new_block_ranges([51..=70])
);

// TODO: oof, long sleep in tests
sleep(Duration::from_secs(63)).await;
// sleep needs to be longer than PRUNING_INTERVAL for pruner to wake up
sleep(Duration::from_secs(12)).await;

let pruner_event = event_subscriber.try_recv().unwrap().event;
assert!(matches!(pruner_event, NodeEvent::PrunedHeaders { to_height: 26 }));
assert_eq!(store.get_stored_header_ranges().await.unwrap(), new_block_ranges([27..=40]));
assert!(matches!(
pruner_event,
NodeEvent::PrunedHeaders { to_height: 60 }
));
assert_eq!(
store.get_stored_header_ranges().await.unwrap(),
new_block_ranges([61..=70])
);

pruner.stop();
assert!(matches!(
event_subscriber.try_recv().unwrap_err(),
TryRecvError::Empty
));
assert_eq!(
store.get_stored_header_ranges().await.unwrap(),
new_block_ranges([61..=70])
);
}

#[derive(Debug, PartialEq, Clone, Copy)]
Expand Down

0 comments on commit 4df9b96

Please sign in to comment.