Skip to content

Commit

Permalink
Stream::size_hint for mpsc channels (#2660)
Browse files Browse the repository at this point in the history
  • Loading branch information
stepancheg authored and taiki-e committed Jan 30, 2023
1 parent 09a49dc commit 64eb689
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 0 deletions.
24 changes: 24 additions & 0 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,14 @@ impl<T> Stream for Receiver<T> {
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
if let Some(inner) = &self.inner {
decode_state(inner.state.load(SeqCst)).size_hint()
} else {
(0, Some(0))
}
}
}

impl<T> Drop for Receiver<T> {
Expand Down Expand Up @@ -1222,6 +1230,14 @@ impl<T> Stream for UnboundedReceiver<T> {
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
if let Some(inner) = &self.inner {
decode_state(inner.state.load(SeqCst)).size_hint()
} else {
(0, Some(0))
}
}
}

impl<T> Drop for UnboundedReceiver<T> {
Expand Down Expand Up @@ -1312,6 +1328,14 @@ impl State {
fn is_closed(&self) -> bool {
!self.is_open && self.num_messages == 0
}

fn size_hint(&self) -> (usize, Option<usize>) {
if self.is_open {
(self.num_messages, None)
} else {
(self.num_messages, Some(self.num_messages))
}
}
}

/*
Expand Down
40 changes: 40 additions & 0 deletions futures-channel/tests/mpsc-size_hint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use futures::channel::mpsc;
use futures::stream::Stream;

#[test]
fn unbounded_size_hint() {
let (tx, mut rx) = mpsc::unbounded::<u32>();
assert_eq!((0, None), rx.size_hint());
tx.unbounded_send(1).unwrap();
assert_eq!((1, None), rx.size_hint());
rx.try_next().unwrap().unwrap();
assert_eq!((0, None), rx.size_hint());
tx.unbounded_send(2).unwrap();
tx.unbounded_send(3).unwrap();
assert_eq!((2, None), rx.size_hint());
drop(tx);
assert_eq!((2, Some(2)), rx.size_hint());
rx.try_next().unwrap().unwrap();
assert_eq!((1, Some(1)), rx.size_hint());
rx.try_next().unwrap().unwrap();
assert_eq!((0, Some(0)), rx.size_hint());
}

#[test]
fn channel_size_hint() {
let (mut tx, mut rx) = mpsc::channel::<u32>(10);
assert_eq!((0, None), rx.size_hint());
tx.try_send(1).unwrap();
assert_eq!((1, None), rx.size_hint());
rx.try_next().unwrap().unwrap();
assert_eq!((0, None), rx.size_hint());
tx.try_send(2).unwrap();
tx.try_send(3).unwrap();
assert_eq!((2, None), rx.size_hint());
drop(tx);
assert_eq!((2, Some(2)), rx.size_hint());
rx.try_next().unwrap().unwrap();
assert_eq!((1, Some(1)), rx.size_hint());
rx.try_next().unwrap().unwrap();
assert_eq!((0, Some(0)), rx.size_hint());
}

0 comments on commit 64eb689

Please sign in to comment.