diff options
author | DongHun Kwak <dh0128.kwak@samsung.com> | 2023-03-03 13:39:32 +0900 |
---|---|---|
committer | DongHun Kwak <dh0128.kwak@samsung.com> | 2023-03-03 13:39:32 +0900 |
commit | 751f2b5ccba90348ef9b058be4cfda9093541531 (patch) | |
tree | 00406def7d46c1a843032dd89b8292fa1d423750 | |
download | rust-async-channel-upstream.tar.gz rust-async-channel-upstream.tar.bz2 rust-async-channel-upstream.zip |
Import async-channel 1.8.0upstream/1.8.0upstream
-rw-r--r-- | .cargo_vcs_info.json | 6 | ||||
-rw-r--r-- | CHANGELOG.md | 62 | ||||
-rw-r--r-- | Cargo.toml | 48 | ||||
-rw-r--r-- | Cargo.toml.orig | 24 | ||||
-rw-r--r-- | LICENSE-APACHE | 201 | ||||
-rw-r--r-- | LICENSE-MIT | 23 | ||||
-rw-r--r-- | README.md | 51 | ||||
-rw-r--r-- | src/lib.rs | 1194 | ||||
-rw-r--r-- | tests/bounded.rs | 482 | ||||
-rw-r--r-- | tests/unbounded.rs | 343 |
10 files changed, 2434 insertions, 0 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json new file mode 100644 index 0000000..0ba30ee --- /dev/null +++ b/.cargo_vcs_info.json @@ -0,0 +1,6 @@ +{ + "git": { + "sha1": "c273864b03556158b7f75df8bbf2ea97f1e7dbef" + }, + "path_in_vcs": "" +}
\ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..26f862a --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,62 @@ +# Version 1.8.0 + +- Prevent deadlock if sender/receiver is forgotten (#49) +- Add weak sender and receiver (#51) +- Update `concurrent-queue` to v2 (#50) + +# Version 1.7.1 + +- Work around MSRV increase due to a cargo bug. + +# Version 1.7.0 + +- Add `send_blocking` and `recv_blocking` (#47) + +# Version 1.6.1 + +- Make `send` return `Send` (#34) + +# Version 1.6.0 + +- Added `Send` and `Recv` futures (#33) +- impl `FusedStream` for `Receiver` (#30) + +# Version 1.5.1 + +- Fix typos in the docs. + +# Version 1.5.0 + +- Add `receiver_count()` and `sender_count()`. + +# Version 1.4.2 + +- Fix a bug that would sometime cause 100% CPU usage. + +# Version 1.4.1 + +- Update dependencies. + +# Version 1.4.0 + +- Update dependencies. + +# Version 1.3.0 + +- Add `Sender::is_closed()` and `Receiver::is_closed()`. + +# Version 1.2.0 + +- Add `Sender::close()` and `Receiver::close()`. + +# Version 1.1.1 + +- Replace `usize::MAX` with `std::usize::MAX`. + +# Version 1.1.0 + +- Add methods to error types. + +# Version 1.0.0 + +- Initial version diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..bdb1261 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,48 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2018" +rust-version = "1.38" +name = "async-channel" +version = "1.8.0" +authors = ["Stjepan Glavina <stjepang@gmail.com>"] +exclude = ["/.*"] +description = "Async multi-producer multi-consumer channel" +readme = "README.md" +keywords = [ + "mpmc", + "mpsc", + "spmc", + "chan", + "futures", +] +categories = [ + "asynchronous", + "concurrency", +] +license = "Apache-2.0 OR MIT" +repository = "https://github.com/smol-rs/async-channel" + +[dependencies.concurrent-queue] +version = "2" + +[dependencies.event-listener] +version = "2.4.0" + +[dependencies.futures-core] +version = "0.3.5" + +[dev-dependencies.easy-parallel] +version = "3" + +[dev-dependencies.futures-lite] +version = "1" diff --git a/Cargo.toml.orig b/Cargo.toml.orig new file mode 100644 index 0000000..5f0aaae --- /dev/null +++ b/Cargo.toml.orig @@ -0,0 +1,24 @@ +[package] +name = "async-channel" +# When publishing a new version: +# - Update CHANGELOG.md +# - Create "v1.x.y" git tag +version = "1.8.0" +authors = ["Stjepan Glavina <stjepang@gmail.com>"] +edition = "2018" +rust-version = "1.38" +description = "Async multi-producer multi-consumer channel" +license = "Apache-2.0 OR MIT" +repository = "https://github.com/smol-rs/async-channel" +keywords = ["mpmc", "mpsc", "spmc", "chan", "futures"] +categories = ["asynchronous", "concurrency"] +exclude = ["/.*"] + +[dependencies] +concurrent-queue = "2" +event-listener = "2.4.0" +futures-core = "0.3.5" + +[dev-dependencies] +easy-parallel = "3" +futures-lite = "1" diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..16fe87b --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..31aa793 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,23 @@ +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..d670870 --- /dev/null +++ b/README.md @@ -0,0 +1,51 @@ +# async-channel + +[![Build](https://github.com/smol-rs/async-channel/workflows/Build%20and%20test/badge.svg)]( +https://github.com/smol-rs/async-channel/actions) +[![License](https://img.shields.io/badge/license-Apache--2.0_OR_MIT-blue.svg)]( +https://github.com/smol-rs/async-channel) +[![Cargo](https://img.shields.io/crates/v/async-channel.svg)]( +https://crates.io/crates/async-channel) +[![Documentation](https://docs.rs/async-channel/badge.svg)]( +https://docs.rs/async-channel) + +An async multi-producer multi-consumer channel, where each message can be received by only +one of all existing consumers. + +There are two kinds of channels: + +1. Bounded channel with limited capacity. +2. Unbounded channel with unlimited capacity. + +A channel has the `Sender` and `Receiver` side. Both sides are cloneable and can be shared +among multiple threads. + +When all `Sender`s or all `Receiver`s are dropped, the channel becomes closed. When a +channel is closed, no more messages can be sent, but remaining messages can still be received. + +The channel can also be closed manually by calling `Sender::close()` or +`Receiver::close()`. + +## Examples + +```rust +let (s, r) = async_channel::unbounded(); + +assert_eq!(s.send("Hello").await, Ok(())); +assert_eq!(r.recv().await, Ok("Hello")); +``` + +## License + +Licensed under either of + + * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +#### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be +dual licensed as above, without any additional terms or conditions. diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..854d0bd --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,1194 @@ +//! An async multi-producer multi-consumer channel, where each message can be received by only +//! one of all existing consumers. +//! +//! There are two kinds of channels: +//! +//! 1. [Bounded][`bounded()`] channel with limited capacity. +//! 2. [Unbounded][`unbounded()`] channel with unlimited capacity. +//! +//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared +//! among multiple threads. +//! +//! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes closed. When a +//! channel is closed, no more messages can be sent, but remaining messages can still be received. +//! +//! The channel can also be closed manually by calling [`Sender::close()`] or +//! [`Receiver::close()`]. +//! +//! # Examples +//! +//! ``` +//! # futures_lite::future::block_on(async { +//! let (s, r) = async_channel::unbounded(); +//! +//! assert_eq!(s.send("Hello").await, Ok(())); +//! assert_eq!(r.recv().await, Ok("Hello")); +//! # }); +//! ``` + +#![forbid(unsafe_code)] +#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] + +use std::error; +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::process; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::usize; + +use concurrent_queue::{ConcurrentQueue, PopError, PushError}; +use event_listener::{Event, EventListener}; +use futures_core::stream::Stream; + +struct Channel<T> { + /// Inner message queue. + queue: ConcurrentQueue<T>, + + /// Send operations waiting while the channel is full. + send_ops: Event, + + /// Receive operations waiting while the channel is empty and not closed. + recv_ops: Event, + + /// Stream operations while the channel is empty and not closed. + stream_ops: Event, + + /// The number of currently active `Sender`s. + sender_count: AtomicUsize, + + /// The number of currently active `Receivers`s. + receiver_count: AtomicUsize, +} + +impl<T> Channel<T> { + /// Closes the channel and notifies all blocked operations. + /// + /// Returns `true` if this call has closed the channel and it was not closed already. + fn close(&self) -> bool { + if self.queue.close() { + // Notify all send operations. + self.send_ops.notify(usize::MAX); + + // Notify all receive and stream operations. + self.recv_ops.notify(usize::MAX); + self.stream_ops.notify(usize::MAX); + + true + } else { + false + } + } +} + +/// Creates a bounded channel. +/// +/// The created channel has space to hold at most `cap` messages at a time. +/// +/// # Panics +/// +/// Capacity must be a positive number. If `cap` is zero, this function will panic. +/// +/// # Examples +/// +/// ``` +/// # futures_lite::future::block_on(async { +/// use async_channel::{bounded, TryRecvError, TrySendError}; +/// +/// let (s, r) = bounded(1); +/// +/// assert_eq!(s.send(10).await, Ok(())); +/// assert_eq!(s.try_send(20), Err(TrySendError::Full(20))); +/// +/// assert_eq!(r.recv().await, Ok(10)); +/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +/// # }); +/// ``` +pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { + assert!(cap > 0, "capacity cannot be zero"); + + let channel = Arc::new(Channel { + queue: ConcurrentQueue::bounded(cap), + send_ops: Event::new(), + recv_ops: Event::new(), + stream_ops: Event::new(), + sender_count: AtomicUsize::new(1), + receiver_count: AtomicUsize::new(1), + }); + + let s = Sender { + channel: channel.clone(), + }; + let r = Receiver { + channel, + listener: None, + }; + (s, r) +} + +/// Creates an unbounded channel. +/// +/// The created channel can hold an unlimited number of messages. +/// +/// # Examples +/// +/// ``` +/// # futures_lite::future::block_on(async { +/// use async_channel::{unbounded, TryRecvError}; +/// +/// let (s, r) = unbounded(); +/// +/// assert_eq!(s.send(10).await, Ok(())); +/// assert_eq!(s.send(20).await, Ok(())); +/// +/// assert_eq!(r.recv().await, Ok(10)); +/// assert_eq!(r.recv().await, Ok(20)); +/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +/// # }); +/// ``` +pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) { + let channel = Arc::new(Channel { + queue: ConcurrentQueue::unbounded(), + send_ops: Event::new(), + recv_ops: Event::new(), + stream_ops: Event::new(), + sender_count: AtomicUsize::new(1), + receiver_count: AtomicUsize::new(1), + }); + + let s = Sender { + channel: channel.clone(), + }; + let r = Receiver { + channel, + listener: None, + }; + (s, r) +} + +/// The sending side of a channel. +/// +/// Senders can be cloned and shared among threads. When all senders associated with a channel are +/// dropped, the channel becomes closed. +/// +/// The channel can also be closed manually by calling [`Sender::close()`]. +pub struct Sender<T> { + /// Inner channel state. + channel: Arc<Channel<T>>, +} + +impl<T> Sender<T> { + /// Attempts to send a message into the channel. + /// + /// If the channel is full or closed, this method returns an error. + /// + /// # Examples + /// + /// ``` + /// use async_channel::{bounded, TrySendError}; + /// + /// let (s, r) = bounded(1); + /// + /// assert_eq!(s.try_send(1), Ok(())); + /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2))); + /// + /// drop(r); + /// assert_eq!(s.try_send(3), Err(TrySendError::Closed(3))); + /// ``` + pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { + match self.channel.queue.push(msg) { + Ok(()) => { + // Notify a blocked receive operation. If the notified operation gets canceled, + // it will notify another blocked receive operation. + self.channel.recv_ops.notify_additional(1); + + // Notify all blocked streams. + self.channel.stream_ops.notify(usize::MAX); + + Ok(()) + } + Err(PushError::Full(msg)) => Err(TrySendError::Full(msg)), + Err(PushError::Closed(msg)) => Err(TrySendError::Closed(msg)), + } + } + + /// Sends a message into the channel. + /// + /// If the channel is full, this method waits until there is space for a message. + /// + /// If the channel is closed, this method returns an error. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::{unbounded, SendError}; + /// + /// let (s, r) = unbounded(); + /// + /// assert_eq!(s.send(1).await, Ok(())); + /// drop(r); + /// assert_eq!(s.send(2).await, Err(SendError(2))); + /// # }); + /// ``` + pub fn send(&self, msg: T) -> Send<'_, T> { + Send { + sender: self, + listener: None, + msg: Some(msg), + } + } + + /// Sends a message into this channel using the blocking strategy. + /// + /// If the channel is full, this method will block until there is room. + /// If the channel is closed, this method returns an error. + /// + /// # Blocking + /// + /// Rather than using asynchronous waiting, like the [`send`](Self::send) method, + /// this method will block the current thread until the message is sent. + /// + /// This method should not be used in an asynchronous context. It is intended + /// to be used such that a channel can be used in both asynchronous and synchronous contexts. + /// Calling this method in an asynchronous context may result in deadlocks. + /// + /// # Examples + /// + /// ``` + /// use async_channel::{unbounded, SendError}; + /// + /// let (s, r) = unbounded(); + /// + /// assert_eq!(s.send_blocking(1), Ok(())); + /// drop(r); + /// assert_eq!(s.send_blocking(2), Err(SendError(2))); + /// ``` + pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> { + self.send(msg).wait() + } + + /// Closes the channel. + /// + /// Returns `true` if this call has closed the channel and it was not closed already. + /// + /// The remaining messages can still be received. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::{unbounded, RecvError}; + /// + /// let (s, r) = unbounded(); + /// assert_eq!(s.send(1).await, Ok(())); + /// assert!(s.close()); + /// + /// assert_eq!(r.recv().await, Ok(1)); + /// assert_eq!(r.recv().await, Err(RecvError)); + /// # }); + /// ``` + pub fn close(&self) -> bool { + self.channel.close() + } + + /// Returns `true` if the channel is closed. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::{unbounded, RecvError}; + /// + /// let (s, r) = unbounded::<()>(); + /// assert!(!s.is_closed()); + /// + /// drop(r); + /// assert!(s.is_closed()); + /// # }); + /// ``` + pub fn is_closed(&self) -> bool { + self.channel.queue.is_closed() + } + + /// Returns `true` if the channel is empty. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::unbounded; + /// + /// let (s, r) = unbounded(); + /// + /// assert!(s.is_empty()); + /// s.send(1).await; + /// assert!(!s.is_empty()); + /// # }); + /// ``` + pub fn is_empty(&self) -> bool { + self.channel.queue.is_empty() + } + + /// Returns `true` if the channel is full. + /// + /// Unbounded channels are never full. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::bounded; + /// + /// let (s, r) = bounded(1); + /// + /// assert!(!s.is_full()); + /// s.send(1).await; + /// assert!(s.is_full()); + /// # }); + /// ``` + pub fn is_full(&self) -> bool { + self.channel.queue.is_full() + } + + /// Returns the number of messages in the channel. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::unbounded; + /// + /// let (s, r) = unbounded(); + /// assert_eq!(s.len(), 0); + /// + /// s.send(1).await; + /// s.send(2).await; + /// assert_eq!(s.len(), 2); + /// # }); + /// ``` + pub fn len(&self) -> usize { + self.channel.queue.len() + } + + /// Returns the channel capacity if it's bounded. + /// + /// # Examples + /// + /// ``` + /// use async_channel::{bounded, unbounded}; + /// + /// let (s, r) = bounded::<i32>(5); + /// assert_eq!(s.capacity(), Some(5)); + /// + /// let (s, r) = unbounded::<i32>(); + /// assert_eq!(s.capacity(), None); + /// ``` + pub fn capacity(&self) -> Option<usize> { + self.channel.queue.capacity() + } + + /// Returns the number of receivers for the channel. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::unbounded; + /// + /// let (s, r) = unbounded::<()>(); + /// assert_eq!(s.receiver_count(), 1); + /// + /// let r2 = r.clone(); + /// assert_eq!(s.receiver_count(), 2); + /// # }); + /// ``` + pub fn receiver_count(&self) -> usize { + self.channel.receiver_count.load(Ordering::SeqCst) + } + + /// Returns the number of senders for the channel. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::unbounded; + /// + /// let (s, r) = unbounded::<()>(); + /// assert_eq!(s.sender_count(), 1); + /// + /// let s2 = s.clone(); + /// assert_eq!(s.sender_count(), 2); + /// # }); + /// ``` + pub fn sender_count(&self) -> usize { + self.channel.sender_count.load(Ordering::SeqCst) + } + + /// Downgrade the sender to a weak reference. + pub fn downgrade(&self) -> WeakSender<T> { + WeakSender { + channel: self.channel.clone(), + } + } +} + +impl<T> Drop for Sender<T> { + fn drop(&mut self) { + // Decrement the sender count and close the channel if it drops down to zero. + if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 { + self.channel.close(); + } + } +} + +impl<T> fmt::Debug for Sender<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Sender {{ .. }}") + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Sender<T> { + let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed); + + // Make sure the count never overflows, even if lots of sender clones are leaked. + if count > usize::MAX / 2 { + process::abort(); + } + + Sender { + channel: self.channel.clone(), + } + } +} + +/// The receiving side of a channel. +/// +/// Receivers can be cloned and shared among threads. When all receivers associated with a channel +/// are dropped, the channel becomes closed. +/// +/// The channel can also be closed manually by calling [`Receiver::close()`]. +/// +/// Receivers implement the [`Stream`] trait. +pub struct Receiver<T> { + /// Inner channel state. + channel: Arc<Channel<T>>, + + /// Listens for a send or close event to unblock this stream. + listener: Option<EventListener>, +} + +impl<T> Receiver<T> { + /// Attempts to receive a message from the channel. + /// + /// If the channel is empty, or empty and closed, this method returns an error. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::{unbounded, TryRecvError}; + /// + /// let (s, r) = unbounded(); + /// assert_eq!(s.send(1).await, Ok(())); + /// + /// assert_eq!(r.try_recv(), Ok(1)); + /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + /// + /// drop(s); + /// assert_eq!(r.try_recv(), Err(TryRecvError::Closed)); + /// # }); + /// ``` + pub fn try_recv(&self) -> Result<T, TryRecvError> { + match self.channel.queue.pop() { + Ok(msg) => { + // Notify a blocked send operation. If the notified operation gets canceled, it + // will notify another blocked send operation. + self.channel.send_ops.notify_additional(1); + + Ok(msg) + } + Err(PopError::Empty) => Err(TryRecvError::Empty), + Err(PopError::Closed) => Err(TryRecvError::Closed), + } + } + + /// Receives a message from the channel. + /// + /// If the channel is empty, this method waits until there is a message. + /// + /// If the channel is closed, this method receives a message or returns an error if there are + /// no more messages. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::{unbounded, RecvError}; + /// + /// let (s, r) = unbounded(); + /// + /// assert_eq!(s.send(1).await, Ok(())); + /// drop(s); + /// + /// assert_eq!(r.recv().await, Ok(1)); + /// assert_eq!(r.recv().await, Err(RecvError)); + /// # }); + /// ``` + pub fn recv(&self) -> Recv<'_, T> { + Recv { + receiver: self, + listener: None, + } + } + + /// Receives a message from the channel using the blocking strategy. + /// + /// If the channel is empty, this method waits until there is a message. + /// If the channel is closed, this method receives a message or returns an error if there are + /// no more messages. + /// + /// # Blocking + /// + /// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method, + /// this method will block the current thread until the message is sent. + /// + /// This method should not be used in an asynchronous context. It is intended + /// to be used such that a channel can be used in both asynchronous and synchronous contexts. + /// Calling this method in an asynchronous context may result in deadlocks. + /// + /// # Examples + /// + /// ``` + /// use async_channel::{unbounded, RecvError}; + /// + /// let (s, r) = unbounded(); + /// + /// assert_eq!(s.send_blocking(1), Ok(())); + /// drop(s); + /// + /// assert_eq!(r.recv_blocking(), Ok(1)); + /// assert_eq!(r.recv_blocking(), Err(RecvError)); + /// ``` + pub fn recv_blocking(&self) -> Result<T, RecvError> { + self.recv().wait() + } + + /// Closes the channel. + /// + /// Returns `true` if this call has closed the channel and it was not closed already. + /// + /// The remaining messages can still be received. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::{unbounded, RecvError}; + /// + /// let (s, r) = unbounded(); + /// assert_eq!(s.send(1).await, Ok(())); + /// + /// assert!(r.close()); + /// assert_eq!(r.recv().await, Ok(1)); + /// assert_eq!(r.recv().await, Err(RecvError)); + /// # }); + /// ``` + pub fn close(&self) -> bool { + self.channel.close() + } + + /// Returns `true` if the channel is closed. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::{unbounded, RecvError}; + /// + /// let (s, r) = unbounded::<()>(); + /// assert!(!r.is_closed()); + /// + /// drop(s); + /// assert!(r.is_closed()); + /// # }); + /// ``` + pub fn is_closed(&self) -> bool { + self.channel.queue.is_closed() + } + + /// Returns `true` if the channel is empty. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::unbounded; + /// + /// let (s, r) = unbounded(); + /// + /// assert!(s.is_empty()); + /// s.send(1).await; + /// assert!(!s.is_empty()); + /// # }); + /// ``` + pub fn is_empty(&self) -> bool { + self.channel.queue.is_empty() + } + + /// Returns `true` if the channel is full. + /// + /// Unbounded channels are never full. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::bounded; + /// + /// let (s, r) = bounded(1); + /// + /// assert!(!r.is_full()); + /// s.send(1).await; + /// assert!(r.is_full()); + /// # }); + /// ``` + pub fn is_full(&self) -> bool { + self.channel.queue.is_full() + } + + /// Returns the number of messages in the channel. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::unbounded; + /// + /// let (s, r) = unbounded(); + /// assert_eq!(r.len(), 0); + /// + /// s.send(1).await; + /// s.send(2).await; + /// assert_eq!(r.len(), 2); + /// # }); + /// ``` + pub fn len(&self) -> usize { + self.channel.queue.len() + } + + /// Returns the channel capacity if it's bounded. + /// + /// # Examples + /// + /// ``` + /// use async_channel::{bounded, unbounded}; + /// + /// let (s, r) = bounded::<i32>(5); + /// assert_eq!(r.capacity(), Some(5)); + /// + /// let (s, r) = unbounded::<i32>(); + /// assert_eq!(r.capacity(), None); + /// ``` + pub fn capacity(&self) -> Option<usize> { + self.channel.queue.capacity() + } + + /// Returns the number of receivers for the channel. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::unbounded; + /// + /// let (s, r) = unbounded::<()>(); + /// assert_eq!(r.receiver_count(), 1); + /// + /// let r2 = r.clone(); + /// assert_eq!(r.receiver_count(), 2); + /// # }); + /// ``` + pub fn receiver_count(&self) -> usize { + self.channel.receiver_count.load(Ordering::SeqCst) + } + + /// Returns the number of senders for the channel. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::unbounded; + /// + /// let (s, r) = unbounded::<()>(); + /// assert_eq!(r.sender_count(), 1); + /// + /// let s2 = s.clone(); + /// assert_eq!(r.sender_count(), 2); + /// # }); + /// ``` + pub fn sender_count(&self) -> usize { + self.channel.sender_count.load(Ordering::SeqCst) + } + + /// Downgrade the receiver to a weak reference. + pub fn downgrade(&self) -> WeakReceiver<T> { + WeakReceiver { + channel: self.channel.clone(), + } + } +} + +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + // Decrement the receiver count and close the channel if it drops down to zero. + if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { + self.channel.close(); + } + } +} + +impl<T> fmt::Debug for Receiver<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Receiver {{ .. }}") + } +} + +impl<T> Clone for Receiver<T> { + fn clone(&self) -> Receiver<T> { + let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed); + + // Make sure the count never overflows, even if lots of receiver clones are leaked. + if count > usize::MAX / 2 { + process::abort(); + } + + Receiver { + channel: self.channel.clone(), + listener: None, + } + } +} + +impl<T> Stream for Receiver<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + loop { + // If this stream is listening for events, first wait for a notification. + if let Some(listener) = self.listener.as_mut() { + futures_core::ready!(Pin::new(listener).poll(cx)); + self.listener = None; + } + + loop { + // Attempt to receive a message. + match self.try_recv() { + Ok(msg) => { + // The stream is not blocked on an event - drop the listener. + self.listener = None; + return Poll::Ready(Some(msg)); + } + Err(TryRecvError::Closed) => { + // The stream is not blocked on an event - drop the listener. + self.listener = None; + return Poll::Ready(None); + } + Err(TryRecvError::Empty) => {} + } + + // Receiving failed - now start listening for notifications or wait for one. + match self.listener.as_mut() { + None => { + // Create a listener and try sending the message again. + self.listener = Some(self.channel.stream_ops.listen()); + } + Some(_) => { + // Go back to the outer loop to poll the listener. + break; + } + } + } + } + } +} + +impl<T> futures_core::stream::FusedStream for Receiver<T> { + fn is_terminated(&self) -> bool { + self.channel.queue.is_closed() && self.channel.queue.is_empty() + } +} + +/// A [`Sender`] that prevents the channel from not being closed. +/// +/// This is created through the [`Sender::downgrade`] method. In order to use it, it needs +/// to be upgraded into a [`Sender`] through the `upgrade` method. +#[derive(Clone)] +pub struct WeakSender<T> { + channel: Arc<Channel<T>>, +} + +impl<T> WeakSender<T> { + /// Upgrade the [`WeakSender`] into a [`Sender`]. + pub fn upgrade(&self) -> Option<Sender<T>> { + if self.channel.queue.is_closed() { + None + } else { + let old_count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed); + if old_count == 0 { + // Channel was closed while we were incrementing the count. + self.channel.sender_count.store(0, Ordering::Release); + None + } else if old_count > usize::MAX / 2 { + // Make sure the count never overflows, even if lots of sender clones are leaked. + process::abort(); + } else { + Some(Sender { + channel: self.channel.clone(), + }) + } + } + } +} + +impl<T> fmt::Debug for WeakSender<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "WeakSender {{ .. }}") + } +} + +/// A [`Receiver`] that prevents the channel from not being closed. +/// +/// This is created through the [`Receiver::downgrade`] method. In order to use it, it needs +/// to be upgraded into a [`Receiver`] through the `upgrade` method. +#[derive(Clone)] +pub struct WeakReceiver<T> { + channel: Arc<Channel<T>>, +} + +impl<T> WeakReceiver<T> { + /// Upgrade the [`WeakReceiver`] into a [`Receiver`]. + pub fn upgrade(&self) -> Option<Receiver<T>> { + if self.channel.queue.is_closed() { + None + } else { + let old_count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed); + if old_count == 0 { + // Channel was closed while we were incrementing the count. + self.channel.receiver_count.store(0, Ordering::Release); + None + } else if old_count > usize::MAX / 2 { + // Make sure the count never overflows, even if lots of receiver clones are leaked. + process::abort(); + } else { + Some(Receiver { + channel: self.channel.clone(), + listener: None, + }) + } + } + } +} + +impl<T> fmt::Debug for WeakReceiver<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "WeakReceiver {{ .. }}") + } +} + +/// An error returned from [`Sender::send()`]. +/// +/// Received because the channel is closed. +#[derive(PartialEq, Eq, Clone, Copy)] +pub struct SendError<T>(pub T); + +impl<T> SendError<T> { + /// Unwraps the message that couldn't be sent. + pub fn into_inner(self) -> T { + self.0 + } +} + +impl<T> error::Error for SendError<T> {} + +impl<T> fmt::Debug for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SendError(..)") + } +} + +impl<T> fmt::Display for SendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "sending into a closed channel") + } +} + +/// An error returned from [`Sender::try_send()`]. +#[derive(PartialEq, Eq, Clone, Copy)] +pub enum TrySendError<T> { + /// The channel is full but not closed. + Full(T), + + /// The channel is closed. + Closed(T), +} + +impl<T> TrySendError<T> { + /// Unwraps the message that couldn't be sent. + pub fn into_inner(self) -> T { + match self { + TrySendError::Full(t) => t, + TrySendError::Closed(t) => t, + } + } + + /// Returns `true` if the channel is full but not closed. + pub fn is_full(&self) -> bool { + match self { + TrySendError::Full(_) => true, + TrySendError::Closed(_) => false, + } + } + + /// Returns `true` if the channel is closed. + pub fn is_closed(&self) -> bool { + match self { + TrySendError::Full(_) => false, + TrySendError::Closed(_) => true, + } + } +} + +impl<T> error::Error for TrySendError<T> {} + +impl<T> fmt::Debug for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TrySendError::Full(..) => write!(f, "Full(..)"), + TrySendError::Closed(..) => write!(f, "Closed(..)"), + } + } +} + +impl<T> fmt::Display for TrySendError<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TrySendError::Full(..) => write!(f, "sending into a full channel"), + TrySendError::Closed(..) => write!(f, "sending into a closed channel"), + } + } +} + +/// An error returned from [`Receiver::recv()`]. +/// +/// Received because the channel is empty and closed. +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub struct RecvError; + +impl error::Error for RecvError {} + +impl fmt::Display for RecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "receiving from an empty and closed channel") + } +} + +/// An error returned from [`Receiver::try_recv()`]. +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub enum TryRecvError { + /// The channel is empty but not closed. + Empty, + + /// The channel is empty and closed. + Closed, +} + +impl TryRecvError { + /// Returns `true` if the channel is empty but not closed. + pub fn is_empty(&self) -> bool { + match self { + TryRecvError::Empty => true, + TryRecvError::Closed => false, + } + } + + /// Returns `true` if the channel is empty and closed. + pub fn is_closed(&self) -> bool { + match self { + TryRecvError::Empty => false, + TryRecvError::Closed => true, + } + } +} + +impl error::Error for TryRecvError {} + +impl fmt::Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TryRecvError::Empty => write!(f, "receiving from an empty channel"), + TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"), + } + } +} + +/// A future returned by [`Sender::send()`]. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Send<'a, T> { + sender: &'a Sender<T>, + listener: Option<EventListener>, + msg: Option<T>, +} + +impl<'a, T> Send<'a, T> { + /// Run this future with the given `Strategy`. + fn run_with_strategy<S: Strategy>( + &mut self, + cx: &mut S::Context, + ) -> Poll<Result<(), SendError<T>>> { + loop { + let msg = self.msg.take().unwrap(); + // Attempt to send a message. + match self.sender.try_send(msg) { + Ok(()) => return Poll::Ready(Ok(())), + Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))), + Err(TrySendError::Full(m)) => self.msg = Some(m), + } + + // Sending failed - now start listening for notifications or wait for one. + match self.listener.take() { + None => { + // Start listening and then try sending again. + self.listener = Some(self.sender.channel.send_ops.listen()); + } + Some(l) => { + // Poll using the given strategy + if let Err(l) = S::poll(l, cx) { + self.listener = Some(l); + return Poll::Pending; + } + } + } + } + } + + /// Run using the blocking strategy. + fn wait(mut self) -> Result<(), SendError<T>> { + match self.run_with_strategy::<Blocking>(&mut ()) { + Poll::Ready(res) => res, + Poll::Pending => unreachable!(), + } + } +} + +impl<'a, T> Unpin for Send<'a, T> {} + +impl<'a, T> Future for Send<'a, T> { + type Output = Result<(), SendError<T>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.run_with_strategy::<NonBlocking<'_>>(cx) + } +} + +/// A future returned by [`Receiver::recv()`]. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Recv<'a, T> { + receiver: &'a Receiver<T>, + listener: Option<EventListener>, +} + +impl<'a, T> Unpin for Recv<'a, T> {} + +impl<'a, T> Recv<'a, T> { + /// Run this future with the given `Strategy`. + fn run_with_strategy<S: Strategy>( + &mut self, + cx: &mut S::Context, + ) -> Poll<Result<T, RecvError>> { + loop { + // Attempt to receive a message. + match self.receiver.try_recv() { + Ok(msg) => return Poll::Ready(Ok(msg)), + Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)), + Err(TryRecvError::Empty) => {} + } + + // Receiving failed - now start listening for notifications or wait for one. + match self.listener.take() { + None => { + // Start listening and then try receiving again. + self.listener = Some(self.receiver.channel.recv_ops.listen()); + } + Some(l) => { + // Poll using the given strategy. + if let Err(l) = S::poll(l, cx) { + self.listener = Some(l); + return Poll::Pending; + } + } + } + } + } + + /// Run with the blocking strategy. + fn wait(mut self) -> Result<T, RecvError> { + match self.run_with_strategy::<Blocking>(&mut ()) { + Poll::Ready(res) => res, + Poll::Pending => unreachable!(), + } + } +} + +impl<'a, T> Future for Recv<'a, T> { + type Output = Result<T, RecvError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.run_with_strategy::<NonBlocking<'_>>(cx) + } +} + +/// A strategy used to poll an `EventListener`. +trait Strategy { + /// Context needed to be provided to the `poll` method. + type Context; + + /// Polls the given `EventListener`. + /// + /// Returns the `EventListener` back if it was not completed; otherwise, + /// returns `Ok(())`. + fn poll(evl: EventListener, cx: &mut Self::Context) -> Result<(), EventListener>; +} + +/// Non-blocking strategy for use in asynchronous code. +struct NonBlocking<'a>(&'a mut ()); + +impl<'a> Strategy for NonBlocking<'a> { + type Context = Context<'a>; + + fn poll(mut evl: EventListener, cx: &mut Context<'a>) -> Result<(), EventListener> { + match Pin::new(&mut evl).poll(cx) { + Poll::Ready(()) => Ok(()), + Poll::Pending => Err(evl), + } + } +} + +/// Blocking strategy for use in synchronous code. +struct Blocking; + +impl Strategy for Blocking { + type Context = (); + + fn poll(evl: EventListener, _cx: &mut ()) -> Result<(), EventListener> { + evl.wait(); + Ok(()) + } +} diff --git a/tests/bounded.rs b/tests/bounded.rs new file mode 100644 index 0000000..0ae4890 --- /dev/null +++ b/tests/bounded.rs @@ -0,0 +1,482 @@ +#![allow(clippy::bool_assert_comparison)] + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread::sleep; +use std::time::Duration; + +use async_channel::{bounded, RecvError, SendError, TryRecvError, TrySendError}; +use easy_parallel::Parallel; +use futures_lite::{future, prelude::*}; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn smoke() { + let (s, r) = bounded(1); + + future::block_on(s.send(7)).unwrap(); + assert_eq!(r.try_recv(), Ok(7)); + + future::block_on(s.send(8)).unwrap(); + assert_eq!(future::block_on(r.recv()), Ok(8)); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn smoke_blocking() { + let (s, r) = bounded(1); + + s.send_blocking(7).unwrap(); + assert_eq!(r.try_recv(), Ok(7)); + + s.send_blocking(8).unwrap(); + assert_eq!(future::block_on(r.recv()), Ok(8)); + + future::block_on(s.send(9)).unwrap(); + assert_eq!(r.recv_blocking(), Ok(9)); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn capacity() { + for i in 1..10 { + let (s, r) = bounded::<()>(i); + assert_eq!(s.capacity(), Some(i)); + assert_eq!(r.capacity(), Some(i)); + } +} + +#[test] +fn len_empty_full() { + let (s, r) = bounded(2); + + assert_eq!(s.len(), 0); + assert_eq!(s.is_empty(), true); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 0); + assert_eq!(r.is_empty(), true); + assert_eq!(r.is_full(), false); + + future::block_on(s.send(())).unwrap(); + + assert_eq!(s.len(), 1); + assert_eq!(s.is_empty(), false); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 1); + assert_eq!(r.is_empty(), false); + assert_eq!(r.is_full(), false); + + future::block_on(s.send(())).unwrap(); + + assert_eq!(s.len(), 2); + assert_eq!(s.is_empty(), false); + assert_eq!(s.is_full(), true); + assert_eq!(r.len(), 2); + assert_eq!(r.is_empty(), false); + assert_eq!(r.is_full(), true); + + future::block_on(r.recv()).unwrap(); + + assert_eq!(s.len(), 1); + assert_eq!(s.is_empty(), false); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 1); + assert_eq!(r.is_empty(), false); + assert_eq!(r.is_full(), false); +} + +#[test] +fn try_recv() { + let (s, r) = bounded(100); + + Parallel::new() + .add(move || { + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + sleep(ms(1500)); + assert_eq!(r.try_recv(), Ok(7)); + sleep(ms(500)); + assert_eq!(r.try_recv(), Err(TryRecvError::Closed)); + }) + .add(move || { + sleep(ms(1000)); + future::block_on(s.send(7)).unwrap(); + }) + .run(); +} + +#[test] +fn recv() { + let (s, r) = bounded(100); + + Parallel::new() + .add(move || { + assert_eq!(future::block_on(r.recv()), Ok(7)); + sleep(ms(1000)); + assert_eq!(future::block_on(r.recv()), Ok(8)); + sleep(ms(1000)); + assert_eq!(future::block_on(r.recv()), Ok(9)); + assert_eq!(future::block_on(r.recv()), Err(RecvError)); + }) + .add(move || { + sleep(ms(1500)); + future::block_on(s.send(7)).unwrap(); + future::block_on(s.send(8)).unwrap(); + future::block_on(s.send(9)).unwrap(); + }) + .run(); +} + +#[test] +fn try_send() { + let (s, r) = bounded(1); + + Parallel::new() + .add(move || { + assert_eq!(s.try_send(1), Ok(())); + assert_eq!(s.try_send(2), Err(TrySendError::Full(2))); + sleep(ms(1500)); + assert_eq!(s.try_send(3), Ok(())); + sleep(ms(500)); + assert_eq!(s.try_send(4), Err(TrySendError::Closed(4))); + }) + .add(move || { + sleep(ms(1000)); + assert_eq!(r.try_recv(), Ok(1)); + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + assert_eq!(future::block_on(r.recv()), Ok(3)); + }) + .run(); +} + +#[test] +fn send() { + let (s, r) = bounded(1); + + Parallel::new() + .add(|| { + future::block_on(s.send(7)).unwrap(); + sleep(ms(1000)); + future::block_on(s.send(8)).unwrap(); + sleep(ms(1000)); + future::block_on(s.send(9)).unwrap(); + sleep(ms(1000)); + future::block_on(s.send(10)).unwrap(); + }) + .add(|| { + sleep(ms(1500)); + assert_eq!(future::block_on(r.recv()), Ok(7)); + assert_eq!(future::block_on(r.recv()), Ok(8)); + assert_eq!(future::block_on(r.recv()), Ok(9)); + }) + .run(); +} + +#[test] +fn send_after_close() { + let (s, r) = bounded(100); + + future::block_on(s.send(1)).unwrap(); + future::block_on(s.send(2)).unwrap(); + future::block_on(s.send(3)).unwrap(); + + drop(r); + + assert_eq!(future::block_on(s.send(4)), Err(SendError(4))); + assert_eq!(s.try_send(5), Err(TrySendError::Closed(5))); + assert_eq!(future::block_on(s.send(6)), Err(SendError(6))); +} + +#[test] +fn recv_after_close() { + let (s, r) = bounded(100); + + future::block_on(s.send(1)).unwrap(); + future::block_on(s.send(2)).unwrap(); + future::block_on(s.send(3)).unwrap(); + + drop(s); + + assert_eq!(future::block_on(r.recv()), Ok(1)); + assert_eq!(future::block_on(r.recv()), Ok(2)); + assert_eq!(future::block_on(r.recv()), Ok(3)); + assert_eq!(future::block_on(r.recv()), Err(RecvError)); +} + +#[test] +fn len() { + const COUNT: usize = 25_000; + const CAP: usize = 1000; + + let (s, r) = bounded(CAP); + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + for _ in 0..CAP / 10 { + for i in 0..50 { + future::block_on(s.send(i)).unwrap(); + assert_eq!(s.len(), i + 1); + } + + for i in 0..50 { + future::block_on(r.recv()).unwrap(); + assert_eq!(r.len(), 50 - i - 1); + } + } + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + for i in 0..CAP { + future::block_on(s.send(i)).unwrap(); + assert_eq!(s.len(), i + 1); + } + + for _ in 0..CAP { + future::block_on(r.recv()).unwrap(); + } + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + Parallel::new() + .add(|| { + for i in 0..COUNT { + assert_eq!(future::block_on(r.recv()), Ok(i)); + let len = r.len(); + assert!(len <= CAP); + } + }) + .add(|| { + for i in 0..COUNT { + future::block_on(s.send(i)).unwrap(); + let len = s.len(); + assert!(len <= CAP); + } + }) + .run(); + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); +} + +#[test] +fn receiver_count() { + let (s, r) = bounded::<()>(5); + let receiver_clones: Vec<_> = (0..20).map(|_| r.clone()).collect(); + + assert_eq!(s.receiver_count(), 21); + assert_eq!(r.receiver_count(), 21); + + drop(receiver_clones); + + assert_eq!(s.receiver_count(), 1); + assert_eq!(r.receiver_count(), 1); +} + +#[test] +fn sender_count() { + let (s, r) = bounded::<()>(5); + let sender_clones: Vec<_> = (0..20).map(|_| s.clone()).collect(); + + assert_eq!(s.sender_count(), 21); + assert_eq!(r.sender_count(), 21); + + drop(sender_clones); + + assert_eq!(s.receiver_count(), 1); + assert_eq!(r.receiver_count(), 1); +} + +#[test] +fn close_wakes_sender() { + let (s, r) = bounded(1); + + Parallel::new() + .add(move || { + assert_eq!(future::block_on(s.send(())), Ok(())); + assert_eq!(future::block_on(s.send(())), Err(SendError(()))); + }) + .add(move || { + sleep(ms(1000)); + drop(r); + }) + .run(); +} + +#[test] +fn close_wakes_receiver() { + let (s, r) = bounded::<()>(1); + + Parallel::new() + .add(move || { + assert_eq!(future::block_on(r.recv()), Err(RecvError)); + }) + .add(move || { + sleep(ms(1000)); + drop(s); + }) + .run(); +} + +#[test] +fn forget_blocked_sender() { + let (s1, r) = bounded(2); + let s2 = s1.clone(); + + Parallel::new() + .add(move || { + assert!(future::block_on(s1.send(3)).is_ok()); + assert!(future::block_on(s1.send(7)).is_ok()); + let mut s1_fut = s1.send(13); + // Poll but keep the future alive. + assert_eq!(future::block_on(future::poll_once(&mut s1_fut)), None); + sleep(ms(500)); + }) + .add(move || { + sleep(ms(100)); + assert!(future::block_on(s2.send(42)).is_ok()); + }) + .add(move || { + sleep(ms(200)); + assert_eq!(future::block_on(r.recv()), Ok(3)); + assert_eq!(future::block_on(r.recv()), Ok(7)); + sleep(ms(100)); + assert_eq!(r.try_recv(), Ok(42)); + }) + .run(); +} + +#[test] +fn forget_blocked_receiver() { + let (s, r1) = bounded(2); + let r2 = r1.clone(); + + Parallel::new() + .add(move || { + let mut r1_fut = r1.recv(); + // Poll but keep the future alive. + assert_eq!(future::block_on(future::poll_once(&mut r1_fut)), None); + sleep(ms(500)); + }) + .add(move || { + sleep(ms(100)); + assert_eq!(future::block_on(r2.recv()), Ok(3)); + }) + .add(move || { + sleep(ms(200)); + assert!(future::block_on(s.send(3)).is_ok()); + assert!(future::block_on(s.send(7)).is_ok()); + sleep(ms(100)); + assert!(s.try_send(42).is_ok()); + }) + .run(); +} + +#[test] +fn spsc() { + const COUNT: usize = 100_000; + + let (s, r) = bounded(3); + + Parallel::new() + .add(move || { + for i in 0..COUNT { + assert_eq!(future::block_on(r.recv()), Ok(i)); + } + assert_eq!(future::block_on(r.recv()), Err(RecvError)); + }) + .add(move || { + for i in 0..COUNT { + future::block_on(s.send(i)).unwrap(); + } + }) + .run(); +} + +#[test] +fn mpmc() { + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let (s, r) = bounded::<usize>(3); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); + + Parallel::new() + .each(0..THREADS, |_| { + for _ in 0..COUNT { + let n = future::block_on(r.recv()).unwrap(); + v[n].fetch_add(1, Ordering::SeqCst); + } + }) + .each(0..THREADS, |_| { + for i in 0..COUNT { + future::block_on(s.send(i)).unwrap(); + } + }) + .run(); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), THREADS); + } +} + +#[test] +fn mpmc_stream() { + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let (s, r) = bounded::<usize>(3); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); + let v = &v; + + Parallel::new() + .each(0..THREADS, { + let mut r = r; + move |_| { + for _ in 0..COUNT { + let n = future::block_on(r.next()).unwrap(); + v[n].fetch_add(1, Ordering::SeqCst); + } + } + }) + .each(0..THREADS, |_| { + for i in 0..COUNT { + future::block_on(s.send(i)).unwrap(); + } + }) + .run(); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), THREADS); + } +} + +#[test] +fn weak() { + let (s, r) = bounded::<usize>(3); + + // Create a weak sender/receiver pair. + let (weak_s, weak_r) = (s.downgrade(), r.downgrade()); + + // Upgrade and send. + { + let s = weak_s.upgrade().unwrap(); + s.send_blocking(3).unwrap(); + let r = weak_r.upgrade().unwrap(); + assert_eq!(r.recv_blocking(), Ok(3)); + } + + // Drop the original sender/receiver pair. + drop((s, r)); + + // Try to upgrade again. + { + assert!(weak_s.upgrade().is_none()); + assert!(weak_r.upgrade().is_none()); + } +} diff --git a/tests/unbounded.rs b/tests/unbounded.rs new file mode 100644 index 0000000..e239d34 --- /dev/null +++ b/tests/unbounded.rs @@ -0,0 +1,343 @@ +#![allow(clippy::bool_assert_comparison)] + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread::sleep; +use std::time::Duration; + +use async_channel::{unbounded, RecvError, SendError, TryRecvError, TrySendError}; +use easy_parallel::Parallel; +use futures_lite::{future, prelude::*}; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn smoke() { + let (s, r) = unbounded(); + + s.try_send(7).unwrap(); + assert_eq!(r.try_recv(), Ok(7)); + + future::block_on(s.send(8)).unwrap(); + assert_eq!(future::block_on(r.recv()), Ok(8)); + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn smoke_blocking() { + let (s, r) = unbounded(); + + s.send_blocking(7).unwrap(); + assert_eq!(r.try_recv(), Ok(7)); + + s.send_blocking(8).unwrap(); + assert_eq!(future::block_on(r.recv()), Ok(8)); + + future::block_on(s.send(9)).unwrap(); + assert_eq!(r.recv_blocking(), Ok(9)); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn capacity() { + let (s, r) = unbounded::<()>(); + assert_eq!(s.capacity(), None); + assert_eq!(r.capacity(), None); +} + +#[test] +fn len_empty_full() { + let (s, r) = unbounded(); + + assert_eq!(s.len(), 0); + assert_eq!(s.is_empty(), true); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 0); + assert_eq!(r.is_empty(), true); + assert_eq!(r.is_full(), false); + + future::block_on(s.send(())).unwrap(); + + assert_eq!(s.len(), 1); + assert_eq!(s.is_empty(), false); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 1); + assert_eq!(r.is_empty(), false); + assert_eq!(r.is_full(), false); + + future::block_on(r.recv()).unwrap(); + + assert_eq!(s.len(), 0); + assert_eq!(s.is_empty(), true); + assert_eq!(s.is_full(), false); + assert_eq!(r.len(), 0); + assert_eq!(r.is_empty(), true); + assert_eq!(r.is_full(), false); +} + +#[test] +fn try_recv() { + let (s, r) = unbounded(); + + Parallel::new() + .add(move || { + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + sleep(ms(1500)); + assert_eq!(r.try_recv(), Ok(7)); + sleep(ms(500)); + assert_eq!(r.try_recv(), Err(TryRecvError::Closed)); + }) + .add(move || { + sleep(ms(1000)); + future::block_on(s.send(7)).unwrap(); + }) + .run(); +} + +#[test] +fn recv() { + let (s, r) = unbounded(); + + Parallel::new() + .add(move || { + assert_eq!(future::block_on(r.recv()), Ok(7)); + sleep(ms(1000)); + assert_eq!(future::block_on(r.recv()), Ok(8)); + sleep(ms(1000)); + assert_eq!(future::block_on(r.recv()), Ok(9)); + assert_eq!(future::block_on(r.recv()), Err(RecvError)); + }) + .add(move || { + sleep(ms(1500)); + future::block_on(s.send(7)).unwrap(); + future::block_on(s.send(8)).unwrap(); + future::block_on(s.send(9)).unwrap(); + }) + .run(); +} + +#[test] +fn try_send() { + let (s, r) = unbounded(); + for i in 0..1000 { + assert_eq!(s.try_send(i), Ok(())); + } + + drop(r); + assert_eq!(s.try_send(777), Err(TrySendError::Closed(777))); +} + +#[test] +fn send() { + let (s, r) = unbounded(); + for i in 0..1000 { + assert_eq!(future::block_on(s.send(i)), Ok(())); + } + + drop(r); + assert_eq!(future::block_on(s.send(777)), Err(SendError(777))); +} + +#[test] +fn send_after_close() { + let (s, r) = unbounded(); + + future::block_on(s.send(1)).unwrap(); + future::block_on(s.send(2)).unwrap(); + future::block_on(s.send(3)).unwrap(); + + drop(r); + + assert_eq!(future::block_on(s.send(4)), Err(SendError(4))); + assert_eq!(s.try_send(5), Err(TrySendError::Closed(5))); +} + +#[test] +fn recv_after_close() { + let (s, r) = unbounded(); + + future::block_on(s.send(1)).unwrap(); + future::block_on(s.send(2)).unwrap(); + future::block_on(s.send(3)).unwrap(); + + drop(s); + + assert_eq!(future::block_on(r.recv()), Ok(1)); + assert_eq!(future::block_on(r.recv()), Ok(2)); + assert_eq!(future::block_on(r.recv()), Ok(3)); + assert_eq!(future::block_on(r.recv()), Err(RecvError)); +} + +#[test] +fn len() { + let (s, r) = unbounded(); + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + for i in 0..50 { + future::block_on(s.send(i)).unwrap(); + assert_eq!(s.len(), i + 1); + } + + for i in 0..50 { + future::block_on(r.recv()).unwrap(); + assert_eq!(r.len(), 50 - i - 1); + } + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); +} + +#[test] +fn receiver_count() { + let (s, r) = unbounded::<()>(); + let receiver_clones: Vec<_> = (0..20).map(|_| r.clone()).collect(); + + assert_eq!(s.receiver_count(), 21); + assert_eq!(r.receiver_count(), 21); + + drop(receiver_clones); + + assert_eq!(s.receiver_count(), 1); + assert_eq!(r.receiver_count(), 1); +} + +#[test] +fn sender_count() { + let (s, r) = unbounded::<()>(); + let sender_clones: Vec<_> = (0..20).map(|_| s.clone()).collect(); + + assert_eq!(s.sender_count(), 21); + assert_eq!(r.sender_count(), 21); + + drop(sender_clones); + + assert_eq!(s.receiver_count(), 1); + assert_eq!(r.receiver_count(), 1); +} + +#[test] +fn close_wakes_receiver() { + let (s, r) = unbounded::<()>(); + + Parallel::new() + .add(move || { + assert_eq!(future::block_on(r.recv()), Err(RecvError)); + }) + .add(move || { + sleep(ms(1000)); + drop(s); + }) + .run(); +} + +#[test] +fn spsc() { + const COUNT: usize = 100_000; + + let (s, r) = unbounded(); + + Parallel::new() + .add(move || { + for i in 0..COUNT { + assert_eq!(future::block_on(r.recv()), Ok(i)); + } + assert_eq!(future::block_on(r.recv()), Err(RecvError)); + }) + .add(move || { + for i in 0..COUNT { + future::block_on(s.send(i)).unwrap(); + } + }) + .run(); +} + +#[test] +fn mpmc() { + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let (s, r) = unbounded::<usize>(); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); + + Parallel::new() + .each(0..THREADS, |_| { + for _ in 0..COUNT { + let n = future::block_on(r.recv()).unwrap(); + v[n].fetch_add(1, Ordering::SeqCst); + } + }) + .each(0..THREADS, |_| { + for i in 0..COUNT { + future::block_on(s.send(i)).unwrap(); + } + }) + .run(); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), THREADS); + } +} + +#[test] +fn mpmc_stream() { + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let (s, r) = unbounded::<usize>(); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); + let v = &v; + + Parallel::new() + .each(0..THREADS, { + let mut r = r.clone(); + move |_| { + for _ in 0..COUNT { + let n = future::block_on(r.next()).unwrap(); + v[n].fetch_add(1, Ordering::SeqCst); + } + } + }) + .each(0..THREADS, |_| { + for i in 0..COUNT { + future::block_on(s.send(i)).unwrap(); + } + }) + .run(); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), THREADS); + } +} + +#[test] +fn weak() { + let (s, r) = unbounded::<usize>(); + + // Create a weak sender/receiver pair. + let (weak_s, weak_r) = (s.downgrade(), r.downgrade()); + + // Upgrade and send. + { + let s = weak_s.upgrade().unwrap(); + s.send_blocking(3).unwrap(); + let r = weak_r.upgrade().unwrap(); + assert_eq!(r.recv_blocking(), Ok(3)); + } + + // Drop the original sender/receiver pair. + drop((s, r)); + + // Try to upgrade again. + { + assert!(weak_s.upgrade().is_none()); + assert!(weak_r.upgrade().is_none()); + } +} |