diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index a4f98060b19..beda7fe1bf4 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -711,6 +711,16 @@ impl Receiver { ) -> Poll { self.chan.recv_many(cx, buffer, limit) } + + /// Returns the number of [`Sender`] handles. + pub fn sender_strong_count(&self) -> usize { + self.chan.sender_strong_count() + } + + /// Returns the number of [`WeakSender`] handles. + pub fn sender_weak_count(&self) -> usize { + self.chan.sender_weak_count() + } } impl fmt::Debug for Receiver { diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index d8838242a39..4006aa2b746 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -469,6 +469,14 @@ impl Rx { pub(super) fn semaphore(&self) -> &S { &self.inner.semaphore } + + pub(super) fn sender_strong_count(&self) -> usize { + self.inner.tx_count.load(Acquire) + } + + pub(super) fn sender_weak_count(&self) -> usize { + self.inner.tx_weak_count.load(Relaxed) + } } impl Drop for Rx { diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index a3398c4bf54..47e1b6c7c77 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -348,7 +348,7 @@ impl UnboundedReceiver { /// assert!(!rx.is_closed()); /// /// rx.close(); - /// + /// /// assert!(rx.is_closed()); /// } /// ``` @@ -498,6 +498,16 @@ impl UnboundedReceiver { ) -> Poll { self.chan.recv_many(cx, buffer, limit) } + + /// Returns the number of [`UnboundedSender`] handles. + pub fn sender_strong_count(&self) -> usize { + self.chan.sender_strong_count() + } + + /// Returns the number of [`WeakUnboundedSender`] handles. + pub fn sender_weak_count(&self) -> usize { + self.chan.sender_weak_count() + } } impl UnboundedSender { diff --git a/tokio/tests/sync_mpsc_weak.rs b/tokio/tests/sync_mpsc_weak.rs index 6b7555a5cdd..fba0fe4e33a 100644 --- a/tokio/tests/sync_mpsc_weak.rs +++ b/tokio/tests/sync_mpsc_weak.rs @@ -532,12 +532,13 @@ async fn test_rx_unbounded_is_closed_when_dropping_all_senders_except_weak_sende #[tokio::test] async fn sender_strong_count_when_cloned() { - let (tx, _rx) = mpsc::channel::<()>(1); + let (tx, rx) = mpsc::channel::<()>(1); let tx2 = tx.clone(); assert_eq!(tx.strong_count(), 2); assert_eq!(tx2.strong_count(), 2); + assert_eq!(rx.sender_strong_count(), 2); } #[tokio::test] @@ -552,29 +553,31 @@ async fn sender_weak_count_when_downgraded() { #[tokio::test] async fn sender_strong_count_when_dropped() { - let (tx, _rx) = mpsc::channel::<()>(1); + let (tx, rx) = mpsc::channel::<()>(1); let tx2 = tx.clone(); drop(tx2); assert_eq!(tx.strong_count(), 1); + assert_eq!(rx.sender_strong_count(), 1); } #[tokio::test] async fn sender_weak_count_when_dropped() { - let (tx, _rx) = mpsc::channel::<()>(1); + let (tx, rx) = mpsc::channel::<()>(1); let weak = tx.downgrade(); drop(weak); assert_eq!(tx.weak_count(), 0); + assert_eq!(rx.sender_weak_count(), 0); } #[tokio::test] async fn sender_strong_and_weak_conut() { - let (tx, _rx) = mpsc::channel::<()>(1); + let (tx, rx) = mpsc::channel::<()>(1); let tx2 = tx.clone(); @@ -585,67 +588,75 @@ async fn sender_strong_and_weak_conut() { assert_eq!(tx2.strong_count(), 2); assert_eq!(weak.strong_count(), 2); assert_eq!(weak2.strong_count(), 2); + assert_eq!(rx.sender_strong_count(), 2); assert_eq!(tx.weak_count(), 2); assert_eq!(tx2.weak_count(), 2); assert_eq!(weak.weak_count(), 2); assert_eq!(weak2.weak_count(), 2); + assert_eq!(rx.sender_weak_count(), 2); drop(tx2); drop(weak2); assert_eq!(tx.strong_count(), 1); assert_eq!(weak.strong_count(), 1); + assert_eq!(rx.sender_strong_count(), 1); assert_eq!(tx.weak_count(), 1); assert_eq!(weak.weak_count(), 1); + assert_eq!(rx.sender_weak_count(), 1); } #[tokio::test] async fn unbounded_sender_strong_count_when_cloned() { - let (tx, _rx) = mpsc::unbounded_channel::<()>(); + let (tx, rx) = mpsc::unbounded_channel::<()>(); let tx2 = tx.clone(); assert_eq!(tx.strong_count(), 2); assert_eq!(tx2.strong_count(), 2); + assert_eq!(rx.sender_strong_count(), 2); } #[tokio::test] async fn unbounded_sender_weak_count_when_downgraded() { - let (tx, _rx) = mpsc::unbounded_channel::<()>(); + let (tx, rx) = mpsc::unbounded_channel::<()>(); let weak = tx.downgrade(); assert_eq!(tx.weak_count(), 1); assert_eq!(weak.weak_count(), 1); + assert_eq!(rx.sender_weak_count(), 1); } #[tokio::test] async fn unbounded_sender_strong_count_when_dropped() { - let (tx, _rx) = mpsc::unbounded_channel::<()>(); + let (tx, rx) = mpsc::unbounded_channel::<()>(); let tx2 = tx.clone(); drop(tx2); assert_eq!(tx.strong_count(), 1); + assert_eq!(rx.sender_strong_count(), 1); } #[tokio::test] async fn unbounded_sender_weak_count_when_dropped() { - let (tx, _rx) = mpsc::unbounded_channel::<()>(); + let (tx, rx) = mpsc::unbounded_channel::<()>(); let weak = tx.downgrade(); drop(weak); assert_eq!(tx.weak_count(), 0); + assert_eq!(rx.sender_weak_count(), 0); } #[tokio::test] async fn unbounded_sender_strong_and_weak_conut() { - let (tx, _rx) = mpsc::unbounded_channel::<()>(); + let (tx, rx) = mpsc::unbounded_channel::<()>(); let tx2 = tx.clone(); @@ -656,18 +667,22 @@ async fn unbounded_sender_strong_and_weak_conut() { assert_eq!(tx2.strong_count(), 2); assert_eq!(weak.strong_count(), 2); assert_eq!(weak2.strong_count(), 2); + assert_eq!(rx.sender_strong_count(), 2); assert_eq!(tx.weak_count(), 2); assert_eq!(tx2.weak_count(), 2); assert_eq!(weak.weak_count(), 2); assert_eq!(weak2.weak_count(), 2); + assert_eq!(rx.sender_weak_count(), 2); drop(tx2); drop(weak2); assert_eq!(tx.strong_count(), 1); assert_eq!(weak.strong_count(), 1); + assert_eq!(rx.sender_strong_count(), 1); assert_eq!(tx.weak_count(), 1); assert_eq!(weak.weak_count(), 1); + assert_eq!(rx.sender_weak_count(), 1); }