diff options
author | DongHun Kwak <dh0128.kwak@samsung.com> | 2023-04-12 15:57:33 +0900 |
---|---|---|
committer | DongHun Kwak <dh0128.kwak@samsung.com> | 2023-04-12 15:57:33 +0900 |
commit | ae11f7847deda4b2ccb61499eed06ce8c0e3836d (patch) | |
tree | 9c28a8546ec67a9b6decb125de53dcca5e86fd90 | |
download | rust-async-net-ae11f7847deda4b2ccb61499eed06ce8c0e3836d.tar.gz rust-async-net-ae11f7847deda4b2ccb61499eed06ce8c0e3836d.tar.bz2 rust-async-net-ae11f7847deda4b2ccb61499eed06ce8c0e3836d.zip |
Import async-net 1.7.0upstream/1.7.0upstream
-rw-r--r-- | .cargo_vcs_info.json | 6 | ||||
-rw-r--r-- | CHANGELOG.md | 79 | ||||
-rw-r--r-- | Cargo.toml | 48 | ||||
-rw-r--r-- | Cargo.toml.orig | 25 | ||||
-rw-r--r-- | LICENSE-APACHE | 201 | ||||
-rw-r--r-- | LICENSE-MIT | 23 | ||||
-rw-r--r-- | README.md | 55 | ||||
-rw-r--r-- | build.rs | 16 | ||||
-rw-r--r-- | src/addr.rs | 212 | ||||
-rw-r--r-- | src/lib.rs | 65 | ||||
-rw-r--r-- | src/tcp.rs | 770 | ||||
-rw-r--r-- | src/udp.rs | 667 | ||||
-rw-r--r-- | src/unix.rs | 779 |
13 files changed, 2946 insertions, 0 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json new file mode 100644 index 0000000..80ae989 --- /dev/null +++ b/.cargo_vcs_info.json @@ -0,0 +1,6 @@ +{ + "git": { + "sha1": "d2508358c75910050eaae181efdd9fc276f450c2" + }, + "path_in_vcs": "" +}
\ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..da1cfe1 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,79 @@ +# Version 1.7.0 + +- Implement I/O safety traits on Rust 1.63+ (#21) + +# Version 1.6.1 + +- Override `AsyncWrite::poll_write_vectored` for `TcpStream`. +- Remove boxed futures from `TcpStream` and `UnixStream`. + +# Version 1.6.0 + +- Add `From` impls for conversion into inner networking types `Arc<Async<T>>`. (#12) +- Optimize allocations in Listeners. (#11) + +# Version 1.5.0 + +- Add `Into` impls for conversion into inner networking types `Arc<Async<T>>`. + +# Version 1.4.7 + +- Update `futures-lite`. + +# Version 1.4.6 + +- Remove random yielding - rely on `async-io` for that instead. + +# Version 1.4.5 + +- Don't poll `readiness()` future again after it has returned an error. + +# Version 1.4.4 + +- Store `readable` future inside `Incoming` struct. + +# Version 1.4.3 + +- Minor nits in the docs. + +# Version 1.4.3 + +- Make `TcpStream` and `UnixStream` unwind-safe. + +# Version 1.4.1 + +- Make `TcpStream` and `UnixStream` implement `Sync`. + +# Version 1.4.0 + +- Remove `AsyncRead`/`AsyncWrite` impls for `&TcpStream`/`&UnixStream` + (technically a breaking change, but the existence of these impls is a bug) + +# Version 1.3.0 + +- Add type converstions using `From` and `TryFrom` impls. + +# Version 1.2.0 + +- Update `blocking` and `async-io` to v1.0 + +# Version 1.1.0 + +- Reexport `AddrParseError`. + +# Version 1.0.0 + +- Add `resolve()`. +- Re-export more types from `std::net`. + +# Version 0.1.2 + +- Update `blocking` to v0.5.0 + +# Version 0.1.1 + +- Reduce the number of dependencies + +# Version 0.1.0 + +- Initial version diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..b859dc7 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,48 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2018" +rust-version = "1.47" +name = "async-net" +version = "1.7.0" +authors = ["Stjepan Glavina <stjepang@gmail.com>"] +exclude = ["/.*"] +description = "Async networking primitives for TCP/UDP/Unix communication" +homepage = "https://github.com/smol-rs/async-net" +documentation = "https://docs.rs/async-net" +readme = "README.md" +keywords = [ + "networking", + "uds", + "mio", + "reactor", + "std", +] +categories = [ + "asynchronous", + "network-programming", + "os", +] +license = "Apache-2.0 OR MIT" +repository = "https://github.com/smol-rs/async-net" + +[dependencies.async-io] +version = "1.6.0" + +[dependencies.blocking] +version = "1.0.0" + +[dependencies.futures-lite] +version = "1.11.0" + +[build-dependencies.autocfg] +version = "1" diff --git a/Cargo.toml.orig b/Cargo.toml.orig new file mode 100644 index 0000000..b435df6 --- /dev/null +++ b/Cargo.toml.orig @@ -0,0 +1,25 @@ +[package] +name = "async-net" +# When publishing a new version: +# - Update CHANGELOG.md +# - Create "v1.x.y" git tag +version = "1.7.0" +authors = ["Stjepan Glavina <stjepang@gmail.com>"] +edition = "2018" +rust-version = "1.47" +description = "Async networking primitives for TCP/UDP/Unix communication" +license = "Apache-2.0 OR MIT" +repository = "https://github.com/smol-rs/async-net" +homepage = "https://github.com/smol-rs/async-net" +documentation = "https://docs.rs/async-net" +keywords = ["networking", "uds", "mio", "reactor", "std"] +categories = ["asynchronous", "network-programming", "os"] +exclude = ["/.*"] + +[dependencies] +async-io = "1.6.0" +blocking = "1.0.0" +futures-lite = "1.11.0" + +[build-dependencies] +autocfg = "1" diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..16fe87b --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..31aa793 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,23 @@ +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..06af360 --- /dev/null +++ b/README.md @@ -0,0 +1,55 @@ +# async-net + +[![Build](https://github.com/smol-rs/async-net/workflows/Build%20and%20test/badge.svg)]( +https://github.com/smol-rs/async-net/actions) +[![License](https://img.shields.io/badge/license-Apache--2.0_OR_MIT-blue.svg)]( +https://github.com/smol-rs/async-net) +[![Cargo](https://img.shields.io/crates/v/async-net.svg)]( +https://crates.io/crates/async-net) +[![Documentation](https://docs.rs/async-net/badge.svg)]( +https://docs.rs/async-net) + +Async networking primitives for TCP/UDP/Unix communication. + +This crate is an async version of [`std::net`] and [`std::os::unix::net`]. + +[`std::net`]: https://doc.rust-lang.org/std/net/index.html +[`std::os::unix::net`]: https://doc.rust-lang.org/std/os/unix/net/index.html + +## Implementation + +This crate uses [`async-io`] for async I/O and [`blocking`] for DNS lookups. + +[`async-io`]: https://docs.rs/async-io +[`blocking`]: https://docs.rs/blocking + +## Examples + +A simple UDP server that echoes messages back to the sender: + +```rust +use async_net::UdpSocket; + +let socket = UdpSocket::bind("127.0.0.1:8080").await?; +let mut buf = vec![0u8; 1024]; + +loop { + let (n, addr) = socket.recv_from(&mut buf).await?; + socket.send_to(&buf[..n], &addr).await?; +} +``` + +## License + +Licensed under either of + + * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +#### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be +dual licensed as above, without any additional terms or conditions. diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..db1e913 --- /dev/null +++ b/build.rs @@ -0,0 +1,16 @@ +fn main() { + let cfg = match autocfg::AutoCfg::new() { + Ok(cfg) => cfg, + Err(e) => { + println!( + "cargo-warning=async-net: failed to detect compiler features: {}", + e + ); + return; + } + }; + + if !cfg.probe_rustc_version(1, 63) { + autocfg::emit("async_net_no_io_safety"); + } +} diff --git a/src/addr.rs b/src/addr.rs new file mode 100644 index 0000000..d210916 --- /dev/null +++ b/src/addr.rs @@ -0,0 +1,212 @@ +use std::fmt; +use std::future::Future; +use std::io; +use std::mem; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use blocking::unblock; +use futures_lite::future; + +/// Converts or resolves addresses to [`SocketAddr`] values. +/// +/// This trait currently only appears in function signatures and cannot be used directly. +/// +/// However, you can use the [`resolve()`][`super::resolve()`] function to resolve addresses. +pub trait AsyncToSocketAddrs: Sealed {} + +pub trait Sealed { + /// Returned iterator over socket addresses which this type may correspond to. + type Iter: Iterator<Item = SocketAddr> + Unpin; + + /// Converts this object to an iterator of resolved `SocketAddr`s. + /// + /// The returned iterator may not actually yield any values depending on the outcome of any + /// resolution performed. + /// + /// Note that this function may block a backend thread while resolution is performed. + fn to_socket_addrs(&self) -> ToSocketAddrsFuture<Self::Iter>; +} + +pub enum ToSocketAddrsFuture<I> { + Resolving(future::Boxed<io::Result<I>>), + Ready(io::Result<I>), + Done, +} + +impl<I> fmt::Debug for ToSocketAddrsFuture<I> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "ToSocketAddrsFuture") + } +} + +impl<I: Iterator<Item = SocketAddr> + Unpin> Future for ToSocketAddrsFuture<I> { + type Output = io::Result<I>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let state = mem::replace(&mut *self, ToSocketAddrsFuture::Done); + + match state { + ToSocketAddrsFuture::Resolving(mut task) => { + let poll = Pin::new(&mut task).poll(cx); + if poll.is_pending() { + *self = ToSocketAddrsFuture::Resolving(task); + } + poll + } + ToSocketAddrsFuture::Ready(res) => Poll::Ready(res), + ToSocketAddrsFuture::Done => panic!("polled a completed future"), + } + } +} + +impl AsyncToSocketAddrs for SocketAddr {} + +impl Sealed for SocketAddr { + type Iter = std::option::IntoIter<SocketAddr>; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture<Self::Iter> { + ToSocketAddrsFuture::Ready(Ok(Some(*self).into_iter())) + } +} + +impl AsyncToSocketAddrs for SocketAddrV4 {} + +impl Sealed for SocketAddrV4 { + type Iter = std::option::IntoIter<SocketAddr>; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture<Self::Iter> { + Sealed::to_socket_addrs(&SocketAddr::V4(*self)) + } +} + +impl AsyncToSocketAddrs for SocketAddrV6 {} + +impl Sealed for SocketAddrV6 { + type Iter = std::option::IntoIter<SocketAddr>; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture<Self::Iter> { + Sealed::to_socket_addrs(&SocketAddr::V6(*self)) + } +} + +impl AsyncToSocketAddrs for (IpAddr, u16) {} + +impl Sealed for (IpAddr, u16) { + type Iter = std::option::IntoIter<SocketAddr>; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture<Self::Iter> { + let (ip, port) = *self; + match ip { + IpAddr::V4(a) => Sealed::to_socket_addrs(&(a, port)), + IpAddr::V6(a) => Sealed::to_socket_addrs(&(a, port)), + } + } +} + +impl AsyncToSocketAddrs for (Ipv4Addr, u16) {} + +impl Sealed for (Ipv4Addr, u16) { + type Iter = std::option::IntoIter<SocketAddr>; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture<Self::Iter> { + let (ip, port) = *self; + Sealed::to_socket_addrs(&SocketAddrV4::new(ip, port)) + } +} + +impl AsyncToSocketAddrs for (Ipv6Addr, u16) {} + +impl Sealed for (Ipv6Addr, u16) { + type Iter = std::option::IntoIter<SocketAddr>; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture<Self::Iter> { + let (ip, port) = *self; + Sealed::to_socket_addrs(&SocketAddrV6::new(ip, port, 0, 0)) + } +} + +impl AsyncToSocketAddrs for (&str, u16) {} + +impl Sealed for (&str, u16) { + type Iter = std::vec::IntoIter<SocketAddr>; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture<Self::Iter> { + let (host, port) = *self; + + if let Ok(addr) = host.parse::<Ipv4Addr>() { + let addr = SocketAddrV4::new(addr, port); + return ToSocketAddrsFuture::Ready(Ok(vec![SocketAddr::V4(addr)].into_iter())); + } + + if let Ok(addr) = host.parse::<Ipv6Addr>() { + let addr = SocketAddrV6::new(addr, port, 0, 0); + return ToSocketAddrsFuture::Ready(Ok(vec![SocketAddr::V6(addr)].into_iter())); + } + + let host = host.to_string(); + let future = unblock(move || { + let addr = (host.as_str(), port); + ToSocketAddrs::to_socket_addrs(&addr) + }); + ToSocketAddrsFuture::Resolving(Box::pin(future)) + } +} + +impl AsyncToSocketAddrs for (String, u16) {} + +impl Sealed for (String, u16) { + type Iter = std::vec::IntoIter<SocketAddr>; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture<Self::Iter> { + Sealed::to_socket_addrs(&(&*self.0, self.1)) + } +} + +impl AsyncToSocketAddrs for str {} + +impl Sealed for str { + type Iter = std::vec::IntoIter<SocketAddr>; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture<Self::Iter> { + if let Ok(addr) = self.parse() { + return ToSocketAddrsFuture::Ready(Ok(vec![addr].into_iter())); + } + + let addr = self.to_string(); + let future = unblock(move || std::net::ToSocketAddrs::to_socket_addrs(addr.as_str())); + ToSocketAddrsFuture::Resolving(Box::pin(future)) + } +} + +impl AsyncToSocketAddrs for &[SocketAddr] {} + +impl<'a> Sealed for &'a [SocketAddr] { + type Iter = std::iter::Cloned<std::slice::Iter<'a, SocketAddr>>; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture<Self::Iter> { + ToSocketAddrsFuture::Ready(Ok(self.iter().cloned())) + } +} + +impl<T: AsyncToSocketAddrs + ?Sized> AsyncToSocketAddrs for &T {} + +impl<T: Sealed + ?Sized> Sealed for &T { + type Iter = T::Iter; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture<Self::Iter> { + Sealed::to_socket_addrs(&**self) + } +} + +impl AsyncToSocketAddrs for String {} + +impl Sealed for String { + type Iter = std::vec::IntoIter<SocketAddr>; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture<Self::Iter> { + Sealed::to_socket_addrs(&**self) + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..c281ec2 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,65 @@ +//! Async networking primitives for TCP/UDP/Unix communication. +//! +//! This crate is an async version of [`std::net`] and [`std::os::unix::net`]. +//! +//! # Implementation +//! +//! This crate uses [`async-io`] for async I/O and [`blocking`] for DNS lookups. +//! +//! [`async-io`]: https://docs.rs/async-io +//! [`blocking`]: https://docs.rs/blocking +//! +//! # Examples +//! +//! A simple UDP server that echoes messages back to the sender: +//! +//! ```no_run +//! use async_net::UdpSocket; +//! +//! # futures_lite::future::block_on(async { +//! let socket = UdpSocket::bind("127.0.0.1:8080").await?; +//! let mut buf = vec![0u8; 1024]; +//! +//! loop { +//! let (n, addr) = socket.recv_from(&mut buf).await?; +//! socket.send_to(&buf[..n], &addr).await?; +//! } +//! # std::io::Result::Ok(()) }); +//! ``` + +#![forbid(unsafe_code)] +#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] + +#[cfg(unix)] +pub mod unix; + +mod addr; +mod tcp; +mod udp; + +pub use addr::AsyncToSocketAddrs; +pub use tcp::{Incoming, TcpListener, TcpStream}; +pub use udp::UdpSocket; + +use std::io; + +#[doc(no_inline)] +pub use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, Shutdown, SocketAddr, SocketAddrV4, SocketAddrV6}; + +#[doc(no_inline)] +pub use std::net::AddrParseError; + +/// Converts or resolves addresses to [`SocketAddr`] values. +/// +/// # Examples +/// +/// ``` +/// # futures_lite::future::block_on(async { +/// for addr in async_net::resolve("google.com:80").await? { +/// println!("{}", addr); +/// } +/// # std::io::Result::Ok(()) }); +/// ``` +pub async fn resolve<A: AsyncToSocketAddrs>(addr: A) -> io::Result<Vec<SocketAddr>> { + Ok(addr.to_socket_addrs().await?.collect()) +} diff --git a/src/tcp.rs b/src/tcp.rs new file mode 100644 index 0000000..2c1d10d --- /dev/null +++ b/src/tcp.rs @@ -0,0 +1,770 @@ +use std::convert::TryFrom; +use std::fmt; +use std::io::{self, IoSlice, Read as _, Write as _}; +use std::net::{Shutdown, SocketAddr}; +#[cfg(all(not(async_net_no_io_safety), unix))] +use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, RawFd}; +#[cfg(windows)] +use std::os::windows::io::{AsRawSocket, RawSocket}; +#[cfg(all(not(async_net_no_io_safety), windows))] +use std::os::windows::io::{AsSocket, BorrowedSocket, OwnedSocket}; +use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use async_io::Async; +use futures_lite::{prelude::*, ready}; + +use crate::addr::AsyncToSocketAddrs; + +/// A TCP server, listening for connections. +/// +/// After creating a [`TcpListener`] by [`bind`][`TcpListener::bind()`]ing it to an address, it +/// listens for incoming TCP connections. These can be accepted by calling +/// [`accept()`][`TcpListener::accept()`] or by awaiting items from the stream of +/// [`incoming`][`TcpListener::incoming()`] connections. +/// +/// Cloning a [`TcpListener`] creates another handle to the same socket. The socket will be closed +/// when all handles to it are dropped. +/// +/// The Transmission Control Protocol is specified in [IETF RFC 793]. +/// +/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793 +/// +/// # Examples +/// +/// ```no_run +/// use async_net::TcpListener; +/// use futures_lite::prelude::*; +/// +/// # futures_lite::future::block_on(async { +/// let listener = TcpListener::bind("127.0.0.1:8080").await?; +/// let mut incoming = listener.incoming(); +/// +/// while let Some(stream) = incoming.next().await { +/// let mut stream = stream?; +/// stream.write_all(b"hello").await?; +/// } +/// # std::io::Result::Ok(()) }); +/// ``` +#[derive(Clone, Debug)] +pub struct TcpListener { + inner: Arc<Async<std::net::TcpListener>>, +} + +impl TcpListener { + fn new(inner: Arc<Async<std::net::TcpListener>>) -> TcpListener { + TcpListener { inner } + } + + /// Creates a new [`TcpListener`] bound to the given address. + /// + /// Binding with a port number of 0 will request that the operating system assigns an available + /// port to this listener. The assigned port can be queried via the + /// [`local_addr()`][`TcpListener::local_addr()`] method. + /// + /// If `addr` yields multiple addresses, binding will be attempted with each of the addresses + /// until one succeeds and returns the listener. If none of the addresses succeed in creating a + /// listener, the error from the last attempt is returned. + /// + /// # Examples + /// + /// Create a TCP listener bound to `127.0.0.1:80`: + /// + /// ```no_run + /// use async_net::TcpListener; + /// + /// # futures_lite::future::block_on(async { + /// let listener = TcpListener::bind("127.0.0.1:80").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + /// + /// Create a TCP listener bound to `127.0.0.1:80`. If that address is unavailable, then try + /// binding to `127.0.0.1:443`: + /// + /// ```no_run + /// use async_net::{SocketAddr, TcpListener}; + /// + /// # futures_lite::future::block_on(async { + /// let addrs = [ + /// SocketAddr::from(([127, 0, 0, 1], 80)), + /// SocketAddr::from(([127, 0, 0, 1], 443)), + /// ]; + /// let listener = TcpListener::bind(&addrs[..]).await.unwrap(); + /// # std::io::Result::Ok(()) }); + pub async fn bind<A: AsyncToSocketAddrs>(addr: A) -> io::Result<TcpListener> { + let mut last_err = None; + + for addr in addr.to_socket_addrs().await? { + match Async::<std::net::TcpListener>::bind(addr) { + Ok(listener) => return Ok(TcpListener::new(Arc::new(listener))), + Err(err) => last_err = Some(err), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any of the addresses", + ) + })) + } + + /// Returns the local address this listener is bound to. + /// + /// # Examples + /// + /// Bind to port 0 and then see which port was assigned by the operating system: + /// + /// ```no_run + /// use async_net::{SocketAddr, TcpListener}; + /// + /// # futures_lite::future::block_on(async { + /// let listener = TcpListener::bind("127.0.0.1:0").await?; + /// println!("Listening on {}", listener.local_addr()?); + /// # std::io::Result::Ok(()) }); + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.get_ref().local_addr() + } + + /// Accepts a new incoming connection. + /// + /// Returns a TCP stream and the address it is connected to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpListener; + /// + /// # futures_lite::future::block_on(async { + /// let listener = TcpListener::bind("127.0.0.1:8080").await?; + /// let (stream, addr) = listener.accept().await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { + let (stream, addr) = self.inner.accept().await?; + Ok((TcpStream::new(Arc::new(stream)), addr)) + } + + /// Returns a stream of incoming connections. + /// + /// Iterating over this stream is equivalent to calling [`accept()`][`TcpListener::accept()`] + /// in a loop. The stream of connections is infinite, i.e awaiting the next connection will + /// never result in [`None`]. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpListener; + /// use futures_lite::prelude::*; + /// + /// # futures_lite::future::block_on(async { + /// let listener = TcpListener::bind("127.0.0.1:0").await?; + /// let mut incoming = listener.incoming(); + /// + /// while let Some(stream) = incoming.next().await { + /// let mut stream = stream?; + /// stream.write_all(b"hello").await?; + /// } + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn incoming(&self) -> Incoming<'_> { + Incoming { + incoming: Box::pin(self.inner.incoming()), + } + } + + /// Gets the value of the `IP_TTL` option for this socket. + /// + /// This option configures the time-to-live field that is used in every packet sent from this + /// socket. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpListener; + /// + /// # futures_lite::future::block_on(async { + /// let listener = TcpListener::bind("127.0.0.1:80").await?; + /// listener.set_ttl(100)?; + /// assert_eq!(listener.ttl()?, 100); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn ttl(&self) -> io::Result<u32> { + self.inner.get_ref().ttl() + } + + /// Sets the value of the `IP_TTL` option for this socket. + /// + /// This option configures the time-to-live field that is used in every packet sent from this + /// socket. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpListener; + /// + /// # futures_lite::future::block_on(async { + /// let listener = TcpListener::bind("127.0.0.1:80").await?; + /// listener.set_ttl(100)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.inner.get_ref().set_ttl(ttl) + } +} + +impl From<Async<std::net::TcpListener>> for TcpListener { + fn from(listener: Async<std::net::TcpListener>) -> TcpListener { + TcpListener::new(Arc::new(listener)) + } +} + +impl TryFrom<std::net::TcpListener> for TcpListener { + type Error = io::Error; + + fn try_from(listener: std::net::TcpListener) -> io::Result<TcpListener> { + Ok(TcpListener::new(Arc::new(Async::new(listener)?))) + } +} + +impl From<TcpListener> for Arc<Async<std::net::TcpListener>> { + fn from(val: TcpListener) -> Self { + val.inner + } +} + +#[cfg(unix)] +impl AsRawFd for TcpListener { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +#[cfg(all(not(async_net_no_io_safety), unix))] +impl AsFd for TcpListener { + fn as_fd(&self) -> BorrowedFd<'_> { + self.inner.get_ref().as_fd() + } +} + +#[cfg(all(not(async_net_no_io_safety), unix))] +impl TryFrom<OwnedFd> for TcpListener { + type Error = io::Error; + + fn try_from(value: OwnedFd) -> Result<Self, Self::Error> { + Self::try_from(std::net::TcpListener::from(value)) + } +} + +#[cfg(windows)] +impl AsRawSocket for TcpListener { + fn as_raw_socket(&self) -> RawSocket { + self.inner.as_raw_socket() + } +} + +#[cfg(all(not(async_net_no_io_safety), windows))] +impl AsSocket for TcpListener { + fn as_socket(&self) -> BorrowedSocket<'_> { + self.inner.get_ref().as_socket() + } +} + +#[cfg(all(not(async_net_no_io_safety), windows))] +impl TryFrom<OwnedSocket> for TcpListener { + type Error = io::Error; + + fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> { + Self::try_from(std::net::TcpListener::from(value)) + } +} + +/// A stream of incoming TCP connections. +/// +/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is +/// created by the [`TcpListener::incoming()`] method. +pub struct Incoming<'a> { + incoming: + Pin<Box<dyn Stream<Item = io::Result<Async<std::net::TcpStream>>> + Send + Sync + 'a>>, +} + +impl Stream for Incoming<'_> { + type Item = io::Result<TcpStream>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let res = ready!(Pin::new(&mut self.incoming).poll_next(cx)); + Poll::Ready(res.map(|res| res.map(|stream| TcpStream::new(Arc::new(stream))))) + } +} + +impl fmt::Debug for Incoming<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Incoming {{ ... }}") + } +} + +/// A TCP connection. +/// +/// A [`TcpStream`] can be created by [`connect`][`TcpStream::connect()`]ing to an endpoint or by +/// [`accept`][`TcpListener::accept()`]ing an incoming connection. +/// +/// [`TcpStream`] is a bidirectional stream that implements traits [`AsyncRead`] and +/// [`AsyncWrite`]. +/// +/// Cloning a [`TcpStream`] creates another handle to the same socket. The socket will be closed +/// when all handles to it are dropped. The reading and writing portions of the connection can also +/// be shut down individually with the [`shutdown()`][`TcpStream::shutdown()`] method. +/// +/// The Transmission Control Protocol is specified in [IETF RFC 793]. +/// +/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793 +/// +/// # Examples +/// +/// ```no_run +/// use async_net::TcpStream; +/// use futures_lite::prelude::*; +/// +/// # futures_lite::future::block_on(async { +/// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; +/// stream.write_all(b"hello").await?; +/// +/// let mut buf = vec![0u8; 1024]; +/// let n = stream.read(&mut buf).await?; +/// # std::io::Result::Ok(()) }); +/// ``` +pub struct TcpStream { + inner: Arc<Async<std::net::TcpStream>>, + readable: Option<async_io::ReadableOwned<std::net::TcpStream>>, + writable: Option<async_io::WritableOwned<std::net::TcpStream>>, +} + +impl UnwindSafe for TcpStream {} +impl RefUnwindSafe for TcpStream {} + +impl TcpStream { + fn new(inner: Arc<Async<std::net::TcpStream>>) -> TcpStream { + TcpStream { + inner, + readable: None, + writable: None, + } + } + + /// Creates a TCP connection to the specified address. + /// + /// This method will create a new TCP socket and attempt to connect it to the provided `addr`, + /// + /// If `addr` yields multiple addresses, connecting will be attempted with each of the + /// addresses until connecting to one succeeds. If none of the addresses result in a successful + /// connection, the error from the last connect attempt is returned. + /// + /// # Examples + /// + /// Connect to `example.com:80`: + /// + /// ``` + /// use async_net::TcpStream; + /// + /// # futures_lite::future::block_on(async { + /// let stream = TcpStream::connect("example.com:80").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + /// + /// Connect to `127.0.0.1:8080`. If that fails, then try connecting to `127.0.0.1:8081`: + /// + /// ```no_run + /// use async_net::{SocketAddr, TcpStream}; + /// + /// # futures_lite::future::block_on(async { + /// let addrs = [ + /// SocketAddr::from(([127, 0, 0, 1], 8080)), + /// SocketAddr::from(([127, 0, 0, 1], 8081)), + /// ]; + /// let stream = TcpStream::connect(&addrs[..]).await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn connect<A: AsyncToSocketAddrs>(addr: A) -> io::Result<TcpStream> { + let mut last_err = None; + + for addr in addr.to_socket_addrs().await? { + match Async::<std::net::TcpStream>::connect(addr).await { + Ok(stream) => return Ok(TcpStream::new(Arc::new(stream))), + Err(e) => last_err = Some(e), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not connect to any of the addresses", + ) + })) + } + + /// Returns the local address this stream is bound to. + /// + /// # Examples + /// + /// ``` + /// use async_net::TcpStream; + /// + /// # futures_lite::future::block_on(async { + /// let stream = TcpStream::connect("example.com:80").await?; + /// println!("Local address is {}", stream.local_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.get_ref().local_addr() + } + + /// Returns the remote address this stream is connected to. + /// + /// # Examples + /// + /// ``` + /// use async_net::TcpStream; + /// + /// # futures_lite::future::block_on(async { + /// let stream = TcpStream::connect("example.com:80").await?; + /// println!("Connected to {}", stream.peer_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.inner.get_ref().peer_addr() + } + + /// Shuts down the read half, write half, or both halves of this connection. + /// + /// This method will cause all pending and future I/O in the given directions to return + /// immediately with an appropriate value (see the documentation of [`Shutdown`]). + /// + /// [`Shutdown`]: https://doc.rust-lang.org/std/net/enum.Shutdown.html + /// + /// # Examples + /// + /// ```no_run + /// use async_net::{Shutdown, TcpStream}; + /// + /// # futures_lite::future::block_on(async { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// stream.shutdown(Shutdown::Both)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> { + self.inner.get_ref().shutdown(how) + } + + /// Receives data without removing it from the queue. + /// + /// On success, returns the number of bytes peeked. + /// + /// Successive calls return the same data. This is accomplished by passing `MSG_PEEK` as a flag + /// to the underlying `recv` system call. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpStream; + /// + /// # futures_lite::future::block_on(async { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// let mut buf = vec![0; 1024]; + /// let n = stream.peek(&mut buf).await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { + self.inner.peek(buf).await + } + + /// Gets the value of the `TCP_NODELAY` option for this socket. + /// + /// If set to `true`, this option disables the [Nagle algorithm][nagle-wiki]. This means that + /// written data is always sent as soon as possible, even if there is only a small amount of + /// it. + /// + /// When set to `false`, written data is buffered until there is a certain amount to send out, + /// thereby avoiding the frequent sending of small packets. + /// + /// [nagle-wiki]: https://en.wikipedia.org/wiki/Nagle%27s_algorithm + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpStream; + /// + /// # futures_lite::future::block_on(async { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// println!("TCP_NODELAY is set to {}", stream.nodelay()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn nodelay(&self) -> io::Result<bool> { + self.inner.get_ref().nodelay() + } + + /// Sets the value of the `TCP_NODELAY` option for this socket. + /// + /// If set to `true`, this option disables the [Nagle algorithm][nagle-wiki]. This means that + /// written data is always sent as soon as possible, even if there is only a small amount of + /// it. + /// + /// When set to `false`, written data is buffered until there is a certain amount to send out, + /// thereby avoiding the frequent sending of small packets. + /// + /// [nagle-wiki]: https://en.wikipedia.org/wiki/Nagle%27s_algorithm + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpStream; + /// + /// # futures_lite::future::block_on(async { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// stream.set_nodelay(false)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { + self.inner.get_ref().set_nodelay(nodelay) + } + + /// Gets the value of the `IP_TTL` option for this socket. + /// + /// This option configures the time-to-live field that is used in every packet sent from this + /// socket. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpStream; + /// + /// # futures_lite::future::block_on(async { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// println!("IP_TTL is set to {}", stream.ttl()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn ttl(&self) -> io::Result<u32> { + self.inner.get_ref().ttl() + } + + /// Sets the value of the `IP_TTL` option for this socket. + /// + /// This option configures the time-to-live field that is used in every packet sent from this + /// socket. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpStream; + /// + /// # futures_lite::future::block_on(async { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// stream.set_ttl(100)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.inner.get_ref().set_ttl(ttl) + } +} + +impl fmt::Debug for TcpStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.inner.fmt(f) + } +} + +impl Clone for TcpStream { + fn clone(&self) -> TcpStream { + TcpStream::new(self.inner.clone()) + } +} + +impl From<Async<std::net::TcpStream>> for TcpStream { + fn from(stream: Async<std::net::TcpStream>) -> TcpStream { + TcpStream::new(Arc::new(stream)) + } +} + +impl From<TcpStream> for Arc<Async<std::net::TcpStream>> { + fn from(val: TcpStream) -> Self { + val.inner + } +} + +impl TryFrom<std::net::TcpStream> for TcpStream { + type Error = io::Error; + + fn try_from(stream: std::net::TcpStream) -> io::Result<TcpStream> { + Ok(TcpStream::new(Arc::new(Async::new(stream)?))) + } +} + +#[cfg(unix)] +impl AsRawFd for TcpStream { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +#[cfg(all(not(async_net_no_io_safety), unix))] +impl AsFd for TcpStream { + fn as_fd(&self) -> BorrowedFd<'_> { + self.inner.get_ref().as_fd() + } +} + +#[cfg(all(not(async_net_no_io_safety), unix))] +impl TryFrom<OwnedFd> for TcpStream { + type Error = io::Error; + + fn try_from(value: OwnedFd) -> Result<Self, Self::Error> { + Self::try_from(std::net::TcpStream::from(value)) + } +} + +#[cfg(windows)] +impl AsRawSocket for TcpStream { + fn as_raw_socket(&self) -> RawSocket { + self.inner.as_raw_socket() + } +} + +#[cfg(all(not(async_net_no_io_safety), windows))] +impl AsSocket for TcpStream { + fn as_socket(&self) -> BorrowedSocket<'_> { + self.inner.get_ref().as_socket() + } +} + +#[cfg(all(not(async_net_no_io_safety), windows))] +impl TryFrom<OwnedSocket> for TcpStream { + type Error = io::Error; + + fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> { + Self::try_from(std::net::TcpStream::from(value)) + } +} + +impl AsyncRead for TcpStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + loop { + // Attempt the non-blocking operation. + match self.inner.get_ref().read(buf) { + Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} + res => { + self.readable = None; + return Poll::Ready(res); + } + } + + // Initialize the future to wait for readiness. + if self.readable.is_none() { + self.readable = Some(self.inner.clone().readable_owned()); + } + + // Poll the future for readiness. + if let Some(f) = &mut self.readable { + let res = ready!(Pin::new(f).poll(cx)); + self.readable = None; + res?; + } + } + } +} + +impl AsyncWrite for TcpStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + loop { + // Attempt the non-blocking operation. + match self.inner.get_ref().write(buf) { + Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} + res => { + self.writable = None; + return Poll::Ready(res); + } + } + + // Initialize the future to wait for readiness. + if self.writable.is_none() { + self.writable = Some(self.inner.clone().writable_owned()); + } + + // Poll the future for readiness. + if let Some(f) = &mut self.writable { + let res = ready!(Pin::new(f).poll(cx)); + self.writable = None; + res?; + } + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + loop { + // Attempt the non-blocking operation. + match self.inner.get_ref().flush() { + Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} + res => { + self.writable = None; + return Poll::Ready(res); + } + } + + // Initialize the future to wait for readiness. + if self.writable.is_none() { + self.writable = Some(self.inner.clone().writable_owned()); + } + + // Poll the future for readiness. + if let Some(f) = &mut self.writable { + let res = ready!(Pin::new(f).poll(cx)); + self.writable = None; + res?; + } + } + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(self.inner.get_ref().shutdown(Shutdown::Write)) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + loop { + // Attempt the non-blocking operation. + match self.inner.get_ref().write_vectored(bufs) { + Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} + res => { + self.writable = None; + return Poll::Ready(res); + } + } + + // Initialize the future to wait for readiness. + if self.writable.is_none() { + self.writable = Some(self.inner.clone().writable_owned()); + } + + // Poll the future for readiness. + if let Some(f) = &mut self.writable { + let res = ready!(Pin::new(f).poll(cx)); + self.writable = None; + res?; + } + } + } +} diff --git a/src/udp.rs b/src/udp.rs new file mode 100644 index 0000000..af14c47 --- /dev/null +++ b/src/udp.rs @@ -0,0 +1,667 @@ +use std::convert::TryFrom; +use std::io; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; +#[cfg(all(not(async_net_no_io_safety), unix))] +use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, RawFd}; +#[cfg(windows)] +use std::os::windows::io::{AsRawSocket, RawSocket}; +#[cfg(all(not(async_net_no_io_safety), windows))] +use std::os::windows::io::{AsSocket, BorrowedSocket, OwnedSocket}; +use std::sync::Arc; + +use async_io::Async; + +use crate::addr::AsyncToSocketAddrs; + +/// A UDP socket. +/// +/// After creating a [`UdpSocket`] by [`bind`][`UdpSocket::bind()`]ing it to a socket address, data +/// can be [sent to] and [received from] any other socket address. +/// +/// Cloning a [`UdpSocket`] creates another handle to the same socket. The socket will be closed +/// when all handles to it are dropped. +/// +/// Although UDP is a connectionless protocol, this implementation provides an interface to set an +/// address where data should be sent and received from. After setting a remote address with +/// [`connect()`][`UdpSocket::connect()`], data can be sent to and received from that address with +/// [`send()`][`UdpSocket::send()`] and [`recv()`][`UdpSocket::recv()`]. +/// +/// As stated in the User Datagram Protocol's specification in [IETF RFC 768], UDP is an unordered, +/// unreliable protocol. Refer to [`TcpListener`][`super::TcpListener`] and +/// [`TcpStream`][`super::TcpStream`] for TCP primitives. +/// +/// [received from]: UdpSocket::recv_from() +/// [sent to]: UdpSocket::send_to() +/// [IETF RFC 768]: https://tools.ietf.org/html/rfc768 +/// +/// # Examples +/// +/// ```no_run +/// use async_net::UdpSocket; +/// +/// # futures_lite::future::block_on(async { +/// let socket = UdpSocket::bind("127.0.0.1:8080").await?; +/// let mut buf = vec![0u8; 20]; +/// +/// loop { +/// // Receive a single datagram message. +/// // If `buf` is too small to hold the entire message, it will be cut off. +/// let (n, addr) = socket.recv_from(&mut buf).await?; +/// +/// // Send the message back to the same address that has sent it. +/// socket.send_to(&buf[..n], &addr).await?; +/// } +/// # std::io::Result::Ok(()) }); +/// ``` +#[derive(Clone, Debug)] +pub struct UdpSocket { + inner: Arc<Async<std::net::UdpSocket>>, +} + +impl UdpSocket { + fn new(inner: Arc<Async<std::net::UdpSocket>>) -> UdpSocket { + UdpSocket { inner } + } + + /// Creates a new [`UdpSocket`] bound to the given address. + /// + /// Binding with a port number of 0 will request that the operating system assigns an available + /// port to this socket. The assigned port can be queried via the + /// [`local_addr()`][`UdpSocket::local_addr()`] method. + /// + /// If `addr` yields multiple addresses, binding will be attempted with each of the addresses + /// until one succeeds and returns the socket. If none of the addresses succeed in creating a + /// socket, the error from the last attempt is returned. + /// + /// # Examples + /// + /// Create a UDP socket bound to `127.0.0.1:3400`: + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:3400").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + /// + /// Create a UDP socket bound to `127.0.0.1:3400`. If that address is unavailable, then try + /// binding to `127.0.0.1:3401`: + /// + /// ```no_run + /// use async_net::{SocketAddr, UdpSocket}; + /// + /// # futures_lite::future::block_on(async { + /// let addrs = [ + /// SocketAddr::from(([127, 0, 0, 1], 3400)), + /// SocketAddr::from(([127, 0, 0, 1], 3401)), + /// ]; + /// let socket = UdpSocket::bind(&addrs[..]).await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn bind<A: AsyncToSocketAddrs>(addr: A) -> io::Result<UdpSocket> { + let mut last_err = None; + + for addr in addr.to_socket_addrs().await? { + match Async::<std::net::UdpSocket>::bind(addr) { + Ok(socket) => return Ok(UdpSocket::new(Arc::new(socket))), + Err(err) => last_err = Some(err), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not bind to any of the addresses", + ) + })) + } + + /// Returns the local address this socket is bound to. + /// + /// This can be useful, for example, when binding to port 0 to figure out which port was + /// actually bound. + /// + /// # Examples + /// + /// Bind to port 0 and then see which port was assigned by the operating system: + /// + /// ```no_run + /// use async_net::{SocketAddr, UdpSocket}; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:0").await?; + /// println!("Bound to {}", socket.local_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.get_ref().local_addr() + } + + /// Returns the remote address this socket is connected to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.connect("192.168.0.1:41203").await?; + /// println!("Connected to {}", socket.peer_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.inner.get_ref().peer_addr() + } + + /// Connects the UDP socket to an address. + /// + /// When connected, methods [`send()`][`UdpSocket::send()`] and [`recv()`][`UdpSocket::recv()`] + /// will use the specified address for sending and receiving messages. Additionally, a filter + /// will be applied to [`recv_from()`][`UdpSocket::recv_from()`] so that it only receives + /// messages from that same address. + /// + /// If `addr` yields multiple addresses, connecting will be attempted with each of the + /// addresses until the operating system accepts one. If none of the addresses are accepted, + /// the error from the last attempt is returned. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:3400").await?; + /// socket.connect("127.0.0.1:8080").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn connect<A: AsyncToSocketAddrs>(&self, addr: A) -> io::Result<()> { + let mut last_err = None; + + for addr in addr.to_socket_addrs().await? { + match self.inner.get_ref().connect(addr) { + Ok(()) => return Ok(()), + Err(err) => last_err = Some(err), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not connect to any of the addresses", + ) + })) + } + + /// Receives a single datagram message. + /// + /// On success, returns the number of bytes received and the address message came from. + /// + /// This method must be called with a valid byte buffer of sufficient size to hold a message. + /// If the received message is too long to fit into the buffer, it may be truncated. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// + /// let mut buf = vec![0u8; 1024]; + /// let (n, addr) = socket.recv_from(&mut buf).await?; + /// println!("Received {} bytes from {}", n, addr); + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.inner.recv_from(buf).await + } + + /// Receives a single datagram message without removing it from the queue. + /// + /// On success, returns the number of bytes peeked and the address message came from. + /// + /// This method must be called with a valid byte buffer of sufficient size to hold a message. + /// If the received message is too long to fit into the buffer, it may be truncated. + /// + /// Successive calls return the same message. This is accomplished by passing `MSG_PEEK` as a + /// flag to the underlying `recvfrom` system call. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// + /// let mut buf = vec![0u8; 1024]; + /// let (n, addr) = socket.peek_from(&mut buf).await?; + /// println!("Peeked {} bytes from {}", n, addr); + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.inner.get_ref().peek_from(buf) + } + + /// Sends data to the given address. + /// + /// On success, returns the number of bytes sent. + /// + /// If `addr` yields multiple addresses, the message will only be sent to the first address. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.send_to(b"hello", "127.0.0.1:4242").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn send_to<A: AsyncToSocketAddrs>(&self, buf: &[u8], addr: A) -> io::Result<usize> { + let addr = match addr.to_socket_addrs().await?.next() { + Some(addr) => addr, + None => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "no addresses to send data to", + )) + } + }; + + self.inner.send_to(buf, addr).await + } + + /// Receives a single datagram message from the connected address. + /// + /// On success, returns the number of bytes received. + /// + /// This method must be called with a valid byte buffer of sufficient size to hold a message. + /// If the received message is too long to fit into the buffer, it may be truncated. + /// + /// The [`connect()`][`UdpSocket::connect()`] method connects this socket to an address. This + /// method will fail if the socket is not connected. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.connect("127.0.0.1:8080").await?; + /// + /// let mut buf = vec![0u8; 1024]; + /// let n = socket.recv(&mut buf).await?; + /// println!("Received {} bytes", n); + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { + self.inner.recv(buf).await + } + + /// Receives a single datagram from the connected address without removing it from the queue. + /// + /// On success, returns the number of bytes peeked. + /// + /// This method must be called with a valid byte buffer of sufficient size to hold a message. + /// If the received message is too long to fit into the buffer, it may be truncated. + /// + /// Successive calls return the same message. This is accomplished by passing `MSG_PEEK` as a + /// flag to the underlying `recv` system call. + /// + /// The [`connect()`][`UdpSocket::connect()`] method connects this socket to an address. This + /// method will fail if the socket is not connected. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.connect("127.0.0.1:8080").await?; + /// + /// let mut buf = vec![0u8; 1024]; + /// let n = socket.peek(&mut buf).await?; + /// println!("Peeked {} bytes", n); + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { + self.inner.peek(buf).await + } + + /// Sends data to the connected address. + /// + /// The [`connect()`][`UdpSocket::connect()`] method connects this socket to an address. This + /// method will fail if the socket is not connected. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.connect("127.0.0.1:8080").await?; + /// socket.send(b"hello").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { + self.inner.send(buf).await + } + + /// Gets the value of the `SO_BROADCAST` option for this socket. + /// + /// If set to `true`, this socket is allowed to send packets to a broadcast address. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// println!("SO_BROADCAST is set to {}", socket.broadcast()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn broadcast(&self) -> io::Result<bool> { + self.inner.get_ref().broadcast() + } + + /// Sets the value of the `SO_BROADCAST` option for this socket. + /// + /// If set to `true`, this socket is allowed to send packets to a broadcast address. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.set_broadcast(true)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_broadcast(&self, broadcast: bool) -> io::Result<()> { + self.inner.get_ref().set_broadcast(broadcast) + } + + /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. + /// + /// If set to `true`, multicast packets will be looped back to the local socket. + /// + /// Note that this option may not have any affect on IPv6 sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// println!("IP_MULTICAST_LOOP is set to {}", socket.multicast_loop_v4()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn multicast_loop_v4(&self) -> io::Result<bool> { + self.inner.get_ref().multicast_loop_v4() + } + + /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. + /// + /// If set to `true`, multicast packets will be looped back to the local socket. + /// + /// Note that this option may not have any affect on IPv6 sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.set_multicast_loop_v4(true)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_multicast_loop_v4(&self, multicast_loop_v4: bool) -> io::Result<()> { + self.inner + .get_ref() + .set_multicast_loop_v4(multicast_loop_v4) + } + + /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. + /// + /// Indicates the time-to-live value of outgoing multicast packets for this socket. The default + /// value is 1, which means that multicast packets don't leave the local network unless + /// explicitly requested. + /// + /// Note that this option may not have any effect on IPv6 sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// println!("IP_MULTICAST_TTL is set to {}", socket.multicast_loop_v4()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn multicast_ttl_v4(&self) -> io::Result<u32> { + self.inner.get_ref().multicast_ttl_v4() + } + + /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. + /// + /// Indicates the time-to-live value of outgoing multicast packets for this socket. The default + /// value is 1, which means that multicast packets don't leave the local network unless + /// explicitly requested. + /// + /// Note that this option may not have any effect on IPv6 sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.set_multicast_ttl_v4(10)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { + self.inner.get_ref().set_multicast_ttl_v4(ttl) + } + + /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. + /// + /// Controls whether this socket sees the multicast packets it sends itself. + /// + /// Note that this option may not have any effect on IPv4 sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// println!("IPV6_MULTICAST_LOOP is set to {}", socket.multicast_loop_v6()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn multicast_loop_v6(&self) -> io::Result<bool> { + self.inner.get_ref().multicast_loop_v6() + } + + /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. + /// + /// Controls whether this socket sees the multicast packets it sends itself. + /// + /// Note that this option may not have any effect on IPv4 sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.set_multicast_loop_v6(true)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_multicast_loop_v6(&self, multicast_loop_v6: bool) -> io::Result<()> { + self.inner + .get_ref() + .set_multicast_loop_v6(multicast_loop_v6) + } + + /// Gets the value of the `IP_TTL` option for this socket. + /// + /// This option configures the time-to-live field that is used in every packet sent from this + /// socket. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// println!("IP_TTL is set to {}", socket.ttl()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn ttl(&self) -> io::Result<u32> { + self.inner.get_ref().ttl() + } + + /// Sets the value of the `IP_TTL` option for this socket. + /// + /// This option configures the time-to-live field that is used in every packet sent from this + /// socket. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.set_ttl(100)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.inner.get_ref().set_ttl(ttl) + } + + /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. + /// + /// This method specifies a new multicast group for this socket to join. Argument `multiaddr` + /// must be a valid multicast address, and `interface` is the address of the local interface + /// with which the system should join the multicast group. If it's equal to `INADDR_ANY` then + /// an appropriate interface is chosen by the system. + pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { + self.inner + .get_ref() + .join_multicast_v4(&multiaddr, &interface) + } + + /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. + /// + /// This method leaves a multicast group. Argument `multiaddr` must be a valid multicast + /// address, and `interface` is the index of the interface to leave. + pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { + self.inner + .get_ref() + .leave_multicast_v4(&multiaddr, &interface) + } + + /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. + /// + /// This method specifies a new multicast group for this socket to join. Argument `multiaddr` + /// must be a valid multicast address, and `interface` is the index of the interface to join + /// (or 0 to indicate any interface). + pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { + self.inner.get_ref().join_multicast_v6(multiaddr, interface) + } + + /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. + /// + /// This method leaves a multicast group. Argument `multiaddr` must be a valid multicast + /// address, and `interface` is the index of the interface to leave. + pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { + self.inner + .get_ref() + .leave_multicast_v6(multiaddr, interface) + } +} + +impl From<Async<std::net::UdpSocket>> for UdpSocket { + fn from(socket: Async<std::net::UdpSocket>) -> UdpSocket { + UdpSocket::new(Arc::new(socket)) + } +} + +impl TryFrom<std::net::UdpSocket> for UdpSocket { + type Error = io::Error; + + fn try_from(socket: std::net::UdpSocket) -> io::Result<UdpSocket> { + Ok(UdpSocket::new(Arc::new(Async::new(socket)?))) + } +} + +impl From<UdpSocket> for Arc<Async<std::net::UdpSocket>> { + fn from(val: UdpSocket) -> Self { + val.inner + } +} + +#[cfg(unix)] +impl AsRawFd for UdpSocket { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +#[cfg(all(not(async_net_no_io_safety), unix))] +impl AsFd for UdpSocket { + fn as_fd(&self) -> BorrowedFd<'_> { + self.inner.get_ref().as_fd() + } +} + +#[cfg(all(not(async_net_no_io_safety), unix))] +impl TryFrom<OwnedFd> for UdpSocket { + type Error = io::Error; + + fn try_from(value: OwnedFd) -> Result<Self, Self::Error> { + Self::try_from(std::net::UdpSocket::from(value)) + } +} + +#[cfg(windows)] +impl AsRawSocket for UdpSocket { + fn as_raw_socket(&self) -> RawSocket { + self.inner.as_raw_socket() + } +} + +#[cfg(all(not(async_net_no_io_safety), windows))] +impl AsSocket for UdpSocket { + fn as_socket(&self) -> BorrowedSocket<'_> { + self.inner.get_ref().as_socket() + } +} + +#[cfg(all(not(async_net_no_io_safety), windows))] +impl TryFrom<OwnedSocket> for UdpSocket { + type Error = io::Error; + + fn try_from(value: OwnedSocket) -> Result<Self, Self::Error> { + Self::try_from(std::net::UdpSocket::from(value)) + } +} diff --git a/src/unix.rs b/src/unix.rs new file mode 100644 index 0000000..3d2c7ba --- /dev/null +++ b/src/unix.rs @@ -0,0 +1,779 @@ +//! Unix domain sockets. +//! +//! This module is an async version of [`std::os::unix::net`]. + +use std::convert::TryFrom; +use std::fmt; +use std::io::{self, Read as _, Write as _}; +use std::net::Shutdown; +#[cfg(not(async_net_no_io_safety))] +use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, RawFd}; +#[cfg(windows)] +use std::os::windows::io::{AsRawSocket, RawSocket}; +use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::path::Path; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +#[doc(no_inline)] +pub use std::os::unix::net::SocketAddr; + +use async_io::Async; +use futures_lite::{prelude::*, ready}; + +/// A Unix server, listening for connections. +/// +/// After creating a [`UnixListener`] by [`bind`][`UnixListener::bind()`]ing it to an address, it +/// listens for incoming connections. These can be accepted by calling +/// [`accept()`][`UnixListener::accept()`] or by awaiting items from the async stream of +/// [`incoming`][`UnixListener::incoming()`] connections. +/// +/// Cloning a [`UnixListener`] creates another handle to the same socket. The socket will be closed +/// when all handles to it are dropped. +/// +/// # Examples +/// +/// ```no_run +/// use async_net::unix::UnixListener; +/// use futures_lite::prelude::*; +/// +/// # futures_lite::future::block_on(async { +/// let listener = UnixListener::bind("/tmp/socket")?; +/// let mut incoming = listener.incoming(); +/// +/// while let Some(stream) = incoming.next().await { +/// let mut stream = stream?; +/// stream.write_all(b"hello").await?; +/// } +/// # std::io::Result::Ok(()) }); +/// ``` +#[derive(Clone, Debug)] +pub struct UnixListener { + inner: Arc<Async<std::os::unix::net::UnixListener>>, +} + +impl UnixListener { + fn new(inner: Arc<Async<std::os::unix::net::UnixListener>>) -> UnixListener { + UnixListener { inner } + } + + /// Creates a new [`UnixListener`] bound to the given path. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixListener; + /// use futures_lite::prelude::*; + /// + /// # futures_lite::future::block_on(async { + /// let listener = UnixListener::bind("/tmp/socket")?; + /// let mut incoming = listener.incoming(); + /// + /// while let Some(stream) = incoming.next().await { + /// let mut stream = stream?; + /// stream.write_all(b"hello").await?; + /// } + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> { + let listener = Async::<std::os::unix::net::UnixListener>::bind(path)?; + Ok(UnixListener::new(Arc::new(listener))) + } + + /// Accepts a new incoming connection. + /// + /// Returns a TCP stream and the address it is connected to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixListener; + /// + /// # futures_lite::future::block_on(async { + /// let listener = UnixListener::bind("/tmp/socket")?; + /// let (stream, addr) = listener.accept().await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { + let (stream, addr) = self.inner.accept().await?; + Ok((UnixStream::new(Arc::new(stream)), addr)) + } + + /// Returns a stream of incoming connections. + /// + /// Iterating over this stream is equivalent to calling [`accept()`][`UnixListener::accept()`] + /// in a loop. The stream of connections is infinite, i.e awaiting the next connection will + /// never result in [`None`]. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixListener; + /// use futures_lite::prelude::*; + /// + /// # futures_lite::future::block_on(async { + /// let listener = UnixListener::bind("/tmp/socket")?; + /// let mut incoming = listener.incoming(); + /// + /// while let Some(stream) = incoming.next().await { + /// let mut stream = stream?; + /// stream.write_all(b"hello").await?; + /// } + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn incoming(&self) -> Incoming<'_> { + Incoming { + incoming: Box::pin(self.inner.incoming()), + } + } + + /// Returns the local address this listener is bound to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixListener; + /// + /// # futures_lite::future::block_on(async { + /// let listener = UnixListener::bind("/tmp/socket")?; + /// println!("Local address is {:?}", listener.local_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.get_ref().local_addr() + } +} + +impl From<Async<std::os::unix::net::UnixListener>> for UnixListener { + fn from(listener: Async<std::os::unix::net::UnixListener>) -> UnixListener { + UnixListener::new(Arc::new(listener)) + } +} + +impl TryFrom<std::os::unix::net::UnixListener> for UnixListener { + type Error = io::Error; + + fn try_from(listener: std::os::unix::net::UnixListener) -> io::Result<UnixListener> { + Ok(UnixListener::new(Arc::new(Async::new(listener)?))) + } +} + +impl From<UnixListener> for Arc<Async<std::os::unix::net::UnixListener>> { + fn from(val: UnixListener) -> Self { + val.inner + } +} + +#[cfg(unix)] +impl AsRawFd for UnixListener { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +#[cfg(all(not(async_net_no_io_safety), unix))] +impl AsFd for UnixListener { + fn as_fd(&self) -> BorrowedFd<'_> { + self.inner.get_ref().as_fd() + } +} + +#[cfg(all(not(async_net_no_io_safety), unix))] +impl TryFrom<OwnedFd> for UnixListener { + type Error = io::Error; + + fn try_from(value: OwnedFd) -> Result<Self, Self::Error> { + Self::try_from(std::os::unix::net::UnixListener::from(value)) + } +} + +#[cfg(windows)] +impl AsRawSocket for UnixListener { + fn as_raw_socket(&self) -> RawSocket { + self.inner.as_raw_socket() + } +} + +/// A stream of incoming Unix connections. +/// +/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is +/// created by the [`UnixListener::incoming()`] method. +pub struct Incoming<'a> { + incoming: Pin< + Box< + dyn Stream<Item = io::Result<Async<std::os::unix::net::UnixStream>>> + Send + Sync + 'a, + >, + >, +} + +impl Stream for Incoming<'_> { + type Item = io::Result<UnixStream>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let res = ready!(Pin::new(&mut self.incoming).poll_next(cx)); + Poll::Ready(res.map(|res| res.map(|stream| UnixStream::new(Arc::new(stream))))) + } +} + +impl fmt::Debug for Incoming<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Incoming {{ ... }}") + } +} + +/// A Unix connection. +/// +/// A [`UnixStream`] can be created by [`connect`][`UnixStream::connect()`]ing to an endpoint or by +/// [`accept`][`UnixListener::accept()`]ing an incoming connection. +/// +/// [`UnixStream`] is a bidirectional stream that implements traits [`AsyncRead`] and +/// [`AsyncWrite`]. +/// +/// Cloning a [`UnixStream`] creates another handle to the same socket. The socket will be closed +/// when all handles to it are dropped. The reading and writing portions of the connection can also +/// be shut down individually with the [`shutdown()`][`UnixStream::shutdown()`] method. +/// +/// # Examples +/// +/// ```no_run +/// use async_net::unix::UnixStream; +/// use futures_lite::prelude::*; +/// +/// # futures_lite::future::block_on(async { +/// let mut stream = UnixStream::connect("/tmp/socket").await?; +/// stream.write_all(b"hello").await?; +/// +/// let mut buf = vec![0u8; 1024]; +/// let n = stream.read(&mut buf).await?; +/// # std::io::Result::Ok(()) }); +/// ``` +pub struct UnixStream { + inner: Arc<Async<std::os::unix::net::UnixStream>>, + readable: Option<async_io::ReadableOwned<std::os::unix::net::UnixStream>>, + writable: Option<async_io::WritableOwned<std::os::unix::net::UnixStream>>, +} + +impl UnwindSafe for UnixStream {} +impl RefUnwindSafe for UnixStream {} + +impl UnixStream { + fn new(inner: Arc<Async<std::os::unix::net::UnixStream>>) -> UnixStream { + UnixStream { + inner, + readable: None, + writable: None, + } + } + + /// Creates a Unix connection to given path. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixStream; + /// + /// # futures_lite::future::block_on(async { + /// let stream = UnixStream::connect("/tmp/socket").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> { + let stream = Async::<std::os::unix::net::UnixStream>::connect(path).await?; + Ok(UnixStream::new(Arc::new(stream))) + } + + /// Creates a pair of connected Unix sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixStream; + /// + /// # futures_lite::future::block_on(async { + /// let (stream1, stream2) = UnixStream::pair()?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn pair() -> io::Result<(UnixStream, UnixStream)> { + let (a, b) = Async::<std::os::unix::net::UnixStream>::pair()?; + Ok((UnixStream::new(Arc::new(a)), UnixStream::new(Arc::new(b)))) + } + + /// Returns the local address this socket is connected to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixStream; + /// + /// # futures_lite::future::block_on(async { + /// let stream = UnixStream::connect("/tmp/socket").await?; + /// println!("Local address is {:?}", stream.local_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.get_ref().local_addr() + } + + /// Returns the remote address this socket is connected to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixStream; + /// + /// # futures_lite::future::block_on(async { + /// let stream = UnixStream::connect("/tmp/socket").await?; + /// println!("Connected to {:?}", stream.peer_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.inner.get_ref().peer_addr() + } + + /// Shuts down the read half, write half, or both halves of this connection. + /// + /// This method will cause all pending and future I/O in the given directions to return + /// immediately with an appropriate value (see the documentation of [`Shutdown`]). + /// + /// ```no_run + /// use async_net::{Shutdown, unix::UnixStream}; + /// + /// # futures_lite::future::block_on(async { + /// let stream = UnixStream::connect("/tmp/socket").await?; + /// stream.shutdown(Shutdown::Both)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.inner.get_ref().shutdown(how) + } +} + +impl fmt::Debug for UnixStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.inner.fmt(f) + } +} + +impl Clone for UnixStream { + fn clone(&self) -> UnixStream { + UnixStream::new(self.inner.clone()) + } +} + +impl From<Async<std::os::unix::net::UnixStream>> for UnixStream { + fn from(stream: Async<std::os::unix::net::UnixStream>) -> UnixStream { + UnixStream::new(Arc::new(stream)) + } +} + +impl TryFrom<std::os::unix::net::UnixStream> for UnixStream { + type Error = io::Error; + + fn try_from(stream: std::os::unix::net::UnixStream) -> io::Result<UnixStream> { + Ok(UnixStream::new(Arc::new(Async::new(stream)?))) + } +} + +impl From<UnixStream> for Arc<Async<std::os::unix::net::UnixStream>> { + fn from(val: UnixStream) -> Self { + val.inner + } +} + +#[cfg(unix)] +impl AsRawFd for UnixStream { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +#[cfg(all(not(async_net_no_io_safety), unix))] +impl AsFd for UnixStream { + fn as_fd(&self) -> BorrowedFd<'_> { + self.inner.get_ref().as_fd() + } +} + +#[cfg(all(not(async_net_no_io_safety), unix))] +impl TryFrom<OwnedFd> for UnixStream { + type Error = io::Error; + + fn try_from(value: OwnedFd) -> Result<Self, Self::Error> { + Self::try_from(std::os::unix::net::UnixStream::from(value)) + } +} + +#[cfg(windows)] +impl AsRawSocket for UnixStream { + fn as_raw_socket(&self) -> RawSocket { + self.inner.as_raw_socket() + } +} + +impl AsyncRead for UnixStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + loop { + // Attempt the non-blocking operation. + match self.inner.get_ref().read(buf) { + Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} + res => { + self.readable = None; + return Poll::Ready(res); + } + } + + // Initialize the future to wait for readiness. + if self.readable.is_none() { + self.readable = Some(self.inner.clone().readable_owned()); + } + + // Poll the future for readiness. + if let Some(f) = &mut self.readable { + let res = ready!(Pin::new(f).poll(cx)); + self.readable = None; + res?; + } + } + } +} + +impl AsyncWrite for UnixStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + loop { + // Attempt the non-blocking operation. + match self.inner.get_ref().write(buf) { + Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} + res => { + self.writable = None; + return Poll::Ready(res); + } + } + + // Initialize the future to wait for readiness. + if self.writable.is_none() { + self.writable = Some(self.inner.clone().writable_owned()); + } + + // Poll the future for readiness. + if let Some(f) = &mut self.writable { + let res = ready!(Pin::new(f).poll(cx)); + self.writable = None; + res?; + } + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + loop { + // Attempt the non-blocking operation. + match self.inner.get_ref().flush() { + Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} + res => { + self.writable = None; + return Poll::Ready(res); + } + } + + // Initialize the future to wait for readiness. + if self.writable.is_none() { + self.writable = Some(self.inner.clone().writable_owned()); + } + + // Poll the future for readiness. + if let Some(f) = &mut self.writable { + let res = ready!(Pin::new(f).poll(cx)); + self.writable = None; + res?; + } + } + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(self.inner.get_ref().shutdown(Shutdown::Write)) + } +} + +/// A Unix datagram socket. +/// +/// After creating a [`UnixDatagram`] by [`bind`][`UnixDatagram::bind()`]ing it to a path, data can +/// be [sent to] and [received from] any other socket address. +/// +/// Cloning a [`UnixDatagram`] creates another handle to the same socket. The socket will be closed +/// when all handles to it are dropped. The reading and writing portions of the socket can also be +/// shut down individually with the [`shutdown()`][`UnixStream::shutdown()`] method. +/// +/// [received from]: UnixDatagram::recv_from() +/// [sent to]: UnixDatagram::send_to() +/// +/// # Examples +/// +/// ```no_run +/// use async_net::unix::UnixDatagram; +/// +/// # futures_lite::future::block_on(async { +/// let socket = UnixDatagram::bind("/tmp/socket1")?; +/// socket.send_to(b"hello", "/tmp/socket2").await?; +/// +/// let mut buf = vec![0u8; 1024]; +/// let (n, addr) = socket.recv_from(&mut buf).await?; +/// # std::io::Result::Ok(()) }); +/// ``` +#[derive(Clone, Debug)] +pub struct UnixDatagram { + inner: Arc<Async<std::os::unix::net::UnixDatagram>>, +} + +impl UnixDatagram { + fn new(inner: Arc<Async<std::os::unix::net::UnixDatagram>>) -> UnixDatagram { + UnixDatagram { inner } + } + + /// Creates a new [`UnixDatagram`] bound to the given address. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UnixDatagram::bind("/tmp/socket")?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> { + let socket = Async::<std::os::unix::net::UnixDatagram>::bind(path)?; + Ok(UnixDatagram::new(Arc::new(socket))) + } + + /// Creates a Unix datagram socket not bound to any address. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UnixDatagram::unbound()?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn unbound() -> io::Result<UnixDatagram> { + let socket = Async::<std::os::unix::net::UnixDatagram>::unbound()?; + Ok(UnixDatagram::new(Arc::new(socket))) + } + + /// Creates a pair of connected Unix datagram sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # futures_lite::future::block_on(async { + /// let (socket1, socket2) = UnixDatagram::pair()?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { + let (a, b) = Async::<std::os::unix::net::UnixDatagram>::pair()?; + Ok(( + UnixDatagram::new(Arc::new(a)), + UnixDatagram::new(Arc::new(b)), + )) + } + + /// Connects the Unix datagram socket to the given address. + /// + /// When connected, methods [`send()`][`UnixDatagram::send()`] and + /// [`recv()`][`UnixDatagram::recv()`] will use the specified address for sending and receiving + /// messages. Additionally, a filter will be applied to + /// [`recv_from()`][`UnixDatagram::recv_from()`] so that it only receives messages from that + /// same address. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UnixDatagram::unbound()?; + /// socket.connect("/tmp/socket")?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> { + let p = path.as_ref(); + self.inner.get_ref().connect(p) + } + + /// Returns the local address this socket is bound to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UnixDatagram::bind("/tmp/socket")?; + /// println!("Bound to {:?}", socket.local_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.get_ref().local_addr() + } + + /// Returns the remote address this socket is connected to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UnixDatagram::unbound()?; + /// socket.connect("/tmp/socket")?; + /// println!("Connected to {:?}", socket.peer_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.inner.get_ref().peer_addr() + } + + /// Receives data from an address. + /// + /// On success, returns the number of bytes received and the address data came from. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UnixDatagram::bind("/tmp/socket")?; + /// + /// let mut buf = vec![0; 1024]; + /// let (n, addr) = socket.recv_from(&mut buf).await?; + /// println!("Received {} bytes from {:?}", n, addr); + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.inner.recv_from(buf).await + } + + /// Sends data to the given address. + /// + /// On success, returns the number of bytes sent. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UnixDatagram::unbound()?; + /// socket.send_to(b"hello", "/tmp/socket").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> { + self.inner.send_to(buf, path.as_ref()).await + } + + /// Receives data from the connected address. + /// + /// On success, returns the number of bytes received. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UnixDatagram::unbound()?; + /// socket.connect("/tmp/socket")?; + /// + /// let mut buf = vec![0; 1024]; + /// let n = socket.recv(&mut buf).await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { + self.inner.recv(buf).await + } + + /// Sends data to the connected address. + /// + /// On success, returns the number of bytes sent. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UnixDatagram::unbound()?; + /// socket.connect("/tmp/socket")?; + /// socket.send(b"hello").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { + self.inner.send(buf).await + } + + /// Shuts down the read half, write half, or both halves of this socket. + /// + /// This method will cause all pending and future I/O in the given directions to return + /// immediately with an appropriate value (see the documentation of [`Shutdown`]). + /// + /// # Examples + /// + /// ```no_run + /// use async_net::{Shutdown, unix::UnixDatagram}; + /// + /// # futures_lite::future::block_on(async { + /// let socket = UnixDatagram::unbound()?; + /// socket.shutdown(Shutdown::Both)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.inner.get_ref().shutdown(how) + } +} + +impl From<Async<std::os::unix::net::UnixDatagram>> for UnixDatagram { + fn from(socket: Async<std::os::unix::net::UnixDatagram>) -> UnixDatagram { + UnixDatagram::new(Arc::new(socket)) + } +} + +impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram { + type Error = io::Error; + + fn try_from(socket: std::os::unix::net::UnixDatagram) -> io::Result<UnixDatagram> { + Ok(UnixDatagram::new(Arc::new(Async::new(socket)?))) + } +} + +impl From<UnixDatagram> for Arc<Async<std::os::unix::net::UnixDatagram>> { + fn from(val: UnixDatagram) -> Self { + val.inner + } +} + +#[cfg(unix)] +impl AsRawFd for UnixDatagram { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +#[cfg(windows)] +impl AsRawSocket for UnixDatagram { + fn as_raw_socket(&self) -> RawSocket { + self.inner.as_raw_socket() + } +} |