Skip to content

Commit

Permalink
feat: Header pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
fl0rek committed Aug 9, 2024
1 parent d3e255d commit dea37a1
Show file tree
Hide file tree
Showing 10 changed files with 416 additions and 26 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ resolver = "2"
members = ["cli", "node", "node-wasm", "proto", "rpc", "types"]

[workspace.dependencies]
blockstore = "0.5.0"
blockstore = "0.6.0"
lumina-node = { version = "0.2.0", path = "node" }
lumina-node-wasm = { version = "0.1.1", path = "node-wasm" }
celestia-proto = { version = "0.2.0", path = "proto" }
Expand All @@ -16,6 +16,7 @@ celestia-tendermint-proto = "0.32.1"

[patch.crates-io]
# Uncomment to apply local changes
#beetswap = { path = "../beetswap" }
#blockstore = { path = "../blockstore" }
#celestia-tendermint = { path = "../celestia-tendermint-rs/tendermint" }
#celestia-tendermint-proto = { path = "../celestia-tendermint-rs/proto" }
Expand Down
4 changes: 2 additions & 2 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ libp2p = { workspace = true, features = [
] }

async-trait = "0.1.80"
beetswap = "0.1.1"
beetswap = "0.2.0"
cid = { version = "0.11.1", features = ["serde-codec"] }
dashmap = "5.5.3"
futures = "0.3.30"
Expand Down Expand Up @@ -72,7 +72,7 @@ rustls-pki-types = "1.7.0"

[target.'cfg(target_arch = "wasm32")'.dependencies]
backoff = { version = "0.4.0", features = ["wasm-bindgen"] }
beetswap = { version = "0.1.1", features = ["wasm-bindgen"] }
beetswap = { version = "0.2.0", features = ["wasm-bindgen"] }
blockstore = { workspace = true, features = ["indexeddb"] }
celestia-types = { workspace = true, features = ["wasm-bindgen"] }
getrandom = { version = "0.2.15", features = ["js"] }
Expand Down
22 changes: 21 additions & 1 deletion node/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,18 @@ pub enum NodeEvent {
error: String,
},

/// Pruned headers up to and including specified height.
PrunedHeaders {
/// Last header height that was pruned
to_height: u64,
},

/// Pruning fatal error.
FatalPrunerError {
/// A human readable error.
error: String,
},

