diff options
author | DongHun Kwak <dh0128.kwak@samsung.com> | 2023-03-06 10:13:41 +0900 |
---|---|---|
committer | DongHun Kwak <dh0128.kwak@samsung.com> | 2023-03-06 10:13:41 +0900 |
commit | 2053087767ec3e56e39ad6b20b6a8d4a953a9cf2 (patch) | |
tree | a293e2412de801fa701c772bf5a81bc2cd6c7995 | |
download | rust-async-global-executor-2053087767ec3e56e39ad6b20b6a8d4a953a9cf2.tar.gz rust-async-global-executor-2053087767ec3e56e39ad6b20b6a8d4a953a9cf2.tar.bz2 rust-async-global-executor-2053087767ec3e56e39ad6b20b6a8d4a953a9cf2.zip |
Import async-global-executor 2.3.1upstream/2.3.1upstream
-rw-r--r-- | .cargo_vcs_info.json | 6 | ||||
-rw-r--r-- | .github/FUNDING.yml | 1 | ||||
-rw-r--r-- | .github/workflows/build-and-test.yaml | 41 | ||||
-rw-r--r-- | .github/workflows/lint.yaml | 36 | ||||
-rw-r--r-- | .github/workflows/security.yaml | 15 | ||||
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | CHANGELOG.md | 126 | ||||
-rw-r--r-- | Cargo.toml | 92 | ||||
-rw-r--r-- | Cargo.toml.orig | 56 | ||||
-rw-r--r-- | LICENSE-APACHE | 201 | ||||
-rw-r--r-- | LICENSE-MIT | 23 | ||||
-rw-r--r-- | README.md | 55 | ||||
-rw-r--r-- | src/config.rs | 97 | ||||
-rw-r--r-- | src/executor.rs | 102 | ||||
-rw-r--r-- | src/init.rs | 42 | ||||
-rw-r--r-- | src/lib.rs | 51 | ||||
-rw-r--r-- | src/reactor.rs | 13 | ||||
-rw-r--r-- | src/threading.rs | 156 | ||||
-rw-r--r-- | src/tokio.rs | 41 | ||||
-rw-r--r-- | src/tokio02.rs | 41 | ||||
-rw-r--r-- | src/tokio03.rs | 41 |
21 files changed, 1238 insertions, 0 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json new file mode 100644 index 0000000..90080c7 --- /dev/null +++ b/.cargo_vcs_info.json @@ -0,0 +1,6 @@ +{ + "git": { + "sha1": "0abe723db4ad440f5cebbd06f95b6234a8116398" + }, + "path_in_vcs": "" +}
\ No newline at end of file diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml new file mode 100644 index 0000000..0f5af72 --- /dev/null +++ b/.github/FUNDING.yml @@ -0,0 +1 @@ +github: Keruspe diff --git a/.github/workflows/build-and-test.yaml b/.github/workflows/build-and-test.yaml new file mode 100644 index 0000000..3b53331 --- /dev/null +++ b/.github/workflows/build-and-test.yaml @@ -0,0 +1,41 @@ +name: Build and test + +on: + push: + pull_request: + +jobs: + build_and_test: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + rust: [nightly, beta, stable, 1.59.0] + steps: + - uses: actions/checkout@v2 + + - name: Install latest ${{ matrix.rust }} + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.rust }} + profile: minimal + override: true + + - name: Run cargo check + uses: actions-rs/cargo@v1 + with: + command: check + args: --all --bins --examples --tests --all-features + + - name: Run cargo check (without dev-dependencies to catch missing feature flags) + if: startsWith(matrix.rust, 'nightly') + uses: actions-rs/cargo@v1 + with: + command: check + args: -Z features=dev_dep + + - name: Run cargo test + uses: actions-rs/cargo@v1 + with: + command: test diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml new file mode 100644 index 0000000..1c56821 --- /dev/null +++ b/.github/workflows/lint.yaml @@ -0,0 +1,36 @@ +name: Lint + +on: + push: + pull_request: + +jobs: + clippy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - uses: actions-rs/toolchain@v1 + with: + toolchain: stable + profile: minimal + components: clippy + - uses: actions-rs/clippy-check@v1 + with: + token: ${{ secrets.GITHUB_TOKEN }} + args: --all-features -- -W clippy::all + + rustfmt: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - uses: actions-rs/toolchain@v1 + with: + toolchain: stable + profile: minimal + components: rustfmt + - uses: actions-rs/cargo@v1 + with: + command: fmt + args: --all -- --check diff --git a/.github/workflows/security.yaml b/.github/workflows/security.yaml new file mode 100644 index 0000000..9890baa --- /dev/null +++ b/.github/workflows/security.yaml @@ -0,0 +1,15 @@ +name: Security audit + +on: + push: + pull_request: + +jobs: + security_audit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - uses: actions-rs/audit-check@v1 + with: + token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..63486dc --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,126 @@ +# Version 2.3.1 + +- Update blocking dependency + +# Version 2.3.0 + +- Switch back to edition 2021 and MSRV 1.59, dropping num-cups dependency + +# Version 2.2.0 + +- Revert back to edition 2018 and MSRV 1.49 + +# Version 2.1.0 + +- Switch from async-mutex to async-lock +- Switch from num-cpus to Use `std::thread::available_parallelism` +- Update MSRV to 1.59 +- Update to edition 2021 + +# Version 2.0.4 + +- Return concrete type Task from `spawn_blocking` + +# Version 2.0.3 + +- Documentation updates + +# Version 2.0.2 + +- Documentation updates + +# Version 2.0.1 + +- fix build without default features + +# Version 2.0.0 + +- add tokio 1.0 integration +- rework configuration +- add a way to update the number of threads at runtime within configured bounds + +# Version 1.4.3 + +- switch to multi threaded tokio schedulers when enabled + +# Version 1.4.2 + +- Drop an Arc + +# Version 1.4.1 + +- switch back to manual implementation for tokio02 integration + +# Version 1.4.0 + +- add tokio03 integration + +# Version 1.3.0 + +- use async-compat for tokio02 integration + +# Version 1.2.1 + +- tokio02 fix + +# Version 1.2.0 + +- Add tokio02 feature + +# Version 1.1.1 + +- Update `async-executor`. + +# Version 1.1.0 + +- Update async-executor + +# Version 1.0.2 + +- Do not run global tasks in `block_on()` + +# Version 1.0.1 + +- Update dependencies + +# Version 1.0.0 + +- Update dependencies +- Make async-io support optional + +# Version 0.2.3 + +- Change license to MIT or Apache-2.0 + +# Version 0.2.2 + +- Reexport `async_executor::Task` + +# Version 0.2.1 + +- Make sure we spawn at least one thread + +# Version 0.2.0 + +- Rename `run` to `block_on` and drop `'static` requirement +- Add `GlobalExecutorConfig::with_thread_name` + +# Version 0.1.4 + +- Add init functions + +# Version 0.1.3 + +- `run`: do not require `Future` to be `Send` + +# Version 0.1.2 + +- Adjust dependencies + +# Versio 0.1.1 + +- Fix the number of spawned threads + +# Version 0.1.0 + +- Initial release diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..3ff63cb --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,92 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2021" +rust-version = "1.59" +name = "async-global-executor" +version = "2.3.1" +authors = ["Marc-Antoine Perennou <Marc-Antoine@Perennou.com>"] +description = "A global executor built on top of async-executor and async-io" +homepage = "https://github.com/Keruspe/async-global-executor" +documentation = "https://docs.rs/async-global-executor" +readme = "README.md" +keywords = [ + "async", + "await", + "future", + "executor", +] +categories = [ + "asynchronous", + "concurrency", +] +license = "Apache-2.0 OR MIT" +repository = "https://github.com/Keruspe/async-global-executor" +resolver = "2" + +[dependencies.async-channel] +version = "^1.5" + +[dependencies.async-executor] +version = "^1.4" + +[dependencies.async-io] +version = "^1.2" +optional = true + +[dependencies.async-lock] +version = "^2.5" + +[dependencies.blocking] +version = "^1.2" + +[dependencies.futures-lite] +version = "^1.0" + +[dependencies.once_cell] +version = "^1.4" + +[dependencies.tokio-crate] +version = "^1.0" +features = [ + "rt", + "rt-multi-thread", +] +optional = true +default-features = false +package = "tokio" + +[dependencies.tokio02-crate] +version = "^0.2" +features = ["rt-core"] +optional = true +default-features = false +package = "tokio" + +[dependencies.tokio03-crate] +version = "^0.3.4" +features = [ + "rt", + "rt-multi-thread", +] +optional = true +default-features = false +package = "tokio" + +[dev-dependencies.doc-comment] +version = "^0.3" + +[features] +default = ["async-io"] +tokio = ["tokio-crate"] +tokio02 = ["tokio02-crate"] +tokio03 = ["tokio03-crate"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig new file mode 100644 index 0000000..e961706 --- /dev/null +++ b/Cargo.toml.orig @@ -0,0 +1,56 @@ +[package] +name = "async-global-executor" +version = "2.3.1" +authors = ["Marc-Antoine Perennou <Marc-Antoine@Perennou.com>"] +description = "A global executor built on top of async-executor and async-io" +edition = "2021" +license = "Apache-2.0 OR MIT" +repository = "https://github.com/Keruspe/async-global-executor" +homepage = "https://github.com/Keruspe/async-global-executor" +documentation = "https://docs.rs/async-global-executor" +keywords = ["async", "await", "future", "executor"] +categories = ["asynchronous", "concurrency"] +readme = "README.md" +rust-version = "1.59" + +[features] +default = ["async-io"] +tokio = ["tokio-crate"] +tokio02 = ["tokio02-crate"] +tokio03 = ["tokio03-crate"] + +[dependencies] +async-channel = "^1.5" +async-executor = "^1.4" +async-lock = "^2.5" +blocking = "^1.2" +futures-lite = "^1.0" +once_cell = "^1.4" + +[dependencies.async-io] +version = "^1.2" +optional = true + +[dependencies.tokio-crate] +package = "tokio" +version = "^1.0" +optional = true +default-features = false +features = ["rt", "rt-multi-thread"] + +[dependencies.tokio02-crate] +package = "tokio" +version = "^0.2" +optional = true +default-features = false +features = ["rt-core"] + +[dependencies.tokio03-crate] +package = "tokio" +version = "^0.3.4" +optional = true +default-features = false +features = ["rt", "rt-multi-thread"] + +[dev-dependencies] +doc-comment = "^0.3" diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..16fe87b --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..31aa793 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,23 @@ +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..a1e210a --- /dev/null +++ b/README.md @@ -0,0 +1,55 @@ +# async-global-executor + +[![API Docs](https://docs.rs/async-global-executor/badge.svg)](https://docs.rs/async-global-executor) +[![Build status](https://github.com/Keruspe/async-global-executor/workflows/Build%20and%20test/badge.svg)](https://github.com/Keruspe/async-global-executor/actions) +[![Downloads](https://img.shields.io/crates/d/async-global-executor.svg)](https://crates.io/crates/async-global-executor) + +A global executor built on top of async-executor and async-io + +# Features + +* `async-io`: if enabled, `async-global-executor` will use `async_io::block_on` instead of + `futures_lite::future::block_on` internally. this is preferred if your application also uses `async-io`. +* `blocking`: enable the use of the `blocking` crate through `async_global_executor::spawn_blocking`. +* `tokio`: if enabled, `async-global-executor` will ensure that all tasks that you will spawn run in the context of a + tokio 1.0 runtime, spawning a new one if required. +* `tokio03`: if enabled, `async-global-executor` will ensure that all tasks that you will spawn run in the context of a + tokio 0.3 runtime, spawning a new one if required. +* `tokio02`: if enabled, `async-global-executor` will ensure that all tasks that you will spawn run in the context of a + tokio 0.2 runtime, spawning a new one if required. + +# Examples + +``` +# use futures_lite::future; + +// spawn a task on the multi-threaded executor +let task1 = async_global_executor::spawn(async { + 1 + 2 +}); +// spawn a task on the local executor (same thread) +let task2 = async_global_executor::spawn_local(async { + 3 + 4 +}); +let task = future::zip(task1, task2); + +// run the executor +async_global_executor::block_on(async { + assert_eq!(task.await, (3, 7)); +}); +``` + +## License + +Licensed under either of + + * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +#### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be +dual licensed as above, without any additional terms or conditions. diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..45048ab --- /dev/null +++ b/src/config.rs @@ -0,0 +1,97 @@ +use once_cell::sync::OnceCell; +use std::{ + fmt, + sync::atomic::{AtomicUsize, Ordering}, +}; + +pub(crate) static GLOBAL_EXECUTOR_CONFIG: OnceCell<Config> = OnceCell::new(); + +/// Configuration to init the thread pool for the multi-threaded global executor. +#[derive(Default)] +pub struct GlobalExecutorConfig { + /// The environment variable from which we'll try to parse the number of threads to spawn. + env_var: Option<&'static str>, + /// The minimum number of threads to spawn. + min_threads: Option<usize>, + /// The maximum number of threads to spawn. + max_threads: Option<usize>, + /// The closure function used to get the name of the thread. The name can be used for identification in panic messages. + thread_name_fn: Option<Box<dyn Fn() -> String + Send + Sync>>, +} + +impl fmt::Debug for GlobalExecutorConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("GlobalExecutorConfig") + .field("env_var", &self.env_var) + .field("min_threads", &self.min_threads) + .field("max_threads", &self.max_threads) + .finish() + } +} + +impl GlobalExecutorConfig { + /// Use the specified environment variable to find the number of threads to spawn. + pub fn with_env_var(mut self, env_var: &'static str) -> Self { + self.env_var = Some(env_var); + self + } + + /// Use the specified value as the minimum number of threads. + pub fn with_min_threads(mut self, min_threads: usize) -> Self { + self.min_threads = Some(min_threads); + self + } + + /// Use the specified value as the maximum number of threads for async tasks. + /// To limit the maximum number of threads for blocking tasks, please use the + /// `BLOCKING_MAX_THREADS` environment variable. + pub fn with_max_threads(mut self, max_threads: usize) -> Self { + self.max_threads = Some(max_threads); + self + } + + /// Use the specified prefix to name the threads. + pub fn with_thread_name_fn( + mut self, + thread_name_fn: impl Fn() -> String + Send + Sync + 'static, + ) -> Self { + self.thread_name_fn = Some(Box::new(thread_name_fn)); + self + } + + pub(crate) fn seal(self) -> Config { + let min_threads = std::env::var(self.env_var.unwrap_or("ASYNC_GLOBAL_EXECUTOR_THREADS")) + .ok() + .and_then(|threads| threads.parse().ok()) + .or(self.min_threads) + .unwrap_or_else(|| std::thread::available_parallelism().map_or(1, usize::from)) + .max(1); + let max_threads = self.max_threads.unwrap_or(min_threads * 4).max(min_threads); + Config { + min_threads, + max_threads, + thread_name_fn: self.thread_name_fn.unwrap_or_else(|| { + Box::new(|| { + static GLOBAL_EXECUTOR_NEXT_THREAD: AtomicUsize = AtomicUsize::new(1); + format!( + "async-global-executor-{}", + GLOBAL_EXECUTOR_NEXT_THREAD.fetch_add(1, Ordering::SeqCst) + ) + }) + }), + } + } +} + +// The actual configuration, computed from the given GlobalExecutorConfig +pub(crate) struct Config { + pub(crate) min_threads: usize, + pub(crate) max_threads: usize, + pub(crate) thread_name_fn: Box<dyn Fn() -> String + Send + Sync>, +} + +impl Default for Config { + fn default() -> Self { + GlobalExecutorConfig::default().seal() + } +} diff --git a/src/executor.rs b/src/executor.rs new file mode 100644 index 0000000..838fbd6 --- /dev/null +++ b/src/executor.rs @@ -0,0 +1,102 @@ +use crate::Task; +use async_executor::{Executor, LocalExecutor}; +use std::future::Future; + +pub(crate) static GLOBAL_EXECUTOR: Executor<'_> = Executor::new(); + +thread_local! { + pub(crate) static LOCAL_EXECUTOR: LocalExecutor<'static> = LocalExecutor::new(); +} + +/// Runs the global and the local executor on the current thread +/// +/// Note: this calls `async_io::block_on` underneath. +/// +/// # Examples +/// +/// ``` +/// let task = async_global_executor::spawn(async { +/// 1 + 2 +/// }); +/// async_global_executor::block_on(async { +/// assert_eq!(task.await, 3); +/// }); +/// ``` +pub fn block_on<F: Future<Output = T>, T>(future: F) -> T { + LOCAL_EXECUTOR.with(|executor| crate::reactor::block_on(executor.run(future))) +} + +/// Spawns a task onto the multi-threaded global executor. +/// +/// # Examples +/// +/// ``` +/// # use futures_lite::future; +/// +/// let task1 = async_global_executor::spawn(async { +/// 1 + 2 +/// }); +/// let task2 = async_global_executor::spawn(async { +/// 3 + 4 +/// }); +/// let task = future::zip(task1, task2); +/// +/// async_global_executor::block_on(async { +/// assert_eq!(task.await, (3, 7)); +/// }); +/// ``` +pub fn spawn<F: Future<Output = T> + Send + 'static, T: Send + 'static>(future: F) -> Task<T> { + crate::init(); + GLOBAL_EXECUTOR.spawn(future) +} + +/// Spawns a task onto the local executor. +/// +/// +/// The task does not need to be `Send` as it will be spawned on the same thread. +/// +/// # Examples +/// +/// ``` +/// # use futures_lite::future; +/// +/// let task1 = async_global_executor::spawn_local(async { +/// 1 + 2 +/// }); +/// let task2 = async_global_executor::spawn_local(async { +/// 3 + 4 +/// }); +/// let task = future::zip(task1, task2); +/// +/// async_global_executor::block_on(async { +/// assert_eq!(task.await, (3, 7)); +/// }); +/// ``` +pub fn spawn_local<F: Future<Output = T> + 'static, T: 'static>(future: F) -> Task<T> { + LOCAL_EXECUTOR.with(|executor| executor.spawn(future)) +} + +/// Runs blocking code on a thread pool. +/// +/// # Examples +/// +/// Read the contents of a file: +/// +/// ```no_run +/// # async_global_executor::block_on(async { +/// let contents = async_global_executor::spawn_blocking(|| std::fs::read_to_string("file.txt")).await?; +/// # std::io::Result::Ok(()) }); +/// ``` +/// +/// Spawn a process: +/// +/// ```no_run +/// use std::process::Command; +/// +/// # async_global_executor::block_on(async { +/// let out = async_global_executor::spawn_blocking(|| Command::new("dir").output()).await?; +/// # std::io::Result::Ok(()) }); +/// ``` +pub fn spawn_blocking<F: FnOnce() -> T + Send + 'static, T: Send + 'static>(f: F) -> Task<T> { + blocking::unblock(f) +} diff --git a/src/init.rs b/src/init.rs new file mode 100644 index 0000000..b32b388 --- /dev/null +++ b/src/init.rs @@ -0,0 +1,42 @@ +use std::sync::atomic::{AtomicBool, Ordering}; + +/// Init the global executor, spawning as many threads as specified or +/// the value specified by the specified environment variable. +/// +/// # Examples +/// +/// ``` +/// async_global_executor::init_with_config( +/// async_global_executor::GlobalExecutorConfig::default() +/// .with_env_var("NUMBER_OF_THREADS") +/// .with_min_threads(4) +/// .with_max_threads(6) +/// .with_thread_name_fn(Box::new(|| "worker".to_string())) +/// ); +/// ``` +pub fn init_with_config(config: crate::config::GlobalExecutorConfig) { + let _ = crate::config::GLOBAL_EXECUTOR_CONFIG.set(config.seal()); + init(); +} + +/// Init the global executor, spawning as many threads as the number or cpus or +/// the value specified by the `ASYNC_GLOBAL_EXECUTOR_THREADS` environment variable +/// if specified. +/// +/// # Examples +/// +/// ``` +/// async_global_executor::init(); +/// ``` +pub fn init() { + static INIT_DONE: AtomicBool = AtomicBool::new(false); + if !INIT_DONE.swap(true, Ordering::SeqCst) { + let config = + crate::config::GLOBAL_EXECUTOR_CONFIG.get_or_init(crate::config::Config::default); + crate::reactor::block_on(async { + crate::threading::spawn_more_threads(config.min_threads) + .await + .expect("cannot spawn executor threads"); + }); + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..0900941 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,51 @@ +//! A global executor built on top of async-executor and async_io +//! +//! The global executor is lazily spawned on first use. It spawns as many threads +//! as the number of cpus by default. You can override this using the +//! `ASYNC_GLOBAL_EXECUTOR_THREADS` environment variable. +//! +//! # Examples +//! +//! ``` +//! # use futures_lite::future; +//! +//! // spawn a task on the multi-threaded executor +//! let task1 = async_global_executor::spawn(async { +//! 1 + 2 +//! }); +//! // spawn a task on the local executor (same thread) +//! let task2 = async_global_executor::spawn_local(async { +//! 3 + 4 +//! }); +//! let task = future::zip(task1, task2); +//! +//! // run the executor +//! async_global_executor::block_on(async { +//! assert_eq!(task.await, (3, 7)); +//! }); +//! ``` + +#![forbid(unsafe_code)] +#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] + +#[cfg(doctest)] +doc_comment::doctest!("../README.md"); + +pub use async_executor::Task; +pub use config::GlobalExecutorConfig; +pub use executor::{block_on, spawn, spawn_blocking, spawn_local}; +pub use init::{init, init_with_config}; +pub use threading::{spawn_more_threads, stop_current_thread, stop_thread}; + +mod config; +mod executor; +mod init; +mod reactor; +mod threading; + +#[cfg(feature = "tokio")] +mod tokio; +#[cfg(feature = "tokio02")] +mod tokio02; +#[cfg(feature = "tokio03")] +mod tokio03; diff --git a/src/reactor.rs b/src/reactor.rs new file mode 100644 index 0000000..090e1a7 --- /dev/null +++ b/src/reactor.rs @@ -0,0 +1,13 @@ +pub(crate) fn block_on<F: std::future::Future<Output = T>, T>(future: F) -> T { + #[cfg(feature = "async-io")] + let run = || async_io::block_on(future); + #[cfg(not(feature = "async-io"))] + let run = || futures_lite::future::block_on(future); + #[cfg(feature = "tokio")] + let _tokio_enter = crate::tokio::enter(); + #[cfg(feature = "tokio02")] + let run = || crate::tokio02::enter(run); + #[cfg(feature = "tokio03")] + let _tokio03_enter = crate::tokio03::enter(); + run() +} diff --git a/src/threading.rs b/src/threading.rs new file mode 100644 index 0000000..ed8b549 --- /dev/null +++ b/src/threading.rs @@ -0,0 +1,156 @@ +use crate::Task; +use async_channel::{Receiver, Sender}; +use async_lock::Mutex; +use futures_lite::future; +use once_cell::sync::OnceCell; +use std::{io, thread}; + +// The current number of threads (some might be shutting down and not in the pool anymore) +static GLOBAL_EXECUTOR_THREADS_NUMBER: Mutex<usize> = Mutex::new(0); +// The expected number of threads (excluding the one that are shutting down) +static GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER: Mutex<usize> = Mutex::new(0); + +thread_local! { + // Used to shutdown a thread when we receive a message from the Sender. + // We send an ack using to the Receiver once we're finished shutting down. + static THREAD_SHUTDOWN: OnceCell<(Sender<()>, Receiver<()>)> = OnceCell::new(); +} + +/// Spawn more executor threads, up to configured max value. +/// +/// Returns how many threads we spawned. +/// +/// # Examples +/// +/// ``` +/// async_global_executor::spawn_more_threads(2); +/// ``` +pub async fn spawn_more_threads(count: usize) -> io::Result<usize> { + // Get the current configuration, or initialize the thread pool. + let config = crate::config::GLOBAL_EXECUTOR_CONFIG + .get() + .unwrap_or_else(|| { + crate::init(); + crate::config::GLOBAL_EXECUTOR_CONFIG.get().unwrap() + }); + // How many threads do we have (including shutting down) + let mut threads_number = GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await; + // How many threads are we supposed to have (when all shutdowns are complete) + let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await; + // Ensure we don't exceed configured max threads (including shutting down) + let count = count.min(config.max_threads - *threads_number); + for _ in 0..count { + thread::Builder::new() + .name((config.thread_name_fn)()) + .spawn(thread_main_loop)?; + *threads_number += 1; + *expected_threads_number += 1; + } + Ok(count) +} + +/// Stop one of the executor threads, down to configured min value +/// +/// Returns whether a thread has been stopped. +/// +/// # Examples +/// +/// ``` +/// async_global_executor::stop_thread(); +/// ``` +pub fn stop_thread() -> Task<bool> { + crate::spawn(stop_current_executor_thread()) +} + +/// Stop the current executor thread, if we exceed the configured min value +/// +/// Returns whether the thread has been stopped. +/// +/// # Examples +/// +/// ``` +/// async_global_executor::stop_current_thread(); +/// ``` +pub fn stop_current_thread() -> Task<bool> { + crate::spawn_local(stop_current_executor_thread()) +} + +fn thread_main_loop() { + // This will be used to ask for shutdown. + let (s, r) = async_channel::bounded(1); + // This wil be used to ack once shutdown is complete. + let (s_ack, r_ack) = async_channel::bounded(1); + THREAD_SHUTDOWN.with(|thread_shutdown| drop(thread_shutdown.set((s, r_ack)))); + + // Main loop + loop { + #[allow(clippy::blocks_in_if_conditions)] + if std::panic::catch_unwind(|| { + crate::executor::LOCAL_EXECUTOR.with(|executor| { + let local = executor.run(async { + // Wait until we're asked to shutdown. + let _ = r.recv().await; + }); + let global = crate::executor::GLOBAL_EXECUTOR.run(future::pending::<()>()); + crate::reactor::block_on(future::or(local, global)); + }); + }) + .is_ok() + { + break; + } + } + + wait_for_local_executor_completion(); + + // Ack that we're done shutting down. + crate::reactor::block_on(async { + let _ = s_ack.send(()).await; + }); +} + +fn wait_for_local_executor_completion() { + loop { + #[allow(clippy::blocks_in_if_conditions)] + if std::panic::catch_unwind(|| { + crate::executor::LOCAL_EXECUTOR.with(|executor| { + crate::reactor::block_on(async { + // Wait for spawned tasks completion + while !executor.is_empty() { + executor.tick().await; + } + }); + }); + }) + .is_ok() + { + break; + } + } +} + +async fn stop_current_executor_thread() -> bool { + // How many threads are we supposed to have (when all shutdowns are complete) + let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await; + // Ensure we don't go below the configured min_threads (ignoring shutting down) + if *expected_threads_number + > crate::config::GLOBAL_EXECUTOR_CONFIG + .get() + .unwrap() + .min_threads + { + let (s, r_ack) = + THREAD_SHUTDOWN.with(|thread_shutdown| thread_shutdown.get().unwrap().clone()); + let _ = s.send(()).await; + // We now expect to have one less thread (this one is shutting down) + *expected_threads_number -= 1; + // Unlock the Mutex + drop(expected_threads_number); + let _ = r_ack.recv().await; + // This thread is done shutting down + *GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await -= 1; + true + } else { + false + } +} diff --git a/src/tokio.rs b/src/tokio.rs new file mode 100644 index 0000000..a594640 --- /dev/null +++ b/src/tokio.rs @@ -0,0 +1,41 @@ +use once_cell::sync::Lazy; +use tokio_crate as tokio; + +pub(crate) fn enter() -> tokio::runtime::EnterGuard<'static> { + RUNTIME.enter() +} + +static RUNTIME: Lazy<tokio::runtime::Handle> = Lazy::new(|| { + tokio::runtime::Handle::try_current().unwrap_or_else(|_| { + let rt = tokio::runtime::Runtime::new().expect("failed to build tokio runtime"); + let handle = rt.handle().clone(); + std::thread::Builder::new() + .name("async-global-executor/tokio".to_string()) + .spawn(move || { + rt.block_on(futures_lite::future::pending::<()>()); + }) + .expect("failed to spawn tokio driver thread"); + handle + }) +}); + +#[cfg(test)] +mod test { + use super::*; + + async fn compute() -> u8 { + tokio::spawn(async { 1 + 2 }).await.unwrap() + } + + #[test] + fn spawn_tokio() { + crate::block_on(async { + assert_eq!( + crate::spawn(compute()).await + + crate::spawn_local(compute()).await + + tokio::spawn(compute()).await.unwrap(), + 9 + ); + }); + } +} diff --git a/src/tokio02.rs b/src/tokio02.rs new file mode 100644 index 0000000..d4ff571 --- /dev/null +++ b/src/tokio02.rs @@ -0,0 +1,41 @@ +use once_cell::sync::Lazy; +use tokio02_crate as tokio; + +pub(crate) fn enter<F: FnOnce() -> R, R>(f: F) -> R { + RUNTIME.enter(f) +} + +static RUNTIME: Lazy<tokio::runtime::Handle> = Lazy::new(|| { + tokio::runtime::Handle::try_current().unwrap_or_else(|_| { + let mut rt = tokio::runtime::Runtime::new().expect("failed to build tokio02 runtime"); + let handle = rt.handle().clone(); + std::thread::Builder::new() + .name("async-global-executor/tokio02".to_string()) + .spawn(move || { + rt.block_on(futures_lite::future::pending::<()>()); + }) + .expect("failed to spawn tokio02 driver thread"); + handle + }) +}); + +#[cfg(test)] +mod test { + use super::*; + + async fn compute() -> u8 { + tokio::spawn(async { 1 + 2 }).await.unwrap() + } + + #[test] + fn spawn_tokio() { + crate::block_on(async { + assert_eq!( + crate::spawn(compute()).await + + crate::spawn_local(compute()).await + + tokio::spawn(compute()).await.unwrap(), + 9 + ); + }); + } +} diff --git a/src/tokio03.rs b/src/tokio03.rs new file mode 100644 index 0000000..67a1ce7 --- /dev/null +++ b/src/tokio03.rs @@ -0,0 +1,41 @@ +use once_cell::sync::Lazy; +use tokio03_crate as tokio; + +pub(crate) fn enter() -> tokio::runtime::EnterGuard<'static> { + RUNTIME.enter() +} + +static RUNTIME: Lazy<tokio::runtime::Handle> = Lazy::new(|| { + tokio::runtime::Handle::try_current().unwrap_or_else(|_| { + let rt = tokio::runtime::Runtime::new().expect("failed to build tokio03 runtime"); + let handle = rt.handle().clone(); + std::thread::Builder::new() + .name("async-global-executor/tokio03".to_string()) + .spawn(move || { + rt.block_on(futures_lite::future::pending::<()>()); + }) + .expect("failed to spawn tokio03 driver thread"); + handle + }) +}); + +#[cfg(test)] +mod test { + use super::*; + + async fn compute() -> u8 { + tokio::spawn(async { 1 + 2 }).await.unwrap() + } + + #[test] + fn spawn_tokio() { + crate::block_on(async { + assert_eq!( + crate::spawn(compute()).await + + crate::spawn_local(compute()).await + + tokio::spawn(compute()).await.unwrap(), + 9 + ); + }); + } +} |