Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Make sure we remove a peer on disconnect in gossip (#5104)
Browse files Browse the repository at this point in the history
* Make sure we remove peers on disconnect in gossip state machine

* Clear up the code

* Add a comment
  • Loading branch information
bkchr authored Mar 2, 2020
1 parent 65ad8e9 commit 7e383ed
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 16 deletions.
49 changes: 49 additions & 0 deletions client/network-gossip/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ impl<B: BlockT> ConsensusGossip<B> {
let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() };
v.peer_disconnected(&mut context, &who);
}
self.peers.remove(&who);
}

/// Perform periodic maintenance
Expand Down Expand Up @@ -644,4 +645,52 @@ mod tests {
let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic));
assert_eq!(stream.next(), None);
}

#[test]
fn peer_is_removed_on_disconnect() {
struct TestNetwork;
impl Network<Block> for TestNetwork {
fn event_stream(
&self,
) -> std::pin::Pin<Box<dyn futures::Stream<Item = crate::Event> + Send>> {
unimplemented!("Not required in tests")
}

fn report_peer(&self, _: PeerId, _: crate::ReputationChange) {
unimplemented!("Not required in tests")
}

fn disconnect_peer(&self, _: PeerId) {
unimplemented!("Not required in tests")
}

fn write_notification(&self, _: PeerId, _: crate::ConsensusEngineId, _: Vec<u8>) {
unimplemented!("Not required in tests")
}

fn register_notifications_protocol(
&self,
_: ConsensusEngineId,
_: std::borrow::Cow<'static, [u8]>,
) {
unimplemented!("Not required in tests")
}

fn announce(&self, _: H256, _: Vec<u8>) {
unimplemented!("Not required in tests")
}
}

let mut consensus = ConsensusGossip::<Block>::new();
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));

let mut network = TestNetwork;

let peer_id = PeerId::random();
consensus.new_peer(&mut network, peer_id.clone(), Roles::FULL);
assert!(consensus.peers.contains_key(&peer_id));

consensus.peer_disconnected(&mut network, peer_id.clone());
assert!(!consensus.peers.contains_key(&peer_id));
}
}
3 changes: 1 addition & 2 deletions client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1167,8 +1167,7 @@ impl<B: BlockT> ChainSync<B> {
}

/// Restart the sync process.
fn restart<'a>(&'a mut self) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a
{
fn restart<'a>(&'a mut self) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a {
self.queue_blocks.clear();
self.blocks.clear();
let info = self.client.info();
Expand Down
27 changes: 13 additions & 14 deletions client/network/src/protocol/sync/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ impl<B: BlockT> BlockCollection<B> {
common: NumberFor<B>,
max_parallel: u32,
max_ahead: u32,
) -> Option<Range<NumberFor<B>>>
{
) -> Option<Range<NumberFor<B>>> {
if peer_best <= common {
// Bail out early
return None;
Expand Down Expand Up @@ -165,20 +164,20 @@ impl<B: BlockT> BlockCollection<B> {
pub fn drain(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
let mut drained = Vec::new();
let mut ranges = Vec::new();
{
let mut prev = from;
for (start, range_data) in &mut self.blocks {
match range_data {
&mut BlockRangeState::Complete(ref mut blocks) if *start <= prev => {
prev = *start + (blocks.len() as u32).into();
let mut blocks = mem::replace(blocks, Vec::new());
drained.append(&mut blocks);
ranges.push(*start);
},
_ => break,
}

let mut prev = from;
for (start, range_data) in &mut self.blocks {
match range_data {
&mut BlockRangeState::Complete(ref mut blocks) if *start <= prev => {
prev = *start + (blocks.len() as u32).into();
// Remove all elements from `blocks` and add them to `drained`
drained.append(blocks);
ranges.push(*start);
},
_ => break,
}
}

for r in ranges {
self.blocks.remove(&r);
}
Expand Down

0 comments on commit 7e383ed

Please sign in to comment.