/// Network was compromised.
///
/// This happens when a valid bad encoding fraud proof is received.
Expand All @@ -296,6 +308,7 @@ impl NodeEvent {
match self {
NodeEvent::FatalDaserError { .. }
| NodeEvent::FatalSyncerError { .. }
| NodeEvent::FatalPrunerError { .. }
| NodeEvent::FetchingHeadersFailed { .. }
| NodeEvent::NetworkCompromised => true,
NodeEvent::ConnectingToBootnodes
Expand All @@ -308,7 +321,8 @@ impl NodeEvent {
| NodeEvent::FetchingHeadHeaderStarted
| NodeEvent::FetchingHeadHeaderFinished { .. }
| NodeEvent::FetchingHeadersStarted { .. }
| NodeEvent::FetchingHeadersFinished { .. } => false,
| NodeEvent::FetchingHeadersFinished { .. }
| NodeEvent::PrunedHeaders { .. } => false,
}
}
}
Expand Down Expand Up @@ -413,6 +427,12 @@ impl fmt::Display for NodeEvent {
NodeEvent::FatalSyncerError { error } => {
write!(f, "Syncer stopped because of a fatal error: {error}")
}
Self::PrunedHeaders { to_height } => {
write!(f, "Pruned headers up to and including {to_height}")
}
NodeEvent::FatalPrunerError { error } => {
write!(f, "Pruner stopped because of a fatal error: {error}")
}
NodeEvent::NetworkCompromised => {
write!(f, "The network is compromised and should not be trusted. ")?;
write!(f, "Node stopped synchronizing and sampling, but you can still make some queries to the network.")
Expand Down
1 change: 1 addition & 0 deletions node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod network;
pub mod node;
mod p2p;
mod peer_tracker;
mod pruner;
pub mod store;
mod syncer;
#[cfg(any(test, feature = "test-utils"))]
Expand Down
14 changes: 13 additions & 1 deletion node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::daser::{Daser, DaserArgs};
use crate::events::{EventChannel, EventSubscriber, NodeEvent};
use crate::executor::spawn;
use crate::p2p::{P2p, P2pArgs};
use crate::pruner::{Pruner, PrunerArgs};
use crate::store::{SamplingMetadata, Store, StoreError};
use crate::syncer::{Syncer, SyncerArgs};

Expand Down Expand Up @@ -90,6 +91,7 @@ where
store: Arc<S>,
syncer: Arc<Syncer<S>>,
_daser: Arc<Daser>,
_pruner: Arc<Pruner>,
tasks_cancellation_token: CancellationToken,
}

Expand Down Expand Up @@ -117,14 +119,15 @@ where
let event_channel = EventChannel::new();
let event_sub = event_channel.subscribe();
let store = Arc::new(config.store);
let blockstore = Arc::new(config.blockstore);

let p2p = Arc::new(
P2p::start(P2pArgs {
network_id: config.network_id,
local_keypair: config.p2p_local_keypair,
bootnodes: config.p2p_bootnodes,
listen_on: config.p2p_listen_on,
blockstore: config.blockstore,
blockstore: blockstore.clone(),
store: store.clone(),
event_pub: event_channel.publisher(),
})
Expand All @@ -144,13 +147,20 @@ where
event_pub: event_channel.publisher(),
})?);

let pruner = Arc::new(Pruner::start(PrunerArgs {
store: store.clone(),
blockstore,
event_pub: event_channel.publisher(),
}));

// spawn the task that will stop the services when the fraud is detected
let network_compromised_token = p2p.get_network_compromised_token().await?;
let tasks_cancellation_token = CancellationToken::new();

spawn({
let syncer = syncer.clone();
let daser = daser.clone();
let pruner = pruner.clone();
let tasks_cancellation_token = tasks_cancellation_token.child_token();
let event_pub = event_channel.publisher();

Expand All @@ -160,6 +170,7 @@ where
_ = network_compromised_token.cancelled() => {
syncer.stop();
daser.stop();
pruner.stop();

if event_pub.has_subscribers() {
event_pub.send(NodeEvent::NetworkCompromised);
Expand All @@ -179,6 +190,7 @@ where
store,
syncer,
_daser: daser,
_pruner: pruner,
tasks_cancellation_token,
};

Expand Down
10 changes: 7 additions & 3 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ where
/// List of the addresses on which to listen for incoming connections.
pub listen_on: Vec<Multiaddr>,
/// The store for headers.
pub blockstore: B,
pub blockstore: Arc<B>,
/// The store for headers.
pub store: Arc<S>,
/// Event publisher.
Expand Down Expand Up @@ -624,7 +624,11 @@ where
let gossipsub = init_gossipsub(&args, [&header_sub_topic, &bad_encoding_fraud_sub_topic])?;

let kademlia = init_kademlia(&args)?;
let bitswap = init_bitswap(args.blockstore, args.store.clone(), &args.network_id)?;
let bitswap = init_bitswap(
args.blockstore.clone(),
args.store.clone(),
&args.network_id,
)?;

let header_ex = HeaderExBehaviour::new(HeaderExConfig {
network_id: &args.network_id,
Expand Down Expand Up @@ -1200,7 +1204,7 @@ where
}

fn init_bitswap<B, S>(
blockstore: B,
blockstore: Arc<B>,
store: Arc<S>,
network_id: &str,
) -> Result<beetswap::Behaviour<MAX_MH_SIZE, B>>
Expand Down
Loading

0 comments on commit dea37a1

Please sign in to comment.